"""환적(Transshipment) 의심 선박 탐지 — 서버사이드 O(n log n) 구현. 프론트엔드 useKoreaFilters.ts의 O(n²) 근접 탐지를 대체한다. scipy 미설치 환경을 고려하여 그리드 기반 공간 인덱스를 사용한다. 알고리즘 개요: 1. 후보 선박 필터: sog < 2kn, 선종 (tanker/cargo/fishing), 외국 해안선 제외 2. 그리드 셀 기반 근접 쌍 탐지: O(n log n) ← 셀 분할 + 인접 9셀 조회 3. pair_history dict로 쌍별 최초 탐지 시각 영속화 (호출 간 유지) 4. 60분 이상 지속 근접 시 의심 쌍으로 판정 """ from __future__ import annotations import logging import math from datetime import datetime, timezone from typing import Callable, Optional import pandas as pd from fleet_tracker import GEAR_PATTERN logger = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────── # 상수 (2026-04-09 재조정 — 베테랑 관점) # ────────────────────────────────────────────────────────────── SOG_THRESHOLD_KN = 1.0 # 2.0 → 1.0 (완전 정박 수준) PROXIMITY_DEG = 0.0007 # 0.001 → 0.0007 (~77m, GPS 노이즈 포함한 근접) SUSPECT_DURATION_MIN = 45 # 60 → 45 (gap tolerance 있음) PAIR_EXPIRY_MIN = 180 # 120 → 180 GAP_TOLERANCE_CYCLES = 2 # 신규: 2 사이클까지 active에서 빠져도 리셋 안 함 # 외국 해안 근접 제외 경계 (레거시 — 관할 필터로 대체됨) _CN_LON_MAX = 123.5 _JP_LON_MIN = 130.5 _TSUSHIMA_LAT_MIN = 33.8 _TSUSHIMA_LON_MIN = 129.0 # 한국 EEZ 관할 수역 (단속 가능 범위) _KR_EEZ_LAT = (32.0, 39.5) _KR_EEZ_LON = (124.0, 132.0) # 환적 불가능 선종 (여객/군함/유조/도선/예인/수색구조) _TRANSSHIP_EXCLUDED: frozenset[str] = frozenset({ 'passenger', 'military', 'tanker', 'pilot', 'tug', 'sar', }) # 그리드 셀 크기 _GRID_CELL_DEG = PROXIMITY_DEG # ────────────────────────────────────────────────────────────── # 내부 헬퍼 # ────────────────────────────────────────────────────────────── def _is_near_foreign_coast(lat: float, lon: float) -> bool: """외국 해안 근처 여부 — 중국/일본/대마도 경계 확인.""" if lon < _CN_LON_MAX: return True if lon > _JP_LON_MIN: return True if lat > _TSUSHIMA_LAT_MIN and lon > _TSUSHIMA_LON_MIN: return True return False def _is_in_kr_jurisdiction(lat: float, lon: float) -> bool: """한국 EEZ 관할 수역 여부 (단속 가능 범위).""" return (_KR_EEZ_LAT[0] <= lat <= _KR_EEZ_LAT[1] and _KR_EEZ_LON[0] <= lon <= _KR_EEZ_LON[1]) def _is_candidate_ship_type(vessel_type_a: Optional[str], vessel_type_b: Optional[str]) -> bool: """환적 후보 선종인지 (명시적 제외만 차단, 미상은 허용).""" a = (vessel_type_a or '').strip().lower() b = (vessel_type_b or '').strip().lower() if a in _TRANSSHIP_EXCLUDED or b in _TRANSSHIP_EXCLUDED: return False return True def _is_gear_name(name: Optional[str]) -> bool: """어구 이름 패턴 매칭 — fleet_tracker.GEAR_PATTERN SSOT.""" if not name: return False return bool(GEAR_PATTERN.match(name)) def _cell_key(lat: float, lon: float) -> tuple[int, int]: """위도/경도를 그리드 셀 인덱스로 변환.""" return (int(math.floor(lat / _GRID_CELL_DEG)), int(math.floor(lon / _GRID_CELL_DEG))) def _build_grid(records: list[dict]) -> dict[tuple[int, int], list[int]]: """선박 리스트를 그리드 셀로 분류. Returns: {(row, col): [record index, ...]} """ grid: dict[tuple[int, int], list[int]] = {} for idx, rec in enumerate(records): key = _cell_key(rec['lat'], rec['lon']) if key not in grid: grid[key] = [] grid[key].append(idx) return grid def _within_proximity(a: dict, b: dict) -> bool: """두 선박이 PROXIMITY_DEG 이내인지 확인 (위경도 직교 근사).""" dlat = abs(a['lat'] - b['lat']) if dlat >= PROXIMITY_DEG: return False cos_lat = math.cos(math.radians((a['lat'] + b['lat']) / 2.0)) dlon_scaled = abs(a['lon'] - b['lon']) * cos_lat return dlon_scaled < PROXIMITY_DEG def _normalize_type(raw: Optional[str]) -> str: """선종 문자열 소문자 정규화.""" if not raw: return '' return raw.strip().lower() def _pair_key(mmsi_a: str, mmsi_b: str) -> tuple[str, str]: """MMSI 순서를 정규화하여 중복 쌍 방지.""" return (mmsi_a, mmsi_b) if mmsi_a < mmsi_b else (mmsi_b, mmsi_a) def _evict_expired_pairs( pair_history: dict, now: datetime, ) -> None: """PAIR_EXPIRY_MIN 이상 갱신 없는 pair_history 항목 제거. 새 구조: {(a,b): {'first_seen': dt, 'last_seen': dt, 'miss_count': int}} """ expired = [] for key, meta in pair_history.items(): if not isinstance(meta, dict): # 레거시 구조 (datetime 직접 저장)는 즉시 제거 → 다음 사이클에서 재구성 expired.append(key) continue last_seen = meta.get('last_seen') or meta.get('first_seen') if last_seen is None: expired.append(key) continue if (now - last_seen).total_seconds() / 60 > PAIR_EXPIRY_MIN: expired.append(key) for key in expired: del pair_history[key] # ────────────────────────────────────────────────────────────── # 공개 API # ────────────────────────────────────────────────────────────── def _score_pair( pair: tuple[str, str], meta: dict, lat: float, lon: float, cog_a: Optional[float], cog_b: Optional[float], vessel_info_a: dict, vessel_info_b: dict, is_permitted_fn: Optional[Callable[[str], bool]], now_kst_hour: int, zone_code: Optional[str], now: datetime, ) -> Optional[dict]: """환적 의심 pair에 대해 점수 산출 + severity 반환. 필수 조건 실패 시 None. WATCH 이상이면 dict 반환. """ # 필수 1: 한국 관할 수역 if not _is_in_kr_jurisdiction(lat, lon): return None # 필수 2: 선종 필터 if not _is_candidate_ship_type( vessel_info_a.get('vessel_type'), vessel_info_b.get('vessel_type'), ): return None # 필수 3: 어구 제외 if _is_gear_name(vessel_info_a.get('name')) or _is_gear_name(vessel_info_b.get('name')): return None # 필수 4: 지속 시간 first_seen = meta.get('first_seen') if first_seen is None: return None duration_min = int((now - first_seen).total_seconds() / 60) if duration_min < SUSPECT_DURATION_MIN: return None score = 40 # base # 야간 가점 (KST 20:00~04:00) if now_kst_hour >= 20 or now_kst_hour < 4: score += 15 # 무허가 가점 if is_permitted_fn is not None: try: if not is_permitted_fn(pair[0]) or not is_permitted_fn(pair[1]): score += 20 except Exception: pass # COG 편차 (같은 방향 아니면 가점 — 나란히 가는 선단 배제) if cog_a is not None and cog_b is not None: try: diff = abs(float(cog_a) - float(cog_b)) if diff > 180: diff = 360 - diff if diff > 45: score += 20 except Exception: pass # 지속 길이 추가 가점 if duration_min >= 90: score += 20 # 영해/접속수역 추가 가점 if zone_code in ('TERRITORIAL_SEA', 'CONTIGUOUS_ZONE'): score += 15 if score >= 90: severity = 'CRITICAL' elif score >= 70: severity = 'HIGH' elif score >= 50: severity = 'WATCH' else: return None return { 'pair_a': pair[0], 'pair_b': pair[1], 'duration_min': duration_min, 'severity': severity, 'score': score, 'lat': lat, 'lon': lon, } def detect_transshipment( df: pd.DataFrame, pair_history: dict, get_vessel_info: Optional[Callable[[str], dict]] = None, is_permitted: Optional[Callable[[str], bool]] = None, classify_zone_fn: Optional[Callable[[float, float], dict]] = None, now_kst_hour: int = 0, ) -> list[dict]: """환적 의심 쌍 탐지 (점수 기반, 베테랑 관점 필터). Args: df: 선박 위치 DataFrame. 필수 컬럼: mmsi, lat, lon, sog 선택 컬럼: cog pair_history: {(a,b): {'first_seen', 'last_seen', 'miss_count'}} get_vessel_info: callable(mmsi) -> {'name', 'vessel_type', ...} is_permitted: callable(mmsi) -> bool classify_zone_fn: callable(lat, lon) -> dict (zone 판정) now_kst_hour: 현재 KST 시각 (0~23) Returns: list[dict] — severity 'CRITICAL'/'HIGH'/'WATCH' 포함 의심 쌍 """ if df.empty: return [] required_cols = {'mmsi', 'lat', 'lon', 'sog'} missing = required_cols - set(df.columns) if missing: logger.error('detect_transshipment: missing required columns: %s', missing) return [] now = datetime.now(timezone.utc) # ── 1. 후보 선박 필터 (SOG < 1.0) ───────────────────────── candidate_mask = df['sog'] < SOG_THRESHOLD_KN candidates = df[candidate_mask].copy() if candidates.empty: _evict_expired_pairs(pair_history, now) return [] # 외국 해안 근처 제외 (1차 필터) coast_mask = candidates.apply( lambda row: not _is_near_foreign_coast(row['lat'], row['lon']), axis=1, ) candidates = candidates[coast_mask] if len(candidates) < 2: _evict_expired_pairs(pair_history, now) return [] has_cog = 'cog' in candidates.columns cols = ['mmsi', 'lat', 'lon'] if has_cog: cols.append('cog') records = candidates[cols].to_dict('records') for rec in records: rec['mmsi'] = str(rec['mmsi']) # ── 2. 그리드 기반 근접 쌍 탐지 (77m) ─────────────────── grid = _build_grid(records) active_pairs: dict[tuple[str, str], dict] = {} def _try_add_pair(a_rec, b_rec): if not _within_proximity(a_rec, b_rec): return key = _pair_key(a_rec['mmsi'], b_rec['mmsi']) # 중점 좌표 (점수 산출용) mid_lat = (a_rec['lat'] + b_rec['lat']) / 2.0 mid_lon = (a_rec['lon'] + b_rec['lon']) / 2.0 active_pairs[key] = { 'lat': mid_lat, 'lon': mid_lon, 'cog_a': a_rec.get('cog'), 'cog_b': b_rec.get('cog'), # mmsi_a < mmsi_b 순서로 정렬되었으므로 cog도 맞춰 정렬 필요 'mmsi_a': a_rec['mmsi'], 'mmsi_b': b_rec['mmsi'], } for (row, col), indices in grid.items(): for i in range(len(indices)): for j in range(i + 1, len(indices)): _try_add_pair(records[indices[i]], records[indices[j]]) for dr, dc in ((0, 1), (1, -1), (1, 0), (1, 1)): neighbor_key = (row + dr, col + dc) if neighbor_key not in grid: continue for ai in indices: for bi in grid[neighbor_key]: _try_add_pair(records[ai], records[bi]) # ── 3. pair_history 갱신 (gap tolerance) ───────────────── active_keys = set(active_pairs.keys()) # 활성 쌍 → 등록/갱신 for pair in active_keys: if pair not in pair_history or not isinstance(pair_history[pair], dict): pair_history[pair] = { 'first_seen': now, 'last_seen': now, 'miss_count': 0, } else: pair_history[pair]['last_seen'] = now pair_history[pair]['miss_count'] = 0 # 비활성 쌍 → miss_count++ , GAP_TOLERANCE 초과 시 삭제 for key in list(pair_history.keys()): if key in active_keys: continue meta = pair_history[key] if not isinstance(meta, dict): del pair_history[key] continue meta['miss_count'] = meta.get('miss_count', 0) + 1 if meta['miss_count'] > GAP_TOLERANCE_CYCLES: del pair_history[key] # 만료 정리 _evict_expired_pairs(pair_history, now) # ── 4. 점수 기반 의심 쌍 판정 ───────────────────────────── suspects: list[dict] = [] rejected_jurisdiction = 0 rejected_ship_type = 0 rejected_gear = 0 rejected_duration = 0 for pair, meta in pair_history.items(): if not isinstance(meta, dict): continue first_seen = meta.get('first_seen') if first_seen is None: continue # active_pairs에 있으면 해당 사이클 좌표·cog 사용, 없으면 이전 값 재사용 (miss 중) loc_meta = active_pairs.get(pair) if loc_meta is not None: lat = loc_meta['lat'] lon = loc_meta['lon'] # mmsi_a, mmsi_b 순서를 pair 순서에 맞춤 if loc_meta['mmsi_a'] == pair[0]: cog_a, cog_b = loc_meta.get('cog_a'), loc_meta.get('cog_b') else: cog_a, cog_b = loc_meta.get('cog_b'), loc_meta.get('cog_a') meta['last_lat'] = lat meta['last_lon'] = lon meta['last_cog_a'] = cog_a meta['last_cog_b'] = cog_b else: lat = meta.get('last_lat') lon = meta.get('last_lon') cog_a = meta.get('last_cog_a') cog_b = meta.get('last_cog_b') if lat is None or lon is None: continue # 선박 정보 조회 info_a = get_vessel_info(pair[0]) if get_vessel_info else {} info_b = get_vessel_info(pair[1]) if get_vessel_info else {} # 짧게 pre-check (로깅용) if not _is_in_kr_jurisdiction(lat, lon): rejected_jurisdiction += 1 continue if not _is_candidate_ship_type(info_a.get('vessel_type'), info_b.get('vessel_type')): rejected_ship_type += 1 continue if _is_gear_name(info_a.get('name')) or _is_gear_name(info_b.get('name')): rejected_gear += 1 continue duration_min = int((now - first_seen).total_seconds() / 60) if duration_min < SUSPECT_DURATION_MIN: rejected_duration += 1 continue zone_code = None if classify_zone_fn is not None: try: zone_code = classify_zone_fn(lat, lon).get('zone') except Exception: pass scored = _score_pair( pair, meta, lat, lon, cog_a, cog_b, info_a, info_b, is_permitted, now_kst_hour, zone_code, now, ) if scored is not None: suspects.append(scored) tier_counts = {'CRITICAL': 0, 'HIGH': 0, 'WATCH': 0} for s in suspects: tier_counts[s['severity']] = tier_counts.get(s['severity'], 0) + 1 logger.info( 'transshipment detection: pairs=%d (critical=%d, high=%d, watch=%d, ' 'rejected_jurisdiction=%d, rejected_ship_type=%d, rejected_gear=%d, ' 'rejected_duration=%d, candidates=%d)', len(suspects), tier_counts.get('CRITICAL', 0), tier_counts.get('HIGH', 0), tier_counts.get('WATCH', 0), rejected_jurisdiction, rejected_ship_type, rejected_gear, rejected_duration, len(candidates), ) return suspects