perf: 백프레셔 메커니즘 고도화 - 정확한 버퍼 추적 및 적응형 지연

Phase 3: 백프레셔 고도화
- ChunkedTrackStreamingService:
  - CompletableFuture+Thread.sleep(100ms) 추정 방식 제거
  - 전송 완료 후 try-finally에서 즉시 버퍼 크기 감소 (정확한 추적)
  - 정적 대기 → 버퍼 사용률(%) 기반 4단계 적응형 지연
- StompTrackStreamingService:
  - 총 트랙 수 기반 정적 지연 제거
  - BlockingQueue 사용률 + 데이터 크기 복합 적응형 지연으로 교체

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
HeungTak Lee 2026-02-06 13:43:50 +09:00
부모 28908e1a0d
커밋 7b7e283ea4
2개의 변경된 파일29개의 추가작업 그리고 34개의 파일을 삭제

파일 보기

@ -1094,18 +1094,12 @@ public class ChunkedTrackStreamingService {
queryId, backpressureWaitCount * 50);
}
// 즉시 전송
chunkConsumer.accept(response);
// 전송 완료 버퍼 크기 감소 (비동기 처리 고려)
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(100); // 네트워크 전송 시간 고려
pendingBufferSize.addAndGet(-chunkSize);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 전송 버퍼 추적 (전송 완료 즉시 감소)
try {
chunkConsumer.accept(response);
} finally {
pendingBufferSize.addAndGet(-chunkSize);
}
// 유니크 선박 카운트
batch.forEach(track -> uniqueVesselIds.add(track.getVesselId()));
@ -1120,10 +1114,13 @@ public class ChunkedTrackStreamingService {
Math.min(99.0, timeProgress)
));
// 버퍼 사용률에 따른 동적 대기
long currentBuffer = pendingBufferSize.get();
int waitTime = currentBuffer > 30_000_000 ? 100 :
currentBuffer > 10_000_000 ? 50 : 10;
// 버퍼 사용률에 따른 적응형 대기
double currentBufferUsage = (double) pendingBufferSize.get() / MAX_PENDING_BUFFER;
int waitTime;
if (currentBufferUsage > 0.8) waitTime = 200; // 80%: 강한 억제
else if (currentBufferUsage > 0.5) waitTime = 100; // 50%: 중간 억제
else if (currentBufferUsage > 0.3) waitTime = 50; // 30%: 약한 억제
else waitTime = 10; // 정상: 최소 지연
Thread.sleep(waitTime);
// 진행 상황 로그 ( 10번째 청크마다)

파일 보기

@ -717,29 +717,27 @@ public class StompTrackStreamingService {
metrics.recordChunkProcessed(queryId, tracks.size(),
System.currentTimeMillis() - chunkStartTime);
// 전송 속도 제어 - 버퍼 누적 방지
// 청크 크기에 따라 동적으로 지연 시간 조정
int delayMs = calculateChunkDelay(tracks.size(), totalTracks);
// 전송 속도 제어 - 사용률 기반 적응형 지연
int delayMs = calculateAdaptiveDelay(resultQueue, totalTracks);
if (delayMs > 0) {
Thread.sleep(delayMs);
}
}
// 청크 전송 지연 시간 계산
private int calculateChunkDelay(int chunkSize, int totalTracks) {
// 대량의 데이터일수록 지연 시간 증가
if (totalTracks > 1000000) {
return 200; // 200ms 지연
} else if (totalTracks > 500000) {
return 150; // 150ms 지연
} else if (totalTracks > 100000) {
return 100; // 100ms 지연
} else if (totalTracks > 50000) {
return 50; // 50ms 지연
} else if (totalTracks > 10000) {
return 30; // 30ms 지연
}
return 10; // 기본 10ms
// 사용률 + 데이터 크기 기반 적응형 지연 계산
private int calculateAdaptiveDelay(BlockingQueue<?> queue, int totalTracks) {
// 사용률 기반 지연 ( 용량 100 기준)
double queueUsage = (double) queue.size() / 100;
int queueDelay;
if (queueUsage > 0.8) queueDelay = 200; // 80%: 강한 억제
else if (queueUsage > 0.5) queueDelay = 100; // 50%: 중간 억제
else if (queueUsage > 0.3) queueDelay = 50; // 30%: 약한 억제
else queueDelay = 10; // 정상: 최소 지연
// 데이터 크기 기반 보정 (대량 데이터일수록 추가 지연)
int sizeDelay = totalTracks > 500000 ? 50 : totalTracks > 100000 ? 20 : 0;
return queueDelay + sizeDelay;
}
private String buildStreamingQuery(TrackQueryRequest request,