import os
from datetime import datetime, timedelta
from typing import Optional, List

import numpy as np
import pandas as pd
import xarray as xr

from config import STORAGE
from logger import get_logger

logger = get_logger("latestForecastDate")


def check_file_size(file_path: str, min_size_bytes: int = 1024) -> bool:
    """Check that a file exists and is at least ``min_size_bytes`` long.

    Args:
        file_path: Path of the file to inspect.
        min_size_bytes: Minimum acceptable size in bytes.

    Returns:
        True if the file exists and its size is >= ``min_size_bytes``.
        False if it is missing, too small, or cannot be inspected.
    """
    # Deliberately best-effort: any failure while inspecting the file is
    # logged at debug level and reported as "not valid" instead of raising.
    try:
        if not os.path.exists(file_path):
            return False

        file_size = os.path.getsize(file_path)
        if file_size < min_size_bytes:
            logger.debug(f"File size insufficient: {os.path.basename(file_path)} ({file_size} bytes)")
            return False

        return True
    except Exception as e:
        logger.debug(f"File check error: {os.path.basename(file_path)} - {e}")
        return False


def check_folder_completeness(
    folder_path: str,
    required_patterns: Optional[List[str]] = None,
    min_file_size: int = 1024,
) -> bool:
    """Check whether a download folder looks complete.

    Two modes are supported:

    * ``required_patterns`` given (non-empty): for every pattern at least one
      file whose name starts with that pattern must exist, and every such
      file must pass ``check_file_size``.
    * otherwise: the folder must contain at least one ``.nc`` file and every
      ``.nc`` file must pass ``check_file_size``.

    Args:
        folder_path: Folder to validate.
        required_patterns: File-name prefixes that must be present.
        min_file_size: Minimum acceptable file size in bytes.

    Returns:
        True if the folder passes the checks above, False otherwise
        (including when the folder cannot be listed).
    """
    try:
        all_files = os.listdir(folder_path)

        if required_patterns:
            for pattern in required_patterns:
                matching_files = [f for f in all_files if f.startswith(pattern)]
                if not matching_files:
                    return False

                for matched_file in matching_files:
                    file_path = os.path.join(folder_path, matched_file)
                    if not check_file_size(file_path, min_file_size):
                        return False
            return True

        # Reuse the listing obtained above instead of scanning the folder again.
        nc_files = [f for f in all_files if f.endswith('.nc')]
        if not nc_files:
            logger.debug("No NC files found")
            return False

        # Count every undersized file (no early exit) so the log line reports
        # the total number of bad files.
        invalid_count = sum(
            1
            for nc_file in nc_files
            if not check_file_size(os.path.join(folder_path, nc_file), min_file_size)
        )
        if invalid_count > 0:
            logger.debug(f"{invalid_count} files with insufficient size")
            return False

        return True
    except Exception as e:
        logger.debug(f"Validation error: {e}")
        return False


def get_latest_forecast_date(
    base_path: str,
    max_folders_to_check: int = 7,
    required_patterns: Optional[List[str]] = None,
    min_file_size: int = 1024,
) -> Optional[str]:
    """Return the name of the most recent complete ``YYYYMMDD`` folder.

    The ``max_folders_to_check`` folders with the highest ``YYYYMMDD`` names
    are selected, then examined in descending order of ``os.path.getctime``;
    the first one accepted by ``check_folder_completeness`` wins.

    Args:
        base_path: Directory that contains the dated forecast folders.
        max_folders_to_check: How many of the newest-named folders to examine.
        required_patterns: Required file-name prefixes (forwarded to
            ``check_folder_completeness``).
        min_file_size: Minimum acceptable file size in bytes.

    Returns:
        The folder name in ``YYYYMMDD`` format, or None when the path is
        missing or unreadable, or when no dated or complete folder is found.
    """
    if not os.path.exists(base_path):
        logger.warning(f"Path not found: {base_path}")
        return None

    try:
        subdirs = [
            d for d in os.listdir(base_path)
            if os.path.isdir(os.path.join(base_path, d))
        ]
    except PermissionError:
        logger.warning(f"Permission denied: {base_path}")
        return None

    valid_folders = []
    for subdir in subdirs:
        if len(subdir) == 8 and subdir.isdigit():
            try:
                # Parsed only to reject names that are not real calendar dates.
                datetime.strptime(subdir, '%Y%m%d')
                folder_path = os.path.join(base_path, subdir)
                creation_time = os.path.getctime(folder_path)
                valid_folders.append((subdir, creation_time, folder_path))
            except (ValueError, OSError):
                continue

    if not valid_folders:
        logger.warning(f"No valid date folders: {base_path}")
        return None

    # Newest N folders by name first ...
    valid_folders.sort(key=lambda x: x[0], reverse=True)
    folders_to_check = valid_folders[:max_folders_to_check]
    # ... then examined newest-ctime first.
    # NOTE(review): on Unix, getctime is the last metadata change, not the
    # creation time, so an older-dated folder modified recently is checked
    # (and may be returned) before a newer-dated one - confirm this is intended.
    folders_to_check.sort(key=lambda x: x[1], reverse=True)

    for folder_name, _, folder_path in folders_to_check:
        is_complete = check_folder_completeness(
            folder_path,
            required_patterns=required_patterns,
            min_file_size=min_file_size,
        )
        if is_complete:
            return folder_name

    logger.warning(f"No complete folder: {base_path}")
    return None


def get_earliest_latest_forecast_date(
    wind_path: Optional[str] = None,
    hydr_path: Optional[str] = None,
    max_folders_to_check: int = 7,
    required_patterns: Optional[List[str]] = None,
    min_file_size: int = 1024,
) -> Optional[dict]:
    """Summarise the latest forecast available for both wind and hydro data.

    The latest complete folder is looked up for each of the two paths and
    the older of the two folder dates is reported, together with the time
    span stored in the hydro surface file.

    Args:
        wind_path: Wind forecast directory; defaults to ``STORAGE.POS_WIND``.
        hydr_path: Hydro forecast directory; defaults to ``STORAGE.POS_HYDR``.
        max_folders_to_check: How many of the newest-named folders to examine.
        required_patterns: Required file-name prefixes; defaults to
            ``["EA012", "KO108"]`` and is applied to both directories.
        min_file_size: Minimum acceptable file size in bytes.

    Returns:
        A dict with keys ``date``, ``receivedDate``, ``startDate``,
        ``endDate`` and ``diff``, or None when either directory has no
        complete folder or the hydro surface file is missing.
    """
    if wind_path is None:
        wind_path = str(STORAGE.POS_WIND)
    if hydr_path is None:
        hydr_path = str(STORAGE.POS_HYDR)
    if required_patterns is None:
        required_patterns = ["EA012", "KO108"]

    wind_latest = get_latest_forecast_date(
        wind_path,
        max_folders_to_check=max_folders_to_check,
        required_patterns=required_patterns,
        min_file_size=min_file_size,
    )
    hydr_latest = get_latest_forecast_date(
        hydr_path,
        max_folders_to_check=max_folders_to_check,
        required_patterns=required_patterns,
        min_file_size=min_file_size,
    )

    if wind_latest is None or hydr_latest is None:
        logger.warning(f"Warning: No forecast received in the last {max_folders_to_check} days. Contact administrator.")
        return None

    # YYYYMMDD strings order chronologically, so min() is the older date.
    latest_folder_name = min(wind_latest, hydr_latest)
    latest_folder_date = datetime.strptime(latest_folder_name, "%Y%m%d")
    latest_folder_name_formatted = latest_folder_date.strftime("%Y-%m-%d")
    # The reported receive date is the day after the folder date.
    latest_receive_date = (latest_folder_date + timedelta(days=1)).strftime("%Y-%m-%d")

    # NOTE(review): the time span is read from the hydro folder (hydr_latest)
    # even when the reported "date" comes from the older wind folder - confirm.
    hydr_file_path = os.path.join(
        hydr_path, hydr_latest, f"KO108_MOHID_HYDR_SURF_{hydr_latest}00.nc"
    )
    # The completeness check only requires *some* file starting with the
    # prefix, so this specific file may still be absent; report that as
    # "no forecast" rather than raising from open_dataset.
    if not os.path.isfile(hydr_file_path):
        logger.warning(f"Hydro forecast file not found: {hydr_file_path}")
        return None

    with xr.open_dataset(hydr_file_path) as ds:
        time_values = ds['time'].values
        start_date = time_values[0]
        end_date = time_values[-1]

    diff = end_date - start_date
    hour_diff = diff / np.timedelta64(1, 'h')
    hour_diff_string = f"{int(hour_diff)}h"

    return_json = {
        "date": latest_folder_name_formatted,
        "receivedDate": latest_receive_date + " 12:00",
        "startDate": pd.Timestamp(start_date).strftime("%Y-%m-%d %H:%M:%S"),
        "endDate": pd.Timestamp(end_date).strftime("%Y-%m-%d %H:%M:%S"),
        "diff": hour_diff_string,
    }
    return return_json


if __name__ == "__main__":
    result = get_earliest_latest_forecast_date()
    if result:
        logger.info(f"Result: {result}")
    else:
        logger.info("No complete forecast folder found")