diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java index 613342b..0d7c179 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java @@ -1,7 +1,11 @@ package gc.mda.signal_batch.global.websocket.service; +import gc.mda.signal_batch.batch.reader.FiveMinTrackCache; +import gc.mda.signal_batch.batch.reader.HourlyTrackCache; +import gc.mda.signal_batch.domain.vessel.model.VesselTrack; import gc.mda.signal_batch.global.exception.MemoryBudgetExceededException; import gc.mda.signal_batch.global.util.TrackMemoryEstimator; +import gc.mda.signal_batch.global.util.VesselTrackToCompactConverter; import gc.mda.signal_batch.global.websocket.dto.TrackChunkResponse; import gc.mda.signal_batch.global.websocket.interceptor.TrackQueryInterceptor; import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack; @@ -55,6 +59,9 @@ public class ChunkedTrackStreamingService { private final CacheTrackSimplifier cacheTrackSimplifier; private final TrackMemoryBudgetManager memoryBudgetManager; private final QueryMetricsBufferService queryMetricsBufferService; + private final HourlyTrackCache hourlyTrackCache; + private final FiveMinTrackCache fiveMinTrackCache; + private final VesselTrackToCompactConverter vesselTrackToCompactConverter; private static final ThreadLocal wktReaderLocal = ThreadLocal.withInitial(WKTReader::new); private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private static final int MAX_TRACKS_PER_CHUNK = 500000; // 청크당 최대 트랙 수 (10만 선박 지원) @@ -105,7 +112,10 @@ public class ChunkedTrackStreamingService { DailyTrackCacheManager dailyTrackCacheManager, CacheTrackSimplifier cacheTrackSimplifier, TrackMemoryBudgetManager memoryBudgetManager, - QueryMetricsBufferService queryMetricsBufferService) { + QueryMetricsBufferService queryMetricsBufferService, + HourlyTrackCache hourlyTrackCache, + FiveMinTrackCache fiveMinTrackCache, + VesselTrackToCompactConverter vesselTrackToCompactConverter) { this.queryJdbcTemplate = queryJdbcTemplate; this.queryDataSource = queryDataSource; this.activeQueryManager = activeQueryManager; @@ -114,6 +124,9 @@ public class ChunkedTrackStreamingService { this.cacheTrackSimplifier = cacheTrackSimplifier; this.memoryBudgetManager = memoryBudgetManager; this.queryMetricsBufferService = queryMetricsBufferService; + this.hourlyTrackCache = hourlyTrackCache; + this.fiveMinTrackCache = fiveMinTrackCache; + this.vesselTrackToCompactConverter = vesselTrackToCompactConverter; } /** @@ -153,6 +166,8 @@ public class ChunkedTrackStreamingService { private static class QueryBenchmark { int cacheHitDays = 0; int dbQueryDays = 0; + int cacheHourlyRanges = 0; // L2 캐시 그룹 수 + int cacheFiveMinRanges = 0; // L1 캐시 그룹 수 int totalTracks = 0; int totalPointsBefore = 0; int totalPointsAfter = 0; @@ -164,13 +179,15 @@ public class ChunkedTrackStreamingService { int connViewportPass1 = 0; // collectViewportVesselIds DB 쿼리 int connDailyPages = 0; // streamDailyTableWithPagination 페이지 int connVesselInfo = 0; // preloadVesselInfoWithSessionCache 배치 - int connHourly5min = 0; // processTableRange (5min/hourly) + int connHourly5min = 0; // processTableRange (5min/hourly) — 기존 DB 경로 잔류용 int connTableCheck = 0; // hasDataInTable 존재 검증 Integer zoomLevel; String determinePath() { - if (cacheHitDays > 0 && dbQueryDays > 0) return "HYBRID"; - if (cacheHitDays > 0) return "CACHE"; + boolean anyCache = cacheHitDays > 0 || cacheHourlyRanges > 0 || cacheFiveMinRanges > 0; + boolean anyDb = dbQueryDays > 0; + if (anyCache && anyDb) return "HYBRID"; + if (anyCache) return "CACHE"; return "DB"; } @@ -182,6 +199,7 @@ public class ChunkedTrackStreamingService { return String.format( "{\"queryId\":\"%s\",\"timestamp\":\"%s\",\"path\":\"%s\"," + "\"zoomLevel\":%s,\"dateRanges\":%d,\"cacheHitDays\":%d,\"dbQueryDays\":%d," + + "\"cacheHourlyRanges\":%d,\"cacheFiveMinRanges\":%d," + "\"totalTracks\":%d,\"totalPointsBefore\":%d,\"totalPointsAfter\":%d," + "\"pointReductionPct\":%d,\"totalBatches\":%d,\"batchesBeforeSimplify\":%d," + "\"simplifyTimeMs\":%d,\"dbQueryTimeMs\":%d,\"totalElapsedMs\":%d," + @@ -192,8 +210,9 @@ public class ChunkedTrackStreamingService { LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), determinePath(), zoomLevel != null ? zoomLevel.toString() : "null", - cacheHitDays + dbQueryDays, + cacheHitDays + dbQueryDays + cacheHourlyRanges + cacheFiveMinRanges, cacheHitDays, dbQueryDays, + cacheHourlyRanges, cacheFiveMinRanges, totalTracks, totalPointsBefore, totalPointsAfter, totalPointsBefore > 0 ? Math.round((1 - (double) totalPointsAfter / totalPointsBefore) * 100) : 0, totalBatches, batchesBeforeSimplify, @@ -238,12 +257,12 @@ public class ChunkedTrackStreamingService { // DB에서 조회 (signal_kind_code는 캐시 저장 시 치환된 값) try { - String sql = "SELECT ship_nm, vessel_type, signal_kind_code FROM signal.t_ais_position " + + String sql = "SELECT name, vessel_type, signal_kind_code FROM signal.t_ais_position " + "WHERE mmsi = ? LIMIT 1"; VesselInfo info = queryJdbcTemplate.queryForObject(sql, (rs, rowNum) -> new VesselInfo( - rs.getString("ship_nm"), + rs.getString("name"), rs.getString("vessel_type"), rs.getString("signal_kind_code") ), @@ -320,7 +339,7 @@ public class ChunkedTrackStreamingService { .map(id -> "?") .collect(Collectors.joining(",")); - String sql = "SELECT mmsi, ship_nm, vessel_type, signal_kind_code " + + String sql = "SELECT mmsi, name, vessel_type, signal_kind_code " + "FROM signal.t_ais_position " + "WHERE mmsi IN (" + placeholders + ")"; @@ -329,7 +348,7 @@ public class ChunkedTrackStreamingService { queryJdbcTemplate.query(sql, rs -> { String visselId = rs.getString("mmsi"); VesselInfo info = new VesselInfo( - rs.getString("ship_nm"), + rs.getString("name"), rs.getString("vessel_type"), rs.getString("signal_kind_code") ); @@ -389,7 +408,7 @@ public class ChunkedTrackStreamingService { .map(id -> "?") .collect(Collectors.joining(",")); - String sql = "SELECT mmsi, ship_nm, vessel_type, signal_kind_code " + + String sql = "SELECT mmsi, name, vessel_type, signal_kind_code " + "FROM signal.t_ais_position " + "WHERE mmsi IN (" + placeholders + ")"; @@ -398,7 +417,7 @@ public class ChunkedTrackStreamingService { queryJdbcTemplate.query(sql, rs -> { String vesselId = rs.getString("mmsi"); VesselInfo info = new VesselInfo( - rs.getString("ship_nm"), + rs.getString("name"), rs.getString("vessel_type"), rs.getString("signal_kind_code") ); @@ -447,6 +466,12 @@ public class ChunkedTrackStreamingService { for (Map.Entry> entry : strategyMap.entrySet()) { TableStrategy strategy = entry.getKey(); + + // HOURLY/FIVE_MINUTE: 캐시에서 직접 뷰포트 필터 수행 → Pass 1 DB 쿼리 불필요 + if (strategy == TableStrategy.HOURLY || strategy == TableStrategy.FIVE_MINUTE) { + continue; + } + String tableName = strategy.getTableName(); for (TimeRange range : entry.getValue()) { @@ -725,7 +750,8 @@ public class ChunkedTrackStreamingService { String sessionId, Consumer chunkConsumer, Consumer statusConsumer, - String clientIp) { + String clientIp, + String clientId) { boolean slotAcquired = false; QueryBenchmark benchmark = null; QueryContext ctx = null; @@ -867,193 +893,11 @@ public class ChunkedTrackStreamingService { globalChunkIndex, uniqueVesselIds, viewportVesselIds, benchmark, ctx); globalChunkIndex = ctx.getCurrentChunkIndex(); } else { - // Hourly/5min은 6시간 단위로 그룹화하여 처리 - Map> timeGroups = groupRangesByTimeWindow(ranges, 6); - - for (Map.Entry> groupEntry : timeGroups.entrySet()) { - String groupKey = groupEntry.getKey(); - List groupRanges = groupEntry.getValue(); - log.info("[{}] Processing time window {} with {} ranges", strategy, groupKey, groupRanges.size()); - - // 시간 그룹 데이터를 병합 - Map mergedMap = new HashMap<>(20000); - LocalDateTime baseTime = null; - - for (TimeRange range : groupRanges) { - try { - // 첫 범위의 시작 시간을 기준으로 설정 - if (baseTime == null) { - baseTime = range.getStart(); - } - - List compactTracks = processTableRangeWithBaseTime( - request, strategy, range, baseTime, viewportVesselIds, queryId, ctx); - if (benchmark != null) benchmark.connHourly5min++; // [BENCHMARK] - - // 선박별로 볕합 - for (CompactVesselTrack track : compactTracks) { - String vesselId = track.getVesselId(); - VesselAccumulator accumulator = mergedMap.get(vesselId); - - if (accumulator == null) { - accumulator = new VesselAccumulator(); - accumulator.mmsi = track.getVesselId(); - - VesselInfo vesselInfo = getVesselInfo(track.getVesselId()); - accumulator.shipName = vesselInfo.shipName; - accumulator.shipType = vesselInfo.shipType; - accumulator.shipKindCode = vesselInfo.signalKindCode; - - mergedMap.put(vesselId, accumulator); - } - - // 데이터 병합 - accumulator.geometry.addAll(track.getGeometry()); - accumulator.timestamps.addAll(track.getTimestamps()); - accumulator.speeds.addAll(track.getSpeeds()); - accumulator.totalDistance += track.getTotalDistance(); - accumulator.maxSpeed = Math.max(accumulator.maxSpeed, track.getMaxSpeed()); - accumulator.pointCount += track.getPointCount(); - } - } catch (Exception e) { - log.error("Error processing {} range {}: {}", strategy, range, e.getMessage()); - } - } - - // 병합된 데이터를 청크로 분할하여 전송 - if (!mergedMap.isEmpty()) { - List mergedTracks = mergedMap.entrySet().stream() - .map(entry -> { - String vesselId = entry.getKey(); - VesselAccumulator acc = entry.getValue(); - - double avgSpeed = calculateAvgSpeed(acc.totalDistance, acc.timestamps); - - return CompactVesselTrack.builder() - .vesselId(vesselId) - .nationalCode(acc.mmsi != null && acc.mmsi.length() >= 3 ? acc.mmsi.substring(0, 3) : null) - .shipName(acc.shipName) - .shipType(acc.shipType) - .shipKindCode(acc.shipKindCode) - .geometry(acc.geometry) - .timestamps(acc.timestamps) - .speeds(acc.speeds) - .totalDistance(acc.totalDistance) - .avgSpeed(avgSpeed) - .maxSpeed(acc.maxSpeed) - .pointCount(acc.pointCount) - .build(); - }) - .collect(Collectors.toList()); - - // 전체 포인트 통계 계산 - int totalOriginalPoints = mergedTracks.stream() - .mapToInt(t -> t.getPointCount()) - .sum(); - - log.info("[{}] Time window {} - Merged {} vessels, Total {} points", - strategy, groupKey, mergedTracks.size(), totalOriginalPoints); - - List> batches = splitByMessageSize(mergedTracks, queryId); - - for (List batch : batches) { - TrackChunkResponse response = new TrackChunkResponse(); - response.setQueryId(queryId); - response.setChunkIndex(globalChunkIndex++); - response.setIsLastChunk(false); - response.setTotalChunks(-1); // 마짉 청크에서 설정 - response.setCompactTracks(batch); - // 처리된 시간 추가 - String timeKey = groupKey + "_" + strategy; - ctx.processedTimeRanges.put(timeKey, groupRanges.stream() - .mapToInt(r -> (int)Duration.between(r.getStart(), r.getEnd()).toMinutes()) - .sum()); - int currentProcessedMin = ctx.processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum(); - response.setStats(createChunkStats(batch, uniqueVesselIds, currentProcessedMin, ctx)); - - // 버퍼 크기 계산 및 추가 - int chunkSize = batch.stream().mapToInt(t -> estimateTrackSize(t)).sum(); - long currentBufferSize = pendingBufferSize.addAndGet(chunkSize); - metrics.totalBytes.addAndGet(chunkSize); - metrics.chunkCount.incrementAndGet(); - - // 버퍼 사용률 로그 - double bufferUsage = (double) currentBufferSize / MAX_PENDING_BUFFER * 100; - if (bufferUsage > 80 && System.currentTimeMillis() - metrics.lastWarningTime > 5000) { - metrics.bufferWarnings.incrementAndGet(); - metrics.lastWarningTime = System.currentTimeMillis(); - log.warn("[BACKPRESSURE] Query {} - Buffer usage high: {}% ({} MB / {} MB)", - queryId, String.format("%.1f", bufferUsage), currentBufferSize / (1024 * 1024), - MAX_PENDING_BUFFER / (1024 * 1024)); - } - - // 버퍼가 가듍 찬 경우 대기 및 동적 청크 크기 조절 - int backpressureWaitCount = 0; - while (pendingBufferSize.get() > MAX_PENDING_BUFFER) { - if (backpressureWaitCount == 0) { - metrics.backpressureEvents.incrementAndGet(); - log.warn("[BACKPRESSURE] Query {} - Buffer full! Waiting for buffer to drain. Current: {} MB", - queryId, pendingBufferSize.get() / (1024 * 1024)); - } - Thread.sleep(50); - backpressureWaitCount++; - - // 500ms 이상 대기 시 청크 크기 감소 - if (backpressureWaitCount > 10 && metrics.dynamicChunkSizeKB > MIN_MESSAGE_SIZE_KB) { - int oldSize = metrics.dynamicChunkSizeKB; - metrics.dynamicChunkSizeKB = Math.max(MIN_MESSAGE_SIZE_KB, - metrics.dynamicChunkSizeKB - 512); - log.info("[BACKPRESSURE] Query {} - Reducing chunk size: {} KB -> {} KB", - queryId, oldSize, metrics.dynamicChunkSizeKB); - } - } - - if (backpressureWaitCount > 0) { - log.info("[BACKPRESSURE] Query {} - Buffer drained after {} ms wait", - queryId, backpressureWaitCount * 50); - } - - // 전송 및 버퍼 추적 (전송 완료 후 즉시 감소) - try { - chunkConsumer.accept(response); - } finally { - pendingBufferSize.addAndGet(-chunkSize); - } - - // 유니크 선박 카운트 - batch.forEach(track -> uniqueVesselIds.add(track.getVesselId())); - - // 진행률 업데이트 (시간 기반만 사용) - double timeProgress = (double) currentProcessedMin / ctx.estimatedTotalMinutes * 100; - - statusConsumer.accept(new QueryStatusUpdate( - queryId, - "PROCESSING", - "Processing chunk " + globalChunkIndex, - Math.min(99.0, timeProgress) - )); - - // 버퍼 사용률에 따른 적응형 대기 - double currentBufferUsage = (double) pendingBufferSize.get() / MAX_PENDING_BUFFER; - int waitTime; - if (currentBufferUsage > 0.8) waitTime = 200; // 80%↑: 강한 억제 - else if (currentBufferUsage > 0.5) waitTime = 100; // 50%↑: 중간 억제 - else if (currentBufferUsage > 0.3) waitTime = 50; // 30%↑: 약한 억제 - else waitTime = 10; // 정상: 최소 지연 - Thread.sleep(waitTime); - - // 진행 상황 로그 (매 10번째 청크마다) - if (globalChunkIndex % 10 == 0) { - log.info("Progress: chunk {}, vessels: {}, time progress: {}%", - globalChunkIndex, uniqueVesselIds.size(), - Math.round(timeProgress)); - } - } - batches.clear(); // 메모리 즉시 해제: 배치 분할 리스트 - mergedTracks.clear(); // 메모리 즉시 해제: 병합 항적 리스트 - } - mergedMap.clear(); // 메모리 즉시 해제: 선박 누적 맵 - } + // ★ HOURLY/FIVE_MINUTE: L1/L2 캐시에서 직접 처리 + processHourlyFiveMinWithCache(ranges, request, queryId, + chunkConsumer, statusConsumer, uniqueVesselIds, + strategy, benchmark, ctx, metrics); + globalChunkIndex = ctx.getCurrentChunkIndex(); } } @@ -1145,7 +989,7 @@ public class ChunkedTrackStreamingService { .viewportBounds(vpBounds) .requestedMmsi(request.getVesselIds() != null ? request.getVesselIds().size() : 0) .dataPath(benchmark.determinePath()) - .cacheHitDays(benchmark.cacheHitDays) + .cacheHitDays(benchmark.cacheHitDays + benchmark.cacheHourlyRanges + benchmark.cacheFiveMinRanges) .dbQueryDays(benchmark.dbQueryDays) .dbConnTotal(benchmark.dbConnectionTotal()) .uniqueVessels(uniqueVesselIds.size()) @@ -1160,6 +1004,7 @@ public class ChunkedTrackStreamingService { .backpressureEvents(bpMetrics != null ? bpMetrics.backpressureEvents.get() : 0) .status(queryStatus) .clientIp(clientIp) + .clientId(clientId) .build()); } @@ -2576,4 +2421,377 @@ public class ChunkedTrackStreamingService { } } + // ========== L1/L2 캐시 기반 HOURLY/FIVE_MINUTE 처리 ========== + + /** + * HOURLY/FIVE_MINUTE 범위를 L1/L2 캐시에서 직접 처리. + * currentHourStart 기준: >= currentHour → L1(5min), < currentHour → L2(hourly) + * DB fallback 없음 — 캐시에 없으면 데이터 자체가 없음 (배치 Job이 DB+캐시 동시 적재) + */ + private void processHourlyFiveMinWithCache( + List ranges, TrackQueryRequest request, String queryId, + Consumer chunkConsumer, + Consumer statusConsumer, + Set uniqueVesselIds, + TableStrategy originalStrategy, QueryBenchmark benchmark, + QueryContext ctx, BackpressureMetrics bpMetrics) throws Exception { + + LocalDateTime currentHourStart = LocalDateTime.now().withMinute(0).withSecond(0).withNano(0); + + Map> timeGroups = groupRangesByTimeWindow(ranges, 6); + + for (Map.Entry> groupEntry : timeGroups.entrySet()) { + String groupKey = groupEntry.getKey(); + List groupRanges = groupEntry.getValue(); + + if (isQueryCancelled(queryId, null)) return; + + log.info("[CACHE-{}] Processing time window {} with {} ranges", originalStrategy, groupKey, groupRanges.size()); + + LocalDateTime groupStart = groupRanges.get(0).getStart(); + LocalDateTime groupEnd = groupRanges.get(groupRanges.size() - 1).getEnd(); + + // L1/L2 범위 분리: currentHourStart 기준 + Map> l2Result = Collections.emptyMap(); + Map> l1Result = Collections.emptyMap(); + + // L2 범위: groupStart ~ min(groupEnd, currentHourStart) + if (groupStart.isBefore(currentHourStart)) { + LocalDateTime l2End = groupEnd.isBefore(currentHourStart) ? groupEnd : currentHourStart; + LocalDateTime l2StartExpanded = groupStart.withMinute(0).withSecond(0).withNano(0); + l2Result = hourlyTrackCache.getTracksInRange(l2StartExpanded, l2End); + log.info("[CACHE-L2] Range [{}, {}): {} vessels", l2StartExpanded, l2End, l2Result.size()); + } + + // L1 범위: max(groupStart, currentHourStart) ~ groupEnd + if (groupEnd.isAfter(currentHourStart)) { + LocalDateTime l1Start = groupStart.isAfter(currentHourStart) ? groupStart : currentHourStart; + l1Result = fiveMinTrackCache.getTracksInRange(l1Start, groupEnd); + log.info("[CACHE-L1] Range [{}, {}): {} vessels", l1Start, groupEnd, l1Result.size()); + } + + // L1 + L2 결과 merge (MMSI 기준) + Map> merged = new LinkedHashMap<>(l2Result); + for (Map.Entry> l1Entry : l1Result.entrySet()) { + merged.merge(l1Entry.getKey(), l1Entry.getValue(), (existing, newTracks) -> { + List combined = new ArrayList<>(existing); + combined.addAll(newTracks); + combined.sort(Comparator.comparing(VesselTrack::getTimeBucket)); + return combined; + }); + } + + if (merged.isEmpty()) { + log.info("[CACHE-{}] No data in cache for window {}", originalStrategy, groupKey); + continue; + } + + // 뷰포트 필터 + Map> viewportFiltered = filterByViewport(merged, request.getViewport()); + + // VesselTrack → CompactVesselTrack 변환 + List compactTracks = vesselTrackToCompactConverter.convert(viewportFiltered); + + // 간소화 적용 + applySimplification(compactTracks, request, originalStrategy, ctx); + + // VesselAccumulator에 병합 + Map mergedMap = new HashMap<>(compactTracks.size()); + mergeTracks(compactTracks, mergedMap); + + if (!mergedMap.isEmpty()) { + List mergedTracks = mergedMap.entrySet().stream() + .map(entry -> { + VesselAccumulator acc = entry.getValue(); + String nationalCode = acc.mmsi != null && acc.mmsi.length() >= 3 + ? acc.mmsi.substring(0, 3) : null; + double avgSpeed = calculateAvgSpeed(acc.totalDistance, acc.timestamps); + return CompactVesselTrack.builder() + .vesselId(acc.mmsi) + .nationalCode(nationalCode) + .shipName(acc.shipName) + .shipType(acc.shipType) + .shipKindCode(acc.shipKindCode) + .geometry(acc.geometry) + .timestamps(acc.timestamps) + .speeds(acc.speeds) + .totalDistance(acc.totalDistance) + .avgSpeed(avgSpeed) + .maxSpeed(acc.maxSpeed) + .pointCount(acc.pointCount) + .build(); + }) + .collect(Collectors.toList()); + + uniqueVesselIds.addAll(mergedMap.keySet()); + + int totalPoints = mergedTracks.stream().mapToInt(CompactVesselTrack::getPointCount).sum(); + log.info("[CACHE-{}] Time window {} - {} vessels, {} points", + originalStrategy, groupKey, mergedTracks.size(), totalPoints); + + if (benchmark != null) { + benchmark.totalTracks += mergedTracks.size(); + benchmark.totalPointsBefore += totalPoints; + benchmark.totalPointsAfter += totalPoints; + } + + // 청크 분할 및 전송 (백프레셔 포함) + List> batches = splitByMessageSize(mergedTracks, queryId); + + for (List batch : batches) { + if (isQueryCancelled(queryId, null)) return; + + TrackChunkResponse response = new TrackChunkResponse(); + response.setQueryId(queryId); + response.setChunkIndex(ctx.currentGlobalChunkIndex++); + response.setIsLastChunk(false); + response.setTotalChunks(-1); + response.setCompactTracks(batch); + + String timeKey = groupKey + "_" + originalStrategy; + ctx.processedTimeRanges.put(timeKey, groupRanges.stream() + .mapToInt(r -> (int) Duration.between(r.getStart(), r.getEnd()).toMinutes()) + .sum()); + int currentProcessedMin = ctx.processedTimeRanges.values().stream() + .mapToInt(Integer::intValue).sum(); + response.setStats(createChunkStats(batch, uniqueVesselIds, currentProcessedMin, ctx)); + + int chunkSize = batch.stream().mapToInt(this::estimateTrackSize).sum(); + long currentBufferSize = pendingBufferSize.addAndGet(chunkSize); + if (bpMetrics != null) { + bpMetrics.totalBytes.addAndGet(chunkSize); + bpMetrics.chunkCount.incrementAndGet(); + } + + // 백프레셔 대기 + int bpWait = 0; + while (pendingBufferSize.get() > MAX_PENDING_BUFFER) { + Thread.sleep(50); + bpWait++; + if (bpWait > 10 && bpMetrics != null && bpMetrics.dynamicChunkSizeKB > MIN_MESSAGE_SIZE_KB) { + bpMetrics.dynamicChunkSizeKB = Math.max(MIN_MESSAGE_SIZE_KB, + bpMetrics.dynamicChunkSizeKB - 512); + } + } + + try { + chunkConsumer.accept(response); + } finally { + pendingBufferSize.addAndGet(-chunkSize); + } + + double timeProgress = ctx.estimatedTotalMinutes > 0 + ? (double) currentProcessedMin / ctx.estimatedTotalMinutes * 100 : 0; + statusConsumer.accept(new QueryStatusUpdate( + queryId, "PROCESSING", + "Processing chunk " + ctx.currentGlobalChunkIndex, + Math.min(99.0, timeProgress))); + + if (benchmark != null) benchmark.totalBatches++; + } + batches.clear(); + mergedTracks.clear(); + } + mergedMap.clear(); + + // benchmark 갱신 + if (benchmark != null) { + if (originalStrategy == TableStrategy.HOURLY) { + benchmark.cacheHourlyRanges += groupRanges.size(); + } else { + benchmark.cacheFiveMinRanges += groupRanges.size(); + } + } + } + } + + /** + * 캐시 데이터에서 뷰포트 교차 선박 필터링 — JTS 파싱 없이 WKT 좌표 직접 파싱 + */ + private Map> filterByViewport( + Map> tracksByMmsi, ViewportFilter viewport) { + if (viewport == null || !viewport.isValid() || tracksByMmsi.isEmpty()) { + return tracksByMmsi; + } + + double minLon = viewport.getMinLon(), minLat = viewport.getMinLat(); + double maxLon = viewport.getMaxLon(), maxLat = viewport.getMaxLat(); + + Map> filtered = new LinkedHashMap<>(); + + for (Map.Entry> entry : tracksByMmsi.entrySet()) { + boolean inViewport = false; + for (VesselTrack track : entry.getValue()) { + if (isTrackInViewport(track.getTrackGeom(), minLon, minLat, maxLon, maxLat)) { + inViewport = true; + break; + } + } + if (inViewport) { + filtered.put(entry.getKey(), entry.getValue()); + } + } + + return filtered; + } + + /** + * WKT LineStringM 좌표 경량 파싱으로 뷰포트 bbox 교차 확인 + */ + private boolean isTrackInViewport(String wkt, double minLon, double minLat, double maxLon, double maxLat) { + if (wkt == null || wkt.isEmpty()) return false; + + int openParen = wkt.indexOf('('); + int closeParen = wkt.lastIndexOf(')'); + if (openParen < 0 || closeParen <= openParen + 1) return false; + + String coords = wkt.substring(openParen + 1, closeParen); + for (String point : coords.split(",")) { + String trimmed = point.trim(); + int firstSpace = trimmed.indexOf(' '); + if (firstSpace <= 0) continue; + int secondSpace = trimmed.indexOf(' ', firstSpace + 1); + + try { + double lon = Double.parseDouble(trimmed.substring(0, firstSpace)); + double lat = Double.parseDouble(trimmed.substring(firstSpace + 1, + secondSpace > 0 ? secondSpace : trimmed.length())); + if (lon >= minLon && lon <= maxLon && lat >= minLat && lat <= maxLat) { + return true; + } + } catch (NumberFormatException e) { + // skip malformed coordinate + } + } + return false; + } + + /** + * CompactVesselTrack 리스트를 VesselAccumulator 맵에 병합 + */ + private void mergeTracks(List tracks, Map mergedMap) { + for (CompactVesselTrack track : tracks) { + String vesselId = track.getVesselId(); + VesselAccumulator accumulator = mergedMap.get(vesselId); + + if (accumulator == null) { + accumulator = new VesselAccumulator(); + accumulator.mmsi = vesselId; + accumulator.shipName = track.getShipName(); + accumulator.shipType = track.getShipType(); + accumulator.shipKindCode = track.getShipKindCode(); + mergedMap.put(vesselId, accumulator); + } + + accumulator.geometry.addAll(track.getGeometry()); + accumulator.timestamps.addAll(track.getTimestamps()); + accumulator.speeds.addAll(track.getSpeeds()); + accumulator.totalDistance += track.getTotalDistance(); + accumulator.maxSpeed = Math.max(accumulator.maxSpeed, track.getMaxSpeed()); + accumulator.pointCount += track.getPointCount(); + } + } + + /** + * CompactVesselTrack 리스트에 전략/줌 레벨별 간소화 적용 + */ + private void applySimplification(List tracks, TrackQueryRequest request, + TableStrategy strategy, QueryContext ctx) { + if (tracks == null || tracks.isEmpty()) return; + + for (CompactVesselTrack track : tracks) { + if (track.getGeometry() == null || track.getGeometry().size() <= 1) continue; + + int originalSize = track.getGeometry().size(); + List simplifiedGeometry = new ArrayList<>(); + List simplifiedTimestamps = new ArrayList<>(); + List simplifiedSpeeds = new ArrayList<>(); + double[] prevPoint = null; + LocalDateTime prevTime = null; + + for (int i = 0; i < track.getGeometry().size(); i++) { + double[] point = track.getGeometry().get(i); + LocalDateTime currentTime = null; + + String tsStr = track.getTimestamps().get(i); + try { + if (tsStr.matches("\\d{10,}")) { + currentTime = LocalDateTime.ofInstant( + java.time.Instant.ofEpochSecond(Long.parseLong(tsStr)), + java.time.ZoneId.systemDefault()); + } else { + currentTime = LocalDateTime.parse(tsStr, TIMESTAMP_FORMATTER); + } + } catch (Exception e) { + currentTime = LocalDateTime.now(); + } + + boolean include = false; + + if (i == 0 || i == track.getGeometry().size() - 1) { + include = true; + } else if (prevPoint != null && prevTime != null) { + double distance = calculateDistance(prevPoint[1], prevPoint[0], point[1], point[0]); + long minutesSincePrev = ChronoUnit.MINUTES.between(prevTime, currentTime); + + if (strategy == TableStrategy.DAILY) { + include = distance > 0.05 || minutesSincePrev >= 60; + } else if (strategy == TableStrategy.HOURLY) { + include = distance > 1.08 || minutesSincePrev >= 60; + } else { + include = distance > 0.54 || minutesSincePrev >= 30; + } + + if (track.getAvgSpeed() > 0 && track.getAvgSpeed() < 5.0) { + include = distance > 1.5 || minutesSincePrev >= 45; + } + } + + if (include) { + simplifiedGeometry.add(point); + simplifiedTimestamps.add(tsStr); + simplifiedSpeeds.add(track.getSpeeds().size() > i ? track.getSpeeds().get(i) : 0.0); + prevPoint = point; + prevTime = currentTime; + } + } + + // 줌 레벨에 따른 추가 샘플링 + if (request.getZoomLevel() != null && request.getZoomLevel() < 10 && simplifiedGeometry.size() > 2) { + int sampleRate = request.getZoomLevel() < 6 ? 10 : + request.getZoomLevel() < 8 ? 5 : 2; + + List sampledGeometry = new ArrayList<>(); + List sampledTimestamps = new ArrayList<>(); + List sampledSpeeds = new ArrayList<>(); + + for (int i = 0; i < simplifiedGeometry.size(); i++) { + if (i % sampleRate == 0 || i == simplifiedGeometry.size() - 1) { + sampledGeometry.add(simplifiedGeometry.get(i)); + sampledTimestamps.add(simplifiedTimestamps.get(i)); + sampledSpeeds.add(simplifiedSpeeds.get(i)); + } + } + + track.setGeometry(sampledGeometry); + track.setTimestamps(sampledTimestamps); + track.setSpeeds(sampledSpeeds); + track.setPointCount(sampledGeometry.size()); + } else { + track.setGeometry(simplifiedGeometry); + track.setTimestamps(simplifiedTimestamps); + track.setSpeeds(simplifiedSpeeds); + track.setPointCount(simplifiedGeometry.size()); + } + + if (ctx != null && ctx.vesselLogCount < 10 && originalSize > track.getPointCount()) { + double reductionRate = (1 - (double) track.getPointCount() / originalSize) * 100; + log.info("[CACHE-SIMPLIFY] Vessel {} simplified: {} -> {} points ({}% reduced, zoom: {})", + track.getVesselId(), originalSize, track.getPointCount(), + Math.round(reductionRate), request.getZoomLevel()); + ctx.vesselLogCount++; + } + } + } + } \ No newline at end of file