kcg-ai-monitoring/prediction/models/result.py
htlee dac4a3bda2 fix(prediction): features JSONB 중첩 구조 sanitize
AnalysisResult.to_db_tuple이 기존에 features dict 값을 모두 float로
변환했는데, dark_suspicion 구조를 넣으면서 dark_patterns(list) 등
비스칼라 타입이 포함되어 upsert 실패 (float argument not a list).

_sanitize 재귀 함수로 JSON 호환 타입(str/int/float/bool/list/dict/None)을
그대로 보존하도록 변경.
2026-04-09 07:56:04 +09:00

120 lines
3.6 KiB
Python

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
@dataclass
class AnalysisResult:
    """Per-vessel analysis record mapped onto ``vessel_analysis_results``.

    The original docstring describes the table as having 28 columns;
    ``to_db_tuple`` emits 26 positional values.
    NOTE(review): the remaining columns are presumably filled by the
    database or the INSERT statement -- confirm against the upsert SQL.
    """

    mmsi: str
    timestamp: datetime

    # Classification result
    vessel_type: str = 'UNKNOWN'
    confidence: float = 0.0
    fishing_pct: float = 0.0
    cluster_id: int = -1
    season: str = 'UNKNOWN'

    # ALGO 01: location
    zone: str = 'EEZ_OR_BEYOND'
    dist_to_baseline_nm: float = 999.0

    # ALGO 02: activity state
    activity_state: str = 'UNKNOWN'
    ucaf_score: float = 0.0
    ucft_score: float = 0.0

    # ALGO 03: dark vessel
    is_dark: bool = False
    gap_duration_min: int = 0

    # ALGO 04: GPS spoofing
    spoofing_score: float = 0.0
    bd09_offset_m: float = 0.0
    speed_jump_count: int = 0

    # ALGO 05+06: fleet
    cluster_size: int = 0
    is_leader: bool = False
    fleet_role: str = 'NOISE'

    # ALGO 07: risk
    risk_score: int = 0
    risk_level: str = 'LOW'

    # ALGO 08: suspected transshipment
    is_transship_suspect: bool = False
    transship_pair_mmsi: str = ''
    transship_duration_min: int = 0

    # Feature vector (may contain nested dicts/lists, e.g. dark_suspicion)
    features: dict = field(default_factory=dict)

    # Metadata
    analyzed_at: Optional[datetime] = None

    def __post_init__(self):
        """Default ``analyzed_at`` to the current UTC time when not given."""
        if self.analyzed_at is None:
            self.analyzed_at = datetime.now(timezone.utc)

    def to_db_tuple(self) -> tuple:
        """Return the record as a positional tuple of plain Python values.

        Scalars are coerced to built-in ``str``/``float``/``int``/``bool``
        so numpy scalars do not reach the DB driver. ``features`` is
        sanitized recursively and serialized to a JSON string as the last
        element.

        Non-finite floats (NaN, +/-Infinity) inside ``features`` are stored
        as JSON ``null``: ``json.dumps`` would otherwise emit the bare
        tokens ``NaN``/``Infinity``, which are not valid JSON and are
        rejected by strict parsers such as PostgreSQL ``jsonb``.

        NOTE(review): ``timestamp`` and ``cluster_size`` are not part of
        the tuple, and ``cluster_id`` is emitted twice (the second time for
        the fleet cluster column) -- confirm this matches the upsert SQL.

        Returns:
            A 26-element tuple in the column order shown in the body.
        """
        import json
        import math

        def _f(v: object) -> float:
            """Coerce a (numpy) float to a Python float; None becomes 0.0."""
            return float(v) if v is not None else 0.0

        def _i(v: object) -> int:
            """Coerce a (numpy) int to a Python int; None becomes 0."""
            return int(v) if v is not None else 0

        def _finite_or_none(x: float) -> Optional[float]:
            """Return *x* unless it is NaN/Infinity, which map to None."""
            return x if math.isfinite(x) else None

        def _sanitize(v):
            """Recursively reduce *v* to JSON-compatible built-in types."""
            # bool is checked before int because bool is an int subclass.
            if v is None or isinstance(v, (str, bool)):
                return v
            if isinstance(v, float):
                # Also normalizes float subclasses to a plain float.
                return _finite_or_none(float(v))
            if isinstance(v, int):
                return int(v)
            if isinstance(v, dict):
                # JSON object keys must be strings.
                return {str(k): _sanitize(val) for k, val in v.items()}
            if isinstance(v, (list, tuple)):
                return [_sanitize(x) for x in v]
            # Anything else (e.g. numpy scalars): try float, else fall back
            # to the string representation.
            try:
                as_float = float(v)
            except (TypeError, ValueError):
                return str(v)
            return _finite_or_none(as_float)

        safe_features = _sanitize(self.features) if self.features else {}

        return (
            str(self.mmsi),
            self.analyzed_at,                 # analyzed_at (PK partition key)
            str(self.vessel_type),
            _f(self.confidence),
            _f(self.fishing_pct),
            _i(self.cluster_id),
            str(self.season),
            str(self.zone),                   # -> zone_code
            _f(self.dist_to_baseline_nm),
            str(self.activity_state),
            _f(self.ucaf_score),
            _f(self.ucft_score),
            bool(self.is_dark),
            _i(self.gap_duration_min),
            _f(self.spoofing_score),
            _f(self.bd09_offset_m),
            _i(self.speed_jump_count),
            _i(self.cluster_id),              # -> fleet_cluster_id
            bool(self.is_leader),             # -> fleet_is_leader
            str(self.fleet_role),
            _i(self.risk_score),
            str(self.risk_level),
            bool(self.is_transship_suspect),  # -> transship_suspect
            str(self.transship_pair_mmsi),
            _i(self.transship_duration_min),
            # allow_nan=False is a guard: sanitization has already removed
            # every non-finite float, so this never raises in practice.
            json.dumps(safe_features, allow_nan=False),
        )