kcg-ai-monitoring/prediction/algorithms/gear_correlation.py
htlee da37a00b8e fix: resolve 5 prediction issues; all pipelines now run normally
## Issue 1: gear_correlation Decimal → float TypeError
- prediction/algorithms/gear_correlation.py:785
- _load_all_scores() read NUMERIC columns as Decimal, so arithmetic with float constants failed
- Fixed with an explicit float() conversion
- Effect: gear correlation now records 24,474 raw metrics + 3,966 scores correctly
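The sketch below reproduces the Issue 1 failure and its fix. The helpers `to_float_score`, `decay_score` and `decimal_minus_float_fails` are hypothetical and exist only for illustration. Only the `float(x) if x is not None else 0.0` pattern mirrors `_load_all_scores`.

```python
from decimal import Decimal
from typing import Union

Number = Union[Decimal, float, None]


def to_float_score(value: Number) -> float:
    # psycopg2 maps PostgreSQL NUMERIC to decimal.Decimal; convert to float
    # and treat SQL NULL (None) as 0.0, as _load_all_scores does.
    return float(value) if value is not None else 0.0


def decay_score(prev: Number, decay: float) -> float:
    # Hypothetical helper: subtract a float decay constant, clamped at 0.
    return max(0.0, to_float_score(prev) - decay)


def decimal_minus_float_fails() -> bool:
    # Reproduces the original error: Decimal and float do not mix in arithmetic.
    try:
        Decimal("0.5") - 0.1
    except TypeError:
        return True
    return False
```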

## Issue 2: violation_classifier reports classified=0
- prediction/output/violation_classifier.py
- result.get('id') does not exist on AnalysisResult, so it was always None and every UPDATE was skipped
- The classifier depended on permit_status/gear_judgment fields that do not exist
- Changed to an UPDATE keyed on (mmsi, analyzed_at)
- Chinese vessels (412/413*) that enter the EEZ are now classified as EEZ_VIOLATION even without permit data
- Effect: classified=0 → classified=4–6 per cycle
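This is a minimal sketch of the two Issue 2 changes. It uses an in-memory SQLite table as a stand-in for PostgreSQL. The following are hypothetical: the `classify` and `apply_classifications` helpers, the `in_eez` key and the `violation` column. Only the `(mmsi, analyzed_at)` key, the 412/413 prefixes and the `EEZ_VIOLATION` label come from the note above.

```python
import sqlite3
from typing import Optional

# Chinese MID prefixes named in the commit note (the MID is the first 3 MMSI digits).
CHINA_MID_PREFIXES = ("412", "413")


def classify(mmsi: str, in_eez: bool) -> Optional[str]:
    # Hypothetical rule: a Chinese vessel inside the EEZ is flagged
    # without consulting any permit field.
    if in_eez and mmsi.startswith(CHINA_MID_PREFIXES):
        return "EEZ_VIOLATION"
    return None


def apply_classifications(conn: sqlite3.Connection, results: list) -> int:
    # UPDATE keyed on (mmsi, analyzed_at) instead of a nonexistent 'id'.
    updated = 0
    for r in results:
        label = classify(r["mmsi"], r["in_eez"])
        if label is None:
            continue
        cur = conn.execute(
            "UPDATE vessel_analysis_results SET violation = ? "
            "WHERE mmsi = ? AND analyzed_at = ?",
            (label, r["mmsi"], r["analyzed_at"]),
        )
        updated += cur.rowcount
    conn.commit()
    return updated


def demo_classify() -> int:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE vessel_analysis_results "
        "(mmsi TEXT, analyzed_at TEXT, violation TEXT)"
    )
    ts = "2026-04-08T06:40:00+09:00"
    conn.executemany(
        "INSERT INTO vessel_analysis_results VALUES (?, ?, NULL)",
        [("412000001", ts), ("440000001", ts)],
    )
    results = [
        {"mmsi": "412000001", "analyzed_at": ts, "in_eez": True},
        {"mmsi": "440000001", "analyzed_at": ts, "in_eez": True},
    ]
    return apply_classifications(conn, results)
```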

## Issue 3: kpi_writer values are all 0 (except tracking_active)
- prediction/output/kpi_writer.py:27
- date.today() was mixed with timezone.utc: before 09:00 KST the current time is still "yesterday" in UTC, so the `>= today_start` query for today returned 0
- today_start is now computed in KST
- Effect: realtime_detection 0 → 7,107, illegal_transship 0 → 5,033
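The sketch below contrasts the two shapes of `today_start`. It assumes the pre-fix code combined the local calendar date with a UTC tzinfo. `kpi_writer.py` itself is not shown here, so both function names are hypothetical.

```python
from datetime import date, datetime, time, timedelta, timezone

# Korea does not observe DST, so a fixed +09:00 offset is exact.
KST = timezone(timedelta(hours=9))


def today_start_buggy(local_today: date) -> datetime:
    # Assumed pre-fix shape: the local calendar date (what date.today()
    # returns on a KST host) combined with a UTC tzinfo.
    return datetime.combine(local_today, time.min, tzinfo=timezone.utc)


def today_start_kst(now_utc: datetime) -> datetime:
    # Fixed shape: take today's date in KST, then midnight in KST.
    local_now = now_utc.astimezone(KST)
    return datetime.combine(local_now.date(), time.min, tzinfo=KST)


# 2026-04-08 06:47 KST, which is still 2026-04-07 in UTC.
now = datetime(2026, 4, 7, 21, 47, tzinfo=timezone.utc)
print(today_start_buggy(date(2026, 4, 8)) > now)  # buggy start lies in the future
print(today_start_kst(now) <= now)                # fixed start is in the past
```

With the buggy shape, `today_start` lies ahead of `now`, so any `ts >= today_start` filter matches nothing.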

## Issue 4: stats_daily has 0 rows for today
- prediction/output/stats_aggregator.py:96, 194
- aggregate_daily/monthly used UTC day boundaries
- Changed to KST midnight boundaries
- Effect: 2026-04-08 detections 0 → 543,656, events 0 → 5,253
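One way to express the Issue 4 fix is to convert a KST calendar day or month into a half-open UTC range `[start, end)`. The helpers `kst_day_bounds` and `kst_month_bounds` are hypothetical, because `stats_aggregator.py` is not shown.

```python
from datetime import date, datetime, time, timedelta, timezone
from typing import Tuple

KST = timezone(timedelta(hours=9))  # fixed offset; Korea has no DST


def kst_day_bounds(day: date) -> Tuple[datetime, datetime]:
    # Half-open UTC range [start, end) covering one KST calendar day.
    start = datetime.combine(day, time.min, tzinfo=KST)
    end = start + timedelta(days=1)
    return start.astimezone(timezone.utc), end.astimezone(timezone.utc)


def kst_month_bounds(year: int, month: int) -> Tuple[datetime, datetime]:
    # Half-open UTC range [start, end) covering one KST calendar month.
    start = datetime(year, month, 1, tzinfo=KST)
    if month == 12:
        end = datetime(year + 1, 1, 1, tzinfo=KST)
    else:
        end = datetime(year, month + 1, 1, tzinfo=KST)
    return start.astimezone(timezone.utc), end.astimezone(timezone.utc)


start, end = kst_day_bounds(date(2026, 4, 8))
# A daily aggregate would then filter with: ts >= start AND ts < end
```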

## Issue 5: missing columns in parent workflow tables (V005 ↔ prediction mismatch)
Added in bulk by the V016 migration:
- gear_parent_label_sessions: label_parent_name, normalized_parent_name,
  duration_days, actor, comment, metadata, updated_at, and others (8 columns)
- gear_group_parent_resolution: parent_name, normalized_parent_name,
  selected_parent_name, confidence, decision_source, top_score, second_score,
  score_margin, stable_cycles, evidence_summary, episode_id, continuity_*,
  prior_bonus_total, last_evaluated_at, last_promoted_at, and others (17 columns)
- gear_parent_candidate_exclusions: normalized_parent_name, reason_type,
  duration_days, metadata, updated_at, active_from, active_until, plus a
  candidate_mmsi alias column (GENERATED ALWAYS AS (excluded_mmsi))
- gear_group_parent_candidate_snapshots: parent_name

Effect: gear parent inference recorded 925 groups, 301 direct matches, 1329 candidates,
      188 review-required, and 925 episode snapshots; the full parent-vessel workflow runs normally

## Verification results (e2e)

- analysis cycle: 6,824 vessels, 112 s/cycle, normal
- vessel_analysis_results: 13,650 rows in 10 min, about 1.25 million total
- prediction_events: 138 in 1 hour, 12,258 total
- prediction_alerts: 183 in 1 hour
- gear_correlation_scores: 3,966 rows
- gear_group_parent_resolution: 926 rows
- stats_hourly: 17 rows; stats_daily: 543,656 for today
- Backend Flyway V016 applied successfully

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 06:47:53 +09:00

856 lines
30 KiB
Python

"""어구 그룹 다단계 연관성 분석 — 멀티모델 패턴 추적.
Phase 1: default 모델 1개로 동작 (DB에서 is_active=true 모델 로드).
Phase 2: 글로벌 모델 max 5개 병렬 실행.
어구 중심 점수 체계:
- 어구 신호 기준 관측 윈도우 (어구 비활성 시 FREEZE)
- 선박 shadow 추적 (비활성 → 활성 전환 시 보너스)
- 적응형 EMA + streak 자기강화
- 퍼센트 기반 무제한 추적 (50%+)
"""

from __future__ import annotations

import logging
import math
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

from algorithms.polygon_builder import _get_time_bucket_age
from config import qualified_table

logger = logging.getLogger(__name__)

# ── Constants ─────────────────────────────────────────────────────
_EARTH_RADIUS_NM = 3440.065
_NM_TO_M = 1852.0

CORRELATION_PARAM_MODELS = qualified_table('correlation_param_models')
GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores')
GEAR_CORRELATION_RAW_METRICS = qualified_table('gear_correlation_raw_metrics')


# ── Parameter model ───────────────────────────────────────────────
@dataclass
class ModelParams:
    """Complete parameter set for one tracking model."""

    model_id: int = 1
    name: str = 'default'

    # EMA
    alpha_base: float = 0.30
    alpha_min: float = 0.08
    alpha_decay_per_streak: float = 0.005

    # Thresholds
    track_threshold: float = 0.50
    polygon_threshold: float = 0.70

    # Metric weights: gear-vessel
    w_proximity: float = 0.45
    w_visit: float = 0.35
    w_activity: float = 0.20

    # Metric weights: vessel-vessel
    w_dtw: float = 0.30
    w_sog_corr: float = 0.20
    w_heading: float = 0.25
    w_prox_vv: float = 0.25

    # Metric weights: gear-gear
    w_prox_persist: float = 0.50
    w_drift: float = 0.30
    w_signal_sync: float = 0.20

    # Freeze criteria
    group_quiet_ratio: float = 0.30
    normal_gap_hours: float = 1.0

    # Decay
    decay_slow: float = 0.025
    decay_fast: float = 0.10
    stale_hours: float = 6.0

    # Shadow
    shadow_stay_bonus: float = 0.10
    shadow_return_bonus: float = 0.15

    # Distance
    candidate_radius_factor: float = 3.0
    proximity_threshold_nm: float = 5.0
    visit_threshold_nm: float = 5.0

    # Night
    night_bonus: float = 1.3

    # Long-term decay
    long_decay_days: float = 7.0

    @classmethod
    def from_db_row(cls, row: dict) -> ModelParams:
        """Build an instance from a correlation_param_models DB row."""
        # A NULL params column becomes an empty dict; keys that are not
        # attributes of the class are ignored.
        params_json = row.get('params') or {}
        return cls(
            model_id=row['id'],
            name=row['name'],
            **{k: v for k, v in params_json.items() if hasattr(cls, k)},
        )


# ── Haversine distance ────────────────────────────────────────────
def _haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two coordinates, in nautical miles."""
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return _EARTH_RADIUS_NM * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
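Here is a quick sanity check on `_haversine_nm`. With `_EARTH_RADIUS_NM = 3440.065`, one degree of latitude should come out at roughly 60 NM. The snippet is a standalone copy of the same formula, so it runs without the `algorithms` and `config` packages.

```python
import math

EARTH_RADIUS_NM = 3440.065  # same value as _EARTH_RADIUS_NM


def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    # Haversine formula, same steps as _haversine_nm.
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return EARTH_RADIUS_NM * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))


# One degree of latitude along a meridian is about 60 NM.
one_degree = haversine_nm(35.0, 129.0, 36.0, 129.0)
print(round(one_degree, 2))
```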


# ── Freeze decision ───────────────────────────────────────────────
def should_freeze(
    gear_group_active_ratio: float,
    target_last_observed: Optional[datetime],
    now: datetime,
    params: ModelParams,
) -> tuple[bool, str]:
    """Decide whether to hold (freeze) a score instead of decaying it.

    The gear group, not the target, is the reference.
    """
    # 1. Gear group is quiet → nothing to compare against
    if gear_group_active_ratio < params.group_quiet_ratio:
        return True, 'GROUP_QUIET'
    # 2. The target's own absence is still within the normal gap
    if target_last_observed is not None:
        hours_absent = (now - target_last_observed).total_seconds() / 3600
        if hours_absent < params.normal_gap_hours:
            return True, 'NORMAL_GAP'
    return False, 'ACTIVE'


# ── EMA update ────────────────────────────────────────────────────
def update_score(
    prev_score: Optional[float],
    raw_score: Optional[float],
    streak: int,
    last_observed: Optional[datetime],
    now: datetime,
    gear_group_active_ratio: float,
    shadow_bonus: float,
    params: ModelParams,
) -> tuple[float, int, str]:
    """Adaptive EMA score update.

    Returns: (new_score, new_streak, state)
    """
    # Target not observed this cycle
    if raw_score is None:
        frz, reason = should_freeze(
            gear_group_active_ratio, last_observed, now, params,
        )
        if frz:
            return (prev_score or 0.0), streak, reason
        # Target has genuinely left → decay
        hours_absent = 0.0
        if last_observed is not None:
            hours_absent = (now - last_observed).total_seconds() / 3600
        decay = params.decay_fast if hours_absent > params.stale_hours else params.decay_slow
        return max(0.0, (prev_score or 0.0) - decay), 0, 'SIGNAL_LOSS'

    # Apply the shadow bonus (capped at 1.0)
    adjusted = min(1.0, raw_score + shadow_bonus)

    # Case 1: at or above the tracking threshold → reward the streak
    if adjusted >= params.track_threshold:
        streak += 1
        # Longer streaks lower alpha, so the score reacts more slowly (floored at alpha_min)
        alpha = max(params.alpha_min,
                    params.alpha_base - streak * params.alpha_decay_per_streak)
        if prev_score is None:
            return adjusted, streak, 'ACTIVE'
        return alpha * adjusted + (1.0 - alpha) * prev_score, streak, 'ACTIVE'

    # Case 2: pattern diverges → reset the streak and use the base alpha
    alpha = params.alpha_base
    if prev_score is None:
        return adjusted, 0, 'PATTERN_DIVERGE'
    return alpha * adjusted + (1.0 - alpha) * prev_score, 0, 'PATTERN_DIVERGE'
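The snippet below isolates the streak branch (Case 1) of `update_score`. It uses the default `ModelParams` values and assumes the adjusted score is already at or above `track_threshold`. The shadow bonus and freeze logic are left out, and `adaptive_alpha` and `ema_step` are illustrative names only.

```python
# Default values from ModelParams.
ALPHA_BASE = 0.30
ALPHA_MIN = 0.08
ALPHA_DECAY_PER_STREAK = 0.005


def adaptive_alpha(streak: int) -> float:
    # The smoothing factor shrinks by 0.005 per streak step, floored at ALPHA_MIN.
    return max(ALPHA_MIN, ALPHA_BASE - streak * ALPHA_DECAY_PER_STREAK)


def ema_step(prev_score, adjusted: float, streak: int):
    # Mirrors Case 1 of update_score: bump the streak first, then blend
    # with the streak-dependent alpha (first observation is taken as-is).
    streak += 1
    if prev_score is None:
        return adjusted, streak
    alpha = adaptive_alpha(streak)
    return alpha * adjusted + (1.0 - alpha) * prev_score, streak


score, streak = 0.5, 0
for _ in range(3):
    score, streak = ema_step(score, 0.9, streak)
    print(f"streak={streak} alpha={adaptive_alpha(streak):.3f} score={score:.4f}")
```

At streak 44, alpha reaches the 0.08 floor. From then on, a long-tracked target moves only 8% of the way toward each new observation, which is the "self-reinforcing streak" behavior the module docstring describes.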


# ── Gear-vessel metrics ───────────────────────────────────────────
def _compute_gear_vessel_metrics(
    gear_center_lat: float,
    gear_center_lon: float,
    gear_radius_nm: float,
    vessel_track: list[dict],
    params: ModelParams,
) -> dict:
    """Metrics for a gear-group center versus a vessel track.

    vessel_track: [{lat, lon, sog, cog, timestamp}, ...]
    """
    if not vessel_track:
        return {'proximity_ratio': 0, 'visit_score': 0, 'activity_sync': 0, 'composite': 0}

    # NOTE: not referenced below; the scoring uses the fixed _PROX_* / _VISIT_* constants.
    threshold_nm = max(gear_radius_nm * 2, params.proximity_threshold_nm)

    # 1. proximity_ratio: tiered score by distance band
    _PROX_CLOSE_NM = 2.5
    _PROX_NEAR_NM = 5.0
    _PROX_FAR_NM = 10.0
    prox_total = 0.0
    for p in vessel_track:
        d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon'])
        if d < _PROX_CLOSE_NM:
            prox_total += 1.0
        elif d < _PROX_NEAR_NM:
            prox_total += 0.5
        elif d < _PROX_FAR_NM:
            prox_total += 0.15
    proximity_ratio = prox_total / len(vessel_track)

    # 2. visit_score: visit pattern (3 NM threshold, saturates at 8 visits)
    _VISIT_THRESHOLD_NM = 3.0
    _VISIT_MAX = 8.0
    in_zone = False
    visits = 0
    stay_points = 0
    consecutive_stay = 0
    stay_bonus = 0.0
    away_points = 0
    for p in vessel_track:
        d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon'])
        if d < _VISIT_THRESHOLD_NM:
            if not in_zone:
                # Entering the zone starts a new visit
                visits += 1
                in_zone = True
                consecutive_stay = 0
            stay_points += 1
            consecutive_stay += 1
            if consecutive_stay >= 3:
                stay_bonus += 0.05  # bonus for each fix of a sustained stay
        else:
            in_zone = False
            consecutive_stay = 0
            away_points += 1
    visit_count_norm = min(1.0, visits / _VISIT_MAX) if visits > 0 else 0.0
    total = stay_points + away_points
    stay_ratio = stay_points / total if total > 0 else 0.0
    visit_score = min(1.0, 0.5 * visit_count_norm + 0.5 * stay_ratio + stay_bonus)

    # 3. activity_sync: two-mode test (slow-speed and faster fishing)
    _MIN_ACTIVITY_POINTS = 6
    in_zone_count = 0
    activity_total = 0.0
    for p in vessel_track:
        d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon'])
        if d < _PROX_NEAR_NM:
            in_zone_count += 1
            sog = p.get('sog', 0) or 0
            if sog < 3.0:
                activity_total += 1.0  # slow fishing (anchored / tending gear)
            elif sog <= 7.0:
                activity_total += 0.6  # faster fishing (pair trawling / towing)
            # else: transiting → 0
    activity_sync = (activity_total / in_zone_count) if in_zone_count >= _MIN_ACTIVITY_POINTS else 0.0

    # Weighted sum
    composite = (
        params.w_proximity * proximity_ratio
        + params.w_visit * visit_score
        + params.w_activity * activity_sync
    )
    return {
        'proximity_ratio': round(proximity_ratio, 4),
        'visit_score': round(visit_score, 4),
        'activity_sync': round(activity_sync, 4),
        'composite': round(composite, 4),
    }
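The two distance-based parts of `_compute_gear_vessel_metrics` can be checked in isolation. The sketch takes precomputed distances instead of lat/lon fixes, and it keeps the tier bounds, the 5 NM activity zone and the 6-fix minimum from the function above. `proximity_ratio` and `activity_sync` here are illustrative re-implementations, not the module's API.

```python
# Tier bounds and points copied from _compute_gear_vessel_metrics.
PROX_TIERS = ((2.5, 1.0), (5.0, 0.5), (10.0, 0.15))  # (upper bound in NM, points)
PROX_NEAR_NM = 5.0
MIN_ACTIVITY_POINTS = 6


def proximity_ratio(distances_nm: list) -> float:
    # Average tier points per track fix; fixes at 10 NM or more score 0.
    if not distances_nm:
        return 0.0
    total = 0.0
    for d in distances_nm:
        for bound, points in PROX_TIERS:
            if d < bound:
                total += points
                break
    return total / len(distances_nm)


def activity_sync(fixes: list) -> float:
    # fixes: (distance_nm, sog_knots) pairs. Only fixes within 5 NM count,
    # and at least MIN_ACTIVITY_POINTS of them are required.
    in_zone = [sog for d, sog in fixes if d < PROX_NEAR_NM]
    if len(in_zone) < MIN_ACTIVITY_POINTS:
        return 0.0
    total = 0.0
    for sog in in_zone:
        if sog < 3.0:
            total += 1.0   # slow: anchored or tending gear
        elif sog <= 7.0:
            total += 0.6   # faster fishing such as towing
    return total / len(in_zone)


print(proximity_ratio([1.0, 3.0, 7.0, 12.0]))
```

The minimum-fix rule means a vessel that only briefly passes through the 5 NM zone gets no activity credit, however slowly it moves.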


# ── Vessel-vessel metrics ─────────────────────────────────────────
def _compute_vessel_vessel_metrics(
    track_a: list[dict],
    track_b: list[dict],
    params: ModelParams,
) -> dict:
    """Metrics between two vessel tracks."""
    from algorithms.track_similarity import (
        compute_heading_coherence,
        compute_proximity_ratio,
        compute_sog_correlation,
        compute_track_similarity,
    )

    if not track_a or not track_b:
        return {
            'dtw_similarity': 0, 'speed_correlation': 0,
            'heading_coherence': 0, 'proximity_ratio': 0, 'composite': 0,
        }

    # DTW track similarity
    pts_a = [(p['lat'], p['lon']) for p in track_a]
    pts_b = [(p['lat'], p['lon']) for p in track_b]
    dtw_sim = compute_track_similarity(pts_a, pts_b)

    # SOG correlation
    sog_a = [p.get('sog', 0) for p in track_a]
    sog_b = [p.get('sog', 0) for p in track_b]
    sog_corr = compute_sog_correlation(sog_a, sog_b)

    # COG (heading) coherence
    cog_a = [p.get('cog', 0) for p in track_a]
    cog_b = [p.get('cog', 0) for p in track_b]
    heading = compute_heading_coherence(cog_a, cog_b)

    # Proximity ratio
    prox = compute_proximity_ratio(pts_a, pts_b, params.proximity_threshold_nm)

    composite = (
        params.w_dtw * dtw_sim
        + params.w_sog_corr * sog_corr
        + params.w_heading * heading
        + params.w_prox_vv * prox
    )
    return {
        'dtw_similarity': round(dtw_sim, 4),
        'speed_correlation': round(sog_corr, 4),
        'heading_coherence': round(heading, 4),
        'proximity_ratio': round(prox, 4),
        'composite': round(composite, 4),
    }


# ── Gear-gear metrics ─────────────────────────────────────────────
def _compute_gear_gear_metrics(
    center_a: tuple[float, float],
    center_b: tuple[float, float],
    center_history_a: list[dict],
    center_history_b: list[dict],
    params: ModelParams,
) -> dict:
    """Metrics between two gear groups."""
    if not center_history_a or not center_history_b:
        return {
            'proximity_ratio': 0, 'drift_similarity': 0,
            'composite': 0,
        }

    # 1. Proximity persistence: scored from the current center-to-center distance
    dist_nm = _haversine_nm(center_a[0], center_a[1], center_b[0], center_b[1])
    prox_persist = max(0.0, 1.0 - dist_nm / 20.0)  # 0 at 20 NM or more

    # 2. Drift similarity: cosine similarity of the center movement vectors
    drift_sim = 0.0
    n = min(len(center_history_a), len(center_history_b))
    if n >= 2:
        # Movement vector from the last two points (raw lat/lon deltas)
        da_lat = center_history_a[-1].get('lat', 0) - center_history_a[-2].get('lat', 0)
        da_lon = center_history_a[-1].get('lon', 0) - center_history_a[-2].get('lon', 0)
        db_lat = center_history_b[-1].get('lat', 0) - center_history_b[-2].get('lat', 0)
        db_lon = center_history_b[-1].get('lon', 0) - center_history_b[-2].get('lon', 0)
        dot = da_lat * db_lat + da_lon * db_lon
        mag_a = (da_lat ** 2 + da_lon ** 2) ** 0.5
        mag_b = (db_lat ** 2 + db_lon ** 2) ** 0.5
        if mag_a > 1e-10 and mag_b > 1e-10:
            cos_sim = dot / (mag_a * mag_b)
            drift_sim = max(0.0, (cos_sim + 1.0) / 2.0)  # rescale [-1, 1] → [0, 1]

    composite = (
        params.w_prox_persist * prox_persist
        + params.w_drift * drift_sim
    )
    return {
        'proximity_ratio': round(prox_persist, 4),
        'drift_similarity': round(drift_sim, 4),
        'composite': round(composite, 4),
    }
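The gear-gear scores reduce to two small formulas: a linear distance falloff, and a cosine similarity rescaled from [-1, 1] to [0, 1]. The sketch reproduces both on hand-picked vectors. Like the original, it works on raw lat/lon deltas, so longitude is not scaled by cos(latitude). Note also that `w_signal_sync` is declared in `ModelParams` but does not enter this composite.

```python
import math


def drift_similarity(move_a, move_b) -> float:
    # Cosine similarity of two (dlat, dlon) movement vectors, rescaled
    # to [0, 1] as in _compute_gear_gear_metrics.
    dot = move_a[0] * move_b[0] + move_a[1] * move_b[1]
    mag_a = math.hypot(*move_a)
    mag_b = math.hypot(*move_b)
    if mag_a <= 1e-10 or mag_b <= 1e-10:
        return 0.0  # a stationary center has no direction
    cos_sim = dot / (mag_a * mag_b)
    return max(0.0, (cos_sim + 1.0) / 2.0)


def proximity_persistence(dist_nm: float) -> float:
    # Linear falloff: 1.0 at 0 NM, 0.0 at 20 NM or more.
    return max(0.0, 1.0 - dist_nm / 20.0)


print(drift_similarity((0.0, 0.01), (0.0, 0.02)))   # same direction
print(drift_similarity((0.01, 0.0), (0.0, 0.01)))   # perpendicular
```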


# ── Shadow bonus ──────────────────────────────────────────────────
def compute_shadow_bonus(
    vessel_positions_during_inactive: list[dict],
    last_known_gear_center: tuple[float, float],
    group_radius_nm: float,
    params: ModelParams,
) -> tuple[float, bool, bool]:
    """Evaluate whether a vessel stayed near the gear while the gear was inactive.

    Returns: (bonus, stayed_nearby, returned_before_resume)
    """
    if not vessel_positions_during_inactive or last_known_gear_center is None:
        return 0.0, False, False

    gc_lat, gc_lon = last_known_gear_center
    threshold_nm = max(group_radius_nm * 2, params.proximity_threshold_nm)

    # 1. Mean distance from the last known gear center
    dists = [
        _haversine_nm(gc_lat, gc_lon, p['lat'], p['lon'])
        for p in vessel_positions_during_inactive
    ]
    avg_dist = sum(dists) / len(dists)
    stayed = avg_dist < threshold_nm

    # 2. Is the last position nearby? (return check)
    returned = dists[-1] < threshold_nm if dists else False

    bonus = 0.0
    if stayed:
        bonus += params.shadow_stay_bonus
    if returned:
        bonus += params.shadow_return_bonus
    return bonus, stayed, returned
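The sketch below shows the shadow-bonus rule with precomputed distances and the default `ModelParams` values. In the code shown, `run_gear_correlation` passes `0.0` as the shadow bonus and records `False, False` for `shadow_stay`/`shadow_return`, so `compute_shadow_bonus` is not yet on that path. The function name `shadow_bonus` is illustrative.

```python
def shadow_bonus(distances_nm, group_radius_nm, proximity_threshold_nm=5.0,
                 stay_bonus=0.10, return_bonus=0.15):
    # distances_nm: vessel-to-last-known-gear-center distances, oldest first,
    # sampled while the gear group was inactive.
    if not distances_nm:
        return 0.0, False, False
    threshold_nm = max(group_radius_nm * 2, proximity_threshold_nm)
    stayed = sum(distances_nm) / len(distances_nm) < threshold_nm  # mean distance
    returned = distances_nm[-1] < threshold_nm                      # last fix nearby
    bonus = (stay_bonus if stayed else 0.0) + (return_bonus if returned else 0.0)
    return bonus, stayed, returned


print(shadow_bonus([2.0, 3.0, 4.0], 1.0))   # stayed and returned
print(shadow_bonus([8.0, 9.0, 4.0], 1.0))   # only returned
```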


# ── Candidate filtering ───────────────────────────────────────────
def _compute_group_radius(members: list[dict]) -> float:
    """Half of the maximum pairwise distance between group members (NM)."""
    if len(members) < 2:
        return 1.0  # minimum of 1 NM
    max_dist = 0.0
    for i in range(len(members)):
        for j in range(i + 1, len(members)):
            d = _haversine_nm(
                members[i]['lat'], members[i]['lon'],
                members[j]['lat'], members[j]['lon'],
            )
            if d > max_dist:
                max_dist = d
    return max(1.0, max_dist / 2.0)


def find_candidates(
    gear_center_lat: float,
    gear_center_lon: float,
    group_radius_nm: float,
    group_mmsis: set[str],
    all_positions: dict[str, dict],
    params: ModelParams,
) -> list[str]:
    """Return candidate MMSIs within the search radius around a gear group."""
    search_radius = group_radius_nm * params.candidate_radius_factor
    candidates = []
    for mmsi, pos in all_positions.items():
        if mmsi in group_mmsis:
            continue
        d = _haversine_nm(gear_center_lat, gear_center_lon, pos['lat'], pos['lon'])
        if d < search_radius:
            candidates.append(mmsi)
    return candidates


# ── Main entry point ──────────────────────────────────────────────
def _get_vessel_track(vessel_store, mmsi: str, hours: int = 6) -> list[dict]:
    """Extract the last N hours of track for one MMSI from vessel_store (vectorized)."""
    df = vessel_store._tracks.get(mmsi)
    if df is None or len(df) == 0:
        return []

    import pandas as pd
    now = datetime.now(timezone.utc)
    cutoff = now - pd.Timedelta(hours=hours)

    # Compare against a tz-aware or naive cutoff to match the column dtype
    ts_col = df['timestamp']
    if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None:
        mask = ts_col >= pd.Timestamp(cutoff)
    else:
        mask = ts_col >= pd.Timestamp(cutoff.replace(tzinfo=None))
    recent = df.loc[mask]
    if recent.empty:
        return []

    # Vectorized extraction (instead of iterrows)
    lats = recent['lat'].values
    lons = recent['lon'].values
    if 'sog' in recent.columns:
        sog_series = recent['sog']
    elif 'raw_sog' in recent.columns:
        sog_series = recent['raw_sog']
    else:
        # Zero column aligned with `recent`, so sogs[i] below is always valid
        sog_series = pd.Series(0.0, index=recent.index)
    sogs = sog_series.fillna(0).values
    cogs = (recent['cog'] if 'cog' in recent.columns
            else pd.Series(0.0, index=recent.index)).fillna(0).values
    timestamps = recent['timestamp'].tolist()
    return [
        {'lat': float(lats[i]), 'lon': float(lons[i]),
         'sog': float(sogs[i]), 'cog': float(cogs[i]), 'timestamp': timestamps[i]}
        for i in range(len(lats))
    ]


def _compute_gear_active_ratio(
    gear_members: list[dict],
    all_positions: dict[str, dict],
    now: datetime,
    stale_sec: float = 3600,
) -> float:
    """Share of gear-group members seen within the last stale_sec seconds."""
    if not gear_members:
        return 0.0
    active = 0
    for m in gear_members:
        pos = all_positions.get(m['mmsi'])
        if pos is None:
            continue
        ts = pos.get('timestamp')
        if ts is None:
            continue
        # Normalize to an aware UTC datetime (naive values are assumed to be UTC)
        if isinstance(ts, datetime):
            last_dt = ts if ts.tzinfo is not None else ts.replace(tzinfo=timezone.utc)
        else:
            try:
                import pandas as pd
                last_dt = pd.Timestamp(ts).to_pydatetime()
                if last_dt.tzinfo is None:
                    last_dt = last_dt.replace(tzinfo=timezone.utc)
            except Exception:
                continue
        age = (now - last_dt).total_seconds()
        if age < stale_sec:
            active += 1
    return active / len(gear_members)


def _is_gear_pattern(name: str) -> bool:
    """Return True if the name follows the gear naming pattern (e.g. NAME_12_3)."""
    import re
    return bool(re.match(r'^.+_\d+_\d*$', name or ''))


_MAX_CANDIDATES_PER_GROUP = 30  # cap on candidates per group (performance guard)
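The module has two different gear-name patterns: `_is_gear_pattern` above, and the `_gear_re` regex compiled inside `run_gear_correlation`, which is the one that decides `GEAR_BUOY` vs `VESSEL`. The probe below compares them. The sample names are made up for illustration.

```python
import re

# Pattern from _is_gear_pattern: "<name>_<digits>_<optional digits>"
IS_GEAR_PATTERN = re.compile(r'^.+_\d+_\d*$')
# Pattern compiled as _gear_re inside run_gear_correlation
GEAR_RE = re.compile(r'^.+_(?=\S*\d)\S+(?:[_ ]\S*)*[_ ]*$|^.+%$|^\d+$')


def compare(name: str) -> tuple:
    # (matches _is_gear_pattern, matches _gear_re)
    return bool(IS_GEAR_PATTERN.match(name)), bool(GEAR_RE.match(name))


for sample in ["ZHELINGYU_12_3", "ZHELINGYU_12", "NET%", "123456", "ZHELINGYU 12"]:
    print(f"{sample!r:18} {compare(sample)}")
```

`_gear_re` is broader. It also accepts a single `_<token with a digit>` suffix, names ending in `%`, and all-digit names, so a name can be classified as gear at runtime even when `_is_gear_pattern` rejects it.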


def run_gear_correlation(
    vessel_store,
    gear_groups: list[dict],
    conn,
) -> dict:
    """Main entry point for gear correlation analysis (batch-optimized).

    Args:
        vessel_store: VesselStore instance
        gear_groups: result of detect_gear_groups()
        conn: kcgdb connection

    Returns:
        {'updated': int, 'models': int, 'raw_inserted': int}
    """
    import time as _time
    import re as _re
    # Gear-like names: "<name>_<token with a digit>...", names ending in '%', or all digits
    _gear_re = _re.compile(r'^.+_(?=\S*\d)\S+(?:[_ ]\S*)*[_ ]*$|^.+%$|^\d+$')

    t0 = _time.time()
    now = datetime.now(timezone.utc)
    all_positions = vessel_store.get_all_latest_positions()

    # Load active models
    models = _load_active_models(conn)
    if not models:
        logger.warning('no active correlation models found')
        return {'updated': 0, 'models': 0, 'raw_inserted': 0}

    # Preload all existing scores in bulk (instead of one query per key)
    all_scores = _load_all_scores(conn)

    raw_batch: list[tuple] = []
    score_batch: list[tuple] = []
    total_updated = 0
    total_raw = 0
    processed_keys: set[tuple] = set()  # (model_id, parent_name, sub_cluster_id, target_mmsi)

    default_params = models[0]

    for gear_group in gear_groups:
        parent_name = gear_group['parent_name']
        sub_cluster_id = gear_group.get('sub_cluster_id', 0)
        members = gear_group['members']
        if not members:
            continue

        # Members active within the last hour (used for center/radius)
        display_members = [
            m for m in members
            if _get_time_bucket_age(m.get('mmsi'), all_positions, now) <= 3600
        ]
        # Fallback: if fewer than 2 remain, keep the 2 most recent by time bucket
        if len(display_members) < 2 and len(members) >= 2:
            display_members = sorted(
                members,
                key=lambda m: _get_time_bucket_age(m.get('mmsi'), all_positions, now),
            )[:2]
        active_members = display_members if len(display_members) >= 2 else members

        # Group center + radius (from members active within the last hour)
        center_lat = sum(m['lat'] for m in active_members) / len(active_members)
        center_lon = sum(m['lon'] for m in active_members) / len(active_members)
        group_radius = _compute_group_radius(active_members)

        # Gear activity
        active_ratio = _compute_gear_active_ratio(members, all_positions, now)

        # MMSIs that belong to the group (excluded from candidates)
        group_mmsis = {m['mmsi'] for m in members}
        if gear_group.get('parent_mmsi'):
            group_mmsis.add(gear_group['parent_mmsi'])

        # Candidate filtering + cap
        candidates = find_candidates(
            center_lat, center_lon, group_radius,
            group_mmsis, all_positions, default_params,
        )
        if not candidates:
            continue
        if len(candidates) > _MAX_CANDIDATES_PER_GROUP:
            # Keep the nearest candidates
            candidates.sort(key=lambda m: _haversine_nm(
                center_lat, center_lon,
                all_positions[m]['lat'], all_positions[m]['lon'],
            ))
            candidates = candidates[:_MAX_CANDIDATES_PER_GROUP]

        for target_mmsi in candidates:
            target_pos = all_positions.get(target_mmsi)
            if target_pos is None:
                continue

            target_name = target_pos.get('name', '')
            target_is_gear = bool(_gear_re.match(target_name or ''))
            target_type = 'GEAR_BUOY' if target_is_gear else 'VESSEL'

            # Metrics (gear: plain distance; vessel: track-based)
            if target_is_gear:
                d = _haversine_nm(center_lat, center_lon,
                                  target_pos['lat'], target_pos['lon'])
                prox = max(0.0, 1.0 - d / 20.0)
                metrics = {'proximity_ratio': prox, 'composite': prox}
            else:
                vessel_track = _get_vessel_track(vessel_store, target_mmsi, hours=6)
                metrics = _compute_gear_vessel_metrics(
                    center_lat, center_lon, group_radius,
                    vessel_track, default_params,
                )

            # Collect raw metrics for the batch insert
            raw_batch.append((
                now, parent_name, sub_cluster_id, target_mmsi, target_type, target_name,
                metrics.get('proximity_ratio'), metrics.get('visit_score'),
                metrics.get('activity_sync'), metrics.get('dtw_similarity'),
                metrics.get('speed_correlation'), metrics.get('heading_coherence'),
                metrics.get('drift_similarity'), False, False, active_ratio,
            ))
            total_raw += 1

            # Per-model EMA update
            for model in models:
                if target_is_gear:
                    composite = metrics.get('proximity_ratio', 0) * model.w_prox_persist
                else:
                    composite = (
                        model.w_proximity * (metrics.get('proximity_ratio') or 0)
                        + model.w_visit * (metrics.get('visit_score') or 0)
                        + model.w_activity * (metrics.get('activity_sync') or 0)
                    )

                # Look up the preloaded score (no DB query)
                score_key = (model.model_id, parent_name, sub_cluster_id, target_mmsi)
                prev = all_scores.get(score_key)
                prev_score = prev['current_score'] if prev else None
                streak = prev['streak_count'] if prev else 0
                last_obs = prev['last_observed_at'] if prev else None

                new_score, new_streak, state = update_score(
                    prev_score, composite, streak,
                    last_obs, now, active_ratio,
                    0.0, model,
                )

                processed_keys.add(score_key)

                # Persist new scores only once they reach the threshold; always update existing ones
                if new_score >= model.track_threshold or prev is not None:
                    score_batch.append((
                        model.model_id, parent_name, sub_cluster_id, target_mmsi,
                        target_type, target_name,
                        round(new_score, 6), new_streak, state,
                        now, now, now,
                    ))
                    total_updated += 1

    # ── Forced decay for targets that left the search radius ────────
    # Entries in all_scores that were not processed this cycle, typically
    # because the vessel moved out of the search radius
    # (group_radius × candidate_radius_factor, 3 by default).
    # Freeze conditions are ignored and decay_fast is applied, so the score
    # converges to 0 quickly.
    for score_key, prev in all_scores.items():
        if score_key in processed_keys:
            continue
        prev_score = prev['current_score']
        if prev_score is None or prev_score <= 0:
            continue
        model_id, parent_name_s, sub_cluster_id_s, target_mmsi_s = score_key
        # Use the decay_fast parameter of the matching model
        model_params = next((m for m in models if m.model_id == model_id), default_params)
        new_score = max(0.0, prev_score - model_params.decay_fast)
        score_batch.append((
            model_id, parent_name_s, sub_cluster_id_s, target_mmsi_s,
            prev.get('target_type', 'VESSEL'), prev.get('target_name', ''),
            round(new_score, 6), 0, 'OUT_OF_RANGE',
            prev.get('last_observed_at', now), now, now,
        ))
        total_updated += 1

    # Persist the batches to the DB
    _batch_insert_raw(conn, raw_batch)
    _batch_upsert_scores(conn, score_batch)
    conn.commit()

    elapsed = round(_time.time() - t0, 2)
    logger.info(
        'gear correlation internals: %.2fs, %d groups, %d raw, %d scores, %d models',
        elapsed, len(gear_groups), total_raw, total_updated, len(models),
    )

    return {
        'updated': total_updated,
        'models': len(models),
        'raw_inserted': total_raw,
    }
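The forced-decay loop at the end of `run_gear_correlation` gives each unrefreshed score a fixed number of cycles before it hits 0. The sketch reproduces that loop on a plain dict and counts the cycles. The starting score of 0.73 is an arbitrary example, and `decay_unseen`/`cycles_to_zero` are illustrative names.

```python
def decay_unseen(scores: dict, processed: set, decay_fast: float = 0.10) -> dict:
    # Scores not refreshed this cycle lose decay_fast, floored at 0;
    # processed keys and scores already at 0 are skipped.
    out = {}
    for key, score in scores.items():
        if key in processed or score <= 0:
            continue
        out[key] = max(0.0, score - decay_fast)
    return out


def cycles_to_zero(score: float, decay: float) -> int:
    # Number of consecutive unseen cycles until the score reaches 0.
    cycles = 0
    while score > 0:
        score = max(0.0, score - decay)
        cycles += 1
    return cycles


print(cycles_to_zero(0.73, 0.10))   # decay_fast
print(cycles_to_zero(0.73, 0.025))  # decay_slow, for comparison
```

With the defaults, a target that drops out of range fades about four times faster than one that is still in range but has lost its signal for less than `stale_hours`.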


# ── DB helpers (batch-optimized) ──────────────────────────────────
def _load_active_models(conn) -> list[ModelParams]:
    """Load active models, default model first."""
    import json
    cur = conn.cursor()
    try:
        cur.execute(
            f"SELECT id, name, params FROM {CORRELATION_PARAM_MODELS} "
            "WHERE is_active = TRUE ORDER BY is_default DESC, id ASC"
        )
        rows = cur.fetchall()
        models = []
        for row in rows:
            # params may arrive as a dict (JSONB) or as a JSON string
            params = row[2] if isinstance(row[2], dict) else json.loads(row[2])
            models.append(ModelParams.from_db_row({
                'id': row[0], 'name': row[1], 'params': params,
            }))
        return models
    except Exception as e:
        logger.error('failed to load models: %s', e)
        # Fall back to the built-in default parameters
        return [ModelParams()]
    finally:
        cur.close()


def _load_all_scores(conn) -> dict[tuple, dict]:
    """Preload every score as {(model_id, group_key, sub_cluster_id, target_mmsi): {...}}."""
    cur = conn.cursor()
    try:
        cur.execute(
            "SELECT model_id, group_key, sub_cluster_id, target_mmsi, "
            "current_score, streak_count, last_observed_at, "
            "target_type, target_name "
            f"FROM {GEAR_CORRELATION_SCORES}"
        )
        result = {}
        for row in cur.fetchall():
            key = (row[0], row[1], row[2], row[3])
            # psycopg2 returns NUMERIC as Decimal; convert to float so that
            # arithmetic with float constants works
            result[key] = {
                'current_score': float(row[4]) if row[4] is not None else 0.0,
                'streak_count': row[5],
                'last_observed_at': row[6],
                'target_type': row[7],
                'target_name': row[8],
            }
        return result
    except Exception as e:
        logger.warning('failed to load all scores: %s', e)
        return {}
    finally:
        cur.close()


def _batch_insert_raw(conn, batch: list[tuple]):
    """Batch INSERT of raw metrics."""
    if not batch:
        return
    cur = conn.cursor()
    try:
        from psycopg2.extras import execute_values
        execute_values(
            cur,
            f"""INSERT INTO {GEAR_CORRELATION_RAW_METRICS}
                (observed_at, group_key, sub_cluster_id, target_mmsi, target_type, target_name,
                 proximity_ratio, visit_score, activity_sync,
                 dtw_similarity, speed_correlation, heading_coherence,
                 drift_similarity, shadow_stay, shadow_return,
                 gear_group_active_ratio)
            VALUES %s""",
            batch,
            page_size=500,
        )
    except Exception as e:
        logger.warning('batch insert raw failed: %s', e)
    finally:
        cur.close()


def _batch_upsert_scores(conn, batch: list[tuple]):
    """Batch UPSERT of scores (observation_count is incremented on conflict)."""
    if not batch:
        return
    cur = conn.cursor()
    try:
        from psycopg2.extras import execute_values
        execute_values(
            cur,
            f"""INSERT INTO {GEAR_CORRELATION_SCORES}
                (model_id, group_key, sub_cluster_id, target_mmsi, target_type, target_name,
                 current_score, streak_count, freeze_state,
                 first_observed_at, last_observed_at, updated_at)
            VALUES %s
            ON CONFLICT (model_id, group_key, sub_cluster_id, target_mmsi)
            DO UPDATE SET
                target_type = EXCLUDED.target_type,
                target_name = EXCLUDED.target_name,
                current_score = EXCLUDED.current_score,
                streak_count = EXCLUDED.streak_count,
                freeze_state = EXCLUDED.freeze_state,
                observation_count = {GEAR_CORRELATION_SCORES}.observation_count + 1,
                last_observed_at = EXCLUDED.last_observed_at,
                updated_at = EXCLUDED.updated_at""",
            batch,
            page_size=500,
        )
    except Exception as e:
        logger.warning('batch upsert scores failed: %s', e)
    finally:
        cur.close()
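The `ON CONFLICT ... DO UPDATE` semantics of `_batch_upsert_scores` can be tried locally with SQLite, which supports the same upsert clause from version 3.24 onward. This is a minimal stand-in, not the PostgreSQL code path. The table is trimmed to a few columns, and `upsert_scores`/`demo_upsert` are illustrative names. In SQLite, an unqualified column in `DO UPDATE` refers to the existing row, while `excluded.` refers to the incoming one.

```python
import sqlite3

DDL = """
CREATE TABLE scores (
    model_id INTEGER, group_key TEXT, sub_cluster_id INTEGER, target_mmsi TEXT,
    current_score REAL,
    observation_count INTEGER NOT NULL DEFAULT 1,
    PRIMARY KEY (model_id, group_key, sub_cluster_id, target_mmsi)
)
"""

UPSERT = """
INSERT INTO scores (model_id, group_key, sub_cluster_id, target_mmsi, current_score)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (model_id, group_key, sub_cluster_id, target_mmsi) DO UPDATE SET
    current_score = excluded.current_score,
    observation_count = observation_count + 1
"""


def upsert_scores(conn: sqlite3.Connection, batch: list) -> None:
    # One statement handles first sightings (INSERT) and repeats (UPDATE).
    conn.executemany(UPSERT, batch)
    conn.commit()


def demo_upsert() -> tuple:
    conn = sqlite3.connect(":memory:")
    conn.execute(DDL)
    key = (1, "PARENT_A", 0, "412000001")
    upsert_scores(conn, [key + (0.55,)])
    upsert_scores(conn, [key + (0.61,)])
    return conn.execute(
        "SELECT current_score, observation_count FROM scores"
    ).fetchone()


print(demo_upsert())
```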