kcg-ai-monitoring/prediction/algorithms/risk.py
htlee 6fb0b04992 feat(prediction): 경량 분석 riskScore 해상도 개선 + vessel_type 매핑
경량 경로 선박 60.8%가 45점 고정으로 수렴하고 98.6%가 vessel_type
UNKNOWN 으로만 기록되던 문제를 해결한다.

riskScore (compute_lightweight_risk_score)
- dark_suspicion_score(0~100) 직접 반영: min(30, score*0.3)
- EEZ_OR_BEYOND 기선 근접도 가산 (12NM 내 +15, 24NM 내 +8)
- dark_history_24h 가산 (dark_suspicion_score 미반영 케이스만)
- 허가 이력 +20 → +8/+15 차등 (dark_suspicion_score 있을 때 이중계산 방지)
- gap_duration_min 4구간 차등 (fallback: 720m/180m/60m/30m)

vessel_type (신규 vessel_type_mapping.py)
- fleet_vessels fishery_code → VesselType 매핑:
  PT/PT-S/OT → TRAWL, GN → GILLNET, PS → PURSE, FC → CARGO
- GILLNET / CARGO 2개 값 신규 추가 (기존 TRAWL/PURSE/LONGLINE/TRAP/UNKNOWN)
- scheduler.py 경량 경로에서 등록선은 매핑, 미등록선은 UNKNOWN 유지

배포 후 검증 (redis-211 15:15 사이클)
- risk_score 분포: 45점 60.8% → 0% (11~40 범위 고르게 분산)
- vessel_type: UNKNOWN 98.6% → 89.1% (886척이 구체 유형으로 전환,
  TRAWL 518 / LONGLINE 171 / TRAP 78 / PURSE 73 / GILLNET 38 / CARGO 8)
- 412354335 샘플: 45 MEDIUM 고정 → 20 LOW (dss=40 × 0.3 + 축소 허가)
2026-04-16 15:19:55 +09:00

174 lines
5.1 KiB
Python

from typing import Optional, Tuple
import pandas as pd
from algorithms.location import classify_zone
from algorithms.fishing_pattern import detect_fishing_segments, detect_trawl_uturn
from algorithms.dark_vessel import detect_ais_gaps
from algorithms.spoofing import detect_teleportation
def compute_lightweight_risk_score(
zone_info: dict,
sog: float,
is_permitted: Optional[bool] = None,
is_dark: bool = False,
gap_duration_min: int = 0,
spoofing_score: float = 0.0,
dark_suspicion_score: int = 0,
dist_from_baseline_nm: float = 999.0,
dark_history_24h: int = 0,
) -> Tuple[int, str]:
"""위치·허가·다크/스푸핑 기반 경량 위험도 (파이프라인 미통과 선박용).
compute_dark_suspicion 의 패턴 기반 0~100 점수를 직접 반영해 해상도를 높인다.
이중계산 방지: dark_suspicion_score 는 이미 무허가/반복을 포함하므로 dark_suspicion_score > 0
인 경우 허가/반복 가산을 축소한다.
임계값 70/50/30 은 pipeline path(compute_vessel_risk_score)와 동일.
Returns: (risk_score, risk_level)
"""
score = 0
# 1. 위치 기반 (최대 40점) — EEZ 외 기선 근접도 추가
zone = zone_info.get('zone', '')
if zone == 'TERRITORIAL_SEA':
score += 40
elif zone == 'CONTIGUOUS_ZONE':
score += 15
elif zone.startswith('ZONE_'):
if is_permitted is not None and not is_permitted:
score += 25
elif zone == 'EEZ_OR_BEYOND':
# EEZ 외라도 기선 근접 시 가산 (공해·외해 분산)
if dist_from_baseline_nm < 12:
score += 15
elif dist_from_baseline_nm < 24:
score += 8
# 2. 다크 베셀 (최대 30점) — dark_suspicion_score 우선
if is_dark:
if dark_suspicion_score >= 1:
# compute_dark_suspicion 이 산출한 패턴 기반 의심도 반영
score += min(30, round(dark_suspicion_score * 0.3))
else:
# fallback: gap 길이만 기준
if gap_duration_min >= 720:
score += 25
elif gap_duration_min >= 180:
score += 20
elif gap_duration_min >= 60:
score += 15
elif gap_duration_min >= 30:
score += 8
# 3. 스푸핑 (최대 15점) — 현재 중국 선박은 거의 0 (별도 PR 에서 산출 로직 재설계 예정)
if spoofing_score > 0.7:
score += 15
elif spoofing_score > 0.5:
score += 8
# 4. 허가 이력 (최대 15점) — 이중계산 방지
if is_permitted is not None and not is_permitted:
# dark_suspicion_score 에 이미 무허가 +10 반영됨 → 축소
score += 8 if dark_suspicion_score > 0 else 15
# 5. 반복 이력 (최대 10점) — dark_suspicion_score 미반영 케이스만
if dark_suspicion_score == 0 and dark_history_24h > 0:
if dark_history_24h >= 5:
score += 10
elif dark_history_24h >= 2:
score += 5
score = min(score, 100)
if score >= 70:
level = 'CRITICAL'
elif score >= 50:
level = 'HIGH'
elif score >= 30:
level = 'MEDIUM'
else:
level = 'LOW'
return score, level
def compute_vessel_risk_score(
mmsi: str,
df_vessel: pd.DataFrame,
zone_info: Optional[dict] = None,
is_permitted: Optional[bool] = None,
) -> Tuple[int, str]:
"""선박별 종합 위반 위험도 (0~100점).
Returns: (risk_score, risk_level)
"""
if len(df_vessel) == 0:
return 0, 'LOW'
score = 0
# 1. 위치 기반 (최대 40점)
if zone_info is None:
last = df_vessel.iloc[-1]
zone_info = classify_zone(last['lat'], last['lon'])
zone = zone_info.get('zone', '')
if zone == 'TERRITORIAL_SEA':
score += 40
elif zone == 'CONTIGUOUS_ZONE':
score += 10
elif zone.startswith('ZONE_'):
# 특정어업수역 내 — 무허가면 가산
if is_permitted is not None and not is_permitted:
score += 25
# 2. 조업 행위 (최대 30점)
segs = detect_fishing_segments(df_vessel)
ts_fishing = [s for s in segs if s.get('in_territorial_sea')]
if ts_fishing:
score += 20
elif segs:
score += 5
uturn = detect_trawl_uturn(df_vessel)
if uturn.get('trawl_suspected'):
score += 10
# 3. AIS 조작 (최대 35점)
teleports = detect_teleportation(df_vessel)
if teleports:
score += 20
from algorithms.spoofing import count_speed_jumps
jumps = count_speed_jumps(df_vessel)
if jumps >= 3:
score += 10
elif jumps >= 1:
score += 5
gaps = detect_ais_gaps(df_vessel)
critical_gaps = [g for g in gaps if g['gap_min'] >= 60]
if critical_gaps:
score += 15
elif gaps:
score += 5
# 4. 허가 이력 (최대 20점)
if is_permitted is not None and not is_permitted:
score += 20
score = min(score, 100)
if score >= 70:
level = 'CRITICAL'
elif score >= 50:
level = 'HIGH'
elif score >= 30:
level = 'MEDIUM'
else:
level = 'LOW'
return score, level