diff --git a/run-on-query-server-dev.sh b/run-on-query-server-dev.sh new file mode 100644 index 0000000..a612953 --- /dev/null +++ b/run-on-query-server-dev.sh @@ -0,0 +1,184 @@ +#!/bin/bash + +# Query DB 서버에서 최적화된 실행 스크립트 +# Rocky Linux 환경에 맞춰 조정됨 +# Java 17 경로 명시적 지정 + +# 애플리케이션 경로 +APP_HOME="/devdata/apps/bridge-db-monitoring" +JAR_FILE="$APP_HOME/vessel-batch-aggregation.jar" + +# Java 17 경로 +JAVA_HOME="/devdata/apps/jdk-17.0.8" +JAVA_BIN="$JAVA_HOME/bin/java" + +# 로그 디렉토리 +LOG_DIR="$APP_HOME/logs" +mkdir -p $LOG_DIR + +echo "================================================" +echo "Vessel Batch Aggregation - Query Server Edition" +echo "Start Time: $(date)" +echo "================================================" + +# 경로 확인 +echo "Environment Check:" +echo "- App Home: $APP_HOME" +echo "- JAR File: $JAR_FILE" +echo "- Java Path: $JAVA_BIN" +echo "- Java Version: $($JAVA_BIN -version 2>&1 | head -1)" + +# JAR 파일 존재 확인 +if [ ! -f "$JAR_FILE" ]; then + echo "ERROR: JAR file not found at $JAR_FILE" + exit 1 +fi + +# Java 실행 파일 확인 +if [ ! -x "$JAVA_BIN" ]; then + echo "ERROR: Java not found or not executable at $JAVA_BIN" + exit 1 +fi + +# 서버 정보 확인 +echo "" +echo "Server Info:" +echo "- Hostname: $(hostname)" +echo "- CPU Cores: $(nproc)" +echo "- Total Memory: $(free -h | grep Mem | awk '{print $2}')" +echo "- PostgreSQL Version: $(psql --version 2>/dev/null | head -1 || echo 'PostgreSQL client not in PATH')" + +# 환경 변수 설정 (localhost 최적화) +export SPRING_PROFILES_ACTIVE=prod + +# Query DB와 Batch Meta DB를 localhost로 오버라이드 +export SPRING_DATASOURCE_QUERY_JDBC_URL="jdbc:postgresql://10.188.171.182:5432/mdadb?options=-csearch_path=signal,public&assumeMinServerVersion=12&reWriteBatchedInserts=true" +export SPRING_DATASOURCE_BATCH_JDBC_URL="jdbc:postgresql://10.188.171.182:5432/mdadb?currentSchema=public&assumeMinServerVersion=12&reWriteBatchedInserts=true" + +# 서버 CPU 코어 수에 따른 병렬 처리 조정 +CPU_CORES=$(nproc) +export VESSEL_BATCH_PARTITION_SIZE=$((CPU_CORES * 2)) +export VESSEL_BATCH_BULK_INSERT_PARALLEL_THREADS=$((CPU_CORES / 2)) + +echo "" +echo "Optimized Settings:" +echo "- Partition Size: $VESSEL_BATCH_PARTITION_SIZE" +echo "- Parallel Threads: $VESSEL_BATCH_BULK_INSERT_PARALLEL_THREADS" +echo "- Query DB: localhost (optimized)" +echo "- Batch Meta DB: localhost (optimized)" + +# JVM 옵션 (서버 메모리에 맞게 조정) +TOTAL_MEM=$(free -g | grep Mem | awk '{print $2}') +JVM_HEAP=$((TOTAL_MEM / 3)) # 전체 메모리의 25% 사용 + +# 최소 16GB, 최대 32GB로 제한 +if [ $JVM_HEAP -lt 8 ]; then + JVM_HEAP=8 +elif [ $JVM_HEAP -gt 16 ]; then + JVM_HEAP=16 +fi + +JAVA_OPTS="-Xms${JVM_HEAP}g -Xmx${JVM_HEAP}g \ + -XX:+UseG1GC \ + -XX:MaxGCPauseMillis=200 \ + -XX:+UseStringDeduplication \ + -XX:+ParallelRefProcEnabled \ + -XX:ParallelGCThreads=$((CPU_CORES / 2)) \ + -XX:ConcGCThreads=$((CPU_CORES / 4)) \ + -XX:+HeapDumpOnOutOfMemoryError \ + -XX:HeapDumpPath=$LOG_DIR/heapdump.hprof \ + -Dfile.encoding=UTF-8 \ + -Duser.timezone=Asia/Seoul \ + -Djava.security.egd=file:/dev/./urandom \ + -Dspring.profiles.active=prod" + +echo "- JVM Heap Size: ${JVM_HEAP}GB" + +# 기존 프로세스 확인 및 종료 +echo "" +echo "Checking for existing process..." +PID=$(pgrep -f "$JAR_FILE") +if [ ! -z "$PID" ]; then + echo "Stopping existing process (PID: $PID)..." + kill -15 $PID + + # 프로세스 종료 대기 (최대 30초) + for i in {1..30}; do + if ! kill -0 $PID 2>/dev/null; then + echo "Process stopped successfully." + break + fi + if [ $i -eq 30 ]; then + echo "Force killing process..." + kill -9 $PID + fi + sleep 1 + done +fi + +# 작업 디렉토리로 이동 +cd $APP_HOME + +# 애플리케이션 실행 (nice로 우선순위 조정) +echo "" +echo "Starting application with reduced priority..." +echo "Command: nice -n 10 $JAVA_BIN $JAVA_OPTS -jar $JAR_FILE" +echo "" + +# nohup으로 백그라운드 실행 +nohup nice -n 10 $JAVA_BIN $JAVA_OPTS -jar $JAR_FILE \ + > $LOG_DIR/app.log 2>&1 & + +NEW_PID=$! +echo "Application started with PID: $NEW_PID" + +# PID 파일 생성 +echo $NEW_PID > $APP_HOME/vessel-batch.pid + +# 시작 확인 (30초 대기) +echo "Waiting for application startup..." +STARTUP_SUCCESS=false +for i in {1..30}; do + if grep -q "Started SignalBatchApplication" $LOG_DIR/app.log 2>/dev/null; then + echo "✅ Application started successfully!" + STARTUP_SUCCESS=true + break + fi + echo -n "." + sleep 1 +done + +if [ "$STARTUP_SUCCESS" = false ]; then + echo "" + echo "⚠️ Application startup timeout. Check logs for errors." + echo "Log file: $LOG_DIR/app.log" + tail -20 $LOG_DIR/app.log +fi + +echo "" +echo "================================================" +echo "Deployment Complete!" +echo "- PID: $NEW_PID" +echo "- PID File: $APP_HOME/vessel-batch.pid" +echo "- Log: $LOG_DIR/app.log" +echo "- Monitor: tail -f $LOG_DIR/app.log" +echo "================================================" + +# 초기 상태 확인 +sleep 5 +echo "" +echo "Initial Status Check:" +curl -s http://localhost:18090/actuator/health 2>/dev/null | python -m json.tool || echo "Health endpoint not yet available" + +# 리소스 사용량 표시 +echo "" +echo "Resource Usage:" +ps aux | grep $NEW_PID | grep -v grep + +# 빠른 명령어 안내 +echo "" +echo "Useful Commands:" +echo "- Stop: kill -15 \$(cat $APP_HOME/vessel-batch.pid)" +echo "- Logs: tail -f $LOG_DIR/app.log" +echo "- Status: curl http://localhost:18090/actuator/health" +echo "- Monitor: $APP_HOME/monitor-query-server.sh" diff --git a/src/main/java/gc/mda/signal_batch/global/config/AsyncConfig.java b/src/main/java/gc/mda/signal_batch/global/config/AsyncConfig.java index 89ace1f..2e54ae7 100644 --- a/src/main/java/gc/mda/signal_batch/global/config/AsyncConfig.java +++ b/src/main/java/gc/mda/signal_batch/global/config/AsyncConfig.java @@ -20,10 +20,10 @@ public class AsyncConfig implements AsyncConfigurer { @Bean(name = "trackStreamingExecutor") public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(15); - executor.setMaxPoolSize(30); - executor.setQueueCapacity(500); - executor.setKeepAliveSeconds(40); + executor.setCorePoolSize(40); + executor.setMaxPoolSize(120); + executor.setQueueCapacity(100); + executor.setKeepAliveSeconds(30); executor.setThreadNamePrefix("track-stream-"); // executor.setTaskDecorator(new MdcTaskDecorator()); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); diff --git a/src/main/java/gc/mda/signal_batch/global/config/WebSocketProperties.java b/src/main/java/gc/mda/signal_batch/global/config/WebSocketProperties.java index ce6d183..f0adadf 100644 --- a/src/main/java/gc/mda/signal_batch/global/config/WebSocketProperties.java +++ b/src/main/java/gc/mda/signal_batch/global/config/WebSocketProperties.java @@ -16,13 +16,14 @@ public class WebSocketProperties { private QueryProperties query = new QueryProperties(); private TransportProperties transport = new TransportProperties(); private BackpressureProperties backpressure = new BackpressureProperties(); + private SessionProperties session = new SessionProperties(); @Data public static class QueryProperties { /** 서버 전체 동시 실행 쿼리 상한 */ - private int maxConcurrentGlobal = 30; + private int maxConcurrentGlobal = 60; /** 세션당 동시 쿼리 상한 */ - private int maxPerSession = 3; + private int maxPerSession = 20; /** 글로벌 대기 큐 타임아웃 (초) */ private int queueTimeoutSeconds = 30; } @@ -49,4 +50,18 @@ public class WebSocketProperties { /** 메시지당 최소 크기 (KB) */ private int minMessageSizeKb = 256; } + + @Data + public static class SessionProperties { + /** 세션 유휴 타임아웃 (밀리초) */ + private long idleTimeoutMs = 15000; + /** 서버 하트비트 간격 (밀리초) */ + private long serverHeartbeatMs = 5000; + /** 클라이언트 하트비트 간격 (밀리초) */ + private long clientHeartbeatMs = 5000; + /** SockJS disconnect delay (밀리초) */ + private long sockjsDisconnectDelayMs = 5000; + /** 메시지 전송 시간 제한 (초) */ + private int sendTimeLimitSeconds = 30; + } } diff --git a/src/main/java/gc/mda/signal_batch/global/config/WebSocketStompConfig.java b/src/main/java/gc/mda/signal_batch/global/config/WebSocketStompConfig.java index 7e166b8..a55712f 100644 --- a/src/main/java/gc/mda/signal_batch/global/config/WebSocketStompConfig.java +++ b/src/main/java/gc/mda/signal_batch/global/config/WebSocketStompConfig.java @@ -37,13 +37,14 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer { private final TrackQueryInterceptor trackQueryInterceptor; private final WebSocketUsageLoggingInterceptor webSocketUsageLoggingInterceptor; private final ObjectMapper objectMapper; // JacksonConfig에서 생성된 ObjectMapper 주입 + private final WebSocketProperties webSocketProperties; @Bean public ServletServerContainerFactoryBean createWebSocketContainer() { ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean(); container.setMaxTextMessageBufferSize(256 * 1024 * 1024); // 256MB로 증가 container.setMaxBinaryMessageBufferSize(256 * 1024 * 1024); // 256MB로 증가 - container.setMaxSessionIdleTimeout(60000L); + container.setMaxSessionIdleTimeout(webSocketProperties.getSession().getIdleTimeoutMs()); return container; } @@ -64,7 +65,7 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer { .setClientLibraryUrl("/static/libs/js/sockjs.min.js") .setStreamBytesLimit(100 * 1024 * 1024) // 100MB로 증가 .setHttpMessageCacheSize(1000) - .setDisconnectDelay(30 * 1000) + .setDisconnectDelay(webSocketProperties.getSession().getSockjsDisconnectDelayMs()) .setWebSocketEnabled(true) .setSuppressCors(false); } @@ -74,7 +75,10 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer { // 인메모리 메시지 브로커 설정 registry.enableSimpleBroker("/topic", "/queue") .setTaskScheduler(messageBrokerTaskScheduler()) - .setHeartbeatValue(new long[]{10000, 10000}); // 서버/클라이언트 하트비트 + .setHeartbeatValue(new long[]{ + webSocketProperties.getSession().getServerHeartbeatMs(), + webSocketProperties.getSession().getClientHeartbeatMs() + }); // 애플리케이션 destination prefix registry.setApplicationDestinationPrefixes("/app"); @@ -89,31 +93,33 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer { @Override public void configureWebSocketTransport(WebSocketTransportRegistration registration) { registration - .setMessageSizeLimit(50 * 1024 * 1024) // 50MB로 증가 - .setSendBufferSizeLimit(256 * 1024 * 1024) // 256MB로 증가 - .setSendTimeLimit(120 * 1000) // 120초로 증가 - .setTimeToFirstMessage(30 * 1000); // 첫 메시지까지 30초 + .setMessageSizeLimit(webSocketProperties.getTransport().getMessageSizeLimitMb() * 1024 * 1024) + .setSendBufferSizeLimit(webSocketProperties.getTransport().getSendBufferSizeLimitMb() * 1024 * 1024) + .setSendTimeLimit(webSocketProperties.getSession().getSendTimeLimitSeconds() * 1000) + .setTimeToFirstMessage(30 * 1000); // 에러 핸들러 등록은 별도로 처리 } @Override public void configureClientInboundChannel(ChannelRegistration registration) { + WebSocketProperties.TransportProperties tp = webSocketProperties.getTransport(); registration .interceptors(webSocketUsageLoggingInterceptor, trackQueryInterceptor) .taskExecutor() - .corePoolSize(10) - .maxPoolSize(20) - .queueCapacity(100); + .corePoolSize(tp.getInboundCorePoolSize()) + .maxPoolSize(tp.getInboundMaxPoolSize()) + .queueCapacity(tp.getInboundQueueCapacity()); } @Override public void configureClientOutboundChannel(ChannelRegistration registration) { + WebSocketProperties.TransportProperties tp = webSocketProperties.getTransport(); registration .taskExecutor() - .corePoolSize(20) // 10 -> 20로 증가 - .maxPoolSize(40) // 20 -> 40로 증가 - .queueCapacity(5000); // 1000 -> 5000로 증가 + .corePoolSize(tp.getOutboundCorePoolSize()) + .maxPoolSize(tp.getOutboundMaxPoolSize()) + .queueCapacity(tp.getOutboundQueueCapacity()); } @Override diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/dto/QueryStatusUpdate.java b/src/main/java/gc/mda/signal_batch/global/websocket/dto/QueryStatusUpdate.java index b53a086..4f5887e 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/dto/QueryStatusUpdate.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/dto/QueryStatusUpdate.java @@ -9,7 +9,16 @@ import lombok.NoArgsConstructor; @AllArgsConstructor public class QueryStatusUpdate { private String queryId; - private String status; // STARTED, PROCESSING, COMPLETED, CANCELLED, ERROR + private String status; // STARTED, PROCESSING, COMPLETED, CANCELLED, ERROR, QUEUED private String message; private Double progressPercentage; + private Integer queuePosition; + private Integer totalInQueue; + + public QueryStatusUpdate(String queryId, String status, String message, Double progressPercentage) { + this.queryId = queryId; + this.status = status; + this.message = message; + this.progressPercentage = progressPercentage; + } } \ No newline at end of file diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java index f7c435e..e9b2773 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java @@ -7,14 +7,13 @@ import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * WebSocket 세션별 진행 중인 쿼리 추적 및 글로벌 동시 쿼리 제한 관리 + * Phase 5: 대기열 기반 관리 (거부 대신 순번 대기) */ @Slf4j @Service @@ -27,7 +26,10 @@ public class ActiveQueryManager { private Semaphore globalQuerySemaphore; private final AtomicInteger waitingCount = new AtomicInteger(0); - @Value("${websocket.query.max-concurrent-global:30}") + // 대기열 추적 (FIFO 순서 보장) + private final ConcurrentLinkedQueue waitingQueue = new ConcurrentLinkedQueue<>(); + + @Value("${websocket.query.max-concurrent-global:60}") private int maxConcurrentGlobal; @Value("${websocket.query.queue-timeout-seconds:30}") @@ -156,7 +158,20 @@ public class ActiveQueryManager { // ── 글로벌 동시 쿼리 제한 ── /** - * 글로벌 쿼리 슬롯 획득 (대기 큐 포함) + * 글로벌 쿼리 슬롯 획득 (즉시 시도, 대기 없음) + * @return true: 슬롯 획득 성공, false: 슬롯 없음 (대기열 진입 필요) + */ + public boolean tryAcquireQuerySlotImmediate(String queryId) { + boolean acquired = globalQuerySemaphore.tryAcquire(); + if (acquired) { + log.info("Query {} acquired global slot immediately: active={}/{}", + queryId, getGlobalActiveQueryCount(), maxConcurrentGlobal); + } + return acquired; + } + + /** + * 글로벌 쿼리 슬롯 획득 (대기 큐에서 FIFO 순서로 대기) - 기존 호환 * @return true: 슬롯 획득 성공, false: 타임아웃으로 획득 실패 */ public boolean tryAcquireQuerySlot(String queryId) throws InterruptedException { @@ -179,6 +194,65 @@ public class ActiveQueryManager { } } + /** + * 2초 간격으로 슬롯 대기 (대기열 기반) + * 호출자가 상태 콜백을 통해 QUEUED 상태를 클라이언트에 전송할 수 있음 + * @param queryId 쿼리 ID + * @param maxWaitSeconds 최대 대기 시간 (초) + * @return true: 슬롯 획득, false: 타임아웃 + */ + public boolean waitForSlotWithQueue(String queryId, int maxWaitSeconds) throws InterruptedException { + waitingQueue.offer(queryId); + waitingCount.incrementAndGet(); + try { + long deadline = System.currentTimeMillis() + (maxWaitSeconds * 1000L); + while (System.currentTimeMillis() < deadline) { + boolean acquired = globalQuerySemaphore.tryAcquire(2, TimeUnit.SECONDS); + if (acquired) { + waitingQueue.remove(queryId); + log.info("Query {} acquired slot after queuing: active={}/{}", + queryId, getGlobalActiveQueryCount(), maxConcurrentGlobal); + return true; + } + // 슬롯 못 얻으면 계속 대기 (호출자에서 QUEUED 상태 전송) + } + waitingQueue.remove(queryId); + log.warn("Query {} timed out in queue after {}s", queryId, maxWaitSeconds); + return false; + } finally { + waitingCount.decrementAndGet(); + } + } + + /** + * 대기열에서의 순번 조회 + * @return 1-based 순번, 대기열에 없으면 0 + */ + public int getQueuePosition(String queryId) { + int position = 1; + for (String id : waitingQueue) { + if (id.equals(queryId)) { + return position; + } + position++; + } + return 0; + } + + /** + * 현재 대기열 크기 + */ + public int getQueueSize() { + return waitingQueue.size(); + } + + /** + * 대기열 직접 접근 (서비스에서 offer/remove 가능) + */ + public ConcurrentLinkedQueue getWaitingQueue() { + return waitingQueue; + } + /** * 글로벌 쿼리 슬롯 반환 */ diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java index ef9f2c1..58b99da 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java @@ -56,6 +56,7 @@ public class ChunkedTrackStreamingService { private final ActiveQueryManager activeQueryManager; private final IntegrationVesselService integrationVesselService; private final TrackQueryInterceptor trackQueryInterceptor; + private final DailyTrackCacheManager dailyTrackCacheManager; private final WKTReader wktReader = new WKTReader(); @SuppressWarnings("unused") private final ExecutorService executorService = Executors.newFixedThreadPool(10); @@ -95,13 +96,15 @@ public class ChunkedTrackStreamingService { TrackSimplificationStrategy simplificationStrategy, ActiveQueryManager activeQueryManager, IntegrationVesselService integrationVesselService, - TrackQueryInterceptor trackQueryInterceptor) { + TrackQueryInterceptor trackQueryInterceptor, + DailyTrackCacheManager dailyTrackCacheManager) { this.queryJdbcTemplate = queryJdbcTemplate; this.queryDataSource = queryDataSource; this.simplificationStrategy = simplificationStrategy; this.activeQueryManager = activeQueryManager; this.integrationVesselService = integrationVesselService; this.trackQueryInterceptor = trackQueryInterceptor; + this.dailyTrackCacheManager = dailyTrackCacheManager; } /** @@ -730,12 +733,20 @@ public class ChunkedTrackStreamingService { List ranges = strategyMap.get(strategy); log.info("Processing {} strategy with {} ranges", strategy, ranges.size()); - // Daily 테이블의 경우 선박 기준 페이지네이션으로 처리 (누락 방지) + // Daily 테이블의 경우 캐시 우선 → 페이지네이션 폴백 if (strategy == TableStrategy.DAILY) { for (TimeRange range : ranges) { try { - // 선박 기준 페이지네이션으로 모든 데이터 수집 후 병합 - List compactTracks = processDailyTableWithPagination(request, range, null, viewportVesselIds); + List compactTracks; + LocalDate rangeDate = range.getStart().toLocalDate(); + + // 캐시 히트 시 메모리에서 가져옴 + if (dailyTrackCacheManager.isEnabled() && dailyTrackCacheManager.isCached(rangeDate)) { + compactTracks = dailyTrackCacheManager.getCachedTracks(rangeDate); + log.info("Daily cache HIT for {} in processQueryInChunks: {} tracks", rangeDate, compactTracks.size()); + } else { + compactTracks = processDailyTableWithPagination(request, range, null, viewportVesselIds); + } if (!compactTracks.isEmpty()) { // 메시지 크기로 분할 @@ -847,16 +858,45 @@ public class ChunkedTrackStreamingService { Consumer statusConsumer) { boolean slotAcquired = false; try { - // 글로벌 동시 쿼리 슬롯 획득 (대기 큐에서 FIFO 순서로 대기) - slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId); + // 글로벌 동시 쿼리 슬롯 즉시 획득 시도 + slotAcquired = activeQueryManager.tryAcquireQuerySlotImmediate(queryId); if (!slotAcquired) { - log.warn("Query {} rejected: global concurrent limit reached", queryId); - statusConsumer.accept(new QueryStatusUpdate( - queryId, "ERROR", - "Server is busy. Maximum concurrent queries reached. Please retry later.", - 0.0 - )); - return; + // 대기열 진입 → 2초 간격으로 QUEUED 상태 전송하며 슬롯 대기 + log.info("Query {} entering wait queue: active={}/{}", queryId, + activeQueryManager.getGlobalActiveQueryCount(), activeQueryManager.getMaxConcurrentGlobal()); + + int maxWaitSeconds = 120; // 최대 2분 대기 + long deadline = System.currentTimeMillis() + (maxWaitSeconds * 1000L); + activeQueryManager.getWaitingQueue().offer(queryId); + try { + while (System.currentTimeMillis() < deadline) { + slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId); + if (slotAcquired) break; + // QUEUED 상태 전송 + int position = activeQueryManager.getQueuePosition(queryId); + int totalInQueue = activeQueryManager.getQueueSize(); + QueryStatusUpdate queueStatus = new QueryStatusUpdate( + queryId, "QUEUED", + String.format("Queue position: %d/%d. Waiting for available slot...", position, totalInQueue), + 0.0 + ); + queueStatus.setQueuePosition(position); + queueStatus.setTotalInQueue(totalInQueue); + statusConsumer.accept(queueStatus); + } + } finally { + activeQueryManager.getWaitingQueue().remove(queryId); + } + + if (!slotAcquired) { + log.warn("Query {} timed out in queue after {}s", queryId, maxWaitSeconds); + statusConsumer.accept(new QueryStatusUpdate( + queryId, "ERROR", + "Server is busy. Queue timeout exceeded. Please retry later.", + 0.0 + )); + return; + } } // 취소 플래그 등록 @@ -2328,7 +2368,7 @@ public class ChunkedTrackStreamingService { } /** - * Daily 전략 처리 - 페이지별 즉시 스트리밍 (타임아웃 방지) + * Daily 전략 처리 - 캐시 우선 조회 + 페이지별 즉시 스트리밍 (타임아웃 방지) */ private void processDailyStrategy(List ranges, TrackQueryRequest request, String queryId, Consumer chunkConsumer, @@ -2344,13 +2384,74 @@ public class ChunkedTrackStreamingService { log.info("Session vessel cache initialized for {} date ranges", ranges.size()); for (TimeRange range : ranges) { - // 페이지별 즉시 스트리밍으로 처리 (세션 캐시 공유) - streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache, viewportVesselIds); + LocalDate rangeDate = range.getStart().toLocalDate(); + + // 캐시 히트 체크: 해당 날짜가 인메모리 캐시에 있으면 DB 조회 생략 + if (dailyTrackCacheManager.isEnabled() && dailyTrackCacheManager.isCached(rangeDate)) { + log.info("Daily cache HIT for {}: serving from memory", rangeDate); + List cachedTracks; + + // 뷰포트 필터링 적용 + if (request.getViewport() != null) { + ViewportFilter vp = request.getViewport(); + cachedTracks = dailyTrackCacheManager.getCachedTracks( + rangeDate, vp.getMinLon(), vp.getMinLat(), vp.getMaxLon(), vp.getMaxLat()); + } else { + cachedTracks = dailyTrackCacheManager.getCachedTracks(rangeDate); + } + + if (!cachedTracks.isEmpty()) { + // viewportVesselIds 필터 적용 + if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) { + cachedTracks = cachedTracks.stream() + .filter(t -> viewportVesselIds.contains(t.getVesselId())) + .collect(Collectors.toList()); + } + + // 메시지 크기로 분할하여 전송 + List> batches = splitByMessageSize(cachedTracks); + for (List batch : batches) { + sendChunkResponse(batch, queryId, chunkConsumer, uniqueVesselIds); + } + log.info("Daily cache served {} tracks for {} in {} batches", + cachedTracks.size(), rangeDate, batches.size()); + } + } else { + // 캐시 미스: 기존 DB 페이지네이션으로 처리 + if (dailyTrackCacheManager.isEnabled()) { + log.info("Daily cache MISS for {}: falling back to DB", rangeDate); + } + streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache, viewportVesselIds); + } } log.info("Session vessel cache final size: {} vessels cached", sessionVesselCache.size()); } + /** + * 캐시 데이터를 청크 응답으로 전송하는 헬퍼 메서드 + */ + private void sendChunkResponse(List batch, String queryId, + Consumer chunkConsumer, + Set uniqueVesselIds) { + TrackChunkResponse response = new TrackChunkResponse(); + response.setQueryId(queryId); + response.setChunkIndex(currentGlobalChunkIndex++); + response.setIsLastChunk(false); + response.setTotalChunks(-1); + response.setCompactTracks(batch); + + int processedMin = processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum(); + response.setStats(createChunkStats(batch, uniqueVesselIds, processedMin)); + + chunkConsumer.accept(response); + batch.forEach(track -> uniqueVesselIds.add(track.getVesselId())); + + // 백프레셔 적용 + int batchSize = batch.stream().mapToInt(this::estimateTrackSize).sum(); + pendingBufferSize.addAndGet(batchSize); + } + /** * Daily 테이블 페이지별 즉시 스트리밍 - 각 페이지 완료 즉시 청크 전송 */ diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java index 5cedcb0..5ef2dfd 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java @@ -20,6 +20,7 @@ import org.springframework.stereotype.Service; import javax.sql.DataSource; import java.sql.*; import java.time.Duration; +import java.time.LocalDate; import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.*; @@ -43,6 +44,7 @@ public class StompTrackStreamingService { private final ActiveQueryManager activeQueryManager; private final TrackQueryInterceptor trackQueryInterceptor; + private final DailyTrackCacheManager dailyTrackCacheManager; // 활성 쿼리 관리 private final Map activeQueries = new ConcurrentHashMap<>(); @@ -66,7 +68,8 @@ public class StompTrackStreamingService { TrackSimplificationStrategy simplificationStrategy, VesselTrackMerger vesselTrackMerger, ActiveQueryManager activeQueryManager, - TrackQueryInterceptor trackQueryInterceptor) { + TrackQueryInterceptor trackQueryInterceptor, + DailyTrackCacheManager dailyTrackCacheManager) { this.queryDataSource = queryDataSource; this.queryJdbcTemplate = queryJdbcTemplate; this.areaBoundaryCache = areaBoundaryCache; @@ -76,6 +79,7 @@ public class StompTrackStreamingService { this.vesselTrackMerger = vesselTrackMerger; this.activeQueryManager = activeQueryManager; this.trackQueryInterceptor = trackQueryInterceptor; + this.dailyTrackCacheManager = dailyTrackCacheManager; } @Async @@ -95,16 +99,45 @@ public class StompTrackStreamingService { boolean slotAcquired = false; try { - // 글로벌 동시 쿼리 슬롯 획득 (대기 큐에서 FIFO 순서로 대기) - slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId); + // 글로벌 동시 쿼리 슬롯 즉시 획득 시도 + slotAcquired = activeQueryManager.tryAcquireQuerySlotImmediate(queryId); if (!slotAcquired) { - log.warn("Query {} rejected: global concurrent limit reached", queryId); - statusConsumer.accept(new QueryStatusUpdate( - queryId, "ERROR", - "Server is busy. Maximum concurrent queries reached. Please retry later.", - 0.0 - )); - return; + // 대기열 진입 → 2초 간격으로 QUEUED 상태 전송하며 슬롯 대기 + log.info("Query {} entering wait queue: active={}/{}", queryId, + activeQueryManager.getGlobalActiveQueryCount(), activeQueryManager.getMaxConcurrentGlobal()); + + int maxWaitSeconds = 120; // 최대 2분 대기 + long deadline = System.currentTimeMillis() + (maxWaitSeconds * 1000L); + activeQueryManager.getWaitingQueue().offer(queryId); + try { + while (System.currentTimeMillis() < deadline) { + slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId); + if (slotAcquired) break; + // QUEUED 상태 전송 + int position = activeQueryManager.getQueuePosition(queryId); + int totalInQueue = activeQueryManager.getQueueSize(); + QueryStatusUpdate queueStatus = new QueryStatusUpdate( + queryId, "QUEUED", + String.format("Queue position: %d/%d. Waiting for available slot...", position, totalInQueue), + 0.0 + ); + queueStatus.setQueuePosition(position); + queueStatus.setTotalInQueue(totalInQueue); + statusConsumer.accept(queueStatus); + } + } finally { + activeQueryManager.getWaitingQueue().remove(queryId); + } + + if (!slotAcquired) { + log.warn("Query {} timed out in queue after {}s", queryId, maxWaitSeconds); + statusConsumer.accept(new QueryStatusUpdate( + queryId, "ERROR", + "Server is busy. Queue timeout exceeded. Please retry later.", + 0.0 + )); + return; + } } // 입력 검증 @@ -439,12 +472,39 @@ public class StompTrackStreamingService { resultQueue, processedTracks, chunkIndex, totalTracks, filteredVessels); } - // 일별 데이터 처리 (DAILY) + // 일별 데이터 처리 (DAILY) - 캐시 우선 조회 if (strategyMap.containsKey(TableStrategy.DAILY)) { log.info("Processing daily data..."); - processSingleStrategy(request, TableStrategy.DAILY, - strategyMap.get(TableStrategy.DAILY), queryId, - resultQueue, processedTracks, chunkIndex, totalTracks, filteredVessels); + List dailyRanges = strategyMap.get(TableStrategy.DAILY); + List dbRanges = new ArrayList<>(); + + for (TimeRange range : dailyRanges) { + LocalDate rangeDate = range.getStart().toLocalDate(); + if (dailyTrackCacheManager.isEnabled() && dailyTrackCacheManager.isCached(rangeDate)) { + // 캐시 히트: 메모리에서 가져와서 직접 큐에 넣기 + log.info("Daily cache HIT for {} in StompStreaming", rangeDate); + List cachedTracks = + dailyTrackCacheManager.getCachedTracks(rangeDate); + + if (!cachedTracks.isEmpty()) { + TrackChunkResponse chunk = new TrackChunkResponse(); + chunk.setQueryId(queryId); + chunk.setChunkIndex(chunkIndex.getAndIncrement()); + chunk.setIsLastChunk(false); + chunk.setCompactTracks(cachedTracks); + processedTracks.addAndGet(cachedTracks.size()); + resultQueue.put(chunk); + } + } else { + dbRanges.add(range); + } + } + + // 캐시 미스 범위만 DB 조회 + if (!dbRanges.isEmpty()) { + processSingleStrategy(request, TableStrategy.DAILY, dbRanges, queryId, + resultQueue, processedTracks, chunkIndex, totalTracks, filteredVessels); + } } } finally { diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index 82fa49a..80d8b23 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -42,8 +42,8 @@ spring: connection-timeout: 5000 idle-timeout: 600000 max-lifetime: 1800000 - maximum-pool-size: 120 # 60 -> 120 (총 250 중 48%, WebSocket 스트리밍 + REST API 주 사용) - minimum-idle: 20 # 10 -> 20 + maximum-pool-size: 180 # 120 -> 180 (WebSocket 대기열 + REST API 주 사용) + minimum-idle: 30 # 20 -> 30 connection-test-query: SELECT 1 validation-timeout: 5000 leak-detection-threshold: 60000 # 커넥션 누수 감지 (60초) @@ -262,12 +262,26 @@ vessel: # spring 하위가 아닌 최상위 레벨 t_abnormal_tracks: retention-months: 0 # 비정상 항적: 무한 보관 +# 일일 항적 데이터 인메모리 캐시 +cache: + daily-track: + enabled: true + retention-days: 7 # D-1 ~ D-7 (오늘 제외) + max-memory-gb: 5 # 최대 5GB + warmup-async: true # 비동기 워밍업 (서버 시작 차단 없음) + # WebSocket 부하 제어 설정 websocket: query: - max-concurrent-global: 30 # 서버 전체 동시 실행 쿼리 상한 (Query풀 120 / 쿼리당 ~3커넥션) - max-per-session: 3 # 세션당 동시 쿼리 상한 + max-concurrent-global: 60 # 서버 전체 동시 실행 쿼리 상한 (Query풀 180 / 쿼리당 ~3커넥션) + max-per-session: 20 # 세션당 동시 쿼리 상한 (대기열 방식이므로 넉넉하게) queue-timeout-seconds: 30 # 글로벌 대기 큐 타임아웃 + session: + idle-timeout-ms: 15000 # 세션 유휴 타임아웃 15초 (60s → 15s) + server-heartbeat-ms: 5000 # 서버 하트비트 5초 (10s → 5s) + client-heartbeat-ms: 5000 # 클라이언트 하트비트 5초 (10s → 5s) + sockjs-disconnect-delay-ms: 5000 # SockJS 해제 지연 5초 (30s → 5s) + send-time-limit-seconds: 30 # 메시지 전송 시간 제한 30초 (120s → 30s) # 액추에이터 설정 management: diff --git a/vessel-batch-control.sh b/vessel-batch-control.sh new file mode 100644 index 0000000..f433d2a --- /dev/null +++ b/vessel-batch-control.sh @@ -0,0 +1,215 @@ +#!/bin/bash + +# Vessel Batch 관리 스크립트 +# 시작, 중지, 상태 확인 등 기본 관리 기능 + +# 애플리케이션 경로 +APP_HOME="/devdata/apps/bridge-db-monitoring" +JAR_FILE="$APP_HOME/vessel-batch-aggregation.jar" +PID_FILE="$APP_HOME/vessel-batch.pid" +LOG_DIR="$APP_HOME/logs" + +# Java 17 경로 +JAVA_HOME="/devdata/apps/jdk-17.0.8" +JAVA_BIN="$JAVA_HOME/bin/java" + +# 색상 코드 +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' + +# 함수: PID 확인 +get_pid() { + if [ -f "$PID_FILE" ]; then + PID=$(cat $PID_FILE) + if kill -0 $PID 2>/dev/null; then + echo $PID + else + rm -f $PID_FILE + echo "" + fi + else + PID=$(pgrep -f "$JAR_FILE") + echo $PID + fi +} + +# 함수: 상태 확인 +status() { + PID=$(get_pid) + if [ ! -z "$PID" ]; then + echo -e "${GREEN}✓ Vessel Batch is running (PID: $PID)${NC}" + + # 프로세스 정보 + echo "" + ps aux | grep $PID | grep -v grep + + # Health Check + echo "" + echo "Health Check:" + curl -s http://localhost:18090/actuator/health 2>/dev/null | python -m json.tool || echo "Health endpoint not available" + + # 처리 상태 + echo "" + echo "Processing Status:" + if command -v psql >/dev/null 2>&1; then + psql -h localhost -U mda -d mdadb -c " + SELECT + NOW() - MAX(last_update) as processing_delay, + COUNT(*) as vessel_count + FROM signal.t_vessel_latest_position;" 2>/dev/null || echo "Unable to query database" + fi + + return 0 + else + echo -e "${RED}✗ Vessel Batch is not running${NC}" + return 1 + fi +} + +# 함수: 시작 +start() { + PID=$(get_pid) + if [ ! -z "$PID" ]; then + echo -e "${YELLOW}Vessel Batch is already running (PID: $PID)${NC}" + return 1 + fi + + echo "Starting Vessel Batch..." + cd $APP_HOME + $APP_HOME/run-on-query-server-dev.sh +} + +# 함수: 중지 +stop() { + PID=$(get_pid) + if [ -z "$PID" ]; then + echo -e "${YELLOW}Vessel Batch is not running${NC}" + return 1 + fi + + echo "Stopping Vessel Batch (PID: $PID)..." + kill -15 $PID + + # 종료 대기 + for i in {1..30}; do + if ! kill -0 $PID 2>/dev/null; then + echo -e "${GREEN}✓ Vessel Batch stopped successfully${NC}" + rm -f $PID_FILE + return 0 + fi + echo -n "." + sleep 1 + done + + echo "" + echo -e "${RED}Process did not stop gracefully, force killing...${NC}" + kill -9 $PID + rm -f $PID_FILE +} + +# 함수: 재시작 +restart() { + echo "Restarting Vessel Batch..." + stop + sleep 3 + start +} + +# 함수: 로그 보기 +logs() { + if [ ! -d "$LOG_DIR" ]; then + echo "Log directory not found: $LOG_DIR" + return 1 + fi + + echo "Available log files:" + ls -lh $LOG_DIR/*.log 2>/dev/null + + echo "" + echo "Tailing app.log (Ctrl+C to exit)..." + tail -f $LOG_DIR/app.log +} + +# 함수: 최근 에러 확인 +errors() { + if [ ! -f "$LOG_DIR/app.log" ]; then + echo "Log file not found: $LOG_DIR/app.log" + return 1 + fi + + echo "Recent errors (last 50 lines with ERROR):" + grep "ERROR" $LOG_DIR/app.log | tail -50 + + echo "" + echo "Error summary:" + echo "Total errors: $(grep -c "ERROR" $LOG_DIR/app.log)" + echo "Errors today: $(grep "ERROR" $LOG_DIR/app.log | grep "$(date +%Y-%m-%d)" | wc -l)" +} + +# 함수: 성능 통계 +stats() { + echo "Performance Statistics" + echo "====================" + + if [ -f "$LOG_DIR/resource-monitor.csv" ]; then + echo "Recent resource usage:" + tail -5 $LOG_DIR/resource-monitor.csv | column -t -s, + fi + + echo "" + echo "Batch job statistics:" + if command -v psql >/dev/null 2>&1; then + psql -h localhost -U mda -d mdadb -c " + SELECT + job_name, + COUNT(*) as executions, + AVG(EXTRACT(EPOCH FROM (end_time - start_time))/60)::numeric(10,2) as avg_duration_min, + MAX(end_time) as last_execution + FROM batch_job_execution je + JOIN batch_job_instance ji ON je.job_instance_id = ji.job_instance_id + WHERE end_time > CURRENT_DATE - INTERVAL '7 days' + GROUP BY job_name;" 2>/dev/null || echo "Unable to query batch statistics" + fi +} + +# 메인 로직 +case "$1" in + start) + start + ;; + stop) + stop + ;; + restart) + restart + ;; + status) + status + ;; + logs) + logs + ;; + errors) + errors + ;; + stats) + stats + ;; + *) + echo "Usage: $0 {start|stop|restart|status|logs|errors|stats}" + echo "" + echo "Commands:" + echo " start - Start the Vessel Batch application" + echo " stop - Stop the Vessel Batch application" + echo " restart - Restart the Vessel Batch application" + echo " status - Check application status and health" + echo " logs - Tail application logs" + echo " errors - Show recent errors from logs" + echo " stats - Show performance statistics" + exit 1 + ;; +esac + +exit $?