Merge pull request 'feat: 선단 행동 패턴 매칭 + 수역 위험도 가산' (#128) from fix/risk-scoring-and-cluster into develop

This commit is contained in:
htlee 2026-03-20 17:46:55 +09:00
커밋 fae116f7bd
2개의 변경된 파일141개의 추가작업 그리고 119개의 파일을 삭제

파일 보기

@ -1,155 +1,177 @@
import math """선단(Fleet) 패턴 탐지 — 공간+행동 기반.
단순 공간 근접이 아닌, 협조 운항 패턴(유사 속도/방향/역할)으로 선단을 판별.
- PT 저인망: 2, 3NM 이내, 유사 속도(2~5kn) + 유사 방향(20° 이내)
- PS 선망: 3~5, 2NM 이내, 모선(고속)+조명선(정지)+운반선(저속 대형)
- FC 환적: 2, 0.5NM 이내, 양쪽 저속(2kn 이하)
"""
import logging import logging
from typing import Optional
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from algorithms.location import haversine_nm, dist_to_baseline, EARTH_RADIUS_NM from algorithms.location import haversine_nm, dist_to_baseline
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def detect_group_clusters( def _heading_diff(h1: float, h2: float) -> float:
vessel_snapshots: list[dict], """두 방향 사이 최소 각도차 (0~180)."""
cell_size_nm: float = 5.0, d = abs(h1 - h2) % 360
min_vessels: int = 3, return d if d <= 180 else 360 - d
max_cluster_size: int = 20,
def detect_fleet_patterns(
vessel_dfs: dict[str, pd.DataFrame],
) -> dict[int, list[dict]]: ) -> dict[int, list[dict]]:
"""고정 그리드 셀 기반 클러스터링 — DBSCAN 체인 효과 방지. """행동 패턴 기반 선단 탐지.
cell_size_nm 격자로 공간을 분할하여 같은 셀에 속하는 선박을 그룹핑. Returns: {fleet_id: [{mmsi, lat, lon, sog, cog, role, pattern}, ...]}
체인 효과 없이 max_cluster_size 제한으로 거대 클러스터 방지.
""" """
if len(vessel_snapshots) < min_vessels: # 각 선박의 최신 스냅샷 추출
return {} snapshots: list[dict] = []
for mmsi, df in vessel_dfs.items():
# 위도 1도 ≈ 60NM, 경도 1도 ≈ 60*cos(lat) NM if df is None or len(df) == 0:
# 중위도(35도) 기준 경도 1도 ≈ 49NM
cell_lat = cell_size_nm / 60.0
cell_lng = cell_size_nm / 49.0
# 격자 셀별 선박 그룹핑
cells: dict[tuple[int, int], list[dict]] = {}
for v in vessel_snapshots:
cell_key = (int(v['lat'] / cell_lat), int(v['lon'] / cell_lng))
cells.setdefault(cell_key, []).append(v)
clusters: dict[int, list[dict]] = {}
cluster_id = 0
for cell_vessels in cells.values():
if len(cell_vessels) < min_vessels:
continue continue
# 셀 내 선박을 max_cluster_size 단위로 분할 last = df.iloc[-1]
for i in range(0, len(cell_vessels), max_cluster_size): snapshots.append({
batch = cell_vessels[i:i + max_cluster_size] 'mmsi': mmsi,
if len(batch) >= min_vessels: 'lat': float(last['lat']),
clusters[cluster_id] = batch 'lon': float(last['lon']),
cluster_id += 1 'sog': float(last.get('sog', 0)),
'cog': float(last.get('cog', 0)),
})
return clusters if len(snapshots) < 2:
def identify_lead_vessel(cluster_vessels: list[dict]) -> dict:
"""5기준 스코어링으로 대표선 특정."""
if not cluster_vessels:
return {} return {}
scores: dict[str, float] = {} matched: set[str] = set()
fleets: dict[int, list[dict]] = {}
fleet_id = 0
timestamps = [pd.Timestamp(v.get('timestamp', 0)).timestamp() for v in cluster_vessels] # 1차: PT 저인망 쌍 탐지 (2척, 3NM, 유사 속도/방향)
min_ts = min(timestamps) if timestamps else 0 for i in range(len(snapshots)):
if snapshots[i]['mmsi'] in matched:
continue
a = snapshots[i]
for j in range(i + 1, len(snapshots)):
if snapshots[j]['mmsi'] in matched:
continue
b = snapshots[j]
dist = haversine_nm(a['lat'], a['lon'], b['lat'], b['lon'])
if dist > 3.0:
continue
# 둘 다 조업 속도 (2~5kn)
if not (2.0 <= a['sog'] <= 5.0 and 2.0 <= b['sog'] <= 5.0):
continue
# 유사 속도 (차이 1kn 미만)
if abs(a['sog'] - b['sog']) >= 1.0:
continue
# 유사 방향 (20° 미만)
if _heading_diff(a['cog'], b['cog']) >= 20.0:
continue
lats = [v['lat'] for v in cluster_vessels] fleets[fleet_id] = [
lons = [v['lon'] for v in cluster_vessels] {**a, 'role': 'LEADER', 'pattern': 'TRAWL_PAIR'},
centroid_lat = float(np.mean(lats)) {**b, 'role': 'MEMBER', 'pattern': 'TRAWL_PAIR'},
centroid_lon = float(np.mean(lons)) ]
matched.add(a['mmsi'])
matched.add(b['mmsi'])
fleet_id += 1
break
for i, v in enumerate(cluster_vessels): # 2차: FC 환적 쌍 탐지 (2척, 0.5NM, 양쪽 저속)
mmsi = v['mmsi'] for i in range(len(snapshots)):
s = 0.0 if snapshots[i]['mmsi'] in matched:
continue
a = snapshots[i]
for j in range(i + 1, len(snapshots)):
if snapshots[j]['mmsi'] in matched:
continue
b = snapshots[j]
dist = haversine_nm(a['lat'], a['lon'], b['lat'], b['lon'])
if dist > 0.5:
continue
if a['sog'] > 2.0 or b['sog'] > 2.0:
continue
# 기준 1: 최초 시각 (30점) fleets[fleet_id] = [
ts_rank = timestamps[i] - min_ts {**a, 'role': 'LEADER', 'pattern': 'TRANSSHIP'},
s += 30.0 * (1.0 - min(ts_rank, 7200) / 7200) {**b, 'role': 'MEMBER', 'pattern': 'TRANSSHIP'},
]
matched.add(a['mmsi'])
matched.add(b['mmsi'])
fleet_id += 1
break
# 기준 2: 총톤수 (25점) — 외부 DB 연동 전까지 균등 배점 # 3차: PS 선망 선단 탐지 (3~10척, 2NM 이내 클러스터)
s += 12.5 unmatched = [s for s in snapshots if s['mmsi'] not in matched]
for anchor in unmatched:
if anchor['mmsi'] in matched:
continue
nearby = []
for other in unmatched:
if other['mmsi'] == anchor['mmsi'] or other['mmsi'] in matched:
continue
dist = haversine_nm(anchor['lat'], anchor['lon'], other['lat'], other['lon'])
if dist <= 2.0:
nearby.append(other)
# 기준 3: 클러스터 중심 근접성 (20점) if len(nearby) < 2: # 본인 포함 3척 이상
dist_center = haversine_nm(v['lat'], v['lon'], centroid_lat, centroid_lon) continue
s += 20.0 * (1.0 - min(dist_center, 10) / 10)
# 기준 4: 기선 최근접 (15점) # 역할 분류: 고속(모선), 정지(조명선), 나머지(멤버)
dist_base = dist_to_baseline(v['lat'], v['lon']) members = [{**anchor, 'role': 'LEADER', 'pattern': 'PURSE_SEINE'}]
s += 15.0 * (1.0 - min(dist_base, 12) / 12) matched.add(anchor['mmsi'])
for n in nearby[:9]: # 최대 10척
if n['sog'] < 0.5:
role = 'LIGHTING'
else:
role = 'MEMBER'
members.append({**n, 'role': role, 'pattern': 'PURSE_SEINE'})
matched.add(n['mmsi'])
# 기준 5: AIS 소실 이력 (10점) — 이력 없으면 만점 fleets[fleet_id] = members
s += 10.0 fleet_id += 1
scores[mmsi] = round(s, 2) logger.info('fleet detection: %d fleets found (%d vessels matched)',
len(fleets), len(matched))
lead_mmsi = max(scores, key=lambda k: scores[k]) return fleets
score_vals = sorted(scores.values(), reverse=True)
if len(score_vals) > 1 and score_vals[0] - score_vals[1] > 15:
confidence = 'HIGH'
elif len(score_vals) > 1 and score_vals[0] - score_vals[1] > 8:
confidence = 'MED'
else:
confidence = 'LOW'
return {
'lead_mmsi': lead_mmsi,
'lead_score': scores[lead_mmsi],
'all_scores': scores,
'confidence': confidence,
}
def assign_fleet_roles( def assign_fleet_roles(
vessel_dfs: dict[str, pd.DataFrame], vessel_dfs: dict[str, pd.DataFrame],
cluster_map: dict[str, int], cluster_map: dict[str, int],
) -> dict[str, dict]: ) -> dict[str, dict]:
"""선단 역할 할당: LEADER/MEMBER/NOISE.""" """선단 역할 할당 — 패턴 매칭 기반.
cluster_map은 파이프라인에서 전달되지만, 여기서는 vessel_dfs로 직접 패턴 탐지.
"""
fleets = detect_fleet_patterns(vessel_dfs)
results: dict[str, dict] = {} results: dict[str, dict] = {}
# 클러스터별 그룹핑 # 매칭된 선박 (fleet_id를 cluster_id로 사용)
clusters: dict[int, list[str]] = {} fleet_mmsis: set[str] = set()
for mmsi, cid in cluster_map.items(): for fid, members in fleets.items():
clusters.setdefault(cid, []).append(mmsi) for m in members:
fleet_mmsis.add(m['mmsi'])
results[m['mmsi']] = {
'cluster_id': fid,
'cluster_size': len(members),
'is_leader': m['role'] == 'LEADER',
'fleet_role': m['role'],
}
for cid, mmsi_list in clusters.items(): # 매칭 안 된 선박 → NOISE (cluster_id = -1)
if cid == -1: for mmsi in vessel_dfs:
for mmsi in mmsi_list: if mmsi not in fleet_mmsis:
results[mmsi] = {
'cluster_size': 0,
'is_leader': False,
'fleet_role': 'NOISE',
}
continue
cluster_size = len(mmsi_list)
# 스냅샷 생성 (각 선박의 마지막 포인트)
snapshots: list[dict] = []
for mmsi in mmsi_list:
df = vessel_dfs.get(mmsi)
if df is not None and len(df) > 0:
last = df.iloc[-1]
snapshots.append({
'mmsi': mmsi,
'lat': last['lat'],
'lon': last['lon'],
'timestamp': last.get('timestamp', pd.Timestamp.now()),
})
lead_info = identify_lead_vessel(snapshots) if len(snapshots) >= 2 else {}
lead_mmsi = lead_info.get('lead_mmsi')
for mmsi in mmsi_list:
results[mmsi] = { results[mmsi] = {
'cluster_size': cluster_size, 'cluster_id': -1,
'is_leader': mmsi == lead_mmsi, 'cluster_size': 0,
'fleet_role': 'LEADER' if mmsi == lead_mmsi else 'MEMBER', 'is_leader': False,
'fleet_role': 'NOISE',
} }
return results return results

파일 보기

@ -116,7 +116,7 @@ def run_analysis_cycle():
vessel_type=c['vessel_type'], vessel_type=c['vessel_type'],
confidence=c['confidence'], confidence=c['confidence'],
fishing_pct=c['fishing_pct'], fishing_pct=c['fishing_pct'],
cluster_id=c['cluster_id'], cluster_id=fleet_info.get('cluster_id', -1),
season=c['season'], season=c['season'],
zone=zone_info.get('zone', 'EEZ_OR_BEYOND'), zone=zone_info.get('zone', 'EEZ_OR_BEYOND'),
dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0), dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),