"""DAGExecutor — ExecutionPlan 을 실제로 돌리고 DB 에 결과/메트릭을 기록한다. 불변식 (테스트로도 검증): 1. PRIMARY 실행 결과만 `ctx.shared[model_id]` 에 주입 (후행 모델의 입력 소스). 2. SHADOW/CHALLENGER 결과는 `detection_model_run_outputs` 에 저장만, shared 에 **절대 주입 금지**. 3. PRIMARY 가 실패하면 후행 모델 실행 skip (upstream 결과 없음). SHADOW/CHALLENGER 실패는 그 버전만 skip, 다른 버전·후행 모델에 영향 없음. DB persist: - detection_model_run_outputs (PARTITION BY cycle_started_at): execute_values 배치 INSERT - detection_model_metrics: 집계 메트릭 참고: docs/prediction-analysis.md §7, plans/vast-tinkering-knuth.md Phase 1-2 """ from __future__ import annotations import logging import time from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Optional from pipeline.stage_runner import run_stage from .base import BaseDetectionModel, ModelContext, ModelResult, ROLE_PRIMARY from .feature_flag import concurrent_shadows from .registry import ExecutionPlan logger = logging.getLogger(__name__) # V034 스키마 VARCHAR(64) — 초과하면 persist 가 silent 하게 실패하므로 선제 절단·경고 _METRIC_KEY_MAXLEN = 64 class DAGExecutor: """ExecutionPlan 을 실행하고 DB persist 를 담당. persist 는 ctx.conn 을 재사용한다 (pool 중복 획득 방지). ctx.conn 이 None 이면 기본 persist 함수들이 자체적으로 get_conn() 호출. """ def __init__( self, plan: ExecutionPlan, *, persist_fn=None, persist_metrics_fn=None, ) -> None: self.plan = plan # 테스트에서 DB 없이 돌리기 위해 persist 훅을 주입 가능하게 만든다. self._persist_fn = persist_fn or _persist_run_outputs self._persist_metrics_fn = persist_metrics_fn or _persist_metrics self._ctx_conn = None # run() 진입 시 셋업 def run(self, ctx: ModelContext) -> dict: """전체 Plan 실행. Returns: {'executed': int, 'failed': int, 'shadow_ran': int, 'shadow_failed': int} """ # ctx.conn 이 있으면 persist 도 이 conn 을 재사용하도록 보관한다. # (maxconn=5 pool 고갈 방지 — persist 마다 별도 get_conn() 획득 금지) self._ctx_conn = getattr(ctx, 'conn', None) summary = { 'executed': 0, 'failed': 0, 'skipped_missing_deps': 0, 'shadow_ran': 0, 'shadow_failed': 0, } for model_id in self.plan.topo_order: primary = self.plan.primaries.get(model_id) shadows = list(self.plan.shadows.get(model_id, [])) if primary is None: # PRIMARY 없이 SHADOW 만 있는 모델은 실행 불가 (비교 기준이 없음) if shadows: logger.warning( 'model %s has %d SHADOW/CHALLENGER but no PRIMARY — skipping', model_id, len(shadows), ) continue # upstream PRIMARY 결과가 모두 있는지 확인 missing = [ dep for dep in self.plan.edges.get(model_id, ()) if dep not in ctx.shared ] if missing: summary['skipped_missing_deps'] += 1 logger.warning( 'skip %s — upstream PRIMARY missing: %s', primary.label(), missing, ) # SHADOW 도 같은 이유로 스킵 (정당한 비교 불가) continue primary_result = self._run_single(primary, ctx) if primary_result is None: summary['failed'] += 1 # SHADOW 는 같은 입력이 있어야 비교 의미가 있으므로 이 사이클에선 스킵 continue summary['executed'] += 1 ctx.shared[model_id] = primary_result self._persist(primary_result, ctx.cycle_started_at) # SHADOW/CHALLENGER 는 shared 주입 **금지** — 결과 persist 만 if shadows: ran, failed = self._run_shadows(shadows, ctx) summary['shadow_ran'] += ran summary['shadow_failed'] += failed logger.info( 'DAGExecutor done: executed=%d failed=%d skip_deps=%d shadow_ran=%d shadow_failed=%d', summary['executed'], summary['failed'], summary['skipped_missing_deps'], summary['shadow_ran'], summary['shadow_failed'], ) return summary # ------------------------------------------------------------------ def _run_single(self, model: BaseDetectionModel, ctx: ModelContext) -> Optional[ModelResult]: """run_stage 로 감싸서 실패 격리 + 지속시간 계측.""" t0 = time.time() result = run_stage(model.label(), model.run, ctx, required=False) if result is None: return None # duration_ms 가 비어있으면 여기서 채움 if not result.duration_ms: result.duration_ms = int((time.time() - t0) * 1000) return result def _run_shadows( self, shadows: list[BaseDetectionModel], ctx: ModelContext, ) -> tuple[int, int]: ran = 0 failed = 0 if concurrent_shadows() and len(shadows) > 1: with ThreadPoolExecutor(max_workers=min(4, len(shadows))) as pool: futures = {pool.submit(self._run_single, s, ctx): s for s in shadows} for fut in as_completed(futures): s = futures[fut] try: r = fut.result() except Exception: logger.exception('shadow %s raised', s.label()) r = None if r is None: failed += 1 continue ran += 1 self._persist(r, ctx.cycle_started_at) else: for s in shadows: r = self._run_single(s, ctx) if r is None: failed += 1 continue ran += 1 self._persist(r, ctx.cycle_started_at) return ran, failed def _persist(self, result: ModelResult, cycle_started_at) -> None: conn = self._ctx_conn try: self._persist_fn(result, cycle_started_at, conn=conn) except Exception: logger.exception( 'failed to persist run_outputs for %s', result.model_id, ) try: self._persist_metrics_fn(result, cycle_started_at, conn=conn) except Exception: logger.exception( 'failed to persist metrics for %s', result.model_id, ) # ---------------------------------------------------------------------- # 기본 persist 구현 — kcgdb 연결을 얻어서 직접 INSERT # ---------------------------------------------------------------------- _INSERT_RUN_OUTPUTS = """ INSERT INTO kcg.detection_model_run_outputs ( cycle_started_at, model_id, version_id, role, input_ref, outputs, cycle_duration_ms ) VALUES %s """ _INSERT_METRICS = """ INSERT INTO kcg.detection_model_metrics ( model_id, version_id, role, metric_key, metric_value, cycle_started_at ) VALUES %s """ def _persist_run_outputs(result: ModelResult, cycle_started_at, *, conn=None) -> None: """detection_model_run_outputs 배치 INSERT. conn 이 전달되면 **재사용** (pool 중복 획득 방지, 커밋 책임은 호출자). None 이면 자체적으로 kcgdb.get_conn() 으로 커넥션을 얻고 직접 커밋. """ if not result.outputs_per_input: return from psycopg2.extras import Json, execute_values rows = [ ( cycle_started_at, result.model_id, result.version_id, result.role, Json(input_ref or {}), Json(output or {}), result.duration_ms, ) for input_ref, output in result.outputs_per_input ] _execute_insert(_INSERT_RUN_OUTPUTS, rows, conn=conn) def _execute_insert(sql: str, rows: list, *, conn=None) -> None: """execute_values 공통 — conn 재사용 시 commit 은 호출자 책임.""" if not rows: return from psycopg2.extras import execute_values if conn is not None: cur = conn.cursor() try: execute_values(cur, sql, rows, page_size=200) conn.commit() except Exception: conn.rollback() raise finally: cur.close() return from db import kcgdb with kcgdb.get_conn() as fresh_conn: cur = fresh_conn.cursor() try: execute_values(cur, sql, rows, page_size=200) fresh_conn.commit() except Exception: fresh_conn.rollback() raise finally: cur.close() def _persist_metrics(result: ModelResult, cycle_started_at, *, conn=None) -> None: """detection_model_metrics 배치 INSERT. cycle_duration_ms 기본 포함. conn 이 전달되면 재사용, None 이면 자체 get_conn(). metric_key VARCHAR(64) 초과는 경고 후 드롭 (silent 실패 방지). """ metrics = dict(result.metrics or {}) metrics.setdefault('cycle_duration_ms', float(result.duration_ms)) metrics.setdefault('output_count', float(len(result.outputs_per_input))) rows = [] for key, val in metrics.items(): if val is None: continue if len(key) > _METRIC_KEY_MAXLEN: logger.warning( 'metric_key %r exceeds VARCHAR(%d) — dropping (model=%s version=%s)', key, _METRIC_KEY_MAXLEN, result.model_id, result.version_id, ) continue rows.append(( result.model_id, result.version_id, result.role, key, float(val), cycle_started_at, )) _execute_insert(_INSERT_METRICS, rows, conn=conn) __all__ = ['DAGExecutor']