kcg-monitoring/prediction/algorithms/fleet.py
htlee d13baf302f fix: 위험도 점수 수역 가산 + 클러스터 그리드 셀 방식 전환
- risk.py: 특정어업수역(ZONE_I~IV) 내 미허가 어선 +25점 가산
- fleet.py: DBSCAN → 고정 그리드 셀(5NM) 클러스터링 (체인 효과 차단)
  - max_cluster_size=20으로 거대 클러스터 방지

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 17:38:49 +09:00

156 lines
4.9 KiB
Python

import math
import logging
import numpy as np
import pandas as pd
from algorithms.location import haversine_nm, dist_to_baseline, EARTH_RADIUS_NM
logger = logging.getLogger(__name__)
def detect_group_clusters(
vessel_snapshots: list[dict],
cell_size_nm: float = 5.0,
min_vessels: int = 3,
max_cluster_size: int = 20,
) -> dict[int, list[dict]]:
"""고정 그리드 셀 기반 클러스터링 — DBSCAN 체인 효과 방지.
cell_size_nm 격자로 공간을 분할하여 같은 셀에 속하는 선박을 그룹핑.
체인 효과 없이 max_cluster_size 제한으로 거대 클러스터 방지.
"""
if len(vessel_snapshots) < min_vessels:
return {}
# 위도 1도 ≈ 60NM, 경도 1도 ≈ 60*cos(lat) NM
# 중위도(35도) 기준 경도 1도 ≈ 49NM
cell_lat = cell_size_nm / 60.0
cell_lng = cell_size_nm / 49.0
# 격자 셀별 선박 그룹핑
cells: dict[tuple[int, int], list[dict]] = {}
for v in vessel_snapshots:
cell_key = (int(v['lat'] / cell_lat), int(v['lon'] / cell_lng))
cells.setdefault(cell_key, []).append(v)
clusters: dict[int, list[dict]] = {}
cluster_id = 0
for cell_vessels in cells.values():
if len(cell_vessels) < min_vessels:
continue
# 셀 내 선박을 max_cluster_size 단위로 분할
for i in range(0, len(cell_vessels), max_cluster_size):
batch = cell_vessels[i:i + max_cluster_size]
if len(batch) >= min_vessels:
clusters[cluster_id] = batch
cluster_id += 1
return clusters
def identify_lead_vessel(cluster_vessels: list[dict]) -> dict:
"""5기준 스코어링으로 대표선 특정."""
if not cluster_vessels:
return {}
scores: dict[str, float] = {}
timestamps = [pd.Timestamp(v.get('timestamp', 0)).timestamp() for v in cluster_vessels]
min_ts = min(timestamps) if timestamps else 0
lats = [v['lat'] for v in cluster_vessels]
lons = [v['lon'] for v in cluster_vessels]
centroid_lat = float(np.mean(lats))
centroid_lon = float(np.mean(lons))
for i, v in enumerate(cluster_vessels):
mmsi = v['mmsi']
s = 0.0
# 기준 1: 최초 시각 (30점)
ts_rank = timestamps[i] - min_ts
s += 30.0 * (1.0 - min(ts_rank, 7200) / 7200)
# 기준 2: 총톤수 (25점) — 외부 DB 연동 전까지 균등 배점
s += 12.5
# 기준 3: 클러스터 중심 근접성 (20점)
dist_center = haversine_nm(v['lat'], v['lon'], centroid_lat, centroid_lon)
s += 20.0 * (1.0 - min(dist_center, 10) / 10)
# 기준 4: 기선 최근접 (15점)
dist_base = dist_to_baseline(v['lat'], v['lon'])
s += 15.0 * (1.0 - min(dist_base, 12) / 12)
# 기준 5: AIS 소실 이력 (10점) — 이력 없으면 만점
s += 10.0
scores[mmsi] = round(s, 2)
lead_mmsi = max(scores, key=lambda k: scores[k])
score_vals = sorted(scores.values(), reverse=True)
if len(score_vals) > 1 and score_vals[0] - score_vals[1] > 15:
confidence = 'HIGH'
elif len(score_vals) > 1 and score_vals[0] - score_vals[1] > 8:
confidence = 'MED'
else:
confidence = 'LOW'
return {
'lead_mmsi': lead_mmsi,
'lead_score': scores[lead_mmsi],
'all_scores': scores,
'confidence': confidence,
}
def assign_fleet_roles(
vessel_dfs: dict[str, pd.DataFrame],
cluster_map: dict[str, int],
) -> dict[str, dict]:
"""선단 역할 할당: LEADER/MEMBER/NOISE."""
results: dict[str, dict] = {}
# 클러스터별 그룹핑
clusters: dict[int, list[str]] = {}
for mmsi, cid in cluster_map.items():
clusters.setdefault(cid, []).append(mmsi)
for cid, mmsi_list in clusters.items():
if cid == -1:
for mmsi in mmsi_list:
results[mmsi] = {
'cluster_size': 0,
'is_leader': False,
'fleet_role': 'NOISE',
}
continue
cluster_size = len(mmsi_list)
# 스냅샷 생성 (각 선박의 마지막 포인트)
snapshots: list[dict] = []
for mmsi in mmsi_list:
df = vessel_dfs.get(mmsi)
if df is not None and len(df) > 0:
last = df.iloc[-1]
snapshots.append({
'mmsi': mmsi,
'lat': last['lat'],
'lon': last['lon'],
'timestamp': last.get('timestamp', pd.Timestamp.now()),
})
lead_info = identify_lead_vessel(snapshots) if len(snapshots) >= 2 else {}
lead_mmsi = lead_info.get('lead_mmsi')
for mmsi in mmsi_list:
results[mmsi] = {
'cluster_size': cluster_size,
'is_leader': mmsi == lead_mmsi,
'fleet_role': 'LEADER' if mmsi == lead_mmsi else 'MEMBER',
}
return results