kcg-ai-monitoring/prediction/algorithms/gear_violation.py
htlee 2ee8a0e7ff feat(detection): DAR-03 어구 탐지 워크플로우 + 모선 검토 UI + 24h 리플레이 통합
- prediction: G-01/G-04/G-05/G-06 위반 분류 + 쌍끌이 공조 탐지 추가
- backend: 모선 확정/제외 API + signal-batch 항적 프록시 + ParentResolution 점수 근거 필드 확장
- frontend: 어구 탐지 그리드 다중필터/지도 flyTo, 후보 검토 패널(점수 근거+확정/제외), 24h convex hull 리플레이 + TripsLayer 애니메이션
- gitignore: 루트 .venv/ 추가
2026-04-15 13:26:15 +09:00

266 lines
10 KiB
Python

"""
어구 위반 G코드 분류 프레임워크 (DAR-03)
G-01: 허가수역 외 조업 (zone-gear mismatch)
G-04: MMSI 조작 의심 (gear signal on/off cycling)
G-05: 어구 인위적 이동 (fixed gear drift > threshold)
G-06: 쌍끌이 공조 조업 (pair trawl — from pair_trawl.py)
G-02 (금어기), G-03 (미등록 어구)은 외부 데이터 필요하여 보류.
"""
import math
import logging
from typing import Optional
import pandas as pd
logger = logging.getLogger(__name__)
# G-code score weights
G01_SCORE = 15 # 비허가 수역 조업
G04_SCORE = 10 # MMSI 조작 의심
G05_SCORE = 5 # 고정어구 인위적 이동
G06_SCORE = 20 # 쌍끌이 공조 탐지
# G-04 thresholds
SIGNAL_CYCLING_GAP_MIN = 30 # minutes
SIGNAL_CYCLING_MIN_COUNT = 2
# G-05 thresholds
GEAR_DRIFT_THRESHOLD_NM = 0.405 # ≈ 750m (보수적, 조류보정 없음)
# Fixed gear types (stow net, gillnet, trap)
FIXED_GEAR_TYPES = {'GN', 'TRAP', 'FYK', 'FPO', 'GNS', 'GND'}
def _haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""두 좌표 간 거리 (해리) — Haversine 공식."""
R = 3440.065
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (
math.sin(dlat / 2) ** 2
+ math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2
)
return 2 * R * math.asin(min(1.0, math.sqrt(a)))
def _detect_signal_cycling(
gear_episodes: list[dict],
threshold_min: int = SIGNAL_CYCLING_GAP_MIN,
) -> tuple[bool, int]:
"""gear_identity_log 에피소드 간 간격 분석.
연속 에피소드 사이의 오프라인 gap이 threshold_min 이하인 횟수를 계산한다.
인위적으로 신호를 껐다 켜는 MMSI 조작 패턴을 탐지하기 위해 사용한다.
Args:
gear_episodes: [{'first_seen_at': datetime, 'last_seen_at': datetime}, ...]
first_seen_at 오름차순 정렬된 에피소드 목록.
threshold_min: 조작 의심 gap 상한 (분). 기본값 SIGNAL_CYCLING_GAP_MIN.
Returns:
(is_cycling, cycling_count) 튜플.
cycling_count >= SIGNAL_CYCLING_MIN_COUNT 이면 is_cycling = True.
"""
if len(gear_episodes) < 2:
return False, 0
try:
sorted_episodes = sorted(gear_episodes, key=lambda e: e['first_seen_at'])
except (KeyError, TypeError) as exc:
logger.warning('gear_episodes 정렬 실패: %s', exc)
return False, 0
cycling_count = 0
for i in range(1, len(sorted_episodes)):
prev_end = sorted_episodes[i - 1].get('last_seen_at')
curr_start = sorted_episodes[i].get('first_seen_at')
if prev_end is None or curr_start is None:
continue
try:
gap_sec = (curr_start - prev_end).total_seconds()
except AttributeError:
# datetime 객체가 아닌 경우 무시
continue
gap_min = gap_sec / 60.0
if 0 < gap_min <= threshold_min:
cycling_count += 1
return cycling_count >= SIGNAL_CYCLING_MIN_COUNT, cycling_count
def _detect_gear_drift(
positions: list[tuple[float, float]],
threshold_nm: float = GEAR_DRIFT_THRESHOLD_NM,
) -> dict:
"""연속 위치 간 haversine 거리 합산으로 고정어구 이동 탐지.
조류에 의한 자연 이동은 보정하지 않으며, 보수적 임계값(0.405NM ≈ 750m)으로
인위적 이동만 탐지한다.
Args:
positions: [(lat, lon), ...] 순서 보장된 위치 목록.
threshold_nm: 이동 탐지 임계값 (해리). 기본값 GEAR_DRIFT_THRESHOLD_NM.
Returns:
{
'drift_detected': bool,
'drift_nm': float, # 총 누적 이동 거리 (해리)
'tidal_corrected': bool, # 조류 보정 여부 (현재 항상 False)
}
"""
if len(positions) < 2:
return {'drift_detected': False, 'drift_nm': 0.0, 'tidal_corrected': False}
total_drift_nm = 0.0
for i in range(1, len(positions)):
lat1, lon1 = positions[i - 1]
lat2, lon2 = positions[i]
try:
total_drift_nm += _haversine_nm(lat1, lon1, lat2, lon2)
except (TypeError, ValueError) as exc:
logger.warning('gear drift 거리 계산 실패 (index %d): %s', i, exc)
continue
drift_nm = round(total_drift_nm, 4)
return {
'drift_detected': drift_nm > threshold_nm,
'drift_nm': drift_nm,
'tidal_corrected': False,
}
def classify_gear_violations(
mmsi: str,
gear_type: str,
zone_info: dict,
df_vessel: Optional[pd.DataFrame],
pair_result: Optional[dict],
is_permitted: bool,
gear_episodes: Optional[list[dict]] = None,
gear_positions: Optional[list[tuple[float, float]]] = None,
) -> dict:
"""어구 위반 G코드 분류 메인 함수 (DAR-03).
DAR-03 규격에 따라 G-01/G-04/G-05/G-06 위반 코드를 평가하고
종합 점수 및 판정 결과를 반환한다.
Args:
mmsi: 선박 MMSI 식별자.
gear_type: 어구 유형 코드 (예: 'GN', 'PS', 'PT').
zone_info: 수역 정보 dict. classify_zone() 반환 형식과 동일.
- 'zone': str (예: 'ZONE_I', 'TERRITORIAL_SEA')
- 'allowed_gears': list[str] (허가 어구 목록, ZONE_* 에만 존재)
df_vessel: 선박 AIS 이력 DataFrame (현재 내부 사용 없음, 향후 확장 대비).
pair_result: pair_trawl 알고리즘 결과 dict.
- 'pair_detected': bool
- 'sync_duration_min': float
- 'mean_separation_nm': float
- 'pair_mmsi': str
- 'g_codes': list[str] (선택)
is_permitted: 선박의 해역 입어 허가 여부 (현재 참조 전용, G-01과 독립 판정).
gear_episodes: 어구 신호 에피소드 목록. G-04 평가에 사용.
[{'first_seen_at': datetime, 'last_seen_at': datetime}, ...]
gear_positions: 어구 위치 목록 [(lat, lon), ...]. G-05 평가에 사용.
Returns:
{
'g_codes': list[str], # 탐지된 G코드 목록 (예: ['G-01', 'G-06'])
'gear_judgment': str, # 최고 우선순위 판정 레이블
'evidence': dict, # G코드별 근거 데이터
'gear_violation_score': int, # 위반 점수 합계
}
판정 우선순위: ZONE_VIOLATION > PAIR_TRAWL > GEAR_MISMATCH > '' (정상)
"""
g_codes: list[str] = []
evidence: dict = {}
score = 0
judgment = ''
# ── G-01: 허가수역 외 조업 ─────────────────────────────────────
zone = zone_info.get('zone', '')
if zone.startswith('ZONE_'):
allowed_gears: list[str] = zone_info.get('allowed_gears', [])
if allowed_gears and gear_type not in allowed_gears:
g_codes.append('G-01')
score += G01_SCORE
evidence['G-01'] = {
'zone': zone,
'gear': gear_type,
'allowed': allowed_gears,
}
judgment = 'ZONE_VIOLATION'
logger.debug(
'G-01 탐지 [mmsi=%s] zone=%s gear=%s allowed=%s',
mmsi, zone, gear_type, allowed_gears,
)
# ── G-04: MMSI 조작 의심 (고정어구 신호 on/off 반복) ───────────
if gear_episodes is not None and gear_type in FIXED_GEAR_TYPES:
try:
is_cycling, cycling_count = _detect_signal_cycling(gear_episodes)
except Exception as exc:
logger.error('G-04 평가 실패 [mmsi=%s]: %s', mmsi, exc)
is_cycling, cycling_count = False, 0
if is_cycling:
g_codes.append('G-04')
score += G04_SCORE
evidence['G-04'] = {
'cycling_count': cycling_count,
'threshold_min': SIGNAL_CYCLING_GAP_MIN,
}
if not judgment:
judgment = 'GEAR_MISMATCH'
logger.debug(
'G-04 탐지 [mmsi=%s] cycling_count=%d', mmsi, cycling_count,
)
# ── G-05: 고정어구 인위적 이동 ────────────────────────────────
if gear_positions is not None and gear_type in FIXED_GEAR_TYPES:
try:
drift_result = _detect_gear_drift(gear_positions)
except Exception as exc:
logger.error('G-05 평가 실패 [mmsi=%s]: %s', mmsi, exc)
drift_result = {'drift_detected': False, 'drift_nm': 0.0, 'tidal_corrected': False}
if drift_result['drift_detected']:
g_codes.append('G-05')
score += G05_SCORE
evidence['G-05'] = drift_result
if not judgment:
judgment = 'GEAR_MISMATCH'
logger.debug(
'G-05 탐지 [mmsi=%s] drift_nm=%.4f', mmsi, drift_result['drift_nm'],
)
# ── G-06: 쌍끌이 공조 조업 ────────────────────────────────────
if pair_result and pair_result.get('pair_detected'):
g_codes.append('G-06')
score += G06_SCORE
evidence['G-06'] = {
'sync_duration_min': pair_result.get('sync_duration_min'),
'mean_separation_nm': pair_result.get('mean_separation_nm'),
'pair_mmsi': pair_result.get('pair_mmsi'),
}
# pair_result 내 추가 g_codes (예: 'P-01') 병합
extra_codes: list[str] = pair_result.get('g_codes', [])
for code in extra_codes:
if code not in g_codes:
g_codes.append(code)
if not judgment:
judgment = 'PAIR_TRAWL'
logger.debug(
'G-06 탐지 [mmsi=%s] pair_mmsi=%s sync_min=%s',
mmsi, pair_result.get('pair_mmsi'), pair_result.get('sync_duration_min'),
)
return {
'g_codes': g_codes,
'gear_judgment': judgment,
'evidence': evidence,
'gear_violation_score': score,
}