kcg-monitoring/prediction/cache/vessel_store.py
htlee 83b3d80c6d feat: Python 어선 분류기 + 배포 설정 + 백엔드 모니터링 프록시
- prediction/: FastAPI 7단계 분류 파이프라인 + 6개 탐지 알고리즘
  - snpdb 궤적 조회 → 인메모리 캐시(13K척) → 분류 → kcgdb 저장
  - APScheduler 5분 주기, Python 3.9 호환
  - 버그 수정: @property last_bucket, SQL INTERVAL 바인딩, rollback, None 가드
  - 보안: DB 비밀번호 하드코딩 제거 → env 환경변수 필수
- deploy/kcg-prediction.service: systemd 서비스 (redis-211, 포트 8001)
- deploy.yml: prediction CI/CD 배포 단계 추가 (192.168.1.18:32023)
- backend: PredictionProxyController (health/status/trigger 프록시)
- backend: AppProperties predictionBaseUrl + AuthFilter 인증 예외

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 12:07:40 +09:00

336 lines
13 KiB
Python

import logging
from datetime import datetime, timezone
from typing import Optional
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
# Minimum minutes between static-info refreshes (throttles refresh_static_info).
_STATIC_REFRESH_INTERVAL_MIN = 60
# Minimum minutes between permit-registry refreshes (throttles refresh_permit_registry).
_PERMIT_REFRESH_INTERVAL_MIN = 30
# Mean Earth radius in nautical miles, used by the haversine distance step.
_EARTH_RADIUS_NM = 3440.065
# Computed speeds (knots) are clipped to this ceiling as physically implausible.
_MAX_REASONABLE_SOG = 30.0
# MMSI prefix used here to identify Chinese-flagged vessels (see class docstring).
_CHINESE_MMSI_PREFIX = '412'
def _compute_sog_cog(df: pd.DataFrame) -> pd.DataFrame:
"""Compute SOG (knots) and COG (degrees) from consecutive lat/lon/timestamp points."""
df = df.sort_values(['mmsi', 'timestamp']).copy()
lat1 = np.radians(df['lat'].values[:-1])
lon1 = np.radians(df['lon'].values[:-1])
lat2 = np.radians(df['lat'].values[1:])
lon2 = np.radians(df['lon'].values[1:])
# Haversine distance (nautical miles)
dlat = lat2 - lat1
dlon = lon2 - lon1
a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
dist_nm = _EARTH_RADIUS_NM * 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
# Time difference (hours)
ts = df['timestamp'].values
dt_sec = (ts[1:] - ts[:-1]).astype('timedelta64[s]').astype(float)
dt_hours = dt_sec / 3600.0
dt_hours[dt_hours <= 0] = np.nan
# SOG = dist / time (knots)
computed_sog = dist_nm / dt_hours
computed_sog = np.clip(np.nan_to_num(computed_sog, nan=0.0), 0, _MAX_REASONABLE_SOG)
# COG = bearing (degrees)
x = np.sin(dlon) * np.cos(lat2)
y = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(dlon)
bearing = (np.degrees(np.arctan2(x, y)) + 360) % 360
# Append last value (copy from previous)
sog_arr = np.append(computed_sog, computed_sog[-1:] if len(computed_sog) > 0 else [0])
cog_arr = np.append(bearing, bearing[-1:] if len(bearing) > 0 else [0])
# Reset at MMSI boundaries
mmsi_vals = df['mmsi'].values
boundary = np.where(mmsi_vals[:-1] != mmsi_vals[1:])[0]
for idx in boundary:
sog_arr[idx + 1] = df['raw_sog'].iloc[idx + 1] if 'raw_sog' in df.columns else 0
cog_arr[idx + 1] = 0
# Where computed SOG is 0 or NaN, fall back to raw_sog
df['sog'] = sog_arr
if 'raw_sog' in df.columns:
mask = (df['sog'] == 0) | np.isnan(df['sog'])
df.loc[mask, 'sog'] = df.loc[mask, 'raw_sog'].fillna(0)
df['cog'] = cog_arr
return df
class VesselStore:
    """In-memory vessel trajectory store for Korean waters vessel data.

    Keeps a 24-hour sliding window of per-vessel tracks and accepts
    incremental 5-minute batches.  Chinese vessel (MMSI 412*) filtering
    happens only when analysis targets are selected.
    """

    def __init__(self) -> None:
        # Per-MMSI track DataFrames, keyed by MMSI string.
        self._tracks: dict[str, pd.DataFrame] = {}
        # Newest time bucket observed across incremental merges.
        self._last_bucket: Optional[datetime] = None
        # Static vessel info dicts, keyed by MMSI string.
        self._static_info: dict[str, dict] = {}
        # MMSIs of permitted Chinese fishing vessels.
        self._permit_set: set[str] = set()
        self._static_refreshed_at: Optional[datetime] = None
        self._permit_refreshed_at: Optional[datetime] = None

    # ------------------------------------------------------------------
    # Public load / update methods
    # ------------------------------------------------------------------
    def load_initial(self, hours: int = 24) -> None:
        """Load all Korean waters vessel data for the past *hours* hours.

        Pulls one bulk DataFrame from snpdb, splits it per MMSI, and
        replaces the track map.  Also triggers static-info and permit
        registry refreshes.
        """
        from db import snpdb

        logger.info('loading initial vessel tracks (last %dh)...', hours)
        try:
            bulk = snpdb.fetch_all_tracks(hours)
        except Exception as e:
            logger.error('fetch_all_tracks failed: %s', e)
            return
        if bulk.empty:
            logger.warning('fetch_all_tracks returned empty DataFrame')
            return
        # Preserve the AIS-reported speed under 'raw_sog'.
        if 'sog' in bulk.columns and 'raw_sog' not in bulk.columns:
            bulk = bulk.rename(columns={'sog': 'raw_sog'})
        self._tracks = {
            str(mmsi): grp.reset_index(drop=True)
            for mmsi, grp in bulk.groupby('mmsi')
        }
        logger.info(
            'initial load complete: %d vessels, %d total points',
            len(self._tracks),
            sum(len(t) for t in self._tracks.values()),
        )
        self.refresh_static_info()
        self.refresh_permit_registry()

    def merge_incremental(self, df_new: pd.DataFrame) -> None:
        """Merge a new batch of vessel positions into the store.

        Within each MMSI, rows are de-duplicated by timestamp (existing
        rows win).  Updates ``_last_bucket`` from the batch's
        ``time_bucket`` column when present.
        """
        if df_new.empty:
            logger.debug('merge_incremental called with empty DataFrame, skipping')
            return
        if 'sog' in df_new.columns and 'raw_sog' not in df_new.columns:
            df_new = df_new.rename(columns={'sog': 'raw_sog'})
        batch_buckets: list[datetime] = []
        for mmsi, grp in df_new.groupby('mmsi'):
            key = str(mmsi)
            existing = self._tracks.get(key)
            if existing is None:
                self._tracks[key] = grp.reset_index(drop=True)
            else:
                # Default keep='first': rows already in the store take
                # precedence over the batch on timestamp collisions.
                merged = pd.concat([existing, grp], ignore_index=True)
                merged = merged.drop_duplicates(subset=['timestamp'])
                self._tracks[key] = merged.reset_index(drop=True)
            if 'time_bucket' in grp.columns and not grp['time_bucket'].empty:
                buckets = pd.to_datetime(grp['time_bucket'].dropna())
                if not buckets.empty:
                    batch_buckets.append(buckets.max().to_pydatetime())
        if batch_buckets:
            newest = max(batch_buckets)
            # Naive timestamps are interpreted as UTC before comparing.
            if isinstance(newest, datetime) and newest.tzinfo is None:
                newest = newest.replace(tzinfo=timezone.utc)
            if self._last_bucket is None or newest > self._last_bucket:
                self._last_bucket = newest
        logger.debug(
            'incremental merge done: %d mmsis in batch, store has %d vessels',
            df_new['mmsi'].nunique(),
            len(self._tracks),
        )

    def evict_stale(self, hours: int = 24) -> None:
        """Drop track points older than *hours* hours; remove emptied MMSIs."""
        from datetime import timedelta

        cutoff_aware = datetime.now(timezone.utc) - timedelta(hours=hours)
        cutoff_naive = cutoff_aware.replace(tzinfo=None)
        points_before = sum(len(t) for t in self._tracks.values())
        dropped: list[str] = []
        for key in list(self._tracks):
            track = self._tracks[key]
            stamps = track['timestamp']
            # Compare against a cutoff of matching tz-awareness.
            if getattr(stamps.dtype, 'tz', None) is not None:
                keep = stamps >= pd.Timestamp(cutoff_aware)
            else:
                keep = stamps >= pd.Timestamp(cutoff_naive)
            trimmed = track[keep].reset_index(drop=True)
            if trimmed.empty:
                del self._tracks[key]
                dropped.append(key)
            else:
                self._tracks[key] = trimmed
        points_after = sum(len(t) for t in self._tracks.values())
        logger.info(
            'eviction complete: removed %d points, evicted %d mmsis (threshold=%dh)',
            points_before - points_after,
            len(dropped),
            hours,
        )

    def refresh_static_info(self) -> None:
        """Fetch vessel static info (type, name, dimensions) from snpdb.

        No-op when refreshed within the last 60 minutes or when the
        store holds no tracks.
        """
        now = datetime.now(timezone.utc)
        if self._static_refreshed_at is not None:
            age_min = (now - self._static_refreshed_at).total_seconds() / 60
            if age_min < _STATIC_REFRESH_INTERVAL_MIN:
                logger.debug(
                    'static info refresh skipped (%.1f min since last refresh)',
                    age_min,
                )
                return
        if not self._tracks:
            logger.debug('no tracks in store, skipping static info refresh')
            return
        from db import snpdb

        try:
            fetched = snpdb.fetch_static_info(list(self._tracks.keys()))
            self._static_info.update(fetched)
            self._static_refreshed_at = now
            logger.info('static info refreshed: %d vessels', len(fetched))
        except Exception as e:
            logger.error('fetch_static_info failed: %s', e)

    def refresh_permit_registry(self) -> None:
        """Fetch permitted Chinese fishing vessel MMSIs from snpdb.

        No-op when refreshed within the last 30 minutes.
        """
        now = datetime.now(timezone.utc)
        if self._permit_refreshed_at is not None:
            age_min = (now - self._permit_refreshed_at).total_seconds() / 60
            if age_min < _PERMIT_REFRESH_INTERVAL_MIN:
                logger.debug(
                    'permit registry refresh skipped (%.1f min since last refresh)',
                    age_min,
                )
                return
        from db import snpdb

        try:
            self._permit_set = set(snpdb.fetch_permit_mmsis())
            self._permit_refreshed_at = now
            logger.info('permit registry refreshed: %d permitted vessels', len(self._permit_set))
        except Exception as e:
            logger.error('fetch_permit_mmsis failed: %s', e)

    # ------------------------------------------------------------------
    # Analysis target selection
    # ------------------------------------------------------------------
    def select_analysis_targets(self) -> pd.DataFrame:
        """Build a combined DataFrame of Chinese vessel tracks with computed SOG/COG.

        Keeps only MMSIs starting with '412', derives SOG/COG from
        consecutive points via the haversine formula, falling back to
        raw_sog where the computed value is zero or NaN.

        Returns:
            DataFrame with columns: mmsi, timestamp, lat, lon, sog, cog
        """
        out_cols = ['mmsi', 'timestamp', 'lat', 'lon', 'sog', 'cog']
        targets = [m for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX)]
        if not targets:
            logger.info('no Chinese vessels (412*) found in store')
            return pd.DataFrame(columns=out_cols)
        combined = pd.concat([self._tracks[m] for m in targets], ignore_index=True)
        missing = {'mmsi', 'timestamp', 'lat', 'lon'} - set(combined.columns)
        if missing:
            logger.error('combined DataFrame missing required columns: %s', missing)
            return pd.DataFrame(columns=out_cols)
        enriched = _compute_sog_cog(combined)
        keep = [c for c in out_cols if c in enriched.columns]
        return enriched[keep].reset_index(drop=True)

    # ------------------------------------------------------------------
    # Lookup helpers
    # ------------------------------------------------------------------
    def is_permitted(self, mmsi: str) -> bool:
        """True when *mmsi* is in the permitted Chinese fishing vessel registry."""
        return mmsi in self._permit_set

    def get_vessel_info(self, mmsi: str) -> dict:
        """Static vessel info dict for *mmsi*; empty dict when unknown."""
        return self._static_info.get(mmsi, {})

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------
    @property
    def last_bucket(self) -> Optional[datetime]:
        """Latest time bucket seen across all merged incremental batches."""
        return self._last_bucket

    # ------------------------------------------------------------------
    # Diagnostics
    # ------------------------------------------------------------------
    def stats(self) -> dict:
        """Return store statistics for health/status reporting."""
        points = sum(len(t) for t in self._tracks.values())
        return {
            'vessels': len(self._tracks),
            'points': points,
            # Rough memory estimate: each row ~200 bytes across columns.
            'memory_mb': round((points * 200) / (1024 * 1024), 2),
            'last_bucket': self._last_bucket.isoformat() if self._last_bucket else None,
            'targets': sum(1 for m in self._tracks if m.startswith(_CHINESE_MMSI_PREFIX)),
            'permitted': len(self._permit_set),
        }
# Module-level singleton: importing `vessel_store` from this module shares one
# process-wide store instance across callers.
vessel_store = VesselStore()