wing-ops/prediction/opendrift/createJsonResult.py
jeonghyo.k 88eb6b121a feat(prediction): OpenDrift 유류 확산 시뮬레이션 통합 + CCTV/관리자 고도화
[예측]
- OpenDrift Python API 서버 및 스크립트 추가 (prediction/opendrift/)
- 시뮬레이션 상태 폴링 훅(useSimulationStatus), 로딩 오버레이 추가
- HydrParticleOverlay: deck.gl 기반 입자 궤적 시각화 레이어
- OilSpillView/LeftPanel/RightPanel: 시뮬레이션 실행·결과 표시 UI 개편
- predictionService/predictionRouter: 시뮬레이션 CRUD 및 상태 관리 API
- simulation.ts: OpenDrift 연동 엔드포인트 확장
- docs/PREDICTION-GUIDE.md: 예측 기능 개발 가이드 추가

[CCTV/항공방제]
- CCTV 오일 감지 GPU 추론 연동 (OilDetectionOverlay, useOilDetection)
- CCTV 안전관리 감지 기능 추가 (선박 출입, 침입 감지)
- oil_inference_server.py: Python GPU 추론 서버

[관리자]
- 관리자 화면 고도화 (사용자/권한/게시판/선박신호 패널)
- AdminSidebar, BoardMgmtPanel, VesselSignalPanel 신규 컴포넌트

[기타]
- DB: 시뮬레이션 결과, 선박보험 시드(1391건), 역할 정리 마이그레이션
- 팀 워크플로우 v1.6.1 동기화

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-09 14:55:46 +09:00

326 lines
18 KiB
Python

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import xarray as xr
import numpy as np
import pandas as pd
import os
from datetime import datetime, timedelta
from shapely.geometry import Polygon, Point
from convex_hull import get_convex_hull_from_json as get_convex_hull
from createWindJson import extract_wind_data_json as create_wind_json
from calcCostlineLength import OilSpillCoastlineAnalyzer
from extractUvWithinBox import _compute_hydr_region, _extract_uv_at_time
import concurrent.futures
from config import STORAGE, SIM, COASTLINE, THREAD_POOL
from logger import get_logger
from utils import check_img_file_by_date, find_time_index
logger = get_logger("createJsonResult")
def extract_and_save_data_to_json(ncfile_path, wind_ncfile_path, ocean_ncfile_path, name, ac_lon, ac_lat):
"""
NetCDF 파일에서 시간별 위치, 잔존량, 풍화량, 오염 면적을 추출하여 JSON 파일로 저장합니다.
"""
logger.info(f"Processing: {ncfile_path}, {wind_ncfile_path}, {ocean_ncfile_path}")
try:
if not os.path.exists(ncfile_path):
raise FileNotFoundError(f"Simulation result file not found: {ncfile_path}")
if not os.path.exists(wind_ncfile_path):
raise FileNotFoundError(f"Wind data file not found: {wind_ncfile_path}")
if not os.path.exists(ocean_ncfile_path):
raise FileNotFoundError(f"Ocean data file not found: {ocean_ncfile_path}")
analyzer = OilSpillCoastlineAnalyzer(
str(STORAGE.COASTLINE),
buffer_distance=COASTLINE.BUFFER_DISTANCE,
simplify_tolerance=COASTLINE.SIMPLIFY_TOLERANCE,
center_point=(ac_lon, ac_lat),
radius=COASTLINE.DEFAULT_RADIUS
)
with xr.open_dataset(ncfile_path) as ds, \
xr.open_dataset(wind_ncfile_path) as wind_ds, \
xr.open_dataset(ocean_ncfile_path) as ocean_ds:
# ------------------------------------------------------------------
# ① 시뮬레이션 변수 일괄 로드 — xarray → NumPy 1회 변환
# ------------------------------------------------------------------
total_steps = len(ds.time)
# OpenDrift NetCDF 차원 순서: (trajectory, time) = (N, T)
# .T 전치로 (T, N)으로 통일하여 이후 모든 연산이 axis=0=시간, axis=1=입자 기준이 되도록 함
status_all = ds.status.values.T if 'status' in ds else None # (T, N)
lon_all = ds.lon.values.T if 'lon' in ds else None
lat_all = ds.lat.values.T if 'lat' in ds else None
mass_all = ds.mass_oil.values.T if 'mass_oil' in ds else None
density_all = ds.density.values.T if 'density' in ds else None
evap_all = ds.mass_evaporated.values.T if 'mass_evaporated' in ds else None
moving_all = ds.moving.values.T if 'moving' in ds else None
viscosity_all = ds.viscosity.values.T if 'viscosity' in ds else None
watertemp_all = ds.sea_water_temperature.values.T if 'sea_water_temperature' in ds else None
oil_film_thick = (float(ds.oil_film_thickness.isel(time=0).values[0])
if 'oil_film_thickness' in ds else None)
initial_mass_total = None
if mass_all is not None:
try:
initial_mass_total = float(mass_all[0].sum())
logger.info(f"Initial oil mass: {initial_mass_total:.2f} kg")
except Exception as e:
logger.warning(f"Error processing mass_oil: {e}")
# ------------------------------------------------------------------
# ② 타임스탬프 사전 계산
# ------------------------------------------------------------------
time_values_pd = pd.to_datetime(ds.time.values)
formatted_times = [t.strftime('%Y-%m-%d %H:%M:%S') for t in time_values_pd]
# ------------------------------------------------------------------
# ③ 전처리 루프 벡터화 (기존 lines 77175 대체)
# ------------------------------------------------------------------
cumulative_evaporated_arr = np.zeros(total_steps)
cumulative_beached_arr = np.zeros(total_steps)
cumulative_pollution_area = {}
first_strand_step = None
last_valid_lon = None
last_valid_lat = None
if status_all is not None and lon_all is not None:
N = status_all.shape[1]
# 입자별 최초 stranded 스텝
stranded_mask = (status_all == 1) # (T, N)
has_stranded = stranded_mask.any(axis=0) # (N,)
first_strand_step = np.where(has_stranded,
stranded_mask.argmax(axis=0),
-1).astype(np.int32) # (N,)
# 입자별 마지막 유효 위치
valid_lon_mask = ~np.isnan(lon_all) # (T, N)
has_any_valid = valid_lon_mask.any(axis=0) # (N,)
last_valid_t = (total_steps - 1
- np.flip(valid_lon_mask, axis=0).argmax(axis=0)) # (N,)
pix = np.arange(N)
last_valid_lon = np.where(has_any_valid, lon_all[last_valid_t, pix], np.nan)
last_valid_lat = np.where(has_any_valid, lat_all[last_valid_t, pix], np.nan)
# 누적 증발량 — expanding max per particle
if evap_all is not None:
evap_clean = np.where(np.isnan(evap_all), -np.inf, evap_all)
run_evap = np.maximum.accumulate(evap_clean, axis=0) # (T, N)
run_evap = np.clip(run_evap, 0, None)
cumulative_evaporated_arr = run_evap.sum(axis=1) # (T,)
# 누적 부착량 — expanding max per stranded particle
if mass_all is not None and density_all is not None:
safe_den = np.where((density_all > 0) & ~np.isnan(density_all),
density_all, np.inf)
beach_vol = np.where(
stranded_mask & ~np.isnan(mass_all) & (mass_all > 0),
mass_all / safe_den, 0.0) # (T, N)
run_beach = np.maximum.accumulate(beach_vol, axis=0) # (T, N)
strand_threshold = np.where(first_strand_step >= 0,
first_strand_step,
total_steps)
active_matrix = (np.arange(total_steps)[:, None]
>= strand_threshold[None, :]) # (T, N)
cumulative_beached_arr = (run_beach * active_matrix).sum(axis=1) # (T,)
# 누적 오염 면적 — 순차 루프 유지 (set union 의존), 내부 연산은 vectorized
all_polluted_cells = set()
grid_config = None
for i in range(total_steps):
if mass_all is not None:
lon_i = lon_all[i]
lat_i = lat_all[i]
mass_i = mass_all[i]
valid_mask = (~np.isnan(lon_i)) & (~np.isnan(lat_i)) & (mass_i > 0)
if np.any(valid_mask):
lon_active = lon_i[valid_mask]
lat_active = lat_i[valid_mask]
if grid_config is None:
grid_config = {
'min_lon': lon_active.min() - 0.01,
'max_lon': lon_active.max() + 0.01,
'min_lat': lat_active.min() - 0.01,
'max_lat': lat_active.max() + 0.01,
'num_lon_bins': SIM.POLLUTION_GRID_BINS,
'num_lat_bins': SIM.POLLUTION_GRID_BINS,
}
else:
grid_config['min_lon'] = min(grid_config['min_lon'], lon_active.min() - 0.01)
grid_config['max_lon'] = max(grid_config['max_lon'], lon_active.max() + 0.01)
grid_config['min_lat'] = min(grid_config['min_lat'], lat_active.min() - 0.01)
grid_config['max_lat'] = max(grid_config['max_lat'], lat_active.max() + 0.01)
lon_bins = np.linspace(grid_config['min_lon'], grid_config['max_lon'],
grid_config['num_lon_bins'] + 1)
lat_bins = np.linspace(grid_config['min_lat'], grid_config['max_lat'],
grid_config['num_lat_bins'] + 1)
lon_indices = np.digitize(lon_active, lon_bins) - 1
lat_indices = np.digitize(lat_active, lat_bins) - 1
for lon_idx, lat_idx in zip(lon_indices, lat_indices):
if (0 <= lon_idx < grid_config['num_lon_bins']
and 0 <= lat_idx < grid_config['num_lat_bins']):
all_polluted_cells.add((lon_idx, lat_idx))
delta_lon_km = ((grid_config['max_lon'] - grid_config['min_lon'])
* SIM.KM_PER_DEG_LON)
delta_lat_km = ((grid_config['max_lat'] - grid_config['min_lat'])
* SIM.KM_PER_DEG_LAT)
area_of_cell_km2 = ((delta_lon_km / grid_config['num_lon_bins'])
* (delta_lat_km / grid_config['num_lat_bins']))
cumulative_pollution_area[i] = len(all_polluted_cells) * area_of_cell_km2
else:
cumulative_pollution_area[i] = cumulative_pollution_area.get(i - 1, 0.0) if i > 0 else 0.0
else:
cumulative_pollution_area[i] = cumulative_pollution_area.get(i - 1, 0.0) if i > 0 else 0.0
# ------------------------------------------------------------------
# ④ wind/hydr 전체 타임스텝 사전 추출 (파일 재오픈 없음)
# ------------------------------------------------------------------
logger.info("Pre-extracting hydr data for all timesteps...")
hydr_region = _compute_hydr_region(ocean_ds, ac_lon, ac_lat)
hydr_time_indices = [find_time_index(ocean_ds, t)[0] for t in formatted_times]
hydr_cache = [_extract_uv_at_time(ocean_ds, idx, hydr_region)
for idx in hydr_time_indices]
logger.info("Pre-extracting wind data for all timesteps...")
wind_cache = [create_wind_json(wind_ds, time_values_pd[i], ac_lon, ac_lat)
for i in range(total_steps)]
# ------------------------------------------------------------------
# ⑤ process_time_step — pre-loaded 배열 사용, 중복 xarray 호출 없음
# ------------------------------------------------------------------
def process_time_step(i):
formatted_time = formatted_times[i]
logger.debug(f"Processing time step: {formatted_time}")
lon_t = lon_all[i] if lon_all is not None else None
lat_t = lat_all[i] if lat_all is not None else None
mass_t = mass_all[i] if mass_all is not None else None
density_t = density_all[i] if density_all is not None else None
status_t = status_all[i] if status_all is not None else None
moving_t = moving_all[i] if moving_all is not None else None
viscosity_t = viscosity_all[i] if viscosity_all is not None else None
watertemp_t = watertemp_all[i] if watertemp_all is not None else None
# 활성 입자 처리
active_mask = moving_t > 0 if moving_t is not None else None
active_lon = lon_t[active_mask] if (active_mask is not None and lon_t is not None) else []
active_lat = lat_t[active_mask] if (active_mask is not None and lat_t is not None) else []
center_lon, center_lat = None, None
if len(active_lon) > 0 and len(active_lat) > 0:
center_lon = float(np.mean(active_lon))
center_lat = float(np.mean(active_lat))
# 입자 목록 구성 (stranded 처리 + last valid position 사용)
particles = []
if lon_t is not None and lat_t is not None and first_strand_step is not None:
stranded_flags = (first_strand_step >= 0) & (i >= first_strand_step) # (N,)
for idx in range(len(lon_t)):
lon_val = lon_t[idx]
lat_val = lat_t[idx]
stranded_value = int(stranded_flags[idx])
if stranded_value == 1 and (np.isnan(lon_val) or np.isnan(lat_val)):
if last_valid_lon is not None and not np.isnan(last_valid_lon[idx]):
lon_val = last_valid_lon[idx]
lat_val = last_valid_lat[idx]
if np.isnan(lon_val) or np.isnan(lat_val):
continue
particles.append({
"lon": float(lon_val),
"lat": float(lat_val),
"stranded": stranded_value,
})
# 오염 해안 길이
length, info = analyzer.calculate_polluted_length(particles)
# Convex hull
try:
hull_coords = get_convex_hull(particles)
wkt = Point(hull_coords[0]).wkt if len(hull_coords) == 1 else Polygon(hull_coords).wkt
except ValueError as e:
logger.warning(f"Convex hull error: {e}")
wkt = ""
# 잔존량 (status == 0 해상 입자)
remaining_volume_m3 = 0.0
if mass_t is not None and density_t is not None and status_t is not None:
sea_mask = (status_t == 0)
sea_masses = mass_t[sea_mask]
sea_densities = density_t[sea_mask]
valid_sea = ~np.isnan(sea_masses) & (sea_masses > 0) & ~np.isnan(sea_densities) & (sea_densities > 0)
if np.any(valid_sea):
avg_density = np.mean(sea_densities[valid_sea])
if avg_density > 0:
remaining_volume_m3 = float(np.sum(sea_masses[valid_sea]) / avg_density)
# 누적 증발량 → 부피
evaporated_mass = float(cumulative_evaporated_arr[i])
weathered_volume_m3 = 0.0
if evaporated_mass > 0 and density_t is not None:
valid_den = ~np.isnan(density_t) & (density_t > 0)
if np.any(valid_den):
avg_density = float(np.mean(density_t[valid_den]))
if avg_density > 0:
weathered_volume_m3 = evaporated_mass / avg_density
beached_volume_m3 = float(cumulative_beached_arr[i])
pollution_area = cumulative_pollution_area.get(i, 0.0)
average_viscosity = float(np.nanmean(viscosity_t)) if (viscosity_t is not None and len(viscosity_t) > 0) else None
average_water_temp = float(np.nanmean(watertemp_t)) if (watertemp_t is not None and len(watertemp_t) > 0) else None
hydr_data = hydr_cache[i]
return {
"time": formatted_time,
"center_lon": float(center_lon) if center_lon is not None else None,
"center_lat": float(center_lat) if center_lat is not None else None,
"remaining_volume_m3": float(remaining_volume_m3),
"weathered_volume_m3": float(weathered_volume_m3),
"pollution_area_km2": float(pollution_area),
"beached_volume_m3": float(beached_volume_m3),
"particles": particles,
"wkt": wkt,
"viscosity": average_viscosity,
"thickness": oil_film_thick,
"temperature": average_water_temp,
"wind_data": wind_cache[i],
"hydr_data": hydr_data['value'],
"hydr_grid": hydr_data['grid'],
"pollution_coast_length_m": length,
}
# ThreadPoolExecutor — wind/hydr I/O 없으므로 CPU 위주 작업만
with concurrent.futures.ThreadPoolExecutor(max_workers=THREAD_POOL.MAX_WORKERS) as executor:
futures = {executor.submit(process_time_step, i): i for i in range(total_steps)}
results = [None] * total_steps
for future in concurrent.futures.as_completed(futures):
idx = futures[future]
results[idx] = future.result()
return results
except FileNotFoundError as e:
logger.error(f"File not found: {e}")
return None
except Exception as e:
logger.exception(f"An unexpected error occurred: {e}")
return None