From 812a78f636caabe9332f0c3fc29945bf2c75056e Mon Sep 17 00:00:00 2001 From: htlee Date: Mon, 30 Mar 2026 10:36:43 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EC=96=B4=EA=B5=AC=20=EC=97=B0=EA=B4=80?= =?UTF-8?q?=EC=84=B1=20=EB=A9=80=ED=8B=B0=EB=AA=A8=EB=8D=B8=20=ED=8C=A8?= =?UTF-8?q?=ED=84=B4=20=EC=B6=94=EC=A0=81=20=EC=8B=9C=EC=8A=A4=ED=85=9C=20?= =?UTF-8?q?(Phase=201=20Core)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - gear_correlation.py: 적응형 EMA + freeze + shadow + 배치 최적화 - 5개 글로벌 모델 병렬 추적 (default/aggressive/conservative/proximity-heavy/visit-pattern) - 어구 중심 점수 체계: 어구 비활성 시 FREEZE, 선박 shadow 추적 - 유형별 메트릭: 어구-선박(proximity+visit+activity), 선박-선박(DTW+SOG+COG) - DB: correlation_param_models + raw_metrics(일별 파티션) + scores + system_config - partition_manager: 일별 파티션 생성/정리 (system_config hot-reload) - track_similarity: SOG상관 + COG동조 + 근접비 3개 메트릭 추가 - scheduler Step 4.7 통합, fleet_tracker MMSI 점수 이전 - chat/tools: query_gear_correlation 도구 Co-Authored-By: Claude Opus 4.6 (1M context) --- database/migration/010_gear_correlation.sql | 146 ++++ prediction/algorithms/gear_correlation.py | 783 ++++++++++++++++++++ prediction/algorithms/track_similarity.py | 84 +++ prediction/chat/tools.py | 54 ++ prediction/db/partition_manager.py | 136 ++++ prediction/fleet_tracker.py | 16 + prediction/scheduler.py | 28 + 7 files changed, 1247 insertions(+) create mode 100644 database/migration/010_gear_correlation.sql create mode 100644 prediction/algorithms/gear_correlation.py create mode 100644 prediction/db/partition_manager.py diff --git a/database/migration/010_gear_correlation.sql b/database/migration/010_gear_correlation.sql new file mode 100644 index 0000000..a36c5b5 --- /dev/null +++ b/database/migration/010_gear_correlation.sql @@ -0,0 +1,146 @@ +-- 010: 어구 연관성 추적 시스템 +-- - correlation_param_models: 파라미터 모델 마스터 +-- - gear_correlation_raw_metrics: raw 메트릭 (타임스탬프 파티셔닝, 7일 보존) +-- - gear_correlation_scores: 모델별 어피니티 점수 (상태 테이블) +-- - 
system_config: 런타임 설정 (파티션 보관기간 등)
+
+SET search_path TO kcg, public;
+
+-- ── Parameter models ──
+CREATE TABLE IF NOT EXISTS kcg.correlation_param_models (
+    id SERIAL PRIMARY KEY,
+    name VARCHAR(100) NOT NULL UNIQUE,  -- conflict target of the seed INSERT below
+    is_default BOOLEAN DEFAULT FALSE,
+    is_active BOOLEAN DEFAULT TRUE,
+    params JSONB NOT NULL,  -- parameter set as one JSON object; the 'default' row below shows the keys
+    description TEXT,
+    created_at TIMESTAMPTZ DEFAULT NOW(),
+    updated_at TIMESTAMPTZ DEFAULT NOW()
+);
+
+-- Insert the default model (skipped when a model named 'default' already exists)
+INSERT INTO kcg.correlation_param_models (name, is_default, is_active, params, description)
+VALUES ('default', TRUE, TRUE,
+    '{"alpha_base":0.30,"alpha_min":0.08,"alpha_decay_per_streak":0.005,"track_threshold":0.50,"polygon_threshold":0.70,"w_proximity":0.45,"w_visit":0.35,"w_activity":0.20,"w_dtw":0.30,"w_sog_corr":0.20,"w_heading":0.25,"w_prox_vv":0.25,"w_prox_persist":0.50,"w_drift":0.30,"w_signal_sync":0.20,"group_quiet_ratio":0.30,"normal_gap_hours":1.0,"decay_slow":0.015,"decay_fast":0.08,"stale_hours":6.0,"shadow_stay_bonus":0.10,"shadow_return_bonus":0.15,"candidate_radius_factor":3.0,"proximity_threshold_nm":5.0,"visit_threshold_nm":5.0,"night_bonus":1.3,"long_decay_days":7.0}',
+    '기본 추적 모델')
+ON CONFLICT (name) DO NOTHING;
+
+-- ── Raw metrics (model-independent, recorded every 5 minutes, partitioned by timestamp) ──
+CREATE TABLE IF NOT EXISTS kcg.gear_correlation_raw_metrics (
+    id BIGSERIAL,
+    observed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),  -- range-partition key (see PARTITION BY below)
+
+    group_key VARCHAR(100) NOT NULL,
+    target_mmsi VARCHAR(20) NOT NULL,
+    target_type VARCHAR(10) NOT NULL,
+    target_name VARCHAR(200),
+
+    -- Raw metrics (shared by all models)
+    proximity_ratio DOUBLE PRECISION,
+    visit_score DOUBLE PRECISION,
+    activity_sync DOUBLE PRECISION,
+    dtw_similarity DOUBLE PRECISION,
+    speed_correlation DOUBLE PRECISION,
+    heading_coherence DOUBLE PRECISION,
+    drift_similarity DOUBLE PRECISION,
+
+    -- Shadow
+    shadow_stay BOOLEAN DEFAULT FALSE,
+    shadow_return BOOLEAN DEFAULT FALSE,
+
+    -- State
+    gear_group_active_ratio DOUBLE PRECISION,
+
+    PRIMARY KEY (id, observed_at)  -- a primary key on a partitioned table must include the partition key
+) PARTITION BY RANGE (observed_at);
+
+-- 일별 파티션 생성 
함수
+CREATE OR REPLACE FUNCTION kcg.create_raw_metric_partitions(days_ahead INT DEFAULT 3)
+RETURNS void AS $$
+DECLARE
+    d DATE;  -- calendar day covered by the partition being considered
+    partition_name TEXT;  -- gear_correlation_raw_metrics_YYYYMMDD
+BEGIN
+    FOR i IN 0..days_ahead LOOP  -- today through today + days_ahead, inclusive
+        d := CURRENT_DATE + i;
+        partition_name := 'gear_correlation_raw_metrics_' || TO_CHAR(d, 'YYYYMMDD');
+        IF NOT EXISTS (  -- skip days whose partition relation already exists in schema kcg
+            SELECT 1 FROM pg_class c
+            JOIN pg_namespace n ON n.oid = c.relnamespace
+            WHERE c.relname = partition_name AND n.nspname = 'kcg'
+        ) THEN
+            EXECUTE format(
+                'CREATE TABLE IF NOT EXISTS kcg.%I PARTITION OF kcg.gear_correlation_raw_metrics
+                FOR VALUES FROM (%L) TO (%L)',
+                partition_name, d, d + 1  -- bounds [d, d + 1): exactly one day, upper bound exclusive
+            );
+        END IF;
+    END LOOP;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Create the initial partitions (today + 3 days)
+SELECT kcg.create_raw_metric_partitions(3);
+
+-- raw_metrics indexes
+CREATE INDEX IF NOT EXISTS idx_raw_metrics_group_time
+    ON kcg.gear_correlation_raw_metrics (group_key, observed_at DESC);
+CREATE INDEX IF NOT EXISTS idx_raw_metrics_target
+    ON kcg.gear_correlation_raw_metrics (target_mmsi, observed_at DESC);
+
+-- ── Affinity scores (independent per model, state table) ──
+CREATE TABLE IF NOT EXISTS kcg.gear_correlation_scores (
+    id BIGSERIAL PRIMARY KEY,
+    model_id INT NOT NULL REFERENCES kcg.correlation_param_models(id) ON DELETE CASCADE,  -- deleting a model deletes its scores
+
+    group_key VARCHAR(100) NOT NULL,
+    target_mmsi VARCHAR(20) NOT NULL,
+    target_type VARCHAR(10) NOT NULL,
+    target_name VARCHAR(200),
+
+    -- Per-model score (EMA result)
+    current_score DOUBLE PRECISION DEFAULT 0,
+    streak_count INT DEFAULT 0,
+    observation_count INT DEFAULT 0,
+
+    -- Shadow accumulation
+    shadow_bonus_total DOUBLE PRECISION DEFAULT 0,
+    shadow_stay_count INT DEFAULT 0,
+    shadow_return_count INT DEFAULT 0,
+
+    -- State
+    freeze_state VARCHAR(20) DEFAULT 'ACTIVE',
+
+    -- Timestamps
+    first_observed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    last_observed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+
+    UNIQUE (model_id, group_key, target_mmsi)  -- one row per (model, gear group, target)
+);
+
+CREATE INDEX IF NOT EXISTS idx_gc_model_group
+    ON kcg.gear_correlation_scores (model_id, group_key, current_score DESC);
+CREATE INDEX IF NOT EXISTS idx_gc_active
+    ON kcg.gear_correlation_scores (current_score DESC)
+    WHERE current_score >= 0.5;  -- partial index: covers only rows scoring 0.5 or more
+
+-- ── System settings (changeable at runtime, no restart required) ──
+CREATE TABLE IF NOT EXISTS kcg.system_config (
+    key VARCHAR(100) PRIMARY KEY,
+    value JSONB NOT NULL,  -- the rows seeded below store JSON numbers
+    description TEXT,
+    updated_at TIMESTAMPTZ DEFAULT NOW(),
+    updated_by VARCHAR(100) DEFAULT 'system'
+);
+
+INSERT INTO kcg.system_config (key, value, description) VALUES
+    ('partition.raw_metrics.retention_days', '7',
+     'raw_metrics 파티션 보관 기간 (일). 초과 시 파티션 DROP.'),
+    ('partition.raw_metrics.create_ahead_days', '3',
+     '미래 파티션 미리 생성 일수.'),
+    ('partition.scores.cleanup_days', '30',
+     '미관측 점수 레코드 정리 기간 (일).'),
+    ('correlation.max_active_models', '5',
+     '동시 활성 모델 최대 수.')
+ON CONFLICT (key) DO NOTHING;  -- existing keys keep their current values
diff --git a/prediction/algorithms/gear_correlation.py b/prediction/algorithms/gear_correlation.py new file mode 100644 index 0000000..edae548 --- /dev/null +++ b/prediction/algorithms/gear_correlation.py @@ -0,0 +1,783 @@ +"""어구 그룹 다단계 연관성 분석 — 멀티모델 패턴 추적. + +Phase 1: default 모델 1개로 동작 (DB에서 is_active=true 모델 로드). +Phase 2: 글로벌 모델 max 5개 병렬 실행. 
+ +어구 중심 점수 체계: + - 어구 신호 기준 관측 윈도우 (어구 비활성 시 FREEZE) + - 선박 shadow 추적 (비활성 → 활성 전환 시 보너스) + - 적응형 EMA + streak 자기강화 + - 퍼센트 기반 무제한 추적 (50%+) +""" + +from __future__ import annotations + +import logging +import math +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Optional + +logger = logging.getLogger(__name__) + + +# ── 상수 ────────────────────────────────────────────────────────── +_EARTH_RADIUS_NM = 3440.065 +_NM_TO_M = 1852.0 + + +# ── 파라미터 모델 ───────────────────────────────────────────────── + +@dataclass +class ModelParams: + """추적 모델의 전체 파라미터셋.""" + + model_id: int = 1 + name: str = 'default' + + # EMA + alpha_base: float = 0.30 + alpha_min: float = 0.08 + alpha_decay_per_streak: float = 0.005 + + # 임계값 + track_threshold: float = 0.50 + polygon_threshold: float = 0.70 + + # 메트릭 가중치 — 어구-선박 + w_proximity: float = 0.45 + w_visit: float = 0.35 + w_activity: float = 0.20 + + # 메트릭 가중치 — 선박-선박 + w_dtw: float = 0.30 + w_sog_corr: float = 0.20 + w_heading: float = 0.25 + w_prox_vv: float = 0.25 + + # 메트릭 가중치 — 어구-어구 + w_prox_persist: float = 0.50 + w_drift: float = 0.30 + w_signal_sync: float = 0.20 + + # Freeze 기준 + group_quiet_ratio: float = 0.30 + normal_gap_hours: float = 1.0 + + # 감쇠 + decay_slow: float = 0.015 + decay_fast: float = 0.08 + stale_hours: float = 6.0 + + # Shadow + shadow_stay_bonus: float = 0.10 + shadow_return_bonus: float = 0.15 + + # 거리 + candidate_radius_factor: float = 3.0 + proximity_threshold_nm: float = 5.0 + visit_threshold_nm: float = 5.0 + + # 야간 + night_bonus: float = 1.3 + + # 장기 감쇠 + long_decay_days: float = 7.0 + + @classmethod + def from_db_row(cls, row: dict) -> ModelParams: + """DB correlation_param_models 행에서 생성.""" + params_json = row.get('params', {}) + return cls( + model_id=row['id'], + name=row['name'], + **{k: v for k, v in params_json.items() if hasattr(cls, k)}, + ) + + +# ── Haversine 거리 ──────────────────────────────────────────────── + +def _haversine_nm(lat1: 
float, lon1: float, lat2: float, lon2: float) -> float: + """두 좌표 간 거리 (해리).""" + phi1 = math.radians(lat1) + phi2 = math.radians(lat2) + dphi = math.radians(lat2 - lat1) + dlam = math.radians(lon2 - lon1) + a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2 + return _EARTH_RADIUS_NM * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + + +# ── Freeze 판단 ─────────────────────────────────────────────────── + +def should_freeze( + gear_group_active_ratio: float, + target_last_observed: Optional[datetime], + now: datetime, + params: ModelParams, +) -> tuple[bool, str]: + """감쇠 적용 여부 판단. 어구 그룹이 기준.""" + # 1. 어구 그룹 비활성 → 비교 불가 + if gear_group_active_ratio < params.group_quiet_ratio: + return True, 'GROUP_QUIET' + + # 2. 개별 부재가 정상 범위 + if target_last_observed is not None: + hours_absent = (now - target_last_observed).total_seconds() / 3600 + if hours_absent < params.normal_gap_hours: + return True, 'NORMAL_GAP' + + return False, 'ACTIVE' + + +# ── EMA 업데이트 ────────────────────────────────────────────────── + +def update_score( + prev_score: Optional[float], + raw_score: Optional[float], + streak: int, + last_observed: Optional[datetime], + now: datetime, + gear_group_active_ratio: float, + shadow_bonus: float, + params: ModelParams, +) -> tuple[float, int, str]: + """적응형 EMA 점수 업데이트. 
+ + Returns: (new_score, new_streak, state) + """ + # 관측 불가 + if raw_score is None: + frz, reason = should_freeze( + gear_group_active_ratio, last_observed, now, params, + ) + if frz: + return (prev_score or 0.0), streak, reason + + # 실제 이탈 → 감쇠 + hours_absent = 0.0 + if last_observed is not None: + hours_absent = (now - last_observed).total_seconds() / 3600 + decay = params.decay_fast if hours_absent > params.stale_hours else params.decay_slow + return max(0.0, (prev_score or 0.0) - decay), 0, 'SIGNAL_LOSS' + + # Shadow 보너스 + adjusted = min(1.0, raw_score + shadow_bonus) + + # Case 1: 임계값 이상 → streak 보상 + if adjusted >= params.track_threshold: + streak += 1 + alpha = max(params.alpha_min, + params.alpha_base - streak * params.alpha_decay_per_streak) + if prev_score is None: + return adjusted, streak, 'ACTIVE' + return alpha * adjusted + (1.0 - alpha) * prev_score, streak, 'ACTIVE' + + # Case 2: 패턴 이탈 + alpha = params.alpha_base + if prev_score is None: + return adjusted, 0, 'PATTERN_DIVERGE' + return alpha * adjusted + (1.0 - alpha) * prev_score, 0, 'PATTERN_DIVERGE' + + +# ── 어구-선박 메트릭 ────────────────────────────────────────────── + +def _compute_gear_vessel_metrics( + gear_center_lat: float, + gear_center_lon: float, + gear_radius_nm: float, + vessel_track: list[dict], + params: ModelParams, +) -> dict: + """어구 그룹 중심 vs 선박 궤적 메트릭. + + vessel_track: [{lat, lon, sog, cog, timestamp}, ...] + """ + if not vessel_track: + return {'proximity_ratio': 0, 'visit_score': 0, 'activity_sync': 0, 'composite': 0} + + threshold_nm = max(gear_radius_nm * 2, params.proximity_threshold_nm) + + # 1. proximity_ratio — 근접 지속비 + close_count = 0 + for p in vessel_track: + d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon']) + if d < threshold_nm: + close_count += 1 + proximity_ratio = close_count / len(vessel_track) + + # 2. 
visit_score — 방문 패턴 + visit_threshold = params.visit_threshold_nm + in_zone = False + visits = 0 + stay_points = 0 + away_points = 0 + + for p in vessel_track: + d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon']) + if d < visit_threshold: + if not in_zone: + visits += 1 + in_zone = True + stay_points += 1 + else: + in_zone = False + away_points += 1 + + visit_count_norm = min(1.0, visits / 5.0) if visits > 0 else 0.0 + total = stay_points + away_points + stay_ratio = stay_points / total if total > 0 else 0.0 + visit_score = 0.5 * visit_count_norm + 0.5 * stay_ratio + + # 3. activity_sync — 영역 내 저속 비율 (조업/관리 행위) + in_zone_count = 0 + in_zone_slow = 0 + for p in vessel_track: + d = _haversine_nm(gear_center_lat, gear_center_lon, p['lat'], p['lon']) + if d < visit_threshold: + in_zone_count += 1 + if p.get('sog', 0) < 2.0: + in_zone_slow += 1 + activity_sync = in_zone_slow / in_zone_count if in_zone_count > 0 else 0.0 + + # 가중 합산 + composite = ( + params.w_proximity * proximity_ratio + + params.w_visit * visit_score + + params.w_activity * activity_sync + ) + + return { + 'proximity_ratio': round(proximity_ratio, 4), + 'visit_score': round(visit_score, 4), + 'activity_sync': round(activity_sync, 4), + 'composite': round(composite, 4), + } + + +# ── 선박-선박 메트릭 ────────────────────────────────────────────── + +def _compute_vessel_vessel_metrics( + track_a: list[dict], + track_b: list[dict], + params: ModelParams, +) -> dict: + """두 선박 궤적 간 메트릭.""" + from algorithms.track_similarity import ( + compute_heading_coherence, + compute_proximity_ratio, + compute_sog_correlation, + compute_track_similarity, + ) + + if not track_a or not track_b: + return { + 'dtw_similarity': 0, 'speed_correlation': 0, + 'heading_coherence': 0, 'proximity_ratio': 0, 'composite': 0, + } + + # DTW + pts_a = [(p['lat'], p['lon']) for p in track_a] + pts_b = [(p['lat'], p['lon']) for p in track_b] + dtw_sim = compute_track_similarity(pts_a, pts_b) + + # SOG 상관 + sog_a = 
[p.get('sog', 0) for p in track_a] + sog_b = [p.get('sog', 0) for p in track_b] + sog_corr = compute_sog_correlation(sog_a, sog_b) + + # COG 동조 + cog_a = [p.get('cog', 0) for p in track_a] + cog_b = [p.get('cog', 0) for p in track_b] + heading = compute_heading_coherence(cog_a, cog_b) + + # 근접비 + prox = compute_proximity_ratio(pts_a, pts_b, params.proximity_threshold_nm) + + composite = ( + params.w_dtw * dtw_sim + + params.w_sog_corr * sog_corr + + params.w_heading * heading + + params.w_prox_vv * prox + ) + + return { + 'dtw_similarity': round(dtw_sim, 4), + 'speed_correlation': round(sog_corr, 4), + 'heading_coherence': round(heading, 4), + 'proximity_ratio': round(prox, 4), + 'composite': round(composite, 4), + } + + +# ── 어구-어구 메트릭 ────────────────────────────────────────────── + +def _compute_gear_gear_metrics( + center_a: tuple[float, float], + center_b: tuple[float, float], + center_history_a: list[dict], + center_history_b: list[dict], + params: ModelParams, +) -> dict: + """두 어구 그룹 간 메트릭.""" + if not center_history_a or not center_history_b: + return { + 'proximity_ratio': 0, 'drift_similarity': 0, + 'composite': 0, + } + + # 1. 근접 지속성 — 현재 중심 간 거리의 안정성 + dist_nm = _haversine_nm(center_a[0], center_a[1], center_b[0], center_b[1]) + prox_persist = max(0.0, 1.0 - dist_nm / 20.0) # 20NM 이상이면 0 + + # 2. 
표류 유사도 — 중심 이동 벡터 코사인 유사도 + drift_sim = 0.0 + n = min(len(center_history_a), len(center_history_b)) + if n >= 2: + # 마지막 2점으로 이동 벡터 계산 + da_lat = center_history_a[-1].get('lat', 0) - center_history_a[-2].get('lat', 0) + da_lon = center_history_a[-1].get('lon', 0) - center_history_a[-2].get('lon', 0) + db_lat = center_history_b[-1].get('lat', 0) - center_history_b[-2].get('lat', 0) + db_lon = center_history_b[-1].get('lon', 0) - center_history_b[-2].get('lon', 0) + + dot = da_lat * db_lat + da_lon * db_lon + mag_a = (da_lat ** 2 + da_lon ** 2) ** 0.5 + mag_b = (db_lat ** 2 + db_lon ** 2) ** 0.5 + if mag_a > 1e-10 and mag_b > 1e-10: + cos_sim = dot / (mag_a * mag_b) + drift_sim = max(0.0, (cos_sim + 1.0) / 2.0) + + composite = ( + params.w_prox_persist * prox_persist + + params.w_drift * drift_sim + ) + + return { + 'proximity_ratio': round(prox_persist, 4), + 'drift_similarity': round(drift_sim, 4), + 'composite': round(composite, 4), + } + + +# ── Shadow 보너스 계산 ──────────────────────────────────────────── + +def compute_shadow_bonus( + vessel_positions_during_inactive: list[dict], + last_known_gear_center: tuple[float, float], + group_radius_nm: float, + params: ModelParams, +) -> tuple[float, bool, bool]: + """어구 비활성 동안 선박이 어구 근처에 머물렀는지 평가. + + Returns: (bonus, stayed_nearby, returned_before_resume) + """ + if not vessel_positions_during_inactive or last_known_gear_center is None: + return 0.0, False, False + + gc_lat, gc_lon = last_known_gear_center + threshold_nm = max(group_radius_nm * 2, params.proximity_threshold_nm) + + # 1. 평균 거리 + dists = [ + _haversine_nm(gc_lat, gc_lon, p['lat'], p['lon']) + for p in vessel_positions_during_inactive + ] + avg_dist = sum(dists) / len(dists) + stayed = avg_dist < threshold_nm + + # 2. 
마지막 위치가 근처인지 (복귀 판단) + returned = dists[-1] < threshold_nm if dists else False + + bonus = 0.0 + if stayed: + bonus += params.shadow_stay_bonus + if returned: + bonus += params.shadow_return_bonus + + return bonus, stayed, returned + + +# ── 후보 필터링 ─────────────────────────────────────────────────── + +def _compute_group_radius(members: list[dict]) -> float: + """그룹 멤버 간 최대 거리의 절반 (NM).""" + if len(members) < 2: + return 1.0 # 최소 1NM + + max_dist = 0.0 + for i in range(len(members)): + for j in range(i + 1, len(members)): + d = _haversine_nm( + members[i]['lat'], members[i]['lon'], + members[j]['lat'], members[j]['lon'], + ) + if d > max_dist: + max_dist = d + + return max(1.0, max_dist / 2.0) + + +def find_candidates( + gear_center_lat: float, + gear_center_lon: float, + group_radius_nm: float, + group_mmsis: set[str], + all_positions: dict[str, dict], + params: ModelParams, +) -> list[str]: + """어구 그룹 주변 후보 MMSI 필터링.""" + search_radius = group_radius_nm * params.candidate_radius_factor + candidates = [] + + for mmsi, pos in all_positions.items(): + if mmsi in group_mmsis: + continue + d = _haversine_nm(gear_center_lat, gear_center_lon, pos['lat'], pos['lon']) + if d < search_radius: + candidates.append(mmsi) + + return candidates + + +# ── 메인 실행 ───────────────────────────────────────────────────── + +def _get_vessel_track(vessel_store, mmsi: str, hours: int = 6) -> list[dict]: + """vessel_store에서 특정 MMSI의 최근 N시간 궤적 추출 (벡터화).""" + df = vessel_store._tracks.get(mmsi) + if df is None or len(df) == 0: + return [] + + import pandas as pd + now = datetime.now(timezone.utc) + cutoff = now - pd.Timedelta(hours=hours) + + ts_col = df['timestamp'] + if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None: + mask = ts_col >= pd.Timestamp(cutoff) + else: + mask = ts_col >= pd.Timestamp(cutoff.replace(tzinfo=None)) + + recent = df.loc[mask] + if recent.empty: + return [] + + # 벡터화 추출 (iterrows 대신) + lats = recent['lat'].values + lons = recent['lon'].values + sogs = 
(recent['sog'] if 'sog' in recent.columns + else recent.get('raw_sog', pd.Series(dtype=float))).fillna(0).values + cogs = (recent['cog'] if 'cog' in recent.columns + else pd.Series(0, index=recent.index)).fillna(0).values + + return [ + {'lat': float(lats[i]), 'lon': float(lons[i]), + 'sog': float(sogs[i]), 'cog': float(cogs[i])} + for i in range(len(lats)) + ] + + +def _compute_gear_active_ratio( + gear_members: list[dict], + all_positions: dict[str, dict], + now: datetime, + stale_sec: float = 21600, +) -> float: + """어구 그룹의 활성 멤버 비율.""" + if not gear_members: + return 0.0 + + active = 0 + for m in gear_members: + pos = all_positions.get(m['mmsi']) + if pos is None: + continue + ts = pos.get('timestamp') + if ts is None: + continue + if isinstance(ts, datetime): + last_dt = ts if ts.tzinfo is not None else ts.replace(tzinfo=timezone.utc) + else: + try: + import pandas as pd + last_dt = pd.Timestamp(ts).to_pydatetime() + if last_dt.tzinfo is None: + last_dt = last_dt.replace(tzinfo=timezone.utc) + except Exception: + continue + age = (now - last_dt).total_seconds() + if age < stale_sec: + active += 1 + + return active / len(gear_members) + + +def _is_gear_pattern(name: str) -> bool: + """어구 이름 패턴 판별.""" + import re + return bool(re.match(r'^.+_\d+_\d*$', name or '')) + + +_MAX_CANDIDATES_PER_GROUP = 30 # 후보 수 상한 (성능 보호) + + +def run_gear_correlation( + vessel_store, + gear_groups: list[dict], + conn, +) -> dict: + """어구 연관성 분석 메인 실행 (배치 최적화). 
+ + Args: + vessel_store: VesselStore 인스턴스 + gear_groups: detect_gear_groups() 결과 + conn: kcgdb 커넥션 + + Returns: + {'updated': int, 'models': int, 'raw_inserted': int} + """ + import time as _time + import re as _re + + _gear_re = _re.compile(r'^.+_\d+_\d*$') + + t0 = _time.time() + now = datetime.now(timezone.utc) + all_positions = vessel_store.get_all_latest_positions() + + # 활성 모델 로드 + models = _load_active_models(conn) + if not models: + logger.warning('no active correlation models found') + return {'updated': 0, 'models': 0, 'raw_inserted': 0} + + # 기존 점수 전체 사전 로드 (건별 쿼리 대신 벌크) + all_scores = _load_all_scores(conn) + + raw_batch: list[tuple] = [] + score_batch: list[tuple] = [] + total_updated = 0 + total_raw = 0 + + default_params = models[0] + + for gear_group in gear_groups: + parent_name = gear_group['parent_name'] + members = gear_group['members'] + if not members: + continue + + # 그룹 중심 + 반경 + center_lat = sum(m['lat'] for m in members) / len(members) + center_lon = sum(m['lon'] for m in members) / len(members) + group_radius = _compute_group_radius(members) + + # 어구 활성도 + active_ratio = _compute_gear_active_ratio(members, all_positions, now) + + # 그룹 멤버 MMSI 셋 + group_mmsis = {m['mmsi'] for m in members} + if gear_group.get('parent_mmsi'): + group_mmsis.add(gear_group['parent_mmsi']) + + # 후보 필터링 + 수 제한 + candidates = find_candidates( + center_lat, center_lon, group_radius, + group_mmsis, all_positions, default_params, + ) + if not candidates: + continue + if len(candidates) > _MAX_CANDIDATES_PER_GROUP: + # 가까운 순서로 제한 + candidates.sort(key=lambda m: _haversine_nm( + center_lat, center_lon, + all_positions[m]['lat'], all_positions[m]['lon'], + )) + candidates = candidates[:_MAX_CANDIDATES_PER_GROUP] + + for target_mmsi in candidates: + target_pos = all_positions.get(target_mmsi) + if target_pos is None: + continue + + target_name = target_pos.get('name', '') + target_is_gear = bool(_gear_re.match(target_name or '')) + target_type = 'GEAR_BUOY' if 
target_is_gear else 'VESSEL' + + # 메트릭 계산 (어구는 단순 거리, 선박은 track 기반) + if target_is_gear: + d = _haversine_nm(center_lat, center_lon, + target_pos['lat'], target_pos['lon']) + prox = max(0.0, 1.0 - d / 20.0) + metrics = {'proximity_ratio': prox, 'composite': prox} + else: + vessel_track = _get_vessel_track(vessel_store, target_mmsi, hours=6) + metrics = _compute_gear_vessel_metrics( + center_lat, center_lon, group_radius, + vessel_track, default_params, + ) + + # raw 메트릭 배치 수집 + raw_batch.append(( + now, parent_name, target_mmsi, target_type, target_name, + metrics.get('proximity_ratio'), metrics.get('visit_score'), + metrics.get('activity_sync'), metrics.get('dtw_similarity'), + metrics.get('speed_correlation'), metrics.get('heading_coherence'), + metrics.get('drift_similarity'), False, False, active_ratio, + )) + total_raw += 1 + + # 모델별 EMA 업데이트 + for model in models: + if target_is_gear: + composite = metrics.get('proximity_ratio', 0) * model.w_prox_persist + else: + composite = ( + model.w_proximity * (metrics.get('proximity_ratio') or 0) + + model.w_visit * (metrics.get('visit_score') or 0) + + model.w_activity * (metrics.get('activity_sync') or 0) + ) + + # 사전 로드된 점수에서 조회 (DB 쿼리 없음) + score_key = (model.model_id, parent_name, target_mmsi) + prev = all_scores.get(score_key) + prev_score = prev['current_score'] if prev else None + streak = prev['streak_count'] if prev else 0 + last_obs = prev['last_observed_at'] if prev else None + + new_score, new_streak, state = update_score( + prev_score, composite, streak, + last_obs, now, active_ratio, + 0.0, model, + ) + + if new_score >= model.track_threshold or prev is not None: + score_batch.append(( + model.model_id, parent_name, target_mmsi, + target_type, target_name, + round(new_score, 6), new_streak, state, + now, now, now, + )) + total_updated += 1 + + # 배치 DB 저장 + _batch_insert_raw(conn, raw_batch) + _batch_upsert_scores(conn, score_batch) + conn.commit() + + elapsed = round(_time.time() - t0, 2) + logger.info( 
+ 'gear correlation internals: %.2fs, %d groups, %d raw, %d scores, %d models', + elapsed, len(gear_groups), total_raw, total_updated, len(models), + ) + + return { + 'updated': total_updated, + 'models': len(models), + 'raw_inserted': total_raw, + } + + +# ── DB 헬퍼 (배치 최적화) ───────────────────────────────────────── + +def _load_active_models(conn) -> list[ModelParams]: + """활성 모델 로드.""" + cur = conn.cursor() + try: + cur.execute( + "SELECT id, name, params FROM kcg.correlation_param_models " + "WHERE is_active = TRUE ORDER BY is_default DESC, id ASC" + ) + rows = cur.fetchall() + models = [] + for row in rows: + import json + params = row[2] if isinstance(row[2], dict) else json.loads(row[2]) + models.append(ModelParams.from_db_row({ + 'id': row[0], 'name': row[1], 'params': params, + })) + return models + except Exception as e: + logger.error('failed to load models: %s', e) + return [ModelParams()] + finally: + cur.close() + + +def _load_all_scores(conn) -> dict[tuple, dict]: + """모든 점수를 사전 로드. 
{(model_id, group_key, target_mmsi): {...}}""" + cur = conn.cursor() + try: + cur.execute( + "SELECT model_id, group_key, target_mmsi, " + "current_score, streak_count, last_observed_at " + "FROM kcg.gear_correlation_scores" + ) + result = {} + for row in cur.fetchall(): + key = (row[0], row[1], row[2]) + result[key] = { + 'current_score': row[3], + 'streak_count': row[4], + 'last_observed_at': row[5], + } + return result + except Exception as e: + logger.warning('failed to load all scores: %s', e) + return {} + finally: + cur.close() + +
+def _batch_insert_raw(conn, batch: list[tuple]):
+    """Bulk-INSERT raw metric rows, best effort: an INSERT failure is logged, not raised (no-op for an empty batch)."""
+    if not batch:
+        return
+    cur = conn.cursor()
+    try:
+        from psycopg2.extras import execute_values
+        execute_values(
+            cur,
+            """INSERT INTO kcg.gear_correlation_raw_metrics
+            (observed_at, group_key, target_mmsi, target_type, target_name,
+            proximity_ratio, visit_score, activity_sync,
+            dtw_similarity, speed_correlation, heading_coherence,
+            drift_similarity, shadow_stay, shadow_return,
+            gear_group_active_ratio)
+            VALUES %s""",
+            batch, page_size=500,  # every tuple follows the column order listed above
+        )
+    except Exception as e:
+        logger.warning('batch insert raw failed: %s', e)
+        conn.rollback()  # the failed statement aborted the transaction (nothing in it could commit anyway); reset it so later statements on conn run
+    finally:
+        cur.close()
+ + +def _batch_upsert_scores(conn, batch: list[tuple]): + """점수 배치 UPSERT.""" + if not batch: + return + cur = conn.cursor() + try: + from psycopg2.extras import execute_values + execute_values( + cur, + """INSERT INTO kcg.gear_correlation_scores + (model_id, group_key, target_mmsi, target_type, target_name, + current_score, streak_count, freeze_state, + first_observed_at, last_observed_at, updated_at) + VALUES %s + ON CONFLICT (model_id, group_key, target_mmsi) + DO UPDATE SET + current_score = EXCLUDED.current_score, + streak_count = EXCLUDED.streak_count, + freeze_state = EXCLUDED.freeze_state, + observation_count = kcg.gear_correlation_scores.observation_count + 1, + last_observed_at = EXCLUDED.last_observed_at, + updated_at = EXCLUDED.updated_at""", + batch, + page_size=500, + ) + except 
Exception as e: + logger.warning('batch upsert scores failed: %s', e) + finally: + cur.close() diff --git a/prediction/algorithms/track_similarity.py b/prediction/algorithms/track_similarity.py index 0212f98..6a4b24a 100644 --- a/prediction/algorithms/track_similarity.py +++ b/prediction/algorithms/track_similarity.py @@ -158,3 +158,87 @@ def match_gear_by_track( }) return results + + +def compute_sog_correlation( + sog_a: list[float], + sog_b: list[float], +) -> float: + """두 SOG 시계열의 피어슨 상관계수 (0~1 정규화). + + 시계열 길이가 다르면 짧은 쪽 기준으로 자름. + 데이터 부족(< 3점)이면 0.0 반환. + """ + n = min(len(sog_a), len(sog_b)) + if n < 3: + return 0.0 + + a = sog_a[:n] + b = sog_b[:n] + + mean_a = sum(a) / n + mean_b = sum(b) / n + + cov = sum((a[i] - mean_a) * (b[i] - mean_b) for i in range(n)) + var_a = sum((x - mean_a) ** 2 for x in a) + var_b = sum((x - mean_b) ** 2 for x in b) + + denom = (var_a * var_b) ** 0.5 + if denom < 1e-12: + return 0.0 + + corr = cov / denom # -1 ~ 1 + return max(0.0, (corr + 1.0) / 2.0) # 0 ~ 1 정규화 + + +def compute_heading_coherence( + cog_a: list[float], + cog_b: list[float], + threshold_deg: float = 30.0, +) -> float: + """두 COG 시계열의 방향 동조율 (0~1). + + angular diff < threshold_deg 인 비율. + 시계열 길이가 다르면 짧은 쪽 기준. + 데이터 부족(< 3점)이면 0.0 반환. + """ + n = min(len(cog_a), len(cog_b)) + if n < 3: + return 0.0 + + coherent = 0 + for i in range(n): + diff = abs(cog_a[i] - cog_b[i]) + if diff > 180.0: + diff = 360.0 - diff + if diff < threshold_deg: + coherent += 1 + + return coherent / n + + +def compute_proximity_ratio( + track_a: list[tuple[float, float]], + track_b: list[tuple[float, float]], + threshold_nm: float = 10.0, +) -> float: + """두 궤적의 근접 지속비 (0~1). + + 시간 정렬된 포인트 쌍에서 haversine < threshold_nm 비율. + 시계열 길이가 다르면 짧은 쪽 기준. + 데이터 부족(< 2점)이면 0.0 반환. 
+ """ + n = min(len(track_a), len(track_b)) + if n < 2: + return 0.0 + + close = 0 + threshold_m = threshold_nm * 1852.0 + + for i in range(n): + dist = haversine_m(track_a[i][0], track_a[i][1], + track_b[i][0], track_b[i][1]) + if dist < threshold_m: + close += 1 + + return close / n diff --git a/prediction/chat/tools.py b/prediction/chat/tools.py index 766c260..f863ed4 100644 --- a/prediction/chat/tools.py +++ b/prediction/chat/tools.py @@ -197,6 +197,9 @@ def execute_tool_call(call: dict) -> str: if tool == 'get_knowledge': return _get_knowledge(params) + if tool == 'query_gear_correlation': + return _query_gear_correlation(params) + return f'(알 수 없는 도구: {tool})' @@ -357,3 +360,54 @@ def _query_vessel_static(params: dict) -> str: except Exception as e: logger.error('vessel static query failed: %s', e) return f'\n(정적정보 조회 실패: {e})\n' + +
+def _query_gear_correlation(params: dict) -> str:
+    """Summarize, as markdown text, the targets correlated with one gear group (default model, score >= 0.3)."""
+    from db import kcgdb
+    group_key = params.get('group_key', '')  # gear group to look up (gear_correlation_scores.group_key)
+    limit = int(params.get('limit', 10))  # maximum number of targets returned
+
+    with kcgdb.get_conn() as conn:
+        cur = conn.cursor()
+        try:
+            cur.execute(  # the per-metric columns exist only on raw_metrics, so read them from the latest raw row
+                'SELECT s.target_name, s.target_mmsi, s.target_type, s.current_score, s.streak_count, '
+                's.observation_count, r.proximity_ratio, r.visit_score, r.heading_coherence, s.freeze_state '
+                'FROM kcg.gear_correlation_scores s JOIN kcg.correlation_param_models m ON s.model_id = m.id '
+                'LEFT JOIN LATERAL (SELECT rm.proximity_ratio, rm.visit_score, rm.heading_coherence '
+                'FROM kcg.gear_correlation_raw_metrics rm WHERE rm.group_key = s.group_key AND rm.target_mmsi = s.target_mmsi '
+                'ORDER BY rm.observed_at DESC LIMIT 1) r ON TRUE '
+                'WHERE s.group_key = %s AND m.is_default = TRUE AND s.current_score >= 0.3 ORDER BY s.current_score DESC LIMIT %s',
+                (group_key, limit),
+            )
+            rows = cur.fetchall()
+        except Exception as e:
+            logger.warning('gear correlation query failed: %s', e)  # keep the real cause visible; the reply below stays generic
+            return f'어구 그룹 "{group_key}"에 대한 연관성 데이터가 없습니다 (테이블 미생성).'
+        finally:
+            cur.close()
+
+    if not rows:
+        return f'어구 그룹 "{group_key}"에 대한 연관성 데이터가 없습니다.'
+ + lines = [f'## {group_key} 연관 분석 (상위 {len(rows)}개, default 모델)'] + for r in rows: + name, mmsi, ttype, score, streak, obs, prox, visit, heading, state = r + pct = score * 100 + disp_name = name or mmsi + detail_parts = [] + if prox is not None: + detail_parts.append(f'근접 {prox*100:.0f}%') + if visit is not None: + detail_parts.append(f'방문 {visit*100:.0f}%') + if heading is not None: + detail_parts.append(f'COG동조 {heading*100:.0f}%') + detail = ', '.join(detail_parts) if detail_parts else '' + + lines.append( + f'- **{disp_name}** ({mmsi}, {ttype}): ' + f'일치율 {pct:.1f}% (연속 {streak}회, 관측 {obs}회) ' + f'[{detail}] 상태: {state}' + ) + return '\n'.join(lines) diff --git a/prediction/db/partition_manager.py b/prediction/db/partition_manager.py new file mode 100644 index 0000000..eb5dec8 --- /dev/null +++ b/prediction/db/partition_manager.py @@ -0,0 +1,136 @@ +"""gear_correlation_raw_metrics 파티션 유지보수. + +APScheduler 일별 작업으로 실행: +- system_config에서 설정 읽기 (hot-reload, 프로세스 재시작 불필요) +- 미래 파티션 미리 생성 +- 만료 파티션 DROP +- 미관측 점수 레코드 정리 +""" + +import logging +from datetime import date, datetime, timedelta + +logger = logging.getLogger(__name__) + + +def _get_config_int(conn, key: str, default: int) -> int: + """system_config에서 설정값 조회. 없으면 default.""" + cur = conn.cursor() + try: + cur.execute( + "SELECT value::text FROM kcg.system_config WHERE key = %s", + (key,), + ) + row = cur.fetchone() + return int(row[0].strip('"')) if row else default + except Exception: + return default + finally: + cur.close() + + +def _create_future_partitions(conn, days_ahead: int) -> int: + """미래 N일 파티션 생성. 
반환: 생성된 파티션 수.""" + cur = conn.cursor() + created = 0 + try: + for i in range(days_ahead + 1): + d = date.today() + timedelta(days=i) + partition_name = f'gear_correlation_raw_metrics_{d.strftime("%Y%m%d")}' + cur.execute( + "SELECT 1 FROM pg_class c " + "JOIN pg_namespace n ON n.oid = c.relnamespace " + "WHERE c.relname = %s AND n.nspname = 'kcg'", + (partition_name,), + ) + if cur.fetchone() is None: + next_d = d + timedelta(days=1) + cur.execute( + f"CREATE TABLE IF NOT EXISTS kcg.{partition_name} " + f"PARTITION OF kcg.gear_correlation_raw_metrics " + f"FOR VALUES FROM ('{d.isoformat()}') TO ('{next_d.isoformat()}')" + ) + created += 1 + logger.info('created partition: kcg.%s', partition_name) + conn.commit() + except Exception as e: + conn.rollback() + logger.error('failed to create partitions: %s', e) + finally: + cur.close() + return created + + +def _drop_expired_partitions(conn, retention_days: int) -> int: + """retention_days 초과 파티션 DROP. 반환: 삭제된 파티션 수.""" + cutoff = date.today() - timedelta(days=retention_days) + cur = conn.cursor() + dropped = 0 + try: + cur.execute( + "SELECT c.relname FROM pg_class c " + "JOIN pg_namespace n ON n.oid = c.relnamespace " + "WHERE c.relname LIKE 'gear_correlation_raw_metrics_%%' " + "AND n.nspname = 'kcg' AND c.relkind = 'r'" + ) + for (name,) in cur.fetchall(): + date_str = name.rsplit('_', 1)[-1] + try: + partition_date = datetime.strptime(date_str, '%Y%m%d').date() + except ValueError: + continue + if partition_date < cutoff: + cur.execute(f'DROP TABLE IF EXISTS kcg.{name}') + dropped += 1 + logger.info('dropped expired partition: kcg.%s', name) + conn.commit() + except Exception as e: + conn.rollback() + logger.error('failed to drop partitions: %s', e) + finally: + cur.close() + return dropped + + +def _cleanup_stale_scores(conn, cleanup_days: int) -> int: + """cleanup_days 이상 미관측 점수 레코드 삭제.""" + cur = conn.cursor() + try: + cur.execute( + "DELETE FROM kcg.gear_correlation_scores " + "WHERE last_observed_at < NOW() - 
make_interval(days => %s)", + (cleanup_days,), + ) + deleted = cur.rowcount + conn.commit() + return deleted + except Exception as e: + conn.rollback() + logger.error('failed to cleanup stale scores: %s', e) + return 0 + finally: + cur.close() + + +def maintain_partitions(): + """일별 파티션 유지보수 — 스케줄러에서 호출. + + system_config에서 설정을 매번 읽으므로 + API를 통한 설정 변경이 다음 실행 시 즉시 반영됨. + """ + from db import kcgdb + + with kcgdb.get_conn() as conn: + retention = _get_config_int(conn, 'partition.raw_metrics.retention_days', 7) + ahead = _get_config_int(conn, 'partition.raw_metrics.create_ahead_days', 3) + cleanup_days = _get_config_int(conn, 'partition.scores.cleanup_days', 30) + + created = _create_future_partitions(conn, ahead) + dropped = _drop_expired_partitions(conn, retention) + cleaned = _cleanup_stale_scores(conn, cleanup_days) + + logger.info( + 'partition maintenance: %d created, %d dropped, %d stale scores cleaned ' + '(retention=%dd, ahead=%dd, cleanup=%dd)', + created, dropped, cleaned, retention, ahead, cleanup_days, + ) diff --git a/prediction/fleet_tracker.py b/prediction/fleet_tracker.py index 26788f3..f1c83d7 100644 --- a/prediction/fleet_tracker.py +++ b/prediction/fleet_tracker.py @@ -210,6 +210,22 @@ class FleetTracker: ) logger.info('gear MMSI change: %s → %s (name=%s)', old_mmsi_row[1], mmsi, name) + # 어피니티 점수 이전 (이전 MMSI → 새 MMSI) + try: + cur.execute( + "UPDATE kcg.gear_correlation_scores " + "SET target_mmsi = %s, updated_at = NOW() " + "WHERE target_mmsi = %s", + (mmsi, old_mmsi_row[1]), + ) + if cur.rowcount > 0: + logger.info( + 'transferred %d affinity scores: %s → %s', + cur.rowcount, old_mmsi_row[1], mmsi, + ) + except Exception as e: + logger.warning('affinity score transfer failed: %s', e) + cur.execute( """INSERT INTO kcg.gear_identity_log (mmsi, name, parent_name, parent_mmsi, parent_vessel_id, diff --git a/prediction/scheduler.py b/prediction/scheduler.py index d463098..8cecb17 100644 --- a/prediction/scheduler.py +++ b/prediction/scheduler.py @@ 
-99,6 +99,8 @@ def run_analysis_cycle(): fleet_tracker.save_snapshot(vessel_dfs, kcg_conn) + gear_groups = [] + # 4.5 그룹 폴리곤 생성 + 저장 try: from algorithms.polygon_builder import detect_gear_groups, build_all_group_snapshots @@ -116,6 +118,23 @@ def run_analysis_cycle(): except Exception as e: logger.warning('group polygon generation failed: %s', e) + # 4.7 어구 연관성 분석 (멀티모델 패턴 추적) + try: + from algorithms.gear_correlation import run_gear_correlation + + corr_result = run_gear_correlation( + vessel_store=vessel_store, + gear_groups=gear_groups, + conn=kcg_conn, + ) + logger.info( + 'gear correlation: %d scores updated, %d raw metrics, %d models', + corr_result['updated'], corr_result['raw_inserted'], + corr_result['models'], + ) + except Exception as e: + logger.warning('gear correlation failed: %s', e) + # 5. 선박별 추가 알고리즘 → AnalysisResult 생성 results = [] for c in classifications: @@ -329,6 +348,15 @@ def start_scheduler(): max_instances=1, replace_existing=True, ) + # 파티션 유지보수 (매일 04:00) + from db.partition_manager import maintain_partitions + _scheduler.add_job( + maintain_partitions, + 'cron', hour=4, minute=0, + id='partition_maintenance', + max_instances=1, + replace_existing=True, + ) _scheduler.start() logger.info('scheduler started (interval=%dm)', settings.SCHEDULER_INTERVAL_MIN)