""" calcCostlineLength.py 기름 유출로 오염된 해안선 길이를 계산하는 모듈 Thread-safe하며 다른 스크립트에서 import하여 사용 가능 사용 예시: from calcCostlineLength import OilSpillCoastlineAnalyzer analyzer = OilSpillCoastlineAnalyzer("coastline.shp") length, info = analyzer.calculate_polluted_length(particles) """ import geopandas as gpd import numpy as np from scipy.spatial import cKDTree from typing import List, Dict, Tuple, Optional from concurrent.futures import ThreadPoolExecutor, as_completed import os from logger import get_logger from utils import haversine_distance logger = get_logger("calcCostlineLength") class OilSpillCoastlineAnalyzer: """ 기름 유출로 오염된 해안선 길이를 계산하는 클래스 (Thread-Safe) Attributes: coastline_gdf: 해안선 GeoDataFrame buffer_distance: 입자 매칭 버퍼 거리 (도 단위) coastline_points: 해안선 점들의 NumPy 배열 kdtree: 공간 검색을 위한 KD-Tree segment_info: 세그먼트 정보 튜플 segment_lengths: 세그먼트 길이 배열 (미터) """ def __init__(self, coastline_shp_path: str, buffer_distance: float = 0.001, simplify_tolerance: float = 0.001, bbox: Optional[Tuple[float, float, float, float]] = None, center_point: Optional[Tuple[float, float]] = None, radius: Optional[float] = None): if not os.path.exists(coastline_shp_path): raise FileNotFoundError(f"Coastline file not found: {coastline_shp_path}") self.coastline_gdf = gpd.read_file(coastline_shp_path) if self.coastline_gdf.crs and self.coastline_gdf.crs != 'EPSG:4326': self.coastline_gdf = self.coastline_gdf.to_crs('EPSG:4326') logger.info(f"Original coastline features: {len(self.coastline_gdf):,}") if bbox is not None: self._filter_by_bbox(bbox) elif center_point is not None and radius is not None: self._filter_by_center(center_point, radius) self.buffer_distance = buffer_distance self.simplify_tolerance = simplify_tolerance self._build_spatial_index() def _filter_by_bbox(self, bbox: Tuple[float, float, float, float]): """경계 상자로 해안선 필터링""" minx, miny, maxx, maxy = bbox bounds = self.coastline_gdf.bounds mask = ( (bounds['minx'] <= maxx) & (bounds['maxx'] >= minx) & (bounds['miny'] <= maxy) & (bounds['maxy'] >= miny) ) self.coastline_gdf = self.coastline_gdf[mask].copy() logger.info(f"Filtered features: {len(self.coastline_gdf):,} " f"({len(self.coastline_gdf) / len(mask) * 100:.1f}% retained)") def _filter_by_center(self, center_point: Tuple[float, float], radius: float): """중심점과 반경으로 해안선 필터링""" lon, lat = center_point bbox = (lon - radius, lat - radius, lon + radius, lat + radius) self._filter_by_bbox(bbox) def _build_spatial_index(self): """해안선의 공간 인덱스 구축 (KD-Tree 사용)""" if len(self.coastline_gdf) == 0: logger.warning("No coastline after filtering!") self.coastline_points = np.array([]).reshape(0, 2) self.segment_info = tuple() self.segment_lengths = {} self.kdtree = None return coastline_points = [] segment_info = [] segment_lengths = {} if self.simplify_tolerance > 0: self.coastline_gdf.geometry = self.coastline_gdf.geometry.simplify( self.simplify_tolerance, preserve_topology=False ) for idx, geom in enumerate(self.coastline_gdf.geometry): if geom.is_empty: continue if geom.geom_type == 'LineString': coords = np.array(geom.coords) for i in range(len(coords) - 1): p1 = coords[i] p2 = coords[i + 1] seg_key = (idx, i) if seg_key not in segment_lengths: length_m = haversine_distance(p1[0], p1[1], p2[0], p2[1], return_km=False) segment_lengths[seg_key] = length_m coastline_points.append(p1) segment_info.append(seg_key) coastline_points.append(p2) segment_info.append(seg_key) elif geom.geom_type == 'MultiLineString': for line_idx, line in enumerate(geom.geoms): if line.is_empty: continue coords = np.array(line.coords) for i in range(len(coords) - 1): p1 = coords[i] p2 = coords[i + 1] seg_key = (idx, i, line_idx) if seg_key not in segment_lengths: length_m = haversine_distance(p1[0], p1[1], p2[0], p2[1], return_km=False) segment_lengths[seg_key] = length_m coastline_points.append(p1) segment_info.append(seg_key) coastline_points.append(p2) segment_info.append(seg_key) self.coastline_points = np.array(coastline_points) self.segment_info = tuple(segment_info) self.segment_lengths = segment_lengths self.kdtree = cKDTree(self.coastline_points) def calculate_polluted_length(self, particles: List[Dict]) -> Tuple[float, Dict]: """ 오염된 해안선 길이 계산 (완전 Thread-Safe) Args: particles: 입자 정보 리스트 각 입자는 {"lon": float, "lat": float, "stranded": int} 형태 Returns: tuple: (오염된 해안선 총 길이(m), 상세 정보 dict) """ stranded_particles = [p for p in particles if p.get('stranded', 0) == 1] if not stranded_particles: return 0.0, { "polluted_segments": 0, "total_particles": len(particles), "stranded_particles": 0, "affected_particles_in_buffer": 0 } if self.kdtree is None or len(self.coastline_points) == 0: return 0.0, { "polluted_segments": 0, "total_particles": len(particles), "stranded_particles": len(stranded_particles), "affected_particles_in_buffer": 0 } particle_coords = np.array([[p['lon'], p['lat']] for p in stranded_particles]) distances, indices = self.kdtree.query(particle_coords, k=1) valid_mask = distances < self.buffer_distance valid_indices = indices[valid_mask] if len(valid_indices) == 0: return 0.0, { "polluted_segments": 0, "total_particles": len(particles), "stranded_particles": len(stranded_particles), "affected_particles_in_buffer": 0 } polluted_segments = set() for idx in valid_indices: seg_info = self.segment_info[idx] polluted_segments.add(seg_info) total_length = sum(self.segment_lengths[seg] for seg in polluted_segments) detail_info = { "polluted_segments": len(polluted_segments), "total_particles": len(particles), "stranded_particles": len(stranded_particles), "affected_particles_in_buffer": int(valid_mask.sum()) } return total_length, detail_info def calculate_polluted_length_batch(self, particle_batches: List[List[Dict]], max_workers: Optional[int] = None) -> List[Tuple[float, Dict]]: """여러 입자 배치를 병렬로 처리""" results = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(self.calculate_polluted_length, batch): i for i, batch in enumerate(particle_batches)} for future in as_completed(futures): results.append(future.result()) return results def get_info(self) -> Dict: """분석기의 정보 반환""" return { "buffer_distance": self.buffer_distance, "total_coastline_segments": len(set(self.segment_info)), "total_coastline_points": len(self.coastline_points), "coastline_features": len(self.coastline_gdf) } def create_analyzer(coastline_shp_path: str, buffer_distance: float = 0.001, simplify_tolerance: float = 0.001, bbox: Optional[Tuple[float, float, float, float]] = None, center_point: Optional[Tuple[float, float]] = None, radius: Optional[float] = None) -> OilSpillCoastlineAnalyzer: """분석기 인스턴스 생성 (편의 함수)""" return OilSpillCoastlineAnalyzer(coastline_shp_path, buffer_distance, simplify_tolerance, bbox, center_point, radius) def calculate_single(coastline_shp_path: str, particles: List[Dict], buffer_distance: float = 0.001) -> Tuple[float, Dict]: """한 번만 계산하는 경우 사용하는 편의 함수""" analyzer = OilSpillCoastlineAnalyzer(coastline_shp_path, buffer_distance) return analyzer.calculate_polluted_length(particles)