import logging

import pandas as pd

from pipeline.preprocessor import AISPreprocessor
from pipeline.behavior import BehaviorDetector
from pipeline.resampler import TrajectoryResampler
from pipeline.features import FeatureExtractor
from pipeline.classifier import VesselTypeClassifier, get_season
from pipeline.clusterer import EnhancedBIRCHClusterer
from pipeline.constants import RESAMPLE_INTERVAL_MIN

logger = logging.getLogger(__name__)


class ChineseFishingVesselPipeline:
    """Classify Chinese fishing vessel activity types in seven steps.

    The stages, in execution order:
        1. AIS preprocessing (Yan et al. 2022)
        2. Behaviour-state detection (speed-based 3-class)
        3. Trajectory resampling (Yan, Yang et al. — 4-minute interval)
        4. Feature vector extraction (paper 12)
        5. Vessel-type classification (rule-based scoring)
        6. Enhanced BIRCH trajectory clustering (Yan, Yang et al.)
        7. Seasonal activity tagging (paper 12)

    Each stage is delegated to a collaborator object created in
    ``__init__``; this class only wires them together.
    """

    def __init__(self) -> None:
        """Instantiate one collaborator per pipeline stage."""
        self.preprocessor = AISPreprocessor()
        self.detector = BehaviorDetector()
        # The resampling interval comes from the shared pipeline constants.
        self.resampler = TrajectoryResampler(RESAMPLE_INTERVAL_MIN)
        self.extractor = FeatureExtractor()
        self.classifier = VesselTypeClassifier()
        self.clusterer = EnhancedBIRCHClusterer()

    def _describe_vessel(self, mmsi, track, resampled) -> dict:
        """Build the classification record for a single vessel.

        Args:
            mmsi: group key of the vessel as produced by ``groupby('mmsi')``.
            track: the vessel's rows before resampling; only its
                ``timestamp`` column is read here.
            resampled: the vessel's resampled trajectory.

        Returns:
            A dict with the keys mmsi, vessel_type, confidence,
            fishing_pct, season, n_points and features. ``cluster_id`` is
            not set here; ``run`` adds it after clustering.
        """
        features = self.extractor.extract(resampled)
        vessel_type, confidence = self.classifier.classify(features)
        fishing_pct = BehaviorDetector.compute_fishing_ratio(resampled)

        # The season is taken from the timestamp at the positional midpoint
        # of the vessel's pre-resampling rows.
        midpoint = len(track) // 2
        season = get_season(track['timestamp'].iloc[midpoint])

        return {
            'mmsi': mmsi,
            'vessel_type': vessel_type,
            'confidence': confidence,
            'fishing_pct': fishing_pct,
            'season': season,
            'n_points': len(resampled),
            'features': features,
        }

    def run(
        self, df_raw: pd.DataFrame
    ) -> tuple[list[dict], dict[str, pd.DataFrame]]:
        """Execute all seven stages on a raw AIS table.

        Args:
            df_raw: raw AIS DataFrame with columns mmsi, timestamp, lat,
                lon, sog, cog.

        Returns:
            A ``(results, vessel_dfs)`` pair:

            - ``results``: one dict per vessel holding mmsi, vessel_type,
              confidence, fishing_pct, season, n_points, features and
              cluster_id. cluster_id is -1 for a vessel that is absent
              from the clusterer's output mapping.
            - ``vessel_dfs``: mapping of mmsi -> resampled trajectory
              DataFrame.

            When preprocessing leaves no rows, a warning is logged and
            ``([], {})`` is returned.
        """
        # Stage 1: preprocessing; stop early if nothing is left.
        df = self.preprocessor.run(df_raw)
        if len(df) == 0:
            logger.warning('pipeline: no rows after preprocessing')
            return [], {}

        # Stage 2: behaviour-state detection over the whole table.
        df = self.detector.detect(df)

        # Stages 3-5 and 7: handled vessel by vessel.
        trajectories: dict[str, pd.DataFrame] = {}
        records: list[dict] = []
        for mmsi, track in df.groupby('mmsi'):
            resampled = self.resampler.resample(track)
            trajectories[mmsi] = resampled
            records.append(self._describe_vessel(mmsi, track, resampled))

        # Stage 6: cluster all resampled trajectories together, then attach
        # each vessel's cluster label to its record.
        cluster_map = self.clusterer.fit_predict(trajectories)
        for record in records:
            record['cluster_id'] = cluster_map.get(record['mmsi'], -1)

        observed_types = {record['vessel_type'] for record in records}
        logger.info(
            'pipeline complete: %d vessels, types=%s',
            len(records),
            observed_types,
        )
        return records, trajectories