import pandas as pd
from typing import Dict, Tuple


class VesselTypeClassifier:
    """Rule-based scoring classifier for fishing vessel types.

    Scoring: for each feature in a type's profile, if the value falls within
    the defined range a distance-based score is added (closer to the range
    centre = higher score). Values outside the range incur a penalty.
    Returns (vessel_type, confidence).

    TRAWL    — trawling speed 2.5–4.5 kt, high COG variation
    PURSE    — purse-seine speed 3–5 kt, circular COG pattern
    LONGLINE — longline speed 0.5–2.5 kt, low COG variation, long fishing runs
    TRAP     — trap/pot speed ~0 kt, many stationary events, short range
    """

    # vessel type -> feature name -> inclusive (lo, hi) range that is
    # considered typical for that type.
    PROFILES: Dict[str, Dict[str, Tuple[float, float]]] = {
        'TRAWL': {
            'sog_fishing_mean': (2.5, 4.5),
            'cog_change_mean': (0.15, 9.9),
            'fishing_pct': (0.3, 0.7),
            'fishing_run_mean': (5, 50),
            'stationary_events': (0, 5),
        },
        'PURSE': {
            'sog_fishing_mean': (3.0, 5.0),
            'cog_circularity': (0.2, 1.0),
            'fishing_pct': (0.1, 0.5),
            'fishing_run_mean': (3, 30),
            'stationary_events': (0, 3),
        },
        'LONGLINE': {
            'sog_fishing_mean': (0.5, 2.5),
            'cog_change_mean': (0.0, 0.15),
            'fishing_pct': (0.4, 0.9),
            'fishing_run_mean': (20, 999),
            'stationary_events': (0, 10),
        },
        'TRAP': {
            'sog_fishing_mean': (0.0, 2.0),
            'stationary_pct': (0.2, 0.8),
            'stationary_events': (5, 999),
            'fishing_run_mean': (1, 10),
            'total_distance_km': (0, 100),
        },
    }

    @staticmethod
    def _is_missing(val) -> bool:
        """Return True when *val* carries no information (None, NaN, pd.NA)."""
        return val is None or bool(pd.isna(val))

    @staticmethod
    def _range_score(val, lo, hi) -> float:
        """Score one feature value against an inclusive (lo, hi) range.

        Inside the range the score falls linearly from 1.0 at the centre to
        0.0 at either edge. Outside the range the result is a penalty in
        [-0.5, 0): the distance to the nearest edge relative to the range
        width, capped at 0.5.
        """
        width = hi - lo
        if lo <= val <= hi:
            centre = (lo + hi) / 2
            # A zero-width range would divide by zero; fall back to 1.
            half_width = width / 2 if width > 0 else 1
            return max(0.0, 1 - abs(val - centre) / half_width)
        overshoot = min(abs(val - lo), abs(val - hi))
        # The epsilon keeps the division defined for zero-width ranges.
        return -min(0.5, overshoot / (width + 1e-9))

    def classify(self, features: Dict) -> Tuple[str, float]:
        """Classify a vessel from its feature dict.

        Each profile is scored as the mean of the per-feature scores over
        the features that are present. Features that are absent, None or
        NaN are ignored rather than penalised.

        Args:
            features: mapping of feature name to numeric value. Names that
                appear in no profile are ignored.

        Returns:
            (vessel_type, confidence) where confidence is in [0, 1]: the
            winning score divided by the sum of all positive profile
            scores. ('UNKNOWN', 0.0) is returned when *features* is empty
            or contains no usable value for any profile feature. When
            usable features exist but no profile scores above zero, the
            least-penalised type is returned with confidence 0.0. Ties go
            to the type listed first in PROFILES.
        """
        if not features:
            return 'UNKNOWN', 0.0

        scores: Dict[str, float] = {}
        any_matched = False
        for vtype, profile in self.PROFILES.items():
            score = 0.0
            matched = 0
            for feat_name, (lo, hi) in profile.items():
                val = features.get(feat_name)
                if self._is_missing(val):
                    continue
                matched += 1
                score += self._range_score(val, lo, hi)
            any_matched = any_matched or matched > 0
            scores[vtype] = score / matched if matched > 0 else 0.0

        # Without a single usable feature every score is 0.0 and picking a
        # "best" type would just return the first profile by accident.
        if not any_matched:
            return 'UNKNOWN', 0.0

        best_type = max(scores, key=lambda k: scores[k])
        total = sum(max(v, 0.0) for v in scores.values())
        confidence = scores[best_type] / total if total > 0 else 0.0
        return best_type, round(confidence, 3)


def get_season(ts: pd.Timestamp) -> str:
    """Return the Northern-Hemisphere season for a timestamp.

    Months 3–5 map to 'SPRING', 6–8 to 'SUMMER', 9–11 to 'FALL' and every
    other month value to 'WINTER'.

    Reference: paper 12 seasonal activity analysis (Chinese EEZ).
    Chinese fishing ban period: Yellow Sea / East China Sea May–Sep,
    South China Sea May–Aug.

    NOTE(review): a missing timestamp (pd.NaT) presumably has no valid
    month and would fall through to 'WINTER' — confirm callers drop NaT
    before calling.
    """
    month = ts.month
    if 3 <= month <= 5:
        return 'SPRING'
    if 6 <= month <= 8:
        return 'SUMMER'
    if 9 <= month <= 11:
        return 'FALL'
    return 'WINTER'