import pandas as pd

from algorithms.location import haversine_nm

# Gap-duration thresholds in seconds, ordered from least to most severe.
GAP_SUSPICIOUS_SEC = 1800  # 30 minutes
GAP_HIGH_SUSPICIOUS_SEC = 3600  # 1 hour
GAP_VIOLATION_SEC = 86400  # 24 hours


def detect_ais_gaps(df_vessel: pd.DataFrame) -> list[dict]:
    """Extract signal-loss intervals from AIS reception records.

    Rows are ordered by ``timestamp`` and every pair of consecutive rows is
    compared. A pair whose time difference is at least
    ``GAP_SUSPICIOUS_SEC`` is reported as a gap.

    Args:
        df_vessel: AIS records with ``timestamp``, ``lat`` and ``lon``
            columns (presumably the track of a single vessel -- confirm
            against callers). Rows whose ``timestamp`` is missing are
            ignored.

    Returns:
        One dict per detected gap, in chronological order, with keys:

        - ``gap_sec``: gap length in whole seconds (truncated).
        - ``gap_min``: gap length in minutes, rounded to one decimal.
        - ``displacement_nm``: ``haversine_nm`` between the positions on
          either side of the gap, rounded to two decimals (presumably
          nautical miles, going by the helper's name).
        - ``severity``: ``'VIOLATION'``, ``'HIGH_SUSPICIOUS'`` or
          ``'SUSPICIOUS'`` according to the ``GAP_*_SEC`` thresholds.

        An empty list when fewer than two records are supplied or no gap
        reaches the threshold.
    """
    if len(df_vessel) < 2:
        return []

    # A row without a timestamp cannot bound a gap. Left in place it sorts
    # last, yields a NaN duration that slips past the threshold check, and
    # then crashes in int(); drop such rows up front.
    records = (
        df_vessel.dropna(subset=['timestamp'])
        .sort_values('timestamp')
        .to_dict('records')
    )

    gaps = []
    for prev, curr in zip(records, records[1:]):
        prev_ts = pd.Timestamp(prev['timestamp'])
        curr_ts = pd.Timestamp(curr['timestamp'])
        gap_sec = (curr_ts - prev_ts).total_seconds()
        if gap_sec < GAP_SUSPICIOUS_SEC:
            continue

        disp = haversine_nm(
            prev['lat'], prev['lon'],
            curr['lat'], curr['lon'],
        )

        # Check the most severe threshold first.
        if gap_sec >= GAP_VIOLATION_SEC:
            severity = 'VIOLATION'
        elif gap_sec >= GAP_HIGH_SUSPICIOUS_SEC:
            severity = 'HIGH_SUSPICIOUS'
        else:
            severity = 'SUSPICIOUS'

        gaps.append({
            'gap_sec': int(gap_sec),
            'gap_min': round(gap_sec / 60, 1),
            'displacement_nm': round(disp, 2),
            'severity': severity,
        })

    return gaps


def is_dark_vessel(df_vessel: pd.DataFrame) -> tuple[bool, int]:
    """Decide whether the AIS records indicate a dark vessel.

    Args:
        df_vessel: AIS records, passed unchanged to ``detect_ais_gaps``.

    Returns:
        ``(is_dark, max_gap_duration_min)``. ``is_dark`` is True when the
        longest detected gap is at least ``GAP_SUSPICIOUS_SEC`` long;
        ``max_gap_duration_min`` is that gap's length in minutes, truncated
        to an int. ``(False, 0)`` when no gap was detected.
    """
    gaps = detect_ais_gaps(df_vessel)
    if not gaps:
        return False, 0

    max_gap_min = max(g['gap_min'] for g in gaps)
    # Same threshold as gap detection (30 minutes), derived from the shared
    # constant instead of repeating the literal.
    is_dark = max_gap_min >= GAP_SUSPICIOUS_SEC / 60
    return is_dark, int(max_gap_min)