iran prediction 47개 Python 파일을 prediction/ 디렉토리로 복제: - algorithms/ 14개 분석 알고리즘 (어구추론, 다크베셀, 스푸핑, 환적, 위험도 등) - pipeline/ 7단계 분류 파이프라인 - cache/vessel_store (24h 슬라이딩 윈도우) - db/ 어댑터 (snpdb 원본조회, kcgdb 결과저장) - chat/ AI 채팅 (Ollama, 후순위) - data/ 정적 데이터 (기선, 특정어업수역 GeoJSON) config.py를 kcgaidb로 재구성 (DB명, 사용자, 비밀번호) DB 연결 검증 완료 (kcgaidb 37개 테이블 접근 확인) Makefile에 dev-prediction / dev-all 타겟 추가 CLAUDE.md에 prediction 섹션 추가 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
395 lines
12 KiB
Python
395 lines
12 KiB
Python
"""궤적 유사도 — 시간 정렬 쌍 비교 + DTW(레거시) 지원."""
|
|
import math
|
|
from typing import Optional
|
|
|
|
_MAX_RESAMPLE_POINTS = 50
|
|
_TEMPORAL_INTERVAL_MS = 300_000 # 5분
|
|
_MAX_GAP_MS = 14_400_000 # 4시간 — 보간 상한 (어구 간헐 수신 허용)
|
|
_DECAY_DIST_M = 3000.0 # 지수 감쇠 기준거리 (3km)
|
|
_COG_PENALTY_THRESHOLD_DEG = 45.0 # COG 차이 페널티 임계
|
|
_COG_PENALTY_FACTOR = 1.5 # COG 페널티 배수
|
|
|
|
|
|
def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
|
|
"""두 좌표 간 거리 (미터)."""
|
|
R = 6371000
|
|
phi1, phi2 = math.radians(lat1), math.radians(lat2)
|
|
dphi = math.radians(lat2 - lat1)
|
|
dlam = math.radians(lon2 - lon1)
|
|
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
|
|
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
|
|
|
|
|
def _resample(track: list[tuple[float, float]], n: int) -> list[tuple[float, float]]:
|
|
"""궤적을 n 포인트로 균등 리샘플링 (선형 보간)."""
|
|
if len(track) == 0:
|
|
return []
|
|
if len(track) == 1:
|
|
return [track[0]] * n
|
|
if len(track) <= n:
|
|
return list(track)
|
|
|
|
# 누적 거리 계산
|
|
cumulative = [0.0]
|
|
for i in range(1, len(track)):
|
|
d = haversine_m(track[i - 1][0], track[i - 1][1], track[i][0], track[i][1])
|
|
cumulative.append(cumulative[-1] + d)
|
|
|
|
total_dist = cumulative[-1]
|
|
if total_dist == 0.0:
|
|
return [track[0]] * n
|
|
|
|
step = total_dist / (n - 1)
|
|
result: list[tuple[float, float]] = []
|
|
|
|
seg = 0
|
|
for k in range(n):
|
|
target = step * k
|
|
# 해당 target 거리에 해당하는 선분 찾기
|
|
while seg < len(cumulative) - 2 and cumulative[seg + 1] < target:
|
|
seg += 1
|
|
seg_len = cumulative[seg + 1] - cumulative[seg]
|
|
if seg_len == 0.0:
|
|
result.append(track[seg])
|
|
else:
|
|
t = (target - cumulative[seg]) / seg_len
|
|
lat = track[seg][0] + t * (track[seg + 1][0] - track[seg][0])
|
|
lon = track[seg][1] + t * (track[seg + 1][1] - track[seg][1])
|
|
result.append((lat, lon))
|
|
|
|
return result
|
|
|
|
|
|
def _dtw_distance(
|
|
track_a: list[tuple[float, float]],
|
|
track_b: list[tuple[float, float]],
|
|
) -> float:
|
|
"""두 궤적 간 DTW 거리 (미터 단위 평균 거리)."""
|
|
n, m = len(track_a), len(track_b)
|
|
if n == 0 or m == 0:
|
|
return float('inf')
|
|
|
|
INF = float('inf')
|
|
# 1D 롤링 DP (공간 최적화)
|
|
prev = [INF] * (m + 1)
|
|
prev[0] = 0.0
|
|
# 첫 행 초기화
|
|
row = [INF] * (m + 1)
|
|
row[0] = INF
|
|
|
|
dp_prev = [INF] * (m + 1)
|
|
dp_curr = [INF] * (m + 1)
|
|
dp_prev[0] = 0.0
|
|
for j in range(1, m + 1):
|
|
dp_prev[j] = INF
|
|
|
|
for i in range(1, n + 1):
|
|
dp_curr[0] = INF
|
|
for j in range(1, m + 1):
|
|
cost = haversine_m(track_a[i - 1][0], track_a[i - 1][1],
|
|
track_b[j - 1][0], track_b[j - 1][1])
|
|
min_prev = min(dp_curr[j - 1], dp_prev[j], dp_prev[j - 1])
|
|
dp_curr[j] = cost + min_prev
|
|
dp_prev, dp_curr = dp_curr, [INF] * (m + 1)
|
|
|
|
# dp_prev는 마지막으로 계산된 행
|
|
total = dp_prev[m]
|
|
if total == INF:
|
|
return INF
|
|
return total / (n + m)
|
|
|
|
|
|
# ── 시간 정렬 리샘플 (v2) ─────────────────────────────────────
|
|
|
|
def _resample_temporal(
|
|
track: list[dict],
|
|
interval_ms: int = _TEMPORAL_INTERVAL_MS,
|
|
max_gap_ms: int = _MAX_GAP_MS,
|
|
) -> list[Optional[dict]]:
|
|
"""타임스탬프 기반 등간격 리샘플. 갭 > max_gap_ms인 슬롯은 None.
|
|
|
|
입력: [{lat, lon, ts(epoch_ms), cog?}, ...] (ts 정렬 필수 아님)
|
|
반환: [dict | None, ...] 5분 간격 슬롯. None = 보간 불가 구간.
|
|
"""
|
|
if not track:
|
|
return []
|
|
sorted_pts = sorted(track, key=lambda p: p['ts'])
|
|
if len(sorted_pts) < 2:
|
|
return [sorted_pts[0]]
|
|
|
|
t_start = sorted_pts[0]['ts']
|
|
t_end = sorted_pts[-1]['ts']
|
|
if t_end <= t_start:
|
|
return [sorted_pts[0]]
|
|
|
|
slots: list[Optional[dict]] = []
|
|
seg_idx = 0
|
|
|
|
# 절대 시간 경계로 정렬 (epoch 기준 interval_ms 배수)
|
|
t = (t_start // interval_ms) * interval_ms
|
|
while t <= t_end:
|
|
# seg_idx를 t가 속하는 구간까지 전진
|
|
while seg_idx < len(sorted_pts) - 2 and sorted_pts[seg_idx + 1]['ts'] < t:
|
|
seg_idx += 1
|
|
|
|
p0 = sorted_pts[seg_idx]
|
|
p1 = sorted_pts[min(seg_idx + 1, len(sorted_pts) - 1)]
|
|
gap = p1['ts'] - p0['ts']
|
|
|
|
if gap > max_gap_ms or gap <= 0:
|
|
# 갭이 너무 크거나 동일 시점 → 보간 불가
|
|
if abs(t - p0['ts']) < interval_ms:
|
|
slots.append(p0)
|
|
else:
|
|
slots.append(None)
|
|
else:
|
|
ratio = (t - p0['ts']) / gap
|
|
ratio = max(0.0, min(1.0, ratio))
|
|
lat = p0['lat'] + ratio * (p1['lat'] - p0['lat'])
|
|
lon = p0['lon'] + ratio * (p1['lon'] - p0['lon'])
|
|
cog0 = p0.get('cog')
|
|
cog1 = p1.get('cog')
|
|
cog = None
|
|
if cog0 is not None and cog1 is not None:
|
|
# 원형 보간
|
|
diff = (cog1 - cog0 + 540) % 360 - 180
|
|
cog = (cog0 + ratio * diff) % 360
|
|
slots.append({'lat': lat, 'lon': lon, 'ts': t, 'cog': cog})
|
|
|
|
t += interval_ms
|
|
|
|
return slots
|
|
|
|
|
|
def _angular_diff(a: float, b: float) -> float:
|
|
"""두 각도의 최소 차이 (0~180)."""
|
|
diff = abs(a - b) % 360
|
|
return min(diff, 360 - diff)
|
|
|
|
|
|
def compute_track_similarity_v2(
|
|
track_a: list[dict],
|
|
track_b: list[dict],
|
|
interval_ms: int = _TEMPORAL_INTERVAL_MS,
|
|
max_gap_ms: int = _MAX_GAP_MS,
|
|
) -> float:
|
|
"""시간 정렬 기반 궤적 유사도 (0~1).
|
|
|
|
입력: [{lat, lon, ts(epoch_ms), cog?}, ...]
|
|
- 5분 간격으로 양쪽 리샘플
|
|
- 동일 시각 슬롯만 쌍으로 비교
|
|
- 거리: haversine + COG 페널티
|
|
- 점수: exp(-avg_dist / 3000)
|
|
"""
|
|
if not track_a or not track_b:
|
|
return 0.0
|
|
|
|
slots_a = _resample_temporal(track_a, interval_ms, max_gap_ms)
|
|
slots_b = _resample_temporal(track_b, interval_ms, max_gap_ms)
|
|
|
|
# 시간 범위 정렬: 공통 구간만 비교
|
|
if not slots_a or not slots_b:
|
|
return 0.0
|
|
first_a = next((s for s in slots_a if s is not None), None)
|
|
first_b = next((s for s in slots_b if s is not None), None)
|
|
if first_a is None or first_b is None:
|
|
return 0.0
|
|
|
|
# 양쪽의 시작/끝 시간
|
|
t_start_a = first_a['ts']
|
|
t_start_b = first_b['ts']
|
|
t_start = max(t_start_a, t_start_b)
|
|
last_a = next((s for s in reversed(slots_a) if s is not None), None)
|
|
last_b = next((s for s in reversed(slots_b) if s is not None), None)
|
|
if last_a is None or last_b is None:
|
|
return 0.0
|
|
t_end = min(last_a['ts'], last_b['ts'])
|
|
|
|
if t_end <= t_start:
|
|
return 0.0
|
|
|
|
# 인덱스 매핑 (각 슬롯의 ts → 슬롯)
|
|
map_a: dict[int, dict] = {}
|
|
for s in slots_a:
|
|
if s is not None:
|
|
map_a[s['ts']] = s
|
|
map_b: dict[int, dict] = {}
|
|
for s in slots_b:
|
|
if s is not None:
|
|
map_b[s['ts']] = s
|
|
|
|
total_dist = 0.0
|
|
count = 0
|
|
t = t_start
|
|
while t <= t_end:
|
|
# 가장 가까운 슬롯 찾기 (interval 반경 내)
|
|
sa = map_a.get(t)
|
|
sb = map_b.get(t)
|
|
if sa is not None and sb is not None:
|
|
dist = haversine_m(sa['lat'], sa['lon'], sb['lat'], sb['lon'])
|
|
# COG 페널티
|
|
if sa.get('cog') is not None and sb.get('cog') is not None:
|
|
cog_diff = _angular_diff(sa['cog'], sb['cog'])
|
|
if cog_diff > _COG_PENALTY_THRESHOLD_DEG:
|
|
dist *= _COG_PENALTY_FACTOR
|
|
total_dist += dist
|
|
count += 1
|
|
t += interval_ms
|
|
|
|
if count < 3:
|
|
return 0.0
|
|
|
|
avg_dist = total_dist / count
|
|
return math.exp(-avg_dist / _DECAY_DIST_M)
|
|
|
|
|
|
def compute_track_similarity(
|
|
track_a: list[tuple[float, float]],
|
|
track_b: list[tuple[float, float]],
|
|
max_dist_m: float = 10000.0,
|
|
) -> float:
|
|
"""두 궤적의 DTW 거리 기반 유사도 (0~1).
|
|
|
|
track이 비어있으면 0.0 반환.
|
|
유사할수록 1.0에 가까움.
|
|
"""
|
|
if not track_a or not track_b:
|
|
return 0.0
|
|
|
|
a = _resample(track_a, _MAX_RESAMPLE_POINTS)
|
|
b = _resample(track_b, _MAX_RESAMPLE_POINTS)
|
|
|
|
avg_dist = _dtw_distance(a, b)
|
|
if avg_dist == float('inf') or max_dist_m <= 0.0:
|
|
return 0.0
|
|
|
|
similarity = 1.0 - (avg_dist / max_dist_m)
|
|
return max(0.0, min(1.0, similarity))
|
|
|
|
|
|
def match_gear_by_track(
|
|
gear_tracks: dict[str, list[tuple[float, float]]],
|
|
vessel_tracks: dict[str, list[tuple[float, float]]],
|
|
threshold: float = 0.6,
|
|
) -> list[dict]:
|
|
"""어구 궤적을 선단 선박 궤적과 비교하여 매칭.
|
|
|
|
Args:
|
|
gear_tracks: mmsi → [(lat, lon), ...] — 어구 궤적
|
|
vessel_tracks: mmsi → [(lat, lon), ...] — 선박 궤적
|
|
threshold: 유사도 하한 (이상이면 매칭)
|
|
|
|
Returns:
|
|
[{gear_mmsi, vessel_mmsi, similarity, match_method: 'TRACK_SIMILAR'}]
|
|
"""
|
|
results: list[dict] = []
|
|
|
|
for gear_mmsi, g_track in gear_tracks.items():
|
|
if not g_track:
|
|
continue
|
|
|
|
best_mmsi: str | None = None
|
|
best_sim = -1.0
|
|
|
|
for vessel_mmsi, v_track in vessel_tracks.items():
|
|
if not v_track:
|
|
continue
|
|
sim = compute_track_similarity(g_track, v_track)
|
|
if sim > best_sim:
|
|
best_sim = sim
|
|
best_mmsi = vessel_mmsi
|
|
|
|
if best_mmsi is not None and best_sim >= threshold:
|
|
results.append({
|
|
'gear_mmsi': gear_mmsi,
|
|
'vessel_mmsi': best_mmsi,
|
|
'similarity': best_sim,
|
|
'match_method': 'TRACK_SIMILAR',
|
|
})
|
|
|
|
return results
|
|
|
|
|
|
def compute_sog_correlation(
|
|
sog_a: list[float],
|
|
sog_b: list[float],
|
|
) -> float:
|
|
"""두 SOG 시계열의 피어슨 상관계수 (0~1 정규화).
|
|
|
|
시계열 길이가 다르면 짧은 쪽 기준으로 자름.
|
|
데이터 부족(< 3점)이면 0.0 반환.
|
|
"""
|
|
n = min(len(sog_a), len(sog_b))
|
|
if n < 3:
|
|
return 0.0
|
|
|
|
a = sog_a[:n]
|
|
b = sog_b[:n]
|
|
|
|
mean_a = sum(a) / n
|
|
mean_b = sum(b) / n
|
|
|
|
cov = sum((a[i] - mean_a) * (b[i] - mean_b) for i in range(n))
|
|
var_a = sum((x - mean_a) ** 2 for x in a)
|
|
var_b = sum((x - mean_b) ** 2 for x in b)
|
|
|
|
denom = (var_a * var_b) ** 0.5
|
|
if denom < 1e-12:
|
|
return 0.0
|
|
|
|
corr = cov / denom # -1 ~ 1
|
|
return max(0.0, (corr + 1.0) / 2.0) # 0 ~ 1 정규화
|
|
|
|
|
|
def compute_heading_coherence(
|
|
cog_a: list[float],
|
|
cog_b: list[float],
|
|
threshold_deg: float = 30.0,
|
|
) -> float:
|
|
"""두 COG 시계열의 방향 동조율 (0~1).
|
|
|
|
angular diff < threshold_deg 인 비율.
|
|
시계열 길이가 다르면 짧은 쪽 기준.
|
|
데이터 부족(< 3점)이면 0.0 반환.
|
|
"""
|
|
n = min(len(cog_a), len(cog_b))
|
|
if n < 3:
|
|
return 0.0
|
|
|
|
coherent = 0
|
|
for i in range(n):
|
|
diff = abs(cog_a[i] - cog_b[i])
|
|
if diff > 180.0:
|
|
diff = 360.0 - diff
|
|
if diff < threshold_deg:
|
|
coherent += 1
|
|
|
|
return coherent / n
|
|
|
|
|
|
def compute_proximity_ratio(
|
|
track_a: list[tuple[float, float]],
|
|
track_b: list[tuple[float, float]],
|
|
threshold_nm: float = 10.0,
|
|
) -> float:
|
|
"""두 궤적의 근접 지속비 (0~1).
|
|
|
|
시간 정렬된 포인트 쌍에서 haversine < threshold_nm 비율.
|
|
시계열 길이가 다르면 짧은 쪽 기준.
|
|
데이터 부족(< 2점)이면 0.0 반환.
|
|
"""
|
|
n = min(len(track_a), len(track_b))
|
|
if n < 2:
|
|
return 0.0
|
|
|
|
close = 0
|
|
threshold_m = threshold_nm * 1852.0
|
|
|
|
for i in range(n):
|
|
dist = haversine_m(track_a[i][0], track_a[i][1],
|
|
track_b[i][0], track_b[i][1])
|
|
if dist < threshold_m:
|
|
close += 1
|
|
|
|
return close / n
|