feat: 어구 연관성 멀티모델 패턴 추적 시스템 (Phase 1 Core)

- gear_correlation.py: 적응형 EMA + freeze + shadow + 배치 최적화
- 5개 글로벌 모델 병렬 추적 (default/aggressive/conservative/proximity-heavy/visit-pattern)
- 어구 중심 점수 체계: 어구 비활성 시 FREEZE, 선박 shadow 추적
- 유형별 메트릭: 어구-선박(proximity+visit+activity), 선박-선박(DTW+SOG+COG)
- DB: correlation_param_models + raw_metrics(일별 파티션) + scores + system_config
- partition_manager: 일별 파티션 생성/정리 (system_config hot-reload)
- track_similarity: SOG상관 + COG동조 + 근접비 3개 메트릭 추가
- scheduler Step 4.7 통합, fleet_tracker MMSI 점수 이전
- chat/tools: query_gear_correlation 도구

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
htlee 2026-03-30 10:36:43 +09:00
부모 3407d37f9b
커밋 812a78f636
7개의 변경된 파일1247개의 추가작업 그리고 0개의 파일을 삭제

파일 보기

@ -0,0 +1,146 @@
-- 010: 어구 연관성 추적 시스템
-- - correlation_param_models: 파라미터 모델 마스터
-- - gear_correlation_raw_metrics: raw 메트릭 (타임스탬프 파티셔닝, 7일 보존)
-- - gear_correlation_scores: 모델별 어피니티 점수 (상태 테이블)
-- - system_config: 런타임 설정 (파티션 보관기간 등)
-- Resolve unqualified names against kcg first, then public.
SET search_path TO kcg, public;

-- ── Parameter models ──
-- Master table of tracking-parameter sets: one row per model,
-- with the tunable values stored together in the JSONB column "params".
CREATE TABLE IF NOT EXISTS kcg.correlation_param_models (
    id          SERIAL       PRIMARY KEY,
    name        VARCHAR(100) NOT NULL UNIQUE,
    is_default  BOOLEAN      DEFAULT FALSE,
    is_active   BOOLEAN      DEFAULT TRUE,
    params      JSONB        NOT NULL,
    description TEXT,
    created_at  TIMESTAMPTZ  DEFAULT NOW(),
    updated_at  TIMESTAMPTZ  DEFAULT NOW()
);

-- Seed the default model. Re-running the migration leaves an existing
-- 'default' row untouched (conflict on the unique name is ignored).
INSERT INTO kcg.correlation_param_models
    (name, is_default, is_active, params, description)
VALUES (
    'default',
    TRUE,
    TRUE,
    '{"alpha_base":0.30,"alpha_min":0.08,"alpha_decay_per_streak":0.005,"track_threshold":0.50,"polygon_threshold":0.70,"w_proximity":0.45,"w_visit":0.35,"w_activity":0.20,"w_dtw":0.30,"w_sog_corr":0.20,"w_heading":0.25,"w_prox_vv":0.25,"w_prox_persist":0.50,"w_drift":0.30,"w_signal_sync":0.20,"group_quiet_ratio":0.30,"normal_gap_hours":1.0,"decay_slow":0.015,"decay_fast":0.08,"stale_hours":6.0,"shadow_stay_bonus":0.10,"shadow_return_bonus":0.15,"candidate_radius_factor":3.0,"proximity_threshold_nm":5.0,"visit_threshold_nm":5.0,"night_bonus":1.3,"long_decay_days":7.0}',
    '기본 추적 모델'
)
ON CONFLICT (name) DO NOTHING;
-- ── Raw metrics (model-independent, range-partitioned on observed_at) ──
-- The original note states that rows are written every 5 minutes.
CREATE TABLE IF NOT EXISTS kcg.gear_correlation_raw_metrics (
    id                      BIGSERIAL,
    observed_at             TIMESTAMPTZ      NOT NULL DEFAULT NOW(),
    group_key               VARCHAR(100)     NOT NULL,
    target_mmsi             VARCHAR(20)      NOT NULL,
    target_type             VARCHAR(10)      NOT NULL,
    target_name             VARCHAR(200),

    -- Raw metrics (shared by every model)
    proximity_ratio         DOUBLE PRECISION,
    visit_score             DOUBLE PRECISION,
    activity_sync           DOUBLE PRECISION,
    dtw_similarity          DOUBLE PRECISION,
    speed_correlation       DOUBLE PRECISION,
    heading_coherence       DOUBLE PRECISION,
    drift_similarity        DOUBLE PRECISION,

    -- Shadow flags
    shadow_stay             BOOLEAN          DEFAULT FALSE,
    shadow_return           BOOLEAN          DEFAULT FALSE,

    -- Group state at observation time
    gear_group_active_ratio DOUBLE PRECISION,

    -- The partition key must be part of the primary key of a partitioned table.
    PRIMARY KEY (id, observed_at)
)
PARTITION BY RANGE (observed_at);
-- Daily partition creation.
-- Ensures one partition of kcg.gear_correlation_raw_metrics exists for each
-- day from CURRENT_DATE through CURRENT_DATE + days_ahead (inclusive).
-- Partitions are named gear_correlation_raw_metrics_YYYYMMDD and cover
-- [day, day + 1). Days that already have a relation of that name are skipped.
CREATE OR REPLACE FUNCTION kcg.create_raw_metric_partitions(days_ahead INT DEFAULT 3)
RETURNS void AS $$
DECLARE
    day_start     DATE;
    part_name     TEXT;
    already_there BOOLEAN;
BEGIN
    FOR offset_days IN 0..days_ahead LOOP
        day_start := CURRENT_DATE + offset_days;
        part_name := 'gear_correlation_raw_metrics_' || TO_CHAR(day_start, 'YYYYMMDD');

        -- Look the name up in the kcg schema before attempting the DDL.
        SELECT EXISTS (
            SELECT 1
            FROM pg_class c
            JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relname = part_name
              AND n.nspname = 'kcg'
        )
        INTO already_there;

        CONTINUE WHEN already_there;

        -- %I quotes the partition identifier, %L quotes the date bounds.
        EXECUTE format(
'CREATE TABLE IF NOT EXISTS kcg.%I PARTITION OF kcg.gear_correlation_raw_metrics
FOR VALUES FROM (%L) TO (%L)',
            part_name, day_start, day_start + 1
        );
    END LOOP;
END;
$$ LANGUAGE plpgsql;
-- Create the initial partitions (today plus the next 3 days).
SELECT kcg.create_raw_metric_partitions(3);

-- raw_metrics indexes: newest-first lookups by group and by target.
CREATE INDEX IF NOT EXISTS idx_raw_metrics_group_time
    ON kcg.gear_correlation_raw_metrics (group_key, observed_at DESC);

CREATE INDEX IF NOT EXISTS idx_raw_metrics_target
    ON kcg.gear_correlation_raw_metrics (target_mmsi, observed_at DESC);
-- ── Affinity scores (one independent row per model; state table) ──
-- A row is unique per (model_id, group_key, target_mmsi) and is removed
-- together with its model (ON DELETE CASCADE).
CREATE TABLE IF NOT EXISTS kcg.gear_correlation_scores (
    id                  BIGSERIAL        PRIMARY KEY,
    model_id            INT              NOT NULL
                        REFERENCES kcg.correlation_param_models(id) ON DELETE CASCADE,
    group_key           VARCHAR(100)     NOT NULL,
    target_mmsi         VARCHAR(20)      NOT NULL,
    target_type         VARCHAR(10)      NOT NULL,
    target_name         VARCHAR(200),

    -- Per-model score (EMA result)
    current_score       DOUBLE PRECISION DEFAULT 0,
    streak_count        INT              DEFAULT 0,
    observation_count   INT              DEFAULT 0,

    -- Shadow accumulation
    shadow_bonus_total  DOUBLE PRECISION DEFAULT 0,
    shadow_stay_count   INT              DEFAULT 0,
    shadow_return_count INT              DEFAULT 0,

    -- State
    freeze_state        VARCHAR(20)      DEFAULT 'ACTIVE',

    -- Timestamps
    first_observed_at   TIMESTAMPTZ      NOT NULL DEFAULT NOW(),
    last_observed_at    TIMESTAMPTZ      NOT NULL DEFAULT NOW(),
    updated_at          TIMESTAMPTZ      NOT NULL DEFAULT NOW(),

    UNIQUE (model_id, group_key, target_mmsi)
);

-- Ranked lookup of one group's targets within one model.
CREATE INDEX IF NOT EXISTS idx_gc_model_group
    ON kcg.gear_correlation_scores (model_id, group_key, current_score DESC);

-- Partial index covering only rows whose score is at least 0.5.
CREATE INDEX IF NOT EXISTS idx_gc_active
    ON kcg.gear_correlation_scores (current_score DESC)
    WHERE current_score >= 0.5;
-- ── System configuration (changeable at runtime, no restart required) ──
CREATE TABLE IF NOT EXISTS kcg.system_config (
    key         VARCHAR(100) PRIMARY KEY,
    value       JSONB        NOT NULL,
    description TEXT,
    updated_at  TIMESTAMPTZ  DEFAULT NOW(),
    updated_by  VARCHAR(100) DEFAULT 'system'
);

-- Seed the defaults; keys that already exist keep their current value.
INSERT INTO kcg.system_config
    (key, value, description)
VALUES
    (
        'partition.raw_metrics.retention_days',
        '7',
        'raw_metrics 파티션 보관 기간 (일). 초과 시 파티션 DROP.'
    ),
    (
        'partition.raw_metrics.create_ahead_days',
        '3',
        '미래 파티션 미리 생성 일수.'
    ),
    (
        'partition.scores.cleanup_days',
        '30',
        '미관측 점수 레코드 정리 기간 (일).'
    ),
    (
        'correlation.max_active_models',
        '5',
        '동시 활성 모델 최대 수.'
    )
ON CONFLICT (key) DO NOTHING;

파일 보기

@ -0,0 +1,783 @@
"""어구 그룹 다단계 연관성 분석 — 멀티모델 패턴 추적.
Phase 1: default 모델 1개로 동작 (DB에서 is_active=true 모델 로드).
Phase 2: 글로벌 모델 max 5 병렬 실행.
어구 중심 점수 체계:
- 어구 신호 기준 관측 윈도우 (어구 비활성 → FREEZE)
- 선박 shadow 추적 (비활성 → 활성 전환 보너스)
- 적응형 EMA + streak 자기강화
- 퍼센트 기반 무제한 추적 (50%+)
"""
from __future__ import annotations
import logging
import math
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
logger = logging.getLogger(__name__)
# ── 상수 ──────────────────────────────────────────────────────────
_EARTH_RADIUS_NM = 3440.065
_NM_TO_M = 1852.0
# ── 파라미터 모델 ─────────────────────────────────────────────────
@dataclass
class ModelParams:
    """Full parameter set of one tracking model.

    Every field has a default, so ``ModelParams()`` yields the built-in
    default model. ``from_db_row`` overlays values stored in the ``params``
    JSON of a ``correlation_param_models`` row.
    """

    model_id: int = 1
    name: str = 'default'
    # EMA
    alpha_base: float = 0.30
    alpha_min: float = 0.08
    alpha_decay_per_streak: float = 0.005
    # Thresholds
    track_threshold: float = 0.50
    polygon_threshold: float = 0.70
    # Metric weights — gear vs. vessel
    w_proximity: float = 0.45
    w_visit: float = 0.35
    w_activity: float = 0.20
    # Metric weights — vessel vs. vessel
    w_dtw: float = 0.30
    w_sog_corr: float = 0.20
    w_heading: float = 0.25
    w_prox_vv: float = 0.25
    # Metric weights — gear vs. gear
    w_prox_persist: float = 0.50
    w_drift: float = 0.30
    w_signal_sync: float = 0.20
    # Freeze criteria
    group_quiet_ratio: float = 0.30
    normal_gap_hours: float = 1.0
    # Decay
    decay_slow: float = 0.015
    decay_fast: float = 0.08
    stale_hours: float = 6.0
    # Shadow
    shadow_stay_bonus: float = 0.10
    shadow_return_bonus: float = 0.15
    # Distances
    candidate_radius_factor: float = 3.0
    proximity_threshold_nm: float = 5.0
    visit_threshold_nm: float = 5.0
    # Night-time
    night_bonus: float = 1.3
    # Long-term decay
    long_decay_days: float = 7.0

    @classmethod
    def from_db_row(cls, row: dict) -> 'ModelParams':
        """Build a parameter set from a ``correlation_param_models`` row.

        Args:
            row: mapping with ``id``, ``name`` and (optionally) ``params``,
                where ``params`` is the already-decoded JSON object.

        Returns:
            A ``ModelParams`` whose ``model_id``/``name`` come from the row
            and whose remaining fields come from ``params`` where present.

        Raises:
            KeyError: if ``id`` or ``name`` is missing from ``row``.
        """
        params_json = row.get('params') or {}
        # Identity always comes from the row columns, never from the JSON;
        # passing them twice would raise "multiple values for keyword".
        reserved = {'model_id', 'name'}
        # Only real dataclass fields are accepted. A plain hasattr() check
        # would also let method names such as 'from_db_row' through.
        overrides = {
            key: value
            for key, value in params_json.items()
            if key in cls.__dataclass_fields__ and key not in reserved
        }
        return cls(model_id=row['id'], name=row['name'], **overrides)
# ── Haversine 거리 ────────────────────────────────────────────────
def _haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two lat/lon points, in nautical miles.

    Inputs are in degrees; the result is scaled by ``_EARTH_RADIUS_NM``.
    """
    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    half_dlat = math.radians(lat2 - lat1) / 2
    half_dlon = math.radians(lon2 - lon1) / 2
    # Haversine term: squared half-chord length between the two points.
    hav = (
        math.sin(half_dlat) ** 2
        + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(half_dlon) ** 2
    )
    central_angle = math.atan2(math.sqrt(hav), math.sqrt(1 - hav))
    return _EARTH_RADIUS_NM * 2 * central_angle
# ── Freeze 판단 ───────────────────────────────────────────────────
def should_freeze(
    gear_group_active_ratio: float,
    target_last_observed: Optional[datetime],
    now: datetime,
    params: ModelParams,
) -> tuple[bool, str]:
    """Decide whether a score should be frozen instead of decayed.

    The gear group is the reference: the check is made on the group's
    activity first and on the individual target's absence second.

    Returns:
        ``(True, 'GROUP_QUIET')`` when the group's active ratio is below
        ``params.group_quiet_ratio``; ``(True, 'NORMAL_GAP')`` when the target
        was last observed less than ``params.normal_gap_hours`` ago;
        otherwise ``(False, 'ACTIVE')``.
    """
    # 1. Too few gear members are active, so no comparison is possible.
    group_is_quiet = gear_group_active_ratio < params.group_quiet_ratio
    if group_is_quiet:
        return True, 'GROUP_QUIET'

    # 2. Without a previous observation there is no gap to tolerate.
    if target_last_observed is None:
        return False, 'ACTIVE'

    absent_hours = (now - target_last_observed).total_seconds() / 3600
    if absent_hours < params.normal_gap_hours:
        return True, 'NORMAL_GAP'
    return False, 'ACTIVE'
# ── EMA 업데이트 ──────────────────────────────────────────────────
def update_score(
    prev_score: Optional[float],
    raw_score: Optional[float],
    streak: int,
    last_observed: Optional[datetime],
    now: datetime,
    gear_group_active_ratio: float,
    shadow_bonus: float,
    params: ModelParams,
) -> tuple[float, int, str]:
    """Adaptive EMA update of one affinity score.

    Returns:
        ``(new_score, new_streak, state)`` where ``state`` is a freeze reason
        from ``should_freeze``, ``'SIGNAL_LOSS'``, ``'ACTIVE'`` or
        ``'PATTERN_DIVERGE'``.
    """
    if raw_score is None:
        # Nothing could be observed this cycle.
        baseline = prev_score or 0.0
        frozen, reason = should_freeze(
            gear_group_active_ratio, last_observed, now, params,
        )
        if frozen:
            # Keep the score and streak untouched while frozen.
            return baseline, streak, reason

        # Genuine departure: decay, faster once the absence is stale.
        if last_observed is None:
            hours_absent = 0.0
        else:
            hours_absent = (now - last_observed).total_seconds() / 3600
        if hours_absent > params.stale_hours:
            decay = params.decay_fast
        else:
            decay = params.decay_slow
        return max(0.0, baseline - decay), 0, 'SIGNAL_LOSS'

    # Shadow bonus is added to the raw score, capped at 1.0.
    adjusted = min(1.0, raw_score + shadow_bonus)

    if adjusted >= params.track_threshold:
        # At or above the threshold: extend the streak; a longer streak
        # lowers alpha (down to alpha_min), so the score moves more slowly.
        next_streak = streak + 1
        alpha = max(
            params.alpha_min,
            params.alpha_base - next_streak * params.alpha_decay_per_streak,
        )
        state = 'ACTIVE'
    else:
        # Pattern diverged: reset the streak and use the base alpha.
        next_streak = 0
        alpha = params.alpha_base
        state = 'PATTERN_DIVERGE'

    if prev_score is None:
        # First observation: the adjusted score is taken as-is.
        blended = adjusted
    else:
        blended = alpha * adjusted + (1.0 - alpha) * prev_score
    return blended, next_streak, state
# ── 어구-선박 메트릭 ──────────────────────────────────────────────
def _compute_gear_vessel_metrics(
gear_center_lat: float,
gear_center_lon: float,
gear_radius_nm: float,
vessel_track: list[dict],
params: ModelParams,
) -> dict:
"""어구 그룹 중심 vs 선박 궤적 메트릭.
vessel_track: [{lat, lon, sog, cog, timestamp}, ...]
"""
if not vessel_track:
return {'proximity_ratio': 0, 'visit_score': 0, 'activity_sync': 0, 'composite': 0}
threshold_nm = max(gear_radius_nm * 2, params.proximity_threshold_nm)
# 1. proximity_ratio — 근접 지속비
close_count = 0
for p in vessel_track:
d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon'])
if d < threshold_nm:
close_count += 1
proximity_ratio = close_count / len(vessel_track)
# 2. visit_score — 방문 패턴
visit_threshold = params.visit_threshold_nm
in_zone = False
visits = 0
stay_points = 0
away_points = 0
for p in vessel_track:
d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon'])
if d < visit_threshold:
if not in_zone:
visits += 1
in_zone = True
stay_points += 1
else:
in_zone = False
away_points += 1
visit_count_norm = min(1.0, visits / 5.0) if visits > 0 else 0.0
total = stay_points + away_points
stay_ratio = stay_points / total if total > 0 else 0.0
visit_score = 0.5 * visit_count_norm + 0.5 * stay_ratio
# 3. activity_sync — 영역 내 저속 비율 (조업/관리 행위)
in_zone_count = 0
in_zone_slow = 0
for p in vessel_track:
d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon'])
if d < visit_threshold:
in_zone_count += 1
if p.get('sog', 0) < 2.0:
in_zone_slow += 1
activity_sync = in_zone_slow / in_zone_count if in_zone_count > 0 else 0.0
# 가중 합산
composite = (
params.w_proximity * proximity_ratio
+ params.w_visit * visit_score
+ params.w_activity * activity_sync
)
return {
'proximity_ratio': round(proximity_ratio, 4),
'visit_score': round(visit_score, 4),
'activity_sync': round(activity_sync, 4),
'composite': round(composite, 4),
}
# ── 선박-선박 메트릭 ──────────────────────────────────────────────
def _compute_vessel_vessel_metrics(
    track_a: list[dict],
    track_b: list[dict],
    params: ModelParams,
) -> dict:
    """Similarity metrics between two vessel tracks.

    Returns a dict with ``dtw_similarity``, ``speed_correlation``,
    ``heading_coherence``, ``proximity_ratio`` and their weighted
    ``composite``, each rounded to 4 decimals (all 0 if either track is empty).
    """
    from algorithms.track_similarity import (
        compute_heading_coherence,
        compute_proximity_ratio,
        compute_sog_correlation,
        compute_track_similarity,
    )

    if not (track_a and track_b):
        return {
            'dtw_similarity': 0, 'speed_correlation': 0,
            'heading_coherence': 0, 'proximity_ratio': 0, 'composite': 0,
        }

    def _series(track, key):
        # Missing values default to 0, as in the per-point dict lookup.
        return [point.get(key, 0) for point in track]

    coords_a = [(point['lat'], point['lon']) for point in track_a]
    coords_b = [(point['lat'], point['lon']) for point in track_b]

    # Evaluated in order: DTW, SOG correlation, COG coherence, proximity.
    scores = {
        'dtw_similarity': compute_track_similarity(coords_a, coords_b),
        'speed_correlation': compute_sog_correlation(
            _series(track_a, 'sog'), _series(track_b, 'sog'),
        ),
        'heading_coherence': compute_heading_coherence(
            _series(track_a, 'cog'), _series(track_b, 'cog'),
        ),
        'proximity_ratio': compute_proximity_ratio(
            coords_a, coords_b, params.proximity_threshold_nm,
        ),
    }

    composite = (
        params.w_dtw * scores['dtw_similarity']
        + params.w_sog_corr * scores['speed_correlation']
        + params.w_heading * scores['heading_coherence']
        + params.w_prox_vv * scores['proximity_ratio']
    )

    rounded = {metric: round(value, 4) for metric, value in scores.items()}
    rounded['composite'] = round(composite, 4)
    return rounded
# ── 어구-어구 메트릭 ──────────────────────────────────────────────
def _compute_gear_gear_metrics(
center_a: tuple[float, float],
center_b: tuple[float, float],
center_history_a: list[dict],
center_history_b: list[dict],
params: ModelParams,
) -> dict:
"""두 어구 그룹 간 메트릭."""
if not center_history_a or not center_history_b:
return {
'proximity_ratio': 0, 'drift_similarity': 0,
'composite': 0,
}
# 1. 근접 지속성 — 현재 중심 간 거리의 안정성
dist_nm = _haversine_nm(center_a[0], center_a[1], center_b[0], center_b[1])
prox_persist = max(0.0, 1.0 - dist_nm / 20.0) # 20NM 이상이면 0
# 2. 표류 유사도 — 중심 이동 벡터 코사인 유사도
drift_sim = 0.0
n = min(len(center_history_a), len(center_history_b))
if n >= 2:
# 마지막 2점으로 이동 벡터 계산
da_lat = center_history_a[-1].get('lat', 0) - center_history_a[-2].get('lat', 0)
da_lon = center_history_a[-1].get('lon', 0) - center_history_a[-2].get('lon', 0)
db_lat = center_history_b[-1].get('lat', 0) - center_history_b[-2].get('lat', 0)
db_lon = center_history_b[-1].get('lon', 0) - center_history_b[-2].get('lon', 0)
dot = da_lat * db_lat + da_lon * db_lon
mag_a = (da_lat ** 2 + da_lon ** 2) ** 0.5
mag_b = (db_lat ** 2 + db_lon ** 2) ** 0.5
if mag_a > 1e-10 and mag_b > 1e-10:
cos_sim = dot / (mag_a * mag_b)
drift_sim = max(0.0, (cos_sim + 1.0) / 2.0)
composite = (
params.w_prox_persist * prox_persist
+ params.w_drift * drift_sim
)
return {
'proximity_ratio': round(prox_persist, 4),
'drift_similarity': round(drift_sim, 4),
'composite': round(composite, 4),
}
# ── Shadow 보너스 계산 ────────────────────────────────────────────
def compute_shadow_bonus(
    vessel_positions_during_inactive: list[dict],
    last_known_gear_center: tuple[float, float],
    group_radius_nm: float,
    params: ModelParams,
) -> tuple[float, bool, bool]:
    """Evaluate whether a vessel stayed near the gear while it was inactive.

    Returns:
        ``(bonus, stayed_nearby, returned_before_resume)``. ``stayed_nearby``
        is based on the average distance to the last known gear centre,
        ``returned_before_resume`` on the distance of the final position.
    """
    if last_known_gear_center is None or not vessel_positions_during_inactive:
        return 0.0, False, False

    center_lat, center_lon = last_known_gear_center
    near_limit_nm = max(group_radius_nm * 2, params.proximity_threshold_nm)

    distances = []
    for position in vessel_positions_during_inactive:
        distances.append(
            _haversine_nm(center_lat, center_lon, position['lat'], position['lon'])
        )

    # 1. Average distance decides whether the vessel "stayed".
    stayed = (sum(distances) / len(distances)) < near_limit_nm
    # 2. The last position decides whether it "returned".
    returned = distances[-1] < near_limit_nm if distances else False

    bonus = 0.0
    if stayed:
        bonus += params.shadow_stay_bonus
    if returned:
        bonus += params.shadow_return_bonus
    return bonus, stayed, returned
# ── 후보 필터링 ───────────────────────────────────────────────────
def _compute_group_radius(members: list[dict]) -> float:
"""그룹 멤버 간 최대 거리의 절반 (NM)."""
if len(members) < 2:
return 1.0 # 최소 1NM
max_dist = 0.0
for i in range(len(members)):
for j in range(i + 1, len(members)):
d = _haversine_nm(
members[i]['lat'], members[i]['lon'],
members[j]['lat'], members[j]['lon'],
)
if d > max_dist:
max_dist = d
return max(1.0, max_dist / 2.0)
def find_candidates(
    gear_center_lat: float,
    gear_center_lon: float,
    group_radius_nm: float,
    group_mmsis: set[str],
    all_positions: dict[str, dict],
    params: ModelParams,
) -> list[str]:
    """Select candidate MMSIs located around a gear group.

    A candidate is any MMSI that is not a member of the group and whose
    position lies within ``group_radius_nm * params.candidate_radius_factor``
    of the group centre. Order follows ``all_positions``.
    """
    search_radius = group_radius_nm * params.candidate_radius_factor
    return [
        mmsi
        for mmsi, pos in all_positions.items()
        if mmsi not in group_mmsis
        and _haversine_nm(gear_center_lat, gear_center_lon, pos['lat'], pos['lon']) < search_radius
    ]
# ── 메인 실행 ─────────────────────────────────────────────────────
def _get_vessel_track(vessel_store, mmsi: str, hours: int = 6) -> list[dict]:
"""vessel_store에서 특정 MMSI의 최근 N시간 궤적 추출 (벡터화)."""
df = vessel_store._tracks.get(mmsi)
if df is None or len(df) == 0:
return []
import pandas as pd
now = datetime.now(timezone.utc)
cutoff = now - pd.Timedelta(hours=hours)
ts_col = df['timestamp']
if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None:
mask = ts_col >= pd.Timestamp(cutoff)
else:
mask = ts_col >= pd.Timestamp(cutoff.replace(tzinfo=None))
recent = df.loc[mask]
if recent.empty:
return []
# 벡터화 추출 (iterrows 대신)
lats = recent['lat'].values
lons = recent['lon'].values
sogs = (recent['sog'] if 'sog' in recent.columns
else recent.get('raw_sog', pd.Series(dtype=float))).fillna(0).values
cogs = (recent['cog'] if 'cog' in recent.columns
else pd.Series(0, index=recent.index)).fillna(0).values
return [
{'lat': float(lats[i]), 'lon': float(lons[i]),
'sog': float(sogs[i]), 'cog': float(cogs[i])}
for i in range(len(lats))
]
def _compute_gear_active_ratio(
gear_members: list[dict],
all_positions: dict[str, dict],
now: datetime,
stale_sec: float = 21600,
) -> float:
"""어구 그룹의 활성 멤버 비율."""
if not gear_members:
return 0.0
active = 0
for m in gear_members:
pos = all_positions.get(m['mmsi'])
if pos is None:
continue
ts = pos.get('timestamp')
if ts is None:
continue
if isinstance(ts, datetime):
last_dt = ts if ts.tzinfo is not None else ts.replace(tzinfo=timezone.utc)
else:
try:
import pandas as pd
last_dt = pd.Timestamp(ts).to_pydatetime()
if last_dt.tzinfo is None:
last_dt = last_dt.replace(tzinfo=timezone.utc)
except Exception:
continue
age = (now - last_dt).total_seconds()
if age < stale_sec:
active += 1
return active / len(gear_members)
def _is_gear_pattern(name: str) -> bool:
"""어구 이름 패턴 판별."""
import re
return bool(re.match(r'^.+_\d+_\d*$', name or ''))
_MAX_CANDIDATES_PER_GROUP = 30 # upper bound on candidates per group (performance guard)
def run_gear_correlation(
    vessel_store,
    gear_groups: list[dict],
    conn,
) -> dict:
    """Main entry point of the gear correlation analysis (batch-optimised).

    For every gear group: finds nearby candidates, computes raw metrics once
    per candidate, then updates the EMA score of every active model. Raw
    metrics and scores are collected in memory and written in two batches,
    followed by a single commit on ``conn``.

    Args:
        vessel_store: VesselStore instance
        gear_groups: result of detect_gear_groups()
        conn: kcgdb connection
    Returns:
        {'updated': int, 'models': int, 'raw_inserted': int}
        The counts reflect rows queued for writing; the batch helpers log
        and swallow their own errors.
    """
    import time as _time
    import re as _re
    # Same pattern as _is_gear_pattern, compiled once for the whole run.
    _gear_re = _re.compile(r'^.+_\d+_\d*$')
    t0 = _time.time()
    now = datetime.now(timezone.utc)
    all_positions = vessel_store.get_all_latest_positions()
    # Load the active models
    models = _load_active_models(conn)
    if not models:
        logger.warning('no active correlation models found')
        return {'updated': 0, 'models': 0, 'raw_inserted': 0}
    # Pre-load every existing score (one bulk query instead of per-row queries)
    all_scores = _load_all_scores(conn)
    raw_batch: list[tuple] = []
    score_batch: list[tuple] = []
    total_updated = 0
    total_raw = 0
    # The first loaded model drives candidate search and raw metric
    # computation (the loader orders rows by is_default DESC, id ASC).
    default_params = models[0]
    for gear_group in gear_groups:
        parent_name = gear_group['parent_name']
        members = gear_group['members']
        if not members:
            continue
        # Group centre (mean of member coordinates) + radius
        center_lat = sum(m['lat'] for m in members) / len(members)
        center_lon = sum(m['lon'] for m in members) / len(members)
        group_radius = _compute_group_radius(members)
        # Gear activity
        active_ratio = _compute_gear_active_ratio(members, all_positions, now)
        # Set of MMSIs belonging to the group (excluded from candidates)
        group_mmsis = {m['mmsi'] for m in members}
        if gear_group.get('parent_mmsi'):
            group_mmsis.add(gear_group['parent_mmsi'])
        # Candidate filtering + count limit
        candidates = find_candidates(
            center_lat, center_lon, group_radius,
            group_mmsis, all_positions, default_params,
        )
        if not candidates:
            continue
        if len(candidates) > _MAX_CANDIDATES_PER_GROUP:
            # Keep only the nearest ones
            candidates.sort(key=lambda m: _haversine_nm(
                center_lat, center_lon,
                all_positions[m]['lat'], all_positions[m]['lon'],
            ))
            candidates = candidates[:_MAX_CANDIDATES_PER_GROUP]
        for target_mmsi in candidates:
            target_pos = all_positions.get(target_mmsi)
            if target_pos is None:
                continue
            target_name = target_pos.get('name', '')
            target_is_gear = bool(_gear_re.match(target_name or ''))
            target_type = 'GEAR_BUOY' if target_is_gear else 'VESSEL'
            # Metric computation (gear: plain distance, vessel: track based)
            if target_is_gear:
                d = _haversine_nm(center_lat, center_lon,
                                  target_pos['lat'], target_pos['lon'])
                prox = max(0.0, 1.0 - d / 20.0)
                metrics = {'proximity_ratio': prox, 'composite': prox}
            else:
                vessel_track = _get_vessel_track(vessel_store, target_mmsi, hours=6)
                metrics = _compute_gear_vessel_metrics(
                    center_lat, center_lon, group_radius,
                    vessel_track, default_params,
                )
            # Queue the raw metrics row; metrics not computed above are
            # stored as None and both shadow flags are written as False.
            raw_batch.append((
                now, parent_name, target_mmsi, target_type, target_name,
                metrics.get('proximity_ratio'), metrics.get('visit_score'),
                metrics.get('activity_sync'), metrics.get('dtw_similarity'),
                metrics.get('speed_correlation'), metrics.get('heading_coherence'),
                metrics.get('drift_similarity'), False, False, active_ratio,
            ))
            total_raw += 1
            # Per-model EMA update: the composite is recomputed from the
            # shared raw metrics using each model's own weights.
            for model in models:
                if target_is_gear:
                    composite = metrics.get('proximity_ratio', 0) * model.w_prox_persist
                else:
                    composite = (
                        model.w_proximity * (metrics.get('proximity_ratio') or 0)
                        + model.w_visit * (metrics.get('visit_score') or 0)
                        + model.w_activity * (metrics.get('activity_sync') or 0)
                    )
                # Look up the pre-loaded score (no DB query)
                score_key = (model.model_id, parent_name, target_mmsi)
                prev = all_scores.get(score_key)
                prev_score = prev['current_score'] if prev else None
                streak = prev['streak_count'] if prev else 0
                last_obs = prev['last_observed_at'] if prev else None
                # Shadow bonus is passed as 0.0 here.
                new_score, new_streak, state = update_score(
                    prev_score, composite, streak,
                    last_obs, now, active_ratio,
                    0.0, model,
                )
                # New targets are stored only once they reach the tracking
                # threshold; targets that already have a row are always updated.
                if new_score >= model.track_threshold or prev is not None:
                    score_batch.append((
                        model.model_id, parent_name, target_mmsi,
                        target_type, target_name,
                        round(new_score, 6), new_streak, state,
                        now, now, now,
                    ))
                    total_updated += 1
    # Batch DB write
    _batch_insert_raw(conn, raw_batch)
    _batch_upsert_scores(conn, score_batch)
    conn.commit()
    elapsed = round(_time.time() - t0, 2)
    logger.info(
        'gear correlation internals: %.2fs, %d groups, %d raw, %d scores, %d models',
        elapsed, len(gear_groups), total_raw, total_updated, len(models),
    )
    return {
        'updated': total_updated,
        'models': len(models),
        'raw_inserted': total_raw,
    }
# ── DB 헬퍼 (배치 최적화) ─────────────────────────────────────────
def _load_active_models(conn) -> list[ModelParams]:
"""활성 모델 로드."""
cur = conn.cursor()
try:
cur.execute(
"SELECT id, name, params FROM kcg.correlation_param_models "
"WHERE is_active = TRUE ORDER BY is_default DESC, id ASC"
)
rows = cur.fetchall()
models = []
for row in rows:
import json
params = row[2] if isinstance(row[2], dict) else json.loads(row[2])
models.append(ModelParams.from_db_row({
'id': row[0], 'name': row[1], 'params': params,
}))
return models
except Exception as e:
logger.error('failed to load models: %s', e)
return [ModelParams()]
finally:
cur.close()
def _load_all_scores(conn) -> dict[tuple, dict]:
"""모든 점수를 사전 로드. {(model_id, group_key, target_mmsi): {...}}"""
cur = conn.cursor()
try:
cur.execute(
"SELECT model_id, group_key, target_mmsi, "
"current_score, streak_count, last_observed_at "
"FROM kcg.gear_correlation_scores"
)
result = {}
for row in cur.fetchall():
key = (row[0], row[1], row[2])
result[key] = {
'current_score': row[3],
'streak_count': row[4],
'last_observed_at': row[5],
}
return result
except Exception as e:
logger.warning('failed to load all scores: %s', e)
return {}
finally:
cur.close()
def _batch_insert_raw(conn, batch: list[tuple]):
"""raw 메트릭 배치 INSERT."""
if not batch:
return
cur = conn.cursor()
try:
from psycopg2.extras import execute_values
execute_values(
cur,
"""INSERT INTO kcg.gear_correlation_raw_metrics
(observed_at, group_key, target_mmsi, target_type, target_name,
proximity_ratio, visit_score, activity_sync,
dtw_similarity, speed_correlation, heading_coherence,
drift_similarity, shadow_stay, shadow_return,
gear_group_active_ratio)
VALUES %s""",
batch,
page_size=500,
)
except Exception as e:
logger.warning('batch insert raw failed: %s', e)
finally:
cur.close()
def _batch_upsert_scores(conn, batch: list[tuple]):
"""점수 배치 UPSERT."""
if not batch:
return
cur = conn.cursor()
try:
from psycopg2.extras import execute_values
execute_values(
cur,
"""INSERT INTO kcg.gear_correlation_scores
(model_id, group_key, target_mmsi, target_type, target_name,
current_score, streak_count, freeze_state,
first_observed_at, last_observed_at, updated_at)
VALUES %s
ON CONFLICT (model_id, group_key, target_mmsi)
DO UPDATE SET
current_score = EXCLUDED.current_score,
streak_count = EXCLUDED.streak_count,
freeze_state = EXCLUDED.freeze_state,
observation_count = kcg.gear_correlation_scores.observation_count + 1,
last_observed_at = EXCLUDED.last_observed_at,
updated_at = EXCLUDED.updated_at""",
batch,
page_size=500,
)
except Exception as e:
logger.warning('batch upsert scores failed: %s', e)
finally:
cur.close()

파일 보기

@ -158,3 +158,87 @@ def match_gear_by_track(
})
return results
def compute_sog_correlation(
    sog_a: list[float],
    sog_b: list[float],
) -> float:
    """Pearson correlation of two SOG series, normalised to 0~1.

    The longer series is truncated to the length of the shorter one.
    Returns 0.0 when fewer than 3 paired samples are available or when
    either series has (near-)zero variance.
    """
    length = min(len(sog_a), len(sog_b))
    if length < 3:
        return 0.0

    xs = sog_a[:length]
    ys = sog_b[:length]
    mean_x = sum(xs) / length
    mean_y = sum(ys) / length

    covariance = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    spread_x = sum((x - mean_x) ** 2 for x in xs)
    spread_y = sum((y - mean_y) ** 2 for y in ys)

    denom = (spread_x * spread_y) ** 0.5
    if denom < 1e-12:
        return 0.0

    pearson = covariance / denom  # -1 ~ 1
    return max(0.0, (pearson + 1.0) / 2.0)  # mapped to 0 ~ 1
def compute_heading_coherence(
    cog_a: list[float],
    cog_b: list[float],
    threshold_deg: float = 30.0,
) -> float:
    """Share of paired COG samples heading the same way (0~1).

    A pair is coherent when its angular difference is below
    ``threshold_deg``. The longer series is truncated to the shorter one;
    fewer than 3 pairs yields 0.0.
    """
    pair_count = min(len(cog_a), len(cog_b))
    if pair_count < 3:
        return 0.0

    def _angular_gap(first, second):
        # Take the shorter way around the circle.
        gap = abs(first - second)
        return 360.0 - gap if gap > 180.0 else gap

    aligned = sum(
        1
        for first, second in zip(cog_a[:pair_count], cog_b[:pair_count])
        if _angular_gap(first, second) < threshold_deg
    )
    return aligned / pair_count
def compute_proximity_ratio(
    track_a: list[tuple[float, float]],
    track_b: list[tuple[float, float]],
    threshold_nm: float = 10.0,
) -> float:
    """Share of index-paired points of two tracks that are close (0~1).

    Points are paired by position in the lists; a pair is close when its
    haversine distance is below ``threshold_nm`` (converted to metres).
    The longer track is truncated to the shorter one; fewer than 2 pairs
    yields 0.0.
    """
    pair_count = min(len(track_a), len(track_b))
    if pair_count < 2:
        return 0.0

    limit_m = threshold_nm * 1852.0
    near = sum(
        1
        for point_a, point_b in zip(track_a[:pair_count], track_b[:pair_count])
        if haversine_m(point_a[0], point_a[1], point_b[0], point_b[1]) < limit_m
    )
    return near / pair_count

파일 보기

@ -197,6 +197,9 @@ def execute_tool_call(call: dict) -> str:
if tool == 'get_knowledge':
return _get_knowledge(params)
if tool == 'query_gear_correlation':
return _query_gear_correlation(params)
return f'(알 수 없는 도구: {tool})'
@ -357,3 +360,54 @@ def _query_vessel_static(params: dict) -> str:
except Exception as e:
logger.error('vessel static query failed: %s', e)
return f'\n(정적정보 조회 실패: {e})\n'
def _query_gear_correlation(params: dict) -> str:
    """Look up the vessels/gear most correlated with one gear group.

    Reads the default model's scores (``current_score >= 0.3``) for
    ``params['group_key']``, highest first, limited to ``params['limit']``
    rows (default 10). The per-target detail metrics are not stored on the
    scores table, so they are taken from the most recent
    ``gear_correlation_raw_metrics`` row of the same group and target.

    Returns:
        A markdown-style text block, or a "no data" message when nothing is
        found or the query fails.
    """
    from db import kcgdb

    group_key = params.get('group_key', '')
    try:
        limit = max(1, int(params.get('limit', 10)))
    except (TypeError, ValueError):
        # Non-numeric limit: fall back to the default.
        limit = 10

    with kcgdb.get_conn() as conn:
        cur = conn.cursor()
        try:
            cur.execute(
                # Pick the top scores first, then attach the latest raw
                # metrics per target, so the lateral lookup runs only for
                # the rows actually returned.
                'SELECT best.target_name, best.target_mmsi, best.target_type, '
                'best.current_score, best.streak_count, best.observation_count, '
                'r.proximity_ratio, r.visit_score, r.heading_coherence, '
                'best.freeze_state '
                'FROM ('
                'SELECT s.group_key, s.target_name, s.target_mmsi, s.target_type, '
                's.current_score, s.streak_count, s.observation_count, s.freeze_state '
                'FROM kcg.gear_correlation_scores s '
                'JOIN kcg.correlation_param_models m ON s.model_id = m.id '
                'WHERE s.group_key = %s AND m.is_default = TRUE '
                'AND s.current_score >= 0.3 '
                'ORDER BY s.current_score DESC LIMIT %s'
                ') best '
                'LEFT JOIN LATERAL ('
                'SELECT rm.proximity_ratio, rm.visit_score, rm.heading_coherence '
                'FROM kcg.gear_correlation_raw_metrics rm '
                'WHERE rm.group_key = best.group_key '
                'AND rm.target_mmsi = best.target_mmsi '
                'ORDER BY rm.observed_at DESC LIMIT 1'
                ') r ON TRUE '
                'ORDER BY best.current_score DESC',
                (group_key, limit),
            )
            rows = cur.fetchall()
        except Exception as e:
            # Keep the user-facing message, but record the real cause.
            logger.error('gear correlation query failed: %s', e)
            return f'어구 그룹 "{group_key}"에 대한 연관성 데이터가 없습니다 (테이블 미생성).'
        finally:
            cur.close()

    if not rows:
        return f'어구 그룹 "{group_key}"에 대한 연관성 데이터가 없습니다.'

    lines = [f'## {group_key} 연관 분석 (상위 {len(rows)}개, default 모델)']
    for r in rows:
        name, mmsi, ttype, score, streak, obs, prox, visit, heading, state = r
        # current_score is nullable in the schema; treat NULL as 0.
        pct = (score or 0.0) * 100
        disp_name = name or mmsi
        detail_parts = []
        if prox is not None:
            detail_parts.append(f'근접 {prox*100:.0f}%')
        if visit is not None:
            detail_parts.append(f'방문 {visit*100:.0f}%')
        if heading is not None:
            detail_parts.append(f'COG동조 {heading*100:.0f}%')
        detail = ', '.join(detail_parts) if detail_parts else ''
        lines.append(
            f'- **{disp_name}** ({mmsi}, {ttype}): '
            f'일치율 {pct:.1f}% (연속 {streak}회, 관측 {obs}회) '
            f'[{detail}] 상태: {state}'
        )
    return '\n'.join(lines)

파일 보기

@ -0,0 +1,136 @@
"""gear_correlation_raw_metrics 파티션 유지보수.
APScheduler 일별 작업으로 실행:
- system_config에서 설정 읽기 (hot-reload, 프로세스 재시작 불필요)
- 미래 파티션 미리 생성
- 만료 파티션 DROP
- 미관측 점수 레코드 정리
"""
import logging
from datetime import date, datetime, timedelta
logger = logging.getLogger(__name__)
def _get_config_int(conn, key: str, default: int) -> int:
"""system_config에서 설정값 조회. 없으면 default."""
cur = conn.cursor()
try:
cur.execute(
"SELECT value::text FROM kcg.system_config WHERE key = %s",
(key,),
)
row = cur.fetchone()
return int(row[0].strip('"')) if row else default
except Exception:
return default
finally:
cur.close()
def _create_future_partitions(conn, days_ahead: int) -> int:
"""미래 N일 파티션 생성. 반환: 생성된 파티션 수."""
cur = conn.cursor()
created = 0
try:
for i in range(days_ahead + 1):
d = date.today() + timedelta(days=i)
partition_name = f'gear_correlation_raw_metrics_{d.strftime("%Y%m%d")}'
cur.execute(
"SELECT 1 FROM pg_class c "
"JOIN pg_namespace n ON n.oid = c.relnamespace "
"WHERE c.relname = %s AND n.nspname = 'kcg'",
(partition_name,),
)
if cur.fetchone() is None:
next_d = d + timedelta(days=1)
cur.execute(
f"CREATE TABLE IF NOT EXISTS kcg.{partition_name} "
f"PARTITION OF kcg.gear_correlation_raw_metrics "
f"FOR VALUES FROM ('{d.isoformat()}') TO ('{next_d.isoformat()}')"
)
created += 1
logger.info('created partition: kcg.%s', partition_name)
conn.commit()
except Exception as e:
conn.rollback()
logger.error('failed to create partitions: %s', e)
finally:
cur.close()
return created
def _drop_expired_partitions(conn, retention_days: int) -> int:
"""retention_days 초과 파티션 DROP. 반환: 삭제된 파티션 수."""
cutoff = date.today() - timedelta(days=retention_days)
cur = conn.cursor()
dropped = 0
try:
cur.execute(
"SELECT c.relname FROM pg_class c "
"JOIN pg_namespace n ON n.oid = c.relnamespace "
"WHERE c.relname LIKE 'gear_correlation_raw_metrics_%%' "
"AND n.nspname = 'kcg' AND c.relkind = 'r'"
)
for (name,) in cur.fetchall():
date_str = name.rsplit('_', 1)[-1]
try:
partition_date = datetime.strptime(date_str, '%Y%m%d').date()
except ValueError:
continue
if partition_date < cutoff:
cur.execute(f'DROP TABLE IF EXISTS kcg.{name}')
dropped += 1
logger.info('dropped expired partition: kcg.%s', name)
conn.commit()
except Exception as e:
conn.rollback()
logger.error('failed to drop partitions: %s', e)
finally:
cur.close()
return dropped
def _cleanup_stale_scores(conn, cleanup_days: int) -> int:
"""cleanup_days 이상 미관측 점수 레코드 삭제."""
cur = conn.cursor()
try:
cur.execute(
"DELETE FROM kcg.gear_correlation_scores "
"WHERE last_observed_at < NOW() - make_interval(days => %s)",
(cleanup_days,),
)
deleted = cur.rowcount
conn.commit()
return deleted
except Exception as e:
conn.rollback()
logger.error('failed to cleanup stale scores: %s', e)
return 0
finally:
cur.close()
def maintain_partitions():
    """Daily partition maintenance — called by the scheduler.

    The three settings are read from system_config on every run (falling
    back to 7 / 3 / 30), then future partitions are created, expired ones
    dropped and stale score rows deleted, and a summary line is logged.
    """
    from db import kcgdb

    with kcgdb.get_conn() as conn:
        # Settings are looked up first, in this order.
        retention_days = _get_config_int(conn, 'partition.raw_metrics.retention_days', 7)
        ahead_days = _get_config_int(conn, 'partition.raw_metrics.create_ahead_days', 3)
        score_cleanup_days = _get_config_int(conn, 'partition.scores.cleanup_days', 30)

        n_created = _create_future_partitions(conn, ahead_days)
        n_dropped = _drop_expired_partitions(conn, retention_days)
        n_cleaned = _cleanup_stale_scores(conn, score_cleanup_days)

        logger.info(
            'partition maintenance: %d created, %d dropped, %d stale scores cleaned '
            '(retention=%dd, ahead=%dd, cleanup=%dd)',
            n_created, n_dropped, n_cleaned,
            retention_days, ahead_days, score_cleanup_days,
        )

파일 보기

@ -210,6 +210,22 @@ class FleetTracker:
)
logger.info('gear MMSI change: %s%s (name=%s)', old_mmsi_row[1], mmsi, name)
# 어피니티 점수 이전 (이전 MMSI → 새 MMSI)
try:
cur.execute(
"UPDATE kcg.gear_correlation_scores "
"SET target_mmsi = %s, updated_at = NOW() "
"WHERE target_mmsi = %s",
(mmsi, old_mmsi_row[1]),
)
if cur.rowcount > 0:
logger.info(
'transferred %d affinity scores: %s%s',
cur.rowcount, old_mmsi_row[1], mmsi,
)
except Exception as e:
logger.warning('affinity score transfer failed: %s', e)
cur.execute(
"""INSERT INTO kcg.gear_identity_log
(mmsi, name, parent_name, parent_mmsi, parent_vessel_id,

파일 보기

@ -99,6 +99,8 @@ def run_analysis_cycle():
fleet_tracker.save_snapshot(vessel_dfs, kcg_conn)
gear_groups = []
# 4.5 그룹 폴리곤 생성 + 저장
try:
from algorithms.polygon_builder import detect_gear_groups, build_all_group_snapshots
@ -116,6 +118,23 @@ def run_analysis_cycle():
except Exception as e:
logger.warning('group polygon generation failed: %s', e)
# 4.7 어구 연관성 분석 (멀티모델 패턴 추적)
try:
from algorithms.gear_correlation import run_gear_correlation
corr_result = run_gear_correlation(
vessel_store=vessel_store,
gear_groups=gear_groups,
conn=kcg_conn,
)
logger.info(
'gear correlation: %d scores updated, %d raw metrics, %d models',
corr_result['updated'], corr_result['raw_inserted'],
corr_result['models'],
)
except Exception as e:
logger.warning('gear correlation failed: %s', e)
# 5. 선박별 추가 알고리즘 → AnalysisResult 생성
results = []
for c in classifications:
@ -329,6 +348,15 @@ def start_scheduler():
max_instances=1,
replace_existing=True,
)
# 파티션 유지보수 (매일 04:00)
from db.partition_manager import maintain_partitions
_scheduler.add_job(
maintain_partitions,
'cron', hour=4, minute=0,
id='partition_maintenance',
max_instances=1,
replace_existing=True,
)
_scheduler.start()
logger.info('scheduler started (interval=%dm)', settings.SCHEDULER_INTERVAL_MIN)