# -*- coding: utf-8 -*- import os os.environ['KMP_DUPLICATE_LIB_OK']='True' from PIL import Image from PIL.ExifTags import TAGS, GPSTAGS from datetime import datetime import cv2 import csv from tqdm import tqdm from PIL import Image import numpy as np import pandas as pd import re import logging import sys from pathlib import Path logging.getLogger("ppocr").setLevel(logging.WARNING) # PaddleOCR는 geo_info() 호출 시 1회만 초기화 (레이지 로딩) _ocr_engine = None _SCRIPTS_DIR = Path(__file__).parent # mx15hdi/Metadata/Scripts/ _MX15HDI_DIR = _SCRIPTS_DIR.parent.parent # mx15hdi/ def _get_ocr_engine(): """PaddleOCR 엔진을 최초 호출 시 초기화하여 반환한다.""" global _ocr_engine if _ocr_engine is None: from paddleocr import PaddleOCR _ocr_engine = PaddleOCR(use_angle_cls=False, lang='en', det=False, rec=True) return _ocr_engine def convert_gps_to_degrees(gps_coords): """ Pillow GPS 좌표를 도(degrees) 단위로 변환 Args: gps_coords: GPS 좌표 튜플 (degrees, minutes, seconds) Returns: float: 십진수 각도 """ try: d = float(gps_coords[0]) m = float(gps_coords[1]) s = float(gps_coords[2]) return d + (m / 60.0) + (s / 3600.0) except (ZeroDivisionError, IndexError, AttributeError, TypeError): return 0.0 def decimal_to_dms(decimal_degrees): """ 십진수 좌표를 도분초(DMS) 형식으로 변환 """ if decimal_degrees is None: return None, None, None, False is_negative = decimal_degrees < 0 decimal_degrees = abs(decimal_degrees) degrees = int(decimal_degrees) minutes_decimal = (decimal_degrees - degrees) * 60 minutes = int(minutes_decimal) seconds = (minutes_decimal - minutes) * 60 return degrees, minutes, seconds, is_negative class meta_info: def extract_and_save_image_metadata(self, image_path, output_csv_path): """ 단일 이미지 파일에서 EXIF 정보를 추출하고 결과를 CSV 파일로 저장합니다. Args: image_path: 정보를 추출할 단일 이미지 파일 경로. output_csv_path: CSV 파일 저장 경로. Returns: Dict[str, Any]: 추출된 메타데이터 정보 딕셔너리 또는 파일이 없으면 None. """ # 1. 파일 존재 여부 확인 if not os.path.exists(image_path): print(f"파일을 찾을 수 없습니다: {image_path}") return None image_path_list = os.listdir(image_path) image_nm = image_path_list[0] image_dir = os.path.join(image_path, image_nm) # 2. 이미지 정보 추출 (Pillow 사용) info = { 'datetime': None, 'latitude': None, 'longitude': None, 'altitude': None, 'lat_dms': (None, None, None, False), 'lon_dms': (None, None, None, False), 'date_parts': (None, None, None), 'time_parts': (None, None, None) } try: # Pillow로 이미지 열기 image = Image.open(image_dir) # EXIF 데이터 추출 exifdata = image.getexif() if not exifdata: print("EXIF 정보를 찾을 수 없습니다.") # info 그대로 반환하여 빈 값이라도 CSV에 기록 # EXIF 태그를 딕셔너리로 변환 exif_dict = {} for tag_id, value in exifdata.items(): tag = TAGS.get(tag_id, tag_id) exif_dict[tag] = value # 1) 촬영시간 추출 datetime_tags = ['DateTimeOriginal', 'DateTimeDigitized', 'DateTime'] for tag in datetime_tags: if tag in exif_dict: datetime_str = str(exif_dict[tag]) info['datetime'] = datetime_str try: dt_obj = datetime.strptime(datetime_str, "%Y:%m:%d %H:%M:%S") info['date_parts'] = (dt_obj.day, dt_obj.month, dt_obj.year) info['time_parts'] = (dt_obj.hour, dt_obj.minute, dt_obj.second) except ValueError: pass break # 2) GPS 정보 추출 gps_ifd = exifdata.get_ifd(0x8825) # GPS IFD 태그 if gps_ifd: # GPS 데이터를 딕셔너리로 변환 gps_dict = {} for tag_id, value in gps_ifd.items(): tag = GPSTAGS.get(tag_id, tag_id) gps_dict[tag] = value # 위도 추출 if 'GPSLatitude' in gps_dict and 'GPSLatitudeRef' in gps_dict: lat = convert_gps_to_degrees(gps_dict['GPSLatitude']) lat_ref = str(gps_dict['GPSLatitudeRef']).strip() if lat_ref == 'S': lat = -lat info['latitude'] = lat info['lat_dms'] = decimal_to_dms(lat) # 경도 추출 if 'GPSLongitude' in gps_dict and 'GPSLongitudeRef' in gps_dict: lon = convert_gps_to_degrees(gps_dict['GPSLongitude']) lon_ref = str(gps_dict['GPSLongitudeRef']).strip() if lon_ref == 'W': lon = -lon info['longitude'] = lon info['lon_dms'] = decimal_to_dms(lon) # 고도 추출 if 'GPSAltitude' in gps_dict: try: altitude = float(gps_dict['GPSAltitude']) # 해수면 아래인지 확인 if 'GPSAltitudeRef' in gps_dict: altitude_ref = gps_dict['GPSAltitudeRef'] if altitude_ref == 1: # 1 = 해수면 아래 altitude = -altitude info['altitude'] = altitude except (ValueError, TypeError): info['altitude'] = None image.close() except Exception as e: print(f"'{os.path.basename(image_path)}' 처리 중 오류 발생: {e}") # 오류 발생 시에도 현재까지 추출된 info 반환 pass # 3. 추출된 정보를 CSV 파일로 저장 try: output_dir = os.path.dirname(output_csv_path) if output_dir and not os.path.exists(output_dir): os.makedirs(output_dir) with open(output_csv_path, 'w', newline='', encoding='utf-8-sig') as csvfile: # CSV 헤더 정의 fieldnames = [ 'Filename', 'Tlat_d', 'Tlat_m', 'Tlat_s', 'Tlon_d', 'Tlon_m', 'Tlon_s', 'Alat_d', 'Alat_m', 'Alat_s', 'Alon_d', 'Alon_m', 'Alon_s', 'Az', 'El', 'Alt', 'Date1', 'Date2', 'Date3', 'Time1', 'Time2', 'Time3' ] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() # 정보 정리 lat_d, lat_m, lat_s, _ = info['lat_dms'] lon_d, lon_m, lon_s, _ = info['lon_dms'] day, month, year = info['date_parts'] hour, minute, second = info['time_parts'] row = { 'Filename': image_nm, 'Tlat_d': lat_d if lat_d is not None else '', 'Tlat_m': lat_m if lat_m is not None else '', 'Tlat_s': f"{lat_s:.4f}" if lat_s is not None else '', 'Tlon_d': lon_d if lon_d is not None else '', 'Tlon_m': lon_m if lon_m is not None else '', 'Tlon_s': f"{lon_s:.4f}" if lon_s is not None else '', 'Alat_d': lat_d if lat_d is not None else '', 'Alat_m': lat_m if lat_m is not None else '', 'Alat_s': f"{lat_s:.4f}" if lat_s is not None else '', 'Alon_d': lon_d if lon_d is not None else '', 'Alon_m': lon_m if lon_m is not None else '', 'Alon_s': f"{lon_s:.4f}" if lon_s is not None else '', 'Az': '', 'El': '', 'Alt': f"{info['altitude']:.2f}" if info['altitude'] is not None else '', 'Date1': day if day is not None else '', 'Date2': month if month is not None else '', 'Date3': year if year is not None else '', 'Time1': hour if hour is not None else '', 'Time2': minute if minute is not None else '', 'Time3': second if second is not None else '' } writer.writerow(row) except Exception as e: print(f"CSV 저장 중 오류 발생: {e}") return info # 추출된 정보 반환 def geo_info(self, frame_folder_dir, output, positions_csv): print("frame_folder_dir: ", frame_folder_dir) meta_list = ["Filename", "Tlat_d", "Tlat_m", "Tlat_s", "Tlon_d", "Tlon_m", "Tlon_s", "Alat_d", "Alat_m", "Alat_s", "Alon_d", "Alon_m", "Alon_s", "Az", "El", "Alt", "Date1", "Date2", "Date3", "Time1", "Time2", "Time3"] # Check if frame_folder_dir exists if not os.path.exists(frame_folder_dir): raise FileNotFoundError(f"이미지 폴더가 존재하지 않습니다: {frame_folder_dir}") ocr_engine = _get_ocr_engine() # 레이지 초기화 positions = read_positions_from_csv(positions_csv) frame_meta_list = [] frame_nm_list = os.listdir(frame_folder_dir) debug_dir = "debug_empty_roi" os.makedirs(debug_dir, exist_ok=True) for frame_nm in tqdm(frame_nm_list): frame_dir = os.path.join(frame_folder_dir, frame_nm) frame = cv2.imread(frame_dir) if frame is None: print(f"이미지 로드 실패: {frame_dir}") continue gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) _, thresh = cv2.threshold(gray, 140, 255, cv2.THRESH_BINARY) dst2 = cv2.bitwise_not(thresh) frame_dict = {"Filename": frame_nm} for key in meta_list[1:]: y1, y2, x1, x2 = positions.get(key, (0, 0, 0, 0)) if key == "El": x1 = max(0, x1 - 10) x2 = min(dst2.shape[1], x2 + 20) roi = dst2[y1:y2, x1:x2] if roi is None or roi.size == 0 or roi.shape[0] == 0 or roi.shape[1] == 0: print(f"빈 ROI 발생 - key: {key}, frame: {frame_nm}, 좌표: y({y1}-{y2}), x({x1}-{x2})") debug_path = os.path.join(debug_dir, f"{frame_nm}_{key}_empty.png") cv2.imwrite(debug_path, dst2) # 전체 이미지 저장 empty_patch = np.zeros((50, 150), dtype=np.uint8) cv2.imwrite(debug_path.replace("empty.png", "roi_patch.png"), empty_patch) result = "" else: if key == "El": roi = cv2.resize(roi, None, fx=2.5, fy=2.5, interpolation=cv2.INTER_CUBIC) roi = cv2.GaussianBlur(roi, (3, 3), 0) roi = cv2.adaptiveThreshold(roi, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY_INV, 11, 4) text_result = ocr_engine.ocr(roi, cls=False) digits = [] for line in text_result[0]: text = line[1][0].replace(" ", "").strip() if re.fullmatch(r"\d+", text): x_center = (line[0][0][0] + line[0][2][0]) / 2 digits.append((x_center, text)) digits_sorted = sorted(digits, key=lambda x: x[0]) result = "".join([d[1] for d in digits_sorted]) elif key == "Alat_d": roi = cv2.resize(roi, None, fx=2.5, fy=2.5, interpolation=cv2.INTER_CUBIC) roi = cv2.GaussianBlur(roi, (3, 3), 0) roi = cv2.adaptiveThreshold(roi, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY_INV, 11, 4) text_result = ocr_engine.ocr(roi, cls=False) roi_center_y = roi.shape[0] / 2 closest_box = None closest_dist = float('inf') result = "" for line in text_result[0]: text = line[1][0].replace(" ", "").strip() conf = line[1][1] box = line[0] cy = (box[0][1] + box[2][1]) / 2 if re.match(r'^-?\d+(\.\d+)?$', text): dist = abs(cy - roi_center_y) if dist < closest_dist: closest_dist = dist result = text else: text = ocr_engine.ocr(roi, cls=False) if text and text[0]: result = text[0][0][1][0].replace(" ", "").strip() else: result = "" frame_dict[key] = result frame_meta_list.append(frame_dict) os.makedirs(os.path.dirname(output), exist_ok=True) with open(output, "w", encoding='utf-8-sig', newline='') as f: writer = csv.DictWriter(f, fieldnames=meta_list) writer.writeheader() for data in frame_meta_list: writer.writerow(data) print(f"geo_info 생성 완료: {output}") def interpolation(self, input, output): df = pd.read_csv(input) df.ffill(inplace=True) # fillna(method='ffill') deprecated → ffill() df.bfill(inplace=True) # fillna(method='bfill') deprecated → bfill() df.to_csv(output, index=False, encoding='utf-8-sig') print(f"interpolation 저장 완료: {output}") def run_metadata_export(file_id: str): """ file_id 기준으로 EXIF 추출 + 보간 CSV를 생성한다. 결과: mx15hdi/Metadata/CSV/{file_id}/mx15hdi_interpolation.csv """ img_path = str(_MX15HDI_DIR / 'Metadata' / 'Image' / 'Original_Images' / file_id) csv_path = str(_MX15HDI_DIR / 'Metadata' / 'CSV' / file_id / 'mx15hdi.csv') interp_csv_path = str(_MX15HDI_DIR / 'Metadata' / 'CSV' / file_id / 'mx15hdi_interpolation.csv') i = meta_info() i.extract_and_save_image_metadata(image_path=img_path, output_csv_path=csv_path) i.interpolation(input=csv_path, output=interp_csv_path) if __name__ == "__main__": # Get parameter from command line if len(sys.argv) < 2: raise ValueError("파라미터가 제공되지 않았습니다. 폴더 이름을 명령줄 인자로 입력해주세요.") param = sys.argv[1] print("param: ", param) run_metadata_export(param)