feat: ChunkedTrackStreamingService 쿼리 취소 로직 구현

Phase 2.1: 기존 TODO만 있던 cancelQuery()에 실제 취소 메커니즘 추가
- 쿼리별 AtomicBoolean 취소 플래그(queryCancelFlags) 관리
- streamChunkedTracks 시작 시 취소 플래그 등록, finally에서 정리
- 테이블 전략별 루프 진입 전 취소 확인
- isQueryCancelled()에 queryCancelFlags 통합 확인 추가
- cancelQuery() 호출 시 플래그 설정으로 진행 중인 스트리밍 중단

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
HeungTak Lee 2026-02-06 13:40:55 +09:00
부모 122a247faf
커밋 e073007dc2

파일 보기

@ -36,6 +36,7 @@ import java.util.function.Consumer;
import gc.mda.signal_batch.global.websocket.dto.QueryStatusUpdate; import gc.mda.signal_batch.global.websocket.dto.QueryStatusUpdate;
import gc.mda.signal_batch.global.websocket.dto.ViewportFilter; import gc.mda.signal_batch.global.websocket.dto.ViewportFilter;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -82,6 +83,9 @@ public class ChunkedTrackStreamingService {
private final Map<String, BackpressureMetrics> queryMetrics = new ConcurrentHashMap<>(); private final Map<String, BackpressureMetrics> queryMetrics = new ConcurrentHashMap<>();
@SuppressWarnings("unused") @SuppressWarnings("unused")
private volatile int currentChunkSizeKB = MAX_MESSAGE_SIZE_KB; private volatile int currentChunkSizeKB = MAX_MESSAGE_SIZE_KB;
// 쿼리별 취소 플래그 관리
private final ConcurrentHashMap<String, AtomicBoolean> queryCancelFlags = new ConcurrentHashMap<>();
// track_geom 고정 사용 // track_geom 고정 사용
@ -855,6 +859,10 @@ public class ChunkedTrackStreamingService {
return; return;
} }
// 취소 플래그 등록
AtomicBoolean cancelFlag = new AtomicBoolean(false);
queryCancelFlags.put(queryId, cancelFlag);
log.info("Starting chunked streaming for query: {}", queryId); log.info("Starting chunked streaming for query: {}", queryId);
queryStartTime = System.currentTimeMillis(); queryStartTime = System.currentTimeMillis();
processedTimeRanges.clear(); processedTimeRanges.clear();
@ -887,6 +895,13 @@ public class ChunkedTrackStreamingService {
for (TableStrategy strategy : new TableStrategy[]{TableStrategy.DAILY, TableStrategy.HOURLY, TableStrategy.FIVE_MINUTE}) { for (TableStrategy strategy : new TableStrategy[]{TableStrategy.DAILY, TableStrategy.HOURLY, TableStrategy.FIVE_MINUTE}) {
if (!strategyMap.containsKey(strategy)) continue; if (!strategyMap.containsKey(strategy)) continue;
// 취소 확인
if (cancelFlag.get()) {
log.info("Query {} cancelled before processing strategy {}", queryId, strategy);
statusConsumer.accept(new QueryStatusUpdate(queryId, "CANCELLED", "Query cancelled by user", 0.0));
return;
}
List<TimeRange> ranges = strategyMap.get(strategy); List<TimeRange> ranges = strategyMap.get(strategy);
log.info("Processing {} strategy with {} ranges", strategy, ranges.size()); log.info("Processing {} strategy with {} ranges", strategy, ranges.size());
@ -1182,7 +1197,8 @@ public class ChunkedTrackStreamingService {
0.0 0.0
)); ));
} finally { } finally {
// 리소스 반환 (순서: 글로벌 슬롯 세션 카운트 메트릭스) // 리소스 반환 (순서: 취소 플래그 글로벌 슬롯 세션 카운트 메트릭스)
queryCancelFlags.remove(queryId);
if (slotAcquired) { if (slotAcquired) {
activeQueryManager.releaseQuerySlot(queryId); activeQueryManager.releaseQuerySlot(queryId);
} }
@ -1198,7 +1214,11 @@ public class ChunkedTrackStreamingService {
*/ */
public void cancelQuery(String queryId) { public void cancelQuery(String queryId) {
log.info("Cancelling chunked query: {}", queryId); log.info("Cancelling chunked query: {}", queryId);
// TODO: Phase 2.1에서 실제 취소 로직 구현 AtomicBoolean flag = queryCancelFlags.get(queryId);
if (flag != null) {
flag.set(true);
log.info("Cancel flag set for chunked query: {}", queryId);
}
cleanupQueryMetrics(queryId); cleanupQueryMetrics(queryId);
} }
@ -2686,6 +2706,14 @@ public class ChunkedTrackStreamingService {
return false; return false;
} }
// 1. queryCancelFlags 확인 (cancelQuery() 호출 설정됨)
for (Map.Entry<String, AtomicBoolean> entry : queryCancelFlags.entrySet()) {
if (entry.getValue().get()) {
return true;
}
}
// 2. ActiveQueryManager 확인 (세션 종료 설정됨)
ActiveQueryManager.ActiveQuery query = activeQueryManager.getQuery(sessionId); ActiveQueryManager.ActiveQuery query = activeQueryManager.getQuery(sessionId);
return query != null && query.isCancelled(); return query != null && query.isCancelled();
} }