"""환적(Transshipment) 의심 선박 탐지 — 5단계 필터 파이프라인. 실무 목표: 일일 10건 미만 고신뢰 의심 건. 5단계 필터: Stage 1: 이종 쌍 필수 (어선 ↔ 운반선) — shipKindCode 기반 Stage 2: 감시영역(Monitoring Zone) 내 선박만 대상 Stage 3: 3단계 패턴 검증 (APPROACH → RENDEZVOUS → DEPARTURE) Stage 4: 점수 산출 (0~100, 50점 미만 미출력) Stage 5: 밀집 방폭 (1 운반선 : 최대 1 어선) """ from __future__ import annotations import json import logging import math import os from datetime import datetime, timezone from typing import Callable, Optional import pandas as pd from fleet_tracker import GEAR_PATTERN logger = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────── # 상수 # ────────────────────────────────────────────────────────────── SOG_THRESHOLD_KN = 2.0 # 저속 기준 (접현 가능 속도) PROXIMITY_DEG = 0.002 # ~220m (0.0007 → 0.002 상향: 접현 가능 범위) APPROACH_DEG = 0.01 # ~1.1km (접근 판정 거리) RENDEZVOUS_MIN = 90 # 체류 최소 시간 (45 → 90분) PAIR_EXPIRY_MIN = 240 # 쌍 만료 시간 GAP_TOLERANCE_CYCLES = 3 # 3 사이클(15분)까지 miss 허용 # 선종 분류 (shipKindCode 기반) _FISHING_KINDS = frozenset({'000020'}) # 운반선: 화물선(000023) + 유조선(000024)만. 
000027(기타)은 shipTy로 2차 판정 _CARRIER_KINDS = frozenset({'000023', '000024'}) # 환적 불가 선종 (shipTy 텍스트 기반 2차 필터) _EXCLUDED_SHIP_TY = frozenset({ 'Tug', 'Pilot Boat', 'Search And Rescue', 'Law Enforcement', 'AtoN', 'Anti Pollution', 'Passenger', 'Medical Transport', }) # shipTy 텍스트에 포함되면 CARRIER 로 승격 (부분일치, 대소문자 무시) _CARRIER_HINTS = ('cargo', 'tanker', 'supply', 'carrier', 'reefer') # ────────────────────────────────────────────────────────────── # 감시영역 로드 # ────────────────────────────────────────────────────────────── _ZONES_FILE = os.path.join(os.path.dirname(__file__), '..', 'data', 'monitoring_zones.json') _TRANSSHIP_ZONES: list[dict] = [] def _load_monitoring_zones() -> None: global _TRANSSHIP_ZONES try: with open(_ZONES_FILE, 'r', encoding='utf-8') as f: data = json.load(f) _TRANSSHIP_ZONES = [ z for z in data.get('zones', []) if z.get('type') == 'TRANSSHIP' and z.get('enabled') ] logger.info('loaded %d transship monitoring zones', len(_TRANSSHIP_ZONES)) except Exception as e: logger.warning('failed to load monitoring zones: %s', e) _TRANSSHIP_ZONES = [] _load_monitoring_zones() def _point_in_polygon(lat: float, lon: float, polygon: list[list[float]]) -> bool: """Ray-casting point-in-polygon. polygon: [[lon, lat], ...]""" n = len(polygon) inside = False j = n - 1 for i in range(n): xi, yi = polygon[i][0], polygon[i][1] # lon, lat xj, yj = polygon[j][0], polygon[j][1] if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi): inside = not inside j = i return inside def _is_in_transship_zone(lat: float, lon: float) -> Optional[str]: """감시영역 내 여부. 
해당 zone ID 반환, 미해당 시 None.""" for zone in _TRANSSHIP_ZONES: if _point_in_polygon(lat, lon, zone['polygon']): return zone['id'] return None # ────────────────────────────────────────────────────────────── # Stage 1: 이종 쌍 필터 # ────────────────────────────────────────────────────────────── def _classify_vessel_role( ship_kind_code: str, ship_ty: str, mmsi: str = '', ) -> str: """선박 역할 분류: 'FISHING', 'CARRIER', 'EXCLUDED', 'UNKNOWN'""" st_lower = ship_ty.lower() if isinstance(ship_ty, str) else '' if ship_kind_code in _FISHING_KINDS: return 'FISHING' if ship_kind_code in _CARRIER_KINDS: if ship_ty in _EXCLUDED_SHIP_TY: return 'EXCLUDED' return 'CARRIER' # 000027/000028/기타: shipTy 텍스트 부분일치로 완화 if ship_kind_code in ('000027', '000028', ''): if ship_ty in _EXCLUDED_SHIP_TY: return 'EXCLUDED' if any(h in st_lower for h in _CARRIER_HINTS): return 'CARRIER' if 'fishing' in st_lower: return 'FISHING' # 중국 412* 허가어선은 ship_kind/shipTy 불명이어도 FISHING 간주 if mmsi.startswith('412'): return 'FISHING' return 'UNKNOWN' # 000021(함정), 000022(여객), 000025(관공) → 제외 if ship_kind_code in ('000021', '000022', '000025'): return 'EXCLUDED' return 'UNKNOWN' def _is_transship_pair(role_a: str, role_b: str) -> bool: """이종 쌍(어선↔운반선)만 True. 
동종 어선 공조는 pair_trawl 경로(COOP_FISHING)에서 처리.""" return (role_a == 'FISHING' and role_b == 'CARRIER') or \ (role_b == 'FISHING' and role_a == 'CARRIER') # ────────────────────────────────────────────────────────────── # 내부 헬퍼 # ────────────────────────────────────────────────────────────── def _haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float: R = 3440.065 dlat = math.radians(lat2 - lat1) dlon = math.radians(lon2 - lon1) a = math.sin(dlat/2)**2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon/2)**2 return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) def _within_proximity(a: dict, b: dict) -> bool: """PROXIMITY_DEG 이내인지 (위경도 근사).""" dlat = abs(a['lat'] - b['lat']) if dlat >= PROXIMITY_DEG: return False cos_lat = math.cos(math.radians((a['lat'] + b['lat']) / 2.0)) dlon_scaled = abs(a['lon'] - b['lon']) * cos_lat return dlon_scaled < PROXIMITY_DEG def _within_approach(a: dict, b: dict) -> bool: """APPROACH_DEG 이내인지 (~1km).""" dlat = abs(a['lat'] - b['lat']) if dlat >= APPROACH_DEG: return False cos_lat = math.cos(math.radians((a['lat'] + b['lat']) / 2.0)) dlon_scaled = abs(a['lon'] - b['lon']) * cos_lat return dlon_scaled < APPROACH_DEG def _cell_key(lat: float, lon: float) -> tuple[int, int]: return (int(math.floor(lat / APPROACH_DEG)), int(math.floor(lon / APPROACH_DEG))) def _build_grid(records: list[dict]) -> dict[tuple[int, int], list[int]]: grid: dict[tuple[int, int], list[int]] = {} for idx, rec in enumerate(records): key = _cell_key(rec['lat'], rec['lon']) if key not in grid: grid[key] = [] grid[key].append(idx) return grid def _pair_key(mmsi_a: str, mmsi_b: str) -> tuple[str, str]: return (mmsi_a, mmsi_b) if mmsi_a < mmsi_b else (mmsi_b, mmsi_a) def _is_gear_name(name: Optional[str]) -> bool: if not name: return False return bool(GEAR_PATTERN.match(name)) # ────────────────────────────────────────────────────────────── # Stage 4: 점수 산출 # ────────────────────────────────────────────────────────────── 
def _score_pair(
    meta: dict,
    now: datetime,
    is_permitted_fn: Optional[Callable[[str], bool]],
    now_kst_hour: int,
    zone_id: Optional[str],
    vessel_info_a: dict,
    vessel_info_b: dict,
) -> Optional[dict]:
    """Compute the transshipment suspicion score for one tracked pair.

    Args:
        meta: Pair state from ``pair_history``.
        now: Evaluation time. Used as the end of the rendezvous only when the
            state has no ``rendezvous_last`` timestamp (older entries).
        is_permitted_fn: Optional lookup called with the carrier MMSI; a
            falsy result adds points. A lookup that raises is logged and
            contributes nothing.
        now_kst_hour: Current hour in KST (0-23); 20:00-04:00 adds points.
        zone_id: Monitoring-zone id of the pair, if any.
        vessel_info_a: Info dict for ``pair[0]``; only ``heading`` is read.
        vessel_info_b: Info dict for ``pair[1]``; only ``heading`` is read.

    Returns:
        A result dict, or None when the pair has no rendezvous, the
        rendezvous is shorter than RENDEZVOUS_MIN, or the score is below 50.
    """
    pair = meta.get('pair')
    if not pair:
        return None

    # Mandatory: a rendezvous lasting at least RENDEZVOUS_MIN minutes.
    rendezvous_start = meta.get('rendezvous_start')
    if rendezvous_start is None:
        return None
    # Stop the clock at the last observed close contact, so time spent
    # separating or missing from the feed does not count as rendezvous.
    rendezvous_end = meta.get('rendezvous_last') or now
    rendezvous_min = (rendezvous_end - rendezvous_start).total_seconds() / 60
    if rendezvous_min < RENDEZVOUS_MIN:
        return None

    # Completeness of the three-phase pattern
    has_approach = meta.get('approach_detected', False)
    has_departure = meta.get('departure_detected', False)
    if has_approach and has_departure:
        score = 35
    elif has_approach or has_departure:
        score = 25
    else:
        score = 15

    # Work out which member of the pair is the fishing vessel.
    if meta.get('role_a', 'UNKNOWN') == 'FISHING':
        fishing_mmsi, carrier_mmsi = pair[0], pair[1]
    else:
        fishing_mmsi, carrier_mmsi = pair[1], pair[0]

    # Unlicensed carrier
    if is_permitted_fn is not None:
        try:
            permitted = is_permitted_fn(carrier_mmsi)
        except Exception:  # best-effort lookup: never abort scoring, but leave a trace
            logger.warning('permit lookup failed for carrier %s', carrier_mmsi, exc_info=True)
        else:
            if not permitted:
                score += 15

    # Night time (20:00-04:00 KST)
    if now_kst_hour >= 20 or now_kst_hour < 4:
        score += 10

    # Zone bonus (depends on the kind of monitoring zone)
    if zone_id:
        zone_text = str(zone_id)
        if 'EEZ' in zone_text.upper() or '001' in zone_text:
            score += 15
        else:
            score += 10

    # Dwell-time bonus
    if rendezvous_min >= 180:
        score += 10
    elif rendezvous_min >= 120:
        score += 5

    # Parallel-heading check (data expected from a future detail API)
    heading_a = (vessel_info_a or {}).get('heading')
    heading_b = (vessel_info_b or {}).get('heading')
    if heading_a is not None and heading_b is not None:
        diff = abs(heading_a - heading_b)
        if diff > 180:
            diff = 360 - diff
        if diff < 20:  # parallel = going alongside is possible
            score += 10

    score = max(0, min(100, score))
    if score < 50:
        return None

    return {
        'pair_a': pair[0],
        'pair_b': pair[1],
        'fishing_mmsi': fishing_mmsi,
        'carrier_mmsi': carrier_mmsi,
        'duration_min': int(rendezvous_min),
        'severity': 'CRITICAL' if score >= 70 else 'HIGH',
        'score': score,
        'lat': meta.get('last_lat', 0),
        'lon': meta.get('last_lon', 0),
        'zone_id': zone_id,
        'phase': meta.get('phase', 'APPROACH'),
        'has_approach': has_approach,
        'has_departure': has_departure,
    }


def _lookup_vessel_info(get_vessel_info: Optional[Callable[[str], dict]], mmsi: str) -> dict:
    """Return the info dict for ``mmsi``; empty when there is no lookup or no result."""
    if get_vessel_info is None:
        return {}
    return get_vessel_info(mmsi) or {}


def _collect_candidates(
    df: pd.DataFrame,
    get_vessel_info: Optional[Callable[[str], dict]],
) -> list[dict]:
    """Stage 1+2: keep slow vessels inside a monitoring zone with a usable role."""
    slow = df[df['sog'] < SOG_THRESHOLD_KN]
    records: list[dict] = []
    # Iterate the columns directly: iterrows() builds one Series per row and
    # upcasts an integer MMSI to float when the other columns are floats,
    # which would turn the MMSI string into e.g. '412345678.0'.
    for raw_mmsi, raw_lat, raw_lon in zip(slow['mmsi'], slow['lat'], slow['lon']):
        mmsi = str(raw_mmsi)
        lat, lon = float(raw_lat), float(raw_lon)

        # Stage 2: must be inside a monitoring zone
        zone_id = _is_in_transship_zone(lat, lon)
        if zone_id is None:
            continue

        # Stage 1: vessel role
        info = _lookup_vessel_info(get_vessel_info, mmsi)
        kind = info.get('ship_kind_code', '')
        ship_ty = info.get('ship_ty', info.get('vessel_type', ''))
        role = _classify_vessel_role(kind, ship_ty, mmsi)
        if role in ('EXCLUDED', 'UNKNOWN'):
            continue

        # Skip fishing-gear signal names
        if _is_gear_name(info.get('name')):
            continue

        records.append({
            'mmsi': mmsi,
            'lat': lat,
            'lon': lon,
            'role': role,
            'zone_id': zone_id,
        })
    return records


def _find_active_pairs(
    fishing_recs: list[dict],
    carrier_recs: list[dict],
) -> dict[tuple[str, str], dict]:
    """Grid search for fishing x carrier pairs that are close or approaching."""
    carrier_grid = _build_grid(carrier_recs)
    active_pairs: dict[tuple[str, str], dict] = {}

    for f_rec in fishing_recs:
        cell_row, cell_col = _cell_key(f_rec['lat'], f_rec['lon'])
        # _within_approach scales the longitude gap by cos(lat), so away from
        # the equator a carrier up to APPROACH_DEG / cos(lat) degrees of
        # longitude away still qualifies. Widen the column reach to match;
        # the cosine floor bounds the scan near the poles.
        pad_lat = min(abs(f_rec['lat']) + APPROACH_DEG, 90.0)
        cos_lat = max(math.cos(math.radians(pad_lat)), 0.01)
        lon_reach = math.ceil(1.0 / cos_lat)

        for dr in (-1, 0, 1):
            for dc in range(-lon_reach, lon_reach + 1):
                for ci in carrier_grid.get((cell_row + dr, cell_col + dc), ()):
                    c_rec = carrier_recs[ci]
                    is_close = _within_proximity(f_rec, c_rec)
                    is_approaching = (not is_close) and _within_approach(f_rec, c_rec)
                    if not (is_close or is_approaching):
                        continue

                    key = _pair_key(f_rec['mmsi'], c_rec['mmsi'])
                    fishing_first = key[0] == f_rec['mmsi']
                    active_pairs[key] = {
                        'is_close': is_close,
                        'is_approaching': is_approaching,
                        'lat': (f_rec['lat'] + c_rec['lat']) / 2,
                        'lon': (f_rec['lon'] + c_rec['lon']) / 2,
                        'zone_id': f_rec['zone_id'],
                        'role_a': 'FISHING' if fishing_first else 'CARRIER',
                        'role_b': 'CARRIER' if fishing_first else 'FISHING',
                    }
    return active_pairs


def _new_pair_meta(key: tuple[str, str], loc: dict, now: datetime) -> dict:
    """Build the initial state for a pair seen for the first time."""
    contact_time = now if loc['is_close'] else None
    return {
        'pair': key,
        'phase': 'APPROACH' if loc['is_approaching'] else 'RENDEZVOUS',
        'approach_detected': loc['is_approaching'],
        'rendezvous_start': contact_time,
        'rendezvous_last': contact_time,
        'departure_detected': False,
        'miss_count': 0,
        'last_seen': now,
        'last_lat': loc['lat'],
        'last_lon': loc['lon'],
        'zone_id': loc['zone_id'],
        'role_a': loc['role_a'],
        'role_b': loc['role_b'],
    }


def _advance_pair_state(meta: dict, loc: dict, now: datetime) -> None:
    """Stage 3: advance an existing pair's state from the current observation."""
    meta['miss_count'] = 0
    meta['last_seen'] = now
    meta['last_lat'] = loc['lat']
    meta['last_lon'] = loc['lon']

    phase = meta.get('phase')
    if loc['is_close']:
        if phase == 'APPROACH':
            # approach -> rendezvous transition
            meta['approach_detected'] = True
        # From DEPARTURE this resumes the rendezvous after a re-approach.
        meta['phase'] = 'RENDEZVOUS'
        if meta.get('rendezvous_start') is None:
            meta['rendezvous_start'] = now
        meta['rendezvous_last'] = now
    elif loc['is_approaching']:
        if phase == 'RENDEZVOUS':
            # moving apart during a rendezvous -> departure phase
            meta['phase'] = 'DEPARTURE'
            meta['departure_detected'] = True
        elif phase == 'APPROACH':
            meta['approach_detected'] = True


def _age_inactive_pairs(pair_history: dict, active_keys, now: datetime) -> None:
    """Count a missed cycle for every pair not in ``active_keys``, then evict."""
    for key in list(pair_history):
        if key in active_keys:
            continue
        meta = pair_history[key]
        if not isinstance(meta, dict):
            del pair_history[key]
            continue
        meta['miss_count'] = meta.get('miss_count', 0) + 1
        # A pair that was in rendezvous and is now missing has likely separated.
        if meta.get('phase') == 'RENDEZVOUS' and meta['miss_count'] >= 2:
            meta['phase'] = 'DEPARTURE'
            meta['departure_detected'] = True
    _evict_expired(pair_history, now)


def detect_transshipment(
    df: pd.DataFrame,
    pair_history: dict,
    get_vessel_info: Optional[Callable[[str], dict]] = None,
    is_permitted: Optional[Callable[[str], bool]] = None,
    classify_zone_fn: Optional[Callable[[float, float], dict]] = None,
    now_kst_hour: int = 0,
    *,
    now: Optional[datetime] = None,
) -> list[dict]:
    """Detect suspected transshipment pairs — the five-stage filter pipeline.

    Args:
        df: Vessel position DataFrame; ``mmsi``, ``lat``, ``lon`` and ``sog``
            are required.
        pair_history: Pair state dict kept by the caller between calls. It is
            mutated in place.
        get_vessel_info: mmsi -> {ship_kind_code, ship_ty, name, heading, ...}.
            A ``None`` result is treated as an empty dict.
        is_permitted: mmsi -> bool; called with the carrier MMSI while scoring.
        classify_zone_fn: (lat, lon) -> {zone, ...}. Not used by this
            function; kept so existing callers keep working.
        now_kst_hour: Current hour in KST (0-23). The default of 0 falls in
            the night window and therefore earns the night bonus.
        now: Evaluation time, for replay and tests. Defaults to the current
            UTC time. Use the same kind of datetime (timezone-aware) on every
            call, because it is subtracted from timestamps stored earlier.

    Returns:
        Suspect pairs with score >= 50, at most one per carrier. An empty or
        malformed ``df`` returns ``[]`` without touching ``pair_history``.
    """
    if df.empty:
        return []
    if not {'mmsi', 'lat', 'lon', 'sog'}.issubset(df.columns):
        return []

    if now is None:
        now = datetime.now(timezone.utc)

    # Drop time-expired pairs first so a long-stale pair cannot be resumed
    # with its old rendezvous start.
    _evict_expired(pair_history, now)

    # ── Stage 1+2: candidate filter ─────────────────────────
    records = _collect_candidates(df, get_vessel_info)
    fishing_recs = [r for r in records if r['role'] == 'FISHING']
    carrier_recs = [r for r in records if r['role'] == 'CARRIER']
    if not fishing_recs or not carrier_recs:
        # No pair can be active this cycle. Tracked pairs still have to age,
        # otherwise they would stay in the history indefinitely.
        _age_inactive_pairs(pair_history, (), now)
        return []

    # ── Grid-based proximity search (fishing x carrier only) ─
    active_pairs = _find_active_pairs(fishing_recs, carrier_recs)

    # ── Stage 3: pair_history state machine ─────────────────
    for key, loc in active_pairs.items():
        meta = pair_history.get(key)
        if isinstance(meta, dict):
            _advance_pair_state(meta, loc, now)
        else:
            pair_history[key] = _new_pair_meta(key, loc, now)
    _age_inactive_pairs(pair_history, active_pairs, now)

    # ── Stage 4: scoring ────────────────────────────────────
    raw_suspects: list[dict] = []
    for key, meta in pair_history.items():
        if not isinstance(meta, dict):
            continue
        scored = _score_pair(
            meta, now, is_permitted, now_kst_hour,
            meta.get('zone_id'),
            _lookup_vessel_info(get_vessel_info, key[0]),
            _lookup_vessel_info(get_vessel_info, key[1]),
        )
        if scored is not None:
            raw_suspects.append(scored)

    # ── Stage 5: one carrier : at most one fishing vessel (highest score) ─
    carrier_best: dict[str, dict] = {}
    for s in raw_suspects:
        best = carrier_best.get(s['carrier_mmsi'])
        if best is None or s['score'] > best['score']:
            carrier_best[s['carrier_mmsi']] = s
    suspects = list(carrier_best.values())

    # Logging
    tier_counts = {'CRITICAL': 0, 'HIGH': 0}
    for s in suspects:
        tier_counts[s['severity']] = tier_counts.get(s['severity'], 0) + 1

    logger.info(
        'transshipment: pairs=%d (critical=%d, high=%d), '
        'candidates: fishing=%d carrier=%d, active_pairs=%d, history=%d',
        len(suspects),
        tier_counts.get('CRITICAL', 0),
        tier_counts.get('HIGH', 0),
        len(fishing_recs),
        len(carrier_recs),
        len(active_pairs),
        len(pair_history),
    )

    return suspects


def _evict_expired(pair_history: dict, now: datetime) -> None:
    """Remove pairs that are no longer worth tracking.

    A pair is removed when its state is not a dict, when it has missed more
    than GAP_TOLERANCE_CYCLES cycles, or when it has not been refreshed for
    at least PAIR_EXPIRY_MIN minutes according to its ``last_seen``
    timestamp. Entries without ``last_seen`` are judged on the miss count only.
    """
    expired = []
    for key, meta in pair_history.items():
        if not isinstance(meta, dict):
            expired.append(key)
            continue
        if meta.get('miss_count', 0) > GAP_TOLERANCE_CYCLES:
            expired.append(key)
            continue
        last_seen = meta.get('last_seen')
        if isinstance(last_seen, datetime):
            idle_min = (now - last_seen).total_seconds() / 60
            if idle_min >= PAIR_EXPIRY_MIN:
                expired.append(key)
    for key in expired:
        del pair_history[key]