kcg-monitoring/prediction/algorithms/track_similarity.py
htlee 812a78f636 feat: 어구 연관성 멀티모델 패턴 추적 시스템 (Phase 1 Core)
- gear_correlation.py: 적응형 EMA + freeze + shadow + 배치 최적화
- 5개 글로벌 모델 병렬 추적 (default/aggressive/conservative/proximity-heavy/visit-pattern)
- 어구 중심 점수 체계: 어구 비활성 시 FREEZE, 선박 shadow 추적
- 유형별 메트릭: 어구-선박(proximity+visit+activity), 선박-선박(DTW+SOG+COG)
- DB: correlation_param_models + raw_metrics(일별 파티션) + scores + system_config
- partition_manager: 일별 파티션 생성/정리 (system_config hot-reload)
- track_similarity: SOG상관 + COG동조 + 근접비 3개 메트릭 추가
- scheduler Step 4.7 통합, fleet_tracker MMSI 점수 이전
- chat/tools: query_gear_correlation 도구

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 10:36:43 +09:00

245 lines
6.9 KiB
Python

"""궤적 유사도 — DTW(Dynamic Time Warping) 기반."""
import math
_MAX_RESAMPLE_POINTS = 50
def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""두 좌표 간 거리 (미터)."""
R = 6371000
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _resample(track: list[tuple[float, float]], n: int) -> list[tuple[float, float]]:
"""궤적을 n 포인트로 균등 리샘플링 (선형 보간)."""
if len(track) == 0:
return []
if len(track) == 1:
return [track[0]] * n
if len(track) <= n:
return list(track)
# 누적 거리 계산
cumulative = [0.0]
for i in range(1, len(track)):
d = haversine_m(track[i - 1][0], track[i - 1][1], track[i][0], track[i][1])
cumulative.append(cumulative[-1] + d)
total_dist = cumulative[-1]
if total_dist == 0.0:
return [track[0]] * n
step = total_dist / (n - 1)
result: list[tuple[float, float]] = []
seg = 0
for k in range(n):
target = step * k
# 해당 target 거리에 해당하는 선분 찾기
while seg < len(cumulative) - 2 and cumulative[seg + 1] < target:
seg += 1
seg_len = cumulative[seg + 1] - cumulative[seg]
if seg_len == 0.0:
result.append(track[seg])
else:
t = (target - cumulative[seg]) / seg_len
lat = track[seg][0] + t * (track[seg + 1][0] - track[seg][0])
lon = track[seg][1] + t * (track[seg + 1][1] - track[seg][1])
result.append((lat, lon))
return result
def _dtw_distance(
track_a: list[tuple[float, float]],
track_b: list[tuple[float, float]],
) -> float:
"""두 궤적 간 DTW 거리 (미터 단위 평균 거리)."""
n, m = len(track_a), len(track_b)
if n == 0 or m == 0:
return float('inf')
INF = float('inf')
# 1D 롤링 DP (공간 최적화)
prev = [INF] * (m + 1)
prev[0] = 0.0
# 첫 행 초기화
row = [INF] * (m + 1)
row[0] = INF
dp_prev = [INF] * (m + 1)
dp_curr = [INF] * (m + 1)
dp_prev[0] = 0.0
for j in range(1, m + 1):
dp_prev[j] = INF
for i in range(1, n + 1):
dp_curr[0] = INF
for j in range(1, m + 1):
cost = haversine_m(track_a[i - 1][0], track_a[i - 1][1],
track_b[j - 1][0], track_b[j - 1][1])
min_prev = min(dp_curr[j - 1], dp_prev[j], dp_prev[j - 1])
dp_curr[j] = cost + min_prev
dp_prev, dp_curr = dp_curr, [INF] * (m + 1)
# dp_prev는 마지막으로 계산된 행
total = dp_prev[m]
if total == INF:
return INF
return total / (n + m)
def compute_track_similarity(
track_a: list[tuple[float, float]],
track_b: list[tuple[float, float]],
max_dist_m: float = 10000.0,
) -> float:
"""두 궤적의 DTW 거리 기반 유사도 (0~1).
track이 비어있으면 0.0 반환.
유사할수록 1.0에 가까움.
"""
if not track_a or not track_b:
return 0.0
a = _resample(track_a, _MAX_RESAMPLE_POINTS)
b = _resample(track_b, _MAX_RESAMPLE_POINTS)
avg_dist = _dtw_distance(a, b)
if avg_dist == float('inf') or max_dist_m <= 0.0:
return 0.0
similarity = 1.0 - (avg_dist / max_dist_m)
return max(0.0, min(1.0, similarity))
def match_gear_by_track(
gear_tracks: dict[str, list[tuple[float, float]]],
vessel_tracks: dict[str, list[tuple[float, float]]],
threshold: float = 0.6,
) -> list[dict]:
"""어구 궤적을 선단 선박 궤적과 비교하여 매칭.
Args:
gear_tracks: mmsi → [(lat, lon), ...] — 어구 궤적
vessel_tracks: mmsi → [(lat, lon), ...] — 선박 궤적
threshold: 유사도 하한 (이상이면 매칭)
Returns:
[{gear_mmsi, vessel_mmsi, similarity, match_method: 'TRACK_SIMILAR'}]
"""
results: list[dict] = []
for gear_mmsi, g_track in gear_tracks.items():
if not g_track:
continue
best_mmsi: str | None = None
best_sim = -1.0
for vessel_mmsi, v_track in vessel_tracks.items():
if not v_track:
continue
sim = compute_track_similarity(g_track, v_track)
if sim > best_sim:
best_sim = sim
best_mmsi = vessel_mmsi
if best_mmsi is not None and best_sim >= threshold:
results.append({
'gear_mmsi': gear_mmsi,
'vessel_mmsi': best_mmsi,
'similarity': best_sim,
'match_method': 'TRACK_SIMILAR',
})
return results
def compute_sog_correlation(
sog_a: list[float],
sog_b: list[float],
) -> float:
"""두 SOG 시계열의 피어슨 상관계수 (0~1 정규화).
시계열 길이가 다르면 짧은 쪽 기준으로 자름.
데이터 부족(< 3점)이면 0.0 반환.
"""
n = min(len(sog_a), len(sog_b))
if n < 3:
return 0.0
a = sog_a[:n]
b = sog_b[:n]
mean_a = sum(a) / n
mean_b = sum(b) / n
cov = sum((a[i] - mean_a) * (b[i] - mean_b) for i in range(n))
var_a = sum((x - mean_a) ** 2 for x in a)
var_b = sum((x - mean_b) ** 2 for x in b)
denom = (var_a * var_b) ** 0.5
if denom < 1e-12:
return 0.0
corr = cov / denom # -1 ~ 1
return max(0.0, (corr + 1.0) / 2.0) # 0 ~ 1 정규화
def compute_heading_coherence(
cog_a: list[float],
cog_b: list[float],
threshold_deg: float = 30.0,
) -> float:
"""두 COG 시계열의 방향 동조율 (0~1).
angular diff < threshold_deg 인 비율.
시계열 길이가 다르면 짧은 쪽 기준.
데이터 부족(< 3점)이면 0.0 반환.
"""
n = min(len(cog_a), len(cog_b))
if n < 3:
return 0.0
coherent = 0
for i in range(n):
diff = abs(cog_a[i] - cog_b[i])
if diff > 180.0:
diff = 360.0 - diff
if diff < threshold_deg:
coherent += 1
return coherent / n
def compute_proximity_ratio(
track_a: list[tuple[float, float]],
track_b: list[tuple[float, float]],
threshold_nm: float = 10.0,
) -> float:
"""두 궤적의 근접 지속비 (0~1).
시간 정렬된 포인트 쌍에서 haversine < threshold_nm 비율.
시계열 길이가 다르면 짧은 쪽 기준.
데이터 부족(< 2점)이면 0.0 반환.
"""
n = min(len(track_a), len(track_b))
if n < 2:
return 0.0
close = 0
threshold_m = threshold_nm * 1852.0
for i in range(n):
dist = haversine_m(track_a[i][0], track_a[i][1],
track_b[i][0], track_b[i][1])
if dist < threshold_m:
close += 1
return close / n