fix(prediction): stats_aggregator hour 경계 silent 누락 복구

배경: prediction 5분 interval 이지만 한 사이클 평균 13분 소요라
사이클이 hour 경계를 넘나드는 경우(12:55 시작 → 13:08 완료)가 흔하다.
이 때 사이클 내 생성된 이벤트(occurred_at=12:57)가 aggregate_hourly
호출 시점(now_kst=13:08) 기준 현재 hour=13:00 만 UPSERT 되어
12:00 hour 는 이전 사이클 snapshot 으로 stale 유지되는 silent drop.

실제 포착: 2026-04-20 12:50 CRITICAL GEAR_IDENTITY_COLLISION 이벤트가
prediction_stats_hourly.by_category 12:00 slot 에서 누락. Phase 1-2
snapshot 의 C1 drift 섹션이 only_in_events=GEAR_IDENTITY_COLLISION 으로 탐지.

수정:
- _aggregate_one_hour(conn, hour_start, updated_at): 단일 hour UPSERT 추출
- aggregate_hourly(): 호출 시마다 previous→current 순서로 2번 집계
  · UPSERT 라 idempotent
  · 반환값은 현재 hour (하위 호환)
  · target_hour 지정 케이스도 ±1h 재집계

검증:
- 3 유닛테스트 (경계 호출 2건 / 반환값 / 일 경계) 전수 통과
- 운영 수동 재집계로 12:00 slot GEAR_IDENTITY_COLLISION: 1 복구
- snapshot 재실행 시 C1 drift 0 확인

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
htlee 2026-04-20 13:32:44 +09:00
부모 b3b5a90a57
커밋 7ab6baeed2
3개의 변경된 파일180개의 추가작업 그리고 30개의 파일을 삭제

파일 보기

@ -4,6 +4,9 @@
## [Unreleased]
### 수정
- **`stats_aggregator.aggregate_hourly` hour 경계 silent 누락 버그** — prediction 5분 interval 이지만 한 사이클 평균 소요가 13분이라 사이클이 hour 경계를 넘나드는 경우(예: 12:55 시작 → 13:08 완료)가 흔함. 이 사이클 내 생성된 이벤트(occurred_at=12:57)가 stats_aggregate_hourly 호출 시점(now_kst=13:08)을 기준으로 **현재 hour=13:00 만** UPSERT 되어 12:00 hour 는 이전 사이클 snapshot 으로 stale 유지되는 silent drop. 실제 운영에서 `2026-04-20 12:50 CRITICAL GEAR_IDENTITY_COLLISION` 이벤트가 `prediction_stats_hourly.by_category` 에서 누락된 것을 Phase 1-2 의 snapshot `C1 drift` 섹션이 포착. 수정: `aggregate_hourly()` 가 호출될 때마다 **현재 + 이전 hour 를 모두 UPSERT** (UPSERT idempotent). `_aggregate_one_hour()` 로 단일 hour 집계를 분리하고 `aggregate_hourly()` 가 previous→current 순서로 호출. target_hour 지정 케이스에서도 ±1h 재집계. 반환값은 현재 hour 만 (하위 호환). 3 유닛테스트 (경계 호출 / 반환값 / 일 경계) 통과, 운영 수동 재집계로 2026-04-20 12:00 slot 에 GEAR_IDENTITY_COLLISION: 1 복구 + `C1 drift` 0 확인
### 추가
- **Phase 2 PoC 5 모델 마이그레이션 완료 (2 런타임 + 3 카탈로그)** — Phase 2 PoC 계획서의 5 알고리즘 전체를 detection_model 카탈로그로 등록하고 운영자 파라미터 튜닝 지점을 확보. 모드는 두 층위로 분리:
- **런타임 override 완성 (2 모델)**`dark_suspicion` (tier 3, DARK_VESSEL, 19 가중치 + sog/반복/gap/tier 임계) · `gear_violation_g01_g06` (tier 4, GEAR, 6 G-code 점수 + signal cycling + gear drift + 허용 어구 매핑). 알고리즘 함수에 `params: dict | None = None` 인자 추가, `_merge_default_*_params` 깊이 병합으로 override 가 DEFAULT 를 변조하지 않는 불변성 보장. `params=None` 호출은 Phase 2 이전과 완전 동일 결과 (BACK-COMPAT). 운영자가 version 을 ACTIVE 로 승격하면 다음 사이클부터 실제 값 교체

파일 보기

@ -30,34 +30,25 @@ def _jsonb(d: dict) -> str:
return json.dumps(d, ensure_ascii=False)
def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
"""현재 시간 기준 hourly 집계 (KST hour boundary).
def _aggregate_one_hour(conn, hour_start: datetime, updated_at: datetime) -> dict:
"""단일 hour 의 stats_hourly 레코드를 UPSERT.
DB 컬럼은 모두 timestamptz이므로 aware datetime이면 안전 비교됨.
운영자/대시보드 표기와 stat_hour boundary가 일치하도록 KST 기준.
Why separate: prediction 5 사이클이 평균 13 소요라 사이클이 hour 경계를
넘나드는 경우가 흔하다 (: 12:55 시작 13:08 완료). 이때 사이클 안에서 생성된
이벤트(occurred_at=12:57) 마지막으로 stats_aggregate_hourly 돌렸을
now_kst=13:08 이면 **13:00 hour ** UPSERT 되고 12:00 hour 이전 사이클이
남긴 stale snapshot 유지 카테고리·이벤트 누락.
해결: 현재 + 이전 hour 모두 재집계 (UPSERT idempotent).
"""
if target_hour is not None:
# 외부에서 특정 시점을 지정한 경우 KST로 정규화
if target_hour.tzinfo is None:
target_hour = target_hour.replace(tzinfo=_KST)
now_kst = target_hour.astimezone(_KST)
else:
now_kst = datetime.now(_KST)
hour_start = now_kst.replace(minute=0, second=0, microsecond=0)
hour_end = hour_start + timedelta(hours=1)
updated_at = datetime.now(timezone.utc)
with get_conn() as conn:
cur = conn.cursor()
# 탐지 수
cur = conn.cursor()
try:
cur.execute(
f"SELECT COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s",
(hour_start, hour_end)
)
total = cur.fetchone()[0] or 0
# 위험 레벨별
cur.execute(
f"""SELECT risk_level, COUNT(*) FROM {VAR_TABLE}
WHERE analyzed_at >= %s AND analyzed_at < %s AND risk_level IS NOT NULL
@ -66,7 +57,6 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
)
by_risk = dict(cur.fetchall())
# zone별 (vessel_analysis_results.zone_code)
cur.execute(
f"""SELECT zone_code, COUNT(*) FROM {VAR_TABLE}
WHERE analyzed_at >= %s AND analyzed_at < %s AND zone_code IS NOT NULL
@ -75,14 +65,12 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
)
by_zone = dict(cur.fetchall())
# 이벤트 수
cur.execute(
f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s",
(hour_start, hour_end)
)
events = cur.fetchone()[0] or 0
# 카테고리별 이벤트 (prediction_events.category)
cur.execute(
f"""SELECT category, COUNT(*) FROM {EVENTS_TABLE}
WHERE occurred_at >= %s AND occurred_at < %s AND category IS NOT NULL
@ -91,7 +79,6 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
)
by_category = dict(cur.fetchall())
# CRITICAL 이벤트
cur.execute(
f"""SELECT COUNT(*) FROM {EVENTS_TABLE}
WHERE occurred_at >= %s AND occurred_at < %s AND level = 'CRITICAL'""",
@ -115,14 +102,10 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
(hour_start, total, _jsonb(by_category), _jsonb(by_zone),
_jsonb(by_risk), events, critical, updated_at)
)
finally:
cur.close()
# 48시간 이전 정리
cutoff = updated_at - timedelta(hours=48)
cur.execute(f"DELETE FROM {STATS_HOURLY} WHERE stat_hour < %s", (cutoff,))
conn.commit()
result = {
return {
'hour': hour_start.isoformat(),
'detections': total,
'events': events,
@ -130,6 +113,41 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
'categories': len(by_category),
'zones': len(by_zone),
}
def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
"""현재 + 이전 hour 를 함께 재집계 (경계 누락 방지).
반환값은 **현재 hour** 결과 (하위 호환). 이전 hour 갱신은 부수효과.
target_hour 지정된 경우 시점 ± 1h 재집계.
DB 컬럼은 모두 timestamptz 이므로 aware datetime 이면 안전 비교됨.
운영자/대시보드 표기와 stat_hour boundary 일치하도록 KST 기준.
"""
if target_hour is not None:
if target_hour.tzinfo is None:
target_hour = target_hour.replace(tzinfo=_KST)
now_kst = target_hour.astimezone(_KST)
else:
now_kst = datetime.now(_KST)
current_hour = now_kst.replace(minute=0, second=0, microsecond=0)
previous_hour = current_hour - timedelta(hours=1)
updated_at = datetime.now(timezone.utc)
with get_conn() as conn:
# 이전 hour 먼저 재집계 (경계 누락 복구)
_aggregate_one_hour(conn, previous_hour, updated_at)
# 현재 hour — 반환값 대상
result = _aggregate_one_hour(conn, current_hour, updated_at)
# 48시간 이전 정리 (세션 재사용)
cutoff = updated_at - timedelta(hours=48)
cur = conn.cursor()
try:
cur.execute(f"DELETE FROM {STATS_HOURLY} WHERE stat_hour < %s", (cutoff,))
finally:
cur.close()
conn.commit()
logger.info(f'stats_aggregator hourly: {result}')
return result

파일 보기

@ -0,0 +1,129 @@
"""stats_aggregator.aggregate_hourly 가 현재 + 이전 hour 를 모두 UPSERT 하는지 검증.
배경: prediction 사이클이 hour 경계를 넘나들 (사이클 시작 12:55, 완료 13:08),
stats_aggregate_hourly 13:08 기준으로 hour 재집계하면 12:00 hour
이전 사이클 snapshot 으로 stale 사이클 생성된 이벤트(occurred_at=12:57)
누락되는 silent bug.
해결: aggregate_hourly 호출할 때마다 current_hour + previous_hour 동시 UPSERT.
테스트는 DB 없이 _aggregate_one_hour 호출 인자로 시각이 전달되는지만 검증.
"""
from __future__ import annotations
import sys
import types
import unittest
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
if 'pydantic_settings' not in sys.modules:
stub = types.ModuleType('pydantic_settings')
class _S:
def __init__(self, **kw):
for name, value in self.__class__.__dict__.items():
if name.isupper():
setattr(self, name, kw.get(name, value))
stub.BaseSettings = _S
sys.modules['pydantic_settings'] = stub
# psycopg2 + db.kcgdb stub (로컬 테스트 환경)
if 'psycopg2' not in sys.modules:
pg = types.ModuleType('psycopg2')
pg.pool = types.ModuleType('psycopg2.pool')
pg.pool.ThreadedConnectionPool = object
pg.extras = types.ModuleType('psycopg2.extras')
pg.extras.execute_values = lambda *a, **k: None
sys.modules['psycopg2'] = pg
sys.modules['psycopg2.pool'] = pg.pool
sys.modules['psycopg2.extras'] = pg.extras
if 'db' not in sys.modules:
db_pkg = types.ModuleType('db')
db_pkg.__path__ = []
sys.modules['db'] = db_pkg
if 'db.kcgdb' not in sys.modules:
kcgdb_stub = types.ModuleType('db.kcgdb')
kcgdb_stub.get_conn = lambda: None
sys.modules['db.kcgdb'] = kcgdb_stub
from output import stats_aggregator as sa
class AggregateHourlyBoundaryTest(unittest.TestCase):
def _mock_conn(self):
conn = MagicMock()
conn.cursor.return_value.__enter__ = MagicMock(return_value=conn.cursor.return_value)
conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
return conn
def test_aggregates_both_current_and_previous_hour(self):
"""target_hour=13:08 KST 일 때 _aggregate_one_hour 가 12:00 과 13:00 두 번 호출."""
captured_hours: list[datetime] = []
def fake_one_hour(conn, hour_start, updated_at):
captured_hours.append(hour_start)
return {'hour': hour_start.isoformat(), 'detections': 0, 'events': 0,
'critical': 0, 'categories': 0, 'zones': 0}
target = datetime(2026, 4, 20, 13, 8, 0, tzinfo=sa._KST)
with patch.object(sa, '_aggregate_one_hour', side_effect=fake_one_hour):
with patch.object(sa, 'get_conn') as gc:
cm = MagicMock()
cm.__enter__.return_value = MagicMock()
cm.__exit__.return_value = False
gc.return_value = cm
sa.aggregate_hourly(target_hour=target)
self.assertEqual(len(captured_hours), 2)
# 이전 hour 가 먼저 (복구 목적) → 현재 hour
self.assertEqual(captured_hours[0], datetime(2026, 4, 20, 12, 0, 0, tzinfo=sa._KST))
self.assertEqual(captured_hours[1], datetime(2026, 4, 20, 13, 0, 0, tzinfo=sa._KST))
def test_return_value_reflects_current_hour_only(self):
"""하위 호환: 반환값은 현재 hour 만 (이전 hour 는 부수효과)."""
def fake_one_hour(conn, hour_start, updated_at):
return {'hour': hour_start.isoformat(), 'detections': hour_start.hour * 100,
'events': 0, 'critical': 0, 'categories': 0, 'zones': 0}
target = datetime(2026, 4, 20, 13, 8, 0, tzinfo=sa._KST)
with patch.object(sa, '_aggregate_one_hour', side_effect=fake_one_hour):
with patch.object(sa, 'get_conn') as gc:
cm = MagicMock()
cm.__enter__.return_value = MagicMock()
cm.__exit__.return_value = False
gc.return_value = cm
result = sa.aggregate_hourly(target_hour=target)
# 현재 hour = 13:00 → detections=1300
self.assertEqual(result['detections'], 1300)
self.assertTrue(result['hour'].startswith('2026-04-20T13:00'))
def test_handles_day_boundary(self):
"""target=00:05 이면 previous=전날 23:00 로 정확히 재집계."""
captured_hours: list[datetime] = []
def fake_one_hour(conn, hour_start, updated_at):
captured_hours.append(hour_start)
return {'hour': hour_start.isoformat(), 'detections': 0, 'events': 0,
'critical': 0, 'categories': 0, 'zones': 0}
target = datetime(2026, 4, 21, 0, 5, 0, tzinfo=sa._KST)
with patch.object(sa, '_aggregate_one_hour', side_effect=fake_one_hour):
with patch.object(sa, 'get_conn') as gc:
cm = MagicMock()
cm.__enter__.return_value = MagicMock()
cm.__exit__.return_value = False
gc.return_value = cm
sa.aggregate_hourly(target_hour=target)
self.assertEqual(captured_hours[0], datetime(2026, 4, 20, 23, 0, 0, tzinfo=sa._KST))
self.assertEqual(captured_hours[1], datetime(2026, 4, 21, 0, 0, 0, tzinfo=sa._KST))
if __name__ == '__main__':
unittest.main()