"""궤적 유사도 — 시간 정렬 쌍 비교 + DTW(레거시) 지원.""" import math from typing import Optional _MAX_RESAMPLE_POINTS = 50 _TEMPORAL_INTERVAL_MS = 300_000 # 5분 _MAX_GAP_MS = 14_400_000 # 4시간 — 보간 상한 (어구 간헐 수신 허용) _DECAY_DIST_M = 3000.0 # 지수 감쇠 기준거리 (3km) _COG_PENALTY_THRESHOLD_DEG = 45.0 # COG 차이 페널티 임계 _COG_PENALTY_FACTOR = 1.5 # COG 페널티 배수 def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """두 좌표 간 거리 (미터).""" R = 6371000 phi1, phi2 = math.radians(lat1), math.radians(lat2) dphi = math.radians(lat2 - lat1) dlam = math.radians(lon2 - lon1) a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2 return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) def _resample(track: list[tuple[float, float]], n: int) -> list[tuple[float, float]]: """궤적을 n 포인트로 균등 리샘플링 (선형 보간).""" if len(track) == 0: return [] if len(track) == 1: return [track[0]] * n if len(track) <= n: return list(track) # 누적 거리 계산 cumulative = [0.0] for i in range(1, len(track)): d = haversine_m(track[i - 1][0], track[i - 1][1], track[i][0], track[i][1]) cumulative.append(cumulative[-1] + d) total_dist = cumulative[-1] if total_dist == 0.0: return [track[0]] * n step = total_dist / (n - 1) result: list[tuple[float, float]] = [] seg = 0 for k in range(n): target = step * k # 해당 target 거리에 해당하는 선분 찾기 while seg < len(cumulative) - 2 and cumulative[seg + 1] < target: seg += 1 seg_len = cumulative[seg + 1] - cumulative[seg] if seg_len == 0.0: result.append(track[seg]) else: t = (target - cumulative[seg]) / seg_len lat = track[seg][0] + t * (track[seg + 1][0] - track[seg][0]) lon = track[seg][1] + t * (track[seg + 1][1] - track[seg][1]) result.append((lat, lon)) return result def _dtw_distance( track_a: list[tuple[float, float]], track_b: list[tuple[float, float]], ) -> float: """두 궤적 간 DTW 거리 (미터 단위 평균 거리).""" n, m = len(track_a), len(track_b) if n == 0 or m == 0: return float('inf') INF = float('inf') # 1D 롤링 DP (공간 최적화) prev = [INF] * (m + 1) prev[0] = 0.0 # 첫 행 초기화 row = [INF] * (m + 1) row[0] = INF dp_prev = [INF] * (m + 1) dp_curr = [INF] * (m + 1) dp_prev[0] = 0.0 for j in range(1, m + 1): dp_prev[j] = INF for i in range(1, n + 1): dp_curr[0] = INF for j in range(1, m + 1): cost = haversine_m(track_a[i - 1][0], track_a[i - 1][1], track_b[j - 1][0], track_b[j - 1][1]) min_prev = min(dp_curr[j - 1], dp_prev[j], dp_prev[j - 1]) dp_curr[j] = cost + min_prev dp_prev, dp_curr = dp_curr, [INF] * (m + 1) # dp_prev는 마지막으로 계산된 행 total = dp_prev[m] if total == INF: return INF return total / (n + m) # ── 시간 정렬 리샘플 (v2) ───────────────────────────────────── def _resample_temporal( track: list[dict], interval_ms: int = _TEMPORAL_INTERVAL_MS, max_gap_ms: int = _MAX_GAP_MS, ) -> list[Optional[dict]]: """타임스탬프 기반 등간격 리샘플. 갭 > max_gap_ms인 슬롯은 None. 입력: [{lat, lon, ts(epoch_ms), cog?}, ...] (ts 정렬 필수 아님) 반환: [dict | None, ...] 5분 간격 슬롯. None = 보간 불가 구간. """ if not track: return [] sorted_pts = sorted(track, key=lambda p: p['ts']) if len(sorted_pts) < 2: return [sorted_pts[0]] t_start = sorted_pts[0]['ts'] t_end = sorted_pts[-1]['ts'] if t_end <= t_start: return [sorted_pts[0]] slots: list[Optional[dict]] = [] seg_idx = 0 # 절대 시간 경계로 정렬 (epoch 기준 interval_ms 배수) t = (t_start // interval_ms) * interval_ms while t <= t_end: # seg_idx를 t가 속하는 구간까지 전진 while seg_idx < len(sorted_pts) - 2 and sorted_pts[seg_idx + 1]['ts'] < t: seg_idx += 1 p0 = sorted_pts[seg_idx] p1 = sorted_pts[min(seg_idx + 1, len(sorted_pts) - 1)] gap = p1['ts'] - p0['ts'] if gap > max_gap_ms or gap <= 0: # 갭이 너무 크거나 동일 시점 → 보간 불가 if abs(t - p0['ts']) < interval_ms: slots.append(p0) else: slots.append(None) else: ratio = (t - p0['ts']) / gap ratio = max(0.0, min(1.0, ratio)) lat = p0['lat'] + ratio * (p1['lat'] - p0['lat']) lon = p0['lon'] + ratio * (p1['lon'] - p0['lon']) cog0 = p0.get('cog') cog1 = p1.get('cog') cog = None if cog0 is not None and cog1 is not None: # 원형 보간 diff = (cog1 - cog0 + 540) % 360 - 180 cog = (cog0 + ratio * diff) % 360 slots.append({'lat': lat, 'lon': lon, 'ts': t, 'cog': cog}) t += interval_ms return slots def _angular_diff(a: float, b: float) -> float: """두 각도의 최소 차이 (0~180).""" diff = abs(a - b) % 360 return min(diff, 360 - diff) def compute_track_similarity_v2( track_a: list[dict], track_b: list[dict], interval_ms: int = _TEMPORAL_INTERVAL_MS, max_gap_ms: int = _MAX_GAP_MS, ) -> float: """시간 정렬 기반 궤적 유사도 (0~1). 입력: [{lat, lon, ts(epoch_ms), cog?}, ...] - 5분 간격으로 양쪽 리샘플 - 동일 시각 슬롯만 쌍으로 비교 - 거리: haversine + COG 페널티 - 점수: exp(-avg_dist / 3000) """ if not track_a or not track_b: return 0.0 slots_a = _resample_temporal(track_a, interval_ms, max_gap_ms) slots_b = _resample_temporal(track_b, interval_ms, max_gap_ms) # 시간 범위 정렬: 공통 구간만 비교 if not slots_a or not slots_b: return 0.0 first_a = next((s for s in slots_a if s is not None), None) first_b = next((s for s in slots_b if s is not None), None) if first_a is None or first_b is None: return 0.0 # 양쪽의 시작/끝 시간 t_start_a = first_a['ts'] t_start_b = first_b['ts'] t_start = max(t_start_a, t_start_b) last_a = next((s for s in reversed(slots_a) if s is not None), None) last_b = next((s for s in reversed(slots_b) if s is not None), None) if last_a is None or last_b is None: return 0.0 t_end = min(last_a['ts'], last_b['ts']) if t_end <= t_start: return 0.0 # 인덱스 매핑 (각 슬롯의 ts → 슬롯) map_a: dict[int, dict] = {} for s in slots_a: if s is not None: map_a[s['ts']] = s map_b: dict[int, dict] = {} for s in slots_b: if s is not None: map_b[s['ts']] = s total_dist = 0.0 count = 0 t = t_start while t <= t_end: # 가장 가까운 슬롯 찾기 (interval 반경 내) sa = map_a.get(t) sb = map_b.get(t) if sa is not None and sb is not None: dist = haversine_m(sa['lat'], sa['lon'], sb['lat'], sb['lon']) # COG 페널티 if sa.get('cog') is not None and sb.get('cog') is not None: cog_diff = _angular_diff(sa['cog'], sb['cog']) if cog_diff > _COG_PENALTY_THRESHOLD_DEG: dist *= _COG_PENALTY_FACTOR total_dist += dist count += 1 t += interval_ms if count < 3: return 0.0 avg_dist = total_dist / count return math.exp(-avg_dist / _DECAY_DIST_M) def compute_track_similarity( track_a: list[tuple[float, float]], track_b: list[tuple[float, float]], max_dist_m: float = 10000.0, ) -> float: """두 궤적의 DTW 거리 기반 유사도 (0~1). track이 비어있으면 0.0 반환. 유사할수록 1.0에 가까움. """ if not track_a or not track_b: return 0.0 a = _resample(track_a, _MAX_RESAMPLE_POINTS) b = _resample(track_b, _MAX_RESAMPLE_POINTS) avg_dist = _dtw_distance(a, b) if avg_dist == float('inf') or max_dist_m <= 0.0: return 0.0 similarity = 1.0 - (avg_dist / max_dist_m) return max(0.0, min(1.0, similarity)) def match_gear_by_track( gear_tracks: dict[str, list[tuple[float, float]]], vessel_tracks: dict[str, list[tuple[float, float]]], threshold: float = 0.6, ) -> list[dict]: """어구 궤적을 선단 선박 궤적과 비교하여 매칭. Args: gear_tracks: mmsi → [(lat, lon), ...] — 어구 궤적 vessel_tracks: mmsi → [(lat, lon), ...] — 선박 궤적 threshold: 유사도 하한 (이상이면 매칭) Returns: [{gear_mmsi, vessel_mmsi, similarity, match_method: 'TRACK_SIMILAR'}] """ results: list[dict] = [] for gear_mmsi, g_track in gear_tracks.items(): if not g_track: continue best_mmsi: str | None = None best_sim = -1.0 for vessel_mmsi, v_track in vessel_tracks.items(): if not v_track: continue sim = compute_track_similarity(g_track, v_track) if sim > best_sim: best_sim = sim best_mmsi = vessel_mmsi if best_mmsi is not None and best_sim >= threshold: results.append({ 'gear_mmsi': gear_mmsi, 'vessel_mmsi': best_mmsi, 'similarity': best_sim, 'match_method': 'TRACK_SIMILAR', }) return results def compute_sog_correlation( sog_a: list[float], sog_b: list[float], ) -> float: """두 SOG 시계열의 피어슨 상관계수 (0~1 정규화). 시계열 길이가 다르면 짧은 쪽 기준으로 자름. 데이터 부족(< 3점)이면 0.0 반환. """ n = min(len(sog_a), len(sog_b)) if n < 3: return 0.0 a = sog_a[:n] b = sog_b[:n] mean_a = sum(a) / n mean_b = sum(b) / n cov = sum((a[i] - mean_a) * (b[i] - mean_b) for i in range(n)) var_a = sum((x - mean_a) ** 2 for x in a) var_b = sum((x - mean_b) ** 2 for x in b) denom = (var_a * var_b) ** 0.5 if denom < 1e-12: return 0.0 corr = cov / denom # -1 ~ 1 return max(0.0, (corr + 1.0) / 2.0) # 0 ~ 1 정규화 def compute_heading_coherence( cog_a: list[float], cog_b: list[float], threshold_deg: float = 30.0, ) -> float: """두 COG 시계열의 방향 동조율 (0~1). angular diff < threshold_deg 인 비율. 시계열 길이가 다르면 짧은 쪽 기준. 데이터 부족(< 3점)이면 0.0 반환. """ n = min(len(cog_a), len(cog_b)) if n < 3: return 0.0 coherent = 0 for i in range(n): diff = abs(cog_a[i] - cog_b[i]) if diff > 180.0: diff = 360.0 - diff if diff < threshold_deg: coherent += 1 return coherent / n def compute_proximity_ratio( track_a: list[tuple[float, float]], track_b: list[tuple[float, float]], threshold_nm: float = 10.0, ) -> float: """두 궤적의 근접 지속비 (0~1). 시간 정렬된 포인트 쌍에서 haversine < threshold_nm 비율. 시계열 길이가 다르면 짧은 쪽 기준. 데이터 부족(< 2점)이면 0.0 반환. """ n = min(len(track_a), len(track_b)) if n < 2: return 0.0 close = 0 threshold_m = threshold_nm * 1852.0 for i in range(n): dist = haversine_m(track_a[i][0], track_a[i][1], track_b[i][0], track_b[i][1]) if dist < threshold_m: close += 1 return close / n