""" 이벤트 자동 생성기 — 분석 결과 → prediction_events INSERT. 매 분석 사이클마다 vessel_analysis_results를 스캔하여 룰 기반으로 Event 객체를 생성합니다. dedup: 동일 mmsi + category + 윈도우 내 중복 방지. """ import hashlib import json import logging from datetime import datetime, timedelta, timezone from typing import Optional from psycopg2.extras import execute_values from config import qualified_table, settings from db.kcgdb import get_conn logger = logging.getLogger(__name__) EVENTS_TABLE = qualified_table('prediction_events') # 카테고리별 dedup 윈도우 (분). # 사이클이 5분 간격이므로 5의 배수를 피해서 boundary 일제 만료 패턴을 회피한다. DEDUP_WINDOWS = { 'EEZ_INTRUSION': 33, 'DARK_VESSEL': 131, # 127 → 131 (2h boundary 회피) 'FLEET_CLUSTER': 367, 'ILLEGAL_TRANSSHIP': 181, # 67 → 181 (환적은 장기 이벤트) 'MMSI_TAMPERING': 33, 'AIS_LOSS': 127, 'SPEED_ANOMALY': 67, 'ZONE_DEPARTURE': 89, # 127 → 89 (boundary 분산) 'GEAR_ILLEGAL': 367, 'AIS_RESUME': 67, 'HIGH_RISK_VESSEL': 67, } # 이벤트 생성 룰 # 한 분석결과가 여러 룰에 매칭되면 모두 생성한다 (카테고리별 dedup_key가 분리되어 안전). # zone_code 실측값: EEZ_OR_BEYOND/ZONE_II/III/IV/CONTIGUOUS_ZONE/TERRITORIAL_SEA # (algorithms.location.classify_zone 결과) RULES = [ { # 영해 침범 — 가장 심각 'name': 'territorial_sea_violation', 'condition': lambda r: r.get('zone_code') == 'TERRITORIAL_SEA', 'level': 'CRITICAL', 'category': 'EEZ_INTRUSION', 'title_fn': lambda r: f"영해 침범 탐지 (위험도 {r.get('risk_score', 0)})", }, { # 접속수역 + 고위험 'name': 'contiguous_zone_high_risk', 'condition': lambda r: r.get('zone_code') == 'CONTIGUOUS_ZONE' and (r.get('risk_score', 0) or 0) >= 50, 'level': 'HIGH', 'category': 'EEZ_INTRUSION', 'title_fn': lambda r: f"접속수역 침입 (위험도 {r.get('risk_score', 0)})", }, { # 종합 위험도 CRITICAL — risk.py 분류와 동일 임계값 'name': 'critical_risk', 'condition': lambda r: (r.get('risk_score', 0) or 0) >= 70, 'level': 'CRITICAL', 'category': 'HIGH_RISK_VESSEL', 'title_fn': lambda r: f"고위험 선박 탐지 (위험도 {r.get('risk_score', 0)})", }, { # dark 의심 CRITICAL — 점수 70+ (반복·민감수역·이동중·거리이상 등 복합) 'name': 'dark_critical', 'condition': lambda r: (r.get('features') or {}).get('dark_tier') == 'CRITICAL', 'level': 'CRITICAL', 'category': 'DARK_VESSEL', 'title_fn': lambda r: ( f"고의 AIS 소실 의심 (점수 {(r.get('features') or {}).get('dark_suspicion_score', 0)}, " f"7일 {(r.get('features') or {}).get('dark_history_7d', 0)}회)" ), }, { # dark 의심 HIGH — 점수 50~69 'name': 'dark_high', 'condition': lambda r: (r.get('features') or {}).get('dark_tier') == 'HIGH', 'level': 'HIGH', 'category': 'DARK_VESSEL', 'title_fn': lambda r: ( f"AIS 소실 의심 (점수 {(r.get('features') or {}).get('dark_suspicion_score', 0)})" ), }, { 'name': 'spoofing', 'condition': lambda r: (r.get('spoofing_score', 0) or 0) > 0.7, 'level': 'HIGH', 'category': 'MMSI_TAMPERING', 'title_fn': lambda r: f"GPS/MMSI 조작 의심 (점수 {r.get('spoofing_score', 0):.2f})", }, { # 환적 의심 CRITICAL — 점수 90+ 'name': 'transship_critical', 'condition': lambda r: (r.get('features') or {}).get('transship_tier') == 'CRITICAL', 'level': 'CRITICAL', 'category': 'ILLEGAL_TRANSSHIP', 'title_fn': lambda r: ( f"환적 의심 (점수 {(r.get('features') or {}).get('transship_score', 0)}, " f"{r.get('transship_duration_min', 0)}분, 상대 {r.get('transship_pair_mmsi', '?')})" ), }, { # 환적 의심 HIGH — 점수 70~89 'name': 'transship_high', 'condition': lambda r: (r.get('features') or {}).get('transship_tier') == 'HIGH', 'level': 'HIGH', 'category': 'ILLEGAL_TRANSSHIP', 'title_fn': lambda r: ( f"환적 의심 (점수 {(r.get('features') or {}).get('transship_score', 0)}, " f"{r.get('transship_duration_min', 0)}분)" ), }, { 'name': 'fleet_cluster', 'condition': lambda r: r.get('fleet_is_leader') and (r.get('fleet_cluster_id') is not None), 'level': 'MEDIUM', 'category': 'FLEET_CLUSTER', 'title_fn': lambda r: f"선단 밀집 감지 (클러스터 {r.get('fleet_cluster_id')})", }, { # 특정수역(ZONE_*) 진입 — 운영자 모니터링용 'name': 'special_zone_entry', 'condition': lambda r: (r.get('zone_code') or '').startswith('ZONE_') and (r.get('risk_score', 0) or 0) >= 40, 'level': 'MEDIUM', 'category': 'ZONE_DEPARTURE', 'title_fn': lambda r: f"특정수역 진입 ({r.get('zone_code')}, 위험도 {r.get('risk_score', 0)})", }, { # 고위험 행동 패턴 (risk_level=HIGH 이상은 위 critical_risk가 잡고, 50~69점만 여기에) 'name': 'high_risk', 'condition': lambda r: 50 <= (r.get('risk_score', 0) or 0) < 70, 'level': 'MEDIUM', 'category': 'HIGH_RISK_VESSEL', 'title_fn': lambda r: f"위험 행동 패턴 (위험도 {r.get('risk_score', 0)})", }, # ── G-code 어구 위반 규칙 ── { 'name': 'g06_pair_trawl', 'condition': lambda r: 'G-06' in ((r.get('features') or {}).get('g_codes') or []), 'level': 'CRITICAL', 'category': 'GEAR_ILLEGAL', 'title_fn': lambda r: ( f"쌍끌이 불법조업 의심 (G-06): " f"{((r.get('features') or {}).get('gear_violation_evidence') or {}).get('G-06', {}).get('sync_duration_min', 0):.0f}분 공조" ), }, { 'name': 'g01_zone_gear_violation', 'condition': lambda r: 'G-01' in ((r.get('features') or {}).get('g_codes') or []), 'level': 'HIGH', 'category': 'GEAR_ILLEGAL', 'title_fn': lambda r: ( f"수역-어구 위반 (G-01): " f"{r.get('vessel_type', '')} 비허가 수역 조업" ), }, { 'name': 'g04_mmsi_cycling', 'condition': lambda r: 'G-04' in ((r.get('features') or {}).get('g_codes') or []), 'level': 'HIGH', 'category': 'MMSI_TAMPERING', 'title_fn': lambda r: "어구 MMSI 조작 의심 (G-04): 신호 주기적 단속", }, { 'name': 'g05_gear_drift', 'condition': lambda r: 'G-05' in ((r.get('features') or {}).get('g_codes') or []), 'level': 'MEDIUM', 'category': 'GEAR_ILLEGAL', 'title_fn': lambda r: "어구 인위적 이동 의심 (G-05): 조류보정 초과 이동", }, ] def _make_dedup_key(mmsi: str, category: str) -> str: return f"{mmsi}:{category}" def _make_event_uid(now: datetime, seq: int) -> str: date_str = now.strftime('%Y%m%d') return f"EVT-{date_str}-{seq:04d}" def _get_next_seq(conn, date_str: str) -> int: cur = conn.cursor() cur.execute( f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE event_uid LIKE %s", (f'EVT-{date_str}-%',) ) return cur.fetchone()[0] + 1 def _check_dedup(conn, dedup_key: str, category: str, now: datetime) -> bool: """중복 이벤트 존재 여부 확인.""" window_min = DEDUP_WINDOWS.get(category, 60) cutoff = now - timedelta(minutes=window_min) cur = conn.cursor() cur.execute( f"SELECT 1 FROM {EVENTS_TABLE} WHERE dedup_key = %s AND occurred_at > %s LIMIT 1", (dedup_key, cutoff) ) return cur.fetchone() is not None def run_event_generator(analysis_results: list[dict]) -> dict: """ 분석 결과 리스트를 스캔하여 이벤트 생성. Args: analysis_results: vessel_analysis_results 행 딕셔너리 리스트 (mmsi, risk_score, zone_code, is_dark, gap_duration_min, spoofing_score, ...) Returns: { 'generated': int, 'skipped_dedup': int } """ now = datetime.now(timezone.utc) generated = 0 skipped_dedup = 0 events_to_insert = [] with get_conn() as conn: date_str = now.strftime('%Y%m%d') seq = _get_next_seq(conn, date_str) for result in analysis_results: mmsi = result.get('mmsi', '') if not mmsi: continue for rule in RULES: try: if not rule['condition'](result): continue except Exception: continue category = rule['category'] dedup_key = _make_dedup_key(mmsi, category) if _check_dedup(conn, dedup_key, category, now): skipped_dedup += 1 continue event_uid = _make_event_uid(now, seq) seq += 1 # features 추출: 이벤트에 연관된 핵심 특성만 저장 raw_features = result.get('features') features_json = json.dumps(raw_features, ensure_ascii=False) if raw_features else None events_to_insert.append(( event_uid, now, # occurred_at rule['level'], category, rule['title_fn'](result), # title None, # detail mmsi, result.get('vessel_name'), result.get('zone_code'), # area_name (zone으로 대체) result.get('zone_code'), result.get('lat'), result.get('lon'), result.get('speed_kn'), 'VESSEL_ANALYSIS', # source_type result.get('id'), # source_ref_id result.get('confidence') or result.get('risk_score', 0) / 100.0, 'NEW', # status dedup_key, features_json, )) generated += 1 # break 제거: 한 분석결과가 여러 룰에 매칭되면 모두 생성 # (카테고리별 dedup_key가 분리되어 안전) if events_to_insert: execute_values( conn.cursor(), f"""INSERT INTO {EVENTS_TABLE} (event_uid, occurred_at, level, category, title, detail, vessel_mmsi, vessel_name, area_name, zone_code, lat, lon, speed_kn, source_type, source_ref_id, ai_confidence, status, dedup_key, features) VALUES %s ON CONFLICT (event_uid) DO NOTHING""", events_to_insert, ) conn.commit() logger.info(f'event_generator: generated={generated}, skipped_dedup={skipped_dedup}') return {'generated': generated, 'skipped_dedup': skipped_dedup}