kcg-ai-monitoring/prediction/tests/test_models_core.py
htlee 2ceeb966d8 feat(prediction): Phase 1-2 detection model registry + snapshot 관찰 보강
- models_core 패키지 신설 — BaseDetectionModel / ModelContext / ModelResult
  + Registry (ACTIVE 버전 인스턴스화, DAG 순환 검출, topo 플랜)
  + DAGExecutor (PRIMARY→ctx.shared 주입, SHADOW persist-only 오염 차단)
  + params_loader (5분 TTL 캐시), feature_flag (PREDICTION_USE_MODEL_REGISTRY)
- V034 스키마 정합성 사전 검증 + silent error 3건 선제 방어
  · model_id VARCHAR(64) 초과 시 __init__ 에서 즉시 ValueError
  · metric_key VARCHAR(64) 초과는 경고 후 drop (다른 metric 는 저장)
  · persist 가 ctx.conn 재사용 (pool maxconn=5 고갈 방지)
- scheduler.py — 10단계 feature flag 분기 (기본 0, 구 경로 보존)
- partition_manager — detection_model_run_outputs 월별 파티션 자동 생성/DROP
- 유닛테스트 15 케이스 전체 통과 (DAG 순환, SHADOW 오염 차단, 길이 검증)
- snapshot 스크립트 (hourly/diagnostic) 개선
  · spoofing gt0/gt03/gt05/gt07 세분화 — 'silent fault' vs 'no signal' 구분
  · V030 gear_identity_collisions 원시 섹션 (CRITICAL 51건 OPEN 포착)
  · V034 detection_model_* 모니터링 섹션 (Phase 2 대비)
  · stage timing 집계 + stats_hourly vs events category drift 감시

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 08:07:29 +09:00

432 lines
16 KiB
Python

"""models_core 기반 인프라 (Phase 1-2) 유닛테스트.
DB·서버 없이 순수 파이썬 레벨에서 다음을 검증:
- params_loader 캐시 TTL 동작
- ModelRegistry discover + 버전별 인스턴스화
- DAG topo 정렬 + 순환 검출
- DAGExecutor 의 오염 차단 불변식 (SHADOW 결과는 ctx.shared 에 들어가지 않음)
- PRIMARY 실패 시 후행 모델 skip
- SHADOW 전용(PRIMARY 없음) 모델 스킵 경고
- run_stage 와의 통합 — 예외가 한 버전에 격리되는지
실제 DB 상호작용은 Phase 1-3 testcontainers 기반에서 수행 (후속 커밋).
"""
from __future__ import annotations
import sys
import types
import unittest
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional
# pydantic_settings stub (same idiom as the existing test_time_bucket tests).
# NOTE(review): presumably registered so that project modules importing
# pydantic_settings can be loaded without the real package — confirm.
_stub = types.ModuleType('pydantic_settings')


class _StubBaseSettings:
    """Minimal stand-in for ``pydantic_settings.BaseSettings``.

    Every UPPERCASE class attribute is treated as a settings field: its value
    is copied onto the instance, and a keyword argument with the same name
    overrides the class-level default. Other attributes and unknown keyword
    arguments are ignored.
    """

    def __init__(self, **kwargs):
        # Walk the MRO base-first so that fields declared on a parent settings
        # class also accept keyword overrides, and the most-derived default
        # wins when a field is redeclared in a subclass.
        for klass in reversed(type(self).__mro__):
            for name, value in vars(klass).items():
                if name.isupper():
                    setattr(self, name, kwargs.get(name, value))


_stub.BaseSettings = _StubBaseSettings
# setdefault leaves an already-registered pydantic_settings module untouched.
sys.modules.setdefault('pydantic_settings', _stub)
from models_core import base as mc_base
from models_core.base import (
BaseDetectionModel,
ModelContext,
ModelResult,
ROLE_PRIMARY,
ROLE_SHADOW,
make_input_ref,
)
from models_core import params_loader
from models_core.executor import DAGExecutor
from models_core.registry import DAGCycleError, ModelRegistry
# ======================================================================
# Fixture 클래스들
# ======================================================================
@dataclass
class _Call:
    """Record of one ``run()`` invocation made by a fixture model."""

    # model_id of the instance whose run() was invoked
    model_id: str
    # role of that instance at call time
    role: str
    # version_id of that instance
    version_id: int
def _make_model_class(mid: str, depends: Optional[list] = None, *, raise_for_role: Optional[str] = None):
    """Create a fresh ``BaseDetectionModel`` subclass to use as a test fixture.

    Args:
        mid: Value assigned to the generated class's ``model_id``.
        depends: Model ids copied into ``depends_on``; ``None`` or an empty
            list means no dependencies.
        raise_for_role: When truthy, ``run()`` raises ``RuntimeError`` for
            instances whose ``role`` equals this value.

    Returns:
        The new class, whose ``__name__`` is ``_M_<mid>`` with every ``.`` in
        ``mid`` replaced by ``_``.
    """
    dependency_ids = list(depends) if depends else []

    class _M(BaseDetectionModel):
        model_id = mid
        depends_on = dependency_ids

        def run(self, ctx: ModelContext) -> ModelResult:
            if raise_for_role and self.role == raise_for_role:
                raise RuntimeError(f'intentional failure in {mid}@{self.role}')

            # Record the invocation in ctx.extras (only reached when no
            # failure was requested for this role).
            call_log = ctx.extras.setdefault('_calls', [])
            call_log.append(_Call(self.model_id, self.role, self.version_id))

            # PRIMARY and SHADOW emit the same input_ref; only the score differs.
            score = 1.0 if self.role == ROLE_PRIMARY else 1.5
            outputs = [(make_input_ref('412000001'), {'score': score})]
            sentinel_metrics = {'sentinel': float(self.version_id)}
            return ModelResult(
                model_id=self.model_id,
                version_id=self.version_id,
                version_str=self.version_str,
                role=self.role,
                outputs_per_input=outputs,
                metrics=sentinel_metrics,
            )

    _M.__name__ = f'_M_{mid.replace(".", "_")}'
    return _M
def _version_row(id_, model_id, role, version='1.0.0', params=None):
    """Build a ``params_loader.VersionRow`` with test-friendly defaults.

    A falsy ``params`` (including ``None``) is replaced by a new empty dict.
    """
    effective_params = params if params else {}
    return params_loader.VersionRow(
        id=id_,
        model_id=model_id,
        role=role,
        version=version,
        params=effective_params,
    )
# ======================================================================
# params_loader 캐시
# ======================================================================
class ParamsCacheTest(unittest.TestCase):
    """Caching behaviour of ``params_loader.load_active_versions``."""

    def setUp(self):
        # Start every test from an empty cache.
        params_loader.invalidate_cache()

    def _swap_fetch(self, replacement):
        """Replace ``params_loader._fetch_active_versions`` for this test only.

        Cleanups run in LIFO order, so the original fetcher is restored first
        and the cache is invalidated afterwards.
        """
        original = params_loader._fetch_active_versions
        params_loader._fetch_active_versions = replacement
        self.addCleanup(params_loader.invalidate_cache)
        self.addCleanup(setattr, params_loader, '_fetch_active_versions', original)

    def test_invalidate_forces_reload(self):
        """A repeated load hits the cache; ``invalidate_cache()`` forces a new fetch."""
        fetch_count = 0

        def counting_fetch(conn):
            nonlocal fetch_count
            fetch_count += 1
            return [_version_row(1, 'a', ROLE_PRIMARY)]

        self._swap_fetch(counting_fetch)

        first = params_loader.load_active_versions(conn=None)
        second = params_loader.load_active_versions(conn=None)
        self.assertEqual(fetch_count, 1)  # second call was a cache HIT
        self.assertEqual(len(first), 1)
        self.assertEqual(len(second), 1)

        params_loader.invalidate_cache()
        params_loader.load_active_versions(conn=None)
        self.assertEqual(fetch_count, 2)

    def test_force_reload_bypasses_ttl(self):
        """``force_reload=True`` fetches again even though the cache is populated."""
        fetch_count = 0

        def counting_fetch(conn):
            nonlocal fetch_count
            fetch_count += 1
            return []

        self._swap_fetch(counting_fetch)

        params_loader.load_active_versions(conn=None)
        params_loader.load_active_versions(conn=None, force_reload=True)
        self.assertEqual(fetch_count, 2)
# ======================================================================
# Registry topo 정렬 + DAG 검증
# ======================================================================
class RegistryTopoTest(unittest.TestCase):
    """Plan building in ``ModelRegistry``: topological order, cycles, role grouping."""

    def _registry_with(self, *model_ids_with_deps):
        """Return a registry holding one fixture class per ``(model_id, [dep_ids])`` pair."""
        registry = ModelRegistry()
        for model_id, dep_ids in model_ids_with_deps:
            fixture_cls = _make_model_class(model_id, dep_ids)
            registry.register_class(fixture_cls)
        return registry

    def test_topo_order_respects_dependencies(self):
        """A linear chain a <- b <- c is planned in dependency order."""
        registry = self._registry_with(('a', []), ('b', ['a']), ('c', ['b']))
        active_rows = [
            _version_row(version_id, model_id, ROLE_PRIMARY)
            for version_id, model_id in ((10, 'a'), (11, 'b'), (12, 'c'))
        ]
        plan = registry.build_plan_from_rows(active_rows)
        self.assertEqual(plan.topo_order, ['a', 'b', 'c'])

    def test_cycle_detection(self):
        """Mutually dependent models make plan building raise ``DAGCycleError``."""
        registry = self._registry_with(('a', ['b']), ('b', ['a']))
        active_rows = [
            _version_row(1, 'a', ROLE_PRIMARY),
            _version_row(2, 'b', ROLE_PRIMARY),
        ]
        with self.assertRaises(DAGCycleError):
            registry.build_plan_from_rows(active_rows)

    def test_shadow_version_attaches_to_primary_model(self):
        """One PRIMARY and two SHADOW rows of a model land in their respective plan slots."""
        registry = self._registry_with(('a', []))
        active_rows = [
            _version_row(1, 'a', ROLE_PRIMARY, version='1.0.0'),
            _version_row(2, 'a', ROLE_SHADOW, version='1.1.0-shadow'),
            _version_row(3, 'a', ROLE_SHADOW, version='1.2.0-shadow'),
        ]
        plan = registry.build_plan_from_rows(active_rows)
        self.assertIn('a', plan.primaries)
        self.assertEqual(plan.primaries['a'].version_id, 1)
        self.assertEqual(len(plan.shadows['a']), 2)

    def test_unknown_model_id_skipped(self):
        """A version row whose model_id has no registered class is left out of the plan."""
        empty_registry = ModelRegistry()  # no classes registered
        ghost_rows = [_version_row(1, 'ghost', ROLE_PRIMARY)]
        plan = empty_registry.build_plan_from_rows(ghost_rows)
        self.assertNotIn('ghost', plan.primaries)

    def test_class_depends_on_added_to_edges(self):
        """A class-level ``depends_on`` entry shows up in ``plan.edges``."""
        registry = self._registry_with(('base', []), ('child', ['base']))
        active_rows = [
            _version_row(1, 'base', ROLE_PRIMARY),
            _version_row(2, 'child', ROLE_PRIMARY),
        ]
        plan = registry.build_plan_from_rows(active_rows)
        self.assertIn('base', plan.edges['child'])
# ======================================================================
# DAGExecutor 불변식
# ======================================================================
class DAGExecutorTest(unittest.TestCase):
    """Invariants of ``DAGExecutor``.

    PRIMARY results are injected into ``ctx.shared``; SHADOW results are
    persisted but must never appear there.
    """

    def _collect_persisted(self):
        """Build two recording persist hooks.

        Returns:
            ``(persisted_rows, persisted_metrics, p_rows, p_metrics)`` — each
            list receives every ``ModelResult`` handed to the matching hook.
        """
        persisted_rows: list[ModelResult] = []
        persisted_metrics: list[ModelResult] = []

        def p_rows(result: ModelResult, cycle_started_at, *, conn=None):
            persisted_rows.append(result)

        def p_metrics(result: ModelResult, cycle_started_at, *, conn=None):
            persisted_metrics.append(result)

        return persisted_rows, persisted_metrics, p_rows, p_metrics

    def _ctx(self):
        """Return a new ``ModelContext`` with a fixed UTC cycle start."""
        return ModelContext(cycle_started_at=datetime(2026, 4, 20, 0, 0, tzinfo=timezone.utc))

    def _run_plan(self, plan):
        """Execute ``plan`` with recording persist hooks and a fresh context.

        Returns:
            ``(ctx, summary, persisted_rows, persisted_metrics)`` where
            ``summary`` is whatever ``DAGExecutor.run`` returned.
        """
        persisted_rows, persisted_metrics, p_rows, p_metrics = self._collect_persisted()
        ctx = self._ctx()
        summary = DAGExecutor(plan, persist_fn=p_rows, persist_metrics_fn=p_metrics).run(ctx)
        return ctx, summary, persisted_rows, persisted_metrics

    def test_primary_result_injected_into_shared(self):
        """Every successful PRIMARY result is published in ``ctx.shared``."""
        reg = ModelRegistry()
        reg.register_class(_make_model_class('a'))
        reg.register_class(_make_model_class('b', ['a']))
        rows = [
            _version_row(1, 'a', ROLE_PRIMARY),
            _version_row(2, 'b', ROLE_PRIMARY),
        ]
        plan = reg.build_plan_from_rows(rows)
        ctx, *_ = self._run_plan(plan)
        self.assertIn('a', ctx.shared)
        self.assertIn('b', ctx.shared)
        self.assertEqual(ctx.shared['a'].role, ROLE_PRIMARY)

    def test_shadow_result_not_injected_into_shared(self):
        """Key invariant: a SHADOW result in ``ctx.shared`` would be contamination."""
        reg = ModelRegistry()
        reg.register_class(_make_model_class('m'))
        rows = [
            _version_row(1, 'm', ROLE_PRIMARY),
            _version_row(2, 'm', ROLE_SHADOW),
        ]
        plan = reg.build_plan_from_rows(rows)
        ctx, _summary, persisted_rows, _metrics = self._run_plan(plan)
        # ctx.shared holds the PRIMARY result only.
        self.assertEqual(ctx.shared['m'].role, ROLE_PRIMARY)
        self.assertEqual(ctx.shared['m'].version_id, 1)
        # Both roles are persisted.
        persisted_roles = {r.role for r in persisted_rows}
        self.assertIn(ROLE_PRIMARY, persisted_roles)
        self.assertIn(ROLE_SHADOW, persisted_roles)

    def test_downstream_sees_primary_only_even_when_shadow_differs(self):
        """A downstream PRIMARY consumes the upstream PRIMARY result even if SHADOW differs."""

        class M_A(BaseDetectionModel):
            model_id = 'a'
            depends_on = []

            def run(self, ctx):
                # PRIMARY emits 100, SHADOW emits 999 so the two are distinguishable.
                val = 100 if self.role == ROLE_PRIMARY else 999
                return ModelResult(
                    model_id='a', version_id=self.version_id,
                    version_str=self.version_str, role=self.role,
                    outputs_per_input=[(make_input_ref('x'), {'v': val})],
                    metrics={},
                )

        observed = {'downstream_seen_value': None}

        class M_B(BaseDetectionModel):
            model_id = 'b'
            depends_on = ['a']

            def run(self, ctx):
                # Capture whatever upstream value ctx.shared exposes for 'a'.
                upstream = ctx.shared.get('a')
                observed['downstream_seen_value'] = (
                    upstream.outputs_per_input[0][1]['v'] if upstream else None
                )
                return ModelResult(
                    model_id='b', version_id=self.version_id,
                    version_str=self.version_str, role=self.role,
                    outputs_per_input=[(make_input_ref('x'), {'echo': observed['downstream_seen_value']})],
                )

        reg = ModelRegistry()
        reg.register_class(M_A)
        reg.register_class(M_B)
        rows = [
            _version_row(1, 'a', ROLE_PRIMARY),
            _version_row(2, 'a', ROLE_SHADOW),
            _version_row(3, 'b', ROLE_PRIMARY),
        ]
        plan = reg.build_plan_from_rows(rows)
        self._run_plan(plan)
        # Downstream saw the PRIMARY value (100), not the SHADOW value (999).
        self.assertEqual(observed['downstream_seen_value'], 100)

    def test_primary_failure_skips_downstream(self):
        """A failing PRIMARY publishes nothing and its dependents are skipped."""
        reg = ModelRegistry()
        reg.register_class(_make_model_class('a', raise_for_role=ROLE_PRIMARY))
        reg.register_class(_make_model_class('b', ['a']))
        rows = [
            _version_row(1, 'a', ROLE_PRIMARY),
            _version_row(2, 'b', ROLE_PRIMARY),
        ]
        plan = reg.build_plan_from_rows(rows)
        ctx, summary, *_ = self._run_plan(plan)
        self.assertNotIn('a', ctx.shared)
        self.assertNotIn('b', ctx.shared)
        self.assertGreaterEqual(summary['failed'], 1)
        self.assertGreaterEqual(summary['skipped_missing_deps'], 1)

    def test_shadow_failure_does_not_affect_primary_or_persist(self):
        """A SHADOW failure leaves the PRIMARY run and its persistence intact."""
        # Per the original author's note, registering a second class under the
        # same model_id makes the Registry raise ValueError, so one class that
        # fails only in the SHADOW role serves both versions.
        reg = ModelRegistry()
        reg.register_class(_make_model_class('m', raise_for_role=ROLE_SHADOW))
        rows = [
            _version_row(1, 'm', ROLE_PRIMARY),
            _version_row(2, 'm', ROLE_SHADOW),
        ]
        plan = reg.build_plan_from_rows(rows)
        _ctx, summary, persisted_rows, _metrics = self._run_plan(plan)
        self.assertEqual(summary['executed'], 1)  # PRIMARY succeeded
        self.assertEqual(summary['shadow_failed'], 1)
        self.assertEqual(summary['shadow_ran'], 0)
        # Only the PRIMARY result is persisted.
        self.assertEqual([r.role for r in persisted_rows], [ROLE_PRIMARY])

    def test_shadow_only_without_primary_is_skipped(self):
        """A model with only a SHADOW version (no PRIMARY) is not executed."""
        reg = ModelRegistry()
        reg.register_class(_make_model_class('orphan'))
        rows = [_version_row(1, 'orphan', ROLE_SHADOW)]
        plan = reg.build_plan_from_rows(rows)
        ctx, summary, *_ = self._run_plan(plan)
        self.assertEqual(summary['executed'], 0)
        self.assertNotIn('orphan', ctx.shared)
class SilentErrorGuardTest(unittest.TestCase):
    """Guards against silent failures when values exceed V034 schema column sizes."""

    def test_model_id_too_long_rejected_at_instantiation(self):
        """A 65-character ``model_id`` makes instantiation raise ``ValueError``."""

        class _TooLong(BaseDetectionModel):
            model_id = 'x' * 65  # exceeds VARCHAR(64)

            def run(self, ctx):  # pragma: no cover
                return ModelResult(
                    model_id=self.model_id, version_id=self.version_id,
                    version_str=self.version_str, role=self.role,
                )

        with self.assertRaises(ValueError):
            _TooLong(version_id=1, version_str='1', role=ROLE_PRIMARY, params={})

    def test_long_metric_key_dropped_with_warning(self):
        """``_persist_metrics`` must not store a ``metric_key`` longer than 64 characters.

        Only the dropped/kept keys are checked; the warning itself is not
        asserted here.
        """
        from models_core import executor as ex

        # No fake connection or cursor is needed: _execute_insert itself is
        # replaced by a function that records the rows it is given.
        captured_rows: list = []

        def fake_exec(sql, rows, *, conn=None):
            captured_rows.extend(rows)

        real_execute_insert = ex._execute_insert
        ex._execute_insert = fake_exec
        self.addCleanup(setattr, ex, '_execute_insert', real_execute_insert)

        result = ModelResult(
            model_id='m', version_id=1, version_str='1', role=ROLE_PRIMARY,
            outputs_per_input=[],
            metrics={
                'ok_key': 1.0,
                'x' * 65: 2.0,  # over the limit
            },
            duration_ms=10,
        )
        # NOTE(review): naive datetime here, while DAGExecutorTest uses a
        # UTC-aware cycle start — confirm this is intentional.
        ex._persist_metrics(result, cycle_started_at=datetime(2026, 4, 20))

        stored_keys = [row[3] for row in captured_rows]  # 4th column is metric_key
        self.assertIn('ok_key', stored_keys)
        self.assertNotIn('x' * 65, stored_keys)
        # The default cycle_duration_ms / output_count metrics are still written.
        self.assertIn('cycle_duration_ms', stored_keys)
        self.assertIn('output_count', stored_keys)
# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()