From 28e6887379706537f31a2cef3576a5dd82cd39b3 Mon Sep 17 00:00:00 2001 From: LHT Date: Thu, 12 Feb 2026 16:37:48 +0900 Subject: [PATCH] =?UTF-8?q?fix:=20WebSocket=20OOM=20=EB=B0=A9=EC=A7=80=20?= =?UTF-8?q?=E2=80=94=20=EB=B2=84=ED=8D=BC=20256MB=E2=86=922MB=20+=20?= =?UTF-8?q?=ED=9E=99=20=EA=B8=B0=EB=B0=98=20Admission=20Control?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 근본 원인: maxTextMessageBufferSize=256MB가 세션당 Humongous 객체 유발 - 100세션 × 256MB = 25.6GB G1GC 회수 불가 Humongous 리전 누적 → OOM 변경 내역: - WebSocketStompConfig: 컨테이너 버퍼 256MB→2MB, SockJS stream 100MB→5MB - WebSocketProperties: sendBuffer 256→50MB, outboundQueue 5000→200, msgLimit 50→2MB - YAML(prod/prod-mpr/query): websocket.transport 섹션 명시적 추가 - ActiveQueryManager: 힙 사용률 85% 초과 시 쿼리 대기열 전환 - ChunkedTrackStreamingService: 중간 컬렉션 clear()/null 즉시 해제 - GisServiceV2: 캐시 원본 보호 toBuilder().build() + 중간 컬렉션 해제 - StompTrackController: activeSessions COMPLETED/ERROR 시 자동 제거 - AsyncConfig: 스레드풀 core 40→15, max 120→30 Co-Authored-By: Claude Opus 4.6 --- .../domain/gis/service/GisServiceV2.java | 24 +++++++-- .../global/config/AsyncConfig.java | 6 +-- .../global/config/WebSocketProperties.java | 8 +-- .../global/config/WebSocketStompConfig.java | 7 +-- .../controller/StompTrackController.java | 18 ++++--- .../websocket/service/ActiveQueryManager.java | 34 ++++++++++++ .../service/ChunkedTrackStreamingService.java | 54 +++++++++++++++++-- src/main/resources/application-prod-mpr.yml | 13 +++-- src/main/resources/application-prod.yml | 13 +++-- src/main/resources/application-query.yml | 5 ++ 10 files changed, 150 insertions(+), 32 deletions(-) diff --git a/src/main/java/gc/mda/signal_batch/domain/gis/service/GisServiceV2.java b/src/main/java/gc/mda/signal_batch/domain/gis/service/GisServiceV2.java index 3f86820..8f3f55f 100644 --- a/src/main/java/gc/mda/signal_batch/domain/gis/service/GisServiceV2.java +++ b/src/main/java/gc/mda/signal_batch/domain/gis/service/GisServiceV2.java @@ -164,6 +164,9 @@ public class GisServiceV2 { } finally { if (slotAcquired) { activeQueryManager.releaseQuerySlot(queryId); + if (activeQueryManager.isHeapPressureHigh()) { + System.gc(); + } } } } @@ -263,6 +266,9 @@ public class GisServiceV2 { } finally { if (slotAcquired) { activeQueryManager.releaseQuerySlot(queryId); + if (activeQueryManager.isHeapPressureHigh()) { + System.gc(); + } } } } @@ -300,6 +306,10 @@ public class GisServiceV2 { } finally { if (slotAcquired) { activeQueryManager.releaseQuerySlot(queryId); + // Humongous 영역 조기 회수 (G1GC에서 8MB+ 객체는 Mixed GC에서만 회수) + if (activeQueryManager.isHeapPressureHigh()) { + System.gc(); + } } } } @@ -329,14 +339,18 @@ public class GisServiceV2 { List cachedTracks = dailyTrackCacheManager.getCachedTracksMultipleDays(split.getCachedDates()); - // 요청 선박만 필터링 + // 요청 선박만 필터링 + 방어적 복사 (캐시 원본 보호: simplify가 in-place 수정하므로) + int totalCachedCount = cachedTracks.size(); List filteredCached = cachedTracks.stream() .filter(t -> requestedVesselKeys.contains(t.getSigSrcCd() + "_" + t.getTargetId())) + .map(t -> t.toBuilder().build()) .collect(Collectors.toList()); + cachedTracks.clear(); // 메모리 즉시 해제: 캐시 참조 리스트 + allTracks.addAll(filteredCached); log.debug("[CacheQuery] cached {} days -> {} tracks (filtered from {})", - split.getCachedDates().size(), filteredCached.size(), cachedTracks.size()); + split.getCachedDates().size(), filteredCached.size(), totalCachedCount); } // 2. DB에서 조회 (캐시 미적중 과거 날짜) @@ -372,9 +386,11 @@ public class GisServiceV2 { // 4. 동일 선박 병합 (캐시 + DB 결과) List merged = mergeTracksByVessel(allTracks); + allTracks.clear(); // 메모리 즉시 해제: 병합 완료 후 원본 리스트 - // 5. 통합선박 필터링 - if ("1".equals(request.getIsIntegration()) && integrationVesselService.isEnabled()) { + // 5. 통합선박 필터링 (isIntegration이 null이거나 "1"이면 적용, "0"만 미적용) + String isInteg = request.getIsIntegration(); + if (!"0".equals(isInteg) && integrationVesselService.isEnabled()) { merged = filterByIntegration(merged); } diff --git a/src/main/java/gc/mda/signal_batch/global/config/AsyncConfig.java b/src/main/java/gc/mda/signal_batch/global/config/AsyncConfig.java index 2e54ae7..fe64c43 100644 --- a/src/main/java/gc/mda/signal_batch/global/config/AsyncConfig.java +++ b/src/main/java/gc/mda/signal_batch/global/config/AsyncConfig.java @@ -20,9 +20,9 @@ public class AsyncConfig implements AsyncConfigurer { @Bean(name = "trackStreamingExecutor") public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(40); - executor.setMaxPoolSize(120); - executor.setQueueCapacity(100); + executor.setCorePoolSize(15); + executor.setMaxPoolSize(30); + executor.setQueueCapacity(50); executor.setKeepAliveSeconds(30); executor.setThreadNamePrefix("track-stream-"); // executor.setTaskDecorator(new MdcTaskDecorator()); diff --git a/src/main/java/gc/mda/signal_batch/global/config/WebSocketProperties.java b/src/main/java/gc/mda/signal_batch/global/config/WebSocketProperties.java index f0adadf..f813180 100644 --- a/src/main/java/gc/mda/signal_batch/global/config/WebSocketProperties.java +++ b/src/main/java/gc/mda/signal_batch/global/config/WebSocketProperties.java @@ -35,10 +35,10 @@ public class WebSocketProperties { private int inboundQueueCapacity = 100; private int outboundCorePoolSize = 20; private int outboundMaxPoolSize = 40; - private int outboundQueueCapacity = 5000; - private int messageSizeLimitMb = 50; - private int sendBufferSizeLimitMb = 256; - private int sendTimeLimitSeconds = 120; + private int outboundQueueCapacity = 200; + private int messageSizeLimitMb = 2; + private int sendBufferSizeLimitMb = 50; + private int sendTimeLimitSeconds = 30; } @Data diff --git a/src/main/java/gc/mda/signal_batch/global/config/WebSocketStompConfig.java b/src/main/java/gc/mda/signal_batch/global/config/WebSocketStompConfig.java index a55712f..7132f61 100644 --- a/src/main/java/gc/mda/signal_batch/global/config/WebSocketStompConfig.java +++ b/src/main/java/gc/mda/signal_batch/global/config/WebSocketStompConfig.java @@ -42,8 +42,9 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer { @Bean public ServletServerContainerFactoryBean createWebSocketContainer() { ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean(); - container.setMaxTextMessageBufferSize(256 * 1024 * 1024); // 256MB로 증가 - container.setMaxBinaryMessageBufferSize(256 * 1024 * 1024); // 256MB로 증가 + // 청크 최대 1024KB → 버퍼 2MB면 충분. 256MB는 세션당 Humongous 객체 유발 → OOM 근본 원인 + container.setMaxTextMessageBufferSize(2 * 1024 * 1024); // 256MB → 2MB + container.setMaxBinaryMessageBufferSize(2 * 1024 * 1024); // 256MB → 2MB container.setMaxSessionIdleTimeout(webSocketProperties.getSession().getIdleTimeoutMs()); return container; } @@ -63,7 +64,7 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer { .setAllowedOriginPatterns("*") .withSockJS() .setClientLibraryUrl("/static/libs/js/sockjs.min.js") - .setStreamBytesLimit(100 * 1024 * 1024) // 100MB로 증가 + .setStreamBytesLimit(5 * 1024 * 1024) // 100MB → 5MB (SockJS 폴링 버퍼) .setHttpMessageCacheSize(1000) .setDisconnectDelay(webSocketProperties.getSession().getSockjsDisconnectDelayMs()) .setWebSocketEnabled(true) diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/controller/StompTrackController.java b/src/main/java/gc/mda/signal_batch/global/websocket/controller/StompTrackController.java index 46c3d8c..05ccc55 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/controller/StompTrackController.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/controller/StompTrackController.java @@ -63,26 +63,30 @@ public class StompTrackController { headerAccessor.setHeader("queryId", queryId); headerAccessor.setLeaveMutable(true); + // 상태 콜백: 쿼리 완료/에러 시 activeSessions에서 제거 + java.util.function.Consumer statusCallback = status -> { + sendStatusToUser(userId, status); + if ("COMPLETED".equals(status.getStatus()) || "ERROR".equals(status.getStatus())) { + activeSessions.remove(sessionId); + } + }; + // 비동기 스트리밍 시작 - 청크 모드 체크 if (request.isChunkedMode()) { - // 새로운 청크 스트리밍 모드 chunkedTrackStreamingService.streamChunkedTracks( request, queryId, - sessionId, // sessionId 전달 (연결 끊김 감지용) + sessionId, chunk -> sendChunkedDataToUser(userId, chunk), - // 리소스 정리는 서비스 finally 블록에서 일괄 처리 - status -> sendStatusToUser(userId, status) + statusCallback ); } else { - // 기존 스트리밍 모드 trackStreamingService.streamTracks( request, queryId, sessionId, chunk -> sendChunkToUser(userId, chunk), - // 리소스 정리는 서비스 finally 블록에서 일괄 처리 - status -> sendStatusToUser(userId, status) + statusCallback ); } diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java index e9b2773..6859cc8 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java @@ -35,6 +35,9 @@ public class ActiveQueryManager { @Value("${websocket.query.queue-timeout-seconds:30}") private int queueTimeoutSeconds; + @Value("${websocket.memory.heap-reject-threshold:0.85}") + private double heapRejectThreshold; + @PostConstruct public void init() { this.globalQuerySemaphore = new Semaphore(maxConcurrentGlobal, true); @@ -155,13 +158,44 @@ public class ActiveQueryManager { return activeQueries.size(); } + // ── 힙 메모리 체크 ── + + private volatile long lastHeapWarningTime = 0; + + /** + * 현재 힙 사용률이 임계값을 초과하는지 확인 + * 외부(ChunkedTrackStreamingService 등)에서도 호출 가능 + */ + public boolean isHeapPressureHigh() { + Runtime rt = Runtime.getRuntime(); + long used = rt.totalMemory() - rt.freeMemory(); + long max = rt.maxMemory(); + double usage = (double) used / max; + if (usage > heapRejectThreshold) { + long now = System.currentTimeMillis(); + if (now - lastHeapWarningTime > 5000) { // 5초마다만 로그 + lastHeapWarningTime = now; + log.warn("[MEMORY] Heap pressure high: {}% ({} MB / {} MB)", + String.format("%.1f", usage * 100), + used / (1024 * 1024), max / (1024 * 1024)); + } + return true; + } + return false; + } + // ── 글로벌 동시 쿼리 제한 ── /** * 글로벌 쿼리 슬롯 획득 (즉시 시도, 대기 없음) + * 힙 메모리 압박 시 슬롯 획득을 거부하여 대기열로 전환 * @return true: 슬롯 획득 성공, false: 슬롯 없음 (대기열 진입 필요) */ public boolean tryAcquireQuerySlotImmediate(String queryId) { + if (isHeapPressureHigh()) { + log.warn("Query {} deferred to queue: heap memory pressure", queryId); + return false; + } boolean acquired = globalQuerySemaphore.tryAcquire(); if (acquired) { log.info("Query {} acquired global slot immediately: active={}/{}", diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java index 27203d2..c8924d6 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java @@ -967,9 +967,24 @@ public class ChunkedTrackStreamingService { long deadline = System.currentTimeMillis() + (maxWaitSeconds * 1000L); activeQueryManager.getWaitingQueue().offer(queryId); try { + int heapRetryCount = 0; while (System.currentTimeMillis() < deadline) { slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId); - if (slotAcquired) break; + if (slotAcquired) { + if (activeQueryManager.isHeapPressureHigh()) { + activeQueryManager.releaseQuerySlot(queryId); + slotAcquired = false; + heapRetryCount++; + if (heapRetryCount <= 3) { + log.warn("Query {} slot acquired but heap pressure high, waiting 3s before retry (attempt {})", + queryId, heapRetryCount); + } + Thread.sleep(3000); // GC 회복 대기 (busy-spin 방지) + continue; + } else { + break; + } + } // QUEUED 상태 전송 int position = activeQueryManager.getQueuePosition(queryId); int totalInQueue = activeQueryManager.getQueueSize(); @@ -1273,7 +1288,10 @@ public class ChunkedTrackStreamingService { Math.round(timeProgress)); } } + batches.clear(); // 메모리 즉시 해제: 배치 분할 리스트 + mergedTracks.clear(); // 메모리 즉시 해제: 병합 항적 리스트 } + mergedMap.clear(); // 메모리 즉시 해제: 선박 누적 맵 } } } @@ -1346,7 +1364,8 @@ public class ChunkedTrackStreamingService { 0.0 )); } finally { - // 리소스 반환 (순서: 취소 플래그 → 글로벌 슬롯 → 세션 카운트 → 메트릭스) + // 리소스 반환 (순서: 데이터 정리 → 취소 플래그 → 글로벌 슬롯 → 세션 카운트 → 메트릭스) + processedTimeRanges.clear(); // 메모리 즉시 해제: 처리 시간 범위 맵 queryCancelFlags.remove(queryId); if (slotAcquired) { activeQueryManager.releaseQuerySlot(queryId); @@ -1354,6 +1373,15 @@ public class ChunkedTrackStreamingService { trackQueryInterceptor.releaseQuery(sessionId); activeQueryManager.completeQuery(sessionId); cleanupQueryMetrics(queryId); + + // Humongous 영역 조기 회수: G1GC에서 8MB+ 객체는 Young GC로 회수 불가 + // Concurrent Mark → Mixed GC 사이클이 필요하므로, 대규모 해제 후 GC 힌트 제공 + // -XX:+ExplicitGCInvokesConcurrent 플래그와 함께 사용 시 STW 없이 concurrent GC 트리거 + if (activeQueryManager.isHeapPressureHigh()) { + log.info("Query {} triggering concurrent GC for Humongous reclaim (session={})", queryId, sessionId); + System.gc(); + } + log.info("Query {} resources fully released (session={})", queryId, sessionId); } } @@ -2533,6 +2561,8 @@ public class ChunkedTrackStreamingService { .mapToInt(t -> t.getGeometry() != null ? t.getGeometry().size() : 0).sum(); List> preBatches = splitByMessageSize(cachedTracks); int preBatchCount = preBatches.size(); + preBatches.clear(); + preBatches = null; // 방어적 복사 후 간소화 (캐시 원본 보호) long simplifyStart = System.currentTimeMillis(); @@ -2548,16 +2578,21 @@ public class ChunkedTrackStreamingService { // 메시지 크기로 분할하여 전송 List> batches = splitByMessageSize(cachedTracks); + int postBatchCount = batches.size(); for (List batch : batches) { sendChunkResponse(batch, queryId, chunkConsumer, uniqueVesselIds); } + batches.clear(); + batches = null; + cachedTracks.clear(); + cachedTracks = null; log.info("[CacheHIT] date={}, zoom={}, tracks={}, points: {} -> {} ({}% 감소), batches: {} -> {} ({}% 감소), simplify: {}ms", rangeDate, request.getZoomLevel(), preTrackCount, prePointCount, postPointCount, prePointCount > 0 ? Math.round((1 - (double) postPointCount / prePointCount) * 100) : 0, - preBatchCount, batches.size(), - preBatchCount > 0 ? Math.round((1 - (double) batches.size() / preBatchCount) * 100) : 0, + preBatchCount, postBatchCount, + preBatchCount > 0 ? Math.round((1 - (double) postBatchCount / preBatchCount) * 100) : 0, simplifyElapsed); // [BENCHMARK] 벤치마크 누적 (캐시 경로) @@ -2583,6 +2618,7 @@ public class ChunkedTrackStreamingService { } log.info("Session vessel cache final size: {} vessels cached", sessionVesselCache.size()); + sessionVesselCache.clear(); } /** @@ -2825,6 +2861,8 @@ public class ChunkedTrackStreamingService { log.info("Daily page {} data processed in {}ms", pageNum + 1, processEndTime - processStartTime); totalTrackCount += trackDataList.size(); + trackDataList.clear(); // 메모리 즉시 해제: ResultSet 임시 데이터 + vesselIdsInPage.clear(); // 메모리 즉시 해제: 페이지 선박 ID 집합 // 페이지 데이터를 즉시 청크로 전송 if (!pageVesselMap.isEmpty()) { @@ -2891,12 +2929,17 @@ public class ChunkedTrackStreamingService { } } + int pageVesselCount = pageTracks.size(); + batches.clear(); // 메모리 즉시 해제: 배치 분할 리스트 + pageTracks.clear(); // 메모리 즉시 해제: 페이지 항적 리스트 + log.info("Daily streaming page {} sent: {} tracks, {} vessels (total: {} tracks, {} vessels)", - pageNum + 1, pageTrackCount, pageTracks.size(), totalTrackCount, totalVesselsSent); + pageNum + 1, pageTrackCount, pageVesselCount, totalTrackCount, totalVesselsSent); } // 페이지가 완전히 채워지지 않았으면 마지막 페이지 if (pageTrackCount < DAILY_PAGE_SIZE) { + pageVesselMap.clear(); // 메모리 즉시 해제: 마지막 페이지 선박 맵 log.info("Daily streaming pagination completed: {} pages, {} total tracks, {} total vessels sent", pageNum + 1, totalTrackCount, totalVesselsSent); break; @@ -2906,6 +2949,7 @@ public class ChunkedTrackStreamingService { lastSigSrcCd = currentSigSrcCd; lastTargetId = currentTargetId; pageNum++; + pageVesselMap.clear(); // 메모리 즉시 해제: 페이지 선박 누적 맵 } } catch (SQLException e) { log.error("Error in daily streaming pagination page {}: {}", pageNum, e.getMessage()); diff --git a/src/main/resources/application-prod-mpr.yml b/src/main/resources/application-prod-mpr.yml index e35ba5c..52d2931 100644 --- a/src/main/resources/application-prod-mpr.yml +++ b/src/main/resources/application-prod-mpr.yml @@ -273,9 +273,16 @@ cache: # WebSocket 부하 제어 설정 websocket: query: - max-concurrent-global: 40 # 배치 서버이므로 동시 쿼리 제한 (prod 60 대비 보수적) - max-per-session: 15 # 세션당 동시 쿼리 상한 - queue-timeout-seconds: 30 # 글로벌 대기 큐 타임아웃 + max-concurrent-global: 20 # 배치 서버 동시 쿼리 제한 (메모리 보호: 40→20) + max-per-session: 10 # 세션당 동시 쿼리 상한 (15→10) + queue-timeout-seconds: 60 # 글로벌 대기 큐 타임아웃 (슬롯 감소 보완: 30→60) + transport: + message-size-limit-mb: 2 # 단일 STOMP 메시지 상한 (50→2MB, 청크 1024KB 기준) + send-buffer-size-limit-mb: 50 # 세션당 송신 버퍼 상한 (사전 할당 아님, 최악 20×50MB=1GB) + outbound-queue-capacity: 200 # 아웃바운드 메시지 큐 (5000→200) + send-time-limit-seconds: 30 # 메시지 전송 시간 제한 + memory: + heap-reject-threshold: 0.85 # 힙 사용률 85% 초과 시 새 쿼리 대기열 전환 (기본 20~24GB → 정상시 50% 미만) session: idle-timeout-ms: 15000 server-heartbeat-ms: 5000 diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index 92ddbb1..c81dc1e 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -278,9 +278,16 @@ cache: # WebSocket 부하 제어 설정 websocket: query: - max-concurrent-global: 60 # 서버 전체 동시 실행 쿼리 상한 (Query풀 180 / 쿼리당 ~3커넥션) - max-per-session: 20 # 세션당 동시 쿼리 상한 (대기열 방식이므로 넉넉하게) - queue-timeout-seconds: 30 # 글로벌 대기 큐 타임아웃 + max-concurrent-global: 30 # 서버 전체 동시 실행 쿼리 상한 (메모리 보호: 60→30) + max-per-session: 10 # 세션당 동시 쿼리 상한 (20→10) + queue-timeout-seconds: 60 # 글로벌 대기 큐 타임아웃 (슬롯 감소 보완: 30→60) + transport: + message-size-limit-mb: 2 # 단일 STOMP 메시지 상한 (50→2MB, 청크 1024KB 기준) + send-buffer-size-limit-mb: 50 # 세션당 송신 버퍼 상한 (사전 할당 아님, 최악 30×50MB=1.5GB) + outbound-queue-capacity: 200 # 아웃바운드 메시지 큐 (5000→200) + send-time-limit-seconds: 30 # 메시지 전송 시간 제한 + memory: + heap-reject-threshold: 0.85 # 힙 사용률 85% 초과 시 새 쿼리 대기열 전환 (기본 20~24GB → 정상시 50% 미만) session: idle-timeout-ms: 15000 # 세션 유휴 타임아웃 15초 (60s → 15s) server-heartbeat-ms: 5000 # 서버 하트비트 5초 (10s → 5s) diff --git a/src/main/resources/application-query.yml b/src/main/resources/application-query.yml index 799dbf1..eb8485b 100644 --- a/src/main/resources/application-query.yml +++ b/src/main/resources/application-query.yml @@ -103,6 +103,11 @@ websocket: max-concurrent-global: 40 # 조회 전용 서버 (배치 없으므로 prod-mpr보다 여유) max-per-session: 15 # 세션당 동시 쿼리 상한 queue-timeout-seconds: 30 # 글로벌 대기 큐 타임아웃 + transport: + message-size-limit-mb: 2 # 단일 STOMP 메시지 상한 (청크 1024KB 기준) + send-buffer-size-limit-mb: 50 # 세션당 송신 버퍼 상한 (사전 할당 아님, 최악 40×50MB=2GB) + outbound-queue-capacity: 200 # 아웃바운드 메시지 큐 + send-time-limit-seconds: 30 # 메시지 전송 시간 제한 session: idle-timeout-ms: 15000 server-heartbeat-ms: 5000