import math
import logging

import numpy as np
import pandas as pd

from algorithms.location import haversine_nm, dist_to_baseline, EARTH_RADIUS_NM

logger = logging.getLogger(__name__)


def detect_group_clusters(
    vessel_snapshots: list[dict],
    spatial_eps_nm: float = 10.0,
    time_eps_hours: float = 2.0,
    min_vessels: int = 3,
) -> dict[int, list[dict]]:
    """Detect vessel groups with spatio-temporal DBSCAN clustering.

    Each snapshot is projected to a 3-D point (north NM, east NM, scaled
    time) and clustered with a Euclidean DBSCAN whose ``eps`` is
    ``spatial_eps_nm``. Time is rescaled so that a gap of
    ``time_eps_hours`` equals one ``eps`` on the time axis. Because the
    metric is Euclidean over all three axes, ``eps`` bounds the *combined*
    spatial and temporal separation.

    Args:
        vessel_snapshots: Dicts with ``'lat'`` and ``'lon'`` in degrees and
            a ``'timestamp'`` accepted by ``pd.Timestamp``.
        spatial_eps_nm: DBSCAN neighbourhood radius in nautical miles.
        time_eps_hours: Time gap that counts as one ``spatial_eps_nm``.
        min_vessels: DBSCAN ``min_samples``; also the minimum input size.

    Returns:
        Mapping of cluster label to the snapshots in that cluster. Noise
        points (label ``-1``) are omitted. Returns ``{}`` when there are
        fewer than ``min_vessels`` snapshots, when the input is empty, or
        when scikit-learn is not installed.
    """
    if not vessel_snapshots or len(vessel_snapshots) < min_vessels:
        return {}

    # Imported lazily so the module still loads without scikit-learn.
    try:
        from sklearn.cluster import DBSCAN
    except ImportError:
        logger.warning('sklearn not available for DBSCAN clustering')
        return {}

    lats = [v['lat'] for v in vessel_snapshots]
    lons = [v['lon'] for v in vessel_snapshots]

    # Local equirectangular projection to nautical miles. Meridians converge
    # towards the poles, so east-west arc length must be scaled by
    # cos(latitude); the mean latitude of the input is the reference.
    cos_ref_lat = math.cos(math.radians(float(np.mean(lats))))
    north_nm = [math.radians(lat) * EARTH_RADIUS_NM for lat in lats]
    east_nm = [math.radians(lon) * EARTH_RADIUS_NM * cos_ref_lat for lon in lons]

    # Normalise time into NM-equivalent units: a gap of `time_eps_hours`
    # maps to exactly `spatial_eps_nm`, i.e. one DBSCAN eps.
    timestamps = [pd.Timestamp(v['timestamp']).timestamp() for v in vessel_snapshots]
    t_min = min(timestamps)
    nm_per_hour = spatial_eps_nm / time_eps_hours
    time_nm = [(t - t_min) / 3600 * nm_per_hour for t in timestamps]

    X = np.column_stack((north_nm, east_nm, time_nm))
    db = DBSCAN(eps=spatial_eps_nm, min_samples=min_vessels, metric='euclidean').fit(X)

    clusters: dict[int, list[dict]] = {}
    for idx, label in enumerate(db.labels_):
        if label == -1:  # DBSCAN noise
            continue
        clusters.setdefault(int(label), []).append(vessel_snapshots[idx])
    return clusters


def identify_lead_vessel(cluster_vessels: list[dict]) -> dict:
    """Pick the lead vessel of a cluster with a five-criterion score.

    Criteria (maximum 100 points):
        1. Earliest timestamp in the cluster (30).
        2. Gross tonnage (25) - a flat 12.5 for every vessel until an
           external vessel database is integrated.
        3. Proximity to the cluster centroid (20).
        4. Proximity to the baseline, via ``dist_to_baseline`` (15).
        5. AIS-loss history (10) - currently always full marks.

    Args:
        cluster_vessels: Dicts with ``'mmsi'``, ``'lat'``, ``'lon'`` and
            optionally ``'timestamp'`` (a missing one is read as epoch 0).

    Returns:
        ``{}`` for an empty cluster; otherwise a dict with ``'lead_mmsi'``,
        ``'lead_score'``, ``'all_scores'`` (mmsi -> score rounded to two
        decimals) and ``'confidence'`` (``'HIGH'``/``'MED'``/``'LOW'``,
        from the gap between the top two scores).
    """
    if not cluster_vessels:
        return {}

    scores: dict[str, float] = {}

    timestamps = [pd.Timestamp(v.get('timestamp', 0)).timestamp() for v in cluster_vessels]
    min_ts = min(timestamps)

    centroid_lat = float(np.mean([v['lat'] for v in cluster_vessels]))
    centroid_lon = float(np.mean([v['lon'] for v in cluster_vessels]))

    for i, v in enumerate(cluster_vessels):
        s = 0.0

        # Criterion 1: earliest timestamp (30 pts), decaying linearly to 0
        # at 7200 seconds after the earliest vessel.
        ts_rank = timestamps[i] - min_ts
        s += 30.0 * (1.0 - min(ts_rank, 7200) / 7200)

        # Criterion 2: gross tonnage (25 pts) - uniform half score until
        # the external DB is integrated.
        s += 12.5

        # Criterion 3: proximity to cluster centroid (20 pts), capped at 10
        # (presumably NM, going by the helper's name).
        dist_center = haversine_nm(v['lat'], v['lon'], centroid_lat, centroid_lon)
        s += 20.0 * (1.0 - min(dist_center, 10) / 10)

        # Criterion 4: proximity to the baseline (15 pts), capped at 12.
        # NOTE(review): dist_to_baseline units are assumed to be NM - confirm.
        dist_base = dist_to_baseline(v['lat'], v['lon'])
        s += 15.0 * (1.0 - min(dist_base, 12) / 12)

        # Criterion 5: AIS-loss history (10 pts) - full marks when there is
        # no history, which is always the case for now.
        s += 10.0

        scores[v['mmsi']] = round(s, 2)

    lead_mmsi = max(scores, key=lambda k: scores[k])

    # Confidence comes from the margin between the best and second-best
    # scores; with a single scored vessel the margin is 0.
    score_vals = sorted(scores.values(), reverse=True)
    margin = score_vals[0] - score_vals[1] if len(score_vals) > 1 else 0.0
    if margin > 15:
        confidence = 'HIGH'
    elif margin > 8:
        confidence = 'MED'
    else:
        confidence = 'LOW'

    return {
        'lead_mmsi': lead_mmsi,
        'lead_score': scores[lead_mmsi],
        'all_scores': scores,
        'confidence': confidence,
    }


def assign_fleet_roles(
    vessel_dfs: dict[str, pd.DataFrame],
    cluster_map: dict[str, int],
) -> dict[str, dict]:
    """Assign a fleet role (LEADER / MEMBER / NOISE) to every vessel.

    Args:
        vessel_dfs: mmsi -> track DataFrame with ``'lat'`` and ``'lon'``
            columns and optionally ``'timestamp'``. The last row is used
            as the vessel's snapshot.
        cluster_map: mmsi -> cluster id, where ``-1`` marks noise.

    Returns:
        mmsi -> ``{'cluster_size', 'is_leader', 'fleet_role'}``. Noise
        vessels get size 0 and role ``'NOISE'``. A cluster with fewer than
        two usable snapshots has no leader; all its vessels are ``'MEMBER'``.
    """
    results: dict[str, dict] = {}

    # Group vessels by cluster id.
    clusters: dict[int, list[str]] = {}
    for mmsi, cid in cluster_map.items():
        clusters.setdefault(cid, []).append(mmsi)

    for cid, mmsi_list in clusters.items():
        if cid == -1:
            for mmsi in mmsi_list:
                results[mmsi] = {
                    'cluster_size': 0,
                    'is_leader': False,
                    'fleet_role': 'NOISE',
                }
            continue

        cluster_size = len(mmsi_list)

        # Build one snapshot per vessel from the last point of its track;
        # vessels without any track data are skipped.
        snapshots: list[dict] = []
        for mmsi in mmsi_list:
            df = vessel_dfs.get(mmsi)
            if df is not None and len(df) > 0:
                last = df.iloc[-1]
                snapshots.append({
                    'mmsi': mmsi,
                    'lat': last['lat'],
                    'lon': last['lon'],
                    'timestamp': last.get('timestamp', pd.Timestamp.now()),
                })

        # A leader is only meaningful with at least two comparable vessels.
        lead_info = identify_lead_vessel(snapshots) if len(snapshots) >= 2 else {}
        lead_mmsi = lead_info.get('lead_mmsi')

        for mmsi in mmsi_list:
            is_leader = mmsi == lead_mmsi
            results[mmsi] = {
                'cluster_size': cluster_size,
                'is_leader': is_leader,
                'fleet_role': 'LEADER' if is_leader else 'MEMBER',
            }

    return results