"""어구 그룹 대표 모선 추론.""" from __future__ import annotations import json import logging import math from dataclasses import dataclass from datetime import datetime, timedelta, timezone from typing import Any, Optional from algorithms.gear_correlation import _get_vessel_track from algorithms.gear_parent_episode import ( build_episode_plan, compute_prior_bonus_components, group_to_episode_input, insert_episode_snapshots, load_active_episode_states, load_episode_prior_stats, load_label_prior_stats, load_lineage_prior_stats, sync_episode_states, ) from algorithms.gear_name_rules import is_trackable_parent_name, normalize_parent_name from algorithms.track_similarity import compute_track_similarity_v2, _resample_temporal, haversine_m _KST = timezone(timedelta(hours=9)) def _to_epoch_ms(ts) -> int: """timestamp를 epoch_ms로 변환. tz-naive는 KST로 간주.""" if hasattr(ts, 'timestamp'): if hasattr(ts, 'tzinfo') and ts.tzinfo is not None: return int(ts.timestamp() * 1000) # tz-naive → KST wall-clock으로 간주 import pandas as pd if isinstance(ts, pd.Timestamp): return int(ts.tz_localize(_KST).timestamp() * 1000) return int(ts.replace(tzinfo=_KST).timestamp() * 1000) return int(ts) from config import qualified_table logger = logging.getLogger(__name__) FLEET_VESSELS = qualified_table('fleet_vessels') GROUP_POLYGON_SNAPSHOTS = qualified_table('group_polygon_snapshots') GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores') GEAR_CORRELATION_RAW_METRICS = qualified_table('gear_correlation_raw_metrics') CORRELATION_PARAM_MODELS = qualified_table('correlation_param_models') GEAR_GROUP_PARENT_CANDIDATE_SNAPSHOTS = qualified_table('gear_group_parent_candidate_snapshots') GEAR_GROUP_PARENT_RESOLUTION = qualified_table('gear_group_parent_resolution') GEAR_PARENT_CANDIDATE_EXCLUSIONS = qualified_table('gear_parent_candidate_exclusions') GEAR_PARENT_LABEL_SESSIONS = qualified_table('gear_parent_label_sessions') GEAR_PARENT_LABEL_TRACKING_CYCLES = 
qualified_table('gear_parent_label_tracking_cycles') _SHORT_NAME_STATUS = 'SKIPPED_SHORT_NAME' _NO_CANDIDATE_STATUS = 'NO_CANDIDATE' _MANUAL_CONFIRMED_STATUS = 'MANUAL_CONFIRMED' _AUTO_PROMOTED_STATUS = 'AUTO_PROMOTED' _REVIEW_REQUIRED_STATUS = 'REVIEW_REQUIRED' _UNRESOLVED_STATUS = 'UNRESOLVED' _DIRECT_PARENT_MATCH_STATUS = 'DIRECT_PARENT_MATCH' _REJECT_COOLDOWN_HOURS = 24 _MAX_CORRELATION_CANDIDATES = 5 _MIN_AUTO_PROMOTION_STABLE_CYCLES = 3 _MIN_AUTO_PROMOTION_SCORE = 0.72 _MIN_AUTO_PROMOTION_MARGIN = 0.15 _MIN_REVIEW_REQUIRED_SCORE = 0.60 _MIN_PREFIX_BONUS_SCORE = 0.50 _CHINA_MMSI_PREFIX_BONUS = 0.05 _CHINA_MMSI_PREFIXES = ('412', '413') _TRACK_SUPPORT_POINT_TARGET = 12 _TRACK_SUPPORT_SPAN_TARGET_MINUTES = 90.0 _VISIT_SUPPORT_POINT_TARGET = 8 _VISIT_SUPPORT_SPAN_TARGET_MINUTES = 60.0 _ACTIVITY_SUPPORT_POINT_TARGET = 12 _ACTIVITY_SUPPORT_SPAN_TARGET_MINUTES = 90.0 _VISIT_ZONE_THRESHOLD_NM = 5.0 _RAW_SCORE_WINDOW_HOURS = 6 @dataclass class RegistryVessel: vessel_id: int mmsi: str name_cn: str name_en: str @dataclass class CandidateScore: mmsi: str name: str vessel_id: Optional[int] target_type: str candidate_source: str base_corr_score: float name_match_score: float track_similarity_score: float visit_score_6h: float proximity_score_6h: float activity_sync_score_6h: float stability_score: float registry_bonus: float episode_prior_bonus: float lineage_prior_bonus: float label_prior_bonus: float final_score: float streak_count: int model_id: int model_name: str evidence: dict[str, Any] def _clamp(value: float, floor: float = 0.0, ceil: float = 1.0) -> float: return max(floor, min(ceil, value)) def _china_mmsi_prefix_bonus(mmsi: str, pre_bonus_score: float) -> float: if pre_bonus_score < _MIN_PREFIX_BONUS_SCORE: return 0.0 if any((mmsi or '').startswith(prefix) for prefix in _CHINA_MMSI_PREFIXES): return _CHINA_MMSI_PREFIX_BONUS return 0.0 def _apply_final_score_bonus(mmsi: str, weighted_score: float) -> tuple[float, float, float]: pre_bonus_score = 
_clamp(weighted_score) china_mmsi_bonus = _china_mmsi_prefix_bonus(mmsi, pre_bonus_score) final_score = _clamp(weighted_score + china_mmsi_bonus) return pre_bonus_score, china_mmsi_bonus, final_score def _to_aware_utc(value: Any) -> Optional[datetime]: if value is None: return None if isinstance(value, datetime): if value.tzinfo is None: return value.replace(tzinfo=timezone.utc) return value.astimezone(timezone.utc) try: parsed = datetime.fromisoformat(str(value)) except Exception: return None if parsed.tzinfo is None: return parsed.replace(tzinfo=timezone.utc) return parsed.astimezone(timezone.utc) def _span_minutes(timestamps: list[datetime]) -> float: if len(timestamps) < 2: return 0.0 return max(0.0, (timestamps[-1] - timestamps[0]).total_seconds() / 60.0) def _support_factor(point_count: int, span_minutes: float, point_target: int, span_target_minutes: float) -> float: if point_count <= 0 or span_minutes <= 0: return 0.0 point_support = min(1.0, point_count / max(point_target, 1)) span_support = min(1.0, span_minutes / max(span_target_minutes, 1.0)) return _clamp(math.sqrt(point_support * span_support)) def _haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float: earth_radius_nm = 3440.065 phi1 = math.radians(lat1) phi2 = math.radians(lat2) dphi = math.radians(lat2 - lat1) dlam = math.radians(lon2 - lon1) a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2 return earth_radius_nm * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) def _build_track_coverage_metrics( center_track: list[dict[str, Any]], vessel_track: list[dict[str, Any]], gear_center_lat: float, gear_center_lon: float, ) -> dict[str, float | int]: vessel_timestamps = sorted( ts for ts in (_to_aware_utc(point.get('timestamp')) for point in vessel_track) if ts is not None ) center_timestamps = sorted( ts for ts in (_to_aware_utc(point.get('timestamp')) for point in center_track) if ts is not None ) track_point_count = len(vessel_track) 
track_span_minutes = _span_minutes(vessel_timestamps) center_point_count = len(center_track) center_span_minutes = _span_minutes(center_timestamps) overlap_points: list[dict[str, Any]] = vessel_track if vessel_timestamps and center_timestamps: overlap_start = center_timestamps[0] overlap_end = center_timestamps[-1] overlap_points = [ point for point in vessel_track if (ts := _to_aware_utc(point.get('timestamp'))) is not None and overlap_start <= ts <= overlap_end ] overlap_timestamps = sorted( ts for ts in (_to_aware_utc(point.get('timestamp')) for point in overlap_points) if ts is not None ) overlap_point_count = len(overlap_points) overlap_span_minutes = _span_minutes(overlap_timestamps) in_zone_points = [ point for point in overlap_points if _haversine_nm(gear_center_lat, gear_center_lon, float(point['lat']), float(point['lon'])) < _VISIT_ZONE_THRESHOLD_NM ] in_zone_timestamps = sorted( ts for ts in (_to_aware_utc(point.get('timestamp')) for point in in_zone_points) if ts is not None ) in_zone_point_count = len(in_zone_points) in_zone_span_minutes = _span_minutes(in_zone_timestamps) track_coverage_factor = _support_factor( track_point_count, track_span_minutes, _TRACK_SUPPORT_POINT_TARGET, _TRACK_SUPPORT_SPAN_TARGET_MINUTES, ) visit_coverage_factor = _support_factor( in_zone_point_count, in_zone_span_minutes, _VISIT_SUPPORT_POINT_TARGET, _VISIT_SUPPORT_SPAN_TARGET_MINUTES, ) activity_coverage_factor = _support_factor( in_zone_point_count, in_zone_span_minutes, _ACTIVITY_SUPPORT_POINT_TARGET, _ACTIVITY_SUPPORT_SPAN_TARGET_MINUTES, ) coverage_factor = round( (track_coverage_factor + visit_coverage_factor + activity_coverage_factor) / 3.0, 4, ) return { 'trackPointCount': track_point_count, 'trackSpanMinutes': round(track_span_minutes, 1), 'centerPointCount': center_point_count, 'centerSpanMinutes': round(center_span_minutes, 1), 'overlapPointCount': overlap_point_count, 'overlapSpanMinutes': round(overlap_span_minutes, 1), 'inZonePointCount': in_zone_point_count, 
'inZoneSpanMinutes': round(in_zone_span_minutes, 1), 'trackCoverageFactor': round(track_coverage_factor, 4), 'visitCoverageFactor': round(visit_coverage_factor, 4), 'activityCoverageFactor': round(activity_coverage_factor, 4), 'coverageFactor': coverage_factor, 'scoreWindowHours': _RAW_SCORE_WINDOW_HOURS, } def _candidate_sources(candidate: Optional[CandidateScore]) -> set[str]: if candidate is None: return set() raw = candidate.evidence.get('sources') if isinstance(raw, list): return {str(item) for item in raw if item} return set() def _top_candidate_stable_cycles(existing: Optional[dict[str, Any]], top_candidate: Optional[CandidateScore]) -> int: if top_candidate is None: return 0 previous_mmsi = None previous_cycles = 0 if existing is not None: previous_summary = existing.get('evidence_summary') or {} previous_mmsi = previous_summary.get('topCandidateMmsi') previous_cycles = int(existing.get('stable_cycles') or 0) if previous_mmsi == top_candidate.mmsi: return max(previous_cycles + 1, 1) return 1 def _status_reason(status: str) -> Optional[str]: if status == _SHORT_NAME_STATUS: return '정규화 이름 길이 4 미만' if status == _NO_CANDIDATE_STATUS: return '후보를 생성하지 못함' if status == _DIRECT_PARENT_MATCH_STATUS: return '그룹 멤버에 직접 모선이 포함됨' return None def _select_status( top_candidate: Optional[CandidateScore], margin: float, stable_cycles: int, ) -> tuple[str, str]: if top_candidate is None: return _NO_CANDIDATE_STATUS, 'AUTO_NO_CANDIDATE' has_correlation = 'CORRELATION' in _candidate_sources(top_candidate) if ( top_candidate.target_type == 'VESSEL' and has_correlation and top_candidate.final_score >= _MIN_AUTO_PROMOTION_SCORE and margin >= _MIN_AUTO_PROMOTION_MARGIN and stable_cycles >= _MIN_AUTO_PROMOTION_STABLE_CYCLES ): return _AUTO_PROMOTED_STATUS, 'AUTO_PROMOTION' if top_candidate.final_score >= _MIN_REVIEW_REQUIRED_SCORE: return _REVIEW_REQUIRED_STATUS, 'AUTO_REVIEW' return _UNRESOLVED_STATUS, 'AUTO_SCORE' def _load_default_model(conn) -> tuple[int, str]: cur = 
conn.cursor() try: cur.execute( f""" SELECT id, name FROM {CORRELATION_PARAM_MODELS} WHERE is_active = TRUE ORDER BY is_default DESC, id ASC LIMIT 1 """ ) row = cur.fetchone() if row is None: return 1, 'default' return int(row[0]), row[1] or 'default' finally: cur.close() def _load_registry(conn) -> tuple[dict[str, RegistryVessel], dict[str, list[RegistryVessel]]]: cur = conn.cursor() try: cur.execute( f""" SELECT id, COALESCE(mmsi, ''), COALESCE(name_cn, ''), COALESCE(name_en, '') FROM {FLEET_VESSELS} """ ) by_mmsi: dict[str, RegistryVessel] = {} by_normalized_name: dict[str, list[RegistryVessel]] = {} for vessel_id, mmsi, name_cn, name_en in cur.fetchall(): vessel = RegistryVessel( vessel_id=int(vessel_id), mmsi=mmsi or '', name_cn=name_cn or '', name_en=name_en or '', ) if vessel.mmsi: by_mmsi[vessel.mmsi] = vessel for raw_name in (vessel.name_cn, vessel.name_en): normalized = normalize_parent_name(raw_name) if normalized: by_normalized_name.setdefault(normalized, []).append(vessel) return by_mmsi, by_normalized_name finally: cur.close() def _json_to_dict(value: Any) -> dict[str, Any]: if value is None: return {} if isinstance(value, dict): return value try: return json.loads(value) except Exception: return {} def _load_existing_resolution(conn, group_keys: list[str]) -> dict[tuple[str, int], dict[str, Any]]: if not group_keys: return {} cur = conn.cursor() try: cur.execute( f""" SELECT group_key, sub_cluster_id, parent_name, normalized_parent_name, status, selected_parent_mmsi, selected_parent_name, selected_vessel_id, confidence, decision_source, top_score, second_score, score_margin, stable_cycles, approved_by, approved_at, manual_comment, rejected_candidate_mmsi, rejected_at, evidence_summary, episode_id, continuity_source, continuity_score, prior_bonus_total FROM {GEAR_GROUP_PARENT_RESOLUTION} WHERE group_key = ANY(%s) """, (group_keys,), ) result: dict[tuple[str, int], dict[str, Any]] = {} for row in cur.fetchall(): key = (row[0], int(row[1])) result[key] 
= { 'parent_name': row[2], 'normalized_parent_name': row[3], 'status': row[4], 'selected_parent_mmsi': row[5], 'selected_parent_name': row[6], 'selected_vessel_id': row[7], 'confidence': row[8], 'decision_source': row[9], 'top_score': row[10] or 0.0, 'second_score': row[11] or 0.0, 'score_margin': row[12] or 0.0, 'stable_cycles': row[13] or 0, 'approved_by': row[14], 'approved_at': row[15], 'manual_comment': row[16], 'rejected_candidate_mmsi': row[17], 'rejected_at': row[18], 'evidence_summary': _json_to_dict(row[19]), 'episode_id': row[20], 'continuity_source': row[21], 'continuity_score': row[22] or 0.0, 'prior_bonus_total': row[23] or 0.0, } return result finally: cur.close() def _expire_label_sessions(conn) -> None: cur = conn.cursor() try: cur.execute( f""" UPDATE {GEAR_PARENT_LABEL_SESSIONS} SET status = 'EXPIRED', updated_at = NOW() WHERE status = 'ACTIVE' AND active_until <= NOW() """ ) finally: cur.close() def _load_active_candidate_exclusions(conn, group_keys: list[str]) -> dict[str, Any]: result: dict[str, Any] = { 'global': set(), 'group': {}, } cur = conn.cursor() try: cur.execute( f""" SELECT scope_type, group_key, sub_cluster_id, candidate_mmsi FROM {GEAR_PARENT_CANDIDATE_EXCLUSIONS} WHERE released_at IS NULL AND active_from <= NOW() AND (active_until IS NULL OR active_until > NOW()) AND (scope_type = 'GLOBAL' OR group_key = ANY(%s)) ORDER BY active_from DESC, id DESC """, (group_keys or [''],), ) for scope_type, group_key, sub_cluster_id, candidate_mmsi in cur.fetchall(): if scope_type == 'GLOBAL': result['global'].add(candidate_mmsi) continue key = (group_key, int(sub_cluster_id)) result['group'].setdefault(key, set()).add(candidate_mmsi) return result finally: cur.close() def _load_active_label_sessions(conn, group_keys: list[str]) -> dict[tuple[str, int], dict[str, Any]]: if not group_keys: return {} cur = conn.cursor() try: cur.execute( f""" SELECT DISTINCT ON (group_key, sub_cluster_id) id, group_key, sub_cluster_id, label_parent_mmsi, 
def _load_correlation_scores(
    conn,
    default_model_id: int,
    group_keys: list[str],
) -> dict[tuple[str, int], list[dict[str, Any]]]:
    """Load per-group VESSEL correlation scores for one model, best score first.

    Returns {(group_key, sub_cluster_id): [score row dicts]}.
    """
    if not group_keys:
        return {}
    cur = conn.cursor()
    try:
        cur.execute(
            f"""
            SELECT group_key, sub_cluster_id, target_mmsi, target_type,
                   COALESCE(target_name, ''), current_score, streak_count
            FROM {GEAR_CORRELATION_SCORES}
            WHERE model_id = %s AND group_key = ANY(%s) AND target_type = 'VESSEL'
            ORDER BY group_key, sub_cluster_id, current_score DESC, last_observed_at DESC
            """,
            (default_model_id, group_keys),
        )
        result: dict[tuple[str, int], list[dict[str, Any]]] = {}
        for row in cur.fetchall():
            key = (row[0], int(row[1]))
            result.setdefault(key, []).append({
                'target_mmsi': row[2],
                'target_type': row[3],
                'target_name': row[4] or '',
                'current_score': float(row[5] or 0.0),
                'streak_count': int(row[6] or 0),
            })
        return result
    finally:
        cur.close()


def _load_raw_metric_averages(conn, group_keys: list[str]) -> dict[tuple[str, int, str], dict[str, float]]:
    """Average raw visit/proximity/activity metrics over the scoring window.

    Keyed by (group_key, sub_cluster_id, target_mmsi).
    """
    if not group_keys:
        return {}
    cur = conn.cursor()
    try:
        # The window is derived from _RAW_SCORE_WINDOW_HOURS (an int module
        # constant, safe to interpolate) so it cannot drift from the
        # 'scoreWindowHours' value reported in coverage evidence.
        cur.execute(
            f"""
            SELECT group_key, sub_cluster_id, target_mmsi,
                   AVG(COALESCE(visit_score, 0)) AS avg_visit,
                   AVG(COALESCE(proximity_ratio, 0)) AS avg_proximity,
                   AVG(COALESCE(activity_sync, 0)) AS avg_activity
            FROM {GEAR_CORRELATION_RAW_METRICS}
            WHERE group_key = ANY(%s)
              AND observed_at > NOW() - INTERVAL '{_RAW_SCORE_WINDOW_HOURS} hours'
            GROUP BY group_key, sub_cluster_id, target_mmsi
            """,
            (group_keys,),
        )
        result: dict[tuple[str, int, str], dict[str, float]] = {}
        for row in cur.fetchall():
            result[(row[0], int(row[1]), row[2])] = {
                'visit_score_6h': float(row[3] or 0.0),
                'proximity_score_6h': float(row[4] or 0.0),
                'activity_sync_score_6h': float(row[5] or 0.0),
            }
        return result
    finally:
        cur.close()


def _load_group_center_tracks(conn, group_keys: list[str]) -> dict[tuple[str, int], list[dict[str, Any]]]:
    """Load recent 1h-resolution group centroid tracks, oldest point first."""
    if not group_keys:
        return {}
    cur = conn.cursor()
    try:
        # Same window constant as the raw-metric averages (see above).
        cur.execute(
            f"""
            SELECT group_key, sub_cluster_id, snapshot_time,
                   ST_Y(center_point) AS lat, ST_X(center_point) AS lon
            FROM {GROUP_POLYGON_SNAPSHOTS}
            WHERE group_key = ANY(%s) AND resolution = '1h' AND center_point IS NOT NULL
              AND snapshot_time > NOW() - INTERVAL '{_RAW_SCORE_WINDOW_HOURS} hours'
            ORDER BY group_key, sub_cluster_id, snapshot_time ASC
            """,
            (group_keys,),
        )
        result: dict[tuple[str, int], list[dict[str, Any]]] = {}
        for row in cur.fetchall():
            result.setdefault((row[0], int(row[1])), []).append({
                'timestamp': row[2],
                'lat': float(row[3]),
                'lon': float(row[4]),
            })
        return result
    finally:
        cur.close()


def _name_match_score(parent_name: str, candidate_name: str, registry: Optional[RegistryVessel]) -> float:
    """Score how well a candidate's name matches the group's parent name.

    Tiers: exact raw match 1.0 > normalized match 0.8 > prefix/substring 0.5
    > alphabetic-characters-only match 0.3 > no match 0.0. When a registry
    vessel is known, its Chinese and English names are also tried and the
    best score wins.
    """
    def score_pair(left: str, right: str) -> float:
        raw_left = (left or '').strip().upper()
        raw_right = (right or '').strip().upper()
        normalized_left = normalize_parent_name(left)
        normalized_right = normalize_parent_name(right)
        alpha_left = ''.join(ch for ch in normalized_left if ch.isalpha())
        alpha_right = ''.join(ch for ch in normalized_right if ch.isalpha())
        if not normalized_left or not normalized_right:
            return 0.0
        if raw_left and raw_left == raw_right:
            return 1.0
        if normalized_left == normalized_right:
            return 0.8
        if normalized_left.startswith(normalized_right) or normalized_right.startswith(normalized_left):
            return 0.5
        if normalized_left in normalized_right or normalized_right in normalized_left:
            return 0.5
        if alpha_left and alpha_left == alpha_right:
            return 0.3
        return 0.0

    score = score_pair(parent_name, candidate_name)
    if registry is not None:
        score = max(score, score_pair(parent_name, registry.name_cn))
        score = max(score, score_pair(parent_name, registry.name_en))
    return score
return 0.5 if normalized_left in normalized_right or normalized_right in normalized_left: return 0.5 if alpha_left and alpha_left == alpha_right: return 0.3 return 0.0 score = score_pair(parent_name, candidate_name) if registry is not None: score = max(score, score_pair(parent_name, registry.name_cn)) score = max(score, score_pair(parent_name, registry.name_en)) return score def _candidate_name(candidate_mmsi: str, all_positions: dict[str, dict], registry: Optional[RegistryVessel]) -> str: position_name = (all_positions.get(candidate_mmsi) or {}).get('name', '') if position_name: return position_name if registry is not None: return registry.name_cn or registry.name_en or candidate_mmsi return candidate_mmsi def _direct_parent_member(group: dict[str, Any], all_positions: dict[str, dict]) -> Optional[dict[str, Any]]: members = group.get('members') or [] for member in members: if member.get('isParent') and member.get('mmsi'): return member parent_mmsi = group.get('parent_mmsi') if not parent_mmsi: return None position = all_positions.get(parent_mmsi) or {} return { 'mmsi': parent_mmsi, 'name': position.get('name') or group.get('parent_name') or parent_mmsi, } def _direct_parent_stable_cycles(existing: Optional[dict[str, Any]], direct_parent_mmsi: str) -> int: if existing is None or not direct_parent_mmsi: return 1 previous_mmsi = existing.get('selected_parent_mmsi') if not previous_mmsi: previous_summary = existing.get('evidence_summary') or {} previous_mmsi = previous_summary.get('directParentMmsi') or previous_summary.get('topCandidateMmsi') previous_cycles = int(existing.get('stable_cycles') or 0) if previous_mmsi == direct_parent_mmsi: return max(previous_cycles + 1, 1) return 1 def _build_candidate_scores( vessel_store, observed_at: datetime, group: dict[str, Any], episode_assignment, default_model_id: int, default_model_name: str, score_rows: list[dict[str, Any]], raw_metrics: dict[tuple[str, int, str], dict[str, float]], center_track: list[dict[str, Any]], 
all_positions: dict[str, dict], registry_by_mmsi: dict[str, RegistryVessel], registry_by_name: dict[str, list[RegistryVessel]], existing: Optional[dict[str, Any]], excluded_candidate_mmsis: set[str], episode_prior_stats: dict[tuple[str, str], dict[str, Any]], lineage_prior_stats: dict[tuple[str, str], dict[str, Any]], label_prior_stats: dict[tuple[str, str], dict[str, Any]], ) -> list[CandidateScore]: group_key = group['parent_name'] sub_cluster_id = int(group.get('sub_cluster_id', 0)) normalized_parent_name = normalize_parent_name(group_key) members = group.get('members') or [] if members: gear_center_lat = sum(float(member['lat']) for member in members) / len(members) gear_center_lon = sum(float(member['lon']) for member in members) / len(members) else: gear_center_lat = 0.0 gear_center_lon = 0.0 candidates: dict[str, dict[str, Any]] = {} score_lookup = {row['target_mmsi']: row for row in score_rows} center_track_latlon = [ (float(point['lat']), float(point['lon'])) for point in center_track if point.get('lat') is not None and point.get('lon') is not None ] # v2: 시간 정렬 비교용 (ts = epoch_ms) center_track_temporal = [ {'lat': float(point['lat']), 'lon': float(point['lon']), 'ts': _to_epoch_ms(point['timestamp'])} for point in center_track if point.get('lat') is not None and point.get('lon') is not None and point.get('timestamp') is not None ] for row in score_rows[:_MAX_CORRELATION_CANDIDATES]: candidates.setdefault(row['target_mmsi'], {'sources': set()})['sources'].add('CORRELATION') for vessel in registry_by_name.get(normalized_parent_name, []): if vessel.mmsi: candidates.setdefault(vessel.mmsi, {'sources': set()})['sources'].add('REGISTRY_NAME') if existing is not None and existing.get('episode_id') == episode_assignment.episode_id: current_candidate = existing.get('selected_parent_mmsi') or existing.get('evidence_summary', {}).get('topCandidateMmsi') if current_candidate: candidates.setdefault(current_candidate, {'sources': 
set()})['sources'].add('PREVIOUS_SELECTION') if existing is not None: rejected_mmsi = existing.get('rejected_candidate_mmsi') rejected_at = existing.get('rejected_at') if rejected_mmsi and rejected_at is not None: cutoff = datetime.now(timezone.utc) - timedelta(hours=_REJECT_COOLDOWN_HOURS) if rejected_at >= cutoff and rejected_mmsi in candidates: candidates.pop(rejected_mmsi, None) for excluded_mmsi in excluded_candidate_mmsis: candidates.pop(excluded_mmsi, None) scored: list[CandidateScore] = [] for candidate_mmsi, meta in candidates.items(): registry = registry_by_mmsi.get(candidate_mmsi) score_row = score_lookup.get(candidate_mmsi, {}) raw = raw_metrics.get((group_key, sub_cluster_id, candidate_mmsi), {}) vessel_track = _get_vessel_track(vessel_store, candidate_mmsi, hours=6) raw_track_similarity = 0.0 vessel_track_temporal: list[dict] = [] if center_track_temporal and vessel_track: vessel_track_temporal = [ {'lat': p['lat'], 'lon': p['lon'], 'cog': p.get('cog'), 'ts': _to_epoch_ms(p['timestamp'])} for p in vessel_track if p.get('lat') is not None and p.get('lon') is not None ] raw_track_similarity = compute_track_similarity_v2( center_track_temporal, vessel_track_temporal, ) base_corr_score = float(score_row.get('current_score', 0.0) or 0.0) streak_count = int(score_row.get('streak_count', 0) or 0) stability_score = _clamp(streak_count / 18.0) candidate_name = _candidate_name(candidate_mmsi, all_positions, registry) name_match_score = _name_match_score(group_key, candidate_name, registry) registry_bonus = 0.05 if registry is not None else 0.0 raw_visit_score = float(raw.get('visit_score_6h', 0.0) or 0.0) raw_proximity_score = float(raw.get('proximity_score_6h', 0.0) or 0.0) raw_activity_score = float(raw.get('activity_sync_score_6h', 0.0) or 0.0) coverage_metrics = _build_track_coverage_metrics( center_track=center_track, vessel_track=vessel_track, gear_center_lat=gear_center_lat, gear_center_lon=gear_center_lon, ) track_coverage_factor = 
float(coverage_metrics['trackCoverageFactor']) visit_coverage_factor = float(coverage_metrics['visitCoverageFactor']) activity_coverage_factor = float(coverage_metrics['activityCoverageFactor']) track_similarity = _clamp(raw_track_similarity * track_coverage_factor) visit_score = _clamp(raw_visit_score * visit_coverage_factor) activity_score = _clamp(raw_activity_score * activity_coverage_factor) # proximity: 시간 보간 중심점 기반 거리 구간 차등 점수 proximity_score = 0.0 if center_track_temporal and vessel_track: _NM_TO_M = 1852.0 slots_c = _resample_temporal(center_track_temporal) slots_v = _resample_temporal(vessel_track_temporal) map_c = {s['ts']: s for s in slots_c if s is not None} map_v = {s['ts']: s for s in slots_v if s is not None} common_ts = sorted(set(map_c.keys()) & set(map_v.keys())) if len(common_ts) >= 3: prox_sum = 0.0 for ts in common_ts: sc, sv = map_c[ts], map_v[ts] d_m = haversine_m(sc['lat'], sc['lon'], sv['lat'], sv['lon']) d_nm = d_m / _NM_TO_M if d_nm < 2.5: prox_sum += 1.0 elif d_nm < 5.0: prox_sum += 0.5 elif d_nm < 10.0: prox_sum += 0.15 proximity_score = _clamp(prox_sum / len(common_ts) * track_coverage_factor) weighted_score = ( 0.35 * base_corr_score + 0.15 * name_match_score + 0.15 * track_similarity + 0.10 * visit_score + 0.10 * proximity_score + 0.05 * activity_score + 0.10 * stability_score + registry_bonus ) pre_bonus_score, china_mmsi_bonus, final_score = _apply_final_score_bonus( candidate_mmsi, weighted_score, ) prior_bonus = compute_prior_bonus_components( observed_at=observed_at, normalized_parent_name=normalized_parent_name, episode_id=episode_assignment.episode_id, candidate_mmsi=candidate_mmsi, episode_prior_stats=episode_prior_stats, lineage_prior_stats=lineage_prior_stats, label_prior_stats=label_prior_stats, ) final_score = _clamp(final_score + prior_bonus['priorBonusTotal']) evidence = { 'normalizedParentName': normalized_parent_name, 'episodeId': episode_assignment.episode_id, 'continuitySource': 
episode_assignment.continuity_source, 'continuityScore': round(float(episode_assignment.continuity_score or 0.0), 6), 'sources': sorted(meta['sources']), 'trackAvailable': bool(vessel_track), 'registryMatched': registry is not None, 'coverage': coverage_metrics, 'evidenceConfidence': coverage_metrics['coverageFactor'], 'scoreBreakdown': { 'baseCorrScore': round(base_corr_score, 4), 'nameMatchScore': round(name_match_score, 4), 'trackSimilarityScore': round(track_similarity, 4), 'visitScore6h': round(visit_score, 4), 'proximityScore6h': round(proximity_score, 4), 'activitySyncScore6h': round(activity_score, 4), 'stabilityScore': round(stability_score, 4), 'registryBonus': round(registry_bonus, 4), 'preBonusScore': round(pre_bonus_score, 4), 'chinaMmsiBonus': round(china_mmsi_bonus, 4), 'episodePriorBonus': round(prior_bonus['episodePriorBonus'], 4), 'lineagePriorBonus': round(prior_bonus['lineagePriorBonus'], 4), 'labelPriorBonus': round(prior_bonus['labelPriorBonus'], 4), 'priorBonusTotal': round(prior_bonus['priorBonusTotal'], 4), }, 'scoreBreakdownRaw': { 'trackSimilarityScore': round(raw_track_similarity, 4), 'visitScore6h': round(raw_visit_score, 4), 'proximityScore6h': round(raw_proximity_score, 4), 'activitySyncScore6h': round(raw_activity_score, 4), }, 'chinaMmsiBonusApplied': china_mmsi_bonus > 0.0, } scored.append(CandidateScore( mmsi=candidate_mmsi, name=candidate_name, vessel_id=registry.vessel_id if registry is not None else None, target_type='VESSEL', candidate_source=','.join(sorted(meta['sources'])), base_corr_score=round(base_corr_score, 6), name_match_score=round(name_match_score, 6), track_similarity_score=round(track_similarity, 6), visit_score_6h=round(visit_score, 6), proximity_score_6h=round(proximity_score, 6), activity_sync_score_6h=round(activity_score, 6), stability_score=round(stability_score, 6), registry_bonus=round(registry_bonus, 6), episode_prior_bonus=round(prior_bonus['episodePriorBonus'], 6), 
lineage_prior_bonus=round(prior_bonus['lineagePriorBonus'], 6), label_prior_bonus=round(prior_bonus['labelPriorBonus'], 6), final_score=round(final_score, 6), streak_count=streak_count, model_id=default_model_id, model_name=default_model_name, evidence=evidence, )) scored.sort( key=lambda item: ( item.final_score, item.base_corr_score, item.stability_score, item.name_match_score, item.mmsi, ), reverse=True, ) return scored def _insert_candidate_snapshots(conn, observed_at: datetime, rows: list[tuple]) -> int: if not rows: return 0 cur = conn.cursor() try: from psycopg2.extras import execute_values execute_values( cur, f""" INSERT INTO {GEAR_GROUP_PARENT_CANDIDATE_SNAPSHOTS} ( observed_at, group_key, sub_cluster_id, parent_name, normalized_parent_name, episode_id, candidate_mmsi, candidate_name, candidate_vessel_id, rank, candidate_source, model_id, model_name, base_corr_score, name_match_score, track_similarity_score, visit_score_6h, proximity_score_6h, activity_sync_score_6h, stability_score, registry_bonus, episode_prior_bonus, lineage_prior_bonus, label_prior_bonus, final_score, margin_from_top, evidence ) VALUES %s """, rows, page_size=200, ) return len(rows) finally: cur.close() def _insert_label_tracking_rows(conn, rows: list[tuple]) -> int: if not rows: return 0 cur = conn.cursor() try: from psycopg2.extras import execute_values execute_values( cur, f""" INSERT INTO {GEAR_PARENT_LABEL_TRACKING_CYCLES} ( label_session_id, observed_at, candidate_snapshot_observed_at, auto_status, top_candidate_mmsi, top_candidate_name, top_candidate_score, top_candidate_margin, candidate_count, labeled_candidate_present, labeled_candidate_rank, labeled_candidate_score, labeled_candidate_pre_bonus_score, labeled_candidate_margin_from_top, matched_top1, matched_top3, evidence_summary ) VALUES %s ON CONFLICT (label_session_id, observed_at) DO NOTHING """, rows, page_size=200, ) return len(rows) finally: cur.close() def _upsert_resolution(conn, row: tuple) -> None: cur = 
def _label_tracking_row(
    observed_at: datetime,
    label_session: dict[str, Any],
    auto_status: str,
    top_candidate: Optional[CandidateScore],
    margin: float,
    candidates: list[CandidateScore],
) -> tuple:
    """Build one insert tuple for the label-tracking cycles table.

    Compares the labeled (ground-truth) parent against the automatically
    ranked candidates and records rank, score, and top-1/top-3 agreement.
    """
    labeled_mmsi = label_session['label_parent_mmsi']
    labeled_candidate = next(
        (candidate for candidate in candidates if candidate.mmsi == labeled_mmsi),
        None,
    )
    labeled_rank = None
    labeled_pre_bonus_score = None
    labeled_margin_from_top = None
    if labeled_candidate is not None:
        # 1-based rank of the labeled candidate within the sorted list.
        labeled_rank = next(
            position
            for position, candidate in enumerate(candidates, start=1)
            if candidate.mmsi == labeled_candidate.mmsi
        )
        breakdown = labeled_candidate.evidence.get('scoreBreakdown')
        if isinstance(breakdown, dict):
            labeled_pre_bonus_score = breakdown.get('preBonusScore')
        else:
            labeled_pre_bonus_score = None
        gap = (top_candidate.final_score - labeled_candidate.final_score) if top_candidate else 0.0
        labeled_margin_from_top = round(gap, 6)
    evidence_summary = {
        'labelParentMmsi': labeled_mmsi,
        'labelParentName': label_session.get('label_parent_name'),
        'topCandidateSources': sorted(_candidate_sources(top_candidate)),
        'candidateMmsis': [candidate.mmsi for candidate in candidates[:5]],
    }
    return (
        label_session['id'],
        observed_at,
        observed_at,
        auto_status,
        top_candidate.mmsi if top_candidate else None,
        top_candidate.name if top_candidate else None,
        top_candidate.final_score if top_candidate else None,
        margin if top_candidate else 0.0,
        len(candidates),
        labeled_candidate is not None,
        labeled_rank,
        labeled_candidate.final_score if labeled_candidate else None,
        labeled_pre_bonus_score,
        labeled_margin_from_top,
        top_candidate is not None and labeled_mmsi == top_candidate.mmsi,
        labeled_rank is not None and labeled_rank <= 3,
        json.dumps(evidence_summary, ensure_ascii=False),
    )
= sorted({group['parent_name'] for group in active_groups}) episode_inputs = [ group_to_episode_input(group, normalize_parent_name(group['parent_name'])) for group in active_groups ] lineage_keys = sorted({item.normalized_parent_name for item in episode_inputs if item.normalized_parent_name}) previous_episodes = load_active_episode_states(conn, lineage_keys) episode_plan = build_episode_plan(episode_inputs, previous_episodes) episode_prior_stats = load_episode_prior_stats(conn, [assignment.episode_id for assignment in episode_plan.assignments.values()]) lineage_prior_stats = load_lineage_prior_stats(conn, lineage_keys) label_prior_stats = load_label_prior_stats(conn, lineage_keys) registry_by_mmsi, registry_by_name = _load_registry(conn) _expire_label_sessions(conn) existing_resolution = _load_existing_resolution(conn, group_keys) all_positions = vessel_store.get_all_latest_positions() direct_parent_groups = [ group for group in active_groups if _direct_parent_member(group, all_positions) is not None ] unresolved_groups = [ group for group in active_groups if _direct_parent_member(group, all_positions) is None ] default_model_id, default_model_name = _load_default_model(conn) correlation_scores = _load_correlation_scores(conn, default_model_id, group_keys) raw_metric_averages = _load_raw_metric_averages(conn, group_keys) center_tracks = _load_group_center_tracks(conn, group_keys) active_exclusions = _load_active_candidate_exclusions(conn, group_keys) active_label_sessions = _load_active_label_sessions(conn, group_keys) snapshot_rows: list[tuple] = [] label_tracking_rows: list[tuple] = [] episode_snapshot_payloads: dict[tuple[str, int], dict[str, Any]] = {} promoted = 0 review_required = 0 skipped = 0 no_candidate = 0 direct_matched = 0 for group in direct_parent_groups: group_key = group['parent_name'] sub_cluster_id = int(group.get('sub_cluster_id', 0)) key = (group_key, sub_cluster_id) episode_assignment = episode_plan.assignments.get(key) if episode_assignment 
is None: continue existing = existing_resolution.get(key) direct_parent = _direct_parent_member(group, all_positions) if direct_parent is None: continue normalized_parent_name = normalize_parent_name(group_key) direct_parent_mmsi = str(direct_parent.get('mmsi') or '') direct_parent_name = str(direct_parent.get('name') or group_key or direct_parent_mmsi) stable_cycles = _direct_parent_stable_cycles(existing, direct_parent_mmsi) status_reason = _status_reason(_DIRECT_PARENT_MATCH_STATUS) evidence_summary = { 'episodeId': episode_assignment.episode_id, 'continuitySource': episode_assignment.continuity_source, 'continuityScore': episode_assignment.continuity_score, 'mergedFromEpisodeIds': episode_assignment.merged_from_episode_ids, 'splitFromEpisodeId': episode_assignment.split_from_episode_id, 'normalizedParentName': normalized_parent_name, 'candidateCount': 0, 'directParentMmsi': direct_parent_mmsi, 'directParentName': direct_parent_name, 'statusReason': status_reason, 'trackable': is_trackable_parent_name(group_key), } status = _DIRECT_PARENT_MATCH_STATUS decision_source = 'DIRECT_PARENT_MATCH' selected_parent_mmsi = direct_parent_mmsi selected_parent_name = direct_parent_name selected_vessel_id = registry_by_mmsi.get(direct_parent_mmsi).vessel_id if direct_parent_mmsi in registry_by_mmsi else None confidence = 1.0 last_promoted_at = observed_at if existing is not None and existing.get('status') == _MANUAL_CONFIRMED_STATUS: status = _MANUAL_CONFIRMED_STATUS decision_source = existing.get('decision_source') or 'MANUAL' selected_parent_mmsi = existing.get('selected_parent_mmsi') or selected_parent_mmsi selected_parent_name = existing.get('selected_parent_name') or selected_parent_name selected_vessel_id = existing.get('selected_vessel_id') if existing.get('selected_vessel_id') is not None else selected_vessel_id confidence = existing.get('confidence') or confidence last_promoted_at = existing.get('approved_at') or last_promoted_at evidence_summary['statusReason'] = 
existing.get('evidence_summary', {}).get('statusReason') or status_reason _upsert_resolution( conn, ( group_key, sub_cluster_id, group_key, normalized_parent_name, episode_assignment.episode_id, episode_assignment.continuity_source, episode_assignment.continuity_score, 0.0, status, selected_parent_mmsi, selected_parent_name, selected_vessel_id, confidence, decision_source, confidence or 0.0, 0.0, confidence or 0.0, stable_cycles, observed_at, last_promoted_at, (existing or {}).get('approved_by'), (existing or {}).get('approved_at'), (existing or {}).get('manual_comment'), (existing or {}).get('rejected_candidate_mmsi'), (existing or {}).get('rejected_at'), json.dumps(evidence_summary, ensure_ascii=False), observed_at, ), ) episode_snapshot_payloads[key] = { 'parentEpisodeIds': episode_assignment.merged_from_episode_ids, 'topCandidateMmsi': selected_parent_mmsi, 'topCandidateScore': confidence or 1.0, 'resolutionStatus': status, 'metadata': { 'splitFromEpisodeId': episode_assignment.split_from_episode_id, 'directParentMmsi': direct_parent_mmsi, }, } direct_matched += 1 for group in unresolved_groups: group_key = group['parent_name'] sub_cluster_id = int(group.get('sub_cluster_id', 0)) key = (group_key, sub_cluster_id) episode_assignment = episode_plan.assignments.get(key) if episode_assignment is None: continue existing = existing_resolution.get(key) normalized_parent_name = normalize_parent_name(group_key) excluded_candidate_mmsis = set(active_exclusions['global']) excluded_candidate_mmsis.update(active_exclusions['group'].get(key, set())) active_label_session = active_label_sessions.get(key) if not is_trackable_parent_name(group_key) and (existing or {}).get('status') != _MANUAL_CONFIRMED_STATUS: skipped += 1 status_reason = _status_reason(_SHORT_NAME_STATUS) evidence_summary = { 'episodeId': episode_assignment.episode_id, 'continuitySource': episode_assignment.continuity_source, 'continuityScore': episode_assignment.continuity_score, 'mergedFromEpisodeIds': 
episode_assignment.merged_from_episode_ids, 'splitFromEpisodeId': episode_assignment.split_from_episode_id, 'skipReason': status_reason, 'statusReason': status_reason, 'normalizedParentName': normalized_parent_name, } _upsert_resolution( conn, ( group_key, sub_cluster_id, group_key, normalized_parent_name, episode_assignment.episode_id, episode_assignment.continuity_source, episode_assignment.continuity_score, 0.0, _SHORT_NAME_STATUS, None, None, None, None, 'AUTO_SKIP', 0.0, 0.0, 0.0, 0, observed_at, None, None, None, (existing or {}).get('manual_comment'), (existing or {}).get('rejected_candidate_mmsi'), (existing or {}).get('rejected_at'), json.dumps(evidence_summary, ensure_ascii=False), observed_at, ), ) episode_snapshot_payloads[key] = { 'parentEpisodeIds': episode_assignment.merged_from_episode_ids, 'topCandidateMmsi': None, 'topCandidateScore': 0.0, 'resolutionStatus': _SHORT_NAME_STATUS, 'metadata': {'skipReason': status_reason}, } continue candidates = _build_candidate_scores( vessel_store=vessel_store, observed_at=observed_at, group=group, episode_assignment=episode_assignment, default_model_id=default_model_id, default_model_name=default_model_name, score_rows=correlation_scores.get(key, []), raw_metrics=raw_metric_averages, center_track=center_tracks.get(key, []), all_positions=all_positions, registry_by_mmsi=registry_by_mmsi, registry_by_name=registry_by_name, existing=existing, excluded_candidate_mmsis=excluded_candidate_mmsis, episode_prior_stats=episode_prior_stats, lineage_prior_stats=lineage_prior_stats, label_prior_stats=label_prior_stats, ) top_candidate = candidates[0] if candidates else None second_score = candidates[1].final_score if len(candidates) > 1 else 0.0 margin = round((top_candidate.final_score - second_score), 6) if top_candidate else 0.0 stable_cycles = _top_candidate_stable_cycles(existing, top_candidate) for rank, candidate in enumerate(candidates, start=1): snapshot_rows.append(( observed_at, group_key, sub_cluster_id, 
group_key, normalized_parent_name, episode_assignment.episode_id, candidate.mmsi, candidate.name, candidate.vessel_id, rank, candidate.candidate_source, candidate.model_id, candidate.model_name, candidate.base_corr_score, candidate.name_match_score, candidate.track_similarity_score, candidate.visit_score_6h, candidate.proximity_score_6h, candidate.activity_sync_score_6h, candidate.stability_score, candidate.registry_bonus, candidate.episode_prior_bonus, candidate.lineage_prior_bonus, candidate.label_prior_bonus, candidate.final_score, round(top_candidate.final_score - candidate.final_score, 6) if top_candidate else 0.0, json.dumps(candidate.evidence, ensure_ascii=False), )) status, decision_source = _select_status(top_candidate, margin, stable_cycles) auto_status = status selected_parent_mmsi: Optional[str] = None selected_parent_name: Optional[str] = None selected_vessel_id: Optional[int] = None confidence: Optional[float] = None last_promoted_at: Optional[datetime] = None if top_candidate is not None: if status == _AUTO_PROMOTED_STATUS: selected_parent_mmsi = top_candidate.mmsi selected_parent_name = top_candidate.name selected_vessel_id = top_candidate.vessel_id confidence = top_candidate.final_score last_promoted_at = observed_at promoted += 1 elif status == _REVIEW_REQUIRED_STATUS: selected_parent_mmsi = top_candidate.mmsi selected_parent_name = top_candidate.name selected_vessel_id = top_candidate.vessel_id confidence = top_candidate.final_score review_required += 1 elif status == _NO_CANDIDATE_STATUS: no_candidate += 1 status_reason = _status_reason(status) evidence_summary = { 'episodeId': episode_assignment.episode_id, 'continuitySource': episode_assignment.continuity_source, 'continuityScore': episode_assignment.continuity_score, 'mergedFromEpisodeIds': episode_assignment.merged_from_episode_ids, 'splitFromEpisodeId': episode_assignment.split_from_episode_id, 'normalizedParentName': normalized_parent_name, 'candidateCount': len(candidates), 
'topCandidateMmsi': top_candidate.mmsi if top_candidate else None, 'topCandidateName': top_candidate.name if top_candidate else None, 'topCandidateSources': sorted(_candidate_sources(top_candidate)), 'hasCorrelationCandidate': 'CORRELATION' in _candidate_sources(top_candidate), 'recentTopCandidateStableCycles': stable_cycles, 'skipReason': _status_reason(_SHORT_NAME_STATUS) if status == _SHORT_NAME_STATUS else None, 'statusReason': status_reason, 'trackable': is_trackable_parent_name(group_key), 'priorBonusTotal': top_candidate.evidence.get('scoreBreakdown', {}).get('priorBonusTotal') if top_candidate else 0.0, } if excluded_candidate_mmsis: evidence_summary['excludedCandidateMmsis'] = sorted(excluded_candidate_mmsis) if active_label_session is not None: evidence_summary['activeLabelSessionId'] = active_label_session['id'] evidence_summary['activeLabelParentMmsi'] = active_label_session['label_parent_mmsi'] if existing is not None and existing.get('status') == _MANUAL_CONFIRMED_STATUS: status = _MANUAL_CONFIRMED_STATUS decision_source = existing.get('decision_source') or 'MANUAL' selected_parent_mmsi = existing.get('selected_parent_mmsi') selected_parent_name = existing.get('selected_parent_name') selected_vessel_id = existing.get('selected_vessel_id') confidence = existing.get('confidence') or confidence last_promoted_at = existing.get('approved_at') or existing.get('rejected_at') or last_promoted_at _upsert_resolution( conn, ( group_key, sub_cluster_id, group_key, normalized_parent_name, episode_assignment.episode_id, episode_assignment.continuity_source, episode_assignment.continuity_score, top_candidate.evidence.get('scoreBreakdown', {}).get('priorBonusTotal', 0.0) if top_candidate else 0.0, status, selected_parent_mmsi, selected_parent_name, selected_vessel_id, confidence, decision_source, top_candidate.final_score if top_candidate else 0.0, second_score, margin, stable_cycles, observed_at, last_promoted_at, (existing or {}).get('approved_by'), (existing or 
{}).get('approved_at'), (existing or {}).get('manual_comment'), (existing or {}).get('rejected_candidate_mmsi'), (existing or {}).get('rejected_at'), json.dumps(evidence_summary, ensure_ascii=False), observed_at, ), ) episode_snapshot_payloads[key] = { 'parentEpisodeIds': episode_assignment.merged_from_episode_ids, 'topCandidateMmsi': top_candidate.mmsi if top_candidate else None, 'topCandidateScore': top_candidate.final_score if top_candidate else 0.0, 'resolutionStatus': status, 'metadata': { 'splitFromEpisodeId': episode_assignment.split_from_episode_id, 'candidateCount': len(candidates), 'topCandidateSources': sorted(_candidate_sources(top_candidate)), }, } if active_label_session is not None: label_tracking_rows.append( _label_tracking_row( observed_at=observed_at, label_session=active_label_session, auto_status=auto_status, top_candidate=top_candidate, margin=margin, candidates=candidates, ) ) sync_episode_states(conn, observed_at, episode_plan) inserted = _insert_candidate_snapshots(conn, observed_at, snapshot_rows) episode_snapshots_inserted = insert_episode_snapshots(conn, observed_at, episode_plan, episode_snapshot_payloads) tracking_inserted = _insert_label_tracking_rows(conn, label_tracking_rows) conn.commit() logger.info( 'gear parent inference: %d groups, %d direct-match, %d candidates, %d promoted, %d review, %d skipped, %d no-candidate, %d episode-snapshots, %d label-tracking', len(active_groups), direct_matched, inserted, promoted, review_required, skipped, no_candidate, episode_snapshots_inserted, tracking_inserted, ) return { 'groups': len(active_groups), 'candidates': inserted, 'promoted': promoted, 'review_required': review_required, 'skipped': skipped, 'no_candidate': no_candidate, 'direct_matched': direct_matched, 'episode_snapshots': episode_snapshots_inserted, 'label_tracking': tracking_inserted, }