wing-ops/prediction/opendrift/api.py
jeonghyo.k 88eb6b121a feat(prediction): OpenDrift 유류 확산 시뮬레이션 통합 + CCTV/관리자 고도화
[예측]
- OpenDrift Python API 서버 및 스크립트 추가 (prediction/opendrift/)
- 시뮬레이션 상태 폴링 훅(useSimulationStatus), 로딩 오버레이 추가
- HydrParticleOverlay: deck.gl 기반 입자 궤적 시각화 레이어
- OilSpillView/LeftPanel/RightPanel: 시뮬레이션 실행·결과 표시 UI 개편
- predictionService/predictionRouter: 시뮬레이션 CRUD 및 상태 관리 API
- simulation.ts: OpenDrift 연동 엔드포인트 확장
- docs/PREDICTION-GUIDE.md: 예측 기능 개발 가이드 추가

[CCTV/항공방제]
- CCTV 오일 감지 GPU 추론 연동 (OilDetectionOverlay, useOilDetection)
- CCTV 안전관리 감지 기능 추가 (선박 출입, 침입 감지)
- oil_inference_server.py: Python GPU 추론 서버

[관리자]
- 관리자 화면 고도화 (사용자/권한/게시판/선박신호 패널)
- AdminSidebar, BoardMgmtPanel, VesselSignalPanel 신규 컴포넌트

[기타]
- DB: 시뮬레이션 결과, 선박보험 시드(1391건), 역할 정리 마이그레이션
- 팀 워크플로우 v1.6.1 동기화

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-09 14:55:46 +09:00

267 lines
10 KiB
Python

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import sys
import asyncio
import uuid
import os
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from datetime import datetime, timedelta
from typing import Optional
from opendrift.readers import reader_netCDF_CF_generic
from opendrift.models.openoil import OpenOil
from config import STORAGE, COORDS, SIM
from logger import get_logger
from utils import check_nc_file_by_date, check_nc_files_for_date, check_img_file_by_date, kst_to_utc
from createJsonResult import extract_and_save_data_to_json as extract_json
from findFile import find_nearest_earlier_file as find_file
from extractUvFull import extract_uv_full
from latestForecastDate import get_earliest_latest_forecast_date
logger = get_logger("api")
app = FastAPI()
# ============================================================
# Workers 포화 관리 (단일 프로세스 기준 — startup.sh: --workers 1)
# ============================================================
MAX_CONCURRENT = int(os.getenv('MAX_CONCURRENT_JOBS', '4'))
jobs: dict[str, dict] = {}
_thread_pool = ThreadPoolExecutor(max_workers=MAX_CONCURRENT)
# ============================================================
# Parcels 선택적 로드 (없어도 동작)
# ============================================================
try:
sys.path.insert(0, str(STORAGE.PARCELS_PATH))
from parcels_api import router as parcels_router # type: ignore
app.include_router(parcels_router)
logger.info("Parcels router 로드 완료")
except Exception as _e:
logger.warning(f"Parcels router 로드 건너뜀 (정상): {_e}")
class CustomErrorCode(Enum):
FILE_NOT_FOUND = 5001
PARSE_ERROR = 5002
MODELING_ERROR = 5003
SYSTEM_ERROR = 5004
def _parse_datetime(dt_str: Optional[str]) -> Optional[datetime]:
"""다양한 형식의 날짜 문자열을 datetime으로 변환 (KST 기준)"""
if not dt_str:
return None
formats = [
"%Y-%m-%dT%H:%M:%S.%fZ",
"%Y-%m-%dT%H:%M:%SZ",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%d %H:%M:%S",
"%Y%m%d%H",
]
for fmt in formats:
try:
return datetime.strptime(dt_str, fmt)
except ValueError:
continue
return None
# ============================================================
# 기존 API 엔드포인트 (변경 없음)
# ============================================================
@app.get("/get-received-date")
async def get_received_date():
"""예보 수신일 및 가능일 확인"""
result = get_earliest_latest_forecast_date()
if result:
return JSONResponse(content=result, status_code=200)
return JSONResponse(content={
"error_code": CustomErrorCode.FILE_NOT_FOUND.value,
"message": "File not found error"
}, status_code=200)
@app.get("/get-uv/{datetime_str}/{category}")
async def get_uv(datetime_str: str, category: str):
"""바람, 해수 시각화용 데이터 리턴"""
date_obj = kst_to_utc(datetime.strptime(datetime_str, "%Y%m%d%H"))
if category == "wind":
nc_path, date = check_nc_file_by_date(str(STORAGE.WIND), date_obj)
else:
nc_path, date = check_nc_file_by_date(str(STORAGE.HYDR), date_obj)
result = extract_uv_full(
nc_path,
date_obj.strftime("%Y-%m-%d %H:%M:%S"),
category,
skip=1,
lon_range=COORDS.lon_range,
lat_range=COORDS.lat_range
)
return JSONResponse(content={"result": result}, status_code=200)
# ============================================================
# NC 파일 확인 (수정: 404 반환으로 Node.js !checkRes.ok 연동)
# ============================================================
@app.post("/check-nc")
async def check_nc(request: Request):
"""기상 데이터 존재 여부 확인. startTime(KST) 기준으로 NC 파일 조회."""
body = await request.json()
start_time_str = body.get('startTime') or body.get('start_time')
try:
date_obj = _parse_datetime(start_time_str)
if date_obj is None:
date_obj = datetime.now()
date_utc = kst_to_utc(date_obj)
wind_nc_path, ocean_nc_path, _, _ = check_nc_files_for_date(date_utc)
if not wind_nc_path or not ocean_nc_path:
return JSONResponse(content={"message": "not exist"}, status_code=404)
return JSONResponse(content={"message": "exist"}, status_code=200)
except Exception:
logger.exception("Error checking NC files")
return JSONResponse(content={
"error_code": CustomErrorCode.SYSTEM_ERROR.value,
"message": "System Error"
}, status_code=500)
# ============================================================
# 비동기 시뮬레이션 실행 (Workers 포화 제어)
# ============================================================
@app.post("/run-model")
async def run_model(request: Request):
"""기름 유출 시뮬레이션 비동기 실행. job_id를 즉시 반환하고 백그라운드에서 처리."""
running = sum(1 for j in jobs.values() if j['status'] == 'RUNNING')
if running >= MAX_CONCURRENT:
return JSONResponse(status_code=503, content={
'success': False,
'error': '분석 서버가 사용 중입니다. 잠시 후 재시도해 주세요.',
'running': running,
'max': MAX_CONCURRENT,
})
body = await request.json()
job_id = str(uuid.uuid4())
jobs[job_id] = {'status': 'RUNNING', 'result': None, 'error': None}
asyncio.create_task(_run_simulation(job_id, body))
return JSONResponse(content={'success': True, 'job_id': job_id, 'status': 'RUNNING'}, status_code=200)
@app.get("/status/{job_id}")
async def get_job_status(job_id: str):
"""시뮬레이션 작업 상태 조회"""
if job_id not in jobs:
return JSONResponse(content={'error': 'Job not found'}, status_code=404)
job = jobs[job_id]
if job['status'] == 'DONE':
return JSONResponse(content={'status': 'DONE', 'result': job['result']})
if job['status'] == 'ERROR':
return JSONResponse(content={'status': 'ERROR', 'error': job['error']})
return JSONResponse(content={'status': 'RUNNING'})
async def _run_simulation(job_id: str, body: dict) -> None:
"""시뮬레이션을 ThreadPoolExecutor에서 실행하고 결과를 jobs 딕셔너리에 저장"""
loop = asyncio.get_event_loop()
try:
result = await loop.run_in_executor(_thread_pool, _simulate_sync, body, job_id)
jobs[job_id] = {'status': 'DONE', 'result': result, 'error': None}
except Exception as e:
logger.exception(f"시뮬레이션 오류 (job_id={job_id})")
jobs[job_id] = {'status': 'ERROR', 'result': None, 'error': str(e)}
def _simulate_sync(body: dict, job_id: str):
"""동기 시뮬레이션 로직 (ThreadPoolExecutor에서 실행)"""
start_time_str = body.get('startTime') or body.get('start_time')
run_time = body.get('runTime') or body.get('run_time')
mat_ty = body.get('matTy') or body.get('mat_ty')
mat_vol = body.get('matVol') or body.get('mat_vol')
lon = body.get('lon')
lat = body.get('lat')
spill_time = body.get('spillTime') or body.get('spill_time')
name = body.get('name') or job_id # name 없으면 job_id 사용
start_time_measure = datetime.now()
o = OpenOil(loglevel=20)
date_obj = _parse_datetime(start_time_str) or datetime.now()
date_utc = kst_to_utc(date_obj)
wind_nc_path, ocean_nc_path, _, _ = check_nc_files_for_date(date_utc)
if not wind_nc_path:
raise FileNotFoundError("바람 NC 파일을 찾을 수 없습니다.")
if not ocean_nc_path:
raise FileNotFoundError("해양 NC 파일을 찾을 수 없습니다.")
logger.info(f"[job:{job_id}] wind_nc_path: {wind_nc_path}")
logger.info(f"[job:{job_id}] ocean_nc_path: {ocean_nc_path}")
reader_wind = reader_netCDF_CF_generic.Reader(
wind_nc_path,
standard_name_mapping={'x_wind': 'x_wind', 'y_wind': 'y_wind'}
)
reader_ocean = reader_netCDF_CF_generic.Reader(ocean_nc_path)
if 'temperature' in reader_ocean.Dataset.variables:
temp = reader_ocean.Dataset['temperature']
temp_values = temp.values
mask = temp_values > SIM.TEMPERATURE_THRESHOLD
temp_values[mask] = np.nan
reader_ocean.Dataset['temperature'].values = temp_values
o.add_reader([reader_ocean, reader_wind])
o.set_config('processes:evaporation', True)
o.set_config('processes:emulsification', True)
o.set_config('drift:vertical_mixing', True)
o.set_config('vertical_mixing:timestep', SIM.VERTICAL_MIXING_TIMESTEP)
o.set_config('seed:m3_per_hour', mat_vol)
if spill_time == 0 or spill_time is None:
o.seed_elements(lon=lon, lat=lat, number=100,
time=date_utc, z=0, oil_type=mat_ty)
else:
release_duration = timedelta(hours=spill_time)
end_t = date_utc + release_duration
o.seed_elements(lon=lon, lat=lat, number=100,
time=[date_utc, end_t], z=0, oil_type=mat_ty)
ncfile = f"{STORAGE.RESULT}/{name}.nc"
try:
o.run(duration=timedelta(hours=run_time), time_step=900, time_step_output=3600, outfile=ncfile)
except Exception as e:
logger.error(f"[job:{job_id}] 시뮬레이션 실행 오류: {e}")
raise
json_data = extract_json(ncfile, wind_nc_path, ocean_nc_path, name, lon, lat)
if not json_data:
raise ValueError("시뮬레이션 결과 변환 실패")
elapsed = (datetime.now() - start_time_measure).total_seconds()
logger.info(f"[job:{job_id}] 완료: {int(elapsed//60)}m {int(elapsed%60)}s")
return json_data
if __name__ == "__main__":
import uvicorn
# 서버 설정 (호스트와 포트는 필요에 따라 수정하세요)
# log_level="info"를 통해 FastAPI와 uvicorn의 로그를 확인할 수 있습니다.
uvicorn.run(
"api:app",
host="0.0.0.0",
port=5003,
reload=True # 코드 변경 시 자동으로 서버를 재시작하는 모드 (개발용)
)