Merge pull request 'release: 2026-03-27 (5건 커밋)' (#116) from develop into main
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 9m4s
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 9m4s
This commit is contained in:
커밋
99a7f607f7
@ -4,6 +4,16 @@
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [2026-03-27]
|
||||||
|
|
||||||
|
### 추가
|
||||||
|
- WebSocket 리플레이 쿼리 L1/L2 캐시 통합 — HOURLY/5MIN 구간 DB 의존 제거, 당일 쿼리 100% 캐시
|
||||||
|
- 쿼리 메트릭 사용자 ID 수집 — GC_SESSION JWT에서 인증된 사용자 email 추출
|
||||||
|
- 대시보드 Top 클라이언트 IP/ID 토글 — groupBy 파라미터로 IP 기준 또는 사용자 ID 기준 전환
|
||||||
|
|
||||||
|
### 수정
|
||||||
|
- vessel info SQL 컬럼명 오류 수정 (ship_nm → name) — 선박 정보 조회 실패("bad SQL grammar") 해결
|
||||||
|
|
||||||
## [2026-03-19]
|
## [2026-03-19]
|
||||||
|
|
||||||
### 변경
|
### 변경
|
||||||
|
|||||||
@ -68,7 +68,7 @@ export const monitorApi = {
|
|||||||
return fetchJson(`/api/monitoring/query-metrics/summary?hours=${hours}`)
|
return fetchJson(`/api/monitoring/query-metrics/summary?hours=${hours}`)
|
||||||
},
|
},
|
||||||
|
|
||||||
getQueryMetricsTimeSeries(days = 7): Promise<QueryMetricsTimeSeries> {
|
getQueryMetricsTimeSeries(days = 7, groupBy: 'ip' | 'id' = 'ip'): Promise<QueryMetricsTimeSeries> {
|
||||||
return fetchJson(`/api/monitoring/query-metrics/timeseries?days=${days}`)
|
return fetchJson(`/api/monitoring/query-metrics/timeseries?days=${days}&groupBy=${groupBy}`)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
@ -32,6 +32,7 @@ export default function Dashboard() {
|
|||||||
const [running, setRunning] = useCachedState<RunningJob[]>('dash.running', [])
|
const [running, setRunning] = useCachedState<RunningJob[]>('dash.running', [])
|
||||||
const [queryTs, setQueryTs] = useCachedState<QueryMetricsTimeSeries | null>('dash.queryTs', null)
|
const [queryTs, setQueryTs] = useCachedState<QueryMetricsTimeSeries | null>('dash.queryTs', null)
|
||||||
const [days, setDays] = useState(7)
|
const [days, setDays] = useState(7)
|
||||||
|
const [clientGroupBy, setClientGroupBy] = useState<'ip' | 'id'>('ip')
|
||||||
const [isQueryChartsOpen, setIsQueryChartsOpen] = useState(() =>
|
const [isQueryChartsOpen, setIsQueryChartsOpen] = useState(() =>
|
||||||
localStorage.getItem('dashboard-query-charts') !== 'collapsed',
|
localStorage.getItem('dashboard-query-charts') !== 'collapsed',
|
||||||
)
|
)
|
||||||
@ -51,8 +52,8 @@ export default function Dashboard() {
|
|||||||
monitorApi.getDelay().then(setDelay).catch(() => {})
|
monitorApi.getDelay().then(setDelay).catch(() => {})
|
||||||
batchApi.getDailyStats().then(setDaily).catch(() => {})
|
batchApi.getDailyStats().then(setDaily).catch(() => {})
|
||||||
batchApi.getRunningJobs().then(setRunning).catch(() => {})
|
batchApi.getRunningJobs().then(setRunning).catch(() => {})
|
||||||
monitorApi.getQueryMetricsTimeSeries(days).then(setQueryTs).catch(() => {})
|
monitorApi.getQueryMetricsTimeSeries(days, clientGroupBy).then(setQueryTs).catch(() => {})
|
||||||
}, POLL_INTERVAL, [days])
|
}, POLL_INTERVAL, [days, clientGroupBy])
|
||||||
|
|
||||||
const memUsage = metrics
|
const memUsage = metrics
|
||||||
? Math.round((metrics.memory.used / metrics.memory.max) * 100)
|
? Math.round((metrics.memory.used / metrics.memory.max) * 100)
|
||||||
@ -327,7 +328,21 @@ export default function Dashboard() {
|
|||||||
{/* Top Clients */}
|
{/* Top Clients */}
|
||||||
{queryTs.topClients.length > 0 && (
|
{queryTs.topClients.length > 0 && (
|
||||||
<div>
|
<div>
|
||||||
<div className="mb-2 text-sm font-medium text-muted">{t('dashboard.topClients')}</div>
|
<div className="mb-2 flex items-center gap-2">
|
||||||
|
<span className="text-sm font-medium text-muted">{t('dashboard.topClients')}</span>
|
||||||
|
<div className="flex overflow-hidden rounded-md border border-[var(--border-primary)] text-xs">
|
||||||
|
<button
|
||||||
|
type="button"
|
||||||
|
className={`px-2 py-0.5 ${clientGroupBy === 'ip' ? 'bg-[var(--accent-primary)] text-white' : 'text-[var(--text-secondary)]'}`}
|
||||||
|
onClick={() => setClientGroupBy('ip')}
|
||||||
|
>IP</button>
|
||||||
|
<button
|
||||||
|
type="button"
|
||||||
|
className={`px-2 py-0.5 ${clientGroupBy === 'id' ? 'bg-[var(--accent-primary)] text-white' : 'text-[var(--text-secondary)]'}`}
|
||||||
|
onClick={() => setClientGroupBy('id')}
|
||||||
|
>ID</button>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
<div className="space-y-2">
|
<div className="space-y-2">
|
||||||
{queryTs.topClients.map((c, i) => {
|
{queryTs.topClients.map((c, i) => {
|
||||||
const maxCount = queryTs.topClients[0].query_count
|
const maxCount = queryTs.topClients[0].query_count
|
||||||
|
|||||||
@ -22,8 +22,10 @@ import org.springframework.http.server.ServletServerHttpRequest;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import jakarta.servlet.http.Cookie;
|
||||||
import jakarta.servlet.http.HttpServletRequest;
|
import jakarta.servlet.http.HttpServletRequest;
|
||||||
import java.security.Principal;
|
import java.security.Principal;
|
||||||
|
import java.util.Base64;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
@ -180,11 +182,18 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
|
|||||||
String clientIp = extractClientIp(request);
|
String clientIp = extractClientIp(request);
|
||||||
attributes.put("CLIENT_IP", clientIp);
|
attributes.put("CLIENT_IP", clientIp);
|
||||||
|
|
||||||
// User-Agent 추출
|
|
||||||
if (request instanceof ServletServerHttpRequest) {
|
if (request instanceof ServletServerHttpRequest) {
|
||||||
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
|
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
|
||||||
|
|
||||||
|
// User-Agent 추출
|
||||||
String userAgent = servletRequest.getHeader("User-Agent");
|
String userAgent = servletRequest.getHeader("User-Agent");
|
||||||
attributes.put("USER_AGENT", userAgent);
|
attributes.put("USER_AGENT", userAgent);
|
||||||
|
|
||||||
|
// GC_SESSION 쿠키에서 JWT email 추출 (guide 서비스 인증)
|
||||||
|
String clientId = extractEmailFromJwtCookie(servletRequest);
|
||||||
|
if (clientId != null) {
|
||||||
|
attributes.put("CLIENT_ID", clientId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
@ -225,5 +234,43 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
|
|||||||
// ServletServerHttpRequest가 아닌 경우 기본값
|
// ServletServerHttpRequest가 아닌 경우 기본값
|
||||||
return "unknown";
|
return "unknown";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* GC_SESSION 쿠키에서 JWT payload의 email 클레임 추출.
|
||||||
|
* JWT 검증은 nginx auth_request에서 이미 완료 — 여기서는 payload 디코딩만 수행.
|
||||||
|
*/
|
||||||
|
private String extractEmailFromJwtCookie(HttpServletRequest request) {
|
||||||
|
Cookie[] cookies = request.getCookies();
|
||||||
|
if (cookies == null) return null;
|
||||||
|
|
||||||
|
String token = null;
|
||||||
|
for (Cookie cookie : cookies) {
|
||||||
|
if ("GC_SESSION".equals(cookie.getName())) {
|
||||||
|
token = cookie.getValue();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (token == null || token.isEmpty()) return null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
// JWT: header.payload.signature — payload만 Base64URL 디코딩
|
||||||
|
String[] parts = token.split("\\.");
|
||||||
|
if (parts.length < 2) return null;
|
||||||
|
|
||||||
|
String payload = new String(Base64.getUrlDecoder().decode(parts[1]));
|
||||||
|
// 간단한 JSON 파싱 (Jackson 의존 없이): "email":"value" 추출
|
||||||
|
int emailIdx = payload.indexOf("\"email\"");
|
||||||
|
if (emailIdx < 0) return null;
|
||||||
|
|
||||||
|
int colonIdx = payload.indexOf(':', emailIdx);
|
||||||
|
int quoteStart = payload.indexOf('"', colonIdx + 1);
|
||||||
|
int quoteEnd = payload.indexOf('"', quoteStart + 1);
|
||||||
|
if (quoteStart < 0 || quoteEnd < 0) return null;
|
||||||
|
|
||||||
|
return payload.substring(quoteStart + 1, quoteEnd);
|
||||||
|
} catch (Exception e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -71,12 +71,18 @@ public class StompTrackController {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// 세션 속성에서 CLIENT_IP 추출
|
// 세션 속성에서 CLIENT_IP, CLIENT_ID 추출
|
||||||
String clientIp = null;
|
String clientIp = null;
|
||||||
|
String clientId = null;
|
||||||
Map<String, Object> sessionAttrs = headerAccessor.getSessionAttributes();
|
Map<String, Object> sessionAttrs = headerAccessor.getSessionAttributes();
|
||||||
if (sessionAttrs != null && sessionAttrs.containsKey("CLIENT_IP")) {
|
if (sessionAttrs != null) {
|
||||||
|
if (sessionAttrs.containsKey("CLIENT_IP")) {
|
||||||
clientIp = (String) sessionAttrs.get("CLIENT_IP");
|
clientIp = (String) sessionAttrs.get("CLIENT_IP");
|
||||||
}
|
}
|
||||||
|
if (sessionAttrs.containsKey("CLIENT_ID")) {
|
||||||
|
clientId = (String) sessionAttrs.get("CLIENT_ID");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 비동기 스트리밍 시작 - 청크 모드 체크
|
// 비동기 스트리밍 시작 - 청크 모드 체크
|
||||||
if (request.isChunkedMode()) {
|
if (request.isChunkedMode()) {
|
||||||
@ -86,7 +92,8 @@ public class StompTrackController {
|
|||||||
sessionId,
|
sessionId,
|
||||||
chunk -> sendChunkedDataToUser(userId, chunk),
|
chunk -> sendChunkedDataToUser(userId, chunk),
|
||||||
statusCallback,
|
statusCallback,
|
||||||
clientIp
|
clientIp,
|
||||||
|
clientId
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
trackStreamingService.streamTracks(
|
trackStreamingService.streamTracks(
|
||||||
|
|||||||
@ -1,7 +1,11 @@
|
|||||||
package gc.mda.signal_batch.global.websocket.service;
|
package gc.mda.signal_batch.global.websocket.service;
|
||||||
|
|
||||||
|
import gc.mda.signal_batch.batch.reader.FiveMinTrackCache;
|
||||||
|
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
|
||||||
|
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
|
||||||
import gc.mda.signal_batch.global.exception.MemoryBudgetExceededException;
|
import gc.mda.signal_batch.global.exception.MemoryBudgetExceededException;
|
||||||
import gc.mda.signal_batch.global.util.TrackMemoryEstimator;
|
import gc.mda.signal_batch.global.util.TrackMemoryEstimator;
|
||||||
|
import gc.mda.signal_batch.global.util.VesselTrackToCompactConverter;
|
||||||
import gc.mda.signal_batch.global.websocket.dto.TrackChunkResponse;
|
import gc.mda.signal_batch.global.websocket.dto.TrackChunkResponse;
|
||||||
import gc.mda.signal_batch.global.websocket.interceptor.TrackQueryInterceptor;
|
import gc.mda.signal_batch.global.websocket.interceptor.TrackQueryInterceptor;
|
||||||
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
|
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
|
||||||
@ -55,6 +59,9 @@ public class ChunkedTrackStreamingService {
|
|||||||
private final CacheTrackSimplifier cacheTrackSimplifier;
|
private final CacheTrackSimplifier cacheTrackSimplifier;
|
||||||
private final TrackMemoryBudgetManager memoryBudgetManager;
|
private final TrackMemoryBudgetManager memoryBudgetManager;
|
||||||
private final QueryMetricsBufferService queryMetricsBufferService;
|
private final QueryMetricsBufferService queryMetricsBufferService;
|
||||||
|
private final HourlyTrackCache hourlyTrackCache;
|
||||||
|
private final FiveMinTrackCache fiveMinTrackCache;
|
||||||
|
private final VesselTrackToCompactConverter vesselTrackToCompactConverter;
|
||||||
private static final ThreadLocal<WKTReader> wktReaderLocal = ThreadLocal.withInitial(WKTReader::new);
|
private static final ThreadLocal<WKTReader> wktReaderLocal = ThreadLocal.withInitial(WKTReader::new);
|
||||||
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||||
private static final int MAX_TRACKS_PER_CHUNK = 500000; // 청크당 최대 트랙 수 (10만 선박 지원)
|
private static final int MAX_TRACKS_PER_CHUNK = 500000; // 청크당 최대 트랙 수 (10만 선박 지원)
|
||||||
@ -105,7 +112,10 @@ public class ChunkedTrackStreamingService {
|
|||||||
DailyTrackCacheManager dailyTrackCacheManager,
|
DailyTrackCacheManager dailyTrackCacheManager,
|
||||||
CacheTrackSimplifier cacheTrackSimplifier,
|
CacheTrackSimplifier cacheTrackSimplifier,
|
||||||
TrackMemoryBudgetManager memoryBudgetManager,
|
TrackMemoryBudgetManager memoryBudgetManager,
|
||||||
QueryMetricsBufferService queryMetricsBufferService) {
|
QueryMetricsBufferService queryMetricsBufferService,
|
||||||
|
HourlyTrackCache hourlyTrackCache,
|
||||||
|
FiveMinTrackCache fiveMinTrackCache,
|
||||||
|
VesselTrackToCompactConverter vesselTrackToCompactConverter) {
|
||||||
this.queryJdbcTemplate = queryJdbcTemplate;
|
this.queryJdbcTemplate = queryJdbcTemplate;
|
||||||
this.queryDataSource = queryDataSource;
|
this.queryDataSource = queryDataSource;
|
||||||
this.activeQueryManager = activeQueryManager;
|
this.activeQueryManager = activeQueryManager;
|
||||||
@ -114,6 +124,9 @@ public class ChunkedTrackStreamingService {
|
|||||||
this.cacheTrackSimplifier = cacheTrackSimplifier;
|
this.cacheTrackSimplifier = cacheTrackSimplifier;
|
||||||
this.memoryBudgetManager = memoryBudgetManager;
|
this.memoryBudgetManager = memoryBudgetManager;
|
||||||
this.queryMetricsBufferService = queryMetricsBufferService;
|
this.queryMetricsBufferService = queryMetricsBufferService;
|
||||||
|
this.hourlyTrackCache = hourlyTrackCache;
|
||||||
|
this.fiveMinTrackCache = fiveMinTrackCache;
|
||||||
|
this.vesselTrackToCompactConverter = vesselTrackToCompactConverter;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -153,6 +166,8 @@ public class ChunkedTrackStreamingService {
|
|||||||
private static class QueryBenchmark {
|
private static class QueryBenchmark {
|
||||||
int cacheHitDays = 0;
|
int cacheHitDays = 0;
|
||||||
int dbQueryDays = 0;
|
int dbQueryDays = 0;
|
||||||
|
int cacheHourlyRanges = 0; // L2 캐시 그룹 수
|
||||||
|
int cacheFiveMinRanges = 0; // L1 캐시 그룹 수
|
||||||
int totalTracks = 0;
|
int totalTracks = 0;
|
||||||
int totalPointsBefore = 0;
|
int totalPointsBefore = 0;
|
||||||
int totalPointsAfter = 0;
|
int totalPointsAfter = 0;
|
||||||
@ -164,13 +179,15 @@ public class ChunkedTrackStreamingService {
|
|||||||
int connViewportPass1 = 0; // collectViewportVesselIds DB 쿼리
|
int connViewportPass1 = 0; // collectViewportVesselIds DB 쿼리
|
||||||
int connDailyPages = 0; // streamDailyTableWithPagination 페이지
|
int connDailyPages = 0; // streamDailyTableWithPagination 페이지
|
||||||
int connVesselInfo = 0; // preloadVesselInfoWithSessionCache 배치
|
int connVesselInfo = 0; // preloadVesselInfoWithSessionCache 배치
|
||||||
int connHourly5min = 0; // processTableRange (5min/hourly)
|
int connHourly5min = 0; // processTableRange (5min/hourly) — 기존 DB 경로 잔류용
|
||||||
int connTableCheck = 0; // hasDataInTable 존재 검증
|
int connTableCheck = 0; // hasDataInTable 존재 검증
|
||||||
Integer zoomLevel;
|
Integer zoomLevel;
|
||||||
|
|
||||||
String determinePath() {
|
String determinePath() {
|
||||||
if (cacheHitDays > 0 && dbQueryDays > 0) return "HYBRID";
|
boolean anyCache = cacheHitDays > 0 || cacheHourlyRanges > 0 || cacheFiveMinRanges > 0;
|
||||||
if (cacheHitDays > 0) return "CACHE";
|
boolean anyDb = dbQueryDays > 0;
|
||||||
|
if (anyCache && anyDb) return "HYBRID";
|
||||||
|
if (anyCache) return "CACHE";
|
||||||
return "DB";
|
return "DB";
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -182,6 +199,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
return String.format(
|
return String.format(
|
||||||
"{\"queryId\":\"%s\",\"timestamp\":\"%s\",\"path\":\"%s\"," +
|
"{\"queryId\":\"%s\",\"timestamp\":\"%s\",\"path\":\"%s\"," +
|
||||||
"\"zoomLevel\":%s,\"dateRanges\":%d,\"cacheHitDays\":%d,\"dbQueryDays\":%d," +
|
"\"zoomLevel\":%s,\"dateRanges\":%d,\"cacheHitDays\":%d,\"dbQueryDays\":%d," +
|
||||||
|
"\"cacheHourlyRanges\":%d,\"cacheFiveMinRanges\":%d," +
|
||||||
"\"totalTracks\":%d,\"totalPointsBefore\":%d,\"totalPointsAfter\":%d," +
|
"\"totalTracks\":%d,\"totalPointsBefore\":%d,\"totalPointsAfter\":%d," +
|
||||||
"\"pointReductionPct\":%d,\"totalBatches\":%d,\"batchesBeforeSimplify\":%d," +
|
"\"pointReductionPct\":%d,\"totalBatches\":%d,\"batchesBeforeSimplify\":%d," +
|
||||||
"\"simplifyTimeMs\":%d,\"dbQueryTimeMs\":%d,\"totalElapsedMs\":%d," +
|
"\"simplifyTimeMs\":%d,\"dbQueryTimeMs\":%d,\"totalElapsedMs\":%d," +
|
||||||
@ -192,8 +210,9 @@ public class ChunkedTrackStreamingService {
|
|||||||
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
|
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
|
||||||
determinePath(),
|
determinePath(),
|
||||||
zoomLevel != null ? zoomLevel.toString() : "null",
|
zoomLevel != null ? zoomLevel.toString() : "null",
|
||||||
cacheHitDays + dbQueryDays,
|
cacheHitDays + dbQueryDays + cacheHourlyRanges + cacheFiveMinRanges,
|
||||||
cacheHitDays, dbQueryDays,
|
cacheHitDays, dbQueryDays,
|
||||||
|
cacheHourlyRanges, cacheFiveMinRanges,
|
||||||
totalTracks, totalPointsBefore, totalPointsAfter,
|
totalTracks, totalPointsBefore, totalPointsAfter,
|
||||||
totalPointsBefore > 0 ? Math.round((1 - (double) totalPointsAfter / totalPointsBefore) * 100) : 0,
|
totalPointsBefore > 0 ? Math.round((1 - (double) totalPointsAfter / totalPointsBefore) * 100) : 0,
|
||||||
totalBatches, batchesBeforeSimplify,
|
totalBatches, batchesBeforeSimplify,
|
||||||
@ -238,12 +257,12 @@ public class ChunkedTrackStreamingService {
|
|||||||
|
|
||||||
// DB에서 조회 (signal_kind_code는 캐시 저장 시 치환된 값)
|
// DB에서 조회 (signal_kind_code는 캐시 저장 시 치환된 값)
|
||||||
try {
|
try {
|
||||||
String sql = "SELECT ship_nm, vessel_type, signal_kind_code FROM signal.t_ais_position " +
|
String sql = "SELECT name, vessel_type, signal_kind_code FROM signal.t_ais_position " +
|
||||||
"WHERE mmsi = ? LIMIT 1";
|
"WHERE mmsi = ? LIMIT 1";
|
||||||
|
|
||||||
VesselInfo info = queryJdbcTemplate.queryForObject(sql,
|
VesselInfo info = queryJdbcTemplate.queryForObject(sql,
|
||||||
(rs, rowNum) -> new VesselInfo(
|
(rs, rowNum) -> new VesselInfo(
|
||||||
rs.getString("ship_nm"),
|
rs.getString("name"),
|
||||||
rs.getString("vessel_type"),
|
rs.getString("vessel_type"),
|
||||||
rs.getString("signal_kind_code")
|
rs.getString("signal_kind_code")
|
||||||
),
|
),
|
||||||
@ -320,7 +339,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
.map(id -> "?")
|
.map(id -> "?")
|
||||||
.collect(Collectors.joining(","));
|
.collect(Collectors.joining(","));
|
||||||
|
|
||||||
String sql = "SELECT mmsi, ship_nm, vessel_type, signal_kind_code " +
|
String sql = "SELECT mmsi, name, vessel_type, signal_kind_code " +
|
||||||
"FROM signal.t_ais_position " +
|
"FROM signal.t_ais_position " +
|
||||||
"WHERE mmsi IN (" + placeholders + ")";
|
"WHERE mmsi IN (" + placeholders + ")";
|
||||||
|
|
||||||
@ -329,7 +348,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
queryJdbcTemplate.query(sql, rs -> {
|
queryJdbcTemplate.query(sql, rs -> {
|
||||||
String visselId = rs.getString("mmsi");
|
String visselId = rs.getString("mmsi");
|
||||||
VesselInfo info = new VesselInfo(
|
VesselInfo info = new VesselInfo(
|
||||||
rs.getString("ship_nm"),
|
rs.getString("name"),
|
||||||
rs.getString("vessel_type"),
|
rs.getString("vessel_type"),
|
||||||
rs.getString("signal_kind_code")
|
rs.getString("signal_kind_code")
|
||||||
);
|
);
|
||||||
@ -389,7 +408,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
.map(id -> "?")
|
.map(id -> "?")
|
||||||
.collect(Collectors.joining(","));
|
.collect(Collectors.joining(","));
|
||||||
|
|
||||||
String sql = "SELECT mmsi, ship_nm, vessel_type, signal_kind_code " +
|
String sql = "SELECT mmsi, name, vessel_type, signal_kind_code " +
|
||||||
"FROM signal.t_ais_position " +
|
"FROM signal.t_ais_position " +
|
||||||
"WHERE mmsi IN (" + placeholders + ")";
|
"WHERE mmsi IN (" + placeholders + ")";
|
||||||
|
|
||||||
@ -398,7 +417,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
queryJdbcTemplate.query(sql, rs -> {
|
queryJdbcTemplate.query(sql, rs -> {
|
||||||
String vesselId = rs.getString("mmsi");
|
String vesselId = rs.getString("mmsi");
|
||||||
VesselInfo info = new VesselInfo(
|
VesselInfo info = new VesselInfo(
|
||||||
rs.getString("ship_nm"),
|
rs.getString("name"),
|
||||||
rs.getString("vessel_type"),
|
rs.getString("vessel_type"),
|
||||||
rs.getString("signal_kind_code")
|
rs.getString("signal_kind_code")
|
||||||
);
|
);
|
||||||
@ -447,6 +466,12 @@ public class ChunkedTrackStreamingService {
|
|||||||
|
|
||||||
for (Map.Entry<TableStrategy, List<TimeRange>> entry : strategyMap.entrySet()) {
|
for (Map.Entry<TableStrategy, List<TimeRange>> entry : strategyMap.entrySet()) {
|
||||||
TableStrategy strategy = entry.getKey();
|
TableStrategy strategy = entry.getKey();
|
||||||
|
|
||||||
|
// HOURLY/FIVE_MINUTE: 캐시에서 직접 뷰포트 필터 수행 → Pass 1 DB 쿼리 불필요
|
||||||
|
if (strategy == TableStrategy.HOURLY || strategy == TableStrategy.FIVE_MINUTE) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
String tableName = strategy.getTableName();
|
String tableName = strategy.getTableName();
|
||||||
|
|
||||||
for (TimeRange range : entry.getValue()) {
|
for (TimeRange range : entry.getValue()) {
|
||||||
@ -725,7 +750,8 @@ public class ChunkedTrackStreamingService {
|
|||||||
String sessionId,
|
String sessionId,
|
||||||
Consumer<TrackChunkResponse> chunkConsumer,
|
Consumer<TrackChunkResponse> chunkConsumer,
|
||||||
Consumer<QueryStatusUpdate> statusConsumer,
|
Consumer<QueryStatusUpdate> statusConsumer,
|
||||||
String clientIp) {
|
String clientIp,
|
||||||
|
String clientId) {
|
||||||
boolean slotAcquired = false;
|
boolean slotAcquired = false;
|
||||||
QueryBenchmark benchmark = null;
|
QueryBenchmark benchmark = null;
|
||||||
QueryContext ctx = null;
|
QueryContext ctx = null;
|
||||||
@ -867,193 +893,11 @@ public class ChunkedTrackStreamingService {
|
|||||||
globalChunkIndex, uniqueVesselIds, viewportVesselIds, benchmark, ctx);
|
globalChunkIndex, uniqueVesselIds, viewportVesselIds, benchmark, ctx);
|
||||||
globalChunkIndex = ctx.getCurrentChunkIndex();
|
globalChunkIndex = ctx.getCurrentChunkIndex();
|
||||||
} else {
|
} else {
|
||||||
// Hourly/5min은 6시간 단위로 그룹화하여 처리
|
// ★ HOURLY/FIVE_MINUTE: L1/L2 캐시에서 직접 처리
|
||||||
Map<String, List<TimeRange>> timeGroups = groupRangesByTimeWindow(ranges, 6);
|
processHourlyFiveMinWithCache(ranges, request, queryId,
|
||||||
|
chunkConsumer, statusConsumer, uniqueVesselIds,
|
||||||
for (Map.Entry<String, List<TimeRange>> groupEntry : timeGroups.entrySet()) {
|
strategy, benchmark, ctx, metrics);
|
||||||
String groupKey = groupEntry.getKey();
|
globalChunkIndex = ctx.getCurrentChunkIndex();
|
||||||
List<TimeRange> groupRanges = groupEntry.getValue();
|
|
||||||
log.info("[{}] Processing time window {} with {} ranges", strategy, groupKey, groupRanges.size());
|
|
||||||
|
|
||||||
// 시간 그룹 데이터를 병합
|
|
||||||
Map<String, VesselAccumulator> mergedMap = new HashMap<>(20000);
|
|
||||||
LocalDateTime baseTime = null;
|
|
||||||
|
|
||||||
for (TimeRange range : groupRanges) {
|
|
||||||
try {
|
|
||||||
// 첫 범위의 시작 시간을 기준으로 설정
|
|
||||||
if (baseTime == null) {
|
|
||||||
baseTime = range.getStart();
|
|
||||||
}
|
|
||||||
|
|
||||||
List<CompactVesselTrack> compactTracks = processTableRangeWithBaseTime(
|
|
||||||
request, strategy, range, baseTime, viewportVesselIds, queryId, ctx);
|
|
||||||
if (benchmark != null) benchmark.connHourly5min++; // [BENCHMARK]
|
|
||||||
|
|
||||||
// 선박별로 볕합
|
|
||||||
for (CompactVesselTrack track : compactTracks) {
|
|
||||||
String vesselId = track.getVesselId();
|
|
||||||
VesselAccumulator accumulator = mergedMap.get(vesselId);
|
|
||||||
|
|
||||||
if (accumulator == null) {
|
|
||||||
accumulator = new VesselAccumulator();
|
|
||||||
accumulator.mmsi = track.getVesselId();
|
|
||||||
|
|
||||||
VesselInfo vesselInfo = getVesselInfo(track.getVesselId());
|
|
||||||
accumulator.shipName = vesselInfo.shipName;
|
|
||||||
accumulator.shipType = vesselInfo.shipType;
|
|
||||||
accumulator.shipKindCode = vesselInfo.signalKindCode;
|
|
||||||
|
|
||||||
mergedMap.put(vesselId, accumulator);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 데이터 병합
|
|
||||||
accumulator.geometry.addAll(track.getGeometry());
|
|
||||||
accumulator.timestamps.addAll(track.getTimestamps());
|
|
||||||
accumulator.speeds.addAll(track.getSpeeds());
|
|
||||||
accumulator.totalDistance += track.getTotalDistance();
|
|
||||||
accumulator.maxSpeed = Math.max(accumulator.maxSpeed, track.getMaxSpeed());
|
|
||||||
accumulator.pointCount += track.getPointCount();
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Error processing {} range {}: {}", strategy, range, e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 병합된 데이터를 청크로 분할하여 전송
|
|
||||||
if (!mergedMap.isEmpty()) {
|
|
||||||
List<CompactVesselTrack> mergedTracks = mergedMap.entrySet().stream()
|
|
||||||
.map(entry -> {
|
|
||||||
String vesselId = entry.getKey();
|
|
||||||
VesselAccumulator acc = entry.getValue();
|
|
||||||
|
|
||||||
double avgSpeed = calculateAvgSpeed(acc.totalDistance, acc.timestamps);
|
|
||||||
|
|
||||||
return CompactVesselTrack.builder()
|
|
||||||
.vesselId(vesselId)
|
|
||||||
.nationalCode(acc.mmsi != null && acc.mmsi.length() >= 3 ? acc.mmsi.substring(0, 3) : null)
|
|
||||||
.shipName(acc.shipName)
|
|
||||||
.shipType(acc.shipType)
|
|
||||||
.shipKindCode(acc.shipKindCode)
|
|
||||||
.geometry(acc.geometry)
|
|
||||||
.timestamps(acc.timestamps)
|
|
||||||
.speeds(acc.speeds)
|
|
||||||
.totalDistance(acc.totalDistance)
|
|
||||||
.avgSpeed(avgSpeed)
|
|
||||||
.maxSpeed(acc.maxSpeed)
|
|
||||||
.pointCount(acc.pointCount)
|
|
||||||
.build();
|
|
||||||
})
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
// 전체 포인트 통계 계산
|
|
||||||
int totalOriginalPoints = mergedTracks.stream()
|
|
||||||
.mapToInt(t -> t.getPointCount())
|
|
||||||
.sum();
|
|
||||||
|
|
||||||
log.info("[{}] Time window {} - Merged {} vessels, Total {} points",
|
|
||||||
strategy, groupKey, mergedTracks.size(), totalOriginalPoints);
|
|
||||||
|
|
||||||
List<List<CompactVesselTrack>> batches = splitByMessageSize(mergedTracks, queryId);
|
|
||||||
|
|
||||||
for (List<CompactVesselTrack> batch : batches) {
|
|
||||||
TrackChunkResponse response = new TrackChunkResponse();
|
|
||||||
response.setQueryId(queryId);
|
|
||||||
response.setChunkIndex(globalChunkIndex++);
|
|
||||||
response.setIsLastChunk(false);
|
|
||||||
response.setTotalChunks(-1); // 마짉 청크에서 설정
|
|
||||||
response.setCompactTracks(batch);
|
|
||||||
// 처리된 시간 추가
|
|
||||||
String timeKey = groupKey + "_" + strategy;
|
|
||||||
ctx.processedTimeRanges.put(timeKey, groupRanges.stream()
|
|
||||||
.mapToInt(r -> (int)Duration.between(r.getStart(), r.getEnd()).toMinutes())
|
|
||||||
.sum());
|
|
||||||
int currentProcessedMin = ctx.processedTimeRanges.values().stream().mapToInt(Integer::intValue).sum();
|
|
||||||
response.setStats(createChunkStats(batch, uniqueVesselIds, currentProcessedMin, ctx));
|
|
||||||
|
|
||||||
// 버퍼 크기 계산 및 추가
|
|
||||||
int chunkSize = batch.stream().mapToInt(t -> estimateTrackSize(t)).sum();
|
|
||||||
long currentBufferSize = pendingBufferSize.addAndGet(chunkSize);
|
|
||||||
metrics.totalBytes.addAndGet(chunkSize);
|
|
||||||
metrics.chunkCount.incrementAndGet();
|
|
||||||
|
|
||||||
// 버퍼 사용률 로그
|
|
||||||
double bufferUsage = (double) currentBufferSize / MAX_PENDING_BUFFER * 100;
|
|
||||||
if (bufferUsage > 80 && System.currentTimeMillis() - metrics.lastWarningTime > 5000) {
|
|
||||||
metrics.bufferWarnings.incrementAndGet();
|
|
||||||
metrics.lastWarningTime = System.currentTimeMillis();
|
|
||||||
log.warn("[BACKPRESSURE] Query {} - Buffer usage high: {}% ({} MB / {} MB)",
|
|
||||||
queryId, String.format("%.1f", bufferUsage), currentBufferSize / (1024 * 1024),
|
|
||||||
MAX_PENDING_BUFFER / (1024 * 1024));
|
|
||||||
}
|
|
||||||
|
|
||||||
// 버퍼가 가듍 찬 경우 대기 및 동적 청크 크기 조절
|
|
||||||
int backpressureWaitCount = 0;
|
|
||||||
while (pendingBufferSize.get() > MAX_PENDING_BUFFER) {
|
|
||||||
if (backpressureWaitCount == 0) {
|
|
||||||
metrics.backpressureEvents.incrementAndGet();
|
|
||||||
log.warn("[BACKPRESSURE] Query {} - Buffer full! Waiting for buffer to drain. Current: {} MB",
|
|
||||||
queryId, pendingBufferSize.get() / (1024 * 1024));
|
|
||||||
}
|
|
||||||
Thread.sleep(50);
|
|
||||||
backpressureWaitCount++;
|
|
||||||
|
|
||||||
// 500ms 이상 대기 시 청크 크기 감소
|
|
||||||
if (backpressureWaitCount > 10 && metrics.dynamicChunkSizeKB > MIN_MESSAGE_SIZE_KB) {
|
|
||||||
int oldSize = metrics.dynamicChunkSizeKB;
|
|
||||||
metrics.dynamicChunkSizeKB = Math.max(MIN_MESSAGE_SIZE_KB,
|
|
||||||
metrics.dynamicChunkSizeKB - 512);
|
|
||||||
log.info("[BACKPRESSURE] Query {} - Reducing chunk size: {} KB -> {} KB",
|
|
||||||
queryId, oldSize, metrics.dynamicChunkSizeKB);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (backpressureWaitCount > 0) {
|
|
||||||
log.info("[BACKPRESSURE] Query {} - Buffer drained after {} ms wait",
|
|
||||||
queryId, backpressureWaitCount * 50);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 전송 및 버퍼 추적 (전송 완료 후 즉시 감소)
|
|
||||||
try {
|
|
||||||
chunkConsumer.accept(response);
|
|
||||||
} finally {
|
|
||||||
pendingBufferSize.addAndGet(-chunkSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 유니크 선박 카운트
|
|
||||||
batch.forEach(track -> uniqueVesselIds.add(track.getVesselId()));
|
|
||||||
|
|
||||||
// 진행률 업데이트 (시간 기반만 사용)
|
|
||||||
double timeProgress = (double) currentProcessedMin / ctx.estimatedTotalMinutes * 100;
|
|
||||||
|
|
||||||
statusConsumer.accept(new QueryStatusUpdate(
|
|
||||||
queryId,
|
|
||||||
"PROCESSING",
|
|
||||||
"Processing chunk " + globalChunkIndex,
|
|
||||||
Math.min(99.0, timeProgress)
|
|
||||||
));
|
|
||||||
|
|
||||||
// 버퍼 사용률에 따른 적응형 대기
|
|
||||||
double currentBufferUsage = (double) pendingBufferSize.get() / MAX_PENDING_BUFFER;
|
|
||||||
int waitTime;
|
|
||||||
if (currentBufferUsage > 0.8) waitTime = 200; // 80%↑: 강한 억제
|
|
||||||
else if (currentBufferUsage > 0.5) waitTime = 100; // 50%↑: 중간 억제
|
|
||||||
else if (currentBufferUsage > 0.3) waitTime = 50; // 30%↑: 약한 억제
|
|
||||||
else waitTime = 10; // 정상: 최소 지연
|
|
||||||
Thread.sleep(waitTime);
|
|
||||||
|
|
||||||
// 진행 상황 로그 (매 10번째 청크마다)
|
|
||||||
if (globalChunkIndex % 10 == 0) {
|
|
||||||
log.info("Progress: chunk {}, vessels: {}, time progress: {}%",
|
|
||||||
globalChunkIndex, uniqueVesselIds.size(),
|
|
||||||
Math.round(timeProgress));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
batches.clear(); // 메모리 즉시 해제: 배치 분할 리스트
|
|
||||||
mergedTracks.clear(); // 메모리 즉시 해제: 병합 항적 리스트
|
|
||||||
}
|
|
||||||
mergedMap.clear(); // 메모리 즉시 해제: 선박 누적 맵
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1145,7 +989,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
.viewportBounds(vpBounds)
|
.viewportBounds(vpBounds)
|
||||||
.requestedMmsi(request.getVesselIds() != null ? request.getVesselIds().size() : 0)
|
.requestedMmsi(request.getVesselIds() != null ? request.getVesselIds().size() : 0)
|
||||||
.dataPath(benchmark.determinePath())
|
.dataPath(benchmark.determinePath())
|
||||||
.cacheHitDays(benchmark.cacheHitDays)
|
.cacheHitDays(benchmark.cacheHitDays + benchmark.cacheHourlyRanges + benchmark.cacheFiveMinRanges)
|
||||||
.dbQueryDays(benchmark.dbQueryDays)
|
.dbQueryDays(benchmark.dbQueryDays)
|
||||||
.dbConnTotal(benchmark.dbConnectionTotal())
|
.dbConnTotal(benchmark.dbConnectionTotal())
|
||||||
.uniqueVessels(uniqueVesselIds.size())
|
.uniqueVessels(uniqueVesselIds.size())
|
||||||
@ -1160,6 +1004,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
.backpressureEvents(bpMetrics != null ? bpMetrics.backpressureEvents.get() : 0)
|
.backpressureEvents(bpMetrics != null ? bpMetrics.backpressureEvents.get() : 0)
|
||||||
.status(queryStatus)
|
.status(queryStatus)
|
||||||
.clientIp(clientIp)
|
.clientIp(clientIp)
|
||||||
|
.clientId(clientId)
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2576,4 +2421,377 @@ public class ChunkedTrackStreamingService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ========== L1/L2 캐시 기반 HOURLY/FIVE_MINUTE 처리 ==========
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HOURLY/FIVE_MINUTE 범위를 L1/L2 캐시에서 직접 처리.
|
||||||
|
* currentHourStart 기준: >= currentHour → L1(5min), < currentHour → L2(hourly)
|
||||||
|
* DB fallback 없음 — 캐시에 없으면 데이터 자체가 없음 (배치 Job이 DB+캐시 동시 적재)
|
||||||
|
*/
|
||||||
|
private void processHourlyFiveMinWithCache(
|
||||||
|
List<TimeRange> ranges, TrackQueryRequest request, String queryId,
|
||||||
|
Consumer<TrackChunkResponse> chunkConsumer,
|
||||||
|
Consumer<QueryStatusUpdate> statusConsumer,
|
||||||
|
Set<String> uniqueVesselIds,
|
||||||
|
TableStrategy originalStrategy, QueryBenchmark benchmark,
|
||||||
|
QueryContext ctx, BackpressureMetrics bpMetrics) throws Exception {
|
||||||
|
|
||||||
|
LocalDateTime currentHourStart = LocalDateTime.now().withMinute(0).withSecond(0).withNano(0);
|
||||||
|
|
||||||
|
Map<String, List<TimeRange>> timeGroups = groupRangesByTimeWindow(ranges, 6);
|
||||||
|
|
||||||
|
for (Map.Entry<String, List<TimeRange>> groupEntry : timeGroups.entrySet()) {
|
||||||
|
String groupKey = groupEntry.getKey();
|
||||||
|
List<TimeRange> groupRanges = groupEntry.getValue();
|
||||||
|
|
||||||
|
if (isQueryCancelled(queryId, null)) return;
|
||||||
|
|
||||||
|
log.info("[CACHE-{}] Processing time window {} with {} ranges", originalStrategy, groupKey, groupRanges.size());
|
||||||
|
|
||||||
|
LocalDateTime groupStart = groupRanges.get(0).getStart();
|
||||||
|
LocalDateTime groupEnd = groupRanges.get(groupRanges.size() - 1).getEnd();
|
||||||
|
|
||||||
|
// L1/L2 범위 분리: currentHourStart 기준
|
||||||
|
Map<String, List<VesselTrack>> l2Result = Collections.emptyMap();
|
||||||
|
Map<String, List<VesselTrack>> l1Result = Collections.emptyMap();
|
||||||
|
|
||||||
|
// L2 범위: groupStart ~ min(groupEnd, currentHourStart)
|
||||||
|
if (groupStart.isBefore(currentHourStart)) {
|
||||||
|
LocalDateTime l2End = groupEnd.isBefore(currentHourStart) ? groupEnd : currentHourStart;
|
||||||
|
LocalDateTime l2StartExpanded = groupStart.withMinute(0).withSecond(0).withNano(0);
|
||||||
|
l2Result = hourlyTrackCache.getTracksInRange(l2StartExpanded, l2End);
|
||||||
|
log.info("[CACHE-L2] Range [{}, {}): {} vessels", l2StartExpanded, l2End, l2Result.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
// L1 범위: max(groupStart, currentHourStart) ~ groupEnd
|
||||||
|
if (groupEnd.isAfter(currentHourStart)) {
|
||||||
|
LocalDateTime l1Start = groupStart.isAfter(currentHourStart) ? groupStart : currentHourStart;
|
||||||
|
l1Result = fiveMinTrackCache.getTracksInRange(l1Start, groupEnd);
|
||||||
|
log.info("[CACHE-L1] Range [{}, {}): {} vessels", l1Start, groupEnd, l1Result.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
// L1 + L2 결과 merge (MMSI 기준)
|
||||||
|
Map<String, List<VesselTrack>> merged = new LinkedHashMap<>(l2Result);
|
||||||
|
for (Map.Entry<String, List<VesselTrack>> l1Entry : l1Result.entrySet()) {
|
||||||
|
merged.merge(l1Entry.getKey(), l1Entry.getValue(), (existing, newTracks) -> {
|
||||||
|
List<VesselTrack> combined = new ArrayList<>(existing);
|
||||||
|
combined.addAll(newTracks);
|
||||||
|
combined.sort(Comparator.comparing(VesselTrack::getTimeBucket));
|
||||||
|
return combined;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (merged.isEmpty()) {
|
||||||
|
log.info("[CACHE-{}] No data in cache for window {}", originalStrategy, groupKey);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 뷰포트 필터
|
||||||
|
Map<String, List<VesselTrack>> viewportFiltered = filterByViewport(merged, request.getViewport());
|
||||||
|
|
||||||
|
// VesselTrack → CompactVesselTrack 변환
|
||||||
|
List<CompactVesselTrack> compactTracks = vesselTrackToCompactConverter.convert(viewportFiltered);
|
||||||
|
|
||||||
|
// 간소화 적용
|
||||||
|
applySimplification(compactTracks, request, originalStrategy, ctx);
|
||||||
|
|
||||||
|
// VesselAccumulator에 병합
|
||||||
|
Map<String, VesselAccumulator> mergedMap = new HashMap<>(compactTracks.size());
|
||||||
|
mergeTracks(compactTracks, mergedMap);
|
||||||
|
|
||||||
|
if (!mergedMap.isEmpty()) {
|
||||||
|
List<CompactVesselTrack> mergedTracks = mergedMap.entrySet().stream()
|
||||||
|
.map(entry -> {
|
||||||
|
VesselAccumulator acc = entry.getValue();
|
||||||
|
String nationalCode = acc.mmsi != null && acc.mmsi.length() >= 3
|
||||||
|
? acc.mmsi.substring(0, 3) : null;
|
||||||
|
double avgSpeed = calculateAvgSpeed(acc.totalDistance, acc.timestamps);
|
||||||
|
return CompactVesselTrack.builder()
|
||||||
|
.vesselId(acc.mmsi)
|
||||||
|
.nationalCode(nationalCode)
|
||||||
|
.shipName(acc.shipName)
|
||||||
|
.shipType(acc.shipType)
|
||||||
|
.shipKindCode(acc.shipKindCode)
|
||||||
|
.geometry(acc.geometry)
|
||||||
|
.timestamps(acc.timestamps)
|
||||||
|
.speeds(acc.speeds)
|
||||||
|
.totalDistance(acc.totalDistance)
|
||||||
|
.avgSpeed(avgSpeed)
|
||||||
|
.maxSpeed(acc.maxSpeed)
|
||||||
|
.pointCount(acc.pointCount)
|
||||||
|
.build();
|
||||||
|
})
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
uniqueVesselIds.addAll(mergedMap.keySet());
|
||||||
|
|
||||||
|
int totalPoints = mergedTracks.stream().mapToInt(CompactVesselTrack::getPointCount).sum();
|
||||||
|
log.info("[CACHE-{}] Time window {} - {} vessels, {} points",
|
||||||
|
originalStrategy, groupKey, mergedTracks.size(), totalPoints);
|
||||||
|
|
||||||
|
if (benchmark != null) {
|
||||||
|
benchmark.totalTracks += mergedTracks.size();
|
||||||
|
benchmark.totalPointsBefore += totalPoints;
|
||||||
|
benchmark.totalPointsAfter += totalPoints;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 청크 분할 및 전송 (백프레셔 포함)
|
||||||
|
List<List<CompactVesselTrack>> batches = splitByMessageSize(mergedTracks, queryId);
|
||||||
|
|
||||||
|
for (List<CompactVesselTrack> batch : batches) {
|
||||||
|
if (isQueryCancelled(queryId, null)) return;
|
||||||
|
|
||||||
|
TrackChunkResponse response = new TrackChunkResponse();
|
||||||
|
response.setQueryId(queryId);
|
||||||
|
response.setChunkIndex(ctx.currentGlobalChunkIndex++);
|
||||||
|
response.setIsLastChunk(false);
|
||||||
|
response.setTotalChunks(-1);
|
||||||
|
response.setCompactTracks(batch);
|
||||||
|
|
||||||
|
String timeKey = groupKey + "_" + originalStrategy;
|
||||||
|
ctx.processedTimeRanges.put(timeKey, groupRanges.stream()
|
||||||
|
.mapToInt(r -> (int) Duration.between(r.getStart(), r.getEnd()).toMinutes())
|
||||||
|
.sum());
|
||||||
|
int currentProcessedMin = ctx.processedTimeRanges.values().stream()
|
||||||
|
.mapToInt(Integer::intValue).sum();
|
||||||
|
response.setStats(createChunkStats(batch, uniqueVesselIds, currentProcessedMin, ctx));
|
||||||
|
|
||||||
|
int chunkSize = batch.stream().mapToInt(this::estimateTrackSize).sum();
|
||||||
|
long currentBufferSize = pendingBufferSize.addAndGet(chunkSize);
|
||||||
|
if (bpMetrics != null) {
|
||||||
|
bpMetrics.totalBytes.addAndGet(chunkSize);
|
||||||
|
bpMetrics.chunkCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 백프레셔 대기
|
||||||
|
int bpWait = 0;
|
||||||
|
while (pendingBufferSize.get() > MAX_PENDING_BUFFER) {
|
||||||
|
Thread.sleep(50);
|
||||||
|
bpWait++;
|
||||||
|
if (bpWait > 10 && bpMetrics != null && bpMetrics.dynamicChunkSizeKB > MIN_MESSAGE_SIZE_KB) {
|
||||||
|
bpMetrics.dynamicChunkSizeKB = Math.max(MIN_MESSAGE_SIZE_KB,
|
||||||
|
bpMetrics.dynamicChunkSizeKB - 512);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
chunkConsumer.accept(response);
|
||||||
|
} finally {
|
||||||
|
pendingBufferSize.addAndGet(-chunkSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
double timeProgress = ctx.estimatedTotalMinutes > 0
|
||||||
|
? (double) currentProcessedMin / ctx.estimatedTotalMinutes * 100 : 0;
|
||||||
|
statusConsumer.accept(new QueryStatusUpdate(
|
||||||
|
queryId, "PROCESSING",
|
||||||
|
"Processing chunk " + ctx.currentGlobalChunkIndex,
|
||||||
|
Math.min(99.0, timeProgress)));
|
||||||
|
|
||||||
|
if (benchmark != null) benchmark.totalBatches++;
|
||||||
|
}
|
||||||
|
batches.clear();
|
||||||
|
mergedTracks.clear();
|
||||||
|
}
|
||||||
|
mergedMap.clear();
|
||||||
|
|
||||||
|
// benchmark 갱신
|
||||||
|
if (benchmark != null) {
|
||||||
|
if (originalStrategy == TableStrategy.HOURLY) {
|
||||||
|
benchmark.cacheHourlyRanges += groupRanges.size();
|
||||||
|
} else {
|
||||||
|
benchmark.cacheFiveMinRanges += groupRanges.size();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 캐시 데이터에서 뷰포트 교차 선박 필터링 — JTS 파싱 없이 WKT 좌표 직접 파싱
|
||||||
|
*/
|
||||||
|
private Map<String, List<VesselTrack>> filterByViewport(
|
||||||
|
Map<String, List<VesselTrack>> tracksByMmsi, ViewportFilter viewport) {
|
||||||
|
if (viewport == null || !viewport.isValid() || tracksByMmsi.isEmpty()) {
|
||||||
|
return tracksByMmsi;
|
||||||
|
}
|
||||||
|
|
||||||
|
double minLon = viewport.getMinLon(), minLat = viewport.getMinLat();
|
||||||
|
double maxLon = viewport.getMaxLon(), maxLat = viewport.getMaxLat();
|
||||||
|
|
||||||
|
Map<String, List<VesselTrack>> filtered = new LinkedHashMap<>();
|
||||||
|
|
||||||
|
for (Map.Entry<String, List<VesselTrack>> entry : tracksByMmsi.entrySet()) {
|
||||||
|
boolean inViewport = false;
|
||||||
|
for (VesselTrack track : entry.getValue()) {
|
||||||
|
if (isTrackInViewport(track.getTrackGeom(), minLon, minLat, maxLon, maxLat)) {
|
||||||
|
inViewport = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (inViewport) {
|
||||||
|
filtered.put(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return filtered;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* WKT LineStringM 좌표 경량 파싱으로 뷰포트 bbox 교차 확인
|
||||||
|
*/
|
||||||
|
private boolean isTrackInViewport(String wkt, double minLon, double minLat, double maxLon, double maxLat) {
|
||||||
|
if (wkt == null || wkt.isEmpty()) return false;
|
||||||
|
|
||||||
|
int openParen = wkt.indexOf('(');
|
||||||
|
int closeParen = wkt.lastIndexOf(')');
|
||||||
|
if (openParen < 0 || closeParen <= openParen + 1) return false;
|
||||||
|
|
||||||
|
String coords = wkt.substring(openParen + 1, closeParen);
|
||||||
|
for (String point : coords.split(",")) {
|
||||||
|
String trimmed = point.trim();
|
||||||
|
int firstSpace = trimmed.indexOf(' ');
|
||||||
|
if (firstSpace <= 0) continue;
|
||||||
|
int secondSpace = trimmed.indexOf(' ', firstSpace + 1);
|
||||||
|
|
||||||
|
try {
|
||||||
|
double lon = Double.parseDouble(trimmed.substring(0, firstSpace));
|
||||||
|
double lat = Double.parseDouble(trimmed.substring(firstSpace + 1,
|
||||||
|
secondSpace > 0 ? secondSpace : trimmed.length()));
|
||||||
|
if (lon >= minLon && lon <= maxLon && lat >= minLat && lat <= maxLat) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
// skip malformed coordinate
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* CompactVesselTrack 리스트를 VesselAccumulator 맵에 병합
|
||||||
|
*/
|
||||||
|
private void mergeTracks(List<CompactVesselTrack> tracks, Map<String, VesselAccumulator> mergedMap) {
|
||||||
|
for (CompactVesselTrack track : tracks) {
|
||||||
|
String vesselId = track.getVesselId();
|
||||||
|
VesselAccumulator accumulator = mergedMap.get(vesselId);
|
||||||
|
|
||||||
|
if (accumulator == null) {
|
||||||
|
accumulator = new VesselAccumulator();
|
||||||
|
accumulator.mmsi = vesselId;
|
||||||
|
accumulator.shipName = track.getShipName();
|
||||||
|
accumulator.shipType = track.getShipType();
|
||||||
|
accumulator.shipKindCode = track.getShipKindCode();
|
||||||
|
mergedMap.put(vesselId, accumulator);
|
||||||
|
}
|
||||||
|
|
||||||
|
accumulator.geometry.addAll(track.getGeometry());
|
||||||
|
accumulator.timestamps.addAll(track.getTimestamps());
|
||||||
|
accumulator.speeds.addAll(track.getSpeeds());
|
||||||
|
accumulator.totalDistance += track.getTotalDistance();
|
||||||
|
accumulator.maxSpeed = Math.max(accumulator.maxSpeed, track.getMaxSpeed());
|
||||||
|
accumulator.pointCount += track.getPointCount();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* CompactVesselTrack 리스트에 전략/줌 레벨별 간소화 적용
|
||||||
|
*/
|
||||||
|
private void applySimplification(List<CompactVesselTrack> tracks, TrackQueryRequest request,
|
||||||
|
TableStrategy strategy, QueryContext ctx) {
|
||||||
|
if (tracks == null || tracks.isEmpty()) return;
|
||||||
|
|
||||||
|
for (CompactVesselTrack track : tracks) {
|
||||||
|
if (track.getGeometry() == null || track.getGeometry().size() <= 1) continue;
|
||||||
|
|
||||||
|
int originalSize = track.getGeometry().size();
|
||||||
|
List<double[]> simplifiedGeometry = new ArrayList<>();
|
||||||
|
List<String> simplifiedTimestamps = new ArrayList<>();
|
||||||
|
List<Double> simplifiedSpeeds = new ArrayList<>();
|
||||||
|
double[] prevPoint = null;
|
||||||
|
LocalDateTime prevTime = null;
|
||||||
|
|
||||||
|
for (int i = 0; i < track.getGeometry().size(); i++) {
|
||||||
|
double[] point = track.getGeometry().get(i);
|
||||||
|
LocalDateTime currentTime = null;
|
||||||
|
|
||||||
|
String tsStr = track.getTimestamps().get(i);
|
||||||
|
try {
|
||||||
|
if (tsStr.matches("\\d{10,}")) {
|
||||||
|
currentTime = LocalDateTime.ofInstant(
|
||||||
|
java.time.Instant.ofEpochSecond(Long.parseLong(tsStr)),
|
||||||
|
java.time.ZoneId.systemDefault());
|
||||||
|
} else {
|
||||||
|
currentTime = LocalDateTime.parse(tsStr, TIMESTAMP_FORMATTER);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
currentTime = LocalDateTime.now();
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean include = false;
|
||||||
|
|
||||||
|
if (i == 0 || i == track.getGeometry().size() - 1) {
|
||||||
|
include = true;
|
||||||
|
} else if (prevPoint != null && prevTime != null) {
|
||||||
|
double distance = calculateDistance(prevPoint[1], prevPoint[0], point[1], point[0]);
|
||||||
|
long minutesSincePrev = ChronoUnit.MINUTES.between(prevTime, currentTime);
|
||||||
|
|
||||||
|
if (strategy == TableStrategy.DAILY) {
|
||||||
|
include = distance > 0.05 || minutesSincePrev >= 60;
|
||||||
|
} else if (strategy == TableStrategy.HOURLY) {
|
||||||
|
include = distance > 1.08 || minutesSincePrev >= 60;
|
||||||
|
} else {
|
||||||
|
include = distance > 0.54 || minutesSincePrev >= 30;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (track.getAvgSpeed() > 0 && track.getAvgSpeed() < 5.0) {
|
||||||
|
include = distance > 1.5 || minutesSincePrev >= 45;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (include) {
|
||||||
|
simplifiedGeometry.add(point);
|
||||||
|
simplifiedTimestamps.add(tsStr);
|
||||||
|
simplifiedSpeeds.add(track.getSpeeds().size() > i ? track.getSpeeds().get(i) : 0.0);
|
||||||
|
prevPoint = point;
|
||||||
|
prevTime = currentTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 줌 레벨에 따른 추가 샘플링
|
||||||
|
if (request.getZoomLevel() != null && request.getZoomLevel() < 10 && simplifiedGeometry.size() > 2) {
|
||||||
|
int sampleRate = request.getZoomLevel() < 6 ? 10 :
|
||||||
|
request.getZoomLevel() < 8 ? 5 : 2;
|
||||||
|
|
||||||
|
List<double[]> sampledGeometry = new ArrayList<>();
|
||||||
|
List<String> sampledTimestamps = new ArrayList<>();
|
||||||
|
List<Double> sampledSpeeds = new ArrayList<>();
|
||||||
|
|
||||||
|
for (int i = 0; i < simplifiedGeometry.size(); i++) {
|
||||||
|
if (i % sampleRate == 0 || i == simplifiedGeometry.size() - 1) {
|
||||||
|
sampledGeometry.add(simplifiedGeometry.get(i));
|
||||||
|
sampledTimestamps.add(simplifiedTimestamps.get(i));
|
||||||
|
sampledSpeeds.add(simplifiedSpeeds.get(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
track.setGeometry(sampledGeometry);
|
||||||
|
track.setTimestamps(sampledTimestamps);
|
||||||
|
track.setSpeeds(sampledSpeeds);
|
||||||
|
track.setPointCount(sampledGeometry.size());
|
||||||
|
} else {
|
||||||
|
track.setGeometry(simplifiedGeometry);
|
||||||
|
track.setTimestamps(simplifiedTimestamps);
|
||||||
|
track.setSpeeds(simplifiedSpeeds);
|
||||||
|
track.setPointCount(simplifiedGeometry.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ctx != null && ctx.vesselLogCount < 10 && originalSize > track.getPointCount()) {
|
||||||
|
double reductionRate = (1 - (double) track.getPointCount() / originalSize) * 100;
|
||||||
|
log.info("[CACHE-SIMPLIFY] Vessel {} simplified: {} -> {} points ({}% reduced, zoom: {})",
|
||||||
|
track.getVesselId(), originalSize, track.getPointCount(),
|
||||||
|
Math.round(reductionRate), request.getZoomLevel());
|
||||||
|
ctx.vesselLogCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -160,7 +160,8 @@ public class QueryMetricsController {
|
|||||||
@GetMapping("/timeseries")
|
@GetMapping("/timeseries")
|
||||||
@Operation(summary = "쿼리 메트릭 시계열", description = "시간별/일별 버킷 집계 + Top 10 클라이언트")
|
@Operation(summary = "쿼리 메트릭 시계열", description = "시간별/일별 버킷 집계 + Top 10 클라이언트")
|
||||||
public Map<String, Object> getTimeSeries(
|
public Map<String, Object> getTimeSeries(
|
||||||
@Parameter(description = "조회 기간 (일)") @RequestParam(defaultValue = "7") int days) {
|
@Parameter(description = "조회 기간 (일)") @RequestParam(defaultValue = "7") int days,
|
||||||
|
@Parameter(description = "Top 클라이언트 그룹 기준 (ip | id)") @RequestParam(defaultValue = "ip") String groupBy) {
|
||||||
|
|
||||||
days = Math.min(days, 90);
|
days = Math.min(days, 90);
|
||||||
String granularity = days <= 7 ? "HOURLY" : "DAILY";
|
String granularity = days <= 7 ? "HOURLY" : "DAILY";
|
||||||
@ -184,15 +185,17 @@ public class QueryMetricsController {
|
|||||||
|
|
||||||
List<Map<String, Object>> buckets = queryJdbcTemplate.queryForList(bucketSql);
|
List<Map<String, Object>> buckets = queryJdbcTemplate.queryForList(bucketSql);
|
||||||
|
|
||||||
|
boolean groupById = "id".equalsIgnoreCase(groupBy);
|
||||||
|
String clientColumn = groupById ? "client_id" : "client_ip";
|
||||||
String topClientsSql = """
|
String topClientsSql = """
|
||||||
SELECT client_ip, COUNT(*) AS query_count,
|
SELECT %s AS client, COUNT(*) AS query_count,
|
||||||
COALESCE(AVG(elapsed_ms), 0) AS avg_elapsed_ms
|
COALESCE(AVG(elapsed_ms), 0) AS avg_elapsed_ms
|
||||||
FROM signal.t_query_metrics
|
FROM signal.t_query_metrics
|
||||||
WHERE created_at >= NOW() - INTERVAL '%d days'
|
WHERE created_at >= NOW() - INTERVAL '%d days'
|
||||||
AND client_ip IS NOT NULL
|
AND %s IS NOT NULL
|
||||||
GROUP BY client_ip
|
GROUP BY %s
|
||||||
ORDER BY query_count DESC LIMIT 10
|
ORDER BY query_count DESC LIMIT 10
|
||||||
""".formatted(days);
|
""".formatted(clientColumn, days, clientColumn, clientColumn);
|
||||||
|
|
||||||
List<Map<String, Object>> topClients = queryJdbcTemplate.queryForList(topClientsSql);
|
List<Map<String, Object>> topClients = queryJdbcTemplate.queryForList(topClientsSql);
|
||||||
|
|
||||||
@ -200,6 +203,7 @@ public class QueryMetricsController {
|
|||||||
result.put("buckets", buckets);
|
result.put("buckets", buckets);
|
||||||
result.put("topClients", topClients);
|
result.put("topClients", topClients);
|
||||||
result.put("granularity", granularity);
|
result.put("granularity", granularity);
|
||||||
|
result.put("groupBy", groupById ? "id" : "ip");
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -32,7 +32,7 @@ public class QueryMetricsBufferService {
|
|||||||
unique_vessels, total_tracks, total_points, points_after_simplify,
|
unique_vessels, total_tracks, total_points, points_after_simplify,
|
||||||
total_chunks, response_bytes,
|
total_chunks, response_bytes,
|
||||||
elapsed_ms, db_query_ms, simplify_ms, backpressure_events,
|
elapsed_ms, db_query_ms, simplify_ms, backpressure_events,
|
||||||
status, client_ip
|
status, client_ip, client_id
|
||||||
) VALUES (
|
) VALUES (
|
||||||
?, ?, ?, now(),
|
?, ?, ?, now(),
|
||||||
?, ?, ?, ?, ?,
|
?, ?, ?, ?, ?,
|
||||||
@ -40,7 +40,7 @@ public class QueryMetricsBufferService {
|
|||||||
?, ?, ?, ?,
|
?, ?, ?, ?,
|
||||||
?, ?,
|
?, ?,
|
||||||
?, ?, ?, ?,
|
?, ?, ?, ?,
|
||||||
?, ?
|
?, ?, ?
|
||||||
)
|
)
|
||||||
""";
|
""";
|
||||||
|
|
||||||
@ -73,6 +73,27 @@ public class QueryMetricsBufferService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
void ensureClientIdColumn() {
|
||||||
|
try {
|
||||||
|
queryJdbcTemplate.execute("""
|
||||||
|
DO $$
|
||||||
|
BEGIN
|
||||||
|
IF NOT EXISTS (
|
||||||
|
SELECT 1 FROM information_schema.columns
|
||||||
|
WHERE table_schema = 'signal' AND table_name = 't_query_metrics' AND column_name = 'client_id'
|
||||||
|
) THEN
|
||||||
|
ALTER TABLE signal.t_query_metrics ADD COLUMN client_id VARCHAR(100);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_query_metrics_client_id ON signal.t_query_metrics(client_id, created_at);
|
||||||
|
END IF;
|
||||||
|
END $$
|
||||||
|
""");
|
||||||
|
log.info("t_query_metrics client_id column ensured");
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("Failed to ensure client_id column: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 메트릭 레코드를 버퍼에 추가 (lock-free)
|
* 메트릭 레코드를 버퍼에 추가 (lock-free)
|
||||||
*/
|
*/
|
||||||
@ -119,7 +140,7 @@ public class QueryMetricsBufferService {
|
|||||||
m.getUniqueVessels(), m.getTotalTracks(), m.getTotalPoints(), m.getPointsAfterSimplify(),
|
m.getUniqueVessels(), m.getTotalTracks(), m.getTotalPoints(), m.getPointsAfterSimplify(),
|
||||||
m.getTotalChunks(), m.getResponseBytes(),
|
m.getTotalChunks(), m.getResponseBytes(),
|
||||||
m.getElapsedMs(), m.getDbQueryMs(), m.getSimplifyMs(), m.getBackpressureEvents(),
|
m.getElapsedMs(), m.getDbQueryMs(), m.getSimplifyMs(), m.getBackpressureEvents(),
|
||||||
m.getStatus(), m.getClientIp()
|
m.getStatus(), m.getClientIp(), m.getClientId()
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -131,5 +131,6 @@ public class QueryMetricsService {
|
|||||||
private final int backpressureEvents;
|
private final int backpressureEvents;
|
||||||
private final String status;
|
private final String status;
|
||||||
private final String clientIp;
|
private final String clientIp;
|
||||||
|
private final String clientId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
불러오는 중...
Reference in New Issue
Block a user