wing-ops/backend/src/routes/simulation.ts
2026-03-25 15:35:43 +09:00

947 lines
37 KiB
TypeScript
Executable File
Raw Blame 히스토리

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { Router, Request, Response } from 'express'
import { wingPool } from '../db/wingDb.js'
import { requireAuth } from '../auth/authMiddleware.js'
import {
isValidLatitude,
isValidLongitude,
isValidNumber,
isValidStringLength,
} from '../middleware/security.js'
const router = Router()
const PYTHON_API_URL = process.env.PYTHON_API_URL ?? 'http://localhost:5003'
const POSEIDON_API_URL = process.env.POSEIDON_API_URL ?? 'http://localhost:5004'
const POLL_INTERVAL_MS = 3000
const POLL_TIMEOUT_MS = 30 * 60 * 1000 // 30분
// 유종 매핑: 한국어 UI 선택값 → OpenDrift 유종 코드
// 추후 DB/설정 파일로 외부화 예정 (개발 단계 임시 구현)
const OIL_TYPE_MAP: Record<string, string> = {
'벙커C유': 'GENERIC BUNKER C',
'경유': 'GENERIC DIESEL',
'원유': 'WEST TEXAS INTERMEDIATE (WTI)',
'중유': 'GENERIC HEAVY FUEL OIL',
'등유': 'FUEL OIL NO.1 (KEROSENE)',
'휘발유': 'GENERIC GASOLINE',
}
// 유종 매핑: 한국어 UI → DB 저장 코드
const OIL_DB_CODE_MAP: Record<string, string> = {
'벙커C유': 'BUNKER_C',
'경유': 'DIESEL',
'원유': 'CRUDE_OIL',
'중유': 'HEAVY_FUEL_OIL',
'등유': 'KEROSENE',
'휘발유': 'GASOLINE',
}
// 유출 형태 매핑: 한국어 UI → DB 저장 코드
const SPIL_TYPE_MAP: Record<string, string> = {
'연속': 'CONTINUOUS',
'비연속': 'DISCONTINUOUS',
'순간 유출': 'INSTANT',
}
// 단위 매핑: 한국어 UI → DB 저장 코드
const UNIT_MAP: Record<string, string> = {
'kL': 'KL', 'ton': 'TON', 'barrel': 'BBL',
}
// ============================================================
// 신규 생성된 ACDNT/SPIL_DATA/PRED_EXEC 롤백 헬퍼
// Python 호출 실패 시 이번 요청에서 생성된 레코드만 삭제한다.
// ============================================================
async function rollbackNewRecords(
predExecSn: number | null,
newSpilDataSn: number | null,
newAcdntSn: number | null
): Promise<void> {
try {
if (predExecSn !== null) {
await wingPool.query('DELETE FROM wing.PRED_EXEC WHERE PRED_EXEC_SN=$1', [predExecSn])
}
if (newSpilDataSn !== null) {
await wingPool.query('DELETE FROM wing.SPIL_DATA WHERE SPIL_DATA_SN=$1', [newSpilDataSn])
}
if (newAcdntSn !== null) {
await wingPool.query('DELETE FROM wing.ACDNT WHERE ACDNT_SN=$1', [newAcdntSn])
}
} catch (cleanupErr) {
console.error('[simulation] 롤백 실패:', cleanupErr)
}
}
// 모델명 → ALGO_CD 매핑
const MODEL_ALGO_CD_MAP: Record<string, string> = {
'OpenDrift': 'OPENDRIFT',
'POSEIDON': 'POSEIDON',
}
// 모델명 → API URL 매핑
const MODEL_API_URL_MAP: Record<string, string> = {
'OpenDrift': PYTHON_API_URL,
'POSEIDON': POSEIDON_API_URL,
}
// ============================================================
// POST /api/simulation/run
// 확산 시뮬레이션 실행 (다중 모델 지원: OpenDrift, POSEIDON)
// ============================================================
/**
* 선택된 모델(OpenDrift, POSEIDON)로 확산 시뮬레이션을 실행한다.
* 각 모델에 대해 PRED_EXEC 레코드를 별도 생성하고 Python API에 병렬 제출한다.
* KOSPS 모델은 PRED_EXEC INSERT(PENDING)만 수행하고 외부 API 연동은 하지 않는다.
* 프론트엔드는 execSns 배열의 각 execSn으로 GET /status/:execSn을 폴링하여 결과를 수신한다.
*/
router.post('/run', requireAuth, async (req: Request, res: Response) => {
try {
const { acdntSn: rawAcdntSn, acdntNm, spillUnit, spillTypeCd,
lat, lon, runTime, matTy, matVol, spillTime, startTime,
models: rawModels } = req.body
// 실행할 모델 목록 (기본값: OpenDrift)
const requestedModels: string[] = Array.isArray(rawModels) && rawModels.length > 0
? (rawModels as string[])
: ['OpenDrift']
// 1. 필수 파라미터 검증
if (lat === undefined || lon === undefined || runTime === undefined) {
return res.status(400).json({
error: '필수 파라미터 누락',
required: ['lat', 'lon', 'runTime'],
})
}
if (!isValidLatitude(lat)) {
return res.status(400).json({ error: '유효하지 않은 위도', message: '위도는 -90~90 범위여야 합니다.' })
}
if (!isValidLongitude(lon)) {
return res.status(400).json({ error: '유효하지 않은 경도', message: '경도는 -180~180 범위여야 합니다.' })
}
if (!isValidNumber(runTime, 1, 720)) {
return res.status(400).json({ error: '유효하지 않은 예측 시간', message: '예측 시간은 1~720 범위여야 합니다.' })
}
if (matVol !== undefined && !isValidNumber(matVol, 0, 1000000)) {
return res.status(400).json({ error: '유효하지 않은 유출량' })
}
if (matTy !== undefined && (typeof matTy !== 'string' || !isValidStringLength(matTy, 50))) {
return res.status(400).json({ error: '유효하지 않은 유종' })
}
// acdntSn 없는 경우 acdntNm 필수
if (!rawAcdntSn && (!acdntNm || typeof acdntNm !== 'string' || !acdntNm.trim())) {
return res.status(400).json({ error: '사고를 선택하거나 사고명을 입력해야 합니다.' })
}
if (acdntNm && (typeof acdntNm !== 'string' || !isValidStringLength(acdntNm, 200))) {
return res.status(400).json({ error: '사고명은 200자 이내여야 합니다.' })
}
// 2. Python NC 파일 존재 여부 확인 (ACDNT 생성 전에 수행하여 고아 레코드 방지)
// OpenDrift 모델이 포함된 경우에만 check-nc 수행
if (requestedModels.includes('OpenDrift')) {
try {
const checkRes = await fetch(`${PYTHON_API_URL}/check-nc`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ lat, lon, startTime }),
signal: AbortSignal.timeout(5000),
})
if (!checkRes.ok) {
return res.status(409).json({
error: '해당 좌표의 해양 기상 데이터가 없습니다.',
message: 'NC 파일이 준비되지 않았습니다.',
})
}
} catch {
// Python 서버 미기동 — 5번에서 처리
}
}
// 1-B. acdntSn 미제공 시 ACDNT + SPIL_DATA 생성
let resolvedAcdntSn: number | null = rawAcdntSn ? Number(rawAcdntSn) : null
let resolvedSpilDataSn: number | null = null
// 이번 요청에서 신규 생성된 레코드 추적 (Python 실패 시 롤백 대상)
let newlyCreatedAcdntSn: number | null = null
let newlyCreatedSpilDataSn: number | null = null
if (!resolvedAcdntSn && acdntNm) {
try {
const occrn = startTime ?? new Date().toISOString()
const acdntRes = await wingPool.query(
`INSERT INTO wing.ACDNT
(ACDNT_CD, ACDNT_NM, ACDNT_TP_CD, OCCRN_DTM, LAT, LNG, ACDNT_STTS_CD, USE_YN, REG_DTM)
VALUES (
'INC-' || EXTRACT(YEAR FROM NOW())::TEXT || '-' ||
LPAD(
(SELECT COALESCE(MAX(CAST(SPLIT_PART(ACDNT_CD, '-', 3) AS INTEGER)), 0) + 1
FROM wing.ACDNT
WHERE ACDNT_CD LIKE 'INC-' || EXTRACT(YEAR FROM NOW())::TEXT || '-%')::TEXT,
4, '0'
),
$1, '유류유출', $2, $3, $4, 'ACTIVE', 'Y', NOW()
)
RETURNING ACDNT_SN`,
[acdntNm.trim(), occrn, lat, lon]
)
resolvedAcdntSn = acdntRes.rows[0].acdnt_sn as number
newlyCreatedAcdntSn = resolvedAcdntSn
const spilRes = await wingPool.query(
`INSERT INTO wing.SPIL_DATA (ACDNT_SN, OIL_TP_CD, SPIL_QTY, SPIL_UNIT_CD, SPIL_TP_CD, FCST_HR, REG_DTM)
VALUES ($1, $2, $3, $4, $5, $6, NOW())
RETURNING SPIL_DATA_SN`,
[
resolvedAcdntSn,
OIL_DB_CODE_MAP[matTy as string] ?? 'BUNKER_C',
matVol ?? 0,
UNIT_MAP[spillUnit as string] ?? 'KL',
SPIL_TYPE_MAP[spillTypeCd as string] ?? 'CONTINUOUS',
runTime,
]
)
resolvedSpilDataSn = spilRes.rows[0].spil_data_sn as number
newlyCreatedSpilDataSn = resolvedSpilDataSn
} catch (dbErr) {
console.error('[simulation] ACDNT/SPIL_DATA INSERT 실패:', dbErr)
return res.status(500).json({ error: '사고 정보 생성 실패' })
}
}
// 3. 기존 사고의 경우 SPIL_DATA_SN 조회
if (resolvedAcdntSn && !resolvedSpilDataSn) {
try {
const spilRes = await wingPool.query(
`INSERT INTO wing.SPIL_DATA (ACDNT_SN, OIL_TP_CD, SPIL_QTY, SPIL_UNIT_CD, SPIL_TP_CD, FCST_HR, REG_DTM)
VALUES ($1, $2, $3, $4, $5, $6, NOW())
RETURNING SPIL_DATA_SN`,
[
resolvedAcdntSn,
OIL_DB_CODE_MAP[matTy as string] ?? 'BUNKER_C',
matVol ?? 0,
UNIT_MAP[spillUnit as string] ?? 'KL',
SPIL_TYPE_MAP[spillTypeCd as string] ?? 'CONTINUOUS',
runTime,
]
)
resolvedSpilDataSn = spilRes.rows[0].spil_data_sn as number
} catch (dbErr) {
console.error('[simulation] SPIL_DATA INSERT 실패:', dbErr)
}
}
// matTy 변환: 한국어 유종 → OpenDrift 유종 코드
// 매핑 대상이 아니면 원본 값 그대로 사용 (영문 직접 입력 대응)
const odMatTy = matTy !== undefined ? (OIL_TYPE_MAP[matTy as string] ?? (matTy as string)) : undefined
// 4. 각 모델별 PRED_EXEC INSERT 및 API 호출 (병렬)
// KOSPS: PRED_EXEC PENDING 생성만 하고 배열에서 제외 (외부 API 미연동)
const execNmBase = `EXPC_${Date.now()}`
const execSns: Array<{ model: string; execSn: number }> = []
// KOSPS 처리: PRED_EXEC INSERT(PENDING)만 수행
if (requestedModels.includes('KOSPS')) {
try {
const kospsExecNm = `${execNmBase}_KOSPS`
const insertRes = await wingPool.query(
`INSERT INTO wing.PRED_EXEC (ACDNT_SN, SPIL_DATA_SN, ALGO_CD, EXEC_STTS_CD, EXEC_NM, BGNG_DTM)
VALUES ($1, $2, 'KOSPS', 'PENDING', $3, NOW())
RETURNING PRED_EXEC_SN`,
[resolvedAcdntSn, resolvedSpilDataSn, kospsExecNm]
)
execSns.push({ model: 'KOSPS', execSn: insertRes.rows[0].pred_exec_sn as number })
} catch (dbErr) {
console.error('[simulation] KOSPS PRED_EXEC INSERT 실패:', dbErr)
}
}
// API 연동 모델 필터링 (KOSPS 제외)
const apiModels = requestedModels.filter((m) => m !== 'KOSPS' && MODEL_ALGO_CD_MAP[m] !== undefined)
// 각 모델에 대해 PRED_EXEC INSERT → /run-model 호출
await Promise.all(
apiModels.map(async (model) => {
const algoCd = MODEL_ALGO_CD_MAP[model]
const apiUrl = MODEL_API_URL_MAP[model]
const execNm = `${execNmBase}_${algoCd}`
// PRED_EXEC INSERT (PENDING)
let predExecSn: number
try {
const insertRes = await wingPool.query(
`INSERT INTO wing.PRED_EXEC (ACDNT_SN, SPIL_DATA_SN, ALGO_CD, EXEC_STTS_CD, EXEC_NM, BGNG_DTM)
VALUES ($1, $2, $3, 'PENDING', $4, NOW())
RETURNING PRED_EXEC_SN`,
[resolvedAcdntSn, resolvedSpilDataSn, algoCd, execNm]
)
predExecSn = insertRes.rows[0].pred_exec_sn as number
} catch (dbErr) {
console.error(`[simulation] ${model} PRED_EXEC INSERT 실패:`, dbErr)
return
}
execSns.push({ model, execSn: predExecSn })
// Python /run-model 호출
let jobId: string
try {
const pythonRes = await fetch(`${apiUrl}/run-model`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
lat,
lon,
startTime,
runTime,
matTy: odMatTy,
matVol,
spillTime,
name: execNm,
}),
signal: AbortSignal.timeout(10000),
})
if (pythonRes.status === 503) {
const errData = await pythonRes.json() as { error?: string }
await wingPool.query(
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`,
[errData.error || '분석 서버 포화', predExecSn]
)
await rollbackNewRecords(predExecSn, newlyCreatedSpilDataSn, newlyCreatedAcdntSn)
return
}
if (!pythonRes.ok) {
throw new Error(`Python 서버 응답 오류: ${pythonRes.status}`)
}
const pythonData = await pythonRes.json() as { job_id: string }
jobId = pythonData.job_id
} catch {
await wingPool.query(
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG='Python 분석 서버에 연결할 수 없습니다.', CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$1`,
[predExecSn]
)
// 이 모델의 PRED_EXEC만 롤백 (다른 모델은 계속 진행)
await rollbackNewRecords(predExecSn, null, null)
return
}
// RUNNING 업데이트
await wingPool.query(
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='RUNNING' WHERE PRED_EXEC_SN=$1`,
[predExecSn]
)
// 백그라운드 폴링 시작
pollAndSaveModel(jobId, predExecSn, apiUrl, algoCd).catch((err: unknown) =>
console.error(`[simulation] ${model} pollAndSaveModel 오류:`, err)
)
})
)
// ACDNT/SPIL_DATA가 신규 생성됐으나 모든 모델이 실패한 경우 롤백
const hasRunning = execSns.some(({ model }) => model !== 'KOSPS')
if (!hasRunning && newlyCreatedAcdntSn !== null) {
await rollbackNewRecords(null, newlyCreatedSpilDataSn, newlyCreatedAcdntSn)
return res.status(503).json({ error: '분석 서버에 연결할 수 없습니다.' })
}
// 즉시 응답 (하위 호환을 위해 execSn도 포함)
res.json({
success: true,
execSns,
execSn: execSns[0]?.execSn ?? 0,
acdntSn: resolvedAcdntSn,
status: 'RUNNING',
})
} catch {
res.status(500).json({ error: '시뮬레이션 실행 실패', message: '서버 내부 오류가 발생했습니다.' })
}
})
// ============================================================
// GET /api/simulation/status/:execSn
// 시뮬레이션 실행 상태 및 결과 조회
// ============================================================
/**
* PRED_EXEC 테이블에서 실행 상태를 조회한다.
* DB 상태(COMPLETED/FAILED)를 프론트 상태(DONE/ERROR)로 매핑하여 반환한다.
*/
router.get('/status/:execSn', requireAuth, async (req: Request, res: Response) => {
const execSn = parseInt(req.params.execSn as string, 10)
if (isNaN(execSn) || execSn <= 0) {
return res.status(400).json({ error: '유효하지 않은 execSn' })
}
try {
const result = await wingPool.query(
`SELECT pe.EXEC_STTS_CD, pe.RSLT_DATA, pe.ERR_MSG, pe.BGNG_DTM, pe.ALGO_CD, sd.FCST_HR,
(
SELECT AVG(hist.REQD_SEC::FLOAT / hsd.FCST_HR)
FROM wing.PRED_EXEC hist
JOIN wing.SPIL_DATA hsd ON hist.SPIL_DATA_SN = hsd.SPIL_DATA_SN
WHERE hist.ALGO_CD = pe.ALGO_CD
AND hist.EXEC_STTS_CD = 'COMPLETED'
AND hist.REQD_SEC IS NOT NULL AND hist.REQD_SEC > 0
AND hsd.FCST_HR IS NOT NULL AND hsd.FCST_HR > 0
) AS avg_sec_per_hr
FROM wing.PRED_EXEC pe
LEFT JOIN wing.SPIL_DATA sd ON pe.SPIL_DATA_SN = sd.SPIL_DATA_SN
WHERE pe.PRED_EXEC_SN=$1`,
[execSn]
)
if (result.rows.length === 0) {
return res.status(404).json({ error: '분석 기록을 찾을 수 없습니다.' })
}
const row = result.rows[0]
const dbStatus: string = row.exec_stts_cd as string
// DB 상태 → API 상태 매핑
const statusMap: Record<string, string> = {
PENDING: 'PENDING',
RUNNING: 'RUNNING',
COMPLETED: 'DONE',
FAILED: 'ERROR',
}
const status = statusMap[dbStatus] ?? dbStatus
if (status === 'DONE' && row.rslt_data) {
const algoCd = String(row.algo_cd ?? '')
const modelName = ALGO_CD_TO_MODEL_NAME[algoCd] ?? algoCd
const { trajectory, summary, centerPoints, windData, hydrData } = transformResult(row.rslt_data as PythonTimeStep[], modelName)
return res.json({ status, trajectory, summary, centerPoints, windData, hydrData })
}
if (status === 'ERROR') {
return res.json({ status, error: (row.err_msg as string) || '분석 중 오류가 발생했습니다.' })
}
// PENDING/RUNNING: 경과 시간 기반 진행률 계산
// 과거 실행의 초/예측시간 비율(avg_sec_per_hr) × 현재 fcst_hr로 추정, 이력 없으면 5초/hr 폴백
let progress: number | undefined;
if (status === 'RUNNING' && row.bgng_dtm) {
const fcstHr = Number(row.fcst_hr) || 24;
const avgSecPerHr = row.avg_sec_per_hr ? Number(row.avg_sec_per_hr) : 5;
const estimatedSec = avgSecPerHr * fcstHr;
const elapsedSec = (Date.now() - new Date(row.bgng_dtm as string).getTime()) / 1000;
progress = Math.min(95, Math.floor((elapsedSec / estimatedSec) * 100));
}
res.json({ status, ...(progress !== undefined && { progress }) })
} catch {
res.status(500).json({ error: '상태 조회 실패' })
}
})
// ============================================================
// POST /api/simulation/run-model (동기 방식)
// 예측 완료 후 결과를 직접 반환한다.
// ============================================================
/**
* 선택된 모델로 확산 시뮬레이션을 실행하고 완료될 때까지 대기한 후 결과를 반환한다.
* 다중 모델은 병렬로 실행되며, 일부 모델 실패 시 성공한 모델 결과는 포함된다.
*/
router.post('/run-model', requireAuth, async (req: Request, res: Response) => {
try {
const { acdntSn: rawAcdntSn, acdntNm, spillUnit, spillTypeCd,
lat, lon, runTime, matTy, matVol, spillTime, startTime,
models: rawModels } = req.body
let requestedModels: string[] = Array.isArray(rawModels) && rawModels.length > 0
? (rawModels as string[])
: ['OpenDrift']
// 1. 필수 파라미터 검증
if (lat === undefined || lon === undefined || runTime === undefined) {
return res.status(400).json({ error: '필수 파라미터 누락', required: ['lat', 'lon', 'runTime'] })
}
if (!isValidLatitude(lat)) {
return res.status(400).json({ error: '유효하지 않은 위도', message: '위도는 -90~90 범위여야 합니다.' })
}
if (!isValidLongitude(lon)) {
return res.status(400).json({ error: '유효하지 않은 경도', message: '경도는 -180~180 범위여야 합니다.' })
}
if (!isValidNumber(runTime, 1, 720)) {
return res.status(400).json({ error: '유효하지 않은 예측 시간', message: '예측 시간은 1~720 범위여야 합니다.' })
}
if (matVol !== undefined && !isValidNumber(matVol, 0, 1000000)) {
return res.status(400).json({ error: '유효하지 않은 유출량' })
}
if (matTy !== undefined && (typeof matTy !== 'string' || !isValidStringLength(matTy, 50))) {
return res.status(400).json({ error: '유효하지 않은 유종' })
}
if (!rawAcdntSn && (!acdntNm || typeof acdntNm !== 'string' || !acdntNm.trim())) {
return res.status(400).json({ error: '사고를 선택하거나 사고명을 입력해야 합니다.' })
}
if (acdntNm && (typeof acdntNm !== 'string' || !isValidStringLength(acdntNm, 200))) {
return res.status(400).json({ error: '사고명은 200자 이내여야 합니다.' })
}
// 2. NC 파일 존재 여부 확인
if (requestedModels.includes('OpenDrift')) {
try {
const checkRes = await fetch(`${PYTHON_API_URL}/check-nc`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ lat, lon, startTime }),
signal: AbortSignal.timeout(5000),
})
if (!checkRes.ok) {
// NC 파일 없으면 OpenDrift만 제외, 나머지 모델(POSEIDON 등)은 계속 진행
requestedModels = requestedModels.filter(m => m !== 'OpenDrift')
if (requestedModels.length === 0) {
return res.status(409).json({
error: '해당 좌표의 해양 기상 데이터가 없습니다.',
message: 'NC 파일이 준비되지 않았습니다.',
})
}
}
} catch {
// Python 서버 미기동 — 이후 단계에서 처리
}
}
// 3. ACDNT/SPIL_DATA 생성 또는 조회
let resolvedAcdntSn: number | null = rawAcdntSn ? Number(rawAcdntSn) : null
let resolvedSpilDataSn: number | null = null
let newlyCreatedAcdntSn: number | null = null
let newlyCreatedSpilDataSn: number | null = null
if (!resolvedAcdntSn && acdntNm) {
try {
const occrn = startTime ?? new Date().toISOString()
const acdntRes = await wingPool.query(
`INSERT INTO wing.ACDNT
(ACDNT_CD, ACDNT_NM, ACDNT_TP_CD, OCCRN_DTM, LAT, LNG, ACDNT_STTS_CD, USE_YN, REG_DTM)
VALUES (
'INC-' || EXTRACT(YEAR FROM NOW())::TEXT || '-' ||
LPAD(
(SELECT COALESCE(MAX(CAST(SPLIT_PART(ACDNT_CD, '-', 3) AS INTEGER)), 0) + 1
FROM wing.ACDNT
WHERE ACDNT_CD LIKE 'INC-' || EXTRACT(YEAR FROM NOW())::TEXT || '-%')::TEXT,
4, '0'
),
$1, '유류유출', $2, $3, $4, 'ACTIVE', 'Y', NOW()
)
RETURNING ACDNT_SN`,
[acdntNm.trim(), occrn, lat, lon]
)
resolvedAcdntSn = acdntRes.rows[0].acdnt_sn as number
newlyCreatedAcdntSn = resolvedAcdntSn
const spilRes = await wingPool.query(
`INSERT INTO wing.SPIL_DATA (ACDNT_SN, OIL_TP_CD, SPIL_QTY, SPIL_UNIT_CD, SPIL_TP_CD, FCST_HR, REG_DTM)
VALUES ($1, $2, $3, $4, $5, $6, NOW())
RETURNING SPIL_DATA_SN`,
[
resolvedAcdntSn,
OIL_DB_CODE_MAP[matTy as string] ?? 'BUNKER_C',
matVol ?? 0,
UNIT_MAP[spillUnit as string] ?? 'KL',
SPIL_TYPE_MAP[spillTypeCd as string] ?? 'CONTINUOUS',
runTime,
]
)
resolvedSpilDataSn = spilRes.rows[0].spil_data_sn as number
newlyCreatedSpilDataSn = resolvedSpilDataSn
} catch (dbErr) {
console.error('[simulation/run-model] ACDNT/SPIL_DATA INSERT 실패:', dbErr)
return res.status(500).json({ error: '사고 정보 생성 실패' })
}
}
if (resolvedAcdntSn && !resolvedSpilDataSn) {
try {
const spilRes = await wingPool.query(
`INSERT INTO wing.SPIL_DATA (ACDNT_SN, OIL_TP_CD, SPIL_QTY, SPIL_UNIT_CD, SPIL_TP_CD, FCST_HR, REG_DTM)
VALUES ($1, $2, $3, $4, $5, $6, NOW())
RETURNING SPIL_DATA_SN`,
[
resolvedAcdntSn,
OIL_DB_CODE_MAP[matTy as string] ?? 'BUNKER_C',
matVol ?? 0,
UNIT_MAP[spillUnit as string] ?? 'KL',
SPIL_TYPE_MAP[spillTypeCd as string] ?? 'CONTINUOUS',
runTime,
]
)
resolvedSpilDataSn = spilRes.rows[0].spil_data_sn as number
} catch (dbErr) {
console.error('[simulation/run-model] SPIL_DATA INSERT 실패:', dbErr)
}
}
const odMatTy = matTy !== undefined ? (OIL_TYPE_MAP[matTy as string] ?? (matTy as string)) : undefined
const execNmBase = `EXPC_${Date.now()}`
// 이번 예측 실행을 식별하는 그룹 SN 생성
let predRunSn: number
try {
const runSnRes = await wingPool.query("SELECT nextval('wing.PRED_RUN_SN_SEQ') AS pred_run_sn")
predRunSn = runSnRes.rows[0].pred_run_sn as number
} catch (dbErr) {
console.error('[simulation/run-model] PRED_RUN_SN_SEQ 조회 실패:', dbErr)
return res.status(500).json({ error: '실행 SN 생성 실패' })
}
// KOSPS: PRED_EXEC INSERT(PENDING)만 수행
const execSns: Array<{ model: string; execSn: number }> = []
if (requestedModels.includes('KOSPS')) {
try {
const kospsExecNm = `${execNmBase}_KOSPS`
const insertRes = await wingPool.query(
`INSERT INTO wing.PRED_EXEC (ACDNT_SN, SPIL_DATA_SN, ALGO_CD, EXEC_STTS_CD, EXEC_NM, PRED_RUN_SN, BGNG_DTM)
VALUES ($1, $2, 'KOSPS', 'PENDING', $3, $4, NOW())
RETURNING PRED_EXEC_SN`,
[resolvedAcdntSn, resolvedSpilDataSn, kospsExecNm, predRunSn]
)
execSns.push({ model: 'KOSPS', execSn: insertRes.rows[0].pred_exec_sn as number })
} catch (dbErr) {
console.error('[simulation/run-model] KOSPS PRED_EXEC INSERT 실패:', dbErr)
}
}
// 4. API 연동 모델 시작 및 완료 대기 (병렬)
const apiModels = requestedModels.filter((m) => m !== 'KOSPS' && MODEL_ALGO_CD_MAP[m] !== undefined)
interface SyncModelResult {
model: string
execSn: number
status: 'DONE' | 'ERROR'
trajectory?: ReturnType<typeof transformResult>['trajectory']
summary?: ReturnType<typeof transformResult>['summary']
stepSummaries?: ReturnType<typeof transformResult>['stepSummaries']
centerPoints?: ReturnType<typeof transformResult>['centerPoints']
windData?: ReturnType<typeof transformResult>['windData']
hydrData?: ReturnType<typeof transformResult>['hydrData']
error?: string
}
const modelResults = await Promise.all(
apiModels.map(async (model): Promise<SyncModelResult> => {
const algoCd = MODEL_ALGO_CD_MAP[model]
const apiUrl = MODEL_API_URL_MAP[model]
const execNm = `${execNmBase}_${algoCd}`
// PRED_EXEC INSERT
let predExecSn: number
try {
const insertRes = await wingPool.query(
`INSERT INTO wing.PRED_EXEC (ACDNT_SN, SPIL_DATA_SN, ALGO_CD, EXEC_STTS_CD, EXEC_NM, PRED_RUN_SN, BGNG_DTM)
VALUES ($1, $2, $3, 'PENDING', $4, $5, NOW())
RETURNING PRED_EXEC_SN`,
[resolvedAcdntSn, resolvedSpilDataSn, algoCd, execNm, predRunSn]
)
predExecSn = insertRes.rows[0].pred_exec_sn as number
} catch (dbErr) {
console.error(`[simulation/run-model] ${model} PRED_EXEC INSERT 실패:`, dbErr)
return { model, execSn: 0, status: 'ERROR', error: 'DB 오류' }
}
execSns.push({ model, execSn: predExecSn })
// Python /run-model 호출
let jobId: string | undefined
try {
const pythonRes = await fetch(`${apiUrl}/run-model`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ lat, lon, startTime, runTime, matTy: odMatTy, matVol, spillTime, name: execNm }),
signal: AbortSignal.timeout(POLL_TIMEOUT_MS),
})
if (pythonRes.status === 503) {
const errData = await pythonRes.json() as { error?: string }
const errMsg = errData.error || '분석 서버 포화'
await wingPool.query(
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`,
[errMsg, predExecSn]
)
return { model, execSn: predExecSn, status: 'ERROR', error: errMsg }
}
if (!pythonRes.ok) {
throw new Error(`Python 서버 응답 오류: ${pythonRes.status}`)
}
const pythonData = await pythonRes.json() as {
success?: boolean;
result?: PythonTimeStep[];
job_id?: string;
error?: string;
message?: string;
error_code?: number;
}
// 동기 성공 응답 (OpenDrift & POSEIDON 공통)
if (Array.isArray(pythonData.result)) {
await wingPool.query(
`UPDATE wing.PRED_EXEC
SET EXEC_STTS_CD='COMPLETED', RSLT_DATA=$1,
CMPL_DTM=NOW(), REQD_SEC=EXTRACT(EPOCH FROM (NOW() - BGNG_DTM))::INTEGER
WHERE PRED_EXEC_SN=$2`,
[JSON.stringify(pythonData.result), predExecSn]
)
const { trajectory, summary, stepSummaries, centerPoints, windData, hydrData } =
transformResult(pythonData.result, model)
return { model, execSn: predExecSn, status: 'DONE', trajectory, summary, stepSummaries, centerPoints, windData, hydrData }
}
// 비동기 응답 (하위 호환)
if (pythonData.job_id) {
jobId = pythonData.job_id
} else {
// 오류 응답 (success: false, HTTP 200)
const errMsg = pythonData.error || pythonData.message || '분석 오류'
await wingPool.query(
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`,
[errMsg, predExecSn]
)
return { model, execSn: predExecSn, status: 'ERROR', error: errMsg }
}
} catch (fetchErr) {
const errMsg = 'Python 분석 서버에 연결할 수 없습니다.'
await wingPool.query(
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`,
[errMsg, predExecSn]
)
return { model, execSn: predExecSn, status: 'ERROR', error: errMsg }
}
// RUNNING 업데이트 (비동기 폴링 경로)
await wingPool.query(
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='RUNNING' WHERE PRED_EXEC_SN=$1`,
[predExecSn]
)
// 결과 동기 대기
try {
const rawResult = await runModelSync(jobId!, predExecSn, apiUrl)
const { trajectory, summary, stepSummaries, centerPoints, windData, hydrData } = transformResult(rawResult, model)
return { model, execSn: predExecSn, status: 'DONE', trajectory, summary, stepSummaries, centerPoints, windData, hydrData }
} catch (syncErr) {
return { model, execSn: predExecSn, status: 'ERROR', error: (syncErr as Error).message }
}
})
)
// 모든 모델이 실패하고 신규 생성한 ACDNT가 있으면 롤백
const hasSuccess = modelResults.some((r) => r.status === 'DONE')
if (!hasSuccess && newlyCreatedAcdntSn !== null) {
for (const r of modelResults) {
if (r.execSn) await rollbackNewRecords(r.execSn, null, null)
}
await rollbackNewRecords(null, newlyCreatedSpilDataSn, newlyCreatedAcdntSn)
return res.status(503).json({ error: '분석 서버에 연결할 수 없습니다.' })
}
res.json({
success: true,
acdntSn: resolvedAcdntSn,
predRunSn,
execSns: [...execSns, ...modelResults.map(({ model, execSn }) => ({ model, execSn }))],
results: modelResults,
})
} catch {
res.status(500).json({ error: '시뮬레이션 실행 실패', message: '서버 내부 오류가 발생했습니다.' })
}
})
// ============================================================
// 백그라운드 폴링
// ============================================================
async function pollAndSaveModel(jobId: string, execSn: number, apiUrl: string, algoCode: string): Promise<void> {
const deadline = Date.now() + POLL_TIMEOUT_MS
while (Date.now() < deadline) {
await new Promise<void>(resolve => setTimeout(resolve, POLL_INTERVAL_MS))
try {
const pollRes = await fetch(`${apiUrl}/status/${jobId}`, {
signal: AbortSignal.timeout(5000),
})
if (!pollRes.ok) continue
const data = await pollRes.json() as PythonStatusResponse
if (data.status === 'DONE' && data.result) {
await wingPool.query(
`UPDATE wing.PRED_EXEC
SET EXEC_STTS_CD='COMPLETED',
RSLT_DATA=$1,
CMPL_DTM=NOW(),
REQD_SEC=EXTRACT(EPOCH FROM (NOW() - BGNG_DTM))::INTEGER
WHERE PRED_EXEC_SN=$2`,
[JSON.stringify(data.result), execSn]
)
return
}
if (data.status === 'ERROR') {
await wingPool.query(
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`,
[data.error ?? '분석 오류', execSn]
)
return
}
} catch {
// 개별 폴링 오류는 무시하고 재시도
}
}
// 타임아웃 처리
await wingPool.query(
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG='분석 시간 초과 (30분)', CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$1`,
[execSn]
)
}
// ============================================================
// 동기 폴링: Python 결과 대기 후 반환
// ============================================================
async function runModelSync(jobId: string, execSn: number, apiUrl: string): Promise<PythonTimeStep[]> {
const deadline = Date.now() + POLL_TIMEOUT_MS
while (Date.now() < deadline) {
await new Promise<void>(resolve => setTimeout(resolve, POLL_INTERVAL_MS))
let data: PythonStatusResponse
try {
const pollRes = await fetch(`${apiUrl}/status/${jobId}`, {
signal: AbortSignal.timeout(5000),
})
if (!pollRes.ok) continue
data = await pollRes.json() as PythonStatusResponse
} catch {
// 네트워크 오류 — 재시도
continue
}
if (data.status === 'DONE' && data.result) {
await wingPool.query(
`UPDATE wing.PRED_EXEC
SET EXEC_STTS_CD='COMPLETED',
RSLT_DATA=$1,
CMPL_DTM=NOW(),
REQD_SEC=EXTRACT(EPOCH FROM (NOW() - BGNG_DTM))::INTEGER
WHERE PRED_EXEC_SN=$2`,
[JSON.stringify(data.result), execSn]
)
return data.result
}
if (data.status === 'ERROR') {
const errMsg = data.error ?? '분석 오류'
await wingPool.query(
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`,
[errMsg, execSn]
)
throw new Error(errMsg)
}
}
await wingPool.query(
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG='분석 시간 초과 (30분)', CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$1`,
[execSn]
)
throw new Error('분석 시간 초과 (30분)')
}
// ============================================================
// 타입 및 결과 변환
// ============================================================
interface PythonParticle {
lat: number
lon: number
stranded?: 0 | 1
}
interface WindPoint {
lat: number
lon: number
wind_speed: number
wind_direction: number
}
interface HydrGrid {
lonInterval: number[]
boundLonLat: { top: number; bottom: number; left: number; right: number }
rows: number
cols: number
latInterval: number[]
}
interface PythonTimeStep {
particles: PythonParticle[]
remaining_volume_m3: number
weathered_volume_m3: number
evaporation_m3?: number
dispersion_m3?: number
pollution_area_km2: number
beached_volume_m3: number
pollution_coast_length_m: number
center_lat?: number
center_lon?: number
wind_data?: WindPoint[]
hydr_data?: [number[][], number[][]]
hydr_grid?: HydrGrid
}
interface PythonStatusResponse {
status: 'RUNNING' | 'DONE' | 'ERROR'
result?: PythonTimeStep[]
error?: string
}
// ALGO_CD → 프론트엔드 모델명 매핑
const ALGO_CD_TO_MODEL_NAME: Record<string, string> = {
'OPENDRIFT': 'OpenDrift',
'POSEIDON': 'POSEIDON',
}
function transformResult(rawResult: PythonTimeStep[], model: string) {
const trajectory = rawResult.flatMap((step, stepIdx) =>
step.particles.map((p, i) => ({
lat: p.lat,
lon: p.lon,
time: stepIdx,
particle: i,
stranded: p.stranded,
}))
)
const lastStep = rawResult[rawResult.length - 1]
const summary = {
remainingVolume: lastStep.remaining_volume_m3,
weatheredVolume: lastStep.weathered_volume_m3,
evaporationVolume: lastStep.evaporation_m3 ?? lastStep.weathered_volume_m3 * 0.65,
dispersionVolume: lastStep.dispersion_m3 ?? lastStep.weathered_volume_m3 * 0.35,
pollutionArea: lastStep.pollution_area_km2,
beachedVolume: lastStep.beached_volume_m3,
pollutionCoastLength: lastStep.pollution_coast_length_m,
}
const centerPoints = rawResult
.map((step, stepIdx) =>
step.center_lat != null && step.center_lon != null
? { lat: step.center_lat, lon: step.center_lon, time: stepIdx, model }
: null
)
.filter((p): p is { lat: number; lon: number; time: number; model: string } => p !== null)
const windData = rawResult.map((step) => step.wind_data ?? [])
const hydrData = rawResult.map((step) =>
step.hydr_data && step.hydr_grid
? { value: step.hydr_data, grid: step.hydr_grid }
: null
)
const stepSummaries = rawResult.map((step) => ({
remainingVolume: step.remaining_volume_m3,
weatheredVolume: step.weathered_volume_m3,
evaporationVolume: step.evaporation_m3 ?? step.weathered_volume_m3 * 0.65,
dispersionVolume: step.dispersion_m3 ?? step.weathered_volume_m3 * 0.35,
pollutionArea: step.pollution_area_km2,
beachedVolume: step.beached_volume_m3,
pollutionCoastLength: step.pollution_coast_length_m,
}))
return { trajectory, summary, stepSummaries, centerPoints, windData, hydrData }
}
export default router