Phase 1.1 + 1.2: WebSocket 리플레이 요청 동시 부하 제어 - ActiveQueryManager에 Fair Semaphore 기반 글로벌 동시 쿼리 제한 추가 (기본 30개) - @Async 스트리밍 메서드 내에서 슬롯 획득 (인바운드 채널 블로킹 방지) - 쿼리 완료/실패/취소 시 finally 블록에서 반드시 리소스 반환 - 글로벌 Semaphore 슬롯 반환 - 세션별 쿼리 카운트 감소 (기존 누락 수정) - ActiveQueryManager 쿼리 정리 - TrackQueryInterceptor 세션 제한값 외부 설정화 (@Value) - application-prod.yml에 websocket.query 설정 추가 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
32 KiB
WebSocket 성능 부하 개선 보고서
선박 항적 리플레이 서비스 — 동시 요청 부하 대응
| 항목 | 내용 |
|---|---|
| 작성일 | 2026-02-06 |
| 대상 시스템 | Signal Batch — 선박 항적 조회/리플레이 WebSocket 서비스 |
| 운영 환경 | Linux, vessel-batch-control.sh → run-on-query-server-dev.sh (prod 프로파일) |
| 문제 상황 | 다수 클라이언트의 리플레이 요청 동시 유입 시 서비스 장애 발생 |
1. 현황 분석 (AS-IS)
1.1 시스템 아키텍처 개요
클라이언트 (N개)
│ WebSocket STOMP (/ws-tracks)
▼
┌─────────────────────────────────────────────────┐
│ Tomcat (max-threads: 200, max-connections: 10000) │
│ ┌─────────────────────────────────────────────┐ │
│ │ STOMP Inbound Channel (core:10, max:20) │ │
│ │ → TrackQueryInterceptor (세션당 3개 제한) │ │
│ └────────────┬────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ @Async trackStreamingExecutor │ │
│ │ (core:15, max:30, queue:500) │ │
│ │ ┌───────────────────────────────────────┐ │ │
│ │ │ ChunkedTrackStreamingService │ │ │
│ │ │ └ 백프레셔: pendingBufferSize ≤ 50MB │ │ │
│ │ │ StompTrackStreamingService │ │ │
│ │ │ └ BlockingQueue(100) + Thread.sleep │ │ │
│ │ └───────────────────────────────────────┘ │ │
│ └────────────┬────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ STOMP Outbound Channel (core:20, max:40) │ │
│ │ (queue: 5000) │ │
│ └─────────────────────────────────────────────┘ │
│ ▼ │
│ PostgreSQL + PostGIS (HikariCP max: 60) │
└─────────────────────────────────────────────────────┘
1.2 현재 구현된 부하 제어 메커니즘
(A) 세션당 동시 쿼리 제한 — TrackQueryInterceptor
파일: global/websocket/interceptor/TrackQueryInterceptor.java
// AS-IS: 세션당 최대 3개 동시 쿼리 제한
private final ConcurrentHashMap<String, AtomicInteger> sessionQueries = new ConcurrentHashMap<>();
private static final int MAX_QUERIES_PER_SESSION = 3;
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
// ...
if ("/app/tracks/query".equals(destination)) {
AtomicInteger queryCount = sessionQueries.computeIfAbsent(
sessionId, k -> new AtomicInteger(0)
);
if (queryCount.get() >= MAX_QUERIES_PER_SESSION) {
throw new IllegalStateException("Maximum concurrent queries exceeded");
}
queryCount.incrementAndGet();
}
return message;
}
// 쿼리 완료 시 카운트 감소 메서드 (존재하지만 호출부 없음)
public void decrementQueryCount(String sessionId) {
AtomicInteger count = sessionQueries.get(sessionId);
if (count != null) {
count.decrementAndGet();
}
}
(B) 메모리 버퍼 기반 백프레셔 — ChunkedTrackStreamingService
파일: global/websocket/service/ChunkedTrackStreamingService.java
// AS-IS: 50MB 버퍼 기반 백프레셔
private final AtomicLong pendingBufferSize = new AtomicLong(0);
private static final long MAX_PENDING_BUFFER = 50 * 1024 * 1024; // 50MB
// 버퍼 초과 시 busy-wait
int backpressureWaitCount = 0;
while (pendingBufferSize.get() > MAX_PENDING_BUFFER) {
Thread.sleep(50);
backpressureWaitCount++;
// 500ms 이상 대기 시 청크 크기 축소
if (backpressureWaitCount > 10 && metrics.dynamicChunkSizeKB > MIN_MESSAGE_SIZE_KB) {
metrics.dynamicChunkSizeKB = Math.max(MIN_MESSAGE_SIZE_KB,
metrics.dynamicChunkSizeKB - 512);
}
}
// 전송 완료 후 버퍼 감소 (비동기 + 100ms 고정 대기)
CompletableFuture.runAsync(() -> {
Thread.sleep(100); // 네트워크 전송 시간 "추정"
pendingBufferSize.addAndGet(-chunkSize);
});
(C) 비동기 스레드 풀 — AsyncConfig
파일: global/config/AsyncConfig.java
// AS-IS: 고정 크기 스레드 풀
@Bean(name = "trackStreamingExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(15);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(500);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
(D) 정적 전송 지연 — StompTrackStreamingService
파일: global/websocket/service/StompTrackStreamingService.java
// AS-IS: 데이터 크기에 따른 고정 지연
private int calculateChunkDelay(int chunkSize, int totalTracks) {
if (totalTracks > 1000000) return 200;
else if (totalTracks > 500000) return 150;
else if (totalTracks > 100000) return 100;
else if (totalTracks > 50000) return 50;
else if (totalTracks > 10000) return 30;
return 10;
}
(E) STOMP 채널 설정 — WebSocketStompConfig
파일: global/config/WebSocketStompConfig.java
// AS-IS: 인바운드/아웃바운드 채널 스레드 풀
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.taskExecutor()
.corePoolSize(10).maxPoolSize(20).queueCapacity(100);
}
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.taskExecutor()
.corePoolSize(20).maxPoolSize(40).queueCapacity(5000);
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration
.setMessageSizeLimit(50 * 1024 * 1024) // 50MB
.setSendBufferSizeLimit(256 * 1024 * 1024) // 256MB
.setSendTimeLimit(120 * 1000); // 120초
}
1.3 운영 환경 설정
| 항목 | 설정값 |
|---|---|
| JVM 힙 | 서버 메모리의 1/3 (최소 8GB, 최대 16GB) |
| GC | G1GC, MaxGCPauseMillis=200 |
| Tomcat 스레드 | max: 200, min-spare: 10 |
| Tomcat 연결 | max-connections: 10000, accept-count: 100 |
| DB 커넥션 풀 (Query) | max: 60, min-idle: 10 |
| DB 커넥션 풀 (Collect) | max: 20, min-idle: 5 |
| DB 커넥션 풀 (Batch) | max: 20, min-idle: 10 |
2. 식별된 문제점
2.1 [심각] 글로벌 동시 쿼리 제한 없음
현상: 세션당 3개만 제한하므로, 100개 세션이 동시 접속하면 최대 300개 동시 쿼리 발생 가능
영향:
- DB 커넥션 풀(60개) 고갈 → 커넥션 대기 타임아웃
trackStreamingExecutor큐(500) 포화 →CallerRunsPolicy에 의해 인바운드 채널 스레드 블로킹- 인바운드 채널(max 20) 블로킹 → 새 WebSocket 요청 처리 불가 → 전체 서비스 마비
관련 코드: TrackQueryInterceptor.java — 글로벌 카운터 없음
[시나리오: 50개 클라이언트 × 3개 쿼리 = 150개 동시 요청]
trackStreamingExecutor (max: 30 스레드)
→ 30개 실행 + 500개 큐 대기
→ 큐 포화 시 CallerRunsPolicy 발동
→ 인바운드 채널 스레드(20개)에서 직접 실행
→ 인바운드 채널 블로킹 → WebSocket 메시지 처리 중단
→ 하트비트 실패 → 세션 끊김 → 연쇄 장애
2.2 [심각] 쿼리 카운트 감소 로직 누락
현상: TrackQueryInterceptor.decrementQueryCount() 메서드는 존재하지만, 쿼리 완료/실패 시 호출하는 코드가 없음
영향:
- 세션이 유지되는 동안 쿼리 카운트가 계속 증가만 함
- 3개 쿼리 후 해당 세션은 더 이상 쿼리 불가
- DISCONNECT 시에만
sessionQueries.remove()로 정리됨 - 결과적으로 세션당 제한이 사실상 무의미 (장기 연결에서 3개 쿼리 후 차단)
관련 코드:
// TrackQueryInterceptor.java:68 - 정의만 존재, 호출부 없음
public void decrementQueryCount(String sessionId) {
AtomicInteger count = sessionQueries.get(sessionId);
if (count != null) {
count.decrementAndGet();
}
}
2.3 [심각] CachedThreadPool 무제한 스레드 생성
현상: CancellableQueryManager가 Executors.newCachedThreadPool() 사용
영향:
- 부하 시 무제한 스레드 생성 → OOM 또는 OS 스레드 한도 초과
- 스레드 컨텍스트 스위칭 오버헤드 급증
관련 코드: CancellableQueryManager.java
// AS-IS: 상한 없는 스레드 풀
private final ExecutorService queryExecutor = Executors.newCachedThreadPool(r -> {
Thread thread = new Thread(r);
thread.setName("query-executor-" + thread.getId());
thread.setDaemon(true);
return thread;
});
2.4 [심각] ChunkedTrackStreamingService 쿼리 취소 미구현
현상: cancelQuery() 메서드가 메트릭 정리만 수행하고 실제 스트리밍 중단 로직이 없음
영향:
- 클라이언트가 연결을 끊어도 서버에서 DB 쿼리 + 데이터 가공이 계속 실행
- 리소스(DB 커넥션, CPU, 메모리)가 불필요하게 점유됨
관련 코드: ChunkedTrackStreamingService.java:1176
// AS-IS: 취소 시 메트릭만 정리, 실행 중인 쿼리는 계속됨
public void cancelQuery(String queryId) {
log.info("Cancelling chunked query: {}", queryId);
// TODO: 실제 취소 로직 구현
cleanupQueryMetrics(queryId);
}
2.5 [중간] 백프레셔의 비정확한 버퍼 추적
현상: 전송 완료 후 버퍼 크기 감소를 CompletableFuture + Thread.sleep(100ms)로 처리
영향:
- 실제 네트워크 전송 완료와 무관하게 100ms 후 무조건 버퍼 감소
- 네트워크 지연이 100ms보다 길면 버퍼가 과소 추적 → 실제 메모리 사용량보다 작게 인식
- 네트워크 지연이 100ms보다 짧으면 불필요한 백프레셔 발생
관련 코드: ChunkedTrackStreamingService.java:1069
// AS-IS: 네트워크 전송 시간을 100ms로 "추정"
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(100); // 네트워크 전송 시간 고려
pendingBufferSize.addAndGet(-chunkSize);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
2.6 [중간] 두 개의 쿼리 관리 시스템 분리
현상: ActiveQueryManager와 StompTrackStreamingService.activeQueries가 각각 독립적으로 쿼리 상태를 관리
영향:
- 취소 시 한쪽만 처리될 가능성
- 상태 불일치로 인한 리소스 누수
관련 클래스:
ActiveQueryManager: 세션별 쿼리 관리 (ConcurrentHashMap<String, ActiveQuery>)StompTrackStreamingService: 쿼리별 컨텍스트 관리 (ConcurrentHashMap<String, QueryContext>)
2.7 [중간] StompTrackStreamingService의 병합 모드 OOM 위험
현상: 병합 모드에서 모든 트랙 데이터를 메모리에 적재
영향:
- 대량 데이터(100만+ 트랙) 요청 시 힙 메모리 초과 → OOM
- G1GC Full GC 빈발 → STW(Stop-The-World) 증가 → 서비스 지연
2.8 [낮음] WebSocket 설정의 하드코딩
현상: 모든 WebSocket 관련 제한값이 Java 코드에 하드코딩
영향:
- 운영 중 튜닝 불가 (재배포 필요)
- 환경별 차별화 불가
| 하드코딩 항목 | 위치 | 값 |
|---|---|---|
MAX_QUERIES_PER_SESSION |
TrackQueryInterceptor | 3 |
MAX_PENDING_BUFFER |
ChunkedTrackStreamingService | 50MB |
MAX_MESSAGE_SIZE_KB |
ChunkedTrackStreamingService | 1024KB |
BlockingQueue 크기 |
StompTrackStreamingService | 100 |
| Inbound core/max | WebSocketStompConfig | 10/20 |
| Outbound core/max | WebSocketStompConfig | 20/40 |
| sendBufferSizeLimit | WebSocketStompConfig | 256MB |
2.9 [낮음] 미사용 최적화 코드 사장
현상: TrackStreamingOptimizer에 네트워크 메트릭 기반 적응형 청크 조정 로직이 설계되어 있으나, 실제 서비스에서 연결/사용되지 않음
3. 개선 계획 (TO-BE)
3.1 Phase 1 — 긴급 안정화 (즉시 적용)
목표: 동시 요청 폭주 시 서비스 마비 방지
3.1.1 글로벌 동시 쿼리 제한 도입
AS-IS: 세션당 제한만 존재 (글로벌 제한 없음)
TO-BE: 서버 전체 동시 실행 쿼리 수를 Semaphore로 제한 + 대기 큐 구현
// TO-BE: TrackQueryInterceptor.java — 글로벌 동시 제한 추가
@Slf4j
@Component
@RequiredArgsConstructor
public class TrackQueryInterceptor implements ChannelInterceptor {
private final ConcurrentHashMap<String, AtomicInteger> sessionQueries = new ConcurrentHashMap<>();
// ▼ [추가] 글로벌 동시 쿼리 제한
@Value("${websocket.query.max-concurrent-global:20}")
private int maxConcurrentGlobal;
@Value("${websocket.query.max-per-session:3}")
private int maxQueriesPerSession;
@Value("${websocket.query.queue-timeout-seconds:30}")
private int queueTimeoutSeconds;
private Semaphore globalQuerySemaphore;
private final AtomicInteger waitingCount = new AtomicInteger(0);
@PostConstruct
public void init() {
this.globalQuerySemaphore = new Semaphore(maxConcurrentGlobal, true); // fair=true
log.info("Global query semaphore initialized: maxConcurrent={}, queueTimeout={}s",
maxConcurrentGlobal, queueTimeoutSeconds);
}
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(
message, StompHeaderAccessor.class);
if (accessor != null && StompCommand.SEND.equals(accessor.getCommand())) {
String destination = accessor.getDestination();
String sessionId = accessor.getSessionId();
if ("/app/tracks/query".equals(destination)) {
// 1. 세션당 제한 확인
AtomicInteger queryCount = sessionQueries.computeIfAbsent(
sessionId, k -> new AtomicInteger(0));
if (queryCount.get() >= maxQueriesPerSession) {
throw new IllegalStateException(
"Session query limit exceeded: " + maxQueriesPerSession);
}
// 2. 글로벌 대기 큐 진입
int waiting = waitingCount.incrementAndGet();
log.info("Query queued: session={}, waiting={}, active={}/{}",
sessionId, waiting,
maxConcurrentGlobal - globalQuerySemaphore.availablePermits(),
maxConcurrentGlobal);
try {
boolean acquired = globalQuerySemaphore.tryAcquire(
queueTimeoutSeconds, TimeUnit.SECONDS);
if (!acquired) {
throw new IllegalStateException(
"Query queue timeout after " + queueTimeoutSeconds + "s. " +
"Server is busy, please retry later.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Query interrupted while waiting");
} finally {
waitingCount.decrementAndGet();
}
queryCount.incrementAndGet();
}
}
return message;
}
// ▼ [추가] 쿼리 완료 시 호출 — Semaphore 반환 + 카운트 감소
public void releaseQuery(String sessionId) {
globalQuerySemaphore.release();
decrementQueryCount(sessionId);
log.debug("Query released: session={}, available={}/{}",
sessionId, globalQuerySemaphore.availablePermits(), maxConcurrentGlobal);
}
// 기존 메서드 유지
public void decrementQueryCount(String sessionId) {
AtomicInteger count = sessionQueries.get(sessionId);
if (count != null && count.get() > 0) {
count.decrementAndGet();
}
}
// 모니터링용
public int getActiveQueryCount() {
return maxConcurrentGlobal - globalQuerySemaphore.availablePermits();
}
public int getWaitingCount() {
return waitingCount.get();
}
}
application.yml 설정 추가:
websocket:
query:
max-concurrent-global: 20 # 서버 전체 동시 쿼리 상한
max-per-session: 3 # 세션당 동시 쿼리 상한
queue-timeout-seconds: 30 # 대기 큐 타임아웃
3.1.2 쿼리 완료/실패 시 리소스 반환 보장
AS-IS: decrementQueryCount() 호출부 없음 → 세션당 제한이 해제되지 않음
TO-BE: 스트리밍 서비스에서 쿼리 완료/실패/취소 시 반드시 releaseQuery() 호출
// TO-BE: ChunkedTrackStreamingService.java — finally 블록에서 리소스 반환
@Async("trackStreamingExecutor")
public void streamChunkedTracks(TrackQueryRequest request,
String queryId, String sessionId,
Consumer<TrackChunkResponse> chunkConsumer,
Consumer<QueryStatusUpdate> statusConsumer) {
try {
// ... 기존 스트리밍 로직 ...
} catch (Exception e) {
// ... 에러 처리 ...
} finally {
// ▼ [추가] 반드시 리소스 반환
trackQueryInterceptor.releaseQuery(sessionId);
activeQueryManager.completeQuery(sessionId);
cleanupQueryMetrics(queryId);
log.info("Query resources released: queryId={}, sessionId={}", queryId, sessionId);
}
}
// TO-BE: StompTrackStreamingService.java — 동일하게 finally 보장
@Async
public void streamTracks(TrackQueryRequest request, String queryId,
String sessionId, ...) {
try {
// ... 기존 스트리밍 로직 ...
} catch (Exception e) {
// ... 에러 처리 ...
} finally {
// ▼ [추가] 반드시 리소스 반환
activeQueries.remove(queryId);
trackQueryInterceptor.releaseQuery(sessionId);
log.info("Query resources released: queryId={}, sessionId={}", queryId, sessionId);
}
}
3.1.3 CachedThreadPool → 제한된 ThreadPoolExecutor 교체
AS-IS: CancellableQueryManager의 무제한 스레드 풀
// AS-IS
private final ExecutorService queryExecutor = Executors.newCachedThreadPool(...);
TO-BE: 상한이 있는 ThreadPoolExecutor로 교체
// TO-BE: CancellableQueryManager.java
private final ExecutorService queryExecutor = new ThreadPoolExecutor(
5, // corePoolSize
20, // maximumPoolSize (상한 설정)
60L, TimeUnit.SECONDS, // keepAliveTime
new LinkedBlockingQueue<>(100), // 대기 큐 크기 제한
r -> {
Thread thread = new Thread(r);
thread.setName("query-executor-" + thread.getId());
thread.setDaemon(true);
return thread;
},
new ThreadPoolExecutor.CallerRunsPolicy() // 큐 포화 시 호출 스레드에서 실행
);
3.2 Phase 2 — 취소 및 정리 로직 완성
목표: 불필요한 리소스 점유 방지, 쿼리 생명주기 완전 관리
3.2.1 ChunkedTrackStreamingService 쿼리 취소 구현
AS-IS: TODO 주석만 존재
// AS-IS
public void cancelQuery(String queryId) {
log.info("Cancelling chunked query: {}", queryId);
// TODO: 실제 취소 로직 구현
cleanupQueryMetrics(queryId);
}
TO-BE: AtomicBoolean 취소 플래그 + 쿼리 루프 내 체크 포인트
// TO-BE: ChunkedTrackStreamingService.java
// ▼ [추가] 쿼리별 취소 플래그 관리
private final ConcurrentHashMap<String, AtomicBoolean> queryCancelFlags = new ConcurrentHashMap<>();
@Async("trackStreamingExecutor")
public void streamChunkedTracks(TrackQueryRequest request,
String queryId, String sessionId, ...) {
// ▼ [추가] 취소 플래그 등록
AtomicBoolean cancelFlag = new AtomicBoolean(false);
queryCancelFlags.put(queryId, cancelFlag);
try {
// ... 기존 로직에서 processTableRange 호출 시 cancelFlag 전달 ...
for (Map.Entry<TableStrategy, List<TimeRange>> entry : strategyMap.entrySet()) {
// ▼ [추가] 테이블 처리 전 취소 확인
if (cancelFlag.get()) {
log.info("Query {} cancelled before processing table {}", queryId, entry.getKey());
break;
}
// processTableRange(request, strategy, range, sessionId, viewportVesselIds, cancelFlag);
}
} finally {
queryCancelFlags.remove(queryId);
trackQueryInterceptor.releaseQuery(sessionId);
activeQueryManager.completeQuery(sessionId);
cleanupQueryMetrics(queryId);
}
}
// ▼ [개선] 실제 취소 로직 구현
public void cancelQuery(String queryId) {
AtomicBoolean flag = queryCancelFlags.get(queryId);
if (flag != null) {
flag.set(true);
log.info("Cancel flag set for query: {}", queryId);
}
cleanupQueryMetrics(queryId);
}
// TO-BE: processTableRange 내부 — 주기적 취소 확인
while (rs.next() && trackCount < MAX_TRACKS_PER_CHUNK) {
// ▼ [추가] 1000건마다 취소 확인
if (trackCount % 1000 == 0 && cancelFlag.get()) {
log.info("Query cancelled during processing: table={}, processed={}", tableName, trackCount);
return Collections.emptyList();
}
// ... 기존 처리 로직 ...
}
3.2.2 쿼리 관리 시스템 통합
AS-IS: ActiveQueryManager와 StompTrackStreamingService.activeQueries 이원화
TO-BE: ActiveQueryManager를 단일 진실의 원천(Single Source of Truth)으로 통합
// TO-BE: ActiveQueryManager.java — 취소 플래그 및 릴리즈 콜백 통합
public static class ActiveQuery {
private final String sessionId;
private final String queryId;
private final LocalDateTime startTime;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private volatile String currentTable;
private volatile int processedChunks;
private Runnable onComplete; // ▼ [추가] 완료 콜백
public void setOnComplete(Runnable callback) {
this.onComplete = callback;
}
}
public void completeQuery(String sessionId) {
ActiveQuery query = activeQueries.remove(sessionId);
if (query != null) {
// ▼ [추가] 등록된 콜백 실행 (Semaphore 반환 등)
if (query.onComplete != null) {
query.onComplete.run();
}
log.debug("Query completed: sessionId={}, queryId={}", sessionId, query.getQueryId());
}
}
3.3 Phase 3 — 백프레셔 고도화
목표: 정확한 버퍼 추적, 클라이언트 소비 속도 반영
3.3.1 콜백 기반 버퍼 추적
AS-IS: Thread.sleep(100ms) 후 무조건 버퍼 감소 (추정치)
TO-BE: 전송 완료 콜백에서 정확히 버퍼 감소
// TO-BE: 전송 완료 콜백 기반 버퍼 관리
// SimpMessagingTemplate의 convertAndSendToUser 대신 직접 전송 제어
// WebSocket 메시지 전송 래퍼
private void sendWithBackpressureTracking(String sessionId, String destination,
TrackChunkResponse response, int chunkSize) {
pendingBufferSize.addAndGet(chunkSize);
try {
messagingTemplate.convertAndSendToUser(sessionId, destination, response);
// 전송 성공 즉시 버퍼 감소
pendingBufferSize.addAndGet(-chunkSize);
} catch (Exception e) {
// 전송 실패 시에도 버퍼 감소
pendingBufferSize.addAndGet(-chunkSize);
throw e;
}
}
3.3.2 적응형 전송 지연
AS-IS: 총 트랙 수에 따른 정적 지연
TO-BE: 실시간 버퍼 사용률 + 최근 전송 성공률에 따른 동적 지연
// TO-BE: 동적 지연 계산
private int calculateAdaptiveDelay(BackpressureMetrics metrics) {
double bufferUsage = (double) pendingBufferSize.get() / MAX_PENDING_BUFFER;
if (bufferUsage > 0.8) return 200; // 80% 이상: 강한 억제
else if (bufferUsage > 0.5) return 100; // 50% 이상: 중간 억제
else if (bufferUsage > 0.3) return 50; // 30% 이상: 약한 억제
return 10; // 정상: 최소 지연
}
3.4 Phase 4 — 설정 외부화 및 모니터링
목표: 운영 중 튜닝 가능, 실시간 상황 파악
3.4.1 WebSocket 설정 Properties 클래스 도입
// TO-BE: WebSocketProperties.java (신규)
@Data
@Component
@ConfigurationProperties(prefix = "websocket")
public class WebSocketProperties {
private QueryProperties query = new QueryProperties();
private TransportProperties transport = new TransportProperties();
private BackpressureProperties backpressure = new BackpressureProperties();
@Data
public static class QueryProperties {
private int maxConcurrentGlobal = 20;
private int maxPerSession = 3;
private int queueTimeoutSeconds = 30;
}
@Data
public static class TransportProperties {
private int inboundCorePoolSize = 10;
private int inboundMaxPoolSize = 20;
private int inboundQueueCapacity = 100;
private int outboundCorePoolSize = 20;
private int outboundMaxPoolSize = 40;
private int outboundQueueCapacity = 5000;
private int messageSizeLimitMb = 50;
private int sendBufferSizeLimitMb = 256;
private int sendTimeLimitSeconds = 120;
}
@Data
public static class BackpressureProperties {
private long maxPendingBufferMb = 50;
private int maxMessageSizeKb = 1024;
private int minMessageSizeKb = 256;
}
}
application.yml 통합 설정:
websocket:
query:
max-concurrent-global: 20
max-per-session: 3
queue-timeout-seconds: 30
transport:
inbound-core-pool-size: 10
inbound-max-pool-size: 20
inbound-queue-capacity: 100
outbound-core-pool-size: 20
outbound-max-pool-size: 40
outbound-queue-capacity: 5000
message-size-limit-mb: 50
send-buffer-size-limit-mb: 256
send-time-limit-seconds: 120
backpressure:
max-pending-buffer-mb: 50
max-message-size-kb: 1024
min-message-size-kb: 256
3.4.2 모니터링 엔드포인트 추가
// TO-BE: WebSocketMonitorController.java (신규)
@RestController
@RequestMapping("/api/v1/monitor/websocket")
@RequiredArgsConstructor
public class WebSocketMonitorController {
private final TrackQueryInterceptor queryInterceptor;
private final ActiveQueryManager activeQueryManager;
@GetMapping("/status")
public Map<String, Object> getStatus() {
return Map.of(
"activeQueries", queryInterceptor.getActiveQueryCount(),
"waitingInQueue", queryInterceptor.getWaitingCount(),
"queryDetails", activeQueryManager.getAllActiveQueries().entrySet().stream()
.map(e -> Map.of(
"sessionId", e.getKey(),
"queryId", e.getValue().getQueryId(),
"startTime", e.getValue().getStartTime().toString(),
"table", String.valueOf(e.getValue().getCurrentTable()),
"chunks", e.getValue().getProcessedChunks()
)).collect(Collectors.toList())
);
}
}
4. 개선 우선순위 및 기대 효과
| 순위 | 개선 항목 | 심각도 | 난이도 | 기대 효과 |
|---|---|---|---|---|
| 1 | 글로벌 동시 쿼리 제한 (Semaphore) | 심각 | 낮음 | 동시 요청 폭주 시 서비스 마비 방지 |
| 2 | 쿼리 완료 시 리소스 반환 (releaseQuery) |
심각 | 낮음 | 세션당 제한 정상 동작 |
| 3 | CachedThreadPool 교체 | 심각 | 낮음 | 무제한 스레드 생성 방지 |
| 4 | ChunkedTrackStreamingService 취소 구현 | 심각 | 중간 | 불필요한 리소스 점유 해소 |
| 5 | 쿼리 관리 시스템 통합 | 중간 | 중간 | 상태 불일치 해소 |
| 6 | 백프레셔 콜백 기반 전환 | 중간 | 중간 | 정확한 메모리 관리 |
| 7 | 설정 외부화 | 낮음 | 낮음 | 운영 중 튜닝 가능 |
| 8 | 모니터링 엔드포인트 | 낮음 | 낮음 | 실시간 상태 파악 |
5. 제한 사항 및 고려 사항
5.1 글로벌 동시 제한 수 결정 기준
maxConcurrentGlobal = min(DB커넥션풀 / 2, Async스레드풀max)
= min(60 / 2, 30)
= 20 (권장)
- DB 커넥션 풀(60개)의 50%를 WebSocket 쿼리에 할당
- 나머지 50%는 REST API, 배치 작업, 헬스체크 등에 예비
trackStreamingExecutormax(30) 이하로 설정하여 스레드 풀 포화 방지
5.2 대기 큐 타임아웃
- 30초 권장 (클라이언트 UX 고려)
- 타임아웃 시 클라이언트에 명확한 에러 메시지 전달 필요
- 프론트엔드에서 재시도 로직 또는 "서버가 바쁩니다" 안내 필요
5.3 기존 클라이언트 호환성
- STOMP 프로토콜 레벨 변경 없음 (하위 호환)
- 에러 메시지 형식이 변경될 수 있으므로 프론트엔드 에러 핸들링 확인 필요
부록 A. 관련 파일 목록
| 파일 | 역할 |
|---|---|
global/websocket/interceptor/TrackQueryInterceptor.java |
세션별 쿼리 제한 인터셉터 |
global/websocket/service/ChunkedTrackStreamingService.java |
청크 기반 스트리밍 (백프레셔) |
global/websocket/service/StompTrackStreamingService.java |
STOMP 스트리밍 (큐 + 지연) |
global/websocket/service/ActiveQueryManager.java |
활성 쿼리 추적 |
global/config/AsyncConfig.java |
비동기 스레드 풀 설정 |
global/config/WebSocketStompConfig.java |
WebSocket STOMP 설정 |
domain/vessel/service/query/CancellableQueryManager.java |
쿼리 취소 관리 |
global/websocket/listener/WebSocketDisconnectListener.java |
세션 종료 리스너 |
domain/vessel/service/optimization/TrackStreamingOptimizer.java |
적응형 최적화 (미사용) |
monitoring/service/TrackStreamingMetrics.java |
스트리밍 메트릭 |
부록 B. 운영 환경 리소스 현황
| 리소스 | 설정값 | 비고 |
|---|---|---|
| JVM Heap | 8~16GB (서버 메모리의 1/3) | G1GC |
| Tomcat Threads | max 200 | |
| Tomcat Connections | max 10,000 | |
| DB Pool (Query) | max 60 | 주 사용 |
| DB Pool (Collect) | max 20 | 읽기 전용 |
| DB Pool (Batch) | max 20 | 메타데이터 |
| trackStreamingExecutor | core 15, max 30, queue 500 | CallerRunsPolicy |
| STOMP Inbound | core 10, max 20, queue 100 | |
| STOMP Outbound | core 20, max 40, queue 5000 | |
| WebSocket Buffer | send 256MB, message 50MB |