iran prediction 47개 Python 파일을 prediction/ 디렉토리로 복제: - algorithms/ 14개 분석 알고리즘 (어구추론, 다크베셀, 스푸핑, 환적, 위험도 등) - pipeline/ 7단계 분류 파이프라인 - cache/vessel_store (24h 슬라이딩 윈도우) - db/ 어댑터 (snpdb 원본조회, kcgdb 결과저장) - chat/ AI 채팅 (Ollama, 후순위) - data/ 정적 데이터 (기선, 특정어업수역 GeoJSON) config.py를 kcgaidb로 재구성 (DB명, 사용자, 비밀번호) DB 연결 검증 완료 (kcgaidb 37개 테이블 접근 확인) Makefile에 dev-prediction / dev-all 타겟 추가 CLAUDE.md에 prediction 섹션 추가 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1478 lines
59 KiB
Python
1478 lines
59 KiB
Python
"""어구 그룹 대표 모선 추론."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import math
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any, Optional
|
|
|
|
from algorithms.gear_correlation import _get_vessel_track
|
|
from algorithms.gear_parent_episode import (
|
|
build_episode_plan,
|
|
compute_prior_bonus_components,
|
|
group_to_episode_input,
|
|
insert_episode_snapshots,
|
|
load_active_episode_states,
|
|
load_episode_prior_stats,
|
|
load_label_prior_stats,
|
|
load_lineage_prior_stats,
|
|
sync_episode_states,
|
|
)
|
|
from algorithms.gear_name_rules import is_trackable_parent_name, normalize_parent_name
|
|
from algorithms.track_similarity import compute_track_similarity_v2, _resample_temporal, haversine_m
|
|
|
|
_KST = timezone(timedelta(hours=9))
|
|
|
|
|
|
def _to_epoch_ms(ts) -> int:
    """Convert a timestamp-like value to epoch milliseconds.

    tz-aware values convert directly; tz-naive values are interpreted as
    KST wall-clock time. Plain numeric values pass through via int().
    """
    if not hasattr(ts, 'timestamp'):
        return int(ts)
    if getattr(ts, 'tzinfo', None) is not None:
        return int(ts.timestamp() * 1000)
    # tz-naive: interpret as KST wall-clock
    import pandas as pd
    if isinstance(ts, pd.Timestamp):
        localized = ts.tz_localize(_KST)
    else:
        localized = ts.replace(tzinfo=_KST)
    return int(localized.timestamp() * 1000)
|
|
from config import qualified_table
|
|
|
|
logger = logging.getLogger(__name__)

# Schema-qualified table names, resolved once at import time via config.
FLEET_VESSELS = qualified_table('fleet_vessels')
GROUP_POLYGON_SNAPSHOTS = qualified_table('group_polygon_snapshots')
GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores')
GEAR_CORRELATION_RAW_METRICS = qualified_table('gear_correlation_raw_metrics')
CORRELATION_PARAM_MODELS = qualified_table('correlation_param_models')
GEAR_GROUP_PARENT_CANDIDATE_SNAPSHOTS = qualified_table('gear_group_parent_candidate_snapshots')
GEAR_GROUP_PARENT_RESOLUTION = qualified_table('gear_group_parent_resolution')
GEAR_PARENT_CANDIDATE_EXCLUSIONS = qualified_table('gear_parent_candidate_exclusions')
GEAR_PARENT_LABEL_SESSIONS = qualified_table('gear_parent_label_sessions')
GEAR_PARENT_LABEL_TRACKING_CYCLES = qualified_table('gear_parent_label_tracking_cycles')

# Resolution status codes written to the parent-resolution table.
_SHORT_NAME_STATUS = 'SKIPPED_SHORT_NAME'
_NO_CANDIDATE_STATUS = 'NO_CANDIDATE'
_MANUAL_CONFIRMED_STATUS = 'MANUAL_CONFIRMED'
_AUTO_PROMOTED_STATUS = 'AUTO_PROMOTED'
_REVIEW_REQUIRED_STATUS = 'REVIEW_REQUIRED'
_UNRESOLVED_STATUS = 'UNRESOLVED'
_DIRECT_PARENT_MATCH_STATUS = 'DIRECT_PARENT_MATCH'

# Candidate selection / auto-promotion tuning knobs.
_REJECT_COOLDOWN_HOURS = 24  # a rejected candidate stays suppressed this long
_MAX_CORRELATION_CANDIDATES = 5  # top-N correlation rows considered per group
_MIN_AUTO_PROMOTION_STABLE_CYCLES = 3  # cycles a top candidate must persist
_MIN_AUTO_PROMOTION_SCORE = 0.72
_MIN_AUTO_PROMOTION_MARGIN = 0.15  # minimum gap over the runner-up
_MIN_REVIEW_REQUIRED_SCORE = 0.60
_MIN_PREFIX_BONUS_SCORE = 0.50  # score floor before the China-MMSI bonus applies
_CHINA_MMSI_PREFIX_BONUS = 0.05
_CHINA_MMSI_PREFIXES = ('412', '413')  # Chinese-flag MMSI prefixes

# Saturation targets used by _support_factor for coverage weighting.
_TRACK_SUPPORT_POINT_TARGET = 12
_TRACK_SUPPORT_SPAN_TARGET_MINUTES = 90.0
_VISIT_SUPPORT_POINT_TARGET = 8
_VISIT_SUPPORT_SPAN_TARGET_MINUTES = 60.0
_ACTIVITY_SUPPORT_POINT_TARGET = 12
_ACTIVITY_SUPPORT_SPAN_TARGET_MINUTES = 90.0
_VISIT_ZONE_THRESHOLD_NM = 5.0  # distance from gear center counted as "in zone"
_RAW_SCORE_WINDOW_HOURS = 6  # lookback window for raw metric averaging / tracks
|
|
|
|
|
|
@dataclass
class RegistryVessel:
    """A vessel row loaded from the fleet registry table."""

    vessel_id: int  # registry primary key
    mmsi: str  # '' when the registry row has no MMSI
    name_cn: str  # Chinese name, '' when absent
    name_en: str  # English name, '' when absent
|
|
|
|
|
|
@dataclass
class CandidateScore:
    """One scored parent-vessel candidate for a gear group sub-cluster."""

    mmsi: str
    name: str
    vessel_id: Optional[int]  # fleet registry id when the MMSI matched, else None
    target_type: str  # set to 'VESSEL' by _build_candidate_scores
    candidate_source: str  # comma-joined origin tags (e.g. CORRELATION,REGISTRY_NAME)
    # Component scores feeding the weighted final score.
    base_corr_score: float
    name_match_score: float
    track_similarity_score: float
    visit_score_6h: float
    proximity_score_6h: float
    activity_sync_score_6h: float
    stability_score: float
    # Additive bonus terms.
    registry_bonus: float
    episode_prior_bonus: float
    lineage_prior_bonus: float
    label_prior_bonus: float
    final_score: float  # clamped to [0, 1]
    streak_count: int  # consecutive correlation observations
    model_id: int  # correlation parameter model used
    model_name: str
    evidence: dict[str, Any]  # JSON-serializable evidence payload
|
|
|
|
|
|
def _clamp(value: float, floor: float = 0.0, ceil: float = 1.0) -> float:
    """Clamp *value* into [floor, ceil] (default unit interval)."""
    upper_bounded = ceil if value > ceil else value
    return floor if upper_bounded < floor else upper_bounded
|
|
|
|
|
|
def _china_mmsi_prefix_bonus(mmsi: str, pre_bonus_score: float) -> float:
    """Return the China-MMSI prefix bonus, or 0.0 when it does not apply."""
    if pre_bonus_score < _MIN_PREFIX_BONUS_SCORE:
        return 0.0
    # str.startswith accepts a tuple of prefixes.
    if (mmsi or '').startswith(_CHINA_MMSI_PREFIXES):
        return _CHINA_MMSI_PREFIX_BONUS
    return 0.0
|
|
|
|
|
|
def _apply_final_score_bonus(mmsi: str, weighted_score: float) -> tuple[float, float, float]:
    """Apply the China-MMSI bonus on top of the clamped weighted score.

    Returns (pre_bonus_score, china_mmsi_bonus, final_score).
    """
    pre_bonus_score = _clamp(weighted_score)
    bonus = _china_mmsi_prefix_bonus(mmsi, pre_bonus_score)
    return pre_bonus_score, bonus, _clamp(weighted_score + bonus)
|
|
|
|
|
|
def _to_aware_utc(value: Any) -> Optional[datetime]:
    """Coerce *value* to a tz-aware UTC datetime.

    Naive datetimes are assumed to already be UTC. Non-datetime values are
    parsed via ISO-8601; returns None when parsing fails or value is None.
    """
    if value is None:
        return None
    if not isinstance(value, datetime):
        try:
            value = datetime.fromisoformat(str(value))
        except Exception:
            return None
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)
|
|
|
|
|
|
def _span_minutes(timestamps: list[datetime]) -> float:
    """Minutes between the first and last timestamp; 0.0 for fewer than 2."""
    if len(timestamps) >= 2:
        span = (timestamps[-1] - timestamps[0]).total_seconds() / 60.0
        return span if span > 0.0 else 0.0
    return 0.0
|
|
|
|
|
|
def _support_factor(point_count: int, span_minutes: float, point_target: int, span_target_minutes: float) -> float:
    """Geometric-mean support in [0, 1] from point count and time span.

    Each dimension saturates at 1.0 once its target is reached.
    """
    if point_count <= 0 or span_minutes <= 0:
        return 0.0
    point_ratio = min(1.0, point_count / max(point_target, 1))
    span_ratio = min(1.0, span_minutes / max(span_target_minutes, 1.0))
    return _clamp(math.sqrt(point_ratio * span_ratio))
|
|
|
|
|
|
def _haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two lat/lon points in nautical miles."""
    EARTH_RADIUS_NM = 3440.065
    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    half_dlat = math.radians(lat2 - lat1) / 2
    half_dlon = math.radians(lon2 - lon1) / 2
    a = (
        math.sin(half_dlat) ** 2
        + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(half_dlon) ** 2
    )
    central_angle = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return EARTH_RADIUS_NM * central_angle
|
|
|
|
|
|
def _build_track_coverage_metrics(
    center_track: list[dict[str, Any]],
    vessel_track: list[dict[str, Any]],
    gear_center_lat: float,
    gear_center_lon: float,
) -> dict[str, float | int]:
    """Quantify how much AIS evidence supports a candidate's coverage.

    Computes point counts and time spans for (a) the vessel track, (b) the
    portion of the vessel track overlapping the group-center track's time
    window, and (c) the overlapping points within _VISIT_ZONE_THRESHOLD_NM
    of the gear centroid. Each is folded into a [0, 1] coverage factor via
    _support_factor; the mean of the three is the overall coverageFactor.
    """
    vessel_timestamps = sorted(
        ts for ts in (_to_aware_utc(point.get('timestamp')) for point in vessel_track)
        if ts is not None
    )
    center_timestamps = sorted(
        ts for ts in (_to_aware_utc(point.get('timestamp')) for point in center_track)
        if ts is not None
    )

    track_point_count = len(vessel_track)
    track_span_minutes = _span_minutes(vessel_timestamps)
    center_point_count = len(center_track)
    center_span_minutes = _span_minutes(center_timestamps)

    # Restrict the vessel track to the center track's observed time window;
    # if either side lacks timestamps, the whole vessel track is used.
    overlap_points: list[dict[str, Any]] = vessel_track
    if vessel_timestamps and center_timestamps:
        overlap_start = center_timestamps[0]
        overlap_end = center_timestamps[-1]
        overlap_points = [
            point for point in vessel_track
            if (ts := _to_aware_utc(point.get('timestamp'))) is not None and overlap_start <= ts <= overlap_end
        ]
    overlap_timestamps = sorted(
        ts for ts in (_to_aware_utc(point.get('timestamp')) for point in overlap_points)
        if ts is not None
    )
    overlap_point_count = len(overlap_points)
    overlap_span_minutes = _span_minutes(overlap_timestamps)

    # Overlapping points close enough to the gear centroid to count as a visit.
    in_zone_points = [
        point for point in overlap_points
        if _haversine_nm(gear_center_lat, gear_center_lon, float(point['lat']), float(point['lon'])) < _VISIT_ZONE_THRESHOLD_NM
    ]
    in_zone_timestamps = sorted(
        ts for ts in (_to_aware_utc(point.get('timestamp')) for point in in_zone_points)
        if ts is not None
    )
    in_zone_point_count = len(in_zone_points)
    in_zone_span_minutes = _span_minutes(in_zone_timestamps)

    track_coverage_factor = _support_factor(
        track_point_count,
        track_span_minutes,
        _TRACK_SUPPORT_POINT_TARGET,
        _TRACK_SUPPORT_SPAN_TARGET_MINUTES,
    )
    # Visit and activity factors share in-zone evidence but saturate at
    # different targets.
    visit_coverage_factor = _support_factor(
        in_zone_point_count,
        in_zone_span_minutes,
        _VISIT_SUPPORT_POINT_TARGET,
        _VISIT_SUPPORT_SPAN_TARGET_MINUTES,
    )
    activity_coverage_factor = _support_factor(
        in_zone_point_count,
        in_zone_span_minutes,
        _ACTIVITY_SUPPORT_POINT_TARGET,
        _ACTIVITY_SUPPORT_SPAN_TARGET_MINUTES,
    )
    coverage_factor = round(
        (track_coverage_factor + visit_coverage_factor + activity_coverage_factor) / 3.0,
        4,
    )

    return {
        'trackPointCount': track_point_count,
        'trackSpanMinutes': round(track_span_minutes, 1),
        'centerPointCount': center_point_count,
        'centerSpanMinutes': round(center_span_minutes, 1),
        'overlapPointCount': overlap_point_count,
        'overlapSpanMinutes': round(overlap_span_minutes, 1),
        'inZonePointCount': in_zone_point_count,
        'inZoneSpanMinutes': round(in_zone_span_minutes, 1),
        'trackCoverageFactor': round(track_coverage_factor, 4),
        'visitCoverageFactor': round(visit_coverage_factor, 4),
        'activityCoverageFactor': round(activity_coverage_factor, 4),
        'coverageFactor': coverage_factor,
        'scoreWindowHours': _RAW_SCORE_WINDOW_HOURS,
    }
|
|
|
|
|
|
def _candidate_sources(candidate: Optional[CandidateScore]) -> set[str]:
    """Extract the candidate's evidence 'sources' list as a set of strings."""
    if candidate is None:
        return set()
    sources = candidate.evidence.get('sources')
    if not isinstance(sources, list):
        return set()
    return {str(entry) for entry in sources if entry}
|
|
|
|
|
|
def _top_candidate_stable_cycles(existing: Optional[dict[str, Any]], top_candidate: Optional[CandidateScore]) -> int:
    """Count consecutive cycles the same top candidate has persisted.

    0 when there is no top candidate; otherwise the previous streak + 1
    when the stored top candidate MMSI matches, else a fresh streak of 1.
    """
    if top_candidate is None:
        return 0
    if existing is None:
        return 1
    summary = existing.get('evidence_summary') or {}
    if summary.get('topCandidateMmsi') != top_candidate.mmsi:
        return 1
    prior_cycles = int(existing.get('stable_cycles') or 0)
    return max(prior_cycles + 1, 1)
|
|
|
|
|
|
def _status_reason(status: str) -> Optional[str]:
    """Human-readable reason for auto-assigned statuses; None for others."""
    reasons = {
        _SHORT_NAME_STATUS: '정규화 이름 길이 4 미만',
        _NO_CANDIDATE_STATUS: '후보를 생성하지 못함',
        _DIRECT_PARENT_MATCH_STATUS: '그룹 멤버에 직접 모선이 포함됨',
    }
    return reasons.get(status)
|
|
|
|
|
|
def _select_status(
    top_candidate: Optional[CandidateScore],
    margin: float,
    stable_cycles: int,
) -> tuple[str, str]:
    """Choose (resolution status, decision source) from the ranked top candidate.

    Auto-promotion requires a correlation-backed VESSEL candidate with a high
    final score, sufficient margin over the runner-up, and a stable streak.
    """
    if top_candidate is None:
        return _NO_CANDIDATE_STATUS, 'AUTO_NO_CANDIDATE'

    eligible_for_promotion = (
        top_candidate.target_type == 'VESSEL'
        and 'CORRELATION' in _candidate_sources(top_candidate)
        and top_candidate.final_score >= _MIN_AUTO_PROMOTION_SCORE
        and margin >= _MIN_AUTO_PROMOTION_MARGIN
        and stable_cycles >= _MIN_AUTO_PROMOTION_STABLE_CYCLES
    )
    if eligible_for_promotion:
        return _AUTO_PROMOTED_STATUS, 'AUTO_PROMOTION'

    if top_candidate.final_score >= _MIN_REVIEW_REQUIRED_SCORE:
        return _REVIEW_REQUIRED_STATUS, 'AUTO_REVIEW'

    return _UNRESOLVED_STATUS, 'AUTO_SCORE'
|
|
|
|
|
|
def _load_default_model(conn) -> tuple[int, str]:
    """Return (id, name) of the correlation parameter model to use.

    Picks the active model flagged is_default first, then the lowest id
    among active models. Falls back to (1, 'default') when none exist.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT id, name
            FROM {CORRELATION_PARAM_MODELS}
            WHERE is_active = TRUE
            ORDER BY is_default DESC, id ASC
            LIMIT 1
            """
        )
        row = cur.fetchone()
        if row is None:
            # No active model configured — fall back to a fixed default.
            return 1, 'default'
        return int(row[0]), row[1] or 'default'
    finally:
        cur.close()
|
|
|
|
|
|
def _load_registry(conn) -> tuple[dict[str, RegistryVessel], dict[str, list[RegistryVessel]]]:
    """Load the fleet registry into two lookup maps.

    Returns (by_mmsi, by_normalized_name): vessels keyed by MMSI, and lists
    of vessels keyed by the normalized form of their CN/EN names. A vessel
    may appear under both name keys; rows without an MMSI are only indexed
    by name.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT id, COALESCE(mmsi, ''), COALESCE(name_cn, ''), COALESCE(name_en, '')
            FROM {FLEET_VESSELS}
            """
        )
        by_mmsi: dict[str, RegistryVessel] = {}
        by_normalized_name: dict[str, list[RegistryVessel]] = {}
        for vessel_id, mmsi, name_cn, name_en in cur.fetchall():
            vessel = RegistryVessel(
                vessel_id=int(vessel_id),
                mmsi=mmsi or '',
                name_cn=name_cn or '',
                name_en=name_en or '',
            )
            if vessel.mmsi:
                by_mmsi[vessel.mmsi] = vessel
            # Index under both language names when they normalize to something.
            for raw_name in (vessel.name_cn, vessel.name_en):
                normalized = normalize_parent_name(raw_name)
                if normalized:
                    by_normalized_name.setdefault(normalized, []).append(vessel)
        return by_mmsi, by_normalized_name
    finally:
        cur.close()
|
|
|
|
|
|
def _json_to_dict(value: Any) -> dict[str, Any]:
    """Best-effort coercion of a JSON column value; {} on failure.

    Dicts pass through unchanged; strings/bytes are json-decoded (note the
    decoded value is returned as-is, matching the original behavior, even
    if it is not a dict).
    """
    if isinstance(value, dict):
        return value
    if value is None:
        return {}
    try:
        return json.loads(value)
    except Exception:
        return {}
|
|
|
|
|
|
def _load_existing_resolution(conn, group_keys: list[str]) -> dict[tuple[str, int], dict[str, Any]]:
    """Load persisted parent resolutions for the given group keys.

    Returns a dict keyed by (group_key, sub_cluster_id); numeric fields
    default to 0/0.0 and evidence_summary is decoded to a dict.
    """
    if not group_keys:
        return {}
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT group_key, sub_cluster_id, parent_name, normalized_parent_name,
                   status, selected_parent_mmsi, selected_parent_name, selected_vessel_id,
                   confidence, decision_source, top_score, second_score, score_margin,
                   stable_cycles, approved_by, approved_at, manual_comment,
                   rejected_candidate_mmsi, rejected_at, evidence_summary,
                   episode_id, continuity_source, continuity_score, prior_bonus_total
            FROM {GEAR_GROUP_PARENT_RESOLUTION}
            WHERE group_key = ANY(%s)
            """,
            (group_keys,),
        )
        result: dict[tuple[str, int], dict[str, Any]] = {}
        for row in cur.fetchall():
            key = (row[0], int(row[1]))
            # Column order here must mirror the SELECT list above.
            result[key] = {
                'parent_name': row[2],
                'normalized_parent_name': row[3],
                'status': row[4],
                'selected_parent_mmsi': row[5],
                'selected_parent_name': row[6],
                'selected_vessel_id': row[7],
                'confidence': row[8],
                'decision_source': row[9],
                'top_score': row[10] or 0.0,
                'second_score': row[11] or 0.0,
                'score_margin': row[12] or 0.0,
                'stable_cycles': row[13] or 0,
                'approved_by': row[14],
                'approved_at': row[15],
                'manual_comment': row[16],
                'rejected_candidate_mmsi': row[17],
                'rejected_at': row[18],
                'evidence_summary': _json_to_dict(row[19]),
                'episode_id': row[20],
                'continuity_source': row[21],
                'continuity_score': row[22] or 0.0,
                'prior_bonus_total': row[23] or 0.0,
            }
        return result
    finally:
        cur.close()
|
|
|
|
|
|
def _expire_label_sessions(conn) -> None:
    """Mark ACTIVE label sessions whose active_until has passed as EXPIRED.

    Side effect only; the caller is responsible for committing the
    transaction.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            UPDATE {GEAR_PARENT_LABEL_SESSIONS}
            SET status = 'EXPIRED',
                updated_at = NOW()
            WHERE status = 'ACTIVE'
              AND active_until <= NOW()
            """
        )
    finally:
        cur.close()
|
|
|
|
|
|
def _load_active_candidate_exclusions(conn, group_keys: list[str]) -> dict[str, Any]:
    """Load candidate MMSIs currently excluded from parent inference.

    Returns {'global': set of MMSIs excluded everywhere,
             'group': {(group_key, sub_cluster_id): set of MMSIs}}.
    Only rows that are not released and within their active window count.
    """
    result: dict[str, Any] = {
        'global': set(),
        'group': {},
    }
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT scope_type, group_key, sub_cluster_id, candidate_mmsi
            FROM {GEAR_PARENT_CANDIDATE_EXCLUSIONS}
            WHERE released_at IS NULL
              AND active_from <= NOW()
              AND (active_until IS NULL OR active_until > NOW())
              AND (scope_type = 'GLOBAL' OR group_key = ANY(%s))
            ORDER BY active_from DESC, id DESC
            """,
            (group_keys or [''],),  # ANY needs a non-empty array even with no groups
        )
        for scope_type, group_key, sub_cluster_id, candidate_mmsi in cur.fetchall():
            if scope_type == 'GLOBAL':
                result['global'].add(candidate_mmsi)
                continue
            # NOTE(review): assumes non-GLOBAL rows always carry a
            # sub_cluster_id — int(None) would raise; confirm schema.
            key = (group_key, int(sub_cluster_id))
            result['group'].setdefault(key, set()).add(candidate_mmsi)
        return result
    finally:
        cur.close()
|
|
|
|
|
|
def _load_active_label_sessions(conn, group_keys: list[str]) -> dict[tuple[str, int], dict[str, Any]]:
    """Load the most recent ACTIVE label session per (group_key, sub_cluster_id).

    DISTINCT ON with the active_from DESC ordering keeps only the latest
    session for each key. Returns {} when no group keys are given.
    """
    if not group_keys:
        return {}
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT DISTINCT ON (group_key, sub_cluster_id)
                   id, group_key, sub_cluster_id,
                   label_parent_mmsi, label_parent_name, label_parent_vessel_id,
                   duration_days, active_from, active_until, actor, comment, metadata
            FROM {GEAR_PARENT_LABEL_SESSIONS}
            WHERE status = 'ACTIVE'
              AND active_from <= NOW()
              AND active_until > NOW()
              AND group_key = ANY(%s)
            ORDER BY group_key, sub_cluster_id, active_from DESC, id DESC
            """,
            (group_keys,),
        )
        result: dict[tuple[str, int], dict[str, Any]] = {}
        for row in cur.fetchall():
            result[(row[1], int(row[2]))] = {
                'id': int(row[0]),
                'group_key': row[1],
                'sub_cluster_id': int(row[2]),
                'label_parent_mmsi': row[3],
                'label_parent_name': row[4],
                'label_parent_vessel_id': row[5],
                'duration_days': int(row[6]),
                'active_from': row[7],
                'active_until': row[8],
                'actor': row[9],
                'comment': row[10],
                'metadata': _json_to_dict(row[11]),
            }
        return result
    finally:
        cur.close()
|
|
|
|
|
|
def _load_correlation_scores(
    conn,
    default_model_id: int,
    group_keys: list[str],
) -> dict[tuple[str, int], list[dict[str, Any]]]:
    """Load VESSEL correlation scores for the given model and group keys.

    Returns lists keyed by (group_key, sub_cluster_id), ordered best-first
    (highest current_score, then most recently observed).
    """
    if not group_keys:
        return {}
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT group_key, sub_cluster_id, target_mmsi, target_type, COALESCE(target_name, ''),
                   current_score, streak_count
            FROM {GEAR_CORRELATION_SCORES}
            WHERE model_id = %s
              AND group_key = ANY(%s)
              AND target_type = 'VESSEL'
            ORDER BY group_key, sub_cluster_id, current_score DESC, last_observed_at DESC
            """,
            (default_model_id, group_keys),
        )
        result: dict[tuple[str, int], list[dict[str, Any]]] = {}
        for row in cur.fetchall():
            key = (row[0], int(row[1]))
            result.setdefault(key, []).append({
                'target_mmsi': row[2],
                'target_type': row[3],
                'target_name': row[4] or '',
                'current_score': float(row[5] or 0.0),
                'streak_count': int(row[6] or 0),
            })
        return result
    finally:
        cur.close()
|
|
|
|
|
|
def _load_raw_metric_averages(conn, group_keys: list[str]) -> dict[tuple[str, int, str], dict[str, float]]:
    """Average recent raw correlation metrics per (group, sub-cluster, MMSI).

    Averages visit/proximity/activity metrics observed within the last
    _RAW_SCORE_WINDOW_HOURS hours. The window was previously hard-coded as
    INTERVAL '6 hours' and could silently drift from the constant (which is
    also reported in coverage evidence as scoreWindowHours); it is now
    parameterized. Returns {} when no group keys are given.
    """
    if not group_keys:
        return {}
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT group_key,
                   sub_cluster_id,
                   target_mmsi,
                   AVG(COALESCE(visit_score, 0)) AS avg_visit,
                   AVG(COALESCE(proximity_ratio, 0)) AS avg_proximity,
                   AVG(COALESCE(activity_sync, 0)) AS avg_activity
            FROM {GEAR_CORRELATION_RAW_METRICS}
            WHERE group_key = ANY(%s)
              AND observed_at > NOW() - (%s * INTERVAL '1 hour')
            GROUP BY group_key, sub_cluster_id, target_mmsi
            """,
            (group_keys, _RAW_SCORE_WINDOW_HOURS),
        )
        result: dict[tuple[str, int, str], dict[str, float]] = {}
        for row in cur.fetchall():
            result[(row[0], int(row[1]), row[2])] = {
                'visit_score_6h': float(row[3] or 0.0),
                'proximity_score_6h': float(row[4] or 0.0),
                'activity_sync_score_6h': float(row[5] or 0.0),
            }
        return result
    finally:
        cur.close()
|
|
|
|
|
|
def _load_group_center_tracks(conn, group_keys: list[str]) -> dict[tuple[str, int], list[dict[str, Any]]]:
    """Load recent 1h-resolution group-center tracks for the given groups.

    Returns time-ascending lists of {timestamp, lat, lon} keyed by
    (group_key, sub_cluster_id). center_point is a PostGIS point:
    ST_Y is latitude, ST_X is longitude.
    NOTE(review): the 6-hour lookback mirrors _RAW_SCORE_WINDOW_HOURS but
    is hard-coded here — keep in sync if the constant changes.
    """
    if not group_keys:
        return {}
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT group_key, sub_cluster_id, snapshot_time, ST_Y(center_point) AS lat, ST_X(center_point) AS lon
            FROM {GROUP_POLYGON_SNAPSHOTS}
            WHERE group_key = ANY(%s)
              AND resolution = '1h'
              AND center_point IS NOT NULL
              AND snapshot_time > NOW() - INTERVAL '6 hours'
            ORDER BY group_key, sub_cluster_id, snapshot_time ASC
            """,
            (group_keys,),
        )
        result: dict[tuple[str, int], list[dict[str, Any]]] = {}
        for row in cur.fetchall():
            result.setdefault((row[0], int(row[1])), []).append({
                'timestamp': row[2],
                'lat': float(row[3]),
                'lon': float(row[4]),
            })
        return result
    finally:
        cur.close()
|
|
|
|
|
|
def _name_match_score(parent_name: str, candidate_name: str, registry: Optional[RegistryVessel]) -> float:
    """Score how well a candidate's names match the gear parent name.

    Tiers: 1.0 exact raw match, 0.8 normalized match, 0.5 prefix or
    substring overlap, 0.3 alphabetic-characters-only match, else 0.0.
    The best score over the candidate name and the registry CN/EN names
    (when a registry row is available) is returned.
    """
    def score_pair(left: str, right: str) -> float:
        norm_l = normalize_parent_name(left)
        norm_r = normalize_parent_name(right)
        if not norm_l or not norm_r:
            return 0.0
        raw_l = (left or '').strip().upper()
        raw_r = (right or '').strip().upper()
        if raw_l and raw_l == raw_r:
            return 1.0
        if norm_l == norm_r:
            return 0.8
        prefix_overlap = norm_l.startswith(norm_r) or norm_r.startswith(norm_l)
        substring_overlap = norm_l in norm_r or norm_r in norm_l
        if prefix_overlap or substring_overlap:
            return 0.5
        letters_l = ''.join(ch for ch in norm_l if ch.isalpha())
        letters_r = ''.join(ch for ch in norm_r if ch.isalpha())
        if letters_l and letters_l == letters_r:
            return 0.3
        return 0.0

    names_to_try = [candidate_name]
    if registry is not None:
        names_to_try.append(registry.name_cn)
        names_to_try.append(registry.name_en)
    return max(score_pair(parent_name, name) for name in names_to_try)
|
|
|
|
|
|
def _candidate_name(candidate_mmsi: str, all_positions: dict[str, dict], registry: Optional[RegistryVessel]) -> str:
    """Best display name: live position name → registry CN/EN name → MMSI."""
    live = all_positions.get(candidate_mmsi) or {}
    live_name = live.get('name', '')
    if live_name:
        return live_name
    if registry is None:
        return candidate_mmsi
    return registry.name_cn or registry.name_en or candidate_mmsi
|
|
|
|
|
|
def _direct_parent_member(group: dict[str, Any], all_positions: dict[str, dict]) -> Optional[dict[str, Any]]:
    """Find a parent vessel directly attached to the group, if any.

    A member flagged isParent (with an MMSI) wins; otherwise the group's
    parent_mmsi is wrapped with the best available display name. None when
    neither exists.
    """
    flagged = next(
        (m for m in (group.get('members') or []) if m.get('isParent') and m.get('mmsi')),
        None,
    )
    if flagged is not None:
        return flagged

    parent_mmsi = group.get('parent_mmsi')
    if not parent_mmsi:
        return None

    position = all_positions.get(parent_mmsi) or {}
    display_name = position.get('name') or group.get('parent_name') or parent_mmsi
    return {
        'mmsi': parent_mmsi,
        'name': display_name,
    }
|
|
|
|
|
|
def _direct_parent_stable_cycles(existing: Optional[dict[str, Any]], direct_parent_mmsi: str) -> int:
    """Count consecutive cycles the same direct parent has been observed.

    Falls back to evidence-summary MMSIs when no explicit selection was
    stored. Any mismatch (or missing history) restarts the streak at 1.
    """
    if existing is None or not direct_parent_mmsi:
        return 1

    remembered = existing.get('selected_parent_mmsi')
    if not remembered:
        summary = existing.get('evidence_summary') or {}
        remembered = summary.get('directParentMmsi') or summary.get('topCandidateMmsi')

    if remembered != direct_parent_mmsi:
        return 1
    return max(int(existing.get('stable_cycles') or 0) + 1, 1)
|
|
|
|
|
|
def _build_candidate_scores(
    vessel_store,
    observed_at: datetime,
    group: dict[str, Any],
    episode_assignment,
    default_model_id: int,
    default_model_name: str,
    score_rows: list[dict[str, Any]],
    raw_metrics: dict[tuple[str, int, str], dict[str, float]],
    center_track: list[dict[str, Any]],
    all_positions: dict[str, dict],
    registry_by_mmsi: dict[str, RegistryVessel],
    registry_by_name: dict[str, list[RegistryVessel]],
    existing: Optional[dict[str, Any]],
    excluded_candidate_mmsis: set[str],
    episode_prior_stats: dict[tuple[str, str], dict[str, Any]],
    lineage_prior_stats: dict[tuple[str, str], dict[str, Any]],
    label_prior_stats: dict[tuple[str, str], dict[str, Any]],
) -> list[CandidateScore]:
    """Build and rank parent-vessel candidates for one gear group sub-cluster.

    Gathers candidates from correlation scores, registry name matches and
    the previously selected parent; drops recently rejected and explicitly
    excluded MMSIs; scores each candidate as a weighted sum of correlation,
    name, track-similarity, visit, proximity, activity and stability
    components scaled by coverage factors; then applies the China-MMSI
    bonus and episode/lineage/label prior bonuses. Returns candidates
    sorted best-first.

    `episode_assignment` is expected to expose episode_id,
    continuity_source and continuity_score attributes.
    """
    group_key = group['parent_name']
    sub_cluster_id = int(group.get('sub_cluster_id', 0))
    normalized_parent_name = normalize_parent_name(group_key)
    members = group.get('members') or []
    # Gear centroid = mean of member positions ((0, 0) when the group is empty).
    if members:
        gear_center_lat = sum(float(member['lat']) for member in members) / len(members)
        gear_center_lon = sum(float(member['lon']) for member in members) / len(members)
    else:
        gear_center_lat = 0.0
        gear_center_lon = 0.0

    candidates: dict[str, dict[str, Any]] = {}
    score_lookup = {row['target_mmsi']: row for row in score_rows}
    # NOTE(review): center_track_latlon appears unused below — candidate
    # for removal.
    center_track_latlon = [
        (float(point['lat']), float(point['lon']))
        for point in center_track
        if point.get('lat') is not None and point.get('lon') is not None
    ]
    # v2: time-aligned comparison points (ts = epoch_ms)
    center_track_temporal = [
        {'lat': float(point['lat']), 'lon': float(point['lon']),
         'ts': _to_epoch_ms(point['timestamp'])}
        for point in center_track
        if point.get('lat') is not None and point.get('lon') is not None and point.get('timestamp') is not None
    ]

    # Candidate sources: top-N correlation targets ...
    for row in score_rows[:_MAX_CORRELATION_CANDIDATES]:
        candidates.setdefault(row['target_mmsi'], {'sources': set()})['sources'].add('CORRELATION')

    # ... registry vessels whose normalized name matches the group name ...
    for vessel in registry_by_name.get(normalized_parent_name, []):
        if vessel.mmsi:
            candidates.setdefault(vessel.mmsi, {'sources': set()})['sources'].add('REGISTRY_NAME')

    # ... and the previously selected parent, but only within the same episode.
    if existing is not None and existing.get('episode_id') == episode_assignment.episode_id:
        current_candidate = existing.get('selected_parent_mmsi') or existing.get('evidence_summary', {}).get('topCandidateMmsi')
        if current_candidate:
            candidates.setdefault(current_candidate, {'sources': set()})['sources'].add('PREVIOUS_SELECTION')

    # Drop a candidate that was manually rejected within the cooldown window.
    if existing is not None:
        rejected_mmsi = existing.get('rejected_candidate_mmsi')
        rejected_at = existing.get('rejected_at')
        if rejected_mmsi and rejected_at is not None:
            cutoff = datetime.now(timezone.utc) - timedelta(hours=_REJECT_COOLDOWN_HOURS)
            if rejected_at >= cutoff and rejected_mmsi in candidates:
                candidates.pop(rejected_mmsi, None)

    # Apply explicit (global/group-scoped) exclusions.
    for excluded_mmsi in excluded_candidate_mmsis:
        candidates.pop(excluded_mmsi, None)

    scored: list[CandidateScore] = []
    for candidate_mmsi, meta in candidates.items():
        registry = registry_by_mmsi.get(candidate_mmsi)
        score_row = score_lookup.get(candidate_mmsi, {})
        raw = raw_metrics.get((group_key, sub_cluster_id, candidate_mmsi), {})
        vessel_track = _get_vessel_track(vessel_store, candidate_mmsi, hours=6)
        raw_track_similarity = 0.0
        vessel_track_temporal: list[dict] = []
        # Track similarity needs both tracks with usable timestamps.
        if center_track_temporal and vessel_track:
            vessel_track_temporal = [
                {'lat': p['lat'], 'lon': p['lon'], 'cog': p.get('cog'),
                 'ts': _to_epoch_ms(p['timestamp'])}
                for p in vessel_track if p.get('lat') is not None and p.get('lon') is not None
            ]
            raw_track_similarity = compute_track_similarity_v2(
                center_track_temporal, vessel_track_temporal,
            )

        base_corr_score = float(score_row.get('current_score', 0.0) or 0.0)
        streak_count = int(score_row.get('streak_count', 0) or 0)
        # Stability saturates at a streak of 18 consecutive observations.
        stability_score = _clamp(streak_count / 18.0)
        candidate_name = _candidate_name(candidate_mmsi, all_positions, registry)
        name_match_score = _name_match_score(group_key, candidate_name, registry)
        registry_bonus = 0.05 if registry is not None else 0.0
        raw_visit_score = float(raw.get('visit_score_6h', 0.0) or 0.0)
        raw_proximity_score = float(raw.get('proximity_score_6h', 0.0) or 0.0)
        raw_activity_score = float(raw.get('activity_sync_score_6h', 0.0) or 0.0)
        coverage_metrics = _build_track_coverage_metrics(
            center_track=center_track,
            vessel_track=vessel_track,
            gear_center_lat=gear_center_lat,
            gear_center_lon=gear_center_lon,
        )
        track_coverage_factor = float(coverage_metrics['trackCoverageFactor'])
        visit_coverage_factor = float(coverage_metrics['visitCoverageFactor'])
        activity_coverage_factor = float(coverage_metrics['activityCoverageFactor'])
        # Scale raw evidence by how much data actually supports it.
        track_similarity = _clamp(raw_track_similarity * track_coverage_factor)
        visit_score = _clamp(raw_visit_score * visit_coverage_factor)
        activity_score = _clamp(raw_activity_score * activity_coverage_factor)

        # proximity: tiered distance score over time-interpolated center points
        proximity_score = 0.0
        if center_track_temporal and vessel_track:
            _NM_TO_M = 1852.0
            slots_c = _resample_temporal(center_track_temporal)
            slots_v = _resample_temporal(vessel_track_temporal)
            map_c = {s['ts']: s for s in slots_c if s is not None}
            map_v = {s['ts']: s for s in slots_v if s is not None}
            common_ts = sorted(set(map_c.keys()) & set(map_v.keys()))
            # Require at least 3 shared time slots for a meaningful signal.
            if len(common_ts) >= 3:
                prox_sum = 0.0
                for ts in common_ts:
                    sc, sv = map_c[ts], map_v[ts]
                    d_m = haversine_m(sc['lat'], sc['lon'], sv['lat'], sv['lon'])
                    d_nm = d_m / _NM_TO_M
                    # Tiered credit: full inside 2.5 NM, partial to 10 NM.
                    if d_nm < 2.5:
                        prox_sum += 1.0
                    elif d_nm < 5.0:
                        prox_sum += 0.5
                    elif d_nm < 10.0:
                        prox_sum += 0.15
                proximity_score = _clamp(prox_sum / len(common_ts) * track_coverage_factor)

        # Weighted blend of all components (weights sum to 1.0 before bonuses).
        weighted_score = (
            0.35 * base_corr_score
            + 0.15 * name_match_score
            + 0.15 * track_similarity
            + 0.10 * visit_score
            + 0.10 * proximity_score
            + 0.05 * activity_score
            + 0.10 * stability_score
            + registry_bonus
        )
        pre_bonus_score, china_mmsi_bonus, final_score = _apply_final_score_bonus(
            candidate_mmsi,
            weighted_score,
        )
        # Episode/lineage/label priors from historical resolutions.
        prior_bonus = compute_prior_bonus_components(
            observed_at=observed_at,
            normalized_parent_name=normalized_parent_name,
            episode_id=episode_assignment.episode_id,
            candidate_mmsi=candidate_mmsi,
            episode_prior_stats=episode_prior_stats,
            lineage_prior_stats=lineage_prior_stats,
            label_prior_stats=label_prior_stats,
        )
        final_score = _clamp(final_score + prior_bonus['priorBonusTotal'])

        # JSON-serializable evidence payload persisted with the snapshot.
        evidence = {
            'normalizedParentName': normalized_parent_name,
            'episodeId': episode_assignment.episode_id,
            'continuitySource': episode_assignment.continuity_source,
            'continuityScore': round(float(episode_assignment.continuity_score or 0.0), 6),
            'sources': sorted(meta['sources']),
            'trackAvailable': bool(vessel_track),
            'registryMatched': registry is not None,
            'coverage': coverage_metrics,
            'evidenceConfidence': coverage_metrics['coverageFactor'],
            'scoreBreakdown': {
                'baseCorrScore': round(base_corr_score, 4),
                'nameMatchScore': round(name_match_score, 4),
                'trackSimilarityScore': round(track_similarity, 4),
                'visitScore6h': round(visit_score, 4),
                'proximityScore6h': round(proximity_score, 4),
                'activitySyncScore6h': round(activity_score, 4),
                'stabilityScore': round(stability_score, 4),
                'registryBonus': round(registry_bonus, 4),
                'preBonusScore': round(pre_bonus_score, 4),
                'chinaMmsiBonus': round(china_mmsi_bonus, 4),
                'episodePriorBonus': round(prior_bonus['episodePriorBonus'], 4),
                'lineagePriorBonus': round(prior_bonus['lineagePriorBonus'], 4),
                'labelPriorBonus': round(prior_bonus['labelPriorBonus'], 4),
                'priorBonusTotal': round(prior_bonus['priorBonusTotal'], 4),
            },
            # Pre-coverage-scaling values, kept for diagnostics.
            'scoreBreakdownRaw': {
                'trackSimilarityScore': round(raw_track_similarity, 4),
                'visitScore6h': round(raw_visit_score, 4),
                'proximityScore6h': round(raw_proximity_score, 4),
                'activitySyncScore6h': round(raw_activity_score, 4),
            },
            'chinaMmsiBonusApplied': china_mmsi_bonus > 0.0,
        }
        scored.append(CandidateScore(
            mmsi=candidate_mmsi,
            name=candidate_name,
            vessel_id=registry.vessel_id if registry is not None else None,
            target_type='VESSEL',
            candidate_source=','.join(sorted(meta['sources'])),
            base_corr_score=round(base_corr_score, 6),
            name_match_score=round(name_match_score, 6),
            track_similarity_score=round(track_similarity, 6),
            visit_score_6h=round(visit_score, 6),
            proximity_score_6h=round(proximity_score, 6),
            activity_sync_score_6h=round(activity_score, 6),
            stability_score=round(stability_score, 6),
            registry_bonus=round(registry_bonus, 6),
            episode_prior_bonus=round(prior_bonus['episodePriorBonus'], 6),
            lineage_prior_bonus=round(prior_bonus['lineagePriorBonus'], 6),
            label_prior_bonus=round(prior_bonus['labelPriorBonus'], 6),
            final_score=round(final_score, 6),
            streak_count=streak_count,
            model_id=default_model_id,
            model_name=default_model_name,
            evidence=evidence,
        ))

    # Best-first ordering; secondary keys make ties deterministic.
    scored.sort(
        key=lambda item: (
            item.final_score,
            item.base_corr_score,
            item.stability_score,
            item.name_match_score,
            item.mmsi,
        ),
        reverse=True,
    )
    return scored
|
|
|
|
|
|
def _insert_candidate_snapshots(conn, observed_at: datetime, rows: list[tuple]) -> int:
|
|
if not rows:
|
|
return 0
|
|
cur = conn.cursor()
|
|
try:
|
|
from psycopg2.extras import execute_values
|
|
execute_values(
|
|
cur,
|
|
f"""
|
|
INSERT INTO {GEAR_GROUP_PARENT_CANDIDATE_SNAPSHOTS} (
|
|
observed_at, group_key, sub_cluster_id, parent_name, normalized_parent_name, episode_id, candidate_mmsi,
|
|
candidate_name, candidate_vessel_id, rank, candidate_source,
|
|
model_id, model_name, base_corr_score, name_match_score,
|
|
track_similarity_score, visit_score_6h, proximity_score_6h,
|
|
activity_sync_score_6h, stability_score, registry_bonus,
|
|
episode_prior_bonus, lineage_prior_bonus, label_prior_bonus,
|
|
final_score, margin_from_top, evidence
|
|
) VALUES %s
|
|
""",
|
|
rows,
|
|
page_size=200,
|
|
)
|
|
return len(rows)
|
|
finally:
|
|
cur.close()
|
|
|
|
|
|
def _insert_label_tracking_rows(conn, rows: list[tuple]) -> int:
|
|
if not rows:
|
|
return 0
|
|
cur = conn.cursor()
|
|
try:
|
|
from psycopg2.extras import execute_values
|
|
execute_values(
|
|
cur,
|
|
f"""
|
|
INSERT INTO {GEAR_PARENT_LABEL_TRACKING_CYCLES} (
|
|
label_session_id, observed_at, candidate_snapshot_observed_at, auto_status,
|
|
top_candidate_mmsi, top_candidate_name, top_candidate_score,
|
|
top_candidate_margin, candidate_count, labeled_candidate_present,
|
|
labeled_candidate_rank, labeled_candidate_score,
|
|
labeled_candidate_pre_bonus_score, labeled_candidate_margin_from_top,
|
|
matched_top1, matched_top3, evidence_summary
|
|
) VALUES %s
|
|
ON CONFLICT (label_session_id, observed_at) DO NOTHING
|
|
""",
|
|
rows,
|
|
page_size=200,
|
|
)
|
|
return len(rows)
|
|
finally:
|
|
cur.close()
|
|
|
|
|
|
def _upsert_resolution(conn, row: tuple) -> None:
    """Insert or overwrite one (group_key, sub_cluster_id) resolution row.

    ``row`` is a 27-element tuple following the column order of the INSERT
    below; the last-but-one element is serialized JSON (cast via ``::jsonb``).
    On conflict every mutable column is replaced with the incoming value.
    """
    upsert_sql = f"""
        INSERT INTO {GEAR_GROUP_PARENT_RESOLUTION} (
            group_key, sub_cluster_id, parent_name, normalized_parent_name,
            episode_id, continuity_source, continuity_score, prior_bonus_total,
            status, selected_parent_mmsi, selected_parent_name, selected_vessel_id,
            confidence, decision_source, top_score, second_score, score_margin,
            stable_cycles, last_evaluated_at, last_promoted_at, approved_by,
            approved_at, manual_comment, rejected_candidate_mmsi, rejected_at,
            evidence_summary, updated_at
        ) VALUES (
            %s, %s, %s, %s,
            %s, %s, %s, %s,
            %s, %s, %s, %s,
            %s, %s, %s, %s, %s,
            %s, %s, %s, %s,
            %s, %s, %s, %s,
            %s::jsonb, %s
        )
        ON CONFLICT (group_key, sub_cluster_id)
        DO UPDATE SET
            parent_name = EXCLUDED.parent_name,
            normalized_parent_name = EXCLUDED.normalized_parent_name,
            episode_id = EXCLUDED.episode_id,
            continuity_source = EXCLUDED.continuity_source,
            continuity_score = EXCLUDED.continuity_score,
            prior_bonus_total = EXCLUDED.prior_bonus_total,
            status = EXCLUDED.status,
            selected_parent_mmsi = EXCLUDED.selected_parent_mmsi,
            selected_parent_name = EXCLUDED.selected_parent_name,
            selected_vessel_id = EXCLUDED.selected_vessel_id,
            confidence = EXCLUDED.confidence,
            decision_source = EXCLUDED.decision_source,
            top_score = EXCLUDED.top_score,
            second_score = EXCLUDED.second_score,
            score_margin = EXCLUDED.score_margin,
            stable_cycles = EXCLUDED.stable_cycles,
            last_evaluated_at = EXCLUDED.last_evaluated_at,
            last_promoted_at = EXCLUDED.last_promoted_at,
            approved_by = EXCLUDED.approved_by,
            approved_at = EXCLUDED.approved_at,
            manual_comment = EXCLUDED.manual_comment,
            rejected_candidate_mmsi = EXCLUDED.rejected_candidate_mmsi,
            rejected_at = EXCLUDED.rejected_at,
            evidence_summary = EXCLUDED.evidence_summary,
            updated_at = EXCLUDED.updated_at
    """
    cursor = conn.cursor()
    try:
        cursor.execute(upsert_sql, row)
    finally:
        cursor.close()
|
|
|
|
|
|
def _label_tracking_row(
    observed_at: datetime,
    label_session: dict[str, Any],
    auto_status: str,
    top_candidate: Optional[CandidateScore],
    margin: float,
    candidates: list[CandidateScore],
) -> tuple:
    """Build one tracking-cycle tuple comparing the labeled parent to this cycle's ranking.

    Locates the human-labeled parent MMSI inside the ranked ``candidates``
    (if present) and records its rank, score, pre-bonus score and distance
    from the current top candidate, plus top-1/top-3 match flags.
    """
    target_mmsi = label_session['label_parent_mmsi']

    # Single pass: find the labeled candidate and its 1-based rank together.
    labeled_candidate: Optional[CandidateScore] = None
    labeled_rank = None
    labeled_pre_bonus_score = None
    labeled_margin_from_top = None
    for rank, candidate in enumerate(candidates, start=1):
        if candidate.mmsi == target_mmsi:
            labeled_candidate = candidate
            labeled_rank = rank
            break

    if labeled_candidate is not None:
        breakdown = labeled_candidate.evidence.get('scoreBreakdown')
        if isinstance(breakdown, dict):
            labeled_pre_bonus_score = breakdown.get('preBonusScore')
        delta = (top_candidate.final_score - labeled_candidate.final_score) if top_candidate else 0.0
        labeled_margin_from_top = round(delta, 6)

    evidence_summary = {
        'labelParentMmsi': target_mmsi,
        'labelParentName': label_session.get('label_parent_name'),
        'topCandidateSources': sorted(_candidate_sources(top_candidate)),
        'candidateMmsis': [candidate.mmsi for candidate in candidates[:5]],
    }

    # Column order must match _insert_label_tracking_rows.
    return (
        label_session['id'],
        observed_at,
        observed_at,
        auto_status,
        top_candidate.mmsi if top_candidate else None,
        top_candidate.name if top_candidate else None,
        top_candidate.final_score if top_candidate else None,
        margin if top_candidate else 0.0,
        len(candidates),
        labeled_candidate is not None,
        labeled_rank,
        labeled_candidate.final_score if labeled_candidate else None,
        labeled_pre_bonus_score,
        labeled_margin_from_top,
        top_candidate is not None and target_mmsi == top_candidate.mmsi,
        labeled_rank is not None and labeled_rank <= 3,
        json.dumps(evidence_summary, ensure_ascii=False),
    )
|
|
|
|
|
|
def run_gear_parent_inference(vessel_store, gear_groups: list[dict], conn) -> dict[str, int]:
    """Run representative parent-vessel inference for unresolved gear groups.

    For every gear group carrying a ``parent_name``:
      1. assign/extend a parent episode (lineage continuity across cycles),
      2. resolve the group directly when a fleet member already matches the
         parent; otherwise score candidate vessels and auto-promote or flag
         for review based on score margin and stability cycles,
      3. persist candidate snapshots, episode snapshots, resolution upserts
         and label-tracking rows, then commit once at the end.

    Args:
        vessel_store: Provides ``get_all_latest_positions()``.
        gear_groups: Gear group dicts; only those with a truthy
            ``parent_name`` are processed.
        conn: Open DB connection; committed by this function.

    Returns:
        Counter dict with keys: groups, candidates, promoted,
        review_required, skipped, no_candidate, direct_matched,
        episode_snapshots, label_tracking.
    """
    observed_at = datetime.now(timezone.utc)
    active_groups = [group for group in gear_groups if group.get('parent_name')]
    if not active_groups:
        # Fix: keep the empty-result shape identical to the populated one
        # ('label_tracking' was previously missing from this early return).
        return {
            'groups': 0, 'candidates': 0, 'promoted': 0, 'review_required': 0,
            'skipped': 0, 'no_candidate': 0, 'direct_matched': 0,
            'episode_snapshots': 0, 'label_tracking': 0,
        }

    # --- Episode continuity and prior statistics --------------------------
    group_keys = sorted({group['parent_name'] for group in active_groups})
    episode_inputs = [
        group_to_episode_input(group, normalize_parent_name(group['parent_name']))
        for group in active_groups
    ]
    lineage_keys = sorted({item.normalized_parent_name for item in episode_inputs if item.normalized_parent_name})
    previous_episodes = load_active_episode_states(conn, lineage_keys)
    episode_plan = build_episode_plan(episode_inputs, previous_episodes)
    episode_prior_stats = load_episode_prior_stats(conn, [assignment.episode_id for assignment in episode_plan.assignments.values()])
    lineage_prior_stats = load_lineage_prior_stats(conn, lineage_keys)
    label_prior_stats = load_label_prior_stats(conn, lineage_keys)
    registry_by_mmsi, registry_by_name = _load_registry(conn)
    _expire_label_sessions(conn)
    existing_resolution = _load_existing_resolution(conn, group_keys)
    all_positions = vessel_store.get_all_latest_positions()

    # Partition in a single pass (previously _direct_parent_member was
    # evaluated up to three times per group), carrying the matched member
    # along so the direct loop does not have to look it up again.
    direct_parent_groups: list[tuple[dict, dict]] = []
    unresolved_groups: list[dict] = []
    for group in active_groups:
        member = _direct_parent_member(group, all_positions)
        if member is not None:
            direct_parent_groups.append((group, member))
        else:
            unresolved_groups.append(group)

    # --- Supporting data for candidate scoring ----------------------------
    default_model_id, default_model_name = _load_default_model(conn)
    correlation_scores = _load_correlation_scores(conn, default_model_id, group_keys)
    raw_metric_averages = _load_raw_metric_averages(conn, group_keys)
    center_tracks = _load_group_center_tracks(conn, group_keys)
    active_exclusions = _load_active_candidate_exclusions(conn, group_keys)
    active_label_sessions = _load_active_label_sessions(conn, group_keys)

    snapshot_rows: list[tuple] = []
    label_tracking_rows: list[tuple] = []
    episode_snapshot_payloads: dict[tuple[str, int], dict[str, Any]] = {}
    promoted = 0
    review_required = 0
    skipped = 0
    no_candidate = 0
    direct_matched = 0

    # --- Pass 1: groups whose parent vessel is directly present ------------
    for group, direct_parent in direct_parent_groups:
        group_key = group['parent_name']
        sub_cluster_id = int(group.get('sub_cluster_id', 0))
        key = (group_key, sub_cluster_id)
        episode_assignment = episode_plan.assignments.get(key)
        if episode_assignment is None:
            continue
        existing = existing_resolution.get(key)
        normalized_parent_name = normalize_parent_name(group_key)
        direct_parent_mmsi = str(direct_parent.get('mmsi') or '')
        direct_parent_name = str(direct_parent.get('name') or group_key or direct_parent_mmsi)
        stable_cycles = _direct_parent_stable_cycles(existing, direct_parent_mmsi)
        status_reason = _status_reason(_DIRECT_PARENT_MATCH_STATUS)
        evidence_summary = {
            'episodeId': episode_assignment.episode_id,
            'continuitySource': episode_assignment.continuity_source,
            'continuityScore': episode_assignment.continuity_score,
            'mergedFromEpisodeIds': episode_assignment.merged_from_episode_ids,
            'splitFromEpisodeId': episode_assignment.split_from_episode_id,
            'normalizedParentName': normalized_parent_name,
            'candidateCount': 0,
            'directParentMmsi': direct_parent_mmsi,
            'directParentName': direct_parent_name,
            'statusReason': status_reason,
            'trackable': is_trackable_parent_name(group_key),
        }

        status = _DIRECT_PARENT_MATCH_STATUS
        decision_source = 'DIRECT_PARENT_MATCH'
        selected_parent_mmsi = direct_parent_mmsi
        selected_parent_name = direct_parent_name
        selected_vessel_id = registry_by_mmsi.get(direct_parent_mmsi).vessel_id if direct_parent_mmsi in registry_by_mmsi else None
        confidence = 1.0
        last_promoted_at = observed_at

        # A manual confirmation always wins over the automatic direct match.
        if existing is not None and existing.get('status') == _MANUAL_CONFIRMED_STATUS:
            status = _MANUAL_CONFIRMED_STATUS
            decision_source = existing.get('decision_source') or 'MANUAL'
            selected_parent_mmsi = existing.get('selected_parent_mmsi') or selected_parent_mmsi
            selected_parent_name = existing.get('selected_parent_name') or selected_parent_name
            selected_vessel_id = existing.get('selected_vessel_id') if existing.get('selected_vessel_id') is not None else selected_vessel_id
            confidence = existing.get('confidence') or confidence
            last_promoted_at = existing.get('approved_at') or last_promoted_at
            evidence_summary['statusReason'] = existing.get('evidence_summary', {}).get('statusReason') or status_reason

        _upsert_resolution(
            conn,
            (
                group_key,
                sub_cluster_id,
                group_key,
                normalized_parent_name,
                episode_assignment.episode_id,
                episode_assignment.continuity_source,
                episode_assignment.continuity_score,
                0.0,
                status,
                selected_parent_mmsi,
                selected_parent_name,
                selected_vessel_id,
                confidence,
                decision_source,
                confidence or 0.0,
                0.0,
                confidence or 0.0,
                stable_cycles,
                observed_at,
                last_promoted_at,
                (existing or {}).get('approved_by'),
                (existing or {}).get('approved_at'),
                (existing or {}).get('manual_comment'),
                (existing or {}).get('rejected_candidate_mmsi'),
                (existing or {}).get('rejected_at'),
                json.dumps(evidence_summary, ensure_ascii=False),
                observed_at,
            ),
        )
        episode_snapshot_payloads[key] = {
            'parentEpisodeIds': episode_assignment.merged_from_episode_ids,
            'topCandidateMmsi': selected_parent_mmsi,
            'topCandidateScore': confidence or 1.0,
            'resolutionStatus': status,
            'metadata': {
                'splitFromEpisodeId': episode_assignment.split_from_episode_id,
                'directParentMmsi': direct_parent_mmsi,
            },
        }
        direct_matched += 1

    # --- Pass 2: groups resolved through candidate scoring ----------------
    for group in unresolved_groups:
        group_key = group['parent_name']
        sub_cluster_id = int(group.get('sub_cluster_id', 0))
        key = (group_key, sub_cluster_id)
        episode_assignment = episode_plan.assignments.get(key)
        if episode_assignment is None:
            continue
        existing = existing_resolution.get(key)
        normalized_parent_name = normalize_parent_name(group_key)
        excluded_candidate_mmsis = set(active_exclusions['global'])
        excluded_candidate_mmsis.update(active_exclusions['group'].get(key, set()))
        active_label_session = active_label_sessions.get(key)

        # Untrackable (e.g. too-short) parent names are skipped outright
        # unless an operator has manually confirmed the group.
        if not is_trackable_parent_name(group_key) and (existing or {}).get('status') != _MANUAL_CONFIRMED_STATUS:
            skipped += 1
            status_reason = _status_reason(_SHORT_NAME_STATUS)
            evidence_summary = {
                'episodeId': episode_assignment.episode_id,
                'continuitySource': episode_assignment.continuity_source,
                'continuityScore': episode_assignment.continuity_score,
                'mergedFromEpisodeIds': episode_assignment.merged_from_episode_ids,
                'splitFromEpisodeId': episode_assignment.split_from_episode_id,
                'skipReason': status_reason,
                'statusReason': status_reason,
                'normalizedParentName': normalized_parent_name,
            }
            _upsert_resolution(
                conn,
                (
                    group_key,
                    sub_cluster_id,
                    group_key,
                    normalized_parent_name,
                    episode_assignment.episode_id,
                    episode_assignment.continuity_source,
                    episode_assignment.continuity_score,
                    0.0,
                    _SHORT_NAME_STATUS,
                    None,
                    None,
                    None,
                    None,
                    'AUTO_SKIP',
                    0.0,
                    0.0,
                    0.0,
                    0,
                    observed_at,
                    None,
                    None,
                    None,
                    (existing or {}).get('manual_comment'),
                    (existing or {}).get('rejected_candidate_mmsi'),
                    (existing or {}).get('rejected_at'),
                    json.dumps(evidence_summary, ensure_ascii=False),
                    observed_at,
                ),
            )
            episode_snapshot_payloads[key] = {
                'parentEpisodeIds': episode_assignment.merged_from_episode_ids,
                'topCandidateMmsi': None,
                'topCandidateScore': 0.0,
                'resolutionStatus': _SHORT_NAME_STATUS,
                'metadata': {'skipReason': status_reason},
            }
            continue

        candidates = _build_candidate_scores(
            vessel_store=vessel_store,
            observed_at=observed_at,
            group=group,
            episode_assignment=episode_assignment,
            default_model_id=default_model_id,
            default_model_name=default_model_name,
            score_rows=correlation_scores.get(key, []),
            raw_metrics=raw_metric_averages,
            center_track=center_tracks.get(key, []),
            all_positions=all_positions,
            registry_by_mmsi=registry_by_mmsi,
            registry_by_name=registry_by_name,
            existing=existing,
            excluded_candidate_mmsis=excluded_candidate_mmsis,
            episode_prior_stats=episode_prior_stats,
            lineage_prior_stats=lineage_prior_stats,
            label_prior_stats=label_prior_stats,
        )

        top_candidate = candidates[0] if candidates else None
        second_score = candidates[1].final_score if len(candidates) > 1 else 0.0
        margin = round((top_candidate.final_score - second_score), 6) if top_candidate else 0.0
        stable_cycles = _top_candidate_stable_cycles(existing, top_candidate)
        # Snapshot every ranked candidate for audit/labeling purposes.
        for rank, candidate in enumerate(candidates, start=1):
            snapshot_rows.append((
                observed_at,
                group_key,
                sub_cluster_id,
                group_key,
                normalized_parent_name,
                episode_assignment.episode_id,
                candidate.mmsi,
                candidate.name,
                candidate.vessel_id,
                rank,
                candidate.candidate_source,
                candidate.model_id,
                candidate.model_name,
                candidate.base_corr_score,
                candidate.name_match_score,
                candidate.track_similarity_score,
                candidate.visit_score_6h,
                candidate.proximity_score_6h,
                candidate.activity_sync_score_6h,
                candidate.stability_score,
                candidate.registry_bonus,
                candidate.episode_prior_bonus,
                candidate.lineage_prior_bonus,
                candidate.label_prior_bonus,
                candidate.final_score,
                round(top_candidate.final_score - candidate.final_score, 6) if top_candidate else 0.0,
                json.dumps(candidate.evidence, ensure_ascii=False),
            ))

        status, decision_source = _select_status(top_candidate, margin, stable_cycles)
        auto_status = status  # preserved for label tracking before any manual override
        selected_parent_mmsi: Optional[str] = None
        selected_parent_name: Optional[str] = None
        selected_vessel_id: Optional[int] = None
        confidence: Optional[float] = None
        last_promoted_at: Optional[datetime] = None

        if top_candidate is not None:
            if status == _AUTO_PROMOTED_STATUS:
                selected_parent_mmsi = top_candidate.mmsi
                selected_parent_name = top_candidate.name
                selected_vessel_id = top_candidate.vessel_id
                confidence = top_candidate.final_score
                last_promoted_at = observed_at
                promoted += 1
            elif status == _REVIEW_REQUIRED_STATUS:
                selected_parent_mmsi = top_candidate.mmsi
                selected_parent_name = top_candidate.name
                selected_vessel_id = top_candidate.vessel_id
                confidence = top_candidate.final_score
                review_required += 1
            elif status == _NO_CANDIDATE_STATUS:
                no_candidate += 1

        status_reason = _status_reason(status)
        evidence_summary = {
            'episodeId': episode_assignment.episode_id,
            'continuitySource': episode_assignment.continuity_source,
            'continuityScore': episode_assignment.continuity_score,
            'mergedFromEpisodeIds': episode_assignment.merged_from_episode_ids,
            'splitFromEpisodeId': episode_assignment.split_from_episode_id,
            'normalizedParentName': normalized_parent_name,
            'candidateCount': len(candidates),
            'topCandidateMmsi': top_candidate.mmsi if top_candidate else None,
            'topCandidateName': top_candidate.name if top_candidate else None,
            'topCandidateSources': sorted(_candidate_sources(top_candidate)),
            'hasCorrelationCandidate': 'CORRELATION' in _candidate_sources(top_candidate),
            'recentTopCandidateStableCycles': stable_cycles,
            'skipReason': _status_reason(_SHORT_NAME_STATUS) if status == _SHORT_NAME_STATUS else None,
            'statusReason': status_reason,
            'trackable': is_trackable_parent_name(group_key),
            'priorBonusTotal': top_candidate.evidence.get('scoreBreakdown', {}).get('priorBonusTotal') if top_candidate else 0.0,
        }
        if excluded_candidate_mmsis:
            evidence_summary['excludedCandidateMmsis'] = sorted(excluded_candidate_mmsis)
        if active_label_session is not None:
            evidence_summary['activeLabelSessionId'] = active_label_session['id']
            evidence_summary['activeLabelParentMmsi'] = active_label_session['label_parent_mmsi']

        # A manual confirmation overrides the automatic decision.
        if existing is not None and existing.get('status') == _MANUAL_CONFIRMED_STATUS:
            status = _MANUAL_CONFIRMED_STATUS
            decision_source = existing.get('decision_source') or 'MANUAL'
            selected_parent_mmsi = existing.get('selected_parent_mmsi')
            selected_parent_name = existing.get('selected_parent_name')
            selected_vessel_id = existing.get('selected_vessel_id')
            confidence = existing.get('confidence') or confidence
            last_promoted_at = existing.get('approved_at') or existing.get('rejected_at') or last_promoted_at

        _upsert_resolution(
            conn,
            (
                group_key,
                sub_cluster_id,
                group_key,
                normalized_parent_name,
                episode_assignment.episode_id,
                episode_assignment.continuity_source,
                episode_assignment.continuity_score,
                top_candidate.evidence.get('scoreBreakdown', {}).get('priorBonusTotal', 0.0) if top_candidate else 0.0,
                status,
                selected_parent_mmsi,
                selected_parent_name,
                selected_vessel_id,
                confidence,
                decision_source,
                top_candidate.final_score if top_candidate else 0.0,
                second_score,
                margin,
                stable_cycles,
                observed_at,
                last_promoted_at,
                (existing or {}).get('approved_by'),
                (existing or {}).get('approved_at'),
                (existing or {}).get('manual_comment'),
                (existing or {}).get('rejected_candidate_mmsi'),
                (existing or {}).get('rejected_at'),
                json.dumps(evidence_summary, ensure_ascii=False),
                observed_at,
            ),
        )
        episode_snapshot_payloads[key] = {
            'parentEpisodeIds': episode_assignment.merged_from_episode_ids,
            'topCandidateMmsi': top_candidate.mmsi if top_candidate else None,
            'topCandidateScore': top_candidate.final_score if top_candidate else 0.0,
            'resolutionStatus': status,
            'metadata': {
                'splitFromEpisodeId': episode_assignment.split_from_episode_id,
                'candidateCount': len(candidates),
                'topCandidateSources': sorted(_candidate_sources(top_candidate)),
            },
        }
        if active_label_session is not None:
            label_tracking_rows.append(
                _label_tracking_row(
                    observed_at=observed_at,
                    label_session=active_label_session,
                    auto_status=auto_status,
                    top_candidate=top_candidate,
                    margin=margin,
                    candidates=candidates,
                )
            )

    # --- Persist everything and commit once -------------------------------
    sync_episode_states(conn, observed_at, episode_plan)
    inserted = _insert_candidate_snapshots(conn, observed_at, snapshot_rows)
    episode_snapshots_inserted = insert_episode_snapshots(conn, observed_at, episode_plan, episode_snapshot_payloads)
    tracking_inserted = _insert_label_tracking_rows(conn, label_tracking_rows)
    conn.commit()
    logger.info(
        'gear parent inference: %d groups, %d direct-match, %d candidates, %d promoted, %d review, %d skipped, %d no-candidate, %d episode-snapshots, %d label-tracking',
        len(active_groups),
        direct_matched,
        inserted,
        promoted,
        review_required,
        skipped,
        no_candidate,
        episode_snapshots_inserted,
        tracking_inserted,
    )
    return {
        'groups': len(active_groups),
        'candidates': inserted,
        'promoted': promoted,
        'review_required': review_required,
        'skipped': skipped,
        'no_candidate': no_candidate,
        'direct_matched': direct_matched,
        'episode_snapshots': episode_snapshots_inserted,
        'label_tracking': tracking_inserted,
    }
|