"""Registered-fleet-based tracker."""

import logging
import re
import time
from collections import Counter
from datetime import datetime, timezone
from typing import Optional

import pandas as pd

logger = logging.getLogger(__name__)

# Gear name patterns: "<parent>_<digits>_<optional digits>" and "<parent>%".
GEAR_PATTERN = re.compile(r'^(.+?)_(\d+)_(\d*)$')
GEAR_PATTERN_PCT = re.compile(r'^(.+?)%$')

# Minimum number of seconds between two registry reloads from the DB.
_REGISTRY_CACHE_SEC = 3600

# Shared INSERT used whenever a new gear identity row has to be created.
_GEAR_INSERT_SQL = """INSERT INTO kcg.gear_identity_log
    (mmsi, name, parent_name, parent_mmsi, parent_vessel_id,
     gear_index_1, gear_index_2, lat, lon,
     match_method, match_confidence, first_seen_at, last_seen_at)
    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""


class FleetTracker:
    """Track registered fleet vessels and gear signals against AIS data.

    The tracker keeps an in-memory copy of the fleet registry
    (``kcg.fleet_companies`` / ``kcg.fleet_vessels``) and uses it to match
    AIS vessels by name, to log gear identities, and to group the vessels
    currently seen on AIS by their registered company.
    """

    def __init__(self) -> None:
        self._companies: dict[int, dict] = {}    # id -> {name_cn, name_en}
        self._vessels: dict[int, dict] = {}      # id -> {permit_no, name_cn, ...}
        self._name_cn_map: dict[str, int] = {}   # name_cn -> vessel_id
        self._name_en_map: dict[str, int] = {}   # name_en (lowercase) -> vessel_id
        self._mmsi_to_vid: dict[str, int] = {}   # mmsi -> vessel_id (matched only)
        self._gear_active: dict[str, dict] = {}  # mmsi -> {name, parent_mmsi, ...}
        self._last_registry_load: float = 0.0

    def load_registry(self, conn) -> None:
        """Load fleet_companies + fleet_vessels from the DB (cached for 1 hour).

        Returns immediately when the previous successful load happened less
        than ``_REGISTRY_CACHE_SEC`` seconds ago. Otherwise all in-memory
        lookup maps are rebuilt from the query results.

        Args:
            conn: DB-API connection; only ``conn.cursor()`` is used.
        """
        if time.time() - self._last_registry_load < _REGISTRY_CACHE_SEC:
            return

        cur = conn.cursor()
        try:
            cur.execute('SELECT id, name_cn, name_en FROM kcg.fleet_companies')
            company_rows = cur.fetchall()
            cur.execute(
                """SELECT id, company_id, permit_no, name_cn, name_en, tonnage,
                          gear_code, fleet_role, pair_vessel_id, mmsi
                   FROM kcg.fleet_vessels"""
            )
            vessel_rows = cur.fetchall()
        finally:
            # Release the cursor even when one of the queries raises.
            cur.close()

        # Build everything locally first so that a failed query never leaves
        # the tracker with a half-replaced registry.
        companies = {
            r[0]: {'name_cn': r[1], 'name_en': r[2]} for r in company_rows
        }
        vessels: dict[int, dict] = {}
        name_cn_map: dict[str, int] = {}
        name_en_map: dict[str, int] = {}
        mmsi_to_vid: dict[str, int] = {}
        for r in vessel_rows:
            vid = r[0]
            vessels[vid] = {
                'id': vid,
                'company_id': r[1],
                'permit_no': r[2],
                'name_cn': r[3],
                'name_en': r[4],
                'tonnage': r[5],
                'gear_code': r[6],
                'fleet_role': r[7],
                'pair_vessel_id': r[8],
                'mmsi': r[9],
            }
            if r[3]:
                name_cn_map[r[3]] = vid
            if r[4]:
                name_en_map[r[4].lower().strip()] = vid
            if r[9]:
                mmsi_to_vid[r[9]] = vid

        self._companies = companies
        self._vessels = vessels
        self._name_cn_map = name_cn_map
        self._name_en_map = name_en_map
        self._mmsi_to_vid = mmsi_to_vid
        self._last_registry_load = time.time()
        logger.info(
            'fleet registry loaded: %d companies, %d vessels',
            len(self._companies),
            len(self._vessels),
        )

    def match_ais_to_registry(self, ais_vessels: list[dict], conn) -> None:
        """Match AIS vessels to the registered fleet and update the DB.

        Vessels whose MMSI is already matched only get ``last_seen_at``
        refreshed. Others are matched by exact name (Chinese name first,
        then lowercased/stripped English name).

        Args:
            ais_vessels: ``[{mmsi, name, lat, lon, sog, cog}, ...]``; entries
                without a truthy ``mmsi`` or ``name`` are skipped.
            conn: DB-API connection; committed once after all vessels.
        """
        matched = 0
        cur = conn.cursor()
        try:
            for v in ais_vessels:
                mmsi = v.get('mmsi', '')
                name = v.get('name', '')
                if not mmsi or not name:
                    continue

                # Already matched -> only refresh last_seen_at.
                if mmsi in self._mmsi_to_vid:
                    cur.execute(
                        'UPDATE kcg.fleet_vessels SET last_seen_at = NOW() WHERE id = %s',
                        (self._mmsi_to_vid[mmsi],),
                    )
                    continue

                # NAME_EXACT matching. Compare against None so that a
                # vessel id of 0 is not mistaken for "no match".
                vid: Optional[int] = self._name_cn_map.get(name)
                if vid is None:
                    vid = self._name_en_map.get(name.lower().strip())
                if vid is None:
                    continue

                cur.execute(
                    """UPDATE kcg.fleet_vessels
                       SET mmsi = %s, match_confidence = 0.95, match_method = 'NAME_EXACT',
                           last_seen_at = NOW(), updated_at = NOW()
                       WHERE id = %s AND (mmsi IS NULL OR mmsi = %s)""",
                    (mmsi, vid, mmsi),
                )
                # The WHERE guard refuses to overwrite a different MMSI that
                # is already stored for this vessel. When nothing was updated
                # the in-memory map must not claim a match the DB rejected.
                # (rowcount may be -1 when the driver cannot tell; that case
                # is treated as a successful update.)
                if cur.rowcount == 0:
                    continue
                self._mmsi_to_vid[mmsi] = vid
                matched += 1

            conn.commit()
        finally:
            cur.close()

        if matched > 0:
            logger.info('AIS→registry matched: %d vessels', matched)

    @staticmethod
    def _parse_gear_name(name: str) -> tuple[Optional[str], Optional[int], Optional[int]]:
        """Split a gear name into (parent_name, index_1, index_2)."""
        m = GEAR_PATTERN.match(name)
        if m:
            idx2 = int(m.group(3)) if m.group(3) else None
            return m.group(1).strip(), int(m.group(2)), idx2
        m_pct = GEAR_PATTERN_PCT.match(name)
        if m_pct:
            return m_pct.group(1).strip(), None, None
        return None, None, None

    def _resolve_parent(self, parent_name: Optional[str]) -> tuple[Optional[int], Optional[str]]:
        """Look up the registered parent vessel; return (vessel_id, mmsi)."""
        if not parent_name:
            return None, None
        vid = self._name_cn_map.get(parent_name)
        if vid is None:
            vid = self._name_en_map.get(parent_name.lower())
        if vid is None:
            return None, None
        return vid, self._vessels[vid].get('mmsi')

    def track_gear_identity(self, gear_signals: list[dict], conn) -> None:
        """Track gear / net identities in ``kcg.gear_identity_log``.

        For every signal the active row for its MMSI is looked up:

        * same MMSI, same name  -> update last_seen_at and position;
        * same MMSI, other name -> deactivate the old row, insert a new one;
        * unknown MMSI          -> if the same name is active under another
          MMSI, deactivate that row (MMSI change), then insert a new one.

        Args:
            gear_signals: ``[{mmsi, name, lat, lon}, ...]``; ``mmsi`` and
                ``name`` are required keys, ``lat``/``lon`` default to 0.
            conn: DB-API connection; committed once after all signals.
        """
        cur = conn.cursor()
        try:
            now = datetime.now(timezone.utc)

            for g in gear_signals:
                mmsi = g['mmsi']
                name = g['name']
                lat = g.get('lat', 0)
                lon = g.get('lon', 0)

                # Extract parent vessel name + indices, then match the parent.
                parent_name, idx1, idx2 = self._parse_gear_name(name)
                parent_vid, parent_mmsi = self._resolve_parent(parent_name)
                has_parent = parent_vid is not None
                match_method: Optional[str] = 'NAME_PARENT' if has_parent else None
                confidence = 0.9 if has_parent else 0.0

                new_row = (
                    mmsi, name, parent_name, parent_mmsi, parent_vid,
                    idx1, idx2, lat, lon,
                    match_method, confidence, now, now,
                )

                # Look up the currently active row for this MMSI.
                cur.execute(
                    """SELECT id, name FROM kcg.gear_identity_log
                       WHERE mmsi = %s AND is_active = TRUE""",
                    (mmsi,),
                )
                existing = cur.fetchone()

                if existing:
                    if existing[1] == name:
                        # Same MMSI + same name -> refresh position/time.
                        cur.execute(
                            """UPDATE kcg.gear_identity_log
                               SET last_seen_at = %s, lat = %s, lon = %s
                               WHERE id = %s""",
                            (now, lat, lon, existing[0]),
                        )
                    else:
                        # Same MMSI + different name -> retire old, add new.
                        cur.execute(
                            'UPDATE kcg.gear_identity_log SET is_active = FALSE WHERE id = %s',
                            (existing[0],),
                        )
                        cur.execute(_GEAR_INSERT_SQL, new_row)
                    continue

                # New MMSI -> check whether the name is active under another MMSI.
                cur.execute(
                    """SELECT id, mmsi FROM kcg.gear_identity_log
                       WHERE name = %s AND is_active = TRUE AND mmsi != %s""",
                    (name, mmsi),
                )
                old_mmsi_row = cur.fetchone()
                if old_mmsi_row:
                    # Same name + different MMSI -> the gear changed its MMSI.
                    cur.execute(
                        'UPDATE kcg.gear_identity_log SET is_active = FALSE WHERE id = %s',
                        (old_mmsi_row[0],),
                    )
                    logger.info('gear MMSI change: %s → %s (name=%s)', old_mmsi_row[1], mmsi, name)

                cur.execute(_GEAR_INSERT_SQL, new_row)

            conn.commit()
        finally:
            cur.close()

    def build_fleet_clusters(self, vessel_dfs: dict[str, pd.DataFrame]) -> dict[str, dict]:
        """Build cluster info based on the registered fleet.

        Args:
            vessel_dfs: mapping keyed by MMSI; only key membership is used.

        Returns:
            ``{mmsi: {cluster_id, cluster_size, is_leader, fleet_role}}`` for
            every key of ``vessel_dfs``. ``cluster_id`` is the company id when
            two or more vessels of that company are present, otherwise -1.
        """
        results: dict[str, dict] = {}

        for cid, mmsis in self.get_company_vessels(vessel_dfs).items():
            if len(mmsis) < 2:
                # Lone vessel of its company -> treated as noise.
                for mmsi in mmsis:
                    v = self._vessels.get(self._mmsi_to_vid.get(mmsi, -1), {})
                    results[mmsi] = {
                        'cluster_id': -1,
                        'cluster_size': 1,
                        'is_leader': False,
                        'fleet_role': v.get('fleet_role', 'NOISE'),
                    }
                continue

            # Two or more vessels -> registered-fleet cluster.
            for mmsi in mmsis:
                v = self._vessels[self._mmsi_to_vid[mmsi]]
                results[mmsi] = {
                    'cluster_id': cid,
                    'cluster_size': len(mmsis),
                    'is_leader': v['fleet_role'] == 'MAIN',
                    'fleet_role': v['fleet_role'],
                }

        # Vessels that were not matched to the registry -> noise.
        for mmsi in vessel_dfs:
            if mmsi not in results:
                results[mmsi] = {
                    'cluster_id': -1,
                    'cluster_size': 0,
                    'is_leader': False,
                    'fleet_role': 'NOISE',
                }

        return results

    def save_snapshot(self, vessel_dfs: dict[str, pd.DataFrame], conn) -> None:
        """Store one ``kcg.fleet_tracking_snapshot`` row per active company.

        Args:
            vessel_dfs: ``{mmsi: DataFrame}``; the last row's ``lat``/``lon``
                of each non-empty frame feeds the company centre position.
            conn: DB-API connection; committed once after all inserts.
        """
        now = datetime.now(timezone.utc)
        company_vessels = self.get_company_vessels(vessel_dfs)
        # Count registered vessels per company once instead of rescanning
        # the whole registry for every company.
        totals = Counter(v['company_id'] for v in self._vessels.values())

        cur = conn.cursor()
        try:
            for cid, mmsis in company_vessels.items():
                lats: list[float] = []
                lons: list[float] = []
                for mmsi in mmsis:
                    df = vessel_dfs.get(mmsi)
                    if df is not None and len(df) > 0:
                        last = df.iloc[-1]
                        lats.append(float(last['lat']))
                        lons.append(float(last['lon']))

                # Mean of the last known positions; None when no positions.
                center_lat = sum(lats) / len(lats) if lats else None
                center_lon = sum(lons) / len(lons) if lons else None

                cur.execute(
                    """INSERT INTO kcg.fleet_tracking_snapshot
                       (company_id, snapshot_time, total_vessels, active_vessels,
                        center_lat, center_lon)
                       VALUES (%s, %s, %s, %s, %s, %s)""",
                    (cid, now, totals[cid], len(mmsis), center_lat, center_lon),
                )

            conn.commit()
        finally:
            cur.close()

        logger.info('fleet snapshot saved: %d companies', len(company_vessels))

    def get_company_vessels(self, vessel_dfs: dict[str, 'pd.DataFrame']) -> dict[int, list[str]]:
        """Return per-company MMSI lists of registered vessels seen on AIS.

        A vessel is included when its MMSI is matched to a known registry
        vessel and is also a key of ``vessel_dfs``.

        Returns:
            ``{company_id: [mmsi, ...]}``
        """
        result: dict[int, list[str]] = {}
        for mmsi, vid in self._mmsi_to_vid.items():
            v = self._vessels.get(vid)
            if not v or mmsi not in vessel_dfs:
                continue
            result.setdefault(v['company_id'], []).append(mmsi)
        return result


# Module-level singleton
fleet_tracker = FleetTracker()