diff --git a/src/main/java/gc/mda/signal_batch/global/config/WebSocketProperties.java b/src/main/java/gc/mda/signal_batch/global/config/WebSocketProperties.java new file mode 100644 index 0000000..ce6d183 --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/global/config/WebSocketProperties.java @@ -0,0 +1,52 @@ +package gc.mda.signal_batch.global.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * WebSocket 부하 제어 관련 설정 + * application-{profile}.yml의 websocket.* 설정을 바인딩 + */ +@Data +@Component +@ConfigurationProperties(prefix = "websocket") +public class WebSocketProperties { + + private QueryProperties query = new QueryProperties(); + private TransportProperties transport = new TransportProperties(); + private BackpressureProperties backpressure = new BackpressureProperties(); + + @Data + public static class QueryProperties { + /** 서버 전체 동시 실행 쿼리 상한 */ + private int maxConcurrentGlobal = 30; + /** 세션당 동시 쿼리 상한 */ + private int maxPerSession = 3; + /** 글로벌 대기 큐 타임아웃 (초) */ + private int queueTimeoutSeconds = 30; + } + + @Data + public static class TransportProperties { + private int inboundCorePoolSize = 10; + private int inboundMaxPoolSize = 20; + private int inboundQueueCapacity = 100; + private int outboundCorePoolSize = 20; + private int outboundMaxPoolSize = 40; + private int outboundQueueCapacity = 5000; + private int messageSizeLimitMb = 50; + private int sendBufferSizeLimitMb = 256; + private int sendTimeLimitSeconds = 120; + } + + @Data + public static class BackpressureProperties { + /** 최대 대기 버퍼 크기 (MB) */ + private long maxPendingBufferMb = 50; + /** 메시지당 최대 크기 (KB) */ + private int maxMessageSizeKb = 1024; + /** 메시지당 최소 크기 (KB) */ + private int minMessageSizeKb = 256; + } +} diff --git a/src/main/java/gc/mda/signal_batch/monitoring/controller/WebSocketMonitoringController.java b/src/main/java/gc/mda/signal_batch/monitoring/controller/WebSocketMonitoringController.java index 6ffe1da..5ab5502 100644 --- a/src/main/java/gc/mda/signal_batch/monitoring/controller/WebSocketMonitoringController.java +++ b/src/main/java/gc/mda/signal_batch/monitoring/controller/WebSocketMonitoringController.java @@ -1,6 +1,7 @@ package gc.mda.signal_batch.monitoring.controller; import gc.mda.signal_batch.monitoring.service.TrackStreamingMetrics; +import gc.mda.signal_batch.global.websocket.service.ActiveQueryManager; import gc.mda.signal_batch.global.websocket.service.StompTrackStreamingService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -13,6 +14,7 @@ import org.springframework.web.servlet.view.RedirectView; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; @Slf4j @@ -24,6 +26,7 @@ public class WebSocketMonitoringController { private final TrackStreamingMetrics trackStreamingMetrics; private final StompTrackStreamingService trackStreamingService; + private final ActiveQueryManager activeQueryManager; /** * WebSocket 스트리밍 현황 조회 @@ -69,6 +72,49 @@ public class WebSocketMonitoringController { return ResponseEntity.ok(result); } + /** + * 부하 제어 현황 조회 + */ + @GetMapping("/load-control") + @Operation(summary = "부하 제어 현황", description = "글로벌 동시 쿼리 제한, 대기 큐, 활성 쿼리 상세 정보를 조회합니다") + public ResponseEntity> getLoadControlStatus() { + Map status = new HashMap<>(); + + // 글로벌 동시 쿼리 제한 상태 + Map concurrency = new HashMap<>(); + concurrency.put("globalActiveQueries", activeQueryManager.getGlobalActiveQueryCount()); + concurrency.put("maxConcurrentGlobal", activeQueryManager.getMaxConcurrentGlobal()); + concurrency.put("waitingInQueue", activeQueryManager.getWaitingCount()); + concurrency.put("registeredQueries", activeQueryManager.getActiveQueryCount()); + status.put("concurrency", concurrency); + + // 활성 쿼리 상세 + status.put("activeQueryDetails", activeQueryManager.getAllActiveQueries().entrySet().stream() + .map(e -> { + Map detail = new HashMap<>(); + detail.put("sessionId", e.getKey()); + detail.put("queryId", e.getValue().getQueryId()); + detail.put("startTime", e.getValue().getStartTime().toString()); + detail.put("currentTable", String.valueOf(e.getValue().getCurrentTable())); + detail.put("processedChunks", e.getValue().getProcessedChunks()); + detail.put("cancelled", e.getValue().isCancelled()); + return detail; + }) + .collect(Collectors.toList())); + + // 메모리 상태 + Runtime runtime = Runtime.getRuntime(); + Map memory = new HashMap<>(); + long usedMb = (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024); + long maxMb = runtime.maxMemory() / (1024 * 1024); + memory.put("used", usedMb + " MB"); + memory.put("max", maxMb + " MB"); + memory.put("usage", String.format("%.1f%%", (double) usedMb / maxMb * 100)); + status.put("memory", memory); + + return ResponseEntity.ok(status); + } + /** * WebSocket 테스트 페이지로 리다이렉트 */