feat: 대기열 기반 쿼리 관리 및 타임아웃 최적화 (Phase 5)

- 거부 대신 대기열 순번 안내: QUEUED 상태 2초 간격 전송 (최대 2분)
- QueryStatusUpdate에 queuePosition, totalInQueue 필드 추가
- ActiveQueryManager: ConcurrentLinkedQueue 기반 대기열 추적
- WebSocketProperties에 SessionProperties 추가 (타임아웃/하트비트 설정)
- WebSocketStompConfig: 하드코딩 → Properties 주입으로 전환
- application-prod.yml: Query풀 180, global 60, idle 15s, heartbeat 5s
- AsyncConfig: core 40, max 120, queue 100 (대기열 수용)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
HeungTak Lee 2026-02-06 15:34:02 +09:00
부모 60366816a6
커밋 7bd7bf556e
10개의 변경된 파일, 737개의 추가 그리고 59개의 삭제

184
run-on-query-server-dev.sh Normal file
파일 보기

@ -0,0 +1,184 @@
#!/bin/bash
# Optimized launch script for the Query DB server.
# Tuned for a Rocky Linux environment; uses an explicitly pinned Java 17.

# Application layout
readonly APP_HOME="/devdata/apps/bridge-db-monitoring"
readonly JAR_FILE="$APP_HOME/vessel-batch-aggregation.jar"

# Pinned Java 17 installation
readonly JAVA_HOME="/devdata/apps/jdk-17.0.8"
readonly JAVA_BIN="$JAVA_HOME/bin/java"

# Log directory (created up-front so redirections below cannot fail on it)
readonly LOG_DIR="$APP_HOME/logs"
mkdir -p "$LOG_DIR"

echo "================================================"
echo "Vessel Batch Aggregation - Query Server Edition"
echo "Start Time: $(date)"
echo "================================================"

# Sanity-check paths before touching any running process.
echo "Environment Check:"
echo "- App Home: $APP_HOME"
echo "- JAR File: $JAR_FILE"
echo "- Java Path: $JAVA_BIN"
echo "- Java Version: $("$JAVA_BIN" -version 2>&1 | head -1)"

# The JAR must exist ...
if [ ! -f "$JAR_FILE" ]; then
    echo "ERROR: JAR file not found at $JAR_FILE"
    exit 1
fi

# ... and the Java binary must be executable.
if [ ! -x "$JAVA_BIN" ]; then
    echo "ERROR: Java not found or not executable at $JAVA_BIN"
    exit 1
fi

# Host capabilities (diagnostic output only)
echo ""
echo "Server Info:"
echo "- Hostname: $(hostname)"
echo "- CPU Cores: $(nproc)"
echo "- Total Memory: $(free -h | awk '/Mem/ {print $2}')"
echo "- PostgreSQL Version: $(psql --version 2>/dev/null | head -1 || echo 'PostgreSQL client not in PATH')"
# --- Runtime environment (localhost-optimized) ---
export SPRING_PROFILES_ACTIVE=prod

# Override Query DB and Batch Meta DB connection URLs
export SPRING_DATASOURCE_QUERY_JDBC_URL="jdbc:postgresql://10.188.171.182:5432/mdadb?options=-csearch_path=signal,public&assumeMinServerVersion=12&reWriteBatchedInserts=true"
export SPRING_DATASOURCE_BATCH_JDBC_URL="jdbc:postgresql://10.188.171.182:5432/mdadb?currentSchema=public&assumeMinServerVersion=12&reWriteBatchedInserts=true"

# Scale parallelism with the host's core count.
CPU_CORES=$(nproc)
export VESSEL_BATCH_PARTITION_SIZE=$((CPU_CORES * 2))
# Guard against single-core hosts: integer division would yield 0 threads.
PARALLEL_THREADS=$((CPU_CORES / 2))
if [ "$PARALLEL_THREADS" -lt 1 ]; then
    PARALLEL_THREADS=1
fi
export VESSEL_BATCH_BULK_INSERT_PARALLEL_THREADS=$PARALLEL_THREADS

echo ""
echo "Optimized Settings:"
echo "- Partition Size: $VESSEL_BATCH_PARTITION_SIZE"
echo "- Parallel Threads: $VESSEL_BATCH_BULK_INSERT_PARALLEL_THREADS"
echo "- Query DB: localhost (optimized)"
echo "- Batch Meta DB: localhost (optimized)"

# JVM heap: one third of physical memory, clamped to the 8-16 GB range.
# (Earlier comments claimed 25% and a 16/32 GB clamp; the code below is the
# actual policy.)
TOTAL_MEM=$(free -g | awk '/Mem/ {print $2}')
JVM_HEAP=$((TOTAL_MEM / 3))
if [ "$JVM_HEAP" -lt 8 ]; then
    JVM_HEAP=8
elif [ "$JVM_HEAP" -gt 16 ]; then
    JVM_HEAP=16
fi

# GC thread counts must be at least 1 (integer division reaches 0 on hosts
# with fewer than 2 or 4 cores, which the JVM rejects).
GC_PARALLEL_THREADS=$((CPU_CORES / 2))
if [ "$GC_PARALLEL_THREADS" -lt 1 ]; then
    GC_PARALLEL_THREADS=1
fi
GC_CONC_THREADS=$((CPU_CORES / 4))
if [ "$GC_CONC_THREADS" -lt 1 ]; then
    GC_CONC_THREADS=1
fi

JAVA_OPTS="-Xms${JVM_HEAP}g -Xmx${JVM_HEAP}g \
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=200 \
-XX:+UseStringDeduplication \
-XX:+ParallelRefProcEnabled \
-XX:ParallelGCThreads=$GC_PARALLEL_THREADS \
-XX:ConcGCThreads=$GC_CONC_THREADS \
-XX:+HeapDumpOnOutOfMemoryError \
-XX:HeapDumpPath=$LOG_DIR/heapdump.hprof \
-Dfile.encoding=UTF-8 \
-Duser.timezone=Asia/Seoul \
-Djava.security.egd=file:/dev/./urandom \
-Dspring.profiles.active=prod"

echo "- JVM Heap Size: ${JVM_HEAP}GB"
# --- Stop any previously running instance ---
echo ""
echo "Checking for existing process..."
# NB: pgrep -f may return multiple PIDs; $PID is deliberately left unquoted
# in the kill calls below so each PID becomes its own argument.
PID=$(pgrep -f "$JAR_FILE")
if [ -n "$PID" ]; then
    echo "Stopping existing process (PID: $PID)..."
    # Graceful SIGTERM first; escalate to SIGKILL only after 30 seconds.
    kill -15 $PID
    for i in {1..30}; do
        if ! kill -0 $PID 2>/dev/null; then
            echo "Process stopped successfully."
            break
        fi
        if [ "$i" -eq 30 ]; then
            echo "Force killing process..."
            kill -9 $PID
        fi
        sleep 1
    done
fi

# Work from the application directory; abort if it is missing.
cd "$APP_HOME" || exit 1

# --- Launch (at reduced scheduling priority) ---
echo ""
echo "Starting application with reduced priority..."
echo "Command: nice -n 10 $JAVA_BIN $JAVA_OPTS -jar $JAR_FILE"
echo ""

# JAVA_OPTS must stay unquoted so each -XX option splits into its own word.
# shellcheck disable=SC2086
nohup nice -n 10 "$JAVA_BIN" $JAVA_OPTS -jar "$JAR_FILE" \
    > "$LOG_DIR/app.log" 2>&1 &
NEW_PID=$!
echo "Application started with PID: $NEW_PID"

# Record the PID for the control script.
echo "$NEW_PID" > "$APP_HOME/vessel-batch.pid"

# Poll the log for the Spring startup banner (up to 30 seconds).
echo "Waiting for application startup..."
STARTUP_SUCCESS=false
for i in {1..30}; do
    if grep -q "Started SignalBatchApplication" "$LOG_DIR/app.log" 2>/dev/null; then
        echo "✅ Application started successfully!"
        STARTUP_SUCCESS=true
        break
    fi
    echo -n "."
    sleep 1
done

if [ "$STARTUP_SUCCESS" = false ]; then
    echo ""
    echo "⚠️ Application startup timeout. Check logs for errors."
    echo "Log file: $LOG_DIR/app.log"
    tail -20 "$LOG_DIR/app.log"
fi

echo ""
echo "================================================"
echo "Deployment Complete!"
echo "- PID: $NEW_PID"
echo "- PID File: $APP_HOME/vessel-batch.pid"
echo "- Log: $LOG_DIR/app.log"
echo "- Monitor: tail -f $LOG_DIR/app.log"
echo "================================================"

# Give the application a moment, then probe the actuator health endpoint.
sleep 5
echo ""
echo "Initial Status Check:"
curl -s http://localhost:18090/actuator/health 2>/dev/null | python3 -m json.tool || echo "Health endpoint not yet available"

# Resource snapshot — ps -fp avoids the "ps aux | grep" self-match problem.
echo ""
echo "Resource Usage:"
ps -fp "$NEW_PID"

echo ""
echo "Useful Commands:"
echo "- Stop: kill -15 \$(cat $APP_HOME/vessel-batch.pid)"
echo "- Logs: tail -f $LOG_DIR/app.log"
echo "- Status: curl http://localhost:18090/actuator/health"
echo "- Monitor: $APP_HOME/monitor-query-server.sh"

파일 보기

@ -20,10 +20,10 @@ public class AsyncConfig implements AsyncConfigurer {
@Bean(name = "trackStreamingExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(15);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(500);
executor.setKeepAliveSeconds(40);
executor.setCorePoolSize(40);
executor.setMaxPoolSize(120);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(30);
executor.setThreadNamePrefix("track-stream-");
// executor.setTaskDecorator(new MdcTaskDecorator());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

파일 보기

@ -16,13 +16,14 @@ public class WebSocketProperties {
private QueryProperties query = new QueryProperties();
private TransportProperties transport = new TransportProperties();
private BackpressureProperties backpressure = new BackpressureProperties();
private SessionProperties session = new SessionProperties();
@Data
public static class QueryProperties {
/** 서버 전체 동시 실행 쿼리 상한 */
private int maxConcurrentGlobal = 30;
private int maxConcurrentGlobal = 60;
/** 세션당 동시 쿼리 상한 */
private int maxPerSession = 3;
private int maxPerSession = 20;
/** 글로벌 대기 큐 타임아웃 (초) */
private int queueTimeoutSeconds = 30;
}
@ -49,4 +50,18 @@ public class WebSocketProperties {
/** 메시지당 최소 크기 (KB) */
private int minMessageSizeKb = 256;
}
@Data
public static class SessionProperties {
/** 세션 유휴 타임아웃 (밀리초) */
private long idleTimeoutMs = 15000;
/** 서버 하트비트 간격 (밀리초) */
private long serverHeartbeatMs = 5000;
/** 클라이언트 하트비트 간격 (밀리초) */
private long clientHeartbeatMs = 5000;
/** SockJS disconnect delay (밀리초) */
private long sockjsDisconnectDelayMs = 5000;
/** 메시지 전송 시간 제한 (초) */
private int sendTimeLimitSeconds = 30;
}
}

파일 보기

@ -37,13 +37,14 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
private final TrackQueryInterceptor trackQueryInterceptor;
private final WebSocketUsageLoggingInterceptor webSocketUsageLoggingInterceptor;
private final ObjectMapper objectMapper; // JacksonConfig에서 생성된 ObjectMapper 주입
private final WebSocketProperties webSocketProperties;
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(256 * 1024 * 1024); // 256MB로 증가
container.setMaxBinaryMessageBufferSize(256 * 1024 * 1024); // 256MB로 증가
container.setMaxSessionIdleTimeout(60000L);
container.setMaxSessionIdleTimeout(webSocketProperties.getSession().getIdleTimeoutMs());
return container;
}
@ -64,7 +65,7 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
.setClientLibraryUrl("/static/libs/js/sockjs.min.js")
.setStreamBytesLimit(100 * 1024 * 1024) // 100MB로 증가
.setHttpMessageCacheSize(1000)
.setDisconnectDelay(30 * 1000)
.setDisconnectDelay(webSocketProperties.getSession().getSockjsDisconnectDelayMs())
.setWebSocketEnabled(true)
.setSuppressCors(false);
}
@ -74,7 +75,10 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
// 인메모리 메시지 브로커 설정
registry.enableSimpleBroker("/topic", "/queue")
.setTaskScheduler(messageBrokerTaskScheduler())
.setHeartbeatValue(new long[]{10000, 10000}); // 서버/클라이언트 하트비트
.setHeartbeatValue(new long[]{
webSocketProperties.getSession().getServerHeartbeatMs(),
webSocketProperties.getSession().getClientHeartbeatMs()
});
// 애플리케이션 destination prefix
registry.setApplicationDestinationPrefixes("/app");
@ -89,31 +93,33 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration
.setMessageSizeLimit(50 * 1024 * 1024) // 50MB로 증가
.setSendBufferSizeLimit(256 * 1024 * 1024) // 256MB로 증가
.setSendTimeLimit(120 * 1000) // 120초로 증가
.setTimeToFirstMessage(30 * 1000); // 메시지까지 30초
.setMessageSizeLimit(webSocketProperties.getTransport().getMessageSizeLimitMb() * 1024 * 1024)
.setSendBufferSizeLimit(webSocketProperties.getTransport().getSendBufferSizeLimitMb() * 1024 * 1024)
.setSendTimeLimit(webSocketProperties.getSession().getSendTimeLimitSeconds() * 1000)
.setTimeToFirstMessage(30 * 1000);
// 에러 핸들러 등록은 별도로 처리
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
WebSocketProperties.TransportProperties tp = webSocketProperties.getTransport();
registration
.interceptors(webSocketUsageLoggingInterceptor, trackQueryInterceptor)
.taskExecutor()
.corePoolSize(10)
.maxPoolSize(20)
.queueCapacity(100);
.corePoolSize(tp.getInboundCorePoolSize())
.maxPoolSize(tp.getInboundMaxPoolSize())
.queueCapacity(tp.getInboundQueueCapacity());
}
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
WebSocketProperties.TransportProperties tp = webSocketProperties.getTransport();
registration
.taskExecutor()
.corePoolSize(20) // 10 -> 20로 증가
.maxPoolSize(40) // 20 -> 40로 증가
.queueCapacity(5000); // 1000 -> 5000로 증가
.corePoolSize(tp.getOutboundCorePoolSize())
.maxPoolSize(tp.getOutboundMaxPoolSize())
.queueCapacity(tp.getOutboundQueueCapacity());
}
@Override

파일 보기

@ -9,7 +9,16 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class QueryStatusUpdate {
private String queryId;
private String status; // STARTED, PROCESSING, COMPLETED, CANCELLED, ERROR
private String status; // STARTED, PROCESSING, COMPLETED, CANCELLED, ERROR, QUEUED
private String message;
private Double progressPercentage;
private Integer queuePosition;
private Integer totalInQueue;
public QueryStatusUpdate(String queryId, String status, String message, Double progressPercentage) {
this.queryId = queryId;
this.status = status;
this.message = message;
this.progressPercentage = progressPercentage;
}
}

파일 보기

@ -7,14 +7,13 @@ import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* WebSocket 세션별 진행 중인 쿼리 추적 글로벌 동시 쿼리 제한 관리
* Phase 5: 대기열 기반 관리 (거부 대신 순번 대기)
*/
@Slf4j
@Service
@ -27,7 +26,10 @@ public class ActiveQueryManager {
private Semaphore globalQuerySemaphore;
private final AtomicInteger waitingCount = new AtomicInteger(0);
@Value("${websocket.query.max-concurrent-global:30}")
// 대기열 추적 (FIFO 순서 보장)
private final ConcurrentLinkedQueue<String> waitingQueue = new ConcurrentLinkedQueue<>();
@Value("${websocket.query.max-concurrent-global:60}")
private int maxConcurrentGlobal;
@Value("${websocket.query.queue-timeout-seconds:30}")
@ -156,7 +158,20 @@ public class ActiveQueryManager {
// 글로벌 동시 쿼리 제한
/**
* 글로벌 쿼리 슬롯 획득 (대기 포함)
* 글로벌 쿼리 슬롯 획득 (즉시 시도, 대기 없음)
* @return true: 슬롯 획득 성공, false: 슬롯 없음 (대기열 진입 필요)
*/
public boolean tryAcquireQuerySlotImmediate(String queryId) {
    // Non-blocking attempt: either we get a permit now, or the caller queues.
    final boolean gotSlot = globalQuerySemaphore.tryAcquire();
    if (gotSlot) {
        log.info("Query {} acquired global slot immediately: active={}/{}",
                queryId, getGlobalActiveQueryCount(), maxConcurrentGlobal);
    }
    return gotSlot;
}
/**
* 글로벌 쿼리 슬롯 획득 (대기 큐에서 FIFO 순서로 대기) - 기존 호환
* @return true: 슬롯 획득 성공, false: 타임아웃으로 획득 실패
*/
public boolean tryAcquireQuerySlot(String queryId) throws InterruptedException {
@ -179,6 +194,65 @@ public class ActiveQueryManager {
}
}
/**
 * Wait for a global query slot, polling the semaphore in 2-second intervals
 * while the query sits in the FIFO waiting queue. Between polls the caller
 * can read {@code getQueuePosition(queryId)} and push QUEUED status updates
 * to the client.
 *
 * @param queryId        query identifier used to track queue position
 * @param maxWaitSeconds maximum total wait time in seconds
 * @return true if a slot was acquired, false if the deadline expired
 * @throws InterruptedException if the waiting thread is interrupted
 */
public boolean waitForSlotWithQueue(String queryId, int maxWaitSeconds) throws InterruptedException {
    waitingQueue.offer(queryId);
    waitingCount.incrementAndGet();
    try {
        long deadline = System.currentTimeMillis() + (maxWaitSeconds * 1000L);
        while (System.currentTimeMillis() < deadline) {
            // Bounded 2s poll so the caller-side loop regains control
            // periodically to emit QUEUED progress updates.
            if (globalQuerySemaphore.tryAcquire(2, TimeUnit.SECONDS)) {
                log.info("Query {} acquired slot after queuing: active={}/{}",
                        queryId, getGlobalActiveQueryCount(), maxConcurrentGlobal);
                return true;
            }
        }
        log.warn("Query {} timed out in queue after {}s", queryId, maxWaitSeconds);
        return false;
    } finally {
        // Remove here (not on the success/timeout paths) so an
        // InterruptedException thrown by tryAcquire cannot leak a stale
        // entry that would inflate every later caller's queue position.
        waitingQueue.remove(queryId);
        waitingCount.decrementAndGet();
    }
}
/**
 * 1-based position of the given query in the waiting queue.
 *
 * @param queryId query identifier to look up
 * @return the query's position (1 = next in line), or 0 if it is not queued
 */
public int getQueuePosition(String queryId) {
    int index = 0;
    // Weakly-consistent iteration over the concurrent queue, front to back.
    for (String waitingId : waitingQueue) {
        index++;
        if (waitingId.equals(queryId)) {
            return index;
        }
    }
    return 0;
}
/**
 * Current number of queries waiting for a slot.
 */
public int getQueueSize() {
    return waitingQueue.size();
}
/**
 * Direct access to the waiting queue so services can offer/remove entries
 * themselves.
 * NOTE(review): exposing the internal queue leaks mutable state — callers
 * can bypass waitingCount bookkeeping; consider dedicated offer/remove
 * methods instead.
 */
public ConcurrentLinkedQueue<String> getWaitingQueue() {
    return waitingQueue;
}
/**
* 글로벌 쿼리 슬롯 반환
*/

파일 보기

@ -56,6 +56,7 @@ public class ChunkedTrackStreamingService {
private final ActiveQueryManager activeQueryManager;
private final IntegrationVesselService integrationVesselService;
private final TrackQueryInterceptor trackQueryInterceptor;
private final DailyTrackCacheManager dailyTrackCacheManager;
private final WKTReader wktReader = new WKTReader();
@SuppressWarnings("unused")
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@ -95,13 +96,15 @@ public class ChunkedTrackStreamingService {
TrackSimplificationStrategy simplificationStrategy,
ActiveQueryManager activeQueryManager,
IntegrationVesselService integrationVesselService,
TrackQueryInterceptor trackQueryInterceptor) {
TrackQueryInterceptor trackQueryInterceptor,
DailyTrackCacheManager dailyTrackCacheManager) {
this.queryJdbcTemplate = queryJdbcTemplate;
this.queryDataSource = queryDataSource;
this.simplificationStrategy = simplificationStrategy;
this.activeQueryManager = activeQueryManager;
this.integrationVesselService = integrationVesselService;
this.trackQueryInterceptor = trackQueryInterceptor;
this.dailyTrackCacheManager = dailyTrackCacheManager;
}
/**
@ -730,12 +733,20 @@ public class ChunkedTrackStreamingService {
List<TimeRange> ranges = strategyMap.get(strategy);
log.info("Processing {} strategy with {} ranges", strategy, ranges.size());
// Daily 테이블의 경우 선박 기준 페이지네이션으로 처리 (누락 방지)
// Daily 테이블의 경우 캐시 우선 페이지네이션 폴백
if (strategy == TableStrategy.DAILY) {
for (TimeRange range : ranges) {
try {
// 선박 기준 페이지네이션으로 모든 데이터 수집 병합
List<CompactVesselTrack> compactTracks = processDailyTableWithPagination(request, range, null, viewportVesselIds);
List<CompactVesselTrack> compactTracks;
LocalDate rangeDate = range.getStart().toLocalDate();
// 캐시 히트 메모리에서 가져옴
if (dailyTrackCacheManager.isEnabled() && dailyTrackCacheManager.isCached(rangeDate)) {
compactTracks = dailyTrackCacheManager.getCachedTracks(rangeDate);
log.info("Daily cache HIT for {} in processQueryInChunks: {} tracks", rangeDate, compactTracks.size());
} else {
compactTracks = processDailyTableWithPagination(request, range, null, viewportVesselIds);
}
if (!compactTracks.isEmpty()) {
// 메시지 크기로 분할
@ -847,17 +858,46 @@ public class ChunkedTrackStreamingService {
Consumer<QueryStatusUpdate> statusConsumer) {
boolean slotAcquired = false;
try {
// 글로벌 동시 쿼리 슬롯 획득 (대기 큐에서 FIFO 순서로 대기)
slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId);
// 글로벌 동시 쿼리 슬롯 즉시 획득 시도
slotAcquired = activeQueryManager.tryAcquireQuerySlotImmediate(queryId);
if (!slotAcquired) {
log.warn("Query {} rejected: global concurrent limit reached", queryId);
// 대기열 진입 2초 간격으로 QUEUED 상태 전송하며 슬롯 대기
log.info("Query {} entering wait queue: active={}/{}", queryId,
activeQueryManager.getGlobalActiveQueryCount(), activeQueryManager.getMaxConcurrentGlobal());
int maxWaitSeconds = 120; // 최대 2분 대기
long deadline = System.currentTimeMillis() + (maxWaitSeconds * 1000L);
activeQueryManager.getWaitingQueue().offer(queryId);
try {
while (System.currentTimeMillis() < deadline) {
slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId);
if (slotAcquired) break;
// QUEUED 상태 전송
int position = activeQueryManager.getQueuePosition(queryId);
int totalInQueue = activeQueryManager.getQueueSize();
QueryStatusUpdate queueStatus = new QueryStatusUpdate(
queryId, "QUEUED",
String.format("Queue position: %d/%d. Waiting for available slot...", position, totalInQueue),
0.0
);
queueStatus.setQueuePosition(position);
queueStatus.setTotalInQueue(totalInQueue);
statusConsumer.accept(queueStatus);
}
} finally {
activeQueryManager.getWaitingQueue().remove(queryId);
}
if (!slotAcquired) {
log.warn("Query {} timed out in queue after {}s", queryId, maxWaitSeconds);
statusConsumer.accept(new QueryStatusUpdate(
queryId, "ERROR",
"Server is busy. Maximum concurrent queries reached. Please retry later.",
"Server is busy. Queue timeout exceeded. Please retry later.",
0.0
));
return;
}
}
// 취소 플래그 등록
AtomicBoolean cancelFlag = new AtomicBoolean(false);
@ -2328,7 +2368,7 @@ public class ChunkedTrackStreamingService {
}
/**
* Daily 전략 처리 - 페이지별 즉시 스트리밍 (타임아웃 방지)
* Daily 전략 처리 - 캐시 우선 조회 + 페이지별 즉시 스트리밍 (타임아웃 방지)
*/
private void processDailyStrategy(List<TimeRange> ranges, TrackQueryRequest request, String queryId,
Consumer<TrackChunkResponse> chunkConsumer,
@ -2344,13 +2384,74 @@ public class ChunkedTrackStreamingService {
log.info("Session vessel cache initialized for {} date ranges", ranges.size());
for (TimeRange range : ranges) {
// 페이지별 즉시 스트리밍으로 처리 (세션 캐시 공유)
LocalDate rangeDate = range.getStart().toLocalDate();
// 캐시 히트 체크: 해당 날짜가 인메모리 캐시에 있으면 DB 조회 생략
if (dailyTrackCacheManager.isEnabled() && dailyTrackCacheManager.isCached(rangeDate)) {
log.info("Daily cache HIT for {}: serving from memory", rangeDate);
List<CompactVesselTrack> cachedTracks;
// 뷰포트 필터링 적용
if (request.getViewport() != null) {
ViewportFilter vp = request.getViewport();
cachedTracks = dailyTrackCacheManager.getCachedTracks(
rangeDate, vp.getMinLon(), vp.getMinLat(), vp.getMaxLon(), vp.getMaxLat());
} else {
cachedTracks = dailyTrackCacheManager.getCachedTracks(rangeDate);
}
if (!cachedTracks.isEmpty()) {
// viewportVesselIds 필터 적용
if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) {
cachedTracks = cachedTracks.stream()
.filter(t -> viewportVesselIds.contains(t.getVesselId()))
.collect(Collectors.toList());
}
// 메시지 크기로 분할하여 전송
List<List<CompactVesselTrack>> batches = splitByMessageSize(cachedTracks);
for (List<CompactVesselTrack> batch : batches) {
sendChunkResponse(batch, queryId, chunkConsumer, uniqueVesselIds);
}
log.info("Daily cache served {} tracks for {} in {} batches",
cachedTracks.size(), rangeDate, batches.size());
}
} else {
// 캐시 미스: 기존 DB 페이지네이션으로 처리
if (dailyTrackCacheManager.isEnabled()) {
log.info("Daily cache MISS for {}: falling back to DB", rangeDate);
}
streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache, viewportVesselIds);
}
}
log.info("Session vessel cache final size: {} vessels cached", sessionVesselCache.size());
}
/**
 * Helper that wraps one batch of cached tracks in a TrackChunkResponse and
 * hands it to the chunk consumer, updating streaming bookkeeping.
 *
 * @param batch           tracks to send in this chunk
 * @param queryId         id of the owning query, echoed in the response
 * @param chunkConsumer   sink that delivers the chunk to the client
 * @param uniqueVesselIds accumulator of vessel ids seen so far (mutated here)
 */
private void sendChunkResponse(List<CompactVesselTrack> batch, String queryId,
Consumer<TrackChunkResponse> chunkConsumer,
Set<String> uniqueVesselIds) {
TrackChunkResponse response = new TrackChunkResponse();
response.setQueryId(queryId);
// NOTE(review): currentGlobalChunkIndex++ is a non-atomic read-modify-write;
// confirm this method is only ever invoked from a single streaming thread.
response.setChunkIndex(currentGlobalChunkIndex++);
response.setIsLastChunk(false);
// -1 presumably signals "total chunk count unknown" — TODO confirm the
// client treats -1 that way.
response.setTotalChunks(-1);
response.setCompactTracks(batch);
// Sum of processed counts across all time ranges (units presumably minutes,
// per the processedTimeRanges name — verify against createChunkStats).
int processedMin = processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum();
response.setStats(createChunkStats(batch, uniqueVesselIds, processedMin));
chunkConsumer.accept(response);
// Record every vessel in this batch as "seen" for dedup/stats purposes.
batch.forEach(track -> uniqueVesselIds.add(track.getVesselId()));
// Backpressure accounting: grow the pending-buffer estimate by the
// estimated serialized size of this batch.
int batchSize = batch.stream().mapToInt(this::estimateTrackSize).sum();
pendingBufferSize.addAndGet(batchSize);
}
/**
* Daily 테이블 페이지별 즉시 스트리밍 - 페이지 완료 즉시 청크 전송
*/

파일 보기

@ -20,6 +20,7 @@ import org.springframework.stereotype.Service;
import javax.sql.DataSource;
import java.sql.*;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
@ -43,6 +44,7 @@ public class StompTrackStreamingService {
private final ActiveQueryManager activeQueryManager;
private final TrackQueryInterceptor trackQueryInterceptor;
private final DailyTrackCacheManager dailyTrackCacheManager;
// 활성 쿼리 관리
private final Map<String, QueryContext> activeQueries = new ConcurrentHashMap<>();
@ -66,7 +68,8 @@ public class StompTrackStreamingService {
TrackSimplificationStrategy simplificationStrategy,
VesselTrackMerger vesselTrackMerger,
ActiveQueryManager activeQueryManager,
TrackQueryInterceptor trackQueryInterceptor) {
TrackQueryInterceptor trackQueryInterceptor,
DailyTrackCacheManager dailyTrackCacheManager) {
this.queryDataSource = queryDataSource;
this.queryJdbcTemplate = queryJdbcTemplate;
this.areaBoundaryCache = areaBoundaryCache;
@ -76,6 +79,7 @@ public class StompTrackStreamingService {
this.vesselTrackMerger = vesselTrackMerger;
this.activeQueryManager = activeQueryManager;
this.trackQueryInterceptor = trackQueryInterceptor;
this.dailyTrackCacheManager = dailyTrackCacheManager;
}
@Async
@ -95,17 +99,46 @@ public class StompTrackStreamingService {
boolean slotAcquired = false;
try {
// 글로벌 동시 쿼리 슬롯 획득 (대기 큐에서 FIFO 순서로 대기)
slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId);
// 글로벌 동시 쿼리 슬롯 즉시 획득 시도
slotAcquired = activeQueryManager.tryAcquireQuerySlotImmediate(queryId);
if (!slotAcquired) {
log.warn("Query {} rejected: global concurrent limit reached", queryId);
// 대기열 진입 2초 간격으로 QUEUED 상태 전송하며 슬롯 대기
log.info("Query {} entering wait queue: active={}/{}", queryId,
activeQueryManager.getGlobalActiveQueryCount(), activeQueryManager.getMaxConcurrentGlobal());
int maxWaitSeconds = 120; // 최대 2분 대기
long deadline = System.currentTimeMillis() + (maxWaitSeconds * 1000L);
activeQueryManager.getWaitingQueue().offer(queryId);
try {
while (System.currentTimeMillis() < deadline) {
slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId);
if (slotAcquired) break;
// QUEUED 상태 전송
int position = activeQueryManager.getQueuePosition(queryId);
int totalInQueue = activeQueryManager.getQueueSize();
QueryStatusUpdate queueStatus = new QueryStatusUpdate(
queryId, "QUEUED",
String.format("Queue position: %d/%d. Waiting for available slot...", position, totalInQueue),
0.0
);
queueStatus.setQueuePosition(position);
queueStatus.setTotalInQueue(totalInQueue);
statusConsumer.accept(queueStatus);
}
} finally {
activeQueryManager.getWaitingQueue().remove(queryId);
}
if (!slotAcquired) {
log.warn("Query {} timed out in queue after {}s", queryId, maxWaitSeconds);
statusConsumer.accept(new QueryStatusUpdate(
queryId, "ERROR",
"Server is busy. Maximum concurrent queries reached. Please retry later.",
"Server is busy. Queue timeout exceeded. Please retry later.",
0.0
));
return;
}
}
// 입력 검증
validateRequest(request);
@ -439,13 +472,40 @@ public class StompTrackStreamingService {
resultQueue, processedTracks, chunkIndex, totalTracks, filteredVessels);
}
// 일별 데이터 처리 (DAILY)
// 일별 데이터 처리 (DAILY) - 캐시 우선 조회
if (strategyMap.containsKey(TableStrategy.DAILY)) {
log.info("Processing daily data...");
processSingleStrategy(request, TableStrategy.DAILY,
strategyMap.get(TableStrategy.DAILY), queryId,
List<TimeRange> dailyRanges = strategyMap.get(TableStrategy.DAILY);
List<TimeRange> dbRanges = new ArrayList<>();
for (TimeRange range : dailyRanges) {
LocalDate rangeDate = range.getStart().toLocalDate();
if (dailyTrackCacheManager.isEnabled() && dailyTrackCacheManager.isCached(rangeDate)) {
// 캐시 히트: 메모리에서 가져와서 직접 큐에 넣기
log.info("Daily cache HIT for {} in StompStreaming", rangeDate);
List<gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack> cachedTracks =
dailyTrackCacheManager.getCachedTracks(rangeDate);
if (!cachedTracks.isEmpty()) {
TrackChunkResponse chunk = new TrackChunkResponse();
chunk.setQueryId(queryId);
chunk.setChunkIndex(chunkIndex.getAndIncrement());
chunk.setIsLastChunk(false);
chunk.setCompactTracks(cachedTracks);
processedTracks.addAndGet(cachedTracks.size());
resultQueue.put(chunk);
}
} else {
dbRanges.add(range);
}
}
// 캐시 미스 범위만 DB 조회
if (!dbRanges.isEmpty()) {
processSingleStrategy(request, TableStrategy.DAILY, dbRanges, queryId,
resultQueue, processedTracks, chunkIndex, totalTracks, filteredVessels);
}
}
} finally {
// 모든 데이터 처리 완료 신호

파일 보기

@ -42,8 +42,8 @@ spring:
connection-timeout: 5000
idle-timeout: 600000
max-lifetime: 1800000
maximum-pool-size: 120 # 60 -> 120 (총 250 중 48%, WebSocket 스트리밍 + REST API 주 사용)
minimum-idle: 20 # 10 -> 20
maximum-pool-size: 180 # 120 -> 180 (WebSocket 대기열 + REST API 주 사용)
minimum-idle: 30 # 20 -> 30
connection-test-query: SELECT 1
validation-timeout: 5000
leak-detection-threshold: 60000 # 커넥션 누수 감지 (60초)
@ -262,12 +262,26 @@ vessel: # spring 하위가 아닌 최상위 레벨
t_abnormal_tracks:
retention-months: 0 # 비정상 항적: 무한 보관
# 일일 항적 데이터 인메모리 캐시
cache:
daily-track:
enabled: true
retention-days: 7 # D-1 ~ D-7 (오늘 제외)
max-memory-gb: 5 # 최대 5GB
warmup-async: true # 비동기 워밍업 (서버 시작 차단 없음)
# WebSocket 부하 제어 설정
websocket:
query:
max-concurrent-global: 30 # 서버 전체 동시 실행 쿼리 상한 (Query풀 120 / 쿼리당 ~3커넥션)
max-per-session: 3 # 세션당 동시 쿼리 상한
max-concurrent-global: 60 # 서버 전체 동시 실행 쿼리 상한 (Query풀 180 / 쿼리당 ~3커넥션)
max-per-session: 20 # 세션당 동시 쿼리 상한 (대기열 방식이므로 넉넉하게)
queue-timeout-seconds: 30 # 글로벌 대기 큐 타임아웃
session:
idle-timeout-ms: 15000 # 세션 유휴 타임아웃 15초 (60s → 15s)
server-heartbeat-ms: 5000 # 서버 하트비트 5초 (10s → 5s)
client-heartbeat-ms: 5000 # 클라이언트 하트비트 5초 (10s → 5s)
sockjs-disconnect-delay-ms: 5000 # SockJS 해제 지연 5초 (30s → 5s)
send-time-limit-seconds: 30 # 메시지 전송 시간 제한 30초 (120s → 30s)
# 액추에이터 설정
management:

215
vessel-batch-control.sh Normal file
파일 보기

@ -0,0 +1,215 @@
#!/bin/bash
# Vessel Batch control script.
# Provides basic lifecycle management: start, stop, restart, status, plus
# log/error/statistics inspection helpers.

# Application layout
APP_HOME="/devdata/apps/bridge-db-monitoring"
JAR_FILE="$APP_HOME/vessel-batch-aggregation.jar"
PID_FILE="$APP_HOME/vessel-batch.pid"
LOG_DIR="$APP_HOME/logs"

# Pinned Java 17 installation.
# NOTE(review): JAVA_BIN is not referenced elsewhere in this script — it may
# be vestigial, kept for parity with run-on-query-server-dev.sh.
JAVA_HOME="/devdata/apps/jdk-17.0.8"
JAVA_BIN="$JAVA_HOME/bin/java"

# ANSI color codes for status output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
# 함수: PID 확인
# Resolve the running application's PID.
# Prefers the PID file (removing it if stale); falls back to a process
# lookup. Echoes the PID on stdout, or nothing if the app is not running.
get_pid() {
    local pid
    if [ -f "$PID_FILE" ]; then
        pid=$(cat "$PID_FILE")
        if kill -0 "$pid" 2>/dev/null; then
            echo "$pid"
        else
            # Stale PID file: the process is gone, so clean it up.
            rm -f "$PID_FILE"
            echo ""
        fi
    else
        # Fall back to process lookup. -n keeps only the newest match so
        # callers always receive a single PID (the original could emit
        # several, which broke kill/ps invocations downstream).
        pid=$(pgrep -n -f "$JAR_FILE")
        echo "$pid"
    fi
}
# 함수: 상태 확인
# Report whether the app is running; if so, show process details, the Spring
# actuator health endpoint, and data-processing lag from the database.
# Returns 0 when running, 1 otherwise.
status() {
    local pid
    pid=$(get_pid)
    if [ -n "$pid" ]; then
        echo -e "${GREEN}✓ Vessel Batch is running (PID: $pid)${NC}"
        # Process details — ps -fp avoids the "ps aux | grep PID" substring
        # and self-match problems.
        echo ""
        ps -fp "$pid"
        # Spring actuator health endpoint (pretty-printed when python3 exists)
        echo ""
        echo "Health Check:"
        curl -s http://localhost:18090/actuator/health 2>/dev/null | python3 -m json.tool || echo "Health endpoint not available"
        # Data-processing lag, only if the psql client is available
        echo ""
        echo "Processing Status:"
        if command -v psql >/dev/null 2>&1; then
            psql -h localhost -U mda -d mdadb -c "
            SELECT
                NOW() - MAX(last_update) as processing_delay,
                COUNT(*) as vessel_count
            FROM signal.t_vessel_latest_position;" 2>/dev/null || echo "Unable to query database"
        fi
        return 0
    else
        echo -e "${RED}✗ Vessel Batch is not running${NC}"
        return 1
    fi
}
# 함수: 시작
# Start the application via the optimized launch script.
# No-op (returns 1) when an instance is already running.
start() {
    local pid
    pid=$(get_pid)
    if [ -n "$pid" ]; then
        echo -e "${YELLOW}Vessel Batch is already running (PID: $pid)${NC}"
        return 1
    fi
    echo "Starting Vessel Batch..."
    # Fail fast if the app directory is missing instead of launching from $PWD.
    cd "$APP_HOME" || return 1
    "$APP_HOME/run-on-query-server-dev.sh"
}
# 함수: 중지
# Gracefully stop the application with SIGTERM, escalating to SIGKILL if it
# has not exited within 30 seconds. Returns 1 when nothing is running.
stop() {
    local pid
    pid=$(get_pid)
    if [ -z "$pid" ]; then
        echo -e "${YELLOW}Vessel Batch is not running${NC}"
        return 1
    fi
    echo "Stopping Vessel Batch (PID: $pid)..."
    kill -15 "$pid"
    # Poll up to 30 seconds for a clean shutdown.
    local i
    for i in {1..30}; do
        if ! kill -0 "$pid" 2>/dev/null; then
            echo -e "${GREEN}✓ Vessel Batch stopped successfully${NC}"
            rm -f "$PID_FILE"
            return 0
        fi
        echo -n "."
        sleep 1
    done
    echo ""
    echo -e "${RED}Process did not stop gracefully, force killing...${NC}"
    kill -9 "$pid"
    rm -f "$PID_FILE"
}

# Restart = stop, brief pause to let resources release, then start.
restart() {
    echo "Restarting Vessel Batch..."
    stop
    sleep 3
    start
}
# 함수: 로그 보기
# List available log files, then tail the main application log.
# Returns 1 when the log directory does not exist.
logs() {
    if [ ! -d "$LOG_DIR" ]; then
        echo "Log directory not found: $LOG_DIR"
        return 1
    fi
    echo "Available log files:"
    ls -lh "$LOG_DIR"/*.log 2>/dev/null
    echo ""
    echo "Tailing app.log (Ctrl+C to exit)..."
    tail -f "$LOG_DIR/app.log"
}
# 함수: 최근 에러 확인
# Show recent ERROR lines from the application log plus a count summary.
# Returns 1 when the log file does not exist.
errors() {
    if [ ! -f "$LOG_DIR/app.log" ]; then
        echo "Log file not found: $LOG_DIR/app.log"
        return 1
    fi
    echo "Recent errors (last 50 lines with ERROR):"
    grep "ERROR" "$LOG_DIR/app.log" | tail -50
    echo ""
    echo "Error summary:"
    echo "Total errors: $(grep -c "ERROR" "$LOG_DIR/app.log")"
    echo "Errors today: $(grep "ERROR" "$LOG_DIR/app.log" | grep "$(date +%Y-%m-%d)" | wc -l)"
}
# 함수: 성능 통계
# Show recent resource-usage samples (when the monitor CSV exists) and batch
# job statistics from the database (when the psql client is available).
stats() {
    echo "Performance Statistics"
    echo "===================="
    if [ -f "$LOG_DIR/resource-monitor.csv" ]; then
        echo "Recent resource usage:"
        tail -5 "$LOG_DIR/resource-monitor.csv" | column -t -s,
    fi
    echo ""
    echo "Batch job statistics:"
    if command -v psql >/dev/null 2>&1; then
        psql -h localhost -U mda -d mdadb -c "
        SELECT
            job_name,
            COUNT(*) as executions,
            AVG(EXTRACT(EPOCH FROM (end_time - start_time))/60)::numeric(10,2) as avg_duration_min,
            MAX(end_time) as last_execution
        FROM batch_job_execution je
        JOIN batch_job_instance ji ON je.job_instance_id = ji.job_instance_id
        WHERE end_time > CURRENT_DATE - INTERVAL '7 days'
        GROUP BY job_name;" 2>/dev/null || echo "Unable to query batch statistics"
    fi
}
# 메인 로직
case "$1" in
start)
start
;;
stop)
stop
;;
restart)
restart
;;
status)
status
;;
logs)
logs
;;
errors)
errors
;;
stats)
stats
;;
*)
echo "Usage: $0 {start|stop|restart|status|logs|errors|stats}"
echo ""
echo "Commands:"
echo " start - Start the Vessel Batch application"
echo " stop - Stop the Vessel Batch application"
echo " restart - Restart the Vessel Batch application"
echo " status - Check application status and health"
echo " logs - Tail application logs"
echo " errors - Show recent errors from logs"
echo " stats - Show performance statistics"
exit 1
;;
esac
exit $?