feat: S3 prediction 신규 출력 모듈 5종 + scheduler 통합

분석 사이클 완료 후 자동 실행되는 출력 파이프라인:
- event_generator: 분석결과 → 이벤트 자동 생성 (7개 룰, 카테고리별 dedup)
- violation_classifier: 위반 유형 라벨링 (EEZ/DARK/MMSI/TRANSSHIP/GEAR/RISK)
- kpi_writer: 실시간 KPI 6개 갱신 (오늘 기준 카운트)
- stats_aggregator: hourly/daily/monthly 사전 집계 (UPSERT)
- alert_dispatcher: CRITICAL/HIGH 이벤트 자동 알림 생성

scheduler.py에 출력 모듈 통합 (분석 8단계 완료 후 실행, non-fatal)
DB 연동 테스트 통과 (alerts 8건 생성, KPI tracking_active=2)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
htlee 2026-04-07 13:00:50 +09:00
부모 e2fc355b2c
커밋 474e672683
7개의 변경된 파일, 726개의 추가 작업 그리고 1개의 삭제

파일 보기

파일 보기

@ -0,0 +1,64 @@
"""
경보 발송 CRITICAL/HIGH 이벤트에 대해 prediction_alerts INSERT.
현재는 DASHBOARD 채널만 기록 (실제 SMS/EMAIL은 향후 연동).
"""
import logging
from datetime import datetime, timezone
from psycopg2.extras import execute_values
from config import qualified_table
from db.kcgdb import get_conn
logger = logging.getLogger(__name__)
ALERTS_TABLE = qualified_table('prediction_alerts')
EVENTS_TABLE = qualified_table('prediction_events')
# CRITICAL/HIGH 이벤트만 알림 대상
ALERT_LEVELS = ('CRITICAL', 'HIGH')
def run_alert_dispatcher() -> dict:
    """
    Create alerts for CRITICAL/HIGH events that do not have one yet.

    Scans prediction_events for rows whose level is in ALERT_LEVELS and
    that have no matching prediction_alerts row, then inserts one
    DASHBOARD alert per event (at most 100 per run).

    Returns:
        { 'dispatched': int }  -- number of alerts inserted this run
    """
    now = datetime.now(timezone.utc)
    dispatched = 0
    with get_conn() as conn:
        # Context-manage the cursor so it is closed even on error.
        with conn.cursor() as cur:
            # CRITICAL/HIGH events with no alert yet (anti-join on alerts).
            cur.execute(
                f"""SELECT e.id, e.ai_confidence
                    FROM {EVENTS_TABLE} e
                    LEFT JOIN {ALERTS_TABLE} a ON a.event_id = e.id
                    WHERE e.level IN %s AND a.id IS NULL
                    ORDER BY e.occurred_at DESC
                    LIMIT 100""",
                (ALERT_LEVELS,)
            )
            rows = cur.fetchall()
            if rows:
                alerts = [
                    # recipient is NULL: DASHBOARD alerts have no addressee yet.
                    (event_id, 'DASHBOARD', None, now, 'SENT', confidence)
                    for event_id, confidence in rows
                ]
                execute_values(
                    cur,
                    f"""INSERT INTO {ALERTS_TABLE}
                        (event_id, channel, recipient, sent_at, delivery_status, ai_confidence)
                        VALUES %s""",
                    alerts,
                )
                conn.commit()
                dispatched = len(alerts)
    # Lazy %-formatting: string is only built when INFO logging is enabled.
    logger.info('alert_dispatcher: dispatched=%d', dispatched)
    return {'dispatched': dispatched}

파일 보기

@ -0,0 +1,200 @@
"""
이벤트 자동 생성기 분석 결과 prediction_events INSERT.
분석 사이클마다 vessel_analysis_results를 스캔하여
기반으로 Event 객체를 생성합니다.
dedup: 동일 mmsi + category + 윈도우 중복 방지.
"""
import hashlib
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional
from psycopg2.extras import execute_values
from config import qualified_table, settings
from db.kcgdb import get_conn
logger = logging.getLogger(__name__)
EVENTS_TABLE = qualified_table('prediction_events')
# 카테고리별 dedup 윈도우 (분)
DEDUP_WINDOWS = {
'EEZ_INTRUSION': 30,
'DARK_VESSEL': 120,
'FLEET_CLUSTER': 360,
'ILLEGAL_TRANSSHIP': 60,
'MMSI_TAMPERING': 30,
'AIS_LOSS': 120,
'SPEED_ANOMALY': 60,
'ZONE_DEPARTURE': 120,
'GEAR_ILLEGAL': 360,
'AIS_RESUME': 60,
}
# 이벤트 생성 룰
RULES = [
{
'name': 'critical_risk',
'condition': lambda r: r.get('risk_score', 0) >= 90,
'level': 'CRITICAL',
'category': 'EEZ_INTRUSION',
'title_fn': lambda r: f"고위험 선박 탐지 (위험도 {r.get('risk_score', 0)})",
},
{
'name': 'eez_violation',
'condition': lambda r: r.get('zone_code', '') in ('NLL', 'SPECIAL_FISHING_1', 'SPECIAL_FISHING_2')
and r.get('risk_score', 0) >= 70,
'level': 'CRITICAL',
'category': 'EEZ_INTRUSION',
'title_fn': lambda r: f"EEZ 침범 탐지 ({r.get('zone_code', '')})",
},
{
'name': 'dark_vessel_long',
'condition': lambda r: r.get('is_dark') and (r.get('gap_duration_min', 0) or 0) > 60,
'level': 'HIGH',
'category': 'DARK_VESSEL',
'title_fn': lambda r: f"다크베셀 장기 소실 ({r.get('gap_duration_min', 0)}분)",
},
{
'name': 'spoofing',
'condition': lambda r: (r.get('spoofing_score', 0) or 0) > 0.7,
'level': 'HIGH',
'category': 'MMSI_TAMPERING',
'title_fn': lambda r: f"GPS/MMSI 조작 의심 (점수 {r.get('spoofing_score', 0):.2f})",
},
{
'name': 'transship',
'condition': lambda r: r.get('transship_suspect'),
'level': 'HIGH',
'category': 'ILLEGAL_TRANSSHIP',
'title_fn': lambda r: f"환적 의심 (상대: {r.get('transship_pair_mmsi', '미상')})",
},
{
'name': 'fleet_cluster',
'condition': lambda r: r.get('fleet_is_leader') and (r.get('fleet_cluster_id') is not None),
'level': 'MEDIUM',
'category': 'FLEET_CLUSTER',
'title_fn': lambda r: f"선단 밀집 감지 (클러스터 {r.get('fleet_cluster_id')})",
},
{
'name': 'high_risk',
'condition': lambda r: r.get('risk_level') == 'HIGH' and r.get('risk_score', 0) >= 60,
'level': 'MEDIUM',
'category': 'ZONE_DEPARTURE',
'title_fn': lambda r: f"위험 행동 패턴 (위험도 {r.get('risk_score', 0)})",
},
]
def _make_dedup_key(mmsi: str, category: str) -> str:
return f"{mmsi}:{category}"
def _make_event_uid(now: datetime, seq: int) -> str:
date_str = now.strftime('%Y%m%d')
return f"EVT-{date_str}-{seq:04d}"
def _get_next_seq(conn, date_str: str) -> int:
    """
    Return the next event-UID sequence number for the given day (YYYYMMDD).

    BUG FIX: derives the sequence from the maximum existing numeric suffix
    instead of COUNT(*). With a count, deleting any event for the day would
    make the next UID collide with a surviving row, and the caller's
    ``ON CONFLICT (event_uid) DO NOTHING`` would then silently drop the
    new event.
    """
    cur = conn.cursor()
    cur.execute(
        f"""SELECT COALESCE(MAX(CAST(split_part(event_uid, '-', 3) AS INTEGER)), 0)
            FROM {EVENTS_TABLE} WHERE event_uid LIKE %s""",
        (f'EVT-{date_str}-%',)
    )
    return cur.fetchone()[0] + 1
def _check_dedup(conn, dedup_key: str, category: str, now: datetime) -> bool:
    """Return True when a recent event with the same dedup key already exists."""
    # Window length is category-specific; unknown categories use 60 minutes.
    cutoff = now - timedelta(minutes=DEDUP_WINDOWS.get(category, 60))
    cursor = conn.cursor()
    cursor.execute(
        f"SELECT 1 FROM {EVENTS_TABLE} WHERE dedup_key = %s AND occurred_at > %s LIMIT 1",
        (dedup_key, cutoff)
    )
    row = cursor.fetchone()
    return row is not None
def run_event_generator(analysis_results: list[dict]) -> dict:
    """
    Scan analysis results and generate prediction events.

    For each result, the rules in RULES are evaluated in order and only the
    first (highest-priority) matching rule produces an event. Duplicate
    suppression is applied per mmsi+category both against recent DB rows
    (_check_dedup) and within the current batch.

    Args:
        analysis_results: vessel_analysis_results dicts
            (mmsi, risk_score, zone_code, is_dark, gap_duration_min,
             spoofing_score, ...)
    Returns:
        { 'generated': int, 'skipped_dedup': int }
    """
    now = datetime.now(timezone.utc)
    generated = 0
    skipped_dedup = 0
    events_to_insert = []
    # BUG FIX: track keys generated in this batch. _check_dedup only sees
    # committed rows, so a repeated mmsi within one cycle would otherwise
    # produce duplicate events.
    seen_keys = set()
    with get_conn() as conn:
        date_str = now.strftime('%Y%m%d')
        seq = _get_next_seq(conn, date_str)
        for result in analysis_results:
            mmsi = result.get('mmsi', '')
            if not mmsi:
                continue
            for rule in RULES:
                try:
                    if not rule['condition'](result):
                        continue
                except Exception:
                    # A malformed result must not abort the whole cycle.
                    continue
                category = rule['category']
                dedup_key = _make_dedup_key(mmsi, category)
                if dedup_key in seen_keys or _check_dedup(conn, dedup_key, category, now):
                    skipped_dedup += 1
                    continue
                seen_keys.add(dedup_key)
                event_uid = _make_event_uid(now, seq)
                seq += 1
                events_to_insert.append((
                    event_uid,
                    now,  # occurred_at
                    rule['level'],
                    category,
                    rule['title_fn'](result),  # title
                    None,  # detail
                    mmsi,
                    result.get('vessel_name'),
                    result.get('zone_code'),  # area_name (zone code stands in)
                    result.get('zone_code'),
                    result.get('lat'),
                    result.get('lon'),
                    result.get('speed_kn'),
                    'VESSEL_ANALYSIS',  # source_type
                    result.get('id'),  # source_ref_id
                    result.get('confidence') or result.get('risk_score', 0) / 100.0,
                    'NEW',  # status
                    dedup_key,
                ))
                generated += 1
                break  # at most one (highest-priority) rule per result
        if events_to_insert:
            execute_values(
                conn.cursor(),
                f"""INSERT INTO {EVENTS_TABLE}
                    (event_uid, occurred_at, level, category, title, detail,
                     vessel_mmsi, vessel_name, area_name, zone_code, lat, lon, speed_kn,
                     source_type, source_ref_id, ai_confidence, status, dedup_key)
                    VALUES %s
                    ON CONFLICT (event_uid) DO NOTHING""",
                events_to_insert,
            )
            conn.commit()
    # Lazy %-formatting keeps the hot path cheap when INFO is disabled.
    logger.info('event_generator: generated=%d, skipped_dedup=%d', generated, skipped_dedup)
    return {'generated': generated, 'skipped_dedup': skipped_dedup}

파일 보기

@ -0,0 +1,109 @@
"""
실시간 KPI 갱신 prediction_kpi_realtime 테이블 업데이트.
분석 사이클마다 오늘 날짜 기준 카운트를 계산하여 6 KPI 갱신.
"""
import logging
from datetime import date, datetime, timezone
from config import qualified_table
from db.kcgdb import get_conn
logger = logging.getLogger(__name__)
KPI_TABLE = qualified_table('prediction_kpi_realtime')
EVENTS_TABLE = qualified_table('prediction_events')
ENF_TABLE = qualified_table('enforcement_records')
VAR_TABLE = qualified_table('vessel_analysis_results')
def run_kpi_writer() -> dict:
    """
    Recompute the six realtime KPIs for today and persist them.

    Each KPI row also gets a trend ('up'/'down'/'flat') and a delta
    percentage relative to the previously stored value.

    Returns:
        { kpi_key: value } dict of the freshly computed values
    """
    today = date.today()
    today_start = datetime(today.year, today.month, today.day, tzinfo=timezone.utc)
    now = datetime.now(timezone.utc)
    results = {}
    with get_conn() as conn:
        cur = conn.cursor()
        # 1. Realtime detections (distinct vessels analyzed today)
        cur.execute(
            f"SELECT COUNT(DISTINCT mmsi) FROM {VAR_TABLE} WHERE analyzed_at >= %s",
            (today_start,)
        )
        realtime = cur.fetchone()[0] or 0
        results['realtime_detection'] = realtime
        # 2. EEZ intrusions (today's EEZ-related events)
        cur.execute(
            f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE category = 'EEZ_INTRUSION' AND occurred_at >= %s",
            (today_start,)
        )
        eez = cur.fetchone()[0] or 0
        results['eez_violation'] = eez
        # 3. Dark vessels (vessels flagged dark among today's analyses)
        cur.execute(
            f"""SELECT COUNT(DISTINCT mmsi) FROM {VAR_TABLE}
                WHERE is_dark = true AND analyzed_at >= %s""",
            (today_start,)
        )
        dark = cur.fetchone()[0] or 0
        results['dark_vessel'] = dark
        # 4. Suspected transshipments (today)
        cur.execute(
            f"""SELECT COUNT(*) FROM {EVENTS_TABLE}
                WHERE category = 'ILLEGAL_TRANSSHIP' AND occurred_at >= %s""",
            (today_start,)
        )
        transship = cur.fetchone()[0] or 0
        results['illegal_transship'] = transship
        # 5. Actively tracked (events in IN_PROGRESS state, not day-bounded)
        cur.execute(
            f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE status = 'IN_PROGRESS'"
        )
        tracking = cur.fetchone()[0] or 0
        results['tracking_active'] = tracking
        # 6. Captured/inspected (today's enforcement records)
        cur.execute(
            f"SELECT COUNT(*) FROM {ENF_TABLE} WHERE enforced_at >= %s",
            (today_start,)
        )
        captured = cur.fetchone()[0] or 0
        results['captured_inspected'] = captured
        # KPI table refresh: trend/delta computed against the stored value.
        for key, value in results.items():
            cur.execute(
                f"SELECT value FROM {KPI_TABLE} WHERE kpi_key = %s",
                (key,)
            )
            row = cur.fetchone()
            prev = row[0] if row else 0
            # max(prev, 1) avoids division by zero when the old value is 0.
            if value > prev:
                trend, delta = 'up', ((value - prev) / max(prev, 1)) * 100
            elif value < prev:
                trend, delta = 'down', ((value - prev) / max(prev, 1)) * 100
            else:
                trend, delta = 'flat', 0.0
            cur.execute(
                f"""UPDATE {KPI_TABLE}
                    SET value = %s, trend = %s, delta_pct = %s, updated_at = %s
                    WHERE kpi_key = %s""",
                (value, trend, round(delta, 2), now, key)
            )
            if cur.rowcount == 0:
                # BUG FIX: a plain UPDATE is a silent no-op when the KPI row
                # does not exist yet -- seed it so the value is not lost.
                # NOTE(review): assumes remaining columns are nullable or
                # defaulted -- confirm against the table schema.
                cur.execute(
                    f"""INSERT INTO {KPI_TABLE} (kpi_key, value, trend, delta_pct, updated_at)
                        VALUES (%s, %s, %s, %s, %s)""",
                    (key, value, trend, round(delta, 2), now)
                )
        conn.commit()
    logger.info('kpi_writer: %s', results)
    return results

파일 보기

@ -0,0 +1,237 @@
"""
통계 사전 집계 prediction_stats_hourly/daily/monthly 갱신.
hourly: 분석 사이클마다 (최근 48h 보존)
daily: 매일 01:00 또는 분석 사이클
monthly: daily 합산
"""
import json
import logging
from datetime import date, datetime, timedelta, timezone
from typing import Optional
from config import qualified_table
from db.kcgdb import get_conn
logger = logging.getLogger(__name__)
STATS_HOURLY = qualified_table('prediction_stats_hourly')
STATS_DAILY = qualified_table('prediction_stats_daily')
STATS_MONTHLY = qualified_table('prediction_stats_monthly')
VAR_TABLE = qualified_table('vessel_analysis_results')
EVENTS_TABLE = qualified_table('prediction_events')
ENF_TABLE = qualified_table('enforcement_records')
def _jsonb(d: dict) -> str:
return json.dumps(d, ensure_ascii=False)
def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
    """
    Upsert hourly aggregates for the hour containing *target_hour*.

    Counts detections, a per-risk-level breakdown, total events and
    CRITICAL events within [hour_start, hour_start + 1h), UPSERTs one row
    into prediction_stats_hourly, then prunes rows older than 48 hours.

    Args:
        target_hour: hour to aggregate; defaults to the current UTC time.
            NOTE(review): when supplied, it is also used as updated_at and
            as the base of the 48h pruning cutoff -- confirm intended.
    Returns:
        { 'hour': ISO hour string, 'detections': int, 'events': int }
    """
    now = target_hour or datetime.now(timezone.utc)
    hour_start = now.replace(minute=0, second=0, microsecond=0)
    hour_end = hour_start + timedelta(hours=1)
    with get_conn() as conn:
        cur = conn.cursor()
        # Detection count for the hour
        cur.execute(
            f"SELECT COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s",
            (hour_start, hour_end)
        )
        total = cur.fetchone()[0] or 0
        # Breakdown by risk level
        cur.execute(
            f"""SELECT risk_level, COUNT(*) FROM {VAR_TABLE}
                WHERE analyzed_at >= %s AND analyzed_at < %s AND risk_level IS NOT NULL
                GROUP BY risk_level""",
            (hour_start, hour_end)
        )
        by_risk = dict(cur.fetchall())
        # Event count
        cur.execute(
            f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s",
            (hour_start, hour_end)
        )
        events = cur.fetchone()[0] or 0
        # CRITICAL events only
        cur.execute(
            f"""SELECT COUNT(*) FROM {EVENTS_TABLE}
                WHERE occurred_at >= %s AND occurred_at < %s AND level = 'CRITICAL'""",
            (hour_start, hour_end)
        )
        critical = cur.fetchone()[0] or 0
        # UPSERT keyed on stat_hour so re-runs within the hour overwrite.
        cur.execute(
            f"""INSERT INTO {STATS_HOURLY}
                (stat_hour, total_detections, by_risk_level, event_count, critical_count, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT (stat_hour) DO UPDATE SET
                total_detections = EXCLUDED.total_detections,
                by_risk_level = EXCLUDED.by_risk_level,
                event_count = EXCLUDED.event_count,
                critical_count = EXCLUDED.critical_count,
                updated_at = EXCLUDED.updated_at""",
            (hour_start, total, _jsonb(by_risk), events, critical, now)
        )
        # Drop rows older than 48 hours (rolling retention window)
        cutoff = now - timedelta(hours=48)
        cur.execute(f"DELETE FROM {STATS_HOURLY} WHERE stat_hour < %s", (cutoff,))
        conn.commit()
    result = {'hour': hour_start.isoformat(), 'detections': total, 'events': events}
    logger.info(f'stats_aggregator hourly: {result}')
    return result
def aggregate_daily(target_date: Optional[date] = None) -> dict:
    """
    Upsert daily aggregates for *target_date* (defaults to today, UTC days).

    Computes detections, violation-type and risk-level breakdowns, event /
    CRITICAL-event / enforcement / false-positive counts for
    [day_start, day_start + 1d), and an AI-accuracy percentage, then
    UPSERTs a single prediction_stats_daily row keyed on stat_date.

    Returns:
        { 'date': ISO date, 'detections': int, 'events': int, 'accuracy': float | None }
    """
    d = target_date or date.today()
    day_start = datetime(d.year, d.month, d.day, tzinfo=timezone.utc)
    day_end = day_start + timedelta(days=1)
    now = datetime.now(timezone.utc)
    with get_conn() as conn:
        cur = conn.cursor()
        # Total detections
        cur.execute(
            f"SELECT COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s",
            (day_start, day_end)
        )
        total = cur.fetchone()[0] or 0
        # Breakdown by violation type (unnest flattens the TEXT[] column)
        cur.execute(
            f"""SELECT unnest(violation_categories) AS vt, COUNT(*)
                FROM {VAR_TABLE}
                WHERE analyzed_at >= %s AND analyzed_at < %s AND violation_categories IS NOT NULL
                GROUP BY vt""",
            (day_start, day_end)
        )
        by_violation = dict(cur.fetchall())
        # Breakdown by risk level
        cur.execute(
            f"""SELECT risk_level, COUNT(*) FROM {VAR_TABLE}
                WHERE analyzed_at >= %s AND analyzed_at < %s AND risk_level IS NOT NULL
                GROUP BY risk_level""",
            (day_start, day_end)
        )
        by_risk = dict(cur.fetchall())
        # Events
        cur.execute(
            f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s",
            (day_start, day_end)
        )
        event_count = cur.fetchone()[0] or 0
        cur.execute(
            f"""SELECT COUNT(*) FROM {EVENTS_TABLE}
                WHERE occurred_at >= %s AND occurred_at < %s AND level = 'CRITICAL'""",
            (day_start, day_end)
        )
        critical = cur.fetchone()[0] or 0
        # Enforcement records
        cur.execute(
            f"SELECT COUNT(*) FROM {ENF_TABLE} WHERE enforced_at >= %s AND enforced_at < %s",
            (day_start, day_end)
        )
        enf_count = cur.fetchone()[0] or 0
        # False positives (events later marked FALSE_POSITIVE)
        cur.execute(
            f"""SELECT COUNT(*) FROM {EVENTS_TABLE}
                WHERE occurred_at >= %s AND occurred_at < %s AND status = 'FALSE_POSITIVE'""",
            (day_start, day_end)
        )
        fp = cur.fetchone()[0] or 0
        # AI accuracy = share of events NOT marked false-positive; None when no events
        accuracy = round((1 - fp / max(event_count, 1)) * 100, 2) if event_count > 0 else None
        # UPSERT keyed on stat_date so intra-day re-runs overwrite
        cur.execute(
            f"""INSERT INTO {STATS_DAILY}
                (stat_date, total_detections, by_violation_type, by_risk_level,
                event_count, critical_event_count, enforcement_count,
                false_positive_count, ai_accuracy_pct, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (stat_date) DO UPDATE SET
                total_detections = EXCLUDED.total_detections,
                by_violation_type = EXCLUDED.by_violation_type,
                by_risk_level = EXCLUDED.by_risk_level,
                event_count = EXCLUDED.event_count,
                critical_event_count = EXCLUDED.critical_event_count,
                enforcement_count = EXCLUDED.enforcement_count,
                false_positive_count = EXCLUDED.false_positive_count,
                ai_accuracy_pct = EXCLUDED.ai_accuracy_pct,
                updated_at = EXCLUDED.updated_at""",
            (d, total, _jsonb(by_violation), _jsonb(by_risk),
             event_count, critical, enf_count, fp, accuracy, now)
        )
        conn.commit()
    result = {'date': d.isoformat(), 'detections': total, 'events': event_count, 'accuracy': accuracy}
    logger.info(f'stats_aggregator daily: {result}')
    return result
def aggregate_monthly(target_month: Optional[date] = None) -> dict:
    """
    Upsert monthly aggregates by summing the daily rows of the month.

    Args:
        target_month: any date within the month to aggregate (normalized to
            the 1st); defaults to the current month.
    Returns:
        { 'month': ISO date of month start, 'detections': int, 'enforcements': int }
    """
    d = target_month or date.today().replace(day=1)
    month_start = d.replace(day=1)
    # First day of the following month (handles the December -> January rollover)
    if month_start.month == 12:
        month_end = month_start.replace(year=month_start.year + 1, month=1)
    else:
        month_end = month_start.replace(month=month_start.month + 1)
    now = datetime.now(timezone.utc)
    with get_conn() as conn:
        cur = conn.cursor()
        # Sum the pre-aggregated daily rows for the month
        cur.execute(
            f"""SELECT
                COALESCE(SUM(total_detections), 0),
                COALESCE(SUM(event_count), 0),
                COALESCE(SUM(critical_event_count), 0),
                COALESCE(SUM(enforcement_count), 0),
                COALESCE(SUM(false_positive_count), 0)
                FROM {STATS_DAILY}
                WHERE stat_date >= %s AND stat_date < %s""",
            (month_start, month_end)
        )
        row = cur.fetchone()
        total_det, evt, crit, enf, fp = row
        # Accuracy = share of events not marked false-positive; None when no events
        accuracy = round((1 - fp / max(evt, 1)) * 100, 2) if evt > 0 else None
        # UPSERT keyed on stat_month so re-runs during the month overwrite
        cur.execute(
            f"""INSERT INTO {STATS_MONTHLY}
                (stat_month, total_detections, total_enforcements,
                event_count, critical_event_count, false_positive_count,
                ai_accuracy_pct, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (stat_month) DO UPDATE SET
                total_detections = EXCLUDED.total_detections,
                total_enforcements = EXCLUDED.total_enforcements,
                event_count = EXCLUDED.event_count,
                critical_event_count = EXCLUDED.critical_event_count,
                false_positive_count = EXCLUDED.false_positive_count,
                ai_accuracy_pct = EXCLUDED.ai_accuracy_pct,
                updated_at = EXCLUDED.updated_at""",
            (month_start, total_det, enf, evt, crit, fp, accuracy, now)
        )
        conn.commit()
    result = {'month': month_start.isoformat(), 'detections': total_det, 'enforcements': enf}
    logger.info(f'stats_aggregator monthly: {result}')
    return result

파일 보기

@ -0,0 +1,87 @@
"""
위반 유형 라벨링 분석 결과에 violation_categories[] 태깅.
vessel_analysis_results의 행에 대해 5 위반 카테고리를 판정하고
violation_categories TEXT[] 컬럼을 업데이트합니다.
"""
import logging
from psycopg2.extras import execute_batch
from config import qualified_table
from db.kcgdb import get_conn
logger = logging.getLogger(__name__)
VAR_TABLE = qualified_table('vessel_analysis_results')
def classify_violations(result: dict) -> list[str]:
    """Return the violation-category labels for a single analysis result."""
    zone = result.get('zone_code', '') or ''
    risk_score = result.get('risk_score', 0) or 0
    is_dark = result.get('is_dark', False)
    spoofing = result.get('spoofing_score', 0) or 0
    transship = result.get('transship_suspect', False)
    permit = result.get('permit_status', 'UNKNOWN') or 'UNKNOWN'
    gap_min = result.get('gap_duration_min', 0) or 0
    gear_judgment = result.get('gear_judgment', '') or ''

    restricted_zones = ('NLL', 'SPECIAL_FISHING_1', 'SPECIAL_FISHING_2',
                        'SPECIAL_FISHING_3', 'SPECIAL_FISHING_4', 'EEZ_KR')
    bad_permits = ('NONE', 'EXPIRED', 'REVOKED')
    gear_faults = ('NO_PERMIT', 'GEAR_MISMATCH', 'ZONE_VIOLATION', 'SEASON_VIOLATION')

    # Each check pairs a label with its trigger condition, in output order.
    checks = [
        ('EEZ_VIOLATION', zone in restricted_zones and permit in bad_permits),
        ('DARK_VESSEL', bool(is_dark) and gap_min > 30),
        ('MMSI_TAMPERING', spoofing > 0.6),
        ('ILLEGAL_TRANSSHIP', bool(transship)),
        ('ILLEGAL_GEAR', gear_judgment in gear_faults),
    ]
    violations = [label for label, hit in checks if hit]
    # High-risk behaviour is flagged only when no concrete violation matched.
    if not violations and risk_score >= 70:
        violations.append('RISK_BEHAVIOR')
    return violations
def run_violation_classifier(analysis_results: list[dict]) -> dict:
    """
    Label analysis results with violation categories and persist them.

    Only rows that have an id and at least one violation are written; the
    TEXT[] column is updated in a single batched statement.

    Returns:
        { 'classified': int, 'violations_found': int }
    """
    updates = []
    violations_found = 0
    for entry in analysis_results:
        labels = classify_violations(entry)
        row_id = entry.get('id')
        if row_id and labels:
            updates.append((labels, row_id))
            violations_found += len(labels)
    if updates:
        with get_conn() as conn:
            # psycopg2 adapts the Python list to a Postgres array here.
            execute_batch(
                conn.cursor(),
                f"UPDATE {VAR_TABLE} SET violation_categories = %s WHERE id = %s",
                updates,
            )
            conn.commit()
    logger.info(f'violation_classifier: classified={len(updates)}, violations={violations_found}')
    return {'classified': len(updates), 'violations_found': violations_found}

파일 보기

@ -293,7 +293,35 @@ def run_analysis_cycle():
upserted = kcgdb.upsert_results(results)
kcgdb.cleanup_old(hours=48)
# 8. Redis에 분석 컨텍스트 캐싱 (채팅용)
# 8. 출력 모듈 (이벤트 생성, 위반 분류, KPI 갱신, 통계 집계, 경보)
try:
from output.violation_classifier import run_violation_classifier
from output.event_generator import run_event_generator
from output.kpi_writer import run_kpi_writer
from output.stats_aggregator import aggregate_hourly, aggregate_daily
from output.alert_dispatcher import run_alert_dispatcher
from dataclasses import asdict
results_dicts = [asdict(r) for r in results]
# 필드명 매핑 (AnalysisResult → 출력 모듈 기대 형식)
for d in results_dicts:
d['zone_code'] = d.pop('zone', None)
d['gap_duration_min'] = d.get('gap_duration_min', 0)
d['transship_suspect'] = d.pop('is_transship_suspect', False)
d['fleet_is_leader'] = d.pop('is_leader', False)
d['fleet_cluster_id'] = d.pop('cluster_id', None)
d['speed_kn'] = None # 분석 결과에 속도 없음
run_violation_classifier(results_dicts)
run_event_generator(results_dicts)
run_kpi_writer()
aggregate_hourly()
aggregate_daily()
run_alert_dispatcher()
logger.info('output modules completed')
except Exception as e:
logger.warning('output modules failed (non-fatal): %s', e)
# 9. Redis에 분석 컨텍스트 캐싱 (채팅용)
try:
from chat.cache import cache_analysis_context