kcg-ai-monitoring/prediction/algorithms/dark_vessel.py
htlee e5d123e4c5 feat(prediction): dark 의심 점수화 + transship 베테랑 관점 재설계
12h 누적 분석 결과 dark/transship이 운영 불가 수준으로 판정되어
탐지 철학을 근본부터 전환.

## dark 재설계: 넓은 탐지 + 의도적 OFF 의심 점수화

기존 "필터 제외" 방식에서 "넓게 기록 + 점수 산출 + 등급별 알람"으로 전환.
해경 베테랑 관점의 8가지 패턴을 가점 합산하여 0~100점 산출.

- P1 이동 중 OFF (gap 직전 SOG > 2kn)
- P2 민감 수역 경계 근처 OFF (영해/접속수역/특정조업수역)
- P3 반복 이력 (7일 내 재발) — 가장 강력
- P4 gap 후 이동거리 비정상 (은폐 이동)
- P5 주간 조업 시간 OFF
- P6 gap 직전 이상 행동 (teleport/급변)
- P7 무허가 선박 가점
- P8 장기 gap (3h/6h 구간별)
- 감점: gap 시작 위치가 한국 AIS 수신 커버리지 밖

완전 제외:
- 어구 AIS (GEAR_PATTERN 매칭, fleet_tracker SSOT)
- 한국 선박 (MMSI 440*, 441*) — 해경 관할 아님

등급: CRITICAL(70+) / HIGH(50~69) / WATCH(30~49) / NONE
이벤트는 HIGH 이상만 생성 (WATCH는 DB 저장만).

신규 함수:
- algorithms/dark_vessel.py: analyze_dark_pattern, compute_dark_suspicion
- scheduler.py: _is_dark_excluded, _fetch_dark_history (사이클당 1회 7일 이력 일괄 조회)

pipeline path + lightweight path 모두 동일 로직 적용.
결과는 features JSONB에 {dark_suspicion_score, dark_patterns,
dark_tier, dark_history_7d, dark_history_24h, gap_start_*} 저장.

## transship 재설계: 베테랑 함정근무자 기준

한정된 함정 자원으로 단속 출동을 결정할 수 있는 신뢰도 확보.

상수 재조정:
- SOG_THRESHOLD_KN: 2.0 → 1.0 (완전 정박만)
- PROXIMITY_DEG: 0.001 → 0.0007 (~77m)
- SUSPECT_DURATION_MIN: 60 → 45 (gap tolerance 있음)
- PAIR_EXPIRY_MIN: 120 → 180
- GAP_TOLERANCE_CYCLES: 2 신규 (GPS 노이즈 완화)

필수 조건 (모두 충족):
- 한국 EEZ 관할 수역 이내
- 환적 불가 선종 제외 (passenger/military/tanker/pilot/tug/sar)
- 어구 AIS 양쪽 제외
- 45분 이상 지속 (miss_count 2 사이클까지 용인)

점수 체계 (base 40):
- 야간(KST 20~04): +15
- 무허가 가점: +20
- COG 편차 > 45°: +20 (나란히 가는 선단 배제)
- 지속 ≥ 90분: +20
- 영해/접속수역 위치: +15

등급: CRITICAL(90+) / HIGH(70~89) / WATCH(50~69)
WATCH는 저장 없이 로그만. HIGH/CRITICAL만 이벤트.

pair_history 구조 확장:
- 기존: {(a,b): datetime}
- 신규: {(a,b): {'first_seen', 'last_seen', 'miss_count', 'last_lat/lon/cog_a/cog_b'}}
- miss_count > GAP_TOLERANCE_CYCLES면 삭제 (즉시 리셋 아님)

## event_generator 룰 교체

- dark_vessel_long 룰 제거 → dark_critical, dark_high (features.dark_tier 기반)
- transship 룰 제거 → transship_critical, transship_high (features.transship_tier 기반)
- DEDUP: ILLEGAL_TRANSSHIP 67→181, DARK_VESSEL 127→131, ZONE_DEPARTURE 127→89

## 공통 정리

- scheduler.py의 _gear_re 삭제, fleet_tracker.GEAR_PATTERN 단일 SSOT로 통합
2026-04-09 07:42:15 +09:00

334 lines
11 KiB
Python

from typing import Callable, Optional
import pandas as pd
from algorithms.location import haversine_nm
GAP_SUSPICIOUS_SEC = 1800 # 30분
GAP_HIGH_SUSPICIOUS_SEC = 3600 # 1시간
GAP_VIOLATION_SEC = 86400 # 24시간
# 한국 AIS 수신 가능 추정 영역 (한반도 + EEZ + 접속수역 여유)
_KR_COVERAGE_LAT = (32.0, 39.5)
_KR_COVERAGE_LON = (124.0, 132.0)
def detect_ais_gaps(df_vessel: pd.DataFrame) -> list[dict]:
"""AIS 수신 기록에서 소실 구간 추출."""
if len(df_vessel) < 2:
return []
gaps = []
records = df_vessel.sort_values('timestamp').to_dict('records')
for i in range(1, len(records)):
prev, curr = records[i - 1], records[i]
prev_ts = pd.Timestamp(prev['timestamp'])
curr_ts = pd.Timestamp(curr['timestamp'])
gap_sec = (curr_ts - prev_ts).total_seconds()
if gap_sec < GAP_SUSPICIOUS_SEC:
continue
disp = haversine_nm(
prev['lat'], prev['lon'],
curr['lat'], curr['lon'],
)
if gap_sec >= GAP_VIOLATION_SEC:
severity = 'VIOLATION'
elif gap_sec >= GAP_HIGH_SUSPICIOUS_SEC:
severity = 'HIGH_SUSPICIOUS'
else:
severity = 'SUSPICIOUS'
gaps.append({
'gap_sec': int(gap_sec),
'gap_min': round(gap_sec / 60, 1),
'displacement_nm': round(disp, 2),
'severity': severity,
})
return gaps
def is_dark_vessel(df_vessel: pd.DataFrame) -> tuple[bool, int]:
"""다크베셀 여부 판정.
Returns: (is_dark, max_gap_duration_min)
"""
gaps = detect_ais_gaps(df_vessel)
if not gaps:
return False, 0
max_gap_min = max(g['gap_min'] for g in gaps)
is_dark = max_gap_min >= 30 # 30분 이상 소실
return is_dark, int(max_gap_min)
def _classify_state(sog: float) -> str:
"""SOG 기준 간단 활동 상태 분류."""
if sog is None:
return 'UNKNOWN'
if sog <= 1.0:
return 'STATIONARY'
if sog <= 5.0:
return 'FISHING'
return 'SAILING'
def analyze_dark_pattern(df_vessel: pd.DataFrame) -> dict:
"""dark 판정 + gap 상세 정보 반환.
가장 긴 gap 한 건을 기준으로 패턴 분석에 필요한 정보를 모두 수집한다.
is_dark가 False이면 나머지 필드는 기본값으로 채움.
Returns:
{
'is_dark': bool,
'gap_min': int,
'gap_start_lat': Optional[float],
'gap_start_lon': Optional[float],
'gap_start_sog': float,
'gap_start_state': str,
'gap_end_lat': Optional[float],
'gap_end_lon': Optional[float],
'gap_distance_nm': float,
'gap_resumed': bool,
'pre_gap_turn_or_teleport': bool,
'avg_sog_before': float,
}
"""
default = {
'is_dark': False,
'gap_min': 0,
'gap_start_lat': None,
'gap_start_lon': None,
'gap_start_sog': 0.0,
'gap_start_state': 'UNKNOWN',
'gap_end_lat': None,
'gap_end_lon': None,
'gap_distance_nm': 0.0,
'gap_resumed': False,
'pre_gap_turn_or_teleport': False,
'avg_sog_before': 0.0,
}
if df_vessel is None or len(df_vessel) < 2:
return default
df_sorted = df_vessel.sort_values('timestamp').reset_index(drop=True)
records = df_sorted.to_dict('records')
# 가장 긴 gap 찾기
max_gap_sec = 0.0
max_gap_idx = -1 # records에서 gap 직후 인덱스 (curr)
for i in range(1, len(records)):
prev_ts = pd.Timestamp(records[i - 1]['timestamp'])
curr_ts = pd.Timestamp(records[i]['timestamp'])
gap_sec = (curr_ts - prev_ts).total_seconds()
if gap_sec > max_gap_sec:
max_gap_sec = gap_sec
max_gap_idx = i
if max_gap_idx < 1 or max_gap_sec < GAP_SUSPICIOUS_SEC:
return default
prev_row = records[max_gap_idx - 1] # gap 직전 마지막 포인트
curr_row = records[max_gap_idx] # gap 직후 첫 포인트
gap_start_lat = float(prev_row.get('lat')) if prev_row.get('lat') is not None else None
gap_start_lon = float(prev_row.get('lon')) if prev_row.get('lon') is not None else None
gap_end_lat = float(curr_row.get('lat')) if curr_row.get('lat') is not None else None
gap_end_lon = float(curr_row.get('lon')) if curr_row.get('lon') is not None else None
# gap 직전 SOG 추정: prev 행의 raw_sog 또는 computed sog 사용
gap_start_sog = float(prev_row.get('sog') or prev_row.get('raw_sog') or 0.0)
# gap 중 이동 거리
if all(v is not None for v in (gap_start_lat, gap_start_lon, gap_end_lat, gap_end_lon)):
gap_distance_nm = haversine_nm(
gap_start_lat, gap_start_lon, gap_end_lat, gap_end_lon,
)
else:
gap_distance_nm = 0.0
# 현재 시점 기준 gap이 "재개되었는지" 판단:
# curr_row가 df_sorted의 마지막 포인트가 아니면 신호가 이미 재개된 상태
# 마지막 포인트면 아직 gap 진행 중(curr_row는 gap 시작 직후 아니라 gap 전의 마지막일 수도 있음)
is_last = (max_gap_idx == len(records) - 1)
# gap이 마지막이면 신호 복귀 미확인
gap_resumed = not is_last or (
is_last and max_gap_idx < len(records) - 1 # 항상 False지만 안전용
)
# 단, max_gap_idx가 마지막이면 gap 후 포인트 없음 → 재개 미확인
if max_gap_idx == len(records) - 1:
gap_resumed = False
else:
gap_resumed = True
# gap 직전 5개 포인트로 평균 SOG + 이상 행동(teleport) 판정
start_idx = max(0, max_gap_idx - 5)
window = records[start_idx:max_gap_idx]
if window:
sogs = [float(r.get('sog') or r.get('raw_sog') or 0.0) for r in window]
avg_sog_before = sum(sogs) / len(sogs) if sogs else 0.0
else:
avg_sog_before = gap_start_sog
# gap 직전 window에 teleportation 발생 여부
pre_gap_turn_or_teleport = False
if len(window) >= 2:
try:
window_df = df_sorted.iloc[start_idx:max_gap_idx].copy()
# spoofing.detect_teleportation 재사용 (순환 import 방지 위해 지연 import)
from algorithms.spoofing import detect_teleportation
teleports = detect_teleportation(window_df)
if teleports:
pre_gap_turn_or_teleport = True
except Exception:
pre_gap_turn_or_teleport = False
return {
'is_dark': True,
'gap_min': int(max_gap_sec / 60),
'gap_start_lat': gap_start_lat,
'gap_start_lon': gap_start_lon,
'gap_start_sog': gap_start_sog,
'gap_start_state': _classify_state(gap_start_sog),
'gap_end_lat': gap_end_lat,
'gap_end_lon': gap_end_lon,
'gap_distance_nm': round(gap_distance_nm, 2),
'gap_resumed': gap_resumed,
'pre_gap_turn_or_teleport': pre_gap_turn_or_teleport,
'avg_sog_before': round(avg_sog_before, 2),
}
def _is_in_kr_coverage(lat: Optional[float], lon: Optional[float]) -> bool:
if lat is None or lon is None:
return False
return (_KR_COVERAGE_LAT[0] <= lat <= _KR_COVERAGE_LAT[1]
and _KR_COVERAGE_LON[0] <= lon <= _KR_COVERAGE_LON[1])
def compute_dark_suspicion(
gap_info: dict,
mmsi: str,
is_permitted: bool,
history: dict,
now_kst_hour: int,
classify_zone_fn: Optional[Callable[[float, float], dict]] = None,
) -> tuple[int, list[str], str]:
"""의도적 AIS OFF 의심 점수 산출.
Args:
gap_info: analyze_dark_pattern 결과
mmsi: 선박 MMSI
is_permitted: 허가 어선 여부
history: {'count_7d': int, 'count_24h': int}
now_kst_hour: 현재 KST 시각 (0~23)
classify_zone_fn: (lat, lon) -> dict. gap_start 위치의 zone 판단
Returns:
(score, patterns, tier)
tier: 'CRITICAL' / 'HIGH' / 'WATCH' / 'NONE'
"""
if not gap_info.get('is_dark'):
return 0, [], 'NONE'
score = 0
patterns: list[str] = []
gap_start_sog = gap_info.get('gap_start_sog') or 0.0
gap_start_state = gap_info.get('gap_start_state', 'UNKNOWN')
gap_start_lat = gap_info.get('gap_start_lat')
gap_start_lon = gap_info.get('gap_start_lon')
gap_min = gap_info.get('gap_min') or 0
# P1: 이동 중 OFF
if gap_start_sog > 5.0:
score += 25
patterns.append('moving_at_off')
elif gap_start_sog > 2.0:
score += 15
patterns.append('slow_moving_at_off')
# P2: gap 시작 위치의 민감 수역
if classify_zone_fn is not None and gap_start_lat is not None and gap_start_lon is not None:
try:
zone_info = classify_zone_fn(gap_start_lat, gap_start_lon)
zone = zone_info.get('zone', '')
if zone in ('TERRITORIAL_SEA', 'CONTIGUOUS_ZONE'):
score += 25
patterns.append('sensitive_zone')
elif zone.startswith('ZONE_'):
score += 15
patterns.append('special_zone')
except Exception:
pass
# P3: 반복 이력 (과거 7일)
h7 = int(history.get('count_7d', 0) or 0)
h24 = int(history.get('count_24h', 0) or 0)
if h7 >= 3:
score += 30
patterns.append('repeat_high')
elif h7 >= 2:
score += 15
patterns.append('repeat_low')
if h24 >= 1:
score += 10
patterns.append('recent_dark')
# P4: gap 후 이동 거리 비정상
gap_distance_nm = gap_info.get('gap_distance_nm') or 0.0
avg_sog_before = gap_info.get('avg_sog_before') or 0.0
if gap_info.get('gap_resumed') and gap_min > 0:
gap_hours = gap_min / 60.0
# 예상 이동 = avg_sog * gap_hours. 2배 초과면 비정상
expected = max(gap_hours * max(avg_sog_before, 1.0), 0.5)
if gap_distance_nm > expected * 2.0:
score += 20
patterns.append('distance_anomaly')
# P5: 주간 조업 시간 OFF
if 6 <= now_kst_hour < 18 and gap_start_state == 'FISHING':
score += 15
patterns.append('daytime_fishing_off')
# P6: gap 직전 이상 행동
if gap_info.get('pre_gap_turn_or_teleport'):
score += 15
patterns.append('teleport_before_gap')
# P7: 무허가
if not is_permitted:
score += 10
patterns.append('unpermitted')
# P8: gap 길이
if gap_min >= 360:
score += 15
patterns.append('very_long_gap')
elif gap_min >= 180:
score += 10
patterns.append('long_gap')
# 감점: gap 시작 위치가 한국 수신 커버리지 밖 → 자연 gap 가능성
if not _is_in_kr_coverage(gap_start_lat, gap_start_lon):
score -= 30
patterns.append('out_of_coverage')
score = max(0, min(100, score))
if score >= 70:
tier = 'CRITICAL'
elif score >= 50:
tier = 'HIGH'
elif score >= 30:
tier = 'WATCH'
else:
tier = 'NONE'
return score, patterns, tier