kcg-ai-monitoring/prediction/algorithms/fleet.py
htlee e2fc355b2c feat: S2 prediction 분석 엔진 모노레포 이식
iran prediction 47개 Python 파일을 prediction/ 디렉토리로 복제:
- algorithms/ 14개 분석 알고리즘 (어구추론, 다크베셀, 스푸핑, 환적, 위험도 등)
- pipeline/ 7단계 분류 파이프라인
- cache/vessel_store (24h 슬라이딩 윈도우)
- db/ 어댑터 (snpdb 원본조회, kcgdb 결과저장)
- chat/ AI 채팅 (Ollama, 후순위)
- data/ 정적 데이터 (기선, 특정어업수역 GeoJSON)

config.py를 kcgaidb로 재구성 (DB명, 사용자, 비밀번호)
DB 연결 검증 완료 (kcgaidb 37개 테이블 접근 확인)
Makefile에 dev-prediction / dev-all 타겟 추가
CLAUDE.md에 prediction 섹션 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 12:56:51 +09:00

178 lines
5.8 KiB
Python

"""선단(Fleet) 패턴 탐지 — 공간+행동 기반.
단순 공간 근접이 아닌, 협조 운항 패턴(유사 속도/방향/역할)으로 선단을 판별.
- PT 저인망: 2척, 3NM 이내, 유사 속도(2~5kn) + 유사 방향(20° 이내)
- PS 선망: 3~5척, 2NM 이내, 모선(고속)+조명선(정지)+운반선(저속 대형)
- FC 환적: 2척, 0.5NM 이내, 양쪽 저속(2kn 이하)
"""
import logging
from typing import Optional
import numpy as np
import pandas as pd
from algorithms.location import haversine_nm, dist_to_baseline
logger = logging.getLogger(__name__)
def _heading_diff(h1: float, h2: float) -> float:
"""두 방향 사이 최소 각도차 (0~180)."""
d = abs(h1 - h2) % 360
return d if d <= 180 else 360 - d
def detect_fleet_patterns(
vessel_dfs: dict[str, pd.DataFrame],
) -> dict[int, list[dict]]:
"""행동 패턴 기반 선단 탐지.
Returns: {fleet_id: [{mmsi, lat, lon, sog, cog, role, pattern}, ...]}
"""
# 각 선박의 최신 스냅샷 추출
snapshots: list[dict] = []
for mmsi, df in vessel_dfs.items():
if df is None or len(df) == 0:
continue
last = df.iloc[-1]
snapshots.append({
'mmsi': mmsi,
'lat': float(last['lat']),
'lon': float(last['lon']),
'sog': float(last.get('sog', 0)),
'cog': float(last.get('cog', 0)),
})
if len(snapshots) < 2:
return {}
matched: set[str] = set()
fleets: dict[int, list[dict]] = {}
fleet_id = 0
# 1차: PT 저인망 쌍 탐지 (2척, 3NM, 유사 속도/방향)
for i in range(len(snapshots)):
if snapshots[i]['mmsi'] in matched:
continue
a = snapshots[i]
for j in range(i + 1, len(snapshots)):
if snapshots[j]['mmsi'] in matched:
continue
b = snapshots[j]
dist = haversine_nm(a['lat'], a['lon'], b['lat'], b['lon'])
if dist > 3.0:
continue
# 둘 다 조업 속도 (2~5kn)
if not (2.0 <= a['sog'] <= 5.0 and 2.0 <= b['sog'] <= 5.0):
continue
# 유사 속도 (차이 1kn 미만)
if abs(a['sog'] - b['sog']) >= 1.0:
continue
# 유사 방향 (20° 미만)
if _heading_diff(a['cog'], b['cog']) >= 20.0:
continue
fleets[fleet_id] = [
{**a, 'role': 'LEADER', 'pattern': 'TRAWL_PAIR'},
{**b, 'role': 'MEMBER', 'pattern': 'TRAWL_PAIR'},
]
matched.add(a['mmsi'])
matched.add(b['mmsi'])
fleet_id += 1
break
# 2차: FC 환적 쌍 탐지 (2척, 0.5NM, 양쪽 저속)
for i in range(len(snapshots)):
if snapshots[i]['mmsi'] in matched:
continue
a = snapshots[i]
for j in range(i + 1, len(snapshots)):
if snapshots[j]['mmsi'] in matched:
continue
b = snapshots[j]
dist = haversine_nm(a['lat'], a['lon'], b['lat'], b['lon'])
if dist > 0.5:
continue
if a['sog'] > 2.0 or b['sog'] > 2.0:
continue
fleets[fleet_id] = [
{**a, 'role': 'LEADER', 'pattern': 'TRANSSHIP'},
{**b, 'role': 'MEMBER', 'pattern': 'TRANSSHIP'},
]
matched.add(a['mmsi'])
matched.add(b['mmsi'])
fleet_id += 1
break
# 3차: PS 선망 선단 탐지 (3~10척, 2NM 이내 클러스터)
unmatched = [s for s in snapshots if s['mmsi'] not in matched]
for anchor in unmatched:
if anchor['mmsi'] in matched:
continue
nearby = []
for other in unmatched:
if other['mmsi'] == anchor['mmsi'] or other['mmsi'] in matched:
continue
dist = haversine_nm(anchor['lat'], anchor['lon'], other['lat'], other['lon'])
if dist <= 2.0:
nearby.append(other)
if len(nearby) < 2: # 본인 포함 3척 이상
continue
# 역할 분류: 고속(모선), 정지(조명선), 나머지(멤버)
members = [{**anchor, 'role': 'LEADER', 'pattern': 'PURSE_SEINE'}]
matched.add(anchor['mmsi'])
for n in nearby[:9]: # 최대 10척
if n['sog'] < 0.5:
role = 'LIGHTING'
else:
role = 'MEMBER'
members.append({**n, 'role': role, 'pattern': 'PURSE_SEINE'})
matched.add(n['mmsi'])
fleets[fleet_id] = members
fleet_id += 1
logger.info('fleet detection: %d fleets found (%d vessels matched)',
len(fleets), len(matched))
return fleets
def assign_fleet_roles(
vessel_dfs: dict[str, pd.DataFrame],
cluster_map: dict[str, int],
) -> dict[str, dict]:
"""선단 역할 할당 — 패턴 매칭 기반.
cluster_map은 파이프라인에서 전달되지만, 여기서는 vessel_dfs로 직접 패턴 탐지.
"""
fleets = detect_fleet_patterns(vessel_dfs)
results: dict[str, dict] = {}
# 매칭된 선박 (fleet_id를 cluster_id로 사용)
fleet_mmsis: set[str] = set()
for fid, members in fleets.items():
for m in members:
fleet_mmsis.add(m['mmsi'])
results[m['mmsi']] = {
'cluster_id': fid,
'cluster_size': len(members),
'is_leader': m['role'] == 'LEADER',
'fleet_role': m['role'],
}
# 매칭 안 된 선박 → NOISE (cluster_id = -1)
for mmsi in vessel_dfs:
if mmsi not in fleet_mmsis:
results[mmsi] = {
'cluster_id': -1,
'cluster_size': 0,
'is_leader': False,
'fleet_role': 'NOISE',
}
return results