import { Router, Request, Response } from 'express' import { wingPool } from '../db/wingDb.js' import { requireAuth } from '../auth/authMiddleware.js' import { isValidLatitude, isValidLongitude, isValidNumber, isValidStringLength, } from '../middleware/security.js' const router = Router() const PYTHON_API_URL = process.env.PYTHON_API_URL ?? 'http://localhost:5003' const POSEIDON_API_URL = process.env.POSEIDON_API_URL ?? 'http://localhost:5004' const POLL_INTERVAL_MS = 3000 const POLL_TIMEOUT_MS = 30 * 60 * 1000 // 30분 // 유종 매핑: 한국어 UI 선택값 → OpenDrift 유종 코드 // 추후 DB/설정 파일로 외부화 예정 (개발 단계 임시 구현) const OIL_TYPE_MAP: Record = { '벙커C유': 'GENERIC BUNKER C', '경유': 'GENERIC DIESEL', '원유': 'WEST TEXAS INTERMEDIATE (WTI)', '중유': 'GENERIC HEAVY FUEL OIL', '등유': 'FUEL OIL NO.1 (KEROSENE)', '휘발유': 'GENERIC GASOLINE', } // 유종 매핑: 한국어 UI → DB 저장 코드 const OIL_DB_CODE_MAP: Record = { '벙커C유': 'BUNKER_C', '경유': 'DIESEL', '원유': 'CRUDE_OIL', '중유': 'HEAVY_FUEL_OIL', '등유': 'KEROSENE', '휘발유': 'GASOLINE', } // 유출 형태 매핑: 한국어 UI → DB 저장 코드 const SPIL_TYPE_MAP: Record = { '연속': 'CONTINUOUS', '비연속': 'DISCONTINUOUS', '순간 유출': 'INSTANT', } // 단위 매핑: 한국어 UI → DB 저장 코드 const UNIT_MAP: Record = { 'kL': 'KL', 'ton': 'TON', 'barrel': 'BBL', } // ============================================================ // 신규 생성된 ACDNT/SPIL_DATA/PRED_EXEC 롤백 헬퍼 // Python 호출 실패 시 이번 요청에서 생성된 레코드만 삭제한다. // ============================================================ async function rollbackNewRecords( predExecSn: number | null, newSpilDataSn: number | null, newAcdntSn: number | null ): Promise { try { if (predExecSn !== null) { await wingPool.query('DELETE FROM wing.PRED_EXEC WHERE PRED_EXEC_SN=$1', [predExecSn]) } if (newSpilDataSn !== null) { await wingPool.query('DELETE FROM wing.SPIL_DATA WHERE SPIL_DATA_SN=$1', [newSpilDataSn]) } if (newAcdntSn !== null) { await wingPool.query('DELETE FROM wing.ACDNT WHERE ACDNT_SN=$1', [newAcdntSn]) } } catch (cleanupErr) { console.error('[simulation] 롤백 실패:', cleanupErr) } } // 모델명 → ALGO_CD 매핑 const MODEL_ALGO_CD_MAP: Record = { 'OpenDrift': 'OPENDRIFT', 'POSEIDON': 'POSEIDON', } // 모델명 → API URL 매핑 const MODEL_API_URL_MAP: Record = { 'OpenDrift': PYTHON_API_URL, 'POSEIDON': POSEIDON_API_URL, } // ============================================================ // POST /api/simulation/run // 확산 시뮬레이션 실행 (다중 모델 지원: OpenDrift, POSEIDON) // ============================================================ /** * 선택된 모델(OpenDrift, POSEIDON)로 확산 시뮬레이션을 실행한다. * 각 모델에 대해 PRED_EXEC 레코드를 별도 생성하고 Python API에 병렬 제출한다. * KOSPS 모델은 PRED_EXEC INSERT(PENDING)만 수행하고 외부 API 연동은 하지 않는다. * 프론트엔드는 execSns 배열의 각 execSn으로 GET /status/:execSn을 폴링하여 결과를 수신한다. */ router.post('/run', requireAuth, async (req: Request, res: Response) => { try { const { acdntSn: rawAcdntSn, acdntNm, spillUnit, spillTypeCd, lat, lon, runTime, matTy, matVol, spillTime, startTime, models: rawModels } = req.body // 실행할 모델 목록 (기본값: OpenDrift) const requestedModels: string[] = Array.isArray(rawModels) && rawModels.length > 0 ? (rawModels as string[]) : ['OpenDrift'] // 1. 필수 파라미터 검증 if (lat === undefined || lon === undefined || runTime === undefined) { return res.status(400).json({ error: '필수 파라미터 누락', required: ['lat', 'lon', 'runTime'], }) } if (!isValidLatitude(lat)) { return res.status(400).json({ error: '유효하지 않은 위도', message: '위도는 -90~90 범위여야 합니다.' }) } if (!isValidLongitude(lon)) { return res.status(400).json({ error: '유효하지 않은 경도', message: '경도는 -180~180 범위여야 합니다.' }) } if (!isValidNumber(runTime, 1, 720)) { return res.status(400).json({ error: '유효하지 않은 예측 시간', message: '예측 시간은 1~720 범위여야 합니다.' }) } if (matVol !== undefined && !isValidNumber(matVol, 0, 1000000)) { return res.status(400).json({ error: '유효하지 않은 유출량' }) } if (matTy !== undefined && (typeof matTy !== 'string' || !isValidStringLength(matTy, 50))) { return res.status(400).json({ error: '유효하지 않은 유종' }) } // acdntSn 없는 경우 acdntNm 필수 if (!rawAcdntSn && (!acdntNm || typeof acdntNm !== 'string' || !acdntNm.trim())) { return res.status(400).json({ error: '사고를 선택하거나 사고명을 입력해야 합니다.' }) } if (acdntNm && (typeof acdntNm !== 'string' || !isValidStringLength(acdntNm, 200))) { return res.status(400).json({ error: '사고명은 200자 이내여야 합니다.' }) } // 2. Python NC 파일 존재 여부 확인 (ACDNT 생성 전에 수행하여 고아 레코드 방지) // OpenDrift 모델이 포함된 경우에만 check-nc 수행 if (requestedModels.includes('OpenDrift')) { try { const checkRes = await fetch(`${PYTHON_API_URL}/check-nc`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ lat, lon, startTime }), signal: AbortSignal.timeout(5000), }) if (!checkRes.ok) { return res.status(409).json({ error: '해당 좌표의 해양 기상 데이터가 없습니다.', message: 'NC 파일이 준비되지 않았습니다.', }) } } catch { // Python 서버 미기동 — 5번에서 처리 } } // 1-B. acdntSn 미제공 시 ACDNT + SPIL_DATA 생성 let resolvedAcdntSn: number | null = rawAcdntSn ? Number(rawAcdntSn) : null let resolvedSpilDataSn: number | null = null // 이번 요청에서 신규 생성된 레코드 추적 (Python 실패 시 롤백 대상) let newlyCreatedAcdntSn: number | null = null let newlyCreatedSpilDataSn: number | null = null if (!resolvedAcdntSn && acdntNm) { try { const occrn = startTime ?? new Date().toISOString() const acdntRes = await wingPool.query( `INSERT INTO wing.ACDNT (ACDNT_CD, ACDNT_NM, ACDNT_TP_CD, OCCRN_DTM, LAT, LNG, ACDNT_STTS_CD, USE_YN, REG_DTM) VALUES ( 'INC-' || EXTRACT(YEAR FROM NOW())::TEXT || '-' || LPAD( (SELECT COALESCE(MAX(CAST(SPLIT_PART(ACDNT_CD, '-', 3) AS INTEGER)), 0) + 1 FROM wing.ACDNT WHERE ACDNT_CD LIKE 'INC-' || EXTRACT(YEAR FROM NOW())::TEXT || '-%')::TEXT, 4, '0' ), $1, '유류유출', $2, $3, $4, 'ACTIVE', 'Y', NOW() ) RETURNING ACDNT_SN`, [acdntNm.trim(), occrn, lat, lon] ) resolvedAcdntSn = acdntRes.rows[0].acdnt_sn as number newlyCreatedAcdntSn = resolvedAcdntSn const spilRes = await wingPool.query( `INSERT INTO wing.SPIL_DATA (ACDNT_SN, OIL_TP_CD, SPIL_QTY, SPIL_UNIT_CD, SPIL_TP_CD, FCST_HR, REG_DTM) VALUES ($1, $2, $3, $4, $5, $6, NOW()) RETURNING SPIL_DATA_SN`, [ resolvedAcdntSn, OIL_DB_CODE_MAP[matTy as string] ?? 'BUNKER_C', matVol ?? 0, UNIT_MAP[spillUnit as string] ?? 'KL', SPIL_TYPE_MAP[spillTypeCd as string] ?? 'CONTINUOUS', runTime, ] ) resolvedSpilDataSn = spilRes.rows[0].spil_data_sn as number newlyCreatedSpilDataSn = resolvedSpilDataSn } catch (dbErr) { console.error('[simulation] ACDNT/SPIL_DATA INSERT 실패:', dbErr) return res.status(500).json({ error: '사고 정보 생성 실패' }) } } // 3. 기존 사고의 경우 SPIL_DATA_SN 조회 if (resolvedAcdntSn && !resolvedSpilDataSn) { try { const spilRes = await wingPool.query( `INSERT INTO wing.SPIL_DATA (ACDNT_SN, OIL_TP_CD, SPIL_QTY, SPIL_UNIT_CD, SPIL_TP_CD, FCST_HR, REG_DTM) VALUES ($1, $2, $3, $4, $5, $6, NOW()) RETURNING SPIL_DATA_SN`, [ resolvedAcdntSn, OIL_DB_CODE_MAP[matTy as string] ?? 'BUNKER_C', matVol ?? 0, UNIT_MAP[spillUnit as string] ?? 'KL', SPIL_TYPE_MAP[spillTypeCd as string] ?? 'CONTINUOUS', runTime, ] ) resolvedSpilDataSn = spilRes.rows[0].spil_data_sn as number } catch (dbErr) { console.error('[simulation] SPIL_DATA INSERT 실패:', dbErr) } } // matTy 변환: 한국어 유종 → OpenDrift 유종 코드 // 매핑 대상이 아니면 원본 값 그대로 사용 (영문 직접 입력 대응) const odMatTy = matTy !== undefined ? (OIL_TYPE_MAP[matTy as string] ?? (matTy as string)) : undefined // 4. 각 모델별 PRED_EXEC INSERT 및 API 호출 (병렬) // KOSPS: PRED_EXEC PENDING 생성만 하고 배열에서 제외 (외부 API 미연동) const execNmBase = `EXPC_${Date.now()}` const execSns: Array<{ model: string; execSn: number }> = [] // KOSPS 처리: PRED_EXEC INSERT(PENDING)만 수행 if (requestedModels.includes('KOSPS')) { try { const kospsExecNm = `${execNmBase}_KOSPS` const insertRes = await wingPool.query( `INSERT INTO wing.PRED_EXEC (ACDNT_SN, SPIL_DATA_SN, ALGO_CD, EXEC_STTS_CD, EXEC_NM, EXEC_USER_ID, BGNG_DTM) VALUES ($1, $2, 'KOSPS', 'PENDING', $3, $4, NOW()) RETURNING PRED_EXEC_SN`, [resolvedAcdntSn, resolvedSpilDataSn, kospsExecNm, req.user!.sub] ) execSns.push({ model: 'KOSPS', execSn: insertRes.rows[0].pred_exec_sn as number }) } catch (dbErr) { console.error('[simulation] KOSPS PRED_EXEC INSERT 실패:', dbErr) } } // API 연동 모델 필터링 (KOSPS 제외) const apiModels = requestedModels.filter((m) => m !== 'KOSPS' && MODEL_ALGO_CD_MAP[m] !== undefined) // 각 모델에 대해 PRED_EXEC INSERT → /run-model 호출 await Promise.all( apiModels.map(async (model) => { const algoCd = MODEL_ALGO_CD_MAP[model] const apiUrl = MODEL_API_URL_MAP[model] const execNm = `${execNmBase}_${algoCd}` // PRED_EXEC INSERT (PENDING) let predExecSn: number try { const insertRes = await wingPool.query( `INSERT INTO wing.PRED_EXEC (ACDNT_SN, SPIL_DATA_SN, ALGO_CD, EXEC_STTS_CD, EXEC_NM, EXEC_USER_ID, BGNG_DTM) VALUES ($1, $2, $3, 'PENDING', $4, $5, NOW()) RETURNING PRED_EXEC_SN`, [resolvedAcdntSn, resolvedSpilDataSn, algoCd, execNm, req.user!.sub] ) predExecSn = insertRes.rows[0].pred_exec_sn as number } catch (dbErr) { console.error(`[simulation] ${model} PRED_EXEC INSERT 실패:`, dbErr) return } execSns.push({ model, execSn: predExecSn }) // Python /run-model 호출 let jobId: string try { const pythonRes = await fetch(`${apiUrl}/run-model`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ lat, lon, startTime, runTime, matTy: odMatTy, matVol, spillTime, name: execNm, }), signal: AbortSignal.timeout(10000), }) if (pythonRes.status === 503) { const errData = await pythonRes.json() as { error?: string } await wingPool.query( `UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`, [errData.error || '분석 서버 포화', predExecSn] ) await rollbackNewRecords(predExecSn, newlyCreatedSpilDataSn, newlyCreatedAcdntSn) return } if (!pythonRes.ok) { throw new Error(`Python 서버 응답 오류: ${pythonRes.status}`) } const pythonData = await pythonRes.json() as { job_id: string } jobId = pythonData.job_id } catch { await wingPool.query( `UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG='Python 분석 서버에 연결할 수 없습니다.', CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$1`, [predExecSn] ) // 이 모델의 PRED_EXEC만 롤백 (다른 모델은 계속 진행) await rollbackNewRecords(predExecSn, null, null) return } // RUNNING 업데이트 await wingPool.query( `UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='RUNNING' WHERE PRED_EXEC_SN=$1`, [predExecSn] ) // 백그라운드 폴링 시작 pollAndSaveModel(jobId, predExecSn, apiUrl, algoCd).catch((err: unknown) => console.error(`[simulation] ${model} pollAndSaveModel 오류:`, err) ) }) ) // ACDNT/SPIL_DATA가 신규 생성됐으나 모든 모델이 실패한 경우 롤백 const hasRunning = execSns.some(({ model }) => model !== 'KOSPS') if (!hasRunning && newlyCreatedAcdntSn !== null) { await rollbackNewRecords(null, newlyCreatedSpilDataSn, newlyCreatedAcdntSn) return res.status(503).json({ error: '분석 서버에 연결할 수 없습니다.' }) } // 즉시 응답 (하위 호환을 위해 execSn도 포함) res.json({ success: true, execSns, execSn: execSns[0]?.execSn ?? 0, acdntSn: resolvedAcdntSn, status: 'RUNNING', }) } catch { res.status(500).json({ error: '시뮬레이션 실행 실패', message: '서버 내부 오류가 발생했습니다.' }) } }) // ============================================================ // GET /api/simulation/status/:execSn // 시뮬레이션 실행 상태 및 결과 조회 // ============================================================ /** * PRED_EXEC 테이블에서 실행 상태를 조회한다. * DB 상태(COMPLETED/FAILED)를 프론트 상태(DONE/ERROR)로 매핑하여 반환한다. */ router.get('/status/:execSn', requireAuth, async (req: Request, res: Response) => { const execSn = parseInt(req.params.execSn as string, 10) if (isNaN(execSn) || execSn <= 0) { return res.status(400).json({ error: '유효하지 않은 execSn' }) } try { const result = await wingPool.query( `SELECT pe.EXEC_STTS_CD, pe.RSLT_DATA, pe.ERR_MSG, pe.BGNG_DTM, pe.ALGO_CD, sd.FCST_HR, ( SELECT AVG(hist.REQD_SEC::FLOAT / hsd.FCST_HR) FROM wing.PRED_EXEC hist JOIN wing.SPIL_DATA hsd ON hist.SPIL_DATA_SN = hsd.SPIL_DATA_SN WHERE hist.ALGO_CD = pe.ALGO_CD AND hist.EXEC_STTS_CD = 'COMPLETED' AND hist.REQD_SEC IS NOT NULL AND hist.REQD_SEC > 0 AND hsd.FCST_HR IS NOT NULL AND hsd.FCST_HR > 0 ) AS avg_sec_per_hr FROM wing.PRED_EXEC pe LEFT JOIN wing.SPIL_DATA sd ON pe.SPIL_DATA_SN = sd.SPIL_DATA_SN WHERE pe.PRED_EXEC_SN=$1`, [execSn] ) if (result.rows.length === 0) { return res.status(404).json({ error: '분석 기록을 찾을 수 없습니다.' }) } const row = result.rows[0] const dbStatus: string = row.exec_stts_cd as string // DB 상태 → API 상태 매핑 const statusMap: Record = { PENDING: 'PENDING', RUNNING: 'RUNNING', COMPLETED: 'DONE', FAILED: 'ERROR', } const status = statusMap[dbStatus] ?? dbStatus if (status === 'DONE' && row.rslt_data) { const algoCd = String(row.algo_cd ?? '') const modelName = ALGO_CD_TO_MODEL_NAME[algoCd] ?? algoCd const { trajectory, summary, centerPoints, windData, hydrData } = transformResult(row.rslt_data as PythonTimeStep[], modelName) return res.json({ status, trajectory, summary, centerPoints, windData, hydrData }) } if (status === 'ERROR') { return res.json({ status, error: (row.err_msg as string) || '분석 중 오류가 발생했습니다.' }) } // PENDING/RUNNING: 경과 시간 기반 진행률 계산 // 과거 실행의 초/예측시간 비율(avg_sec_per_hr) × 현재 fcst_hr로 추정, 이력 없으면 5초/hr 폴백 let progress: number | undefined; if (status === 'RUNNING' && row.bgng_dtm) { const fcstHr = Number(row.fcst_hr) || 24; const avgSecPerHr = row.avg_sec_per_hr ? Number(row.avg_sec_per_hr) : 5; const estimatedSec = avgSecPerHr * fcstHr; const elapsedSec = (Date.now() - new Date(row.bgng_dtm as string).getTime()) / 1000; progress = Math.min(95, Math.floor((elapsedSec / estimatedSec) * 100)); } res.json({ status, ...(progress !== undefined && { progress }) }) } catch { res.status(500).json({ error: '상태 조회 실패' }) } }) // ============================================================ // POST /api/simulation/run-model (동기 방식) // 예측 완료 후 결과를 직접 반환한다. // ============================================================ /** * 선택된 모델로 확산 시뮬레이션을 실행하고 완료될 때까지 대기한 후 결과를 반환한다. * 다중 모델은 병렬로 실행되며, 일부 모델 실패 시 성공한 모델 결과는 포함된다. */ router.post('/run-model', requireAuth, async (req: Request, res: Response) => { try { const { acdntSn: rawAcdntSn, acdntNm, spillUnit, spillTypeCd, lat, lon, runTime, matTy, matVol, spillTime, startTime, models: rawModels } = req.body let requestedModels: string[] = Array.isArray(rawModels) && rawModels.length > 0 ? (rawModels as string[]) : ['OpenDrift'] // 1. 필수 파라미터 검증 if (lat === undefined || lon === undefined || runTime === undefined) { return res.status(400).json({ error: '필수 파라미터 누락', required: ['lat', 'lon', 'runTime'] }) } if (!isValidLatitude(lat)) { return res.status(400).json({ error: '유효하지 않은 위도', message: '위도는 -90~90 범위여야 합니다.' }) } if (!isValidLongitude(lon)) { return res.status(400).json({ error: '유효하지 않은 경도', message: '경도는 -180~180 범위여야 합니다.' }) } if (!isValidNumber(runTime, 1, 720)) { return res.status(400).json({ error: '유효하지 않은 예측 시간', message: '예측 시간은 1~720 범위여야 합니다.' }) } if (matVol !== undefined && !isValidNumber(matVol, 0, 1000000)) { return res.status(400).json({ error: '유효하지 않은 유출량' }) } if (matTy !== undefined && (typeof matTy !== 'string' || !isValidStringLength(matTy, 50))) { return res.status(400).json({ error: '유효하지 않은 유종' }) } if (!rawAcdntSn && (!acdntNm || typeof acdntNm !== 'string' || !acdntNm.trim())) { return res.status(400).json({ error: '사고를 선택하거나 사고명을 입력해야 합니다.' }) } if (acdntNm && (typeof acdntNm !== 'string' || !isValidStringLength(acdntNm, 200))) { return res.status(400).json({ error: '사고명은 200자 이내여야 합니다.' }) } // 2. NC 파일 존재 여부 확인 if (requestedModels.includes('OpenDrift')) { try { const checkRes = await fetch(`${PYTHON_API_URL}/check-nc`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ lat, lon, startTime }), signal: AbortSignal.timeout(5000), }) if (!checkRes.ok) { // NC 파일 없으면 OpenDrift만 제외, 나머지 모델(POSEIDON 등)은 계속 진행 requestedModels = requestedModels.filter(m => m !== 'OpenDrift') if (requestedModels.length === 0) { return res.status(409).json({ error: '해당 좌표의 해양 기상 데이터가 없습니다.', message: 'NC 파일이 준비되지 않았습니다.', }) } } } catch { // Python 서버 미기동 — 이후 단계에서 처리 } } // 3. ACDNT/SPIL_DATA 생성 또는 조회 let resolvedAcdntSn: number | null = rawAcdntSn ? Number(rawAcdntSn) : null let resolvedSpilDataSn: number | null = null let newlyCreatedAcdntSn: number | null = null let newlyCreatedSpilDataSn: number | null = null if (!resolvedAcdntSn && acdntNm) { try { const occrn = startTime ?? new Date().toISOString() const acdntRes = await wingPool.query( `INSERT INTO wing.ACDNT (ACDNT_CD, ACDNT_NM, ACDNT_TP_CD, OCCRN_DTM, LAT, LNG, ACDNT_STTS_CD, USE_YN, REG_DTM) VALUES ( 'INC-' || EXTRACT(YEAR FROM NOW())::TEXT || '-' || LPAD( (SELECT COALESCE(MAX(CAST(SPLIT_PART(ACDNT_CD, '-', 3) AS INTEGER)), 0) + 1 FROM wing.ACDNT WHERE ACDNT_CD LIKE 'INC-' || EXTRACT(YEAR FROM NOW())::TEXT || '-%')::TEXT, 4, '0' ), $1, '유류유출', $2, $3, $4, 'ACTIVE', 'Y', NOW() ) RETURNING ACDNT_SN`, [acdntNm.trim(), occrn, lat, lon] ) resolvedAcdntSn = acdntRes.rows[0].acdnt_sn as number newlyCreatedAcdntSn = resolvedAcdntSn const spilRes = await wingPool.query( `INSERT INTO wing.SPIL_DATA (ACDNT_SN, OIL_TP_CD, SPIL_QTY, SPIL_UNIT_CD, SPIL_TP_CD, FCST_HR, REG_DTM) VALUES ($1, $2, $3, $4, $5, $6, NOW()) RETURNING SPIL_DATA_SN`, [ resolvedAcdntSn, OIL_DB_CODE_MAP[matTy as string] ?? 'BUNKER_C', matVol ?? 0, UNIT_MAP[spillUnit as string] ?? 'KL', SPIL_TYPE_MAP[spillTypeCd as string] ?? 'CONTINUOUS', runTime, ] ) resolvedSpilDataSn = spilRes.rows[0].spil_data_sn as number newlyCreatedSpilDataSn = resolvedSpilDataSn } catch (dbErr) { console.error('[simulation/run-model] ACDNT/SPIL_DATA INSERT 실패:', dbErr) return res.status(500).json({ error: '사고 정보 생성 실패' }) } } if (resolvedAcdntSn && !resolvedSpilDataSn) { try { const spilRes = await wingPool.query( `INSERT INTO wing.SPIL_DATA (ACDNT_SN, OIL_TP_CD, SPIL_QTY, SPIL_UNIT_CD, SPIL_TP_CD, FCST_HR, REG_DTM) VALUES ($1, $2, $3, $4, $5, $6, NOW()) RETURNING SPIL_DATA_SN`, [ resolvedAcdntSn, OIL_DB_CODE_MAP[matTy as string] ?? 'BUNKER_C', matVol ?? 0, UNIT_MAP[spillUnit as string] ?? 'KL', SPIL_TYPE_MAP[spillTypeCd as string] ?? 'CONTINUOUS', runTime, ] ) resolvedSpilDataSn = spilRes.rows[0].spil_data_sn as number } catch (dbErr) { console.error('[simulation/run-model] SPIL_DATA INSERT 실패:', dbErr) } } const odMatTy = matTy !== undefined ? (OIL_TYPE_MAP[matTy as string] ?? (matTy as string)) : undefined const execNmBase = `EXPC_${Date.now()}` // 이번 예측 실행을 식별하는 그룹 SN 생성 let predRunSn: number try { const runSnRes = await wingPool.query("SELECT nextval('wing.PRED_RUN_SN_SEQ') AS pred_run_sn") predRunSn = runSnRes.rows[0].pred_run_sn as number } catch (dbErr) { console.error('[simulation/run-model] PRED_RUN_SN_SEQ 조회 실패:', dbErr) return res.status(500).json({ error: '실행 SN 생성 실패' }) } // KOSPS: PRED_EXEC INSERT(PENDING)만 수행 const execSns: Array<{ model: string; execSn: number }> = [] if (requestedModels.includes('KOSPS')) { try { const kospsExecNm = `${execNmBase}_KOSPS` const insertRes = await wingPool.query( `INSERT INTO wing.PRED_EXEC (ACDNT_SN, SPIL_DATA_SN, ALGO_CD, EXEC_STTS_CD, EXEC_NM, PRED_RUN_SN, EXEC_USER_ID, BGNG_DTM) VALUES ($1, $2, 'KOSPS', 'PENDING', $3, $4, $5, NOW()) RETURNING PRED_EXEC_SN`, [resolvedAcdntSn, resolvedSpilDataSn, kospsExecNm, predRunSn, req.user!.sub] ) execSns.push({ model: 'KOSPS', execSn: insertRes.rows[0].pred_exec_sn as number }) } catch (dbErr) { console.error('[simulation/run-model] KOSPS PRED_EXEC INSERT 실패:', dbErr) } } // 4. API 연동 모델 시작 및 완료 대기 (병렬) const apiModels = requestedModels.filter((m) => m !== 'KOSPS' && MODEL_ALGO_CD_MAP[m] !== undefined) interface SyncModelResult { model: string execSn: number status: 'DONE' | 'ERROR' trajectory?: ReturnType['trajectory'] summary?: ReturnType['summary'] stepSummaries?: ReturnType['stepSummaries'] centerPoints?: ReturnType['centerPoints'] windData?: ReturnType['windData'] hydrData?: ReturnType['hydrData'] error?: string } const modelResults = await Promise.all( apiModels.map(async (model): Promise => { const algoCd = MODEL_ALGO_CD_MAP[model] const apiUrl = MODEL_API_URL_MAP[model] const execNm = `${execNmBase}_${algoCd}` // PRED_EXEC INSERT let predExecSn: number try { const insertRes = await wingPool.query( `INSERT INTO wing.PRED_EXEC (ACDNT_SN, SPIL_DATA_SN, ALGO_CD, EXEC_STTS_CD, EXEC_NM, PRED_RUN_SN, EXEC_USER_ID, BGNG_DTM) VALUES ($1, $2, $3, 'PENDING', $4, $5, $6, NOW()) RETURNING PRED_EXEC_SN`, [resolvedAcdntSn, resolvedSpilDataSn, algoCd, execNm, predRunSn, req.user!.sub] ) predExecSn = insertRes.rows[0].pred_exec_sn as number } catch (dbErr) { console.error(`[simulation/run-model] ${model} PRED_EXEC INSERT 실패:`, dbErr) return { model, execSn: 0, status: 'ERROR', error: 'DB 오류' } } execSns.push({ model, execSn: predExecSn }) // Python /run-model 호출 let jobId: string | undefined try { const pythonRes = await fetch(`${apiUrl}/run-model`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ lat, lon, startTime, runTime, matTy: odMatTy, matVol, spillTime, name: execNm }), signal: AbortSignal.timeout(POLL_TIMEOUT_MS), }) if (pythonRes.status === 503) { const errData = await pythonRes.json() as { error?: string } const errMsg = errData.error || '분석 서버 포화' await wingPool.query( `UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`, [errMsg, predExecSn] ) return { model, execSn: predExecSn, status: 'ERROR', error: errMsg } } if (!pythonRes.ok) { throw new Error(`Python 서버 응답 오류: ${pythonRes.status}`) } const pythonData = await pythonRes.json() as { success?: boolean; result?: PythonTimeStep[]; job_id?: string; error?: string; message?: string; error_code?: number; } // 동기 성공 응답 (OpenDrift & POSEIDON 공통) if (Array.isArray(pythonData.result)) { await wingPool.query( `UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='COMPLETED', RSLT_DATA=$1, CMPL_DTM=NOW(), REQD_SEC=EXTRACT(EPOCH FROM (NOW() - BGNG_DTM))::INTEGER WHERE PRED_EXEC_SN=$2`, [JSON.stringify(pythonData.result), predExecSn] ) const { trajectory, summary, stepSummaries, centerPoints, windData, hydrData } = transformResult(pythonData.result, model) return { model, execSn: predExecSn, status: 'DONE', trajectory, summary, stepSummaries, centerPoints, windData, hydrData } } // 비동기 응답 (하위 호환) if (pythonData.job_id) { jobId = pythonData.job_id } else { // 오류 응답 (success: false, HTTP 200) const errMsg = pythonData.error || pythonData.message || '분석 오류' await wingPool.query( `UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`, [errMsg, predExecSn] ) return { model, execSn: predExecSn, status: 'ERROR', error: errMsg } } } catch (fetchErr) { const errMsg = 'Python 분석 서버에 연결할 수 없습니다.' await wingPool.query( `UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`, [errMsg, predExecSn] ) return { model, execSn: predExecSn, status: 'ERROR', error: errMsg } } // RUNNING 업데이트 (비동기 폴링 경로) await wingPool.query( `UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='RUNNING' WHERE PRED_EXEC_SN=$1`, [predExecSn] ) // 결과 동기 대기 try { const rawResult = await runModelSync(jobId!, predExecSn, apiUrl) const { trajectory, summary, stepSummaries, centerPoints, windData, hydrData } = transformResult(rawResult, model) return { model, execSn: predExecSn, status: 'DONE', trajectory, summary, stepSummaries, centerPoints, windData, hydrData } } catch (syncErr) { return { model, execSn: predExecSn, status: 'ERROR', error: (syncErr as Error).message } } }) ) // 모든 모델이 실패하고 신규 생성한 ACDNT가 있으면 롤백 const hasSuccess = modelResults.some((r) => r.status === 'DONE') if (!hasSuccess && newlyCreatedAcdntSn !== null) { for (const r of modelResults) { if (r.execSn) await rollbackNewRecords(r.execSn, null, null) } await rollbackNewRecords(null, newlyCreatedSpilDataSn, newlyCreatedAcdntSn) return res.status(503).json({ error: '분석 서버에 연결할 수 없습니다.' }) } res.json({ success: true, acdntSn: resolvedAcdntSn, predRunSn, execSns: [...execSns, ...modelResults.map(({ model, execSn }) => ({ model, execSn }))], results: modelResults, }) } catch { res.status(500).json({ error: '시뮬레이션 실행 실패', message: '서버 내부 오류가 발생했습니다.' }) } }) // ============================================================ // 백그라운드 폴링 // ============================================================ async function pollAndSaveModel(jobId: string, execSn: number, apiUrl: string, algoCode: string): Promise { const deadline = Date.now() + POLL_TIMEOUT_MS while (Date.now() < deadline) { await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS)) try { const pollRes = await fetch(`${apiUrl}/status/${jobId}`, { signal: AbortSignal.timeout(5000), }) if (!pollRes.ok) continue const data = await pollRes.json() as PythonStatusResponse if (data.status === 'DONE' && data.result) { await wingPool.query( `UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='COMPLETED', RSLT_DATA=$1, CMPL_DTM=NOW(), REQD_SEC=EXTRACT(EPOCH FROM (NOW() - BGNG_DTM))::INTEGER WHERE PRED_EXEC_SN=$2`, [JSON.stringify(data.result), execSn] ) return } if (data.status === 'ERROR') { await wingPool.query( `UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`, [data.error ?? '분석 오류', execSn] ) return } } catch { // 개별 폴링 오류는 무시하고 재시도 } } // 타임아웃 처리 await wingPool.query( `UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG='분석 시간 초과 (30분)', CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$1`, [execSn] ) } // ============================================================ // 동기 폴링: Python 결과 대기 후 반환 // ============================================================ async function runModelSync(jobId: string, execSn: number, apiUrl: string): Promise { const deadline = Date.now() + POLL_TIMEOUT_MS while (Date.now() < deadline) { await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS)) let data: PythonStatusResponse try { const pollRes = await fetch(`${apiUrl}/status/${jobId}`, { signal: AbortSignal.timeout(5000), }) if (!pollRes.ok) continue data = await pollRes.json() as PythonStatusResponse } catch { // 네트워크 오류 — 재시도 continue } if (data.status === 'DONE' && data.result) { await wingPool.query( `UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='COMPLETED', RSLT_DATA=$1, CMPL_DTM=NOW(), REQD_SEC=EXTRACT(EPOCH FROM (NOW() - BGNG_DTM))::INTEGER WHERE PRED_EXEC_SN=$2`, [JSON.stringify(data.result), execSn] ) return data.result } if (data.status === 'ERROR') { const errMsg = data.error ?? '분석 오류' await wingPool.query( `UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`, [errMsg, execSn] ) throw new Error(errMsg) } } await wingPool.query( `UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG='분석 시간 초과 (30분)', CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$1`, [execSn] ) throw new Error('분석 시간 초과 (30분)') } // ============================================================ // 타입 및 결과 변환 // ============================================================ interface PythonParticle { lat: number lon: number stranded?: 0 | 1 } interface WindPoint { lat: number lon: number wind_speed: number wind_direction: number } interface HydrGrid { lonInterval: number[] boundLonLat: { top: number; bottom: number; left: number; right: number } rows: number cols: number latInterval: number[] } interface PythonTimeStep { particles: PythonParticle[] remaining_volume_m3: number weathered_volume_m3: number evaporation_m3?: number dispersion_m3?: number pollution_area_km2: number beached_volume_m3: number pollution_coast_length_m: number center_lat?: number center_lon?: number wind_data?: WindPoint[] hydr_data?: [number[][], number[][]] hydr_grid?: HydrGrid } interface PythonStatusResponse { status: 'RUNNING' | 'DONE' | 'ERROR' result?: PythonTimeStep[] error?: string } // ALGO_CD → 프론트엔드 모델명 매핑 const ALGO_CD_TO_MODEL_NAME: Record = { 'OPENDRIFT': 'OpenDrift', 'POSEIDON': 'POSEIDON', } function transformResult(rawResult: PythonTimeStep[], model: string) { const trajectory = rawResult.flatMap((step, stepIdx) => step.particles.map((p, i) => ({ lat: p.lat, lon: p.lon, time: stepIdx, particle: i, stranded: p.stranded, })) ) const lastStep = rawResult[rawResult.length - 1] const summary = { remainingVolume: lastStep.remaining_volume_m3, weatheredVolume: lastStep.weathered_volume_m3, evaporationVolume: lastStep.evaporation_m3 ?? lastStep.weathered_volume_m3 * 0.65, dispersionVolume: lastStep.dispersion_m3 ?? lastStep.weathered_volume_m3 * 0.35, pollutionArea: lastStep.pollution_area_km2, beachedVolume: lastStep.beached_volume_m3, pollutionCoastLength: lastStep.pollution_coast_length_m, } const centerPoints = rawResult .map((step, stepIdx) => step.center_lat != null && step.center_lon != null ? { lat: step.center_lat, lon: step.center_lon, time: stepIdx, model } : null ) .filter((p): p is { lat: number; lon: number; time: number; model: string } => p !== null) const windData = rawResult.map((step) => step.wind_data ?? []) const hydrData = rawResult.map((step) => step.hydr_data && step.hydr_grid ? { value: step.hydr_data, grid: step.hydr_grid } : null ) const stepSummaries = rawResult.map((step) => ({ remainingVolume: step.remaining_volume_m3, weatheredVolume: step.weathered_volume_m3, evaporationVolume: step.evaporation_m3 ?? step.weathered_volume_m3 * 0.65, dispersionVolume: step.dispersion_m3 ?? step.weathered_volume_m3 * 0.35, pollutionArea: step.pollution_area_km2, beachedVolume: step.beached_volume_m3, pollutionCoastLength: step.pollution_coast_length_m, })) return { trajectory, summary, stepSummaries, centerPoints, windData, hydrData } } export default router