[예측] - OpenDrift Python API 서버 및 스크립트 추가 (prediction/opendrift/) - 시뮬레이션 상태 폴링 훅(useSimulationStatus), 로딩 오버레이 추가 - HydrParticleOverlay: deck.gl 기반 입자 궤적 시각화 레이어 - OilSpillView/LeftPanel/RightPanel: 시뮬레이션 실행·결과 표시 UI 개편 - predictionService/predictionRouter: 시뮬레이션 CRUD 및 상태 관리 API - simulation.ts: OpenDrift 연동 엔드포인트 확장 - docs/PREDICTION-GUIDE.md: 예측 기능 개발 가이드 추가 [CCTV/항공방제] - CCTV 오일 감지 GPU 추론 연동 (OilDetectionOverlay, useOilDetection) - CCTV 안전관리 감지 기능 추가 (선박 출입, 침입 감지) - oil_inference_server.py: Python GPU 추론 서버 [관리자] - 관리자 화면 고도화 (사용자/권한/게시판/선박신호 패널) - AdminSidebar, BoardMgmtPanel, VesselSignalPanel 신규 컴포넌트 [기타] - DB: 시뮬레이션 결과, 선박보험 시드(1391건), 역할 정리 마이그레이션 - 팀 워크플로우 v1.6.1 동기화 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
253 lines
9.5 KiB
Python
253 lines
9.5 KiB
Python
"""
|
|
calcCostlineLength.py
|
|
|
|
기름 유출로 오염된 해안선 길이를 계산하는 모듈
|
|
Thread-safe하며 다른 스크립트에서 import하여 사용 가능
|
|
|
|
사용 예시:
|
|
from calcCostlineLength import OilSpillCoastlineAnalyzer
|
|
|
|
analyzer = OilSpillCoastlineAnalyzer("coastline.shp")
|
|
length, info = analyzer.calculate_polluted_length(particles)
|
|
"""
|
|
|
|
import geopandas as gpd
|
|
import numpy as np
|
|
from scipy.spatial import cKDTree
|
|
from typing import List, Dict, Tuple, Optional
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import os
|
|
|
|
from logger import get_logger
|
|
from utils import haversine_distance
|
|
|
|
# Module-level logger shared by every function and method in this file;
# obtained from the project's `logger.get_logger` helper under this module's name.
logger = get_logger("calcCostlineLength")
|
|
|
|
|
|
class OilSpillCoastlineAnalyzer:
    """Compute the length of coastline polluted by stranded oil particles.

    The coastline is split into straight segments between consecutive
    vertices. A stranded particle pollutes the segment that owns the
    coastline vertex nearest to it, provided that vertex is closer than
    ``buffer_distance``. The polluted length is the sum of the lengths of
    all distinct polluted segments.

    ``calculate_polluted_length`` only reads instance attributes, which is
    what ``calculate_polluted_length_batch`` relies on when it calls it from
    several threads at once.

    Attributes:
        coastline_gdf: Coastline GeoDataFrame (reprojected to EPSG:4326 when
            the file declares a different CRS).
        buffer_distance: Maximum particle-to-vertex distance, in degrees.
        simplify_tolerance: Tolerance passed to ``GeoSeries.simplify``;
            values <= 0 disable simplification.
        coastline_points: ``(N, 2)`` NumPy array of segment end points.
        kdtree: ``cKDTree`` over ``coastline_points``, or ``None`` when there
            are no usable segments.
        segment_info: Tuple mapping each row of ``coastline_points`` to the
            key of the segment that row belongs to.
        segment_lengths: Dict mapping segment key -> segment length (metres).
    """

    def __init__(self, coastline_shp_path: str, buffer_distance: float = 0.001,
                 simplify_tolerance: float = 0.001,
                 bbox: Optional[Tuple[float, float, float, float]] = None,
                 center_point: Optional[Tuple[float, float]] = None,
                 radius: Optional[float] = None):
        """Load the coastline file, optionally crop it, and index it.

        Args:
            coastline_shp_path: Path of the coastline file read by geopandas.
            buffer_distance: Matching distance for particles, in degrees.
            simplify_tolerance: Geometry simplification tolerance (<= 0
                disables simplification).
            bbox: Optional ``(minx, miny, maxx, maxy)`` crop box. Takes
                precedence over ``center_point``/``radius``.
            center_point: Optional ``(lon, lat)`` centre of a square crop.
            radius: Half-width of the square crop, in degrees; only used
                together with ``center_point``.

        Raises:
            FileNotFoundError: If ``coastline_shp_path`` does not exist.
        """
        if not os.path.exists(coastline_shp_path):
            raise FileNotFoundError(f"Coastline file not found: {coastline_shp_path}")

        self.coastline_gdf = gpd.read_file(coastline_shp_path)

        # Everything downstream works in lon/lat degrees.
        if self.coastline_gdf.crs and self.coastline_gdf.crs != 'EPSG:4326':
            self.coastline_gdf = self.coastline_gdf.to_crs('EPSG:4326')
        logger.info(f"Original coastline features: {len(self.coastline_gdf):,}")

        if bbox is not None:
            self._filter_by_bbox(bbox)
        elif center_point is not None and radius is not None:
            self._filter_by_center(center_point, radius)
        elif center_point is not None or radius is not None:
            # Half-specified crop: previously ignored without any notice.
            logger.warning("center_point and radius must be given together; "
                           "coastline was not filtered")

        self.buffer_distance = buffer_distance
        self.simplify_tolerance = simplify_tolerance
        self._build_spatial_index()

    def _filter_by_bbox(self, bbox: Tuple[float, float, float, float]):
        """Keep only features whose bounding box intersects ``bbox``.

        Args:
            bbox: ``(minx, miny, maxx, maxy)`` in the GeoDataFrame's CRS.
        """
        minx, miny, maxx, maxy = bbox

        bounds = self.coastline_gdf.bounds
        mask = (
            (bounds['minx'] <= maxx) & (bounds['maxx'] >= minx) &
            (bounds['miny'] <= maxy) & (bounds['maxy'] >= miny)
        )

        total = len(mask)
        self.coastline_gdf = self.coastline_gdf[mask].copy()
        kept = len(self.coastline_gdf)
        # Guard the percentage: an already-empty GeoDataFrame has total == 0.
        retained_pct = kept / total * 100 if total else 0.0
        logger.info(f"Filtered features: {kept:,} "
                    f"({retained_pct:.1f}% retained)")

    def _filter_by_center(self, center_point: Tuple[float, float], radius: float):
        """Crop to the square of half-width ``radius`` around ``center_point``.

        Args:
            center_point: ``(lon, lat)`` of the square's centre.
            radius: Half-width of the square, in the same units as the
                coordinates (degrees).
        """
        lon, lat = center_point
        bbox = (lon - radius, lat - radius, lon + radius, lat + radius)
        self._filter_by_bbox(bbox)

    def _set_empty_index(self):
        """Reset the index attributes to the 'no coastline available' state."""
        self.coastline_points = np.array([]).reshape(0, 2)
        self.segment_info = tuple()
        self.segment_lengths = {}
        self.kdtree = None

    @staticmethod
    def _collect_line_segments(line, feature_idx, part_idx,
                               coastline_points, segment_info, segment_lengths):
        """Append every segment of one line to the three accumulators.

        Segment keys are ``(feature_idx, i)`` for a plain LineString
        (``part_idx is None``) and ``(feature_idx, i, part_idx)`` for a member
        of a MultiLineString, where ``i`` is the index of the segment's first
        vertex.
        """
        coords = np.array(line.coords)
        if len(coords) < 2:
            return
        # Drop Z (if any) so the KD-tree stays 2-D like the particle queries.
        coords = coords[:, :2]

        for i, (p1, p2) in enumerate(zip(coords[:-1], coords[1:])):
            if part_idx is None:
                seg_key = (feature_idx, i)
            else:
                seg_key = (feature_idx, i, part_idx)

            # NOTE(review): coordinates are passed in stored (x, y) order;
            # confirm this matches haversine_distance's expected argument order.
            segment_lengths[seg_key] = haversine_distance(
                p1[0], p1[1], p2[0], p2[1], return_km=False
            )

            # Both end points are indexed and both map back to this segment.
            coastline_points.append(p1)
            segment_info.append(seg_key)
            coastline_points.append(p2)
            segment_info.append(seg_key)

    def _build_spatial_index(self):
        """Build the KD-tree and the segment lookup tables from the coastline.

        Only ``LineString`` and ``MultiLineString`` geometries contribute
        segments; missing, empty, and other geometry types are skipped. When
        no segment is available the index is left empty (``kdtree`` is
        ``None``) instead of building a tree from an empty array.
        """
        if len(self.coastline_gdf) == 0:
            logger.warning("No coastline after filtering!")
            self._set_empty_index()
            return

        if self.simplify_tolerance > 0:
            self.coastline_gdf.geometry = self.coastline_gdf.geometry.simplify(
                self.simplify_tolerance, preserve_topology=False
            )

        coastline_points = []
        segment_info = []
        segment_lengths = {}

        for idx, geom in enumerate(self.coastline_gdf.geometry):
            # Rows may carry a missing geometry, and simplification can
            # leave empty ones behind.
            if geom is None or geom.is_empty:
                continue

            if geom.geom_type == 'LineString':
                self._collect_line_segments(
                    geom, idx, None,
                    coastline_points, segment_info, segment_lengths
                )
            elif geom.geom_type == 'MultiLineString':
                for line_idx, line in enumerate(geom.geoms):
                    if line.is_empty:
                        continue
                    self._collect_line_segments(
                        line, idx, line_idx,
                        coastline_points, segment_info, segment_lengths
                    )

        if not coastline_points:
            logger.warning("No line segments found in coastline geometries!")
            self._set_empty_index()
            return

        self.coastline_points = np.array(coastline_points)
        self.segment_info = tuple(segment_info)
        self.segment_lengths = segment_lengths
        self.kdtree = cKDTree(self.coastline_points)

    def calculate_polluted_length(self, particles: List[Dict]) -> Tuple[float, Dict]:
        """Compute the total length of coastline hit by stranded particles.

        Args:
            particles: Particle dicts of the form
                ``{"lon": float, "lat": float, "stranded": int}``. Only
                particles whose ``stranded`` value equals 1 are considered.

        Returns:
            tuple: ``(total polluted length in metres, detail dict)``. The
            detail dict has the keys ``polluted_segments``,
            ``total_particles``, ``stranded_particles`` and
            ``affected_particles_in_buffer``.
        """
        stranded_particles = [p for p in particles if p.get('stranded', 0) == 1]

        def _summary(segment_count: int, affected: int) -> Dict:
            return {
                "polluted_segments": segment_count,
                "total_particles": len(particles),
                "stranded_particles": len(stranded_particles),
                "affected_particles_in_buffer": affected
            }

        if not stranded_particles:
            return 0.0, _summary(0, 0)

        if self.kdtree is None or len(self.coastline_points) == 0:
            return 0.0, _summary(0, 0)

        particle_coords = np.array([[p['lon'], p['lat']] for p in stranded_particles])

        # Nearest coastline vertex per particle; distances are in degrees,
        # the same unit as buffer_distance.
        distances, indices = self.kdtree.query(particle_coords, k=1)

        valid_mask = distances < self.buffer_distance
        valid_indices = indices[valid_mask]

        if len(valid_indices) == 0:
            return 0.0, _summary(0, 0)

        # A set, so a segment hit by many particles is counted once.
        polluted_segments = {self.segment_info[idx] for idx in valid_indices}

        total_length = float(sum(self.segment_lengths[seg] for seg in polluted_segments))

        return total_length, _summary(len(polluted_segments), int(valid_mask.sum()))

    def calculate_polluted_length_batch(self,
                                        particle_batches: List[List[Dict]],
                                        max_workers: Optional[int] = None) -> List[Tuple[float, Dict]]:
        """Run ``calculate_polluted_length`` on several batches in parallel.

        Args:
            particle_batches: One particle list per batch.
            max_workers: Thread-pool size (``None`` uses the executor default).

        Returns:
            One ``(length, detail)`` tuple per batch, in the same order as
            ``particle_batches`` (``results[i]`` belongs to
            ``particle_batches[i]``).
        """
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Executor.map yields results in submission order, so each result
            # stays aligned with the batch it was computed from.
            return list(executor.map(self.calculate_polluted_length, particle_batches))

    def get_info(self) -> Dict:
        """Return summary statistics about the analyzer's configuration and index."""
        return {
            "buffer_distance": self.buffer_distance,
            "total_coastline_segments": len(set(self.segment_info)),
            "total_coastline_points": len(self.coastline_points),
            "coastline_features": len(self.coastline_gdf)
        }
|
|
|
|
|
|
def create_analyzer(coastline_shp_path: str,
                    buffer_distance: float = 0.001,
                    simplify_tolerance: float = 0.001,
                    bbox: Optional[Tuple[float, float, float, float]] = None,
                    center_point: Optional[Tuple[float, float]] = None,
                    radius: Optional[float] = None) -> OilSpillCoastlineAnalyzer:
    """Convenience factory: build an OilSpillCoastlineAnalyzer.

    Every argument is forwarded unchanged to the class constructor.
    """
    analyzer = OilSpillCoastlineAnalyzer(
        coastline_shp_path,
        buffer_distance=buffer_distance,
        simplify_tolerance=simplify_tolerance,
        bbox=bbox,
        center_point=center_point,
        radius=radius,
    )
    return analyzer
|
|
|
|
|
|
def calculate_single(coastline_shp_path: str,
                     particles: List[Dict],
                     buffer_distance: float = 0.001) -> Tuple[float, Dict]:
    """One-shot helper: build a throwaway analyzer and evaluate one particle set.

    Returns the ``(length, detail)`` tuple produced by
    ``OilSpillCoastlineAnalyzer.calculate_polluted_length``.
    """
    one_off = OilSpillCoastlineAnalyzer(coastline_shp_path, buffer_distance)
    result = one_off.calculate_polluted_length(particles)
    return result
|