From 0a4d023c76955ab29c19553c55648ee2b83c8f29 Mon Sep 17 00:00:00 2001 From: htlee Date: Wed, 8 Apr 2026 15:18:18 +0900 Subject: [PATCH] =?UTF-8?q?fix(prediction):=20output=205=EC=A2=85=20?= =?UTF-8?q?=EC=9D=B4=EC=83=81=20=EC=A0=95=EC=83=81=ED=99=94=20(stats/event?= =?UTF-8?q?/lightweight)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 5가지 출력 이상 동시 해결: 1. stats_aggregator (이상 1, 5) - aggregate_hourly에 by_category, by_zone JSON 집계 추가 - hour_start를 KST 기준으로 변경 (대시보드 표기와 boundary 일치) 2. event_generator 룰 정리 (이상 2, 3, 4) - critical_risk 임계값 90→70 (risk.py CRITICAL 분류와 일치) - territorial_sea_violation, contiguous_zone_high_risk, special_zone_entry 신설 (실측 zone_code: TERRITORIAL_SEA/CONTIGUOUS_ZONE/ZONE_*) - 잘못된 NLL/SPECIAL_FISHING_* 룰 제거 - HIGH_RISK_VESSEL 신규 카테고리 (50~69 MEDIUM, 70+ CRITICAL) - break 제거: 한 분석결과가 여러 카테고리에 동시 매칭 가능 3. dedup window prime 분산 (이상 5) - 30/60/120/360분 → 33/67/127/367분 - 5분 사이클 boundary와 LCM 회피하여 정시 일제 만료 패턴 완화 4. lightweight path 신호 보강 (이상 2, 3, 4 근본 해결) - vessel_store._tracks의 24h 누적 궤적으로 dark/spoof/speed_jump 산출 - 6,500 vessels(전체 93%)의 is_dark, spoofing_score가 비로소 채워짐 - compute_lightweight_risk_score에 dark gap, spoofing 가점 추가 (max 60→100 가능, CRITICAL 도달 가능) 시간 처리 원칙 적용: - DB 컬럼은 모두 timestamptz 확인 완료 - aggregate_hourly KST aware datetime 사용 - pandas Timestamp는 source-internal 비교만 (안전) --- prediction/algorithms/risk.py | 22 ++++++++- prediction/output/event_generator.py | 70 ++++++++++++++++++--------- prediction/output/stats_aggregator.py | 56 ++++++++++++++++++--- prediction/scheduler.py | 42 ++++++++++++++-- 4 files changed, 153 insertions(+), 37 deletions(-) diff --git a/prediction/algorithms/risk.py b/prediction/algorithms/risk.py index b4d3505..4f2ef32 100644 --- a/prediction/algorithms/risk.py +++ b/prediction/algorithms/risk.py @@ -11,10 +11,15 @@ def compute_lightweight_risk_score( zone_info: dict, sog: float, is_permitted: Optional[bool] = None, + is_dark: bool = False, + gap_duration_min: int = 0, + spoofing_score: float = 0.0, ) -> Tuple[int, str]: - """위치·허가 이력 기반 경량 위험도 (파이프라인 미통과 선박용). + """위치·허가·다크/스푸핑 기반 경량 위험도 (파이프라인 미통과 선박용). + + pipeline path의 compute_vessel_risk_score와 동일한 임계값(70/50/30)을 사용해 + 분류 결과의 일관성을 유지한다. dark/spoofing 신호를 추가하여 max 100점 도달 가능. - compute_vessel_risk_score의 1번(위치)+4번(허가) 로직과 동일. Returns: (risk_score, risk_level) """ score = 0 @@ -29,6 +34,19 @@ def compute_lightweight_risk_score( if is_permitted is not None and not is_permitted: score += 25 + # 2. 다크 베셀 (최대 25점) + if is_dark: + if gap_duration_min >= 60: + score += 25 + elif gap_duration_min >= 30: + score += 10 + + # 3. 스푸핑 (최대 15점) + if spoofing_score > 0.7: + score += 15 + elif spoofing_score > 0.5: + score += 8 + # 4. 허가 이력 (최대 20점) if is_permitted is not None and not is_permitted: score += 20 diff --git a/prediction/output/event_generator.py b/prediction/output/event_generator.py index 17a5a5c..887959a 100644 --- a/prediction/output/event_generator.py +++ b/prediction/output/event_generator.py @@ -19,36 +19,51 @@ logger = logging.getLogger(__name__) EVENTS_TABLE = qualified_table('prediction_events') -# 카테고리별 dedup 윈도우 (분) +# 카테고리별 dedup 윈도우 (분). +# 사이클이 5분 간격이므로 5의 배수를 피해서 boundary 일제 만료 패턴을 회피한다. DEDUP_WINDOWS = { - 'EEZ_INTRUSION': 30, - 'DARK_VESSEL': 120, - 'FLEET_CLUSTER': 360, - 'ILLEGAL_TRANSSHIP': 60, - 'MMSI_TAMPERING': 30, - 'AIS_LOSS': 120, - 'SPEED_ANOMALY': 60, - 'ZONE_DEPARTURE': 120, - 'GEAR_ILLEGAL': 360, - 'AIS_RESUME': 60, + 'EEZ_INTRUSION': 33, + 'DARK_VESSEL': 127, + 'FLEET_CLUSTER': 367, + 'ILLEGAL_TRANSSHIP': 67, + 'MMSI_TAMPERING': 33, + 'AIS_LOSS': 127, + 'SPEED_ANOMALY': 67, + 'ZONE_DEPARTURE': 127, + 'GEAR_ILLEGAL': 367, + 'AIS_RESUME': 67, + 'HIGH_RISK_VESSEL': 67, } # 이벤트 생성 룰 +# 한 분석결과가 여러 룰에 매칭되면 모두 생성한다 (카테고리별 dedup_key가 분리되어 안전). +# zone_code 실측값: EEZ_OR_BEYOND/ZONE_II/III/IV/CONTIGUOUS_ZONE/TERRITORIAL_SEA +# (algorithms.location.classify_zone 결과) RULES = [ { - 'name': 'critical_risk', - 'condition': lambda r: r.get('risk_score', 0) >= 90, + # 영해 침범 — 가장 심각 + 'name': 'territorial_sea_violation', + 'condition': lambda r: r.get('zone_code') == 'TERRITORIAL_SEA', 'level': 'CRITICAL', 'category': 'EEZ_INTRUSION', - 'title_fn': lambda r: f"고위험 선박 탐지 (위험도 {r.get('risk_score', 0)})", + 'title_fn': lambda r: f"영해 침범 탐지 (위험도 {r.get('risk_score', 0)})", }, { - 'name': 'eez_violation', - 'condition': lambda r: r.get('zone_code', '') in ('NLL', 'SPECIAL_FISHING_1', 'SPECIAL_FISHING_2') - and r.get('risk_score', 0) >= 70, - 'level': 'CRITICAL', + # 접속수역 + 고위험 + 'name': 'contiguous_zone_high_risk', + 'condition': lambda r: r.get('zone_code') == 'CONTIGUOUS_ZONE' + and (r.get('risk_score', 0) or 0) >= 50, + 'level': 'HIGH', 'category': 'EEZ_INTRUSION', - 'title_fn': lambda r: f"EEZ 침범 탐지 ({r.get('zone_code', '')})", + 'title_fn': lambda r: f"접속수역 침입 (위험도 {r.get('risk_score', 0)})", + }, + { + # 종합 위험도 CRITICAL — risk.py 분류와 동일 임계값 + 'name': 'critical_risk', + 'condition': lambda r: (r.get('risk_score', 0) or 0) >= 70, + 'level': 'CRITICAL', + 'category': 'HIGH_RISK_VESSEL', + 'title_fn': lambda r: f"고위험 선박 탐지 (위험도 {r.get('risk_score', 0)})", }, { 'name': 'dark_vessel_long', @@ -79,10 +94,20 @@ RULES = [ 'title_fn': lambda r: f"선단 밀집 감지 (클러스터 {r.get('fleet_cluster_id')})", }, { - 'name': 'high_risk', - 'condition': lambda r: r.get('risk_level') == 'HIGH' and r.get('risk_score', 0) >= 60, + # 특정수역(ZONE_*) 진입 — 운영자 모니터링용 + 'name': 'special_zone_entry', + 'condition': lambda r: (r.get('zone_code') or '').startswith('ZONE_') + and (r.get('risk_score', 0) or 0) >= 40, 'level': 'MEDIUM', 'category': 'ZONE_DEPARTURE', + 'title_fn': lambda r: f"특정수역 진입 ({r.get('zone_code')}, 위험도 {r.get('risk_score', 0)})", + }, + { + # 고위험 행동 패턴 (risk_level=HIGH 이상은 위 critical_risk가 잡고, 50~69점만 여기에) + 'name': 'high_risk', + 'condition': lambda r: 50 <= (r.get('risk_score', 0) or 0) < 70, + 'level': 'MEDIUM', + 'category': 'HIGH_RISK_VESSEL', 'title_fn': lambda r: f"위험 행동 패턴 (위험도 {r.get('risk_score', 0)})", }, ] @@ -181,7 +206,8 @@ def run_event_generator(analysis_results: list[dict]) -> dict: dedup_key, )) generated += 1 - break # 한 분석결과당 최고 우선순위 룰 1개만 + # break 제거: 한 분석결과가 여러 룰에 매칭되면 모두 생성 + # (카테고리별 dedup_key가 분리되어 안전) if events_to_insert: execute_values( diff --git a/prediction/output/stats_aggregator.py b/prediction/output/stats_aggregator.py index 8564786..2f38f85 100644 --- a/prediction/output/stats_aggregator.py +++ b/prediction/output/stats_aggregator.py @@ -31,10 +31,21 @@ def _jsonb(d: dict) -> str: def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: - """현재 시간 기준 hourly 집계.""" - now = target_hour or datetime.now(timezone.utc) - hour_start = now.replace(minute=0, second=0, microsecond=0) + """현재 시간 기준 hourly 집계 (KST hour boundary). + + DB 컬럼은 모두 timestamptz이므로 aware datetime이면 안전 비교됨. + 운영자/대시보드 표기와 stat_hour boundary가 일치하도록 KST 기준. + """ + if target_hour is not None: + # 외부에서 특정 시점을 지정한 경우 KST로 정규화 + if target_hour.tzinfo is None: + target_hour = target_hour.replace(tzinfo=_KST) + now_kst = target_hour.astimezone(_KST) + else: + now_kst = datetime.now(_KST) + hour_start = now_kst.replace(minute=0, second=0, microsecond=0) hour_end = hour_start + timedelta(hours=1) + updated_at = datetime.now(timezone.utc) with get_conn() as conn: cur = conn.cursor() @@ -55,6 +66,15 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: ) by_risk = dict(cur.fetchall()) + # zone별 (vessel_analysis_results.zone_code) + cur.execute( + f"""SELECT zone_code, COUNT(*) FROM {VAR_TABLE} + WHERE analyzed_at >= %s AND analyzed_at < %s AND zone_code IS NOT NULL + GROUP BY zone_code""", + (hour_start, hour_end) + ) + by_zone = dict(cur.fetchall()) + # 이벤트 수 cur.execute( f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s", @@ -62,6 +82,15 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: ) events = cur.fetchone()[0] or 0 + # 카테고리별 이벤트 (prediction_events.category) + cur.execute( + f"""SELECT category, COUNT(*) FROM {EVENTS_TABLE} + WHERE occurred_at >= %s AND occurred_at < %s AND category IS NOT NULL + GROUP BY category""", + (hour_start, hour_end) + ) + by_category = dict(cur.fetchall()) + # CRITICAL 이벤트 cur.execute( f"""SELECT COUNT(*) FROM {EVENTS_TABLE} @@ -72,24 +101,35 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: cur.execute( f"""INSERT INTO {STATS_HOURLY} - (stat_hour, total_detections, by_risk_level, event_count, critical_count, updated_at) - VALUES (%s, %s, %s, %s, %s, %s) + (stat_hour, total_detections, by_category, by_zone, by_risk_level, + event_count, critical_count, updated_at) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (stat_hour) DO UPDATE SET total_detections = EXCLUDED.total_detections, + by_category = EXCLUDED.by_category, + by_zone = EXCLUDED.by_zone, by_risk_level = EXCLUDED.by_risk_level, event_count = EXCLUDED.event_count, critical_count = EXCLUDED.critical_count, updated_at = EXCLUDED.updated_at""", - (hour_start, total, _jsonb(by_risk), events, critical, now) + (hour_start, total, _jsonb(by_category), _jsonb(by_zone), + _jsonb(by_risk), events, critical, updated_at) ) # 48시간 이전 정리 - cutoff = now - timedelta(hours=48) + cutoff = updated_at - timedelta(hours=48) cur.execute(f"DELETE FROM {STATS_HOURLY} WHERE stat_hour < %s", (cutoff,)) conn.commit() - result = {'hour': hour_start.isoformat(), 'detections': total, 'events': events} + result = { + 'hour': hour_start.isoformat(), + 'detections': total, + 'events': events, + 'critical': critical, + 'categories': len(by_category), + 'zones': len(by_zone), + } logger.info(f'stats_aggregator hourly: {result}') return result diff --git a/prediction/scheduler.py b/prediction/scheduler.py index 4e0b6df..b489d99 100644 --- a/prediction/scheduler.py +++ b/prediction/scheduler.py @@ -213,6 +213,7 @@ def run_analysis_cycle(): )) # ── 5.5 경량 분석 — 파이프라인 미통과 412* 선박 ── + # vessel_store._tracks의 24h 누적 궤적을 직접 활용하여 dark/spoof 신호도 산출. from algorithms.risk import compute_lightweight_risk_score pipeline_mmsis = {c['mmsi'] for c in classifications} @@ -222,6 +223,8 @@ def run_analysis_cycle(): now = datetime.now(timezone.utc) all_positions = vessel_store.get_all_latest_positions() lw_count = 0 + lw_dark = 0 + lw_spoof = 0 for mmsi in lightweight_mmsis: pos = all_positions.get(mmsi) if pos is None or pos.get('lat') is None: @@ -239,9 +242,35 @@ def run_analysis_cycle(): else: state = 'SAILING' + # 24h 누적 궤적으로 dark/spoofing 산출 (vessel_store._tracks 직접 접근) + df_v = vessel_store._tracks.get(mmsi) + dark = False + gap_min = 0 + spoof_score = 0.0 + speed_jumps = 0 + if df_v is not None and len(df_v) >= 2: + try: + dark, gap_min = is_dark_vessel(df_v) + except Exception: + pass + try: + spoof_score = compute_spoofing_score(df_v) + except Exception: + pass + try: + speed_jumps = count_speed_jumps(df_v) + except Exception: + pass + if dark: + lw_dark += 1 + if spoof_score > 0.5: + lw_spoof += 1 + is_permitted = vessel_store.is_permitted(mmsi) risk_score, risk_level = compute_lightweight_risk_score( zone_info, sog, is_permitted=is_permitted, + is_dark=dark, gap_duration_min=gap_min, + spoofing_score=spoof_score, ) # BD-09 오프셋은 중국 선박이므로 제외 (412* = 중국) @@ -256,11 +285,11 @@ def run_analysis_cycle(): activity_state=state, ucaf_score=0.0, ucft_score=0.0, - is_dark=False, - gap_duration_min=0, - spoofing_score=0.0, + is_dark=dark, + gap_duration_min=gap_min, + spoofing_score=spoof_score, bd09_offset_m=0.0, - speed_jump_count=0, + speed_jump_count=speed_jumps, cluster_id=-1, cluster_size=0, is_leader=False, @@ -272,7 +301,10 @@ def run_analysis_cycle(): transship_duration_min=0, )) lw_count += 1 - logger.info('lightweight analysis: %d vessels', lw_count) + logger.info( + 'lightweight analysis: %d vessels (dark=%d, spoof>0.5=%d)', + lw_count, lw_dark, lw_spoof, + ) # 6. 환적 의심 탐지 (pair_history 모듈 레벨로 사이클 간 유지) from algorithms.transshipment import detect_transshipment