kcg-ai-monitoring/prediction/models/result.py
htlee 14eb4c7ea3 feat(prediction): vessel_analysis_results 에 분석 시점 lat/lon 저장
AnalysisResult 에 lat/lon 필드 + to_db_tuple 반영 + upsert_results SQL
컬럼 추가. 분류 파이프라인(last_row) / 경량 분석(all_positions) 두 경로
모두 분석 시점의 선박 위치를 함께 기록해 프론트 미니맵에서 특이운항
판별 위치를 실제 항적 위에 표시할 수 있게 한다.

배포 후 첫 사이클 8173/8173 lat/lon non-null 확인.
2026-04-16 14:30:49 +09:00

136 lines
4.2 KiB
Python

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
@dataclass
class AnalysisResult:
"""vessel_analysis_results 테이블 29컬럼 매핑."""
mmsi: str
timestamp: datetime
# 분류 결과
vessel_type: str = 'UNKNOWN'
confidence: float = 0.0
fishing_pct: float = 0.0
cluster_id: int = -1
season: str = 'UNKNOWN'
# ALGO 01: 위치
zone: str = 'EEZ_OR_BEYOND'
dist_to_baseline_nm: float = 999.0
lat: Optional[float] = None
lon: Optional[float] = None
# ALGO 02: 활동 상태
activity_state: str = 'UNKNOWN'
ucaf_score: float = 0.0
ucft_score: float = 0.0
# ALGO 03: 다크 베셀
is_dark: bool = False
gap_duration_min: int = 0
# ALGO 04: GPS 스푸핑
spoofing_score: float = 0.0
bd09_offset_m: float = 0.0
speed_jump_count: int = 0
# ALGO 05+06: 선단
cluster_size: int = 0
is_leader: bool = False
fleet_role: str = 'NOISE'
# ALGO 07: 위험도
risk_score: int = 0
risk_level: str = 'LOW'
# ALGO 08: 환적 의심
is_transship_suspect: bool = False
transship_pair_mmsi: str = ''
transship_duration_min: int = 0
# 특징 벡터
features: dict = field(default_factory=dict)
# ALGO 09: 어구 위반 판정
gear_judgment: str = ''
# 등록 선박 어구 코드 (fleet_vessels.gear_code: C21=PT, C22=OT 등)
gear_code: Optional[str] = None
# 메타
analyzed_at: Optional[datetime] = None
def __post_init__(self):
if self.analyzed_at is None:
self.analyzed_at = datetime.now(timezone.utc)
def to_db_tuple(self) -> tuple:
import json
def _f(v: object) -> float:
"""numpy float → Python float 변환."""
return float(v) if v is not None else 0.0
def _i(v: object) -> int:
"""numpy int → Python int 변환."""
return int(v) if v is not None else 0
# features dict 내부 numpy 값도 변환 (재귀적 처리)
# int/float/bool/str/None/list/dict 모두 허용 (JSON 호환 타입만 유지)
def _sanitize(v):
if v is None or isinstance(v, (str, bool)):
return v
if isinstance(v, (int, float)):
return float(v) if isinstance(v, float) else int(v)
if isinstance(v, dict):
return {str(k): _sanitize(val) for k, val in v.items()}
if isinstance(v, (list, tuple)):
return [_sanitize(x) for x in v]
# numpy 스칼라 등은 float 변환 시도, 실패 시 str
try:
return float(v)
except (TypeError, ValueError):
return str(v)
safe_features = _sanitize(self.features) if self.features else {}
def _opt_f(v: object) -> Optional[float]:
"""Optional float (None 유지)."""
return float(v) if v is not None else None
return (
str(self.mmsi),
self.analyzed_at, # analyzed_at (PK 파티션키)
str(self.vessel_type),
_f(self.confidence),
_f(self.fishing_pct),
_i(self.cluster_id),
str(self.season),
_opt_f(self.lat),
_opt_f(self.lon),
str(self.zone), # → zone_code
_f(self.dist_to_baseline_nm),
str(self.activity_state),
_f(self.ucaf_score),
_f(self.ucft_score),
bool(self.is_dark),
_i(self.gap_duration_min),
_f(self.spoofing_score),
_f(self.bd09_offset_m),
_i(self.speed_jump_count),
_i(self.cluster_id), # → fleet_cluster_id
bool(self.is_leader), # → fleet_is_leader
str(self.fleet_role),
_i(self.risk_score),
str(self.risk_level),
bool(self.is_transship_suspect), # → transship_suspect
str(self.transship_pair_mmsi),
_i(self.transship_duration_min),
json.dumps(safe_features),
str(self.gear_judgment) if self.gear_judgment else None,
str(self.gear_code) if self.gear_code else None,
)