- algorithms/dark_vessel.py: compute_dark_suspicion 에 params 인자 추가 · DARK_SUSPICION_DEFAULT_PARAMS 상수(19 가중치 + SOG/반복/gap/tier 임계) · _merge_default_params 깊이 병합 — params=None 시 BACK-COMPAT 완전 보장 · override 가 DEFAULT 를 변조하지 않는 불변성 - models_core/registered/dark_suspicion_model.py: BaseDetectionModel Adapter · AnalysisResult 리스트에서 gap_info 재평가 · evaluated/critical/high/watch_count 메트릭 기록 - models_core/seeds/v1_dark_suspicion.sql: DRAFT seed (운영 영향 0) · BEGIN/COMMIT 미포함 — 호출자 트랜잭션 제어 (psql -1 또는 wrap) · JSONB params 는 Python DEFAULT 와 1:1 일치 - models_core/seeds/README.md: 실행·승격 절차, 금지 패턴, 롤백 - tests/test_dark_suspicion_params.py: 5건 동치성 검증 · DEFAULT 형태, None↔DEFAULT 동치성, override 불변성 · seed SQL JSONB ↔ Python DEFAULT 정적 일치 검증 - 전체 20/20 테스트 통과 (Phase 1-2 기반 15 + Phase 2-1 동치성 5) - seed SQL 운영 DB dry-run(BEGIN/ROLLBACK) 성공 — INSERT 2건 정상, tier_thresholds 일치 확인 후속: gear_violation_g01_g06 / transshipment_5stage / risk_composite / pair_trawl_tier 는 별도 PR. 각 모델 같은 패턴(params 인자 + DEFAULT 상수 + Adapter + seed). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
433 lines
15 KiB
Python
433 lines
15 KiB
Python
from typing import Callable, Optional
|
|
|
|
import pandas as pd
|
|
from algorithms.location import haversine_nm
|
|
|
|
GAP_SUSPICIOUS_SEC = 6000 # 100분 (30분 → 100분 상향: 자연 gap 과탐 감소)
|
|
GAP_HIGH_SUSPICIOUS_SEC = 10800 # 3시간
|
|
GAP_VIOLATION_SEC = 86400 # 24시간
|
|
|
|
# 한국 AIS 수신 가능 추정 영역 (한반도 + EEZ + 접속수역 여유)
|
|
_KR_COVERAGE_LAT = (32.0, 39.5)
|
|
_KR_COVERAGE_LON = (124.0, 132.0)
|
|
|
|
|
|
def detect_ais_gaps(df_vessel: pd.DataFrame) -> list[dict]:
|
|
"""AIS 수신 기록에서 소실 구간 추출."""
|
|
if len(df_vessel) < 2:
|
|
return []
|
|
|
|
gaps = []
|
|
records = df_vessel.sort_values('timestamp').to_dict('records')
|
|
|
|
for i in range(1, len(records)):
|
|
prev, curr = records[i - 1], records[i]
|
|
prev_ts = pd.Timestamp(prev['timestamp'])
|
|
curr_ts = pd.Timestamp(curr['timestamp'])
|
|
gap_sec = (curr_ts - prev_ts).total_seconds()
|
|
|
|
if gap_sec < GAP_SUSPICIOUS_SEC:
|
|
continue
|
|
|
|
disp = haversine_nm(
|
|
prev['lat'], prev['lon'],
|
|
curr['lat'], curr['lon'],
|
|
)
|
|
|
|
if gap_sec >= GAP_VIOLATION_SEC:
|
|
severity = 'VIOLATION'
|
|
elif gap_sec >= GAP_HIGH_SUSPICIOUS_SEC:
|
|
severity = 'HIGH_SUSPICIOUS'
|
|
else:
|
|
severity = 'SUSPICIOUS'
|
|
|
|
gaps.append({
|
|
'gap_sec': int(gap_sec),
|
|
'gap_min': round(gap_sec / 60, 1),
|
|
'displacement_nm': round(disp, 2),
|
|
'severity': severity,
|
|
})
|
|
|
|
return gaps
|
|
|
|
|
|
def is_dark_vessel(df_vessel: pd.DataFrame) -> tuple[bool, int]:
|
|
"""다크베셀 여부 판정.
|
|
|
|
Returns: (is_dark, max_gap_duration_min)
|
|
"""
|
|
gaps = detect_ais_gaps(df_vessel)
|
|
if not gaps:
|
|
return False, 0
|
|
|
|
max_gap_min = max(g['gap_min'] for g in gaps)
|
|
is_dark = max_gap_min >= (GAP_SUSPICIOUS_SEC / 60) # 상수에서 파생
|
|
return is_dark, int(max_gap_min)
|
|
|
|
|
|
def _classify_state(sog: float) -> str:
|
|
"""SOG 기준 간단 활동 상태 분류."""
|
|
if sog is None:
|
|
return 'UNKNOWN'
|
|
if sog <= 1.0:
|
|
return 'STATIONARY'
|
|
if sog <= 5.0:
|
|
return 'FISHING'
|
|
return 'SAILING'
|
|
|
|
|
|
def analyze_dark_pattern(df_vessel: pd.DataFrame) -> dict:
|
|
"""dark 판정 + gap 상세 정보 반환.
|
|
|
|
가장 긴 gap 한 건을 기준으로 패턴 분석에 필요한 정보를 모두 수집한다.
|
|
is_dark가 False이면 나머지 필드는 기본값으로 채움.
|
|
|
|
Returns:
|
|
{
|
|
'is_dark': bool,
|
|
'gap_min': int,
|
|
'gap_start_lat': Optional[float],
|
|
'gap_start_lon': Optional[float],
|
|
'gap_start_sog': float,
|
|
'gap_start_state': str,
|
|
'gap_end_lat': Optional[float],
|
|
'gap_end_lon': Optional[float],
|
|
'gap_distance_nm': float,
|
|
'gap_resumed': bool,
|
|
'pre_gap_turn_or_teleport': bool,
|
|
'avg_sog_before': float,
|
|
}
|
|
"""
|
|
default = {
|
|
'is_dark': False,
|
|
'gap_min': 0,
|
|
'gap_start_lat': None,
|
|
'gap_start_lon': None,
|
|
'gap_start_sog': 0.0,
|
|
'gap_start_state': 'UNKNOWN',
|
|
'gap_end_lat': None,
|
|
'gap_end_lon': None,
|
|
'gap_distance_nm': 0.0,
|
|
'gap_resumed': False,
|
|
'pre_gap_turn_or_teleport': False,
|
|
'avg_sog_before': 0.0,
|
|
}
|
|
|
|
if df_vessel is None or len(df_vessel) < 2:
|
|
return default
|
|
|
|
df_sorted = df_vessel.sort_values('timestamp').reset_index(drop=True)
|
|
records = df_sorted.to_dict('records')
|
|
|
|
# 가장 긴 gap 찾기
|
|
max_gap_sec = 0.0
|
|
max_gap_idx = -1 # records에서 gap 직후 인덱스 (curr)
|
|
for i in range(1, len(records)):
|
|
prev_ts = pd.Timestamp(records[i - 1]['timestamp'])
|
|
curr_ts = pd.Timestamp(records[i]['timestamp'])
|
|
gap_sec = (curr_ts - prev_ts).total_seconds()
|
|
if gap_sec > max_gap_sec:
|
|
max_gap_sec = gap_sec
|
|
max_gap_idx = i
|
|
|
|
if max_gap_idx < 1 or max_gap_sec < GAP_SUSPICIOUS_SEC:
|
|
return default
|
|
|
|
prev_row = records[max_gap_idx - 1] # gap 직전 마지막 포인트
|
|
curr_row = records[max_gap_idx] # gap 직후 첫 포인트
|
|
|
|
gap_start_lat = float(prev_row.get('lat')) if prev_row.get('lat') is not None else None
|
|
gap_start_lon = float(prev_row.get('lon')) if prev_row.get('lon') is not None else None
|
|
gap_end_lat = float(curr_row.get('lat')) if curr_row.get('lat') is not None else None
|
|
gap_end_lon = float(curr_row.get('lon')) if curr_row.get('lon') is not None else None
|
|
|
|
# gap 직전 SOG 추정: prev 행의 raw_sog 또는 computed sog 사용
|
|
gap_start_sog = float(prev_row.get('sog') or prev_row.get('raw_sog') or 0.0)
|
|
|
|
# gap 중 이동 거리
|
|
if all(v is not None for v in (gap_start_lat, gap_start_lon, gap_end_lat, gap_end_lon)):
|
|
gap_distance_nm = haversine_nm(
|
|
gap_start_lat, gap_start_lon, gap_end_lat, gap_end_lon,
|
|
)
|
|
else:
|
|
gap_distance_nm = 0.0
|
|
|
|
# 현재 시점 기준 gap이 "재개되었는지" 판단:
|
|
# curr_row가 df_sorted의 마지막 포인트가 아니면 신호가 이미 재개된 상태
|
|
# 마지막 포인트면 아직 gap 진행 중(curr_row는 gap 시작 직후 아니라 gap 전의 마지막일 수도 있음)
|
|
is_last = (max_gap_idx == len(records) - 1)
|
|
# gap이 마지막이면 신호 복귀 미확인
|
|
gap_resumed = not is_last or (
|
|
is_last and max_gap_idx < len(records) - 1 # 항상 False지만 안전용
|
|
)
|
|
# 단, max_gap_idx가 마지막이면 gap 후 포인트 없음 → 재개 미확인
|
|
if max_gap_idx == len(records) - 1:
|
|
gap_resumed = False
|
|
else:
|
|
gap_resumed = True
|
|
|
|
# gap 직전 5개 포인트로 평균 SOG + 이상 행동(teleport) 판정
|
|
start_idx = max(0, max_gap_idx - 5)
|
|
window = records[start_idx:max_gap_idx]
|
|
if window:
|
|
sogs = [float(r.get('sog') or r.get('raw_sog') or 0.0) for r in window]
|
|
avg_sog_before = sum(sogs) / len(sogs) if sogs else 0.0
|
|
else:
|
|
avg_sog_before = gap_start_sog
|
|
|
|
# gap 직전 window에 teleportation 발생 여부
|
|
pre_gap_turn_or_teleport = False
|
|
if len(window) >= 2:
|
|
try:
|
|
window_df = df_sorted.iloc[start_idx:max_gap_idx].copy()
|
|
# spoofing.detect_teleportation 재사용 (순환 import 방지 위해 지연 import)
|
|
from algorithms.spoofing import detect_teleportation
|
|
teleports = detect_teleportation(window_df)
|
|
if teleports:
|
|
pre_gap_turn_or_teleport = True
|
|
except Exception:
|
|
pre_gap_turn_or_teleport = False
|
|
|
|
return {
|
|
'is_dark': True,
|
|
'gap_min': int(max_gap_sec / 60),
|
|
'gap_start_lat': gap_start_lat,
|
|
'gap_start_lon': gap_start_lon,
|
|
'gap_start_sog': gap_start_sog,
|
|
'gap_start_state': _classify_state(gap_start_sog),
|
|
'gap_end_lat': gap_end_lat,
|
|
'gap_end_lon': gap_end_lon,
|
|
'gap_distance_nm': round(gap_distance_nm, 2),
|
|
'gap_resumed': gap_resumed,
|
|
'pre_gap_turn_or_teleport': pre_gap_turn_or_teleport,
|
|
'avg_sog_before': round(avg_sog_before, 2),
|
|
}
|
|
|
|
|
|
def _is_in_kr_coverage(lat: Optional[float], lon: Optional[float]) -> bool:
|
|
if lat is None or lon is None:
|
|
return False
|
|
return (_KR_COVERAGE_LAT[0] <= lat <= _KR_COVERAGE_LAT[1]
|
|
and _KR_COVERAGE_LON[0] <= lon <= _KR_COVERAGE_LON[1])
|
|
|
|
|
|
# compute_dark_suspicion 의 기본 파라미터 (`params=None` 시 사용).
|
|
# Phase 2 마이그레이션 — detection_model_versions.params JSONB 로 seed 되며,
|
|
# 운영자가 /ai/detection-models/{dark_suspicion}/versions 로 DRAFT → ACTIVE 시 교체.
|
|
# Python 상수를 단일 진실 공급원으로 삼고 registry seed 가 이 값을 그대로 복사한다.
|
|
DARK_SUSPICION_DEFAULT_PARAMS: dict = {
|
|
'sog_thresholds': {
|
|
'moving': 5.0, # P1 이동 중 OFF 판정 속도
|
|
'slow_moving': 2.0, # P1 서행 OFF 판정 속도
|
|
'underway_deliberate': 3.0, # P10 'under way' + 속도 시 의도성
|
|
},
|
|
'heading_cog_mismatch_deg': 60.0, # P11 heading vs COG diff 임계
|
|
'weights': {
|
|
'P1_moving_off': 25,
|
|
'P1_slow_moving_off': 15,
|
|
'P2_sensitive_zone': 25,
|
|
'P2_special_zone': 15,
|
|
'P3_repeat_high': 30,
|
|
'P3_repeat_low': 15,
|
|
'P3_recent_dark': 10,
|
|
'P4_distance_anomaly': 20,
|
|
'P5_daytime_fishing_off': 15,
|
|
'P6_teleport_before_gap': 15,
|
|
'P7_unpermitted': 10,
|
|
'P8_very_long_gap': 15,
|
|
'P8_long_gap': 10,
|
|
'P9_fishing_vessel_dark': 10,
|
|
'P9_cargo_natural_gap': -10,
|
|
'P10_underway_deliberate': 20,
|
|
'P10_anchored_natural': -15,
|
|
'P11_heading_cog_mismatch': 15,
|
|
'out_of_coverage': -50,
|
|
},
|
|
'repeat_thresholds': {'h7_high': 3, 'h7_low': 2, 'h24_recent': 1},
|
|
'gap_min_thresholds': {'very_long': 360, 'long': 180},
|
|
'p4_distance_multiplier': 2.0, # 예상 이동거리 대비 비정상 판정 배수
|
|
'p5_daytime_range': [6, 18], # [start, end) KST 시
|
|
'tier_thresholds': {'critical': 70, 'high': 50, 'watch': 30},
|
|
}
|
|
|
|
|
|
def _merge_default_params(override: Optional[dict]) -> dict:
|
|
"""override 딕셔너리의 값을 DEFAULT 에 깊이 병합 (unset 키는 기본값 사용)."""
|
|
if not override:
|
|
return DARK_SUSPICION_DEFAULT_PARAMS
|
|
merged = {k: (dict(v) if isinstance(v, dict) else v)
|
|
for k, v in DARK_SUSPICION_DEFAULT_PARAMS.items()}
|
|
for key, val in override.items():
|
|
if isinstance(val, dict) and isinstance(merged.get(key), dict):
|
|
merged[key] = {**merged[key], **val}
|
|
else:
|
|
merged[key] = val
|
|
return merged
|
|
|
|
|
|
def compute_dark_suspicion(
|
|
gap_info: dict,
|
|
mmsi: str,
|
|
is_permitted: bool,
|
|
history: dict,
|
|
now_kst_hour: int,
|
|
classify_zone_fn: Optional[Callable[[float, float], dict]] = None,
|
|
ship_kind_code: str = '',
|
|
nav_status: str = '',
|
|
heading: Optional[float] = None,
|
|
last_cog: Optional[float] = None,
|
|
params: Optional[dict] = None,
|
|
) -> tuple[int, list[str], str]:
|
|
"""의도적 AIS OFF 의심 점수 산출.
|
|
|
|
Args:
|
|
gap_info: analyze_dark_pattern 결과
|
|
mmsi: 선박 MMSI
|
|
is_permitted: 허가 어선 여부
|
|
history: {'count_7d': int, 'count_24h': int}
|
|
now_kst_hour: 현재 KST 시각 (0~23)
|
|
classify_zone_fn: (lat, lon) -> dict. gap_start 위치의 zone 판단
|
|
ship_kind_code: 선종 코드 (000020=어선, 000023=화물 등)
|
|
nav_status: 항해 상태 텍스트 ("Under way using engine" 등)
|
|
heading: 선수 방향 (0~360, signal-batch API)
|
|
last_cog: gap 직전 침로 (0~360)
|
|
params: detection_model_versions.params (None 이면 DEFAULT_PARAMS).
|
|
동일 입력 + params=None 은 Phase 2 이전과 완전 동일한 결과를 낸다.
|
|
|
|
Returns:
|
|
(score, patterns, tier)
|
|
tier: 'CRITICAL' / 'HIGH' / 'WATCH' / 'NONE'
|
|
"""
|
|
if not gap_info.get('is_dark'):
|
|
return 0, [], 'NONE'
|
|
|
|
p = _merge_default_params(params)
|
|
w = p['weights']
|
|
sog = p['sog_thresholds']
|
|
rpt = p['repeat_thresholds']
|
|
gmt = p['gap_min_thresholds']
|
|
tier_thr = p['tier_thresholds']
|
|
day_start, day_end = p['p5_daytime_range']
|
|
|
|
score = 0
|
|
patterns: list[str] = []
|
|
|
|
gap_start_sog = gap_info.get('gap_start_sog') or 0.0
|
|
gap_start_state = gap_info.get('gap_start_state', 'UNKNOWN')
|
|
gap_start_lat = gap_info.get('gap_start_lat')
|
|
gap_start_lon = gap_info.get('gap_start_lon')
|
|
gap_min = gap_info.get('gap_min') or 0
|
|
|
|
# P1: 이동 중 OFF
|
|
if gap_start_sog > sog['moving']:
|
|
score += w['P1_moving_off']
|
|
patterns.append('moving_at_off')
|
|
elif gap_start_sog > sog['slow_moving']:
|
|
score += w['P1_slow_moving_off']
|
|
patterns.append('slow_moving_at_off')
|
|
|
|
# P2: gap 시작 위치의 민감 수역
|
|
if classify_zone_fn is not None and gap_start_lat is not None and gap_start_lon is not None:
|
|
try:
|
|
zone_info = classify_zone_fn(gap_start_lat, gap_start_lon)
|
|
zone = zone_info.get('zone', '')
|
|
if zone in ('TERRITORIAL_SEA', 'CONTIGUOUS_ZONE'):
|
|
score += w['P2_sensitive_zone']
|
|
patterns.append('sensitive_zone')
|
|
elif zone.startswith('ZONE_'):
|
|
score += w['P2_special_zone']
|
|
patterns.append('special_zone')
|
|
except Exception:
|
|
pass
|
|
|
|
# P3: 반복 이력 (과거 7일)
|
|
h7 = int(history.get('count_7d', 0) or 0)
|
|
h24 = int(history.get('count_24h', 0) or 0)
|
|
if h7 >= rpt['h7_high']:
|
|
score += w['P3_repeat_high']
|
|
patterns.append('repeat_high')
|
|
elif h7 >= rpt['h7_low']:
|
|
score += w['P3_repeat_low']
|
|
patterns.append('repeat_low')
|
|
if h24 >= rpt['h24_recent']:
|
|
score += w['P3_recent_dark']
|
|
patterns.append('recent_dark')
|
|
|
|
# P4: gap 후 이동 거리 비정상
|
|
gap_distance_nm = gap_info.get('gap_distance_nm') or 0.0
|
|
avg_sog_before = gap_info.get('avg_sog_before') or 0.0
|
|
if gap_info.get('gap_resumed') and gap_min > 0:
|
|
gap_hours = gap_min / 60.0
|
|
expected = max(gap_hours * max(avg_sog_before, 1.0), 0.5)
|
|
if gap_distance_nm > expected * p['p4_distance_multiplier']:
|
|
score += w['P4_distance_anomaly']
|
|
patterns.append('distance_anomaly')
|
|
|
|
# P5: 주간 조업 시간 OFF
|
|
if day_start <= now_kst_hour < day_end and gap_start_state == 'FISHING':
|
|
score += w['P5_daytime_fishing_off']
|
|
patterns.append('daytime_fishing_off')
|
|
|
|
# P6: gap 직전 이상 행동
|
|
if gap_info.get('pre_gap_turn_or_teleport'):
|
|
score += w['P6_teleport_before_gap']
|
|
patterns.append('teleport_before_gap')
|
|
|
|
# P7: 무허가
|
|
if not is_permitted:
|
|
score += w['P7_unpermitted']
|
|
patterns.append('unpermitted')
|
|
|
|
# P8: gap 길이
|
|
if gap_min >= gmt['very_long']:
|
|
score += w['P8_very_long_gap']
|
|
patterns.append('very_long_gap')
|
|
elif gap_min >= gmt['long']:
|
|
score += w['P8_long_gap']
|
|
patterns.append('long_gap')
|
|
|
|
# P9: 선종별 가중치
|
|
if ship_kind_code == '000020':
|
|
score += w['P9_fishing_vessel_dark']
|
|
patterns.append('fishing_vessel_dark')
|
|
elif ship_kind_code == '000023':
|
|
score += w['P9_cargo_natural_gap']
|
|
patterns.append('cargo_natural_gap')
|
|
|
|
# P10: 항해 상태 기반 의도성
|
|
if nav_status:
|
|
status_lower = nav_status.lower()
|
|
if 'under way' in status_lower and gap_start_sog > sog['underway_deliberate']:
|
|
score += w['P10_underway_deliberate']
|
|
patterns.append('underway_deliberate_off')
|
|
elif 'anchor' in status_lower or 'moored' in status_lower:
|
|
score += w['P10_anchored_natural']
|
|
patterns.append('anchored_natural_gap')
|
|
|
|
# P11: heading vs COG 불일치
|
|
if heading is not None and last_cog is not None:
|
|
diff = abs(heading - last_cog) % 360
|
|
if diff > 180:
|
|
diff = 360 - diff
|
|
if diff > p['heading_cog_mismatch_deg']:
|
|
score += w['P11_heading_cog_mismatch']
|
|
patterns.append('heading_cog_mismatch')
|
|
|
|
# 감점: gap 시작 위치가 한국 수신 커버리지 밖
|
|
if not _is_in_kr_coverage(gap_start_lat, gap_start_lon):
|
|
score += w['out_of_coverage']
|
|
patterns.append('out_of_coverage')
|
|
|
|
score = max(0, min(100, score))
|
|
|
|
if score >= tier_thr['critical']:
|
|
tier = 'CRITICAL'
|
|
elif score >= tier_thr['high']:
|
|
tier = 'HIGH'
|
|
elif score >= tier_thr['watch']:
|
|
tier = 'WATCH'
|
|
else:
|
|
tier = 'NONE'
|
|
|
|
return score, patterns, tier
|