kcg-monitoring/prediction/algorithms/risk.py
htlee 7573c84e91 fix: 분석 파이프라인 정확도 개선 + 캐시 증분 갱신 + TTS 프록시
- MIN_TRAJ_POINTS 100→20 (16척→684척, 파이프라인 병목 해소)
- risk.py: SOG 급변 count를 위험도 점수에 반영 (+5/+10)
- spoofing.py: BD09 오프셋 중국 MMSI(412*) 예외 (좌표계 노이즈 제거)
- fishing_pattern.py: 마지막 조업 세그먼트 누락 버그 수정
- VesselAnalysisService: 인메모리 캐시 + 증분 갱신 (warmup 2h → incremental)
- nginx: /api/gtts 프록시 추가 (Google TTS CORS 우회)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 06:48:27 +09:00

87 lines
2.2 KiB
Python

from typing import Optional, Tuple
import pandas as pd
from algorithms.location import classify_zone
from algorithms.fishing_pattern import detect_fishing_segments, detect_trawl_uturn
from algorithms.dark_vessel import detect_ais_gaps
from algorithms.spoofing import detect_teleportation
def compute_vessel_risk_score(
mmsi: str,
df_vessel: pd.DataFrame,
zone_info: Optional[dict] = None,
is_permitted: Optional[bool] = None,
) -> Tuple[int, str]:
"""선박별 종합 위반 위험도 (0~100점).
Returns: (risk_score, risk_level)
"""
if len(df_vessel) == 0:
return 0, 'LOW'
score = 0
# 1. 위치 기반 (최대 40점)
if zone_info is None:
last = df_vessel.iloc[-1]
zone_info = classify_zone(last['lat'], last['lon'])
zone = zone_info.get('zone', '')
if zone == 'TERRITORIAL_SEA':
score += 40
elif zone == 'CONTIGUOUS_ZONE':
score += 10
elif zone.startswith('ZONE_'):
# 특정어업수역 내 — 무허가면 가산
if is_permitted is not None and not is_permitted:
score += 25
# 2. 조업 행위 (최대 30점)
segs = detect_fishing_segments(df_vessel)
ts_fishing = [s for s in segs if s.get('in_territorial_sea')]
if ts_fishing:
score += 20
elif segs:
score += 5
uturn = detect_trawl_uturn(df_vessel)
if uturn.get('trawl_suspected'):
score += 10
# 3. AIS 조작 (최대 35점)
teleports = detect_teleportation(df_vessel)
if teleports:
score += 20
from algorithms.spoofing import count_speed_jumps
jumps = count_speed_jumps(df_vessel)
if jumps >= 3:
score += 10
elif jumps >= 1:
score += 5
gaps = detect_ais_gaps(df_vessel)
critical_gaps = [g for g in gaps if g['gap_min'] >= 60]
if critical_gaps:
score += 15
elif gaps:
score += 5
# 4. 허가 이력 (최대 20점)
if is_permitted is not None and not is_permitted:
score += 20
score = min(score, 100)
if score >= 70:
level = 'CRITICAL'
elif score >= 50:
level = 'HIGH'
elif score >= 30:
level = 'MEDIUM'
else:
level = 'LOW'
return score, level