[예측] - OpenDrift Python API 서버 및 스크립트 추가 (prediction/opendrift/) - 시뮬레이션 상태 폴링 훅(useSimulationStatus), 로딩 오버레이 추가 - HydrParticleOverlay: deck.gl 기반 입자 궤적 시각화 레이어 - OilSpillView/LeftPanel/RightPanel: 시뮬레이션 실행·결과 표시 UI 개편 - predictionService/predictionRouter: 시뮬레이션 CRUD 및 상태 관리 API - simulation.ts: OpenDrift 연동 엔드포인트 확장 - docs/PREDICTION-GUIDE.md: 예측 기능 개발 가이드 추가 [CCTV/항공방제] - CCTV 오일 감지 GPU 추론 연동 (OilDetectionOverlay, useOilDetection) - CCTV 안전관리 감지 기능 추가 (선박 출입, 침입 감지) - oil_inference_server.py: Python GPU 추론 서버 [관리자] - 관리자 화면 고도화 (사용자/권한/게시판/선박신호 패널) - AdminSidebar, BoardMgmtPanel, VesselSignalPanel 신규 컴포넌트 [기타] - DB: 시뮬레이션 결과, 선박보험 시드(1391건), 역할 정리 마이그레이션 - 팀 워크플로우 v1.6.1 동기화 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
267 lines
10 KiB
Python
267 lines
10 KiB
Python
from fastapi import FastAPI, Request
|
|
from fastapi.responses import JSONResponse
|
|
|
|
import sys
|
|
import asyncio
|
|
import uuid
|
|
import os
|
|
import numpy as np
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from enum import Enum
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
from opendrift.readers import reader_netCDF_CF_generic
|
|
from opendrift.models.openoil import OpenOil
|
|
|
|
from config import STORAGE, COORDS, SIM
|
|
from logger import get_logger
|
|
from utils import check_nc_file_by_date, check_nc_files_for_date, check_img_file_by_date, kst_to_utc
|
|
from createJsonResult import extract_and_save_data_to_json as extract_json
|
|
from findFile import find_nearest_earlier_file as find_file
|
|
from extractUvFull import extract_uv_full
|
|
from latestForecastDate import get_earliest_latest_forecast_date
|
|
|
|
logger = get_logger("api")
|
|
app = FastAPI()
|
|
|
|
# ============================================================
|
|
# Workers 포화 관리 (단일 프로세스 기준 — startup.sh: --workers 1)
|
|
# ============================================================
|
|
MAX_CONCURRENT = int(os.getenv('MAX_CONCURRENT_JOBS', '4'))
|
|
jobs: dict[str, dict] = {}
|
|
_thread_pool = ThreadPoolExecutor(max_workers=MAX_CONCURRENT)
|
|
|
|
# ============================================================
|
|
# Parcels 선택적 로드 (없어도 동작)
|
|
# ============================================================
|
|
try:
|
|
sys.path.insert(0, str(STORAGE.PARCELS_PATH))
|
|
from parcels_api import router as parcels_router # type: ignore
|
|
app.include_router(parcels_router)
|
|
logger.info("Parcels router 로드 완료")
|
|
except Exception as _e:
|
|
logger.warning(f"Parcels router 로드 건너뜀 (정상): {_e}")
|
|
|
|
|
|
class CustomErrorCode(Enum):
|
|
FILE_NOT_FOUND = 5001
|
|
PARSE_ERROR = 5002
|
|
MODELING_ERROR = 5003
|
|
SYSTEM_ERROR = 5004
|
|
|
|
|
|
def _parse_datetime(dt_str: Optional[str]) -> Optional[datetime]:
|
|
"""다양한 형식의 날짜 문자열을 datetime으로 변환 (KST 기준)"""
|
|
if not dt_str:
|
|
return None
|
|
formats = [
|
|
"%Y-%m-%dT%H:%M:%S.%fZ",
|
|
"%Y-%m-%dT%H:%M:%SZ",
|
|
"%Y-%m-%dT%H:%M:%S",
|
|
"%Y-%m-%d %H:%M:%S",
|
|
"%Y%m%d%H",
|
|
]
|
|
for fmt in formats:
|
|
try:
|
|
return datetime.strptime(dt_str, fmt)
|
|
except ValueError:
|
|
continue
|
|
return None
|
|
|
|
|
|
# ============================================================
|
|
# 기존 API 엔드포인트 (변경 없음)
|
|
# ============================================================
|
|
|
|
@app.get("/get-received-date")
|
|
async def get_received_date():
|
|
"""예보 수신일 및 가능일 확인"""
|
|
result = get_earliest_latest_forecast_date()
|
|
if result:
|
|
return JSONResponse(content=result, status_code=200)
|
|
return JSONResponse(content={
|
|
"error_code": CustomErrorCode.FILE_NOT_FOUND.value,
|
|
"message": "File not found error"
|
|
}, status_code=200)
|
|
|
|
|
|
@app.get("/get-uv/{datetime_str}/{category}")
|
|
async def get_uv(datetime_str: str, category: str):
|
|
"""바람, 해수 시각화용 데이터 리턴"""
|
|
date_obj = kst_to_utc(datetime.strptime(datetime_str, "%Y%m%d%H"))
|
|
if category == "wind":
|
|
nc_path, date = check_nc_file_by_date(str(STORAGE.WIND), date_obj)
|
|
else:
|
|
nc_path, date = check_nc_file_by_date(str(STORAGE.HYDR), date_obj)
|
|
result = extract_uv_full(
|
|
nc_path,
|
|
date_obj.strftime("%Y-%m-%d %H:%M:%S"),
|
|
category,
|
|
skip=1,
|
|
lon_range=COORDS.lon_range,
|
|
lat_range=COORDS.lat_range
|
|
)
|
|
return JSONResponse(content={"result": result}, status_code=200)
|
|
|
|
|
|
# ============================================================
|
|
# NC 파일 확인 (수정: 404 반환으로 Node.js !checkRes.ok 연동)
|
|
# ============================================================
|
|
|
|
@app.post("/check-nc")
|
|
async def check_nc(request: Request):
|
|
"""기상 데이터 존재 여부 확인. startTime(KST) 기준으로 NC 파일 조회."""
|
|
body = await request.json()
|
|
start_time_str = body.get('startTime') or body.get('start_time')
|
|
|
|
try:
|
|
date_obj = _parse_datetime(start_time_str)
|
|
if date_obj is None:
|
|
date_obj = datetime.now()
|
|
|
|
date_utc = kst_to_utc(date_obj)
|
|
wind_nc_path, ocean_nc_path, _, _ = check_nc_files_for_date(date_utc)
|
|
|
|
if not wind_nc_path or not ocean_nc_path:
|
|
return JSONResponse(content={"message": "not exist"}, status_code=404)
|
|
|
|
return JSONResponse(content={"message": "exist"}, status_code=200)
|
|
|
|
except Exception:
|
|
logger.exception("Error checking NC files")
|
|
return JSONResponse(content={
|
|
"error_code": CustomErrorCode.SYSTEM_ERROR.value,
|
|
"message": "System Error"
|
|
}, status_code=500)
|
|
|
|
|
|
# ============================================================
|
|
# 비동기 시뮬레이션 실행 (Workers 포화 제어)
|
|
# ============================================================
|
|
|
|
@app.post("/run-model")
|
|
async def run_model(request: Request):
|
|
"""기름 유출 시뮬레이션 비동기 실행. job_id를 즉시 반환하고 백그라운드에서 처리."""
|
|
running = sum(1 for j in jobs.values() if j['status'] == 'RUNNING')
|
|
if running >= MAX_CONCURRENT:
|
|
return JSONResponse(status_code=503, content={
|
|
'success': False,
|
|
'error': '분석 서버가 사용 중입니다. 잠시 후 재시도해 주세요.',
|
|
'running': running,
|
|
'max': MAX_CONCURRENT,
|
|
})
|
|
|
|
body = await request.json()
|
|
job_id = str(uuid.uuid4())
|
|
jobs[job_id] = {'status': 'RUNNING', 'result': None, 'error': None}
|
|
asyncio.create_task(_run_simulation(job_id, body))
|
|
|
|
return JSONResponse(content={'success': True, 'job_id': job_id, 'status': 'RUNNING'}, status_code=200)
|
|
|
|
|
|
@app.get("/status/{job_id}")
|
|
async def get_job_status(job_id: str):
|
|
"""시뮬레이션 작업 상태 조회"""
|
|
if job_id not in jobs:
|
|
return JSONResponse(content={'error': 'Job not found'}, status_code=404)
|
|
|
|
job = jobs[job_id]
|
|
if job['status'] == 'DONE':
|
|
return JSONResponse(content={'status': 'DONE', 'result': job['result']})
|
|
if job['status'] == 'ERROR':
|
|
return JSONResponse(content={'status': 'ERROR', 'error': job['error']})
|
|
return JSONResponse(content={'status': 'RUNNING'})
|
|
|
|
|
|
async def _run_simulation(job_id: str, body: dict) -> None:
|
|
"""시뮬레이션을 ThreadPoolExecutor에서 실행하고 결과를 jobs 딕셔너리에 저장"""
|
|
loop = asyncio.get_event_loop()
|
|
try:
|
|
result = await loop.run_in_executor(_thread_pool, _simulate_sync, body, job_id)
|
|
jobs[job_id] = {'status': 'DONE', 'result': result, 'error': None}
|
|
except Exception as e:
|
|
logger.exception(f"시뮬레이션 오류 (job_id={job_id})")
|
|
jobs[job_id] = {'status': 'ERROR', 'result': None, 'error': str(e)}
|
|
|
|
|
|
def _simulate_sync(body: dict, job_id: str):
|
|
"""동기 시뮬레이션 로직 (ThreadPoolExecutor에서 실행)"""
|
|
start_time_str = body.get('startTime') or body.get('start_time')
|
|
run_time = body.get('runTime') or body.get('run_time')
|
|
mat_ty = body.get('matTy') or body.get('mat_ty')
|
|
mat_vol = body.get('matVol') or body.get('mat_vol')
|
|
lon = body.get('lon')
|
|
lat = body.get('lat')
|
|
spill_time = body.get('spillTime') or body.get('spill_time')
|
|
name = body.get('name') or job_id # name 없으면 job_id 사용
|
|
|
|
start_time_measure = datetime.now()
|
|
o = OpenOil(loglevel=20)
|
|
|
|
date_obj = _parse_datetime(start_time_str) or datetime.now()
|
|
date_utc = kst_to_utc(date_obj)
|
|
|
|
wind_nc_path, ocean_nc_path, _, _ = check_nc_files_for_date(date_utc)
|
|
if not wind_nc_path:
|
|
raise FileNotFoundError("바람 NC 파일을 찾을 수 없습니다.")
|
|
if not ocean_nc_path:
|
|
raise FileNotFoundError("해양 NC 파일을 찾을 수 없습니다.")
|
|
|
|
logger.info(f"[job:{job_id}] wind_nc_path: {wind_nc_path}")
|
|
logger.info(f"[job:{job_id}] ocean_nc_path: {ocean_nc_path}")
|
|
|
|
reader_wind = reader_netCDF_CF_generic.Reader(
|
|
wind_nc_path,
|
|
standard_name_mapping={'x_wind': 'x_wind', 'y_wind': 'y_wind'}
|
|
)
|
|
reader_ocean = reader_netCDF_CF_generic.Reader(ocean_nc_path)
|
|
if 'temperature' in reader_ocean.Dataset.variables:
|
|
temp = reader_ocean.Dataset['temperature']
|
|
temp_values = temp.values
|
|
mask = temp_values > SIM.TEMPERATURE_THRESHOLD
|
|
temp_values[mask] = np.nan
|
|
reader_ocean.Dataset['temperature'].values = temp_values
|
|
|
|
o.add_reader([reader_ocean, reader_wind])
|
|
|
|
o.set_config('processes:evaporation', True)
|
|
o.set_config('processes:emulsification', True)
|
|
o.set_config('drift:vertical_mixing', True)
|
|
o.set_config('vertical_mixing:timestep', SIM.VERTICAL_MIXING_TIMESTEP)
|
|
o.set_config('seed:m3_per_hour', mat_vol)
|
|
|
|
if spill_time == 0 or spill_time is None:
|
|
o.seed_elements(lon=lon, lat=lat, number=100,
|
|
time=date_utc, z=0, oil_type=mat_ty)
|
|
else:
|
|
release_duration = timedelta(hours=spill_time)
|
|
end_t = date_utc + release_duration
|
|
o.seed_elements(lon=lon, lat=lat, number=100,
|
|
time=[date_utc, end_t], z=0, oil_type=mat_ty)
|
|
|
|
ncfile = f"{STORAGE.RESULT}/{name}.nc"
|
|
try:
|
|
o.run(duration=timedelta(hours=run_time), time_step=900, time_step_output=3600, outfile=ncfile)
|
|
except Exception as e:
|
|
logger.error(f"[job:{job_id}] 시뮬레이션 실행 오류: {e}")
|
|
raise
|
|
|
|
json_data = extract_json(ncfile, wind_nc_path, ocean_nc_path, name, lon, lat)
|
|
if not json_data:
|
|
raise ValueError("시뮬레이션 결과 변환 실패")
|
|
|
|
elapsed = (datetime.now() - start_time_measure).total_seconds()
|
|
logger.info(f"[job:{job_id}] 완료: {int(elapsed//60)}m {int(elapsed%60)}s")
|
|
return json_data
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
|
|
# 서버 설정 (호스트와 포트는 필요에 따라 수정하세요)
|
|
# log_level="info"를 통해 FastAPI와 uvicorn의 로그를 확인할 수 있습니다.
|
|
uvicorn.run(
|
|
"api:app",
|
|
host="0.0.0.0",
|
|
port=5003,
|
|
reload=True # 코드 변경 시 자동으로 서버를 재시작하는 모드 (개발용)
|
|
) |