fix: 캐시-DB 하이브리드 조회 시 뷰포트 2-pass 필터링 정합성 보장

문제: 캐시 경로에서 뷰포트 공간 필터를 이중 적용하여,
다른 날짜에서 뷰포트를 통과한 선박의 항적이 누락되는 버그

수정 내용:
- collectViewportVesselIds: 캐시된 날짜는 메모리에서 뷰포트 체크 (DB 커넥션 절약)
- processDailyStrategy: viewportVesselIds(2-pass 결과)가 있으면 vessel ID 필터만 적용,
  공간 필터 재적용 금지 → 전체 조회기간 항적 무결성 보장
- processQueryInChunks: 동일 패턴 적용
- StompTrackStreamingService: 캐시 경로에 filteredVessels 필터 적용

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
HeungTak Lee 2026-02-06 15:56:56 +09:00
부모 dc586dde0c
커밋 e9d5d36928
2개의 변경된 파일에 49개의 추가작업 그리고 12개의 삭제

파일 보기

@ -422,10 +422,31 @@ public class ChunkedTrackStreamingService {
long startMs = System.currentTimeMillis(); long startMs = System.currentTimeMillis();
Set<String> vesselIds = new HashSet<>(); Set<String> vesselIds = new HashSet<>();
int cacheHitDays = 0;
int dbQueryDays = 0;
for (Map.Entry<TableStrategy, List<TimeRange>> entry : strategyMap.entrySet()) { for (Map.Entry<TableStrategy, List<TimeRange>> entry : strategyMap.entrySet()) {
String tableName = entry.getKey().getTableName(); TableStrategy strategy = entry.getKey();
String tableName = strategy.getTableName();
for (TimeRange range : entry.getValue()) { for (TimeRange range : entry.getValue()) {
// Daily 전략이고 캐시에 있으면 메모리에서 뷰포트 체크 (DB 커넥션 절약)
if (strategy == TableStrategy.DAILY && dailyTrackCacheManager.isEnabled()) {
LocalDate rangeDate = range.getStart().toLocalDate();
if (dailyTrackCacheManager.isCached(rangeDate)) {
List<CompactVesselTrack> cachedTracks = dailyTrackCacheManager.getCachedTracks(
rangeDate, viewport.getMinLon(), viewport.getMinLat(),
viewport.getMaxLon(), viewport.getMaxLat());
for (CompactVesselTrack track : cachedTracks) {
vesselIds.add(track.getVesselId());
}
cacheHitDays++;
continue;
}
}
// DB 쿼리로 뷰포트 교차 선박 수집
dbQueryDays++;
StringBuilder sql = new StringBuilder(); StringBuilder sql = new StringBuilder();
sql.append("SELECT DISTINCT sig_src_cd, target_id FROM ").append(tableName); sql.append("SELECT DISTINCT sig_src_cd, target_id FROM ").append(tableName);
sql.append(" WHERE time_bucket >= ? AND time_bucket < ?"); sql.append(" WHERE time_bucket >= ? AND time_bucket < ?");
@ -459,8 +480,8 @@ public class ChunkedTrackStreamingService {
} }
long elapsed = System.currentTimeMillis() - startMs; long elapsed = System.currentTimeMillis() - startMs;
log.info("2-pass viewport filter Pass 1: {} unique vessels found across {} tables in {}ms", log.info("2-pass viewport filter Pass 1: {} vessels, cacheHit={} days, dbQuery={} days, {}ms",
vesselIds.size(), strategyMap.size(), elapsed); vesselIds.size(), cacheHitDays, dbQueryDays, elapsed);
return vesselIds; return vesselIds;
} }
@ -742,7 +763,14 @@ public class ChunkedTrackStreamingService {
// 캐시 히트 메모리에서 가져옴 // 캐시 히트 메모리에서 가져옴
if (dailyTrackCacheManager.isEnabled() && dailyTrackCacheManager.isCached(rangeDate)) { if (dailyTrackCacheManager.isEnabled() && dailyTrackCacheManager.isCached(rangeDate)) {
compactTracks = dailyTrackCacheManager.getCachedTracks(rangeDate); // viewportVesselIds가 있으면 vessel ID 필터만 (공간 필터 재적용 금지)
if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) {
compactTracks = dailyTrackCacheManager.getCachedTracks(rangeDate).stream()
.filter(t -> viewportVesselIds.contains(t.getVesselId()))
.collect(Collectors.toList());
} else {
compactTracks = dailyTrackCacheManager.getCachedTracks(rangeDate);
}
log.info("Daily cache HIT for {} in processQueryInChunks: {} tracks", rangeDate, compactTracks.size()); log.info("Daily cache HIT for {} in processQueryInChunks: {} tracks", rangeDate, compactTracks.size());
} else { } else {
compactTracks = processDailyTableWithPagination(request, range, null, viewportVesselIds); compactTracks = processDailyTableWithPagination(request, range, null, viewportVesselIds);
@ -2391,8 +2419,15 @@ public class ChunkedTrackStreamingService {
log.info("Daily cache HIT for {}: serving from memory", rangeDate); log.info("Daily cache HIT for {}: serving from memory", rangeDate);
List<CompactVesselTrack> cachedTracks; List<CompactVesselTrack> cachedTracks;
// 뷰포트 필터링 적용 // viewportVesselIds가 있으면 (2-pass 완료) vessel ID로만 필터링
if (request.getViewport() != null) { // 공간 필터 재적용 금지: 다른 날짜에서 뷰포트 통과한 선박의 날짜 항적도 포함해야
if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) {
cachedTracks = dailyTrackCacheManager.getCachedTracks(rangeDate);
cachedTracks = cachedTracks.stream()
.filter(t -> viewportVesselIds.contains(t.getVesselId()))
.collect(Collectors.toList());
} else if (request.getViewport() != null) {
// 2-pass 없이 직접 뷰포트 필터 (processQueryInChunks )
ViewportFilter vp = request.getViewport(); ViewportFilter vp = request.getViewport();
cachedTracks = dailyTrackCacheManager.getCachedTracks( cachedTracks = dailyTrackCacheManager.getCachedTracks(
rangeDate, vp.getMinLon(), vp.getMinLat(), vp.getMaxLon(), vp.getMaxLat()); rangeDate, vp.getMinLon(), vp.getMinLat(), vp.getMaxLon(), vp.getMaxLat());
@ -2401,12 +2436,6 @@ public class ChunkedTrackStreamingService {
} }
if (!cachedTracks.isEmpty()) { if (!cachedTracks.isEmpty()) {
// viewportVesselIds 필터 적용
if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) {
cachedTracks = cachedTracks.stream()
.filter(t -> viewportVesselIds.contains(t.getVesselId()))
.collect(Collectors.toList());
}
// 메시지 크기로 분할하여 전송 // 메시지 크기로 분할하여 전송
List<List<CompactVesselTrack>> batches = splitByMessageSize(cachedTracks); List<List<CompactVesselTrack>> batches = splitByMessageSize(cachedTracks);

파일 보기

@ -486,6 +486,14 @@ public class StompTrackStreamingService {
List<gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack> cachedTracks = List<gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack> cachedTracks =
dailyTrackCacheManager.getCachedTracks(rangeDate); dailyTrackCacheManager.getCachedTracks(rangeDate);
// 거리/속도 필터 적용 (filteredVessels가 있으면)
if (filteredVessels != null && !filteredVessels.isEmpty()) {
final Set<String> fv = filteredVessels;
cachedTracks = cachedTracks.stream()
.filter(t -> fv.contains(t.getVesselId()))
.collect(Collectors.toList());
}
if (!cachedTracks.isEmpty()) { if (!cachedTracks.isEmpty()) {
TrackChunkResponse chunk = new TrackChunkResponse(); TrackChunkResponse chunk = new TrackChunkResponse();
chunk.setQueryId(queryId); chunk.setQueryId(queryId);