kcg-ai-monitoring/prediction/output/stats_aggregator.py
htlee 7ab6baeed2 fix(prediction): stats_aggregator hour 경계 silent 누락 복구
배경: prediction 5분 interval 이지만 한 사이클 평균 13분 소요라
사이클이 hour 경계를 넘나드는 경우(12:55 시작 → 13:08 완료)가 흔하다.
이 때 사이클 내 생성된 이벤트(occurred_at=12:57)가 aggregate_hourly
호출 시점(now_kst=13:08) 기준 현재 hour=13:00 만 UPSERT 되어
12:00 hour 는 이전 사이클 snapshot 으로 stale 유지되는 silent drop.

실제 포착: 2026-04-20 12:50 CRITICAL GEAR_IDENTITY_COLLISION 이벤트가
prediction_stats_hourly.by_category 12:00 slot 에서 누락. Phase 1-2
snapshot 의 C1 drift 섹션이 only_in_events=GEAR_IDENTITY_COLLISION 으로 탐지.

수정:
- _aggregate_one_hour(conn, hour_start, updated_at): 단일 hour UPSERT 추출
- aggregate_hourly(): 호출 시마다 previous→current 순서로 2번 집계
  · UPSERT 라 idempotent
  · 반환값은 현재 hour (하위 호환)
  · target_hour 지정 케이스도 ±1h 재집계

검증:
- 3 유닛테스트 (경계 호출 2건 / 반환값 / 일 경계) 전수 통과
- 운영 수동 재집계로 12:00 slot GEAR_IDENTITY_COLLISION: 1 복구
- snapshot 재실행 시 C1 drift 0 확인

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 13:32:44 +09:00

300 lines
12 KiB
Python

"""
통계 사전 집계 — prediction_stats_hourly/daily/monthly 갱신.
hourly: 매 분석 사이클마다 (최근 48h 보존)
daily: 매일 01:00 또는 분석 사이클 후
monthly: daily 합산
"""
import json
import logging
from datetime import date, datetime, timedelta, timezone
from typing import Optional
from config import qualified_table
from db.kcgdb import get_conn
logger = logging.getLogger(__name__)
STATS_HOURLY = qualified_table('prediction_stats_hourly')
STATS_DAILY = qualified_table('prediction_stats_daily')
STATS_MONTHLY = qualified_table('prediction_stats_monthly')
VAR_TABLE = qualified_table('vessel_analysis_results')
EVENTS_TABLE = qualified_table('prediction_events')
ENF_TABLE = qualified_table('enforcement_records')
# 한국 표준시 (운영 기준 — 일/월 집계 경계)
_KST = timezone(timedelta(hours=9))
def _jsonb(d: dict) -> str:
return json.dumps(d, ensure_ascii=False)
def _aggregate_one_hour(conn, hour_start: datetime, updated_at: datetime) -> dict:
    """UPSERT the prediction_stats_hourly record for the hour starting at *hour_start*.

    Why a separate helper: the prediction cycle fires every 5 minutes but takes
    ~13 minutes on average, so a cycle frequently straddles an hour boundary
    (e.g. starts 12:55, finishes 13:08).  An event created inside the cycle
    (occurred_at=12:57) would then only land in the 13:00 slot at aggregation
    time (now_kst=13:08), while the 12:00 slot kept the previous cycle's stale
    snapshot — silently dropping new categories/events.  The caller therefore
    re-aggregates both the current and the previous hour; the UPSERT makes the
    repeated aggregation idempotent.

    Args:
        conn: open DB connection; the transaction is committed by the caller.
        hour_start: aware datetime at the start of the hour to aggregate.
        updated_at: timestamp written into the row's updated_at column.

    Returns:
        Summary dict for the hour: detection/event/critical counts plus the
        number of distinct categories and zones seen.
    """
    hour_end = hour_start + timedelta(hours=1)
    window = (hour_start, hour_end)
    cur = conn.cursor()
    try:
        def _scalar(sql: str) -> int:
            # Single-value COUNT(*) query over the hour window.
            cur.execute(sql, window)
            return cur.fetchone()[0] or 0

        def _grouped(sql: str) -> dict:
            # GROUP BY query over the hour window -> {key: count}.
            cur.execute(sql, window)
            return dict(cur.fetchall())

        total = _scalar(
            f"SELECT COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s"
        )
        by_risk = _grouped(
            f"""SELECT risk_level, COUNT(*) FROM {VAR_TABLE}
            WHERE analyzed_at >= %s AND analyzed_at < %s AND risk_level IS NOT NULL
            GROUP BY risk_level"""
        )
        by_zone = _grouped(
            f"""SELECT zone_code, COUNT(*) FROM {VAR_TABLE}
            WHERE analyzed_at >= %s AND analyzed_at < %s AND zone_code IS NOT NULL
            GROUP BY zone_code"""
        )
        events = _scalar(
            f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s"
        )
        by_category = _grouped(
            f"""SELECT category, COUNT(*) FROM {EVENTS_TABLE}
            WHERE occurred_at >= %s AND occurred_at < %s AND category IS NOT NULL
            GROUP BY category"""
        )
        critical = _scalar(
            f"""SELECT COUNT(*) FROM {EVENTS_TABLE}
            WHERE occurred_at >= %s AND occurred_at < %s AND level = 'CRITICAL'"""
        )
        # Idempotent UPSERT keyed on stat_hour — safe to call repeatedly.
        cur.execute(
            f"""INSERT INTO {STATS_HOURLY}
            (stat_hour, total_detections, by_category, by_zone, by_risk_level,
            event_count, critical_count, updated_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (stat_hour) DO UPDATE SET
            total_detections = EXCLUDED.total_detections,
            by_category = EXCLUDED.by_category,
            by_zone = EXCLUDED.by_zone,
            by_risk_level = EXCLUDED.by_risk_level,
            event_count = EXCLUDED.event_count,
            critical_count = EXCLUDED.critical_count,
            updated_at = EXCLUDED.updated_at""",
            (hour_start, total, _jsonb(by_category), _jsonb(by_zone),
             _jsonb(by_risk), events, critical, updated_at),
        )
    finally:
        cur.close()
    return {
        'hour': hour_start.isoformat(),
        'detections': total,
        'events': events,
        'critical': critical,
        'categories': len(by_category),
        'zones': len(by_zone),
    }
def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
    """Re-aggregate the current AND the previous hour (hour-boundary drop guard).

    Returns the **current** hour's summary for backward compatibility; the
    previous hour's refresh is a deliberate side effect.  When *target_hour*
    is supplied, that hour and the hour immediately before it are re-aggregated
    (a naive datetime is interpreted as KST wall time).  All stat columns are
    timestamptz, so aware datetimes compare safely; hours are bucketed in KST
    so stat_hour boundaries match operator/dashboard expectations.
    """
    if target_hour is None:
        now_kst = datetime.now(_KST)
    else:
        anchor = target_hour if target_hour.tzinfo is not None else target_hour.replace(tzinfo=_KST)
        now_kst = anchor.astimezone(_KST)
    current_hour = now_kst.replace(minute=0, second=0, microsecond=0)
    updated_at = datetime.now(timezone.utc)
    with get_conn() as conn:
        # Previous hour first: recovers events silently dropped at the boundary.
        _aggregate_one_hour(conn, current_hour - timedelta(hours=1), updated_at)
        # Current hour — this is the summary we return.
        result = _aggregate_one_hour(conn, current_hour, updated_at)
        # Prune rows beyond the 48h retention window, reusing the session.
        cur = conn.cursor()
        try:
            cur.execute(
                f"DELETE FROM {STATS_HOURLY} WHERE stat_hour < %s",
                (updated_at - timedelta(hours=48),),
            )
        finally:
            cur.close()
        conn.commit()
    logger.info(f'stats_aggregator hourly: {result}')
    return result
def aggregate_daily(target_date: Optional[date] = None) -> dict:
    """Aggregate daily stats into prediction_stats_daily (KST day boundary).

    Fix: the cursor is now closed in a ``finally`` block, matching
    ``_aggregate_one_hour``; previously it was left open until connection
    teardown.

    Args:
        target_date: day to aggregate; defaults to today in KST.

    Returns:
        Summary dict: date, detections, events, AI accuracy pct (None when
        there were no events).
    """
    d = target_date or datetime.now(_KST).date()
    # KST midnight expressed as an aware timestamp (safe against timestamptz).
    day_start = datetime(d.year, d.month, d.day, tzinfo=_KST)
    day_end = day_start + timedelta(days=1)
    now = datetime.now(timezone.utc)
    with get_conn() as conn:
        cur = conn.cursor()
        try:
            # Total detections.
            cur.execute(
                f"SELECT COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s",
                (day_start, day_end)
            )
            total = cur.fetchone()[0] or 0
            # Per violation type (array column, unnested).
            cur.execute(
                f"""SELECT unnest(violation_categories) AS vt, COUNT(*)
                FROM {VAR_TABLE}
                WHERE analyzed_at >= %s AND analyzed_at < %s AND violation_categories IS NOT NULL
                GROUP BY vt""",
                (day_start, day_end)
            )
            by_violation = dict(cur.fetchall())
            # Per risk level.
            cur.execute(
                f"""SELECT risk_level, COUNT(*) FROM {VAR_TABLE}
                WHERE analyzed_at >= %s AND analyzed_at < %s AND risk_level IS NOT NULL
                GROUP BY risk_level""",
                (day_start, day_end)
            )
            by_risk = dict(cur.fetchall())
            # Events.
            cur.execute(
                f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s",
                (day_start, day_end)
            )
            event_count = cur.fetchone()[0] or 0
            cur.execute(
                f"""SELECT COUNT(*) FROM {EVENTS_TABLE}
                WHERE occurred_at >= %s AND occurred_at < %s AND level = 'CRITICAL'""",
                (day_start, day_end)
            )
            critical = cur.fetchone()[0] or 0
            # Enforcements.
            cur.execute(
                f"SELECT COUNT(*) FROM {ENF_TABLE} WHERE enforced_at >= %s AND enforced_at < %s",
                (day_start, day_end)
            )
            enf_count = cur.fetchone()[0] or 0
            # False positives.
            cur.execute(
                f"""SELECT COUNT(*) FROM {EVENTS_TABLE}
                WHERE occurred_at >= %s AND occurred_at < %s AND status = 'FALSE_POSITIVE'""",
                (day_start, day_end)
            )
            fp = cur.fetchone()[0] or 0
            # AI accuracy: share of non-false-positive events, as a percentage.
            accuracy = round((1 - fp / max(event_count, 1)) * 100, 2) if event_count > 0 else None
            cur.execute(
                f"""INSERT INTO {STATS_DAILY}
                (stat_date, total_detections, by_violation_type, by_risk_level,
                event_count, critical_event_count, enforcement_count,
                false_positive_count, ai_accuracy_pct, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (stat_date) DO UPDATE SET
                total_detections = EXCLUDED.total_detections,
                by_violation_type = EXCLUDED.by_violation_type,
                by_risk_level = EXCLUDED.by_risk_level,
                event_count = EXCLUDED.event_count,
                critical_event_count = EXCLUDED.critical_event_count,
                enforcement_count = EXCLUDED.enforcement_count,
                false_positive_count = EXCLUDED.false_positive_count,
                ai_accuracy_pct = EXCLUDED.ai_accuracy_pct,
                updated_at = EXCLUDED.updated_at""",
                (d, total, _jsonb(by_violation), _jsonb(by_risk),
                 event_count, critical, enf_count, fp, accuracy, now)
            )
        finally:
            cur.close()
        conn.commit()
    result = {'date': d.isoformat(), 'detections': total, 'events': event_count, 'accuracy': accuracy}
    logger.info(f'stats_aggregator daily: {result}')
    return result
def aggregate_monthly(target_month: Optional[date] = None) -> dict:
    """Aggregate monthly stats by summing the daily table (KST month boundary).

    Fix: the cursor is now closed in a ``finally`` block, matching
    ``_aggregate_one_hour``; previously it was left open until connection
    teardown.

    Args:
        target_month: any date in the month to aggregate (normalized to the
            1st); defaults to the current month in KST.

    Returns:
        Summary dict: month (ISO date of the 1st), detections, enforcements.
    """
    d = target_month or datetime.now(_KST).date().replace(day=1)
    month_start = d.replace(day=1)
    # First day of the following month (handles the December -> January wrap).
    if month_start.month == 12:
        month_end = month_start.replace(year=month_start.year + 1, month=1)
    else:
        month_end = month_start.replace(month=month_start.month + 1)
    now = datetime.now(timezone.utc)
    with get_conn() as conn:
        cur = conn.cursor()
        try:
            # Roll up the pre-aggregated daily rows for the month.
            cur.execute(
                f"""SELECT
                COALESCE(SUM(total_detections), 0),
                COALESCE(SUM(event_count), 0),
                COALESCE(SUM(critical_event_count), 0),
                COALESCE(SUM(enforcement_count), 0),
                COALESCE(SUM(false_positive_count), 0)
                FROM {STATS_DAILY}
                WHERE stat_date >= %s AND stat_date < %s""",
                (month_start, month_end)
            )
            row = cur.fetchone()
            total_det, evt, crit, enf, fp = row
            # AI accuracy: share of non-false-positive events, as a percentage.
            accuracy = round((1 - fp / max(evt, 1)) * 100, 2) if evt > 0 else None
            cur.execute(
                f"""INSERT INTO {STATS_MONTHLY}
                (stat_month, total_detections, total_enforcements,
                event_count, critical_event_count, false_positive_count,
                ai_accuracy_pct, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (stat_month) DO UPDATE SET
                total_detections = EXCLUDED.total_detections,
                total_enforcements = EXCLUDED.total_enforcements,
                event_count = EXCLUDED.event_count,
                critical_event_count = EXCLUDED.critical_event_count,
                false_positive_count = EXCLUDED.false_positive_count,
                ai_accuracy_pct = EXCLUDED.ai_accuracy_pct,
                updated_at = EXCLUDED.updated_at""",
                (month_start, total_det, enf, evt, crit, fp, accuracy, now)
            )
        finally:
            cur.close()
        conn.commit()
    result = {'month': month_start.isoformat(), 'detections': total_det, 'enforcements': enf}
    logger.info(f'stats_aggregator monthly: {result}')
    return result