kcg-ai-monitoring/prediction/fleet_tracker.py
htlee a4e29629fc feat(detection): GEAR_IDENTITY_COLLISION 탐지 패턴 추가
동일 어구 이름이 서로 다른 MMSI 로 같은 5분 사이클에 동시 AIS 송출되는
공존 케이스를 신규 탐지 패턴으로 분리해 기록·분류한다. 부수 효과로
fleet_tracker.track_gear_identity 의 PK 충돌로 인한 사이클 실패도 해소.

Prediction
- algorithms/gear_identity.py: detect_gear_name_collisions + classify_severity
- fleet_tracker.py: 공존/교체 분기 분리, UPSERT helper, savepoint 점수 이전
- output/event_generator.py: run_gear_identity_collision_events 추가
- scheduler.py: track_gear_identity 직후 이벤트 승격 호출

Backend (domain/analysis)
- GearIdentityCollision 엔티티 + Repository(Specification+stats)
- GearIdentityCollisionService (@Transactional readOnly / @Auditable resolve)
- GearCollisionController /api/analysis/gear-collisions (list/stats/detail/resolve)
- GearCollisionResponse / StatsResponse / ResolveRequest (record)

DB
- V030__gear_identity_collision.sql: gear_identity_collisions 테이블
  + auth_perm_tree 엔트리(detection:gear-collision nav_sort=950) + 역할별 권한

Frontend
- shared/constants/gearCollisionStatuses.ts + catalogRegistry 등록
- services/gearCollisionApi.ts (list/stats/get/resolve)
- features/detection/GearCollisionDetection.tsx (PageContainer+Section+DataTable
  + 분류 액션 폼, design system SSOT 준수)
- componentRegistry + feature index + i18n detection.json / common.json(ko/en)
2026-04-17 06:53:12 +09:00

742 lines
30 KiB
Python

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""등록 선단 기반 추적기."""
import json
import logging
import re
import time
from datetime import datetime, timezone
from typing import Optional
import pandas as pd
import psycopg2
from algorithms.gear_identity import classify_severity, detect_gear_name_collisions
from algorithms.gear_name_rules import is_trackable_parent_name
from config import qualified_table
logger = logging.getLogger(__name__)

# Gear (net/buoy) AIS name patterns, used by FleetTracker._parse_gear_name.
# GEAR_PATTERN alternative 1: "<parent>_<token containing a digit>[...]" — spaces,
#   alphanumeric indexes and trailing '_' are allowed; group(1) is the parent name.
# GEAR_PATTERN alternative 2: an all-digit name; group(2) is the number.
GEAR_PATTERN = re.compile(r'^(.+?)_(?=\S*\d)\S+(?:[_ ]\S*)*[_ ]*$|^(\d+)$')
# "<parent>%" form; group(1) is the parent name.
GEAR_PATTERN_PCT = re.compile(r'^(.+?)%$')
# Minimum number of seconds between registry reloads (FleetTracker.load_registry).
_REGISTRY_CACHE_SEC = 3600
FLEET_COMPANIES = qualified_table('fleet_companies')
FLEET_VESSELS = qualified_table('fleet_vessels')
GEAR_IDENTITY_LOG = qualified_table('gear_identity_log')
GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores')
GEAR_IDENTITY_COLLISIONS = qualified_table('gear_identity_collisions')
FLEET_TRACKING_SNAPSHOT = qualified_table('fleet_tracking_snapshot')
# Vessel-name normalization (preserves Chinese/Korean fishing-vessel identifiers):
# - A Chinese vessel name = company name (浙岭渔) + hull number (20865). Dropping the
#   number would create dozens of homonyms, so digits are always kept.
# - Unified: whitespace/punctuation, letter case, and the "NO." / "No." number marker.
_NAME_STRIP_CHARS = re.compile(r'[\s\-_./,()\[\]·•\u3000#]+')
# Strip "NO" only when it is a number marker: an optional '.', optional whitespace,
# then a digit. Without the lookahead, a bare word-initial "NO" would also eat the
# start of ordinary words such as "NORTH" or "NOVA".
_NAME_STRIP_PREFIX_NUMBER_MARKER = re.compile(r'\bNO\.?\s*(?=\d)', re.IGNORECASE)
# fishing_period range "YYYY/MM/DD - YYYY/MM/DD". Date separators '/', '-', '.' and
# range connectors '-', '~' are accepted.
_PERIOD_RANGE_PATTERN = re.compile(
    r'(\d{4})[/\-.](\d{1,2})[/\-.](\d{1,2})\s*[-~]\s*(\d{4})[/\-.](\d{1,2})[/\-.](\d{1,2})'
)


def _parse_period_range(raw: str) -> Optional[tuple[datetime, datetime]]:
    """Parse a fishing_period string of the form 'YYYY/MM/DD - YYYY/MM/DD'.

    Date separators may be '/', '-' or '.'; the range connector may be '-' or '~'.
    The start is pinned to 00:00:00 of the first date and the end to 23:59:59 of
    the second date (naive datetimes).

    Returns None for empty input, text without a recognizable range, impossible
    calendar dates, or a range whose end precedes its start.
    """
    match = _PERIOD_RANGE_PATTERN.search(raw) if raw else None
    if match is None:
        return None
    try:
        fields = [int(part) for part in match.groups()]
        begin = datetime(*fields[:3])
        finish = datetime(*fields[3:], 23, 59, 59)
    except (ValueError, TypeError):
        return None
    return None if finish < begin else (begin, finish)


def _normalize_vessel_name(name: Optional[str]) -> str:
    """Return a matching key for a vessel name ('' for None/empty input).

    Steps, in order:
      1. strip surrounding whitespace and upper-case;
      2. remove matches of _NAME_STRIP_PREFIX_NUMBER_MARKER (the 'No.'-style number
         marker; the digits themselves stay, since they identify the vessel);
      3. remove runs of whitespace, punctuation, middle dots, full-width spaces and '#'
         (_NAME_STRIP_CHARS).
    """
    if not name:
        return ''
    cleaned = name.strip().upper()
    for pattern in (_NAME_STRIP_PREFIX_NUMBER_MARKER, _NAME_STRIP_CHARS):
        cleaned = pattern.sub('', cleaned)
    return cleaned


class FleetTracker:
    """Registry-based tracker for registered fleets, AIS vessels and gear signals.

    Keeps an in-memory copy of fleet_companies / fleet_vessels (reloaded at most
    every _REGISTRY_CACHE_SEC seconds), binds AIS MMSIs to registered vessels,
    maintains gear_identity_log / gear_identity_collisions and writes
    fleet_tracking_snapshot rows. DB access goes through the connection passed
    into each method.
    """

    def __init__(self) -> None:
        self._companies: dict[int, dict] = {}  # id → {name_cn, name_en}
        self._vessels: dict[int, dict] = {}  # id → {permit_no, name_cn, ...}
        self._name_cn_map: dict[str, int] = {}  # name_cn → vessel_id (exact match)
        self._name_en_map: dict[str, int] = {}  # name_en (lower + strip) → vessel_id (exact match)
        self._name_fuzzy_map: dict[str, list[int]] = {}  # normalized name_en → [vessel_id, ...]
        self._mmsi_to_vid: dict[str, int] = {}  # mmsi → vessel_id (matched vessels only)
        self._gear_active: dict[str, dict] = {}  # mmsi → {name, parent_mmsi, ...}
        # Collision row ids touched by the most recent track_gear_identity call.
        self._recent_collision_ids: list[int] = []
        self._last_registry_load: float = 0.0

    def load_registry(self, conn) -> None:
        """Load fleet_companies + fleet_vessels from the DB, at most once per cache window.

        Only current-year permits are loaded (permits are renewed yearly). Rows with a
        NULL permit_year are kept as legacy entries. All lookup maps are built into
        locals and swapped in only after both queries succeed, so a failed reload
        leaves the previously loaded registry (and the cache timestamp) untouched.
        """
        if time.time() - self._last_registry_load < _REGISTRY_CACHE_SEC:
            return
        with conn.cursor() as cur:
            cur.execute(f'SELECT id, name_cn, name_en FROM {FLEET_COMPANIES}')
            companies = {r[0]: {'name_cn': r[1], 'name_en': r[2]} for r in cur.fetchall()}
            cur.execute(
                f"""SELECT id, company_id, permit_no, name_cn, name_en, tonnage,
                           gear_code, fleet_role, pair_vessel_id, mmsi, fishery_code
                    FROM {FLEET_VESSELS}
                    WHERE permit_year = EXTRACT(YEAR FROM now())::int
                       OR permit_year IS NULL"""
            )
            vessel_rows = cur.fetchall()

        vessels: dict[int, dict] = {}
        name_cn_map: dict[str, int] = {}
        name_en_map: dict[str, int] = {}
        name_fuzzy_map: dict[str, list[int]] = {}
        mmsi_to_vid: dict[str, int] = {}
        for (vid, company_id, permit_no, name_cn, name_en, tonnage,
             gear_code, fleet_role, pair_vessel_id, mmsi, fishery_code) in vessel_rows:
            vessels[vid] = {
                'id': vid,
                'company_id': company_id,
                'permit_no': permit_no,
                'name_cn': name_cn,
                'name_en': name_en,
                'tonnage': tonnage,
                'gear_code': gear_code,
                'fleet_role': fleet_role,
                'pair_vessel_id': pair_vessel_id,
                'mmsi': mmsi,
                'fishery_code': fishery_code,
            }
            if name_cn:
                name_cn_map[name_cn] = vid
            if name_en:
                name_en_map[name_en.lower().strip()] = vid
            # FUZZY matching indexes name_en only (AIS-reported names are mostly Latin script).
            key_en = _normalize_vessel_name(name_en)
            if key_en:
                name_fuzzy_map.setdefault(key_en, []).append(vid)
            if mmsi:
                mmsi_to_vid[mmsi] = vid

        self._companies = companies
        self._vessels = vessels
        self._name_cn_map = name_cn_map
        self._name_en_map = name_en_map
        self._name_fuzzy_map = name_fuzzy_map
        self._mmsi_to_vid = mmsi_to_vid
        self._last_registry_load = time.time()
        logger.info(
            'fleet registry loaded: %d companies, %d vessels',
            len(self._companies),
            len(self._vessels),
        )

    def get_pair_mmsi(self, mmsi: str) -> Optional[str]:
        """Return the MMSI of the registered pair-trawl (PT) partner vessel, or None."""
        vid = self._mmsi_to_vid.get(mmsi)
        if vid is None:
            return None
        pair_vid = self._vessels.get(vid, {}).get('pair_vessel_id')
        if pair_vid is None:
            return None
        pair_vessel = self._vessels.get(pair_vid)
        return pair_vessel.get('mmsi') if pair_vessel else None

    def get_vessel_gear_code(self, mmsi: str) -> Optional[str]:
        """Return the registered vessel's gear code (C21=PT, C22=OT, ...), or None."""
        vid = self._mmsi_to_vid.get(mmsi)
        if vid is None:
            return None
        return self._vessels.get(vid, {}).get('gear_code')

    def get_pt_registered_mmsis(self) -> set[str]:
        """Return MMSIs of vessels registered as pair trawlers (PT / PT-S).

        A vessel qualifies when its fishery_code is 'PT' or 'PT-S', or when its
        legacy gear_code is 'C21'. The gear_code check keeps environments that do
        not populate fishery_code working. Vessels without an MMSI are skipped.
        """
        result: set[str] = set()
        for v in self._vessels.values():
            mmsi = v.get('mmsi')
            if not mmsi:
                continue
            if v.get('fishery_code') in ('PT', 'PT-S') or v.get('gear_code') == 'C21':
                result.add(mmsi)
        return result

    def get_gear_episodes(self, mmsi: str, conn, hours: int = 24) -> list[dict]:
        """Return gear_identity_log signal episodes first seen within the last `hours` hours.

        Used for G-04 MMSI-manipulation detection. Each item is
        {'first_seen_at', 'last_seen_at'}, ordered by first_seen_at. On DB error
        a warning is logged and [] is returned.
        """
        try:
            with conn.cursor() as cur:
                cur.execute(
                    f"""SELECT first_seen_at, last_seen_at
                        FROM {GEAR_IDENTITY_LOG}
                        WHERE mmsi = %s
                          AND first_seen_at > NOW() - (%s || ' hours')::interval
                        ORDER BY first_seen_at""",
                    (mmsi, str(hours)),
                )
                rows = cur.fetchall()
        except Exception as exc:
            # NOTE(review): a failed query leaves a non-autocommit transaction aborted;
            # callers sharing `conn` may need to roll back — confirm.
            logger.warning('get_gear_episodes 실패 [mmsi=%s]: %s', mmsi, exc)
            return []
        return [{'first_seen_at': first, 'last_seen_at': last} for first, last in rows]

    def get_registered_fishery_code(self, mmsi: str) -> Optional[str]:
        """Return the registered fishery_code (PT/PT-S/GN/PS/OT/FC) of a vessel, or None.

        Reads the fishery_code column (V029+). Legacy gear_code values (C21, ...) are
        exposed separately through get_vessel_gear_code.
        """
        vid = self._mmsi_to_vid.get(mmsi)
        if vid is None:
            return None
        return self._vessels.get(vid, {}).get('fishery_code') or None

    def get_permit_periods(
        self, mmsi: str, conn, year: Optional[int] = None,
    ) -> list[tuple[datetime, datetime]]:
        """Return the vessel's permitted fishing periods as [(start, end), ...].

        Parses fishery_permit_cn.fishing_period_1/2 ('YYYY/MM/DD - YYYY/MM/DD') for
        the given permit year (default: the current year). A '-' entry means "unused";
        unused or unparsable entries are skipped. Used for G-02 closed-season checks.
        Returns [] when the vessel or its permit is unknown, or on DB error.
        """
        vid = self._mmsi_to_vid.get(mmsi)
        if vid is None:
            return []
        v = self._vessels.get(vid)
        if not v:
            return []
        permit_no = v.get('permit_no')
        if not permit_no:
            return []
        target_year = year or datetime.now().year
        try:
            with conn.cursor() as cur:
                # NOTE(review): schema is hard-coded here while the other tables use
                # qualified_table(); confirm fishery_permit_cn always lives in kcg.
                cur.execute(
                    """SELECT fishing_period_1, fishing_period_2
                       FROM kcg.fishery_permit_cn
                       WHERE permit_year = %s AND permit_no = %s""",
                    (target_year, permit_no),
                )
                row = cur.fetchone()
        except Exception as exc:
            logger.warning('get_permit_periods DB 실패 [mmsi=%s]: %s', mmsi, exc)
            return []
        if not row:
            return []
        periods: list[tuple[datetime, datetime]] = []
        for raw in row:
            if not raw or raw.strip() in ('-', ''):
                continue
            parsed = _parse_period_range(raw)
            if parsed:
                periods.append(parsed)
        return periods

    def get_gear_positions(
        self, mmsi: str, df_vessel: Optional[pd.DataFrame] = None,
    ) -> list[tuple[float, float]]:
        """Return fixed-gear positions as [(lat, lon), ...] for G-05 drift detection.

        Currently only the vessel DataFrame path (its 'lat'/'lon' track columns) is
        supported. A DB path can be added once a gear_tracking_snapshot table exists.
        Returns [] for a missing/empty frame or on error.
        """
        if df_vessel is None or len(df_vessel) == 0:
            return []
        try:
            lats = df_vessel['lat'].tolist()
            lons = df_vessel['lon'].tolist()
            return list(zip(lats, lons))
        except Exception as exc:
            logger.warning('get_gear_positions 실패 [mmsi=%s]: %s', mmsi, exc)
            return []

    def match_ais_to_registry(self, ais_vessels: list[dict], conn) -> None:
        """Bind AIS vessels to registered fleet vessels and persist the match.

        ais_vessels: [{mmsi, name, lat, lon, sog, cog}, ...]

        Already-bound MMSIs only get last_seen_at refreshed. Unbound MMSIs are matched
        by name (NAME_EXACT, then NAME_FUZZY). The in-memory maps are updated only
        when the DB UPDATE actually bound the row, i.e. the vessel had no MMSI yet or
        already had this one, so memory never diverges from the DB. Commits once at
        the end.
        """
        matched_exact = 0
        matched_fuzzy = 0
        with conn.cursor() as cur:
            for v in ais_vessels:
                mmsi = v.get('mmsi', '')
                name = v.get('name', '')
                if not mmsi or not name:
                    continue
                # Already matched → just refresh last_seen_at.
                if mmsi in self._mmsi_to_vid:
                    cur.execute(
                        f'UPDATE {FLEET_VESSELS} SET last_seen_at = NOW() WHERE id = %s',
                        (self._mmsi_to_vid[mmsi],),
                    )
                    continue
                vid, method, confidence = self._find_registry_vessel(mmsi, name)
                if vid is None:
                    continue
                cur.execute(
                    f"""UPDATE {FLEET_VESSELS}
                        SET mmsi = %s, match_confidence = %s, match_method = %s,
                            last_seen_at = NOW(), updated_at = NOW()
                        WHERE id = %s AND (mmsi IS NULL OR mmsi = %s)""",
                    (mmsi, confidence, method, vid, mmsi),
                )
                if cur.rowcount == 0:
                    # The row is bound to a different MMSI (or is gone): do not
                    # mirror a binding the DB rejected.
                    continue
                self._mmsi_to_vid[mmsi] = vid
                self._vessels[vid]['mmsi'] = mmsi
                if method == 'NAME_FUZZY':
                    matched_fuzzy += 1
                else:
                    matched_exact += 1
            conn.commit()
        if matched_exact or matched_fuzzy:
            logger.info(
                'AIS→registry matched: exact=%d, fuzzy=%d',
                matched_exact, matched_fuzzy,
            )

    def _find_registry_vessel(
        self, mmsi: str, name: str,
    ) -> tuple[Optional[int], str, float]:
        """Look up a registered vessel for an AIS name: NAME_EXACT first, then NAME_FUZZY.

        Returns (vessel_id, match_method, match_confidence). vessel_id is None when
        nothing matches. FUZZY only accepts a unique candidate among vessels that are
        not yet bound to a different MMSI.
        """
        vid: Optional[int] = self._name_cn_map.get(name)
        if not vid:
            vid = self._name_en_map.get(name.lower().strip())
        if vid:
            return vid, 'NAME_EXACT', 0.95
        key = _normalize_vessel_name(name)
        if key:
            # Exclude vessels bound to another MMSI so same-named vessels are not double-matched.
            unassigned = [
                c for c in self._name_fuzzy_map.get(key, [])
                if not self._vessels.get(c, {}).get('mmsi')
                or self._vessels[c].get('mmsi') == mmsi
            ]
            if len(unassigned) == 1:
                return unassigned[0], 'NAME_FUZZY', 0.80
        return None, '', 0.0

    def track_gear_identity(self, gear_signals: list[dict], conn) -> None:
        """Track gear/net identities from AIS gear signals.

        gear_signals: [{mmsi, name, lat, lon}, ...] — AIS signals whose name follows a
        gear naming pattern such as XXX_<n>_<n>.

        A name received from different MMSIs in the same cycle is treated as
        coexistence (GEAR_IDENTITY_COLLISION). The pair is accumulated in
        gear_identity_collisions, and each MMSI keeps its own active
        gear_identity_log row (no deactivation, no score transfer). Only names seen
        from a single MMSI go through the sequential-replacement (MMSI change) logic.
        Commits at the end.
        """
        now = datetime.now(timezone.utc)
        self._recent_collision_ids = []
        with conn.cursor() as cur:
            # 1) Coexistence detection + UPSERT (same name from several MMSIs this cycle).
            collisions = detect_gear_name_collisions(gear_signals, now)
            colliding_names: set[str] = {c['name'] for c in collisions}
            for c in collisions:
                cid = self._upsert_gear_collision(cur, c, now)
                if cid is not None:
                    self._recent_collision_ids.append(cid)

            # 2) Per-signal gear_identity_log maintenance.
            for g in gear_signals:
                self._track_gear_signal(cur, g, now, g['name'] in colliding_names)

            if collisions:
                logger.info(
                    'gear identity collisions: %d pairs touched (%d distinct names)',
                    len(collisions), len(colliding_names),
                )
            conn.commit()

    def _track_gear_signal(self, cur, signal: dict, now: datetime, is_colliding: bool) -> None:
        """Update gear_identity_log for one gear signal (see track_gear_identity)."""
        mmsi = signal['mmsi']
        name = signal['name']
        lat = signal.get('lat', 0)
        lon = signal.get('lon', 0)
        parent_name, idx1, idx2 = self._parse_gear_name(name)
        if not is_trackable_parent_name(parent_name or name):
            return
        parent_mmsi, parent_vid = self._match_parent_vessel(parent_name)
        match_method: Optional[str] = 'NAME_PARENT' if parent_vid else None
        confidence = 0.9 if parent_vid else 0.0

        cur.execute(
            f"""SELECT id, name FROM {GEAR_IDENTITY_LOG}
                WHERE mmsi = %s AND is_active = TRUE""",
            (mmsi,),
        )
        existing = cur.fetchone()
        if existing and existing[1] == name:
            # Same MMSI + same name → refresh position and last-seen time only.
            cur.execute(
                f"""UPDATE {GEAR_IDENTITY_LOG}
                    SET last_seen_at = %s, lat = %s, lon = %s
                    WHERE id = %s""",
                (now, lat, lon, existing[0]),
            )
            return
        if existing:
            # Same MMSI + new name → retire the previous identity row.
            cur.execute(
                f'UPDATE {GEAR_IDENTITY_LOG} SET is_active = FALSE WHERE id = %s',
                (existing[0],),
            )
        elif not is_colliding:
            # New MMSI on a non-colliding name → possibly an MMSI replacement.
            self._retire_replaced_gear_mmsi(cur, name, mmsi)
        # For colliding names the other MMSIs' active rows are left untouched.
        cur.execute(
            f"""INSERT INTO {GEAR_IDENTITY_LOG}
                (mmsi, name, parent_name, parent_mmsi, parent_vessel_id,
                 gear_index_1, gear_index_2, lat, lon,
                 match_method, match_confidence, first_seen_at, last_seen_at)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
            (mmsi, name, parent_name, parent_mmsi, parent_vid,
             idx1, idx2, lat, lon,
             match_method, confidence, now, now),
        )

    def _retire_replaced_gear_mmsi(self, cur, name: str, mmsi: str) -> None:
        """Deactivate an active row with `name` under another MMSI and move its scores to `mmsi`."""
        cur.execute(
            f"""SELECT id, mmsi FROM {GEAR_IDENTITY_LOG}
                WHERE name = %s AND is_active = TRUE AND mmsi != %s""",
            (name, mmsi),
        )
        # NOTE(review): only one such row is retired per call (fetchone), even if several are active.
        old_row = cur.fetchone()
        if not old_row:
            return
        old_id, old_mmsi = old_row
        cur.execute(
            f'UPDATE {GEAR_IDENTITY_LOG} SET is_active = FALSE WHERE id = %s',
            (old_id,),
        )
        logger.info('gear MMSI change: %s → %s (name=%s)', old_mmsi, mmsi, name)
        # The score transfer runs inside a savepoint so a PK conflict keeps the transaction usable.
        self._transfer_affinity_scores(cur, old_mmsi, mmsi)

    def _parse_gear_name(self, name: str) -> tuple[Optional[str], Optional[int], Optional[int]]:
        """Split a gear name into (parent_name, idx1, idx2).

        GEAR_PATTERN form 1 ("<parent>_<suffix>"): parent is the stripped text before
        the first '_' that is followed by a digit-bearing token. idx1/idx2 are the
        first two digit runs of the suffix (None when absent).
        GEAR_PATTERN form 2 (all digits): parent is None and idx1 is the number.
        GEAR_PATTERN_PCT ("<parent>%"): parent only. Anything else: (None, None, None).
        """
        parent_name: Optional[str] = None
        idx1: Optional[int] = None
        idx2: Optional[int] = None
        m = GEAR_PATTERN.match(name)
        if m:
            if m.group(1):
                parent_name = m.group(1).strip()
                suffix = name[m.end(1):].strip(' _')
                digits = re.findall(r'\d+', suffix)
                idx1 = int(digits[0]) if len(digits) >= 1 else None
                idx2 = int(digits[1]) if len(digits) >= 2 else None
            else:
                idx1 = int(m.group(2))
        else:
            m2 = GEAR_PATTERN_PCT.match(name)
            if m2:
                parent_name = m2.group(1).strip()
        return parent_name, idx1, idx2

    def _match_parent_vessel(self, parent_name: Optional[str]) -> tuple[Optional[str], Optional[int]]:
        """Resolve parent_name to (parent_mmsi, parent_vessel_id): EXACT (cn, en), then unique FUZZY.

        The English lookup uses lower() + strip(), matching how load_registry keys
        _name_en_map. Returns (None, None) when nothing matches.
        """
        if not parent_name:
            return None, None
        vid = self._name_cn_map.get(parent_name)
        if not vid:
            vid = self._name_en_map.get(parent_name.lower().strip())
        if not vid:
            key = _normalize_vessel_name(parent_name)
            if key:
                candidates = self._name_fuzzy_map.get(key, [])
                if len(candidates) == 1:
                    vid = candidates[0]
        if vid:
            return self._vessels[vid].get('mmsi'), vid
        return None, None

    def _transfer_affinity_scores(self, cur, old_mmsi: str, new_mmsi: str) -> None:
        """Move gear_correlation_scores rows from target_mmsi=old_mmsi to new_mmsi.

        The PK (model_id, group_key, sub_cluster_id, target_mmsi) can collide, so the
        UPDATE runs inside a SAVEPOINT. On a unique violation the transfer is
        abandoned with a warning and the outer transaction stays usable, so the
        following INSERT still works. Other errors propagate.
        """
        cur.execute('SAVEPOINT sp_score_xfer')
        try:
            cur.execute(
                f"UPDATE {GEAR_CORRELATION_SCORES} "
                "SET target_mmsi = %s, updated_at = NOW() "
                "WHERE target_mmsi = %s",
                (new_mmsi, old_mmsi),
            )
            if cur.rowcount > 0:
                logger.info(
                    'transferred %d affinity scores: %s → %s',
                    cur.rowcount, old_mmsi, new_mmsi,
                )
            cur.execute('RELEASE SAVEPOINT sp_score_xfer')
        except psycopg2.errors.UniqueViolation as e:
            cur.execute('ROLLBACK TO SAVEPOINT sp_score_xfer')
            cur.execute('RELEASE SAVEPOINT sp_score_xfer')
            logger.warning(
                'affinity score transfer skipped (pk conflict): %s → %s | %s',
                old_mmsi, new_mmsi, e,
            )

    def _upsert_gear_collision(
        self,
        cur,
        collision: dict,
        now: datetime,
    ) -> Optional[int]:
        """UPSERT one coexistence pair into gear_identity_collisions and return its row id.

        - Existing row: coexistence_count += 1, last_seen_at and last positions
          refreshed, max_distance_km = max(stored, new), and one evidence fragment
          appended. Severity is recomputed unless status is CONFIRMED_ILLEGAL or
          FALSE_POSITIVE.
        - New row: first_seen_at = last_seen_at = now, status 'OPEN', severity computed.

        NOTE(review): this is select-then-insert, so concurrent writers could race on
        (name, mmsi_lo, mmsi_hi). The evidence array also grows by one entry per
        cycle without bound — confirm both are acceptable.
        """
        name = collision['name']
        mmsi_lo = collision['mmsi_lo']
        mmsi_hi = collision['mmsi_hi']
        distance_km = float(collision.get('distance_km') or 0.0)
        # The parent name comes from the collision algorithm; the registry lookup only
        # supplies parent_vessel_id.
        hint_parent = collision.get('parent_name')
        _, parent_vid = self._match_parent_vessel(hint_parent)
        lat_lo, lon_lo = collision.get('lat_lo'), collision.get('lon_lo')
        lat_hi, lon_hi = collision.get('lat_hi'), collision.get('lon_hi')
        evidence_json = json.dumps([{
            'observed_at': now.isoformat(),
            'distance_km': distance_km,
            'positions': {
                mmsi_lo: {'lat': lat_lo, 'lon': lon_lo},
                mmsi_hi: {'lat': lat_hi, 'lon': lon_hi},
            },
        }])

        cur.execute(
            f"""SELECT id, coexistence_count, max_distance_km, swap_count, status
                FROM {GEAR_IDENTITY_COLLISIONS}
                WHERE name = %s AND mmsi_lo = %s AND mmsi_hi = %s""",
            (name, mmsi_lo, mmsi_hi),
        )
        row = cur.fetchone()
        if row:
            row_id, cur_count, cur_max_dist, swap_cnt, cur_status = row
            new_count = (cur_count or 0) + 1
            merged_max = max(float(cur_max_dist or 0), distance_km)
            if cur_status in ('CONFIRMED_ILLEGAL', 'FALSE_POSITIVE'):
                new_severity = None  # keep the severity of a finalized verdict
            else:
                new_severity = classify_severity(new_count, merged_max, swap_cnt or 0)
            cur.execute(
                f"""UPDATE {GEAR_IDENTITY_COLLISIONS}
                    SET last_seen_at = %s,
                        coexistence_count = %s,
                        max_distance_km = %s,
                        last_lat_lo = %s,
                        last_lon_lo = %s,
                        last_lat_hi = %s,
                        last_lon_hi = %s,
                        parent_name = COALESCE(%s, parent_name),
                        parent_vessel_id = COALESCE(%s, parent_vessel_id),
                        severity = COALESCE(%s, severity),
                        evidence = COALESCE(evidence, '[]'::jsonb)
                                   || to_jsonb(%s::jsonb),
                        updated_at = NOW()
                    WHERE id = %s""",
                (
                    now, new_count, merged_max,
                    lat_lo, lon_lo, lat_hi, lon_hi,
                    hint_parent, parent_vid,
                    new_severity,
                    evidence_json,
                    row_id,
                ),
            )
            return row_id

        severity = classify_severity(1, distance_km, 0)
        cur.execute(
            f"""INSERT INTO {GEAR_IDENTITY_COLLISIONS}
                (name, mmsi_lo, mmsi_hi, parent_name, parent_vessel_id,
                 first_seen_at, last_seen_at,
                 coexistence_count, max_distance_km,
                 last_lat_lo, last_lon_lo, last_lat_hi, last_lon_hi,
                 severity, status, evidence)
                VALUES (%s,%s,%s,%s,%s,%s,%s,1,%s,%s,%s,%s,%s,%s,'OPEN',%s)
                RETURNING id""",
            (
                name, mmsi_lo, mmsi_hi, hint_parent, parent_vid,
                now, now, distance_km,
                lat_lo, lon_lo, lat_hi, lon_hi,
                severity,
                evidence_json,
            ),
        )
        return cur.fetchone()[0]

    def get_recent_collision_ids(self) -> list[int]:
        """Return a copy of the gear_identity_collisions row ids touched in the last cycle."""
        return list(getattr(self, '_recent_collision_ids', []))

    def build_fleet_clusters(self, vessel_dfs: dict[str, pd.DataFrame]) -> dict[str, dict]:
        """Build cluster info from the registered fleets.

        Returns {mmsi: {cluster_id, cluster_size, is_leader, fleet_role}}, where
        cluster_id = company_id.
        - A company with 2+ vessels present in vessel_dfs forms a cluster; its 'MAIN'
          vessels are leaders.
        - A company with a single present vessel gives cluster_id -1 and size 1, with
          that vessel's fleet_role ('NOISE' when the role is empty).
        - Unregistered MMSIs in vessel_dfs give cluster_id -1, size 0, 'NOISE'.
        """
        results: dict[str, dict] = {}
        for cid, mmsis in self.get_company_vessels(vessel_dfs).items():
            if len(mmsis) < 2:
                # A lone vessel of its company → NOISE.
                for mmsi in mmsis:
                    v = self._vessels.get(self._mmsi_to_vid.get(mmsi, -1), {})
                    results[mmsi] = {
                        'cluster_id': -1,
                        'cluster_size': 1,
                        'is_leader': False,
                        'fleet_role': v.get('fleet_role') or 'NOISE',
                    }
                continue
            # 2+ vessels → registered-fleet cluster.
            for mmsi in mmsis:
                v = self._vessels[self._mmsi_to_vid[mmsi]]
                results[mmsi] = {
                    'cluster_id': cid,
                    'cluster_size': len(mmsis),
                    'is_leader': v['fleet_role'] == 'MAIN',
                    'fleet_role': v['fleet_role'],
                }
        # Unmatched vessels → NOISE.
        for mmsi in vessel_dfs:
            if mmsi not in results:
                results[mmsi] = {
                    'cluster_id': -1,
                    'cluster_size': 0,
                    'is_leader': False,
                    'fleet_role': 'NOISE',
                }
        return results

    def save_snapshot(self, vessel_dfs: dict[str, pd.DataFrame], conn) -> None:
        """Insert one fleet_tracking_snapshot row per company with vessels in vessel_dfs.

        total_vessels is the number of registered vessels of the company, and
        active_vessels is how many of them appear in vessel_dfs. The center is the
        mean of each active vessel's last (lat, lon), or None when no positions are
        available. Commits at the end.
        """
        now = datetime.now(timezone.utc)
        company_vessels = self.get_company_vessels(vessel_dfs)
        # Registered fleet size per company, counted once rather than once per company.
        fleet_sizes: dict[int, int] = {}
        for v in self._vessels.values():
            fleet_sizes[v['company_id']] = fleet_sizes.get(v['company_id'], 0) + 1
        with conn.cursor() as cur:
            for cid, mmsis in company_vessels.items():
                lats: list[float] = []
                lons: list[float] = []
                for mmsi in mmsis:
                    df = vessel_dfs.get(mmsi)
                    if df is not None and len(df) > 0:
                        last = df.iloc[-1]
                        lats.append(float(last['lat']))
                        lons.append(float(last['lon']))
                center_lat = sum(lats) / len(lats) if lats else None
                center_lon = sum(lons) / len(lons) if lons else None
                cur.execute(
                    f"""INSERT INTO {FLEET_TRACKING_SNAPSHOT}
                        (company_id, snapshot_time, total_vessels, active_vessels,
                         center_lat, center_lon)
                        VALUES (%s, %s, %s, %s, %s, %s)""",
                    (cid, now, fleet_sizes.get(cid, 0), len(mmsis), center_lat, center_lon),
                )
            conn.commit()
        logger.info('fleet snapshot saved: %d companies', len(company_vessels))

    def get_company_vessels(self, vessel_dfs: dict[str, 'pd.DataFrame']) -> dict[int, list[str]]:
        """Return {company_id: [mmsi, ...]} for registered vessels currently present in vessel_dfs."""
        result: dict[int, list[str]] = {}
        for mmsi, vid in self._mmsi_to_vid.items():
            v = self._vessels.get(vid)
            if not v or mmsi not in vessel_dfs:
                continue
            result.setdefault(v['company_id'], []).append(mmsi)
        return result


# Module-level singleton instance of FleetTracker.
fleet_tracker = FleetTracker()