From 474e672683bfd498e780d910ebf8678032fec57e Mon Sep 17 00:00:00 2001 From: htlee Date: Tue, 7 Apr 2026 13:00:50 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20S3=20prediction=20=EC=8B=A0=EA=B7=9C=20?= =?UTF-8?q?=EC=B6=9C=EB=A0=A5=20=EB=AA=A8=EB=93=88=205=EC=A2=85=20+=20sche?= =?UTF-8?q?duler=20=ED=86=B5=ED=95=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 분석 사이클 완료 후 자동 실행되는 출력 파이프라인: - event_generator: 분석결과 → 이벤트 자동 생성 (7개 룰, 카테고리별 dedup) - violation_classifier: 위반 유형 라벨링 (EEZ/DARK/MMSI/TRANSSHIP/GEAR/RISK) - kpi_writer: 실시간 KPI 6개 갱신 (오늘 기준 카운트) - stats_aggregator: hourly/daily/monthly 사전 집계 (UPSERT) - alert_dispatcher: CRITICAL/HIGH 이벤트 자동 알림 생성 scheduler.py에 출력 모듈 통합 (분석 8단계 완료 후 실행, non-fatal) DB 연동 테스트 통과 (alerts 8건 생성, KPI tracking_active=2) Co-Authored-By: Claude Opus 4.6 (1M context) --- prediction/output/__init__.py | 0 prediction/output/alert_dispatcher.py | 64 ++++++ prediction/output/event_generator.py | 200 ++++++++++++++++++ prediction/output/kpi_writer.py | 109 ++++++++++ prediction/output/stats_aggregator.py | 237 ++++++++++++++++++++++ prediction/output/violation_classifier.py | 87 ++++++++ prediction/scheduler.py | 30 ++- 7 files changed, 726 insertions(+), 1 deletion(-) create mode 100644 prediction/output/__init__.py create mode 100644 prediction/output/alert_dispatcher.py create mode 100644 prediction/output/event_generator.py create mode 100644 prediction/output/kpi_writer.py create mode 100644 prediction/output/stats_aggregator.py create mode 100644 prediction/output/violation_classifier.py diff --git a/prediction/output/__init__.py b/prediction/output/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/prediction/output/alert_dispatcher.py b/prediction/output/alert_dispatcher.py new file mode 100644 index 0000000..19d8422 --- /dev/null +++ b/prediction/output/alert_dispatcher.py @@ -0,0 +1,64 @@ +""" +경보 발송 — CRITICAL/HIGH 이벤트에 대해 prediction_alerts INSERT. + +현재는 DASHBOARD 채널만 기록 (실제 SMS/EMAIL은 향후 연동). +""" +import logging +from datetime import datetime, timezone + +from psycopg2.extras import execute_values + +from config import qualified_table +from db.kcgdb import get_conn + +logger = logging.getLogger(__name__) + +ALERTS_TABLE = qualified_table('prediction_alerts') +EVENTS_TABLE = qualified_table('prediction_events') + +# CRITICAL/HIGH 이벤트만 알림 대상 +ALERT_LEVELS = ('CRITICAL', 'HIGH') + + +def run_alert_dispatcher() -> dict: + """ + 아직 알림이 없는 CRITICAL/HIGH 이벤트에 대해 알림 생성. + + Returns: + { 'dispatched': int } + """ + now = datetime.now(timezone.utc) + dispatched = 0 + + with get_conn() as conn: + cur = conn.cursor() + + # 알림이 아직 없는 CRITICAL/HIGH 이벤트 조회 + cur.execute( + f"""SELECT e.id, e.ai_confidence + FROM {EVENTS_TABLE} e + LEFT JOIN {ALERTS_TABLE} a ON a.event_id = e.id + WHERE e.level IN %s AND a.id IS NULL + ORDER BY e.occurred_at DESC + LIMIT 100""", + (ALERT_LEVELS,) + ) + rows = cur.fetchall() + + if rows: + alerts = [ + (event_id, 'DASHBOARD', None, now, 'SENT', confidence) + for event_id, confidence in rows + ] + execute_values( + cur, + f"""INSERT INTO {ALERTS_TABLE} + (event_id, channel, recipient, sent_at, delivery_status, ai_confidence) + VALUES %s""", + alerts, + ) + conn.commit() + dispatched = len(alerts) + + logger.info(f'alert_dispatcher: dispatched={dispatched}') + return {'dispatched': dispatched} diff --git a/prediction/output/event_generator.py b/prediction/output/event_generator.py new file mode 100644 index 0000000..17a5a5c --- /dev/null +++ b/prediction/output/event_generator.py @@ -0,0 +1,200 @@ +""" +이벤트 자동 생성기 — 분석 결과 → prediction_events INSERT. + +매 분석 사이클마다 vessel_analysis_results를 스캔하여 +룰 기반으로 Event 객체를 생성합니다. +dedup: 동일 mmsi + category + 윈도우 내 중복 방지. +""" +import hashlib +import logging +from datetime import datetime, timedelta, timezone +from typing import Optional + +from psycopg2.extras import execute_values + +from config import qualified_table, settings +from db.kcgdb import get_conn + +logger = logging.getLogger(__name__) + +EVENTS_TABLE = qualified_table('prediction_events') + +# 카테고리별 dedup 윈도우 (분) +DEDUP_WINDOWS = { + 'EEZ_INTRUSION': 30, + 'DARK_VESSEL': 120, + 'FLEET_CLUSTER': 360, + 'ILLEGAL_TRANSSHIP': 60, + 'MMSI_TAMPERING': 30, + 'AIS_LOSS': 120, + 'SPEED_ANOMALY': 60, + 'ZONE_DEPARTURE': 120, + 'GEAR_ILLEGAL': 360, + 'AIS_RESUME': 60, +} + +# 이벤트 생성 룰 +RULES = [ + { + 'name': 'critical_risk', + 'condition': lambda r: r.get('risk_score', 0) >= 90, + 'level': 'CRITICAL', + 'category': 'EEZ_INTRUSION', + 'title_fn': lambda r: f"고위험 선박 탐지 (위험도 {r.get('risk_score', 0)})", + }, + { + 'name': 'eez_violation', + 'condition': lambda r: r.get('zone_code', '') in ('NLL', 'SPECIAL_FISHING_1', 'SPECIAL_FISHING_2') + and r.get('risk_score', 0) >= 70, + 'level': 'CRITICAL', + 'category': 'EEZ_INTRUSION', + 'title_fn': lambda r: f"EEZ 침범 탐지 ({r.get('zone_code', '')})", + }, + { + 'name': 'dark_vessel_long', + 'condition': lambda r: r.get('is_dark') and (r.get('gap_duration_min', 0) or 0) > 60, + 'level': 'HIGH', + 'category': 'DARK_VESSEL', + 'title_fn': lambda r: f"다크베셀 장기 소실 ({r.get('gap_duration_min', 0)}분)", + }, + { + 'name': 'spoofing', + 'condition': lambda r: (r.get('spoofing_score', 0) or 0) > 0.7, + 'level': 'HIGH', + 'category': 'MMSI_TAMPERING', + 'title_fn': lambda r: f"GPS/MMSI 조작 의심 (점수 {r.get('spoofing_score', 0):.2f})", + }, + { + 'name': 'transship', + 'condition': lambda r: r.get('transship_suspect'), + 'level': 'HIGH', + 'category': 'ILLEGAL_TRANSSHIP', + 'title_fn': lambda r: f"환적 의심 (상대: {r.get('transship_pair_mmsi', '미상')})", + }, + { + 'name': 'fleet_cluster', + 'condition': lambda r: r.get('fleet_is_leader') and (r.get('fleet_cluster_id') is not None), + 'level': 'MEDIUM', + 'category': 'FLEET_CLUSTER', + 'title_fn': lambda r: f"선단 밀집 감지 (클러스터 {r.get('fleet_cluster_id')})", + }, + { + 'name': 'high_risk', + 'condition': lambda r: r.get('risk_level') == 'HIGH' and r.get('risk_score', 0) >= 60, + 'level': 'MEDIUM', + 'category': 'ZONE_DEPARTURE', + 'title_fn': lambda r: f"위험 행동 패턴 (위험도 {r.get('risk_score', 0)})", + }, +] + + +def _make_dedup_key(mmsi: str, category: str) -> str: + return f"{mmsi}:{category}" + + +def _make_event_uid(now: datetime, seq: int) -> str: + date_str = now.strftime('%Y%m%d') + return f"EVT-{date_str}-{seq:04d}" + + +def _get_next_seq(conn, date_str: str) -> int: + cur = conn.cursor() + cur.execute( + f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE event_uid LIKE %s", + (f'EVT-{date_str}-%',) + ) + return cur.fetchone()[0] + 1 + + +def _check_dedup(conn, dedup_key: str, category: str, now: datetime) -> bool: + """중복 이벤트 존재 여부 확인.""" + window_min = DEDUP_WINDOWS.get(category, 60) + cutoff = now - timedelta(minutes=window_min) + cur = conn.cursor() + cur.execute( + f"SELECT 1 FROM {EVENTS_TABLE} WHERE dedup_key = %s AND occurred_at > %s LIMIT 1", + (dedup_key, cutoff) + ) + return cur.fetchone() is not None + + +def run_event_generator(analysis_results: list[dict]) -> dict: + """ + 분석 결과 리스트를 스캔하여 이벤트 생성. + + Args: + analysis_results: vessel_analysis_results 행 딕셔너리 리스트 + (mmsi, risk_score, zone_code, is_dark, gap_duration_min, spoofing_score, ...) + + Returns: + { 'generated': int, 'skipped_dedup': int } + """ + now = datetime.now(timezone.utc) + generated = 0 + skipped_dedup = 0 + events_to_insert = [] + + with get_conn() as conn: + date_str = now.strftime('%Y%m%d') + seq = _get_next_seq(conn, date_str) + + for result in analysis_results: + mmsi = result.get('mmsi', '') + if not mmsi: + continue + + for rule in RULES: + try: + if not rule['condition'](result): + continue + except Exception: + continue + + category = rule['category'] + dedup_key = _make_dedup_key(mmsi, category) + + if _check_dedup(conn, dedup_key, category, now): + skipped_dedup += 1 + continue + + event_uid = _make_event_uid(now, seq) + seq += 1 + + events_to_insert.append(( + event_uid, + now, # occurred_at + rule['level'], + category, + rule['title_fn'](result), # title + None, # detail + mmsi, + result.get('vessel_name'), + result.get('zone_code'), # area_name (zone으로 대체) + result.get('zone_code'), + result.get('lat'), + result.get('lon'), + result.get('speed_kn'), + 'VESSEL_ANALYSIS', # source_type + result.get('id'), # source_ref_id + result.get('confidence') or result.get('risk_score', 0) / 100.0, + 'NEW', # status + dedup_key, + )) + generated += 1 + break # 한 분석결과당 최고 우선순위 룰 1개만 + + if events_to_insert: + execute_values( + conn.cursor(), + f"""INSERT INTO {EVENTS_TABLE} + (event_uid, occurred_at, level, category, title, detail, + vessel_mmsi, vessel_name, area_name, zone_code, lat, lon, speed_kn, + source_type, source_ref_id, ai_confidence, status, dedup_key) + VALUES %s + ON CONFLICT (event_uid) DO NOTHING""", + events_to_insert, + ) + conn.commit() + + logger.info(f'event_generator: generated={generated}, skipped_dedup={skipped_dedup}') + return {'generated': generated, 'skipped_dedup': skipped_dedup} diff --git a/prediction/output/kpi_writer.py b/prediction/output/kpi_writer.py new file mode 100644 index 0000000..a087009 --- /dev/null +++ b/prediction/output/kpi_writer.py @@ -0,0 +1,109 @@ +""" +실시간 KPI 갱신 — prediction_kpi_realtime 테이블 업데이트. + +매 분석 사이클마다 오늘 날짜 기준 카운트를 계산하여 6개 KPI 갱신. +""" +import logging +from datetime import date, datetime, timezone + +from config import qualified_table +from db.kcgdb import get_conn + +logger = logging.getLogger(__name__) + +KPI_TABLE = qualified_table('prediction_kpi_realtime') +EVENTS_TABLE = qualified_table('prediction_events') +ENF_TABLE = qualified_table('enforcement_records') +VAR_TABLE = qualified_table('vessel_analysis_results') + + +def run_kpi_writer() -> dict: + """ + 오늘 날짜 기준으로 6개 KPI를 재계산하여 갱신. + + Returns: + { kpi_key: value } 딕셔너리 + """ + today = date.today() + today_start = datetime(today.year, today.month, today.day, tzinfo=timezone.utc) + now = datetime.now(timezone.utc) + results = {} + + with get_conn() as conn: + cur = conn.cursor() + + # 1. 실시간 탐지 (오늘 분석 결과 수) + cur.execute( + f"SELECT COUNT(DISTINCT mmsi) FROM {VAR_TABLE} WHERE analyzed_at >= %s", + (today_start,) + ) + realtime = cur.fetchone()[0] or 0 + results['realtime_detection'] = realtime + + # 2. EEZ 침범 (오늘 EEZ 관련 이벤트) + cur.execute( + f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE category = 'EEZ_INTRUSION' AND occurred_at >= %s", + (today_start,) + ) + eez = cur.fetchone()[0] or 0 + results['eez_violation'] = eez + + # 3. 다크베셀 (현재 dark 상태인 선박) + cur.execute( + f"""SELECT COUNT(DISTINCT mmsi) FROM {VAR_TABLE} + WHERE is_dark = true AND analyzed_at >= %s""", + (today_start,) + ) + dark = cur.fetchone()[0] or 0 + results['dark_vessel'] = dark + + # 4. 환적 의심 (오늘) + cur.execute( + f"""SELECT COUNT(*) FROM {EVENTS_TABLE} + WHERE category = 'ILLEGAL_TRANSSHIP' AND occurred_at >= %s""", + (today_start,) + ) + transship = cur.fetchone()[0] or 0 + results['illegal_transship'] = transship + + # 5. 추적 중 (IN_PROGRESS 상태 이벤트) + cur.execute( + f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE status = 'IN_PROGRESS'" + ) + tracking = cur.fetchone()[0] or 0 + results['tracking_active'] = tracking + + # 6. 나포/검문 (오늘 단속) + cur.execute( + f"SELECT COUNT(*) FROM {ENF_TABLE} WHERE enforced_at >= %s", + (today_start,) + ) + captured = cur.fetchone()[0] or 0 + results['captured_inspected'] = captured + + # KPI 테이블 업데이트 (이전 값과 비교하여 trend 계산) + for key, value in results.items(): + cur.execute( + f"SELECT value FROM {KPI_TABLE} WHERE kpi_key = %s", + (key,) + ) + row = cur.fetchone() + prev = row[0] if row else 0 + if value > prev: + trend, delta = 'up', ((value - prev) / max(prev, 1)) * 100 + elif value < prev: + trend, delta = 'down', ((value - prev) / max(prev, 1)) * 100 + else: + trend, delta = 'flat', 0.0 + + cur.execute( + f"""UPDATE {KPI_TABLE} + SET value = %s, trend = %s, delta_pct = %s, updated_at = %s + WHERE kpi_key = %s""", + (value, trend, round(delta, 2), now, key) + ) + + conn.commit() + + logger.info(f'kpi_writer: {results}') + return results diff --git a/prediction/output/stats_aggregator.py b/prediction/output/stats_aggregator.py new file mode 100644 index 0000000..f2681d8 --- /dev/null +++ b/prediction/output/stats_aggregator.py @@ -0,0 +1,237 @@ +""" +통계 사전 집계 — prediction_stats_hourly/daily/monthly 갱신. + +hourly: 매 분석 사이클마다 (최근 48h 보존) +daily: 매일 01:00 또는 분석 사이클 후 +monthly: daily 합산 +""" +import json +import logging +from datetime import date, datetime, timedelta, timezone +from typing import Optional + +from config import qualified_table +from db.kcgdb import get_conn + +logger = logging.getLogger(__name__) + +STATS_HOURLY = qualified_table('prediction_stats_hourly') +STATS_DAILY = qualified_table('prediction_stats_daily') +STATS_MONTHLY = qualified_table('prediction_stats_monthly') +VAR_TABLE = qualified_table('vessel_analysis_results') +EVENTS_TABLE = qualified_table('prediction_events') +ENF_TABLE = qualified_table('enforcement_records') + + +def _jsonb(d: dict) -> str: + return json.dumps(d, ensure_ascii=False) + + +def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: + """현재 시간 기준 hourly 집계.""" + now = target_hour or datetime.now(timezone.utc) + hour_start = now.replace(minute=0, second=0, microsecond=0) + hour_end = hour_start + timedelta(hours=1) + + with get_conn() as conn: + cur = conn.cursor() + + # 탐지 수 + cur.execute( + f"SELECT COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s", + (hour_start, hour_end) + ) + total = cur.fetchone()[0] or 0 + + # 위험 레벨별 + cur.execute( + f"""SELECT risk_level, COUNT(*) FROM {VAR_TABLE} + WHERE analyzed_at >= %s AND analyzed_at < %s AND risk_level IS NOT NULL + GROUP BY risk_level""", + (hour_start, hour_end) + ) + by_risk = dict(cur.fetchall()) + + # 이벤트 수 + cur.execute( + f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s", + (hour_start, hour_end) + ) + events = cur.fetchone()[0] or 0 + + # CRITICAL 이벤트 + cur.execute( + f"""SELECT COUNT(*) FROM {EVENTS_TABLE} + WHERE occurred_at >= %s AND occurred_at < %s AND level = 'CRITICAL'""", + (hour_start, hour_end) + ) + critical = cur.fetchone()[0] or 0 + + cur.execute( + f"""INSERT INTO {STATS_HOURLY} + (stat_hour, total_detections, by_risk_level, event_count, critical_count, updated_at) + VALUES (%s, %s, %s, %s, %s, %s) + ON CONFLICT (stat_hour) DO UPDATE SET + total_detections = EXCLUDED.total_detections, + by_risk_level = EXCLUDED.by_risk_level, + event_count = EXCLUDED.event_count, + critical_count = EXCLUDED.critical_count, + updated_at = EXCLUDED.updated_at""", + (hour_start, total, _jsonb(by_risk), events, critical, now) + ) + + # 48시간 이전 정리 + cutoff = now - timedelta(hours=48) + cur.execute(f"DELETE FROM {STATS_HOURLY} WHERE stat_hour < %s", (cutoff,)) + + conn.commit() + + result = {'hour': hour_start.isoformat(), 'detections': total, 'events': events} + logger.info(f'stats_aggregator hourly: {result}') + return result + + +def aggregate_daily(target_date: Optional[date] = None) -> dict: + """지정 날짜 기준 daily 집계.""" + d = target_date or date.today() + day_start = datetime(d.year, d.month, d.day, tzinfo=timezone.utc) + day_end = day_start + timedelta(days=1) + now = datetime.now(timezone.utc) + + with get_conn() as conn: + cur = conn.cursor() + + # 총 탐지 + cur.execute( + f"SELECT COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s", + (day_start, day_end) + ) + total = cur.fetchone()[0] or 0 + + # 위반 유형별 (unnest) + cur.execute( + f"""SELECT unnest(violation_categories) AS vt, COUNT(*) + FROM {VAR_TABLE} + WHERE analyzed_at >= %s AND analyzed_at < %s AND violation_categories IS NOT NULL + GROUP BY vt""", + (day_start, day_end) + ) + by_violation = dict(cur.fetchall()) + + # 위험 레벨별 + cur.execute( + f"""SELECT risk_level, COUNT(*) FROM {VAR_TABLE} + WHERE analyzed_at >= %s AND analyzed_at < %s AND risk_level IS NOT NULL + GROUP BY risk_level""", + (day_start, day_end) + ) + by_risk = dict(cur.fetchall()) + + # 이벤트 + cur.execute( + f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s", + (day_start, day_end) + ) + event_count = cur.fetchone()[0] or 0 + + cur.execute( + f"""SELECT COUNT(*) FROM {EVENTS_TABLE} + WHERE occurred_at >= %s AND occurred_at < %s AND level = 'CRITICAL'""", + (day_start, day_end) + ) + critical = cur.fetchone()[0] or 0 + + # 단속 + cur.execute( + f"SELECT COUNT(*) FROM {ENF_TABLE} WHERE enforced_at >= %s AND enforced_at < %s", + (day_start, day_end) + ) + enf_count = cur.fetchone()[0] or 0 + + # 오탐 + cur.execute( + f"""SELECT COUNT(*) FROM {EVENTS_TABLE} + WHERE occurred_at >= %s AND occurred_at < %s AND status = 'FALSE_POSITIVE'""", + (day_start, day_end) + ) + fp = cur.fetchone()[0] or 0 + + # AI 정확도 + accuracy = round((1 - fp / max(event_count, 1)) * 100, 2) if event_count > 0 else None + + cur.execute( + f"""INSERT INTO {STATS_DAILY} + (stat_date, total_detections, by_violation_type, by_risk_level, + event_count, critical_event_count, enforcement_count, + false_positive_count, ai_accuracy_pct, updated_at) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (stat_date) DO UPDATE SET + total_detections = EXCLUDED.total_detections, + by_violation_type = EXCLUDED.by_violation_type, + by_risk_level = EXCLUDED.by_risk_level, + event_count = EXCLUDED.event_count, + critical_event_count = EXCLUDED.critical_event_count, + enforcement_count = EXCLUDED.enforcement_count, + false_positive_count = EXCLUDED.false_positive_count, + ai_accuracy_pct = EXCLUDED.ai_accuracy_pct, + updated_at = EXCLUDED.updated_at""", + (d, total, _jsonb(by_violation), _jsonb(by_risk), + event_count, critical, enf_count, fp, accuracy, now) + ) + conn.commit() + + result = {'date': d.isoformat(), 'detections': total, 'events': event_count, 'accuracy': accuracy} + logger.info(f'stats_aggregator daily: {result}') + return result + + +def aggregate_monthly(target_month: Optional[date] = None) -> dict: + """지정 월 기준 monthly 집계 (daily 합산).""" + d = target_month or date.today().replace(day=1) + month_start = d.replace(day=1) + if month_start.month == 12: + month_end = month_start.replace(year=month_start.year + 1, month=1) + else: + month_end = month_start.replace(month=month_start.month + 1) + now = datetime.now(timezone.utc) + + with get_conn() as conn: + cur = conn.cursor() + + cur.execute( + f"""SELECT + COALESCE(SUM(total_detections), 0), + COALESCE(SUM(event_count), 0), + COALESCE(SUM(critical_event_count), 0), + COALESCE(SUM(enforcement_count), 0), + COALESCE(SUM(false_positive_count), 0) + FROM {STATS_DAILY} + WHERE stat_date >= %s AND stat_date < %s""", + (month_start, month_end) + ) + row = cur.fetchone() + total_det, evt, crit, enf, fp = row + + accuracy = round((1 - fp / max(evt, 1)) * 100, 2) if evt > 0 else None + + cur.execute( + f"""INSERT INTO {STATS_MONTHLY} + (stat_month, total_detections, total_enforcements, + event_count, critical_event_count, false_positive_count, + ai_accuracy_pct, updated_at) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (stat_month) DO UPDATE SET + total_detections = EXCLUDED.total_detections, + total_enforcements = EXCLUDED.total_enforcements, + event_count = EXCLUDED.event_count, + critical_event_count = EXCLUDED.critical_event_count, + false_positive_count = EXCLUDED.false_positive_count, + ai_accuracy_pct = EXCLUDED.ai_accuracy_pct, + updated_at = EXCLUDED.updated_at""", + (month_start, total_det, enf, evt, crit, fp, accuracy, now) + ) + conn.commit() + + result = {'month': month_start.isoformat(), 'detections': total_det, 'enforcements': enf} + logger.info(f'stats_aggregator monthly: {result}') + return result diff --git a/prediction/output/violation_classifier.py b/prediction/output/violation_classifier.py new file mode 100644 index 0000000..82a23ab --- /dev/null +++ b/prediction/output/violation_classifier.py @@ -0,0 +1,87 @@ +""" +위반 유형 라벨링 — 분석 결과에 violation_categories[] 태깅. + +vessel_analysis_results의 각 행에 대해 5개 위반 카테고리를 판정하고 +violation_categories TEXT[] 컬럼을 업데이트합니다. +""" +import logging +from psycopg2.extras import execute_batch + +from config import qualified_table +from db.kcgdb import get_conn + +logger = logging.getLogger(__name__) + +VAR_TABLE = qualified_table('vessel_analysis_results') + + +def classify_violations(result: dict) -> list[str]: + """단일 분석 결과에 대해 위반 유형 리스트 반환.""" + violations = [] + + zone = result.get('zone_code', '') or '' + risk_score = result.get('risk_score', 0) or 0 + is_dark = result.get('is_dark', False) + spoofing = result.get('spoofing_score', 0) or 0 + transship = result.get('transship_suspect', False) + permit = result.get('permit_status', 'UNKNOWN') or 'UNKNOWN' + gap_min = result.get('gap_duration_min', 0) or 0 + + # EEZ 침범 + if zone in ('NLL', 'SPECIAL_FISHING_1', 'SPECIAL_FISHING_2', + 'SPECIAL_FISHING_3', 'SPECIAL_FISHING_4', 'EEZ_KR'): + if permit in ('NONE', 'EXPIRED', 'REVOKED'): + violations.append('EEZ_VIOLATION') + + # 다크베셀 + if is_dark and gap_min > 30: + violations.append('DARK_VESSEL') + + # MMSI 변조 + if spoofing > 0.6: + violations.append('MMSI_TAMPERING') + + # 불법환적 + if transship: + violations.append('ILLEGAL_TRANSSHIP') + + # 어구 불법 (gear_judgment이 있는 경우) + gear_judgment = result.get('gear_judgment', '') or '' + if gear_judgment in ('NO_PERMIT', 'GEAR_MISMATCH', 'ZONE_VIOLATION', 'SEASON_VIOLATION'): + violations.append('ILLEGAL_GEAR') + + # 위험 행동 (다른 위반 없이 고위험) + if not violations and risk_score >= 70: + violations.append('RISK_BEHAVIOR') + + return violations + + +def run_violation_classifier(analysis_results: list[dict]) -> dict: + """ + 분석 결과 리스트에 위반 카테고리를 라벨링하고 DB 업데이트. + + Returns: + { 'classified': int, 'violations_found': int } + """ + updates = [] + violations_found = 0 + + for result in analysis_results: + violations = classify_violations(result) + result_id = result.get('id') + if result_id and violations: + updates.append((violations, result_id)) + violations_found += len(violations) + + if updates: + with get_conn() as conn: + execute_batch( + conn.cursor(), + f"UPDATE {VAR_TABLE} SET violation_categories = %s WHERE id = %s", + updates, + ) + conn.commit() + + logger.info(f'violation_classifier: classified={len(updates)}, violations={violations_found}') + return {'classified': len(updates), 'violations_found': violations_found} diff --git a/prediction/scheduler.py b/prediction/scheduler.py index 46a7dea..4e0b6df 100644 --- a/prediction/scheduler.py +++ b/prediction/scheduler.py @@ -293,7 +293,35 @@ def run_analysis_cycle(): upserted = kcgdb.upsert_results(results) kcgdb.cleanup_old(hours=48) - # 8. Redis에 분석 컨텍스트 캐싱 (채팅용) + # 8. 출력 모듈 (이벤트 생성, 위반 분류, KPI 갱신, 통계 집계, 경보) + try: + from output.violation_classifier import run_violation_classifier + from output.event_generator import run_event_generator + from output.kpi_writer import run_kpi_writer + from output.stats_aggregator import aggregate_hourly, aggregate_daily + from output.alert_dispatcher import run_alert_dispatcher + + from dataclasses import asdict + results_dicts = [asdict(r) for r in results] + # 필드명 매핑 (AnalysisResult → 출력 모듈 기대 형식) + for d in results_dicts: + d['zone_code'] = d.pop('zone', None) + d['gap_duration_min'] = d.get('gap_duration_min', 0) + d['transship_suspect'] = d.pop('is_transship_suspect', False) + d['fleet_is_leader'] = d.pop('is_leader', False) + d['fleet_cluster_id'] = d.pop('cluster_id', None) + d['speed_kn'] = None # 분석 결과에 속도 없음 + run_violation_classifier(results_dicts) + run_event_generator(results_dicts) + run_kpi_writer() + aggregate_hourly() + aggregate_daily() + run_alert_dispatcher() + logger.info('output modules completed') + except Exception as e: + logger.warning('output modules failed (non-fatal): %s', e) + + # 9. Redis에 분석 컨텍스트 캐싱 (채팅용) try: from chat.cache import cache_analysis_context