diff --git a/docs/implementation-progress.md b/docs/implementation-progress.md new file mode 100644 index 0000000..42ae8ae --- /dev/null +++ b/docs/implementation-progress.md @@ -0,0 +1,86 @@ +# WebSocket 부하 제어 개선 — 구현 진행 상황 + +> 브랜치: `feat/websocket-load-control` +> 시작일: 2026-02-06 + +--- + +## DB 커넥션 풀 분배 설계 (총 250개) + +| DataSource | AS-IS | TO-BE | 비고 | +|------------|-------|-------|------| +| **Query** | 60 (min 10) | 120 (min 20) | WebSocket 스트리밍 + REST API 주 사용 | +| **Collect** | 20 (min 5) | 80 (min 15) | 배치 Reader, 신호 수집 조회 | +| **Batch** | 20 (min 10) | 30 (min 5) | Spring Batch 메타데이터 | +| **예비** | - | 20 | 운영 여유분 | +| **합계** | 100 | 250 | DB 서버 500 중 250 사용 | + +### 글로벌 동시 쿼리 제한 산정 +- Query 풀 120개 / 쿼리당 평균 3커넥션 = 40 +- 보수적 적용: **30개** (REST API, 헬스체크 등에 여유분 확보) + +--- + +## Phase 1 — 긴급 안정화 + +- [ ] **1.1** 글로벌 동시 쿼리 제한 (Semaphore + Fair Queue) + - TrackQueryInterceptor에 Semaphore 기반 글로벌 제한 추가 + - application-prod.yml에 websocket 설정 외부화 + - 상태: 대기 + +- [ ] **1.2** 쿼리 완료 시 리소스 반환 보장 + - ChunkedTrackStreamingService finally 블록에 releaseQuery() 추가 + - StompTrackStreamingService finally 블록에 releaseQuery() 추가 + - 상태: 대기 + +- [ ] **1.3** CachedThreadPool → 제한된 ThreadPoolExecutor 교체 + - CancellableQueryManager의 newCachedThreadPool 교체 + - 상태: 대기 + +- [ ] **1.4** DB 커넥션 풀 재분배 (prod) + - application-prod.yml Query/Collect/Batch 풀 사이즈 조정 + - 상태: 대기 + +--- + +## Phase 2 — 취소 및 정리 로직 완성 + +- [ ] **2.1** ChunkedTrackStreamingService 쿼리 취소 구현 + - AtomicBoolean 취소 플래그 + processTableRange 내 체크포인트 + - 상태: 대기 + +- [ ] **2.2** 쿼리 관리 시스템 통합 + - ActiveQueryManager를 단일 진실의 원천으로 통합 + - 상태: 대기 + +--- + +## Phase 3 — 백프레셔 고도화 + +- [ ] **3.1** 콜백 기반 버퍼 추적 + - CompletableFuture + Thread.sleep(100) → 즉시 감소 전환 + - 상태: 대기 + +- [ ] **3.2** 적응형 전송 지연 + - 버퍼 사용률 기반 동적 지연 + - 상태: 대기 + +--- + +## Phase 4 — 설정 외부화 및 모니터링 + +- [ ] **4.1** WebSocketProperties 설정 클래스 + - 하드코딩된 설정값을 application.yml로 외부화 + - 상태: 대기 + +- [ ] **4.2** 모니터링 엔드포인트 + - /api/v1/monitor/websocket/status + - 상태: 대기 + +--- + +## 커밋 이력 + +| 날짜 | Phase | 커밋 메시지 | 해시 | +|------|-------|------------|------| +| | | | | diff --git a/docs/websocket-performance-improvement-report.md b/docs/websocket-performance-improvement-report.md new file mode 100644 index 0000000..d9cbea4 --- /dev/null +++ b/docs/websocket-performance-improvement-report.md @@ -0,0 +1,865 @@ +# WebSocket 성능 부하 개선 보고서 + +## 선박 항적 리플레이 서비스 — 동시 요청 부하 대응 + +| 항목 | 내용 | +|------|------| +| 작성일 | 2026-02-06 | +| 대상 시스템 | Signal Batch — 선박 항적 조회/리플레이 WebSocket 서비스 | +| 운영 환경 | Linux, `vessel-batch-control.sh` → `run-on-query-server-dev.sh` (prod 프로파일) | +| 문제 상황 | 다수 클라이언트의 리플레이 요청 동시 유입 시 서비스 장애 발생 | + +--- + +## 1. 현황 분석 (AS-IS) + +### 1.1 시스템 아키텍처 개요 + +``` +클라이언트 (N개) + │ WebSocket STOMP (/ws-tracks) + ▼ +┌─────────────────────────────────────────────────┐ +│ Tomcat (max-threads: 200, max-connections: 10000) │ +│ ┌─────────────────────────────────────────────┐ │ +│ │ STOMP Inbound Channel (core:10, max:20) │ │ +│ │ → TrackQueryInterceptor (세션당 3개 제한) │ │ +│ └────────────┬────────────────────────────────┘ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────┐ │ +│ │ @Async trackStreamingExecutor │ │ +│ │ (core:15, max:30, queue:500) │ │ +│ │ ┌───────────────────────────────────────┐ │ │ +│ │ │ ChunkedTrackStreamingService │ │ │ +│ │ │ └ 백프레셔: pendingBufferSize ≤ 50MB │ │ │ +│ │ │ StompTrackStreamingService │ │ │ +│ │ │ └ BlockingQueue(100) + Thread.sleep │ │ │ +│ │ └───────────────────────────────────────┘ │ │ +│ └────────────┬────────────────────────────────┘ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────┐ │ +│ │ STOMP Outbound Channel (core:20, max:40) │ │ +│ │ (queue: 5000) │ │ +│ └─────────────────────────────────────────────┘ │ +│ ▼ │ +│ PostgreSQL + PostGIS (HikariCP max: 60) │ +└─────────────────────────────────────────────────────┘ +``` + +### 1.2 현재 구현된 부하 제어 메커니즘 + +#### (A) 세션당 동시 쿼리 제한 — `TrackQueryInterceptor` + +**파일**: `global/websocket/interceptor/TrackQueryInterceptor.java` + +```java +// AS-IS: 세션당 최대 3개 동시 쿼리 제한 +private final ConcurrentHashMap sessionQueries = new ConcurrentHashMap<>(); +private static final int MAX_QUERIES_PER_SESSION = 3; + +@Override +public Message preSend(Message message, MessageChannel channel) { + // ... + if ("/app/tracks/query".equals(destination)) { + AtomicInteger queryCount = sessionQueries.computeIfAbsent( + sessionId, k -> new AtomicInteger(0) + ); + if (queryCount.get() >= MAX_QUERIES_PER_SESSION) { + throw new IllegalStateException("Maximum concurrent queries exceeded"); + } + queryCount.incrementAndGet(); + } + return message; +} + +// 쿼리 완료 시 카운트 감소 메서드 (존재하지만 호출부 없음) +public void decrementQueryCount(String sessionId) { + AtomicInteger count = sessionQueries.get(sessionId); + if (count != null) { + count.decrementAndGet(); + } +} +``` + +#### (B) 메모리 버퍼 기반 백프레셔 — `ChunkedTrackStreamingService` + +**파일**: `global/websocket/service/ChunkedTrackStreamingService.java` + +```java +// AS-IS: 50MB 버퍼 기반 백프레셔 +private final AtomicLong pendingBufferSize = new AtomicLong(0); +private static final long MAX_PENDING_BUFFER = 50 * 1024 * 1024; // 50MB + +// 버퍼 초과 시 busy-wait +int backpressureWaitCount = 0; +while (pendingBufferSize.get() > MAX_PENDING_BUFFER) { + Thread.sleep(50); + backpressureWaitCount++; + // 500ms 이상 대기 시 청크 크기 축소 + if (backpressureWaitCount > 10 && metrics.dynamicChunkSizeKB > MIN_MESSAGE_SIZE_KB) { + metrics.dynamicChunkSizeKB = Math.max(MIN_MESSAGE_SIZE_KB, + metrics.dynamicChunkSizeKB - 512); + } +} + +// 전송 완료 후 버퍼 감소 (비동기 + 100ms 고정 대기) +CompletableFuture.runAsync(() -> { + Thread.sleep(100); // 네트워크 전송 시간 "추정" + pendingBufferSize.addAndGet(-chunkSize); +}); +``` + +#### (C) 비동기 스레드 풀 — `AsyncConfig` + +**파일**: `global/config/AsyncConfig.java` + +```java +// AS-IS: 고정 크기 스레드 풀 +@Bean(name = "trackStreamingExecutor") +public Executor getAsyncExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(15); + executor.setMaxPoolSize(30); + executor.setQueueCapacity(500); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + return executor; +} +``` + +#### (D) 정적 전송 지연 — `StompTrackStreamingService` + +**파일**: `global/websocket/service/StompTrackStreamingService.java` + +```java +// AS-IS: 데이터 크기에 따른 고정 지연 +private int calculateChunkDelay(int chunkSize, int totalTracks) { + if (totalTracks > 1000000) return 200; + else if (totalTracks > 500000) return 150; + else if (totalTracks > 100000) return 100; + else if (totalTracks > 50000) return 50; + else if (totalTracks > 10000) return 30; + return 10; +} +``` + +#### (E) STOMP 채널 설정 — `WebSocketStompConfig` + +**파일**: `global/config/WebSocketStompConfig.java` + +```java +// AS-IS: 인바운드/아웃바운드 채널 스레드 풀 +@Override +public void configureClientInboundChannel(ChannelRegistration registration) { + registration.taskExecutor() + .corePoolSize(10).maxPoolSize(20).queueCapacity(100); +} + +@Override +public void configureClientOutboundChannel(ChannelRegistration registration) { + registration.taskExecutor() + .corePoolSize(20).maxPoolSize(40).queueCapacity(5000); +} + +@Override +public void configureWebSocketTransport(WebSocketTransportRegistration registration) { + registration + .setMessageSizeLimit(50 * 1024 * 1024) // 50MB + .setSendBufferSizeLimit(256 * 1024 * 1024) // 256MB + .setSendTimeLimit(120 * 1000); // 120초 +} +``` + +### 1.3 운영 환경 설정 + +| 항목 | 설정값 | +|------|--------| +| JVM 힙 | 서버 메모리의 1/3 (최소 8GB, 최대 16GB) | +| GC | G1GC, MaxGCPauseMillis=200 | +| Tomcat 스레드 | max: 200, min-spare: 10 | +| Tomcat 연결 | max-connections: 10000, accept-count: 100 | +| DB 커넥션 풀 (Query) | max: 60, min-idle: 10 | +| DB 커넥션 풀 (Collect) | max: 20, min-idle: 5 | +| DB 커넥션 풀 (Batch) | max: 20, min-idle: 10 | + +--- + +## 2. 식별된 문제점 + +### 2.1 [심각] 글로벌 동시 쿼리 제한 없음 + +**현상**: 세션당 3개만 제한하므로, 100개 세션이 동시 접속하면 최대 **300개 동시 쿼리** 발생 가능 + +**영향**: +- DB 커넥션 풀(60개) 고갈 → 커넥션 대기 타임아웃 +- `trackStreamingExecutor` 큐(500) 포화 → `CallerRunsPolicy`에 의해 인바운드 채널 스레드 블로킹 +- 인바운드 채널(max 20) 블로킹 → 새 WebSocket 요청 처리 불가 → 전체 서비스 마비 + +**관련 코드**: `TrackQueryInterceptor.java` — 글로벌 카운터 없음 + +``` +[시나리오: 50개 클라이언트 × 3개 쿼리 = 150개 동시 요청] + +trackStreamingExecutor (max: 30 스레드) + → 30개 실행 + 500개 큐 대기 + → 큐 포화 시 CallerRunsPolicy 발동 + → 인바운드 채널 스레드(20개)에서 직접 실행 + → 인바운드 채널 블로킹 → WebSocket 메시지 처리 중단 + → 하트비트 실패 → 세션 끊김 → 연쇄 장애 +``` + +### 2.2 [심각] 쿼리 카운트 감소 로직 누락 + +**현상**: `TrackQueryInterceptor.decrementQueryCount()` 메서드는 존재하지만, 쿼리 완료/실패 시 **호출하는 코드가 없음** + +**영향**: +- 세션이 유지되는 동안 쿼리 카운트가 계속 증가만 함 +- 3개 쿼리 후 해당 세션은 더 이상 쿼리 불가 +- DISCONNECT 시에만 `sessionQueries.remove()`로 정리됨 +- 결과적으로 세션당 제한이 **사실상 무의미** (장기 연결에서 3개 쿼리 후 차단) + +**관련 코드**: + +```java +// TrackQueryInterceptor.java:68 - 정의만 존재, 호출부 없음 +public void decrementQueryCount(String sessionId) { + AtomicInteger count = sessionQueries.get(sessionId); + if (count != null) { + count.decrementAndGet(); + } +} +``` + +### 2.3 [심각] CachedThreadPool 무제한 스레드 생성 + +**현상**: `CancellableQueryManager`가 `Executors.newCachedThreadPool()` 사용 + +**영향**: +- 부하 시 무제한 스레드 생성 → OOM 또는 OS 스레드 한도 초과 +- 스레드 컨텍스트 스위칭 오버헤드 급증 + +**관련 코드**: `CancellableQueryManager.java` + +```java +// AS-IS: 상한 없는 스레드 풀 +private final ExecutorService queryExecutor = Executors.newCachedThreadPool(r -> { + Thread thread = new Thread(r); + thread.setName("query-executor-" + thread.getId()); + thread.setDaemon(true); + return thread; +}); +``` + +### 2.4 [심각] ChunkedTrackStreamingService 쿼리 취소 미구현 + +**현상**: `cancelQuery()` 메서드가 메트릭 정리만 수행하고 **실제 스트리밍 중단 로직이 없음** + +**영향**: +- 클라이언트가 연결을 끊어도 서버에서 DB 쿼리 + 데이터 가공이 계속 실행 +- 리소스(DB 커넥션, CPU, 메모리)가 불필요하게 점유됨 + +**관련 코드**: `ChunkedTrackStreamingService.java:1176` + +```java +// AS-IS: 취소 시 메트릭만 정리, 실행 중인 쿼리는 계속됨 +public void cancelQuery(String queryId) { + log.info("Cancelling chunked query: {}", queryId); + // TODO: 실제 취소 로직 구현 + cleanupQueryMetrics(queryId); +} +``` + +### 2.5 [중간] 백프레셔의 비정확한 버퍼 추적 + +**현상**: 전송 완료 후 버퍼 크기 감소를 `CompletableFuture + Thread.sleep(100ms)`로 처리 + +**영향**: +- 실제 네트워크 전송 완료와 무관하게 100ms 후 무조건 버퍼 감소 +- 네트워크 지연이 100ms보다 길면 버퍼가 과소 추적 → 실제 메모리 사용량보다 작게 인식 +- 네트워크 지연이 100ms보다 짧으면 불필요한 백프레셔 발생 + +**관련 코드**: `ChunkedTrackStreamingService.java:1069` + +```java +// AS-IS: 네트워크 전송 시간을 100ms로 "추정" +CompletableFuture.runAsync(() -> { + try { + Thread.sleep(100); // 네트워크 전송 시간 고려 + pendingBufferSize.addAndGet(-chunkSize); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } +}); +``` + +### 2.6 [중간] 두 개의 쿼리 관리 시스템 분리 + +**현상**: `ActiveQueryManager`와 `StompTrackStreamingService.activeQueries`가 각각 독립적으로 쿼리 상태를 관리 + +**영향**: +- 취소 시 한쪽만 처리될 가능성 +- 상태 불일치로 인한 리소스 누수 + +**관련 클래스**: +- `ActiveQueryManager`: 세션별 쿼리 관리 (`ConcurrentHashMap`) +- `StompTrackStreamingService`: 쿼리별 컨텍스트 관리 (`ConcurrentHashMap`) + +### 2.7 [중간] StompTrackStreamingService의 병합 모드 OOM 위험 + +**현상**: 병합 모드에서 모든 트랙 데이터를 메모리에 적재 + +**영향**: +- 대량 데이터(100만+ 트랙) 요청 시 힙 메모리 초과 → OOM +- G1GC Full GC 빈발 → STW(Stop-The-World) 증가 → 서비스 지연 + +### 2.8 [낮음] WebSocket 설정의 하드코딩 + +**현상**: 모든 WebSocket 관련 제한값이 Java 코드에 하드코딩 + +**영향**: +- 운영 중 튜닝 불가 (재배포 필요) +- 환경별 차별화 불가 + +| 하드코딩 항목 | 위치 | 값 | +|--------------|------|-----| +| `MAX_QUERIES_PER_SESSION` | TrackQueryInterceptor | 3 | +| `MAX_PENDING_BUFFER` | ChunkedTrackStreamingService | 50MB | +| `MAX_MESSAGE_SIZE_KB` | ChunkedTrackStreamingService | 1024KB | +| `BlockingQueue 크기` | StompTrackStreamingService | 100 | +| Inbound core/max | WebSocketStompConfig | 10/20 | +| Outbound core/max | WebSocketStompConfig | 20/40 | +| sendBufferSizeLimit | WebSocketStompConfig | 256MB | + +### 2.9 [낮음] 미사용 최적화 코드 사장 + +**현상**: `TrackStreamingOptimizer`에 네트워크 메트릭 기반 적응형 청크 조정 로직이 설계되어 있으나, 실제 서비스에서 연결/사용되지 않음 + +--- + +## 3. 개선 계획 (TO-BE) + +### 3.1 Phase 1 — 긴급 안정화 (즉시 적용) + +> 목표: 동시 요청 폭주 시 서비스 마비 방지 + +#### 3.1.1 글로벌 동시 쿼리 제한 도입 + +**AS-IS**: 세션당 제한만 존재 (글로벌 제한 없음) + +**TO-BE**: 서버 전체 동시 실행 쿼리 수를 Semaphore로 제한 + 대기 큐 구현 + +```java +// TO-BE: TrackQueryInterceptor.java — 글로벌 동시 제한 추가 +@Slf4j +@Component +@RequiredArgsConstructor +public class TrackQueryInterceptor implements ChannelInterceptor { + + private final ConcurrentHashMap sessionQueries = new ConcurrentHashMap<>(); + + // ▼ [추가] 글로벌 동시 쿼리 제한 + @Value("${websocket.query.max-concurrent-global:20}") + private int maxConcurrentGlobal; + + @Value("${websocket.query.max-per-session:3}") + private int maxQueriesPerSession; + + @Value("${websocket.query.queue-timeout-seconds:30}") + private int queueTimeoutSeconds; + + private Semaphore globalQuerySemaphore; + private final AtomicInteger waitingCount = new AtomicInteger(0); + + @PostConstruct + public void init() { + this.globalQuerySemaphore = new Semaphore(maxConcurrentGlobal, true); // fair=true + log.info("Global query semaphore initialized: maxConcurrent={}, queueTimeout={}s", + maxConcurrentGlobal, queueTimeoutSeconds); + } + + @Override + public Message preSend(Message message, MessageChannel channel) { + StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor( + message, StompHeaderAccessor.class); + + if (accessor != null && StompCommand.SEND.equals(accessor.getCommand())) { + String destination = accessor.getDestination(); + String sessionId = accessor.getSessionId(); + + if ("/app/tracks/query".equals(destination)) { + // 1. 세션당 제한 확인 + AtomicInteger queryCount = sessionQueries.computeIfAbsent( + sessionId, k -> new AtomicInteger(0)); + if (queryCount.get() >= maxQueriesPerSession) { + throw new IllegalStateException( + "Session query limit exceeded: " + maxQueriesPerSession); + } + + // 2. 글로벌 대기 큐 진입 + int waiting = waitingCount.incrementAndGet(); + log.info("Query queued: session={}, waiting={}, active={}/{}", + sessionId, waiting, + maxConcurrentGlobal - globalQuerySemaphore.availablePermits(), + maxConcurrentGlobal); + try { + boolean acquired = globalQuerySemaphore.tryAcquire( + queueTimeoutSeconds, TimeUnit.SECONDS); + if (!acquired) { + throw new IllegalStateException( + "Query queue timeout after " + queueTimeoutSeconds + "s. " + + "Server is busy, please retry later."); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Query interrupted while waiting"); + } finally { + waitingCount.decrementAndGet(); + } + + queryCount.incrementAndGet(); + } + } + return message; + } + + // ▼ [추가] 쿼리 완료 시 호출 — Semaphore 반환 + 카운트 감소 + public void releaseQuery(String sessionId) { + globalQuerySemaphore.release(); + decrementQueryCount(sessionId); + log.debug("Query released: session={}, available={}/{}", + sessionId, globalQuerySemaphore.availablePermits(), maxConcurrentGlobal); + } + + // 기존 메서드 유지 + public void decrementQueryCount(String sessionId) { + AtomicInteger count = sessionQueries.get(sessionId); + if (count != null && count.get() > 0) { + count.decrementAndGet(); + } + } + + // 모니터링용 + public int getActiveQueryCount() { + return maxConcurrentGlobal - globalQuerySemaphore.availablePermits(); + } + + public int getWaitingCount() { + return waitingCount.get(); + } +} +``` + +**application.yml 설정 추가**: +```yaml +websocket: + query: + max-concurrent-global: 20 # 서버 전체 동시 쿼리 상한 + max-per-session: 3 # 세션당 동시 쿼리 상한 + queue-timeout-seconds: 30 # 대기 큐 타임아웃 +``` + +#### 3.1.2 쿼리 완료/실패 시 리소스 반환 보장 + +**AS-IS**: `decrementQueryCount()` 호출부 없음 → 세션당 제한이 해제되지 않음 + +**TO-BE**: 스트리밍 서비스에서 쿼리 완료/실패/취소 시 반드시 `releaseQuery()` 호출 + +```java +// TO-BE: ChunkedTrackStreamingService.java — finally 블록에서 리소스 반환 +@Async("trackStreamingExecutor") +public void streamChunkedTracks(TrackQueryRequest request, + String queryId, String sessionId, + Consumer chunkConsumer, + Consumer statusConsumer) { + try { + // ... 기존 스트리밍 로직 ... + } catch (Exception e) { + // ... 에러 처리 ... + } finally { + // ▼ [추가] 반드시 리소스 반환 + trackQueryInterceptor.releaseQuery(sessionId); + activeQueryManager.completeQuery(sessionId); + cleanupQueryMetrics(queryId); + log.info("Query resources released: queryId={}, sessionId={}", queryId, sessionId); + } +} +``` + +```java +// TO-BE: StompTrackStreamingService.java — 동일하게 finally 보장 +@Async +public void streamTracks(TrackQueryRequest request, String queryId, + String sessionId, ...) { + try { + // ... 기존 스트리밍 로직 ... + } catch (Exception e) { + // ... 에러 처리 ... + } finally { + // ▼ [추가] 반드시 리소스 반환 + activeQueries.remove(queryId); + trackQueryInterceptor.releaseQuery(sessionId); + log.info("Query resources released: queryId={}, sessionId={}", queryId, sessionId); + } +} +``` + +#### 3.1.3 CachedThreadPool → 제한된 ThreadPoolExecutor 교체 + +**AS-IS**: `CancellableQueryManager`의 무제한 스레드 풀 + +```java +// AS-IS +private final ExecutorService queryExecutor = Executors.newCachedThreadPool(...); +``` + +**TO-BE**: 상한이 있는 ThreadPoolExecutor로 교체 + +```java +// TO-BE: CancellableQueryManager.java +private final ExecutorService queryExecutor = new ThreadPoolExecutor( + 5, // corePoolSize + 20, // maximumPoolSize (상한 설정) + 60L, TimeUnit.SECONDS, // keepAliveTime + new LinkedBlockingQueue<>(100), // 대기 큐 크기 제한 + r -> { + Thread thread = new Thread(r); + thread.setName("query-executor-" + thread.getId()); + thread.setDaemon(true); + return thread; + }, + new ThreadPoolExecutor.CallerRunsPolicy() // 큐 포화 시 호출 스레드에서 실행 +); +``` + +--- + +### 3.2 Phase 2 — 취소 및 정리 로직 완성 + +> 목표: 불필요한 리소스 점유 방지, 쿼리 생명주기 완전 관리 + +#### 3.2.1 ChunkedTrackStreamingService 쿼리 취소 구현 + +**AS-IS**: TODO 주석만 존재 + +```java +// AS-IS +public void cancelQuery(String queryId) { + log.info("Cancelling chunked query: {}", queryId); + // TODO: 실제 취소 로직 구현 + cleanupQueryMetrics(queryId); +} +``` + +**TO-BE**: AtomicBoolean 취소 플래그 + 쿼리 루프 내 체크 포인트 + +```java +// TO-BE: ChunkedTrackStreamingService.java + +// ▼ [추가] 쿼리별 취소 플래그 관리 +private final ConcurrentHashMap queryCancelFlags = new ConcurrentHashMap<>(); + +@Async("trackStreamingExecutor") +public void streamChunkedTracks(TrackQueryRequest request, + String queryId, String sessionId, ...) { + // ▼ [추가] 취소 플래그 등록 + AtomicBoolean cancelFlag = new AtomicBoolean(false); + queryCancelFlags.put(queryId, cancelFlag); + + try { + // ... 기존 로직에서 processTableRange 호출 시 cancelFlag 전달 ... + for (Map.Entry> entry : strategyMap.entrySet()) { + // ▼ [추가] 테이블 처리 전 취소 확인 + if (cancelFlag.get()) { + log.info("Query {} cancelled before processing table {}", queryId, entry.getKey()); + break; + } + // processTableRange(request, strategy, range, sessionId, viewportVesselIds, cancelFlag); + } + } finally { + queryCancelFlags.remove(queryId); + trackQueryInterceptor.releaseQuery(sessionId); + activeQueryManager.completeQuery(sessionId); + cleanupQueryMetrics(queryId); + } +} + +// ▼ [개선] 실제 취소 로직 구현 +public void cancelQuery(String queryId) { + AtomicBoolean flag = queryCancelFlags.get(queryId); + if (flag != null) { + flag.set(true); + log.info("Cancel flag set for query: {}", queryId); + } + cleanupQueryMetrics(queryId); +} +``` + +```java +// TO-BE: processTableRange 내부 — 주기적 취소 확인 +while (rs.next() && trackCount < MAX_TRACKS_PER_CHUNK) { + // ▼ [추가] 1000건마다 취소 확인 + if (trackCount % 1000 == 0 && cancelFlag.get()) { + log.info("Query cancelled during processing: table={}, processed={}", tableName, trackCount); + return Collections.emptyList(); + } + // ... 기존 처리 로직 ... +} +``` + +#### 3.2.2 쿼리 관리 시스템 통합 + +**AS-IS**: `ActiveQueryManager`와 `StompTrackStreamingService.activeQueries` 이원화 + +**TO-BE**: `ActiveQueryManager`를 단일 진실의 원천(Single Source of Truth)으로 통합 + +```java +// TO-BE: ActiveQueryManager.java — 취소 플래그 및 릴리즈 콜백 통합 +public static class ActiveQuery { + private final String sessionId; + private final String queryId; + private final LocalDateTime startTime; + private final AtomicBoolean cancelled = new AtomicBoolean(false); + private volatile String currentTable; + private volatile int processedChunks; + private Runnable onComplete; // ▼ [추가] 완료 콜백 + + public void setOnComplete(Runnable callback) { + this.onComplete = callback; + } +} + +public void completeQuery(String sessionId) { + ActiveQuery query = activeQueries.remove(sessionId); + if (query != null) { + // ▼ [추가] 등록된 콜백 실행 (Semaphore 반환 등) + if (query.onComplete != null) { + query.onComplete.run(); + } + log.debug("Query completed: sessionId={}, queryId={}", sessionId, query.getQueryId()); + } +} +``` + +--- + +### 3.3 Phase 3 — 백프레셔 고도화 + +> 목표: 정확한 버퍼 추적, 클라이언트 소비 속도 반영 + +#### 3.3.1 콜백 기반 버퍼 추적 + +**AS-IS**: `Thread.sleep(100ms)` 후 무조건 버퍼 감소 (추정치) + +**TO-BE**: 전송 완료 콜백에서 정확히 버퍼 감소 + +```java +// TO-BE: 전송 완료 콜백 기반 버퍼 관리 +// SimpMessagingTemplate의 convertAndSendToUser 대신 직접 전송 제어 + +// WebSocket 메시지 전송 래퍼 +private void sendWithBackpressureTracking(String sessionId, String destination, + TrackChunkResponse response, int chunkSize) { + pendingBufferSize.addAndGet(chunkSize); + try { + messagingTemplate.convertAndSendToUser(sessionId, destination, response); + // 전송 성공 즉시 버퍼 감소 + pendingBufferSize.addAndGet(-chunkSize); + } catch (Exception e) { + // 전송 실패 시에도 버퍼 감소 + pendingBufferSize.addAndGet(-chunkSize); + throw e; + } +} +``` + +#### 3.3.2 적응형 전송 지연 + +**AS-IS**: 총 트랙 수에 따른 정적 지연 + +**TO-BE**: 실시간 버퍼 사용률 + 최근 전송 성공률에 따른 동적 지연 + +```java +// TO-BE: 동적 지연 계산 +private int calculateAdaptiveDelay(BackpressureMetrics metrics) { + double bufferUsage = (double) pendingBufferSize.get() / MAX_PENDING_BUFFER; + + if (bufferUsage > 0.8) return 200; // 80% 이상: 강한 억제 + else if (bufferUsage > 0.5) return 100; // 50% 이상: 중간 억제 + else if (bufferUsage > 0.3) return 50; // 30% 이상: 약한 억제 + return 10; // 정상: 최소 지연 +} +``` + +--- + +### 3.4 Phase 4 — 설정 외부화 및 모니터링 + +> 목표: 운영 중 튜닝 가능, 실시간 상황 파악 + +#### 3.4.1 WebSocket 설정 Properties 클래스 도입 + +```java +// TO-BE: WebSocketProperties.java (신규) +@Data +@Component +@ConfigurationProperties(prefix = "websocket") +public class WebSocketProperties { + + private QueryProperties query = new QueryProperties(); + private TransportProperties transport = new TransportProperties(); + private BackpressureProperties backpressure = new BackpressureProperties(); + + @Data + public static class QueryProperties { + private int maxConcurrentGlobal = 20; + private int maxPerSession = 3; + private int queueTimeoutSeconds = 30; + } + + @Data + public static class TransportProperties { + private int inboundCorePoolSize = 10; + private int inboundMaxPoolSize = 20; + private int inboundQueueCapacity = 100; + private int outboundCorePoolSize = 20; + private int outboundMaxPoolSize = 40; + private int outboundQueueCapacity = 5000; + private int messageSizeLimitMb = 50; + private int sendBufferSizeLimitMb = 256; + private int sendTimeLimitSeconds = 120; + } + + @Data + public static class BackpressureProperties { + private long maxPendingBufferMb = 50; + private int maxMessageSizeKb = 1024; + private int minMessageSizeKb = 256; + } +} +``` + +**application.yml 통합 설정**: +```yaml +websocket: + query: + max-concurrent-global: 20 + max-per-session: 3 + queue-timeout-seconds: 30 + transport: + inbound-core-pool-size: 10 + inbound-max-pool-size: 20 + inbound-queue-capacity: 100 + outbound-core-pool-size: 20 + outbound-max-pool-size: 40 + outbound-queue-capacity: 5000 + message-size-limit-mb: 50 + send-buffer-size-limit-mb: 256 + send-time-limit-seconds: 120 + backpressure: + max-pending-buffer-mb: 50 + max-message-size-kb: 1024 + min-message-size-kb: 256 +``` + +#### 3.4.2 모니터링 엔드포인트 추가 + +```java +// TO-BE: WebSocketMonitorController.java (신규) +@RestController +@RequestMapping("/api/v1/monitor/websocket") +@RequiredArgsConstructor +public class WebSocketMonitorController { + + private final TrackQueryInterceptor queryInterceptor; + private final ActiveQueryManager activeQueryManager; + + @GetMapping("/status") + public Map getStatus() { + return Map.of( + "activeQueries", queryInterceptor.getActiveQueryCount(), + "waitingInQueue", queryInterceptor.getWaitingCount(), + "queryDetails", activeQueryManager.getAllActiveQueries().entrySet().stream() + .map(e -> Map.of( + "sessionId", e.getKey(), + "queryId", e.getValue().getQueryId(), + "startTime", e.getValue().getStartTime().toString(), + "table", String.valueOf(e.getValue().getCurrentTable()), + "chunks", e.getValue().getProcessedChunks() + )).collect(Collectors.toList()) + ); + } +} +``` + +--- + +## 4. 개선 우선순위 및 기대 효과 + +| 순위 | 개선 항목 | 심각도 | 난이도 | 기대 효과 | +|------|----------|--------|--------|-----------| +| **1** | 글로벌 동시 쿼리 제한 (Semaphore) | 심각 | 낮음 | 동시 요청 폭주 시 서비스 마비 방지 | +| **2** | 쿼리 완료 시 리소스 반환 (`releaseQuery`) | 심각 | 낮음 | 세션당 제한 정상 동작 | +| **3** | CachedThreadPool 교체 | 심각 | 낮음 | 무제한 스레드 생성 방지 | +| **4** | ChunkedTrackStreamingService 취소 구현 | 심각 | 중간 | 불필요한 리소스 점유 해소 | +| **5** | 쿼리 관리 시스템 통합 | 중간 | 중간 | 상태 불일치 해소 | +| **6** | 백프레셔 콜백 기반 전환 | 중간 | 중간 | 정확한 메모리 관리 | +| **7** | 설정 외부화 | 낮음 | 낮음 | 운영 중 튜닝 가능 | +| **8** | 모니터링 엔드포인트 | 낮음 | 낮음 | 실시간 상태 파악 | + +--- + +## 5. 제한 사항 및 고려 사항 + +### 5.1 글로벌 동시 제한 수 결정 기준 + +``` +maxConcurrentGlobal = min(DB커넥션풀 / 2, Async스레드풀max) + = min(60 / 2, 30) + = 20 (권장) +``` + +- DB 커넥션 풀(60개)의 50%를 WebSocket 쿼리에 할당 +- 나머지 50%는 REST API, 배치 작업, 헬스체크 등에 예비 +- `trackStreamingExecutor` max(30) 이하로 설정하여 스레드 풀 포화 방지 + +### 5.2 대기 큐 타임아웃 + +- 30초 권장 (클라이언트 UX 고려) +- 타임아웃 시 클라이언트에 명확한 에러 메시지 전달 필요 +- 프론트엔드에서 재시도 로직 또는 "서버가 바쁩니다" 안내 필요 + +### 5.3 기존 클라이언트 호환성 + +- STOMP 프로토콜 레벨 변경 없음 (하위 호환) +- 에러 메시지 형식이 변경될 수 있으므로 프론트엔드 에러 핸들링 확인 필요 + +--- + +## 부록 A. 관련 파일 목록 + +| 파일 | 역할 | +|------|------| +| `global/websocket/interceptor/TrackQueryInterceptor.java` | 세션별 쿼리 제한 인터셉터 | +| `global/websocket/service/ChunkedTrackStreamingService.java` | 청크 기반 스트리밍 (백프레셔) | +| `global/websocket/service/StompTrackStreamingService.java` | STOMP 스트리밍 (큐 + 지연) | +| `global/websocket/service/ActiveQueryManager.java` | 활성 쿼리 추적 | +| `global/config/AsyncConfig.java` | 비동기 스레드 풀 설정 | +| `global/config/WebSocketStompConfig.java` | WebSocket STOMP 설정 | +| `domain/vessel/service/query/CancellableQueryManager.java` | 쿼리 취소 관리 | +| `global/websocket/listener/WebSocketDisconnectListener.java` | 세션 종료 리스너 | +| `domain/vessel/service/optimization/TrackStreamingOptimizer.java` | 적응형 최적화 (미사용) | +| `monitoring/service/TrackStreamingMetrics.java` | 스트리밍 메트릭 | + +## 부록 B. 운영 환경 리소스 현황 + +| 리소스 | 설정값 | 비고 | +|--------|--------|------| +| JVM Heap | 8~16GB (서버 메모리의 1/3) | G1GC | +| Tomcat Threads | max 200 | | +| Tomcat Connections | max 10,000 | | +| DB Pool (Query) | max 60 | 주 사용 | +| DB Pool (Collect) | max 20 | 읽기 전용 | +| DB Pool (Batch) | max 20 | 메타데이터 | +| trackStreamingExecutor | core 15, max 30, queue 500 | CallerRunsPolicy | +| STOMP Inbound | core 10, max 20, queue 100 | | +| STOMP Outbound | core 20, max 40, queue 5000 | | +| WebSocket Buffer | send 256MB, message 50MB | | diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/interceptor/TrackQueryInterceptor.java b/src/main/java/gc/mda/signal_batch/global/websocket/interceptor/TrackQueryInterceptor.java index feecb49..12a9e46 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/interceptor/TrackQueryInterceptor.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/interceptor/TrackQueryInterceptor.java @@ -1,7 +1,7 @@ package gc.mda.signal_batch.global.websocket.interceptor; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.stomp.StompCommand; @@ -16,12 +16,13 @@ import java.util.concurrent.atomic.AtomicInteger; @Slf4j @Component -@RequiredArgsConstructor public class TrackQueryInterceptor implements ChannelInterceptor { // 세션별 활성 쿼리 수 추적 private final ConcurrentHashMap sessionQueries = new ConcurrentHashMap<>(); - private static final int MAX_QUERIES_PER_SESSION = 3; + + @Value("${websocket.query.max-per-session:3}") + private int maxQueriesPerSession; @Override public Message preSend(Message message, MessageChannel channel) { @@ -37,9 +38,10 @@ public class TrackQueryInterceptor implements ChannelInterceptor { sessionId, k -> new AtomicInteger(0) ); - if (queryCount.get() >= MAX_QUERIES_PER_SESSION) { - log.warn("Session {} exceeded max queries limit", sessionId); - throw new IllegalStateException("Maximum concurrent queries exceeded"); + if (queryCount.get() >= maxQueriesPerSession) { + log.warn("Session {} exceeded max queries limit ({})", sessionId, maxQueriesPerSession); + throw new IllegalStateException( + "Maximum concurrent queries per session exceeded: " + maxQueriesPerSession); } queryCount.incrementAndGet(); @@ -65,10 +67,15 @@ public class TrackQueryInterceptor implements ChannelInterceptor { } } - public void decrementQueryCount(String sessionId) { + /** + * 쿼리 완료/실패 시 세션 카운트 감소 + 글로벌 슬롯 반환 + * 스트리밍 서비스의 finally 블록에서 호출 + */ + public void releaseQuery(String sessionId) { AtomicInteger count = sessionQueries.get(sessionId); - if (count != null) { - count.decrementAndGet(); + if (count != null && count.get() > 0) { + int remaining = count.decrementAndGet(); + log.debug("Session {} query released. Remaining: {}", sessionId, remaining); } } } \ No newline at end of file diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java index 57a06e2..f7c435e 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java @@ -1,15 +1,20 @@ package gc.mda.signal_batch.global.websocket.service; +import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** - * WebSocket 세션별 진행 중인 쿼리 추적 및 관리 + * WebSocket 세션별 진행 중인 쿼리 추적 및 글로벌 동시 쿼리 제한 관리 */ @Slf4j @Service @@ -18,6 +23,23 @@ public class ActiveQueryManager { // 세션ID -> 활성 쿼리 정보 매핑 private final Map activeQueries = new ConcurrentHashMap<>(); + // 글로벌 동시 쿼리 제한 (Semaphore, fair=true로 FIFO 대기) + private Semaphore globalQuerySemaphore; + private final AtomicInteger waitingCount = new AtomicInteger(0); + + @Value("${websocket.query.max-concurrent-global:30}") + private int maxConcurrentGlobal; + + @Value("${websocket.query.queue-timeout-seconds:30}") + private int queueTimeoutSeconds; + + @PostConstruct + public void init() { + this.globalQuerySemaphore = new Semaphore(maxConcurrentGlobal, true); + log.info("Global query semaphore initialized: maxConcurrent={}, queueTimeout={}s", + maxConcurrentGlobal, queueTimeoutSeconds); + } + /** * 활성 쿼리 정보 */ @@ -130,4 +152,60 @@ public class ActiveQueryManager { public int getActiveQueryCount() { return activeQueries.size(); } + + // ── 글로벌 동시 쿼리 제한 ── + + /** + * 글로벌 쿼리 슬롯 획득 (대기 큐 포함) + * @return true: 슬롯 획득 성공, false: 타임아웃으로 획득 실패 + */ + public boolean tryAcquireQuerySlot(String queryId) throws InterruptedException { + int waiting = waitingCount.incrementAndGet(); + int active = getGlobalActiveQueryCount(); + log.info("Query {} waiting for global slot: waiting={}, active={}/{}", + queryId, waiting, active, maxConcurrentGlobal); + try { + boolean acquired = globalQuerySemaphore.tryAcquire(queueTimeoutSeconds, TimeUnit.SECONDS); + if (acquired) { + log.info("Query {} acquired global slot: active={}/{}", + queryId, getGlobalActiveQueryCount(), maxConcurrentGlobal); + } else { + log.warn("Query {} timed out waiting for global slot after {}s (active={}/{})", + queryId, queueTimeoutSeconds, active, maxConcurrentGlobal); + } + return acquired; + } finally { + waitingCount.decrementAndGet(); + } + } + + /** + * 글로벌 쿼리 슬롯 반환 + */ + public void releaseQuerySlot(String queryId) { + globalQuerySemaphore.release(); + log.debug("Query {} released global slot: active={}/{}", + queryId, getGlobalActiveQueryCount(), maxConcurrentGlobal); + } + + /** + * 글로벌 활성 쿼리 수 (Semaphore 기준) + */ + public int getGlobalActiveQueryCount() { + return maxConcurrentGlobal - globalQuerySemaphore.availablePermits(); + } + + /** + * 대기 중인 쿼리 수 + */ + public int getWaitingCount() { + return waitingCount.get(); + } + + /** + * 최대 동시 쿼리 수 + */ + public int getMaxConcurrentGlobal() { + return maxConcurrentGlobal; + } } diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java index f94ffce..0edb022 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java @@ -3,6 +3,7 @@ package gc.mda.signal_batch.global.websocket.service; import gc.mda.signal_batch.global.util.ShipKindCodeConverter; import gc.mda.signal_batch.global.util.IntegrationSignalConstants; import gc.mda.signal_batch.global.websocket.dto.TrackChunkResponse; +import gc.mda.signal_batch.global.websocket.interceptor.TrackQueryInterceptor; import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack; import gc.mda.signal_batch.domain.vessel.dto.IntegrationVessel; import gc.mda.signal_batch.domain.vessel.service.IntegrationVesselService; @@ -53,6 +54,7 @@ public class ChunkedTrackStreamingService { private final TrackSimplificationStrategy simplificationStrategy; private final ActiveQueryManager activeQueryManager; private final IntegrationVesselService integrationVesselService; + private final TrackQueryInterceptor trackQueryInterceptor; private final WKTReader wktReader = new WKTReader(); @SuppressWarnings("unused") private final ExecutorService executorService = Executors.newFixedThreadPool(10); @@ -88,12 +90,14 @@ public class ChunkedTrackStreamingService { @Qualifier("queryDataSource") DataSource queryDataSource, TrackSimplificationStrategy simplificationStrategy, ActiveQueryManager activeQueryManager, - IntegrationVesselService integrationVesselService) { + IntegrationVesselService integrationVesselService, + TrackQueryInterceptor trackQueryInterceptor) { this.queryJdbcTemplate = queryJdbcTemplate; this.queryDataSource = queryDataSource; this.simplificationStrategy = simplificationStrategy; this.activeQueryManager = activeQueryManager; this.integrationVesselService = integrationVesselService; + this.trackQueryInterceptor = trackQueryInterceptor; } /** @@ -837,7 +841,20 @@ public class ChunkedTrackStreamingService { String sessionId, Consumer chunkConsumer, Consumer statusConsumer) { + boolean slotAcquired = false; try { + // 글로벌 동시 쿼리 슬롯 획득 (대기 큐에서 FIFO 순서로 대기) + slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId); + if (!slotAcquired) { + log.warn("Query {} rejected: global concurrent limit reached", queryId); + statusConsumer.accept(new QueryStatusUpdate( + queryId, "ERROR", + "Server is busy. Maximum concurrent queries reached. Please retry later.", + 0.0 + )); + return; + } + log.info("Starting chunked streaming for query: {}", queryId); queryStartTime = System.currentTimeMillis(); processedTimeRanges.clear(); @@ -1165,8 +1182,14 @@ public class ChunkedTrackStreamingService { 0.0 )); } finally { - // 쿼리 메트릭스 정리 + // 리소스 반환 (순서: 글로벌 슬롯 → 세션 카운트 → 메트릭스) + if (slotAcquired) { + activeQueryManager.releaseQuerySlot(queryId); + } + trackQueryInterceptor.releaseQuery(sessionId); + activeQueryManager.completeQuery(sessionId); cleanupQueryMetrics(queryId); + log.info("Query {} resources fully released (session={})", queryId, sessionId); } } @@ -1175,7 +1198,7 @@ public class ChunkedTrackStreamingService { */ public void cancelQuery(String queryId) { log.info("Cancelling chunked query: {}", queryId); - // TODO: 실제 취소 로직 구현 + // TODO: Phase 2.1에서 실제 취소 로직 구현 cleanupQueryMetrics(queryId); } diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java index 46d4950..d351ead 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java @@ -2,6 +2,7 @@ package gc.mda.signal_batch.global.websocket.service; import gc.mda.signal_batch.domain.gis.cache.AreaBoundaryCache; import gc.mda.signal_batch.global.websocket.dto.*; +import gc.mda.signal_batch.global.websocket.interceptor.TrackQueryInterceptor; import gc.mda.signal_batch.monitoring.service.TrackStreamingMetrics; import gc.mda.signal_batch.domain.vessel.service.filter.VesselTrackFilter; import gc.mda.signal_batch.domain.vessel.service.simplification.TrackSimplificationStrategy; @@ -40,6 +41,8 @@ public class StompTrackStreamingService { private final VesselTrackFilter vesselTrackFilter; + private final ActiveQueryManager activeQueryManager; + private final TrackQueryInterceptor trackQueryInterceptor; // 활성 쿼리 관리 private final Map activeQueries = new ConcurrentHashMap<>(); @@ -61,7 +64,9 @@ public class StompTrackStreamingService { TrackStreamingMetrics metrics, VesselTrackFilter vesselTrackFilter, TrackSimplificationStrategy simplificationStrategy, - VesselTrackMerger vesselTrackMerger) { + VesselTrackMerger vesselTrackMerger, + ActiveQueryManager activeQueryManager, + TrackQueryInterceptor trackQueryInterceptor) { this.queryDataSource = queryDataSource; this.queryJdbcTemplate = queryJdbcTemplate; this.areaBoundaryCache = areaBoundaryCache; @@ -69,6 +74,8 @@ public class StompTrackStreamingService { this.vesselTrackFilter = vesselTrackFilter; this.simplificationStrategy = simplificationStrategy; this.vesselTrackMerger = vesselTrackMerger; + this.activeQueryManager = activeQueryManager; + this.trackQueryInterceptor = trackQueryInterceptor; } @Async @@ -86,7 +93,20 @@ public class StompTrackStreamingService { context.setTimerSample(timer); metrics.recordQueryStarted(queryId, "WEBSOCKET"); + boolean slotAcquired = false; try { + // 글로벌 동시 쿼리 슬롯 획득 (대기 큐에서 FIFO 순서로 대기) + slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId); + if (!slotAcquired) { + log.warn("Query {} rejected: global concurrent limit reached", queryId); + statusConsumer.accept(new QueryStatusUpdate( + queryId, "ERROR", + "Server is busy. Maximum concurrent queries reached. Please retry later.", + 0.0 + )); + return; + } + // 입력 검증 validateRequest(request); @@ -174,6 +194,13 @@ public class StompTrackStreamingService { )); } finally { activeQueries.remove(queryId); + // 리소스 반환 (순서: 글로벌 슬롯 → 세션 카운트) + if (slotAcquired) { + activeQueryManager.releaseQuerySlot(queryId); + } + trackQueryInterceptor.releaseQuery(sessionId); + activeQueryManager.completeQuery(sessionId); + log.info("Query {} resources fully released (session={})", queryId, sessionId); } } diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index ae8702c..8db4776 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -262,6 +262,13 @@ vessel: # spring 하위가 아닌 최상위 레벨 t_abnormal_tracks: retention-months: 0 # 비정상 항적: 무한 보관 +# WebSocket 부하 제어 설정 +websocket: + query: + max-concurrent-global: 30 # 서버 전체 동시 실행 쿼리 상한 (Query풀 120 / 쿼리당 ~3커넥션) + max-per-session: 3 # 세션당 동시 쿼리 상한 + queue-timeout-seconds: 30 # 글로벌 대기 큐 타임아웃 + # 액추에이터 설정 management: endpoints: