refactor(prediction): pair_trawl sog/cog on-demand 계산 + 중국 MID 413/414 추가

- pair_trawl._ensure_sog_cog(): _trajectory_similarity 진입 시 sog/cog 없으면
  vessel_store._compute_sog_cog() 로 haversine 계산 (tracks + timestamp 만 있으면 OK)
- pool 을 vessel_store._tracks 전체(55k)로 원복: 한국 440xxx/러시아 273xxx 페어 탐색 가능
- base 필터 중국 MID 확장: 412 → 412/413/414 (본토/홍콩/마카오)
- df_targets groupby 우회 제거 (불필요한 결합)
This commit is contained in:
htlee 2026-04-16 09:29:38 +09:00
부모 6c08d831d0
커밋 f5374a5316
2개의 변경된 파일31개의 추가작업 그리고 14개의 파일을 삭제

파일 보기

@ -321,6 +321,25 @@ def _classify_role(
return 'UNKNOWN' return 'UNKNOWN'
def _ensure_sog_cog(df: pd.DataFrame) -> pd.DataFrame:
"""sog/cog 컬럼이 없으면 lat/lon/timestamp 로 haversine 계산하여 추가.
pair_trawl vessel_store._tracks(raw_sog 보유) 전체를 pool 받는다.
select_analysis_targets 결과에 의존하면 중국 412* 선박(한국 440xxx )
pool 에서 빠지므로, 필요 on-demand 계산한다.
"""
if 'sog' in df.columns and 'cog' in df.columns:
return df
if 'timestamp' not in df.columns or 'lat' not in df.columns or 'lon' not in df.columns:
return df
try:
# cache.vessel_store._compute_sog_cog 를 재사용
from cache.vessel_store import _compute_sog_cog
return _compute_sog_cog(df)
except Exception:
return df
def _trajectory_similarity( def _trajectory_similarity(
df_a: pd.DataFrame, df_b: pd.DataFrame, min_samples: int = 6, df_a: pd.DataFrame, df_b: pd.DataFrame, min_samples: int = 6,
) -> tuple[float, int, dict]: ) -> tuple[float, int, dict]:
@ -331,6 +350,8 @@ def _trajectory_similarity(
""" """
if df_a.empty or df_b.empty: if df_a.empty or df_b.empty:
return 0.0, 0, {} return 0.0, 0, {}
df_a = _ensure_sog_cog(df_a)
df_b = _ensure_sog_cog(df_b)
cols = {'timestamp', 'lat', 'lon', 'sog', 'cog'} cols = {'timestamp', 'lat', 'lon', 'sog', 'cog'}
if cols - set(df_a.columns) or cols - set(df_b.columns): if cols - set(df_a.columns) or cols - set(df_b.columns):
return 0.0, 0, {} return 0.0, 0, {}

파일 보기

@ -212,26 +212,22 @@ def run_analysis_cycle():
pt_sub_registered: set[str] = set() # TODO: fishery_code=PT-S 구분 pt_sub_registered: set[str] = set() # TODO: fishery_code=PT-S 구분
base_mmsis: set[str] = {c['mmsi'] for c in classifications} base_mmsis: set[str] = {c['mmsi'] for c in classifications}
base_mmsis |= pt_registered base_mmsis |= pt_registered
# pool_tracks: 중국 전체 8k+ 선박의 sog/cog 계산된 궤적 # pool 은 전체 24h 누적 tracks (중국 8k+ 한국/러시아 포함 55k).
# df_targets 는 select_analysis_targets() 에서 이미 전체 412* 에 sog/cog 계산 # sog/cog 미계산 상태여도 _trajectory_similarity 내부에서 on-demand 계산.
# (vessel_store._tracks 는 raw_sog 만 보유해 _trajectory_similarity 가 동작 안 함) pool_tracks = getattr(vessel_store, '_tracks', {}) or vessel_dfs
pool_tracks: dict = {}
try:
for mmsi_val, grp in df_targets.groupby('mmsi'):
pool_tracks[str(mmsi_val)] = grp.reset_index(drop=True)
except Exception as e:
logger.warning('pool_tracks 생성 실패, vessel_dfs fallback: %s', e)
pool_tracks = dict(vessel_dfs)
# 조업 속력대(1.5~5.0kn)에서 움직이는 모든 중국 선박을 base로 확장. # 조업 속력대(1.5~5.0kn)에서 움직이는 모든 중국 선박을 base로 확장.
# classifications 500척만으로는 bbox 기점이 부족해 실제 공조 페어를 놓침. # 중국 MID: 412(본토) / 413(홍콩) / 414(마카오)
for mmsi, df in pool_tracks.items(): for mmsi, df in pool_tracks.items():
if not mmsi.startswith('412'): if not (mmsi.startswith('412') or mmsi.startswith('413') or mmsi.startswith('414')):
continue continue
if df is None or df.empty or 'sog' not in df.columns: if df is None or df.empty:
continue
sog_col = 'sog' if 'sog' in df.columns else ('raw_sog' if 'raw_sog' in df.columns else None)
if sog_col is None:
continue continue
try: try:
mean_sog = float(df['sog'].tail(12).mean()) mean_sog = float(df[sog_col].tail(12).mean())
if 1.5 <= mean_sog <= 5.0: if 1.5 <= mean_sog <= 5.0:
base_mmsis.add(mmsi) base_mmsis.add(mmsi)
except Exception: except Exception: