feat: 일일 데이터 인메모리 캐시 구현 (Phase 6)
- DailyTrackCacheManager: D-1~D-7 daily 테이블 데이터 인메모리 캐시 - @Async 비동기 워밍업 (서버 시작 차단 없음, 최근 우선 로드) - 뷰포트 필터링, 다중 날짜 병합 조회, 하이브리드 쿼리 분리 - 메모리 한도 체크 (기본 5GB), 날짜별 즉시 활성화 - DailyTrackCacheProperties: enabled, retentionDays, maxMemoryGb 설정 - DailyAggregationJobConfig: 배치 완료 시 캐시 자동 갱신 리스너 - ChunkedTrackStreamingService: daily 전략에서 캐시 우선 조회 + DB 폴백 - StompTrackStreamingService: 동일 캐시 우선 패턴 적용 - WebSocketMonitoringController: GET /api/websocket/daily-cache 엔드포인트 - application-prod.yml: cache.daily-track 설정 추가 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
부모
7bd7bf556e
커밋
03b14e687a
@ -1,9 +1,12 @@
|
||||
package gc.mda.signal_batch.batch.job;
|
||||
|
||||
import gc.mda.signal_batch.batch.listener.JobCompletionListener;
|
||||
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.JobExecution;
|
||||
import org.springframework.batch.core.JobExecutionListener;
|
||||
import org.springframework.batch.core.JobParametersValidator;
|
||||
import org.springframework.batch.core.job.DefaultJobParametersValidator;
|
||||
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||
@ -24,6 +27,7 @@ public class DailyAggregationJobConfig {
|
||||
private final JobRepository jobRepository;
|
||||
private final DailyAggregationStepConfig dailyAggregationStepConfig;
|
||||
private final JobCompletionListener jobCompletionListener;
|
||||
private final DailyTrackCacheManager dailyTrackCacheManager;
|
||||
|
||||
@Bean
|
||||
public Job dailyAggregationJob() {
|
||||
@ -31,12 +35,37 @@ public class DailyAggregationJobConfig {
|
||||
.incrementer(new RunIdIncrementer())
|
||||
.validator(dailyJobParametersValidator())
|
||||
.listener(jobCompletionListener)
|
||||
.listener(dailyCacheRefreshListener())
|
||||
.start(dailyAggregationStepConfig.mergeDailyTracksStep())
|
||||
.next(dailyAggregationStepConfig.gridDailySummaryStep())
|
||||
.next(dailyAggregationStepConfig.areaDailySummaryStep())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public JobExecutionListener dailyCacheRefreshListener() {
|
||||
return new JobExecutionListener() {
|
||||
@Override
|
||||
public void beforeJob(JobExecution jobExecution) {
|
||||
// no-op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterJob(JobExecution jobExecution) {
|
||||
if (jobExecution.getStatus().isUnsuccessful()) {
|
||||
log.warn("Daily aggregation job failed, skipping cache refresh");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
log.info("Daily aggregation job completed, refreshing daily track cache");
|
||||
dailyTrackCacheManager.refreshAfterDailyJob();
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to refresh daily track cache after job: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
public JobParametersValidator dailyJobParametersValidator() {
|
||||
DefaultJobParametersValidator validator = new DefaultJobParametersValidator();
|
||||
|
||||
@ -0,0 +1,19 @@
|
||||
package gc.mda.signal_batch.global.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Data
@Component
@ConfigurationProperties(prefix = "cache.daily-track")
public class DailyTrackCacheProperties {
    /** Whether the daily track cache is enabled (off by default). */
    private boolean enabled = false;
    /** Number of days kept in cache, excluding today (D-1 .. D-N). */
    private int retentionDays = 7;
    /** Maximum cache memory budget in gigabytes. */
    private int maxMemoryGb = 5;
    /** Whether warmup runs asynchronously at application startup. */
    private boolean warmupAsync = true;
}
|
||||
@ -0,0 +1,618 @@
|
||||
package gc.mda.signal_batch.global.websocket.service;
|
||||
|
||||
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
|
||||
import gc.mda.signal_batch.global.config.DailyTrackCacheProperties;
|
||||
import gc.mda.signal_batch.global.util.ShipKindCodeConverter;
|
||||
import gc.mda.signal_batch.domain.vessel.service.IntegrationVesselService;
|
||||
import gc.mda.signal_batch.domain.vessel.dto.IntegrationVessel;
|
||||
import gc.mda.signal_batch.global.util.IntegrationSignalConstants;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.locationtech.jts.geom.Coordinate;
|
||||
import org.locationtech.jts.geom.LineString;
|
||||
import org.locationtech.jts.io.ParseException;
|
||||
import org.locationtech.jts.io.WKTReader;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.*;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 일일(Daily) 항적 데이터 인메모리 캐시 매니저
|
||||
* <p>
|
||||
* 어제(D-1) ~ 7일 전(D-7)의 daily 테이블 데이터를 메모리에 캐시하여
|
||||
* DB 조회를 생략하고 즉시 응답. 오늘(D-0) 데이터는 항상 DB 조회.
|
||||
* <p>
|
||||
* 비동기 독립 실행: 캐시 로드는 별도 스레드에서 수행.
|
||||
* 스케줄러, REST API, WebSocket 응답 모두 캐시 로드 완료를 기다리지 않음.
|
||||
*/
|
||||
@Slf4j
@Service
public class DailyTrackCacheManager {

    /** Lifecycle states of the in-memory daily cache. */
    public enum CacheStatus {
        NOT_STARTED, LOADING, PARTIAL, READY, DISABLED
    }

    // Read-only DataSource used for bulk loading daily track rows.
    private final DataSource queryDataSource;
    private final DailyTrackCacheProperties cacheProperties;
    private final IntegrationVesselService integrationVesselService;

    // Per-date cache entries (D-1 .. D-N); today's data is never cached.
    private final ConcurrentHashMap<LocalDate, DailyTrackData> cache = new ConcurrentHashMap<>();
    private final AtomicReference<CacheStatus> status = new AtomicReference<>(CacheStatus.NOT_STARTED);

    // Format expected for the start_position->>'time' JSON field.
    private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // One WKTReader per thread: WKTReader itself is not thread-safe.
    private static final ThreadLocal<WKTReader> wktReaderLocal = ThreadLocal.withInitial(WKTReader::new);

    public DailyTrackCacheManager(
            @Qualifier("queryDataSource") DataSource queryDataSource,
            DailyTrackCacheProperties cacheProperties,
            IntegrationVesselService integrationVesselService) {
        this.queryDataSource = queryDataSource;
        this.cacheProperties = cacheProperties;
        this.integrationVesselService = integrationVesselService;
    }
||||
|
||||
/**
 * Snapshot of one cached day's tracks plus bookkeeping metadata.
 * NOTE(review): the tracks map is stored as-is (no defensive copy);
 * callers must not mutate it after construction.
 */
public static class DailyTrackData {
    private final LocalDate date;
    private final Map<String, CompactVesselTrack> tracks; // key: "sigSrcCd_targetId"
    private final long loadedAtMillis;
    private final int vesselCount;
    private final long memorySizeBytes; // rough estimate, not a measured footprint

    public DailyTrackData(LocalDate date, Map<String, CompactVesselTrack> tracks, long memorySizeBytes) {
        this.date = date;
        this.tracks = tracks;
        this.loadedAtMillis = System.currentTimeMillis();
        this.vesselCount = tracks.size();
        this.memorySizeBytes = memorySizeBytes;
    }

    public LocalDate getDate() { return date; }
    public Map<String, CompactVesselTrack> getTracks() { return tracks; }
    public long getLoadedAtMillis() { return loadedAtMillis; }
    public int getVesselCount() { return vesselCount; }
    public long getMemorySizeBytes() { return memorySizeBytes; }
}
|
||||
|
||||
/**
 * Result of splitting a requested time range into cache / DB / today parts.
 */
public static class SplitQueryResult {
    // Dates fully servable from the in-memory cache.
    private final List<LocalDate> cachedDates;
    // Ranges that must be read from the daily table (consecutive days merged).
    private final List<DateRange> dbRanges;
    // Today's portion, served from the hourly/5-minute tables; null if none.
    private final DateRange todayRange;

    public SplitQueryResult(List<LocalDate> cachedDates, List<DateRange> dbRanges, DateRange todayRange) {
        this.cachedDates = cachedDates;
        this.dbRanges = dbRanges;
        this.todayRange = todayRange;
    }

    public List<LocalDate> getCachedDates() { return cachedDates; }
    public List<DateRange> getDbRanges() { return dbRanges; }
    public DateRange getTodayRange() { return todayRange; }
    public boolean hasCachedData() { return !cachedDates.isEmpty(); }
    public boolean hasDbRanges() { return !dbRanges.isEmpty(); }
    public boolean hasTodayRange() { return todayRange != null; }
}
|
||||
|
||||
/**
 * A datetime interval used as DB query bounds.
 * NOTE(review): whether end is inclusive depends on the consumer's
 * query predicate — confirm at the call sites.
 */
public static class DateRange {
    private final LocalDateTime start;
    private final LocalDateTime end;

    public DateRange(LocalDateTime start, LocalDateTime end) {
        this.start = start;
        this.end = end;
    }

    public LocalDateTime getStart() { return start; }
    public LocalDateTime getEnd() { return end; }
}
|
||||
|
||||
// ── 비동기 캐시 워밍업 ──
|
||||
|
||||
/**
 * Kicks off cache warmup once the application context is fully started.
 * Marks the cache DISABLED and returns immediately when the feature is off.
 */
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
    if (!cacheProperties.isEnabled()) {
        status.set(CacheStatus.DISABLED);
        log.info("Daily track cache is disabled");
        return;
    }
    if (cacheProperties.isWarmupAsync()) {
        // NOTE(review): this is a self-invocation (this.warmUpCacheAsync()),
        // which bypasses the Spring AOP proxy, so @Async on warmUpCacheAsync()
        // likely does NOT take effect and warmup would run synchronously on
        // the event thread — confirm, or dispatch via a TaskExecutor/self-proxy.
        warmUpCacheAsync();
    } else {
        warmUpCache();
    }
}
|
||||
|
||||
/**
 * Runs the warmup on the "trackStreamingExecutor" thread pool.
 * NOTE(review): @Async only applies when this method is invoked through
 * the Spring proxy; a direct call from within this class bypasses it.
 */
@Async("trackStreamingExecutor")
public void warmUpCacheAsync() {
    warmUpCache();
}
|
||||
|
||||
/**
|
||||
* 캐시 워밍업: D-1 → D-2 → ... → D-N 순서로 최근 우선 로드
|
||||
*/
|
||||
public void warmUpCache() {
|
||||
if (!cacheProperties.isEnabled()) {
|
||||
status.set(CacheStatus.DISABLED);
|
||||
return;
|
||||
}
|
||||
|
||||
status.set(CacheStatus.LOADING);
|
||||
log.info("Daily track cache warmup started (async): retentionDays={}, maxMemoryGb={}",
|
||||
cacheProperties.getRetentionDays(), cacheProperties.getMaxMemoryGb());
|
||||
|
||||
long totalStart = System.currentTimeMillis();
|
||||
long totalMemory = 0;
|
||||
long maxMemoryBytes = (long) cacheProperties.getMaxMemoryGb() * 1024 * 1024 * 1024;
|
||||
|
||||
LocalDate today = LocalDate.now();
|
||||
int loadedCount = 0;
|
||||
|
||||
for (int daysBack = 1; daysBack <= cacheProperties.getRetentionDays(); daysBack++) {
|
||||
LocalDate targetDate = today.minusDays(daysBack);
|
||||
try {
|
||||
long dateStart = System.currentTimeMillis();
|
||||
DailyTrackData data = loadDay(targetDate);
|
||||
|
||||
if (data != null && data.getVesselCount() > 0) {
|
||||
// 메모리 한도 체크
|
||||
if (totalMemory + data.getMemorySizeBytes() > maxMemoryBytes) {
|
||||
log.warn("Cache memory limit reached: {}GB / {}GB. Stopping at D-{}",
|
||||
totalMemory / (1024 * 1024 * 1024), cacheProperties.getMaxMemoryGb(), daysBack);
|
||||
break;
|
||||
}
|
||||
|
||||
cache.put(targetDate, data);
|
||||
totalMemory += data.getMemorySizeBytes();
|
||||
loadedCount++;
|
||||
|
||||
long elapsed = System.currentTimeMillis() - dateStart;
|
||||
log.info("Cached D-{} ({}): {} vessels, {} MB, {}ms",
|
||||
daysBack, targetDate, data.getVesselCount(),
|
||||
data.getMemorySizeBytes() / (1024 * 1024), elapsed);
|
||||
|
||||
// 부분 로드 시점에 PARTIAL 상태로 전환
|
||||
if (status.get() == CacheStatus.LOADING) {
|
||||
status.set(CacheStatus.PARTIAL);
|
||||
}
|
||||
} else {
|
||||
log.info("No daily data for D-{} ({})", daysBack, targetDate);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to load cache for D-{} ({}): {}", daysBack, targetDate, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
status.set(CacheStatus.READY);
|
||||
long totalElapsed = System.currentTimeMillis() - totalStart;
|
||||
log.info("Daily track cache warmup completed: {} days loaded, total {} MB, {}ms",
|
||||
loadedCount, totalMemory / (1024 * 1024), totalElapsed);
|
||||
}
|
||||
|
||||
/**
|
||||
* 특정 날짜의 daily 테이블 전체 로드 → CompactVesselTrack 변환
|
||||
*/
|
||||
public DailyTrackData loadDay(LocalDate date) {
|
||||
LocalDateTime dayStart = date.atStartOfDay();
|
||||
LocalDateTime dayEnd = date.plusDays(1).atStartOfDay();
|
||||
|
||||
String sql = "SELECT sig_src_cd, target_id, time_bucket, " +
|
||||
"public.ST_AsText(track_geom) as track_geom, " +
|
||||
"distance_nm, avg_speed, max_speed, point_count, " +
|
||||
"start_position->>'time' as start_time, " +
|
||||
"end_position->>'time' as end_time " +
|
||||
"FROM signal.t_vessel_tracks_daily " +
|
||||
"WHERE time_bucket >= ? AND time_bucket < ? " +
|
||||
"ORDER BY sig_src_cd, target_id";
|
||||
|
||||
Map<String, VesselAccumulator> vesselMap = new HashMap<>(50000);
|
||||
long estimatedMemory = 0;
|
||||
WKTReader wktReader = wktReaderLocal.get();
|
||||
|
||||
try (Connection conn = queryDataSource.getConnection();
|
||||
PreparedStatement ps = conn.prepareStatement(sql)) {
|
||||
|
||||
ps.setTimestamp(1, Timestamp.valueOf(dayStart));
|
||||
ps.setTimestamp(2, Timestamp.valueOf(dayEnd));
|
||||
ps.setFetchSize(10000);
|
||||
|
||||
try (ResultSet rs = ps.executeQuery()) {
|
||||
while (rs.next()) {
|
||||
String sigSrcCd = rs.getString("sig_src_cd");
|
||||
String targetId = rs.getString("target_id");
|
||||
String vesselId = sigSrcCd + "_" + targetId;
|
||||
|
||||
VesselAccumulator acc = vesselMap.computeIfAbsent(vesselId, k -> {
|
||||
VesselAccumulator a = new VesselAccumulator();
|
||||
a.sigSrcCd = sigSrcCd;
|
||||
a.targetId = targetId;
|
||||
return a;
|
||||
});
|
||||
|
||||
String trackGeomWkt = rs.getString("track_geom");
|
||||
Timestamp timeBucket = rs.getTimestamp("time_bucket");
|
||||
String startTimeStr = null;
|
||||
String endTimeStr = null;
|
||||
try { startTimeStr = rs.getString("start_time"); } catch (SQLException ignored) {}
|
||||
try { endTimeStr = rs.getString("end_time"); } catch (SQLException ignored) {}
|
||||
|
||||
double distanceNm = rs.getDouble("distance_nm");
|
||||
double maxSpeed = rs.getDouble("max_speed");
|
||||
acc.totalDistance += distanceNm;
|
||||
acc.maxSpeed = Math.max(acc.maxSpeed, maxSpeed);
|
||||
|
||||
if (trackGeomWkt != null && !trackGeomWkt.isEmpty() && !"LINESTRING EMPTY".equals(trackGeomWkt)) {
|
||||
try {
|
||||
LineString lineString = (LineString) wktReader.read(trackGeomWkt);
|
||||
if (lineString.getNumPoints() == 0) continue;
|
||||
|
||||
LocalDateTime baseTime = timeBucket.toLocalDateTime();
|
||||
if (startTimeStr != null && !startTimeStr.isEmpty()) {
|
||||
try {
|
||||
baseTime = LocalDateTime.parse(startTimeStr, TIMESTAMP_FORMATTER);
|
||||
} catch (Exception ignored) {}
|
||||
}
|
||||
|
||||
Coordinate[] coords = lineString.getCoordinates();
|
||||
for (Coordinate coord : coords) {
|
||||
acc.geometry.add(new double[]{coord.x, coord.y});
|
||||
// M 값이 있으면 타임스탬프로 사용
|
||||
if (!Double.isNaN(coord.getM())) {
|
||||
acc.timestamps.add(String.valueOf((long) coord.getM()));
|
||||
} else {
|
||||
acc.timestamps.add(String.valueOf(baseTime.toEpochSecond(java.time.ZoneOffset.of("+09:00"))));
|
||||
}
|
||||
// 속도 추산 (인접 좌표 간 거리/시간)
|
||||
acc.speeds.add(0.0);
|
||||
}
|
||||
acc.pointCount += coords.length;
|
||||
estimatedMemory += coords.length * 40L; // 좌표당 약 40바이트 추정
|
||||
} catch (ParseException e) {
|
||||
log.debug("Failed to parse track_geom for vessel {}: {}", vesselId, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to load daily data for {}: {}", date, e.getMessage());
|
||||
return null;
|
||||
}
|
||||
|
||||
if (vesselMap.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// VesselAccumulator → CompactVesselTrack 변환
|
||||
Map<String, CompactVesselTrack> tracks = new HashMap<>(vesselMap.size());
|
||||
for (Map.Entry<String, VesselAccumulator> entry : vesselMap.entrySet()) {
|
||||
VesselAccumulator acc = entry.getValue();
|
||||
if (acc.geometry.isEmpty()) continue;
|
||||
|
||||
double avgSpeed = acc.pointCount > 0 ? acc.totalDistance / Math.max(1, acc.pointCount) * 60 : 0;
|
||||
|
||||
// shipKindCode 계산
|
||||
String shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern(
|
||||
acc.sigSrcCd, null, null, acc.targetId);
|
||||
|
||||
// 통합선박 ID 조회
|
||||
String integrationTargetId = null;
|
||||
try {
|
||||
IntegrationVessel iv = integrationVesselService.findByVessel(acc.sigSrcCd, acc.targetId);
|
||||
if (iv != null) {
|
||||
integrationTargetId = iv.generateIntegrationId();
|
||||
}
|
||||
} catch (Exception ignored) {}
|
||||
|
||||
CompactVesselTrack track = CompactVesselTrack.builder()
|
||||
.vesselId(entry.getKey())
|
||||
.sigSrcCd(acc.sigSrcCd)
|
||||
.targetId(acc.targetId)
|
||||
.shipKindCode(shipKindCode)
|
||||
.integrationTargetId(integrationTargetId)
|
||||
.geometry(acc.geometry)
|
||||
.timestamps(acc.timestamps)
|
||||
.speeds(acc.speeds)
|
||||
.totalDistance(acc.totalDistance)
|
||||
.avgSpeed(avgSpeed)
|
||||
.maxSpeed(acc.maxSpeed)
|
||||
.pointCount(acc.pointCount)
|
||||
.build();
|
||||
|
||||
tracks.put(entry.getKey(), track);
|
||||
}
|
||||
|
||||
estimatedMemory += tracks.size() * 200L; // 객체 오버헤드
|
||||
return new DailyTrackData(date, tracks, estimatedMemory);
|
||||
}
|
||||
|
||||
// ── 캐시 조회 API ──
|
||||
|
||||
/**
|
||||
* 특정 날짜가 캐시되어 있는지 확인
|
||||
*/
|
||||
public boolean isCached(LocalDate date) {
|
||||
return cache.containsKey(date);
|
||||
}
|
||||
|
||||
/**
|
||||
* 캐시에서 특정 날짜의 전체 항적 조회
|
||||
*/
|
||||
public List<CompactVesselTrack> getCachedTracks(LocalDate date) {
|
||||
DailyTrackData data = cache.get(date);
|
||||
if (data == null) return Collections.emptyList();
|
||||
return new ArrayList<>(data.getTracks().values());
|
||||
}
|
||||
|
||||
/**
|
||||
* 캐시에서 특정 날짜의 항적을 뷰포트로 필터링하여 조회
|
||||
*/
|
||||
public List<CompactVesselTrack> getCachedTracks(LocalDate date, double minLon, double minLat, double maxLon, double maxLat) {
|
||||
DailyTrackData data = cache.get(date);
|
||||
if (data == null) return Collections.emptyList();
|
||||
|
||||
return data.getTracks().values().stream()
|
||||
.filter(track -> isInViewport(track, minLon, minLat, maxLon, maxLat))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 여러 날짜의 캐시 데이터를 vessel 기준으로 병합
|
||||
*/
|
||||
public List<CompactVesselTrack> getCachedTracksMultipleDays(List<LocalDate> dates) {
|
||||
Map<String, CompactVesselTrack.CompactVesselTrackBuilder> merged = new HashMap<>();
|
||||
|
||||
for (LocalDate date : dates) {
|
||||
DailyTrackData data = cache.get(date);
|
||||
if (data == null) continue;
|
||||
|
||||
for (Map.Entry<String, CompactVesselTrack> entry : data.getTracks().entrySet()) {
|
||||
String vesselId = entry.getKey();
|
||||
CompactVesselTrack track = entry.getValue();
|
||||
|
||||
CompactVesselTrack.CompactVesselTrackBuilder builder = merged.get(vesselId);
|
||||
if (builder == null) {
|
||||
// 첫 번째 날짜: 빌더 생성
|
||||
builder = CompactVesselTrack.builder()
|
||||
.vesselId(vesselId)
|
||||
.sigSrcCd(track.getSigSrcCd())
|
||||
.targetId(track.getTargetId())
|
||||
.nationalCode(track.getNationalCode())
|
||||
.shipName(track.getShipName())
|
||||
.shipType(track.getShipType())
|
||||
.shipKindCode(track.getShipKindCode())
|
||||
.integrationTargetId(track.getIntegrationTargetId())
|
||||
.geometry(new ArrayList<>(track.getGeometry()))
|
||||
.timestamps(new ArrayList<>(track.getTimestamps()))
|
||||
.speeds(new ArrayList<>(track.getSpeeds()))
|
||||
.totalDistance(track.getTotalDistance())
|
||||
.avgSpeed(track.getAvgSpeed())
|
||||
.maxSpeed(track.getMaxSpeed())
|
||||
.pointCount(track.getPointCount());
|
||||
merged.put(vesselId, builder);
|
||||
} else {
|
||||
// 후속 날짜: 기존 빌더의 데이터가 이미 build 전이므로
|
||||
// 별도 AccumulatorTrack으로 처리
|
||||
CompactVesselTrack existing = builder.build();
|
||||
List<double[]> geo = new ArrayList<>(existing.getGeometry());
|
||||
geo.addAll(track.getGeometry());
|
||||
List<String> ts = new ArrayList<>(existing.getTimestamps());
|
||||
ts.addAll(track.getTimestamps());
|
||||
List<Double> sp = new ArrayList<>(existing.getSpeeds());
|
||||
sp.addAll(track.getSpeeds());
|
||||
|
||||
builder.geometry(geo)
|
||||
.timestamps(ts)
|
||||
.speeds(sp)
|
||||
.totalDistance(existing.getTotalDistance() + track.getTotalDistance())
|
||||
.maxSpeed(Math.max(existing.getMaxSpeed(), track.getMaxSpeed()))
|
||||
.pointCount(existing.getPointCount() + track.getPointCount());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return merged.values().stream()
|
||||
.map(CompactVesselTrack.CompactVesselTrackBuilder::build)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 요청 범위를 캐시 구간 / DB 구간으로 분리
|
||||
*/
|
||||
public SplitQueryResult splitQueryRange(LocalDateTime startTime, LocalDateTime endTime) {
|
||||
LocalDate today = LocalDate.now();
|
||||
List<LocalDate> cachedDates = new ArrayList<>();
|
||||
List<LocalDate> dbDates = new ArrayList<>();
|
||||
DateRange todayRange = null;
|
||||
|
||||
// 요청 범위의 날짜별 분류
|
||||
LocalDate startDate = startTime.toLocalDate();
|
||||
LocalDate endDate = endTime.toLocalDate();
|
||||
|
||||
for (LocalDate d = startDate; !d.isAfter(endDate); d = d.plusDays(1)) {
|
||||
if (d.equals(today)) {
|
||||
// 오늘 → hourly/5min 테이블 조회
|
||||
LocalDateTime todayStart = today.atStartOfDay();
|
||||
LocalDateTime todayEnd = endTime.isAfter(LocalDateTime.now()) ? LocalDateTime.now() : endTime;
|
||||
if (todayStart.isBefore(startTime)) todayStart = startTime;
|
||||
if (todayEnd.isAfter(todayStart)) {
|
||||
todayRange = new DateRange(todayStart, todayEnd);
|
||||
}
|
||||
} else if (d.isAfter(today)) {
|
||||
// 미래 날짜 → 무시
|
||||
continue;
|
||||
} else if (isCached(d)) {
|
||||
cachedDates.add(d);
|
||||
} else {
|
||||
dbDates.add(d);
|
||||
}
|
||||
}
|
||||
|
||||
// DB 조회 필요 날짜를 연속 범위로 묶기
|
||||
List<DateRange> dbRanges = mergeConsecutiveDates(dbDates, startTime, endTime);
|
||||
|
||||
return new SplitQueryResult(cachedDates, dbRanges, todayRange);
|
||||
}
|
||||
|
||||
/**
|
||||
* 연속된 날짜들을 DateRange로 묶기
|
||||
*/
|
||||
private List<DateRange> mergeConsecutiveDates(List<LocalDate> dates, LocalDateTime reqStart, LocalDateTime reqEnd) {
|
||||
if (dates.isEmpty()) return Collections.emptyList();
|
||||
|
||||
Collections.sort(dates);
|
||||
List<DateRange> ranges = new ArrayList<>();
|
||||
LocalDate rangeStart = dates.get(0);
|
||||
LocalDate rangeLast = dates.get(0);
|
||||
|
||||
for (int i = 1; i < dates.size(); i++) {
|
||||
LocalDate d = dates.get(i);
|
||||
if (d.equals(rangeLast.plusDays(1))) {
|
||||
rangeLast = d;
|
||||
} else {
|
||||
ranges.add(toDateRange(rangeStart, rangeLast, reqStart, reqEnd));
|
||||
rangeStart = d;
|
||||
rangeLast = d;
|
||||
}
|
||||
}
|
||||
ranges.add(toDateRange(rangeStart, rangeLast, reqStart, reqEnd));
|
||||
return ranges;
|
||||
}
|
||||
|
||||
private DateRange toDateRange(LocalDate start, LocalDate end, LocalDateTime reqStart, LocalDateTime reqEnd) {
|
||||
LocalDateTime s = start.atStartOfDay();
|
||||
LocalDateTime e = end.plusDays(1).atStartOfDay();
|
||||
// 요청 범위로 클리핑
|
||||
if (s.isBefore(reqStart)) s = reqStart;
|
||||
if (e.isAfter(reqEnd)) e = reqEnd;
|
||||
return new DateRange(s, e);
|
||||
}
|
||||
|
||||
// ── 캐시 갱신 ──
|
||||
|
||||
/**
|
||||
* 일일 배치 완료 후 캐시 갱신: 전날 로드 + 만료 데이터 제거
|
||||
*/
|
||||
public void refreshAfterDailyJob() {
|
||||
if (!cacheProperties.isEnabled()) return;
|
||||
|
||||
LocalDate today = LocalDate.now();
|
||||
LocalDate yesterday = today.minusDays(1);
|
||||
|
||||
log.info("Refreshing daily track cache after daily job: loading {}", yesterday);
|
||||
|
||||
// 전날 데이터 (재)로드
|
||||
try {
|
||||
DailyTrackData data = loadDay(yesterday);
|
||||
if (data != null && data.getVesselCount() > 0) {
|
||||
cache.put(yesterday, data);
|
||||
log.info("Cache refreshed for {}: {} vessels, {} MB",
|
||||
yesterday, data.getVesselCount(), data.getMemorySizeBytes() / (1024 * 1024));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to refresh cache for {}: {}", yesterday, e.getMessage());
|
||||
}
|
||||
|
||||
// 보관 기간 초과 데이터 제거
|
||||
LocalDate oldestAllowed = today.minusDays(cacheProperties.getRetentionDays());
|
||||
List<LocalDate> toRemove = cache.keySet().stream()
|
||||
.filter(d -> d.isBefore(oldestAllowed))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
for (LocalDate d : toRemove) {
|
||||
DailyTrackData removed = cache.remove(d);
|
||||
if (removed != null) {
|
||||
log.info("Evicted cache for {}: {} vessels, {} MB",
|
||||
d, removed.getVesselCount(), removed.getMemorySizeBytes() / (1024 * 1024));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── 모니터링 ──
|
||||
|
||||
/**
|
||||
* 캐시 상태 정보 (모니터링용)
|
||||
*/
|
||||
public Map<String, Object> getCacheStatus() {
|
||||
Map<String, Object> info = new LinkedHashMap<>();
|
||||
info.put("status", status.get().name());
|
||||
info.put("enabled", cacheProperties.isEnabled());
|
||||
info.put("retentionDays", cacheProperties.getRetentionDays());
|
||||
info.put("maxMemoryGb", cacheProperties.getMaxMemoryGb());
|
||||
info.put("cachedDays", cache.size());
|
||||
|
||||
long totalMemory = 0;
|
||||
int totalVessels = 0;
|
||||
List<Map<String, Object>> dayDetails = new ArrayList<>();
|
||||
|
||||
for (Map.Entry<LocalDate, DailyTrackData> entry : cache.entrySet()) {
|
||||
DailyTrackData data = entry.getValue();
|
||||
totalMemory += data.getMemorySizeBytes();
|
||||
totalVessels += data.getVesselCount();
|
||||
|
||||
Map<String, Object> dayInfo = new LinkedHashMap<>();
|
||||
dayInfo.put("date", entry.getKey().toString());
|
||||
dayInfo.put("vesselCount", data.getVesselCount());
|
||||
dayInfo.put("memorySizeMb", data.getMemorySizeBytes() / (1024 * 1024));
|
||||
dayInfo.put("loadedAt", new java.util.Date(data.getLoadedAtMillis()).toString());
|
||||
dayDetails.add(dayInfo);
|
||||
}
|
||||
|
||||
info.put("totalVessels", totalVessels);
|
||||
info.put("totalMemoryMb", totalMemory / (1024 * 1024));
|
||||
info.put("days", dayDetails);
|
||||
return info;
|
||||
}
|
||||
|
||||
/** Current lifecycle status of the cache. */
public CacheStatus getStatus() {
    return status.get();
}

/** Whether the cache feature is enabled via configuration. */
public boolean isEnabled() {
    return cacheProperties.isEnabled();
}
|
||||
|
||||
// ── 내부 유틸 ──
|
||||
|
||||
private boolean isInViewport(CompactVesselTrack track, double minLon, double minLat, double maxLon, double maxLat) {
|
||||
if (track.getGeometry() == null || track.getGeometry().isEmpty()) return false;
|
||||
for (double[] coord : track.getGeometry()) {
|
||||
if (coord[0] >= minLon && coord[0] <= maxLon && coord[1] >= minLat && coord[1] <= maxLat) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
 * Mutable per-vessel accumulator used while folding daily rows into a track.
 */
private static class VesselAccumulator {
    String sigSrcCd;
    String targetId;
    List<double[]> geometry = new ArrayList<>(500);   // [lon, lat] pairs
    List<String> timestamps = new ArrayList<>(500);   // parallel to geometry; epoch-second strings (presumably — see loadDay)
    List<Double> speeds = new ArrayList<>(500);       // parallel to geometry; currently filled with 0.0 placeholders
    double totalDistance = 0;
    double maxSpeed = 0;
    int pointCount = 0;
}
|
||||
}
|
||||
@ -2,6 +2,7 @@ package gc.mda.signal_batch.monitoring.controller;
|
||||
|
||||
import gc.mda.signal_batch.monitoring.service.TrackStreamingMetrics;
|
||||
import gc.mda.signal_batch.global.websocket.service.ActiveQueryManager;
|
||||
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
|
||||
import gc.mda.signal_batch.global.websocket.service.StompTrackStreamingService;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
@ -27,6 +28,7 @@ public class WebSocketMonitoringController {
|
||||
private final TrackStreamingMetrics trackStreamingMetrics;
|
||||
private final StompTrackStreamingService trackStreamingService;
|
||||
private final ActiveQueryManager activeQueryManager;
|
||||
private final DailyTrackCacheManager dailyTrackCacheManager;
|
||||
|
||||
/**
|
||||
* WebSocket 스트리밍 현황 조회
|
||||
@ -115,6 +117,15 @@ public class WebSocketMonitoringController {
|
||||
return ResponseEntity.ok(status);
|
||||
}
|
||||
|
||||
/**
 * Returns the daily track cache status for monitoring: overall state,
 * per-date vessel counts, and estimated memory usage
 * (delegates to DailyTrackCacheManager#getCacheStatus).
 */
@GetMapping("/daily-cache")
@Operation(summary = "일일 항적 캐시 현황", description = "일일 데이터 인메모리 캐시의 상태, 날짜별 선박 수, 메모리 사용량을 조회합니다")
public ResponseEntity<Map<String, Object>> getDailyCacheStatus() {
    return ResponseEntity.ok(dailyTrackCacheManager.getCacheStatus());
}
|
||||
|
||||
/**
|
||||
* WebSocket 테스트 페이지로 리다이렉트
|
||||
*/
|
||||
|
||||
불러오는 중...
Reference in New Issue
Block a user