kcg-monitoring/prediction/main.py
htlee 8631546142 fix: 트랙 API DB 접속 버그 수정 (context manager)
- kcgdb.get_conn()을 with문 없이 사용 → cursor 에러
- with kcgdb.get_conn() as conn: 으로 수정
- 디버그 로그 추가 (rows 수, track 매칭 수, vessel_store 크기)
- 결과: 47 vessels, 47 with track data (25567#1 그룹)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 09:03:33 +09:00

158 lines
4.9 KiB
Python

import logging
import sys
from contextlib import asynccontextmanager
from fastapi import BackgroundTasks, FastAPI
from config import settings
from db import kcgdb, snpdb
from scheduler import get_last_run, run_analysis_cycle, start_scheduler, stop_scheduler
# Resolve the configured log level defensively. A plain
# getattr(logging, settings.LOG_LEVEL, ...) is case-sensitive and can return a
# non-level attribute (for example 'debug' resolves to the function
# logging.debug), which makes logging.basicConfig() raise at import time.
# Anything that does not resolve to an integer level falls back to INFO.
_configured_level = getattr(logging, str(settings.LOG_LEVEL).upper(), None)
if not isinstance(_configured_level, int):
    _configured_level = logging.INFO

# Root logger configuration: timestamped records written to stdout.
logging.basicConfig(
    level=_configured_level,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    stream=sys.stdout,
)

# Module-level logger used by the lifespan hook and the route handlers below.
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(application: FastAPI):
    """Run service startup before the app serves requests and cleanup afterwards.

    Startup: initialise the snpdb and kcgdb connection pools, load the initial
    vessel data into the in-memory vessel_store, then start the scheduler.
    Shutdown: stop the scheduler (only if it was started) and close both pools.

    Cleanup sits in ``finally`` blocks so that the pools are released even
    when startup fails part-way or an exception is thrown into the generator
    at the ``yield`` point.

    Args:
        application: The FastAPI instance this lifespan is attached to
            (not used by the body).
    """
    # Imported lazily, as in the route handlers of this module.
    from cache.vessel_store import vessel_store

    logger.info('starting KCG Prediction Service')
    snpdb.init_pool()
    kcgdb.init_pool()

    # Tracks whether start_scheduler() completed, so that a failed startup
    # does not try to stop a scheduler that never ran.
    scheduler_started = False
    try:
        # Initial in-memory cache load; the window length (in hours) comes
        # from settings.INITIAL_LOAD_HOURS.
        logger.info('loading initial vessel data (%dh)...', settings.INITIAL_LOAD_HOURS)
        vessel_store.load_initial(settings.INITIAL_LOAD_HOURS)
        logger.info('initial load complete: %s', vessel_store.stats())

        start_scheduler()
        scheduler_started = True

        yield
    finally:
        try:
            if scheduler_started:
                stop_scheduler()
        finally:
            # Same close order as before: snpdb first, then kcgdb.
            snpdb.close_pool()
            kcgdb.close_pool()
            logger.info('KCG Prediction Service stopped')
# ASGI application object; `lifespan` (defined above) runs the startup and
# shutdown steps for the service.
app = FastAPI(
title='KCG Prediction Service',
version='2.1.0',
lifespan=lifespan,
)
# AI maritime-analysis chat router.
# NOTE(review): this import is placed after `app` is created rather than with
# the other imports at the top of the file - presumably to control import
# order; confirm before moving it.
from chat.router import router as chat_router
app.include_router(chat_router)
@app.get('/health')
def health_check():
    """Return a health report for the service.

    The report always carries ``'status': 'ok'`` and adds the results of the
    snpdb and kcgdb health checks plus the current vessel_store statistics.
    """
    from cache.vessel_store import vessel_store

    # Built key by key in the same order (and with the same call order) as
    # the response fields are expected to appear.
    report = {'status': 'ok'}
    report['snpdb'] = snpdb.check_health()
    report['kcgdb'] = kcgdb.check_health()
    report['store'] = vessel_store.stats()
    return report
@app.get('/api/v1/analysis/status')
def analysis_status():
    """Return whatever the scheduler's get_last_run() reports, unchanged."""
    last_run = get_last_run()
    return last_run
@app.post('/api/v1/analysis/trigger')
def trigger_analysis(background_tasks: BackgroundTasks):
    """Queue one analysis cycle as a background task and acknowledge the request.

    The handler only registers run_analysis_cycle with *background_tasks*; it
    does not call it directly or wait for its result.
    """
    background_tasks.add_task(run_analysis_cycle)
    acknowledgement = {'message': 'analysis cycle triggered'}
    return acknowledgement
def _fetch_correlation_rows(group_key: str, min_score: float) -> list:
    """Fetch score rows for *group_key* from every active correlation model.

    Each returned row is
    ``(target_mmsi, target_type, target_name, current_score, model_name)``,
    ordered by ``current_score`` descending. The cursor is closed even when
    the query raises, and the connection is released as soon as the rows have
    been fetched.
    """
    with kcgdb.get_conn() as conn:
        cur = conn.cursor()
        try:
            # Get correlated vessels from ALL active models
            cur.execute("""
                SELECT s.target_mmsi, s.target_type, s.target_name,
                       s.current_score, m.name AS model_name
                FROM kcg.gear_correlation_scores s
                JOIN kcg.correlation_param_models m ON s.model_id = m.id
                WHERE s.group_key = %s
                  AND s.current_score >= %s
                  AND m.is_active = TRUE
                ORDER BY s.current_score DESC
            """, (group_key, min_score))
            return cur.fetchall()
        finally:
            cur.close()


def _group_rows_by_mmsi(rows) -> dict[str, dict]:
    """Collapse per-model score rows into one entry per MMSI.

    Every entry records all models that reported the vessel
    (``{model_name: score}``) and keeps the highest score seen across them.
    Entries appear in the order their MMSI first occurs in *rows*.
    """
    vessel_map: dict[str, dict] = {}
    for mmsi, target_type, target_name, raw_score, model_name in rows:
        score = float(raw_score)
        entry = vessel_map.get(mmsi)
        if entry is None:
            vessel_map[mmsi] = {
                'mmsi': mmsi,
                'type': target_type,
                'name': target_name or '',
                'score': score,
                'models': {model_name: score},
            }
        else:
            entry['models'][model_name] = score
            entry['score'] = max(entry['score'], score)
    return vessel_map


@app.get('/api/v1/correlation/{group_key:path}/tracks')
def get_correlation_tracks(
    group_key: str,
    hours: int = 24,
    min_score: float = 0.3,
):
    """Return correlated vessels with their track history for map rendering.

    Queries gear_correlation_scores (ALL active models) and enriches each
    vessel with track data from the in-memory vessel_store. Each vessel
    includes which models detected it and the highest score among them.

    Args:
        group_key: Group identifier taken from the URL path (declared with the
            ``:path`` converter in the route).
        hours: Track window passed to ``vessel_store.get_vessel_tracks``
            (default 24).
        min_score: Inclusive lower bound applied to ``current_score`` in the
            query (default 0.3).

    Returns:
        ``{'groupKey': group_key, 'vessels': [...]}`` where each vessel has the
        keys ``mmsi``, ``name``, ``type``, ``score``, ``models`` and ``track``.
        The ``vessels`` list is empty when no rows match, and also when any
        step fails: the failure is logged with its traceback and this
        best-effort endpoint still answers.
    """
    from cache.vessel_store import vessel_store

    try:
        rows = _fetch_correlation_rows(group_key, min_score)
        logger.info('correlation tracks: group_key=%r, min_score=%s, rows=%d',
                    group_key, min_score, len(rows))
        if not rows:
            return {'groupKey': group_key, 'vessels': []}

        # Group by MMSI: collect all models per vessel, keep highest score
        vessel_map = _group_rows_by_mmsi(rows)
        mmsis = list(vessel_map)

        # Get tracks from vessel_store
        tracks = vessel_store.get_vessel_tracks(mmsis, hours)
        with_tracks = sum(1 for m in mmsis if m in tracks and len(tracks[m]) > 0)
        # `_tracks` is a private attribute read only for this diagnostic line;
        # getattr keeps a missing attribute from failing the whole request.
        logger.info('correlation tracks: %d unique mmsis, %d with track data, vessel_store._tracks has %d entries',
                    len(mmsis), with_tracks, len(getattr(vessel_store, '_tracks', ())))

        # Build response
        vessels = [
            {
                'mmsi': info['mmsi'],
                'name': info['name'],
                'type': info['type'],
                'score': info['score'],
                'models': info['models'],  # {modelName: score, ...}
                'track': tracks.get(info['mmsi'], []),
            }
            for info in vessel_map.values()
        ]
        return {'groupKey': group_key, 'vessels': vessels}
    except Exception as e:
        # Deliberate best-effort boundary: never surface a 500 to the map
        # client, but keep the traceback so failures remain diagnosable.
        logger.warning('get_correlation_tracks failed for %s: %s', group_key, e,
                       exc_info=True)
        return {'groupKey': group_key, 'vessels': []}