From f5374a5316bec979703eea363e1005df74405687 Mon Sep 17 00:00:00 2001 From: htlee Date: Thu, 16 Apr 2026 09:29:38 +0900 Subject: [PATCH] =?UTF-8?q?refactor(prediction):=20pair=5Ftrawl=20sog/cog?= =?UTF-8?q?=20on-demand=20=EA=B3=84=EC=82=B0=20+=20=EC=A4=91=EA=B5=AD=20MI?= =?UTF-8?q?D=20413/414=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - pair_trawl._ensure_sog_cog(): _trajectory_similarity 진입 시 sog/cog 없으면 vessel_store._compute_sog_cog() 로 haversine 계산 (tracks + timestamp 만 있으면 OK) - pool 을 vessel_store._tracks 전체(55k)로 원복: 한국 440xxx/러시아 273xxx 페어 탐색 가능 - base 필터 중국 MID 확장: 412 → 412/413/414 (본토/홍콩/마카오) - df_targets groupby 우회 제거 (불필요한 결합) --- prediction/algorithms/pair_trawl.py | 21 +++++++++++++++++++++ prediction/scheduler.py | 24 ++++++++++-------------- 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/prediction/algorithms/pair_trawl.py b/prediction/algorithms/pair_trawl.py index d2e21d7..d1e628a 100644 --- a/prediction/algorithms/pair_trawl.py +++ b/prediction/algorithms/pair_trawl.py @@ -321,6 +321,25 @@ def _classify_role( return 'UNKNOWN' +def _ensure_sog_cog(df: pd.DataFrame) -> pd.DataFrame: + """sog/cog 컬럼이 없으면 lat/lon/timestamp 로 haversine 계산하여 추가. + + pair_trawl 은 vessel_store._tracks(raw_sog 만 보유) 전체를 pool 로 받는다. + select_analysis_targets 결과에 의존하면 중국 412* 외 선박(한국 440xxx 등)은 + pool 에서 빠지므로, 필요 시 on-demand 로 계산한다. + """ + if 'sog' in df.columns and 'cog' in df.columns: + return df + if 'timestamp' not in df.columns or 'lat' not in df.columns or 'lon' not in df.columns: + return df + try: + # cache.vessel_store._compute_sog_cog 를 재사용 + from cache.vessel_store import _compute_sog_cog + return _compute_sog_cog(df) + except Exception: + return df + + def _trajectory_similarity( df_a: pd.DataFrame, df_b: pd.DataFrame, min_samples: int = 6, ) -> tuple[float, int, dict]: @@ -331,6 +350,8 @@ def _trajectory_similarity( """ if df_a.empty or df_b.empty: return 0.0, 0, {} + df_a = _ensure_sog_cog(df_a) + df_b = _ensure_sog_cog(df_b) cols = {'timestamp', 'lat', 'lon', 'sog', 'cog'} if cols - set(df_a.columns) or cols - set(df_b.columns): return 0.0, 0, {} diff --git a/prediction/scheduler.py b/prediction/scheduler.py index 7032046..74cfaae 100644 --- a/prediction/scheduler.py +++ b/prediction/scheduler.py @@ -212,26 +212,22 @@ def run_analysis_cycle(): pt_sub_registered: set[str] = set() # TODO: fishery_code=PT-S 구분 base_mmsis: set[str] = {c['mmsi'] for c in classifications} base_mmsis |= pt_registered - # pool_tracks: 중국 전체 8k+ 선박의 sog/cog 계산된 궤적 - # df_targets 는 select_analysis_targets() 에서 이미 전체 412* 에 sog/cog 계산 - # (vessel_store._tracks 는 raw_sog 만 보유해 _trajectory_similarity 가 동작 안 함) - pool_tracks: dict = {} - try: - for mmsi_val, grp in df_targets.groupby('mmsi'): - pool_tracks[str(mmsi_val)] = grp.reset_index(drop=True) - except Exception as e: - logger.warning('pool_tracks 생성 실패, vessel_dfs fallback: %s', e) - pool_tracks = dict(vessel_dfs) + # pool 은 전체 24h 누적 tracks (중국 8k+ 한국/러시아 포함 55k). + # sog/cog 미계산 상태여도 _trajectory_similarity 내부에서 on-demand 계산. + pool_tracks = getattr(vessel_store, '_tracks', {}) or vessel_dfs # 조업 속력대(1.5~5.0kn)에서 움직이는 모든 중국 선박을 base로 확장. - # classifications 500척만으로는 bbox 기점이 부족해 실제 공조 페어를 놓침. + # 중국 MID: 412(본토) / 413(홍콩) / 414(마카오) for mmsi, df in pool_tracks.items(): - if not mmsi.startswith('412'): + if not (mmsi.startswith('412') or mmsi.startswith('413') or mmsi.startswith('414')): continue - if df is None or df.empty or 'sog' not in df.columns: + if df is None or df.empty: + continue + sog_col = 'sog' if 'sog' in df.columns else ('raw_sog' if 'raw_sog' in df.columns else None) + if sog_col is None: continue try: - mean_sog = float(df['sog'].tail(12).mean()) + mean_sog = float(df[sog_col].tail(12).mean()) if 1.5 <= mean_sog <= 5.0: base_mmsis.add(mmsi) except Exception: