FleetClusterLayer.tsx 2357줄 → 10개 파일 분리: - fleetClusterTypes/Utils/Constants: 타입, 기하 함수, 모델 상수 - useFleetClusterGeoJson: 27개 useMemo GeoJSON 훅 - FleetClusterMapLayers: MapLibre Source/Layer JSX - CorrelationPanel/HistoryReplayController: 패널 서브컴포넌트 - GearGroupSection/FleetGearListPanel: 좌측 목록 (DRY) - FleetClusterLayer: 오케스트레이터 524줄 deck.gl + Zustand 리플레이 기반 (Phase 0~2): - zustand 5.0.12, @deck.gl/geo-layers 9.2.11 설치 - gearReplayStore: Zustand + rAF 애니메이션 루프 - gearReplayPreprocess: TripsLayer 전처리 + cursor O(1) 보간 - useGearReplayLayers: deck.gl 레이어 빌더 (10fps 스로틀) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
786 lines
26 KiB
Python
786 lines
26 KiB
Python
"""어구 그룹 다단계 연관성 분석 — 멀티모델 패턴 추적.
|
|
|
|
Phase 1: default 모델 1개로 동작 (DB에서 is_active=true 모델 로드).
|
|
Phase 2: 글로벌 모델 max 5개 병렬 실행.
|
|
|
|
어구 중심 점수 체계:
|
|
- 어구 신호 기준 관측 윈도우 (어구 비활성 시 FREEZE)
|
|
- 선박 shadow 추적 (비활성 → 활성 전환 시 보너스)
|
|
- 적응형 EMA + streak 자기강화
|
|
- 퍼센트 기반 무제한 추적 (50%+)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import math
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ── Constants ─────────────────────────────────────────────────────

# Mean Earth radius expressed in nautical miles (used by _haversine_nm).
_EARTH_RADIUS_NM = 3440.065

# Nautical mile → metre conversion factor.
# NOTE(review): not referenced anywhere in this module — presumably kept
# for importers; confirm before removing.
_NM_TO_M = 1852.0
|
|
|
|
|
|
# ── Parameter model ───────────────────────────────────────────────

@dataclass
class ModelParams:
    """Full parameter set for one tracking model.

    Defaults describe the built-in 'default' model (model_id=1);
    per-model overrides are loaded from kcg.correlation_param_models
    via :meth:`from_db_row`.
    """

    model_id: int = 1
    name: str = 'default'

    # EMA
    alpha_base: float = 0.30
    alpha_min: float = 0.08
    alpha_decay_per_streak: float = 0.005

    # Thresholds
    track_threshold: float = 0.50
    polygon_threshold: float = 0.70

    # Metric weights — gear-vessel
    w_proximity: float = 0.45
    w_visit: float = 0.35
    w_activity: float = 0.20

    # Metric weights — vessel-vessel
    w_dtw: float = 0.30
    w_sog_corr: float = 0.20
    w_heading: float = 0.25
    w_prox_vv: float = 0.25

    # Metric weights — gear-gear
    w_prox_persist: float = 0.50
    w_drift: float = 0.30
    w_signal_sync: float = 0.20

    # Freeze criteria
    group_quiet_ratio: float = 0.30
    normal_gap_hours: float = 1.0

    # Decay
    decay_slow: float = 0.015
    decay_fast: float = 0.08
    stale_hours: float = 6.0

    # Shadow
    shadow_stay_bonus: float = 0.10
    shadow_return_bonus: float = 0.15

    # Distance
    candidate_radius_factor: float = 3.0
    proximity_threshold_nm: float = 5.0
    visit_threshold_nm: float = 5.0

    # Night
    night_bonus: float = 1.3

    # Long-term decay
    long_decay_days: float = 7.0

    @classmethod
    def from_db_row(cls, row: dict) -> ModelParams:
        """Build an instance from a DB correlation_param_models row.

        Only keys that are genuine dataclass fields are accepted, and
        'model_id'/'name' are excluded because they are passed explicitly.
        (The previous ``hasattr(cls, k)`` filter also matched methods and
        those two fields, so a stray key in the params JSON raised
        TypeError on a duplicate or unknown keyword argument.)
        """
        from dataclasses import fields as _dc_fields

        params_json = row.get('params', {}) or {}
        allowed = {f.name for f in _dc_fields(cls)} - {'model_id', 'name'}
        return cls(
            model_id=row['id'],
            name=row['name'],
            **{k: v for k, v in params_json.items() if k in allowed},
        )
|
|
|
|
|
|
# ── Haversine distance ────────────────────────────────────────────

def _haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two coordinates, in nautical miles."""
    rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
    half_dlat = math.radians(lat2 - lat1) / 2.0
    half_dlon = math.radians(lon2 - lon1) / 2.0
    # Haversine formula: h is the squared half-chord length.
    h = math.sin(half_dlat) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(half_dlon) ** 2
    return 2 * _EARTH_RADIUS_NM * math.atan2(math.sqrt(h), math.sqrt(1 - h))
|
|
|
|
|
|
# ── Freeze decision ───────────────────────────────────────────────

def should_freeze(
    gear_group_active_ratio: float,
    target_last_observed: Optional[datetime],
    now: datetime,
    params: ModelParams,
) -> tuple[bool, str]:
    """Decide whether score decay should be suspended; the gear group is the reference.

    Returns (freeze, reason), reason one of 'GROUP_QUIET', 'NORMAL_GAP', 'ACTIVE'.
    """
    # The gear group itself is quiet — no basis for comparison: freeze.
    if gear_group_active_ratio < params.group_quiet_ratio:
        return True, 'GROUP_QUIET'

    # Individual absence still within the normal reporting gap: freeze.
    if target_last_observed is not None:
        absent_h = (now - target_last_observed).total_seconds() / 3600
        if absent_h < params.normal_gap_hours:
            return True, 'NORMAL_GAP'

    return False, 'ACTIVE'
|
|
|
|
|
|
# ── EMA update ────────────────────────────────────────────────────

def update_score(
    prev_score: Optional[float],
    raw_score: Optional[float],
    streak: int,
    last_observed: Optional[datetime],
    now: datetime,
    gear_group_active_ratio: float,
    shadow_bonus: float,
    params: ModelParams,
) -> tuple[float, int, str]:
    """Adaptive EMA score update.

    Returns: (new_score, new_streak, state)
    """
    baseline = prev_score or 0.0

    # No observation this cycle: either hold the score (freeze) or decay it.
    if raw_score is None:
        frozen, reason = should_freeze(
            gear_group_active_ratio, last_observed, now, params,
        )
        if frozen:
            return baseline, streak, reason

        # Genuine departure — decay, faster once the absence is stale.
        absent_h = 0.0
        if last_observed is not None:
            absent_h = (now - last_observed).total_seconds() / 3600
        step = params.decay_slow if absent_h <= params.stale_hours else params.decay_fast
        return max(0.0, baseline - step), 0, 'SIGNAL_LOSS'

    # Shadow bonus, capped at 1.0.
    adjusted = min(1.0, raw_score + shadow_bonus)

    if adjusted >= params.track_threshold:
        # Above threshold — streak self-reinforcement: alpha shrinks
        # (score stabilises) as the streak grows.
        streak += 1
        alpha = max(
            params.alpha_min,
            params.alpha_base - streak * params.alpha_decay_per_streak,
        )
        new_streak, state = streak, 'ACTIVE'
    else:
        # Pattern divergence — reset the streak, blend at base alpha.
        alpha = params.alpha_base
        new_streak, state = 0, 'PATTERN_DIVERGE'

    if prev_score is None:
        return adjusted, new_streak, state
    return alpha * adjusted + (1.0 - alpha) * prev_score, new_streak, state
|
|
|
|
|
|
# ── 어구-선박 메트릭 ──────────────────────────────────────────────
|
|
|
|
def _compute_gear_vessel_metrics(
|
|
gear_center_lat: float,
|
|
gear_center_lon: float,
|
|
gear_radius_nm: float,
|
|
vessel_track: list[dict],
|
|
params: ModelParams,
|
|
) -> dict:
|
|
"""어구 그룹 중심 vs 선박 궤적 메트릭.
|
|
|
|
vessel_track: [{lat, lon, sog, cog, timestamp}, ...]
|
|
"""
|
|
if not vessel_track:
|
|
return {'proximity_ratio': 0, 'visit_score': 0, 'activity_sync': 0, 'composite': 0}
|
|
|
|
threshold_nm = max(gear_radius_nm * 2, params.proximity_threshold_nm)
|
|
|
|
# 1. proximity_ratio — 근접 지속비
|
|
close_count = 0
|
|
for p in vessel_track:
|
|
d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon'])
|
|
if d < threshold_nm:
|
|
close_count += 1
|
|
proximity_ratio = close_count / len(vessel_track)
|
|
|
|
# 2. visit_score — 방문 패턴
|
|
visit_threshold = params.visit_threshold_nm
|
|
in_zone = False
|
|
visits = 0
|
|
stay_points = 0
|
|
away_points = 0
|
|
|
|
for p in vessel_track:
|
|
d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon'])
|
|
if d < visit_threshold:
|
|
if not in_zone:
|
|
visits += 1
|
|
in_zone = True
|
|
stay_points += 1
|
|
else:
|
|
in_zone = False
|
|
away_points += 1
|
|
|
|
visit_count_norm = min(1.0, visits / 5.0) if visits > 0 else 0.0
|
|
total = stay_points + away_points
|
|
stay_ratio = stay_points / total if total > 0 else 0.0
|
|
visit_score = 0.5 * visit_count_norm + 0.5 * stay_ratio
|
|
|
|
# 3. activity_sync — 영역 내 저속 비율 (조업/관리 행위)
|
|
in_zone_count = 0
|
|
in_zone_slow = 0
|
|
for p in vessel_track:
|
|
d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon'])
|
|
if d < visit_threshold:
|
|
in_zone_count += 1
|
|
if p.get('sog', 0) < 2.0:
|
|
in_zone_slow += 1
|
|
activity_sync = in_zone_slow / in_zone_count if in_zone_count > 0 else 0.0
|
|
|
|
# 가중 합산
|
|
composite = (
|
|
params.w_proximity * proximity_ratio
|
|
+ params.w_visit * visit_score
|
|
+ params.w_activity * activity_sync
|
|
)
|
|
|
|
return {
|
|
'proximity_ratio': round(proximity_ratio, 4),
|
|
'visit_score': round(visit_score, 4),
|
|
'activity_sync': round(activity_sync, 4),
|
|
'composite': round(composite, 4),
|
|
}
|
|
|
|
|
|
# ── Vessel–vessel metrics ─────────────────────────────────────────

def _compute_vessel_vessel_metrics(
    track_a: list[dict],
    track_b: list[dict],
    params: ModelParams,
) -> dict:
    """Metrics between two vessel tracks."""
    from algorithms.track_similarity import (
        compute_heading_coherence,
        compute_proximity_ratio,
        compute_sog_correlation,
        compute_track_similarity,
    )

    if not track_a or not track_b:
        return {
            'dtw_similarity': 0, 'speed_correlation': 0,
            'heading_coherence': 0, 'proximity_ratio': 0, 'composite': 0,
        }

    # Positional sequences shared by DTW and proximity.
    coords_a = [(p['lat'], p['lon']) for p in track_a]
    coords_b = [(p['lat'], p['lon']) for p in track_b]

    # Trajectory-shape similarity (DTW).
    dtw_sim = compute_track_similarity(coords_a, coords_b)

    # Speed-over-ground correlation.
    sog_corr = compute_sog_correlation(
        [p.get('sog', 0) for p in track_a],
        [p.get('sog', 0) for p in track_b],
    )

    # Course-over-ground coherence.
    heading = compute_heading_coherence(
        [p.get('cog', 0) for p in track_a],
        [p.get('cog', 0) for p in track_b],
    )

    # Proximity ratio within the model threshold.
    prox = compute_proximity_ratio(coords_a, coords_b, params.proximity_threshold_nm)

    composite = (
        params.w_dtw * dtw_sim
        + params.w_sog_corr * sog_corr
        + params.w_heading * heading
        + params.w_prox_vv * prox
    )

    return {
        'dtw_similarity': round(dtw_sim, 4),
        'speed_correlation': round(sog_corr, 4),
        'heading_coherence': round(heading, 4),
        'proximity_ratio': round(prox, 4),
        'composite': round(composite, 4),
    }
|
|
|
|
|
|
# ── 어구-어구 메트릭 ──────────────────────────────────────────────
|
|
|
|
def _compute_gear_gear_metrics(
|
|
center_a: tuple[float, float],
|
|
center_b: tuple[float, float],
|
|
center_history_a: list[dict],
|
|
center_history_b: list[dict],
|
|
params: ModelParams,
|
|
) -> dict:
|
|
"""두 어구 그룹 간 메트릭."""
|
|
if not center_history_a or not center_history_b:
|
|
return {
|
|
'proximity_ratio': 0, 'drift_similarity': 0,
|
|
'composite': 0,
|
|
}
|
|
|
|
# 1. 근접 지속성 — 현재 중심 간 거리의 안정성
|
|
dist_nm = _haversine_nm(center_a[0], center_a[1], center_b[0], center_b[1])
|
|
prox_persist = max(0.0, 1.0 - dist_nm / 20.0) # 20NM 이상이면 0
|
|
|
|
# 2. 표류 유사도 — 중심 이동 벡터 코사인 유사도
|
|
drift_sim = 0.0
|
|
n = min(len(center_history_a), len(center_history_b))
|
|
if n >= 2:
|
|
# 마지막 2점으로 이동 벡터 계산
|
|
da_lat = center_history_a[-1].get('lat', 0) - center_history_a[-2].get('lat', 0)
|
|
da_lon = center_history_a[-1].get('lon', 0) - center_history_a[-2].get('lon', 0)
|
|
db_lat = center_history_b[-1].get('lat', 0) - center_history_b[-2].get('lat', 0)
|
|
db_lon = center_history_b[-1].get('lon', 0) - center_history_b[-2].get('lon', 0)
|
|
|
|
dot = da_lat * db_lat + da_lon * db_lon
|
|
mag_a = (da_lat ** 2 + da_lon ** 2) ** 0.5
|
|
mag_b = (db_lat ** 2 + db_lon ** 2) ** 0.5
|
|
if mag_a > 1e-10 and mag_b > 1e-10:
|
|
cos_sim = dot / (mag_a * mag_b)
|
|
drift_sim = max(0.0, (cos_sim + 1.0) / 2.0)
|
|
|
|
composite = (
|
|
params.w_prox_persist * prox_persist
|
|
+ params.w_drift * drift_sim
|
|
)
|
|
|
|
return {
|
|
'proximity_ratio': round(prox_persist, 4),
|
|
'drift_similarity': round(drift_sim, 4),
|
|
'composite': round(composite, 4),
|
|
}
|
|
|
|
|
|
# ── Shadow bonus ──────────────────────────────────────────────────

def compute_shadow_bonus(
    vessel_positions_during_inactive: list[dict],
    last_known_gear_center: tuple[float, float],
    group_radius_nm: float,
    params: ModelParams,
) -> tuple[float, bool, bool]:
    """Assess whether a vessel lingered near the gear while it was inactive.

    Returns: (bonus, stayed_nearby, returned_before_resume)
    """
    positions = vessel_positions_during_inactive
    if not positions or last_known_gear_center is None:
        return 0.0, False, False

    gc_lat, gc_lon = last_known_gear_center
    threshold_nm = max(group_radius_nm * 2, params.proximity_threshold_nm)

    # Distance of every inactive-period fix from the last known gear centre.
    dists = [
        _haversine_nm(gc_lat, gc_lon, p['lat'], p['lon'])
        for p in positions
    ]

    # Stayed: mean distance within threshold; returned: final fix within it.
    stayed = sum(dists) / len(dists) < threshold_nm
    returned = dists[-1] < threshold_nm

    bonus = (params.shadow_stay_bonus if stayed else 0.0) \
        + (params.shadow_return_bonus if returned else 0.0)
    return bonus, stayed, returned
|
|
|
|
|
|
# ── 후보 필터링 ───────────────────────────────────────────────────
|
|
|
|
def _compute_group_radius(members: list[dict]) -> float:
|
|
"""그룹 멤버 간 최대 거리의 절반 (NM)."""
|
|
if len(members) < 2:
|
|
return 1.0 # 최소 1NM
|
|
|
|
max_dist = 0.0
|
|
for i in range(len(members)):
|
|
for j in range(i + 1, len(members)):
|
|
d = _haversine_nm(
|
|
members[i]['lat'], members[i]['lon'],
|
|
members[j]['lat'], members[j]['lon'],
|
|
)
|
|
if d > max_dist:
|
|
max_dist = d
|
|
|
|
return max(1.0, max_dist / 2.0)
|
|
|
|
|
|
def find_candidates(
    gear_center_lat: float,
    gear_center_lon: float,
    group_radius_nm: float,
    group_mmsis: set[str],
    all_positions: dict[str, dict],
    params: ModelParams,
) -> list[str]:
    """Candidate MMSIs around a gear group, excluding the group's own members."""
    search_radius = group_radius_nm * params.candidate_radius_factor
    return [
        mmsi
        for mmsi, pos in all_positions.items()
        if mmsi not in group_mmsis
        and _haversine_nm(gear_center_lat, gear_center_lon,
                          pos['lat'], pos['lon']) < search_radius
    ]
|
|
|
|
|
|
# ── 메인 실행 ─────────────────────────────────────────────────────
|
|
|
|
def _get_vessel_track(vessel_store, mmsi: str, hours: int = 6) -> list[dict]:
|
|
"""vessel_store에서 특정 MMSI의 최근 N시간 궤적 추출 (벡터화)."""
|
|
df = vessel_store._tracks.get(mmsi)
|
|
if df is None or len(df) == 0:
|
|
return []
|
|
|
|
import pandas as pd
|
|
now = datetime.now(timezone.utc)
|
|
cutoff = now - pd.Timedelta(hours=hours)
|
|
|
|
ts_col = df['timestamp']
|
|
if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None:
|
|
mask = ts_col >= pd.Timestamp(cutoff)
|
|
else:
|
|
mask = ts_col >= pd.Timestamp(cutoff.replace(tzinfo=None))
|
|
|
|
recent = df.loc[mask]
|
|
if recent.empty:
|
|
return []
|
|
|
|
# 벡터화 추출 (iterrows 대신)
|
|
lats = recent['lat'].values
|
|
lons = recent['lon'].values
|
|
sogs = (recent['sog'] if 'sog' in recent.columns
|
|
else recent.get('raw_sog', pd.Series(dtype=float))).fillna(0).values
|
|
cogs = (recent['cog'] if 'cog' in recent.columns
|
|
else pd.Series(0, index=recent.index)).fillna(0).values
|
|
|
|
return [
|
|
{'lat': float(lats[i]), 'lon': float(lons[i]),
|
|
'sog': float(sogs[i]), 'cog': float(cogs[i])}
|
|
for i in range(len(lats))
|
|
]
|
|
|
|
|
|
def _compute_gear_active_ratio(
|
|
gear_members: list[dict],
|
|
all_positions: dict[str, dict],
|
|
now: datetime,
|
|
stale_sec: float = 21600,
|
|
) -> float:
|
|
"""어구 그룹의 활성 멤버 비율."""
|
|
if not gear_members:
|
|
return 0.0
|
|
|
|
active = 0
|
|
for m in gear_members:
|
|
pos = all_positions.get(m['mmsi'])
|
|
if pos is None:
|
|
continue
|
|
ts = pos.get('timestamp')
|
|
if ts is None:
|
|
continue
|
|
if isinstance(ts, datetime):
|
|
last_dt = ts if ts.tzinfo is not None else ts.replace(tzinfo=timezone.utc)
|
|
else:
|
|
try:
|
|
import pandas as pd
|
|
last_dt = pd.Timestamp(ts).to_pydatetime()
|
|
if last_dt.tzinfo is None:
|
|
last_dt = last_dt.replace(tzinfo=timezone.utc)
|
|
except Exception:
|
|
continue
|
|
age = (now - last_dt).total_seconds()
|
|
if age < stale_sec:
|
|
active += 1
|
|
|
|
return active / len(gear_members)
|
|
|
|
|
|
def _is_gear_pattern(name: str) -> bool:
|
|
"""어구 이름 패턴 판별."""
|
|
import re
|
|
return bool(re.match(r'^.+_\d+_\d*$', name or ''))
|
|
|
|
|
|
_MAX_CANDIDATES_PER_GROUP = 30  # hard cap on candidates per gear group (performance guard)
|
|
|
|
|
|
def run_gear_correlation(
    vessel_store,
    gear_groups: list[dict],
    conn,
) -> dict:
    """Main gear-correlation analysis pass (batch-optimised).

    For every gear group: compute centre/radius/activity, select nearby
    candidate targets, compute raw metrics once per target, run one EMA
    score update per active model, then persist everything in two bulk
    statements and a single commit.

    Args:
        vessel_store: VesselStore instance
        gear_groups: output of detect_gear_groups()
        conn: kcgdb connection

    Returns:
        {'updated': int, 'models': int, 'raw_inserted': int}
    """
    import time as _time
    import re as _re

    # NOTE(review): this inline gear-name pattern is broader than the
    # module-level _is_gear_pattern() regex — confirm which classifier
    # is intended; they will disagree on some names.
    _gear_re = _re.compile(r'^.+_(?=\S*\d)\S+(?:[_ ]\S*)*[_ ]*$|^.+%$|^\d+$')

    t0 = _time.time()
    now = datetime.now(timezone.utc)
    all_positions = vessel_store.get_all_latest_positions()

    # Load active parameter models (helper falls back on DB error).
    models = _load_active_models(conn)
    if not models:
        logger.warning('no active correlation models found')
        return {'updated': 0, 'models': 0, 'raw_inserted': 0}

    # Preload every existing score in bulk (avoids a query per pair).
    all_scores = _load_all_scores(conn)

    raw_batch: list[tuple] = []
    score_batch: list[tuple] = []
    total_updated = 0
    total_raw = 0

    # Candidate search / raw metrics use the first (default) model's params.
    default_params = models[0]

    for gear_group in gear_groups:
        parent_name = gear_group['parent_name']
        members = gear_group['members']
        if not members:
            continue

        # Group centroid + radius.
        center_lat = sum(m['lat'] for m in members) / len(members)
        center_lon = sum(m['lon'] for m in members) / len(members)
        group_radius = _compute_group_radius(members)

        # Gear-group activity ratio (drives freeze/decay decisions).
        active_ratio = _compute_gear_active_ratio(members, all_positions, now)

        # MMSIs belonging to the group itself — never candidates.
        group_mmsis = {m['mmsi'] for m in members}
        if gear_group.get('parent_mmsi'):
            group_mmsis.add(gear_group['parent_mmsi'])

        # Candidate filtering + hard cap.
        candidates = find_candidates(
            center_lat, center_lon, group_radius,
            group_mmsis, all_positions, default_params,
        )
        if not candidates:
            continue
        if len(candidates) > _MAX_CANDIDATES_PER_GROUP:
            # Keep only the nearest candidates.
            candidates.sort(key=lambda m: _haversine_nm(
                center_lat, center_lon,
                all_positions[m]['lat'], all_positions[m]['lon'],
            ))
            candidates = candidates[:_MAX_CANDIDATES_PER_GROUP]

        for target_mmsi in candidates:
            target_pos = all_positions.get(target_mmsi)
            if target_pos is None:
                continue

            target_name = target_pos.get('name', '')
            target_is_gear = bool(_gear_re.match(target_name or ''))
            target_type = 'GEAR_BUOY' if target_is_gear else 'VESSEL'

            # Metrics: simple distance falloff for gear targets,
            # track-based metrics for vessels.
            if target_is_gear:
                d = _haversine_nm(center_lat, center_lon,
                                  target_pos['lat'], target_pos['lon'])
                prox = max(0.0, 1.0 - d / 20.0)
                metrics = {'proximity_ratio': prox, 'composite': prox}
            else:
                vessel_track = _get_vessel_track(vessel_store, target_mmsi, hours=6)
                metrics = _compute_gear_vessel_metrics(
                    center_lat, center_lon, group_radius,
                    vessel_track, default_params,
                )

            # Collect raw metrics for the bulk INSERT.
            raw_batch.append((
                now, parent_name, target_mmsi, target_type, target_name,
                metrics.get('proximity_ratio'), metrics.get('visit_score'),
                metrics.get('activity_sync'), metrics.get('dtw_similarity'),
                metrics.get('speed_correlation'), metrics.get('heading_coherence'),
                metrics.get('drift_similarity'), False, False, active_ratio,
            ))
            total_raw += 1

            # One EMA update per model (raw metrics are shared).
            for model in models:
                if target_is_gear:
                    composite = metrics.get('proximity_ratio', 0) * model.w_prox_persist
                else:
                    composite = (
                        model.w_proximity * (metrics.get('proximity_ratio') or 0)
                        + model.w_visit * (metrics.get('visit_score') or 0)
                        + model.w_activity * (metrics.get('activity_sync') or 0)
                    )

                # Look up the previous score in the preloaded map (no DB hit).
                score_key = (model.model_id, parent_name, target_mmsi)
                prev = all_scores.get(score_key)
                prev_score = prev['current_score'] if prev else None
                streak = prev['streak_count'] if prev else 0
                last_obs = prev['last_observed_at'] if prev else None

                new_score, new_streak, state = update_score(
                    prev_score, composite, streak,
                    last_obs, now, active_ratio,
                    0.0, model,
                )

                # Persist once the score reaches the tracking threshold,
                # and keep updating rows that already exist.
                if new_score >= model.track_threshold or prev is not None:
                    score_batch.append((
                        model.model_id, parent_name, target_mmsi,
                        target_type, target_name,
                        round(new_score, 6), new_streak, state,
                        now, now, now,
                    ))
                    total_updated += 1

    # Bulk persistence + single commit.
    _batch_insert_raw(conn, raw_batch)
    _batch_upsert_scores(conn, score_batch)
    conn.commit()

    elapsed = round(_time.time() - t0, 2)
    logger.info(
        'gear correlation internals: %.2fs, %d groups, %d raw, %d scores, %d models',
        elapsed, len(gear_groups), total_raw, total_updated, len(models),
    )

    return {
        'updated': total_updated,
        'models': len(models),
        'raw_inserted': total_raw,
    }
|
|
|
|
|
|
# ── DB helpers (batch-optimised) ──────────────────────────────────

def _load_active_models(conn) -> list[ModelParams]:
    """Load active parameter models, default-first.

    Falls back to a single built-in ModelParams() on any DB error.
    """
    import json  # hoisted: previously re-imported inside the per-row loop

    cur = conn.cursor()
    try:
        cur.execute(
            "SELECT id, name, params FROM kcg.correlation_param_models "
            "WHERE is_active = TRUE ORDER BY is_default DESC, id ASC"
        )
        models = []
        for row in cur.fetchall():
            # params column may arrive as dict (jsonb) or as a JSON string.
            params = row[2] if isinstance(row[2], dict) else json.loads(row[2])
            models.append(ModelParams.from_db_row({
                'id': row[0], 'name': row[1], 'params': params,
            }))
        return models
    except Exception as e:
        logger.error('failed to load models: %s', e)
        return [ModelParams()]
    finally:
        cur.close()
|
|
|
|
|
|
def _load_all_scores(conn) -> dict[tuple, dict]:
|
|
"""모든 점수를 사전 로드. {(model_id, group_key, target_mmsi): {...}}"""
|
|
cur = conn.cursor()
|
|
try:
|
|
cur.execute(
|
|
"SELECT model_id, group_key, target_mmsi, "
|
|
"current_score, streak_count, last_observed_at "
|
|
"FROM kcg.gear_correlation_scores"
|
|
)
|
|
result = {}
|
|
for row in cur.fetchall():
|
|
key = (row[0], row[1], row[2])
|
|
result[key] = {
|
|
'current_score': row[3],
|
|
'streak_count': row[4],
|
|
'last_observed_at': row[5],
|
|
}
|
|
return result
|
|
except Exception as e:
|
|
logger.warning('failed to load all scores: %s', e)
|
|
return {}
|
|
finally:
|
|
cur.close()
|
|
|
|
|
|
def _batch_insert_raw(conn, batch: list[tuple]):
|
|
"""raw 메트릭 배치 INSERT."""
|
|
if not batch:
|
|
return
|
|
cur = conn.cursor()
|
|
try:
|
|
from psycopg2.extras import execute_values
|
|
execute_values(
|
|
cur,
|
|
"""INSERT INTO kcg.gear_correlation_raw_metrics
|
|
(observed_at, group_key, target_mmsi, target_type, target_name,
|
|
proximity_ratio, visit_score, activity_sync,
|
|
dtw_similarity, speed_correlation, heading_coherence,
|
|
drift_similarity, shadow_stay, shadow_return,
|
|
gear_group_active_ratio)
|
|
VALUES %s""",
|
|
batch,
|
|
page_size=500,
|
|
)
|
|
except Exception as e:
|
|
logger.warning('batch insert raw failed: %s', e)
|
|
finally:
|
|
cur.close()
|
|
|
|
|
|
def _batch_upsert_scores(conn, batch: list[tuple]):
|
|
"""점수 배치 UPSERT."""
|
|
if not batch:
|
|
return
|
|
cur = conn.cursor()
|
|
try:
|
|
from psycopg2.extras import execute_values
|
|
execute_values(
|
|
cur,
|
|
"""INSERT INTO kcg.gear_correlation_scores
|
|
(model_id, group_key, target_mmsi, target_type, target_name,
|
|
current_score, streak_count, freeze_state,
|
|
first_observed_at, last_observed_at, updated_at)
|
|
VALUES %s
|
|
ON CONFLICT (model_id, group_key, target_mmsi)
|
|
DO UPDATE SET
|
|
target_type = EXCLUDED.target_type,
|
|
target_name = EXCLUDED.target_name,
|
|
current_score = EXCLUDED.current_score,
|
|
streak_count = EXCLUDED.streak_count,
|
|
freeze_state = EXCLUDED.freeze_state,
|
|
observation_count = kcg.gear_correlation_scores.observation_count + 1,
|
|
last_observed_at = EXCLUDED.last_observed_at,
|
|
updated_at = EXCLUDED.updated_at""",
|
|
batch,
|
|
page_size=500,
|
|
)
|
|
except Exception as e:
|
|
logger.warning('batch upsert scores failed: %s', e)
|
|
finally:
|
|
cur.close()
|