diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java index 8fc3e14..ef9f2c1 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java @@ -1094,18 +1094,12 @@ public class ChunkedTrackStreamingService { queryId, backpressureWaitCount * 50); } - // 즉시 전송 - chunkConsumer.accept(response); - - // 전송 완료 후 버퍼 크기 감소 (비동기 처리 고려) - CompletableFuture.runAsync(() -> { - try { - Thread.sleep(100); // 네트워크 전송 시간 고려 - pendingBufferSize.addAndGet(-chunkSize); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); + // 전송 및 버퍼 추적 (전송 완료 후 즉시 감소) + try { + chunkConsumer.accept(response); + } finally { + pendingBufferSize.addAndGet(-chunkSize); + } // 유니크 선박 카운트 batch.forEach(track -> uniqueVesselIds.add(track.getVesselId())); @@ -1120,10 +1114,13 @@ public class ChunkedTrackStreamingService { Math.min(99.0, timeProgress) )); - // 버퍼 사용률에 따른 동적 대기 - long currentBuffer = pendingBufferSize.get(); - int waitTime = currentBuffer > 30_000_000 ? 100 : - currentBuffer > 10_000_000 ? 50 : 10; + // 버퍼 사용률에 따른 적응형 대기 + double currentBufferUsage = (double) pendingBufferSize.get() / MAX_PENDING_BUFFER; + int waitTime; + if (currentBufferUsage > 0.8) waitTime = 200; // 80%↑: 강한 억제 + else if (currentBufferUsage > 0.5) waitTime = 100; // 50%↑: 중간 억제 + else if (currentBufferUsage > 0.3) waitTime = 50; // 30%↑: 약한 억제 + else waitTime = 10; // 정상: 최소 지연 Thread.sleep(waitTime); // 진행 상황 로그 (매 10번째 청크마다) diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java index d351ead..5cedcb0 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java @@ -717,29 +717,27 @@ public class StompTrackStreamingService { metrics.recordChunkProcessed(queryId, tracks.size(), System.currentTimeMillis() - chunkStartTime); - // 전송 속도 제어 - 버퍼 누적 방지 - // 청크 크기에 따라 동적으로 지연 시간 조정 - int delayMs = calculateChunkDelay(tracks.size(), totalTracks); + // 전송 속도 제어 - 큐 사용률 기반 적응형 지연 + int delayMs = calculateAdaptiveDelay(resultQueue, totalTracks); if (delayMs > 0) { Thread.sleep(delayMs); } } - // 청크 전송 지연 시간 계산 - private int calculateChunkDelay(int chunkSize, int totalTracks) { - // 대량의 데이터일수록 지연 시간 증가 - if (totalTracks > 1000000) { - return 200; // 200ms 지연 - } else if (totalTracks > 500000) { - return 150; // 150ms 지연 - } else if (totalTracks > 100000) { - return 100; // 100ms 지연 - } else if (totalTracks > 50000) { - return 50; // 50ms 지연 - } else if (totalTracks > 10000) { - return 30; // 30ms 지연 - } - return 10; // 기본 10ms + // 큐 사용률 + 데이터 크기 기반 적응형 지연 계산 + private int calculateAdaptiveDelay(BlockingQueue queue, int totalTracks) { + // 큐 사용률 기반 지연 (큐 용량 100 기준) + double queueUsage = (double) queue.size() / 100; + int queueDelay; + if (queueUsage > 0.8) queueDelay = 200; // 80%↑: 강한 억제 + else if (queueUsage > 0.5) queueDelay = 100; // 50%↑: 중간 억제 + else if (queueUsage > 0.3) queueDelay = 50; // 30%↑: 약한 억제 + else queueDelay = 10; // 정상: 최소 지연 + + // 데이터 크기 기반 보정 (대량 데이터일수록 추가 지연) + int sizeDelay = totalTracks > 500000 ? 50 : totalTracks > 100000 ? 20 : 0; + + return queueDelay + sizeDelay; } private String buildStreamingQuery(TrackQueryRequest request,