Merge pull request 'fix: prediction e2e — Decimal/violation/KPI/stats/parent workflow 5건 수정' (#12) from feature/prediction-e2e-fixes into develop

This commit is contained in:
htlee 2026-04-08 06:48:19 +09:00
커밋 1ff8a6ac7f
5개의 변경된 파일109개의 추가작업 그리고 19개의 파일을 삭제

파일 보기

@ -0,0 +1,64 @@
-- V016: parent workflow 관련 테이블에 prediction 코드가 요구하는 누락 컬럼 추가
-- gear_parent_inference.py, gear_parent_episode.py가 참조하는 컬럼들
-- === gear_parent_label_sessions ===
ALTER TABLE kcg.gear_parent_label_sessions
ADD COLUMN IF NOT EXISTS label_parent_name VARCHAR(200),
ADD COLUMN IF NOT EXISTS label_parent_vessel_id BIGINT,
ADD COLUMN IF NOT EXISTS normalized_parent_name VARCHAR(100),
ADD COLUMN IF NOT EXISTS duration_days INT DEFAULT 3,
ADD COLUMN IF NOT EXISTS actor VARCHAR(100),
ADD COLUMN IF NOT EXISTS comment TEXT,
ADD COLUMN IF NOT EXISTS metadata JSONB,
ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ DEFAULT now();
CREATE INDEX IF NOT EXISTS idx_label_session_norm
ON kcg.gear_parent_label_sessions(normalized_parent_name, active_from DESC)
WHERE status = 'ACTIVE';
-- === gear_group_parent_resolution ===
ALTER TABLE kcg.gear_group_parent_resolution
ADD COLUMN IF NOT EXISTS parent_name VARCHAR(200),
ADD COLUMN IF NOT EXISTS normalized_parent_name VARCHAR(100),
ADD COLUMN IF NOT EXISTS selected_parent_name VARCHAR(200),
ADD COLUMN IF NOT EXISTS selected_vessel_id BIGINT,
ADD COLUMN IF NOT EXISTS confidence NUMERIC(7,4),
ADD COLUMN IF NOT EXISTS decision_source VARCHAR(30),
ADD COLUMN IF NOT EXISTS top_score NUMERIC(7,4),
ADD COLUMN IF NOT EXISTS second_score NUMERIC(7,4),
ADD COLUMN IF NOT EXISTS score_margin NUMERIC(7,4),
ADD COLUMN IF NOT EXISTS stable_cycles INT DEFAULT 0,
ADD COLUMN IF NOT EXISTS evidence_summary JSONB,
ADD COLUMN IF NOT EXISTS episode_id VARCHAR(50),
ADD COLUMN IF NOT EXISTS continuity_source VARCHAR(30),
ADD COLUMN IF NOT EXISTS continuity_score NUMERIC(7,4),
ADD COLUMN IF NOT EXISTS prior_bonus_total NUMERIC(7,4);
CREATE INDEX IF NOT EXISTS idx_parent_resolution_episode
ON kcg.gear_group_parent_resolution(episode_id);
-- === gear_parent_candidate_exclusions ===
-- gear_parent_inference.py가 참조하는 추가 컬럼
-- 참고: 코드는 candidate_mmsi를 쿼리 — excluded_mmsi의 generated column으로 매핑
ALTER TABLE kcg.gear_parent_candidate_exclusions
ADD COLUMN IF NOT EXISTS normalized_parent_name VARCHAR(100),
ADD COLUMN IF NOT EXISTS reason_type VARCHAR(50),
ADD COLUMN IF NOT EXISTS duration_days INT,
ADD COLUMN IF NOT EXISTS metadata JSONB,
ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ DEFAULT now(),
ADD COLUMN IF NOT EXISTS active_from TIMESTAMPTZ DEFAULT now(),
ADD COLUMN IF NOT EXISTS active_until TIMESTAMPTZ;
-- candidate_mmsi 별칭 (generated column)
ALTER TABLE kcg.gear_parent_candidate_exclusions
ADD COLUMN IF NOT EXISTS candidate_mmsi VARCHAR(20)
GENERATED ALWAYS AS (excluded_mmsi) STORED;
-- === gear_group_parent_resolution 추가 타임스탬프 컬럼 ===
ALTER TABLE kcg.gear_group_parent_resolution
ADD COLUMN IF NOT EXISTS last_evaluated_at TIMESTAMPTZ,
ADD COLUMN IF NOT EXISTS last_promoted_at TIMESTAMPTZ;
-- === gear_group_parent_candidate_snapshots ===
ALTER TABLE kcg.gear_group_parent_candidate_snapshots
ADD COLUMN IF NOT EXISTS parent_name VARCHAR(200);

파일 보기

@ -781,8 +781,9 @@ def _load_all_scores(conn) -> dict[tuple, dict]:
result = {}
for row in cur.fetchall():
key = (row[0], row[1], row[2], row[3])
# psycopg2가 NUMERIC을 Decimal로 반환하므로 float으로 변환 (float 상수와의 연산 호환)
result[key] = {
'current_score': row[4],
'current_score': float(row[4]) if row[4] is not None else 0.0,
'streak_count': row[5],
'last_observed_at': row[6],
'target_type': row[7],

파일 보기

@ -4,7 +4,7 @@
분석 사이클마다 오늘 날짜 기준 카운트를 계산하여 6 KPI 갱신.
"""
import logging
from datetime import date, datetime, timezone
from datetime import datetime, timedelta, timezone
from config import qualified_table
from db.kcgdb import get_conn
@ -16,16 +16,20 @@ EVENTS_TABLE = qualified_table('prediction_events')
ENF_TABLE = qualified_table('enforcement_records')
VAR_TABLE = qualified_table('vessel_analysis_results')
# 한국 표준시 (운영 기준)
_KST = timezone(timedelta(hours=9))
def run_kpi_writer() -> dict:
"""
오늘 날짜 기준으로 6 KPI를 재계산하여 갱신.
오늘(KST) 날짜 기준으로 6 KPI를 재계산하여 갱신.
Returns:
{ kpi_key: value } 딕셔너리
"""
today = date.today()
today_start = datetime(today.year, today.month, today.day, tzinfo=timezone.utc)
# KST 기준 "오늘" 시작 시각 (해당 시각은 UTC로도 비교 가능하므로 DB 필드가 TIMESTAMPTZ면 안전)
now_kst = datetime.now(_KST)
today_start = now_kst.replace(hour=0, minute=0, second=0, microsecond=0)
now = datetime.now(timezone.utc)
results = {}

파일 보기

@ -22,6 +22,9 @@ VAR_TABLE = qualified_table('vessel_analysis_results')
EVENTS_TABLE = qualified_table('prediction_events')
ENF_TABLE = qualified_table('enforcement_records')
# 한국 표준시 (운영 기준 — 일/월 집계 경계)
_KST = timezone(timedelta(hours=9))
def _jsonb(d: dict) -> str:
return json.dumps(d, ensure_ascii=False)
@ -92,9 +95,10 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict:
def aggregate_daily(target_date: Optional[date] = None) -> dict:
"""지정 날짜 기준 daily 집계."""
d = target_date or date.today()
day_start = datetime(d.year, d.month, d.day, tzinfo=timezone.utc)
"""지정 날짜 기준 daily 집계 (KST 기준)."""
d = target_date or datetime.now(_KST).date()
# KST 자정을 TIMESTAMPTZ로 표현 (UTC -9시간)
day_start = datetime(d.year, d.month, d.day, tzinfo=_KST)
day_end = day_start + timedelta(days=1)
now = datetime.now(timezone.utc)
@ -186,8 +190,8 @@ def aggregate_daily(target_date: Optional[date] = None) -> dict:
def aggregate_monthly(target_month: Optional[date] = None) -> dict:
"""지정 월 기준 monthly 집계 (daily 합산)."""
d = target_month or date.today().replace(day=1)
"""지정 월 기준 monthly 집계 (daily 합산, KST 기준)."""
d = target_month or datetime.now(_KST).date().replace(day=1)
month_start = d.replace(day=1)
if month_start.month == 12:
month_end = month_start.replace(year=month_start.year + 1, month=1)

파일 보기

@ -16,7 +16,15 @@ VAR_TABLE = qualified_table('vessel_analysis_results')
def classify_violations(result: dict) -> list[str]:
"""단일 분석 결과에 대해 위반 유형 리스트 반환."""
"""단일 분석 결과에 대해 위반 유형 리스트 반환.
판정 기준:
- EEZ_VIOLATION: 중국선박(412*) + EEZ/NLL/특별금어구역 + 비허가
- DARK_VESSEL: is_dark + 30 이상
- MMSI_TAMPERING: spoofing_score > 0.6
- ILLEGAL_TRANSSHIP: transship_suspect
- RISK_BEHAVIOR: 위반 없이 risk_score >= 70
"""
violations = []
zone = result.get('zone_code', '') or ''
@ -24,13 +32,18 @@ def classify_violations(result: dict) -> list[str]:
is_dark = result.get('is_dark', False)
spoofing = result.get('spoofing_score', 0) or 0
transship = result.get('transship_suspect', False)
permit = result.get('permit_status', 'UNKNOWN') or 'UNKNOWN'
gap_min = result.get('gap_duration_min', 0) or 0
mmsi = str(result.get('mmsi', '') or '')
# permit_status는 선택적 — 없으면 중국 선박인지로 판단 (412* prefix)
permit = result.get('permit_status') or ''
is_chinese = mmsi.startswith('412') or mmsi.startswith('413')
# EEZ 침범
# EEZ 침범: 중국선박이 한국 해역에 진입 (중국선박은 기본적으로 비허가 상정)
if zone in ('NLL', 'SPECIAL_FISHING_1', 'SPECIAL_FISHING_2',
'SPECIAL_FISHING_3', 'SPECIAL_FISHING_4', 'EEZ_KR'):
if permit in ('NONE', 'EXPIRED', 'REVOKED'):
if is_chinese and permit not in ('VALID', 'PERMITTED'):
violations.append('EEZ_VIOLATION')
elif permit in ('NONE', 'EXPIRED', 'REVOKED'):
violations.append('EEZ_VIOLATION')
# 다크베셀
@ -45,7 +58,7 @@ def classify_violations(result: dict) -> list[str]:
if transship:
violations.append('ILLEGAL_TRANSSHIP')
# 어구 불법 (gear_judgment이 있는 경우)
# 어구 불법 (gear_judgment이 있는 경우만 — 현재는 scheduler에서 채우지 않음)
gear_judgment = result.get('gear_judgment', '') or ''
if gear_judgment in ('NO_PERMIT', 'GEAR_MISMATCH', 'ZONE_VIOLATION', 'SEASON_VIOLATION'):
violations.append('ILLEGAL_GEAR')
@ -61,6 +74,8 @@ def run_violation_classifier(analysis_results: list[dict]) -> dict:
"""
분석 결과 리스트에 위반 카테고리를 라벨링하고 DB 업데이트.
AnalysisResult에는 DB id가 없으므로 (mmsi, analyzed_at)으로 UPDATE.
Returns:
{ 'classified': int, 'violations_found': int }
"""
@ -69,16 +84,18 @@ def run_violation_classifier(analysis_results: list[dict]) -> dict:
for result in analysis_results:
violations = classify_violations(result)
result_id = result.get('id')
if result_id and violations:
updates.append((violations, result_id))
mmsi = result.get('mmsi')
analyzed_at = result.get('analyzed_at')
if mmsi and analyzed_at and violations:
updates.append((violations, str(mmsi), analyzed_at))
violations_found += len(violations)
if updates:
with get_conn() as conn:
execute_batch(
conn.cursor(),
f"UPDATE {VAR_TABLE} SET violation_categories = %s WHERE id = %s",
f"UPDATE {VAR_TABLE} SET violation_categories = %s "
f"WHERE mmsi = %s AND analyzed_at = %s",
updates,
)
conn.commit()