feat: 일일 데이터 인메모리 캐시 구현 (Phase 6)

- DailyTrackCacheManager: D-1~D-7 daily 테이블 데이터 인메모리 캐시
  - @Async 비동기 워밍업 (서버 시작 차단 없음, 최근 우선 로드)
  - 뷰포트 필터링, 다중 날짜 병합 조회, 하이브리드 쿼리 분리
  - 메모리 한도 체크 (기본 5GB), 날짜별 즉시 활성화
- DailyTrackCacheProperties: enabled, retentionDays, maxMemoryGb 설정
- DailyAggregationJobConfig: 배치 완료 시 캐시 자동 갱신 리스너
- ChunkedTrackStreamingService: daily 전략에서 캐시 우선 조회 + DB 폴백
- StompTrackStreamingService: 동일 캐시 우선 패턴 적용
- WebSocketMonitoringController: GET /api/websocket/daily-cache 엔드포인트
- application-prod.yml: cache.daily-track 설정 추가

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
HeungTak Lee 2026-02-06 15:34:20 +09:00
부모 7bd7bf556e
커밋 03b14e687a
4개의 변경된 파일679개의 추가작업 그리고 2개의 파일을 삭제

파일 보기

@ -1,9 +1,12 @@
package gc.mda.signal_batch.batch.job;
import gc.mda.signal_batch.batch.listener.JobCompletionListener;
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.JobParametersValidator;
import org.springframework.batch.core.job.DefaultJobParametersValidator;
import org.springframework.batch.core.job.builder.JobBuilder;
@ -24,6 +27,7 @@ public class DailyAggregationJobConfig {
private final JobRepository jobRepository;
private final DailyAggregationStepConfig dailyAggregationStepConfig;
private final JobCompletionListener jobCompletionListener;
private final DailyTrackCacheManager dailyTrackCacheManager;
@Bean
public Job dailyAggregationJob() {
@ -31,12 +35,37 @@ public class DailyAggregationJobConfig {
.incrementer(new RunIdIncrementer())
.validator(dailyJobParametersValidator())
.listener(jobCompletionListener)
.listener(dailyCacheRefreshListener())
.start(dailyAggregationStepConfig.mergeDailyTracksStep())
.next(dailyAggregationStepConfig.gridDailySummaryStep())
.next(dailyAggregationStepConfig.areaDailySummaryStep())
.build();
}
@Bean
public JobExecutionListener dailyCacheRefreshListener() {
return new JobExecutionListener() {
@Override
public void beforeJob(JobExecution jobExecution) {
// no-op
}
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus().isUnsuccessful()) {
log.warn("Daily aggregation job failed, skipping cache refresh");
return;
}
try {
log.info("Daily aggregation job completed, refreshing daily track cache");
dailyTrackCacheManager.refreshAfterDailyJob();
} catch (Exception e) {
log.error("Failed to refresh daily track cache after job: {}", e.getMessage());
}
}
};
}
@Bean
public JobParametersValidator dailyJobParametersValidator() {
DefaultJobParametersValidator validator = new DefaultJobParametersValidator();

파일 보기

@ -0,0 +1,19 @@
package gc.mda.signal_batch.global.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "cache.daily-track")
public class DailyTrackCacheProperties {
/** 캐시 활성화 여부 */
private boolean enabled = false;
/** 캐시 보관 일수 (오늘 제외, D-1 ~ D-N) */
private int retentionDays = 7;
/** 최대 메모리 사용량 (GB) */
private int maxMemoryGb = 5;
/** 비동기 워밍업 여부 */
private boolean warmupAsync = true;
}

파일 보기

@ -0,0 +1,618 @@
package gc.mda.signal_batch.global.websocket.service;
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
import gc.mda.signal_batch.global.config.DailyTrackCacheProperties;
import gc.mda.signal_batch.global.util.ShipKindCodeConverter;
import gc.mda.signal_batch.domain.vessel.service.IntegrationVesselService;
import gc.mda.signal_batch.domain.vessel.dto.IntegrationVessel;
import gc.mda.signal_batch.global.util.IntegrationSignalConstants;
import lombok.extern.slf4j.Slf4j;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.LineString;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKTReader;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.sql.DataSource;
import java.sql.*;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
* 일일(Daily) 항적 데이터 인메모리 캐시 매니저
* <p>
* 어제(D-1) ~ 7일 (D-7) daily 테이블 데이터를 메모리에 캐시하여
* DB 조회를 생략하고 즉시 응답. 오늘(D-0) 데이터는 항상 DB 조회.
* <p>
* 비동기 독립 실행: 캐시 로드는 별도 스레드에서 수행.
* 스케줄러, REST API, WebSocket 응답 모두 캐시 로드 완료를 기다리지 않음.
*/
@Slf4j
@Service
public class DailyTrackCacheManager {
public enum CacheStatus {
NOT_STARTED, LOADING, PARTIAL, READY, DISABLED
}
private final DataSource queryDataSource;
private final DailyTrackCacheProperties cacheProperties;
private final IntegrationVesselService integrationVesselService;
// 날짜별 캐시 (D-1 ~ D-N)
private final ConcurrentHashMap<LocalDate, DailyTrackData> cache = new ConcurrentHashMap<>();
private final AtomicReference<CacheStatus> status = new AtomicReference<>(CacheStatus.NOT_STARTED);
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 스레드 로컬 WKTReader (thread-safe)
private static final ThreadLocal<WKTReader> wktReaderLocal = ThreadLocal.withInitial(WKTReader::new);
public DailyTrackCacheManager(
@Qualifier("queryDataSource") DataSource queryDataSource,
DailyTrackCacheProperties cacheProperties,
IntegrationVesselService integrationVesselService) {
this.queryDataSource = queryDataSource;
this.cacheProperties = cacheProperties;
this.integrationVesselService = integrationVesselService;
}
/**
* 캐시된 날짜별 데이터
*/
public static class DailyTrackData {
private final LocalDate date;
private final Map<String, CompactVesselTrack> tracks; // key: "sigSrcCd_targetId"
private final long loadedAtMillis;
private final int vesselCount;
private final long memorySizeBytes;
public DailyTrackData(LocalDate date, Map<String, CompactVesselTrack> tracks, long memorySizeBytes) {
this.date = date;
this.tracks = tracks;
this.loadedAtMillis = System.currentTimeMillis();
this.vesselCount = tracks.size();
this.memorySizeBytes = memorySizeBytes;
}
public LocalDate getDate() { return date; }
public Map<String, CompactVesselTrack> getTracks() { return tracks; }
public long getLoadedAtMillis() { return loadedAtMillis; }
public int getVesselCount() { return vesselCount; }
public long getMemorySizeBytes() { return memorySizeBytes; }
}
/**
* 쿼리 범위 분리 결과
*/
public static class SplitQueryResult {
private final List<LocalDate> cachedDates; // 캐시에서 가져올 날짜
private final List<DateRange> dbRanges; // DB 조회 필요 범위 (연속 날짜 묶음)
private final DateRange todayRange; // 오늘 구간 (hourly/5min)
public SplitQueryResult(List<LocalDate> cachedDates, List<DateRange> dbRanges, DateRange todayRange) {
this.cachedDates = cachedDates;
this.dbRanges = dbRanges;
this.todayRange = todayRange;
}
public List<LocalDate> getCachedDates() { return cachedDates; }
public List<DateRange> getDbRanges() { return dbRanges; }
public DateRange getTodayRange() { return todayRange; }
public boolean hasCachedData() { return !cachedDates.isEmpty(); }
public boolean hasDbRanges() { return !dbRanges.isEmpty(); }
public boolean hasTodayRange() { return todayRange != null; }
}
public static class DateRange {
private final LocalDateTime start;
private final LocalDateTime end;
public DateRange(LocalDateTime start, LocalDateTime end) {
this.start = start;
this.end = end;
}
public LocalDateTime getStart() { return start; }
public LocalDateTime getEnd() { return end; }
}
// 비동기 캐시 워밍업
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
if (!cacheProperties.isEnabled()) {
status.set(CacheStatus.DISABLED);
log.info("Daily track cache is disabled");
return;
}
if (cacheProperties.isWarmupAsync()) {
warmUpCacheAsync();
} else {
warmUpCache();
}
}
@Async("trackStreamingExecutor")
public void warmUpCacheAsync() {
warmUpCache();
}
/**
* 캐시 워밍업: D-1 D-2 ... D-N 순서로 최근 우선 로드
*/
public void warmUpCache() {
if (!cacheProperties.isEnabled()) {
status.set(CacheStatus.DISABLED);
return;
}
status.set(CacheStatus.LOADING);
log.info("Daily track cache warmup started (async): retentionDays={}, maxMemoryGb={}",
cacheProperties.getRetentionDays(), cacheProperties.getMaxMemoryGb());
long totalStart = System.currentTimeMillis();
long totalMemory = 0;
long maxMemoryBytes = (long) cacheProperties.getMaxMemoryGb() * 1024 * 1024 * 1024;
LocalDate today = LocalDate.now();
int loadedCount = 0;
for (int daysBack = 1; daysBack <= cacheProperties.getRetentionDays(); daysBack++) {
LocalDate targetDate = today.minusDays(daysBack);
try {
long dateStart = System.currentTimeMillis();
DailyTrackData data = loadDay(targetDate);
if (data != null && data.getVesselCount() > 0) {
// 메모리 한도 체크
if (totalMemory + data.getMemorySizeBytes() > maxMemoryBytes) {
log.warn("Cache memory limit reached: {}GB / {}GB. Stopping at D-{}",
totalMemory / (1024 * 1024 * 1024), cacheProperties.getMaxMemoryGb(), daysBack);
break;
}
cache.put(targetDate, data);
totalMemory += data.getMemorySizeBytes();
loadedCount++;
long elapsed = System.currentTimeMillis() - dateStart;
log.info("Cached D-{} ({}): {} vessels, {} MB, {}ms",
daysBack, targetDate, data.getVesselCount(),
data.getMemorySizeBytes() / (1024 * 1024), elapsed);
// 부분 로드 시점에 PARTIAL 상태로 전환
if (status.get() == CacheStatus.LOADING) {
status.set(CacheStatus.PARTIAL);
}
} else {
log.info("No daily data for D-{} ({})", daysBack, targetDate);
}
} catch (Exception e) {
log.error("Failed to load cache for D-{} ({}): {}", daysBack, targetDate, e.getMessage());
}
}
status.set(CacheStatus.READY);
long totalElapsed = System.currentTimeMillis() - totalStart;
log.info("Daily track cache warmup completed: {} days loaded, total {} MB, {}ms",
loadedCount, totalMemory / (1024 * 1024), totalElapsed);
}
/**
* 특정 날짜의 daily 테이블 전체 로드 CompactVesselTrack 변환
*/
public DailyTrackData loadDay(LocalDate date) {
LocalDateTime dayStart = date.atStartOfDay();
LocalDateTime dayEnd = date.plusDays(1).atStartOfDay();
String sql = "SELECT sig_src_cd, target_id, time_bucket, " +
"public.ST_AsText(track_geom) as track_geom, " +
"distance_nm, avg_speed, max_speed, point_count, " +
"start_position->>'time' as start_time, " +
"end_position->>'time' as end_time " +
"FROM signal.t_vessel_tracks_daily " +
"WHERE time_bucket >= ? AND time_bucket < ? " +
"ORDER BY sig_src_cd, target_id";
Map<String, VesselAccumulator> vesselMap = new HashMap<>(50000);
long estimatedMemory = 0;
WKTReader wktReader = wktReaderLocal.get();
try (Connection conn = queryDataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setTimestamp(1, Timestamp.valueOf(dayStart));
ps.setTimestamp(2, Timestamp.valueOf(dayEnd));
ps.setFetchSize(10000);
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String sigSrcCd = rs.getString("sig_src_cd");
String targetId = rs.getString("target_id");
String vesselId = sigSrcCd + "_" + targetId;
VesselAccumulator acc = vesselMap.computeIfAbsent(vesselId, k -> {
VesselAccumulator a = new VesselAccumulator();
a.sigSrcCd = sigSrcCd;
a.targetId = targetId;
return a;
});
String trackGeomWkt = rs.getString("track_geom");
Timestamp timeBucket = rs.getTimestamp("time_bucket");
String startTimeStr = null;
String endTimeStr = null;
try { startTimeStr = rs.getString("start_time"); } catch (SQLException ignored) {}
try { endTimeStr = rs.getString("end_time"); } catch (SQLException ignored) {}
double distanceNm = rs.getDouble("distance_nm");
double maxSpeed = rs.getDouble("max_speed");
acc.totalDistance += distanceNm;
acc.maxSpeed = Math.max(acc.maxSpeed, maxSpeed);
if (trackGeomWkt != null && !trackGeomWkt.isEmpty() && !"LINESTRING EMPTY".equals(trackGeomWkt)) {
try {
LineString lineString = (LineString) wktReader.read(trackGeomWkt);
if (lineString.getNumPoints() == 0) continue;
LocalDateTime baseTime = timeBucket.toLocalDateTime();
if (startTimeStr != null && !startTimeStr.isEmpty()) {
try {
baseTime = LocalDateTime.parse(startTimeStr, TIMESTAMP_FORMATTER);
} catch (Exception ignored) {}
}
Coordinate[] coords = lineString.getCoordinates();
for (Coordinate coord : coords) {
acc.geometry.add(new double[]{coord.x, coord.y});
// M 값이 있으면 타임스탬프로 사용
if (!Double.isNaN(coord.getM())) {
acc.timestamps.add(String.valueOf((long) coord.getM()));
} else {
acc.timestamps.add(String.valueOf(baseTime.toEpochSecond(java.time.ZoneOffset.of("+09:00"))));
}
// 속도 추산 (인접 좌표 거리/시간)
acc.speeds.add(0.0);
}
acc.pointCount += coords.length;
estimatedMemory += coords.length * 40L; // 좌표당 40바이트 추정
} catch (ParseException e) {
log.debug("Failed to parse track_geom for vessel {}: {}", vesselId, e.getMessage());
}
}
}
}
} catch (Exception e) {
log.error("Failed to load daily data for {}: {}", date, e.getMessage());
return null;
}
if (vesselMap.isEmpty()) {
return null;
}
// VesselAccumulator CompactVesselTrack 변환
Map<String, CompactVesselTrack> tracks = new HashMap<>(vesselMap.size());
for (Map.Entry<String, VesselAccumulator> entry : vesselMap.entrySet()) {
VesselAccumulator acc = entry.getValue();
if (acc.geometry.isEmpty()) continue;
double avgSpeed = acc.pointCount > 0 ? acc.totalDistance / Math.max(1, acc.pointCount) * 60 : 0;
// shipKindCode 계산
String shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern(
acc.sigSrcCd, null, null, acc.targetId);
// 통합선박 ID 조회
String integrationTargetId = null;
try {
IntegrationVessel iv = integrationVesselService.findByVessel(acc.sigSrcCd, acc.targetId);
if (iv != null) {
integrationTargetId = iv.generateIntegrationId();
}
} catch (Exception ignored) {}
CompactVesselTrack track = CompactVesselTrack.builder()
.vesselId(entry.getKey())
.sigSrcCd(acc.sigSrcCd)
.targetId(acc.targetId)
.shipKindCode(shipKindCode)
.integrationTargetId(integrationTargetId)
.geometry(acc.geometry)
.timestamps(acc.timestamps)
.speeds(acc.speeds)
.totalDistance(acc.totalDistance)
.avgSpeed(avgSpeed)
.maxSpeed(acc.maxSpeed)
.pointCount(acc.pointCount)
.build();
tracks.put(entry.getKey(), track);
}
estimatedMemory += tracks.size() * 200L; // 객체 오버헤드
return new DailyTrackData(date, tracks, estimatedMemory);
}
// 캐시 조회 API
/**
* 특정 날짜가 캐시되어 있는지 확인
*/
public boolean isCached(LocalDate date) {
return cache.containsKey(date);
}
/**
* 캐시에서 특정 날짜의 전체 항적 조회
*/
public List<CompactVesselTrack> getCachedTracks(LocalDate date) {
DailyTrackData data = cache.get(date);
if (data == null) return Collections.emptyList();
return new ArrayList<>(data.getTracks().values());
}
/**
* 캐시에서 특정 날짜의 항적을 뷰포트로 필터링하여 조회
*/
public List<CompactVesselTrack> getCachedTracks(LocalDate date, double minLon, double minLat, double maxLon, double maxLat) {
DailyTrackData data = cache.get(date);
if (data == null) return Collections.emptyList();
return data.getTracks().values().stream()
.filter(track -> isInViewport(track, minLon, minLat, maxLon, maxLat))
.collect(Collectors.toList());
}
/**
* 여러 날짜의 캐시 데이터를 vessel 기준으로 병합
*/
public List<CompactVesselTrack> getCachedTracksMultipleDays(List<LocalDate> dates) {
Map<String, CompactVesselTrack.CompactVesselTrackBuilder> merged = new HashMap<>();
for (LocalDate date : dates) {
DailyTrackData data = cache.get(date);
if (data == null) continue;
for (Map.Entry<String, CompactVesselTrack> entry : data.getTracks().entrySet()) {
String vesselId = entry.getKey();
CompactVesselTrack track = entry.getValue();
CompactVesselTrack.CompactVesselTrackBuilder builder = merged.get(vesselId);
if (builder == null) {
// 번째 날짜: 빌더 생성
builder = CompactVesselTrack.builder()
.vesselId(vesselId)
.sigSrcCd(track.getSigSrcCd())
.targetId(track.getTargetId())
.nationalCode(track.getNationalCode())
.shipName(track.getShipName())
.shipType(track.getShipType())
.shipKindCode(track.getShipKindCode())
.integrationTargetId(track.getIntegrationTargetId())
.geometry(new ArrayList<>(track.getGeometry()))
.timestamps(new ArrayList<>(track.getTimestamps()))
.speeds(new ArrayList<>(track.getSpeeds()))
.totalDistance(track.getTotalDistance())
.avgSpeed(track.getAvgSpeed())
.maxSpeed(track.getMaxSpeed())
.pointCount(track.getPointCount());
merged.put(vesselId, builder);
} else {
// 후속 날짜: 기존 빌더의 데이터가 이미 build 전이므로
// 별도 AccumulatorTrack으로 처리
CompactVesselTrack existing = builder.build();
List<double[]> geo = new ArrayList<>(existing.getGeometry());
geo.addAll(track.getGeometry());
List<String> ts = new ArrayList<>(existing.getTimestamps());
ts.addAll(track.getTimestamps());
List<Double> sp = new ArrayList<>(existing.getSpeeds());
sp.addAll(track.getSpeeds());
builder.geometry(geo)
.timestamps(ts)
.speeds(sp)
.totalDistance(existing.getTotalDistance() + track.getTotalDistance())
.maxSpeed(Math.max(existing.getMaxSpeed(), track.getMaxSpeed()))
.pointCount(existing.getPointCount() + track.getPointCount());
}
}
}
return merged.values().stream()
.map(CompactVesselTrack.CompactVesselTrackBuilder::build)
.collect(Collectors.toList());
}
/**
* 요청 범위를 캐시 구간 / DB 구간으로 분리
*/
public SplitQueryResult splitQueryRange(LocalDateTime startTime, LocalDateTime endTime) {
LocalDate today = LocalDate.now();
List<LocalDate> cachedDates = new ArrayList<>();
List<LocalDate> dbDates = new ArrayList<>();
DateRange todayRange = null;
// 요청 범위의 날짜별 분류
LocalDate startDate = startTime.toLocalDate();
LocalDate endDate = endTime.toLocalDate();
for (LocalDate d = startDate; !d.isAfter(endDate); d = d.plusDays(1)) {
if (d.equals(today)) {
// 오늘 hourly/5min 테이블 조회
LocalDateTime todayStart = today.atStartOfDay();
LocalDateTime todayEnd = endTime.isAfter(LocalDateTime.now()) ? LocalDateTime.now() : endTime;
if (todayStart.isBefore(startTime)) todayStart = startTime;
if (todayEnd.isAfter(todayStart)) {
todayRange = new DateRange(todayStart, todayEnd);
}
} else if (d.isAfter(today)) {
// 미래 날짜 무시
continue;
} else if (isCached(d)) {
cachedDates.add(d);
} else {
dbDates.add(d);
}
}
// DB 조회 필요 날짜를 연속 범위로 묶기
List<DateRange> dbRanges = mergeConsecutiveDates(dbDates, startTime, endTime);
return new SplitQueryResult(cachedDates, dbRanges, todayRange);
}
/**
* 연속된 날짜들을 DateRange로 묶기
*/
private List<DateRange> mergeConsecutiveDates(List<LocalDate> dates, LocalDateTime reqStart, LocalDateTime reqEnd) {
if (dates.isEmpty()) return Collections.emptyList();
Collections.sort(dates);
List<DateRange> ranges = new ArrayList<>();
LocalDate rangeStart = dates.get(0);
LocalDate rangeLast = dates.get(0);
for (int i = 1; i < dates.size(); i++) {
LocalDate d = dates.get(i);
if (d.equals(rangeLast.plusDays(1))) {
rangeLast = d;
} else {
ranges.add(toDateRange(rangeStart, rangeLast, reqStart, reqEnd));
rangeStart = d;
rangeLast = d;
}
}
ranges.add(toDateRange(rangeStart, rangeLast, reqStart, reqEnd));
return ranges;
}
private DateRange toDateRange(LocalDate start, LocalDate end, LocalDateTime reqStart, LocalDateTime reqEnd) {
LocalDateTime s = start.atStartOfDay();
LocalDateTime e = end.plusDays(1).atStartOfDay();
// 요청 범위로 클리핑
if (s.isBefore(reqStart)) s = reqStart;
if (e.isAfter(reqEnd)) e = reqEnd;
return new DateRange(s, e);
}
// 캐시 갱신
/**
* 일일 배치 완료 캐시 갱신: 전날 로드 + 만료 데이터 제거
*/
public void refreshAfterDailyJob() {
if (!cacheProperties.isEnabled()) return;
LocalDate today = LocalDate.now();
LocalDate yesterday = today.minusDays(1);
log.info("Refreshing daily track cache after daily job: loading {}", yesterday);
// 전날 데이터 ()로드
try {
DailyTrackData data = loadDay(yesterday);
if (data != null && data.getVesselCount() > 0) {
cache.put(yesterday, data);
log.info("Cache refreshed for {}: {} vessels, {} MB",
yesterday, data.getVesselCount(), data.getMemorySizeBytes() / (1024 * 1024));
}
} catch (Exception e) {
log.error("Failed to refresh cache for {}: {}", yesterday, e.getMessage());
}
// 보관 기간 초과 데이터 제거
LocalDate oldestAllowed = today.minusDays(cacheProperties.getRetentionDays());
List<LocalDate> toRemove = cache.keySet().stream()
.filter(d -> d.isBefore(oldestAllowed))
.collect(Collectors.toList());
for (LocalDate d : toRemove) {
DailyTrackData removed = cache.remove(d);
if (removed != null) {
log.info("Evicted cache for {}: {} vessels, {} MB",
d, removed.getVesselCount(), removed.getMemorySizeBytes() / (1024 * 1024));
}
}
}
// 모니터링
/**
* 캐시 상태 정보 (모니터링용)
*/
public Map<String, Object> getCacheStatus() {
Map<String, Object> info = new LinkedHashMap<>();
info.put("status", status.get().name());
info.put("enabled", cacheProperties.isEnabled());
info.put("retentionDays", cacheProperties.getRetentionDays());
info.put("maxMemoryGb", cacheProperties.getMaxMemoryGb());
info.put("cachedDays", cache.size());
long totalMemory = 0;
int totalVessels = 0;
List<Map<String, Object>> dayDetails = new ArrayList<>();
for (Map.Entry<LocalDate, DailyTrackData> entry : cache.entrySet()) {
DailyTrackData data = entry.getValue();
totalMemory += data.getMemorySizeBytes();
totalVessels += data.getVesselCount();
Map<String, Object> dayInfo = new LinkedHashMap<>();
dayInfo.put("date", entry.getKey().toString());
dayInfo.put("vesselCount", data.getVesselCount());
dayInfo.put("memorySizeMb", data.getMemorySizeBytes() / (1024 * 1024));
dayInfo.put("loadedAt", new java.util.Date(data.getLoadedAtMillis()).toString());
dayDetails.add(dayInfo);
}
info.put("totalVessels", totalVessels);
info.put("totalMemoryMb", totalMemory / (1024 * 1024));
info.put("days", dayDetails);
return info;
}
public CacheStatus getStatus() {
return status.get();
}
public boolean isEnabled() {
return cacheProperties.isEnabled();
}
// 내부 유틸
private boolean isInViewport(CompactVesselTrack track, double minLon, double minLat, double maxLon, double maxLat) {
if (track.getGeometry() == null || track.getGeometry().isEmpty()) return false;
for (double[] coord : track.getGeometry()) {
if (coord[0] >= minLon && coord[0] <= maxLon && coord[1] >= minLat && coord[1] <= maxLat) {
return true;
}
}
return false;
}
/**
* 선박 데이터 누적용 내부 클래스
*/
private static class VesselAccumulator {
String sigSrcCd;
String targetId;
List<double[]> geometry = new ArrayList<>(500);
List<String> timestamps = new ArrayList<>(500);
List<Double> speeds = new ArrayList<>(500);
double totalDistance = 0;
double maxSpeed = 0;
int pointCount = 0;
}
}

파일 보기

@ -2,6 +2,7 @@ package gc.mda.signal_batch.monitoring.controller;
import gc.mda.signal_batch.monitoring.service.TrackStreamingMetrics;
import gc.mda.signal_batch.global.websocket.service.ActiveQueryManager;
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
import gc.mda.signal_batch.global.websocket.service.StompTrackStreamingService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
@ -27,6 +28,7 @@ public class WebSocketMonitoringController {
private final TrackStreamingMetrics trackStreamingMetrics;
private final StompTrackStreamingService trackStreamingService;
private final ActiveQueryManager activeQueryManager;
private final DailyTrackCacheManager dailyTrackCacheManager;
/**
* WebSocket 스트리밍 현황 조회
@ -115,6 +117,15 @@ public class WebSocketMonitoringController {
return ResponseEntity.ok(status);
}
/**
* 일일 항적 캐시 상태 조회
*/
@GetMapping("/daily-cache")
@Operation(summary = "일일 항적 캐시 현황", description = "일일 데이터 인메모리 캐시의 상태, 날짜별 선박 수, 메모리 사용량을 조회합니다")
public ResponseEntity<Map<String, Object>> getDailyCacheStatus() {
return ResponseEntity.ok(dailyTrackCacheManager.getCacheStatus());
}
/**
* WebSocket 테스트 페이지로 리다이렉트
*/