wing-ops/hns_dispersion.py
Nan Kyung Lee 8f98f63aa5 feat(aerial): CCTV 실시간 HLS 스트림 + HNS 분석 고도화
CCTV 실시간 영상:
- CCTVPlayer 컴포넌트 (hls.js 기반 HLS/MJPEG/MP4 재생)
- 백엔드 HLS 프록시 엔드포인트 (CORS 우회, m3u8 URL 재작성)
- KHOA 15개 + KBS 6개 실제 해안 CCTV 연동
- Vite dev proxy, 스트림 타입 자동 감지 유틸리티

HNS 분석:
- HNS 시나리오 저장/불러오기/재계산 기능
- 물질 DB 검색 및 상세 정보 연동
- 좌표/파라미터 입력 UI 개선
- Python 확산 모델 스크립트 (hns_dispersion.py)

공통:
- 3D 지도 토글, 보고서 생성 개선
- useSubMenu 훅, mapUtils 확장
- ESLint set-state-in-effect 수정

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 17:21:41 +09:00

566 lines
20 KiB
Python
Raw Blame 히스토리

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
WING HNS 대기확산 모델 알고리즘
=====================================
지원 모델:
1. Gaussian Plume - 연속 누출 (Continuous Release)
2. Gaussian Puff - 순간 누출 (Instantaneous Release)
3. Dense Gas Model - 고밀도 가스 (ALOHA 방식, Britter-McQuaid)
출력:
- 시간별 농도장 2D 히트맵 애니메이션 (MP4 / GIF)
의존 라이브러리:
pip install numpy matplotlib scipy
작성: WING 프로젝트 / HNS 대응 모듈
"""
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.colors import LogNorm
from dataclasses import dataclass, field
from typing import Literal, Optional
import warnings
warnings.filterwarnings("ignore")
# ──────────────────────────────────────────────
# 1. 입력 파라미터 데이터 클래스
# ──────────────────────────────────────────────
@dataclass
class MeteoParams:
"""기상 조건"""
wind_speed: float = 5.0 # 풍속 (m/s)
wind_dir_deg: float = 270.0 # 풍향 (기상 기준, 0=북, 90=동, ...)
stability: str = "D" # Pasquill-Gifford 안정도 (A~F)
temperature: float = 293.15 # 기온 (K)
pressure: float = 101325.0 # 기압 (Pa)
mixing_height: float = 800.0 # 혼합층 높이 (m)
@dataclass
class SourceParams:
"""누출원 정보"""
Q: float = 10.0 # 배출률 (g/s) - Plume용
Q_total: float = 5000.0 # 총 누출량 (g) - Puff용
x0: float = 0.0 # 누출 위치 X (m)
y0: float = 0.0 # 누출 위치 Y (m)
z0: float = 0.5 # 누출 높이 (m) (해상 = 수면 근처)
release_duration: float = 0.0 # 누출 지속시간 (s), 0=순간
# Dense Gas 전용
molecular_weight: float = 71.9 # 분자량 (g/mol) - 기본: Chlorine
vapor_pressure: float = 670.0 # 증기압 (mmHg)
density_gas: float = 3.2 # 가스 밀도 (kg/m³) at 누출 조건
pool_radius: float = 5.0 # 액체풀 반경 (m)
@dataclass
class SimParams:
"""시뮬레이션 격자 및 시간 설정"""
x_range: tuple = (-50, 3000) # X 범위 (m)
y_range: tuple = (-800, 800) # Y 범위 (m)
nx: int = 200 # X 격자 수
ny: int = 160 # Y 격자 수
z_ref: float = 1.5 # 농도 계산 기준 높이 (m, 호흡선)
t_start: float = 0.0 # 시작 시간 (s)
t_end: float = 600.0 # 종료 시간 (s)
dt: float = 30.0 # 시간 간격 (s)
# ──────────────────────────────────────────────
# 2. 확산계수 (Pasquill-Gifford σy, σz)
# ──────────────────────────────────────────────
# Briggs (1973) open-country 파라미터
_PG_PARAMS = {
# stability: (ay, by, az, bz) → σy = ay*x/(1+by*x)^0.5, σz = az*x/(1+bz*x)^0.5
"A": (0.22, 0.0001, 0.20, 0.000),
"B": (0.16, 0.0001, 0.12, 0.000),
"C": (0.11, 0.0001, 0.08, 0.0002),
"D": (0.08, 0.0001, 0.06, 0.0015),
"E": (0.06, 0.0001, 0.03, 0.0003),
"F": (0.04, 0.0001, 0.016, 0.0003),
}
def sigma_y(x: np.ndarray, stability: str) -> np.ndarray:
"""수평 확산계수 σy (m)"""
ay, by, _, _ = _PG_PARAMS[stability]
x = np.maximum(x, 1.0)
return ay * x / np.sqrt(1.0 + by * x)
def sigma_z(x: np.ndarray, stability: str) -> np.ndarray:
"""수직 확산계수 σz (m)"""
_, _, az, bz = _PG_PARAMS[stability]
x = np.maximum(x, 1.0)
return az * x / np.sqrt(1.0 + bz * x)
# ──────────────────────────────────────────────
# 3. 바람 방향 회전 유틸리티
# ──────────────────────────────────────────────
def wind_rotation(wind_dir_deg: float):
"""
기상 풍향 → 수학 좌표 회전각 변환
Returns: (cos_theta, sin_theta)
기상 풍향 270° (서풍) → 바람이 동쪽(+x)으로 진행
"""
math_angle = np.radians(270.0 - wind_dir_deg)
return np.cos(math_angle), np.sin(math_angle)
def rotate_to_wind(X, Y, x0, y0, cos_t, sin_t):
"""절대 좌표 → 바람 중심축 좌표 (x'=풍하, y'=횡풍)"""
dx = X - x0
dy = Y - y0
xw = dx * cos_t + dy * sin_t # 풍하거리
yw = -dx * sin_t + dy * cos_t # 횡풍거리
return xw, yw
# ──────────────────────────────────────────────
# 4. Model 1: Gaussian Plume (연속 누출)
# ──────────────────────────────────────────────
def gaussian_plume(
X: np.ndarray, Y: np.ndarray,
meteo: MeteoParams, src: SourceParams, sim: SimParams,
t: float = None # Plume은 정상 상태; t 인수는 인터페이스 통일용
) -> np.ndarray:
"""
정상 상태 가우시안 플룸 농도 (g/m³)
- 지면 반사 포함 (image method)
- 혼합층 상한 반사 포함 (선택적)
C(x,y,z) = Q / (2π σy σz u)
× exp(-y²/2σy²)
× [exp(-(z-H)²/2σz²) + exp(-(z+H)²/2σz²)]
"""
cos_t, sin_t = wind_rotation(meteo.wind_dir_deg)
xw, yw = rotate_to_wind(X, Y, src.x0, src.y0, cos_t, sin_t)
# 풍하 양수 구역만 계산
mask = xw > 0
C = np.zeros_like(X)
sy = sigma_y(xw[mask], meteo.stability)
sz = sigma_z(xw[mask], meteo.stability)
sz = np.minimum(sz, meteo.mixing_height) # 혼합층 상한 클리핑
u = max(meteo.wind_speed, 0.5)
H = src.z0
z = sim.z_ref
# 지면 반사항
term_y = np.exp(-0.5 * (yw[mask] / sy)**2)
term_z1 = np.exp(-0.5 * ((z - H) / sz)**2)
term_z2 = np.exp(-0.5 * ((z + H) / sz)**2)
C[mask] = (src.Q / (2 * np.pi * sy * sz * u)) * term_y * (term_z1 + term_z2)
return C
# ──────────────────────────────────────────────
# 5. Model 2: Gaussian Puff (순간 누출)
# ──────────────────────────────────────────────
def gaussian_puff(
X: np.ndarray, Y: np.ndarray,
meteo: MeteoParams, src: SourceParams, sim: SimParams,
t: float
) -> np.ndarray:
"""
이동하는 가우시안 퍼프 농도 (g/m³)
C(x,y,z,t) = Q_total / [(2π)^(3/2) σx σy σz]
× exp(-x_r²/2σx²)
× exp(-y_r²/2σy²)
× [exp(-(z-H)²/2σz²) + exp(-(z+H)²/2σz²)]
여기서 퍼프 중심 = (u·t·cos, u·t·sin)
"""
if t <= 0:
return np.zeros_like(X)
u = max(meteo.wind_speed, 0.5)
cos_t, sin_t = wind_rotation(meteo.wind_dir_deg)
# 퍼프 중심 이동
xc = src.x0 + u * t * cos_t
yc = src.y0 + u * t * sin_t
# 퍼프에서의 상대 거리
dx = X - xc
dy = Y - yc
# 이동거리 기준으로 σ 계산
travel_dist = u * t
sy = sigma_y(np.array([travel_dist]), meteo.stability)[0]
sz = sigma_z(np.array([travel_dist]), meteo.stability)[0]
sx = sy # 풍하 방향 확산 ≈ 횡풍 방향
sz = min(sz, meteo.mixing_height)
H = src.z0
z = sim.z_ref
norm = (2 * np.pi)**1.5 * sx * sy * sz
term_x = np.exp(-0.5 * (dx / sx)**2)
term_y = np.exp(-0.5 * (dy / sy)**2)
term_z = (np.exp(-0.5 * ((z - H) / sz)**2) +
np.exp(-0.5 * ((z + H) / sz)**2))
C = (src.Q_total / norm) * term_x * term_y * term_z
return C
# ──────────────────────────────────────────────
# 6. Model 3: Dense Gas (ALOHA 방식, Britter-McQuaid)
# ──────────────────────────────────────────────
def dense_gas_britter_mcquaid(
X: np.ndarray, Y: np.ndarray,
meteo: MeteoParams, src: SourceParams, sim: SimParams,
t: float
) -> np.ndarray:
"""
Britter & McQuaid (1988) Dense Gas 모델
- 무거운 가스의 중력 침강(gravity spreading) 효과 반영
- ALOHA에서 채용하는 방식과 동일한 기본 구조
적용 조건: ρ_gas / ρ_air > 1.1 이상
출력 단위: g/m³ (ppm 변환은 convert_to_ppm 함수 사용)
1) 부력 flux: g0' = g(ρg-ρa)/ρa × qv
2) 연속 누출: plume width = f(g0', u, x)
3) 순간 누출: cloud radius = f(g0, t)
"""
rho_air = meteo.pressure * 0.02897 / (8.314 * meteo.temperature) # kg/m³
g = 9.81 # m/s²
u = max(meteo.wind_speed, 0.5)
# 초기 농도 (부피 분율)
rho_g = src.density_gas
C0_vol = rho_g / (rho_g + rho_air) # 초기 부피 분율
# 단위 부력 flux (m²/s³)
g_prime0 = g * (rho_g - rho_air) / rho_air
cos_t, sin_t = wind_rotation(meteo.wind_dir_deg)
xw, yw = rotate_to_wind(X, Y, src.x0, src.y0, cos_t, sin_t)
mask = xw > 0
C = np.zeros_like(X)
if not np.any(mask):
return C
xd = xw[mask]
yd = yw[mask]
if src.release_duration == 0:
# ── 순간 누출 (Puff) ──
# 이동 중심
xc = u * t
# 클라우드 반경: r(t) = r0 + α·(g_prime0·V0)^(1/4)·t^(1/2)
V0 = np.pi * src.pool_radius**2 * 1.0 # 초기 체적 (1m 두께 가정)
r0 = src.pool_radius
r_t = r0 + 1.1 * (g_prime0 * V0)**0.25 * max(t, 0.1)**0.5
# 클라우드 높이: h(t) = V0 / (π r²)
h_t = max(V0 / (np.pi * r_t**2), 0.1)
# Gaussian 농도 분포 내 클라우드
dist_xr = xd - xc
sigma_cloud = r_t / 2.15 # r_t ≈ 2.15σ
C_vol = C0_vol * np.exp(-0.5 * (dist_xr / sigma_cloud)**2) \
* np.exp(-0.5 * (yd / sigma_cloud)**2)
else:
# ── 연속 누출 (Plume) ──
# 풍하거리별 플룸 폭: b(x) = b0 × [1 + α(g_prime_x / u³)^(1/3)]
qv = src.Q / (rho_g * 1000) # 체적 유량 (m³/s)
g_prime_line = g_prime0 * qv / u # 단위 길이 부력
b0 = src.pool_radius
# Britter-McQuaid: b ∝ x^0.6 for gravity-dominated
b_x = b0 + 2.5 * (g_prime_line / u**2)**0.333 * xd**0.6
b_x = np.maximum(b_x, b0)
# 플룸 높이 (중력 침강 → 낮게 유지)
h_x = qv / (u * b_x)
h_x = np.maximum(h_x, 0.1)
# 횡풍 Gaussian × 풍하 top-hat 근사
C_vol = C0_vol * (b0 / b_x) * np.exp(-0.5 * (yd / (b_x / 2))**2)
# 부피 분율 → g/m³ 변환
MW = src.molecular_weight
C[mask] = C_vol * (MW * meteo.pressure) / (8.314 * meteo.temperature) # g/m³
return C
# ──────────────────────────────────────────────
# 7. 독성 임계값 (AEGL / IDLH)
# ──────────────────────────────────────────────
# 주요 HNS 물질 독성 기준 (ppm)
TOXICITY_LEVELS = {
"Chlorine": {"AEGL1": 0.5, "AEGL2": 2.8, "AEGL3": 50.0, "IDLH": 10.0, "MW": 71.0},
"Ammonia": {"AEGL1": 1.1, "AEGL2": 16.0, "AEGL3": 50.0, "IDLH": 300.0, "MW": 17.0},
"HCl": {"AEGL1": 1.8, "AEGL2": 22.0, "AEGL3": 100.0, "IDLH": 50.0, "MW": 36.5},
"Benzene": {"AEGL1": 52.0, "AEGL2": 800.0,"AEGL3": 4000.0,"IDLH": 500.0, "MW": 78.1},
"H2S": {"AEGL1": 0.51, "AEGL2": 17.0, "AEGL3": 50.0, "IDLH": 50.0, "MW": 34.1},
}
def gm3_to_ppm(C_gm3: np.ndarray, MW: float, T_K: float = 293.15, P_Pa: float = 101325) -> np.ndarray:
"""g/m³ → ppm 변환"""
return C_gm3 * (8.314 * T_K) / (MW * P_Pa) * 1e6
# ──────────────────────────────────────────────
# 8. 애니메이션 생성
# ──────────────────────────────────────────────
def run_animation(
model: Literal["plume", "puff", "dense_gas"],
meteo: MeteoParams,
src: SourceParams,
sim: SimParams,
substance: str = "Chlorine",
save_path: Optional[str] = None,
show: bool = True
):
"""
시간별 대기확산 농도 애니메이션 생성
Parameters
----------
model : 사용할 모델 ("plume" | "puff" | "dense_gas")
save_path : 저장 경로 (.gif 또는 .mp4), None이면 저장 안 함
show : plt.show() 호출 여부
"""
MODEL_FUNC = {
"plume": gaussian_plume,
"puff": gaussian_puff,
"dense_gas": dense_gas_britter_mcquaid,
}
assert model in MODEL_FUNC, f"지원하지 않는 모델: {model}"
func = MODEL_FUNC[model]
# 격자 생성
x_lin = np.linspace(*sim.x_range, sim.nx)
y_lin = np.linspace(*sim.y_range, sim.ny)
X, Y = np.meshgrid(x_lin, y_lin)
# 시간 배열
times = np.arange(sim.t_start + sim.dt, sim.t_end + sim.dt, sim.dt)
# 독성 기준
tox = TOXICITY_LEVELS.get(substance, TOXICITY_LEVELS["Chlorine"])
MW = tox["MW"]
# 농도 프레임 사전 계산
print(f"[{model.upper()}] 농도 계산 중... ({len(times)} 프레임)")
frames_C = []
for t in times:
C_gm3 = func(X, Y, meteo, src, sim, t)
C_ppm = gm3_to_ppm(C_gm3, MW, meteo.temperature, meteo.pressure)
frames_C.append(C_ppm)
C_max_global = max(f.max() for f in frames_C)
C_min_plot = max(tox["AEGL1"] * 0.01, 1e-4)
C_max_plot = C_max_global * 1.0
# ── 그림 설정 ──
fig, ax = plt.subplots(figsize=(12, 7))
fig.patch.set_facecolor("#0a1628")
ax.set_facecolor("#0d1f3c")
cmap = plt.cm.get_cmap("RdYlGn_r")
cmap.set_under("#0d1f3c")
im = ax.pcolormesh(
X, Y, frames_C[0],
cmap=cmap,
norm=LogNorm(vmin=C_min_plot, vmax=C_max_plot),
shading="auto"
)
# 독성 등고선
AEGL_COLORS = {
"AEGL1": ("#00ff88", f'AEGL-1 ({tox["AEGL1"]} ppm)'),
"AEGL2": ("#ffcc00", f'AEGL-2 ({tox["AEGL2"]} ppm)'),
"AEGL3": ("#ff4444", f'AEGL-3 ({tox["AEGL3"]} ppm)'),
}
contour_handles = {}
for key, (color, label) in AEGL_COLORS.items():
level = tox[key]
if C_max_global >= level:
cs = ax.contour(X, Y, frames_C[0], levels=[level],
colors=[color], linewidths=1.5, linestyles="--")
contour_handles[key] = cs
# 누출원 마커
ax.plot(src.x0, src.y0, "w*", ms=14, zorder=10, label="누출원")
# 컬러바
cbar = plt.colorbar(im, ax=ax, fraction=0.03, pad=0.02)
cbar.set_label("농도 (ppm)", color="white", fontsize=11)
cbar.ax.yaxis.set_tick_params(color="white")
plt.setp(cbar.ax.yaxis.get_ticklabels(), color="white")
# 범례 패치
from matplotlib.lines import Line2D
legend_elements = [
Line2D([0],[0], color=c, ls="--", lw=1.5, label=l)
for _, (c, l) in AEGL_COLORS.items()
if C_max_global >= tox[k.replace(c,"")[-5:]] if False else True
] + [Line2D([0],[0], marker="*", color="white", ms=10, ls="none", label="누출원")]
ax.legend(handles=legend_elements, loc="upper right",
facecolor="#1a2a4a", edgecolor="#446688", labelcolor="white", fontsize=9)
# 축 설정
ax.set_xlabel("X (m) - 동서", color="white")
ax.set_ylabel("Y (m) - 남북", color="white")
ax.tick_params(colors="white")
for spine in ax.spines.values():
spine.set_edgecolor("#446688")
model_labels = {
"plume": "Gaussian Plume (연속누출)",
"puff": "Gaussian Puff (순간누출)",
"dense_gas": "Dense Gas / ALOHA 방식",
}
title = ax.set_title(
f"[WING] HNS 대기확산 - {model_labels[model]} | {substance} | t=0s",
color="white", fontsize=12, fontweight="bold"
)
# 풍향 화살표
cos_t, sin_t = wind_rotation(meteo.wind_dir_deg)
ax_w = fig.add_axes([0.01, 0.88, 0.08, 0.08])
ax_w.set_xlim(-1.5, 1.5)
ax_w.set_ylim(-1.5, 1.5)
ax_w.set_aspect("equal")
ax_w.axis("off")
ax_w.set_facecolor("#0a1628")
ax_w.annotate("", xy=(cos_t, sin_t), xytext=(0, 0),
arrowprops=dict(arrowstyle="->", color="cyan", lw=2))
ax_w.text(0, -1.4, f"{meteo.wind_speed}m/s\n{meteo.wind_dir_deg}°",
color="cyan", fontsize=7, ha="center")
def _update(frame_idx):
C_ppm = frames_C[frame_idx]
t_now = times[frame_idx]
im.set_array(C_ppm.ravel())
# 등고선 업데이트 (이전 제거 후 재생성)
for coll in ax.collections[1:]:
coll.remove()
for key, (color, _) in AEGL_COLORS.items():
level = tox[key]
if C_ppm.max() >= level:
ax.contour(X, Y, C_ppm, levels=[level],
colors=[color], linewidths=1.5, linestyles="--")
title.set_text(
f"[WING] HNS 대기확산 - {model_labels[model]} | {substance} | t={t_now:.0f}s"
)
return [im]
ani = animation.FuncAnimation(
fig, _update, frames=len(times), interval=300, blit=False
)
if save_path:
ext = save_path.split(".")[-1].lower()
print(f"애니메이션 저장 중: {save_path}")
if ext == "gif":
ani.save(save_path, writer="pillow", fps=5, dpi=120)
else:
ani.save(save_path, writer="ffmpeg", fps=5, dpi=120)
print("저장 완료!")
if show:
plt.tight_layout()
plt.show()
plt.close(fig)
return ani
# ──────────────────────────────────────────────
# 9. 다중 모델 비교 실행
# ──────────────────────────────────────────────
def run_all_models(
meteo: MeteoParams,
src: SourceParams,
sim: SimParams,
substance: str = "Chlorine",
save_dir: str = "."
):
"""세 가지 모델 모두 GIF로 저장"""
for model in ["plume", "puff", "dense_gas"]:
path = f"{save_dir}/wing_hns_{model}_{substance.lower()}.gif"
run_animation(model, meteo, src, sim, substance,
save_path=path, show=False)
print(f"{model}{path}")
# ──────────────────────────────────────────────
# 10. 메인 실행 예시 (WING 기본값)
# ──────────────────────────────────────────────
if __name__ == "__main__":
# ── 기상 조건 (서해 해상 기준)
meteo = MeteoParams(
wind_speed = 6.0, # m/s
wind_dir_deg = 270.0, # 서풍 (동쪽으로 이동)
stability = "D", # 중립 (해상 대부분)
temperature = 288.15, # 15°C
mixing_height= 600.0, # m
)
# ── 누출원 (선박 탱크 파손 시나리오)
src = SourceParams(
Q = 20.0, # g/s (연속 누출)
Q_total = 10000.0, # g (순간 누출)
z0 = 1.0, # m (갑판 높이)
release_duration = 300.0, # s
molecular_weight = 71.0, # Chlorine
density_gas = 3.17, # kg/m³
pool_radius = 3.0, # m
)
# ── 시뮬레이션 범위
sim = SimParams(
x_range = (-100, 4000),
y_range = (-1000, 1000),
nx=220, ny=160,
t_end = 600.0,
dt = 30.0,
)
substance = "Chlorine"
print("=" * 55)
print(" WING HNS 대기확산 시뮬레이터")
print(f" 물질: {substance} | 안정도: {meteo.stability}")
print(f" 풍속: {meteo.wind_speed}m/s | 풍향: {meteo.wind_dir_deg}°")
print("=" * 55)
# 모델 선택 실행 (개별 실행 가능)
# run_animation("plume", meteo, src, sim, substance, save_path="wing_plume.gif")
# run_animation("puff", meteo, src, sim, substance, save_path="wing_puff.gif")
# run_animation("dense_gas", meteo, src, sim, substance, save_path="wing_dense.gif")
# 전체 모델 일괄 저장
run_all_models(meteo, src, sim, substance, save_dir=".")