- DB 007: fleet_companies, fleet_vessels, gear_identity_log, fleet_tracking_snapshot - 906척 선단 구성 데이터 적재 (497개 회사, 279쌍 PT) - FleetTracker: 등록 선단 ↔ AIS 매칭(NAME_EXACT) + 어구 정체성 추적 - track_similarity.py: DTW 기반 궤적 유사도 (TRACK_SIMILAR 플래그) - scheduler: fleet_tracker 통합 (기존 assign_fleet_roles 대체) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
161 lines
4.8 KiB
Python
161 lines
4.8 KiB
Python
"""궤적 유사도 — DTW(Dynamic Time Warping) 기반."""
|
|
import math
|
|
|
|
_MAX_RESAMPLE_POINTS = 50
|
|
|
|
|
|
def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
|
|
"""두 좌표 간 거리 (미터)."""
|
|
R = 6371000
|
|
phi1, phi2 = math.radians(lat1), math.radians(lat2)
|
|
dphi = math.radians(lat2 - lat1)
|
|
dlam = math.radians(lon2 - lon1)
|
|
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
|
|
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
|
|
|
|
|
def _resample(track: list[tuple[float, float]], n: int) -> list[tuple[float, float]]:
|
|
"""궤적을 n 포인트로 균등 리샘플링 (선형 보간)."""
|
|
if len(track) == 0:
|
|
return []
|
|
if len(track) == 1:
|
|
return [track[0]] * n
|
|
if len(track) <= n:
|
|
return list(track)
|
|
|
|
# 누적 거리 계산
|
|
cumulative = [0.0]
|
|
for i in range(1, len(track)):
|
|
d = haversine_m(track[i - 1][0], track[i - 1][1], track[i][0], track[i][1])
|
|
cumulative.append(cumulative[-1] + d)
|
|
|
|
total_dist = cumulative[-1]
|
|
if total_dist == 0.0:
|
|
return [track[0]] * n
|
|
|
|
step = total_dist / (n - 1)
|
|
result: list[tuple[float, float]] = []
|
|
|
|
seg = 0
|
|
for k in range(n):
|
|
target = step * k
|
|
# 해당 target 거리에 해당하는 선분 찾기
|
|
while seg < len(cumulative) - 2 and cumulative[seg + 1] < target:
|
|
seg += 1
|
|
seg_len = cumulative[seg + 1] - cumulative[seg]
|
|
if seg_len == 0.0:
|
|
result.append(track[seg])
|
|
else:
|
|
t = (target - cumulative[seg]) / seg_len
|
|
lat = track[seg][0] + t * (track[seg + 1][0] - track[seg][0])
|
|
lon = track[seg][1] + t * (track[seg + 1][1] - track[seg][1])
|
|
result.append((lat, lon))
|
|
|
|
return result
|
|
|
|
|
|
def _dtw_distance(
|
|
track_a: list[tuple[float, float]],
|
|
track_b: list[tuple[float, float]],
|
|
) -> float:
|
|
"""두 궤적 간 DTW 거리 (미터 단위 평균 거리)."""
|
|
n, m = len(track_a), len(track_b)
|
|
if n == 0 or m == 0:
|
|
return float('inf')
|
|
|
|
INF = float('inf')
|
|
# 1D 롤링 DP (공간 최적화)
|
|
prev = [INF] * (m + 1)
|
|
prev[0] = 0.0
|
|
# 첫 행 초기화
|
|
row = [INF] * (m + 1)
|
|
row[0] = INF
|
|
|
|
dp_prev = [INF] * (m + 1)
|
|
dp_curr = [INF] * (m + 1)
|
|
dp_prev[0] = 0.0
|
|
for j in range(1, m + 1):
|
|
dp_prev[j] = INF
|
|
|
|
for i in range(1, n + 1):
|
|
dp_curr[0] = INF
|
|
for j in range(1, m + 1):
|
|
cost = haversine_m(track_a[i - 1][0], track_a[i - 1][1],
|
|
track_b[j - 1][0], track_b[j - 1][1])
|
|
min_prev = min(dp_curr[j - 1], dp_prev[j], dp_prev[j - 1])
|
|
dp_curr[j] = cost + min_prev
|
|
dp_prev, dp_curr = dp_curr, [INF] * (m + 1)
|
|
|
|
# dp_prev는 마지막으로 계산된 행
|
|
total = dp_prev[m]
|
|
if total == INF:
|
|
return INF
|
|
return total / (n + m)
|
|
|
|
|
|
def compute_track_similarity(
|
|
track_a: list[tuple[float, float]],
|
|
track_b: list[tuple[float, float]],
|
|
max_dist_m: float = 10000.0,
|
|
) -> float:
|
|
"""두 궤적의 DTW 거리 기반 유사도 (0~1).
|
|
|
|
track이 비어있으면 0.0 반환.
|
|
유사할수록 1.0에 가까움.
|
|
"""
|
|
if not track_a or not track_b:
|
|
return 0.0
|
|
|
|
a = _resample(track_a, _MAX_RESAMPLE_POINTS)
|
|
b = _resample(track_b, _MAX_RESAMPLE_POINTS)
|
|
|
|
avg_dist = _dtw_distance(a, b)
|
|
if avg_dist == float('inf') or max_dist_m <= 0.0:
|
|
return 0.0
|
|
|
|
similarity = 1.0 - (avg_dist / max_dist_m)
|
|
return max(0.0, min(1.0, similarity))
|
|
|
|
|
|
def match_gear_by_track(
|
|
gear_tracks: dict[str, list[tuple[float, float]]],
|
|
vessel_tracks: dict[str, list[tuple[float, float]]],
|
|
threshold: float = 0.6,
|
|
) -> list[dict]:
|
|
"""어구 궤적을 선단 선박 궤적과 비교하여 매칭.
|
|
|
|
Args:
|
|
gear_tracks: mmsi → [(lat, lon), ...] — 어구 궤적
|
|
vessel_tracks: mmsi → [(lat, lon), ...] — 선박 궤적
|
|
threshold: 유사도 하한 (이상이면 매칭)
|
|
|
|
Returns:
|
|
[{gear_mmsi, vessel_mmsi, similarity, match_method: 'TRACK_SIMILAR'}]
|
|
"""
|
|
results: list[dict] = []
|
|
|
|
for gear_mmsi, g_track in gear_tracks.items():
|
|
if not g_track:
|
|
continue
|
|
|
|
best_mmsi: str | None = None
|
|
best_sim = -1.0
|
|
|
|
for vessel_mmsi, v_track in vessel_tracks.items():
|
|
if not v_track:
|
|
continue
|
|
sim = compute_track_similarity(g_track, v_track)
|
|
if sim > best_sim:
|
|
best_sim = sim
|
|
best_mmsi = vessel_mmsi
|
|
|
|
if best_mmsi is not None and best_sim >= threshold:
|
|
results.append({
|
|
'gear_mmsi': gear_mmsi,
|
|
'vessel_mmsi': best_mmsi,
|
|
'similarity': best_sim,
|
|
'match_method': 'TRACK_SIMILAR',
|
|
})
|
|
|
|
return results
|