from __future__ import annotations

import pandas as pd

from algorithms.location import haversine_nm, classify_zone  # noqa: F401 (haversine_nm re-exported for callers)

# Per-gear fishing speed-over-ground thresholds, as (min, max), after
# Yan et al. (2022).  NOTE(review): units are presumably knots - confirm.
GEAR_SOG_THRESHOLDS: dict[str, tuple[float, float]] = {
    'PT': (2.5, 4.5),    # pair trawl
    'OT': (2.0, 4.0),    # single-vessel (otter) trawl
    'GN': (0.5, 2.5),    # gillnet / drift net
    'SQ': (0.0, 1.0),    # squid jigging
    'TRAP': (0.3, 1.5),  # trap / pot
    'PS': (3.0, 6.0),    # purse seine
    'TRAWL': (2.0, 4.5),  # (alias)
    'PURSE': (3.0, 6.0),  # (alias)
    'LONGLINE': (0.5, 2.5),
}

# Fallback (min, max) range used when a gear type is not in GEAR_SOG_THRESHOLDS.
_DEFAULT_SOG_RANGE: tuple[float, float] = (1.0, 5.0)

TRANSIT_SOG_MIN = 5.0
ANCHORED_SOG_MAX = 0.5


def classify_vessel_state(sog: float, cog_delta: float = 0.0,
                          gear_type: str = 'PT') -> str:
    """UCAF: classify a single sample's vessel state for a given gear type.

    The anchored and transit checks run before the gear-specific range, so a
    speed at or below ``ANCHORED_SOG_MAX`` is 'ANCHORED' even when the gear's
    fishing range starts lower.

    Args:
        sog: Speed over ground of the sample.
        cog_delta: Accepted for interface compatibility; not used by the
            classification.
        gear_type: Key into ``GEAR_SOG_THRESHOLDS``; unknown keys fall back
            to the range (1.0, 5.0).

    Returns:
        One of 'ANCHORED', 'TRANSIT', 'FISHING' or 'UNKNOWN'.
    """
    if sog <= ANCHORED_SOG_MAX:
        return 'ANCHORED'
    if sog >= TRANSIT_SOG_MIN:
        return 'TRANSIT'
    sog_min, sog_max = GEAR_SOG_THRESHOLDS.get(gear_type, _DEFAULT_SOG_RANGE)
    if sog_min <= sog <= sog_max:
        return 'FISHING'
    return 'UNKNOWN'


def compute_ucaf_score(df_vessel: pd.DataFrame, gear_type: str = 'PT') -> float:
    """UCAF score: share of samples inside the gear's fishing speed range.

    Args:
        df_vessel: Track samples; must contain a 'sog' column.
        gear_type: Key into ``GEAR_SOG_THRESHOLDS``; unknown keys fall back
            to the range (1.0, 5.0).

    Returns:
        Ratio in [0, 1] rounded to 4 decimals; 0.0 for an empty frame.
    """
    if len(df_vessel) == 0:
        return 0.0
    sog_min, sog_max = GEAR_SOG_THRESHOLDS.get(gear_type, _DEFAULT_SOG_RANGE)
    in_range = df_vessel['sog'].between(sog_min, sog_max).sum()
    # Cast so callers get a plain Python float rather than a numpy scalar.
    return float(round(in_range / len(df_vessel), 4))


def compute_ucft_score(df_vessel: pd.DataFrame) -> float:
    """UCFT score: binary fishing-vs-transit confidence.

    Samples are split into a fishing band ``[ANCHORED_SOG_MAX,
    TRANSIT_SOG_MIN)`` and a transit band ``>= TRANSIT_SOG_MIN``; slower
    samples are ignored.

    Args:
        df_vessel: Track samples; must contain a 'sog' column.

    Returns:
        fishing / (fishing + transit) rounded to 4 decimals, or 0.0 when the
        frame is empty or no sample falls in either band.
    """
    if len(df_vessel) == 0:
        return 0.0
    sog = df_vessel['sog']
    # The fishing band is half-open at the top so that a sample at exactly
    # TRANSIT_SOG_MIN is counted once (as transit) instead of in both bands.
    fishing = ((sog >= ANCHORED_SOG_MAX) & (sog < TRANSIT_SOG_MIN)).sum()
    transit = (sog >= TRANSIT_SOG_MIN).sum()
    total = fishing + transit
    if total == 0:
        return 0.0
    return float(round(fishing / total, 4))


def _build_segment(records: list[dict], start_idx: int, end_idx: int,
                   end_ts, window_min: int) -> dict | None:
    """Build one fishing-segment dict, or None if it does not qualify.

    A segment qualifies when both timestamps are truthy and the elapsed time
    from the start record to ``end_ts`` is at least ``window_min`` minutes.
    The zone is looked up from the start record's 'lat'/'lon' (default 0).
    """
    start_rec = records[start_idx]
    start_ts = start_rec.get('timestamp')
    if not (start_ts and end_ts):
        return None
    dur_sec = (pd.Timestamp(end_ts) - pd.Timestamp(start_ts)).total_seconds()
    dur_min = dur_sec / 60
    if dur_min < window_min:
        return None
    # classify_zone is used as a mapping with a 'zone' key; only called for
    # segments that pass the duration filter.
    zone_info = classify_zone(start_rec.get('lat', 0), start_rec.get('lon', 0))
    return {
        'start_idx': start_idx,
        'end_idx': end_idx,
        'duration_min': round(dur_min, 1),
        'zone': zone_info.get('zone', 'UNKNOWN'),
        'in_territorial_sea': zone_info.get('zone') == 'TERRITORIAL_SEA',
    }


def detect_fishing_segments(df_vessel: pd.DataFrame, window_min: int = 15,
                            gear_type: str = 'PT') -> list[dict]:
    """Extract contiguous fishing segments from a vessel track.

    Rows are classified one by one with ``classify_vessel_state``; each
    maximal run of 'FISHING' rows becomes a candidate segment and is kept
    when it lasts at least ``window_min`` minutes.

    Args:
        df_vessel: Track samples. Rows are read via 'sog' (default 0),
            'timestamp', 'lat' and 'lon' (default 0). Assumed to be in
            chronological order - the function does not sort.
        window_min: Minimum segment duration in minutes.
        gear_type: Gear key forwarded to ``classify_vessel_state``.

    Returns:
        List of dicts with keys 'start_idx', 'end_idx', 'duration_min',
        'zone' and 'in_territorial_sea'. Indices are positional (0-based row
        numbers). Empty when the frame has fewer than 2 rows.
    """
    if len(df_vessel) < 2:
        return []

    records = df_vessel.to_dict('records')
    segments: list[dict] = []
    seg_start_idx: int | None = None  # None while not inside a fishing run

    for i, rec in enumerate(records):
        state = classify_vessel_state(rec.get('sog', 0), gear_type=gear_type)
        if state == 'FISHING':
            if seg_start_idx is None:
                seg_start_idx = i
            continue
        if seg_start_idx is None:
            continue
        # NOTE(review): the duration of a mid-track segment is measured up to
        # this first non-fishing sample, while 'end_idx' points at the last
        # fishing sample (i - 1). Kept as-is; confirm this is intended.
        segment = _build_segment(
            records, seg_start_idx, i - 1, rec.get('timestamp'), window_min,
        )
        if segment is not None:
            segments.append(segment)
        seg_start_idx = None

    # Still fishing at the end of the track: close the final segment on the
    # last record.
    if seg_start_idx is not None:
        segment = _build_segment(
            records, seg_start_idx, len(records) - 1,
            records[-1].get('timestamp'), window_min,
        )
        if segment is not None:
            segments.append(segment)

    return segments


def detect_trawl_uturn(df_vessel: pd.DataFrame,
                       uturn_threshold_deg: float = 150.0,
                       min_uturn_count: int = 3) -> dict:
    """Detect a back-and-forth U-turn pattern (characteristic of trawling).

    A U-turn is counted for each pair of consecutive samples whose course
    change, wrapped to [0, 180] degrees, is at least ``uturn_threshold_deg``
    while the later sample's speed is below ``TRANSIT_SOG_MIN``.

    Args:
        df_vessel: Track samples; must contain 'cog' and 'sog' columns when
            it has 2 or more rows.
        uturn_threshold_deg: Minimum absolute course change to count.
        min_uturn_count: Number of U-turns at which trawling is suspected.

    Returns:
        Dict with 'uturn_count' (int) and 'trawl_suspected' (bool).
    """
    if len(df_vessel) < 2:
        return {'uturn_count': 0, 'trawl_suspected': False}

    cog_vals = df_vessel['cog'].values
    sog_vals = df_vessel['sog'].values

    uturn_count = 0
    for prev_cog, cur_cog, cur_sog in zip(cog_vals[:-1], cog_vals[1:], sog_vals[1:]):
        # Wrap the signed difference into [-180, 180) before taking abs so
        # that e.g. 350 -> 10 degrees counts as a 20 degree change.
        delta = abs((cur_cog - prev_cog + 180) % 360 - 180)
        if delta >= uturn_threshold_deg and cur_sog < TRANSIT_SOG_MIN:
            uturn_count += 1

    return {
        'uturn_count': uturn_count,
        'trawl_suspected': bool(uturn_count >= min_uturn_count),
    }