from typing import Optional, Tuple

import pandas as pd

from algorithms.location import classify_zone
from algorithms.fishing_pattern import detect_fishing_segments, detect_trawl_uturn
from algorithms.dark_vessel import detect_ais_gaps
from algorithms.spoofing import detect_teleportation


# Inclusive lower bounds for each risk level, checked from highest to lowest.
# Any score below the last threshold is 'LOW'.
_RISK_LEVELS = ((70, 'CRITICAL'), (50, 'HIGH'), (30, 'MEDIUM'))

# Upper bound applied to the summed score before it is mapped to a level.
_MAX_SCORE = 100


def _is_unpermitted(is_permitted: Optional[bool]) -> bool:
    """Return True only when the permit status is known and negative.

    ``None`` means "unknown" and is never penalised.
    """
    return is_permitted is not None and not is_permitted


def _zone_score(zone_info: dict, is_permitted: Optional[bool]) -> int:
    """Return the location component of the risk score (0, 10, 25 or 40).

    Reads ``zone_info['zone']``; a missing key or a ``None`` value is treated
    as "no zone" and scores 0.
    """
    # `or ''` also covers an explicit None value, which would otherwise make
    # the `.startswith` call below raise AttributeError.
    zone = zone_info.get('zone') or ''
    if zone == 'TERRITORIAL_SEA':
        return 40
    if zone == 'CONTIGUOUS_ZONE':
        return 10
    # Inside a 'ZONE_*' area only vessels known to be unpermitted are scored.
    if zone.startswith('ZONE_') and _is_unpermitted(is_permitted):
        return 25
    return 0


def _permit_score(is_permitted: Optional[bool]) -> int:
    """Return the permit-history component of the risk score (0 or 20)."""
    return 20 if _is_unpermitted(is_permitted) else 0


def _finalize(score: int) -> Tuple[int, str]:
    """Cap ``score`` at 100 and pair it with its risk level label."""
    score = min(score, _MAX_SCORE)
    for threshold, level in _RISK_LEVELS:
        if score >= threshold:
            return score, level
    return score, 'LOW'


def compute_lightweight_risk_score(
    zone_info: dict,
    sog: float,
    is_permitted: Optional[bool] = None,
) -> Tuple[int, str]:
    """Compute a lightweight risk score from location and permit status only.

    Intended for vessels that did not go through the full pipeline. Uses the
    same location (step 1) and permit (step 4) scoring as
    ``compute_vessel_risk_score`` via the shared private helpers.

    Args:
        zone_info: Mapping whose ``'zone'`` entry names the vessel's zone.
        sog: Not used by the score; kept so the signature stays compatible
            with existing callers.
        is_permitted: ``False`` adds the unpermitted penalties; ``True`` or
            ``None`` (unknown) adds nothing.

    Returns:
        (risk_score, risk_level) where risk_level is one of
        'CRITICAL', 'HIGH', 'MEDIUM', 'LOW'.
    """
    # 1. Location (up to 40 points)
    score = _zone_score(zone_info, is_permitted)

    # 4. Permit history (up to 20 points)
    score += _permit_score(is_permitted)

    return _finalize(score)


def compute_vessel_risk_score(
    mmsi: str,
    df_vessel: pd.DataFrame,
    zone_info: Optional[dict] = None,
    is_permitted: Optional[bool] = None,
) -> Tuple[int, str]:
    """Compute the overall violation risk score for one vessel (0-100).

    Args:
        mmsi: Not used by the score; kept so the signature stays compatible
            with existing callers.
        df_vessel: Track rows for the vessel. When ``zone_info`` is not
            given, the ``'lat'`` / ``'lon'`` values of the last row are
            passed to ``classify_zone``.
        zone_info: Optional pre-computed zone mapping (``'zone'`` entry).
        is_permitted: ``False`` adds the unpermitted penalties; ``True`` or
            ``None`` (unknown) adds nothing.

    Returns:
        (risk_score, risk_level) where risk_level is one of
        'CRITICAL', 'HIGH', 'MEDIUM', 'LOW'. An empty ``df_vessel`` yields
        ``(0, 'LOW')`` without calling any detector.
    """
    if len(df_vessel) == 0:
        return 0, 'LOW'

    # 1. Location (up to 40 points)
    if zone_info is None:
        last = df_vessel.iloc[-1]
        zone_info = classify_zone(last['lat'], last['lon'])
    score = _zone_score(zone_info, is_permitted)

    # 2. Fishing behaviour (up to 30 points)
    segs = detect_fishing_segments(df_vessel)
    ts_fishing = [s for s in segs if s.get('in_territorial_sea')]
    if ts_fishing:
        score += 20
    elif segs:
        score += 5

    uturn = detect_trawl_uturn(df_vessel)
    if uturn.get('trawl_suspected'):
        score += 10

    # 3. AIS manipulation (up to 45 points: 20 teleport + 10 jumps + 15 gaps)
    # NOTE(review): the previous comment claimed a 35-point maximum, but no
    # per-section cap exists in the code; only the overall cap of 100 applies.
    teleports = detect_teleportation(df_vessel)
    if teleports:
        score += 20

    # Kept function-local exactly as in the original.
    # NOTE(review): can probably be hoisted to the module imports - confirm.
    from algorithms.spoofing import count_speed_jumps
    jumps = count_speed_jumps(df_vessel)
    if jumps >= 3:
        score += 10
    elif jumps >= 1:
        score += 5

    gaps = detect_ais_gaps(df_vessel)
    # presumably 'gap_min' is the gap length in minutes - verify in
    # detect_ais_gaps
    critical_gaps = [g for g in gaps if g['gap_min'] >= 60]
    if critical_gaps:
        score += 15
    elif gaps:
        score += 5

    # 4. Permit history (up to 20 points)
    score += _permit_score(is_permitted)

    return _finalize(score)