feat: ChunkedTrackStreamingService 쿼리 취소 로직 구현
Phase 2.1: 기존 TODO만 있던 cancelQuery()에 실제 취소 메커니즘 추가 - 쿼리별 AtomicBoolean 취소 플래그(queryCancelFlags) 관리 - streamChunkedTracks 시작 시 취소 플래그 등록, finally에서 정리 - 테이블 전략별 루프 진입 전 취소 확인 - isQueryCancelled()에 queryCancelFlags 통합 확인 추가 - cancelQuery() 호출 시 플래그 설정으로 진행 중인 스트리밍 중단 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
부모
122a247faf
커밋
e073007dc2
@ -36,6 +36,7 @@ import java.util.function.Consumer;
|
|||||||
import gc.mda.signal_batch.global.websocket.dto.QueryStatusUpdate;
|
import gc.mda.signal_batch.global.websocket.dto.QueryStatusUpdate;
|
||||||
import gc.mda.signal_batch.global.websocket.dto.ViewportFilter;
|
import gc.mda.signal_batch.global.websocket.dto.ViewportFilter;
|
||||||
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
@ -82,6 +83,9 @@ public class ChunkedTrackStreamingService {
|
|||||||
private final Map<String, BackpressureMetrics> queryMetrics = new ConcurrentHashMap<>();
|
private final Map<String, BackpressureMetrics> queryMetrics = new ConcurrentHashMap<>();
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
private volatile int currentChunkSizeKB = MAX_MESSAGE_SIZE_KB;
|
private volatile int currentChunkSizeKB = MAX_MESSAGE_SIZE_KB;
|
||||||
|
|
||||||
|
// 쿼리별 취소 플래그 관리
|
||||||
|
private final ConcurrentHashMap<String, AtomicBoolean> queryCancelFlags = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
// track_geom 고정 사용
|
// track_geom 고정 사용
|
||||||
|
|
||||||
@ -855,6 +859,10 @@ public class ChunkedTrackStreamingService {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 취소 플래그 등록
|
||||||
|
AtomicBoolean cancelFlag = new AtomicBoolean(false);
|
||||||
|
queryCancelFlags.put(queryId, cancelFlag);
|
||||||
|
|
||||||
log.info("Starting chunked streaming for query: {}", queryId);
|
log.info("Starting chunked streaming for query: {}", queryId);
|
||||||
queryStartTime = System.currentTimeMillis();
|
queryStartTime = System.currentTimeMillis();
|
||||||
processedTimeRanges.clear();
|
processedTimeRanges.clear();
|
||||||
@ -887,6 +895,13 @@ public class ChunkedTrackStreamingService {
|
|||||||
for (TableStrategy strategy : new TableStrategy[]{TableStrategy.DAILY, TableStrategy.HOURLY, TableStrategy.FIVE_MINUTE}) {
|
for (TableStrategy strategy : new TableStrategy[]{TableStrategy.DAILY, TableStrategy.HOURLY, TableStrategy.FIVE_MINUTE}) {
|
||||||
if (!strategyMap.containsKey(strategy)) continue;
|
if (!strategyMap.containsKey(strategy)) continue;
|
||||||
|
|
||||||
|
// 취소 확인
|
||||||
|
if (cancelFlag.get()) {
|
||||||
|
log.info("Query {} cancelled before processing strategy {}", queryId, strategy);
|
||||||
|
statusConsumer.accept(new QueryStatusUpdate(queryId, "CANCELLED", "Query cancelled by user", 0.0));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
List<TimeRange> ranges = strategyMap.get(strategy);
|
List<TimeRange> ranges = strategyMap.get(strategy);
|
||||||
log.info("Processing {} strategy with {} ranges", strategy, ranges.size());
|
log.info("Processing {} strategy with {} ranges", strategy, ranges.size());
|
||||||
|
|
||||||
@ -1182,7 +1197,8 @@ public class ChunkedTrackStreamingService {
|
|||||||
0.0
|
0.0
|
||||||
));
|
));
|
||||||
} finally {
|
} finally {
|
||||||
// 리소스 반환 (순서: 글로벌 슬롯 → 세션 카운트 → 메트릭스)
|
// 리소스 반환 (순서: 취소 플래그 → 글로벌 슬롯 → 세션 카운트 → 메트릭스)
|
||||||
|
queryCancelFlags.remove(queryId);
|
||||||
if (slotAcquired) {
|
if (slotAcquired) {
|
||||||
activeQueryManager.releaseQuerySlot(queryId);
|
activeQueryManager.releaseQuerySlot(queryId);
|
||||||
}
|
}
|
||||||
@ -1198,7 +1214,11 @@ public class ChunkedTrackStreamingService {
|
|||||||
*/
|
*/
|
||||||
public void cancelQuery(String queryId) {
|
public void cancelQuery(String queryId) {
|
||||||
log.info("Cancelling chunked query: {}", queryId);
|
log.info("Cancelling chunked query: {}", queryId);
|
||||||
// TODO: Phase 2.1에서 실제 취소 로직 구현
|
AtomicBoolean flag = queryCancelFlags.get(queryId);
|
||||||
|
if (flag != null) {
|
||||||
|
flag.set(true);
|
||||||
|
log.info("Cancel flag set for chunked query: {}", queryId);
|
||||||
|
}
|
||||||
cleanupQueryMetrics(queryId);
|
cleanupQueryMetrics(queryId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2686,6 +2706,14 @@ public class ChunkedTrackStreamingService {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 1. queryCancelFlags 확인 (cancelQuery() 호출 시 설정됨)
|
||||||
|
for (Map.Entry<String, AtomicBoolean> entry : queryCancelFlags.entrySet()) {
|
||||||
|
if (entry.getValue().get()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. ActiveQueryManager 확인 (세션 종료 시 설정됨)
|
||||||
ActiveQueryManager.ActiveQuery query = activeQueryManager.getQuery(sessionId);
|
ActiveQueryManager.ActiveQuery query = activeQueryManager.getQuery(sessionId);
|
||||||
return query != null && query.isCancelled();
|
return query != null && query.isCancelled();
|
||||||
}
|
}
|
||||||
|
|||||||
불러오는 중...
Reference in New Issue
Block a user