diff --git a/docs/RELEASE-NOTES.md b/docs/RELEASE-NOTES.md
index ee45043..70dbb1f 100644
--- a/docs/RELEASE-NOTES.md
+++ b/docs/RELEASE-NOTES.md
@@ -4,6 +4,21 @@
 ## [Unreleased]
+### Added
+- **Phase 2 PoC: migration of all 5 models complete (2 runtime + 3 catalog)** — All five algorithms from the Phase 2 PoC plan are now registered in the detection_model catalog, establishing the operator parameter-tuning point. The models fall into two tiers:
+  - **Runtime override complete (2 models)** — `dark_suspicion` (tier 3, DARK_VESSEL, 19 weights + sog/repeat/gap/tier thresholds) · `gear_violation_g01_g06` (tier 4, GEAR, 6 G-code scores + signal cycling + gear drift + allowed-gear mapping). A `params: dict | None = None` argument was added to each algorithm function, and `_merge_default_*_params` deep-merges overrides so that an override never mutates the DEFAULT constants (immutability guarantee). Calling with `params=None` yields results identical to pre-Phase-2 behavior (BACK-COMPAT). Once an operator promotes a version to ACTIVE, the new values take effect from the next cycle
+  - **Catalog + observation (3 models)** — `transshipment_5stage` (tier 4, TRANSSHIP, 5-stage filter thresholds) · `risk_composite` (tier 3, META, lightweight + pipeline weights) · `pair_trawl_tier` (tier 4, GEAR, STRONG/PROBABLE/SUSPECT thresholds). Their internal helpers still reference module-level constants directly, so this phase only exposes DEFAULT_PARAMS in the DB and observes ctx.inputs aggregation via the Adapter. Runtime value replacement will be enabled in a follow-up refactoring PR once params propagation into the helpers is complete
+  - **5 Adapters** (`prediction/models_core/registered/*_model.py`) — inherit `BaseDetectionModel`, aggregate results from the AnalysisResult list, and automatically record tier/score distribution metrics
+  - **5 seed SQL files + 1 combined** — each `prediction/models_core/seeds/v1_.sql` plus `v1_phase2_all.sql`, which seeds all 5 models at once via `\i`. BEGIN/COMMIT removed so the caller controls the transaction
+  - **30 static-equivalence tests** — for each model, a three-way consistency check between the Python DEFAULT constant, the module constants, and the seed SQL JSONB. 5 models + 15 Phase 1-2 baseline + 5 dark equivalence + 8 new Phase 2 = 30/30 passing
+  - **Production DB dry-run passed** — all 5 individual model seeds plus the combined seed verified under BEGIN/ROLLBACK isolation; SQL confirmed to run cleanly with nothing applied
+- **Phase 2 PoC #1: dark_suspicion model migration** — added a `params: dict | None = None` argument to `compute_dark_suspicion` in `prediction/algorithms/dark_vessel.py`. Extracted the `DARK_SUSPICION_DEFAULT_PARAMS` constant (19 weights + SOG thresholds + repeat-history thresholds + tier 70/50/30) as the Python SSOT, with `_merge_default_params` deep-merging overrides. With `params=None` the results are **exactly identical** to pre-Phase-2 behavior (BACK-COMPAT guaranteed).
Adapter class `prediction/models_core/registered/dark_suspicion_model.py` (inherits `BaseDetectionModel`; re-evaluates gap_info taking the AnalysisResult list as input; records `evaluated/critical/high/watch_count` metrics). Seed SQL `prediction/models_core/seeds/v1_dark_suspicion.sql` — seeded safely with `status=DRAFT role=NULL` (zero production impact; awaits promotion via the Phase 3 backend API). 5 equivalence unit tests added (DEFAULT shape validation, None/empty-dict equivalence, override immutability, **static 1:1 match between the seed SQL JSONB and the Python DEFAULT**). 20/20 tests passing in total
+- **Phase 1-2 Detection Model Registry base infrastructure (prediction)** — new `prediction/models_core/` package. `BaseDetectionModel` abstract contract + `ModelContext` / `ModelResult` dataclasses + `ModelRegistry` (instantiates all ACTIVE versions, detects DAG cycles, builds the topological execution plan) + `DAGExecutor` (runs PRIMARY → injects into `ctx.shared` → runs SHADOW/CHALLENGER persist-only, with the contamination-blocking invariant that downstream models consume PRIMARY results only) + `params_loader` (loads V034 `detection_model_versions.params` JSONB, 5-minute TTL cache, provides `invalidate_cache()`) + `feature_flag` (defaults `PREDICTION_USE_MODEL_REGISTRY=0`, `PREDICTION_CONCURRENT_SHADOWS=0`). A feature-flag branch added at step 10 of `scheduler.py` lets the new path coexist without touching the existing 5-minute cycle. `db/partition_manager.py` gains automatic monthly partition creation/DROP for `detection_model_run_outputs` (driven by system_config `partition.detection_model_run_outputs.*`; defaults retention_months=1, create_ahead_months=2). 15 unit-test cases (DAG cycle detection, SHADOW contamination blocking, PRIMARY failure → downstream skip, SHADOW failure isolation, VARCHAR(64) overflow rejection) all passing. Phase 2 will add the 5-model PoC (`dark_suspicion`/`gear_violation_g01_g06`/`transshipment_5stage`/`risk_composite`/`pair_trawl_tier`) under `models_core/registered/`
+
+### Changed
+- **Snapshot scripts: silent-vs-fault distinction + new V030/V034 raw-observation sections** — common to both `prediction/scripts/hourly-analysis-snapshot.sh` (cron 1h) and `diagnostic-snapshot.sh` (5min). (1) `spoofing_score` broken down into `gt0/gt03/gt05/gt07/avg/max` so that `spoof_hi=0` can be told apart at a glance as "algorithm broken" vs "threshold not crossed". (2) New V030 `gear_identity_collisions` raw-table section — where previously only `GEAR_IDENTITY_COLLISION` events were observable, the section now catches the 51 CRITICAL/OPEN rows latent in the raw table (coexistence 429, max_km 70km pair).
(3) V034 `detection_model_*` monitoring section — once the feature flag is enabled, per-model/version/role row counts and durations become immediately visible. (4) STAGE TIMING aggregation from `stage_runner` (Phase 0-1) + `DAGExecutor` logs — top 10 by duration + failed-stage identification. (5) Drift watch between `stats_hourly.by_category` and raw `prediction_events.category` — early detection of silent drops in `event_generator`. Deployed to the redis-211 server
+- **Phase 1-2 preemptive silent-error defense** — preemptively blocks 3 paths where a persist could silently fail outside the main cycle due to V034 schema `VARCHAR(64)` column overflow. `model_id` raises `ValueError` immediately in `BaseDetectionModel.__init__` (validated at class-definition time). An over-long `metric_key` is dropped with a warning (other metrics are still saved). `DAGExecutor` is structured to reuse `ctx.conn` for persists, preventing maxconn=5 pool exhaustion (safe even alongside the thread pool when `CONCURRENT_SHADOWS=1`)
 ### Docs
 - **Post-release static doc refresh for the 2026-04-20 release** — `architecture.md` 27→29 protected paths + 2 new routes; `sfr-traceability.md` updated for V030→V034 · 51→56 tables · stage_runner · Phase 0-2/0-3 pages; new "illegal fishing events" and "transshipment suspicion detection" user-guide sections in `sfr-user-guide.md`; expanded V030~V034 manifest-not-reflected warning in `system-flow-guide.md`; 3 of 4 P1 recommendations in `prediction-analysis.md` marked complete (✅)
diff --git a/prediction/algorithms/dark_vessel.py b/prediction/algorithms/dark_vessel.py
index 25038c8..f5fb0c9 100644
--- a/prediction/algorithms/dark_vessel.py
+++ b/prediction/algorithms/dark_vessel.py
@@ -211,6 +211,60 @@ def _is_in_kr_coverage(lat: Optional[float], lon: Optional[float]) -> bool:
             and _KR_COVERAGE_LON[0] <= lon <= _KR_COVERAGE_LON[1])
+# Default parameters for compute_dark_suspicion (used when `params=None`).
+# Phase 2 migration — seeded into detection_model_versions.params JSONB and
+# replaced when an operator promotes DRAFT → ACTIVE via /ai/detection-models/{dark_suspicion}/versions.
+# The Python constant is the single source of truth; the registry seed copies it verbatim.
+DARK_SUSPICION_DEFAULT_PARAMS: dict = {
+    'sog_thresholds': {
+        'moving': 5.0,               # P1 speed threshold for "OFF while moving"
+        'slow_moving': 2.0,          # P1 speed threshold for "OFF while slow-moving"
+        'underway_deliberate': 3.0,  # P10 deliberateness when 'under way' + speed
+    },
+    'heading_cog_mismatch_deg': 60.0,  # P11 heading vs COG diff threshold
+    'weights': {
+        'P1_moving_off': 25,
+        'P1_slow_moving_off': 15,
+        'P2_sensitive_zone': 25,
+        'P2_special_zone': 15,
+        'P3_repeat_high': 30,
+        'P3_repeat_low': 15,
+        'P3_recent_dark': 10,
+        'P4_distance_anomaly': 20,
+        'P5_daytime_fishing_off': 15,
+        'P6_teleport_before_gap': 15,
+        'P7_unpermitted': 10,
+        'P8_very_long_gap': 15,
+        'P8_long_gap': 10,
+        'P9_fishing_vessel_dark': 10,
+        'P9_cargo_natural_gap': -10,
+        'P10_underway_deliberate': 20,
+        'P10_anchored_natural': -15,
+        'P11_heading_cog_mismatch': 15,
+        'out_of_coverage': -50,
+    },
+    'repeat_thresholds': {'h7_high': 3, 'h7_low': 2, 'h24_recent': 1},
+    'gap_min_thresholds': {'very_long': 360, 'long': 180},
+    'p4_distance_multiplier': 2.0,  # multiplier over expected travel distance to flag an anomaly
+    'p5_daytime_range': [6, 18],    # [start, end) hours, KST
+    'tier_thresholds': {'critical': 70, 'high': 50, 'watch': 30},
+}
+
+
+def _merge_default_params(override: Optional[dict]) -> dict:
+    """Deep-merge override values into DEFAULT (unset keys fall back to defaults)."""
+    if not override:
+        return DARK_SUSPICION_DEFAULT_PARAMS
+    merged = {k: (dict(v) if isinstance(v, dict) else v)
+              for k, v in DARK_SUSPICION_DEFAULT_PARAMS.items()}
+    for key, val in override.items():
+        if isinstance(val, dict) and isinstance(merged.get(key), dict):
+            merged[key] = {**merged[key], **val}
+        else:
+            merged[key] = val
+    return merged
+
+
 def compute_dark_suspicion(
     gap_info: dict,
     mmsi: str,
@@ -222,6 +276,7 @@ def compute_dark_suspicion(
     nav_status: str = '',
     heading: Optional[float] = None,
     last_cog: Optional[float] = None,
+    params: Optional[dict] = None,
 ) -> tuple[int, list[str], str]:
     """Compute the deliberate-AIS-OFF suspicion score.
@@ -236,6 +291,8 @@ def compute_dark_suspicion(
         nav_status: navigation status text ("Under way using engine", etc.)
         heading: heading (0~360, signal-batch API)
         last_cog: COG just before the gap (0~360)
+        params: detection_model_versions.params (DEFAULT_PARAMS when None).
+            The same input with params=None yields exactly the same result as before Phase 2.
     Returns:
         (score, patterns, tier)
@@ -244,6 +301,14 @@
     if not gap_info.get('is_dark'):
         return 0, [], 'NONE'
+    p = _merge_default_params(params)
+    w = p['weights']
+    sog = p['sog_thresholds']
+    rpt = p['repeat_thresholds']
+    gmt = p['gap_min_thresholds']
+    tier_thr = p['tier_thresholds']
+    day_start, day_end = p['p5_daytime_range']
+
     score = 0
     patterns: list[str] = []
@@ -254,11 +319,11 @@
     gap_min = gap_info.get('gap_min') or 0
     # P1: OFF while moving
-    if gap_start_sog > 5.0:
-        score += 25
+    if gap_start_sog > sog['moving']:
+        score += w['P1_moving_off']
         patterns.append('moving_at_off')
-    elif gap_start_sog > 2.0:
-        score += 15
+    elif gap_start_sog > sog['slow_moving']:
+        score += w['P1_slow_moving_off']
         patterns.append('slow_moving_at_off')
     # P2: sensitive waters at the gap start position
@@ -267,10 +332,10 @@
         zone_info = classify_zone_fn(gap_start_lat, gap_start_lon)
         zone = zone_info.get('zone', '')
         if zone in ('TERRITORIAL_SEA', 'CONTIGUOUS_ZONE'):
-            score += 25
+            score += w['P2_sensitive_zone']
             patterns.append('sensitive_zone')
         elif zone.startswith('ZONE_'):
-            score += 15
+            score += w['P2_special_zone']
             patterns.append('special_zone')
     except Exception:
         pass
@@ -278,14 +343,14 @@
     # P3: repeat history (past 7 days)
     h7 = int(history.get('count_7d', 0) or 0)
     h24 = int(history.get('count_24h', 0) or 0)
-    if h7 >= 3:
-        score += 30
+    if h7 >= rpt['h7_high']:
+        score += w['P3_repeat_high']
         patterns.append('repeat_high')
-    elif h7 >= 2:
-        score += 15
+    elif h7 >= rpt['h7_low']:
+        score += w['P3_repeat_low']
         patterns.append('repeat_low')
-    if h24 >= 1:
-        score += 10
+    if h24 >= rpt['h24_recent']:
+        score += w['P3_recent_dark']
        patterns.append('recent_dark')
     # P4: abnormal distance traveled after the gap
@@ -293,78 +358,73 @@
     avg_sog_before = gap_info.get('avg_sog_before') or 0.0
     if gap_info.get('gap_resumed') and gap_min > 0:
         gap_hours = gap_min / 60.0
-        # expected travel = avg_sog * gap_hours; more than 2x is abnormal
         expected = max(gap_hours * max(avg_sog_before, 1.0), 0.5)
-        if gap_distance_nm > expected * 2.0:
-            score += 20
+        if gap_distance_nm > expected * p['p4_distance_multiplier']:
+            score += w['P4_distance_anomaly']
             patterns.append('distance_anomaly')
     # P5: OFF during daytime fishing hours
-    if 6 <= now_kst_hour < 18 and gap_start_state == 'FISHING':
-        score += 15
+    if day_start <= now_kst_hour < day_end and gap_start_state == 'FISHING':
+        score += w['P5_daytime_fishing_off']
         patterns.append('daytime_fishing_off')
     # P6: anomalous behavior right before the gap
     if gap_info.get('pre_gap_turn_or_teleport'):
-        score += 15
+        score += w['P6_teleport_before_gap']
         patterns.append('teleport_before_gap')
     # P7: unpermitted
     if not is_permitted:
-        score += 10
+        score += w['P7_unpermitted']
         patterns.append('unpermitted')
     # P8: gap length
-    if gap_min >= 360:
-        score += 15
+    if gap_min >= gmt['very_long']:
+        score += w['P8_very_long_gap']
         patterns.append('very_long_gap')
-    elif gap_min >= 180:
-        score += 10
+    elif gap_min >= gmt['long']:
+        score += w['P8_long_gap']
         patterns.append('long_gap')
-    # P9: weighting by ship type (signal-batch API data)
+    # P9: weighting by ship type
     if ship_kind_code == '000020':
-        # fishing vessel + dark → possible illegal-fishing intent
-        score += 10
+        score += w['P9_fishing_vessel_dark']
         patterns.append('fishing_vessel_dark')
     elif ship_kind_code == '000023':
-        # cargo ships often have natural gaps on ocean passages
-        score -= 10
+        score += w['P9_cargo_natural_gap']
         patterns.append('cargo_natural_gap')
     # P10: deliberateness from navigation status
     if nav_status:
         status_lower = nav_status.lower()
-        if 'under way' in status_lower and gap_start_sog > 3.0:
-            # suddenly OFF while under way → deliberate
-            score += 20
+        if 'under way' in status_lower and gap_start_sog > sog['underway_deliberate']:
+            score += w['P10_underway_deliberate']
             patterns.append('underway_deliberate_off')
         elif 
'anchor' in status_lower or 'moored' in status_lower:
-            # gap while anchored → natural
-            score -= 15
+            score += w['P10_anchored_natural']
             patterns.append('anchored_natural_gap')
-    # P11: heading vs COG mismatch (deliberate course change)
+    # P11: heading vs COG mismatch
     if heading is not None and last_cog is not None:
         diff = abs(heading - last_cog) % 360
         if diff > 180:
             diff = 360 - diff
-        if diff > 60:
-            score += 15
+        if diff > p['heading_cog_mismatch_deg']:
+            score += w['P11_heading_cog_mismatch']
             patterns.append('heading_cog_mismatch')
-    # deduction: gap starts outside Korean reception coverage → likely a natural gap
+    # deduction: gap start position outside Korean reception coverage
     if not _is_in_kr_coverage(gap_start_lat, gap_start_lon):
-        score -= 50
+        score += w['out_of_coverage']
         patterns.append('out_of_coverage')
     score = max(0, min(100, score))
-    if score >= 70:
+    if score >= tier_thr['critical']:
         tier = 'CRITICAL'
-    elif score >= 50:
+    elif score >= tier_thr['high']:
         tier = 'HIGH'
-    elif score >= 30:
+    elif score >= tier_thr['watch']:
         tier = 'WATCH'
     else:
         tier = 'NONE'
diff --git a/prediction/algorithms/gear_violation.py b/prediction/algorithms/gear_violation.py
index 56d6705..70756f8 100644
--- a/prediction/algorithms/gear_violation.py
+++ b/prediction/algorithms/gear_violation.py
@@ -51,6 +51,67 @@ GEAR_DRIFT_THRESHOLD_NM = 0.270  # ≈ 500m (DAR-03 spec, before tidal correction)
 FIXED_GEAR_TYPES = {'GN', 'TRAP', 'FYK', 'FPO', 'GNS', 'GND'}
+# Phase 2 parameter SSOT for classify_gear_violations — the DB seed copies these values verbatim
+GEAR_VIOLATION_DEFAULT_PARAMS: dict = {
+    'scores': {
+        'G01_zone_violation': G01_SCORE,
+        'G02_closed_season': G02_SCORE,
+        'G03_unregistered_gear': G03_SCORE,
+        'G04_signal_cycling': G04_SCORE,
+        'G05_gear_drift': G05_SCORE,
+        'G06_pair_trawl': G06_SCORE,
+    },
+    'signal_cycling': {
+        'gap_min': SIGNAL_CYCLING_GAP_MIN,
+        'min_count': SIGNAL_CYCLING_MIN_COUNT,
+    },
+    'gear_drift_threshold_nm': GEAR_DRIFT_THRESHOLD_NM,
+    'fixed_gear_types': sorted(FIXED_GEAR_TYPES),
+    'fishery_code_allowed_gear': {
+        k: sorted(v) for k, v in FISHERY_CODE_ALLOWED_GEAR.items()
+    },
+}
+
+
+def _merge_default_gv_params(override: Optional[dict]) -> dict:
+    """Deep-merge override into GEAR_VIOLATION_DEFAULT_PARAMS; list/set keys are replaced by the override."""
+    if not override:
+        return GEAR_VIOLATION_DEFAULT_PARAMS
+    merged = {
+        k: (dict(v) if isinstance(v, dict) else
+            (list(v) if isinstance(v, list) else v))
+        for k, v in GEAR_VIOLATION_DEFAULT_PARAMS.items()
+    }
+    for key, val in override.items():
+        if isinstance(val, dict) and isinstance(merged.get(key), dict):
+            merged[key] = {**merged[key], **val}
+        else:
+            merged[key] = val
+    return merged
+
+
+def _detect_signal_cycling_count(
+    gear_episodes: list[dict], threshold_min: int,
+) -> tuple[int, int]:
+    """Count-only variant of _detect_signal_cycling (so the threshold can come from params).
+
+    Returns: (cycling_count, total_episodes_evaluated)
+    """
+    if not gear_episodes or len(gear_episodes) < 2:
+        return 0, len(gear_episodes or [])
+    sorted_eps = sorted(gear_episodes, key=lambda e: e['first_seen_at'])
+    cnt = 0
+    for i in range(1, len(sorted_eps)):
+        prev_end = sorted_eps[i - 1].get('last_seen_at')
+        curr_start = sorted_eps[i].get('first_seen_at')
+        if prev_end is None or curr_start is None:
+            continue
+        gap_min = (curr_start - prev_end).total_seconds() / 60.0
+        if 0 < gap_min <= threshold_min:
+            cnt += 1
+    return cnt, len(sorted_eps)
+
+
 def _haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
     """Distance between two coordinates in nautical miles — haversine formula."""
     R = 3440.065
@@ -196,6 +257,7 @@ def classify_gear_violations(
     permit_periods: Optional[list[tuple[datetime, datetime]]] = None,
     registered_fishery_code: Optional[str] = None,
     observation_ts: Optional[datetime] = None,
+    params: Optional[dict] = None,
 ) -> dict:
     """Gear-violation G-code classification, main entry point (DAR-03).
@@ -229,7 +291,19 @@
     }
     Judgment priority: ZONE_VIOLATION > PAIR_TRAWL > GEAR_MISMATCH > '' (normal)
+
+    params: detection_model_versions.params (DEFAULT_PARAMS when None).
+        Calling with params=None produces exactly the same result as before Phase 2.
    """
+    p = _merge_default_gv_params(params)
+    scores = p['scores']
+    sc = p['signal_cycling']
+    fixed_gear_types = set(p['fixed_gear_types'])
+    # JSONB stores lists, so convert back to sets for the allowed-gear check
+    allowed_gear_map = {
+        k: set(v) for k, v in p['fishery_code_allowed_gear'].items()
+    }
+
     g_codes: list[str] = []
     evidence: dict = {}
     score = 0
@@ -241,7 +315,7 @@
     allowed_gears: list[str] = zone_info.get('allowed_gears', [])
     if allowed_gears and gear_type not in allowed_gears:
         g_codes.append('G-01')
-        score += G01_SCORE
+        score += scores['G01_zone_violation']
         evidence['G-01'] = {
             'zone': zone,
             'gear': gear_type,
@@ -262,7 +336,7 @@
         in_closed = False
     if in_closed:
         g_codes.append('G-02')
-        score += G02_SCORE
+        score += scores['G02_closed_season']
         evidence['G-02'] = {
             'observed_at': observation_ts.isoformat() if observation_ts else None,
             'permit_periods': [
@@ -276,18 +350,25 @@
     # ── G-03: unregistered / unpermitted gear ──────────────────────
     if registered_fishery_code:
         try:
-            unregistered = _is_unregistered_gear(gear_type, registered_fishery_code)
+            # use the params-overridden mapping (_is_unregistered_gear keeps its public signature — BACK-COMPAT)
+            allowed_set = allowed_gear_map.get(
+                registered_fishery_code.upper().strip()
+            )
+            if allowed_set is None:
+                unregistered = False
+            else:
+                unregistered = gear_type.upper().strip() not in allowed_set
         except Exception as exc:
             logger.error('G-03 evaluation failed [mmsi=%s]: %s', mmsi, exc)
             unregistered = False
         if unregistered:
             g_codes.append('G-03')
-            score += G03_SCORE
+            score += scores['G03_unregistered_gear']
             evidence['G-03'] = {
                 'detected_gear': gear_type,
                 'registered_fishery_code': registered_fishery_code,
                 'allowed_gears': sorted(
-                    FISHERY_CODE_ALLOWED_GEAR.get(
+                    allowed_gear_map.get(
                         registered_fishery_code.upper().strip(), set()
                     )
                 ),
@@ -300,19 +381,22 @@
     )
     # ── G-04: suspected MMSI manipulation (fixed-gear signal on/off cycling) ──
-    if gear_episodes is not 
None and gear_type in FIXED_GEAR_TYPES:
+    if gear_episodes is not None and gear_type in fixed_gear_types:
         try:
-            is_cycling, cycling_count = _detect_signal_cycling(gear_episodes)
+            cycling_count, _ = _detect_signal_cycling_count(
+                gear_episodes, threshold_min=sc['gap_min'],
+            )
+            is_cycling = cycling_count >= sc['min_count']
         except Exception as exc:
             logger.error('G-04 evaluation failed [mmsi=%s]: %s', mmsi, exc)
             is_cycling, cycling_count = False, 0
         if is_cycling:
             g_codes.append('G-04')
-            score += G04_SCORE
+            score += scores['G04_signal_cycling']
             evidence['G-04'] = {
                 'cycling_count': cycling_count,
-                'threshold_min': SIGNAL_CYCLING_GAP_MIN,
+                'threshold_min': sc['gap_min'],
             }
             if not judgment:
                 judgment = 'GEAR_MISMATCH'
@@ -321,16 +405,18 @@
     )
     # ── G-05: artificial movement of fixed gear ────────────────────
-    if gear_positions is not None and gear_type in FIXED_GEAR_TYPES:
+    if gear_positions is not None and gear_type in fixed_gear_types:
         try:
-            drift_result = _detect_gear_drift(gear_positions)
+            drift_result = _detect_gear_drift(
+                gear_positions, threshold_nm=p['gear_drift_threshold_nm'],
+            )
         except Exception as exc:
             logger.error('G-05 evaluation failed [mmsi=%s]: %s', mmsi, exc)
             drift_result = {'drift_detected': False, 'drift_nm': 0.0, 'tidal_corrected': False}
         if drift_result['drift_detected']:
             g_codes.append('G-05')
-            score += G05_SCORE
+            score += scores['G05_gear_drift']
             evidence['G-05'] = drift_result
             if not judgment:
                 judgment = 'GEAR_MISMATCH'
@@ -341,7 +427,7 @@
     # ── G-06: coordinated pair-trawl fishing ───────────────────────
     if pair_result and pair_result.get('pair_detected'):
         g_codes.append('G-06')
-        score += G06_SCORE
+        score += scores['G06_pair_trawl']
         evidence['G-06'] = {
             'sync_duration_min': pair_result.get('sync_duration_min'),
             'mean_separation_nm': pair_result.get('mean_separation_nm'),
diff --git a/prediction/algorithms/pair_trawl.py b/prediction/algorithms/pair_trawl.py
index d794fea..964cc3c 100644
--- 
a/prediction/algorithms/pair_trawl.py
+++ b/prediction/algorithms/pair_trawl.py
@@ -67,6 +67,42 @@ CANDIDATE_PROXIMITY_FACTOR = 2.0  # candidate search radius: PROXIMITY_NM × 2
 CANDIDATE_SOG_MIN = 1.5  # candidate speed lower bound (relaxed)
 CANDIDATE_SOG_MAX = 5.0  # candidate speed upper bound (relaxed)
+
+# Phase 2 PoC #5 — params snapshot for registering the pair_trawl_tier catalog entry.
+# The internal helpers reference module-level constants directly, so this phase is catalog/observation only.
+# Runtime override will be enabled in a follow-up refactoring PR.
+PAIR_TRAWL_DEFAULT_PARAMS: dict = {
+    'cycle_interval_min': CYCLE_INTERVAL_MIN,
+    'strong': {
+        'proximity_nm': PROXIMITY_NM,
+        'sog_delta_max': SOG_DELTA_MAX,
+        'cog_delta_max': COG_DELTA_MAX,
+        'sog_min': SOG_MIN,
+        'sog_max': SOG_MAX,
+        'min_sync_cycles': MIN_SYNC_CYCLES,
+        'simultaneous_gap_min': SIMULTANEOUS_GAP_MIN,
+    },
+    'probable': {
+        'min_block_cycles': PROBABLE_MIN_BLOCK_CYCLES,
+        'min_sync_ratio': PROBABLE_MIN_SYNC_RATIO,
+        'proximity_nm': PROBABLE_PROXIMITY_NM,
+        'sog_delta_max': PROBABLE_SOG_DELTA_MAX,
+        'cog_delta_max': PROBABLE_COG_DELTA_MAX,
+        'sog_min': PROBABLE_SOG_MIN,
+        'sog_max': PROBABLE_SOG_MAX,
+    },
+    'suspect': {
+        'min_block_cycles': SUSPECT_MIN_BLOCK_CYCLES,
+        'min_sync_ratio': SUSPECT_MIN_SYNC_RATIO,
+    },
+    'candidate_scan': {
+        'cell_size_deg': CELL_SIZE,
+        'proximity_factor': CANDIDATE_PROXIMITY_FACTOR,
+        'sog_min': CANDIDATE_SOG_MIN,
+        'sog_max': CANDIDATE_SOG_MAX,
+    },
+}
+
 # ──────────────────────────────────────────────────────────────
 # internal helpers
 # ──────────────────────────────────────────────────────────────
diff --git a/prediction/algorithms/risk.py b/prediction/algorithms/risk.py
index 61cfd32..b7271d1 100644
--- a/prediction/algorithms/risk.py
+++ b/prediction/algorithms/risk.py
@@ -7,6 +7,59 @@
 from algorithms.dark_vessel import detect_ais_gaps
 from algorithms.spoofing import detect_teleportation
+# Phase 2 PoC #4 — params snapshot for registering the risk_composite catalog entry.
+# The current runtime uses module-level constants / inline numbers directly; this exposes
+# the key weights and thresholds in the DB so operators can view and tune them.
Runtime override
+# will be enabled in a follow-up refactoring PR by completing params-argument propagation
+# into compute_lightweight_risk_score / compute_vessel_risk_score.
+RISK_COMPOSITE_DEFAULT_PARAMS: dict = {
+    'tier_thresholds': {'critical': 70, 'high': 50, 'medium': 30},
+    # lightweight path (no pipeline pass) — compute_lightweight_risk_score
+    'lightweight_weights': {
+        'territorial_sea': 40,
+        'contiguous_zone': 15,
+        'zone_unpermitted': 25,
+        'eez_lt12nm': 15,
+        'eez_lt24nm': 8,
+        'dark_suspicion_multiplier': 0.3,
+        'dark_gap_720_min': 25,
+        'dark_gap_180_min': 20,
+        'dark_gap_60_min': 15,
+        'dark_gap_30_min': 8,
+        'spoofing_gt07': 15,
+        'spoofing_gt05': 8,
+        'unpermitted_alone': 15,
+        'unpermitted_with_suspicion': 8,
+        'repeat_gte5': 10,
+        'repeat_gte2': 5,
+    },
+    # pipeline (precision) path — compute_vessel_risk_score
+    'pipeline_weights': {
+        'territorial_sea': 40,
+        'contiguous_zone': 10,
+        'zone_unpermitted': 25,
+        'territorial_fishing': 20,
+        'fishing_segments_any': 5,
+        'trawl_uturn': 10,
+        'teleportation': 20,
+        'speed_jumps_ge3': 10,
+        'speed_jumps_ge1': 5,
+        'critical_gaps_ge60': 15,
+        'any_gaps': 5,
+        'unpermitted': 20,
+    },
+    'dark_suspicion_fallback_gap_min': {
+        'very_long_720': 720,
+        'long_180': 180,
+        'mid_60': 60,
+        'short_30': 30,
+    },
+    'spoofing_thresholds': {'high_0.7': 0.7, 'medium_0.5': 0.5},
+    'eez_proximity_nm': {'inner_12': 12, 'outer_24': 24},
+    'repeat_thresholds': {'h24_high': 5, 'h24_low': 2},
+}
+
+
 def compute_lightweight_risk_score(
     zone_info: dict,
     sog: float,
diff --git a/prediction/algorithms/transshipment.py b/prediction/algorithms/transshipment.py
index 040ba72..fb4d5b3 100644
--- a/prediction/algorithms/transshipment.py
+++ b/prediction/algorithms/transshipment.py
@@ -48,6 +48,27 @@ _EXCLUDED_SHIP_TY = frozenset({
 # promoted to CARRIER when found in the shipTy text (substring match, case-insensitive)
 _CARRIER_HINTS = ('cargo', 'tanker', 'supply', 'carrier', 'reefer')
+
+# Phase 2 PoC #3 — parameter snapshot for catalog registration.
+# The internal helper functions use module-level constants directly, so this phase
+# registers **catalog + observation only**, with no runtime override.
+# Exposed in the DB so operators can view the current values in the UI; the actual value swap
+# happens in a follow-up refactoring PR by propagating a params argument into helpers such as
+# _is_proximity / _is_approach / _evict_expired.
+TRANSSHIPMENT_DEFAULT_PARAMS: dict = {
+    'sog_threshold_kn': SOG_THRESHOLD_KN,
+    'proximity_deg': PROXIMITY_DEG,
+    'approach_deg': APPROACH_DEG,
+    'rendezvous_min': RENDEZVOUS_MIN,
+    'pair_expiry_min': PAIR_EXPIRY_MIN,
+    'gap_tolerance_cycles': GAP_TOLERANCE_CYCLES,
+    'fishing_kinds': sorted(_FISHING_KINDS),
+    'carrier_kinds': sorted(_CARRIER_KINDS),
+    'excluded_ship_ty': sorted(_EXCLUDED_SHIP_TY),
+    'carrier_hints': list(_CARRIER_HINTS),
+    'min_score': 50,  # detect_transshipment output filter: only `score >= 50` is emitted
+}
+
 # ──────────────────────────────────────────────────────────────
 # load surveillance areas
 # ──────────────────────────────────────────────────────────────
diff --git a/prediction/db/partition_manager.py b/prediction/db/partition_manager.py
index 9941229..636b434 100644
--- a/prediction/db/partition_manager.py
+++ b/prediction/db/partition_manager.py
@@ -17,6 +17,7 @@ logger = logging.getLogger(__name__)
 SYSTEM_CONFIG = qualified_table('system_config')
 GEAR_CORRELATION_RAW_METRICS = qualified_table('gear_correlation_raw_metrics')
 GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores')
+DETECTION_MODEL_RUN_OUTPUTS = qualified_table('detection_model_run_outputs')
 def _get_config_int(conn, key: str, default: int) -> int:
@@ -99,6 +100,100 @@ def _drop_expired_partitions(conn, retention_days: int) -> int:
     return dropped
+def _create_future_monthly_detection_partitions(conn, months_ahead: int) -> int:
+    """Create future monthly partitions (N months ahead) for detection_model_run_outputs.
+
+    Monthly RANGE partitions on cycle_started_at — V034 pre-created 2026-04/05 via Flyway.
+    From then on, this function runs daily and pre-creates partitions `months_ahead` out.
+
+    Returns:
+        number of partitions created
+    """
+    cur = conn.cursor()
+    created = 0
+    try:
+        anchor = date.today().replace(day=1)
+        for i in range(months_ahead + 1):
+            # anchor + i months
+            y = anchor.year + (anchor.month - 1 + i) // 12
+            m = (anchor.month - 1 + i) % 12 + 1
+            start = date(y, m, 1)
+            ny = y + (1 if m == 12 else 0)
+            nm = 1 if m == 12 else m + 1
+            end = date(ny, nm, 1)
+            partition_name = f'detection_model_run_outputs_{y:04d}_{m:02d}'
+            cur.execute(
+                "SELECT 1 FROM pg_class c "
+                "JOIN pg_namespace n ON n.oid = c.relnamespace "
+                "WHERE c.relname = %s AND n.nspname = %s",
+                (partition_name, settings.KCGDB_SCHEMA),
+            )
+            if cur.fetchone() is None:
+                cur.execute(
+                    f"CREATE TABLE IF NOT EXISTS {qualified_table(partition_name)} "
+                    f"PARTITION OF {DETECTION_MODEL_RUN_OUTPUTS} "
+                    f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}')"
+                )
+                created += 1
+                logger.info(
+                    'created partition: %s.%s', settings.KCGDB_SCHEMA, partition_name,
+                )
+        conn.commit()
+    except Exception as e:
+        conn.rollback()
+        logger.error('failed to create detection_model_run_outputs partitions: %s', e)
+    finally:
+        cur.close()
+    return created
+
+
+def _drop_expired_monthly_detection_partitions(conn, retention_months: int) -> int:
+    """Drop monthly detection_model_run_outputs partitions older than retention_months.
+
+    Raw SHADOW results have little value once the comparison analysis is done — default retention is 1 month.
+    Aggregates are preserved in detection_model_metrics, so dropping the raw rows keeps things traceable.
+    """
+    cutoff_anchor = date.today().replace(day=1)
+    # step back by retention_months
+    y = cutoff_anchor.year
+    m = cutoff_anchor.month - retention_months
+    while m <= 0:
+        m += 12
+        y -= 1
+    cutoff = date(y, m, 1)
+
+    cur = conn.cursor()
+    dropped = 0
+    try:
+        cur.execute(
+            "SELECT c.relname FROM pg_class c "
+            "JOIN pg_namespace n ON n.oid = c.relnamespace "
+            "WHERE c.relname LIKE 'detection_model_run_outputs_%%' "
+            "AND n.nspname = %s AND c.relkind = 'r'",
+            (settings.KCGDB_SCHEMA,),
+        )
+        for (name,) in cur.fetchall():
+            tail = name[len('detection_model_run_outputs_'):]
+            try:
+                yy, mm = tail.split('_')
+                partition_start = date(int(yy), int(mm), 1)
+            except (ValueError, IndexError):
+                continue
+            if partition_start < cutoff:
+                cur.execute(f'DROP TABLE IF EXISTS {qualified_table(name)}')
+                dropped += 1
+                logger.info(
+                    'dropped expired partition: %s.%s', settings.KCGDB_SCHEMA, name,
+                )
+        conn.commit()
+    except Exception as e:
+        conn.rollback()
+        logger.error('failed to drop detection_model_run_outputs partitions: %s', e)
+    finally:
+        cur.close()
+    return dropped
+
+
 def _cleanup_stale_scores(conn, cleanup_days: int) -> int:
     """Delete score records unobserved for cleanup_days or longer."""
     cur = conn.cursor()
@@ -131,13 +226,25 @@ def maintain_partitions():
     retention = _get_config_int(conn, 'partition.raw_metrics.retention_days', 7)
     ahead = _get_config_int(conn, 'partition.raw_metrics.create_ahead_days', 3)
     cleanup_days = _get_config_int(conn, 'partition.scores.cleanup_days', 30)
+    det_months_ahead = _get_config_int(
+        conn, 'partition.detection_model_run_outputs.create_ahead_months', 2,
+    )
+    det_retention_months = _get_config_int(
+        conn, 'partition.detection_model_run_outputs.retention_months', 1,
+    )
     created = _create_future_partitions(conn, ahead)
     dropped = _drop_expired_partitions(conn, retention)
     cleaned = _cleanup_stale_scores(conn, cleanup_days)
+    det_created = _create_future_monthly_detection_partitions(conn, det_months_ahead)
+    det_dropped = 
_drop_expired_monthly_detection_partitions(conn, det_retention_months)
+
     logger.info(
         'partition maintenance: %d created, %d dropped, %d stale scores cleaned '
-        '(retention=%dd, ahead=%dd, cleanup=%dd)',
+        '(retention=%dd, ahead=%dd, cleanup=%dd); '
+        'detection_model_run_outputs: %d created, %d dropped '
+        '(retention_months=%d, ahead_months=%d)',
         created, dropped, cleaned, retention, ahead, cleanup_days,
+        det_created, det_dropped, det_retention_months, det_months_ahead,
     )
diff --git a/prediction/models_core/__init__.py b/prediction/models_core/__init__.py
new file mode 100644
index 0000000..acd9b67
--- /dev/null
+++ b/prediction/models_core/__init__.py
@@ -0,0 +1,26 @@
+"""Detection Model Registry (Phase 1-2).
+
+A frame that instantiates the `ACTIVE` versions on top of the V034
+detection_models / detection_model_versions schema and runs and compares them within a cycle.
+
+Public modules:
+- base          : BaseDetectionModel, ModelContext, ModelResult
+- params_loader : loads detection_model_versions.params JSONB + TTL cache
+- registry      : loads all ACTIVE versions + DAG validation
+- executor      : topo-order PRIMARY runs → ctx.shared injection → SHADOW/CHALLENGER runs
+- feature_flag  : toggles between the old and new paths
+
+Core invariants (contamination blocking):
+- SHADOW/CHALLENGER results are never written to `ctx.shared[model_id]`.
+- A downstream PRIMARY model takes only upstream PRIMARY results as input.
+"""
+
+from .base import BaseDetectionModel, ModelContext, ModelResult
+from .feature_flag import use_model_registry
+
+__all__ = [
+    'BaseDetectionModel',
+    'ModelContext',
+    'ModelResult',
+    'use_model_registry',
+]
diff --git a/prediction/models_core/base.py b/prediction/models_core/base.py
new file mode 100644
index 0000000..6e9db7b
--- /dev/null
+++ b/prediction/models_core/base.py
@@ -0,0 +1,150 @@
+"""Detection model abstraction layer.
+
+Leaves the prediction module's existing functional algorithms (`algorithms/*`) in place
+and wraps them as Adapters to standardize per-model execution, versioning, and parameters.
+
+Design:
+- `ModelContext` — common inputs / shared state for one cycle (assumed immutable)
+- `ModelResult` — the result of running one model·version (per-input outputs + metrics)
+- `BaseDetectionModel` — the minimal registrable contract (model_id / version / role / params / run)
+
+Invariants:
+- SHADOW/CHALLENGER are never written to `ctx.shared[model_id]` (the Executor's responsibility)
+- `params` is editable while DRAFT; ACTIVE is an immutable snapshot (same convention as the DB constraint)
+"""
+from __future__ import annotations
+
+import logging
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Any, Optional
+
+logger = logging.getLogger(__name__)
+
+
+# role constants — identical strings to the DB CHECK constraint
+ROLE_PRIMARY = 'PRIMARY'
+ROLE_SHADOW = 'SHADOW'
+ROLE_CHALLENGER = 'CHALLENGER'
+ALLOWED_ROLES = (ROLE_PRIMARY, ROLE_SHADOW, ROLE_CHALLENGER)
+
+
+@dataclass
+class ModelContext:
+    """Common inputs for one cycle + state shared between models.
+
+    Attributes:
+        cycle_started_at: cycle start time (shared by every model·version)
+        vessel_store: in-memory AIS cache (Optional — None allowed in tests)
+        conn: kcgdb psycopg2 connection (Optional — None allowed in tests)
+        shared: store for upstream PRIMARY results. key=model_id, value=ModelResult
+            SHADOW/CHALLENGER never write here (contamination blocking).
+        inputs: the common input list the models consume (vessel rows etc.);
+            guarantees a fair comparison across versions.
+        extras: per-model auxiliary data when needed (feature flags, tunables, etc.)
+    """
+    cycle_started_at: datetime
+    vessel_store: Any = None
+    conn: Any = None
+    shared: dict = field(default_factory=dict)
+    inputs: list = field(default_factory=list)
+    extras: dict = field(default_factory=dict)
+
+
+@dataclass
+class ModelResult:
+    """The result of running one model·version.
+
+    Attributes:
+        model_id: model identifier
+        version_id: detection_model_versions.id
+        version_str: human-readable version string such as 'v1.0.0'
+        role: PRIMARY / SHADOW / CHALLENGER
+        outputs_per_input: [(input_ref, output_dict), ...]
+            input_ref is a comparison key (e.g. {'mmsi': '412...', 'analyzed_at': ...});
+            output_dict is a result snapshot storable as JSONB
+        metrics: aggregate observations recorded to detection_model_metrics
+            (key=metric_key, value=numeric)
+        duration_ms: execution time for this version
+    """
+    model_id: str
+    version_id: int
+    version_str: str
+    role: str
+    outputs_per_input: list[tuple[dict, dict]] = field(default_factory=list)
+    metrics: dict[str, float] = field(default_factory=dict)
+    duration_ms: int = 0
+
+
+class BaseDetectionModel(ABC):
+    """Abstract base for detection models.
+
+    Implementations live under `prediction/models_core/registered/` and are
+    auto-imported by `ModelRegistry.discover_classes()`.
+
+    Class-level attributes (model_id / depends_on) are fixed **at class definition time**;
+    instance attributes (version_id / version_str / role / params) are injected by
+    `ModelRegistry` from the ACTIVE version snapshot.
+
+    If the DB holds N ACTIVE versions for one `BaseDetectionModel` subclass, the Registry
+    creates a **separate instance per version** (1 PRIMARY + N SHADOW/CHALLENGER).
+    """
+
+    # --- class meta (overridden by subclasses) ---
+    model_id: str = ''
+    depends_on: list[str] = []
+
+    # V034 schema column length cap — validated at class-definition time to head off
+    # silent persist failures caused by operator mistakes or long-running drift.
+    _MODEL_ID_MAXLEN = 64
+
+    def __init__(
+        self,
+        version_id: int,
+        version_str: str,
+        role: str,
+        params: dict,
+    ) -> None:
+        if role not in ALLOWED_ROLES:
+            raise ValueError(f'invalid role: {role!r} (expected {ALLOWED_ROLES})')
+        if not self.model_id:
+            raise ValueError(
+                f'{type(self).__name__}.model_id is empty — override as class attribute'
+            )
+        if len(self.model_id) > self._MODEL_ID_MAXLEN:
+            raise ValueError(
+                f'{type(self).__name__}.model_id too long '
+                f'({len(self.model_id)} > {self._MODEL_ID_MAXLEN}): {self.model_id!r}'
+            )
+        self.version_id = version_id
+        self.version_str = version_str
+        self.role = role
+        self.params: dict = dict(params) if params else {}
+
+    # --- subclass implementation point ---
+    @abstractmethod
+    def run(self, ctx: ModelContext) -> ModelResult:
+        """Run the model for one cycle.
+
+        The returned `outputs_per_input` must keep **the same input_ref schema within
+        a model_id** so that per-input comparison is possible.
+ (PRIMARY 와 SHADOW 의 input_ref 가 일치해야 diff JOIN 이 가능.) + """ + raise NotImplementedError + + # --- 편의 --- + def label(self) -> str: + return f'{self.model_id}@{self.role}[{self.version_str}]' + + def __repr__(self) -> str: # pragma: no cover + return f'<{type(self).__name__} {self.label()} version_id={self.version_id}>' + + +def make_input_ref(mmsi: str, analyzed_at: Optional[datetime] = None, **extra) -> dict: + """관용 input_ref 생성기. PRIMARY/SHADOW 가 같은 포맷을 쓰도록 강제하는 도우미.""" + ref: dict[str, Any] = {'mmsi': str(mmsi)} + if analyzed_at is not None: + ref['analyzed_at'] = analyzed_at.isoformat() if isinstance(analyzed_at, datetime) else analyzed_at + for k, v in extra.items(): + ref[k] = v + return ref diff --git a/prediction/models_core/executor.py b/prediction/models_core/executor.py new file mode 100644 index 0000000..bffa280 --- /dev/null +++ b/prediction/models_core/executor.py @@ -0,0 +1,287 @@ +"""DAGExecutor — ExecutionPlan 을 실제로 돌리고 DB 에 결과/메트릭을 기록한다. + +불변식 (테스트로도 검증): +1. PRIMARY 실행 결과만 `ctx.shared[model_id]` 에 주입 (후행 모델의 입력 소스). +2. SHADOW/CHALLENGER 결과는 `detection_model_run_outputs` 에 저장만, shared 에 **절대 주입 금지**. +3. PRIMARY 가 실패하면 후행 모델 실행 skip (upstream 결과 없음). + SHADOW/CHALLENGER 실패는 그 버전만 skip, 다른 버전·후행 모델에 영향 없음. 
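불변식 1·2 를 단순화한 스케치 (가정 기반 예시 — 실제 `DAGExecutor` 코드가 아니며, `handle_result`/`shared`/`persisted` 는 설명용 가상의 이름):

```python
# "결과는 항상 persist 하되, ctx.shared 주입은 PRIMARY 에 한정" 규약의 미니 모형.
shared = {}      # ctx.shared 에 해당 — 후행 모델의 입력 소스
persisted = []   # detection_model_run_outputs 에 해당 — 전 역할 공통 저장

def handle_result(model_id: str, role: str, result: dict) -> None:
    persisted.append((model_id, role, result))
    if role == 'PRIMARY':
        shared[model_id] = result
    # SHADOW/CHALLENGER 는 절대 shared 에 쓰지 않는다 (오염 차단)

handle_result('dark_suspicion', 'PRIMARY', {'score': 70})
handle_result('dark_suspicion', 'SHADOW', {'score': 85})
```

두 결과 모두 persist 되지만 shared 에는 PRIMARY 점수만 남는다.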
+ +DB persist: +- detection_model_run_outputs (PARTITION BY cycle_started_at): execute_values 배치 INSERT +- detection_model_metrics: 집계 메트릭 + +참고: docs/prediction-analysis.md §7, plans/vast-tinkering-knuth.md Phase 1-2 +""" +from __future__ import annotations + +import logging +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Optional + +from pipeline.stage_runner import run_stage + +from .base import BaseDetectionModel, ModelContext, ModelResult, ROLE_PRIMARY +from .feature_flag import concurrent_shadows +from .registry import ExecutionPlan + +logger = logging.getLogger(__name__) + +# V034 스키마 VARCHAR(64) — 초과하면 persist 가 silent 하게 실패하므로 선제 절단·경고 +_METRIC_KEY_MAXLEN = 64 + + +class DAGExecutor: + """ExecutionPlan 을 실행하고 DB persist 를 담당. + + persist 는 ctx.conn 을 재사용한다 (pool 중복 획득 방지). + ctx.conn 이 None 이면 기본 persist 함수들이 자체적으로 get_conn() 호출. + """ + + def __init__( + self, + plan: ExecutionPlan, + *, + persist_fn=None, + persist_metrics_fn=None, + ) -> None: + self.plan = plan + # 테스트에서 DB 없이 돌리기 위해 persist 훅을 주입 가능하게 만든다. + self._persist_fn = persist_fn or _persist_run_outputs + self._persist_metrics_fn = persist_metrics_fn or _persist_metrics + self._ctx_conn = None # run() 진입 시 셋업 + + def run(self, ctx: ModelContext) -> dict: + """전체 Plan 실행. + + Returns: + {'executed': int, 'failed': int, 'shadow_ran': int, 'shadow_failed': int} + """ + # ctx.conn 이 있으면 persist 도 이 conn 을 재사용하도록 보관한다. 
+ # (maxconn=5 pool 고갈 방지 — persist 마다 별도 get_conn() 획득 금지) + self._ctx_conn = getattr(ctx, 'conn', None) + + summary = { + 'executed': 0, + 'failed': 0, + 'skipped_missing_deps': 0, + 'shadow_ran': 0, + 'shadow_failed': 0, + } + + for model_id in self.plan.topo_order: + primary = self.plan.primaries.get(model_id) + shadows = list(self.plan.shadows.get(model_id, [])) + + if primary is None: + # PRIMARY 없이 SHADOW 만 있는 모델은 실행 불가 (비교 기준이 없음) + if shadows: + logger.warning( + 'model %s has %d SHADOW/CHALLENGER but no PRIMARY — skipping', + model_id, len(shadows), + ) + continue + + # upstream PRIMARY 결과가 모두 있는지 확인 + missing = [ + dep for dep in self.plan.edges.get(model_id, ()) + if dep not in ctx.shared + ] + if missing: + summary['skipped_missing_deps'] += 1 + logger.warning( + 'skip %s — upstream PRIMARY missing: %s', + primary.label(), missing, + ) + # SHADOW 도 같은 이유로 스킵 (정당한 비교 불가) + continue + + primary_result = self._run_single(primary, ctx) + if primary_result is None: + summary['failed'] += 1 + # SHADOW 는 같은 입력이 있어야 비교 의미가 있으므로 이 사이클에선 스킵 + continue + + summary['executed'] += 1 + ctx.shared[model_id] = primary_result + self._persist(primary_result, ctx.cycle_started_at) + + # SHADOW/CHALLENGER 는 shared 주입 **금지** — 결과 persist 만 + if shadows: + ran, failed = self._run_shadows(shadows, ctx) + summary['shadow_ran'] += ran + summary['shadow_failed'] += failed + + logger.info( + 'DAGExecutor done: executed=%d failed=%d skip_deps=%d shadow_ran=%d shadow_failed=%d', + summary['executed'], summary['failed'], + summary['skipped_missing_deps'], + summary['shadow_ran'], summary['shadow_failed'], + ) + return summary + + # ------------------------------------------------------------------ + def _run_single(self, model: BaseDetectionModel, ctx: ModelContext) -> Optional[ModelResult]: + """run_stage 로 감싸서 실패 격리 + 지속시간 계측.""" + t0 = time.time() + result = run_stage(model.label(), model.run, ctx, required=False) + if result is None: + return None + # duration_ms 가 비어있으면 여기서 채움 
+ if not result.duration_ms: + result.duration_ms = int((time.time() - t0) * 1000) + return result + + def _run_shadows( + self, + shadows: list[BaseDetectionModel], + ctx: ModelContext, + ) -> tuple[int, int]: + ran = 0 + failed = 0 + if concurrent_shadows() and len(shadows) > 1: + with ThreadPoolExecutor(max_workers=min(4, len(shadows))) as pool: + futures = {pool.submit(self._run_single, s, ctx): s for s in shadows} + for fut in as_completed(futures): + s = futures[fut] + try: + r = fut.result() + except Exception: + logger.exception('shadow %s raised', s.label()) + r = None + if r is None: + failed += 1 + continue + ran += 1 + self._persist(r, ctx.cycle_started_at) + else: + for s in shadows: + r = self._run_single(s, ctx) + if r is None: + failed += 1 + continue + ran += 1 + self._persist(r, ctx.cycle_started_at) + return ran, failed + + def _persist(self, result: ModelResult, cycle_started_at) -> None: + conn = self._ctx_conn + try: + self._persist_fn(result, cycle_started_at, conn=conn) + except Exception: + logger.exception( + 'failed to persist run_outputs for %s', result.model_id, + ) + try: + self._persist_metrics_fn(result, cycle_started_at, conn=conn) + except Exception: + logger.exception( + 'failed to persist metrics for %s', result.model_id, + ) + + +# ---------------------------------------------------------------------- +# 기본 persist 구현 — kcgdb 연결을 얻어서 직접 INSERT +# ---------------------------------------------------------------------- +_INSERT_RUN_OUTPUTS = """ + INSERT INTO kcg.detection_model_run_outputs ( + cycle_started_at, model_id, version_id, role, + input_ref, outputs, cycle_duration_ms + ) VALUES %s +""" + +_INSERT_METRICS = """ + INSERT INTO kcg.detection_model_metrics ( + model_id, version_id, role, metric_key, metric_value, cycle_started_at + ) VALUES %s +""" + + +def _persist_run_outputs(result: ModelResult, cycle_started_at, *, conn=None) -> None: + """detection_model_run_outputs 배치 INSERT. 
+ + conn 이 전달되면 **재사용** (pool 중복 획득 방지, 커밋 책임은 호출자). + None 이면 자체적으로 kcgdb.get_conn() 으로 커넥션을 얻고 직접 커밋. + """ + if not result.outputs_per_input: + return + from psycopg2.extras import Json, execute_values + + rows = [ + ( + cycle_started_at, + result.model_id, + result.version_id, + result.role, + Json(input_ref or {}), + Json(output or {}), + result.duration_ms, + ) + for input_ref, output in result.outputs_per_input + ] + _execute_insert(_INSERT_RUN_OUTPUTS, rows, conn=conn) + + +def _execute_insert(sql: str, rows: list, *, conn=None) -> None: + """execute_values 공통 — conn 재사용 시 commit 은 호출자 책임.""" + if not rows: + return + from psycopg2.extras import execute_values + + if conn is not None: + cur = conn.cursor() + try: + execute_values(cur, sql, rows, page_size=200) + conn.commit() + except Exception: + conn.rollback() + raise + finally: + cur.close() + return + + from db import kcgdb + with kcgdb.get_conn() as fresh_conn: + cur = fresh_conn.cursor() + try: + execute_values(cur, sql, rows, page_size=200) + fresh_conn.commit() + except Exception: + fresh_conn.rollback() + raise + finally: + cur.close() + + +def _persist_metrics(result: ModelResult, cycle_started_at, *, conn=None) -> None: + """detection_model_metrics 배치 INSERT. cycle_duration_ms 기본 포함. + + conn 이 전달되면 재사용, None 이면 자체 get_conn(). + metric_key VARCHAR(64) 초과는 경고 후 드롭 (silent 실패 방지). 
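드롭 규약을 단순화한 스케치 (가정 기반 예시 — `filter_metrics` 는 설명용 가상의 헬퍼이며 실제 함수는 경고 로그까지 남긴다):

```python
# metric_key 길이 가드 모형: VARCHAR(64) 초과 key 와 None 값은 INSERT 전에 제외.
_METRIC_KEY_MAXLEN = 64

def filter_metrics(metrics: dict) -> dict:
    """silent INSERT 실패 대신 명시적으로 걸러낸 dict 를 반환."""
    return {
        k: float(v)
        for k, v in metrics.items()
        if v is not None and len(k) <= _METRIC_KEY_MAXLEN
    }

ok = filter_metrics({'evaluated_count': 5, 'x' * 65: 1.0, 'null_dropped': None})
```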
+ """ + metrics = dict(result.metrics or {}) + metrics.setdefault('cycle_duration_ms', float(result.duration_ms)) + metrics.setdefault('output_count', float(len(result.outputs_per_input))) + + rows = [] + for key, val in metrics.items(): + if val is None: + continue + if len(key) > _METRIC_KEY_MAXLEN: + logger.warning( + 'metric_key %r exceeds VARCHAR(%d) — dropping (model=%s version=%s)', + key, _METRIC_KEY_MAXLEN, result.model_id, result.version_id, + ) + continue + rows.append(( + result.model_id, + result.version_id, + result.role, + key, + float(val), + cycle_started_at, + )) + _execute_insert(_INSERT_METRICS, rows, conn=conn) + + +__all__ = ['DAGExecutor'] diff --git a/prediction/models_core/feature_flag.py b/prediction/models_core/feature_flag.py new file mode 100644 index 0000000..33dd18a --- /dev/null +++ b/prediction/models_core/feature_flag.py @@ -0,0 +1,29 @@ +"""Detection Model Registry feature flag. + +신·구 prediction 경로를 공존시키는 동안 환경변수로 토글한다. +초기 배포에서는 **0 (구 경로 유지)** 가 기본 — Phase 2 PoC 이 신·구 diff=0 +동치성을 확인한 뒤 1 로 전환하는 별도 릴리즈를 내는 전략. 
+ +환경변수: + PREDICTION_USE_MODEL_REGISTRY '1' 이면 DAGExecutor 기반 신 경로 사용 + PREDICTION_CONCURRENT_SHADOWS '1' 이면 SHADOW/CHALLENGER 를 스레드풀 동시 실행 + (기본 0 — 순차 실행, psycopg2 pool 안전) +""" +from __future__ import annotations + +import os + + +def _bool_env(key: str, default: str = '0') -> bool: + raw = os.getenv(key, default).strip().lower() + return raw in ('1', 'true', 'yes', 'on') + + +def use_model_registry() -> bool: + """models_core Registry·Executor 기반 경로 사용 여부.""" + return _bool_env('PREDICTION_USE_MODEL_REGISTRY', '0') + + +def concurrent_shadows() -> bool: + """SHADOW/CHALLENGER 를 ThreadPoolExecutor 로 동시 실행할지.""" + return _bool_env('PREDICTION_CONCURRENT_SHADOWS', '0') diff --git a/prediction/models_core/params_loader.py b/prediction/models_core/params_loader.py new file mode 100644 index 0000000..f5b240e --- /dev/null +++ b/prediction/models_core/params_loader.py @@ -0,0 +1,176 @@ +"""`detection_model_versions.params` JSONB 로더 + 5분 TTL 캐시. + +- correlation_param_models 패턴의 일반화 — **매 사이클 재로드**를 기본으로, + 다만 한 사이클 내에서 여러 번 조회되는 경우를 위해 TTL 캐시를 둔다. +- Registry 가 ACTIVE 버전 목록을 조회할 때와 executor 가 개별 버전 params 를 + 쓸 때 공통으로 사용. + +반환 스키마: + VersionRow = { + 'id': int, # detection_model_versions.id + 'model_id': str, + 'version': str, + 'role': str, # PRIMARY / SHADOW / CHALLENGER + 'params': dict, # JSONB + } +""" +from __future__ import annotations + +import json +import logging +import threading +import time +from typing import Optional, TypedDict + +logger = logging.getLogger(__name__) + + +class VersionRow(TypedDict): + id: int + model_id: str + version: str + role: str + params: dict + + +_DEFAULT_TTL_SEC = 300 # 5분 + + +class _ParamsCache: + """간단 TTL 캐시 (프로세스 로컬). + + thread-safe: Registry 재구성은 사이클 시작 스레드에서만 일어나지만 + APScheduler 가 동시 job 을 허용할 수 있어 락으로 보호한다. 
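TTL 캐시 동작을 단순화한 스케치 (가정 기반 예시 — `TTLCacheSketch` 는 설명용 가상의 클래스, 실제 `_ParamsCache` 는 락과 DB 조회를 포함):

```python
# fetch 횟수로 "TTL 내 재호출은 캐시 히트, invalidate 후에는 즉시 재조회" 를 확인.
import time

class TTLCacheSketch:
    def __init__(self, ttl_sec, fetch_fn):
        self._ttl, self._fetch = ttl_sec, fetch_fn
        self._rows, self._loaded_at = None, 0.0

    def get(self, force=False):
        now = time.time()
        if self._rows is None or force or (now - self._loaded_at) > self._ttl:
            self._rows = self._fetch()
            self._loaded_at = now
        return list(self._rows)

    def invalidate(self):
        self._rows, self._loaded_at = None, 0.0

calls = []
def fetch():
    calls.append(1)
    return ['v1']

cache = TTLCacheSketch(300, fetch)
first = cache.get()
cache.get()          # TTL 내 — 캐시 히트, fetch 재호출 없음
cache.invalidate()
cache.get()          # invalidate 직후 — 즉시 재조회
```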
+ """ + + def __init__(self, ttl_sec: int = _DEFAULT_TTL_SEC) -> None: + self._ttl = ttl_sec + self._lock = threading.Lock() + self._rows: Optional[list[VersionRow]] = None + self._loaded_at: float = 0.0 + + def get(self, conn, *, force: bool = False) -> list[VersionRow]: + now = time.time() + with self._lock: + stale = ( + self._rows is None + or force + or (now - self._loaded_at) > self._ttl + ) + if stale: + self._rows = _fetch_active_versions(conn) + self._loaded_at = now + logger.info( + 'params cache reloaded: %d ACTIVE versions (ttl=%ds)', + len(self._rows), self._ttl, + ) + return list(self._rows or []) + + def invalidate(self) -> None: + with self._lock: + self._rows = None + self._loaded_at = 0.0 + + +_cache = _ParamsCache() + + +def load_active_versions(conn, *, force_reload: bool = False) -> list[VersionRow]: + """ACTIVE 상태의 모든 model_id × version 을 한 번에 조회. + + model 단위로 PRIMARY 1 개 + SHADOW/CHALLENGER N 개가 섞여 반환될 수 있다. + Registry 가 그룹화를 담당. + + Args: + conn: psycopg2 connection + force_reload: True 면 TTL 무시하고 DB 재조회 + + Returns: + VersionRow 리스트 + """ + return _cache.get(conn, force=force_reload) + + +def invalidate_cache() -> None: + """운영자 API 가 version 을 promote·archive 한 직후 호출하면 + 다음 조회에서 즉시 DB 재로드가 일어난다. + """ + _cache.invalidate() + + +def _fetch_active_versions(conn) -> list[VersionRow]: + """SQL — kcg.detection_model_versions WHERE status='ACTIVE'. + + JSONB 는 psycopg2 기본 설정에서 이미 dict 로 반환되지만, 안전을 위해 + str 인 경우에도 json.loads 로 파싱한다. 
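방어적 파싱을 그대로 옮긴 스케치 (`normalize_params` 는 설명용 가상의 헬퍼 이름):

```python
# bytes / str / dict / None 어느 형태든 dict 로 정규화 — decode 실패는 {} 취급.
import json

def normalize_params(params) -> dict:
    if isinstance(params, (bytes, bytearray)):
        params = params.decode('utf-8')
    if isinstance(params, str):
        try:
            params = json.loads(params)
        except json.JSONDecodeError:
            params = {}
    return dict(params or {})
```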
+ """ + sql = """ + SELECT v.id, + v.model_id, + v.version, + v.role, + v.params + FROM kcg.detection_model_versions v + JOIN kcg.detection_models m ON m.model_id = v.model_id + WHERE v.status = 'ACTIVE' + AND m.is_enabled = TRUE + ORDER BY v.model_id, + CASE v.role + WHEN 'PRIMARY' THEN 0 + WHEN 'CHALLENGER' THEN 1 + WHEN 'SHADOW' THEN 2 + ELSE 3 + END, + v.id + """ + rows: list[VersionRow] = [] + cur = conn.cursor() + try: + cur.execute(sql) + for row in cur.fetchall(): + vid, model_id, version, role, params = row + if isinstance(params, (bytes, bytearray)): + params = params.decode('utf-8') + if isinstance(params, str): + try: + params = json.loads(params) + except json.JSONDecodeError: + logger.warning( + 'detection_model_versions.id=%s params JSON decode failed — treated as {}', + vid, + ) + params = {} + rows.append( + VersionRow( + id=int(vid), + model_id=str(model_id), + version=str(version), + role=str(role), + params=dict(params or {}), + ) + ) + return rows + finally: + cur.close() + + +def load_dependencies(conn) -> list[tuple[str, str, str]]: + """detection_model_dependencies 전체 엣지 반환. + + Returns: + [(model_id, depends_on, input_key), ...] + """ + sql = """ + SELECT model_id, depends_on, input_key + FROM kcg.detection_model_dependencies + ORDER BY model_id, depends_on, input_key + """ + cur = conn.cursor() + try: + cur.execute(sql) + return [ + (str(m), str(d), str(k)) + for m, d, k in cur.fetchall() + ] + finally: + cur.close() diff --git a/prediction/models_core/registered/__init__.py b/prediction/models_core/registered/__init__.py new file mode 100644 index 0000000..3c19f1c --- /dev/null +++ b/prediction/models_core/registered/__init__.py @@ -0,0 +1,5 @@ +"""`BaseDetectionModel` 구현체 등록소. + +Phase 1-2 기반 PR 에서는 실제 구현체가 없다 (Phase 2 에서 5 모델 PoC 추가). +이 디렉토리는 `ModelRegistry.discover_classes()` 가 `importlib` 으로 스캔한다. 
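스캔 메커니즘 스케치 (가정 기반 예시 — 표준 라이브러리 `json` 패키지를 대상으로 같은 `pkgutil` 패턴만 시연):

```python
# discover_classes 가 registered/ 를 순회할 때 쓰는 것과 같은 하위 모듈 열거.
import importlib
import pkgutil

pkg = importlib.import_module('json')
submodules = sorted(info.name for info in pkgutil.iter_modules(pkg.__path__))
# 각 이름은 importlib.import_module(f'json.{name}') 으로 실제 import 가능
```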
+""" diff --git a/prediction/models_core/registered/dark_suspicion_model.py b/prediction/models_core/registered/dark_suspicion_model.py new file mode 100644 index 0000000..362d9a2 --- /dev/null +++ b/prediction/models_core/registered/dark_suspicion_model.py @@ -0,0 +1,99 @@ +"""dark_suspicion — 의도적 AIS OFF 의심 점수 모델 (Phase 2 PoC #1). + +구조: +- 기존 `algorithms.dark_vessel.compute_dark_suspicion` 을 그대로 사용 (BACK-COMPAT). +- 입력 단위: `ctx.inputs` 의 각 항목(AnalysisResult asdict) → (mmsi, gap_info). + prediction 기존 사이클이 이미 `analyze_dark_pattern` 을 돌려 AnalysisResult.features + 에 gap_info 와 dark_patterns 를 저장하므로, 이 모델은 **그 결과에 대해 score 를 재계산** + 하는 shadow 비교용이다. PRIMARY 경로는 아직 `scheduler.py` 의 기존 계산을 사용. + +Phase 2 동치성 검증: +- params=None 로 호출하면 compute_dark_suspicion 이 DEFAULT_PARAMS 를 사용해 + 기존 하드코딩 상수와 완전히 동일한 score/tier 를 낸다. +- detection_model_versions.params 가 DEFAULT 와 동일하면 신·구 경로 diff=0. + +Phase 3 백엔드 API 연동 후 PRIMARY 로 승격하면 scheduler 도 이 모델을 호출하도록 +전환 (현재는 ctx.shared 주입만, 기존 경로 영향 없음). +""" +from __future__ import annotations + +from algorithms.dark_vessel import ( + DARK_SUSPICION_DEFAULT_PARAMS, + compute_dark_suspicion, +) +from algorithms.location import classify_zone +from models_core.base import ( + BaseDetectionModel, + ModelContext, + ModelResult, + make_input_ref, +) + + +class DarkSuspicionModel(BaseDetectionModel): + model_id = 'dark_suspicion' + depends_on: list[str] = [] + + def run(self, ctx: ModelContext) -> ModelResult: + outputs_per_input: list[tuple[dict, dict]] = [] + critical = 0 + high = 0 + watch = 0 + for row in ctx.inputs or []: + if not row: + continue + mmsi = row.get('mmsi') + features = row.get('features') or {} + gap_info = features.get('gap_info') if isinstance(features, dict) else None + if not gap_info or not gap_info.get('is_dark'): + continue + + history = { + 'count_7d': row.get('dark_count_7d', 0), + 'count_24h': row.get('dark_count_24h', 0), + } + now_kst_hour = row.get('now_kst_hour', 0) + score, patterns, tier = 
compute_dark_suspicion( + gap_info=gap_info, + mmsi=mmsi, + is_permitted=bool(row.get('is_permitted', False)), + history=history, + now_kst_hour=int(now_kst_hour or 0), + classify_zone_fn=classify_zone, + ship_kind_code=row.get('ship_kind_code', '') or '', + nav_status=row.get('nav_status', '') or '', + heading=row.get('heading'), + last_cog=row.get('last_cog'), + params=self.params or None, + ) + if tier == 'CRITICAL': + critical += 1 + elif tier == 'HIGH': + high += 1 + elif tier == 'WATCH': + watch += 1 + outputs_per_input.append(( + make_input_ref(mmsi, row.get('analyzed_at'), gap_min=gap_info.get('gap_min')), + { + 'score': int(score), + 'tier': tier, + 'patterns': patterns, + }, + )) + + return ModelResult( + model_id=self.model_id, + version_id=self.version_id, + version_str=self.version_str, + role=self.role, + outputs_per_input=outputs_per_input, + metrics={ + 'evaluated_count': float(len(outputs_per_input)), + 'critical_count': float(critical), + 'high_count': float(high), + 'watch_count': float(watch), + }, + ) + + +__all__ = ['DarkSuspicionModel', 'DARK_SUSPICION_DEFAULT_PARAMS'] diff --git a/prediction/models_core/registered/gear_violation_model.py b/prediction/models_core/registered/gear_violation_model.py new file mode 100644 index 0000000..bcd4e61 --- /dev/null +++ b/prediction/models_core/registered/gear_violation_model.py @@ -0,0 +1,71 @@ +"""gear_violation_g01_g06 — 어구 위반 G-01~G-06 종합 모델 (Phase 2 PoC #2). + +기존 `algorithms.gear_violation.classify_gear_violations` 을 얇게 감싸는 Adapter. +scheduler.py 가 이미 이 함수를 호출해 AnalysisResult.features 에 결과를 저장하므로, +본 모델은 **ctx.inputs 의 AnalysisResult 들에서 그 결과를 관찰·집계**하는 역할. + +입력: +- ctx.inputs 의 각 row (AnalysisResult asdict) 중 features.g_codes 가 비어있지 않은 선박. 
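metrics key 변환 규약 스케치 (`g_metric_key` 는 설명용 가상의 이름 — `run()` 내부의 inline 식과 같은 변환):

```python
# G-code 문자열을 메트릭 key 로: 'G-01' → 'g01_count'
def g_metric_key(code: str) -> str:
    return code.replace('-', '').lower() + '_count'
```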
+ +출력: +- outputs_per_input: (input_ref, {g_codes, judgment, score}) — 원시 결과 snapshot +- metrics: + · evaluated_count : G-code 탐지된 선박 수 + · g01_count ~ g06_count : 각 G-code 별 탐지 빈도 + · pair_trawl_count : G-06 탐지 + · closed_season_count : G-02 탐지 +""" +from __future__ import annotations + +from algorithms.gear_violation import GEAR_VIOLATION_DEFAULT_PARAMS +from models_core.base import BaseDetectionModel, ModelContext, ModelResult, make_input_ref + + +class GearViolationModel(BaseDetectionModel): + model_id = 'gear_violation_g01_g06' + depends_on: list[str] = [] + + def run(self, ctx: ModelContext) -> ModelResult: + outputs_per_input: list[tuple[dict, dict]] = [] + counts = {f'g0{i}_count': 0 for i in range(1, 7)} + evaluated = 0 + + for row in ctx.inputs or []: + if not row: + continue + features = row.get('features') or {} + if not isinstance(features, dict): + continue + g_codes = features.get('g_codes') or [] + if not g_codes: + continue + + evaluated += 1 + for code in g_codes: + key = code.replace('-', '').lower() + '_count' # 'G-01' → 'g01_count' + if key in counts: + counts[key] += 1 + + outputs_per_input.append(( + make_input_ref(row.get('mmsi'), row.get('analyzed_at')), + { + 'g_codes': list(g_codes), + 'gear_judgment': features.get('gear_judgment', ''), + 'gear_violation_score': int(features.get('gear_violation_score') or 0), + }, + )) + + metrics = {'evaluated_count': float(evaluated)} + metrics.update({k: float(v) for k, v in counts.items()}) + + return ModelResult( + model_id=self.model_id, + version_id=self.version_id, + version_str=self.version_str, + role=self.role, + outputs_per_input=outputs_per_input, + metrics=metrics, + ) + + +__all__ = ['GearViolationModel', 'GEAR_VIOLATION_DEFAULT_PARAMS'] diff --git a/prediction/models_core/registered/pair_trawl_model.py b/prediction/models_core/registered/pair_trawl_model.py new file mode 100644 index 0000000..368cf62 --- /dev/null +++ b/prediction/models_core/registered/pair_trawl_model.py @@ -0,0 +1,65 @@ 
+"""pair_trawl_tier — 쌍끌이 공조 tier 분류 관찰 어댑터 (Phase 2 PoC #5). + +기존 `algorithms.pair_trawl` 이 STRONG/PROBABLE/SUSPECT tier 로 판정한 결과를 +ctx.inputs (AnalysisResult) 에서 관찰 집계. 런타임 params override 는 후속 PR. + +metrics: + · evaluated_count : pair_trawl_detected=True 선박 수 + · tier_{strong/probable/suspect}_count +""" +from __future__ import annotations + +from algorithms.pair_trawl import PAIR_TRAWL_DEFAULT_PARAMS +from models_core.base import BaseDetectionModel, ModelContext, ModelResult, make_input_ref + + +class PairTrawlModel(BaseDetectionModel): + model_id = 'pair_trawl_tier' + depends_on: list[str] = [] + + def run(self, ctx: ModelContext) -> ModelResult: + outputs_per_input: list[tuple[dict, dict]] = [] + tiers = {'STRONG': 0, 'PROBABLE': 0, 'SUSPECT': 0} + evaluated = 0 + + for row in ctx.inputs or []: + if not row: + continue + features = row.get('features') or {} + if not isinstance(features, dict): + continue + if features.get('pair_trawl_detected') is not True: + continue + + evaluated += 1 + tier = features.get('pair_tier', '') + if tier in tiers: + tiers[tier] += 1 + + outputs_per_input.append(( + make_input_ref(row.get('mmsi'), row.get('analyzed_at')), + { + 'pair_tier': tier, + 'pair_type': features.get('pair_type'), + 'pair_mmsi': features.get('pair_mmsi'), + 'similarity': features.get('similarity'), + 'confidence': features.get('confidence'), + }, + )) + + return ModelResult( + model_id=self.model_id, + version_id=self.version_id, + version_str=self.version_str, + role=self.role, + outputs_per_input=outputs_per_input, + metrics={ + 'evaluated_count': float(evaluated), + 'tier_strong_count': float(tiers['STRONG']), + 'tier_probable_count': float(tiers['PROBABLE']), + 'tier_suspect_count': float(tiers['SUSPECT']), + }, + ) + + +__all__ = ['PairTrawlModel', 'PAIR_TRAWL_DEFAULT_PARAMS'] diff --git a/prediction/models_core/registered/risk_composite_model.py b/prediction/models_core/registered/risk_composite_model.py new file mode 100644 index 0000000..af7488f 
--- /dev/null +++ b/prediction/models_core/registered/risk_composite_model.py @@ -0,0 +1,66 @@ +"""risk_composite — 종합 위험도 관찰 어댑터 (Phase 2 PoC #4). + +현재 `algorithms.risk` 의 compute_*_risk_score 들이 inline 숫자로 점수를 계산하므로 +이 단계에서는 카탈로그 등록 + AnalysisResult 의 risk_score/risk_level 을 집계 관찰만. +런타임 params override 는 후속 리팩토링 PR 에서 활성화. + +metrics: + · evaluated_count : 전체 관찰 수 + · avg_risk_score + · tier_{critical/high/medium/low}_count +""" +from __future__ import annotations + +from algorithms.risk import RISK_COMPOSITE_DEFAULT_PARAMS +from models_core.base import BaseDetectionModel, ModelContext, ModelResult, make_input_ref + + +class RiskCompositeModel(BaseDetectionModel): + model_id = 'risk_composite' + depends_on: list[str] = [] + + def run(self, ctx: ModelContext) -> ModelResult: + outputs_per_input: list[tuple[dict, dict]] = [] + tiers = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0} + score_sum = 0.0 + evaluated = 0 + + for row in ctx.inputs or []: + if not row: + continue + score = row.get('risk_score') + level = row.get('risk_level', '') + if score is None: + continue + evaluated += 1 + score_sum += float(score) + if level in tiers: + tiers[level] += 1 + outputs_per_input.append(( + make_input_ref(row.get('mmsi'), row.get('analyzed_at')), + { + 'risk_score': int(score), + 'risk_level': level, + 'vessel_type': row.get('vessel_type'), + }, + )) + + avg = score_sum / evaluated if evaluated else 0.0 + return ModelResult( + model_id=self.model_id, + version_id=self.version_id, + version_str=self.version_str, + role=self.role, + outputs_per_input=outputs_per_input, + metrics={ + 'evaluated_count': float(evaluated), + 'avg_risk_score': round(avg, 2), + 'tier_critical_count': float(tiers['CRITICAL']), + 'tier_high_count': float(tiers['HIGH']), + 'tier_medium_count': float(tiers['MEDIUM']), + 'tier_low_count': float(tiers['LOW']), + }, + ) + + +__all__ = ['RiskCompositeModel', 'RISK_COMPOSITE_DEFAULT_PARAMS'] diff --git 
a/prediction/models_core/registered/transshipment_model.py b/prediction/models_core/registered/transshipment_model.py new file mode 100644 index 0000000..ce610d3 --- /dev/null +++ b/prediction/models_core/registered/transshipment_model.py @@ -0,0 +1,67 @@ +"""transshipment_5stage — 5단계 환적 탐지 관찰 어댑터 (Phase 2 PoC #3). + +현재 `algorithms.transshipment.detect_transshipment` 의 내부 헬퍼가 모듈 레벨 +상수를 직접 참조하므로 이번 단계에서는 **카탈로그 등록 + 관찰 수집** 만 담당. +런타임 params override 는 후속 리팩토링 PR 에서 헬퍼 시그니처 확장과 함께 활성화. + +입력: ctx.inputs (AnalysisResult asdict). `transship_suspect=True` 인 선박을 집계. +출력: +- outputs_per_input: (input_ref, {pair_mmsi, duration_min, tier, score, pair_type}) +- metrics: + · evaluated_count : transship_suspect True 선박 수 + · tier_critical_count / tier_high_count / tier_medium_count +""" +from __future__ import annotations + +from algorithms.transshipment import TRANSSHIPMENT_DEFAULT_PARAMS +from models_core.base import BaseDetectionModel, ModelContext, ModelResult, make_input_ref + + +class TransshipmentModel(BaseDetectionModel): + model_id = 'transshipment_5stage' + depends_on: list[str] = [] + + def run(self, ctx: ModelContext) -> ModelResult: + outputs_per_input: list[tuple[dict, dict]] = [] + tier_counts = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0} + evaluated = 0 + + for row in ctx.inputs or []: + if not row: + continue + if not row.get('transship_suspect'): + continue + evaluated += 1 + features = row.get('features') or {} + if not isinstance(features, dict): + features = {} + tier = features.get('transship_tier', '') + if tier in tier_counts: + tier_counts[tier] += 1 + outputs_per_input.append(( + make_input_ref(row.get('mmsi'), row.get('analyzed_at')), + { + 'pair_mmsi': row.get('transship_pair_mmsi'), + 'duration_min': row.get('transship_duration_min'), + 'tier': tier, + 'score': features.get('transship_score'), + }, + )) + + return ModelResult( + model_id=self.model_id, + version_id=self.version_id, + version_str=self.version_str, + role=self.role, + 
outputs_per_input=outputs_per_input, + metrics={ + 'evaluated_count': float(evaluated), + 'tier_critical_count': float(tier_counts['CRITICAL']), + 'tier_high_count': float(tier_counts['HIGH']), + 'tier_medium_count': float(tier_counts['MEDIUM']), + 'tier_low_count': float(tier_counts['LOW']), + }, + ) + + +__all__ = ['TransshipmentModel', 'TRANSSHIPMENT_DEFAULT_PARAMS'] diff --git a/prediction/models_core/registry.py b/prediction/models_core/registry.py new file mode 100644 index 0000000..c8151b6 --- /dev/null +++ b/prediction/models_core/registry.py @@ -0,0 +1,282 @@ +"""ModelRegistry — ACTIVE 버전 전체 인스턴스화 + DAG 검증 + 실행 플랜 생성. + +역할: +1. `prediction/models_core/registered/` 를 스캔하여 BaseDetectionModel 서브클래스를 모음 +2. DB 에서 ACTIVE 버전 목록(PRIMARY + SHADOW/CHALLENGER) 을 읽어 **버전별 인스턴스**를 생성 +3. detection_model_dependencies + 클래스 `depends_on` 을 합쳐 DAG 를 구성하고 순환 검출 +4. Executor 가 쓸 topological 실행 플랜(ExecutionPlan) 을 반환 + +주의: +- 클래스에 `model_id` 가 정의돼 있어도 DB 에 해당 레코드가 없으면 인스턴스화하지 않음 + (즉 DB 가 Single Source of Truth, 코드는 "구현 있음" 선언 역할) +- DB 에 model_id 가 있고 코드에 클래스가 없으면 경고 로그 후 **스킵** (부분 배포 허용) +""" +from __future__ import annotations + +import importlib +import logging +import pkgutil +from collections import defaultdict, deque +from dataclasses import dataclass, field +from typing import Iterable, Optional, Type + +from .base import ( + ALLOWED_ROLES, + ROLE_CHALLENGER, + ROLE_PRIMARY, + ROLE_SHADOW, + BaseDetectionModel, +) +from .params_loader import VersionRow, load_active_versions, load_dependencies + +logger = logging.getLogger(__name__) + + +@dataclass +class ExecutionPlan: + """Executor 가 따를 실행 순서. + + Attributes: + topo_order: PRIMARY 기준 topological order (model_id 문자열 리스트). + SHADOW/CHALLENGER 는 자기 model_id 의 PRIMARY 와 같은 슬롯에서 돈다. 
+ primaries: model_id -> BaseDetectionModel 인스턴스 (PRIMARY) + shadows: model_id -> list[BaseDetectionModel] (SHADOW + CHALLENGER) + edges: DAG 디버깅용 (model_id -> set(depends_on)) + """ + topo_order: list[str] = field(default_factory=list) + primaries: dict[str, BaseDetectionModel] = field(default_factory=dict) + shadows: dict[str, list[BaseDetectionModel]] = field(default_factory=lambda: defaultdict(list)) + edges: dict[str, set[str]] = field(default_factory=lambda: defaultdict(set)) + + +class DAGCycleError(RuntimeError): + """모델 의존성 그래프에 순환이 있을 때.""" + + +class ModelRegistry: + """ACTIVE 버전 인스턴스 저장소 + Plan 제공자.""" + + _DEFAULT_REGISTERED_PKG = 'models_core.registered' + + def __init__(self, registered_pkg: str = _DEFAULT_REGISTERED_PKG) -> None: + self._registered_pkg = registered_pkg + self._classes: dict[str, Type[BaseDetectionModel]] = {} + + # ------------------------------------------------------------------ + # 클래스 discovery + # ------------------------------------------------------------------ + def discover_classes(self) -> dict[str, Type[BaseDetectionModel]]: + """`registered/` 하위 모듈 auto-import + BaseDetectionModel 서브클래스 수집. + + 동일 model_id 가 여러 클래스에서 중복 선언되면 ValueError. 
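중복 검출 규약을 단순화한 스케치 (가정 기반 예시 — `collect`/`A`/`B` 는 설명용 가상의 이름이며, 같은 클래스의 재발견은 허용하고 다른 클래스의 같은 id 만 거부한다):

```python
# 같은 model_id 에 서로 다른 클래스가 선언되면 ValueError — 같은 클래스는 무해.
def collect(classes):
    found = {}
    for cls in classes:
        mid = getattr(cls, 'model_id', '')
        if not mid:
            continue
        if mid in found and found[mid] is not cls:
            raise ValueError(
                f'duplicate model_id {mid!r}: '
                f'{found[mid].__name__} vs {cls.__name__}'
            )
        found[mid] = cls
    return found

class A:
    model_id = 'dark_suspicion'

class B:
    model_id = 'dark_suspicion'

ok = collect([A, A])       # 같은 클래스 재발견 — 통과
dup_detected = False
try:
    collect([A, B])        # 다른 클래스가 같은 id — 거부
except ValueError:
    dup_detected = True
```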
+        """
+        self._classes = {}
+        try:
+            pkg = importlib.import_module(self._registered_pkg)
+        except ImportError:
+            logger.warning('registered package %s not importable', self._registered_pkg)
+            return {}
+
+        for mod_info in pkgutil.iter_modules(pkg.__path__, prefix=f'{self._registered_pkg}.'):
+            try:
+                module = importlib.import_module(mod_info.name)
+            except Exception:
+                logger.exception('failed to import %s', mod_info.name)
+                continue
+            for attr_name in dir(module):
+                obj = getattr(module, attr_name)
+                if not isinstance(obj, type):
+                    continue
+                if obj is BaseDetectionModel:
+                    continue
+                if not issubclass(obj, BaseDetectionModel):
+                    continue
+                mid = getattr(obj, 'model_id', '')
+                if not mid:
+                    continue
+                if mid in self._classes and self._classes[mid] is not obj:
+                    raise ValueError(
+                        f'duplicate model_id {mid!r}: '
+                        f'{self._classes[mid].__name__} vs {obj.__name__}'
+                    )
+                self._classes[mid] = obj
+        logger.info('discovered %d detection model classes: %s',
+                    len(self._classes), sorted(self._classes.keys()))
+        return dict(self._classes)
+
+    def register_class(self, cls: Type[BaseDetectionModel]) -> None:
+        """For tests and manual registration."""
+        mid = getattr(cls, 'model_id', '')
+        if not mid:
+            raise ValueError(f'{cls.__name__}.model_id is empty')
+        self._classes[mid] = cls
+
+    # ------------------------------------------------------------------
+    # Plan construction
+    # ------------------------------------------------------------------
+    def build_plan(self, conn, *, force_reload: bool = False) -> ExecutionPlan:
+        """Combine DB ACTIVE versions + classes + DAG into an ExecutionPlan."""
+        versions = load_active_versions(conn, force_reload=force_reload)
+        edges = self._collect_edges(conn, versions)
+        plan = self._instantiate(versions, edges)
+        plan.topo_order = self._topo_sort(plan)
+        return plan
+
+    def build_plan_from_rows(
+        self,
+        versions: Iterable[VersionRow],
+        dependencies: Iterable[tuple[str, str, str]] = (),
+    ) -> ExecutionPlan:
+        """Test-only — build a plan from in-memory rows, without a DB."""
+        edges: dict[str, set[str]] = defaultdict(set)
+        active_ids = {v['model_id'] for v in versions}
+        for model_id, depends_on, _key in dependencies:
+            if model_id in active_ids and depends_on in active_ids:
+                edges[model_id].add(depends_on)
+        # merge in class-declared depends_on as well
+        for v in versions:
+            cls = self._classes.get(v['model_id'])
+            if cls is None:
+                continue
+            for dep in getattr(cls, 'depends_on', []) or []:
+                if dep in active_ids:
+                    edges[v['model_id']].add(dep)
+
+        plan = self._instantiate(versions, edges)
+        plan.topo_order = self._topo_sort(plan)
+        return plan
+
+    # ------------------------------------------------------------------
+    # Internals
+    # ------------------------------------------------------------------
+    def _collect_edges(
+        self,
+        conn,
+        versions: list[VersionRow],
+    ) -> dict[str, set[str]]:
+        """Union of DB dependencies and class-declared depends_on."""
+        edges: dict[str, set[str]] = defaultdict(set)
+        active_ids = {v['model_id'] for v in versions}
+
+        try:
+            for model_id, depends_on, _key in load_dependencies(conn):
+                if model_id in active_ids and depends_on in active_ids:
+                    edges[model_id].add(depends_on)
+        except Exception:
+            logger.exception('load_dependencies failed — proceeding with class-level depends_on only')
+
+        for v in versions:
+            cls = self._classes.get(v['model_id'])
+            if cls is None:
+                continue
+            for dep in getattr(cls, 'depends_on', []) or []:
+                if dep in active_ids:
+                    edges[v['model_id']].add(dep)
+        return edges
+
+    def _instantiate(
+        self,
+        versions: Iterable[VersionRow],
+        edges: dict[str, set[str]],
+    ) -> ExecutionPlan:
+        plan = ExecutionPlan()
+        plan.edges = defaultdict(set, {k: set(v) for k, v in edges.items()})
+
+        for v in versions:
+            mid = v['model_id']
+            role = v['role']
+            if role not in ALLOWED_ROLES:
+                logger.warning(
+                    'skip version id=%s role=%r not in %s',
+                    v['id'], role, ALLOWED_ROLES,
+                )
+                continue
+            cls = self._classes.get(mid)
+            if cls is None:
+                logger.warning(
+                    'model_id=%s has ACTIVE version %s(role=%s) but no registered class — skipping',
+                    mid, v['version'], role,
+                )
+                continue
+            try:
+                inst = cls(
+                    version_id=v['id'],
+                    version_str=v['version'],
+                    role=role,
+                    params=v['params'],
+                )
+            except Exception:
+                logger.exception(
+                    'failed to instantiate %s version_id=%s — skipping',
+                    cls.__name__, v['id'],
+                )
+                continue
+
+            if role == ROLE_PRIMARY:
+                if mid in plan.primaries:
+                    # guaranteed by a DB UNIQUE INDEX, but be defensive anyway
+                    logger.error(
+                        'duplicate PRIMARY for %s (existing id=%s, new id=%s) — keeping existing',
+                        mid, plan.primaries[mid].version_id, v['id'],
+                    )
+                    continue
+                plan.primaries[mid] = inst
+            else:  # SHADOW / CHALLENGER
+                plan.shadows[mid].append(inst)
+        return plan
+
+    @staticmethod
+    def _topo_sort(plan: ExecutionPlan) -> list[str]:
+        """Topological order over PRIMARY nodes. Raises DAGCycleError on a cycle."""
+        nodes = set(plan.primaries.keys()) | set(plan.shadows.keys())
+        # SHADOW-only models are treated as nodes too (the executor skips them if no PRIMARY is registered)
+        in_degree: dict[str, int] = {n: 0 for n in nodes}
+        adj: dict[str, set[str]] = defaultdict(set)
+        for node, deps in plan.edges.items():
+            if node not in nodes:
+                continue
+            for dep in deps:
+                if dep not in nodes:
+                    continue
+                adj[dep].add(node)
+                in_degree[node] = in_degree.get(node, 0) + 1
+
+        order: list[str] = []
+        queue = deque(sorted([n for n, d in in_degree.items() if d == 0]))
+        while queue:
+            n = queue.popleft()
+            order.append(n)
+            for nxt in sorted(adj[n]):
+                in_degree[nxt] -= 1
+                if in_degree[nxt] == 0:
+                    queue.append(nxt)
+
+        if len(order) != len(nodes):
+            remaining = [n for n in nodes if n not in order]
+            raise DAGCycleError(
+                f'DAG cycle detected among detection models: {sorted(remaining)}'
+            )
+        return order
+
+
+# Convenience singleton (production normally uses a single instance)
+_registry_singleton: Optional[ModelRegistry] = None
+
+
+def get_registry() -> ModelRegistry:
+    global _registry_singleton
+    if _registry_singleton is None:
+        _registry_singleton = ModelRegistry()
+        _registry_singleton.discover_classes()
+    return _registry_singleton
+
+
+__all__ = [
+    'ModelRegistry',
+    'ExecutionPlan',
+    'DAGCycleError',
+    'get_registry',
+    'ROLE_PRIMARY',
+    'ROLE_SHADOW',
+    'ROLE_CHALLENGER',
+]
diff --git a/prediction/models_core/seeds/README.md b/prediction/models_core/seeds/README.md
new file mode 100644
index 0000000..862b951
--- /dev/null
+++ b/prediction/models_core/seeds/README.md
@@ -0,0 +1,93 @@
+# Detection Model Seeds
+
+SQL that seeds the Phase 2 PoC models into the V034 `detection_models` + `detection_model_versions` tables.
+
+## Current seed targets
+
+| File | Model | tier | Mode |
+|---|---|---|---|
+| `v1_dark_suspicion.sql` | `dark_suspicion` (DARK_VESSEL) | 3 | **Runtime override complete** |
+| `v1_gear_violation.sql` | `gear_violation_g01_g06` (GEAR) | 4 | **Runtime override complete** |
+| `v1_transshipment.sql` | `transshipment_5stage` (TRANSSHIP) | 4 | Catalog + observation |
+| `v1_risk_composite.sql` | `risk_composite` (META) | 3 | Catalog + observation |
+| `v1_pair_trawl.sql` | `pair_trawl_tier` (GEAR) | 4 | Catalog + observation |
+| `v1_phase2_all.sql` | all 5 models | — | runs the 5 files above in order via `\i` |
+
+### "Runtime override complete" vs "catalog + observation"
+
+- **Runtime override complete** (`dark_suspicion`, `gear_violation_g01_g06`): the algorithm function accepts a `params: dict | None = None` argument and applies the ACTIVE version's JSONB, so the actual weights and thresholds are replaced at runtime. Once an operator promotes a version to ACTIVE, **the next cycle's results change**.
+- **Catalog + observation** (`transshipment_5stage`, `risk_composite`, `pair_trawl_tier`): the internal helper functions reference module-level constants directly, so a larger refactor is required. This phase only **exposes DEFAULT_PARAMS in the DB catalog and collects result observations via the Adapter**. Actual runtime replacement is enabled by a follow-up refactoring PR that finishes propagating params into the helpers.
+
+## How to run
+
+**Important**: these seed files contain no `BEGIN`/`COMMIT`. The caller manages the transaction.
+
+### (A) Production apply — automatic single-transaction wrapping
+
+```bash
+PGPASSWORD=... psql -h -U kcg-app -d kcgaidb \
+  -v ON_ERROR_STOP=1 -1 \
+  -f prediction/models_core/seeds/v1_dark_suspicion.sql
+```
+
+The `-1` flag wraps the whole file in one transaction, so everything rolls back if any INSERT fails.
+
+### (B) Dry-run — validate the SQL without applying anything
+
+```bash
+PGPASSWORD=... psql -h -U kcg-app -d kcgaidb -v ON_ERROR_STOP=1 <<'SQL'
+BEGIN;
+\i prediction/models_core/seeds/v1_dark_suspicion.sql
+SELECT version, status, params->'tier_thresholds'
+  FROM kcg.detection_model_versions WHERE model_id='dark_suspicion';
+ROLLBACK;
+SQL
+```
+
+### ⚠️ Forbidden pattern
+
+```bash
+# Absolutely not — each -c/-f runs separately, so the INSERTs actually get applied
+psql -c 'BEGIN;' -f v1_dark_suspicion.sql -c 'ROLLBACK;'
+```
+
+## Operator promotion procedure
+
+1. Seed results are `status=DRAFT role=NULL` → prediction **does not reference them**
+   (`params_loader` uses `WHERE status='ACTIVE' AND is_enabled=TRUE`).
+2. After the Phase 3 backend API ships, an operator promotes with one of:
+   - `POST /api/ai/detection-models/{modelId}/versions/{versionId}/activate?role=SHADOW` — observation only
+   - `POST /api/ai/detection-models/{modelId}/versions/{versionId}/activate?role=PRIMARY` — applied to production
+   - `POST /api/ai/detection-models/{modelId}/versions/{versionId}/promote-primary` — SHADOW→PRIMARY promotion (the existing PRIMARY is auto-ARCHIVED)
+3. prediction picks it up automatically at the start of the next cycle, once the `params_loader` TTL (5 min) expires.
+4. The service must be restarted with `PREDICTION_USE_MODEL_REGISTRY=1` for the new path to actually run
+   (the default `0` keeps the legacy path — the safe setting until the Phase 2 diff=0 verification is complete).
+
+## Equivalence verification
+
+Each model's `params` JSONB matches the `*_DEFAULT_PARAMS` constant in the Python source 1:1.
+Static verification is handled by `prediction/tests/test_dark_suspicion_params.py::test_seed_sql_values_match_python_default`.
+
+Runtime verification (new-vs-old path diff):
+1. Seed v1.0.0 (DEFAULT) for the same model as PRIMARY
+2. (optional) Seed v1.0.0-shadow with **identical params** as SHADOW
+3. Run one 5-minute cycle with `PREDICTION_USE_MODEL_REGISTRY=1`
+4. Compare PRIMARY↔SHADOW `outputs` in the `kcg.v_detection_model_comparison` view → they must be identical for every input
+
+## Rollback
+
+```sql
+-- a single model
+DELETE FROM kcg.detection_model_versions WHERE model_id = 'dark_suspicion';
+DELETE FROM kcg.detection_models WHERE model_id = 'dark_suspicion';
+
+-- all of Phase 2 (after the follow-up PR lands)
+DELETE FROM kcg.detection_model_versions WHERE model_id IN (
+  'dark_suspicion', 'gear_violation_g01_g06', 'transshipment_5stage',
+  'risk_composite', 'pair_trawl_tier'
+);
+DELETE FROM kcg.detection_models WHERE model_id IN (
+  'dark_suspicion', 'gear_violation_g01_g06', 'transshipment_5stage',
+  'risk_composite', 'pair_trawl_tier'
+);
+```
diff --git a/prediction/models_core/seeds/v1_dark_suspicion.sql b/prediction/models_core/seeds/v1_dark_suspicion.sql
new file mode 100644
index 0000000..1e14df5
--- /dev/null
+++ b/prediction/models_core/seeds/v1_dark_suspicion.sql
@@ -0,0 +1,97 @@
+-- Phase 2 PoC #1 — dark_suspicion model seed
+--
+-- ⚠️ Transaction control is the caller's responsibility. This file contains no BEGIN/COMMIT.
+-- Always run it one of these ways:
+--
+-- (A) automatic single-transaction wrapping (production apply):
+--     psql -v ON_ERROR_STOP=1 -1 -f prediction/models_core/seeds/v1_dark_suspicion.sql
+--
+-- (B) dry-run (validate the SQL without applying anything):
+--     psql -v ON_ERROR_STOP=1 <.sql — see the comment at the bottom of the file.
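The deep-merge override behaviour documented in the seeds README above — an operator override replaces only the keys it names, the module-level DEFAULT is never mutated, and `params=None` returns the DEFAULT untouched — can be sketched as follows. This is a minimal illustration, not the project's actual `_merge_default_*_params` code; `deep_merge_params` and the sample `DEFAULT_PARAMS` shape are hypothetical stand-ins.

```python
from copy import deepcopy

# hypothetical, simplified params shape for illustration only
DEFAULT_PARAMS = {
    'tier_thresholds': {'critical': 70, 'high': 50, 'watch': 30},
    'weights': {'P1_moving_off': 25},
}


def deep_merge_params(override, default=DEFAULT_PARAMS):
    # params=None -> use DEFAULT as-is (pre-Phase-2 behaviour, BACK-COMPAT)
    if not override:
        return default
    merged = deepcopy(default)  # never mutate the module-level DEFAULT

    def _merge(dst, src):
        for k, v in src.items():
            if isinstance(v, dict) and isinstance(dst.get(k), dict):
                _merge(dst[k], v)  # recurse into nested dicts
            else:
                dst[k] = v  # leaf (or type change): replace wholesale
    _merge(merged, override)
    return merged
```

With this shape, `deep_merge_params({'tier_thresholds': {'critical': 80}})` changes only `critical`; `high`/`watch` and the `weights` subtree keep their DEFAULT values, and `DEFAULT_PARAMS` itself is unchanged afterwards.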
+--
+-- Full rollback:
+--   DELETE FROM kcg.detection_model_versions WHERE model_id IN (
+--     'dark_suspicion', 'gear_violation_g01_g06', 'transshipment_5stage',
+--     'risk_composite', 'pair_trawl_tier'
+--   );
+--   DELETE FROM kcg.detection_models WHERE model_id IN (
+--     'dark_suspicion', 'gear_violation_g01_g06', 'transshipment_5stage',
+--     'risk_composite', 'pair_trawl_tier'
+--   );
+
+\i prediction/models_core/seeds/v1_dark_suspicion.sql
+\i prediction/models_core/seeds/v1_gear_violation.sql
+\i prediction/models_core/seeds/v1_transshipment.sql
+\i prediction/models_core/seeds/v1_risk_composite.sql
+\i prediction/models_core/seeds/v1_pair_trawl.sql
+
+-- Final check
+SELECT m.model_id, m.tier, m.category, m.is_enabled,
+       (SELECT count(*) FROM kcg.detection_model_versions v
+         WHERE v.model_id = m.model_id) AS versions
+  FROM kcg.detection_models m
+ WHERE m.model_id IN (
+   'dark_suspicion', 'gear_violation_g01_g06', 'transshipment_5stage',
+   'risk_composite', 'pair_trawl_tier'
+ )
+ ORDER BY m.tier, m.model_id;
diff --git a/prediction/models_core/seeds/v1_risk_composite.sql b/prediction/models_core/seeds/v1_risk_composite.sql
new file mode 100644
index 0000000..a2e515c
--- /dev/null
+++ b/prediction/models_core/seeds/v1_risk_composite.sql
@@ -0,0 +1,79 @@
+-- Phase 2 PoC #4 — risk_composite model seed (catalog registration + observation only)
+--
+-- compute_lightweight_risk_score / compute_vessel_risk_score still use inline
+-- numbers directly, so this version only registers the params catalog and
+-- observation. The runtime override is enabled by a follow-up refactoring PR.
+
+INSERT INTO kcg.detection_models (
+  model_id, display_name, tier, category,
+  description, entry_module, entry_callable, is_enabled
+) VALUES (
+  'risk_composite',
+  '종합 위험도 (경량 + 파이프라인)',
+  3,
+  'META',
+  '파이프라인 미통과(경량) + 통과(정밀) 경로의 위험도 점수(0~100) + tier(CRITICAL/HIGH/MEDIUM/LOW) 산출. 수역·다크·스푸핑·허가·반복 축으로 가산. 현 버전은 params 카탈로그 등록만.',
+  'models_core.registered.risk_composite_model',
+  'RiskCompositeModel',
+  TRUE
+) ON CONFLICT (model_id) DO NOTHING;
+
+INSERT INTO kcg.detection_model_versions (
+  model_id, version, status, role, params, notes
+) VALUES (
+  'risk_composite',
+  '1.0.0',
+  'DRAFT',
+  NULL,
+  $json${
+    "tier_thresholds": {"critical": 70, "high": 50, "medium": 30},
+    "lightweight_weights": {
+      "territorial_sea": 40,
+      "contiguous_zone": 15,
+      "zone_unpermitted": 25,
+      "eez_lt12nm": 15,
+      "eez_lt24nm": 8,
+      "dark_suspicion_multiplier": 0.3,
+      "dark_gap_720_min": 25,
+      "dark_gap_180_min": 20,
+      "dark_gap_60_min": 15,
+      "dark_gap_30_min": 8,
+      "spoofing_gt07": 15,
+      "spoofing_gt05": 8,
+      "unpermitted_alone": 15,
+      "unpermitted_with_suspicion": 8,
+      "repeat_gte5": 10,
+      "repeat_gte2": 5
+    },
+    "pipeline_weights": {
+      "territorial_sea": 40,
+      "contiguous_zone": 10,
+      "zone_unpermitted": 25,
+      "territorial_fishing": 20,
+      "fishing_segments_any": 5,
+      "trawl_uturn": 10,
+      "teleportation": 20,
+      "speed_jumps_ge3": 10,
+      "speed_jumps_ge1": 5,
+      "critical_gaps_ge60": 15,
+      "any_gaps": 5,
+      "unpermitted": 20
+    },
+    "dark_suspicion_fallback_gap_min": {
+      "very_long_720": 720,
+      "long_180": 180,
+      "mid_60": 60,
+      "short_30": 30
+    },
+    "spoofing_thresholds": {"high_0.7": 0.7, "medium_0.5": 0.5},
+    "eez_proximity_nm": {"inner_12": 12, "outer_24": 24},
+    "repeat_thresholds": {"h24_high": 5, "h24_low": 2}
+  }$json$::jsonb,
+  'Phase 2 PoC #4 seed. Python RISK_COMPOSITE_DEFAULT_PARAMS 와 1:1 일치.'
+) ON CONFLICT (model_id, version) DO NOTHING;
+
+-- Check
+SELECT model_id, is_enabled,
+       (SELECT count(*) FROM kcg.detection_model_versions v WHERE v.model_id = m.model_id) AS versions
+  FROM kcg.detection_models m
+ WHERE model_id = 'risk_composite';
diff --git a/prediction/models_core/seeds/v1_transshipment.sql b/prediction/models_core/seeds/v1_transshipment.sql
new file mode 100644
index 0000000..2f6a269
--- /dev/null
+++ b/prediction/models_core/seeds/v1_transshipment.sql
@@ -0,0 +1,51 @@
+-- Phase 2 PoC #3 — transshipment_5stage model seed (catalog registration + observation only)
+--
+-- This version exposes `params` in the DB but does not yet apply them at runtime.
+-- The internal helpers (_is_proximity / _is_approach / _evict_expired) reference
+-- module-level constants directly; runtime value replacement becomes possible once
+-- the follow-up refactoring PR finishes propagating params. Catalog + observation
+-- alone already secures the "per-model separation" value of the Phase 2 PoC.
+--
+-- See seeds/README.md for how to run + rollback.
+
+INSERT INTO kcg.detection_models (
+  model_id, display_name, tier, category,
+  description, entry_module, entry_callable, is_enabled
+) VALUES (
+  'transshipment_5stage',
+  '환적 의심 5단계 필터 (이종 쌍 → 감시영역 → APPROACH → RENDEZVOUS → 점수)',
+  4,
+  'TRANSSHIP',
+  '어선 ↔ 운반선 이종 쌍을 감시영역 내에서만 추적해 APPROACH → RENDEZVOUS → DEPARTURE 패턴을 검증하고 점수 산출. 현 버전은 params 카탈로그 등록만, 런타임 override 는 후속 PR 에서.',
+  'models_core.registered.transshipment_model',
+  'TransshipmentModel',
+  TRUE
+) ON CONFLICT (model_id) DO NOTHING;
+
+INSERT INTO kcg.detection_model_versions (
+  model_id, version, status, role, params, notes
+) VALUES (
+  'transshipment_5stage',
+  '1.0.0',
+  'DRAFT',
+  NULL,
+  $json${
+    "sog_threshold_kn": 2.0,
+    "proximity_deg": 0.002,
+    "approach_deg": 0.01,
+    "rendezvous_min": 90,
+    "pair_expiry_min": 240,
+    "gap_tolerance_cycles": 3,
+    "fishing_kinds": ["000020"],
+    "carrier_kinds": ["000023", "000024"],
+    "excluded_ship_ty": ["AtoN", "Anti Pollution", "Law Enforcement", "Medical Transport", "Passenger", "Pilot Boat", "Search And Rescue", "Tug"],
+    "carrier_hints": ["cargo", "tanker", "supply", "carrier", "reefer"],
+    "min_score": 50
+  }$json$::jsonb,
+  'Phase 2 PoC #3 seed. Python TRANSSHIPMENT_DEFAULT_PARAMS 와 1:1 일치. 현 버전은 카탈로그만, 런타임 override 는 후속 PR.'
+) ON CONFLICT (model_id, version) DO NOTHING;
+
+-- Check
+SELECT model_id, is_enabled,
+       (SELECT count(*) FROM kcg.detection_model_versions v WHERE v.model_id = m.model_id) AS versions
+  FROM kcg.detection_models m
+ WHERE model_id = 'transshipment_5stage';
diff --git a/prediction/scheduler.py b/prediction/scheduler.py
index fd9a030..a843cdb 100644
--- a/prediction/scheduler.py
+++ b/prediction/scheduler.py
@@ -74,6 +74,42 @@ def _fetch_dark_history(kcg_conn, mmsi_list: list[str]) -> dict[str, dict]:
     return {}
 
 
+def _run_detection_model_registry(cycle_started_at, results):
+    """Phase 1-2 — run every ACTIVE version instance and record comparison/observation data.
+
+    The new path does not replace the existing cycle results (`results`); they are
+    passed in as ctx.inputs so each model produces PRIMARY/SHADOW results **for the
+    same inputs**.
+
+    Until the actual model classes land in Phase 2 there are no ACTIVE versions, so
+    this is effectively a no-op. It is always wrapped in try/except so it can
+    coexist with the legacy path.
+    """
+    from db import kcgdb
+    from models_core.base import ModelContext
+    from models_core.executor import DAGExecutor
+    from models_core.registry import get_registry
+
+    registry = get_registry()
+    with kcgdb.get_conn() as conn:
+        try:
+            plan = registry.build_plan(conn)
+        except Exception:
+            logger.exception('detection model plan build failed — skipping registry stage')
+            return
+
+        if not plan.primaries and not plan.shadows:
+            logger.info('detection model registry: no ACTIVE versions — nothing to run')
+            return
+
+        from dataclasses import asdict
+        inputs = [asdict(r) for r in (results or [])]
+        ctx = ModelContext(
+            cycle_started_at=cycle_started_at,
+            conn=conn,
+            inputs=inputs,
+        )
+        DAGExecutor(plan).run(ctx)
+
+
 def get_last_run() -> dict:
     return _last_run.copy()
 
@@ -790,6 +826,22 @@ def run_analysis_cycle():
     except Exception as e:
         logger.exception('failed to cache analysis context for chat: %s', e)
 
+    # 10. Detection Model Registry (Phase 1-2)
+    # The new path runs only when PREDICTION_USE_MODEL_REGISTRY=1; the default is legacy-only.
+    # This branch does not touch the existing cycle results — it records the ACTIVE
+    # versions' results into detection_model_run_outputs / detection_model_metrics.
+    try:
+        from models_core.feature_flag import use_model_registry
+        if use_model_registry():
+            run_stage(
+                'detection_model_registry',
+                _run_detection_model_registry,
+                cycle_started_at=datetime.fromisoformat(_last_run['timestamp']),
+                results=results,
+            )
+    except Exception as e:
+        logger.exception('detection model registry stage setup failed: %s', e)
+
     elapsed = round(time.time() - start, 2)
     _last_run['duration_sec'] = elapsed
     _last_run['vessel_count'] = len(results)
diff --git a/prediction/scripts/diagnostic-snapshot.sh b/prediction/scripts/diagnostic-snapshot.sh
index 2403925..376965e 100644
--- a/prediction/scripts/diagnostic-snapshot.sh
+++ b/prediction/scripts/diagnostic-snapshot.sh
@@ -55,6 +55,22 @@ FROM kcg.vessel_analysis_results
 WHERE analyzed_at > now() - interval '5 minutes';
 SQL
 
+echo ""
+echo "--- 1b. SPOOFING signal health (silent-vs-fault check) ---"
+# gt0 > 0 with gt0.5 = 0 means the algorithm runs but nothing crosses the threshold (normal);
+# gt0 = 0 means the algorithm is not computing scores at all (silent fault) → chase it in the logs.
+$PSQL_TABLE << 'SQL'
+SELECT count(*) total,
+       count(*) FILTER (WHERE spoofing_score > 0) gt0,
+       count(*) FILTER (WHERE spoofing_score > 0.3) gt03,
+       count(*) FILTER (WHERE spoofing_score > 0.5) gt05,
+       count(*) FILTER (WHERE spoofing_score > 0.7) gt07,
+       round(avg(spoofing_score)::numeric, 4) avg_score,
+       round(max(spoofing_score)::numeric, 4) max_score
+FROM kcg.vessel_analysis_results
+WHERE analyzed_at > now() - interval '5 minutes';
+SQL
+
 #===================================================================
 # PART 2: 다크베셀 심층 진단
 #===================================================================
@@ -346,15 +362,64 @@ FROM kcg.prediction_kpi_realtime ORDER BY kpi_key;
 SQL
 
 #===================================================================
-# PART 7: 사이클 로그 + 에러
+# PART 6.5: V030 + V034 observation (raw tables)
+#===================================================================
+echo ""
+echo "================================================================="
+echo "PART 6.5: V030 gear_identity_collisions + V034 detection_model_*"
+echo "================================================================="
+
+echo ""
+echo "--- 6.5-1. gear_identity_collisions severity x status (1h) ---"
+$PSQL_TABLE << 'SQL'
+SELECT severity, status, count(*) cnt, max(last_seen_at) last_seen
+FROM kcg.gear_identity_collisions
+WHERE last_seen_at > now() - interval '1 hour'
+GROUP BY severity, status ORDER BY cnt DESC;
+SQL
+
+echo ""
+echo "--- 6.5-2. detection_models + version status (active after Phase 1-2) ---"
+$PSQL_TABLE << 'SQL'
+SELECT count(*) AS catalog,
+       count(*) FILTER (WHERE is_enabled) AS enabled
+FROM kcg.detection_models;
+SQL
+
+$PSQL_TABLE << 'SQL'
+SELECT status, coalesce(role,'(null)') role, count(*) cnt
+FROM kcg.detection_model_versions
+GROUP BY status, role ORDER BY status, role;
+SQL
+
+echo ""
+echo "--- 6.5-3. detection_model_run_outputs loaded in last 5 min (grows when the feature flag is ON) ---"
+$PSQL_TABLE << 'SQL'
+SELECT model_id, role, count(*) AS rows
+FROM kcg.detection_model_run_outputs
+WHERE cycle_started_at > now() - interval '5 minutes'
+GROUP BY model_id, role ORDER BY rows DESC LIMIT 10;
+SQL
+
+#===================================================================
+# PART 7: 사이클 로그 + 에러 + stage timing
 #===================================================================
 echo ""
 echo "================================================================="
 echo "PART 7: 사이클 로그 (최근 6분)"
 echo "================================================================="
+# also track stage_runner (Phase 0-1) + DAGExecutor (Phase 1-2) logs
 journalctl -u kcg-ai-prediction --since '6 minutes ago' --no-pager 2>/dev/null | \
-  grep -E 'analysis cycle:|lightweight|pipeline dark:|event_generator:|pair_trawl|gear_violation|GEAR_ILLEGAL|ERROR|Traceback' | \
-  tail -20
+  grep -E 'analysis cycle:|lightweight|pipeline dark:|event_generator:|pair_trawl|gear_violation|GEAR_ILLEGAL|stage [a-z_]+ (ok|failed)|DAGExecutor done|detection model registry|ERROR|Traceback' | \
+  tail -40
+
+echo ""
+echo "--- 7-1. STAGE TIMING (longest durations + failures) ---"
+journalctl -u kcg-ai-prediction --since '6 minutes ago' --no-pager 2>/dev/null | \
+  grep -oE 'stage [a-z_@.[:blank:][:digit:].-]+ (ok in [0-9.]+s|failed)' | \
+  awk '/failed/ {print "FAIL " $0; next}
+       /ok in/ {n=split($0,a," "); sec=a[n]; sub(/s$/,"",sec); printf "%8.2fs %s\n", sec, $0}' | \
+  sort -rn | awk 'NR<=8 || /^FAIL/' | head -20
 
 #===================================================================
 # PART 7.5: 한중어업협정 레지스트리 매칭 (V029)
diff --git a/prediction/scripts/hourly-analysis-snapshot.sh b/prediction/scripts/hourly-analysis-snapshot.sh
index cb0fa72..5534555 100755
--- a/prediction/scripts/hourly-analysis-snapshot.sh
+++ b/prediction/scripts/hourly-analysis-snapshot.sh
@@ -37,6 +37,21 @@ SELECT count(*) total,
 FROM kcg.vessel_analysis_results
 WHERE analyzed_at > now() - interval '1 hour';
 
+\echo
+\echo === 1a. SPOOFING signal health (silent-vs-fault check) ===
+-- To tell whether spoof_hi=0 means "broken" or "no signal", look at gt0 / gt03 / gt05 / max together.
+-- gt0 = 0 means the pipeline is not computing spoofing_score at all (needs root-causing).
+-- gt0 > 0 with gt05 = 0 means the algorithm runs but nothing crosses the threshold (can be normal).
+SELECT count(*) total,
+       count(*) FILTER (WHERE spoofing_score > 0) gt0,
+       count(*) FILTER (WHERE spoofing_score > 0.3) gt03,
+       count(*) FILTER (WHERE spoofing_score > 0.5) gt05,
+       count(*) FILTER (WHERE spoofing_score > 0.7) gt07,
+       round(avg(spoofing_score)::numeric, 4) avg_score,
+       round(max(spoofing_score)::numeric, 4) max_score
+FROM kcg.vessel_analysis_results
+WHERE analyzed_at > now() - interval '1 hour';
+
 \echo
 \echo === 2. ZONE x DARK x GEAR_VIOLATION distribution ===
 SELECT zone_code,
@@ -369,18 +384,101 @@ SELECT date_trunc('hour', occurred_at AT TIME ZONE 'Asia/Seoul') hr,
        count(*) FILTER (WHERE category='EEZ_INTRUSION') eez,
        count(*) FILTER (WHERE category='GEAR_ILLEGAL') gear_illegal,
        count(*) FILTER (WHERE category='HIGH_RISK_VESSEL') high_risk,
+       count(*) FILTER (WHERE category='GEAR_IDENTITY_COLLISION') gear_collide,
        count(*) FILTER (WHERE level='CRITICAL') critical
 FROM kcg.prediction_events
 WHERE created_at > now() - interval '24 hours'
 GROUP BY hr ORDER BY hr DESC LIMIT 25;
 
+\echo
+\echo ===================================================================
+\echo === V030 GEAR_IDENTITY_COLLISIONS (raw table observation)
+\echo ===================================================================
+\echo
+\echo === V030-1. severity x status distribution (24h) ===
+SELECT severity, status, count(*) cnt,
+       max(last_seen_at) last_seen
+FROM kcg.gear_identity_collisions
+WHERE last_seen_at > now() - interval '24 hours'
+GROUP BY severity, status ORDER BY cnt DESC;
+
+\echo
+\echo === V030-2. top 20 by coexistence/swap (24h) ===
+SELECT name, mmsi_lo, mmsi_hi, severity, status,
+       coexistence_count coex, swap_count swap,
+       round(max_distance_km::numeric, 1) max_km
+FROM kcg.gear_identity_collisions
+WHERE last_seen_at > now() - interval '24 hours'
+ORDER BY (coexistence_count + swap_count * 5) DESC LIMIT 20;
+
+\echo
+\echo ===================================================================
+\echo === V034 DETECTION_MODEL REGISTRY (Phase 1-2)
+\echo ===================================================================
+\echo
+\echo === V034-1. model catalog + enabled status ===
+SELECT count(*) catalog_total,
+       count(*) FILTER (WHERE is_enabled) enabled
+FROM kcg.detection_models;
+
+\echo
+\echo === V034-2. version status x role distribution ===
+SELECT status, coalesce(role,'(null)') role, count(*) cnt
+FROM kcg.detection_model_versions
+GROUP BY status, role ORDER BY status, role;
+
+\echo
+\echo === V034-3. detection_model_run_outputs loaded in last 1h (grows when the feature flag is ON) ===
+SELECT model_id, role, count(*) AS rows,
+       min(cycle_started_at) oldest, max(cycle_started_at) newest
+FROM kcg.detection_model_run_outputs
+WHERE cycle_started_at > now() - interval '1 hour'
+GROUP BY model_id, role ORDER BY rows DESC;
+
+\echo
+\echo === V034-4. detection_model_metrics — recent per-model averages ===
+SELECT model_id, role,
+       round(avg(metric_value) FILTER (WHERE metric_key='cycle_duration_ms')::numeric, 1) avg_ms,
+       round(avg(metric_value) FILTER (WHERE metric_key='output_count')::numeric, 1) avg_out
+FROM kcg.detection_model_metrics
+WHERE cycle_started_at > now() - interval '1 hour'
+GROUP BY model_id, role ORDER BY model_id, role;
+
+\echo
+\echo === C1. stats_hourly vs raw events category drift (watch for event_generator silent drops) ===
+-- categories present in raw prediction_events but missing from stats_hourly.by_category (and vice versa)
+WITH recent_events AS (
+  SELECT DISTINCT category FROM kcg.prediction_events
+  WHERE created_at > now() - interval '2 hours'
+),
+stats_cats AS (
+  SELECT DISTINCT jsonb_object_keys(by_category) AS category
+  FROM kcg.prediction_stats_hourly
+  WHERE stat_hour > now() - interval '2 hours'
+)
+SELECT 'only_in_events' gap, category FROM recent_events
+ WHERE category NOT IN (SELECT category FROM stats_cats)
+UNION ALL
+SELECT 'only_in_stats', category FROM stats_cats
+ WHERE category NOT IN (SELECT category FROM recent_events);
+
 SQL
 
 echo ""
 echo "=== 13. CYCLE LOG (last 65 min) ==="
+# also track stage_runner, DAGExecutor, detection_model_registry, and Tracebacks
 journalctl -u kcg-ai-prediction --since '65 minutes ago' --no-pager 2>/dev/null | \
-  grep -E 'lightweight|event_generator:|stats_aggregator hourly|kpi_writer:|analysis cycle:|pair_trawl|gear_violation|GEAR_ILLEGAL|ERROR|Traceback' | \
-  tail -60
+  grep -E 'lightweight|event_generator:|stats_aggregator hourly|kpi_writer:|analysis cycle:|pair_trawl|gear_violation|GEAR_ILLEGAL|stage [a-z_]+ (ok|failed)|DAGExecutor done|detection model registry|ERROR|Traceback' | \
+  tail -80
+
+echo ""
+echo "=== 14. STAGE TIMING (last 65 min, top 10 by duration + all failures) ==="
+# collect "stage ok in X.XXs" / "stage failed" lines to identify failing and long-running stages
+journalctl -u kcg-ai-prediction --since '65 minutes ago' --no-pager 2>/dev/null | \
+  grep -oE 'stage [a-z_@.[:blank:][:digit:].-]+ (ok in [0-9.]+s|failed)' | \
+  awk '/failed/ {print "FAIL " $0; next}
+       /ok in/ {n=split($0,a," "); sec=a[n]; sub(/s$/,"",sec); printf "%8.2fs %s\n", sec, $0}' | \
+  sort -rn | awk 'NR<=10 || /^FAIL/' | head -40
 
 echo ""
 echo "=== END ==="
diff --git a/prediction/tests/test_dark_suspicion_params.py b/prediction/tests/test_dark_suspicion_params.py
new file mode 100644
index 0000000..c1045b9
--- /dev/null
+++ b/prediction/tests/test_dark_suspicion_params.py
@@ -0,0 +1,159 @@
+"""Phase 2 PoC #1 — dark_suspicion params externalization equivalence tests.
+
+This file is written to run even in environments without pandas installed.
+Only `_merge_default_params` and the DEFAULT_PARAMS constant are verified in isolation.
+
+The full `compute_dark_suspicion` E2E is verified manually in a pandas-equipped
+prediction environment by running one cycle and confirming new-vs-old diff=0
+(see the seed SQL notes).
+"""
+from __future__ import annotations
+
+import importlib
+import json
+import os
+import sys
+import types
+import unittest
+
+# Workaround for environments without pandas — algorithms.dark_vessel imports
+# pandas at top level, so replace that import with a stub and extract only
+# DEFAULT_PARAMS and _merge_default_params.
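As an aside to the registry changes earlier in this diff: `ModelRegistry._topo_sort` is Kahn's algorithm over the dependency edges, raising on a cycle. A standalone sketch of the same idea follows — node ids and the `topo_order` helper are illustrative only, not the project's API.

```python
from collections import defaultdict, deque


def topo_order(nodes, edges):
    """edges[node] = set of dependencies that must run before `node`."""
    in_degree = {n: 0 for n in nodes}
    adj = defaultdict(set)  # dep -> nodes that depend on it
    for node, deps in edges.items():
        for dep in deps:
            if dep in in_degree and node in in_degree:  # ignore unknown nodes
                adj[dep].add(node)
                in_degree[node] += 1
    # seed the queue with dependency-free nodes (sorted for determinism)
    queue = deque(sorted(n for n, d in in_degree.items() if d == 0))
    order = []
    while queue:
        n = queue.popleft()
        order.append(n)
        for nxt in sorted(adj[n]):
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                queue.append(nxt)
    if len(order) != len(nodes):  # leftovers => a cycle
        raise ValueError(f'cycle among {sorted(set(nodes) - set(order))}')
    return order
```

For example, `topo_order({'a', 'b', 'c'}, {'b': {'a'}, 'c': {'b'}})` yields `['a', 'b', 'c']`, while a mutual dependency between two nodes raises.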
+if 'pandas' not in sys.modules:
+    pd_stub = types.ModuleType('pandas')
+    pd_stub.DataFrame = type('DataFrame', (), {})  # dummy for annotations
+    pd_stub.Timestamp = type('Timestamp', (), {})
+    sys.modules['pandas'] = pd_stub
+
+# pydantic_settings stub (same idiom as the other tests)
+if 'pydantic_settings' not in sys.modules:
+    stub = types.ModuleType('pydantic_settings')
+
+    class _S:
+        def __init__(self, **kw):
+            for name, value in self.__class__.__dict__.items():
+                if name.isupper():
+                    setattr(self, name, kw.get(name, value))
+
+    stub.BaseSettings = _S
+    sys.modules['pydantic_settings'] = stub
+
+# algorithms.location also has a top-level haversine_nm import, so stub it too
+if 'algorithms' not in sys.modules:
+    pkg = types.ModuleType('algorithms')
+    pkg.__path__ = [os.path.join(os.path.dirname(__file__), '..', 'algorithms')]
+    sys.modules['algorithms'] = pkg
+
+if 'algorithms.location' not in sys.modules:
+    loc = types.ModuleType('algorithms.location')
+    loc.haversine_nm = lambda a, b, c, d: 0.0  # pragma: no cover
+    sys.modules['algorithms.location'] = loc
+
+# now quietly import only DEFAULT_PARAMS and _merge_default_params from dark_vessel
+dv = importlib.import_module('algorithms.dark_vessel')
+
+
+class DarkSuspicionParamsTest(unittest.TestCase):
+
+    def test_default_params_shape(self):
+        """DEFAULT_PARAMS contains the 11 pattern weights plus tier_thresholds and sog_thresholds."""
+        p = dv.DARK_SUSPICION_DEFAULT_PARAMS
+        self.assertIn('weights', p)
+        self.assertIn('tier_thresholds', p)
+        self.assertEqual(p['tier_thresholds'], {'critical': 70, 'high': 50, 'watch': 30})
+        # default weight keys for the 11 patterns
+        weights = p['weights']
+        for key in [
+            'P1_moving_off', 'P1_slow_moving_off',
+            'P2_sensitive_zone', 'P2_special_zone',
+            'P3_repeat_high', 'P3_repeat_low', 'P3_recent_dark',
+            'P4_distance_anomaly',
+            'P5_daytime_fishing_off',
+            'P6_teleport_before_gap',
+            'P7_unpermitted',
+            'P8_very_long_gap', 'P8_long_gap',
+            'P9_fishing_vessel_dark', 'P9_cargo_natural_gap',
+            'P10_underway_deliberate', 'P10_anchored_natural',
+            'P11_heading_cog_mismatch',
+            'out_of_coverage',
+        ]:
+            self.assertIn(key, weights, f'weights.{key} missing')
+
+    def test_merge_none_returns_default_reference(self):
+        """params=None uses DEFAULT as-is (same behaviour as before Phase 2)."""
+        self.assertIs(dv._merge_default_params(None), dv.DARK_SUSPICION_DEFAULT_PARAMS)
+
+    def test_merge_empty_dict_returns_default_equivalent(self):
+        """params={} is key-for-key identical to DEFAULT."""
+        merged = dv._merge_default_params({})
+        self.assertEqual(merged, dv.DARK_SUSPICION_DEFAULT_PARAMS)
+
+    def test_merge_override_replaces_only_given_keys(self):
+        """An override replaces only the given keys; everything else keeps DEFAULT."""
+        override = {'tier_thresholds': {'critical': 80}}
+        merged = dv._merge_default_params(override)
+        # only critical is replaced
+        self.assertEqual(merged['tier_thresholds']['critical'], 80)
+        # high/watch keep their DEFAULT values
+        self.assertEqual(merged['tier_thresholds']['high'], 50)
+        self.assertEqual(merged['tier_thresholds']['watch'], 30)
+        # other top-level keys such as weights keep DEFAULT
+        self.assertEqual(
+            merged['weights']['P1_moving_off'],
+            dv.DARK_SUSPICION_DEFAULT_PARAMS['weights']['P1_moving_off'],
+        )
+        # the override does not mutate DEFAULT (immutability)
+        self.assertEqual(
+            dv.DARK_SUSPICION_DEFAULT_PARAMS['tier_thresholds']['critical'], 70,
+        )
+
+    def test_seed_sql_values_match_python_default(self):
+        """Statically verify that the seed SQL params JSONB matches the Python DEFAULT 1:1."""
+        seed_path = os.path.join(
+            os.path.dirname(__file__), '..',
+            'models_core', 'seeds', 'v1_dark_suspicion.sql',
+        )
+        with open(seed_path, 'r', encoding='utf-8') as f:
+            sql = f.read()
+
+        # extract the JSON from the $json$...$json$ block
+        start = sql.index('$json$') + len('$json$')
+        end = sql.index('$json$', start)
+        raw = sql[start:end].strip()
+        params = json.loads(raw)
+
+        self.assertEqual(
+            params['tier_thresholds'],
+            dv.DARK_SUSPICION_DEFAULT_PARAMS['tier_thresholds'],
+        )
+        self.assertEqual(
+            params['weights'],
+            dv.DARK_SUSPICION_DEFAULT_PARAMS['weights'],
+        )
+        self.assertEqual(
+            params['sog_thresholds'],
+            dv.DARK_SUSPICION_DEFAULT_PARAMS['sog_thresholds'],
+        )
+        self.assertEqual(
+            params['repeat_thresholds'],
+            dv.DARK_SUSPICION_DEFAULT_PARAMS['repeat_thresholds'],
+        )
+        self.assertEqual(
+            params['gap_min_thresholds'],
+            dv.DARK_SUSPICION_DEFAULT_PARAMS['gap_min_thresholds'],
+        )
+        self.assertEqual(
+            params['heading_cog_mismatch_deg'],
+            dv.DARK_SUSPICION_DEFAULT_PARAMS['heading_cog_mismatch_deg'],
+        )
+        self.assertEqual(
+            params['p4_distance_multiplier'],
+            dv.DARK_SUSPICION_DEFAULT_PARAMS['p4_distance_multiplier'],
+        )
+        self.assertEqual(
+            list(params['p5_daytime_range']),
+            list(dv.DARK_SUSPICION_DEFAULT_PARAMS['p5_daytime_range']),
+        )
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/prediction/tests/test_gear_violation_params.py b/prediction/tests/test_gear_violation_params.py
new file mode 100644
index 0000000..91a7b34
--- /dev/null
+++ b/prediction/tests/test_gear_violation_params.py
@@ -0,0 +1,125 @@
+"""Phase 2 PoC #2 — gear_violation_g01_g06 params externalization equivalence tests.
+
+Uses the same stub pattern as the dark_suspicion tests to work around
+environments without pandas installed.
+"""
+from __future__ import annotations
+
+import importlib
+import json
+import os
+import sys
+import types
+import unittest
+
+# pandas stub (for annotations)
+if 'pandas' not in sys.modules:
+    pd_stub = types.ModuleType('pandas')
+    pd_stub.DataFrame = type('DataFrame', (), {})
+    pd_stub.Timestamp = type('Timestamp', (), {})
+    sys.modules['pandas'] = pd_stub
+
+if 'pydantic_settings' not in sys.modules:
+    stub = types.ModuleType('pydantic_settings')
+
+    class _S:
+        def __init__(self, **kw):
+            for name, value in self.__class__.__dict__.items():
+                if name.isupper():
+                    setattr(self, name, kw.get(name, value))
+
+    stub.BaseSettings = _S
+    sys.modules['pydantic_settings'] = stub
+
+if 'algorithms' not in sys.modules:
+    pkg = types.ModuleType('algorithms')
+    pkg.__path__ = [os.path.join(os.path.dirname(__file__), '..', 'algorithms')]
+    sys.modules['algorithms'] = pkg
+
+gv = importlib.import_module('algorithms.gear_violation')
+
+
+class GearViolationParamsTest(unittest.TestCase):
+
+    def test_default_params_shape(self):
+        p = gv.GEAR_VIOLATION_DEFAULT_PARAMS
+        self.assertIn('scores', p)
+        self.assertIn('signal_cycling', p)
+        self.assertIn('gear_drift_threshold_nm', p)
+        self.assertIn('fixed_gear_types', p)
+        self.assertIn('fishery_code_allowed_gear', p)
+        # all six G-code score keys are present
+        for k in ['G01_zone_violation', 'G02_closed_season', 'G03_unregistered_gear',
+                  'G04_signal_cycling', 'G05_gear_drift', 'G06_pair_trawl']:
+            self.assertIn(k, p['scores'])
+
+    def test_default_values_match_module_constants(self):
+        """DEFAULT_PARAMS must equal the module-level constants exactly (prevents SSOT duplication drift)."""
+        p = gv.GEAR_VIOLATION_DEFAULT_PARAMS
+        self.assertEqual(p['scores']['G01_zone_violation'], gv.G01_SCORE)
+        self.assertEqual(p['scores']['G02_closed_season'], gv.G02_SCORE)
+        self.assertEqual(p['scores']['G03_unregistered_gear'], gv.G03_SCORE)
+        self.assertEqual(p['scores']['G04_signal_cycling'], gv.G04_SCORE)
+        self.assertEqual(p['scores']['G05_gear_drift'], gv.G05_SCORE)
+        self.assertEqual(p['scores']['G06_pair_trawl'], gv.G06_SCORE)
+        self.assertEqual(p['signal_cycling']['gap_min'], gv.SIGNAL_CYCLING_GAP_MIN)
+        self.assertEqual(p['signal_cycling']['min_count'], gv.SIGNAL_CYCLING_MIN_COUNT)
+        self.assertAlmostEqual(p['gear_drift_threshold_nm'], gv.GEAR_DRIFT_THRESHOLD_NM)
+        self.assertEqual(set(p['fixed_gear_types']), gv.FIXED_GEAR_TYPES)
+        # fishery_code_allowed_gear: compare after list ↔ set conversion
+        for key, allowed in gv.FISHERY_CODE_ALLOWED_GEAR.items():
+            self.assertEqual(set(p['fishery_code_allowed_gear'][key]), allowed)
+
+    def test_merge_none_returns_default_reference(self):
+        self.assertIs(gv._merge_default_gv_params(None), gv.GEAR_VIOLATION_DEFAULT_PARAMS)
+
+    def test_merge_override_replaces_only_given_keys(self):
+        override = {'scores': {'G06_pair_trawl': 99}}
+        merged = gv._merge_default_gv_params(override)
+        self.assertEqual(merged['scores']['G06_pair_trawl'], 99)
+        # other scores keep DEFAULT
+        self.assertEqual(
+            merged['scores']['G01_zone_violation'],
+            gv.GEAR_VIOLATION_DEFAULT_PARAMS['scores']['G01_zone_violation'],
+        )
+        # top-level keys such as fixed_gear_types also keep DEFAULT
+        self.assertEqual(
+            merged['fixed_gear_types'],
+            gv.GEAR_VIOLATION_DEFAULT_PARAMS['fixed_gear_types'],
+        )
+        # DEFAULT is not mutated
+        self.assertEqual(
+            gv.GEAR_VIOLATION_DEFAULT_PARAMS['scores']['G06_pair_trawl'], 20,
+        )
+
+    def test_seed_sql_values_match_python_default(self):
+        """Static 1:1 check — seed SQL JSONB ↔ Python DEFAULT."""
+        seed_path = os.path.join(
+            os.path.dirname(__file__), '..',
+            'models_core', 'seeds', 'v1_gear_violation.sql',
+        )
+        with open(seed_path, 'r', encoding='utf-8') as f:
+            sql = f.read()
+
+        start = sql.index('$json$') + len('$json$')
+        end = sql.index('$json$', start)
+        raw = sql[start:end].strip()
+        params = json.loads(raw)
+
+        expected = gv.GEAR_VIOLATION_DEFAULT_PARAMS
+        self.assertEqual(params['scores'], expected['scores'])
+        self.assertEqual(params['signal_cycling'], expected['signal_cycling'])
+        self.assertAlmostEqual(
+            params['gear_drift_threshold_nm'], expected['gear_drift_threshold_nm']
+        )
+        # compare lists as sets, order-independently (storage order in the DB does not matter)
+        self.assertEqual(set(params['fixed_gear_types']),
+                         set(expected['fixed_gear_types']))
+        for code, allowed in expected['fishery_code_allowed_gear'].items():
+            self.assertEqual(
+                set(params['fishery_code_allowed_gear'][code]), set(allowed),
+                f'fishery_code_allowed_gear[{code}] mismatch',
+            )
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/prediction/tests/test_models_core.py b/prediction/tests/test_models_core.py
new file mode 100644
index 0000000..10b90ad
--- /dev/null
+++ b/prediction/tests/test_models_core.py
@@ -0,0 +1,431 @@
+"""Unit tests for the models_core base infrastructure (Phase 1-2).
+
+Verified at the pure-Python level, without a DB or server:
+- params_loader cache TTL behaviour
+- ModelRegistry discovery + per-version instantiation
+- DAG topo sort + cycle detection
+- the DAGExecutor contamination invariant (SHADOW results never enter ctx.shared)
+- downstream models skipped when a PRIMARY fails
+- warning + skip for SHADOW-only models (no PRIMARY registered)
+- integration with run_stage — exceptions are isolated to a single version
+
+Actual DB interaction is covered by the Phase 1-3 testcontainers work (follow-up commit).
+"""
+from __future__ import annotations
+
+import sys
+import types
+import unittest
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from typing import Optional
+
+# pydantic_settings stub (same idiom as test_time_bucket)
+_stub = types.ModuleType('pydantic_settings')
+
+
+class _StubBaseSettings:
+    def __init__(self, **kwargs):
+        for name, value in self.__class__.__dict__.items():
+            if name.isupper():
+                setattr(self, name, kwargs.get(name, value))
+
+
+_stub.BaseSettings = _StubBaseSettings
+sys.modules.setdefault('pydantic_settings', _stub)
+
+from models_core import base as mc_base
+from models_core.base import (
+    BaseDetectionModel,
+    ModelContext,
+    ModelResult,
+    ROLE_PRIMARY,
+    ROLE_SHADOW,
+    make_input_ref,
+)
+from models_core import params_loader
+from models_core.executor import DAGExecutor
+from models_core.registry import DAGCycleError, ModelRegistry
+
+
+# ======================================================================
+# Fixture classes
+# ======================================================================
+@dataclass
+class _Call:
+    model_id: str
+    role: str
+    version_id: int
+
+
+def _make_model_class(mid: str, depends: Optional[list] = None, *, raise_for_role: Optional[str] = None):
+    """Dynamically create a BaseDetectionModel subclass."""
+
+    class _M(BaseDetectionModel):
+        model_id = mid
+        depends_on = list(depends or [])
+
+        def run(self, ctx: ModelContext) -> ModelResult:
+            if raise_for_role and self.role == raise_for_role:
+                raise RuntimeError(f'intentional failure in {mid}@{self.role}')
+            ctx.extras.setdefault('_calls', []).append(
+                _Call(self.model_id, self.role, self.version_id)
+            )
+            # keep the input_ref schema identical for PRIMARY/SHADOW
+            out_per = [
+                (make_input_ref('412000001'), {'score': 1.0 if self.role == ROLE_PRIMARY else 1.5}),
+            ]
+            return ModelResult(
+                model_id=self.model_id,
+                version_id=self.version_id,
+                version_str=self.version_str,
+                role=self.role,
+                outputs_per_input=out_per,
+                metrics={'sentinel':
float(self.version_id)}, + ) + + _M.__name__ = f'_M_{mid.replace(".", "_")}' + return _M + + +def _version_row(id_, model_id, role, version='1.0.0', params=None): + return params_loader.VersionRow( + id=id_, model_id=model_id, role=role, version=version, params=params or {} + ) + + +# ====================================================================== +# params_loader 캐시 +# ====================================================================== +class ParamsCacheTest(unittest.TestCase): + + def setUp(self): + params_loader.invalidate_cache() + + def test_invalidate_forces_reload(self): + calls = {'n': 0} + + def fake_fetch(conn): + calls['n'] += 1 + return [_version_row(1, 'a', ROLE_PRIMARY)] + + orig = params_loader._fetch_active_versions + params_loader._fetch_active_versions = fake_fetch + try: + rows1 = params_loader.load_active_versions(conn=None) + rows2 = params_loader.load_active_versions(conn=None) + self.assertEqual(calls['n'], 1) # 두 번째는 캐시 HIT + self.assertEqual(len(rows1), 1) + self.assertEqual(len(rows2), 1) + + params_loader.invalidate_cache() + params_loader.load_active_versions(conn=None) + self.assertEqual(calls['n'], 2) + finally: + params_loader._fetch_active_versions = orig + params_loader.invalidate_cache() + + def test_force_reload_bypasses_ttl(self): + calls = {'n': 0} + + def fake_fetch(conn): + calls['n'] += 1 + return [] + + orig = params_loader._fetch_active_versions + params_loader._fetch_active_versions = fake_fetch + try: + params_loader.load_active_versions(conn=None) + params_loader.load_active_versions(conn=None, force_reload=True) + self.assertEqual(calls['n'], 2) + finally: + params_loader._fetch_active_versions = orig + params_loader.invalidate_cache() + + +# ====================================================================== +# Registry topo 정렬 + DAG 검증 +# ====================================================================== +class RegistryTopoTest(unittest.TestCase): + + def _registry_with(self, *model_ids_with_deps): + 
"""[(model_id, [dep_ids]), ...] 에 맞춘 Registry 생성.""" + reg = ModelRegistry() + for mid, deps in model_ids_with_deps: + reg.register_class(_make_model_class(mid, deps)) + return reg + + def test_topo_order_respects_dependencies(self): + reg = self._registry_with( + ('a', []), + ('b', ['a']), + ('c', ['b']), + ) + rows = [ + _version_row(10, 'a', ROLE_PRIMARY), + _version_row(11, 'b', ROLE_PRIMARY), + _version_row(12, 'c', ROLE_PRIMARY), + ] + plan = reg.build_plan_from_rows(rows) + self.assertEqual(plan.topo_order, ['a', 'b', 'c']) + + def test_cycle_detection(self): + reg = self._registry_with( + ('a', ['b']), + ('b', ['a']), + ) + rows = [ + _version_row(1, 'a', ROLE_PRIMARY), + _version_row(2, 'b', ROLE_PRIMARY), + ] + with self.assertRaises(DAGCycleError): + reg.build_plan_from_rows(rows) + + def test_shadow_version_attaches_to_primary_model(self): + reg = self._registry_with(('a', [])) + rows = [ + _version_row(1, 'a', ROLE_PRIMARY, version='1.0.0'), + _version_row(2, 'a', ROLE_SHADOW, version='1.1.0-shadow'), + _version_row(3, 'a', ROLE_SHADOW, version='1.2.0-shadow'), + ] + plan = reg.build_plan_from_rows(rows) + self.assertIn('a', plan.primaries) + self.assertEqual(plan.primaries['a'].version_id, 1) + self.assertEqual(len(plan.shadows['a']), 2) + + def test_unknown_model_id_skipped(self): + reg = ModelRegistry() # 클래스 없음 + rows = [_version_row(1, 'ghost', ROLE_PRIMARY)] + plan = reg.build_plan_from_rows(rows) + self.assertNotIn('ghost', plan.primaries) + + def test_class_depends_on_added_to_edges(self): + reg = self._registry_with( + ('base', []), + ('child', ['base']), + ) + rows = [ + _version_row(1, 'base', ROLE_PRIMARY), + _version_row(2, 'child', ROLE_PRIMARY), + ] + plan = reg.build_plan_from_rows(rows) + self.assertIn('base', plan.edges['child']) + + +# ====================================================================== +# DAGExecutor 불변식 +# ====================================================================== +class 
DAGExecutorTest(unittest.TestCase): + + def _collect_persisted(self): + """persist 훅 2개를 만들어 호출을 가로채는 pair 반환.""" + persisted_rows: list[ModelResult] = [] + persisted_metrics: list[ModelResult] = [] + + def p_rows(result: ModelResult, cycle_started_at, *, conn=None): + persisted_rows.append(result) + + def p_metrics(result: ModelResult, cycle_started_at, *, conn=None): + persisted_metrics.append(result) + + return persisted_rows, persisted_metrics, p_rows, p_metrics + + def _ctx(self): + return ModelContext(cycle_started_at=datetime(2026, 4, 20, 0, 0, tzinfo=timezone.utc)) + + def test_primary_result_injected_into_shared(self): + reg = ModelRegistry() + reg.register_class(_make_model_class('a')) + reg.register_class(_make_model_class('b', ['a'])) + rows = [ + _version_row(1, 'a', ROLE_PRIMARY), + _version_row(2, 'b', ROLE_PRIMARY), + ] + plan = reg.build_plan_from_rows(rows) + + pr, pm, p1, p2 = self._collect_persisted() + ctx = self._ctx() + DAGExecutor(plan, persist_fn=p1, persist_metrics_fn=p2).run(ctx) + + self.assertIn('a', ctx.shared) + self.assertIn('b', ctx.shared) + self.assertEqual(ctx.shared['a'].role, ROLE_PRIMARY) + + def test_shadow_result_not_injected_into_shared(self): + """가장 중요한 불변식 — SHADOW 결과가 ctx.shared 에 들어가면 오염.""" + reg = ModelRegistry() + reg.register_class(_make_model_class('m')) + rows = [ + _version_row(1, 'm', ROLE_PRIMARY), + _version_row(2, 'm', ROLE_SHADOW), + ] + plan = reg.build_plan_from_rows(rows) + + pr, pm, p1, p2 = self._collect_persisted() + ctx = self._ctx() + DAGExecutor(plan, persist_fn=p1, persist_metrics_fn=p2).run(ctx) + + # shared 는 PRIMARY 만 + self.assertEqual(ctx.shared['m'].role, ROLE_PRIMARY) + self.assertEqual(ctx.shared['m'].version_id, 1) + + # 저장은 둘 다 된다 + persisted_roles = {r.role for r in pr} + self.assertIn(ROLE_PRIMARY, persisted_roles) + self.assertIn(ROLE_SHADOW, persisted_roles) + + def test_downstream_sees_primary_only_even_when_shadow_differs(self): + """SHADOW 가 다른 값을 리턴해도 후행 PRIMARY 는 선행 PRIMARY 결과만 
소비.""" + + class M_A(BaseDetectionModel): + model_id = 'a' + depends_on = [] + + def run(self, ctx): + val = 100 if self.role == ROLE_PRIMARY else 999 + return ModelResult( + model_id='a', version_id=self.version_id, + version_str=self.version_str, role=self.role, + outputs_per_input=[(make_input_ref('x'), {'v': val})], + metrics={}, + ) + + observed = {'downstream_seen_value': None} + + class M_B(BaseDetectionModel): + model_id = 'b' + depends_on = ['a'] + + def run(self, ctx): + upstream = ctx.shared.get('a') + observed['downstream_seen_value'] = ( + upstream.outputs_per_input[0][1]['v'] if upstream else None + ) + return ModelResult( + model_id='b', version_id=self.version_id, + version_str=self.version_str, role=self.role, + outputs_per_input=[(make_input_ref('x'), {'echo': observed['downstream_seen_value']})], + ) + + reg = ModelRegistry() + reg.register_class(M_A) + reg.register_class(M_B) + rows = [ + _version_row(1, 'a', ROLE_PRIMARY), + _version_row(2, 'a', ROLE_SHADOW), + _version_row(3, 'b', ROLE_PRIMARY), + ] + plan = reg.build_plan_from_rows(rows) + + pr, pm, p1, p2 = self._collect_persisted() + ctx = self._ctx() + DAGExecutor(plan, persist_fn=p1, persist_metrics_fn=p2).run(ctx) + + # downstream 이 본 값은 PRIMARY(100), SHADOW(999) 가 아님 + self.assertEqual(observed['downstream_seen_value'], 100) + + def test_primary_failure_skips_downstream(self): + reg = ModelRegistry() + reg.register_class(_make_model_class('a', raise_for_role=ROLE_PRIMARY)) + reg.register_class(_make_model_class('b', ['a'])) + rows = [ + _version_row(1, 'a', ROLE_PRIMARY), + _version_row(2, 'b', ROLE_PRIMARY), + ] + plan = reg.build_plan_from_rows(rows) + + pr, pm, p1, p2 = self._collect_persisted() + ctx = self._ctx() + summary = DAGExecutor(plan, persist_fn=p1, persist_metrics_fn=p2).run(ctx) + + self.assertNotIn('a', ctx.shared) + self.assertNotIn('b', ctx.shared) + self.assertGreaterEqual(summary['failed'], 1) + self.assertGreaterEqual(summary['skipped_missing_deps'], 1) + + def 
test_shadow_failure_does_not_affect_primary_or_persist(self): + cls_ok_primary = _make_model_class('m') + cls_bad_shadow = _make_model_class('m', raise_for_role=ROLE_SHADOW) + # 같은 model_id 를 다른 클래스로 덮으면 Registry 가 ValueError — 대신 같은 클래스 재사용 + reg = ModelRegistry() + reg.register_class(_make_model_class('m', raise_for_role=ROLE_SHADOW)) + rows = [ + _version_row(1, 'm', ROLE_PRIMARY), + _version_row(2, 'm', ROLE_SHADOW), + ] + plan = reg.build_plan_from_rows(rows) + + pr, pm, p1, p2 = self._collect_persisted() + ctx = self._ctx() + summary = DAGExecutor(plan, persist_fn=p1, persist_metrics_fn=p2).run(ctx) + + self.assertEqual(summary['executed'], 1) # PRIMARY 성공 + self.assertEqual(summary['shadow_failed'], 1) + self.assertEqual(summary['shadow_ran'], 0) + # PRIMARY 는 persist 된다 + self.assertEqual([r.role for r in pr], [ROLE_PRIMARY]) + + def test_shadow_only_without_primary_is_skipped(self): + reg = ModelRegistry() + reg.register_class(_make_model_class('orphan')) + rows = [_version_row(1, 'orphan', ROLE_SHADOW)] + plan = reg.build_plan_from_rows(rows) + + pr, pm, p1, p2 = self._collect_persisted() + ctx = self._ctx() + summary = DAGExecutor(plan, persist_fn=p1, persist_metrics_fn=p2).run(ctx) + self.assertEqual(summary['executed'], 0) + self.assertNotIn('orphan', ctx.shared) + + +class SilentErrorGuardTest(unittest.TestCase): + """V034 스키마 컬럼 사이즈 초과 silent 실패 방지.""" + + def test_model_id_too_long_rejected_at_instantiation(self): + class _TooLong(BaseDetectionModel): + model_id = 'x' * 65 # VARCHAR(64) 초과 + + def run(self, ctx): # pragma: no cover + return ModelResult( + model_id=self.model_id, version_id=self.version_id, + version_str=self.version_str, role=self.role, + ) + + with self.assertRaises(ValueError): + _TooLong(version_id=1, version_str='1', role=ROLE_PRIMARY, params={}) + + def test_long_metric_key_dropped_with_warning(self): + """_persist_metrics 가 64자 초과 metric_key 를 dropna silent 로 저장하지 않는다.""" + from models_core import executor as ex + + # fake 
conn (cursor context manager 불필요 — _execute_insert 가 단순 호출) + captured_rows: list = [] + + def fake_exec(sql, rows, *, conn=None): + captured_rows.extend(rows) + + orig = ex._execute_insert + ex._execute_insert = fake_exec + try: + r = ModelResult( + model_id='m', version_id=1, version_str='1', role=ROLE_PRIMARY, + outputs_per_input=[], + metrics={ + 'ok_key': 1.0, + 'x' * 65: 2.0, # 초과 + }, + duration_ms=10, + ) + ex._persist_metrics(r, cycle_started_at=datetime(2026, 4, 20)) + keys = [row[3] for row in captured_rows] # 4번째 컬럼이 metric_key + self.assertIn('ok_key', keys) + self.assertNotIn('x' * 65, keys) + # cycle_duration_ms / output_count 기본값은 포함 + self.assertIn('cycle_duration_ms', keys) + self.assertIn('output_count', keys) + finally: + ex._execute_insert = orig + + +if __name__ == '__main__': + unittest.main() diff --git a/prediction/tests/test_pair_trawl_params.py b/prediction/tests/test_pair_trawl_params.py new file mode 100644 index 0000000..c19100c --- /dev/null +++ b/prediction/tests/test_pair_trawl_params.py @@ -0,0 +1,66 @@ +"""Phase 2 PoC #5 — pair_trawl_tier DEFAULT_PARAMS ↔ seed SQL 정적 일치.""" +from __future__ import annotations + +import importlib +import json +import os +import sys +import types +import unittest + +if 'pandas' not in sys.modules: + pd_stub = types.ModuleType('pandas') + pd_stub.DataFrame = type('DataFrame', (), {}) + pd_stub.Timestamp = type('Timestamp', (), {}) + sys.modules['pandas'] = pd_stub + +if 'pydantic_settings' not in sys.modules: + stub = types.ModuleType('pydantic_settings') + + class _S: + def __init__(self, **kw): + for name, value in self.__class__.__dict__.items(): + if name.isupper(): + setattr(self, name, kw.get(name, value)) + + stub.BaseSettings = _S + sys.modules['pydantic_settings'] = stub + +if 'algorithms' not in sys.modules: + pkg = types.ModuleType('algorithms') + pkg.__path__ = [os.path.join(os.path.dirname(__file__), '..', 'algorithms')] + sys.modules['algorithms'] = pkg + + +class 
PairTrawlParamsTest(unittest.TestCase): + + def test_seed_matches_default(self): + pt = importlib.import_module('algorithms.pair_trawl') + seed_path = os.path.join( + os.path.dirname(__file__), '..', + 'models_core', 'seeds', 'v1_pair_trawl.sql', + ) + with open(seed_path, 'r', encoding='utf-8') as f: + sql = f.read() + start = sql.index('$json$') + len('$json$') + end = sql.index('$json$', start) + params = json.loads(sql[start:end].strip()) + self.assertEqual(params, pt.PAIR_TRAWL_DEFAULT_PARAMS) + + def test_default_values_match_module_constants(self): + pt = importlib.import_module('algorithms.pair_trawl') + d = pt.PAIR_TRAWL_DEFAULT_PARAMS + self.assertEqual(d['strong']['proximity_nm'], pt.PROXIMITY_NM) + self.assertEqual(d['strong']['sog_delta_max'], pt.SOG_DELTA_MAX) + self.assertEqual(d['strong']['cog_delta_max'], pt.COG_DELTA_MAX) + self.assertEqual(d['strong']['min_sync_cycles'], pt.MIN_SYNC_CYCLES) + self.assertEqual(d['strong']['simultaneous_gap_min'], pt.SIMULTANEOUS_GAP_MIN) + self.assertEqual(d['probable']['min_block_cycles'], pt.PROBABLE_MIN_BLOCK_CYCLES) + self.assertEqual(d['probable']['min_sync_ratio'], pt.PROBABLE_MIN_SYNC_RATIO) + self.assertEqual(d['suspect']['min_block_cycles'], pt.SUSPECT_MIN_BLOCK_CYCLES) + self.assertEqual(d['suspect']['min_sync_ratio'], pt.SUSPECT_MIN_SYNC_RATIO) + self.assertEqual(d['candidate_scan']['cell_size_deg'], pt.CELL_SIZE) + + +if __name__ == '__main__': + unittest.main() diff --git a/prediction/tests/test_risk_composite_params.py b/prediction/tests/test_risk_composite_params.py new file mode 100644 index 0000000..32e2683 --- /dev/null +++ b/prediction/tests/test_risk_composite_params.py @@ -0,0 +1,75 @@ +"""Phase 2 PoC #4 — risk_composite DEFAULT_PARAMS ↔ seed SQL 정적 일치 테스트.""" +from __future__ import annotations + +import importlib +import json +import os +import sys +import types +import unittest + +if 'pandas' not in sys.modules: + pd_stub = types.ModuleType('pandas') + pd_stub.DataFrame = type('DataFrame', 
(), {}) + pd_stub.Timestamp = type('Timestamp', (), {}) + sys.modules['pandas'] = pd_stub + +if 'pydantic_settings' not in sys.modules: + stub = types.ModuleType('pydantic_settings') + + class _S: + def __init__(self, **kw): + for name, value in self.__class__.__dict__.items(): + if name.isupper(): + setattr(self, name, kw.get(name, value)) + + stub.BaseSettings = _S + sys.modules['pydantic_settings'] = stub + +if 'algorithms' not in sys.modules: + pkg = types.ModuleType('algorithms') + pkg.__path__ = [os.path.join(os.path.dirname(__file__), '..', 'algorithms')] + sys.modules['algorithms'] = pkg + +# risk.py 는 algorithms.location/fishing_pattern/dark_vessel/spoofing 을 top-level +# import 한다. dark_vessel 만 실제 모듈 그대로 두고 나머지는 필요한 심볼만 stub. +if 'algorithms.location' in sys.modules: + if not hasattr(sys.modules['algorithms.location'], 'classify_zone'): + sys.modules['algorithms.location'].classify_zone = lambda *a, **k: {} +else: + loc = types.ModuleType('algorithms.location') + loc.haversine_nm = lambda a, b, c, d: 0.0 + loc.classify_zone = lambda *a, **k: {} + sys.modules['algorithms.location'] = loc + +for mod_name, attrs in [ + ('algorithms.fishing_pattern', ['detect_fishing_segments', 'detect_trawl_uturn']), + ('algorithms.spoofing', ['detect_teleportation', 'count_speed_jumps']), +]: + if mod_name not in sys.modules: + m = types.ModuleType(mod_name) + sys.modules[mod_name] = m + m = sys.modules[mod_name] + for a in attrs: + if not hasattr(m, a): + setattr(m, a, lambda *_a, **_kw: []) + + +class RiskCompositeParamsTest(unittest.TestCase): + + def test_seed_matches_default(self): + risk = importlib.import_module('algorithms.risk') + seed_path = os.path.join( + os.path.dirname(__file__), '..', + 'models_core', 'seeds', 'v1_risk_composite.sql', + ) + with open(seed_path, 'r', encoding='utf-8') as f: + sql = f.read() + start = sql.index('$json$') + len('$json$') + end = sql.index('$json$', start) + params = json.loads(sql[start:end].strip()) + self.assertEqual(params, 
risk.RISK_COMPOSITE_DEFAULT_PARAMS) + + +if __name__ == '__main__': + unittest.main() diff --git a/prediction/tests/test_transshipment_params.py b/prediction/tests/test_transshipment_params.py new file mode 100644 index 0000000..c686a16 --- /dev/null +++ b/prediction/tests/test_transshipment_params.py @@ -0,0 +1,88 @@ +"""Phase 2 PoC #3 — transshipment_5stage params 카탈로그 동치성 테스트. + +런타임 override 는 후속 PR 에서 활성화되므로, 이 테스트는 **DEFAULT_PARAMS +↔ 모듈 상수 ↔ seed SQL JSONB** 3 자 일치만 검증한다. +""" +from __future__ import annotations + +import importlib +import json +import os +import sys +import types +import unittest + +# pandas/pydantic_settings stub (다른 phase 2 테스트와 동일 관용) +if 'pandas' not in sys.modules: + pd_stub = types.ModuleType('pandas') + pd_stub.DataFrame = type('DataFrame', (), {}) + pd_stub.Timestamp = type('Timestamp', (), {}) + sys.modules['pandas'] = pd_stub + +if 'pydantic_settings' not in sys.modules: + stub = types.ModuleType('pydantic_settings') + + class _S: + def __init__(self, **kw): + for name, value in self.__class__.__dict__.items(): + if name.isupper(): + setattr(self, name, kw.get(name, value)) + + stub.BaseSettings = _S + sys.modules['pydantic_settings'] = stub + +if 'algorithms' not in sys.modules: + pkg = types.ModuleType('algorithms') + pkg.__path__ = [os.path.join(os.path.dirname(__file__), '..', 'algorithms')] + sys.modules['algorithms'] = pkg + +# fleet_tracker 의 GEAR_PATTERN 을 transshipment.py 상단에서 import 하므로 stub +if 'fleet_tracker' not in sys.modules: + ft_stub = types.ModuleType('fleet_tracker') + import re as _re + ft_stub.GEAR_PATTERN = _re.compile(r'^xxx$') + sys.modules['fleet_tracker'] = ft_stub + +ts = importlib.import_module('algorithms.transshipment') + + +class TransshipmentParamsTest(unittest.TestCase): + + def test_default_values_match_module_constants(self): + p = ts.TRANSSHIPMENT_DEFAULT_PARAMS + self.assertEqual(p['sog_threshold_kn'], ts.SOG_THRESHOLD_KN) + self.assertEqual(p['proximity_deg'], ts.PROXIMITY_DEG) + 
self.assertEqual(p['approach_deg'], ts.APPROACH_DEG) + self.assertEqual(p['rendezvous_min'], ts.RENDEZVOUS_MIN) + self.assertEqual(p['pair_expiry_min'], ts.PAIR_EXPIRY_MIN) + self.assertEqual(p['gap_tolerance_cycles'], ts.GAP_TOLERANCE_CYCLES) + self.assertEqual(set(p['fishing_kinds']), set(ts._FISHING_KINDS)) + self.assertEqual(set(p['carrier_kinds']), set(ts._CARRIER_KINDS)) + self.assertEqual(set(p['excluded_ship_ty']), set(ts._EXCLUDED_SHIP_TY)) + self.assertEqual(list(p['carrier_hints']), list(ts._CARRIER_HINTS)) + + def test_seed_sql_values_match_python_default(self): + seed_path = os.path.join( + os.path.dirname(__file__), '..', + 'models_core', 'seeds', 'v1_transshipment.sql', + ) + with open(seed_path, 'r', encoding='utf-8') as f: + sql = f.read() + + start = sql.index('$json$') + len('$json$') + end = sql.index('$json$', start) + raw = sql[start:end].strip() + params = json.loads(raw) + + expected = ts.TRANSSHIPMENT_DEFAULT_PARAMS + for scalar_key in ['sog_threshold_kn', 'proximity_deg', 'approach_deg', + 'rendezvous_min', 'pair_expiry_min', 'gap_tolerance_cycles', + 'min_score']: + self.assertEqual(params[scalar_key], expected[scalar_key], scalar_key) + for list_key in ['fishing_kinds', 'carrier_kinds', 'excluded_ship_ty', + 'carrier_hints']: + self.assertEqual(set(params[list_key]), set(expected[list_key]), list_key) + + +if __name__ == '__main__': + unittest.main()
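
All four param tests above repeat the same `start = sql.index('$json$') + len('$json$')` idiom to pull the params JSONB out of a seed SQL file between PostgreSQL dollar-quote delimiters. As a standalone sketch of that pattern (the helper name `extract_dollar_quoted_json` and the seed SQL snippet are illustrative, not part of this PR):

```python
import json


def extract_dollar_quoted_json(sql: str, tag: str = '$json$') -> dict:
    """Return the JSON object between the first pair of PostgreSQL
    dollar-quote delimiters (e.g. $json$ ... $json$) in a seed SQL file.
    Raises ValueError via str.index if the tag is missing or unclosed."""
    start = sql.index(tag) + len(tag)
    end = sql.index(tag, start)
    return json.loads(sql[start:end].strip())


# hypothetical seed SQL fragment, shaped like the v1_*.sql files the tests read
seed_sql = """
UPDATE detection_model_versions SET params = $json$
{"scores": {"G06_pair_trawl": 20}, "gear_drift_threshold_nm": 1.5}
$json$ WHERE id = 1;
"""

params = extract_dollar_quoted_json(seed_sql)
print(params['scores']['G06_pair_trawl'])  # -> 20
```

Dollar-quoting keeps the JSON free of SQL escaping, which is what makes this purely textual extraction reliable: the payload between the two `$json$` markers is byte-for-byte the JSON that Postgres will store, so a static string comparison against the Python DEFAULT constants needs no database.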