""" 통계 사전 집계 — prediction_stats_hourly/daily/monthly 갱신. hourly: 매 분석 사이클마다 (최근 48h 보존) daily: 매일 01:00 또는 분석 사이클 후 monthly: daily 합산 """ import json import logging from datetime import date, datetime, timedelta, timezone from typing import Optional from config import qualified_table from db.kcgdb import get_conn logger = logging.getLogger(__name__) STATS_HOURLY = qualified_table('prediction_stats_hourly') STATS_DAILY = qualified_table('prediction_stats_daily') STATS_MONTHLY = qualified_table('prediction_stats_monthly') VAR_TABLE = qualified_table('vessel_analysis_results') EVENTS_TABLE = qualified_table('prediction_events') ENF_TABLE = qualified_table('enforcement_records') # 한국 표준시 (운영 기준 — 일/월 집계 경계) _KST = timezone(timedelta(hours=9)) def _jsonb(d: dict) -> str: return json.dumps(d, ensure_ascii=False) def _aggregate_one_hour(conn, hour_start: datetime, updated_at: datetime) -> dict: """단일 hour 의 stats_hourly 레코드를 UPSERT. Why separate: prediction 5분 사이클이 평균 13분 소요라 한 사이클이 hour 경계를 넘나드는 경우가 흔하다 (예: 12:55 시작 → 13:08 완료). 이때 사이클 안에서 생성된 이벤트(occurred_at=12:57)가 마지막으로 stats_aggregate_hourly 를 돌렸을 때 now_kst=13:08 이면 **13:00 hour 만** UPSERT 되고 12:00 hour 는 이전 사이클이 남긴 stale snapshot 을 유지 → 새 카테고리·이벤트 누락. 해결: 현재 + 이전 hour 를 모두 재집계 (UPSERT idempotent). """ hour_end = hour_start + timedelta(hours=1) cur = conn.cursor() try: cur.execute( f"SELECT COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s", (hour_start, hour_end) ) total = cur.fetchone()[0] or 0 cur.execute( f"""SELECT risk_level, COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s AND risk_level IS NOT NULL GROUP BY risk_level""", (hour_start, hour_end) ) by_risk = dict(cur.fetchall()) cur.execute( f"""SELECT zone_code, COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s AND zone_code IS NOT NULL GROUP BY zone_code""", (hour_start, hour_end) ) by_zone = dict(cur.fetchall()) cur.execute( f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s", (hour_start, hour_end) ) events = cur.fetchone()[0] or 0 cur.execute( f"""SELECT category, COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s AND category IS NOT NULL GROUP BY category""", (hour_start, hour_end) ) by_category = dict(cur.fetchall()) cur.execute( f"""SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s AND level = 'CRITICAL'""", (hour_start, hour_end) ) critical = cur.fetchone()[0] or 0 cur.execute( f"""INSERT INTO {STATS_HOURLY} (stat_hour, total_detections, by_category, by_zone, by_risk_level, event_count, critical_count, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (stat_hour) DO UPDATE SET total_detections = EXCLUDED.total_detections, by_category = EXCLUDED.by_category, by_zone = EXCLUDED.by_zone, by_risk_level = EXCLUDED.by_risk_level, event_count = EXCLUDED.event_count, critical_count = EXCLUDED.critical_count, updated_at = EXCLUDED.updated_at""", (hour_start, total, _jsonb(by_category), _jsonb(by_zone), _jsonb(by_risk), events, critical, updated_at) ) finally: cur.close() return { 'hour': hour_start.isoformat(), 'detections': total, 'events': events, 'critical': critical, 'categories': len(by_category), 'zones': len(by_zone), } def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: """현재 + 이전 hour 를 함께 재집계 (경계 누락 방지). 반환값은 **현재 hour** 결과 (하위 호환). 이전 hour 갱신은 부수효과. target_hour 가 지정된 경우 그 시점 ± 1h 를 재집계. DB 컬럼은 모두 timestamptz 이므로 aware datetime 이면 안전 비교됨. 운영자/대시보드 표기와 stat_hour boundary 가 일치하도록 KST 기준. """ if target_hour is not None: if target_hour.tzinfo is None: target_hour = target_hour.replace(tzinfo=_KST) now_kst = target_hour.astimezone(_KST) else: now_kst = datetime.now(_KST) current_hour = now_kst.replace(minute=0, second=0, microsecond=0) previous_hour = current_hour - timedelta(hours=1) updated_at = datetime.now(timezone.utc) with get_conn() as conn: # 이전 hour 먼저 재집계 (경계 누락 복구) _aggregate_one_hour(conn, previous_hour, updated_at) # 현재 hour — 반환값 대상 result = _aggregate_one_hour(conn, current_hour, updated_at) # 48시간 이전 정리 (세션 재사용) cutoff = updated_at - timedelta(hours=48) cur = conn.cursor() try: cur.execute(f"DELETE FROM {STATS_HOURLY} WHERE stat_hour < %s", (cutoff,)) finally: cur.close() conn.commit() logger.info(f'stats_aggregator hourly: {result}') return result def aggregate_daily(target_date: Optional[date] = None) -> dict: """지정 날짜 기준 daily 집계 (KST 기준).""" d = target_date or datetime.now(_KST).date() # KST 자정을 TIMESTAMPTZ로 표현 (UTC -9시간) day_start = datetime(d.year, d.month, d.day, tzinfo=_KST) day_end = day_start + timedelta(days=1) now = datetime.now(timezone.utc) with get_conn() as conn: cur = conn.cursor() # 총 탐지 cur.execute( f"SELECT COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s", (day_start, day_end) ) total = cur.fetchone()[0] or 0 # 위반 유형별 (unnest) cur.execute( f"""SELECT unnest(violation_categories) AS vt, COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s AND violation_categories IS NOT NULL GROUP BY vt""", (day_start, day_end) ) by_violation = dict(cur.fetchall()) # 위험 레벨별 cur.execute( f"""SELECT risk_level, COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s AND risk_level IS NOT NULL GROUP BY risk_level""", (day_start, day_end) ) by_risk = dict(cur.fetchall()) # 이벤트 cur.execute( f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s", (day_start, day_end) ) event_count = cur.fetchone()[0] or 0 cur.execute( f"""SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s AND level = 'CRITICAL'""", (day_start, day_end) ) critical = cur.fetchone()[0] or 0 # 단속 cur.execute( f"SELECT COUNT(*) FROM {ENF_TABLE} WHERE enforced_at >= %s AND enforced_at < %s", (day_start, day_end) ) enf_count = cur.fetchone()[0] or 0 # 오탐 cur.execute( f"""SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s AND status = 'FALSE_POSITIVE'""", (day_start, day_end) ) fp = cur.fetchone()[0] or 0 # AI 정확도 accuracy = round((1 - fp / max(event_count, 1)) * 100, 2) if event_count > 0 else None cur.execute( f"""INSERT INTO {STATS_DAILY} (stat_date, total_detections, by_violation_type, by_risk_level, event_count, critical_event_count, enforcement_count, false_positive_count, ai_accuracy_pct, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (stat_date) DO UPDATE SET total_detections = EXCLUDED.total_detections, by_violation_type = EXCLUDED.by_violation_type, by_risk_level = EXCLUDED.by_risk_level, event_count = EXCLUDED.event_count, critical_event_count = EXCLUDED.critical_event_count, enforcement_count = EXCLUDED.enforcement_count, false_positive_count = EXCLUDED.false_positive_count, ai_accuracy_pct = EXCLUDED.ai_accuracy_pct, updated_at = EXCLUDED.updated_at""", (d, total, _jsonb(by_violation), _jsonb(by_risk), event_count, critical, enf_count, fp, accuracy, now) ) conn.commit() result = {'date': d.isoformat(), 'detections': total, 'events': event_count, 'accuracy': accuracy} logger.info(f'stats_aggregator daily: {result}') return result def aggregate_monthly(target_month: Optional[date] = None) -> dict: """지정 월 기준 monthly 집계 (daily 합산, KST 기준).""" d = target_month or datetime.now(_KST).date().replace(day=1) month_start = d.replace(day=1) if month_start.month == 12: month_end = month_start.replace(year=month_start.year + 1, month=1) else: month_end = month_start.replace(month=month_start.month + 1) now = datetime.now(timezone.utc) with get_conn() as conn: cur = conn.cursor() cur.execute( f"""SELECT COALESCE(SUM(total_detections), 0), COALESCE(SUM(event_count), 0), COALESCE(SUM(critical_event_count), 0), COALESCE(SUM(enforcement_count), 0), COALESCE(SUM(false_positive_count), 0) FROM {STATS_DAILY} WHERE stat_date >= %s AND stat_date < %s""", (month_start, month_end) ) row = cur.fetchone() total_det, evt, crit, enf, fp = row accuracy = round((1 - fp / max(evt, 1)) * 100, 2) if evt > 0 else None cur.execute( f"""INSERT INTO {STATS_MONTHLY} (stat_month, total_detections, total_enforcements, event_count, critical_event_count, false_positive_count, ai_accuracy_pct, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (stat_month) DO UPDATE SET total_detections = EXCLUDED.total_detections, total_enforcements = EXCLUDED.total_enforcements, event_count = EXCLUDED.event_count, critical_event_count = EXCLUDED.critical_event_count, false_positive_count = EXCLUDED.false_positive_count, ai_accuracy_pct = EXCLUDED.ai_accuracy_pct, updated_at = EXCLUDED.updated_at""", (month_start, total_det, enf, evt, crit, fp, accuracy, now) ) conn.commit() result = {'month': month_start.isoformat(), 'detections': total_det, 'enforcements': enf} logger.info(f'stats_aggregator monthly: {result}') return result