feat: ChunkedTrackStreamingService 쿼리 취소 로직 구현
Phase 2.1: 기존 TODO만 있던 cancelQuery()에 실제 취소 메커니즘 추가 - 쿼리별 AtomicBoolean 취소 플래그(queryCancelFlags) 관리 - streamChunkedTracks 시작 시 취소 플래그 등록, finally에서 정리 - 테이블 전략별 루프 진입 전 취소 확인 - isQueryCancelled()에 queryCancelFlags 통합 확인 추가 - cancelQuery() 호출 시 플래그 설정으로 진행 중인 스트리밍 중단 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
부모
122a247faf
커밋
e073007dc2
@ -36,6 +36,7 @@ import java.util.function.Consumer;
|
||||
import gc.mda.signal_batch.global.websocket.dto.QueryStatusUpdate;
|
||||
import gc.mda.signal_batch.global.websocket.dto.ViewportFilter;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
@ -82,6 +83,9 @@ public class ChunkedTrackStreamingService {
|
||||
private final Map<String, BackpressureMetrics> queryMetrics = new ConcurrentHashMap<>();
|
||||
@SuppressWarnings("unused")
|
||||
private volatile int currentChunkSizeKB = MAX_MESSAGE_SIZE_KB;
|
||||
|
||||
// 쿼리별 취소 플래그 관리
|
||||
private final ConcurrentHashMap<String, AtomicBoolean> queryCancelFlags = new ConcurrentHashMap<>();
|
||||
|
||||
// track_geom 고정 사용
|
||||
|
||||
@ -855,6 +859,10 @@ public class ChunkedTrackStreamingService {
|
||||
return;
|
||||
}
|
||||
|
||||
// 취소 플래그 등록
|
||||
AtomicBoolean cancelFlag = new AtomicBoolean(false);
|
||||
queryCancelFlags.put(queryId, cancelFlag);
|
||||
|
||||
log.info("Starting chunked streaming for query: {}", queryId);
|
||||
queryStartTime = System.currentTimeMillis();
|
||||
processedTimeRanges.clear();
|
||||
@ -887,6 +895,13 @@ public class ChunkedTrackStreamingService {
|
||||
for (TableStrategy strategy : new TableStrategy[]{TableStrategy.DAILY, TableStrategy.HOURLY, TableStrategy.FIVE_MINUTE}) {
|
||||
if (!strategyMap.containsKey(strategy)) continue;
|
||||
|
||||
// 취소 확인
|
||||
if (cancelFlag.get()) {
|
||||
log.info("Query {} cancelled before processing strategy {}", queryId, strategy);
|
||||
statusConsumer.accept(new QueryStatusUpdate(queryId, "CANCELLED", "Query cancelled by user", 0.0));
|
||||
return;
|
||||
}
|
||||
|
||||
List<TimeRange> ranges = strategyMap.get(strategy);
|
||||
log.info("Processing {} strategy with {} ranges", strategy, ranges.size());
|
||||
|
||||
@ -1182,7 +1197,8 @@ public class ChunkedTrackStreamingService {
|
||||
0.0
|
||||
));
|
||||
} finally {
|
||||
// 리소스 반환 (순서: 글로벌 슬롯 → 세션 카운트 → 메트릭스)
|
||||
// 리소스 반환 (순서: 취소 플래그 → 글로벌 슬롯 → 세션 카운트 → 메트릭스)
|
||||
queryCancelFlags.remove(queryId);
|
||||
if (slotAcquired) {
|
||||
activeQueryManager.releaseQuerySlot(queryId);
|
||||
}
|
||||
@ -1198,7 +1214,11 @@ public class ChunkedTrackStreamingService {
|
||||
*/
|
||||
public void cancelQuery(String queryId) {
|
||||
log.info("Cancelling chunked query: {}", queryId);
|
||||
// TODO: Phase 2.1에서 실제 취소 로직 구현
|
||||
AtomicBoolean flag = queryCancelFlags.get(queryId);
|
||||
if (flag != null) {
|
||||
flag.set(true);
|
||||
log.info("Cancel flag set for chunked query: {}", queryId);
|
||||
}
|
||||
cleanupQueryMetrics(queryId);
|
||||
}
|
||||
|
||||
@ -2686,6 +2706,14 @@ public class ChunkedTrackStreamingService {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 1. queryCancelFlags 확인 (cancelQuery() 호출 시 설정됨)
|
||||
for (Map.Entry<String, AtomicBoolean> entry : queryCancelFlags.entrySet()) {
|
||||
if (entry.getValue().get()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// 2. ActiveQueryManager 확인 (세션 종료 시 설정됨)
|
||||
ActiveQueryManager.ActiveQuery query = activeQueryManager.getQuery(sessionId);
|
||||
return query != null && query.isCancelled();
|
||||
}
|
||||
|
||||
불러오는 중...
Reference in New Issue
Block a user