"""등록 선단 기반 추적기.""" import json import logging import re import time from datetime import datetime, timezone from typing import Optional import pandas as pd import psycopg2 from algorithms.gear_identity import classify_severity, detect_gear_name_collisions from algorithms.gear_name_rules import is_trackable_parent_name from config import qualified_table logger = logging.getLogger(__name__) # 어구 이름 패턴 — 공백/영숫자 인덱스/끝_ 허용 GEAR_PATTERN = re.compile(r'^(.+?)_(?=\S*\d)\S+(?:[_ ]\S*)*[_ ]*$|^(\d+)$') GEAR_PATTERN_PCT = re.compile(r'^(.+?)%$') _REGISTRY_CACHE_SEC = 3600 FLEET_COMPANIES = qualified_table('fleet_companies') FLEET_VESSELS = qualified_table('fleet_vessels') GEAR_IDENTITY_LOG = qualified_table('gear_identity_log') GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores') GEAR_IDENTITY_COLLISIONS = qualified_table('gear_identity_collisions') FLEET_TRACKING_SNAPSHOT = qualified_table('fleet_tracking_snapshot') # 선박명 정규화 (중국/한국 어선 식별자 보존): # - 중국 어선명 = 업체명(浙岭渔) + 선박번호(20865) 로 고유 식별. 번호 제거 시 동명이 수십 개 발생 → 번호 유지 # - 통일 대상: 공백/구두점, 대소문자, NO./No. prefix _NAME_STRIP_CHARS = re.compile(r'[\s\-_./,()\[\]·•\u3000##]+') _NAME_STRIP_PREFIX_NUMBER_MARKER = re.compile(r'\bNO\.?\s*', re.IGNORECASE) _PERIOD_RANGE_PATTERN = re.compile( r'(\d{4})[/\-.](\d{1,2})[/\-.](\d{1,2})\s*[-~~]\s*(\d{4})[/\-.](\d{1,2})[/\-.](\d{1,2})' ) def _parse_period_range(raw: str) -> Optional[tuple[datetime, datetime]]: """fishing_period 'YYYY/MM/DD - YYYY/MM/DD' 포맷을 파싱. 실패 시 None. 구분자는 / - . 모두 허용, 연결자는 - ~ ~ 허용. """ if not raw: return None m = _PERIOD_RANGE_PATTERN.search(raw) if not m: return None try: y1, m1, d1, y2, m2, d2 = (int(x) for x in m.groups()) start = datetime(y1, m1, d1, 0, 0, 0) end = datetime(y2, m2, d2, 23, 59, 59) if end < start: return None return (start, end) except (ValueError, TypeError): return None def _normalize_vessel_name(name: Optional[str]) -> str: """선박명을 매칭용으로 정규화. 1. upper() + strip 2. 'No.123' / 'NO 123' 의 번호 마커만 제거 (숫자 자체는 고유 식별자로 유지) 3. 
       whitespace, punctuation, middle dots, fullwidth spaces, and '#' (ASCII and
       fullwidth) are removed.
    """
    if not name:
        return ''
    s = name.strip().upper()
    s = _NAME_STRIP_PREFIX_NUMBER_MARKER.sub('', s)
    s = _NAME_STRIP_CHARS.sub('', s)
    return s


class FleetTracker:
    def __init__(self) -> None:
        self._companies: dict[int, dict] = {}  # id → {name_cn, name_en}
        self._vessels: dict[int, dict] = {}  # id → {permit_no, name_cn, ...}
        self._name_cn_map: dict[str, int] = {}  # name_cn → vessel_id (exact match)
        self._name_en_map: dict[str, int] = {}  # name_en (lowercase) → vessel_id (exact match)
        self._name_fuzzy_map: dict[str, list[int]] = {}  # normalized name → [vessel_id, ...]
        self._mmsi_to_vid: dict[str, int] = {}  # mmsi → vessel_id (matched vessels only)
        self._gear_active: dict[str, dict] = {}  # mmsi → {name, parent_mmsi, ...}
        self._last_registry_load: float = 0.0

    def load_registry(self, conn) -> None:
        """Load fleet_companies + fleet_vessels from the DB. Cached for one hour."""
        if time.time() - self._last_registry_load < _REGISTRY_CACHE_SEC:
            return

        cur = conn.cursor()
        cur.execute(f'SELECT id, name_cn, name_en FROM {FLEET_COMPANIES}')
        self._companies = {r[0]: {'name_cn': r[1], 'name_en': r[2]} for r in cur.fetchall()}

        # Query only permits for the current year (permits are renewed yearly;
        # a NULL permit_year is accepted as legacy data).
        cur.execute(
            f"""SELECT id, company_id, permit_no, name_cn, name_en, tonnage,
                       gear_code, fleet_role, pair_vessel_id, mmsi, fishery_code
                FROM {FLEET_VESSELS}
                WHERE permit_year = EXTRACT(YEAR FROM now())::int
                   OR permit_year IS NULL"""
        )
        self._vessels = {}
        self._name_cn_map = {}
        self._name_en_map = {}
        self._name_fuzzy_map = {}
        self._mmsi_to_vid = {}

        for r in cur.fetchall():
            vid = r[0]
            v: dict = {
                'id': vid,
                'company_id': r[1],
                'permit_no': r[2],
                'name_cn': r[3],
                'name_en': r[4],
                'tonnage': r[5],
                'gear_code': r[6],
                'fleet_role': r[7],
                'pair_vessel_id': r[8],
                'mmsi': r[9],
                'fishery_code': r[10],
            }
            self._vessels[vid] = v
            if r[3]:
                self._name_cn_map[r[3]] = vid
            if r[4]:
                self._name_en_map[r[4].lower().strip()] = vid
            # FUZZY matching covers name_en only (names reported over AIS are mostly English).
            key_en = _normalize_vessel_name(r[4])
            if key_en:
                self._name_fuzzy_map.setdefault(
                    key_en,
                    [],
                ).append(vid)
            if r[9]:
                self._mmsi_to_vid[r[9]] = vid

        cur.close()
        self._last_registry_load = time.time()
        logger.info(
            'fleet registry loaded: %d companies, %d vessels',
            len(self._companies),
            len(self._vessels),
        )

    def get_pair_mmsi(self, mmsi: str) -> Optional[str]:
        """Return the MMSI of the partner vessel in a PT pair-trawl pair, or None if there is none."""
        vid = self._mmsi_to_vid.get(mmsi)
        if vid is None:
            return None
        pair_vid = self._vessels.get(vid, {}).get('pair_vessel_id')
        if pair_vid is None:
            return None
        pair_vessel = self._vessels.get(pair_vid)
        if pair_vessel is None:
            return None
        return pair_vessel.get('mmsi')

    def get_vessel_gear_code(self, mmsi: str) -> Optional[str]:
        """Return the gear code of a registered vessel (C21=PT, C22=OT, etc.)."""
        vid = self._mmsi_to_vid.get(mmsi)
        if vid is None:
            return None
        return self._vessels.get(vid, {}).get('gear_code')

    def get_pt_registered_mmsis(self) -> set[str]:
        """Set of MMSIs of vessels registered as pair trawlers (PT / PT-S).

        Works through the legacy gear_code='C21' even in environments where
        fishery_code has not been introduced.
        """
        result: set[str] = set()
        for v in self._vessels.values():
            gc = v.get('gear_code') or ''
            mmsi = v.get('mmsi')
            if mmsi and gc in ('C21',):
                result.add(mmsi)
        return result

    def get_gear_episodes(self, mmsi: str, conn, hours: int = 24) -> list[dict]:
        """Return the signal episodes of the last N hours from gear_identity_log.

        Used for G-04 MMSI tampering detection. Each episode includes
        first_seen_at / last_seen_at.
        """
        try:
            cur = conn.cursor()
            cur.execute(
                f"""SELECT first_seen_at, last_seen_at
                    FROM {GEAR_IDENTITY_LOG}
                    WHERE mmsi = %s
                      AND first_seen_at > NOW() - (%s || ' hours')::interval
                    ORDER BY first_seen_at""",
                (mmsi, str(hours)),
            )
            rows = cur.fetchall()
            cur.close()
            return [{'first_seen_at': r[0], 'last_seen_at': r[1]} for r in rows]
        except Exception as exc:
            logger.warning('get_gear_episodes failed [mmsi=%s]: %s', mmsi, exc)
            return []

    def get_registered_fishery_code(self, mmsi: str) -> Optional[str]:
        """fishery_code (PT/PT-S/GN/PS/OT/FC) of a vessel registered in fleet_vessels.

        Since V029 the fishery_code column takes precedence; the legacy gear_code
        (C21, etc.) is handled through a separate path.
        """
        vid = self._mmsi_to_vid.get(mmsi)
        if vid is None:
            return None
        v = self._vessels.get(vid, {})
        return v.get('fishery_code') or None

    def get_permit_periods(
        self,
        mmsi: str,
        conn,
        year: Optional[int] = None,
    ) -> list[tuple[datetime, datetime]]:
        """Parse the vessel's permitted fishing periods and return [(start, end), ...].

        Parses the 'YYYY/MM/DD - YYYY/MM/DD' format of
        fishery_permit_cn.fishing_period_1/2. A period given as '-' (unused) or one
        that fails to parse is omitted. Used for the G-02 closed-season check.
        """
        vid = self._mmsi_to_vid.get(mmsi)
        if vid is None:
            return []
        v = self._vessels.get(vid)
        if not v:
            return []
        permit_no = v.get('permit_no')
        target_year = year or datetime.now().year
        if not permit_no:
            return []

        try:
            cur = conn.cursor()
            cur.execute(
                """SELECT fishing_period_1, fishing_period_2
                   FROM kcg.fishery_permit_cn
                   WHERE permit_year = %s AND permit_no = %s""",
                (target_year, permit_no),
            )
            row = cur.fetchone()
            cur.close()
        except Exception as exc:
            logger.warning('get_permit_periods DB query failed [mmsi=%s]: %s', mmsi, exc)
            return []

        if not row:
            return []

        periods: list[tuple[datetime, datetime]] = []
        for raw in row:
            if not raw or raw.strip() in ('-', ''):
                continue
            parsed = _parse_period_range(raw)
            if parsed:
                periods.append(parsed)
        return periods

    def get_gear_positions(
        self,
        mmsi: str,
        df_vessel: Optional[pd.DataFrame] = None,
    ) -> list[tuple[float, float]]:
        """List of fixed-gear positions as (lat, lon). Used for G-05 drift detection.

        At this stage only the path that passes the vessel DataFrame (track
        coordinates) through as-is is supported. A DB path will be added once the
        gear_tracking_snapshot table is introduced.
        """
        if df_vessel is None or len(df_vessel) == 0:
            return []
        try:
            lats = df_vessel['lat'].tolist()
            lons = df_vessel['lon'].tolist()
            return list(zip(lats, lons))
        except Exception as exc:
            logger.warning('get_gear_positions failed [mmsi=%s]: %s', mmsi, exc)
            return []

    def match_ais_to_registry(self, ais_vessels: list[dict], conn) -> None:
        """Match AIS vessels against the registered fleet and update the DB.

        ais_vessels: [{mmsi, name, lat, lon, sog, cog}, ...]
        """
        cur = conn.cursor()
        matched_exact = 0
        matched_fuzzy = 0

        for v in ais_vessels:
            mmsi = v.get('mmsi', '')
            name = v.get('name', '')
            if not mmsi or not name:
                continue

            # Already matched: only refresh last_seen_at.
            if mmsi in self._mmsi_to_vid:
                cur.execute(
                    f'UPDATE {FLEET_VESSELS} SET last_seen_at = NOW() WHERE id = %s',
                    (self._mmsi_to_vid[mmsi],),
                )
                continue

            # NAME_EXACT match
            vid: Optional[int] = self._name_cn_map.get(name)
            if not vid:
                vid = self._name_en_map.get(name.lower().strip())
            method = 'NAME_EXACT'
            confidence = 0.95

            # NAME_FUZZY match (lookup after normalization)
            if not vid:
                key = _normalize_vessel_name(name)
                if key:
                    candidates = self._name_fuzzy_map.get(key, [])
                    # Exclude vids already assigned to another MMSI, so identical
                    # names are not matched twice.
                    unassigned = [
                        c for c in candidates
                        if not self._vessels.get(c, {}).get('mmsi')
                        or self._vessels[c].get('mmsi') == mmsi
                    ]
                    if len(unassigned) == 1:
                        vid = unassigned[0]
                        method = 'NAME_FUZZY'
                        confidence = 0.80

            if vid:
                cur.execute(
                    f"""UPDATE {FLEET_VESSELS}
                        SET mmsi = %s, match_confidence = %s, match_method = %s,
                            last_seen_at = NOW(), updated_at = NOW()
                        WHERE id = %s AND (mmsi IS NULL OR mmsi = %s)""",
                    (mmsi, confidence, method, vid, mmsi),
                )
                self._mmsi_to_vid[mmsi] = vid
                self._vessels[vid]['mmsi'] = mmsi
                if method == 'NAME_FUZZY':
                    matched_fuzzy += 1
                else:
                    matched_exact += 1

        conn.commit()
        cur.close()
        if matched_exact or matched_fuzzy:
            logger.info(
                'AIS→registry matched: exact=%d, fuzzy=%d',
                matched_exact,
                matched_fuzzy,
            )

    def track_gear_identity(self, gear_signals: list[dict], conn) -> None:
        """Track gear/net identity.

        gear_signals: [{mmsi, name, lat, lon}, ...], AIS signals whose name follows
        the XXX_number_number pattern.

        When the same name is received from different MMSIs within the same cycle,
        it is treated as "coexistence (GEAR_IDENTITY_COLLISION)": the case is
        accumulated in gear_identity_collisions, and gear_identity_log keeps an
        independent active row per MMSI (no deactivation, no score transfer).
        The existing replacement (sequential) logic applies only to names seen on
        a single MMSI.
        """
        cur = conn.cursor()
        now = datetime.now(timezone.utc)
        self._recent_collision_ids: list[int] = []

        # 1) Detect coexistence + UPSERT (same name received from multiple MMSIs this cycle)
        collisions = detect_gear_name_collisions(gear_signals, now)
        colliding_names: set[str] = {c['name'] for c in collisions}
        for c in collisions:
            cid = self._upsert_gear_collision(cur, c, now)
            if cid is not None:
                self._recent_collision_ids.append(cid)

        # 2) Process individual signals
        for g in gear_signals:
            mmsi = g['mmsi']
            name = g['name']
            lat = g.get('lat', 0)
            lon = g.get('lon', 0)

            parent_name, idx1, idx2 = self._parse_gear_name(name)
            effective_parent_name = parent_name or name
            if not is_trackable_parent_name(effective_parent_name):
                continue

            parent_mmsi, parent_vid = self._match_parent_vessel(parent_name)
            match_method: Optional[str] = 'NAME_PARENT' if parent_vid else None
            confidence = 0.9 if parent_vid else 0.0
            is_colliding = name in colliding_names

            # Look up the existing active row
            cur.execute(
                f"""SELECT id, name FROM {GEAR_IDENTITY_LOG}
                    WHERE mmsi = %s AND is_active = TRUE""",
                (mmsi,),
            )
            existing = cur.fetchone()

            if existing:
                if existing[1] == name:
                    # Same MMSI + same name: update position and time
                    cur.execute(
                        f"""UPDATE {GEAR_IDENTITY_LOG}
                            SET last_seen_at = %s, lat = %s, lon = %s
                            WHERE id = %s""",
                        (now, lat, lon, existing[0]),
                    )
                else:
                    # Same MMSI + different name: deactivate the old row, insert a new one
                    cur.execute(
                        f'UPDATE {GEAR_IDENTITY_LOG} SET is_active = FALSE WHERE id = %s',
                        (existing[0],),
                    )
                    cur.execute(
                        f"""INSERT INTO {GEAR_IDENTITY_LOG}
                                (mmsi, name, parent_name, parent_mmsi, parent_vessel_id,
                                 gear_index_1, gear_index_2, lat, lon,
                                 match_method, match_confidence, first_seen_at, last_seen_at)
                            VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
                        (mmsi, name, parent_name, parent_mmsi, parent_vid,
                         idx1, idx2, lat, lon,
                         match_method, confidence, now, now),
                    )
            else:
                # New MMSI: if the name is a coexistence case, leave the other MMSIs'
                # active rows untouched and only INSERT this one.
                if not is_colliding:
                    # Replacement path: the same name active under a different MMSI
                    # is treated as an MMSI change.
                    cur.execute(
                        f"""SELECT id, mmsi FROM {GEAR_IDENTITY_LOG}
                            WHERE name = %s AND is_active = TRUE AND mmsi != %s""",
                        (
                            name,
                            mmsi,
                        ),
                    )
                    old_mmsi_row = cur.fetchone()
                    if old_mmsi_row:
                        cur.execute(
                            f'UPDATE {GEAR_IDENTITY_LOG} SET is_active = FALSE WHERE id = %s',
                            (old_mmsi_row[0],),
                        )
                        logger.info(
                            'gear MMSI change: %s → %s (name=%s)',
                            old_mmsi_row[1], mmsi, name,
                        )
                        # Transfer affinity scores (isolated with a savepoint, so a PK
                        # conflict does not abort the transaction).
                        self._transfer_affinity_scores(cur, old_mmsi_row[1], mmsi)

                cur.execute(
                    f"""INSERT INTO {GEAR_IDENTITY_LOG}
                            (mmsi, name, parent_name, parent_mmsi, parent_vessel_id,
                             gear_index_1, gear_index_2, lat, lon,
                             match_method, match_confidence, first_seen_at, last_seen_at)
                        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
                    (mmsi, name, parent_name, parent_mmsi, parent_vid,
                     idx1, idx2, lat, lon,
                     match_method, confidence, now, now),
                )

        if collisions:
            logger.info(
                'gear identity collisions: %d pairs touched (%d distinct names)',
                len(collisions), len(colliding_names),
            )

        conn.commit()
        cur.close()

    def _parse_gear_name(self, name: str) -> tuple[Optional[str], Optional[int], Optional[int]]:
        """Extract parent_name, idx1, and idx2 from a gear name."""
        parent_name: Optional[str] = None
        idx1: Optional[int] = None
        idx2: Optional[int] = None

        m = GEAR_PATTERN.match(name)
        if m:
            if m.group(1):
                parent_name = m.group(1).strip()
                suffix = name[m.end(1):].strip(' _')
                digits = re.findall(r'\d+', suffix)
                idx1 = int(digits[0]) if len(digits) >= 1 else None
                idx2 = int(digits[1]) if len(digits) >= 2 else None
            else:
                idx1 = int(m.group(2))
        else:
            m2 = GEAR_PATTERN_PCT.match(name)
            if m2:
                parent_name = m2.group(1).strip()
        return parent_name, idx1, idx2

    def _match_parent_vessel(self, parent_name: Optional[str]) -> tuple[Optional[str], Optional[int]]:
        """Match parent_name to (parent_mmsi, parent_vessel_id).
        Lookup order: EXACT, then FUZZY.
        """
        if not parent_name:
            return None, None

        vid = self._name_cn_map.get(parent_name)
        if not vid:
            vid = self._name_en_map.get(parent_name.lower())
        if not vid:
            key = _normalize_vessel_name(parent_name)
            if key:
                candidates = self._name_fuzzy_map.get(key, [])
                if len(candidates) == 1:
                    vid = candidates[0]

        if vid:
            return self._vessels[vid].get('mmsi'), vid
        return None, None

    def _transfer_affinity_scores(self, cur, old_mmsi: str, new_mmsi: str) -> None:
        """Move target_mmsi in gear_correlation_scores to the new MMSI.

        The PK (model_id, group_key, sub_cluster_id, target_mmsi) can conflict, so
        the update is isolated with a SAVEPOINT. On conflict the transfer is
        abandoned and only a warning is logged; the outer transaction stays intact,
        so the INSERT that follows works normally.
        """
        cur.execute('SAVEPOINT sp_score_xfer')
        try:
            cur.execute(
                f"UPDATE {GEAR_CORRELATION_SCORES} "
                "SET target_mmsi = %s, updated_at = NOW() "
                "WHERE target_mmsi = %s",
                (new_mmsi, old_mmsi),
            )
            if cur.rowcount > 0:
                logger.info(
                    'transferred %d affinity scores: %s → %s',
                    cur.rowcount, old_mmsi, new_mmsi,
                )
            cur.execute('RELEASE SAVEPOINT sp_score_xfer')
        except psycopg2.errors.UniqueViolation as e:
            cur.execute('ROLLBACK TO SAVEPOINT sp_score_xfer')
            cur.execute('RELEASE SAVEPOINT sp_score_xfer')
            logger.warning(
                'affinity score transfer skipped (pk conflict): %s → %s | %s',
                old_mmsi, new_mmsi, e,
            )

    def _upsert_gear_collision(
        self,
        cur,
        collision: dict,
        now: datetime,
    ) -> Optional[int]:
        """UPSERT a single coexistence pair and return its row id.

        - Existing row: coexistence_count += 1, refresh last_seen_at and the
          coordinates, max_distance_km = GREATEST(existing, new). Severity is
          recalculated only while status is OPEN/REVIEWED.
        - New row: first_seen_at = last_seen_at = now; compute severity, then INSERT.
        """
        name = collision['name']
        mmsi_lo = collision['mmsi_lo']
        mmsi_hi = collision['mmsi_hi']
        distance_km = collision.get('distance_km') or 0.0

        # Parent-vessel hint: prefer the fleet_tracker match result; otherwise use
        # the value extracted by the algorithm.
        hint_parent = collision.get('parent_name')
        parent_mmsi, parent_vid = self._match_parent_vessel(hint_parent)

        cur.execute(
            f"""SELECT id, coexistence_count, max_distance_km, swap_count, status,
                       parent_name, parent_vessel_id
                FROM {GEAR_IDENTITY_COLLISIONS}
                WHERE name = %s AND mmsi_lo = %s AND mmsi_hi = %s""",
            (name, mmsi_lo, mmsi_hi),
        )
        row = cur.fetchone()

        evidence_fragment = {
            'observed_at': now.isoformat(),
            'distance_km': distance_km,
            'positions': {
                mmsi_lo: {
                    'lat': collision.get('lat_lo'),
                    'lon': collision.get('lon_lo'),
                },
                mmsi_hi: {
                    'lat': collision.get('lat_hi'),
                    'lon': collision.get('lon_hi'),
                },
            },
        }

        if row:
            (row_id, cur_count, cur_max_dist, swap_cnt, cur_status,
             cur_parent_name, cur_parent_vid) = row
            new_count = (cur_count or 0) + 1
            merged_max = max(
                float(cur_max_dist or 0),
                float(distance_km or 0),
            )
            if cur_status in ('CONFIRMED_ILLEGAL', 'FALSE_POSITIVE'):
                new_severity = None  # a confirmed status keeps its severity
            else:
                new_severity = classify_severity(new_count, merged_max, swap_cnt or 0)

            cur.execute(
                f"""UPDATE {GEAR_IDENTITY_COLLISIONS}
                    SET last_seen_at = %s,
                        coexistence_count = %s,
                        max_distance_km = %s,
                        last_lat_lo = %s, last_lon_lo = %s,
                        last_lat_hi = %s, last_lon_hi = %s,
                        parent_name = COALESCE(%s, parent_name),
                        parent_vessel_id = COALESCE(%s, parent_vessel_id),
                        severity = COALESCE(%s, severity),
                        evidence = COALESCE(evidence, '[]'::jsonb) || to_jsonb(%s::jsonb),
                        updated_at = NOW()
                    WHERE id = %s""",
                (
                    now, new_count, merged_max,
                    collision.get('lat_lo'), collision.get('lon_lo'),
                    collision.get('lat_hi'), collision.get('lon_hi'),
                    hint_parent, parent_vid,
                    new_severity,
                    json.dumps([evidence_fragment]),
                    row_id,
                ),
            )
            return row_id

        severity = classify_severity(1, distance_km, 0)
        cur.execute(
            f"""INSERT INTO {GEAR_IDENTITY_COLLISIONS}
                    (name, mmsi_lo, mmsi_hi, parent_name, parent_vessel_id,
                     first_seen_at, last_seen_at, coexistence_count, max_distance_km,
                     last_lat_lo, last_lon_lo, last_lat_hi, last_lon_hi,
                     severity, status, evidence)
                VALUES (%s,%s,%s,%s,%s,%s,%s,1,%s,%s,%s,%s,%s,%s,'OPEN',%s)
                RETURNING id""",
            (
                name, mmsi_lo, mmsi_hi, hint_parent, parent_vid,
                now, now, distance_km,
                collision.get('lat_lo'), collision.get('lon_lo'),
                collision.get('lat_hi'), collision.get('lon_hi'),
                severity,
                json.dumps([evidence_fragment]),
            ),
        )
        return cur.fetchone()[0]

    def get_recent_collision_ids(self) -> list[int]:
        """Row ids of gear_identity_collisions updated or inserted during this cycle."""
        return list(getattr(self, '_recent_collision_ids', []))

    def build_fleet_clusters(self, vessel_dfs: dict[str, pd.DataFrame]) -> dict[str, dict]:
        """Build cluster information based on the registered fleet.

        Returns: {mmsi → {cluster_id, cluster_size, is_leader, fleet_role}}
        cluster_id = company_id (registered-fleet basis)
        """
        results: dict[str, dict] = {}

        # Group the vessels currently received over AIS by company
        company_vessels: dict[int, list[str]] = {}
        for mmsi, vid in self._mmsi_to_vid.items():
            v = self._vessels.get(vid)
            if not v or mmsi not in vessel_dfs:
                continue
            cid = v['company_id']
            company_vessels.setdefault(cid, []).append(mmsi)

        for cid, mmsis in company_vessels.items():
            if len(mmsis) < 2:
                # Single vessel → NOISE
                for mmsi in mmsis:
                    v = self._vessels.get(self._mmsi_to_vid.get(mmsi, -1), {})
                    results[mmsi] = {
                        'cluster_id': -1,
                        'cluster_size': 1,
                        'is_leader': False,
                        'fleet_role': v.get('fleet_role', 'NOISE'),
                    }
                continue

            # Two or more vessels → registered-fleet cluster
            for mmsi in mmsis:
                vid = self._mmsi_to_vid[mmsi]
                v = self._vessels[vid]
                results[mmsi] = {
                    'cluster_id': cid,
                    'cluster_size': len(mmsis),
                    'is_leader': v['fleet_role'] == 'MAIN',
                    'fleet_role': v['fleet_role'],
                }

        # Unmatched vessels → NOISE
        for mmsi in vessel_dfs:
            if mmsi not in results:
                results[mmsi] = {
                    'cluster_id': -1,
                    'cluster_size': 0,
                    'is_leader': False,
                    'fleet_role': 'NOISE',
                }

        return results

    def save_snapshot(self, vessel_dfs: dict[str, pd.DataFrame], conn) -> None:
        """Save a fleet_tracking_snapshot row per company."""
        now = datetime.now(timezone.utc)
        cur = conn.cursor()

        company_vessels: dict[int, list[str]] = {}
        for mmsi, vid in self._mmsi_to_vid.items():
            v = self._vessels.get(vid)
            if not v or mmsi not in vessel_dfs:
                continue
            company_vessels.setdefault(v['company_id'], []).append(mmsi)

        for cid, mmsis in company_vessels.items():
            active = len(mmsis)
            total = sum(1 for v in self._vessels.values() if v['company_id'] == cid)

            lats: list[float] = []
            lons: list[float] = []
            for mmsi in mmsis:
                df = vessel_dfs.get(mmsi)
                if df is not None and len(df) > 0:
                    last = df.iloc[-1]
                    lats.append(float(last['lat']))
                    lons.append(float(last['lon']))

            center_lat = sum(lats) / len(lats) if lats else None
            center_lon = sum(lons) / len(lons) if lons else None

            cur.execute(
                f"""INSERT INTO {FLEET_TRACKING_SNAPSHOT}
                        (company_id, snapshot_time, total_vessels, active_vessels,
                         center_lat, center_lon)
                    VALUES (%s, %s, %s, %s, %s, %s)""",
                (cid, now, total, active, center_lat, center_lon),
            )

        conn.commit()
        cur.close()
        logger.info('fleet snapshot saved: %d companies', len(company_vessels))

    def get_company_vessels(self, vessel_dfs: dict[str, 'pd.DataFrame']) -> dict[int, list[str]]:
        """Return the MMSIs of registered-fleet vessels currently received over AIS, by company.

        Returns: {company_id: [mmsi, ...]}
        """
        result: dict[int, list[str]] = {}
        for mmsi, vid in self._mmsi_to_vid.items():
            v = self._vessels.get(vid)
            if not v or mmsi not in vessel_dfs:
                continue
            result.setdefault(v['company_id'], []).append(mmsi)
        return result


# Singleton
fleet_tracker = FleetTracker()
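
r"""
Usage sketch (illustrative only; this string is never executed by the module).
The snippet below is a minimal standalone sketch of the EXACT-then-FUZZY name lookup
that match_ais_to_registry performs, assuming a toy in-memory registry. The names
normalize_name and match_name, the two simplified regexes, and the sample vessel names
are hypothetical stand-ins and not part of this module's API.

```python
import re
from typing import Optional

# Simplified stand-ins for the module's normalization patterns (assumption).
STRIP_CHARS = re.compile(r'[\s\-_./,()\[\]#]+')
NUMBER_MARKER = re.compile(r'\bNO\.?\s*', re.IGNORECASE)


def normalize_name(name: Optional[str]) -> str:
    # Uppercase, drop the 'No.' marker, then strip whitespace and punctuation.
    if not name:
        return ''
    s = NUMBER_MARKER.sub('', name.strip().upper())
    return STRIP_CHARS.sub('', s)


def match_name(
    name: str,
    exact_map: dict[str, int],
    fuzzy_map: dict[str, list[int]],
) -> tuple[Optional[int], Optional[str]]:
    # 1) Exact lookup on the lowercased, stripped name.
    vid = exact_map.get(name.lower().strip())
    if vid is not None:
        return vid, 'NAME_EXACT'
    # 2) Fuzzy lookup on the normalized name; accept only an unambiguous hit.
    candidates = fuzzy_map.get(normalize_name(name), [])
    if len(candidates) == 1:
        return candidates[0], 'NAME_FUZZY'
    return None, None


# Hypothetical registry: two vessels whose names differ only by number.
registry = {1: 'ZHE LING YU 20865', 2: 'ZHE LING YU 20866'}
exact_map = {n.lower(): vid for vid, n in registry.items()}
fuzzy_map: dict[str, list[int]] = {}
for vid, n in registry.items():
    fuzzy_map.setdefault(normalize_name(n), []).append(vid)

print(match_name('zhe ling yu 20865', exact_map, fuzzy_map))   # exact hit
print(match_name('ZheLingYu No.20866', exact_map, fuzzy_map))  # fuzzy hit
print(match_name('ZHE LING YU', exact_map, fuzzy_map))         # no match
```

Because the vessel number survives normalization, the two sample vessels get distinct
fuzzy keys, and a name without its number matches neither of them.
"""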