kcg-monitoring/prediction/pipeline/orchestrator.py
htlee a68dfb21b2 feat: Python 어선 분류기 + 배포 설정 + 백엔드 모니터링 프록시
- prediction/: FastAPI 7단계 분류 파이프라인 + 6개 탐지 알고리즘
  - snpdb 궤적 조회 → 인메모리 캐시(13K척) → 분류 → kcgdb 저장
  - APScheduler 5분 주기, Python 3.9 호환
  - 버그 수정: @property last_bucket, SQL INTERVAL 바인딩, rollback, None 가드
  - 보안: DB 비밀번호 하드코딩 제거 → env 환경변수 필수
- deploy/kcg-prediction.service: systemd 서비스 (redis-211, 포트 8001)
- deploy.yml: prediction CI/CD 배포 단계 추가 (192.168.1.18:32023)
- backend: PredictionProxyController (health/status/trigger 프록시)
- backend: AppProperties predictionBaseUrl + AuthFilter 인증 예외

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 12:10:21 +09:00

96 lines
3.3 KiB
Python

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import pandas as pd
from pipeline.preprocessor import AISPreprocessor
from pipeline.behavior import BehaviorDetector
from pipeline.resampler import TrajectoryResampler
from pipeline.features import FeatureExtractor
from pipeline.classifier import VesselTypeClassifier, get_season
from pipeline.clusterer import EnhancedBIRCHClusterer
from pipeline.constants import RESAMPLE_INTERVAL_MIN
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ChineseFishingVesselPipeline:
    """Seven-stage classifier for Chinese fishing vessel activity types.

    Stages:
        1. AIS preprocessing (Yan et al. 2022)
        2. Behaviour-state detection (speed-based, 3 classes)
        3. Trajectory resampling (Yan, Yang et al.; the interval comes from
           RESAMPLE_INTERVAL_MIN, described as 4 minutes)
        4. Feature vector extraction (paper 12)
        5. Vessel-type classification (rule-based scoring)
        6. Enhanced BIRCH trajectory clustering (Yan, Yang et al.)
        7. Seasonal activity tagging (paper 12)
    """

    def __init__(self) -> None:
        """Create one worker object for each pipeline stage."""
        self.preprocessor = AISPreprocessor()
        self.detector = BehaviorDetector()
        self.resampler = TrajectoryResampler(RESAMPLE_INTERVAL_MIN)
        self.extractor = FeatureExtractor()
        self.classifier = VesselTypeClassifier()
        self.clusterer = EnhancedBIRCHClusterer()

    def run(
        self, df_raw: pd.DataFrame
    ) -> tuple[list[dict], dict[str, pd.DataFrame]]:
        """Execute all seven stages on a raw AIS frame.

        Args:
            df_raw: raw AIS DataFrame with columns mmsi, timestamp, lat,
                lon, sog, cog.

        Returns:
            A ``(results, vessel_dfs)`` pair:

            - ``results``: one dict per vessel with the keys mmsi,
              vessel_type, confidence, fishing_pct, season, n_points,
              features and cluster_id.
            - ``vessel_dfs``: mapping of mmsi -> resampled trajectory
              DataFrame.

            Both are empty when preprocessing leaves no rows.
        """
        # Stage 1: preprocessing. Nothing left means nothing to classify.
        cleaned = self.preprocessor.run(df_raw)
        if not len(cleaned):
            logger.warning('pipeline: no rows after preprocessing')
            return [], {}

        # Stage 2: behaviour-state detection on the whole frame.
        labelled = self.detector.detect(cleaned)

        # Stages 3-5 (and the stage-7 season tag): handled vessel by vessel.
        results: list[dict] = []
        vessel_dfs: dict[str, pd.DataFrame] = {}
        for mmsi, track in labelled.groupby('mmsi'):
            resampled = self.resampler.resample(track)
            vessel_dfs[mmsi] = resampled

            features = self.extractor.extract(resampled)
            vtype, confidence = self.classifier.classify(features)
            fishing_pct = BehaviorDetector.compute_fishing_ratio(resampled)

            # The season is taken from the timestamp of the middle row of
            # the vessel's track as it was before resampling.
            midpoint_ts = track['timestamp'].iloc[len(track) // 2]
            season = get_season(midpoint_ts)

            record = {
                'mmsi': mmsi,
                'vessel_type': vtype,
                'confidence': confidence,
                'fishing_pct': fishing_pct,
                'season': season,
                'n_points': len(resampled),
                'features': features,
            }
            results.append(record)

        # Stage 6: BIRCH clustering over every resampled trajectory.
        # Vessels absent from the returned mapping get cluster_id -1.
        cluster_map = self.clusterer.fit_predict(vessel_dfs)
        for record in results:
            record['cluster_id'] = cluster_map.get(record['mmsi'], -1)

        observed_types = {record['vessel_type'] for record in results}
        logger.info(
            'pipeline complete: %d vessels, types=%s',
            len(results),
            observed_types,
        )
        return results, vessel_dfs