from collections import defaultdict

import pandas as pd

from pipeline.constants import KR_BOUNDS, MAX_SOG_KNOTS, MIN_TRAJ_POINTS


class AISPreprocessor:
    """Delete-Supplement-Update (Yan et al. 2022).

    Cleans a table of AIS records by deleting rows with invalid identifiers
    or positions, supplementing out-of-range speed readings from neighbouring
    points of the same vessel, and dropping duplicates and short trajectories.

    Attributes:
        stats: Counter mapping. ``invalid_mmsi`` accumulates across calls to
            ``run``; ``final_records`` and ``retention_pct`` describe the most
            recent call only.
    """

    def __init__(self):
        # defaultdict(int) lets counters be incremented without initialisation.
        self.stats = defaultdict(int)

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned copy of ``df``; the input frame is not modified.

        Args:
            df: Frame containing at least the columns ``mmsi``, ``timestamp``,
                ``lat``, ``lon``, ``sog`` and ``cog``.

        Returns:
            A new frame sorted by ``mmsi`` then ``timestamp`` with a fresh
            0..n-1 index.

        Raises:
            ValueError: If any required column is missing.
        """
        original = len(df)
        required = ['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog']
        missing = [c for c in required if c not in df.columns]
        if missing:
            raise ValueError(f"필수 컬럼 누락: {missing}")

        df = df.copy()
        df['timestamp'] = pd.to_datetime(df['timestamp'])

        # Delete: keep only rows whose MMSI renders as exactly nine digits.
        valid_mmsi = df['mmsi'].astype(str).str.match(r'^\d{9}$')
        df = df[valid_mmsi]
        self.stats['invalid_mmsi'] += original - len(df)

        # Delete: impossible coordinates first, then anything outside the
        # KR_BOUNDS bounding box.
        df = df[(df['lat'].between(-90, 90)) & (df['lon'].between(-180, 180))]
        df = df[
            df['lat'].between(KR_BOUNDS['lat_min'], KR_BOUNDS['lat_max'])
            & df['lon'].between(KR_BOUNDS['lon_min'], KR_BOUNDS['lon_max'])
        ]

        # De-duplicate before smoothing and counting: repeated
        # (mmsi, timestamp) rows would otherwise take up smoothing-window
        # slots and inflate trajectory lengths, letting vessels with fewer
        # than MIN_TRAJ_POINTS distinct points through the length filter.
        df = df.drop_duplicates(subset=['mmsi', 'timestamp'])
        df = df.sort_values(['mmsi', 'timestamp'])

        # Supplement: replace out-of-range (or missing) SOG with the mean of
        # the valid readings in a centred 3-point window of the same vessel.
        # Invalid readings are blanked to NaN first so the bad value does not
        # contaminate its own replacement; rolling().mean() skips NaN.
        if not df.empty:
            in_range = df['sog'].between(0, MAX_SOG_KNOTS)
            trusted = df['sog'].where(in_range)
            neighbour_mean = trusted.groupby(df['mmsi']).transform(
                lambda s: s.rolling(3, center=True, min_periods=1).mean()
            )
            df['sog'] = trusted.where(in_range, neighbour_mean)

        # Readings with no valid neighbour remain NaN and are dropped here.
        df = df[(df['sog'] >= 0) & (df['sog'] <= MAX_SOG_KNOTS)]

        # Delete: trajectories with fewer than MIN_TRAJ_POINTS points.
        counts = df.groupby('mmsi').size()
        valid_mmsi_list = counts[counts >= MIN_TRAJ_POINTS].index
        df = df[df['mmsi'].isin(valid_mmsi_list)]

        self.stats['final_records'] = len(df)
        # max(original, 1) avoids division by zero on empty input.
        self.stats['retention_pct'] = round(len(df) / max(original, 1) * 100, 2)
        return df.reset_index(drop=True)