diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java index 0edb022..8fc3e14 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java @@ -36,6 +36,7 @@ import java.util.function.Consumer; import gc.mda.signal_batch.global.websocket.dto.QueryStatusUpdate; import gc.mda.signal_batch.global.websocket.dto.ViewportFilter; import org.springframework.scheduling.annotation.Async; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -82,6 +83,9 @@ public class ChunkedTrackStreamingService { private final Map queryMetrics = new ConcurrentHashMap<>(); @SuppressWarnings("unused") private volatile int currentChunkSizeKB = MAX_MESSAGE_SIZE_KB; + + // 쿼리별 취소 플래그 관리 + private final ConcurrentHashMap queryCancelFlags = new ConcurrentHashMap<>(); // track_geom 고정 사용 @@ -855,6 +859,10 @@ public class ChunkedTrackStreamingService { return; } + // 취소 플래그 등록 + AtomicBoolean cancelFlag = new AtomicBoolean(false); + queryCancelFlags.put(queryId, cancelFlag); + log.info("Starting chunked streaming for query: {}", queryId); queryStartTime = System.currentTimeMillis(); processedTimeRanges.clear(); @@ -887,6 +895,13 @@ public class ChunkedTrackStreamingService { for (TableStrategy strategy : new TableStrategy[]{TableStrategy.DAILY, TableStrategy.HOURLY, TableStrategy.FIVE_MINUTE}) { if (!strategyMap.containsKey(strategy)) continue; + // 취소 확인 + if (cancelFlag.get()) { + log.info("Query {} cancelled before processing strategy {}", queryId, strategy); + statusConsumer.accept(new QueryStatusUpdate(queryId, "CANCELLED", "Query cancelled by user", 0.0)); + return; + } + List ranges = strategyMap.get(strategy); log.info("Processing {} strategy with {} ranges", strategy, ranges.size()); @@ -1182,7 +1197,8 @@ public class ChunkedTrackStreamingService { 0.0 )); } finally { - // 리소스 반환 (순서: 글로벌 슬롯 → 세션 카운트 → 메트릭스) + // 리소스 반환 (순서: 취소 플래그 → 글로벌 슬롯 → 세션 카운트 → 메트릭스) + queryCancelFlags.remove(queryId); if (slotAcquired) { activeQueryManager.releaseQuerySlot(queryId); } @@ -1198,7 +1214,11 @@ public class ChunkedTrackStreamingService { */ public void cancelQuery(String queryId) { log.info("Cancelling chunked query: {}", queryId); - // TODO: Phase 2.1에서 실제 취소 로직 구현 + AtomicBoolean flag = queryCancelFlags.get(queryId); + if (flag != null) { + flag.set(true); + log.info("Cancel flag set for chunked query: {}", queryId); + } cleanupQueryMetrics(queryId); } @@ -2686,6 +2706,14 @@ public class ChunkedTrackStreamingService { return false; } + // 1. queryCancelFlags 확인 (cancelQuery() 호출 시 설정됨) + for (Map.Entry entry : queryCancelFlags.entrySet()) { + if (entry.getValue().get()) { + return true; + } + } + + // 2. ActiveQueryManager 확인 (세션 종료 시 설정됨) ActiveQueryManager.ActiveQuery query = activeQueryManager.getQuery(sessionId); return query != null && query.isCancelled(); }