Merge pull request 'release: 선단 패턴 매칭 + 수역 위험도' (#129) from develop into main
All checks were successful
Deploy KCG / deploy (push) Successful in 1m52s
All checks were successful
Deploy KCG / deploy (push) Successful in 1m52s
This commit is contained in:
커밋
56b92e408f
@ -1,155 +1,177 @@
|
||||
import math
|
||||
"""선단(Fleet) 패턴 탐지 — 공간+행동 기반.
|
||||
|
||||
단순 공간 근접이 아닌, 협조 운항 패턴(유사 속도/방향/역할)으로 선단을 판별.
|
||||
- PT 저인망: 2척, 3NM 이내, 유사 속도(2~5kn) + 유사 방향(20° 이내)
|
||||
- PS 선망: 3~5척, 2NM 이내, 모선(고속)+조명선(정지)+운반선(저속 대형)
|
||||
- FC 환적: 2척, 0.5NM 이내, 양쪽 저속(2kn 이하)
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from algorithms.location import haversine_nm, dist_to_baseline, EARTH_RADIUS_NM
|
||||
from algorithms.location import haversine_nm, dist_to_baseline
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def detect_group_clusters(
|
||||
vessel_snapshots: list[dict],
|
||||
cell_size_nm: float = 5.0,
|
||||
min_vessels: int = 3,
|
||||
max_cluster_size: int = 20,
|
||||
def _heading_diff(h1: float, h2: float) -> float:
|
||||
"""두 방향 사이 최소 각도차 (0~180)."""
|
||||
d = abs(h1 - h2) % 360
|
||||
return d if d <= 180 else 360 - d
|
||||
|
||||
|
||||
def detect_fleet_patterns(
|
||||
vessel_dfs: dict[str, pd.DataFrame],
|
||||
) -> dict[int, list[dict]]:
|
||||
"""고정 그리드 셀 기반 클러스터링 — DBSCAN 체인 효과 방지.
|
||||
"""행동 패턴 기반 선단 탐지.
|
||||
|
||||
cell_size_nm 격자로 공간을 분할하여 같은 셀에 속하는 선박을 그룹핑.
|
||||
체인 효과 없이 max_cluster_size 제한으로 거대 클러스터 방지.
|
||||
Returns: {fleet_id: [{mmsi, lat, lon, sog, cog, role, pattern}, ...]}
|
||||
"""
|
||||
if len(vessel_snapshots) < min_vessels:
|
||||
return {}
|
||||
|
||||
# 위도 1도 ≈ 60NM, 경도 1도 ≈ 60*cos(lat) NM
|
||||
# 중위도(35도) 기준 경도 1도 ≈ 49NM
|
||||
cell_lat = cell_size_nm / 60.0
|
||||
cell_lng = cell_size_nm / 49.0
|
||||
|
||||
# 격자 셀별 선박 그룹핑
|
||||
cells: dict[tuple[int, int], list[dict]] = {}
|
||||
for v in vessel_snapshots:
|
||||
cell_key = (int(v['lat'] / cell_lat), int(v['lon'] / cell_lng))
|
||||
cells.setdefault(cell_key, []).append(v)
|
||||
|
||||
clusters: dict[int, list[dict]] = {}
|
||||
cluster_id = 0
|
||||
for cell_vessels in cells.values():
|
||||
if len(cell_vessels) < min_vessels:
|
||||
# 각 선박의 최신 스냅샷 추출
|
||||
snapshots: list[dict] = []
|
||||
for mmsi, df in vessel_dfs.items():
|
||||
if df is None or len(df) == 0:
|
||||
continue
|
||||
# 셀 내 선박을 max_cluster_size 단위로 분할
|
||||
for i in range(0, len(cell_vessels), max_cluster_size):
|
||||
batch = cell_vessels[i:i + max_cluster_size]
|
||||
if len(batch) >= min_vessels:
|
||||
clusters[cluster_id] = batch
|
||||
cluster_id += 1
|
||||
last = df.iloc[-1]
|
||||
snapshots.append({
|
||||
'mmsi': mmsi,
|
||||
'lat': float(last['lat']),
|
||||
'lon': float(last['lon']),
|
||||
'sog': float(last.get('sog', 0)),
|
||||
'cog': float(last.get('cog', 0)),
|
||||
})
|
||||
|
||||
return clusters
|
||||
|
||||
|
||||
def identify_lead_vessel(cluster_vessels: list[dict]) -> dict:
|
||||
"""5기준 스코어링으로 대표선 특정."""
|
||||
if not cluster_vessels:
|
||||
if len(snapshots) < 2:
|
||||
return {}
|
||||
|
||||
scores: dict[str, float] = {}
|
||||
matched: set[str] = set()
|
||||
fleets: dict[int, list[dict]] = {}
|
||||
fleet_id = 0
|
||||
|
||||
timestamps = [pd.Timestamp(v.get('timestamp', 0)).timestamp() for v in cluster_vessels]
|
||||
min_ts = min(timestamps) if timestamps else 0
|
||||
# 1차: PT 저인망 쌍 탐지 (2척, 3NM, 유사 속도/방향)
|
||||
for i in range(len(snapshots)):
|
||||
if snapshots[i]['mmsi'] in matched:
|
||||
continue
|
||||
a = snapshots[i]
|
||||
for j in range(i + 1, len(snapshots)):
|
||||
if snapshots[j]['mmsi'] in matched:
|
||||
continue
|
||||
b = snapshots[j]
|
||||
dist = haversine_nm(a['lat'], a['lon'], b['lat'], b['lon'])
|
||||
if dist > 3.0:
|
||||
continue
|
||||
# 둘 다 조업 속도 (2~5kn)
|
||||
if not (2.0 <= a['sog'] <= 5.0 and 2.0 <= b['sog'] <= 5.0):
|
||||
continue
|
||||
# 유사 속도 (차이 1kn 미만)
|
||||
if abs(a['sog'] - b['sog']) >= 1.0:
|
||||
continue
|
||||
# 유사 방향 (20° 미만)
|
||||
if _heading_diff(a['cog'], b['cog']) >= 20.0:
|
||||
continue
|
||||
|
||||
lats = [v['lat'] for v in cluster_vessels]
|
||||
lons = [v['lon'] for v in cluster_vessels]
|
||||
centroid_lat = float(np.mean(lats))
|
||||
centroid_lon = float(np.mean(lons))
|
||||
fleets[fleet_id] = [
|
||||
{**a, 'role': 'LEADER', 'pattern': 'TRAWL_PAIR'},
|
||||
{**b, 'role': 'MEMBER', 'pattern': 'TRAWL_PAIR'},
|
||||
]
|
||||
matched.add(a['mmsi'])
|
||||
matched.add(b['mmsi'])
|
||||
fleet_id += 1
|
||||
break
|
||||
|
||||
for i, v in enumerate(cluster_vessels):
|
||||
mmsi = v['mmsi']
|
||||
s = 0.0
|
||||
# 2차: FC 환적 쌍 탐지 (2척, 0.5NM, 양쪽 저속)
|
||||
for i in range(len(snapshots)):
|
||||
if snapshots[i]['mmsi'] in matched:
|
||||
continue
|
||||
a = snapshots[i]
|
||||
for j in range(i + 1, len(snapshots)):
|
||||
if snapshots[j]['mmsi'] in matched:
|
||||
continue
|
||||
b = snapshots[j]
|
||||
dist = haversine_nm(a['lat'], a['lon'], b['lat'], b['lon'])
|
||||
if dist > 0.5:
|
||||
continue
|
||||
if a['sog'] > 2.0 or b['sog'] > 2.0:
|
||||
continue
|
||||
|
||||
# 기준 1: 최초 시각 (30점)
|
||||
ts_rank = timestamps[i] - min_ts
|
||||
s += 30.0 * (1.0 - min(ts_rank, 7200) / 7200)
|
||||
fleets[fleet_id] = [
|
||||
{**a, 'role': 'LEADER', 'pattern': 'TRANSSHIP'},
|
||||
{**b, 'role': 'MEMBER', 'pattern': 'TRANSSHIP'},
|
||||
]
|
||||
matched.add(a['mmsi'])
|
||||
matched.add(b['mmsi'])
|
||||
fleet_id += 1
|
||||
break
|
||||
|
||||
# 기준 2: 총톤수 (25점) — 외부 DB 연동 전까지 균등 배점
|
||||
s += 12.5
|
||||
# 3차: PS 선망 선단 탐지 (3~10척, 2NM 이내 클러스터)
|
||||
unmatched = [s for s in snapshots if s['mmsi'] not in matched]
|
||||
for anchor in unmatched:
|
||||
if anchor['mmsi'] in matched:
|
||||
continue
|
||||
nearby = []
|
||||
for other in unmatched:
|
||||
if other['mmsi'] == anchor['mmsi'] or other['mmsi'] in matched:
|
||||
continue
|
||||
dist = haversine_nm(anchor['lat'], anchor['lon'], other['lat'], other['lon'])
|
||||
if dist <= 2.0:
|
||||
nearby.append(other)
|
||||
|
||||
# 기준 3: 클러스터 중심 근접성 (20점)
|
||||
dist_center = haversine_nm(v['lat'], v['lon'], centroid_lat, centroid_lon)
|
||||
s += 20.0 * (1.0 - min(dist_center, 10) / 10)
|
||||
if len(nearby) < 2: # 본인 포함 3척 이상
|
||||
continue
|
||||
|
||||
# 기준 4: 기선 최근접 (15점)
|
||||
dist_base = dist_to_baseline(v['lat'], v['lon'])
|
||||
s += 15.0 * (1.0 - min(dist_base, 12) / 12)
|
||||
# 역할 분류: 고속(모선), 정지(조명선), 나머지(멤버)
|
||||
members = [{**anchor, 'role': 'LEADER', 'pattern': 'PURSE_SEINE'}]
|
||||
matched.add(anchor['mmsi'])
|
||||
for n in nearby[:9]: # 최대 10척
|
||||
if n['sog'] < 0.5:
|
||||
role = 'LIGHTING'
|
||||
else:
|
||||
role = 'MEMBER'
|
||||
members.append({**n, 'role': role, 'pattern': 'PURSE_SEINE'})
|
||||
matched.add(n['mmsi'])
|
||||
|
||||
# 기준 5: AIS 소실 이력 (10점) — 이력 없으면 만점
|
||||
s += 10.0
|
||||
fleets[fleet_id] = members
|
||||
fleet_id += 1
|
||||
|
||||
scores[mmsi] = round(s, 2)
|
||||
|
||||
lead_mmsi = max(scores, key=lambda k: scores[k])
|
||||
score_vals = sorted(scores.values(), reverse=True)
|
||||
|
||||
if len(score_vals) > 1 and score_vals[0] - score_vals[1] > 15:
|
||||
confidence = 'HIGH'
|
||||
elif len(score_vals) > 1 and score_vals[0] - score_vals[1] > 8:
|
||||
confidence = 'MED'
|
||||
else:
|
||||
confidence = 'LOW'
|
||||
|
||||
return {
|
||||
'lead_mmsi': lead_mmsi,
|
||||
'lead_score': scores[lead_mmsi],
|
||||
'all_scores': scores,
|
||||
'confidence': confidence,
|
||||
}
|
||||
logger.info('fleet detection: %d fleets found (%d vessels matched)',
|
||||
len(fleets), len(matched))
|
||||
return fleets
|
||||
|
||||
|
||||
def assign_fleet_roles(
|
||||
vessel_dfs: dict[str, pd.DataFrame],
|
||||
cluster_map: dict[str, int],
|
||||
) -> dict[str, dict]:
|
||||
"""선단 역할 할당: LEADER/MEMBER/NOISE."""
|
||||
"""선단 역할 할당 — 패턴 매칭 기반.
|
||||
|
||||
cluster_map은 파이프라인에서 전달되지만, 여기서는 vessel_dfs로 직접 패턴 탐지.
|
||||
"""
|
||||
fleets = detect_fleet_patterns(vessel_dfs)
|
||||
|
||||
results: dict[str, dict] = {}
|
||||
|
||||
# 클러스터별 그룹핑
|
||||
clusters: dict[int, list[str]] = {}
|
||||
for mmsi, cid in cluster_map.items():
|
||||
clusters.setdefault(cid, []).append(mmsi)
|
||||
# 매칭된 선박 (fleet_id를 cluster_id로 사용)
|
||||
fleet_mmsis: set[str] = set()
|
||||
for fid, members in fleets.items():
|
||||
for m in members:
|
||||
fleet_mmsis.add(m['mmsi'])
|
||||
results[m['mmsi']] = {
|
||||
'cluster_id': fid,
|
||||
'cluster_size': len(members),
|
||||
'is_leader': m['role'] == 'LEADER',
|
||||
'fleet_role': m['role'],
|
||||
}
|
||||
|
||||
for cid, mmsi_list in clusters.items():
|
||||
if cid == -1:
|
||||
for mmsi in mmsi_list:
|
||||
results[mmsi] = {
|
||||
'cluster_size': 0,
|
||||
'is_leader': False,
|
||||
'fleet_role': 'NOISE',
|
||||
}
|
||||
continue
|
||||
|
||||
cluster_size = len(mmsi_list)
|
||||
|
||||
# 스냅샷 생성 (각 선박의 마지막 포인트)
|
||||
snapshots: list[dict] = []
|
||||
for mmsi in mmsi_list:
|
||||
df = vessel_dfs.get(mmsi)
|
||||
if df is not None and len(df) > 0:
|
||||
last = df.iloc[-1]
|
||||
snapshots.append({
|
||||
'mmsi': mmsi,
|
||||
'lat': last['lat'],
|
||||
'lon': last['lon'],
|
||||
'timestamp': last.get('timestamp', pd.Timestamp.now()),
|
||||
})
|
||||
|
||||
lead_info = identify_lead_vessel(snapshots) if len(snapshots) >= 2 else {}
|
||||
lead_mmsi = lead_info.get('lead_mmsi')
|
||||
|
||||
for mmsi in mmsi_list:
|
||||
# 매칭 안 된 선박 → NOISE (cluster_id = -1)
|
||||
for mmsi in vessel_dfs:
|
||||
if mmsi not in fleet_mmsis:
|
||||
results[mmsi] = {
|
||||
'cluster_size': cluster_size,
|
||||
'is_leader': mmsi == lead_mmsi,
|
||||
'fleet_role': 'LEADER' if mmsi == lead_mmsi else 'MEMBER',
|
||||
'cluster_id': -1,
|
||||
'cluster_size': 0,
|
||||
'is_leader': False,
|
||||
'fleet_role': 'NOISE',
|
||||
}
|
||||
|
||||
return results
|
||||
|
||||
@ -116,7 +116,7 @@ def run_analysis_cycle():
|
||||
vessel_type=c['vessel_type'],
|
||||
confidence=c['confidence'],
|
||||
fishing_pct=c['fishing_pct'],
|
||||
cluster_id=c['cluster_id'],
|
||||
cluster_id=fleet_info.get('cluster_id', -1),
|
||||
season=c['season'],
|
||||
zone=zone_info.get('zone', 'EEZ_OR_BEYOND'),
|
||||
dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),
|
||||
|
||||
불러오는 중...
Reference in New Issue
Block a user