kcg-monitoring/prediction/db/snpdb.py
htlee a68dfb21b2 feat: Python 어선 분류기 + 배포 설정 + 백엔드 모니터링 프록시
- prediction/: FastAPI 7단계 분류 파이프라인 + 6개 탐지 알고리즘
  - snpdb 궤적 조회 → 인메모리 캐시(13K척) → 분류 → kcgdb 저장
  - APScheduler 5분 주기, Python 3.9 호환
  - 버그 수정: @property last_bucket, SQL INTERVAL 바인딩, rollback, None 가드
  - 보안: DB 비밀번호 하드코딩 제거 → env 환경변수 필수
- deploy/kcg-prediction.service: systemd 서비스 (redis-211, 포트 8001)
- deploy.yml: prediction CI/CD 배포 단계 추가 (192.168.1.18:32023)
- backend: PredictionProxyController (health/status/trigger 프록시)
- backend: AppProperties predictionBaseUrl + AuthFilter 인증 예외

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 12:10:21 +09:00

188 lines
5.6 KiB
Python

import logging
from contextlib import contextmanager
from datetime import datetime
from typing import Optional
import pandas as pd
import psycopg2
from psycopg2 import pool
from config import settings
logger = logging.getLogger(__name__)
_pool: Optional[pool.ThreadedConnectionPool] = None
def init_pool():
    """Create the module-level snpdb connection pool.

    Idempotent: if a pool already exists it is left untouched, so a
    double call cannot leak the connections held by the first pool.
    Credentials come from `config.settings` (env-backed).
    """
    global _pool
    if _pool is not None:
        # Re-initializing would orphan the existing pool's open
        # connections; keep the first pool and just warn.
        logger.warning('snpdb connection pool already initialized; skipping')
        return
    _pool = pool.ThreadedConnectionPool(
        minconn=1,
        maxconn=3,
        host=settings.SNPDB_HOST,
        port=settings.SNPDB_PORT,
        dbname=settings.SNPDB_NAME,
        user=settings.SNPDB_USER,
        password=settings.SNPDB_PASSWORD,
    )
    logger.info('snpdb connection pool initialized')
def close_pool():
    """Tear down the snpdb connection pool, if one was created."""
    global _pool
    if _pool is None:
        return
    _pool.closeall()
    _pool = None
    logger.info('snpdb connection pool closed')
@contextmanager
def get_conn():
    """Borrow a connection from the pool and always return it.

    Yields:
        A psycopg2 connection checked out from the module pool.

    On any exception raised by the caller's block, the connection is
    rolled back before being returned to the pool; otherwise an aborted
    transaction would poison the pooled connection for the next
    borrower ("current transaction is aborted" on every statement).
    """
    conn = _pool.getconn()
    try:
        yield conn
    except Exception:
        # Best-effort reset: never let rollback failure mask the
        # original error.
        try:
            conn.rollback()
        except Exception:
            pass
        raise
    finally:
        _pool.putconn(conn)
def check_health() -> bool:
    """Return True when a trivial round-trip query against snpdb succeeds."""
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute('SELECT 1')
        return True
    except Exception as exc:
        logger.error('snpdb health check failed: %s', exc)
        return False
def fetch_all_tracks(hours: int = 24) -> pd.DataFrame:
    """Fetch trajectory points for all vessels in Korean waters.

    Extracts individual points from the LineStringM track geometries
    (timestamp recovered from the M coordinate) and returns rows inside
    the Korean-waters envelope (124-132E, 32-39N) for the last *hours*
    hours.

    Args:
        hours: Lookback window in hours (default 24).

    Returns:
        DataFrame with columns [mmsi, timestamp, lat, lon, raw_sog];
        an empty frame with the same columns on query failure.
    """
    # FIX: `hours` was previously f-string-interpolated into
    # `INTERVAL '{hours} hours'`. Bind it as a real parameter via
    # make_interval() instead — consistent with fetch_incremental and
    # immune to SQL injection if the value ever comes from a request.
    query = """
        SELECT
            t.mmsi,
            to_timestamp(ST_M((dp).geom)) as timestamp,
            ST_Y((dp).geom) as lat,
            ST_X((dp).geom) as lon,
            CASE
                WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
                ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
            END as raw_sog
        FROM signal.t_vessel_tracks_5min t,
             LATERAL ST_DumpPoints(t.track_geom) dp
        WHERE t.time_bucket >= NOW() - make_interval(hours => %s)
          AND t.track_geom && ST_MakeEnvelope(124, 32, 132, 39, 4326)
        ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
    """
    try:
        with get_conn() as conn:
            df = pd.read_sql_query(query, conn, params=(hours,))
        logger.info(
            'fetch_all_tracks: %d rows, %d vessels (last %dh)',
            len(df),
            df['mmsi'].nunique() if len(df) > 0 else 0,
            hours,
        )
        return df
    except Exception as e:
        logger.error('fetch_all_tracks failed: %s', e)
        return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'raw_sog'])
def fetch_incremental(last_bucket: datetime) -> pd.DataFrame:
    """Fetch trajectory points newer than *last_bucket*.

    Used by the scheduler's incremental refresh: the strict
    `time_bucket > last_bucket` predicate skips buckets that were
    already processed.

    Args:
        last_bucket: Exclusive lower bound on `time_bucket`.

    Returns:
        DataFrame with columns [mmsi, timestamp, lat, lon, raw_sog];
        an empty frame with the same columns on query failure.
    """
    query = """
        SELECT
            t.mmsi,
            to_timestamp(ST_M((dp).geom)) as timestamp,
            ST_Y((dp).geom) as lat,
            ST_X((dp).geom) as lon,
            CASE
                WHEN (dp).path[1] = 1 THEN (t.start_position->>'sog')::float
                ELSE COALESCE((t.end_position->>'sog')::float, t.avg_speed::float)
            END as raw_sog
        FROM signal.t_vessel_tracks_5min t,
             LATERAL ST_DumpPoints(t.track_geom) dp
        WHERE t.time_bucket > %s
          AND t.track_geom && ST_MakeEnvelope(124, 32, 132, 39, 4326)
        ORDER BY t.mmsi, to_timestamp(ST_M((dp).geom))
    """
    try:
        with get_conn() as conn:
            frame = pd.read_sql_query(query, conn, params=(last_bucket,))
        vessel_count = frame['mmsi'].nunique() if len(frame) > 0 else 0
        logger.info(
            'fetch_incremental: %d rows, %d vessels (since %s)',
            len(frame),
            vessel_count,
            last_bucket.isoformat(),
        )
        return frame
    except Exception as e:
        logger.error('fetch_incremental failed: %s', e)
        return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'raw_sog'])
def fetch_static_info(mmsi_list: list[str]) -> dict[str, dict]:
    """Look up static vessel records for the given MMSI list.

    DISTINCT ON (mmsi) ordered by `time_bucket DESC` keeps only the most
    recent record per vessel.

    Args:
        mmsi_list: MMSIs to resolve (bound via `= ANY(%s)`).

    Returns:
        Mapping of mmsi -> {name, vessel_type, length, width};
        empty dict on query failure.
    """
    query = """
        SELECT DISTINCT ON (mmsi) mmsi, name, vessel_type, length, width
        FROM signal.t_vessel_static
        WHERE mmsi = ANY(%s)
        ORDER BY mmsi, time_bucket DESC
    """
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(query, (mmsi_list,))
            rows = cur.fetchall()
        info: dict[str, dict] = {}
        for mmsi, name, vessel_type, length, width in rows:
            info[mmsi] = {
                'name': name,
                'vessel_type': vessel_type,
                'length': length,
                'width': width,
            }
        logger.info('fetch_static_info: %d vessels resolved', len(info))
        return info
    except Exception as e:
        logger.error('fetch_static_info failed: %s', e)
        return {}
def fetch_permit_mmsis() -> set[str]:
    """Fetch the set of permitted Chinese fishing-vessel MMSIs.

    Reads DISTINCT mmsi values from signal.t_chnprmship_positions.

    Returns:
        Set of MMSI strings; empty set on query failure.
    """
    query = """
        SELECT DISTINCT mmsi FROM signal.t_chnprmship_positions
    """
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(query)
            permitted = {mmsi for (mmsi,) in cur.fetchall()}
        logger.info('fetch_permit_mmsis: %d permitted vessels', len(permitted))
        return permitted
    except Exception as e:
        logger.error('fetch_permit_mmsis failed: %s', e)
        return set()