import logging from datetime import datetime, timezone from typing import Optional import numpy as np import pandas as pd logger = logging.getLogger(__name__) _STATIC_REFRESH_INTERVAL_MIN = 60 _PERMIT_REFRESH_INTERVAL_MIN = 30 _EARTH_RADIUS_NM = 3440.065 _MAX_REASONABLE_SOG = 30.0 _CHINESE_MMSI_PREFIX = '412' def _compute_sog_cog(df: pd.DataFrame) -> pd.DataFrame: """Compute SOG (knots) and COG (degrees) from consecutive lat/lon/timestamp points.""" df = df.sort_values(['mmsi', 'timestamp']).copy() lat1 = np.radians(df['lat'].values[:-1]) lon1 = np.radians(df['lon'].values[:-1]) lat2 = np.radians(df['lat'].values[1:]) lon2 = np.radians(df['lon'].values[1:]) # Haversine distance (nautical miles) dlat = lat2 - lat1 dlon = lon2 - lon1 a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2 dist_nm = _EARTH_RADIUS_NM * 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a)) # Time difference (hours) ts = df['timestamp'].values dt_sec = (ts[1:] - ts[:-1]).astype('timedelta64[s]').astype(float) dt_hours = dt_sec / 3600.0 dt_hours[dt_hours <= 0] = np.nan # SOG = dist / time (knots) computed_sog = dist_nm / dt_hours computed_sog = np.clip(np.nan_to_num(computed_sog, nan=0.0), 0, _MAX_REASONABLE_SOG) # COG = bearing (degrees) x = np.sin(dlon) * np.cos(lat2) y = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(dlon) bearing = (np.degrees(np.arctan2(x, y)) + 360) % 360 # Append last value (copy from previous) sog_arr = np.append(computed_sog, computed_sog[-1:] if len(computed_sog) > 0 else [0]) cog_arr = np.append(bearing, bearing[-1:] if len(bearing) > 0 else [0]) # Reset at MMSI boundaries mmsi_vals = df['mmsi'].values boundary = np.where(mmsi_vals[:-1] != mmsi_vals[1:])[0] for idx in boundary: sog_arr[idx + 1] = df['raw_sog'].iloc[idx + 1] if 'raw_sog' in df.columns else 0 cog_arr[idx + 1] = 0 # Where computed SOG is 0 or NaN, fall back to raw_sog df['sog'] = sog_arr if 'raw_sog' in df.columns: mask = (df['sog'] == 0) | np.isnan(df['sog']) df.loc[mask, 'sog'] = df.loc[mask, 'raw_sog'].fillna(0) df['cog'] = cog_arr return df class VesselStore: """In-memory vessel trajectory store for Korean waters vessel data. Maintains a 24-hour sliding window of all vessel tracks and supports incremental 5-minute updates. Chinese vessel (MMSI 412*) filtering is applied only at analysis target selection time. """ def __init__(self) -> None: self._tracks: dict[str, pd.DataFrame] = {} self._last_bucket: Optional[datetime] = None self._static_info: dict[str, dict] = {} self._permit_set: set[str] = set() self._static_refreshed_at: Optional[datetime] = None self._permit_refreshed_at: Optional[datetime] = None # ------------------------------------------------------------------ # Public load / update methods # ------------------------------------------------------------------ def load_initial(self, hours: int = 24) -> None: """Load all Korean waters vessel data for the past N hours. Fetches a bulk DataFrame from snpdb, groups by MMSI, and stores each vessel's track separately. Also triggers static info and permit registry refresh. """ from db import snpdb logger.info('loading initial vessel tracks (last %dh)...', hours) try: df_all = snpdb.fetch_all_tracks(hours) except Exception as e: logger.error('fetch_all_tracks failed: %s', e) return if df_all.empty: logger.warning('fetch_all_tracks returned empty DataFrame') return # Rename sog column to raw_sog to preserve original AIS-reported speed if 'sog' in df_all.columns and 'raw_sog' not in df_all.columns: df_all = df_all.rename(columns={'sog': 'raw_sog'}) self._tracks = {} for mmsi, group in df_all.groupby('mmsi'): self._tracks[str(mmsi)] = group.reset_index(drop=True) # last_bucket 설정 — incremental fetch 시작점 if 'time_bucket' in df_all.columns and not df_all['time_bucket'].dropna().empty: max_bucket = pd.to_datetime(df_all['time_bucket'].dropna()).max() if hasattr(max_bucket, 'to_pydatetime'): max_bucket = max_bucket.to_pydatetime() if isinstance(max_bucket, datetime) and max_bucket.tzinfo is None: max_bucket = max_bucket.replace(tzinfo=timezone.utc) self._last_bucket = max_bucket elif 'timestamp' in df_all.columns and not df_all['timestamp'].dropna().empty: max_ts = pd.to_datetime(df_all['timestamp'].dropna()).max() if hasattr(max_ts, 'to_pydatetime'): max_ts = max_ts.to_pydatetime() if isinstance(max_ts, datetime) and max_ts.tzinfo is None: max_ts = max_ts.replace(tzinfo=timezone.utc) self._last_bucket = max_ts vessel_count = len(self._tracks) point_count = sum(len(v) for v in self._tracks.values()) logger.info( 'initial load complete: %d vessels, %d total points, last_bucket=%s', vessel_count, point_count, self._last_bucket, ) self.refresh_static_info() self.refresh_permit_registry() def merge_incremental(self, df_new: pd.DataFrame) -> None: """Merge a new batch of vessel positions into the in-memory store. Deduplicates by timestamp within each MMSI and updates _last_bucket. """ if df_new.empty: logger.debug('merge_incremental called with empty DataFrame, skipping') return if 'sog' in df_new.columns and 'raw_sog' not in df_new.columns: df_new = df_new.rename(columns={'sog': 'raw_sog'}) new_buckets: list[datetime] = [] for mmsi, group in df_new.groupby('mmsi'): mmsi_str = str(mmsi) if mmsi_str in self._tracks: combined = pd.concat([self._tracks[mmsi_str], group], ignore_index=True) combined = combined.drop_duplicates(subset=['timestamp']) self._tracks[mmsi_str] = combined.reset_index(drop=True) else: self._tracks[mmsi_str] = group.reset_index(drop=True) if 'time_bucket' in group.columns and not group['time_bucket'].empty: bucket_vals = pd.to_datetime(group['time_bucket'].dropna()) if not bucket_vals.empty: new_buckets.append(bucket_vals.max().to_pydatetime()) if new_buckets: latest = max(new_buckets) if isinstance(latest, datetime) and latest.tzinfo is None: latest = latest.replace(tzinfo=timezone.utc) if self._last_bucket is None or latest > self._last_bucket: self._last_bucket = latest logger.debug( 'incremental merge done: %d mmsis in batch, store has %d vessels', df_new['mmsi'].nunique(), len(self._tracks), ) def evict_stale(self, hours: int = 24) -> None: """Remove track points older than N hours and evict empty MMSI entries.""" import datetime as _dt now = datetime.now(timezone.utc) cutoff_aware = now - _dt.timedelta(hours=hours) cutoff_naive = cutoff_aware.replace(tzinfo=None) before_total = sum(len(v) for v in self._tracks.values()) evicted_mmsis: list[str] = [] for mmsi in list(self._tracks.keys()): df = self._tracks[mmsi] ts_col = df['timestamp'] # Handle tz-aware and tz-naive timestamps uniformly if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None: mask = ts_col >= pd.Timestamp(cutoff_aware) else: mask = ts_col >= pd.Timestamp(cutoff_naive) filtered = df[mask].reset_index(drop=True) if filtered.empty: del self._tracks[mmsi] evicted_mmsis.append(mmsi) else: self._tracks[mmsi] = filtered after_total = sum(len(v) for v in self._tracks.values()) logger.info( 'eviction complete: removed %d points, evicted %d mmsis (threshold=%dh)', before_total - after_total, len(evicted_mmsis), hours, ) def refresh_static_info(self) -> None: """Fetch vessel static info (type, name, dimensions) from snpdb. Skips refresh if called within the last 60 minutes. """ now = datetime.now(timezone.utc) if self._static_refreshed_at is not None: elapsed_min = (now - self._static_refreshed_at).total_seconds() / 60 if elapsed_min < _STATIC_REFRESH_INTERVAL_MIN: logger.debug( 'static info refresh skipped (%.1f min since last refresh)', elapsed_min, ) return if not self._tracks: logger.debug('no tracks in store, skipping static info refresh') return from db import snpdb mmsi_list = list(self._tracks.keys()) try: info = snpdb.fetch_static_info(mmsi_list) self._static_info.update(info) self._static_refreshed_at = now logger.info('static info refreshed: %d vessels', len(info)) except Exception as e: logger.error('fetch_static_info failed: %s', e) def refresh_permit_registry(self) -> None: """Fetch permitted Chinese fishing vessel MMSIs from snpdb. Skips refresh if called within the last 30 minutes. """ now = datetime.now(timezone.utc) if self._permit_refreshed_at is not None: elapsed_min = (now - self._permit_refreshed_at).total_seconds() / 60 if elapsed_min < _PERMIT_REFRESH_INTERVAL_MIN: logger.debug( 'permit registry refresh skipped (%.1f min since last refresh)', elapsed_min, ) return from db import snpdb try: mmsis = snpdb.fetch_permit_mmsis() self._permit_set = set(mmsis) self._permit_refreshed_at = now logger.info('permit registry refreshed: %d permitted vessels', len(self._permit_set)) except Exception as e: logger.error('fetch_permit_mmsis failed: %s', e) # ------------------------------------------------------------------ # Analysis target selection # ------------------------------------------------------------------ def select_analysis_targets(self) -> pd.DataFrame: """Build a combined DataFrame of Chinese vessel tracks with computed SOG/COG. Filters to MMSI starting with '412', computes SOG and COG from consecutive lat/lon/timestamp pairs using the haversine formula, and falls back to raw_sog where computed values are zero or NaN. Returns: DataFrame with columns: mmsi, timestamp, lat, lon, sog, cog """ chinese_mmsis = [m for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX)] if not chinese_mmsis: logger.info('no Chinese vessels (412*) found in store') return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog']) frames = [self._tracks[m] for m in chinese_mmsis] combined = pd.concat(frames, ignore_index=True) required_cols = {'mmsi', 'timestamp', 'lat', 'lon'} missing = required_cols - set(combined.columns) if missing: logger.error('combined DataFrame missing required columns: %s', missing) return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog']) result = _compute_sog_cog(combined) output_cols = ['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog'] available = [c for c in output_cols if c in result.columns] return result[available].reset_index(drop=True) # ------------------------------------------------------------------ # Lookup helpers # ------------------------------------------------------------------ def is_permitted(self, mmsi: str) -> bool: """Return True if the given MMSI is in the permitted Chinese fishing vessel registry.""" return mmsi in self._permit_set def get_vessel_info(self, mmsi: str) -> dict: """Return static vessel info dict for the given MMSI, or empty dict if not found.""" return self._static_info.get(mmsi, {}) def get_all_latest_positions(self) -> dict[str, dict]: """모든 선박의 최신 위치 반환. {mmsi: {lat, lon, sog, cog, timestamp, name}} cog는 마지막 2점의 좌표로 bearing 계산.""" import math result: dict[str, dict] = {} for mmsi, df in self._tracks.items(): if df is None or len(df) == 0: continue last = df.iloc[-1] info = self._static_info.get(mmsi, {}) # COG: 마지막 2점으로 bearing 계산 cog = 0.0 if len(df) >= 2: prev = df.iloc[-2] lat1 = math.radians(float(prev['lat'])) lat2 = math.radians(float(last['lat'])) dlon = math.radians(float(last['lon']) - float(prev['lon'])) x = math.sin(dlon) * math.cos(lat2) y = math.cos(lat1) * math.sin(lat2) - math.sin(lat1) * math.cos(lat2) * math.cos(dlon) cog = (math.degrees(math.atan2(x, y)) + 360) % 360 result[mmsi] = { 'lat': float(last['lat']), 'lon': float(last['lon']), 'sog': float(last.get('sog', 0) or last.get('raw_sog', 0) or 0), 'cog': cog, 'timestamp': last.get('timestamp'), 'name': info.get('name', ''), } return result # ------------------------------------------------------------------ # Properties # ------------------------------------------------------------------ @property def last_bucket(self) -> Optional[datetime]: """Return the latest time bucket seen across all merged incremental batches.""" return self._last_bucket # ------------------------------------------------------------------ # Diagnostics # ------------------------------------------------------------------ def stats(self) -> dict: """Return store statistics for health/status reporting.""" total_points = sum(len(v) for v in self._tracks.values()) chinese_count = sum(1 for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX)) # Rough memory estimate: each row ~200 bytes across columns memory_mb = round((total_points * 200) / (1024 * 1024), 2) return { 'vessels': len(self._tracks), 'points': total_points, 'memory_mb': memory_mb, 'last_bucket': self._last_bucket.isoformat() if self._last_bucket else None, 'targets': chinese_count, 'permitted': len(self._permit_set), } # Module-level singleton vessel_store = VesselStore()