signal-batch/docs/websocket-performance-improvement-report.md
HeungTak Lee 78ff307785 feat: 글로벌 동시 쿼리 제한(Semaphore) 및 쿼리 완료 시 리소스 반환 보장
Phase 1.1 + 1.2: WebSocket 리플레이 요청 동시 부하 제어
- ActiveQueryManager에 Fair Semaphore 기반 글로벌 동시 쿼리 제한 추가 (기본 30개)
- @Async 스트리밍 메서드 내에서 슬롯 획득 (인바운드 채널 블로킹 방지)
- 쿼리 완료/실패/취소 시 finally 블록에서 반드시 리소스 반환
  - 글로벌 Semaphore 슬롯 반환
  - 세션별 쿼리 카운트 감소 (기존 누락 수정)
  - ActiveQueryManager 쿼리 정리
- TrackQueryInterceptor 세션 제한값 외부 설정화 (@Value)
- application-prod.yml에 websocket.query 설정 추가

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 13:36:10 +09:00

866 lines
32 KiB
Markdown
Raw Blame 히스토리

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# WebSocket 성능 부하 개선 보고서
## 선박 항적 리플레이 서비스 — 동시 요청 부하 대응
| 항목 | 내용 |
|------|------|
| 작성일 | 2026-02-06 |
| 대상 시스템 | Signal Batch — 선박 항적 조회/리플레이 WebSocket 서비스 |
| 운영 환경 | Linux, `vessel-batch-control.sh``run-on-query-server-dev.sh` (prod 프로파일) |
| 문제 상황 | 다수 클라이언트의 리플레이 요청 동시 유입 시 서비스 장애 발생 |
---
## 1. 현황 분석 (AS-IS)
### 1.1 시스템 아키텍처 개요
```
클라이언트 (N개)
│ WebSocket STOMP (/ws-tracks)
┌─────────────────────────────────────────────────┐
│ Tomcat (max-threads: 200, max-connections: 10000) │
│ ┌─────────────────────────────────────────────┐ │
│ │ STOMP Inbound Channel (core:10, max:20) │ │
│ │ → TrackQueryInterceptor (세션당 3개 제한) │ │
│ └────────────┬────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ @Async trackStreamingExecutor │ │
│ │ (core:15, max:30, queue:500) │ │
│ │ ┌───────────────────────────────────────┐ │ │
│ │ │ ChunkedTrackStreamingService │ │ │
│ │ │ └ 백프레셔: pendingBufferSize ≤ 50MB │ │ │
│ │ │ StompTrackStreamingService │ │ │
│ │ │ └ BlockingQueue(100) + Thread.sleep │ │ │
│ │ └───────────────────────────────────────┘ │ │
│ └────────────┬────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ STOMP Outbound Channel (core:20, max:40) │ │
│ │ (queue: 5000) │ │
│ └─────────────────────────────────────────────┘ │
│ ▼ │
│ PostgreSQL + PostGIS (HikariCP max: 60) │
└─────────────────────────────────────────────────────┘
```
### 1.2 현재 구현된 부하 제어 메커니즘
#### (A) 세션당 동시 쿼리 제한 — `TrackQueryInterceptor`
**파일**: `global/websocket/interceptor/TrackQueryInterceptor.java`
```java
// AS-IS: 세션당 최대 3개 동시 쿼리 제한
private final ConcurrentHashMap<String, AtomicInteger> sessionQueries = new ConcurrentHashMap<>();
private static final int MAX_QUERIES_PER_SESSION = 3;
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
// ...
if ("/app/tracks/query".equals(destination)) {
AtomicInteger queryCount = sessionQueries.computeIfAbsent(
sessionId, k -> new AtomicInteger(0)
);
if (queryCount.get() >= MAX_QUERIES_PER_SESSION) {
throw new IllegalStateException("Maximum concurrent queries exceeded");
}
queryCount.incrementAndGet();
}
return message;
}
// 쿼리 완료 시 카운트 감소 메서드 (존재하지만 호출부 없음)
public void decrementQueryCount(String sessionId) {
AtomicInteger count = sessionQueries.get(sessionId);
if (count != null) {
count.decrementAndGet();
}
}
```
#### (B) 메모리 버퍼 기반 백프레셔 — `ChunkedTrackStreamingService`
**파일**: `global/websocket/service/ChunkedTrackStreamingService.java`
```java
// AS-IS: 50MB 버퍼 기반 백프레셔
private final AtomicLong pendingBufferSize = new AtomicLong(0);
private static final long MAX_PENDING_BUFFER = 50 * 1024 * 1024; // 50MB
// 버퍼 초과 시 busy-wait
int backpressureWaitCount = 0;
while (pendingBufferSize.get() > MAX_PENDING_BUFFER) {
Thread.sleep(50);
backpressureWaitCount++;
// 500ms 이상 대기 시 청크 크기 축소
if (backpressureWaitCount > 10 && metrics.dynamicChunkSizeKB > MIN_MESSAGE_SIZE_KB) {
metrics.dynamicChunkSizeKB = Math.max(MIN_MESSAGE_SIZE_KB,
metrics.dynamicChunkSizeKB - 512);
}
}
// 전송 완료 후 버퍼 감소 (비동기 + 100ms 고정 대기)
CompletableFuture.runAsync(() -> {
Thread.sleep(100); // 네트워크 전송 시간 "추정"
pendingBufferSize.addAndGet(-chunkSize);
});
```
#### (C) 비동기 스레드 풀 — `AsyncConfig`
**파일**: `global/config/AsyncConfig.java`
```java
// AS-IS: 고정 크기 스레드 풀
@Bean(name = "trackStreamingExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(15);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(500);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
```
#### (D) 정적 전송 지연 — `StompTrackStreamingService`
**파일**: `global/websocket/service/StompTrackStreamingService.java`
```java
// AS-IS: 데이터 크기에 따른 고정 지연
private int calculateChunkDelay(int chunkSize, int totalTracks) {
if (totalTracks > 1000000) return 200;
else if (totalTracks > 500000) return 150;
else if (totalTracks > 100000) return 100;
else if (totalTracks > 50000) return 50;
else if (totalTracks > 10000) return 30;
return 10;
}
```
#### (E) STOMP 채널 설정 — `WebSocketStompConfig`
**파일**: `global/config/WebSocketStompConfig.java`
```java
// AS-IS: 인바운드/아웃바운드 채널 스레드 풀
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.taskExecutor()
.corePoolSize(10).maxPoolSize(20).queueCapacity(100);
}
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.taskExecutor()
.corePoolSize(20).maxPoolSize(40).queueCapacity(5000);
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration
.setMessageSizeLimit(50 * 1024 * 1024) // 50MB
.setSendBufferSizeLimit(256 * 1024 * 1024) // 256MB
.setSendTimeLimit(120 * 1000); // 120초
}
```
### 1.3 운영 환경 설정
| 항목 | 설정값 |
|------|--------|
| JVM 힙 | 서버 메모리의 1/3 (최소 8GB, 최대 16GB) |
| GC | G1GC, MaxGCPauseMillis=200 |
| Tomcat 스레드 | max: 200, min-spare: 10 |
| Tomcat 연결 | max-connections: 10000, accept-count: 100 |
| DB 커넥션 풀 (Query) | max: 60, min-idle: 10 |
| DB 커넥션 풀 (Collect) | max: 20, min-idle: 5 |
| DB 커넥션 풀 (Batch) | max: 20, min-idle: 10 |
---
## 2. 식별된 문제점
### 2.1 [심각] 글로벌 동시 쿼리 제한 없음
**현상**: 세션당 3개만 제한하므로, 100개 세션이 동시 접속하면 최대 **300개 동시 쿼리** 발생 가능
**영향**:
- DB 커넥션 풀(60개) 고갈 → 커넥션 대기 타임아웃
- `trackStreamingExecutor` 큐(500) 포화 → `CallerRunsPolicy`에 의해 인바운드 채널 스레드 블로킹
- 인바운드 채널(max 20) 블로킹 → 새 WebSocket 요청 처리 불가 → 전체 서비스 마비
**관련 코드**: `TrackQueryInterceptor.java` — 글로벌 카운터 없음
```
[시나리오: 50개 클라이언트 × 3개 쿼리 = 150개 동시 요청]
trackStreamingExecutor (max: 30 스레드)
→ 30개 실행 + 500개 큐 대기
→ 큐 포화 시 CallerRunsPolicy 발동
→ 인바운드 채널 스레드(20개)에서 직접 실행
→ 인바운드 채널 블로킹 → WebSocket 메시지 처리 중단
→ 하트비트 실패 → 세션 끊김 → 연쇄 장애
```
### 2.2 [심각] 쿼리 카운트 감소 로직 누락
**현상**: `TrackQueryInterceptor.decrementQueryCount()` 메서드는 존재하지만, 쿼리 완료/실패 시 **호출하는 코드가 없음**
**영향**:
- 세션이 유지되는 동안 쿼리 카운트가 계속 증가만 함
- 3개 쿼리 후 해당 세션은 더 이상 쿼리 불가
- DISCONNECT 시에만 `sessionQueries.remove()`로 정리됨
- 결과적으로 세션당 제한이 **사실상 무의미** (장기 연결에서 3개 쿼리 후 차단)
**관련 코드**:
```java
// TrackQueryInterceptor.java:68 - 정의만 존재, 호출부 없음
public void decrementQueryCount(String sessionId) {
AtomicInteger count = sessionQueries.get(sessionId);
if (count != null) {
count.decrementAndGet();
}
}
```
### 2.3 [심각] CachedThreadPool 무제한 스레드 생성
**현상**: `CancellableQueryManager``Executors.newCachedThreadPool()` 사용
**영향**:
- 부하 시 무제한 스레드 생성 → OOM 또는 OS 스레드 한도 초과
- 스레드 컨텍스트 스위칭 오버헤드 급증
**관련 코드**: `CancellableQueryManager.java`
```java
// AS-IS: 상한 없는 스레드 풀
private final ExecutorService queryExecutor = Executors.newCachedThreadPool(r -> {
Thread thread = new Thread(r);
thread.setName("query-executor-" + thread.getId());
thread.setDaemon(true);
return thread;
});
```
### 2.4 [심각] ChunkedTrackStreamingService 쿼리 취소 미구현
**현상**: `cancelQuery()` 메서드가 메트릭 정리만 수행하고 **실제 스트리밍 중단 로직이 없음**
**영향**:
- 클라이언트가 연결을 끊어도 서버에서 DB 쿼리 + 데이터 가공이 계속 실행
- 리소스(DB 커넥션, CPU, 메모리)가 불필요하게 점유됨
**관련 코드**: `ChunkedTrackStreamingService.java:1176`
```java
// AS-IS: 취소 시 메트릭만 정리, 실행 중인 쿼리는 계속됨
public void cancelQuery(String queryId) {
log.info("Cancelling chunked query: {}", queryId);
// TODO: 실제 취소 로직 구현
cleanupQueryMetrics(queryId);
}
```
### 2.5 [중간] 백프레셔의 비정확한 버퍼 추적
**현상**: 전송 완료 후 버퍼 크기 감소를 `CompletableFuture + Thread.sleep(100ms)`로 처리
**영향**:
- 실제 네트워크 전송 완료와 무관하게 100ms 후 무조건 버퍼 감소
- 네트워크 지연이 100ms보다 길면 버퍼가 과소 추적 → 실제 메모리 사용량보다 작게 인식
- 네트워크 지연이 100ms보다 짧으면 불필요한 백프레셔 발생
**관련 코드**: `ChunkedTrackStreamingService.java:1069`
```java
// AS-IS: 네트워크 전송 시간을 100ms로 "추정"
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(100); // 네트워크 전송 시간 고려
pendingBufferSize.addAndGet(-chunkSize);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
```
### 2.6 [중간] 두 개의 쿼리 관리 시스템 분리
**현상**: `ActiveQueryManager``StompTrackStreamingService.activeQueries`가 각각 독립적으로 쿼리 상태를 관리
**영향**:
- 취소 시 한쪽만 처리될 가능성
- 상태 불일치로 인한 리소스 누수
**관련 클래스**:
- `ActiveQueryManager`: 세션별 쿼리 관리 (`ConcurrentHashMap<String, ActiveQuery>`)
- `StompTrackStreamingService`: 쿼리별 컨텍스트 관리 (`ConcurrentHashMap<String, QueryContext>`)
### 2.7 [중간] StompTrackStreamingService의 병합 모드 OOM 위험
**현상**: 병합 모드에서 모든 트랙 데이터를 메모리에 적재
**영향**:
- 대량 데이터(100만+ 트랙) 요청 시 힙 메모리 초과 → OOM
- G1GC Full GC 빈발 → STW(Stop-The-World) 증가 → 서비스 지연
### 2.8 [낮음] WebSocket 설정의 하드코딩
**현상**: 모든 WebSocket 관련 제한값이 Java 코드에 하드코딩
**영향**:
- 운영 중 튜닝 불가 (재배포 필요)
- 환경별 차별화 불가
| 하드코딩 항목 | 위치 | 값 |
|--------------|------|-----|
| `MAX_QUERIES_PER_SESSION` | TrackQueryInterceptor | 3 |
| `MAX_PENDING_BUFFER` | ChunkedTrackStreamingService | 50MB |
| `MAX_MESSAGE_SIZE_KB` | ChunkedTrackStreamingService | 1024KB |
| `BlockingQueue 크기` | StompTrackStreamingService | 100 |
| Inbound core/max | WebSocketStompConfig | 10/20 |
| Outbound core/max | WebSocketStompConfig | 20/40 |
| sendBufferSizeLimit | WebSocketStompConfig | 256MB |
### 2.9 [낮음] 미사용 최적화 코드 사장
**현상**: `TrackStreamingOptimizer`에 네트워크 메트릭 기반 적응형 청크 조정 로직이 설계되어 있으나, 실제 서비스에서 연결/사용되지 않음
---
## 3. 개선 계획 (TO-BE)
### 3.1 Phase 1 — 긴급 안정화 (즉시 적용)
> 목표: 동시 요청 폭주 시 서비스 마비 방지
#### 3.1.1 글로벌 동시 쿼리 제한 도입
**AS-IS**: 세션당 제한만 존재 (글로벌 제한 없음)
**TO-BE**: 서버 전체 동시 실행 쿼리 수를 Semaphore로 제한 + 대기 큐 구현
```java
// TO-BE: TrackQueryInterceptor.java — 글로벌 동시 제한 추가
@Slf4j
@Component
@RequiredArgsConstructor
public class TrackQueryInterceptor implements ChannelInterceptor {
private final ConcurrentHashMap<String, AtomicInteger> sessionQueries = new ConcurrentHashMap<>();
// ▼ [추가] 글로벌 동시 쿼리 제한
@Value("${websocket.query.max-concurrent-global:20}")
private int maxConcurrentGlobal;
@Value("${websocket.query.max-per-session:3}")
private int maxQueriesPerSession;
@Value("${websocket.query.queue-timeout-seconds:30}")
private int queueTimeoutSeconds;
private Semaphore globalQuerySemaphore;
private final AtomicInteger waitingCount = new AtomicInteger(0);
@PostConstruct
public void init() {
this.globalQuerySemaphore = new Semaphore(maxConcurrentGlobal, true); // fair=true
log.info("Global query semaphore initialized: maxConcurrent={}, queueTimeout={}s",
maxConcurrentGlobal, queueTimeoutSeconds);
}
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(
message, StompHeaderAccessor.class);
if (accessor != null && StompCommand.SEND.equals(accessor.getCommand())) {
String destination = accessor.getDestination();
String sessionId = accessor.getSessionId();
if ("/app/tracks/query".equals(destination)) {
// 1. 세션당 제한 확인
AtomicInteger queryCount = sessionQueries.computeIfAbsent(
sessionId, k -> new AtomicInteger(0));
if (queryCount.get() >= maxQueriesPerSession) {
throw new IllegalStateException(
"Session query limit exceeded: " + maxQueriesPerSession);
}
// 2. 글로벌 대기 큐 진입
int waiting = waitingCount.incrementAndGet();
log.info("Query queued: session={}, waiting={}, active={}/{}",
sessionId, waiting,
maxConcurrentGlobal - globalQuerySemaphore.availablePermits(),
maxConcurrentGlobal);
try {
boolean acquired = globalQuerySemaphore.tryAcquire(
queueTimeoutSeconds, TimeUnit.SECONDS);
if (!acquired) {
throw new IllegalStateException(
"Query queue timeout after " + queueTimeoutSeconds + "s. " +
"Server is busy, please retry later.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Query interrupted while waiting");
} finally {
waitingCount.decrementAndGet();
}
queryCount.incrementAndGet();
}
}
return message;
}
// ▼ [추가] 쿼리 완료 시 호출 — Semaphore 반환 + 카운트 감소
public void releaseQuery(String sessionId) {
globalQuerySemaphore.release();
decrementQueryCount(sessionId);
log.debug("Query released: session={}, available={}/{}",
sessionId, globalQuerySemaphore.availablePermits(), maxConcurrentGlobal);
}
// 기존 메서드 유지
public void decrementQueryCount(String sessionId) {
AtomicInteger count = sessionQueries.get(sessionId);
if (count != null && count.get() > 0) {
count.decrementAndGet();
}
}
// 모니터링용
public int getActiveQueryCount() {
return maxConcurrentGlobal - globalQuerySemaphore.availablePermits();
}
public int getWaitingCount() {
return waitingCount.get();
}
}
```
**application.yml 설정 추가**:
```yaml
websocket:
query:
max-concurrent-global: 20 # 서버 전체 동시 쿼리 상한
max-per-session: 3 # 세션당 동시 쿼리 상한
queue-timeout-seconds: 30 # 대기 큐 타임아웃
```
#### 3.1.2 쿼리 완료/실패 시 리소스 반환 보장
**AS-IS**: `decrementQueryCount()` 호출부 없음 → 세션당 제한이 해제되지 않음
**TO-BE**: 스트리밍 서비스에서 쿼리 완료/실패/취소 시 반드시 `releaseQuery()` 호출
```java
// TO-BE: ChunkedTrackStreamingService.java — finally 블록에서 리소스 반환
@Async("trackStreamingExecutor")
public void streamChunkedTracks(TrackQueryRequest request,
String queryId, String sessionId,
Consumer<TrackChunkResponse> chunkConsumer,
Consumer<QueryStatusUpdate> statusConsumer) {
try {
// ... 기존 스트리밍 로직 ...
} catch (Exception e) {
// ... 에러 처리 ...
} finally {
// ▼ [추가] 반드시 리소스 반환
trackQueryInterceptor.releaseQuery(sessionId);
activeQueryManager.completeQuery(sessionId);
cleanupQueryMetrics(queryId);
log.info("Query resources released: queryId={}, sessionId={}", queryId, sessionId);
}
}
```
```java
// TO-BE: StompTrackStreamingService.java — 동일하게 finally 보장
@Async
public void streamTracks(TrackQueryRequest request, String queryId,
String sessionId, ...) {
try {
// ... 기존 스트리밍 로직 ...
} catch (Exception e) {
// ... 에러 처리 ...
} finally {
// ▼ [추가] 반드시 리소스 반환
activeQueries.remove(queryId);
trackQueryInterceptor.releaseQuery(sessionId);
log.info("Query resources released: queryId={}, sessionId={}", queryId, sessionId);
}
}
```
#### 3.1.3 CachedThreadPool → 제한된 ThreadPoolExecutor 교체
**AS-IS**: `CancellableQueryManager`의 무제한 스레드 풀
```java
// AS-IS
private final ExecutorService queryExecutor = Executors.newCachedThreadPool(...);
```
**TO-BE**: 상한이 있는 ThreadPoolExecutor로 교체
```java
// TO-BE: CancellableQueryManager.java
private final ExecutorService queryExecutor = new ThreadPoolExecutor(
5, // corePoolSize
20, // maximumPoolSize (상한 설정)
60L, TimeUnit.SECONDS, // keepAliveTime
new LinkedBlockingQueue<>(100), // 대기 큐 크기 제한
r -> {
Thread thread = new Thread(r);
thread.setName("query-executor-" + thread.getId());
thread.setDaemon(true);
return thread;
},
new ThreadPoolExecutor.CallerRunsPolicy() // 큐 포화 시 호출 스레드에서 실행
);
```
---
### 3.2 Phase 2 — 취소 및 정리 로직 완성
> 목표: 불필요한 리소스 점유 방지, 쿼리 생명주기 완전 관리
#### 3.2.1 ChunkedTrackStreamingService 쿼리 취소 구현
**AS-IS**: TODO 주석만 존재
```java
// AS-IS
public void cancelQuery(String queryId) {
log.info("Cancelling chunked query: {}", queryId);
// TODO: 실제 취소 로직 구현
cleanupQueryMetrics(queryId);
}
```
**TO-BE**: AtomicBoolean 취소 플래그 + 쿼리 루프 내 체크 포인트
```java
// TO-BE: ChunkedTrackStreamingService.java
// ▼ [추가] 쿼리별 취소 플래그 관리
private final ConcurrentHashMap<String, AtomicBoolean> queryCancelFlags = new ConcurrentHashMap<>();
@Async("trackStreamingExecutor")
public void streamChunkedTracks(TrackQueryRequest request,
String queryId, String sessionId, ...) {
// ▼ [추가] 취소 플래그 등록
AtomicBoolean cancelFlag = new AtomicBoolean(false);
queryCancelFlags.put(queryId, cancelFlag);
try {
// ... 기존 로직에서 processTableRange 호출 시 cancelFlag 전달 ...
for (Map.Entry<TableStrategy, List<TimeRange>> entry : strategyMap.entrySet()) {
// ▼ [추가] 테이블 처리 전 취소 확인
if (cancelFlag.get()) {
log.info("Query {} cancelled before processing table {}", queryId, entry.getKey());
break;
}
// processTableRange(request, strategy, range, sessionId, viewportVesselIds, cancelFlag);
}
} finally {
queryCancelFlags.remove(queryId);
trackQueryInterceptor.releaseQuery(sessionId);
activeQueryManager.completeQuery(sessionId);
cleanupQueryMetrics(queryId);
}
}
// ▼ [개선] 실제 취소 로직 구현
public void cancelQuery(String queryId) {
AtomicBoolean flag = queryCancelFlags.get(queryId);
if (flag != null) {
flag.set(true);
log.info("Cancel flag set for query: {}", queryId);
}
cleanupQueryMetrics(queryId);
}
```
```java
// TO-BE: processTableRange 내부 — 주기적 취소 확인
while (rs.next() && trackCount < MAX_TRACKS_PER_CHUNK) {
// ▼ [추가] 1000건마다 취소 확인
if (trackCount % 1000 == 0 && cancelFlag.get()) {
log.info("Query cancelled during processing: table={}, processed={}", tableName, trackCount);
return Collections.emptyList();
}
// ... 기존 처리 로직 ...
}
```
#### 3.2.2 쿼리 관리 시스템 통합
**AS-IS**: `ActiveQueryManager``StompTrackStreamingService.activeQueries` 이원화
**TO-BE**: `ActiveQueryManager`를 단일 진실의 원천(Single Source of Truth)으로 통합
```java
// TO-BE: ActiveQueryManager.java — 취소 플래그 및 릴리즈 콜백 통합
public static class ActiveQuery {
private final String sessionId;
private final String queryId;
private final LocalDateTime startTime;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private volatile String currentTable;
private volatile int processedChunks;
private Runnable onComplete; // ▼ [추가] 완료 콜백
public void setOnComplete(Runnable callback) {
this.onComplete = callback;
}
}
public void completeQuery(String sessionId) {
ActiveQuery query = activeQueries.remove(sessionId);
if (query != null) {
// ▼ [추가] 등록된 콜백 실행 (Semaphore 반환 등)
if (query.onComplete != null) {
query.onComplete.run();
}
log.debug("Query completed: sessionId={}, queryId={}", sessionId, query.getQueryId());
}
}
```
---
### 3.3 Phase 3 — 백프레셔 고도화
> 목표: 정확한 버퍼 추적, 클라이언트 소비 속도 반영
#### 3.3.1 콜백 기반 버퍼 추적
**AS-IS**: `Thread.sleep(100ms)` 후 무조건 버퍼 감소 (추정치)
**TO-BE**: 전송 완료 콜백에서 정확히 버퍼 감소
```java
// TO-BE: 전송 완료 콜백 기반 버퍼 관리
// SimpMessagingTemplate의 convertAndSendToUser 대신 직접 전송 제어
// WebSocket 메시지 전송 래퍼
private void sendWithBackpressureTracking(String sessionId, String destination,
TrackChunkResponse response, int chunkSize) {
pendingBufferSize.addAndGet(chunkSize);
try {
messagingTemplate.convertAndSendToUser(sessionId, destination, response);
// 전송 성공 즉시 버퍼 감소
pendingBufferSize.addAndGet(-chunkSize);
} catch (Exception e) {
// 전송 실패 시에도 버퍼 감소
pendingBufferSize.addAndGet(-chunkSize);
throw e;
}
}
```
#### 3.3.2 적응형 전송 지연
**AS-IS**: 총 트랙 수에 따른 정적 지연
**TO-BE**: 실시간 버퍼 사용률 + 최근 전송 성공률에 따른 동적 지연
```java
// TO-BE: 동적 지연 계산
private int calculateAdaptiveDelay(BackpressureMetrics metrics) {
double bufferUsage = (double) pendingBufferSize.get() / MAX_PENDING_BUFFER;
if (bufferUsage > 0.8) return 200; // 80% 이상: 강한 억제
else if (bufferUsage > 0.5) return 100; // 50% 이상: 중간 억제
else if (bufferUsage > 0.3) return 50; // 30% 이상: 약한 억제
return 10; // 정상: 최소 지연
}
```
---
### 3.4 Phase 4 — 설정 외부화 및 모니터링
> 목표: 운영 중 튜닝 가능, 실시간 상황 파악
#### 3.4.1 WebSocket 설정 Properties 클래스 도입
```java
// TO-BE: WebSocketProperties.java (신규)
@Data
@Component
@ConfigurationProperties(prefix = "websocket")
public class WebSocketProperties {
private QueryProperties query = new QueryProperties();
private TransportProperties transport = new TransportProperties();
private BackpressureProperties backpressure = new BackpressureProperties();
@Data
public static class QueryProperties {
private int maxConcurrentGlobal = 20;
private int maxPerSession = 3;
private int queueTimeoutSeconds = 30;
}
@Data
public static class TransportProperties {
private int inboundCorePoolSize = 10;
private int inboundMaxPoolSize = 20;
private int inboundQueueCapacity = 100;
private int outboundCorePoolSize = 20;
private int outboundMaxPoolSize = 40;
private int outboundQueueCapacity = 5000;
private int messageSizeLimitMb = 50;
private int sendBufferSizeLimitMb = 256;
private int sendTimeLimitSeconds = 120;
}
@Data
public static class BackpressureProperties {
private long maxPendingBufferMb = 50;
private int maxMessageSizeKb = 1024;
private int minMessageSizeKb = 256;
}
}
```
**application.yml 통합 설정**:
```yaml
websocket:
query:
max-concurrent-global: 20
max-per-session: 3
queue-timeout-seconds: 30
transport:
inbound-core-pool-size: 10
inbound-max-pool-size: 20
inbound-queue-capacity: 100
outbound-core-pool-size: 20
outbound-max-pool-size: 40
outbound-queue-capacity: 5000
message-size-limit-mb: 50
send-buffer-size-limit-mb: 256
send-time-limit-seconds: 120
backpressure:
max-pending-buffer-mb: 50
max-message-size-kb: 1024
min-message-size-kb: 256
```
#### 3.4.2 모니터링 엔드포인트 추가
```java
// TO-BE: WebSocketMonitorController.java (신규)
@RestController
@RequestMapping("/api/v1/monitor/websocket")
@RequiredArgsConstructor
public class WebSocketMonitorController {
private final TrackQueryInterceptor queryInterceptor;
private final ActiveQueryManager activeQueryManager;
@GetMapping("/status")
public Map<String, Object> getStatus() {
return Map.of(
"activeQueries", queryInterceptor.getActiveQueryCount(),
"waitingInQueue", queryInterceptor.getWaitingCount(),
"queryDetails", activeQueryManager.getAllActiveQueries().entrySet().stream()
.map(e -> Map.of(
"sessionId", e.getKey(),
"queryId", e.getValue().getQueryId(),
"startTime", e.getValue().getStartTime().toString(),
"table", String.valueOf(e.getValue().getCurrentTable()),
"chunks", e.getValue().getProcessedChunks()
)).collect(Collectors.toList())
);
}
}
```
---
## 4. 개선 우선순위 및 기대 효과
| 순위 | 개선 항목 | 심각도 | 난이도 | 기대 효과 |
|------|----------|--------|--------|-----------|
| **1** | 글로벌 동시 쿼리 제한 (Semaphore) | 심각 | 낮음 | 동시 요청 폭주 시 서비스 마비 방지 |
| **2** | 쿼리 완료 시 리소스 반환 (`releaseQuery`) | 심각 | 낮음 | 세션당 제한 정상 동작 |
| **3** | CachedThreadPool 교체 | 심각 | 낮음 | 무제한 스레드 생성 방지 |
| **4** | ChunkedTrackStreamingService 취소 구현 | 심각 | 중간 | 불필요한 리소스 점유 해소 |
| **5** | 쿼리 관리 시스템 통합 | 중간 | 중간 | 상태 불일치 해소 |
| **6** | 백프레셔 콜백 기반 전환 | 중간 | 중간 | 정확한 메모리 관리 |
| **7** | 설정 외부화 | 낮음 | 낮음 | 운영 중 튜닝 가능 |
| **8** | 모니터링 엔드포인트 | 낮음 | 낮음 | 실시간 상태 파악 |
---
## 5. 제한 사항 및 고려 사항
### 5.1 글로벌 동시 제한 수 결정 기준
```
maxConcurrentGlobal = min(DB커넥션풀 / 2, Async스레드풀max)
= min(60 / 2, 30)
= 20 (권장)
```
- DB 커넥션 풀(60개)의 50%를 WebSocket 쿼리에 할당
- 나머지 50%는 REST API, 배치 작업, 헬스체크 등에 예비
- `trackStreamingExecutor` max(30) 이하로 설정하여 스레드 풀 포화 방지
### 5.2 대기 큐 타임아웃
- 30초 권장 (클라이언트 UX 고려)
- 타임아웃 시 클라이언트에 명확한 에러 메시지 전달 필요
- 프론트엔드에서 재시도 로직 또는 "서버가 바쁩니다" 안내 필요
### 5.3 기존 클라이언트 호환성
- STOMP 프로토콜 레벨 변경 없음 (하위 호환)
- 에러 메시지 형식이 변경될 수 있으므로 프론트엔드 에러 핸들링 확인 필요
---
## 부록 A. 관련 파일 목록
| 파일 | 역할 |
|------|------|
| `global/websocket/interceptor/TrackQueryInterceptor.java` | 세션별 쿼리 제한 인터셉터 |
| `global/websocket/service/ChunkedTrackStreamingService.java` | 청크 기반 스트리밍 (백프레셔) |
| `global/websocket/service/StompTrackStreamingService.java` | STOMP 스트리밍 (큐 + 지연) |
| `global/websocket/service/ActiveQueryManager.java` | 활성 쿼리 추적 |
| `global/config/AsyncConfig.java` | 비동기 스레드 풀 설정 |
| `global/config/WebSocketStompConfig.java` | WebSocket STOMP 설정 |
| `domain/vessel/service/query/CancellableQueryManager.java` | 쿼리 취소 관리 |
| `global/websocket/listener/WebSocketDisconnectListener.java` | 세션 종료 리스너 |
| `domain/vessel/service/optimization/TrackStreamingOptimizer.java` | 적응형 최적화 (미사용) |
| `monitoring/service/TrackStreamingMetrics.java` | 스트리밍 메트릭 |
## 부록 B. 운영 환경 리소스 현황
| 리소스 | 설정값 | 비고 |
|--------|--------|------|
| JVM Heap | 8~16GB (서버 메모리의 1/3) | G1GC |
| Tomcat Threads | max 200 | |
| Tomcat Connections | max 10,000 | |
| DB Pool (Query) | max 60 | 주 사용 |
| DB Pool (Collect) | max 20 | 읽기 전용 |
| DB Pool (Batch) | max 20 | 메타데이터 |
| trackStreamingExecutor | core 15, max 30, queue 500 | CallerRunsPolicy |
| STOMP Inbound | core 10, max 20, queue 100 | |
| STOMP Outbound | core 20, max 40, queue 5000 | |
| WebSocket Buffer | send 256MB, message 50MB | |