"""쌍끌이 트롤 공조 탐지 — DAR-03 G-06. 두 선박의 AIS 궤적을 분석하여 쌍끌이 조업 여부를 판정한다. 판정 기준 (DAR-03 / Kroodsma 2018): - 선박 간격 ≤ 500m (0.27 NM) - 속력 차이 ≤ 0.5 kn - 방향 차이 ≤ 10° - 조업 속력 2.0~4.0 kn - 지속 시간 ≥ 2시간 - 동시 AIS 차단 ≥ 30분 → P-01 추가 """ from __future__ import annotations import logging import math from typing import Optional import pandas as pd try: from algorithms.location import haversine_nm except ImportError: def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float: # type: ignore[misc] """두 좌표 간 거리 (해리). fallback 구현.""" R = 3440.065 dlat = math.radians(lat2 - lat1) dlon = math.radians(lon2 - lon1) a = (math.sin(dlat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2) return 2 * R * math.asin(math.sqrt(a)) logger = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────── # 상수 # ────────────────────────────────────────────────────────────── PROXIMITY_NM = 0.27 # 500m ≈ 0.27 NM (STRONG tier 스펙) SOG_DELTA_MAX = 0.5 # kn (STRONG) COG_DELTA_MAX = 10.0 # degrees (STRONG) SOG_MIN = 2.0 # kn (조업 속력 하한 — STRONG) SOG_MAX = 4.0 # kn (조업 속력 상한 — STRONG) MIN_SYNC_CYCLES = 24 # 24 × 5min = 2시간 (STRONG) SIMULTANEOUS_GAP_MIN = 30 # 동시 AIS 차단 기준 (분) CYCLE_INTERVAL_MIN = 5 # 5분 리샘플 데이터 # Tier별 완화 임계 — AND 게이트로 0건 만들지 않고 신호 강도에 맞춰 분류 # STRONG : 스펙 100% (2h 연속 + 전 조건) # PROBABLE : 1h 연속 또는 누적 2h + sync_ratio 0.6 # SUSPECT : 30분+ + sync_ratio 0.3 (약한 신호, 플래그만) PROBABLE_MIN_BLOCK_CYCLES = 12 # 1h PROBABLE_MIN_SYNC_RATIO = 0.6 SUSPECT_MIN_BLOCK_CYCLES = 6 # 30min SUSPECT_MIN_SYNC_RATIO = 0.3 # 완화 tier 기준: proximity 800m, SOG 1.5-5.0kn, sog_delta 1.0, cog 20° PROBABLE_PROXIMITY_NM = 0.43 # ≈ 800m PROBABLE_SOG_DELTA_MAX = 1.0 PROBABLE_COG_DELTA_MAX = 20.0 PROBABLE_SOG_MIN = 1.5 PROBABLE_SOG_MAX = 5.0 # scan_unregistered_pairs 전용 CELL_SIZE = 0.01 # ~1.1km 격자 CANDIDATE_PROXIMITY_FACTOR = 2.0 # 후보 탐색 반경: PROXIMITY_NM × 2 CANDIDATE_SOG_MIN = 1.5 # 후보 속력 하한 (완화) CANDIDATE_SOG_MAX = 5.0 # 후보 속력 상한 (완화) # 
# ──────────────────────────────────────────────────────────────
# Internal helpers
# ──────────────────────────────────────────────────────────────
def _cog_delta(cog_a: float, cog_b: float) -> float:
    """Angular difference between two COG values, normalized to 0–180°."""
    return abs((cog_a - cog_b + 180.0) % 360.0 - 180.0)


def _find_gaps(df: pd.DataFrame, gap_threshold_min: float = 10.0) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
    """Return the list of gaps longer than gap_threshold_min between consecutive rows.

    Args:
        df: AIS DataFrame; gaps are measured on its 'timestamp' column.
        gap_threshold_min: minimum gap length (minutes) to report.

    Returns:
        list of (gap_start, gap_end) as pd.Timestamp pairs.
    """
    # Guard: callers may pass frames keyed only by 'time_bucket' (no raw
    # 'timestamp' column) — previously this raised KeyError; treat as no gaps.
    if 'timestamp' not in df.columns or len(df) < 2:
        return []
    ts_series = pd.to_datetime(df['timestamp']).sort_values().reset_index(drop=True)
    gaps: list[tuple[pd.Timestamp, pd.Timestamp]] = []
    for i in range(1, len(ts_series)):
        delta_min = (ts_series.iloc[i] - ts_series.iloc[i - 1]).total_seconds() / 60.0
        if delta_min > gap_threshold_min:
            gaps.append((ts_series.iloc[i - 1], ts_series.iloc[i]))
    return gaps


def _overlap_minutes(
    gaps_a: list[tuple[pd.Timestamp, pd.Timestamp]],
    gaps_b: list[tuple[pd.Timestamp, pd.Timestamp]],
) -> float:
    """Total overlap (minutes) between two lists of time intervals.

    O(n·m) pairwise comparison — gap lists are short in practice.
    """
    total = 0.0
    for start_a, end_a in gaps_a:
        for start_b, end_b in gaps_b:
            overlap_start = max(start_a, start_b)
            overlap_end = min(end_a, end_b)
            if overlap_end > overlap_start:
                total += (overlap_end - overlap_start).total_seconds() / 60.0
    return total


def _max_sync_block(synced_series: 'pd.Series[bool]') -> int:
    """Length of the longest run of consecutive True values."""
    max_block = 0
    current = 0
    for val in synced_series:
        if val:
            current += 1
            max_block = max(max_block, current)
        else:
            current = 0
    return max_block


def _cell_key(lat: float, lon: float) -> tuple[int, int]:
    """Grid-cell key (CELL_SIZE-degree cells) for spatial partitioning."""
    return (round(lat / CELL_SIZE), round(lon / CELL_SIZE))


def _default_result(mmsi_b: str, reject_reason: str = '') -> dict:
    """Empty (not-detected) result skeleton shared by all reject paths."""
    return {
        'pair_detected': False,
        'tier': None,
        'reject_reason': reject_reason,
        'sync_duration_min': 0.0,
        'max_sync_block_min': 0.0,
        'mean_separation_nm': 0.0,
        'sog_delta_mean': 0.0,
        'cog_delta_mean': 0.0,
        'simultaneous_gap_min': 0.0,
        'g_codes': [],
        'confidence': 0.0,
        'pair_mmsi': mmsi_b,
    }
# Per-cycle reject-reason counters (the scheduler reads these, logs them,
# then calls reset_reject_counters()).
REJECT_COUNTERS: dict[str, int] = {
    'empty_df': 0,
    'missing_columns': 0,
    'insufficient_aligned': 0,
    'no_sync_at_any_tier': 0,
}


def reset_reject_counters() -> None:
    """Zero all reject counters (scheduler calls this after logging them)."""
    for k in REJECT_COUNTERS:
        REJECT_COUNTERS[k] = 0


# ──────────────────────────────────────────────────────────────
# Public API
# ──────────────────────────────────────────────────────────────
def detect_pair_trawl(
    df_a: pd.DataFrame,
    df_b: pd.DataFrame,
    mmsi_a: str,
    mmsi_b: str,
    role_a: str = 'UNKNOWN',
    role_b: str = 'UNKNOWN',
    similarity: float = 0.0,
) -> dict:
    """Pair-trawl coordination detection (DAR-03 G-06).

    Args:
        df_a: AIS DataFrame of vessel A. Required columns: timestamp, lat, lon, sog, cog
        df_b: AIS DataFrame of vessel B. Required columns: timestamp, lat, lon, sog, cog
        mmsi_a: MMSI of vessel A
        mmsi_b: MMSI of vessel B (recorded as pair_mmsi in the result)
        role_a/role_b: 'FISHING'|'CARRIER'|'PT_MAIN'|'PT_SUB'|'UNKNOWN'
        similarity: first-pass similarity computed by find_pair_candidates (0-1)

    Returns:
        The fields above plus role_a/role_b/similarity/bonus/pair_type
    """
    if df_a.empty or df_b.empty:
        REJECT_COUNTERS['empty_df'] += 1
        logger.debug('pair_trawl(%s, %s): empty DataFrame', mmsi_a, mmsi_b)
        return _default_result(mmsi_b, 'empty_df')

    # Compute sog/cog on demand for frames that only carry raw positions.
    df_a = _ensure_sog_cog(df_a)
    df_b = _ensure_sog_cog(df_b)

    join_key = _pair_join_key(df_a)
    required_cols = {join_key, 'lat', 'lon', 'sog', 'cog'}
    missing_a = required_cols - set(df_a.columns)
    missing_b = required_cols - set(df_b.columns)
    if missing_a or missing_b:
        REJECT_COUNTERS['missing_columns'] += 1
        logger.warning(
            'pair_trawl(%s, %s): missing columns a=%s b=%s',
            mmsi_a, mmsi_b, missing_a, missing_b,
        )
        return _default_result(mmsi_b, 'missing_columns')

    # ── Step 1: inner join on join_key (time_bucket preferred, fallback timestamp) ──
    a = df_a[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
    b = df_b[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
    # If a bucket holds multiple samples, average them.
    a = a.groupby(join_key, as_index=False).mean(numeric_only=True)
    b = b.groupby(join_key, as_index=False).mean(numeric_only=True)
    merged = pd.merge(
        a.rename(columns={'lat': 'lat_a', 'lon': 'lon_a', 'sog': 'sog_a', 'cog': 'cog_a'}),
        b.rename(columns={'lat': 'lat_b', 'lon': 'lon_b', 'sog': 'sog_b', 'cog': 'cog_b'}),
        on=join_key,
        how='inner',
    ).sort_values(join_key).reset_index(drop=True)

    total_aligned = len(merged)
    # Even the SUSPECT tier is impossible (30 min = 6 cycles).
    if total_aligned < SUSPECT_MIN_BLOCK_CYCLES:
        REJECT_COUNTERS['insufficient_aligned'] += 1
        logger.debug(
            'pair_trawl(%s, %s): only %d aligned rows (need %d for SUSPECT)',
            mmsi_a, mmsi_b, total_aligned, SUSPECT_MIN_BLOCK_CYCLES,
        )
        return _default_result(mmsi_b, 'insufficient_aligned')

    # ── Step 2: per-row synchronization metrics ───────────────
    merged['distance_nm'] = merged.apply(
        lambda r: haversine_nm(r['lat_a'], r['lon_a'], r['lat_b'], r['lon_b']),
        axis=1,
    )
    merged['sog_delta'] = (merged['sog_a'] - merged['sog_b']).abs()
    merged['cog_delta'] = merged.apply(
        lambda r: _cog_delta(r['cog_a'], r['cog_b']),
        axis=1,
    )

    # STRONG tier: 100% of spec — 500 m / SOG 2-4 / sog_delta 0.5 / cog 10° / 24 consecutive cycles
    merged['synced_strong'] = (
        (merged['distance_nm'] <= PROXIMITY_NM)
        & (merged['sog_delta'] <= SOG_DELTA_MAX)
        & (merged['cog_delta'] <= COG_DELTA_MAX)
        & merged['sog_a'].between(SOG_MIN, SOG_MAX)
        & merged['sog_b'].between(SOG_MIN, SOG_MAX)
    )
    # PROBABLE tier: relaxed — 800 m / SOG 1.5-5 / sog_delta 1.0 / cog 20° / 12 consecutive cycles
    merged['synced_probable'] = (
        (merged['distance_nm'] <= PROBABLE_PROXIMITY_NM)
        & (merged['sog_delta'] <= PROBABLE_SOG_DELTA_MAX)
        & (merged['cog_delta'] <= PROBABLE_COG_DELTA_MAX)
        & merged['sog_a'].between(PROBABLE_SOG_MIN, PROBABLE_SOG_MAX)
        & merged['sog_b'].between(PROBABLE_SOG_MIN, PROBABLE_SOG_MAX)
    )

    # ── Step 3: tier classification — adopt the highest tier ──
    strong_block = _max_sync_block(merged['synced_strong'])
    probable_block = _max_sync_block(merged['synced_probable'])
    strong_total = int(merged['synced_strong'].sum())
    probable_total = int(merged['synced_probable'].sum())
    # NOTE(review): strong_ratio is computed but not used in the tier logic below.
    strong_ratio = strong_total / total_aligned if total_aligned else 0.0
    probable_ratio = probable_total / total_aligned if total_aligned else 0.0

    tier: Optional[str] = None
    if strong_block >= MIN_SYNC_CYCLES:
        tier = 'STRONG'
        used_col = 'synced_strong'
        max_block_cycles = strong_block
    elif probable_block >= PROBABLE_MIN_BLOCK_CYCLES and probable_ratio >= PROBABLE_MIN_SYNC_RATIO:
        tier = 'PROBABLE'
        used_col = 'synced_probable'
        max_block_cycles = probable_block
    elif probable_block >= SUSPECT_MIN_BLOCK_CYCLES and probable_ratio >= SUSPECT_MIN_SYNC_RATIO:
        tier = 'SUSPECT'
        used_col = 'synced_probable'
        max_block_cycles = probable_block
    else:
        REJECT_COUNTERS['no_sync_at_any_tier'] += 1
        logger.debug(
            'pair_trawl(%s, %s): no tier — strong_block=%d probable_block=%d probable_ratio=%.2f',
            mmsi_a, mmsi_b, strong_block, probable_block, probable_ratio,
        )
        return _default_result(mmsi_b, 'no_sync_at_any_tier')

    merged['synced'] = merged[used_col]
    total_synced = int(merged['synced'].sum())
    sync_duration_min = total_synced * CYCLE_INTERVAL_MIN
    max_sync_block_min = max_block_cycles * CYCLE_INTERVAL_MIN
    # Tier gate guarantees >= 6 synced rows, so these means are never NaN.
    mean_separation_nm = float(merged.loc[merged['synced'], 'distance_nm'].mean())
    sog_delta_mean = float(merged.loc[merged['synced'], 'sog_delta'].mean())
    cog_delta_mean = float(merged.loc[merged['synced'], 'cog_delta'].mean())

    # ── Step 4: simultaneous AIS blackout detection ───────────
    gaps_a = _find_gaps(df_a, gap_threshold_min=10.0)
    gaps_b = _find_gaps(df_b, gap_threshold_min=10.0)
    simultaneous_gap_min = _overlap_minutes(gaps_a, gaps_b)
    g_codes: list[str] = []
    if simultaneous_gap_min >= SIMULTANEOUS_GAP_MIN:
        g_codes.append('P-01')

    # ── Step 5: confidence score ──────────────────────────────
    sync_ratio = min(1.0, total_synced / total_aligned)
    synced_distances = merged.loc[merged['synced'], 'distance_nm']
    if len(synced_distances) > 1:
        std_distance = float(synced_distances.std())
    else:
        std_distance = 0.0
    # Stability/quality are normalized against STRONG thresholds even for
    # relaxed tiers, so relaxed detections naturally score lower.
    separation_stability = 1.0 - min(1.0, std_distance / PROXIMITY_NM)
    sog_sync_quality = 1.0 - min(1.0, sog_delta_mean / SOG_DELTA_MAX)
    confidence = round(
        sync_ratio * 0.4 + separation_stability * 0.3 + sog_sync_quality * 0.3,
        4,
    )

    # ── Role-match bonus (DAR-03 ship-type consistency) ───────
    bonus = 0
    pair_type = 'GENERIC'
    if role_a == 'PT_MAIN' and role_b == 'PT_SUB':
        bonus = 15; pair_type = 'PT_REGISTERED'
    elif role_b == 'PT_MAIN' and role_a == 'PT_SUB':
        bonus = 15; pair_type = 'PT_REGISTERED'
    elif role_a == 'FISHING' and role_b == 'FISHING':
        bonus = 10; pair_type = 'COOP_FISHING'
    elif {role_a, role_b} == {'FISHING', 'CARRIER'}:
        bonus = 15; pair_type = 'TRANSSHIP_LIKE'

    logger.info(
        'pair_trawl(%s, %s): %s — sync=%.0fmin max_block=%.0fmin '
        'sep=%.3fnm confidence=%.3f bonus=%d type=%s g_codes=%s',
        mmsi_a, mmsi_b, tier, sync_duration_min, max_sync_block_min,
        mean_separation_nm, confidence, bonus, pair_type, g_codes,
    )

    return {
        'pair_detected': True,
        'tier': tier,
        'reject_reason': '',
        'sync_duration_min': round(sync_duration_min, 1),
        'max_sync_block_min': round(max_sync_block_min, 1),
        'mean_separation_nm': round(mean_separation_nm, 4),
        'sog_delta_mean': round(sog_delta_mean, 4),
        'cog_delta_mean': round(cog_delta_mean, 4),
        'simultaneous_gap_min': round(simultaneous_gap_min, 1),
        'g_codes': g_codes,
        'confidence': confidence,
        'pair_mmsi': mmsi_b,
        'role_a': role_a,
        'role_b': role_b,
        'similarity': round(similarity, 4),
        'bonus': bonus,
        'pair_type': pair_type,
    }
def _classify_role(
    mmsi: str,
    info: dict,
    pt_registered: set[str],
    pt_sub_registered: set[str],
) -> str:
    """Classify a pair-candidate vessel's role.

    Returns one of: PT_MAIN / PT_SUB / FISHING / CARRIER / UNKNOWN.
    Registry membership wins over ship-kind codes, which win over
    free-text type keywords.
    """
    # Registered pair-trawl fleets take precedence (sub checked first).
    if mmsi in pt_sub_registered:
        return 'PT_SUB'
    if mmsi in pt_registered:
        return 'PT_MAIN'

    kind_code = info.get('ship_kind_code', '') if info else ''
    type_text = (info.get('ship_ty') or info.get('vessel_type') or '') if info else ''

    if kind_code == '000020':
        return 'FISHING'
    if kind_code in ('000023', '000024'):
        return 'CARRIER'

    # Chinese 412* licensed vessels count as FISHING even without a ship_kind.
    if mmsi.startswith('412'):
        return 'FISHING'

    lowered = type_text.lower() if isinstance(type_text, str) else ''
    for marker in ('cargo', 'tanker', 'supply', 'carrier'):
        if marker in lowered:
            return 'CARRIER'
    return 'FISHING' if 'fishing' in lowered else 'UNKNOWN'


def _ensure_sog_cog(df: pd.DataFrame) -> pd.DataFrame:
    """Add sog/cog columns on demand when only raw positions are present.

    pair_trawl receives the full vessel_store._tracks pool (raw_sog only);
    relying on select_analysis_targets would drop non-Chinese vessels
    (e.g. Korean 440xxx) from the pool, so sog/cog are derived here from
    timestamp/lat/lon via haversine when missing. Returns df unchanged on
    any failure (best-effort).
    """
    if 'sog' in df.columns and 'cog' in df.columns:
        return df
    if any(col not in df.columns for col in ('timestamp', 'lat', 'lon')):
        return df
    try:
        # Reuse cache.vessel_store._compute_sog_cog.
        from cache.vessel_store import _compute_sog_cog
        return _compute_sog_cog(df)
    except Exception:
        return df


def _pair_join_key(df: pd.DataFrame) -> str:
    """Time key used when inner-joining two vessel trajectories.

    vessel_store._tracks carries raw AIS timestamps (millisecond units), so
    identical timestamps across two vessels are nearly impossible and an
    inner join on them is nearly empty. Prefer the 5-minute resampled
    'time_bucket' column whenever it exists.
    """
    if 'time_bucket' in df.columns:
        return 'time_bucket'
    return 'timestamp'
def _trajectory_similarity(
    df_a: pd.DataFrame,
    df_b: pd.DataFrame,
    min_samples: int = 6,
) -> tuple[float, int, dict]:
    """Trajectory similarity (0-1) of two vessels over common time keys.

    Returns:
        (similarity, common_samples, breakdown)
        breakdown = {location_score, sog_corr, cog_alignment}
    """
    if df_a.empty or df_b.empty:
        return 0.0, 0, {}
    df_a = _ensure_sog_cog(df_a)
    df_b = _ensure_sog_cog(df_b)
    join_key = _pair_join_key(df_a)
    cols = {join_key, 'lat', 'lon', 'sog', 'cog'}
    if cols - set(df_a.columns) or cols - set(df_b.columns):
        return 0.0, 0, {}
    try:
        a = df_a[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
        b = df_b[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
        # A join_key may carry multiple samples — bucketize by averaging.
        a = a.groupby(join_key, as_index=False).mean(numeric_only=True)
        b = b.groupby(join_key, as_index=False).mean(numeric_only=True)
        m = pd.merge(
            a.rename(columns={'lat': 'la', 'lon': 'oa', 'sog': 'sa', 'cog': 'ca'}),
            b.rename(columns={'lat': 'lb', 'lon': 'ob', 'sog': 'sb', 'cog': 'cb'}),
            on=join_key,
            how='inner',
        )
    except Exception:
        # Best-effort: any join failure degrades to "no similarity".
        return 0.0, 0, {}
    n = len(m)
    if n < min_samples:
        return 0.0, n, {}
    # Location: mean distance (≤500 m → 1.0, linear down to 0.0 at 1 km).
    dists = [
        haversine_nm(r.la, r.oa, r.lb, r.ob)
        for r in m.itertuples(index=False)
    ]
    mean_dist_nm = sum(dists) / len(dists)
    location_score = max(0.0, min(1.0, 1.0 - (mean_dist_nm - PROXIMITY_NM) / PROXIMITY_NM))
    # SOG correlation (Pearson). NOTE(review): abs() means anti-correlated
    # speeds also score high — confirm this is intended.
    try:
        sa = m['sa'].astype(float)
        sb = m['sb'].astype(float)
        if sa.std() > 0 and sb.std() > 0:
            sog_corr = abs(float(sa.corr(sb)))
        else:
            # Constant speed — fall back to mean absolute difference.
            sog_corr = 1.0 if (sa - sb).abs().mean() < 0.5 else 0.0
    except Exception:
        sog_corr = 0.0
    # COG alignment: mean angular difference → 10° = 1.0, 90° = 0.0 linear.
    cog_deltas = [
        _cog_delta(float(r.ca), float(r.cb))
        for r in m.itertuples(index=False)
    ]
    mean_cog_delta = sum(cog_deltas) / len(cog_deltas)
    cog_alignment = max(0.0, min(1.0, 1.0 - (mean_cog_delta - COG_DELTA_MAX) / 80.0))
    similarity = 0.4 * location_score + 0.3 * sog_corr + 0.3 * cog_alignment
    return round(similarity, 4), n, {
        'location_score': round(location_score, 3),
        'sog_corr': round(sog_corr, 3),
        'cog_alignment': round(cog_alignment, 3),
        'mean_distance_nm': round(mean_dist_nm, 4),
        'mean_cog_delta_deg': round(mean_cog_delta, 2),
    }
# First-pass bbox search radius (degrees)
BBOX_DEG = 0.01  # ~1.1 km — restrict to nearby candidates only
SIMILARITY_PAIR = 0.70  # strong pair
SIMILARITY_OBSERVE = 0.45  # observation pair (relaxed 0.50 → 0.45 for recall after pool expansion)


def find_pair_candidates(
    base_mmsis: set[str],
    vessel_dfs: dict[str, pd.DataFrame],
    get_vessel_info,
    pt_registered: Optional[set[str]] = None,
    pt_sub_registered: Optional[set[str]] = None,
    bbox_deg: float = BBOX_DEG,
    min_common_samples: int = 6,
    similarity_threshold: float = SIMILARITY_OBSERVE,
) -> list[dict]:
    """Return pair candidates per base vessel: bbox first pass → trajectory similarity second pass.

    Base condition: PT-registered vessels, fishing boats (000020), Chinese
    412* licensed vessels, etc. — at least one side must be fishing-like.
    Targets: all vessels in adjacent grid cells. A candidate qualifies when
    trajectory similarity >= similarity_threshold.

    Returns:
        [{'base_mmsi', 'target_mmsi', 'similarity', 'common_samples',
          'base_role', 'target_role', 'breakdown'}, ...]
    """
    pt_registered = pt_registered or set()
    pt_sub_registered = pt_sub_registered or set()

    # Latest position per vessel → grid build
    last_pos: dict[str, dict] = {}
    for mmsi, df in vessel_dfs.items():
        if df.empty or not all(c in df.columns for c in ('lat', 'lon')):
            continue
        try:
            row = df.iloc[-1]
            last_pos[mmsi] = {'lat': float(row['lat']), 'lon': float(row['lon'])}
        except Exception:
            continue

    def _cell(lat: float, lon: float) -> tuple[int, int]:
        # Local cell key using the caller-provided bbox_deg (int-truncated).
        return (int(lat / bbox_deg), int(lon / bbox_deg))

    grid: dict[tuple[int, int], list[str]] = {}
    for mmsi, p in last_pos.items():
        grid.setdefault(_cell(p['lat'], p['lon']), []).append(mmsi)

    checked: set[tuple[str, str]] = set()
    results: list[dict] = []
    for base in base_mmsis:
        if base not in last_pos:
            continue
        bp = last_pos[base]
        bc = _cell(bp['lat'], bp['lon'])
        # Adjacent 9 cells (own cell included)
        neighbors: list[str] = []
        for dr in (-1, 0, 1):
            for dc in (-1, 0, 1):
                neighbors.extend(grid.get((bc[0] + dr, bc[1] + dc), []))
        for target in neighbors:
            if target == base:
                continue
            # Normalized pair key so (a, b) and (b, a) dedupe together.
            key = (base, target) if base < target else (target, base)
            if key in checked:
                continue
            checked.add(key)
            sim, n_common, breakdown = _trajectory_similarity(
                vessel_dfs[base], vessel_dfs[target], min_common_samples,
            )
            if sim < similarity_threshold:
                continue
            info_a = get_vessel_info(base) if get_vessel_info else {}
            info_b = get_vessel_info(target) if get_vessel_info else {}
            role_a = _classify_role(base, info_a, pt_registered, pt_sub_registered)
            role_b = _classify_role(target, info_b, pt_registered, pt_sub_registered)
            # At least one of the two must be fishing-like
            fishing_like = {'FISHING', 'PT_MAIN', 'PT_SUB'}
            if role_a not in fishing_like and role_b not in fishing_like:
                continue
            results.append({
                'base_mmsi': base,
                'target_mmsi': target,
                'similarity': sim,
                'common_samples': n_common,
                'base_role': role_a,
                'target_role': role_b,
                'breakdown': breakdown,
                'is_strong': sim >= SIMILARITY_PAIR,
            })

    logger.info(
        'find_pair_candidates: base=%d, pool=%d → candidates=%d (strong=%d)',
        len(base_mmsis), len(last_pos), len(results),
        sum(1 for r in results if r['is_strong']),
    )
    return results
def scan_unregistered_pairs(
    vessel_dfs: dict[str, pd.DataFrame],
    registered_pairs: set[tuple[str, str]],
) -> list[tuple[str, str]]:
    """Return candidate pairs of TRAWL vessels absent from fleet_registry
    that satisfy the 500 m proximity + speed-sync conditions.

    Uses cell-key partitioning to avoid O(n²) pairwise comparison.

    Args:
        vessel_dfs: mmsi → AIS DataFrame. Each DataFrame needs timestamp,
            lat, lon, sog columns.
        registered_pairs: already-confirmed pairs (from fleet_tracker),
            normalized as (smaller, larger).

    Returns:
        list of (mmsi_a, mmsi_b) candidate pairs (normalized order)
    """
    if len(vessel_dfs) < 2:
        return []

    # Shadows the module-level factor constant intentionally: the search
    # radius is the STRONG proximity × relaxation factor (≈ 1 km).
    CANDIDATE_PROXIMITY_NM = PROXIMITY_NM * CANDIDATE_PROXIMITY_FACTOR

    # ── Extract each vessel's last known position ─────────────
    last_positions: dict[str, dict] = {}
    for mmsi, df in vessel_dfs.items():
        if df.empty or 'lat' not in df.columns or 'lon' not in df.columns or 'sog' not in df.columns:
            continue
        try:
            last_row = df.sort_values('timestamp').iloc[-1]
            lat = float(last_row['lat'])
            lon = float(last_row['lon'])
            sog = float(last_row['sog'])
        except Exception:
            continue
        last_positions[mmsi] = {'lat': lat, 'lon': lon, 'sog': sog}

    if len(last_positions) < 2:
        return []

    # ── Build the cell-key grid ───────────────────────────────
    cell_map: dict[tuple[int, int], list[str]] = {}
    for mmsi, pos in last_positions.items():
        key = _cell_key(pos['lat'], pos['lon'])
        if key not in cell_map:
            cell_map[key] = []
        cell_map[key].append(mmsi)

    # ── Search candidate pairs within the adjacent 9 cells ────
    candidates: list[tuple[str, str]] = []
    checked: set[tuple[str, str]] = set()

    for mmsi_a, pos_a in last_positions.items():
        base_cell = _cell_key(pos_a['lat'], pos_a['lon'])
        # Collect the 8 neighbors + own cell
        neighbor_mmsis: list[str] = []
        for dr in (-1, 0, 1):
            for dc in (-1, 0, 1):
                neighbor_cell = (base_cell[0] + dr, base_cell[1] + dc)
                neighbor_mmsis.extend(cell_map.get(neighbor_cell, []))

        for mmsi_b in neighbor_mmsis:
            if mmsi_b == mmsi_a:
                continue
            # Normalized pair key
            pair_key: tuple[str, str] = (
                (mmsi_a, mmsi_b) if mmsi_a < mmsi_b else (mmsi_b, mmsi_a)
            )
            # Deduplicate
            if pair_key in checked:
                continue
            checked.add(pair_key)
            # Skip already-registered pairs
            if pair_key in registered_pairs:
                continue
            pos_b = last_positions.get(mmsi_b)
            if pos_b is None:
                continue
            # Speed-range filter (relaxed thresholds)
            sog_a = pos_a['sog']
            sog_b = pos_b['sog']
            if not (CANDIDATE_SOG_MIN <= sog_a <= CANDIDATE_SOG_MAX):
                continue
            if not (CANDIDATE_SOG_MIN <= sog_b <= CANDIDATE_SOG_MAX):
                continue
            # Distance filter
            dist_nm = haversine_nm(pos_a['lat'], pos_a['lon'], pos_b['lat'], pos_b['lon'])
            if dist_nm > CANDIDATE_PROXIMITY_NM:
                continue
            candidates.append(pair_key)

    logger.debug(
        'scan_unregistered_pairs: %d vessels, %d candidates found',
        len(last_positions), len(candidates),
    )
    return candidates