kcg-ai-monitoring/prediction/pipeline/features.py
htlee e2fc355b2c feat: S2 prediction 분석 엔진 모노레포 이식
iran prediction 47개 Python 파일을 prediction/ 디렉토리로 복제:
- algorithms/ 14개 분석 알고리즘 (어구추론, 다크베셀, 스푸핑, 환적, 위험도 등)
- pipeline/ 7단계 분류 파이프라인
- cache/vessel_store (24h 슬라이딩 윈도우)
- db/ 어댑터 (snpdb 원본조회, kcgdb 결과저장)
- chat/ AI 채팅 (Ollama, 후순위)
- data/ 정적 데이터 (기선, 특정어업수역 GeoJSON)

config.py를 kcgaidb로 재구성 (DB명, 사용자, 비밀번호)
DB 연결 검증 완료 (kcgaidb 37개 테이블 접근 확인)
Makefile에 dev-prediction / dev-all 타겟 추가
CLAUDE.md에 prediction 섹션 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 12:56:51 +09:00

94 lines
3.4 KiB
Python

import math
import numpy as np
import pandas as pd
from typing import Dict
class FeatureExtractor:
"""
어선 유형 분류를 위한 특징 벡터 추출
논문 12 (남중국해 어선 유형 식별) 기반 핵심 피처:
- 속도 통계 (mean, std, 분위수)
- 침로 변동성 (COG variance → 선회 패턴)
- 조업 비율 및 조업 지속 시간
- 이동 거리 및 해역 커버리지
- 정박 빈도 (투망/양망 간격 추정)
"""
@staticmethod
def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""두 좌표 간 거리 (km)"""
R = 6371.0
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def extract(self, df_vessel: pd.DataFrame) -> Dict[str, float]:
if len(df_vessel) < 10:
return {}
sog = df_vessel['sog'].values
cog = df_vessel['cog'].values
states = df_vessel['state'].values
# Speed features
fishing_sog = sog[states == 'FISHING'] if (states == 'FISHING').any() else np.array([0])
feat: Dict[str, float] = {
'sog_mean': float(np.mean(sog)),
'sog_std': float(np.std(sog)),
'sog_fishing_mean': float(np.mean(fishing_sog)),
'sog_fishing_std': float(np.std(fishing_sog)),
'sog_q25': float(np.percentile(sog, 25)),
'sog_q75': float(np.percentile(sog, 75)),
}
# COG features (선망: 원형, 트롤: 직선왕복, 연승: 부드러운 곡선)
cog_diff = np.abs(np.diff(np.unwrap(np.radians(cog))))
feat['cog_change_mean'] = float(np.mean(cog_diff))
feat['cog_change_std'] = float(np.std(cog_diff))
feat['cog_circularity'] = float(np.sum(cog_diff > np.pi / 4) / len(cog_diff))
# State ratios
n = len(states)
feat['fishing_pct'] = float((states == 'FISHING').sum() / n)
feat['stationary_pct'] = float((states == 'STATIONARY').sum() / n)
feat['sailing_pct'] = float((states == 'SAILING').sum() / n)
# Stationary events (투망·양망 횟수 추정)
stationary_events = 0
prev = None
for s in states:
if s == 'STATIONARY' and prev != 'STATIONARY':
stationary_events += 1
prev = s
feat['stationary_events'] = float(stationary_events)
# Total distance (km)
lats = df_vessel['lat'].values
lons = df_vessel['lon'].values
total_dist = sum(
self.haversine(lats[i], lons[i], lats[i + 1], lons[i + 1])
for i in range(len(lats) - 1)
)
feat['total_distance_km'] = round(total_dist, 2)
# Coverage (바운딩 박스 면적 — 근사)
feat['coverage_deg2'] = round(float(np.ptp(lats)) * float(np.ptp(lons)), 4)
# Average fishing run length
fishing_runs = []
run = 0
for s in states:
if s == 'FISHING':
run += 1
elif run > 0:
fishing_runs.append(run)
run = 0
if run > 0:
fishing_runs.append(run)
feat['fishing_run_mean'] = float(np.mean(fishing_runs)) if fishing_runs else 0.0
return feat