kcg-ai-monitoring/prediction/output/stats_aggregator.py
htlee da37a00b8e fix: resolve five prediction issues; all pipelines running normally
## Issue 1: gear_correlation Decimal → float TypeError
- prediction/algorithms/gear_correlation.py:785
- _load_all_scores() read NUMERIC columns as Decimal, which fails as soon as they are combined with float constants
- Fixed with an explicit float() conversion
- Effect: gear correlation now records 24,474 raw metrics + 3,966 scores
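
The failure is easy to reproduce: the database driver returns NUMERIC columns as `decimal.Decimal`, and Python refuses to mix Decimal with float. A minimal sketch of the fix (the helper name is illustrative; the real change lives in `_load_all_scores()`):

```python
from decimal import Decimal

def to_score(value) -> float:
    # NUMERIC columns arrive as decimal.Decimal; mixing Decimal with a float
    # constant raises TypeError (e.g. Decimal("0.8") * 0.5). Converting once
    # at load time keeps all downstream arithmetic in plain floats.
    return float(value)

score = to_score(Decimal("0.82"))
weighted = score * 0.5  # safe: float * float
```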

## Issue 2: violation_classifier classified=0
- prediction/output/violation_classifier.py
- result.get('id') does not exist on AnalysisResult, so it was always None and every UPDATE was skipped
- Also relied on nonexistent permit_status/gear_judgment fields
- Changed to UPDATE keyed on (mmsi, analyzed_at)
- Chinese vessels (MMSI 412/413*) that enter the EEZ are now judged EEZ_VIOLATION even without a permit record
- Effect: classified=0 → classified=4-6/cycle
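
The new matching rule can be sketched as follows; `classify_violation`, its parameters, and the column names in the UPDATE are illustrative, not the classifier's actual API:

```python
def classify_violation(mmsi: int, in_eez: bool, has_permit: bool):
    """Chinese MMSIs start with MID prefix 412 or 413; if such a vessel is
    inside the EEZ with no permit record, return EEZ_VIOLATION instead of
    skipping the row for lack of permit_status data."""
    is_chinese = str(mmsi).startswith(('412', '413'))
    if in_eez and is_chinese and not has_permit:
        return 'EEZ_VIOLATION'
    return None

# The UPDATE is keyed on (mmsi, analyzed_at), not on a nonexistent result id:
UPDATE_SQL = (
    "UPDATE vessel_analysis_results SET violation_type = %s "
    "WHERE mmsi = %s AND analyzed_at = %s"
)
```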

## Issue 3: kpi_writer all zeros (except tracking_active)
- prediction/output/kpi_writer.py:27
- Mixed date.today() with timezone.utc: the current moment is still "yesterday" in UTC, so the "today >= today_start" query returned 0
- Now computes today_start in KST
- Effect: realtime_detection 0 → 7,107, illegal_transship 0 → 5,033
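
The boundary bug and its fix can be shown with a fixed instant (a sketch; kpi_writer's real code differs in detail):

```python
from datetime import datetime, timedelta, timezone

KST = timezone(timedelta(hours=9))

def today_start_kst(now_utc: datetime) -> datetime:
    """Start of 'today' on the KST calendar, as an aware datetime."""
    today = now_utc.astimezone(KST).date()
    return datetime(today.year, today.month, today.day, tzinfo=KST)

# At 01:00 KST on Apr 8 the UTC clock still reads Apr 7, so a boundary built
# from date.today() with tzinfo=UTC misses KST midnight by hours and the
# ">= today_start" query matches nothing.
now = datetime(2026, 4, 7, 16, 0, tzinfo=timezone.utc)   # 2026-04-08 01:00 KST
assert today_start_kst(now) == datetime(2026, 4, 8, tzinfo=KST)
```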

## Issue 4: stats_daily shows 0 rows for today
- prediction/output/stats_aggregator.py:96, 194
- aggregate_daily/monthly used UTC day boundaries
- Fixed to use KST midnight
- Effect: 2026-04-08 detections 0 → 543,656, events 0 → 5,253

## Issue 5: parent workflow tables missing columns (V005 ↔ prediction mismatch)
Added all of them in the V016 migration:
- gear_parent_label_sessions: 8 columns, including label_parent_name, normalized_parent_name,
  duration_days, actor, comment, metadata, updated_at
- gear_group_parent_resolution: 17 columns, including parent_name, normalized_parent_name,
  selected_parent_name, confidence, decision_source, top_score, second_score,
  score_margin, stable_cycles, evidence_summary, episode_id, continuity_*,
  prior_bonus_total, last_evaluated_at, last_promoted_at
- gear_parent_candidate_exclusions: normalized_parent_name, reason_type,
  duration_days, metadata, updated_at, active_from, active_until, plus a
  candidate_mmsi GENERATED ALWAYS AS (excluded_mmsi) alias
- gear_group_parent_candidate_snapshots: parent_name

Effect: gear parent inference records 925 groups, 301 direct-match, 1,329 candidates,
188 review-required, 925 episode snapshots; the full parent-vessel workflow works

## Verification (e2e)

- analysis cycle: 6,824 vessels, 112 s/cycle, stable
- vessel_analysis_results: 13,650 rows per 10 min, 1.25M total
- prediction_events: 138 per hour, 12,258 total
- prediction_alerts: 183 per hour
- gear_correlation_scores: 3,966 rows
- gear_group_parent_resolution: 926 rows
- stats_hourly: 17 rows; stats_daily: 543,656 for today
- backend Flyway V016 applied cleanly

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 06:47:53 +09:00


"""
통계 사전 집계 — prediction_stats_hourly/daily/monthly 갱신.
hourly: 매 분석 사이클마다 (최근 48h 보존)
daily: 매일 01:00 또는 분석 사이클 후
monthly: daily 합산
"""
import json
import logging
from datetime import date, datetime, timedelta, timezone
from typing import Optional
from config import qualified_table
from db.kcgdb import get_conn
logger = logging.getLogger(__name__)
STATS_HOURLY = qualified_table('prediction_stats_hourly')
STATS_DAILY = qualified_table('prediction_stats_daily')
STATS_MONTHLY = qualified_table('prediction_stats_monthly')
VAR_TABLE = qualified_table('vessel_analysis_results')
EVENTS_TABLE = qualified_table('prediction_events')
ENF_TABLE = qualified_table('enforcement_records')
# 한국 표준시 (운영 기준 — 일/월 집계 경계)
_KST = timezone(timedelta(hours=9))
def _jsonb(d: dict) -> str:
return json.dumps(d, ensure_ascii=False)

def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
    """Hourly aggregation for the current (or given) hour."""
    now = target_hour or datetime.now(timezone.utc)
    hour_start = now.replace(minute=0, second=0, microsecond=0)
    hour_end = hour_start + timedelta(hours=1)
    with get_conn() as conn:
        cur = conn.cursor()
        # Detection count
        cur.execute(
            f"SELECT COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s",
            (hour_start, hour_end)
        )
        total = cur.fetchone()[0] or 0
        # By risk level
        cur.execute(
            f"""SELECT risk_level, COUNT(*) FROM {VAR_TABLE}
                WHERE analyzed_at >= %s AND analyzed_at < %s AND risk_level IS NOT NULL
                GROUP BY risk_level""",
            (hour_start, hour_end)
        )
        by_risk = dict(cur.fetchall())
        # Event count
        cur.execute(
            f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s",
            (hour_start, hour_end)
        )
        events = cur.fetchone()[0] or 0
        # CRITICAL events
        cur.execute(
            f"""SELECT COUNT(*) FROM {EVENTS_TABLE}
                WHERE occurred_at >= %s AND occurred_at < %s AND level = 'CRITICAL'""",
            (hour_start, hour_end)
        )
        critical = cur.fetchone()[0] or 0
        cur.execute(
            f"""INSERT INTO {STATS_HOURLY}
                (stat_hour, total_detections, by_risk_level, event_count, critical_count, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT (stat_hour) DO UPDATE SET
                    total_detections = EXCLUDED.total_detections,
                    by_risk_level = EXCLUDED.by_risk_level,
                    event_count = EXCLUDED.event_count,
                    critical_count = EXCLUDED.critical_count,
                    updated_at = EXCLUDED.updated_at""",
            (hour_start, total, _jsonb(by_risk), events, critical, now)
        )
        # Purge rows older than 48 hours
        cutoff = now - timedelta(hours=48)
        cur.execute(f"DELETE FROM {STATS_HOURLY} WHERE stat_hour < %s", (cutoff,))
        conn.commit()
    result = {'hour': hour_start.isoformat(), 'detections': total, 'events': events}
    logger.info(f'stats_aggregator hourly: {result}')
    return result

def aggregate_daily(target_date: Optional[date] = None) -> dict:
    """Daily aggregation for the given date (KST calendar)."""
    d = target_date or datetime.now(_KST).date()
    # KST midnight as an aware TIMESTAMPTZ (15:00 UTC of the previous day)
    day_start = datetime(d.year, d.month, d.day, tzinfo=_KST)
    day_end = day_start + timedelta(days=1)
    now = datetime.now(timezone.utc)
    with get_conn() as conn:
        cur = conn.cursor()
        # Total detections
        cur.execute(
            f"SELECT COUNT(*) FROM {VAR_TABLE} WHERE analyzed_at >= %s AND analyzed_at < %s",
            (day_start, day_end)
        )
        total = cur.fetchone()[0] or 0
        # By violation type (unnest)
        cur.execute(
            f"""SELECT unnest(violation_categories) AS vt, COUNT(*)
                FROM {VAR_TABLE}
                WHERE analyzed_at >= %s AND analyzed_at < %s AND violation_categories IS NOT NULL
                GROUP BY vt""",
            (day_start, day_end)
        )
        by_violation = dict(cur.fetchall())
        # By risk level
        cur.execute(
            f"""SELECT risk_level, COUNT(*) FROM {VAR_TABLE}
                WHERE analyzed_at >= %s AND analyzed_at < %s AND risk_level IS NOT NULL
                GROUP BY risk_level""",
            (day_start, day_end)
        )
        by_risk = dict(cur.fetchall())
        # Events
        cur.execute(
            f"SELECT COUNT(*) FROM {EVENTS_TABLE} WHERE occurred_at >= %s AND occurred_at < %s",
            (day_start, day_end)
        )
        event_count = cur.fetchone()[0] or 0
        # CRITICAL events
        cur.execute(
            f"""SELECT COUNT(*) FROM {EVENTS_TABLE}
                WHERE occurred_at >= %s AND occurred_at < %s AND level = 'CRITICAL'""",
            (day_start, day_end)
        )
        critical = cur.fetchone()[0] or 0
        # Enforcement actions
        cur.execute(
            f"SELECT COUNT(*) FROM {ENF_TABLE} WHERE enforced_at >= %s AND enforced_at < %s",
            (day_start, day_end)
        )
        enf_count = cur.fetchone()[0] or 0
        # False positives
        cur.execute(
            f"""SELECT COUNT(*) FROM {EVENTS_TABLE}
                WHERE occurred_at >= %s AND occurred_at < %s AND status = 'FALSE_POSITIVE'""",
            (day_start, day_end)
        )
        fp = cur.fetchone()[0] or 0
        # AI accuracy (share of events that were not false positives)
        accuracy = round((1 - fp / max(event_count, 1)) * 100, 2) if event_count > 0 else None
        cur.execute(
            f"""INSERT INTO {STATS_DAILY}
                (stat_date, total_detections, by_violation_type, by_risk_level,
                 event_count, critical_event_count, enforcement_count,
                 false_positive_count, ai_accuracy_pct, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (stat_date) DO UPDATE SET
                    total_detections = EXCLUDED.total_detections,
                    by_violation_type = EXCLUDED.by_violation_type,
                    by_risk_level = EXCLUDED.by_risk_level,
                    event_count = EXCLUDED.event_count,
                    critical_event_count = EXCLUDED.critical_event_count,
                    enforcement_count = EXCLUDED.enforcement_count,
                    false_positive_count = EXCLUDED.false_positive_count,
                    ai_accuracy_pct = EXCLUDED.ai_accuracy_pct,
                    updated_at = EXCLUDED.updated_at""",
            (d, total, _jsonb(by_violation), _jsonb(by_risk),
             event_count, critical, enf_count, fp, accuracy, now)
        )
        conn.commit()
    result = {'date': d.isoformat(), 'detections': total, 'events': event_count, 'accuracy': accuracy}
    logger.info(f'stats_aggregator daily: {result}')
    return result

def aggregate_monthly(target_month: Optional[date] = None) -> dict:
    """Monthly aggregation for the given month (sum of daily rows, KST calendar)."""
    d = target_month or datetime.now(_KST).date().replace(day=1)
    month_start = d.replace(day=1)
    if month_start.month == 12:
        month_end = month_start.replace(year=month_start.year + 1, month=1)
    else:
        month_end = month_start.replace(month=month_start.month + 1)
    now = datetime.now(timezone.utc)
    with get_conn() as conn:
        cur = conn.cursor()
        cur.execute(
            f"""SELECT
                    COALESCE(SUM(total_detections), 0),
                    COALESCE(SUM(event_count), 0),
                    COALESCE(SUM(critical_event_count), 0),
                    COALESCE(SUM(enforcement_count), 0),
                    COALESCE(SUM(false_positive_count), 0)
                FROM {STATS_DAILY}
                WHERE stat_date >= %s AND stat_date < %s""",
            (month_start, month_end)
        )
        row = cur.fetchone()
        total_det, evt, crit, enf, fp = row
        accuracy = round((1 - fp / max(evt, 1)) * 100, 2) if evt > 0 else None
        cur.execute(
            f"""INSERT INTO {STATS_MONTHLY}
                (stat_month, total_detections, total_enforcements,
                 event_count, critical_event_count, false_positive_count,
                 ai_accuracy_pct, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (stat_month) DO UPDATE SET
                    total_detections = EXCLUDED.total_detections,
                    total_enforcements = EXCLUDED.total_enforcements,
                    event_count = EXCLUDED.event_count,
                    critical_event_count = EXCLUDED.critical_event_count,
                    false_positive_count = EXCLUDED.false_positive_count,
                    ai_accuracy_pct = EXCLUDED.ai_accuracy_pct,
                    updated_at = EXCLUDED.updated_at""",
            (month_start, total_det, enf, evt, crit, fp, accuracy, now)
        )
        conn.commit()
    result = {'month': month_start.isoformat(), 'detections': total_det, 'enforcements': enf}
    logger.info(f'stats_aggregator monthly: {result}')
    return result
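
The month-rollover arithmetic in aggregate_monthly is self-contained enough to verify in isolation; this is an equivalent standalone sketch, not the module's own code:

```python
from datetime import date

def month_bounds(d: date) -> tuple:
    """Half-open [month_start, month_end) for the month containing d,
    mirroring the December rollover handled in aggregate_monthly."""
    start = d.replace(day=1)
    if start.month == 12:
        end = start.replace(year=start.year + 1, month=1)
    else:
        end = start.replace(month=start.month + 1)
    return start, end

# December rolls over to January of the next year.
assert month_bounds(date(2026, 12, 15)) == (date(2026, 12, 1), date(2027, 1, 1))
assert month_bounds(date(2026, 4, 8)) == (date(2026, 4, 1), date(2026, 5, 1))
```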