- DB migration 009: group_polygon_snapshots 테이블 (PostGIS geometry) - polygon_builder.py: Shapely 기반 convex hull + buffer 폴리곤 생성 - scheduler.py: 5분 주기 분석 사이클에 폴리곤 생성 Step 4.5 통합 - fleet_tracker.py: get_company_vessels() 메서드 추가 - kcgdb.py: save_group_snapshots(), cleanup_group_snapshots() 추가 - requirements.txt: shapely>=2.0 추가
336 lines
12 KiB
Python
336 lines
12 KiB
Python
"""등록 선단 기반 추적기."""
|
|
import logging
import re
import time
from collections import Counter
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Optional

import pandas as pd
|
|
|
|
logger = logging.getLogger(__name__)

# AIS names broadcast by fishing gear / nets:
#   "<parent>_<n>_<m>"  — parent vessel name plus two indices (the second may be empty)
#   "<parent>%"         — parent vessel name followed by a literal percent sign
GEAR_PATTERN = re.compile(r'^(.+?)_(\d+)_(\d*)$')
GEAR_PATTERN_PCT = re.compile(r'^(.+?)%$')

# How long (seconds) a loaded fleet registry is reused before re-querying: one hour.
_REGISTRY_CACHE_SEC = 60 * 60
|
|
|
|
|
|
class FleetTracker:
    """Registered-fleet tracker.

    Keeps an in-memory copy of ``kcg.fleet_companies`` / ``kcg.fleet_vessels``
    (reloaded at most once every ``_REGISTRY_CACHE_SEC`` seconds), binds live
    AIS MMSIs to registered vessels by exact name, records fishing-gear
    identities in ``kcg.gear_identity_log`` and writes per-company snapshots
    to ``kcg.fleet_tracking_snapshot``.
    """

    # INSERT for a new kcg.gear_identity_log row (13 positional parameters).
    _GEAR_INSERT_SQL = """INSERT INTO kcg.gear_identity_log
        (mmsi, name, parent_name, parent_mmsi, parent_vessel_id,
         gear_index_1, gear_index_2, lat, lon,
         match_method, match_confidence, first_seen_at, last_seen_at)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""

    def __init__(self) -> None:
        self._companies: dict[int, dict] = {}    # id → {name_cn, name_en}
        self._vessels: dict[int, dict] = {}      # id → {permit_no, name_cn, ...}
        self._name_cn_map: dict[str, int] = {}   # name_cn (stripped) → vessel_id
        self._name_en_map: dict[str, int] = {}   # name_en (lowercased, stripped) → vessel_id
        self._mmsi_to_vid: dict[str, int] = {}   # mmsi → vessel_id (matched vessels only)
        self._gear_active: dict[str, dict] = {}  # mmsi → {name, parent_mmsi, ...}; not touched by the methods below
        self._last_registry_load: float = 0.0    # time.time() of the last successful registry load

    # ------------------------------------------------------------------ helpers

    @staticmethod
    @contextmanager
    def _transaction(conn):
        """Yield a cursor on *conn*; commit on success, roll back on error.

        The cursor is always closed. Exceptions are re-raised after the
        rollback so callers still see the original failure.
        """
        cur = conn.cursor()
        try:
            yield cur
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            cur.close()

    def _lookup_vessel_id(self, name: str) -> Optional[int]:
        """Resolve a vessel name to a registered vessel id by exact match.

        Tries the Chinese-name map first, then the case-insensitive English
        map. Surrounding whitespace is ignored. Returns None on no match.
        """
        key = name.strip()
        vid = self._name_cn_map.get(key)
        if vid is None:
            vid = self._name_en_map.get(key.lower())
        return vid

    @staticmethod
    def _parse_gear_name(name: str) -> tuple[Optional[str], Optional[int], Optional[int]]:
        """Split a gear AIS name into ``(parent_name, index_1, index_2)``.

        ``PARENT_<n>_<m>`` yields both indices (``index_2`` is None when the
        trailing index is empty); ``PARENT%`` yields only the parent name;
        any other name yields ``(None, None, None)``.
        """
        m = GEAR_PATTERN.match(name)
        if m:
            idx2 = int(m.group(3)) if m.group(3) else None
            return m.group(1).strip(), int(m.group(2)), idx2
        m = GEAR_PATTERN_PCT.match(name)
        if m:
            return m.group(1).strip(), None, None
        return None, None, None

    # --------------------------------------------------------------- public API

    def load_registry(self, conn) -> None:
        """Load fleet_companies + fleet_vessels from the DB (1-hour cache).

        Returns immediately if the last successful load is younger than
        ``_REGISTRY_CACHE_SEC``. The new registry is built in local dicts and
        swapped in only after both queries succeed, so a DB error leaves the
        previous in-memory state intact. Does not commit.
        """
        if time.time() - self._last_registry_load < _REGISTRY_CACHE_SEC:
            return

        cur = conn.cursor()
        try:
            cur.execute('SELECT id, name_cn, name_en FROM kcg.fleet_companies')
            companies = {r[0]: {'name_cn': r[1], 'name_en': r[2]} for r in cur.fetchall()}

            cur.execute(
                """SELECT id, company_id, permit_no, name_cn, name_en, tonnage,
                          gear_code, fleet_role, pair_vessel_id, mmsi
                   FROM kcg.fleet_vessels"""
            )
            vessel_rows = cur.fetchall()
        finally:
            cur.close()

        vessels: dict[int, dict] = {}
        name_cn_map: dict[str, int] = {}
        name_en_map: dict[str, int] = {}
        mmsi_to_vid: dict[str, int] = {}

        for (vid, company_id, permit_no, name_cn, name_en, tonnage,
             gear_code, fleet_role, pair_vessel_id, mmsi) in vessel_rows:
            vessels[vid] = {
                'id': vid,
                'company_id': company_id,
                'permit_no': permit_no,
                'name_cn': name_cn,
                'name_en': name_en,
                'tonnage': tonnage,
                'gear_code': gear_code,
                'fleet_role': fleet_role,
                'pair_vessel_id': pair_vessel_id,
                'mmsi': mmsi,
            }
            # Keys are normalised the same way _lookup_vessel_id normalises queries.
            if name_cn:
                name_cn_map[name_cn.strip()] = vid
            if name_en:
                name_en_map[name_en.lower().strip()] = vid
            if mmsi:
                mmsi_to_vid[mmsi] = vid

        self._companies = companies
        self._vessels = vessels
        self._name_cn_map = name_cn_map
        self._name_en_map = name_en_map
        self._mmsi_to_vid = mmsi_to_vid
        self._last_registry_load = time.time()
        logger.info(
            'fleet registry loaded: %d companies, %d vessels',
            len(self._companies),
            len(self._vessels),
        )

    def match_ais_to_registry(self, ais_vessels: list[dict], conn) -> None:
        """Match AIS vessels to the registered fleet and persist the matches.

        ais_vessels: [{mmsi, name, lat, lon, sog, cog}, ...]

        * Entries without an MMSI or name are skipped.
        * An already-matched MMSI only gets ``last_seen_at`` refreshed.
        * Otherwise an exact name match (Chinese name, then case-insensitive
          English name) binds the MMSI to the registered vessel. The UPDATE
          only succeeds if the row has no MMSI yet or already has this one.
          In-memory state is updated only when the UPDATE actually hit a row,
          so a vessel already bound to another MMSI is never re-bound here.

        Commits on success; rolls back and re-raises on DB error.
        """
        matched = 0
        with self._transaction(conn) as cur:
            for v in ais_vessels:
                mmsi = v.get('mmsi', '')
                name = v.get('name', '')
                if not mmsi or not name:
                    continue

                # Already matched → just update last_seen_at.
                known_vid = self._mmsi_to_vid.get(mmsi)
                if known_vid is not None:
                    cur.execute(
                        'UPDATE kcg.fleet_vessels SET last_seen_at = NOW() WHERE id = %s',
                        (known_vid,),
                    )
                    continue

                # NAME_EXACT matching.
                vid = self._lookup_vessel_id(name)
                if vid is None:
                    continue

                cur.execute(
                    """UPDATE kcg.fleet_vessels
                       SET mmsi = %s, match_confidence = 0.95, match_method = 'NAME_EXACT',
                           last_seen_at = NOW(), updated_at = NOW()
                       WHERE id = %s AND (mmsi IS NULL OR mmsi = %s)""",
                    (mmsi, vid, mmsi),
                )
                # rowcount == 0: the registry row is already bound to a different MMSI.
                if cur.rowcount == 0:
                    continue

                self._mmsi_to_vid[mmsi] = vid
                vessel = self._vessels.get(vid)
                if vessel is not None:
                    # Keep the cached row in sync so gear parent lookups see the MMSI now.
                    vessel['mmsi'] = mmsi
                matched += 1

        if matched > 0:
            logger.info('AIS→registry matched: %d vessels', matched)

    def track_gear_identity(self, gear_signals: list[dict], conn) -> None:
        """Track fishing gear / net identities in kcg.gear_identity_log.

        gear_signals: [{mmsi, name, lat, lon}, ...] — AIS signals whose name
        looks like ``XXX_<n>_<n>`` (or ``XXX%``). ``lat``/``lon`` default to 0.

        The parent vessel is taken from the name prefix and, when it matches
        the registry, recorded with method ``NAME_PARENT`` / confidence 0.9.
        For each signal:

        * same MMSI, same name  → refresh position and ``last_seen_at``;
        * same MMSI, new name   → deactivate the old row, insert a new one;
        * new MMSI              → deactivate every active row holding this name
          under another MMSI (logged as an MMSI change), insert a new one.

        Commits on success; rolls back and re-raises on DB error.
        """
        now = datetime.now(timezone.utc)
        with self._transaction(conn) as cur:
            for g in gear_signals:
                mmsi = g['mmsi']
                name = g['name']
                lat = g.get('lat', 0)
                lon = g.get('lon', 0)

                parent_name, idx1, idx2 = self._parse_gear_name(name)

                # Parent vessel matching.
                parent_mmsi: Optional[str] = None
                parent_vid: Optional[int] = None
                if parent_name:
                    parent_vid = self._lookup_vessel_id(parent_name)
                    if parent_vid is not None:
                        parent_mmsi = self._vessels.get(parent_vid, {}).get('mmsi')

                has_parent = parent_vid is not None
                match_method: Optional[str] = 'NAME_PARENT' if has_parent else None
                confidence = 0.9 if has_parent else 0.0
                new_row = (mmsi, name, parent_name, parent_mmsi, parent_vid,
                           idx1, idx2, lat, lon,
                           match_method, confidence, now, now)

                # Look up the currently active row for this MMSI.
                cur.execute(
                    """SELECT id, name FROM kcg.gear_identity_log
                       WHERE mmsi = %s AND is_active = TRUE""",
                    (mmsi,),
                )
                existing = cur.fetchone()

                if existing:
                    existing_id, existing_name = existing
                    if existing_name == name:
                        # Same MMSI + same name → update position / time only.
                        cur.execute(
                            """UPDATE kcg.gear_identity_log
                               SET last_seen_at = %s, lat = %s, lon = %s
                               WHERE id = %s""",
                            (now, lat, lon, existing_id),
                        )
                        continue
                    # Same MMSI + different name → retire the old identity.
                    cur.execute(
                        'UPDATE kcg.gear_identity_log SET is_active = FALSE WHERE id = %s',
                        (existing_id,),
                    )
                else:
                    # New MMSI → is the same name active under other MMSIs?
                    cur.execute(
                        """SELECT id, mmsi FROM kcg.gear_identity_log
                           WHERE name = %s AND is_active = TRUE AND mmsi != %s""",
                        (name, mmsi),
                    )
                    # fetchall() first: executing on the cursor discards pending results.
                    for old_id, old_mmsi in cur.fetchall():
                        # Same name + different MMSI → the gear changed MMSI.
                        cur.execute(
                            'UPDATE kcg.gear_identity_log SET is_active = FALSE WHERE id = %s',
                            (old_id,),
                        )
                        logger.info('gear MMSI change: %s → %s (name=%s)', old_mmsi, mmsi, name)

                cur.execute(self._GEAR_INSERT_SQL, new_row)

    def build_fleet_clusters(self, vessel_dfs: dict[str, pd.DataFrame]) -> dict[str, dict]:
        """Build per-vessel cluster info from the registered fleets.

        Returns: {mmsi → {cluster_id, cluster_size, is_leader, fleet_role}}

        * cluster_id = company_id for companies with ≥ 2 vessels currently in
          ``vessel_dfs``; ``is_leader`` is True for ``fleet_role == 'MAIN'``.
        * A company with a single visible vessel → cluster_id -1, size 1,
          its registered role (or 'NOISE' when the role is missing/NULL).
        * MMSIs in ``vessel_dfs`` not matched to the registry → cluster_id -1,
          size 0, role 'NOISE'.
        """
        results: dict[str, dict] = {}

        for cid, mmsis in self.get_company_vessels(vessel_dfs).items():
            if len(mmsis) < 2:
                # Lone vessel → NOISE cluster.
                for mmsi in mmsis:
                    v = self._vessels.get(self._mmsi_to_vid.get(mmsi, -1), {})
                    results[mmsi] = {
                        'cluster_id': -1,
                        'cluster_size': 1,
                        'is_leader': False,
                        'fleet_role': v.get('fleet_role') or 'NOISE',
                    }
                continue

            # Two or more vessels → registered-fleet cluster.
            for mmsi in mmsis:
                v = self._vessels[self._mmsi_to_vid[mmsi]]
                results[mmsi] = {
                    'cluster_id': cid,
                    'cluster_size': len(mmsis),
                    'is_leader': v['fleet_role'] == 'MAIN',
                    'fleet_role': v['fleet_role'],
                }

        # Vessels not matched to the registry → NOISE.
        for mmsi in vessel_dfs:
            if mmsi not in results:
                results[mmsi] = {
                    'cluster_id': -1,
                    'cluster_size': 0,
                    'is_leader': False,
                    'fleet_role': 'NOISE',
                }

        return results

    def save_snapshot(self, vessel_dfs: dict[str, pd.DataFrame], conn) -> None:
        """Insert one kcg.fleet_tracking_snapshot row per company currently seen.

        Each row stores the company's registered vessel count, how many of its
        vessels are in ``vessel_dfs``, and the arithmetic mean of those
        vessels' last ``lat``/``lon`` (None if none have rows).

        Commits on success; rolls back and re-raises on DB error.
        """
        now = datetime.now(timezone.utc)
        company_vessels = self.get_company_vessels(vessel_dfs)
        # Registered vessels per company, counted once instead of once per company.
        totals = Counter(v['company_id'] for v in self._vessels.values())

        with self._transaction(conn) as cur:
            for cid, mmsis in company_vessels.items():
                lats: list[float] = []
                lons: list[float] = []
                for mmsi in mmsis:
                    df = vessel_dfs.get(mmsi)
                    if df is not None and len(df) > 0:
                        last = df.iloc[-1]
                        lats.append(float(last['lat']))
                        lons.append(float(last['lon']))

                center_lat = sum(lats) / len(lats) if lats else None
                center_lon = sum(lons) / len(lons) if lons else None

                cur.execute(
                    """INSERT INTO kcg.fleet_tracking_snapshot
                       (company_id, snapshot_time, total_vessels, active_vessels,
                        center_lat, center_lon)
                       VALUES (%s, %s, %s, %s, %s, %s)""",
                    (cid, now, totals[cid], len(mmsis), center_lat, center_lon),
                )

        logger.info('fleet snapshot saved: %d companies', len(company_vessels))

    def get_company_vessels(self, vessel_dfs: dict[str, pd.DataFrame]) -> dict[int, list[str]]:
        """Group matched MMSIs present in ``vessel_dfs`` by registered company.

        Returns: {company_id: [mmsi, ...]} in ``_mmsi_to_vid`` iteration order.
        """
        result: dict[int, list[str]] = {}
        for mmsi, vid in self._mmsi_to_vid.items():
            v = self._vessels.get(vid)
            if not v or mmsi not in vessel_dfs:
                continue
            result.setdefault(v['company_id'], []).append(mmsi)
        return result
|
|
|
|
|
|
# Module-level singleton instance of FleetTracker.
fleet_tracker = FleetTracker()
|