922 lines
36 KiB
TypeScript
Executable File
922 lines
36 KiB
TypeScript
Executable File
import { Router, Request, Response } from 'express'
|
||
import { wingPool } from '../db/wingDb.js'
|
||
import { requireAuth } from '../auth/authMiddleware.js'
|
||
import {
|
||
isValidLatitude,
|
||
isValidLongitude,
|
||
isValidNumber,
|
||
isValidStringLength,
|
||
} from '../middleware/security.js'
|
||
|
||
const router = Router()
|
||
|
||
const PYTHON_API_URL = process.env.PYTHON_API_URL ?? 'http://localhost:5003'
|
||
const POSEIDON_API_URL = process.env.POSEIDON_API_URL ?? 'http://localhost:5004'
|
||
const POLL_INTERVAL_MS = 3000
|
||
const POLL_TIMEOUT_MS = 30 * 60 * 1000 // 30분
|
||
|
||
// 유종 매핑: 한국어 UI 선택값 → OpenDrift 유종 코드
|
||
// 추후 DB/설정 파일로 외부화 예정 (개발 단계 임시 구현)
|
||
const OIL_TYPE_MAP: Record<string, string> = {
|
||
'벙커C유': 'GENERIC BUNKER C',
|
||
'경유': 'GENERIC DIESEL',
|
||
'원유': 'WEST TEXAS INTERMEDIATE (WTI)',
|
||
'중유': 'GENERIC HEAVY FUEL OIL',
|
||
'등유': 'FUEL OIL NO.1 (KEROSENE)',
|
||
'휘발유': 'GENERIC GASOLINE',
|
||
}
|
||
|
||
// 유종 매핑: 한국어 UI → DB 저장 코드
|
||
const OIL_DB_CODE_MAP: Record<string, string> = {
|
||
'벙커C유': 'BUNKER_C',
|
||
'경유': 'DIESEL',
|
||
'원유': 'CRUDE_OIL',
|
||
'중유': 'HEAVY_FUEL_OIL',
|
||
'등유': 'KEROSENE',
|
||
'휘발유': 'GASOLINE',
|
||
}
|
||
|
||
// 유출 형태 매핑: 한국어 UI → DB 저장 코드
|
||
const SPIL_TYPE_MAP: Record<string, string> = {
|
||
'연속': 'CONTINUOUS',
|
||
'비연속': 'DISCONTINUOUS',
|
||
'순간 유출': 'INSTANT',
|
||
}
|
||
|
||
// 단위 매핑: 한국어 UI → DB 저장 코드
|
||
const UNIT_MAP: Record<string, string> = {
|
||
'kL': 'KL', 'ton': 'TON', 'barrel': 'BBL',
|
||
}
|
||
|
||
// ============================================================
|
||
// 신규 생성된 ACDNT/SPIL_DATA/PRED_EXEC 롤백 헬퍼
|
||
// Python 호출 실패 시 이번 요청에서 생성된 레코드만 삭제한다.
|
||
// ============================================================
|
||
async function rollbackNewRecords(
|
||
predExecSn: number | null,
|
||
newSpilDataSn: number | null,
|
||
newAcdntSn: number | null
|
||
): Promise<void> {
|
||
try {
|
||
if (predExecSn !== null) {
|
||
await wingPool.query('DELETE FROM wing.PRED_EXEC WHERE PRED_EXEC_SN=$1', [predExecSn])
|
||
}
|
||
if (newSpilDataSn !== null) {
|
||
await wingPool.query('DELETE FROM wing.SPIL_DATA WHERE SPIL_DATA_SN=$1', [newSpilDataSn])
|
||
}
|
||
if (newAcdntSn !== null) {
|
||
await wingPool.query('DELETE FROM wing.ACDNT WHERE ACDNT_SN=$1', [newAcdntSn])
|
||
}
|
||
} catch (cleanupErr) {
|
||
console.error('[simulation] 롤백 실패:', cleanupErr)
|
||
}
|
||
}
|
||
|
||
// 모델명 → ALGO_CD 매핑
|
||
const MODEL_ALGO_CD_MAP: Record<string, string> = {
|
||
'OpenDrift': 'OPENDRIFT',
|
||
'POSEIDON': 'POSEIDON',
|
||
}
|
||
|
||
// 모델명 → API URL 매핑
|
||
const MODEL_API_URL_MAP: Record<string, string> = {
|
||
'OpenDrift': PYTHON_API_URL,
|
||
'POSEIDON': POSEIDON_API_URL,
|
||
}
|
||
|
||
// ============================================================
|
||
// POST /api/simulation/run
|
||
// 확산 시뮬레이션 실행 (다중 모델 지원: OpenDrift, POSEIDON)
|
||
// ============================================================
|
||
/**
|
||
* 선택된 모델(OpenDrift, POSEIDON)로 확산 시뮬레이션을 실행한다.
|
||
* 각 모델에 대해 PRED_EXEC 레코드를 별도 생성하고 Python API에 병렬 제출한다.
|
||
* KOSPS 모델은 PRED_EXEC INSERT(PENDING)만 수행하고 외부 API 연동은 하지 않는다.
|
||
* 프론트엔드는 execSns 배열의 각 execSn으로 GET /status/:execSn을 폴링하여 결과를 수신한다.
|
||
*/
|
||
router.post('/run', requireAuth, async (req: Request, res: Response) => {
|
||
try {
|
||
const { acdntSn: rawAcdntSn, acdntNm, spillUnit, spillTypeCd,
|
||
lat, lon, runTime, matTy, matVol, spillTime, startTime,
|
||
models: rawModels } = req.body
|
||
|
||
// 실행할 모델 목록 (기본값: OpenDrift)
|
||
const requestedModels: string[] = Array.isArray(rawModels) && rawModels.length > 0
|
||
? (rawModels as string[])
|
||
: ['OpenDrift']
|
||
|
||
// 1. 필수 파라미터 검증
|
||
if (lat === undefined || lon === undefined || runTime === undefined) {
|
||
return res.status(400).json({
|
||
error: '필수 파라미터 누락',
|
||
required: ['lat', 'lon', 'runTime'],
|
||
})
|
||
}
|
||
if (!isValidLatitude(lat)) {
|
||
return res.status(400).json({ error: '유효하지 않은 위도', message: '위도는 -90~90 범위여야 합니다.' })
|
||
}
|
||
if (!isValidLongitude(lon)) {
|
||
return res.status(400).json({ error: '유효하지 않은 경도', message: '경도는 -180~180 범위여야 합니다.' })
|
||
}
|
||
if (!isValidNumber(runTime, 1, 720)) {
|
||
return res.status(400).json({ error: '유효하지 않은 예측 시간', message: '예측 시간은 1~720 범위여야 합니다.' })
|
||
}
|
||
if (matVol !== undefined && !isValidNumber(matVol, 0, 1000000)) {
|
||
return res.status(400).json({ error: '유효하지 않은 유출량' })
|
||
}
|
||
if (matTy !== undefined && (typeof matTy !== 'string' || !isValidStringLength(matTy, 50))) {
|
||
return res.status(400).json({ error: '유효하지 않은 유종' })
|
||
}
|
||
// acdntSn 없는 경우 acdntNm 필수
|
||
if (!rawAcdntSn && (!acdntNm || typeof acdntNm !== 'string' || !acdntNm.trim())) {
|
||
return res.status(400).json({ error: '사고를 선택하거나 사고명을 입력해야 합니다.' })
|
||
}
|
||
if (acdntNm && (typeof acdntNm !== 'string' || !isValidStringLength(acdntNm, 200))) {
|
||
return res.status(400).json({ error: '사고명은 200자 이내여야 합니다.' })
|
||
}
|
||
|
||
// 2. Python NC 파일 존재 여부 확인 (ACDNT 생성 전에 수행하여 고아 레코드 방지)
|
||
// OpenDrift 모델이 포함된 경우에만 check-nc 수행
|
||
if (requestedModels.includes('OpenDrift')) {
|
||
try {
|
||
const checkRes = await fetch(`${PYTHON_API_URL}/check-nc`, {
|
||
method: 'POST',
|
||
headers: { 'Content-Type': 'application/json' },
|
||
body: JSON.stringify({ lat, lon, startTime }),
|
||
signal: AbortSignal.timeout(5000),
|
||
})
|
||
if (!checkRes.ok) {
|
||
return res.status(409).json({
|
||
error: '해당 좌표의 해양 기상 데이터가 없습니다.',
|
||
message: 'NC 파일이 준비되지 않았습니다.',
|
||
})
|
||
}
|
||
} catch {
|
||
// Python 서버 미기동 — 5번에서 처리
|
||
}
|
||
}
|
||
|
||
// 1-B. acdntSn 미제공 시 ACDNT + SPIL_DATA 생성
|
||
let resolvedAcdntSn: number | null = rawAcdntSn ? Number(rawAcdntSn) : null
|
||
let resolvedSpilDataSn: number | null = null
|
||
// 이번 요청에서 신규 생성된 레코드 추적 (Python 실패 시 롤백 대상)
|
||
let newlyCreatedAcdntSn: number | null = null
|
||
let newlyCreatedSpilDataSn: number | null = null
|
||
|
||
if (!resolvedAcdntSn && acdntNm) {
|
||
try {
|
||
const occrn = startTime ?? new Date().toISOString()
|
||
const acdntRes = await wingPool.query(
|
||
`INSERT INTO wing.ACDNT
|
||
(ACDNT_CD, ACDNT_NM, ACDNT_TP_CD, OCCRN_DTM, LAT, LNG, ACDNT_STTS_CD, USE_YN, REG_DTM)
|
||
VALUES (
|
||
'INC-' || EXTRACT(YEAR FROM NOW())::TEXT || '-' ||
|
||
LPAD(
|
||
(SELECT COALESCE(MAX(CAST(SPLIT_PART(ACDNT_CD, '-', 3) AS INTEGER)), 0) + 1
|
||
FROM wing.ACDNT
|
||
WHERE ACDNT_CD LIKE 'INC-' || EXTRACT(YEAR FROM NOW())::TEXT || '-%')::TEXT,
|
||
4, '0'
|
||
),
|
||
$1, '유류유출', $2, $3, $4, 'ACTIVE', 'Y', NOW()
|
||
)
|
||
RETURNING ACDNT_SN`,
|
||
[acdntNm.trim(), occrn, lat, lon]
|
||
)
|
||
resolvedAcdntSn = acdntRes.rows[0].acdnt_sn as number
|
||
newlyCreatedAcdntSn = resolvedAcdntSn
|
||
|
||
const spilRes = await wingPool.query(
|
||
`INSERT INTO wing.SPIL_DATA (ACDNT_SN, OIL_TP_CD, SPIL_QTY, SPIL_UNIT_CD, SPIL_TP_CD, FCST_HR, REG_DTM)
|
||
VALUES ($1, $2, $3, $4, $5, $6, NOW())
|
||
RETURNING SPIL_DATA_SN`,
|
||
[
|
||
resolvedAcdntSn,
|
||
OIL_DB_CODE_MAP[matTy as string] ?? 'BUNKER_C',
|
||
matVol ?? 0,
|
||
UNIT_MAP[spillUnit as string] ?? 'KL',
|
||
SPIL_TYPE_MAP[spillTypeCd as string] ?? 'CONTINUOUS',
|
||
runTime,
|
||
]
|
||
)
|
||
resolvedSpilDataSn = spilRes.rows[0].spil_data_sn as number
|
||
newlyCreatedSpilDataSn = resolvedSpilDataSn
|
||
} catch (dbErr) {
|
||
console.error('[simulation] ACDNT/SPIL_DATA INSERT 실패:', dbErr)
|
||
return res.status(500).json({ error: '사고 정보 생성 실패' })
|
||
}
|
||
}
|
||
|
||
// 3. 기존 사고의 경우 SPIL_DATA_SN 조회
|
||
if (resolvedAcdntSn && !resolvedSpilDataSn) {
|
||
try {
|
||
const spilRes = await wingPool.query(
|
||
`SELECT SPIL_DATA_SN FROM wing.SPIL_DATA WHERE ACDNT_SN = $1 ORDER BY SPIL_DATA_SN DESC LIMIT 1`,
|
||
[resolvedAcdntSn]
|
||
)
|
||
if (spilRes.rows.length > 0) {
|
||
resolvedSpilDataSn = spilRes.rows[0].spil_data_sn as number
|
||
}
|
||
} catch (dbErr) {
|
||
console.error('[simulation] SPIL_DATA 조회 실패:', dbErr)
|
||
}
|
||
}
|
||
|
||
// matTy 변환: 한국어 유종 → OpenDrift 유종 코드
|
||
// 매핑 대상이 아니면 원본 값 그대로 사용 (영문 직접 입력 대응)
|
||
const odMatTy = matTy !== undefined ? (OIL_TYPE_MAP[matTy as string] ?? (matTy as string)) : undefined
|
||
|
||
// 4. 각 모델별 PRED_EXEC INSERT 및 API 호출 (병렬)
|
||
// KOSPS: PRED_EXEC PENDING 생성만 하고 배열에서 제외 (외부 API 미연동)
|
||
const execNmBase = `EXPC_${Date.now()}`
|
||
const execSns: Array<{ model: string; execSn: number }> = []
|
||
|
||
// KOSPS 처리: PRED_EXEC INSERT(PENDING)만 수행
|
||
if (requestedModels.includes('KOSPS')) {
|
||
try {
|
||
const kospsExecNm = `${execNmBase}_KOSPS`
|
||
const insertRes = await wingPool.query(
|
||
`INSERT INTO wing.PRED_EXEC (ACDNT_SN, SPIL_DATA_SN, ALGO_CD, EXEC_STTS_CD, EXEC_NM, BGNG_DTM)
|
||
VALUES ($1, $2, 'KOSPS', 'PENDING', $3, NOW())
|
||
RETURNING PRED_EXEC_SN`,
|
||
[resolvedAcdntSn, resolvedSpilDataSn, kospsExecNm]
|
||
)
|
||
execSns.push({ model: 'KOSPS', execSn: insertRes.rows[0].pred_exec_sn as number })
|
||
} catch (dbErr) {
|
||
console.error('[simulation] KOSPS PRED_EXEC INSERT 실패:', dbErr)
|
||
}
|
||
}
|
||
|
||
// API 연동 모델 필터링 (KOSPS 제외)
|
||
const apiModels = requestedModels.filter((m) => m !== 'KOSPS' && MODEL_ALGO_CD_MAP[m] !== undefined)
|
||
|
||
// 각 모델에 대해 PRED_EXEC INSERT → /run-model 호출
|
||
await Promise.all(
|
||
apiModels.map(async (model) => {
|
||
const algoCd = MODEL_ALGO_CD_MAP[model]
|
||
const apiUrl = MODEL_API_URL_MAP[model]
|
||
const execNm = `${execNmBase}_${algoCd}`
|
||
|
||
// PRED_EXEC INSERT (PENDING)
|
||
let predExecSn: number
|
||
try {
|
||
const insertRes = await wingPool.query(
|
||
`INSERT INTO wing.PRED_EXEC (ACDNT_SN, SPIL_DATA_SN, ALGO_CD, EXEC_STTS_CD, EXEC_NM, BGNG_DTM)
|
||
VALUES ($1, $2, $3, 'PENDING', $4, NOW())
|
||
RETURNING PRED_EXEC_SN`,
|
||
[resolvedAcdntSn, resolvedSpilDataSn, algoCd, execNm]
|
||
)
|
||
predExecSn = insertRes.rows[0].pred_exec_sn as number
|
||
} catch (dbErr) {
|
||
console.error(`[simulation] ${model} PRED_EXEC INSERT 실패:`, dbErr)
|
||
return
|
||
}
|
||
|
||
execSns.push({ model, execSn: predExecSn })
|
||
|
||
// Python /run-model 호출
|
||
let jobId: string
|
||
try {
|
||
const pythonRes = await fetch(`${apiUrl}/run-model`, {
|
||
method: 'POST',
|
||
headers: { 'Content-Type': 'application/json' },
|
||
body: JSON.stringify({
|
||
lat,
|
||
lon,
|
||
startTime,
|
||
runTime,
|
||
matTy: odMatTy,
|
||
matVol,
|
||
spillTime,
|
||
name: execNm,
|
||
}),
|
||
signal: AbortSignal.timeout(10000),
|
||
})
|
||
|
||
if (pythonRes.status === 503) {
|
||
const errData = await pythonRes.json() as { error?: string }
|
||
await wingPool.query(
|
||
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`,
|
||
[errData.error || '분석 서버 포화', predExecSn]
|
||
)
|
||
await rollbackNewRecords(predExecSn, newlyCreatedSpilDataSn, newlyCreatedAcdntSn)
|
||
return
|
||
}
|
||
|
||
if (!pythonRes.ok) {
|
||
throw new Error(`Python 서버 응답 오류: ${pythonRes.status}`)
|
||
}
|
||
|
||
const pythonData = await pythonRes.json() as { job_id: string }
|
||
jobId = pythonData.job_id
|
||
} catch {
|
||
await wingPool.query(
|
||
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG='Python 분석 서버에 연결할 수 없습니다.', CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$1`,
|
||
[predExecSn]
|
||
)
|
||
// 이 모델의 PRED_EXEC만 롤백 (다른 모델은 계속 진행)
|
||
await rollbackNewRecords(predExecSn, null, null)
|
||
return
|
||
}
|
||
|
||
// RUNNING 업데이트
|
||
await wingPool.query(
|
||
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='RUNNING' WHERE PRED_EXEC_SN=$1`,
|
||
[predExecSn]
|
||
)
|
||
|
||
// 백그라운드 폴링 시작
|
||
pollAndSaveModel(jobId, predExecSn, apiUrl, algoCd).catch((err: unknown) =>
|
||
console.error(`[simulation] ${model} pollAndSaveModel 오류:`, err)
|
||
)
|
||
})
|
||
)
|
||
|
||
// ACDNT/SPIL_DATA가 신규 생성됐으나 모든 모델이 실패한 경우 롤백
|
||
const hasRunning = execSns.some(({ model }) => model !== 'KOSPS')
|
||
if (!hasRunning && newlyCreatedAcdntSn !== null) {
|
||
await rollbackNewRecords(null, newlyCreatedSpilDataSn, newlyCreatedAcdntSn)
|
||
return res.status(503).json({ error: '분석 서버에 연결할 수 없습니다.' })
|
||
}
|
||
|
||
// 즉시 응답 (하위 호환을 위해 execSn도 포함)
|
||
res.json({
|
||
success: true,
|
||
execSns,
|
||
execSn: execSns[0]?.execSn ?? 0,
|
||
acdntSn: resolvedAcdntSn,
|
||
status: 'RUNNING',
|
||
})
|
||
} catch {
|
||
res.status(500).json({ error: '시뮬레이션 실행 실패', message: '서버 내부 오류가 발생했습니다.' })
|
||
}
|
||
})
|
||
|
||
// ============================================================
|
||
// GET /api/simulation/status/:execSn
|
||
// 시뮬레이션 실행 상태 및 결과 조회
|
||
// ============================================================
|
||
/**
|
||
* PRED_EXEC 테이블에서 실행 상태를 조회한다.
|
||
* DB 상태(COMPLETED/FAILED)를 프론트 상태(DONE/ERROR)로 매핑하여 반환한다.
|
||
*/
|
||
router.get('/status/:execSn', requireAuth, async (req: Request, res: Response) => {
|
||
const execSn = parseInt(req.params.execSn as string, 10)
|
||
if (isNaN(execSn) || execSn <= 0) {
|
||
return res.status(400).json({ error: '유효하지 않은 execSn' })
|
||
}
|
||
|
||
try {
|
||
const result = await wingPool.query(
|
||
`SELECT pe.EXEC_STTS_CD, pe.RSLT_DATA, pe.ERR_MSG, pe.BGNG_DTM, pe.ALGO_CD, sd.FCST_HR,
|
||
(
|
||
SELECT AVG(hist.REQD_SEC::FLOAT / hsd.FCST_HR)
|
||
FROM wing.PRED_EXEC hist
|
||
JOIN wing.SPIL_DATA hsd ON hist.SPIL_DATA_SN = hsd.SPIL_DATA_SN
|
||
WHERE hist.ALGO_CD = pe.ALGO_CD
|
||
AND hist.EXEC_STTS_CD = 'COMPLETED'
|
||
AND hist.REQD_SEC IS NOT NULL AND hist.REQD_SEC > 0
|
||
AND hsd.FCST_HR IS NOT NULL AND hsd.FCST_HR > 0
|
||
) AS avg_sec_per_hr
|
||
FROM wing.PRED_EXEC pe
|
||
LEFT JOIN wing.SPIL_DATA sd ON pe.SPIL_DATA_SN = sd.SPIL_DATA_SN
|
||
WHERE pe.PRED_EXEC_SN=$1`,
|
||
[execSn]
|
||
)
|
||
if (result.rows.length === 0) {
|
||
return res.status(404).json({ error: '분석 기록을 찾을 수 없습니다.' })
|
||
}
|
||
|
||
const row = result.rows[0]
|
||
const dbStatus: string = row.exec_stts_cd as string
|
||
// DB 상태 → API 상태 매핑
|
||
const statusMap: Record<string, string> = {
|
||
PENDING: 'PENDING',
|
||
RUNNING: 'RUNNING',
|
||
COMPLETED: 'DONE',
|
||
FAILED: 'ERROR',
|
||
}
|
||
const status = statusMap[dbStatus] ?? dbStatus
|
||
|
||
if (status === 'DONE' && row.rslt_data) {
|
||
const algoCd = String(row.algo_cd ?? '')
|
||
const modelName = ALGO_CD_TO_MODEL_NAME[algoCd] ?? algoCd
|
||
const { trajectory, summary, centerPoints, windData, hydrData } = transformResult(row.rslt_data as PythonTimeStep[], modelName)
|
||
return res.json({ status, trajectory, summary, centerPoints, windData, hydrData })
|
||
}
|
||
|
||
if (status === 'ERROR') {
|
||
return res.json({ status, error: (row.err_msg as string) || '분석 중 오류가 발생했습니다.' })
|
||
}
|
||
|
||
// PENDING/RUNNING: 경과 시간 기반 진행률 계산
|
||
// 과거 실행의 초/예측시간 비율(avg_sec_per_hr) × 현재 fcst_hr로 추정, 이력 없으면 5초/hr 폴백
|
||
let progress: number | undefined;
|
||
if (status === 'RUNNING' && row.bgng_dtm) {
|
||
const fcstHr = Number(row.fcst_hr) || 24;
|
||
const avgSecPerHr = row.avg_sec_per_hr ? Number(row.avg_sec_per_hr) : 5;
|
||
const estimatedSec = avgSecPerHr * fcstHr;
|
||
const elapsedSec = (Date.now() - new Date(row.bgng_dtm as string).getTime()) / 1000;
|
||
progress = Math.min(95, Math.floor((elapsedSec / estimatedSec) * 100));
|
||
}
|
||
|
||
res.json({ status, ...(progress !== undefined && { progress }) })
|
||
} catch {
|
||
res.status(500).json({ error: '상태 조회 실패' })
|
||
}
|
||
})
|
||
|
||
// ============================================================
|
||
// POST /api/simulation/run-model (동기 방식)
|
||
// 예측 완료 후 결과를 직접 반환한다.
|
||
// ============================================================
|
||
/**
|
||
* 선택된 모델로 확산 시뮬레이션을 실행하고 완료될 때까지 대기한 후 결과를 반환한다.
|
||
* 다중 모델은 병렬로 실행되며, 일부 모델 실패 시 성공한 모델 결과는 포함된다.
|
||
*/
|
||
router.post('/run-model', requireAuth, async (req: Request, res: Response) => {
|
||
try {
|
||
const { acdntSn: rawAcdntSn, acdntNm, spillUnit, spillTypeCd,
|
||
lat, lon, runTime, matTy, matVol, spillTime, startTime,
|
||
models: rawModels } = req.body
|
||
|
||
let requestedModels: string[] = Array.isArray(rawModels) && rawModels.length > 0
|
||
? (rawModels as string[])
|
||
: ['OpenDrift']
|
||
|
||
// 1. 필수 파라미터 검증
|
||
if (lat === undefined || lon === undefined || runTime === undefined) {
|
||
return res.status(400).json({ error: '필수 파라미터 누락', required: ['lat', 'lon', 'runTime'] })
|
||
}
|
||
if (!isValidLatitude(lat)) {
|
||
return res.status(400).json({ error: '유효하지 않은 위도', message: '위도는 -90~90 범위여야 합니다.' })
|
||
}
|
||
if (!isValidLongitude(lon)) {
|
||
return res.status(400).json({ error: '유효하지 않은 경도', message: '경도는 -180~180 범위여야 합니다.' })
|
||
}
|
||
if (!isValidNumber(runTime, 1, 720)) {
|
||
return res.status(400).json({ error: '유효하지 않은 예측 시간', message: '예측 시간은 1~720 범위여야 합니다.' })
|
||
}
|
||
if (matVol !== undefined && !isValidNumber(matVol, 0, 1000000)) {
|
||
return res.status(400).json({ error: '유효하지 않은 유출량' })
|
||
}
|
||
if (matTy !== undefined && (typeof matTy !== 'string' || !isValidStringLength(matTy, 50))) {
|
||
return res.status(400).json({ error: '유효하지 않은 유종' })
|
||
}
|
||
if (!rawAcdntSn && (!acdntNm || typeof acdntNm !== 'string' || !acdntNm.trim())) {
|
||
return res.status(400).json({ error: '사고를 선택하거나 사고명을 입력해야 합니다.' })
|
||
}
|
||
if (acdntNm && (typeof acdntNm !== 'string' || !isValidStringLength(acdntNm, 200))) {
|
||
return res.status(400).json({ error: '사고명은 200자 이내여야 합니다.' })
|
||
}
|
||
|
||
// 2. NC 파일 존재 여부 확인
|
||
if (requestedModels.includes('OpenDrift')) {
|
||
try {
|
||
const checkRes = await fetch(`${PYTHON_API_URL}/check-nc`, {
|
||
method: 'POST',
|
||
headers: { 'Content-Type': 'application/json' },
|
||
body: JSON.stringify({ lat, lon, startTime }),
|
||
signal: AbortSignal.timeout(5000),
|
||
})
|
||
if (!checkRes.ok) {
|
||
// NC 파일 없으면 OpenDrift만 제외, 나머지 모델(POSEIDON 등)은 계속 진행
|
||
requestedModels = requestedModels.filter(m => m !== 'OpenDrift')
|
||
if (requestedModels.length === 0) {
|
||
return res.status(409).json({
|
||
error: '해당 좌표의 해양 기상 데이터가 없습니다.',
|
||
message: 'NC 파일이 준비되지 않았습니다.',
|
||
})
|
||
}
|
||
}
|
||
} catch {
|
||
// Python 서버 미기동 — 이후 단계에서 처리
|
||
}
|
||
}
|
||
|
||
// 3. ACDNT/SPIL_DATA 생성 또는 조회
|
||
let resolvedAcdntSn: number | null = rawAcdntSn ? Number(rawAcdntSn) : null
|
||
let resolvedSpilDataSn: number | null = null
|
||
let newlyCreatedAcdntSn: number | null = null
|
||
let newlyCreatedSpilDataSn: number | null = null
|
||
|
||
if (!resolvedAcdntSn && acdntNm) {
|
||
try {
|
||
const occrn = startTime ?? new Date().toISOString()
|
||
const acdntRes = await wingPool.query(
|
||
`INSERT INTO wing.ACDNT
|
||
(ACDNT_CD, ACDNT_NM, ACDNT_TP_CD, OCCRN_DTM, LAT, LNG, ACDNT_STTS_CD, USE_YN, REG_DTM)
|
||
VALUES (
|
||
'INC-' || EXTRACT(YEAR FROM NOW())::TEXT || '-' ||
|
||
LPAD(
|
||
(SELECT COALESCE(MAX(CAST(SPLIT_PART(ACDNT_CD, '-', 3) AS INTEGER)), 0) + 1
|
||
FROM wing.ACDNT
|
||
WHERE ACDNT_CD LIKE 'INC-' || EXTRACT(YEAR FROM NOW())::TEXT || '-%')::TEXT,
|
||
4, '0'
|
||
),
|
||
$1, '유류유출', $2, $3, $4, 'ACTIVE', 'Y', NOW()
|
||
)
|
||
RETURNING ACDNT_SN`,
|
||
[acdntNm.trim(), occrn, lat, lon]
|
||
)
|
||
resolvedAcdntSn = acdntRes.rows[0].acdnt_sn as number
|
||
newlyCreatedAcdntSn = resolvedAcdntSn
|
||
|
||
const spilRes = await wingPool.query(
|
||
`INSERT INTO wing.SPIL_DATA (ACDNT_SN, OIL_TP_CD, SPIL_QTY, SPIL_UNIT_CD, SPIL_TP_CD, FCST_HR, REG_DTM)
|
||
VALUES ($1, $2, $3, $4, $5, $6, NOW())
|
||
RETURNING SPIL_DATA_SN`,
|
||
[
|
||
resolvedAcdntSn,
|
||
OIL_DB_CODE_MAP[matTy as string] ?? 'BUNKER_C',
|
||
matVol ?? 0,
|
||
UNIT_MAP[spillUnit as string] ?? 'KL',
|
||
SPIL_TYPE_MAP[spillTypeCd as string] ?? 'CONTINUOUS',
|
||
runTime,
|
||
]
|
||
)
|
||
resolvedSpilDataSn = spilRes.rows[0].spil_data_sn as number
|
||
newlyCreatedSpilDataSn = resolvedSpilDataSn
|
||
} catch (dbErr) {
|
||
console.error('[simulation/run-model] ACDNT/SPIL_DATA INSERT 실패:', dbErr)
|
||
return res.status(500).json({ error: '사고 정보 생성 실패' })
|
||
}
|
||
}
|
||
|
||
if (resolvedAcdntSn && !resolvedSpilDataSn) {
|
||
try {
|
||
const spilRes = await wingPool.query(
|
||
`SELECT SPIL_DATA_SN FROM wing.SPIL_DATA WHERE ACDNT_SN = $1 ORDER BY SPIL_DATA_SN DESC LIMIT 1`,
|
||
[resolvedAcdntSn]
|
||
)
|
||
if (spilRes.rows.length > 0) {
|
||
resolvedSpilDataSn = spilRes.rows[0].spil_data_sn as number
|
||
}
|
||
} catch (dbErr) {
|
||
console.error('[simulation/run-model] SPIL_DATA 조회 실패:', dbErr)
|
||
}
|
||
}
|
||
|
||
const odMatTy = matTy !== undefined ? (OIL_TYPE_MAP[matTy as string] ?? (matTy as string)) : undefined
|
||
const execNmBase = `EXPC_${Date.now()}`
|
||
|
||
// KOSPS: PRED_EXEC INSERT(PENDING)만 수행
|
||
const execSns: Array<{ model: string; execSn: number }> = []
|
||
if (requestedModels.includes('KOSPS')) {
|
||
try {
|
||
const kospsExecNm = `${execNmBase}_KOSPS`
|
||
const insertRes = await wingPool.query(
|
||
`INSERT INTO wing.PRED_EXEC (ACDNT_SN, SPIL_DATA_SN, ALGO_CD, EXEC_STTS_CD, EXEC_NM, BGNG_DTM)
|
||
VALUES ($1, $2, 'KOSPS', 'PENDING', $3, NOW())
|
||
RETURNING PRED_EXEC_SN`,
|
||
[resolvedAcdntSn, resolvedSpilDataSn, kospsExecNm]
|
||
)
|
||
execSns.push({ model: 'KOSPS', execSn: insertRes.rows[0].pred_exec_sn as number })
|
||
} catch (dbErr) {
|
||
console.error('[simulation/run-model] KOSPS PRED_EXEC INSERT 실패:', dbErr)
|
||
}
|
||
}
|
||
|
||
// 4. API 연동 모델 시작 및 완료 대기 (병렬)
|
||
const apiModels = requestedModels.filter((m) => m !== 'KOSPS' && MODEL_ALGO_CD_MAP[m] !== undefined)
|
||
|
||
interface SyncModelResult {
|
||
model: string
|
||
execSn: number
|
||
status: 'DONE' | 'ERROR'
|
||
trajectory?: ReturnType<typeof transformResult>['trajectory']
|
||
summary?: ReturnType<typeof transformResult>['summary']
|
||
stepSummaries?: ReturnType<typeof transformResult>['stepSummaries']
|
||
centerPoints?: ReturnType<typeof transformResult>['centerPoints']
|
||
windData?: ReturnType<typeof transformResult>['windData']
|
||
hydrData?: ReturnType<typeof transformResult>['hydrData']
|
||
error?: string
|
||
}
|
||
|
||
const modelResults = await Promise.all(
|
||
apiModels.map(async (model): Promise<SyncModelResult> => {
|
||
const algoCd = MODEL_ALGO_CD_MAP[model]
|
||
const apiUrl = MODEL_API_URL_MAP[model]
|
||
const execNm = `${execNmBase}_${algoCd}`
|
||
|
||
// PRED_EXEC INSERT
|
||
let predExecSn: number
|
||
try {
|
||
const insertRes = await wingPool.query(
|
||
`INSERT INTO wing.PRED_EXEC (ACDNT_SN, SPIL_DATA_SN, ALGO_CD, EXEC_STTS_CD, EXEC_NM, BGNG_DTM)
|
||
VALUES ($1, $2, $3, 'PENDING', $4, NOW())
|
||
RETURNING PRED_EXEC_SN`,
|
||
[resolvedAcdntSn, resolvedSpilDataSn, algoCd, execNm]
|
||
)
|
||
predExecSn = insertRes.rows[0].pred_exec_sn as number
|
||
} catch (dbErr) {
|
||
console.error(`[simulation/run-model] ${model} PRED_EXEC INSERT 실패:`, dbErr)
|
||
return { model, execSn: 0, status: 'ERROR', error: 'DB 오류' }
|
||
}
|
||
|
||
execSns.push({ model, execSn: predExecSn })
|
||
|
||
// Python /run-model 호출
|
||
let jobId: string | undefined
|
||
try {
|
||
const pythonRes = await fetch(`${apiUrl}/run-model`, {
|
||
method: 'POST',
|
||
headers: { 'Content-Type': 'application/json' },
|
||
body: JSON.stringify({ lat, lon, startTime, runTime, matTy: odMatTy, matVol, spillTime, name: execNm }),
|
||
signal: AbortSignal.timeout(POLL_TIMEOUT_MS),
|
||
})
|
||
|
||
if (pythonRes.status === 503) {
|
||
const errData = await pythonRes.json() as { error?: string }
|
||
const errMsg = errData.error || '분석 서버 포화'
|
||
await wingPool.query(
|
||
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`,
|
||
[errMsg, predExecSn]
|
||
)
|
||
return { model, execSn: predExecSn, status: 'ERROR', error: errMsg }
|
||
}
|
||
|
||
if (!pythonRes.ok) {
|
||
throw new Error(`Python 서버 응답 오류: ${pythonRes.status}`)
|
||
}
|
||
|
||
const pythonData = await pythonRes.json() as {
|
||
success?: boolean;
|
||
result?: PythonTimeStep[];
|
||
job_id?: string;
|
||
error?: string;
|
||
message?: string;
|
||
error_code?: number;
|
||
}
|
||
|
||
// 동기 성공 응답 (OpenDrift & POSEIDON 공통)
|
||
if (Array.isArray(pythonData.result)) {
|
||
await wingPool.query(
|
||
`UPDATE wing.PRED_EXEC
|
||
SET EXEC_STTS_CD='COMPLETED', RSLT_DATA=$1,
|
||
CMPL_DTM=NOW(), REQD_SEC=EXTRACT(EPOCH FROM (NOW() - BGNG_DTM))::INTEGER
|
||
WHERE PRED_EXEC_SN=$2`,
|
||
[JSON.stringify(pythonData.result), predExecSn]
|
||
)
|
||
const { trajectory, summary, stepSummaries, centerPoints, windData, hydrData } =
|
||
transformResult(pythonData.result, model)
|
||
return { model, execSn: predExecSn, status: 'DONE', trajectory, summary, stepSummaries, centerPoints, windData, hydrData }
|
||
}
|
||
|
||
// 비동기 응답 (하위 호환)
|
||
if (pythonData.job_id) {
|
||
jobId = pythonData.job_id
|
||
} else {
|
||
// 오류 응답 (success: false, HTTP 200)
|
||
const errMsg = pythonData.error || pythonData.message || '분석 오류'
|
||
await wingPool.query(
|
||
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`,
|
||
[errMsg, predExecSn]
|
||
)
|
||
return { model, execSn: predExecSn, status: 'ERROR', error: errMsg }
|
||
}
|
||
} catch (fetchErr) {
|
||
const errMsg = 'Python 분석 서버에 연결할 수 없습니다.'
|
||
await wingPool.query(
|
||
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`,
|
||
[errMsg, predExecSn]
|
||
)
|
||
return { model, execSn: predExecSn, status: 'ERROR', error: errMsg }
|
||
}
|
||
|
||
// RUNNING 업데이트 (비동기 폴링 경로)
|
||
await wingPool.query(
|
||
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='RUNNING' WHERE PRED_EXEC_SN=$1`,
|
||
[predExecSn]
|
||
)
|
||
|
||
// 결과 동기 대기
|
||
try {
|
||
const rawResult = await runModelSync(jobId!, predExecSn, apiUrl)
|
||
const { trajectory, summary, stepSummaries, centerPoints, windData, hydrData } = transformResult(rawResult, model)
|
||
return { model, execSn: predExecSn, status: 'DONE', trajectory, summary, stepSummaries, centerPoints, windData, hydrData }
|
||
} catch (syncErr) {
|
||
return { model, execSn: predExecSn, status: 'ERROR', error: (syncErr as Error).message }
|
||
}
|
||
})
|
||
)
|
||
|
||
// 모든 모델이 실패하고 신규 생성한 ACDNT가 있으면 롤백
|
||
const hasSuccess = modelResults.some((r) => r.status === 'DONE')
|
||
if (!hasSuccess && newlyCreatedAcdntSn !== null) {
|
||
for (const r of modelResults) {
|
||
if (r.execSn) await rollbackNewRecords(r.execSn, null, null)
|
||
}
|
||
await rollbackNewRecords(null, newlyCreatedSpilDataSn, newlyCreatedAcdntSn)
|
||
return res.status(503).json({ error: '분석 서버에 연결할 수 없습니다.' })
|
||
}
|
||
|
||
res.json({
|
||
success: true,
|
||
acdntSn: resolvedAcdntSn,
|
||
execSns: [...execSns, ...modelResults.map(({ model, execSn }) => ({ model, execSn }))],
|
||
results: modelResults,
|
||
})
|
||
} catch {
|
||
res.status(500).json({ error: '시뮬레이션 실행 실패', message: '서버 내부 오류가 발생했습니다.' })
|
||
}
|
||
})
|
||
|
||
// ============================================================
|
||
// 백그라운드 폴링
|
||
// ============================================================
|
||
async function pollAndSaveModel(jobId: string, execSn: number, apiUrl: string, algoCode: string): Promise<void> {
|
||
const deadline = Date.now() + POLL_TIMEOUT_MS
|
||
|
||
while (Date.now() < deadline) {
|
||
await new Promise<void>(resolve => setTimeout(resolve, POLL_INTERVAL_MS))
|
||
|
||
try {
|
||
const pollRes = await fetch(`${apiUrl}/status/${jobId}`, {
|
||
signal: AbortSignal.timeout(5000),
|
||
})
|
||
if (!pollRes.ok) continue
|
||
|
||
const data = await pollRes.json() as PythonStatusResponse
|
||
|
||
if (data.status === 'DONE' && data.result) {
|
||
await wingPool.query(
|
||
`UPDATE wing.PRED_EXEC
|
||
SET EXEC_STTS_CD='COMPLETED',
|
||
RSLT_DATA=$1,
|
||
CMPL_DTM=NOW(),
|
||
REQD_SEC=EXTRACT(EPOCH FROM (NOW() - BGNG_DTM))::INTEGER
|
||
WHERE PRED_EXEC_SN=$2`,
|
||
[JSON.stringify(data.result), execSn]
|
||
)
|
||
return
|
||
}
|
||
|
||
if (data.status === 'ERROR') {
|
||
await wingPool.query(
|
||
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`,
|
||
[data.error ?? '분석 오류', execSn]
|
||
)
|
||
return
|
||
}
|
||
} catch {
|
||
// 개별 폴링 오류는 무시하고 재시도
|
||
}
|
||
}
|
||
|
||
// 타임아웃 처리
|
||
await wingPool.query(
|
||
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG='분석 시간 초과 (30분)', CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$1`,
|
||
[execSn]
|
||
)
|
||
}
|
||
|
||
// ============================================================
|
||
// 동기 폴링: Python 결과 대기 후 반환
|
||
// ============================================================
|
||
async function runModelSync(jobId: string, execSn: number, apiUrl: string): Promise<PythonTimeStep[]> {
|
||
const deadline = Date.now() + POLL_TIMEOUT_MS
|
||
|
||
while (Date.now() < deadline) {
|
||
await new Promise<void>(resolve => setTimeout(resolve, POLL_INTERVAL_MS))
|
||
|
||
let data: PythonStatusResponse
|
||
try {
|
||
const pollRes = await fetch(`${apiUrl}/status/${jobId}`, {
|
||
signal: AbortSignal.timeout(5000),
|
||
})
|
||
if (!pollRes.ok) continue
|
||
data = await pollRes.json() as PythonStatusResponse
|
||
} catch {
|
||
// 네트워크 오류 — 재시도
|
||
continue
|
||
}
|
||
|
||
if (data.status === 'DONE' && data.result) {
|
||
await wingPool.query(
|
||
`UPDATE wing.PRED_EXEC
|
||
SET EXEC_STTS_CD='COMPLETED',
|
||
RSLT_DATA=$1,
|
||
CMPL_DTM=NOW(),
|
||
REQD_SEC=EXTRACT(EPOCH FROM (NOW() - BGNG_DTM))::INTEGER
|
||
WHERE PRED_EXEC_SN=$2`,
|
||
[JSON.stringify(data.result), execSn]
|
||
)
|
||
return data.result
|
||
}
|
||
|
||
if (data.status === 'ERROR') {
|
||
const errMsg = data.error ?? '분석 오류'
|
||
await wingPool.query(
|
||
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG=$1, CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$2`,
|
||
[errMsg, execSn]
|
||
)
|
||
throw new Error(errMsg)
|
||
}
|
||
}
|
||
|
||
await wingPool.query(
|
||
`UPDATE wing.PRED_EXEC SET EXEC_STTS_CD='FAILED', ERR_MSG='분석 시간 초과 (30분)', CMPL_DTM=NOW() WHERE PRED_EXEC_SN=$1`,
|
||
[execSn]
|
||
)
|
||
throw new Error('분석 시간 초과 (30분)')
|
||
}
|
||
|
||
// ============================================================
|
||
// 타입 및 결과 변환
|
||
// ============================================================
|
||
interface PythonParticle {
|
||
lat: number
|
||
lon: number
|
||
stranded?: 0 | 1
|
||
}
|
||
|
||
interface WindPoint {
|
||
lat: number
|
||
lon: number
|
||
wind_speed: number
|
||
wind_direction: number
|
||
}
|
||
|
||
interface HydrGrid {
|
||
lonInterval: number[]
|
||
boundLonLat: { top: number; bottom: number; left: number; right: number }
|
||
rows: number
|
||
cols: number
|
||
latInterval: number[]
|
||
}
|
||
|
||
interface PythonTimeStep {
|
||
particles: PythonParticle[]
|
||
remaining_volume_m3: number
|
||
weathered_volume_m3: number
|
||
evaporation_m3?: number
|
||
dispersion_m3?: number
|
||
pollution_area_km2: number
|
||
beached_volume_m3: number
|
||
pollution_coast_length_m: number
|
||
center_lat?: number
|
||
center_lon?: number
|
||
wind_data?: WindPoint[]
|
||
hydr_data?: [number[][], number[][]]
|
||
hydr_grid?: HydrGrid
|
||
}
|
||
|
||
interface PythonStatusResponse {
|
||
status: 'RUNNING' | 'DONE' | 'ERROR'
|
||
result?: PythonTimeStep[]
|
||
error?: string
|
||
}
|
||
|
||
// ALGO_CD → 프론트엔드 모델명 매핑
|
||
const ALGO_CD_TO_MODEL_NAME: Record<string, string> = {
|
||
'OPENDRIFT': 'OpenDrift',
|
||
'POSEIDON': 'POSEIDON',
|
||
}
|
||
|
||
function transformResult(rawResult: PythonTimeStep[], model: string) {
|
||
const trajectory = rawResult.flatMap((step, stepIdx) =>
|
||
step.particles.map((p, i) => ({
|
||
lat: p.lat,
|
||
lon: p.lon,
|
||
time: stepIdx,
|
||
particle: i,
|
||
stranded: p.stranded,
|
||
}))
|
||
)
|
||
const lastStep = rawResult[rawResult.length - 1]
|
||
const summary = {
|
||
remainingVolume: lastStep.remaining_volume_m3,
|
||
weatheredVolume: lastStep.weathered_volume_m3,
|
||
evaporationVolume: lastStep.evaporation_m3 ?? lastStep.weathered_volume_m3 * 0.65,
|
||
dispersionVolume: lastStep.dispersion_m3 ?? lastStep.weathered_volume_m3 * 0.35,
|
||
pollutionArea: lastStep.pollution_area_km2,
|
||
beachedVolume: lastStep.beached_volume_m3,
|
||
pollutionCoastLength: lastStep.pollution_coast_length_m,
|
||
}
|
||
const centerPoints = rawResult
|
||
.map((step, stepIdx) =>
|
||
step.center_lat != null && step.center_lon != null
|
||
? { lat: step.center_lat, lon: step.center_lon, time: stepIdx, model }
|
||
: null
|
||
)
|
||
.filter((p): p is { lat: number; lon: number; time: number; model: string } => p !== null)
|
||
const windData = rawResult.map((step) => step.wind_data ?? [])
|
||
const hydrData = rawResult.map((step) =>
|
||
step.hydr_data && step.hydr_grid
|
||
? { value: step.hydr_data, grid: step.hydr_grid }
|
||
: null
|
||
)
|
||
const stepSummaries = rawResult.map((step) => ({
|
||
remainingVolume: step.remaining_volume_m3,
|
||
weatheredVolume: step.weathered_volume_m3,
|
||
evaporationVolume: step.evaporation_m3 ?? step.weathered_volume_m3 * 0.65,
|
||
dispersionVolume: step.dispersion_m3 ?? step.weathered_volume_m3 * 0.35,
|
||
pollutionArea: step.pollution_area_km2,
|
||
beachedVolume: step.beached_volume_m3,
|
||
pollutionCoastLength: step.pollution_coast_length_m,
|
||
}))
|
||
return { trajectory, summary, stepSummaries, centerPoints, windData, hydrData }
|
||
}
|
||
|
||
export default router
|