kcg-ai-monitoring/prediction/fleet_tracker.py
htlee 64df7b180c feat(prediction): fleet_tracker NAME_FUZZY 매칭 추가 (A-2)
_normalize_vessel_name()로 선박번호 suffix(호/號/号/NoN/#N) +
공백/구두점 제거 후 upper() 통일. EXACT 실패 시 FUZZY 단계로
매칭 시도. 동명이 후보 2개 이상이거나 이미 다른 MMSI 할당된 vid는
제외하여 중복 방지.

- match_ais_to_registry: NAME_EXACT → NAME_FUZZY (confidence 0.80)
- track_gear_identity: parent_name 매칭에도 FUZZY 적용
- _name_fuzzy_map 캐시로 1회 lookup

검증 목표: fleet_vessels.mmsi 매칭률 8.7% → 30%+
2026-04-16 09:08:35 +09:00

513 lines
20 KiB
Python
Raw Blame 히스토리

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""등록 선단 기반 추적기."""
import logging
import re
import time
from datetime import datetime, timezone
from typing import Optional
import pandas as pd
from algorithms.gear_name_rules import is_trackable_parent_name
from config import qualified_table
logger = logging.getLogger(__name__)
# Gear (net/buoy) AIS name patterns.
# Alternative 1: "<parent>_<token containing a digit>..." — group(1) is the parent-vessel
#   name; spaces, alphanumeric indices and a trailing "_" are tolerated after it.
# Alternative 2: an all-digit name — group(2) holds the number (no parent name).
GEAR_PATTERN = re.compile(r'^(.+?)_(?=\S*\d)\S+(?:[_ ]\S*)*[_ ]*$|^(\d+)$')
# "<parent>%" — group(1) is the parent-vessel name.
GEAR_PATTERN_PCT = re.compile(r'^(.+?)%$')
# Registry reload interval in seconds: FleetTracker.load_registry returns early when the
# previous load is younger than this.
_REGISTRY_CACHE_SEC = 3600
# Table names resolved via config.qualified_table (presumably schema-qualified — confirm in config).
FLEET_COMPANIES = qualified_table('fleet_companies')
FLEET_VESSELS = qualified_table('fleet_vessels')
GEAR_IDENTITY_LOG = qualified_table('gear_identity_log')
GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores')
FLEET_TRACKING_SNAPSHOT = qualified_table('fleet_tracking_snapshot')
# Vessel-name normalization: strip whitespace/punctuation, drop the trailing hull-number
# suffix, and upper-case. AIS names and fishery_permit_cn names often differ only in such
# suffixes/spacing, which held NAME_EXACT matching at 8.7%; normalized matching recovers them.
# NOTE(review): the optional "NO" prefix also eats a name that genuinely ends in "NO" before
# a number (e.g. "...NO 5"); both sides are normalized identically, so this only widens keys.
_NAME_STRIP_SUFFIX = re.compile(
    r'(?:'
    r'[\s_]*(?:NO\.?|#|)?[\s_]*\d+\s*(?:호|號|号)?'  # 123 / No.123 / #123 / 123호 / 12号
    r'|[\s_]*(?:호|號|号)'  # a bare trailing 호/號/号
    r')\s*$',
    re.IGNORECASE,
)
# Runs of whitespace/punctuation removed after suffix stripping (\u3000 = ideographic space).
_NAME_STRIP_CHARS = re.compile(r'[\s\-_./,()\[\]·•\u3000]+')
def _normalize_vessel_name(name: Optional[str]) -> str:
"""선박명을 매칭용으로 정규화.
1. upper() + strip
2. 말미 선박번호 패턴 제거 (호/號/号, No.N, #N, 공백숫자)
3. 남은 공백/구두점 제거
"""
if not name:
return ''
s = name.strip().upper()
s = _NAME_STRIP_SUFFIX.sub('', s)
s = _NAME_STRIP_CHARS.sub('', s)
return s
class FleetTracker:
    """Match live AIS traffic against the registered fleet (fleet_companies / fleet_vessels).

    Keeps an in-memory copy of the registry (reloaded at most every
    ``_REGISTRY_CACHE_SEC`` seconds), binds AIS MMSIs to registry vessels by
    name, tracks gear identities in gear_identity_log, and writes per-company
    tracking snapshots.
    """

    def __init__(self) -> None:
        self._companies: dict[int, dict] = {}  # id → {name_cn, name_en}
        self._vessels: dict[int, dict] = {}  # id → {permit_no, name_cn, ...}
        self._name_cn_map: dict[str, int] = {}  # raw name_cn → vessel_id (exact match)
        self._name_en_map: dict[str, int] = {}  # lower()/strip() name_en → vessel_id (exact match)
        self._name_fuzzy_map: dict[str, list[int]] = {}  # normalized name → distinct vessel_ids
        self._mmsi_to_vid: dict[str, int] = {}  # mmsi → vessel_id (matched vessels only)
        self._gear_active: dict[str, dict] = {}  # mmsi → {name, parent_mmsi, ...}
        self._last_registry_load: float = 0.0  # time.time() of the last successful load

    def load_registry(self, conn) -> None:
        """Load fleet_companies + fleet_vessels from the DB (cached for _REGISTRY_CACHE_SEC).

        Only current-year permits (or legacy rows with NULL permit_year) are loaded.
        All lookup maps are built locally and swapped in at the end, so a failing
        query leaves the previously loaded registry intact (and is retried next call).
        """
        if time.time() - self._last_registry_load < _REGISTRY_CACHE_SEC:
            return
        cur = conn.cursor()
        try:
            cur.execute(f'SELECT id, name_cn, name_en FROM {FLEET_COMPANIES}')
            companies = {r[0]: {'name_cn': r[1], 'name_en': r[2]} for r in cur.fetchall()}
            # Current-year permits only (permits renew annually); NULL permit_year = legacy rows.
            cur.execute(
                f"""SELECT id, company_id, permit_no, name_cn, name_en, tonnage,
                    gear_code, fleet_role, pair_vessel_id, mmsi
                FROM {FLEET_VESSELS}
                WHERE permit_year = EXTRACT(YEAR FROM now())::int
                   OR permit_year IS NULL"""
            )
            rows = cur.fetchall()
        finally:
            cur.close()

        vessels: dict[int, dict] = {}
        name_cn_map: dict[str, int] = {}
        name_en_map: dict[str, int] = {}
        fuzzy_map: dict[str, list[int]] = {}
        mmsi_to_vid: dict[str, int] = {}
        for r in rows:
            vid = r[0]
            name_cn, name_en, mmsi = r[3], r[4], r[9]
            vessels[vid] = {
                'id': vid,
                'company_id': r[1],
                'permit_no': r[2],
                'name_cn': name_cn,
                'name_en': name_en,
                'tonnage': r[5],
                'gear_code': r[6],
                'fleet_role': r[7],
                'pair_vessel_id': r[8],
                'mmsi': mmsi,
            }
            if name_cn:
                name_cn_map[name_cn] = vid
            if name_en:
                name_en_map[name_en.lower().strip()] = vid
            # name_cn and name_en frequently normalize to the same key; register each
            # vessel once per key so it is not mistaken for two same-named candidates.
            for raw_name in (name_cn, name_en):
                key = _normalize_vessel_name(raw_name)
                if key:
                    bucket = fuzzy_map.setdefault(key, [])
                    if vid not in bucket:
                        bucket.append(vid)
            if mmsi:
                mmsi_to_vid[mmsi] = vid

        self._companies = companies
        self._vessels = vessels
        self._name_cn_map = name_cn_map
        self._name_en_map = name_en_map
        self._name_fuzzy_map = fuzzy_map
        self._mmsi_to_vid = mmsi_to_vid
        self._last_registry_load = time.time()
        logger.info(
            'fleet registry loaded: %d companies, %d vessels',
            len(self._companies),
            len(self._vessels),
        )

    def get_pair_mmsi(self, mmsi: str) -> Optional[str]:
        """Return the MMSI of the PT (pair-trawl) partner vessel, or None if unknown."""
        vid = self._mmsi_to_vid.get(mmsi)
        if vid is None:
            return None
        pair_vid = self._vessels.get(vid, {}).get('pair_vessel_id')
        if pair_vid is None:
            return None
        return self._vessels.get(pair_vid, {}).get('mmsi')

    def get_vessel_gear_code(self, mmsi: str) -> Optional[str]:
        """Return the registered gear code (C21=PT, C22=OT, ...) for an MMSI, or None."""
        vid = self._mmsi_to_vid.get(mmsi)
        if vid is None:
            return None
        return self._vessels.get(vid, {}).get('gear_code')

    def get_pt_registered_mmsis(self) -> set[str]:
        """MMSIs of vessels registered as pair trawlers (PT / PT-S).

        Works without fishery_code by relying on the legacy gear_code 'C21'.
        """
        return {
            v['mmsi']
            for v in self._vessels.values()
            if v.get('mmsi') and v.get('gear_code') == 'C21'
        }

    def get_gear_episodes(self, mmsi: str, conn, hours: int = 24) -> list[dict]:
        """Return gear_identity_log signal episodes of the last *hours* hours for an MMSI.

        Used by G-04 (MMSI manipulation detection). Each item has first_seen_at /
        last_seen_at. On any DB error a warning is logged and [] is returned.
        """
        try:
            cur = conn.cursor()
            try:
                cur.execute(
                    f"""SELECT first_seen_at, last_seen_at
                    FROM {GEAR_IDENTITY_LOG}
                    WHERE mmsi = %s
                      AND first_seen_at > NOW() - (%s || ' hours')::interval
                    ORDER BY first_seen_at""",
                    (mmsi, str(hours)),
                )
                rows = cur.fetchall()
            finally:
                cur.close()
        except Exception as exc:
            logger.warning('get_gear_episodes 실패 [mmsi=%s]: %s', mmsi, exc)
            return []
        return [{'first_seen_at': r[0], 'last_seen_at': r[1]} for r in rows]

    def get_gear_positions(
        self, mmsi: str, df_vessel: Optional[pd.DataFrame] = None,
    ) -> list[tuple[float, float]]:
        """Return fixed-gear positions as (lat, lon) tuples, for G-05 drift detection.

        Currently only the passed-in vessel DataFrame (track coordinates) is supported;
        a DB path can be added once a gear_tracking_snapshot table exists.
        """
        if df_vessel is None or len(df_vessel) == 0:
            return []
        try:
            lats = df_vessel['lat'].tolist()
            lons = df_vessel['lon'].tolist()
            return list(zip(lats, lons))
        except Exception as exc:
            logger.warning('get_gear_positions 실패 [mmsi=%s]: %s', mmsi, exc)
            return []

    def _vid_available_for(self, vid: int, mmsi: str) -> bool:
        """True if registry vessel *vid* is unbound or already bound to *mmsi*."""
        assigned = self._vessels.get(vid, {}).get('mmsi')
        return not assigned or assigned == mmsi

    def _resolve_registry_vid(self, name: str, mmsi: str) -> Optional[tuple[int, str, float]]:
        """Resolve an AIS name to (vessel_id, match_method, confidence), or None.

        NAME_EXACT (0.95): raw name == name_cn, or lower/stripped name == name_en.
        NAME_FUZZY (0.80): the normalized name maps to exactly one registry vessel.
        A vessel bound to a different MMSI is never returned. An exact hit on such a
        vessel also suppresses the fuzzy fallback, because the name belongs to it.
        """
        vid = self._name_cn_map.get(name)
        if vid is None:
            vid = self._name_en_map.get(name.lower().strip())
        if vid is not None:
            return (vid, 'NAME_EXACT', 0.95) if self._vid_available_for(vid, mmsi) else None

        key = _normalize_vessel_name(name)
        candidates = self._name_fuzzy_map.get(key, []) if key else []
        # Normalization strips hull numbers, so distinct vessels can share a key. Only a
        # unique, still-unbound candidate is safe; filtering bound vessels first would let
        # an ambiguous name fall onto whichever sibling happens to be unassigned.
        if len(candidates) == 1 and self._vid_available_for(candidates[0], mmsi):
            return candidates[0], 'NAME_FUZZY', 0.80
        return None

    def match_ais_to_registry(self, ais_vessels: list[dict], conn) -> None:
        """Match AIS vessels to the registered fleet and persist the bindings.

        ais_vessels: [{mmsi, name, lat, lon, sog, cog}, ...]
        Already bound MMSIs only get last_seen_at refreshed. New bindings are written
        with a guarded UPDATE (never overwriting another MMSI). The in-memory maps are
        only changed when the DB row was actually updated, so memory stays consistent
        with the DB. Commits once at the end.
        """
        cur = conn.cursor()
        matched_exact = 0
        matched_fuzzy = 0
        try:
            for v in ais_vessels:
                mmsi = v.get('mmsi', '')
                name = v.get('name', '')
                if not mmsi or not name:
                    continue
                # Already matched → refresh last_seen_at only.
                if mmsi in self._mmsi_to_vid:
                    cur.execute(
                        f'UPDATE {FLEET_VESSELS} SET last_seen_at = NOW() WHERE id = %s',
                        (self._mmsi_to_vid[mmsi],),
                    )
                    continue

                resolved = self._resolve_registry_vid(name, mmsi)
                if resolved is None:
                    continue
                vid, method, confidence = resolved
                cur.execute(
                    f"""UPDATE {FLEET_VESSELS}
                    SET mmsi = %s, match_confidence = %s, match_method = %s,
                        last_seen_at = NOW(), updated_at = NOW()
                    WHERE id = %s AND (mmsi IS NULL OR mmsi = %s)""",
                    (mmsi, confidence, method, vid, mmsi),
                )
                if cur.rowcount == 0:
                    # The row got bound to another MMSI after our cached load; the DB
                    # kept that binding, so do not claim the match in memory either.
                    continue
                self._mmsi_to_vid[mmsi] = vid
                self._vessels[vid]['mmsi'] = mmsi
                if method == 'NAME_FUZZY':
                    matched_fuzzy += 1
                else:
                    matched_exact += 1
            conn.commit()
        finally:
            cur.close()
        if matched_exact or matched_fuzzy:
            logger.info(
                'AIS→registry matched: exact=%d, fuzzy=%d',
                matched_exact, matched_fuzzy,
            )

    @staticmethod
    def _parse_gear_name(name: str) -> tuple[Optional[str], Optional[int], Optional[int]]:
        """Split a gear AIS name into (parent_name, gear_index_1, gear_index_2).

        "<parent>_<indices...>" yields the parent plus up to two numeric indices,
        an all-digit name yields only index 1, and "<parent>%" yields only the parent.
        Anything else yields (None, None, None).
        """
        m = GEAR_PATTERN.match(name)
        if m:
            # group(1): parent+index form, group(2): all-digit form.
            if m.group(1):
                parent_name = m.group(1).strip()
                suffix = name[m.end(1):].strip(' _')
                digits = re.findall(r'\d+', suffix)
                idx1 = int(digits[0]) if digits else None
                idx2 = int(digits[1]) if len(digits) >= 2 else None
                return parent_name, idx1, idx2
            # All-digit name (e.g. 12345): no parent, index only.
            return None, int(m.group(2)), None
        m2 = GEAR_PATTERN_PCT.match(name)
        if m2:
            return m2.group(1).strip(), None, None
        return None, None, None

    def _lookup_parent_vid(self, parent_name: str) -> Optional[int]:
        """Find the registry vessel for a gear's parent name (EXACT, then unique FUZZY)."""
        vid = self._name_cn_map.get(parent_name)
        if not vid:
            vid = self._name_en_map.get(parent_name.lower())
        if not vid:
            key = _normalize_vessel_name(parent_name)
            candidates = self._name_fuzzy_map.get(key, []) if key else []
            if len(candidates) == 1:
                vid = candidates[0]
        return vid or None

    @staticmethod
    def _insert_gear_identity(cur, values: tuple) -> None:
        """Insert one active gear_identity_log row.

        values = (mmsi, name, parent_name, parent_mmsi, parent_vessel_id,
                  gear_index_1, gear_index_2, lat, lon,
                  match_method, match_confidence, first_seen_at, last_seen_at)
        """
        cur.execute(
            f"""INSERT INTO {GEAR_IDENTITY_LOG}
            (mmsi, name, parent_name, parent_mmsi, parent_vessel_id,
             gear_index_1, gear_index_2, lat, lon,
             match_method, match_confidence, first_seen_at, last_seen_at)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
            values,
        )

    @staticmethod
    def _transfer_affinity_scores(cur, old_mmsi: str, new_mmsi: str) -> None:
        """Re-point gear_correlation_scores rows from old_mmsi to new_mmsi (best effort).

        Runs inside a SAVEPOINT: in PostgreSQL a failed statement aborts the whole
        transaction, so without it a failure here would make every later statement
        of the caller fail. Assumes the connection is not in autocommit mode (the
        callers commit explicitly).
        """
        cur.execute('SAVEPOINT gear_affinity_transfer')
        try:
            cur.execute(
                f'UPDATE {GEAR_CORRELATION_SCORES} '
                'SET target_mmsi = %s, updated_at = NOW() '
                'WHERE target_mmsi = %s',
                (new_mmsi, old_mmsi),
            )
            moved = cur.rowcount
        except Exception as e:
            cur.execute('ROLLBACK TO SAVEPOINT gear_affinity_transfer')
            logger.warning('affinity score transfer failed: %s', e)
            return
        cur.execute('RELEASE SAVEPOINT gear_affinity_transfer')
        if moved > 0:
            logger.info(
                'transferred %d affinity scores: %s → %s',
                moved, old_mmsi, new_mmsi,
            )

    def track_gear_identity(self, gear_signals: list[dict], conn) -> None:
        """Track fishing-gear identities in gear_identity_log.

        gear_signals: [{mmsi, name, lat, lon}, ...] — AIS signals whose names follow a
        gear pattern such as XXX_N_N. For each trackable signal:
          * same MMSI + same name as its active row → refresh last_seen_at / position;
          * same MMSI + different name → deactivate the old row, insert a new one;
          * new MMSI whose name is active under another MMSI → MMSI change: deactivate
            the old row, move correlation scores to the new MMSI, insert a new row;
          * otherwise → insert a new row.
        Commits once at the end.
        """
        cur = conn.cursor()
        now = datetime.now(timezone.utc)
        try:
            for g in gear_signals:
                mmsi = g['mmsi']
                name = g['name']
                lat = g.get('lat', 0)
                lon = g.get('lon', 0)

                parent_name, idx1, idx2 = self._parse_gear_name(name)
                if not is_trackable_parent_name(parent_name or name):
                    continue

                # Parent vessel lookup (EXACT → FUZZY).
                parent_vid = self._lookup_parent_vid(parent_name) if parent_name else None
                parent_mmsi = self._vessels[parent_vid].get('mmsi') if parent_vid else None
                match_method: Optional[str] = 'NAME_PARENT' if parent_vid else None
                confidence = 0.9 if parent_vid else 0.0
                new_row = (
                    mmsi, name, parent_name, parent_mmsi, parent_vid,
                    idx1, idx2, lat, lon,
                    match_method, confidence, now, now,
                )

                # Current active row for this MMSI.
                cur.execute(
                    f"""SELECT id, name FROM {GEAR_IDENTITY_LOG}
                    WHERE mmsi = %s AND is_active = TRUE""",
                    (mmsi,),
                )
                existing = cur.fetchone()
                if existing:
                    if existing[1] == name:
                        # Same MMSI + same name → refresh position/time.
                        cur.execute(
                            f"""UPDATE {GEAR_IDENTITY_LOG}
                            SET last_seen_at = %s, lat = %s, lon = %s
                            WHERE id = %s""",
                            (now, lat, lon, existing[0]),
                        )
                    else:
                        # Same MMSI + different name → retire the old identity.
                        cur.execute(
                            f'UPDATE {GEAR_IDENTITY_LOG} SET is_active = FALSE WHERE id = %s',
                            (existing[0],),
                        )
                        self._insert_gear_identity(cur, new_row)
                    continue

                # New MMSI → is the same name active under a different MMSI?
                cur.execute(
                    f"""SELECT id, mmsi FROM {GEAR_IDENTITY_LOG}
                    WHERE name = %s AND is_active = TRUE AND mmsi != %s""",
                    (name, mmsi),
                )
                old_mmsi_row = cur.fetchone()
                if old_mmsi_row:
                    old_mmsi = old_mmsi_row[1]
                    cur.execute(
                        f'UPDATE {GEAR_IDENTITY_LOG} SET is_active = FALSE WHERE id = %s',
                        (old_mmsi_row[0],),
                    )
                    logger.info('gear MMSI change: %s → %s (name=%s)', old_mmsi, mmsi, name)
                    self._transfer_affinity_scores(cur, old_mmsi, mmsi)
                self._insert_gear_identity(cur, new_row)
            conn.commit()
        finally:
            cur.close()

    def build_fleet_clusters(self, vessel_dfs: dict[str, pd.DataFrame]) -> dict[str, dict]:
        """Build cluster info from the registered fleets.

        Returns {mmsi → {cluster_id, cluster_size, is_leader, fleet_role}} for every
        key of vessel_dfs. cluster_id is the company_id when at least two of the
        company's vessels are currently present; lone or unmatched vessels get -1.
        """
        results: dict[str, dict] = {}
        for cid, mmsis in self.get_company_vessels(vessel_dfs).items():
            if len(mmsis) < 2:
                # Lone registered vessel → NOISE cluster, keeping its role when known.
                for mmsi in mmsis:
                    v = self._vessels.get(self._mmsi_to_vid.get(mmsi, -1), {})
                    results[mmsi] = {
                        'cluster_id': -1,
                        'cluster_size': 1,
                        'is_leader': False,
                        # The key always exists (possibly None), so fall back explicitly.
                        'fleet_role': v.get('fleet_role') or 'NOISE',
                    }
                continue
            # Two or more vessels → registered-fleet cluster.
            for mmsi in mmsis:
                v = self._vessels[self._mmsi_to_vid[mmsi]]
                results[mmsi] = {
                    'cluster_id': cid,
                    'cluster_size': len(mmsis),
                    'is_leader': v['fleet_role'] == 'MAIN',
                    'fleet_role': v['fleet_role'],
                }
        # Vessels not matched to the registry → NOISE.
        for mmsi in vessel_dfs:
            if mmsi not in results:
                results[mmsi] = {
                    'cluster_id': -1,
                    'cluster_size': 0,
                    'is_leader': False,
                    'fleet_role': 'NOISE',
                }
        return results

    def save_snapshot(self, vessel_dfs: dict[str, pd.DataFrame], conn) -> None:
        """Insert one fleet_tracking_snapshot row per company with vessels currently present.

        Each row holds the registered vessel count, the active count and the mean of
        the active vessels' last lat/lon (None when no positions are available).
        """
        now = datetime.now(timezone.utc)
        company_vessels = self.get_company_vessels(vessel_dfs)
        # Registered vessels per company, counted once instead of rescanning per company.
        totals: dict[int, int] = {}
        for v in self._vessels.values():
            totals[v['company_id']] = totals.get(v['company_id'], 0) + 1

        cur = conn.cursor()
        try:
            for cid, mmsis in company_vessels.items():
                lats: list[float] = []
                lons: list[float] = []
                for mmsi in mmsis:
                    df = vessel_dfs.get(mmsi)
                    if df is not None and len(df) > 0:
                        last = df.iloc[-1]
                        lats.append(float(last['lat']))
                        lons.append(float(last['lon']))
                center_lat = sum(lats) / len(lats) if lats else None
                center_lon = sum(lons) / len(lons) if lons else None
                cur.execute(
                    f"""INSERT INTO {FLEET_TRACKING_SNAPSHOT}
                    (company_id, snapshot_time, total_vessels, active_vessels,
                     center_lat, center_lon)
                    VALUES (%s, %s, %s, %s, %s, %s)""",
                    (cid, now, totals.get(cid, 0), len(mmsis), center_lat, center_lon),
                )
            conn.commit()
        finally:
            cur.close()
        logger.info('fleet snapshot saved: %d companies', len(company_vessels))

    def get_company_vessels(self, vessel_dfs: dict[str, 'pd.DataFrame']) -> dict[int, list[str]]:
        """Return {company_id: [mmsi, ...]} for matched vessels present in vessel_dfs."""
        result: dict[int, list[str]] = {}
        for mmsi, vid in self._mmsi_to_vid.items():
            v = self._vessels.get(vid)
            if not v or mmsi not in vessel_dfs:
                continue
            result.setdefault(v['company_id'], []).append(mmsi)
        return result
# Module-level singleton instance.
fleet_tracker = FleetTracker()