kcg-monitoring/prediction/algorithms/gear_correlation.py
htlee 7dd46f2078 feat: 어구 모선 추론(Gear Parent Inference) 시스템 이식
Codex Lab 환경(iran-airstrike-replay-codex)에서 검증 완료된
어구 모선 자동 추론 + 검토 워크플로우 전체를 이식.

## Python (prediction/)
- gear_parent_inference(1,428줄): 다층 점수 모델 (correlation + name + track + prior bonus)
- gear_parent_episode(631줄): Episode 연속성 (Jaccard + 공간거리)
- gear_name_rules: 모선 이름 정규화 + 4자 미만 필터
- scheduler: 추론 호출 단계 추가 (4.8)
- fleet_tracker/kcgdb: SQL qualified_table() 동적화
- gear_correlation: timestamp 필드 추가

## DB (database/migration/ 012~015)
- 후보 스냅샷, resolution, episode, 라벨 세션, 제외 관리 테이블 9개 + VIEW 2개

## Backend (Java)
- 12개 DTO/Controller (ParentInferenceWorkflowController 등)
- GroupPolygonService: parent_resolution LEFT JOIN + 15개 API 메서드

## Frontend
- ParentReviewPanel: 모선 검토 대시보드
- vesselAnalysis: 10개 신규 API 함수 + 6개 타입

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 00:42:31 +09:00

835 lines
29 KiB
Python

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""어구 그룹 다단계 연관성 분석 — 멀티모델 패턴 추적.
Phase 1: default 모델 1개로 동작 (DB에서 is_active=true 모델 로드).
Phase 2: 글로벌 모델 max 5개 병렬 실행.
어구 중심 점수 체계:
- 어구 신호 기준 관측 윈도우 (어구 비활성 시 FREEZE)
- 선박 shadow 추적 (비활성 → 활성 전환 시 보너스)
- 적응형 EMA + streak 자기강화
- 퍼센트 기반 무제한 추적 (50%+)
"""
from __future__ import annotations
import logging
import math
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from algorithms.polygon_builder import _get_time_bucket_age
from config import qualified_table
logger = logging.getLogger(__name__)
# ── Constants ──────────────────────────────────────────────────────
_EARTH_RADIUS_NM = 3440.065  # Earth radius in nautical miles (~6371 km); used by _haversine_nm
_NM_TO_M = 1852.0  # metres per nautical mile
# Table names produced by config.qualified_table() (presumably schema-qualified;
# resolved once at import time) and interpolated into the SQL below.
CORRELATION_PARAM_MODELS = qualified_table('correlation_param_models')
GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores')
GEAR_CORRELATION_RAW_METRICS = qualified_table('gear_correlation_raw_metrics')
# ── 파라미터 모델 ─────────────────────────────────────────────────
@dataclass
class ModelParams:
    """Complete parameter set for one correlation-tracking model.

    Field defaults describe the built-in 'default' model. ``from_db_row`` builds
    an instance from a ``correlation_param_models`` row, overriding only the
    tunable fields present in the row's ``params`` JSON.
    """
    model_id: int = 1
    name: str = 'default'
    # EMA
    alpha_base: float = 0.30
    alpha_min: float = 0.08
    alpha_decay_per_streak: float = 0.005
    # Thresholds
    track_threshold: float = 0.50
    polygon_threshold: float = 0.70
    # Metric weights — gear vs vessel
    w_proximity: float = 0.45
    w_visit: float = 0.35
    w_activity: float = 0.20
    # Metric weights — vessel vs vessel
    w_dtw: float = 0.30
    w_sog_corr: float = 0.20
    w_heading: float = 0.25
    w_prox_vv: float = 0.25
    # Metric weights — gear vs gear
    w_prox_persist: float = 0.50
    w_drift: float = 0.30
    w_signal_sync: float = 0.20
    # Freeze criteria
    group_quiet_ratio: float = 0.30  # below this active-member ratio the group is "quiet"
    normal_gap_hours: float = 1.0  # absences shorter than this (hours) are not penalised
    # Decay
    decay_slow: float = 0.015
    decay_fast: float = 0.08
    stale_hours: float = 6.0  # absence (hours) after which decay_fast applies
    # Shadow
    shadow_stay_bonus: float = 0.10
    shadow_return_bonus: float = 0.15
    # Distance
    candidate_radius_factor: float = 3.0
    proximity_threshold_nm: float = 5.0
    visit_threshold_nm: float = 5.0
    # Night
    night_bonus: float = 1.3
    # Long-term decay
    long_decay_days: float = 7.0

    @classmethod
    def from_db_row(cls, row: dict) -> ModelParams:
        """Build a ModelParams from a ``correlation_param_models`` row.

        Args:
            row: mapping with 'id', 'name' and optionally 'params' (a dict of
                overrides; ``None``/missing means "use all defaults").

        Returns:
            ModelParams with ``model_id``/``name`` taken from the row and any
            recognised tunable fields overridden from ``params``. Unknown keys,
            method names and the identity keys ('model_id', 'name') inside
            ``params`` are ignored instead of raising ``TypeError``.
        """
        from dataclasses import fields

        params_json = row.get('params') or {}
        # Only real dataclass fields are accepted; the identity fields come
        # from the row itself and must not be passed twice.
        tunable = {f.name for f in fields(cls)} - {'model_id', 'name'}
        overrides = {k: v for k, v in params_json.items() if k in tunable}
        return cls(model_id=row['id'], name=row['name'], **overrides)
# ── Haversine 거리 ────────────────────────────────────────────────
def _haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two lat/lon points (degrees), in nautical miles.

    Uses the haversine formula on a sphere of radius ``_EARTH_RADIUS_NM``.
    """
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points; math.sqrt(1 - a) would then raise ValueError.
    a = min(1.0, max(0.0, a))
    return _EARTH_RADIUS_NM * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
# ── Freeze 판단 ───────────────────────────────────────────────────
def should_freeze(
    gear_group_active_ratio: float,
    target_last_observed: Optional[datetime],
    now: datetime,
    params: ModelParams,
) -> tuple[bool, str]:
    """Decide whether a missing observation should freeze (not decay) the score.

    The gear group is the reference frame:
    - if too few gear members are active, the pair cannot be compared at all
      -> frozen with reason 'GROUP_QUIET';
    - if the target has only been absent for less than ``normal_gap_hours``
      -> frozen with reason 'NORMAL_GAP';
    - otherwise the absence is real -> (False, 'ACTIVE').
    """
    group_is_quiet = gear_group_active_ratio < params.group_quiet_ratio
    if group_is_quiet:
        return True, 'GROUP_QUIET'
    if target_last_observed is None:
        return False, 'ACTIVE'
    gap_hours = (now - target_last_observed).total_seconds() / 3600
    return (True, 'NORMAL_GAP') if gap_hours < params.normal_gap_hours else (False, 'ACTIVE')
# ── EMA 업데이트 ──────────────────────────────────────────────────
def update_score(
    prev_score: Optional[float],
    raw_score: Optional[float],
    streak: int,
    last_observed: Optional[datetime],
    now: datetime,
    gear_group_active_ratio: float,
    shadow_bonus: float,
    params: ModelParams,
) -> tuple[float, int, str]:
    """Adaptive-EMA update of a single correlation score.

    Args:
        prev_score: previous EMA value, or None when there is no history.
        raw_score: this cycle's composite metric, or None when unobservable.
        streak: number of consecutive above-threshold observations so far.
        last_observed: last time the target was observed (may be None).
        now: current time.
        gear_group_active_ratio: active-member ratio of the gear group.
        shadow_bonus: added to raw_score (result capped at 1.0).
        params: model parameters.

    Returns:
        (new_score, new_streak, state). state is 'ACTIVE', 'PATTERN_DIVERGE',
        'SIGNAL_LOSS', or a freeze reason returned by should_freeze().
    """
    baseline = prev_score or 0.0

    if raw_score is None:
        frozen, reason = should_freeze(
            gear_group_active_ratio, last_observed, now, params,
        )
        if frozen:
            # Frozen: keep score and streak untouched.
            return baseline, streak, reason
        # Genuine absence: linear decay, faster once the target is stale.
        hours_absent = 0.0 if last_observed is None else (now - last_observed).total_seconds() / 3600
        step = params.decay_fast if hours_absent > params.stale_hours else params.decay_slow
        return max(0.0, baseline - step), 0, 'SIGNAL_LOSS'

    adjusted = min(1.0, raw_score + shadow_bonus)
    if adjusted >= params.track_threshold:
        # Above threshold: reward the streak; alpha shrinks with the streak
        # (floored at alpha_min) so long-tracked pairs change more slowly.
        new_streak = streak + 1
        alpha = max(params.alpha_min,
                    params.alpha_base - new_streak * params.alpha_decay_per_streak)
        state = 'ACTIVE'
    else:
        # Pattern divergence: streak resets, base smoothing factor.
        new_streak = 0
        alpha = params.alpha_base
        state = 'PATTERN_DIVERGE'

    if prev_score is None:
        return adjusted, new_streak, state
    return alpha * adjusted + (1.0 - alpha) * prev_score, new_streak, state
# ── 어구-선박 메트릭 ──────────────────────────────────────────────
def _compute_gear_vessel_metrics(
gear_center_lat: float,
gear_center_lon: float,
gear_radius_nm: float,
vessel_track: list[dict],
params: ModelParams,
) -> dict:
"""어구 그룹 중심 vs 선박 궤적 메트릭.
vessel_track: [{lat, lon, sog, cog, timestamp}, ...]
"""
if not vessel_track:
return {'proximity_ratio': 0, 'visit_score': 0, 'activity_sync': 0, 'composite': 0}
threshold_nm = max(gear_radius_nm * 2, params.proximity_threshold_nm)
# 1. proximity_ratio — 근접 지속비
close_count = 0
for p in vessel_track:
d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon'])
if d < threshold_nm:
close_count += 1
proximity_ratio = close_count / len(vessel_track)
# 2. visit_score — 방문 패턴
visit_threshold = params.visit_threshold_nm
in_zone = False
visits = 0
stay_points = 0
away_points = 0
for p in vessel_track:
d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon'])
if d < visit_threshold:
if not in_zone:
visits += 1
in_zone = True
stay_points += 1
else:
in_zone = False
away_points += 1
visit_count_norm = min(1.0, visits / 5.0) if visits > 0 else 0.0
total = stay_points + away_points
stay_ratio = stay_points / total if total > 0 else 0.0
visit_score = 0.5 * visit_count_norm + 0.5 * stay_ratio
# 3. activity_sync — 영역 내 저속 비율 (조업/관리 행위)
in_zone_count = 0
in_zone_slow = 0
for p in vessel_track:
d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon'])
if d < visit_threshold:
in_zone_count += 1
if p.get('sog', 0) < 2.0:
in_zone_slow += 1
activity_sync = in_zone_slow / in_zone_count if in_zone_count > 0 else 0.0
# 가중 합산
composite = (
params.w_proximity * proximity_ratio
+ params.w_visit * visit_score
+ params.w_activity * activity_sync
)
return {
'proximity_ratio': round(proximity_ratio, 4),
'visit_score': round(visit_score, 4),
'activity_sync': round(activity_sync, 4),
'composite': round(composite, 4),
}
# ── 선박-선박 메트릭 ──────────────────────────────────────────────
def _compute_vessel_vessel_metrics(
    track_a: list[dict],
    track_b: list[dict],
    params: ModelParams,
) -> dict:
    """Similarity metrics between two vessel tracks.

    Delegates to algorithms.track_similarity for DTW path similarity, SOG
    correlation, COG (heading) coherence and proximity ratio (threshold
    ``params.proximity_threshold_nm``), then combines them with the
    vessel-vessel weights. Missing 'sog'/'cog' values are read as 0. All
    metrics are 0 if either track is empty.
    """
    from algorithms.track_similarity import (
        compute_heading_coherence,
        compute_proximity_ratio,
        compute_sog_correlation,
        compute_track_similarity,
    )
    if not (track_a and track_b):
        return {
            'dtw_similarity': 0, 'speed_correlation': 0,
            'heading_coherence': 0, 'proximity_ratio': 0, 'composite': 0,
        }

    def _column(track, key):
        # Per-point scalar series with 0 for missing keys.
        return [pt.get(key, 0) for pt in track]

    coords_a = [(pt['lat'], pt['lon']) for pt in track_a]
    coords_b = [(pt['lat'], pt['lon']) for pt in track_b]

    dtw_sim = compute_track_similarity(coords_a, coords_b)
    sog_corr = compute_sog_correlation(_column(track_a, 'sog'), _column(track_b, 'sog'))
    heading = compute_heading_coherence(_column(track_a, 'cog'), _column(track_b, 'cog'))
    prox = compute_proximity_ratio(coords_a, coords_b, params.proximity_threshold_nm)

    composite = (
        params.w_dtw * dtw_sim
        + params.w_sog_corr * sog_corr
        + params.w_heading * heading
        + params.w_prox_vv * prox
    )
    return {
        'dtw_similarity': round(dtw_sim, 4),
        'speed_correlation': round(sog_corr, 4),
        'heading_coherence': round(heading, 4),
        'proximity_ratio': round(prox, 4),
        'composite': round(composite, 4),
    }
# ── 어구-어구 메트릭 ──────────────────────────────────────────────
def _compute_gear_gear_metrics(
center_a: tuple[float, float],
center_b: tuple[float, float],
center_history_a: list[dict],
center_history_b: list[dict],
params: ModelParams,
) -> dict:
"""두 어구 그룹 간 메트릭."""
if not center_history_a or not center_history_b:
return {
'proximity_ratio': 0, 'drift_similarity': 0,
'composite': 0,
}
# 1. 근접 지속성 — 현재 중심 간 거리의 안정성
dist_nm = _haversine_nm(center_a[0], center_a[1], center_b[0], center_b[1])
prox_persist = max(0.0, 1.0 - dist_nm / 20.0) # 20NM 이상이면 0
# 2. 표류 유사도 — 중심 이동 벡터 코사인 유사도
drift_sim = 0.0
n = min(len(center_history_a), len(center_history_b))
if n >= 2:
# 마지막 2점으로 이동 벡터 계산
da_lat = center_history_a[-1].get('lat', 0) - center_history_a[-2].get('lat', 0)
da_lon = center_history_a[-1].get('lon', 0) - center_history_a[-2].get('lon', 0)
db_lat = center_history_b[-1].get('lat', 0) - center_history_b[-2].get('lat', 0)
db_lon = center_history_b[-1].get('lon', 0) - center_history_b[-2].get('lon', 0)
dot = da_lat * db_lat + da_lon * db_lon
mag_a = (da_lat ** 2 + da_lon ** 2) ** 0.5
mag_b = (db_lat ** 2 + db_lon ** 2) ** 0.5
if mag_a > 1e-10 and mag_b > 1e-10:
cos_sim = dot / (mag_a * mag_b)
drift_sim = max(0.0, (cos_sim + 1.0) / 2.0)
composite = (
params.w_prox_persist * prox_persist
+ params.w_drift * drift_sim
)
return {
'proximity_ratio': round(prox_persist, 4),
'drift_similarity': round(drift_sim, 4),
'composite': round(composite, 4),
}
# ── Shadow 보너스 계산 ────────────────────────────────────────────
def compute_shadow_bonus(
    vessel_positions_during_inactive: list[dict],
    last_known_gear_center: tuple[float, float],
    group_radius_nm: float,
    params: ModelParams,
) -> tuple[float, bool, bool]:
    """Score whether a vessel stayed near the gear while the gear was inactive.

    A vessel "stayed" when its mean distance to the last known gear centre is
    below max(2 * group_radius_nm, proximity_threshold_nm), and "returned" when
    its last position is below that same threshold. Each condition adds its
    bonus (shadow_stay_bonus / shadow_return_bonus).

    Returns: (bonus, stayed_nearby, returned_before_resume); (0.0, False, False)
    when there are no positions or no known centre.
    """
    if last_known_gear_center is None or not vessel_positions_during_inactive:
        return 0.0, False, False

    center_lat, center_lon = last_known_gear_center
    near_nm = max(group_radius_nm * 2, params.proximity_threshold_nm)
    distances = [
        _haversine_nm(center_lat, center_lon, pos['lat'], pos['lon'])
        for pos in vessel_positions_during_inactive
    ]

    stayed = sum(distances) / len(distances) < near_nm
    returned = distances[-1] < near_nm  # last position near the gear

    bonus = (params.shadow_stay_bonus if stayed else 0.0) + (
        params.shadow_return_bonus if returned else 0.0
    )
    return bonus, stayed, returned
# ── 후보 필터링 ───────────────────────────────────────────────────
def _compute_group_radius(members: list[dict]) -> float:
"""그룹 멤버 간 최대 거리의 절반 (NM)."""
if len(members) < 2:
return 1.0 # 최소 1NM
max_dist = 0.0
for i in range(len(members)):
for j in range(i + 1, len(members)):
d = _haversine_nm(
members[i]['lat'], members[i]['lon'],
members[j]['lat'], members[j]['lon'],
)
if d > max_dist:
max_dist = d
return max(1.0, max_dist / 2.0)
def find_candidates(
    gear_center_lat: float,
    gear_center_lon: float,
    group_radius_nm: float,
    group_mmsis: set[str],
    all_positions: dict[str, dict],
    params: ModelParams,
) -> list[str]:
    """MMSIs within group_radius_nm * candidate_radius_factor of the gear centre.

    Members of the group itself (``group_mmsis``) are excluded. Order follows
    the iteration order of ``all_positions``.
    """
    search_radius = group_radius_nm * params.candidate_radius_factor
    return [
        mmsi
        for mmsi, pos in all_positions.items()
        if mmsi not in group_mmsis
        and _haversine_nm(gear_center_lat, gear_center_lon, pos['lat'], pos['lon']) < search_radius
    ]
# ── 메인 실행 ─────────────────────────────────────────────────────
def _get_vessel_track(vessel_store, mmsi: str, hours: int = 6) -> list[dict]:
"""vessel_store에서 특정 MMSI의 최근 N시간 궤적 추출 (벡터화)."""
df = vessel_store._tracks.get(mmsi)
if df is None or len(df) == 0:
return []
import pandas as pd
now = datetime.now(timezone.utc)
cutoff = now - pd.Timedelta(hours=hours)
ts_col = df['timestamp']
if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None:
mask = ts_col >= pd.Timestamp(cutoff)
else:
mask = ts_col >= pd.Timestamp(cutoff.replace(tzinfo=None))
recent = df.loc[mask]
if recent.empty:
return []
# 벡터화 추출 (iterrows 대신)
lats = recent['lat'].values
lons = recent['lon'].values
sogs = (recent['sog'] if 'sog' in recent.columns
else recent.get('raw_sog', pd.Series(dtype=float))).fillna(0).values
cogs = (recent['cog'] if 'cog' in recent.columns
else pd.Series(0, index=recent.index)).fillna(0).values
timestamps = recent['timestamp'].tolist()
return [
{'lat': float(lats[i]), 'lon': float(lons[i]),
'sog': float(sogs[i]), 'cog': float(cogs[i]), 'timestamp': timestamps[i]}
for i in range(len(lats))
]
def _compute_gear_active_ratio(
gear_members: list[dict],
all_positions: dict[str, dict],
now: datetime,
stale_sec: float = 3600,
) -> float:
"""어구 그룹의 활성 멤버 비율."""
if not gear_members:
return 0.0
active = 0
for m in gear_members:
pos = all_positions.get(m['mmsi'])
if pos is None:
continue
ts = pos.get('timestamp')
if ts is None:
continue
if isinstance(ts, datetime):
last_dt = ts if ts.tzinfo is not None else ts.replace(tzinfo=timezone.utc)
else:
try:
import pandas as pd
last_dt = pd.Timestamp(ts).to_pydatetime()
if last_dt.tzinfo is None:
last_dt = last_dt.replace(tzinfo=timezone.utc)
except Exception:
continue
age = (now - last_dt).total_seconds()
if age < stale_sec:
active += 1
return active / len(gear_members)
def _is_gear_pattern(name: str) -> bool:
"""어구 이름 패턴 판별."""
import re
return bool(re.match(r'^.+_\d+_\d*$', name or ''))
_MAX_CANDIDATES_PER_GROUP = 30  # upper bound on candidates per gear group (performance guard)
def run_gear_correlation(
    vessel_store,
    gear_groups: list[dict],
    conn,
) -> dict:
    """Main entry point of gear correlation analysis (batch-optimised).

    For each gear group: derive a centre and radius from recently active
    members, select nearby candidate MMSIs, compute raw metrics per candidate,
    and update an EMA score per (model, group, sub-cluster, candidate) for every
    active model. Previously scored pairs that were not refreshed this cycle are
    decayed with state 'OUT_OF_RANGE'. Raw metrics and scores are written in
    bulk and committed once.

    Args:
        vessel_store: VesselStore instance (get_all_latest_positions() and the
            ``_tracks`` mapping read by _get_vessel_track are used).
        gear_groups: result of detect_gear_groups(); each dict is read for
            'parent_name', 'members', and optionally 'sub_cluster_id' and
            'parent_mmsi'.
        conn: kcgdb connection (cursor() / commit() are used).

    Returns:
        {'updated': int, 'models': int, 'raw_inserted': int}
    """
    import time as _time
    import re as _re
    # Name pattern that classifies a candidate as gear/buoy rather than a vessel.
    # NOTE(review): this differs from _is_gear_pattern() in this module — confirm which is authoritative.
    _gear_re = _re.compile(r'^.+_(?=\S*\d)\S+(?:[_ ]\S*)*[_ ]*$|^.+%$|^\d+$')
    t0 = _time.time()
    now = datetime.now(timezone.utc)
    all_positions = vessel_store.get_all_latest_positions()
    # Load active models
    models = _load_active_models(conn)
    if not models:
        logger.warning('no active correlation models found')
        return {'updated': 0, 'models': 0, 'raw_inserted': 0}
    # Pre-load every existing score in bulk (instead of one query per pair)
    # NOTE(review): arithmetic below assumes current_score is loaded as float;
    # a NUMERIC column would yield Decimal and fail when mixed with floats — confirm.
    all_scores = _load_all_scores(conn)
    raw_batch: list[tuple] = []
    score_batch: list[tuple] = []
    total_updated = 0
    total_raw = 0
    processed_keys: set[tuple] = set()  # (model_id, parent_name, sub_cluster_id, target_mmsi)
    # models[0] drives candidate search and raw metrics for all models.
    default_params = models[0]
    for gear_group in gear_groups:
        parent_name = gear_group['parent_name']
        sub_cluster_id = gear_group.get('sub_cluster_id', 0)
        members = gear_group['members']
        if not members:
            continue
        # Members active within the last hour (used for centre/radius)
        display_members = [
            m for m in members
            if _get_time_bucket_age(m.get('mmsi'), all_positions, now) <= 3600
        ]
        # Fallback: if fewer than 2, keep the 2 members with the smallest time-bucket age
        if len(display_members) < 2 and len(members) >= 2:
            display_members = sorted(
                members,
                key=lambda m: _get_time_bucket_age(m.get('mmsi'), all_positions, now),
            )[:2]
        active_members = display_members if len(display_members) >= 2 else members
        # Group centre + radius (from the 1h-active members)
        center_lat = sum(m['lat'] for m in active_members) / len(active_members)
        center_lon = sum(m['lon'] for m in active_members) / len(active_members)
        group_radius = _compute_group_radius(active_members)
        # Gear activity ratio (over all members, not only the 1h-active subset)
        active_ratio = _compute_gear_active_ratio(members, all_positions, now)
        # MMSIs belonging to the group itself (never candidates)
        group_mmsis = {m['mmsi'] for m in members}
        if gear_group.get('parent_mmsi'):
            group_mmsis.add(gear_group['parent_mmsi'])
        # Candidate filtering + count cap
        candidates = find_candidates(
            center_lat, center_lon, group_radius,
            group_mmsis, all_positions, default_params,
        )
        if not candidates:
            continue
        if len(candidates) > _MAX_CANDIDATES_PER_GROUP:
            # Keep only the nearest candidates
            candidates.sort(key=lambda m: _haversine_nm(
                center_lat, center_lon,
                all_positions[m]['lat'], all_positions[m]['lon'],
            ))
            candidates = candidates[:_MAX_CANDIDATES_PER_GROUP]
        for target_mmsi in candidates:
            target_pos = all_positions.get(target_mmsi)
            if target_pos is None:
                continue
            target_name = target_pos.get('name', '')
            target_is_gear = bool(_gear_re.match(target_name or ''))
            target_type = 'GEAR_BUOY' if target_is_gear else 'VESSEL'
            # Metrics (gear: plain centre distance, vessel: track-based)
            if target_is_gear:
                d = _haversine_nm(center_lat, center_lon,
                                  target_pos['lat'], target_pos['lon'])
                prox = max(0.0, 1.0 - d / 20.0)  # linear, 0 beyond 20 NM
                metrics = {'proximity_ratio': prox, 'composite': prox}
            else:
                vessel_track = _get_vessel_track(vessel_store, target_mmsi, hours=6)
                metrics = _compute_gear_vessel_metrics(
                    center_lat, center_lon, group_radius,
                    vessel_track, default_params,
                )
            # Collect raw metrics for the batch insert. Tuple order must match the
            # INSERT column list in _batch_insert_raw; shadow_stay/shadow_return are
            # always False here because no shadow bonus is computed in this function.
            raw_batch.append((
                now, parent_name, sub_cluster_id, target_mmsi, target_type, target_name,
                metrics.get('proximity_ratio'), metrics.get('visit_score'),
                metrics.get('activity_sync'), metrics.get('dtw_similarity'),
                metrics.get('speed_correlation'), metrics.get('heading_coherence'),
                metrics.get('drift_similarity'), False, False, active_ratio,
            ))
            total_raw += 1
            # Per-model EMA update: the composite is re-weighted with each model's own weights
            for model in models:
                if target_is_gear:
                    composite = metrics.get('proximity_ratio', 0) * model.w_prox_persist
                else:
                    composite = (
                        model.w_proximity * (metrics.get('proximity_ratio') or 0)
                        + model.w_visit * (metrics.get('visit_score') or 0)
                        + model.w_activity * (metrics.get('activity_sync') or 0)
                    )
                # Look up the pre-loaded score (no DB query)
                score_key = (model.model_id, parent_name, sub_cluster_id, target_mmsi)
                prev = all_scores.get(score_key)
                prev_score = prev['current_score'] if prev else None
                streak = prev['streak_count'] if prev else 0
                last_obs = prev['last_observed_at'] if prev else None
                new_score, new_streak, state = update_score(
                    prev_score, composite, streak,
                    last_obs, now, active_ratio,
                    0.0, model,
                )
                processed_keys.add(score_key)
                # A new pair is persisted only once it reaches track_threshold;
                # pairs that already have a row are always refreshed.
                # NOTE(review): if gear_groups ever repeats a (parent_name, sub_cluster_id),
                # score_batch will contain duplicate keys for the same upsert — verify upstream.
                if new_score >= model.track_threshold or prev is not None:
                    score_batch.append((
                        model.model_id, parent_name, sub_cluster_id, target_mmsi,
                        target_type, target_name,
                        round(new_score, 6), new_streak, state,
                        now, now, now,
                    ))
                    total_updated += 1
    # ── Forced decay for vessels that left the search radius ──────────
    # Entries present in all_scores but not among this cycle's candidates:
    # the vessel has moved completely outside the search radius
    # (group_radius × candidate_radius_factor).
    # Freeze conditions are ignored and decay_fast is applied so the score converges to 0 quickly.
    # NOTE(review): this also decays rows whose group was skipped this cycle (no members,
    # no candidates, absent from gear_groups) or whose target was cut by
    # _MAX_CANDIDATES_PER_GROUP — confirm that is intended.
    for score_key, prev in all_scores.items():
        if score_key in processed_keys:
            continue
        prev_score = prev['current_score']
        if prev_score is None or prev_score <= 0:
            continue
        model_id, parent_name_s, sub_cluster_id_s, target_mmsi_s = score_key
        # Use that model's decay_fast (default model if it is no longer active)
        model_params = next((m for m in models if m.model_id == model_id), default_params)
        new_score = max(0.0, prev_score - model_params.decay_fast)
        # last_observed_at is carried over unchanged (not advanced to `now`).
        score_batch.append((
            model_id, parent_name_s, sub_cluster_id_s, target_mmsi_s,
            prev.get('target_type', 'VESSEL'), prev.get('target_name', ''),
            round(new_score, 6), 0, 'OUT_OF_RANGE',
            prev.get('last_observed_at', now), now, now,
        ))
        total_updated += 1
    # Batch DB writes, then a single commit
    _batch_insert_raw(conn, raw_batch)
    _batch_upsert_scores(conn, score_batch)
    conn.commit()
    elapsed = round(_time.time() - t0, 2)
    logger.info(
        'gear correlation internals: %.2fs, %d groups, %d raw, %d scores, %d models',
        elapsed, len(gear_groups), total_raw, total_updated, len(models),
    )
    return {
        'updated': total_updated,
        'models': len(models),
        'raw_inserted': total_raw,
    }
# ── DB 헬퍼 (배치 최적화) ─────────────────────────────────────────
def _load_active_models(conn) -> list[ModelParams]:
    """Load every active correlation model, default model first.

    Queries CORRELATION_PARAM_MODELS for ``is_active = TRUE`` rows ordered by
    ``is_default DESC, id ASC`` and converts each with ModelParams.from_db_row.
    The ``params`` column may arrive already decoded (dict), as a JSON string,
    or as NULL / JSON null; the latter two mean "no overrides" (all defaults)
    instead of failing the whole load.

    Returns:
        List of ModelParams; empty when no model is active. On any error the
        error is logged and ``[ModelParams()]`` (built-in defaults) is returned.
    """
    import json

    cur = conn.cursor()
    try:
        cur.execute(
            f"SELECT id, name, params FROM {CORRELATION_PARAM_MODELS} "
            "WHERE is_active = TRUE ORDER BY is_default DESC, id ASC"
        )
        models = []
        for model_id, name, raw_params in cur.fetchall():
            if raw_params is None:
                params = {}
            elif isinstance(raw_params, dict):
                params = raw_params
            else:
                # JSON text; 'null' decodes to None and is treated like NULL.
                params = json.loads(raw_params) or {}
            models.append(ModelParams.from_db_row({
                'id': model_id, 'name': name, 'params': params,
            }))
        return models
    except Exception as e:
        logger.error('failed to load models: %s', e)
        return [ModelParams()]
    finally:
        cur.close()
def _load_all_scores(conn) -> dict[tuple, dict]:
    """Pre-load every stored score.

    Returns:
        {(model_id, group_key, sub_cluster_id, target_mmsi):
            {'current_score', 'streak_count', 'last_observed_at',
             'target_type', 'target_name'}}
        or {} if the query fails (the failure is logged as a warning).

    NOTE(review): callers that treat a missing key as "no history" will then
    restart every pair from scratch after a load failure — confirm acceptable.
    """
    value_fields = ('current_score', 'streak_count', 'last_observed_at',
                    'target_type', 'target_name')
    cur = conn.cursor()
    try:
        cur.execute(
            "SELECT model_id, group_key, sub_cluster_id, target_mmsi, "
            "current_score, streak_count, last_observed_at, "
            "target_type, target_name "
            f"FROM {GEAR_CORRELATION_SCORES}"
        )
        return {
            (row[0], row[1], row[2], row[3]): dict(zip(value_fields, row[4:9]))
            for row in cur.fetchall()
        }
    except Exception as e:
        logger.warning('failed to load all scores: %s', e)
        return {}
    finally:
        cur.close()
def _batch_insert_raw(conn, batch: list[tuple]):
"""raw 메트릭 배치 INSERT."""
if not batch:
return
cur = conn.cursor()
try:
from psycopg2.extras import execute_values
execute_values(
cur,
f"""INSERT INTO {GEAR_CORRELATION_RAW_METRICS}
(observed_at, group_key, sub_cluster_id, target_mmsi, target_type, target_name,
proximity_ratio, visit_score, activity_sync,
dtw_similarity, speed_correlation, heading_coherence,
drift_similarity, shadow_stay, shadow_return,
gear_group_active_ratio)
VALUES %s""",
batch,
page_size=500,
)
except Exception as e:
logger.warning('batch insert raw failed: %s', e)
finally:
cur.close()
def _batch_upsert_scores(conn, batch: list[tuple]):
"""점수 배치 UPSERT."""
if not batch:
return
cur = conn.cursor()
try:
from psycopg2.extras import execute_values
execute_values(
cur,
f"""INSERT INTO {GEAR_CORRELATION_SCORES}
(model_id, group_key, sub_cluster_id, target_mmsi, target_type, target_name,
current_score, streak_count, freeze_state,
first_observed_at, last_observed_at, updated_at)
VALUES %s
ON CONFLICT (model_id, group_key, sub_cluster_id, target_mmsi)
DO UPDATE SET
target_type = EXCLUDED.target_type,
target_name = EXCLUDED.target_name,
current_score = EXCLUDED.current_score,
streak_count = EXCLUDED.streak_count,
freeze_state = EXCLUDED.freeze_state,
observation_count = {GEAR_CORRELATION_SCORES}.observation_count + 1,
last_observed_at = EXCLUDED.last_observed_at,
updated_at = EXCLUDED.updated_at""",
batch,
page_size=500,
)
except Exception as e:
logger.warning('batch upsert scores failed: %s', e)
finally:
cur.close()