kcg-monitoring/prediction/algorithms/gear_parent_inference.py
htlee 1033654c82 fix: 모선 추론 점수 가중치 조정 — 100%는 DIRECT_PARENT_MATCH 전용
문제: china_bonus(15%) + prior(20%) 가산으로 일반 후보 23.6%가 100% 도달
- china_bonus: 0.15 → 0.05, 적용 조건: pre >= 0.30 → 0.50
- episode_prior: 0.10 → 0.05
- lineage_prior: 0.05 → 0.03
- label_prior: 0.10 → 0.07
- total_prior_cap: 0.20 → 0.10

결과: 일반 후보 최대 ~93% (라벨 있으면 ~98%), 100%는 직접 모선 일치만

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 09:09:36 +09:00

1429 lines
57 KiB
Python

"""어구 그룹 대표 모선 추론."""
from __future__ import annotations
import json
import logging
import math
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
from algorithms.gear_correlation import _get_vessel_track
from algorithms.gear_parent_episode import (
build_episode_plan,
compute_prior_bonus_components,
group_to_episode_input,
insert_episode_snapshots,
load_active_episode_states,
load_episode_prior_stats,
load_label_prior_stats,
load_lineage_prior_stats,
sync_episode_states,
)
from algorithms.gear_name_rules import is_trackable_parent_name, normalize_parent_name
from algorithms.track_similarity import compute_track_similarity
from config import qualified_table
logger = logging.getLogger(__name__)

# Schema-qualified table names for every table touched by this module.
FLEET_VESSELS = qualified_table('fleet_vessels')
GROUP_POLYGON_SNAPSHOTS = qualified_table('group_polygon_snapshots')
GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores')
GEAR_CORRELATION_RAW_METRICS = qualified_table('gear_correlation_raw_metrics')
CORRELATION_PARAM_MODELS = qualified_table('correlation_param_models')
GEAR_GROUP_PARENT_CANDIDATE_SNAPSHOTS = qualified_table('gear_group_parent_candidate_snapshots')
GEAR_GROUP_PARENT_RESOLUTION = qualified_table('gear_group_parent_resolution')
GEAR_PARENT_CANDIDATE_EXCLUSIONS = qualified_table('gear_parent_candidate_exclusions')
GEAR_PARENT_LABEL_SESSIONS = qualified_table('gear_parent_label_sessions')
GEAR_PARENT_LABEL_TRACKING_CYCLES = qualified_table('gear_parent_label_tracking_cycles')

# Resolution status values (persisted in gear_group_parent_resolution.status).
_SHORT_NAME_STATUS = 'SKIPPED_SHORT_NAME'
_NO_CANDIDATE_STATUS = 'NO_CANDIDATE'
_MANUAL_CONFIRMED_STATUS = 'MANUAL_CONFIRMED'
_AUTO_PROMOTED_STATUS = 'AUTO_PROMOTED'
_REVIEW_REQUIRED_STATUS = 'REVIEW_REQUIRED'
_UNRESOLVED_STATUS = 'UNRESOLVED'
_DIRECT_PARENT_MATCH_STATUS = 'DIRECT_PARENT_MATCH'

# How long a manually rejected candidate stays excluded from the pool.
_REJECT_COOLDOWN_HOURS = 24
# Only the top-N correlation targets enter the candidate pool.
_MAX_CORRELATION_CANDIDATES = 5
# Auto-promotion gates: same top candidate for N cycles, minimum score, and
# minimum lead over the runner-up.
_MIN_AUTO_PROMOTION_STABLE_CYCLES = 3
_MIN_AUTO_PROMOTION_SCORE = 0.72
_MIN_AUTO_PROMOTION_MARGIN = 0.15
# Below promotion but at/above this score -> flagged for human review.
_MIN_REVIEW_REQUIRED_SCORE = 0.60
# China-flag MMSI bonus only applies once the pre-bonus score reaches 0.50;
# bonus kept small so ordinary candidates cannot reach 100% (see file header).
_MIN_PREFIX_BONUS_SCORE = 0.50
_CHINA_MMSI_PREFIX_BONUS = 0.05
_CHINA_MMSI_PREFIXES = ('412', '413')

# Coverage targets: point counts / time spans considered "full support" when
# scaling raw track, visit-zone, and activity metrics.
_TRACK_SUPPORT_POINT_TARGET = 12
_TRACK_SUPPORT_SPAN_TARGET_MINUTES = 90.0
_VISIT_SUPPORT_POINT_TARGET = 8
_VISIT_SUPPORT_SPAN_TARGET_MINUTES = 60.0
_ACTIVITY_SUPPORT_POINT_TARGET = 12
_ACTIVITY_SUPPORT_SPAN_TARGET_MINUTES = 90.0
# Radius (nautical miles) around the gear centroid that counts as a "visit".
_VISIT_ZONE_THRESHOLD_NM = 5.0
# Look-back window (hours) for raw metric averaging; also reported in evidence.
_RAW_SCORE_WINDOW_HOURS = 6
@dataclass
class RegistryVessel:
    """One vessel row from the fleet registry (fleet_vessels table)."""

    vessel_id: int  # fleet_vessels primary key
    mmsi: str       # '' when the registry row has no MMSI
    name_cn: str    # Chinese name ('' when absent)
    name_en: str    # English name ('' when absent)
@dataclass
class CandidateScore:
    """Fully scored parent-vessel candidate for one gear group / sub-cluster."""

    mmsi: str
    name: str
    vessel_id: Optional[int]        # registry id when the MMSI matched the fleet registry
    target_type: str                # 'VESSEL' for all candidates built here
    candidate_source: str           # comma-joined sources, e.g. 'CORRELATION,REGISTRY_NAME'
    base_corr_score: float          # current correlation score (heaviest weight, 0.40)
    name_match_score: float
    track_similarity_score: float   # coverage-scaled track similarity
    visit_score_6h: float           # coverage-scaled 6h visit average
    proximity_score_6h: float
    activity_sync_score_6h: float
    stability_score: float          # streak_count / 6, clamped to [0, 1]
    registry_bonus: float           # flat 0.05 when registry-matched
    episode_prior_bonus: float
    lineage_prior_bonus: float
    label_prior_bonus: float
    final_score: float              # weighted sum + bonuses, clamped to [0, 1]
    streak_count: int
    model_id: int                   # correlation param model used for scoring
    model_name: str
    evidence: dict[str, Any]        # JSON-serializable evidence / score breakdown
def _clamp(value: float, floor: float = 0.0, ceil: float = 1.0) -> float:
return max(floor, min(ceil, value))
def _china_mmsi_prefix_bonus(mmsi: str, pre_bonus_score: float) -> float:
    """China-flag MMSI bonus, granted only when the pre-bonus score qualifies.

    Returns _CHINA_MMSI_PREFIX_BONUS for MMSIs starting with a Chinese prefix
    once pre_bonus_score >= _MIN_PREFIX_BONUS_SCORE; otherwise 0.0.
    """
    if pre_bonus_score < _MIN_PREFIX_BONUS_SCORE:
        return 0.0
    # str.startswith accepts a tuple of prefixes, covering '412' and '413'.
    if (mmsi or '').startswith(_CHINA_MMSI_PREFIXES):
        return _CHINA_MMSI_PREFIX_BONUS
    return 0.0
def _apply_final_score_bonus(mmsi: str, weighted_score: float) -> tuple[float, float, float]:
    """Layer the China-MMSI bonus on top of the clamped weighted score.

    Returns (pre_bonus_score, china_mmsi_bonus, final_score), each in [0, 1]
    except the bonus itself which is the raw bonus amount.
    """
    pre_bonus_score = _clamp(weighted_score)
    bonus = _china_mmsi_prefix_bonus(mmsi, pre_bonus_score)
    return pre_bonus_score, bonus, _clamp(weighted_score + bonus)
def _to_aware_utc(value: Any) -> Optional[datetime]:
if value is None:
return None
if isinstance(value, datetime):
if value.tzinfo is None:
return value.replace(tzinfo=timezone.utc)
return value.astimezone(timezone.utc)
try:
parsed = datetime.fromisoformat(str(value))
except Exception:
return None
if parsed.tzinfo is None:
return parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def _span_minutes(timestamps: list[datetime]) -> float:
if len(timestamps) < 2:
return 0.0
return max(0.0, (timestamps[-1] - timestamps[0]).total_seconds() / 60.0)
def _support_factor(point_count: int, span_minutes: float, point_target: int, span_target_minutes: float) -> float:
if point_count <= 0 or span_minutes <= 0:
return 0.0
point_support = min(1.0, point_count / max(point_target, 1))
span_support = min(1.0, span_minutes / max(span_target_minutes, 1.0))
return _clamp(math.sqrt(point_support * span_support))
def _haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
earth_radius_nm = 3440.065
phi1 = math.radians(lat1)
phi2 = math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
return earth_radius_nm * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _build_track_coverage_metrics(
    center_track: list[dict[str, Any]],
    vessel_track: list[dict[str, Any]],
    gear_center_lat: float,
    gear_center_lon: float,
) -> dict[str, float | int]:
    """Measure how much real evidence backs a candidate's raw metrics.

    Produces point counts, time spans, and [0,1] coverage factors for the
    whole vessel track, the in-zone (near gear centroid) subset, and the
    activity subset. These factors scale raw similarity/visit/activity
    scores downward when the underlying AIS evidence is thin.
    """
    # Normalize every timestamp to aware-UTC and drop unparseable ones.
    vessel_timestamps = sorted(
        ts for ts in (_to_aware_utc(point.get('timestamp')) for point in vessel_track)
        if ts is not None
    )
    center_timestamps = sorted(
        ts for ts in (_to_aware_utc(point.get('timestamp')) for point in center_track)
        if ts is not None
    )
    track_point_count = len(vessel_track)
    track_span_minutes = _span_minutes(vessel_timestamps)
    center_point_count = len(center_track)
    center_span_minutes = _span_minutes(center_timestamps)
    # Restrict vessel points to the group's observed time window when both
    # tracks have timestamps; otherwise fall back to the full vessel track.
    overlap_points: list[dict[str, Any]] = vessel_track
    if vessel_timestamps and center_timestamps:
        overlap_start = center_timestamps[0]
        overlap_end = center_timestamps[-1]
        overlap_points = [
            point for point in vessel_track
            if (ts := _to_aware_utc(point.get('timestamp'))) is not None and overlap_start <= ts <= overlap_end
        ]
    overlap_timestamps = sorted(
        ts for ts in (_to_aware_utc(point.get('timestamp')) for point in overlap_points)
        if ts is not None
    )
    overlap_point_count = len(overlap_points)
    overlap_span_minutes = _span_minutes(overlap_timestamps)
    # Points within the visit-zone radius of the gear centroid.
    in_zone_points = [
        point for point in overlap_points
        if _haversine_nm(gear_center_lat, gear_center_lon, float(point['lat']), float(point['lon'])) < _VISIT_ZONE_THRESHOLD_NM
    ]
    in_zone_timestamps = sorted(
        ts for ts in (_to_aware_utc(point.get('timestamp')) for point in in_zone_points)
        if ts is not None
    )
    in_zone_point_count = len(in_zone_points)
    in_zone_span_minutes = _span_minutes(in_zone_timestamps)
    # Per-metric support factors against their respective targets.
    track_coverage_factor = _support_factor(
        track_point_count,
        track_span_minutes,
        _TRACK_SUPPORT_POINT_TARGET,
        _TRACK_SUPPORT_SPAN_TARGET_MINUTES,
    )
    visit_coverage_factor = _support_factor(
        in_zone_point_count,
        in_zone_span_minutes,
        _VISIT_SUPPORT_POINT_TARGET,
        _VISIT_SUPPORT_SPAN_TARGET_MINUTES,
    )
    activity_coverage_factor = _support_factor(
        in_zone_point_count,
        in_zone_span_minutes,
        _ACTIVITY_SUPPORT_POINT_TARGET,
        _ACTIVITY_SUPPORT_SPAN_TARGET_MINUTES,
    )
    # Simple arithmetic mean of the three factors, reported as overall
    # evidence confidence in the candidate's evidence payload.
    coverage_factor = round(
        (track_coverage_factor + visit_coverage_factor + activity_coverage_factor) / 3.0,
        4,
    )
    return {
        'trackPointCount': track_point_count,
        'trackSpanMinutes': round(track_span_minutes, 1),
        'centerPointCount': center_point_count,
        'centerSpanMinutes': round(center_span_minutes, 1),
        'overlapPointCount': overlap_point_count,
        'overlapSpanMinutes': round(overlap_span_minutes, 1),
        'inZonePointCount': in_zone_point_count,
        'inZoneSpanMinutes': round(in_zone_span_minutes, 1),
        'trackCoverageFactor': round(track_coverage_factor, 4),
        'visitCoverageFactor': round(visit_coverage_factor, 4),
        'activityCoverageFactor': round(activity_coverage_factor, 4),
        'coverageFactor': coverage_factor,
        'scoreWindowHours': _RAW_SCORE_WINDOW_HOURS,
    }
def _candidate_sources(candidate: Optional[CandidateScore]) -> set[str]:
if candidate is None:
return set()
raw = candidate.evidence.get('sources')
if isinstance(raw, list):
return {str(item) for item in raw if item}
return set()
def _top_candidate_stable_cycles(existing: Optional[dict[str, Any]], top_candidate: Optional[CandidateScore]) -> int:
if top_candidate is None:
return 0
previous_mmsi = None
previous_cycles = 0
if existing is not None:
previous_summary = existing.get('evidence_summary') or {}
previous_mmsi = previous_summary.get('topCandidateMmsi')
previous_cycles = int(existing.get('stable_cycles') or 0)
if previous_mmsi == top_candidate.mmsi:
return max(previous_cycles + 1, 1)
return 1
def _status_reason(status: str) -> Optional[str]:
    """Human-readable (Korean) reason for auto-assigned skip/match statuses.

    Returns None for statuses that carry no canned explanation.
    """
    reasons = {
        _SHORT_NAME_STATUS: '정규화 이름 길이 4 미만',
        _NO_CANDIDATE_STATUS: '후보를 생성하지 못함',
        _DIRECT_PARENT_MATCH_STATUS: '그룹 멤버에 직접 모선이 포함됨',
    }
    return reasons.get(status)
def _select_status(
    top_candidate: Optional[CandidateScore],
    margin: float,
    stable_cycles: int,
) -> tuple[str, str]:
    """Pick the resolution (status, decision_source) from the scored outcome.

    Auto-promotion requires a correlation-backed VESSEL candidate that meets
    the score, margin, and stability thresholds; otherwise the score alone
    decides between review-required and unresolved.
    """
    if top_candidate is None:
        return _NO_CANDIDATE_STATUS, 'AUTO_NO_CANDIDATE'
    promotable = (
        top_candidate.target_type == 'VESSEL'
        and 'CORRELATION' in _candidate_sources(top_candidate)
        and top_candidate.final_score >= _MIN_AUTO_PROMOTION_SCORE
        and margin >= _MIN_AUTO_PROMOTION_MARGIN
        and stable_cycles >= _MIN_AUTO_PROMOTION_STABLE_CYCLES
    )
    if promotable:
        return _AUTO_PROMOTED_STATUS, 'AUTO_PROMOTION'
    if top_candidate.final_score >= _MIN_REVIEW_REQUIRED_SCORE:
        return _REVIEW_REQUIRED_STATUS, 'AUTO_REVIEW'
    return _UNRESOLVED_STATUS, 'AUTO_SCORE'
def _load_default_model(conn) -> tuple[int, str]:
    """Return (id, name) of the active correlation parameter model.

    Prefers the default flag among active models; falls back to
    (1, 'default') when no active model row exists.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT id, name
            FROM {CORRELATION_PARAM_MODELS}
            WHERE is_active = TRUE
            ORDER BY is_default DESC, id ASC
            LIMIT 1
            """
        )
        row = cur.fetchone()
        if row is None:
            return 1, 'default'
        return int(row[0]), row[1] or 'default'
    finally:
        cur.close()
def _load_registry(conn) -> tuple[dict[str, RegistryVessel], dict[str, list[RegistryVessel]]]:
    """Load the fleet registry into two lookup indexes.

    Returns (by_mmsi, by_normalized_name); a vessel appears under every
    distinct normalized form of its Chinese and English names.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT id, COALESCE(mmsi, ''), COALESCE(name_cn, ''), COALESCE(name_en, '')
            FROM {FLEET_VESSELS}
            """
        )
        by_mmsi: dict[str, RegistryVessel] = {}
        by_normalized_name: dict[str, list[RegistryVessel]] = {}
        for vessel_id, mmsi, name_cn, name_en in cur.fetchall():
            vessel = RegistryVessel(
                vessel_id=int(vessel_id),
                mmsi=mmsi or '',
                name_cn=name_cn or '',
                name_en=name_en or '',
            )
            if vessel.mmsi:
                by_mmsi[vessel.mmsi] = vessel
            # Index under both name languages when they normalize non-empty.
            for raw_name in (vessel.name_cn, vessel.name_en):
                normalized = normalize_parent_name(raw_name)
                if normalized:
                    by_normalized_name.setdefault(normalized, []).append(vessel)
        return by_mmsi, by_normalized_name
    finally:
        cur.close()
def _json_to_dict(value: Any) -> dict[str, Any]:
if value is None:
return {}
if isinstance(value, dict):
return value
try:
return json.loads(value)
except Exception:
return {}
def _load_existing_resolution(conn, group_keys: list[str]) -> dict[tuple[str, int], dict[str, Any]]:
    """Load current resolution rows keyed by (group_key, sub_cluster_id).

    Numeric/score fields are defaulted to 0 and evidence_summary is parsed
    from JSON so callers can use the rows without null checks.
    """
    if not group_keys:
        return {}
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT group_key, sub_cluster_id, parent_name, normalized_parent_name,
                   status, selected_parent_mmsi, selected_parent_name, selected_vessel_id,
                   confidence, decision_source, top_score, second_score, score_margin,
                   stable_cycles, approved_by, approved_at, manual_comment,
                   rejected_candidate_mmsi, rejected_at, evidence_summary,
                   episode_id, continuity_source, continuity_score, prior_bonus_total
            FROM {GEAR_GROUP_PARENT_RESOLUTION}
            WHERE group_key = ANY(%s)
            """,
            (group_keys,),
        )
        result: dict[tuple[str, int], dict[str, Any]] = {}
        for row in cur.fetchall():
            # Positional mapping mirrors the SELECT column order above.
            key = (row[0], int(row[1]))
            result[key] = {
                'parent_name': row[2],
                'normalized_parent_name': row[3],
                'status': row[4],
                'selected_parent_mmsi': row[5],
                'selected_parent_name': row[6],
                'selected_vessel_id': row[7],
                'confidence': row[8],
                'decision_source': row[9],
                'top_score': row[10] or 0.0,
                'second_score': row[11] or 0.0,
                'score_margin': row[12] or 0.0,
                'stable_cycles': row[13] or 0,
                'approved_by': row[14],
                'approved_at': row[15],
                'manual_comment': row[16],
                'rejected_candidate_mmsi': row[17],
                'rejected_at': row[18],
                'evidence_summary': _json_to_dict(row[19]),
                'episode_id': row[20],
                'continuity_source': row[21],
                'continuity_score': row[22] or 0.0,
                'prior_bonus_total': row[23] or 0.0,
            }
        return result
    finally:
        cur.close()
def _expire_label_sessions(conn) -> None:
    """Mark label sessions whose active window has elapsed as EXPIRED.

    Side-effect-only UPDATE; commit/rollback is the caller's responsibility.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            UPDATE {GEAR_PARENT_LABEL_SESSIONS}
            SET status = 'EXPIRED',
                updated_at = NOW()
            WHERE status = 'ACTIVE'
              AND active_until <= NOW()
            """
        )
    finally:
        cur.close()
def _load_active_candidate_exclusions(conn, group_keys: list[str]) -> dict[str, Any]:
    """Load currently-effective candidate exclusions.

    Returns {'global': set[str of MMSI], 'group': dict keyed by
    (group_key, sub_cluster_id) tuples -> set[str]}. (Note: the declared
    dict[str, Any] return only describes the outer mapping.)
    """
    result: dict[str, Any] = {
        'global': set(),
        'group': {},
    }
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT scope_type, group_key, sub_cluster_id, candidate_mmsi
            FROM {GEAR_PARENT_CANDIDATE_EXCLUSIONS}
            WHERE released_at IS NULL
              AND active_from <= NOW()
              AND (active_until IS NULL OR active_until > NOW())
              AND (scope_type = 'GLOBAL' OR group_key = ANY(%s))
            ORDER BY active_from DESC, id DESC
            """,
            (group_keys or [''],),  # ANY needs a non-empty array parameter
        )
        for scope_type, group_key, sub_cluster_id, candidate_mmsi in cur.fetchall():
            if scope_type == 'GLOBAL':
                result['global'].add(candidate_mmsi)
                continue
            key = (group_key, int(sub_cluster_id))
            result['group'].setdefault(key, set()).add(candidate_mmsi)
        return result
    finally:
        cur.close()
def _load_active_label_sessions(conn, group_keys: list[str]) -> dict[tuple[str, int], dict[str, Any]]:
    """Load the newest ACTIVE label session per (group_key, sub_cluster_id).

    DISTINCT ON with the DESC ordering keeps only the most recently started
    session for each group/sub-cluster pair.
    """
    if not group_keys:
        return {}
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT DISTINCT ON (group_key, sub_cluster_id)
                   id, group_key, sub_cluster_id,
                   label_parent_mmsi, label_parent_name, label_parent_vessel_id,
                   duration_days, active_from, active_until, actor, comment, metadata
            FROM {GEAR_PARENT_LABEL_SESSIONS}
            WHERE status = 'ACTIVE'
              AND active_from <= NOW()
              AND active_until > NOW()
              AND group_key = ANY(%s)
            ORDER BY group_key, sub_cluster_id, active_from DESC, id DESC
            """,
            (group_keys,),
        )
        result: dict[tuple[str, int], dict[str, Any]] = {}
        for row in cur.fetchall():
            result[(row[1], int(row[2]))] = {
                'id': int(row[0]),
                'group_key': row[1],
                'sub_cluster_id': int(row[2]),
                'label_parent_mmsi': row[3],
                'label_parent_name': row[4],
                'label_parent_vessel_id': row[5],
                'duration_days': int(row[6]),
                'active_from': row[7],
                'active_until': row[8],
                'actor': row[9],
                'comment': row[10],
                'metadata': _json_to_dict(row[11]),
            }
        return result
    finally:
        cur.close()
def _load_correlation_scores(
    conn,
    default_model_id: int,
    group_keys: list[str],
) -> dict[tuple[str, int], list[dict[str, Any]]]:
    """Load VESSEL correlation scores for the given model and groups.

    Rows are grouped by (group_key, sub_cluster_id) and kept in descending
    current_score order, so list position doubles as candidate rank.
    """
    if not group_keys:
        return {}
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT group_key, sub_cluster_id, target_mmsi, target_type, COALESCE(target_name, ''),
                   current_score, streak_count
            FROM {GEAR_CORRELATION_SCORES}
            WHERE model_id = %s
              AND group_key = ANY(%s)
              AND target_type = 'VESSEL'
            ORDER BY group_key, sub_cluster_id, current_score DESC, last_observed_at DESC
            """,
            (default_model_id, group_keys),
        )
        result: dict[tuple[str, int], list[dict[str, Any]]] = {}
        for row in cur.fetchall():
            key = (row[0], int(row[1]))
            result.setdefault(key, []).append({
                'target_mmsi': row[2],
                'target_type': row[3],
                'target_name': row[4] or '',
                'current_score': float(row[5] or 0.0),
                'streak_count': int(row[6] or 0),
            })
        return result
    finally:
        cur.close()
def _load_raw_metric_averages(conn, group_keys: list[str]) -> dict[tuple[str, int, str], dict[str, float]]:
    """Average recent raw correlation metrics per (group, sub-cluster, MMSI).

    The look-back window is driven by _RAW_SCORE_WINDOW_HOURS (previously a
    hardcoded "6 hours" literal) so it stays consistent with the
    'scoreWindowHours' value reported in candidate evidence.
    """
    if not group_keys:
        return {}
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT group_key,
                   sub_cluster_id,
                   target_mmsi,
                   AVG(COALESCE(visit_score, 0)) AS avg_visit,
                   AVG(COALESCE(proximity_ratio, 0)) AS avg_proximity,
                   AVG(COALESCE(activity_sync, 0)) AS avg_activity
            FROM {GEAR_CORRELATION_RAW_METRICS}
            WHERE group_key = ANY(%s)
              AND observed_at > NOW() - make_interval(hours => %s)
            GROUP BY group_key, sub_cluster_id, target_mmsi
            """,
            (group_keys, _RAW_SCORE_WINDOW_HOURS),
        )
        result: dict[tuple[str, int, str], dict[str, float]] = {}
        for row in cur.fetchall():
            # Keyed by (group_key, sub_cluster_id, target_mmsi).
            result[(row[0], int(row[1]), row[2])] = {
                'visit_score_6h': float(row[3] or 0.0),
                'proximity_score_6h': float(row[4] or 0.0),
                'activity_sync_score_6h': float(row[5] or 0.0),
            }
        return result
    finally:
        cur.close()
def _load_group_center_tracks(conn, group_keys: list[str]) -> dict[tuple[str, int], list[dict[str, Any]]]:
    """Load recent 1h-resolution group center-point tracks, time-ascending.

    The look-back window uses _RAW_SCORE_WINDOW_HOURS (previously a
    hardcoded "6 hours" literal) so the center track matches the raw-metric
    scoring window used elsewhere in this module.
    """
    if not group_keys:
        return {}
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT group_key, sub_cluster_id, snapshot_time, ST_Y(center_point) AS lat, ST_X(center_point) AS lon
            FROM {GROUP_POLYGON_SNAPSHOTS}
            WHERE group_key = ANY(%s)
              AND resolution = '1h'
              AND center_point IS NOT NULL
              AND snapshot_time > NOW() - make_interval(hours => %s)
            ORDER BY group_key, sub_cluster_id, snapshot_time ASC
            """,
            (group_keys, _RAW_SCORE_WINDOW_HOURS),
        )
        result: dict[tuple[str, int], list[dict[str, Any]]] = {}
        for row in cur.fetchall():
            result.setdefault((row[0], int(row[1])), []).append({
                'timestamp': row[2],
                'lat': float(row[3]),
                'lon': float(row[4]),
            })
        return result
    finally:
        cur.close()
def _name_match_score(parent_name: str, candidate_name: str, registry: Optional[RegistryVessel]) -> float:
    """Best name-similarity score between the group name and candidate names.

    Tiers: 1.0 exact raw match, 0.8 normalized match, 0.5 prefix/substring
    relation, 0.3 alphabetic-characters-only match, else 0.0. Registry
    Chinese/English names are also compared when available.
    """

    def score_pair(left: str, right: str) -> float:
        raw_left = (left or '').strip().upper()
        raw_right = (right or '').strip().upper()
        norm_left = normalize_parent_name(left)
        norm_right = normalize_parent_name(right)
        if not norm_left or not norm_right:
            return 0.0
        if raw_left and raw_left == raw_right:
            return 1.0
        if norm_left == norm_right:
            return 0.8
        prefix_related = norm_left.startswith(norm_right) or norm_right.startswith(norm_left)
        substring_related = norm_left in norm_right or norm_right in norm_left
        if prefix_related or substring_related:
            return 0.5
        alpha_left = ''.join(ch for ch in norm_left if ch.isalpha())
        alpha_right = ''.join(ch for ch in norm_right if ch.isalpha())
        if alpha_left and alpha_left == alpha_right:
            return 0.3
        return 0.0

    best = score_pair(parent_name, candidate_name)
    if registry is not None:
        for registry_name in (registry.name_cn, registry.name_en):
            best = max(best, score_pair(parent_name, registry_name))
    return best
def _candidate_name(candidate_mmsi: str, all_positions: dict[str, dict], registry: Optional[RegistryVessel]) -> str:
position_name = (all_positions.get(candidate_mmsi) or {}).get('name', '')
if position_name:
return position_name
if registry is not None:
return registry.name_cn or registry.name_en or candidate_mmsi
return candidate_mmsi
def _direct_parent_member(group: dict[str, Any], all_positions: dict[str, dict]) -> Optional[dict[str, Any]]:
members = group.get('members') or []
for member in members:
if member.get('isParent') and member.get('mmsi'):
return member
parent_mmsi = group.get('parent_mmsi')
if not parent_mmsi:
return None
position = all_positions.get(parent_mmsi) or {}
return {
'mmsi': parent_mmsi,
'name': position.get('name') or group.get('parent_name') or parent_mmsi,
}
def _direct_parent_stable_cycles(existing: Optional[dict[str, Any]], direct_parent_mmsi: str) -> int:
if existing is None or not direct_parent_mmsi:
return 1
previous_mmsi = existing.get('selected_parent_mmsi')
if not previous_mmsi:
previous_summary = existing.get('evidence_summary') or {}
previous_mmsi = previous_summary.get('directParentMmsi') or previous_summary.get('topCandidateMmsi')
previous_cycles = int(existing.get('stable_cycles') or 0)
if previous_mmsi == direct_parent_mmsi:
return max(previous_cycles + 1, 1)
return 1
def _build_candidate_scores(
    vessel_store,
    observed_at: datetime,
    group: dict[str, Any],
    episode_assignment,
    default_model_id: int,
    default_model_name: str,
    score_rows: list[dict[str, Any]],
    raw_metrics: dict[tuple[str, int, str], dict[str, float]],
    center_track: list[dict[str, Any]],
    all_positions: dict[str, dict],
    registry_by_mmsi: dict[str, RegistryVessel],
    registry_by_name: dict[str, list[RegistryVessel]],
    existing: Optional[dict[str, Any]],
    excluded_candidate_mmsis: set[str],
    episode_prior_stats: dict[tuple[str, str], dict[str, Any]],
    lineage_prior_stats: dict[tuple[str, str], dict[str, Any]],
    label_prior_stats: dict[tuple[str, str], dict[str, Any]],
) -> list[CandidateScore]:
    """Build and rank all parent-vessel candidates for one gear group.

    Pools candidates from top correlation targets, registry name matches,
    and the previous selection; drops rejected/excluded MMSIs; scores each
    with a weighted sum of coverage-scaled metrics plus registry, China-MMSI
    and prior bonuses; returns candidates sorted best-first.
    """
    group_key = group['parent_name']
    sub_cluster_id = int(group.get('sub_cluster_id', 0))
    normalized_parent_name = normalize_parent_name(group_key)
    members = group.get('members') or []
    # Gear centroid: simple mean of member positions; (0, 0) when no members.
    if members:
        gear_center_lat = sum(float(member['lat']) for member in members) / len(members)
        gear_center_lon = sum(float(member['lon']) for member in members) / len(members)
    else:
        gear_center_lat = 0.0
        gear_center_lon = 0.0
    candidates: dict[str, dict[str, Any]] = {}
    score_lookup = {row['target_mmsi']: row for row in score_rows}
    center_track_latlon = [
        (float(point['lat']), float(point['lon']))
        for point in center_track
        if point.get('lat') is not None and point.get('lon') is not None
    ]
    # --- Candidate pool assembly (each MMSI tagged with its sources) ---
    for row in score_rows[:_MAX_CORRELATION_CANDIDATES]:
        candidates.setdefault(row['target_mmsi'], {'sources': set()})['sources'].add('CORRELATION')
    for vessel in registry_by_name.get(normalized_parent_name, []):
        if vessel.mmsi:
            candidates.setdefault(vessel.mmsi, {'sources': set()})['sources'].add('REGISTRY_NAME')
    # Carry the previous selection forward only within the same episode.
    if existing is not None and existing.get('episode_id') == episode_assignment.episode_id:
        current_candidate = existing.get('selected_parent_mmsi') or existing.get('evidence_summary', {}).get('topCandidateMmsi')
        if current_candidate:
            candidates.setdefault(current_candidate, {'sources': set()})['sources'].add('PREVIOUS_SELECTION')
    # Drop a manually rejected candidate while its cooldown is still active.
    if existing is not None:
        rejected_mmsi = existing.get('rejected_candidate_mmsi')
        rejected_at = existing.get('rejected_at')
        if rejected_mmsi and rejected_at is not None:
            cutoff = datetime.now(timezone.utc) - timedelta(hours=_REJECT_COOLDOWN_HOURS)
            if rejected_at >= cutoff and rejected_mmsi in candidates:
                candidates.pop(rejected_mmsi, None)
    # Apply global/group-scoped exclusions.
    for excluded_mmsi in excluded_candidate_mmsis:
        candidates.pop(excluded_mmsi, None)
    # --- Per-candidate scoring ---
    scored: list[CandidateScore] = []
    for candidate_mmsi, meta in candidates.items():
        registry = registry_by_mmsi.get(candidate_mmsi)
        score_row = score_lookup.get(candidate_mmsi, {})
        raw = raw_metrics.get((group_key, sub_cluster_id, candidate_mmsi), {})
        vessel_track = _get_vessel_track(vessel_store, candidate_mmsi, hours=6)
        raw_track_similarity = 0.0
        if center_track_latlon and vessel_track:
            raw_track_similarity = compute_track_similarity(
                center_track_latlon,
                [(point['lat'], point['lon']) for point in vessel_track],
            )
        base_corr_score = float(score_row.get('current_score', 0.0) or 0.0)
        streak_count = int(score_row.get('streak_count', 0) or 0)
        # Stability saturates at a 6-cycle streak.
        stability_score = _clamp(streak_count / 6.0)
        candidate_name = _candidate_name(candidate_mmsi, all_positions, registry)
        name_match_score = _name_match_score(group_key, candidate_name, registry)
        registry_bonus = 0.05 if registry is not None else 0.0
        raw_visit_score = float(raw.get('visit_score_6h', 0.0) or 0.0)
        raw_proximity_score = float(raw.get('proximity_score_6h', 0.0) or 0.0)
        raw_activity_score = float(raw.get('activity_sync_score_6h', 0.0) or 0.0)
        # Scale raw metrics by how much AIS evidence actually supports them.
        coverage_metrics = _build_track_coverage_metrics(
            center_track=center_track,
            vessel_track=vessel_track,
            gear_center_lat=gear_center_lat,
            gear_center_lon=gear_center_lon,
        )
        track_coverage_factor = float(coverage_metrics['trackCoverageFactor'])
        visit_coverage_factor = float(coverage_metrics['visitCoverageFactor'])
        activity_coverage_factor = float(coverage_metrics['activityCoverageFactor'])
        track_similarity = _clamp(raw_track_similarity * track_coverage_factor)
        visit_score = _clamp(raw_visit_score * visit_coverage_factor)
        proximity_score = _clamp(raw_proximity_score * track_coverage_factor)
        activity_score = _clamp(raw_activity_score * activity_coverage_factor)
        # Weighted sum (weights total 1.00 before the flat registry bonus).
        weighted_score = (
            0.40 * base_corr_score
            + 0.15 * name_match_score
            + 0.15 * track_similarity
            + 0.10 * visit_score
            + 0.05 * proximity_score
            + 0.05 * activity_score
            + 0.10 * stability_score
            + registry_bonus
        )
        pre_bonus_score, china_mmsi_bonus, final_score = _apply_final_score_bonus(
            candidate_mmsi,
            weighted_score,
        )
        # Episode/lineage/label priors add on top, then clamp to [0, 1].
        prior_bonus = compute_prior_bonus_components(
            observed_at=observed_at,
            normalized_parent_name=normalized_parent_name,
            episode_id=episode_assignment.episode_id,
            candidate_mmsi=candidate_mmsi,
            episode_prior_stats=episode_prior_stats,
            lineage_prior_stats=lineage_prior_stats,
            label_prior_stats=label_prior_stats,
        )
        final_score = _clamp(final_score + prior_bonus['priorBonusTotal'])
        evidence = {
            'normalizedParentName': normalized_parent_name,
            'episodeId': episode_assignment.episode_id,
            'continuitySource': episode_assignment.continuity_source,
            'continuityScore': round(float(episode_assignment.continuity_score or 0.0), 6),
            'sources': sorted(meta['sources']),
            'trackAvailable': bool(vessel_track),
            'registryMatched': registry is not None,
            'coverage': coverage_metrics,
            'evidenceConfidence': coverage_metrics['coverageFactor'],
            'scoreBreakdown': {
                'baseCorrScore': round(base_corr_score, 4),
                'nameMatchScore': round(name_match_score, 4),
                'trackSimilarityScore': round(track_similarity, 4),
                'visitScore6h': round(visit_score, 4),
                'proximityScore6h': round(proximity_score, 4),
                'activitySyncScore6h': round(activity_score, 4),
                'stabilityScore': round(stability_score, 4),
                'registryBonus': round(registry_bonus, 4),
                'preBonusScore': round(pre_bonus_score, 4),
                'chinaMmsiBonus': round(china_mmsi_bonus, 4),
                'episodePriorBonus': round(prior_bonus['episodePriorBonus'], 4),
                'lineagePriorBonus': round(prior_bonus['lineagePriorBonus'], 4),
                'labelPriorBonus': round(prior_bonus['labelPriorBonus'], 4),
                'priorBonusTotal': round(prior_bonus['priorBonusTotal'], 4),
            },
            # Unscaled metrics kept for diagnosis alongside the scaled ones.
            'scoreBreakdownRaw': {
                'trackSimilarityScore': round(raw_track_similarity, 4),
                'visitScore6h': round(raw_visit_score, 4),
                'proximityScore6h': round(raw_proximity_score, 4),
                'activitySyncScore6h': round(raw_activity_score, 4),
            },
            'chinaMmsiBonusApplied': china_mmsi_bonus > 0.0,
        }
        scored.append(CandidateScore(
            mmsi=candidate_mmsi,
            name=candidate_name,
            vessel_id=registry.vessel_id if registry is not None else None,
            target_type='VESSEL',
            candidate_source=','.join(sorted(meta['sources'])),
            base_corr_score=round(base_corr_score, 6),
            name_match_score=round(name_match_score, 6),
            track_similarity_score=round(track_similarity, 6),
            visit_score_6h=round(visit_score, 6),
            proximity_score_6h=round(proximity_score, 6),
            activity_sync_score_6h=round(activity_score, 6),
            stability_score=round(stability_score, 6),
            registry_bonus=round(registry_bonus, 6),
            episode_prior_bonus=round(prior_bonus['episodePriorBonus'], 6),
            lineage_prior_bonus=round(prior_bonus['lineagePriorBonus'], 6),
            label_prior_bonus=round(prior_bonus['labelPriorBonus'], 6),
            final_score=round(final_score, 6),
            streak_count=streak_count,
            model_id=default_model_id,
            model_name=default_model_name,
            evidence=evidence,
        ))
    # Best-first ordering; MMSI is the final deterministic tie-breaker.
    scored.sort(
        key=lambda item: (
            item.final_score,
            item.base_corr_score,
            item.stability_score,
            item.name_match_score,
            item.mmsi,
        ),
        reverse=True,
    )
    return scored
def _insert_candidate_snapshots(conn, observed_at: datetime, rows: list[tuple]) -> int:
    """Bulk-insert per-cycle candidate snapshot rows; returns the row count.

    NOTE: the `observed_at` parameter is not referenced here — each row
    tuple already carries its own observed_at value in position 1.
    """
    if not rows:
        return 0
    cur = conn.cursor()
    try:
        from psycopg2.extras import execute_values
        execute_values(
            cur,
            f"""
            INSERT INTO {GEAR_GROUP_PARENT_CANDIDATE_SNAPSHOTS} (
                observed_at, group_key, sub_cluster_id, parent_name, normalized_parent_name, episode_id, candidate_mmsi,
                candidate_name, candidate_vessel_id, rank, candidate_source,
                model_id, model_name, base_corr_score, name_match_score,
                track_similarity_score, visit_score_6h, proximity_score_6h,
                activity_sync_score_6h, stability_score, registry_bonus,
                episode_prior_bonus, lineage_prior_bonus, label_prior_bonus,
                final_score, margin_from_top, evidence
            ) VALUES %s
            """,
            rows,
            page_size=200,
        )
        return len(rows)
    finally:
        cur.close()
def _insert_label_tracking_rows(conn, rows: list[tuple]) -> int:
    """Bulk-insert label-tracking cycle rows; returns len(rows).

    ON CONFLICT DO NOTHING makes re-runs of the same cycle idempotent per
    (label_session_id, observed_at); the return value counts attempted rows,
    not necessarily rows actually inserted.
    """
    if not rows:
        return 0
    cur = conn.cursor()
    try:
        from psycopg2.extras import execute_values
        execute_values(
            cur,
            f"""
            INSERT INTO {GEAR_PARENT_LABEL_TRACKING_CYCLES} (
                label_session_id, observed_at, candidate_snapshot_observed_at, auto_status,
                top_candidate_mmsi, top_candidate_name, top_candidate_score,
                top_candidate_margin, candidate_count, labeled_candidate_present,
                labeled_candidate_rank, labeled_candidate_score,
                labeled_candidate_pre_bonus_score, labeled_candidate_margin_from_top,
                matched_top1, matched_top3, evidence_summary
            ) VALUES %s
            ON CONFLICT (label_session_id, observed_at) DO NOTHING
            """,
            rows,
            page_size=200,
        )
        return len(rows)
    finally:
        cur.close()
def _upsert_resolution(conn, row: tuple) -> None:
    """Insert or update one resolution row keyed by (group_key, sub_cluster_id).

    `row` must be a 27-tuple matching the column order below; the
    evidence_summary value is cast to jsonb. On conflict every mutable
    column is overwritten with the new values.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            INSERT INTO {GEAR_GROUP_PARENT_RESOLUTION} (
                group_key, sub_cluster_id, parent_name, normalized_parent_name,
                episode_id, continuity_source, continuity_score, prior_bonus_total,
                status, selected_parent_mmsi, selected_parent_name, selected_vessel_id,
                confidence, decision_source, top_score, second_score, score_margin,
                stable_cycles, last_evaluated_at, last_promoted_at, approved_by,
                approved_at, manual_comment, rejected_candidate_mmsi, rejected_at,
                evidence_summary, updated_at
            ) VALUES (
                %s, %s, %s, %s,
                %s, %s, %s, %s,
                %s, %s, %s, %s,
                %s, %s, %s, %s, %s,
                %s, %s, %s, %s,
                %s, %s, %s, %s,
                %s::jsonb, %s
            )
            ON CONFLICT (group_key, sub_cluster_id)
            DO UPDATE SET
                parent_name = EXCLUDED.parent_name,
                normalized_parent_name = EXCLUDED.normalized_parent_name,
                episode_id = EXCLUDED.episode_id,
                continuity_source = EXCLUDED.continuity_source,
                continuity_score = EXCLUDED.continuity_score,
                prior_bonus_total = EXCLUDED.prior_bonus_total,
                status = EXCLUDED.status,
                selected_parent_mmsi = EXCLUDED.selected_parent_mmsi,
                selected_parent_name = EXCLUDED.selected_parent_name,
                selected_vessel_id = EXCLUDED.selected_vessel_id,
                confidence = EXCLUDED.confidence,
                decision_source = EXCLUDED.decision_source,
                top_score = EXCLUDED.top_score,
                second_score = EXCLUDED.second_score,
                score_margin = EXCLUDED.score_margin,
                stable_cycles = EXCLUDED.stable_cycles,
                last_evaluated_at = EXCLUDED.last_evaluated_at,
                last_promoted_at = EXCLUDED.last_promoted_at,
                approved_by = EXCLUDED.approved_by,
                approved_at = EXCLUDED.approved_at,
                manual_comment = EXCLUDED.manual_comment,
                rejected_candidate_mmsi = EXCLUDED.rejected_candidate_mmsi,
                rejected_at = EXCLUDED.rejected_at,
                evidence_summary = EXCLUDED.evidence_summary,
                updated_at = EXCLUDED.updated_at
            """,
            row,
        )
    finally:
        cur.close()
def _label_tracking_row(
    observed_at: datetime,
    label_session: dict[str, Any],
    auto_status: str,
    top_candidate: Optional[CandidateScore],
    margin: float,
    candidates: list[CandidateScore],
) -> tuple:
    """Build one gear_parent_label_tracking_cycles row for a label session.

    Records where the human-labeled parent landed in this cycle's ranking
    (rank, score, pre-bonus score, gap to the top candidate) plus top-1 /
    top-3 agreement flags. Tuple order matches the INSERT column list in
    _insert_label_tracking_rows.
    """
    labeled_candidate = next(
        (candidate for candidate in candidates if candidate.mmsi == label_session['label_parent_mmsi']),
        None,
    )
    labeled_rank = None
    labeled_pre_bonus_score = None
    labeled_margin_from_top = None
    if labeled_candidate is not None:
        # candidates are already sorted best-first, so index = rank (1-based).
        for index, candidate in enumerate(candidates, start=1):
            if candidate.mmsi == labeled_candidate.mmsi:
                labeled_rank = index
                break
        labeled_pre_bonus_score = (
            labeled_candidate.evidence.get('scoreBreakdown', {}).get('preBonusScore')
            if isinstance(labeled_candidate.evidence.get('scoreBreakdown'), dict)
            else None
        )
        labeled_margin_from_top = round(
            (top_candidate.final_score - labeled_candidate.final_score) if top_candidate else 0.0,
            6,
        )
    evidence_summary = {
        'labelParentMmsi': label_session['label_parent_mmsi'],
        'labelParentName': label_session.get('label_parent_name'),
        'topCandidateSources': sorted(_candidate_sources(top_candidate)),
        'candidateMmsis': [candidate.mmsi for candidate in candidates[:5]],
    }
    return (
        label_session['id'],
        observed_at,
        observed_at,  # candidate_snapshot_observed_at: same cycle timestamp
        auto_status,
        top_candidate.mmsi if top_candidate else None,
        top_candidate.name if top_candidate else None,
        top_candidate.final_score if top_candidate else None,
        margin if top_candidate else 0.0,
        len(candidates),
        labeled_candidate is not None,
        labeled_rank,
        labeled_candidate.final_score if labeled_candidate else None,
        labeled_pre_bonus_score,
        labeled_margin_from_top,
        top_candidate is not None and label_session['label_parent_mmsi'] == top_candidate.mmsi,
        labeled_rank is not None and labeled_rank <= 3,
        json.dumps(evidence_summary, ensure_ascii=False),
    )
def run_gear_parent_inference(vessel_store, gear_groups: list[dict], conn) -> dict[str, int]:
    """Run representative parent-vessel ("mothership") inference for gear groups.

    Pipeline per inference cycle:
      1. Keep only groups that carry a ``parent_name`` and build episode
         continuity assignments plus prior statistics for them.
      2. Groups containing a member that *is* the parent vessel are resolved
         immediately as DIRECT_PARENT_MATCH with confidence 1.0 — the only
         path allowed to reach 100%.
      3. Remaining groups get a scored candidate list; the top candidate is
         auto-promoted, flagged for review, or recorded as no-candidate.
         Groups whose parent name is not trackable are skipped (unless a
         manual confirmation already exists).
      4. Resolutions are upserted, candidate/episode snapshots and
         label-tracking rows are inserted, and ``conn`` is committed.

    Args:
        vessel_store: live AIS store; must expose ``get_all_latest_positions()``.
        gear_groups: gear-group dicts (``parent_name``, ``sub_cluster_id``, ...).
        conn: open DB connection; committed at the end of the run.

    Returns:
        Counter dict with keys: groups, candidates, promoted, review_required,
        skipped, no_candidate, direct_matched, episode_snapshots, label_tracking.
    """
    observed_at = datetime.now(timezone.utc)
    active_groups = [group for group in gear_groups if group.get('parent_name')]
    if not active_groups:
        # Fix: include 'label_tracking' so the empty path returns the same
        # key set as the normal-path return below (callers indexing the key
        # previously hit KeyError here).
        return {'groups': 0, 'candidates': 0, 'promoted': 0, 'review_required': 0, 'skipped': 0, 'no_candidate': 0, 'direct_matched': 0, 'episode_snapshots': 0, 'label_tracking': 0}
    group_keys = sorted({group['parent_name'] for group in active_groups})
    # --- episode continuity & prior statistics -----------------------------
    episode_inputs = [
        group_to_episode_input(group, normalize_parent_name(group['parent_name']))
        for group in active_groups
    ]
    lineage_keys = sorted({item.normalized_parent_name for item in episode_inputs if item.normalized_parent_name})
    previous_episodes = load_active_episode_states(conn, lineage_keys)
    episode_plan = build_episode_plan(episode_inputs, previous_episodes)
    episode_prior_stats = load_episode_prior_stats(conn, [assignment.episode_id for assignment in episode_plan.assignments.values()])
    lineage_prior_stats = load_lineage_prior_stats(conn, lineage_keys)
    label_prior_stats = load_label_prior_stats(conn, lineage_keys)
    registry_by_mmsi, registry_by_name = _load_registry(conn)
    _expire_label_sessions(conn)
    existing_resolution = _load_existing_resolution(conn, group_keys)
    all_positions = vessel_store.get_all_latest_positions()
    # Evaluate the direct-parent lookup once per group (previously computed up
    # to three times per group: twice for the partition, once inside the loop).
    # Keyed by id() because group dicts are unhashable; the active_groups list
    # keeps every dict alive for the lifetime of this map.
    direct_parent_by_group_id = {
        id(group): _direct_parent_member(group, all_positions)
        for group in active_groups
    }
    direct_parent_groups = [
        group for group in active_groups
        if direct_parent_by_group_id[id(group)] is not None
    ]
    unresolved_groups = [
        group for group in active_groups
        if direct_parent_by_group_id[id(group)] is None
    ]
    # --- candidate-scoring inputs ------------------------------------------
    default_model_id, default_model_name = _load_default_model(conn)
    correlation_scores = _load_correlation_scores(conn, default_model_id, group_keys)
    raw_metric_averages = _load_raw_metric_averages(conn, group_keys)
    center_tracks = _load_group_center_tracks(conn, group_keys)
    active_exclusions = _load_active_candidate_exclusions(conn, group_keys)
    active_label_sessions = _load_active_label_sessions(conn, group_keys)
    snapshot_rows: list[tuple] = []
    label_tracking_rows: list[tuple] = []
    episode_snapshot_payloads: dict[tuple[str, int], dict[str, Any]] = {}
    promoted = 0
    review_required = 0
    skipped = 0
    no_candidate = 0
    direct_matched = 0
    # --- phase 1: groups whose parent vessel is itself a group member ------
    for group in direct_parent_groups:
        group_key = group['parent_name']
        sub_cluster_id = int(group.get('sub_cluster_id', 0))
        key = (group_key, sub_cluster_id)
        episode_assignment = episode_plan.assignments.get(key)
        if episode_assignment is None:
            continue
        existing = existing_resolution.get(key)
        direct_parent = direct_parent_by_group_id[id(group)]
        if direct_parent is None:  # defensive; partition guarantees non-None
            continue
        normalized_parent_name = normalize_parent_name(group_key)
        direct_parent_mmsi = str(direct_parent.get('mmsi') or '')
        direct_parent_name = str(direct_parent.get('name') or group_key or direct_parent_mmsi)
        stable_cycles = _direct_parent_stable_cycles(existing, direct_parent_mmsi)
        status_reason = _status_reason(_DIRECT_PARENT_MATCH_STATUS)
        evidence_summary = {
            'episodeId': episode_assignment.episode_id,
            'continuitySource': episode_assignment.continuity_source,
            'continuityScore': episode_assignment.continuity_score,
            'mergedFromEpisodeIds': episode_assignment.merged_from_episode_ids,
            'splitFromEpisodeId': episode_assignment.split_from_episode_id,
            'normalizedParentName': normalized_parent_name,
            'candidateCount': 0,
            'directParentMmsi': direct_parent_mmsi,
            'directParentName': direct_parent_name,
            'statusReason': status_reason,
            'trackable': is_trackable_parent_name(group_key),
        }
        status = _DIRECT_PARENT_MATCH_STATUS
        decision_source = 'DIRECT_PARENT_MATCH'
        selected_parent_mmsi = direct_parent_mmsi
        selected_parent_name = direct_parent_name
        selected_vessel_id = registry_by_mmsi.get(direct_parent_mmsi).vessel_id if direct_parent_mmsi in registry_by_mmsi else None
        # Direct match is the only path that may assign 100% confidence.
        confidence = 1.0
        last_promoted_at = observed_at
        if existing is not None and existing.get('status') == _MANUAL_CONFIRMED_STATUS:
            # An operator's manual confirmation is never overridden by automation.
            status = _MANUAL_CONFIRMED_STATUS
            decision_source = existing.get('decision_source') or 'MANUAL'
            selected_parent_mmsi = existing.get('selected_parent_mmsi') or selected_parent_mmsi
            selected_parent_name = existing.get('selected_parent_name') or selected_parent_name
            selected_vessel_id = existing.get('selected_vessel_id') if existing.get('selected_vessel_id') is not None else selected_vessel_id
            confidence = existing.get('confidence') or confidence
            last_promoted_at = existing.get('approved_at') or last_promoted_at
            evidence_summary['statusReason'] = existing.get('evidence_summary', {}).get('statusReason') or status_reason
        _upsert_resolution(
            conn,
            (
                group_key,
                sub_cluster_id,
                group_key,
                normalized_parent_name,
                episode_assignment.episode_id,
                episode_assignment.continuity_source,
                episode_assignment.continuity_score,
                0.0,
                status,
                selected_parent_mmsi,
                selected_parent_name,
                selected_vessel_id,
                confidence,
                decision_source,
                confidence or 0.0,
                0.0,
                confidence or 0.0,
                stable_cycles,
                observed_at,
                last_promoted_at,
                (existing or {}).get('approved_by'),
                (existing or {}).get('approved_at'),
                (existing or {}).get('manual_comment'),
                (existing or {}).get('rejected_candidate_mmsi'),
                (existing or {}).get('rejected_at'),
                json.dumps(evidence_summary, ensure_ascii=False),
                observed_at,
            ),
        )
        episode_snapshot_payloads[key] = {
            'parentEpisodeIds': episode_assignment.merged_from_episode_ids,
            'topCandidateMmsi': selected_parent_mmsi,
            'topCandidateScore': confidence or 1.0,
            'resolutionStatus': status,
            'metadata': {
                'splitFromEpisodeId': episode_assignment.split_from_episode_id,
                'directParentMmsi': direct_parent_mmsi,
            },
        }
        direct_matched += 1
    # --- phase 2: candidate scoring for groups without a direct parent -----
    for group in unresolved_groups:
        group_key = group['parent_name']
        sub_cluster_id = int(group.get('sub_cluster_id', 0))
        key = (group_key, sub_cluster_id)
        episode_assignment = episode_plan.assignments.get(key)
        if episode_assignment is None:
            continue
        existing = existing_resolution.get(key)
        normalized_parent_name = normalize_parent_name(group_key)
        excluded_candidate_mmsis = set(active_exclusions['global'])
        excluded_candidate_mmsis.update(active_exclusions['group'].get(key, set()))
        active_label_session = active_label_sessions.get(key)
        # Parent names that are not trackable (too short/generic) are skipped,
        # unless an operator has already manually confirmed a parent.
        if not is_trackable_parent_name(group_key) and (existing or {}).get('status') != _MANUAL_CONFIRMED_STATUS:
            skipped += 1
            status_reason = _status_reason(_SHORT_NAME_STATUS)
            evidence_summary = {
                'episodeId': episode_assignment.episode_id,
                'continuitySource': episode_assignment.continuity_source,
                'continuityScore': episode_assignment.continuity_score,
                'mergedFromEpisodeIds': episode_assignment.merged_from_episode_ids,
                'splitFromEpisodeId': episode_assignment.split_from_episode_id,
                'skipReason': status_reason,
                'statusReason': status_reason,
                'normalizedParentName': normalized_parent_name,
            }
            _upsert_resolution(
                conn,
                (
                    group_key,
                    sub_cluster_id,
                    group_key,
                    normalized_parent_name,
                    episode_assignment.episode_id,
                    episode_assignment.continuity_source,
                    episode_assignment.continuity_score,
                    0.0,
                    _SHORT_NAME_STATUS,
                    None,
                    None,
                    None,
                    None,
                    'AUTO_SKIP',
                    0.0,
                    0.0,
                    0.0,
                    0,
                    observed_at,
                    None,
                    None,
                    None,
                    (existing or {}).get('manual_comment'),
                    (existing or {}).get('rejected_candidate_mmsi'),
                    (existing or {}).get('rejected_at'),
                    json.dumps(evidence_summary, ensure_ascii=False),
                    observed_at,
                ),
            )
            episode_snapshot_payloads[key] = {
                'parentEpisodeIds': episode_assignment.merged_from_episode_ids,
                'topCandidateMmsi': None,
                'topCandidateScore': 0.0,
                'resolutionStatus': _SHORT_NAME_STATUS,
                'metadata': {'skipReason': status_reason},
            }
            continue
        candidates = _build_candidate_scores(
            vessel_store=vessel_store,
            observed_at=observed_at,
            group=group,
            episode_assignment=episode_assignment,
            default_model_id=default_model_id,
            default_model_name=default_model_name,
            score_rows=correlation_scores.get(key, []),
            raw_metrics=raw_metric_averages,
            center_track=center_tracks.get(key, []),
            all_positions=all_positions,
            registry_by_mmsi=registry_by_mmsi,
            registry_by_name=registry_by_name,
            existing=existing,
            excluded_candidate_mmsis=excluded_candidate_mmsis,
            episode_prior_stats=episode_prior_stats,
            lineage_prior_stats=lineage_prior_stats,
            label_prior_stats=label_prior_stats,
        )
        top_candidate = candidates[0] if candidates else None
        second_score = candidates[1].final_score if len(candidates) > 1 else 0.0
        margin = round((top_candidate.final_score - second_score), 6) if top_candidate else 0.0
        stable_cycles = _top_candidate_stable_cycles(existing, top_candidate)
        # Persist the full ranked candidate list for auditing/evaluation.
        for rank, candidate in enumerate(candidates, start=1):
            snapshot_rows.append((
                observed_at,
                group_key,
                sub_cluster_id,
                group_key,
                normalized_parent_name,
                episode_assignment.episode_id,
                candidate.mmsi,
                candidate.name,
                candidate.vessel_id,
                rank,
                candidate.candidate_source,
                candidate.model_id,
                candidate.model_name,
                candidate.base_corr_score,
                candidate.name_match_score,
                candidate.track_similarity_score,
                candidate.visit_score_6h,
                candidate.proximity_score_6h,
                candidate.activity_sync_score_6h,
                candidate.stability_score,
                candidate.registry_bonus,
                candidate.episode_prior_bonus,
                candidate.lineage_prior_bonus,
                candidate.label_prior_bonus,
                candidate.final_score,
                round(top_candidate.final_score - candidate.final_score, 6) if top_candidate else 0.0,
                json.dumps(candidate.evidence, ensure_ascii=False),
            ))
        status, decision_source = _select_status(top_candidate, margin, stable_cycles)
        auto_status = status  # pre-override status, recorded for label tracking
        selected_parent_mmsi: Optional[str] = None
        selected_parent_name: Optional[str] = None
        selected_vessel_id: Optional[int] = None
        confidence: Optional[float] = None
        last_promoted_at: Optional[datetime] = None
        if top_candidate is not None:
            if status == _AUTO_PROMOTED_STATUS:
                selected_parent_mmsi = top_candidate.mmsi
                selected_parent_name = top_candidate.name
                selected_vessel_id = top_candidate.vessel_id
                confidence = top_candidate.final_score
                last_promoted_at = observed_at
                promoted += 1
            elif status == _REVIEW_REQUIRED_STATUS:
                selected_parent_mmsi = top_candidate.mmsi
                selected_parent_name = top_candidate.name
                selected_vessel_id = top_candidate.vessel_id
                confidence = top_candidate.final_score
                review_required += 1
        elif status == _NO_CANDIDATE_STATUS:
            no_candidate += 1
        status_reason = _status_reason(status)
        evidence_summary = {
            'episodeId': episode_assignment.episode_id,
            'continuitySource': episode_assignment.continuity_source,
            'continuityScore': episode_assignment.continuity_score,
            'mergedFromEpisodeIds': episode_assignment.merged_from_episode_ids,
            'splitFromEpisodeId': episode_assignment.split_from_episode_id,
            'normalizedParentName': normalized_parent_name,
            'candidateCount': len(candidates),
            'topCandidateMmsi': top_candidate.mmsi if top_candidate else None,
            'topCandidateName': top_candidate.name if top_candidate else None,
            'topCandidateSources': sorted(_candidate_sources(top_candidate)),
            'hasCorrelationCandidate': 'CORRELATION' in _candidate_sources(top_candidate),
            'recentTopCandidateStableCycles': stable_cycles,
            # NOTE(review): _select_status likely never yields _SHORT_NAME_STATUS
            # here (that case is handled above), so this is normally None.
            'skipReason': _status_reason(_SHORT_NAME_STATUS) if status == _SHORT_NAME_STATUS else None,
            'statusReason': status_reason,
            'trackable': is_trackable_parent_name(group_key),
            'priorBonusTotal': top_candidate.evidence.get('scoreBreakdown', {}).get('priorBonusTotal') if top_candidate else 0.0,
        }
        if excluded_candidate_mmsis:
            evidence_summary['excludedCandidateMmsis'] = sorted(excluded_candidate_mmsis)
        if active_label_session is not None:
            evidence_summary['activeLabelSessionId'] = active_label_session['id']
            evidence_summary['activeLabelParentMmsi'] = active_label_session['label_parent_mmsi']
        if existing is not None and existing.get('status') == _MANUAL_CONFIRMED_STATUS:
            # An operator's manual confirmation is never overridden by automation.
            status = _MANUAL_CONFIRMED_STATUS
            decision_source = existing.get('decision_source') or 'MANUAL'
            selected_parent_mmsi = existing.get('selected_parent_mmsi')
            selected_parent_name = existing.get('selected_parent_name')
            selected_vessel_id = existing.get('selected_vessel_id')
            confidence = existing.get('confidence') or confidence
            last_promoted_at = existing.get('approved_at') or existing.get('rejected_at') or last_promoted_at
        _upsert_resolution(
            conn,
            (
                group_key,
                sub_cluster_id,
                group_key,
                normalized_parent_name,
                episode_assignment.episode_id,
                episode_assignment.continuity_source,
                episode_assignment.continuity_score,
                top_candidate.evidence.get('scoreBreakdown', {}).get('priorBonusTotal', 0.0) if top_candidate else 0.0,
                status,
                selected_parent_mmsi,
                selected_parent_name,
                selected_vessel_id,
                confidence,
                decision_source,
                top_candidate.final_score if top_candidate else 0.0,
                second_score,
                margin,
                stable_cycles,
                observed_at,
                last_promoted_at,
                (existing or {}).get('approved_by'),
                (existing or {}).get('approved_at'),
                (existing or {}).get('manual_comment'),
                (existing or {}).get('rejected_candidate_mmsi'),
                (existing or {}).get('rejected_at'),
                json.dumps(evidence_summary, ensure_ascii=False),
                observed_at,
            ),
        )
        episode_snapshot_payloads[key] = {
            'parentEpisodeIds': episode_assignment.merged_from_episode_ids,
            'topCandidateMmsi': top_candidate.mmsi if top_candidate else None,
            'topCandidateScore': top_candidate.final_score if top_candidate else 0.0,
            'resolutionStatus': status,
            'metadata': {
                'splitFromEpisodeId': episode_assignment.split_from_episode_id,
                'candidateCount': len(candidates),
                'topCandidateSources': sorted(_candidate_sources(top_candidate)),
            },
        }
        if active_label_session is not None:
            label_tracking_rows.append(
                _label_tracking_row(
                    observed_at=observed_at,
                    label_session=active_label_session,
                    auto_status=auto_status,
                    top_candidate=top_candidate,
                    margin=margin,
                    candidates=candidates,
                )
            )
    # --- persist & commit --------------------------------------------------
    sync_episode_states(conn, observed_at, episode_plan)
    inserted = _insert_candidate_snapshots(conn, observed_at, snapshot_rows)
    episode_snapshots_inserted = insert_episode_snapshots(conn, observed_at, episode_plan, episode_snapshot_payloads)
    tracking_inserted = _insert_label_tracking_rows(conn, label_tracking_rows)
    conn.commit()
    logger.info(
        'gear parent inference: %d groups, %d direct-match, %d candidates, %d promoted, %d review, %d skipped, %d no-candidate, %d episode-snapshots, %d label-tracking',
        len(active_groups),
        direct_matched,
        inserted,
        promoted,
        review_required,
        skipped,
        no_candidate,
        episode_snapshots_inserted,
        tracking_inserted,
    )
    return {
        'groups': len(active_groups),
        'candidates': inserted,
        'promoted': promoted,
        'review_required': review_required,
        'skipped': skipped,
        'no_candidate': no_candidate,
        'direct_matched': direct_matched,
        'episode_snapshots': episode_snapshots_inserted,
        'label_tracking': tracking_inserted,
    }