Merge pull request 'feat: WebSocket 리플레이 캐시 통합 + 쿼리 메트릭 사용자 ID 수집' (#115) from feature/ws-cache-integration-and-client-metrics into develop

This commit is contained in:
htlee 2026-03-27 06:36:30 +09:00
커밋 d31eeef193
9개의 파일이 변경됨: 539개 추가, 218개 삭제

파일 보기

@ -4,6 +4,14 @@
## [Unreleased]
### 추가
- WebSocket 리플레이 쿼리 L1/L2 캐시 통합 — HOURLY/5MIN 구간 DB 의존 제거, 당일 쿼리 100% 캐시
- 쿼리 메트릭 사용자 ID 수집 — GC_SESSION JWT에서 인증된 사용자 email 추출
- 대시보드 Top 클라이언트 IP/ID 토글 — groupBy 파라미터로 IP 기준 또는 사용자 ID 기준 전환
### 수정
- vessel info SQL 컬럼명 오류 수정 (ship_nm → name) — 선박 정보 조회 실패("bad SQL grammar") 해결
## [2026-03-19]
### 변경

파일 보기

@ -68,7 +68,7 @@ export const monitorApi = {
return fetchJson(`/api/monitoring/query-metrics/summary?hours=${hours}`)
},
getQueryMetricsTimeSeries(days = 7): Promise<QueryMetricsTimeSeries> {
return fetchJson(`/api/monitoring/query-metrics/timeseries?days=${days}`)
getQueryMetricsTimeSeries(days = 7, groupBy: 'ip' | 'id' = 'ip'): Promise<QueryMetricsTimeSeries> {
return fetchJson(`/api/monitoring/query-metrics/timeseries?days=${days}&groupBy=${groupBy}`)
},
}

파일 보기

@ -32,6 +32,7 @@ export default function Dashboard() {
const [running, setRunning] = useCachedState<RunningJob[]>('dash.running', [])
const [queryTs, setQueryTs] = useCachedState<QueryMetricsTimeSeries | null>('dash.queryTs', null)
const [days, setDays] = useState(7)
const [clientGroupBy, setClientGroupBy] = useState<'ip' | 'id'>('ip')
const [isQueryChartsOpen, setIsQueryChartsOpen] = useState(() =>
localStorage.getItem('dashboard-query-charts') !== 'collapsed',
)
@ -51,8 +52,8 @@ export default function Dashboard() {
monitorApi.getDelay().then(setDelay).catch(() => {})
batchApi.getDailyStats().then(setDaily).catch(() => {})
batchApi.getRunningJobs().then(setRunning).catch(() => {})
monitorApi.getQueryMetricsTimeSeries(days).then(setQueryTs).catch(() => {})
}, POLL_INTERVAL, [days])
monitorApi.getQueryMetricsTimeSeries(days, clientGroupBy).then(setQueryTs).catch(() => {})
}, POLL_INTERVAL, [days, clientGroupBy])
const memUsage = metrics
? Math.round((metrics.memory.used / metrics.memory.max) * 100)
@ -327,7 +328,21 @@ export default function Dashboard() {
{/* Top Clients */}
{queryTs.topClients.length > 0 && (
<div>
<div className="mb-2 text-sm font-medium text-muted">{t('dashboard.topClients')}</div>
<div className="mb-2 flex items-center gap-2">
<span className="text-sm font-medium text-muted">{t('dashboard.topClients')}</span>
<div className="flex overflow-hidden rounded-md border border-[var(--border-primary)] text-xs">
<button
type="button"
className={`px-2 py-0.5 ${clientGroupBy === 'ip' ? 'bg-[var(--accent-primary)] text-white' : 'text-[var(--text-secondary)]'}`}
onClick={() => setClientGroupBy('ip')}
>IP</button>
<button
type="button"
className={`px-2 py-0.5 ${clientGroupBy === 'id' ? 'bg-[var(--accent-primary)] text-white' : 'text-[var(--text-secondary)]'}`}
onClick={() => setClientGroupBy('id')}
>ID</button>
</div>
</div>
<div className="space-y-2">
{queryTs.topClients.map((c, i) => {
const maxCount = queryTs.topClients[0].query_count

파일 보기

@ -22,8 +22,10 @@ import org.springframework.http.server.ServletServerHttpRequest;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.servlet.http.Cookie;
import jakarta.servlet.http.HttpServletRequest;
import java.nio.charset.StandardCharsets;
import java.security.Principal;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -180,11 +182,18 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
String clientIp = extractClientIp(request);
attributes.put("CLIENT_IP", clientIp);
// User-Agent 추출
if (request instanceof ServletServerHttpRequest) {
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
// User-Agent 추출
String userAgent = servletRequest.getHeader("User-Agent");
attributes.put("USER_AGENT", userAgent);
// GC_SESSION 쿠키에서 JWT email 추출 (guide 서비스 인증)
String clientId = extractEmailFromJwtCookie(servletRequest);
if (clientId != null) {
attributes.put("CLIENT_ID", clientId);
}
}
return true;
@ -225,5 +234,43 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
// ServletServerHttpRequest가 아닌 경우 기본값
return "unknown";
}
/**
 * Extracts the {@code email} claim from the JWT carried in the GC_SESSION cookie
 * (guide-service authentication).
 *
 * <p>The JWT signature is NOT verified here — verification has already been
 * performed upstream by the nginx auth_request; this method only Base64URL-decodes
 * the payload segment and scans it for the email claim.
 *
 * @param request servlet request whose cookies are inspected
 * @return the email claim value, or {@code null} when the cookie or claim is
 *         absent or the token is malformed (never throws)
 */
private String extractEmailFromJwtCookie(HttpServletRequest request) {
    Cookie[] cookies = request.getCookies();
    if (cookies == null) return null;
    String token = null;
    for (Cookie cookie : cookies) {
        if ("GC_SESSION".equals(cookie.getName())) {
            token = cookie.getValue();
            break;
        }
    }
    if (token == null || token.isEmpty()) return null;
    try {
        // JWT layout: header.payload.signature — only the payload is decoded.
        String[] parts = token.split("\\.");
        if (parts.length < 2) return null;
        // RFC 7519 mandates UTF-8 for the JSON payload; do not rely on the
        // platform default charset (fixes mojibake for non-ASCII emails).
        String payload = new String(Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8);
        // Lightweight "email":"value" scan (intentionally no Jackson dependency here).
        int emailIdx = payload.indexOf("\"email\"");
        if (emailIdx < 0) return null;
        int colonIdx = payload.indexOf(':', emailIdx);
        // Guard: without this, a malformed payload would make indexOf('"', 0)
        // scan from the start of the payload and extract an unrelated value.
        if (colonIdx < 0) return null;
        int quoteStart = payload.indexOf('"', colonIdx + 1);
        int quoteEnd = quoteStart < 0 ? -1 : payload.indexOf('"', quoteStart + 1);
        if (quoteStart < 0 || quoteEnd < 0) return null;
        return payload.substring(quoteStart + 1, quoteEnd);
    } catch (Exception e) {
        // Malformed Base64/claim — treat as "no authenticated user" rather
        // than failing the WebSocket handshake.
        return null;
    }
}
}
}

파일 보기

@ -71,11 +71,17 @@ public class StompTrackController {
}
};
// 세션 속성에서 CLIENT_IP 추출
// 세션 속성에서 CLIENT_IP, CLIENT_ID 추출
String clientIp = null;
String clientId = null;
Map<String, Object> sessionAttrs = headerAccessor.getSessionAttributes();
if (sessionAttrs != null && sessionAttrs.containsKey("CLIENT_IP")) {
clientIp = (String) sessionAttrs.get("CLIENT_IP");
if (sessionAttrs != null) {
if (sessionAttrs.containsKey("CLIENT_IP")) {
clientIp = (String) sessionAttrs.get("CLIENT_IP");
}
if (sessionAttrs.containsKey("CLIENT_ID")) {
clientId = (String) sessionAttrs.get("CLIENT_ID");
}
}
// 비동기 스트리밍 시작 - 청크 모드 체크
@ -86,7 +92,8 @@ public class StompTrackController {
sessionId,
chunk -> sendChunkedDataToUser(userId, chunk),
statusCallback,
clientIp
clientIp,
clientId
);
} else {
trackStreamingService.streamTracks(

파일 보기

@ -1,7 +1,11 @@
package gc.mda.signal_batch.global.websocket.service;
import gc.mda.signal_batch.batch.reader.FiveMinTrackCache;
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import gc.mda.signal_batch.global.exception.MemoryBudgetExceededException;
import gc.mda.signal_batch.global.util.TrackMemoryEstimator;
import gc.mda.signal_batch.global.util.VesselTrackToCompactConverter;
import gc.mda.signal_batch.global.websocket.dto.TrackChunkResponse;
import gc.mda.signal_batch.global.websocket.interceptor.TrackQueryInterceptor;
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
@ -55,6 +59,9 @@ public class ChunkedTrackStreamingService {
private final CacheTrackSimplifier cacheTrackSimplifier;
private final TrackMemoryBudgetManager memoryBudgetManager;
private final QueryMetricsBufferService queryMetricsBufferService;
private final HourlyTrackCache hourlyTrackCache;
private final FiveMinTrackCache fiveMinTrackCache;
private final VesselTrackToCompactConverter vesselTrackToCompactConverter;
private static final ThreadLocal<WKTReader> wktReaderLocal = ThreadLocal.withInitial(WKTReader::new);
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final int MAX_TRACKS_PER_CHUNK = 500000; // 청크당 최대 트랙 (10만 선박 지원)
@ -105,7 +112,10 @@ public class ChunkedTrackStreamingService {
DailyTrackCacheManager dailyTrackCacheManager,
CacheTrackSimplifier cacheTrackSimplifier,
TrackMemoryBudgetManager memoryBudgetManager,
QueryMetricsBufferService queryMetricsBufferService) {
QueryMetricsBufferService queryMetricsBufferService,
HourlyTrackCache hourlyTrackCache,
FiveMinTrackCache fiveMinTrackCache,
VesselTrackToCompactConverter vesselTrackToCompactConverter) {
this.queryJdbcTemplate = queryJdbcTemplate;
this.queryDataSource = queryDataSource;
this.activeQueryManager = activeQueryManager;
@ -114,6 +124,9 @@ public class ChunkedTrackStreamingService {
this.cacheTrackSimplifier = cacheTrackSimplifier;
this.memoryBudgetManager = memoryBudgetManager;
this.queryMetricsBufferService = queryMetricsBufferService;
this.hourlyTrackCache = hourlyTrackCache;
this.fiveMinTrackCache = fiveMinTrackCache;
this.vesselTrackToCompactConverter = vesselTrackToCompactConverter;
}
/**
@ -153,6 +166,8 @@ public class ChunkedTrackStreamingService {
private static class QueryBenchmark {
int cacheHitDays = 0;
int dbQueryDays = 0;
int cacheHourlyRanges = 0; // L2 캐시 그룹
int cacheFiveMinRanges = 0; // L1 캐시 그룹
int totalTracks = 0;
int totalPointsBefore = 0;
int totalPointsAfter = 0;
@ -164,13 +179,15 @@ public class ChunkedTrackStreamingService {
int connViewportPass1 = 0; // collectViewportVesselIds DB 쿼리
int connDailyPages = 0; // streamDailyTableWithPagination 페이지
int connVesselInfo = 0; // preloadVesselInfoWithSessionCache 배치
int connHourly5min = 0; // processTableRange (5min/hourly)
int connHourly5min = 0; // processTableRange (5min/hourly) 기존 DB 경로 잔류용
int connTableCheck = 0; // hasDataInTable 존재 검증
Integer zoomLevel;
String determinePath() {
if (cacheHitDays > 0 && dbQueryDays > 0) return "HYBRID";
if (cacheHitDays > 0) return "CACHE";
boolean anyCache = cacheHitDays > 0 || cacheHourlyRanges > 0 || cacheFiveMinRanges > 0;
boolean anyDb = dbQueryDays > 0;
if (anyCache && anyDb) return "HYBRID";
if (anyCache) return "CACHE";
return "DB";
}
@ -182,6 +199,7 @@ public class ChunkedTrackStreamingService {
return String.format(
"{\"queryId\":\"%s\",\"timestamp\":\"%s\",\"path\":\"%s\"," +
"\"zoomLevel\":%s,\"dateRanges\":%d,\"cacheHitDays\":%d,\"dbQueryDays\":%d," +
"\"cacheHourlyRanges\":%d,\"cacheFiveMinRanges\":%d," +
"\"totalTracks\":%d,\"totalPointsBefore\":%d,\"totalPointsAfter\":%d," +
"\"pointReductionPct\":%d,\"totalBatches\":%d,\"batchesBeforeSimplify\":%d," +
"\"simplifyTimeMs\":%d,\"dbQueryTimeMs\":%d,\"totalElapsedMs\":%d," +
@ -192,8 +210,9 @@ public class ChunkedTrackStreamingService {
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
determinePath(),
zoomLevel != null ? zoomLevel.toString() : "null",
cacheHitDays + dbQueryDays,
cacheHitDays + dbQueryDays + cacheHourlyRanges + cacheFiveMinRanges,
cacheHitDays, dbQueryDays,
cacheHourlyRanges, cacheFiveMinRanges,
totalTracks, totalPointsBefore, totalPointsAfter,
totalPointsBefore > 0 ? Math.round((1 - (double) totalPointsAfter / totalPointsBefore) * 100) : 0,
totalBatches, batchesBeforeSimplify,
@ -238,12 +257,12 @@ public class ChunkedTrackStreamingService {
// DB에서 조회 (signal_kind_code는 캐시 저장 시 치환된 값)
try {
String sql = "SELECT ship_nm, vessel_type, signal_kind_code FROM signal.t_ais_position " +
String sql = "SELECT name, vessel_type, signal_kind_code FROM signal.t_ais_position " +
"WHERE mmsi = ? LIMIT 1";
VesselInfo info = queryJdbcTemplate.queryForObject(sql,
(rs, rowNum) -> new VesselInfo(
rs.getString("ship_nm"),
rs.getString("name"),
rs.getString("vessel_type"),
rs.getString("signal_kind_code")
),
@ -320,7 +339,7 @@ public class ChunkedTrackStreamingService {
.map(id -> "?")
.collect(Collectors.joining(","));
String sql = "SELECT mmsi, ship_nm, vessel_type, signal_kind_code " +
String sql = "SELECT mmsi, name, vessel_type, signal_kind_code " +
"FROM signal.t_ais_position " +
"WHERE mmsi IN (" + placeholders + ")";
@ -329,7 +348,7 @@ public class ChunkedTrackStreamingService {
queryJdbcTemplate.query(sql, rs -> {
String visselId = rs.getString("mmsi");
VesselInfo info = new VesselInfo(
rs.getString("ship_nm"),
rs.getString("name"),
rs.getString("vessel_type"),
rs.getString("signal_kind_code")
);
@ -389,7 +408,7 @@ public class ChunkedTrackStreamingService {
.map(id -> "?")
.collect(Collectors.joining(","));
String sql = "SELECT mmsi, ship_nm, vessel_type, signal_kind_code " +
String sql = "SELECT mmsi, name, vessel_type, signal_kind_code " +
"FROM signal.t_ais_position " +
"WHERE mmsi IN (" + placeholders + ")";
@ -398,7 +417,7 @@ public class ChunkedTrackStreamingService {
queryJdbcTemplate.query(sql, rs -> {
String vesselId = rs.getString("mmsi");
VesselInfo info = new VesselInfo(
rs.getString("ship_nm"),
rs.getString("name"),
rs.getString("vessel_type"),
rs.getString("signal_kind_code")
);
@ -447,6 +466,12 @@ public class ChunkedTrackStreamingService {
for (Map.Entry<TableStrategy, List<TimeRange>> entry : strategyMap.entrySet()) {
TableStrategy strategy = entry.getKey();
// HOURLY/FIVE_MINUTE: 캐시에서 직접 뷰포트 필터를 수행하므로 Pass 1 DB 쿼리 불필요
if (strategy == TableStrategy.HOURLY || strategy == TableStrategy.FIVE_MINUTE) {
continue;
}
String tableName = strategy.getTableName();
for (TimeRange range : entry.getValue()) {
@ -725,7 +750,8 @@ public class ChunkedTrackStreamingService {
String sessionId,
Consumer<TrackChunkResponse> chunkConsumer,
Consumer<QueryStatusUpdate> statusConsumer,
String clientIp) {
String clientIp,
String clientId) {
boolean slotAcquired = false;
QueryBenchmark benchmark = null;
QueryContext ctx = null;
@ -867,193 +893,11 @@ public class ChunkedTrackStreamingService {
globalChunkIndex, uniqueVesselIds, viewportVesselIds, benchmark, ctx);
globalChunkIndex = ctx.getCurrentChunkIndex();
} else {
// Hourly/5min은 6시간 단위로 그룹화하여 처리
Map<String, List<TimeRange>> timeGroups = groupRangesByTimeWindow(ranges, 6);
for (Map.Entry<String, List<TimeRange>> groupEntry : timeGroups.entrySet()) {
String groupKey = groupEntry.getKey();
List<TimeRange> groupRanges = groupEntry.getValue();
log.info("[{}] Processing time window {} with {} ranges", strategy, groupKey, groupRanges.size());
// 시간 그룹 데이터를 병합
Map<String, VesselAccumulator> mergedMap = new HashMap<>(20000);
LocalDateTime baseTime = null;
for (TimeRange range : groupRanges) {
try {
// 범위의 시작 시간을 기준으로 설정
if (baseTime == null) {
baseTime = range.getStart();
}
List<CompactVesselTrack> compactTracks = processTableRangeWithBaseTime(
request, strategy, range, baseTime, viewportVesselIds, queryId, ctx);
if (benchmark != null) benchmark.connHourly5min++; // [BENCHMARK]
// 선박별로 병합
for (CompactVesselTrack track : compactTracks) {
String vesselId = track.getVesselId();
VesselAccumulator accumulator = mergedMap.get(vesselId);
if (accumulator == null) {
accumulator = new VesselAccumulator();
accumulator.mmsi = track.getVesselId();
VesselInfo vesselInfo = getVesselInfo(track.getVesselId());
accumulator.shipName = vesselInfo.shipName;
accumulator.shipType = vesselInfo.shipType;
accumulator.shipKindCode = vesselInfo.signalKindCode;
mergedMap.put(vesselId, accumulator);
}
// 데이터 병합
accumulator.geometry.addAll(track.getGeometry());
accumulator.timestamps.addAll(track.getTimestamps());
accumulator.speeds.addAll(track.getSpeeds());
accumulator.totalDistance += track.getTotalDistance();
accumulator.maxSpeed = Math.max(accumulator.maxSpeed, track.getMaxSpeed());
accumulator.pointCount += track.getPointCount();
}
} catch (Exception e) {
log.error("Error processing {} range {}: {}", strategy, range, e.getMessage());
}
}
// 병합된 데이터를 청크로 분할하여 전송
if (!mergedMap.isEmpty()) {
List<CompactVesselTrack> mergedTracks = mergedMap.entrySet().stream()
.map(entry -> {
String vesselId = entry.getKey();
VesselAccumulator acc = entry.getValue();
double avgSpeed = calculateAvgSpeed(acc.totalDistance, acc.timestamps);
return CompactVesselTrack.builder()
.vesselId(vesselId)
.nationalCode(acc.mmsi != null && acc.mmsi.length() >= 3 ? acc.mmsi.substring(0, 3) : null)
.shipName(acc.shipName)
.shipType(acc.shipType)
.shipKindCode(acc.shipKindCode)
.geometry(acc.geometry)
.timestamps(acc.timestamps)
.speeds(acc.speeds)
.totalDistance(acc.totalDistance)
.avgSpeed(avgSpeed)
.maxSpeed(acc.maxSpeed)
.pointCount(acc.pointCount)
.build();
})
.collect(Collectors.toList());
// 전체 포인트 통계 계산
int totalOriginalPoints = mergedTracks.stream()
.mapToInt(t -> t.getPointCount())
.sum();
log.info("[{}] Time window {} - Merged {} vessels, Total {} points",
strategy, groupKey, mergedTracks.size(), totalOriginalPoints);
List<List<CompactVesselTrack>> batches = splitByMessageSize(mergedTracks, queryId);
for (List<CompactVesselTrack> batch : batches) {
TrackChunkResponse response = new TrackChunkResponse();
response.setQueryId(queryId);
response.setChunkIndex(globalChunkIndex++);
response.setIsLastChunk(false);
response.setTotalChunks(-1); // 마지막 청크에서 설정
response.setCompactTracks(batch);
// 처리된 시간 추가
String timeKey = groupKey + "_" + strategy;
ctx.processedTimeRanges.put(timeKey, groupRanges.stream()
.mapToInt(r -> (int)Duration.between(r.getStart(), r.getEnd()).toMinutes())
.sum());
int currentProcessedMin = ctx.processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum();
response.setStats(createChunkStats(batch, uniqueVesselIds, currentProcessedMin, ctx));
// 버퍼 크기 계산 추가
int chunkSize = batch.stream().mapToInt(t -> estimateTrackSize(t)).sum();
long currentBufferSize = pendingBufferSize.addAndGet(chunkSize);
metrics.totalBytes.addAndGet(chunkSize);
metrics.chunkCount.incrementAndGet();
// 버퍼 사용률 로그
double bufferUsage = (double) currentBufferSize / MAX_PENDING_BUFFER * 100;
if (bufferUsage > 80 && System.currentTimeMillis() - metrics.lastWarningTime > 5000) {
metrics.bufferWarnings.incrementAndGet();
metrics.lastWarningTime = System.currentTimeMillis();
log.warn("[BACKPRESSURE] Query {} - Buffer usage high: {}% ({} MB / {} MB)",
queryId, String.format("%.1f", bufferUsage), currentBufferSize / (1024 * 1024),
MAX_PENDING_BUFFER / (1024 * 1024));
}
// 버퍼가 가득 찬 경우 대기 및 동적 청크 크기 조절
int backpressureWaitCount = 0;
while (pendingBufferSize.get() > MAX_PENDING_BUFFER) {
if (backpressureWaitCount == 0) {
metrics.backpressureEvents.incrementAndGet();
log.warn("[BACKPRESSURE] Query {} - Buffer full! Waiting for buffer to drain. Current: {} MB",
queryId, pendingBufferSize.get() / (1024 * 1024));
}
Thread.sleep(50);
backpressureWaitCount++;
// 500ms 이상 대기 청크 크기 감소
if (backpressureWaitCount > 10 && metrics.dynamicChunkSizeKB > MIN_MESSAGE_SIZE_KB) {
int oldSize = metrics.dynamicChunkSizeKB;
metrics.dynamicChunkSizeKB = Math.max(MIN_MESSAGE_SIZE_KB,
metrics.dynamicChunkSizeKB - 512);
log.info("[BACKPRESSURE] Query {} - Reducing chunk size: {} KB -> {} KB",
queryId, oldSize, metrics.dynamicChunkSizeKB);
}
}
if (backpressureWaitCount > 0) {
log.info("[BACKPRESSURE] Query {} - Buffer drained after {} ms wait",
queryId, backpressureWaitCount * 50);
}
// 전송 버퍼 추적 (전송 완료 즉시 감소)
try {
chunkConsumer.accept(response);
} finally {
pendingBufferSize.addAndGet(-chunkSize);
}
// 유니크 선박 카운트
batch.forEach(track -> uniqueVesselIds.add(track.getVesselId()));
// 진행률 업데이트 (시간 기반만 사용)
double timeProgress = (double) currentProcessedMin / ctx.estimatedTotalMinutes * 100;
statusConsumer.accept(new QueryStatusUpdate(
queryId,
"PROCESSING",
"Processing chunk " + globalChunkIndex,
Math.min(99.0, timeProgress)
));
// 버퍼 사용률에 따른 적응형 대기
double currentBufferUsage = (double) pendingBufferSize.get() / MAX_PENDING_BUFFER;
int waitTime;
if (currentBufferUsage > 0.8) waitTime = 200; // 80%: 강한 억제
else if (currentBufferUsage > 0.5) waitTime = 100; // 50%: 중간 억제
else if (currentBufferUsage > 0.3) waitTime = 50; // 30%: 약한 억제
else waitTime = 10; // 정상: 최소 지연
Thread.sleep(waitTime);
// 진행 상황 로그 (매 10번째 청크마다)
if (globalChunkIndex % 10 == 0) {
log.info("Progress: chunk {}, vessels: {}, time progress: {}%",
globalChunkIndex, uniqueVesselIds.size(),
Math.round(timeProgress));
}
}
batches.clear(); // 메모리 즉시 해제: 배치 분할 리스트
mergedTracks.clear(); // 메모리 즉시 해제: 병합 항적 리스트
}
mergedMap.clear(); // 메모리 즉시 해제: 선박 누적
}
// HOURLY/FIVE_MINUTE: L1/L2 캐시에서 직접 처리
processHourlyFiveMinWithCache(ranges, request, queryId,
chunkConsumer, statusConsumer, uniqueVesselIds,
strategy, benchmark, ctx, metrics);
globalChunkIndex = ctx.getCurrentChunkIndex();
}
}
@ -1145,7 +989,7 @@ public class ChunkedTrackStreamingService {
.viewportBounds(vpBounds)
.requestedMmsi(request.getVesselIds() != null ? request.getVesselIds().size() : 0)
.dataPath(benchmark.determinePath())
.cacheHitDays(benchmark.cacheHitDays)
.cacheHitDays(benchmark.cacheHitDays + benchmark.cacheHourlyRanges + benchmark.cacheFiveMinRanges)
.dbQueryDays(benchmark.dbQueryDays)
.dbConnTotal(benchmark.dbConnectionTotal())
.uniqueVessels(uniqueVesselIds.size())
@ -1160,6 +1004,7 @@ public class ChunkedTrackStreamingService {
.backpressureEvents(bpMetrics != null ? bpMetrics.backpressureEvents.get() : 0)
.status(queryStatus)
.clientIp(clientIp)
.clientId(clientId)
.build());
}
@ -2576,4 +2421,377 @@ public class ChunkedTrackStreamingService {
}
}
// ========== L1/L2 캐시 기반 HOURLY/FIVE_MINUTE 처리 ==========
/**
 * Serves HOURLY/FIVE_MINUTE time ranges directly from the in-memory L1/L2 caches
 * instead of the DB.
 *
 * Split rule, relative to {@code currentHourStart} (top of the current hour):
 * timestamps at or after the current hour are read from the L1 (5-minute) cache,
 * earlier timestamps from the L2 (hourly) cache.
 *
 * There is deliberately NO DB fallback: the batch job writes DB and cache
 * together, so a cache miss means the data does not exist at all.
 *
 * Per 6-hour window the flow is: fetch L1/L2 → merge per MMSI → viewport filter
 * → convert to compact tracks → simplify → accumulate per vessel → chunk and
 * stream with the same backpressure handling as the DB path.
 */
private void processHourlyFiveMinWithCache(
        List<TimeRange> ranges, TrackQueryRequest request, String queryId,
        Consumer<TrackChunkResponse> chunkConsumer,
        Consumer<QueryStatusUpdate> statusConsumer,
        Set<String> uniqueVesselIds,
        TableStrategy originalStrategy, QueryBenchmark benchmark,
        QueryContext ctx, BackpressureMetrics bpMetrics) throws Exception {
    // Boundary between the L2 (hourly, past) and L1 (5-min, current hour) caches.
    LocalDateTime currentHourStart = LocalDateTime.now().withMinute(0).withSecond(0).withNano(0);
    // Group requested ranges into 6-hour processing windows (same windowing as the old DB path).
    Map<String, List<TimeRange>> timeGroups = groupRangesByTimeWindow(ranges, 6);
    for (Map.Entry<String, List<TimeRange>> groupEntry : timeGroups.entrySet()) {
        String groupKey = groupEntry.getKey();
        List<TimeRange> groupRanges = groupEntry.getValue();
        // Bail out early if the client cancelled the query.
        if (isQueryCancelled(queryId, null)) return;
        log.info("[CACHE-{}] Processing time window {} with {} ranges", originalStrategy, groupKey, groupRanges.size());
        // NOTE(review): assumes groupRanges is time-ordered so first/last bound the
        // window — confirm groupRangesByTimeWindow preserves ordering.
        LocalDateTime groupStart = groupRanges.get(0).getStart();
        LocalDateTime groupEnd = groupRanges.get(groupRanges.size() - 1).getEnd();
        // Split the window at currentHourStart into L2 (before) and L1 (after) lookups.
        Map<String, List<VesselTrack>> l2Result = Collections.emptyMap();
        Map<String, List<VesselTrack>> l1Result = Collections.emptyMap();
        // L2 range: groupStart .. min(groupEnd, currentHourStart)
        if (groupStart.isBefore(currentHourStart)) {
            LocalDateTime l2End = groupEnd.isBefore(currentHourStart) ? groupEnd : currentHourStart;
            // Expand the start down to the hour boundary so whole hourly buckets are read.
            LocalDateTime l2StartExpanded = groupStart.withMinute(0).withSecond(0).withNano(0);
            l2Result = hourlyTrackCache.getTracksInRange(l2StartExpanded, l2End);
            log.info("[CACHE-L2] Range [{}, {}): {} vessels", l2StartExpanded, l2End, l2Result.size());
        }
        // L1 range: max(groupStart, currentHourStart) .. groupEnd
        if (groupEnd.isAfter(currentHourStart)) {
            LocalDateTime l1Start = groupStart.isAfter(currentHourStart) ? groupStart : currentHourStart;
            l1Result = fiveMinTrackCache.getTracksInRange(l1Start, groupEnd);
            log.info("[CACHE-L1] Range [{}, {}): {} vessels", l1Start, groupEnd, l1Result.size());
        }
        // Merge L1 into L2 keyed by MMSI; per-vessel lists are re-sorted by time bucket.
        Map<String, List<VesselTrack>> merged = new LinkedHashMap<>(l2Result);
        for (Map.Entry<String, List<VesselTrack>> l1Entry : l1Result.entrySet()) {
            merged.merge(l1Entry.getKey(), l1Entry.getValue(), (existing, newTracks) -> {
                List<VesselTrack> combined = new ArrayList<>(existing);
                combined.addAll(newTracks);
                combined.sort(Comparator.comparing(VesselTrack::getTimeBucket));
                return combined;
            });
        }
        if (merged.isEmpty()) {
            // Empty cache window = no data exists for it (no DB fallback by design).
            log.info("[CACHE-{}] No data in cache for window {}", originalStrategy, groupKey);
            continue;
        }
        // Viewport filter (lightweight WKT scan, no Pass-1 DB query needed).
        Map<String, List<VesselTrack>> viewportFiltered = filterByViewport(merged, request.getViewport());
        // VesselTrack -> CompactVesselTrack conversion.
        List<CompactVesselTrack> compactTracks = vesselTrackToCompactConverter.convert(viewportFiltered);
        // In-place point simplification per strategy/zoom level.
        applySimplification(compactTracks, request, originalStrategy, ctx);
        // Accumulate per vessel (geometry/timestamps/speeds/stats).
        Map<String, VesselAccumulator> mergedMap = new HashMap<>(compactTracks.size());
        mergeTracks(compactTracks, mergedMap);
        if (!mergedMap.isEmpty()) {
            // Materialize one CompactVesselTrack per vessel from its accumulator.
            List<CompactVesselTrack> mergedTracks = mergedMap.entrySet().stream()
                    .map(entry -> {
                        VesselAccumulator acc = entry.getValue();
                        // NOTE(review): first 3 MMSI digits used as national code — assumes MID prefix.
                        String nationalCode = acc.mmsi != null && acc.mmsi.length() >= 3
                                ? acc.mmsi.substring(0, 3) : null;
                        double avgSpeed = calculateAvgSpeed(acc.totalDistance, acc.timestamps);
                        return CompactVesselTrack.builder()
                                .vesselId(acc.mmsi)
                                .nationalCode(nationalCode)
                                .shipName(acc.shipName)
                                .shipType(acc.shipType)
                                .shipKindCode(acc.shipKindCode)
                                .geometry(acc.geometry)
                                .timestamps(acc.timestamps)
                                .speeds(acc.speeds)
                                .totalDistance(acc.totalDistance)
                                .avgSpeed(avgSpeed)
                                .maxSpeed(acc.maxSpeed)
                                .pointCount(acc.pointCount)
                                .build();
                    })
                    .collect(Collectors.toList());
            uniqueVesselIds.addAll(mergedMap.keySet());
            int totalPoints = mergedTracks.stream().mapToInt(CompactVesselTrack::getPointCount).sum();
            log.info("[CACHE-{}] Time window {} - {} vessels, {} points",
                    originalStrategy, groupKey, mergedTracks.size(), totalPoints);
            if (benchmark != null) {
                benchmark.totalTracks += mergedTracks.size();
                // Points here are post-simplification, so before == after by construction.
                benchmark.totalPointsBefore += totalPoints;
                benchmark.totalPointsAfter += totalPoints;
            }
            // Split into WebSocket-sized chunks and stream with backpressure.
            List<List<CompactVesselTrack>> batches = splitByMessageSize(mergedTracks, queryId);
            for (List<CompactVesselTrack> batch : batches) {
                if (isQueryCancelled(queryId, null)) return;
                TrackChunkResponse response = new TrackChunkResponse();
                response.setQueryId(queryId);
                response.setChunkIndex(ctx.currentGlobalChunkIndex++);
                response.setIsLastChunk(false);
                response.setTotalChunks(-1);
                response.setCompactTracks(batch);
                // Record processed minutes for this window (keyed per window+strategy)
                // so time-based progress can be computed.
                String timeKey = groupKey + "_" + originalStrategy;
                ctx.processedTimeRanges.put(timeKey, groupRanges.stream()
                        .mapToInt(r -> (int) Duration.between(r.getStart(), r.getEnd()).toMinutes())
                        .sum());
                int currentProcessedMin = ctx.processedTimeRanges.values().stream()
                        .mapToInt(Integer::intValue).sum();
                response.setStats(createChunkStats(batch, uniqueVesselIds, currentProcessedMin, ctx));
                // Track in-flight bytes for backpressure accounting.
                int chunkSize = batch.stream().mapToInt(this::estimateTrackSize).sum();
                long currentBufferSize = pendingBufferSize.addAndGet(chunkSize);
                if (bpMetrics != null) {
                    bpMetrics.totalBytes.addAndGet(chunkSize);
                    bpMetrics.chunkCount.incrementAndGet();
                }
                // Backpressure wait: block while the send buffer is over budget;
                // after ~500 ms shrink the dynamic chunk size to drain faster.
                int bpWait = 0;
                while (pendingBufferSize.get() > MAX_PENDING_BUFFER) {
                    Thread.sleep(50);
                    bpWait++;
                    if (bpWait > 10 && bpMetrics != null && bpMetrics.dynamicChunkSizeKB > MIN_MESSAGE_SIZE_KB) {
                        bpMetrics.dynamicChunkSizeKB = Math.max(MIN_MESSAGE_SIZE_KB,
                                bpMetrics.dynamicChunkSizeKB - 512);
                    }
                }
                // Release the buffer reservation as soon as the send completes (or fails).
                try {
                    chunkConsumer.accept(response);
                } finally {
                    pendingBufferSize.addAndGet(-chunkSize);
                }
                // Time-based progress, capped at 99% until the final chunk is sent.
                double timeProgress = ctx.estimatedTotalMinutes > 0
                        ? (double) currentProcessedMin / ctx.estimatedTotalMinutes * 100 : 0;
                statusConsumer.accept(new QueryStatusUpdate(
                        queryId, "PROCESSING",
                        "Processing chunk " + ctx.currentGlobalChunkIndex,
                        Math.min(99.0, timeProgress)));
                if (benchmark != null) benchmark.totalBatches++;
            }
            // Release intermediate lists eagerly — these can be large.
            batches.clear();
            mergedTracks.clear();
        }
        mergedMap.clear();
        // Benchmark bookkeeping: count cache-served ranges per cache tier.
        if (benchmark != null) {
            if (originalStrategy == TableStrategy.HOURLY) {
                benchmark.cacheHourlyRanges += groupRanges.size();
            } else {
                benchmark.cacheFiveMinRanges += groupRanges.size();
            }
        }
    }
}
/**
 * Restricts cached tracks to vessels whose geometry touches the viewport bbox.
 * Uses a lightweight WKT coordinate scan (no JTS parsing). Returns the input
 * map unchanged when no valid viewport is supplied or the input is empty.
 */
private Map<String, List<VesselTrack>> filterByViewport(
        Map<String, List<VesselTrack>> tracksByMmsi, ViewportFilter viewport) {
    if (viewport == null || !viewport.isValid() || tracksByMmsi.isEmpty()) {
        return tracksByMmsi;
    }
    final double loLon = viewport.getMinLon();
    final double loLat = viewport.getMinLat();
    final double hiLon = viewport.getMaxLon();
    final double hiLat = viewport.getMaxLat();
    // Preserve input ordering of vessels that pass the filter.
    Map<String, List<VesselTrack>> visible = new LinkedHashMap<>();
    for (Map.Entry<String, List<VesselTrack>> vessel : tracksByMmsi.entrySet()) {
        List<VesselTrack> tracks = vessel.getValue();
        // A vessel is kept as soon as any one of its tracks intersects the bbox.
        boolean intersects = tracks.stream()
                .anyMatch(t -> isTrackInViewport(t.getTrackGeom(), loLon, loLat, hiLon, hiLat));
        if (intersects) {
            visible.put(vessel.getKey(), tracks);
        }
    }
    return visible;
}
/**
 * Lightweight viewport test for a WKT LineString(M): returns true when any
 * vertex of the coordinate list falls inside the bbox. Only vertices are
 * tested — a segment crossing the bbox with no vertex inside is not detected
 * (intentional trade-off of the lightweight parse). Malformed tokens are skipped.
 */
private boolean isTrackInViewport(String wkt, double minLon, double minLat, double maxLon, double maxLat) {
    if (wkt == null || wkt.isEmpty()) return false;
    int open = wkt.indexOf('(');
    int close = wkt.lastIndexOf(')');
    if (open < 0 || close <= open + 1) return false;
    // Coordinate list between the outermost parens: "lon lat [m], lon lat [m], ..."
    for (String vertex : wkt.substring(open + 1, close).split(",")) {
        String token = vertex.trim();
        int sep1 = token.indexOf(' ');
        if (sep1 <= 0) continue;
        int sep2 = token.indexOf(' ', sep1 + 1);
        // Second value ends at the next space (extra M/Z ordinates ignored) or at end-of-token.
        String lonText = token.substring(0, sep1);
        String latText = sep2 > 0 ? token.substring(sep1 + 1, sep2) : token.substring(sep1 + 1);
        try {
            double lon = Double.parseDouble(lonText);
            double lat = Double.parseDouble(latText);
            if (lon >= minLon && lon <= maxLon && lat >= minLat && lat <= maxLat) {
                return true;
            }
        } catch (NumberFormatException ignored) {
            // Skip malformed coordinate pairs rather than failing the whole track.
        }
    }
    return false;
}
/**
 * Folds a list of CompactVesselTrack into the per-vessel accumulator map.
 * A fresh accumulator (seeded with the first track's ship metadata) is created
 * on first sight of a vessel; geometry, timestamps, speeds and stats are then
 * appended/aggregated for every track of that vessel.
 */
private void mergeTracks(List<CompactVesselTrack> tracks, Map<String, VesselAccumulator> mergedMap) {
    for (CompactVesselTrack track : tracks) {
        // Create-on-first-sight, seeded with this track's ship metadata.
        VesselAccumulator acc = mergedMap.computeIfAbsent(track.getVesselId(), id -> {
            VesselAccumulator created = new VesselAccumulator();
            created.mmsi = id;
            created.shipName = track.getShipName();
            created.shipType = track.getShipType();
            created.shipKindCode = track.getShipKindCode();
            return created;
        });
        // Append geometry streams and aggregate scalar stats.
        acc.geometry.addAll(track.getGeometry());
        acc.timestamps.addAll(track.getTimestamps());
        acc.speeds.addAll(track.getSpeeds());
        acc.totalDistance += track.getTotalDistance();
        acc.maxSpeed = Math.max(acc.maxSpeed, track.getMaxSpeed());
        acc.pointCount += track.getPointCount();
    }
}
/**
 * Applies in-place, per-strategy and per-zoom point simplification to compact tracks.
 *
 * Two stages per track:
 *  1) threshold filter — a point is kept if it is the first/last point, or if it
 *     moved far enough / enough time elapsed since the previously kept point
 *     (thresholds depend on the table strategy; slow vessels use a stricter rule);
 *  2) zoom sampling — at zoom < 10, every Nth surviving point is kept (N grows as
 *     the map zooms out), always including the final point.
 */
private void applySimplification(List<CompactVesselTrack> tracks, TrackQueryRequest request,
        TableStrategy strategy, QueryContext ctx) {
    if (tracks == null || tracks.isEmpty()) return;
    for (CompactVesselTrack track : tracks) {
        // Nothing to simplify for empty or single-point geometries.
        if (track.getGeometry() == null || track.getGeometry().size() <= 1) continue;
        int originalSize = track.getGeometry().size();
        List<double[]> simplifiedGeometry = new ArrayList<>();
        List<String> simplifiedTimestamps = new ArrayList<>();
        List<Double> simplifiedSpeeds = new ArrayList<>();
        // Last KEPT point/time — thresholds are measured against these, not the previous raw point.
        double[] prevPoint = null;
        LocalDateTime prevTime = null;
        for (int i = 0; i < track.getGeometry().size(); i++) {
            double[] point = track.getGeometry().get(i);
            LocalDateTime currentTime = null;
            String tsStr = track.getTimestamps().get(i);
            try {
                // Timestamps arrive either as epoch seconds (10+ digits) or formatted strings.
                if (tsStr.matches("\\d{10,}")) {
                    currentTime = LocalDateTime.ofInstant(
                            java.time.Instant.ofEpochSecond(Long.parseLong(tsStr)),
                            java.time.ZoneId.systemDefault());
                } else {
                    currentTime = LocalDateTime.parse(tsStr, TIMESTAMP_FORMATTER);
                }
            } catch (Exception e) {
                // NOTE(review): falling back to now() on an unparseable timestamp skews the
                // time-gap test for that point — confirm this is acceptable for cache data.
                currentTime = LocalDateTime.now();
            }
            boolean include = false;
            if (i == 0 || i == track.getGeometry().size() - 1) {
                // Always keep track endpoints.
                include = true;
            } else if (prevPoint != null && prevTime != null) {
                // point[] is [lon, lat]; calculateDistance takes (lat, lon) pairs.
                // NOTE(review): distance unit (km vs nm) not visible here — thresholds
                // below assume whatever unit calculateDistance returns.
                double distance = calculateDistance(prevPoint[1], prevPoint[0], point[1], point[0]);
                long minutesSincePrev = ChronoUnit.MINUTES.between(prevTime, currentTime);
                // Strategy-specific keep thresholds: finer tables tolerate larger gaps.
                if (strategy == TableStrategy.DAILY) {
                    include = distance > 0.05 || minutesSincePrev >= 60;
                } else if (strategy == TableStrategy.HOURLY) {
                    include = distance > 1.08 || minutesSincePrev >= 60;
                } else {
                    include = distance > 0.54 || minutesSincePrev >= 30;
                }
                // Slow vessels (0 < avg speed < 5.0) barely move, so use stricter
                // thresholds to avoid dense clusters of near-identical points.
                if (track.getAvgSpeed() > 0 && track.getAvgSpeed() < 5.0) {
                    include = distance > 1.5 || minutesSincePrev >= 45;
                }
            }
            if (include) {
                simplifiedGeometry.add(point);
                simplifiedTimestamps.add(tsStr);
                // Defensive bounds check: speeds list may be shorter than geometry.
                simplifiedSpeeds.add(track.getSpeeds().size() > i ? track.getSpeeds().get(i) : 0.0);
                prevPoint = point;
                prevTime = currentTime;
            }
        }
        // Stage 2: additional down-sampling for zoomed-out views.
        if (request.getZoomLevel() != null && request.getZoomLevel() < 10 && simplifiedGeometry.size() > 2) {
            // Keep 1-in-10 below zoom 6, 1-in-5 below zoom 8, else 1-in-2.
            int sampleRate = request.getZoomLevel() < 6 ? 10 :
                    request.getZoomLevel() < 8 ? 5 : 2;
            List<double[]> sampledGeometry = new ArrayList<>();
            List<String> sampledTimestamps = new ArrayList<>();
            List<Double> sampledSpeeds = new ArrayList<>();
            for (int i = 0; i < simplifiedGeometry.size(); i++) {
                // Always retain the final point so the track ends at its true position.
                if (i % sampleRate == 0 || i == simplifiedGeometry.size() - 1) {
                    sampledGeometry.add(simplifiedGeometry.get(i));
                    sampledTimestamps.add(simplifiedTimestamps.get(i));
                    sampledSpeeds.add(simplifiedSpeeds.get(i));
                }
            }
            track.setGeometry(sampledGeometry);
            track.setTimestamps(sampledTimestamps);
            track.setSpeeds(sampledSpeeds);
            track.setPointCount(sampledGeometry.size());
        } else {
            track.setGeometry(simplifiedGeometry);
            track.setTimestamps(simplifiedTimestamps);
            track.setSpeeds(simplifiedSpeeds);
            track.setPointCount(simplifiedGeometry.size());
        }
        // Log reduction stats for the first few vessels of the query only.
        if (ctx != null && ctx.vesselLogCount < 10 && originalSize > track.getPointCount()) {
            double reductionRate = (1 - (double) track.getPointCount() / originalSize) * 100;
            log.info("[CACHE-SIMPLIFY] Vessel {} simplified: {} -> {} points ({}% reduced, zoom: {})",
                    track.getVesselId(), originalSize, track.getPointCount(),
                    Math.round(reductionRate), request.getZoomLevel());
            ctx.vesselLogCount++;
        }
    }
}
}

파일 보기

@ -160,7 +160,8 @@ public class QueryMetricsController {
@GetMapping("/timeseries")
@Operation(summary = "쿼리 메트릭 시계열", description = "시간별/일별 버킷 집계 + Top 10 클라이언트")
public Map<String, Object> getTimeSeries(
@Parameter(description = "조회 기간 (일)") @RequestParam(defaultValue = "7") int days) {
@Parameter(description = "조회 기간 (일)") @RequestParam(defaultValue = "7") int days,
@Parameter(description = "Top 클라이언트 그룹 기준 (ip | id)") @RequestParam(defaultValue = "ip") String groupBy) {
days = Math.min(days, 90);
String granularity = days <= 7 ? "HOURLY" : "DAILY";
@ -184,15 +185,17 @@ public class QueryMetricsController {
List<Map<String, Object>> buckets = queryJdbcTemplate.queryForList(bucketSql);
boolean groupById = "id".equalsIgnoreCase(groupBy);
String clientColumn = groupById ? "client_id" : "client_ip";
String topClientsSql = """
SELECT client_ip, COUNT(*) AS query_count,
SELECT %s AS client, COUNT(*) AS query_count,
COALESCE(AVG(elapsed_ms), 0) AS avg_elapsed_ms
FROM signal.t_query_metrics
WHERE created_at >= NOW() - INTERVAL '%d days'
AND client_ip IS NOT NULL
GROUP BY client_ip
AND %s IS NOT NULL
GROUP BY %s
ORDER BY query_count DESC LIMIT 10
""".formatted(days);
""".formatted(clientColumn, days, clientColumn, clientColumn);
List<Map<String, Object>> topClients = queryJdbcTemplate.queryForList(topClientsSql);
@ -200,6 +203,7 @@ public class QueryMetricsController {
result.put("buckets", buckets);
result.put("topClients", topClients);
result.put("granularity", granularity);
result.put("groupBy", groupById ? "id" : "ip");
return result;
}

파일 보기

@ -32,7 +32,7 @@ public class QueryMetricsBufferService {
unique_vessels, total_tracks, total_points, points_after_simplify,
total_chunks, response_bytes,
elapsed_ms, db_query_ms, simplify_ms, backpressure_events,
status, client_ip
status, client_ip, client_id
) VALUES (
?, ?, ?, now(),
?, ?, ?, ?, ?,
@ -40,7 +40,7 @@ public class QueryMetricsBufferService {
?, ?, ?, ?,
?, ?,
?, ?, ?, ?,
?, ?
?, ?, ?
)
""";
@ -73,6 +73,27 @@ public class QueryMetricsBufferService {
}
}
/**
 * Idempotent startup migration: ensures signal.t_query_metrics has a
 * client_id column (plus a (client_id, created_at) index) before the buffered
 * INSERT that writes client_id runs. Uses a PL/pgSQL DO block guarded by an
 * information_schema lookup so re-running on an already-migrated database is
 * a no-op.
 *
 * Failures are logged at WARN and swallowed deliberately: the application
 * should still start against a database where DDL is not permitted.
 * NOTE(review): if the column is genuinely absent and this fails, subsequent
 * metric inserts referencing client_id will error — confirm that trade-off.
 */
@PostConstruct
void ensureClientIdColumn() {
    try {
        queryJdbcTemplate.execute("""
            DO $$
            BEGIN
                IF NOT EXISTS (
                    SELECT 1 FROM information_schema.columns
                    WHERE table_schema = 'signal' AND table_name = 't_query_metrics' AND column_name = 'client_id'
                ) THEN
                    ALTER TABLE signal.t_query_metrics ADD COLUMN client_id VARCHAR(100);
                    CREATE INDEX IF NOT EXISTS idx_query_metrics_client_id ON signal.t_query_metrics(client_id, created_at);
                END IF;
            END $$
            """);
        log.info("t_query_metrics client_id column ensured");
    } catch (Exception e) {
        // Best-effort: log and continue so startup is not blocked by missing DDL rights.
        log.warn("Failed to ensure client_id column: {}", e.getMessage());
    }
}
/**
* 메트릭 레코드를 버퍼에 추가 (lock-free)
*/
@ -119,7 +140,7 @@ public class QueryMetricsBufferService {
m.getUniqueVessels(), m.getTotalTracks(), m.getTotalPoints(), m.getPointsAfterSimplify(),
m.getTotalChunks(), m.getResponseBytes(),
m.getElapsedMs(), m.getDbQueryMs(), m.getSimplifyMs(), m.getBackpressureEvents(),
m.getStatus(), m.getClientIp()
m.getStatus(), m.getClientIp(), m.getClientId()
};
}

파일 보기

@ -131,5 +131,6 @@ public class QueryMetricsService {
private final int backpressureEvents;
private final String status;
private final String clientIp;
private final String clientId;
}
}