feat: WebSocket 리플레이 캐시 통합 + 쿼리 메트릭 사용자 ID 수집 #115

병합
htlee feature/ws-cache-integration-and-client-metrics 에서 develop 로 3 commits 를 머지했습니다 2026-03-27 06:36:30 +09:00
Showing only changes of commit 8a97321a90 - Show all commits

파일 보기

@ -1,7 +1,11 @@
package gc.mda.signal_batch.global.websocket.service;
import gc.mda.signal_batch.batch.reader.FiveMinTrackCache;
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import gc.mda.signal_batch.global.exception.MemoryBudgetExceededException;
import gc.mda.signal_batch.global.util.TrackMemoryEstimator;
import gc.mda.signal_batch.global.util.VesselTrackToCompactConverter;
import gc.mda.signal_batch.global.websocket.dto.TrackChunkResponse;
import gc.mda.signal_batch.global.websocket.interceptor.TrackQueryInterceptor;
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
@ -55,6 +59,9 @@ public class ChunkedTrackStreamingService {
private final CacheTrackSimplifier cacheTrackSimplifier;
private final TrackMemoryBudgetManager memoryBudgetManager;
private final QueryMetricsBufferService queryMetricsBufferService;
private final HourlyTrackCache hourlyTrackCache;
private final FiveMinTrackCache fiveMinTrackCache;
private final VesselTrackToCompactConverter vesselTrackToCompactConverter;
private static final ThreadLocal<WKTReader> wktReaderLocal = ThreadLocal.withInitial(WKTReader::new);
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final int MAX_TRACKS_PER_CHUNK = 500000; // 청크당 최대 트랙 (10만 선박 지원)
@ -105,7 +112,10 @@ public class ChunkedTrackStreamingService {
DailyTrackCacheManager dailyTrackCacheManager,
CacheTrackSimplifier cacheTrackSimplifier,
TrackMemoryBudgetManager memoryBudgetManager,
QueryMetricsBufferService queryMetricsBufferService) {
QueryMetricsBufferService queryMetricsBufferService,
HourlyTrackCache hourlyTrackCache,
FiveMinTrackCache fiveMinTrackCache,
VesselTrackToCompactConverter vesselTrackToCompactConverter) {
this.queryJdbcTemplate = queryJdbcTemplate;
this.queryDataSource = queryDataSource;
this.activeQueryManager = activeQueryManager;
@ -114,6 +124,9 @@ public class ChunkedTrackStreamingService {
this.cacheTrackSimplifier = cacheTrackSimplifier;
this.memoryBudgetManager = memoryBudgetManager;
this.queryMetricsBufferService = queryMetricsBufferService;
this.hourlyTrackCache = hourlyTrackCache;
this.fiveMinTrackCache = fiveMinTrackCache;
this.vesselTrackToCompactConverter = vesselTrackToCompactConverter;
}
/**
@ -153,6 +166,8 @@ public class ChunkedTrackStreamingService {
private static class QueryBenchmark {
int cacheHitDays = 0;
int dbQueryDays = 0;
int cacheHourlyRanges = 0; // L2 캐시 그룹
int cacheFiveMinRanges = 0; // L1 캐시 그룹
int totalTracks = 0;
int totalPointsBefore = 0;
int totalPointsAfter = 0;
@ -164,13 +179,15 @@ public class ChunkedTrackStreamingService {
int connViewportPass1 = 0; // collectViewportVesselIds DB 쿼리
int connDailyPages = 0; // streamDailyTableWithPagination 페이지
int connVesselInfo = 0; // preloadVesselInfoWithSessionCache 배치
int connHourly5min = 0; // processTableRange (5min/hourly)
int connHourly5min = 0; // processTableRange (5min/hourly) 기존 DB 경로 잔류용
int connTableCheck = 0; // hasDataInTable 존재 검증
Integer zoomLevel;
String determinePath() {
if (cacheHitDays > 0 && dbQueryDays > 0) return "HYBRID";
if (cacheHitDays > 0) return "CACHE";
boolean anyCache = cacheHitDays > 0 || cacheHourlyRanges > 0 || cacheFiveMinRanges > 0;
boolean anyDb = dbQueryDays > 0;
if (anyCache && anyDb) return "HYBRID";
if (anyCache) return "CACHE";
return "DB";
}
@ -182,6 +199,7 @@ public class ChunkedTrackStreamingService {
return String.format(
"{\"queryId\":\"%s\",\"timestamp\":\"%s\",\"path\":\"%s\"," +
"\"zoomLevel\":%s,\"dateRanges\":%d,\"cacheHitDays\":%d,\"dbQueryDays\":%d," +
"\"cacheHourlyRanges\":%d,\"cacheFiveMinRanges\":%d," +
"\"totalTracks\":%d,\"totalPointsBefore\":%d,\"totalPointsAfter\":%d," +
"\"pointReductionPct\":%d,\"totalBatches\":%d,\"batchesBeforeSimplify\":%d," +
"\"simplifyTimeMs\":%d,\"dbQueryTimeMs\":%d,\"totalElapsedMs\":%d," +
@ -192,8 +210,9 @@ public class ChunkedTrackStreamingService {
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
determinePath(),
zoomLevel != null ? zoomLevel.toString() : "null",
cacheHitDays + dbQueryDays,
cacheHitDays + dbQueryDays + cacheHourlyRanges + cacheFiveMinRanges,
cacheHitDays, dbQueryDays,
cacheHourlyRanges, cacheFiveMinRanges,
totalTracks, totalPointsBefore, totalPointsAfter,
totalPointsBefore > 0 ? Math.round((1 - (double) totalPointsAfter / totalPointsBefore) * 100) : 0,
totalBatches, batchesBeforeSimplify,
@ -238,12 +257,12 @@ public class ChunkedTrackStreamingService {
// DB에서 조회 (signal_kind_code는 캐시 저장 치환된 )
try {
String sql = "SELECT ship_nm, vessel_type, signal_kind_code FROM signal.t_ais_position " +
String sql = "SELECT name, vessel_type, signal_kind_code FROM signal.t_ais_position " +
"WHERE mmsi = ? LIMIT 1";
VesselInfo info = queryJdbcTemplate.queryForObject(sql,
(rs, rowNum) -> new VesselInfo(
rs.getString("ship_nm"),
rs.getString("name"),
rs.getString("vessel_type"),
rs.getString("signal_kind_code")
),
@ -320,7 +339,7 @@ public class ChunkedTrackStreamingService {
.map(id -> "?")
.collect(Collectors.joining(","));
String sql = "SELECT mmsi, ship_nm, vessel_type, signal_kind_code " +
String sql = "SELECT mmsi, name, vessel_type, signal_kind_code " +
"FROM signal.t_ais_position " +
"WHERE mmsi IN (" + placeholders + ")";
@ -329,7 +348,7 @@ public class ChunkedTrackStreamingService {
queryJdbcTemplate.query(sql, rs -> {
String visselId = rs.getString("mmsi");
VesselInfo info = new VesselInfo(
rs.getString("ship_nm"),
rs.getString("name"),
rs.getString("vessel_type"),
rs.getString("signal_kind_code")
);
@ -389,7 +408,7 @@ public class ChunkedTrackStreamingService {
.map(id -> "?")
.collect(Collectors.joining(","));
String sql = "SELECT mmsi, ship_nm, vessel_type, signal_kind_code " +
String sql = "SELECT mmsi, name, vessel_type, signal_kind_code " +
"FROM signal.t_ais_position " +
"WHERE mmsi IN (" + placeholders + ")";
@ -398,7 +417,7 @@ public class ChunkedTrackStreamingService {
queryJdbcTemplate.query(sql, rs -> {
String vesselId = rs.getString("mmsi");
VesselInfo info = new VesselInfo(
rs.getString("ship_nm"),
rs.getString("name"),
rs.getString("vessel_type"),
rs.getString("signal_kind_code")
);
@ -447,6 +466,12 @@ public class ChunkedTrackStreamingService {
for (Map.Entry<TableStrategy, List<TimeRange>> entry : strategyMap.entrySet()) {
TableStrategy strategy = entry.getKey();
// HOURLY/FIVE_MINUTE: 캐시에서 직접 뷰포트 필터 수행 Pass 1 DB 쿼리 불필요
if (strategy == TableStrategy.HOURLY || strategy == TableStrategy.FIVE_MINUTE) {
continue;
}
String tableName = strategy.getTableName();
for (TimeRange range : entry.getValue()) {
@ -725,7 +750,8 @@ public class ChunkedTrackStreamingService {
String sessionId,
Consumer<TrackChunkResponse> chunkConsumer,
Consumer<QueryStatusUpdate> statusConsumer,
String clientIp) {
String clientIp,
String clientId) {
boolean slotAcquired = false;
QueryBenchmark benchmark = null;
QueryContext ctx = null;
@ -867,193 +893,11 @@ public class ChunkedTrackStreamingService {
globalChunkIndex, uniqueVesselIds, viewportVesselIds, benchmark, ctx);
globalChunkIndex = ctx.getCurrentChunkIndex();
} else {
// Hourly/5min은 6시간 단위로 그룹화하여 처리
Map<String, List<TimeRange>> timeGroups = groupRangesByTimeWindow(ranges, 6);
for (Map.Entry<String, List<TimeRange>> groupEntry : timeGroups.entrySet()) {
String groupKey = groupEntry.getKey();
List<TimeRange> groupRanges = groupEntry.getValue();
log.info("[{}] Processing time window {} with {} ranges", strategy, groupKey, groupRanges.size());
// 시간 그룹 데이터를 병합
Map<String, VesselAccumulator> mergedMap = new HashMap<>(20000);
LocalDateTime baseTime = null;
for (TimeRange range : groupRanges) {
try {
// 범위의 시작 시간을 기준으로 설정
if (baseTime == null) {
baseTime = range.getStart();
}
List<CompactVesselTrack> compactTracks = processTableRangeWithBaseTime(
request, strategy, range, baseTime, viewportVesselIds, queryId, ctx);
if (benchmark != null) benchmark.connHourly5min++; // [BENCHMARK]
// 선박별로 볕합
for (CompactVesselTrack track : compactTracks) {
String vesselId = track.getVesselId();
VesselAccumulator accumulator = mergedMap.get(vesselId);
if (accumulator == null) {
accumulator = new VesselAccumulator();
accumulator.mmsi = track.getVesselId();
VesselInfo vesselInfo = getVesselInfo(track.getVesselId());
accumulator.shipName = vesselInfo.shipName;
accumulator.shipType = vesselInfo.shipType;
accumulator.shipKindCode = vesselInfo.signalKindCode;
mergedMap.put(vesselId, accumulator);
}
// 데이터 병합
accumulator.geometry.addAll(track.getGeometry());
accumulator.timestamps.addAll(track.getTimestamps());
accumulator.speeds.addAll(track.getSpeeds());
accumulator.totalDistance += track.getTotalDistance();
accumulator.maxSpeed = Math.max(accumulator.maxSpeed, track.getMaxSpeed());
accumulator.pointCount += track.getPointCount();
}
} catch (Exception e) {
log.error("Error processing {} range {}: {}", strategy, range, e.getMessage());
}
}
// 병합된 데이터를 청크로 분할하여 전송
if (!mergedMap.isEmpty()) {
List<CompactVesselTrack> mergedTracks = mergedMap.entrySet().stream()
.map(entry -> {
String vesselId = entry.getKey();
VesselAccumulator acc = entry.getValue();
double avgSpeed = calculateAvgSpeed(acc.totalDistance, acc.timestamps);
return CompactVesselTrack.builder()
.vesselId(vesselId)
.nationalCode(acc.mmsi != null && acc.mmsi.length() >= 3 ? acc.mmsi.substring(0, 3) : null)
.shipName(acc.shipName)
.shipType(acc.shipType)
.shipKindCode(acc.shipKindCode)
.geometry(acc.geometry)
.timestamps(acc.timestamps)
.speeds(acc.speeds)
.totalDistance(acc.totalDistance)
.avgSpeed(avgSpeed)
.maxSpeed(acc.maxSpeed)
.pointCount(acc.pointCount)
.build();
})
.collect(Collectors.toList());
// 전체 포인트 통계 계산
int totalOriginalPoints = mergedTracks.stream()
.mapToInt(t -> t.getPointCount())
.sum();
log.info("[{}] Time window {} - Merged {} vessels, Total {} points",
strategy, groupKey, mergedTracks.size(), totalOriginalPoints);
List<List<CompactVesselTrack>> batches = splitByMessageSize(mergedTracks, queryId);
for (List<CompactVesselTrack> batch : batches) {
TrackChunkResponse response = new TrackChunkResponse();
response.setQueryId(queryId);
response.setChunkIndex(globalChunkIndex++);
response.setIsLastChunk(false);
response.setTotalChunks(-1); // 마짉 청크에서 설정
response.setCompactTracks(batch);
// 처리된 시간 추가
String timeKey = groupKey + "_" + strategy;
ctx.processedTimeRanges.put(timeKey, groupRanges.stream()
.mapToInt(r -> (int)Duration.between(r.getStart(), r.getEnd()).toMinutes())
.sum());
int currentProcessedMin = ctx.processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum();
response.setStats(createChunkStats(batch, uniqueVesselIds, currentProcessedMin, ctx));
// 버퍼 크기 계산 추가
int chunkSize = batch.stream().mapToInt(t -> estimateTrackSize(t)).sum();
long currentBufferSize = pendingBufferSize.addAndGet(chunkSize);
metrics.totalBytes.addAndGet(chunkSize);
metrics.chunkCount.incrementAndGet();
// 버퍼 사용률 로그
double bufferUsage = (double) currentBufferSize / MAX_PENDING_BUFFER * 100;
if (bufferUsage > 80 && System.currentTimeMillis() - metrics.lastWarningTime > 5000) {
metrics.bufferWarnings.incrementAndGet();
metrics.lastWarningTime = System.currentTimeMillis();
log.warn("[BACKPRESSURE] Query {} - Buffer usage high: {}% ({} MB / {} MB)",
queryId, String.format("%.1f", bufferUsage), currentBufferSize / (1024 * 1024),
MAX_PENDING_BUFFER / (1024 * 1024));
}
// 버퍼가 가듍 경우 대기 동적 청크 크기 조절
int backpressureWaitCount = 0;
while (pendingBufferSize.get() > MAX_PENDING_BUFFER) {
if (backpressureWaitCount == 0) {
metrics.backpressureEvents.incrementAndGet();
log.warn("[BACKPRESSURE] Query {} - Buffer full! Waiting for buffer to drain. Current: {} MB",
queryId, pendingBufferSize.get() / (1024 * 1024));
}
Thread.sleep(50);
backpressureWaitCount++;
// 500ms 이상 대기 청크 크기 감소
if (backpressureWaitCount > 10 && metrics.dynamicChunkSizeKB > MIN_MESSAGE_SIZE_KB) {
int oldSize = metrics.dynamicChunkSizeKB;
metrics.dynamicChunkSizeKB = Math.max(MIN_MESSAGE_SIZE_KB,
metrics.dynamicChunkSizeKB - 512);
log.info("[BACKPRESSURE] Query {} - Reducing chunk size: {} KB -> {} KB",
queryId, oldSize, metrics.dynamicChunkSizeKB);
}
}
if (backpressureWaitCount > 0) {
log.info("[BACKPRESSURE] Query {} - Buffer drained after {} ms wait",
queryId, backpressureWaitCount * 50);
}
// 전송 버퍼 추적 (전송 완료 즉시 감소)
try {
chunkConsumer.accept(response);
} finally {
pendingBufferSize.addAndGet(-chunkSize);
}
// 유니크 선박 카운트
batch.forEach(track -> uniqueVesselIds.add(track.getVesselId()));
// 진행률 업데이트 (시간 기반만 사용)
double timeProgress = (double) currentProcessedMin / ctx.estimatedTotalMinutes * 100;
statusConsumer.accept(new QueryStatusUpdate(
queryId,
"PROCESSING",
"Processing chunk " + globalChunkIndex,
Math.min(99.0, timeProgress)
));
// 버퍼 사용률에 따른 적응형 대기
double currentBufferUsage = (double) pendingBufferSize.get() / MAX_PENDING_BUFFER;
int waitTime;
if (currentBufferUsage > 0.8) waitTime = 200; // 80%: 강한 억제
else if (currentBufferUsage > 0.5) waitTime = 100; // 50%: 중간 억제
else if (currentBufferUsage > 0.3) waitTime = 50; // 30%: 약한 억제
else waitTime = 10; // 정상: 최소 지연
Thread.sleep(waitTime);
// 진행 상황 로그 ( 10번째 청크마다)
if (globalChunkIndex % 10 == 0) {
log.info("Progress: chunk {}, vessels: {}, time progress: {}%",
globalChunkIndex, uniqueVesselIds.size(),
Math.round(timeProgress));
}
}
batches.clear(); // 메모리 즉시 해제: 배치 분할 리스트
mergedTracks.clear(); // 메모리 즉시 해제: 병합 항적 리스트
}
mergedMap.clear(); // 메모리 즉시 해제: 선박 누적
}
// HOURLY/FIVE_MINUTE: L1/L2 캐시에서 직접 처리
processHourlyFiveMinWithCache(ranges, request, queryId,
chunkConsumer, statusConsumer, uniqueVesselIds,
strategy, benchmark, ctx, metrics);
globalChunkIndex = ctx.getCurrentChunkIndex();
}
}
@ -1145,7 +989,7 @@ public class ChunkedTrackStreamingService {
.viewportBounds(vpBounds)
.requestedMmsi(request.getVesselIds() != null ? request.getVesselIds().size() : 0)
.dataPath(benchmark.determinePath())
.cacheHitDays(benchmark.cacheHitDays)
.cacheHitDays(benchmark.cacheHitDays + benchmark.cacheHourlyRanges + benchmark.cacheFiveMinRanges)
.dbQueryDays(benchmark.dbQueryDays)
.dbConnTotal(benchmark.dbConnectionTotal())
.uniqueVessels(uniqueVesselIds.size())
@ -1160,6 +1004,7 @@ public class ChunkedTrackStreamingService {
.backpressureEvents(bpMetrics != null ? bpMetrics.backpressureEvents.get() : 0)
.status(queryStatus)
.clientIp(clientIp)
.clientId(clientId)
.build());
}
@ -2576,4 +2421,377 @@ public class ChunkedTrackStreamingService {
}
}
// ========== L1/L2 캐시 기반 HOURLY/FIVE_MINUTE 처리 ==========
/**
* HOURLY/FIVE_MINUTE 범위를 L1/L2 캐시에서 직접 처리.
* currentHourStart 기준: >= currentHour L1(5min), < currentHour L2(hourly)
* DB fallback 없음 캐시에 없으면 데이터 자체가 없음 (배치 Job이 DB+캐시 동시 적재)
*/
private void processHourlyFiveMinWithCache(
List<TimeRange> ranges, TrackQueryRequest request, String queryId,
Consumer<TrackChunkResponse> chunkConsumer,
Consumer<QueryStatusUpdate> statusConsumer,
Set<String> uniqueVesselIds,
TableStrategy originalStrategy, QueryBenchmark benchmark,
QueryContext ctx, BackpressureMetrics bpMetrics) throws Exception {
LocalDateTime currentHourStart = LocalDateTime.now().withMinute(0).withSecond(0).withNano(0);
Map<String, List<TimeRange>> timeGroups = groupRangesByTimeWindow(ranges, 6);
for (Map.Entry<String, List<TimeRange>> groupEntry : timeGroups.entrySet()) {
String groupKey = groupEntry.getKey();
List<TimeRange> groupRanges = groupEntry.getValue();
if (isQueryCancelled(queryId, null)) return;
log.info("[CACHE-{}] Processing time window {} with {} ranges", originalStrategy, groupKey, groupRanges.size());
LocalDateTime groupStart = groupRanges.get(0).getStart();
LocalDateTime groupEnd = groupRanges.get(groupRanges.size() - 1).getEnd();
// L1/L2 범위 분리: currentHourStart 기준
Map<String, List<VesselTrack>> l2Result = Collections.emptyMap();
Map<String, List<VesselTrack>> l1Result = Collections.emptyMap();
// L2 범위: groupStart ~ min(groupEnd, currentHourStart)
if (groupStart.isBefore(currentHourStart)) {
LocalDateTime l2End = groupEnd.isBefore(currentHourStart) ? groupEnd : currentHourStart;
LocalDateTime l2StartExpanded = groupStart.withMinute(0).withSecond(0).withNano(0);
l2Result = hourlyTrackCache.getTracksInRange(l2StartExpanded, l2End);
log.info("[CACHE-L2] Range [{}, {}): {} vessels", l2StartExpanded, l2End, l2Result.size());
}
// L1 범위: max(groupStart, currentHourStart) ~ groupEnd
if (groupEnd.isAfter(currentHourStart)) {
LocalDateTime l1Start = groupStart.isAfter(currentHourStart) ? groupStart : currentHourStart;
l1Result = fiveMinTrackCache.getTracksInRange(l1Start, groupEnd);
log.info("[CACHE-L1] Range [{}, {}): {} vessels", l1Start, groupEnd, l1Result.size());
}
// L1 + L2 결과 merge (MMSI 기준)
Map<String, List<VesselTrack>> merged = new LinkedHashMap<>(l2Result);
for (Map.Entry<String, List<VesselTrack>> l1Entry : l1Result.entrySet()) {
merged.merge(l1Entry.getKey(), l1Entry.getValue(), (existing, newTracks) -> {
List<VesselTrack> combined = new ArrayList<>(existing);
combined.addAll(newTracks);
combined.sort(Comparator.comparing(VesselTrack::getTimeBucket));
return combined;
});
}
if (merged.isEmpty()) {
log.info("[CACHE-{}] No data in cache for window {}", originalStrategy, groupKey);
continue;
}
// 뷰포트 필터
Map<String, List<VesselTrack>> viewportFiltered = filterByViewport(merged, request.getViewport());
// VesselTrack CompactVesselTrack 변환
List<CompactVesselTrack> compactTracks = vesselTrackToCompactConverter.convert(viewportFiltered);
// 간소화 적용
applySimplification(compactTracks, request, originalStrategy, ctx);
// VesselAccumulator에 병합
Map<String, VesselAccumulator> mergedMap = new HashMap<>(compactTracks.size());
mergeTracks(compactTracks, mergedMap);
if (!mergedMap.isEmpty()) {
List<CompactVesselTrack> mergedTracks = mergedMap.entrySet().stream()
.map(entry -> {
VesselAccumulator acc = entry.getValue();
String nationalCode = acc.mmsi != null && acc.mmsi.length() >= 3
? acc.mmsi.substring(0, 3) : null;
double avgSpeed = calculateAvgSpeed(acc.totalDistance, acc.timestamps);
return CompactVesselTrack.builder()
.vesselId(acc.mmsi)
.nationalCode(nationalCode)
.shipName(acc.shipName)
.shipType(acc.shipType)
.shipKindCode(acc.shipKindCode)
.geometry(acc.geometry)
.timestamps(acc.timestamps)
.speeds(acc.speeds)
.totalDistance(acc.totalDistance)
.avgSpeed(avgSpeed)
.maxSpeed(acc.maxSpeed)
.pointCount(acc.pointCount)
.build();
})
.collect(Collectors.toList());
uniqueVesselIds.addAll(mergedMap.keySet());
int totalPoints = mergedTracks.stream().mapToInt(CompactVesselTrack::getPointCount).sum();
log.info("[CACHE-{}] Time window {} - {} vessels, {} points",
originalStrategy, groupKey, mergedTracks.size(), totalPoints);
if (benchmark != null) {
benchmark.totalTracks += mergedTracks.size();
benchmark.totalPointsBefore += totalPoints;
benchmark.totalPointsAfter += totalPoints;
}
// 청크 분할 전송 (백프레셔 포함)
List<List<CompactVesselTrack>> batches = splitByMessageSize(mergedTracks, queryId);
for (List<CompactVesselTrack> batch : batches) {
if (isQueryCancelled(queryId, null)) return;
TrackChunkResponse response = new TrackChunkResponse();
response.setQueryId(queryId);
response.setChunkIndex(ctx.currentGlobalChunkIndex++);
response.setIsLastChunk(false);
response.setTotalChunks(-1);
response.setCompactTracks(batch);
String timeKey = groupKey + "_" + originalStrategy;
ctx.processedTimeRanges.put(timeKey, groupRanges.stream()
.mapToInt(r -> (int) Duration.between(r.getStart(), r.getEnd()).toMinutes())
.sum());
int currentProcessedMin = ctx.processedTimeRanges.values().stream()
.mapToInt(Integer::intValue).sum();
response.setStats(createChunkStats(batch, uniqueVesselIds, currentProcessedMin, ctx));
int chunkSize = batch.stream().mapToInt(this::estimateTrackSize).sum();
long currentBufferSize = pendingBufferSize.addAndGet(chunkSize);
if (bpMetrics != null) {
bpMetrics.totalBytes.addAndGet(chunkSize);
bpMetrics.chunkCount.incrementAndGet();
}
// 백프레셔 대기
int bpWait = 0;
while (pendingBufferSize.get() > MAX_PENDING_BUFFER) {
Thread.sleep(50);
bpWait++;
if (bpWait > 10 && bpMetrics != null && bpMetrics.dynamicChunkSizeKB > MIN_MESSAGE_SIZE_KB) {
bpMetrics.dynamicChunkSizeKB = Math.max(MIN_MESSAGE_SIZE_KB,
bpMetrics.dynamicChunkSizeKB - 512);
}
}
try {
chunkConsumer.accept(response);
} finally {
pendingBufferSize.addAndGet(-chunkSize);
}
double timeProgress = ctx.estimatedTotalMinutes > 0
? (double) currentProcessedMin / ctx.estimatedTotalMinutes * 100 : 0;
statusConsumer.accept(new QueryStatusUpdate(
queryId, "PROCESSING",
"Processing chunk " + ctx.currentGlobalChunkIndex,
Math.min(99.0, timeProgress)));
if (benchmark != null) benchmark.totalBatches++;
}
batches.clear();
mergedTracks.clear();
}
mergedMap.clear();
// benchmark 갱신
if (benchmark != null) {
if (originalStrategy == TableStrategy.HOURLY) {
benchmark.cacheHourlyRanges += groupRanges.size();
} else {
benchmark.cacheFiveMinRanges += groupRanges.size();
}
}
}
}
/**
* 캐시 데이터에서 뷰포트 교차 선박 필터링 JTS 파싱 없이 WKT 좌표 직접 파싱
*/
private Map<String, List<VesselTrack>> filterByViewport(
Map<String, List<VesselTrack>> tracksByMmsi, ViewportFilter viewport) {
if (viewport == null || !viewport.isValid() || tracksByMmsi.isEmpty()) {
return tracksByMmsi;
}
double minLon = viewport.getMinLon(), minLat = viewport.getMinLat();
double maxLon = viewport.getMaxLon(), maxLat = viewport.getMaxLat();
Map<String, List<VesselTrack>> filtered = new LinkedHashMap<>();
for (Map.Entry<String, List<VesselTrack>> entry : tracksByMmsi.entrySet()) {
boolean inViewport = false;
for (VesselTrack track : entry.getValue()) {
if (isTrackInViewport(track.getTrackGeom(), minLon, minLat, maxLon, maxLat)) {
inViewport = true;
break;
}
}
if (inViewport) {
filtered.put(entry.getKey(), entry.getValue());
}
}
return filtered;
}
/**
* WKT LineStringM 좌표 경량 파싱으로 뷰포트 bbox 교차 확인
*/
private boolean isTrackInViewport(String wkt, double minLon, double minLat, double maxLon, double maxLat) {
if (wkt == null || wkt.isEmpty()) return false;
int openParen = wkt.indexOf('(');
int closeParen = wkt.lastIndexOf(')');
if (openParen < 0 || closeParen <= openParen + 1) return false;
String coords = wkt.substring(openParen + 1, closeParen);
for (String point : coords.split(",")) {
String trimmed = point.trim();
int firstSpace = trimmed.indexOf(' ');
if (firstSpace <= 0) continue;
int secondSpace = trimmed.indexOf(' ', firstSpace + 1);
try {
double lon = Double.parseDouble(trimmed.substring(0, firstSpace));
double lat = Double.parseDouble(trimmed.substring(firstSpace + 1,
secondSpace > 0 ? secondSpace : trimmed.length()));
if (lon >= minLon && lon <= maxLon && lat >= minLat && lat <= maxLat) {
return true;
}
} catch (NumberFormatException e) {
// skip malformed coordinate
}
}
return false;
}
/**
* CompactVesselTrack 리스트를 VesselAccumulator 맵에 병합
*/
private void mergeTracks(List<CompactVesselTrack> tracks, Map<String, VesselAccumulator> mergedMap) {
for (CompactVesselTrack track : tracks) {
String vesselId = track.getVesselId();
VesselAccumulator accumulator = mergedMap.get(vesselId);
if (accumulator == null) {
accumulator = new VesselAccumulator();
accumulator.mmsi = vesselId;
accumulator.shipName = track.getShipName();
accumulator.shipType = track.getShipType();
accumulator.shipKindCode = track.getShipKindCode();
mergedMap.put(vesselId, accumulator);
}
accumulator.geometry.addAll(track.getGeometry());
accumulator.timestamps.addAll(track.getTimestamps());
accumulator.speeds.addAll(track.getSpeeds());
accumulator.totalDistance += track.getTotalDistance();
accumulator.maxSpeed = Math.max(accumulator.maxSpeed, track.getMaxSpeed());
accumulator.pointCount += track.getPointCount();
}
}
/**
* CompactVesselTrack 리스트에 전략/ 레벨별 간소화 적용
*/
private void applySimplification(List<CompactVesselTrack> tracks, TrackQueryRequest request,
TableStrategy strategy, QueryContext ctx) {
if (tracks == null || tracks.isEmpty()) return;
for (CompactVesselTrack track : tracks) {
if (track.getGeometry() == null || track.getGeometry().size() <= 1) continue;
int originalSize = track.getGeometry().size();
List<double[]> simplifiedGeometry = new ArrayList<>();
List<String> simplifiedTimestamps = new ArrayList<>();
List<Double> simplifiedSpeeds = new ArrayList<>();
double[] prevPoint = null;
LocalDateTime prevTime = null;
for (int i = 0; i < track.getGeometry().size(); i++) {
double[] point = track.getGeometry().get(i);
LocalDateTime currentTime = null;
String tsStr = track.getTimestamps().get(i);
try {
if (tsStr.matches("\\d{10,}")) {
currentTime = LocalDateTime.ofInstant(
java.time.Instant.ofEpochSecond(Long.parseLong(tsStr)),
java.time.ZoneId.systemDefault());
} else {
currentTime = LocalDateTime.parse(tsStr, TIMESTAMP_FORMATTER);
}
} catch (Exception e) {
currentTime = LocalDateTime.now();
}
boolean include = false;
if (i == 0 || i == track.getGeometry().size() - 1) {
include = true;
} else if (prevPoint != null && prevTime != null) {
double distance = calculateDistance(prevPoint[1], prevPoint[0], point[1], point[0]);
long minutesSincePrev = ChronoUnit.MINUTES.between(prevTime, currentTime);
if (strategy == TableStrategy.DAILY) {
include = distance > 0.05 || minutesSincePrev >= 60;
} else if (strategy == TableStrategy.HOURLY) {
include = distance > 1.08 || minutesSincePrev >= 60;
} else {
include = distance > 0.54 || minutesSincePrev >= 30;
}
if (track.getAvgSpeed() > 0 && track.getAvgSpeed() < 5.0) {
include = distance > 1.5 || minutesSincePrev >= 45;
}
}
if (include) {
simplifiedGeometry.add(point);
simplifiedTimestamps.add(tsStr);
simplifiedSpeeds.add(track.getSpeeds().size() > i ? track.getSpeeds().get(i) : 0.0);
prevPoint = point;
prevTime = currentTime;
}
}
// 레벨에 따른 추가 샘플링
if (request.getZoomLevel() != null && request.getZoomLevel() < 10 && simplifiedGeometry.size() > 2) {
int sampleRate = request.getZoomLevel() < 6 ? 10 :
request.getZoomLevel() < 8 ? 5 : 2;
List<double[]> sampledGeometry = new ArrayList<>();
List<String> sampledTimestamps = new ArrayList<>();
List<Double> sampledSpeeds = new ArrayList<>();
for (int i = 0; i < simplifiedGeometry.size(); i++) {
if (i % sampleRate == 0 || i == simplifiedGeometry.size() - 1) {
sampledGeometry.add(simplifiedGeometry.get(i));
sampledTimestamps.add(simplifiedTimestamps.get(i));
sampledSpeeds.add(simplifiedSpeeds.get(i));
}
}
track.setGeometry(sampledGeometry);
track.setTimestamps(sampledTimestamps);
track.setSpeeds(sampledSpeeds);
track.setPointCount(sampledGeometry.size());
} else {
track.setGeometry(simplifiedGeometry);
track.setTimestamps(simplifiedTimestamps);
track.setSpeeds(simplifiedSpeeds);
track.setPointCount(simplifiedGeometry.size());
}
if (ctx != null && ctx.vesselLogCount < 10 && originalSize > track.getPointCount()) {
double reductionRate = (1 - (double) track.getPointCount() / originalSize) * 100;
log.info("[CACHE-SIMPLIFY] Vessel {} simplified: {} -> {} points ({}% reduced, zoom: {})",
track.getVesselId(), originalSize, track.getPointCount(),
Math.round(reductionRate), request.getZoomLevel());
ctx.vesselLogCount++;
}
}
}
}