kcg-ai-monitoring/prediction/fleet_tracker.py
htlee 535704707b fix(prediction): FUZZY 정규화를 공백/대소문자만으로 축소 + name_en 전용 (A-2 후속)
버그 원인: 초기 정규화가 선박번호(suffix)까지 제거 → '浙岭渔20865' → '浙岭渔' 로
축약 → 동명이 수십 개 발생 → len(unassigned)>1 조건에 전부 탈락 → FUZZY=0건.

중국/한국 어선명은 업체명+선박번호가 고유 식별자이므로 숫자 자체는 보존해야 함.
정규화는 공백/구두점/대소문자/'NO.' 마커만 통일:
  'ZHE LING YU 20865' ↔ 'zhelingyu20865' ↔ 'ZHE-LING-YU-20865' 모두 일치

FUZZY 매칭 key 는 name_en 만 등록 (AIS 보고 이름이 영문이 주류).
2026-04-16 10:13:35 +09:00

588 lines
23 KiB
Python

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""등록 선단 기반 추적기."""
import logging
import re
import time
from datetime import datetime, timezone
from typing import Optional
import pandas as pd
from algorithms.gear_name_rules import is_trackable_parent_name
from config import qualified_table
logger = logging.getLogger(__name__)
# 어구 이름 패턴 — 공백/영숫자 인덱스/끝_ 허용
GEAR_PATTERN = re.compile(r'^(.+?)_(?=\S*\d)\S+(?:[_ ]\S*)*[_ ]*$|^(\d+)$')
GEAR_PATTERN_PCT = re.compile(r'^(.+?)%$')
_REGISTRY_CACHE_SEC = 3600
FLEET_COMPANIES = qualified_table('fleet_companies')
FLEET_VESSELS = qualified_table('fleet_vessels')
GEAR_IDENTITY_LOG = qualified_table('gear_identity_log')
GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores')
FLEET_TRACKING_SNAPSHOT = qualified_table('fleet_tracking_snapshot')
# 선박명 정규화 (중국/한국 어선 식별자 보존):
# - 중국 어선명 = 업체명(浙岭渔) + 선박번호(20865) 로 고유 식별. 번호 제거 시 동명이 수십 개 발생 → 번호 유지
# - 통일 대상: 공백/구두점, 대소문자, NO./No. prefix
_NAME_STRIP_CHARS = re.compile(r'[\s\-_./,()\[\]·•\u3000#]+')
_NAME_STRIP_PREFIX_NUMBER_MARKER = re.compile(r'\bNO\.?\s*', re.IGNORECASE)
_PERIOD_RANGE_PATTERN = re.compile(
r'(\d{4})[/\-.](\d{1,2})[/\-.](\d{1,2})\s*[-~]\s*(\d{4})[/\-.](\d{1,2})[/\-.](\d{1,2})'
)
def _parse_period_range(raw: str) -> Optional[tuple[datetime, datetime]]:
"""fishing_period 'YYYY/MM/DD - YYYY/MM/DD' 포맷을 파싱.
실패 시 None. 구분자는 / - . 모두 허용, 연결자는 - ~ 허용.
"""
if not raw:
return None
m = _PERIOD_RANGE_PATTERN.search(raw)
if not m:
return None
try:
y1, m1, d1, y2, m2, d2 = (int(x) for x in m.groups())
start = datetime(y1, m1, d1, 0, 0, 0)
end = datetime(y2, m2, d2, 23, 59, 59)
if end < start:
return None
return (start, end)
except (ValueError, TypeError):
return None
def _normalize_vessel_name(name: Optional[str]) -> str:
"""선박명을 매칭용으로 정규화.
1. upper() + strip
2. 'No.123' / 'NO 123' 의 번호 마커만 제거 (숫자 자체는 고유 식별자로 유지)
3. 공백/구두점/중간점/전각공백/#/ 제거
"""
if not name:
return ''
s = name.strip().upper()
s = _NAME_STRIP_PREFIX_NUMBER_MARKER.sub('', s)
s = _NAME_STRIP_CHARS.sub('', s)
return s
class FleetTracker:
def __init__(self) -> None:
self._companies: dict[int, dict] = {} # id → {name_cn, name_en}
self._vessels: dict[int, dict] = {} # id → {permit_no, name_cn, ...}
self._name_cn_map: dict[str, int] = {} # name_cn → vessel_id (정확일치)
self._name_en_map: dict[str, int] = {} # name_en(lowercase) → vessel_id (정확일치)
self._name_fuzzy_map: dict[str, list[int]] = {} # 정규화 이름 → [vessel_id, ...]
self._mmsi_to_vid: dict[str, int] = {} # mmsi → vessel_id (매칭된 것만)
self._gear_active: dict[str, dict] = {} # mmsi → {name, parent_mmsi, ...}
self._last_registry_load: float = 0.0
def load_registry(self, conn) -> None:
"""DB에서 fleet_companies + fleet_vessels 로드. 1시간 캐시."""
if time.time() - self._last_registry_load < _REGISTRY_CACHE_SEC:
return
cur = conn.cursor()
cur.execute(f'SELECT id, name_cn, name_en FROM {FLEET_COMPANIES}')
self._companies = {r[0]: {'name_cn': r[1], 'name_en': r[2]} for r in cur.fetchall()}
# 현재 연도 허가만 조회 (연단위 갱신 정책, permit_year NULL은 legacy 허용)
cur.execute(
f"""SELECT id, company_id, permit_no, name_cn, name_en, tonnage,
gear_code, fleet_role, pair_vessel_id, mmsi, fishery_code
FROM {FLEET_VESSELS}
WHERE permit_year = EXTRACT(YEAR FROM now())::int
OR permit_year IS NULL"""
)
self._vessels = {}
self._name_cn_map = {}
self._name_en_map = {}
self._name_fuzzy_map = {}
self._mmsi_to_vid = {}
for r in cur.fetchall():
vid = r[0]
v: dict = {
'id': vid,
'company_id': r[1],
'permit_no': r[2],
'name_cn': r[3],
'name_en': r[4],
'tonnage': r[5],
'gear_code': r[6],
'fleet_role': r[7],
'pair_vessel_id': r[8],
'mmsi': r[9],
'fishery_code': r[10],
}
self._vessels[vid] = v
if r[3]:
self._name_cn_map[r[3]] = vid
if r[4]:
self._name_en_map[r[4].lower().strip()] = vid
# FUZZY 매칭은 name_en 만 대상 (AIS 가 보고하는 이름은 영문이 주류)
key_en = _normalize_vessel_name(r[4])
if key_en:
self._name_fuzzy_map.setdefault(key_en, []).append(vid)
if r[9]:
self._mmsi_to_vid[r[9]] = vid
cur.close()
self._last_registry_load = time.time()
logger.info(
'fleet registry loaded: %d companies, %d vessels',
len(self._companies),
len(self._vessels),
)
def get_pair_mmsi(self, mmsi: str) -> Optional[str]:
"""PT 쌍끌이 쌍대 선박의 MMSI를 반환. 없으면 None."""
vid = self._mmsi_to_vid.get(mmsi)
if vid is None:
return None
pair_vid = self._vessels.get(vid, {}).get('pair_vessel_id')
if pair_vid is None:
return None
pair_vessel = self._vessels.get(pair_vid)
if pair_vessel is None:
return None
return pair_vessel.get('mmsi')
def get_vessel_gear_code(self, mmsi: str) -> Optional[str]:
"""등록 선박의 어구 코드(C21=PT, C22=OT 등)를 반환."""
vid = self._mmsi_to_vid.get(mmsi)
if vid is None:
return None
return self._vessels.get(vid, {}).get('gear_code')
def get_pt_registered_mmsis(self) -> set[str]:
"""쌍끌이(PT / PT-S) 로 등록된 선박의 MMSI 집합.
fishery_code 미도입 환경에서도 legacy gear_code='C21' 로 동작.
"""
result: set[str] = set()
for v in self._vessels.values():
gc = v.get('gear_code') or ''
mmsi = v.get('mmsi')
if mmsi and gc in ('C21',):
result.add(mmsi)
return result
def get_gear_episodes(self, mmsi: str, conn, hours: int = 24) -> list[dict]:
"""gear_identity_log 에서 최근 N시간 신호 에피소드 목록을 반환.
G-04 MMSI 조작 탐지용. first_seen_at / last_seen_at 포함.
"""
try:
cur = conn.cursor()
cur.execute(
f"""SELECT first_seen_at, last_seen_at
FROM {GEAR_IDENTITY_LOG}
WHERE mmsi = %s
AND first_seen_at > NOW() - (%s || ' hours')::interval
ORDER BY first_seen_at""",
(mmsi, str(hours)),
)
rows = cur.fetchall()
cur.close()
return [{'first_seen_at': r[0], 'last_seen_at': r[1]} for r in rows]
except Exception as exc:
logger.warning('get_gear_episodes 실패 [mmsi=%s]: %s', mmsi, exc)
return []
def get_registered_fishery_code(self, mmsi: str) -> Optional[str]:
"""fleet_vessels 에 등록된 선박의 fishery_code (PT/PT-S/GN/PS/OT/FC).
V029 이후 fishery_code 컬럼을 우선 참조. legacy gear_code(C21 등) 는 별도 경로.
"""
vid = self._mmsi_to_vid.get(mmsi)
if vid is None:
return None
v = self._vessels.get(vid, {})
return v.get('fishery_code') or None
def get_permit_periods(
self, mmsi: str, conn, year: Optional[int] = None,
) -> list[tuple[datetime, datetime]]:
"""선박의 허가 조업 기간을 파싱하여 [(start, end), ...] 반환.
fishery_permit_cn.fishing_period_1/2 의 'YYYY/MM/DD - YYYY/MM/DD' 포맷을 파싱.
'-' (미사용) 또는 파싱 실패 시 해당 구간 생략. G-02 금어기 판정에 사용.
"""
vid = self._mmsi_to_vid.get(mmsi)
if vid is None:
return []
v = self._vessels.get(vid)
if not v:
return []
permit_no = v.get('permit_no')
target_year = year or datetime.now().year
if not permit_no:
return []
try:
cur = conn.cursor()
cur.execute(
"""SELECT fishing_period_1, fishing_period_2
FROM kcg.fishery_permit_cn
WHERE permit_year = %s AND permit_no = %s""",
(target_year, permit_no),
)
row = cur.fetchone()
cur.close()
except Exception as exc:
logger.warning('get_permit_periods DB 실패 [mmsi=%s]: %s', mmsi, exc)
return []
if not row:
return []
periods: list[tuple[datetime, datetime]] = []
for raw in row:
if not raw or raw.strip() in ('-', ''):
continue
parsed = _parse_period_range(raw)
if parsed:
periods.append(parsed)
return periods
def get_gear_positions(
self, mmsi: str, df_vessel: Optional[pd.DataFrame] = None,
) -> list[tuple[float, float]]:
"""고정어구 위치 목록(lat, lon). G-05 drift 탐지용.
현재 단계에서는 선박 DataFrame(track 좌표)을 그대로 전달하는 경로만 지원.
향후 gear_tracking_snapshot 테이블이 도입되면 DB 경로 추가.
"""
if df_vessel is None or len(df_vessel) == 0:
return []
try:
lats = df_vessel['lat'].tolist()
lons = df_vessel['lon'].tolist()
return list(zip(lats, lons))
except Exception as exc:
logger.warning('get_gear_positions 실패 [mmsi=%s]: %s', mmsi, exc)
return []
def match_ais_to_registry(self, ais_vessels: list[dict], conn) -> None:
"""AIS 선박을 등록 선단에 매칭. DB 업데이트.
ais_vessels: [{mmsi, name, lat, lon, sog, cog}, ...]
"""
cur = conn.cursor()
matched_exact = 0
matched_fuzzy = 0
for v in ais_vessels:
mmsi = v.get('mmsi', '')
name = v.get('name', '')
if not mmsi or not name:
continue
# 이미 매칭됨 → last_seen_at 업데이트
if mmsi in self._mmsi_to_vid:
cur.execute(
f'UPDATE {FLEET_VESSELS} SET last_seen_at = NOW() WHERE id = %s',
(self._mmsi_to_vid[mmsi],),
)
continue
# NAME_EXACT 매칭
vid: Optional[int] = self._name_cn_map.get(name)
if not vid:
vid = self._name_en_map.get(name.lower().strip())
method = 'NAME_EXACT'
confidence = 0.95
# NAME_FUZZY 매칭 (정규화 후 lookup)
if not vid:
key = _normalize_vessel_name(name)
if key:
candidates = self._name_fuzzy_map.get(key, [])
# 이미 다른 MMSI에 할당된 vid 제외 → 동명이 중복 매칭 방지
unassigned = [
c for c in candidates
if not self._vessels.get(c, {}).get('mmsi')
or self._vessels[c].get('mmsi') == mmsi
]
if len(unassigned) == 1:
vid = unassigned[0]
method = 'NAME_FUZZY'
confidence = 0.80
if vid:
cur.execute(
f"""UPDATE {FLEET_VESSELS}
SET mmsi = %s, match_confidence = %s, match_method = %s,
last_seen_at = NOW(), updated_at = NOW()
WHERE id = %s AND (mmsi IS NULL OR mmsi = %s)""",
(mmsi, confidence, method, vid, mmsi),
)
self._mmsi_to_vid[mmsi] = vid
self._vessels[vid]['mmsi'] = mmsi
if method == 'NAME_FUZZY':
matched_fuzzy += 1
else:
matched_exact += 1
conn.commit()
cur.close()
if matched_exact or matched_fuzzy:
logger.info(
'AIS→registry matched: exact=%d, fuzzy=%d',
matched_exact, matched_fuzzy,
)
def track_gear_identity(self, gear_signals: list[dict], conn) -> None:
"""어구/어망 정체성 추적.
gear_signals: [{mmsi, name, lat, lon}, ...] — 이름이 XXX_숫자_숫자 패턴인 AIS 신호
"""
cur = conn.cursor()
now = datetime.now(timezone.utc)
for g in gear_signals:
mmsi = g['mmsi']
name = g['name']
lat = g.get('lat', 0)
lon = g.get('lon', 0)
# 모선명 + 인덱스 추출
parent_name: Optional[str] = None
idx1: Optional[int] = None
idx2: Optional[int] = None
m = GEAR_PATTERN.match(name)
if m:
# group(1): parent+index 패턴, group(2): 순수 숫자 패턴
if m.group(1):
parent_name = m.group(1).strip()
suffix = name[m.end(1):].strip(' _')
digits = re.findall(r'\d+', suffix)
idx1 = int(digits[0]) if len(digits) >= 1 else None
idx2 = int(digits[1]) if len(digits) >= 2 else None
else:
# 순수 숫자 이름 (예: 12345) — parent 없음, 인덱스만
idx1 = int(m.group(2))
else:
m2 = GEAR_PATTERN_PCT.match(name)
if m2:
parent_name = m2.group(1).strip()
effective_parent_name = parent_name or name
if not is_trackable_parent_name(effective_parent_name):
continue
# 모선 매칭 (EXACT → FUZZY 순)
parent_mmsi: Optional[str] = None
parent_vid: Optional[int] = None
if parent_name:
vid = self._name_cn_map.get(parent_name)
if not vid:
vid = self._name_en_map.get(parent_name.lower())
if not vid:
key = _normalize_vessel_name(parent_name)
if key:
candidates = self._name_fuzzy_map.get(key, [])
if len(candidates) == 1:
vid = candidates[0]
if vid:
parent_vid = vid
parent_mmsi = self._vessels[vid].get('mmsi')
match_method: Optional[str] = 'NAME_PARENT' if parent_vid else None
confidence = 0.9 if parent_vid else 0.0
# 기존 활성 행 조회
cur.execute(
f"""SELECT id, name FROM {GEAR_IDENTITY_LOG}
WHERE mmsi = %s AND is_active = TRUE""",
(mmsi,),
)
existing = cur.fetchone()
if existing:
if existing[1] == name:
# 같은 MMSI + 같은 이름 → 위치/시간 업데이트
cur.execute(
f"""UPDATE {GEAR_IDENTITY_LOG}
SET last_seen_at = %s, lat = %s, lon = %s
WHERE id = %s""",
(now, lat, lon, existing[0]),
)
else:
# 같은 MMSI + 다른 이름 → 이전 비활성화 + 새 행
cur.execute(
f'UPDATE {GEAR_IDENTITY_LOG} SET is_active = FALSE WHERE id = %s',
(existing[0],),
)
cur.execute(
f"""INSERT INTO {GEAR_IDENTITY_LOG}
(mmsi, name, parent_name, parent_mmsi, parent_vessel_id,
gear_index_1, gear_index_2, lat, lon,
match_method, match_confidence, first_seen_at, last_seen_at)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
(mmsi, name, parent_name, parent_mmsi, parent_vid,
idx1, idx2, lat, lon,
match_method, confidence, now, now),
)
else:
# 새 MMSI → 같은 이름이 다른 MMSI로 있는지 확인
cur.execute(
f"""SELECT id, mmsi FROM {GEAR_IDENTITY_LOG}
WHERE name = %s AND is_active = TRUE AND mmsi != %s""",
(name, mmsi),
)
old_mmsi_row = cur.fetchone()
if old_mmsi_row:
# 같은 이름 + 다른 MMSI → MMSI 변경
cur.execute(
f'UPDATE {GEAR_IDENTITY_LOG} SET is_active = FALSE WHERE id = %s',
(old_mmsi_row[0],),
)
logger.info('gear MMSI change: %s%s (name=%s)', old_mmsi_row[1], mmsi, name)
# 어피니티 점수 이전 (이전 MMSI → 새 MMSI)
try:
cur.execute(
f"UPDATE {GEAR_CORRELATION_SCORES} "
"SET target_mmsi = %s, updated_at = NOW() "
"WHERE target_mmsi = %s",
(mmsi, old_mmsi_row[1]),
)
if cur.rowcount > 0:
logger.info(
'transferred %d affinity scores: %s%s',
cur.rowcount, old_mmsi_row[1], mmsi,
)
except Exception as e:
logger.warning('affinity score transfer failed: %s', e)
cur.execute(
f"""INSERT INTO {GEAR_IDENTITY_LOG}
(mmsi, name, parent_name, parent_mmsi, parent_vessel_id,
gear_index_1, gear_index_2, lat, lon,
match_method, match_confidence, first_seen_at, last_seen_at)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
(mmsi, name, parent_name, parent_mmsi, parent_vid,
idx1, idx2, lat, lon,
match_method, confidence, now, now),
)
conn.commit()
cur.close()
def build_fleet_clusters(self, vessel_dfs: dict[str, pd.DataFrame]) -> dict[str, dict]:
"""등록 선단 기준으로 cluster 정보 구성.
Returns: {mmsi → {cluster_id, cluster_size, is_leader, fleet_role}}
cluster_id = company_id (등록 선단 기준)
"""
results: dict[str, dict] = {}
# 회사별로 현재 AIS 수신 중인 선박 그룹핑
company_vessels: dict[int, list[str]] = {}
for mmsi, vid in self._mmsi_to_vid.items():
v = self._vessels.get(vid)
if not v or mmsi not in vessel_dfs:
continue
cid = v['company_id']
company_vessels.setdefault(cid, []).append(mmsi)
for cid, mmsis in company_vessels.items():
if len(mmsis) < 2:
# 단독 선박 → NOISE
for mmsi in mmsis:
v = self._vessels.get(self._mmsi_to_vid.get(mmsi, -1), {})
results[mmsi] = {
'cluster_id': -1,
'cluster_size': 1,
'is_leader': False,
'fleet_role': v.get('fleet_role', 'NOISE'),
}
continue
# 2척 이상 → 등록 선단 클러스터
for mmsi in mmsis:
vid = self._mmsi_to_vid[mmsi]
v = self._vessels[vid]
results[mmsi] = {
'cluster_id': cid,
'cluster_size': len(mmsis),
'is_leader': v['fleet_role'] == 'MAIN',
'fleet_role': v['fleet_role'],
}
# 매칭 안 된 선박 → NOISE
for mmsi in vessel_dfs:
if mmsi not in results:
results[mmsi] = {
'cluster_id': -1,
'cluster_size': 0,
'is_leader': False,
'fleet_role': 'NOISE',
}
return results
def save_snapshot(self, vessel_dfs: dict[str, pd.DataFrame], conn) -> None:
"""fleet_tracking_snapshot 저장."""
now = datetime.now(timezone.utc)
cur = conn.cursor()
company_vessels: dict[int, list[str]] = {}
for mmsi, vid in self._mmsi_to_vid.items():
v = self._vessels.get(vid)
if not v or mmsi not in vessel_dfs:
continue
company_vessels.setdefault(v['company_id'], []).append(mmsi)
for cid, mmsis in company_vessels.items():
active = len(mmsis)
total = sum(1 for v in self._vessels.values() if v['company_id'] == cid)
lats: list[float] = []
lons: list[float] = []
for mmsi in mmsis:
df = vessel_dfs.get(mmsi)
if df is not None and len(df) > 0:
last = df.iloc[-1]
lats.append(float(last['lat']))
lons.append(float(last['lon']))
center_lat = sum(lats) / len(lats) if lats else None
center_lon = sum(lons) / len(lons) if lons else None
cur.execute(
f"""INSERT INTO {FLEET_TRACKING_SNAPSHOT}
(company_id, snapshot_time, total_vessels, active_vessels,
center_lat, center_lon)
VALUES (%s, %s, %s, %s, %s, %s)""",
(cid, now, total, active, center_lat, center_lon),
)
conn.commit()
cur.close()
logger.info('fleet snapshot saved: %d companies', len(company_vessels))
def get_company_vessels(self, vessel_dfs: dict[str, 'pd.DataFrame']) -> dict[int, list[str]]:
"""현재 AIS 수신 중인 등록 선단의 회사별 MMSI 목록 반환.
Returns: {company_id: [mmsi, ...]}
"""
result: dict[int, list[str]] = {}
for mmsi, vid in self._mmsi_to_vid.items():
v = self._vessels.get(vid)
if not v or mmsi not in vessel_dfs:
continue
result.setdefault(v['company_id'], []).append(mmsi)
return result
# 싱글턴
fleet_tracker = FleetTracker()