"""환적(Transshipment) 의심 선박 탐지 — 서버사이드 O(n log n) 구현. 프론트엔드 useKoreaFilters.ts의 O(n²) 근접 탐지를 대체한다. scipy 미설치 환경을 고려하여 그리드 기반 공간 인덱스를 사용한다. 알고리즘 개요: 1. 후보 선박 필터: sog < 2kn, 선종 (tanker/cargo/fishing), 외국 해안선 제외 2. 그리드 셀 기반 근접 쌍 탐지: O(n log n) ← 셀 분할 + 인접 9셀 조회 3. pair_history dict로 쌍별 최초 탐지 시각 영속화 (호출 간 유지) 4. 60분 이상 지속 근접 시 의심 쌍으로 판정 """ from __future__ import annotations import logging import math from datetime import datetime, timezone from typing import Optional import pandas as pd logger = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────── # 상수 # ────────────────────────────────────────────────────────────── SOG_THRESHOLD_KN = 2.0 # 정박/표류 기준 속도 (노트) PROXIMITY_DEG = 0.001 # 근접 판정 임계값 (~110m) SUSPECT_DURATION_MIN = 60 # 의심 판정 최소 지속 시간 (분) PAIR_EXPIRY_MIN = 120 # pair_history 항목 만료 기준 (분) # 외국 해안 근접 제외 경계 _CN_LON_MAX = 123.5 # 중국 해안: 경도 < 123.5 _JP_LON_MIN = 130.5 # 일본 해안: 경도 > 130.5 _TSUSHIMA_LAT_MIN = 33.8 # 대마도: 위도 > 33.8 AND 경도 > 129.0 _TSUSHIMA_LON_MIN = 129.0 # 탐지 대상 선종 (소문자 정규화 후 비교) _CANDIDATE_TYPES: frozenset[str] = frozenset({'tanker', 'cargo', 'fishing'}) # 그리드 셀 크기 = PROXIMITY_DEG (셀 하나 = 근접 임계와 동일 크기) _GRID_CELL_DEG = PROXIMITY_DEG # ────────────────────────────────────────────────────────────── # 내부 헬퍼 # ────────────────────────────────────────────────────────────── def _is_near_foreign_coast(lat: float, lon: float) -> bool: """외국 해안 근처 여부 — 중국/일본/대마도 경계 확인.""" if lon < _CN_LON_MAX: return True if lon > _JP_LON_MIN: return True if lat > _TSUSHIMA_LAT_MIN and lon > _TSUSHIMA_LON_MIN: return True return False def _cell_key(lat: float, lon: float) -> tuple[int, int]: """위도/경도를 그리드 셀 인덱스로 변환.""" return (int(math.floor(lat / _GRID_CELL_DEG)), int(math.floor(lon / _GRID_CELL_DEG))) def _build_grid(records: list[dict]) -> dict[tuple[int, int], list[int]]: """선박 리스트를 그리드 셀로 분류. Returns: {(row, col): [record index, ...]} """ grid: dict[tuple[int, int], list[int]] = {} for idx, rec in enumerate(records): key = _cell_key(rec['lat'], rec['lon']) if key not in grid: grid[key] = [] grid[key].append(idx) return grid def _within_proximity(a: dict, b: dict) -> bool: """두 선박이 PROXIMITY_DEG 이내인지 확인 (위경도 직교 근사).""" dlat = abs(a['lat'] - b['lat']) if dlat >= PROXIMITY_DEG: return False cos_lat = math.cos(math.radians((a['lat'] + b['lat']) / 2.0)) dlon_scaled = abs(a['lon'] - b['lon']) * cos_lat return dlon_scaled < PROXIMITY_DEG def _normalize_type(raw: Optional[str]) -> str: """선종 문자열 소문자 정규화.""" if not raw: return '' return raw.strip().lower() def _pair_key(mmsi_a: str, mmsi_b: str) -> tuple[str, str]: """MMSI 순서를 정규화하여 중복 쌍 방지.""" return (mmsi_a, mmsi_b) if mmsi_a < mmsi_b else (mmsi_b, mmsi_a) def _evict_expired_pairs( pair_history: dict[tuple[str, str], datetime], now: datetime, ) -> None: """PAIR_EXPIRY_MIN 이상 갱신 없는 pair_history 항목 제거.""" expired = [ key for key, first_seen in pair_history.items() if (now - first_seen).total_seconds() / 60 > PAIR_EXPIRY_MIN ] for key in expired: del pair_history[key] # ────────────────────────────────────────────────────────────── # 공개 API # ────────────────────────────────────────────────────────────── def detect_transshipment( df: pd.DataFrame, pair_history: dict[tuple[str, str], datetime], ) -> list[tuple[str, str, int]]: """환적 의심 쌍 탐지. Args: df: 선박 위치 DataFrame. 필수 컬럼: mmsi, lat, lon, sog 선택 컬럼: ship_type (없으면 전체 선종 허용) pair_history: 쌍별 최초 탐지 시각을 저장하는 영속 dict. 스케줄러에서 호출 간 유지하여 전달해야 한다. 키: (mmsi_a, mmsi_b) — mmsi_a < mmsi_b 정규화 적용. 값: 최초 탐지 시각 (UTC datetime, timezone-aware). Returns: [(mmsi_a, mmsi_b, duration_minutes), ...] — 60분 이상 지속된 의심 쌍. mmsi_a < mmsi_b 정규화 적용. """ if df.empty: return [] required_cols = {'mmsi', 'lat', 'lon', 'sog'} missing = required_cols - set(df.columns) if missing: logger.error('detect_transshipment: missing required columns: %s', missing) return [] now = datetime.now(timezone.utc) # ── 1. 후보 선박 필터 ────────────────────────────────────── has_type_col = 'ship_type' in df.columns candidate_mask = df['sog'] < SOG_THRESHOLD_KN if has_type_col: type_mask = df['ship_type'].apply(_normalize_type).isin(_CANDIDATE_TYPES) candidate_mask = candidate_mask & type_mask candidates = df[candidate_mask].copy() if candidates.empty: _evict_expired_pairs(pair_history, now) return [] # 외국 해안 근처 제외 coast_mask = candidates.apply( lambda row: not _is_near_foreign_coast(row['lat'], row['lon']), axis=1, ) candidates = candidates[coast_mask] if len(candidates) < 2: _evict_expired_pairs(pair_history, now) return [] records = candidates[['mmsi', 'lat', 'lon']].to_dict('records') for rec in records: rec['mmsi'] = str(rec['mmsi']) # ── 2. 그리드 기반 근접 쌍 탐지 ────────────────────────── grid = _build_grid(records) active_pairs: set[tuple[str, str]] = set() for (row, col), indices in grid.items(): # 현재 셀 내부 쌍 for i in range(len(indices)): for j in range(i + 1, len(indices)): a = records[indices[i]] b = records[indices[j]] if _within_proximity(a, b): active_pairs.add(_pair_key(a['mmsi'], b['mmsi'])) # 인접 셀 (우측 3셀 + 아래 3셀 = 중복 없는 방향성 순회) for dr, dc in ((0, 1), (1, -1), (1, 0), (1, 1)): neighbor_key = (row + dr, col + dc) if neighbor_key not in grid: continue for ai in indices: for bi in grid[neighbor_key]: a = records[ai] b = records[bi] if _within_proximity(a, b): active_pairs.add(_pair_key(a['mmsi'], b['mmsi'])) # ── 3. pair_history 갱신 ───────────────────────────────── # 현재 활성 쌍 → 최초 탐지 시각 등록 for pair in active_pairs: if pair not in pair_history: pair_history[pair] = now # 비활성 쌍 → pair_history에서 제거 (다음 접근 시 재시작) inactive = [key for key in pair_history if key not in active_pairs] for key in inactive: del pair_history[key] # 만료 항목 정리 (비활성 제거 후 잔여 방어용) _evict_expired_pairs(pair_history, now) # ── 4. 의심 쌍 판정 ────────────────────────────────────── suspects: list[tuple[str, str, int]] = [] for pair, first_seen in pair_history.items(): duration_min = int((now - first_seen).total_seconds() / 60) if duration_min >= SUSPECT_DURATION_MIN: suspects.append((pair[0], pair[1], duration_min)) if suspects: logger.info( 'transshipment detection: %d suspect pairs (candidates=%d)', len(suspects), len(candidates), ) return suspects