"""쌍끌이 트롤 공조 탐지 — DAR-03 G-06. 두 선박의 AIS 궤적을 분석하여 쌍끌이 조업 여부를 판정한다. 판정 기준 (DAR-03 / Kroodsma 2018): - 선박 간격 ≤ 500m (0.27 NM) - 속력 차이 ≤ 0.5 kn - 방향 차이 ≤ 10° - 조업 속력 2.0~4.0 kn - 지속 시간 ≥ 2시간 - 동시 AIS 차단 ≥ 30분 → P-01 추가 """ from __future__ import annotations import logging import math import pandas as pd try: from algorithms.location import haversine_nm except ImportError: def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float: # type: ignore[misc] """두 좌표 간 거리 (해리). fallback 구현.""" R = 3440.065 dlat = math.radians(lat2 - lat1) dlon = math.radians(lon2 - lon1) a = (math.sin(dlat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2) return 2 * R * math.asin(math.sqrt(a)) logger = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────── # 상수 # ────────────────────────────────────────────────────────────── PROXIMITY_NM = 0.27 # 500m ≈ 0.27 NM SOG_DELTA_MAX = 0.5 # kn COG_DELTA_MAX = 10.0 # degrees SOG_MIN = 2.0 # kn (조업 속력 하한) SOG_MAX = 4.0 # kn (조업 속력 상한) MIN_SYNC_CYCLES = 24 # 24 × 5min = 2시간 SIMULTANEOUS_GAP_MIN = 30 # 동시 AIS 차단 기준 (분) CYCLE_INTERVAL_MIN = 5 # 5분 리샘플 데이터 # scan_unregistered_pairs 전용 CELL_SIZE = 0.01 # ~1.1km 격자 CANDIDATE_PROXIMITY_FACTOR = 2.0 # 후보 탐색 반경: PROXIMITY_NM × 2 CANDIDATE_SOG_MIN = 1.5 # 후보 속력 하한 (완화) CANDIDATE_SOG_MAX = 5.0 # 후보 속력 상한 (완화) # ────────────────────────────────────────────────────────────── # 내부 헬퍼 # ────────────────────────────────────────────────────────────── def _cog_delta(cog_a: float, cog_b: float) -> float: """두 COG 값의 각도 차이 (0~180°).""" return abs((cog_a - cog_b + 180.0) % 360.0 - 180.0) def _find_gaps(df: pd.DataFrame, gap_threshold_min: float = 10.0) -> list[tuple[pd.Timestamp, pd.Timestamp]]: """DataFrame 행 간 gap_threshold_min 초과 gap 구간 목록 반환. Returns: list of (gap_start, gap_end) as pd.Timestamp pairs """ if len(df) < 2: return [] ts_series = pd.to_datetime(df['timestamp']).sort_values().reset_index(drop=True) gaps: list[tuple[pd.Timestamp, pd.Timestamp]] = [] for i in range(1, len(ts_series)): delta_min = (ts_series.iloc[i] - ts_series.iloc[i - 1]).total_seconds() / 60.0 if delta_min > gap_threshold_min: gaps.append((ts_series.iloc[i - 1], ts_series.iloc[i])) return gaps def _overlap_minutes( gaps_a: list[tuple[pd.Timestamp, pd.Timestamp]], gaps_b: list[tuple[pd.Timestamp, pd.Timestamp]], ) -> float: """두 gap 목록의 시간 구간 겹침 합계 (분).""" total = 0.0 for start_a, end_a in gaps_a: for start_b, end_b in gaps_b: overlap_start = max(start_a, start_b) overlap_end = min(end_a, end_b) if overlap_end > overlap_start: total += (overlap_end - overlap_start).total_seconds() / 60.0 return total def _max_sync_block(synced_series: 'pd.Series[bool]') -> int: """연속 True 블록의 최대 길이 반환.""" max_block = 0 current = 0 for val in synced_series: if val: current += 1 max_block = max(max_block, current) else: current = 0 return max_block def _cell_key(lat: float, lon: float) -> tuple[int, int]: return (round(lat / CELL_SIZE), round(lon / CELL_SIZE)) def _default_result(mmsi_b: str) -> dict: return { 'pair_detected': False, 'sync_duration_min': 0.0, 'max_sync_block_min': 0.0, 'mean_separation_nm': 0.0, 'sog_delta_mean': 0.0, 'cog_delta_mean': 0.0, 'simultaneous_gap_min': 0.0, 'g_codes': [], 'confidence': 0.0, 'pair_mmsi': mmsi_b, } # ────────────────────────────────────────────────────────────── # 공개 API # ────────────────────────────────────────────────────────────── def detect_pair_trawl( df_a: pd.DataFrame, df_b: pd.DataFrame, mmsi_a: str, mmsi_b: str, ) -> dict: """쌍끌이 트롤 공조 탐지 (DAR-03 G-06). Args: df_a: 선박 A의 AIS DataFrame. 필수 컬럼: timestamp, lat, lon, sog, cog df_b: 선박 B의 AIS DataFrame. 필수 컬럼: timestamp, lat, lon, sog, cog mmsi_a: 선박 A MMSI mmsi_b: 선박 B MMSI (결과의 pair_mmsi에 기록) Returns: { 'pair_detected': bool, 'sync_duration_min': float, 'max_sync_block_min': float, 'mean_separation_nm': float, 'sog_delta_mean': float, 'cog_delta_mean': float, 'simultaneous_gap_min': float, 'g_codes': list[str], 'confidence': float, 'pair_mmsi': str, } """ required_cols = {'timestamp', 'lat', 'lon', 'sog', 'cog'} if df_a.empty or df_b.empty: logger.debug('pair_trawl(%s, %s): empty DataFrame', mmsi_a, mmsi_b) return _default_result(mmsi_b) missing_a = required_cols - set(df_a.columns) missing_b = required_cols - set(df_b.columns) if missing_a or missing_b: logger.warning( 'pair_trawl(%s, %s): missing columns a=%s b=%s', mmsi_a, mmsi_b, missing_a, missing_b, ) return _default_result(mmsi_b) # ── Step 1: timestamp inner join ──────────────────────── a = df_a[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy() b = df_b[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy() a['timestamp'] = pd.to_datetime(a['timestamp']) b['timestamp'] = pd.to_datetime(b['timestamp']) merged = pd.merge( a.rename(columns={'lat': 'lat_a', 'lon': 'lon_a', 'sog': 'sog_a', 'cog': 'cog_a'}), b.rename(columns={'lat': 'lat_b', 'lon': 'lon_b', 'sog': 'sog_b', 'cog': 'cog_b'}), on='timestamp', how='inner', ).sort_values('timestamp').reset_index(drop=True) total_aligned = len(merged) if total_aligned < MIN_SYNC_CYCLES: logger.debug( 'pair_trawl(%s, %s): only %d aligned rows (need %d)', mmsi_a, mmsi_b, total_aligned, MIN_SYNC_CYCLES, ) return _default_result(mmsi_b) # ── Step 2: 행별 동기화 지표 계산 ─────────────────────── merged['distance_nm'] = merged.apply( lambda r: haversine_nm(r['lat_a'], r['lon_a'], r['lat_b'], r['lon_b']), axis=1, ) merged['sog_delta'] = (merged['sog_a'] - merged['sog_b']).abs() merged['cog_delta'] = merged.apply( lambda r: _cog_delta(r['cog_a'], r['cog_b']), axis=1, ) merged['both_in_range'] = ( merged['sog_a'].between(SOG_MIN, SOG_MAX) & merged['sog_b'].between(SOG_MIN, SOG_MAX) ) merged['synced'] = ( (merged['distance_nm'] <= PROXIMITY_NM) & (merged['sog_delta'] <= SOG_DELTA_MAX) & (merged['cog_delta'] <= COG_DELTA_MAX) & merged['both_in_range'] ) # ── Step 3: 연속 블록 탐지 ────────────────────────────── max_block_cycles = _max_sync_block(merged['synced']) if max_block_cycles < MIN_SYNC_CYCLES: logger.debug( 'pair_trawl(%s, %s): max sync block %d cycles < %d required', mmsi_a, mmsi_b, max_block_cycles, MIN_SYNC_CYCLES, ) return _default_result(mmsi_b) total_synced = int(merged['synced'].sum()) sync_duration_min = total_synced * CYCLE_INTERVAL_MIN max_sync_block_min = max_block_cycles * CYCLE_INTERVAL_MIN mean_separation_nm = float(merged.loc[merged['synced'], 'distance_nm'].mean()) sog_delta_mean = float(merged.loc[merged['synced'], 'sog_delta'].mean()) cog_delta_mean = float(merged.loc[merged['synced'], 'cog_delta'].mean()) # ── Step 4: 동시 AIS 차단 검출 ────────────────────────── gaps_a = _find_gaps(df_a, gap_threshold_min=10.0) gaps_b = _find_gaps(df_b, gap_threshold_min=10.0) simultaneous_gap_min = _overlap_minutes(gaps_a, gaps_b) g_codes: list[str] = [] if simultaneous_gap_min >= SIMULTANEOUS_GAP_MIN: g_codes.append('P-01') # ── Step 5: 신뢰도 산출 ───────────────────────────────── sync_ratio = min(1.0, total_synced / total_aligned) synced_distances = merged.loc[merged['synced'], 'distance_nm'] if len(synced_distances) > 1: std_distance = float(synced_distances.std()) else: std_distance = 0.0 separation_stability = 1.0 - min(1.0, std_distance / PROXIMITY_NM) sog_sync_quality = 1.0 - min(1.0, sog_delta_mean / SOG_DELTA_MAX) confidence = round( sync_ratio * 0.4 + separation_stability * 0.3 + sog_sync_quality * 0.3, 4, ) logger.info( 'pair_trawl(%s, %s): detected — sync=%.0fmin max_block=%.0fmin ' 'sep=%.3fnm confidence=%.3f g_codes=%s', mmsi_a, mmsi_b, sync_duration_min, max_sync_block_min, mean_separation_nm, confidence, g_codes, ) return { 'pair_detected': True, 'sync_duration_min': round(sync_duration_min, 1), 'max_sync_block_min': round(max_sync_block_min, 1), 'mean_separation_nm': round(mean_separation_nm, 4), 'sog_delta_mean': round(sog_delta_mean, 4), 'cog_delta_mean': round(cog_delta_mean, 4), 'simultaneous_gap_min': round(simultaneous_gap_min, 1), 'g_codes': g_codes, 'confidence': confidence, 'pair_mmsi': mmsi_b, } def scan_unregistered_pairs( vessel_dfs: dict[str, pd.DataFrame], registered_pairs: set[tuple[str, str]], ) -> list[tuple[str, str]]: """fleet_registry에 없는 TRAWL 선박 중 500m 이내 + 속력 동기화 조건을 만족하는 쌍 후보 반환. cell-key partitioning으로 O(n²) 회피. Args: vessel_dfs: mmsi → AIS DataFrame. 각 DataFrame은 timestamp, lat, lon, sog 컬럼 필요 registered_pairs: 이미 확인된 쌍 (fleet_tracker 제공). 정규화: (smaller, larger) Returns: list of (mmsi_a, mmsi_b) candidate pairs (정규화된 순서) """ if len(vessel_dfs) < 2: return [] CANDIDATE_PROXIMITY_NM = PROXIMITY_NM * CANDIDATE_PROXIMITY_FACTOR # ── 각 선박의 마지막 위치 추출 ────────────────────────── last_positions: dict[str, dict] = {} for mmsi, df in vessel_dfs.items(): if df.empty or 'lat' not in df.columns or 'lon' not in df.columns or 'sog' not in df.columns: continue try: last_row = df.sort_values('timestamp').iloc[-1] lat = float(last_row['lat']) lon = float(last_row['lon']) sog = float(last_row['sog']) except Exception: continue last_positions[mmsi] = {'lat': lat, 'lon': lon, 'sog': sog} if len(last_positions) < 2: return [] # ── cell-key 격자 구성 ─────────────────────────────────── cell_map: dict[tuple[int, int], list[str]] = {} for mmsi, pos in last_positions.items(): key = _cell_key(pos['lat'], pos['lon']) if key not in cell_map: cell_map[key] = [] cell_map[key].append(mmsi) # ── 인접 9셀 내 후보 쌍 탐색 ──────────────────────────── candidates: list[tuple[str, str]] = [] checked: set[tuple[str, str]] = set() for mmsi_a, pos_a in last_positions.items(): base_cell = _cell_key(pos_a['lat'], pos_a['lon']) # 인접 8셀 + 자기 셀 수집 neighbor_mmsis: list[str] = [] for dr in (-1, 0, 1): for dc in (-1, 0, 1): neighbor_cell = (base_cell[0] + dr, base_cell[1] + dc) neighbor_mmsis.extend(cell_map.get(neighbor_cell, [])) for mmsi_b in neighbor_mmsis: if mmsi_b == mmsi_a: continue # 정규화된 쌍 키 pair_key: tuple[str, str] = ( (mmsi_a, mmsi_b) if mmsi_a < mmsi_b else (mmsi_b, mmsi_a) ) # 중복 검사 if pair_key in checked: continue checked.add(pair_key) # 이미 등록된 쌍 건너뜀 if pair_key in registered_pairs: continue pos_b = last_positions.get(mmsi_b) if pos_b is None: continue # 속력 범위 필터 (완화 기준) sog_a = pos_a['sog'] sog_b = pos_b['sog'] if not (CANDIDATE_SOG_MIN <= sog_a <= CANDIDATE_SOG_MAX): continue if not (CANDIDATE_SOG_MIN <= sog_b <= CANDIDATE_SOG_MAX): continue # 거리 필터 dist_nm = haversine_nm(pos_a['lat'], pos_a['lon'], pos_b['lat'], pos_b['lon']) if dist_nm > CANDIDATE_PROXIMITY_NM: continue candidates.append(pair_key) logger.debug( 'scan_unregistered_pairs: %d vessels, %d candidates found', len(last_positions), len(candidates), ) return candidates