""" 경보 발송 — CRITICAL/HIGH 이벤트에 대해 prediction_alerts INSERT. 현재는 DASHBOARD 채널만 기록 (실제 SMS/EMAIL은 향후 연동). """ import logging from datetime import datetime, timezone from psycopg2.extras import execute_values from config import qualified_table from db.kcgdb import get_conn logger = logging.getLogger(__name__) ALERTS_TABLE = qualified_table('prediction_alerts') EVENTS_TABLE = qualified_table('prediction_events') # CRITICAL/HIGH 이벤트만 알림 대상 ALERT_LEVELS = ('CRITICAL', 'HIGH') def run_alert_dispatcher() -> dict: """ 아직 알림이 없는 CRITICAL/HIGH 이벤트에 대해 알림 생성. Returns: { 'dispatched': int } """ now = datetime.now(timezone.utc) dispatched = 0 with get_conn() as conn: cur = conn.cursor() # 알림이 아직 없는 CRITICAL/HIGH 이벤트 조회 cur.execute( f"""SELECT e.id, e.ai_confidence FROM {EVENTS_TABLE} e LEFT JOIN {ALERTS_TABLE} a ON a.event_id = e.id WHERE e.level IN %s AND a.id IS NULL ORDER BY e.occurred_at DESC LIMIT 100""", (ALERT_LEVELS,) ) rows = cur.fetchall() if rows: alerts = [ (event_id, 'DASHBOARD', None, now, 'SENT', confidence) for event_id, confidence in rows ] execute_values( cur, f"""INSERT INTO {ALERTS_TABLE} (event_id, channel, recipient, sent_at, delivery_status, ai_confidence) VALUES %s""", alerts, ) conn.commit() dispatched = len(alerts) logger.info(f'alert_dispatcher: dispatched={dispatched}') return {'dispatched': dispatched}