kcg-ai-monitoring/prediction/db/kcgdb.py
htlee 14eb4c7ea3 feat(prediction): vessel_analysis_results 에 분석 시점 lat/lon 저장
AnalysisResult 에 lat/lon 필드 + to_db_tuple 반영 + upsert_results SQL
컬럼 추가. 분류 파이프라인(last_row) / 경량 분석(all_positions) 두 경로
모두 분석 시점의 선박 위치를 함께 기록해 프론트 미니맵에서 특이운항
판별 위치를 실제 항적 위에 표시할 수 있게 한다.

배포 후 첫 사이클 8173/8173 lat/lon non-null 확인.
2026-04-16 14:30:49 +09:00

336 lines
12 KiB
Python

import json
import logging
from contextlib import contextmanager
from typing import TYPE_CHECKING, Optional
import psycopg2
from psycopg2 import pool
from psycopg2.extras import execute_values
from config import qualified_table, settings
if TYPE_CHECKING:
from models.result import AnalysisResult
# Module-level logger used for pool lifecycle events and query failures.
logger = logging.getLogger(__name__)
# Shared connection pool: assigned by init_pool() and reset to None by close_pool().
_pool: Optional[pool.ThreadedConnectionPool] = None
# Table identifier interpolated into the group-polygon queries in this module;
# presumably schema-qualified by config.qualified_table (helper not visible here — confirm).
GROUP_POLYGON_SNAPSHOTS = qualified_table('group_polygon_snapshots')
def init_pool():
    """Create the module-wide kcgdb connection pool.

    Builds a ``ThreadedConnectionPool`` holding between 1 and 5 connections
    from the ``KCGDB_*`` settings and stores it in the module-level ``_pool``.
    """
    global _pool
    connect_kwargs = {
        'host': settings.KCGDB_HOST,
        'port': settings.KCGDB_PORT,
        'dbname': settings.KCGDB_NAME,
        'user': settings.KCGDB_USER,
        'password': settings.KCGDB_PASSWORD,
        # Resolve unqualified table names in the configured schema first, then public.
        'options': f'-c search_path={settings.KCGDB_SCHEMA},public',
    }
    _pool = pool.ThreadedConnectionPool(1, 5, **connect_kwargs)
    logger.info('kcgdb connection pool initialized')
def close_pool():
    """Close every pooled connection and drop the pool; does nothing if no pool is set."""
    global _pool
    if not _pool:
        return
    _pool.closeall()
    _pool = None
    logger.info('kcgdb connection pool closed')
@contextmanager
def get_conn():
    """Yield a pooled connection and always hand it back to the pool.

    Committing is the caller's job. If the ``with`` body raises, the
    transaction is rolled back and the original exception is re-raised.

    Raises:
        RuntimeError: if the pool has not been created by ``init_pool()``
            (or has already been closed by ``close_pool()``).
    """
    # Snapshot the pool so the connection is returned to the pool it came
    # from even if the module-level reference is reset while it is in use.
    active_pool = _pool
    if active_pool is None:
        raise RuntimeError('kcgdb connection pool is not initialized; call init_pool() first')
    conn = active_pool.getconn()
    try:
        yield conn
    except Exception:
        try:
            conn.rollback()
        except Exception as rollback_err:
            # A broken connection can fail to roll back; do not let that
            # hide the error that actually caused the failure.
            logger.warning('kcgdb rollback failed: %s', rollback_err)
        raise
    finally:
        active_pool.putconn(conn)
def check_health() -> bool:
    """Return True when a trivial ``SELECT 1`` round-trip succeeds, False otherwise.

    Any exception is logged and reported as an unhealthy result instead of
    being raised.
    """
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute('SELECT 1')
            healthy = True
        return healthy
    except Exception as exc:
        logger.error('kcgdb health check failed: %s', exc)
        return False
def upsert_results(results: list['AnalysisResult']) -> int:
    """Upsert analysis results into the vessel_analysis_results table.

    Rows are keyed on (mmsi, analyzed_at); on conflict every other listed
    column is overwritten with the incoming value.

    Args:
        results: objects exposing ``to_db_tuple()``. Each tuple is bound
            positionally against the column list of the INSERT below
            (the tuple layout is defined outside this file — keep both in sync).

    Returns:
        Number of rows submitted, or 0 when ``results`` is empty or when
        anything fails (the failure is logged, not raised).
    """
    if not results:
        return 0
    upsert_sql = """
INSERT INTO vessel_analysis_results (
mmsi, analyzed_at, vessel_type, confidence, fishing_pct,
cluster_id, season, lat, lon, zone_code, dist_to_baseline_nm, activity_state,
ucaf_score, ucft_score, is_dark, gap_duration_min,
spoofing_score, bd09_offset_m, speed_jump_count,
fleet_cluster_id, fleet_is_leader, fleet_role,
risk_score, risk_level,
transship_suspect, transship_pair_mmsi, transship_duration_min,
features,
gear_judgment,
gear_code
) VALUES %s
ON CONFLICT (mmsi, analyzed_at) DO UPDATE SET
vessel_type = EXCLUDED.vessel_type,
confidence = EXCLUDED.confidence,
fishing_pct = EXCLUDED.fishing_pct,
cluster_id = EXCLUDED.cluster_id,
season = EXCLUDED.season,
lat = EXCLUDED.lat,
lon = EXCLUDED.lon,
zone_code = EXCLUDED.zone_code,
dist_to_baseline_nm = EXCLUDED.dist_to_baseline_nm,
activity_state = EXCLUDED.activity_state,
ucaf_score = EXCLUDED.ucaf_score,
ucft_score = EXCLUDED.ucft_score,
is_dark = EXCLUDED.is_dark,
gap_duration_min = EXCLUDED.gap_duration_min,
spoofing_score = EXCLUDED.spoofing_score,
bd09_offset_m = EXCLUDED.bd09_offset_m,
speed_jump_count = EXCLUDED.speed_jump_count,
fleet_cluster_id = EXCLUDED.fleet_cluster_id,
fleet_is_leader = EXCLUDED.fleet_is_leader,
fleet_role = EXCLUDED.fleet_role,
risk_score = EXCLUDED.risk_score,
risk_level = EXCLUDED.risk_level,
transship_suspect = EXCLUDED.transship_suspect,
transship_pair_mmsi = EXCLUDED.transship_pair_mmsi,
transship_duration_min = EXCLUDED.transship_duration_min,
features = EXCLUDED.features,
gear_judgment = EXCLUDED.gear_judgment,
gear_code = EXCLUDED.gear_code
"""
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                rows = [item.to_db_tuple() for item in results]
                # execute_values expands the single %s into batched VALUES lists.
                execute_values(cur, upsert_sql, rows, page_size=100)
            conn.commit()
        submitted = len(rows)
        logger.info('upserted %d analysis results', submitted)
        return submitted
    except Exception as exc:
        logger.error('failed to upsert results: %s', exc)
        return 0
def cleanup_old(hours: int = 48) -> int:
    """Delete analysis results whose ``analyzed_at`` is older than ``hours`` hours.

    Returns:
        Number of deleted rows, or 0 on failure (logged, not raised).
    """
    delete_sql = "DELETE FROM vessel_analysis_results WHERE analyzed_at < NOW() - (%s * INTERVAL '1 hour')"
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(delete_sql, (hours,))
                removed = cur.rowcount
            conn.commit()
        # Stay quiet when there was nothing to purge.
        if removed > 0:
            logger.info('cleaned up %d old results (older than %dh)', removed, hours)
        return removed
    except Exception as exc:
        logger.error('failed to cleanup old results: %s', exc)
        return 0
def save_group_snapshots(snapshots: list[dict]) -> int:
    """Batch-insert polygon snapshots into the group polygon snapshot table.

    Args:
        snapshots: output of ``polygon_builder.build_all_group_snapshots()``.
            Required keys: ``group_type``, ``group_key``, ``group_label``,
            ``snapshot_time``. Optional keys (default in parentheses):
            ``sub_cluster_id`` (0), ``resolution`` ('6h'), ``polygon_wkt``,
            ``center_wkt``, ``area_sq_nm``, ``member_count``, ``zone_id``,
            ``zone_name``, ``members`` ([]), ``color``. Optional keys without
            a default are stored as NULL when absent.

    Returns:
        Number of snapshots inserted, or 0 when the input is empty or when
        anything fails (the failure is logged and nothing is committed).
    """
    if not snapshots:
        return 0
    insert_sql = f"""
        INSERT INTO {GROUP_POLYGON_SNAPSHOTS} (
            group_type, group_key, group_label, sub_cluster_id, resolution, snapshot_time,
            polygon, center_point, area_sq_nm, member_count,
            zone_id, zone_name, members, color
        ) VALUES %s
    """
    # Per-row template: WKT strings become SRID 4326 geometries and the
    # members JSON text is cast to jsonb, exactly as in a single-row INSERT.
    row_template = (
        '(%s, %s, %s, %s, %s, %s, '
        'ST_GeomFromText(%s, 4326), ST_GeomFromText(%s, 4326), '
        '%s, %s, %s, %s, %s::jsonb, %s)'
    )
    try:
        # Built inside the try so a snapshot missing a required key is
        # logged and reported as 0 rather than raised to the caller.
        rows = [
            (
                s['group_type'],
                s['group_key'],
                s['group_label'],
                s.get('sub_cluster_id', 0),
                s.get('resolution', '6h'),
                s['snapshot_time'],
                s.get('polygon_wkt'),
                s.get('center_wkt'),
                s.get('area_sq_nm'),
                s.get('member_count'),
                s.get('zone_id'),
                s.get('zone_name'),
                json.dumps(s.get('members', []), ensure_ascii=False),
                s.get('color'),
            )
            for s in snapshots
        ]
        with get_conn() as conn:
            with conn.cursor() as cur:
                # One multi-row statement per page instead of one round-trip per snapshot.
                execute_values(cur, insert_sql, rows, template=row_template, page_size=100)
            conn.commit()
        inserted = len(rows)
        logger.info('saved %d group polygon snapshots', inserted)
        return inserted
    except Exception as e:
        logger.error('failed to save group snapshots: %s', e)
        return 0
def fetch_analysis_summary() -> dict:
    """Summarize the last hour of analysis results (used as chat context).

    Returns:
        dict with keys:
            ``risk_distribution``: counts per risk_level merged with counts
                per zone code into a single mapping.
            ``dark_count``: rows with ``is_dark`` true.
            ``spoofing_count``: rows with ``spoofing_score`` above 0.5.
            ``transship_count``: rows flagged as transshipment suspects.
        On failure the error is logged and an empty distribution with zero
        counts is returned.
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                # Risk-level distribution.
                cur.execute("""
                    SELECT risk_level, COUNT(*) FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                    GROUP BY risk_level
                """)
                risk_dist = {row[0]: row[1] for row in cur.fetchall()}
                # Per-zone distribution. The column is zone_code, matching
                # what upsert_results writes to this table.
                cur.execute("""
                    SELECT zone_code, COUNT(*) FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                    GROUP BY zone_code
                """)
                zone_dist = {row[0]: row[1] for row in cur.fetchall()}
                # Dark / spoofing / transshipment counts. The flag column is
                # transship_suspect, matching what upsert_results writes.
                cur.execute("""
                    SELECT
                        COUNT(*) FILTER (WHERE is_dark = TRUE) AS dark_count,
                        COUNT(*) FILTER (WHERE spoofing_score > 0.5) AS spoofing_count,
                        COUNT(*) FILTER (WHERE transship_suspect = TRUE) AS transship_count
                    FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                """)
                row = cur.fetchone()
        # NOTE(review): zone counts are merged into 'risk_distribution'
        # alongside risk-level counts; kept as-is because consumers of this
        # dict are not visible here.
        return {
            'risk_distribution': {**risk_dist, **zone_dist},
            'dark_count': row[0] if row else 0,
            'spoofing_count': row[1] if row else 0,
            'transship_count': row[2] if row else 0,
        }
    except Exception as e:
        logger.error('fetch_analysis_summary failed: %s', e)
        return {'risk_distribution': {}, 'dark_count': 0, 'spoofing_count': 0, 'transship_count': 0}
def fetch_recent_high_risk(limit: int = 10) -> list[dict]:
    """Return the highest-risk analysis rows of the last hour (used as chat context).

    Rows are ordered by ``risk_score`` descending with NULL scores last. No
    de-duplication per mmsi is performed.

    Args:
        limit: maximum number of rows to return.

    Returns:
        List of dicts with keys ``mmsi``, ``name``, ``risk_score``,
        ``risk_level``, ``zone``, ``is_dark``, ``is_transship``,
        ``activity_state``, ``spoofing_score``. Empty list on failure
        (logged, not raised).
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                # Column names follow what upsert_results writes (zone_code,
                # transship_suspect). NULLS LAST keeps unscored rows from
                # outranking scored ones, since PostgreSQL sorts NULLs first
                # under DESC.
                cur.execute("""
                    SELECT mmsi, risk_score, risk_level, zone_code, is_dark,
                           transship_suspect, activity_state, spoofing_score
                    FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                    ORDER BY risk_score DESC NULLS LAST
                    LIMIT %s
                """, (limit,))
                rows = cur.fetchall()
        return [
            {
                'mmsi': row[0],
                # mmsi doubles as the name; enrich from vessel_store if a real name is needed.
                'name': row[0],
                'risk_score': row[1],
                'risk_level': row[2],
                'zone': row[3],
                'is_dark': row[4],
                'is_transship': row[5],
                'activity_state': row[6],
                'spoofing_score': float(row[7]) if row[7] else 0.0,
            }
            for row in rows
        ]
    except Exception as e:
        logger.error('fetch_recent_high_risk failed: %s', e)
        return []
def fetch_polygon_summary() -> dict:
    """Summarize the most recent group polygon snapshot (used as chat context).

    Only rows whose ``snapshot_time`` equals the table-wide maximum are
    counted, grouped by ``group_type``.

    Returns:
        dict with ``fleet_count``, ``fleet_members``, ``gear_in_zone`` and
        ``gear_out_zone``; all zero when no rows match or on failure
        (logged, not raised).
    """
    # group_type value -> summary key that receives the row count.
    count_keys = {
        'FLEET': 'fleet_count',
        'GEAR_IN_ZONE': 'gear_in_zone',
        'GEAR_OUT_ZONE': 'gear_out_zone',
    }
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(f"""
SELECT group_type, COUNT(*), SUM(member_count)
FROM {GROUP_POLYGON_SNAPSHOTS}
WHERE snapshot_time = (
SELECT MAX(snapshot_time) FROM {GROUP_POLYGON_SNAPSHOTS}
)
GROUP BY group_type
""")
                grouped = cur.fetchall()
        summary = dict.fromkeys(
            ('fleet_count', 'fleet_members', 'gear_in_zone', 'gear_out_zone'), 0
        )
        for group_type, group_count, member_total in grouped:
            target = count_keys.get(group_type)
            if target is None:
                continue
            summary[target] = group_count
            if group_type == 'FLEET':
                # SUM() is NULL when every member_count is NULL.
                summary['fleet_members'] = member_total or 0
        return summary
    except Exception as exc:
        logger.error('fetch_polygon_summary failed: %s', exc)
        return {'fleet_count': 0, 'fleet_members': 0, 'gear_in_zone': 0, 'gear_out_zone': 0}
def cleanup_group_snapshots(days: int = 7) -> int:
    """Delete group polygon snapshots whose ``snapshot_time`` is older than ``days`` days.

    Returns:
        Number of deleted rows, or 0 on failure (logged, not raised).
    """
    try:
        purge_sql = f"DELETE FROM {GROUP_POLYGON_SNAPSHOTS} WHERE snapshot_time < NOW() - (%s * INTERVAL '1 day')"
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(purge_sql, (days,))
                removed = cur.rowcount
            conn.commit()
        # Stay quiet when there was nothing to purge.
        if removed > 0:
            logger.info('cleaned up %d old group snapshots (older than %dd)', removed, days)
        return removed
    except Exception as exc:
        logger.error('failed to cleanup group snapshots: %s', exc)
        return 0