동일 어구 이름이 서로 다른 MMSI 로 같은 5분 사이클에 동시 AIS 송출되는 공존 케이스를 신규 탐지 패턴으로 분리해 기록·분류한다. 부수 효과로 fleet_tracker.track_gear_identity 의 PK 충돌로 인한 사이클 실패도 해소. Prediction - algorithms/gear_identity.py: detect_gear_name_collisions + classify_severity - fleet_tracker.py: 공존/교체 분기 분리, UPSERT helper, savepoint 점수 이전 - output/event_generator.py: run_gear_identity_collision_events 추가 - scheduler.py: track_gear_identity 직후 이벤트 승격 호출 Backend (domain/analysis) - GearIdentityCollision 엔티티 + Repository(Specification+stats) - GearIdentityCollisionService (@Transactional readOnly / @Auditable resolve) - GearCollisionController /api/analysis/gear-collisions (list/stats/detail/resolve) - GearCollisionResponse / StatsResponse / ResolveRequest (record) DB - V030__gear_identity_collision.sql: gear_identity_collisions 테이블 + auth_perm_tree 엔트리(detection:gear-collision nav_sort=950) + 역할별 권한 Frontend - shared/constants/gearCollisionStatuses.ts + catalogRegistry 등록 - services/gearCollisionApi.ts (list/stats/get/resolve) - features/detection/GearCollisionDetection.tsx (PageContainer+Section+DataTable + 분류 액션 폼, design system SSOT 준수) - componentRegistry + feature index + i18n detection.json / common.json(ko/en)
742 lines
30 KiB
Python
742 lines
30 KiB
Python
"""등록 선단 기반 추적기."""
|
||
import json
|
||
import logging
|
||
import re
|
||
import time
|
||
from datetime import datetime, timezone
|
||
from typing import Optional
|
||
|
||
import pandas as pd
|
||
import psycopg2
|
||
|
||
from algorithms.gear_identity import classify_severity, detect_gear_name_collisions
|
||
from algorithms.gear_name_rules import is_trackable_parent_name
|
||
from config import qualified_table
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 어구 이름 패턴 — 공백/영숫자 인덱스/끝_ 허용
|
||
GEAR_PATTERN = re.compile(r'^(.+?)_(?=\S*\d)\S+(?:[_ ]\S*)*[_ ]*$|^(\d+)$')
|
||
GEAR_PATTERN_PCT = re.compile(r'^(.+?)%$')
|
||
|
||
_REGISTRY_CACHE_SEC = 3600
|
||
FLEET_COMPANIES = qualified_table('fleet_companies')
|
||
FLEET_VESSELS = qualified_table('fleet_vessels')
|
||
GEAR_IDENTITY_LOG = qualified_table('gear_identity_log')
|
||
GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores')
|
||
GEAR_IDENTITY_COLLISIONS = qualified_table('gear_identity_collisions')
|
||
FLEET_TRACKING_SNAPSHOT = qualified_table('fleet_tracking_snapshot')
|
||
|
||
# 선박명 정규화 (중국/한국 어선 식별자 보존):
|
||
# - 중국 어선명 = 업체명(浙岭渔) + 선박번호(20865) 로 고유 식별. 번호 제거 시 동명이 수십 개 발생 → 번호 유지
|
||
# - 통일 대상: 공백/구두점, 대소문자, NO./No. prefix
|
||
_NAME_STRIP_CHARS = re.compile(r'[\s\-_./,()\[\]·•\u3000##]+')
|
||
_NAME_STRIP_PREFIX_NUMBER_MARKER = re.compile(r'\bNO\.?\s*', re.IGNORECASE)
|
||
|
||
|
||
_PERIOD_RANGE_PATTERN = re.compile(
|
||
r'(\d{4})[/\-.](\d{1,2})[/\-.](\d{1,2})\s*[-~~]\s*(\d{4})[/\-.](\d{1,2})[/\-.](\d{1,2})'
|
||
)
|
||
|
||
|
||
def _parse_period_range(raw: str) -> Optional[tuple[datetime, datetime]]:
|
||
"""fishing_period 'YYYY/MM/DD - YYYY/MM/DD' 포맷을 파싱.
|
||
|
||
실패 시 None. 구분자는 / - . 모두 허용, 연결자는 - ~ ~ 허용.
|
||
"""
|
||
if not raw:
|
||
return None
|
||
m = _PERIOD_RANGE_PATTERN.search(raw)
|
||
if not m:
|
||
return None
|
||
try:
|
||
y1, m1, d1, y2, m2, d2 = (int(x) for x in m.groups())
|
||
start = datetime(y1, m1, d1, 0, 0, 0)
|
||
end = datetime(y2, m2, d2, 23, 59, 59)
|
||
if end < start:
|
||
return None
|
||
return (start, end)
|
||
except (ValueError, TypeError):
|
||
return None
|
||
|
||
|
||
def _normalize_vessel_name(name: Optional[str]) -> str:
|
||
"""선박명을 매칭용으로 정규화.
|
||
|
||
1. upper() + strip
|
||
2. 'No.123' / 'NO 123' 의 번호 마커만 제거 (숫자 자체는 고유 식별자로 유지)
|
||
3. 공백/구두점/중간점/전각공백/#/# 제거
|
||
"""
|
||
if not name:
|
||
return ''
|
||
s = name.strip().upper()
|
||
s = _NAME_STRIP_PREFIX_NUMBER_MARKER.sub('', s)
|
||
s = _NAME_STRIP_CHARS.sub('', s)
|
||
return s
|
||
|
||
|
||
class FleetTracker:
|
||
def __init__(self) -> None:
|
||
self._companies: dict[int, dict] = {} # id → {name_cn, name_en}
|
||
self._vessels: dict[int, dict] = {} # id → {permit_no, name_cn, ...}
|
||
self._name_cn_map: dict[str, int] = {} # name_cn → vessel_id (정확일치)
|
||
self._name_en_map: dict[str, int] = {} # name_en(lowercase) → vessel_id (정확일치)
|
||
self._name_fuzzy_map: dict[str, list[int]] = {} # 정규화 이름 → [vessel_id, ...]
|
||
self._mmsi_to_vid: dict[str, int] = {} # mmsi → vessel_id (매칭된 것만)
|
||
self._gear_active: dict[str, dict] = {} # mmsi → {name, parent_mmsi, ...}
|
||
self._last_registry_load: float = 0.0
|
||
|
||
def load_registry(self, conn) -> None:
|
||
"""DB에서 fleet_companies + fleet_vessels 로드. 1시간 캐시."""
|
||
if time.time() - self._last_registry_load < _REGISTRY_CACHE_SEC:
|
||
return
|
||
|
||
cur = conn.cursor()
|
||
cur.execute(f'SELECT id, name_cn, name_en FROM {FLEET_COMPANIES}')
|
||
self._companies = {r[0]: {'name_cn': r[1], 'name_en': r[2]} for r in cur.fetchall()}
|
||
|
||
# 현재 연도 허가만 조회 (연단위 갱신 정책, permit_year NULL은 legacy 허용)
|
||
cur.execute(
|
||
f"""SELECT id, company_id, permit_no, name_cn, name_en, tonnage,
|
||
gear_code, fleet_role, pair_vessel_id, mmsi, fishery_code
|
||
FROM {FLEET_VESSELS}
|
||
WHERE permit_year = EXTRACT(YEAR FROM now())::int
|
||
OR permit_year IS NULL"""
|
||
)
|
||
self._vessels = {}
|
||
self._name_cn_map = {}
|
||
self._name_en_map = {}
|
||
self._name_fuzzy_map = {}
|
||
self._mmsi_to_vid = {}
|
||
|
||
for r in cur.fetchall():
|
||
vid = r[0]
|
||
v: dict = {
|
||
'id': vid,
|
||
'company_id': r[1],
|
||
'permit_no': r[2],
|
||
'name_cn': r[3],
|
||
'name_en': r[4],
|
||
'tonnage': r[5],
|
||
'gear_code': r[6],
|
||
'fleet_role': r[7],
|
||
'pair_vessel_id': r[8],
|
||
'mmsi': r[9],
|
||
'fishery_code': r[10],
|
||
}
|
||
self._vessels[vid] = v
|
||
if r[3]:
|
||
self._name_cn_map[r[3]] = vid
|
||
if r[4]:
|
||
self._name_en_map[r[4].lower().strip()] = vid
|
||
# FUZZY 매칭은 name_en 만 대상 (AIS 가 보고하는 이름은 영문이 주류)
|
||
key_en = _normalize_vessel_name(r[4])
|
||
if key_en:
|
||
self._name_fuzzy_map.setdefault(key_en, []).append(vid)
|
||
if r[9]:
|
||
self._mmsi_to_vid[r[9]] = vid
|
||
|
||
cur.close()
|
||
self._last_registry_load = time.time()
|
||
logger.info(
|
||
'fleet registry loaded: %d companies, %d vessels',
|
||
len(self._companies),
|
||
len(self._vessels),
|
||
)
|
||
|
||
def get_pair_mmsi(self, mmsi: str) -> Optional[str]:
|
||
"""PT 쌍끌이 쌍대 선박의 MMSI를 반환. 없으면 None."""
|
||
vid = self._mmsi_to_vid.get(mmsi)
|
||
if vid is None:
|
||
return None
|
||
pair_vid = self._vessels.get(vid, {}).get('pair_vessel_id')
|
||
if pair_vid is None:
|
||
return None
|
||
pair_vessel = self._vessels.get(pair_vid)
|
||
if pair_vessel is None:
|
||
return None
|
||
return pair_vessel.get('mmsi')
|
||
|
||
def get_vessel_gear_code(self, mmsi: str) -> Optional[str]:
|
||
"""등록 선박의 어구 코드(C21=PT, C22=OT 등)를 반환."""
|
||
vid = self._mmsi_to_vid.get(mmsi)
|
||
if vid is None:
|
||
return None
|
||
return self._vessels.get(vid, {}).get('gear_code')
|
||
|
||
def get_pt_registered_mmsis(self) -> set[str]:
|
||
"""쌍끌이(PT / PT-S) 로 등록된 선박의 MMSI 집합.
|
||
|
||
fishery_code 미도입 환경에서도 legacy gear_code='C21' 로 동작.
|
||
"""
|
||
result: set[str] = set()
|
||
for v in self._vessels.values():
|
||
gc = v.get('gear_code') or ''
|
||
mmsi = v.get('mmsi')
|
||
if mmsi and gc in ('C21',):
|
||
result.add(mmsi)
|
||
return result
|
||
|
||
def get_gear_episodes(self, mmsi: str, conn, hours: int = 24) -> list[dict]:
|
||
"""gear_identity_log 에서 최근 N시간 신호 에피소드 목록을 반환.
|
||
|
||
G-04 MMSI 조작 탐지용. first_seen_at / last_seen_at 포함.
|
||
"""
|
||
try:
|
||
cur = conn.cursor()
|
||
cur.execute(
|
||
f"""SELECT first_seen_at, last_seen_at
|
||
FROM {GEAR_IDENTITY_LOG}
|
||
WHERE mmsi = %s
|
||
AND first_seen_at > NOW() - (%s || ' hours')::interval
|
||
ORDER BY first_seen_at""",
|
||
(mmsi, str(hours)),
|
||
)
|
||
rows = cur.fetchall()
|
||
cur.close()
|
||
return [{'first_seen_at': r[0], 'last_seen_at': r[1]} for r in rows]
|
||
except Exception as exc:
|
||
logger.warning('get_gear_episodes 실패 [mmsi=%s]: %s', mmsi, exc)
|
||
return []
|
||
|
||
def get_registered_fishery_code(self, mmsi: str) -> Optional[str]:
|
||
"""fleet_vessels 에 등록된 선박의 fishery_code (PT/PT-S/GN/PS/OT/FC).
|
||
|
||
V029 이후 fishery_code 컬럼을 우선 참조. legacy gear_code(C21 등) 는 별도 경로.
|
||
"""
|
||
vid = self._mmsi_to_vid.get(mmsi)
|
||
if vid is None:
|
||
return None
|
||
v = self._vessels.get(vid, {})
|
||
return v.get('fishery_code') or None
|
||
|
||
def get_permit_periods(
|
||
self, mmsi: str, conn, year: Optional[int] = None,
|
||
) -> list[tuple[datetime, datetime]]:
|
||
"""선박의 허가 조업 기간을 파싱하여 [(start, end), ...] 반환.
|
||
|
||
fishery_permit_cn.fishing_period_1/2 의 'YYYY/MM/DD - YYYY/MM/DD' 포맷을 파싱.
|
||
'-' (미사용) 또는 파싱 실패 시 해당 구간 생략. G-02 금어기 판정에 사용.
|
||
"""
|
||
vid = self._mmsi_to_vid.get(mmsi)
|
||
if vid is None:
|
||
return []
|
||
v = self._vessels.get(vid)
|
||
if not v:
|
||
return []
|
||
permit_no = v.get('permit_no')
|
||
target_year = year or datetime.now().year
|
||
if not permit_no:
|
||
return []
|
||
|
||
try:
|
||
cur = conn.cursor()
|
||
cur.execute(
|
||
"""SELECT fishing_period_1, fishing_period_2
|
||
FROM kcg.fishery_permit_cn
|
||
WHERE permit_year = %s AND permit_no = %s""",
|
||
(target_year, permit_no),
|
||
)
|
||
row = cur.fetchone()
|
||
cur.close()
|
||
except Exception as exc:
|
||
logger.warning('get_permit_periods DB 실패 [mmsi=%s]: %s', mmsi, exc)
|
||
return []
|
||
if not row:
|
||
return []
|
||
|
||
periods: list[tuple[datetime, datetime]] = []
|
||
for raw in row:
|
||
if not raw or raw.strip() in ('-', ''):
|
||
continue
|
||
parsed = _parse_period_range(raw)
|
||
if parsed:
|
||
periods.append(parsed)
|
||
return periods
|
||
|
||
def get_gear_positions(
|
||
self, mmsi: str, df_vessel: Optional[pd.DataFrame] = None,
|
||
) -> list[tuple[float, float]]:
|
||
"""고정어구 위치 목록(lat, lon). G-05 drift 탐지용.
|
||
|
||
현재 단계에서는 선박 DataFrame(track 좌표)을 그대로 전달하는 경로만 지원.
|
||
향후 gear_tracking_snapshot 테이블이 도입되면 DB 경로 추가.
|
||
"""
|
||
if df_vessel is None or len(df_vessel) == 0:
|
||
return []
|
||
try:
|
||
lats = df_vessel['lat'].tolist()
|
||
lons = df_vessel['lon'].tolist()
|
||
return list(zip(lats, lons))
|
||
except Exception as exc:
|
||
logger.warning('get_gear_positions 실패 [mmsi=%s]: %s', mmsi, exc)
|
||
return []
|
||
|
||
def match_ais_to_registry(self, ais_vessels: list[dict], conn) -> None:
|
||
"""AIS 선박을 등록 선단에 매칭. DB 업데이트.
|
||
|
||
ais_vessels: [{mmsi, name, lat, lon, sog, cog}, ...]
|
||
"""
|
||
cur = conn.cursor()
|
||
matched_exact = 0
|
||
matched_fuzzy = 0
|
||
|
||
for v in ais_vessels:
|
||
mmsi = v.get('mmsi', '')
|
||
name = v.get('name', '')
|
||
if not mmsi or not name:
|
||
continue
|
||
|
||
# 이미 매칭됨 → last_seen_at 업데이트
|
||
if mmsi in self._mmsi_to_vid:
|
||
cur.execute(
|
||
f'UPDATE {FLEET_VESSELS} SET last_seen_at = NOW() WHERE id = %s',
|
||
(self._mmsi_to_vid[mmsi],),
|
||
)
|
||
continue
|
||
|
||
# NAME_EXACT 매칭
|
||
vid: Optional[int] = self._name_cn_map.get(name)
|
||
if not vid:
|
||
vid = self._name_en_map.get(name.lower().strip())
|
||
method = 'NAME_EXACT'
|
||
confidence = 0.95
|
||
|
||
# NAME_FUZZY 매칭 (정규화 후 lookup)
|
||
if not vid:
|
||
key = _normalize_vessel_name(name)
|
||
if key:
|
||
candidates = self._name_fuzzy_map.get(key, [])
|
||
# 이미 다른 MMSI에 할당된 vid 제외 → 동명이 중복 매칭 방지
|
||
unassigned = [
|
||
c for c in candidates
|
||
if not self._vessels.get(c, {}).get('mmsi')
|
||
or self._vessels[c].get('mmsi') == mmsi
|
||
]
|
||
if len(unassigned) == 1:
|
||
vid = unassigned[0]
|
||
method = 'NAME_FUZZY'
|
||
confidence = 0.80
|
||
|
||
if vid:
|
||
cur.execute(
|
||
f"""UPDATE {FLEET_VESSELS}
|
||
SET mmsi = %s, match_confidence = %s, match_method = %s,
|
||
last_seen_at = NOW(), updated_at = NOW()
|
||
WHERE id = %s AND (mmsi IS NULL OR mmsi = %s)""",
|
||
(mmsi, confidence, method, vid, mmsi),
|
||
)
|
||
self._mmsi_to_vid[mmsi] = vid
|
||
self._vessels[vid]['mmsi'] = mmsi
|
||
if method == 'NAME_FUZZY':
|
||
matched_fuzzy += 1
|
||
else:
|
||
matched_exact += 1
|
||
|
||
conn.commit()
|
||
cur.close()
|
||
if matched_exact or matched_fuzzy:
|
||
logger.info(
|
||
'AIS→registry matched: exact=%d, fuzzy=%d',
|
||
matched_exact, matched_fuzzy,
|
||
)
|
||
|
||
def track_gear_identity(self, gear_signals: list[dict], conn) -> None:
|
||
"""어구/어망 정체성 추적.
|
||
|
||
gear_signals: [{mmsi, name, lat, lon}, ...] — 이름이 XXX_숫자_숫자 패턴인 AIS 신호
|
||
|
||
동일 이름이 서로 다른 MMSI 로 같은 cycle 에 동시에 수신된 경우는 "공존
|
||
(GEAR_IDENTITY_COLLISION)" 으로 간주해 gear_identity_collisions 에 누적하고,
|
||
gear_identity_log 는 각 MMSI 별로 독립 active 유지한다 (비활성화/점수 이전 X).
|
||
단일 MMSI 이름에 한해 기존 교체(sequential) 로직을 적용한다.
|
||
"""
|
||
cur = conn.cursor()
|
||
now = datetime.now(timezone.utc)
|
||
self._recent_collision_ids: list[int] = []
|
||
|
||
# 1) 공존 감지 + UPSERT (이번 사이클에 동일 이름 다중 MMSI 수신 확인)
|
||
collisions = detect_gear_name_collisions(gear_signals, now)
|
||
colliding_names: set[str] = {c['name'] for c in collisions}
|
||
for c in collisions:
|
||
cid = self._upsert_gear_collision(cur, c, now)
|
||
if cid is not None:
|
||
self._recent_collision_ids.append(cid)
|
||
|
||
# 2) 개별 신호 처리
|
||
for g in gear_signals:
|
||
mmsi = g['mmsi']
|
||
name = g['name']
|
||
lat = g.get('lat', 0)
|
||
lon = g.get('lon', 0)
|
||
|
||
parent_name, idx1, idx2 = self._parse_gear_name(name)
|
||
effective_parent_name = parent_name or name
|
||
if not is_trackable_parent_name(effective_parent_name):
|
||
continue
|
||
|
||
parent_mmsi, parent_vid = self._match_parent_vessel(parent_name)
|
||
match_method: Optional[str] = 'NAME_PARENT' if parent_vid else None
|
||
confidence = 0.9 if parent_vid else 0.0
|
||
is_colliding = name in colliding_names
|
||
|
||
# 기존 활성 행 조회
|
||
cur.execute(
|
||
f"""SELECT id, name FROM {GEAR_IDENTITY_LOG}
|
||
WHERE mmsi = %s AND is_active = TRUE""",
|
||
(mmsi,),
|
||
)
|
||
existing = cur.fetchone()
|
||
|
||
if existing:
|
||
if existing[1] == name:
|
||
# 같은 MMSI + 같은 이름 → 위치/시간 업데이트
|
||
cur.execute(
|
||
f"""UPDATE {GEAR_IDENTITY_LOG}
|
||
SET last_seen_at = %s, lat = %s, lon = %s
|
||
WHERE id = %s""",
|
||
(now, lat, lon, existing[0]),
|
||
)
|
||
else:
|
||
# 같은 MMSI + 다른 이름 → 이전 비활성화 + 새 행
|
||
cur.execute(
|
||
f'UPDATE {GEAR_IDENTITY_LOG} SET is_active = FALSE WHERE id = %s',
|
||
(existing[0],),
|
||
)
|
||
cur.execute(
|
||
f"""INSERT INTO {GEAR_IDENTITY_LOG}
|
||
(mmsi, name, parent_name, parent_mmsi, parent_vessel_id,
|
||
gear_index_1, gear_index_2, lat, lon,
|
||
match_method, match_confidence, first_seen_at, last_seen_at)
|
||
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
|
||
(mmsi, name, parent_name, parent_mmsi, parent_vid,
|
||
idx1, idx2, lat, lon,
|
||
match_method, confidence, now, now),
|
||
)
|
||
else:
|
||
# 새 MMSI — 이름이 공존 케이스면 다른 MMSI 활성행을 건드리지 않고 이번 것만 INSERT
|
||
if not is_colliding:
|
||
# 교체 경로 — 같은 이름이 다른 MMSI 로 active 면 MMSI 교체로 간주
|
||
cur.execute(
|
||
f"""SELECT id, mmsi FROM {GEAR_IDENTITY_LOG}
|
||
WHERE name = %s AND is_active = TRUE AND mmsi != %s""",
|
||
(name, mmsi),
|
||
)
|
||
old_mmsi_row = cur.fetchone()
|
||
if old_mmsi_row:
|
||
cur.execute(
|
||
f'UPDATE {GEAR_IDENTITY_LOG} SET is_active = FALSE WHERE id = %s',
|
||
(old_mmsi_row[0],),
|
||
)
|
||
logger.info(
|
||
'gear MMSI change: %s → %s (name=%s)',
|
||
old_mmsi_row[1], mmsi, name,
|
||
)
|
||
# 어피니티 점수 이전 (savepoint 로 격리 — PK 충돌 시 트랜잭션 유지)
|
||
self._transfer_affinity_scores(cur, old_mmsi_row[1], mmsi)
|
||
|
||
cur.execute(
|
||
f"""INSERT INTO {GEAR_IDENTITY_LOG}
|
||
(mmsi, name, parent_name, parent_mmsi, parent_vessel_id,
|
||
gear_index_1, gear_index_2, lat, lon,
|
||
match_method, match_confidence, first_seen_at, last_seen_at)
|
||
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
|
||
(mmsi, name, parent_name, parent_mmsi, parent_vid,
|
||
idx1, idx2, lat, lon,
|
||
match_method, confidence, now, now),
|
||
)
|
||
|
||
if collisions:
|
||
logger.info(
|
||
'gear identity collisions: %d pairs touched (%d distinct names)',
|
||
len(collisions), len(colliding_names),
|
||
)
|
||
conn.commit()
|
||
cur.close()
|
||
|
||
def _parse_gear_name(self, name: str) -> tuple[Optional[str], Optional[int], Optional[int]]:
|
||
"""어구 이름에서 parent_name, idx1, idx2 추출."""
|
||
parent_name: Optional[str] = None
|
||
idx1: Optional[int] = None
|
||
idx2: Optional[int] = None
|
||
m = GEAR_PATTERN.match(name)
|
||
if m:
|
||
if m.group(1):
|
||
parent_name = m.group(1).strip()
|
||
suffix = name[m.end(1):].strip(' _')
|
||
digits = re.findall(r'\d+', suffix)
|
||
idx1 = int(digits[0]) if len(digits) >= 1 else None
|
||
idx2 = int(digits[1]) if len(digits) >= 2 else None
|
||
else:
|
||
idx1 = int(m.group(2))
|
||
else:
|
||
m2 = GEAR_PATTERN_PCT.match(name)
|
||
if m2:
|
||
parent_name = m2.group(1).strip()
|
||
return parent_name, idx1, idx2
|
||
|
||
def _match_parent_vessel(self, parent_name: Optional[str]) -> tuple[Optional[str], Optional[int]]:
|
||
"""parent_name → (parent_mmsi, parent_vessel_id) 매칭. EXACT → FUZZY 순."""
|
||
if not parent_name:
|
||
return None, None
|
||
vid = self._name_cn_map.get(parent_name)
|
||
if not vid:
|
||
vid = self._name_en_map.get(parent_name.lower())
|
||
if not vid:
|
||
key = _normalize_vessel_name(parent_name)
|
||
if key:
|
||
candidates = self._name_fuzzy_map.get(key, [])
|
||
if len(candidates) == 1:
|
||
vid = candidates[0]
|
||
if vid:
|
||
return self._vessels[vid].get('mmsi'), vid
|
||
return None, None
|
||
|
||
def _transfer_affinity_scores(self, cur, old_mmsi: str, new_mmsi: str) -> None:
|
||
"""gear_correlation_scores 의 target_mmsi 를 새 MMSI 로 이전.
|
||
|
||
PK = (model_id, group_key, sub_cluster_id, target_mmsi) 충돌 가능성이 있어
|
||
SAVEPOINT 로 격리. 충돌 시에는 이전을 포기하고 경고만 남긴다 — 상위 트랜잭션은
|
||
유지되어 뒤이은 INSERT 가 정상 동작한다.
|
||
"""
|
||
cur.execute('SAVEPOINT sp_score_xfer')
|
||
try:
|
||
cur.execute(
|
||
f"UPDATE {GEAR_CORRELATION_SCORES} "
|
||
"SET target_mmsi = %s, updated_at = NOW() "
|
||
"WHERE target_mmsi = %s",
|
||
(new_mmsi, old_mmsi),
|
||
)
|
||
if cur.rowcount > 0:
|
||
logger.info(
|
||
'transferred %d affinity scores: %s → %s',
|
||
cur.rowcount, old_mmsi, new_mmsi,
|
||
)
|
||
cur.execute('RELEASE SAVEPOINT sp_score_xfer')
|
||
except psycopg2.errors.UniqueViolation as e:
|
||
cur.execute('ROLLBACK TO SAVEPOINT sp_score_xfer')
|
||
cur.execute('RELEASE SAVEPOINT sp_score_xfer')
|
||
logger.warning(
|
||
'affinity score transfer skipped (pk conflict): %s → %s | %s',
|
||
old_mmsi, new_mmsi, e,
|
||
)
|
||
|
||
def _upsert_gear_collision(
|
||
self,
|
||
cur,
|
||
collision: dict,
|
||
now: datetime,
|
||
) -> Optional[int]:
|
||
"""단일 공존 쌍 UPSERT → row id 반환.
|
||
|
||
- 기존 행: coexistence_count += 1, last_seen_at/좌표 갱신, max_distance_km =
|
||
GREATEST(기존, 신규). status 가 OPEN/REVIEWED 인 경우에 한해 severity 재계산.
|
||
- 신규 행: first_seen_at = last_seen_at = now, severity 계산 후 INSERT.
|
||
"""
|
||
name = collision['name']
|
||
mmsi_lo = collision['mmsi_lo']
|
||
mmsi_hi = collision['mmsi_hi']
|
||
distance_km = collision.get('distance_km') or 0.0
|
||
|
||
# 모선 힌트 — fleet_tracker 매칭 결과를 우선, 없으면 알고리즘이 추출한 값 사용
|
||
hint_parent = collision.get('parent_name')
|
||
parent_mmsi, parent_vid = self._match_parent_vessel(hint_parent)
|
||
|
||
cur.execute(
|
||
f"""SELECT id, coexistence_count, max_distance_km, swap_count,
|
||
status, parent_name, parent_vessel_id
|
||
FROM {GEAR_IDENTITY_COLLISIONS}
|
||
WHERE name = %s AND mmsi_lo = %s AND mmsi_hi = %s""",
|
||
(name, mmsi_lo, mmsi_hi),
|
||
)
|
||
row = cur.fetchone()
|
||
evidence_fragment = {
|
||
'observed_at': now.isoformat(),
|
||
'distance_km': distance_km,
|
||
'positions': {
|
||
mmsi_lo: {
|
||
'lat': collision.get('lat_lo'),
|
||
'lon': collision.get('lon_lo'),
|
||
},
|
||
mmsi_hi: {
|
||
'lat': collision.get('lat_hi'),
|
||
'lon': collision.get('lon_hi'),
|
||
},
|
||
},
|
||
}
|
||
|
||
if row:
|
||
(row_id, cur_count, cur_max_dist, swap_cnt,
|
||
cur_status, cur_parent_name, cur_parent_vid) = row
|
||
new_count = (cur_count or 0) + 1
|
||
merged_max = max(
|
||
float(cur_max_dist or 0),
|
||
float(distance_km or 0),
|
||
)
|
||
if cur_status in ('CONFIRMED_ILLEGAL', 'FALSE_POSITIVE'):
|
||
new_severity = None # 확정 상태는 유지
|
||
else:
|
||
new_severity = classify_severity(new_count, merged_max, swap_cnt or 0)
|
||
cur.execute(
|
||
f"""UPDATE {GEAR_IDENTITY_COLLISIONS}
|
||
SET last_seen_at = %s,
|
||
coexistence_count = %s,
|
||
max_distance_km = %s,
|
||
last_lat_lo = %s,
|
||
last_lon_lo = %s,
|
||
last_lat_hi = %s,
|
||
last_lon_hi = %s,
|
||
parent_name = COALESCE(%s, parent_name),
|
||
parent_vessel_id = COALESCE(%s, parent_vessel_id),
|
||
severity = COALESCE(%s, severity),
|
||
evidence = COALESCE(evidence, '[]'::jsonb)
|
||
|| to_jsonb(%s::jsonb),
|
||
updated_at = NOW()
|
||
WHERE id = %s""",
|
||
(
|
||
now, new_count, merged_max,
|
||
collision.get('lat_lo'), collision.get('lon_lo'),
|
||
collision.get('lat_hi'), collision.get('lon_hi'),
|
||
hint_parent, parent_vid,
|
||
new_severity,
|
||
json.dumps([evidence_fragment]),
|
||
row_id,
|
||
),
|
||
)
|
||
return row_id
|
||
|
||
severity = classify_severity(1, distance_km, 0)
|
||
cur.execute(
|
||
f"""INSERT INTO {GEAR_IDENTITY_COLLISIONS}
|
||
(name, mmsi_lo, mmsi_hi, parent_name, parent_vessel_id,
|
||
first_seen_at, last_seen_at,
|
||
coexistence_count, max_distance_km,
|
||
last_lat_lo, last_lon_lo, last_lat_hi, last_lon_hi,
|
||
severity, status, evidence)
|
||
VALUES (%s,%s,%s,%s,%s,%s,%s,1,%s,%s,%s,%s,%s,%s,'OPEN',%s)
|
||
RETURNING id""",
|
||
(
|
||
name, mmsi_lo, mmsi_hi, hint_parent, parent_vid,
|
||
now, now, distance_km,
|
||
collision.get('lat_lo'), collision.get('lon_lo'),
|
||
collision.get('lat_hi'), collision.get('lon_hi'),
|
||
severity,
|
||
json.dumps([evidence_fragment]),
|
||
),
|
||
)
|
||
return cur.fetchone()[0]
|
||
|
||
def get_recent_collision_ids(self) -> list[int]:
|
||
"""이번 사이클에 갱신·추가된 gear_identity_collisions row id 목록."""
|
||
return list(getattr(self, '_recent_collision_ids', []))
|
||
|
||
def build_fleet_clusters(self, vessel_dfs: dict[str, pd.DataFrame]) -> dict[str, dict]:
|
||
"""등록 선단 기준으로 cluster 정보 구성.
|
||
|
||
Returns: {mmsi → {cluster_id, cluster_size, is_leader, fleet_role}}
|
||
cluster_id = company_id (등록 선단 기준)
|
||
"""
|
||
results: dict[str, dict] = {}
|
||
|
||
# 회사별로 현재 AIS 수신 중인 선박 그룹핑
|
||
company_vessels: dict[int, list[str]] = {}
|
||
for mmsi, vid in self._mmsi_to_vid.items():
|
||
v = self._vessels.get(vid)
|
||
if not v or mmsi not in vessel_dfs:
|
||
continue
|
||
cid = v['company_id']
|
||
company_vessels.setdefault(cid, []).append(mmsi)
|
||
|
||
for cid, mmsis in company_vessels.items():
|
||
if len(mmsis) < 2:
|
||
# 단독 선박 → NOISE
|
||
for mmsi in mmsis:
|
||
v = self._vessels.get(self._mmsi_to_vid.get(mmsi, -1), {})
|
||
results[mmsi] = {
|
||
'cluster_id': -1,
|
||
'cluster_size': 1,
|
||
'is_leader': False,
|
||
'fleet_role': v.get('fleet_role', 'NOISE'),
|
||
}
|
||
continue
|
||
|
||
# 2척 이상 → 등록 선단 클러스터
|
||
for mmsi in mmsis:
|
||
vid = self._mmsi_to_vid[mmsi]
|
||
v = self._vessels[vid]
|
||
results[mmsi] = {
|
||
'cluster_id': cid,
|
||
'cluster_size': len(mmsis),
|
||
'is_leader': v['fleet_role'] == 'MAIN',
|
||
'fleet_role': v['fleet_role'],
|
||
}
|
||
|
||
# 매칭 안 된 선박 → NOISE
|
||
for mmsi in vessel_dfs:
|
||
if mmsi not in results:
|
||
results[mmsi] = {
|
||
'cluster_id': -1,
|
||
'cluster_size': 0,
|
||
'is_leader': False,
|
||
'fleet_role': 'NOISE',
|
||
}
|
||
|
||
return results
|
||
|
||
def save_snapshot(self, vessel_dfs: dict[str, pd.DataFrame], conn) -> None:
|
||
"""fleet_tracking_snapshot 저장."""
|
||
now = datetime.now(timezone.utc)
|
||
cur = conn.cursor()
|
||
|
||
company_vessels: dict[int, list[str]] = {}
|
||
for mmsi, vid in self._mmsi_to_vid.items():
|
||
v = self._vessels.get(vid)
|
||
if not v or mmsi not in vessel_dfs:
|
||
continue
|
||
company_vessels.setdefault(v['company_id'], []).append(mmsi)
|
||
|
||
for cid, mmsis in company_vessels.items():
|
||
active = len(mmsis)
|
||
total = sum(1 for v in self._vessels.values() if v['company_id'] == cid)
|
||
|
||
lats: list[float] = []
|
||
lons: list[float] = []
|
||
for mmsi in mmsis:
|
||
df = vessel_dfs.get(mmsi)
|
||
if df is not None and len(df) > 0:
|
||
last = df.iloc[-1]
|
||
lats.append(float(last['lat']))
|
||
lons.append(float(last['lon']))
|
||
|
||
center_lat = sum(lats) / len(lats) if lats else None
|
||
center_lon = sum(lons) / len(lons) if lons else None
|
||
|
||
cur.execute(
|
||
f"""INSERT INTO {FLEET_TRACKING_SNAPSHOT}
|
||
(company_id, snapshot_time, total_vessels, active_vessels,
|
||
center_lat, center_lon)
|
||
VALUES (%s, %s, %s, %s, %s, %s)""",
|
||
(cid, now, total, active, center_lat, center_lon),
|
||
)
|
||
|
||
conn.commit()
|
||
cur.close()
|
||
logger.info('fleet snapshot saved: %d companies', len(company_vessels))
|
||
|
||
def get_company_vessels(self, vessel_dfs: dict[str, 'pd.DataFrame']) -> dict[int, list[str]]:
|
||
"""현재 AIS 수신 중인 등록 선단의 회사별 MMSI 목록 반환.
|
||
|
||
Returns: {company_id: [mmsi, ...]}
|
||
"""
|
||
result: dict[int, list[str]] = {}
|
||
for mmsi, vid in self._mmsi_to_vid.items():
|
||
v = self._vessels.get(vid)
|
||
if not v or mmsi not in vessel_dfs:
|
||
continue
|
||
result.setdefault(v['company_id'], []).append(mmsi)
|
||
return result
|
||
|
||
|
||
# 싱글턴
|
||
fleet_tracker = FleetTracker()
|