kcg-monitoring/prediction/main.py
htlee 8eacbb2c91 feat: 트랙 API 전체 모델 확장 + 개별 선박 on/off → 폴리곤 반영
Prediction API:
- /correlation/{group}/tracks: is_default=TRUE 제거 → 모든 활성 모델 조회
- 응답에 models: {modelName: score} 딕셔너리 추가 (모델별 점수)
- MMSI 기준 중복 제거, 최고 점수 유지

Frontend:
- CorrelationVesselTrack 타입: models 필드 추가, type 필드 추가
- 오퍼레이셔널 폴리곤: enabledVessels 기반 on/off 제어
  (score 임계값 → 개별 체크박스 토글로 전환)
- identity OFF 시 폴리곤 base points에서 멤버 위치 제외

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 08:40:31 +09:00

153 lines
4.5 KiB
Python

import logging
import sys
from contextlib import asynccontextmanager
from fastapi import BackgroundTasks, FastAPI
from config import settings
from db import kcgdb, snpdb
from scheduler import get_last_run, run_analysis_cycle, start_scheduler, stop_scheduler
logging.basicConfig(
level=getattr(logging, settings.LOG_LEVEL, logging.INFO),
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
stream=sys.stdout,
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(application: FastAPI):
from cache.vessel_store import vessel_store
logger.info('starting KCG Prediction Service')
snpdb.init_pool()
kcgdb.init_pool()
# 인메모리 캐시 초기 로드 (24시간)
logger.info('loading initial vessel data (%dh)...', settings.INITIAL_LOAD_HOURS)
vessel_store.load_initial(settings.INITIAL_LOAD_HOURS)
logger.info('initial load complete: %s', vessel_store.stats())
start_scheduler()
yield
stop_scheduler()
snpdb.close_pool()
kcgdb.close_pool()
logger.info('KCG Prediction Service stopped')
app = FastAPI(
title='KCG Prediction Service',
version='2.1.0',
lifespan=lifespan,
)
# AI 해양분석 채팅 라우터
from chat.router import router as chat_router
app.include_router(chat_router)
@app.get('/health')
def health_check():
from cache.vessel_store import vessel_store
return {
'status': 'ok',
'snpdb': snpdb.check_health(),
'kcgdb': kcgdb.check_health(),
'store': vessel_store.stats(),
}
@app.get('/api/v1/analysis/status')
def analysis_status():
return get_last_run()
@app.post('/api/v1/analysis/trigger')
def trigger_analysis(background_tasks: BackgroundTasks):
background_tasks.add_task(run_analysis_cycle)
return {'message': 'analysis cycle triggered'}
@app.get('/api/v1/correlation/{group_key:path}/tracks')
def get_correlation_tracks(
group_key: str,
hours: int = 24,
min_score: float = 0.3,
):
"""Return correlated vessels with their track history for map rendering.
Queries gear_correlation_scores (ALL active models) and enriches with
24h track data from in-memory vessel_store.
Each vessel includes which models detected it.
"""
from cache.vessel_store import vessel_store
try:
conn = kcgdb.get_conn()
cur = conn.cursor()
# Get correlated vessels from ALL active models
cur.execute("""
SELECT s.target_mmsi, s.target_type, s.target_name,
s.current_score, m.name AS model_name
FROM kcg.gear_correlation_scores s
JOIN kcg.correlation_param_models m ON s.model_id = m.id
WHERE s.group_key = %s
AND s.current_score >= %s
AND m.is_active = TRUE
ORDER BY s.current_score DESC
""", (group_key, min_score))
rows = cur.fetchall()
cur.close()
conn.close()
if not rows:
return {'groupKey': group_key, 'vessels': []}
# Group by MMSI: collect all models per vessel, keep highest score
vessel_map: dict[str, dict] = {}
for row in rows:
mmsi = row[0]
model_name = row[4]
score = float(row[3])
if mmsi not in vessel_map:
vessel_map[mmsi] = {
'mmsi': mmsi,
'type': row[1],
'name': row[2] or '',
'score': score,
'models': {model_name: score},
}
else:
entry = vessel_map[mmsi]
entry['models'][model_name] = score
if score > entry['score']:
entry['score'] = score
mmsis = list(vessel_map.keys())
# Get tracks from vessel_store
tracks = vessel_store.get_vessel_tracks(mmsis, hours)
# Build response
vessels = []
for info in vessel_map.values():
track = tracks.get(info['mmsi'], [])
vessels.append({
'mmsi': info['mmsi'],
'name': info['name'],
'type': info['type'],
'score': info['score'],
'models': info['models'], # {modelName: score, ...}
'track': track,
})
return {'groupKey': group_key, 'vessels': vessels}
except Exception as e:
logger.warning('get_correlation_tracks failed for %s: %s', group_key, e)
return {'groupKey': group_key, 'vessels': []}