fix: WebSocket OOM 방지 — 버퍼 256MB→2MB + 힙 기반 Admission Control

근본 원인: maxTextMessageBufferSize=256MB가 세션당 Humongous 객체 유발
- 100세션 × 256MB = 25.6GB G1GC 회수 불가 Humongous 리전 누적 → OOM

변경 내역:
- WebSocketStompConfig: 컨테이너 버퍼 256MB→2MB, SockJS stream 100MB→5MB
- WebSocketProperties: sendBuffer 256→50MB, outboundQueue 5000→200, msgLimit 50→2MB
- YAML(prod/prod-mpr/query): websocket.transport 섹션 명시적 추가
- ActiveQueryManager: 힙 사용률 85% 초과 시 쿼리 대기열 전환
- ChunkedTrackStreamingService: 중간 컬렉션 clear()/null 즉시 해제
- GisServiceV2: 캐시 원본 보호 toBuilder().build() + 중간 컬렉션 해제
- StompTrackController: activeSessions COMPLETED/ERROR 시 자동 제거
- AsyncConfig: 스레드풀 core 40→15, max 120→30

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
LHT 2026-02-12 16:37:48 +09:00
부모 fb72be89a1
커밋 28e6887379
10개의 변경된 파일150개의 추가작업 그리고 32개의 파일을 삭제

파일 보기

@ -164,6 +164,9 @@ public class GisServiceV2 {
} finally {
if (slotAcquired) {
activeQueryManager.releaseQuerySlot(queryId);
if (activeQueryManager.isHeapPressureHigh()) {
System.gc();
}
}
}
}
@ -263,6 +266,9 @@ public class GisServiceV2 {
} finally {
if (slotAcquired) {
activeQueryManager.releaseQuerySlot(queryId);
if (activeQueryManager.isHeapPressureHigh()) {
System.gc();
}
}
}
}
@ -300,6 +306,10 @@ public class GisServiceV2 {
} finally {
if (slotAcquired) {
activeQueryManager.releaseQuerySlot(queryId);
// Humongous 영역 조기 회수 (G1GC에서 8MB+ 객체는 Mixed GC에서만 회수)
if (activeQueryManager.isHeapPressureHigh()) {
System.gc();
}
}
}
}
@ -329,14 +339,18 @@ public class GisServiceV2 {
List<CompactVesselTrack> cachedTracks =
dailyTrackCacheManager.getCachedTracksMultipleDays(split.getCachedDates());
// 요청 선박만 필터링
// 요청 선박만 필터링 + 방어적 복사 (캐시 원본 보호: simplify가 in-place 수정하므로)
int totalCachedCount = cachedTracks.size();
List<CompactVesselTrack> filteredCached = cachedTracks.stream()
.filter(t -> requestedVesselKeys.contains(t.getSigSrcCd() + "_" + t.getTargetId()))
.map(t -> t.toBuilder().build())
.collect(Collectors.toList());
cachedTracks.clear(); // 메모리 즉시 해제: 캐시 참조 리스트
allTracks.addAll(filteredCached);
log.debug("[CacheQuery] cached {} days -> {} tracks (filtered from {})",
split.getCachedDates().size(), filteredCached.size(), cachedTracks.size());
split.getCachedDates().size(), filteredCached.size(), totalCachedCount);
}
// 2. DB에서 조회 (캐시 미적중 과거 날짜)
@ -372,9 +386,11 @@ public class GisServiceV2 {
// 4. 동일 선박 병합 (캐시 + DB 결과)
List<CompactVesselTrack> merged = mergeTracksByVessel(allTracks);
allTracks.clear(); // 메모리 즉시 해제: 병합 완료 원본 리스트
// 5. 통합선박 필터링
if ("1".equals(request.getIsIntegration()) && integrationVesselService.isEnabled()) {
// 5. 통합선박 필터링 (isIntegration이 null이거나 "1"이면 적용, "0" 미적용)
String isInteg = request.getIsIntegration();
if (!"0".equals(isInteg) && integrationVesselService.isEnabled()) {
merged = filterByIntegration(merged);
}

파일 보기

@ -20,9 +20,9 @@ public class AsyncConfig implements AsyncConfigurer {
@Bean(name = "trackStreamingExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(40);
executor.setMaxPoolSize(120);
executor.setQueueCapacity(100);
executor.setCorePoolSize(15);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(50);
executor.setKeepAliveSeconds(30);
executor.setThreadNamePrefix("track-stream-");
// executor.setTaskDecorator(new MdcTaskDecorator());

파일 보기

@ -35,10 +35,10 @@ public class WebSocketProperties {
private int inboundQueueCapacity = 100;
private int outboundCorePoolSize = 20;
private int outboundMaxPoolSize = 40;
private int outboundQueueCapacity = 5000;
private int messageSizeLimitMb = 50;
private int sendBufferSizeLimitMb = 256;
private int sendTimeLimitSeconds = 120;
private int outboundQueueCapacity = 200;
private int messageSizeLimitMb = 2;
private int sendBufferSizeLimitMb = 50;
private int sendTimeLimitSeconds = 30;
}
@Data

파일 보기

@ -42,8 +42,9 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(256 * 1024 * 1024); // 256MB로 증가
container.setMaxBinaryMessageBufferSize(256 * 1024 * 1024); // 256MB로 증가
// 청크 최대 1024KB 버퍼 2MB면 충분. 256MB는 세션당 Humongous 객체 유발 OOM 근본 원인
container.setMaxTextMessageBufferSize(2 * 1024 * 1024); // 256MB 2MB
container.setMaxBinaryMessageBufferSize(2 * 1024 * 1024); // 256MB 2MB
container.setMaxSessionIdleTimeout(webSocketProperties.getSession().getIdleTimeoutMs());
return container;
}
@ -63,7 +64,7 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
.setAllowedOriginPatterns("*")
.withSockJS()
.setClientLibraryUrl("/static/libs/js/sockjs.min.js")
.setStreamBytesLimit(100 * 1024 * 1024) // 100MB 증가
.setStreamBytesLimit(5 * 1024 * 1024) // 100MB 5MB (SockJS 폴링 버퍼)
.setHttpMessageCacheSize(1000)
.setDisconnectDelay(webSocketProperties.getSession().getSockjsDisconnectDelayMs())
.setWebSocketEnabled(true)

파일 보기

@ -63,26 +63,30 @@ public class StompTrackController {
headerAccessor.setHeader("queryId", queryId);
headerAccessor.setLeaveMutable(true);
// 상태 콜백: 쿼리 완료/에러 activeSessions에서 제거
java.util.function.Consumer<QueryStatusUpdate> statusCallback = status -> {
sendStatusToUser(userId, status);
if ("COMPLETED".equals(status.getStatus()) || "ERROR".equals(status.getStatus())) {
activeSessions.remove(sessionId);
}
};
// 비동기 스트리밍 시작 - 청크 모드 체크
if (request.isChunkedMode()) {
// 새로운 청크 스트리밍 모드
chunkedTrackStreamingService.streamChunkedTracks(
request,
queryId,
sessionId, // sessionId 전달 (연결 끊김 감지용)
sessionId,
chunk -> sendChunkedDataToUser(userId, chunk),
// 리소스 정리는 서비스 finally 블록에서 일괄 처리
status -> sendStatusToUser(userId, status)
statusCallback
);
} else {
// 기존 스트리밍 모드
trackStreamingService.streamTracks(
request,
queryId,
sessionId,
chunk -> sendChunkToUser(userId, chunk),
// 리소스 정리는 서비스 finally 블록에서 일괄 처리
status -> sendStatusToUser(userId, status)
statusCallback
);
}

파일 보기

@ -35,6 +35,9 @@ public class ActiveQueryManager {
@Value("${websocket.query.queue-timeout-seconds:30}")
private int queueTimeoutSeconds;
@Value("${websocket.memory.heap-reject-threshold:0.85}")
private double heapRejectThreshold;
@PostConstruct
public void init() {
this.globalQuerySemaphore = new Semaphore(maxConcurrentGlobal, true);
@ -155,13 +158,44 @@ public class ActiveQueryManager {
return activeQueries.size();
}
// 메모리 체크
private volatile long lastHeapWarningTime = 0;
/**
* 현재 사용률이 임계값을 초과하는지 확인
* 외부(ChunkedTrackStreamingService )에서도 호출 가능
*/
public boolean isHeapPressureHigh() {
Runtime rt = Runtime.getRuntime();
long used = rt.totalMemory() - rt.freeMemory();
long max = rt.maxMemory();
double usage = (double) used / max;
if (usage > heapRejectThreshold) {
long now = System.currentTimeMillis();
if (now - lastHeapWarningTime > 5000) { // 5초마다만 로그
lastHeapWarningTime = now;
log.warn("[MEMORY] Heap pressure high: {}% ({} MB / {} MB)",
String.format("%.1f", usage * 100),
used / (1024 * 1024), max / (1024 * 1024));
}
return true;
}
return false;
}
// 글로벌 동시 쿼리 제한
/**
* 글로벌 쿼리 슬롯 획득 (즉시 시도, 대기 없음)
* 메모리 압박 슬롯 획득을 거부하여 대기열로 전환
* @return true: 슬롯 획득 성공, false: 슬롯 없음 (대기열 진입 필요)
*/
public boolean tryAcquireQuerySlotImmediate(String queryId) {
if (isHeapPressureHigh()) {
log.warn("Query {} deferred to queue: heap memory pressure", queryId);
return false;
}
boolean acquired = globalQuerySemaphore.tryAcquire();
if (acquired) {
log.info("Query {} acquired global slot immediately: active={}/{}",

파일 보기

@ -967,9 +967,24 @@ public class ChunkedTrackStreamingService {
long deadline = System.currentTimeMillis() + (maxWaitSeconds * 1000L);
activeQueryManager.getWaitingQueue().offer(queryId);
try {
int heapRetryCount = 0;
while (System.currentTimeMillis() < deadline) {
slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId);
if (slotAcquired) break;
if (slotAcquired) {
if (activeQueryManager.isHeapPressureHigh()) {
activeQueryManager.releaseQuerySlot(queryId);
slotAcquired = false;
heapRetryCount++;
if (heapRetryCount <= 3) {
log.warn("Query {} slot acquired but heap pressure high, waiting 3s before retry (attempt {})",
queryId, heapRetryCount);
}
Thread.sleep(3000); // GC 회복 대기 (busy-spin 방지)
continue;
} else {
break;
}
}
// QUEUED 상태 전송
int position = activeQueryManager.getQueuePosition(queryId);
int totalInQueue = activeQueryManager.getQueueSize();
@ -1273,7 +1288,10 @@ public class ChunkedTrackStreamingService {
Math.round(timeProgress));
}
}
batches.clear(); // 메모리 즉시 해제: 배치 분할 리스트
mergedTracks.clear(); // 메모리 즉시 해제: 병합 항적 리스트
}
mergedMap.clear(); // 메모리 즉시 해제: 선박 누적
}
}
}
@ -1346,7 +1364,8 @@ public class ChunkedTrackStreamingService {
0.0
));
} finally {
// 리소스 반환 (순서: 취소 플래그 글로벌 슬롯 세션 카운트 메트릭스)
// 리소스 반환 (순서: 데이터 정리 취소 플래그 글로벌 슬롯 세션 카운트 메트릭스)
processedTimeRanges.clear(); // 메모리 즉시 해제: 처리 시간 범위
queryCancelFlags.remove(queryId);
if (slotAcquired) {
activeQueryManager.releaseQuerySlot(queryId);
@ -1354,6 +1373,15 @@ public class ChunkedTrackStreamingService {
trackQueryInterceptor.releaseQuery(sessionId);
activeQueryManager.completeQuery(sessionId);
cleanupQueryMetrics(queryId);
// Humongous 영역 조기 회수: G1GC에서 8MB+ 객체는 Young GC로 회수 불가
// Concurrent Mark Mixed GC 사이클이 필요하므로, 대규모 해제 GC 힌트 제공
// -XX:+ExplicitGCInvokesConcurrent 플래그와 함께 사용 STW 없이 concurrent GC 트리거
if (activeQueryManager.isHeapPressureHigh()) {
log.info("Query {} triggering concurrent GC for Humongous reclaim (session={})", queryId, sessionId);
System.gc();
}
log.info("Query {} resources fully released (session={})", queryId, sessionId);
}
}
@ -2533,6 +2561,8 @@ public class ChunkedTrackStreamingService {
.mapToInt(t -> t.getGeometry() != null ? t.getGeometry().size() : 0).sum();
List<List<CompactVesselTrack>> preBatches = splitByMessageSize(cachedTracks);
int preBatchCount = preBatches.size();
preBatches.clear();
preBatches = null;
// 방어적 복사 간소화 (캐시 원본 보호)
long simplifyStart = System.currentTimeMillis();
@ -2548,16 +2578,21 @@ public class ChunkedTrackStreamingService {
// 메시지 크기로 분할하여 전송
List<List<CompactVesselTrack>> batches = splitByMessageSize(cachedTracks);
int postBatchCount = batches.size();
for (List<CompactVesselTrack> batch : batches) {
sendChunkResponse(batch, queryId, chunkConsumer, uniqueVesselIds);
}
batches.clear();
batches = null;
cachedTracks.clear();
cachedTracks = null;
log.info("[CacheHIT] date={}, zoom={}, tracks={}, points: {} -> {} ({}% 감소), batches: {} -> {} ({}% 감소), simplify: {}ms",
rangeDate, request.getZoomLevel(), preTrackCount,
prePointCount, postPointCount,
prePointCount > 0 ? Math.round((1 - (double) postPointCount / prePointCount) * 100) : 0,
preBatchCount, batches.size(),
preBatchCount > 0 ? Math.round((1 - (double) batches.size() / preBatchCount) * 100) : 0,
preBatchCount, postBatchCount,
preBatchCount > 0 ? Math.round((1 - (double) postBatchCount / preBatchCount) * 100) : 0,
simplifyElapsed);
// [BENCHMARK] 벤치마크 누적 (캐시 경로)
@ -2583,6 +2618,7 @@ public class ChunkedTrackStreamingService {
}
log.info("Session vessel cache final size: {} vessels cached", sessionVesselCache.size());
sessionVesselCache.clear();
}
/**
@ -2825,6 +2861,8 @@ public class ChunkedTrackStreamingService {
log.info("Daily page {} data processed in {}ms", pageNum + 1, processEndTime - processStartTime);
totalTrackCount += trackDataList.size();
trackDataList.clear(); // 메모리 즉시 해제: ResultSet 임시 데이터
vesselIdsInPage.clear(); // 메모리 즉시 해제: 페이지 선박 ID 집합
// 페이지 데이터를 즉시 청크로 전송
if (!pageVesselMap.isEmpty()) {
@ -2891,12 +2929,17 @@ public class ChunkedTrackStreamingService {
}
}
int pageVesselCount = pageTracks.size();
batches.clear(); // 메모리 즉시 해제: 배치 분할 리스트
pageTracks.clear(); // 메모리 즉시 해제: 페이지 항적 리스트
log.info("Daily streaming page {} sent: {} tracks, {} vessels (total: {} tracks, {} vessels)",
pageNum + 1, pageTrackCount, pageTracks.size(), totalTrackCount, totalVesselsSent);
pageNum + 1, pageTrackCount, pageVesselCount, totalTrackCount, totalVesselsSent);
}
// 페이지가 완전히 채워지지 않았으면 마지막 페이지
if (pageTrackCount < DAILY_PAGE_SIZE) {
pageVesselMap.clear(); // 메모리 즉시 해제: 마지막 페이지 선박
log.info("Daily streaming pagination completed: {} pages, {} total tracks, {} total vessels sent",
pageNum + 1, totalTrackCount, totalVesselsSent);
break;
@ -2906,6 +2949,7 @@ public class ChunkedTrackStreamingService {
lastSigSrcCd = currentSigSrcCd;
lastTargetId = currentTargetId;
pageNum++;
pageVesselMap.clear(); // 메모리 즉시 해제: 페이지 선박 누적
}
} catch (SQLException e) {
log.error("Error in daily streaming pagination page {}: {}", pageNum, e.getMessage());

파일 보기

@ -273,9 +273,16 @@ cache:
# WebSocket 부하 제어 설정
websocket:
query:
max-concurrent-global: 40 # 배치 서버이므로 동시 쿼리 제한 (prod 60 대비 보수적)
max-per-session: 15 # 세션당 동시 쿼리 상한
queue-timeout-seconds: 30 # 글로벌 대기 큐 타임아웃
max-concurrent-global: 20 # 배치 서버 동시 쿼리 제한 (메모리 보호: 40→20)
max-per-session: 10 # 세션당 동시 쿼리 상한 (15→10)
queue-timeout-seconds: 60 # 글로벌 대기 큐 타임아웃 (슬롯 감소 보완: 30→60)
transport:
message-size-limit-mb: 2 # 단일 STOMP 메시지 상한 (50→2MB, 청크 1024KB 기준)
send-buffer-size-limit-mb: 50 # 세션당 송신 버퍼 상한 (사전 할당 아님, 최악 20×50MB=1GB)
outbound-queue-capacity: 200 # 아웃바운드 메시지 큐 (5000→200)
send-time-limit-seconds: 30 # 메시지 전송 시간 제한
memory:
heap-reject-threshold: 0.85 # 힙 사용률 85% 초과 시 새 쿼리 대기열 전환 (기본 20~24GB → 정상시 50% 미만)
session:
idle-timeout-ms: 15000
server-heartbeat-ms: 5000

파일 보기

@ -278,9 +278,16 @@ cache:
# WebSocket 부하 제어 설정
websocket:
query:
max-concurrent-global: 60 # 서버 전체 동시 실행 쿼리 상한 (Query풀 180 / 쿼리당 ~3커넥션)
max-per-session: 20 # 세션당 동시 쿼리 상한 (대기열 방식이므로 넉넉하게)
queue-timeout-seconds: 30 # 글로벌 대기 큐 타임아웃
max-concurrent-global: 30 # 서버 전체 동시 실행 쿼리 상한 (메모리 보호: 60→30)
max-per-session: 10 # 세션당 동시 쿼리 상한 (20→10)
queue-timeout-seconds: 60 # 글로벌 대기 큐 타임아웃 (슬롯 감소 보완: 30→60)
transport:
message-size-limit-mb: 2 # 단일 STOMP 메시지 상한 (50→2MB, 청크 1024KB 기준)
send-buffer-size-limit-mb: 50 # 세션당 송신 버퍼 상한 (사전 할당 아님, 최악 30×50MB=1.5GB)
outbound-queue-capacity: 200 # 아웃바운드 메시지 큐 (5000→200)
send-time-limit-seconds: 30 # 메시지 전송 시간 제한
memory:
heap-reject-threshold: 0.85 # 힙 사용률 85% 초과 시 새 쿼리 대기열 전환 (기본 20~24GB → 정상시 50% 미만)
session:
idle-timeout-ms: 15000 # 세션 유휴 타임아웃 15초 (60s → 15s)
server-heartbeat-ms: 5000 # 서버 하트비트 5초 (10s → 5s)

파일 보기

@ -103,6 +103,11 @@ websocket:
max-concurrent-global: 40 # 조회 전용 서버 (배치 없으므로 prod-mpr보다 여유)
max-per-session: 15 # 세션당 동시 쿼리 상한
queue-timeout-seconds: 30 # 글로벌 대기 큐 타임아웃
transport:
message-size-limit-mb: 2 # 단일 STOMP 메시지 상한 (청크 1024KB 기준)
send-buffer-size-limit-mb: 50 # 세션당 송신 버퍼 상한 (사전 할당 아님, 최악 40×50MB=2GB)
outbound-queue-capacity: 200 # 아웃바운드 메시지 큐
send-time-limit-seconds: 30 # 메시지 전송 시간 제한
session:
idle-timeout-ms: 15000
server-heartbeat-ms: 5000