"""gear_correlation_raw_metrics 파티션 유지보수. APScheduler 일별 작업으로 실행: - system_config에서 설정 읽기 (hot-reload, 프로세스 재시작 불필요) - 미래 파티션 미리 생성 - 만료 파티션 DROP - 미관측 점수 레코드 정리 """ import logging from datetime import date, datetime, timedelta from config import qualified_table, settings logger = logging.getLogger(__name__) SYSTEM_CONFIG = qualified_table('system_config') GEAR_CORRELATION_RAW_METRICS = qualified_table('gear_correlation_raw_metrics') GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores') DETECTION_MODEL_RUN_OUTPUTS = qualified_table('detection_model_run_outputs') def _get_config_int(conn, key: str, default: int) -> int: """system_config에서 설정값 조회. 없으면 default.""" cur = conn.cursor() try: cur.execute( f"SELECT value::text FROM {SYSTEM_CONFIG} WHERE key = %s", (key,), ) row = cur.fetchone() return int(row[0].strip('"')) if row else default except Exception: return default finally: cur.close() def _create_future_partitions(conn, days_ahead: int) -> int: """미래 N일 파티션 생성. 반환: 생성된 파티션 수.""" cur = conn.cursor() created = 0 try: for i in range(days_ahead + 1): d = date.today() + timedelta(days=i) partition_name = f'gear_correlation_raw_metrics_{d.strftime("%Y%m%d")}' cur.execute( "SELECT 1 FROM pg_class c " "JOIN pg_namespace n ON n.oid = c.relnamespace " "WHERE c.relname = %s AND n.nspname = %s", (partition_name, settings.KCGDB_SCHEMA), ) if cur.fetchone() is None: next_d = d + timedelta(days=1) cur.execute( f"CREATE TABLE IF NOT EXISTS {qualified_table(partition_name)} " f"PARTITION OF {GEAR_CORRELATION_RAW_METRICS} " f"FOR VALUES FROM ('{d.isoformat()}') TO ('{next_d.isoformat()}')" ) created += 1 logger.info('created partition: %s.%s', settings.KCGDB_SCHEMA, partition_name) conn.commit() except Exception as e: conn.rollback() logger.error('failed to create partitions: %s', e) finally: cur.close() return created def _drop_expired_partitions(conn, retention_days: int) -> int: """retention_days 초과 파티션 DROP. 반환: 삭제된 파티션 수.""" cutoff = date.today() - timedelta(days=retention_days) cur = conn.cursor() dropped = 0 try: cur.execute( "SELECT c.relname FROM pg_class c " "JOIN pg_namespace n ON n.oid = c.relnamespace " "WHERE c.relname LIKE 'gear_correlation_raw_metrics_%%' " "AND n.nspname = %s AND c.relkind = 'r'", (settings.KCGDB_SCHEMA,), ) for (name,) in cur.fetchall(): date_str = name.rsplit('_', 1)[-1] try: partition_date = datetime.strptime(date_str, '%Y%m%d').date() except ValueError: continue if partition_date < cutoff: cur.execute(f'DROP TABLE IF EXISTS {qualified_table(name)}') dropped += 1 logger.info('dropped expired partition: %s.%s', settings.KCGDB_SCHEMA, name) conn.commit() except Exception as e: conn.rollback() logger.error('failed to drop partitions: %s', e) finally: cur.close() return dropped def _create_future_monthly_detection_partitions(conn, months_ahead: int) -> int: """detection_model_run_outputs 미래 N개월 파티션 생성. 월별 RANGE 파티션 (cycle_started_at) — V034 에서 2026-04/05 가 Flyway 로 선생성. 이후는 이 함수가 매일 돌면서 `months_ahead` 만큼 미리 생성. Returns: 생성된 파티션 수 """ cur = conn.cursor() created = 0 try: anchor = date.today().replace(day=1) for i in range(months_ahead + 1): # anchor 기준 +i 개월 y = anchor.year + (anchor.month - 1 + i) // 12 m = (anchor.month - 1 + i) % 12 + 1 start = date(y, m, 1) ny = y + (1 if m == 12 else 0) nm = 1 if m == 12 else m + 1 end = date(ny, nm, 1) partition_name = f'detection_model_run_outputs_{y:04d}_{m:02d}' cur.execute( "SELECT 1 FROM pg_class c " "JOIN pg_namespace n ON n.oid = c.relnamespace " "WHERE c.relname = %s AND n.nspname = %s", (partition_name, settings.KCGDB_SCHEMA), ) if cur.fetchone() is None: cur.execute( f"CREATE TABLE IF NOT EXISTS {qualified_table(partition_name)} " f"PARTITION OF {DETECTION_MODEL_RUN_OUTPUTS} " f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}')" ) created += 1 logger.info( 'created partition: %s.%s', settings.KCGDB_SCHEMA, partition_name, ) conn.commit() except Exception as e: conn.rollback() logger.error('failed to create detection_model_run_outputs partitions: %s', e) finally: cur.close() return created def _drop_expired_monthly_detection_partitions(conn, retention_months: int) -> int: """detection_model_run_outputs retention_months 초과 월 파티션 DROP. SHADOW 원시 결과는 비교 분석 후 가치 낮음 — 기본 retention 은 1개월. 집계는 detection_model_metrics 에 보존되므로 원시 폐기해도 추적 가능. """ cutoff_anchor = date.today().replace(day=1) # retention_months 만큼 과거로 이동 y = cutoff_anchor.year m = cutoff_anchor.month - retention_months while m <= 0: m += 12 y -= 1 cutoff = date(y, m, 1) cur = conn.cursor() dropped = 0 try: cur.execute( "SELECT c.relname FROM pg_class c " "JOIN pg_namespace n ON n.oid = c.relnamespace " "WHERE c.relname LIKE 'detection_model_run_outputs_%%' " "AND n.nspname = %s AND c.relkind = 'r'", (settings.KCGDB_SCHEMA,), ) for (name,) in cur.fetchall(): tail = name[len('detection_model_run_outputs_'):] try: yy, mm = tail.split('_') partition_start = date(int(yy), int(mm), 1) except (ValueError, IndexError): continue if partition_start < cutoff: cur.execute(f'DROP TABLE IF EXISTS {qualified_table(name)}') dropped += 1 logger.info( 'dropped expired partition: %s.%s', settings.KCGDB_SCHEMA, name, ) conn.commit() except Exception as e: conn.rollback() logger.error('failed to drop detection_model_run_outputs partitions: %s', e) finally: cur.close() return dropped def _cleanup_stale_scores(conn, cleanup_days: int) -> int: """cleanup_days 이상 미관측 점수 레코드 삭제.""" cur = conn.cursor() try: cur.execute( f"DELETE FROM {GEAR_CORRELATION_SCORES} " "WHERE last_observed_at < NOW() - make_interval(days => %s)", (cleanup_days,), ) deleted = cur.rowcount conn.commit() return deleted except Exception as e: conn.rollback() logger.error('failed to cleanup stale scores: %s', e) return 0 finally: cur.close() def maintain_partitions(): """일별 파티션 유지보수 — 스케줄러에서 호출. system_config에서 설정을 매번 읽으므로 API를 통한 설정 변경이 다음 실행 시 즉시 반영됨. """ from db import kcgdb with kcgdb.get_conn() as conn: retention = _get_config_int(conn, 'partition.raw_metrics.retention_days', 7) ahead = _get_config_int(conn, 'partition.raw_metrics.create_ahead_days', 3) cleanup_days = _get_config_int(conn, 'partition.scores.cleanup_days', 30) det_months_ahead = _get_config_int( conn, 'partition.detection_model_run_outputs.create_ahead_months', 2, ) det_retention_months = _get_config_int( conn, 'partition.detection_model_run_outputs.retention_months', 1, ) created = _create_future_partitions(conn, ahead) dropped = _drop_expired_partitions(conn, retention) cleaned = _cleanup_stale_scores(conn, cleanup_days) det_created = _create_future_monthly_detection_partitions(conn, det_months_ahead) det_dropped = _drop_expired_monthly_detection_partitions(conn, det_retention_months) logger.info( 'partition maintenance: %d created, %d dropped, %d stale scores cleaned ' '(retention=%dd, ahead=%dd, cleanup=%dd); ' 'detection_model_run_outputs: %d created, %d dropped ' '(retention_months=%d, ahead_months=%d)', created, dropped, cleaned, retention, ahead, cleanup_days, det_created, det_dropped, det_retention_months, det_months_ahead, )