From 0a4d023c76955ab29c19553c55648ee2b83c8f29 Mon Sep 17 00:00:00 2001 From: htlee Date: Wed, 8 Apr 2026 15:18:18 +0900 Subject: [PATCH 1/5] =?UTF-8?q?fix(prediction):=20output=205=EC=A2=85=20?= =?UTF-8?q?=EC=9D=B4=EC=83=81=20=EC=A0=95=EC=83=81=ED=99=94=20(stats/event?= =?UTF-8?q?/lightweight)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 5가지 출력 이상 동시 해결: 1. stats_aggregator (이상 1, 5) - aggregate_hourly에 by_category, by_zone JSON 집계 추가 - hour_start를 KST 기준으로 변경 (대시보드 표기와 boundary 일치) 2. event_generator 룰 정리 (이상 2, 3, 4) - critical_risk 임계값 90→70 (risk.py CRITICAL 분류와 일치) - territorial_sea_violation, contiguous_zone_high_risk, special_zone_entry 신설 (실측 zone_code: TERRITORIAL_SEA/CONTIGUOUS_ZONE/ZONE_*) - 잘못된 NLL/SPECIAL_FISHING_* 룰 제거 - HIGH_RISK_VESSEL 신규 카테고리 (50~69 MEDIUM, 70+ CRITICAL) - break 제거: 한 분석결과가 여러 카테고리에 동시 매칭 가능 3. dedup window prime 분산 (이상 5) - 30/60/120/360분 → 33/67/127/367분 - 5분 사이클 boundary와 LCM 회피하여 정시 일제 만료 패턴 완화 4. lightweight path 신호 보강 (이상 2, 3, 4 근본 해결) - vessel_store._tracks의 24h 누적 궤적으로 dark/spoof/speed_jump 산출 - 6,500 vessels(전체 93%)의 is_dark, spoofing_score가 비로소 채워짐 - compute_lightweight_risk_score에 dark gap, spoofing 가점 추가 (max 60→100 가능, CRITICAL 도달 가능) 시간 처리 원칙 적용: - DB 컬럼은 모두 timestamptz 확인 완료 - aggregate_hourly KST aware datetime 사용 - pandas Timestamp는 source-internal 비교만 (안전) --- prediction/algorithms/risk.py | 22 ++++++++- prediction/output/event_generator.py | 70 ++++++++++++++++++--------- prediction/output/stats_aggregator.py | 56 ++++++++++++++++++--- prediction/scheduler.py | 42 ++++++++++++++-- 4 files changed, 153 insertions(+), 37 deletions(-) diff --git a/prediction/algorithms/risk.py b/prediction/algorithms/risk.py index b4d3505..4f2ef32 100644 --- a/prediction/algorithms/risk.py +++ b/prediction/algorithms/risk.py @@ -11,10 +11,15 @@ def compute_lightweight_risk_score( zone_info: dict, sog: float, is_permitted: Optional[bool] = None, + is_dark: bool = False, + 
gap_duration_min: int = 0, + spoofing_score: float = 0.0, ) -> Tuple[int, str]: - """위치·허가 이력 기반 경량 위험도 (파이프라인 미통과 선박용). + """위치·허가·다크/스푸핑 기반 경량 위험도 (파이프라인 미통과 선박용). + + pipeline path의 compute_vessel_risk_score와 동일한 임계값(70/50/30)을 사용해 + 분류 결과의 일관성을 유지한다. dark/spoofing 신호를 추가하여 max 100점 도달 가능. - compute_vessel_risk_score의 1번(위치)+4번(허가) 로직과 동일. Returns: (risk_score, risk_level) """ score = 0 @@ -29,6 +34,19 @@ def compute_lightweight_risk_score( if is_permitted is not None and not is_permitted: score += 25 + # 2. 다크 베셀 (최대 25점) + if is_dark: + if gap_duration_min >= 60: + score += 25 + elif gap_duration_min >= 30: + score += 10 + + # 3. 스푸핑 (최대 15점) + if spoofing_score > 0.7: + score += 15 + elif spoofing_score > 0.5: + score += 8 + # 4. 허가 이력 (최대 20점) if is_permitted is not None and not is_permitted: score += 20 diff --git a/prediction/output/event_generator.py b/prediction/output/event_generator.py index 17a5a5c..887959a 100644 --- a/prediction/output/event_generator.py +++ b/prediction/output/event_generator.py @@ -19,36 +19,51 @@ logger = logging.getLogger(__name__) EVENTS_TABLE = qualified_table('prediction_events') -# 카테고리별 dedup 윈도우 (분) +# 카테고리별 dedup 윈도우 (분). +# 사이클이 5분 간격이므로 5의 배수를 피해서 boundary 일제 만료 패턴을 회피한다. DEDUP_WINDOWS = { - 'EEZ_INTRUSION': 30, - 'DARK_VESSEL': 120, - 'FLEET_CLUSTER': 360, - 'ILLEGAL_TRANSSHIP': 60, - 'MMSI_TAMPERING': 30, - 'AIS_LOSS': 120, - 'SPEED_ANOMALY': 60, - 'ZONE_DEPARTURE': 120, - 'GEAR_ILLEGAL': 360, - 'AIS_RESUME': 60, + 'EEZ_INTRUSION': 33, + 'DARK_VESSEL': 127, + 'FLEET_CLUSTER': 367, + 'ILLEGAL_TRANSSHIP': 67, + 'MMSI_TAMPERING': 33, + 'AIS_LOSS': 127, + 'SPEED_ANOMALY': 67, + 'ZONE_DEPARTURE': 127, + 'GEAR_ILLEGAL': 367, + 'AIS_RESUME': 67, + 'HIGH_RISK_VESSEL': 67, } # 이벤트 생성 룰 +# 한 분석결과가 여러 룰에 매칭되면 모두 생성한다 (카테고리별 dedup_key가 분리되어 안전). 
+# zone_code 실측값: EEZ_OR_BEYOND/ZONE_II/III/IV/CONTIGUOUS_ZONE/TERRITORIAL_SEA +# (algorithms.location.classify_zone 결과) RULES = [ { - 'name': 'critical_risk', - 'condition': lambda r: r.get('risk_score', 0) >= 90, + # 영해 침범 — 가장 심각 + 'name': 'territorial_sea_violation', + 'condition': lambda r: r.get('zone_code') == 'TERRITORIAL_SEA', 'level': 'CRITICAL', 'category': 'EEZ_INTRUSION', - 'title_fn': lambda r: f"고위험 선박 탐지 (위험도 {r.get('risk_score', 0)})", + 'title_fn': lambda r: f"영해 침범 탐지 (위험도 {r.get('risk_score', 0)})", }, { - 'name': 'eez_violation', - 'condition': lambda r: r.get('zone_code', '') in ('NLL', 'SPECIAL_FISHING_1', 'SPECIAL_FISHING_2') - and r.get('risk_score', 0) >= 70, - 'level': 'CRITICAL', + # 접속수역 + 고위험 + 'name': 'contiguous_zone_high_risk', + 'condition': lambda r: r.get('zone_code') == 'CONTIGUOUS_ZONE' + and (r.get('risk_score', 0) or 0) >= 50, + 'level': 'HIGH', 'category': 'EEZ_INTRUSION', - 'title_fn': lambda r: f"EEZ 침범 탐지 ({r.get('zone_code', '')})", + 'title_fn': lambda r: f"접속수역 침입 (위험도 {r.get('risk_score', 0)})", + }, + { + # 종합 위험도 CRITICAL — risk.py 분류와 동일 임계값 + 'name': 'critical_risk', + 'condition': lambda r: (r.get('risk_score', 0) or 0) >= 70, + 'level': 'CRITICAL', + 'category': 'HIGH_RISK_VESSEL', + 'title_fn': lambda r: f"고위험 선박 탐지 (위험도 {r.get('risk_score', 0)})", }, { 'name': 'dark_vessel_long', @@ -79,10 +94,20 @@ RULES = [ 'title_fn': lambda r: f"선단 밀집 감지 (클러스터 {r.get('fleet_cluster_id')})", }, { - 'name': 'high_risk', - 'condition': lambda r: r.get('risk_level') == 'HIGH' and r.get('risk_score', 0) >= 60, + # 특정수역(ZONE_*) 진입 — 운영자 모니터링용 + 'name': 'special_zone_entry', + 'condition': lambda r: (r.get('zone_code') or '').startswith('ZONE_') + and (r.get('risk_score', 0) or 0) >= 40, 'level': 'MEDIUM', 'category': 'ZONE_DEPARTURE', + 'title_fn': lambda r: f"특정수역 진입 ({r.get('zone_code')}, 위험도 {r.get('risk_score', 0)})", + }, + { + # 고위험 행동 패턴 (risk_level=HIGH 이상은 위 critical_risk가 잡고, 50~69점만 여기에) + 'name': 'high_risk', + 
'condition': lambda r: 50 <= (r.get('risk_score', 0) or 0) < 70, + 'level': 'MEDIUM', + 'category': 'HIGH_RISK_VESSEL', 'title_fn': lambda r: f"위험 행동 패턴 (위험도 {r.get('risk_score', 0)})", }, ] @@ -181,7 +206,8 @@ def run_event_generator(analysis_results: list[dict]) -> dict: dedup_key, )) generated += 1 - break # 한 분석결과당 최고 우선순위 룰 1개만 + # break 제거: 한 분석결과가 여러 룰에 매칭되면 모두 생성 + # (카테고리별 dedup_key가 분리되어 안전) if events_to_insert: execute_values( diff --git a/prediction/output/stats_aggregator.py b/prediction/output/stats_aggregator.py index 8564786..2f38f85 100644 --- a/prediction/output/stats_aggregator.py +++ b/prediction/output/stats_aggregator.py @@ -31,10 +31,21 @@ def _jsonb(d: dict) -> str: def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: - """현재 시간 기준 hourly 집계.""" - now = target_hour or datetime.now(timezone.utc) - hour_start = now.replace(minute=0, second=0, microsecond=0) + """현재 시간 기준 hourly 집계 (KST hour boundary). + + DB 컬럼은 모두 timestamptz이므로 aware datetime이면 안전 비교됨. + 운영자/대시보드 표기와 stat_hour boundary가 일치하도록 KST 기준. 
+ """ + if target_hour is not None: + # 외부에서 특정 시점을 지정한 경우 KST로 정규화 + if target_hour.tzinfo is None: + target_hour = target_hour.replace(tzinfo=_KST) + now_kst = target_hour.astimezone(_KST) + else: + now_kst = datetime.now(_KST) + hour_start = now_kst.replace(minute=0, second=0, microsecond=0) hour_end = hour_start + timedelta(hours=1) + updated_at = datetime.now(timezone.utc) with get_conn() as conn: cur = conn.cursor() @@ -55,6 +66,15 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: ) by_risk = dict(cur.fetchall()) + # zone별 (vessel_analysis_results.zone_code) + cur.execute( + f"""SELECT zone_code, COUNT(*) FROM {VAR_TABLE} + WHERE analyzed_at >= %s AND analyzed_at < %s AND zone_code IS NOT NULL + GROUP BY zone_code""", + (hour_start, hour_end) + ) + by_zone = dict(cur.fetchall()) + # 이벤트 수 cur.execute( f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s", @@ -62,6 +82,15 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: ) events = cur.fetchone()[0] or 0 + # 카테고리별 이벤트 (prediction_events.category) + cur.execute( + f"""SELECT category, COUNT(*) FROM {EVENTS_TABLE} + WHERE occurred_at >= %s AND occurred_at < %s AND category IS NOT NULL + GROUP BY category""", + (hour_start, hour_end) + ) + by_category = dict(cur.fetchall()) + # CRITICAL 이벤트 cur.execute( f"""SELECT COUNT(*) FROM {EVENTS_TABLE} @@ -72,24 +101,35 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: cur.execute( f"""INSERT INTO {STATS_HOURLY} - (stat_hour, total_detections, by_risk_level, event_count, critical_count, updated_at) - VALUES (%s, %s, %s, %s, %s, %s) + (stat_hour, total_detections, by_category, by_zone, by_risk_level, + event_count, critical_count, updated_at) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (stat_hour) DO UPDATE SET total_detections = EXCLUDED.total_detections, + by_category = EXCLUDED.by_category, + by_zone = EXCLUDED.by_zone, by_risk_level = EXCLUDED.by_risk_level, 
event_count = EXCLUDED.event_count, critical_count = EXCLUDED.critical_count, updated_at = EXCLUDED.updated_at""", - (hour_start, total, _jsonb(by_risk), events, critical, now) + (hour_start, total, _jsonb(by_category), _jsonb(by_zone), + _jsonb(by_risk), events, critical, updated_at) ) # 48시간 이전 정리 - cutoff = now - timedelta(hours=48) + cutoff = updated_at - timedelta(hours=48) cur.execute(f"DELETE FROM {STATS_HOURLY} WHERE stat_hour < %s", (cutoff,)) conn.commit() - result = {'hour': hour_start.isoformat(), 'detections': total, 'events': events} + result = { + 'hour': hour_start.isoformat(), + 'detections': total, + 'events': events, + 'critical': critical, + 'categories': len(by_category), + 'zones': len(by_zone), + } logger.info(f'stats_aggregator hourly: {result}') return result diff --git a/prediction/scheduler.py b/prediction/scheduler.py index 4e0b6df..b489d99 100644 --- a/prediction/scheduler.py +++ b/prediction/scheduler.py @@ -213,6 +213,7 @@ def run_analysis_cycle(): )) # ── 5.5 경량 분석 — 파이프라인 미통과 412* 선박 ── + # vessel_store._tracks의 24h 누적 궤적을 직접 활용하여 dark/spoof 신호도 산출. 
from algorithms.risk import compute_lightweight_risk_score pipeline_mmsis = {c['mmsi'] for c in classifications} @@ -222,6 +223,8 @@ def run_analysis_cycle(): now = datetime.now(timezone.utc) all_positions = vessel_store.get_all_latest_positions() lw_count = 0 + lw_dark = 0 + lw_spoof = 0 for mmsi in lightweight_mmsis: pos = all_positions.get(mmsi) if pos is None or pos.get('lat') is None: @@ -239,9 +242,35 @@ def run_analysis_cycle(): else: state = 'SAILING' + # 24h 누적 궤적으로 dark/spoofing 산출 (vessel_store._tracks 직접 접근) + df_v = vessel_store._tracks.get(mmsi) + dark = False + gap_min = 0 + spoof_score = 0.0 + speed_jumps = 0 + if df_v is not None and len(df_v) >= 2: + try: + dark, gap_min = is_dark_vessel(df_v) + except Exception: + pass + try: + spoof_score = compute_spoofing_score(df_v) + except Exception: + pass + try: + speed_jumps = count_speed_jumps(df_v) + except Exception: + pass + if dark: + lw_dark += 1 + if spoof_score > 0.5: + lw_spoof += 1 + is_permitted = vessel_store.is_permitted(mmsi) risk_score, risk_level = compute_lightweight_risk_score( zone_info, sog, is_permitted=is_permitted, + is_dark=dark, gap_duration_min=gap_min, + spoofing_score=spoof_score, ) # BD-09 오프셋은 중국 선박이므로 제외 (412* = 중국) @@ -256,11 +285,11 @@ def run_analysis_cycle(): activity_state=state, ucaf_score=0.0, ucft_score=0.0, - is_dark=False, - gap_duration_min=0, - spoofing_score=0.0, + is_dark=dark, + gap_duration_min=gap_min, + spoofing_score=spoof_score, bd09_offset_m=0.0, - speed_jump_count=0, + speed_jump_count=speed_jumps, cluster_id=-1, cluster_size=0, is_leader=False, @@ -272,7 +301,10 @@ def run_analysis_cycle(): transship_duration_min=0, )) lw_count += 1 - logger.info('lightweight analysis: %d vessels', lw_count) + logger.info( + 'lightweight analysis: %d vessels (dark=%d, spoof>0.5=%d)', + lw_count, lw_dark, lw_spoof, + ) # 6. 
환적 의심 탐지 (pair_history 모듈 레벨로 사이클 간 유지) from algorithms.transshipment import detect_transshipment From 2e5d55a27f6e63d9e46e7bb5bbb878d4c2215079 Mon Sep 17 00:00:00 2001 From: htlee Date: Wed, 8 Apr 2026 16:11:02 +0900 Subject: [PATCH 2/5] =?UTF-8?q?fix(prediction):=20dark=20=ED=8C=90?= =?UTF-8?q?=EC=A0=95=EC=97=90=20=ED=95=9C=EA=B5=AD=20AIS=20=EC=88=98?= =?UTF-8?q?=EC=8B=A0=20=EC=98=81=EC=97=AD=20=ED=95=84=ED=84=B0=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 16:00 cron 1차 분석 결과, lightweight path가 6,500척 중 5,250척(80%)을 dark로 판정. 좌표 검증 결과 모두 30~37°N/122~125°E (동중국해/서해)로 한국 AIS 수신소 도달 한계 영역에 위치하여 정상 운항 중에도 20~24h 통째로 수신이 끊기는 자연 gap이 발생. 핫픽스: lightweight path에서 dark 판정 직후 마지막 위치가 북위 32~39.5, 동경 124~132 (한반도 + EEZ + 접속수역 여유 포함) 밖이면 dark를 False로 강제. 한국 측 관심 영역의 dark 탐지는 그대로 유지. 근본 개편(STATIONARY 정박 필터, 진입 후 단절 패턴, gap 임계값 재조정 등)은 12시간 추적 데이터 수집 후 내일 진행. --- prediction/scheduler.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/prediction/scheduler.py b/prediction/scheduler.py index b489d99..e8d4255 100644 --- a/prediction/scheduler.py +++ b/prediction/scheduler.py @@ -261,6 +261,15 @@ def run_analysis_cycle(): speed_jumps = count_speed_jumps(df_v) except Exception: pass + # 핫픽스 (2026-04-08): 한국 AIS 수신 가능 영역 밖에서의 dark 판정은 오탐. + # 412* 중국 선박이 자국 EEZ로 깊이 들어가면(~124°E 서쪽) 한국 수신소 + # 도달 한계로 자연 gap 발생. 해당 영역 밖은 dark에서 제외한다. 
+ # 영역: 북위 32~39.5, 동경 124~132 (한반도 + EEZ + 접속수역 여유 포함) + if dark: + in_kr_reception = (124.0 <= lon <= 132.0) and (32.0 <= lat <= 39.5) + if not in_kr_reception: + dark = False + gap_min = 0 if dark: lw_dark += 1 if spoof_score > 0.5: From e5d123e4c59c27c655400638709a8a0c762f4f41 Mon Sep 17 00:00:00 2001 From: htlee Date: Thu, 9 Apr 2026 07:42:15 +0900 Subject: [PATCH 3/5] =?UTF-8?q?feat(prediction):=20dark=20=EC=9D=98?= =?UTF-8?q?=EC=8B=AC=20=EC=A0=90=EC=88=98=ED=99=94=20+=20transship=20?= =?UTF-8?q?=EB=B2=A0=ED=85=8C=EB=9E=91=20=EA=B4=80=EC=A0=90=20=EC=9E=AC?= =?UTF-8?q?=EC=84=A4=EA=B3=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 12h 누적 분석 결과 dark/transship이 운영 불가 수준으로 판정되어 탐지 철학을 근본부터 전환. ## dark 재설계: 넓은 탐지 + 의도적 OFF 의심 점수화 기존 "필터 제외" 방식에서 "넓게 기록 + 점수 산출 + 등급별 알람"으로 전환. 해경 베테랑 관점의 8가지 패턴을 가점 합산하여 0~100점 산출. - P1 이동 중 OFF (gap 직전 SOG > 2kn) - P2 민감 수역 경계 근처 OFF (영해/접속수역/특정조업수역) - P3 반복 이력 (7일 내 재발) — 가장 강력 - P4 gap 후 이동거리 비정상 (은폐 이동) - P5 주간 조업 시간 OFF - P6 gap 직전 이상 행동 (teleport/급변) - P7 무허가 선박 가점 - P8 장기 gap (3h/6h 구간별) - 감점: gap 시작 위치가 한국 AIS 수신 커버리지 밖 완전 제외: - 어구 AIS (GEAR_PATTERN 매칭, fleet_tracker SSOT) - 한국 선박 (MMSI 440*, 441*) — 해경 관할 아님 등급: CRITICAL(70+) / HIGH(50~69) / WATCH(30~49) / NONE 이벤트는 HIGH 이상만 생성 (WATCH는 DB 저장만). 신규 함수: - algorithms/dark_vessel.py: analyze_dark_pattern, compute_dark_suspicion - scheduler.py: _is_dark_excluded, _fetch_dark_history (사이클당 1회 7일 이력 일괄 조회) pipeline path + lightweight path 모두 동일 로직 적용. 결과는 features JSONB에 {dark_suspicion_score, dark_patterns, dark_tier, dark_history_7d, dark_history_24h, gap_start_*} 저장. ## transship 재설계: 베테랑 함정근무자 기준 한정된 함정 자원으로 단속 출동을 결정할 수 있는 신뢰도 확보. 
상수 재조정: - SOG_THRESHOLD_KN: 2.0 → 1.0 (완전 정박만) - PROXIMITY_DEG: 0.001 → 0.0007 (~77m) - SUSPECT_DURATION_MIN: 60 → 45 (gap tolerance 있음) - PAIR_EXPIRY_MIN: 120 → 180 - GAP_TOLERANCE_CYCLES: 2 신규 (GPS 노이즈 완화) 필수 조건 (모두 충족): - 한국 EEZ 관할 수역 이내 - 환적 불가 선종 제외 (passenger/military/tanker/pilot/tug/sar) - 어구 AIS 양쪽 제외 - 45분 이상 지속 (miss_count 2 사이클까지 용인) 점수 체계 (base 40): - 야간(KST 20~04): +15 - 무허가 가점: +20 - COG 편차 > 45°: +20 (나란히 가는 선단 배제) - 지속 ≥ 90분: +20 - 영해/접속수역 위치: +15 등급: CRITICAL(90+) / HIGH(70~89) / WATCH(50~69) WATCH는 저장 없이 로그만. HIGH/CRITICAL만 이벤트. pair_history 구조 확장: - 기존: {(a,b): datetime} - 신규: {(a,b): {'first_seen', 'last_seen', 'miss_count', 'last_lat/lon/cog_a/cog_b'}} - miss_count > GAP_TOLERANCE_CYCLES면 삭제 (즉시 리셋 아님) ## event_generator 룰 교체 - dark_vessel_long 룰 제거 → dark_critical, dark_high (features.dark_tier 기반) - transship 룰 제거 → transship_critical, transship_high (features.transship_tier 기반) - DEDUP: ILLEGAL_TRANSSHIP 67→181, DARK_VESSEL 127→131, ZONE_DEPARTURE 127→89 ## 공통 정리 - scheduler.py의 _gear_re 삭제, fleet_tracker.GEAR_PATTERN 단일 SSOT로 통합 --- prediction/algorithms/dark_vessel.py | 274 ++++++++++++++++++ prediction/algorithms/transshipment.py | 374 ++++++++++++++++++++----- prediction/output/event_generator.py | 47 +++- prediction/scheduler.py | 249 ++++++++++++---- 4 files changed, 813 insertions(+), 131 deletions(-) diff --git a/prediction/algorithms/dark_vessel.py b/prediction/algorithms/dark_vessel.py index 9e8b9f2..656cbd0 100644 --- a/prediction/algorithms/dark_vessel.py +++ b/prediction/algorithms/dark_vessel.py @@ -1,3 +1,5 @@ +from typing import Callable, Optional + import pandas as pd from algorithms.location import haversine_nm @@ -5,6 +7,10 @@ GAP_SUSPICIOUS_SEC = 1800 # 30분 GAP_HIGH_SUSPICIOUS_SEC = 3600 # 1시간 GAP_VIOLATION_SEC = 86400 # 24시간 +# 한국 AIS 수신 가능 추정 영역 (한반도 + EEZ + 접속수역 여유) +_KR_COVERAGE_LAT = (32.0, 39.5) +_KR_COVERAGE_LON = (124.0, 132.0) + def detect_ais_gaps(df_vessel: pd.DataFrame) -> list[dict]: """AIS 수신 기록에서 소실 구간 
추출.""" @@ -57,3 +63,271 @@ def is_dark_vessel(df_vessel: pd.DataFrame) -> tuple[bool, int]: max_gap_min = max(g['gap_min'] for g in gaps) is_dark = max_gap_min >= 30 # 30분 이상 소실 return is_dark, int(max_gap_min) + + +def _classify_state(sog: float) -> str: + """SOG 기준 간단 활동 상태 분류.""" + if sog is None: + return 'UNKNOWN' + if sog <= 1.0: + return 'STATIONARY' + if sog <= 5.0: + return 'FISHING' + return 'SAILING' + + +def analyze_dark_pattern(df_vessel: pd.DataFrame) -> dict: + """dark 판정 + gap 상세 정보 반환. + + 가장 긴 gap 한 건을 기준으로 패턴 분석에 필요한 정보를 모두 수집한다. + is_dark가 False이면 나머지 필드는 기본값으로 채움. + + Returns: + { + 'is_dark': bool, + 'gap_min': int, + 'gap_start_lat': Optional[float], + 'gap_start_lon': Optional[float], + 'gap_start_sog': float, + 'gap_start_state': str, + 'gap_end_lat': Optional[float], + 'gap_end_lon': Optional[float], + 'gap_distance_nm': float, + 'gap_resumed': bool, + 'pre_gap_turn_or_teleport': bool, + 'avg_sog_before': float, + } + """ + default = { + 'is_dark': False, + 'gap_min': 0, + 'gap_start_lat': None, + 'gap_start_lon': None, + 'gap_start_sog': 0.0, + 'gap_start_state': 'UNKNOWN', + 'gap_end_lat': None, + 'gap_end_lon': None, + 'gap_distance_nm': 0.0, + 'gap_resumed': False, + 'pre_gap_turn_or_teleport': False, + 'avg_sog_before': 0.0, + } + + if df_vessel is None or len(df_vessel) < 2: + return default + + df_sorted = df_vessel.sort_values('timestamp').reset_index(drop=True) + records = df_sorted.to_dict('records') + + # 가장 긴 gap 찾기 + max_gap_sec = 0.0 + max_gap_idx = -1 # records에서 gap 직후 인덱스 (curr) + for i in range(1, len(records)): + prev_ts = pd.Timestamp(records[i - 1]['timestamp']) + curr_ts = pd.Timestamp(records[i]['timestamp']) + gap_sec = (curr_ts - prev_ts).total_seconds() + if gap_sec > max_gap_sec: + max_gap_sec = gap_sec + max_gap_idx = i + + if max_gap_idx < 1 or max_gap_sec < GAP_SUSPICIOUS_SEC: + return default + + prev_row = records[max_gap_idx - 1] # gap 직전 마지막 포인트 + curr_row = records[max_gap_idx] # gap 직후 첫 포인트 + + 
gap_start_lat = float(prev_row.get('lat')) if prev_row.get('lat') is not None else None + gap_start_lon = float(prev_row.get('lon')) if prev_row.get('lon') is not None else None + gap_end_lat = float(curr_row.get('lat')) if curr_row.get('lat') is not None else None + gap_end_lon = float(curr_row.get('lon')) if curr_row.get('lon') is not None else None + + # gap 직전 SOG 추정: prev 행의 raw_sog 또는 computed sog 사용 + gap_start_sog = float(prev_row.get('sog') or prev_row.get('raw_sog') or 0.0) + + # gap 중 이동 거리 + if all(v is not None for v in (gap_start_lat, gap_start_lon, gap_end_lat, gap_end_lon)): + gap_distance_nm = haversine_nm( + gap_start_lat, gap_start_lon, gap_end_lat, gap_end_lon, + ) + else: + gap_distance_nm = 0.0 + + # 현재 시점 기준 gap이 "재개되었는지" 판단: + # curr_row가 df_sorted의 마지막 포인트가 아니면 신호가 이미 재개된 상태 + # 마지막 포인트면 아직 gap 진행 중(curr_row는 gap 시작 직후 아니라 gap 전의 마지막일 수도 있음) + is_last = (max_gap_idx == len(records) - 1) + # gap이 마지막이면 신호 복귀 미확인 + gap_resumed = not is_last or ( + is_last and max_gap_idx < len(records) - 1 # 항상 False지만 안전용 + ) + # 단, max_gap_idx가 마지막이면 gap 후 포인트 없음 → 재개 미확인 + if max_gap_idx == len(records) - 1: + gap_resumed = False + else: + gap_resumed = True + + # gap 직전 5개 포인트로 평균 SOG + 이상 행동(teleport) 판정 + start_idx = max(0, max_gap_idx - 5) + window = records[start_idx:max_gap_idx] + if window: + sogs = [float(r.get('sog') or r.get('raw_sog') or 0.0) for r in window] + avg_sog_before = sum(sogs) / len(sogs) if sogs else 0.0 + else: + avg_sog_before = gap_start_sog + + # gap 직전 window에 teleportation 발생 여부 + pre_gap_turn_or_teleport = False + if len(window) >= 2: + try: + window_df = df_sorted.iloc[start_idx:max_gap_idx].copy() + # spoofing.detect_teleportation 재사용 (순환 import 방지 위해 지연 import) + from algorithms.spoofing import detect_teleportation + teleports = detect_teleportation(window_df) + if teleports: + pre_gap_turn_or_teleport = True + except Exception: + pre_gap_turn_or_teleport = False + + return { + 'is_dark': True, + 'gap_min': int(max_gap_sec 
/ 60), + 'gap_start_lat': gap_start_lat, + 'gap_start_lon': gap_start_lon, + 'gap_start_sog': gap_start_sog, + 'gap_start_state': _classify_state(gap_start_sog), + 'gap_end_lat': gap_end_lat, + 'gap_end_lon': gap_end_lon, + 'gap_distance_nm': round(gap_distance_nm, 2), + 'gap_resumed': gap_resumed, + 'pre_gap_turn_or_teleport': pre_gap_turn_or_teleport, + 'avg_sog_before': round(avg_sog_before, 2), + } + + +def _is_in_kr_coverage(lat: Optional[float], lon: Optional[float]) -> bool: + if lat is None or lon is None: + return False + return (_KR_COVERAGE_LAT[0] <= lat <= _KR_COVERAGE_LAT[1] + and _KR_COVERAGE_LON[0] <= lon <= _KR_COVERAGE_LON[1]) + + +def compute_dark_suspicion( + gap_info: dict, + mmsi: str, + is_permitted: bool, + history: dict, + now_kst_hour: int, + classify_zone_fn: Optional[Callable[[float, float], dict]] = None, +) -> tuple[int, list[str], str]: + """의도적 AIS OFF 의심 점수 산출. + + Args: + gap_info: analyze_dark_pattern 결과 + mmsi: 선박 MMSI + is_permitted: 허가 어선 여부 + history: {'count_7d': int, 'count_24h': int} + now_kst_hour: 현재 KST 시각 (0~23) + classify_zone_fn: (lat, lon) -> dict. 
gap_start 위치의 zone 판단 + + Returns: + (score, patterns, tier) + tier: 'CRITICAL' / 'HIGH' / 'WATCH' / 'NONE' + """ + if not gap_info.get('is_dark'): + return 0, [], 'NONE' + + score = 0 + patterns: list[str] = [] + + gap_start_sog = gap_info.get('gap_start_sog') or 0.0 + gap_start_state = gap_info.get('gap_start_state', 'UNKNOWN') + gap_start_lat = gap_info.get('gap_start_lat') + gap_start_lon = gap_info.get('gap_start_lon') + gap_min = gap_info.get('gap_min') or 0 + + # P1: 이동 중 OFF + if gap_start_sog > 5.0: + score += 25 + patterns.append('moving_at_off') + elif gap_start_sog > 2.0: + score += 15 + patterns.append('slow_moving_at_off') + + # P2: gap 시작 위치의 민감 수역 + if classify_zone_fn is not None and gap_start_lat is not None and gap_start_lon is not None: + try: + zone_info = classify_zone_fn(gap_start_lat, gap_start_lon) + zone = zone_info.get('zone', '') + if zone in ('TERRITORIAL_SEA', 'CONTIGUOUS_ZONE'): + score += 25 + patterns.append('sensitive_zone') + elif zone.startswith('ZONE_'): + score += 15 + patterns.append('special_zone') + except Exception: + pass + + # P3: 반복 이력 (과거 7일) + h7 = int(history.get('count_7d', 0) or 0) + h24 = int(history.get('count_24h', 0) or 0) + if h7 >= 3: + score += 30 + patterns.append('repeat_high') + elif h7 >= 2: + score += 15 + patterns.append('repeat_low') + if h24 >= 1: + score += 10 + patterns.append('recent_dark') + + # P4: gap 후 이동 거리 비정상 + gap_distance_nm = gap_info.get('gap_distance_nm') or 0.0 + avg_sog_before = gap_info.get('avg_sog_before') or 0.0 + if gap_info.get('gap_resumed') and gap_min > 0: + gap_hours = gap_min / 60.0 + # 예상 이동 = avg_sog * gap_hours. 
2배 초과면 비정상 + expected = max(gap_hours * max(avg_sog_before, 1.0), 0.5) + if gap_distance_nm > expected * 2.0: + score += 20 + patterns.append('distance_anomaly') + + # P5: 주간 조업 시간 OFF + if 6 <= now_kst_hour < 18 and gap_start_state == 'FISHING': + score += 15 + patterns.append('daytime_fishing_off') + + # P6: gap 직전 이상 행동 + if gap_info.get('pre_gap_turn_or_teleport'): + score += 15 + patterns.append('teleport_before_gap') + + # P7: 무허가 + if not is_permitted: + score += 10 + patterns.append('unpermitted') + + # P8: gap 길이 + if gap_min >= 360: + score += 15 + patterns.append('very_long_gap') + elif gap_min >= 180: + score += 10 + patterns.append('long_gap') + + # 감점: gap 시작 위치가 한국 수신 커버리지 밖 → 자연 gap 가능성 + if not _is_in_kr_coverage(gap_start_lat, gap_start_lon): + score -= 30 + patterns.append('out_of_coverage') + + score = max(0, min(100, score)) + + if score >= 70: + tier = 'CRITICAL' + elif score >= 50: + tier = 'HIGH' + elif score >= 30: + tier = 'WATCH' + else: + tier = 'NONE' + + return score, patterns, tier diff --git a/prediction/algorithms/transshipment.py b/prediction/algorithms/transshipment.py index 9e26b95..27cdd36 100644 --- a/prediction/algorithms/transshipment.py +++ b/prediction/algorithms/transshipment.py @@ -15,31 +15,40 @@ from __future__ import annotations import logging import math from datetime import datetime, timezone -from typing import Optional +from typing import Callable, Optional import pandas as pd +from fleet_tracker import GEAR_PATTERN + logger = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────── -# 상수 +# 상수 (2026-04-09 재조정 — 베테랑 관점) # ────────────────────────────────────────────────────────────── -SOG_THRESHOLD_KN = 2.0 # 정박/표류 기준 속도 (노트) -PROXIMITY_DEG = 0.001 # 근접 판정 임계값 (~110m) -SUSPECT_DURATION_MIN = 60 # 의심 판정 최소 지속 시간 (분) -PAIR_EXPIRY_MIN = 120 # pair_history 항목 만료 기준 (분) +SOG_THRESHOLD_KN = 1.0 # 2.0 → 1.0 (완전 정박 수준) +PROXIMITY_DEG = 0.0007 # 0.001 → 0.0007 (~77m, GPS 노이즈 포함한 근접) 
+SUSPECT_DURATION_MIN = 45 # 60 → 45 (gap tolerance 있음) +PAIR_EXPIRY_MIN = 180 # 120 → 180 +GAP_TOLERANCE_CYCLES = 2 # 신규: 2 사이클까지 active에서 빠져도 리셋 안 함 -# 외국 해안 근접 제외 경계 -_CN_LON_MAX = 123.5 # 중국 해안: 경도 < 123.5 -_JP_LON_MIN = 130.5 # 일본 해안: 경도 > 130.5 -_TSUSHIMA_LAT_MIN = 33.8 # 대마도: 위도 > 33.8 AND 경도 > 129.0 +# 외국 해안 근접 제외 경계 (레거시 — 관할 필터로 대체됨) +_CN_LON_MAX = 123.5 +_JP_LON_MIN = 130.5 +_TSUSHIMA_LAT_MIN = 33.8 _TSUSHIMA_LON_MIN = 129.0 -# 탐지 대상 선종 (소문자 정규화 후 비교) -_CANDIDATE_TYPES: frozenset[str] = frozenset({'tanker', 'cargo', 'fishing'}) +# 한국 EEZ 관할 수역 (단속 가능 범위) +_KR_EEZ_LAT = (32.0, 39.5) +_KR_EEZ_LON = (124.0, 132.0) -# 그리드 셀 크기 = PROXIMITY_DEG (셀 하나 = 근접 임계와 동일 크기) +# 환적 불가능 선종 (여객/군함/유조/도선/예인/수색구조) +_TRANSSHIP_EXCLUDED: frozenset[str] = frozenset({ + 'passenger', 'military', 'tanker', 'pilot', 'tug', 'sar', +}) + +# 그리드 셀 크기 _GRID_CELL_DEG = PROXIMITY_DEG @@ -58,6 +67,28 @@ def _is_near_foreign_coast(lat: float, lon: float) -> bool: return False +def _is_in_kr_jurisdiction(lat: float, lon: float) -> bool: + """한국 EEZ 관할 수역 여부 (단속 가능 범위).""" + return (_KR_EEZ_LAT[0] <= lat <= _KR_EEZ_LAT[1] + and _KR_EEZ_LON[0] <= lon <= _KR_EEZ_LON[1]) + + +def _is_candidate_ship_type(vessel_type_a: Optional[str], vessel_type_b: Optional[str]) -> bool: + """환적 후보 선종인지 (명시적 제외만 차단, 미상은 허용).""" + a = (vessel_type_a or '').strip().lower() + b = (vessel_type_b or '').strip().lower() + if a in _TRANSSHIP_EXCLUDED or b in _TRANSSHIP_EXCLUDED: + return False + return True + + +def _is_gear_name(name: Optional[str]) -> bool: + """어구 이름 패턴 매칭 — fleet_tracker.GEAR_PATTERN SSOT.""" + if not name: + return False + return bool(GEAR_PATTERN.match(name)) + + def _cell_key(lat: float, lon: float) -> tuple[int, int]: """위도/경도를 그리드 셀 인덱스로 변환.""" return (int(math.floor(lat / _GRID_CELL_DEG)), @@ -101,14 +132,25 @@ def _pair_key(mmsi_a: str, mmsi_b: str) -> tuple[str, str]: def _evict_expired_pairs( - pair_history: dict[tuple[str, str], datetime], + pair_history: dict, now: datetime, ) -> None: 
- """PAIR_EXPIRY_MIN 이상 갱신 없는 pair_history 항목 제거.""" - expired = [ - key for key, first_seen in pair_history.items() - if (now - first_seen).total_seconds() / 60 > PAIR_EXPIRY_MIN - ] + """PAIR_EXPIRY_MIN 이상 갱신 없는 pair_history 항목 제거. + + 새 구조: {(a,b): {'first_seen': dt, 'last_seen': dt, 'miss_count': int}} + """ + expired = [] + for key, meta in pair_history.items(): + if not isinstance(meta, dict): + # 레거시 구조 (datetime 직접 저장)는 즉시 제거 → 다음 사이클에서 재구성 + expired.append(key) + continue + last_seen = meta.get('last_seen') or meta.get('first_seen') + if last_seen is None: + expired.append(key) + continue + if (now - last_seen).total_seconds() / 60 > PAIR_EXPIRY_MIN: + expired.append(key) for key in expired: del pair_history[key] @@ -117,24 +159,119 @@ def _evict_expired_pairs( # 공개 API # ────────────────────────────────────────────────────────────── +def _score_pair( + pair: tuple[str, str], + meta: dict, + lat: float, + lon: float, + cog_a: Optional[float], + cog_b: Optional[float], + vessel_info_a: dict, + vessel_info_b: dict, + is_permitted_fn: Optional[Callable[[str], bool]], + now_kst_hour: int, + zone_code: Optional[str], + now: datetime, +) -> Optional[dict]: + """환적 의심 pair에 대해 점수 산출 + severity 반환. + + 필수 조건 실패 시 None. WATCH 이상이면 dict 반환. 
+ """ + # 필수 1: 한국 관할 수역 + if not _is_in_kr_jurisdiction(lat, lon): + return None + # 필수 2: 선종 필터 + if not _is_candidate_ship_type( + vessel_info_a.get('vessel_type'), + vessel_info_b.get('vessel_type'), + ): + return None + # 필수 3: 어구 제외 + if _is_gear_name(vessel_info_a.get('name')) or _is_gear_name(vessel_info_b.get('name')): + return None + # 필수 4: 지속 시간 + first_seen = meta.get('first_seen') + if first_seen is None: + return None + duration_min = int((now - first_seen).total_seconds() / 60) + if duration_min < SUSPECT_DURATION_MIN: + return None + + score = 40 # base + + # 야간 가점 (KST 20:00~04:00) + if now_kst_hour >= 20 or now_kst_hour < 4: + score += 15 + + # 무허가 가점 + if is_permitted_fn is not None: + try: + if not is_permitted_fn(pair[0]) or not is_permitted_fn(pair[1]): + score += 20 + except Exception: + pass + + # COG 편차 (같은 방향 아니면 가점 — 나란히 가는 선단 배제) + if cog_a is not None and cog_b is not None: + try: + diff = abs(float(cog_a) - float(cog_b)) + if diff > 180: + diff = 360 - diff + if diff > 45: + score += 20 + except Exception: + pass + + # 지속 길이 추가 가점 + if duration_min >= 90: + score += 20 + + # 영해/접속수역 추가 가점 + if zone_code in ('TERRITORIAL_SEA', 'CONTIGUOUS_ZONE'): + score += 15 + + if score >= 90: + severity = 'CRITICAL' + elif score >= 70: + severity = 'HIGH' + elif score >= 50: + severity = 'WATCH' + else: + return None + + return { + 'pair_a': pair[0], + 'pair_b': pair[1], + 'duration_min': duration_min, + 'severity': severity, + 'score': score, + 'lat': lat, + 'lon': lon, + } + + def detect_transshipment( df: pd.DataFrame, - pair_history: dict[tuple[str, str], datetime], -) -> list[tuple[str, str, int]]: - """환적 의심 쌍 탐지. + pair_history: dict, + get_vessel_info: Optional[Callable[[str], dict]] = None, + is_permitted: Optional[Callable[[str], bool]] = None, + classify_zone_fn: Optional[Callable[[float, float], dict]] = None, + now_kst_hour: int = 0, +) -> list[dict]: + """환적 의심 쌍 탐지 (점수 기반, 베테랑 관점 필터). Args: df: 선박 위치 DataFrame. 
필수 컬럼: mmsi, lat, lon, sog - 선택 컬럼: ship_type (없으면 전체 선종 허용) - pair_history: 쌍별 최초 탐지 시각을 저장하는 영속 dict. - 스케줄러에서 호출 간 유지하여 전달해야 한다. - 키: (mmsi_a, mmsi_b) — mmsi_a < mmsi_b 정규화 적용. - 값: 최초 탐지 시각 (UTC datetime, timezone-aware). + 선택 컬럼: cog + pair_history: {(a,b): {'first_seen', 'last_seen', 'miss_count'}} + get_vessel_info: callable(mmsi) -> {'name', 'vessel_type', ...} + is_permitted: callable(mmsi) -> bool + classify_zone_fn: callable(lat, lon) -> dict (zone 판정) + now_kst_hour: 현재 KST 시각 (0~23) Returns: - [(mmsi_a, mmsi_b, duration_minutes), ...] — 60분 이상 지속된 의심 쌍. - mmsi_a < mmsi_b 정규화 적용. + list[dict] — severity 'CRITICAL'/'HIGH'/'WATCH' 포함 의심 쌍 """ if df.empty: return [] @@ -147,22 +284,15 @@ def detect_transshipment( now = datetime.now(timezone.utc) - # ── 1. 후보 선박 필터 ────────────────────────────────────── - has_type_col = 'ship_type' in df.columns - + # ── 1. 후보 선박 필터 (SOG < 1.0) ───────────────────────── candidate_mask = df['sog'] < SOG_THRESHOLD_KN - - if has_type_col: - type_mask = df['ship_type'].apply(_normalize_type).isin(_CANDIDATE_TYPES) - candidate_mask = candidate_mask & type_mask - candidates = df[candidate_mask].copy() if candidates.empty: _evict_expired_pairs(pair_history, now) return [] - # 외국 해안 근처 제외 + # 외국 해안 근처 제외 (1차 필터) coast_mask = candidates.apply( lambda row: not _is_near_foreign_coast(row['lat'], row['lon']), axis=1, @@ -173,62 +303,162 @@ def detect_transshipment( _evict_expired_pairs(pair_history, now) return [] - records = candidates[['mmsi', 'lat', 'lon']].to_dict('records') + has_cog = 'cog' in candidates.columns + cols = ['mmsi', 'lat', 'lon'] + if has_cog: + cols.append('cog') + records = candidates[cols].to_dict('records') for rec in records: rec['mmsi'] = str(rec['mmsi']) - # ── 2. 그리드 기반 근접 쌍 탐지 ────────────────────────── + # ── 2. 
그리드 기반 근접 쌍 탐지 (77m) ─────────────────── grid = _build_grid(records) - active_pairs: set[tuple[str, str]] = set() + active_pairs: dict[tuple[str, str], dict] = {} + + def _try_add_pair(a_rec, b_rec): + if not _within_proximity(a_rec, b_rec): + return + key = _pair_key(a_rec['mmsi'], b_rec['mmsi']) + # 중점 좌표 (점수 산출용) + mid_lat = (a_rec['lat'] + b_rec['lat']) / 2.0 + mid_lon = (a_rec['lon'] + b_rec['lon']) / 2.0 + active_pairs[key] = { + 'lat': mid_lat, 'lon': mid_lon, + 'cog_a': a_rec.get('cog'), 'cog_b': b_rec.get('cog'), + # mmsi_a < mmsi_b 순서로 정렬되었으므로 cog도 맞춰 정렬 필요 + 'mmsi_a': a_rec['mmsi'], 'mmsi_b': b_rec['mmsi'], + } for (row, col), indices in grid.items(): - # 현재 셀 내부 쌍 for i in range(len(indices)): for j in range(i + 1, len(indices)): - a = records[indices[i]] - b = records[indices[j]] - if _within_proximity(a, b): - active_pairs.add(_pair_key(a['mmsi'], b['mmsi'])) - - # 인접 셀 (우측 3셀 + 아래 3셀 = 중복 없는 방향성 순회) + _try_add_pair(records[indices[i]], records[indices[j]]) for dr, dc in ((0, 1), (1, -1), (1, 0), (1, 1)): neighbor_key = (row + dr, col + dc) if neighbor_key not in grid: continue for ai in indices: for bi in grid[neighbor_key]: - a = records[ai] - b = records[bi] - if _within_proximity(a, b): - active_pairs.add(_pair_key(a['mmsi'], b['mmsi'])) + _try_add_pair(records[ai], records[bi]) - # ── 3. pair_history 갱신 ───────────────────────────────── - # 현재 활성 쌍 → 최초 탐지 시각 등록 - for pair in active_pairs: - if pair not in pair_history: - pair_history[pair] = now + # ── 3. 
pair_history 갱신 (gap tolerance) ───────────────── + active_keys = set(active_pairs.keys()) - # 비활성 쌍 → pair_history에서 제거 (다음 접근 시 재시작) - inactive = [key for key in pair_history if key not in active_pairs] - for key in inactive: - del pair_history[key] + # 활성 쌍 → 등록/갱신 + for pair in active_keys: + if pair not in pair_history or not isinstance(pair_history[pair], dict): + pair_history[pair] = { + 'first_seen': now, + 'last_seen': now, + 'miss_count': 0, + } + else: + pair_history[pair]['last_seen'] = now + pair_history[pair]['miss_count'] = 0 - # 만료 항목 정리 (비활성 제거 후 잔여 방어용) + # 비활성 쌍 → miss_count++ , GAP_TOLERANCE 초과 시 삭제 + for key in list(pair_history.keys()): + if key in active_keys: + continue + meta = pair_history[key] + if not isinstance(meta, dict): + del pair_history[key] + continue + meta['miss_count'] = meta.get('miss_count', 0) + 1 + if meta['miss_count'] > GAP_TOLERANCE_CYCLES: + del pair_history[key] + + # 만료 정리 _evict_expired_pairs(pair_history, now) - # ── 4. 의심 쌍 판정 ────────────────────────────────────── - suspects: list[tuple[str, str, int]] = [] + # ── 4. 
점수 기반 의심 쌍 판정 ───────────────────────────── + suspects: list[dict] = [] + rejected_jurisdiction = 0 + rejected_ship_type = 0 + rejected_gear = 0 + rejected_duration = 0 + + for pair, meta in pair_history.items(): + if not isinstance(meta, dict): + continue + first_seen = meta.get('first_seen') + if first_seen is None: + continue + + # active_pairs에 있으면 해당 사이클 좌표·cog 사용, 없으면 이전 값 재사용 (miss 중) + loc_meta = active_pairs.get(pair) + if loc_meta is not None: + lat = loc_meta['lat'] + lon = loc_meta['lon'] + # mmsi_a, mmsi_b 순서를 pair 순서에 맞춤 + if loc_meta['mmsi_a'] == pair[0]: + cog_a, cog_b = loc_meta.get('cog_a'), loc_meta.get('cog_b') + else: + cog_a, cog_b = loc_meta.get('cog_b'), loc_meta.get('cog_a') + meta['last_lat'] = lat + meta['last_lon'] = lon + meta['last_cog_a'] = cog_a + meta['last_cog_b'] = cog_b + else: + lat = meta.get('last_lat') + lon = meta.get('last_lon') + cog_a = meta.get('last_cog_a') + cog_b = meta.get('last_cog_b') + if lat is None or lon is None: + continue + + # 선박 정보 조회 + info_a = get_vessel_info(pair[0]) if get_vessel_info else {} + info_b = get_vessel_info(pair[1]) if get_vessel_info else {} + + # 짧게 pre-check (로깅용) + if not _is_in_kr_jurisdiction(lat, lon): + rejected_jurisdiction += 1 + continue + if not _is_candidate_ship_type(info_a.get('vessel_type'), info_b.get('vessel_type')): + rejected_ship_type += 1 + continue + if _is_gear_name(info_a.get('name')) or _is_gear_name(info_b.get('name')): + rejected_gear += 1 + continue - for pair, first_seen in pair_history.items(): duration_min = int((now - first_seen).total_seconds() / 60) - if duration_min >= SUSPECT_DURATION_MIN: - suspects.append((pair[0], pair[1], duration_min)) + if duration_min < SUSPECT_DURATION_MIN: + rejected_duration += 1 + continue - if suspects: - logger.info( - 'transshipment detection: %d suspect pairs (candidates=%d)', - len(suspects), - len(candidates), + zone_code = None + if classify_zone_fn is not None: + try: + zone_code = classify_zone_fn(lat, lon).get('zone') 
+ except Exception: + pass + + scored = _score_pair( + pair, meta, lat, lon, cog_a, cog_b, + info_a, info_b, is_permitted, + now_kst_hour, zone_code, now, ) + if scored is not None: + suspects.append(scored) + + tier_counts = {'CRITICAL': 0, 'HIGH': 0, 'WATCH': 0} + for s in suspects: + tier_counts[s['severity']] = tier_counts.get(s['severity'], 0) + 1 + + logger.info( + 'transshipment detection: pairs=%d (critical=%d, high=%d, watch=%d, ' + 'rejected_jurisdiction=%d, rejected_ship_type=%d, rejected_gear=%d, ' + 'rejected_duration=%d, candidates=%d)', + len(suspects), + tier_counts.get('CRITICAL', 0), + tier_counts.get('HIGH', 0), + tier_counts.get('WATCH', 0), + rejected_jurisdiction, + rejected_ship_type, + rejected_gear, + rejected_duration, + len(candidates), + ) return suspects diff --git a/prediction/output/event_generator.py b/prediction/output/event_generator.py index 887959a..e1ad2d4 100644 --- a/prediction/output/event_generator.py +++ b/prediction/output/event_generator.py @@ -23,13 +23,13 @@ EVENTS_TABLE = qualified_table('prediction_events') # 사이클이 5분 간격이므로 5의 배수를 피해서 boundary 일제 만료 패턴을 회피한다. 
DEDUP_WINDOWS = { 'EEZ_INTRUSION': 33, - 'DARK_VESSEL': 127, + 'DARK_VESSEL': 131, # 127 → 131 (2h boundary 회피) 'FLEET_CLUSTER': 367, - 'ILLEGAL_TRANSSHIP': 67, + 'ILLEGAL_TRANSSHIP': 181, # 67 → 181 (환적은 장기 이벤트) 'MMSI_TAMPERING': 33, 'AIS_LOSS': 127, 'SPEED_ANOMALY': 67, - 'ZONE_DEPARTURE': 127, + 'ZONE_DEPARTURE': 89, # 127 → 89 (boundary 분산) 'GEAR_ILLEGAL': 367, 'AIS_RESUME': 67, 'HIGH_RISK_VESSEL': 67, @@ -66,11 +66,25 @@ RULES = [ 'title_fn': lambda r: f"고위험 선박 탐지 (위험도 {r.get('risk_score', 0)})", }, { - 'name': 'dark_vessel_long', - 'condition': lambda r: r.get('is_dark') and (r.get('gap_duration_min', 0) or 0) > 60, + # dark 의심 CRITICAL — 점수 70+ (반복·민감수역·이동중·거리이상 등 복합) + 'name': 'dark_critical', + 'condition': lambda r: (r.get('features') or {}).get('dark_tier') == 'CRITICAL', + 'level': 'CRITICAL', + 'category': 'DARK_VESSEL', + 'title_fn': lambda r: ( + f"고의 AIS 소실 의심 (점수 {(r.get('features') or {}).get('dark_suspicion_score', 0)}, " + f"7일 {(r.get('features') or {}).get('dark_history_7d', 0)}회)" + ), + }, + { + # dark 의심 HIGH — 점수 50~69 + 'name': 'dark_high', + 'condition': lambda r: (r.get('features') or {}).get('dark_tier') == 'HIGH', 'level': 'HIGH', 'category': 'DARK_VESSEL', - 'title_fn': lambda r: f"다크베셀 장기 소실 ({r.get('gap_duration_min', 0)}분)", + 'title_fn': lambda r: ( + f"AIS 소실 의심 (점수 {(r.get('features') or {}).get('dark_suspicion_score', 0)})" + ), }, { 'name': 'spoofing', @@ -80,11 +94,26 @@ RULES = [ 'title_fn': lambda r: f"GPS/MMSI 조작 의심 (점수 {r.get('spoofing_score', 0):.2f})", }, { - 'name': 'transship', - 'condition': lambda r: r.get('transship_suspect'), + # 환적 의심 CRITICAL — 점수 90+ + 'name': 'transship_critical', + 'condition': lambda r: (r.get('features') or {}).get('transship_tier') == 'CRITICAL', + 'level': 'CRITICAL', + 'category': 'ILLEGAL_TRANSSHIP', + 'title_fn': lambda r: ( + f"환적 의심 (점수 {(r.get('features') or {}).get('transship_score', 0)}, " + f"{r.get('transship_duration_min', 0)}분, 상대 {r.get('transship_pair_mmsi', '?')})" + ), + 
}, + { + # 환적 의심 HIGH — 점수 70~89 + 'name': 'transship_high', + 'condition': lambda r: (r.get('features') or {}).get('transship_tier') == 'HIGH', 'level': 'HIGH', 'category': 'ILLEGAL_TRANSSHIP', - 'title_fn': lambda r: f"환적 의심 (상대: {r.get('transship_pair_mmsi', '미상')})", + 'title_fn': lambda r: ( + f"환적 의심 (점수 {(r.get('features') or {}).get('transship_score', 0)}, " + f"{r.get('transship_duration_min', 0)}분)" + ), }, { 'name': 'fleet_cluster', diff --git a/prediction/scheduler.py b/prediction/scheduler.py index e8d4255..5b2e65e 100644 --- a/prediction/scheduler.py +++ b/prediction/scheduler.py @@ -1,14 +1,18 @@ import logging import time -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from typing import Optional +from zoneinfo import ZoneInfo from apscheduler.schedulers.background import BackgroundScheduler from config import settings +from fleet_tracker import GEAR_PATTERN logger = logging.getLogger(__name__) +_KST = ZoneInfo('Asia/Seoul') + _scheduler: Optional[BackgroundScheduler] = None _last_run: dict = { 'timestamp': None, @@ -20,6 +24,53 @@ _last_run: dict = { _transship_pair_history: dict = {} +# 한국 선박 MMSI prefix — dark 판별 완전 제외 +_KR_DOMESTIC_PREFIXES = ('440', '441') + + +def _is_dark_excluded(mmsi: str, name: str) -> tuple[bool, str]: + """dark 탐지 대상에서 완전 제외할지. 어구/한국선만 필터. + + 사용자 알람은 선박만 대상, 한국선은 해경 관할 아님. + """ + if any(mmsi.startswith(p) for p in _KR_DOMESTIC_PREFIXES): + return True, 'kr_domestic' + if name and GEAR_PATTERN.match(name): + return True, 'gear_signal' + return False, '' + + +def _fetch_dark_history(kcg_conn, mmsi_list: list[str]) -> dict[str, dict]: + """최근 7일 내 is_dark=True 이력을 mmsi별로 집계. + + 사이클 시작 시 한 번에 조회하여 점수 계산 시 재사용. 
+ """ + if not mmsi_list: + return {} + try: + cur = kcg_conn.cursor() + cur.execute( + """ + SELECT mmsi, + count(*) AS n7, + count(*) FILTER (WHERE analyzed_at > now() - interval '24 hours') AS n24, + max(analyzed_at) AS last_at + FROM kcg.vessel_analysis_results + WHERE is_dark = true + AND analyzed_at > now() - interval '7 days' + AND mmsi = ANY(%s) + GROUP BY mmsi + """, + (list(mmsi_list),), + ) + return { + str(m): {'count_7d': int(n7 or 0), 'count_24h': int(n24 or 0), 'last_at': t} + for m, n7, n24, t in cur.fetchall() + } + except Exception as e: + logger.warning('fetch_dark_history failed: %s', e) + return {} + def get_last_run() -> dict: return _last_run.copy() @@ -27,13 +78,12 @@ def get_last_run() -> dict: def run_analysis_cycle(): """5분 주기 분석 사이클 — 인메모리 캐시 기반.""" - import re as _re from cache.vessel_store import vessel_store from db import snpdb, kcgdb from pipeline.orchestrator import ChineseFishingVesselPipeline from algorithms.location import classify_zone from algorithms.fishing_pattern import compute_ucaf_score, compute_ucft_score - from algorithms.dark_vessel import is_dark_vessel + from algorithms.dark_vessel import is_dark_vessel, analyze_dark_pattern, compute_dark_suspicion from algorithms.spoofing import compute_spoofing_score, count_speed_jumps, compute_bd09_offset from algorithms.risk import compute_vessel_risk_score from fleet_tracker import fleet_tracker @@ -75,7 +125,6 @@ def run_analysis_cycle(): return # 4. 
등록 선단 기반 fleet 분석 - _gear_re = _re.compile(r'^.+_(?=\S*\d)\S+(?:[_ ]\S*)*[_ ]*$|^\d+$|^.+%$') with kcgdb.get_conn() as kcg_conn: fleet_tracker.load_registry(kcg_conn) @@ -92,7 +141,7 @@ def run_analysis_cycle(): fleet_tracker.match_ais_to_registry(all_ais, kcg_conn) - gear_signals = [v for v in all_ais if _gear_re.match(v.get('name', ''))] + gear_signals = [v for v in all_ais if GEAR_PATTERN.match(v.get('name', '') or '')] fleet_tracker.track_gear_identity(gear_signals, kcg_conn) fleet_roles = fleet_tracker.build_fleet_clusters(vessel_dfs) @@ -152,6 +201,15 @@ def run_analysis_cycle(): logger.warning('gear correlation failed: %s', e) # 5. 선박별 추가 알고리즘 → AnalysisResult 생성 + # dark 이력 일괄 조회 (7일 history) — 사이클당 1회 + now_kst_hour = datetime.now(_KST).hour + all_chinese = vessel_store.get_chinese_mmsis() + with kcgdb.get_conn() as hist_conn: + dark_history_map = _fetch_dark_history(hist_conn, list(all_chinese)) + + pipeline_dark_tiers = {'CRITICAL': 0, 'HIGH': 0, 'WATCH': 0, 'NONE': 0} + pipeline_skip_counts = {'kr_domestic': 0, 'gear_signal': 0} + results = [] for c in classifications: mmsi = c['mmsi'] @@ -169,7 +227,42 @@ def run_analysis_cycle(): ucaf = compute_ucaf_score(df_v, gear) ucft = compute_ucft_score(df_v) - dark, gap_min = is_dark_vessel(df_v) + # ── Dark: 넓은 탐지 + 의도적 OFF 의심 점수화 ── + vname = vessel_store.get_vessel_info(mmsi).get('name', '') or '' + is_permitted = vessel_store.is_permitted(mmsi) + dark_excluded, dark_skip_reason = _is_dark_excluded(mmsi, vname) + if dark_excluded: + pipeline_skip_counts[dark_skip_reason] = pipeline_skip_counts.get(dark_skip_reason, 0) + 1 + dark = False + gap_min = 0 + dark_features: dict = { + 'dark_suspicion_score': 0, + 'dark_patterns': [], + 'dark_tier': 'EXCLUDED', + 'dark_history_7d': 0, + 'dark_history_24h': 0, + } + else: + gap_info = analyze_dark_pattern(df_v) + dark = bool(gap_info.get('is_dark')) + gap_min = int(gap_info.get('gap_min') or 0) + history = dark_history_map.get(mmsi, {'count_7d': 0, 'count_24h': 0}) + 
score, patterns, tier = compute_dark_suspicion( + gap_info, mmsi, is_permitted, history, + now_kst_hour, classify_zone, + ) + pipeline_dark_tiers[tier] = pipeline_dark_tiers.get(tier, 0) + 1 + dark_features = { + 'dark_suspicion_score': score, + 'dark_patterns': patterns, + 'dark_tier': tier, + 'dark_history_7d': int(history.get('count_7d', 0) or 0), + 'dark_history_24h': int(history.get('count_24h', 0) or 0), + 'gap_start_lat': gap_info.get('gap_start_lat'), + 'gap_start_lon': gap_info.get('gap_start_lon'), + 'gap_start_sog': gap_info.get('gap_start_sog'), + 'gap_start_state': gap_info.get('gap_start_state'), + } spoof_score = compute_spoofing_score(df_v) speed_jumps = count_speed_jumps(df_v) @@ -177,7 +270,6 @@ def run_analysis_cycle(): fleet_info = fleet_roles.get(mmsi, {}) - is_permitted = vessel_store.is_permitted(mmsi) risk_score, risk_level = compute_vessel_risk_score( mmsi, df_v, zone_info, is_permitted=is_permitted, ) @@ -186,6 +278,8 @@ def run_analysis_cycle(): if 'state' in df_v.columns and len(df_v) > 0: activity = df_v['state'].mode().iloc[0] + merged_features = {**(c.get('features', {}) or {}), **dark_features} + results.append(AnalysisResult( mmsi=mmsi, timestamp=ts, @@ -209,9 +303,14 @@ def run_analysis_cycle(): fleet_role=fleet_info.get('fleet_role', 'NOISE'), risk_score=risk_score, risk_level=risk_level, - features=c.get('features', {}), + features=merged_features, )) + logger.info( + 'pipeline dark: tiers=%s skip=%s', + pipeline_dark_tiers, pipeline_skip_counts, + ) + # ── 5.5 경량 분석 — 파이프라인 미통과 412* 선박 ── # vessel_store._tracks의 24h 누적 궤적을 직접 활용하여 dark/spoof 신호도 산출. 
from algorithms.risk import compute_lightweight_risk_score @@ -225,6 +324,8 @@ def run_analysis_cycle(): lw_count = 0 lw_dark = 0 lw_spoof = 0 + lw_dark_tiers = {'CRITICAL': 0, 'HIGH': 0, 'WATCH': 0, 'NONE': 0} + lw_dark_skip = {'kr_domestic': 0, 'gear_signal': 0} for mmsi in lightweight_mmsis: pos = all_positions.get(mmsi) if pos is None or pos.get('lat') is None: @@ -242,40 +343,71 @@ def run_analysis_cycle(): else: state = 'SAILING' - # 24h 누적 궤적으로 dark/spoofing 산출 (vessel_store._tracks 직접 접근) - df_v = vessel_store._tracks.get(mmsi) - dark = False - gap_min = 0 - spoof_score = 0.0 - speed_jumps = 0 - if df_v is not None and len(df_v) >= 2: - try: - dark, gap_min = is_dark_vessel(df_v) - except Exception: - pass - try: - spoof_score = compute_spoofing_score(df_v) - except Exception: - pass - try: - speed_jumps = count_speed_jumps(df_v) - except Exception: - pass - # 핫픽스 (2026-04-08): 한국 AIS 수신 가능 영역 밖에서의 dark 판정은 오탐. - # 412* 중국 선박이 자국 EEZ로 깊이 들어가면(~124°E 서쪽) 한국 수신소 - # 도달 한계로 자연 gap 발생. 해당 영역 밖은 dark에서 제외한다. 
- # 영역: 북위 32~39.5, 동경 124~132 (한반도 + EEZ + 접속수역 여유 포함) - if dark: - in_kr_reception = (124.0 <= lon <= 132.0) and (32.0 <= lat <= 39.5) - if not in_kr_reception: - dark = False - gap_min = 0 + is_permitted = vessel_store.is_permitted(mmsi) + vname = vessel_store.get_vessel_info(mmsi).get('name', '') or '' + + # ── Dark: 사전 필터 (어구/한국선) ── + dark_excluded, dark_skip_reason = _is_dark_excluded(mmsi, vname) + if dark_excluded: + lw_dark_skip[dark_skip_reason] = lw_dark_skip.get(dark_skip_reason, 0) + 1 + dark = False + gap_min = 0 + dark_features: dict = { + 'dark_suspicion_score': 0, + 'dark_patterns': [], + 'dark_tier': 'EXCLUDED', + 'dark_history_7d': 0, + 'dark_history_24h': 0, + } + spoof_score = 0.0 + speed_jumps = 0 + else: + df_v = vessel_store._tracks.get(mmsi) + spoof_score = 0.0 + speed_jumps = 0 + if df_v is not None and len(df_v) >= 2: + try: + spoof_score = compute_spoofing_score(df_v) + except Exception: + pass + try: + speed_jumps = count_speed_jumps(df_v) + except Exception: + pass + try: + gap_info = analyze_dark_pattern(df_v) + except Exception: + gap_info = {'is_dark': False, 'gap_min': 0} + else: + gap_info = {'is_dark': False, 'gap_min': 0} + + dark = bool(gap_info.get('is_dark')) + gap_min = int(gap_info.get('gap_min') or 0) + + history = dark_history_map.get(mmsi, {'count_7d': 0, 'count_24h': 0}) + score, patterns, tier = compute_dark_suspicion( + gap_info, mmsi, is_permitted, history, + now_kst_hour, classify_zone, + ) + lw_dark_tiers[tier] = lw_dark_tiers.get(tier, 0) + 1 + + dark_features = { + 'dark_suspicion_score': score, + 'dark_patterns': patterns, + 'dark_tier': tier, + 'dark_history_7d': int(history.get('count_7d', 0) or 0), + 'dark_history_24h': int(history.get('count_24h', 0) or 0), + 'gap_start_lat': gap_info.get('gap_start_lat'), + 'gap_start_lon': gap_info.get('gap_start_lon'), + 'gap_start_sog': gap_info.get('gap_start_sog'), + 'gap_start_state': gap_info.get('gap_start_state'), + } + if dark: lw_dark += 1 if spoof_score > 0.5: 
lw_spoof += 1 - is_permitted = vessel_store.is_permitted(mmsi) risk_score, risk_level = compute_lightweight_risk_score( zone_info, sog, is_permitted=is_permitted, is_dark=dark, gap_duration_min=gap_min, @@ -308,27 +440,44 @@ def run_analysis_cycle(): is_transship_suspect=False, transship_pair_mmsi='', transship_duration_min=0, + features=dark_features, )) lw_count += 1 logger.info( - 'lightweight analysis: %d vessels (dark=%d, spoof>0.5=%d)', - lw_count, lw_dark, lw_spoof, + 'lightweight analysis: %d vessels (dark=%d, spoof>0.5=%d, tiers=%s, skip=%s)', + lw_count, lw_dark, lw_spoof, lw_dark_tiers, lw_dark_skip, ) - # 6. 환적 의심 탐지 (pair_history 모듈 레벨로 사이클 간 유지) + # 6. 환적 의심 탐지 (점수 기반, 베테랑 관점 필터) from algorithms.transshipment import detect_transshipment results_map = {r.mmsi: r for r in results} - transship_pairs = detect_transshipment(df_targets, _transship_pair_history) - for mmsi_a, mmsi_b, dur in transship_pairs: - if mmsi_a in results_map: - results_map[mmsi_a].is_transship_suspect = True - results_map[mmsi_a].transship_pair_mmsi = mmsi_b - results_map[mmsi_a].transship_duration_min = dur - if mmsi_b in results_map: - results_map[mmsi_b].is_transship_suspect = True - results_map[mmsi_b].transship_pair_mmsi = mmsi_a - results_map[mmsi_b].transship_duration_min = dur + transship_items = detect_transshipment( + df_targets, + _transship_pair_history, + get_vessel_info=vessel_store.get_vessel_info, + is_permitted=vessel_store.is_permitted, + classify_zone_fn=classify_zone, + now_kst_hour=now_kst_hour, + ) + for item in transship_items: + a = item['pair_a'] + b = item['pair_b'] + dur = item['duration_min'] + tier = item['severity'] + if tier == 'WATCH': + continue # WATCH 등급은 저장 안 함 (로그만) + for m, pair in ((a, b), (b, a)): + if m in results_map: + r_obj = results_map[m] + r_obj.is_transship_suspect = True + r_obj.transship_pair_mmsi = pair + r_obj.transship_duration_min = dur + r_obj.features = { + **(r_obj.features or {}), + 'transship_tier': tier, + 
'transship_score': item['score'], + } # 7. 결과 저장 upserted = kcgdb.upsert_results(results) From dac4a3bda2b2f1a66e06b3d5a5f351fb79ac4215 Mon Sep 17 00:00:00 2001 From: htlee Date: Thu, 9 Apr 2026 07:56:04 +0900 Subject: [PATCH 4/5] =?UTF-8?q?fix(prediction):=20features=20JSONB=20?= =?UTF-8?q?=EC=A4=91=EC=B2=A9=20=EA=B5=AC=EC=A1=B0=20sanitize?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AnalysisResult.to_db_tuple이 기존에 features dict 값을 모두 float로 변환했는데, dark_suspicion 구조를 넣으면서 dark_patterns(list) 등 비스칼라 타입이 포함되어 upsert 실패 (float argument not a list). _sanitize 재귀 함수로 JSON 호환 타입(str/int/float/bool/list/dict/None)을 그대로 보존하도록 변경. --- prediction/models/result.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/prediction/models/result.py b/prediction/models/result.py index bb7a69c..ae439e7 100644 --- a/prediction/models/result.py +++ b/prediction/models/result.py @@ -70,8 +70,24 @@ class AnalysisResult: """numpy int → Python int 변환.""" return int(v) if v is not None else 0 - # features dict 내부 numpy 값도 변환 - safe_features = {k: float(v) for k, v in self.features.items()} if self.features else {} + # features dict 내부 numpy 값도 변환 (재귀적 처리) + # int/float/bool/str/None/list/dict 모두 허용 (JSON 호환 타입만 유지) + def _sanitize(v): + if v is None or isinstance(v, (str, bool)): + return v + if isinstance(v, (int, float)): + return float(v) if isinstance(v, float) else int(v) + if isinstance(v, dict): + return {str(k): _sanitize(val) for k, val in v.items()} + if isinstance(v, (list, tuple)): + return [_sanitize(x) for x in v] + # numpy 스칼라 등은 float 변환 시도, 실패 시 str + try: + return float(v) + except (TypeError, ValueError): + return str(v) + + safe_features = _sanitize(self.features) if self.features else {} return ( str(self.mmsi), From b15a94066a7d8aed97eb32a1e28eb4511ffe8ba5 Mon Sep 17 00:00:00 2001 From: htlee Date: Thu, 9 Apr 2026 09:55:46 +0900 Subject: [PATCH 5/5] 
=?UTF-8?q?docs:=20prediction=202=EC=B0=A8=20=EA=B0=9C?= =?UTF-8?q?=ED=8E=B8=20=EB=A6=B4=EB=A6=AC=EC=A6=88=20=EB=85=B8=ED=8A=B8=20?= =?UTF-8?q?+=20hourly=20snapshot=20=EC=8A=A4=ED=81=AC=EB=A6=BD=ED=8A=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - RELEASE-NOTES [Unreleased] 섹션에 dark 의심 점수화 + transship 재설계 변경사항 추가 - prediction/scripts/hourly-analysis-snapshot.sh: 시간별 상태 스냅샷 수집 (25개 섹션) --- docs/RELEASE-NOTES.md | 35 ++ .../scripts/hourly-analysis-snapshot.sh | 340 ++++++++++++++++++ 2 files changed, 375 insertions(+) create mode 100755 prediction/scripts/hourly-analysis-snapshot.sh diff --git a/docs/RELEASE-NOTES.md b/docs/RELEASE-NOTES.md index 25d264d..29f37b9 100644 --- a/docs/RELEASE-NOTES.md +++ b/docs/RELEASE-NOTES.md @@ -4,6 +4,41 @@ ## [Unreleased] +### 추가 +- **Dark Vessel 의심 점수화** — 기존 "gap≥30분→dark" 이분법에서 8가지 패턴 기반 0~100점 점수 산출 + CRITICAL/HIGH/WATCH/NONE 등급 분류 + - P1 이동 중 OFF / P2 민감 수역 / P3 반복 이력(7일) / P4 거리 비정상 / P5 주간 조업 OFF / P6 직전 이상행동 / P7 무허가 / P8 장기 gap + - 한국 AIS 수신 커버리지 밖은 자연 gap 가능성으로 감점 + - 어구(gear) AIS, 한국 선박(440/441) dark 판정 완전 제외 + - `features` JSONB에 `dark_suspicion_score`, `dark_patterns`, `dark_tier`, `dark_history_7d` 등 저장 +- **Transshipment 베테랑 관점 재설계** — 점수 기반 40~130점 판정 (CRITICAL/HIGH/WATCH) + - SOG 2.0→1.0kn, 근접 110→77m, 지속 60→45분 + gap tolerance 2사이클 + - 한국 EEZ 관할 수역 이내 필수, 어구/여객/군함/유조 제외 + - 야간·무허가·COG편차·장기지속·영해위치 가점 + - pair_history 구조 확장: `{'first_seen', 'last_seen', 'miss_count'}` (GPS 노이즈 내성) + +### 변경 +- **stats_aggregator hourly**: UTC→KST hour boundary 전환, `by_category`/`by_zone` JSONB 집계 추가 +- **event_generator 룰 전면 재편**: + - EEZ_INTRUSION: 실측 zone_code(TERRITORIAL_SEA/CONTIGUOUS_ZONE/ZONE_*) 기반 신규 3룰 + - HIGH_RISK_VESSEL: risk.py CRITICAL 임계값과 동일(70점) 연동, 50~69점 MEDIUM 분리 + - DARK_VESSEL: features.dark_tier 기반 CRITICAL/HIGH 룰 (기존 gap>60 룰 교체) + - ILLEGAL_TRANSSHIP: features.transship_tier 기반 CRITICAL/HIGH 룰 (기존 단순 룰 교체) + - break 제거 → mmsi당 복수 카테고리 동시 매칭 가능 + - dedup 윈도우 
prime 값 분산 (60/120/360→67/127/367 등, 정시 일제 만료 회피) +- **lightweight path 신호 보강**: vessel_store 24h 궤적으로 dark/spoofing/speed_jump 산출 +- `compute_lightweight_risk_score`에 dark gap + spoofing 가점 추가 (max 60→100) +- `_gear_re` 중복 제거 → `fleet_tracker.GEAR_PATTERN` SSOT 통합 +- `AnalysisResult.to_db_tuple` features sanitize: 중첩 dict/list 지원 + +### 수정 +- `prediction_stats_hourly.by_category`/`by_zone` 영구 NULL → 채움 +- `prediction_stats_hourly.critical_count` 영구 0 → CRITICAL 이벤트 수 반영 +- `prediction_events` 카테고리 2종(ZONE_DEPARTURE/ILLEGAL_TRANSSHIP)만 → 6종 이상 +- KPI `dark_vessel`/`eez_violation` 영구 0 → 정상 집계 +- 이벤트 홀수/짝수시 4~22배 진폭 → dedup prime 분산으로 완화 +- dark 과다 판정 해소: 핫픽스(한국 수신 영역 필터) + 2차(의심 점수화) +- transship 과다 판정 해소: 사이클당 2,400~12,600 → CRITICAL/HIGH/WATCH 점수 기반 + ## [2026-04-08] ### 추가 diff --git a/prediction/scripts/hourly-analysis-snapshot.sh b/prediction/scripts/hourly-analysis-snapshot.sh new file mode 100755 index 0000000..8bf863d --- /dev/null +++ b/prediction/scripts/hourly-analysis-snapshot.sh @@ -0,0 +1,340 @@ +#!/bin/bash +# prediction 시간당 상태 스냅샷 수집기 +# 실행 환경: redis-211 서버 (prediction 서비스 호스트) +# cron: 0 * * * * /home/apps/kcg-ai-prediction/scripts/hourly-analysis-snapshot.sh +# +# 출력: /home/apps/kcg-ai-prediction/data/hourly-analysis/YYYYMMDD-HHMM.txt +# 수집 대상: +# 1. vessel_analysis_results 전체 분포 (pipeline vs lightweight, dark/spoof/risk) +# 2. zone_code 분포 + dark 교차 집계 +# 3. dark vessel gap_duration_min 분포 +# 4. dark vessel activity_state 분포 +# 5. dark vessel 상세 샘플 20건 (mmsi/zone/gap/lat/lon) +# 6. prediction_events 카테고리×level 분포 +# 7. prediction_stats_hourly 최근 2건 +# 8. prediction_kpi_realtime 전체 +# 9. risk_score 히스토그램 +# 10. 
직전 1시간 사이클 로그 (journalctl) + +set -u + +OUTDIR=/home/apps/kcg-ai-prediction/data/hourly-analysis +mkdir -p "$OUTDIR" +STAMP=$(date '+%Y%m%d-%H%M') +OUT="$OUTDIR/$STAMP.txt" + +export PGPASSWORD=Kcg2026ai +PSQL="psql -U kcg-app -d kcgaidb -h 211.208.115.83 -P pager=off" + +{ +echo "# prediction hourly snapshot" +echo "# generated: $(date '+%Y-%m-%d %H:%M:%S %Z')" +echo "# host: $(hostname)" +echo "" + +$PSQL << 'SQL' +\echo === 1. VESSEL_ANALYSIS overview (last 1h) === +SELECT count(*) total, + count(*) FILTER (WHERE vessel_type != 'UNKNOWN') pipeline_path, + count(*) FILTER (WHERE vessel_type = 'UNKNOWN') lightweight_path, + count(*) FILTER (WHERE is_dark) dark, + count(*) FILTER (WHERE spoofing_score > 0.5) spoof_hi, + count(*) FILTER (WHERE spoofing_score > 0) spoof_any, + count(*) FILTER (WHERE risk_score >= 70) crit_score, + count(*) FILTER (WHERE risk_level='CRITICAL') crit_lvl, + count(*) FILTER (WHERE risk_level='HIGH') high_lvl, + max(risk_score) max_risk, + round(avg(risk_score)::numeric, 2) avg_risk, + count(*) FILTER (WHERE transship_suspect) transship +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour'; + +\echo +\echo === 2. ZONE × DARK distribution === +SELECT zone_code, + count(*) total, + count(*) FILTER (WHERE is_dark) dark, + count(*) FILTER (WHERE risk_score >= 70) crit, + round(avg(risk_score)::numeric, 1) avg_risk +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour' +GROUP BY zone_code ORDER BY total DESC; + +\echo +\echo === 3. 
DARK GAP distribution (all vessels in last 1h) === +SELECT CASE + WHEN gap_duration_min < 30 THEN 'a_lt30' + WHEN gap_duration_min < 60 THEN 'b_30-59' + WHEN gap_duration_min < 120 THEN 'c_60-119' + WHEN gap_duration_min < 360 THEN 'd_120-359' + WHEN gap_duration_min < 1440 THEN 'e_360-1439' + ELSE 'f_gte1440' END gap_bucket, + count(*) total, + count(*) FILTER (WHERE is_dark) dark, + count(*) FILTER (WHERE is_dark AND vessel_type='UNKNOWN') dark_lightweight, + count(*) FILTER (WHERE is_dark AND vessel_type!='UNKNOWN') dark_pipeline +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour' +GROUP BY gap_bucket ORDER BY gap_bucket; + +\echo +\echo === 4. DARK vessels by activity_state === +SELECT activity_state, count(*), round(avg(gap_duration_min)::numeric, 0) avg_gap_min +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour' AND is_dark +GROUP BY activity_state ORDER BY count DESC; + +\echo +\echo === 5. DARK sample top 20 by gap (mmsi/zone/gap/state) === +SELECT mmsi, zone_code, activity_state, gap_duration_min, risk_score +FROM ( + SELECT DISTINCT ON (mmsi) mmsi, zone_code, activity_state, gap_duration_min, + risk_score, analyzed_at + FROM kcg.vessel_analysis_results + WHERE analyzed_at > now() - interval '1 hour' AND is_dark + ORDER BY mmsi, analyzed_at DESC +) latest +ORDER BY gap_duration_min DESC LIMIT 20; + +\echo +\echo === 6. PREDICTION_EVENTS last 1h by category×level === +SELECT category, level, count(*) cnt +FROM kcg.prediction_events +WHERE created_at > now() - interval '1 hour' +GROUP BY category, level ORDER BY cnt DESC; + +\echo +\echo === 7. STATS_HOURLY latest 3 rows === +SELECT stat_hour, total_detections, event_count, critical_count, + by_category::text, by_zone::text +FROM kcg.prediction_stats_hourly +ORDER BY stat_hour DESC LIMIT 3; + +\echo +\echo === 8. 
KPI REALTIME === +SELECT kpi_key, value, trend, delta_pct, updated_at +FROM kcg.prediction_kpi_realtime ORDER BY kpi_key; + +\echo +\echo === 9. RISK_SCORE histogram (last 1h) === +SELECT CASE + WHEN risk_score < 10 THEN 'a_0-9' + WHEN risk_score < 30 THEN 'b_10-29' + WHEN risk_score < 50 THEN 'c_30-49' + WHEN risk_score < 70 THEN 'd_50-69' + WHEN risk_score < 90 THEN 'e_70-89' + ELSE 'f_90-100' END bucket, + count(*) cnt, + count(*) FILTER (WHERE vessel_type='UNKNOWN') lightweight +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour' +GROUP BY bucket ORDER BY bucket; + +\echo +\echo === 10. TRANSSHIP, SPOOFING, FLEET 요약 === +SELECT + count(*) FILTER (WHERE transship_suspect) transship_ct, + count(*) FILTER (WHERE spoofing_score > 0.7) spoof_gt070, + count(*) FILTER (WHERE spoofing_score > 0.5 AND spoofing_score <= 0.7) spoof_050_070, + count(*) FILTER (WHERE speed_jump_count > 0) speed_jumps, + count(*) FILTER (WHERE fleet_is_leader) fleet_leader, + count(DISTINCT fleet_cluster_id) FILTER (WHERE fleet_cluster_id IS NOT NULL AND fleet_cluster_id > 0) fleet_clusters +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour'; + +\echo +\echo === 10-1. FLEET_ROLE distribution === +SELECT fleet_role, count(*), count(DISTINCT mmsi) uniq_mmsi +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour' +GROUP BY fleet_role ORDER BY count DESC; + +\echo +\echo === 10-2. TRANSSHIPMENT duration histogram === +SELECT CASE + WHEN transship_duration_min < 5 THEN 'a_0-4' + WHEN transship_duration_min < 15 THEN 'b_5-14' + WHEN transship_duration_min < 30 THEN 'c_15-29' + WHEN transship_duration_min < 60 THEN 'd_30-59' + WHEN transship_duration_min < 120 THEN 'e_60-119' + ELSE 'f_gte120' END bucket, count(*) +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour' AND transship_suspect +GROUP BY bucket ORDER BY bucket; + +\echo +\echo === G1. 
PIPELINE vessel_type (어구 타입) distribution === +SELECT vessel_type, + count(*), + count(*) FILTER (WHERE fishing_pct > 50) active_fishing, + round(avg(fishing_pct)::numeric, 1) avg_fish_pct, + round(avg(ucaf_score)::numeric, 3) avg_ucaf, + round(avg(ucft_score)::numeric, 3) avg_ucft, + round(avg(risk_score)::numeric, 1) avg_risk +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour' +GROUP BY vessel_type ORDER BY count DESC; + +\echo +\echo === G2. ACTIVITY_STATE distribution (전체) === +SELECT activity_state, count(*), + count(*) FILTER (WHERE vessel_type != 'UNKNOWN') pipeline, + round(avg(risk_score)::numeric, 1) avg_risk +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour' +GROUP BY activity_state ORDER BY count DESC; + +\echo +\echo === G3. GEAR_GROUP_PARENT_RESOLUTION status + confidence === +SELECT status, count(*), + round(avg(confidence)::numeric, 3) avg_conf, + round(avg(top_score)::numeric, 3) avg_top, + round(avg(score_margin)::numeric, 3) avg_margin, + round(avg(stable_cycles)::numeric, 1) avg_stable +FROM kcg.gear_group_parent_resolution +GROUP BY status ORDER BY count DESC; + +\echo +\echo === G3-1. PARENT_RESOLUTION decision_source === +SELECT coalesce(decision_source, '(null)') ds, status, count(*) +FROM kcg.gear_group_parent_resolution +GROUP BY ds, status ORDER BY count DESC LIMIT 20; + +\echo +\echo === G4. GEAR_GROUP_EPISODES (active) === +SELECT status, continuity_source, count(*), + round(avg(continuity_score)::numeric, 3) avg_cont, + round(avg(current_member_count)::numeric, 1) avg_members, + round(avg(EXTRACT(EPOCH FROM (now() - first_seen_at))/3600)::numeric, 1) avg_age_h +FROM kcg.gear_group_episodes +WHERE last_seen_at > now() - interval '24 hours' +GROUP BY status, continuity_source ORDER BY count DESC; + +\echo +\echo === G5. 
GEAR_CORRELATION_SCORES (current_score) 분포 === +SELECT CASE + WHEN current_score < 0.3 THEN 'a_lt0.3' + WHEN current_score < 0.5 THEN 'b_0.3-0.5' + WHEN current_score < 0.7 THEN 'c_0.5-0.7' + WHEN current_score < 0.85 THEN 'd_0.7-0.85' + ELSE 'e_gte0.85' END bucket, + count(*), + count(DISTINCT group_key) uniq_groups, + count(DISTINCT target_mmsi) uniq_targets, + round(avg(streak_count)::numeric, 1) avg_streak +FROM kcg.gear_correlation_scores +WHERE updated_at > now() - interval '1 hour' +GROUP BY bucket ORDER BY bucket; + +\echo +\echo === G5-1. CORRELATION freeze_state === +SELECT freeze_state, count(*), round(avg(current_score)::numeric, 3) avg_score +FROM kcg.gear_correlation_scores +WHERE updated_at > now() - interval '1 hour' +GROUP BY freeze_state ORDER BY count DESC; + +\echo +\echo === G6. GROUP_POLYGON_SNAPSHOTS (last 1h, by type × zone) === +SELECT group_type, + coalesce(zone_id, '(null)') zone, + count(*), + round(avg(area_sq_nm)::numeric, 2) avg_area_nm, + round(avg(member_count)::numeric, 1) avg_members +FROM kcg.group_polygon_snapshots +WHERE snapshot_time > now() - interval '1 hour' +GROUP BY group_type, zone_id ORDER BY count DESC LIMIT 20; + +\echo +\echo === G7. PATH breakdown (pipeline vs lightweight) + unique MMSI by risk threshold === +SELECT + count(*) FILTER (WHERE vessel_type != 'UNKNOWN') pipeline_ct, + count(*) FILTER (WHERE vessel_type = 'UNKNOWN') lightweight_ct, + count(DISTINCT mmsi) FILTER (WHERE risk_score >= 20) risk_gte20_uniq, + count(DISTINCT mmsi) FILTER (WHERE risk_score >= 50) risk_gte50_uniq, + count(DISTINCT mmsi) FILTER (WHERE risk_score >= 70) risk_gte70_uniq +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour'; + +\echo +\echo === G8. VIOLATION_CATEGORIES (last 1h, unnest) === +SELECT unnest(violation_categories) vcat, count(*) +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour' AND violation_categories IS NOT NULL +GROUP BY vcat ORDER BY count DESC LIMIT 20; + +\echo +\echo === G9. 
PREDICTION_EVENTS 24h hourly trend (KST) === +SELECT date_trunc('hour', occurred_at AT TIME ZONE 'Asia/Seoul') hr, + count(*) tot, + count(*) FILTER (WHERE category='DARK_VESSEL') dark, + count(*) FILTER (WHERE category='ILLEGAL_TRANSSHIP') transship, + count(*) FILTER (WHERE category='EEZ_INTRUSION') eez, + count(*) FILTER (WHERE category='HIGH_RISK_VESSEL') high_risk, + count(*) FILTER (WHERE category='ZONE_DEPARTURE') zone_dep, + count(*) FILTER (WHERE level='CRITICAL') critical +FROM kcg.prediction_events +WHERE created_at > now() - interval '24 hours' +GROUP BY hr ORDER BY hr DESC LIMIT 25; + +\echo +\echo === G10. PREDICTION_ALERTS (last 1h) === +SELECT channel, delivery_status, count(*), + round(avg(ai_confidence)::numeric, 3) avg_conf +FROM kcg.prediction_alerts +WHERE sent_at > now() - interval '1 hour' +GROUP BY channel, delivery_status ORDER BY count DESC; + +SQL + +echo "" +echo "=== 11. DARK SAMPLE latest position (snpdb t_vessel_tracks_5min) ===" +# Cross-database 불가 → 두 단계: kcgaidb에서 mmsi 추출 → snpdb에 별도 쿼리 +DARK_MMSIS=$(PGPASSWORD=Kcg2026ai psql -U kcg-app -d kcgaidb -h 211.208.115.83 -tA -c " +SELECT string_agg(quote_literal(mmsi), ',') +FROM (SELECT DISTINCT ON (mmsi) mmsi, gap_duration_min, analyzed_at + FROM kcg.vessel_analysis_results + WHERE analyzed_at > now() - interval '1 hour' AND is_dark + ORDER BY mmsi, analyzed_at DESC) v +WHERE v.mmsi IN ( + SELECT mmsi FROM (SELECT DISTINCT ON (mmsi) mmsi, gap_duration_min, analyzed_at + FROM kcg.vessel_analysis_results + WHERE analyzed_at > now() - interval '1 hour' AND is_dark + ORDER BY mmsi, analyzed_at DESC) x + ORDER BY gap_duration_min DESC LIMIT 20 +);" 2>/dev/null) + +if [ -n "$DARK_MMSIS" ]; then + PGPASSWORD='snp#8932' psql -U snp -d snpdb -h 211.208.115.83 -P pager=off -c " + SELECT DISTINCT ON (mmsi) mmsi, time_bucket, + round(ST_Y(ST_EndPoint(track_geom))::numeric, 4) lat, + round(ST_X(ST_EndPoint(track_geom))::numeric, 4) lon + FROM signal.t_vessel_tracks_5min + WHERE mmsi IN 
($DARK_MMSIS) AND time_bucket > now() - interval '24 hours' + ORDER BY mmsi, time_bucket DESC; + " 2>&1 | head -30 +else + echo "(no dark vessels in last 1h)" +fi + +echo "" +echo "=== 12. PREDICTION_EVENTS occurred_at distribution by 10-min buckets ===" +PGPASSWORD=Kcg2026ai psql -U kcg-app -d kcgaidb -h 211.208.115.83 -P pager=off -c " +SELECT date_trunc('hour', occurred_at) + (date_part('minute', occurred_at)::int / 10 * interval '10 minutes') bucket, + category, count(*) +FROM kcg.prediction_events +WHERE created_at > now() - interval '1 hour' +GROUP BY bucket, category +ORDER BY bucket DESC, count DESC LIMIT 30; +" 2>&1 + +echo "" +echo "=== 13. CYCLE LOG (last 65 min) ===" +journalctl -u kcg-ai-prediction --since '65 minutes ago' --no-pager 2>/dev/null | \ + grep -E 'lightweight analysis|event_generator:|stats_aggregator hourly|kpi_writer:|analysis cycle:|ERROR|Traceback' | \ + tail -60 + +echo "" +echo "=== END ===" +} > "$OUT" 2>&1 + +echo "[snapshot] saved: $OUT"