wing-ops/prediction/image/mx15hdi/Polygon/Scripts/Oilshape.py
jeonghyo.k 3946ff6a25 feat(prediction): 이미지 분석 서버 Docker 패키징 + DB 코드 제거
- prediction/image/ FastAPI 서버 Docker 환경 구성
  - Dockerfile: PyTorch 2.1 + CUDA 12.1 기반 GPU 이미지
  - docker-compose.yml: GPU 할당 + 데이터 볼륨 마운트
  - requirements.txt: 서버 의존성 목록
  - .env.example: 환경변수 템플릿
  - DOCKER_USAGE.md: 빌드/실행/API 사용법 문서
  - Dockerfile에 .dockerignore 제외 폴더 mkdir -p 추가
- .gitignore: prediction/image 결과물 및 모델 가중치(.pth) 제외 추가
- dbInsert_csv.py, dbInsert_shp.py 삭제 (미사용 DB 로직)
- api.py: dbInsert import 및 주석 처리된 DB 호출 코드 제거
- aerialRouter.ts: req.params 타입 오류 수정
2026-03-10 18:37:36 +09:00

250 lines
8.4 KiB
Python

import numpy as np
import cv2, os, rasterio
import geopandas as gpd
from shapely.geometry import Polygon
from typing import List, Tuple, Dict, Optional
from pathlib import Path
import sys
''' Class ID:
1: Black oil
2: Brown oil
3: Rainbow oil
4: Silver oil
'''
_SCRIPTS_DIR = Path(__file__).parent # mx15hdi/Polygon/Scripts/
_MX15HDI_DIR = _SCRIPTS_DIR.parent.parent # mx15hdi/
def get_class_mask(mask: np.ndarray,
class_colors: Dict[int, Tuple[int, int, int]],
threshold: int = 30) -> np.ndarray:
"""
Convert noisy RGB mask to a class ID mask using color distance threshold.
"""
h, w, _ = mask.shape
class_mask = np.zeros((h, w), dtype=np.uint8)
for class_id, target_color in class_colors.items():
diff = mask.astype(np.int16) - np.array(target_color, dtype=np.int16)
dist = np.linalg.norm(diff, axis=2)
class_mask[dist < threshold] = class_id
return class_mask
def mask_to_polygons(mask: np.ndarray,
class_colors: Dict[int, Tuple[int, int, int]] = None,
min_area: int = 0,
# simplify: bool = False,
simplify: bool = True,
threshold: int = 30) -> Dict[int, List[Dict]]:
if class_colors is None:
class_colors = {
0: (0, 0, 0), # Background
1: (0, 0, 204), # Black oil
2: (180, 180, 180), # Brown oil
3: (255, 255, 0), # Rainbow oil
4: (178, 102, 255) # Silver oil
}
polygons = {}
if len(mask.shape) == 3:
class_mask = get_class_mask(mask, class_colors, threshold)
else:
class_mask = mask.astype(np.uint8)
for class_id in class_colors:
if class_id == 0:
continue
binary_mask = (class_mask == class_id).astype(np.uint8)
contours, hierarchy = cv2.findContours(binary_mask, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE)
if hierarchy is None or len(contours) == 0:
polygons[class_id] = []
continue
hierarchy = hierarchy[0]
class_polygons = []
def is_valid(contour):
return cv2.contourArea(contour) >= min_area and len(contour) >= 3
for idx, (cnt, h) in enumerate(zip(contours, hierarchy)):
parent = h[3]
if parent != -1:
continue
if not is_valid(cnt):
continue
if simplify:
epsilon = 0.0005 * cv2.arcLength(cnt, True)
exterior = cv2.approxPolyDP(cnt, epsilon, True).reshape(-1, 2)
else:
exterior = cnt.reshape(-1, 2)
holes = []
child_id = h[2]
while child_id != -1:
hole_cnt = contours[child_id]
if is_valid(hole_cnt):
if simplify:
eps_h = 0.005 * cv2.arcLength(hole_cnt, True)
hole = cv2.approxPolyDP(hole_cnt, eps_h, True).reshape(-1, 2)
else:
hole = hole_cnt.reshape(-1, 2)
holes.append(hole)
child_id = hierarchy[child_id][0]
class_polygons.append({'exterior': exterior, 'holes': holes})
polygons[class_id] = class_polygons
return polygons
def pixel_to_geo(coords: np.ndarray, transform) -> List[Tuple[float, float]]:
"""픽셀 좌표 배열을 지리 좌표 목록으로 변환한다 (벡터화 처리)."""
xs, ys = coords[:, 0], coords[:, 1]
lons, lats = rasterio.transform.xy(transform, ys, xs, offset='center')
return list(zip(lons, lats))
def save_polygons_to_shapefile(polygons: Dict[int, List[Dict]],
transform,
crs,
output_path: str):
class_thickness_mm = {
1: 1.0, # Black oil (Emulsion)
2: 0.1, # Brown oil (Crude)
3: 0.0003, # Rainbow oil (Slick)
4: 0.0001 # Silver oil (Slick)
}
class_notes = {
1: "Black - Emulsion",
2: "Brown - Crude",
3: "Rainbow/Silver - Slick",
4: "Rainbow/Silver - Slick"
}
records = []
for class_id, class_polys in polygons.items():
for poly in class_polys:
exterior_coords = np.array(poly['exterior'])
exterior_geo = pixel_to_geo(exterior_coords, transform)
if exterior_geo[0] != exterior_geo[-1]:
exterior_geo.append(exterior_geo[0])
holes_geo = []
for hole in poly['holes']:
hole_coords = np.array(hole)
hole_geo = pixel_to_geo(hole_coords, transform)
if hole_geo[0] != hole_geo[-1]:
hole_geo.append(hole_geo[0])
holes_geo.append(hole_geo)
shape = Polygon(shell=exterior_geo, holes=holes_geo if holes_geo else None)
shape = shape.buffer(0)
if not shape.is_valid or shape.is_empty:
continue
area = shape.area
thickness_m = class_thickness_mm.get(class_id, 0) / 1000.0
volume = area * thickness_m
note = class_notes.get(class_id, "Unknown")
records.append({
'geometry': shape,
'class_id': class_id,
'area_m2': area,
'volume_m3': volume,
'note': note
})
if not records:
print("No valid polygons to save for:", output_path)
return
gdf = gpd.GeoDataFrame(records, crs=crs)
gdf.to_file(output_path)
print(f"Saved shapefile: {output_path}")
def _process_mask_entry(filename: str, mask_data: np.ndarray, transform, crs,
output_shp_folder: Path):
"""하나의 mask 배열(메모리 또는 디스크 읽기 후)을 폴리곤으로 변환하여 저장한다."""
shp_output_path = output_shp_folder / (Path(filename).stem + ".shp")
mask = mask_data
if mask.ndim == 3 and mask.shape[0] in (1, 3, 4):
# (C, H, W) → (H, W, C)
mask = np.transpose(mask, (1, 2, 0))
if mask.shape[2] > 3:
mask = mask[:, :, :3]
elif mask.ndim == 3 and mask.shape[2] == 1:
mask = np.squeeze(mask)
polygons = mask_to_polygons(mask, simplify=False, threshold=30)
save_polygons_to_shapefile(polygons, transform, crs, str(shp_output_path))
def run_oilshape(file_id: str, georef_cache: Optional[dict] = None):
"""
file_id 기준 마스크에서 유류 폴리곤을 추출하여 Shapefile로 저장한다.
Args:
file_id: 처리할 세션 식별자.
georef_cache: run_georeference()의 반환값. 있으면 Mask_Tif 디스크 읽기 생략.
{image_filename: {'mask': ndarray, 'transform': ..., 'crs': ...}}
결과: mx15hdi/Polygon/Shp/{file_id}/*.shp
"""
output_shp_folder = _MX15HDI_DIR / 'Polygon' / 'Shp' / file_id
os.makedirs(output_shp_folder, exist_ok=True)
if georef_cache:
# In-memory 경로: Mask_Tif 디스크 읽기 없이 메모리 배열 사용
for filename, entry in georef_cache.items():
mask_data = entry.get('mask')
transform = entry.get('transform')
crs = entry.get('crs')
if mask_data is None:
print(f"mask 없음, 건너뜀: {filename}")
continue
_process_mask_entry(filename, mask_data, transform, crs, output_shp_folder)
else:
# 디스크 폴백: Mask_Tif 폴더에서 읽기
mask_folder = _MX15HDI_DIR / 'Georeference' / 'Mask_Tif' / file_id
if not mask_folder.exists():
raise FileNotFoundError(f"마스크 폴더가 존재하지 않습니다: {mask_folder}")
for filename in os.listdir(mask_folder):
if not filename.endswith(".tif"):
continue
tif_mask_path = mask_folder / filename
with rasterio.open(tif_mask_path) as src:
mask_data = src.read()
transform = src.transform
crs = src.crs
_process_mask_entry(filename, mask_data, transform, crs, output_shp_folder)
if __name__ == "__main__":
if len(sys.argv) < 2:
raise ValueError("파라미터가 제공되지 않았습니다. 폴더 이름을 명령줄 인자로 입력해주세요.")
run_oilshape(sys.argv[1])