wing-ops/prediction/image/pic_gps.py
jeonghyo.k 3946ff6a25 feat(prediction): 이미지 분석 서버 Docker 패키징 + DB 코드 제거
- prediction/image/ FastAPI 서버 Docker 환경 구성
  - Dockerfile: PyTorch 2.1 + CUDA 12.1 기반 GPU 이미지
  - docker-compose.yml: GPU 할당 + 데이터 볼륨 마운트
  - requirements.txt: 서버 의존성 목록
  - .env.example: 환경변수 템플릿
  - DOCKER_USAGE.md: 빌드/실행/API 사용법 문서
  - Dockerfile에 .dockerignore 제외 폴더 mkdir -p 추가
- .gitignore: prediction/image 결과물 및 모델 가중치(.pth) 제외 추가
- dbInsert_csv.py, dbInsert_shp.py 삭제 (미사용 DB 로직)
- api.py: dbInsert import 및 주석 처리된 DB 호출 코드 제거
- aerialRouter.ts: req.params 타입 오류 수정
2026-03-10 18:37:36 +09:00

520 lines
18 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
pic_gps.py
드론/폰 사진 폴더를 합성(스티칭)해서 한 장으로 저장하는 스크립트.
+ GPS 정보 보존 및 중앙 이미지 찾기 기능 추가
✅ 새로운 기능
- GPS 정보가 있는 이미지들의 중앙 좌표를 계산하여 결과 이미지에 저장
- 가장 중앙에 위치한 원본 이미지를 찾아 출력
- --gps-strategy 옵션으로 GPS 저장 방식 선택 (center/first)
설치:
pip install opencv-contrib-python numpy pillow piexif
예시:
python pic_gps.py --mode drone --input "./photo/drone" --out "./out/drone.jpg" --enhance
python pic_gps.py --gps-strategy center # GPS 중앙값 사용
python pic_gps.py --gps-strategy first # 첫 이미지 GPS 사용
"""
import argparse
import os
import sys
import glob
from typing import List, Tuple, Optional, Dict
import math
import cv2
import numpy as np
from PIL import Image
import piexif
IMG_EXTS = (".jpg", ".jpeg", ".png", ".tif", ".tiff", ".bmp", ".webp")
def list_images(folder: str, prefix: str = "") -> List[str]:
if not os.path.isdir(folder):
return []
paths: List[str] = []
pattern_prefix = f"{prefix}*" if prefix else "*"
for ext in IMG_EXTS:
paths.extend(glob.glob(os.path.join(folder, f"{pattern_prefix}{ext}")))
paths.extend(glob.glob(os.path.join(folder, f"{pattern_prefix}{ext.upper()}")))
return sorted(set(paths))
def load_images(paths: List[str]) -> List[np.ndarray]:
images: List[np.ndarray] = []
for p in paths:
img = cv2.imread(p, cv2.IMREAD_COLOR)
if img is None:
print(f"[WARN] 이미지 읽기 실패: {p}", file=sys.stderr)
continue
images.append(img)
return images
def resize_max_dim(img: np.ndarray, max_dim: int) -> np.ndarray:
if max_dim <= 0:
return img
h, w = img.shape[:2]
m = max(h, w)
if m <= max_dim:
return img
scale = max_dim / float(m)
new_w = int(round(w * scale))
new_h = int(round(h * scale))
return cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
def clahe_contrast(img: np.ndarray) -> np.ndarray:
"""대비 보정: 바다/유막처럼 특징점 약할 때 도움"""
lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=2.5, tileGridSize=(8, 8))
l2 = clahe.apply(l)
lab2 = cv2.merge([l2, a, b])
return cv2.cvtColor(lab2, cv2.COLOR_LAB2BGR)
def preprocess(images: List[np.ndarray], max_dim: int, enhance: bool) -> List[np.ndarray]:
out: List[np.ndarray] = []
for img in images:
x = resize_max_dim(img, max_dim)
if enhance:
x = clahe_contrast(x)
out.append(x)
return out
def stitch_with_mode(images: List[np.ndarray], stitch_mode: str) -> Tuple[int, np.ndarray]:
"""
stitch_mode: "PANORAMA" | "SCANS"
returns (status, pano)
"""
if stitch_mode.upper() == "SCANS":
mode = cv2.Stitcher_SCANS
else:
mode = cv2.Stitcher_PANORAMA
stitcher = cv2.Stitcher_create(mode)
try:
stitcher.setPanoConfidenceThresh(0.5)
except Exception:
pass
status, pano = stitcher.stitch(images)
return status, pano
def run_stitch(mode: str, images: List[np.ndarray], try_fallback: bool) -> np.ndarray:
"""
mode: "drone" | "phone"
- drone: SCANS 우선 → 실패 시 PANORAMA
- phone: PANORAMA 우선 → 실패 시 SCANS
"""
mode = mode.lower().strip()
if mode not in ("drone", "phone"):
raise ValueError("mode는 'drone' 또는 'phone' 이어야 합니다.")
primary = "SCANS" if mode == "drone" else "PANORAMA"
secondary = "PANORAMA" if primary == "SCANS" else "SCANS"
status, pano = stitch_with_mode(images, primary)
if status == cv2.Stitcher_OK:
print(f"[OK] Stitch success with {primary}")
return pano
print(f"[WARN] Stitch failed with {primary}. status={status}", file=sys.stderr)
if try_fallback:
status2, pano2 = stitch_with_mode(images, secondary)
if status2 == cv2.Stitcher_OK:
print(f"[OK] Stitch success with fallback {secondary}")
return pano2
print(f"[ERR] Stitch failed with fallback {secondary}. status={status2}", file=sys.stderr)
raise RuntimeError(
"스티칭에 실패했습니다.\n"
"가능 원인: 사진 겹침 부족 / 흔들림 / 시점차 과다 / 바다만 가득(특징점 부족)\n"
"해결 팁:\n"
"- 사진 간 30% 이상 겹침\n"
"- 해안선/부두/선박/부표 등 고정 기준물 포함된 사진을 섞기\n"
"- --max-dim 1800 정도로 낮추고 --enhance 켜서 재시도\n"
)
# ========== GPS 관련 함수 ==========
def dms_to_decimal(dms: tuple, ref: str) -> float:
"""
DMS (Degrees, Minutes, Seconds) 형식을 십진수로 변환
dms: ((deg_num, deg_den), (min_num, min_den), (sec_num, sec_den))
ref: 'N', 'S', 'E', 'W'
"""
degrees = dms[0][0] / dms[0][1]
minutes = dms[1][0] / dms[1][1]
seconds = dms[2][0] / dms[2][1]
decimal = degrees + (minutes / 60.0) + (seconds / 3600.0)
if ref in ['S', 'W']:
decimal = -decimal
return decimal
def extract_datetime(image_path: str) -> Optional[str]:
"""
이미지에서 촬영 날짜/시간 추출
returns: 촬영 날짜 문자열 (YYYY:MM:DD HH:MM:SS) 또는 None
"""
try:
img = Image.open(image_path)
exif_data = img.info.get('exif')
if not exif_data:
return None
exif_dict = piexif.load(exif_data)
exif_info = exif_dict.get('Exif', {})
# DateTimeOriginal (원본 촬영 시간) 우선
if piexif.ExifIFD.DateTimeOriginal in exif_info:
datetime_bytes = exif_info[piexif.ExifIFD.DateTimeOriginal]
return datetime_bytes.decode('utf-8')
# DateTime (파일 수정 시간)
if piexif.ExifIFD.DateTime in exif_info:
datetime_bytes = exif_info[piexif.ExifIFD.DateTime]
return datetime_bytes.decode('utf-8')
# 0th IFD의 DateTime
zeroth_info = exif_dict.get('0th', {})
if piexif.ImageIFD.DateTime in zeroth_info:
datetime_bytes = zeroth_info[piexif.ImageIFD.DateTime]
return datetime_bytes.decode('utf-8')
return None
except Exception as e:
print(f"[WARN] 날짜 추출 실패 ({image_path}): {e}", file=sys.stderr)
return None
def extract_gps(image_path: str) -> Optional[Tuple[float, float, Optional[float]]]:
"""
이미지에서 GPS 정보 추출
returns: (latitude, longitude, altitude) 또는 None
"""
try:
img = Image.open(image_path)
exif_data = img.info.get('exif')
if not exif_data:
return None
exif_dict = piexif.load(exif_data)
gps_info = exif_dict.get('GPS', {})
if not gps_info:
return None
# 위도
if piexif.GPSIFD.GPSLatitude in gps_info and piexif.GPSIFD.GPSLatitudeRef in gps_info:
lat = dms_to_decimal(
gps_info[piexif.GPSIFD.GPSLatitude],
gps_info[piexif.GPSIFD.GPSLatitudeRef].decode()
)
else:
return None
# 경도
if piexif.GPSIFD.GPSLongitude in gps_info and piexif.GPSIFD.GPSLongitudeRef in gps_info:
lon = dms_to_decimal(
gps_info[piexif.GPSIFD.GPSLongitude],
gps_info[piexif.GPSIFD.GPSLongitudeRef].decode()
)
else:
return None
# 고도 (선택적)
altitude = None
if piexif.GPSIFD.GPSAltitude in gps_info:
alt_data = gps_info[piexif.GPSIFD.GPSAltitude]
altitude = alt_data[0] / alt_data[1]
return (lat, lon, altitude)
except Exception as e:
print(f"[WARN] GPS 추출 실패 ({image_path}): {e}", file=sys.stderr)
return None
def decimal_to_dms(decimal: float) -> Tuple[tuple, str]:
"""
십진수 좌표를 DMS 형식으로 변환
returns: (((deg, 1), (min, 1), (sec, 100)), ref)
"""
is_positive = decimal >= 0
decimal = abs(decimal)
degrees = int(decimal)
minutes = int((decimal - degrees) * 60)
seconds = int(((decimal - degrees) * 60 - minutes) * 60 * 100)
return ((degrees, 1), (minutes, 1), (seconds, 100))
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""
두 GPS 좌표 간의 거리 계산 (미터)
Haversine 공식 사용
"""
R = 6371000 # 지구 반지름 (미터)
phi1 = math.radians(lat1)
phi2 = math.radians(lat2)
delta_phi = math.radians(lat2 - lat1)
delta_lambda = math.radians(lon2 - lon1)
a = math.sin(delta_phi / 2) ** 2 + \
math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2
c = 2 * math.asin(math.sqrt(a))
return R * c
def collect_gps_data(paths: List[str]) -> List[Tuple[str, float, float, Optional[float], Optional[str]]]:
"""
모든 이미지의 GPS 정보와 촬영 날짜 수집
returns: [(path, lat, lon, alt, datetime), ...]
"""
gps_data = []
for p in paths:
gps = extract_gps(p)
datetime_str = extract_datetime(p)
if gps:
gps_data.append((p, gps[0], gps[1], gps[2], datetime_str))
return gps_data
def find_center_image(gps_data: List[Tuple[str, float, float, Optional[float], Optional[str]]]) -> Tuple[str, float, float, Optional[str]]:
"""
GPS 좌표의 중심에 가장 가까운 이미지 찾기
returns: (center_image_path, center_lat, center_lon, datetime)
"""
if not gps_data:
raise ValueError("GPS 정보가 있는 이미지가 없습니다.")
# 중심 좌표 계산
center_lat = sum(d[1] for d in gps_data) / len(gps_data)
center_lon = sum(d[2] for d in gps_data) / len(gps_data)
# 중심에 가장 가까운 이미지 찾기
min_dist = float('inf')
center_image = gps_data[0][0]
center_datetime = gps_data[0][4]
for path, lat, lon, _, datetime_str in gps_data:
dist = haversine_distance(center_lat, center_lon, lat, lon)
if dist < min_dist:
min_dist = dist
center_image = path
center_datetime = datetime_str
print(f"[GPS] 중심 좌표: ({center_lat:.6f}, {center_lon:.6f})")
print(f"[GPS] 중앙 이미지: {os.path.basename(center_image)} (중심으로부터 {min_dist:.1f}m)")
if center_datetime:
print(f"[GPS] 촬영 날짜: {center_datetime}")
return center_image, center_lat, center_lon, center_datetime
def create_gps_exif(lat: float, lon: float, alt: Optional[float] = None, datetime_str: Optional[str] = None) -> bytes:
"""
GPS 좌표와 촬영 날짜로 EXIF 데이터 생성
datetime_str: "YYYY:MM:DD HH:MM:SS" 형식
"""
exif_dict = {
"GPS": {},
"Exif": {},
"0th": {}
}
# 위도
lat_dms = decimal_to_dms(abs(lat))
exif_dict["GPS"][piexif.GPSIFD.GPSLatitude] = lat_dms
exif_dict["GPS"][piexif.GPSIFD.GPSLatitudeRef] = b'N' if lat >= 0 else b'S'
# 경도
lon_dms = decimal_to_dms(abs(lon))
exif_dict["GPS"][piexif.GPSIFD.GPSLongitude] = lon_dms
exif_dict["GPS"][piexif.GPSIFD.GPSLongitudeRef] = b'E' if lon >= 0 else b'W'
# 고도 (있는 경우)
if alt is not None:
exif_dict["GPS"][piexif.GPSIFD.GPSAltitude] = (int(alt * 100), 100)
exif_dict["GPS"][piexif.GPSIFD.GPSAltitudeRef] = 0 # 해발
# 촬영 날짜/시간 (있는 경우)
if datetime_str:
datetime_bytes = datetime_str.encode('utf-8')
exif_dict["Exif"][piexif.ExifIFD.DateTimeOriginal] = datetime_bytes
exif_dict["Exif"][piexif.ExifIFD.DateTimeDigitized] = datetime_bytes
exif_dict["0th"][piexif.ImageIFD.DateTime] = datetime_bytes
return piexif.dump(exif_dict)
def save_image_with_gps(
pano: np.ndarray,
output_path: str,
lat: float,
lon: float,
alt: Optional[float] = None,
datetime_str: Optional[str] = None
) -> None:
"""
GPS 정보와 촬영 날짜를 포함하여 이미지 저장
"""
# OpenCV BGR을 PIL RGB로 변환
pano_rgb = cv2.cvtColor(pano, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(pano_rgb)
# GPS EXIF 생성
exif_bytes = create_gps_exif(lat, lon, alt, datetime_str)
# 저장
pil_image.save(output_path, exif=exif_bytes, quality=95)
print(f"[GPS] GPS 정보 저장 완료: ({lat:.6f}, {lon:.6f})")
if datetime_str:
print(f"[GPS] 촬영 날짜 저장 완료: {datetime_str}")
# ========== 기존 함수 ==========
# def guess_defaults() -> Tuple[str, str]:
# """
# 인자 없을 때 wing/photo/drone 같은 실제 구조를 자동으로 잡도록 탐색.
# """
# base = os.getcwd()
# candidates = [
# (os.path.join(base, "photo", "drone"), "drone"),
# (os.path.join(base, "photo", "phone"), "phone"),
# (os.path.join(base, "photo"), "phone"),
# (os.path.join(base, "photos", "drone"), "drone"),
# (os.path.join(base, "photos", "phone"), "phone"),
# (os.path.join(base, "photos"), "phone"),
# ]
# for folder, mode in candidates:
# paths = list_images(folder)
# if len(paths) >= 2:
# return mode, folder
# return "phone", os.path.join(base, "photo")
def main():
ap = argparse.ArgumentParser(description="드론/폰 사진 폴더 합성(스티칭) + GPS 보존")
ap.add_argument("--mode", choices=["drone", "phone"], help="촬영 유형 (drone/phone)")
ap.add_argument("--input", help="이미지 폴더 경로")
ap.add_argument("--out", default="", help="출력 파일 경로 (비우면 자동 생성)")
ap.add_argument("--model", default="", help="합성에 사용될 카메라 모델")
ap.add_argument("--max-dim", type=int, default=2600, help="긴 변 기준 리사이즈(0이면 원본)")
ap.add_argument("--enhance", action="store_true", help="대비 보정(CLAHE)")
ap.add_argument("--try-fallback", action="store_true", help="실패 시 다른 모드로 재시도")
ap.add_argument("--debug", action="store_true", help="디버그 출력(파일 목록 일부 표시)")
ap.add_argument(
"--gps-strategy",
choices=["center", "first", "none"],
default="center",
help="GPS 저장 방식: center(중앙값), first(첫 이미지), none(저장안함)"
)
args = ap.parse_args()
# 인자 없을 때도 동작
# if not args.mode or not args.input:
# g_mode, g_input = guess_defaults()
# args.mode = args.mode or g_mode
# args.input = args.input or g_input
# print(f"[INFO] args missing -> auto defaults: --mode {args.mode} --input {args.input}")
paths = list_images(args.input, args.model)
if args.debug:
print(f"[DEBUG] input folder: {args.input}")
print(f"[DEBUG] found paths: {len(paths)}")
for p in paths[:10]:
print(f" - {os.path.basename(p)}")
if len(paths) < 2:
raise RuntimeError(f"합성할 이미지가 2장 이상 필요합니다. input={args.input}")
# GPS 정보 수집
gps_data = collect_gps_data(paths)
if gps_data:
print(f"[GPS] GPS 정보가 있는 이미지: {len(gps_data)}/{len(paths)}")
else:
print("[GPS] GPS 정보가 있는 이미지가 없습니다.")
# 중앙 이미지 찾기
center_image_path = None
gps_lat, gps_lon, gps_alt, gps_datetime = None, None, None, None
if gps_data and args.gps_strategy != "none":
if args.gps_strategy == "center":
center_image_path, gps_lat, gps_lon, gps_datetime = find_center_image(gps_data)
# 중앙 이미지의 고도 사용
for path, lat, lon, alt, dt in gps_data:
if path == center_image_path:
gps_alt = alt
break
elif args.gps_strategy == "first":
# 첫 번째 GPS 있는 이미지 사용
first_gps = gps_data[0]
center_image_path = first_gps[0]
gps_lat, gps_lon, gps_alt, gps_datetime = first_gps[1], first_gps[2], first_gps[3], first_gps[4]
print(f"[GPS] 첫 번째 이미지 GPS 사용: {os.path.basename(center_image_path)}")
if gps_datetime:
print(f"[GPS] 촬영 날짜: {gps_datetime}")
# 이미지 로드 및 전처리
images = load_images(paths)
if len(images) < 2:
raise RuntimeError("읽을 수 있는 이미지가 2장 미만입니다.")
images = preprocess(images, max_dim=args.max_dim, enhance=args.enhance)
# 스티칭
pano = run_stitch(args.mode, images, try_fallback=args.try_fallback)
# 출력 경로 자동 생성
if not args.out.strip():
out_dir = os.path.join(os.getcwd(), "out")
os.makedirs(out_dir, exist_ok=True)
args.out = os.path.join(out_dir, f"{args.mode}_stitched.jpg")
out_dir = os.path.dirname(os.path.abspath(args.out))
os.makedirs(out_dir, exist_ok=True)
# GPS 정보와 함께 저장
if gps_lat is not None and gps_lon is not None:
save_image_with_gps(pano, args.out, gps_lat, gps_lon, gps_alt, gps_datetime)
else:
# GPS 정보 없으면 일반 저장
if not cv2.imwrite(args.out, pano):
raise RuntimeError(f"저장 실패: {args.out}")
print(f"[INFO] GPS 정보 없이 저장")
h, w = pano.shape[:2]
print(f"[DONE] saved: {args.out} (size={w}x{h})")
if __name__ == "__main__":
main()