AnalysisResult 에 lat/lon 필드 + to_db_tuple 반영 + upsert_results SQL 컬럼 추가. 분류 파이프라인(last_row) / 경량 분석(all_positions) 두 경로 모두 분석 시점의 선박 위치를 함께 기록해 프론트 미니맵에서 특이운항 판별 위치를 실제 항적 위에 표시할 수 있게 한다. 배포 후 첫 사이클 8173/8173 lat/lon non-null 확인.
136 lines
4.2 KiB
Python
136 lines
4.2 KiB
Python
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
|
|
@dataclass
|
|
class AnalysisResult:
|
|
"""vessel_analysis_results 테이블 29컬럼 매핑."""
|
|
|
|
mmsi: str
|
|
timestamp: datetime
|
|
|
|
# 분류 결과
|
|
vessel_type: str = 'UNKNOWN'
|
|
confidence: float = 0.0
|
|
fishing_pct: float = 0.0
|
|
cluster_id: int = -1
|
|
season: str = 'UNKNOWN'
|
|
|
|
# ALGO 01: 위치
|
|
zone: str = 'EEZ_OR_BEYOND'
|
|
dist_to_baseline_nm: float = 999.0
|
|
lat: Optional[float] = None
|
|
lon: Optional[float] = None
|
|
|
|
# ALGO 02: 활동 상태
|
|
activity_state: str = 'UNKNOWN'
|
|
ucaf_score: float = 0.0
|
|
ucft_score: float = 0.0
|
|
|
|
# ALGO 03: 다크 베셀
|
|
is_dark: bool = False
|
|
gap_duration_min: int = 0
|
|
|
|
# ALGO 04: GPS 스푸핑
|
|
spoofing_score: float = 0.0
|
|
bd09_offset_m: float = 0.0
|
|
speed_jump_count: int = 0
|
|
|
|
# ALGO 05+06: 선단
|
|
cluster_size: int = 0
|
|
is_leader: bool = False
|
|
fleet_role: str = 'NOISE'
|
|
|
|
# ALGO 07: 위험도
|
|
risk_score: int = 0
|
|
risk_level: str = 'LOW'
|
|
|
|
# ALGO 08: 환적 의심
|
|
is_transship_suspect: bool = False
|
|
transship_pair_mmsi: str = ''
|
|
transship_duration_min: int = 0
|
|
|
|
# 특징 벡터
|
|
features: dict = field(default_factory=dict)
|
|
|
|
# ALGO 09: 어구 위반 판정
|
|
gear_judgment: str = ''
|
|
|
|
# 등록 선박 어구 코드 (fleet_vessels.gear_code: C21=PT, C22=OT 등)
|
|
gear_code: Optional[str] = None
|
|
|
|
# 메타
|
|
analyzed_at: Optional[datetime] = None
|
|
|
|
def __post_init__(self):
|
|
if self.analyzed_at is None:
|
|
self.analyzed_at = datetime.now(timezone.utc)
|
|
|
|
def to_db_tuple(self) -> tuple:
|
|
import json
|
|
|
|
def _f(v: object) -> float:
|
|
"""numpy float → Python float 변환."""
|
|
return float(v) if v is not None else 0.0
|
|
|
|
def _i(v: object) -> int:
|
|
"""numpy int → Python int 변환."""
|
|
return int(v) if v is not None else 0
|
|
|
|
# features dict 내부 numpy 값도 변환 (재귀적 처리)
|
|
# int/float/bool/str/None/list/dict 모두 허용 (JSON 호환 타입만 유지)
|
|
def _sanitize(v):
|
|
if v is None or isinstance(v, (str, bool)):
|
|
return v
|
|
if isinstance(v, (int, float)):
|
|
return float(v) if isinstance(v, float) else int(v)
|
|
if isinstance(v, dict):
|
|
return {str(k): _sanitize(val) for k, val in v.items()}
|
|
if isinstance(v, (list, tuple)):
|
|
return [_sanitize(x) for x in v]
|
|
# numpy 스칼라 등은 float 변환 시도, 실패 시 str
|
|
try:
|
|
return float(v)
|
|
except (TypeError, ValueError):
|
|
return str(v)
|
|
|
|
safe_features = _sanitize(self.features) if self.features else {}
|
|
|
|
def _opt_f(v: object) -> Optional[float]:
|
|
"""Optional float (None 유지)."""
|
|
return float(v) if v is not None else None
|
|
|
|
return (
|
|
str(self.mmsi),
|
|
self.analyzed_at, # analyzed_at (PK 파티션키)
|
|
str(self.vessel_type),
|
|
_f(self.confidence),
|
|
_f(self.fishing_pct),
|
|
_i(self.cluster_id),
|
|
str(self.season),
|
|
_opt_f(self.lat),
|
|
_opt_f(self.lon),
|
|
str(self.zone), # → zone_code
|
|
_f(self.dist_to_baseline_nm),
|
|
str(self.activity_state),
|
|
_f(self.ucaf_score),
|
|
_f(self.ucft_score),
|
|
bool(self.is_dark),
|
|
_i(self.gap_duration_min),
|
|
_f(self.spoofing_score),
|
|
_f(self.bd09_offset_m),
|
|
_i(self.speed_jump_count),
|
|
_i(self.cluster_id), # → fleet_cluster_id
|
|
bool(self.is_leader), # → fleet_is_leader
|
|
str(self.fleet_role),
|
|
_i(self.risk_score),
|
|
str(self.risk_level),
|
|
bool(self.is_transship_suspect), # → transship_suspect
|
|
str(self.transship_pair_mmsi),
|
|
_i(self.transship_duration_min),
|
|
json.dumps(safe_features),
|
|
str(self.gear_judgment) if self.gear_judgment else None,
|
|
str(self.gear_code) if self.gear_code else None,
|
|
)
|