kcg-monitoring/prediction/cache/vessel_store.py
htlee bbbc326e38 refactor: FleetClusterLayer 10파일 분리 + deck.gl 리플레이 기반 구축
FleetClusterLayer.tsx 2357줄 → 10개 파일 분리:
- fleetClusterTypes/Utils/Constants: 타입, 기하 함수, 모델 상수
- useFleetClusterGeoJson: 27개 useMemo GeoJSON 훅
- FleetClusterMapLayers: MapLibre Source/Layer JSX
- CorrelationPanel/HistoryReplayController: 패널 서브컴포넌트
- GearGroupSection/FleetGearListPanel: 좌측 목록 (DRY)
- FleetClusterLayer: 오케스트레이터 524줄

deck.gl + Zustand 리플레이 기반 (Phase 0~2):
- zustand 5.0.12, @deck.gl/geo-layers 9.2.11 설치
- gearReplayStore: Zustand + rAF 애니메이션 루프
- gearReplayPreprocess: TripsLayer 전처리 + cursor O(1) 보간
- useGearReplayLayers: deck.gl 레이어 빌더 (10fps 스로틀)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 07:44:07 +09:00

449 lines
18 KiB
Python

import logging
from datetime import datetime, timezone
from typing import Optional
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
_STATIC_REFRESH_INTERVAL_MIN = 60
_PERMIT_REFRESH_INTERVAL_MIN = 30
_EARTH_RADIUS_NM = 3440.065
_MAX_REASONABLE_SOG = 30.0
_CHINESE_MMSI_PREFIX = '412'
def _compute_sog_cog(df: pd.DataFrame) -> pd.DataFrame:
"""Compute SOG (knots) and COG (degrees) from consecutive lat/lon/timestamp points."""
df = df.sort_values(['mmsi', 'timestamp']).copy()
lat1 = np.radians(df['lat'].values[:-1])
lon1 = np.radians(df['lon'].values[:-1])
lat2 = np.radians(df['lat'].values[1:])
lon2 = np.radians(df['lon'].values[1:])
# Haversine distance (nautical miles)
dlat = lat2 - lat1
dlon = lon2 - lon1
a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
dist_nm = _EARTH_RADIUS_NM * 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
# Time difference (hours)
ts = df['timestamp'].values
dt_sec = (ts[1:] - ts[:-1]).astype('timedelta64[s]').astype(float)
dt_hours = dt_sec / 3600.0
dt_hours[dt_hours <= 0] = np.nan
# SOG = dist / time (knots)
computed_sog = dist_nm / dt_hours
computed_sog = np.clip(np.nan_to_num(computed_sog, nan=0.0), 0, _MAX_REASONABLE_SOG)
# COG = bearing (degrees)
x = np.sin(dlon) * np.cos(lat2)
y = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(dlon)
bearing = (np.degrees(np.arctan2(x, y)) + 360) % 360
# Append last value (copy from previous)
sog_arr = np.append(computed_sog, computed_sog[-1:] if len(computed_sog) > 0 else [0])
cog_arr = np.append(bearing, bearing[-1:] if len(bearing) > 0 else [0])
# Reset at MMSI boundaries
mmsi_vals = df['mmsi'].values
boundary = np.where(mmsi_vals[:-1] != mmsi_vals[1:])[0]
for idx in boundary:
sog_arr[idx + 1] = df['raw_sog'].iloc[idx + 1] if 'raw_sog' in df.columns else 0
cog_arr[idx + 1] = 0
# Where computed SOG is 0 or NaN, fall back to raw_sog
df['sog'] = sog_arr
if 'raw_sog' in df.columns:
mask = (df['sog'] == 0) | np.isnan(df['sog'])
df.loc[mask, 'sog'] = df.loc[mask, 'raw_sog'].fillna(0)
df['cog'] = cog_arr
return df
class VesselStore:
"""In-memory vessel trajectory store for Korean waters vessel data.
Maintains a 24-hour sliding window of all vessel tracks and supports
incremental 5-minute updates. Chinese vessel (MMSI 412*) filtering
is applied only at analysis target selection time.
"""
def __init__(self) -> None:
self._tracks: dict[str, pd.DataFrame] = {}
self._last_bucket: Optional[datetime] = None
self._static_info: dict[str, dict] = {}
self._permit_set: set[str] = set()
self._static_refreshed_at: Optional[datetime] = None
self._permit_refreshed_at: Optional[datetime] = None
# ------------------------------------------------------------------
# Public load / update methods
# ------------------------------------------------------------------
def load_initial(self, hours: int = 24) -> None:
"""Load all Korean waters vessel data for the past N hours.
Fetches a bulk DataFrame from snpdb, groups by MMSI, and stores
each vessel's track separately. Also triggers static info and
permit registry refresh.
"""
from db import snpdb
logger.info('loading initial vessel tracks (last %dh)...', hours)
try:
df_all = snpdb.fetch_all_tracks(hours)
except Exception as e:
logger.error('fetch_all_tracks failed: %s', e)
return
if df_all.empty:
logger.warning('fetch_all_tracks returned empty DataFrame')
return
# Rename sog column to raw_sog to preserve original AIS-reported speed
if 'sog' in df_all.columns and 'raw_sog' not in df_all.columns:
df_all = df_all.rename(columns={'sog': 'raw_sog'})
self._tracks = {}
for mmsi, group in df_all.groupby('mmsi'):
self._tracks[str(mmsi)] = group.reset_index(drop=True)
# last_bucket 설정 — incremental fetch 시작점
if 'time_bucket' in df_all.columns and not df_all['time_bucket'].dropna().empty:
max_bucket = pd.to_datetime(df_all['time_bucket'].dropna()).max()
if hasattr(max_bucket, 'to_pydatetime'):
max_bucket = max_bucket.to_pydatetime()
if isinstance(max_bucket, datetime) and max_bucket.tzinfo is None:
max_bucket = max_bucket.replace(tzinfo=timezone.utc)
self._last_bucket = max_bucket
elif 'timestamp' in df_all.columns and not df_all['timestamp'].dropna().empty:
max_ts = pd.to_datetime(df_all['timestamp'].dropna()).max()
if hasattr(max_ts, 'to_pydatetime'):
max_ts = max_ts.to_pydatetime()
if isinstance(max_ts, datetime) and max_ts.tzinfo is None:
max_ts = max_ts.replace(tzinfo=timezone.utc)
self._last_bucket = max_ts
vessel_count = len(self._tracks)
point_count = sum(len(v) for v in self._tracks.values())
logger.info(
'initial load complete: %d vessels, %d total points, last_bucket=%s',
vessel_count,
point_count,
self._last_bucket,
)
self.refresh_static_info()
self.refresh_permit_registry()
def merge_incremental(self, df_new: pd.DataFrame) -> None:
"""Merge a new batch of vessel positions into the in-memory store.
Deduplicates by timestamp within each MMSI and updates _last_bucket.
"""
if df_new.empty:
logger.debug('merge_incremental called with empty DataFrame, skipping')
return
if 'sog' in df_new.columns and 'raw_sog' not in df_new.columns:
df_new = df_new.rename(columns={'sog': 'raw_sog'})
new_buckets: list[datetime] = []
for mmsi, group in df_new.groupby('mmsi'):
mmsi_str = str(mmsi)
if mmsi_str in self._tracks:
combined = pd.concat([self._tracks[mmsi_str], group], ignore_index=True)
combined = combined.drop_duplicates(subset=['timestamp'])
self._tracks[mmsi_str] = combined.reset_index(drop=True)
else:
self._tracks[mmsi_str] = group.reset_index(drop=True)
if 'time_bucket' in group.columns and not group['time_bucket'].empty:
bucket_vals = pd.to_datetime(group['time_bucket'].dropna())
if not bucket_vals.empty:
new_buckets.append(bucket_vals.max().to_pydatetime())
if new_buckets:
latest = max(new_buckets)
if isinstance(latest, datetime) and latest.tzinfo is None:
latest = latest.replace(tzinfo=timezone.utc)
if self._last_bucket is None or latest > self._last_bucket:
self._last_bucket = latest
logger.debug(
'incremental merge done: %d mmsis in batch, store has %d vessels',
df_new['mmsi'].nunique(),
len(self._tracks),
)
def evict_stale(self, hours: int = 24) -> None:
"""Remove track points older than N hours and evict empty MMSI entries."""
import datetime as _dt
now = datetime.now(timezone.utc)
cutoff_aware = now - _dt.timedelta(hours=hours)
cutoff_naive = cutoff_aware.replace(tzinfo=None)
before_total = sum(len(v) for v in self._tracks.values())
evicted_mmsis: list[str] = []
for mmsi in list(self._tracks.keys()):
df = self._tracks[mmsi]
ts_col = df['timestamp']
# Handle tz-aware and tz-naive timestamps uniformly
if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None:
mask = ts_col >= pd.Timestamp(cutoff_aware)
else:
mask = ts_col >= pd.Timestamp(cutoff_naive)
filtered = df[mask].reset_index(drop=True)
if filtered.empty:
del self._tracks[mmsi]
evicted_mmsis.append(mmsi)
else:
self._tracks[mmsi] = filtered
after_total = sum(len(v) for v in self._tracks.values())
logger.info(
'eviction complete: removed %d points, evicted %d mmsis (threshold=%dh)',
before_total - after_total,
len(evicted_mmsis),
hours,
)
def refresh_static_info(self) -> None:
"""Fetch vessel static info (type, name, dimensions) from snpdb.
Skips refresh if called within the last 60 minutes.
"""
now = datetime.now(timezone.utc)
if self._static_refreshed_at is not None:
elapsed_min = (now - self._static_refreshed_at).total_seconds() / 60
if elapsed_min < _STATIC_REFRESH_INTERVAL_MIN:
logger.debug(
'static info refresh skipped (%.1f min since last refresh)',
elapsed_min,
)
return
if not self._tracks:
logger.debug('no tracks in store, skipping static info refresh')
return
from db import snpdb
mmsi_list = list(self._tracks.keys())
try:
info = snpdb.fetch_static_info(mmsi_list)
self._static_info.update(info)
self._static_refreshed_at = now
logger.info('static info refreshed: %d vessels', len(info))
except Exception as e:
logger.error('fetch_static_info failed: %s', e)
def refresh_permit_registry(self) -> None:
"""Fetch permitted Chinese fishing vessel MMSIs from snpdb.
Skips refresh if called within the last 30 minutes.
"""
now = datetime.now(timezone.utc)
if self._permit_refreshed_at is not None:
elapsed_min = (now - self._permit_refreshed_at).total_seconds() / 60
if elapsed_min < _PERMIT_REFRESH_INTERVAL_MIN:
logger.debug(
'permit registry refresh skipped (%.1f min since last refresh)',
elapsed_min,
)
return
from db import snpdb
try:
mmsis = snpdb.fetch_permit_mmsis()
self._permit_set = set(mmsis)
self._permit_refreshed_at = now
logger.info('permit registry refreshed: %d permitted vessels', len(self._permit_set))
except Exception as e:
logger.error('fetch_permit_mmsis failed: %s', e)
# ------------------------------------------------------------------
# Analysis target selection
# ------------------------------------------------------------------
def select_analysis_targets(self) -> pd.DataFrame:
"""Build a combined DataFrame of Chinese vessel tracks with computed SOG/COG.
Filters to MMSI starting with '412', computes SOG and COG from
consecutive lat/lon/timestamp pairs using the haversine formula,
and falls back to raw_sog where computed values are zero or NaN.
Returns:
DataFrame with columns: mmsi, timestamp, lat, lon, sog, cog
"""
chinese_mmsis = [m for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX)]
if not chinese_mmsis:
logger.info('no Chinese vessels (412*) found in store')
return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog'])
frames = [self._tracks[m] for m in chinese_mmsis]
combined = pd.concat(frames, ignore_index=True)
required_cols = {'mmsi', 'timestamp', 'lat', 'lon'}
missing = required_cols - set(combined.columns)
if missing:
logger.error('combined DataFrame missing required columns: %s', missing)
return pd.DataFrame(columns=['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog'])
result = _compute_sog_cog(combined)
output_cols = ['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog']
available = [c for c in output_cols if c in result.columns]
return result[available].reset_index(drop=True)
# ------------------------------------------------------------------
# Lookup helpers
# ------------------------------------------------------------------
def is_permitted(self, mmsi: str) -> bool:
"""Return True if the given MMSI is in the permitted Chinese fishing vessel registry."""
return mmsi in self._permit_set
def get_vessel_info(self, mmsi: str) -> dict:
"""Return static vessel info dict for the given MMSI, or empty dict if not found."""
return self._static_info.get(mmsi, {})
def get_all_latest_positions(self) -> dict[str, dict]:
"""모든 선박의 최신 위치 반환. {mmsi: {lat, lon, sog, cog, timestamp, name}}
cog는 마지막 2점의 좌표로 bearing 계산."""
import math
result: dict[str, dict] = {}
for mmsi, df in self._tracks.items():
if df is None or len(df) == 0:
continue
last = df.iloc[-1]
info = self._static_info.get(mmsi, {})
# COG: 마지막 2점으로 bearing 계산
cog = 0.0
if len(df) >= 2:
prev = df.iloc[-2]
lat1 = math.radians(float(prev['lat']))
lat2 = math.radians(float(last['lat']))
dlon = math.radians(float(last['lon']) - float(prev['lon']))
x = math.sin(dlon) * math.cos(lat2)
y = math.cos(lat1) * math.sin(lat2) - math.sin(lat1) * math.cos(lat2) * math.cos(dlon)
cog = (math.degrees(math.atan2(x, y)) + 360) % 360
result[mmsi] = {
'lat': float(last['lat']),
'lon': float(last['lon']),
'sog': float(last.get('sog', 0) or last.get('raw_sog', 0) or 0),
'cog': cog,
'timestamp': last.get('timestamp'),
'name': info.get('name', ''),
}
return result
def get_vessel_tracks(self, mmsis: list[str], hours: int = 24) -> dict[str, list[dict]]:
"""Return track points for given MMSIs within the specified hours window.
Returns dict mapping mmsi to list of {ts, lat, lon, sog, cog} dicts,
sorted by timestamp ascending.
"""
import datetime as _dt
now = datetime.now(timezone.utc)
cutoff_aware = now - _dt.timedelta(hours=hours)
cutoff_naive = cutoff_aware.replace(tzinfo=None)
result: dict[str, list[dict]] = {}
for mmsi in mmsis:
df = self._tracks.get(mmsi)
if df is None or len(df) == 0:
continue
ts_col = df['timestamp']
if hasattr(ts_col.dtype, 'tz') and ts_col.dtype.tz is not None:
mask = ts_col >= pd.Timestamp(cutoff_aware)
else:
mask = ts_col >= pd.Timestamp(cutoff_naive)
filtered = df[mask].sort_values('timestamp')
if filtered.empty:
continue
# Compute SOG/COG for this vessel's track
if len(filtered) >= 2:
track_with_sog = _compute_sog_cog(filtered.copy())
else:
track_with_sog = filtered.copy()
if 'sog' not in track_with_sog.columns:
track_with_sog['sog'] = track_with_sog.get('raw_sog', 0)
if 'cog' not in track_with_sog.columns:
track_with_sog['cog'] = 0
points = []
for _, row in track_with_sog.iterrows():
ts = row['timestamp']
# Convert to epoch ms
if hasattr(ts, 'timestamp'):
epoch_ms = int(ts.timestamp() * 1000)
else:
epoch_ms = int(pd.Timestamp(ts).timestamp() * 1000)
points.append({
'ts': epoch_ms,
'lat': float(row['lat']),
'lon': float(row['lon']),
'sog': float(row.get('sog', 0) or 0),
'cog': float(row.get('cog', 0) or 0),
})
if points:
result[mmsi] = points
return result
def get_chinese_mmsis(self) -> set:
"""Return the set of all Chinese vessel MMSIs (412*) currently in the store."""
return {m for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX)}
# ------------------------------------------------------------------
# Properties
# ------------------------------------------------------------------
@property
def last_bucket(self) -> Optional[datetime]:
"""Return the latest time bucket seen across all merged incremental batches."""
return self._last_bucket
# ------------------------------------------------------------------
# Diagnostics
# ------------------------------------------------------------------
def stats(self) -> dict:
"""Return store statistics for health/status reporting."""
total_points = sum(len(v) for v in self._tracks.values())
chinese_count = sum(1 for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX))
# Rough memory estimate: each row ~200 bytes across columns
memory_mb = round((total_points * 200) / (1024 * 1024), 2)
return {
'vessels': len(self._tracks),
'points': total_points,
'memory_mb': memory_mb,
'last_bucket': self._last_bucket.isoformat() if self._last_bucket else None,
'targets': chinese_count,
'permitted': len(self._permit_set),
}
# Module-level singleton
vessel_store = VesselStore()