From 68e690d791303aeca10c2076e4e3caec2206ab4b Mon Sep 17 00:00:00 2001 From: htlee Date: Thu, 16 Apr 2026 09:47:08 +0900 Subject: [PATCH] =?UTF-8?q?refactor(prediction):=20pair=5Ftrawl=20tier=20?= =?UTF-8?q?=EB=B6=84=EB=A5=98=20+=20join=5Fkey=20time=5Fbucket=20=EC=A0=84?= =?UTF-8?q?=ED=99=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 두 가지 근본 버그를 동시에 해결: 1. Join key 버그 — raw AIS timestamp(ms 단위) inner join 은 두 선박 간 우연히 일치하는 확률이 거의 0. vessel_store._tracks 의 time_bucket(5분 리샘플) 컬럼을 우선 사용. _pair_join_key() 헬퍼로 fallback 지원. 2. AND 게이트 0건 문제 — 스펙 100%(2h 연속 + 500m + SOG 2-4 + sog_delta 0.5 + cog 10°)를 전부 요구하면 실제 공조 페어를 놓침. Tier 분류로 재설계: - STRONG : 스펙 100% (24 cycles, 기존 조건) - PROBABLE: 800m / SOG 1.5-5 / sog_delta 1.0 / cog 20° / 12 cycles + 0.6 ratio - SUSPECT : 동일 완화 조건 / 6 cycles + 0.3 ratio (플래그만) G-06 판정은 STRONG/PROBABLE 만. SUSPECT 는 약한 신호로 노출. 거부 사유 카운터(REJECT_COUNTERS) + tier 카운트를 사이클별 로그 출력. '조건이 엄격한건지 실제 페어가 없는건지' 원인 구분 가능. 피드백 메모리: feedback_detection_tier.md --- prediction/algorithms/pair_trawl.py | 168 ++++++++++++++++++++-------- prediction/scheduler.py | 26 ++++- 2 files changed, 144 insertions(+), 50 deletions(-) diff --git a/prediction/algorithms/pair_trawl.py b/prediction/algorithms/pair_trawl.py index d1e628a..d794fea 100644 --- a/prediction/algorithms/pair_trawl.py +++ b/prediction/algorithms/pair_trawl.py @@ -37,15 +37,30 @@ logger = logging.getLogger(__name__) # 상수 # ────────────────────────────────────────────────────────────── -PROXIMITY_NM = 0.27 # 500m ≈ 0.27 NM -SOG_DELTA_MAX = 0.5 # kn -COG_DELTA_MAX = 10.0 # degrees -SOG_MIN = 2.0 # kn (조업 속력 하한) -SOG_MAX = 4.0 # kn (조업 속력 상한) -MIN_SYNC_CYCLES = 24 # 24 × 5min = 2시간 +PROXIMITY_NM = 0.27 # 500m ≈ 0.27 NM (STRONG tier 스펙) +SOG_DELTA_MAX = 0.5 # kn (STRONG) +COG_DELTA_MAX = 10.0 # degrees (STRONG) +SOG_MIN = 2.0 # kn (조업 속력 하한 — STRONG) +SOG_MAX = 4.0 # kn (조업 속력 상한 — STRONG) +MIN_SYNC_CYCLES = 24 # 24 × 5min = 2시간 (STRONG) SIMULTANEOUS_GAP_MIN = 30 # 동시 AIS 차단 기준 (분) CYCLE_INTERVAL_MIN = 5 # 5분 리샘플 데이터 +# Tier별 완화 임계 — AND 게이트로 0건 만들지 않고 신호 강도에 맞춰 분류 +# STRONG : 스펙 100% (2h 연속 + 전 조건) +# PROBABLE : 1h 연속 또는 누적 2h + sync_ratio 0.6 +# SUSPECT : 30분+ + sync_ratio 0.3 (약한 신호, 플래그만) +PROBABLE_MIN_BLOCK_CYCLES = 12 # 1h +PROBABLE_MIN_SYNC_RATIO = 0.6 +SUSPECT_MIN_BLOCK_CYCLES = 6 # 30min +SUSPECT_MIN_SYNC_RATIO = 0.3 +# 완화 tier 기준: proximity 800m, SOG 1.5-5.0kn, sog_delta 1.0, cog 20° +PROBABLE_PROXIMITY_NM = 0.43 # ≈ 800m +PROBABLE_SOG_DELTA_MAX = 1.0 +PROBABLE_COG_DELTA_MAX = 20.0 +PROBABLE_SOG_MIN = 1.5 +PROBABLE_SOG_MAX = 5.0 + # scan_unregistered_pairs 전용 CELL_SIZE = 0.01 # ~1.1km 격자 CANDIDATE_PROXIMITY_FACTOR = 2.0 # 후보 탐색 반경: PROXIMITY_NM × 2 @@ -110,9 +125,11 @@ def _cell_key(lat: float, lon: float) -> tuple[int, int]: return (round(lat / CELL_SIZE), round(lon / CELL_SIZE)) -def _default_result(mmsi_b: str) -> dict: +def _default_result(mmsi_b: str, reject_reason: str = '') -> dict: return { 'pair_detected': False, + 'tier': None, + 'reject_reason': reject_reason, 'sync_duration_min': 0.0, 'max_sync_block_min': 0.0, 'mean_separation_nm': 0.0, @@ -125,6 +142,20 @@ def _default_result(mmsi_b: str) -> dict: } +# 사이클별 거부 사유 카운터 (scheduler 가 읽어 로그 출력 후 reset) +REJECT_COUNTERS: dict[str, int] = { + 'empty_df': 0, + 'missing_columns': 0, + 'insufficient_aligned': 0, + 'no_sync_at_any_tier': 0, +} + + +def reset_reject_counters() -> None: + for k in REJECT_COUNTERS: + REJECT_COUNTERS[k] = 0 + + # ────────────────────────────────────────────────────────────── # 공개 API # ────────────────────────────────────────────────────────────── @@ -151,42 +182,48 @@ def detect_pair_trawl( Returns: 위 필드들 + role_a/role_b/similarity/bonus/pair_type """ - required_cols = {'timestamp', 'lat', 'lon', 'sog', 'cog'} - if df_a.empty or df_b.empty: + REJECT_COUNTERS['empty_df'] += 1 logger.debug('pair_trawl(%s, %s): empty DataFrame', mmsi_a, mmsi_b) - return _default_result(mmsi_b) + return _default_result(mmsi_b, 'empty_df') + df_a = _ensure_sog_cog(df_a) + df_b = _ensure_sog_cog(df_b) + join_key = _pair_join_key(df_a) + required_cols = {join_key, 'lat', 'lon', 'sog', 'cog'} missing_a = required_cols - set(df_a.columns) missing_b = required_cols - set(df_b.columns) if missing_a or missing_b: + REJECT_COUNTERS['missing_columns'] += 1 logger.warning( 'pair_trawl(%s, %s): missing columns a=%s b=%s', mmsi_a, mmsi_b, missing_a, missing_b, ) - return _default_result(mmsi_b) + return _default_result(mmsi_b, 'missing_columns') - # ── Step 1: timestamp inner join ──────────────────────── - a = df_a[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy() - b = df_b[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy() - - a['timestamp'] = pd.to_datetime(a['timestamp']) - b['timestamp'] = pd.to_datetime(b['timestamp']) + # ── Step 1: join_key (time_bucket 우선, fallback timestamp) inner join ── + a = df_a[[join_key, 'lat', 'lon', 'sog', 'cog']].copy() + b = df_b[[join_key, 'lat', 'lon', 'sog', 'cog']].copy() + # 같은 bucket 에 다수 샘플이 있으면 평균 + a = a.groupby(join_key, as_index=False).mean(numeric_only=True) + b = b.groupby(join_key, as_index=False).mean(numeric_only=True) merged = pd.merge( a.rename(columns={'lat': 'lat_a', 'lon': 'lon_a', 'sog': 'sog_a', 'cog': 'cog_a'}), b.rename(columns={'lat': 'lat_b', 'lon': 'lon_b', 'sog': 'sog_b', 'cog': 'cog_b'}), - on='timestamp', + on=join_key, how='inner', - ).sort_values('timestamp').reset_index(drop=True) + ).sort_values(join_key).reset_index(drop=True) total_aligned = len(merged) - if total_aligned < MIN_SYNC_CYCLES: + # SUSPECT tier 조차 성립 불가 (30min = 6 cycles) + if total_aligned < SUSPECT_MIN_BLOCK_CYCLES: + REJECT_COUNTERS['insufficient_aligned'] += 1 logger.debug( - 'pair_trawl(%s, %s): only %d aligned rows (need %d)', - mmsi_a, mmsi_b, total_aligned, MIN_SYNC_CYCLES, + 'pair_trawl(%s, %s): only %d aligned rows (need %d for SUSPECT)', + mmsi_a, mmsi_b, total_aligned, SUSPECT_MIN_BLOCK_CYCLES, ) - return _default_result(mmsi_b) + return _default_result(mmsi_b, 'insufficient_aligned') # ── Step 2: 행별 동기화 지표 계산 ─────────────────────── merged['distance_nm'] = merged.apply( @@ -198,26 +235,53 @@ def detect_pair_trawl( lambda r: _cog_delta(r['cog_a'], r['cog_b']), axis=1, ) - merged['both_in_range'] = ( - merged['sog_a'].between(SOG_MIN, SOG_MAX) - & merged['sog_b'].between(SOG_MIN, SOG_MAX) - ) - merged['synced'] = ( + # STRONG tier: 스펙 100% — 500m / SOG 2-4 / sog_delta 0.5 / cog 10° / 24 cycles 연속 + merged['synced_strong'] = ( (merged['distance_nm'] <= PROXIMITY_NM) & (merged['sog_delta'] <= SOG_DELTA_MAX) & (merged['cog_delta'] <= COG_DELTA_MAX) - & merged['both_in_range'] + & merged['sog_a'].between(SOG_MIN, SOG_MAX) + & merged['sog_b'].between(SOG_MIN, SOG_MAX) + ) + # PROBABLE tier: 완화 — 800m / SOG 1.5-5 / sog_delta 1.0 / cog 20° / 12 cycles 연속 + merged['synced_probable'] = ( + (merged['distance_nm'] <= PROBABLE_PROXIMITY_NM) + & (merged['sog_delta'] <= PROBABLE_SOG_DELTA_MAX) + & (merged['cog_delta'] <= PROBABLE_COG_DELTA_MAX) + & merged['sog_a'].between(PROBABLE_SOG_MIN, PROBABLE_SOG_MAX) + & merged['sog_b'].between(PROBABLE_SOG_MIN, PROBABLE_SOG_MAX) ) - # ── Step 3: 연속 블록 탐지 ────────────────────────────── - max_block_cycles = _max_sync_block(merged['synced']) - if max_block_cycles < MIN_SYNC_CYCLES: - logger.debug( - 'pair_trawl(%s, %s): max sync block %d cycles < %d required', - mmsi_a, mmsi_b, max_block_cycles, MIN_SYNC_CYCLES, - ) - return _default_result(mmsi_b) + # ── Step 3: tier 분류 — 가장 높은 tier 채택 ────────────── + strong_block = _max_sync_block(merged['synced_strong']) + probable_block = _max_sync_block(merged['synced_probable']) + strong_total = int(merged['synced_strong'].sum()) + probable_total = int(merged['synced_probable'].sum()) + strong_ratio = strong_total / total_aligned if total_aligned else 0.0 + probable_ratio = probable_total / total_aligned if total_aligned else 0.0 + tier: Optional[str] = None + if strong_block >= MIN_SYNC_CYCLES: + tier = 'STRONG' + used_col = 'synced_strong' + max_block_cycles = strong_block + elif probable_block >= PROBABLE_MIN_BLOCK_CYCLES and probable_ratio >= PROBABLE_MIN_SYNC_RATIO: + tier = 'PROBABLE' + used_col = 'synced_probable' + max_block_cycles = probable_block + elif probable_block >= SUSPECT_MIN_BLOCK_CYCLES and probable_ratio >= SUSPECT_MIN_SYNC_RATIO: + tier = 'SUSPECT' + used_col = 'synced_probable' + max_block_cycles = probable_block + else: + REJECT_COUNTERS['no_sync_at_any_tier'] += 1 + logger.debug( + 'pair_trawl(%s, %s): no tier — strong_block=%d probable_block=%d probable_ratio=%.2f', + mmsi_a, mmsi_b, strong_block, probable_block, probable_ratio, + ) + return _default_result(mmsi_b, 'no_sync_at_any_tier') + + merged['synced'] = merged[used_col] total_synced = int(merged['synced'].sum()) sync_duration_min = total_synced * CYCLE_INTERVAL_MIN max_sync_block_min = max_block_cycles * CYCLE_INTERVAL_MIN @@ -267,15 +331,17 @@ def detect_pair_trawl( bonus = 15; pair_type = 'TRANSSHIP_LIKE' logger.info( - 'pair_trawl(%s, %s): detected — sync=%.0fmin max_block=%.0fmin ' + 'pair_trawl(%s, %s): %s — sync=%.0fmin max_block=%.0fmin ' 'sep=%.3fnm confidence=%.3f bonus=%d type=%s g_codes=%s', - mmsi_a, mmsi_b, + mmsi_a, mmsi_b, tier, sync_duration_min, max_sync_block_min, mean_separation_nm, confidence, bonus, pair_type, g_codes, ) return { 'pair_detected': True, + 'tier': tier, + 'reject_reason': '', 'sync_duration_min': round(sync_duration_min, 1), 'max_sync_block_min': round(max_sync_block_min, 1), 'mean_separation_nm': round(mean_separation_nm, 4), @@ -340,6 +406,16 @@ def _ensure_sog_cog(df: pd.DataFrame) -> pd.DataFrame: return df +def _pair_join_key(df: pd.DataFrame) -> str: + """두 선박 궤적 inner-join 시 사용할 시간 키. + + vessel_store._tracks 는 raw AIS timestamp(ms 단위)를 사용하므로 두 선박 간 + 동일 timestamp 가 우연히 일치할 확률이 거의 0 → inner join 결과 빈약. + 5분 리샘플 단위 time_bucket 컬럼이 있으면 그것을 사용. + """ + return 'time_bucket' if 'time_bucket' in df.columns else 'timestamp' + + def _trajectory_similarity( df_a: pd.DataFrame, df_b: pd.DataFrame, min_samples: int = 6, ) -> tuple[float, int, dict]: @@ -352,18 +428,20 @@ def _trajectory_similarity( return 0.0, 0, {} df_a = _ensure_sog_cog(df_a) df_b = _ensure_sog_cog(df_b) - cols = {'timestamp', 'lat', 'lon', 'sog', 'cog'} + join_key = _pair_join_key(df_a) + cols = {join_key, 'lat', 'lon', 'sog', 'cog'} if cols - set(df_a.columns) or cols - set(df_b.columns): return 0.0, 0, {} try: - a = df_a[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy() - b = df_b[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy() - a['timestamp'] = pd.to_datetime(a['timestamp']) - b['timestamp'] = pd.to_datetime(b['timestamp']) + a = df_a[[join_key, 'lat', 'lon', 'sog', 'cog']].copy() + b = df_b[[join_key, 'lat', 'lon', 'sog', 'cog']].copy() + # 같은 join_key 에 다수 샘플이 있을 수 있으므로 평균값으로 bucketize + a = a.groupby(join_key, as_index=False).mean(numeric_only=True) + b = b.groupby(join_key, as_index=False).mean(numeric_only=True) m = pd.merge( a.rename(columns={'lat': 'la', 'lon': 'oa', 'sog': 'sa', 'cog': 'ca'}), b.rename(columns={'lat': 'lb', 'lon': 'ob', 'sog': 'sb', 'cog': 'cb'}), - on='timestamp', how='inner', + on=join_key, how='inner', ) except Exception: return 0.0, 0, {} diff --git a/prediction/scheduler.py b/prediction/scheduler.py index 74cfaae..001165d 100644 --- a/prediction/scheduler.py +++ b/prediction/scheduler.py @@ -241,6 +241,9 @@ def run_analysis_cycle(): pt_sub_registered=pt_sub_registered, min_common_samples=4, ) + from algorithms.pair_trawl import REJECT_COUNTERS, reset_reject_counters + reset_reject_counters() + tier_counts = {'STRONG': 0, 'PROBABLE': 0, 'SUSPECT': 0} pt_det = 0; coop_det = 0 for cand in pair_candidates: ma, mb = cand['base_mmsi'], cand['target_mmsi'] @@ -253,6 +256,8 @@ def run_analysis_cycle(): ) if not result.get('pair_detected'): continue + tier = result.get('tier') or 'UNKNOWN' + tier_counts[tier] = tier_counts.get(tier, 0) + 1 pair_results[ma] = {**result, 'pair_mmsi': mb} pair_results[mb] = {**result, 'pair_mmsi': ma} if result.get('pair_type') == 'PT_REGISTERED': @@ -260,8 +265,14 @@ def run_analysis_cycle(): elif result.get('pair_type') == 'COOP_FISHING': coop_det += 1 logger.info( - 'pair detection: candidates=%d, detected=%d (pt=%d, coop=%d)', - len(pair_candidates), len(pair_results) // 2, pt_det, coop_det, + 'pair detection: candidates=%d, detected=%d ' + '(STRONG=%d PROBABLE=%d SUSPECT=%d, pt=%d coop=%d) ' + 'reject={empty=%d miss=%d insuf_align=%d no_sync=%d}', + len(pair_candidates), len(pair_results) // 2, + tier_counts['STRONG'], tier_counts['PROBABLE'], tier_counts['SUSPECT'], + pt_det, coop_det, + REJECT_COUNTERS['empty_df'], REJECT_COUNTERS['missing_columns'], + REJECT_COUNTERS['insufficient_aligned'], REJECT_COUNTERS['no_sync_at_any_tier'], ) except Exception as e: logger.warning('pair detection failed: %s', e) @@ -363,9 +374,14 @@ def run_analysis_cycle(): pair_result = pair_results.get(mmsi) if pair_result and not pair_result.get('pair_detected'): pair_result = None - # G-06 판정은 pair_type='PT_REGISTERED' 또는 엄격 sync 조건 만족일 때만 - if pair_result and pair_result.get('pair_type') not in ('PT_REGISTERED', 'TRANSSHIP_LIKE', 'COOP_FISHING', 'GENERIC'): - pair_result = None + # G-06 판정은 STRONG/PROBABLE tier + 유효 pair_type 만. SUSPECT 는 플래그만 유지. + if pair_result: + if pair_result.get('tier') not in ('STRONG', 'PROBABLE'): + pair_result = None + elif pair_result.get('pair_type') not in ( + 'PT_REGISTERED', 'TRANSSHIP_LIKE', 'COOP_FISHING', 'GENERIC', + ): + pair_result = None gear_episodes: list = [] gear_positions: list = []