kcg-monitoring/prediction/db/snpdb.py
htlee 2534c9dbca fix: add a safe collection window for time_bucket to prevent data loss in incremental fetch
snpdb 5-minute bucket data takes ~12 minutes to be fully ingested,
but the old fetch_incremental collected incomplete buckets with no
upper bound, so _last_bucket advanced too early and rows that landed
late were permanently lost.

- time_bucket.py (new): safe_bucket (12-minute delay) + backfill (3 buckets); see the sketch below
- snpdb.py: safe upper bound + backfill lower bound in fetch_all_tracks/fetch_incremental
- vessel_store.py: merge_incremental sort + keep='last'; evict_stale prefers time_bucket
- config.py: SNPDB_SAFE_DELAY_MIN=12, SNPDB_BACKFILL_BUCKETS=3

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 15:11:20 +09:00
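
The new time_bucket.py itself is not shown on this page. A minimal sketch consistent with the commit description (5-minute buckets, a 12-minute safe delay, a 3-bucket backfill) could look like the following; the three public function names are confirmed by the imports in snpdb.py, but the bodies are assumptions:

# Hypothetical sketch of time_bucket.py. Only the three public function
# names are confirmed (snpdb.py imports them); the bodies assume 5-minute
# buckets and the two config constants added in this commit.
from datetime import datetime, timedelta, timezone
from typing import Optional

from config import settings

BUCKET = timedelta(minutes=5)

def _floor_to_bucket(ts: datetime) -> datetime:
    # Round down to the start of the containing 5-minute bucket.
    return ts - timedelta(minutes=ts.minute % 5,
                          seconds=ts.second,
                          microseconds=ts.microsecond)

def compute_safe_bucket(now: Optional[datetime] = None) -> datetime:
    # Latest bucket old enough to be fully ingested: now - 12 minutes
    # (SNPDB_SAFE_DELAY_MIN), floored to a bucket boundary.
    now = now or datetime.now(timezone.utc)
    return _floor_to_bucket(now - timedelta(minutes=settings.SNPDB_SAFE_DELAY_MIN))

def compute_initial_window_start(hours: int, safe_bucket: datetime) -> datetime:
    # Lower bound for the initial full fetch: N hours before the safe bucket.
    return safe_bucket - timedelta(hours=hours)

def compute_incremental_window_start(last_bucket: datetime) -> datetime:
    # Backfill lower bound: re-read the last 3 buckets (15 minutes) so
    # rows INSERTed after the previous poll are picked up again.
    return last_bucket - settings.SNPDB_BACKFILL_BUCKETS * BUCKET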

212 lines
6.6 KiB
Python

import logging
from contextlib import contextmanager
from datetime import datetime
from typing import Optional

import pandas as pd
import psycopg2
from psycopg2 import pool

from config import settings
from time_bucket import (
    compute_incremental_window_start,
    compute_initial_window_start,
    compute_safe_bucket,
)

logger = logging.getLogger(__name__)

_pool: Optional[pool.ThreadedConnectionPool] = None

def init_pool():
    global _pool
    _pool = pool.ThreadedConnectionPool(
        minconn=1,
        maxconn=3,
        host=settings.SNPDB_HOST,
        port=settings.SNPDB_PORT,
        dbname=settings.SNPDB_NAME,
        user=settings.SNPDB_USER,
        password=settings.SNPDB_PASSWORD,
    )
    logger.info('snpdb connection pool initialized')

def close_pool():
    global _pool
    if _pool:
        _pool.closeall()
        _pool = None
        logger.info('snpdb connection pool closed')

@contextmanager
def get_conn():
    """Borrow a connection from the pool and always return it."""
    if _pool is None:
        raise RuntimeError('snpdb pool not initialized; call init_pool() first')
    conn = _pool.getconn()
    try:
        yield conn
    finally:
        _pool.putconn(conn)

def check_health() -> bool:
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute('SELECT 1')
        return True
    except Exception as e:
        logger.error('snpdb health check failed: %s', e)
        return False

def fetch_all_tracks(hours: int = 24) -> pd.DataFrame:
    """Fetch track points for every vessel in Korean waters.

    Extracts individual points from LineStringM geometries and returns
    the last N hours of data inside the Korean maritime zone
    (122-132E, 31-39N). Only buckets up to safe_bucket (now - 12 min)
    are queried, excluding buckets that are still being ingested.
    """
    safe_bucket = compute_safe_bucket()
    window_start = compute_initial_window_start(hours, safe_bucket)
    query = """
        SELECT
            t.mmsi,
            to_timestamp(ST_M((dp).geom)) AS timestamp,
            t.time_bucket,
            ST_Y((dp).geom) AS lat,
            ST_X((dp).geom) AS lon,
            CASE
                WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
                ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
            END AS raw_sog
        FROM signal.t_vessel_tracks_5min t,
             LATERAL ST_DumpPoints(t.track_geom) dp
        WHERE t.time_bucket >= %s
          AND t.time_bucket <= %s
          AND t.track_geom && ST_MakeEnvelope(122, 31, 132, 39, 4326)
        ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
    """
    try:
        with get_conn() as conn:
            df = pd.read_sql_query(query, conn, params=(window_start, safe_bucket))
        logger.info(
            'fetch_all_tracks: %d rows, %d vessels (window=%s..%s, last %dh safe)',
            len(df),
            df['mmsi'].nunique() if len(df) > 0 else 0,
            window_start,
            safe_bucket,
            hours,
        )
        return df
    except Exception as e:
        logger.error('fetch_all_tracks failed: %s', e)
        return pd.DataFrame(columns=['mmsi', 'timestamp', 'time_bucket', 'lat', 'lon', 'raw_sog'])

def fetch_incremental(last_bucket: datetime) -> pd.DataFrame:
    """Fetch new track points after last_bucket.

    Only buckets up to safe_bucket (now - 12 min) are queried, excluding
    buckets that are still being ingested; collection restarts at
    from_bucket (last_bucket - 15 min) to compensate for late INSERTs.
    """
    safe_bucket = compute_safe_bucket()
    from_bucket = compute_incremental_window_start(last_bucket)
    if safe_bucket <= from_bucket:
        logger.info(
            'fetch_incremental skipped: safe_bucket=%s, from_bucket=%s, last_bucket=%s',
            safe_bucket,
            from_bucket,
            last_bucket,
        )
        return pd.DataFrame(columns=['mmsi', 'timestamp', 'time_bucket', 'lat', 'lon', 'raw_sog'])
    query = """
        SELECT
            t.mmsi,
            to_timestamp(ST_M((dp).geom)) AS timestamp,
            t.time_bucket,
            ST_Y((dp).geom) AS lat,
            ST_X((dp).geom) AS lon,
            CASE
                WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
                ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
            END AS raw_sog
        FROM signal.t_vessel_tracks_5min t,
             LATERAL ST_DumpPoints(t.track_geom) dp
        WHERE t.time_bucket > %s
          AND t.time_bucket <= %s
          AND t.track_geom && ST_MakeEnvelope(122, 31, 132, 39, 4326)
        ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
    """
    try:
        with get_conn() as conn:
            df = pd.read_sql_query(query, conn, params=(from_bucket, safe_bucket))
        logger.info(
            'fetch_incremental: %d rows, %d vessels (from %s, safe %s, last %s)',
            len(df),
            df['mmsi'].nunique() if len(df) > 0 else 0,
            from_bucket.isoformat(),
            safe_bucket.isoformat(),
            last_bucket.isoformat(),
        )
        return df
    except Exception as e:
        logger.error('fetch_incremental failed: %s', e)
        return pd.DataFrame(columns=['mmsi', 'timestamp', 'time_bucket', 'lat', 'lon', 'raw_sog'])

def fetch_static_info(mmsi_list: list[str]) -> dict[str, dict]:
    """Fetch static vessel info for the given MMSI list.

    DISTINCT ON (mmsi) returns only the latest record per vessel.
    """
    query = """
        SELECT DISTINCT ON (mmsi) mmsi, name, vessel_type, length, width
        FROM signal.t_vessel_static
        WHERE mmsi = ANY(%s)
        ORDER BY mmsi, time_bucket DESC
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(query, (mmsi_list,))
                rows = cur.fetchall()
        result = {
            row[0]: {
                'name': row[1],
                'vessel_type': row[2],
                'length': row[3],
                'width': row[4],
            }
            for row in rows
        }
        logger.info('fetch_static_info: %d vessels resolved', len(result))
        return result
    except Exception as e:
        logger.error('fetch_static_info failed: %s', e)
        return {}

def fetch_permit_mmsis() -> set[str]:
    """Fetch the MMSI list of licensed Chinese fishing vessels.

    Returns DISTINCT mmsi values from signal.t_chnprmship_positions.
    """
    query = 'SELECT DISTINCT mmsi FROM signal.t_chnprmship_positions'
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(query)
                rows = cur.fetchall()
        result = {row[0] for row in rows}
        logger.info('fetch_permit_mmsis: %d permitted vessels', len(result))
        return result
    except Exception as e:
        logger.error('fetch_permit_mmsis failed: %s', e)
        return set()
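
For context, the call pattern these fetchers are designed for, per the commit message, is roughly the polling loop below. Everything outside the snpdb calls is illustrative: the merge mirrors the described vessel_store.py change (sort + keep='last'), but vessel_store's real API is not shown on this page.

# Hypothetical polling loop showing how the safe window and backfill
# interact with last_bucket; only the snpdb functions are real here.
import time

import pandas as pd

import snpdb

snpdb.init_pool()
store = snpdb.fetch_all_tracks(hours=24)   # initial load, capped at safe_bucket
last_bucket = store['time_bucket'].max()   # assumes the initial load returned rows

while True:
    time.sleep(300)                        # wait one 5-minute bucket
    new = snpdb.fetch_incremental(last_bucket)
    if len(new) > 0:
        # Backfilled rows overlap the previous window, so dedupe and keep
        # the newest version of each point (sort + keep='last', as
        # vessel_store.merge_incremental does per the commit message).
        store = (
            pd.concat([store, new])
            .sort_values('time_bucket')
            .drop_duplicates(subset=['mmsi', 'timestamp'], keep='last')
        )
        # Advance only to what was actually collected; never past safe_bucket.
        last_bucket = max(last_bucket, new['time_bucket'].max())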