Merge pull request 'feat: 캐시 O(1) 조회 + 메모리 예산 관리 + L2 블록 간소화 포팅' (#86) from feature/perf-cache-optimization into develop
This commit is contained in:
커밋
047117033b
@ -4,5 +4,11 @@
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### 추가
|
||||
- L1/L2/L3 캐시 O(1) 키 기반 직접 조회 (전체 스캔 O(n) 대체)
|
||||
- 64GB JVM 메모리 예산 논리적 파티셔닝 (캐시 35GB / 쿼리 20GB / 시스템 9GB)
|
||||
- L2 HourlyTrackCache 6시간 경과 엔트리 Nth-point 간소화 스케줄러
|
||||
- 메모리 예산 모니터링 API (`GET /api/monitoring/cache/budget`)
|
||||
|
||||
### 기타
|
||||
- settings.json에 CLAUDE_BOT_TOKEN 환경변수 추가
|
||||
|
||||
@ -94,6 +94,50 @@ public class FiveMinTrackCache {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 요청된 MMSI 키로 직접 O(1) 조회 — mmsi×5minBucket 조합으로 Caffeine getIfPresent() 호출
|
||||
* 기존 getTracksInRange()의 전체 스캔(O(n)) 대비 대폭 성능 개선.
|
||||
* 예: 1시간 × 100 MMSI = 1,200회 get() vs 최대 1.5M 엔트리 스캔
|
||||
*/
|
||||
public Map<String, List<VesselTrack>> getTracksForVessels(
|
||||
LocalDateTime start, LocalDateTime end, Set<String> mmsiKeys) {
|
||||
if (mmsiKeys == null || mmsiKeys.isEmpty()) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
Map<String, List<VesselTrack>> result = new LinkedHashMap<>();
|
||||
|
||||
// 5분 단위 버킷 정렬 (start를 가장 가까운 5분 바닥으로 정렬)
|
||||
int startMinute = (start.getMinute() / 5) * 5;
|
||||
LocalDateTime bucket = start.withMinute(startMinute).withSecond(0).withNano(0);
|
||||
|
||||
int lookupCount = 0;
|
||||
int hitCount = 0;
|
||||
|
||||
while (!bucket.isAfter(end) && bucket.isBefore(end)) {
|
||||
for (String mmsi : mmsiKeys) {
|
||||
String key = buildKey(mmsi, bucket);
|
||||
VesselTrack track = cache.getIfPresent(key);
|
||||
lookupCount++;
|
||||
if (track != null) {
|
||||
result.computeIfAbsent(mmsi, k -> new ArrayList<>()).add(track);
|
||||
hitCount++;
|
||||
}
|
||||
}
|
||||
bucket = bucket.plusMinutes(5);
|
||||
}
|
||||
|
||||
// MMSI별 시간순 정렬
|
||||
for (List<VesselTrack> tracks : result.values()) {
|
||||
tracks.sort(Comparator.comparing(VesselTrack::getTimeBucket));
|
||||
}
|
||||
|
||||
int totalTracks = result.values().stream().mapToInt(List::size).sum();
|
||||
log.info("[CACHE-MONITOR] L1.getTracksForVessels [{}, {}): requestedMmsi={}, lookups={}, hits={}, resultMmsi={}, tracks={}",
|
||||
start, end, mmsiKeys.size(), lookupCount, hitCount, result.size(), totalTracks);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 지정 시간 범위의 캐시 항목 제거 (hourly merge 완료 후 호출)
|
||||
*/
|
||||
|
||||
@ -11,6 +11,7 @@ import org.springframework.stereotype.Component;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
@ -31,6 +32,9 @@ public class HourlyTrackCache {
|
||||
|
||||
private Cache<String, VesselTrack> cache;
|
||||
|
||||
// 간소화 완료 추적 (시간 버킷 단위, 중복 간소화 방지)
|
||||
private final Set<LocalDateTime> simplifiedBuckets = ConcurrentHashMap.newKeySet();
|
||||
|
||||
@Value("${app.cache.hourly-track.ttl-hours:26}")
|
||||
private long ttlHours;
|
||||
|
||||
@ -93,6 +97,47 @@ public class HourlyTrackCache {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 요청된 MMSI 키로 직접 O(1) 조회 — mmsi×hourBucket 조합으로 Caffeine getIfPresent() 호출
|
||||
* 기존 getTracksInRange()의 전체 스캔(O(n)) 대비 대폭 성능 개선.
|
||||
* 예: 24시간 × 100 MMSI = 2,400회 get() vs 최대 7M 엔트리 스캔
|
||||
*/
|
||||
public Map<String, List<VesselTrack>> getTracksForVessels(
|
||||
LocalDateTime start, LocalDateTime end, Set<String> mmsiKeys) {
|
||||
if (mmsiKeys == null || mmsiKeys.isEmpty()) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
Map<String, List<VesselTrack>> result = new LinkedHashMap<>();
|
||||
LocalDateTime bucket = start.withMinute(0).withSecond(0).withNano(0);
|
||||
|
||||
int lookupCount = 0;
|
||||
int hitCount = 0;
|
||||
|
||||
while (!bucket.isAfter(end) && bucket.isBefore(end)) {
|
||||
for (String mmsi : mmsiKeys) {
|
||||
String key = buildKey(mmsi, bucket);
|
||||
VesselTrack track = cache.getIfPresent(key);
|
||||
lookupCount++;
|
||||
if (track != null) {
|
||||
result.computeIfAbsent(mmsi, k -> new ArrayList<>()).add(track);
|
||||
hitCount++;
|
||||
}
|
||||
}
|
||||
bucket = bucket.plusHours(1);
|
||||
}
|
||||
|
||||
// MMSI별 시간순 정렬
|
||||
for (List<VesselTrack> tracks : result.values()) {
|
||||
tracks.sort(Comparator.comparing(VesselTrack::getTimeBucket));
|
||||
}
|
||||
|
||||
int totalTracks = result.values().stream().mapToInt(List::size).sum();
|
||||
log.info("[CACHE-MONITOR] L2.getTracksForVessels [{}, {}): requestedMmsi={}, lookups={}, hits={}, resultMmsi={}, tracks={}",
|
||||
start, end, mmsiKeys.size(), lookupCount, hitCount, result.size(), totalTracks);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 지정 시간 범위의 캐시 항목 제거 (daily merge 완료 후 호출)
|
||||
*/
|
||||
@ -109,6 +154,74 @@ public class HourlyTrackCache {
|
||||
start, end, before - after, before, after, getStats());
|
||||
}
|
||||
|
||||
/**
|
||||
* 6시간 이상 경과한 캐시 엔트리의 WKT LineStringM을 간소화.
|
||||
* 매 sampleRate번째 포인트만 유지 (첫/마지막 항상 보존).
|
||||
* 이미 간소화된 시간 버킷은 스킵하여 중복 간소화 방지.
|
||||
*
|
||||
* @param hoursAgo 간소화 대상 경과 시간 (시)
|
||||
* @param sampleRate 샘플링 비율 (2 = 매 2번째 포인트만 유지 → ~50% 감소)
|
||||
* @return 간소화된 엔트리 수
|
||||
*/
|
||||
public int simplifyOlderThan(int hoursAgo, int sampleRate) {
|
||||
LocalDateTime threshold = LocalDateTime.now().minusHours(hoursAgo);
|
||||
int simplified = 0;
|
||||
int totalOriginal = 0;
|
||||
int totalAfter = 0;
|
||||
int skipped = 0;
|
||||
|
||||
for (Map.Entry<String, VesselTrack> entry : cache.asMap().entrySet()) {
|
||||
VesselTrack track = entry.getValue();
|
||||
if (track.getTimeBucket() == null || !track.getTimeBucket().isBefore(threshold)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 이미 간소화된 시간 버킷이면 스킵
|
||||
if (simplifiedBuckets.contains(track.getTimeBucket())) {
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
String wkt = track.getTrackGeom();
|
||||
if (wkt == null || track.getPointCount() == null || track.getPointCount() <= 3) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int originalCount = track.getPointCount();
|
||||
String simplifiedWkt = simplifyLineStringM(wkt, sampleRate);
|
||||
if (simplifiedWkt != null && !simplifiedWkt.equals(wkt)) {
|
||||
track.setTrackGeom(simplifiedWkt);
|
||||
int newCount = countWktPoints(simplifiedWkt);
|
||||
totalOriginal += originalCount;
|
||||
totalAfter += newCount;
|
||||
track.setPointCount(newCount);
|
||||
simplified++;
|
||||
}
|
||||
}
|
||||
|
||||
// 간소화 완료된 시간 버킷 기록 (threshold 이전 모든 정각 버킷)
|
||||
LocalDateTime bucket = threshold.withMinute(0).withSecond(0).withNano(0);
|
||||
LocalDateTime oldest = LocalDateTime.now().minusHours(ttlHours + 1);
|
||||
while (!bucket.isBefore(oldest)) {
|
||||
simplifiedBuckets.add(bucket);
|
||||
bucket = bucket.minusHours(1);
|
||||
}
|
||||
|
||||
// 만료된 버킷 추적 정리
|
||||
simplifiedBuckets.removeIf(b -> b.isBefore(oldest));
|
||||
|
||||
if (simplified > 0) {
|
||||
double reduction = totalOriginal > 0 ? (1 - (double) totalAfter / totalOriginal) * 100 : 0;
|
||||
log.info("[CACHE-SIMPLIFY] L2 간소화: entries={}, skipped={}, points {} -> {} ({}% 감소), threshold={}h",
|
||||
simplified, skipped, totalOriginal, totalAfter,
|
||||
String.format("%.1f", reduction), hoursAgo);
|
||||
} else {
|
||||
log.debug("[CACHE-SIMPLIFY] L2 간소화 대상 없음: skipped={}, threshold={}h", skipped, hoursAgo);
|
||||
}
|
||||
|
||||
return simplified;
|
||||
}
|
||||
|
||||
public long size() {
|
||||
return cache.estimatedSize();
|
||||
}
|
||||
@ -136,4 +249,48 @@ public class HourlyTrackCache {
|
||||
private String buildKey(String mmsi, LocalDateTime timeBucket) {
|
||||
return mmsi + "::" + timeBucket.format(KEY_FORMATTER);
|
||||
}
|
||||
|
||||
/**
|
||||
* WKT LineStringM에서 매 sampleRate번째 포인트만 유지.
|
||||
* 첫 포인트와 마지막 포인트는 항상 보존.
|
||||
*
|
||||
* 입력 형식: "LINESTRING M(lon1 lat1 m1,lon2 lat2 m2,...)"
|
||||
* 또는 "LINESTRINGM(lon1 lat1 m1,lon2 lat2 m2,...)"
|
||||
*/
|
||||
static String simplifyLineStringM(String wkt, int sampleRate) {
|
||||
if (wkt == null || sampleRate <= 1) return wkt;
|
||||
|
||||
int openParen = wkt.indexOf('(');
|
||||
int closeParen = wkt.lastIndexOf(')');
|
||||
if (openParen < 0 || closeParen < 0 || closeParen <= openParen + 1) return wkt;
|
||||
|
||||
String prefix = wkt.substring(0, openParen + 1);
|
||||
String coords = wkt.substring(openParen + 1, closeParen);
|
||||
|
||||
String[] points = coords.split(",");
|
||||
if (points.length <= 3) return wkt;
|
||||
|
||||
StringBuilder sb = new StringBuilder(prefix);
|
||||
for (int i = 0; i < points.length; i++) {
|
||||
if (i == 0 || i == points.length - 1 || i % sampleRate == 0) {
|
||||
if (sb.length() > prefix.length()) {
|
||||
sb.append(',');
|
||||
}
|
||||
sb.append(points[i]);
|
||||
}
|
||||
}
|
||||
sb.append(')');
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
static int countWktPoints(String wkt) {
|
||||
if (wkt == null) return 0;
|
||||
int openParen = wkt.indexOf('(');
|
||||
int closeParen = wkt.lastIndexOf(')');
|
||||
if (openParen < 0 || closeParen < 0 || closeParen <= openParen + 1) return 0;
|
||||
String coords = wkt.substring(openParen + 1, closeParen);
|
||||
if (coords.isBlank()) return 0;
|
||||
return coords.split(",").length;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,46 @@
|
||||
package gc.mda.signal_batch.batch.reader;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* L2 HourlyTrackCache 간소화 스케줄러
|
||||
*
|
||||
* 6시간 이상 경과한 캐시 엔트리의 WKT LineStringM을 Nth-point 샘플링으로 간소화.
|
||||
* 기본 스케줄: 06:30, 12:30, 18:30 (1일 3회)
|
||||
*
|
||||
* 간소화 효과: sampleRate=2 기준 ~50% 포인트 감소 → L2 메모리 절약
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "vessel.batch.cache.hourly-simplification.enabled", havingValue = "true")
|
||||
public class HourlyTrackSimplifier {
|
||||
|
||||
private final HourlyTrackCache hourlyTrackCache;
|
||||
|
||||
@Value("${vessel.batch.cache.hourly-simplification.hours-ago:6}")
|
||||
private int hoursAgo;
|
||||
|
||||
@Value("${vessel.batch.cache.hourly-simplification.sample-rate:2}")
|
||||
private int sampleRate;
|
||||
|
||||
public HourlyTrackSimplifier(HourlyTrackCache hourlyTrackCache) {
|
||||
this.hourlyTrackCache = hourlyTrackCache;
|
||||
}
|
||||
|
||||
@Scheduled(cron = "${vessel.batch.cache.hourly-simplification.cron:0 30 6,12,18 * * *}")
|
||||
public void scheduledSimplification() {
|
||||
log.info("[HourlySimplifier] 스케줄 간소화 시작 — hoursAgo={}, sampleRate={}, cacheSize={}",
|
||||
hoursAgo, sampleRate, hourlyTrackCache.size());
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
int simplified = hourlyTrackCache.simplifyOlderThan(hoursAgo, sampleRate);
|
||||
|
||||
long elapsed = System.currentTimeMillis() - start;
|
||||
log.info("[HourlySimplifier] 스케줄 간소화 완료 — simplified={}, elapsed={}ms, cacheSize={}",
|
||||
simplified, elapsed, hourlyTrackCache.size());
|
||||
}
|
||||
}
|
||||
@ -9,12 +9,15 @@ import gc.mda.signal_batch.domain.vessel.model.AisTargetEntity;
|
||||
import gc.mda.signal_batch.domain.vessel.dto.TrackResponse;
|
||||
import gc.mda.signal_batch.domain.vessel.dto.VesselTracksRequest;
|
||||
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
|
||||
import gc.mda.signal_batch.global.exception.MemoryBudgetExceededException;
|
||||
import gc.mda.signal_batch.global.exception.QueryTimeoutException;
|
||||
import gc.mda.signal_batch.global.util.TrackConverter;
|
||||
import gc.mda.signal_batch.global.util.TrackMemoryEstimator;
|
||||
import gc.mda.signal_batch.global.util.VesselTrackToCompactConverter;
|
||||
import gc.mda.signal_batch.global.websocket.service.ActiveQueryManager;
|
||||
import gc.mda.signal_batch.global.websocket.service.CacheTrackSimplifier;
|
||||
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
|
||||
import gc.mda.signal_batch.global.websocket.service.TrackMemoryBudgetManager;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
@ -52,6 +55,7 @@ public class GisServiceV2 {
|
||||
private final VesselTrackToCompactConverter vesselTrackToCompactConverter;
|
||||
private final ChnPrmShipCacheManager chnPrmShipCacheManager;
|
||||
private final ChnPrmShipProperties chnPrmShipProperties;
|
||||
private final TrackMemoryBudgetManager memoryBudgetManager;
|
||||
|
||||
@Value("${rest.v2.query.timeout-seconds:30}")
|
||||
private int restQueryTimeout;
|
||||
@ -72,7 +76,8 @@ public class GisServiceV2 {
|
||||
FiveMinTrackCache fiveMinTrackCache,
|
||||
VesselTrackToCompactConverter vesselTrackToCompactConverter,
|
||||
ChnPrmShipCacheManager chnPrmShipCacheManager,
|
||||
ChnPrmShipProperties chnPrmShipProperties) {
|
||||
ChnPrmShipProperties chnPrmShipProperties,
|
||||
TrackMemoryBudgetManager memoryBudgetManager) {
|
||||
this.queryDataSource = queryDataSource;
|
||||
this.activeQueryManager = activeQueryManager;
|
||||
this.dailyTrackCacheManager = dailyTrackCacheManager;
|
||||
@ -83,6 +88,7 @@ public class GisServiceV2 {
|
||||
this.vesselTrackToCompactConverter = vesselTrackToCompactConverter;
|
||||
this.chnPrmShipCacheManager = chnPrmShipCacheManager;
|
||||
this.chnPrmShipProperties = chnPrmShipProperties;
|
||||
this.memoryBudgetManager = memoryBudgetManager;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -277,10 +283,24 @@ public class GisServiceV2 {
|
||||
public List<CompactVesselTrack> getVesselTracksV2(VesselTracksRequest request) {
|
||||
String queryId = "rest-vessels-" + UUID.randomUUID().toString().substring(0, 8);
|
||||
boolean slotAcquired = false;
|
||||
boolean memoryReserved = false;
|
||||
|
||||
try {
|
||||
slotAcquired = acquireSlotWithWait(queryId);
|
||||
|
||||
// 쿼리 메모리 사전 예약
|
||||
int days = (int) java.time.Duration.between(request.getStartTime(), request.getEndTime()).toDays() + 1;
|
||||
long estimatedBytes = TrackMemoryEstimator.estimateQueryBytes(days, request.getVessels().size());
|
||||
try {
|
||||
memoryBudgetManager.reserveQueryMemory(queryId, estimatedBytes,
|
||||
memoryBudgetManager.getProperties().getQueueTimeoutSeconds() * 1000L);
|
||||
memoryReserved = true;
|
||||
} catch (MemoryBudgetExceededException e) {
|
||||
log.warn("[MemoryBudget] REST 쿼리 메모리 예약 실패: queryId={}, estimated={}MB — {}",
|
||||
queryId, estimatedBytes / (1024 * 1024), e.getMessage());
|
||||
throw e;
|
||||
}
|
||||
|
||||
List<CompactVesselTrack> result;
|
||||
|
||||
if (dailyTrackCacheManager.isEnabled() &&
|
||||
@ -306,6 +326,9 @@ public class GisServiceV2 {
|
||||
return result;
|
||||
|
||||
} finally {
|
||||
if (memoryReserved) {
|
||||
memoryBudgetManager.releaseQueryMemory(queryId);
|
||||
}
|
||||
if (slotAcquired) {
|
||||
activeQueryManager.releaseQuerySlot(queryId);
|
||||
if (activeQueryManager.isHeapPressureHigh()) {
|
||||
@ -328,24 +351,16 @@ public class GisServiceV2 {
|
||||
|
||||
Set<String> requestedMmsis = new HashSet<>(request.getVessels());
|
||||
|
||||
// 1. 캐시에서 조회 (캐시된 날짜) + 누락 MMSI 부분 DB fallback
|
||||
// 1. L3 캐시에서 요청 MMSI만 O(1) 직접 조회 + 누락 MMSI 부분 DB fallback
|
||||
if (split.hasCachedData()) {
|
||||
List<CompactVesselTrack> cachedTracks =
|
||||
dailyTrackCacheManager.getCachedTracksMultipleDays(split.getCachedDates());
|
||||
|
||||
int totalCachedCount = cachedTracks.size();
|
||||
List<CompactVesselTrack> filteredCached = cachedTracks.stream()
|
||||
.filter(t -> requestedMmsis.contains(t.getVesselId()))
|
||||
.map(t -> t.toBuilder().build())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
cachedTracks.clear();
|
||||
List<CompactVesselTrack> filteredCached =
|
||||
dailyTrackCacheManager.getCachedTracksForVessels(split.getCachedDates(), requestedMmsis);
|
||||
|
||||
allTracks.addAll(filteredCached);
|
||||
log.debug("[CacheQuery] cached {} days -> {} tracks (filtered from {})",
|
||||
split.getCachedDates().size(), filteredCached.size(), totalCachedCount);
|
||||
log.debug("[CacheQuery] cached {} days -> {} tracks (key-based lookup, {} MMSI requested)",
|
||||
split.getCachedDates().size(), filteredCached.size(), requestedMmsis.size());
|
||||
|
||||
// Daily 캐시에 없는 MMSI → DB fallback (hourly/5min 계층 조회)
|
||||
// Daily 캐시에 없는 MMSI → DB fallback
|
||||
Set<String> cachedMmsis = filteredCached.stream()
|
||||
.map(CompactVesselTrack::getVesselId)
|
||||
.collect(Collectors.toSet());
|
||||
@ -383,23 +398,22 @@ public class GisServiceV2 {
|
||||
}
|
||||
}
|
||||
|
||||
// 3-a. hourly 범위 → L2 캐시 → DB fallback (누락 MMSI 부분 fallback 포함)
|
||||
// 3-a. hourly 범위 → L2 캐시 O(1) 키 기반 조회 → DB fallback (누락 MMSI)
|
||||
if (split.hasHourlyRange()) {
|
||||
DailyTrackCacheManager.DateRange hr = split.getHourlyRange();
|
||||
Map<String, List<VesselTrack>> hourlyTracks =
|
||||
hourlyTrackCache.getTracksInRange(hr.getStart(), hr.getEnd());
|
||||
hourlyTrackCache.getTracksForVessels(hr.getStart(), hr.getEnd(), requestedMmsis);
|
||||
|
||||
if (!hourlyTracks.isEmpty()) {
|
||||
Map<String, List<VesselTrack>> filtered = filterByMmsi(hourlyTracks, requestedMmsis);
|
||||
List<CompactVesselTrack> converted = vesselTrackToCompactConverter.convert(filtered);
|
||||
List<CompactVesselTrack> converted = vesselTrackToCompactConverter.convert(hourlyTracks);
|
||||
allTracks.addAll(converted);
|
||||
int totalPts = converted.stream().mapToInt(CompactVesselTrack::getPointCount).sum();
|
||||
log.info("[CACHE-MONITOR] queryWithCache L2 HIT [{}, {}): cacheVessels={}, filteredVessels={}, compactTracks={}, points={}",
|
||||
hr.getStart(), hr.getEnd(), hourlyTracks.size(), filtered.size(), converted.size(), totalPts);
|
||||
log.info("[CACHE-MONITOR] queryWithCache L2 HIT [{}, {}): resultVessels={}, compactTracks={}, points={}",
|
||||
hr.getStart(), hr.getEnd(), hourlyTracks.size(), converted.size(), totalPts);
|
||||
|
||||
// 캐시에 없는 MMSI → DB fallback
|
||||
Set<String> missingMmsis = new HashSet<>(requestedMmsis);
|
||||
missingMmsis.removeAll(filtered.keySet());
|
||||
missingMmsis.removeAll(hourlyTracks.keySet());
|
||||
if (!missingMmsis.isEmpty()) {
|
||||
VesselTracksRequest fallbackReq = VesselTracksRequest.builder()
|
||||
.startTime(hr.getStart()).endTime(hr.getEnd())
|
||||
@ -407,7 +421,7 @@ public class GisServiceV2 {
|
||||
List<CompactVesselTrack> dbResult = gisService.getVesselTracks(fallbackReq);
|
||||
allTracks.addAll(dbResult);
|
||||
log.info("[CACHE-MONITOR] queryWithCache L2 PARTIAL → DB fallback: cacheHit={}, cacheMiss={}, dbTracks={}",
|
||||
filtered.size(), missingMmsis.size(), dbResult.size());
|
||||
hourlyTracks.size(), missingMmsis.size(), dbResult.size());
|
||||
}
|
||||
} else {
|
||||
VesselTracksRequest hourlyReq = VesselTracksRequest.builder()
|
||||
@ -420,23 +434,22 @@ public class GisServiceV2 {
|
||||
}
|
||||
}
|
||||
|
||||
// 3-b. 5min 범위 → L1 캐시 → DB fallback (누락 MMSI 부분 fallback 포함)
|
||||
// 3-b. 5min 범위 → L1 캐시 O(1) 키 기반 조회 → DB fallback (누락 MMSI)
|
||||
if (split.hasFiveMinRange()) {
|
||||
DailyTrackCacheManager.DateRange fr = split.getFiveMinRange();
|
||||
Map<String, List<VesselTrack>> fiveMinTracks =
|
||||
fiveMinTrackCache.getTracksInRange(fr.getStart(), fr.getEnd());
|
||||
fiveMinTrackCache.getTracksForVessels(fr.getStart(), fr.getEnd(), requestedMmsis);
|
||||
|
||||
if (!fiveMinTracks.isEmpty()) {
|
||||
Map<String, List<VesselTrack>> filtered = filterByMmsi(fiveMinTracks, requestedMmsis);
|
||||
List<CompactVesselTrack> converted = vesselTrackToCompactConverter.convert(filtered);
|
||||
List<CompactVesselTrack> converted = vesselTrackToCompactConverter.convert(fiveMinTracks);
|
||||
allTracks.addAll(converted);
|
||||
int totalPts = converted.stream().mapToInt(CompactVesselTrack::getPointCount).sum();
|
||||
log.info("[CACHE-MONITOR] queryWithCache L1 HIT [{}, {}): cacheVessels={}, filteredVessels={}, compactTracks={}, points={}",
|
||||
fr.getStart(), fr.getEnd(), fiveMinTracks.size(), filtered.size(), converted.size(), totalPts);
|
||||
log.info("[CACHE-MONITOR] queryWithCache L1 HIT [{}, {}): resultVessels={}, compactTracks={}, points={}",
|
||||
fr.getStart(), fr.getEnd(), fiveMinTracks.size(), converted.size(), totalPts);
|
||||
|
||||
// 캐시에 없는 MMSI → DB fallback
|
||||
Set<String> missingMmsis = new HashSet<>(requestedMmsis);
|
||||
missingMmsis.removeAll(filtered.keySet());
|
||||
missingMmsis.removeAll(fiveMinTracks.keySet());
|
||||
if (!missingMmsis.isEmpty()) {
|
||||
VesselTracksRequest fallbackReq = VesselTracksRequest.builder()
|
||||
.startTime(fr.getStart()).endTime(fr.getEnd())
|
||||
@ -444,7 +457,7 @@ public class GisServiceV2 {
|
||||
List<CompactVesselTrack> dbResult = gisService.getVesselTracks(fallbackReq);
|
||||
allTracks.addAll(dbResult);
|
||||
log.info("[CACHE-MONITOR] queryWithCache L1 PARTIAL → DB fallback: cacheHit={}, cacheMiss={}, dbTracks={}",
|
||||
filtered.size(), missingMmsis.size(), dbResult.size());
|
||||
fiveMinTracks.size(), missingMmsis.size(), dbResult.size());
|
||||
}
|
||||
} else {
|
||||
VesselTracksRequest fiveMinReq = VesselTracksRequest.builder()
|
||||
|
||||
@ -0,0 +1,45 @@
|
||||
package gc.mda.signal_batch.global.config;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 항적 데이터 메모리 예산 설정
|
||||
*
|
||||
* 64GB JVM 힙 기준 파티셔닝:
|
||||
* 캐시 35GB (55%) — L1/L2/L3
|
||||
* 쿼리 20GB (31%) — REST/WebSocket 동시 쿼리
|
||||
* 시스템 9GB (14%) — GC, 스레드스택, Spring 컨텍스트 (미추적)
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@Component
|
||||
@ConfigurationProperties(prefix = "track.memory-budget")
|
||||
public class TrackMemoryBudgetProperties {
|
||||
|
||||
/** 전체 JVM 힙 예산 (GB) */
|
||||
private int totalBudgetGb = 64;
|
||||
|
||||
/** 캐시 전용 예산 (GB) — L1+L2+L3 전체 */
|
||||
private int cacheBudgetGb = 35;
|
||||
|
||||
/** 쿼리 응답 전용 예산 (GB) */
|
||||
private int queryBudgetGb = 20;
|
||||
|
||||
/** 단일 쿼리 최대 메모리 (GB) */
|
||||
private int maxSingleQueryGb = 5;
|
||||
|
||||
/** 메모리 추정 보정 계수 (실측 기반) */
|
||||
private double estimationCorrectionFactor = 1.8;
|
||||
|
||||
/** 쿼리 메모리 대기 큐 타임아웃 (초) */
|
||||
private int queueTimeoutSeconds = 60;
|
||||
|
||||
/** 예산 경고 임계값 (0.0~1.0) */
|
||||
private double warningThreshold = 0.8;
|
||||
|
||||
/** 예산 위험 임계값 (0.0~1.0) */
|
||||
private double criticalThreshold = 0.95;
|
||||
}
|
||||
@ -0,0 +1,16 @@
|
||||
package gc.mda.signal_batch.global.exception;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.web.bind.annotation.ResponseStatus;
|
||||
|
||||
/**
|
||||
* 메모리 예산 초과 시 발생하는 예외 (503 Service Unavailable)
|
||||
*
|
||||
* 단일 쿼리 상한 초과, 대기 큐 타임아웃, 전체 쿼리 예산 부족 시 발생.
|
||||
*/
|
||||
@ResponseStatus(HttpStatus.SERVICE_UNAVAILABLE)
|
||||
public class MemoryBudgetExceededException extends RuntimeException {
|
||||
public MemoryBudgetExceededException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,45 @@
|
||||
package gc.mda.signal_batch.global.util;
|
||||
|
||||
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
|
||||
import lombok.experimental.UtilityClass;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* CompactVesselTrack의 Heap 점유량을 바이트 단위로 추정
|
||||
*
|
||||
* 포인트당 메모리 근거:
|
||||
* double[2]: 32B (header 16B + data 16B) + ArrayList entry 8B = 40B
|
||||
* String timestamp: ~48B (object 16B + char[] ~24B + ref 8B)
|
||||
* Double speed: 24B (object 16B + double 8B)
|
||||
* 합계: ~112B per point
|
||||
*/
|
||||
@UtilityClass
|
||||
public class TrackMemoryEstimator {
|
||||
|
||||
private static final long BYTES_PER_POINT = 112L;
|
||||
private static final long OBJECT_OVERHEAD = 300L;
|
||||
|
||||
public static long estimateTrackBytes(CompactVesselTrack track) {
|
||||
if (track == null) return 0;
|
||||
int points = track.getPointCount();
|
||||
return OBJECT_OVERHEAD + (long) points * BYTES_PER_POINT;
|
||||
}
|
||||
|
||||
public static long estimateListBytes(List<CompactVesselTrack> tracks) {
|
||||
if (tracks == null || tracks.isEmpty()) return 0;
|
||||
long total = 0;
|
||||
for (CompactVesselTrack track : tracks) {
|
||||
total += estimateTrackBytes(track);
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* 사전 추정: 일 평균 500포인트 기준
|
||||
* days × vessels × 500 × 112B
|
||||
*/
|
||||
public static long estimateQueryBytes(int days, int estimatedVessels) {
|
||||
return (long) days * estimatedVessels * 500 * BYTES_PER_POINT;
|
||||
}
|
||||
}
|
||||
@ -1,6 +1,8 @@
|
||||
package gc.mda.signal_batch.global.websocket.service;
|
||||
|
||||
import gc.mda.signal_batch.global.util.SignalKindCode;
|
||||
import gc.mda.signal_batch.global.exception.MemoryBudgetExceededException;
|
||||
import gc.mda.signal_batch.global.util.TrackMemoryEstimator;
|
||||
import gc.mda.signal_batch.global.websocket.dto.TrackChunkResponse;
|
||||
import gc.mda.signal_batch.global.websocket.interceptor.TrackQueryInterceptor;
|
||||
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
|
||||
@ -56,6 +58,7 @@ public class ChunkedTrackStreamingService {
|
||||
private final TrackQueryInterceptor trackQueryInterceptor;
|
||||
private final DailyTrackCacheManager dailyTrackCacheManager;
|
||||
private final CacheTrackSimplifier cacheTrackSimplifier;
|
||||
private final TrackMemoryBudgetManager memoryBudgetManager;
|
||||
private final WKTReader wktReader = new WKTReader();
|
||||
@SuppressWarnings("unused")
|
||||
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
|
||||
@ -96,7 +99,8 @@ public class ChunkedTrackStreamingService {
|
||||
ActiveQueryManager activeQueryManager,
|
||||
TrackQueryInterceptor trackQueryInterceptor,
|
||||
DailyTrackCacheManager dailyTrackCacheManager,
|
||||
CacheTrackSimplifier cacheTrackSimplifier) {
|
||||
CacheTrackSimplifier cacheTrackSimplifier,
|
||||
TrackMemoryBudgetManager memoryBudgetManager) {
|
||||
this.queryJdbcTemplate = queryJdbcTemplate;
|
||||
this.queryDataSource = queryDataSource;
|
||||
this.simplificationStrategy = simplificationStrategy;
|
||||
@ -104,6 +108,7 @@ public class ChunkedTrackStreamingService {
|
||||
this.trackQueryInterceptor = trackQueryInterceptor;
|
||||
this.dailyTrackCacheManager = dailyTrackCacheManager;
|
||||
this.cacheTrackSimplifier = cacheTrackSimplifier;
|
||||
this.memoryBudgetManager = memoryBudgetManager;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -991,6 +996,20 @@ public class ChunkedTrackStreamingService {
|
||||
}
|
||||
}
|
||||
|
||||
// 쿼리 메모리 사전 예약
|
||||
try {
|
||||
int days = (int) Duration.between(request.getStartTime(), request.getEndTime()).toDays() + 1;
|
||||
int estimatedVessels = Math.max(1000, days * 5000); // 뷰포트 쿼리는 선박 수 예측 어려움
|
||||
long estimatedBytes = TrackMemoryEstimator.estimateQueryBytes(days, estimatedVessels);
|
||||
memoryBudgetManager.reserveQueryMemory(queryId, estimatedBytes,
|
||||
memoryBudgetManager.getProperties().getQueueTimeoutSeconds() * 1000L);
|
||||
} catch (MemoryBudgetExceededException e) {
|
||||
log.warn("[MemoryBudget] WebSocket 쿼리 메모리 예약 실패: queryId={} — {}", queryId, e.getMessage());
|
||||
statusConsumer.accept(new QueryStatusUpdate(queryId, "ERROR",
|
||||
"서버 메모리 부족. 잠시 후 다시 시도해주세요.", 0.0));
|
||||
return;
|
||||
}
|
||||
|
||||
// 취소 플래그 등록
|
||||
AtomicBoolean cancelFlag = new AtomicBoolean(false);
|
||||
queryCancelFlags.put(queryId, cancelFlag);
|
||||
@ -1330,7 +1349,8 @@ public class ChunkedTrackStreamingService {
|
||||
0.0
|
||||
));
|
||||
} finally {
|
||||
// 리소스 반환 (순서: 데이터 정리 → 취소 플래그 → 글로벌 슬롯 → 세션 카운트 → 메트릭스)
|
||||
// 리소스 반환 (순서: 메모리 예산 → 데이터 정리 → 취소 플래그 → 글로벌 슬롯 → 세션 카운트 → 메트릭스)
|
||||
memoryBudgetManager.releaseQueryMemory(queryId);
|
||||
processedTimeRanges.clear(); // 메모리 즉시 해제: 처리 시간 범위 맵
|
||||
queryCancelFlags.remove(queryId);
|
||||
if (slotAcquired) {
|
||||
|
||||
@ -42,6 +42,7 @@ public class DailyTrackCacheManager {
|
||||
|
||||
private final DataSource queryDataSource;
|
||||
private final DailyTrackCacheProperties cacheProperties;
|
||||
private final TrackMemoryBudgetManager memoryBudgetManager;
|
||||
|
||||
// 날짜별 캐시 (D-1 ~ D-N)
|
||||
private final ConcurrentHashMap<LocalDate, DailyTrackData> cache = new ConcurrentHashMap<>();
|
||||
@ -54,9 +55,11 @@ public class DailyTrackCacheManager {
|
||||
|
||||
public DailyTrackCacheManager(
|
||||
@Qualifier("queryDataSource") DataSource queryDataSource,
|
||||
DailyTrackCacheProperties cacheProperties) {
|
||||
DailyTrackCacheProperties cacheProperties,
|
||||
TrackMemoryBudgetManager memoryBudgetManager) {
|
||||
this.queryDataSource = queryDataSource;
|
||||
this.cacheProperties = cacheProperties;
|
||||
this.memoryBudgetManager = memoryBudgetManager;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -165,13 +168,19 @@ public class DailyTrackCacheManager {
|
||||
DailyTrackData data = loadDay(targetDate);
|
||||
|
||||
if (data != null && data.getVesselCount() > 0) {
|
||||
// 메모리 한도 체크
|
||||
// 메모리 한도 체크 (DailyTrackCacheProperties 자체 한도)
|
||||
if (totalMemory + data.getMemorySizeBytes() > maxMemoryBytes) {
|
||||
log.warn("Cache memory limit reached: {}GB / {}GB. Stopping at D-{}",
|
||||
totalMemory / (1024 * 1024 * 1024), cacheProperties.getMaxMemoryGb(), daysBack);
|
||||
break;
|
||||
}
|
||||
|
||||
// 메모리 예산 매니저에 등록
|
||||
if (!memoryBudgetManager.registerCacheMemory(targetDate, data.getMemorySizeBytes())) {
|
||||
log.warn("[MemoryBudget] 캐시 예산 초과로 D-{} ({}) 로드 중단", daysBack, targetDate);
|
||||
break;
|
||||
}
|
||||
|
||||
cache.put(targetDate, data);
|
||||
totalMemory += data.getMemorySizeBytes();
|
||||
loadedCount++;
|
||||
@ -421,6 +430,76 @@ public class DailyTrackCacheManager {
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 요청된 MMSI 키로 직접 O(1) 조회 — dayTracks.get(mmsi) 호출
|
||||
* 기존 getCachedTracksMultipleDays()의 전체 스캔 대비 대폭 성능 개선.
|
||||
* 예: 7일 × 100 MMSI = 700회 get() vs 7일 × 50K 선박 = 350K 엔트리 스캔
|
||||
*/
|
||||
public List<CompactVesselTrack> getCachedTracksForVessels(
|
||||
List<LocalDate> dates, Set<String> mmsiKeys) {
|
||||
if (mmsiKeys == null || mmsiKeys.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
Map<String, CompactVesselTrack.CompactVesselTrackBuilder> merged = new HashMap<>();
|
||||
int lookupCount = 0;
|
||||
int hitCount = 0;
|
||||
|
||||
for (LocalDate date : dates) {
|
||||
DailyTrackData data = cache.get(date);
|
||||
if (data == null) continue;
|
||||
|
||||
Map<String, CompactVesselTrack> dayTracks = data.getTracks();
|
||||
|
||||
for (String mmsi : mmsiKeys) {
|
||||
CompactVesselTrack track = dayTracks.get(mmsi);
|
||||
lookupCount++;
|
||||
if (track == null) continue;
|
||||
hitCount++;
|
||||
|
||||
CompactVesselTrack.CompactVesselTrackBuilder builder = merged.get(mmsi);
|
||||
if (builder == null) {
|
||||
builder = CompactVesselTrack.builder()
|
||||
.vesselId(mmsi)
|
||||
.nationalCode(track.getNationalCode())
|
||||
.shipName(track.getShipName())
|
||||
.shipType(track.getShipType())
|
||||
.shipKindCode(track.getShipKindCode())
|
||||
.geometry(new ArrayList<>(track.getGeometry()))
|
||||
.timestamps(new ArrayList<>(track.getTimestamps()))
|
||||
.speeds(new ArrayList<>(track.getSpeeds()))
|
||||
.totalDistance(track.getTotalDistance())
|
||||
.avgSpeed(track.getAvgSpeed())
|
||||
.maxSpeed(track.getMaxSpeed())
|
||||
.pointCount(track.getPointCount());
|
||||
merged.put(mmsi, builder);
|
||||
} else {
|
||||
CompactVesselTrack existing = builder.build();
|
||||
List<double[]> geo = new ArrayList<>(existing.getGeometry());
|
||||
geo.addAll(track.getGeometry());
|
||||
List<String> ts = new ArrayList<>(existing.getTimestamps());
|
||||
ts.addAll(track.getTimestamps());
|
||||
List<Double> sp = new ArrayList<>(existing.getSpeeds());
|
||||
sp.addAll(track.getSpeeds());
|
||||
|
||||
builder.geometry(geo)
|
||||
.timestamps(ts)
|
||||
.speeds(sp)
|
||||
.totalDistance(existing.getTotalDistance() + track.getTotalDistance())
|
||||
.maxSpeed(Math.max(existing.getMaxSpeed(), track.getMaxSpeed()))
|
||||
.pointCount(existing.getPointCount() + track.getPointCount());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("[CACHE-MONITOR] L3.getCachedTracksForVessels: dates={}, requestedMmsi={}, lookups={}, hits={}, resultVessels={}",
|
||||
dates.size(), mmsiKeys.size(), lookupCount, hitCount, merged.size());
|
||||
|
||||
return merged.values().stream()
|
||||
.map(CompactVesselTrack.CompactVesselTrackBuilder::build)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 요청 범위를 캐시 구간 / DB 구간으로 분리
|
||||
*/
|
||||
@ -533,6 +612,7 @@ public class DailyTrackCacheManager {
|
||||
try {
|
||||
DailyTrackData data = loadDay(yesterday);
|
||||
if (data != null && data.getVesselCount() > 0) {
|
||||
memoryBudgetManager.registerCacheMemory(yesterday, data.getMemorySizeBytes());
|
||||
cache.put(yesterday, data);
|
||||
log.info("Cache refreshed for {}: {} vessels, {} MB",
|
||||
yesterday, data.getVesselCount(), data.getMemorySizeBytes() / (1024 * 1024));
|
||||
@ -550,6 +630,7 @@ public class DailyTrackCacheManager {
|
||||
for (LocalDate d : toRemove) {
|
||||
DailyTrackData removed = cache.remove(d);
|
||||
if (removed != null) {
|
||||
memoryBudgetManager.releaseCacheMemory(d);
|
||||
log.info("Evicted cache for {}: {} vessels, {} MB",
|
||||
d, removed.getVesselCount(), removed.getMemorySizeBytes() / (1024 * 1024));
|
||||
}
|
||||
|
||||
@ -0,0 +1,300 @@
|
||||
package gc.mda.signal_batch.global.websocket.service;
|
||||
|
||||
import gc.mda.signal_batch.global.config.TrackMemoryBudgetProperties;
|
||||
import gc.mda.signal_batch.global.exception.MemoryBudgetExceededException;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* 항적 데이터 메모리 예산 관리자
|
||||
*
|
||||
* 캐시 영역과 쿼리 영역의 메모리를 논리적으로 파티셔닝하여
|
||||
* 대형 쿼리가 배치 Job/캐시를 압박하는 것을 방지.
|
||||
*
|
||||
* 쿼리 예산: ReentrantLock(fair=true) + Condition 기반 FIFO 대기 큐.
|
||||
* 캐시 예산: AtomicLong 기반 즉시 등록/해제.
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class TrackMemoryBudgetManager {
|
||||
|
||||
@Getter
|
||||
private final TrackMemoryBudgetProperties properties;
|
||||
|
||||
// 캐시 메모리 추적
|
||||
private final AtomicLong cacheUsedBytes = new AtomicLong(0);
|
||||
private final ConcurrentHashMap<String, Long> cacheAllocations = new ConcurrentHashMap<>();
|
||||
|
||||
// 쿼리 메모리 추적
|
||||
private final AtomicLong queryUsedBytes = new AtomicLong(0);
|
||||
private final ConcurrentHashMap<String, Long> queryAllocations = new ConcurrentHashMap<>();
|
||||
private final AtomicInteger waitingQueryCount = new AtomicInteger(0);
|
||||
|
||||
// FIFO 대기 메커니즘
|
||||
private final ReentrantLock queryLock = new ReentrantLock(true); // fair=true
|
||||
private final Condition queryBudgetAvailable = queryLock.newCondition();
|
||||
|
||||
// 로그 중복 방지
|
||||
private volatile long lastPressureLogTime = 0;
|
||||
|
||||
public TrackMemoryBudgetManager(TrackMemoryBudgetProperties properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
log.info("TrackMemoryBudgetManager 초기화 — total: {}GB, cache: {}GB, query: {}GB, maxSingleQuery: {}GB, correctionFactor: {}",
|
||||
properties.getTotalBudgetGb(), properties.getCacheBudgetGb(),
|
||||
properties.getQueryBudgetGb(), properties.getMaxSingleQueryGb(),
|
||||
properties.getEstimationCorrectionFactor());
|
||||
}
|
||||
|
||||
// ── 캐시 메모리 관리 ──
|
||||
|
||||
/**
|
||||
* 캐시 메모리 등록 (날짜 기반 — L3 DailyTrackCache)
|
||||
* @return true: 예산 내 등록 성공, false: 예산 초과
|
||||
*/
|
||||
public boolean registerCacheMemory(LocalDate date, long bytes) {
|
||||
return registerCacheMemory("daily::" + date, bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* 캐시 메모리 등록 (키 기반 — L1/L2 Caffeine 버킷)
|
||||
*/
|
||||
public boolean registerCacheMemory(String key, long bytes) {
|
||||
long budgetBytes = (long) properties.getCacheBudgetGb() * 1024 * 1024 * 1024;
|
||||
long currentUsed = cacheUsedBytes.get();
|
||||
|
||||
if (currentUsed + bytes > budgetBytes) {
|
||||
log.warn("[MemoryBudget] 캐시 예산 초과: key={}, requested={}MB, used={}MB, budget={}MB",
|
||||
key, bytes / (1024 * 1024), currentUsed / (1024 * 1024), budgetBytes / (1024 * 1024));
|
||||
return false;
|
||||
}
|
||||
|
||||
Long previous = cacheAllocations.put(key, bytes);
|
||||
if (previous != null) {
|
||||
cacheUsedBytes.addAndGet(bytes - previous);
|
||||
} else {
|
||||
cacheUsedBytes.addAndGet(bytes);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 캐시 메모리 해제 (날짜 기반)
|
||||
*/
|
||||
public void releaseCacheMemory(LocalDate date) {
|
||||
releaseCacheMemory("daily::" + date);
|
||||
}
|
||||
|
||||
/**
|
||||
* 캐시 메모리 해제 (키 기반)
|
||||
*/
|
||||
public void releaseCacheMemory(String key) {
|
||||
Long released = cacheAllocations.remove(key);
|
||||
if (released != null) {
|
||||
cacheUsedBytes.addAndGet(-released);
|
||||
}
|
||||
}
|
||||
|
||||
public long getAvailableCacheBudget() {
|
||||
long budgetBytes = (long) properties.getCacheBudgetGb() * 1024 * 1024 * 1024;
|
||||
return Math.max(0, budgetBytes - cacheUsedBytes.get());
|
||||
}
|
||||
|
||||
// ── 쿼리 메모리 관리 (FIFO 대기 큐) ──
|
||||
|
||||
/**
|
||||
* 쿼리 메모리 예약 — 예산 부족 시 FIFO 대기
|
||||
*
|
||||
* @param queryId 쿼리 식별자
|
||||
* @param estimatedBytes 추정 메모리 (보정 전 raw 값)
|
||||
* @param maxWaitMs 최대 대기 시간 (밀리초)
|
||||
* @throws MemoryBudgetExceededException 단일 쿼리 상한 초과 또는 타임아웃
|
||||
*/
|
||||
public void reserveQueryMemory(String queryId, long estimatedBytes, long maxWaitMs) {
|
||||
long correctedBytes = applyCorrection(estimatedBytes);
|
||||
long maxSingleBytes = (long) properties.getMaxSingleQueryGb() * 1024 * 1024 * 1024;
|
||||
|
||||
// 단일 쿼리 상한 체크
|
||||
if (correctedBytes > maxSingleBytes) {
|
||||
throw new MemoryBudgetExceededException(
|
||||
String.format("단일 쿼리 메모리 상한 초과: estimated=%dMB, max=%dMB",
|
||||
correctedBytes / (1024 * 1024), maxSingleBytes / (1024 * 1024)));
|
||||
}
|
||||
|
||||
queryLock.lock();
|
||||
try {
|
||||
// 즉시 예약 가능한지 확인
|
||||
if (canReserveQuery(correctedBytes)) {
|
||||
doReserve(queryId, correctedBytes);
|
||||
return;
|
||||
}
|
||||
|
||||
// 대기 큐 진입
|
||||
waitingQueryCount.incrementAndGet();
|
||||
long deadline = System.nanoTime() + maxWaitMs * 1_000_000L;
|
||||
try {
|
||||
while (!canReserveQuery(correctedBytes)) {
|
||||
long remainingNanos = deadline - System.nanoTime();
|
||||
if (remainingNanos <= 0) {
|
||||
throw new MemoryBudgetExceededException(
|
||||
String.format("쿼리 메모리 대기 타임아웃: %dms, queryUsed=%dMB, budget=%dMB",
|
||||
maxWaitMs, queryUsedBytes.get() / (1024 * 1024),
|
||||
(long) properties.getQueryBudgetGb() * 1024));
|
||||
}
|
||||
queryBudgetAvailable.awaitNanos(remainingNanos);
|
||||
}
|
||||
doReserve(queryId, correctedBytes);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new MemoryBudgetExceededException("쿼리 메모리 대기 중 인터럽트 발생");
|
||||
} finally {
|
||||
waitingQueryCount.decrementAndGet();
|
||||
}
|
||||
} finally {
|
||||
queryLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 쿼리 메모리 해제 후 대기 쿼리 시그널
|
||||
*/
|
||||
public void releaseQueryMemory(String queryId) {
|
||||
Long released = queryAllocations.remove(queryId);
|
||||
if (released != null) {
|
||||
queryUsedBytes.addAndGet(-released);
|
||||
queryLock.lock();
|
||||
try {
|
||||
queryBudgetAvailable.signalAll();
|
||||
} finally {
|
||||
queryLock.unlock();
|
||||
}
|
||||
log.debug("[MemoryBudget] 쿼리 메모리 해제: queryId={}, released={}MB, remaining={}MB",
|
||||
queryId, released / (1024 * 1024), queryUsedBytes.get() / (1024 * 1024));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 쿼리 메모리 중간 업데이트 (실제 사용량이 추정과 다를 때)
|
||||
*/
|
||||
public void updateQueryMemory(String queryId, long actualBytes) {
|
||||
long corrected = applyCorrection(actualBytes);
|
||||
Long previous = queryAllocations.put(queryId, corrected);
|
||||
if (previous != null) {
|
||||
queryUsedBytes.addAndGet(corrected - previous);
|
||||
}
|
||||
}
|
||||
|
||||
// ── 모니터링 ──
|
||||
|
||||
/**
|
||||
* 예산 현황 (모니터링 API용)
|
||||
*/
|
||||
public Map<String, Object> getBudgetStatus() {
|
||||
Map<String, Object> status = new LinkedHashMap<>();
|
||||
|
||||
long cacheUsed = cacheUsedBytes.get();
|
||||
long queryUsed = queryUsedBytes.get();
|
||||
long totalUsed = cacheUsed + queryUsed;
|
||||
long cacheBudget = (long) properties.getCacheBudgetGb() * 1024 * 1024 * 1024;
|
||||
long queryBudget = (long) properties.getQueryBudgetGb() * 1024 * 1024 * 1024;
|
||||
|
||||
// 전체
|
||||
Map<String, Object> total = new LinkedHashMap<>();
|
||||
total.put("totalGb", properties.getTotalBudgetGb());
|
||||
total.put("usedMb", totalUsed / (1024 * 1024));
|
||||
total.put("usagePercent", String.format("%.1f", totalUsed * 100.0 / ((long) properties.getTotalBudgetGb() * 1024 * 1024 * 1024)));
|
||||
total.put("status", getUsageStatus(totalUsed, (long) properties.getTotalBudgetGb() * 1024 * 1024 * 1024));
|
||||
status.put("totalBudget", total);
|
||||
|
||||
// 캐시
|
||||
Map<String, Object> cacheInfo = new LinkedHashMap<>();
|
||||
cacheInfo.put("budgetGb", properties.getCacheBudgetGb());
|
||||
cacheInfo.put("usedMb", cacheUsed / (1024 * 1024));
|
||||
cacheInfo.put("usagePercent", cacheBudget > 0 ? String.format("%.1f", cacheUsed * 100.0 / cacheBudget) : "0.0");
|
||||
cacheInfo.put("allocations", cacheAllocations.size());
|
||||
status.put("cacheBudget", cacheInfo);
|
||||
|
||||
// 쿼리
|
||||
Map<String, Object> queryInfo = new LinkedHashMap<>();
|
||||
queryInfo.put("budgetGb", properties.getQueryBudgetGb());
|
||||
queryInfo.put("usedMb", queryUsed / (1024 * 1024));
|
||||
queryInfo.put("usagePercent", queryBudget > 0 ? String.format("%.1f", queryUsed * 100.0 / queryBudget) : "0.0");
|
||||
queryInfo.put("activeReservations", queryAllocations.size());
|
||||
queryInfo.put("waitingCount", waitingQueryCount.get());
|
||||
status.put("queryBudget", queryInfo);
|
||||
|
||||
// JVM 힙
|
||||
Runtime runtime = Runtime.getRuntime();
|
||||
long usedHeap = runtime.totalMemory() - runtime.freeMemory();
|
||||
long maxHeap = runtime.maxMemory();
|
||||
Map<String, Object> heap = new LinkedHashMap<>();
|
||||
heap.put("usedMb", usedHeap / (1024 * 1024));
|
||||
heap.put("maxMb", maxHeap / (1024 * 1024));
|
||||
heap.put("usagePercent", String.format("%.1f", usedHeap * 100.0 / maxHeap));
|
||||
status.put("heapInfo", heap);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
public boolean isBudgetPressureHigh() {
|
||||
long totalUsed = cacheUsedBytes.get() + queryUsedBytes.get();
|
||||
long totalBudget = (long) properties.getTotalBudgetGb() * 1024 * 1024 * 1024;
|
||||
double ratio = (double) totalUsed / totalBudget;
|
||||
if (ratio >= properties.getWarningThreshold()) {
|
||||
logBudgetPressure(ratio);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// ── 내부 메서드 ──
|
||||
|
||||
private boolean canReserveQuery(long bytes) {
|
||||
long budgetBytes = (long) properties.getQueryBudgetGb() * 1024 * 1024 * 1024;
|
||||
return queryUsedBytes.get() + bytes <= budgetBytes;
|
||||
}
|
||||
|
||||
private void doReserve(String queryId, long correctedBytes) {
|
||||
queryAllocations.put(queryId, correctedBytes);
|
||||
queryUsedBytes.addAndGet(correctedBytes);
|
||||
log.debug("[MemoryBudget] 쿼리 메모리 예약: queryId={}, reserved={}MB, queryTotal={}MB",
|
||||
queryId, correctedBytes / (1024 * 1024), queryUsedBytes.get() / (1024 * 1024));
|
||||
}
|
||||
|
||||
private long applyCorrection(long rawEstimate) {
|
||||
return (long) (rawEstimate * properties.getEstimationCorrectionFactor());
|
||||
}
|
||||
|
||||
private String getUsageStatus(long used, long total) {
|
||||
if (total == 0) return "UNKNOWN";
|
||||
double ratio = (double) used / total;
|
||||
if (ratio >= properties.getCriticalThreshold()) return "CRITICAL";
|
||||
if (ratio >= properties.getWarningThreshold()) return "WARNING";
|
||||
return "NORMAL";
|
||||
}
|
||||
|
||||
private void logBudgetPressure(double ratio) {
|
||||
long now = System.currentTimeMillis();
|
||||
if (now - lastPressureLogTime > 5000) {
|
||||
lastPressureLogTime = now;
|
||||
log.warn("[MemoryBudget] 예산 압박: usage={}, cache={}MB, query={}MB, waiting={}",
|
||||
String.format("%.1f%%", ratio * 100),
|
||||
cacheUsedBytes.get() / (1024 * 1024),
|
||||
queryUsedBytes.get() / (1024 * 1024),
|
||||
waitingQueryCount.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -5,6 +5,7 @@ import gc.mda.signal_batch.batch.reader.FiveMinTrackCache;
|
||||
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
|
||||
import gc.mda.signal_batch.domain.vessel.service.VesselLatestPositionCache;
|
||||
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
|
||||
import gc.mda.signal_batch.global.websocket.service.TrackMemoryBudgetManager;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -45,6 +46,9 @@ public class CacheMonitoringController {
|
||||
@Autowired(required = false)
|
||||
private VesselLatestPositionCache latestPositionCache;
|
||||
|
||||
@Autowired
|
||||
private TrackMemoryBudgetManager memoryBudgetManager;
|
||||
|
||||
/**
|
||||
* 캐시 통계 조회 (Dashboard 표시용 — 전체 캐시 집계)
|
||||
*/
|
||||
@ -189,4 +193,13 @@ public class CacheMonitoringController {
|
||||
health.put("latestPosition", latestPositionCache != null ? "UP" : "DISABLED");
|
||||
return ResponseEntity.ok(health);
|
||||
}
|
||||
|
||||
/**
|
||||
* 메모리 예산 현황 (캐시 + 쿼리 파티셔닝 + JVM 힙)
|
||||
*/
|
||||
@GetMapping("/budget")
|
||||
@Operation(summary = "메모리 예산 현황", description = "캐시/쿼리 메모리 예산 사용량, 대기 큐, JVM 힙 정보를 조회합니다")
|
||||
public ResponseEntity<Map<String, Object>> getMemoryBudgetStatus() {
|
||||
return ResponseEntity.ok(memoryBudgetManager.getBudgetStatus());
|
||||
}
|
||||
}
|
||||
|
||||
@ -211,6 +211,9 @@ vessel: # spring 하위가 아닌 최상위 레벨
|
||||
max-size: 60000 # 최대 60,000척
|
||||
refresh-interval-minutes: 2 # 2분치 데이터 조회 (수집 지연 고려)
|
||||
|
||||
# L2 HourlyTrackCache 간소화 (운영 환경 활성화)
|
||||
hourly-simplification:
|
||||
enabled: true # 운영 환경: 활성화
|
||||
|
||||
# 비정상 궤적 검출 설정 (개선됨)
|
||||
abnormal-detection:
|
||||
@ -284,6 +287,18 @@ cache:
|
||||
max-memory-gb: 6 # 최대 6GB (일 평균 ~720MB × 7일 = ~5GB)
|
||||
warmup-async: true # 비동기 워밍업 (서버 시작 차단 없음)
|
||||
|
||||
# 항적 데이터 메모리 예산 (64GB JVM 기준)
|
||||
track:
|
||||
memory-budget:
|
||||
total-budget-gb: 64 # 전체 JVM 힙
|
||||
cache-budget-gb: 35 # L1+L2+L3 캐시 (L3 5GB + L2 ~14GB + L1 ~3GB + 여유 13GB)
|
||||
query-budget-gb: 20 # REST/WebSocket 동시 쿼리 (동시 60쿼리 × ~300MB)
|
||||
max-single-query-gb: 5 # 단일 쿼리 상한
|
||||
estimation-correction-factor: 1.8 # 실측 기반 보정 계수
|
||||
queue-timeout-seconds: 60
|
||||
warning-threshold: 0.8
|
||||
critical-threshold: 0.95
|
||||
|
||||
# WebSocket 부하 제어 설정
|
||||
websocket:
|
||||
query:
|
||||
|
||||
@ -272,6 +272,13 @@ vessel:
|
||||
ttl-minutes: 120 # 캐시 TTL: 120분 (위성 AIS 30~60분 간격 고려)
|
||||
max-size: 100000 # 최대 선박 수: 100,000척 (2시간 누적 고려)
|
||||
|
||||
# L2 HourlyTrackCache 간소화 설정
|
||||
hourly-simplification:
|
||||
enabled: false # 기본값: 비활성화 (프로파일별로 활성화)
|
||||
cron: "0 30 6,12,18 * * *" # 06:30, 12:30, 18:30 실행
|
||||
hours-ago: 6 # 6시간 이상 경과 엔트리 대상
|
||||
sample-rate: 2 # 매 2번째 포인트만 유지 (~50% 감소)
|
||||
|
||||
# ==================== S&P Global AIS API 설정 ====================
|
||||
app:
|
||||
ais-api:
|
||||
@ -301,6 +308,18 @@ app:
|
||||
warmup-enabled: true
|
||||
warmup-days: 7
|
||||
|
||||
# 항적 데이터 메모리 예산 (논리적 파티셔닝)
|
||||
track:
|
||||
memory-budget:
|
||||
total-budget-gb: 64 # 전체 JVM 힙 예산
|
||||
cache-budget-gb: 35 # L1/L2/L3 캐시 (55%)
|
||||
query-budget-gb: 20 # REST/WebSocket 동시 쿼리 (31%)
|
||||
max-single-query-gb: 5 # 단일 쿼리 상한
|
||||
estimation-correction-factor: 1.8 # 메모리 추정 보정 계수
|
||||
queue-timeout-seconds: 60 # 쿼리 대기 큐 타임아웃
|
||||
warning-threshold: 0.8 # 예산 경고 임계값 (80%)
|
||||
critical-threshold: 0.95 # 예산 위험 임계값 (95%)
|
||||
|
||||
# Swagger/OpenAPI 설정
|
||||
springdoc:
|
||||
api-docs:
|
||||
|
||||
불러오는 중...
Reference in New Issue
Block a user