diff --git a/prediction/db/partition_manager.py b/prediction/db/partition_manager.py index 9941229..636b434 100644 --- a/prediction/db/partition_manager.py +++ b/prediction/db/partition_manager.py @@ -17,6 +17,7 @@ logger = logging.getLogger(__name__) SYSTEM_CONFIG = qualified_table('system_config') GEAR_CORRELATION_RAW_METRICS = qualified_table('gear_correlation_raw_metrics') GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores') +DETECTION_MODEL_RUN_OUTPUTS = qualified_table('detection_model_run_outputs') def _get_config_int(conn, key: str, default: int) -> int: @@ -99,6 +100,100 @@ def _drop_expired_partitions(conn, retention_days: int) -> int: return dropped +def _create_future_monthly_detection_partitions(conn, months_ahead: int) -> int: + """detection_model_run_outputs 미래 N개월 파티션 생성. + + 월별 RANGE 파티션 (cycle_started_at) — V034 에서 2026-04/05 가 Flyway 로 선생성. + 이후는 이 함수가 매일 돌면서 `months_ahead` 만큼 미리 생성. + + Returns: + 생성된 파티션 수 + """ + cur = conn.cursor() + created = 0 + try: + anchor = date.today().replace(day=1) + for i in range(months_ahead + 1): + # anchor 기준 +i 개월 + y = anchor.year + (anchor.month - 1 + i) // 12 + m = (anchor.month - 1 + i) % 12 + 1 + start = date(y, m, 1) + ny = y + (1 if m == 12 else 0) + nm = 1 if m == 12 else m + 1 + end = date(ny, nm, 1) + partition_name = f'detection_model_run_outputs_{y:04d}_{m:02d}' + cur.execute( + "SELECT 1 FROM pg_class c " + "JOIN pg_namespace n ON n.oid = c.relnamespace " + "WHERE c.relname = %s AND n.nspname = %s", + (partition_name, settings.KCGDB_SCHEMA), + ) + if cur.fetchone() is None: + cur.execute( + f"CREATE TABLE IF NOT EXISTS {qualified_table(partition_name)} " + f"PARTITION OF {DETECTION_MODEL_RUN_OUTPUTS} " + f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}')" + ) + created += 1 + logger.info( + 'created partition: %s.%s', settings.KCGDB_SCHEMA, partition_name, + ) + conn.commit() + except Exception as e: + conn.rollback() + logger.error('failed to create detection_model_run_outputs partitions: %s', e) + finally: + cur.close() + return created + + +def _drop_expired_monthly_detection_partitions(conn, retention_months: int) -> int: + """detection_model_run_outputs retention_months 초과 월 파티션 DROP. + + SHADOW 원시 결과는 비교 분석 후 가치 낮음 — 기본 retention 은 1개월. + 집계는 detection_model_metrics 에 보존되므로 원시 폐기해도 추적 가능. + """ + cutoff_anchor = date.today().replace(day=1) + # retention_months 만큼 과거로 이동 + y = cutoff_anchor.year + m = cutoff_anchor.month - retention_months + while m <= 0: + m += 12 + y -= 1 + cutoff = date(y, m, 1) + + cur = conn.cursor() + dropped = 0 + try: + cur.execute( + "SELECT c.relname FROM pg_class c " + "JOIN pg_namespace n ON n.oid = c.relnamespace " + "WHERE c.relname LIKE 'detection_model_run_outputs_%%' " + "AND n.nspname = %s AND c.relkind = 'r'", + (settings.KCGDB_SCHEMA,), + ) + for (name,) in cur.fetchall(): + tail = name[len('detection_model_run_outputs_'):] + try: + yy, mm = tail.split('_') + partition_start = date(int(yy), int(mm), 1) + except (ValueError, IndexError): + continue + if partition_start < cutoff: + cur.execute(f'DROP TABLE IF EXISTS {qualified_table(name)}') + dropped += 1 + logger.info( + 'dropped expired partition: %s.%s', settings.KCGDB_SCHEMA, name, + ) + conn.commit() + except Exception as e: + conn.rollback() + logger.error('failed to drop detection_model_run_outputs partitions: %s', e) + finally: + cur.close() + return dropped + + def _cleanup_stale_scores(conn, cleanup_days: int) -> int: """cleanup_days 이상 미관측 점수 레코드 삭제.""" cur = conn.cursor() @@ -131,13 +226,25 @@ def maintain_partitions(): retention = _get_config_int(conn, 'partition.raw_metrics.retention_days', 7) ahead = _get_config_int(conn, 'partition.raw_metrics.create_ahead_days', 3) cleanup_days = _get_config_int(conn, 'partition.scores.cleanup_days', 30) + det_months_ahead = _get_config_int( + conn, 'partition.detection_model_run_outputs.create_ahead_months', 2, + ) + det_retention_months = _get_config_int( + conn, 'partition.detection_model_run_outputs.retention_months', 1, + ) created = _create_future_partitions(conn, ahead) dropped = _drop_expired_partitions(conn, retention) cleaned = _cleanup_stale_scores(conn, cleanup_days) + det_created = _create_future_monthly_detection_partitions(conn, det_months_ahead) + det_dropped = _drop_expired_monthly_detection_partitions(conn, det_retention_months) + logger.info( 'partition maintenance: %d created, %d dropped, %d stale scores cleaned ' - '(retention=%dd, ahead=%dd, cleanup=%dd)', + '(retention=%dd, ahead=%dd, cleanup=%dd); ' + 'detection_model_run_outputs: %d created, %d dropped ' + '(retention_months=%d, ahead_months=%d)', created, dropped, cleaned, retention, ahead, cleanup_days, + det_created, det_dropped, det_retention_months, det_months_ahead, ) diff --git a/prediction/models_core/__init__.py b/prediction/models_core/__init__.py new file mode 100644 index 0000000..acd9b67 --- /dev/null +++ b/prediction/models_core/__init__.py @@ -0,0 +1,26 @@ +"""Detection Model Registry (Phase 1-2). + +V034 detection_models / detection_model_versions 스키마 위에서 +`ACTIVE` 상태 버전들을 인스턴스화하여 사이클 내에서 실행·비교하는 프레임. + +공개 모듈: +- base : BaseDetectionModel, ModelContext, ModelResult +- params_loader: detection_model_versions.params JSONB 로드 + TTL 캐시 +- registry : ACTIVE 버전 전체 로드 + DAG 검증 +- executor : topo 순서 PRIMARY 실행 → ctx.shared 주입 → SHADOW/CHALLENGER 실행 +- feature_flag : 신·구 경로 토글 + +핵심 불변식 (오염 차단): +- SHADOW/CHALLENGER 의 결과는 `ctx.shared[model_id]` 에 기록되지 않는다. +- 후행 PRIMARY 모델은 선행 PRIMARY 결과만 입력으로 받는다. +""" + +from .base import BaseDetectionModel, ModelContext, ModelResult +from .feature_flag import use_model_registry + +__all__ = [ + 'BaseDetectionModel', + 'ModelContext', + 'ModelResult', + 'use_model_registry', +] diff --git a/prediction/models_core/base.py b/prediction/models_core/base.py new file mode 100644 index 0000000..6e9db7b --- /dev/null +++ b/prediction/models_core/base.py @@ -0,0 +1,150 @@ +"""Detection Model 추상 계층. + +prediction 모듈의 기존 함수형 알고리즘(`algorithms/*`) 을 그대로 두고, +Adapter 형태로 감싸서 "모델 단위 실행·버전·파라미터"를 표준화한다. + +설계: +- `ModelContext` — 한 사이클의 공통 입력/공유 상태 (불변 전제) +- `ModelResult` — 한 모델·한 버전의 실행 결과 (입력별 output + 메트릭) +- `BaseDetectionModel` — 등록 가능한 최소 계약 (model_id / version / role / params / run) + +불변식: +- SHADOW/CHALLENGER 는 `ctx.shared[model_id]` 에 기록되지 않음 (Executor 책임) +- `params` 는 DRAFT 로 수정, ACTIVE 는 immutable 스냅샷 (DB 제약과 같은 규약) +""" +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Optional + +logger = logging.getLogger(__name__) + + +# role 상수 — DB CHECK 제약과 동일한 문자열 +ROLE_PRIMARY = 'PRIMARY' +ROLE_SHADOW = 'SHADOW' +ROLE_CHALLENGER = 'CHALLENGER' +ALLOWED_ROLES = (ROLE_PRIMARY, ROLE_SHADOW, ROLE_CHALLENGER) + + +@dataclass +class ModelContext: + """한 사이클 공통 입력 + 모델 간 공유 상태. + + Attributes: + cycle_started_at: 사이클 시작 시각 (모든 모델·버전이 공유) + vessel_store: 인메모리 AIS 캐시 (Optional — 테스트 시 None 허용) + conn: kcgdb psycopg2 connection (Optional — 테스트 시 None 허용) + shared: 선행 모델 PRIMARY 결과 보관소. key=model_id, value=ModelResult + SHADOW/CHALLENGER 는 여기에 쓰지 않는다 (오염 차단). + inputs: 모델이 소비할 공통 입력 목록 (선박 row 등). 버전 간 공정 비교 보장. + extras: 필요시 모델별 보조 데이터 (feature flag, tunable 등) + """ + cycle_started_at: datetime + vessel_store: Any = None + conn: Any = None + shared: dict = field(default_factory=dict) + inputs: list = field(default_factory=list) + extras: dict = field(default_factory=dict) + + +@dataclass +class ModelResult: + """한 모델·한 버전의 실행 결과. + + Attributes: + model_id: 모델 식별자 + version_id: detection_model_versions.id + version_str: 'v1.0.0' 등 사람이 읽는 버전 문자열 + role: PRIMARY / SHADOW / CHALLENGER + outputs_per_input: [(input_ref, output_dict), ...] + input_ref 는 비교용 키(예: {'mmsi': '412...', 'analyzed_at': ...}) + output_dict 는 JSONB 저장 가능한 결과 snapshot + metrics: detection_model_metrics 로 기록될 집계 관측치 + (key=metric_key, value=numeric) + duration_ms: 이 버전 단위 실행 소요 + """ + model_id: str + version_id: int + version_str: str + role: str + outputs_per_input: list[tuple[dict, dict]] = field(default_factory=list) + metrics: dict[str, float] = field(default_factory=dict) + duration_ms: int = 0 + + +class BaseDetectionModel(ABC): + """탐지 모델 추상 베이스. + + 구현체는 `prediction/models_core/registered/` 하위에 두고 + `ModelRegistry.discover_classes()` 가 자동 import 한다. + + 클래스 레벨 속성(model_id / depends_on) 은 **클래스 정의 시** 고정, + 인스턴스 속성(version_id / version_str / role / params) 은 + `ModelRegistry` 가 ACTIVE 버전 스냅샷을 읽어 주입한다. + + 한 `BaseDetectionModel` 서브클래스에 대해 DB 에 N 개 ACTIVE 버전이 있으면 + Registry 는 **각 버전마다 별도 인스턴스**를 생성한다 (PRIMARY 1 + SHADOW/CHALLENGER N). + """ + + # --- 클래스 메타 (서브클래스가 override) --- + model_id: str = '' + depends_on: list[str] = [] + + # V034 스키마 컬럼 길이 상한 — 운영자 실수·장기 실행에서 silent 한 persist 실패를 + # 방지하기 위해 클래스 정의 시점에 선제 검증한다. + _MODEL_ID_MAXLEN = 64 + + def __init__( + self, + version_id: int, + version_str: str, + role: str, + params: dict, + ) -> None: + if role not in ALLOWED_ROLES: + raise ValueError(f'invalid role: {role!r} (expected {ALLOWED_ROLES})') + if not self.model_id: + raise ValueError( + f'{type(self).__name__}.model_id is empty — override as class attribute' + ) + if len(self.model_id) > self._MODEL_ID_MAXLEN: + raise ValueError( + f'{type(self).__name__}.model_id too long ' + f'({len(self.model_id)} > {self._MODEL_ID_MAXLEN}): {self.model_id!r}' + ) + self.version_id = version_id + self.version_str = version_str + self.role = role + self.params: dict = dict(params) if params else {} + + # --- 서브클래스 구현 포인트 --- + @abstractmethod + def run(self, ctx: ModelContext) -> ModelResult: + """한 사이클에 대해 모델을 실행. + + 반환값의 `outputs_per_input` 은 입력 단위 비교가 가능하도록 + **같은 input_ref 스키마를 같은 model_id 내에서 유지**해야 한다. + (PRIMARY 와 SHADOW 의 input_ref 가 일치해야 diff JOIN 이 가능.) + """ + raise NotImplementedError + + # --- 편의 --- + def label(self) -> str: + return f'{self.model_id}@{self.role}[{self.version_str}]' + + def __repr__(self) -> str: # pragma: no cover + return f'<{type(self).__name__} {self.label()} version_id={self.version_id}>' + + +def make_input_ref(mmsi: str, analyzed_at: Optional[datetime] = None, **extra) -> dict: + """관용 input_ref 생성기. PRIMARY/SHADOW 가 같은 포맷을 쓰도록 강제하는 도우미.""" + ref: dict[str, Any] = {'mmsi': str(mmsi)} + if analyzed_at is not None: + ref['analyzed_at'] = analyzed_at.isoformat() if isinstance(analyzed_at, datetime) else analyzed_at + for k, v in extra.items(): + ref[k] = v + return ref diff --git a/prediction/models_core/executor.py b/prediction/models_core/executor.py new file mode 100644 index 0000000..bffa280 --- /dev/null +++ b/prediction/models_core/executor.py @@ -0,0 +1,287 @@ +"""DAGExecutor — ExecutionPlan 을 실제로 돌리고 DB 에 결과/메트릭을 기록한다. + +불변식 (테스트로도 검증): +1. PRIMARY 실행 결과만 `ctx.shared[model_id]` 에 주입 (후행 모델의 입력 소스). +2. SHADOW/CHALLENGER 결과는 `detection_model_run_outputs` 에 저장만, shared 에 **절대 주입 금지**. +3. PRIMARY 가 실패하면 후행 모델 실행 skip (upstream 결과 없음). + SHADOW/CHALLENGER 실패는 그 버전만 skip, 다른 버전·후행 모델에 영향 없음. + +DB persist: +- detection_model_run_outputs (PARTITION BY cycle_started_at): execute_values 배치 INSERT +- detection_model_metrics: 집계 메트릭 + +참고: docs/prediction-analysis.md §7, plans/vast-tinkering-knuth.md Phase 1-2 +""" +from __future__ import annotations + +import logging +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Optional + +from pipeline.stage_runner import run_stage + +from .base import BaseDetectionModel, ModelContext, ModelResult, ROLE_PRIMARY +from .feature_flag import concurrent_shadows +from .registry import ExecutionPlan + +logger = logging.getLogger(__name__) + +# V034 스키마 VARCHAR(64) — 초과하면 persist 가 silent 하게 실패하므로 선제 절단·경고 +_METRIC_KEY_MAXLEN = 64 + + +class DAGExecutor: + """ExecutionPlan 을 실행하고 DB persist 를 담당. + + persist 는 ctx.conn 을 재사용한다 (pool 중복 획득 방지). + ctx.conn 이 None 이면 기본 persist 함수들이 자체적으로 get_conn() 호출. + """ + + def __init__( + self, + plan: ExecutionPlan, + *, + persist_fn=None, + persist_metrics_fn=None, + ) -> None: + self.plan = plan + # 테스트에서 DB 없이 돌리기 위해 persist 훅을 주입 가능하게 만든다. + self._persist_fn = persist_fn or _persist_run_outputs + self._persist_metrics_fn = persist_metrics_fn or _persist_metrics + self._ctx_conn = None # run() 진입 시 셋업 + + def run(self, ctx: ModelContext) -> dict: + """전체 Plan 실행. + + Returns: + {'executed': int, 'failed': int, 'shadow_ran': int, 'shadow_failed': int} + """ + # ctx.conn 이 있으면 persist 도 이 conn 을 재사용하도록 보관한다. + # (maxconn=5 pool 고갈 방지 — persist 마다 별도 get_conn() 획득 금지) + self._ctx_conn = getattr(ctx, 'conn', None) + + summary = { + 'executed': 0, + 'failed': 0, + 'skipped_missing_deps': 0, + 'shadow_ran': 0, + 'shadow_failed': 0, + } + + for model_id in self.plan.topo_order: + primary = self.plan.primaries.get(model_id) + shadows = list(self.plan.shadows.get(model_id, [])) + + if primary is None: + # PRIMARY 없이 SHADOW 만 있는 모델은 실행 불가 (비교 기준이 없음) + if shadows: + logger.warning( + 'model %s has %d SHADOW/CHALLENGER but no PRIMARY — skipping', + model_id, len(shadows), + ) + continue + + # upstream PRIMARY 결과가 모두 있는지 확인 + missing = [ + dep for dep in self.plan.edges.get(model_id, ()) + if dep not in ctx.shared + ] + if missing: + summary['skipped_missing_deps'] += 1 + logger.warning( + 'skip %s — upstream PRIMARY missing: %s', + primary.label(), missing, + ) + # SHADOW 도 같은 이유로 스킵 (정당한 비교 불가) + continue + + primary_result = self._run_single(primary, ctx) + if primary_result is None: + summary['failed'] += 1 + # SHADOW 는 같은 입력이 있어야 비교 의미가 있으므로 이 사이클에선 스킵 + continue + + summary['executed'] += 1 + ctx.shared[model_id] = primary_result + self._persist(primary_result, ctx.cycle_started_at) + + # SHADOW/CHALLENGER 는 shared 주입 **금지** — 결과 persist 만 + if shadows: + ran, failed = self._run_shadows(shadows, ctx) + summary['shadow_ran'] += ran + summary['shadow_failed'] += failed + + logger.info( + 'DAGExecutor done: executed=%d failed=%d skip_deps=%d shadow_ran=%d shadow_failed=%d', + summary['executed'], summary['failed'], + summary['skipped_missing_deps'], + summary['shadow_ran'], summary['shadow_failed'], + ) + return summary + + # ------------------------------------------------------------------ + def _run_single(self, model: BaseDetectionModel, ctx: ModelContext) -> Optional[ModelResult]: + """run_stage 로 감싸서 실패 격리 + 지속시간 계측.""" + t0 = time.time() + result = run_stage(model.label(), model.run, ctx, required=False) + if result is None: + return None + # duration_ms 가 비어있으면 여기서 채움 + if not result.duration_ms: + result.duration_ms = int((time.time() - t0) * 1000) + return result + + def _run_shadows( + self, + shadows: list[BaseDetectionModel], + ctx: ModelContext, + ) -> tuple[int, int]: + ran = 0 + failed = 0 + if concurrent_shadows() and len(shadows) > 1: + with ThreadPoolExecutor(max_workers=min(4, len(shadows))) as pool: + futures = {pool.submit(self._run_single, s, ctx): s for s in shadows} + for fut in as_completed(futures): + s = futures[fut] + try: + r = fut.result() + except Exception: + logger.exception('shadow %s raised', s.label()) + r = None + if r is None: + failed += 1 + continue + ran += 1 + self._persist(r, ctx.cycle_started_at) + else: + for s in shadows: + r = self._run_single(s, ctx) + if r is None: + failed += 1 + continue + ran += 1 + self._persist(r, ctx.cycle_started_at) + return ran, failed + + def _persist(self, result: ModelResult, cycle_started_at) -> None: + conn = self._ctx_conn + try: + self._persist_fn(result, cycle_started_at, conn=conn) + except Exception: + logger.exception( + 'failed to persist run_outputs for %s', result.model_id, + ) + try: + self._persist_metrics_fn(result, cycle_started_at, conn=conn) + except Exception: + logger.exception( + 'failed to persist metrics for %s', result.model_id, + ) + + +# ---------------------------------------------------------------------- +# 기본 persist 구현 — kcgdb 연결을 얻어서 직접 INSERT +# ---------------------------------------------------------------------- +_INSERT_RUN_OUTPUTS = """ + INSERT INTO kcg.detection_model_run_outputs ( + cycle_started_at, model_id, version_id, role, + input_ref, outputs, cycle_duration_ms + ) VALUES %s +""" + +_INSERT_METRICS = """ + INSERT INTO kcg.detection_model_metrics ( + model_id, version_id, role, metric_key, metric_value, cycle_started_at + ) VALUES %s +""" + + +def _persist_run_outputs(result: ModelResult, cycle_started_at, *, conn=None) -> None: + """detection_model_run_outputs 배치 INSERT. + + conn 이 전달되면 **재사용** (pool 중복 획득 방지, 커밋 책임은 호출자). + None 이면 자체적으로 kcgdb.get_conn() 으로 커넥션을 얻고 직접 커밋. + """ + if not result.outputs_per_input: + return + from psycopg2.extras import Json, execute_values + + rows = [ + ( + cycle_started_at, + result.model_id, + result.version_id, + result.role, + Json(input_ref or {}), + Json(output or {}), + result.duration_ms, + ) + for input_ref, output in result.outputs_per_input + ] + _execute_insert(_INSERT_RUN_OUTPUTS, rows, conn=conn) + + +def _execute_insert(sql: str, rows: list, *, conn=None) -> None: + """execute_values 공통 — conn 재사용 시 commit 은 호출자 책임.""" + if not rows: + return + from psycopg2.extras import execute_values + + if conn is not None: + cur = conn.cursor() + try: + execute_values(cur, sql, rows, page_size=200) + conn.commit() + except Exception: + conn.rollback() + raise + finally: + cur.close() + return + + from db import kcgdb + with kcgdb.get_conn() as fresh_conn: + cur = fresh_conn.cursor() + try: + execute_values(cur, sql, rows, page_size=200) + fresh_conn.commit() + except Exception: + fresh_conn.rollback() + raise + finally: + cur.close() + + +def _persist_metrics(result: ModelResult, cycle_started_at, *, conn=None) -> None: + """detection_model_metrics 배치 INSERT. cycle_duration_ms 기본 포함. + + conn 이 전달되면 재사용, None 이면 자체 get_conn(). + metric_key VARCHAR(64) 초과는 경고 후 드롭 (silent 실패 방지). + """ + metrics = dict(result.metrics or {}) + metrics.setdefault('cycle_duration_ms', float(result.duration_ms)) + metrics.setdefault('output_count', float(len(result.outputs_per_input))) + + rows = [] + for key, val in metrics.items(): + if val is None: + continue + if len(key) > _METRIC_KEY_MAXLEN: + logger.warning( + 'metric_key %r exceeds VARCHAR(%d) — dropping (model=%s version=%s)', + key, _METRIC_KEY_MAXLEN, result.model_id, result.version_id, + ) + continue + rows.append(( + result.model_id, + result.version_id, + result.role, + key, + float(val), + cycle_started_at, + )) + _execute_insert(_INSERT_METRICS, rows, conn=conn) + + +__all__ = ['DAGExecutor'] diff --git a/prediction/models_core/feature_flag.py b/prediction/models_core/feature_flag.py new file mode 100644 index 0000000..33dd18a --- /dev/null +++ b/prediction/models_core/feature_flag.py @@ -0,0 +1,29 @@ +"""Detection Model Registry feature flag. + +신·구 prediction 경로를 공존시키는 동안 환경변수로 토글한다. +초기 배포에서는 **0 (구 경로 유지)** 가 기본 — Phase 2 PoC 이 신·구 diff=0 +동치성을 확인한 뒤 1 로 전환하는 별도 릴리즈를 내는 전략. + +환경변수: + PREDICTION_USE_MODEL_REGISTRY '1' 이면 DAGExecutor 기반 신 경로 사용 + PREDICTION_CONCURRENT_SHADOWS '1' 이면 SHADOW/CHALLENGER 를 스레드풀 동시 실행 + (기본 0 — 순차 실행, psycopg2 pool 안전) +""" +from __future__ import annotations + +import os + + +def _bool_env(key: str, default: str = '0') -> bool: + raw = os.getenv(key, default).strip().lower() + return raw in ('1', 'true', 'yes', 'on') + + +def use_model_registry() -> bool: + """models_core Registry·Executor 기반 경로 사용 여부.""" + return _bool_env('PREDICTION_USE_MODEL_REGISTRY', '0') + + +def concurrent_shadows() -> bool: + """SHADOW/CHALLENGER 를 ThreadPoolExecutor 로 동시 실행할지.""" + return _bool_env('PREDICTION_CONCURRENT_SHADOWS', '0') diff --git a/prediction/models_core/params_loader.py b/prediction/models_core/params_loader.py new file mode 100644 index 0000000..f5b240e --- /dev/null +++ b/prediction/models_core/params_loader.py @@ -0,0 +1,176 @@ +"""`detection_model_versions.params` JSONB 로더 + 5분 TTL 캐시. + +- correlation_param_models 패턴의 일반화 — **매 사이클 재로드**를 기본으로, + 다만 한 사이클 내에서 여러 번 조회되는 경우를 위해 TTL 캐시를 둔다. +- Registry 가 ACTIVE 버전 목록을 조회할 때와 executor 가 개별 버전 params 를 + 쓸 때 공통으로 사용. + +반환 스키마: + VersionRow = { + 'id': int, # detection_model_versions.id + 'model_id': str, + 'version': str, + 'role': str, # PRIMARY / SHADOW / CHALLENGER + 'params': dict, # JSONB + } +""" +from __future__ import annotations + +import json +import logging +import threading +import time +from typing import Optional, TypedDict + +logger = logging.getLogger(__name__) + + +class VersionRow(TypedDict): + id: int + model_id: str + version: str + role: str + params: dict + + +_DEFAULT_TTL_SEC = 300 # 5분 + + +class _ParamsCache: + """간단 TTL 캐시 (프로세스 로컬). + + thread-safe: Registry 재구성은 사이클 시작 스레드에서만 일어나지만 + APScheduler 가 동시 job 을 허용할 수 있어 락으로 보호한다. + """ + + def __init__(self, ttl_sec: int = _DEFAULT_TTL_SEC) -> None: + self._ttl = ttl_sec + self._lock = threading.Lock() + self._rows: Optional[list[VersionRow]] = None + self._loaded_at: float = 0.0 + + def get(self, conn, *, force: bool = False) -> list[VersionRow]: + now = time.time() + with self._lock: + stale = ( + self._rows is None + or force + or (now - self._loaded_at) > self._ttl + ) + if stale: + self._rows = _fetch_active_versions(conn) + self._loaded_at = now + logger.info( + 'params cache reloaded: %d ACTIVE versions (ttl=%ds)', + len(self._rows), self._ttl, + ) + return list(self._rows or []) + + def invalidate(self) -> None: + with self._lock: + self._rows = None + self._loaded_at = 0.0 + + +_cache = _ParamsCache() + + +def load_active_versions(conn, *, force_reload: bool = False) -> list[VersionRow]: + """ACTIVE 상태의 모든 model_id × version 을 한 번에 조회. + + model 단위로 PRIMARY 1 개 + SHADOW/CHALLENGER N 개가 섞여 반환될 수 있다. + Registry 가 그룹화를 담당. + + Args: + conn: psycopg2 connection + force_reload: True 면 TTL 무시하고 DB 재조회 + + Returns: + VersionRow 리스트 + """ + return _cache.get(conn, force=force_reload) + + +def invalidate_cache() -> None: + """운영자 API 가 version 을 promote·archive 한 직후 호출하면 + 다음 조회에서 즉시 DB 재로드가 일어난다. + """ + _cache.invalidate() + + +def _fetch_active_versions(conn) -> list[VersionRow]: + """SQL — kcg.detection_model_versions WHERE status='ACTIVE'. + + JSONB 는 psycopg2 기본 설정에서 이미 dict 로 반환되지만, 안전을 위해 + str 인 경우에도 json.loads 로 파싱한다. + """ + sql = """ + SELECT v.id, + v.model_id, + v.version, + v.role, + v.params + FROM kcg.detection_model_versions v + JOIN kcg.detection_models m ON m.model_id = v.model_id + WHERE v.status = 'ACTIVE' + AND m.is_enabled = TRUE + ORDER BY v.model_id, + CASE v.role + WHEN 'PRIMARY' THEN 0 + WHEN 'CHALLENGER' THEN 1 + WHEN 'SHADOW' THEN 2 + ELSE 3 + END, + v.id + """ + rows: list[VersionRow] = [] + cur = conn.cursor() + try: + cur.execute(sql) + for row in cur.fetchall(): + vid, model_id, version, role, params = row + if isinstance(params, (bytes, bytearray)): + params = params.decode('utf-8') + if isinstance(params, str): + try: + params = json.loads(params) + except json.JSONDecodeError: + logger.warning( + 'detection_model_versions.id=%s params JSON decode failed — treated as {}', + vid, + ) + params = {} + rows.append( + VersionRow( + id=int(vid), + model_id=str(model_id), + version=str(version), + role=str(role), + params=dict(params or {}), + ) + ) + return rows + finally: + cur.close() + + +def load_dependencies(conn) -> list[tuple[str, str, str]]: + """detection_model_dependencies 전체 엣지 반환. + + Returns: + [(model_id, depends_on, input_key), ...] + """ + sql = """ + SELECT model_id, depends_on, input_key + FROM kcg.detection_model_dependencies + ORDER BY model_id, depends_on, input_key + """ + cur = conn.cursor() + try: + cur.execute(sql) + return [ + (str(m), str(d), str(k)) + for m, d, k in cur.fetchall() + ] + finally: + cur.close() diff --git a/prediction/models_core/registered/__init__.py b/prediction/models_core/registered/__init__.py new file mode 100644 index 0000000..3c19f1c --- /dev/null +++ b/prediction/models_core/registered/__init__.py @@ -0,0 +1,5 @@ +"""`BaseDetectionModel` 구현체 등록소. + +Phase 1-2 기반 PR 에서는 실제 구현체가 없다 (Phase 2 에서 5 모델 PoC 추가). +이 디렉토리는 `ModelRegistry.discover_classes()` 가 `importlib` 으로 스캔한다. +""" diff --git a/prediction/models_core/registry.py b/prediction/models_core/registry.py new file mode 100644 index 0000000..c8151b6 --- /dev/null +++ b/prediction/models_core/registry.py @@ -0,0 +1,282 @@ +"""ModelRegistry — ACTIVE 버전 전체 인스턴스화 + DAG 검증 + 실행 플랜 생성. + +역할: +1. `prediction/models_core/registered/` 를 스캔하여 BaseDetectionModel 서브클래스를 모음 +2. DB 에서 ACTIVE 버전 목록(PRIMARY + SHADOW/CHALLENGER) 을 읽어 **버전별 인스턴스**를 생성 +3. detection_model_dependencies + 클래스 `depends_on` 을 합쳐 DAG 를 구성하고 순환 검출 +4. Executor 가 쓸 topological 실행 플랜(ExecutionPlan) 을 반환 + +주의: +- 클래스에 `model_id` 가 정의돼 있어도 DB 에 해당 레코드가 없으면 인스턴스화하지 않음 + (즉 DB 가 Single Source of Truth, 코드는 "구현 있음" 선언 역할) +- DB 에 model_id 가 있고 코드에 클래스가 없으면 경고 로그 후 **스킵** (부분 배포 허용) +""" +from __future__ import annotations + +import importlib +import logging +import pkgutil +from collections import defaultdict, deque +from dataclasses import dataclass, field +from typing import Iterable, Optional, Type + +from .base import ( + ALLOWED_ROLES, + ROLE_CHALLENGER, + ROLE_PRIMARY, + ROLE_SHADOW, + BaseDetectionModel, +) +from .params_loader import VersionRow, load_active_versions, load_dependencies + +logger = logging.getLogger(__name__) + + +@dataclass +class ExecutionPlan: + """Executor 가 따를 실행 순서. + + Attributes: + topo_order: PRIMARY 기준 topological order (model_id 문자열 리스트). + SHADOW/CHALLENGER 는 자기 model_id 의 PRIMARY 와 같은 슬롯에서 돈다. + primaries: model_id -> BaseDetectionModel 인스턴스 (PRIMARY) + shadows: model_id -> list[BaseDetectionModel] (SHADOW + CHALLENGER) + edges: DAG 디버깅용 (model_id -> set(depends_on)) + """ + topo_order: list[str] = field(default_factory=list) + primaries: dict[str, BaseDetectionModel] = field(default_factory=dict) + shadows: dict[str, list[BaseDetectionModel]] = field(default_factory=lambda: defaultdict(list)) + edges: dict[str, set[str]] = field(default_factory=lambda: defaultdict(set)) + + +class DAGCycleError(RuntimeError): + """모델 의존성 그래프에 순환이 있을 때.""" + + +class ModelRegistry: + """ACTIVE 버전 인스턴스 저장소 + Plan 제공자.""" + + _DEFAULT_REGISTERED_PKG = 'models_core.registered' + + def __init__(self, registered_pkg: str = _DEFAULT_REGISTERED_PKG) -> None: + self._registered_pkg = registered_pkg + self._classes: dict[str, Type[BaseDetectionModel]] = {} + + # ------------------------------------------------------------------ + # 클래스 discovery + # ------------------------------------------------------------------ + def discover_classes(self) -> dict[str, Type[BaseDetectionModel]]: + """`registered/` 하위 모듈 auto-import + BaseDetectionModel 서브클래스 수집. + + 동일 model_id 가 여러 클래스에서 중복 선언되면 ValueError. + """ + self._classes = {} + try: + pkg = importlib.import_module(self._registered_pkg) + except ImportError: + logger.warning('registered package %s not importable', self._registered_pkg) + return {} + + for mod_info in pkgutil.iter_modules(pkg.__path__, prefix=f'{self._registered_pkg}.'): + try: + module = importlib.import_module(mod_info.name) + except Exception: + logger.exception('failed to import %s', mod_info.name) + continue + for attr_name in dir(module): + obj = getattr(module, attr_name) + if not isinstance(obj, type): + continue + if obj is BaseDetectionModel: + continue + if not issubclass(obj, BaseDetectionModel): + continue + mid = getattr(obj, 'model_id', '') + if not mid: + continue + if mid in self._classes and self._classes[mid] is not obj: + raise ValueError( + f'duplicate model_id {mid!r}: ' + f'{self._classes[mid].__name__} vs {obj.__name__}' + ) + self._classes[mid] = obj + logger.info('discovered %d detection model classes: %s', + len(self._classes), sorted(self._classes.keys())) + return dict(self._classes) + + def register_class(self, cls: Type[BaseDetectionModel]) -> None: + """테스트·수동 등록용.""" + mid = getattr(cls, 'model_id', '') + if not mid: + raise ValueError(f'{cls.__name__}.model_id is empty') + self._classes[mid] = cls + + # ------------------------------------------------------------------ + # Plan 생성 + # ------------------------------------------------------------------ + def build_plan(self, conn, *, force_reload: bool = False) -> ExecutionPlan: + """DB ACTIVE 버전 + 클래스 + DAG 를 합쳐 ExecutionPlan 생성.""" + versions = load_active_versions(conn, force_reload=force_reload) + edges = self._collect_edges(conn, versions) + plan = self._instantiate(versions, edges) + plan.topo_order = self._topo_sort(plan) + return plan + + def build_plan_from_rows( + self, + versions: Iterable[VersionRow], + dependencies: Iterable[tuple[str, str, str]] = (), + ) -> ExecutionPlan: + """테스트용 — DB 없이 in-memory rows 만으로 Plan 생성.""" + edges: dict[str, set[str]] = defaultdict(set) + active_ids = {v['model_id'] for v in versions} + for model_id, depends_on, _key in dependencies: + if model_id in active_ids and depends_on in active_ids: + edges[model_id].add(depends_on) + # 클래스 선언 depends_on 도 합류 + for v in versions: + cls = self._classes.get(v['model_id']) + if cls is None: + continue + for dep in getattr(cls, 'depends_on', []) or []: + if dep in active_ids: + edges[v['model_id']].add(dep) + + plan = self._instantiate(versions, edges) + plan.topo_order = self._topo_sort(plan) + return plan + + # ------------------------------------------------------------------ + # 내부 + # ------------------------------------------------------------------ + def _collect_edges( + self, + conn, + versions: list[VersionRow], + ) -> dict[str, set[str]]: + """DB dependencies + 클래스 선언 depends_on 합산.""" + edges: dict[str, set[str]] = defaultdict(set) + active_ids = {v['model_id'] for v in versions} + + try: + for model_id, depends_on, _key in load_dependencies(conn): + if model_id in active_ids and depends_on in active_ids: + edges[model_id].add(depends_on) + except Exception: + logger.exception('load_dependencies failed — proceeding with class-level depends_on only') + + for v in versions: + cls = self._classes.get(v['model_id']) + if cls is None: + continue + for dep in getattr(cls, 'depends_on', []) or []: + if dep in active_ids: + edges[v['model_id']].add(dep) + return edges + + def _instantiate( + self, + versions: Iterable[VersionRow], + edges: dict[str, set[str]], + ) -> ExecutionPlan: + plan = ExecutionPlan() + plan.edges = defaultdict(set, {k: set(v) for k, v in edges.items()}) + + for v in versions: + mid = v['model_id'] + role = v['role'] + if role not in ALLOWED_ROLES: + logger.warning( + 'skip version id=%s role=%r not in %s', + v['id'], role, ALLOWED_ROLES, + ) + continue + cls = self._classes.get(mid) + if cls is None: + logger.warning( + 'model_id=%s has ACTIVE version %s(role=%s) but no registered class — skipping', + mid, v['version'], role, + ) + continue + try: + inst = cls( + version_id=v['id'], + version_str=v['version'], + role=role, + params=v['params'], + ) + except Exception: + logger.exception( + 'failed to instantiate %s version_id=%s — skipping', + cls.__name__, v['id'], + ) + continue + + if role == ROLE_PRIMARY: + if mid in plan.primaries: + # DB UNIQUE INDEX 가 보장하지만 방어적으로 + logger.error( + 'duplicate PRIMARY for %s (existing id=%s, new id=%s) — keeping existing', + mid, plan.primaries[mid].version_id, v['id'], + ) + continue + plan.primaries[mid] = inst + else: # SHADOW / CHALLENGER + plan.shadows[mid].append(inst) + return plan + + @staticmethod + def _topo_sort(plan: ExecutionPlan) -> list[str]: + """PRIMARY 노드 기준 topological order. 순환 시 DAGCycleError.""" + nodes = set(plan.primaries.keys()) | set(plan.shadows.keys()) + # SHADOW-only 모델도 노드로 취급 (PRIMARY 미등록이면 Executor 가 skip) + in_degree: dict[str, int] = {n: 0 for n in nodes} + adj: dict[str, set[str]] = defaultdict(set) + for node, deps in plan.edges.items(): + if node not in nodes: + continue + for dep in deps: + if dep not in nodes: + continue + adj[dep].add(node) + in_degree[node] = in_degree.get(node, 0) + 1 + + order: list[str] = [] + queue = deque(sorted([n for n, d in in_degree.items() if d == 0])) + while queue: + n = queue.popleft() + order.append(n) + for nxt in sorted(adj[n]): + in_degree[nxt] -= 1 + if in_degree[nxt] == 0: + queue.append(nxt) + + if len(order) != len(nodes): + remaining = [n for n in nodes if n not in order] + raise DAGCycleError( + f'DAG cycle detected among detection models: {sorted(remaining)}' + ) + return order + + +# 편의: 싱글톤 패턴 (운영 환경에서 주로 한 인스턴스만 씀) +_registry_singleton: Optional[ModelRegistry] = None + + +def get_registry() -> ModelRegistry: + global _registry_singleton + if _registry_singleton is None: + _registry_singleton = ModelRegistry() + _registry_singleton.discover_classes() + return _registry_singleton + + +__all__ = [ + 'ModelRegistry', + 'ExecutionPlan', + 'DAGCycleError', + 'get_registry', + 'ROLE_PRIMARY', + 'ROLE_SHADOW', + 'ROLE_CHALLENGER', +] diff --git a/prediction/scheduler.py b/prediction/scheduler.py index fd9a030..a843cdb 100644 --- a/prediction/scheduler.py +++ b/prediction/scheduler.py @@ -74,6 +74,42 @@ def _fetch_dark_history(kcg_conn, mmsi_list: list[str]) -> dict[str, dict]: return {} +def _run_detection_model_registry(cycle_started_at, results): + """Phase 1-2 — ACTIVE 버전 인스턴스를 모두 돌려서 비교·관측 데이터를 남긴다. + + 신 경로는 기존 사이클 결과(`results`)를 대체하지 않는다. ctx.inputs 로 + 전달되어 각 모델이 **같은 입력에 대해** PRIMARY/SHADOW 결과를 내도록 한다. + + Phase 2 에서 실제 모델 클래스가 추가되기 전까지는 ACTIVE 버전이 없어 + 사실상 no-op 에 가깝다. 구 경로와의 공존을 위해 항상 try/except 로 감싼다. + """ + from db import kcgdb + from models_core.base import ModelContext + from models_core.executor import DAGExecutor + from models_core.registry import get_registry + + registry = get_registry() + with kcgdb.get_conn() as conn: + try: + plan = registry.build_plan(conn) + except Exception: + logger.exception('detection model plan build failed — skipping registry stage') + return + + if not plan.primaries and not plan.shadows: + logger.info('detection model registry: no ACTIVE versions — nothing to run') + return + + from dataclasses import asdict + inputs = [asdict(r) for r in (results or [])] + ctx = ModelContext( + cycle_started_at=cycle_started_at, + conn=conn, + inputs=inputs, + ) + DAGExecutor(plan).run(ctx) + + def get_last_run() -> dict: return _last_run.copy() @@ -790,6 +826,22 @@ def run_analysis_cycle(): except Exception as e: logger.exception('failed to cache analysis context for chat: %s', e) + # 10. Detection Model Registry (Phase 1-2) + # PREDICTION_USE_MODEL_REGISTRY=1 일 때만 신 경로 실행. 기본은 구 경로만. + # 이 분기는 기존 사이클 결과를 건드리지 않고, ACTIVE 버전들의 결과를 + # detection_model_run_outputs / detection_model_metrics 에 기록한다. + try: + from models_core.feature_flag import use_model_registry + if use_model_registry(): + run_stage( + 'detection_model_registry', + _run_detection_model_registry, + cycle_started_at=datetime.fromisoformat(_last_run['timestamp']), + results=results, + ) + except Exception as e: + logger.exception('detection model registry stage setup failed: %s', e) + elapsed = round(time.time() - start, 2) _last_run['duration_sec'] = elapsed _last_run['vessel_count'] = len(results) diff --git a/prediction/scripts/diagnostic-snapshot.sh b/prediction/scripts/diagnostic-snapshot.sh index 2403925..376965e 100644 --- a/prediction/scripts/diagnostic-snapshot.sh +++ b/prediction/scripts/diagnostic-snapshot.sh @@ -55,6 +55,22 @@ FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '5 minutes'; SQL +echo "" +echo "--- 1b. SPOOFING signal health (silent-vs-fault 구분) ---" +# gt0 > 0 인데 gt0.5 = 0 이면 "알고리즘 동작 + threshold 미돌파" (정상), +# gt0 = 0 이면 "알고리즘 자체가 계산을 못 하고 있음" (silent fault) → 로그 추적 필요. +$PSQL_TABLE << 'SQL' +SELECT count(*) total, + count(*) FILTER (WHERE spoofing_score > 0) gt0, + count(*) FILTER (WHERE spoofing_score > 0.3) gt03, + count(*) FILTER (WHERE spoofing_score > 0.5) gt05, + count(*) FILTER (WHERE spoofing_score > 0.7) gt07, + round(avg(spoofing_score)::numeric, 4) avg_score, + round(max(spoofing_score)::numeric, 4) max_score +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '5 minutes'; +SQL + #=================================================================== # PART 2: 다크베셀 심층 진단 #=================================================================== @@ -346,15 +362,64 @@ FROM kcg.prediction_kpi_realtime ORDER BY kpi_key; SQL #=================================================================== -# PART 7: 사이클 로그 + 에러 +# PART 6.5: V030 + V034 관찰 (원시 테이블) +#=================================================================== +echo "" +echo "=================================================================" +echo "PART 6.5: V030 gear_identity_collisions + V034 detection_model_*" +echo "=================================================================" + +echo "" +echo "--- 6.5-1. gear_identity_collisions severity x status (1h) ---" +$PSQL_TABLE << 'SQL' +SELECT severity, status, count(*) cnt, max(last_seen_at) last_seen +FROM kcg.gear_identity_collisions +WHERE last_seen_at > now() - interval '1 hour' +GROUP BY severity, status ORDER BY cnt DESC; +SQL + +echo "" +echo "--- 6.5-2. detection_models + 버전 상태 (Phase 1-2 이후 활성) ---" +$PSQL_TABLE << 'SQL' +SELECT count(*) AS catalog, + count(*) FILTER (WHERE is_enabled) AS enabled +FROM kcg.detection_models; +SQL + +$PSQL_TABLE << 'SQL' +SELECT status, coalesce(role,'(null)') role, count(*) cnt +FROM kcg.detection_model_versions +GROUP BY status, role ORDER BY status, role; +SQL + +echo "" +echo "--- 6.5-3. detection_model_run_outputs 5분 적재 (feature flag ON 시 증가) ---" +$PSQL_TABLE << 'SQL' +SELECT model_id, role, count(*) rows +FROM kcg.detection_model_run_outputs +WHERE cycle_started_at > now() - interval '5 minutes' +GROUP BY model_id, role ORDER BY rows DESC LIMIT 10; +SQL + +#=================================================================== +# PART 7: 사이클 로그 + 에러 + stage timing #=================================================================== echo "" echo "=================================================================" echo "PART 7: 사이클 로그 (최근 6분)" echo "=================================================================" +# stage_runner (Phase 0-1) + DAGExecutor (Phase 1-2) 로그 추가 journalctl -u kcg-ai-prediction --since '6 minutes ago' --no-pager 2>/dev/null | \ - grep -E 'analysis cycle:|lightweight|pipeline dark:|event_generator:|pair_trawl|gear_violation|GEAR_ILLEGAL|ERROR|Traceback' | \ - tail -20 + grep -E 'analysis cycle:|lightweight|pipeline dark:|event_generator:|pair_trawl|gear_violation|GEAR_ILLEGAL|stage [a-z_]+ (ok|failed)|DAGExecutor done|detection model registry|ERROR|Traceback' | \ + tail -40 + +echo "" +echo "--- 7-1. STAGE TIMING (소요시간 상위 + 실패) ---" +journalctl -u kcg-ai-prediction --since '6 minutes ago' --no-pager 2>/dev/null | \ + grep -oE 'stage [a-z_@.[:blank:][:digit:].-]+ (ok in [0-9.]+s|failed)' | \ + awk '/failed/ {print "FAIL " $0; next} + /ok in/ {n=split($0,a," "); sec=a[n]; sub(/s$/,"",sec); printf "%8.2fs %s\n", sec, $0}' | \ + sort -rn | awk 'NR<=8 || /^FAIL/' | head -20 #=================================================================== # PART 7.5: 한중어업협정 레지스트리 매칭 (V029) diff --git a/prediction/scripts/hourly-analysis-snapshot.sh b/prediction/scripts/hourly-analysis-snapshot.sh index cb0fa72..5534555 100755 --- a/prediction/scripts/hourly-analysis-snapshot.sh +++ b/prediction/scripts/hourly-analysis-snapshot.sh @@ -37,6 +37,21 @@ SELECT count(*) total, FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour'; +\echo +\echo === 1a. SPOOFING signal health (silent-vs-fault 구분) === +-- spoof_hi=0 이 "고장"인지 "신호 없음"인지 구분하려면 gt0 / gt03 / gt05 / max 를 모두 본다. +-- gt0 가 0 이면 파이프라인이 spoofing_score 를 아예 계산하지 못하고 있다는 신호 (원인 추적 필요). +-- gt0>0 인데 gt05=0 이면 알고리즘은 동작 중이나 threshold 돌파 대상이 없다 (정상일 수 있음). +SELECT count(*) total, + count(*) FILTER (WHERE spoofing_score > 0) gt0, + count(*) FILTER (WHERE spoofing_score > 0.3) gt03, + count(*) FILTER (WHERE spoofing_score > 0.5) gt05, + count(*) FILTER (WHERE spoofing_score > 0.7) gt07, + round(avg(spoofing_score)::numeric, 4) avg_score, + round(max(spoofing_score)::numeric, 4) max_score +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour'; + \echo \echo === 2. ZONE x DARK x GEAR_VIOLATION distribution === SELECT zone_code, @@ -369,18 +384,101 @@ SELECT date_trunc('hour', occurred_at AT TIME ZONE 'Asia/Seoul') hr, count(*) FILTER (WHERE category='EEZ_INTRUSION') eez, count(*) FILTER (WHERE category='GEAR_ILLEGAL') gear_illegal, count(*) FILTER (WHERE category='HIGH_RISK_VESSEL') high_risk, + count(*) FILTER (WHERE category='GEAR_IDENTITY_COLLISION') gear_collide, count(*) FILTER (WHERE level='CRITICAL') critical FROM kcg.prediction_events WHERE created_at > now() - interval '24 hours' GROUP BY hr ORDER BY hr DESC LIMIT 25; +\echo +\echo =================================================================== +\echo === V030 GEAR_IDENTITY_COLLISIONS (원시 테이블 관찰) +\echo =================================================================== +\echo +\echo === V030-1. severity x status 분포 (24h) === +SELECT severity, status, count(*) cnt, + max(last_seen_at) last_seen +FROM kcg.gear_identity_collisions +WHERE last_seen_at > now() - interval '24 hours' +GROUP BY severity, status ORDER BY cnt DESC; + +\echo +\echo === V030-2. coexistence/swap 상위 20건 (24h) === +SELECT name, mmsi_lo, mmsi_hi, severity, status, + coexistence_count coex, swap_count swap, + round(max_distance_km::numeric, 1) max_km +FROM kcg.gear_identity_collisions +WHERE last_seen_at > now() - interval '24 hours' +ORDER BY (coexistence_count + swap_count * 5) DESC LIMIT 20; + +\echo +\echo =================================================================== +\echo === V034 DETECTION_MODEL REGISTRY (Phase 1-2) +\echo =================================================================== +\echo +\echo === V034-1. model catalog + enabled 여부 === +SELECT count(*) catalog_total, + count(*) FILTER (WHERE is_enabled) enabled +FROM kcg.detection_models; + +\echo +\echo === V034-2. version 상태 x role 분포 === +SELECT status, coalesce(role,'(null)') role, count(*) cnt +FROM kcg.detection_model_versions +GROUP BY status, role ORDER BY status, role; + +\echo +\echo === V034-3. detection_model_run_outputs 1h 적재 현황 (feature flag ON 시 증가) === +SELECT model_id, role, count(*) rows, + min(cycle_started_at) oldest, max(cycle_started_at) newest +FROM kcg.detection_model_run_outputs +WHERE cycle_started_at > now() - interval '1 hour' +GROUP BY model_id, role ORDER BY rows DESC; + +\echo +\echo === V034-4. detection_model_metrics 최신 5 모델 평균 소요 === +SELECT model_id, role, + round(avg(metric_value) FILTER (WHERE metric_key='cycle_duration_ms')::numeric, 1) avg_ms, + round(avg(metric_value) FILTER (WHERE metric_key='output_count')::numeric, 1) avg_out +FROM kcg.detection_model_metrics +WHERE cycle_started_at > now() - interval '1 hour' +GROUP BY model_id, role ORDER BY model_id, role; + +\echo +\echo === C1. stats_hourly vs raw events 카테고리 drift (event_generator silent drop 감시) === +-- raw prediction_events 에는 있지만 stats_hourly.by_category 에는 없는 카테고리 (반대도 표시) +WITH recent_events AS ( + SELECT DISTINCT category FROM kcg.prediction_events + WHERE created_at > now() - interval '2 hours' +), +stats_cats AS ( + SELECT DISTINCT jsonb_object_keys(by_category) AS category + FROM kcg.prediction_stats_hourly + WHERE stat_hour > now() - interval '2 hours' +) +SELECT 'only_in_events' gap, category FROM recent_events + WHERE category NOT IN (SELECT category FROM stats_cats) +UNION ALL +SELECT 'only_in_stats', category FROM stats_cats + WHERE category NOT IN (SELECT category FROM recent_events); + SQL echo "" echo "=== 13. CYCLE LOG (last 65 min) ===" +# stage_runner, DAGExecutor, detection_model_registry, Traceback 까지 함께 추적 journalctl -u kcg-ai-prediction --since '65 minutes ago' --no-pager 2>/dev/null | \ - grep -E 'lightweight|event_generator:|stats_aggregator hourly|kpi_writer:|analysis cycle:|pair_trawl|gear_violation|GEAR_ILLEGAL|ERROR|Traceback' | \ - tail -60 + grep -E 'lightweight|event_generator:|stats_aggregator hourly|kpi_writer:|analysis cycle:|pair_trawl|gear_violation|GEAR_ILLEGAL|stage [a-z_]+ (ok|failed)|DAGExecutor done|detection model registry|ERROR|Traceback' | \ + tail -80 + +echo "" +echo "=== 14. STAGE TIMING (last 65 min, 소요시간 상위 10 + 실패 전체) ===" +# stage ok in X.XXs / stage failed after 를 수집하여 실패+장시간 스테이지 식별 +journalctl -u kcg-ai-prediction --since '65 minutes ago' --no-pager 2>/dev/null | \ + grep -oE 'stage [a-z_@.[:blank:][:digit:].-]+ (ok in [0-9.]+s|failed)' | \ + awk '/failed/ {print "FAIL " $0; next} + /ok in/ {n=split($0,a," "); sec=a[n]; sub(/s$/,"",sec); printf "%8.2fs %s\n", sec, $0}' | \ + sort -rn | awk 'NR<=10 || /^FAIL/' | head -40 echo "" echo "=== END ===" diff --git a/prediction/tests/test_models_core.py b/prediction/tests/test_models_core.py new file mode 100644 index 0000000..10b90ad --- /dev/null +++ b/prediction/tests/test_models_core.py @@ -0,0 +1,431 @@ +"""models_core 기반 인프라 (Phase 1-2) 유닛테스트. + +DB·서버 없이 순수 파이썬 레벨에서 다음을 검증: +- params_loader 캐시 TTL 동작 +- ModelRegistry discover + 버전별 인스턴스화 +- DAG topo 정렬 + 순환 검출 +- DAGExecutor 의 오염 차단 불변식 (SHADOW 결과는 ctx.shared 에 들어가지 않음) +- PRIMARY 실패 시 후행 모델 skip +- SHADOW 전용(PRIMARY 없음) 모델 스킵 경고 +- run_stage 와의 통합 — 예외가 한 버전에 격리되는지 + +실제 DB 상호작용은 Phase 1-3 testcontainers 기반에서 수행 (후속 커밋). +""" +from __future__ import annotations + +import sys +import types +import unittest +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Optional + +# pydantic_settings stub (기존 test_time_bucket 관용) +_stub = types.ModuleType('pydantic_settings') + + +class _StubBaseSettings: + def __init__(self, **kwargs): + for name, value in self.__class__.__dict__.items(): + if name.isupper(): + setattr(self, name, kwargs.get(name, value)) + + +_stub.BaseSettings = _StubBaseSettings +sys.modules.setdefault('pydantic_settings', _stub) + +from models_core import base as mc_base +from models_core.base import ( + BaseDetectionModel, + ModelContext, + ModelResult, + ROLE_PRIMARY, + ROLE_SHADOW, + make_input_ref, +) +from models_core import params_loader +from models_core.executor import DAGExecutor +from models_core.registry import DAGCycleError, ModelRegistry + + +# ====================================================================== +# Fixture 클래스들 +# ====================================================================== +@dataclass +class _Call: + model_id: str + role: str + version_id: int + + +def _make_model_class(mid: str, depends: Optional[list] = None, *, raise_for_role: Optional[str] = None): + """동적으로 BaseDetectionModel 서브클래스 생성.""" + + class _M(BaseDetectionModel): + model_id = mid + depends_on = list(depends or []) + + def run(self, ctx: ModelContext) -> ModelResult: + if raise_for_role and self.role == raise_for_role: + raise RuntimeError(f'intentional failure in {mid}@{self.role}') + ctx.extras.setdefault('_calls', []).append( + _Call(self.model_id, self.role, self.version_id) + ) + # input_ref 스키마를 PRIMARY/SHADOW 동일 유지 + out_per = [ + (make_input_ref('412000001'), {'score': 1.0 if self.role == ROLE_PRIMARY else 1.5}), + ] + return ModelResult( + model_id=self.model_id, + version_id=self.version_id, + version_str=self.version_str, + role=self.role, + outputs_per_input=out_per, + metrics={'sentinel': float(self.version_id)}, + ) + + _M.__name__ = f'_M_{mid.replace(".", "_")}' + return _M + + +def _version_row(id_, model_id, role, version='1.0.0', params=None): + return params_loader.VersionRow( + id=id_, model_id=model_id, role=role, version=version, params=params or {} + ) + + +# ====================================================================== +# params_loader 캐시 +# ====================================================================== +class ParamsCacheTest(unittest.TestCase): + + def setUp(self): + params_loader.invalidate_cache() + + def test_invalidate_forces_reload(self): + calls = {'n': 0} + + def fake_fetch(conn): + calls['n'] += 1 + return [_version_row(1, 'a', ROLE_PRIMARY)] + + orig = params_loader._fetch_active_versions + params_loader._fetch_active_versions = fake_fetch + try: + rows1 = params_loader.load_active_versions(conn=None) + rows2 = params_loader.load_active_versions(conn=None) + self.assertEqual(calls['n'], 1) # 두 번째는 캐시 HIT + self.assertEqual(len(rows1), 1) + self.assertEqual(len(rows2), 1) + + params_loader.invalidate_cache() + params_loader.load_active_versions(conn=None) + self.assertEqual(calls['n'], 2) + finally: + params_loader._fetch_active_versions = orig + params_loader.invalidate_cache() + + def test_force_reload_bypasses_ttl(self): + calls = {'n': 0} + + def fake_fetch(conn): + calls['n'] += 1 + return [] + + orig = params_loader._fetch_active_versions + params_loader._fetch_active_versions = fake_fetch + try: + params_loader.load_active_versions(conn=None) + params_loader.load_active_versions(conn=None, force_reload=True) + self.assertEqual(calls['n'], 2) + finally: + params_loader._fetch_active_versions = orig + params_loader.invalidate_cache() + + +# ====================================================================== +# Registry topo 정렬 + DAG 검증 +# ====================================================================== +class RegistryTopoTest(unittest.TestCase): + + def _registry_with(self, *model_ids_with_deps): + """[(model_id, [dep_ids]), ...] 에 맞춘 Registry 생성.""" + reg = ModelRegistry() + for mid, deps in model_ids_with_deps: + reg.register_class(_make_model_class(mid, deps)) + return reg + + def test_topo_order_respects_dependencies(self): + reg = self._registry_with( + ('a', []), + ('b', ['a']), + ('c', ['b']), + ) + rows = [ + _version_row(10, 'a', ROLE_PRIMARY), + _version_row(11, 'b', ROLE_PRIMARY), + _version_row(12, 'c', ROLE_PRIMARY), + ] + plan = reg.build_plan_from_rows(rows) + self.assertEqual(plan.topo_order, ['a', 'b', 'c']) + + def test_cycle_detection(self): + reg = self._registry_with( + ('a', ['b']), + ('b', ['a']), + ) + rows = [ + _version_row(1, 'a', ROLE_PRIMARY), + _version_row(2, 'b', ROLE_PRIMARY), + ] + with self.assertRaises(DAGCycleError): + reg.build_plan_from_rows(rows) + + def test_shadow_version_attaches_to_primary_model(self): + reg = self._registry_with(('a', [])) + rows = [ + _version_row(1, 'a', ROLE_PRIMARY, version='1.0.0'), + _version_row(2, 'a', ROLE_SHADOW, version='1.1.0-shadow'), + _version_row(3, 'a', ROLE_SHADOW, version='1.2.0-shadow'), + ] + plan = reg.build_plan_from_rows(rows) + self.assertIn('a', plan.primaries) + self.assertEqual(plan.primaries['a'].version_id, 1) + self.assertEqual(len(plan.shadows['a']), 2) + + def test_unknown_model_id_skipped(self): + reg = ModelRegistry() # 클래스 없음 + rows = [_version_row(1, 'ghost', ROLE_PRIMARY)] + plan = reg.build_plan_from_rows(rows) + self.assertNotIn('ghost', plan.primaries) + + def test_class_depends_on_added_to_edges(self): + reg = self._registry_with( + ('base', []), + ('child', ['base']), + ) + rows = [ + _version_row(1, 'base', ROLE_PRIMARY), + _version_row(2, 'child', ROLE_PRIMARY), + ] + plan = reg.build_plan_from_rows(rows) + self.assertIn('base', plan.edges['child']) + + +# ====================================================================== +# DAGExecutor 불변식 +# ====================================================================== +class DAGExecutorTest(unittest.TestCase): + + def _collect_persisted(self): + """persist 훅 2개를 만들어 호출을 가로채는 pair 반환.""" + persisted_rows: list[ModelResult] = [] + persisted_metrics: list[ModelResult] = [] + + def p_rows(result: ModelResult, cycle_started_at, *, conn=None): + persisted_rows.append(result) + + def p_metrics(result: ModelResult, cycle_started_at, *, conn=None): + persisted_metrics.append(result) + + return persisted_rows, persisted_metrics, p_rows, p_metrics + + def _ctx(self): + return ModelContext(cycle_started_at=datetime(2026, 4, 20, 0, 0, tzinfo=timezone.utc)) + + def test_primary_result_injected_into_shared(self): + reg = ModelRegistry() + reg.register_class(_make_model_class('a')) + reg.register_class(_make_model_class('b', ['a'])) + rows = [ + _version_row(1, 'a', ROLE_PRIMARY), + _version_row(2, 'b', ROLE_PRIMARY), + ] + plan = reg.build_plan_from_rows(rows) + + pr, pm, p1, p2 = self._collect_persisted() + ctx = self._ctx() + DAGExecutor(plan, persist_fn=p1, persist_metrics_fn=p2).run(ctx) + + self.assertIn('a', ctx.shared) + self.assertIn('b', ctx.shared) + self.assertEqual(ctx.shared['a'].role, ROLE_PRIMARY) + + def test_shadow_result_not_injected_into_shared(self): + """가장 중요한 불변식 — SHADOW 결과가 ctx.shared 에 들어가면 오염.""" + reg = ModelRegistry() + reg.register_class(_make_model_class('m')) + rows = [ + _version_row(1, 'm', ROLE_PRIMARY), + _version_row(2, 'm', ROLE_SHADOW), + ] + plan = reg.build_plan_from_rows(rows) + + pr, pm, p1, p2 = self._collect_persisted() + ctx = self._ctx() + DAGExecutor(plan, persist_fn=p1, persist_metrics_fn=p2).run(ctx) + + # shared 는 PRIMARY 만 + self.assertEqual(ctx.shared['m'].role, ROLE_PRIMARY) + self.assertEqual(ctx.shared['m'].version_id, 1) + + # 저장은 둘 다 된다 + persisted_roles = {r.role for r in pr} + self.assertIn(ROLE_PRIMARY, persisted_roles) + self.assertIn(ROLE_SHADOW, persisted_roles) + + def test_downstream_sees_primary_only_even_when_shadow_differs(self): + """SHADOW 가 다른 값을 리턴해도 후행 PRIMARY 는 선행 PRIMARY 결과만 소비.""" + + class M_A(BaseDetectionModel): + model_id = 'a' + depends_on = [] + + def run(self, ctx): + val = 100 if self.role == ROLE_PRIMARY else 999 + return ModelResult( + model_id='a', version_id=self.version_id, + version_str=self.version_str, role=self.role, + outputs_per_input=[(make_input_ref('x'), {'v': val})], + metrics={}, + ) + + observed = {'downstream_seen_value': None} + + class M_B(BaseDetectionModel): + model_id = 'b' + depends_on = ['a'] + + def run(self, ctx): + upstream = ctx.shared.get('a') + observed['downstream_seen_value'] = ( + upstream.outputs_per_input[0][1]['v'] if upstream else None + ) + return ModelResult( + model_id='b', version_id=self.version_id, + version_str=self.version_str, role=self.role, + outputs_per_input=[(make_input_ref('x'), {'echo': observed['downstream_seen_value']})], + ) + + reg = ModelRegistry() + reg.register_class(M_A) + reg.register_class(M_B) + rows = [ + _version_row(1, 'a', ROLE_PRIMARY), + _version_row(2, 'a', ROLE_SHADOW), + _version_row(3, 'b', ROLE_PRIMARY), + ] + plan = reg.build_plan_from_rows(rows) + + pr, pm, p1, p2 = self._collect_persisted() + ctx = self._ctx() + DAGExecutor(plan, persist_fn=p1, persist_metrics_fn=p2).run(ctx) + + # downstream 이 본 값은 PRIMARY(100), SHADOW(999) 가 아님 + self.assertEqual(observed['downstream_seen_value'], 100) + + def test_primary_failure_skips_downstream(self): + reg = ModelRegistry() + reg.register_class(_make_model_class('a', raise_for_role=ROLE_PRIMARY)) + reg.register_class(_make_model_class('b', ['a'])) + rows = [ + _version_row(1, 'a', ROLE_PRIMARY), + _version_row(2, 'b', ROLE_PRIMARY), + ] + plan = reg.build_plan_from_rows(rows) + + pr, pm, p1, p2 = self._collect_persisted() + ctx = self._ctx() + summary = DAGExecutor(plan, persist_fn=p1, persist_metrics_fn=p2).run(ctx) + + self.assertNotIn('a', ctx.shared) + self.assertNotIn('b', ctx.shared) + self.assertGreaterEqual(summary['failed'], 1) + self.assertGreaterEqual(summary['skipped_missing_deps'], 1) + + def test_shadow_failure_does_not_affect_primary_or_persist(self): + cls_ok_primary = _make_model_class('m') + cls_bad_shadow = _make_model_class('m', raise_for_role=ROLE_SHADOW) + # 같은 model_id 를 다른 클래스로 덮으면 Registry 가 ValueError — 대신 같은 클래스 재사용 + reg = ModelRegistry() + reg.register_class(_make_model_class('m', raise_for_role=ROLE_SHADOW)) + rows = [ + _version_row(1, 'm', ROLE_PRIMARY), + _version_row(2, 'm', ROLE_SHADOW), + ] + plan = reg.build_plan_from_rows(rows) + + pr, pm, p1, p2 = self._collect_persisted() + ctx = self._ctx() + summary = DAGExecutor(plan, persist_fn=p1, persist_metrics_fn=p2).run(ctx) + + self.assertEqual(summary['executed'], 1) # PRIMARY 성공 + self.assertEqual(summary['shadow_failed'], 1) + self.assertEqual(summary['shadow_ran'], 0) + # PRIMARY 는 persist 된다 + self.assertEqual([r.role for r in pr], [ROLE_PRIMARY]) + + def test_shadow_only_without_primary_is_skipped(self): + reg = ModelRegistry() + reg.register_class(_make_model_class('orphan')) + rows = [_version_row(1, 'orphan', ROLE_SHADOW)] + plan = reg.build_plan_from_rows(rows) + + pr, pm, p1, p2 = self._collect_persisted() + ctx = self._ctx() + summary = DAGExecutor(plan, persist_fn=p1, persist_metrics_fn=p2).run(ctx) + self.assertEqual(summary['executed'], 0) + self.assertNotIn('orphan', ctx.shared) + + +class SilentErrorGuardTest(unittest.TestCase): + """V034 스키마 컬럼 사이즈 초과 silent 실패 방지.""" + + def test_model_id_too_long_rejected_at_instantiation(self): + class _TooLong(BaseDetectionModel): + model_id = 'x' * 65 # VARCHAR(64) 초과 + + def run(self, ctx): # pragma: no cover + return ModelResult( + model_id=self.model_id, version_id=self.version_id, + version_str=self.version_str, role=self.role, + ) + + with self.assertRaises(ValueError): + _TooLong(version_id=1, version_str='1', role=ROLE_PRIMARY, params={}) + + def test_long_metric_key_dropped_with_warning(self): + """_persist_metrics 가 64자 초과 metric_key 를 dropna silent 로 저장하지 않는다.""" + from models_core import executor as ex + + # fake conn (cursor context manager 불필요 — _execute_insert 가 단순 호출) + captured_rows: list = [] + + def fake_exec(sql, rows, *, conn=None): + captured_rows.extend(rows) + + orig = ex._execute_insert + ex._execute_insert = fake_exec + try: + r = ModelResult( + model_id='m', version_id=1, version_str='1', role=ROLE_PRIMARY, + outputs_per_input=[], + metrics={ + 'ok_key': 1.0, + 'x' * 65: 2.0, # 초과 + }, + duration_ms=10, + ) + ex._persist_metrics(r, cycle_started_at=datetime(2026, 4, 20)) + keys = [row[3] for row in captured_rows] # 4번째 컬럼이 metric_key + self.assertIn('ok_key', keys) + self.assertNotIn('x' * 65, keys) + # cycle_duration_ms / output_count 기본값은 포함 + self.assertIn('cycle_duration_ms', keys) + self.assertIn('output_count', keys) + finally: + ex._execute_insert = orig + + +if __name__ == '__main__': + unittest.main()