kcg-monitoring/prediction/db/kcgdb.py
htlee 7dd46f2078 feat: 어구 모선 추론(Gear Parent Inference) 시스템 이식
Codex Lab 환경(iran-airstrike-replay-codex)에서 검증 완료된
어구 모선 자동 추론 + 검토 워크플로우 전체를 이식.

## Python (prediction/)
- gear_parent_inference(1,428줄): 다층 점수 모델 (correlation + name + track + prior bonus)
- gear_parent_episode(631줄): Episode 연속성 (Jaccard + 공간거리)
- gear_name_rules: 모선 이름 정규화 + 4자 미만 필터
- scheduler: 추론 호출 단계 추가 (4.8)
- fleet_tracker/kcgdb: SQL qualified_table() 동적화
- gear_correlation: timestamp 필드 추가

## DB (database/migration/ 012~015)
- 후보 스냅샷, resolution, episode, 라벨 세션, 제외 관리 테이블 9개 + VIEW 2개

## Backend (Java)
- 12개 DTO/Controller (ParentInferenceWorkflowController 등)
- GroupPolygonService: parent_resolution LEFT JOIN + 15개 API 메서드

## Frontend
- ParentReviewPanel: 모선 검토 대시보드
- vesselAnalysis: 10개 신규 API 함수 + 6개 타입

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 00:42:31 +09:00

331 lines
12 KiB
Python

import json
import logging
from contextlib import contextmanager
from typing import TYPE_CHECKING, Optional
import psycopg2
from psycopg2 import pool
from psycopg2.extras import execute_values
from config import qualified_table, settings
if TYPE_CHECKING:
from models.result import AnalysisResult
logger = logging.getLogger(__name__)
_pool: Optional[pool.ThreadedConnectionPool] = None
GROUP_POLYGON_SNAPSHOTS = qualified_table('group_polygon_snapshots')
def init_pool():
    """Initialise the module-wide kcgdb connection pool (1..5 connections).

    Connection parameters come from ``settings``; the PostgreSQL
    ``search_path`` is pinned to the configured schema (with ``public``
    as fallback) via the session ``options`` string.
    """
    global _pool
    conn_kwargs = {
        'host': settings.KCGDB_HOST,
        'port': settings.KCGDB_PORT,
        'dbname': settings.KCGDB_NAME,
        'user': settings.KCGDB_USER,
        'password': settings.KCGDB_PASSWORD,
        'options': f'-c search_path={settings.KCGDB_SCHEMA},public',
    }
    _pool = pool.ThreadedConnectionPool(minconn=1, maxconn=5, **conn_kwargs)
    logger.info('kcgdb connection pool initialized')
def close_pool():
    """Close every pooled connection and reset the module global.

    Safe to call when the pool was never initialised (no-op).
    """
    global _pool
    if _pool is None:
        return
    _pool.closeall()
    _pool = None
    logger.info('kcgdb connection pool closed')
@contextmanager
def get_conn():
    """Borrow a connection from the pool for the duration of a ``with`` block.

    Rolls back the transaction if the body raises (the exception is
    re-raised), and always returns the connection to the pool.
    Assumes ``init_pool()`` has been called first.
    """
    connection = _pool.getconn()
    try:
        yield connection
    except Exception:
        connection.rollback()
        raise
    finally:
        _pool.putconn(connection)
def check_health() -> bool:
    """Return True when a trivial ``SELECT 1`` succeeds against kcgdb."""
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute('SELECT 1')
        return True
    except Exception as e:
        logger.error('kcgdb health check failed: %s', e)
        return False
def upsert_results(results: list['AnalysisResult']) -> int:
    """Upsert analysis results into the vessel_analysis_results table.

    Each result is converted via ``AnalysisResult.to_db_tuple()`` and
    written with ``execute_values`` in pages of 100; conflicts on
    ``(mmsi, timestamp)`` update every analysis column in place.

    Returns the number of rows written, or 0 on empty input / DB error.
    """
    if not results:
        return 0
    # Column order must match AnalysisResult.to_db_tuple() exactly.
    insert_sql = """
    INSERT INTO vessel_analysis_results (
        mmsi, timestamp, vessel_type, confidence, fishing_pct,
        cluster_id, season, zone, dist_to_baseline_nm, activity_state,
        ucaf_score, ucft_score, is_dark, gap_duration_min,
        spoofing_score, bd09_offset_m, speed_jump_count,
        cluster_size, is_leader, fleet_role,
        risk_score, risk_level,
        is_transship_suspect, transship_pair_mmsi, transship_duration_min,
        features, analyzed_at
    ) VALUES %s
    ON CONFLICT (mmsi, timestamp) DO UPDATE SET
        vessel_type = EXCLUDED.vessel_type,
        confidence = EXCLUDED.confidence,
        fishing_pct = EXCLUDED.fishing_pct,
        cluster_id = EXCLUDED.cluster_id,
        season = EXCLUDED.season,
        zone = EXCLUDED.zone,
        dist_to_baseline_nm = EXCLUDED.dist_to_baseline_nm,
        activity_state = EXCLUDED.activity_state,
        ucaf_score = EXCLUDED.ucaf_score,
        ucft_score = EXCLUDED.ucft_score,
        is_dark = EXCLUDED.is_dark,
        gap_duration_min = EXCLUDED.gap_duration_min,
        spoofing_score = EXCLUDED.spoofing_score,
        bd09_offset_m = EXCLUDED.bd09_offset_m,
        speed_jump_count = EXCLUDED.speed_jump_count,
        cluster_size = EXCLUDED.cluster_size,
        is_leader = EXCLUDED.is_leader,
        fleet_role = EXCLUDED.fleet_role,
        risk_score = EXCLUDED.risk_score,
        risk_level = EXCLUDED.risk_level,
        is_transship_suspect = EXCLUDED.is_transship_suspect,
        transship_pair_mmsi = EXCLUDED.transship_pair_mmsi,
        transship_duration_min = EXCLUDED.transship_duration_min,
        features = EXCLUDED.features,
        analyzed_at = EXCLUDED.analyzed_at
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                tuples = [r.to_db_tuple() for r in results]
                execute_values(cur, insert_sql, tuples, page_size=100)
                conn.commit()
                count = len(tuples)
                logger.info('upserted %d analysis results', count)
                return count
    except Exception as e:
        # Best-effort write: callers treat 0 as "nothing persisted".
        logger.error('failed to upsert results: %s', e)
        return 0
def cleanup_old(hours: int = 48) -> int:
    """Delete analysis rows whose ``analyzed_at`` is older than *hours* hours.

    Returns the number of rows deleted (0 on error).
    """
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(
                'DELETE FROM vessel_analysis_results WHERE analyzed_at < NOW() - (%s * INTERVAL \'1 hour\')',
                (hours,),
            )
            removed = cur.rowcount
            conn.commit()
        if removed > 0:
            logger.info('cleaned up %d old results (older than %dh)', removed, hours)
        return removed
    except Exception as e:
        logger.error('failed to cleanup old results: %s', e)
        return 0
def save_group_snapshots(snapshots: list[dict]) -> int:
    """Insert polygon snapshots into group_polygon_snapshots, one row each.

    ``snapshots`` is the output of ``polygon_builder.build_all_group_snapshots()``.
    Expected keys per item: group_type, group_key, group_label, snapshot_time,
    polygon_wkt (str|None), center_wkt (str|None), area_sq_nm, member_count,
    zone_id, zone_name, members (list[dict]), color; optional sub_cluster_id
    (default 0) and resolution (default '6h').

    Returns the number of rows inserted, or 0 on empty input / DB error
    (nothing is committed on error).
    """
    if not snapshots:
        return 0
    insert_sql = f"""
    INSERT INTO {GROUP_POLYGON_SNAPSHOTS} (
        group_type, group_key, group_label, sub_cluster_id, resolution, snapshot_time,
        polygon, center_point, area_sq_nm, member_count,
        zone_id, zone_name, members, color
    ) VALUES (
        %s, %s, %s, %s, %s, %s,
        ST_GeomFromText(%s, 4326), ST_GeomFromText(%s, 4326),
        %s, %s, %s, %s, %s::jsonb, %s
    )
    """

    def _row(snap: dict) -> tuple:
        # Parameter order matches the column list in insert_sql.
        return (
            snap['group_type'],
            snap['group_key'],
            snap['group_label'],
            snap.get('sub_cluster_id', 0),
            snap.get('resolution', '6h'),
            snap['snapshot_time'],
            snap.get('polygon_wkt'),
            snap.get('center_wkt'),
            snap.get('area_sq_nm'),
            snap.get('member_count'),
            snap.get('zone_id'),
            snap.get('zone_name'),
            json.dumps(snap.get('members', []), ensure_ascii=False),
            snap.get('color'),
        )

    written = 0
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                for snap in snapshots:
                    cur.execute(insert_sql, _row(snap))
                    written += 1
                conn.commit()
                logger.info('saved %d group polygon snapshots', written)
                return written
    except Exception as e:
        logger.error('failed to save group snapshots: %s', e)
        return 0
def fetch_analysis_summary() -> dict:
    """Summarise the last hour of analysis results (for chat context).

    Returns a dict with:
      - risk_distribution: {risk_level: count}
      - zone_distribution: {zone: count}
      - dark_count / spoofing_count / transship_count: ints

    On any DB error an all-empty/zero summary is returned.
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                # Risk-level distribution
                cur.execute("""
                    SELECT risk_level, COUNT(*) FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                    GROUP BY risk_level
                """)
                risk_dist = {row[0]: row[1] for row in cur.fetchall()}
                # Zone distribution
                cur.execute("""
                    SELECT zone, COUNT(*) FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                    GROUP BY zone
                """)
                zone_dist = {row[0]: row[1] for row in cur.fetchall()}
                # Dark / spoofing / transshipment counts
                cur.execute("""
                    SELECT
                        COUNT(*) FILTER (WHERE is_dark = TRUE) AS dark_count,
                        COUNT(*) FILTER (WHERE spoofing_score > 0.5) AS spoofing_count,
                        COUNT(*) FILTER (WHERE is_transship_suspect = TRUE) AS transship_count
                    FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                """)
                row = cur.fetchone()
                return {
                    # BUG FIX: zone_dist was previously merged into
                    # risk_distribution ({**risk_dist, **zone_dist}),
                    # mixing zone names with risk levels. Keep them
                    # separate so each breakdown is usable on its own.
                    'risk_distribution': risk_dist,
                    'zone_distribution': zone_dist,
                    'dark_count': row[0] if row else 0,
                    'spoofing_count': row[1] if row else 0,
                    'transship_count': row[2] if row else 0,
                }
    except Exception as e:
        logger.error('fetch_analysis_summary failed: %s', e)
        return {
            'risk_distribution': {},
            'zone_distribution': {},
            'dark_count': 0,
            'spoofing_count': 0,
            'transship_count': 0,
        }
def fetch_recent_high_risk(limit: int = 10) -> list[dict]:
    """Return the top-*limit* vessels by risk score from the last hour.

    Used as chat context; returns an empty list on DB error.
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT mmsi, risk_score, risk_level, zone, is_dark,
                           is_transship_suspect, activity_state, spoofing_score
                    FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                    ORDER BY risk_score DESC
                    LIMIT %s
                """, (limit,))
                return [
                    {
                        'mmsi': rec[0],
                        # Placeholder: enrich with the vessel name from
                        # vessel_store when needed.
                        'name': rec[0],
                        'risk_score': rec[1],
                        'risk_level': rec[2],
                        'zone': rec[3],
                        'is_dark': rec[4],
                        'is_transship': rec[5],
                        'activity_state': rec[6],
                        'spoofing_score': float(rec[7]) if rec[7] else 0.0,
                    }
                    for rec in cur.fetchall()
                ]
    except Exception as e:
        logger.error('fetch_recent_high_risk failed: %s', e)
        return []
def fetch_polygon_summary() -> dict:
    """Summarise the most recent batch of group polygons (chat context).

    Looks only at rows sharing the latest ``snapshot_time`` and returns
    fleet count/members plus gear-polygon counts in/out of zones.
    All-zero summary on DB error.
    """
    summary = {
        'fleet_count': 0, 'fleet_members': 0,
        'gear_in_zone': 0, 'gear_out_zone': 0,
    }
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(f"""
                    SELECT group_type, COUNT(*), SUM(member_count)
                    FROM {GROUP_POLYGON_SNAPSHOTS}
                    WHERE snapshot_time = (
                        SELECT MAX(snapshot_time) FROM {GROUP_POLYGON_SNAPSHOTS}
                    )
                    GROUP BY group_type
                """)
                for gtype, cnt, member_sum in cur.fetchall():
                    members = member_sum or 0  # SUM() is NULL for empty groups
                    if gtype == 'FLEET':
                        summary['fleet_count'] = cnt
                        summary['fleet_members'] = members
                    elif gtype == 'GEAR_IN_ZONE':
                        summary['gear_in_zone'] = cnt
                    elif gtype == 'GEAR_OUT_ZONE':
                        summary['gear_out_zone'] = cnt
                return summary
    except Exception as e:
        logger.error('fetch_polygon_summary failed: %s', e)
        return {'fleet_count': 0, 'fleet_members': 0, 'gear_in_zone': 0, 'gear_out_zone': 0}
def cleanup_group_snapshots(days: int = 7) -> int:
    """Delete polygon snapshots older than *days* days.

    Returns the number of rows deleted (0 on error).
    """
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(
                f"DELETE FROM {GROUP_POLYGON_SNAPSHOTS} "
                "WHERE snapshot_time < NOW() - (%s * INTERVAL '1 day')",
                (days,),
            )
            removed = cur.rowcount
            conn.commit()
        if removed > 0:
            logger.info('cleaned up %d old group snapshots (older than %dd)', removed, days)
        return removed
    except Exception as e:
        logger.error('failed to cleanup group snapshots: %s', e)
        return 0