kcg-ai-monitoring/prediction/pipeline/orchestrator.py
htlee e2fc355b2c feat: S2 prediction 분석 엔진 모노레포 이식
iran prediction 47개 Python 파일을 prediction/ 디렉토리로 복제:
- algorithms/ 14개 분석 알고리즘 (어구추론, 다크베셀, 스푸핑, 환적, 위험도 등)
- pipeline/ 7단계 분류 파이프라인
- cache/vessel_store (24h 슬라이딩 윈도우)
- db/ 어댑터 (snpdb 원본조회, kcgdb 결과저장)
- chat/ AI 채팅 (Ollama, 후순위)
- data/ 정적 데이터 (기선, 특정어업수역 GeoJSON)

config.py를 kcgaidb로 재구성 (DB명, 사용자, 비밀번호)
DB 연결 검증 완료 (kcgaidb 37개 테이블 접근 확인)
Makefile에 dev-prediction / dev-all 타겟 추가
CLAUDE.md에 prediction 섹션 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 12:56:51 +09:00

96 lines
3.3 KiB
Python

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import pandas as pd
from pipeline.preprocessor import AISPreprocessor
from pipeline.behavior import BehaviorDetector
from pipeline.resampler import TrajectoryResampler
from pipeline.features import FeatureExtractor
from pipeline.classifier import VesselTypeClassifier, get_season
from pipeline.clusterer import EnhancedBIRCHClusterer
from pipeline.constants import RESAMPLE_INTERVAL_MIN
logger = logging.getLogger(__name__)
class ChineseFishingVesselPipeline:
    """Orchestrator for the 7-step Chinese fishing-vessel activity classifier.

    Steps:
      1. AIS preprocessing (Yan et al. 2022)
      2. Behaviour-state detection (speed-based 3-class)
      3. Trajectory resampling (Yan, Yang et al. — 4-minute interval)
      4. Feature vector extraction (paper 12)
      5. Vessel-type classification (rule-based scoring)
      6. Enhanced BIRCH trajectory clustering (Yan, Yang et al.)
      7. Seasonal activity tagging (paper 12)
    """

    def __init__(self) -> None:
        # One stage object per pipeline step, constructed once and reused
        # across calls to run().
        self.preprocessor = AISPreprocessor()
        self.detector = BehaviorDetector()
        self.resampler = TrajectoryResampler(RESAMPLE_INTERVAL_MIN)
        self.extractor = FeatureExtractor()
        self.classifier = VesselTypeClassifier()
        self.clusterer = EnhancedBIRCHClusterer()

    def run(
        self, df_raw: pd.DataFrame
    ) -> tuple[list[dict], dict[str, pd.DataFrame]]:
        """Execute all 7 pipeline steps on a raw AIS DataFrame.

        Args:
            df_raw: raw AIS DataFrame with columns mmsi, timestamp, lat, lon,
                sog, cog.

        Returns:
            (results, vessel_dfs) where:
            - results is a list of classification dicts, each containing:
              mmsi, vessel_type, confidence, fishing_pct, cluster_id, season,
              n_points, features.
            - vessel_dfs is a mapping of mmsi -> resampled trajectory DataFrame.
        """
        # Step 1: preprocessing. Bail out early if nothing survives cleaning.
        cleaned = self.preprocessor.run(df_raw)
        if len(cleaned) == 0:
            logger.warning('pipeline: no rows after preprocessing')
            return [], {}

        # Step 2: annotate each row with a behaviour state.
        cleaned = self.detector.detect(cleaned)

        # Steps 3-5: resample, extract features, and classify per vessel.
        vessel_dfs: dict[str, pd.DataFrame] = {}
        records: list[dict] = []
        for mmsi, vessel_df in cleaned.groupby('mmsi'):
            resampled = self.resampler.resample(vessel_df)
            vessel_dfs[mmsi] = resampled

            feature_vec = self.extractor.extract(resampled)
            vessel_type, score = self.classifier.classify(feature_vec)
            fishing_ratio = BehaviorDetector.compute_fishing_ratio(resampled)
            # Season is taken from the middle timestamp of the vessel's
            # (pre-resampling) track, a cheap proxy for the dominant season.
            midpoint = len(vessel_df) // 2
            season = get_season(vessel_df['timestamp'].iloc[midpoint])

            records.append(dict(
                mmsi=mmsi,
                vessel_type=vessel_type,
                confidence=score,
                fishing_pct=fishing_ratio,
                season=season,
                n_points=len(resampled),
                features=feature_vec,
            ))

        # Step 6: trajectory clustering; vessels absent from the cluster map
        # are marked with sentinel cluster_id -1.
        cluster_map = self.clusterer.fit_predict(vessel_dfs)
        for rec in records:
            rec['cluster_id'] = cluster_map.get(rec['mmsi'], -1)

        logger.info(
            'pipeline complete: %d vessels, types=%s',
            len(records),
            set(rec['vessel_type'] for rec in records),
        )
        return records, vessel_dfs