""" 어구 위반 G코드 분류 프레임워크 (DAR-03) G-01: 허가수역 외 조업 (zone-gear mismatch) G-02: 금어기 조업 (fishing outside permit period) G-03: 미등록/허가외 어구 (detected gear ≠ registered fishery_code) G-04: MMSI 조작 의심 (gear signal on/off cycling) G-05: 어구 인위적 이동 (fixed gear drift > threshold) G-06: 쌍끌이 공조 조업 (pair trawl — from pair_trawl.py) """ import math import logging from datetime import datetime from typing import Optional import pandas as pd logger = logging.getLogger(__name__) # G-code score weights G01_SCORE = 15 # 비허가 수역 조업 G02_SCORE = 18 # 금어기 조업 G03_SCORE = 12 # 미등록/허가외 어구 G04_SCORE = 10 # MMSI 조작 의심 G05_SCORE = 5 # 고정어구 인위적 이동 G06_SCORE = 20 # 쌍끌이 공조 탐지 # G-03: 허가 업종코드 → 허용 어구 유형 매핑 (fishery_permit_cn.fishery_code 기준) # PT = 2척식저인망(쌍끌이 본선), PT-S = 부속선 → trawl/pair_trawl # GN = 자망 → gillnet # PS = 선망(둘러치기) → purse_seine # OT = 외끌이저인망 → trawl # FC = 운반선 → 조업 금지 FISHERY_CODE_ALLOWED_GEAR: dict[str, set[str]] = { 'PT': {'PT', 'TRAWL', 'PT-S'}, 'PT-S': {'PT', 'TRAWL', 'PT-S'}, 'GN': {'GN', 'GNS', 'GND', 'GILLNET'}, 'PS': {'PS', 'PURSE'}, 'OT': {'OT', 'TRAWL'}, 'FC': set(), } # G-04 thresholds SIGNAL_CYCLING_GAP_MIN = 30 # minutes SIGNAL_CYCLING_MIN_COUNT = 2 # G-05 thresholds GEAR_DRIFT_THRESHOLD_NM = 0.270 # ≈ 500m (DAR-03 스펙, 조류 보정 전) # Fixed gear types (stow net, gillnet, trap) FIXED_GEAR_TYPES = {'GN', 'TRAP', 'FYK', 'FPO', 'GNS', 'GND'} def _haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """두 좌표 간 거리 (해리) — Haversine 공식.""" R = 3440.065 dlat = math.radians(lat2 - lat1) dlon = math.radians(lon2 - lon1) a = ( math.sin(dlat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2 ) return 2 * R * math.asin(min(1.0, math.sqrt(a))) def _detect_signal_cycling( gear_episodes: list[dict], threshold_min: int = SIGNAL_CYCLING_GAP_MIN, ) -> tuple[bool, int]: """gear_identity_log 에피소드 간 간격 분석. 연속 에피소드 사이의 오프라인 gap이 threshold_min 이하인 횟수를 계산한다. 인위적으로 신호를 껐다 켜는 MMSI 조작 패턴을 탐지하기 위해 사용한다. Args: gear_episodes: [{'first_seen_at': datetime, 'last_seen_at': datetime}, ...] first_seen_at 오름차순 정렬된 에피소드 목록. threshold_min: 조작 의심 gap 상한 (분). 기본값 SIGNAL_CYCLING_GAP_MIN. Returns: (is_cycling, cycling_count) 튜플. cycling_count >= SIGNAL_CYCLING_MIN_COUNT 이면 is_cycling = True. """ if len(gear_episodes) < 2: return False, 0 try: sorted_episodes = sorted(gear_episodes, key=lambda e: e['first_seen_at']) except (KeyError, TypeError) as exc: logger.warning('gear_episodes 정렬 실패: %s', exc) return False, 0 cycling_count = 0 for i in range(1, len(sorted_episodes)): prev_end = sorted_episodes[i - 1].get('last_seen_at') curr_start = sorted_episodes[i].get('first_seen_at') if prev_end is None or curr_start is None: continue try: gap_sec = (curr_start - prev_end).total_seconds() except AttributeError: # datetime 객체가 아닌 경우 무시 continue gap_min = gap_sec / 60.0 if 0 < gap_min <= threshold_min: cycling_count += 1 return cycling_count >= SIGNAL_CYCLING_MIN_COUNT, cycling_count def _detect_gear_drift( positions: list[tuple[float, float]], threshold_nm: float = GEAR_DRIFT_THRESHOLD_NM, ) -> dict: """연속 위치 간 haversine 거리 합산으로 고정어구 이동 탐지. 조류에 의한 자연 이동은 보정하지 않으며, 보수적 임계값(0.405NM ≈ 750m)으로 인위적 이동만 탐지한다. Args: positions: [(lat, lon), ...] 순서 보장된 위치 목록. threshold_nm: 이동 탐지 임계값 (해리). 기본값 GEAR_DRIFT_THRESHOLD_NM. Returns: { 'drift_detected': bool, 'drift_nm': float, # 총 누적 이동 거리 (해리) 'tidal_corrected': bool, # 조류 보정 여부 (현재 항상 False) } """ if len(positions) < 2: return {'drift_detected': False, 'drift_nm': 0.0, 'tidal_corrected': False} total_drift_nm = 0.0 for i in range(1, len(positions)): lat1, lon1 = positions[i - 1] lat2, lon2 = positions[i] try: total_drift_nm += _haversine_nm(lat1, lon1, lat2, lon2) except (TypeError, ValueError) as exc: logger.warning('gear drift 거리 계산 실패 (index %d): %s', i, exc) continue drift_nm = round(total_drift_nm, 4) return { 'drift_detected': drift_nm > threshold_nm, 'drift_nm': drift_nm, 'tidal_corrected': False, } def _is_in_closed_season( ts: Optional[datetime], permit_periods: Optional[list[tuple[datetime, datetime]]], ) -> bool: """허가 조업 기간 밖이면 금어기 조업 (G-02) 으로 판정. permit_periods 가 비어 있으면 데이터 부재로 판정 불가 → False. ts 가 None 이면 판정 불가 → False. """ if not permit_periods or ts is None: return False try: ts_naive = ts.replace(tzinfo=None) if ts.tzinfo is not None else ts except AttributeError: return False return not any(start <= ts_naive <= end for start, end in permit_periods) def _is_unregistered_gear( detected_gear: Optional[str], registered_fishery_code: Optional[str], ) -> bool: """감지된 어구가 허가 업종코드의 허용 어구에 포함되지 않으면 G-03. 둘 중 하나라도 없으면 판정 불가 → False (데이터 부족). """ if not detected_gear or not registered_fishery_code: return False detected_norm = detected_gear.upper().strip() allowed = FISHERY_CODE_ALLOWED_GEAR.get(registered_fishery_code.upper().strip()) if allowed is None: return False # 미지의 업종코드 — 판정 보류 return detected_norm not in allowed def classify_gear_violations( mmsi: str, gear_type: str, zone_info: dict, df_vessel: Optional[pd.DataFrame], pair_result: Optional[dict], is_permitted: bool, gear_episodes: Optional[list[dict]] = None, gear_positions: Optional[list[tuple[float, float]]] = None, permit_periods: Optional[list[tuple[datetime, datetime]]] = None, registered_fishery_code: Optional[str] = None, observation_ts: Optional[datetime] = None, ) -> dict: """어구 위반 G코드 분류 메인 함수 (DAR-03). DAR-03 규격에 따라 G-01/G-04/G-05/G-06 위반 코드를 평가하고 종합 점수 및 판정 결과를 반환한다. Args: mmsi: 선박 MMSI 식별자. gear_type: 어구 유형 코드 (예: 'GN', 'PS', 'PT'). zone_info: 수역 정보 dict. classify_zone() 반환 형식과 동일. - 'zone': str (예: 'ZONE_I', 'TERRITORIAL_SEA') - 'allowed_gears': list[str] (허가 어구 목록, ZONE_* 에만 존재) df_vessel: 선박 AIS 이력 DataFrame (현재 내부 사용 없음, 향후 확장 대비). pair_result: pair_trawl 알고리즘 결과 dict. - 'pair_detected': bool - 'sync_duration_min': float - 'mean_separation_nm': float - 'pair_mmsi': str - 'g_codes': list[str] (선택) is_permitted: 선박의 해역 입어 허가 여부 (현재 참조 전용, G-01과 독립 판정). gear_episodes: 어구 신호 에피소드 목록. G-04 평가에 사용. [{'first_seen_at': datetime, 'last_seen_at': datetime}, ...] gear_positions: 어구 위치 목록 [(lat, lon), ...]. G-05 평가에 사용. Returns: { 'g_codes': list[str], # 탐지된 G코드 목록 (예: ['G-01', 'G-06']) 'gear_judgment': str, # 최고 우선순위 판정 레이블 'evidence': dict, # G코드별 근거 데이터 'gear_violation_score': int, # 위반 점수 합계 } 판정 우선순위: ZONE_VIOLATION > PAIR_TRAWL > GEAR_MISMATCH > '' (정상) """ g_codes: list[str] = [] evidence: dict = {} score = 0 judgment = '' # ── G-01: 허가수역 외 조업 ───────────────────────────────────── zone = zone_info.get('zone', '') if zone.startswith('ZONE_'): allowed_gears: list[str] = zone_info.get('allowed_gears', []) if allowed_gears and gear_type not in allowed_gears: g_codes.append('G-01') score += G01_SCORE evidence['G-01'] = { 'zone': zone, 'gear': gear_type, 'allowed': allowed_gears, } judgment = 'ZONE_VIOLATION' logger.debug( 'G-01 탐지 [mmsi=%s] zone=%s gear=%s allowed=%s', mmsi, zone, gear_type, allowed_gears, ) # ── G-02: 금어기 조업 ──────────────────────────────────────── if permit_periods: try: in_closed = _is_in_closed_season(observation_ts, permit_periods) except Exception as exc: logger.error('G-02 평가 실패 [mmsi=%s]: %s', mmsi, exc) in_closed = False if in_closed: g_codes.append('G-02') score += G02_SCORE evidence['G-02'] = { 'observed_at': observation_ts.isoformat() if observation_ts else None, 'permit_periods': [ [s.isoformat(), e.isoformat()] for s, e in permit_periods ], } if not judgment: judgment = 'CLOSED_SEASON_FISHING' logger.debug('G-02 탐지 [mmsi=%s] ts=%s', mmsi, observation_ts) # ── G-03: 미등록/허가외 어구 ────────────────────────────────── if registered_fishery_code: try: unregistered = _is_unregistered_gear(gear_type, registered_fishery_code) except Exception as exc: logger.error('G-03 평가 실패 [mmsi=%s]: %s', mmsi, exc) unregistered = False if unregistered: g_codes.append('G-03') score += G03_SCORE evidence['G-03'] = { 'detected_gear': gear_type, 'registered_fishery_code': registered_fishery_code, 'allowed_gears': sorted( FISHERY_CODE_ALLOWED_GEAR.get( registered_fishery_code.upper().strip(), set() ) ), } if not judgment: judgment = 'UNREGISTERED_GEAR' logger.debug( 'G-03 탐지 [mmsi=%s] detected=%s registered=%s', mmsi, gear_type, registered_fishery_code, ) # ── G-04: MMSI 조작 의심 (고정어구 신호 on/off 반복) ─────────── if gear_episodes is not None and gear_type in FIXED_GEAR_TYPES: try: is_cycling, cycling_count = _detect_signal_cycling(gear_episodes) except Exception as exc: logger.error('G-04 평가 실패 [mmsi=%s]: %s', mmsi, exc) is_cycling, cycling_count = False, 0 if is_cycling: g_codes.append('G-04') score += G04_SCORE evidence['G-04'] = { 'cycling_count': cycling_count, 'threshold_min': SIGNAL_CYCLING_GAP_MIN, } if not judgment: judgment = 'GEAR_MISMATCH' logger.debug( 'G-04 탐지 [mmsi=%s] cycling_count=%d', mmsi, cycling_count, ) # ── G-05: 고정어구 인위적 이동 ──────────────────────────────── if gear_positions is not None and gear_type in FIXED_GEAR_TYPES: try: drift_result = _detect_gear_drift(gear_positions) except Exception as exc: logger.error('G-05 평가 실패 [mmsi=%s]: %s', mmsi, exc) drift_result = {'drift_detected': False, 'drift_nm': 0.0, 'tidal_corrected': False} if drift_result['drift_detected']: g_codes.append('G-05') score += G05_SCORE evidence['G-05'] = drift_result if not judgment: judgment = 'GEAR_MISMATCH' logger.debug( 'G-05 탐지 [mmsi=%s] drift_nm=%.4f', mmsi, drift_result['drift_nm'], ) # ── G-06: 쌍끌이 공조 조업 ──────────────────────────────────── if pair_result and pair_result.get('pair_detected'): g_codes.append('G-06') score += G06_SCORE evidence['G-06'] = { 'sync_duration_min': pair_result.get('sync_duration_min'), 'mean_separation_nm': pair_result.get('mean_separation_nm'), 'pair_mmsi': pair_result.get('pair_mmsi'), } # pair_result 내 추가 g_codes (예: 'P-01') 병합 extra_codes: list[str] = pair_result.get('g_codes', []) for code in extra_codes: if code not in g_codes: g_codes.append(code) if not judgment: judgment = 'PAIR_TRAWL' logger.debug( 'G-06 탐지 [mmsi=%s] pair_mmsi=%s sync_min=%s', mmsi, pair_result.get('pair_mmsi'), pair_result.get('sync_duration_min'), ) return { 'g_codes': g_codes, 'gear_judgment': judgment, 'evidence': evidence, 'gear_violation_score': score, }