kcg-ai-monitoring/prediction/algorithms/transshipment.py
htlee 8ff04a8cca feat(prediction): DAR-03 탐지 로직 보강 + 한중어업협정 906척 레지스트리 적재
- V029: kcg.fishery_permit_cn 신규 테이블(연단위, permit_year+permit_no 복합 유니크) + fleet_vessels permit_year/fishery_code 컬럼
- load_fishery_permit_cn.py: xls → DB 적재 스크립트, 906척 + 497 신청인사 upsert
- G-04/G-05/G-06 Dead code 해결: classify_gear_violations 호출 연결, dir() 버그 제거
- find_pair_candidates: bbox 1차 + 궤적 유사도(location/sog_corr/cog_alignment) 2차, role 가점
- spoofing 산식 교체: 1시간 윈도우 + teleport 절대 가점 + extreme 50kn 단독 0.6 확정
- transshipment 선종 완화: shipTy 부분일치 + 412* FISHING 간주
- gear_code DB write 경로 신설 + fleet_tracker API 3개 추가
- cron 스크립트: fishery_permit/pair_type/fleet_role 신규 섹션
2026-04-16 07:43:24 +09:00

542 lines
20 KiB
Python

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""환적(Transshipment) 의심 선박 탐지 — 5단계 필터 파이프라인.
실무 목표: 일일 10건 미만 고신뢰 의심 건.
5단계 필터:
Stage 1: 이종 쌍 필수 (어선 ↔ 운반선) — shipKindCode 기반
Stage 2: 감시영역(Monitoring Zone) 내 선박만 대상
Stage 3: 3단계 패턴 검증 (APPROACH → RENDEZVOUS → DEPARTURE)
Stage 4: 점수 산출 (0~100, 50점 미만 미출력)
Stage 5: 밀집 방폭 (1 운반선 : 최대 1 어선)
"""
from __future__ import annotations
import json
import logging
import math
import os
from datetime import datetime, timezone
from typing import Callable, Optional
import pandas as pd
from fleet_tracker import GEAR_PATTERN
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────
# Constants
# ──────────────────────────────────────────────────────────────
SOG_THRESHOLD_KN = 2.0 # low-speed threshold in knots (speed at which vessels can come alongside)
PROXIMITY_DEG = 0.002 # ~220 m (raised from 0.0007 to 0.002: range at which vessels can be alongside)
APPROACH_DEG = 0.01 # ~1.1 km (distance used to decide "approaching")
RENDEZVOUS_MIN = 90 # minimum dwell time in minutes (raised from 45 to 90)
PAIR_EXPIRY_MIN = 240 # pair expiry time; NOTE(review): not referenced by any code visible in this file
GAP_TOLERANCE_CYCLES = 3 # tolerate up to 3 missed cycles (15 minutes)
# Vessel-kind classification (based on shipKindCode)
_FISHING_KINDS = frozenset({'000020'})
# Carriers: cargo (000023) and tanker (000024) only. 000027 (other) is decided
# in a second pass from the shipTy text.
_CARRIER_KINDS = frozenset({'000023', '000024'})
# Vessel types that cannot take part in transshipment (second-pass filter on shipTy text)
_EXCLUDED_SHIP_TY = frozenset({
    'Tug', 'Pilot Boat', 'Search And Rescue', 'Law Enforcement',
    'AtoN', 'Anti Pollution', 'Passenger', 'Medical Transport',
})
# A shipTy text containing any of these is promoted to CARRIER
# (substring match, case-insensitive)
_CARRIER_HINTS = ('cargo', 'tanker', 'supply', 'carrier', 'reefer')
# ──────────────────────────────────────────────────────────────
# Monitoring-zone loading
# ──────────────────────────────────────────────────────────────
# Path of the zone definition file, resolved relative to this module.
_ZONES_FILE = os.path.join(os.path.dirname(__file__), '..', 'data', 'monitoring_zones.json')
# Enabled TRANSSHIP zones; filled in by _load_monitoring_zones().
_TRANSSHIP_ZONES: list[dict] = []
def _load_monitoring_zones() -> None:
    """Populate ``_TRANSSHIP_ZONES`` from the JSON file at ``_ZONES_FILE``.

    Only entries of the top-level ``zones`` list whose ``type`` is
    ``'TRANSSHIP'`` and whose ``enabled`` value is truthy are kept.  Any
    failure (missing file, bad JSON, unexpected structure) is logged as a
    warning and leaves the zone list empty.
    """
    global _TRANSSHIP_ZONES
    try:
        with open(_ZONES_FILE, 'r', encoding='utf-8') as handle:
            payload = json.load(handle)
        selected = []
        for zone in payload.get('zones', []):
            if zone.get('type') != 'TRANSSHIP':
                continue
            if not zone.get('enabled'):
                continue
            selected.append(zone)
        _TRANSSHIP_ZONES = selected
        logger.info('loaded %d transship monitoring zones', len(_TRANSSHIP_ZONES))
    except Exception as exc:
        # Best effort: detection simply finds no zones when loading fails.
        logger.warning('failed to load monitoring zones: %s', exc)
        _TRANSSHIP_ZONES = []
# Load the monitoring zones once, when this module is imported.
_load_monitoring_zones()
def _point_in_polygon(lat: float, lon: float, polygon: list[list[float]]) -> bool:
"""Ray-casting point-in-polygon. polygon: [[lon, lat], ...]"""
n = len(polygon)
inside = False
j = n - 1
for i in range(n):
xi, yi = polygon[i][0], polygon[i][1] # lon, lat
xj, yj = polygon[j][0], polygon[j][1]
if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi):
inside = not inside
j = i
return inside
def _is_in_transship_zone(lat: float, lon: float) -> Optional[str]:
    """Return the ``id`` of the first loaded zone whose polygon contains the point, else None."""
    return next(
        (
            zone['id']
            for zone in _TRANSSHIP_ZONES
            if _point_in_polygon(lat, lon, zone['polygon'])
        ),
        None,
    )
# ──────────────────────────────────────────────────────────────
# Stage 1: heterogeneous-pair filter
# ──────────────────────────────────────────────────────────────
def _classify_vessel_role(
    ship_kind_code: str,
    ship_ty: str,
    mmsi: str = '',
) -> str:
    """Classify a vessel as 'FISHING', 'CARRIER', 'EXCLUDED' or 'UNKNOWN'.

    Args:
        ship_kind_code: shipKindCode of the vessel.  A missing value (None or
            any non-string) is treated like the empty string so the
            unknown-kind fallbacks below still apply.
        ship_ty: free-text ship type.  Non-string values count as empty.
        mmsi: vessel MMSI, used only for the ``412`` prefix fallback.

    Returns:
        The role string.  Only 'FISHING' and 'CARRIER' vessels take part in
        pair detection.
    """
    # A caller using info.get('ship_kind_code', '') still hands over None when
    # the key exists with a null value; normalise so that case reaches the
    # unknown-kind branch instead of dropping straight to 'UNKNOWN'.
    kind = ship_kind_code if isinstance(ship_kind_code, str) else ''
    st_lower = ship_ty.strip().lower() if isinstance(ship_ty, str) else ''

    def _is_excluded_type() -> bool:
        # Case-insensitive, consistent with the carrier-hint matching.
        return bool(st_lower) and any(
            st_lower == excluded.lower() for excluded in _EXCLUDED_SHIP_TY
        )

    if kind in _FISHING_KINDS:
        return 'FISHING'
    if kind in _CARRIER_KINDS:
        return 'EXCLUDED' if _is_excluded_type() else 'CARRIER'
    # 000027 / 000028 / unknown kind: relaxed classification from the shipTy text.
    if kind in ('000027', '000028', ''):
        if _is_excluded_type():
            return 'EXCLUDED'
        if any(hint in st_lower for hint in _CARRIER_HINTS):
            return 'CARRIER'
        if 'fishing' in st_lower:
            return 'FISHING'
        # MMSIs starting with 412 are treated as fishing vessels even when
        # the kind code and shipTy are both uninformative.
        if str(mmsi).startswith('412'):
            return 'FISHING'
        return 'UNKNOWN'
    # 000021 (naval), 000022 (passenger), 000025 (government) are excluded.
    if kind in ('000021', '000022', '000025'):
        return 'EXCLUDED'
    return 'UNKNOWN'
def _is_transship_pair(role_a: str, role_b: str) -> bool:
"""이종 쌍(어선↔운반선)만 True. 동종 어선 공조는 pair_trawl 경로(COOP_FISHING)에서 처리."""
return (role_a == 'FISHING' and role_b == 'CARRIER') or \
(role_b == 'FISHING' and role_a == 'CARRIER')
# ──────────────────────────────────────────────────────────────
# Internal helpers
# ──────────────────────────────────────────────────────────────
def _haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
R = 3440.065
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat/2)**2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon/2)**2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _within_proximity(a: dict, b: dict) -> bool:
    """Return True when the two records are closer than PROXIMITY_DEG on both axes.

    This is a flat lat/lon approximation: the latitude gap is compared
    directly, and the longitude gap is first scaled by the cosine of the
    mean latitude.
    """
    if abs(a['lat'] - b['lat']) >= PROXIMITY_DEG:
        return False
    mean_lat = (a['lat'] + b['lat']) / 2.0
    lon_gap = abs(a['lon'] - b['lon']) * math.cos(math.radians(mean_lat))
    return lon_gap < PROXIMITY_DEG
def _within_approach(a: dict, b: dict) -> bool:
    """Return True when the two records are closer than APPROACH_DEG on both axes.

    Same flat approximation as the proximity check: the longitude gap is
    scaled by the cosine of the mean latitude before it is compared.
    """
    if abs(a['lat'] - b['lat']) >= APPROACH_DEG:
        return False
    mean_lat = (a['lat'] + b['lat']) / 2.0
    lon_gap = abs(a['lon'] - b['lon']) * math.cos(math.radians(mean_lat))
    return lon_gap < APPROACH_DEG
def _cell_key(lat: float, lon: float) -> tuple[int, int]:
    """Map a position to its (lat index, lon index) grid cell; cells are APPROACH_DEG wide."""
    lat_idx = int(math.floor(lat / APPROACH_DEG))
    lon_idx = int(math.floor(lon / APPROACH_DEG))
    return (lat_idx, lon_idx)
def _build_grid(records: list[dict]) -> dict[tuple[int, int], list[int]]:
grid: dict[tuple[int, int], list[int]] = {}
for idx, rec in enumerate(records):
key = _cell_key(rec['lat'], rec['lon'])
if key not in grid:
grid[key] = []
grid[key].append(idx)
return grid
def _pair_key(mmsi_a: str, mmsi_b: str) -> tuple[str, str]:
return (mmsi_a, mmsi_b) if mmsi_a < mmsi_b else (mmsi_b, mmsi_a)
def _is_gear_name(name: Optional[str]) -> bool:
if not name:
return False
return bool(GEAR_PATTERN.match(name))
# ──────────────────────────────────────────────────────────────
# Stage 4: score calculation
# ──────────────────────────────────────────────────────────────
def _score_pair(
meta: dict,
now: datetime,
is_permitted_fn: Optional[Callable[[str], bool]],
now_kst_hour: int,
zone_id: Optional[str],
vessel_info_a: dict,
vessel_info_b: dict,
) -> Optional[dict]:
"""환적 의심 점수 산출. 50점 미만이면 None."""
phase = meta.get('phase', 'APPROACH')
pair = meta.get('pair')
if not pair:
return None
# 필수: RENDEZVOUS 90분 이상
rendezvous_start = meta.get('rendezvous_start')
if rendezvous_start is None:
return None
rendezvous_min = (now - rendezvous_start).total_seconds() / 60
if rendezvous_min < RENDEZVOUS_MIN:
return None
score = 0
# 3단계 패턴 완성도
has_approach = meta.get('approach_detected', False)
has_departure = meta.get('departure_detected', False)
if has_approach and has_departure:
score += 35
elif has_departure or has_approach:
score += 25
else:
score += 15
# 어선 식별 — pair에서 누가 어선이고 누가 운반선인지
role_a = meta.get('role_a', 'UNKNOWN')
role_b = meta.get('role_b', 'UNKNOWN')
fishing_mmsi = pair[0] if role_a == 'FISHING' else pair[1]
carrier_mmsi = pair[1] if role_a == 'FISHING' else pair[0]
# 운반선 무허가
if is_permitted_fn is not None:
try:
if not is_permitted_fn(carrier_mmsi):
score += 15
except Exception:
pass
# 야간 (20:00~04:00 KST)
if now_kst_hour >= 20 or now_kst_hour < 4:
score += 10
# 수역 가점 (감시영역 종류에 따라)
if zone_id:
if 'EEZ' in zone_id.upper() or '001' in zone_id:
score += 15
else:
score += 10
# 체류 시간 bonus
if rendezvous_min >= 180:
score += 10
elif rendezvous_min >= 120:
score += 5
# heading 병행 판정 (향후 detail API 데이터)
heading_a = vessel_info_a.get('heading')
heading_b = vessel_info_b.get('heading')
if heading_a is not None and heading_b is not None:
diff = abs(heading_a - heading_b)
if diff > 180:
diff = 360 - diff
if diff < 20: # 평행 = 접현 가능
score += 10
score = max(0, min(100, score))
if score < 50:
return None
if score >= 70:
severity = 'CRITICAL'
else:
severity = 'HIGH'
lat = meta.get('last_lat', 0)
lon = meta.get('last_lon', 0)
return {
'pair_a': pair[0],
'pair_b': pair[1],
'fishing_mmsi': fishing_mmsi,
'carrier_mmsi': carrier_mmsi,
'duration_min': int(rendezvous_min),
'severity': severity,
'score': score,
'lat': lat,
'lon': lon,
'zone_id': zone_id,
'phase': phase,
'has_approach': has_approach,
'has_departure': has_departure,
}
# ──────────────────────────────────────────────────────────────
# Public API
# ──────────────────────────────────────────────────────────────
def _lon_cell_reach(lat: float) -> int:
    """Return how many grid columns east/west of a cell must be scanned at ``lat``."""
    if not math.isfinite(lat):
        return 1
    # _within_approach() compares |dlon| * cos(mean_lat) with APPROACH_DEG, so
    # the raw longitude gap of a qualifying pair can be wider than one
    # APPROACH_DEG cell.  Pad the latitude by one cell (the pair's mean
    # latitude may lie poleward of this vessel) and cap it so cos() stays
    # well away from zero.
    padded_lat = min(abs(lat) + APPROACH_DEG, 89.0)
    return max(1, math.ceil(1.0 / math.cos(math.radians(padded_lat))))


def _age_inactive_pairs(pair_history: dict, active_keys) -> None:
    """Count a missed cycle for every tracked pair whose key is not in ``active_keys``."""
    for key in list(pair_history.keys()):
        if key in active_keys:
            continue
        meta = pair_history[key]
        if not isinstance(meta, dict):
            del pair_history[key]
            continue
        meta['miss_count'] = meta.get('miss_count', 0) + 1
        # A pair that was in rendezvous and has now been missed twice is
        # treated as having separated.
        if meta.get('phase') == 'RENDEZVOUS' and meta['miss_count'] >= 2:
            meta['phase'] = 'DEPARTURE'
            meta['departure_detected'] = True
        if meta['miss_count'] > GAP_TOLERANCE_CYCLES:
            del pair_history[key]


def _advance_pair_state(meta: dict, loc: dict, now: datetime) -> None:
    """Update an already tracked pair from this cycle's observation ``loc``."""
    meta['miss_count'] = 0
    meta['last_lat'] = loc['lat']
    meta['last_lon'] = loc['lon']
    if loc['is_close']:
        if meta['phase'] == 'APPROACH':
            # approach -> rendezvous
            meta['phase'] = 'RENDEZVOUS'
            meta['approach_detected'] = True
            meta['rendezvous_start'] = meta.get('rendezvous_start') or now
        elif meta['phase'] == 'DEPARTURE':
            # came back together after separating -> rendezvous resumes
            meta['phase'] = 'RENDEZVOUS'
        if meta.get('rendezvous_start') is None:
            meta['rendezvous_start'] = now
    elif loc['is_approaching']:
        if meta['phase'] == 'RENDEZVOUS':
            # drifting apart during a rendezvous -> departure
            meta['phase'] = 'DEPARTURE'
            meta['departure_detected'] = True
        elif meta['phase'] == 'APPROACH':
            meta['approach_detected'] = True


def detect_transshipment(
    df: pd.DataFrame,
    pair_history: dict,
    get_vessel_info: Optional[Callable[[str], dict]] = None,
    is_permitted: Optional[Callable[[str], bool]] = None,
    classify_zone_fn: Optional[Callable[[float, float], dict]] = None,
    now_kst_hour: int = 0,
) -> list[dict]:
    """Detect suspected transshipment pairs with the 5-stage filter pipeline.

    Args:
        df: vessel position DataFrame; columns ``mmsi``, ``lat``, ``lon`` and
            ``sog`` are required, ``cog`` is optional.
        pair_history: pair-state dict kept by the caller between calls; it is
            mutated in place.
        get_vessel_info: optional ``mmsi -> info dict`` lookup
            (``ship_kind_code``, ``ship_ty``/``vessel_type``, ``name``,
            ``heading``).  A None result is treated as an empty dict.
        is_permitted: optional ``mmsi -> bool`` permit lookup used in scoring.
        classify_zone_fn: accepted for interface compatibility; not used by
            this function.
        now_kst_hour: current hour in KST (0-23).

    Returns:
        Suspect dicts (score >= 50 only), at most one per carrier.  Returns an
        empty list when ``df`` is empty, lacks a required column, or holds no
        fishing/carrier candidates in a monitoring zone.
    """
    if df.empty:
        return []
    required_cols = {'mmsi', 'lat', 'lon', 'sog'}
    if required_cols - set(df.columns):
        return []
    now = datetime.now(timezone.utc)

    def _vessel_info(mmsi: str) -> dict:
        # Tolerate a missing callback and lookups that return None.
        return (get_vessel_info(mmsi) or {}) if get_vessel_info else {}

    def _quiet_cycle() -> list[dict]:
        # No pair can be observed this cycle.  Every tracked pair still has to
        # register a miss; otherwise nothing ever expires on these paths and a
        # stale rendezvous_start survives the quiet period.
        _age_inactive_pairs(pair_history, ())
        _evict_expired(pair_history, now)
        return []

    # ── Stage 1+2: candidate filter ──────────────────────
    # Only vessels slower than SOG_THRESHOLD_KN.
    candidates = df[df['sog'] < SOG_THRESHOLD_KN].copy()
    if len(candidates) < 2:
        return _quiet_cycle()

    records: list[dict] = []
    for _, row in candidates.iterrows():
        mmsi = str(row['mmsi'])
        lat, lon = float(row['lat']), float(row['lon'])
        # Stage 2: must be inside a monitoring zone.
        zone_id = _is_in_transship_zone(lat, lon)
        if zone_id is None:
            continue
        # Stage 1: vessel role.
        info = _vessel_info(mmsi)
        kind = info.get('ship_kind_code', '')
        ship_ty = info.get('ship_ty', info.get('vessel_type', ''))
        role = _classify_vessel_role(kind, ship_ty, mmsi)
        if role in ('EXCLUDED', 'UNKNOWN'):
            continue
        # Skip signals whose name looks like fishing gear.
        if _is_gear_name(info.get('name')):
            continue
        records.append({
            'mmsi': mmsi, 'lat': lat, 'lon': lon,
            'role': role, 'zone_id': zone_id,
            'cog': float(row.get('cog', 0)),
        })
    if len(records) < 2:
        return _quiet_cycle()

    fishing_recs = [r for r in records if r['role'] == 'FISHING']
    carrier_recs = [r for r in records if r['role'] == 'CARRIER']
    if not fishing_recs or not carrier_recs:
        return _quiet_cycle()

    # ── Grid-based proximity search (fishing x carrier only) ──
    carrier_grid = _build_grid(carrier_recs)
    active_pairs: dict[tuple[str, str], dict] = {}
    for f_rec in fishing_recs:
        lat_idx, lon_idx = _cell_key(f_rec['lat'], f_rec['lon'])
        # Latitude gaps below APPROACH_DEG never span more than one row, but
        # the cos-scaled longitude test can span more than one column.
        reach = _lon_cell_reach(f_rec['lat'])
        for dr in (-1, 0, 1):
            for dc in range(-reach, reach + 1):
                bucket = carrier_grid.get((lat_idx + dr, lon_idx + dc))
                if not bucket:
                    continue
                for ci in bucket:
                    c_rec = carrier_recs[ci]
                    is_close = _within_proximity(f_rec, c_rec)
                    is_approaching = (
                        _within_approach(f_rec, c_rec) if not is_close else False
                    )
                    if not is_close and not is_approaching:
                        continue
                    key = _pair_key(f_rec['mmsi'], c_rec['mmsi'])
                    fishing_first = key[0] == f_rec['mmsi']
                    active_pairs[key] = {
                        'is_close': is_close,
                        'is_approaching': is_approaching,
                        'lat': (f_rec['lat'] + c_rec['lat']) / 2,
                        'lon': (f_rec['lon'] + c_rec['lon']) / 2,
                        'zone_id': f_rec['zone_id'],
                        'role_a': 'FISHING' if fishing_first else 'CARRIER',
                        'role_b': 'CARRIER' if fishing_first else 'FISHING',
                    }

    # ── Stage 3: pair_history state machine ──────────────
    for key, loc in active_pairs.items():
        meta = pair_history.get(key)
        if meta is None or not isinstance(meta, dict):
            # Newly seen pair.
            pair_history[key] = {
                'pair': key,
                'phase': 'APPROACH' if loc['is_approaching'] else 'RENDEZVOUS',
                'approach_detected': loc['is_approaching'],
                'rendezvous_start': now if loc['is_close'] else None,
                'departure_detected': False,
                'miss_count': 0,
                'last_lat': loc['lat'],
                'last_lon': loc['lon'],
                'zone_id': loc['zone_id'],
                'role_a': loc['role_a'],
                'role_b': loc['role_b'],
            }
            continue
        _advance_pair_state(meta, loc, now)

    # Pairs not seen this cycle register a miss.
    _age_inactive_pairs(pair_history, active_pairs)
    _evict_expired(pair_history, now)

    # ── Stage 4+5: scoring + per-carrier de-duplication ──
    raw_suspects: list[dict] = []
    for key, meta in pair_history.items():
        if not isinstance(meta, dict):
            continue
        scored = _score_pair(
            meta, now, is_permitted, now_kst_hour,
            meta.get('zone_id'), _vessel_info(key[0]), _vessel_info(key[1]),
        )
        if scored is not None:
            raw_suspects.append(scored)

    # Stage 5: keep only the highest-scoring fishing vessel per carrier.
    carrier_best: dict[str, dict] = {}
    for s in raw_suspects:
        carrier = s['carrier_mmsi']
        if carrier not in carrier_best or s['score'] > carrier_best[carrier]['score']:
            carrier_best[carrier] = s
    suspects = list(carrier_best.values())

    tier_counts = {'CRITICAL': 0, 'HIGH': 0}
    for s in suspects:
        tier_counts[s['severity']] = tier_counts.get(s['severity'], 0) + 1
    logger.info(
        'transshipment: pairs=%d (critical=%d, high=%d), '
        'candidates: fishing=%d carrier=%d, active_pairs=%d, history=%d',
        len(suspects), tier_counts.get('CRITICAL', 0), tier_counts.get('HIGH', 0),
        len(fishing_recs), len(carrier_recs), len(active_pairs), len(pair_history),
    )
    return suspects
def _evict_expired(pair_history: dict, now: datetime) -> None:
"""PAIR_EXPIRY_MIN 이상 갱신 없는 쌍 제거."""
expired = []
for key, meta in pair_history.items():
if not isinstance(meta, dict):
expired.append(key)
continue
miss = meta.get('miss_count', 0)
if miss > GAP_TOLERANCE_CYCLES:
expired.append(key)
for key in expired:
del pair_history[key]