- models_core 패키지 신설 — BaseDetectionModel / ModelContext / ModelResult + Registry (ACTIVE 버전 인스턴스화, DAG 순환 검출, topo 플랜) + DAGExecutor (PRIMARY→ctx.shared 주입, SHADOW persist-only 오염 차단) + params_loader (5분 TTL 캐시), feature_flag (PREDICTION_USE_MODEL_REGISTRY) - V034 스키마 정합성 사전 검증 + silent error 3건 선제 방어 · model_id VARCHAR(64) 초과 시 __init__ 에서 즉시 ValueError · metric_key VARCHAR(64) 초과는 경고 후 drop (다른 metric 는 저장) · persist 가 ctx.conn 재사용 (pool maxconn=5 고갈 방지) - scheduler.py — 10단계 feature flag 분기 (기본 0, 구 경로 보존) - partition_manager — detection_model_run_outputs 월별 파티션 자동 생성/DROP - 유닛테스트 15 케이스 전체 통과 (DAG 순환, SHADOW 오염 차단, 길이 검증) - snapshot 스크립트 (hourly/diagnostic) 개선 · spoofing gt0/gt03/gt05/gt07 세분화 — 'silent fault' vs 'no signal' 구분 · V030 gear_identity_collisions 원시 섹션 (CRITICAL 51건 OPEN 포착) · V034 detection_model_* 모니터링 섹션 (Phase 2 대비) · stage timing 집계 + stats_hourly vs events category drift 감시 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
177 lines
5.2 KiB
Python
177 lines
5.2 KiB
Python
"""`detection_model_versions.params` JSONB 로더 + 5분 TTL 캐시.
|
||
|
||
- correlation_param_models 패턴의 일반화 — **매 사이클 재로드**를 기본으로,
|
||
다만 한 사이클 내에서 여러 번 조회되는 경우를 위해 TTL 캐시를 둔다.
|
||
- Registry 가 ACTIVE 버전 목록을 조회할 때와 executor 가 개별 버전 params 를
|
||
쓸 때 공통으로 사용.
|
||
|
||
반환 스키마:
|
||
VersionRow = {
|
||
'id': int, # detection_model_versions.id
|
||
'model_id': str,
|
||
'version': str,
|
||
'role': str, # PRIMARY / SHADOW / CHALLENGER
|
||
'params': dict, # JSONB
|
||
}
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import threading
|
||
import time
|
||
from typing import Optional, TypedDict
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class VersionRow(TypedDict):
|
||
id: int
|
||
model_id: str
|
||
version: str
|
||
role: str
|
||
params: dict
|
||
|
||
|
||
_DEFAULT_TTL_SEC = 300 # 5분
|
||
|
||
|
||
class _ParamsCache:
|
||
"""간단 TTL 캐시 (프로세스 로컬).
|
||
|
||
thread-safe: Registry 재구성은 사이클 시작 스레드에서만 일어나지만
|
||
APScheduler 가 동시 job 을 허용할 수 있어 락으로 보호한다.
|
||
"""
|
||
|
||
def __init__(self, ttl_sec: int = _DEFAULT_TTL_SEC) -> None:
|
||
self._ttl = ttl_sec
|
||
self._lock = threading.Lock()
|
||
self._rows: Optional[list[VersionRow]] = None
|
||
self._loaded_at: float = 0.0
|
||
|
||
def get(self, conn, *, force: bool = False) -> list[VersionRow]:
|
||
now = time.time()
|
||
with self._lock:
|
||
stale = (
|
||
self._rows is None
|
||
or force
|
||
or (now - self._loaded_at) > self._ttl
|
||
)
|
||
if stale:
|
||
self._rows = _fetch_active_versions(conn)
|
||
self._loaded_at = now
|
||
logger.info(
|
||
'params cache reloaded: %d ACTIVE versions (ttl=%ds)',
|
||
len(self._rows), self._ttl,
|
||
)
|
||
return list(self._rows or [])
|
||
|
||
def invalidate(self) -> None:
|
||
with self._lock:
|
||
self._rows = None
|
||
self._loaded_at = 0.0
|
||
|
||
|
||
_cache = _ParamsCache()
|
||
|
||
|
||
def load_active_versions(conn, *, force_reload: bool = False) -> list[VersionRow]:
|
||
"""ACTIVE 상태의 모든 model_id × version 을 한 번에 조회.
|
||
|
||
model 단위로 PRIMARY 1 개 + SHADOW/CHALLENGER N 개가 섞여 반환될 수 있다.
|
||
Registry 가 그룹화를 담당.
|
||
|
||
Args:
|
||
conn: psycopg2 connection
|
||
force_reload: True 면 TTL 무시하고 DB 재조회
|
||
|
||
Returns:
|
||
VersionRow 리스트
|
||
"""
|
||
return _cache.get(conn, force=force_reload)
|
||
|
||
|
||
def invalidate_cache() -> None:
|
||
"""운영자 API 가 version 을 promote·archive 한 직후 호출하면
|
||
다음 조회에서 즉시 DB 재로드가 일어난다.
|
||
"""
|
||
_cache.invalidate()
|
||
|
||
|
||
def _fetch_active_versions(conn) -> list[VersionRow]:
|
||
"""SQL — kcg.detection_model_versions WHERE status='ACTIVE'.
|
||
|
||
JSONB 는 psycopg2 기본 설정에서 이미 dict 로 반환되지만, 안전을 위해
|
||
str 인 경우에도 json.loads 로 파싱한다.
|
||
"""
|
||
sql = """
|
||
SELECT v.id,
|
||
v.model_id,
|
||
v.version,
|
||
v.role,
|
||
v.params
|
||
FROM kcg.detection_model_versions v
|
||
JOIN kcg.detection_models m ON m.model_id = v.model_id
|
||
WHERE v.status = 'ACTIVE'
|
||
AND m.is_enabled = TRUE
|
||
ORDER BY v.model_id,
|
||
CASE v.role
|
||
WHEN 'PRIMARY' THEN 0
|
||
WHEN 'CHALLENGER' THEN 1
|
||
WHEN 'SHADOW' THEN 2
|
||
ELSE 3
|
||
END,
|
||
v.id
|
||
"""
|
||
rows: list[VersionRow] = []
|
||
cur = conn.cursor()
|
||
try:
|
||
cur.execute(sql)
|
||
for row in cur.fetchall():
|
||
vid, model_id, version, role, params = row
|
||
if isinstance(params, (bytes, bytearray)):
|
||
params = params.decode('utf-8')
|
||
if isinstance(params, str):
|
||
try:
|
||
params = json.loads(params)
|
||
except json.JSONDecodeError:
|
||
logger.warning(
|
||
'detection_model_versions.id=%s params JSON decode failed — treated as {}',
|
||
vid,
|
||
)
|
||
params = {}
|
||
rows.append(
|
||
VersionRow(
|
||
id=int(vid),
|
||
model_id=str(model_id),
|
||
version=str(version),
|
||
role=str(role),
|
||
params=dict(params or {}),
|
||
)
|
||
)
|
||
return rows
|
||
finally:
|
||
cur.close()
|
||
|
||
|
||
def load_dependencies(conn) -> list[tuple[str, str, str]]:
|
||
"""detection_model_dependencies 전체 엣지 반환.
|
||
|
||
Returns:
|
||
[(model_id, depends_on, input_key), ...]
|
||
"""
|
||
sql = """
|
||
SELECT model_id, depends_on, input_key
|
||
FROM kcg.detection_model_dependencies
|
||
ORDER BY model_id, depends_on, input_key
|
||
"""
|
||
cur = conn.cursor()
|
||
try:
|
||
cur.execute(sql)
|
||
return [
|
||
(str(m), str(d), str(k))
|
||
for m, d, k in cur.fetchall()
|
||
]
|
||
finally:
|
||
cur.close()
|