"""AIS gap ("dark vessel") detection and intentional-AIS-off suspicion scoring."""

import logging
from typing import Callable, Optional

import pandas as pd

from algorithms.location import haversine_nm

logger = logging.getLogger(__name__)

# 100 min (raised from 30 min to reduce over-detection of natural gaps).
GAP_SUSPICIOUS_SEC = 6000
GAP_HIGH_SUSPICIOUS_SEC = 10800  # 3 hours
GAP_VIOLATION_SEC = 86400  # 24 hours

# Estimated area where Korean AIS reception is possible
# (Korean Peninsula + EEZ + contiguous-zone margin).
_KR_COVERAGE_LAT = (32.0, 39.5)
_KR_COVERAGE_LON = (124.0, 132.0)


def _is_missing(value) -> bool:
    """Return True for None and for pandas/NumPy missing markers (NaN, NaT)."""
    if value is None:
        return True
    try:
        return bool(pd.isna(value))
    except (TypeError, ValueError):
        # Non-scalar input (pd.isna returned an array): treat it as present.
        return False


def _coord_or_none(value) -> Optional[float]:
    """Convert a coordinate cell to float, mapping None/NaN to None."""
    return None if _is_missing(value) else float(value)


def _row_speed(row: dict) -> float:
    """Return the speed of a record: 'sog' first, then 'raw_sog', else 0.0.

    Missing (None/NaN) and zero/empty values fall through to the next
    candidate. NaN must be checked explicitly because it is truthy, so a
    plain ``a or b`` chain would never reach the fallback.
    """
    for key in ('sog', 'raw_sog'):
        value = row.get(key)
        if _is_missing(value) or not value:
            continue
        return float(value)
    return 0.0


def detect_ais_gaps(df_vessel: pd.DataFrame) -> list[dict]:
    """Extract signal-loss intervals from one vessel's AIS records.

    Args:
        df_vessel: Records with 'timestamp', 'lat' and 'lon' columns.

    Returns:
        One dict per pair of consecutive records (after sorting by timestamp)
        that are at least GAP_SUSPICIOUS_SEC apart, with keys 'gap_sec',
        'gap_min', 'displacement_nm' and 'severity'
        ('SUSPICIOUS' / 'HIGH_SUSPICIOUS' / 'VIOLATION').
    """
    if df_vessel is None or len(df_vessel) < 2:
        return []

    records = df_vessel.sort_values('timestamp').to_dict('records')
    gaps = []
    for prev, curr in zip(records, records[1:]):
        elapsed = pd.Timestamp(curr['timestamp']) - pd.Timestamp(prev['timestamp'])
        gap_sec = elapsed.total_seconds()
        # Written as "not >=" so that a NaN duration (missing timestamp) is
        # skipped instead of reaching int(gap_sec), which would raise.
        if not gap_sec >= GAP_SUSPICIOUS_SEC:
            continue

        disp = haversine_nm(prev['lat'], prev['lon'], curr['lat'], curr['lon'])

        if gap_sec >= GAP_VIOLATION_SEC:
            severity = 'VIOLATION'
        elif gap_sec >= GAP_HIGH_SUSPICIOUS_SEC:
            severity = 'HIGH_SUSPICIOUS'
        else:
            severity = 'SUSPICIOUS'

        gaps.append({
            'gap_sec': int(gap_sec),
            'gap_min': round(gap_sec / 60, 1),
            'displacement_nm': round(disp, 2),
            'severity': severity,
        })

    return gaps


def is_dark_vessel(df_vessel: pd.DataFrame) -> tuple[bool, int]:
    """Decide whether a vessel counts as a dark vessel.

    Returns:
        (is_dark, max_gap_duration_min)
    """
    gaps = detect_ais_gaps(df_vessel)
    if not gaps:
        return False, 0

    max_gap_min = max(g['gap_min'] for g in gaps)
    # Threshold is derived from the constant rather than hard-coded.
    is_dark = max_gap_min >= (GAP_SUSPICIOUS_SEC / 60)
    return is_dark, int(max_gap_min)


def _classify_state(sog: Optional[float]) -> str:
    """Classify a simple activity state from speed over ground.

    Returns 'UNKNOWN' for None/NaN, 'STATIONARY' up to 1.0, 'FISHING' up to
    5.0 and 'SAILING' above that.
    """
    if _is_missing(sog):
        return 'UNKNOWN'
    if sog <= 1.0:
        return 'STATIONARY'
    if sog <= 5.0:
        return 'FISHING'
    return 'SAILING'


def _teleport_in_window(window_df: pd.DataFrame) -> bool:
    """Best-effort check for teleportation inside the pre-gap window.

    Any failure (including the import) is logged and reported as False so
    that gap analysis never fails because of this optional signal.
    """
    try:
        # Imported lazily to avoid a circular import with algorithms.spoofing.
        from algorithms.spoofing import detect_teleportation
        return bool(detect_teleportation(window_df))
    except Exception:
        logger.debug('pre-gap teleport detection failed', exc_info=True)
        return False


def analyze_dark_pattern(df_vessel: pd.DataFrame) -> dict:
    """Return the dark verdict plus details about the longest gap.

    Only the single longest gap is analysed. When the vessel is not dark
    (fewer than two records, or longest gap below GAP_SUSPICIOUS_SEC) every
    field keeps its default value.

    Returns:
        {
            'is_dark': bool,
            'gap_min': int,
            'gap_start_lat': Optional[float],
            'gap_start_lon': Optional[float],
            'gap_start_sog': float,
            'gap_start_state': str,
            'gap_end_lat': Optional[float],
            'gap_end_lon': Optional[float],
            'gap_distance_nm': float,
            'gap_resumed': bool,
            'pre_gap_turn_or_teleport': bool,
            'avg_sog_before': float,
        }
    """
    default = {
        'is_dark': False,
        'gap_min': 0,
        'gap_start_lat': None,
        'gap_start_lon': None,
        'gap_start_sog': 0.0,
        'gap_start_state': 'UNKNOWN',
        'gap_end_lat': None,
        'gap_end_lon': None,
        'gap_distance_nm': 0.0,
        'gap_resumed': False,
        'pre_gap_turn_or_teleport': False,
        'avg_sog_before': 0.0,
    }

    if df_vessel is None or len(df_vessel) < 2:
        return default

    df_sorted = df_vessel.sort_values('timestamp').reset_index(drop=True)
    records = df_sorted.to_dict('records')

    # Find the longest gap; max_gap_idx is the index of the record right
    # AFTER the gap. Strict ">" keeps the first one on ties.
    max_gap_sec = 0.0
    max_gap_idx = -1
    for idx, (before, after) in enumerate(zip(records, records[1:]), start=1):
        elapsed = pd.Timestamp(after['timestamp']) - pd.Timestamp(before['timestamp'])
        gap_sec = elapsed.total_seconds()
        if gap_sec > max_gap_sec:
            max_gap_sec = gap_sec
            max_gap_idx = idx

    if max_gap_idx < 1 or max_gap_sec < GAP_SUSPICIOUS_SEC:
        return default

    prev_row = records[max_gap_idx - 1]  # last point before the gap
    curr_row = records[max_gap_idx]      # first point after the gap

    # NaN cells (what pandas yields for missing values) are mapped to None so
    # the result honours the documented Optional[float] contract.
    gap_start_lat = _coord_or_none(prev_row.get('lat'))
    gap_start_lon = _coord_or_none(prev_row.get('lon'))
    gap_end_lat = _coord_or_none(curr_row.get('lat'))
    gap_end_lon = _coord_or_none(curr_row.get('lon'))

    # Speed right before the gap.
    gap_start_sog = _row_speed(prev_row)

    # Distance covered during the gap (0.0 when any endpoint is unknown).
    endpoints = (gap_start_lat, gap_start_lon, gap_end_lat, gap_end_lon)
    if all(v is not None for v in endpoints):
        gap_distance_nm = haversine_nm(*endpoints)
    else:
        gap_distance_nm = 0.0

    # The gap counts as "resumed" only when at least one more record follows
    # the first post-gap point; if that point is the final record, resumption
    # is treated as unconfirmed.
    # NOTE(review): a post-gap point exists in both cases - confirm that
    # reporting False for a trailing gap is what consumers expect.
    gap_resumed = max_gap_idx != len(records) - 1

    # Up to five points immediately before the gap. Never empty here because
    # max_gap_idx >= 1 is guaranteed by the early return above.
    start_idx = max(0, max_gap_idx - 5)
    window = records[start_idx:max_gap_idx]
    sogs = [_row_speed(r) for r in window]
    avg_sog_before = sum(sogs) / len(sogs)

    # Anomalous behaviour (teleportation) inside the pre-gap window.
    pre_gap_turn_or_teleport = False
    if len(window) >= 2:
        window_df = df_sorted.iloc[start_idx:max_gap_idx].copy()
        pre_gap_turn_or_teleport = _teleport_in_window(window_df)

    return {
        'is_dark': True,
        'gap_min': int(max_gap_sec / 60),
        'gap_start_lat': gap_start_lat,
        'gap_start_lon': gap_start_lon,
        'gap_start_sog': gap_start_sog,
        'gap_start_state': _classify_state(gap_start_sog),
        'gap_end_lat': gap_end_lat,
        'gap_end_lon': gap_end_lon,
        'gap_distance_nm': round(gap_distance_nm, 2),
        'gap_resumed': gap_resumed,
        'pre_gap_turn_or_teleport': pre_gap_turn_or_teleport,
        'avg_sog_before': round(avg_sog_before, 2),
    }


def _is_in_kr_coverage(lat: Optional[float], lon: Optional[float]) -> bool:
    """Return True when (lat, lon) lies inside the estimated coverage box.

    Unknown coordinates (None) are reported as not covered.
    """
    if lat is None or lon is None:
        return False
    return (_KR_COVERAGE_LAT[0] <= lat <= _KR_COVERAGE_LAT[1]
            and _KR_COVERAGE_LON[0] <= lon <= _KR_COVERAGE_LON[1])


def compute_dark_suspicion(
    gap_info: dict,
    mmsi: str,
    is_permitted: bool,
    history: dict,
    now_kst_hour: int,
    classify_zone_fn: Optional[Callable[[float, float], dict]] = None,
    ship_kind_code: str = '',
    nav_status: str = '',
    heading: Optional[float] = None,
    last_cog: Optional[float] = None,
) -> tuple[int, list[str], str]:
    """Compute a suspicion score for intentional AIS switch-off.

    Args:
        gap_info: Result of analyze_dark_pattern.
        mmsi: Vessel MMSI (not used in the scoring itself).
        is_permitted: Whether the vessel is a permitted fishing vessel.
        history: {'count_7d': int, 'count_24h': int}; None is treated as empty.
        now_kst_hour: Current hour in KST (0-23).
        classify_zone_fn: (lat, lon) -> dict; used to look up the zone of the
            gap start position via its 'zone' key.
        ship_kind_code: Ship kind code (000020 = fishing, 000023 = cargo, ...).
        nav_status: Navigation status text ("Under way using engine", ...).
        heading: Heading (0-360, from the signal-batch API).
        last_cog: Course over ground right before the gap (0-360).

    Returns:
        (score, patterns, tier) where score is clamped to 0-100, patterns
        lists the rules that fired, and tier is
        'CRITICAL' / 'HIGH' / 'WATCH' / 'NONE'.
    """
    if not gap_info.get('is_dark'):
        return 0, [], 'NONE'

    history = history or {}
    score = 0
    patterns: list[str] = []

    gap_start_sog = gap_info.get('gap_start_sog') or 0.0
    gap_start_state = gap_info.get('gap_start_state', 'UNKNOWN')
    gap_start_lat = gap_info.get('gap_start_lat')
    gap_start_lon = gap_info.get('gap_start_lon')
    gap_min = gap_info.get('gap_min') or 0

    # P1: switched off while moving.
    if gap_start_sog > 5.0:
        score += 25
        patterns.append('moving_at_off')
    elif gap_start_sog > 2.0:
        score += 15
        patterns.append('slow_moving_at_off')

    # P2: gap started inside a sensitive zone.
    if (classify_zone_fn is not None
            and gap_start_lat is not None
            and gap_start_lon is not None):
        try:
            zone_info = classify_zone_fn(gap_start_lat, gap_start_lon)
            zone = zone_info.get('zone', '')
            if zone in ('TERRITORIAL_SEA', 'CONTIGUOUS_ZONE'):
                score += 25
                patterns.append('sensitive_zone')
            elif zone.startswith('ZONE_'):
                score += 15
                patterns.append('special_zone')
        except Exception:
            # Zone lookup is best-effort: a failing callback must not break
            # scoring, but the failure should stay visible when debugging.
            logger.debug('zone classification failed', exc_info=True)

    # P3: repeat history (past 7 days / past 24 hours).
    h7 = int(history.get('count_7d', 0) or 0)
    h24 = int(history.get('count_24h', 0) or 0)
    if h7 >= 3:
        score += 30
        patterns.append('repeat_high')
    elif h7 >= 2:
        score += 15
        patterns.append('repeat_low')
    if h24 >= 1:
        score += 10
        patterns.append('recent_dark')

    # P4: abnormal distance covered during the gap.
    gap_distance_nm = gap_info.get('gap_distance_nm') or 0.0
    avg_sog_before = gap_info.get('avg_sog_before') or 0.0
    if gap_info.get('gap_resumed') and gap_min > 0:
        gap_hours = gap_min / 60.0
        # Expected travel = average speed * hours, with floors of 1.0 on the
        # speed and 0.5 on the result; more than double that is abnormal.
        expected = max(gap_hours * max(avg_sog_before, 1.0), 0.5)
        if gap_distance_nm > expected * 2.0:
            score += 20
            patterns.append('distance_anomaly')

    # P5: switched off during daytime fishing hours.
    if 6 <= now_kst_hour < 18 and gap_start_state == 'FISHING':
        score += 15
        patterns.append('daytime_fishing_off')

    # P6: anomalous behaviour right before the gap.
    if gap_info.get('pre_gap_turn_or_teleport'):
        score += 15
        patterns.append('teleport_before_gap')

    # P7: no permit.
    if not is_permitted:
        score += 10
        patterns.append('unpermitted')

    # P8: gap length.
    if gap_min >= 360:
        score += 15
        patterns.append('very_long_gap')
    elif gap_min >= 180:
        score += 10
        patterns.append('long_gap')

    # P9: ship-kind weighting (signal-batch API data).
    if ship_kind_code == '000020':
        # A fishing vessel going dark: possible illegal-fishing intent.
        score += 10
        patterns.append('fishing_vessel_dark')
    elif ship_kind_code == '000023':
        # Cargo ships often show natural gaps on ocean passages.
        score -= 10
        patterns.append('cargo_natural_gap')

    # P10: intent inferred from navigation status.
    if nav_status:
        status_lower = nav_status.lower()
        if 'under way' in status_lower and gap_start_sog > 3.0:
            # Sudden switch-off while under way: likely deliberate.
            score += 20
            patterns.append('underway_deliberate_off')
        elif 'anchor' in status_lower or 'moored' in status_lower:
            # A gap while anchored or moored is natural.
            score -= 15
            patterns.append('anchored_natural_gap')

    # P11: heading vs COG mismatch (deliberate change of direction).
    if heading is not None and last_cog is not None:
        diff = abs(heading - last_cog) % 360
        if diff > 180:
            diff = 360 - diff  # fold to the smaller angle, 0-180
        if diff > 60:
            score += 15
            patterns.append('heading_cog_mismatch')

    # Penalty: gap started outside Korean reception coverage, so a natural
    # gap is likely.
    # NOTE(review): an unknown gap-start position (None) also takes this
    # penalty because _is_in_kr_coverage returns False for it - confirm
    # that this is intended.
    if not _is_in_kr_coverage(gap_start_lat, gap_start_lon):
        score -= 50
        patterns.append('out_of_coverage')

    score = max(0, min(100, score))

    if score >= 70:
        tier = 'CRITICAL'
    elif score >= 50:
        tier = 'HIGH'
    elif score >= 30:
        tier = 'WATCH'
    else:
        tier = 'NONE'

    return score, patterns, tier