import numpy as np import xarray as xr from datetime import datetime import pandas as pd from logger import get_logger from utils import haversine_distance, find_time_index, convert_and_round logger = get_logger("extractUvWithinBox") def _compute_hydr_region(ocean_ds: xr.Dataset, center_lon: float, center_lat: float, box_size_km: float = 25) -> dict: """ 해양 데이터셋에서 중심점 기준 공간 영역을 계산합니다. 시뮬레이션당 1회 호출. Parameters ---------- ocean_ds : xr.Dataset 이미 열린 해양 NetCDF 데이터셋 center_lon, center_lat : float 중심점 경도, 위도 box_size_km : float 상하좌우 범위 (km) Returns ------- dict min_row, max_row, min_col, max_col, lon_region, lat_region, lon_intervals, lat_intervals, bound_lon_lat, u_ndim """ lon = ocean_ds['lon'].values lat = ocean_ds['lat'].values if lon.ndim == 1 and lat.ndim == 1: lon_2d, lat_2d = np.meshgrid(lon, lat) else: lon_2d = lon lat_2d = lat # 동서/남북 거리 계산 dx = haversine_distance(center_lon, center_lat, lon_2d, center_lat, return_km=True) dx = dx * np.sign(lon_2d - center_lon) dy = haversine_distance(center_lon, center_lat, center_lon, lat_2d, return_km=True) dy = dy * np.sign(lat_2d - center_lat) mask = (np.abs(dx) <= box_size_km) & (np.abs(dy) <= box_size_km) rows, cols = np.where(mask) if len(rows) == 0: raise ValueError("No data within specified range") min_row, max_row = int(rows.min()), int(rows.max()) min_col, max_col = int(cols.min()), int(cols.max()) lon_region = lon_2d[min_row:max_row + 1, min_col:max_col + 1] lat_region = lat_2d[min_row:max_row + 1, min_col:max_col + 1] lon_intervals = [float(lon_region[0, i + 1] - lon_region[0, i]) for i in range(lon_region.shape[1] - 1)] lat_intervals = [float(lat_region[i + 1, 0] - lat_region[i, 0]) for i in range(lat_region.shape[0] - 1)] bound_lon_lat = { "top": float(lat_region.max()), "bottom": float(lat_region.min()), "left": float(lon_region.min()), "right": float(lon_region.max()), } # u 변수 차원 수 미리 파악 u_data = ocean_ds.get('u', ocean_ds.get('ssu')) u_ndim = u_data.ndim return { "min_row": min_row, "max_row": max_row, "min_col": min_col, "max_col": max_col, "rows": int(max_row - min_row + 1), "cols": int(max_col - min_col + 1), "lon_intervals": lon_intervals, "lat_intervals": lat_intervals, "bound_lon_lat": bound_lon_lat, "u_ndim": u_ndim, } def _extract_uv_at_time(ocean_ds: xr.Dataset, time_idx: int, region: dict) -> dict: """ 사전 계산된 공간 영역에서 특정 timestep의 u/v 데이터를 추출합니다. Parameters ---------- ocean_ds : xr.Dataset 이미 열린 해양 NetCDF 데이터셋 time_idx : int 시간 인덱스 region : dict _compute_hydr_region()이 반환한 영역 정보 Returns ------- dict {"value": [u_list, v_list], "grid": {...}} """ u_data = ocean_ds.get('u', ocean_ds.get('ssu')) v_data = ocean_ds.get('v', ocean_ds.get('ssv')) u_ndim = region["u_ndim"] if u_ndim == 3: u_2d = u_data[time_idx].values v_2d = v_data[time_idx].values elif u_ndim == 4: u_2d = u_data[time_idx, 0].values v_2d = v_data[time_idx, 0].values else: u_2d = u_data.values v_2d = v_data.values r = region u_region = u_2d[r["min_row"]:r["max_row"] + 1, r["min_col"]:r["max_col"] + 1] v_region = v_2d[r["min_row"]:r["max_row"] + 1, r["min_col"]:r["max_col"] + 1] land_mask = (u_region == 0) & (v_region == 0) u_list = convert_and_round(u_region, land_mask) v_list = convert_and_round(v_region, land_mask) return { "value": [u_list, v_list], "grid": { "lonInterval": r["lon_intervals"], "boundLonLat": r["bound_lon_lat"], "rows": r["rows"], "cols": r["cols"], "latInterval": r["lat_intervals"], }, } def extract_uv_within_box(nc_file, center_lon, center_lat, target_time, box_size_km=25): """ 선택한 포인트와 시간으로부터 상하좌우 정사각형 범위 내의 u, v 데이터 추출 Parameters ---------- nc_file : str NetCDF 파일 경로 center_lon : float 중심점 경도 center_lat : float 중심점 위도 target_time : str or datetime 목표 시간 (예: '2024-01-15 12:00:00') box_size_km : float 상하좌우 범위 (km), 기본값 25km Returns ------- result : dict 추출된 데이터를 담은 딕셔너리 """ with xr.open_dataset(nc_file) as ds: region = _compute_hydr_region(ds, center_lon, center_lat, box_size_km) time_idx, _ = find_time_index(ds, target_time) return _extract_uv_at_time(ds, time_idx, region)