diff --git a/backend/src/main/resources/db/migration/V016__parent_workflow_columns.sql b/backend/src/main/resources/db/migration/V016__parent_workflow_columns.sql new file mode 100644 index 0000000..2702fbe --- /dev/null +++ b/backend/src/main/resources/db/migration/V016__parent_workflow_columns.sql @@ -0,0 +1,64 @@ +-- V016: parent workflow 관련 테이블에 prediction 코드가 요구하는 누락 컬럼 추가 +-- gear_parent_inference.py, gear_parent_episode.py가 참조하는 컬럼들 + +-- === gear_parent_label_sessions === +ALTER TABLE kcg.gear_parent_label_sessions + ADD COLUMN IF NOT EXISTS label_parent_name VARCHAR(200), + ADD COLUMN IF NOT EXISTS label_parent_vessel_id BIGINT, + ADD COLUMN IF NOT EXISTS normalized_parent_name VARCHAR(100), + ADD COLUMN IF NOT EXISTS duration_days INT DEFAULT 3, + ADD COLUMN IF NOT EXISTS actor VARCHAR(100), + ADD COLUMN IF NOT EXISTS comment TEXT, + ADD COLUMN IF NOT EXISTS metadata JSONB, + ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ DEFAULT now(); + +CREATE INDEX IF NOT EXISTS idx_label_session_norm + ON kcg.gear_parent_label_sessions(normalized_parent_name, active_from DESC) + WHERE status = 'ACTIVE'; + +-- === gear_group_parent_resolution === +ALTER TABLE kcg.gear_group_parent_resolution + ADD COLUMN IF NOT EXISTS parent_name VARCHAR(200), + ADD COLUMN IF NOT EXISTS normalized_parent_name VARCHAR(100), + ADD COLUMN IF NOT EXISTS selected_parent_name VARCHAR(200), + ADD COLUMN IF NOT EXISTS selected_vessel_id BIGINT, + ADD COLUMN IF NOT EXISTS confidence NUMERIC(7,4), + ADD COLUMN IF NOT EXISTS decision_source VARCHAR(30), + ADD COLUMN IF NOT EXISTS top_score NUMERIC(7,4), + ADD COLUMN IF NOT EXISTS second_score NUMERIC(7,4), + ADD COLUMN IF NOT EXISTS score_margin NUMERIC(7,4), + ADD COLUMN IF NOT EXISTS stable_cycles INT DEFAULT 0, + ADD COLUMN IF NOT EXISTS evidence_summary JSONB, + ADD COLUMN IF NOT EXISTS episode_id VARCHAR(50), + ADD COLUMN IF NOT EXISTS continuity_source VARCHAR(30), + ADD COLUMN IF NOT EXISTS continuity_score NUMERIC(7,4), + ADD COLUMN IF NOT EXISTS prior_bonus_total NUMERIC(7,4); + +CREATE INDEX IF NOT EXISTS idx_parent_resolution_episode + ON kcg.gear_group_parent_resolution(episode_id); + +-- === gear_parent_candidate_exclusions === +-- gear_parent_inference.py가 참조하는 추가 컬럼 +-- 참고: 코드는 candidate_mmsi를 쿼리 — excluded_mmsi의 generated column으로 매핑 +ALTER TABLE kcg.gear_parent_candidate_exclusions + ADD COLUMN IF NOT EXISTS normalized_parent_name VARCHAR(100), + ADD COLUMN IF NOT EXISTS reason_type VARCHAR(50), + ADD COLUMN IF NOT EXISTS duration_days INT, + ADD COLUMN IF NOT EXISTS metadata JSONB, + ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ DEFAULT now(), + ADD COLUMN IF NOT EXISTS active_from TIMESTAMPTZ DEFAULT now(), + ADD COLUMN IF NOT EXISTS active_until TIMESTAMPTZ; + +-- candidate_mmsi 별칭 (generated column) +ALTER TABLE kcg.gear_parent_candidate_exclusions + ADD COLUMN IF NOT EXISTS candidate_mmsi VARCHAR(20) + GENERATED ALWAYS AS (excluded_mmsi) STORED; + +-- === gear_group_parent_resolution 추가 타임스탬프 컬럼 === +ALTER TABLE kcg.gear_group_parent_resolution + ADD COLUMN IF NOT EXISTS last_evaluated_at TIMESTAMPTZ, + ADD COLUMN IF NOT EXISTS last_promoted_at TIMESTAMPTZ; + +-- === gear_group_parent_candidate_snapshots === +ALTER TABLE kcg.gear_group_parent_candidate_snapshots + ADD COLUMN IF NOT EXISTS parent_name VARCHAR(200); diff --git a/prediction/algorithms/gear_correlation.py b/prediction/algorithms/gear_correlation.py index 00ee786..c9615b6 100644 --- a/prediction/algorithms/gear_correlation.py +++ b/prediction/algorithms/gear_correlation.py @@ -781,8 +781,9 @@ def _load_all_scores(conn) -> dict[tuple, dict]: result = {} for row in cur.fetchall(): key = (row[0], row[1], row[2], row[3]) + # psycopg2가 NUMERIC을 Decimal로 반환하므로 float으로 변환 (float 상수와의 연산 호환) result[key] = { - 'current_score': row[4], + 'current_score': float(row[4]) if row[4] is not None else 0.0, 'streak_count': row[5], 'last_observed_at': row[6], 'target_type': row[7], diff --git a/prediction/output/kpi_writer.py b/prediction/output/kpi_writer.py index a087009..c3d1d2f 100644 --- a/prediction/output/kpi_writer.py +++ b/prediction/output/kpi_writer.py @@ -4,7 +4,7 @@ 매 분석 사이클마다 오늘 날짜 기준 카운트를 계산하여 6개 KPI 갱신. """ import logging -from datetime import date, datetime, timezone +from datetime import datetime, timedelta, timezone from config import qualified_table from db.kcgdb import get_conn @@ -16,16 +16,20 @@ EVENTS_TABLE = qualified_table('prediction_events') ENF_TABLE = qualified_table('enforcement_records') VAR_TABLE = qualified_table('vessel_analysis_results') +# 한국 표준시 (운영 기준) +_KST = timezone(timedelta(hours=9)) + def run_kpi_writer() -> dict: """ - 오늘 날짜 기준으로 6개 KPI를 재계산하여 갱신. + 오늘(KST) 날짜 기준으로 6개 KPI를 재계산하여 갱신. Returns: { kpi_key: value } 딕셔너리 """ - today = date.today() - today_start = datetime(today.year, today.month, today.day, tzinfo=timezone.utc) + # KST 기준 "오늘" 시작 시각 (해당 시각은 UTC로도 비교 가능하므로 DB 필드가 TIMESTAMPTZ면 안전) + now_kst = datetime.now(_KST) + today_start = now_kst.replace(hour=0, minute=0, second=0, microsecond=0) now = datetime.now(timezone.utc) results = {} diff --git a/prediction/output/stats_aggregator.py b/prediction/output/stats_aggregator.py index f2681d8..8564786 100644 --- a/prediction/output/stats_aggregator.py +++ b/prediction/output/stats_aggregator.py @@ -22,6 +22,9 @@ VAR_TABLE = qualified_table('vessel_analysis_results') EVENTS_TABLE = qualified_table('prediction_events') ENF_TABLE = qualified_table('enforcement_records') +# 한국 표준시 (운영 기준 — 일/월 집계 경계) +_KST = timezone(timedelta(hours=9)) + def _jsonb(d: dict) -> str: return json.dumps(d, ensure_ascii=False) @@ -92,9 +95,10 @@ def aggregate_hourly(target_hour: Optional[datetime] = None) -> dict: def aggregate_daily(target_date: Optional[date] = None) -> dict: - """지정 날짜 기준 daily 집계.""" - d = target_date or date.today() - day_start = datetime(d.year, d.month, d.day, tzinfo=timezone.utc) + """지정 날짜 기준 daily 집계 (KST 기준).""" + d = target_date or datetime.now(_KST).date() + # KST 자정을 TIMESTAMPTZ로 표현 (UTC -9시간) + day_start = datetime(d.year, d.month, d.day, tzinfo=_KST) day_end = day_start + timedelta(days=1) now = datetime.now(timezone.utc) @@ -186,8 +190,8 @@ def aggregate_daily(target_date: Optional[date] = None) -> dict: def aggregate_monthly(target_month: Optional[date] = None) -> dict: - """지정 월 기준 monthly 집계 (daily 합산).""" - d = target_month or date.today().replace(day=1) + """지정 월 기준 monthly 집계 (daily 합산, KST 기준).""" + d = target_month or datetime.now(_KST).date().replace(day=1) month_start = d.replace(day=1) if month_start.month == 12: month_end = month_start.replace(year=month_start.year + 1, month=1) diff --git a/prediction/output/violation_classifier.py b/prediction/output/violation_classifier.py index 82a23ab..d8f2e90 100644 --- a/prediction/output/violation_classifier.py +++ b/prediction/output/violation_classifier.py @@ -16,7 +16,15 @@ VAR_TABLE = qualified_table('vessel_analysis_results') def classify_violations(result: dict) -> list[str]: - """단일 분석 결과에 대해 위반 유형 리스트 반환.""" + """단일 분석 결과에 대해 위반 유형 리스트 반환. + + 판정 기준: + - EEZ_VIOLATION: 중국선박(412*) + EEZ/NLL/특별금어구역 + 비허가 + - DARK_VESSEL: is_dark + 30분 이상 갭 + - MMSI_TAMPERING: spoofing_score > 0.6 + - ILLEGAL_TRANSSHIP: transship_suspect + - RISK_BEHAVIOR: 위반 없이 risk_score >= 70 + """ violations = [] zone = result.get('zone_code', '') or '' @@ -24,13 +32,18 @@ def classify_violations(result: dict) -> list[str]: is_dark = result.get('is_dark', False) spoofing = result.get('spoofing_score', 0) or 0 transship = result.get('transship_suspect', False) - permit = result.get('permit_status', 'UNKNOWN') or 'UNKNOWN' gap_min = result.get('gap_duration_min', 0) or 0 + mmsi = str(result.get('mmsi', '') or '') + # permit_status는 선택적 — 없으면 중국 선박인지로 판단 (412* prefix) + permit = result.get('permit_status') or '' + is_chinese = mmsi.startswith('412') or mmsi.startswith('413') - # EEZ 침범 + # EEZ 침범: 중국선박이 한국 해역에 진입 (중국선박은 기본적으로 비허가 상정) if zone in ('NLL', 'SPECIAL_FISHING_1', 'SPECIAL_FISHING_2', 'SPECIAL_FISHING_3', 'SPECIAL_FISHING_4', 'EEZ_KR'): - if permit in ('NONE', 'EXPIRED', 'REVOKED'): + if is_chinese and permit not in ('VALID', 'PERMITTED'): + violations.append('EEZ_VIOLATION') + elif permit in ('NONE', 'EXPIRED', 'REVOKED'): violations.append('EEZ_VIOLATION') # 다크베셀 @@ -45,7 +58,7 @@ def classify_violations(result: dict) -> list[str]: if transship: violations.append('ILLEGAL_TRANSSHIP') - # 어구 불법 (gear_judgment이 있는 경우) + # 어구 불법 (gear_judgment이 있는 경우만 — 현재는 scheduler에서 채우지 않음) gear_judgment = result.get('gear_judgment', '') or '' if gear_judgment in ('NO_PERMIT', 'GEAR_MISMATCH', 'ZONE_VIOLATION', 'SEASON_VIOLATION'): violations.append('ILLEGAL_GEAR') @@ -61,6 +74,8 @@ def run_violation_classifier(analysis_results: list[dict]) -> dict: """ 분석 결과 리스트에 위반 카테고리를 라벨링하고 DB 업데이트. + AnalysisResult에는 DB id가 없으므로 (mmsi, analyzed_at)으로 UPDATE. + Returns: { 'classified': int, 'violations_found': int } """ @@ -69,16 +84,18 @@ def run_violation_classifier(analysis_results: list[dict]) -> dict: for result in analysis_results: violations = classify_violations(result) - result_id = result.get('id') - if result_id and violations: - updates.append((violations, result_id)) + mmsi = result.get('mmsi') + analyzed_at = result.get('analyzed_at') + if mmsi and analyzed_at and violations: + updates.append((violations, str(mmsi), analyzed_at)) violations_found += len(violations) if updates: with get_conn() as conn: execute_batch( conn.cursor(), - f"UPDATE {VAR_TABLE} SET violation_categories = %s WHERE id = %s", + f"UPDATE {VAR_TABLE} SET violation_categories = %s " + f"WHERE mmsi = %s AND analyzed_at = %s", updates, ) conn.commit()