- prediction/image/ FastAPI 서버 Docker 환경 구성 - Dockerfile: PyTorch 2.1 + CUDA 12.1 기반 GPU 이미지 - docker-compose.yml: GPU 할당 + 데이터 볼륨 마운트 - requirements.txt: 서버 의존성 목록 - .env.example: 환경변수 템플릿 - DOCKER_USAGE.md: 빌드/실행/API 사용법 문서 - Dockerfile에 .dockerignore 제외 폴더 mkdir -p 추가 - .gitignore: prediction/image 결과물 및 모델 가중치(.pth) 제외 추가 - dbInsert_csv.py, dbInsert_shp.py 삭제 (미사용 DB 로직) - api.py: dbInsert import 및 주석 처리된 DB 호출 코드 제거 - aerialRouter.ts: req.params 타입 오류 수정
520 lines
18 KiB
Python
520 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
pic_gps.py
|
|
드론/폰 사진 폴더를 합성(스티칭)해서 한 장으로 저장하는 스크립트.
|
|
+ GPS 정보 보존 및 중앙 이미지 찾기 기능 추가
|
|
|
|
✅ 새로운 기능
|
|
- GPS 정보가 있는 이미지들의 중앙 좌표를 계산하여 결과 이미지에 저장
|
|
- 가장 중앙에 위치한 원본 이미지를 찾아 출력
|
|
- --gps-strategy 옵션으로 GPS 저장 방식 선택 (center/first)
|
|
|
|
설치:
|
|
pip install opencv-contrib-python numpy pillow piexif
|
|
|
|
예시:
|
|
python pic_gps.py --mode drone --input "./photo/drone" --out "./out/drone.jpg" --enhance
|
|
python pic_gps.py --gps-strategy center # GPS 중앙값 사용
|
|
python pic_gps.py --gps-strategy first # 첫 이미지 GPS 사용
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import glob
|
|
from typing import List, Tuple, Optional, Dict
|
|
import math
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from PIL import Image
|
|
import piexif
|
|
|
|
IMG_EXTS = (".jpg", ".jpeg", ".png", ".tif", ".tiff", ".bmp", ".webp")
|
|
|
|
|
|
def list_images(folder: str, prefix: str = "") -> List[str]:
|
|
if not os.path.isdir(folder):
|
|
return []
|
|
paths: List[str] = []
|
|
|
|
pattern_prefix = f"{prefix}*" if prefix else "*"
|
|
|
|
for ext in IMG_EXTS:
|
|
paths.extend(glob.glob(os.path.join(folder, f"{pattern_prefix}{ext}")))
|
|
paths.extend(glob.glob(os.path.join(folder, f"{pattern_prefix}{ext.upper()}")))
|
|
return sorted(set(paths))
|
|
|
|
|
|
def load_images(paths: List[str]) -> List[np.ndarray]:
|
|
images: List[np.ndarray] = []
|
|
for p in paths:
|
|
img = cv2.imread(p, cv2.IMREAD_COLOR)
|
|
if img is None:
|
|
print(f"[WARN] 이미지 읽기 실패: {p}", file=sys.stderr)
|
|
continue
|
|
images.append(img)
|
|
return images
|
|
|
|
|
|
def resize_max_dim(img: np.ndarray, max_dim: int) -> np.ndarray:
|
|
if max_dim <= 0:
|
|
return img
|
|
h, w = img.shape[:2]
|
|
m = max(h, w)
|
|
if m <= max_dim:
|
|
return img
|
|
scale = max_dim / float(m)
|
|
new_w = int(round(w * scale))
|
|
new_h = int(round(h * scale))
|
|
return cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
|
|
|
|
|
|
def clahe_contrast(img: np.ndarray) -> np.ndarray:
|
|
"""대비 보정: 바다/유막처럼 특징점 약할 때 도움"""
|
|
lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
|
|
l, a, b = cv2.split(lab)
|
|
clahe = cv2.createCLAHE(clipLimit=2.5, tileGridSize=(8, 8))
|
|
l2 = clahe.apply(l)
|
|
lab2 = cv2.merge([l2, a, b])
|
|
return cv2.cvtColor(lab2, cv2.COLOR_LAB2BGR)
|
|
|
|
|
|
def preprocess(images: List[np.ndarray], max_dim: int, enhance: bool) -> List[np.ndarray]:
|
|
out: List[np.ndarray] = []
|
|
for img in images:
|
|
x = resize_max_dim(img, max_dim)
|
|
if enhance:
|
|
x = clahe_contrast(x)
|
|
out.append(x)
|
|
return out
|
|
|
|
|
|
def stitch_with_mode(images: List[np.ndarray], stitch_mode: str) -> Tuple[int, np.ndarray]:
|
|
"""
|
|
stitch_mode: "PANORAMA" | "SCANS"
|
|
returns (status, pano)
|
|
"""
|
|
if stitch_mode.upper() == "SCANS":
|
|
mode = cv2.Stitcher_SCANS
|
|
else:
|
|
mode = cv2.Stitcher_PANORAMA
|
|
|
|
stitcher = cv2.Stitcher_create(mode)
|
|
|
|
try:
|
|
stitcher.setPanoConfidenceThresh(0.5)
|
|
except Exception:
|
|
pass
|
|
|
|
status, pano = stitcher.stitch(images)
|
|
return status, pano
|
|
|
|
|
|
def run_stitch(mode: str, images: List[np.ndarray], try_fallback: bool) -> np.ndarray:
|
|
"""
|
|
mode: "drone" | "phone"
|
|
- drone: SCANS 우선 → 실패 시 PANORAMA
|
|
- phone: PANORAMA 우선 → 실패 시 SCANS
|
|
"""
|
|
mode = mode.lower().strip()
|
|
if mode not in ("drone", "phone"):
|
|
raise ValueError("mode는 'drone' 또는 'phone' 이어야 합니다.")
|
|
|
|
primary = "SCANS" if mode == "drone" else "PANORAMA"
|
|
secondary = "PANORAMA" if primary == "SCANS" else "SCANS"
|
|
|
|
status, pano = stitch_with_mode(images, primary)
|
|
if status == cv2.Stitcher_OK:
|
|
print(f"[OK] Stitch success with {primary}")
|
|
return pano
|
|
|
|
print(f"[WARN] Stitch failed with {primary}. status={status}", file=sys.stderr)
|
|
|
|
if try_fallback:
|
|
status2, pano2 = stitch_with_mode(images, secondary)
|
|
if status2 == cv2.Stitcher_OK:
|
|
print(f"[OK] Stitch success with fallback {secondary}")
|
|
return pano2
|
|
print(f"[ERR] Stitch failed with fallback {secondary}. status={status2}", file=sys.stderr)
|
|
|
|
raise RuntimeError(
|
|
"스티칭에 실패했습니다.\n"
|
|
"가능 원인: 사진 겹침 부족 / 흔들림 / 시점차 과다 / 바다만 가득(특징점 부족)\n"
|
|
"해결 팁:\n"
|
|
"- 사진 간 30% 이상 겹침\n"
|
|
"- 해안선/부두/선박/부표 등 고정 기준물 포함된 사진을 섞기\n"
|
|
"- --max-dim 1800 정도로 낮추고 --enhance 켜서 재시도\n"
|
|
)
|
|
|
|
|
|
# ========== GPS 관련 함수 ==========
|
|
|
|
def dms_to_decimal(dms: tuple, ref: str) -> float:
|
|
"""
|
|
DMS (Degrees, Minutes, Seconds) 형식을 십진수로 변환
|
|
dms: ((deg_num, deg_den), (min_num, min_den), (sec_num, sec_den))
|
|
ref: 'N', 'S', 'E', 'W'
|
|
"""
|
|
degrees = dms[0][0] / dms[0][1]
|
|
minutes = dms[1][0] / dms[1][1]
|
|
seconds = dms[2][0] / dms[2][1]
|
|
|
|
decimal = degrees + (minutes / 60.0) + (seconds / 3600.0)
|
|
|
|
if ref in ['S', 'W']:
|
|
decimal = -decimal
|
|
|
|
return decimal
|
|
|
|
|
|
def extract_datetime(image_path: str) -> Optional[str]:
|
|
"""
|
|
이미지에서 촬영 날짜/시간 추출
|
|
returns: 촬영 날짜 문자열 (YYYY:MM:DD HH:MM:SS) 또는 None
|
|
"""
|
|
try:
|
|
img = Image.open(image_path)
|
|
exif_data = img.info.get('exif')
|
|
if not exif_data:
|
|
return None
|
|
|
|
exif_dict = piexif.load(exif_data)
|
|
exif_info = exif_dict.get('Exif', {})
|
|
|
|
# DateTimeOriginal (원본 촬영 시간) 우선
|
|
if piexif.ExifIFD.DateTimeOriginal in exif_info:
|
|
datetime_bytes = exif_info[piexif.ExifIFD.DateTimeOriginal]
|
|
return datetime_bytes.decode('utf-8')
|
|
|
|
# DateTime (파일 수정 시간)
|
|
if piexif.ExifIFD.DateTime in exif_info:
|
|
datetime_bytes = exif_info[piexif.ExifIFD.DateTime]
|
|
return datetime_bytes.decode('utf-8')
|
|
|
|
# 0th IFD의 DateTime
|
|
zeroth_info = exif_dict.get('0th', {})
|
|
if piexif.ImageIFD.DateTime in zeroth_info:
|
|
datetime_bytes = zeroth_info[piexif.ImageIFD.DateTime]
|
|
return datetime_bytes.decode('utf-8')
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
print(f"[WARN] 날짜 추출 실패 ({image_path}): {e}", file=sys.stderr)
|
|
return None
|
|
|
|
|
|
def extract_gps(image_path: str) -> Optional[Tuple[float, float, Optional[float]]]:
|
|
"""
|
|
이미지에서 GPS 정보 추출
|
|
returns: (latitude, longitude, altitude) 또는 None
|
|
"""
|
|
try:
|
|
img = Image.open(image_path)
|
|
exif_data = img.info.get('exif')
|
|
if not exif_data:
|
|
return None
|
|
|
|
exif_dict = piexif.load(exif_data)
|
|
gps_info = exif_dict.get('GPS', {})
|
|
|
|
if not gps_info:
|
|
return None
|
|
|
|
# 위도
|
|
if piexif.GPSIFD.GPSLatitude in gps_info and piexif.GPSIFD.GPSLatitudeRef in gps_info:
|
|
lat = dms_to_decimal(
|
|
gps_info[piexif.GPSIFD.GPSLatitude],
|
|
gps_info[piexif.GPSIFD.GPSLatitudeRef].decode()
|
|
)
|
|
else:
|
|
return None
|
|
|
|
# 경도
|
|
if piexif.GPSIFD.GPSLongitude in gps_info and piexif.GPSIFD.GPSLongitudeRef in gps_info:
|
|
lon = dms_to_decimal(
|
|
gps_info[piexif.GPSIFD.GPSLongitude],
|
|
gps_info[piexif.GPSIFD.GPSLongitudeRef].decode()
|
|
)
|
|
else:
|
|
return None
|
|
|
|
# 고도 (선택적)
|
|
altitude = None
|
|
if piexif.GPSIFD.GPSAltitude in gps_info:
|
|
alt_data = gps_info[piexif.GPSIFD.GPSAltitude]
|
|
altitude = alt_data[0] / alt_data[1]
|
|
|
|
return (lat, lon, altitude)
|
|
|
|
except Exception as e:
|
|
print(f"[WARN] GPS 추출 실패 ({image_path}): {e}", file=sys.stderr)
|
|
return None
|
|
|
|
|
|
def decimal_to_dms(decimal: float) -> Tuple[tuple, str]:
|
|
"""
|
|
십진수 좌표를 DMS 형식으로 변환
|
|
returns: (((deg, 1), (min, 1), (sec, 100)), ref)
|
|
"""
|
|
is_positive = decimal >= 0
|
|
decimal = abs(decimal)
|
|
|
|
degrees = int(decimal)
|
|
minutes = int((decimal - degrees) * 60)
|
|
seconds = int(((decimal - degrees) * 60 - minutes) * 60 * 100)
|
|
|
|
return ((degrees, 1), (minutes, 1), (seconds, 100))
|
|
|
|
|
|
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
|
|
"""
|
|
두 GPS 좌표 간의 거리 계산 (미터)
|
|
Haversine 공식 사용
|
|
"""
|
|
R = 6371000 # 지구 반지름 (미터)
|
|
|
|
phi1 = math.radians(lat1)
|
|
phi2 = math.radians(lat2)
|
|
delta_phi = math.radians(lat2 - lat1)
|
|
delta_lambda = math.radians(lon2 - lon1)
|
|
|
|
a = math.sin(delta_phi / 2) ** 2 + \
|
|
math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2
|
|
c = 2 * math.asin(math.sqrt(a))
|
|
|
|
return R * c
|
|
|
|
|
|
def collect_gps_data(paths: List[str]) -> List[Tuple[str, float, float, Optional[float], Optional[str]]]:
|
|
"""
|
|
모든 이미지의 GPS 정보와 촬영 날짜 수집
|
|
returns: [(path, lat, lon, alt, datetime), ...]
|
|
"""
|
|
gps_data = []
|
|
for p in paths:
|
|
gps = extract_gps(p)
|
|
datetime_str = extract_datetime(p)
|
|
if gps:
|
|
gps_data.append((p, gps[0], gps[1], gps[2], datetime_str))
|
|
return gps_data
|
|
|
|
|
|
def find_center_image(gps_data: List[Tuple[str, float, float, Optional[float], Optional[str]]]) -> Tuple[str, float, float, Optional[str]]:
|
|
"""
|
|
GPS 좌표의 중심에 가장 가까운 이미지 찾기
|
|
returns: (center_image_path, center_lat, center_lon, datetime)
|
|
"""
|
|
if not gps_data:
|
|
raise ValueError("GPS 정보가 있는 이미지가 없습니다.")
|
|
|
|
# 중심 좌표 계산
|
|
center_lat = sum(d[1] for d in gps_data) / len(gps_data)
|
|
center_lon = sum(d[2] for d in gps_data) / len(gps_data)
|
|
|
|
# 중심에 가장 가까운 이미지 찾기
|
|
min_dist = float('inf')
|
|
center_image = gps_data[0][0]
|
|
center_datetime = gps_data[0][4]
|
|
|
|
for path, lat, lon, _, datetime_str in gps_data:
|
|
dist = haversine_distance(center_lat, center_lon, lat, lon)
|
|
if dist < min_dist:
|
|
min_dist = dist
|
|
center_image = path
|
|
center_datetime = datetime_str
|
|
|
|
print(f"[GPS] 중심 좌표: ({center_lat:.6f}, {center_lon:.6f})")
|
|
print(f"[GPS] 중앙 이미지: {os.path.basename(center_image)} (중심으로부터 {min_dist:.1f}m)")
|
|
if center_datetime:
|
|
print(f"[GPS] 촬영 날짜: {center_datetime}")
|
|
|
|
return center_image, center_lat, center_lon, center_datetime
|
|
|
|
|
|
def create_gps_exif(lat: float, lon: float, alt: Optional[float] = None, datetime_str: Optional[str] = None) -> bytes:
|
|
"""
|
|
GPS 좌표와 촬영 날짜로 EXIF 데이터 생성
|
|
datetime_str: "YYYY:MM:DD HH:MM:SS" 형식
|
|
"""
|
|
exif_dict = {
|
|
"GPS": {},
|
|
"Exif": {},
|
|
"0th": {}
|
|
}
|
|
|
|
# 위도
|
|
lat_dms = decimal_to_dms(abs(lat))
|
|
exif_dict["GPS"][piexif.GPSIFD.GPSLatitude] = lat_dms
|
|
exif_dict["GPS"][piexif.GPSIFD.GPSLatitudeRef] = b'N' if lat >= 0 else b'S'
|
|
|
|
# 경도
|
|
lon_dms = decimal_to_dms(abs(lon))
|
|
exif_dict["GPS"][piexif.GPSIFD.GPSLongitude] = lon_dms
|
|
exif_dict["GPS"][piexif.GPSIFD.GPSLongitudeRef] = b'E' if lon >= 0 else b'W'
|
|
|
|
# 고도 (있는 경우)
|
|
if alt is not None:
|
|
exif_dict["GPS"][piexif.GPSIFD.GPSAltitude] = (int(alt * 100), 100)
|
|
exif_dict["GPS"][piexif.GPSIFD.GPSAltitudeRef] = 0 # 해발
|
|
|
|
# 촬영 날짜/시간 (있는 경우)
|
|
if datetime_str:
|
|
datetime_bytes = datetime_str.encode('utf-8')
|
|
exif_dict["Exif"][piexif.ExifIFD.DateTimeOriginal] = datetime_bytes
|
|
exif_dict["Exif"][piexif.ExifIFD.DateTimeDigitized] = datetime_bytes
|
|
exif_dict["0th"][piexif.ImageIFD.DateTime] = datetime_bytes
|
|
|
|
return piexif.dump(exif_dict)
|
|
|
|
|
|
def save_image_with_gps(
|
|
pano: np.ndarray,
|
|
output_path: str,
|
|
lat: float,
|
|
lon: float,
|
|
alt: Optional[float] = None,
|
|
datetime_str: Optional[str] = None
|
|
) -> None:
|
|
"""
|
|
GPS 정보와 촬영 날짜를 포함하여 이미지 저장
|
|
"""
|
|
# OpenCV BGR을 PIL RGB로 변환
|
|
pano_rgb = cv2.cvtColor(pano, cv2.COLOR_BGR2RGB)
|
|
pil_image = Image.fromarray(pano_rgb)
|
|
|
|
# GPS EXIF 생성
|
|
exif_bytes = create_gps_exif(lat, lon, alt, datetime_str)
|
|
|
|
# 저장
|
|
pil_image.save(output_path, exif=exif_bytes, quality=95)
|
|
print(f"[GPS] GPS 정보 저장 완료: ({lat:.6f}, {lon:.6f})")
|
|
if datetime_str:
|
|
print(f"[GPS] 촬영 날짜 저장 완료: {datetime_str}")
|
|
|
|
|
|
# ========== 기존 함수 ==========
|
|
|
|
# def guess_defaults() -> Tuple[str, str]:
|
|
# """
|
|
# 인자 없을 때 wing/photo/drone 같은 실제 구조를 자동으로 잡도록 탐색.
|
|
# """
|
|
# base = os.getcwd()
|
|
|
|
# candidates = [
|
|
# (os.path.join(base, "photo", "drone"), "drone"),
|
|
# (os.path.join(base, "photo", "phone"), "phone"),
|
|
# (os.path.join(base, "photo"), "phone"),
|
|
# (os.path.join(base, "photos", "drone"), "drone"),
|
|
# (os.path.join(base, "photos", "phone"), "phone"),
|
|
# (os.path.join(base, "photos"), "phone"),
|
|
# ]
|
|
|
|
# for folder, mode in candidates:
|
|
# paths = list_images(folder)
|
|
# if len(paths) >= 2:
|
|
# return mode, folder
|
|
|
|
# return "phone", os.path.join(base, "photo")
|
|
|
|
|
|
def main():
|
|
ap = argparse.ArgumentParser(description="드론/폰 사진 폴더 합성(스티칭) + GPS 보존")
|
|
ap.add_argument("--mode", choices=["drone", "phone"], help="촬영 유형 (drone/phone)")
|
|
ap.add_argument("--input", help="이미지 폴더 경로")
|
|
ap.add_argument("--out", default="", help="출력 파일 경로 (비우면 자동 생성)")
|
|
ap.add_argument("--model", default="", help="합성에 사용될 카메라 모델")
|
|
ap.add_argument("--max-dim", type=int, default=2600, help="긴 변 기준 리사이즈(0이면 원본)")
|
|
ap.add_argument("--enhance", action="store_true", help="대비 보정(CLAHE)")
|
|
ap.add_argument("--try-fallback", action="store_true", help="실패 시 다른 모드로 재시도")
|
|
ap.add_argument("--debug", action="store_true", help="디버그 출력(파일 목록 일부 표시)")
|
|
ap.add_argument(
|
|
"--gps-strategy",
|
|
choices=["center", "first", "none"],
|
|
default="center",
|
|
help="GPS 저장 방식: center(중앙값), first(첫 이미지), none(저장안함)"
|
|
)
|
|
args = ap.parse_args()
|
|
|
|
# 인자 없을 때도 동작
|
|
# if not args.mode or not args.input:
|
|
# g_mode, g_input = guess_defaults()
|
|
# args.mode = args.mode or g_mode
|
|
# args.input = args.input or g_input
|
|
# print(f"[INFO] args missing -> auto defaults: --mode {args.mode} --input {args.input}")
|
|
|
|
paths = list_images(args.input, args.model)
|
|
if args.debug:
|
|
print(f"[DEBUG] input folder: {args.input}")
|
|
print(f"[DEBUG] found paths: {len(paths)}")
|
|
for p in paths[:10]:
|
|
print(f" - {os.path.basename(p)}")
|
|
|
|
if len(paths) < 2:
|
|
raise RuntimeError(f"합성할 이미지가 2장 이상 필요합니다. input={args.input}")
|
|
|
|
# GPS 정보 수집
|
|
gps_data = collect_gps_data(paths)
|
|
if gps_data:
|
|
print(f"[GPS] GPS 정보가 있는 이미지: {len(gps_data)}/{len(paths)}장")
|
|
else:
|
|
print("[GPS] GPS 정보가 있는 이미지가 없습니다.")
|
|
|
|
# 중앙 이미지 찾기
|
|
center_image_path = None
|
|
gps_lat, gps_lon, gps_alt, gps_datetime = None, None, None, None
|
|
|
|
if gps_data and args.gps_strategy != "none":
|
|
if args.gps_strategy == "center":
|
|
center_image_path, gps_lat, gps_lon, gps_datetime = find_center_image(gps_data)
|
|
# 중앙 이미지의 고도 사용
|
|
for path, lat, lon, alt, dt in gps_data:
|
|
if path == center_image_path:
|
|
gps_alt = alt
|
|
break
|
|
elif args.gps_strategy == "first":
|
|
# 첫 번째 GPS 있는 이미지 사용
|
|
first_gps = gps_data[0]
|
|
center_image_path = first_gps[0]
|
|
gps_lat, gps_lon, gps_alt, gps_datetime = first_gps[1], first_gps[2], first_gps[3], first_gps[4]
|
|
print(f"[GPS] 첫 번째 이미지 GPS 사용: {os.path.basename(center_image_path)}")
|
|
if gps_datetime:
|
|
print(f"[GPS] 촬영 날짜: {gps_datetime}")
|
|
|
|
# 이미지 로드 및 전처리
|
|
images = load_images(paths)
|
|
if len(images) < 2:
|
|
raise RuntimeError("읽을 수 있는 이미지가 2장 미만입니다.")
|
|
|
|
images = preprocess(images, max_dim=args.max_dim, enhance=args.enhance)
|
|
|
|
# 스티칭
|
|
pano = run_stitch(args.mode, images, try_fallback=args.try_fallback)
|
|
|
|
# 출력 경로 자동 생성
|
|
if not args.out.strip():
|
|
out_dir = os.path.join(os.getcwd(), "out")
|
|
os.makedirs(out_dir, exist_ok=True)
|
|
args.out = os.path.join(out_dir, f"{args.mode}_stitched.jpg")
|
|
|
|
out_dir = os.path.dirname(os.path.abspath(args.out))
|
|
os.makedirs(out_dir, exist_ok=True)
|
|
|
|
# GPS 정보와 함께 저장
|
|
if gps_lat is not None and gps_lon is not None:
|
|
save_image_with_gps(pano, args.out, gps_lat, gps_lon, gps_alt, gps_datetime)
|
|
else:
|
|
# GPS 정보 없으면 일반 저장
|
|
if not cv2.imwrite(args.out, pano):
|
|
raise RuntimeError(f"저장 실패: {args.out}")
|
|
print(f"[INFO] GPS 정보 없이 저장")
|
|
|
|
h, w = pano.shape[:2]
|
|
print(f"[DONE] saved: {args.out} (size={w}x{h})")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |