""" 통계 사전 집계 — prediction_stats_hourly/daily/monthly 갱신. hourly: 매 분석 사이클마다 (최근 48h 보존) daily: 매일 01:00 또는 분석 사이클 후 monthly: daily 합산 """ import json import logging from datetime import date, datetime, timedelta, timezone from typing import Optional from config import qualified_table from db.kcgdb import get_conn logger = logging.getLogger(__name__) STATS_HOURLY = qualified_table('prediction_stats_hourly') STATS_DAILY = qualified_table('prediction_stats_daily') STATS_MONTHLY = qualified_table('prediction_stats_monthly') VAR_TABLE = qualified_table('vessel_analysis_results') EVENTS_TABLE = qualified_table('prediction_events') ENF_TABLE = qualified_table('enforcement_records') # 한국 표준시 (운영 기준 — 일/월 집계 경계) _KST = timezone(timedelta(hours=9)) def _jsonb(d: dict) -> str: return json.dumps(d, ensure_ascii=False) def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: """현재 시간 기준 hourly 집계.""" now = target_hour or datetime.now(timezone.utc) hour_start = now.replace(minute=0, second=0, microsecond=0) hour_end = hour_start + timedelta(hours=1) with get_conn() as conn: cur = conn.cursor() # 탐지 수 cur.execute( f"SELECT COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s", (hour_start, hour_end) ) total = cur.fetchone()[0] or 0 # 위험 레벨별 cur.execute( f"""SELECT risk_level, COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s AND risk_level IS NOT NULL GROUP BY risk_level""", (hour_start, hour_end) ) by_risk = dict(cur.fetchall()) # 이벤트 수 cur.execute( f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s", (hour_start, hour_end) ) events = cur.fetchone()[0] or 0 # CRITICAL 이벤트 cur.execute( f"""SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s AND level = 'CRITICAL'""", (hour_start, hour_end) ) critical = cur.fetchone()[0] or 0 cur.execute( f"""INSERT INTO {STATS_HOURLY} (stat_hour, total_detections, by_risk_level, event_count, critical_count, updated_at) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (stat_hour) DO UPDATE SET total_detections = EXCLUDED.total_detections, by_risk_level = EXCLUDED.by_risk_level, event_count = EXCLUDED.event_count, critical_count = EXCLUDED.critical_count, updated_at = EXCLUDED.updated_at""", (hour_start, total, _jsonb(by_risk), events, critical, now) ) # 48시간 이전 정리 cutoff = now - timedelta(hours=48) cur.execute(f"DELETE FROM {STATS_HOURLY} WHERE stat_hour < %s", (cutoff,)) conn.commit() result = {'hour': hour_start.isoformat(), 'detections': total, 'events': events} logger.info(f'stats_aggregator hourly: {result}') return result def aggregate_daily(target_date: Optional[date] = None) -> dict: """지정 날짜 기준 daily 집계 (KST 기준).""" d = target_date or datetime.now(_KST).date() # KST 자정을 TIMESTAMPTZ로 표현 (UTC -9시간) day_start = datetime(d.year, d.month, d.day, tzinfo=_KST) day_end = day_start + timedelta(days=1) now = datetime.now(timezone.utc) with get_conn() as conn: cur = conn.cursor() # 총 탐지 cur.execute( f"SELECT COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s", (day_start, day_end) ) total = cur.fetchone()[0] or 0 # 위반 유형별 (unnest) cur.execute( f"""SELECT unnest(violation_categories) AS vt, COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s AND violation_categories IS NOT NULL GROUP BY vt""", (day_start, day_end) ) by_violation = dict(cur.fetchall()) # 위험 레벨별 cur.execute( f"""SELECT risk_level, COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s AND risk_level IS NOT NULL GROUP BY risk_level""", (day_start, day_end) ) by_risk = dict(cur.fetchall()) # 이벤트 cur.execute( f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s", (day_start, day_end) ) event_count = cur.fetchone()[0] or 0 cur.execute( f"""SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s AND level = 'CRITICAL'""", (day_start, day_end) ) critical = cur.fetchone()[0] or 0 # 단속 cur.execute( f"SELECT COUNT(*) FROM {ENF_TABLE} WHERE enforced_at >= %s AND enforced_at < %s", (day_start, day_end) ) enf_count = cur.fetchone()[0] or 0 # 오탐 cur.execute( f"""SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s AND status = 'FALSE_POSITIVE'""", (day_start, day_end) ) fp = cur.fetchone()[0] or 0 # AI 정확도 accuracy = round((1 - fp / max(event_count, 1)) * 100, 2) if event_count > 0 else None cur.execute( f"""INSERT INTO {STATS_DAILY} (stat_date, total_detections, by_violation_type, by_risk_level, event_count, critical_event_count, enforcement_count, false_positive_count, ai_accuracy_pct, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (stat_date) DO UPDATE SET total_detections = EXCLUDED.total_detections, by_violation_type = EXCLUDED.by_violation_type, by_risk_level = EXCLUDED.by_risk_level, event_count = EXCLUDED.event_count, critical_event_count = EXCLUDED.critical_event_count, enforcement_count = EXCLUDED.enforcement_count, false_positive_count = EXCLUDED.false_positive_count, ai_accuracy_pct = EXCLUDED.ai_accuracy_pct, updated_at = EXCLUDED.updated_at""", (d, total, _jsonb(by_violation), _jsonb(by_risk), event_count, critical, enf_count, fp, accuracy, now) ) conn.commit() result = {'date': d.isoformat(), 'detections': total, 'events': event_count, 'accuracy': accuracy} logger.info(f'stats_aggregator daily: {result}') return result def aggregate_monthly(target_month: Optional[date] = None) -> dict: """지정 월 기준 monthly 집계 (daily 합산, KST 기준).""" d = target_month or datetime.now(_KST).date().replace(day=1) month_start = d.replace(day=1) if month_start.month == 12: month_end = month_start.replace(year=month_start.year + 1, month=1) else: month_end = month_start.replace(month=month_start.month + 1) now = datetime.now(timezone.utc) with get_conn() as conn: cur = conn.cursor() cur.execute( f"""SELECT COALESCE(SUM(total_detections), 0), COALESCE(SUM(event_count), 0), COALESCE(SUM(critical_event_count), 0), COALESCE(SUM(enforcement_count), 0), COALESCE(SUM(false_positive_count), 0) FROM {STATS_DAILY} WHERE stat_date >= %s AND stat_date < %s""", (month_start, month_end) ) row = cur.fetchone() total_det, evt, crit, enf, fp = row accuracy = round((1 - fp / max(evt, 1)) * 100, 2) if evt > 0 else None cur.execute( f"""INSERT INTO {STATS_MONTHLY} (stat_month, total_detections, total_enforcements, event_count, critical_event_count, false_positive_count, ai_accuracy_pct, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (stat_month) DO UPDATE SET total_detections = EXCLUDED.total_detections, total_enforcements = EXCLUDED.total_enforcements, event_count = EXCLUDED.event_count, critical_event_count = EXCLUDED.critical_event_count, false_positive_count = EXCLUDED.false_positive_count, ai_accuracy_pct = EXCLUDED.ai_accuracy_pct, updated_at = EXCLUDED.updated_at""", (month_start, total_det, enf, evt, crit, fp, accuracy, now) ) conn.commit() result = {'month': month_start.isoformat(), 'detections': total_det, 'enforcements': enf} logger.info(f'stats_aggregator monthly: {result}') return result