diff --git a/prediction/algorithms/dark_vessel.py b/prediction/algorithms/dark_vessel.py index 9e8b9f2..656cbd0 100644 --- a/prediction/algorithms/dark_vessel.py +++ b/prediction/algorithms/dark_vessel.py @@ -1,3 +1,5 @@ +from typing import Callable, Optional + import pandas as pd from algorithms.location import haversine_nm @@ -5,6 +7,10 @@ GAP_SUSPICIOUS_SEC = 1800 # 30분 GAP_HIGH_SUSPICIOUS_SEC = 3600 # 1시간 GAP_VIOLATION_SEC = 86400 # 24시간 +# 한국 AIS 수신 가능 추정 영역 (한반도 + EEZ + 접속수역 여유) +_KR_COVERAGE_LAT = (32.0, 39.5) +_KR_COVERAGE_LON = (124.0, 132.0) + def detect_ais_gaps(df_vessel: pd.DataFrame) -> list[dict]: """AIS 수신 기록에서 소실 구간 추출.""" @@ -57,3 +63,271 @@ def is_dark_vessel(df_vessel: pd.DataFrame) -> tuple[bool, int]: max_gap_min = max(g['gap_min'] for g in gaps) is_dark = max_gap_min >= 30 # 30분 이상 소실 return is_dark, int(max_gap_min) + + +def _classify_state(sog: float) -> str: + """SOG 기준 간단 활동 상태 분류.""" + if sog is None: + return 'UNKNOWN' + if sog <= 1.0: + return 'STATIONARY' + if sog <= 5.0: + return 'FISHING' + return 'SAILING' + + +def analyze_dark_pattern(df_vessel: pd.DataFrame) -> dict: + """dark 판정 + gap 상세 정보 반환. + + 가장 긴 gap 한 건을 기준으로 패턴 분석에 필요한 정보를 모두 수집한다. + is_dark가 False이면 나머지 필드는 기본값으로 채움. + + Returns: + { + 'is_dark': bool, + 'gap_min': int, + 'gap_start_lat': Optional[float], + 'gap_start_lon': Optional[float], + 'gap_start_sog': float, + 'gap_start_state': str, + 'gap_end_lat': Optional[float], + 'gap_end_lon': Optional[float], + 'gap_distance_nm': float, + 'gap_resumed': bool, + 'pre_gap_turn_or_teleport': bool, + 'avg_sog_before': float, + } + """ + default = { + 'is_dark': False, + 'gap_min': 0, + 'gap_start_lat': None, + 'gap_start_lon': None, + 'gap_start_sog': 0.0, + 'gap_start_state': 'UNKNOWN', + 'gap_end_lat': None, + 'gap_end_lon': None, + 'gap_distance_nm': 0.0, + 'gap_resumed': False, + 'pre_gap_turn_or_teleport': False, + 'avg_sog_before': 0.0, + } + + if df_vessel is None or len(df_vessel) < 2: + return default + + df_sorted = df_vessel.sort_values('timestamp').reset_index(drop=True) + records = df_sorted.to_dict('records') + + # 가장 긴 gap 찾기 + max_gap_sec = 0.0 + max_gap_idx = -1 # records에서 gap 직후 인덱스 (curr) + for i in range(1, len(records)): + prev_ts = pd.Timestamp(records[i - 1]['timestamp']) + curr_ts = pd.Timestamp(records[i]['timestamp']) + gap_sec = (curr_ts - prev_ts).total_seconds() + if gap_sec > max_gap_sec: + max_gap_sec = gap_sec + max_gap_idx = i + + if max_gap_idx < 1 or max_gap_sec < GAP_SUSPICIOUS_SEC: + return default + + prev_row = records[max_gap_idx - 1] # gap 직전 마지막 포인트 + curr_row = records[max_gap_idx] # gap 직후 첫 포인트 + + gap_start_lat = float(prev_row.get('lat')) if prev_row.get('lat') is not None else None + gap_start_lon = float(prev_row.get('lon')) if prev_row.get('lon') is not None else None + gap_end_lat = float(curr_row.get('lat')) if curr_row.get('lat') is not None else None + gap_end_lon = float(curr_row.get('lon')) if curr_row.get('lon') is not None else None + + # gap 직전 SOG 추정: prev 행의 raw_sog 또는 computed sog 사용 + gap_start_sog = float(prev_row.get('sog') or prev_row.get('raw_sog') or 0.0) + + # gap 중 이동 거리 + if all(v is not None for v in (gap_start_lat, gap_start_lon, gap_end_lat, gap_end_lon)): + gap_distance_nm = haversine_nm( + gap_start_lat, gap_start_lon, gap_end_lat, gap_end_lon, + ) + else: + gap_distance_nm = 0.0 + + # 현재 시점 기준 gap이 "재개되었는지" 판단: + # curr_row가 df_sorted의 마지막 포인트가 아니면 신호가 이미 재개된 상태 + # 마지막 포인트면 아직 gap 진행 중(curr_row는 gap 시작 직후 아니라 gap 전의 마지막일 수도 있음) + is_last = (max_gap_idx == len(records) - 1) + # gap이 마지막이면 신호 복귀 미확인 + gap_resumed = not is_last or ( + is_last and max_gap_idx < len(records) - 1 # 항상 False지만 안전용 + ) + # 단, max_gap_idx가 마지막이면 gap 후 포인트 없음 → 재개 미확인 + if max_gap_idx == len(records) - 1: + gap_resumed = False + else: + gap_resumed = True + + # gap 직전 5개 포인트로 평균 SOG + 이상 행동(teleport) 판정 + start_idx = max(0, max_gap_idx - 5) + window = records[start_idx:max_gap_idx] + if window: + sogs = [float(r.get('sog') or r.get('raw_sog') or 0.0) for r in window] + avg_sog_before = sum(sogs) / len(sogs) if sogs else 0.0 + else: + avg_sog_before = gap_start_sog + + # gap 직전 window에 teleportation 발생 여부 + pre_gap_turn_or_teleport = False + if len(window) >= 2: + try: + window_df = df_sorted.iloc[start_idx:max_gap_idx].copy() + # spoofing.detect_teleportation 재사용 (순환 import 방지 위해 지연 import) + from algorithms.spoofing import detect_teleportation + teleports = detect_teleportation(window_df) + if teleports: + pre_gap_turn_or_teleport = True + except Exception: + pre_gap_turn_or_teleport = False + + return { + 'is_dark': True, + 'gap_min': int(max_gap_sec / 60), + 'gap_start_lat': gap_start_lat, + 'gap_start_lon': gap_start_lon, + 'gap_start_sog': gap_start_sog, + 'gap_start_state': _classify_state(gap_start_sog), + 'gap_end_lat': gap_end_lat, + 'gap_end_lon': gap_end_lon, + 'gap_distance_nm': round(gap_distance_nm, 2), + 'gap_resumed': gap_resumed, + 'pre_gap_turn_or_teleport': pre_gap_turn_or_teleport, + 'avg_sog_before': round(avg_sog_before, 2), + } + + +def _is_in_kr_coverage(lat: Optional[float], lon: Optional[float]) -> bool: + if lat is None or lon is None: + return False + return (_KR_COVERAGE_LAT[0] <= lat <= _KR_COVERAGE_LAT[1] + and _KR_COVERAGE_LON[0] <= lon <= _KR_COVERAGE_LON[1]) + + +def compute_dark_suspicion( + gap_info: dict, + mmsi: str, + is_permitted: bool, + history: dict, + now_kst_hour: int, + classify_zone_fn: Optional[Callable[[float, float], dict]] = None, +) -> tuple[int, list[str], str]: + """의도적 AIS OFF 의심 점수 산출. + + Args: + gap_info: analyze_dark_pattern 결과 + mmsi: 선박 MMSI + is_permitted: 허가 어선 여부 + history: {'count_7d': int, 'count_24h': int} + now_kst_hour: 현재 KST 시각 (0~23) + classify_zone_fn: (lat, lon) -> dict. gap_start 위치의 zone 판단 + + Returns: + (score, patterns, tier) + tier: 'CRITICAL' / 'HIGH' / 'WATCH' / 'NONE' + """ + if not gap_info.get('is_dark'): + return 0, [], 'NONE' + + score = 0 + patterns: list[str] = [] + + gap_start_sog = gap_info.get('gap_start_sog') or 0.0 + gap_start_state = gap_info.get('gap_start_state', 'UNKNOWN') + gap_start_lat = gap_info.get('gap_start_lat') + gap_start_lon = gap_info.get('gap_start_lon') + gap_min = gap_info.get('gap_min') or 0 + + # P1: 이동 중 OFF + if gap_start_sog > 5.0: + score += 25 + patterns.append('moving_at_off') + elif gap_start_sog > 2.0: + score += 15 + patterns.append('slow_moving_at_off') + + # P2: gap 시작 위치의 민감 수역 + if classify_zone_fn is not None and gap_start_lat is not None and gap_start_lon is not None: + try: + zone_info = classify_zone_fn(gap_start_lat, gap_start_lon) + zone = zone_info.get('zone', '') + if zone in ('TERRITORIAL_SEA', 'CONTIGUOUS_ZONE'): + score += 25 + patterns.append('sensitive_zone') + elif zone.startswith('ZONE_'): + score += 15 + patterns.append('special_zone') + except Exception: + pass + + # P3: 반복 이력 (과거 7일) + h7 = int(history.get('count_7d', 0) or 0) + h24 = int(history.get('count_24h', 0) or 0) + if h7 >= 3: + score += 30 + patterns.append('repeat_high') + elif h7 >= 2: + score += 15 + patterns.append('repeat_low') + if h24 >= 1: + score += 10 + patterns.append('recent_dark') + + # P4: gap 후 이동 거리 비정상 + gap_distance_nm = gap_info.get('gap_distance_nm') or 0.0 + avg_sog_before = gap_info.get('avg_sog_before') or 0.0 + if gap_info.get('gap_resumed') and gap_min > 0: + gap_hours = gap_min / 60.0 + # 예상 이동 = avg_sog * gap_hours. 2배 초과면 비정상 + expected = max(gap_hours * max(avg_sog_before, 1.0), 0.5) + if gap_distance_nm > expected * 2.0: + score += 20 + patterns.append('distance_anomaly') + + # P5: 주간 조업 시간 OFF + if 6 <= now_kst_hour < 18 and gap_start_state == 'FISHING': + score += 15 + patterns.append('daytime_fishing_off') + + # P6: gap 직전 이상 행동 + if gap_info.get('pre_gap_turn_or_teleport'): + score += 15 + patterns.append('teleport_before_gap') + + # P7: 무허가 + if not is_permitted: + score += 10 + patterns.append('unpermitted') + + # P8: gap 길이 + if gap_min >= 360: + score += 15 + patterns.append('very_long_gap') + elif gap_min >= 180: + score += 10 + patterns.append('long_gap') + + # 감점: gap 시작 위치가 한국 수신 커버리지 밖 → 자연 gap 가능성 + if not _is_in_kr_coverage(gap_start_lat, gap_start_lon): + score -= 30 + patterns.append('out_of_coverage') + + score = max(0, min(100, score)) + + if score >= 70: + tier = 'CRITICAL' + elif score >= 50: + tier = 'HIGH' + elif score >= 30: + tier = 'WATCH' + else: + tier = 'NONE' + + return score, patterns, tier diff --git a/prediction/algorithms/transshipment.py b/prediction/algorithms/transshipment.py index 9e26b95..27cdd36 100644 --- a/prediction/algorithms/transshipment.py +++ b/prediction/algorithms/transshipment.py @@ -15,31 +15,40 @@ from __future__ import annotations import logging import math from datetime import datetime, timezone -from typing import Optional +from typing import Callable, Optional import pandas as pd +from fleet_tracker import GEAR_PATTERN + logger = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────── -# 상수 +# 상수 (2026-04-09 재조정 — 베테랑 관점) # ────────────────────────────────────────────────────────────── -SOG_THRESHOLD_KN = 2.0 # 정박/표류 기준 속도 (노트) -PROXIMITY_DEG = 0.001 # 근접 판정 임계값 (~110m) -SUSPECT_DURATION_MIN = 60 # 의심 판정 최소 지속 시간 (분) -PAIR_EXPIRY_MIN = 120 # pair_history 항목 만료 기준 (분) +SOG_THRESHOLD_KN = 1.0 # 2.0 → 1.0 (완전 정박 수준) +PROXIMITY_DEG = 0.0007 # 0.001 → 0.0007 (~77m, GPS 노이즈 포함한 근접) +SUSPECT_DURATION_MIN = 45 # 60 → 45 (gap tolerance 있음) +PAIR_EXPIRY_MIN = 180 # 120 → 180 +GAP_TOLERANCE_CYCLES = 2 # 신규: 2 사이클까지 active에서 빠져도 리셋 안 함 -# 외국 해안 근접 제외 경계 -_CN_LON_MAX = 123.5 # 중국 해안: 경도 < 123.5 -_JP_LON_MIN = 130.5 # 일본 해안: 경도 > 130.5 -_TSUSHIMA_LAT_MIN = 33.8 # 대마도: 위도 > 33.8 AND 경도 > 129.0 +# 외국 해안 근접 제외 경계 (레거시 — 관할 필터로 대체됨) +_CN_LON_MAX = 123.5 +_JP_LON_MIN = 130.5 +_TSUSHIMA_LAT_MIN = 33.8 _TSUSHIMA_LON_MIN = 129.0 -# 탐지 대상 선종 (소문자 정규화 후 비교) -_CANDIDATE_TYPES: frozenset[str] = frozenset({'tanker', 'cargo', 'fishing'}) +# 한국 EEZ 관할 수역 (단속 가능 범위) +_KR_EEZ_LAT = (32.0, 39.5) +_KR_EEZ_LON = (124.0, 132.0) -# 그리드 셀 크기 = PROXIMITY_DEG (셀 하나 = 근접 임계와 동일 크기) +# 환적 불가능 선종 (여객/군함/유조/도선/예인/수색구조) +_TRANSSHIP_EXCLUDED: frozenset[str] = frozenset({ + 'passenger', 'military', 'tanker', 'pilot', 'tug', 'sar', +}) + +# 그리드 셀 크기 _GRID_CELL_DEG = PROXIMITY_DEG @@ -58,6 +67,28 @@ def _is_near_foreign_coast(lat: float, lon: float) -> bool: return False +def _is_in_kr_jurisdiction(lat: float, lon: float) -> bool: + """한국 EEZ 관할 수역 여부 (단속 가능 범위).""" + return (_KR_EEZ_LAT[0] <= lat <= _KR_EEZ_LAT[1] + and _KR_EEZ_LON[0] <= lon <= _KR_EEZ_LON[1]) + + +def _is_candidate_ship_type(vessel_type_a: Optional[str], vessel_type_b: Optional[str]) -> bool: + """환적 후보 선종인지 (명시적 제외만 차단, 미상은 허용).""" + a = (vessel_type_a or '').strip().lower() + b = (vessel_type_b or '').strip().lower() + if a in _TRANSSHIP_EXCLUDED or b in _TRANSSHIP_EXCLUDED: + return False + return True + + +def _is_gear_name(name: Optional[str]) -> bool: + """어구 이름 패턴 매칭 — fleet_tracker.GEAR_PATTERN SSOT.""" + if not name: + return False + return bool(GEAR_PATTERN.match(name)) + + def _cell_key(lat: float, lon: float) -> tuple[int, int]: """위도/경도를 그리드 셀 인덱스로 변환.""" return (int(math.floor(lat / _GRID_CELL_DEG)), @@ -101,14 +132,25 @@ def _pair_key(mmsi_a: str, mmsi_b: str) -> tuple[str, str]: def _evict_expired_pairs( - pair_history: dict[tuple[str, str], datetime], + pair_history: dict, now: datetime, ) -> None: - """PAIR_EXPIRY_MIN 이상 갱신 없는 pair_history 항목 제거.""" - expired = [ - key for key, first_seen in pair_history.items() - if (now - first_seen).total_seconds() / 60 > PAIR_EXPIRY_MIN - ] + """PAIR_EXPIRY_MIN 이상 갱신 없는 pair_history 항목 제거. + + 새 구조: {(a,b): {'first_seen': dt, 'last_seen': dt, 'miss_count': int}} + """ + expired = [] + for key, meta in pair_history.items(): + if not isinstance(meta, dict): + # 레거시 구조 (datetime 직접 저장)는 즉시 제거 → 다음 사이클에서 재구성 + expired.append(key) + continue + last_seen = meta.get('last_seen') or meta.get('first_seen') + if last_seen is None: + expired.append(key) + continue + if (now - last_seen).total_seconds() / 60 > PAIR_EXPIRY_MIN: + expired.append(key) for key in expired: del pair_history[key] @@ -117,24 +159,119 @@ def _evict_expired_pairs( # 공개 API # ────────────────────────────────────────────────────────────── +def _score_pair( + pair: tuple[str, str], + meta: dict, + lat: float, + lon: float, + cog_a: Optional[float], + cog_b: Optional[float], + vessel_info_a: dict, + vessel_info_b: dict, + is_permitted_fn: Optional[Callable[[str], bool]], + now_kst_hour: int, + zone_code: Optional[str], + now: datetime, +) -> Optional[dict]: + """환적 의심 pair에 대해 점수 산출 + severity 반환. + + 필수 조건 실패 시 None. WATCH 이상이면 dict 반환. + """ + # 필수 1: 한국 관할 수역 + if not _is_in_kr_jurisdiction(lat, lon): + return None + # 필수 2: 선종 필터 + if not _is_candidate_ship_type( + vessel_info_a.get('vessel_type'), + vessel_info_b.get('vessel_type'), + ): + return None + # 필수 3: 어구 제외 + if _is_gear_name(vessel_info_a.get('name')) or _is_gear_name(vessel_info_b.get('name')): + return None + # 필수 4: 지속 시간 + first_seen = meta.get('first_seen') + if first_seen is None: + return None + duration_min = int((now - first_seen).total_seconds() / 60) + if duration_min < SUSPECT_DURATION_MIN: + return None + + score = 40 # base + + # 야간 가점 (KST 20:00~04:00) + if now_kst_hour >= 20 or now_kst_hour < 4: + score += 15 + + # 무허가 가점 + if is_permitted_fn is not None: + try: + if not is_permitted_fn(pair[0]) or not is_permitted_fn(pair[1]): + score += 20 + except Exception: + pass + + # COG 편차 (같은 방향 아니면 가점 — 나란히 가는 선단 배제) + if cog_a is not None and cog_b is not None: + try: + diff = abs(float(cog_a) - float(cog_b)) + if diff > 180: + diff = 360 - diff + if diff > 45: + score += 20 + except Exception: + pass + + # 지속 길이 추가 가점 + if duration_min >= 90: + score += 20 + + # 영해/접속수역 추가 가점 + if zone_code in ('TERRITORIAL_SEA', 'CONTIGUOUS_ZONE'): + score += 15 + + if score >= 90: + severity = 'CRITICAL' + elif score >= 70: + severity = 'HIGH' + elif score >= 50: + severity = 'WATCH' + else: + return None + + return { + 'pair_a': pair[0], + 'pair_b': pair[1], + 'duration_min': duration_min, + 'severity': severity, + 'score': score, + 'lat': lat, + 'lon': lon, + } + + def detect_transshipment( df: pd.DataFrame, - pair_history: dict[tuple[str, str], datetime], -) -> list[tuple[str, str, int]]: - """환적 의심 쌍 탐지. + pair_history: dict, + get_vessel_info: Optional[Callable[[str], dict]] = None, + is_permitted: Optional[Callable[[str], bool]] = None, + classify_zone_fn: Optional[Callable[[float, float], dict]] = None, + now_kst_hour: int = 0, +) -> list[dict]: + """환적 의심 쌍 탐지 (점수 기반, 베테랑 관점 필터). Args: df: 선박 위치 DataFrame. 필수 컬럼: mmsi, lat, lon, sog - 선택 컬럼: ship_type (없으면 전체 선종 허용) - pair_history: 쌍별 최초 탐지 시각을 저장하는 영속 dict. - 스케줄러에서 호출 간 유지하여 전달해야 한다. - 키: (mmsi_a, mmsi_b) — mmsi_a < mmsi_b 정규화 적용. - 값: 최초 탐지 시각 (UTC datetime, timezone-aware). + 선택 컬럼: cog + pair_history: {(a,b): {'first_seen', 'last_seen', 'miss_count'}} + get_vessel_info: callable(mmsi) -> {'name', 'vessel_type', ...} + is_permitted: callable(mmsi) -> bool + classify_zone_fn: callable(lat, lon) -> dict (zone 판정) + now_kst_hour: 현재 KST 시각 (0~23) Returns: - [(mmsi_a, mmsi_b, duration_minutes), ...] — 60분 이상 지속된 의심 쌍. - mmsi_a < mmsi_b 정규화 적용. + list[dict] — severity 'CRITICAL'/'HIGH'/'WATCH' 포함 의심 쌍 """ if df.empty: return [] @@ -147,22 +284,15 @@ def detect_transshipment( now = datetime.now(timezone.utc) - # ── 1. 후보 선박 필터 ────────────────────────────────────── - has_type_col = 'ship_type' in df.columns - + # ── 1. 후보 선박 필터 (SOG < 1.0) ───────────────────────── candidate_mask = df['sog'] < SOG_THRESHOLD_KN - - if has_type_col: - type_mask = df['ship_type'].apply(_normalize_type).isin(_CANDIDATE_TYPES) - candidate_mask = candidate_mask & type_mask - candidates = df[candidate_mask].copy() if candidates.empty: _evict_expired_pairs(pair_history, now) return [] - # 외국 해안 근처 제외 + # 외국 해안 근처 제외 (1차 필터) coast_mask = candidates.apply( lambda row: not _is_near_foreign_coast(row['lat'], row['lon']), axis=1, @@ -173,62 +303,162 @@ def detect_transshipment( _evict_expired_pairs(pair_history, now) return [] - records = candidates[['mmsi', 'lat', 'lon']].to_dict('records') + has_cog = 'cog' in candidates.columns + cols = ['mmsi', 'lat', 'lon'] + if has_cog: + cols.append('cog') + records = candidates[cols].to_dict('records') for rec in records: rec['mmsi'] = str(rec['mmsi']) - # ── 2. 그리드 기반 근접 쌍 탐지 ────────────────────────── + # ── 2. 그리드 기반 근접 쌍 탐지 (77m) ─────────────────── grid = _build_grid(records) - active_pairs: set[tuple[str, str]] = set() + active_pairs: dict[tuple[str, str], dict] = {} + + def _try_add_pair(a_rec, b_rec): + if not _within_proximity(a_rec, b_rec): + return + key = _pair_key(a_rec['mmsi'], b_rec['mmsi']) + # 중점 좌표 (점수 산출용) + mid_lat = (a_rec['lat'] + b_rec['lat']) / 2.0 + mid_lon = (a_rec['lon'] + b_rec['lon']) / 2.0 + active_pairs[key] = { + 'lat': mid_lat, 'lon': mid_lon, + 'cog_a': a_rec.get('cog'), 'cog_b': b_rec.get('cog'), + # mmsi_a < mmsi_b 순서로 정렬되었으므로 cog도 맞춰 정렬 필요 + 'mmsi_a': a_rec['mmsi'], 'mmsi_b': b_rec['mmsi'], + } for (row, col), indices in grid.items(): - # 현재 셀 내부 쌍 for i in range(len(indices)): for j in range(i + 1, len(indices)): - a = records[indices[i]] - b = records[indices[j]] - if _within_proximity(a, b): - active_pairs.add(_pair_key(a['mmsi'], b['mmsi'])) - - # 인접 셀 (우측 3셀 + 아래 3셀 = 중복 없는 방향성 순회) + _try_add_pair(records[indices[i]], records[indices[j]]) for dr, dc in ((0, 1), (1, -1), (1, 0), (1, 1)): neighbor_key = (row + dr, col + dc) if neighbor_key not in grid: continue for ai in indices: for bi in grid[neighbor_key]: - a = records[ai] - b = records[bi] - if _within_proximity(a, b): - active_pairs.add(_pair_key(a['mmsi'], b['mmsi'])) + _try_add_pair(records[ai], records[bi]) - # ── 3. pair_history 갱신 ───────────────────────────────── - # 현재 활성 쌍 → 최초 탐지 시각 등록 - for pair in active_pairs: - if pair not in pair_history: - pair_history[pair] = now + # ── 3. pair_history 갱신 (gap tolerance) ───────────────── + active_keys = set(active_pairs.keys()) - # 비활성 쌍 → pair_history에서 제거 (다음 접근 시 재시작) - inactive = [key for key in pair_history if key not in active_pairs] - for key in inactive: - del pair_history[key] + # 활성 쌍 → 등록/갱신 + for pair in active_keys: + if pair not in pair_history or not isinstance(pair_history[pair], dict): + pair_history[pair] = { + 'first_seen': now, + 'last_seen': now, + 'miss_count': 0, + } + else: + pair_history[pair]['last_seen'] = now + pair_history[pair]['miss_count'] = 0 - # 만료 항목 정리 (비활성 제거 후 잔여 방어용) + # 비활성 쌍 → miss_count++ , GAP_TOLERANCE 초과 시 삭제 + for key in list(pair_history.keys()): + if key in active_keys: + continue + meta = pair_history[key] + if not isinstance(meta, dict): + del pair_history[key] + continue + meta['miss_count'] = meta.get('miss_count', 0) + 1 + if meta['miss_count'] > GAP_TOLERANCE_CYCLES: + del pair_history[key] + + # 만료 정리 _evict_expired_pairs(pair_history, now) - # ── 4. 의심 쌍 판정 ────────────────────────────────────── - suspects: list[tuple[str, str, int]] = [] + # ── 4. 점수 기반 의심 쌍 판정 ───────────────────────────── + suspects: list[dict] = [] + rejected_jurisdiction = 0 + rejected_ship_type = 0 + rejected_gear = 0 + rejected_duration = 0 + + for pair, meta in pair_history.items(): + if not isinstance(meta, dict): + continue + first_seen = meta.get('first_seen') + if first_seen is None: + continue + + # active_pairs에 있으면 해당 사이클 좌표·cog 사용, 없으면 이전 값 재사용 (miss 중) + loc_meta = active_pairs.get(pair) + if loc_meta is not None: + lat = loc_meta['lat'] + lon = loc_meta['lon'] + # mmsi_a, mmsi_b 순서를 pair 순서에 맞춤 + if loc_meta['mmsi_a'] == pair[0]: + cog_a, cog_b = loc_meta.get('cog_a'), loc_meta.get('cog_b') + else: + cog_a, cog_b = loc_meta.get('cog_b'), loc_meta.get('cog_a') + meta['last_lat'] = lat + meta['last_lon'] = lon + meta['last_cog_a'] = cog_a + meta['last_cog_b'] = cog_b + else: + lat = meta.get('last_lat') + lon = meta.get('last_lon') + cog_a = meta.get('last_cog_a') + cog_b = meta.get('last_cog_b') + if lat is None or lon is None: + continue + + # 선박 정보 조회 + info_a = get_vessel_info(pair[0]) if get_vessel_info else {} + info_b = get_vessel_info(pair[1]) if get_vessel_info else {} + + # 짧게 pre-check (로깅용) + if not _is_in_kr_jurisdiction(lat, lon): + rejected_jurisdiction += 1 + continue + if not _is_candidate_ship_type(info_a.get('vessel_type'), info_b.get('vessel_type')): + rejected_ship_type += 1 + continue + if _is_gear_name(info_a.get('name')) or _is_gear_name(info_b.get('name')): + rejected_gear += 1 + continue - for pair, first_seen in pair_history.items(): duration_min = int((now - first_seen).total_seconds() / 60) - if duration_min >= SUSPECT_DURATION_MIN: - suspects.append((pair[0], pair[1], duration_min)) + if duration_min < SUSPECT_DURATION_MIN: + rejected_duration += 1 + continue - if suspects: - logger.info( - 'transshipment detection: %d suspect pairs (candidates=%d)', - len(suspects), - len(candidates), + zone_code = None + if classify_zone_fn is not None: + try: + zone_code = classify_zone_fn(lat, lon).get('zone') + except Exception: + pass + + scored = _score_pair( + pair, meta, lat, lon, cog_a, cog_b, + info_a, info_b, is_permitted, + now_kst_hour, zone_code, now, ) + if scored is not None: + suspects.append(scored) + + tier_counts = {'CRITICAL': 0, 'HIGH': 0, 'WATCH': 0} + for s in suspects: + tier_counts[s['severity']] = tier_counts.get(s['severity'], 0) + 1 + + logger.info( + 'transshipment detection: pairs=%d (critical=%d, high=%d, watch=%d, ' + 'rejected_jurisdiction=%d, rejected_ship_type=%d, rejected_gear=%d, ' + 'rejected_duration=%d, candidates=%d)', + len(suspects), + tier_counts.get('CRITICAL', 0), + tier_counts.get('HIGH', 0), + tier_counts.get('WATCH', 0), + rejected_jurisdiction, + rejected_ship_type, + rejected_gear, + rejected_duration, + len(candidates), + ) return suspects diff --git a/prediction/output/event_generator.py b/prediction/output/event_generator.py index 887959a..e1ad2d4 100644 --- a/prediction/output/event_generator.py +++ b/prediction/output/event_generator.py @@ -23,13 +23,13 @@ EVENTS_TABLE = qualified_table('prediction_events') # 사이클이 5분 간격이므로 5의 배수를 피해서 boundary 일제 만료 패턴을 회피한다. DEDUP_WINDOWS = { 'EEZ_INTRUSION': 33, - 'DARK_VESSEL': 127, + 'DARK_VESSEL': 131, # 127 → 131 (2h boundary 회피) 'FLEET_CLUSTER': 367, - 'ILLEGAL_TRANSSHIP': 67, + 'ILLEGAL_TRANSSHIP': 181, # 67 → 181 (환적은 장기 이벤트) 'MMSI_TAMPERING': 33, 'AIS_LOSS': 127, 'SPEED_ANOMALY': 67, - 'ZONE_DEPARTURE': 127, + 'ZONE_DEPARTURE': 89, # 127 → 89 (boundary 분산) 'GEAR_ILLEGAL': 367, 'AIS_RESUME': 67, 'HIGH_RISK_VESSEL': 67, @@ -66,11 +66,25 @@ RULES = [ 'title_fn': lambda r: f"고위험 선박 탐지 (위험도 {r.get('risk_score', 0)})", }, { - 'name': 'dark_vessel_long', - 'condition': lambda r: r.get('is_dark') and (r.get('gap_duration_min', 0) or 0) > 60, + # dark 의심 CRITICAL — 점수 70+ (반복·민감수역·이동중·거리이상 등 복합) + 'name': 'dark_critical', + 'condition': lambda r: (r.get('features') or {}).get('dark_tier') == 'CRITICAL', + 'level': 'CRITICAL', + 'category': 'DARK_VESSEL', + 'title_fn': lambda r: ( + f"고의 AIS 소실 의심 (점수 {(r.get('features') or {}).get('dark_suspicion_score', 0)}, " + f"7일 {(r.get('features') or {}).get('dark_history_7d', 0)}회)" + ), + }, + { + # dark 의심 HIGH — 점수 50~69 + 'name': 'dark_high', + 'condition': lambda r: (r.get('features') or {}).get('dark_tier') == 'HIGH', 'level': 'HIGH', 'category': 'DARK_VESSEL', - 'title_fn': lambda r: f"다크베셀 장기 소실 ({r.get('gap_duration_min', 0)}분)", + 'title_fn': lambda r: ( + f"AIS 소실 의심 (점수 {(r.get('features') or {}).get('dark_suspicion_score', 0)})" + ), }, { 'name': 'spoofing', @@ -80,11 +94,26 @@ RULES = [ 'title_fn': lambda r: f"GPS/MMSI 조작 의심 (점수 {r.get('spoofing_score', 0):.2f})", }, { - 'name': 'transship', - 'condition': lambda r: r.get('transship_suspect'), + # 환적 의심 CRITICAL — 점수 90+ + 'name': 'transship_critical', + 'condition': lambda r: (r.get('features') or {}).get('transship_tier') == 'CRITICAL', + 'level': 'CRITICAL', + 'category': 'ILLEGAL_TRANSSHIP', + 'title_fn': lambda r: ( + f"환적 의심 (점수 {(r.get('features') or {}).get('transship_score', 0)}, " + f"{r.get('transship_duration_min', 0)}분, 상대 {r.get('transship_pair_mmsi', '?')})" + ), + }, + { + # 환적 의심 HIGH — 점수 70~89 + 'name': 'transship_high', + 'condition': lambda r: (r.get('features') or {}).get('transship_tier') == 'HIGH', 'level': 'HIGH', 'category': 'ILLEGAL_TRANSSHIP', - 'title_fn': lambda r: f"환적 의심 (상대: {r.get('transship_pair_mmsi', '미상')})", + 'title_fn': lambda r: ( + f"환적 의심 (점수 {(r.get('features') or {}).get('transship_score', 0)}, " + f"{r.get('transship_duration_min', 0)}분)" + ), }, { 'name': 'fleet_cluster', diff --git a/prediction/scheduler.py b/prediction/scheduler.py index e8d4255..5b2e65e 100644 --- a/prediction/scheduler.py +++ b/prediction/scheduler.py @@ -1,14 +1,18 @@ import logging import time -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from typing import Optional +from zoneinfo import ZoneInfo from apscheduler.schedulers.background import BackgroundScheduler from config import settings +from fleet_tracker import GEAR_PATTERN logger = logging.getLogger(__name__) +_KST = ZoneInfo('Asia/Seoul') + _scheduler: Optional[BackgroundScheduler] = None _last_run: dict = { 'timestamp': None, @@ -20,6 +24,53 @@ _last_run: dict = { _transship_pair_history: dict = {} +# 한국 선박 MMSI prefix — dark 판별 완전 제외 +_KR_DOMESTIC_PREFIXES = ('440', '441') + + +def _is_dark_excluded(mmsi: str, name: str) -> tuple[bool, str]: + """dark 탐지 대상에서 완전 제외할지. 어구/한국선만 필터. + + 사용자 알람은 선박만 대상, 한국선은 해경 관할 아님. + """ + if any(mmsi.startswith(p) for p in _KR_DOMESTIC_PREFIXES): + return True, 'kr_domestic' + if name and GEAR_PATTERN.match(name): + return True, 'gear_signal' + return False, '' + + +def _fetch_dark_history(kcg_conn, mmsi_list: list[str]) -> dict[str, dict]: + """최근 7일 내 is_dark=True 이력을 mmsi별로 집계. + + 사이클 시작 시 한 번에 조회하여 점수 계산 시 재사용. + """ + if not mmsi_list: + return {} + try: + cur = kcg_conn.cursor() + cur.execute( + """ + SELECT mmsi, + count(*) AS n7, + count(*) FILTER (WHERE analyzed_at > now() - interval '24 hours') AS n24, + max(analyzed_at) AS last_at + FROM kcg.vessel_analysis_results + WHERE is_dark = true + AND analyzed_at > now() - interval '7 days' + AND mmsi = ANY(%s) + GROUP BY mmsi + """, + (list(mmsi_list),), + ) + return { + str(m): {'count_7d': int(n7 or 0), 'count_24h': int(n24 or 0), 'last_at': t} + for m, n7, n24, t in cur.fetchall() + } + except Exception as e: + logger.warning('fetch_dark_history failed: %s', e) + return {} + def get_last_run() -> dict: return _last_run.copy() @@ -27,13 +78,12 @@ def get_last_run() -> dict: def run_analysis_cycle(): """5분 주기 분석 사이클 — 인메모리 캐시 기반.""" - import re as _re from cache.vessel_store import vessel_store from db import snpdb, kcgdb from pipeline.orchestrator import ChineseFishingVesselPipeline from algorithms.location import classify_zone from algorithms.fishing_pattern import compute_ucaf_score, compute_ucft_score - from algorithms.dark_vessel import is_dark_vessel + from algorithms.dark_vessel import is_dark_vessel, analyze_dark_pattern, compute_dark_suspicion from algorithms.spoofing import compute_spoofing_score, count_speed_jumps, compute_bd09_offset from algorithms.risk import compute_vessel_risk_score from fleet_tracker import fleet_tracker @@ -75,7 +125,6 @@ def run_analysis_cycle(): return # 4. 등록 선단 기반 fleet 분석 - _gear_re = _re.compile(r'^.+_(?=\S*\d)\S+(?:[_ ]\S*)*[_ ]*$|^\d+$|^.+%$') with kcgdb.get_conn() as kcg_conn: fleet_tracker.load_registry(kcg_conn) @@ -92,7 +141,7 @@ def run_analysis_cycle(): fleet_tracker.match_ais_to_registry(all_ais, kcg_conn) - gear_signals = [v for v in all_ais if _gear_re.match(v.get('name', ''))] + gear_signals = [v for v in all_ais if GEAR_PATTERN.match(v.get('name', '') or '')] fleet_tracker.track_gear_identity(gear_signals, kcg_conn) fleet_roles = fleet_tracker.build_fleet_clusters(vessel_dfs) @@ -152,6 +201,15 @@ def run_analysis_cycle(): logger.warning('gear correlation failed: %s', e) # 5. 선박별 추가 알고리즘 → AnalysisResult 생성 + # dark 이력 일괄 조회 (7일 history) — 사이클당 1회 + now_kst_hour = datetime.now(_KST).hour + all_chinese = vessel_store.get_chinese_mmsis() + with kcgdb.get_conn() as hist_conn: + dark_history_map = _fetch_dark_history(hist_conn, list(all_chinese)) + + pipeline_dark_tiers = {'CRITICAL': 0, 'HIGH': 0, 'WATCH': 0, 'NONE': 0} + pipeline_skip_counts = {'kr_domestic': 0, 'gear_signal': 0} + results = [] for c in classifications: mmsi = c['mmsi'] @@ -169,7 +227,42 @@ def run_analysis_cycle(): ucaf = compute_ucaf_score(df_v, gear) ucft = compute_ucft_score(df_v) - dark, gap_min = is_dark_vessel(df_v) + # ── Dark: 넓은 탐지 + 의도적 OFF 의심 점수화 ── + vname = vessel_store.get_vessel_info(mmsi).get('name', '') or '' + is_permitted = vessel_store.is_permitted(mmsi) + dark_excluded, dark_skip_reason = _is_dark_excluded(mmsi, vname) + if dark_excluded: + pipeline_skip_counts[dark_skip_reason] = pipeline_skip_counts.get(dark_skip_reason, 0) + 1 + dark = False + gap_min = 0 + dark_features: dict = { + 'dark_suspicion_score': 0, + 'dark_patterns': [], + 'dark_tier': 'EXCLUDED', + 'dark_history_7d': 0, + 'dark_history_24h': 0, + } + else: + gap_info = analyze_dark_pattern(df_v) + dark = bool(gap_info.get('is_dark')) + gap_min = int(gap_info.get('gap_min') or 0) + history = dark_history_map.get(mmsi, {'count_7d': 0, 'count_24h': 0}) + score, patterns, tier = compute_dark_suspicion( + gap_info, mmsi, is_permitted, history, + now_kst_hour, classify_zone, + ) + pipeline_dark_tiers[tier] = pipeline_dark_tiers.get(tier, 0) + 1 + dark_features = { + 'dark_suspicion_score': score, + 'dark_patterns': patterns, + 'dark_tier': tier, + 'dark_history_7d': int(history.get('count_7d', 0) or 0), + 'dark_history_24h': int(history.get('count_24h', 0) or 0), + 'gap_start_lat': gap_info.get('gap_start_lat'), + 'gap_start_lon': gap_info.get('gap_start_lon'), + 'gap_start_sog': gap_info.get('gap_start_sog'), + 'gap_start_state': gap_info.get('gap_start_state'), + } spoof_score = compute_spoofing_score(df_v) speed_jumps = count_speed_jumps(df_v) @@ -177,7 +270,6 @@ def run_analysis_cycle(): fleet_info = fleet_roles.get(mmsi, {}) - is_permitted = vessel_store.is_permitted(mmsi) risk_score, risk_level = compute_vessel_risk_score( mmsi, df_v, zone_info, is_permitted=is_permitted, ) @@ -186,6 +278,8 @@ def run_analysis_cycle(): if 'state' in df_v.columns and len(df_v) > 0: activity = df_v['state'].mode().iloc[0] + merged_features = {**(c.get('features', {}) or {}), **dark_features} + results.append(AnalysisResult( mmsi=mmsi, timestamp=ts, @@ -209,9 +303,14 @@ def run_analysis_cycle(): fleet_role=fleet_info.get('fleet_role', 'NOISE'), risk_score=risk_score, risk_level=risk_level, - features=c.get('features', {}), + features=merged_features, )) + logger.info( + 'pipeline dark: tiers=%s skip=%s', + pipeline_dark_tiers, pipeline_skip_counts, + ) + # ── 5.5 경량 분석 — 파이프라인 미통과 412* 선박 ── # vessel_store._tracks의 24h 누적 궤적을 직접 활용하여 dark/spoof 신호도 산출. from algorithms.risk import compute_lightweight_risk_score @@ -225,6 +324,8 @@ def run_analysis_cycle(): lw_count = 0 lw_dark = 0 lw_spoof = 0 + lw_dark_tiers = {'CRITICAL': 0, 'HIGH': 0, 'WATCH': 0, 'NONE': 0} + lw_dark_skip = {'kr_domestic': 0, 'gear_signal': 0} for mmsi in lightweight_mmsis: pos = all_positions.get(mmsi) if pos is None or pos.get('lat') is None: @@ -242,40 +343,71 @@ def run_analysis_cycle(): else: state = 'SAILING' - # 24h 누적 궤적으로 dark/spoofing 산출 (vessel_store._tracks 직접 접근) - df_v = vessel_store._tracks.get(mmsi) - dark = False - gap_min = 0 - spoof_score = 0.0 - speed_jumps = 0 - if df_v is not None and len(df_v) >= 2: - try: - dark, gap_min = is_dark_vessel(df_v) - except Exception: - pass - try: - spoof_score = compute_spoofing_score(df_v) - except Exception: - pass - try: - speed_jumps = count_speed_jumps(df_v) - except Exception: - pass - # 핫픽스 (2026-04-08): 한국 AIS 수신 가능 영역 밖에서의 dark 판정은 오탐. - # 412* 중국 선박이 자국 EEZ로 깊이 들어가면(~124°E 서쪽) 한국 수신소 - # 도달 한계로 자연 gap 발생. 해당 영역 밖은 dark에서 제외한다. - # 영역: 북위 32~39.5, 동경 124~132 (한반도 + EEZ + 접속수역 여유 포함) - if dark: - in_kr_reception = (124.0 <= lon <= 132.0) and (32.0 <= lat <= 39.5) - if not in_kr_reception: - dark = False - gap_min = 0 + is_permitted = vessel_store.is_permitted(mmsi) + vname = vessel_store.get_vessel_info(mmsi).get('name', '') or '' + + # ── Dark: 사전 필터 (어구/한국선) ── + dark_excluded, dark_skip_reason = _is_dark_excluded(mmsi, vname) + if dark_excluded: + lw_dark_skip[dark_skip_reason] = lw_dark_skip.get(dark_skip_reason, 0) + 1 + dark = False + gap_min = 0 + dark_features: dict = { + 'dark_suspicion_score': 0, + 'dark_patterns': [], + 'dark_tier': 'EXCLUDED', + 'dark_history_7d': 0, + 'dark_history_24h': 0, + } + spoof_score = 0.0 + speed_jumps = 0 + else: + df_v = vessel_store._tracks.get(mmsi) + spoof_score = 0.0 + speed_jumps = 0 + if df_v is not None and len(df_v) >= 2: + try: + spoof_score = compute_spoofing_score(df_v) + except Exception: + pass + try: + speed_jumps = count_speed_jumps(df_v) + except Exception: + pass + try: + gap_info = analyze_dark_pattern(df_v) + except Exception: + gap_info = {'is_dark': False, 'gap_min': 0} + else: + gap_info = {'is_dark': False, 'gap_min': 0} + + dark = bool(gap_info.get('is_dark')) + gap_min = int(gap_info.get('gap_min') or 0) + + history = dark_history_map.get(mmsi, {'count_7d': 0, 'count_24h': 0}) + score, patterns, tier = compute_dark_suspicion( + gap_info, mmsi, is_permitted, history, + now_kst_hour, classify_zone, + ) + lw_dark_tiers[tier] = lw_dark_tiers.get(tier, 0) + 1 + + dark_features = { + 'dark_suspicion_score': score, + 'dark_patterns': patterns, + 'dark_tier': tier, + 'dark_history_7d': int(history.get('count_7d', 0) or 0), + 'dark_history_24h': int(history.get('count_24h', 0) or 0), + 'gap_start_lat': gap_info.get('gap_start_lat'), + 'gap_start_lon': gap_info.get('gap_start_lon'), + 'gap_start_sog': gap_info.get('gap_start_sog'), + 'gap_start_state': gap_info.get('gap_start_state'), + } + if dark: lw_dark += 1 if spoof_score > 0.5: lw_spoof += 1 - is_permitted = vessel_store.is_permitted(mmsi) risk_score, risk_level = compute_lightweight_risk_score( zone_info, sog, is_permitted=is_permitted, is_dark=dark, gap_duration_min=gap_min, @@ -308,27 +440,44 @@ def run_analysis_cycle(): is_transship_suspect=False, transship_pair_mmsi='', transship_duration_min=0, + features=dark_features, )) lw_count += 1 logger.info( - 'lightweight analysis: %d vessels (dark=%d, spoof>0.5=%d)', - lw_count, lw_dark, lw_spoof, + 'lightweight analysis: %d vessels (dark=%d, spoof>0.5=%d, tiers=%s, skip=%s)', + lw_count, lw_dark, lw_spoof, lw_dark_tiers, lw_dark_skip, ) - # 6. 환적 의심 탐지 (pair_history 모듈 레벨로 사이클 간 유지) + # 6. 환적 의심 탐지 (점수 기반, 베테랑 관점 필터) from algorithms.transshipment import detect_transshipment results_map = {r.mmsi: r for r in results} - transship_pairs = detect_transshipment(df_targets, _transship_pair_history) - for mmsi_a, mmsi_b, dur in transship_pairs: - if mmsi_a in results_map: - results_map[mmsi_a].is_transship_suspect = True - results_map[mmsi_a].transship_pair_mmsi = mmsi_b - results_map[mmsi_a].transship_duration_min = dur - if mmsi_b in results_map: - results_map[mmsi_b].is_transship_suspect = True - results_map[mmsi_b].transship_pair_mmsi = mmsi_a - results_map[mmsi_b].transship_duration_min = dur + transship_items = detect_transshipment( + df_targets, + _transship_pair_history, + get_vessel_info=vessel_store.get_vessel_info, + is_permitted=vessel_store.is_permitted, + classify_zone_fn=classify_zone, + now_kst_hour=now_kst_hour, + ) + for item in transship_items: + a = item['pair_a'] + b = item['pair_b'] + dur = item['duration_min'] + tier = item['severity'] + if tier == 'WATCH': + continue # WATCH 등급은 저장 안 함 (로그만) + for m, pair in ((a, b), (b, a)): + if m in results_map: + r_obj = results_map[m] + r_obj.is_transship_suspect = True + r_obj.transship_pair_mmsi = pair + r_obj.transship_duration_min = dur + r_obj.features = { + **(r_obj.features or {}), + 'transship_tier': tier, + 'transship_score': item['score'], + } # 7. 결과 저장 upserted = kcgdb.upsert_results(results)