wing-ops/prediction/opendrift/utils.py
jeonghyo.k 88eb6b121a feat(prediction): OpenDrift 유류 확산 시뮬레이션 통합 + CCTV/관리자 고도화
[예측]
- OpenDrift Python API 서버 및 스크립트 추가 (prediction/opendrift/)
- 시뮬레이션 상태 폴링 훅(useSimulationStatus), 로딩 오버레이 추가
- HydrParticleOverlay: deck.gl 기반 입자 궤적 시각화 레이어
- OilSpillView/LeftPanel/RightPanel: 시뮬레이션 실행·결과 표시 UI 개편
- predictionService/predictionRouter: 시뮬레이션 CRUD 및 상태 관리 API
- simulation.ts: OpenDrift 연동 엔드포인트 확장
- docs/PREDICTION-GUIDE.md: 예측 기능 개발 가이드 추가

[CCTV/항공방제]
- CCTV 오일 감지 GPU 추론 연동 (OilDetectionOverlay, useOilDetection)
- CCTV 안전관리 감지 기능 추가 (선박 출입, 침입 감지)
- oil_inference_server.py: Python GPU 추론 서버

[관리자]
- 관리자 화면 고도화 (사용자/권한/게시판/선박신호 패널)
- AdminSidebar, BoardMgmtPanel, VesselSignalPanel 신규 컴포넌트

[기타]
- DB: 시뮬레이션 결과, 선박보험 시드(1391건), 역할 정리 마이그레이션
- 팀 워크플로우 v1.6.1 동기화

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-09 14:55:46 +09:00

268 lines
7.9 KiB
Python

"""
utils.py
공통 유틸리티 함수 모듈
여러 모듈에서 중복 사용되는 함수들을 통합합니다.
"""
import os
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Optional, Tuple, List
from config import STORAGE, SIM, FilePatterns
def haversine_distance(lon1: float, lat1: float, lon2: float, lat2: float,
return_km: bool = True) -> float:
"""
두 지점 간의 Haversine 거리를 계산합니다.
Parameters
----------
lon1, lat1 : float
첫 번째 지점의 경도, 위도
lon2, lat2 : float
두 번째 지점의 경도, 위도
return_km : bool
True이면 km 단위, False이면 m 단위로 반환
Returns
-------
float
두 지점 간의 거리
"""
lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])
dlon = lon2 - lon1
dlat = lat2 - lat1
a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
c = 2 * np.arcsin(np.sqrt(a))
if return_km:
return c * SIM.EARTH_RADIUS_KM
else:
return c * SIM.EARTH_RADIUS_M
def find_time_index(ds, target_time) -> Tuple[int, datetime]:
"""
입력된 시간과 동일하거나 과거이면서 가장 가까운 시간의 인덱스를 찾습니다.
Parameters
----------
ds : xarray.Dataset
NetCDF 데이터셋
target_time : str or datetime
목표 시간 (예: '2024-01-15 12:00:00' 또는 datetime 객체)
Returns
-------
time_idx : int
선택된 시간의 인덱스
selected_time : datetime
선택된 시간
Raises
------
ValueError
목표 시간 이전의 데이터가 없는 경우
"""
if isinstance(target_time, str):
target_time = pd.to_datetime(target_time)
time_var = ds['time'].values
times = pd.to_datetime(time_var)
valid_times = times[times <= target_time]
if len(valid_times) == 0:
raise ValueError(f"목표 시간 {target_time} 이전의 데이터가 없습니다. "
f"데이터의 시작 시간: {times[0]}")
selected_time = valid_times.max()
time_idx = np.where(times == selected_time)[0][0]
return time_idx, selected_time
def convert_and_round(arr: np.ndarray, land_mask: np.ndarray,
land_value: int = 0, decimals: int = 3) -> List[List]:
"""
2D numpy 배열을 리스트로 변환하면서 반올림 및 육지 마스킹을 처리합니다.
Parameters
----------
arr : np.ndarray
변환할 2D 배열
land_mask : np.ndarray
육지 마스크 (True인 위치는 육지)
land_value : int
육지 값 (기본값: 0)
decimals : int
반올림 소수점 자릿수 (기본값: 3)
Returns
-------
List[List]
변환된 2D 리스트
"""
result = np.where(
land_mask | np.isnan(arr),
float(land_value),
np.round(arr.astype(float), decimals)
)
return result.tolist()
def check_nc_file_by_date(base_path: str, date_obj: datetime,
max_attempts: int = None) -> Tuple[Optional[str], Optional[datetime]]:
"""
주어진 날짜로 NC 파일이 존재하는지 확인하고, 없으면 이전 날짜로 재시도합니다.
Parameters
----------
base_path : str
기본 저장 경로
date_obj : datetime
시작 날짜
max_attempts : int, optional
최대 시도 횟수 (기본값: FILE_FALLBACK_DAYS)
Returns
-------
file_path : str or None
찾은 파일 경로, 없으면 None
final_date : datetime or None
최종 사용된 날짜, 없으면 None
"""
if max_attempts is None:
max_attempts = SIM.FILE_FALLBACK_DAYS
is_wind = "wind" in base_path.lower()
for _ in range(max_attempts):
date_str = date_obj.strftime("%Y%m%d")
dir_path = os.path.join(base_path, date_str)
if not os.path.exists(dir_path):
date_obj -= timedelta(days=1)
continue
if is_wind:
file_name = FilePatterns.get_wind_filename(date_str)
else:
file_name = FilePatterns.get_hydr_filename(date_str)
file_path = os.path.join(dir_path, file_name)
if os.path.exists(file_path):
return file_path, date_obj
date_obj -= timedelta(days=1)
return None, None
def check_nc_files_for_date(date_obj: datetime,
primary_wind_path: str = None,
fallback_wind_path: str = None,
primary_hydr_path: str = None,
fallback_hydr_path: str = None) -> Tuple[Optional[str], Optional[str], Optional[datetime], Optional[datetime]]:
"""
바람과 해양 NC 파일을 모두 확인합니다. (primary 경로 우선, fallback 경로 대체)
Parameters
----------
date_obj : datetime
시작 날짜
primary_wind_path : str, optional
바람 데이터 기본 경로 (기본값: /storage/pos_wind)
fallback_wind_path : str, optional
바람 데이터 대체 경로 (기본값: /storage/wind)
primary_hydr_path : str, optional
해양 데이터 기본 경로 (기본값: /storage/pos_hydr)
fallback_hydr_path : str, optional
해양 데이터 대체 경로 (기본값: /storage/hydr)
Returns
-------
wind_nc_path : str or None
ocean_nc_path : str or None
wind_date : datetime or None
ocean_date : datetime or None
"""
if primary_wind_path is None:
primary_wind_path = str(STORAGE.POS_WIND)
if fallback_wind_path is None:
fallback_wind_path = str(STORAGE.WIND)
if primary_hydr_path is None:
primary_hydr_path = str(STORAGE.POS_HYDR)
if fallback_hydr_path is None:
fallback_hydr_path = str(STORAGE.HYDR)
# 바람 파일 확인
wind_nc_path, wind_date = check_nc_file_by_date(primary_wind_path, date_obj)
if not wind_nc_path:
wind_nc_path, wind_date = check_nc_file_by_date(fallback_wind_path, date_obj)
# 해양 파일 확인
ocean_nc_path, ocean_date = check_nc_file_by_date(primary_hydr_path, date_obj)
if not ocean_nc_path:
ocean_nc_path, ocean_date = check_nc_file_by_date(fallback_hydr_path, date_obj)
return wind_nc_path, ocean_nc_path, wind_date, ocean_date
def check_img_file_by_date(date_obj: datetime, img_type: str,
max_attempts: int = None) -> Tuple[Optional[str], Optional[datetime]]:
"""
주어진 날짜로 이미지 폴더가 존재하는지 확인하고, 없으면 이전 날짜로 재시도합니다.
Parameters
----------
date_obj : datetime
시작 날짜
img_type : str
이미지 타입 (예: "wind", "hydr", "pos_wind", "pos_hydr")
max_attempts : int, optional
최대 시도 횟수 (기본값: FILE_FALLBACK_DAYS)
Returns
-------
folder_path : str or None
찾은 폴더 경로, 없으면 None
final_date : datetime or None
최종 사용된 날짜, 없으면 None
"""
if max_attempts is None:
max_attempts = SIM.FILE_FALLBACK_DAYS
for _ in range(max_attempts):
date_str = date_obj.strftime("%Y%m%d")
dir_path = os.path.join(f"/storage/{img_type}", date_str)
if not os.path.exists(dir_path):
date_obj -= timedelta(days=1)
continue
file_path = os.path.join(dir_path, "visual_image")
if os.path.exists(file_path):
return file_path, date_obj
date_obj -= timedelta(days=1)
return None, None
def kst_to_utc(dt: datetime) -> datetime:
"""KST 시간을 UTC로 변환합니다."""
return dt - timedelta(hours=SIM.TIMEZONE_OFFSET_HOURS)
def utc_to_kst(dt: datetime) -> datetime:
"""UTC 시간을 KST로 변환합니다."""
return dt + timedelta(hours=SIM.TIMEZONE_OFFSET_HOURS)