- risk.py: 특정어업수역(ZONE_I~IV) 내 미허가 어선 +25점 가산 - fleet.py: DBSCAN → 고정 그리드 셀(5NM) 클러스터링 (체인 효과 차단) - max_cluster_size=20으로 거대 클러스터 방지 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
156 lines
4.9 KiB
Python
156 lines
4.9 KiB
Python
import math
|
|
import logging
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
from algorithms.location import haversine_nm, dist_to_baseline, EARTH_RADIUS_NM
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def detect_group_clusters(
|
|
vessel_snapshots: list[dict],
|
|
cell_size_nm: float = 5.0,
|
|
min_vessels: int = 3,
|
|
max_cluster_size: int = 20,
|
|
) -> dict[int, list[dict]]:
|
|
"""고정 그리드 셀 기반 클러스터링 — DBSCAN 체인 효과 방지.
|
|
|
|
cell_size_nm 격자로 공간을 분할하여 같은 셀에 속하는 선박을 그룹핑.
|
|
체인 효과 없이 max_cluster_size 제한으로 거대 클러스터 방지.
|
|
"""
|
|
if len(vessel_snapshots) < min_vessels:
|
|
return {}
|
|
|
|
# 위도 1도 ≈ 60NM, 경도 1도 ≈ 60*cos(lat) NM
|
|
# 중위도(35도) 기준 경도 1도 ≈ 49NM
|
|
cell_lat = cell_size_nm / 60.0
|
|
cell_lng = cell_size_nm / 49.0
|
|
|
|
# 격자 셀별 선박 그룹핑
|
|
cells: dict[tuple[int, int], list[dict]] = {}
|
|
for v in vessel_snapshots:
|
|
cell_key = (int(v['lat'] / cell_lat), int(v['lon'] / cell_lng))
|
|
cells.setdefault(cell_key, []).append(v)
|
|
|
|
clusters: dict[int, list[dict]] = {}
|
|
cluster_id = 0
|
|
for cell_vessels in cells.values():
|
|
if len(cell_vessels) < min_vessels:
|
|
continue
|
|
# 셀 내 선박을 max_cluster_size 단위로 분할
|
|
for i in range(0, len(cell_vessels), max_cluster_size):
|
|
batch = cell_vessels[i:i + max_cluster_size]
|
|
if len(batch) >= min_vessels:
|
|
clusters[cluster_id] = batch
|
|
cluster_id += 1
|
|
|
|
return clusters
|
|
|
|
|
|
def identify_lead_vessel(cluster_vessels: list[dict]) -> dict:
|
|
"""5기준 스코어링으로 대표선 특정."""
|
|
if not cluster_vessels:
|
|
return {}
|
|
|
|
scores: dict[str, float] = {}
|
|
|
|
timestamps = [pd.Timestamp(v.get('timestamp', 0)).timestamp() for v in cluster_vessels]
|
|
min_ts = min(timestamps) if timestamps else 0
|
|
|
|
lats = [v['lat'] for v in cluster_vessels]
|
|
lons = [v['lon'] for v in cluster_vessels]
|
|
centroid_lat = float(np.mean(lats))
|
|
centroid_lon = float(np.mean(lons))
|
|
|
|
for i, v in enumerate(cluster_vessels):
|
|
mmsi = v['mmsi']
|
|
s = 0.0
|
|
|
|
# 기준 1: 최초 시각 (30점)
|
|
ts_rank = timestamps[i] - min_ts
|
|
s += 30.0 * (1.0 - min(ts_rank, 7200) / 7200)
|
|
|
|
# 기준 2: 총톤수 (25점) — 외부 DB 연동 전까지 균등 배점
|
|
s += 12.5
|
|
|
|
# 기준 3: 클러스터 중심 근접성 (20점)
|
|
dist_center = haversine_nm(v['lat'], v['lon'], centroid_lat, centroid_lon)
|
|
s += 20.0 * (1.0 - min(dist_center, 10) / 10)
|
|
|
|
# 기준 4: 기선 최근접 (15점)
|
|
dist_base = dist_to_baseline(v['lat'], v['lon'])
|
|
s += 15.0 * (1.0 - min(dist_base, 12) / 12)
|
|
|
|
# 기준 5: AIS 소실 이력 (10점) — 이력 없으면 만점
|
|
s += 10.0
|
|
|
|
scores[mmsi] = round(s, 2)
|
|
|
|
lead_mmsi = max(scores, key=lambda k: scores[k])
|
|
score_vals = sorted(scores.values(), reverse=True)
|
|
|
|
if len(score_vals) > 1 and score_vals[0] - score_vals[1] > 15:
|
|
confidence = 'HIGH'
|
|
elif len(score_vals) > 1 and score_vals[0] - score_vals[1] > 8:
|
|
confidence = 'MED'
|
|
else:
|
|
confidence = 'LOW'
|
|
|
|
return {
|
|
'lead_mmsi': lead_mmsi,
|
|
'lead_score': scores[lead_mmsi],
|
|
'all_scores': scores,
|
|
'confidence': confidence,
|
|
}
|
|
|
|
|
|
def assign_fleet_roles(
|
|
vessel_dfs: dict[str, pd.DataFrame],
|
|
cluster_map: dict[str, int],
|
|
) -> dict[str, dict]:
|
|
"""선단 역할 할당: LEADER/MEMBER/NOISE."""
|
|
results: dict[str, dict] = {}
|
|
|
|
# 클러스터별 그룹핑
|
|
clusters: dict[int, list[str]] = {}
|
|
for mmsi, cid in cluster_map.items():
|
|
clusters.setdefault(cid, []).append(mmsi)
|
|
|
|
for cid, mmsi_list in clusters.items():
|
|
if cid == -1:
|
|
for mmsi in mmsi_list:
|
|
results[mmsi] = {
|
|
'cluster_size': 0,
|
|
'is_leader': False,
|
|
'fleet_role': 'NOISE',
|
|
}
|
|
continue
|
|
|
|
cluster_size = len(mmsi_list)
|
|
|
|
# 스냅샷 생성 (각 선박의 마지막 포인트)
|
|
snapshots: list[dict] = []
|
|
for mmsi in mmsi_list:
|
|
df = vessel_dfs.get(mmsi)
|
|
if df is not None and len(df) > 0:
|
|
last = df.iloc[-1]
|
|
snapshots.append({
|
|
'mmsi': mmsi,
|
|
'lat': last['lat'],
|
|
'lon': last['lon'],
|
|
'timestamp': last.get('timestamp', pd.Timestamp.now()),
|
|
})
|
|
|
|
lead_info = identify_lead_vessel(snapshots) if len(snapshots) >= 2 else {}
|
|
lead_mmsi = lead_info.get('lead_mmsi')
|
|
|
|
for mmsi in mmsi_list:
|
|
results[mmsi] = {
|
|
'cluster_size': cluster_size,
|
|
'is_leader': mmsi == lead_mmsi,
|
|
'fleet_role': 'LEADER' if mmsi == lead_mmsi else 'MEMBER',
|
|
}
|
|
|
|
return results
|