kcg-ai-monitoring/prediction/algorithms/pair_trawl.py
htlee 68e690d791 refactor(prediction): pair_trawl tier 분류 + join_key time_bucket 전환
두 가지 근본 버그를 동시에 해결:

1. Join key 버그 — raw AIS timestamp(ms 단위) inner join 은 두 선박 간 우연히
   일치하는 확률이 거의 0. vessel_store._tracks 의 time_bucket(5분 리샘플)
   컬럼을 우선 사용. _pair_join_key() 헬퍼로 fallback 지원.

2. AND 게이트 0건 문제 — 스펙 100%(2h 연속 + 500m + SOG 2-4 + sog_delta 0.5 +
   cog 10°)를 전부 요구하면 실제 공조 페어를 놓침. Tier 분류로 재설계:
   - STRONG  : 스펙 100% (24 cycles, 기존 조건)
   - PROBABLE: 800m / SOG 1.5-5 / sog_delta 1.0 / cog 20° / 12 cycles + 0.6 ratio
   - SUSPECT : 동일 완화 조건 / 6 cycles + 0.3 ratio (플래그만)
   G-06 판정은 STRONG/PROBABLE 만. SUSPECT 는 약한 신호로 노출.

거부 사유 카운터(REJECT_COUNTERS) + tier 카운트를 사이클별 로그 출력.
'조건이 엄격한건지 실제 페어가 없는건지' 원인 구분 가능.

피드백 메모리: feedback_detection_tier.md
2026-04-16 09:47:08 +09:00

690 lines
26 KiB
Python
Raw Blame 히스토리

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""쌍끌이 트롤 공조 탐지 — DAR-03 G-06.
두 선박의 AIS 궤적을 분석하여 쌍끌이 조업 여부를 판정한다.
판정 기준 (DAR-03 / Kroodsma 2018):
- 선박 간격 ≤ 500m (0.27 NM)
- 속력 차이 ≤ 0.5 kn
- 방향 차이 ≤ 10°
- 조업 속력 2.0~4.0 kn
- 지속 시간 ≥ 2시간
- 동시 AIS 차단 ≥ 30분 → P-01 추가
"""
from __future__ import annotations
import logging
import math
from typing import Optional
import pandas as pd
try:
    from algorithms.location import haversine_nm
except ImportError:
    def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:  # type: ignore[misc]
        """Great-circle distance between two coordinates in nautical miles (fallback)."""
        earth_radius_nm = 3440.065
        phi1 = math.radians(lat1)
        phi2 = math.radians(lat2)
        half_dlat = math.radians(lat2 - lat1) / 2.0
        half_dlon = math.radians(lon2 - lon1) / 2.0
        chord = (math.sin(half_dlat) ** 2
                 + math.cos(phi1) * math.cos(phi2) * math.sin(half_dlon) ** 2)
        return 2.0 * earth_radius_nm * math.asin(math.sqrt(chord))
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────
# Constants
# ──────────────────────────────────────────────────────────────
PROXIMITY_NM = 0.27  # 500 m ≈ 0.27 NM (STRONG-tier spec)
SOG_DELTA_MAX = 0.5  # kn (STRONG)
COG_DELTA_MAX = 10.0  # degrees (STRONG)
SOG_MIN = 2.0  # kn (fishing-speed lower bound — STRONG)
SOG_MAX = 4.0  # kn (fishing-speed upper bound — STRONG)
MIN_SYNC_CYCLES = 24  # 24 × 5 min = 2 hours (STRONG)
SIMULTANEOUS_GAP_MIN = 30  # simultaneous AIS blackout threshold (minutes)
CYCLE_INTERVAL_MIN = 5  # data is resampled at 5-minute intervals
# Relaxed per-tier thresholds — classify by signal strength instead of a single
# all-or-nothing AND gate that would yield zero detections.
# STRONG   : 100% of the spec (2 h consecutive + every condition)
# PROBABLE : 1 h consecutive, or cumulative 2 h + sync_ratio 0.6
# SUSPECT  : 30 min+ + sync_ratio 0.3 (weak signal, flag only)
PROBABLE_MIN_BLOCK_CYCLES = 12  # 1 h
PROBABLE_MIN_SYNC_RATIO = 0.6
SUSPECT_MIN_BLOCK_CYCLES = 6  # 30 min
SUSPECT_MIN_SYNC_RATIO = 0.3
# Relaxed-tier criteria: proximity 800 m, SOG 1.5-5.0 kn, sog_delta 1.0, cog 20°
PROBABLE_PROXIMITY_NM = 0.43  # ≈ 800 m
PROBABLE_SOG_DELTA_MAX = 1.0
PROBABLE_COG_DELTA_MAX = 20.0
PROBABLE_SOG_MIN = 1.5
PROBABLE_SOG_MAX = 5.0
# Used only by scan_unregistered_pairs
CELL_SIZE = 0.01  # ~1.1 km grid cell
CANDIDATE_PROXIMITY_FACTOR = 2.0  # candidate search radius: PROXIMITY_NM × 2
CANDIDATE_SOG_MIN = 1.5  # candidate SOG lower bound (relaxed)
CANDIDATE_SOG_MAX = 5.0  # candidate SOG upper bound (relaxed)
# ──────────────────────────────────────────────────────────────
# 내부 헬퍼
# ──────────────────────────────────────────────────────────────
def _cog_delta(cog_a: float, cog_b: float) -> float:
"""두 COG 값의 각도 차이 (0~180°)."""
return abs((cog_a - cog_b + 180.0) % 360.0 - 180.0)
def _find_gaps(df: pd.DataFrame, gap_threshold_min: float = 10.0) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
"""DataFrame 행 간 gap_threshold_min 초과 gap 구간 목록 반환.
Returns:
list of (gap_start, gap_end) as pd.Timestamp pairs
"""
if len(df) < 2:
return []
ts_series = pd.to_datetime(df['timestamp']).sort_values().reset_index(drop=True)
gaps: list[tuple[pd.Timestamp, pd.Timestamp]] = []
for i in range(1, len(ts_series)):
delta_min = (ts_series.iloc[i] - ts_series.iloc[i - 1]).total_seconds() / 60.0
if delta_min > gap_threshold_min:
gaps.append((ts_series.iloc[i - 1], ts_series.iloc[i]))
return gaps
def _overlap_minutes(
gaps_a: list[tuple[pd.Timestamp, pd.Timestamp]],
gaps_b: list[tuple[pd.Timestamp, pd.Timestamp]],
) -> float:
"""두 gap 목록의 시간 구간 겹침 합계 (분)."""
total = 0.0
for start_a, end_a in gaps_a:
for start_b, end_b in gaps_b:
overlap_start = max(start_a, start_b)
overlap_end = min(end_a, end_b)
if overlap_end > overlap_start:
total += (overlap_end - overlap_start).total_seconds() / 60.0
return total
def _max_sync_block(synced_series: 'pd.Series[bool]') -> int:
"""연속 True 블록의 최대 길이 반환."""
max_block = 0
current = 0
for val in synced_series:
if val:
current += 1
max_block = max(max_block, current)
else:
current = 0
return max_block
def _cell_key(lat: float, lon: float) -> tuple[int, int]:
    """Grid-cell index for a coordinate, at CELL_SIZE-degree resolution."""
    row = round(lat / CELL_SIZE)
    col = round(lon / CELL_SIZE)
    return (row, col)
def _default_result(mmsi_b: str, reject_reason: str = '') -> dict:
return {
'pair_detected': False,
'tier': None,
'reject_reason': reject_reason,
'sync_duration_min': 0.0,
'max_sync_block_min': 0.0,
'mean_separation_nm': 0.0,
'sog_delta_mean': 0.0,
'cog_delta_mean': 0.0,
'simultaneous_gap_min': 0.0,
'g_codes': [],
'confidence': 0.0,
'pair_mmsi': mmsi_b,
}
# Per-cycle rejection-reason counters (the scheduler reads them, logs, then resets).
REJECT_COUNTERS: dict[str, int] = {
    'empty_df': 0,
    'missing_columns': 0,
    'insufficient_aligned': 0,
    'no_sync_at_any_tier': 0,
}
def reset_reject_counters() -> None:
    """Zero every counter in REJECT_COUNTERS in place."""
    for key in list(REJECT_COUNTERS):
        REJECT_COUNTERS[key] = 0
# ──────────────────────────────────────────────────────────────
# 공개 API
# ──────────────────────────────────────────────────────────────
def detect_pair_trawl(
    df_a: pd.DataFrame,
    df_b: pd.DataFrame,
    mmsi_a: str,
    mmsi_b: str,
    role_a: str = 'UNKNOWN',
    role_b: str = 'UNKNOWN',
    similarity: float = 0.0,
) -> dict:
    """Pair-trawl cooperation detection (DAR-03 G-06).

    Args:
        df_a: AIS DataFrame for vessel A. Required columns: join key
            (time_bucket preferred, timestamp fallback), lat, lon, sog, cog.
        df_b: AIS DataFrame for vessel B (same required columns).
        mmsi_a: MMSI of vessel A.
        mmsi_b: MMSI of vessel B (recorded as pair_mmsi in the result).
        role_a/role_b: 'FISHING'|'CARRIER'|'PT_MAIN'|'PT_SUB'|'UNKNOWN'.
        similarity: first-pass similarity (0-1) from find_pair_candidates.

    Returns:
        _default_result fields plus role_a/role_b/similarity/bonus/pair_type.
    """
    if df_a.empty or df_b.empty:
        REJECT_COUNTERS['empty_df'] += 1
        logger.debug('pair_trawl(%s, %s): empty DataFrame', mmsi_a, mmsi_b)
        return _default_result(mmsi_b, 'empty_df')
    df_a = _ensure_sog_cog(df_a)
    df_b = _ensure_sog_cog(df_b)
    # NOTE(review): the join key is chosen from df_a only; if only df_b carries
    # time_bucket the pair is rejected below as missing_columns — confirm both
    # frames come from the same store and always agree.
    join_key = _pair_join_key(df_a)
    required_cols = {join_key, 'lat', 'lon', 'sog', 'cog'}
    missing_a = required_cols - set(df_a.columns)
    missing_b = required_cols - set(df_b.columns)
    if missing_a or missing_b:
        REJECT_COUNTERS['missing_columns'] += 1
        logger.warning(
            'pair_trawl(%s, %s): missing columns a=%s b=%s',
            mmsi_a, mmsi_b, missing_a, missing_b,
        )
        return _default_result(mmsi_b, 'missing_columns')
    # ── Step 1: inner join on join_key (time_bucket preferred, timestamp fallback) ──
    a = df_a[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
    b = df_b[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
    # Average multiple samples that fall into the same bucket.
    a = a.groupby(join_key, as_index=False).mean(numeric_only=True)
    b = b.groupby(join_key, as_index=False).mean(numeric_only=True)
    merged = pd.merge(
        a.rename(columns={'lat': 'lat_a', 'lon': 'lon_a', 'sog': 'sog_a', 'cog': 'cog_a'}),
        b.rename(columns={'lat': 'lat_b', 'lon': 'lon_b', 'sog': 'sog_b', 'cog': 'cog_b'}),
        on=join_key,
        how='inner',
    ).sort_values(join_key).reset_index(drop=True)
    total_aligned = len(merged)
    # Not even the SUSPECT tier can be formed (30 min = 6 cycles).
    if total_aligned < SUSPECT_MIN_BLOCK_CYCLES:
        REJECT_COUNTERS['insufficient_aligned'] += 1
        logger.debug(
            'pair_trawl(%s, %s): only %d aligned rows (need %d for SUSPECT)',
            mmsi_a, mmsi_b, total_aligned, SUSPECT_MIN_BLOCK_CYCLES,
        )
        return _default_result(mmsi_b, 'insufficient_aligned')
    # ── Step 2: per-row synchronisation metrics ─────────────
    merged['distance_nm'] = merged.apply(
        lambda r: haversine_nm(r['lat_a'], r['lon_a'], r['lat_b'], r['lon_b']),
        axis=1,
    )
    merged['sog_delta'] = (merged['sog_a'] - merged['sog_b']).abs()
    merged['cog_delta'] = merged.apply(
        lambda r: _cog_delta(r['cog_a'], r['cog_b']),
        axis=1,
    )
    # STRONG tier: 100% spec — 500 m / SOG 2-4 / sog_delta 0.5 / cog 10° / 24 consecutive cycles
    merged['synced_strong'] = (
        (merged['distance_nm'] <= PROXIMITY_NM)
        & (merged['sog_delta'] <= SOG_DELTA_MAX)
        & (merged['cog_delta'] <= COG_DELTA_MAX)
        & merged['sog_a'].between(SOG_MIN, SOG_MAX)
        & merged['sog_b'].between(SOG_MIN, SOG_MAX)
    )
    # PROBABLE tier: relaxed — 800 m / SOG 1.5-5 / sog_delta 1.0 / cog 20° / 12 consecutive cycles
    merged['synced_probable'] = (
        (merged['distance_nm'] <= PROBABLE_PROXIMITY_NM)
        & (merged['sog_delta'] <= PROBABLE_SOG_DELTA_MAX)
        & (merged['cog_delta'] <= PROBABLE_COG_DELTA_MAX)
        & merged['sog_a'].between(PROBABLE_SOG_MIN, PROBABLE_SOG_MAX)
        & merged['sog_b'].between(PROBABLE_SOG_MIN, PROBABLE_SOG_MAX)
    )
    # ── Step 3: tier classification — adopt the highest qualifying tier ──
    strong_block = _max_sync_block(merged['synced_strong'])
    probable_block = _max_sync_block(merged['synced_probable'])
    probable_total = int(merged['synced_probable'].sum())
    probable_ratio = probable_total / total_aligned if total_aligned else 0.0
    tier: Optional[str] = None
    if strong_block >= MIN_SYNC_CYCLES:
        tier = 'STRONG'
        used_col = 'synced_strong'
        max_block_cycles = strong_block
    elif probable_block >= PROBABLE_MIN_BLOCK_CYCLES and probable_ratio >= PROBABLE_MIN_SYNC_RATIO:
        tier = 'PROBABLE'
        used_col = 'synced_probable'
        max_block_cycles = probable_block
    elif probable_block >= SUSPECT_MIN_BLOCK_CYCLES and probable_ratio >= SUSPECT_MIN_SYNC_RATIO:
        tier = 'SUSPECT'
        used_col = 'synced_probable'
        max_block_cycles = probable_block
    else:
        REJECT_COUNTERS['no_sync_at_any_tier'] += 1
        logger.debug(
            'pair_trawl(%s, %s): no tier — strong_block=%d probable_block=%d probable_ratio=%.2f',
            mmsi_a, mmsi_b, strong_block, probable_block, probable_ratio,
        )
        return _default_result(mmsi_b, 'no_sync_at_any_tier')
    merged['synced'] = merged[used_col]
    total_synced = int(merged['synced'].sum())
    sync_duration_min = total_synced * CYCLE_INTERVAL_MIN
    max_sync_block_min = max_block_cycles * CYCLE_INTERVAL_MIN
    # A tier was selected, so at least SUSPECT_MIN_BLOCK_CYCLES rows are synced
    # and these means operate on a non-empty selection.
    mean_separation_nm = float(merged.loc[merged['synced'], 'distance_nm'].mean())
    sog_delta_mean = float(merged.loc[merged['synced'], 'sog_delta'].mean())
    cog_delta_mean = float(merged.loc[merged['synced'], 'cog_delta'].mean())
    # ── Step 4: simultaneous AIS blackout detection ─────────
    # FIX: _find_gaps indexes df['timestamp'] unconditionally, but the
    # required-columns check above only validates the join key. When the
    # frames carry only the resampled time_bucket column, skip the blackout
    # check instead of raising KeyError.
    if 'timestamp' in df_a.columns and 'timestamp' in df_b.columns:
        gaps_a = _find_gaps(df_a, gap_threshold_min=10.0)
        gaps_b = _find_gaps(df_b, gap_threshold_min=10.0)
        simultaneous_gap_min = _overlap_minutes(gaps_a, gaps_b)
    else:
        simultaneous_gap_min = 0.0
    g_codes: list[str] = []
    if simultaneous_gap_min >= SIMULTANEOUS_GAP_MIN:
        g_codes.append('P-01')
    # ── Step 5: confidence score ────────────────────────────
    sync_ratio = min(1.0, total_synced / total_aligned)
    synced_distances = merged.loc[merged['synced'], 'distance_nm']
    if len(synced_distances) > 1:
        std_distance = float(synced_distances.std())
    else:
        std_distance = 0.0
    # Stability/quality are normalised against the STRONG thresholds even for
    # relaxed tiers, so PROBABLE/SUSPECT pairs naturally score lower.
    separation_stability = 1.0 - min(1.0, std_distance / PROXIMITY_NM)
    sog_sync_quality = 1.0 - min(1.0, sog_delta_mean / SOG_DELTA_MAX)
    confidence = round(
        sync_ratio * 0.4
        + separation_stability * 0.3
        + sog_sync_quality * 0.3,
        4,
    )
    # ── Role-matching bonus (DAR-03 vessel-type consistency) ──
    bonus = 0
    pair_type = 'GENERIC'
    if role_a == 'PT_MAIN' and role_b == 'PT_SUB':
        bonus = 15
        pair_type = 'PT_REGISTERED'
    elif role_b == 'PT_MAIN' and role_a == 'PT_SUB':
        bonus = 15
        pair_type = 'PT_REGISTERED'
    elif role_a == 'FISHING' and role_b == 'FISHING':
        bonus = 10
        pair_type = 'COOP_FISHING'
    elif {role_a, role_b} == {'FISHING', 'CARRIER'}:
        bonus = 15
        pair_type = 'TRANSSHIP_LIKE'
    logger.info(
        'pair_trawl(%s, %s): %s — sync=%.0fmin max_block=%.0fmin '
        'sep=%.3fnm confidence=%.3f bonus=%d type=%s g_codes=%s',
        mmsi_a, mmsi_b, tier,
        sync_duration_min, max_sync_block_min,
        mean_separation_nm, confidence, bonus, pair_type, g_codes,
    )
    return {
        'pair_detected': True,
        'tier': tier,
        'reject_reason': '',
        'sync_duration_min': round(sync_duration_min, 1),
        'max_sync_block_min': round(max_sync_block_min, 1),
        'mean_separation_nm': round(mean_separation_nm, 4),
        'sog_delta_mean': round(sog_delta_mean, 4),
        'cog_delta_mean': round(cog_delta_mean, 4),
        'simultaneous_gap_min': round(simultaneous_gap_min, 1),
        'g_codes': g_codes,
        'confidence': confidence,
        'pair_mmsi': mmsi_b,
        'role_a': role_a,
        'role_b': role_b,
        'similarity': round(similarity, 4),
        'bonus': bonus,
        'pair_type': pair_type,
    }
def _classify_role(
mmsi: str,
info: dict,
pt_registered: set[str],
pt_sub_registered: set[str],
) -> str:
"""페어 후보 role 판정. 반환: PT_MAIN/PT_SUB/FISHING/CARRIER/UNKNOWN."""
if mmsi in pt_sub_registered:
return 'PT_SUB'
if mmsi in pt_registered:
return 'PT_MAIN'
kind = info.get('ship_kind_code', '') if info else ''
ship_ty = (info.get('ship_ty') or info.get('vessel_type') or '') if info else ''
if kind == '000020':
return 'FISHING'
if kind in ('000023', '000024'):
return 'CARRIER'
# 중국 412* 허가 어선은 ship_kind 없어도 FISHING 간주
if mmsi.startswith('412'):
return 'FISHING'
st = ship_ty.lower() if isinstance(ship_ty, str) else ''
if any(k in st for k in ('cargo', 'tanker', 'supply', 'carrier')):
return 'CARRIER'
if 'fishing' in st:
return 'FISHING'
return 'UNKNOWN'
def _ensure_sog_cog(df: pd.DataFrame) -> pd.DataFrame:
"""sog/cog 컬럼이 없으면 lat/lon/timestamp 로 haversine 계산하여 추가.
pair_trawl 은 vessel_store._tracks(raw_sog 만 보유) 전체를 pool 로 받는다.
select_analysis_targets 결과에 의존하면 중국 412* 외 선박(한국 440xxx 등)은
pool 에서 빠지므로, 필요 시 on-demand 로 계산한다.
"""
if 'sog' in df.columns and 'cog' in df.columns:
return df
if 'timestamp' not in df.columns or 'lat' not in df.columns or 'lon' not in df.columns:
return df
try:
# cache.vessel_store._compute_sog_cog 를 재사용
from cache.vessel_store import _compute_sog_cog
return _compute_sog_cog(df)
except Exception:
return df
def _pair_join_key(df: pd.DataFrame) -> str:
"""두 선박 궤적 inner-join 시 사용할 시간 키.
vessel_store._tracks 는 raw AIS timestamp(ms 단위)를 사용하므로 두 선박 간
동일 timestamp 가 우연히 일치할 확률이 거의 0 → inner join 결과 빈약.
5분 리샘플 단위 time_bucket 컬럼이 있으면 그것을 사용.
"""
return 'time_bucket' if 'time_bucket' in df.columns else 'timestamp'
def _trajectory_similarity(
    df_a: pd.DataFrame, df_b: pd.DataFrame, min_samples: int = 6,
) -> tuple[float, int, dict]:
    """Trajectory similarity (0-1) of two vessels over their shared time keys.

    Returns: (similarity, common_samples, breakdown)
    breakdown = {location_score, sog_corr, cog_alignment}
    """
    if df_a.empty or df_b.empty:
        return 0.0, 0, {}
    df_a = _ensure_sog_cog(df_a)
    df_b = _ensure_sog_cog(df_b)
    join_key = _pair_join_key(df_a)
    cols = {join_key, 'lat', 'lon', 'sog', 'cog'}
    if cols - set(df_a.columns) or cols - set(df_b.columns):
        return 0.0, 0, {}
    try:
        a = df_a[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
        b = df_b[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
        # Multiple samples may share one join_key, so bucketize by the mean.
        a = a.groupby(join_key, as_index=False).mean(numeric_only=True)
        b = b.groupby(join_key, as_index=False).mean(numeric_only=True)
        m = pd.merge(
            a.rename(columns={'lat': 'la', 'lon': 'oa', 'sog': 'sa', 'cog': 'ca'}),
            b.rename(columns={'lat': 'lb', 'lon': 'ob', 'sog': 'sb', 'cog': 'cb'}),
            on=join_key, how='inner',
        )
    except Exception:
        return 0.0, 0, {}
    n = len(m)
    if n < min_samples:
        return 0.0, n, {}
    # Location: mean-distance based (linear: ≤500 m → 1.0, 1 km → 0.0).
    dists = [
        haversine_nm(r.la, r.oa, r.lb, r.ob)
        for r in m.itertuples(index=False)
    ]
    mean_dist_nm = sum(dists) / len(dists)
    location_score = max(0.0, min(1.0, 1.0 - (mean_dist_nm - PROXIMITY_NM) / PROXIMITY_NM))
    # SOG correlation (Pearson). NOTE(review): abs() also rewards perfect
    # anti-correlation — confirm that is intended for paired trawling.
    try:
        sa = m['sa'].astype(float)
        sb = m['sb'].astype(float)
        if sa.std() > 0 and sb.std() > 0:
            sog_corr = abs(float(sa.corr(sb)))
        else:
            # Zero variance on either side: fall back to mean absolute SOG diff.
            sog_corr = 1.0 if (sa - sb).abs().mean() < 0.5 else 0.0
    except Exception:
        sog_corr = 0.0
    # COG alignment: mean angular difference (linear: 10° → 1.0, 90° → 0.0).
    cog_deltas = [
        _cog_delta(float(r.ca), float(r.cb))
        for r in m.itertuples(index=False)
    ]
    mean_cog_delta = sum(cog_deltas) / len(cog_deltas)
    cog_alignment = max(0.0, min(1.0, 1.0 - (mean_cog_delta - COG_DELTA_MAX) / 80.0))
    similarity = 0.4 * location_score + 0.3 * sog_corr + 0.3 * cog_alignment
    return round(similarity, 4), n, {
        'location_score': round(location_score, 3),
        'sog_corr': round(sog_corr, 3),
        'cog_alignment': round(cog_alignment, 3),
        'mean_distance_nm': round(mean_dist_nm, 4),
        'mean_cog_delta_deg': round(mean_cog_delta, 2),
    }
# bbox first-pass search radius (degrees)
BBOX_DEG = 0.01  # ≈1.1 km — trims the pool to nearby candidates only
SIMILARITY_PAIR = 0.70  # strong pair
SIMILARITY_OBSERVE = 0.45  # watch-list pair (relaxed 0.50 → 0.45 for recall after pool expansion)
def find_pair_candidates(
    base_mmsis: set[str],
    vessel_dfs: dict[str, pd.DataFrame],
    get_vessel_info,
    pt_registered: Optional[set[str]] = None,
    pt_sub_registered: Optional[set[str]] = None,
    bbox_deg: float = BBOX_DEG,
    min_common_samples: int = 6,
    similarity_threshold: float = SIMILARITY_OBSERVE,
) -> list[dict]:
    """Per base vessel: bbox first pass → trajectory-similarity second pass.

    Base condition: PT-registered vessels, fishing vessels (000020), Chinese
    412* licensed vessels — at least one side must be fishing-like.
    Target: every vessel in the adjacent grid cells; kept as a candidate when
    trajectory similarity ≥ similarity_threshold.
    Returns: [{'base_mmsi', 'target_mmsi', 'similarity', 'common_samples',
               'base_role', 'target_role', 'breakdown'}, ...]
    """
    pt_registered = pt_registered or set()
    pt_sub_registered = pt_sub_registered or set()
    # Build the grid from each vessel's most recent position.
    last_pos: dict[str, dict] = {}
    for mmsi, df in vessel_dfs.items():
        if df.empty or not all(c in df.columns for c in ('lat', 'lon')):
            continue
        try:
            row = df.iloc[-1]
            last_pos[mmsi] = {'lat': float(row['lat']), 'lon': float(row['lon'])}
        except Exception:
            continue
    def _cell(lat: float, lon: float) -> tuple[int, int]:
        # Local cell index at bbox_deg resolution (int truncation, not round).
        return (int(lat / bbox_deg), int(lon / bbox_deg))
    grid: dict[tuple[int, int], list[str]] = {}
    for mmsi, p in last_pos.items():
        grid.setdefault(_cell(p['lat'], p['lon']), []).append(mmsi)
    checked: set[tuple[str, str]] = set()
    results: list[dict] = []
    for base in base_mmsis:
        if base not in last_pos:
            continue
        bp = last_pos[base]
        bc = _cell(bp['lat'], bp['lon'])
        # Collect the 9 adjacent cells (own cell + 8 neighbours).
        neighbors: list[str] = []
        for dr in (-1, 0, 1):
            for dc in (-1, 0, 1):
                neighbors.extend(grid.get((bc[0] + dr, bc[1] + dc), []))
        for target in neighbors:
            if target == base:
                continue
            # Normalised pair key so (a, b) and (b, a) are checked once.
            key = (base, target) if base < target else (target, base)
            if key in checked:
                continue
            checked.add(key)
            sim, n_common, breakdown = _trajectory_similarity(
                vessel_dfs[base], vessel_dfs[target], min_common_samples,
            )
            if sim < similarity_threshold:
                continue
            info_a = get_vessel_info(base) if get_vessel_info else {}
            info_b = get_vessel_info(target) if get_vessel_info else {}
            role_a = _classify_role(base, info_a, pt_registered, pt_sub_registered)
            role_b = _classify_role(target, info_b, pt_registered, pt_sub_registered)
            # At least one side must be fishing-like.
            fishing_like = {'FISHING', 'PT_MAIN', 'PT_SUB'}
            if role_a not in fishing_like and role_b not in fishing_like:
                continue
            results.append({
                'base_mmsi': base,
                'target_mmsi': target,
                'similarity': sim,
                'common_samples': n_common,
                'base_role': role_a,
                'target_role': role_b,
                'breakdown': breakdown,
                'is_strong': sim >= SIMILARITY_PAIR,
            })
    logger.info(
        'find_pair_candidates: base=%d, pool=%d → candidates=%d (strong=%d)',
        len(base_mmsis), len(last_pos), len(results),
        sum(1 for r in results if r['is_strong']),
    )
    return results
def scan_unregistered_pairs(
    vessel_dfs: dict[str, pd.DataFrame],
    registered_pairs: set[tuple[str, str]],
) -> list[tuple[str, str]]:
    """Return candidate pairs of TRAWL vessels absent from fleet_registry that
    satisfy the proximity + speed-synchronisation conditions.
    Cell-key partitioning avoids the O(n²) all-pairs scan.

    Args:
        vessel_dfs: mmsi → AIS DataFrame. Each DataFrame needs timestamp, lat,
            lon, sog columns.
        registered_pairs: already-confirmed pairs (from fleet_tracker),
            normalised as (smaller, larger).

    Returns:
        list of (mmsi_a, mmsi_b) candidate pairs (normalised order)
    """
    if len(vessel_dfs) < 2:
        return []
    CANDIDATE_PROXIMITY_NM = PROXIMITY_NM * CANDIDATE_PROXIMITY_FACTOR
    # ── Extract each vessel's last known position ───────────
    last_positions: dict[str, dict] = {}
    for mmsi, df in vessel_dfs.items():
        if df.empty or 'lat' not in df.columns or 'lon' not in df.columns or 'sog' not in df.columns:
            continue
        try:
            # 'timestamp' is not pre-checked; if it is missing, sort_values
            # raises inside this try and the vessel is simply skipped.
            last_row = df.sort_values('timestamp').iloc[-1]
            lat = float(last_row['lat'])
            lon = float(last_row['lon'])
            sog = float(last_row['sog'])
        except Exception:
            continue
        last_positions[mmsi] = {'lat': lat, 'lon': lon, 'sog': sog}
    if len(last_positions) < 2:
        return []
    # ── Build the cell-key grid ─────────────────────────────
    cell_map: dict[tuple[int, int], list[str]] = {}
    for mmsi, pos in last_positions.items():
        key = _cell_key(pos['lat'], pos['lon'])
        if key not in cell_map:
            cell_map[key] = []
        cell_map[key].append(mmsi)
    # ── Search candidate pairs within the 9 neighbouring cells ──
    candidates: list[tuple[str, str]] = []
    checked: set[tuple[str, str]] = set()
    for mmsi_a, pos_a in last_positions.items():
        base_cell = _cell_key(pos_a['lat'], pos_a['lon'])
        # Collect own cell + 8 neighbours.
        neighbor_mmsis: list[str] = []
        for dr in (-1, 0, 1):
            for dc in (-1, 0, 1):
                neighbor_cell = (base_cell[0] + dr, base_cell[1] + dc)
                neighbor_mmsis.extend(cell_map.get(neighbor_cell, []))
        for mmsi_b in neighbor_mmsis:
            if mmsi_b == mmsi_a:
                continue
            # Normalised pair key.
            pair_key: tuple[str, str] = (
                (mmsi_a, mmsi_b) if mmsi_a < mmsi_b else (mmsi_b, mmsi_a)
            )
            # Skip duplicates.
            if pair_key in checked:
                continue
            checked.add(pair_key)
            # Skip already-registered pairs.
            if pair_key in registered_pairs:
                continue
            pos_b = last_positions.get(mmsi_b)
            if pos_b is None:
                continue
            # Speed-range filter (relaxed thresholds).
            sog_a = pos_a['sog']
            sog_b = pos_b['sog']
            if not (CANDIDATE_SOG_MIN <= sog_a <= CANDIDATE_SOG_MAX):
                continue
            if not (CANDIDATE_SOG_MIN <= sog_b <= CANDIDATE_SOG_MAX):
                continue
            # Distance filter.
            dist_nm = haversine_nm(pos_a['lat'], pos_a['lon'], pos_b['lat'], pos_b['lon'])
            if dist_nm > CANDIDATE_PROXIMITY_NM:
                continue
            candidates.append(pair_key)
    logger.debug(
        'scan_unregistered_pairs: %d vessels, %d candidates found',
        len(last_positions), len(candidates),
    )
    return candidates