import numpy as np import cv2, os, rasterio import geopandas as gpd from shapely.geometry import Polygon from typing import List, Tuple, Dict, Optional from pathlib import Path import sys ''' Class ID: 1: Black oil 2: Brown oil 3: Rainbow oil 4: Silver oil ''' _SCRIPTS_DIR = Path(__file__).parent # mx15hdi/Polygon/Scripts/ _MX15HDI_DIR = _SCRIPTS_DIR.parent.parent # mx15hdi/ def get_class_mask(mask: np.ndarray, class_colors: Dict[int, Tuple[int, int, int]], threshold: int = 30) -> np.ndarray: """ Convert noisy RGB mask to a class ID mask using color distance threshold. """ h, w, _ = mask.shape class_mask = np.zeros((h, w), dtype=np.uint8) for class_id, target_color in class_colors.items(): diff = mask.astype(np.int16) - np.array(target_color, dtype=np.int16) dist = np.linalg.norm(diff, axis=2) class_mask[dist < threshold] = class_id return class_mask def mask_to_polygons(mask: np.ndarray, class_colors: Dict[int, Tuple[int, int, int]] = None, min_area: int = 0, # simplify: bool = False, simplify: bool = True, threshold: int = 30) -> Dict[int, List[Dict]]: if class_colors is None: class_colors = { 0: (0, 0, 0), # Background 1: (0, 0, 204), # Black oil 2: (180, 180, 180), # Brown oil 3: (255, 255, 0), # Rainbow oil 4: (178, 102, 255) # Silver oil } polygons = {} if len(mask.shape) == 3: class_mask = get_class_mask(mask, class_colors, threshold) else: class_mask = mask.astype(np.uint8) for class_id in class_colors: if class_id == 0: continue binary_mask = (class_mask == class_id).astype(np.uint8) contours, hierarchy = cv2.findContours(binary_mask, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE) if hierarchy is None or len(contours) == 0: polygons[class_id] = [] continue hierarchy = hierarchy[0] class_polygons = [] def is_valid(contour): return cv2.contourArea(contour) >= min_area and len(contour) >= 3 for idx, (cnt, h) in enumerate(zip(contours, hierarchy)): parent = h[3] if parent != -1: continue if not is_valid(cnt): continue if simplify: epsilon = 0.0005 * cv2.arcLength(cnt, True) exterior = cv2.approxPolyDP(cnt, epsilon, True).reshape(-1, 2) else: exterior = cnt.reshape(-1, 2) holes = [] child_id = h[2] while child_id != -1: hole_cnt = contours[child_id] if is_valid(hole_cnt): if simplify: eps_h = 0.005 * cv2.arcLength(hole_cnt, True) hole = cv2.approxPolyDP(hole_cnt, eps_h, True).reshape(-1, 2) else: hole = hole_cnt.reshape(-1, 2) holes.append(hole) child_id = hierarchy[child_id][0] class_polygons.append({'exterior': exterior, 'holes': holes}) polygons[class_id] = class_polygons return polygons def pixel_to_geo(coords: np.ndarray, transform) -> List[Tuple[float, float]]: """픽셀 좌표 배열을 지리 좌표 목록으로 변환한다 (벡터화 처리).""" xs, ys = coords[:, 0], coords[:, 1] lons, lats = rasterio.transform.xy(transform, ys, xs, offset='center') return list(zip(lons, lats)) def save_polygons_to_shapefile(polygons: Dict[int, List[Dict]], transform, crs, output_path: str): class_thickness_mm = { 1: 1.0, # Black oil (Emulsion) 2: 0.1, # Brown oil (Crude) 3: 0.0003, # Rainbow oil (Slick) 4: 0.0001 # Silver oil (Slick) } class_notes = { 1: "Black - Emulsion", 2: "Brown - Crude", 3: "Rainbow/Silver - Slick", 4: "Rainbow/Silver - Slick" } records = [] for class_id, class_polys in polygons.items(): for poly in class_polys: exterior_coords = np.array(poly['exterior']) exterior_geo = pixel_to_geo(exterior_coords, transform) if exterior_geo[0] != exterior_geo[-1]: exterior_geo.append(exterior_geo[0]) holes_geo = [] for hole in poly['holes']: hole_coords = np.array(hole) hole_geo = pixel_to_geo(hole_coords, transform) if hole_geo[0] != hole_geo[-1]: hole_geo.append(hole_geo[0]) holes_geo.append(hole_geo) shape = Polygon(shell=exterior_geo, holes=holes_geo if holes_geo else None) shape = shape.buffer(0) if not shape.is_valid or shape.is_empty: continue area = shape.area thickness_m = class_thickness_mm.get(class_id, 0) / 1000.0 volume = area * thickness_m note = class_notes.get(class_id, "Unknown") records.append({ 'geometry': shape, 'class_id': class_id, 'area_m2': area, 'volume_m3': volume, 'note': note }) if not records: print("No valid polygons to save for:", output_path) return gdf = gpd.GeoDataFrame(records, crs=crs) gdf.to_file(output_path) print(f"Saved shapefile: {output_path}") def _process_mask_entry(filename: str, mask_data: np.ndarray, transform, crs, output_shp_folder: Path): """하나의 mask 배열(메모리 또는 디스크 읽기 후)을 폴리곤으로 변환하여 저장한다.""" shp_output_path = output_shp_folder / (Path(filename).stem + ".shp") mask = mask_data if mask.ndim == 3 and mask.shape[0] in (1, 3, 4): # (C, H, W) → (H, W, C) mask = np.transpose(mask, (1, 2, 0)) if mask.shape[2] > 3: mask = mask[:, :, :3] elif mask.ndim == 3 and mask.shape[2] == 1: mask = np.squeeze(mask) polygons = mask_to_polygons(mask, simplify=False, threshold=30) save_polygons_to_shapefile(polygons, transform, crs, str(shp_output_path)) def run_oilshape(file_id: str, georef_cache: Optional[dict] = None): """ file_id 기준 마스크에서 유류 폴리곤을 추출하여 Shapefile로 저장한다. Args: file_id: 처리할 세션 식별자. georef_cache: run_georeference()의 반환값. 있으면 Mask_Tif 디스크 읽기 생략. {image_filename: {'mask': ndarray, 'transform': ..., 'crs': ...}} 결과: mx15hdi/Polygon/Shp/{file_id}/*.shp """ output_shp_folder = _MX15HDI_DIR / 'Polygon' / 'Shp' / file_id os.makedirs(output_shp_folder, exist_ok=True) if georef_cache: # In-memory 경로: Mask_Tif 디스크 읽기 없이 메모리 배열 사용 for filename, entry in georef_cache.items(): mask_data = entry.get('mask') transform = entry.get('transform') crs = entry.get('crs') if mask_data is None: print(f"mask 없음, 건너뜀: {filename}") continue _process_mask_entry(filename, mask_data, transform, crs, output_shp_folder) else: # 디스크 폴백: Mask_Tif 폴더에서 읽기 mask_folder = _MX15HDI_DIR / 'Georeference' / 'Mask_Tif' / file_id if not mask_folder.exists(): raise FileNotFoundError(f"마스크 폴더가 존재하지 않습니다: {mask_folder}") for filename in os.listdir(mask_folder): if not filename.endswith(".tif"): continue tif_mask_path = mask_folder / filename with rasterio.open(tif_mask_path) as src: mask_data = src.read() transform = src.transform crs = src.crs _process_mask_entry(filename, mask_data, transform, crs, output_shp_folder) if __name__ == "__main__": if len(sys.argv) < 2: raise ValueError("파라미터가 제공되지 않았습니다. 폴더 이름을 명령줄 인자로 입력해주세요.") run_oilshape(sys.argv[1])