diff --git a/docs/RELEASE-NOTES.md b/docs/RELEASE-NOTES.md index 70dbb1f..62b8396 100644 --- a/docs/RELEASE-NOTES.md +++ b/docs/RELEASE-NOTES.md @@ -4,6 +4,9 @@ ## [Unreleased] +### 수정 +- **`stats_aggregator.aggregate_hourly` hour 경계 silent 누락 버그** — prediction 5분 interval 이지만 한 사이클 평균 소요가 13분이라 사이클이 hour 경계를 넘나드는 경우(예: 12:55 시작 → 13:08 완료)가 흔함. 이 사이클 내 생성된 이벤트(occurred_at=12:57)가 stats_aggregate_hourly 호출 시점(now_kst=13:08)을 기준으로 **현재 hour=13:00 만** UPSERT 되어 12:00 hour 는 이전 사이클 snapshot 으로 stale 유지되는 silent drop. 실제 운영에서 `2026-04-20 12:50 CRITICAL GEAR_IDENTITY_COLLISION` 이벤트가 `prediction_stats_hourly.by_category` 에서 누락된 것을 Phase 1-2 의 snapshot `C1 drift` 섹션이 포착. 수정: `aggregate_hourly()` 가 호출될 때마다 **현재 + 이전 hour 를 모두 UPSERT** (UPSERT idempotent). `_aggregate_one_hour()` 로 단일 hour 집계를 분리하고 `aggregate_hourly()` 가 previous→current 순서로 호출. target_hour 지정 케이스에서도 ±1h 재집계. 반환값은 현재 hour 만 (하위 호환). 3 유닛테스트 (경계 호출 / 반환값 / 일 경계) 통과, 운영 수동 재집계로 2026-04-20 12:00 slot 에 GEAR_IDENTITY_COLLISION: 1 복구 + `C1 drift` 0 확인 + ### 추가 - **Phase 2 PoC 5 모델 마이그레이션 완료 (2 런타임 + 3 카탈로그)** — Phase 2 PoC 계획서의 5 알고리즘 전체를 detection_model 카탈로그로 등록하고 운영자 파라미터 튜닝 지점을 확보. 모드는 두 층위로 분리: - **런타임 override 완성 (2 모델)** — `dark_suspicion` (tier 3, DARK_VESSEL, 19 가중치 + sog/반복/gap/tier 임계) · `gear_violation_g01_g06` (tier 4, GEAR, 6 G-code 점수 + signal cycling + gear drift + 허용 어구 매핑). 알고리즘 함수에 `params: dict | None = None` 인자 추가, `_merge_default_*_params` 깊이 병합으로 override 가 DEFAULT 를 변조하지 않는 불변성 보장. `params=None` 호출은 Phase 2 이전과 완전 동일 결과 (BACK-COMPAT). 운영자가 version 을 ACTIVE 로 승격하면 다음 사이클부터 실제 값 교체 diff --git a/prediction/output/stats_aggregator.py b/prediction/output/stats_aggregator.py index 2f38f85..c1f0441 100644 --- a/prediction/output/stats_aggregator.py +++ b/prediction/output/stats_aggregator.py @@ -30,34 +30,25 @@ def _jsonb(d: dict) -> str: return json.dumps(d, ensure_ascii=False) -def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: - """현재 시간 기준 hourly 집계 (KST hour boundary). +def _aggregate_one_hour(conn, hour_start: datetime, updated_at: datetime) -> dict: + """단일 hour 의 stats_hourly 레코드를 UPSERT. - DB 컬럼은 모두 timestamptz이므로 aware datetime이면 안전 비교됨. - 운영자/대시보드 표기와 stat_hour boundary가 일치하도록 KST 기준. + Why separate: prediction 5분 사이클이 평균 13분 소요라 한 사이클이 hour 경계를 + 넘나드는 경우가 흔하다 (예: 12:55 시작 → 13:08 완료). 이때 사이클 안에서 생성된 + 이벤트(occurred_at=12:57)가 마지막으로 stats_aggregate_hourly 를 돌렸을 때 + now_kst=13:08 이면 **13:00 hour 만** UPSERT 되고 12:00 hour 는 이전 사이클이 + 남긴 stale snapshot 을 유지 → 새 카테고리·이벤트 누락. + 해결: 현재 + 이전 hour 를 모두 재집계 (UPSERT idempotent). """ - if target_hour is not None: - # 외부에서 특정 시점을 지정한 경우 KST로 정규화 - if target_hour.tzinfo is None: - target_hour = target_hour.replace(tzinfo=_KST) - now_kst = target_hour.astimezone(_KST) - else: - now_kst = datetime.now(_KST) - hour_start = now_kst.replace(minute=0, second=0, microsecond=0) hour_end = hour_start + timedelta(hours=1) - updated_at = datetime.now(timezone.utc) - - with get_conn() as conn: - cur = conn.cursor() - - # 탐지 수 + cur = conn.cursor() + try: cur.execute( f"SELECT COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s", (hour_start, hour_end) ) total = cur.fetchone()[0] or 0 - # 위험 레벨별 cur.execute( f"""SELECT risk_level, COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s AND risk_level IS NOT NULL @@ -66,7 +57,6 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: ) by_risk = dict(cur.fetchall()) - # zone별 (vessel_analysis_results.zone_code) cur.execute( f"""SELECT zone_code, COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s AND zone_code IS NOT NULL @@ -75,14 +65,12 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: ) by_zone = dict(cur.fetchall()) - # 이벤트 수 cur.execute( f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s", (hour_start, hour_end) ) events = cur.fetchone()[0] or 0 - # 카테고리별 이벤트 (prediction_events.category) cur.execute( f"""SELECT category, COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s AND category IS NOT NULL @@ -91,7 +79,6 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: ) by_category = dict(cur.fetchall()) - # CRITICAL 이벤트 cur.execute( f"""SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s AND level = 'CRITICAL'""", @@ -115,14 +102,10 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: (hour_start, total, _jsonb(by_category), _jsonb(by_zone), _jsonb(by_risk), events, critical, updated_at) ) + finally: + cur.close() - # 48시간 이전 정리 - cutoff = updated_at - timedelta(hours=48) - cur.execute(f"DELETE FROM {STATS_HOURLY} WHERE stat_hour < %s", (cutoff,)) - - conn.commit() - - result = { + return { 'hour': hour_start.isoformat(), 'detections': total, 'events': events, @@ -130,6 +113,41 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: 'categories': len(by_category), 'zones': len(by_zone), } + + +def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: + """현재 + 이전 hour 를 함께 재집계 (경계 누락 방지). + + 반환값은 **현재 hour** 결과 (하위 호환). 이전 hour 갱신은 부수효과. + target_hour 가 지정된 경우 그 시점 ± 1h 를 재집계. + + DB 컬럼은 모두 timestamptz 이므로 aware datetime 이면 안전 비교됨. + 운영자/대시보드 표기와 stat_hour boundary 가 일치하도록 KST 기준. + """ + if target_hour is not None: + if target_hour.tzinfo is None: + target_hour = target_hour.replace(tzinfo=_KST) + now_kst = target_hour.astimezone(_KST) + else: + now_kst = datetime.now(_KST) + current_hour = now_kst.replace(minute=0, second=0, microsecond=0) + previous_hour = current_hour - timedelta(hours=1) + updated_at = datetime.now(timezone.utc) + + with get_conn() as conn: + # 이전 hour 먼저 재집계 (경계 누락 복구) + _aggregate_one_hour(conn, previous_hour, updated_at) + # 현재 hour — 반환값 대상 + result = _aggregate_one_hour(conn, current_hour, updated_at) + + # 48시간 이전 정리 (세션 재사용) + cutoff = updated_at - timedelta(hours=48) + cur = conn.cursor() + try: + cur.execute(f"DELETE FROM {STATS_HOURLY} WHERE stat_hour < %s", (cutoff,)) + finally: + cur.close() + conn.commit() logger.info(f'stats_aggregator hourly: {result}') return result diff --git a/prediction/tests/test_stats_aggregator_hour_boundary.py b/prediction/tests/test_stats_aggregator_hour_boundary.py new file mode 100644 index 0000000..3b6a2fc --- /dev/null +++ b/prediction/tests/test_stats_aggregator_hour_boundary.py @@ -0,0 +1,129 @@ +"""stats_aggregator.aggregate_hourly 가 현재 + 이전 hour 를 모두 UPSERT 하는지 검증. + +배경: prediction 사이클이 hour 경계를 넘나들 때 (사이클 시작 12:55, 완료 13:08), +stats_aggregate_hourly 가 13:08 기준으로 한 hour 만 재집계하면 12:00 hour 는 +이전 사이클 snapshot 으로 stale — 이 사이클 내 생성된 이벤트(occurred_at=12:57) +가 누락되는 silent bug. + +해결: aggregate_hourly 를 호출할 때마다 current_hour + previous_hour 를 동시 UPSERT. +이 테스트는 DB 없이 _aggregate_one_hour 호출 인자로 두 시각이 전달되는지만 검증. +""" +from __future__ import annotations + +import sys +import types +import unittest +from datetime import datetime, timedelta, timezone +from unittest.mock import MagicMock, patch + + +if 'pydantic_settings' not in sys.modules: + stub = types.ModuleType('pydantic_settings') + + class _S: + def __init__(self, **kw): + for name, value in self.__class__.__dict__.items(): + if name.isupper(): + setattr(self, name, kw.get(name, value)) + + stub.BaseSettings = _S + sys.modules['pydantic_settings'] = stub + +# psycopg2 + db.kcgdb stub (로컬 테스트 환경) +if 'psycopg2' not in sys.modules: + pg = types.ModuleType('psycopg2') + pg.pool = types.ModuleType('psycopg2.pool') + pg.pool.ThreadedConnectionPool = object + pg.extras = types.ModuleType('psycopg2.extras') + pg.extras.execute_values = lambda *a, **k: None + sys.modules['psycopg2'] = pg + sys.modules['psycopg2.pool'] = pg.pool + sys.modules['psycopg2.extras'] = pg.extras + +if 'db' not in sys.modules: + db_pkg = types.ModuleType('db') + db_pkg.__path__ = [] + sys.modules['db'] = db_pkg +if 'db.kcgdb' not in sys.modules: + kcgdb_stub = types.ModuleType('db.kcgdb') + kcgdb_stub.get_conn = lambda: None + sys.modules['db.kcgdb'] = kcgdb_stub + + +from output import stats_aggregator as sa + + +class AggregateHourlyBoundaryTest(unittest.TestCase): + + def _mock_conn(self): + conn = MagicMock() + conn.cursor.return_value.__enter__ = MagicMock(return_value=conn.cursor.return_value) + conn.cursor.return_value.__exit__ = MagicMock(return_value=False) + return conn + + def test_aggregates_both_current_and_previous_hour(self): + """target_hour=13:08 KST 일 때 _aggregate_one_hour 가 12:00 과 13:00 두 번 호출.""" + captured_hours: list[datetime] = [] + + def fake_one_hour(conn, hour_start, updated_at): + captured_hours.append(hour_start) + return {'hour': hour_start.isoformat(), 'detections': 0, 'events': 0, + 'critical': 0, 'categories': 0, 'zones': 0} + + target = datetime(2026, 4, 20, 13, 8, 0, tzinfo=sa._KST) + with patch.object(sa, '_aggregate_one_hour', side_effect=fake_one_hour): + with patch.object(sa, 'get_conn') as gc: + cm = MagicMock() + cm.__enter__.return_value = MagicMock() + cm.__exit__.return_value = False + gc.return_value = cm + sa.aggregate_hourly(target_hour=target) + + self.assertEqual(len(captured_hours), 2) + # 이전 hour 가 먼저 (복구 목적) → 현재 hour + self.assertEqual(captured_hours[0], datetime(2026, 4, 20, 12, 0, 0, tzinfo=sa._KST)) + self.assertEqual(captured_hours[1], datetime(2026, 4, 20, 13, 0, 0, tzinfo=sa._KST)) + + def test_return_value_reflects_current_hour_only(self): + """하위 호환: 반환값은 현재 hour 만 (이전 hour 는 부수효과).""" + def fake_one_hour(conn, hour_start, updated_at): + return {'hour': hour_start.isoformat(), 'detections': hour_start.hour * 100, + 'events': 0, 'critical': 0, 'categories': 0, 'zones': 0} + + target = datetime(2026, 4, 20, 13, 8, 0, tzinfo=sa._KST) + with patch.object(sa, '_aggregate_one_hour', side_effect=fake_one_hour): + with patch.object(sa, 'get_conn') as gc: + cm = MagicMock() + cm.__enter__.return_value = MagicMock() + cm.__exit__.return_value = False + gc.return_value = cm + result = sa.aggregate_hourly(target_hour=target) + + # 현재 hour = 13:00 → detections=1300 + self.assertEqual(result['detections'], 1300) + self.assertTrue(result['hour'].startswith('2026-04-20T13:00')) + + def test_handles_day_boundary(self): + """target=00:05 이면 previous=전날 23:00 로 정확히 재집계.""" + captured_hours: list[datetime] = [] + + def fake_one_hour(conn, hour_start, updated_at): + captured_hours.append(hour_start) + return {'hour': hour_start.isoformat(), 'detections': 0, 'events': 0, + 'critical': 0, 'categories': 0, 'zones': 0} + + target = datetime(2026, 4, 21, 0, 5, 0, tzinfo=sa._KST) + with patch.object(sa, '_aggregate_one_hour', side_effect=fake_one_hour): + with patch.object(sa, 'get_conn') as gc: + cm = MagicMock() + cm.__enter__.return_value = MagicMock() + cm.__exit__.return_value = False + gc.return_value = cm + sa.aggregate_hourly(target_hour=target) + + self.assertEqual(captured_hours[0], datetime(2026, 4, 20, 23, 0, 0, tzinfo=sa._KST)) + self.assertEqual(captured_hours[1], datetime(2026, 4, 21, 0, 0, 0, tzinfo=sa._KST)) + + +if __name__ == '__main__': + unittest.main()