signal-batch/docs/websocket-performance-improvement-report.md
HeungTak Lee dc586dde0c docs: Phase 5~6 구현 진행 문서 및 성능 보고서 업데이트
- implementation-progress.md: Phase 5~6 체크리스트 추가 (전항목 완료)
- websocket-performance-improvement-report.md: 대기열 구조, 캐시 아키텍처 설명 추가

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 15:34:32 +09:00

35 KiB
Raw Blame 히스토리

WebSocket 성능 부하 개선 보고서

선박 항적 리플레이 서비스 — 동시 요청 부하 대응

항목 내용
작성일 2026-02-06
대상 시스템 Signal Batch — 선박 항적 조회/리플레이 WebSocket 서비스
운영 환경 Linux, vessel-batch-control.shrun-on-query-server-dev.sh (prod 프로파일)
문제 상황 다수 클라이언트의 리플레이 요청 동시 유입 시 서비스 장애 발생

1. 현황 분석 (AS-IS)

1.1 시스템 아키텍처 개요

클라이언트 (N개)
    │ WebSocket STOMP (/ws-tracks)
    ▼
┌─────────────────────────────────────────────────┐
│  Tomcat (max-threads: 200, max-connections: 10000) │
│  ┌─────────────────────────────────────────────┐   │
│  │  STOMP Inbound Channel (core:10, max:20)    │   │
│  │  → TrackQueryInterceptor (세션당 3개 제한)    │   │
│  └────────────┬────────────────────────────────┘   │
│               ▼                                     │
│  ┌─────────────────────────────────────────────┐   │
│  │  @Async trackStreamingExecutor              │   │
│  │  (core:15, max:30, queue:500)               │   │
│  │  ┌───────────────────────────────────────┐  │   │
│  │  │ ChunkedTrackStreamingService          │  │   │
│  │  │ └ 백프레셔: pendingBufferSize ≤ 50MB  │  │   │
│  │  │ StompTrackStreamingService            │  │   │
│  │  │ └ BlockingQueue(100) + Thread.sleep   │  │   │
│  │  └───────────────────────────────────────┘  │   │
│  └────────────┬────────────────────────────────┘   │
│               ▼                                     │
│  ┌─────────────────────────────────────────────┐   │
│  │  STOMP Outbound Channel (core:20, max:40)   │   │
│  │  (queue: 5000)                              │   │
│  └─────────────────────────────────────────────┘   │
│               ▼                                     │
│  PostgreSQL + PostGIS (HikariCP max: 60)            │
└─────────────────────────────────────────────────────┘

1.2 현재 구현된 부하 제어 메커니즘

(A) 세션당 동시 쿼리 제한 — TrackQueryInterceptor

파일: global/websocket/interceptor/TrackQueryInterceptor.java

// AS-IS: 세션당 최대 3개 동시 쿼리 제한
private final ConcurrentHashMap<String, AtomicInteger> sessionQueries = new ConcurrentHashMap<>();
private static final int MAX_QUERIES_PER_SESSION = 3;

@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
    // ...
    if ("/app/tracks/query".equals(destination)) {
        AtomicInteger queryCount = sessionQueries.computeIfAbsent(
            sessionId, k -> new AtomicInteger(0)
        );
        if (queryCount.get() >= MAX_QUERIES_PER_SESSION) {
            throw new IllegalStateException("Maximum concurrent queries exceeded");
        }
        queryCount.incrementAndGet();
    }
    return message;
}

// 쿼리 완료 시 카운트 감소 메서드 (존재하지만 호출부 없음)
public void decrementQueryCount(String sessionId) {
    AtomicInteger count = sessionQueries.get(sessionId);
    if (count != null) {
        count.decrementAndGet();
    }
}

(B) 메모리 버퍼 기반 백프레셔 — ChunkedTrackStreamingService

파일: global/websocket/service/ChunkedTrackStreamingService.java

// AS-IS: 50MB 버퍼 기반 백프레셔
private final AtomicLong pendingBufferSize = new AtomicLong(0);
private static final long MAX_PENDING_BUFFER = 50 * 1024 * 1024; // 50MB

// 버퍼 초과 시 busy-wait
int backpressureWaitCount = 0;
while (pendingBufferSize.get() > MAX_PENDING_BUFFER) {
    Thread.sleep(50);
    backpressureWaitCount++;
    // 500ms 이상 대기 시 청크 크기 축소
    if (backpressureWaitCount > 10 && metrics.dynamicChunkSizeKB > MIN_MESSAGE_SIZE_KB) {
        metrics.dynamicChunkSizeKB = Math.max(MIN_MESSAGE_SIZE_KB,
                metrics.dynamicChunkSizeKB - 512);
    }
}

// 전송 완료 후 버퍼 감소 (비동기 + 100ms 고정 대기)
CompletableFuture.runAsync(() -> {
    Thread.sleep(100); // 네트워크 전송 시간 "추정"
    pendingBufferSize.addAndGet(-chunkSize);
});

(C) 비동기 스레드 풀 — AsyncConfig

파일: global/config/AsyncConfig.java

// AS-IS: 고정 크기 스레드 풀
@Bean(name = "trackStreamingExecutor")
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(15);
    executor.setMaxPoolSize(30);
    executor.setQueueCapacity(500);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return executor;
}

(D) 정적 전송 지연 — StompTrackStreamingService

파일: global/websocket/service/StompTrackStreamingService.java

// AS-IS: 데이터 크기에 따른 고정 지연
private int calculateChunkDelay(int chunkSize, int totalTracks) {
    if (totalTracks > 1000000) return 200;
    else if (totalTracks > 500000) return 150;
    else if (totalTracks > 100000) return 100;
    else if (totalTracks > 50000) return 50;
    else if (totalTracks > 10000) return 30;
    return 10;
}

(E) STOMP 채널 설정 — WebSocketStompConfig

파일: global/config/WebSocketStompConfig.java

// AS-IS: 인바운드/아웃바운드 채널 스레드 풀
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.taskExecutor()
        .corePoolSize(10).maxPoolSize(20).queueCapacity(100);
}

@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
    registration.taskExecutor()
        .corePoolSize(20).maxPoolSize(40).queueCapacity(5000);
}

@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
    registration
        .setMessageSizeLimit(50 * 1024 * 1024)      // 50MB
        .setSendBufferSizeLimit(256 * 1024 * 1024)  // 256MB
        .setSendTimeLimit(120 * 1000);               // 120초
}

1.3 운영 환경 설정

항목 설정값
JVM 힙 서버 메모리의 1/3 (최소 8GB, 최대 16GB)
GC G1GC, MaxGCPauseMillis=200
Tomcat 스레드 max: 200, min-spare: 10
Tomcat 연결 max-connections: 10000, accept-count: 100
DB 커넥션 풀 (Query) max: 60, min-idle: 10
DB 커넥션 풀 (Collect) max: 20, min-idle: 5
DB 커넥션 풀 (Batch) max: 20, min-idle: 10

2. 식별된 문제점

2.1 [심각] 글로벌 동시 쿼리 제한 없음

현상: 세션당 3개만 제한하므로, 100개 세션이 동시 접속하면 최대 300개 동시 쿼리 발생 가능

영향:

  • DB 커넥션 풀(60개) 고갈 → 커넥션 대기 타임아웃
  • trackStreamingExecutor 큐(500) 포화 → CallerRunsPolicy에 의해 인바운드 채널 스레드 블로킹
  • 인바운드 채널(max 20) 블로킹 → 새 WebSocket 요청 처리 불가 → 전체 서비스 마비

관련 코드: TrackQueryInterceptor.java — 글로벌 카운터 없음

[시나리오: 50개 클라이언트 × 3개 쿼리 = 150개 동시 요청]

trackStreamingExecutor (max: 30 스레드)
    → 30개 실행 + 500개 큐 대기
    → 큐 포화 시 CallerRunsPolicy 발동
    → 인바운드 채널 스레드(20개)에서 직접 실행
    → 인바운드 채널 블로킹 → WebSocket 메시지 처리 중단
    → 하트비트 실패 → 세션 끊김 → 연쇄 장애

2.2 [심각] 쿼리 카운트 감소 로직 누락

현상: TrackQueryInterceptor.decrementQueryCount() 메서드는 존재하지만, 쿼리 완료/실패 시 호출하는 코드가 없음

영향:

  • 세션이 유지되는 동안 쿼리 카운트가 계속 증가만 함
  • 3개 쿼리 후 해당 세션은 더 이상 쿼리 불가
  • DISCONNECT 시에만 sessionQueries.remove()로 정리됨
  • 결과적으로 세션당 제한이 사실상 무의미 (장기 연결에서 3개 쿼리 후 차단)

관련 코드:

// TrackQueryInterceptor.java:68 - 정의만 존재, 호출부 없음
public void decrementQueryCount(String sessionId) {
    AtomicInteger count = sessionQueries.get(sessionId);
    if (count != null) {
        count.decrementAndGet();
    }
}

2.3 [심각] CachedThreadPool 무제한 스레드 생성

현상: CancellableQueryManagerExecutors.newCachedThreadPool() 사용

영향:

  • 부하 시 무제한 스레드 생성 → OOM 또는 OS 스레드 한도 초과
  • 스레드 컨텍스트 스위칭 오버헤드 급증

관련 코드: CancellableQueryManager.java

// AS-IS: 상한 없는 스레드 풀
private final ExecutorService queryExecutor = Executors.newCachedThreadPool(r -> {
    Thread thread = new Thread(r);
    thread.setName("query-executor-" + thread.getId());
    thread.setDaemon(true);
    return thread;
});

2.4 [심각] ChunkedTrackStreamingService 쿼리 취소 미구현

현상: cancelQuery() 메서드가 메트릭 정리만 수행하고 실제 스트리밍 중단 로직이 없음

영향:

  • 클라이언트가 연결을 끊어도 서버에서 DB 쿼리 + 데이터 가공이 계속 실행
  • 리소스(DB 커넥션, CPU, 메모리)가 불필요하게 점유됨

관련 코드: ChunkedTrackStreamingService.java:1176

// AS-IS: 취소 시 메트릭만 정리, 실행 중인 쿼리는 계속됨
public void cancelQuery(String queryId) {
    log.info("Cancelling chunked query: {}", queryId);
    // TODO: 실제 취소 로직 구현
    cleanupQueryMetrics(queryId);
}

2.5 [중간] 백프레셔의 비정확한 버퍼 추적

현상: 전송 완료 후 버퍼 크기 감소를 CompletableFuture + Thread.sleep(100ms)로 처리

영향:

  • 실제 네트워크 전송 완료와 무관하게 100ms 후 무조건 버퍼 감소
  • 네트워크 지연이 100ms보다 길면 버퍼가 과소 추적 → 실제 메모리 사용량보다 작게 인식
  • 네트워크 지연이 100ms보다 짧으면 불필요한 백프레셔 발생

관련 코드: ChunkedTrackStreamingService.java:1069

// AS-IS: 네트워크 전송 시간을 100ms로 "추정"
CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(100); // 네트워크 전송 시간 고려
        pendingBufferSize.addAndGet(-chunkSize);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

2.6 [중간] 두 개의 쿼리 관리 시스템 분리

현상: ActiveQueryManagerStompTrackStreamingService.activeQueries가 각각 독립적으로 쿼리 상태를 관리

영향:

  • 취소 시 한쪽만 처리될 가능성
  • 상태 불일치로 인한 리소스 누수

관련 클래스:

  • ActiveQueryManager: 세션별 쿼리 관리 (ConcurrentHashMap<String, ActiveQuery>)
  • StompTrackStreamingService: 쿼리별 컨텍스트 관리 (ConcurrentHashMap<String, QueryContext>)

2.7 [중간] StompTrackStreamingService의 병합 모드 OOM 위험

현상: 병합 모드에서 모든 트랙 데이터를 메모리에 적재

영향:

  • 대량 데이터(100만+ 트랙) 요청 시 힙 메모리 초과 → OOM
  • G1GC Full GC 빈발 → STW(Stop-The-World) 증가 → 서비스 지연

2.8 [낮음] WebSocket 설정의 하드코딩

현상: 모든 WebSocket 관련 제한값이 Java 코드에 하드코딩

영향:

  • 운영 중 튜닝 불가 (재배포 필요)
  • 환경별 차별화 불가
하드코딩 항목 위치
MAX_QUERIES_PER_SESSION TrackQueryInterceptor 3
MAX_PENDING_BUFFER ChunkedTrackStreamingService 50MB
MAX_MESSAGE_SIZE_KB ChunkedTrackStreamingService 1024KB
BlockingQueue 크기 StompTrackStreamingService 100
Inbound core/max WebSocketStompConfig 10/20
Outbound core/max WebSocketStompConfig 20/40
sendBufferSizeLimit WebSocketStompConfig 256MB

2.9 [낮음] 미사용 최적화 코드 사장

현상: TrackStreamingOptimizer에 네트워크 메트릭 기반 적응형 청크 조정 로직이 설계되어 있으나, 실제 서비스에서 연결/사용되지 않음


3. 개선 계획 (TO-BE)

3.1 Phase 1 — 긴급 안정화 (즉시 적용)

목표: 동시 요청 폭주 시 서비스 마비 방지

3.1.1 글로벌 동시 쿼리 제한 도입

AS-IS: 세션당 제한만 존재 (글로벌 제한 없음)

TO-BE: 서버 전체 동시 실행 쿼리 수를 Semaphore로 제한 + 대기 큐 구현

// TO-BE: TrackQueryInterceptor.java — 글로벌 동시 제한 추가
@Slf4j
@Component
@RequiredArgsConstructor
public class TrackQueryInterceptor implements ChannelInterceptor {

    private final ConcurrentHashMap<String, AtomicInteger> sessionQueries = new ConcurrentHashMap<>();

    // ▼ [추가] 글로벌 동시 쿼리 제한
    @Value("${websocket.query.max-concurrent-global:20}")
    private int maxConcurrentGlobal;

    @Value("${websocket.query.max-per-session:3}")
    private int maxQueriesPerSession;

    @Value("${websocket.query.queue-timeout-seconds:30}")
    private int queueTimeoutSeconds;

    private Semaphore globalQuerySemaphore;
    private final AtomicInteger waitingCount = new AtomicInteger(0);

    @PostConstruct
    public void init() {
        this.globalQuerySemaphore = new Semaphore(maxConcurrentGlobal, true); // fair=true
        log.info("Global query semaphore initialized: maxConcurrent={}, queueTimeout={}s",
                maxConcurrentGlobal, queueTimeoutSeconds);
    }

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(
            message, StompHeaderAccessor.class);

        if (accessor != null && StompCommand.SEND.equals(accessor.getCommand())) {
            String destination = accessor.getDestination();
            String sessionId = accessor.getSessionId();

            if ("/app/tracks/query".equals(destination)) {
                // 1. 세션당 제한 확인
                AtomicInteger queryCount = sessionQueries.computeIfAbsent(
                    sessionId, k -> new AtomicInteger(0));
                if (queryCount.get() >= maxQueriesPerSession) {
                    throw new IllegalStateException(
                        "Session query limit exceeded: " + maxQueriesPerSession);
                }

                // 2. 글로벌 대기 큐 진입
                int waiting = waitingCount.incrementAndGet();
                log.info("Query queued: session={}, waiting={}, active={}/{}",
                        sessionId, waiting,
                        maxConcurrentGlobal - globalQuerySemaphore.availablePermits(),
                        maxConcurrentGlobal);
                try {
                    boolean acquired = globalQuerySemaphore.tryAcquire(
                        queueTimeoutSeconds, TimeUnit.SECONDS);
                    if (!acquired) {
                        throw new IllegalStateException(
                            "Query queue timeout after " + queueTimeoutSeconds + "s. " +
                            "Server is busy, please retry later.");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Query interrupted while waiting");
                } finally {
                    waitingCount.decrementAndGet();
                }

                queryCount.incrementAndGet();
            }
        }
        return message;
    }

    // ▼ [추가] 쿼리 완료 시 호출 — Semaphore 반환 + 카운트 감소
    public void releaseQuery(String sessionId) {
        globalQuerySemaphore.release();
        decrementQueryCount(sessionId);
        log.debug("Query released: session={}, available={}/{}",
                sessionId, globalQuerySemaphore.availablePermits(), maxConcurrentGlobal);
    }

    // 기존 메서드 유지
    public void decrementQueryCount(String sessionId) {
        AtomicInteger count = sessionQueries.get(sessionId);
        if (count != null && count.get() > 0) {
            count.decrementAndGet();
        }
    }

    // 모니터링용
    public int getActiveQueryCount() {
        return maxConcurrentGlobal - globalQuerySemaphore.availablePermits();
    }

    public int getWaitingCount() {
        return waitingCount.get();
    }
}

application.yml 설정 추가:

websocket:
  query:
    max-concurrent-global: 20     # 서버 전체 동시 쿼리 상한
    max-per-session: 3            # 세션당 동시 쿼리 상한
    queue-timeout-seconds: 30     # 대기 큐 타임아웃

3.1.2 쿼리 완료/실패 시 리소스 반환 보장

AS-IS: decrementQueryCount() 호출부 없음 → 세션당 제한이 해제되지 않음

TO-BE: 스트리밍 서비스에서 쿼리 완료/실패/취소 시 반드시 releaseQuery() 호출

// TO-BE: ChunkedTrackStreamingService.java — finally 블록에서 리소스 반환
@Async("trackStreamingExecutor")
public void streamChunkedTracks(TrackQueryRequest request,
                                String queryId, String sessionId,
                                Consumer<TrackChunkResponse> chunkConsumer,
                                Consumer<QueryStatusUpdate> statusConsumer) {
    try {
        // ... 기존 스트리밍 로직 ...
    } catch (Exception e) {
        // ... 에러 처리 ...
    } finally {
        // ▼ [추가] 반드시 리소스 반환
        trackQueryInterceptor.releaseQuery(sessionId);
        activeQueryManager.completeQuery(sessionId);
        cleanupQueryMetrics(queryId);
        log.info("Query resources released: queryId={}, sessionId={}", queryId, sessionId);
    }
}
// TO-BE: StompTrackStreamingService.java — 동일하게 finally 보장
@Async
public void streamTracks(TrackQueryRequest request, String queryId,
                         String sessionId, ...) {
    try {
        // ... 기존 스트리밍 로직 ...
    } catch (Exception e) {
        // ... 에러 처리 ...
    } finally {
        // ▼ [추가] 반드시 리소스 반환
        activeQueries.remove(queryId);
        trackQueryInterceptor.releaseQuery(sessionId);
        log.info("Query resources released: queryId={}, sessionId={}", queryId, sessionId);
    }
}

3.1.3 CachedThreadPool → 제한된 ThreadPoolExecutor 교체

AS-IS: CancellableQueryManager의 무제한 스레드 풀

// AS-IS
private final ExecutorService queryExecutor = Executors.newCachedThreadPool(...);

TO-BE: 상한이 있는 ThreadPoolExecutor로 교체

// TO-BE: CancellableQueryManager.java
private final ExecutorService queryExecutor = new ThreadPoolExecutor(
    5,                                    // corePoolSize
    20,                                   // maximumPoolSize (상한 설정)
    60L, TimeUnit.SECONDS,               // keepAliveTime
    new LinkedBlockingQueue<>(100),       // 대기 큐 크기 제한
    r -> {
        Thread thread = new Thread(r);
        thread.setName("query-executor-" + thread.getId());
        thread.setDaemon(true);
        return thread;
    },
    new ThreadPoolExecutor.CallerRunsPolicy() // 큐 포화 시 호출 스레드에서 실행
);

3.2 Phase 2 — 취소 및 정리 로직 완성

목표: 불필요한 리소스 점유 방지, 쿼리 생명주기 완전 관리

3.2.1 ChunkedTrackStreamingService 쿼리 취소 구현

AS-IS: TODO 주석만 존재

// AS-IS
public void cancelQuery(String queryId) {
    log.info("Cancelling chunked query: {}", queryId);
    // TODO: 실제 취소 로직 구현
    cleanupQueryMetrics(queryId);
}

TO-BE: AtomicBoolean 취소 플래그 + 쿼리 루프 내 체크 포인트

// TO-BE: ChunkedTrackStreamingService.java

// ▼ [추가] 쿼리별 취소 플래그 관리
private final ConcurrentHashMap<String, AtomicBoolean> queryCancelFlags = new ConcurrentHashMap<>();

@Async("trackStreamingExecutor")
public void streamChunkedTracks(TrackQueryRequest request,
                                String queryId, String sessionId, ...) {
    // ▼ [추가] 취소 플래그 등록
    AtomicBoolean cancelFlag = new AtomicBoolean(false);
    queryCancelFlags.put(queryId, cancelFlag);

    try {
        // ... 기존 로직에서 processTableRange 호출 시 cancelFlag 전달 ...
        for (Map.Entry<TableStrategy, List<TimeRange>> entry : strategyMap.entrySet()) {
            // ▼ [추가] 테이블 처리 전 취소 확인
            if (cancelFlag.get()) {
                log.info("Query {} cancelled before processing table {}", queryId, entry.getKey());
                break;
            }
            // processTableRange(request, strategy, range, sessionId, viewportVesselIds, cancelFlag);
        }
    } finally {
        queryCancelFlags.remove(queryId);
        trackQueryInterceptor.releaseQuery(sessionId);
        activeQueryManager.completeQuery(sessionId);
        cleanupQueryMetrics(queryId);
    }
}

// ▼ [개선] 실제 취소 로직 구현
public void cancelQuery(String queryId) {
    AtomicBoolean flag = queryCancelFlags.get(queryId);
    if (flag != null) {
        flag.set(true);
        log.info("Cancel flag set for query: {}", queryId);
    }
    cleanupQueryMetrics(queryId);
}
// TO-BE: processTableRange 내부 — 주기적 취소 확인
while (rs.next() && trackCount < MAX_TRACKS_PER_CHUNK) {
    // ▼ [추가] 1000건마다 취소 확인
    if (trackCount % 1000 == 0 && cancelFlag.get()) {
        log.info("Query cancelled during processing: table={}, processed={}", tableName, trackCount);
        return Collections.emptyList();
    }
    // ... 기존 처리 로직 ...
}

3.2.2 쿼리 관리 시스템 통합

AS-IS: ActiveQueryManagerStompTrackStreamingService.activeQueries 이원화

TO-BE: ActiveQueryManager를 단일 진실의 원천(Single Source of Truth)으로 통합

// TO-BE: ActiveQueryManager.java — 취소 플래그 및 릴리즈 콜백 통합
public static class ActiveQuery {
    private final String sessionId;
    private final String queryId;
    private final LocalDateTime startTime;
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private volatile String currentTable;
    private volatile int processedChunks;
    private Runnable onComplete;  // ▼ [추가] 완료 콜백

    public void setOnComplete(Runnable callback) {
        this.onComplete = callback;
    }
}

public void completeQuery(String sessionId) {
    ActiveQuery query = activeQueries.remove(sessionId);
    if (query != null) {
        // ▼ [추가] 등록된 콜백 실행 (Semaphore 반환 등)
        if (query.onComplete != null) {
            query.onComplete.run();
        }
        log.debug("Query completed: sessionId={}, queryId={}", sessionId, query.getQueryId());
    }
}

3.3 Phase 3 — 백프레셔 고도화

목표: 정확한 버퍼 추적, 클라이언트 소비 속도 반영

3.3.1 콜백 기반 버퍼 추적

AS-IS: Thread.sleep(100ms) 후 무조건 버퍼 감소 (추정치)

TO-BE: 전송 완료 콜백에서 정확히 버퍼 감소

// TO-BE: 전송 완료 콜백 기반 버퍼 관리
// SimpMessagingTemplate의 convertAndSendToUser 대신 직접 전송 제어

// WebSocket 메시지 전송 래퍼
private void sendWithBackpressureTracking(String sessionId, String destination,
                                          TrackChunkResponse response, int chunkSize) {
    pendingBufferSize.addAndGet(chunkSize);
    try {
        messagingTemplate.convertAndSendToUser(sessionId, destination, response);
        // 전송 성공 즉시 버퍼 감소
        pendingBufferSize.addAndGet(-chunkSize);
    } catch (Exception e) {
        // 전송 실패 시에도 버퍼 감소
        pendingBufferSize.addAndGet(-chunkSize);
        throw e;
    }
}

3.3.2 적응형 전송 지연

AS-IS: 총 트랙 수에 따른 정적 지연

TO-BE: 실시간 버퍼 사용률 + 최근 전송 성공률에 따른 동적 지연

// TO-BE: 동적 지연 계산
private int calculateAdaptiveDelay(BackpressureMetrics metrics) {
    double bufferUsage = (double) pendingBufferSize.get() / MAX_PENDING_BUFFER;

    if (bufferUsage > 0.8) return 200;       // 80% 이상: 강한 억제
    else if (bufferUsage > 0.5) return 100;  // 50% 이상: 중간 억제
    else if (bufferUsage > 0.3) return 50;   // 30% 이상: 약한 억제
    return 10;                                // 정상: 최소 지연
}

3.4 Phase 4 — 설정 외부화 및 모니터링

목표: 운영 중 튜닝 가능, 실시간 상황 파악

3.4.1 WebSocket 설정 Properties 클래스 도입

// TO-BE: WebSocketProperties.java (신규)
@Data
@Component
@ConfigurationProperties(prefix = "websocket")
public class WebSocketProperties {

    private QueryProperties query = new QueryProperties();
    private TransportProperties transport = new TransportProperties();
    private BackpressureProperties backpressure = new BackpressureProperties();

    @Data
    public static class QueryProperties {
        private int maxConcurrentGlobal = 20;
        private int maxPerSession = 3;
        private int queueTimeoutSeconds = 30;
    }

    @Data
    public static class TransportProperties {
        private int inboundCorePoolSize = 10;
        private int inboundMaxPoolSize = 20;
        private int inboundQueueCapacity = 100;
        private int outboundCorePoolSize = 20;
        private int outboundMaxPoolSize = 40;
        private int outboundQueueCapacity = 5000;
        private int messageSizeLimitMb = 50;
        private int sendBufferSizeLimitMb = 256;
        private int sendTimeLimitSeconds = 120;
    }

    @Data
    public static class BackpressureProperties {
        private long maxPendingBufferMb = 50;
        private int maxMessageSizeKb = 1024;
        private int minMessageSizeKb = 256;
    }
}

application.yml 통합 설정:

websocket:
  query:
    max-concurrent-global: 20
    max-per-session: 3
    queue-timeout-seconds: 30
  transport:
    inbound-core-pool-size: 10
    inbound-max-pool-size: 20
    inbound-queue-capacity: 100
    outbound-core-pool-size: 20
    outbound-max-pool-size: 40
    outbound-queue-capacity: 5000
    message-size-limit-mb: 50
    send-buffer-size-limit-mb: 256
    send-time-limit-seconds: 120
  backpressure:
    max-pending-buffer-mb: 50
    max-message-size-kb: 1024
    min-message-size-kb: 256

3.4.2 모니터링 엔드포인트 추가

// TO-BE: WebSocketMonitorController.java (신규)
@RestController
@RequestMapping("/api/v1/monitor/websocket")
@RequiredArgsConstructor
public class WebSocketMonitorController {

    private final TrackQueryInterceptor queryInterceptor;
    private final ActiveQueryManager activeQueryManager;

    @GetMapping("/status")
    public Map<String, Object> getStatus() {
        return Map.of(
            "activeQueries", queryInterceptor.getActiveQueryCount(),
            "waitingInQueue", queryInterceptor.getWaitingCount(),
            "queryDetails", activeQueryManager.getAllActiveQueries().entrySet().stream()
                .map(e -> Map.of(
                    "sessionId", e.getKey(),
                    "queryId", e.getValue().getQueryId(),
                    "startTime", e.getValue().getStartTime().toString(),
                    "table", String.valueOf(e.getValue().getCurrentTable()),
                    "chunks", e.getValue().getProcessedChunks()
                )).collect(Collectors.toList())
        );
    }
}

4. 개선 우선순위 및 기대 효과

순위 개선 항목 심각도 난이도 기대 효과
1 글로벌 동시 쿼리 제한 (Semaphore) 심각 낮음 동시 요청 폭주 시 서비스 마비 방지
2 쿼리 완료 시 리소스 반환 (releaseQuery) 심각 낮음 세션당 제한 정상 동작
3 CachedThreadPool 교체 심각 낮음 무제한 스레드 생성 방지
4 ChunkedTrackStreamingService 취소 구현 심각 중간 불필요한 리소스 점유 해소
5 쿼리 관리 시스템 통합 중간 중간 상태 불일치 해소
6 백프레셔 콜백 기반 전환 중간 중간 정확한 메모리 관리
7 설정 외부화 낮음 낮음 운영 중 튜닝 가능
8 모니터링 엔드포인트 낮음 낮음 실시간 상태 파악

5. 제한 사항 및 고려 사항

5.1 글로벌 동시 제한 수 결정 기준

maxConcurrentGlobal = min(DB커넥션풀 / 2, Async스레드풀max)
                    = min(60 / 2, 30)
                    = 20 (권장)
  • DB 커넥션 풀(60개)의 50%를 WebSocket 쿼리에 할당
  • 나머지 50%는 REST API, 배치 작업, 헬스체크 등에 예비
  • trackStreamingExecutor max(30) 이하로 설정하여 스레드 풀 포화 방지

5.2 대기 큐 타임아웃

  • 30초 권장 (클라이언트 UX 고려)
  • 타임아웃 시 클라이언트에 명확한 에러 메시지 전달 필요
  • 프론트엔드에서 재시도 로직 또는 "서버가 바쁩니다" 안내 필요

5.3 기존 클라이언트 호환성

  • STOMP 프로토콜 레벨 변경 없음 (하위 호환)
  • 에러 메시지 형식이 변경될 수 있으므로 프론트엔드 에러 핸들링 확인 필요

부록 A. 관련 파일 목록

파일 역할
global/websocket/interceptor/TrackQueryInterceptor.java 세션별 쿼리 제한 인터셉터
global/websocket/service/ChunkedTrackStreamingService.java 청크 기반 스트리밍 (백프레셔)
global/websocket/service/StompTrackStreamingService.java STOMP 스트리밍 (큐 + 지연)
global/websocket/service/ActiveQueryManager.java 활성 쿼리 추적
global/config/AsyncConfig.java 비동기 스레드 풀 설정
global/config/WebSocketStompConfig.java WebSocket STOMP 설정
domain/vessel/service/query/CancellableQueryManager.java 쿼리 취소 관리
global/websocket/listener/WebSocketDisconnectListener.java 세션 종료 리스너
domain/vessel/service/optimization/TrackStreamingOptimizer.java 적응형 최적화 (미사용)
monitoring/service/TrackStreamingMetrics.java 스트리밍 메트릭

Phase 5. 대기열 기반 쿼리 관리 + 타임아웃 최적화

5.1 대기열 구조

기존: 슬롯 부족 시 즉시 거부 (ERROR: Server is busy) 개선: 대기열 순번 안내 후 최대 2분 대기 (QUEUED: position 3/5)

요청 유입
    │
    ▼
tryAcquireSlotImmediate()
    │
    ├── 성공 → 즉시 처리
    │
    └── 실패 → 대기열 진입
        │   ┌──────────────────────────────┐
        │   │  2초 간격 루프:               │
        │   │  1. tryAcquire (2초 타임아웃)  │
        │   │  2. QUEUED 상태 전송           │
        │   │     (순번/전체 큐 크기)        │
        │   │  3. 최대 2분까지 반복          │
        │   └──────────────────────────────┘
        ├── 슬롯 획득 → 처리 시작
        └── 타임아웃 → ERROR 응답

5.2 타임아웃 최적화

설정 AS-IS TO-BE 근거
Query DB 풀 120 180 동시 60쿼리 × 3커넥션
max-concurrent-global 30 60 180 / ~3
max-per-session 3 20 대기열 방식이므로 넉넉하게
Executor max 30 120 60실행 + 60대기
Session idle timeout 60s 15s 빠른 정리
Heartbeat 10s/10s 5s/5s 죽은 연결 빠른 감지
SockJS disconnect delay 30s 5s 빠른 해제
Send time limit 120s 30s 응답 완료 후 빠른 정리

Phase 6. 일일 데이터 인메모리 캐시

6.1 캐시 아키텍처

일일(Daily) 집계 테이블의 7일분 데이터를 메모리에 캐시하여 DB 조회를 생략.

DailyTrackCacheManager (@Service)
├── cache: ConcurrentHashMap<LocalDate, DailyTrackData>
│   ├── D-7 → {tracks: Map<vesselId, CompactVesselTrack>, vesselCount, memorySizeBytes}
│   ├── D-6 → ...
│   ├── ...
│   └── D-1 → ...
│
├── 비동기 워밍업 (ApplicationReadyEvent → @Async)
│   → 스케줄러/API/WebSocket 차단 없이 백그라운드 로드
│   → D-1 → D-2 → ... → D-7 (최근 우선)
│   → 각 날짜 로드 완료 즉시 캐시 활성화
│
├── 하이브리드 쿼리 분리
│   → 요청 범위의 모든 날짜를 캐시 확인
│   → 히트: 메모리 조회, 미스: DB 조회, 오늘: hourly/5min 테이블
│   → 결과 vessel 기준 병합
│
└── 일일 갱신 (DailyAggregationJob 완료 시)
    → D-1 (재)로드 + D-8 제거

6.2 캐시 용량 추정

항목 수치
Daily 테이블 1일분 ~350MB (DB)
7일분 인메모리 추정 ~4GB (Java 객체 오버헤드 포함)
최대 메모리 한도 5GB (설정 가능)
JVM 힙 (권장) 12GB 이상

6.3 쿼리 라우팅

요청: 2026-01-30 ~ 2026-02-06 15:00

1. 날짜별 분류:
   ├─ 02-06 (오늘) → DB 필수 (hourly/5min 테이블)
   ├─ 02-05: cache HIT → 메모리
   ├─ 02-04: cache HIT → 메모리
   ├─ ...
   ├─ 01-31: cache HIT → 메모리
   └─ 01-30: cache MISS → DB (daily 테이블)

2. 결과: vessel 기준 병합 → 기존 응답 구조 그대로

6.4 모니터링

GET /api/websocket/daily-cache

{
  "status": "READY",
  "cachedDays": 7,
  "totalVessels": 280000,
  "totalMemoryMb": 3500,
  "days": [
    {"date": "2026-02-05", "vesselCount": 42000, "memorySizeMb": 520, "loadedAt": "..."},
    ...
  ]
}

부록 B. 운영 환경 리소스 현황 (Phase 5~6 반영)

리소스 AS-IS TO-BE 비고
JVM Heap 8~16GB 12~16GB 권장 캐시 ~4GB + 운영
DB Pool (Query) max 120 max 180 WebSocket + REST
DB Pool (Collect) max 80 max 80 배치 Reader
DB Pool (Batch) max 30 max 30 메타데이터
max-concurrent-global 30 60 Query풀 180 / 3
trackStreamingExecutor core 15, max 30 core 40, max 120 대기열 + 실행
Session idle timeout 60s 15s 빠른 정리
Heartbeat 10s/10s 5s/5s 빠른 감지
Daily cache - 7일분 ~4GB 비동기 워밍업