kcg-ai-monitoring/prediction/output/violation_classifier.py
htlee da37a00b8e fix: prediction 5가지 이슈 수정 — 모든 파이프라인 정상 동작
## 이슈 1: gear_correlation Decimal → float TypeError
- prediction/algorithms/gear_correlation.py:785
- _load_all_scores()가 NUMERIC 컬럼을 Decimal로 읽어 float 상수와 연산 시 실패
- float() 명시 변환으로 수정
- 효과: gear correlation 24,474 raw metrics + 3,966 scores 정상 기록

## 이슈 2: violation_classifier classified=0 문제
- prediction/output/violation_classifier.py
- result.get('id')는 AnalysisResult에 없어 항상 None → 모든 UPDATE 건너뜀
- 존재하지 않는 permit_status/gear_judgment 필드에 의존
- (mmsi, analyzed_at) 기준 UPDATE로 변경
- 중국 선박(412/413*) + EEZ 진입은 permit 없어도 EEZ_VIOLATION 판정
- 효과: classified=0 → classified=4~6/cycle

## 이슈 3: kpi_writer 모두 0 (tracking_active 외)
- prediction/output/kpi_writer.py:27
- date.today() + timezone.utc 혼용 → 현재 시각이 UTC로는 아직 '어제'라 '오늘 >= today_start' 쿼리가 0 반환
- KST 기준으로 today_start 계산
- 효과: realtime_detection 0 → 7,107, illegal_transship 0 → 5,033

## 이슈 4: stats_daily 오늘 0건
- prediction/output/stats_aggregator.py:96, 194
- aggregate_daily/monthly가 UTC 경계 사용
- KST 기준 자정으로 수정
- 효과: 2026-04-08 detections 0 → 543,656, events 0 → 5,253

## 이슈 5: parent workflow 테이블 누락 컬럼 (V005 ↔ prediction 불일치)
V016 마이그레이션으로 일괄 추가:
- gear_parent_label_sessions: label_parent_name, normalized_parent_name,
  duration_days, actor, comment, metadata, updated_at 등 8개 컬럼
- gear_group_parent_resolution: parent_name, normalized_parent_name,
  selected_parent_name, confidence, decision_source, top_score, second_score,
  score_margin, stable_cycles, evidence_summary, episode_id, continuity_*,
  prior_bonus_total, last_evaluated_at, last_promoted_at 등 17개 컬럼
- gear_parent_candidate_exclusions: normalized_parent_name, reason_type,
  duration_days, metadata, updated_at, active_from, active_until +
  candidate_mmsi GENERATED ALWAYS AS (excluded_mmsi) 별칭
- gear_group_parent_candidate_snapshots: parent_name

효과: gear parent inference: 925 groups, 301 direct-match, 1329 candidates,
      188 review-required, 925 episode-snapshots 기록 — 전체 모선 워크플로우 정상

## 검증 결과 (e2e)

- analysis cycle: 6,824 vessels, 112초/cycle 정상
- vessel_analysis_results: 10분 13,650건, 총 125만건
- prediction_events: 1시간 138건, 총 12,258건
- prediction_alerts: 1시간 183건
- gear_correlation_scores: 3,966건
- gear_group_parent_resolution: 926건
- stats_hourly: 17행, stats_daily: 오늘 543,656건
- 백엔드 Flyway V016 정상 적용

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 06:47:53 +09:00

105 lines
3.7 KiB
Python

"""
위반 유형 라벨링 — 분석 결과에 violation_categories[] 태깅.
vessel_analysis_results의 각 행에 대해 5개 위반 카테고리를 판정하고
violation_categories TEXT[] 컬럼을 업데이트합니다.
"""
import logging
from psycopg2.extras import execute_batch
from config import qualified_table
from db.kcgdb import get_conn
logger = logging.getLogger(__name__)
VAR_TABLE = qualified_table('vessel_analysis_results')
def classify_violations(result: dict) -> list[str]:
"""단일 분석 결과에 대해 위반 유형 리스트 반환.
판정 기준:
- EEZ_VIOLATION: 중국선박(412*) + EEZ/NLL/특별금어구역 + 비허가
- DARK_VESSEL: is_dark + 30분 이상 갭
- MMSI_TAMPERING: spoofing_score > 0.6
- ILLEGAL_TRANSSHIP: transship_suspect
- RISK_BEHAVIOR: 위반 없이 risk_score >= 70
"""
violations = []
zone = result.get('zone_code', '') or ''
risk_score = result.get('risk_score', 0) or 0
is_dark = result.get('is_dark', False)
spoofing = result.get('spoofing_score', 0) or 0
transship = result.get('transship_suspect', False)
gap_min = result.get('gap_duration_min', 0) or 0
mmsi = str(result.get('mmsi', '') or '')
# permit_status는 선택적 — 없으면 중국 선박인지로 판단 (412* prefix)
permit = result.get('permit_status') or ''
is_chinese = mmsi.startswith('412') or mmsi.startswith('413')
# EEZ 침범: 중국선박이 한국 해역에 진입 (중국선박은 기본적으로 비허가 상정)
if zone in ('NLL', 'SPECIAL_FISHING_1', 'SPECIAL_FISHING_2',
'SPECIAL_FISHING_3', 'SPECIAL_FISHING_4', 'EEZ_KR'):
if is_chinese and permit not in ('VALID', 'PERMITTED'):
violations.append('EEZ_VIOLATION')
elif permit in ('NONE', 'EXPIRED', 'REVOKED'):
violations.append('EEZ_VIOLATION')
# 다크베셀
if is_dark and gap_min > 30:
violations.append('DARK_VESSEL')
# MMSI 변조
if spoofing > 0.6:
violations.append('MMSI_TAMPERING')
# 불법환적
if transship:
violations.append('ILLEGAL_TRANSSHIP')
# 어구 불법 (gear_judgment이 있는 경우만 — 현재는 scheduler에서 채우지 않음)
gear_judgment = result.get('gear_judgment', '') or ''
if gear_judgment in ('NO_PERMIT', 'GEAR_MISMATCH', 'ZONE_VIOLATION', 'SEASON_VIOLATION'):
violations.append('ILLEGAL_GEAR')
# 위험 행동 (다른 위반 없이 고위험)
if not violations and risk_score >= 70:
violations.append('RISK_BEHAVIOR')
return violations
def run_violation_classifier(analysis_results: list[dict]) -> dict:
"""
분석 결과 리스트에 위반 카테고리를 라벨링하고 DB 업데이트.
AnalysisResult에는 DB id가 없으므로 (mmsi, analyzed_at)으로 UPDATE.
Returns:
{ 'classified': int, 'violations_found': int }
"""
updates = []
violations_found = 0
for result in analysis_results:
violations = classify_violations(result)
mmsi = result.get('mmsi')
analyzed_at = result.get('analyzed_at')
if mmsi and analyzed_at and violations:
updates.append((violations, str(mmsi), analyzed_at))
violations_found += len(violations)
if updates:
with get_conn() as conn:
execute_batch(
conn.cursor(),
f"UPDATE {VAR_TABLE} SET violation_categories = %s "
f"WHERE mmsi = %s AND analyzed_at = %s",
updates,
)
conn.commit()
logger.info(f'violation_classifier: classified={len(updates)}, violations={violations_found}')
return {'classified': len(updates), 'violations_found': violations_found}