signal-batch/docs/websocket-performance-improvement-report.md
HeungTak Lee dc586dde0c docs: Phase 5~6 구현 진행 문서 및 성능 보고서 업데이트
- implementation-progress.md: Phase 5~6 체크리스트 추가 (전항목 완료)
- websocket-performance-improvement-report.md: 대기열 구조, 캐시 아키텍처 설명 추가

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 15:34:32 +09:00

979 lines
35 KiB
Markdown
Raw Blame 히스토리

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# WebSocket 성능 부하 개선 보고서
## 선박 항적 리플레이 서비스 — 동시 요청 부하 대응
| 항목 | 내용 |
|------|------|
| 작성일 | 2026-02-06 |
| 대상 시스템 | Signal Batch — 선박 항적 조회/리플레이 WebSocket 서비스 |
| 운영 환경 | Linux, `vessel-batch-control.sh``run-on-query-server-dev.sh` (prod 프로파일) |
| 문제 상황 | 다수 클라이언트의 리플레이 요청 동시 유입 시 서비스 장애 발생 |
---
## 1. 현황 분석 (AS-IS)
### 1.1 시스템 아키텍처 개요
```
클라이언트 (N개)
│ WebSocket STOMP (/ws-tracks)
┌─────────────────────────────────────────────────┐
│ Tomcat (max-threads: 200, max-connections: 10000) │
│ ┌─────────────────────────────────────────────┐ │
│ │ STOMP Inbound Channel (core:10, max:20) │ │
│ │ → TrackQueryInterceptor (세션당 3개 제한) │ │
│ └────────────┬────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ @Async trackStreamingExecutor │ │
│ │ (core:15, max:30, queue:500) │ │
│ │ ┌───────────────────────────────────────┐ │ │
│ │ │ ChunkedTrackStreamingService │ │ │
│ │ │ └ 백프레셔: pendingBufferSize ≤ 50MB │ │ │
│ │ │ StompTrackStreamingService │ │ │
│ │ │ └ BlockingQueue(100) + Thread.sleep │ │ │
│ │ └───────────────────────────────────────┘ │ │
│ └────────────┬────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ STOMP Outbound Channel (core:20, max:40) │ │
│ │ (queue: 5000) │ │
│ └─────────────────────────────────────────────┘ │
│ ▼ │
│ PostgreSQL + PostGIS (HikariCP max: 60) │
└─────────────────────────────────────────────────────┘
```
### 1.2 현재 구현된 부하 제어 메커니즘
#### (A) 세션당 동시 쿼리 제한 — `TrackQueryInterceptor`
**파일**: `global/websocket/interceptor/TrackQueryInterceptor.java`
```java
// AS-IS: 세션당 최대 3개 동시 쿼리 제한
private final ConcurrentHashMap<String, AtomicInteger> sessionQueries = new ConcurrentHashMap<>();
private static final int MAX_QUERIES_PER_SESSION = 3;
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
// ...
if ("/app/tracks/query".equals(destination)) {
AtomicInteger queryCount = sessionQueries.computeIfAbsent(
sessionId, k -> new AtomicInteger(0)
);
if (queryCount.get() >= MAX_QUERIES_PER_SESSION) {
throw new IllegalStateException("Maximum concurrent queries exceeded");
}
queryCount.incrementAndGet();
}
return message;
}
// 쿼리 완료 시 카운트 감소 메서드 (존재하지만 호출부 없음)
public void decrementQueryCount(String sessionId) {
AtomicInteger count = sessionQueries.get(sessionId);
if (count != null) {
count.decrementAndGet();
}
}
```
#### (B) 메모리 버퍼 기반 백프레셔 — `ChunkedTrackStreamingService`
**파일**: `global/websocket/service/ChunkedTrackStreamingService.java`
```java
// AS-IS: 50MB 버퍼 기반 백프레셔
private final AtomicLong pendingBufferSize = new AtomicLong(0);
private static final long MAX_PENDING_BUFFER = 50 * 1024 * 1024; // 50MB
// 버퍼 초과 시 busy-wait
int backpressureWaitCount = 0;
while (pendingBufferSize.get() > MAX_PENDING_BUFFER) {
Thread.sleep(50);
backpressureWaitCount++;
// 500ms 이상 대기 시 청크 크기 축소
if (backpressureWaitCount > 10 && metrics.dynamicChunkSizeKB > MIN_MESSAGE_SIZE_KB) {
metrics.dynamicChunkSizeKB = Math.max(MIN_MESSAGE_SIZE_KB,
metrics.dynamicChunkSizeKB - 512);
}
}
// 전송 완료 후 버퍼 감소 (비동기 + 100ms 고정 대기)
CompletableFuture.runAsync(() -> {
Thread.sleep(100); // 네트워크 전송 시간 "추정"
pendingBufferSize.addAndGet(-chunkSize);
});
```
#### (C) 비동기 스레드 풀 — `AsyncConfig`
**파일**: `global/config/AsyncConfig.java`
```java
// AS-IS: 고정 크기 스레드 풀
@Bean(name = "trackStreamingExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(15);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(500);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
```
#### (D) 정적 전송 지연 — `StompTrackStreamingService`
**파일**: `global/websocket/service/StompTrackStreamingService.java`
```java
// AS-IS: 데이터 크기에 따른 고정 지연
private int calculateChunkDelay(int chunkSize, int totalTracks) {
if (totalTracks > 1000000) return 200;
else if (totalTracks > 500000) return 150;
else if (totalTracks > 100000) return 100;
else if (totalTracks > 50000) return 50;
else if (totalTracks > 10000) return 30;
return 10;
}
```
#### (E) STOMP 채널 설정 — `WebSocketStompConfig`
**파일**: `global/config/WebSocketStompConfig.java`
```java
// AS-IS: 인바운드/아웃바운드 채널 스레드 풀
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.taskExecutor()
.corePoolSize(10).maxPoolSize(20).queueCapacity(100);
}
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.taskExecutor()
.corePoolSize(20).maxPoolSize(40).queueCapacity(5000);
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration
.setMessageSizeLimit(50 * 1024 * 1024) // 50MB
.setSendBufferSizeLimit(256 * 1024 * 1024) // 256MB
.setSendTimeLimit(120 * 1000); // 120초
}
```
### 1.3 운영 환경 설정
| 항목 | 설정값 |
|------|--------|
| JVM 힙 | 서버 메모리의 1/3 (최소 8GB, 최대 16GB) |
| GC | G1GC, MaxGCPauseMillis=200 |
| Tomcat 스레드 | max: 200, min-spare: 10 |
| Tomcat 연결 | max-connections: 10000, accept-count: 100 |
| DB 커넥션 풀 (Query) | max: 60, min-idle: 10 |
| DB 커넥션 풀 (Collect) | max: 20, min-idle: 5 |
| DB 커넥션 풀 (Batch) | max: 20, min-idle: 10 |
---
## 2. 식별된 문제점
### 2.1 [심각] 글로벌 동시 쿼리 제한 없음
**현상**: 세션당 3개만 제한하므로, 100개 세션이 동시 접속하면 최대 **300개 동시 쿼리** 발생 가능
**영향**:
- DB 커넥션 풀(60개) 고갈 → 커넥션 대기 타임아웃
- `trackStreamingExecutor` 큐(500) 포화 → `CallerRunsPolicy`에 의해 인바운드 채널 스레드 블로킹
- 인바운드 채널(max 20) 블로킹 → 새 WebSocket 요청 처리 불가 → 전체 서비스 마비
**관련 코드**: `TrackQueryInterceptor.java` — 글로벌 카운터 없음
```
[시나리오: 50개 클라이언트 × 3개 쿼리 = 150개 동시 요청]
trackStreamingExecutor (max: 30 스레드)
→ 30개 실행 + 500개 큐 대기
→ 큐 포화 시 CallerRunsPolicy 발동
→ 인바운드 채널 스레드(20개)에서 직접 실행
→ 인바운드 채널 블로킹 → WebSocket 메시지 처리 중단
→ 하트비트 실패 → 세션 끊김 → 연쇄 장애
```
### 2.2 [심각] 쿼리 카운트 감소 로직 누락
**현상**: `TrackQueryInterceptor.decrementQueryCount()` 메서드는 존재하지만, 쿼리 완료/실패 시 **호출하는 코드가 없음**
**영향**:
- 세션이 유지되는 동안 쿼리 카운트가 계속 증가만 함
- 3개 쿼리 후 해당 세션은 더 이상 쿼리 불가
- DISCONNECT 시에만 `sessionQueries.remove()`로 정리됨
- 결과적으로 세션당 제한이 **사실상 무의미** (장기 연결에서 3개 쿼리 후 차단)
**관련 코드**:
```java
// TrackQueryInterceptor.java:68 - 정의만 존재, 호출부 없음
public void decrementQueryCount(String sessionId) {
AtomicInteger count = sessionQueries.get(sessionId);
if (count != null) {
count.decrementAndGet();
}
}
```
### 2.3 [심각] CachedThreadPool 무제한 스레드 생성
**현상**: `CancellableQueryManager``Executors.newCachedThreadPool()` 사용
**영향**:
- 부하 시 무제한 스레드 생성 → OOM 또는 OS 스레드 한도 초과
- 스레드 컨텍스트 스위칭 오버헤드 급증
**관련 코드**: `CancellableQueryManager.java`
```java
// AS-IS: 상한 없는 스레드 풀
private final ExecutorService queryExecutor = Executors.newCachedThreadPool(r -> {
Thread thread = new Thread(r);
thread.setName("query-executor-" + thread.getId());
thread.setDaemon(true);
return thread;
});
```
### 2.4 [심각] ChunkedTrackStreamingService 쿼리 취소 미구현
**현상**: `cancelQuery()` 메서드가 메트릭 정리만 수행하고 **실제 스트리밍 중단 로직이 없음**
**영향**:
- 클라이언트가 연결을 끊어도 서버에서 DB 쿼리 + 데이터 가공이 계속 실행
- 리소스(DB 커넥션, CPU, 메모리)가 불필요하게 점유됨
**관련 코드**: `ChunkedTrackStreamingService.java:1176`
```java
// AS-IS: 취소 시 메트릭만 정리, 실행 중인 쿼리는 계속됨
public void cancelQuery(String queryId) {
log.info("Cancelling chunked query: {}", queryId);
// TODO: 실제 취소 로직 구현
cleanupQueryMetrics(queryId);
}
```
### 2.5 [중간] 백프레셔의 비정확한 버퍼 추적
**현상**: 전송 완료 후 버퍼 크기 감소를 `CompletableFuture + Thread.sleep(100ms)`로 처리
**영향**:
- 실제 네트워크 전송 완료와 무관하게 100ms 후 무조건 버퍼 감소
- 네트워크 지연이 100ms보다 길면 버퍼가 과소 추적 → 실제 메모리 사용량보다 작게 인식
- 네트워크 지연이 100ms보다 짧으면 불필요한 백프레셔 발생
**관련 코드**: `ChunkedTrackStreamingService.java:1069`
```java
// AS-IS: 네트워크 전송 시간을 100ms로 "추정"
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(100); // 네트워크 전송 시간 고려
pendingBufferSize.addAndGet(-chunkSize);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
```
### 2.6 [중간] 두 개의 쿼리 관리 시스템 분리
**현상**: `ActiveQueryManager``StompTrackStreamingService.activeQueries`가 각각 독립적으로 쿼리 상태를 관리
**영향**:
- 취소 시 한쪽만 처리될 가능성
- 상태 불일치로 인한 리소스 누수
**관련 클래스**:
- `ActiveQueryManager`: 세션별 쿼리 관리 (`ConcurrentHashMap<String, ActiveQuery>`)
- `StompTrackStreamingService`: 쿼리별 컨텍스트 관리 (`ConcurrentHashMap<String, QueryContext>`)
### 2.7 [중간] StompTrackStreamingService의 병합 모드 OOM 위험
**현상**: 병합 모드에서 모든 트랙 데이터를 메모리에 적재
**영향**:
- 대량 데이터(100만+ 트랙) 요청 시 힙 메모리 초과 → OOM
- G1GC Full GC 빈발 → STW(Stop-The-World) 증가 → 서비스 지연
### 2.8 [낮음] WebSocket 설정의 하드코딩
**현상**: 모든 WebSocket 관련 제한값이 Java 코드에 하드코딩
**영향**:
- 운영 중 튜닝 불가 (재배포 필요)
- 환경별 차별화 불가
| 하드코딩 항목 | 위치 | 값 |
|--------------|------|-----|
| `MAX_QUERIES_PER_SESSION` | TrackQueryInterceptor | 3 |
| `MAX_PENDING_BUFFER` | ChunkedTrackStreamingService | 50MB |
| `MAX_MESSAGE_SIZE_KB` | ChunkedTrackStreamingService | 1024KB |
| `BlockingQueue 크기` | StompTrackStreamingService | 100 |
| Inbound core/max | WebSocketStompConfig | 10/20 |
| Outbound core/max | WebSocketStompConfig | 20/40 |
| sendBufferSizeLimit | WebSocketStompConfig | 256MB |
### 2.9 [낮음] 미사용 최적화 코드 사장
**현상**: `TrackStreamingOptimizer`에 네트워크 메트릭 기반 적응형 청크 조정 로직이 설계되어 있으나, 실제 서비스에서 연결/사용되지 않음
---
## 3. 개선 계획 (TO-BE)
### 3.1 Phase 1 — 긴급 안정화 (즉시 적용)
> 목표: 동시 요청 폭주 시 서비스 마비 방지
#### 3.1.1 글로벌 동시 쿼리 제한 도입
**AS-IS**: 세션당 제한만 존재 (글로벌 제한 없음)
**TO-BE**: 서버 전체 동시 실행 쿼리 수를 Semaphore로 제한 + 대기 큐 구현
```java
// TO-BE: TrackQueryInterceptor.java — 글로벌 동시 제한 추가
@Slf4j
@Component
@RequiredArgsConstructor
public class TrackQueryInterceptor implements ChannelInterceptor {
private final ConcurrentHashMap<String, AtomicInteger> sessionQueries = new ConcurrentHashMap<>();
// ▼ [추가] 글로벌 동시 쿼리 제한
@Value("${websocket.query.max-concurrent-global:20}")
private int maxConcurrentGlobal;
@Value("${websocket.query.max-per-session:3}")
private int maxQueriesPerSession;
@Value("${websocket.query.queue-timeout-seconds:30}")
private int queueTimeoutSeconds;
private Semaphore globalQuerySemaphore;
private final AtomicInteger waitingCount = new AtomicInteger(0);
@PostConstruct
public void init() {
this.globalQuerySemaphore = new Semaphore(maxConcurrentGlobal, true); // fair=true
log.info("Global query semaphore initialized: maxConcurrent={}, queueTimeout={}s",
maxConcurrentGlobal, queueTimeoutSeconds);
}
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(
message, StompHeaderAccessor.class);
if (accessor != null && StompCommand.SEND.equals(accessor.getCommand())) {
String destination = accessor.getDestination();
String sessionId = accessor.getSessionId();
if ("/app/tracks/query".equals(destination)) {
// 1. 세션당 제한 확인
AtomicInteger queryCount = sessionQueries.computeIfAbsent(
sessionId, k -> new AtomicInteger(0));
if (queryCount.get() >= maxQueriesPerSession) {
throw new IllegalStateException(
"Session query limit exceeded: " + maxQueriesPerSession);
}
// 2. 글로벌 대기 큐 진입
int waiting = waitingCount.incrementAndGet();
log.info("Query queued: session={}, waiting={}, active={}/{}",
sessionId, waiting,
maxConcurrentGlobal - globalQuerySemaphore.availablePermits(),
maxConcurrentGlobal);
try {
boolean acquired = globalQuerySemaphore.tryAcquire(
queueTimeoutSeconds, TimeUnit.SECONDS);
if (!acquired) {
throw new IllegalStateException(
"Query queue timeout after " + queueTimeoutSeconds + "s. " +
"Server is busy, please retry later.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Query interrupted while waiting");
} finally {
waitingCount.decrementAndGet();
}
queryCount.incrementAndGet();
}
}
return message;
}
// ▼ [추가] 쿼리 완료 시 호출 — Semaphore 반환 + 카운트 감소
public void releaseQuery(String sessionId) {
globalQuerySemaphore.release();
decrementQueryCount(sessionId);
log.debug("Query released: session={}, available={}/{}",
sessionId, globalQuerySemaphore.availablePermits(), maxConcurrentGlobal);
}
// 기존 메서드 유지
public void decrementQueryCount(String sessionId) {
AtomicInteger count = sessionQueries.get(sessionId);
if (count != null && count.get() > 0) {
count.decrementAndGet();
}
}
// 모니터링용
public int getActiveQueryCount() {
return maxConcurrentGlobal - globalQuerySemaphore.availablePermits();
}
public int getWaitingCount() {
return waitingCount.get();
}
}
```
**application.yml 설정 추가**:
```yaml
websocket:
query:
max-concurrent-global: 20 # 서버 전체 동시 쿼리 상한
max-per-session: 3 # 세션당 동시 쿼리 상한
queue-timeout-seconds: 30 # 대기 큐 타임아웃
```
#### 3.1.2 쿼리 완료/실패 시 리소스 반환 보장
**AS-IS**: `decrementQueryCount()` 호출부 없음 → 세션당 제한이 해제되지 않음
**TO-BE**: 스트리밍 서비스에서 쿼리 완료/실패/취소 시 반드시 `releaseQuery()` 호출
```java
// TO-BE: ChunkedTrackStreamingService.java — finally 블록에서 리소스 반환
@Async("trackStreamingExecutor")
public void streamChunkedTracks(TrackQueryRequest request,
String queryId, String sessionId,
Consumer<TrackChunkResponse> chunkConsumer,
Consumer<QueryStatusUpdate> statusConsumer) {
try {
// ... 기존 스트리밍 로직 ...
} catch (Exception e) {
// ... 에러 처리 ...
} finally {
// ▼ [추가] 반드시 리소스 반환
trackQueryInterceptor.releaseQuery(sessionId);
activeQueryManager.completeQuery(sessionId);
cleanupQueryMetrics(queryId);
log.info("Query resources released: queryId={}, sessionId={}", queryId, sessionId);
}
}
```
```java
// TO-BE: StompTrackStreamingService.java — 동일하게 finally 보장
@Async
public void streamTracks(TrackQueryRequest request, String queryId,
String sessionId, ...) {
try {
// ... 기존 스트리밍 로직 ...
} catch (Exception e) {
// ... 에러 처리 ...
} finally {
// ▼ [추가] 반드시 리소스 반환
activeQueries.remove(queryId);
trackQueryInterceptor.releaseQuery(sessionId);
log.info("Query resources released: queryId={}, sessionId={}", queryId, sessionId);
}
}
```
#### 3.1.3 CachedThreadPool → 제한된 ThreadPoolExecutor 교체
**AS-IS**: `CancellableQueryManager`의 무제한 스레드 풀
```java
// AS-IS
private final ExecutorService queryExecutor = Executors.newCachedThreadPool(...);
```
**TO-BE**: 상한이 있는 ThreadPoolExecutor로 교체
```java
// TO-BE: CancellableQueryManager.java
private final ExecutorService queryExecutor = new ThreadPoolExecutor(
5, // corePoolSize
20, // maximumPoolSize (상한 설정)
60L, TimeUnit.SECONDS, // keepAliveTime
new LinkedBlockingQueue<>(100), // 대기 큐 크기 제한
r -> {
Thread thread = new Thread(r);
thread.setName("query-executor-" + thread.getId());
thread.setDaemon(true);
return thread;
},
new ThreadPoolExecutor.CallerRunsPolicy() // 큐 포화 시 호출 스레드에서 실행
);
```
---
### 3.2 Phase 2 — 취소 및 정리 로직 완성
> 목표: 불필요한 리소스 점유 방지, 쿼리 생명주기 완전 관리
#### 3.2.1 ChunkedTrackStreamingService 쿼리 취소 구현
**AS-IS**: TODO 주석만 존재
```java
// AS-IS
public void cancelQuery(String queryId) {
log.info("Cancelling chunked query: {}", queryId);
// TODO: 실제 취소 로직 구현
cleanupQueryMetrics(queryId);
}
```
**TO-BE**: AtomicBoolean 취소 플래그 + 쿼리 루프 내 체크 포인트
```java
// TO-BE: ChunkedTrackStreamingService.java
// ▼ [추가] 쿼리별 취소 플래그 관리
private final ConcurrentHashMap<String, AtomicBoolean> queryCancelFlags = new ConcurrentHashMap<>();
@Async("trackStreamingExecutor")
public void streamChunkedTracks(TrackQueryRequest request,
String queryId, String sessionId, ...) {
// ▼ [추가] 취소 플래그 등록
AtomicBoolean cancelFlag = new AtomicBoolean(false);
queryCancelFlags.put(queryId, cancelFlag);
try {
// ... 기존 로직에서 processTableRange 호출 시 cancelFlag 전달 ...
for (Map.Entry<TableStrategy, List<TimeRange>> entry : strategyMap.entrySet()) {
// ▼ [추가] 테이블 처리 전 취소 확인
if (cancelFlag.get()) {
log.info("Query {} cancelled before processing table {}", queryId, entry.getKey());
break;
}
// processTableRange(request, strategy, range, sessionId, viewportVesselIds, cancelFlag);
}
} finally {
queryCancelFlags.remove(queryId);
trackQueryInterceptor.releaseQuery(sessionId);
activeQueryManager.completeQuery(sessionId);
cleanupQueryMetrics(queryId);
}
}
// ▼ [개선] 실제 취소 로직 구현
public void cancelQuery(String queryId) {
AtomicBoolean flag = queryCancelFlags.get(queryId);
if (flag != null) {
flag.set(true);
log.info("Cancel flag set for query: {}", queryId);
}
cleanupQueryMetrics(queryId);
}
```
```java
// TO-BE: processTableRange 내부 — 주기적 취소 확인
while (rs.next() && trackCount < MAX_TRACKS_PER_CHUNK) {
// ▼ [추가] 1000건마다 취소 확인
if (trackCount % 1000 == 0 && cancelFlag.get()) {
log.info("Query cancelled during processing: table={}, processed={}", tableName, trackCount);
return Collections.emptyList();
}
// ... 기존 처리 로직 ...
}
```
#### 3.2.2 쿼리 관리 시스템 통합
**AS-IS**: `ActiveQueryManager``StompTrackStreamingService.activeQueries` 이원화
**TO-BE**: `ActiveQueryManager`를 단일 진실의 원천(Single Source of Truth)으로 통합
```java
// TO-BE: ActiveQueryManager.java — 취소 플래그 및 릴리즈 콜백 통합
public static class ActiveQuery {
private final String sessionId;
private final String queryId;
private final LocalDateTime startTime;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private volatile String currentTable;
private volatile int processedChunks;
private Runnable onComplete; // ▼ [추가] 완료 콜백
public void setOnComplete(Runnable callback) {
this.onComplete = callback;
}
}
public void completeQuery(String sessionId) {
ActiveQuery query = activeQueries.remove(sessionId);
if (query != null) {
// ▼ [추가] 등록된 콜백 실행 (Semaphore 반환 등)
if (query.onComplete != null) {
query.onComplete.run();
}
log.debug("Query completed: sessionId={}, queryId={}", sessionId, query.getQueryId());
}
}
```
---
### 3.3 Phase 3 — 백프레셔 고도화
> 목표: 정확한 버퍼 추적, 클라이언트 소비 속도 반영
#### 3.3.1 콜백 기반 버퍼 추적
**AS-IS**: `Thread.sleep(100ms)` 후 무조건 버퍼 감소 (추정치)
**TO-BE**: 전송 완료 콜백에서 정확히 버퍼 감소
```java
// TO-BE: 전송 완료 콜백 기반 버퍼 관리
// SimpMessagingTemplate의 convertAndSendToUser 대신 직접 전송 제어
// WebSocket 메시지 전송 래퍼
private void sendWithBackpressureTracking(String sessionId, String destination,
TrackChunkResponse response, int chunkSize) {
pendingBufferSize.addAndGet(chunkSize);
try {
messagingTemplate.convertAndSendToUser(sessionId, destination, response);
// 전송 성공 즉시 버퍼 감소
pendingBufferSize.addAndGet(-chunkSize);
} catch (Exception e) {
// 전송 실패 시에도 버퍼 감소
pendingBufferSize.addAndGet(-chunkSize);
throw e;
}
}
```
#### 3.3.2 적응형 전송 지연
**AS-IS**: 총 트랙 수에 따른 정적 지연
**TO-BE**: 실시간 버퍼 사용률 + 최근 전송 성공률에 따른 동적 지연
```java
// TO-BE: 동적 지연 계산
private int calculateAdaptiveDelay(BackpressureMetrics metrics) {
double bufferUsage = (double) pendingBufferSize.get() / MAX_PENDING_BUFFER;
if (bufferUsage > 0.8) return 200; // 80% 이상: 강한 억제
else if (bufferUsage > 0.5) return 100; // 50% 이상: 중간 억제
else if (bufferUsage > 0.3) return 50; // 30% 이상: 약한 억제
return 10; // 정상: 최소 지연
}
```
---
### 3.4 Phase 4 — 설정 외부화 및 모니터링
> 목표: 운영 중 튜닝 가능, 실시간 상황 파악
#### 3.4.1 WebSocket 설정 Properties 클래스 도입
```java
// TO-BE: WebSocketProperties.java (신규)
@Data
@Component
@ConfigurationProperties(prefix = "websocket")
public class WebSocketProperties {
private QueryProperties query = new QueryProperties();
private TransportProperties transport = new TransportProperties();
private BackpressureProperties backpressure = new BackpressureProperties();
@Data
public static class QueryProperties {
private int maxConcurrentGlobal = 20;
private int maxPerSession = 3;
private int queueTimeoutSeconds = 30;
}
@Data
public static class TransportProperties {
private int inboundCorePoolSize = 10;
private int inboundMaxPoolSize = 20;
private int inboundQueueCapacity = 100;
private int outboundCorePoolSize = 20;
private int outboundMaxPoolSize = 40;
private int outboundQueueCapacity = 5000;
private int messageSizeLimitMb = 50;
private int sendBufferSizeLimitMb = 256;
private int sendTimeLimitSeconds = 120;
}
@Data
public static class BackpressureProperties {
private long maxPendingBufferMb = 50;
private int maxMessageSizeKb = 1024;
private int minMessageSizeKb = 256;
}
}
```
**application.yml 통합 설정**:
```yaml
websocket:
query:
max-concurrent-global: 20
max-per-session: 3
queue-timeout-seconds: 30
transport:
inbound-core-pool-size: 10
inbound-max-pool-size: 20
inbound-queue-capacity: 100
outbound-core-pool-size: 20
outbound-max-pool-size: 40
outbound-queue-capacity: 5000
message-size-limit-mb: 50
send-buffer-size-limit-mb: 256
send-time-limit-seconds: 120
backpressure:
max-pending-buffer-mb: 50
max-message-size-kb: 1024
min-message-size-kb: 256
```
#### 3.4.2 모니터링 엔드포인트 추가
```java
// TO-BE: WebSocketMonitorController.java (신규)
@RestController
@RequestMapping("/api/v1/monitor/websocket")
@RequiredArgsConstructor
public class WebSocketMonitorController {
private final TrackQueryInterceptor queryInterceptor;
private final ActiveQueryManager activeQueryManager;
@GetMapping("/status")
public Map<String, Object> getStatus() {
return Map.of(
"activeQueries", queryInterceptor.getActiveQueryCount(),
"waitingInQueue", queryInterceptor.getWaitingCount(),
"queryDetails", activeQueryManager.getAllActiveQueries().entrySet().stream()
.map(e -> Map.of(
"sessionId", e.getKey(),
"queryId", e.getValue().getQueryId(),
"startTime", e.getValue().getStartTime().toString(),
"table", String.valueOf(e.getValue().getCurrentTable()),
"chunks", e.getValue().getProcessedChunks()
)).collect(Collectors.toList())
);
}
}
```
---
## 4. 개선 우선순위 및 기대 효과
| 순위 | 개선 항목 | 심각도 | 난이도 | 기대 효과 |
|------|----------|--------|--------|-----------|
| **1** | 글로벌 동시 쿼리 제한 (Semaphore) | 심각 | 낮음 | 동시 요청 폭주 시 서비스 마비 방지 |
| **2** | 쿼리 완료 시 리소스 반환 (`releaseQuery`) | 심각 | 낮음 | 세션당 제한 정상 동작 |
| **3** | CachedThreadPool 교체 | 심각 | 낮음 | 무제한 스레드 생성 방지 |
| **4** | ChunkedTrackStreamingService 취소 구현 | 심각 | 중간 | 불필요한 리소스 점유 해소 |
| **5** | 쿼리 관리 시스템 통합 | 중간 | 중간 | 상태 불일치 해소 |
| **6** | 백프레셔 콜백 기반 전환 | 중간 | 중간 | 정확한 메모리 관리 |
| **7** | 설정 외부화 | 낮음 | 낮음 | 운영 중 튜닝 가능 |
| **8** | 모니터링 엔드포인트 | 낮음 | 낮음 | 실시간 상태 파악 |
---
## 5. 제한 사항 및 고려 사항
### 5.1 글로벌 동시 제한 수 결정 기준
```
maxConcurrentGlobal = min(DB커넥션풀 / 2, Async스레드풀max)
= min(60 / 2, 30)
= 20 (권장)
```
- DB 커넥션 풀(60개)의 50%를 WebSocket 쿼리에 할당
- 나머지 50%는 REST API, 배치 작업, 헬스체크 등에 예비
- `trackStreamingExecutor` max(30) 이하로 설정하여 스레드 풀 포화 방지
### 5.2 대기 큐 타임아웃
- 30초 권장 (클라이언트 UX 고려)
- 타임아웃 시 클라이언트에 명확한 에러 메시지 전달 필요
- 프론트엔드에서 재시도 로직 또는 "서버가 바쁩니다" 안내 필요
### 5.3 기존 클라이언트 호환성
- STOMP 프로토콜 레벨 변경 없음 (하위 호환)
- 에러 메시지 형식이 변경될 수 있으므로 프론트엔드 에러 핸들링 확인 필요
---
## 부록 A. 관련 파일 목록
| 파일 | 역할 |
|------|------|
| `global/websocket/interceptor/TrackQueryInterceptor.java` | 세션별 쿼리 제한 인터셉터 |
| `global/websocket/service/ChunkedTrackStreamingService.java` | 청크 기반 스트리밍 (백프레셔) |
| `global/websocket/service/StompTrackStreamingService.java` | STOMP 스트리밍 (큐 + 지연) |
| `global/websocket/service/ActiveQueryManager.java` | 활성 쿼리 추적 |
| `global/config/AsyncConfig.java` | 비동기 스레드 풀 설정 |
| `global/config/WebSocketStompConfig.java` | WebSocket STOMP 설정 |
| `domain/vessel/service/query/CancellableQueryManager.java` | 쿼리 취소 관리 |
| `global/websocket/listener/WebSocketDisconnectListener.java` | 세션 종료 리스너 |
| `domain/vessel/service/optimization/TrackStreamingOptimizer.java` | 적응형 최적화 (미사용) |
| `monitoring/service/TrackStreamingMetrics.java` | 스트리밍 메트릭 |
## Phase 5. 대기열 기반 쿼리 관리 + 타임아웃 최적화
### 5.1 대기열 구조
기존: 슬롯 부족 시 즉시 거부 (`ERROR: Server is busy`)
개선: 대기열 순번 안내 후 최대 2분 대기 (`QUEUED: position 3/5`)
```
요청 유입
tryAcquireSlotImmediate()
├── 성공 → 즉시 처리
└── 실패 → 대기열 진입
│ ┌──────────────────────────────┐
│ │ 2초 간격 루프: │
│ │ 1. tryAcquire (2초 타임아웃) │
│ │ 2. QUEUED 상태 전송 │
│ │ (순번/전체 큐 크기) │
│ │ 3. 최대 2분까지 반복 │
│ └──────────────────────────────┘
├── 슬롯 획득 → 처리 시작
└── 타임아웃 → ERROR 응답
```
### 5.2 타임아웃 최적화
| 설정 | AS-IS | TO-BE | 근거 |
|------|-------|-------|------|
| Query DB 풀 | 120 | **180** | 동시 60쿼리 × 3커넥션 |
| max-concurrent-global | 30 | **60** | 180 / ~3 |
| max-per-session | 3 | **20** | 대기열 방식이므로 넉넉하게 |
| Executor max | 30 | **120** | 60실행 + 60대기 |
| Session idle timeout | 60s | **15s** | 빠른 정리 |
| Heartbeat | 10s/10s | **5s/5s** | 죽은 연결 빠른 감지 |
| SockJS disconnect delay | 30s | **5s** | 빠른 해제 |
| Send time limit | 120s | **30s** | 응답 완료 후 빠른 정리 |
---
## Phase 6. 일일 데이터 인메모리 캐시
### 6.1 캐시 아키텍처
일일(Daily) 집계 테이블의 7일분 데이터를 메모리에 캐시하여 DB 조회를 생략.
```
DailyTrackCacheManager (@Service)
├── cache: ConcurrentHashMap<LocalDate, DailyTrackData>
│ ├── D-7 → {tracks: Map<vesselId, CompactVesselTrack>, vesselCount, memorySizeBytes}
│ ├── D-6 → ...
│ ├── ...
│ └── D-1 → ...
├── 비동기 워밍업 (ApplicationReadyEvent → @Async)
│ → 스케줄러/API/WebSocket 차단 없이 백그라운드 로드
│ → D-1 → D-2 → ... → D-7 (최근 우선)
│ → 각 날짜 로드 완료 즉시 캐시 활성화
├── 하이브리드 쿼리 분리
│ → 요청 범위의 모든 날짜를 캐시 확인
│ → 히트: 메모리 조회, 미스: DB 조회, 오늘: hourly/5min 테이블
│ → 결과 vessel 기준 병합
└── 일일 갱신 (DailyAggregationJob 완료 시)
→ D-1 (재)로드 + D-8 제거
```
### 6.2 캐시 용량 추정
| 항목 | 수치 |
|------|------|
| Daily 테이블 1일분 | ~350MB (DB) |
| 7일분 인메모리 추정 | ~4GB (Java 객체 오버헤드 포함) |
| 최대 메모리 한도 | 5GB (설정 가능) |
| JVM 힙 (권장) | 12GB 이상 |
### 6.3 쿼리 라우팅
```
요청: 2026-01-30 ~ 2026-02-06 15:00
1. 날짜별 분류:
├─ 02-06 (오늘) → DB 필수 (hourly/5min 테이블)
├─ 02-05: cache HIT → 메모리
├─ 02-04: cache HIT → 메모리
├─ ...
├─ 01-31: cache HIT → 메모리
└─ 01-30: cache MISS → DB (daily 테이블)
2. 결과: vessel 기준 병합 → 기존 응답 구조 그대로
```
### 6.4 모니터링
```
GET /api/websocket/daily-cache
{
"status": "READY",
"cachedDays": 7,
"totalVessels": 280000,
"totalMemoryMb": 3500,
"days": [
{"date": "2026-02-05", "vesselCount": 42000, "memorySizeMb": 520, "loadedAt": "..."},
...
]
}
```
---
## 부록 B. 운영 환경 리소스 현황 (Phase 5~6 반영)
| 리소스 | AS-IS | TO-BE | 비고 |
|--------|-------|-------|------|
| JVM Heap | 8~16GB | **12~16GB 권장** | 캐시 ~4GB + 운영 |
| DB Pool (Query) | max 120 | **max 180** | WebSocket + REST |
| DB Pool (Collect) | max 80 | max 80 | 배치 Reader |
| DB Pool (Batch) | max 30 | max 30 | 메타데이터 |
| max-concurrent-global | 30 | **60** | Query풀 180 / 3 |
| trackStreamingExecutor | core 15, max 30 | **core 40, max 120** | 대기열 + 실행 |
| Session idle timeout | 60s | **15s** | 빠른 정리 |
| Heartbeat | 10s/10s | **5s/5s** | 빠른 감지 |
| Daily cache | - | **7일분 ~4GB** | 비동기 워밍업 |