release: 선단 패턴 매칭 + 수역 위험도 #129

병합
htlee develop 에서 main 로 2 commits 를 머지했습니다 2026-03-20 17:47:08 +09:00
2개의 변경된 파일141개의 추가작업 그리고 119개의 파일을 삭제

파일 보기

@ -1,155 +1,177 @@
import math
"""선단(Fleet) 패턴 탐지 — 공간+행동 기반.
단순 공간 근접이 아닌, 협조 운항 패턴(유사 속도/방향/역할)으로 선단을 판별.
- PT 저인망: 2, 3NM 이내, 유사 속도(2~5kn) + 유사 방향(20° 이내)
- PS 선망: 3~5, 2NM 이내, 모선(고속)+조명선(정지)+운반선(저속 대형)
- FC 환적: 2, 0.5NM 이내, 양쪽 저속(2kn 이하)
"""
import logging
from typing import Optional
import numpy as np
import pandas as pd
from algorithms.location import haversine_nm, dist_to_baseline, EARTH_RADIUS_NM
from algorithms.location import haversine_nm, dist_to_baseline
logger = logging.getLogger(__name__)
def detect_group_clusters(
vessel_snapshots: list[dict],
cell_size_nm: float = 5.0,
min_vessels: int = 3,
max_cluster_size: int = 20,
def _heading_diff(h1: float, h2: float) -> float:
"""두 방향 사이 최소 각도차 (0~180)."""
d = abs(h1 - h2) % 360
return d if d <= 180 else 360 - d
def detect_fleet_patterns(
vessel_dfs: dict[str, pd.DataFrame],
) -> dict[int, list[dict]]:
"""고정 그리드 셀 기반 클러스터링 — DBSCAN 체인 효과 방지.
"""행동 패턴 기반 선단 탐지.
cell_size_nm 격자로 공간을 분할하여 같은 셀에 속하는 선박을 그룹핑.
체인 효과 없이 max_cluster_size 제한으로 거대 클러스터 방지.
Returns: {fleet_id: [{mmsi, lat, lon, sog, cog, role, pattern}, ...]}
"""
if len(vessel_snapshots) < min_vessels:
return {}
# 위도 1도 ≈ 60NM, 경도 1도 ≈ 60*cos(lat) NM
# 중위도(35도) 기준 경도 1도 ≈ 49NM
cell_lat = cell_size_nm / 60.0
cell_lng = cell_size_nm / 49.0
# 격자 셀별 선박 그룹핑
cells: dict[tuple[int, int], list[dict]] = {}
for v in vessel_snapshots:
cell_key = (int(v['lat'] / cell_lat), int(v['lon'] / cell_lng))
cells.setdefault(cell_key, []).append(v)
clusters: dict[int, list[dict]] = {}
cluster_id = 0
for cell_vessels in cells.values():
if len(cell_vessels) < min_vessels:
# 각 선박의 최신 스냅샷 추출
snapshots: list[dict] = []
for mmsi, df in vessel_dfs.items():
if df is None or len(df) == 0:
continue
# 셀 내 선박을 max_cluster_size 단위로 분할
for i in range(0, len(cell_vessels), max_cluster_size):
batch = cell_vessels[i:i + max_cluster_size]
if len(batch) >= min_vessels:
clusters[cluster_id] = batch
cluster_id += 1
last = df.iloc[-1]
snapshots.append({
'mmsi': mmsi,
'lat': float(last['lat']),
'lon': float(last['lon']),
'sog': float(last.get('sog', 0)),
'cog': float(last.get('cog', 0)),
})
return clusters
def identify_lead_vessel(cluster_vessels: list[dict]) -> dict:
"""5기준 스코어링으로 대표선 특정."""
if not cluster_vessels:
if len(snapshots) < 2:
return {}
scores: dict[str, float] = {}
matched: set[str] = set()
fleets: dict[int, list[dict]] = {}
fleet_id = 0
timestamps = [pd.Timestamp(v.get('timestamp', 0)).timestamp() for v in cluster_vessels]
min_ts = min(timestamps) if timestamps else 0
# 1차: PT 저인망 쌍 탐지 (2척, 3NM, 유사 속도/방향)
for i in range(len(snapshots)):
if snapshots[i]['mmsi'] in matched:
continue
a = snapshots[i]
for j in range(i + 1, len(snapshots)):
if snapshots[j]['mmsi'] in matched:
continue
b = snapshots[j]
dist = haversine_nm(a['lat'], a['lon'], b['lat'], b['lon'])
if dist > 3.0:
continue
# 둘 다 조업 속도 (2~5kn)
if not (2.0 <= a['sog'] <= 5.0 and 2.0 <= b['sog'] <= 5.0):
continue
# 유사 속도 (차이 1kn 미만)
if abs(a['sog'] - b['sog']) >= 1.0:
continue
# 유사 방향 (20° 미만)
if _heading_diff(a['cog'], b['cog']) >= 20.0:
continue
lats = [v['lat'] for v in cluster_vessels]
lons = [v['lon'] for v in cluster_vessels]
centroid_lat = float(np.mean(lats))
centroid_lon = float(np.mean(lons))
fleets[fleet_id] = [
{**a, 'role': 'LEADER', 'pattern': 'TRAWL_PAIR'},
{**b, 'role': 'MEMBER', 'pattern': 'TRAWL_PAIR'},
]
matched.add(a['mmsi'])
matched.add(b['mmsi'])
fleet_id += 1
break
for i, v in enumerate(cluster_vessels):
mmsi = v['mmsi']
s = 0.0
# 2차: FC 환적 쌍 탐지 (2척, 0.5NM, 양쪽 저속)
for i in range(len(snapshots)):
if snapshots[i]['mmsi'] in matched:
continue
a = snapshots[i]
for j in range(i + 1, len(snapshots)):
if snapshots[j]['mmsi'] in matched:
continue
b = snapshots[j]
dist = haversine_nm(a['lat'], a['lon'], b['lat'], b['lon'])
if dist > 0.5:
continue
if a['sog'] > 2.0 or b['sog'] > 2.0:
continue
# 기준 1: 최초 시각 (30점)
ts_rank = timestamps[i] - min_ts
s += 30.0 * (1.0 - min(ts_rank, 7200) / 7200)
fleets[fleet_id] = [
{**a, 'role': 'LEADER', 'pattern': 'TRANSSHIP'},
{**b, 'role': 'MEMBER', 'pattern': 'TRANSSHIP'},
]
matched.add(a['mmsi'])
matched.add(b['mmsi'])
fleet_id += 1
break
# 기준 2: 총톤수 (25점) — 외부 DB 연동 전까지 균등 배점
s += 12.5
# 3차: PS 선망 선단 탐지 (3~10척, 2NM 이내 클러스터)
unmatched = [s for s in snapshots if s['mmsi'] not in matched]
for anchor in unmatched:
if anchor['mmsi'] in matched:
continue
nearby = []
for other in unmatched:
if other['mmsi'] == anchor['mmsi'] or other['mmsi'] in matched:
continue
dist = haversine_nm(anchor['lat'], anchor['lon'], other['lat'], other['lon'])
if dist <= 2.0:
nearby.append(other)
# 기준 3: 클러스터 중심 근접성 (20점)
dist_center = haversine_nm(v['lat'], v['lon'], centroid_lat, centroid_lon)
s += 20.0 * (1.0 - min(dist_center, 10) / 10)
if len(nearby) < 2: # 본인 포함 3척 이상
continue
# 기준 4: 기선 최근접 (15점)
dist_base = dist_to_baseline(v['lat'], v['lon'])
s += 15.0 * (1.0 - min(dist_base, 12) / 12)
# 역할 분류: 고속(모선), 정지(조명선), 나머지(멤버)
members = [{**anchor, 'role': 'LEADER', 'pattern': 'PURSE_SEINE'}]
matched.add(anchor['mmsi'])
for n in nearby[:9]: # 최대 10척
if n['sog'] < 0.5:
role = 'LIGHTING'
else:
role = 'MEMBER'
members.append({**n, 'role': role, 'pattern': 'PURSE_SEINE'})
matched.add(n['mmsi'])
# 기준 5: AIS 소실 이력 (10점) — 이력 없으면 만점
s += 10.0
fleets[fleet_id] = members
fleet_id += 1
scores[mmsi] = round(s, 2)
lead_mmsi = max(scores, key=lambda k: scores[k])
score_vals = sorted(scores.values(), reverse=True)
if len(score_vals) > 1 and score_vals[0] - score_vals[1] > 15:
confidence = 'HIGH'
elif len(score_vals) > 1 and score_vals[0] - score_vals[1] > 8:
confidence = 'MED'
else:
confidence = 'LOW'
return {
'lead_mmsi': lead_mmsi,
'lead_score': scores[lead_mmsi],
'all_scores': scores,
'confidence': confidence,
}
logger.info('fleet detection: %d fleets found (%d vessels matched)',
len(fleets), len(matched))
return fleets
def assign_fleet_roles(
vessel_dfs: dict[str, pd.DataFrame],
cluster_map: dict[str, int],
) -> dict[str, dict]:
"""선단 역할 할당: LEADER/MEMBER/NOISE."""
"""선단 역할 할당 — 패턴 매칭 기반.
cluster_map은 파이프라인에서 전달되지만, 여기서는 vessel_dfs로 직접 패턴 탐지.
"""
fleets = detect_fleet_patterns(vessel_dfs)
results: dict[str, dict] = {}
# 클러스터별 그룹핑
clusters: dict[int, list[str]] = {}
for mmsi, cid in cluster_map.items():
clusters.setdefault(cid, []).append(mmsi)
# 매칭된 선박 (fleet_id를 cluster_id로 사용)
fleet_mmsis: set[str] = set()
for fid, members in fleets.items():
for m in members:
fleet_mmsis.add(m['mmsi'])
results[m['mmsi']] = {
'cluster_id': fid,
'cluster_size': len(members),
'is_leader': m['role'] == 'LEADER',
'fleet_role': m['role'],
}
for cid, mmsi_list in clusters.items():
if cid == -1:
for mmsi in mmsi_list:
results[mmsi] = {
'cluster_size': 0,
'is_leader': False,
'fleet_role': 'NOISE',
}
continue
cluster_size = len(mmsi_list)
# 스냅샷 생성 (각 선박의 마지막 포인트)
snapshots: list[dict] = []
for mmsi in mmsi_list:
df = vessel_dfs.get(mmsi)
if df is not None and len(df) > 0:
last = df.iloc[-1]
snapshots.append({
'mmsi': mmsi,
'lat': last['lat'],
'lon': last['lon'],
'timestamp': last.get('timestamp', pd.Timestamp.now()),
})
lead_info = identify_lead_vessel(snapshots) if len(snapshots) >= 2 else {}
lead_mmsi = lead_info.get('lead_mmsi')
for mmsi in mmsi_list:
# 매칭 안 된 선박 → NOISE (cluster_id = -1)
for mmsi in vessel_dfs:
if mmsi not in fleet_mmsis:
results[mmsi] = {
'cluster_size': cluster_size,
'is_leader': mmsi == lead_mmsi,
'fleet_role': 'LEADER' if mmsi == lead_mmsi else 'MEMBER',
'cluster_id': -1,
'cluster_size': 0,
'is_leader': False,
'fleet_role': 'NOISE',
}
return results

파일 보기

@ -116,7 +116,7 @@ def run_analysis_cycle():
vessel_type=c['vessel_type'],
confidence=c['confidence'],
fishing_pct=c['fishing_pct'],
cluster_id=c['cluster_id'],
cluster_id=fleet_info.get('cluster_id', -1),
season=c['season'],
zone=zone_info.get('zone', 'EEZ_OR_BEYOND'),
dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),