signal-batch/docs/websocket-performance-improvement-report.md
HeungTak Lee 78ff307785 feat: 글로벌 동시 쿼리 제한(Semaphore) 및 쿼리 완료 시 리소스 반환 보장
Phase 1.1 + 1.2: WebSocket 리플레이 요청 동시 부하 제어
- ActiveQueryManager에 Fair Semaphore 기반 글로벌 동시 쿼리 제한 추가 (기본 30개)
- @Async 스트리밍 메서드 내에서 슬롯 획득 (인바운드 채널 블로킹 방지)
- 쿼리 완료/실패/취소 시 finally 블록에서 반드시 리소스 반환
  - 글로벌 Semaphore 슬롯 반환
  - 세션별 쿼리 카운트 감소 (기존 누락 수정)
  - ActiveQueryManager 쿼리 정리
- TrackQueryInterceptor 세션 제한값 외부 설정화 (@Value)
- application-prod.yml에 websocket.query 설정 추가

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 13:36:10 +09:00

32 KiB
Raw Blame 히스토리

WebSocket 성능 부하 개선 보고서

선박 항적 리플레이 서비스 — 동시 요청 부하 대응

항목 내용
작성일 2026-02-06
대상 시스템 Signal Batch — 선박 항적 조회/리플레이 WebSocket 서비스
운영 환경 Linux, vessel-batch-control.shrun-on-query-server-dev.sh (prod 프로파일)
문제 상황 다수 클라이언트의 리플레이 요청 동시 유입 시 서비스 장애 발생

1. 현황 분석 (AS-IS)

1.1 시스템 아키텍처 개요

클라이언트 (N개)
    │ WebSocket STOMP (/ws-tracks)
    ▼
┌─────────────────────────────────────────────────┐
│  Tomcat (max-threads: 200, max-connections: 10000) │
│  ┌─────────────────────────────────────────────┐   │
│  │  STOMP Inbound Channel (core:10, max:20)    │   │
│  │  → TrackQueryInterceptor (세션당 3개 제한)    │   │
│  └────────────┬────────────────────────────────┘   │
│               ▼                                     │
│  ┌─────────────────────────────────────────────┐   │
│  │  @Async trackStreamingExecutor              │   │
│  │  (core:15, max:30, queue:500)               │   │
│  │  ┌───────────────────────────────────────┐  │   │
│  │  │ ChunkedTrackStreamingService          │  │   │
│  │  │ └ 백프레셔: pendingBufferSize ≤ 50MB  │  │   │
│  │  │ StompTrackStreamingService            │  │   │
│  │  │ └ BlockingQueue(100) + Thread.sleep   │  │   │
│  │  └───────────────────────────────────────┘  │   │
│  └────────────┬────────────────────────────────┘   │
│               ▼                                     │
│  ┌─────────────────────────────────────────────┐   │
│  │  STOMP Outbound Channel (core:20, max:40)   │   │
│  │  (queue: 5000)                              │   │
│  └─────────────────────────────────────────────┘   │
│               ▼                                     │
│  PostgreSQL + PostGIS (HikariCP max: 60)            │
└─────────────────────────────────────────────────────┘

1.2 현재 구현된 부하 제어 메커니즘

(A) 세션당 동시 쿼리 제한 — TrackQueryInterceptor

파일: global/websocket/interceptor/TrackQueryInterceptor.java

// AS-IS: 세션당 최대 3개 동시 쿼리 제한
private final ConcurrentHashMap<String, AtomicInteger> sessionQueries = new ConcurrentHashMap<>();
private static final int MAX_QUERIES_PER_SESSION = 3;

@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
    // ...
    if ("/app/tracks/query".equals(destination)) {
        AtomicInteger queryCount = sessionQueries.computeIfAbsent(
            sessionId, k -> new AtomicInteger(0)
        );
        if (queryCount.get() >= MAX_QUERIES_PER_SESSION) {
            throw new IllegalStateException("Maximum concurrent queries exceeded");
        }
        queryCount.incrementAndGet();
    }
    return message;
}

// 쿼리 완료 시 카운트 감소 메서드 (존재하지만 호출부 없음)
public void decrementQueryCount(String sessionId) {
    AtomicInteger count = sessionQueries.get(sessionId);
    if (count != null) {
        count.decrementAndGet();
    }
}

(B) 메모리 버퍼 기반 백프레셔 — ChunkedTrackStreamingService

파일: global/websocket/service/ChunkedTrackStreamingService.java

// AS-IS: 50MB 버퍼 기반 백프레셔
private final AtomicLong pendingBufferSize = new AtomicLong(0);
private static final long MAX_PENDING_BUFFER = 50 * 1024 * 1024; // 50MB

// 버퍼 초과 시 busy-wait
int backpressureWaitCount = 0;
while (pendingBufferSize.get() > MAX_PENDING_BUFFER) {
    Thread.sleep(50);
    backpressureWaitCount++;
    // 500ms 이상 대기 시 청크 크기 축소
    if (backpressureWaitCount > 10 && metrics.dynamicChunkSizeKB > MIN_MESSAGE_SIZE_KB) {
        metrics.dynamicChunkSizeKB = Math.max(MIN_MESSAGE_SIZE_KB,
                metrics.dynamicChunkSizeKB - 512);
    }
}

// 전송 완료 후 버퍼 감소 (비동기 + 100ms 고정 대기)
CompletableFuture.runAsync(() -> {
    Thread.sleep(100); // 네트워크 전송 시간 "추정"
    pendingBufferSize.addAndGet(-chunkSize);
});

(C) 비동기 스레드 풀 — AsyncConfig

파일: global/config/AsyncConfig.java

// AS-IS: 고정 크기 스레드 풀
@Bean(name = "trackStreamingExecutor")
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(15);
    executor.setMaxPoolSize(30);
    executor.setQueueCapacity(500);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return executor;
}

(D) 정적 전송 지연 — StompTrackStreamingService

파일: global/websocket/service/StompTrackStreamingService.java

// AS-IS: 데이터 크기에 따른 고정 지연
private int calculateChunkDelay(int chunkSize, int totalTracks) {
    if (totalTracks > 1000000) return 200;
    else if (totalTracks > 500000) return 150;
    else if (totalTracks > 100000) return 100;
    else if (totalTracks > 50000) return 50;
    else if (totalTracks > 10000) return 30;
    return 10;
}

(E) STOMP 채널 설정 — WebSocketStompConfig

파일: global/config/WebSocketStompConfig.java

// AS-IS: 인바운드/아웃바운드 채널 스레드 풀
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.taskExecutor()
        .corePoolSize(10).maxPoolSize(20).queueCapacity(100);
}

@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
    registration.taskExecutor()
        .corePoolSize(20).maxPoolSize(40).queueCapacity(5000);
}

@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
    registration
        .setMessageSizeLimit(50 * 1024 * 1024)      // 50MB
        .setSendBufferSizeLimit(256 * 1024 * 1024)  // 256MB
        .setSendTimeLimit(120 * 1000);               // 120초
}

1.3 운영 환경 설정

항목 설정값
JVM 힙 서버 메모리의 1/3 (최소 8GB, 최대 16GB)
GC G1GC, MaxGCPauseMillis=200
Tomcat 스레드 max: 200, min-spare: 10
Tomcat 연결 max-connections: 10000, accept-count: 100
DB 커넥션 풀 (Query) max: 60, min-idle: 10
DB 커넥션 풀 (Collect) max: 20, min-idle: 5
DB 커넥션 풀 (Batch) max: 20, min-idle: 10

2. 식별된 문제점

2.1 [심각] 글로벌 동시 쿼리 제한 없음

현상: 세션당 3개만 제한하므로, 100개 세션이 동시 접속하면 최대 300개 동시 쿼리 발생 가능

영향:

  • DB 커넥션 풀(60개) 고갈 → 커넥션 대기 타임아웃
  • trackStreamingExecutor 큐(500) 포화 → CallerRunsPolicy에 의해 인바운드 채널 스레드 블로킹
  • 인바운드 채널(max 20) 블로킹 → 새 WebSocket 요청 처리 불가 → 전체 서비스 마비

관련 코드: TrackQueryInterceptor.java — 글로벌 카운터 없음

[시나리오: 50개 클라이언트 × 3개 쿼리 = 150개 동시 요청]

trackStreamingExecutor (max: 30 스레드)
    → 30개 실행 + 500개 큐 대기
    → 큐 포화 시 CallerRunsPolicy 발동
    → 인바운드 채널 스레드(20개)에서 직접 실행
    → 인바운드 채널 블로킹 → WebSocket 메시지 처리 중단
    → 하트비트 실패 → 세션 끊김 → 연쇄 장애

2.2 [심각] 쿼리 카운트 감소 로직 누락

현상: TrackQueryInterceptor.decrementQueryCount() 메서드는 존재하지만, 쿼리 완료/실패 시 호출하는 코드가 없음

영향:

  • 세션이 유지되는 동안 쿼리 카운트가 계속 증가만 함
  • 3개 쿼리 후 해당 세션은 더 이상 쿼리 불가
  • DISCONNECT 시에만 sessionQueries.remove()로 정리됨
  • 결과적으로 세션당 제한이 사실상 무의미 (장기 연결에서 3개 쿼리 후 차단)

관련 코드:

// TrackQueryInterceptor.java:68 - 정의만 존재, 호출부 없음
public void decrementQueryCount(String sessionId) {
    AtomicInteger count = sessionQueries.get(sessionId);
    if (count != null) {
        count.decrementAndGet();
    }
}

2.3 [심각] CachedThreadPool 무제한 스레드 생성

현상: CancellableQueryManagerExecutors.newCachedThreadPool() 사용

영향:

  • 부하 시 무제한 스레드 생성 → OOM 또는 OS 스레드 한도 초과
  • 스레드 컨텍스트 스위칭 오버헤드 급증

관련 코드: CancellableQueryManager.java

// AS-IS: 상한 없는 스레드 풀
private final ExecutorService queryExecutor = Executors.newCachedThreadPool(r -> {
    Thread thread = new Thread(r);
    thread.setName("query-executor-" + thread.getId());
    thread.setDaemon(true);
    return thread;
});

2.4 [심각] ChunkedTrackStreamingService 쿼리 취소 미구현

현상: cancelQuery() 메서드가 메트릭 정리만 수행하고 실제 스트리밍 중단 로직이 없음

영향:

  • 클라이언트가 연결을 끊어도 서버에서 DB 쿼리 + 데이터 가공이 계속 실행
  • 리소스(DB 커넥션, CPU, 메모리)가 불필요하게 점유됨

관련 코드: ChunkedTrackStreamingService.java:1176

// AS-IS: 취소 시 메트릭만 정리, 실행 중인 쿼리는 계속됨
public void cancelQuery(String queryId) {
    log.info("Cancelling chunked query: {}", queryId);
    // TODO: 실제 취소 로직 구현
    cleanupQueryMetrics(queryId);
}

2.5 [중간] 백프레셔의 비정확한 버퍼 추적

현상: 전송 완료 후 버퍼 크기 감소를 CompletableFuture + Thread.sleep(100ms)로 처리

영향:

  • 실제 네트워크 전송 완료와 무관하게 100ms 후 무조건 버퍼 감소
  • 네트워크 지연이 100ms보다 길면 버퍼가 과소 추적 → 실제 메모리 사용량보다 작게 인식
  • 네트워크 지연이 100ms보다 짧으면 불필요한 백프레셔 발생

관련 코드: ChunkedTrackStreamingService.java:1069

// AS-IS: 네트워크 전송 시간을 100ms로 "추정"
CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(100); // 네트워크 전송 시간 고려
        pendingBufferSize.addAndGet(-chunkSize);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

2.6 [중간] 두 개의 쿼리 관리 시스템 분리

현상: ActiveQueryManagerStompTrackStreamingService.activeQueries가 각각 독립적으로 쿼리 상태를 관리

영향:

  • 취소 시 한쪽만 처리될 가능성
  • 상태 불일치로 인한 리소스 누수

관련 클래스:

  • ActiveQueryManager: 세션별 쿼리 관리 (ConcurrentHashMap<String, ActiveQuery>)
  • StompTrackStreamingService: 쿼리별 컨텍스트 관리 (ConcurrentHashMap<String, QueryContext>)

2.7 [중간] StompTrackStreamingService의 병합 모드 OOM 위험

현상: 병합 모드에서 모든 트랙 데이터를 메모리에 적재

영향:

  • 대량 데이터(100만+ 트랙) 요청 시 힙 메모리 초과 → OOM
  • G1GC Full GC 빈발 → STW(Stop-The-World) 증가 → 서비스 지연

2.8 [낮음] WebSocket 설정의 하드코딩

현상: 모든 WebSocket 관련 제한값이 Java 코드에 하드코딩

영향:

  • 운영 중 튜닝 불가 (재배포 필요)
  • 환경별 차별화 불가
하드코딩 항목 위치
MAX_QUERIES_PER_SESSION TrackQueryInterceptor 3
MAX_PENDING_BUFFER ChunkedTrackStreamingService 50MB
MAX_MESSAGE_SIZE_KB ChunkedTrackStreamingService 1024KB
BlockingQueue 크기 StompTrackStreamingService 100
Inbound core/max WebSocketStompConfig 10/20
Outbound core/max WebSocketStompConfig 20/40
sendBufferSizeLimit WebSocketStompConfig 256MB

2.9 [낮음] 미사용 최적화 코드 사장

현상: TrackStreamingOptimizer에 네트워크 메트릭 기반 적응형 청크 조정 로직이 설계되어 있으나, 실제 서비스에서 연결/사용되지 않음


3. 개선 계획 (TO-BE)

3.1 Phase 1 — 긴급 안정화 (즉시 적용)

목표: 동시 요청 폭주 시 서비스 마비 방지

3.1.1 글로벌 동시 쿼리 제한 도입

AS-IS: 세션당 제한만 존재 (글로벌 제한 없음)

TO-BE: 서버 전체 동시 실행 쿼리 수를 Semaphore로 제한 + 대기 큐 구현

// TO-BE: TrackQueryInterceptor.java — 글로벌 동시 제한 추가
@Slf4j
@Component
@RequiredArgsConstructor
public class TrackQueryInterceptor implements ChannelInterceptor {

    private final ConcurrentHashMap<String, AtomicInteger> sessionQueries = new ConcurrentHashMap<>();

    // ▼ [추가] 글로벌 동시 쿼리 제한
    @Value("${websocket.query.max-concurrent-global:20}")
    private int maxConcurrentGlobal;

    @Value("${websocket.query.max-per-session:3}")
    private int maxQueriesPerSession;

    @Value("${websocket.query.queue-timeout-seconds:30}")
    private int queueTimeoutSeconds;

    private Semaphore globalQuerySemaphore;
    private final AtomicInteger waitingCount = new AtomicInteger(0);

    @PostConstruct
    public void init() {
        this.globalQuerySemaphore = new Semaphore(maxConcurrentGlobal, true); // fair=true
        log.info("Global query semaphore initialized: maxConcurrent={}, queueTimeout={}s",
                maxConcurrentGlobal, queueTimeoutSeconds);
    }

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(
            message, StompHeaderAccessor.class);

        if (accessor != null && StompCommand.SEND.equals(accessor.getCommand())) {
            String destination = accessor.getDestination();
            String sessionId = accessor.getSessionId();

            if ("/app/tracks/query".equals(destination)) {
                // 1. 세션당 제한 확인
                AtomicInteger queryCount = sessionQueries.computeIfAbsent(
                    sessionId, k -> new AtomicInteger(0));
                if (queryCount.get() >= maxQueriesPerSession) {
                    throw new IllegalStateException(
                        "Session query limit exceeded: " + maxQueriesPerSession);
                }

                // 2. 글로벌 대기 큐 진입
                int waiting = waitingCount.incrementAndGet();
                log.info("Query queued: session={}, waiting={}, active={}/{}",
                        sessionId, waiting,
                        maxConcurrentGlobal - globalQuerySemaphore.availablePermits(),
                        maxConcurrentGlobal);
                try {
                    boolean acquired = globalQuerySemaphore.tryAcquire(
                        queueTimeoutSeconds, TimeUnit.SECONDS);
                    if (!acquired) {
                        throw new IllegalStateException(
                            "Query queue timeout after " + queueTimeoutSeconds + "s. " +
                            "Server is busy, please retry later.");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Query interrupted while waiting");
                } finally {
                    waitingCount.decrementAndGet();
                }

                queryCount.incrementAndGet();
            }
        }
        return message;
    }

    // ▼ [추가] 쿼리 완료 시 호출 — Semaphore 반환 + 카운트 감소
    public void releaseQuery(String sessionId) {
        globalQuerySemaphore.release();
        decrementQueryCount(sessionId);
        log.debug("Query released: session={}, available={}/{}",
                sessionId, globalQuerySemaphore.availablePermits(), maxConcurrentGlobal);
    }

    // 기존 메서드 유지
    public void decrementQueryCount(String sessionId) {
        AtomicInteger count = sessionQueries.get(sessionId);
        if (count != null && count.get() > 0) {
            count.decrementAndGet();
        }
    }

    // 모니터링용
    public int getActiveQueryCount() {
        return maxConcurrentGlobal - globalQuerySemaphore.availablePermits();
    }

    public int getWaitingCount() {
        return waitingCount.get();
    }
}

application.yml 설정 추가:

websocket:
  query:
    max-concurrent-global: 20     # 서버 전체 동시 쿼리 상한
    max-per-session: 3            # 세션당 동시 쿼리 상한
    queue-timeout-seconds: 30     # 대기 큐 타임아웃

3.1.2 쿼리 완료/실패 시 리소스 반환 보장

AS-IS: decrementQueryCount() 호출부 없음 → 세션당 제한이 해제되지 않음

TO-BE: 스트리밍 서비스에서 쿼리 완료/실패/취소 시 반드시 releaseQuery() 호출

// TO-BE: ChunkedTrackStreamingService.java — finally 블록에서 리소스 반환
@Async("trackStreamingExecutor")
public void streamChunkedTracks(TrackQueryRequest request,
                                String queryId, String sessionId,
                                Consumer<TrackChunkResponse> chunkConsumer,
                                Consumer<QueryStatusUpdate> statusConsumer) {
    try {
        // ... 기존 스트리밍 로직 ...
    } catch (Exception e) {
        // ... 에러 처리 ...
    } finally {
        // ▼ [추가] 반드시 리소스 반환
        trackQueryInterceptor.releaseQuery(sessionId);
        activeQueryManager.completeQuery(sessionId);
        cleanupQueryMetrics(queryId);
        log.info("Query resources released: queryId={}, sessionId={}", queryId, sessionId);
    }
}
// TO-BE: StompTrackStreamingService.java — 동일하게 finally 보장
@Async
public void streamTracks(TrackQueryRequest request, String queryId,
                         String sessionId, ...) {
    try {
        // ... 기존 스트리밍 로직 ...
    } catch (Exception e) {
        // ... 에러 처리 ...
    } finally {
        // ▼ [추가] 반드시 리소스 반환
        activeQueries.remove(queryId);
        trackQueryInterceptor.releaseQuery(sessionId);
        log.info("Query resources released: queryId={}, sessionId={}", queryId, sessionId);
    }
}

3.1.3 CachedThreadPool → 제한된 ThreadPoolExecutor 교체

AS-IS: CancellableQueryManager의 무제한 스레드 풀

// AS-IS
private final ExecutorService queryExecutor = Executors.newCachedThreadPool(...);

TO-BE: 상한이 있는 ThreadPoolExecutor로 교체

// TO-BE: CancellableQueryManager.java
private final ExecutorService queryExecutor = new ThreadPoolExecutor(
    5,                                    // corePoolSize
    20,                                   // maximumPoolSize (상한 설정)
    60L, TimeUnit.SECONDS,               // keepAliveTime
    new LinkedBlockingQueue<>(100),       // 대기 큐 크기 제한
    r -> {
        Thread thread = new Thread(r);
        thread.setName("query-executor-" + thread.getId());
        thread.setDaemon(true);
        return thread;
    },
    new ThreadPoolExecutor.CallerRunsPolicy() // 큐 포화 시 호출 스레드에서 실행
);

3.2 Phase 2 — 취소 및 정리 로직 완성

목표: 불필요한 리소스 점유 방지, 쿼리 생명주기 완전 관리

3.2.1 ChunkedTrackStreamingService 쿼리 취소 구현

AS-IS: TODO 주석만 존재

// AS-IS
public void cancelQuery(String queryId) {
    log.info("Cancelling chunked query: {}", queryId);
    // TODO: 실제 취소 로직 구현
    cleanupQueryMetrics(queryId);
}

TO-BE: AtomicBoolean 취소 플래그 + 쿼리 루프 내 체크 포인트

// TO-BE: ChunkedTrackStreamingService.java

// ▼ [추가] 쿼리별 취소 플래그 관리
private final ConcurrentHashMap<String, AtomicBoolean> queryCancelFlags = new ConcurrentHashMap<>();

@Async("trackStreamingExecutor")
public void streamChunkedTracks(TrackQueryRequest request,
                                String queryId, String sessionId, ...) {
    // ▼ [추가] 취소 플래그 등록
    AtomicBoolean cancelFlag = new AtomicBoolean(false);
    queryCancelFlags.put(queryId, cancelFlag);

    try {
        // ... 기존 로직에서 processTableRange 호출 시 cancelFlag 전달 ...
        for (Map.Entry<TableStrategy, List<TimeRange>> entry : strategyMap.entrySet()) {
            // ▼ [추가] 테이블 처리 전 취소 확인
            if (cancelFlag.get()) {
                log.info("Query {} cancelled before processing table {}", queryId, entry.getKey());
                break;
            }
            // processTableRange(request, strategy, range, sessionId, viewportVesselIds, cancelFlag);
        }
    } finally {
        queryCancelFlags.remove(queryId);
        trackQueryInterceptor.releaseQuery(sessionId);
        activeQueryManager.completeQuery(sessionId);
        cleanupQueryMetrics(queryId);
    }
}

// ▼ [개선] 실제 취소 로직 구현
public void cancelQuery(String queryId) {
    AtomicBoolean flag = queryCancelFlags.get(queryId);
    if (flag != null) {
        flag.set(true);
        log.info("Cancel flag set for query: {}", queryId);
    }
    cleanupQueryMetrics(queryId);
}
// TO-BE: processTableRange 내부 — 주기적 취소 확인
while (rs.next() && trackCount < MAX_TRACKS_PER_CHUNK) {
    // ▼ [추가] 1000건마다 취소 확인
    if (trackCount % 1000 == 0 && cancelFlag.get()) {
        log.info("Query cancelled during processing: table={}, processed={}", tableName, trackCount);
        return Collections.emptyList();
    }
    // ... 기존 처리 로직 ...
}

3.2.2 쿼리 관리 시스템 통합

AS-IS: ActiveQueryManagerStompTrackStreamingService.activeQueries 이원화

TO-BE: ActiveQueryManager를 단일 진실의 원천(Single Source of Truth)으로 통합

// TO-BE: ActiveQueryManager.java — 취소 플래그 및 릴리즈 콜백 통합
public static class ActiveQuery {
    private final String sessionId;
    private final String queryId;
    private final LocalDateTime startTime;
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private volatile String currentTable;
    private volatile int processedChunks;
    private Runnable onComplete;  // ▼ [추가] 완료 콜백

    public void setOnComplete(Runnable callback) {
        this.onComplete = callback;
    }
}

public void completeQuery(String sessionId) {
    ActiveQuery query = activeQueries.remove(sessionId);
    if (query != null) {
        // ▼ [추가] 등록된 콜백 실행 (Semaphore 반환 등)
        if (query.onComplete != null) {
            query.onComplete.run();
        }
        log.debug("Query completed: sessionId={}, queryId={}", sessionId, query.getQueryId());
    }
}

3.3 Phase 3 — 백프레셔 고도화

목표: 정확한 버퍼 추적, 클라이언트 소비 속도 반영

3.3.1 콜백 기반 버퍼 추적

AS-IS: Thread.sleep(100ms) 후 무조건 버퍼 감소 (추정치)

TO-BE: 전송 완료 콜백에서 정확히 버퍼 감소

// TO-BE: 전송 완료 콜백 기반 버퍼 관리
// SimpMessagingTemplate의 convertAndSendToUser 대신 직접 전송 제어

// WebSocket 메시지 전송 래퍼
private void sendWithBackpressureTracking(String sessionId, String destination,
                                          TrackChunkResponse response, int chunkSize) {
    pendingBufferSize.addAndGet(chunkSize);
    try {
        messagingTemplate.convertAndSendToUser(sessionId, destination, response);
        // 전송 성공 즉시 버퍼 감소
        pendingBufferSize.addAndGet(-chunkSize);
    } catch (Exception e) {
        // 전송 실패 시에도 버퍼 감소
        pendingBufferSize.addAndGet(-chunkSize);
        throw e;
    }
}

3.3.2 적응형 전송 지연

AS-IS: 총 트랙 수에 따른 정적 지연

TO-BE: 실시간 버퍼 사용률 + 최근 전송 성공률에 따른 동적 지연

// TO-BE: 동적 지연 계산
private int calculateAdaptiveDelay(BackpressureMetrics metrics) {
    double bufferUsage = (double) pendingBufferSize.get() / MAX_PENDING_BUFFER;

    if (bufferUsage > 0.8) return 200;       // 80% 이상: 강한 억제
    else if (bufferUsage > 0.5) return 100;  // 50% 이상: 중간 억제
    else if (bufferUsage > 0.3) return 50;   // 30% 이상: 약한 억제
    return 10;                                // 정상: 최소 지연
}

3.4 Phase 4 — 설정 외부화 및 모니터링

목표: 운영 중 튜닝 가능, 실시간 상황 파악

3.4.1 WebSocket 설정 Properties 클래스 도입

// TO-BE: WebSocketProperties.java (신규)
@Data
@Component
@ConfigurationProperties(prefix = "websocket")
public class WebSocketProperties {

    private QueryProperties query = new QueryProperties();
    private TransportProperties transport = new TransportProperties();
    private BackpressureProperties backpressure = new BackpressureProperties();

    @Data
    public static class QueryProperties {
        private int maxConcurrentGlobal = 20;
        private int maxPerSession = 3;
        private int queueTimeoutSeconds = 30;
    }

    @Data
    public static class TransportProperties {
        private int inboundCorePoolSize = 10;
        private int inboundMaxPoolSize = 20;
        private int inboundQueueCapacity = 100;
        private int outboundCorePoolSize = 20;
        private int outboundMaxPoolSize = 40;
        private int outboundQueueCapacity = 5000;
        private int messageSizeLimitMb = 50;
        private int sendBufferSizeLimitMb = 256;
        private int sendTimeLimitSeconds = 120;
    }

    @Data
    public static class BackpressureProperties {
        private long maxPendingBufferMb = 50;
        private int maxMessageSizeKb = 1024;
        private int minMessageSizeKb = 256;
    }
}

application.yml 통합 설정:

websocket:
  query:
    max-concurrent-global: 20
    max-per-session: 3
    queue-timeout-seconds: 30
  transport:
    inbound-core-pool-size: 10
    inbound-max-pool-size: 20
    inbound-queue-capacity: 100
    outbound-core-pool-size: 20
    outbound-max-pool-size: 40
    outbound-queue-capacity: 5000
    message-size-limit-mb: 50
    send-buffer-size-limit-mb: 256
    send-time-limit-seconds: 120
  backpressure:
    max-pending-buffer-mb: 50
    max-message-size-kb: 1024
    min-message-size-kb: 256

3.4.2 모니터링 엔드포인트 추가

// TO-BE: WebSocketMonitorController.java (신규)
@RestController
@RequestMapping("/api/v1/monitor/websocket")
@RequiredArgsConstructor
public class WebSocketMonitorController {

    private final TrackQueryInterceptor queryInterceptor;
    private final ActiveQueryManager activeQueryManager;

    @GetMapping("/status")
    public Map<String, Object> getStatus() {
        return Map.of(
            "activeQueries", queryInterceptor.getActiveQueryCount(),
            "waitingInQueue", queryInterceptor.getWaitingCount(),
            "queryDetails", activeQueryManager.getAllActiveQueries().entrySet().stream()
                .map(e -> Map.of(
                    "sessionId", e.getKey(),
                    "queryId", e.getValue().getQueryId(),
                    "startTime", e.getValue().getStartTime().toString(),
                    "table", String.valueOf(e.getValue().getCurrentTable()),
                    "chunks", e.getValue().getProcessedChunks()
                )).collect(Collectors.toList())
        );
    }
}

4. 개선 우선순위 및 기대 효과

순위 개선 항목 심각도 난이도 기대 효과
1 글로벌 동시 쿼리 제한 (Semaphore) 심각 낮음 동시 요청 폭주 시 서비스 마비 방지
2 쿼리 완료 시 리소스 반환 (releaseQuery) 심각 낮음 세션당 제한 정상 동작
3 CachedThreadPool 교체 심각 낮음 무제한 스레드 생성 방지
4 ChunkedTrackStreamingService 취소 구현 심각 중간 불필요한 리소스 점유 해소
5 쿼리 관리 시스템 통합 중간 중간 상태 불일치 해소
6 백프레셔 콜백 기반 전환 중간 중간 정확한 메모리 관리
7 설정 외부화 낮음 낮음 운영 중 튜닝 가능
8 모니터링 엔드포인트 낮음 낮음 실시간 상태 파악

5. 제한 사항 및 고려 사항

5.1 글로벌 동시 제한 수 결정 기준

maxConcurrentGlobal = min(DB커넥션풀 / 2, Async스레드풀max)
                    = min(60 / 2, 30)
                    = 20 (권장)
  • DB 커넥션 풀(60개)의 50%를 WebSocket 쿼리에 할당
  • 나머지 50%는 REST API, 배치 작업, 헬스체크 등에 예비
  • trackStreamingExecutor max(30) 이하로 설정하여 스레드 풀 포화 방지

5.2 대기 큐 타임아웃

  • 30초 권장 (클라이언트 UX 고려)
  • 타임아웃 시 클라이언트에 명확한 에러 메시지 전달 필요
  • 프론트엔드에서 재시도 로직 또는 "서버가 바쁩니다" 안내 필요

5.3 기존 클라이언트 호환성

  • STOMP 프로토콜 레벨 변경 없음 (하위 호환)
  • 에러 메시지 형식이 변경될 수 있으므로 프론트엔드 에러 핸들링 확인 필요

부록 A. 관련 파일 목록

파일 역할
global/websocket/interceptor/TrackQueryInterceptor.java 세션별 쿼리 제한 인터셉터
global/websocket/service/ChunkedTrackStreamingService.java 청크 기반 스트리밍 (백프레셔)
global/websocket/service/StompTrackStreamingService.java STOMP 스트리밍 (큐 + 지연)
global/websocket/service/ActiveQueryManager.java 활성 쿼리 추적
global/config/AsyncConfig.java 비동기 스레드 풀 설정
global/config/WebSocketStompConfig.java WebSocket STOMP 설정
domain/vessel/service/query/CancellableQueryManager.java 쿼리 취소 관리
global/websocket/listener/WebSocketDisconnectListener.java 세션 종료 리스너
domain/vessel/service/optimization/TrackStreamingOptimizer.java 적응형 최적화 (미사용)
monitoring/service/TrackStreamingMetrics.java 스트리밍 메트릭

부록 B. 운영 환경 리소스 현황

리소스 설정값 비고
JVM Heap 8~16GB (서버 메모리의 1/3) G1GC
Tomcat Threads max 200
Tomcat Connections max 10,000
DB Pool (Query) max 60 주 사용
DB Pool (Collect) max 20 읽기 전용
DB Pool (Batch) max 20 메타데이터
trackStreamingExecutor core 15, max 30, queue 500 CallerRunsPolicy
STOMP Inbound core 10, max 20, queue 100
STOMP Outbound core 20, max 40, queue 5000
WebSocket Buffer send 256MB, message 50MB