kcg-ai-monitoring/prediction/algorithms/transshipment.py
htlee e5d123e4c5 feat(prediction): dark 의심 점수화 + transship 베테랑 관점 재설계
12h 누적 분석 결과 dark/transship이 운영 불가 수준으로 판정되어
탐지 철학을 근본부터 전환.

## dark 재설계: 넓은 탐지 + 의도적 OFF 의심 점수화

기존 "필터 제외" 방식에서 "넓게 기록 + 점수 산출 + 등급별 알람"으로 전환.
해경 베테랑 관점의 8가지 패턴을 가점 합산하여 0~100점 산출.

- P1 이동 중 OFF (gap 직전 SOG > 2kn)
- P2 민감 수역 경계 근처 OFF (영해/접속수역/특정조업수역)
- P3 반복 이력 (7일 내 재발) — 가장 강력
- P4 gap 후 이동거리 비정상 (은폐 이동)
- P5 주간 조업 시간 OFF
- P6 gap 직전 이상 행동 (teleport/급변)
- P7 무허가 선박 가점
- P8 장기 gap (3h/6h 구간별)
- 감점: gap 시작 위치가 한국 AIS 수신 커버리지 밖

완전 제외:
- 어구 AIS (GEAR_PATTERN 매칭, fleet_tracker SSOT)
- 한국 선박 (MMSI 440*, 441*) — 해경 관할 아님

등급: CRITICAL(70+) / HIGH(50~69) / WATCH(30~49) / NONE
이벤트는 HIGH 이상만 생성 (WATCH는 DB 저장만).

신규 함수:
- algorithms/dark_vessel.py: analyze_dark_pattern, compute_dark_suspicion
- scheduler.py: _is_dark_excluded, _fetch_dark_history (사이클당 1회 7일 이력 일괄 조회)

pipeline path + lightweight path 모두 동일 로직 적용.
결과는 features JSONB에 {dark_suspicion_score, dark_patterns,
dark_tier, dark_history_7d, dark_history_24h, gap_start_*} 저장.

## transship 재설계: 베테랑 함정근무자 기준

한정된 함정 자원으로 단속 출동을 결정할 수 있는 신뢰도 확보.

상수 재조정:
- SOG_THRESHOLD_KN: 2.0 → 1.0 (완전 정박만)
- PROXIMITY_DEG: 0.001 → 0.0007 (~77m)
- SUSPECT_DURATION_MIN: 60 → 45 (gap tolerance 있음)
- PAIR_EXPIRY_MIN: 120 → 180
- GAP_TOLERANCE_CYCLES: 2 신규 (GPS 노이즈 완화)

필수 조건 (모두 충족):
- 한국 EEZ 관할 수역 이내
- 환적 불가 선종 제외 (passenger/military/tanker/pilot/tug/sar)
- 어구 AIS 양쪽 제외
- 45분 이상 지속 (miss_count 2 사이클까지 용인)

점수 체계 (base 40):
- 야간(KST 20~04): +15
- 무허가 가점: +20
- COG 편차 > 45°: +20 (나란히 가는 선단 배제)
- 지속 ≥ 90분: +20
- 영해/접속수역 위치: +15

등급: CRITICAL(90+) / HIGH(70~89) / WATCH(50~69)
WATCH는 저장 없이 로그만. HIGH/CRITICAL만 이벤트.

pair_history 구조 확장:
- 기존: {(a,b): datetime}
- 신규: {(a,b): {'first_seen', 'last_seen', 'miss_count', 'last_lat/lon/cog_a/cog_b'}}
- miss_count > GAP_TOLERANCE_CYCLES면 삭제 (즉시 리셋 아님)

## event_generator 룰 교체

- dark_vessel_long 룰 제거 → dark_critical, dark_high (features.dark_tier 기반)
- transship 룰 제거 → transship_critical, transship_high (features.transship_tier 기반)
- DEDUP: ILLEGAL_TRANSSHIP 67→181, DARK_VESSEL 127→131, ZONE_DEPARTURE 127→89

## 공통 정리

- scheduler.py의 _gear_re 삭제, fleet_tracker.GEAR_PATTERN 단일 SSOT로 통합
2026-04-09 07:42:15 +09:00

465 lines
16 KiB
Python

"""환적(Transshipment) 의심 선박 탐지 — 서버사이드 O(n log n) 구현.
프론트엔드 useKoreaFilters.ts의 O(n²) 근접 탐지를 대체한다.
scipy 미설치 환경을 고려하여 그리드 기반 공간 인덱스를 사용한다.
알고리즘 개요:
1. 후보 선박 필터: sog < 2kn, 선종 (tanker/cargo/fishing), 외국 해안선 제외
2. 그리드 셀 기반 근접 쌍 탐지: O(n log n) ← 셀 분할 + 인접 9셀 조회
3. pair_history dict로 쌍별 최초 탐지 시각 영속화 (호출 간 유지)
4. 60분 이상 지속 근접 시 의심 쌍으로 판정
"""
from __future__ import annotations
import logging
import math
from datetime import datetime, timezone
from typing import Callable, Optional
import pandas as pd
from fleet_tracker import GEAR_PATTERN
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────
# 상수 (2026-04-09 재조정 — 베테랑 관점)
# ──────────────────────────────────────────────────────────────
SOG_THRESHOLD_KN = 1.0 # 2.0 → 1.0 (완전 정박 수준)
PROXIMITY_DEG = 0.0007 # 0.001 → 0.0007 (~77m, GPS 노이즈 포함한 근접)
SUSPECT_DURATION_MIN = 45 # 60 → 45 (gap tolerance 있음)
PAIR_EXPIRY_MIN = 180 # 120 → 180
GAP_TOLERANCE_CYCLES = 2 # 신규: 2 사이클까지 active에서 빠져도 리셋 안 함
# 외국 해안 근접 제외 경계 (레거시 — 관할 필터로 대체됨)
_CN_LON_MAX = 123.5
_JP_LON_MIN = 130.5
_TSUSHIMA_LAT_MIN = 33.8
_TSUSHIMA_LON_MIN = 129.0
# 한국 EEZ 관할 수역 (단속 가능 범위)
_KR_EEZ_LAT = (32.0, 39.5)
_KR_EEZ_LON = (124.0, 132.0)
# 환적 불가능 선종 (여객/군함/유조/도선/예인/수색구조)
_TRANSSHIP_EXCLUDED: frozenset[str] = frozenset({
'passenger', 'military', 'tanker', 'pilot', 'tug', 'sar',
})
# 그리드 셀 크기
_GRID_CELL_DEG = PROXIMITY_DEG
# ──────────────────────────────────────────────────────────────
# 내부 헬퍼
# ──────────────────────────────────────────────────────────────
def _is_near_foreign_coast(lat: float, lon: float) -> bool:
"""외국 해안 근처 여부 — 중국/일본/대마도 경계 확인."""
if lon < _CN_LON_MAX:
return True
if lon > _JP_LON_MIN:
return True
if lat > _TSUSHIMA_LAT_MIN and lon > _TSUSHIMA_LON_MIN:
return True
return False
def _is_in_kr_jurisdiction(lat: float, lon: float) -> bool:
"""한국 EEZ 관할 수역 여부 (단속 가능 범위)."""
return (_KR_EEZ_LAT[0] <= lat <= _KR_EEZ_LAT[1]
and _KR_EEZ_LON[0] <= lon <= _KR_EEZ_LON[1])
def _is_candidate_ship_type(vessel_type_a: Optional[str], vessel_type_b: Optional[str]) -> bool:
"""환적 후보 선종인지 (명시적 제외만 차단, 미상은 허용)."""
a = (vessel_type_a or '').strip().lower()
b = (vessel_type_b or '').strip().lower()
if a in _TRANSSHIP_EXCLUDED or b in _TRANSSHIP_EXCLUDED:
return False
return True
def _is_gear_name(name: Optional[str]) -> bool:
"""어구 이름 패턴 매칭 — fleet_tracker.GEAR_PATTERN SSOT."""
if not name:
return False
return bool(GEAR_PATTERN.match(name))
def _cell_key(lat: float, lon: float) -> tuple[int, int]:
"""위도/경도를 그리드 셀 인덱스로 변환."""
return (int(math.floor(lat / _GRID_CELL_DEG)),
int(math.floor(lon / _GRID_CELL_DEG)))
def _build_grid(records: list[dict]) -> dict[tuple[int, int], list[int]]:
"""선박 리스트를 그리드 셀로 분류.
Returns: {(row, col): [record index, ...]}
"""
grid: dict[tuple[int, int], list[int]] = {}
for idx, rec in enumerate(records):
key = _cell_key(rec['lat'], rec['lon'])
if key not in grid:
grid[key] = []
grid[key].append(idx)
return grid
def _within_proximity(a: dict, b: dict) -> bool:
"""두 선박이 PROXIMITY_DEG 이내인지 확인 (위경도 직교 근사)."""
dlat = abs(a['lat'] - b['lat'])
if dlat >= PROXIMITY_DEG:
return False
cos_lat = math.cos(math.radians((a['lat'] + b['lat']) / 2.0))
dlon_scaled = abs(a['lon'] - b['lon']) * cos_lat
return dlon_scaled < PROXIMITY_DEG
def _normalize_type(raw: Optional[str]) -> str:
"""선종 문자열 소문자 정규화."""
if not raw:
return ''
return raw.strip().lower()
def _pair_key(mmsi_a: str, mmsi_b: str) -> tuple[str, str]:
"""MMSI 순서를 정규화하여 중복 쌍 방지."""
return (mmsi_a, mmsi_b) if mmsi_a < mmsi_b else (mmsi_b, mmsi_a)
def _evict_expired_pairs(
pair_history: dict,
now: datetime,
) -> None:
"""PAIR_EXPIRY_MIN 이상 갱신 없는 pair_history 항목 제거.
새 구조: {(a,b): {'first_seen': dt, 'last_seen': dt, 'miss_count': int}}
"""
expired = []
for key, meta in pair_history.items():
if not isinstance(meta, dict):
# 레거시 구조 (datetime 직접 저장)는 즉시 제거 → 다음 사이클에서 재구성
expired.append(key)
continue
last_seen = meta.get('last_seen') or meta.get('first_seen')
if last_seen is None:
expired.append(key)
continue
if (now - last_seen).total_seconds() / 60 > PAIR_EXPIRY_MIN:
expired.append(key)
for key in expired:
del pair_history[key]
# ──────────────────────────────────────────────────────────────
# 공개 API
# ──────────────────────────────────────────────────────────────
def _score_pair(
pair: tuple[str, str],
meta: dict,
lat: float,
lon: float,
cog_a: Optional[float],
cog_b: Optional[float],
vessel_info_a: dict,
vessel_info_b: dict,
is_permitted_fn: Optional[Callable[[str], bool]],
now_kst_hour: int,
zone_code: Optional[str],
now: datetime,
) -> Optional[dict]:
"""환적 의심 pair에 대해 점수 산출 + severity 반환.
필수 조건 실패 시 None. WATCH 이상이면 dict 반환.
"""
# 필수 1: 한국 관할 수역
if not _is_in_kr_jurisdiction(lat, lon):
return None
# 필수 2: 선종 필터
if not _is_candidate_ship_type(
vessel_info_a.get('vessel_type'),
vessel_info_b.get('vessel_type'),
):
return None
# 필수 3: 어구 제외
if _is_gear_name(vessel_info_a.get('name')) or _is_gear_name(vessel_info_b.get('name')):
return None
# 필수 4: 지속 시간
first_seen = meta.get('first_seen')
if first_seen is None:
return None
duration_min = int((now - first_seen).total_seconds() / 60)
if duration_min < SUSPECT_DURATION_MIN:
return None
score = 40 # base
# 야간 가점 (KST 20:00~04:00)
if now_kst_hour >= 20 or now_kst_hour < 4:
score += 15
# 무허가 가점
if is_permitted_fn is not None:
try:
if not is_permitted_fn(pair[0]) or not is_permitted_fn(pair[1]):
score += 20
except Exception:
pass
# COG 편차 (같은 방향 아니면 가점 — 나란히 가는 선단 배제)
if cog_a is not None and cog_b is not None:
try:
diff = abs(float(cog_a) - float(cog_b))
if diff > 180:
diff = 360 - diff
if diff > 45:
score += 20
except Exception:
pass
# 지속 길이 추가 가점
if duration_min >= 90:
score += 20
# 영해/접속수역 추가 가점
if zone_code in ('TERRITORIAL_SEA', 'CONTIGUOUS_ZONE'):
score += 15
if score >= 90:
severity = 'CRITICAL'
elif score >= 70:
severity = 'HIGH'
elif score >= 50:
severity = 'WATCH'
else:
return None
return {
'pair_a': pair[0],
'pair_b': pair[1],
'duration_min': duration_min,
'severity': severity,
'score': score,
'lat': lat,
'lon': lon,
}
def detect_transshipment(
df: pd.DataFrame,
pair_history: dict,
get_vessel_info: Optional[Callable[[str], dict]] = None,
is_permitted: Optional[Callable[[str], bool]] = None,
classify_zone_fn: Optional[Callable[[float, float], dict]] = None,
now_kst_hour: int = 0,
) -> list[dict]:
"""환적 의심 쌍 탐지 (점수 기반, 베테랑 관점 필터).
Args:
df: 선박 위치 DataFrame.
필수 컬럼: mmsi, lat, lon, sog
선택 컬럼: cog
pair_history: {(a,b): {'first_seen', 'last_seen', 'miss_count'}}
get_vessel_info: callable(mmsi) -> {'name', 'vessel_type', ...}
is_permitted: callable(mmsi) -> bool
classify_zone_fn: callable(lat, lon) -> dict (zone 판정)
now_kst_hour: 현재 KST 시각 (0~23)
Returns:
list[dict] — severity 'CRITICAL'/'HIGH'/'WATCH' 포함 의심 쌍
"""
if df.empty:
return []
required_cols = {'mmsi', 'lat', 'lon', 'sog'}
missing = required_cols - set(df.columns)
if missing:
logger.error('detect_transshipment: missing required columns: %s', missing)
return []
now = datetime.now(timezone.utc)
# ── 1. 후보 선박 필터 (SOG < 1.0) ─────────────────────────
candidate_mask = df['sog'] < SOG_THRESHOLD_KN
candidates = df[candidate_mask].copy()
if candidates.empty:
_evict_expired_pairs(pair_history, now)
return []
# 외국 해안 근처 제외 (1차 필터)
coast_mask = candidates.apply(
lambda row: not _is_near_foreign_coast(row['lat'], row['lon']),
axis=1,
)
candidates = candidates[coast_mask]
if len(candidates) < 2:
_evict_expired_pairs(pair_history, now)
return []
has_cog = 'cog' in candidates.columns
cols = ['mmsi', 'lat', 'lon']
if has_cog:
cols.append('cog')
records = candidates[cols].to_dict('records')
for rec in records:
rec['mmsi'] = str(rec['mmsi'])
# ── 2. 그리드 기반 근접 쌍 탐지 (77m) ───────────────────
grid = _build_grid(records)
active_pairs: dict[tuple[str, str], dict] = {}
def _try_add_pair(a_rec, b_rec):
if not _within_proximity(a_rec, b_rec):
return
key = _pair_key(a_rec['mmsi'], b_rec['mmsi'])
# 중점 좌표 (점수 산출용)
mid_lat = (a_rec['lat'] + b_rec['lat']) / 2.0
mid_lon = (a_rec['lon'] + b_rec['lon']) / 2.0
active_pairs[key] = {
'lat': mid_lat, 'lon': mid_lon,
'cog_a': a_rec.get('cog'), 'cog_b': b_rec.get('cog'),
# mmsi_a < mmsi_b 순서로 정렬되었으므로 cog도 맞춰 정렬 필요
'mmsi_a': a_rec['mmsi'], 'mmsi_b': b_rec['mmsi'],
}
for (row, col), indices in grid.items():
for i in range(len(indices)):
for j in range(i + 1, len(indices)):
_try_add_pair(records[indices[i]], records[indices[j]])
for dr, dc in ((0, 1), (1, -1), (1, 0), (1, 1)):
neighbor_key = (row + dr, col + dc)
if neighbor_key not in grid:
continue
for ai in indices:
for bi in grid[neighbor_key]:
_try_add_pair(records[ai], records[bi])
# ── 3. pair_history 갱신 (gap tolerance) ─────────────────
active_keys = set(active_pairs.keys())
# 활성 쌍 → 등록/갱신
for pair in active_keys:
if pair not in pair_history or not isinstance(pair_history[pair], dict):
pair_history[pair] = {
'first_seen': now,
'last_seen': now,
'miss_count': 0,
}
else:
pair_history[pair]['last_seen'] = now
pair_history[pair]['miss_count'] = 0
# 비활성 쌍 → miss_count++ , GAP_TOLERANCE 초과 시 삭제
for key in list(pair_history.keys()):
if key in active_keys:
continue
meta = pair_history[key]
if not isinstance(meta, dict):
del pair_history[key]
continue
meta['miss_count'] = meta.get('miss_count', 0) + 1
if meta['miss_count'] > GAP_TOLERANCE_CYCLES:
del pair_history[key]
# 만료 정리
_evict_expired_pairs(pair_history, now)
# ── 4. 점수 기반 의심 쌍 판정 ─────────────────────────────
suspects: list[dict] = []
rejected_jurisdiction = 0
rejected_ship_type = 0
rejected_gear = 0
rejected_duration = 0
for pair, meta in pair_history.items():
if not isinstance(meta, dict):
continue
first_seen = meta.get('first_seen')
if first_seen is None:
continue
# active_pairs에 있으면 해당 사이클 좌표·cog 사용, 없으면 이전 값 재사용 (miss 중)
loc_meta = active_pairs.get(pair)
if loc_meta is not None:
lat = loc_meta['lat']
lon = loc_meta['lon']
# mmsi_a, mmsi_b 순서를 pair 순서에 맞춤
if loc_meta['mmsi_a'] == pair[0]:
cog_a, cog_b = loc_meta.get('cog_a'), loc_meta.get('cog_b')
else:
cog_a, cog_b = loc_meta.get('cog_b'), loc_meta.get('cog_a')
meta['last_lat'] = lat
meta['last_lon'] = lon
meta['last_cog_a'] = cog_a
meta['last_cog_b'] = cog_b
else:
lat = meta.get('last_lat')
lon = meta.get('last_lon')
cog_a = meta.get('last_cog_a')
cog_b = meta.get('last_cog_b')
if lat is None or lon is None:
continue
# 선박 정보 조회
info_a = get_vessel_info(pair[0]) if get_vessel_info else {}
info_b = get_vessel_info(pair[1]) if get_vessel_info else {}
# 짧게 pre-check (로깅용)
if not _is_in_kr_jurisdiction(lat, lon):
rejected_jurisdiction += 1
continue
if not _is_candidate_ship_type(info_a.get('vessel_type'), info_b.get('vessel_type')):
rejected_ship_type += 1
continue
if _is_gear_name(info_a.get('name')) or _is_gear_name(info_b.get('name')):
rejected_gear += 1
continue
duration_min = int((now - first_seen).total_seconds() / 60)
if duration_min < SUSPECT_DURATION_MIN:
rejected_duration += 1
continue
zone_code = None
if classify_zone_fn is not None:
try:
zone_code = classify_zone_fn(lat, lon).get('zone')
except Exception:
pass
scored = _score_pair(
pair, meta, lat, lon, cog_a, cog_b,
info_a, info_b, is_permitted,
now_kst_hour, zone_code, now,
)
if scored is not None:
suspects.append(scored)
tier_counts = {'CRITICAL': 0, 'HIGH': 0, 'WATCH': 0}
for s in suspects:
tier_counts[s['severity']] = tier_counts.get(s['severity'], 0) + 1
logger.info(
'transshipment detection: pairs=%d (critical=%d, high=%d, watch=%d, '
'rejected_jurisdiction=%d, rejected_ship_type=%d, rejected_gear=%d, '
'rejected_duration=%d, candidates=%d)',
len(suspects),
tier_counts.get('CRITICAL', 0),
tier_counts.get('HIGH', 0),
tier_counts.get('WATCH', 0),
rejected_jurisdiction,
rejected_ship_type,
rejected_gear,
rejected_duration,
len(candidates),
)
return suspects