kcg-monitoring/prediction/db/snpdb.py
htlee 23828c742e refactor: codex 이식 완료 — 환경변수 동적화 + @Table schema 제거 + import 정리
- Backend: @Table(schema="kcg") 하드코딩 제거 → application.yml default_schema 활용
- Backend: application.yml/prod.yml 환경변수 ${} 패턴 전환
- Backend: WebConfig CORS 5174 포트 추가
- Frontend: tsconfig resolveJsonModule 추가
- Prediction: scheduler/snpdb/vessel_store import 위치 + 주석 codex 동기화

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 01:17:57 +09:00

211 lines
6.5 KiB
Python

import logging
from contextlib import contextmanager
from datetime import datetime
from typing import Optional
import pandas as pd
import psycopg2
from psycopg2 import pool
from config import settings
from time_bucket import compute_incremental_window_start, compute_initial_window_start, compute_safe_bucket
logger = logging.getLogger(__name__)
_pool: Optional[pool.ThreadedConnectionPool] = None
def init_pool():
    """Initialize the module-level snpdb connection pool.

    Builds a ThreadedConnectionPool (1-3 connections) from the SNPDB_*
    settings. If a pool already exists, it is closed first so repeated
    initialization does not leak the previous pool's connections.
    """
    global _pool
    if _pool is not None:
        # Re-initialization: release the old pool's connections before
        # dropping the reference, otherwise they leak until GC.
        _pool.closeall()
    _pool = pool.ThreadedConnectionPool(
        minconn=1,
        maxconn=3,
        host=settings.SNPDB_HOST,
        port=settings.SNPDB_PORT,
        dbname=settings.SNPDB_NAME,
        user=settings.SNPDB_USER,
        password=settings.SNPDB_PASSWORD,
    )
    logger.info('snpdb connection pool initialized')
def close_pool():
    """Close every connection in the snpdb pool and reset it to None.

    No-op when the pool was never initialized (or already closed).
    """
    global _pool
    if not _pool:
        return
    _pool.closeall()
    _pool = None
    logger.info('snpdb connection pool closed')
@contextmanager
def get_conn():
    """Context manager that checks a connection out of the snpdb pool.

    The connection is always returned to the pool on exit, even when the
    body raises. Assumes init_pool() has been called first.
    """
    connection = _pool.getconn()
    try:
        yield connection
    finally:
        _pool.putconn(connection)
def check_health() -> bool:
    """Probe snpdb with a trivial query.

    Returns:
        True when ``SELECT 1`` executes successfully, False otherwise
        (the failure is logged, never raised).
    """
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute('SELECT 1')
            return True
    except Exception as e:
        logger.error('snpdb health check failed: %s', e)
        return False
def fetch_all_tracks(hours: int = 24) -> pd.DataFrame:
    """Fetch track points for all vessels in Korean waters.

    Extracts individual points from the LineStringM geometry and returns
    the last *hours* of data inside the Korean-waters bounding box
    (122-132E, 31-39N), up to the latest "safe" (fully ingested) bucket.

    Args:
        hours: Size of the initial lookback window in hours.

    Returns:
        DataFrame with columns mmsi, timestamp, time_bucket, lat, lon,
        raw_sog. On query failure an empty DataFrame with the same
        column schema is returned (never raises).
    """
    safe_bucket = compute_safe_bucket()
    window_start = compute_initial_window_start(hours, safe_bucket)
    query = """
    SELECT
        t.mmsi,
        to_timestamp(ST_M((dp).geom)) as timestamp,
        t.time_bucket,
        ST_Y((dp).geom) as lat,
        ST_X((dp).geom) as lon,
        CASE
            WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
            ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
        END as raw_sog
    FROM signal.t_vessel_tracks_5min t,
    LATERAL ST_DumpPoints(t.track_geom) dp
    WHERE t.time_bucket >= %s
        AND t.time_bucket <= %s
        AND t.track_geom && ST_MakeEnvelope(122, 31, 132, 39, 4326)
    ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
    """
    try:
        with get_conn() as conn:
            df = pd.read_sql_query(query, conn, params=(window_start, safe_bucket))
        logger.info(
            'fetch_all_tracks: %d rows, %d vessels (window=%s..%s, last %dh safe)',
            len(df),
            df['mmsi'].nunique() if len(df) > 0 else 0,
            window_start,
            safe_bucket,
            hours,
        )
        return df
    except Exception as e:
        logger.error('fetch_all_tracks failed: %s', e)
        # Keep the fallback schema identical to the success path
        # (including time_bucket) so downstream column access is safe.
        return pd.DataFrame(
            columns=['mmsi', 'timestamp', 'time_bucket', 'lat', 'lon', 'raw_sog']
        )
def fetch_incremental(last_bucket: datetime) -> pd.DataFrame:
    """Fetch track points added since *last_bucket*.

    Used by the scheduler for incremental updates; the strict
    ``time_bucket > %s`` lower bound skips buckets already processed,
    while the upper bound stops at the latest safe (fully ingested)
    bucket. When no new safe bucket exists the query is skipped.

    Args:
        last_bucket: The most recent time_bucket already processed.

    Returns:
        DataFrame with columns mmsi, timestamp, time_bucket, lat, lon,
        raw_sog. On skip or query failure an empty DataFrame with the
        same column schema is returned (never raises).
    """
    # Shared empty schema for the skip and failure paths, kept identical
    # to the success path (including time_bucket) so callers can always
    # access the same columns.
    empty = pd.DataFrame(
        columns=['mmsi', 'timestamp', 'time_bucket', 'lat', 'lon', 'raw_sog']
    )
    safe_bucket = compute_safe_bucket()
    from_bucket = compute_incremental_window_start(last_bucket)
    if safe_bucket <= from_bucket:
        logger.info(
            'fetch_incremental skipped: safe_bucket=%s, from_bucket=%s, last_bucket=%s',
            safe_bucket,
            from_bucket,
            last_bucket,
        )
        return empty
    query = """
    SELECT
        t.mmsi,
        to_timestamp(ST_M((dp).geom)) as timestamp,
        t.time_bucket,
        ST_Y((dp).geom) as lat,
        ST_X((dp).geom) as lon,
        CASE
            WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
            ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
        END as raw_sog
    FROM signal.t_vessel_tracks_5min t,
    LATERAL ST_DumpPoints(t.track_geom) dp
    WHERE t.time_bucket > %s
        AND t.time_bucket <= %s
        AND t.track_geom && ST_MakeEnvelope(122, 31, 132, 39, 4326)
    ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
    """
    try:
        with get_conn() as conn:
            df = pd.read_sql_query(query, conn, params=(from_bucket, safe_bucket))
        logger.info(
            'fetch_incremental: %d rows, %d vessels (from %s, safe %s, last %s)',
            len(df),
            df['mmsi'].nunique() if len(df) > 0 else 0,
            from_bucket.isoformat(),
            safe_bucket.isoformat(),
            last_bucket.isoformat(),
        )
        return df
    except Exception as e:
        logger.error('fetch_incremental failed: %s', e)
        return empty
def fetch_static_info(mmsi_list: list[str]) -> dict[str, dict]:
    """Look up vessel static information for the given MMSIs.

    ``DISTINCT ON (mmsi)`` ordered by ``time_bucket DESC`` keeps only
    the most recent record per vessel.

    Args:
        mmsi_list: MMSIs to resolve.

    Returns:
        Mapping of mmsi -> {name, vessel_type, length, width}; empty
        dict on failure (never raises).
    """
    query = """
    SELECT DISTINCT ON (mmsi) mmsi, name, vessel_type, length, width
    FROM signal.t_vessel_static
    WHERE mmsi = ANY(%s)
    ORDER BY mmsi, time_bucket DESC
    """
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(query, (mmsi_list,))
            records = cur.fetchall()
        info: dict[str, dict] = {}
        for mmsi, name, vessel_type, length, width in records:
            info[mmsi] = {
                'name': name,
                'vessel_type': vessel_type,
                'length': length,
                'width': width,
            }
        logger.info('fetch_static_info: %d vessels resolved', len(info))
        return info
    except Exception as e:
        logger.error('fetch_static_info failed: %s', e)
        return {}
def fetch_permit_mmsis() -> set[str]:
    """Fetch the MMSIs of permitted Chinese fishing vessels.

    Returns:
        Set of distinct MMSIs from signal.t_chnprmship_positions;
        empty set on failure (never raises).
    """
    query = """
    SELECT DISTINCT mmsi FROM signal.t_chnprmship_positions
    """
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(query)
            permitted = {mmsi for (mmsi,) in cur.fetchall()}
        logger.info('fetch_permit_mmsis: %d permitted vessels', len(permitted))
        return permitted
    except Exception as e:
        logger.error('fetch_permit_mmsis failed: %s', e)
        return set()