235 lines
9.0 KiB
Python
235 lines
9.0 KiB
Python
"""환적(Transshipment) 의심 선박 탐지 — 서버사이드 O(n log n) 구현.
|
|
|
|
프론트엔드 useKoreaFilters.ts의 O(n²) 근접 탐지를 대체한다.
|
|
scipy 미설치 환경을 고려하여 그리드 기반 공간 인덱스를 사용한다.
|
|
|
|
알고리즘 개요:
|
|
1. 후보 선박 필터: sog < 2kn, 선종 (tanker/cargo/fishing), 외국 해안선 제외
|
|
2. 그리드 셀 기반 근접 쌍 탐지: O(n log n) ← 셀 분할 + 인접 9셀 조회
|
|
3. pair_history dict로 쌍별 최초 탐지 시각 영속화 (호출 간 유지)
|
|
4. 60분 이상 지속 근접 시 의심 쌍으로 판정
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import math
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
import pandas as pd
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# 상수
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
SOG_THRESHOLD_KN = 2.0 # 정박/표류 기준 속도 (노트)
|
|
PROXIMITY_DEG = 0.001 # 근접 판정 임계값 (~110m)
|
|
SUSPECT_DURATION_MIN = 60 # 의심 판정 최소 지속 시간 (분)
|
|
PAIR_EXPIRY_MIN = 120 # pair_history 항목 만료 기준 (분)
|
|
|
|
# 외국 해안 근접 제외 경계
|
|
_CN_LON_MAX = 123.5 # 중국 해안: 경도 < 123.5
|
|
_JP_LON_MIN = 130.5 # 일본 해안: 경도 > 130.5
|
|
_TSUSHIMA_LAT_MIN = 33.8 # 대마도: 위도 > 33.8 AND 경도 > 129.0
|
|
_TSUSHIMA_LON_MIN = 129.0
|
|
|
|
# 탐지 대상 선종 (소문자 정규화 후 비교)
|
|
_CANDIDATE_TYPES: frozenset[str] = frozenset({'tanker', 'cargo', 'fishing'})
|
|
|
|
# 그리드 셀 크기 = PROXIMITY_DEG (셀 하나 = 근접 임계와 동일 크기)
|
|
_GRID_CELL_DEG = PROXIMITY_DEG
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# 내부 헬퍼
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
def _is_near_foreign_coast(lat: float, lon: float) -> bool:
|
|
"""외국 해안 근처 여부 — 중국/일본/대마도 경계 확인."""
|
|
if lon < _CN_LON_MAX:
|
|
return True
|
|
if lon > _JP_LON_MIN:
|
|
return True
|
|
if lat > _TSUSHIMA_LAT_MIN and lon > _TSUSHIMA_LON_MIN:
|
|
return True
|
|
return False
|
|
|
|
|
|
def _cell_key(lat: float, lon: float) -> tuple[int, int]:
|
|
"""위도/경도를 그리드 셀 인덱스로 변환."""
|
|
return (int(math.floor(lat / _GRID_CELL_DEG)),
|
|
int(math.floor(lon / _GRID_CELL_DEG)))
|
|
|
|
|
|
def _build_grid(records: list[dict]) -> dict[tuple[int, int], list[int]]:
|
|
"""선박 리스트를 그리드 셀로 분류.
|
|
|
|
Returns: {(row, col): [record index, ...]}
|
|
"""
|
|
grid: dict[tuple[int, int], list[int]] = {}
|
|
for idx, rec in enumerate(records):
|
|
key = _cell_key(rec['lat'], rec['lon'])
|
|
if key not in grid:
|
|
grid[key] = []
|
|
grid[key].append(idx)
|
|
return grid
|
|
|
|
|
|
def _within_proximity(a: dict, b: dict) -> bool:
|
|
"""두 선박이 PROXIMITY_DEG 이내인지 확인 (위경도 직교 근사)."""
|
|
dlat = abs(a['lat'] - b['lat'])
|
|
if dlat >= PROXIMITY_DEG:
|
|
return False
|
|
cos_lat = math.cos(math.radians((a['lat'] + b['lat']) / 2.0))
|
|
dlon_scaled = abs(a['lon'] - b['lon']) * cos_lat
|
|
return dlon_scaled < PROXIMITY_DEG
|
|
|
|
|
|
def _normalize_type(raw: Optional[str]) -> str:
|
|
"""선종 문자열 소문자 정규화."""
|
|
if not raw:
|
|
return ''
|
|
return raw.strip().lower()
|
|
|
|
|
|
def _pair_key(mmsi_a: str, mmsi_b: str) -> tuple[str, str]:
|
|
"""MMSI 순서를 정규화하여 중복 쌍 방지."""
|
|
return (mmsi_a, mmsi_b) if mmsi_a < mmsi_b else (mmsi_b, mmsi_a)
|
|
|
|
|
|
def _evict_expired_pairs(
|
|
pair_history: dict[tuple[str, str], datetime],
|
|
now: datetime,
|
|
) -> None:
|
|
"""PAIR_EXPIRY_MIN 이상 갱신 없는 pair_history 항목 제거."""
|
|
expired = [
|
|
key for key, first_seen in pair_history.items()
|
|
if (now - first_seen).total_seconds() / 60 > PAIR_EXPIRY_MIN
|
|
]
|
|
for key in expired:
|
|
del pair_history[key]
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# 공개 API
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
def detect_transshipment(
|
|
df: pd.DataFrame,
|
|
pair_history: dict[tuple[str, str], datetime],
|
|
) -> list[tuple[str, str, int]]:
|
|
"""환적 의심 쌍 탐지.
|
|
|
|
Args:
|
|
df: 선박 위치 DataFrame.
|
|
필수 컬럼: mmsi, lat, lon, sog
|
|
선택 컬럼: ship_type (없으면 전체 선종 허용)
|
|
pair_history: 쌍별 최초 탐지 시각을 저장하는 영속 dict.
|
|
스케줄러에서 호출 간 유지하여 전달해야 한다.
|
|
키: (mmsi_a, mmsi_b) — mmsi_a < mmsi_b 정규화 적용.
|
|
값: 최초 탐지 시각 (UTC datetime, timezone-aware).
|
|
|
|
Returns:
|
|
[(mmsi_a, mmsi_b, duration_minutes), ...] — 60분 이상 지속된 의심 쌍.
|
|
mmsi_a < mmsi_b 정규화 적용.
|
|
"""
|
|
if df.empty:
|
|
return []
|
|
|
|
required_cols = {'mmsi', 'lat', 'lon', 'sog'}
|
|
missing = required_cols - set(df.columns)
|
|
if missing:
|
|
logger.error('detect_transshipment: missing required columns: %s', missing)
|
|
return []
|
|
|
|
now = datetime.now(timezone.utc)
|
|
|
|
# ── 1. 후보 선박 필터 ──────────────────────────────────────
|
|
has_type_col = 'ship_type' in df.columns
|
|
|
|
candidate_mask = df['sog'] < SOG_THRESHOLD_KN
|
|
|
|
if has_type_col:
|
|
type_mask = df['ship_type'].apply(_normalize_type).isin(_CANDIDATE_TYPES)
|
|
candidate_mask = candidate_mask & type_mask
|
|
|
|
candidates = df[candidate_mask].copy()
|
|
|
|
if candidates.empty:
|
|
_evict_expired_pairs(pair_history, now)
|
|
return []
|
|
|
|
# 외국 해안 근처 제외
|
|
coast_mask = candidates.apply(
|
|
lambda row: not _is_near_foreign_coast(row['lat'], row['lon']),
|
|
axis=1,
|
|
)
|
|
candidates = candidates[coast_mask]
|
|
|
|
if len(candidates) < 2:
|
|
_evict_expired_pairs(pair_history, now)
|
|
return []
|
|
|
|
records = candidates[['mmsi', 'lat', 'lon']].to_dict('records')
|
|
for rec in records:
|
|
rec['mmsi'] = str(rec['mmsi'])
|
|
|
|
# ── 2. 그리드 기반 근접 쌍 탐지 ──────────────────────────
|
|
grid = _build_grid(records)
|
|
active_pairs: set[tuple[str, str]] = set()
|
|
|
|
for (row, col), indices in grid.items():
|
|
# 현재 셀 내부 쌍
|
|
for i in range(len(indices)):
|
|
for j in range(i + 1, len(indices)):
|
|
a = records[indices[i]]
|
|
b = records[indices[j]]
|
|
if _within_proximity(a, b):
|
|
active_pairs.add(_pair_key(a['mmsi'], b['mmsi']))
|
|
|
|
# 인접 셀 (우측 3셀 + 아래 3셀 = 중복 없는 방향성 순회)
|
|
for dr, dc in ((0, 1), (1, -1), (1, 0), (1, 1)):
|
|
neighbor_key = (row + dr, col + dc)
|
|
if neighbor_key not in grid:
|
|
continue
|
|
for ai in indices:
|
|
for bi in grid[neighbor_key]:
|
|
a = records[ai]
|
|
b = records[bi]
|
|
if _within_proximity(a, b):
|
|
active_pairs.add(_pair_key(a['mmsi'], b['mmsi']))
|
|
|
|
# ── 3. pair_history 갱신 ─────────────────────────────────
|
|
# 현재 활성 쌍 → 최초 탐지 시각 등록
|
|
for pair in active_pairs:
|
|
if pair not in pair_history:
|
|
pair_history[pair] = now
|
|
|
|
# 비활성 쌍 → pair_history에서 제거 (다음 접근 시 재시작)
|
|
inactive = [key for key in pair_history if key not in active_pairs]
|
|
for key in inactive:
|
|
del pair_history[key]
|
|
|
|
# 만료 항목 정리 (비활성 제거 후 잔여 방어용)
|
|
_evict_expired_pairs(pair_history, now)
|
|
|
|
# ── 4. 의심 쌍 판정 ──────────────────────────────────────
|
|
suspects: list[tuple[str, str, int]] = []
|
|
|
|
for pair, first_seen in pair_history.items():
|
|
duration_min = int((now - first_seen).total_seconds() / 60)
|
|
if duration_min >= SUSPECT_DURATION_MIN:
|
|
suspects.append((pair[0], pair[1], duration_min))
|
|
|
|
if suspects:
|
|
logger.info(
|
|
'transshipment detection: %d suspect pairs (candidates=%d)',
|
|
len(suspects),
|
|
len(candidates),
|
|
)
|
|
|
|
return suspects
|