diff --git a/docs/RELEASE-NOTES.md b/docs/RELEASE-NOTES.md index fb0795e..dea5bac 100644 --- a/docs/RELEASE-NOTES.md +++ b/docs/RELEASE-NOTES.md @@ -7,6 +7,7 @@ ### 변경 - SignalKindCode 매핑 규칙 개선 — aton/tug/tender→DEFAULT, shipName BUOY 검출 추가 - 응답 경로 signal_kind_code 치환 1회화 — 캐시 저장 시 치환, 응답 시 DB/캐시 값 직접 사용 +- ChunkedTrackStreamingService 전수 최적화 — isQueryCancelled 버그수정, QueryContext 스레드 안전성, 쿼리 메트릭 DB 저장, 데드코드 400줄 삭제, VesselInfo N+1 해소 ## [2026-03-02] diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java index 6859cc8..89c0297 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ActiveQueryManager.java @@ -316,4 +316,11 @@ public class ActiveQueryManager { public int getMaxConcurrentGlobal() { return maxConcurrentGlobal; } + + /** + * 대기열 타임아웃 (초) + */ + public int getQueueTimeoutSeconds() { + return queueTimeoutSeconds; + } } diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java index 1f1660f..445d0a0 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java @@ -7,7 +7,6 @@ import gc.mda.signal_batch.global.websocket.interceptor.TrackQueryInterceptor; import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack; import gc.mda.signal_batch.global.websocket.dto.TrackQueryRequest; import gc.mda.signal_batch.global.websocket.dto.ChunkStats; -import gc.mda.signal_batch.domain.vessel.service.simplification.TrackSimplificationStrategy; import gc.mda.signal_batch.domain.vessel.service.simplification.TrackSimplificationStrategy.SimplificationLevel; import lombok.extern.slf4j.Slf4j; import org.locationtech.jts.geom.Coordinate; @@ -26,13 +25,11 @@ import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; import java.time.format.DateTimeFormatter; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Collectors; import java.util.function.Consumer; import gc.mda.signal_batch.global.websocket.dto.QueryStatusUpdate; import gc.mda.signal_batch.global.websocket.dto.ViewportFilter; +import gc.mda.signal_batch.monitoring.service.QueryMetricsService; import org.springframework.scheduling.annotation.Async; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -51,16 +48,13 @@ public class ChunkedTrackStreamingService { private final JdbcTemplate queryJdbcTemplate; private final DataSource queryDataSource; - @SuppressWarnings("unused") - private final TrackSimplificationStrategy simplificationStrategy; private final ActiveQueryManager activeQueryManager; private final TrackQueryInterceptor trackQueryInterceptor; private final DailyTrackCacheManager dailyTrackCacheManager; private final CacheTrackSimplifier cacheTrackSimplifier; private final TrackMemoryBudgetManager memoryBudgetManager; - private final WKTReader wktReader = new WKTReader(); - @SuppressWarnings("unused") - private final ExecutorService executorService = Executors.newFixedThreadPool(10); + private final QueryMetricsService queryMetricsService; + private static final ThreadLocal wktReaderLocal = ThreadLocal.withInitial(WKTReader::new); private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private static final int MAX_TRACKS_PER_CHUNK = 500000; // 청크당 최대 트랙 수 (10만 선박 지원) private static final int MAX_MESSAGE_SIZE_KB = 1024; // 메시지당 최대 크기 1MB @@ -72,19 +66,30 @@ public class ChunkedTrackStreamingService { private static final long VESSEL_CACHE_TTL = 3600_000; // 1시간 private volatile long lastCacheCleanup = System.currentTimeMillis(); - // 진행률 추적용 변수 - private int estimatedTotalMinutes = 0; - private long queryStartTime = 0; - private final Map processedTimeRanges = new HashMap<>(); + /** + * 쿼리별 상태 컨텍스트 — 싱글턴 서비스에서 동시 쿼리 간 상태 교차 오염 방지 + */ + private static class QueryContext { + final String queryId; + int estimatedTotalMinutes = 0; + long queryStartTime = System.currentTimeMillis(); + final Map processedTimeRanges = new HashMap<>(); + int currentGlobalChunkIndex = 0; + int vesselLogCount = 0; + + QueryContext(String queryId) { + this.queryId = queryId; + } + + int getCurrentChunkIndex() { + return currentGlobalChunkIndex; + } + } private final AtomicLong pendingBufferSize = new AtomicLong(0); private static final long MAX_PENDING_BUFFER = 50 * 1024 * 1024; // 50MB - @SuppressWarnings("unused") - private static final long WARNING_BUFFER_THRESHOLD = 40 * 1024 * 1024; // 40MB (80%) // 백프레셔 관련 변수 private final Map queryMetrics = new ConcurrentHashMap<>(); - @SuppressWarnings("unused") - private volatile int currentChunkSizeKB = MAX_MESSAGE_SIZE_KB; // 쿼리별 취소 플래그 관리 private final ConcurrentHashMap queryCancelFlags = new ConcurrentHashMap<>(); @@ -94,20 +99,20 @@ public class ChunkedTrackStreamingService { public ChunkedTrackStreamingService( @Qualifier("queryJdbcTemplate") JdbcTemplate queryJdbcTemplate, @Qualifier("queryDataSource") DataSource queryDataSource, - TrackSimplificationStrategy simplificationStrategy, ActiveQueryManager activeQueryManager, TrackQueryInterceptor trackQueryInterceptor, DailyTrackCacheManager dailyTrackCacheManager, CacheTrackSimplifier cacheTrackSimplifier, - TrackMemoryBudgetManager memoryBudgetManager) { + TrackMemoryBudgetManager memoryBudgetManager, + QueryMetricsService queryMetricsService) { this.queryJdbcTemplate = queryJdbcTemplate; this.queryDataSource = queryDataSource; - this.simplificationStrategy = simplificationStrategy; this.activeQueryManager = activeQueryManager; this.trackQueryInterceptor = trackQueryInterceptor; this.dailyTrackCacheManager = dailyTrackCacheManager; this.cacheTrackSimplifier = cacheTrackSimplifier; this.memoryBudgetManager = memoryBudgetManager; + this.queryMetricsService = queryMetricsService; } /** @@ -268,60 +273,6 @@ public class ChunkedTrackStreamingService { log.info("Vessel cache cleanup: {} -> {} entries", before, after); } - /** - * 선박 정보 배치 조회 - */ - @SuppressWarnings("unused") - private Map batchGetVesselInfo(Set vesselIds) { - Map result = new HashMap<>(); - List uncachedIds = new ArrayList<>(); - - // 캐시 확인 - for (String vesselId : vesselIds) { - VesselInfo cached = vesselInfoCache.get(vesselId); - if (cached != null && !cached.isExpired()) { - result.put(vesselId, cached); - } else { - uncachedIds.add(vesselId); - } - } - - // 캐시에 없는 것들은 DB에서 배치 조회 - if (!uncachedIds.isEmpty()) { - try { - String sql = "SELECT mmsi, ship_nm, vessel_type, signal_kind_code " + - "FROM signal.t_ais_position " + - "WHERE mmsi IN (" + - String.join(",", Collections.nCopies(uncachedIds.size(), "?")) + ")"; - - queryJdbcTemplate.query(sql, rs -> { - String vesselId = rs.getString("mmsi"); - VesselInfo info = new VesselInfo( - rs.getString("ship_nm"), - rs.getString("vessel_type"), - rs.getString("signal_kind_code") - ); - result.put(vesselId, info); - vesselInfoCache.put(vesselId, info); - }, uncachedIds.toArray()); - - log.info("Batch loaded {} vessel infos from DB", result.size() - (vesselIds.size() - uncachedIds.size())); - } catch (Exception e) { - log.warn("Failed to batch load vessel info: {}", e.getMessage()); - // 기본값 설정 - for (String vesselId : uncachedIds) { - if (!result.containsKey(vesselId)) { - VesselInfo defaultInfo = new VesselInfo(null, null, null); - result.put(vesselId, defaultInfo); - vesselInfoCache.put(vesselId, defaultInfo); - } - } - } - } - - return result; - } - /** * 선박 정보 배치 프리로드 - 세션 캐시 사용 버전 * 순서: 세션 캐시 → 전역 캐시 → DB 조회 @@ -555,14 +506,19 @@ public class ChunkedTrackStreamingService { } private List processTableRange(TrackQueryRequest request, TableStrategy strategy, TimeRange range) { - return processTableRange(request, strategy, range, null, null); + return processTableRange(request, strategy, range, null, null, null); } private List processTableRange(TrackQueryRequest request, TableStrategy strategy, TimeRange range, String sessionId) { - return processTableRange(request, strategy, range, sessionId, null); + return processTableRange(request, strategy, range, sessionId, null, null); } private List processTableRange(TrackQueryRequest request, TableStrategy strategy, TimeRange range, String sessionId, Set viewportVesselIds) { + return processTableRange(request, strategy, range, sessionId, viewportVesselIds, null); + } + + private List processTableRange(TrackQueryRequest request, TableStrategy strategy, TimeRange range, + String sessionId, Set viewportVesselIds, String queryId) { Map vesselMap = new HashMap<>(20000); // 예상 선박 수 String tableName = strategy.getTableName(); @@ -606,10 +562,10 @@ public class ChunkedTrackStreamingService { while (rs.next() && trackCount < MAX_TRACKS_PER_CHUNK) { // 세션 연결 끊김 체크 (1000개마다) - if (sessionId != null && trackCount % 1000 == 0) { - if (isQueryCancelled(sessionId)) { - log.warn("Query cancelled by disconnect: sessionId={}, table={}, trackCount={}", - sessionId, tableName, trackCount); + if (trackCount % 1000 == 0 && (queryId != null || sessionId != null)) { + if (isQueryCancelled(queryId, sessionId)) { + log.warn("Query cancelled: queryId={}, sessionId={}, table={}, trackCount={}", + queryId, sessionId, tableName, trackCount); return Collections.emptyList(); } } @@ -647,7 +603,7 @@ public class ChunkedTrackStreamingService { if (trackGeomWkt != null && !trackGeomWkt.isEmpty() && !"LINESTRING EMPTY".equals(trackGeomWkt)) { try { - LineString lineString = (LineString) wktReader.read(trackGeomWkt); + LineString lineString = (LineString) wktReaderLocal.get().read(trackGeomWkt); // 빈 LineString은 건너뛰되, 선박 객체는 이미 생성됨 if (lineString.getNumPoints() == 0) { @@ -739,32 +695,7 @@ public class ChunkedTrackStreamingService { VesselAccumulator acc = entry.getValue(); // 평균속도 계산 (전체 궤적 기반) - double avgSpeed = 0.0; - if (acc.totalDistance > 0 && acc.timestamps.size() > 1) { - String firstTs = acc.timestamps.get(0); - String lastTs = acc.timestamps.get(acc.timestamps.size() - 1); - - LocalDateTime startTime; - LocalDateTime endTime; - - // Unix timestamp 감지 (10자리 이상 숫자) - if (firstTs.matches("\\d{10,}")) { - startTime = LocalDateTime.ofInstant( - java.time.Instant.ofEpochSecond(Long.parseLong(firstTs)), - java.time.ZoneId.systemDefault()); - endTime = LocalDateTime.ofInstant( - java.time.Instant.ofEpochSecond(Long.parseLong(lastTs)), - java.time.ZoneId.systemDefault()); - } else { - startTime = LocalDateTime.parse(firstTs, TIMESTAMP_FORMATTER); - endTime = LocalDateTime.parse(lastTs, TIMESTAMP_FORMATTER); - } - - double hours = Duration.between(startTime, endTime).toMinutes() / 60.0; - if (hours > 0) { - avgSpeed = acc.totalDistance / hours; - } - } + double avgSpeed = calculateAvgSpeed(acc.totalDistance, acc.timestamps); return CompactVesselTrack.builder() .vesselId(vesselId) @@ -784,156 +715,6 @@ public class ChunkedTrackStreamingService { .collect(Collectors.toList()); } - /** - * 쿼리를 테이블 전략별로 처리 - */ - public List processQueryInChunks(TrackQueryRequest request, String queryId) { - log.info("Processing chunked query: {}", queryId); - queryStartTime = System.currentTimeMillis(); - processedTimeRanges.clear(); - - // 시간 범위별 테이블 전략 분할 - Map> strategyMap = splitTimeRangeByStrategy( - request.getStartTime(), request.getEndTime() - ); - - log.info("Query {} using strategies: {}", queryId, strategyMap.keySet()); - - // 2-pass 뷰포트 필터링: 전체 시간 범위에서 뷰포트 교차 선박 수집 - Set viewportVesselIds = collectViewportVesselIds(strategyMap, request); - - List responses = new ArrayList<>(); - int globalChunkIndex = 0; - Set uniqueVesselIds = new HashSet<>(); - - // 테이블 전략별 처리 (daily → hourly → 5min 순서) - for (TableStrategy strategy : new TableStrategy[]{TableStrategy.DAILY, TableStrategy.HOURLY, TableStrategy.FIVE_MINUTE}) { - if (!strategyMap.containsKey(strategy)) continue; - - List ranges = strategyMap.get(strategy); - log.info("Processing {} strategy with {} ranges", strategy, ranges.size()); - - // Daily 테이블의 경우 캐시 우선 → 페이지네이션 폴백 - if (strategy == TableStrategy.DAILY) { - for (TimeRange range : ranges) { - try { - List compactTracks; - LocalDate rangeDate = range.getStart().toLocalDate(); - - // 캐시 히트 시 메모리에서 가져옴 - if (dailyTrackCacheManager.isEnabled() && dailyTrackCacheManager.isCached(rangeDate)) { - // viewportVesselIds가 있으면 vessel ID 필터만 (공간 필터 재적용 금지) - if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) { - compactTracks = dailyTrackCacheManager.getCachedTracks(rangeDate).stream() - .filter(t -> viewportVesselIds.contains(t.getVesselId())) - .collect(Collectors.toList()); - } else { - compactTracks = dailyTrackCacheManager.getCachedTracks(rangeDate); - } - log.info("Daily cache HIT for {} in processQueryInChunks: {} tracks", rangeDate, compactTracks.size()); - } else { - compactTracks = processDailyTableWithPagination(request, range, null, viewportVesselIds); - } - - if (!compactTracks.isEmpty()) { - // 메시지 크기로 분할 - List> batches = splitByMessageSize(compactTracks); - for (List batch : batches) { - TrackChunkResponse response = new TrackChunkResponse(); - response.setQueryId(queryId); - response.setChunkIndex(globalChunkIndex++); - response.setIsLastChunk(false); // 나중에 설정 - response.setTotalChunks(-1); // 마지막 청크에서 설정 - response.setCompactTracks(batch); - // 처리된 시간 계산 - String rangeKey = "daily_" + range.getStart(); - processedTimeRanges.put(rangeKey, (int)Duration.between(range.getStart(), range.getEnd()).toMinutes()); - int processedMinutes = processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum(); - response.setStats(createChunkStats(batch, uniqueVesselIds, processedMinutes)); - responses.add(response); - - // 유니크 선박 추가 - batch.forEach(track -> uniqueVesselIds.add(track.getVesselId())); - - // 배치 크기 로그 (Daily에도 추가) - int batchSize = batch.stream().mapToInt(t -> estimateTrackSize(t)).sum() / 1024; - log.debug("[{}] Batch {} size: {}KB, tracks: {}", - strategy, response.getChunkIndex(), batchSize, batch.size()); - } - } - } catch (Exception e) { - log.error("Error processing daily range {}: {}", range, e.getMessage()); - } - } - } else { - // Hourly/5min 테이블은 청크로 분할 - List chunks = divideRangesIntoChunks(ranges, strategy); - - for (TimeChunk chunk : chunks) { - try { - List compactTracks = processTableRange(request, strategy, - new TimeRange(chunk.start, chunk.end), null, viewportVesselIds); - - if (!compactTracks.isEmpty()) { - // 메시지 크기로 분할 (5min/hourly에도 적용) - List> batches = splitByMessageSize(compactTracks); - for (List batch : batches) { - TrackChunkResponse response = new TrackChunkResponse(); - response.setQueryId(queryId); - response.setChunkIndex(globalChunkIndex++); - response.setIsLastChunk(false); - response.setTotalChunks(-1); // 마짉 청크에서 설정 - response.setCompactTracks(batch); - // 처리된 시간 업데이트 - String chunkKey = strategy + "_" + chunk.start; - processedTimeRanges.put(chunkKey, (int)Duration.between(chunk.start, chunk.end).toMinutes()); - int processedMin = processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum(); - response.setStats(createChunkStats(batch, uniqueVesselIds, processedMin)); - responses.add(response); - - // 유니크 선박 추가 - batch.forEach(track -> uniqueVesselIds.add(track.getVesselId())); - - // 배치 크기 로그 - int batchSize = batch.stream().mapToInt(t -> estimateTrackSize(t)).sum() / 1024; - log.debug("[{}] Batch {} size: {}KB, tracks: {}", - strategy, response.getChunkIndex(), batchSize, batch.size()); - } - } - } catch (Exception e) { - log.error("Error processing chunk {}: {}", chunk, e.getMessage()); - } - } - } - } - - // 마지막 청크 표시 - if (!responses.isEmpty()) { - responses.get(responses.size() - 1).setIsLastChunk(true); - - // 총 선박 수 계산 - responses.forEach(response -> { - if (response.getCompactTracks() != null) { - response.getCompactTracks().forEach(track -> uniqueVesselIds.add(track.getVesselId())); - } - }); - log.info("Query {} completed: Total {} chunks, {} unique vessels processed", - queryId, responses.size(), uniqueVesselIds.size()); - } - - // 전체 응답 크기 체크 - long totalResponseSize = responses.stream() - .mapToLong(r -> r.getCompactTracks().stream() - .mapToInt(t -> estimateTrackSize(t)) - .sum()) - .sum(); - - log.info("Query {} total response size: {} MB across {} chunks", - queryId, totalResponseSize / (1024 * 1024), responses.size()); - - return responses; - } - /** * 비동기 스트리밍 메서드 (컨트롤러에서 호출) */ @@ -944,6 +725,11 @@ public class ChunkedTrackStreamingService { Consumer chunkConsumer, Consumer statusConsumer) { boolean slotAcquired = false; + QueryBenchmark benchmark = null; + QueryContext ctx = null; + String queryStatus = "ERROR"; + Set uniqueVesselIds = new HashSet<>(); + int globalChunkIndex = 0; try { // 글로벌 동시 쿼리 슬롯 즉시 획득 시도 slotAcquired = activeQueryManager.tryAcquireQuerySlotImmediate(queryId); @@ -952,7 +738,7 @@ public class ChunkedTrackStreamingService { log.info("Query {} entering wait queue: active={}/{}", queryId, activeQueryManager.getGlobalActiveQueryCount(), activeQueryManager.getMaxConcurrentGlobal()); - int maxWaitSeconds = 120; // 최대 2분 대기 + int maxWaitSeconds = activeQueryManager.getQueueTimeoutSeconds(); long deadline = System.currentTimeMillis() + (maxWaitSeconds * 1000L); activeQueryManager.getWaitingQueue().offer(queryId); try { @@ -1019,12 +805,14 @@ public class ChunkedTrackStreamingService { AtomicBoolean cancelFlag = new AtomicBoolean(false); queryCancelFlags.put(queryId, cancelFlag); + // 쿼리별 상태 컨텍스트 생성 + ctx = new QueryContext(queryId); + log.info("Starting chunked streaming for query: {}", queryId); - queryStartTime = System.currentTimeMillis(); - processedTimeRanges.clear(); + ctx.queryStartTime = System.currentTimeMillis(); // [BENCHMARK] 벤치마크 지표 초기화 - QueryBenchmark benchmark = new QueryBenchmark(); + benchmark = new QueryBenchmark(); benchmark.zoomLevel = request.getZoomLevel(); // 백프레셔 메트릭스 초기화 @@ -1041,15 +829,20 @@ public class ChunkedTrackStreamingService { ); // 전체 시간 계산 - estimatedTotalMinutes = (int)Duration.between(request.getStartTime(), request.getEndTime()).toMinutes(); - log.info("Total time range: {} minutes", estimatedTotalMinutes); + ctx.estimatedTotalMinutes = (int)Duration.between(request.getStartTime(), request.getEndTime()).toMinutes(); + log.info("Total time range: {} minutes", ctx.estimatedTotalMinutes); // 2-pass 뷰포트 필터링: 전체 시간 범위에서 뷰포트 교차 선박 수집 Set viewportVesselIds = collectViewportVesselIds(strategyMap, request, benchmark); // [BENCHMARK] - int globalChunkIndex = 0; + // VesselInfo N+1 방지: 뷰포트 내 전체 MMSI를 배치 프리로드 + Map sessionVesselCache = new HashMap<>(); + if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) { + preloadVesselInfoWithSessionCache(viewportVesselIds, sessionVesselCache, benchmark); + } + + globalChunkIndex = 0; int totalVessels = 0; - Set uniqueVesselIds = new HashSet<>(); // 테이블 전략별 처리 (daily → hourly → 5min 순서) for (TableStrategy strategy : new TableStrategy[]{TableStrategy.DAILY, TableStrategy.HOURLY, TableStrategy.FIVE_MINUTE}) { @@ -1058,6 +851,7 @@ public class ChunkedTrackStreamingService { // 취소 확인 if (cancelFlag.get()) { log.info("Query {} cancelled before processing strategy {}", queryId, strategy); + queryStatus = "CANCELLED"; statusConsumer.accept(new QueryStatusUpdate(queryId, "CANCELLED", "Query cancelled by user", 0.0)); return; } @@ -1068,8 +862,8 @@ public class ChunkedTrackStreamingService { if (strategy == TableStrategy.DAILY) { // Daily는 기존 방식 유지 (이미 일 단위) processDailyStrategy(ranges, request, queryId, chunkConsumer, statusConsumer, - globalChunkIndex, uniqueVesselIds, viewportVesselIds, benchmark); - globalChunkIndex = getCurrentChunkIndex(); + globalChunkIndex, uniqueVesselIds, viewportVesselIds, benchmark, ctx); + globalChunkIndex = ctx.getCurrentChunkIndex(); } else { // Hourly/5min은 6시간 단위로 그룹화하여 처리 Map> timeGroups = groupRangesByTimeWindow(ranges, 6); @@ -1091,7 +885,7 @@ public class ChunkedTrackStreamingService { } List compactTracks = processTableRangeWithBaseTime( - request, strategy, range, baseTime, viewportVesselIds); + request, strategy, range, baseTime, viewportVesselIds, queryId, ctx); if (benchmark != null) benchmark.connHourly5min++; // [BENCHMARK] // 선박별로 볕합 @@ -1131,32 +925,7 @@ public class ChunkedTrackStreamingService { String vesselId = entry.getKey(); VesselAccumulator acc = entry.getValue(); - double avgSpeed = 0.0; - if (acc.totalDistance > 0 && acc.timestamps.size() > 1) { - String firstTs = acc.timestamps.get(0); - String lastTs = acc.timestamps.get(acc.timestamps.size() - 1); - - LocalDateTime start; - LocalDateTime end; - - // Unix timestamp 감지 (10자리 이상 숫자) - if (firstTs.matches("\\d{10,}")) { - start = LocalDateTime.ofInstant( - java.time.Instant.ofEpochSecond(Long.parseLong(firstTs)), - java.time.ZoneId.systemDefault()); - end = LocalDateTime.ofInstant( - java.time.Instant.ofEpochSecond(Long.parseLong(lastTs)), - java.time.ZoneId.systemDefault()); - } else { - start = LocalDateTime.parse(firstTs, TIMESTAMP_FORMATTER); - end = LocalDateTime.parse(lastTs, TIMESTAMP_FORMATTER); - } - - double hours = Duration.between(start, end).toMinutes() / 60.0; - if (hours > 0) { - avgSpeed = acc.totalDistance / hours; - } - } + double avgSpeed = calculateAvgSpeed(acc.totalDistance, acc.timestamps); return CompactVesselTrack.builder() .vesselId(vesselId) @@ -1194,11 +963,11 @@ public class ChunkedTrackStreamingService { response.setCompactTracks(batch); // 처리된 시간 추가 String timeKey = groupKey + "_" + strategy; - processedTimeRanges.put(timeKey, groupRanges.stream() + ctx.processedTimeRanges.put(timeKey, groupRanges.stream() .mapToInt(r -> (int)Duration.between(r.getStart(), r.getEnd()).toMinutes()) .sum()); - int currentProcessedMin = processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum(); - response.setStats(createChunkStats(batch, uniqueVesselIds, currentProcessedMin)); + int currentProcessedMin = ctx.processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum(); + response.setStats(createChunkStats(batch, uniqueVesselIds, currentProcessedMin, ctx)); // 버퍼 크기 계산 및 추가 int chunkSize = batch.stream().mapToInt(t -> estimateTrackSize(t)).sum(); @@ -1253,7 +1022,7 @@ public class ChunkedTrackStreamingService { batch.forEach(track -> uniqueVesselIds.add(track.getVesselId())); // 진행률 업데이트 (시간 기반만 사용) - double timeProgress = (double) currentProcessedMin / estimatedTotalMinutes * 100; + double timeProgress = (double) currentProcessedMin / ctx.estimatedTotalMinutes * 100; statusConsumer.accept(new QueryStatusUpdate( queryId, @@ -1295,18 +1064,20 @@ public class ChunkedTrackStreamingService { lastChunkMarker.setTotalChunks(globalChunkIndex); lastChunkMarker.setIsLastChunk(true); lastChunkMarker.setCompactTracks(new ArrayList<>()); - int finalProcessedMin = processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum(); - lastChunkMarker.setStats(createChunkStats(new ArrayList<>(), uniqueVesselIds, finalProcessedMin)); + int finalProcessedMin = ctx.processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum(); + lastChunkMarker.setStats(createChunkStats(new ArrayList<>(), uniqueVesselIds, finalProcessedMin, ctx)); chunkConsumer.accept(lastChunkMarker); } // [BENCHMARK] 벤치마크 JSON 기록 - long totalElapsedMs = System.currentTimeMillis() - queryStartTime; - benchmark.totalBatches = globalChunkIndex; - try { - benchmarkLog.info(benchmark.toJson(queryId, uniqueVesselIds.size(), totalElapsedMs)); - } catch (Exception e) { - log.debug("Failed to write benchmark log: {}", e.getMessage()); + long totalElapsedMs = System.currentTimeMillis() - ctx.queryStartTime; + if (benchmark != null) { + benchmark.totalBatches = globalChunkIndex; + try { + benchmarkLog.info(benchmark.toJson(queryId, uniqueVesselIds.size(), totalElapsedMs)); + } catch (Exception e) { + log.debug("Failed to write benchmark log: {}", e.getMessage()); + } } log.info("Query {} completed: {} chunks, {} unique vessels", @@ -1329,6 +1100,7 @@ public class ChunkedTrackStreamingService { } // 완료 상태 + queryStatus = "COMPLETED"; statusConsumer.accept(new QueryStatusUpdate( queryId, "COMPLETED", @@ -1354,9 +1126,42 @@ public class ChunkedTrackStreamingService { 0.0 )); } finally { + // 쿼리 메트릭 DB 비동기 저장 + if (benchmark != null && ctx != null) { + BackpressureMetrics bpMetrics = queryMetrics.get(queryId); + ViewportFilter vp = request.getViewport(); + String vpBounds = (vp != null && vp.isValid()) + ? String.format("%.4f,%.4f,%.4f,%.4f", vp.getMinLon(), vp.getMinLat(), vp.getMaxLon(), vp.getMaxLat()) + : null; + queryMetricsService.saveAsync(QueryMetricsService.QueryMetric.builder() + .queryId(queryId) + .sessionId(sessionId) + .queryType("WEBSOCKET") + .startTime(request.getStartTime()) + .endTime(request.getEndTime()) + .zoomLevel(request.getZoomLevel()) + .viewportBounds(vpBounds) + .requestedMmsi(request.getVesselIds() != null ? request.getVesselIds().size() : 0) + .dataPath(benchmark.determinePath()) + .cacheHitDays(benchmark.cacheHitDays) + .dbQueryDays(benchmark.dbQueryDays) + .dbConnTotal(benchmark.dbConnectionTotal()) + .uniqueVessels(uniqueVesselIds.size()) + .totalTracks(benchmark.totalTracks) + .totalPoints(benchmark.totalPointsBefore) + .pointsAfterSimplify(benchmark.totalPointsAfter) + .totalChunks(benchmark.totalBatches) + .responseBytes(bpMetrics != null ? bpMetrics.totalBytes.get() : 0) + .elapsedMs(System.currentTimeMillis() - ctx.queryStartTime) + .dbQueryMs(benchmark.dbQueryTimeMs) + .simplifyMs(benchmark.simplifyTimeMs) + .backpressureEvents(bpMetrics != null ? bpMetrics.backpressureEvents.get() : 0) + .status(queryStatus) + .build()); + } + // 리소스 반환 (순서: 메모리 예산 → 데이터 정리 → 취소 플래그 → 글로벌 슬롯 → 세션 카운트 → 메트릭스) memoryBudgetManager.releaseQueryMemory(queryId); - processedTimeRanges.clear(); // 메모리 즉시 해제: 처리 시간 범위 맵 queryCancelFlags.remove(queryId); if (slotAcquired) { activeQueryManager.releaseQuerySlot(queryId); @@ -1366,8 +1171,6 @@ public class ChunkedTrackStreamingService { cleanupQueryMetrics(queryId); // Humongous 영역 조기 회수: G1GC에서 8MB+ 객체는 Young GC로 회수 불가 - // Concurrent Mark → Mixed GC 사이클이 필요하므로, 대규모 해제 후 GC 힌트 제공 - // -XX:+ExplicitGCInvokesConcurrent 플래그와 함께 사용 시 STW 없이 concurrent GC 트리거 if (activeQueryManager.isHeapPressureHigh()) { log.info("Query {} triggering concurrent GC for Humongous reclaim (session={})", queryId, sessionId); System.gc(); @@ -1655,163 +1458,6 @@ public class ChunkedTrackStreamingService { .collect(Collectors.toList()); } - /** - * 범위를 청크로 분할 - */ - private List divideRangesIntoChunks(List ranges, TableStrategy strategy) { - List chunks = new ArrayList<>(); - - for (TimeRange range : ranges) { - if (strategy == TableStrategy.HOURLY) { - // Hourly는 3시간 단위로 분할 - LocalDateTime chunkStart = range.getStart(); - while (chunkStart.isBefore(range.getEnd())) { - LocalDateTime chunkEnd = chunkStart.plusHours(3); - if (chunkEnd.isAfter(range.getEnd())) { - chunkEnd = range.getEnd(); - } - chunks.add(new TimeChunk(chunkStart, chunkEnd)); - chunkStart = chunkEnd; - } - } else { - // 5min은 30분 단위로 분할 - LocalDateTime chunkStart = range.getStart(); - while (chunkStart.isBefore(range.getEnd())) { - LocalDateTime chunkEnd = chunkStart.plusMinutes(30); - if (chunkEnd.isAfter(range.getEnd())) { - chunkEnd = range.getEnd(); - } - chunks.add(new TimeChunk(chunkStart, chunkEnd)); - chunkStart = chunkEnd; - } - } - } - - return chunks; - } - - /** - * 청크 처리 - 시간 범위별 테이블 전략 사용 - */ - @SuppressWarnings("unused") - private List processChunk(TrackQueryRequest request, TimeChunk chunk) { - Map vesselMap = new HashMap<>(); - - // 시간 범위별 테이블 전략 분할 - Map> strategyMap = splitTimeRangeByStrategy( - chunk.start, chunk.end - ); - - // 각 테이블 전략별로 처리 - for (Map.Entry> entry : strategyMap.entrySet()) { - TableStrategy strategy = entry.getKey(); - List ranges = entry.getValue(); - - for (TimeRange range : ranges) { - List tracks = processTableRange(request, strategy, range); - // 선박별로 병합 - for (CompactVesselTrack track : tracks) { - CompactVesselTrack.CompactVesselTrackBuilder builder = vesselMap.get(track.getVesselId()); - if (builder == null) { - builder = CompactVesselTrack.builder() - .vesselId(track.getVesselId()) - .nationalCode(track.getNationalCode()) - .geometry(new ArrayList<>()) - .timestamps(new ArrayList<>()) - .speeds(new ArrayList<>()) - .totalDistance(0.0) - .avgSpeed(0.0) - .maxSpeed(0.0) - .pointCount(0); - vesselMap.put(track.getVesselId(), builder); - } - - // 데이터 병합 - builder.geometry(mergeGeometry(builder.build().getGeometry(), track.getGeometry())); - builder.timestamps(mergeTimestamps(builder.build().getTimestamps(), track.getTimestamps())); - builder.speeds(mergeSpeeds(builder.build().getSpeeds(), track.getSpeeds())); - builder.totalDistance(builder.build().getTotalDistance() + track.getTotalDistance()); - builder.maxSpeed(Math.max(builder.build().getMaxSpeed(), track.getMaxSpeed())); - builder.pointCount(builder.build().getPointCount() + track.getPointCount()); - } - } - } - - // 평균 속도 계산 및 최종 변환 - return vesselMap.values().stream() - .map(builder -> { - CompactVesselTrack track = builder.build(); - if (track.getTotalDistance() > 0 && track.getTimestamps().size() > 1) { - // Unix timestamp 지원 - String firstTs = track.getTimestamps().get(0); - String lastTs = track.getTimestamps().get(track.getTimestamps().size() - 1); - - LocalDateTime startTime; - LocalDateTime endTime; - - // Unix timestamp 감지 (10자리 이상 숫자) - if (firstTs.matches("\\d{10,}")) { - startTime = LocalDateTime.ofInstant( - java.time.Instant.ofEpochSecond(Long.parseLong(firstTs)), - java.time.ZoneId.systemDefault()); - endTime = LocalDateTime.ofInstant( - java.time.Instant.ofEpochSecond(Long.parseLong(lastTs)), - java.time.ZoneId.systemDefault()); - } else { - startTime = LocalDateTime.parse(firstTs, TIMESTAMP_FORMATTER); - endTime = LocalDateTime.parse(lastTs, TIMESTAMP_FORMATTER); - } - - double hours = java.time.Duration.between(startTime, endTime).toMinutes() / 60.0; - if (hours > 0) { - track.setAvgSpeed(track.getTotalDistance() / hours); - } - } - return track; - }) - .collect(Collectors.toList()); - } - - - /** - * 테이블 선택 (집계 지연 고려) - */ - @SuppressWarnings("unused") - private String selectTableByTimeRange(LocalDateTime start, LocalDateTime end) { - LocalDateTime now = LocalDateTime.now(); - - // Daily 데이터는 매일 01:00에 집계 (전일 데이터) - LocalDateTime lastDailyAggregation = now.toLocalDate().atTime(1, 0); - if (now.isBefore(lastDailyAggregation)) { - lastDailyAggregation = lastDailyAggregation.minusDays(1); - } - - // Hourly 데이터는 매시 10분에 집계 (이전 시간 데이터) - LocalDateTime lastHourlyAggregation = now.truncatedTo(java.time.temporal.ChronoUnit.HOURS).plusMinutes(10); - if (now.isBefore(lastHourlyAggregation)) { - lastHourlyAggregation = lastHourlyAggregation.minusHours(1); - } - - // 조회 시간이 완전히 daily 집계 완료 범위에 있으면 - if (end.isBefore(lastDailyAggregation.minusDays(1))) { - return "signal.t_vessel_tracks_daily"; - } - - // 조회 시간이 완전히 hourly 집계 완료 범위에 있고 1일 이상이면 - if (end.isBefore(lastHourlyAggregation.minusHours(1)) && - java.time.Duration.between(start, end).toHours() >= 24) { - return "signal.t_vessel_tracks_daily"; - } - - // 조회 시간이 완전히 hourly 집계 완료 범위에 있으면 - if (end.isBefore(lastHourlyAggregation.minusHours(1))) { - return "signal.t_vessel_tracks_hourly"; - } - - // 그 외의 경우 5분 데이터 사용 - return "signal.t_vessel_tracks_5min"; - } - /** * 쿼리 생성 (간소화 적용, 2-pass 뷰포트 필터링 지원) */ @@ -1959,9 +1605,9 @@ public class ChunkedTrackStreamingService { String currentMmsi = null; while (rs.next()) { - // 세션 연결 끊김 체크 (1000개마다) + // 쿼리 취소 체크 (1000개마다) if (sessionId != null && pageTrackCount % 1000 == 0) { - if (isQueryCancelled(sessionId)) { + if (isQueryCancelled(null, sessionId)) { log.warn("Query cancelled during daily pagination: sessionId={}, page={}, trackCount={}", sessionId, pageNum, totalTrackCount); return Collections.emptyList(); @@ -1996,7 +1642,7 @@ public class ChunkedTrackStreamingService { if (trackGeomWkt != null && !trackGeomWkt.isEmpty() && !"LINESTRING EMPTY".equals(trackGeomWkt)) { try { - LineString lineString = (LineString) wktReader.read(trackGeomWkt); + LineString lineString = (LineString) wktReaderLocal.get().read(trackGeomWkt); // 빈 LineString은 건너뛰되, 선박 객체는 이미 생성됨 if (lineString.getNumPoints() == 0) { @@ -2086,31 +1732,7 @@ public class ChunkedTrackStreamingService { String vesselId = entry.getKey(); VesselAccumulator acc = entry.getValue(); - double avgSpeed = 0.0; - if (acc.totalDistance > 0 && acc.timestamps.size() > 1) { - String firstTs = acc.timestamps.get(0); - String lastTs = acc.timestamps.get(acc.timestamps.size() - 1); - - LocalDateTime startTime; - LocalDateTime endTime; - - if (firstTs.matches("\\d{10,}")) { - startTime = LocalDateTime.ofInstant( - java.time.Instant.ofEpochSecond(Long.parseLong(firstTs)), - java.time.ZoneId.systemDefault()); - endTime = LocalDateTime.ofInstant( - java.time.Instant.ofEpochSecond(Long.parseLong(lastTs)), - java.time.ZoneId.systemDefault()); - } else { - startTime = LocalDateTime.parse(firstTs, TIMESTAMP_FORMATTER); - endTime = LocalDateTime.parse(lastTs, TIMESTAMP_FORMATTER); - } - - double hours = Duration.between(startTime, endTime).toMinutes() / 60.0; - if (hours > 0) { - avgSpeed = acc.totalDistance / hours; - } - } + double avgSpeed = calculateAvgSpeed(acc.totalDistance, acc.timestamps); return CompactVesselTrack.builder() .vesselId(vesselId) @@ -2196,25 +1818,6 @@ public class ChunkedTrackStreamingService { return SimplificationLevel.LIGHT; } - // 도우미 메서드들 - private List mergeGeometry(List existing, List newData) { - if (existing == null) existing = new ArrayList<>(); - existing.addAll(newData); - return existing; - } - - private List mergeTimestamps(List existing, List newData) { - if (existing == null) existing = new ArrayList<>(); - existing.addAll(newData); - return existing; - } - - private List mergeSpeeds(List existing, List newData) { - if (existing == null) existing = new ArrayList<>(); - existing.addAll(newData); - return existing; - } - // 내부 클래스들 private static class TimeChunk { LocalDateTime start; @@ -2269,7 +1872,7 @@ public class ChunkedTrackStreamingService { // ChunkStats 생성 헬퍼 메서드 private ChunkStats createChunkStats(List compactTracks, - Set uniqueVesselIds, int processedMinutes) { + Set uniqueVesselIds, int processedMinutes, QueryContext ctx) { ChunkStats stats = new ChunkStats(); stats.setTrackCount(compactTracks.size()); stats.setTotalTracks(compactTracks.stream() @@ -2277,45 +1880,22 @@ public class ChunkedTrackStreamingService { .sum()); // 시간 기반 진행률만 사용 - double timeProgress = estimatedTotalMinutes > 0 ? - (double) processedMinutes / estimatedTotalMinutes * 100 : 0; + double timeProgress = ctx.estimatedTotalMinutes > 0 ? + (double) processedMinutes / ctx.estimatedTotalMinutes * 100 : 0; stats.setProgressPercentage(Math.min(100.0, timeProgress)); stats.setTimeProgress(timeProgress); // 기본 정보 stats.setProcessedVessels(uniqueVesselIds.size()); stats.setProcessedMinutes(processedMinutes); - stats.setTotalMinutes(estimatedTotalMinutes); + stats.setTotalMinutes(ctx.estimatedTotalMinutes); // 경과 시간 - stats.setElapsedMillis(System.currentTimeMillis() - queryStartTime); + stats.setElapsedMillis(System.currentTimeMillis() - ctx.queryStartTime); return stats; } - // 현재 청크 인덱스 추적용 - private int currentGlobalChunkIndex = 0; - private int vesselLogCount = 0; // 로그 카운트 추가 - - private int getCurrentChunkIndex() { - return currentGlobalChunkIndex; - } - - /** - * 범위를 날짜별로 그룹화 - */ - @SuppressWarnings("unused") - private Map> groupRangesByDate(List ranges) { - Map> dateGroups = new TreeMap<>(); - - for (TimeRange range : ranges) { - LocalDate date = range.getStart().toLocalDate(); - dateGroups.computeIfAbsent(date, k -> new ArrayList<>()).add(range); - } - - return dateGroups; - } - /** * 범위를 시간 윈도우로 그룹화 (예: 6시간 단위) */ @@ -2339,10 +1919,10 @@ public class ChunkedTrackStreamingService { */ private List processTableRangeWithBaseTime( TrackQueryRequest request, TableStrategy strategy, TimeRange range, LocalDateTime dayBaseTime, - Set viewportVesselIds) { + Set viewportVesselIds, String queryId, QueryContext ctx) { // 기본 처리 후 M값 보정 - List tracks = processTableRange(request, strategy, range, null, viewportVesselIds); + List tracks = processTableRange(request, strategy, range, null, viewportVesselIds, queryId); // 각 트랙의 timestamp를 dayBaseTime 기준으로 재계산 및 간소화 for (CompactVesselTrack track : tracks) { @@ -2451,7 +2031,7 @@ public class ChunkedTrackStreamingService { track.setPointCount(sampledGeometry.size()); // 간소화 결과 로그 (처음 10개 선박만) - if (vesselLogCount++ < 10) { + if (ctx.vesselLogCount++ < 10) { double reductionRate = (1 - (double)sampledGeometry.size() / originalSize) * 100; log.info("[{}] Vessel {} simplified: {} -> {} points ({}% reduced, zoom: {}, speed: {} knots)", strategy, track.getVesselId(), originalSize, sampledGeometry.size(), @@ -2464,7 +2044,7 @@ public class ChunkedTrackStreamingService { track.setPointCount(simplifiedGeometry.size()); // 간소화 결과 로그 (처음 10개 선박만) - if (vesselLogCount++ < 10) { + if (ctx.vesselLogCount++ < 10) { double reductionRate = (1 - (double)simplifiedGeometry.size() / originalSize) * 100; log.info("[{}] Vessel {} simplified: {} -> {} points ({}% reduced, speed: {} knots)", strategy, track.getVesselId(), originalSize, simplifiedGeometry.size(), @@ -2488,9 +2068,9 @@ public class ChunkedTrackStreamingService { int startChunkIndex, Set uniqueVesselIds, Set viewportVesselIds, - QueryBenchmark benchmark) throws Exception { + QueryBenchmark benchmark, QueryContext ctx) throws Exception { - currentGlobalChunkIndex = startChunkIndex; + ctx.currentGlobalChunkIndex = startChunkIndex; // 전체 쿼리 동안 유지되는 세션 캐시 (모든 날짜에 걸쳐 공유) Map sessionVesselCache = new HashMap<>(20000); @@ -2546,7 +2126,7 @@ public class ChunkedTrackStreamingService { List> batches = splitByMessageSize(cachedTracks); int postBatchCount = batches.size(); for (List batch : batches) { - sendChunkResponse(batch, queryId, chunkConsumer, uniqueVesselIds); + sendChunkResponse(batch, queryId, chunkConsumer, uniqueVesselIds, ctx); } batches.clear(); batches = null; @@ -2579,7 +2159,7 @@ public class ChunkedTrackStreamingService { if (benchmark != null) { benchmark.dbQueryDays++; // [BENCHMARK] } - streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache, viewportVesselIds, benchmark); + streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache, viewportVesselIds, benchmark, ctx); } } @@ -2592,16 +2172,16 @@ public class ChunkedTrackStreamingService { */ private void sendChunkResponse(List batch, String queryId, Consumer chunkConsumer, - Set uniqueVesselIds) { + Set uniqueVesselIds, QueryContext ctx) { TrackChunkResponse response = new TrackChunkResponse(); response.setQueryId(queryId); - response.setChunkIndex(currentGlobalChunkIndex++); + response.setChunkIndex(ctx.currentGlobalChunkIndex++); response.setIsLastChunk(false); response.setTotalChunks(-1); response.setCompactTracks(batch); - int processedMin = processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum(); - response.setStats(createChunkStats(batch, uniqueVesselIds, processedMin)); + int processedMin = ctx.processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum(); + response.setStats(createChunkStats(batch, uniqueVesselIds, processedMin, ctx)); chunkConsumer.accept(response); batch.forEach(track -> uniqueVesselIds.add(track.getVesselId())); @@ -2620,7 +2200,7 @@ public class ChunkedTrackStreamingService { Set uniqueVesselIds, Map sessionVesselCache, Set viewportVesselIds, - QueryBenchmark benchmark) { + QueryBenchmark benchmark, QueryContext ctx) { String tableName = TableStrategy.DAILY.getTableName(); // 줌 레벨에 따른 강화된 간소화 tolerance @@ -2628,9 +2208,6 @@ public class ChunkedTrackStreamingService { log.info("Daily streaming pagination started for range [{} - {}] with tolerance {} (zoom: {}, cacheSize: {})", range.getStart(), range.getEnd(), tolerance, request.getZoomLevel(), sessionVesselCache.size()); - // 세션 ID 추출 (쿼리 취소 확인용) - String sessionId = queryId != null ? queryId.split("_")[0] : null; - String lastMmsi = null; int pageNum = 0; int totalTrackCount = 0; @@ -2638,10 +2215,10 @@ public class ChunkedTrackStreamingService { // 페이지네이션 루프 - 각 페이지 완료 즉시 전송 while (true) { - // 세션 연결 끊김 체크 (각 페이지 시작 전) - if (sessionId != null && isQueryCancelled(sessionId)) { - log.warn("Query cancelled before page {}: sessionId={}, totalTracks={}", - pageNum + 1, sessionId, totalTrackCount); + // 쿼리 취소 체크 (각 페이지 시작 전) + if (isQueryCancelled(queryId, null)) { + log.warn("Query cancelled before page {}: queryId={}, totalTracks={}", + pageNum + 1, queryId, totalTrackCount); return; } @@ -2723,11 +2300,11 @@ public class ChunkedTrackStreamingService { // 3단계: 수집된 데이터 처리 long processStartTime = System.currentTimeMillis(); for (String[] trackData : trackDataList) { - // 세션 연결 끊김 체크 (1000개마다) - if (sessionId != null && pageTrackCount % 1000 == 0 && pageTrackCount > 0) { - if (isQueryCancelled(sessionId)) { - log.warn("Query cancelled during page {} processing: sessionId={}, pageTrackCount={}", - pageNum + 1, sessionId, pageTrackCount); + // 쿼리 취소 체크 (1000개마다) + if (pageTrackCount % 1000 == 0 && pageTrackCount > 0) { + if (isQueryCancelled(queryId, null)) { + log.warn("Query cancelled during page {} processing: queryId={}, pageTrackCount={}", + pageNum + 1, queryId, pageTrackCount); return; } } @@ -2759,7 +2336,7 @@ public class ChunkedTrackStreamingService { if (trackGeomWkt != null && !trackGeomWkt.isEmpty() && !"LINESTRING EMPTY".equals(trackGeomWkt)) { try { - LineString lineString = (LineString) wktReader.read(trackGeomWkt); + LineString lineString = (LineString) wktReaderLocal.get().read(trackGeomWkt); // 빈 LineString은 건너뛰되, 선박 객체는 이미 생성됨 if (lineString.getNumPoints() == 0) { @@ -2841,16 +2418,16 @@ public class ChunkedTrackStreamingService { for (List batch : batches) { TrackChunkResponse response = new TrackChunkResponse(); response.setQueryId(queryId); - response.setChunkIndex(currentGlobalChunkIndex++); + response.setChunkIndex(ctx.currentGlobalChunkIndex++); response.setIsLastChunk(false); response.setTotalChunks(-1); response.setCompactTracks(batch); // 처리된 시간 계산 String dailyKey = "daily_" + range.getStart() + "_page" + pageNum; - processedTimeRanges.put(dailyKey, (int)Duration.between(range.getStart(), range.getEnd()).toMinutes() / Math.max(1, pageNum + 1)); - int processedMin = processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum(); - response.setStats(createChunkStats(batch, uniqueVesselIds, processedMin)); + ctx.processedTimeRanges.put(dailyKey, (int)Duration.between(range.getStart(), range.getEnd()).toMinutes() / Math.max(1, pageNum + 1)); + int processedMin = ctx.processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum(); + response.setStats(createChunkStats(batch, uniqueVesselIds, processedMin, ctx)); // metrics 업데이트 (BACKPRESSURE 통계용) BackpressureMetrics metrics = queryMetrics.get(queryId); @@ -2864,7 +2441,7 @@ public class ChunkedTrackStreamingService { batch.forEach(track -> uniqueVesselIds.add(track.getVesselId())); - double timeProgress = (double) processedMin / estimatedTotalMinutes * 100; + double timeProgress = (double) processedMin / ctx.estimatedTotalMinutes * 100; statusConsumer.accept(new QueryStatusUpdate( queryId, "PROCESSING", @@ -2918,31 +2495,7 @@ public class ChunkedTrackStreamingService { String visselId = entry.getKey(); VesselAccumulator acc = entry.getValue(); - double avgSpeed = 0.0; - if (acc.totalDistance > 0 && acc.timestamps.size() > 1) { - String firstTs = acc.timestamps.get(0); - String lastTs = acc.timestamps.get(acc.timestamps.size() - 1); - - LocalDateTime startTime; - LocalDateTime endTime; - - if (firstTs.matches("\\d{10,}")) { - startTime = LocalDateTime.ofInstant( - java.time.Instant.ofEpochSecond(Long.parseLong(firstTs)), - java.time.ZoneId.systemDefault()); - endTime = LocalDateTime.ofInstant( - java.time.Instant.ofEpochSecond(Long.parseLong(lastTs)), - java.time.ZoneId.systemDefault()); - } else { - startTime = LocalDateTime.parse(firstTs, TIMESTAMP_FORMATTER); - endTime = LocalDateTime.parse(lastTs, TIMESTAMP_FORMATTER); - } - - double hours = Duration.between(startTime, endTime).toMinutes() / 60.0; - if (hours > 0) { - avgSpeed = acc.totalDistance / hours; - } - } + double avgSpeed = calculateAvgSpeed(acc.totalDistance, acc.timestamps); return CompactVesselTrack.builder() .vesselId(visselId) @@ -2964,22 +2517,60 @@ public class ChunkedTrackStreamingService { /** * 쿼리 중단 여부 확인 + * queryId: queryCancelFlags 직접 조회 (O(1)), sessionId: ActiveQueryManager 세션 종료 체크 */ - private boolean isQueryCancelled(String sessionId) { - if (sessionId == null) { - return false; - } - - // 1. queryCancelFlags 확인 (cancelQuery() 호출 시 설정됨) - for (Map.Entry entry : queryCancelFlags.entrySet()) { - if (entry.getValue().get()) { + private boolean isQueryCancelled(String queryId, String sessionId) { + // 1. queryId 기반 직접 조회 (cancelQuery() 호출 시 설정됨) + if (queryId != null) { + AtomicBoolean flag = queryCancelFlags.get(queryId); + if (flag != null && flag.get()) { return true; } } - // 2. ActiveQueryManager 확인 (세션 종료 시 설정됨) - ActiveQueryManager.ActiveQuery query = activeQueryManager.getQuery(sessionId); - return query != null && query.isCancelled(); + // 2. sessionId 기반 ActiveQueryManager 확인 (세션 종료 시 설정됨) + if (sessionId != null) { + ActiveQueryManager.ActiveQuery query = activeQueryManager.getQuery(sessionId); + if (query != null && query.isCancelled()) { + return true; + } + } + + return false; + } + + /** + * 평균 속도 계산 (distance / hours) + * timestamps의 첫/끝 값으로 경과 시간을 구한 뒤, totalDistance / hours 반환 + */ + private double calculateAvgSpeed(double totalDistance, List timestamps) { + if (totalDistance <= 0 || timestamps.size() <= 1) { + return 0.0; + } + try { + String firstTs = timestamps.get(0); + String lastTs = timestamps.get(timestamps.size() - 1); + + LocalDateTime start; + LocalDateTime end; + + if (firstTs.matches("\\d{10,}")) { + start = LocalDateTime.ofInstant( + java.time.Instant.ofEpochSecond(Long.parseLong(firstTs)), + java.time.ZoneId.systemDefault()); + end = LocalDateTime.ofInstant( + java.time.Instant.ofEpochSecond(Long.parseLong(lastTs)), + java.time.ZoneId.systemDefault()); + } else { + start = LocalDateTime.parse(firstTs, TIMESTAMP_FORMATTER); + end = LocalDateTime.parse(lastTs, TIMESTAMP_FORMATTER); + } + + double hours = Duration.between(start, end).toMinutes() / 60.0; + return hours > 0 ? totalDistance / hours : 0.0; + } catch (Exception e) { + return 0.0; + } } } \ No newline at end of file diff --git a/src/main/java/gc/mda/signal_batch/monitoring/controller/QueryMetricsController.java b/src/main/java/gc/mda/signal_batch/monitoring/controller/QueryMetricsController.java new file mode 100644 index 0000000..e7fb789 --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/monitoring/controller/QueryMetricsController.java @@ -0,0 +1,40 @@ +package gc.mda.signal_batch.monitoring.controller; + +import gc.mda.signal_batch.monitoring.service.QueryMetricsService; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.RequiredArgsConstructor; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import java.util.List; +import java.util.Map; + +/** + * 쿼리 메트릭 조회 API + * + * WebSocket/REST 쿼리 실행 이력 및 성능 통계를 제공한다. + * ApiMetrics 프론트엔드 페이지의 데이터 소스. + */ +@RestController +@RequestMapping("/api/monitoring/query-metrics") +@RequiredArgsConstructor +@Tag(name = "Query Metrics", description = "쿼리 실행 메트릭 조회 API") +public class QueryMetricsController { + + private final QueryMetricsService queryMetricsService; + + @GetMapping + @Operation(summary = "최근 쿼리 메트릭 조회", description = "최근 N건의 쿼리 실행 메트릭을 조회합니다") + public ResponseEntity>> getRecentMetrics( + @RequestParam(defaultValue = "50") int limit) { + return ResponseEntity.ok(queryMetricsService.getRecentMetrics(Math.min(limit, 200))); + } + + @GetMapping("/stats") + @Operation(summary = "쿼리 메트릭 통계", description = "기간별 쿼리 성능 통계 (평균 응답시간, 캐시 비율, 느린 쿼리 등)") + public ResponseEntity> getStats( + @RequestParam(defaultValue = "7") int days) { + return ResponseEntity.ok(queryMetricsService.getStats(Math.min(days, 90))); + } +} diff --git a/src/main/java/gc/mda/signal_batch/monitoring/service/QueryMetricsService.java b/src/main/java/gc/mda/signal_batch/monitoring/service/QueryMetricsService.java new file mode 100644 index 0000000..9dd696c --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/monitoring/service/QueryMetricsService.java @@ -0,0 +1,180 @@ +package gc.mda.signal_batch.monitoring.service; + +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * 쿼리 실행 메트릭 DB 저장/조회 서비스 + * + * WebSocket 리플레이 및 REST API 쿼리의 성능 메트릭을 signal.t_query_metrics에 저장. + * streamChunkedTracks() finally 블록에서 비동기 INSERT 호출하여 응답 지연 없이 기록. + */ +@Slf4j +@Service +public class QueryMetricsService { + + private final JdbcTemplate queryJdbcTemplate; + + private static final String INSERT_SQL = """ + INSERT INTO signal.t_query_metrics ( + query_id, session_id, query_type, created_at, + start_time, end_time, zoom_level, viewport_bounds, requested_mmsi, + data_path, cache_hit_days, db_query_days, db_conn_total, + unique_vessels, total_tracks, total_points, points_after_simplify, + total_chunks, response_bytes, + elapsed_ms, db_query_ms, simplify_ms, backpressure_events, + status + ) VALUES ( + ?, ?, ?, now(), + ?, ?, ?, ?, ?, + ?, ?, ?, ?, + ?, ?, ?, ?, + ?, ?, + ?, ?, ?, ?, + ? + ) + """; + + public QueryMetricsService(@Qualifier("queryJdbcTemplate") JdbcTemplate queryJdbcTemplate) { + this.queryJdbcTemplate = queryJdbcTemplate; + } + + /** + * 쿼리 메트릭 비동기 저장 — 쿼리 응답에 영향 없음 + */ + @Async("trackStreamingExecutor") + public void saveAsync(QueryMetric metric) { + try { + queryJdbcTemplate.update(INSERT_SQL, + metric.queryId, metric.sessionId, metric.queryType, + metric.startTime != null ? Timestamp.valueOf(metric.startTime) : null, + metric.endTime != null ? Timestamp.valueOf(metric.endTime) : null, + metric.zoomLevel, metric.viewportBounds, metric.requestedMmsi, + metric.dataPath, metric.cacheHitDays, metric.dbQueryDays, metric.dbConnTotal, + metric.uniqueVessels, metric.totalTracks, metric.totalPoints, metric.pointsAfterSimplify, + metric.totalChunks, metric.responseBytes, + metric.elapsedMs, metric.dbQueryMs, metric.simplifyMs, metric.backpressureEvents, + metric.status + ); + log.debug("Query metric saved: queryId={}, elapsed={}ms, status={}", + metric.queryId, metric.elapsedMs, metric.status); + } catch (Exception e) { + log.warn("Failed to save query metric: queryId={}, error={}", metric.queryId, e.getMessage()); + } + } + + /** + * 최근 쿼리 메트릭 조회 + */ + public List> getRecentMetrics(int limit) { + return queryJdbcTemplate.queryForList(""" + SELECT query_id, session_id, query_type, created_at, + start_time, end_time, zoom_level, viewport_bounds, + data_path, cache_hit_days, db_query_days, db_conn_total, + unique_vessels, total_tracks, total_points, points_after_simplify, + total_chunks, response_bytes, + elapsed_ms, db_query_ms, simplify_ms, backpressure_events, status + FROM signal.t_query_metrics + ORDER BY created_at DESC + LIMIT ? + """, limit); + } + + /** + * 기간별 쿼리 메트릭 통계 + */ + public Map getStats(int days) { + Map stats = new LinkedHashMap<>(); + + // 전체 통계 + Map summary = queryJdbcTemplate.queryForMap(""" + SELECT + COUNT(*) AS total_queries, + ROUND(AVG(elapsed_ms)) AS avg_elapsed_ms, + MAX(elapsed_ms) AS max_elapsed_ms, + ROUND(AVG(unique_vessels)) AS avg_vessels, + ROUND(AVG(total_points)) AS avg_points, + SUM(CASE WHEN data_path = 'CACHE' THEN 1 ELSE 0 END) AS cache_only, + SUM(CASE WHEN data_path = 'HYBRID' THEN 1 ELSE 0 END) AS hybrid, + SUM(CASE WHEN data_path = 'DB' THEN 1 ELSE 0 END) AS db_only, + SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) AS completed, + SUM(CASE WHEN status = 'CANCELLED' THEN 1 ELSE 0 END) AS cancelled, + SUM(CASE WHEN status = 'ERROR' THEN 1 ELSE 0 END) AS errors, + SUM(CASE WHEN status = 'TIMEOUT' THEN 1 ELSE 0 END) AS timeouts + FROM signal.t_query_metrics + WHERE created_at >= now() - INTERVAL '%d days' + """.formatted(days)); + stats.put("summary", summary); + + // 일별 추이 + List> daily = queryJdbcTemplate.queryForList(""" + SELECT + DATE(created_at) AS date, + COUNT(*) AS query_count, + ROUND(AVG(elapsed_ms)) AS avg_elapsed_ms, + ROUND(AVG(unique_vessels)) AS avg_vessels, + SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) AS completed, + SUM(CASE WHEN status != 'COMPLETED' THEN 1 ELSE 0 END) AS failed + FROM signal.t_query_metrics + WHERE created_at >= now() - INTERVAL '%d days' + GROUP BY DATE(created_at) + ORDER BY date DESC + """.formatted(days)); + stats.put("dailyTrend", daily); + + // 느린 쿼리 TOP 10 + List> slowQueries = queryJdbcTemplate.queryForList(""" + SELECT query_id, created_at, elapsed_ms, unique_vessels, total_points, + data_path, db_conn_total, zoom_level, status + FROM signal.t_query_metrics + WHERE created_at >= now() - INTERVAL '%d days' + ORDER BY elapsed_ms DESC + LIMIT 10 + """.formatted(days)); + stats.put("slowQueries", slowQueries); + + return stats; + } + + /** + * 쿼리 메트릭 데이터 클래스 + */ + @Getter + @Builder + public static class QueryMetric { + private final String queryId; + private final String sessionId; + private final String queryType; + private final LocalDateTime startTime; + private final LocalDateTime endTime; + private final Integer zoomLevel; + private final String viewportBounds; + private final int requestedMmsi; + private final String dataPath; + private final int cacheHitDays; + private final int dbQueryDays; + private final int dbConnTotal; + private final int uniqueVessels; + private final int totalTracks; + private final int totalPoints; + private final int pointsAfterSimplify; + private final int totalChunks; + private final long responseBytes; + private final long elapsedMs; + private final long dbQueryMs; + private final long simplifyMs; + private final int backpressureEvents; + private final String status; + } +} diff --git a/src/main/resources/sql/create_query_metrics_table.sql b/src/main/resources/sql/create_query_metrics_table.sql new file mode 100644 index 0000000..3c4b404 --- /dev/null +++ b/src/main/resources/sql/create_query_metrics_table.sql @@ -0,0 +1,42 @@ +-- 쿼리 실행 메트릭 테이블 +-- WebSocket/REST 쿼리의 성능 지표를 기록하여 ApiMetrics 페이지에서 조회 +CREATE TABLE IF NOT EXISTS signal.t_query_metrics ( + id BIGSERIAL PRIMARY KEY, + query_id VARCHAR(64) NOT NULL, + session_id VARCHAR(64), + query_type VARCHAR(20) NOT NULL, -- 'WEBSOCKET' | 'REST_V1' | 'REST_V2' + created_at TIMESTAMP NOT NULL DEFAULT now(), + + -- 요청 파라미터 + start_time TIMESTAMP, + end_time TIMESTAMP, + zoom_level INTEGER, + viewport_bounds VARCHAR(200), -- "minLon,minLat,maxLon,maxLat" + requested_mmsi INTEGER DEFAULT 0, + + -- 처리 경로 + data_path VARCHAR(10), -- 'CACHE' | 'DB' | 'HYBRID' + cache_hit_days INTEGER DEFAULT 0, + db_query_days INTEGER DEFAULT 0, + db_conn_total INTEGER DEFAULT 0, + + -- 결과 통계 + unique_vessels INTEGER DEFAULT 0, + total_tracks INTEGER DEFAULT 0, + total_points INTEGER DEFAULT 0, + points_after_simplify INTEGER DEFAULT 0, + total_chunks INTEGER DEFAULT 0, + response_bytes BIGINT DEFAULT 0, + + -- 성능 + elapsed_ms BIGINT DEFAULT 0, + db_query_ms BIGINT DEFAULT 0, + simplify_ms BIGINT DEFAULT 0, + backpressure_events INTEGER DEFAULT 0, + + -- 결과 상태 + status VARCHAR(20) DEFAULT 'COMPLETED' -- 'COMPLETED' | 'CANCELLED' | 'ERROR' | 'TIMEOUT' +); + +CREATE INDEX IF NOT EXISTS idx_query_metrics_created ON signal.t_query_metrics(created_at); +CREATE INDEX IF NOT EXISTS idx_query_metrics_type ON signal.t_query_metrics(query_type, created_at);