diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java index 58b99da..909ce56 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java @@ -422,10 +422,31 @@ public class ChunkedTrackStreamingService { long startMs = System.currentTimeMillis(); Set vesselIds = new HashSet<>(); + int cacheHitDays = 0; + int dbQueryDays = 0; for (Map.Entry> entry : strategyMap.entrySet()) { - String tableName = entry.getKey().getTableName(); + TableStrategy strategy = entry.getKey(); + String tableName = strategy.getTableName(); + for (TimeRange range : entry.getValue()) { + // Daily 전략이고 캐시에 있으면 → 메모리에서 뷰포트 체크 (DB 커넥션 절약) + if (strategy == TableStrategy.DAILY && dailyTrackCacheManager.isEnabled()) { + LocalDate rangeDate = range.getStart().toLocalDate(); + if (dailyTrackCacheManager.isCached(rangeDate)) { + List cachedTracks = dailyTrackCacheManager.getCachedTracks( + rangeDate, viewport.getMinLon(), viewport.getMinLat(), + viewport.getMaxLon(), viewport.getMaxLat()); + for (CompactVesselTrack track : cachedTracks) { + vesselIds.add(track.getVesselId()); + } + cacheHitDays++; + continue; + } + } + + // DB 쿼리로 뷰포트 교차 선박 수집 + dbQueryDays++; StringBuilder sql = new StringBuilder(); sql.append("SELECT DISTINCT sig_src_cd, target_id FROM ").append(tableName); sql.append(" WHERE time_bucket >= ? AND time_bucket < ?"); @@ -459,8 +480,8 @@ public class ChunkedTrackStreamingService { } long elapsed = System.currentTimeMillis() - startMs; - log.info("2-pass viewport filter Pass 1: {} unique vessels found across {} tables in {}ms", - vesselIds.size(), strategyMap.size(), elapsed); + log.info("2-pass viewport filter Pass 1: {} vessels, cacheHit={} days, dbQuery={} days, {}ms", + vesselIds.size(), cacheHitDays, dbQueryDays, elapsed); return vesselIds; } @@ -742,7 +763,14 @@ public class ChunkedTrackStreamingService { // 캐시 히트 시 메모리에서 가져옴 if (dailyTrackCacheManager.isEnabled() && dailyTrackCacheManager.isCached(rangeDate)) { - compactTracks = dailyTrackCacheManager.getCachedTracks(rangeDate); + // viewportVesselIds가 있으면 vessel ID 필터만 (공간 필터 재적용 금지) + if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) { + compactTracks = dailyTrackCacheManager.getCachedTracks(rangeDate).stream() + .filter(t -> viewportVesselIds.contains(t.getVesselId())) + .collect(Collectors.toList()); + } else { + compactTracks = dailyTrackCacheManager.getCachedTracks(rangeDate); + } log.info("Daily cache HIT for {} in processQueryInChunks: {} tracks", rangeDate, compactTracks.size()); } else { compactTracks = processDailyTableWithPagination(request, range, null, viewportVesselIds); @@ -2391,8 +2419,15 @@ public class ChunkedTrackStreamingService { log.info("Daily cache HIT for {}: serving from memory", rangeDate); List cachedTracks; - // 뷰포트 필터링 적용 - if (request.getViewport() != null) { + // viewportVesselIds가 있으면 (2-pass 완료) → vessel ID로만 필터링 + // 공간 필터 재적용 금지: 다른 날짜에서 뷰포트 통과한 선박의 이 날짜 항적도 포함해야 함 + if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) { + cachedTracks = dailyTrackCacheManager.getCachedTracks(rangeDate); + cachedTracks = cachedTracks.stream() + .filter(t -> viewportVesselIds.contains(t.getVesselId())) + .collect(Collectors.toList()); + } else if (request.getViewport() != null) { + // 2-pass 없이 직접 뷰포트 필터 (processQueryInChunks 등) ViewportFilter vp = request.getViewport(); cachedTracks = dailyTrackCacheManager.getCachedTracks( rangeDate, vp.getMinLon(), vp.getMinLat(), vp.getMaxLon(), vp.getMaxLat()); @@ -2401,12 +2436,6 @@ public class ChunkedTrackStreamingService { } if (!cachedTracks.isEmpty()) { - // viewportVesselIds 필터 적용 - if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) { - cachedTracks = cachedTracks.stream() - .filter(t -> viewportVesselIds.contains(t.getVesselId())) - .collect(Collectors.toList()); - } // 메시지 크기로 분할하여 전송 List> batches = splitByMessageSize(cachedTracks); diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java index 5ef2dfd..5ffb9c6 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/StompTrackStreamingService.java @@ -486,6 +486,14 @@ public class StompTrackStreamingService { List cachedTracks = dailyTrackCacheManager.getCachedTracks(rangeDate); + // 거리/속도 필터 적용 (filteredVessels가 있으면) + if (filteredVessels != null && !filteredVessels.isEmpty()) { + final Set fv = filteredVessels; + cachedTracks = cachedTracks.stream() + .filter(t -> fv.contains(t.getVesselId())) + .collect(Collectors.toList()); + } + if (!cachedTracks.isEmpty()) { TrackChunkResponse chunk = new TrackChunkResponse(); chunk.setQueryId(queryId);