import logging
from collections.abc import Iterable, Iterator
from contextlib import contextmanager
from datetime import datetime
from typing import Optional

import pandas as pd
import psycopg2
from psycopg2 import pool

from config import settings
from time_bucket import compute_incremental_window_start, compute_initial_window_start, compute_safe_bucket

logger = logging.getLogger(__name__)

# Module-wide connection pool; created by init_pool() and torn down by close_pool().
_pool: Optional[pool.ThreadedConnectionPool] = None

# Columns returned by the track queries below. Also used to build an empty
# frame with the same schema when a fetch is skipped or fails, so callers see
# a consistent set of columns (including time_bucket) on every path.
_TRACK_COLUMNS = ['mmsi', 'timestamp', 'time_bucket', 'lat', 'lon', 'raw_sog']

# Per-point track query shared by the full and incremental fetches.
# ``{lower_op}`` decides whether the lower time_bucket bound is inclusive
# (initial load) or exclusive (incremental update); everything else is identical.
# Each row of signal.t_vessel_tracks_5min is exploded into its vertices with
# ST_DumpPoints; the M coordinate of each vertex is used as an epoch timestamp.
# The first vertex of a segment takes its SOG from start_position, every other
# vertex from end_position (falling back to avg_speed).
# NOTE: ``&&`` is a bounding-box overlap test, so a track whose bbox touches the
# 122-132E / 31-39N envelope is returned with ALL of its points, some of which
# may lie outside the envelope.
_TRACK_QUERY_TEMPLATE = """
    SELECT
        t.mmsi,
        to_timestamp(ST_M((dp).geom)) as timestamp,
        t.time_bucket,
        ST_Y((dp).geom) as lat,
        ST_X((dp).geom) as lon,
        CASE
            WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
            ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
        END as raw_sog
    FROM signal.t_vessel_tracks_5min t,
    LATERAL ST_DumpPoints(t.track_geom) dp
    WHERE t.time_bucket {lower_op} %s
      AND t.time_bucket <= %s
      AND t.track_geom && ST_MakeEnvelope(122, 31, 132, 39, 4326)
    ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
"""
_ALL_TRACKS_QUERY = _TRACK_QUERY_TEMPLATE.format(lower_op='>=')
_INCREMENTAL_TRACKS_QUERY = _TRACK_QUERY_TEMPLATE.format(lower_op='>')


def init_pool():
    """Create the module-wide threaded connection pool from ``settings``.

    The pool holds between 1 and 3 connections. Calling this again replaces
    the module reference without closing the previous pool.
    """
    global _pool
    _pool = pool.ThreadedConnectionPool(
        minconn=1,
        maxconn=3,
        host=settings.SNPDB_HOST,
        port=settings.SNPDB_PORT,
        dbname=settings.SNPDB_NAME,
        user=settings.SNPDB_USER,
        password=settings.SNPDB_PASSWORD,
    )
    logger.info('snpdb connection pool initialized')


def close_pool():
    """Close every connection held by the pool and forget it (no-op if absent)."""
    global _pool
    if _pool:
        _pool.closeall()
        _pool = None
        logger.info('snpdb connection pool closed')


def _empty_tracks_frame() -> pd.DataFrame:
    """Return an empty track frame with the same columns as a successful fetch."""
    return pd.DataFrame(columns=_TRACK_COLUMNS)


@contextmanager
def get_conn() -> Iterator['psycopg2.extensions.connection']:
    """Borrow a connection from the pool for the duration of a ``with`` block.

    Raises:
        RuntimeError: if init_pool() has not been called (or close_pool() ran).
        psycopg2.pool.PoolError: propagated from getconn(), e.g. when all
            pooled connections are already in use.
    """
    # Capture the pool once: close_pool() may rebind the global to None while a
    # query is running, and the connection must go back to the pool it came from.
    active_pool = _pool
    if active_pool is None:
        raise RuntimeError('snpdb connection pool is not initialized; call init_pool() first')
    conn = active_pool.getconn()
    try:
        yield conn
    finally:
        # After closeall() the pool refuses putconn(); its connections were
        # already closed by closeall(), so there is nothing left to return.
        if not active_pool.closed:
            active_pool.putconn(conn)


def check_health() -> bool:
    """Return True if ``SELECT 1`` succeeds on a pooled connection.

    Any failure (including an uninitialized pool) is logged and reported as False.
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute('SELECT 1')
        return True
    except Exception as e:
        logger.error('snpdb health check failed: %s', e)
        return False


def fetch_all_tracks(hours: int = 24) -> pd.DataFrame:
    """Fetch track points of all vessels in Korean waters for the last ``hours``.

    Individual points are extracted from the LineStringM track geometries of
    tracks whose bounding box overlaps 122-132E / 31-39N. The time window runs
    from ``compute_initial_window_start(hours, safe_bucket)`` up to and
    including ``compute_safe_bucket()``.

    Args:
        hours: Length of the look-back window passed to the window helper.

    Returns:
        DataFrame with columns mmsi, timestamp, time_bucket, lat, lon, raw_sog,
        ordered by mmsi then timestamp. On a query error the error is logged
        and an empty frame with the same columns is returned.
    """
    safe_bucket = compute_safe_bucket()
    window_start = compute_initial_window_start(hours, safe_bucket)
    try:
        with get_conn() as conn:
            # NOTE: pandas officially supports SQLAlchemy/sqlite3 connectables;
            # a raw psycopg2 connection works but may emit a UserWarning.
            df = pd.read_sql_query(_ALL_TRACKS_QUERY, conn, params=(window_start, safe_bucket))
    except Exception as e:
        logger.exception('fetch_all_tracks failed: %s', e)
        return _empty_tracks_frame()

    logger.info(
        'fetch_all_tracks: %d rows, %d vessels (window=%s..%s, last %dh safe)',
        len(df),
        df['mmsi'].nunique() if not df.empty else 0,
        window_start,
        safe_bucket,
        hours,
    )
    return df


def fetch_incremental(last_bucket: datetime) -> pd.DataFrame:
    """Fetch track points newer than the buckets already processed.

    Used by the scheduler for incremental updates. The exclusive lower bound
    is ``compute_incremental_window_start(last_bucket)`` (its exact relation to
    ``last_bucket`` is defined in that helper); the inclusive upper bound is
    ``compute_safe_bucket()``. If that window is empty, no query is issued.

    Args:
        last_bucket: Most recent time bucket the caller has already processed.

    Returns:
        DataFrame with columns mmsi, timestamp, time_bucket, lat, lon, raw_sog,
        ordered by mmsi then timestamp; empty (same columns) when the window
        is empty or the query fails (the failure is logged).
    """
    safe_bucket = compute_safe_bucket()
    from_bucket = compute_incremental_window_start(last_bucket)
    if safe_bucket <= from_bucket:
        logger.info(
            'fetch_incremental skipped: safe_bucket=%s, from_bucket=%s, last_bucket=%s',
            safe_bucket,
            from_bucket,
            last_bucket,
        )
        return _empty_tracks_frame()

    try:
        with get_conn() as conn:
            df = pd.read_sql_query(_INCREMENTAL_TRACKS_QUERY, conn, params=(from_bucket, safe_bucket))
    except Exception as e:
        logger.exception('fetch_incremental failed: %s', e)
        return _empty_tracks_frame()

    logger.info(
        'fetch_incremental: %d rows, %d vessels (from %s, safe %s, last %s)',
        len(df),
        df['mmsi'].nunique() if not df.empty else 0,
        from_bucket.isoformat(),
        safe_bucket.isoformat(),
        last_bucket.isoformat(),
    )
    return df


def fetch_static_info(mmsi_list: Iterable[str]) -> dict[str, dict]:
    """Fetch static vessel information for the given MMSIs.

    Only the most recent record per MMSI is returned (DISTINCT ON (mmsi)
    ordered by time_bucket DESC).

    Args:
        mmsi_list: MMSIs to resolve. Any iterable is accepted (e.g. the set
            returned by fetch_permit_mmsis); it is materialized as a list
            because psycopg2 adapts lists, not sets, to SQL arrays.

    Returns:
        Mapping mmsi -> {'name', 'vessel_type', 'length', 'width'}. MMSIs with
        no static record are absent. Returns {} for empty input or on a query
        error (the error is logged).
    """
    mmsis = list(mmsi_list or [])
    if not mmsis:
        # Nothing to resolve; skip the database round trip.
        return {}

    query = """
        SELECT DISTINCT ON (mmsi)
            mmsi, name, vessel_type, length, width
        FROM signal.t_vessel_static
        WHERE mmsi = ANY(%s)
        ORDER BY mmsi, time_bucket DESC
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(query, (mmsis,))
                rows = cur.fetchall()
    except Exception as e:
        logger.exception('fetch_static_info failed: %s', e)
        return {}

    result = {
        mmsi: {
            'name': name,
            'vessel_type': vessel_type,
            'length': length,
            'width': width,
        }
        for mmsi, name, vessel_type, length, width in rows
    }
    logger.info('fetch_static_info: %d vessels resolved', len(result))
    return result


def fetch_permit_mmsis() -> set[str]:
    """Fetch the MMSIs of permitted Chinese fishing vessels.

    Returns the DISTINCT mmsi values of signal.t_chnprmship_positions, or an
    empty set on a query error (the error is logged).
    """
    query = """
        SELECT DISTINCT mmsi
        FROM signal.t_chnprmship_positions
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(query)
                rows = cur.fetchall()
    except Exception as e:
        logger.exception('fetch_permit_mmsis failed: %s', e)
        return set()

    result = {row[0] for row in rows}
    logger.info('fetch_permit_mmsis: %d permitted vessels', len(result))
    return result