wing-ops/prediction/image/mx15hdi/Metadata/Scripts/Export_Metadata_mx15hdi.py
jeonghyo.k 3946ff6a25 feat(prediction): 이미지 분석 서버 Docker 패키징 + DB 코드 제거
- prediction/image/ FastAPI 서버 Docker 환경 구성
  - Dockerfile: PyTorch 2.1 + CUDA 12.1 기반 GPU 이미지
  - docker-compose.yml: GPU 할당 + 데이터 볼륨 마운트
  - requirements.txt: 서버 의존성 목록
  - .env.example: 환경변수 템플릿
  - DOCKER_USAGE.md: 빌드/실행/API 사용법 문서
  - Dockerfile에 .dockerignore 제외 폴더 mkdir -p 추가
- .gitignore: prediction/image 결과물 및 모델 가중치(.pth) 제외 추가
- dbInsert_csv.py, dbInsert_shp.py 삭제 (미사용 DB 로직)
- api.py: dbInsert import 및 주석 처리된 DB 호출 코드 제거
- aerialRouter.ts: req.params 타입 오류 수정
2026-03-10 18:37:36 +09:00

396 lines
16 KiB
Python

# -*- coding: utf-8 -*-
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
from PIL import Image
from PIL.ExifTags import TAGS, GPSTAGS
from datetime import datetime
import cv2
import csv
from tqdm import tqdm
from PIL import Image
import numpy as np
import pandas as pd
import re
import logging
import sys
from pathlib import Path
logging.getLogger("ppocr").setLevel(logging.WARNING)
# PaddleOCR는 geo_info() 호출 시 1회만 초기화 (레이지 로딩)
_ocr_engine = None
_SCRIPTS_DIR = Path(__file__).parent # mx15hdi/Metadata/Scripts/
_MX15HDI_DIR = _SCRIPTS_DIR.parent.parent # mx15hdi/
def _get_ocr_engine():
"""PaddleOCR 엔진을 최초 호출 시 초기화하여 반환한다."""
global _ocr_engine
if _ocr_engine is None:
from paddleocr import PaddleOCR
_ocr_engine = PaddleOCR(use_angle_cls=False, lang='en', det=False, rec=True)
return _ocr_engine
def convert_gps_to_degrees(gps_coords):
"""
Pillow GPS 좌표를 도(degrees) 단위로 변환
Args:
gps_coords: GPS 좌표 튜플 (degrees, minutes, seconds)
Returns:
float: 십진수 각도
"""
try:
d = float(gps_coords[0])
m = float(gps_coords[1])
s = float(gps_coords[2])
return d + (m / 60.0) + (s / 3600.0)
except (ZeroDivisionError, IndexError, AttributeError, TypeError):
return 0.0
def decimal_to_dms(decimal_degrees):
"""
십진수 좌표를 도분초(DMS) 형식으로 변환
"""
if decimal_degrees is None:
return None, None, None, False
is_negative = decimal_degrees < 0
decimal_degrees = abs(decimal_degrees)
degrees = int(decimal_degrees)
minutes_decimal = (decimal_degrees - degrees) * 60
minutes = int(minutes_decimal)
seconds = (minutes_decimal - minutes) * 60
return degrees, minutes, seconds, is_negative
class meta_info:
def extract_and_save_image_metadata(self, image_path, output_csv_path):
"""
단일 이미지 파일에서 EXIF 정보를 추출하고 결과를 CSV 파일로 저장합니다.
Args:
image_path: 정보를 추출할 단일 이미지 파일 경로.
output_csv_path: CSV 파일 저장 경로.
Returns:
Dict[str, Any]: 추출된 메타데이터 정보 딕셔너리 또는 파일이 없으면 None.
"""
# 1. 파일 존재 여부 확인
if not os.path.exists(image_path):
print(f"파일을 찾을 수 없습니다: {image_path}")
return None
image_path_list = os.listdir(image_path)
image_nm = image_path_list[0]
image_dir = os.path.join(image_path, image_nm)
# 2. 이미지 정보 추출 (Pillow 사용)
info = {
'datetime': None,
'latitude': None,
'longitude': None,
'altitude': None,
'lat_dms': (None, None, None, False),
'lon_dms': (None, None, None, False),
'date_parts': (None, None, None),
'time_parts': (None, None, None)
}
try:
# Pillow로 이미지 열기
image = Image.open(image_dir)
# EXIF 데이터 추출
exifdata = image.getexif()
if not exifdata:
print("EXIF 정보를 찾을 수 없습니다.")
# info 그대로 반환하여 빈 값이라도 CSV에 기록
# EXIF 태그를 딕셔너리로 변환
exif_dict = {}
for tag_id, value in exifdata.items():
tag = TAGS.get(tag_id, tag_id)
exif_dict[tag] = value
# 1) 촬영시간 추출
datetime_tags = ['DateTimeOriginal', 'DateTimeDigitized', 'DateTime']
for tag in datetime_tags:
if tag in exif_dict:
datetime_str = str(exif_dict[tag])
info['datetime'] = datetime_str
try:
dt_obj = datetime.strptime(datetime_str, "%Y:%m:%d %H:%M:%S")
info['date_parts'] = (dt_obj.day, dt_obj.month, dt_obj.year)
info['time_parts'] = (dt_obj.hour, dt_obj.minute, dt_obj.second)
except ValueError:
pass
break
# 2) GPS 정보 추출
gps_ifd = exifdata.get_ifd(0x8825) # GPS IFD 태그
if gps_ifd:
# GPS 데이터를 딕셔너리로 변환
gps_dict = {}
for tag_id, value in gps_ifd.items():
tag = GPSTAGS.get(tag_id, tag_id)
gps_dict[tag] = value
# 위도 추출
if 'GPSLatitude' in gps_dict and 'GPSLatitudeRef' in gps_dict:
lat = convert_gps_to_degrees(gps_dict['GPSLatitude'])
lat_ref = str(gps_dict['GPSLatitudeRef']).strip()
if lat_ref == 'S':
lat = -lat
info['latitude'] = lat
info['lat_dms'] = decimal_to_dms(lat)
# 경도 추출
if 'GPSLongitude' in gps_dict and 'GPSLongitudeRef' in gps_dict:
lon = convert_gps_to_degrees(gps_dict['GPSLongitude'])
lon_ref = str(gps_dict['GPSLongitudeRef']).strip()
if lon_ref == 'W':
lon = -lon
info['longitude'] = lon
info['lon_dms'] = decimal_to_dms(lon)
# 고도 추출
if 'GPSAltitude' in gps_dict:
try:
altitude = float(gps_dict['GPSAltitude'])
# 해수면 아래인지 확인
if 'GPSAltitudeRef' in gps_dict:
altitude_ref = gps_dict['GPSAltitudeRef']
if altitude_ref == 1: # 1 = 해수면 아래
altitude = -altitude
info['altitude'] = altitude
except (ValueError, TypeError):
info['altitude'] = None
image.close()
except Exception as e:
print(f"'{os.path.basename(image_path)}' 처리 중 오류 발생: {e}")
# 오류 발생 시에도 현재까지 추출된 info 반환
pass
# 3. 추출된 정보를 CSV 파일로 저장
try:
output_dir = os.path.dirname(output_csv_path)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
with open(output_csv_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
# CSV 헤더 정의
fieldnames = [
'Filename',
'Tlat_d',
'Tlat_m',
'Tlat_s',
'Tlon_d',
'Tlon_m',
'Tlon_s',
'Alat_d',
'Alat_m',
'Alat_s',
'Alon_d',
'Alon_m',
'Alon_s',
'Az',
'El',
'Alt',
'Date1',
'Date2',
'Date3',
'Time1',
'Time2',
'Time3'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
# 정보 정리
lat_d, lat_m, lat_s, _ = info['lat_dms']
lon_d, lon_m, lon_s, _ = info['lon_dms']
day, month, year = info['date_parts']
hour, minute, second = info['time_parts']
row = {
'Filename': image_nm,
'Tlat_d': lat_d if lat_d is not None else '',
'Tlat_m': lat_m if lat_m is not None else '',
'Tlat_s': f"{lat_s:.4f}" if lat_s is not None else '',
'Tlon_d': lon_d if lon_d is not None else '',
'Tlon_m': lon_m if lon_m is not None else '',
'Tlon_s': f"{lon_s:.4f}" if lon_s is not None else '',
'Alat_d': lat_d if lat_d is not None else '',
'Alat_m': lat_m if lat_m is not None else '',
'Alat_s': f"{lat_s:.4f}" if lat_s is not None else '',
'Alon_d': lon_d if lon_d is not None else '',
'Alon_m': lon_m if lon_m is not None else '',
'Alon_s': f"{lon_s:.4f}" if lon_s is not None else '',
'Az': '',
'El': '',
'Alt': f"{info['altitude']:.2f}" if info['altitude'] is not None else '',
'Date1': day if day is not None else '',
'Date2': month if month is not None else '',
'Date3': year if year is not None else '',
'Time1': hour if hour is not None else '',
'Time2': minute if minute is not None else '',
'Time3': second if second is not None else ''
}
writer.writerow(row)
except Exception as e:
print(f"CSV 저장 중 오류 발생: {e}")
return info # 추출된 정보 반환
def geo_info(self, frame_folder_dir, output, positions_csv):
print("frame_folder_dir: ", frame_folder_dir)
meta_list = ["Filename", "Tlat_d", "Tlat_m", "Tlat_s",
"Tlon_d", "Tlon_m", "Tlon_s",
"Alat_d", "Alat_m", "Alat_s",
"Alon_d", "Alon_m", "Alon_s",
"Az", "El", "Alt",
"Date1", "Date2", "Date3",
"Time1", "Time2", "Time3"]
# Check if frame_folder_dir exists
if not os.path.exists(frame_folder_dir):
raise FileNotFoundError(f"이미지 폴더가 존재하지 않습니다: {frame_folder_dir}")
ocr_engine = _get_ocr_engine() # 레이지 초기화
positions = read_positions_from_csv(positions_csv)
frame_meta_list = []
frame_nm_list = os.listdir(frame_folder_dir)
debug_dir = "debug_empty_roi"
os.makedirs(debug_dir, exist_ok=True)
for frame_nm in tqdm(frame_nm_list):
frame_dir = os.path.join(frame_folder_dir, frame_nm)
frame = cv2.imread(frame_dir)
if frame is None:
print(f"이미지 로드 실패: {frame_dir}")
continue
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
_, thresh = cv2.threshold(gray, 140, 255, cv2.THRESH_BINARY)
dst2 = cv2.bitwise_not(thresh)
frame_dict = {"Filename": frame_nm}
for key in meta_list[1:]:
y1, y2, x1, x2 = positions.get(key, (0, 0, 0, 0))
if key == "El":
x1 = max(0, x1 - 10)
x2 = min(dst2.shape[1], x2 + 20)
roi = dst2[y1:y2, x1:x2]
if roi is None or roi.size == 0 or roi.shape[0] == 0 or roi.shape[1] == 0:
print(f"빈 ROI 발생 - key: {key}, frame: {frame_nm}, 좌표: y({y1}-{y2}), x({x1}-{x2})")
debug_path = os.path.join(debug_dir, f"{frame_nm}_{key}_empty.png")
cv2.imwrite(debug_path, dst2) # 전체 이미지 저장
empty_patch = np.zeros((50, 150), dtype=np.uint8)
cv2.imwrite(debug_path.replace("empty.png", "roi_patch.png"), empty_patch)
result = ""
else:
if key == "El":
roi = cv2.resize(roi, None, fx=2.5, fy=2.5, interpolation=cv2.INTER_CUBIC)
roi = cv2.GaussianBlur(roi, (3, 3), 0)
roi = cv2.adaptiveThreshold(roi, 255, cv2.ADAPTIVE_THRESH_MEAN_C,
cv2.THRESH_BINARY_INV, 11, 4)
text_result = ocr_engine.ocr(roi, cls=False)
digits = []
for line in text_result[0]:
text = line[1][0].replace(" ", "").strip()
if re.fullmatch(r"\d+", text):
x_center = (line[0][0][0] + line[0][2][0]) / 2
digits.append((x_center, text))
digits_sorted = sorted(digits, key=lambda x: x[0])
result = "".join([d[1] for d in digits_sorted])
elif key == "Alat_d":
roi = cv2.resize(roi, None, fx=2.5, fy=2.5, interpolation=cv2.INTER_CUBIC)
roi = cv2.GaussianBlur(roi, (3, 3), 0)
roi = cv2.adaptiveThreshold(roi, 255, cv2.ADAPTIVE_THRESH_MEAN_C,
cv2.THRESH_BINARY_INV, 11, 4)
text_result = ocr_engine.ocr(roi, cls=False)
roi_center_y = roi.shape[0] / 2
closest_box = None
closest_dist = float('inf')
result = ""
for line in text_result[0]:
text = line[1][0].replace(" ", "").strip()
conf = line[1][1]
box = line[0]
cy = (box[0][1] + box[2][1]) / 2
if re.match(r'^-?\d+(\.\d+)?$', text):
dist = abs(cy - roi_center_y)
if dist < closest_dist:
closest_dist = dist
result = text
else:
text = ocr_engine.ocr(roi, cls=False)
if text and text[0]:
result = text[0][0][1][0].replace(" ", "").strip()
else:
result = ""
frame_dict[key] = result
frame_meta_list.append(frame_dict)
os.makedirs(os.path.dirname(output), exist_ok=True)
with open(output, "w", encoding='utf-8-sig', newline='') as f:
writer = csv.DictWriter(f, fieldnames=meta_list)
writer.writeheader()
for data in frame_meta_list:
writer.writerow(data)
print(f"geo_info 생성 완료: {output}")
def interpolation(self, input, output):
df = pd.read_csv(input)
df.ffill(inplace=True) # fillna(method='ffill') deprecated → ffill()
df.bfill(inplace=True) # fillna(method='bfill') deprecated → bfill()
df.to_csv(output, index=False, encoding='utf-8-sig')
print(f"interpolation 저장 완료: {output}")
def run_metadata_export(file_id: str):
"""
file_id 기준으로 EXIF 추출 + 보간 CSV를 생성한다.
결과: mx15hdi/Metadata/CSV/{file_id}/mx15hdi_interpolation.csv
"""
img_path = str(_MX15HDI_DIR / 'Metadata' / 'Image' / 'Original_Images' / file_id)
csv_path = str(_MX15HDI_DIR / 'Metadata' / 'CSV' / file_id / 'mx15hdi.csv')
interp_csv_path = str(_MX15HDI_DIR / 'Metadata' / 'CSV' / file_id / 'mx15hdi_interpolation.csv')
i = meta_info()
i.extract_and_save_image_metadata(image_path=img_path, output_csv_path=csv_path)
i.interpolation(input=csv_path, output=interp_csv_path)
if __name__ == "__main__":
# Get parameter from command line
if len(sys.argv) < 2:
raise ValueError("파라미터가 제공되지 않았습니다. 폴더 이름을 명령줄 인자로 입력해주세요.")
param = sys.argv[1]
print("param: ", param)
run_metadata_export(param)