perf: 백프레셔 메커니즘 고도화 - 정확한 버퍼 추적 및 적응형 지연
Phase 3: 백프레셔 고도화 - ChunkedTrackStreamingService: - CompletableFuture+Thread.sleep(100ms) 추정 방식 제거 - 전송 완료 후 try-finally에서 즉시 버퍼 크기 감소 (정확한 추적) - 정적 대기 → 버퍼 사용률(%) 기반 4단계 적응형 지연 - StompTrackStreamingService: - 총 트랙 수 기반 정적 지연 제거 - BlockingQueue 사용률 + 데이터 크기 복합 적응형 지연으로 교체 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
부모
28908e1a0d
커밋
7b7e283ea4
@ -1094,18 +1094,12 @@ public class ChunkedTrackStreamingService {
|
|||||||
queryId, backpressureWaitCount * 50);
|
queryId, backpressureWaitCount * 50);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 즉시 전송
|
// 전송 및 버퍼 추적 (전송 완료 후 즉시 감소)
|
||||||
chunkConsumer.accept(response);
|
try {
|
||||||
|
chunkConsumer.accept(response);
|
||||||
// 전송 완료 후 버퍼 크기 감소 (비동기 처리 고려)
|
} finally {
|
||||||
CompletableFuture.runAsync(() -> {
|
pendingBufferSize.addAndGet(-chunkSize);
|
||||||
try {
|
}
|
||||||
Thread.sleep(100); // 네트워크 전송 시간 고려
|
|
||||||
pendingBufferSize.addAndGet(-chunkSize);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// 유니크 선박 카운트
|
// 유니크 선박 카운트
|
||||||
batch.forEach(track -> uniqueVesselIds.add(track.getVesselId()));
|
batch.forEach(track -> uniqueVesselIds.add(track.getVesselId()));
|
||||||
@ -1120,10 +1114,13 @@ public class ChunkedTrackStreamingService {
|
|||||||
Math.min(99.0, timeProgress)
|
Math.min(99.0, timeProgress)
|
||||||
));
|
));
|
||||||
|
|
||||||
// 버퍼 사용률에 따른 동적 대기
|
// 버퍼 사용률에 따른 적응형 대기
|
||||||
long currentBuffer = pendingBufferSize.get();
|
double currentBufferUsage = (double) pendingBufferSize.get() / MAX_PENDING_BUFFER;
|
||||||
int waitTime = currentBuffer > 30_000_000 ? 100 :
|
int waitTime;
|
||||||
currentBuffer > 10_000_000 ? 50 : 10;
|
if (currentBufferUsage > 0.8) waitTime = 200; // 80%↑: 강한 억제
|
||||||
|
else if (currentBufferUsage > 0.5) waitTime = 100; // 50%↑: 중간 억제
|
||||||
|
else if (currentBufferUsage > 0.3) waitTime = 50; // 30%↑: 약한 억제
|
||||||
|
else waitTime = 10; // 정상: 최소 지연
|
||||||
Thread.sleep(waitTime);
|
Thread.sleep(waitTime);
|
||||||
|
|
||||||
// 진행 상황 로그 (매 10번째 청크마다)
|
// 진행 상황 로그 (매 10번째 청크마다)
|
||||||
|
|||||||
@ -717,29 +717,27 @@ public class StompTrackStreamingService {
|
|||||||
metrics.recordChunkProcessed(queryId, tracks.size(),
|
metrics.recordChunkProcessed(queryId, tracks.size(),
|
||||||
System.currentTimeMillis() - chunkStartTime);
|
System.currentTimeMillis() - chunkStartTime);
|
||||||
|
|
||||||
// 전송 속도 제어 - 버퍼 누적 방지
|
// 전송 속도 제어 - 큐 사용률 기반 적응형 지연
|
||||||
// 청크 크기에 따라 동적으로 지연 시간 조정
|
int delayMs = calculateAdaptiveDelay(resultQueue, totalTracks);
|
||||||
int delayMs = calculateChunkDelay(tracks.size(), totalTracks);
|
|
||||||
if (delayMs > 0) {
|
if (delayMs > 0) {
|
||||||
Thread.sleep(delayMs);
|
Thread.sleep(delayMs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 청크 전송 지연 시간 계산
|
// 큐 사용률 + 데이터 크기 기반 적응형 지연 계산
|
||||||
private int calculateChunkDelay(int chunkSize, int totalTracks) {
|
private int calculateAdaptiveDelay(BlockingQueue<?> queue, int totalTracks) {
|
||||||
// 대량의 데이터일수록 지연 시간 증가
|
// 큐 사용률 기반 지연 (큐 용량 100 기준)
|
||||||
if (totalTracks > 1000000) {
|
double queueUsage = (double) queue.size() / 100;
|
||||||
return 200; // 200ms 지연
|
int queueDelay;
|
||||||
} else if (totalTracks > 500000) {
|
if (queueUsage > 0.8) queueDelay = 200; // 80%↑: 강한 억제
|
||||||
return 150; // 150ms 지연
|
else if (queueUsage > 0.5) queueDelay = 100; // 50%↑: 중간 억제
|
||||||
} else if (totalTracks > 100000) {
|
else if (queueUsage > 0.3) queueDelay = 50; // 30%↑: 약한 억제
|
||||||
return 100; // 100ms 지연
|
else queueDelay = 10; // 정상: 최소 지연
|
||||||
} else if (totalTracks > 50000) {
|
|
||||||
return 50; // 50ms 지연
|
// 데이터 크기 기반 보정 (대량 데이터일수록 추가 지연)
|
||||||
} else if (totalTracks > 10000) {
|
int sizeDelay = totalTracks > 500000 ? 50 : totalTracks > 100000 ? 20 : 0;
|
||||||
return 30; // 30ms 지연
|
|
||||||
}
|
return queueDelay + sizeDelay;
|
||||||
return 10; // 기본 10ms
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private String buildStreamingQuery(TrackQueryRequest request,
|
private String buildStreamingQuery(TrackQueryRequest request,
|
||||||
|
|||||||
불러오는 중...
Reference in New Issue
Block a user