"""gear_correlation_raw_metrics 파티션 유지보수. APScheduler 일별 작업으로 실행: - system_config에서 설정 읽기 (hot-reload, 프로세스 재시작 불필요) - 미래 파티션 미리 생성 - 만료 파티션 DROP - 미관측 점수 레코드 정리 """ import logging from datetime import date, datetime, timedelta from config import qualified_table, settings logger = logging.getLogger(__name__) SYSTEM_CONFIG = qualified_table('system_config') GEAR_CORRELATION_RAW_METRICS = qualified_table('gear_correlation_raw_metrics') GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores') def _get_config_int(conn, key: str, default: int) -> int: """system_config에서 설정값 조회. 없으면 default.""" cur = conn.cursor() try: cur.execute( f"SELECT value::text FROM {SYSTEM_CONFIG} WHERE key = %s", (key,), ) row = cur.fetchone() return int(row[0].strip('"')) if row else default except Exception: return default finally: cur.close() def _create_future_partitions(conn, days_ahead: int) -> int: """미래 N일 파티션 생성. 반환: 생성된 파티션 수.""" cur = conn.cursor() created = 0 try: for i in range(days_ahead + 1): d = date.today() + timedelta(days=i) partition_name = f'gear_correlation_raw_metrics_{d.strftime("%Y%m%d")}' cur.execute( "SELECT 1 FROM pg_class c " "JOIN pg_namespace n ON n.oid = c.relnamespace " "WHERE c.relname = %s AND n.nspname = %s", (partition_name, settings.KCGDB_SCHEMA), ) if cur.fetchone() is None: next_d = d + timedelta(days=1) cur.execute( f"CREATE TABLE IF NOT EXISTS {qualified_table(partition_name)} " f"PARTITION OF {GEAR_CORRELATION_RAW_METRICS} " f"FOR VALUES FROM ('{d.isoformat()}') TO ('{next_d.isoformat()}')" ) created += 1 logger.info('created partition: %s.%s', settings.KCGDB_SCHEMA, partition_name) conn.commit() except Exception as e: conn.rollback() logger.error('failed to create partitions: %s', e) finally: cur.close() return created def _drop_expired_partitions(conn, retention_days: int) -> int: """retention_days 초과 파티션 DROP. 반환: 삭제된 파티션 수.""" cutoff = date.today() - timedelta(days=retention_days) cur = conn.cursor() dropped = 0 try: cur.execute( "SELECT c.relname FROM pg_class c " "JOIN pg_namespace n ON n.oid = c.relnamespace " "WHERE c.relname LIKE 'gear_correlation_raw_metrics_%%' " "AND n.nspname = %s AND c.relkind = 'r'", (settings.KCGDB_SCHEMA,), ) for (name,) in cur.fetchall(): date_str = name.rsplit('_', 1)[-1] try: partition_date = datetime.strptime(date_str, '%Y%m%d').date() except ValueError: continue if partition_date < cutoff: cur.execute(f'DROP TABLE IF EXISTS {qualified_table(name)}') dropped += 1 logger.info('dropped expired partition: %s.%s', settings.KCGDB_SCHEMA, name) conn.commit() except Exception as e: conn.rollback() logger.error('failed to drop partitions: %s', e) finally: cur.close() return dropped def _cleanup_stale_scores(conn, cleanup_days: int) -> int: """cleanup_days 이상 미관측 점수 레코드 삭제.""" cur = conn.cursor() try: cur.execute( f"DELETE FROM {GEAR_CORRELATION_SCORES} " "WHERE last_observed_at < NOW() - make_interval(days => %s)", (cleanup_days,), ) deleted = cur.rowcount conn.commit() return deleted except Exception as e: conn.rollback() logger.error('failed to cleanup stale scores: %s', e) return 0 finally: cur.close() def maintain_partitions(): """일별 파티션 유지보수 — 스케줄러에서 호출. system_config에서 설정을 매번 읽으므로 API를 통한 설정 변경이 다음 실행 시 즉시 반영됨. """ from db import kcgdb with kcgdb.get_conn() as conn: retention = _get_config_int(conn, 'partition.raw_metrics.retention_days', 7) ahead = _get_config_int(conn, 'partition.raw_metrics.create_ahead_days', 3) cleanup_days = _get_config_int(conn, 'partition.scores.cleanup_days', 30) created = _create_future_partitions(conn, ahead) dropped = _drop_expired_partitions(conn, retention) cleaned = _cleanup_stale_scores(conn, cleanup_days) logger.info( 'partition maintenance: %d created, %d dropped, %d stale scores cleaned ' '(retention=%dd, ahead=%dd, cleanup=%dd)', created, dropped, cleaned, retention, ahead, cleanup_days, )