refactor(prediction): pair_trawl tier 분류 + join_key time_bucket 전환

두 가지 근본 버그를 동시에 해결:

1. Join key 버그 — raw AIS timestamp(ms 단위) inner join 은 두 선박 간 우연히
   일치하는 확률이 거의 0. vessel_store._tracks 의 time_bucket(5분 리샘플)
   컬럼을 우선 사용. _pair_join_key() 헬퍼로 fallback 지원.

2. AND 게이트 0건 문제 — 스펙 100%(2h 연속 + 500m + SOG 2-4 + sog_delta 0.5 +
   cog 10°)를 전부 요구하면 실제 공조 페어를 놓침. Tier 분류로 재설계:
   - STRONG  : 스펙 100% (24 cycles, 기존 조건)
   - PROBABLE: 800m / SOG 1.5-5 / sog_delta 1.0 / cog 20° / 12 cycles + 0.6 ratio
   - SUSPECT : 동일 완화 조건 / 6 cycles + 0.3 ratio (플래그만)
   G-06 판정은 STRONG/PROBABLE 만. SUSPECT 는 약한 신호로 노출.

거부 사유 카운터(REJECT_COUNTERS) + tier 카운트를 사이클별 로그 출력.
'조건이 엄격한건지 실제 페어가 없는건지' 원인 구분 가능.

피드백 메모리: feedback_detection_tier.md
This commit is contained in:
htlee 2026-04-16 09:47:08 +09:00
부모 f5374a5316
커밋 68e690d791
2개의 변경된 파일144개의 추가작업 그리고 50개의 파일을 삭제

파일 보기

@ -37,15 +37,30 @@ logger = logging.getLogger(__name__)
# 상수
# ──────────────────────────────────────────────────────────────
PROXIMITY_NM = 0.27 # 500m ≈ 0.27 NM
SOG_DELTA_MAX = 0.5 # kn
COG_DELTA_MAX = 10.0 # degrees
SOG_MIN = 2.0 # kn (조업 속력 하한)
SOG_MAX = 4.0 # kn (조업 속력 상한)
MIN_SYNC_CYCLES = 24 # 24 × 5min = 2시간
PROXIMITY_NM = 0.27 # 500m ≈ 0.27 NM (STRONG tier 스펙)
SOG_DELTA_MAX = 0.5 # kn (STRONG)
COG_DELTA_MAX = 10.0 # degrees (STRONG)
SOG_MIN = 2.0 # kn (조업 속력 하한 — STRONG)
SOG_MAX = 4.0 # kn (조업 속력 상한 — STRONG)
MIN_SYNC_CYCLES = 24 # 24 × 5min = 2시간 (STRONG)
SIMULTANEOUS_GAP_MIN = 30 # 동시 AIS 차단 기준 (분)
CYCLE_INTERVAL_MIN = 5 # 5분 리샘플 데이터
# Tier별 완화 임계 — AND 게이트로 0건 만들지 않고 신호 강도에 맞춰 분류
# STRONG : 스펙 100% (2h 연속 + 전 조건)
# PROBABLE : 1h 연속 또는 누적 2h + sync_ratio 0.6
# SUSPECT : 30분+ + sync_ratio 0.3 (약한 신호, 플래그만)
PROBABLE_MIN_BLOCK_CYCLES = 12 # 1h
PROBABLE_MIN_SYNC_RATIO = 0.6
SUSPECT_MIN_BLOCK_CYCLES = 6 # 30min
SUSPECT_MIN_SYNC_RATIO = 0.3
# 완화 tier 기준: proximity 800m, SOG 1.5-5.0kn, sog_delta 1.0, cog 20°
PROBABLE_PROXIMITY_NM = 0.43 # ≈ 800m
PROBABLE_SOG_DELTA_MAX = 1.0
PROBABLE_COG_DELTA_MAX = 20.0
PROBABLE_SOG_MIN = 1.5
PROBABLE_SOG_MAX = 5.0
# scan_unregistered_pairs 전용
CELL_SIZE = 0.01 # ~1.1km 격자
CANDIDATE_PROXIMITY_FACTOR = 2.0 # 후보 탐색 반경: PROXIMITY_NM × 2
@ -110,9 +125,11 @@ def _cell_key(lat: float, lon: float) -> tuple[int, int]:
return (round(lat / CELL_SIZE), round(lon / CELL_SIZE))
def _default_result(mmsi_b: str) -> dict:
def _default_result(mmsi_b: str, reject_reason: str = '') -> dict:
return {
'pair_detected': False,
'tier': None,
'reject_reason': reject_reason,
'sync_duration_min': 0.0,
'max_sync_block_min': 0.0,
'mean_separation_nm': 0.0,
@ -125,6 +142,20 @@ def _default_result(mmsi_b: str) -> dict:
}
# 사이클별 거부 사유 카운터 (scheduler 가 읽어 로그 출력 후 reset)
REJECT_COUNTERS: dict[str, int] = {
'empty_df': 0,
'missing_columns': 0,
'insufficient_aligned': 0,
'no_sync_at_any_tier': 0,
}
def reset_reject_counters() -> None:
for k in REJECT_COUNTERS:
REJECT_COUNTERS[k] = 0
# ──────────────────────────────────────────────────────────────
# 공개 API
# ──────────────────────────────────────────────────────────────
@ -151,42 +182,48 @@ def detect_pair_trawl(
Returns:
필드들 + role_a/role_b/similarity/bonus/pair_type
"""
required_cols = {'timestamp', 'lat', 'lon', 'sog', 'cog'}
if df_a.empty or df_b.empty:
REJECT_COUNTERS['empty_df'] += 1
logger.debug('pair_trawl(%s, %s): empty DataFrame', mmsi_a, mmsi_b)
return _default_result(mmsi_b)
return _default_result(mmsi_b, 'empty_df')
df_a = _ensure_sog_cog(df_a)
df_b = _ensure_sog_cog(df_b)
join_key = _pair_join_key(df_a)
required_cols = {join_key, 'lat', 'lon', 'sog', 'cog'}
missing_a = required_cols - set(df_a.columns)
missing_b = required_cols - set(df_b.columns)
if missing_a or missing_b:
REJECT_COUNTERS['missing_columns'] += 1
logger.warning(
'pair_trawl(%s, %s): missing columns a=%s b=%s',
mmsi_a, mmsi_b, missing_a, missing_b,
)
return _default_result(mmsi_b)
return _default_result(mmsi_b, 'missing_columns')
# ── Step 1: timestamp inner join ────────────────────────
a = df_a[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy()
b = df_b[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy()
a['timestamp'] = pd.to_datetime(a['timestamp'])
b['timestamp'] = pd.to_datetime(b['timestamp'])
# ── Step 1: join_key (time_bucket 우선, fallback timestamp) inner join ──
a = df_a[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
b = df_b[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
# 같은 bucket 에 다수 샘플이 있으면 평균
a = a.groupby(join_key, as_index=False).mean(numeric_only=True)
b = b.groupby(join_key, as_index=False).mean(numeric_only=True)
merged = pd.merge(
a.rename(columns={'lat': 'lat_a', 'lon': 'lon_a', 'sog': 'sog_a', 'cog': 'cog_a'}),
b.rename(columns={'lat': 'lat_b', 'lon': 'lon_b', 'sog': 'sog_b', 'cog': 'cog_b'}),
on='timestamp',
on=join_key,
how='inner',
).sort_values('timestamp').reset_index(drop=True)
).sort_values(join_key).reset_index(drop=True)
total_aligned = len(merged)
if total_aligned < MIN_SYNC_CYCLES:
# SUSPECT tier 조차 성립 불가 (30min = 6 cycles)
if total_aligned < SUSPECT_MIN_BLOCK_CYCLES:
REJECT_COUNTERS['insufficient_aligned'] += 1
logger.debug(
'pair_trawl(%s, %s): only %d aligned rows (need %d)',
mmsi_a, mmsi_b, total_aligned, MIN_SYNC_CYCLES,
'pair_trawl(%s, %s): only %d aligned rows (need %d for SUSPECT)',
mmsi_a, mmsi_b, total_aligned, SUSPECT_MIN_BLOCK_CYCLES,
)
return _default_result(mmsi_b)
return _default_result(mmsi_b, 'insufficient_aligned')
# ── Step 2: 행별 동기화 지표 계산 ───────────────────────
merged['distance_nm'] = merged.apply(
@ -198,26 +235,53 @@ def detect_pair_trawl(
lambda r: _cog_delta(r['cog_a'], r['cog_b']),
axis=1,
)
merged['both_in_range'] = (
merged['sog_a'].between(SOG_MIN, SOG_MAX)
& merged['sog_b'].between(SOG_MIN, SOG_MAX)
)
merged['synced'] = (
# STRONG tier: 스펙 100% — 500m / SOG 2-4 / sog_delta 0.5 / cog 10° / 24 cycles 연속
merged['synced_strong'] = (
(merged['distance_nm'] <= PROXIMITY_NM)
& (merged['sog_delta'] <= SOG_DELTA_MAX)
& (merged['cog_delta'] <= COG_DELTA_MAX)
& merged['both_in_range']
& merged['sog_a'].between(SOG_MIN, SOG_MAX)
& merged['sog_b'].between(SOG_MIN, SOG_MAX)
)
# PROBABLE tier: 완화 — 800m / SOG 1.5-5 / sog_delta 1.0 / cog 20° / 12 cycles 연속
merged['synced_probable'] = (
(merged['distance_nm'] <= PROBABLE_PROXIMITY_NM)
& (merged['sog_delta'] <= PROBABLE_SOG_DELTA_MAX)
& (merged['cog_delta'] <= PROBABLE_COG_DELTA_MAX)
& merged['sog_a'].between(PROBABLE_SOG_MIN, PROBABLE_SOG_MAX)
& merged['sog_b'].between(PROBABLE_SOG_MIN, PROBABLE_SOG_MAX)
)
# ── Step 3: 연속 블록 탐지 ──────────────────────────────
max_block_cycles = _max_sync_block(merged['synced'])
if max_block_cycles < MIN_SYNC_CYCLES:
logger.debug(
'pair_trawl(%s, %s): max sync block %d cycles < %d required',
mmsi_a, mmsi_b, max_block_cycles, MIN_SYNC_CYCLES,
)
return _default_result(mmsi_b)
# ── Step 3: tier 분류 — 가장 높은 tier 채택 ──────────────
strong_block = _max_sync_block(merged['synced_strong'])
probable_block = _max_sync_block(merged['synced_probable'])
strong_total = int(merged['synced_strong'].sum())
probable_total = int(merged['synced_probable'].sum())
strong_ratio = strong_total / total_aligned if total_aligned else 0.0
probable_ratio = probable_total / total_aligned if total_aligned else 0.0
tier: Optional[str] = None
if strong_block >= MIN_SYNC_CYCLES:
tier = 'STRONG'
used_col = 'synced_strong'
max_block_cycles = strong_block
elif probable_block >= PROBABLE_MIN_BLOCK_CYCLES and probable_ratio >= PROBABLE_MIN_SYNC_RATIO:
tier = 'PROBABLE'
used_col = 'synced_probable'
max_block_cycles = probable_block
elif probable_block >= SUSPECT_MIN_BLOCK_CYCLES and probable_ratio >= SUSPECT_MIN_SYNC_RATIO:
tier = 'SUSPECT'
used_col = 'synced_probable'
max_block_cycles = probable_block
else:
REJECT_COUNTERS['no_sync_at_any_tier'] += 1
logger.debug(
'pair_trawl(%s, %s): no tier — strong_block=%d probable_block=%d probable_ratio=%.2f',
mmsi_a, mmsi_b, strong_block, probable_block, probable_ratio,
)
return _default_result(mmsi_b, 'no_sync_at_any_tier')
merged['synced'] = merged[used_col]
total_synced = int(merged['synced'].sum())
sync_duration_min = total_synced * CYCLE_INTERVAL_MIN
max_sync_block_min = max_block_cycles * CYCLE_INTERVAL_MIN
@ -267,15 +331,17 @@ def detect_pair_trawl(
bonus = 15; pair_type = 'TRANSSHIP_LIKE'
logger.info(
'pair_trawl(%s, %s): detected — sync=%.0fmin max_block=%.0fmin '
'pair_trawl(%s, %s): %s — sync=%.0fmin max_block=%.0fmin '
'sep=%.3fnm confidence=%.3f bonus=%d type=%s g_codes=%s',
mmsi_a, mmsi_b,
mmsi_a, mmsi_b, tier,
sync_duration_min, max_sync_block_min,
mean_separation_nm, confidence, bonus, pair_type, g_codes,
)
return {
'pair_detected': True,
'tier': tier,
'reject_reason': '',
'sync_duration_min': round(sync_duration_min, 1),
'max_sync_block_min': round(max_sync_block_min, 1),
'mean_separation_nm': round(mean_separation_nm, 4),
@ -340,6 +406,16 @@ def _ensure_sog_cog(df: pd.DataFrame) -> pd.DataFrame:
return df
def _pair_join_key(df: pd.DataFrame) -> str:
"""두 선박 궤적 inner-join 시 사용할 시간 키.
vessel_store._tracks raw AIS timestamp(ms 단위) 사용하므로 선박
동일 timestamp 우연히 일치할 확률이 거의 0 inner join 결과 빈약.
5 리샘플 단위 time_bucket 컬럼이 있으면 그것을 사용.
"""
return 'time_bucket' if 'time_bucket' in df.columns else 'timestamp'
def _trajectory_similarity(
df_a: pd.DataFrame, df_b: pd.DataFrame, min_samples: int = 6,
) -> tuple[float, int, dict]:
@ -352,18 +428,20 @@ def _trajectory_similarity(
return 0.0, 0, {}
df_a = _ensure_sog_cog(df_a)
df_b = _ensure_sog_cog(df_b)
cols = {'timestamp', 'lat', 'lon', 'sog', 'cog'}
join_key = _pair_join_key(df_a)
cols = {join_key, 'lat', 'lon', 'sog', 'cog'}
if cols - set(df_a.columns) or cols - set(df_b.columns):
return 0.0, 0, {}
try:
a = df_a[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy()
b = df_b[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy()
a['timestamp'] = pd.to_datetime(a['timestamp'])
b['timestamp'] = pd.to_datetime(b['timestamp'])
a = df_a[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
b = df_b[[join_key, 'lat', 'lon', 'sog', 'cog']].copy()
# 같은 join_key 에 다수 샘플이 있을 수 있으므로 평균값으로 bucketize
a = a.groupby(join_key, as_index=False).mean(numeric_only=True)
b = b.groupby(join_key, as_index=False).mean(numeric_only=True)
m = pd.merge(
a.rename(columns={'lat': 'la', 'lon': 'oa', 'sog': 'sa', 'cog': 'ca'}),
b.rename(columns={'lat': 'lb', 'lon': 'ob', 'sog': 'sb', 'cog': 'cb'}),
on='timestamp', how='inner',
on=join_key, how='inner',
)
except Exception:
return 0.0, 0, {}

파일 보기

@ -241,6 +241,9 @@ def run_analysis_cycle():
pt_sub_registered=pt_sub_registered,
min_common_samples=4,
)
from algorithms.pair_trawl import REJECT_COUNTERS, reset_reject_counters
reset_reject_counters()
tier_counts = {'STRONG': 0, 'PROBABLE': 0, 'SUSPECT': 0}
pt_det = 0; coop_det = 0
for cand in pair_candidates:
ma, mb = cand['base_mmsi'], cand['target_mmsi']
@ -253,6 +256,8 @@ def run_analysis_cycle():
)
if not result.get('pair_detected'):
continue
tier = result.get('tier') or 'UNKNOWN'
tier_counts[tier] = tier_counts.get(tier, 0) + 1
pair_results[ma] = {**result, 'pair_mmsi': mb}
pair_results[mb] = {**result, 'pair_mmsi': ma}
if result.get('pair_type') == 'PT_REGISTERED':
@ -260,8 +265,14 @@ def run_analysis_cycle():
elif result.get('pair_type') == 'COOP_FISHING':
coop_det += 1
logger.info(
'pair detection: candidates=%d, detected=%d (pt=%d, coop=%d)',
len(pair_candidates), len(pair_results) // 2, pt_det, coop_det,
'pair detection: candidates=%d, detected=%d '
'(STRONG=%d PROBABLE=%d SUSPECT=%d, pt=%d coop=%d) '
'reject={empty=%d miss=%d insuf_align=%d no_sync=%d}',
len(pair_candidates), len(pair_results) // 2,
tier_counts['STRONG'], tier_counts['PROBABLE'], tier_counts['SUSPECT'],
pt_det, coop_det,
REJECT_COUNTERS['empty_df'], REJECT_COUNTERS['missing_columns'],
REJECT_COUNTERS['insufficient_aligned'], REJECT_COUNTERS['no_sync_at_any_tier'],
)
except Exception as e:
logger.warning('pair detection failed: %s', e)
@ -363,9 +374,14 @@ def run_analysis_cycle():
pair_result = pair_results.get(mmsi)
if pair_result and not pair_result.get('pair_detected'):
pair_result = None
# G-06 판정은 pair_type='PT_REGISTERED' 또는 엄격 sync 조건 만족일 때만
if pair_result and pair_result.get('pair_type') not in ('PT_REGISTERED', 'TRANSSHIP_LIKE', 'COOP_FISHING', 'GENERIC'):
pair_result = None
# G-06 판정은 STRONG/PROBABLE tier + 유효 pair_type 만. SUSPECT 는 플래그만 유지.
if pair_result:
if pair_result.get('tier') not in ('STRONG', 'PROBABLE'):
pair_result = None
elif pair_result.get('pair_type') not in (
'PT_REGISTERED', 'TRANSSHIP_LIKE', 'COOP_FISHING', 'GENERIC',
):
pair_result = None
gear_episodes: list = []
gear_positions: list = []