# Changelog:
# - detect_gear_groups: switched from vessel_dfs (classification targets only)
#   to vessel_store.get_all_latest_positions() (all ~14K vessels)
# - build_all_group_snapshots: likewise switched to the all_positions basis
# - vessel_store: added get_all_latest_positions() method
# - Result: 0 gear groups -> 210 gear groups (GEAR_IN_ZONE 57, GEAR_OUT_ZONE 45)
# (file: 371 lines, 14 KiB, Python)
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Minimum minutes between vessel static-info refreshes (see refresh_static_info).
_STATIC_REFRESH_INTERVAL_MIN = 60

# Minimum minutes between permit-registry refreshes (see refresh_permit_registry).
_PERMIT_REFRESH_INTERVAL_MIN = 30

# Mean Earth radius in nautical miles, used by the haversine distance.
_EARTH_RADIUS_NM = 3440.065

# Upper bound (knots) for a plausible computed speed-over-ground; larger
# values are treated as GPS/timestamp noise and clipped.
_MAX_REASONABLE_SOG = 30.0

# MMSI prefix (MID 412) identifying vessels flagged to China.
_CHINESE_MMSI_PREFIX = '412'
|
|
|
|
|
|
def _compute_sog_cog(
    df: pd.DataFrame,
    *,
    earth_radius_nm: float = 3440.065,  # mirrors _EARTH_RADIUS_NM
    max_sog: float = 30.0,  # mirrors _MAX_REASONABLE_SOG
) -> pd.DataFrame:
    """Compute SOG (knots) and COG (degrees) from consecutive lat/lon/timestamp points.

    The frame is sorted by (mmsi, timestamp) and each row i receives the speed
    and initial bearing of the segment from point i to point i+1 of the same
    vessel. The final row of the frame duplicates the previous segment's value,
    and the last row of every other vessel (whose "next" point belongs to a
    different vessel) falls back to the AIS-reported 'raw_sog' (or 0).

    Args:
        df: DataFrame with at least 'mmsi', 'timestamp', 'lat', 'lon' columns;
            'raw_sog' is used as a fallback when present.
        earth_radius_nm: Sphere radius used for the haversine distance
            (nautical miles by default, giving knots).
        max_sog: Clip ceiling for computed speeds (implausible values are noise).

    Returns:
        A sorted copy of *df* with 'sog' (clipped to [0, max_sog]) and 'cog'
        (degrees in [0, 360)) columns added.
    """
    df = df.sort_values(['mmsi', 'timestamp']).copy()

    # Degenerate input: nothing to difference, just attach empty columns.
    if df.empty:
        df['sog'] = pd.Series(dtype=float)
        df['cog'] = pd.Series(dtype=float)
        return df

    lat1 = np.radians(df['lat'].values[:-1])
    lon1 = np.radians(df['lon'].values[:-1])
    lat2 = np.radians(df['lat'].values[1:])
    lon2 = np.radians(df['lon'].values[1:])

    # Haversine distance (nautical miles)
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    dist_nm = earth_radius_nm * 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))

    # Time difference (hours); non-positive gaps are invalid -> NaN
    ts = df['timestamp'].values
    dt_sec = (ts[1:] - ts[:-1]).astype('timedelta64[s]').astype(float)
    dt_hours = dt_sec / 3600.0
    dt_hours[dt_hours <= 0] = np.nan

    # SOG = dist / time (knots), clipped to a physically plausible range
    computed_sog = dist_nm / dt_hours
    computed_sog = np.clip(np.nan_to_num(computed_sog, nan=0.0), 0, max_sog)

    # COG = initial bearing of the segment (degrees, 0..360)
    x = np.sin(dlon) * np.cos(lat2)
    y = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(dlon)
    bearing = (np.degrees(np.arctan2(x, y)) + 360) % 360

    # Align segment values with rows: row i <- segment i -> i+1; the final
    # row has no following point, so it duplicates the previous value.
    sog_arr = np.append(computed_sog, computed_sog[-1:] if len(computed_sog) > 0 else [0])
    cog_arr = np.append(bearing, bearing[-1:] if len(bearing) > 0 else [0])

    # Reset at MMSI boundaries: the segment from row idx to idx+1 spans two
    # different vessels, so the value stored at row idx is meaningless.
    # (A previous revision reset idx+1 instead — that kept the cross-vessel
    # garbage on the last row of each vessel and wiped a valid value from the
    # first row of the next one.)
    mmsi_vals = df['mmsi'].values
    boundary = np.where(mmsi_vals[:-1] != mmsi_vals[1:])[0]
    for idx in boundary:
        sog_arr[idx] = df['raw_sog'].iloc[idx] if 'raw_sog' in df.columns else 0
        cog_arr[idx] = 0

    # Where computed SOG is 0 or NaN, fall back to the AIS-reported raw_sog
    df['sog'] = sog_arr
    if 'raw_sog' in df.columns:
        mask = (df['sog'] == 0) | np.isnan(df['sog'])
        df.loc[mask, 'sog'] = df.loc[mask, 'raw_sog'].fillna(0)

    df['cog'] = cog_arr
    return df
|
|
|
|
|
|
class VesselStore:
    """In-memory vessel trajectory store for Korean waters vessel data.

    Maintains a 24-hour sliding window of all vessel tracks and supports
    incremental 5-minute updates. Chinese vessel (MMSI 412*) filtering
    is applied only at analysis target selection time.
    """

    def __init__(self) -> None:
        # Per-vessel track history: {mmsi: DataFrame ordered by timestamp}.
        self._tracks: dict[str, pd.DataFrame] = {}
        # Newest time bucket merged so far; incremental fetches resume here.
        self._last_bucket: Optional[datetime] = None
        # Static vessel metadata (name, type, dimensions) keyed by MMSI.
        self._static_info: dict[str, dict] = {}
        # MMSIs of permitted Chinese fishing vessels.
        self._permit_set: set[str] = set()
        # Timestamps of the last successful refreshes (used for rate limiting).
        self._static_refreshed_at: Optional[datetime] = None
        self._permit_refreshed_at: Optional[datetime] = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _as_utc(value) -> datetime:
        """Normalize a pandas Timestamp or naive datetime to a tz-aware UTC datetime."""
        if hasattr(value, 'to_pydatetime'):
            value = value.to_pydatetime()
        if isinstance(value, datetime) and value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        return value

    @staticmethod
    def _safe_float(value) -> float:
        """Coerce *value* to float, mapping None/NaN/unconvertible input to 0.0."""
        if value is None:
            return 0.0
        try:
            result = float(value)
        except (TypeError, ValueError):
            return 0.0
        return 0.0 if np.isnan(result) else result

    # ------------------------------------------------------------------
    # Public load / update methods
    # ------------------------------------------------------------------

    def load_initial(self, hours: int = 24) -> None:
        """Load all Korean waters vessel data for the past N hours.

        Fetches a bulk DataFrame from snpdb, groups by MMSI, and stores
        each vessel's track separately. Also triggers static info and
        permit registry refresh.

        Args:
            hours: Size of the history window to fetch, in hours.
        """
        from db import snpdb

        logger.info('loading initial vessel tracks (last %dh)...', hours)
        try:
            df_all = snpdb.fetch_all_tracks(hours)
        except Exception as e:
            logger.error('fetch_all_tracks failed: %s', e)
            return

        if df_all.empty:
            logger.warning('fetch_all_tracks returned empty DataFrame')
            return

        # Preserve the original AIS-reported speed as 'raw_sog'; a computed
        # 'sog' column is added later during analysis.
        if 'sog' in df_all.columns and 'raw_sog' not in df_all.columns:
            df_all = df_all.rename(columns={'sog': 'raw_sog'})

        self._tracks = {}
        for mmsi, group in df_all.groupby('mmsi'):
            self._tracks[str(mmsi)] = group.reset_index(drop=True)

        # Record the newest bucket (or timestamp, as a fallback) so the next
        # incremental fetch knows where to resume.
        if 'time_bucket' in df_all.columns and not df_all['time_bucket'].dropna().empty:
            self._last_bucket = self._as_utc(
                pd.to_datetime(df_all['time_bucket'].dropna()).max()
            )
        elif 'timestamp' in df_all.columns and not df_all['timestamp'].dropna().empty:
            self._last_bucket = self._as_utc(
                pd.to_datetime(df_all['timestamp'].dropna()).max()
            )

        vessel_count = len(self._tracks)
        point_count = sum(len(v) for v in self._tracks.values())
        logger.info(
            'initial load complete: %d vessels, %d total points, last_bucket=%s',
            vessel_count,
            point_count,
            self._last_bucket,
        )

        self.refresh_static_info()
        self.refresh_permit_registry()

    def merge_incremental(self, df_new: pd.DataFrame) -> None:
        """Merge a new batch of vessel positions into the in-memory store.

        Deduplicates by timestamp within each MMSI, keeps every track ordered
        by timestamp (so the last row is always the latest fix even when
        batches arrive out of order), and updates _last_bucket.

        Args:
            df_new: Batch of positions with at least mmsi/timestamp columns.
        """
        if df_new.empty:
            logger.debug('merge_incremental called with empty DataFrame, skipping')
            return

        if 'sog' in df_new.columns and 'raw_sog' not in df_new.columns:
            df_new = df_new.rename(columns={'sog': 'raw_sog'})

        new_buckets: list[datetime] = []

        for mmsi, group in df_new.groupby('mmsi'):
            mmsi_str = str(mmsi)
            if mmsi_str in self._tracks:
                combined = pd.concat([self._tracks[mmsi_str], group], ignore_index=True)
                combined = combined.drop_duplicates(subset=['timestamp'])
                # Re-sort so an out-of-order batch cannot leave the newest
                # position in the middle of the track (.iloc[-1] consumers
                # such as get_all_latest_positions depend on this).
                combined = combined.sort_values('timestamp')
                self._tracks[mmsi_str] = combined.reset_index(drop=True)
            else:
                self._tracks[mmsi_str] = (
                    group.sort_values('timestamp').reset_index(drop=True)
                )

            if 'time_bucket' in group.columns and not group['time_bucket'].empty:
                bucket_vals = pd.to_datetime(group['time_bucket'].dropna())
                if not bucket_vals.empty:
                    new_buckets.append(bucket_vals.max().to_pydatetime())

        if new_buckets:
            latest = self._as_utc(max(new_buckets))
            if self._last_bucket is None or latest > self._last_bucket:
                self._last_bucket = latest

        logger.debug(
            'incremental merge done: %d mmsis in batch, store has %d vessels',
            df_new['mmsi'].nunique(),
            len(self._tracks),
        )

    def evict_stale(self, hours: int = 24) -> None:
        """Remove track points older than N hours and evict empty MMSI entries.

        Args:
            hours: Age threshold; points older than this are dropped.
        """
        import datetime as _dt

        now = datetime.now(timezone.utc)
        cutoff_aware = now - _dt.timedelta(hours=hours)
        # Tracks may carry tz-naive timestamps; keep a naive cutoff as well.
        cutoff_naive = cutoff_aware.replace(tzinfo=None)

        before_total = sum(len(v) for v in self._tracks.values())
        evicted_mmsis: list[str] = []

        for mmsi in list(self._tracks.keys()):
            df = self._tracks[mmsi]
            ts_col = df['timestamp']
            # Handle tz-aware and tz-naive timestamp columns uniformly.
            if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None:
                mask = ts_col >= pd.Timestamp(cutoff_aware)
            else:
                mask = ts_col >= pd.Timestamp(cutoff_naive)
            filtered = df[mask].reset_index(drop=True)
            if filtered.empty:
                # Nothing recent left for this vessel: drop it entirely.
                del self._tracks[mmsi]
                evicted_mmsis.append(mmsi)
            else:
                self._tracks[mmsi] = filtered

        after_total = sum(len(v) for v in self._tracks.values())
        logger.info(
            'eviction complete: removed %d points, evicted %d mmsis (threshold=%dh)',
            before_total - after_total,
            len(evicted_mmsis),
            hours,
        )

    def refresh_static_info(self) -> None:
        """Fetch vessel static info (type, name, dimensions) from snpdb.

        Skips refresh if called within the last 60 minutes
        (_STATIC_REFRESH_INTERVAL_MIN) or when the store holds no tracks.
        """
        now = datetime.now(timezone.utc)
        if self._static_refreshed_at is not None:
            elapsed_min = (now - self._static_refreshed_at).total_seconds() / 60
            if elapsed_min < _STATIC_REFRESH_INTERVAL_MIN:
                logger.debug(
                    'static info refresh skipped (%.1f min since last refresh)',
                    elapsed_min,
                )
                return

        if not self._tracks:
            logger.debug('no tracks in store, skipping static info refresh')
            return

        from db import snpdb

        mmsi_list = list(self._tracks.keys())
        try:
            info = snpdb.fetch_static_info(mmsi_list)
            self._static_info.update(info)
            self._static_refreshed_at = now
            logger.info('static info refreshed: %d vessels', len(info))
        except Exception as e:
            logger.error('fetch_static_info failed: %s', e)

    def refresh_permit_registry(self) -> None:
        """Fetch permitted Chinese fishing vessel MMSIs from snpdb.

        Skips refresh if called within the last 30 minutes
        (_PERMIT_REFRESH_INTERVAL_MIN). On success the permit set is
        replaced wholesale, so revoked permits disappear.
        """
        now = datetime.now(timezone.utc)
        if self._permit_refreshed_at is not None:
            elapsed_min = (now - self._permit_refreshed_at).total_seconds() / 60
            if elapsed_min < _PERMIT_REFRESH_INTERVAL_MIN:
                logger.debug(
                    'permit registry refresh skipped (%.1f min since last refresh)',
                    elapsed_min,
                )
                return

        from db import snpdb

        try:
            mmsis = snpdb.fetch_permit_mmsis()
            self._permit_set = set(mmsis)
            self._permit_refreshed_at = now
            logger.info('permit registry refreshed: %d permitted vessels', len(self._permit_set))
        except Exception as e:
            logger.error('fetch_permit_mmsis failed: %s', e)

    # ------------------------------------------------------------------
    # Analysis target selection
    # ------------------------------------------------------------------

    def select_analysis_targets(self) -> pd.DataFrame:
        """Build a combined DataFrame of Chinese vessel tracks with computed SOG/COG.

        Filters to MMSI starting with '412', computes SOG and COG from
        consecutive lat/lon/timestamp pairs using the haversine formula,
        and falls back to raw_sog where computed values are zero or NaN.

        Returns:
            DataFrame with columns: mmsi, timestamp, lat, lon, sog, cog
            (empty when no Chinese vessels are present or required columns
            are missing).
        """
        chinese_mmsis = [m for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX)]
        if not chinese_mmsis:
            logger.info('no Chinese vessels (412*) found in store')
            return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog'])

        frames = [self._tracks[m] for m in chinese_mmsis]
        combined = pd.concat(frames, ignore_index=True)

        required_cols = {'mmsi', 'timestamp', 'lat', 'lon'}
        missing = required_cols - set(combined.columns)
        if missing:
            logger.error('combined DataFrame missing required columns: %s', missing)
            return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog'])

        result = _compute_sog_cog(combined)

        output_cols = ['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog']
        available = [c for c in output_cols if c in result.columns]
        return result[available].reset_index(drop=True)

    # ------------------------------------------------------------------
    # Lookup helpers
    # ------------------------------------------------------------------

    def is_permitted(self, mmsi: str) -> bool:
        """Return True if the given MMSI is in the permitted Chinese fishing vessel registry."""
        return mmsi in self._permit_set

    def get_vessel_info(self, mmsi: str) -> dict:
        """Return static vessel info dict for the given MMSI, or empty dict if not found."""
        return self._static_info.get(mmsi, {})

    def get_all_latest_positions(self) -> dict[str, dict]:
        """Return the latest known position of every vessel in the store.

        Returns:
            {mmsi: {lat, lon, sog, cog, timestamp, name}}. Speed prefers a
            computed 'sog' column but falls back to the AIS-reported
            'raw_sog' — stored tracks rename 'sog' to 'raw_sog' on ingest,
            so without the fallback every vessel would report speed 0.
        """
        result: dict[str, dict] = {}
        for mmsi, df in self._tracks.items():
            if df is None or len(df) == 0:
                continue
            last = df.iloc[-1]
            info = self._static_info.get(mmsi, {})

            # Prefer computed sog; fall back to raw AIS speed when absent/NaN.
            speed = last.get('sog')
            if speed is None or pd.isna(speed):
                speed = last.get('raw_sog')

            result[mmsi] = {
                'lat': float(last['lat']),
                'lon': float(last['lon']),
                'sog': self._safe_float(speed),
                'cog': self._safe_float(last.get('cog')),
                'timestamp': last.get('timestamp'),
                'name': info.get('name', ''),
            }
        return result

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def last_bucket(self) -> Optional[datetime]:
        """Return the latest time bucket seen across all merged incremental batches."""
        return self._last_bucket

    # ------------------------------------------------------------------
    # Diagnostics
    # ------------------------------------------------------------------

    def stats(self) -> dict:
        """Return store statistics for health/status reporting."""
        total_points = sum(len(v) for v in self._tracks.values())
        chinese_count = sum(1 for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX))

        # Rough memory estimate: each row ~200 bytes across columns.
        memory_mb = round((total_points * 200) / (1024 * 1024), 2)

        return {
            'vessels': len(self._tracks),
            'points': total_points,
            'memory_mb': memory_mb,
            'last_bucket': self._last_bucket.isoformat() if self._last_bucket else None,
            'targets': chinese_count,
            'permitted': len(self._permit_set),
        }
|
|
|
|
|
|
# Module-level singleton shared by the rest of the application; import
# `vessel_store` instead of constructing additional VesselStore instances.
vessel_store = VesselStore()
|