kcg-ai-monitoring/prediction/algorithms/pair_trawl.py
htlee 8ff04a8cca feat(prediction): DAR-03 탐지 로직 보강 + 한중어업협정 906척 레지스트리 적재
- V029: kcg.fishery_permit_cn 신규 테이블(연단위, permit_year+permit_no 복합 유니크) + fleet_vessels permit_year/fishery_code 컬럼
- load_fishery_permit_cn.py: xls → DB 적재 스크립트, 906척 + 497 신청인사 upsert
- G-04/G-05/G-06 Dead code 해결: classify_gear_violations 호출 연결, dir() 버그 제거
- find_pair_candidates: bbox 1차 + 궤적 유사도(location/sog_corr/cog_alignment) 2차, role 가점
- spoofing 산식 교체: 1시간 윈도우 + teleport 절대 가점 + extreme 50kn 단독 0.6 확정
- transshipment 선종 완화: shipTy 부분일치 + 412* FISHING 간주
- gear_code DB write 경로 신설 + fleet_tracker API 3개 추가
- cron 스크립트: fishery_permit/pair_type/fleet_role 신규 섹션
2026-04-16 07:43:24 +09:00

591 lines
22 KiB
Python
Raw Blame 히스토리

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""쌍끌이 트롤 공조 탐지 — DAR-03 G-06.
두 선박의 AIS 궤적을 분석하여 쌍끌이 조업 여부를 판정한다.
판정 기준 (DAR-03 / Kroodsma 2018):
- 선박 간격 ≤ 500m (0.27 NM)
- 속력 차이 ≤ 0.5 kn
- 방향 차이 ≤ 10°
- 조업 속력 2.0~4.0 kn
- 지속 시간 ≥ 2시간
- 동시 AIS 차단 ≥ 30분 → P-01 추가
"""
from __future__ import annotations
import logging
import math
from typing import Optional
import pandas as pd
try:
    from algorithms.location import haversine_nm
except ImportError:
    def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:  # type: ignore[misc]
        """Great-circle distance between two coordinates, in nautical miles.

        Fallback implementation (haversine formula) used when the shared
        algorithms.location module is unavailable.
        """
        earth_radius_nm = 3440.065
        phi1 = math.radians(lat1)
        phi2 = math.radians(lat2)
        half_dlat = math.radians(lat2 - lat1) / 2.0
        half_dlon = math.radians(lon2 - lon1) / 2.0
        h = math.sin(half_dlat) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(half_dlon) ** 2
        return 2 * earth_radius_nm * math.asin(math.sqrt(h))
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────
# Constants — pair-trawl thresholds (DAR-03 / Kroodsma 2018)
# ──────────────────────────────────────────────────────────────
PROXIMITY_NM = 0.27 # max vessel separation: 500 m ≈ 0.27 NM
SOG_DELTA_MAX = 0.5 # max speed-over-ground difference (kn)
COG_DELTA_MAX = 10.0 # max course-over-ground difference (degrees)
SOG_MIN = 2.0 # trawling-speed band lower bound (kn)
SOG_MAX = 4.0 # trawling-speed band upper bound (kn)
MIN_SYNC_CYCLES = 24 # 24 cycles × 5 min = 2 hours of continuous sync
SIMULTANEOUS_GAP_MIN = 30 # simultaneous AIS blackout threshold (minutes) → adds P-01
CYCLE_INTERVAL_MIN = 5 # input data is resampled to 5-minute cycles
# scan_unregistered_pairs only
CELL_SIZE = 0.01 # grid cell size in degrees (~1.1 km)
CANDIDATE_PROXIMITY_FACTOR = 2.0 # candidate search radius: PROXIMITY_NM × 2
CANDIDATE_SOG_MIN = 1.5 # relaxed candidate speed lower bound (kn)
CANDIDATE_SOG_MAX = 5.0 # relaxed candidate speed upper bound (kn)
# ──────────────────────────────────────────────────────────────
# 내부 헬퍼
# ──────────────────────────────────────────────────────────────
def _cog_delta(cog_a: float, cog_b: float) -> float:
"""두 COG 값의 각도 차이 (0~180°)."""
return abs((cog_a - cog_b + 180.0) % 360.0 - 180.0)
def _find_gaps(df: pd.DataFrame, gap_threshold_min: float = 10.0) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
"""DataFrame 행 간 gap_threshold_min 초과 gap 구간 목록 반환.
Returns:
list of (gap_start, gap_end) as pd.Timestamp pairs
"""
if len(df) < 2:
return []
ts_series = pd.to_datetime(df['timestamp']).sort_values().reset_index(drop=True)
gaps: list[tuple[pd.Timestamp, pd.Timestamp]] = []
for i in range(1, len(ts_series)):
delta_min = (ts_series.iloc[i] - ts_series.iloc[i - 1]).total_seconds() / 60.0
if delta_min > gap_threshold_min:
gaps.append((ts_series.iloc[i - 1], ts_series.iloc[i]))
return gaps
def _overlap_minutes(
gaps_a: list[tuple[pd.Timestamp, pd.Timestamp]],
gaps_b: list[tuple[pd.Timestamp, pd.Timestamp]],
) -> float:
"""두 gap 목록의 시간 구간 겹침 합계 (분)."""
total = 0.0
for start_a, end_a in gaps_a:
for start_b, end_b in gaps_b:
overlap_start = max(start_a, start_b)
overlap_end = min(end_a, end_b)
if overlap_end > overlap_start:
total += (overlap_end - overlap_start).total_seconds() / 60.0
return total
def _max_sync_block(synced_series: 'pd.Series[bool]') -> int:
"""연속 True 블록의 최대 길이 반환."""
max_block = 0
current = 0
for val in synced_series:
if val:
current += 1
max_block = max(max_block, current)
else:
current = 0
return max_block
def _cell_key(lat: float, lon: float) -> tuple[int, int]:
    """Grid-cell index for (lat, lon) at CELL_SIZE-degree resolution (rounded)."""
    row_idx = round(lat / CELL_SIZE)
    col_idx = round(lon / CELL_SIZE)
    return row_idx, col_idx
def _default_result(mmsi_b: str) -> dict:
return {
'pair_detected': False,
'sync_duration_min': 0.0,
'max_sync_block_min': 0.0,
'mean_separation_nm': 0.0,
'sog_delta_mean': 0.0,
'cog_delta_mean': 0.0,
'simultaneous_gap_min': 0.0,
'g_codes': [],
'confidence': 0.0,
'pair_mmsi': mmsi_b,
}
# ──────────────────────────────────────────────────────────────
# 공개 API
# ──────────────────────────────────────────────────────────────
def detect_pair_trawl(
    df_a: pd.DataFrame,
    df_b: pd.DataFrame,
    mmsi_a: str,
    mmsi_b: str,
    role_a: str = 'UNKNOWN',
    role_b: str = 'UNKNOWN',
    similarity: float = 0.0,
) -> dict:
    """Pair-trawl cooperation detection (DAR-03 G-06).

    Flags two vessels as a working pair when their time-aligned AIS samples
    stay within 500 m of each other, differ by at most 0.5 kn / 10°, both
    sail inside the 2.0-4.0 kn trawl band, and that state persists for one
    unbroken block of at least 2 hours (24 × 5-minute cycles).

    Args:
        df_a: AIS DataFrame for vessel A. Required columns: timestamp, lat, lon, sog, cog
        df_b: AIS DataFrame for vessel B. Required columns: timestamp, lat, lon, sog, cog
        mmsi_a: vessel A MMSI
        mmsi_b: vessel B MMSI (recorded as pair_mmsi in the result)
        role_a/role_b: 'FISHING'|'CARRIER'|'PT_MAIN'|'PT_SUB'|'UNKNOWN'
        similarity: first-pass similarity (0-1) computed by find_pair_candidates

    Returns:
        Detection metrics plus role_a/role_b/similarity/bonus/pair_type.
        Negative results carry the same key set as positive ones.
    """
    def _negative() -> dict:
        # Fix: the early-return paths previously returned _default_result()
        # alone, which lacks the role_a/role_b/similarity/bonus/pair_type
        # keys the docstring promises — callers saw two different result
        # schemas. Fill them with neutral values so the schema is uniform.
        res = _default_result(mmsi_b)
        res['role_a'] = role_a
        res['role_b'] = role_b
        res['similarity'] = round(similarity, 4)
        res['bonus'] = 0
        res['pair_type'] = 'GENERIC'
        return res

    required_cols = {'timestamp', 'lat', 'lon', 'sog', 'cog'}
    if df_a.empty or df_b.empty:
        logger.debug('pair_trawl(%s, %s): empty DataFrame', mmsi_a, mmsi_b)
        return _negative()
    missing_a = required_cols - set(df_a.columns)
    missing_b = required_cols - set(df_b.columns)
    if missing_a or missing_b:
        logger.warning(
            'pair_trawl(%s, %s): missing columns a=%s b=%s',
            mmsi_a, mmsi_b, missing_a, missing_b,
        )
        return _negative()
    # ── Step 1: align both tracks on shared timestamps (inner join) ──
    a = df_a[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy()
    b = df_b[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy()
    a['timestamp'] = pd.to_datetime(a['timestamp'])
    b['timestamp'] = pd.to_datetime(b['timestamp'])
    merged = pd.merge(
        a.rename(columns={'lat': 'lat_a', 'lon': 'lon_a', 'sog': 'sog_a', 'cog': 'cog_a'}),
        b.rename(columns={'lat': 'lat_b', 'lon': 'lon_b', 'sog': 'sog_b', 'cog': 'cog_b'}),
        on='timestamp',
        how='inner',
    ).sort_values('timestamp').reset_index(drop=True)
    total_aligned = len(merged)
    if total_aligned < MIN_SYNC_CYCLES:
        logger.debug(
            'pair_trawl(%s, %s): only %d aligned rows (need %d)',
            mmsi_a, mmsi_b, total_aligned, MIN_SYNC_CYCLES,
        )
        return _negative()
    # ── Step 2: per-row synchronization metrics ──
    merged['distance_nm'] = merged.apply(
        lambda r: haversine_nm(r['lat_a'], r['lon_a'], r['lat_b'], r['lon_b']),
        axis=1,
    )
    merged['sog_delta'] = (merged['sog_a'] - merged['sog_b']).abs()
    merged['cog_delta'] = merged.apply(
        lambda r: _cog_delta(r['cog_a'], r['cog_b']),
        axis=1,
    )
    merged['both_in_range'] = (
        merged['sog_a'].between(SOG_MIN, SOG_MAX)
        & merged['sog_b'].between(SOG_MIN, SOG_MAX)
    )
    merged['synced'] = (
        (merged['distance_nm'] <= PROXIMITY_NM)
        & (merged['sog_delta'] <= SOG_DELTA_MAX)
        & (merged['cog_delta'] <= COG_DELTA_MAX)
        & merged['both_in_range']
    )
    # ── Step 3: longest run of consecutive synced cycles ──
    max_block_cycles = _max_sync_block(merged['synced'])
    if max_block_cycles < MIN_SYNC_CYCLES:
        logger.debug(
            'pair_trawl(%s, %s): max sync block %d cycles < %d required',
            mmsi_a, mmsi_b, max_block_cycles, MIN_SYNC_CYCLES,
        )
        return _negative()
    total_synced = int(merged['synced'].sum())
    sync_duration_min = total_synced * CYCLE_INTERVAL_MIN
    max_sync_block_min = max_block_cycles * CYCLE_INTERVAL_MIN
    mean_separation_nm = float(merged.loc[merged['synced'], 'distance_nm'].mean())
    sog_delta_mean = float(merged.loc[merged['synced'], 'sog_delta'].mean())
    cog_delta_mean = float(merged.loc[merged['synced'], 'cog_delta'].mean())
    # ── Step 4: simultaneous AIS blackout detection ──
    # Gaps >10 min on the raw (un-aligned) tracks; overlapping gaps on both
    # vessels at once suggest coordinated dark activity → P-01.
    gaps_a = _find_gaps(df_a, gap_threshold_min=10.0)
    gaps_b = _find_gaps(df_b, gap_threshold_min=10.0)
    simultaneous_gap_min = _overlap_minutes(gaps_a, gaps_b)
    g_codes: list[str] = []
    if simultaneous_gap_min >= SIMULTANEOUS_GAP_MIN:
        g_codes.append('P-01')
    # ── Step 5: confidence score ──
    sync_ratio = min(1.0, total_synced / total_aligned)
    synced_distances = merged.loc[merged['synced'], 'distance_nm']
    if len(synced_distances) > 1:
        std_distance = float(synced_distances.std())
    else:
        std_distance = 0.0
    # Stable separation and tight speed matching raise confidence.
    separation_stability = 1.0 - min(1.0, std_distance / PROXIMITY_NM)
    sog_sync_quality = 1.0 - min(1.0, sog_delta_mean / SOG_DELTA_MAX)
    confidence = round(
        sync_ratio * 0.4
        + separation_stability * 0.3
        + sog_sync_quality * 0.3,
        4,
    )
    # ── Role-match bonus (DAR-03 vessel-type consistency) ──
    bonus = 0
    pair_type = 'GENERIC'
    if role_a == 'PT_MAIN' and role_b == 'PT_SUB':
        bonus = 15
        pair_type = 'PT_REGISTERED'
    elif role_b == 'PT_MAIN' and role_a == 'PT_SUB':
        bonus = 15
        pair_type = 'PT_REGISTERED'
    elif role_a == 'FISHING' and role_b == 'FISHING':
        bonus = 10
        pair_type = 'COOP_FISHING'
    elif {role_a, role_b} == {'FISHING', 'CARRIER'}:
        bonus = 15
        pair_type = 'TRANSSHIP_LIKE'
    logger.info(
        'pair_trawl(%s, %s): detected — sync=%.0fmin max_block=%.0fmin '
        'sep=%.3fnm confidence=%.3f bonus=%d type=%s g_codes=%s',
        mmsi_a, mmsi_b,
        sync_duration_min, max_sync_block_min,
        mean_separation_nm, confidence, bonus, pair_type, g_codes,
    )
    return {
        'pair_detected': True,
        'sync_duration_min': round(sync_duration_min, 1),
        'max_sync_block_min': round(max_sync_block_min, 1),
        'mean_separation_nm': round(mean_separation_nm, 4),
        'sog_delta_mean': round(sog_delta_mean, 4),
        'cog_delta_mean': round(cog_delta_mean, 4),
        'simultaneous_gap_min': round(simultaneous_gap_min, 1),
        'g_codes': g_codes,
        'confidence': confidence,
        'pair_mmsi': mmsi_b,
        'role_a': role_a,
        'role_b': role_b,
        'similarity': round(similarity, 4),
        'bonus': bonus,
        'pair_type': pair_type,
    }
def _classify_role(
mmsi: str,
info: dict,
pt_registered: set[str],
pt_sub_registered: set[str],
) -> str:
"""페어 후보 role 판정. 반환: PT_MAIN/PT_SUB/FISHING/CARRIER/UNKNOWN."""
if mmsi in pt_sub_registered:
return 'PT_SUB'
if mmsi in pt_registered:
return 'PT_MAIN'
kind = info.get('ship_kind_code', '') if info else ''
ship_ty = (info.get('ship_ty') or info.get('vessel_type') or '') if info else ''
if kind == '000020':
return 'FISHING'
if kind in ('000023', '000024'):
return 'CARRIER'
# 중국 412* 허가 어선은 ship_kind 없어도 FISHING 간주
if mmsi.startswith('412'):
return 'FISHING'
st = ship_ty.lower() if isinstance(ship_ty, str) else ''
if any(k in st for k in ('cargo', 'tanker', 'supply', 'carrier')):
return 'CARRIER'
if 'fishing' in st:
return 'FISHING'
return 'UNKNOWN'
def _trajectory_similarity(
    df_a: pd.DataFrame, df_b: pd.DataFrame, min_samples: int = 6,
) -> tuple[float, int, dict]:
    """Trajectory similarity (0-1) of two vessels over shared timestamps.

    Returns:
        (similarity, common_samples, breakdown) where breakdown holds
        location_score, sog_corr, cog_alignment plus the raw means.
    """
    if df_a.empty or df_b.empty:
        return 0.0, 0, {}
    needed = {'timestamp', 'lat', 'lon', 'sog', 'cog'}
    if needed - set(df_a.columns) or needed - set(df_b.columns):
        return 0.0, 0, {}
    try:
        left = df_a[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy()
        right = df_b[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy()
        left['timestamp'] = pd.to_datetime(left['timestamp'])
        right['timestamp'] = pd.to_datetime(right['timestamp'])
        joined = pd.merge(
            left.rename(columns={'lat': 'la', 'lon': 'oa', 'sog': 'sa', 'cog': 'ca'}),
            right.rename(columns={'lat': 'lb', 'lon': 'ob', 'sog': 'sb', 'cog': 'cb'}),
            on='timestamp', how='inner',
        )
    except Exception:
        return 0.0, 0, {}
    n_common = len(joined)
    if n_common < min_samples:
        return 0.0, n_common, {}
    # Location: mean separation, linear from 1.0 at ≤500 m down to 0.0 at ~1 km.
    dist_total = 0.0
    for row in joined.itertuples(index=False):
        dist_total += haversine_nm(row.la, row.oa, row.lb, row.ob)
    mean_dist_nm = dist_total / n_common
    location_score = max(0.0, min(1.0, 1.0 - (mean_dist_nm - PROXIMITY_NM) / PROXIMITY_NM))
    # Speed: Pearson correlation; falls back to a mean-difference test when
    # either series is constant (std == 0 makes corr undefined).
    try:
        speeds_a = joined['sa'].astype(float)
        speeds_b = joined['sb'].astype(float)
        if speeds_a.std() > 0 and speeds_b.std() > 0:
            sog_corr = abs(float(speeds_a.corr(speeds_b)))
        else:
            sog_corr = 1.0 if (speeds_a - speeds_b).abs().mean() < 0.5 else 0.0
    except Exception:
        sog_corr = 0.0
    # Heading: mean angular difference, linear from 1.0 at 10° down to 0.0 at 90°.
    delta_total = 0.0
    for row in joined.itertuples(index=False):
        delta_total += _cog_delta(float(row.ca), float(row.cb))
    mean_cog_delta = delta_total / n_common
    cog_alignment = max(0.0, min(1.0, 1.0 - (mean_cog_delta - COG_DELTA_MAX) / 80.0))
    similarity = 0.4 * location_score + 0.3 * sog_corr + 0.3 * cog_alignment
    return round(similarity, 4), n_common, {
        'location_score': round(location_score, 3),
        'sog_corr': round(sog_corr, 3),
        'cog_alignment': round(cog_alignment, 3),
        'mean_distance_nm': round(mean_dist_nm, 4),
        'mean_cog_delta_deg': round(mean_cog_delta, 2),
    }
# First-pass bbox search radius (degrees)
BBOX_DEG = 0.01 # ~1.1 km — only trims the candidate pool to nearby vessels
SIMILARITY_PAIR = 0.70 # strong pair threshold
SIMILARITY_OBSERVE = 0.50 # watch-list pair threshold
def find_pair_candidates(
    base_mmsis: set[str],
    vessel_dfs: dict[str, pd.DataFrame],
    get_vessel_info,
    pt_registered: Optional[set[str]] = None,
    pt_sub_registered: Optional[set[str]] = None,
    bbox_deg: float = BBOX_DEG,
    min_common_samples: int = 6,
    similarity_threshold: float = SIMILARITY_OBSERVE,
) -> list[dict]:
    """Pair candidates per base vessel: bbox first pass, then trajectory similarity.

    A base vessel is one with a fishing character (PT-registered, fishing kind
    000020, Chinese 412* licensed, etc.). Targets are all vessels in the 9
    surrounding grid cells; a pair whose trajectory similarity reaches
    *similarity_threshold* becomes a candidate, provided at least one side
    classifies as fishing-like.

    Returns: [{'base_mmsi', 'target_mmsi', 'similarity', 'common_samples',
               'base_role', 'target_role', 'breakdown', 'is_strong'}, ...]
    """
    pt_registered = pt_registered or set()
    pt_sub_registered = pt_sub_registered or set()
    # Latest known position per vessel → coarse spatial grid.
    latest: dict[str, dict] = {}
    for vessel, track in vessel_dfs.items():
        if track.empty or not all(c in track.columns for c in ('lat', 'lon')):
            continue
        try:
            tail = track.iloc[-1]
            latest[vessel] = {'lat': float(tail['lat']), 'lon': float(tail['lon'])}
        except Exception:
            continue

    def _cell(lat: float, lon: float) -> tuple[int, int]:
        return (int(lat / bbox_deg), int(lon / bbox_deg))

    grid: dict[tuple[int, int], list[str]] = {}
    for vessel, pos in latest.items():
        grid.setdefault(_cell(pos['lat'], pos['lon']), []).append(vessel)

    seen: set[tuple[str, str]] = set()
    candidates: list[dict] = []
    fishing_like = {'FISHING', 'PT_MAIN', 'PT_SUB'}
    for base in base_mmsis:
        if base not in latest:
            continue
        row0, col0 = _cell(latest[base]['lat'], latest[base]['lon'])
        # Collect the 9 surrounding cells (including the base's own cell).
        pool: list[str] = []
        for dr in (-1, 0, 1):
            for dc in (-1, 0, 1):
                pool.extend(grid.get((row0 + dr, col0 + dc), []))
        for target in pool:
            if target == base:
                continue
            key = (min(base, target), max(base, target))
            if key in seen:
                continue
            seen.add(key)
            sim, n_common, breakdown = _trajectory_similarity(
                vessel_dfs[base], vessel_dfs[target], min_common_samples,
            )
            if sim < similarity_threshold:
                continue
            info_base = get_vessel_info(base) if get_vessel_info else {}
            info_target = get_vessel_info(target) if get_vessel_info else {}
            role_base = _classify_role(base, info_base, pt_registered, pt_sub_registered)
            role_target = _classify_role(target, info_target, pt_registered, pt_sub_registered)
            # At least one side must have a fishing character.
            if role_base not in fishing_like and role_target not in fishing_like:
                continue
            candidates.append({
                'base_mmsi': base,
                'target_mmsi': target,
                'similarity': sim,
                'common_samples': n_common,
                'base_role': role_base,
                'target_role': role_target,
                'breakdown': breakdown,
                'is_strong': sim >= SIMILARITY_PAIR,
            })
    logger.info(
        'find_pair_candidates: base=%d, pool=%d → candidates=%d (strong=%d)',
        len(base_mmsis), len(latest), len(candidates),
        sum(1 for c in candidates if c['is_strong']),
    )
    return candidates
def scan_unregistered_pairs(
    vessel_dfs: dict[str, pd.DataFrame],
    registered_pairs: set[tuple[str, str]],
) -> list[tuple[str, str]]:
    """Pair candidates among vessels not in the fleet registry.

    A candidate pair's last positions lie within 2× the 500 m proximity
    radius and both vessels sail in the relaxed trawl speed band. Cell-key
    partitioning avoids the full O(n²) comparison.

    Args:
        vessel_dfs: mmsi → AIS DataFrame; each needs timestamp, lat, lon, sog columns
        registered_pairs: already-confirmed pairs (from fleet_tracker),
            normalized as (smaller, larger)

    Returns:
        list of (mmsi_a, mmsi_b) candidate pairs in normalized order
    """
    if len(vessel_dfs) < 2:
        return []
    search_radius_nm = PROXIMITY_NM * CANDIDATE_PROXIMITY_FACTOR
    # ── Last reported position + speed for each usable track ──
    snapshots: dict[str, dict] = {}
    for mmsi, track in vessel_dfs.items():
        if track.empty or any(col not in track.columns for col in ('lat', 'lon', 'sog')):
            continue
        try:
            newest = track.sort_values('timestamp').iloc[-1]
            snapshots[mmsi] = {
                'lat': float(newest['lat']),
                'lon': float(newest['lon']),
                'sog': float(newest['sog']),
            }
        except Exception:
            continue
    if len(snapshots) < 2:
        return []
    # ── Build the cell-key grid ──
    buckets: dict[tuple[int, int], list[str]] = {}
    for mmsi, snap in snapshots.items():
        buckets.setdefault(_cell_key(snap['lat'], snap['lon']), []).append(mmsi)
    # ── Scan candidate pairs across the 9 surrounding cells ──
    found: list[tuple[str, str]] = []
    examined: set[tuple[str, str]] = set()
    for mmsi_a, snap_a in snapshots.items():
        home = _cell_key(snap_a['lat'], snap_a['lon'])
        nearby: list[str] = []
        for dr in (-1, 0, 1):
            for dc in (-1, 0, 1):
                nearby.extend(buckets.get((home[0] + dr, home[1] + dc), []))
        for mmsi_b in nearby:
            if mmsi_b == mmsi_a:
                continue
            # Normalized pair key; each unordered pair is examined once.
            pair = (mmsi_a, mmsi_b) if mmsi_a < mmsi_b else (mmsi_b, mmsi_a)
            if pair in examined:
                continue
            examined.add(pair)
            # Skip pairs already known to the registry.
            if pair in registered_pairs:
                continue
            snap_b = snapshots.get(mmsi_b)
            if snap_b is None:
                continue
            # Relaxed speed-band filter on both vessels.
            in_band_a = CANDIDATE_SOG_MIN <= snap_a['sog'] <= CANDIDATE_SOG_MAX
            in_band_b = CANDIDATE_SOG_MIN <= snap_b['sog'] <= CANDIDATE_SOG_MAX
            if not (in_band_a and in_band_b):
                continue
            # Distance filter.
            separation = haversine_nm(snap_a['lat'], snap_a['lon'], snap_b['lat'], snap_b['lon'])
            if separation > search_radius_nm:
                continue
            found.append(pair)
    logger.debug(
        'scan_unregistered_pairs: %d vessels, %d candidates found',
        len(snapshots), len(found),
    )
    return found