wing-ops/prediction/opendrift/calcCostlineLength.py
jeonghyo.k 88eb6b121a feat(prediction): OpenDrift 유류 확산 시뮬레이션 통합 + CCTV/관리자 고도화
[예측]
- OpenDrift Python API 서버 및 스크립트 추가 (prediction/opendrift/)
- 시뮬레이션 상태 폴링 훅(useSimulationStatus), 로딩 오버레이 추가
- HydrParticleOverlay: deck.gl 기반 입자 궤적 시각화 레이어
- OilSpillView/LeftPanel/RightPanel: 시뮬레이션 실행·결과 표시 UI 개편
- predictionService/predictionRouter: 시뮬레이션 CRUD 및 상태 관리 API
- simulation.ts: OpenDrift 연동 엔드포인트 확장
- docs/PREDICTION-GUIDE.md: 예측 기능 개발 가이드 추가

[CCTV/항공방제]
- CCTV 오일 감지 GPU 추론 연동 (OilDetectionOverlay, useOilDetection)
- CCTV 안전관리 감지 기능 추가 (선박 출입, 침입 감지)
- oil_inference_server.py: Python GPU 추론 서버

[관리자]
- 관리자 화면 고도화 (사용자/권한/게시판/선박신호 패널)
- AdminSidebar, BoardMgmtPanel, VesselSignalPanel 신규 컴포넌트

[기타]
- DB: 시뮬레이션 결과, 선박보험 시드(1391건), 역할 정리 마이그레이션
- 팀 워크플로우 v1.6.1 동기화

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-09 14:55:46 +09:00

253 lines
9.5 KiB
Python

"""
calcCostlineLength.py
기름 유출로 오염된 해안선 길이를 계산하는 모듈
Thread-safe하며 다른 스크립트에서 import하여 사용 가능
사용 예시:
from calcCostlineLength import OilSpillCoastlineAnalyzer
analyzer = OilSpillCoastlineAnalyzer("coastline.shp")
length, info = analyzer.calculate_polluted_length(particles)
"""
import geopandas as gpd
import numpy as np
from scipy.spatial import cKDTree
from typing import List, Dict, Tuple, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
from logger import get_logger
from utils import haversine_distance
logger = get_logger("calcCostlineLength")
class OilSpillCoastlineAnalyzer:
    """Estimate how much coastline is polluted by stranded oil particles.

    The coastline is read from a vector file, reprojected to EPSG:4326 when
    the file declares a different CRS, optionally cropped to a region of
    interest, and indexed with a KD-tree built from the end points of every
    line segment. The query methods only read instance state, which is what
    lets ``calculate_polluted_length_batch`` share one analyzer between its
    worker threads.

    Attributes:
        coastline_gdf: Coastline GeoDataFrame (after CRS conversion,
            filtering and simplification).
        buffer_distance: Maximum particle-to-vertex distance for a match.
            It is compared with KD-tree distances computed in lon/lat
            space, so it is expressed in degrees.
        simplify_tolerance: Tolerance handed to ``GeoSeries.simplify``;
            a value <= 0 disables simplification.
        coastline_points: ``(N, 2)`` NumPy array of segment end points.
        kdtree: ``cKDTree`` over ``coastline_points``, or ``None`` when
            there is nothing to index.
        segment_info: Tuple that gives, for each row of
            ``coastline_points``, the key of the segment owning that point.
        segment_lengths: Mapping of segment key to segment length, as
            returned by ``haversine_distance(..., return_km=False)``.
    """

    def __init__(self, coastline_shp_path: str, buffer_distance: float = 0.001,
                 simplify_tolerance: float = 0.001,
                 bbox: Optional[Tuple[float, float, float, float]] = None,
                 center_point: Optional[Tuple[float, float]] = None,
                 radius: Optional[float] = None):
        """Load the coastline file and build the spatial index.

        Args:
            coastline_shp_path: Path of the coastline file to read.
            buffer_distance: Matching distance in degrees.
            simplify_tolerance: Geometry simplification tolerance
                (<= 0 turns simplification off).
            bbox: Optional ``(minx, miny, maxx, maxy)`` crop box. Takes
                precedence over ``center_point`` / ``radius``.
            center_point: Optional ``(lon, lat)`` centre of a square crop
                window; only used together with ``radius``.
            radius: Half-width of that window, in degrees.

        Raises:
            FileNotFoundError: If ``coastline_shp_path`` does not exist.
        """
        if not os.path.exists(coastline_shp_path):
            raise FileNotFoundError(f"Coastline file not found: {coastline_shp_path}")

        self.coastline_gdf = gpd.read_file(coastline_shp_path)
        # Only reproject when a CRS is declared; a file without CRS is used as-is.
        if self.coastline_gdf.crs and self.coastline_gdf.crs != 'EPSG:4326':
            self.coastline_gdf = self.coastline_gdf.to_crs('EPSG:4326')
        logger.info(f"Original coastline features: {len(self.coastline_gdf):,}")

        if bbox is not None:
            self._filter_by_bbox(bbox)
        elif center_point is not None and radius is not None:
            self._filter_by_center(center_point, radius)
        elif center_point is not None or radius is not None:
            # Previously ignored without any trace; still not an error so
            # existing callers keep working.
            logger.warning("center_point and radius must be given together; "
                           "coastline was not filtered")

        self.buffer_distance = buffer_distance
        self.simplify_tolerance = simplify_tolerance
        self._build_spatial_index()

    def _filter_by_bbox(self, bbox: Tuple[float, float, float, float]):
        """Keep only features whose bounding box intersects ``bbox``.

        Args:
            bbox: ``(minx, miny, maxx, maxy)`` in the GeoDataFrame's
                coordinates.
        """
        minx, miny, maxx, maxy = bbox
        bounds = self.coastline_gdf.bounds
        mask = (
            (bounds['minx'] <= maxx) & (bounds['maxx'] >= minx) &
            (bounds['miny'] <= maxy) & (bounds['maxy'] >= miny)
        )
        total = len(mask)
        self.coastline_gdf = self.coastline_gdf[mask].copy()
        kept = len(self.coastline_gdf)
        # Guard the percentage so an empty input file cannot divide by zero.
        retained_pct = kept / total * 100 if total else 0.0
        logger.info(f"Filtered features: {kept:,} "
                    f"({retained_pct:.1f}% retained)")

    def _filter_by_center(self, center_point: Tuple[float, float], radius: float):
        """Crop the coastline to a square window around ``center_point``.

        Args:
            center_point: ``(lon, lat)`` of the window centre.
            radius: Half-width of the window, added to / subtracted from
                both coordinates (so it is in degrees).
        """
        lon, lat = center_point
        self._filter_by_bbox((lon - radius, lat - radius, lon + radius, lat + radius))

    def _set_empty_index(self):
        """Put the index attributes in their "no coastline" state."""
        self.coastline_points = np.array([]).reshape(0, 2)
        self.segment_info = tuple()
        self.segment_lengths = {}
        self.kdtree = None

    @staticmethod
    def _index_line(line, feature_idx, key_suffix, points, owners, lengths):
        """Append every segment of one line to the index accumulators.

        Segment keys are ``(feature_idx, segment_idx) + key_suffix``. Both
        end points of a segment are stored and both point at the segment
        key, so a shared vertex appears once per adjacent segment.
        """
        # Drop a possible Z ordinate: particles are queried in 2-D.
        coords = np.array(line.coords)[:, :2]
        for i in range(len(coords) - 1):
            start, end = coords[i], coords[i + 1]
            seg_key = (feature_idx, i) + key_suffix
            lengths[seg_key] = haversine_distance(
                start[0], start[1], end[0], end[1], return_km=False
            )
            points.extend((start, end))
            owners.extend((seg_key, seg_key))

    def _build_spatial_index(self):
        """Build the KD-tree and the per-segment lookup tables.

        Only ``LineString`` and ``MultiLineString`` geometries contribute
        segments; other geometry types are ignored. When nothing can be
        indexed, the analyzer is left in the empty state (``kdtree`` is
        ``None``) instead of building a tree from zero points.
        """
        if len(self.coastline_gdf) == 0:
            logger.warning("No coastline after filtering!")
            self._set_empty_index()
            return

        if self.simplify_tolerance > 0:
            self.coastline_gdf.geometry = self.coastline_gdf.geometry.simplify(
                self.simplify_tolerance, preserve_topology=False
            )

        points = []
        owners = []
        lengths = {}
        for idx, geom in enumerate(self.coastline_gdf.geometry):
            # Missing geometries show up as None; treat them like empty ones.
            if geom is None or geom.is_empty:
                continue
            if geom.geom_type == 'LineString':
                self._index_line(geom, idx, (), points, owners, lengths)
            elif geom.geom_type == 'MultiLineString':
                for line_idx, line in enumerate(geom.geoms):
                    if not line.is_empty:
                        self._index_line(line, idx, (line_idx,), points, owners, lengths)

        if not points:
            # cKDTree cannot be built from an empty, shapeless array.
            logger.warning("No line segments found in coastline geometries!")
            self._set_empty_index()
            return

        self.coastline_points = np.array(points)
        self.segment_info = tuple(owners)
        self.segment_lengths = lengths
        self.kdtree = cKDTree(self.coastline_points)

    def calculate_polluted_length(self, particles: List[Dict]) -> Tuple[float, Dict]:
        """Compute the total length of coastline touched by stranded particles.

        A particle counts when its ``"stranded"`` value equals 1 and its
        nearest indexed coastline point is closer than ``buffer_distance``.
        The segment owning that nearest point is marked as polluted and
        each polluted segment is counted once. The method does not modify
        the analyzer.

        Args:
            particles: Particle dicts of the form
                ``{"lon": float, "lat": float, "stranded": int}``.

        Returns:
            ``(total_length, detail)`` where ``total_length`` is the sum of
            the polluted segments' lengths and ``detail`` holds the keys
            ``polluted_segments``, ``total_particles``,
            ``stranded_particles`` and ``affected_particles_in_buffer``.
        """
        stranded = [p for p in particles if p.get('stranded', 0) == 1]
        detail_info = {
            "polluted_segments": 0,
            "total_particles": len(particles),
            "stranded_particles": len(stranded),
            "affected_particles_in_buffer": 0
        }
        if not stranded or self.kdtree is None or len(self.coastline_points) == 0:
            return 0.0, detail_info

        particle_coords = np.array([[p['lon'], p['lat']] for p in stranded])
        distances, indices = self.kdtree.query(particle_coords, k=1)
        in_buffer = distances < self.buffer_distance
        if not in_buffer.any():
            return 0.0, detail_info

        # A set so that several particles on one segment count it once.
        polluted = {self.segment_info[i] for i in indices[in_buffer]}
        total_length = sum(self.segment_lengths[seg] for seg in polluted)
        detail_info["polluted_segments"] = len(polluted)
        detail_info["affected_particles_in_buffer"] = int(in_buffer.sum())
        return total_length, detail_info

    def calculate_polluted_length_batch(self,
                                        particle_batches: List[List[Dict]],
                                        max_workers: Optional[int] = None) -> List[Tuple[float, Dict]]:
        """Run ``calculate_polluted_length`` on several batches in threads.

        Args:
            particle_batches: One particle list per computation.
            max_workers: Thread pool size (``None`` uses the executor default).

        Returns:
            One ``(total_length, detail)`` tuple per batch, in the same
            order as ``particle_batches`` so ``results[i]`` belongs to
            ``particle_batches[i]``.
        """
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Executor.map yields results in submission order, unlike
            # as_completed, which made the result order nondeterministic.
            return list(executor.map(self.calculate_polluted_length, particle_batches))

    def get_info(self) -> Dict:
        """Return summary figures about the analyzer.

        Returns:
            Dict with the buffer distance, the number of distinct indexed
            segments, the number of indexed points and the number of
            coastline features.
        """
        return {
            "buffer_distance": self.buffer_distance,
            "total_coastline_segments": len(set(self.segment_info)),
            "total_coastline_points": len(self.coastline_points),
            "coastline_features": len(self.coastline_gdf)
        }
def create_analyzer(coastline_shp_path: str,
                    buffer_distance: float = 0.001,
                    simplify_tolerance: float = 0.001,
                    bbox: Optional[Tuple[float, float, float, float]] = None,
                    center_point: Optional[Tuple[float, float]] = None,
                    radius: Optional[float] = None) -> OilSpillCoastlineAnalyzer:
    """Convenience factory for ``OilSpillCoastlineAnalyzer``.

    Every argument is forwarded unchanged to the class constructor.

    Args:
        coastline_shp_path: Path of the coastline file.
        buffer_distance: Particle matching distance.
        simplify_tolerance: Geometry simplification tolerance.
        bbox: Optional ``(minx, miny, maxx, maxy)`` crop box.
        center_point: Optional centre of a crop window.
        radius: Optional half-width of that window.

    Returns:
        The newly constructed analyzer.
    """
    # Forward by keyword so each value is visibly tied to its parameter.
    analyzer = OilSpillCoastlineAnalyzer(
        coastline_shp_path,
        buffer_distance=buffer_distance,
        simplify_tolerance=simplify_tolerance,
        bbox=bbox,
        center_point=center_point,
        radius=radius,
    )
    return analyzer
def calculate_single(coastline_shp_path: str,
                     particles: List[Dict],
                     buffer_distance: float = 0.001) -> Tuple[float, Dict]:
    """One-shot helper: build an analyzer and evaluate a single particle list.

    A fresh ``OilSpillCoastlineAnalyzer`` is constructed on every call, with
    all constructor options other than ``buffer_distance`` left at their
    defaults.

    Args:
        coastline_shp_path: Path of the coastline file.
        particles: Particle dicts passed to ``calculate_polluted_length``.
        buffer_distance: Particle matching distance.

    Returns:
        Whatever ``calculate_polluted_length`` returns for ``particles``:
        a ``(total_length, detail)`` tuple.
    """
    return OilSpillCoastlineAnalyzer(
        coastline_shp_path, buffer_distance=buffer_distance
    ).calculate_polluted_length(particles)