feat: 대기열 기반 쿼리 관리 및 타임아웃 최적화 (Phase 5)
- 거부 대신 대기열 순번 안내: QUEUED 상태 2초 간격 전송 (최대 2분) - QueryStatusUpdate에 queuePosition, totalInQueue 필드 추가 - ActiveQueryManager: ConcurrentLinkedQueue 기반 대기열 추적 - WebSocketProperties에 SessionProperties 추가 (타임아웃/하트비트 설정) - WebSocketStompConfig: 하드코딩 → Properties 주입으로 전환 - application-prod.yml: Query풀 180, global 60, idle 15s, heartbeat 5s - AsyncConfig: core 40, max 120, queue 100 (대기열 수용) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
부모
60366816a6
커밋
7bd7bf556e
184
run-on-query-server-dev.sh
Normal file
184
run-on-query-server-dev.sh
Normal file
@ -0,0 +1,184 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Query DB 서버에서 최적화된 실행 스크립트
|
||||
# Rocky Linux 환경에 맞춰 조정됨
|
||||
# Java 17 경로 명시적 지정
|
||||
|
||||
# 애플리케이션 경로
|
||||
APP_HOME="/devdata/apps/bridge-db-monitoring"
|
||||
JAR_FILE="$APP_HOME/vessel-batch-aggregation.jar"
|
||||
|
||||
# Java 17 경로
|
||||
JAVA_HOME="/devdata/apps/jdk-17.0.8"
|
||||
JAVA_BIN="$JAVA_HOME/bin/java"
|
||||
|
||||
# 로그 디렉토리
|
||||
LOG_DIR="$APP_HOME/logs"
|
||||
mkdir -p $LOG_DIR
|
||||
|
||||
echo "================================================"
|
||||
echo "Vessel Batch Aggregation - Query Server Edition"
|
||||
echo "Start Time: $(date)"
|
||||
echo "================================================"
|
||||
|
||||
# 경로 확인
|
||||
echo "Environment Check:"
|
||||
echo "- App Home: $APP_HOME"
|
||||
echo "- JAR File: $JAR_FILE"
|
||||
echo "- Java Path: $JAVA_BIN"
|
||||
echo "- Java Version: $($JAVA_BIN -version 2>&1 | head -1)"
|
||||
|
||||
# JAR 파일 존재 확인
|
||||
if [ ! -f "$JAR_FILE" ]; then
|
||||
echo "ERROR: JAR file not found at $JAR_FILE"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Java 실행 파일 확인
|
||||
if [ ! -x "$JAVA_BIN" ]; then
|
||||
echo "ERROR: Java not found or not executable at $JAVA_BIN"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# 서버 정보 확인
|
||||
echo ""
|
||||
echo "Server Info:"
|
||||
echo "- Hostname: $(hostname)"
|
||||
echo "- CPU Cores: $(nproc)"
|
||||
echo "- Total Memory: $(free -h | grep Mem | awk '{print $2}')"
|
||||
echo "- PostgreSQL Version: $(psql --version 2>/dev/null | head -1 || echo 'PostgreSQL client not in PATH')"
|
||||
|
||||
# 환경 변수 설정 (localhost 최적화)
|
||||
export SPRING_PROFILES_ACTIVE=prod
|
||||
|
||||
# Query DB와 Batch Meta DB를 localhost로 오버라이드
|
||||
export SPRING_DATASOURCE_QUERY_JDBC_URL="jdbc:postgresql://10.188.171.182:5432/mdadb?options=-csearch_path=signal,public&assumeMinServerVersion=12&reWriteBatchedInserts=true"
|
||||
export SPRING_DATASOURCE_BATCH_JDBC_URL="jdbc:postgresql://10.188.171.182:5432/mdadb?currentSchema=public&assumeMinServerVersion=12&reWriteBatchedInserts=true"
|
||||
|
||||
# 서버 CPU 코어 수에 따른 병렬 처리 조정
|
||||
CPU_CORES=$(nproc)
|
||||
export VESSEL_BATCH_PARTITION_SIZE=$((CPU_CORES * 2))
|
||||
export VESSEL_BATCH_BULK_INSERT_PARALLEL_THREADS=$((CPU_CORES / 2))
|
||||
|
||||
echo ""
|
||||
echo "Optimized Settings:"
|
||||
echo "- Partition Size: $VESSEL_BATCH_PARTITION_SIZE"
|
||||
echo "- Parallel Threads: $VESSEL_BATCH_BULK_INSERT_PARALLEL_THREADS"
|
||||
echo "- Query DB: localhost (optimized)"
|
||||
echo "- Batch Meta DB: localhost (optimized)"
|
||||
|
||||
# JVM 옵션 (서버 메모리에 맞게 조정)
|
||||
TOTAL_MEM=$(free -g | grep Mem | awk '{print $2}')
|
||||
JVM_HEAP=$((TOTAL_MEM / 3)) # 전체 메모리의 25% 사용
|
||||
|
||||
# 최소 16GB, 최대 32GB로 제한
|
||||
if [ $JVM_HEAP -lt 8 ]; then
|
||||
JVM_HEAP=8
|
||||
elif [ $JVM_HEAP -gt 16 ]; then
|
||||
JVM_HEAP=16
|
||||
fi
|
||||
|
||||
JAVA_OPTS="-Xms${JVM_HEAP}g -Xmx${JVM_HEAP}g \
|
||||
-XX:+UseG1GC \
|
||||
-XX:MaxGCPauseMillis=200 \
|
||||
-XX:+UseStringDeduplication \
|
||||
-XX:+ParallelRefProcEnabled \
|
||||
-XX:ParallelGCThreads=$((CPU_CORES / 2)) \
|
||||
-XX:ConcGCThreads=$((CPU_CORES / 4)) \
|
||||
-XX:+HeapDumpOnOutOfMemoryError \
|
||||
-XX:HeapDumpPath=$LOG_DIR/heapdump.hprof \
|
||||
-Dfile.encoding=UTF-8 \
|
||||
-Duser.timezone=Asia/Seoul \
|
||||
-Djava.security.egd=file:/dev/./urandom \
|
||||
-Dspring.profiles.active=prod"
|
||||
|
||||
echo "- JVM Heap Size: ${JVM_HEAP}GB"
|
||||
|
||||
# 기존 프로세스 확인 및 종료
|
||||
echo ""
|
||||
echo "Checking for existing process..."
|
||||
PID=$(pgrep -f "$JAR_FILE")
|
||||
if [ ! -z "$PID" ]; then
|
||||
echo "Stopping existing process (PID: $PID)..."
|
||||
kill -15 $PID
|
||||
|
||||
# 프로세스 종료 대기 (최대 30초)
|
||||
for i in {1..30}; do
|
||||
if ! kill -0 $PID 2>/dev/null; then
|
||||
echo "Process stopped successfully."
|
||||
break
|
||||
fi
|
||||
if [ $i -eq 30 ]; then
|
||||
echo "Force killing process..."
|
||||
kill -9 $PID
|
||||
fi
|
||||
sleep 1
|
||||
done
|
||||
fi
|
||||
|
||||
# 작업 디렉토리로 이동
|
||||
cd $APP_HOME
|
||||
|
||||
# 애플리케이션 실행 (nice로 우선순위 조정)
|
||||
echo ""
|
||||
echo "Starting application with reduced priority..."
|
||||
echo "Command: nice -n 10 $JAVA_BIN $JAVA_OPTS -jar $JAR_FILE"
|
||||
echo ""
|
||||
|
||||
# nohup으로 백그라운드 실행
|
||||
nohup nice -n 10 $JAVA_BIN $JAVA_OPTS -jar $JAR_FILE \
|
||||
> $LOG_DIR/app.log 2>&1 &
|
||||
|
||||
NEW_PID=$!
|
||||
echo "Application started with PID: $NEW_PID"
|
||||
|
||||
# PID 파일 생성
|
||||
echo $NEW_PID > $APP_HOME/vessel-batch.pid
|
||||
|
||||
# 시작 확인 (30초 대기)
|
||||
echo "Waiting for application startup..."
|
||||
STARTUP_SUCCESS=false
|
||||
for i in {1..30}; do
|
||||
if grep -q "Started SignalBatchApplication" $LOG_DIR/app.log 2>/dev/null; then
|
||||
echo "✅ Application started successfully!"
|
||||
STARTUP_SUCCESS=true
|
||||
break
|
||||
fi
|
||||
echo -n "."
|
||||
sleep 1
|
||||
done
|
||||
|
||||
if [ "$STARTUP_SUCCESS" = false ]; then
|
||||
echo ""
|
||||
echo "⚠️ Application startup timeout. Check logs for errors."
|
||||
echo "Log file: $LOG_DIR/app.log"
|
||||
tail -20 $LOG_DIR/app.log
|
||||
fi
|
||||
|
||||
echo ""
|
||||
echo "================================================"
|
||||
echo "Deployment Complete!"
|
||||
echo "- PID: $NEW_PID"
|
||||
echo "- PID File: $APP_HOME/vessel-batch.pid"
|
||||
echo "- Log: $LOG_DIR/app.log"
|
||||
echo "- Monitor: tail -f $LOG_DIR/app.log"
|
||||
echo "================================================"
|
||||
|
||||
# 초기 상태 확인
|
||||
sleep 5
|
||||
echo ""
|
||||
echo "Initial Status Check:"
|
||||
curl -s http://localhost:18090/actuator/health 2>/dev/null | python -m json.tool || echo "Health endpoint not yet available"
|
||||
|
||||
# 리소스 사용량 표시
|
||||
echo ""
|
||||
echo "Resource Usage:"
|
||||
ps aux | grep $NEW_PID | grep -v grep
|
||||
|
||||
# 빠른 명령어 안내
|
||||
echo ""
|
||||
echo "Useful Commands:"
|
||||
echo "- Stop: kill -15 \$(cat $APP_HOME/vessel-batch.pid)"
|
||||
echo "- Logs: tail -f $LOG_DIR/app.log"
|
||||
echo "- Status: curl http://localhost:18090/actuator/health"
|
||||
echo "- Monitor: $APP_HOME/monitor-query-server.sh"
|
||||
@ -20,10 +20,10 @@ public class AsyncConfig implements AsyncConfigurer {
|
||||
@Bean(name = "trackStreamingExecutor")
|
||||
public Executor getAsyncExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(15);
|
||||
executor.setMaxPoolSize(30);
|
||||
executor.setQueueCapacity(500);
|
||||
executor.setKeepAliveSeconds(40);
|
||||
executor.setCorePoolSize(40);
|
||||
executor.setMaxPoolSize(120);
|
||||
executor.setQueueCapacity(100);
|
||||
executor.setKeepAliveSeconds(30);
|
||||
executor.setThreadNamePrefix("track-stream-");
|
||||
// executor.setTaskDecorator(new MdcTaskDecorator());
|
||||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
|
||||
@ -16,13 +16,14 @@ public class WebSocketProperties {
|
||||
private QueryProperties query = new QueryProperties();
|
||||
private TransportProperties transport = new TransportProperties();
|
||||
private BackpressureProperties backpressure = new BackpressureProperties();
|
||||
private SessionProperties session = new SessionProperties();
|
||||
|
||||
@Data
|
||||
public static class QueryProperties {
|
||||
/** 서버 전체 동시 실행 쿼리 상한 */
|
||||
private int maxConcurrentGlobal = 30;
|
||||
private int maxConcurrentGlobal = 60;
|
||||
/** 세션당 동시 쿼리 상한 */
|
||||
private int maxPerSession = 3;
|
||||
private int maxPerSession = 20;
|
||||
/** 글로벌 대기 큐 타임아웃 (초) */
|
||||
private int queueTimeoutSeconds = 30;
|
||||
}
|
||||
@ -49,4 +50,18 @@ public class WebSocketProperties {
|
||||
/** 메시지당 최소 크기 (KB) */
|
||||
private int minMessageSizeKb = 256;
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class SessionProperties {
|
||||
/** 세션 유휴 타임아웃 (밀리초) */
|
||||
private long idleTimeoutMs = 15000;
|
||||
/** 서버 하트비트 간격 (밀리초) */
|
||||
private long serverHeartbeatMs = 5000;
|
||||
/** 클라이언트 하트비트 간격 (밀리초) */
|
||||
private long clientHeartbeatMs = 5000;
|
||||
/** SockJS disconnect delay (밀리초) */
|
||||
private long sockjsDisconnectDelayMs = 5000;
|
||||
/** 메시지 전송 시간 제한 (초) */
|
||||
private int sendTimeLimitSeconds = 30;
|
||||
}
|
||||
}
|
||||
|
||||
@ -37,13 +37,14 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
|
||||
private final TrackQueryInterceptor trackQueryInterceptor;
|
||||
private final WebSocketUsageLoggingInterceptor webSocketUsageLoggingInterceptor;
|
||||
private final ObjectMapper objectMapper; // JacksonConfig에서 생성된 ObjectMapper 주입
|
||||
private final WebSocketProperties webSocketProperties;
|
||||
|
||||
@Bean
|
||||
public ServletServerContainerFactoryBean createWebSocketContainer() {
|
||||
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
|
||||
container.setMaxTextMessageBufferSize(256 * 1024 * 1024); // 256MB로 증가
|
||||
container.setMaxBinaryMessageBufferSize(256 * 1024 * 1024); // 256MB로 증가
|
||||
container.setMaxSessionIdleTimeout(60000L);
|
||||
container.setMaxSessionIdleTimeout(webSocketProperties.getSession().getIdleTimeoutMs());
|
||||
return container;
|
||||
}
|
||||
|
||||
@ -64,7 +65,7 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
|
||||
.setClientLibraryUrl("/static/libs/js/sockjs.min.js")
|
||||
.setStreamBytesLimit(100 * 1024 * 1024) // 100MB로 증가
|
||||
.setHttpMessageCacheSize(1000)
|
||||
.setDisconnectDelay(30 * 1000)
|
||||
.setDisconnectDelay(webSocketProperties.getSession().getSockjsDisconnectDelayMs())
|
||||
.setWebSocketEnabled(true)
|
||||
.setSuppressCors(false);
|
||||
}
|
||||
@ -74,7 +75,10 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
|
||||
// 인메모리 메시지 브로커 설정
|
||||
registry.enableSimpleBroker("/topic", "/queue")
|
||||
.setTaskScheduler(messageBrokerTaskScheduler())
|
||||
.setHeartbeatValue(new long[]{10000, 10000}); // 서버/클라이언트 하트비트
|
||||
.setHeartbeatValue(new long[]{
|
||||
webSocketProperties.getSession().getServerHeartbeatMs(),
|
||||
webSocketProperties.getSession().getClientHeartbeatMs()
|
||||
});
|
||||
|
||||
// 애플리케이션 destination prefix
|
||||
registry.setApplicationDestinationPrefixes("/app");
|
||||
@ -89,31 +93,33 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
|
||||
@Override
|
||||
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
|
||||
registration
|
||||
.setMessageSizeLimit(50 * 1024 * 1024) // 50MB로 증가
|
||||
.setSendBufferSizeLimit(256 * 1024 * 1024) // 256MB로 증가
|
||||
.setSendTimeLimit(120 * 1000) // 120초로 증가
|
||||
.setTimeToFirstMessage(30 * 1000); // 첫 메시지까지 30초
|
||||
.setMessageSizeLimit(webSocketProperties.getTransport().getMessageSizeLimitMb() * 1024 * 1024)
|
||||
.setSendBufferSizeLimit(webSocketProperties.getTransport().getSendBufferSizeLimitMb() * 1024 * 1024)
|
||||
.setSendTimeLimit(webSocketProperties.getSession().getSendTimeLimitSeconds() * 1000)
|
||||
.setTimeToFirstMessage(30 * 1000);
|
||||
|
||||
// 에러 핸들러 등록은 별도로 처리
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureClientInboundChannel(ChannelRegistration registration) {
|
||||
WebSocketProperties.TransportProperties tp = webSocketProperties.getTransport();
|
||||
registration
|
||||
.interceptors(webSocketUsageLoggingInterceptor, trackQueryInterceptor)
|
||||
.taskExecutor()
|
||||
.corePoolSize(10)
|
||||
.maxPoolSize(20)
|
||||
.queueCapacity(100);
|
||||
.corePoolSize(tp.getInboundCorePoolSize())
|
||||
.maxPoolSize(tp.getInboundMaxPoolSize())
|
||||
.queueCapacity(tp.getInboundQueueCapacity());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureClientOutboundChannel(ChannelRegistration registration) {
|
||||
WebSocketProperties.TransportProperties tp = webSocketProperties.getTransport();
|
||||
registration
|
||||
.taskExecutor()
|
||||
.corePoolSize(20) // 10 -> 20로 증가
|
||||
.maxPoolSize(40) // 20 -> 40로 증가
|
||||
.queueCapacity(5000); // 1000 -> 5000로 증가
|
||||
.corePoolSize(tp.getOutboundCorePoolSize())
|
||||
.maxPoolSize(tp.getOutboundMaxPoolSize())
|
||||
.queueCapacity(tp.getOutboundQueueCapacity());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -9,7 +9,16 @@ import lombok.NoArgsConstructor;
|
||||
@AllArgsConstructor
|
||||
public class QueryStatusUpdate {
|
||||
private String queryId;
|
||||
private String status; // STARTED, PROCESSING, COMPLETED, CANCELLED, ERROR
|
||||
private String status; // STARTED, PROCESSING, COMPLETED, CANCELLED, ERROR, QUEUED
|
||||
private String message;
|
||||
private Double progressPercentage;
|
||||
private Integer queuePosition;
|
||||
private Integer totalInQueue;
|
||||
|
||||
public QueryStatusUpdate(String queryId, String status, String message, Double progressPercentage) {
|
||||
this.queryId = queryId;
|
||||
this.status = status;
|
||||
this.message = message;
|
||||
this.progressPercentage = progressPercentage;
|
||||
}
|
||||
}
|
||||
@ -7,14 +7,13 @@ import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* WebSocket 세션별 진행 중인 쿼리 추적 및 글로벌 동시 쿼리 제한 관리
|
||||
* Phase 5: 대기열 기반 관리 (거부 대신 순번 대기)
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@ -27,7 +26,10 @@ public class ActiveQueryManager {
|
||||
private Semaphore globalQuerySemaphore;
|
||||
private final AtomicInteger waitingCount = new AtomicInteger(0);
|
||||
|
||||
@Value("${websocket.query.max-concurrent-global:30}")
|
||||
// 대기열 추적 (FIFO 순서 보장)
|
||||
private final ConcurrentLinkedQueue<String> waitingQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@Value("${websocket.query.max-concurrent-global:60}")
|
||||
private int maxConcurrentGlobal;
|
||||
|
||||
@Value("${websocket.query.queue-timeout-seconds:30}")
|
||||
@ -156,7 +158,20 @@ public class ActiveQueryManager {
|
||||
// ── 글로벌 동시 쿼리 제한 ──
|
||||
|
||||
/**
|
||||
* 글로벌 쿼리 슬롯 획득 (대기 큐 포함)
|
||||
* 글로벌 쿼리 슬롯 획득 (즉시 시도, 대기 없음)
|
||||
* @return true: 슬롯 획득 성공, false: 슬롯 없음 (대기열 진입 필요)
|
||||
*/
|
||||
public boolean tryAcquireQuerySlotImmediate(String queryId) {
|
||||
boolean acquired = globalQuerySemaphore.tryAcquire();
|
||||
if (acquired) {
|
||||
log.info("Query {} acquired global slot immediately: active={}/{}",
|
||||
queryId, getGlobalActiveQueryCount(), maxConcurrentGlobal);
|
||||
}
|
||||
return acquired;
|
||||
}
|
||||
|
||||
/**
|
||||
* 글로벌 쿼리 슬롯 획득 (대기 큐에서 FIFO 순서로 대기) - 기존 호환
|
||||
* @return true: 슬롯 획득 성공, false: 타임아웃으로 획득 실패
|
||||
*/
|
||||
public boolean tryAcquireQuerySlot(String queryId) throws InterruptedException {
|
||||
@ -179,6 +194,65 @@ public class ActiveQueryManager {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 2초 간격으로 슬롯 대기 (대기열 기반)
|
||||
* 호출자가 상태 콜백을 통해 QUEUED 상태를 클라이언트에 전송할 수 있음
|
||||
* @param queryId 쿼리 ID
|
||||
* @param maxWaitSeconds 최대 대기 시간 (초)
|
||||
* @return true: 슬롯 획득, false: 타임아웃
|
||||
*/
|
||||
public boolean waitForSlotWithQueue(String queryId, int maxWaitSeconds) throws InterruptedException {
|
||||
waitingQueue.offer(queryId);
|
||||
waitingCount.incrementAndGet();
|
||||
try {
|
||||
long deadline = System.currentTimeMillis() + (maxWaitSeconds * 1000L);
|
||||
while (System.currentTimeMillis() < deadline) {
|
||||
boolean acquired = globalQuerySemaphore.tryAcquire(2, TimeUnit.SECONDS);
|
||||
if (acquired) {
|
||||
waitingQueue.remove(queryId);
|
||||
log.info("Query {} acquired slot after queuing: active={}/{}",
|
||||
queryId, getGlobalActiveQueryCount(), maxConcurrentGlobal);
|
||||
return true;
|
||||
}
|
||||
// 슬롯 못 얻으면 계속 대기 (호출자에서 QUEUED 상태 전송)
|
||||
}
|
||||
waitingQueue.remove(queryId);
|
||||
log.warn("Query {} timed out in queue after {}s", queryId, maxWaitSeconds);
|
||||
return false;
|
||||
} finally {
|
||||
waitingCount.decrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 대기열에서의 순번 조회
|
||||
* @return 1-based 순번, 대기열에 없으면 0
|
||||
*/
|
||||
public int getQueuePosition(String queryId) {
|
||||
int position = 1;
|
||||
for (String id : waitingQueue) {
|
||||
if (id.equals(queryId)) {
|
||||
return position;
|
||||
}
|
||||
position++;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* 현재 대기열 크기
|
||||
*/
|
||||
public int getQueueSize() {
|
||||
return waitingQueue.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* 대기열 직접 접근 (서비스에서 offer/remove 가능)
|
||||
*/
|
||||
public ConcurrentLinkedQueue<String> getWaitingQueue() {
|
||||
return waitingQueue;
|
||||
}
|
||||
|
||||
/**
|
||||
* 글로벌 쿼리 슬롯 반환
|
||||
*/
|
||||
|
||||
@ -56,6 +56,7 @@ public class ChunkedTrackStreamingService {
|
||||
private final ActiveQueryManager activeQueryManager;
|
||||
private final IntegrationVesselService integrationVesselService;
|
||||
private final TrackQueryInterceptor trackQueryInterceptor;
|
||||
private final DailyTrackCacheManager dailyTrackCacheManager;
|
||||
private final WKTReader wktReader = new WKTReader();
|
||||
@SuppressWarnings("unused")
|
||||
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
|
||||
@ -95,13 +96,15 @@ public class ChunkedTrackStreamingService {
|
||||
TrackSimplificationStrategy simplificationStrategy,
|
||||
ActiveQueryManager activeQueryManager,
|
||||
IntegrationVesselService integrationVesselService,
|
||||
TrackQueryInterceptor trackQueryInterceptor) {
|
||||
TrackQueryInterceptor trackQueryInterceptor,
|
||||
DailyTrackCacheManager dailyTrackCacheManager) {
|
||||
this.queryJdbcTemplate = queryJdbcTemplate;
|
||||
this.queryDataSource = queryDataSource;
|
||||
this.simplificationStrategy = simplificationStrategy;
|
||||
this.activeQueryManager = activeQueryManager;
|
||||
this.integrationVesselService = integrationVesselService;
|
||||
this.trackQueryInterceptor = trackQueryInterceptor;
|
||||
this.dailyTrackCacheManager = dailyTrackCacheManager;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -730,12 +733,20 @@ public class ChunkedTrackStreamingService {
|
||||
List<TimeRange> ranges = strategyMap.get(strategy);
|
||||
log.info("Processing {} strategy with {} ranges", strategy, ranges.size());
|
||||
|
||||
// Daily 테이블의 경우 선박 기준 페이지네이션으로 처리 (누락 방지)
|
||||
// Daily 테이블의 경우 캐시 우선 → 페이지네이션 폴백
|
||||
if (strategy == TableStrategy.DAILY) {
|
||||
for (TimeRange range : ranges) {
|
||||
try {
|
||||
// 선박 기준 페이지네이션으로 모든 데이터 수집 후 병합
|
||||
List<CompactVesselTrack> compactTracks = processDailyTableWithPagination(request, range, null, viewportVesselIds);
|
||||
List<CompactVesselTrack> compactTracks;
|
||||
LocalDate rangeDate = range.getStart().toLocalDate();
|
||||
|
||||
// 캐시 히트 시 메모리에서 가져옴
|
||||
if (dailyTrackCacheManager.isEnabled() && dailyTrackCacheManager.isCached(rangeDate)) {
|
||||
compactTracks = dailyTrackCacheManager.getCachedTracks(rangeDate);
|
||||
log.info("Daily cache HIT for {} in processQueryInChunks: {} tracks", rangeDate, compactTracks.size());
|
||||
} else {
|
||||
compactTracks = processDailyTableWithPagination(request, range, null, viewportVesselIds);
|
||||
}
|
||||
|
||||
if (!compactTracks.isEmpty()) {
|
||||
// 메시지 크기로 분할
|
||||
@ -847,16 +858,45 @@ public class ChunkedTrackStreamingService {
|
||||
Consumer<QueryStatusUpdate> statusConsumer) {
|
||||
boolean slotAcquired = false;
|
||||
try {
|
||||
// 글로벌 동시 쿼리 슬롯 획득 (대기 큐에서 FIFO 순서로 대기)
|
||||
slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId);
|
||||
// 글로벌 동시 쿼리 슬롯 즉시 획득 시도
|
||||
slotAcquired = activeQueryManager.tryAcquireQuerySlotImmediate(queryId);
|
||||
if (!slotAcquired) {
|
||||
log.warn("Query {} rejected: global concurrent limit reached", queryId);
|
||||
statusConsumer.accept(new QueryStatusUpdate(
|
||||
queryId, "ERROR",
|
||||
"Server is busy. Maximum concurrent queries reached. Please retry later.",
|
||||
0.0
|
||||
));
|
||||
return;
|
||||
// 대기열 진입 → 2초 간격으로 QUEUED 상태 전송하며 슬롯 대기
|
||||
log.info("Query {} entering wait queue: active={}/{}", queryId,
|
||||
activeQueryManager.getGlobalActiveQueryCount(), activeQueryManager.getMaxConcurrentGlobal());
|
||||
|
||||
int maxWaitSeconds = 120; // 최대 2분 대기
|
||||
long deadline = System.currentTimeMillis() + (maxWaitSeconds * 1000L);
|
||||
activeQueryManager.getWaitingQueue().offer(queryId);
|
||||
try {
|
||||
while (System.currentTimeMillis() < deadline) {
|
||||
slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId);
|
||||
if (slotAcquired) break;
|
||||
// QUEUED 상태 전송
|
||||
int position = activeQueryManager.getQueuePosition(queryId);
|
||||
int totalInQueue = activeQueryManager.getQueueSize();
|
||||
QueryStatusUpdate queueStatus = new QueryStatusUpdate(
|
||||
queryId, "QUEUED",
|
||||
String.format("Queue position: %d/%d. Waiting for available slot...", position, totalInQueue),
|
||||
0.0
|
||||
);
|
||||
queueStatus.setQueuePosition(position);
|
||||
queueStatus.setTotalInQueue(totalInQueue);
|
||||
statusConsumer.accept(queueStatus);
|
||||
}
|
||||
} finally {
|
||||
activeQueryManager.getWaitingQueue().remove(queryId);
|
||||
}
|
||||
|
||||
if (!slotAcquired) {
|
||||
log.warn("Query {} timed out in queue after {}s", queryId, maxWaitSeconds);
|
||||
statusConsumer.accept(new QueryStatusUpdate(
|
||||
queryId, "ERROR",
|
||||
"Server is busy. Queue timeout exceeded. Please retry later.",
|
||||
0.0
|
||||
));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 취소 플래그 등록
|
||||
@ -2328,7 +2368,7 @@ public class ChunkedTrackStreamingService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Daily 전략 처리 - 페이지별 즉시 스트리밍 (타임아웃 방지)
|
||||
* Daily 전략 처리 - 캐시 우선 조회 + 페이지별 즉시 스트리밍 (타임아웃 방지)
|
||||
*/
|
||||
private void processDailyStrategy(List<TimeRange> ranges, TrackQueryRequest request, String queryId,
|
||||
Consumer<TrackChunkResponse> chunkConsumer,
|
||||
@ -2344,13 +2384,74 @@ public class ChunkedTrackStreamingService {
|
||||
log.info("Session vessel cache initialized for {} date ranges", ranges.size());
|
||||
|
||||
for (TimeRange range : ranges) {
|
||||
// 페이지별 즉시 스트리밍으로 처리 (세션 캐시 공유)
|
||||
streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache, viewportVesselIds);
|
||||
LocalDate rangeDate = range.getStart().toLocalDate();
|
||||
|
||||
// 캐시 히트 체크: 해당 날짜가 인메모리 캐시에 있으면 DB 조회 생략
|
||||
if (dailyTrackCacheManager.isEnabled() && dailyTrackCacheManager.isCached(rangeDate)) {
|
||||
log.info("Daily cache HIT for {}: serving from memory", rangeDate);
|
||||
List<CompactVesselTrack> cachedTracks;
|
||||
|
||||
// 뷰포트 필터링 적용
|
||||
if (request.getViewport() != null) {
|
||||
ViewportFilter vp = request.getViewport();
|
||||
cachedTracks = dailyTrackCacheManager.getCachedTracks(
|
||||
rangeDate, vp.getMinLon(), vp.getMinLat(), vp.getMaxLon(), vp.getMaxLat());
|
||||
} else {
|
||||
cachedTracks = dailyTrackCacheManager.getCachedTracks(rangeDate);
|
||||
}
|
||||
|
||||
if (!cachedTracks.isEmpty()) {
|
||||
// viewportVesselIds 필터 적용
|
||||
if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) {
|
||||
cachedTracks = cachedTracks.stream()
|
||||
.filter(t -> viewportVesselIds.contains(t.getVesselId()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
// 메시지 크기로 분할하여 전송
|
||||
List<List<CompactVesselTrack>> batches = splitByMessageSize(cachedTracks);
|
||||
for (List<CompactVesselTrack> batch : batches) {
|
||||
sendChunkResponse(batch, queryId, chunkConsumer, uniqueVesselIds);
|
||||
}
|
||||
log.info("Daily cache served {} tracks for {} in {} batches",
|
||||
cachedTracks.size(), rangeDate, batches.size());
|
||||
}
|
||||
} else {
|
||||
// 캐시 미스: 기존 DB 페이지네이션으로 처리
|
||||
if (dailyTrackCacheManager.isEnabled()) {
|
||||
log.info("Daily cache MISS for {}: falling back to DB", rangeDate);
|
||||
}
|
||||
streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache, viewportVesselIds);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Session vessel cache final size: {} vessels cached", sessionVesselCache.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* 캐시 데이터를 청크 응답으로 전송하는 헬퍼 메서드
|
||||
*/
|
||||
private void sendChunkResponse(List<CompactVesselTrack> batch, String queryId,
|
||||
Consumer<TrackChunkResponse> chunkConsumer,
|
||||
Set<String> uniqueVesselIds) {
|
||||
TrackChunkResponse response = new TrackChunkResponse();
|
||||
response.setQueryId(queryId);
|
||||
response.setChunkIndex(currentGlobalChunkIndex++);
|
||||
response.setIsLastChunk(false);
|
||||
response.setTotalChunks(-1);
|
||||
response.setCompactTracks(batch);
|
||||
|
||||
int processedMin = processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum();
|
||||
response.setStats(createChunkStats(batch, uniqueVesselIds, processedMin));
|
||||
|
||||
chunkConsumer.accept(response);
|
||||
batch.forEach(track -> uniqueVesselIds.add(track.getVesselId()));
|
||||
|
||||
// 백프레셔 적용
|
||||
int batchSize = batch.stream().mapToInt(this::estimateTrackSize).sum();
|
||||
pendingBufferSize.addAndGet(batchSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Daily 테이블 페이지별 즉시 스트리밍 - 각 페이지 완료 즉시 청크 전송
|
||||
*/
|
||||
|
||||
@ -20,6 +20,7 @@ import org.springframework.stereotype.Service;
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.*;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
@ -43,6 +44,7 @@ public class StompTrackStreamingService {
|
||||
|
||||
private final ActiveQueryManager activeQueryManager;
|
||||
private final TrackQueryInterceptor trackQueryInterceptor;
|
||||
private final DailyTrackCacheManager dailyTrackCacheManager;
|
||||
|
||||
// 활성 쿼리 관리
|
||||
private final Map<String, QueryContext> activeQueries = new ConcurrentHashMap<>();
|
||||
@ -66,7 +68,8 @@ public class StompTrackStreamingService {
|
||||
TrackSimplificationStrategy simplificationStrategy,
|
||||
VesselTrackMerger vesselTrackMerger,
|
||||
ActiveQueryManager activeQueryManager,
|
||||
TrackQueryInterceptor trackQueryInterceptor) {
|
||||
TrackQueryInterceptor trackQueryInterceptor,
|
||||
DailyTrackCacheManager dailyTrackCacheManager) {
|
||||
this.queryDataSource = queryDataSource;
|
||||
this.queryJdbcTemplate = queryJdbcTemplate;
|
||||
this.areaBoundaryCache = areaBoundaryCache;
|
||||
@ -76,6 +79,7 @@ public class StompTrackStreamingService {
|
||||
this.vesselTrackMerger = vesselTrackMerger;
|
||||
this.activeQueryManager = activeQueryManager;
|
||||
this.trackQueryInterceptor = trackQueryInterceptor;
|
||||
this.dailyTrackCacheManager = dailyTrackCacheManager;
|
||||
}
|
||||
|
||||
@Async
|
||||
@ -95,16 +99,45 @@ public class StompTrackStreamingService {
|
||||
|
||||
boolean slotAcquired = false;
|
||||
try {
|
||||
// 글로벌 동시 쿼리 슬롯 획득 (대기 큐에서 FIFO 순서로 대기)
|
||||
slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId);
|
||||
// 글로벌 동시 쿼리 슬롯 즉시 획득 시도
|
||||
slotAcquired = activeQueryManager.tryAcquireQuerySlotImmediate(queryId);
|
||||
if (!slotAcquired) {
|
||||
log.warn("Query {} rejected: global concurrent limit reached", queryId);
|
||||
statusConsumer.accept(new QueryStatusUpdate(
|
||||
queryId, "ERROR",
|
||||
"Server is busy. Maximum concurrent queries reached. Please retry later.",
|
||||
0.0
|
||||
));
|
||||
return;
|
||||
// 대기열 진입 → 2초 간격으로 QUEUED 상태 전송하며 슬롯 대기
|
||||
log.info("Query {} entering wait queue: active={}/{}", queryId,
|
||||
activeQueryManager.getGlobalActiveQueryCount(), activeQueryManager.getMaxConcurrentGlobal());
|
||||
|
||||
int maxWaitSeconds = 120; // 최대 2분 대기
|
||||
long deadline = System.currentTimeMillis() + (maxWaitSeconds * 1000L);
|
||||
activeQueryManager.getWaitingQueue().offer(queryId);
|
||||
try {
|
||||
while (System.currentTimeMillis() < deadline) {
|
||||
slotAcquired = activeQueryManager.tryAcquireQuerySlot(queryId);
|
||||
if (slotAcquired) break;
|
||||
// QUEUED 상태 전송
|
||||
int position = activeQueryManager.getQueuePosition(queryId);
|
||||
int totalInQueue = activeQueryManager.getQueueSize();
|
||||
QueryStatusUpdate queueStatus = new QueryStatusUpdate(
|
||||
queryId, "QUEUED",
|
||||
String.format("Queue position: %d/%d. Waiting for available slot...", position, totalInQueue),
|
||||
0.0
|
||||
);
|
||||
queueStatus.setQueuePosition(position);
|
||||
queueStatus.setTotalInQueue(totalInQueue);
|
||||
statusConsumer.accept(queueStatus);
|
||||
}
|
||||
} finally {
|
||||
activeQueryManager.getWaitingQueue().remove(queryId);
|
||||
}
|
||||
|
||||
if (!slotAcquired) {
|
||||
log.warn("Query {} timed out in queue after {}s", queryId, maxWaitSeconds);
|
||||
statusConsumer.accept(new QueryStatusUpdate(
|
||||
queryId, "ERROR",
|
||||
"Server is busy. Queue timeout exceeded. Please retry later.",
|
||||
0.0
|
||||
));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 입력 검증
|
||||
@ -439,12 +472,39 @@ public class StompTrackStreamingService {
|
||||
resultQueue, processedTracks, chunkIndex, totalTracks, filteredVessels);
|
||||
}
|
||||
|
||||
// 일별 데이터 처리 (DAILY)
|
||||
// 일별 데이터 처리 (DAILY) - 캐시 우선 조회
|
||||
if (strategyMap.containsKey(TableStrategy.DAILY)) {
|
||||
log.info("Processing daily data...");
|
||||
processSingleStrategy(request, TableStrategy.DAILY,
|
||||
strategyMap.get(TableStrategy.DAILY), queryId,
|
||||
resultQueue, processedTracks, chunkIndex, totalTracks, filteredVessels);
|
||||
List<TimeRange> dailyRanges = strategyMap.get(TableStrategy.DAILY);
|
||||
List<TimeRange> dbRanges = new ArrayList<>();
|
||||
|
||||
for (TimeRange range : dailyRanges) {
|
||||
LocalDate rangeDate = range.getStart().toLocalDate();
|
||||
if (dailyTrackCacheManager.isEnabled() && dailyTrackCacheManager.isCached(rangeDate)) {
|
||||
// 캐시 히트: 메모리에서 가져와서 직접 큐에 넣기
|
||||
log.info("Daily cache HIT for {} in StompStreaming", rangeDate);
|
||||
List<gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack> cachedTracks =
|
||||
dailyTrackCacheManager.getCachedTracks(rangeDate);
|
||||
|
||||
if (!cachedTracks.isEmpty()) {
|
||||
TrackChunkResponse chunk = new TrackChunkResponse();
|
||||
chunk.setQueryId(queryId);
|
||||
chunk.setChunkIndex(chunkIndex.getAndIncrement());
|
||||
chunk.setIsLastChunk(false);
|
||||
chunk.setCompactTracks(cachedTracks);
|
||||
processedTracks.addAndGet(cachedTracks.size());
|
||||
resultQueue.put(chunk);
|
||||
}
|
||||
} else {
|
||||
dbRanges.add(range);
|
||||
}
|
||||
}
|
||||
|
||||
// 캐시 미스 범위만 DB 조회
|
||||
if (!dbRanges.isEmpty()) {
|
||||
processSingleStrategy(request, TableStrategy.DAILY, dbRanges, queryId,
|
||||
resultQueue, processedTracks, chunkIndex, totalTracks, filteredVessels);
|
||||
}
|
||||
}
|
||||
|
||||
} finally {
|
||||
|
||||
@ -42,8 +42,8 @@ spring:
|
||||
connection-timeout: 5000
|
||||
idle-timeout: 600000
|
||||
max-lifetime: 1800000
|
||||
maximum-pool-size: 120 # 60 -> 120 (총 250 중 48%, WebSocket 스트리밍 + REST API 주 사용)
|
||||
minimum-idle: 20 # 10 -> 20
|
||||
maximum-pool-size: 180 # 120 -> 180 (WebSocket 대기열 + REST API 주 사용)
|
||||
minimum-idle: 30 # 20 -> 30
|
||||
connection-test-query: SELECT 1
|
||||
validation-timeout: 5000
|
||||
leak-detection-threshold: 60000 # 커넥션 누수 감지 (60초)
|
||||
@ -262,12 +262,26 @@ vessel: # spring 하위가 아닌 최상위 레벨
|
||||
t_abnormal_tracks:
|
||||
retention-months: 0 # 비정상 항적: 무한 보관
|
||||
|
||||
# 일일 항적 데이터 인메모리 캐시
|
||||
cache:
|
||||
daily-track:
|
||||
enabled: true
|
||||
retention-days: 7 # D-1 ~ D-7 (오늘 제외)
|
||||
max-memory-gb: 5 # 최대 5GB
|
||||
warmup-async: true # 비동기 워밍업 (서버 시작 차단 없음)
|
||||
|
||||
# WebSocket 부하 제어 설정
|
||||
websocket:
|
||||
query:
|
||||
max-concurrent-global: 30 # 서버 전체 동시 실행 쿼리 상한 (Query풀 120 / 쿼리당 ~3커넥션)
|
||||
max-per-session: 3 # 세션당 동시 쿼리 상한
|
||||
max-concurrent-global: 60 # 서버 전체 동시 실행 쿼리 상한 (Query풀 180 / 쿼리당 ~3커넥션)
|
||||
max-per-session: 20 # 세션당 동시 쿼리 상한 (대기열 방식이므로 넉넉하게)
|
||||
queue-timeout-seconds: 30 # 글로벌 대기 큐 타임아웃
|
||||
session:
|
||||
idle-timeout-ms: 15000 # 세션 유휴 타임아웃 15초 (60s → 15s)
|
||||
server-heartbeat-ms: 5000 # 서버 하트비트 5초 (10s → 5s)
|
||||
client-heartbeat-ms: 5000 # 클라이언트 하트비트 5초 (10s → 5s)
|
||||
sockjs-disconnect-delay-ms: 5000 # SockJS 해제 지연 5초 (30s → 5s)
|
||||
send-time-limit-seconds: 30 # 메시지 전송 시간 제한 30초 (120s → 30s)
|
||||
|
||||
# 액추에이터 설정
|
||||
management:
|
||||
|
||||
215
vessel-batch-control.sh
Normal file
215
vessel-batch-control.sh
Normal file
@ -0,0 +1,215 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Vessel Batch 관리 스크립트
|
||||
# 시작, 중지, 상태 확인 등 기본 관리 기능
|
||||
|
||||
# 애플리케이션 경로
|
||||
APP_HOME="/devdata/apps/bridge-db-monitoring"
|
||||
JAR_FILE="$APP_HOME/vessel-batch-aggregation.jar"
|
||||
PID_FILE="$APP_HOME/vessel-batch.pid"
|
||||
LOG_DIR="$APP_HOME/logs"
|
||||
|
||||
# Java 17 경로
|
||||
JAVA_HOME="/devdata/apps/jdk-17.0.8"
|
||||
JAVA_BIN="$JAVA_HOME/bin/java"
|
||||
|
||||
# 색상 코드
|
||||
RED='\033[0;31m'
|
||||
GREEN='\033[0;32m'
|
||||
YELLOW='\033[1;33m'
|
||||
NC='\033[0m'
|
||||
|
||||
# 함수: PID 확인
|
||||
get_pid() {
|
||||
if [ -f "$PID_FILE" ]; then
|
||||
PID=$(cat $PID_FILE)
|
||||
if kill -0 $PID 2>/dev/null; then
|
||||
echo $PID
|
||||
else
|
||||
rm -f $PID_FILE
|
||||
echo ""
|
||||
fi
|
||||
else
|
||||
PID=$(pgrep -f "$JAR_FILE")
|
||||
echo $PID
|
||||
fi
|
||||
}
|
||||
|
||||
# 함수: 상태 확인
|
||||
status() {
|
||||
PID=$(get_pid)
|
||||
if [ ! -z "$PID" ]; then
|
||||
echo -e "${GREEN}✓ Vessel Batch is running (PID: $PID)${NC}"
|
||||
|
||||
# 프로세스 정보
|
||||
echo ""
|
||||
ps aux | grep $PID | grep -v grep
|
||||
|
||||
# Health Check
|
||||
echo ""
|
||||
echo "Health Check:"
|
||||
curl -s http://localhost:18090/actuator/health 2>/dev/null | python -m json.tool || echo "Health endpoint not available"
|
||||
|
||||
# 처리 상태
|
||||
echo ""
|
||||
echo "Processing Status:"
|
||||
if command -v psql >/dev/null 2>&1; then
|
||||
psql -h localhost -U mda -d mdadb -c "
|
||||
SELECT
|
||||
NOW() - MAX(last_update) as processing_delay,
|
||||
COUNT(*) as vessel_count
|
||||
FROM signal.t_vessel_latest_position;" 2>/dev/null || echo "Unable to query database"
|
||||
fi
|
||||
|
||||
return 0
|
||||
else
|
||||
echo -e "${RED}✗ Vessel Batch is not running${NC}"
|
||||
return 1
|
||||
fi
|
||||
}
|
||||
|
||||
# 함수: 시작
|
||||
start() {
|
||||
PID=$(get_pid)
|
||||
if [ ! -z "$PID" ]; then
|
||||
echo -e "${YELLOW}Vessel Batch is already running (PID: $PID)${NC}"
|
||||
return 1
|
||||
fi
|
||||
|
||||
echo "Starting Vessel Batch..."
|
||||
cd $APP_HOME
|
||||
$APP_HOME/run-on-query-server-dev.sh
|
||||
}
|
||||
|
||||
# 함수: 중지
|
||||
stop() {
|
||||
PID=$(get_pid)
|
||||
if [ -z "$PID" ]; then
|
||||
echo -e "${YELLOW}Vessel Batch is not running${NC}"
|
||||
return 1
|
||||
fi
|
||||
|
||||
echo "Stopping Vessel Batch (PID: $PID)..."
|
||||
kill -15 $PID
|
||||
|
||||
# 종료 대기
|
||||
for i in {1..30}; do
|
||||
if ! kill -0 $PID 2>/dev/null; then
|
||||
echo -e "${GREEN}✓ Vessel Batch stopped successfully${NC}"
|
||||
rm -f $PID_FILE
|
||||
return 0
|
||||
fi
|
||||
echo -n "."
|
||||
sleep 1
|
||||
done
|
||||
|
||||
echo ""
|
||||
echo -e "${RED}Process did not stop gracefully, force killing...${NC}"
|
||||
kill -9 $PID
|
||||
rm -f $PID_FILE
|
||||
}
|
||||
|
||||
# 함수: 재시작
|
||||
restart() {
|
||||
echo "Restarting Vessel Batch..."
|
||||
stop
|
||||
sleep 3
|
||||
start
|
||||
}
|
||||
|
||||
# 함수: 로그 보기
|
||||
logs() {
|
||||
if [ ! -d "$LOG_DIR" ]; then
|
||||
echo "Log directory not found: $LOG_DIR"
|
||||
return 1
|
||||
fi
|
||||
|
||||
echo "Available log files:"
|
||||
ls -lh $LOG_DIR/*.log 2>/dev/null
|
||||
|
||||
echo ""
|
||||
echo "Tailing app.log (Ctrl+C to exit)..."
|
||||
tail -f $LOG_DIR/app.log
|
||||
}
|
||||
|
||||
# 함수: 최근 에러 확인
|
||||
errors() {
|
||||
if [ ! -f "$LOG_DIR/app.log" ]; then
|
||||
echo "Log file not found: $LOG_DIR/app.log"
|
||||
return 1
|
||||
fi
|
||||
|
||||
echo "Recent errors (last 50 lines with ERROR):"
|
||||
grep "ERROR" $LOG_DIR/app.log | tail -50
|
||||
|
||||
echo ""
|
||||
echo "Error summary:"
|
||||
echo "Total errors: $(grep -c "ERROR" $LOG_DIR/app.log)"
|
||||
echo "Errors today: $(grep "ERROR" $LOG_DIR/app.log | grep "$(date +%Y-%m-%d)" | wc -l)"
|
||||
}
|
||||
|
||||
# 함수: 성능 통계
|
||||
stats() {
|
||||
echo "Performance Statistics"
|
||||
echo "===================="
|
||||
|
||||
if [ -f "$LOG_DIR/resource-monitor.csv" ]; then
|
||||
echo "Recent resource usage:"
|
||||
tail -5 $LOG_DIR/resource-monitor.csv | column -t -s,
|
||||
fi
|
||||
|
||||
echo ""
|
||||
echo "Batch job statistics:"
|
||||
if command -v psql >/dev/null 2>&1; then
|
||||
psql -h localhost -U mda -d mdadb -c "
|
||||
SELECT
|
||||
job_name,
|
||||
COUNT(*) as executions,
|
||||
AVG(EXTRACT(EPOCH FROM (end_time - start_time))/60)::numeric(10,2) as avg_duration_min,
|
||||
MAX(end_time) as last_execution
|
||||
FROM batch_job_execution je
|
||||
JOIN batch_job_instance ji ON je.job_instance_id = ji.job_instance_id
|
||||
WHERE end_time > CURRENT_DATE - INTERVAL '7 days'
|
||||
GROUP BY job_name;" 2>/dev/null || echo "Unable to query batch statistics"
|
||||
fi
|
||||
}
|
||||
|
||||
# 메인 로직
|
||||
case "$1" in
|
||||
start)
|
||||
start
|
||||
;;
|
||||
stop)
|
||||
stop
|
||||
;;
|
||||
restart)
|
||||
restart
|
||||
;;
|
||||
status)
|
||||
status
|
||||
;;
|
||||
logs)
|
||||
logs
|
||||
;;
|
||||
errors)
|
||||
errors
|
||||
;;
|
||||
stats)
|
||||
stats
|
||||
;;
|
||||
*)
|
||||
echo "Usage: $0 {start|stop|restart|status|logs|errors|stats}"
|
||||
echo ""
|
||||
echo "Commands:"
|
||||
echo " start - Start the Vessel Batch application"
|
||||
echo " stop - Stop the Vessel Batch application"
|
||||
echo " restart - Restart the Vessel Batch application"
|
||||
echo " status - Check application status and health"
|
||||
echo " logs - Tail application logs"
|
||||
echo " errors - Show recent errors from logs"
|
||||
echo " stats - Show performance statistics"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
|
||||
exit $?
|
||||
불러오는 중...
Reference in New Issue
Block a user