"""선단(Fleet) 패턴 탐지 — 공간+행동 기반. 단순 공간 근접이 아닌, 협조 운항 패턴(유사 속도/방향/역할)으로 선단을 판별. - PT 저인망: 2척, 3NM 이내, 유사 속도(2~5kn) + 유사 방향(20° 이내) - PS 선망: 3~5척, 2NM 이내, 모선(고속)+조명선(정지)+운반선(저속 대형) - FC 환적: 2척, 0.5NM 이내, 양쪽 저속(2kn 이하) """ import logging from typing import Optional import numpy as np import pandas as pd from algorithms.location import haversine_nm, dist_to_baseline logger = logging.getLogger(__name__) def _heading_diff(h1: float, h2: float) -> float: """두 방향 사이 최소 각도차 (0~180).""" d = abs(h1 - h2) % 360 return d if d <= 180 else 360 - d def detect_fleet_patterns( vessel_dfs: dict[str, pd.DataFrame], ) -> dict[int, list[dict]]: """행동 패턴 기반 선단 탐지. Returns: {fleet_id: [{mmsi, lat, lon, sog, cog, role, pattern}, ...]} """ # 각 선박의 최신 스냅샷 추출 snapshots: list[dict] = [] for mmsi, df in vessel_dfs.items(): if df is None or len(df) == 0: continue last = df.iloc[-1] snapshots.append({ 'mmsi': mmsi, 'lat': float(last['lat']), 'lon': float(last['lon']), 'sog': float(last.get('sog', 0)), 'cog': float(last.get('cog', 0)), }) if len(snapshots) < 2: return {} matched: set[str] = set() fleets: dict[int, list[dict]] = {} fleet_id = 0 # 1차: PT 저인망 쌍 탐지 (2척, 3NM, 유사 속도/방향) for i in range(len(snapshots)): if snapshots[i]['mmsi'] in matched: continue a = snapshots[i] for j in range(i + 1, len(snapshots)): if snapshots[j]['mmsi'] in matched: continue b = snapshots[j] dist = haversine_nm(a['lat'], a['lon'], b['lat'], b['lon']) if dist > 3.0: continue # 둘 다 조업 속도 (2~5kn) if not (2.0 <= a['sog'] <= 5.0 and 2.0 <= b['sog'] <= 5.0): continue # 유사 속도 (차이 1kn 미만) if abs(a['sog'] - b['sog']) >= 1.0: continue # 유사 방향 (20° 미만) if _heading_diff(a['cog'], b['cog']) >= 20.0: continue fleets[fleet_id] = [ {**a, 'role': 'LEADER', 'pattern': 'TRAWL_PAIR'}, {**b, 'role': 'MEMBER', 'pattern': 'TRAWL_PAIR'}, ] matched.add(a['mmsi']) matched.add(b['mmsi']) fleet_id += 1 break # 2차: FC 환적 쌍 탐지 (2척, 0.5NM, 양쪽 저속) for i in range(len(snapshots)): if snapshots[i]['mmsi'] in matched: continue a = snapshots[i] for j in range(i + 1, len(snapshots)): if snapshots[j]['mmsi'] in matched: continue b = snapshots[j] dist = haversine_nm(a['lat'], a['lon'], b['lat'], b['lon']) if dist > 0.5: continue if a['sog'] > 2.0 or b['sog'] > 2.0: continue fleets[fleet_id] = [ {**a, 'role': 'LEADER', 'pattern': 'TRANSSHIP'}, {**b, 'role': 'MEMBER', 'pattern': 'TRANSSHIP'}, ] matched.add(a['mmsi']) matched.add(b['mmsi']) fleet_id += 1 break # 3차: PS 선망 선단 탐지 (3~10척, 2NM 이내 클러스터) unmatched = [s for s in snapshots if s['mmsi'] not in matched] for anchor in unmatched: if anchor['mmsi'] in matched: continue nearby = [] for other in unmatched: if other['mmsi'] == anchor['mmsi'] or other['mmsi'] in matched: continue dist = haversine_nm(anchor['lat'], anchor['lon'], other['lat'], other['lon']) if dist <= 2.0: nearby.append(other) if len(nearby) < 2: # 본인 포함 3척 이상 continue # 역할 분류: 고속(모선), 정지(조명선), 나머지(멤버) members = [{**anchor, 'role': 'LEADER', 'pattern': 'PURSE_SEINE'}] matched.add(anchor['mmsi']) for n in nearby[:9]: # 최대 10척 if n['sog'] < 0.5: role = 'LIGHTING' else: role = 'MEMBER' members.append({**n, 'role': role, 'pattern': 'PURSE_SEINE'}) matched.add(n['mmsi']) fleets[fleet_id] = members fleet_id += 1 logger.info('fleet detection: %d fleets found (%d vessels matched)', len(fleets), len(matched)) return fleets def assign_fleet_roles( vessel_dfs: dict[str, pd.DataFrame], cluster_map: dict[str, int], ) -> dict[str, dict]: """선단 역할 할당 — 패턴 매칭 기반. cluster_map은 파이프라인에서 전달되지만, 여기서는 vessel_dfs로 직접 패턴 탐지. """ fleets = detect_fleet_patterns(vessel_dfs) results: dict[str, dict] = {} # 매칭된 선박 (fleet_id를 cluster_id로 사용) fleet_mmsis: set[str] = set() for fid, members in fleets.items(): for m in members: fleet_mmsis.add(m['mmsi']) results[m['mmsi']] = { 'cluster_id': fid, 'cluster_size': len(members), 'is_leader': m['role'] == 'LEADER', 'fleet_role': m['role'], } # 매칭 안 된 선박 → NOISE (cluster_id = -1) for mmsi in vessel_dfs: if mmsi not in fleet_mmsis: results[mmsi] = { 'cluster_id': -1, 'cluster_size': 0, 'is_leader': False, 'fleet_role': 'NOISE', } return results