From e073007dc26cec41dc6c57742ee0956c44e8867d Mon Sep 17 00:00:00 2001 From: HeungTak Lee Date: Fri, 6 Feb 2026 13:40:55 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20ChunkedTrackStreamingService=20?= =?UTF-8?q?=EC=BF=BC=EB=A6=AC=20=EC=B7=A8=EC=86=8C=20=EB=A1=9C=EC=A7=81=20?= =?UTF-8?q?=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 2.1: 기존 TODO만 있던 cancelQuery()에 실제 취소 메커니즘 추가 - 쿼리별 AtomicBoolean 취소 플래그(queryCancelFlags) 관리 - streamChunkedTracks 시작 시 취소 플래그 등록, finally에서 정리 - 테이블 전략별 루프 진입 전 취소 확인 - isQueryCancelled()에 queryCancelFlags 통합 확인 추가 - cancelQuery() 호출 시 플래그 설정으로 진행 중인 스트리밍 중단 Co-Authored-By: Claude Opus 4.6 --- .../service/ChunkedTrackStreamingService.java | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java index 0edb022..8fc3e14 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java @@ -36,6 +36,7 @@ import java.util.function.Consumer; import gc.mda.signal_batch.global.websocket.dto.QueryStatusUpdate; import gc.mda.signal_batch.global.websocket.dto.ViewportFilter; import org.springframework.scheduling.annotation.Async; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -82,6 +83,9 @@ public class ChunkedTrackStreamingService { private final Map queryMetrics = new ConcurrentHashMap<>(); @SuppressWarnings("unused") private volatile int currentChunkSizeKB = MAX_MESSAGE_SIZE_KB; + + // 쿼리별 취소 플래그 관리 + private final ConcurrentHashMap queryCancelFlags = new ConcurrentHashMap<>(); // track_geom 고정 사용 @@ -855,6 +859,10 @@ public class ChunkedTrackStreamingService { return; } + // 취소 플래그 등록 + AtomicBoolean cancelFlag = new AtomicBoolean(false); + queryCancelFlags.put(queryId, cancelFlag); + log.info("Starting chunked streaming for query: {}", queryId); queryStartTime = System.currentTimeMillis(); processedTimeRanges.clear(); @@ -887,6 +895,13 @@ public class ChunkedTrackStreamingService { for (TableStrategy strategy : new TableStrategy[]{TableStrategy.DAILY, TableStrategy.HOURLY, TableStrategy.FIVE_MINUTE}) { if (!strategyMap.containsKey(strategy)) continue; + // 취소 확인 + if (cancelFlag.get()) { + log.info("Query {} cancelled before processing strategy {}", queryId, strategy); + statusConsumer.accept(new QueryStatusUpdate(queryId, "CANCELLED", "Query cancelled by user", 0.0)); + return; + } + List ranges = strategyMap.get(strategy); log.info("Processing {} strategy with {} ranges", strategy, ranges.size()); @@ -1182,7 +1197,8 @@ public class ChunkedTrackStreamingService { 0.0 )); } finally { - // 리소스 반환 (순서: 글로벌 슬롯 → 세션 카운트 → 메트릭스) + // 리소스 반환 (순서: 취소 플래그 → 글로벌 슬롯 → 세션 카운트 → 메트릭스) + queryCancelFlags.remove(queryId); if (slotAcquired) { activeQueryManager.releaseQuerySlot(queryId); } @@ -1198,7 +1214,11 @@ public class ChunkedTrackStreamingService { */ public void cancelQuery(String queryId) { log.info("Cancelling chunked query: {}", queryId); - // TODO: Phase 2.1에서 실제 취소 로직 구현 + AtomicBoolean flag = queryCancelFlags.get(queryId); + if (flag != null) { + flag.set(true); + log.info("Cancel flag set for chunked query: {}", queryId); + } cleanupQueryMetrics(queryId); } @@ -2686,6 +2706,14 @@ public class ChunkedTrackStreamingService { return false; } + // 1. queryCancelFlags 확인 (cancelQuery() 호출 시 설정됨) + for (Map.Entry entry : queryCancelFlags.entrySet()) { + if (entry.getValue().get()) { + return true; + } + } + + // 2. ActiveQueryManager 확인 (세션 종료 시 설정됨) ActiveQueryManager.ActiveQuery query = activeQueryManager.getQuery(sessionId); return query != null && query.isCancelled(); }