"""궤적 유사도 — DTW(Dynamic Time Warping) 기반.""" import math _MAX_RESAMPLE_POINTS = 50 def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """두 좌표 간 거리 (미터).""" R = 6371000 phi1, phi2 = math.radians(lat1), math.radians(lat2) dphi = math.radians(lat2 - lat1) dlam = math.radians(lon2 - lon1) a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2 return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) def _resample(track: list[tuple[float, float]], n: int) -> list[tuple[float, float]]: """궤적을 n 포인트로 균등 리샘플링 (선형 보간).""" if len(track) == 0: return [] if len(track) == 1: return [track[0]] * n if len(track) <= n: return list(track) # 누적 거리 계산 cumulative = [0.0] for i in range(1, len(track)): d = haversine_m(track[i - 1][0], track[i - 1][1], track[i][0], track[i][1]) cumulative.append(cumulative[-1] + d) total_dist = cumulative[-1] if total_dist == 0.0: return [track[0]] * n step = total_dist / (n - 1) result: list[tuple[float, float]] = [] seg = 0 for k in range(n): target = step * k # 해당 target 거리에 해당하는 선분 찾기 while seg < len(cumulative) - 2 and cumulative[seg + 1] < target: seg += 1 seg_len = cumulative[seg + 1] - cumulative[seg] if seg_len == 0.0: result.append(track[seg]) else: t = (target - cumulative[seg]) / seg_len lat = track[seg][0] + t * (track[seg + 1][0] - track[seg][0]) lon = track[seg][1] + t * (track[seg + 1][1] - track[seg][1]) result.append((lat, lon)) return result def _dtw_distance( track_a: list[tuple[float, float]], track_b: list[tuple[float, float]], ) -> float: """두 궤적 간 DTW 거리 (미터 단위 평균 거리).""" n, m = len(track_a), len(track_b) if n == 0 or m == 0: return float('inf') INF = float('inf') # 1D 롤링 DP (공간 최적화) prev = [INF] * (m + 1) prev[0] = 0.0 # 첫 행 초기화 row = [INF] * (m + 1) row[0] = INF dp_prev = [INF] * (m + 1) dp_curr = [INF] * (m + 1) dp_prev[0] = 0.0 for j in range(1, m + 1): dp_prev[j] = INF for i in range(1, n + 1): dp_curr[0] = INF for j in range(1, m + 1): cost = haversine_m(track_a[i - 1][0], track_a[i - 1][1], track_b[j - 1][0], track_b[j - 1][1]) min_prev = min(dp_curr[j - 1], dp_prev[j], dp_prev[j - 1]) dp_curr[j] = cost + min_prev dp_prev, dp_curr = dp_curr, [INF] * (m + 1) # dp_prev는 마지막으로 계산된 행 total = dp_prev[m] if total == INF: return INF return total / (n + m) def compute_track_similarity( track_a: list[tuple[float, float]], track_b: list[tuple[float, float]], max_dist_m: float = 10000.0, ) -> float: """두 궤적의 DTW 거리 기반 유사도 (0~1). track이 비어있으면 0.0 반환. 유사할수록 1.0에 가까움. """ if not track_a or not track_b: return 0.0 a = _resample(track_a, _MAX_RESAMPLE_POINTS) b = _resample(track_b, _MAX_RESAMPLE_POINTS) avg_dist = _dtw_distance(a, b) if avg_dist == float('inf') or max_dist_m <= 0.0: return 0.0 similarity = 1.0 - (avg_dist / max_dist_m) return max(0.0, min(1.0, similarity)) def match_gear_by_track( gear_tracks: dict[str, list[tuple[float, float]]], vessel_tracks: dict[str, list[tuple[float, float]]], threshold: float = 0.6, ) -> list[dict]: """어구 궤적을 선단 선박 궤적과 비교하여 매칭. Args: gear_tracks: mmsi → [(lat, lon), ...] — 어구 궤적 vessel_tracks: mmsi → [(lat, lon), ...] — 선박 궤적 threshold: 유사도 하한 (이상이면 매칭) Returns: [{gear_mmsi, vessel_mmsi, similarity, match_method: 'TRACK_SIMILAR'}] """ results: list[dict] = [] for gear_mmsi, g_track in gear_tracks.items(): if not g_track: continue best_mmsi: str | None = None best_sim = -1.0 for vessel_mmsi, v_track in vessel_tracks.items(): if not v_track: continue sim = compute_track_similarity(g_track, v_track) if sim > best_sim: best_sim = sim best_mmsi = vessel_mmsi if best_mmsi is not None and best_sim >= threshold: results.append({ 'gear_mmsi': gear_mmsi, 'vessel_mmsi': best_mmsi, 'similarity': best_sim, 'match_method': 'TRACK_SIMILAR', }) return results