kcg-ai-monitoring/prediction/db/snpdb.py
htlee e2fc355b2c feat: S2 prediction 분석 엔진 모노레포 이식
iran prediction 47개 Python 파일을 prediction/ 디렉토리로 복제:
- algorithms/ 14개 분석 알고리즘 (어구추론, 다크베셀, 스푸핑, 환적, 위험도 등)
- pipeline/ 7단계 분류 파이프라인
- cache/vessel_store (24h 슬라이딩 윈도우)
- db/ 어댑터 (snpdb 원본조회, kcgdb 결과저장)
- chat/ AI 채팅 (Ollama, 후순위)
- data/ 정적 데이터 (기선, 특정어업수역 GeoJSON)

config.py를 kcgaidb로 재구성 (DB명, 사용자, 비밀번호)
DB 연결 검증 완료 (kcgaidb 37개 테이블 접근 확인)
Makefile에 dev-prediction / dev-all 타겟 추가
CLAUDE.md에 prediction 섹션 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 12:56:51 +09:00

211 lines
6.5 KiB
Python

import logging
from contextlib import contextmanager
from datetime import datetime
from typing import Optional
import pandas as pd
import psycopg2
from psycopg2 import pool
from config import settings
from time_bucket import compute_incremental_window_start, compute_initial_window_start, compute_safe_bucket
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Shared snpdb connection pool. It is None until init_pool() assigns a
# ThreadedConnectionPool, and is reset to None again by close_pool().
_pool: Optional[pool.ThreadedConnectionPool] = None
def init_pool():
    """Create the module-wide snpdb connection pool.

    Connection parameters are read from ``settings`` (``SNPDB_HOST``,
    ``SNPDB_PORT``, ``SNPDB_NAME``, ``SNPDB_USER``, ``SNPDB_PASSWORD``).
    The pool is created with ``minconn=1`` and ``maxconn=3`` and stored in
    the module-level ``_pool``, replacing whatever was there before.
    """
    global _pool
    connect_kwargs = {
        'host': settings.SNPDB_HOST,
        'port': settings.SNPDB_PORT,
        'dbname': settings.SNPDB_NAME,
        'user': settings.SNPDB_USER,
        'password': settings.SNPDB_PASSWORD,
    }
    _pool = pool.ThreadedConnectionPool(minconn=1, maxconn=3, **connect_kwargs)
    logger.info('snpdb connection pool initialized')
def close_pool():
    """Close all pooled snpdb connections and clear the module-level pool.

    Does nothing when no pool is currently set.
    """
    global _pool
    if not _pool:
        return
    _pool.closeall()
    _pool = None
    logger.info('snpdb connection pool closed')
@contextmanager
def get_conn():
conn = _pool.getconn()
try:
yield conn
finally:
_pool.putconn(conn)
def check_health() -> bool:
    """Report whether snpdb answers a trivial ``SELECT 1`` probe.

    Returns:
        True when the probe query ran without raising, False otherwise.
        Failures are logged at error level and never propagated.
    """
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute('SELECT 1')
    except Exception as e:
        logger.error('snpdb health check failed: %s', e)
        return False
    return True
def fetch_all_tracks(hours: int = 24) -> pd.DataFrame:
    """Fetch per-point vessel tracks inside the Korean-waters bounding box.

    Each row of ``signal.t_vessel_tracks_5min`` carries a track geometry.
    ``ST_DumpPoints`` expands it into individual points, and the M ordinate
    of every point is converted to a timestamp with ``to_timestamp``. Rows are
    kept when ``time_bucket`` lies in ``[window_start, safe_bucket]`` and the
    geometry overlaps the envelope 122-132E, 31-39N (SRID 4326).

    ``raw_sog`` is taken from ``start_position->>'sog'`` for the first dumped
    point of a row; every other point uses ``end_position->>'sog'`` and falls
    back to ``avg_speed`` when that value is NULL.

    Args:
        hours: Look-back size in hours, forwarded to
            ``compute_initial_window_start`` together with the safe bucket.

    Returns:
        DataFrame with columns ``mmsi``, ``timestamp``, ``time_bucket``,
        ``lat``, ``lon``, ``raw_sog``, ordered by mmsi and point timestamp.
        On any failure the error is logged and an empty DataFrame with the
        same columns is returned.
    """
    # Same names and order as the SELECT list, so the empty fallback frame has
    # the same schema as a successful result (including time_bucket).
    columns = ['mmsi', 'timestamp', 'time_bucket', 'lat', 'lon', 'raw_sog']

    safe_bucket = compute_safe_bucket()
    window_start = compute_initial_window_start(hours, safe_bucket)
    query = """
        SELECT
            t.mmsi,
            to_timestamp(ST_M((dp).geom)) as timestamp,
            t.time_bucket,
            ST_Y((dp).geom) as lat,
            ST_X((dp).geom) as lon,
            CASE
                WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
                ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
            END as raw_sog
        FROM signal.t_vessel_tracks_5min t,
            LATERAL ST_DumpPoints(t.track_geom) dp
        WHERE t.time_bucket >= %s
            AND t.time_bucket <= %s
            AND t.track_geom && ST_MakeEnvelope(122, 31, 132, 39, 4326)
        ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
    """
    try:
        with get_conn() as conn:
            # NOTE(review): recent pandas versions warn when given a raw DBAPI
            # connection other than sqlite3 - confirm this is acceptable.
            df = pd.read_sql_query(query, conn, params=(window_start, safe_bucket))
        logger.info(
            'fetch_all_tracks: %d rows, %d vessels (window=%s..%s, last %dh safe)',
            len(df),
            df['mmsi'].nunique() if len(df) > 0 else 0,
            window_start,
            safe_bucket,
            hours,
        )
        return df
    except Exception as e:
        # Best-effort by design: callers get an empty frame instead of an error.
        logger.error('fetch_all_tracks failed: %s', e)
        return pd.DataFrame(columns=columns)
def fetch_incremental(last_bucket: datetime) -> pd.DataFrame:
    """Fetch track points for buckets newer than the last processed one.

    Used for incremental updates. The lower bound ``from_bucket`` is derived
    from ``last_bucket`` by ``compute_incremental_window_start`` and is
    exclusive; the upper bound is the bucket from ``compute_safe_bucket`` and
    is inclusive. When the safe bucket is not later than ``from_bucket`` the
    query is skipped entirely.

    The SELECT is the same per-point expansion used for the initial load:
    points are dumped from ``track_geom``, timestamps come from the M
    ordinate, and rows are limited to the envelope 122-132E, 31-39N.

    Args:
        last_bucket: The most recent bucket the caller has already processed.

    Returns:
        DataFrame with columns ``mmsi``, ``timestamp``, ``time_bucket``,
        ``lat``, ``lon``, ``raw_sog``. An empty DataFrame with the same
        columns is returned when the fetch is skipped or fails (failures are
        logged, not raised).
    """
    # Same names and order as the SELECT list, so skip/error results have the
    # same schema as a successful result (including time_bucket).
    columns = ['mmsi', 'timestamp', 'time_bucket', 'lat', 'lon', 'raw_sog']

    safe_bucket = compute_safe_bucket()
    from_bucket = compute_incremental_window_start(last_bucket)
    if safe_bucket <= from_bucket:
        # Nothing new can be read yet: the window (from_bucket, safe_bucket] is empty.
        logger.info(
            'fetch_incremental skipped: safe_bucket=%s, from_bucket=%s, last_bucket=%s',
            safe_bucket,
            from_bucket,
            last_bucket,
        )
        return pd.DataFrame(columns=columns)

    query = """
        SELECT
            t.mmsi,
            to_timestamp(ST_M((dp).geom)) as timestamp,
            t.time_bucket,
            ST_Y((dp).geom) as lat,
            ST_X((dp).geom) as lon,
            CASE
                WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
                ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
            END as raw_sog
        FROM signal.t_vessel_tracks_5min t,
            LATERAL ST_DumpPoints(t.track_geom) dp
        WHERE t.time_bucket > %s
            AND t.time_bucket <= %s
            AND t.track_geom && ST_MakeEnvelope(122, 31, 132, 39, 4326)
        ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
    """
    try:
        with get_conn() as conn:
            # NOTE(review): recent pandas versions warn when given a raw DBAPI
            # connection other than sqlite3 - confirm this is acceptable.
            df = pd.read_sql_query(query, conn, params=(from_bucket, safe_bucket))
        logger.info(
            'fetch_incremental: %d rows, %d vessels (from %s, safe %s, last %s)',
            len(df),
            df['mmsi'].nunique() if len(df) > 0 else 0,
            from_bucket.isoformat(),
            safe_bucket.isoformat(),
            last_bucket.isoformat(),
        )
        return df
    except Exception as e:
        # Best-effort by design: callers get an empty frame instead of an error.
        logger.error('fetch_incremental failed: %s', e)
        return pd.DataFrame(columns=columns)
def fetch_static_info(mmsi_list: list[str]) -> dict[str, dict]:
    """Look up static vessel attributes for the given MMSIs.

    Reads ``signal.t_vessel_static`` and keeps one row per MMSI: the one with
    the greatest ``time_bucket`` (``DISTINCT ON (mmsi)`` combined with
    ``ORDER BY mmsi, time_bucket DESC``).

    Args:
        mmsi_list: MMSIs to resolve. Any iterable is accepted (list, set,
            tuple, ...); it is copied into a list before being bound to the
            query. ``None`` or an empty iterable yields an empty result
            without touching the database.

    Returns:
        Mapping of mmsi to a dict with keys ``name``, ``vessel_type``,
        ``length`` and ``width``. MMSIs with no static row are absent. On any
        failure the error is logged and an empty dict is returned.
    """
    query = """
        SELECT DISTINCT ON (mmsi) mmsi, name, vessel_type, length, width
        FROM signal.t_vessel_static
        WHERE mmsi = ANY(%s)
        ORDER BY mmsi, time_bucket DESC
    """
    try:
        # psycopg2 binds a Python list as a SQL array, which ANY(%s) needs;
        # a tuple is bound as a row value and a set cannot be bound at all.
        mmsis = list(mmsi_list) if mmsi_list is not None else []
        if not mmsis:
            # Nothing to resolve: skip the pooled connection and round trip.
            return {}
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(query, (mmsis,))
                rows = cur.fetchall()
        result = {
            mmsi: {
                'name': name,
                'vessel_type': vessel_type,
                'length': length,
                'width': width,
            }
            for mmsi, name, vessel_type, length, width in rows
        }
        logger.info('fetch_static_info: %d vessels resolved', len(result))
        return result
    except Exception as e:
        # Best-effort by design: callers get an empty mapping instead of an error.
        logger.error('fetch_static_info failed: %s', e)
        return {}
def fetch_permit_mmsis() -> set[str]:
    """Fetch the MMSIs of Chinese permitted fishing vessels.

    Selects the distinct ``mmsi`` values from
    ``signal.t_chnprmship_positions``.

    Returns:
        Set of MMSIs found in the table. On any failure the error is logged
        and an empty set is returned.
    """
    sql = '\nSELECT DISTINCT mmsi FROM signal.t_chnprmship_positions\n'
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(sql)
            records = cur.fetchall()
        permitted = set()
        for record in records:
            permitted.add(record[0])
        logger.info('fetch_permit_mmsis: %d permitted vessels', len(permitted))
        return permitted
    except Exception as e:
        logger.error('fetch_permit_mmsis failed: %s', e)
        return set()