""" 어구 정체성 충돌(GEAR_IDENTITY_COLLISION) 탐지 알고리즘. 동일 어구 이름이 서로 다른 MMSI 로 같은 5분 사이클 내 동시 AIS 송출되는 케이스를 스푸핑/복제 의심 패턴으로 탐지한다. fleet_tracker.track_gear_identity() 루프 진입 전에 사이클 단위로 사전 집계하는 데 사용된다. """ from datetime import datetime from itertools import combinations from typing import Optional from algorithms.location import haversine_nm # ────────────────────────────────────────────────────────────────── # 공존 판정 · 심각도 임계 # ────────────────────────────────────────────────────────────────── MIN_COEXISTENCE_GROUP = 2 # 같은 이름에 MMSI 2개 이상 IMPOSSIBLE_SPEED_KTS = 60.0 # 두 위치 이동에 필요한 속도가 이보다 크면 물리 불가능 CRITICAL_DISTANCE_KM = 50.0 # 단발이라도 이 거리 이상이면 즉시 CRITICAL HIGH_DISTANCE_KM = 10.0 # HIGH 기준 거리 CRITICAL_COEXISTENCE_COUNT = 3 # 누적 공존 N회 이상이면 CRITICAL 승격 HIGH_COEXISTENCE_COUNT = 2 # 누적 공존 N회 이상이면 HIGH NM_TO_KM = 1.852 # 1 nautical mile = 1.852 km def detect_gear_name_collisions( gear_signals: list[dict], now: datetime, ) -> list[dict]: """동일 이름 · 다중 MMSI 공존 세트 추출. Args: gear_signals: [{mmsi, name, lat, lon}, ...] — track_gear_identity 와 동일 입력. now: 사이클 기준 시각(UTC). Returns: 공존 쌍 리스트. 세 개 이상 동시 송출 케이스는 모든 2-조합을 생성한다. 각 원소: { 'name': str, 'mmsi_lo': str, # 사전순으로 작은 MMSI 'mmsi_hi': str, 'lat_lo', 'lon_lo': float, 'lat_hi', 'lon_hi': float, 'distance_km': float, 'parent_name': Optional[str], # 힌트 (GEAR_PATTERN parent 그룹, 있으면) 'observed_at': datetime, } """ if not gear_signals: return [] # 이름 기준 그룹핑 by_name: dict[str, list[dict]] = {} for sig in gear_signals: name = sig.get('name') mmsi = sig.get('mmsi') if not name or not mmsi: continue by_name.setdefault(name, []).append(sig) collisions: list[dict] = [] for name, signals in by_name.items(): if len(signals) < MIN_COEXISTENCE_GROUP: continue # 같은 MMSI 중복은 제거 (한 cycle 에 동일 MMSI 가 다수 신호로 들어올 수 있음) unique_by_mmsi: dict[str, dict] = {} for sig in signals: unique_by_mmsi.setdefault(sig['mmsi'], sig) if len(unique_by_mmsi) < MIN_COEXISTENCE_GROUP: continue parent_name = _infer_parent_name(name) mmsis = sorted(unique_by_mmsi.keys()) for a, b in combinations(mmsis, 2): sa, sb = unique_by_mmsi[a], unique_by_mmsi[b] dist_km = _haversine_km( sa.get('lat'), sa.get('lon'), sb.get('lat'), sb.get('lon'), ) collisions.append({ 'name': name, 'mmsi_lo': a, 'mmsi_hi': b, 'lat_lo': _to_float(sa.get('lat')), 'lon_lo': _to_float(sa.get('lon')), 'lat_hi': _to_float(sb.get('lat')), 'lon_hi': _to_float(sb.get('lon')), 'distance_km': dist_km, 'parent_name': parent_name, 'observed_at': now, }) return collisions def classify_severity( coexistence_count: int, max_distance_km: Optional[float], swap_count: int = 0, ) -> str: """충돌 심각도 산정. - CRITICAL: 거리 불가능 / 누적 공존 N회 이상 - HIGH: 상당 거리 / 2회 이상 - MEDIUM: 단발 근거리 - LOW: 근거리 + 거리 정보 없음 """ distance = max_distance_km or 0.0 if distance >= CRITICAL_DISTANCE_KM: return 'CRITICAL' if coexistence_count >= CRITICAL_COEXISTENCE_COUNT: return 'CRITICAL' if distance >= HIGH_DISTANCE_KM: return 'HIGH' if coexistence_count >= HIGH_COEXISTENCE_COUNT: return 'HIGH' if swap_count >= HIGH_COEXISTENCE_COUNT: return 'HIGH' if max_distance_km is None or max_distance_km < 0.1: return 'LOW' return 'MEDIUM' def _haversine_km(lat1, lon1, lat2, lon2) -> float: """두 좌표 사이 거리를 km 로 반환. 입력 누락 시 0.0.""" try: if lat1 is None or lon1 is None or lat2 is None or lon2 is None: return 0.0 nm = haversine_nm(float(lat1), float(lon1), float(lat2), float(lon2)) return round(nm * NM_TO_KM, 2) except (TypeError, ValueError): return 0.0 def _to_float(val) -> Optional[float]: if val is None: return None try: return float(val) except (TypeError, ValueError): return None def _infer_parent_name(gear_name: str) -> Optional[str]: """어구 이름에서 모선명 부분 추출 (느슨). fleet_tracker 가 이미 GEAR_PATTERN 으로 정교하게 파싱하지만, 알고리즘 모듈 독립성을 위해 단순 휴리스틱만 유지한다. 값이 필요한 경우 fleet_tracker 호출부에서 덮어쓴다. """ if not gear_name: return None # '_숫자' 로 끝나는 서픽스 제거 base = gear_name parts = base.rsplit('_', 2) if len(parts) >= 2 and any(ch.isdigit() for ch in parts[-1]): return parts[0] return None