kcg-ai-monitoring/prediction/models_core/base.py
htlee 2ceeb966d8 feat(prediction): Phase 1-2 detection model registry + snapshot 관찰 보강
- models_core 패키지 신설 — BaseDetectionModel / ModelContext / ModelResult
  + Registry (ACTIVE 버전 인스턴스화, DAG 순환 검출, topo 플랜)
  + DAGExecutor (PRIMARY→ctx.shared 주입, SHADOW persist-only 오염 차단)
  + params_loader (5분 TTL 캐시), feature_flag (PREDICTION_USE_MODEL_REGISTRY)
- V034 스키마 정합성 사전 검증 + silent error 3건 선제 방어
  · model_id VARCHAR(64) 초과 시 __init__ 에서 즉시 ValueError
  · metric_key VARCHAR(64) 초과는 경고 후 drop (다른 metric 는 저장)
  · persist 가 ctx.conn 재사용 (pool maxconn=5 고갈 방지)
- scheduler.py — 10단계 feature flag 분기 (기본 0, 구 경로 보존)
- partition_manager — detection_model_run_outputs 월별 파티션 자동 생성/DROP
- 유닛테스트 15 케이스 전체 통과 (DAG 순환, SHADOW 오염 차단, 길이 검증)
- snapshot 스크립트 (hourly/diagnostic) 개선
  · spoofing gt0/gt03/gt05/gt07 세분화 — 'silent fault' vs 'no signal' 구분
  · V030 gear_identity_collisions 원시 섹션 (CRITICAL 51건 OPEN 포착)
  · V034 detection_model_* 모니터링 섹션 (Phase 2 대비)
  · stage timing 집계 + stats_hourly vs events category drift 감시

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 08:07:29 +09:00

151 lines
5.9 KiB
Python

"""Detection Model 추상 계층.
prediction 모듈의 기존 함수형 알고리즘(`algorithms/*`) 을 그대로 두고,
Adapter 형태로 감싸서 "모델 단위 실행·버전·파라미터"를 표준화한다.
설계:
- `ModelContext` — 한 사이클의 공통 입력/공유 상태 (불변 전제)
- `ModelResult` — 한 모델·한 버전의 실행 결과 (입력별 output + 메트릭)
- `BaseDetectionModel` — 등록 가능한 최소 계약 (model_id / version / role / params / run)
불변식:
- SHADOW/CHALLENGER 는 `ctx.shared[model_id]` 에 기록되지 않음 (Executor 책임)
- `params` 는 DRAFT 로 수정, ACTIVE 는 immutable 스냅샷 (DB 제약과 같은 규약)
"""
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
logger = logging.getLogger(__name__)
# role 상수 — DB CHECK 제약과 동일한 문자열
ROLE_PRIMARY = 'PRIMARY'
ROLE_SHADOW = 'SHADOW'
ROLE_CHALLENGER = 'CHALLENGER'
ALLOWED_ROLES = (ROLE_PRIMARY, ROLE_SHADOW, ROLE_CHALLENGER)
@dataclass
class ModelContext:
"""한 사이클 공통 입력 + 모델 간 공유 상태.
Attributes:
cycle_started_at: 사이클 시작 시각 (모든 모델·버전이 공유)
vessel_store: 인메모리 AIS 캐시 (Optional — 테스트 시 None 허용)
conn: kcgdb psycopg2 connection (Optional — 테스트 시 None 허용)
shared: 선행 모델 PRIMARY 결과 보관소. key=model_id, value=ModelResult
SHADOW/CHALLENGER 는 여기에 쓰지 않는다 (오염 차단).
inputs: 모델이 소비할 공통 입력 목록 (선박 row 등). 버전 간 공정 비교 보장.
extras: 필요시 모델별 보조 데이터 (feature flag, tunable 등)
"""
cycle_started_at: datetime
vessel_store: Any = None
conn: Any = None
shared: dict = field(default_factory=dict)
inputs: list = field(default_factory=list)
extras: dict = field(default_factory=dict)
@dataclass
class ModelResult:
"""한 모델·한 버전의 실행 결과.
Attributes:
model_id: 모델 식별자
version_id: detection_model_versions.id
version_str: 'v1.0.0' 등 사람이 읽는 버전 문자열
role: PRIMARY / SHADOW / CHALLENGER
outputs_per_input: [(input_ref, output_dict), ...]
input_ref 는 비교용 키(예: {'mmsi': '412...', 'analyzed_at': ...})
output_dict 는 JSONB 저장 가능한 결과 snapshot
metrics: detection_model_metrics 로 기록될 집계 관측치
(key=metric_key, value=numeric)
duration_ms: 이 버전 단위 실행 소요
"""
model_id: str
version_id: int
version_str: str
role: str
outputs_per_input: list[tuple[dict, dict]] = field(default_factory=list)
metrics: dict[str, float] = field(default_factory=dict)
duration_ms: int = 0
class BaseDetectionModel(ABC):
"""탐지 모델 추상 베이스.
구현체는 `prediction/models_core/registered/` 하위에 두고
`ModelRegistry.discover_classes()` 가 자동 import 한다.
클래스 레벨 속성(model_id / depends_on) 은 **클래스 정의 시** 고정,
인스턴스 속성(version_id / version_str / role / params) 은
`ModelRegistry` 가 ACTIVE 버전 스냅샷을 읽어 주입한다.
한 `BaseDetectionModel` 서브클래스에 대해 DB 에 N 개 ACTIVE 버전이 있으면
Registry 는 **각 버전마다 별도 인스턴스**를 생성한다 (PRIMARY 1 + SHADOW/CHALLENGER N).
"""
# --- 클래스 메타 (서브클래스가 override) ---
model_id: str = ''
depends_on: list[str] = []
# V034 스키마 컬럼 길이 상한 — 운영자 실수·장기 실행에서 silent 한 persist 실패를
# 방지하기 위해 클래스 정의 시점에 선제 검증한다.
_MODEL_ID_MAXLEN = 64
def __init__(
self,
version_id: int,
version_str: str,
role: str,
params: dict,
) -> None:
if role not in ALLOWED_ROLES:
raise ValueError(f'invalid role: {role!r} (expected {ALLOWED_ROLES})')
if not self.model_id:
raise ValueError(
f'{type(self).__name__}.model_id is empty — override as class attribute'
)
if len(self.model_id) > self._MODEL_ID_MAXLEN:
raise ValueError(
f'{type(self).__name__}.model_id too long '
f'({len(self.model_id)} > {self._MODEL_ID_MAXLEN}): {self.model_id!r}'
)
self.version_id = version_id
self.version_str = version_str
self.role = role
self.params: dict = dict(params) if params else {}
# --- 서브클래스 구현 포인트 ---
@abstractmethod
def run(self, ctx: ModelContext) -> ModelResult:
"""한 사이클에 대해 모델을 실행.
반환값의 `outputs_per_input` 은 입력 단위 비교가 가능하도록
**같은 input_ref 스키마를 같은 model_id 내에서 유지**해야 한다.
(PRIMARY 와 SHADOW 의 input_ref 가 일치해야 diff JOIN 이 가능.)
"""
raise NotImplementedError
# --- 편의 ---
def label(self) -> str:
return f'{self.model_id}@{self.role}[{self.version_str}]'
def __repr__(self) -> str: # pragma: no cover
return f'<{type(self).__name__} {self.label()} version_id={self.version_id}>'
def make_input_ref(mmsi: str, analyzed_at: Optional[datetime] = None, **extra) -> dict:
"""관용 input_ref 생성기. PRIMARY/SHADOW 가 같은 포맷을 쓰도록 강제하는 도우미."""
ref: dict[str, Any] = {'mmsi': str(mmsi)}
if analyzed_at is not None:
ref['analyzed_at'] = analyzed_at.isoformat() if isinstance(analyzed_at, datetime) else analyzed_at
for k, v in extra.items():
ref[k] = v
return ref