kcg-ai-monitoring/prediction/models_core/executor.py
htlee 2ceeb966d8 feat(prediction): Phase 1-2 detection model registry + snapshot 관찰 보강
- models_core 패키지 신설 — BaseDetectionModel / ModelContext / ModelResult
  + Registry (ACTIVE 버전 인스턴스화, DAG 순환 검출, topo 플랜)
  + DAGExecutor (PRIMARY→ctx.shared 주입, SHADOW persist-only 오염 차단)
  + params_loader (5분 TTL 캐시), feature_flag (PREDICTION_USE_MODEL_REGISTRY)
- V034 스키마 정합성 사전 검증 + silent error 3건 선제 방어
  · model_id VARCHAR(64) 초과 시 __init__ 에서 즉시 ValueError
  · metric_key VARCHAR(64) 초과는 경고 후 drop (다른 metric 는 저장)
  · persist 가 ctx.conn 재사용 (pool maxconn=5 고갈 방지)
- scheduler.py — 10단계 feature flag 분기 (기본 0, 구 경로 보존)
- partition_manager — detection_model_run_outputs 월별 파티션 자동 생성/DROP
- 유닛테스트 15 케이스 전체 통과 (DAG 순환, SHADOW 오염 차단, 길이 검증)
- snapshot 스크립트 (hourly/diagnostic) 개선
  · spoofing gt0/gt03/gt05/gt07 세분화 — 'silent fault' vs 'no signal' 구분
  · V030 gear_identity_collisions 원시 섹션 (CRITICAL 51건 OPEN 포착)
  · V034 detection_model_* 모니터링 섹션 (Phase 2 대비)
  · stage timing 집계 + stats_hourly vs events category drift 감시

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 08:07:29 +09:00

288 lines
10 KiB
Python

"""DAGExecutor — ExecutionPlan 을 실제로 돌리고 DB 에 결과/메트릭을 기록한다.
불변식 (테스트로도 검증):
1. PRIMARY 실행 결과만 `ctx.shared[model_id]` 에 주입 (후행 모델의 입력 소스).
2. SHADOW/CHALLENGER 결과는 `detection_model_run_outputs` 에 저장만, shared 에 **절대 주입 금지**.
3. PRIMARY 가 실패하면 후행 모델 실행 skip (upstream 결과 없음).
SHADOW/CHALLENGER 실패는 그 버전만 skip, 다른 버전·후행 모델에 영향 없음.
DB persist:
- detection_model_run_outputs (PARTITION BY cycle_started_at): execute_values 배치 INSERT
- detection_model_metrics: 집계 메트릭
참고: docs/prediction-analysis.md §7, plans/vast-tinkering-knuth.md Phase 1-2
"""
from __future__ import annotations
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional
from pipeline.stage_runner import run_stage
from .base import BaseDetectionModel, ModelContext, ModelResult, ROLE_PRIMARY
from .feature_flag import concurrent_shadows
from .registry import ExecutionPlan
logger = logging.getLogger(__name__)
# V034 스키마 VARCHAR(64) — 초과하면 persist 가 silent 하게 실패하므로 선제 절단·경고
_METRIC_KEY_MAXLEN = 64
class DAGExecutor:
"""ExecutionPlan 을 실행하고 DB persist 를 담당.
persist 는 ctx.conn 을 재사용한다 (pool 중복 획득 방지).
ctx.conn 이 None 이면 기본 persist 함수들이 자체적으로 get_conn() 호출.
"""
def __init__(
self,
plan: ExecutionPlan,
*,
persist_fn=None,
persist_metrics_fn=None,
) -> None:
self.plan = plan
# 테스트에서 DB 없이 돌리기 위해 persist 훅을 주입 가능하게 만든다.
self._persist_fn = persist_fn or _persist_run_outputs
self._persist_metrics_fn = persist_metrics_fn or _persist_metrics
self._ctx_conn = None # run() 진입 시 셋업
def run(self, ctx: ModelContext) -> dict:
"""전체 Plan 실행.
Returns:
{'executed': int, 'failed': int, 'shadow_ran': int, 'shadow_failed': int}
"""
# ctx.conn 이 있으면 persist 도 이 conn 을 재사용하도록 보관한다.
# (maxconn=5 pool 고갈 방지 — persist 마다 별도 get_conn() 획득 금지)
self._ctx_conn = getattr(ctx, 'conn', None)
summary = {
'executed': 0,
'failed': 0,
'skipped_missing_deps': 0,
'shadow_ran': 0,
'shadow_failed': 0,
}
for model_id in self.plan.topo_order:
primary = self.plan.primaries.get(model_id)
shadows = list(self.plan.shadows.get(model_id, []))
if primary is None:
# PRIMARY 없이 SHADOW 만 있는 모델은 실행 불가 (비교 기준이 없음)
if shadows:
logger.warning(
'model %s has %d SHADOW/CHALLENGER but no PRIMARY — skipping',
model_id, len(shadows),
)
continue
# upstream PRIMARY 결과가 모두 있는지 확인
missing = [
dep for dep in self.plan.edges.get(model_id, ())
if dep not in ctx.shared
]
if missing:
summary['skipped_missing_deps'] += 1
logger.warning(
'skip %s — upstream PRIMARY missing: %s',
primary.label(), missing,
)
# SHADOW 도 같은 이유로 스킵 (정당한 비교 불가)
continue
primary_result = self._run_single(primary, ctx)
if primary_result is None:
summary['failed'] += 1
# SHADOW 는 같은 입력이 있어야 비교 의미가 있으므로 이 사이클에선 스킵
continue
summary['executed'] += 1
ctx.shared[model_id] = primary_result
self._persist(primary_result, ctx.cycle_started_at)
# SHADOW/CHALLENGER 는 shared 주입 **금지** — 결과 persist 만
if shadows:
ran, failed = self._run_shadows(shadows, ctx)
summary['shadow_ran'] += ran
summary['shadow_failed'] += failed
logger.info(
'DAGExecutor done: executed=%d failed=%d skip_deps=%d shadow_ran=%d shadow_failed=%d',
summary['executed'], summary['failed'],
summary['skipped_missing_deps'],
summary['shadow_ran'], summary['shadow_failed'],
)
return summary
# ------------------------------------------------------------------
def _run_single(self, model: BaseDetectionModel, ctx: ModelContext) -> Optional[ModelResult]:
"""run_stage 로 감싸서 실패 격리 + 지속시간 계측."""
t0 = time.time()
result = run_stage(model.label(), model.run, ctx, required=False)
if result is None:
return None
# duration_ms 가 비어있으면 여기서 채움
if not result.duration_ms:
result.duration_ms = int((time.time() - t0) * 1000)
return result
def _run_shadows(
self,
shadows: list[BaseDetectionModel],
ctx: ModelContext,
) -> tuple[int, int]:
ran = 0
failed = 0
if concurrent_shadows() and len(shadows) > 1:
with ThreadPoolExecutor(max_workers=min(4, len(shadows))) as pool:
futures = {pool.submit(self._run_single, s, ctx): s for s in shadows}
for fut in as_completed(futures):
s = futures[fut]
try:
r = fut.result()
except Exception:
logger.exception('shadow %s raised', s.label())
r = None
if r is None:
failed += 1
continue
ran += 1
self._persist(r, ctx.cycle_started_at)
else:
for s in shadows:
r = self._run_single(s, ctx)
if r is None:
failed += 1
continue
ran += 1
self._persist(r, ctx.cycle_started_at)
return ran, failed
def _persist(self, result: ModelResult, cycle_started_at) -> None:
conn = self._ctx_conn
try:
self._persist_fn(result, cycle_started_at, conn=conn)
except Exception:
logger.exception(
'failed to persist run_outputs for %s', result.model_id,
)
try:
self._persist_metrics_fn(result, cycle_started_at, conn=conn)
except Exception:
logger.exception(
'failed to persist metrics for %s', result.model_id,
)
# ----------------------------------------------------------------------
# 기본 persist 구현 — kcgdb 연결을 얻어서 직접 INSERT
# ----------------------------------------------------------------------
_INSERT_RUN_OUTPUTS = """
INSERT INTO kcg.detection_model_run_outputs (
cycle_started_at, model_id, version_id, role,
input_ref, outputs, cycle_duration_ms
) VALUES %s
"""
_INSERT_METRICS = """
INSERT INTO kcg.detection_model_metrics (
model_id, version_id, role, metric_key, metric_value, cycle_started_at
) VALUES %s
"""
def _persist_run_outputs(result: ModelResult, cycle_started_at, *, conn=None) -> None:
"""detection_model_run_outputs 배치 INSERT.
conn 이 전달되면 **재사용** (pool 중복 획득 방지, 커밋 책임은 호출자).
None 이면 자체적으로 kcgdb.get_conn() 으로 커넥션을 얻고 직접 커밋.
"""
if not result.outputs_per_input:
return
from psycopg2.extras import Json, execute_values
rows = [
(
cycle_started_at,
result.model_id,
result.version_id,
result.role,
Json(input_ref or {}),
Json(output or {}),
result.duration_ms,
)
for input_ref, output in result.outputs_per_input
]
_execute_insert(_INSERT_RUN_OUTPUTS, rows, conn=conn)
def _execute_insert(sql: str, rows: list, *, conn=None) -> None:
"""execute_values 공통 — conn 재사용 시 commit 은 호출자 책임."""
if not rows:
return
from psycopg2.extras import execute_values
if conn is not None:
cur = conn.cursor()
try:
execute_values(cur, sql, rows, page_size=200)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
cur.close()
return
from db import kcgdb
with kcgdb.get_conn() as fresh_conn:
cur = fresh_conn.cursor()
try:
execute_values(cur, sql, rows, page_size=200)
fresh_conn.commit()
except Exception:
fresh_conn.rollback()
raise
finally:
cur.close()
def _persist_metrics(result: ModelResult, cycle_started_at, *, conn=None) -> None:
"""detection_model_metrics 배치 INSERT. cycle_duration_ms 기본 포함.
conn 이 전달되면 재사용, None 이면 자체 get_conn().
metric_key VARCHAR(64) 초과는 경고 후 드롭 (silent 실패 방지).
"""
metrics = dict(result.metrics or {})
metrics.setdefault('cycle_duration_ms', float(result.duration_ms))
metrics.setdefault('output_count', float(len(result.outputs_per_input)))
rows = []
for key, val in metrics.items():
if val is None:
continue
if len(key) > _METRIC_KEY_MAXLEN:
logger.warning(
'metric_key %r exceeds VARCHAR(%d) — dropping (model=%s version=%s)',
key, _METRIC_KEY_MAXLEN, result.model_id, result.version_id,
)
continue
rows.append((
result.model_id,
result.version_id,
result.role,
key,
float(val),
cycle_started_at,
))
_execute_insert(_INSERT_METRICS, rows, conn=conn)
__all__ = ['DAGExecutor']