from fastapi import FastAPI, Request from fastapi.responses import JSONResponse import sys import asyncio import uuid import os import numpy as np from concurrent.futures import ThreadPoolExecutor from enum import Enum from datetime import datetime, timedelta from typing import Optional from opendrift.readers import reader_netCDF_CF_generic from opendrift.models.openoil import OpenOil from config import STORAGE, COORDS, SIM from logger import get_logger from utils import check_nc_file_by_date, check_nc_files_for_date, check_img_file_by_date, kst_to_utc from createJsonResult import extract_and_save_data_to_json as extract_json from findFile import find_nearest_earlier_file as find_file from extractUvFull import extract_uv_full from latestForecastDate import get_earliest_latest_forecast_date logger = get_logger("api") app = FastAPI() # ============================================================ # Workers 포화 관리 (단일 프로세스 기준 — startup.sh: --workers 1) # ============================================================ MAX_CONCURRENT = int(os.getenv('MAX_CONCURRENT_JOBS', '4')) jobs: dict[str, dict] = {} _thread_pool = ThreadPoolExecutor(max_workers=MAX_CONCURRENT) # ============================================================ # Parcels 선택적 로드 (없어도 동작) # ============================================================ try: sys.path.insert(0, str(STORAGE.PARCELS_PATH)) from parcels_api import router as parcels_router # type: ignore app.include_router(parcels_router) logger.info("Parcels router 로드 완료") except Exception as _e: logger.warning(f"Parcels router 로드 건너뜀 (정상): {_e}") class CustomErrorCode(Enum): FILE_NOT_FOUND = 5001 PARSE_ERROR = 5002 MODELING_ERROR = 5003 SYSTEM_ERROR = 5004 def _parse_datetime(dt_str: Optional[str]) -> Optional[datetime]: """다양한 형식의 날짜 문자열을 datetime으로 변환 (KST 기준)""" if not dt_str: return None formats = [ "%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S", "%Y%m%d%H", ] for fmt in formats: try: return datetime.strptime(dt_str, fmt) except ValueError: continue return None # ============================================================ # 기존 API 엔드포인트 (변경 없음) # ============================================================ @app.get("/get-received-date") async def get_received_date(): """예보 수신일 및 가능일 확인""" result = get_earliest_latest_forecast_date() if result: return JSONResponse(content=result, status_code=200) return JSONResponse(content={ "error_code": CustomErrorCode.FILE_NOT_FOUND.value, "message": "File not found error" }, status_code=200) @app.get("/get-uv/{datetime_str}/{category}") async def get_uv(datetime_str: str, category: str): """바람, 해수 시각화용 데이터 리턴""" date_obj = kst_to_utc(datetime.strptime(datetime_str, "%Y%m%d%H")) if category == "wind": nc_path, date = check_nc_file_by_date(str(STORAGE.WIND), date_obj) else: nc_path, date = check_nc_file_by_date(str(STORAGE.HYDR), date_obj) result = extract_uv_full( nc_path, date_obj.strftime("%Y-%m-%d %H:%M:%S"), category, skip=1, lon_range=COORDS.lon_range, lat_range=COORDS.lat_range ) return JSONResponse(content={"result": result}, status_code=200) # ============================================================ # NC 파일 확인 (수정: 404 반환으로 Node.js !checkRes.ok 연동) # ============================================================ @app.post("/check-nc") async def check_nc(request: Request): """기상 데이터 존재 여부 확인. startTime(KST) 기준으로 NC 파일 조회.""" body = await request.json() start_time_str = body.get('startTime') or body.get('start_time') try: date_obj = _parse_datetime(start_time_str) if date_obj is None: date_obj = datetime.now() date_utc = kst_to_utc(date_obj) wind_nc_path, ocean_nc_path, _, _ = check_nc_files_for_date(date_utc) if not wind_nc_path or not ocean_nc_path: return JSONResponse(content={"message": "not exist"}, status_code=404) return JSONResponse(content={"message": "exist"}, status_code=200) except Exception: logger.exception("Error checking NC files") return JSONResponse(content={ "error_code": CustomErrorCode.SYSTEM_ERROR.value, "message": "System Error" }, status_code=500) # ============================================================ # 비동기 시뮬레이션 실행 (Workers 포화 제어) # ============================================================ @app.post("/run-model") async def run_model(request: Request): """기름 유출 시뮬레이션 비동기 실행. job_id를 즉시 반환하고 백그라운드에서 처리.""" running = sum(1 for j in jobs.values() if j['status'] == 'RUNNING') if running >= MAX_CONCURRENT: return JSONResponse(status_code=503, content={ 'success': False, 'error': '분석 서버가 사용 중입니다. 잠시 후 재시도해 주세요.', 'running': running, 'max': MAX_CONCURRENT, }) body = await request.json() job_id = str(uuid.uuid4()) jobs[job_id] = {'status': 'RUNNING', 'result': None, 'error': None} asyncio.create_task(_run_simulation(job_id, body)) return JSONResponse(content={'success': True, 'job_id': job_id, 'status': 'RUNNING'}, status_code=200) @app.get("/status/{job_id}") async def get_job_status(job_id: str): """시뮬레이션 작업 상태 조회""" if job_id not in jobs: return JSONResponse(content={'error': 'Job not found'}, status_code=404) job = jobs[job_id] if job['status'] == 'DONE': return JSONResponse(content={'status': 'DONE', 'result': job['result']}) if job['status'] == 'ERROR': return JSONResponse(content={'status': 'ERROR', 'error': job['error']}) return JSONResponse(content={'status': 'RUNNING'}) async def _run_simulation(job_id: str, body: dict) -> None: """시뮬레이션을 ThreadPoolExecutor에서 실행하고 결과를 jobs 딕셔너리에 저장""" loop = asyncio.get_event_loop() try: result = await loop.run_in_executor(_thread_pool, _simulate_sync, body, job_id) jobs[job_id] = {'status': 'DONE', 'result': result, 'error': None} except Exception as e: logger.exception(f"시뮬레이션 오류 (job_id={job_id})") jobs[job_id] = {'status': 'ERROR', 'result': None, 'error': str(e)} def _simulate_sync(body: dict, job_id: str): """동기 시뮬레이션 로직 (ThreadPoolExecutor에서 실행)""" start_time_str = body.get('startTime') or body.get('start_time') run_time = body.get('runTime') or body.get('run_time') mat_ty = body.get('matTy') or body.get('mat_ty') mat_vol = body.get('matVol') or body.get('mat_vol') lon = body.get('lon') lat = body.get('lat') spill_time = body.get('spillTime') or body.get('spill_time') name = body.get('name') or job_id # name 없으면 job_id 사용 start_time_measure = datetime.now() o = OpenOil(loglevel=20) date_obj = _parse_datetime(start_time_str) or datetime.now() date_utc = kst_to_utc(date_obj) wind_nc_path, ocean_nc_path, _, _ = check_nc_files_for_date(date_utc) if not wind_nc_path: raise FileNotFoundError("바람 NC 파일을 찾을 수 없습니다.") if not ocean_nc_path: raise FileNotFoundError("해양 NC 파일을 찾을 수 없습니다.") logger.info(f"[job:{job_id}] wind_nc_path: {wind_nc_path}") logger.info(f"[job:{job_id}] ocean_nc_path: {ocean_nc_path}") reader_wind = reader_netCDF_CF_generic.Reader( wind_nc_path, standard_name_mapping={'x_wind': 'x_wind', 'y_wind': 'y_wind'} ) reader_ocean = reader_netCDF_CF_generic.Reader(ocean_nc_path) if 'temperature' in reader_ocean.Dataset.variables: temp = reader_ocean.Dataset['temperature'] temp_values = temp.values mask = temp_values > SIM.TEMPERATURE_THRESHOLD temp_values[mask] = np.nan reader_ocean.Dataset['temperature'].values = temp_values o.add_reader([reader_ocean, reader_wind]) o.set_config('processes:evaporation', True) o.set_config('processes:emulsification', True) o.set_config('drift:vertical_mixing', True) o.set_config('vertical_mixing:timestep', SIM.VERTICAL_MIXING_TIMESTEP) o.set_config('seed:m3_per_hour', mat_vol) if spill_time == 0 or spill_time is None: o.seed_elements(lon=lon, lat=lat, number=100, time=date_utc, z=0, oil_type=mat_ty) else: release_duration = timedelta(hours=spill_time) end_t = date_utc + release_duration o.seed_elements(lon=lon, lat=lat, number=100, time=[date_utc, end_t], z=0, oil_type=mat_ty) ncfile = f"{STORAGE.RESULT}/{name}.nc" try: o.run(duration=timedelta(hours=run_time), time_step=900, time_step_output=3600, outfile=ncfile) except Exception as e: logger.error(f"[job:{job_id}] 시뮬레이션 실행 오류: {e}") raise json_data = extract_json(ncfile, wind_nc_path, ocean_nc_path, name, lon, lat) if not json_data: raise ValueError("시뮬레이션 결과 변환 실패") elapsed = (datetime.now() - start_time_measure).total_seconds() logger.info(f"[job:{job_id}] 완료: {int(elapsed//60)}m {int(elapsed%60)}s") return json_data if __name__ == "__main__": import uvicorn # 서버 설정 (호스트와 포트는 필요에 따라 수정하세요) # log_level="info"를 통해 FastAPI와 uvicorn의 로그를 확인할 수 있습니다. uvicorn.run( "api:app", host="0.0.0.0", port=5003, reload=True # 코드 변경 시 자동으로 서버를 재시작하는 모드 (개발용) )