kcg-monitoring/prediction/algorithms/transshipment.py

235 lines
9.0 KiB
Python

"""환적(Transshipment) 의심 선박 탐지 — 서버사이드 O(n log n) 구현.
프론트엔드 useKoreaFilters.ts의 O(n²) 근접 탐지를 대체한다.
scipy 미설치 환경을 고려하여 그리드 기반 공간 인덱스를 사용한다.
알고리즘 개요:
1. 후보 선박 필터: sog < 2kn, 선종 (tanker/cargo/fishing), 외국 해안선 제외
2. 그리드 셀 기반 근접 쌍 탐지: O(n log n) ← 셀 분할 + 인접 9셀 조회
3. pair_history dict로 쌍별 최초 탐지 시각 영속화 (호출 간 유지)
4. 60분 이상 지속 근접 시 의심 쌍으로 판정
"""
from __future__ import annotations
import logging
import math
from datetime import datetime, timezone
from typing import Optional
import pandas as pd
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────
# 상수
# ──────────────────────────────────────────────────────────────
SOG_THRESHOLD_KN = 2.0 # 정박/표류 기준 속도 (노트)
PROXIMITY_DEG = 0.001 # 근접 판정 임계값 (~110m)
SUSPECT_DURATION_MIN = 60 # 의심 판정 최소 지속 시간 (분)
PAIR_EXPIRY_MIN = 120 # pair_history 항목 만료 기준 (분)
# 외국 해안 근접 제외 경계
_CN_LON_MAX = 123.5 # 중국 해안: 경도 < 123.5
_JP_LON_MIN = 130.5 # 일본 해안: 경도 > 130.5
_TSUSHIMA_LAT_MIN = 33.8 # 대마도: 위도 > 33.8 AND 경도 > 129.0
_TSUSHIMA_LON_MIN = 129.0
# 탐지 대상 선종 (소문자 정규화 후 비교)
_CANDIDATE_TYPES: frozenset[str] = frozenset({'tanker', 'cargo', 'fishing'})
# 그리드 셀 크기 = PROXIMITY_DEG (셀 하나 = 근접 임계와 동일 크기)
_GRID_CELL_DEG = PROXIMITY_DEG
# ──────────────────────────────────────────────────────────────
# 내부 헬퍼
# ──────────────────────────────────────────────────────────────
def _is_near_foreign_coast(lat: float, lon: float) -> bool:
"""외국 해안 근처 여부 — 중국/일본/대마도 경계 확인."""
if lon < _CN_LON_MAX:
return True
if lon > _JP_LON_MIN:
return True
if lat > _TSUSHIMA_LAT_MIN and lon > _TSUSHIMA_LON_MIN:
return True
return False
def _cell_key(lat: float, lon: float) -> tuple[int, int]:
"""위도/경도를 그리드 셀 인덱스로 변환."""
return (int(math.floor(lat / _GRID_CELL_DEG)),
int(math.floor(lon / _GRID_CELL_DEG)))
def _build_grid(records: list[dict]) -> dict[tuple[int, int], list[int]]:
"""선박 리스트를 그리드 셀로 분류.
Returns: {(row, col): [record index, ...]}
"""
grid: dict[tuple[int, int], list[int]] = {}
for idx, rec in enumerate(records):
key = _cell_key(rec['lat'], rec['lon'])
if key not in grid:
grid[key] = []
grid[key].append(idx)
return grid
def _within_proximity(a: dict, b: dict) -> bool:
"""두 선박이 PROXIMITY_DEG 이내인지 확인 (위경도 직교 근사)."""
dlat = abs(a['lat'] - b['lat'])
if dlat >= PROXIMITY_DEG:
return False
cos_lat = math.cos(math.radians((a['lat'] + b['lat']) / 2.0))
dlon_scaled = abs(a['lon'] - b['lon']) * cos_lat
return dlon_scaled < PROXIMITY_DEG
def _normalize_type(raw: Optional[str]) -> str:
"""선종 문자열 소문자 정규화."""
if not raw:
return ''
return raw.strip().lower()
def _pair_key(mmsi_a: str, mmsi_b: str) -> tuple[str, str]:
"""MMSI 순서를 정규화하여 중복 쌍 방지."""
return (mmsi_a, mmsi_b) if mmsi_a < mmsi_b else (mmsi_b, mmsi_a)
def _evict_expired_pairs(
pair_history: dict[tuple[str, str], datetime],
now: datetime,
) -> None:
"""PAIR_EXPIRY_MIN 이상 갱신 없는 pair_history 항목 제거."""
expired = [
key for key, first_seen in pair_history.items()
if (now - first_seen).total_seconds() / 60 > PAIR_EXPIRY_MIN
]
for key in expired:
del pair_history[key]
# ──────────────────────────────────────────────────────────────
# 공개 API
# ──────────────────────────────────────────────────────────────
def detect_transshipment(
df: pd.DataFrame,
pair_history: dict[tuple[str, str], datetime],
) -> list[tuple[str, str, int]]:
"""환적 의심 쌍 탐지.
Args:
df: 선박 위치 DataFrame.
필수 컬럼: mmsi, lat, lon, sog
선택 컬럼: ship_type (없으면 전체 선종 허용)
pair_history: 쌍별 최초 탐지 시각을 저장하는 영속 dict.
스케줄러에서 호출 간 유지하여 전달해야 한다.
키: (mmsi_a, mmsi_b) — mmsi_a < mmsi_b 정규화 적용.
값: 최초 탐지 시각 (UTC datetime, timezone-aware).
Returns:
[(mmsi_a, mmsi_b, duration_minutes), ...] — 60분 이상 지속된 의심 쌍.
mmsi_a < mmsi_b 정규화 적용.
"""
if df.empty:
return []
required_cols = {'mmsi', 'lat', 'lon', 'sog'}
missing = required_cols - set(df.columns)
if missing:
logger.error('detect_transshipment: missing required columns: %s', missing)
return []
now = datetime.now(timezone.utc)
# ── 1. 후보 선박 필터 ──────────────────────────────────────
has_type_col = 'ship_type' in df.columns
candidate_mask = df['sog'] < SOG_THRESHOLD_KN
if has_type_col:
type_mask = df['ship_type'].apply(_normalize_type).isin(_CANDIDATE_TYPES)
candidate_mask = candidate_mask & type_mask
candidates = df[candidate_mask].copy()
if candidates.empty:
_evict_expired_pairs(pair_history, now)
return []
# 외국 해안 근처 제외
coast_mask = candidates.apply(
lambda row: not _is_near_foreign_coast(row['lat'], row['lon']),
axis=1,
)
candidates = candidates[coast_mask]
if len(candidates) < 2:
_evict_expired_pairs(pair_history, now)
return []
records = candidates[['mmsi', 'lat', 'lon']].to_dict('records')
for rec in records:
rec['mmsi'] = str(rec['mmsi'])
# ── 2. 그리드 기반 근접 쌍 탐지 ──────────────────────────
grid = _build_grid(records)
active_pairs: set[tuple[str, str]] = set()
for (row, col), indices in grid.items():
# 현재 셀 내부 쌍
for i in range(len(indices)):
for j in range(i + 1, len(indices)):
a = records[indices[i]]
b = records[indices[j]]
if _within_proximity(a, b):
active_pairs.add(_pair_key(a['mmsi'], b['mmsi']))
# 인접 셀 (우측 3셀 + 아래 3셀 = 중복 없는 방향성 순회)
for dr, dc in ((0, 1), (1, -1), (1, 0), (1, 1)):
neighbor_key = (row + dr, col + dc)
if neighbor_key not in grid:
continue
for ai in indices:
for bi in grid[neighbor_key]:
a = records[ai]
b = records[bi]
if _within_proximity(a, b):
active_pairs.add(_pair_key(a['mmsi'], b['mmsi']))
# ── 3. pair_history 갱신 ─────────────────────────────────
# 현재 활성 쌍 → 최초 탐지 시각 등록
for pair in active_pairs:
if pair not in pair_history:
pair_history[pair] = now
# 비활성 쌍 → pair_history에서 제거 (다음 접근 시 재시작)
inactive = [key for key in pair_history if key not in active_pairs]
for key in inactive:
del pair_history[key]
# 만료 항목 정리 (비활성 제거 후 잔여 방어용)
_evict_expired_pairs(pair_history, now)
# ── 4. 의심 쌍 판정 ──────────────────────────────────────
suspects: list[tuple[str, str, int]] = []
for pair, first_seen in pair_history.items():
duration_min = int((now - first_seen).total_seconds() / 60)
if duration_min >= SUSPECT_DURATION_MIN:
suspects.append((pair[0], pair[1], duration_min))
if suspects:
logger.info(
'transshipment detection: %d suspect pairs (candidates=%d)',
len(suspects),
len(candidates),
)
return suspects