kcg-ai-monitoring/prediction/algorithms/transshipment.py

537 lines
20 KiB
Python
Raw Blame 히스토리

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""환적(Transshipment) 의심 선박 탐지 — 5단계 필터 파이프라인.
실무 목표: 일일 10건 미만 고신뢰 의심 건.
5단계 필터:
Stage 1: 이종 쌍 필수 (어선 ↔ 운반선) — shipKindCode 기반
Stage 2: 감시영역(Monitoring Zone) 내 선박만 대상
Stage 3: 3단계 패턴 검증 (APPROACH → RENDEZVOUS → DEPARTURE)
Stage 4: 점수 산출 (0~100, 50점 미만 미출력)
Stage 5: 밀집 방폭 (1 운반선 : 최대 1 어선)
"""
from __future__ import annotations
import json
import logging
import math
import os
from datetime import datetime, timezone
from typing import Callable, Optional
import pandas as pd
from fleet_tracker import GEAR_PATTERN
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────
# 상수
# ──────────────────────────────────────────────────────────────
SOG_THRESHOLD_KN = 2.0 # low-speed cutoff in knots (speed at which vessels can come alongside)
PROXIMITY_DEG = 0.002 # ~220m (raised from 0.0007 to 0.002: range in which vessels can come alongside)
APPROACH_DEG = 0.01 # ~1.1km (distance at which a pair counts as approaching)
RENDEZVOUS_MIN = 90 # minimum rendezvous duration in minutes (raised from 45 to 90)
PAIR_EXPIRY_MIN = 240 # pair expiry time in minutes; NOTE(review): not referenced by the code visible here
GAP_TOLERANCE_CYCLES = 3 # tolerate misses for up to 3 cycles (15 minutes)
# Vessel-kind classification (based on shipKindCode)
_FISHING_KINDS = frozenset({'000020'})
# Carriers: cargo (000023) and tanker (000024) only. 000027 (other) gets a
# second-pass decision from the shipTy text.
_CARRIER_KINDS = frozenset({'000023', '000024'})
# Vessel types that cannot take part in transshipment (second-pass filter on shipTy text)
_EXCLUDED_SHIP_TY = frozenset({
    'Tug', 'Pilot Boat', 'Search And Rescue', 'Law Enforcement',
    'AtoN', 'Anti Pollution', 'Passenger', 'Medical Transport',
})
# ──────────────────────────────────────────────────────────────
# 감시영역 로드
# ──────────────────────────────────────────────────────────────
# Location of the monitoring-zone definition file, resolved relative to this module.
_ZONES_FILE = os.path.join(os.path.dirname(__file__), '..', 'data', 'monitoring_zones.json')
# Enabled zones of type 'TRANSSHIP'; populated by _load_monitoring_zones().
_TRANSSHIP_ZONES: list[dict] = []
def _load_monitoring_zones() -> None:
    """Populate ``_TRANSSHIP_ZONES`` from the monitoring-zone JSON file.

    Reads ``_ZONES_FILE`` and keeps every entry of its top-level ``zones``
    list whose ``type`` is ``'TRANSSHIP'`` and whose ``enabled`` value is
    truthy. Any exception raised while reading or filtering is logged as a
    warning and leaves the zone list empty.
    """
    global _TRANSSHIP_ZONES
    try:
        with open(_ZONES_FILE, 'r', encoding='utf-8') as fh:
            payload = json.load(fh)
        selected = []
        for zone in payload.get('zones', []):
            if zone.get('type') != 'TRANSSHIP':
                continue
            if not zone.get('enabled'):
                continue
            selected.append(zone)
        _TRANSSHIP_ZONES = selected
        logger.info('loaded %d transship monitoring zones', len(_TRANSSHIP_ZONES))
    except Exception as e:
        logger.warning('failed to load monitoring zones: %s', e)
        _TRANSSHIP_ZONES = []


# Zones are loaded once, when the module is imported.
_load_monitoring_zones()
def _point_in_polygon(lat: float, lon: float, polygon: list[list[float]]) -> bool:
"""Ray-casting point-in-polygon. polygon: [[lon, lat], ...]"""
n = len(polygon)
inside = False
j = n - 1
for i in range(n):
xi, yi = polygon[i][0], polygon[i][1] # lon, lat
xj, yj = polygon[j][0], polygon[j][1]
if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi):
inside = not inside
j = i
return inside
def _is_in_transship_zone(lat: float, lon: float) -> Optional[str]:
    """Return the ``id`` of the first monitoring zone containing the point.

    Zones are checked in the order they appear in ``_TRANSSHIP_ZONES``;
    None is returned when the point lies in none of them.
    """
    return next(
        (
            zone['id']
            for zone in _TRANSSHIP_ZONES
            if _point_in_polygon(lat, lon, zone['polygon'])
        ),
        None,
    )
# ──────────────────────────────────────────────────────────────
# Stage 1: 이종 쌍 필터
# ──────────────────────────────────────────────────────────────
def _classify_vessel_role(
    ship_kind_code: str,
    ship_ty: str,
) -> str:
    """Classify a vessel as 'FISHING', 'CARRIER', 'EXCLUDED' or 'UNKNOWN'.

    Args:
        ship_kind_code: shipKindCode of the vessel.
        ship_ty: shipTy text, used as a second-pass filter.

    Returns:
        The role string. 'EXCLUDED' and 'UNKNOWN' vessels are not
        transshipment candidates.
    """
    if ship_kind_code in _FISHING_KINDS:
        return 'FISHING'

    if ship_kind_code in _CARRIER_KINDS:
        # Cargo/tanker kind code, but an excluded shipTy still rules it out.
        return 'EXCLUDED' if ship_ty in _EXCLUDED_SHIP_TY else 'CARRIER'

    # 000027 (other) / 000028 (unclassified): decide strictly from shipTy.
    if ship_kind_code in ('000027', '000028'):
        if ship_ty in ('Cargo', 'Tanker'):
            return 'CARRIER'
        # Anything else (N/A, Vessel, ...) stays UNKNOWN unless explicitly excluded.
        return 'EXCLUDED' if ship_ty in _EXCLUDED_SHIP_TY else 'UNKNOWN'

    # 000021 (naval), 000022 (passenger), 000025 (government) are excluded.
    if ship_kind_code in ('000021', '000022', '000025'):
        return 'EXCLUDED'

    return 'UNKNOWN'
def _is_transship_pair(role_a: str, role_b: str) -> bool:
"""어선 + 운반선 조합만 True."""
return (role_a == 'FISHING' and role_b == 'CARRIER') or \
(role_b == 'FISHING' and role_a == 'CARRIER')
# ──────────────────────────────────────────────────────────────
# 내부 헬퍼
# ──────────────────────────────────────────────────────────────
def _haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
R = 3440.065
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat/2)**2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon/2)**2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _within_proximity(a: dict, b: dict) -> bool:
    """Return True when two records are within PROXIMITY_DEG of each other.

    Uses a lat/lon approximation: the latitude difference and the
    longitude difference scaled by cos(mean latitude) must each be
    strictly below PROXIMITY_DEG.
    """
    lat_a, lat_b = a['lat'], b['lat']
    if abs(lat_a - lat_b) >= PROXIMITY_DEG:
        return False
    mean_lat_rad = math.radians((lat_a + lat_b) / 2.0)
    east_west = abs(a['lon'] - b['lon']) * math.cos(mean_lat_rad)
    return east_west < PROXIMITY_DEG
def _within_approach(a: dict, b: dict) -> bool:
    """Return True when two records are within APPROACH_DEG of each other.

    Same lat/lon approximation as the proximity check: the latitude
    difference and the cos(mean latitude)-scaled longitude difference must
    each be strictly below APPROACH_DEG.
    """
    lat_a, lat_b = a['lat'], b['lat']
    if abs(lat_a - lat_b) >= APPROACH_DEG:
        return False
    mean_lat_rad = math.radians((lat_a + lat_b) / 2.0)
    east_west = abs(a['lon'] - b['lon']) * math.cos(mean_lat_rad)
    return east_west < APPROACH_DEG
def _cell_key(lat: float, lon: float) -> tuple[int, int]:
    """Map a position to its (row, column) grid cell; cells are APPROACH_DEG wide."""
    row = math.floor(lat / APPROACH_DEG)
    col = math.floor(lon / APPROACH_DEG)
    return (int(row), int(col))
def _build_grid(records: list[dict]) -> dict[tuple[int, int], list[int]]:
grid: dict[tuple[int, int], list[int]] = {}
for idx, rec in enumerate(records):
key = _cell_key(rec['lat'], rec['lon'])
if key not in grid:
grid[key] = []
grid[key].append(idx)
return grid
def _pair_key(mmsi_a: str, mmsi_b: str) -> tuple[str, str]:
return (mmsi_a, mmsi_b) if mmsi_a < mmsi_b else (mmsi_b, mmsi_a)
def _is_gear_name(name: Optional[str]) -> bool:
if not name:
return False
return bool(GEAR_PATTERN.match(name))
# ──────────────────────────────────────────────────────────────
# Stage 4: 점수 산출
# ──────────────────────────────────────────────────────────────
def _score_pair(
meta: dict,
now: datetime,
is_permitted_fn: Optional[Callable[[str], bool]],
now_kst_hour: int,
zone_id: Optional[str],
vessel_info_a: dict,
vessel_info_b: dict,
) -> Optional[dict]:
"""환적 의심 점수 산출. 50점 미만이면 None."""
phase = meta.get('phase', 'APPROACH')
pair = meta.get('pair')
if not pair:
return None
# 필수: RENDEZVOUS 90분 이상
rendezvous_start = meta.get('rendezvous_start')
if rendezvous_start is None:
return None
rendezvous_min = (now - rendezvous_start).total_seconds() / 60
if rendezvous_min < RENDEZVOUS_MIN:
return None
score = 0
# 3단계 패턴 완성도
has_approach = meta.get('approach_detected', False)
has_departure = meta.get('departure_detected', False)
if has_approach and has_departure:
score += 35
elif has_departure or has_approach:
score += 25
else:
score += 15
# 어선 식별 — pair에서 누가 어선이고 누가 운반선인지
role_a = meta.get('role_a', 'UNKNOWN')
role_b = meta.get('role_b', 'UNKNOWN')
fishing_mmsi = pair[0] if role_a == 'FISHING' else pair[1]
carrier_mmsi = pair[1] if role_a == 'FISHING' else pair[0]
# 운반선 무허가
if is_permitted_fn is not None:
try:
if not is_permitted_fn(carrier_mmsi):
score += 15
except Exception:
pass
# 야간 (20:00~04:00 KST)
if now_kst_hour >= 20 or now_kst_hour < 4:
score += 10
# 수역 가점 (감시영역 종류에 따라)
if zone_id:
if 'EEZ' in zone_id.upper() or '001' in zone_id:
score += 15
else:
score += 10
# 체류 시간 bonus
if rendezvous_min >= 180:
score += 10
elif rendezvous_min >= 120:
score += 5
# heading 병행 판정 (향후 detail API 데이터)
heading_a = vessel_info_a.get('heading')
heading_b = vessel_info_b.get('heading')
if heading_a is not None and heading_b is not None:
diff = abs(heading_a - heading_b)
if diff > 180:
diff = 360 - diff
if diff < 20: # 평행 = 접현 가능
score += 10
score = max(0, min(100, score))
if score < 50:
return None
if score >= 70:
severity = 'CRITICAL'
else:
severity = 'HIGH'
lat = meta.get('last_lat', 0)
lon = meta.get('last_lon', 0)
return {
'pair_a': pair[0],
'pair_b': pair[1],
'fishing_mmsi': fishing_mmsi,
'carrier_mmsi': carrier_mmsi,
'duration_min': int(rendezvous_min),
'severity': severity,
'score': score,
'lat': lat,
'lon': lon,
'zone_id': zone_id,
'phase': phase,
'has_approach': has_approach,
'has_departure': has_departure,
}
# ──────────────────────────────────────────────────────────────
# 공개 API
# ──────────────────────────────────────────────────────────────
def _age_missed_pairs(pair_history: dict, active_keys) -> None:
    """Bump ``miss_count`` of every pair not in ``active_keys``; drop stale ones."""
    for key in list(pair_history):
        if key in active_keys:
            continue
        meta = pair_history[key]
        if not isinstance(meta, dict):
            del pair_history[key]
            continue
        meta['miss_count'] = meta.get('miss_count', 0) + 1
        # A pair that was in rendezvous and has been missed twice is
        # treated as having separated.
        if meta.get('phase') == 'RENDEZVOUS' and meta['miss_count'] >= 2:
            meta['phase'] = 'DEPARTURE'
            meta['departure_detected'] = True
        if meta['miss_count'] > GAP_TOLERANCE_CYCLES:
            del pair_history[key]


def detect_transshipment(
    df: pd.DataFrame,
    pair_history: dict,
    get_vessel_info: Optional[Callable[[str], dict]] = None,
    is_permitted: Optional[Callable[[str], bool]] = None,
    classify_zone_fn: Optional[Callable[[float, float], dict]] = None,
    now_kst_hour: int = 0,
) -> list[dict]:
    """Detect suspected transshipment pairs with a five-stage filter pipeline.

    Stages: (1) fishing-vessel x carrier pairs only, (2) vessels inside a
    monitoring zone only, (3) APPROACH -> RENDEZVOUS -> DEPARTURE state
    machine kept in ``pair_history``, (4) scoring, (5) at most one fishing
    vessel (the best-scoring one) per carrier.

    Args:
        df: Vessel positions; columns ``mmsi``, ``lat``, ``lon`` and ``sog``
            are required, ``cog`` is optional.
        pair_history: Pair-state dict kept by the caller between calls;
            mutated in place.
        get_vessel_info: mmsi -> info dict (``ship_kind_code``, ``ship_ty``
            or ``vessel_type``, ``name``, ``heading``). A None result is
            treated as an empty dict.
        is_permitted: mmsi -> bool; called with the carrier MMSI during
            scoring.
        classify_zone_fn: Accepted for interface compatibility; not used by
            this function.
        now_kst_hour: Current hour (0-23) used for the night bonus.

    Returns:
        Suspect dicts for pairs that pass scoring. Empty when ``df`` is
        empty, lacks a required column, or contains no fishing/carrier
        candidates this cycle.
    """
    if df.empty:
        return []
    if {'mmsi', 'lat', 'lon', 'sog'} - set(df.columns):
        return []

    now = datetime.now(timezone.utc)

    def _info_for(mmsi: str) -> dict:
        # A provider that knows nothing about the vessel may return None.
        info = get_vessel_info(mmsi) if get_vessel_info else None
        return {} if info is None else info

    def _idle_cycle() -> list[dict]:
        # No pair can be active this cycle, so every tracked pair counts as
        # missed. Without this the miss counters freeze on quiet cycles and
        # stale pairs keep accumulating rendezvous time.
        _age_missed_pairs(pair_history, ())
        _evict_expired(pair_history, now)
        return []

    # ── Stage 1+2: candidate filter ────────────────────────────
    # Only vessels slower than SOG_THRESHOLD_KN.
    candidates = df[df['sog'] < SOG_THRESHOLD_KN].copy()
    if len(candidates) < 2:
        return _idle_cycle()

    records: list[dict] = []
    for _, row in candidates.iterrows():
        mmsi = str(row['mmsi'])
        lat, lon = float(row['lat']), float(row['lon'])

        # Stage 2: must be inside a monitoring zone.
        zone_id = _is_in_transship_zone(lat, lon)
        if zone_id is None:
            continue

        # Stage 1: vessel-kind classification.
        info = _info_for(mmsi)
        kind = info.get('ship_kind_code', '')
        ship_ty = info.get('ship_ty', info.get('vessel_type', ''))
        role = _classify_vessel_role(kind, ship_ty)
        if role in ('EXCLUDED', 'UNKNOWN'):
            continue

        # Skip signals whose name looks like fishing gear.
        if _is_gear_name(info.get('name')):
            continue

        records.append({
            'mmsi': mmsi, 'lat': lat, 'lon': lon,
            'role': role, 'zone_id': zone_id,
            'cog': float(row.get('cog', 0)),
        })

    # Split by role; both sides are needed to form a pair. This also
    # covers the "fewer than two records" case.
    fishing_recs = [r for r in records if r['role'] == 'FISHING']
    carrier_recs = [r for r in records if r['role'] == 'CARRIER']
    if not fishing_recs or not carrier_recs:
        return _idle_cycle()

    # ── Grid-based proximity search (fishing x carrier only) ───
    carrier_grid = _build_grid(carrier_recs)
    active_pairs: dict[tuple[str, str], dict] = {}
    for f_rec in fishing_recs:
        f_row, f_col = _cell_key(f_rec['lat'], f_rec['lon'])
        # Grid columns are APPROACH_DEG of raw longitude wide, but the
        # approach test scales longitude by cos(latitude). Away from the
        # equator a pair inside the approach range can therefore sit more
        # than one column apart, so widen the column window accordingly.
        span_lat = min(abs(f_rec['lat']) + APPROACH_DEG, 89.0)
        col_span = math.ceil(1.0 / math.cos(math.radians(span_lat)))
        for dr in (-1, 0, 1):
            for dc in range(-col_span, col_span + 1):
                neighbor = (f_row + dr, f_col + dc)
                if neighbor not in carrier_grid:
                    continue
                for ci in carrier_grid[neighbor]:
                    c_rec = carrier_recs[ci]
                    is_close = _within_proximity(f_rec, c_rec)
                    is_approaching = (
                        _within_approach(f_rec, c_rec) if not is_close else False
                    )
                    if not is_close and not is_approaching:
                        continue
                    key = _pair_key(f_rec['mmsi'], c_rec['mmsi'])
                    fishing_first = key[0] == f_rec['mmsi']
                    active_pairs[key] = {
                        'is_close': is_close,
                        'is_approaching': is_approaching,
                        'lat': (f_rec['lat'] + c_rec['lat']) / 2,
                        'lon': (f_rec['lon'] + c_rec['lon']) / 2,
                        'zone_id': f_rec['zone_id'],
                        'role_a': 'FISHING' if fishing_first else 'CARRIER',
                        'role_b': 'CARRIER' if fishing_first else 'FISHING',
                    }

    # ── Stage 3: update the pair_history state machine ─────────
    for key, loc in active_pairs.items():
        meta = pair_history.get(key)
        if meta is None or not isinstance(meta, dict):
            # Newly observed pair.
            pair_history[key] = {
                'pair': key,
                'phase': 'APPROACH' if loc['is_approaching'] else 'RENDEZVOUS',
                'approach_detected': loc['is_approaching'],
                'rendezvous_start': now if loc['is_close'] else None,
                'departure_detected': False,
                'miss_count': 0,
                'last_lat': loc['lat'],
                'last_lon': loc['lon'],
                'zone_id': loc['zone_id'],
                'role_a': loc['role_a'],
                'role_b': loc['role_b'],
            }
            continue

        meta['miss_count'] = 0
        meta['last_lat'] = loc['lat']
        meta['last_lon'] = loc['lon']

        if loc['is_close']:
            if meta['phase'] == 'APPROACH':
                # Approach -> rendezvous transition.
                meta['phase'] = 'RENDEZVOUS'
                meta['approach_detected'] = True
                meta['rendezvous_start'] = meta.get('rendezvous_start') or now
            elif meta['phase'] == 'DEPARTURE':
                # Came close again after separating: resume the rendezvous.
                meta['phase'] = 'RENDEZVOUS'
            # Still in rendezvous: make sure a start time exists.
            if meta.get('rendezvous_start') is None:
                meta['rendezvous_start'] = now
        elif loc['is_approaching']:
            if meta['phase'] == 'RENDEZVOUS':
                # Drifting apart during a rendezvous: separation phase.
                meta['phase'] = 'DEPARTURE'
                meta['departure_detected'] = True
            elif meta['phase'] == 'APPROACH':
                meta['approach_detected'] = True

    # Pairs that were not observed this cycle.
    _age_missed_pairs(pair_history, active_pairs)
    _evict_expired(pair_history, now)

    # ── Stage 4+5: scoring + crowd suppression ─────────────────
    raw_suspects: list[dict] = []
    for key, meta in pair_history.items():
        if not isinstance(meta, dict):
            continue
        scored = _score_pair(
            meta, now, is_permitted, now_kst_hour,
            meta.get('zone_id'), _info_for(key[0]), _info_for(key[1]),
        )
        if scored is not None:
            raw_suspects.append(scored)

    # Stage 5: one carrier keeps at most one fishing vessel (highest score).
    carrier_best: dict[str, dict] = {}
    for s in raw_suspects:
        carrier = s['carrier_mmsi']
        if carrier not in carrier_best or s['score'] > carrier_best[carrier]['score']:
            carrier_best[carrier] = s
    suspects = list(carrier_best.values())

    # Summary log.
    tier_counts = {'CRITICAL': 0, 'HIGH': 0}
    for s in suspects:
        tier_counts[s['severity']] = tier_counts.get(s['severity'], 0) + 1
    logger.info(
        'transshipment: pairs=%d (critical=%d, high=%d), '
        'candidates: fishing=%d carrier=%d, active_pairs=%d, history=%d',
        len(suspects), tier_counts.get('CRITICAL', 0), tier_counts.get('HIGH', 0),
        len(fishing_recs), len(carrier_recs), len(active_pairs), len(pair_history),
    )
    return suspects
def _evict_expired(pair_history: dict, now: datetime) -> None:
"""PAIR_EXPIRY_MIN 이상 갱신 없는 쌍 제거."""
expired = []
for key, meta in pair_history.items():
if not isinstance(meta, dict):
expired.append(key)
continue
miss = meta.get('miss_count', 0)
if miss > GAP_TOLERANCE_CYCLES:
expired.append(key)
for key in expired:
del pair_history[key]