kcg-ai-monitoring/prediction/algorithms/fishing_pattern.py
htlee 2ee8a0e7ff feat(detection): DAR-03 어구 탐지 워크플로우 + 모선 검토 UI + 24h 리플레이 통합
- prediction: G-01/G-04/G-05/G-06 위반 분류 + 쌍끌이 공조 탐지 추가
- backend: 모선 확정/제외 API + signal-batch 항적 프록시 + ParentResolution 점수 근거 필드 확장
- frontend: 어구 탐지 그리드 다중필터/지도 flyTo, 후보 검토 패널(점수 근거+확정/제외), 24h convex hull 리플레이 + TripsLayer 애니메이션
- gitignore: 루트 .venv/ 추가
2026-04-15 13:26:15 +09:00

155 lines
6.0 KiB
Python

from __future__ import annotations
import pandas as pd
from algorithms.location import haversine_nm, classify_zone # noqa: F401 (haversine_nm re-exported for callers)
# Yan et al. (2022) 어구별 조업 속도 임계값
GEAR_SOG_THRESHOLDS: dict[str, tuple[float, float]] = {
'PT': (2.5, 4.5), # 쌍끌이저인망
'OT': (2.0, 4.0), # 단선저인망
'GN': (0.5, 2.5), # 자망·유망
'SQ': (0.0, 1.0), # 오징어채낚기
'TRAP': (0.3, 1.5), # 통발
'PS': (3.0, 6.0), # 선망
'TRAWL': (2.0, 4.5), # (alias)
'PURSE': (3.0, 6.0), # (alias)
'LONGLINE': (0.5, 2.5),
}
TRANSIT_SOG_MIN = 5.0
ANCHORED_SOG_MAX = 0.5
def classify_vessel_state(sog: float, cog_delta: float = 0.0,
gear_type: str = 'PT') -> str:
"""UCAF: 어구별 상태 분류."""
if sog <= ANCHORED_SOG_MAX:
return 'ANCHORED'
if sog >= TRANSIT_SOG_MIN:
return 'TRANSIT'
sog_min, sog_max = GEAR_SOG_THRESHOLDS.get(gear_type, (1.0, 5.0))
if sog_min <= sog <= sog_max:
# PT(쌍끌이): 공조 시 COG 변화가 적어야 함 — 큰 COG 변화는 비조업
if gear_type == 'PT' and cog_delta > 30.0:
return 'UNKNOWN'
return 'FISHING'
return 'UNKNOWN'
def compute_ucaf_score(df_vessel: pd.DataFrame, gear_type: str = 'PT') -> float:
"""UCAF 점수: 어구별 조업 상태 비율 (0~1)."""
if len(df_vessel) == 0:
return 0.0
sog_min, sog_max = GEAR_SOG_THRESHOLDS.get(gear_type, (1.0, 5.0))
in_range = df_vessel['sog'].between(sog_min, sog_max).sum()
return round(in_range / len(df_vessel), 4)
def compute_ucft_score(df_vessel: pd.DataFrame) -> float:
"""UCFT 점수: 조업 vs 항행 이진 신뢰도 (0~1)."""
if len(df_vessel) == 0:
return 0.0
fishing = (df_vessel['sog'].between(0.5, 5.0)).sum()
transit = (df_vessel['sog'] >= TRANSIT_SOG_MIN).sum()
total = fishing + transit
if total == 0:
return 0.0
return round(fishing / total, 4)
def detect_fishing_segments(df_vessel: pd.DataFrame,
window_min: int = 15,
gear_type: str = 'PT') -> list[dict]:
"""연속 조업 구간 추출."""
if len(df_vessel) < 2:
return []
segments: list[dict] = []
in_fishing = False
seg_start_idx = 0
records = df_vessel.to_dict('records')
for i, rec in enumerate(records):
sog = rec.get('sog', 0)
state = classify_vessel_state(sog, gear_type=gear_type)
if state == 'FISHING' and not in_fishing:
in_fishing = True
seg_start_idx = i
elif state != 'FISHING' and in_fishing:
start_ts = records[seg_start_idx].get('timestamp')
end_ts = rec.get('timestamp')
if start_ts and end_ts:
dur_sec = (pd.Timestamp(end_ts) - pd.Timestamp(start_ts)).total_seconds()
dur_min = dur_sec / 60
if dur_min >= window_min:
zone_info = classify_zone(
records[seg_start_idx].get('lat', 0),
records[seg_start_idx].get('lon', 0),
)
seg_end = i - 1
# Count speed anomalies within segment (SOG > TRANSIT_SOG_MIN during fishing)
speed_anomalies = 0
for idx in range(seg_start_idx, seg_end + 1):
if records[idx].get('sog', 0) > TRANSIT_SOG_MIN:
speed_anomalies += 1
segments.append({
'start_idx': seg_start_idx,
'end_idx': seg_end,
'duration_min': round(dur_min, 1),
'zone': zone_info.get('zone', 'UNKNOWN'),
'in_territorial_sea': zone_info.get('zone') == 'TERRITORIAL_SEA',
'speed_anomaly_count': speed_anomalies,
})
in_fishing = False
# 트랙 끝까지 조업 중이면 마지막 세그먼트 추가
if in_fishing and len(records) > seg_start_idx:
start_ts = records[seg_start_idx].get('timestamp')
end_ts = records[-1].get('timestamp')
if start_ts and end_ts:
dur_sec = (pd.Timestamp(end_ts) - pd.Timestamp(start_ts)).total_seconds()
dur_min = dur_sec / 60
if dur_min >= window_min:
zone_info = classify_zone(
records[seg_start_idx].get('lat', 0),
records[seg_start_idx].get('lon', 0),
)
seg_end = len(records) - 1
# Count speed anomalies within segment (SOG > TRANSIT_SOG_MIN during fishing)
speed_anomalies = 0
for idx in range(seg_start_idx, seg_end + 1):
if records[idx].get('sog', 0) > TRANSIT_SOG_MIN:
speed_anomalies += 1
segments.append({
'start_idx': seg_start_idx,
'end_idx': seg_end,
'duration_min': round(dur_min, 1),
'zone': zone_info.get('zone', 'UNKNOWN'),
'in_territorial_sea': zone_info.get('zone') == 'TERRITORIAL_SEA',
'speed_anomaly_count': speed_anomalies,
})
return segments
def detect_trawl_uturn(df_vessel: pd.DataFrame,
uturn_threshold_deg: float = 150.0,
min_uturn_count: int = 3) -> dict:
"""U-turn 왕복 패턴 감지 (저인망 특징)."""
if len(df_vessel) < 2:
return {'uturn_count': 0, 'trawl_suspected': False}
uturn_count = 0
cog_vals = df_vessel['cog'].values
sog_vals = df_vessel['sog'].values
for i in range(1, len(cog_vals)):
delta = abs((cog_vals[i] - cog_vals[i - 1] + 180) % 360 - 180)
if delta >= uturn_threshold_deg and sog_vals[i] < TRANSIT_SOG_MIN:
uturn_count += 1
return {
'uturn_count': uturn_count,
'trawl_suspected': uturn_count >= min_uturn_count,
}