diff --git a/src/main/java/gc/mda/signal_batch/batch/job/DailyAggregationJobConfig.java b/src/main/java/gc/mda/signal_batch/batch/job/DailyAggregationJobConfig.java index b21cf9f..3f29436 100644 --- a/src/main/java/gc/mda/signal_batch/batch/job/DailyAggregationJobConfig.java +++ b/src/main/java/gc/mda/signal_batch/batch/job/DailyAggregationJobConfig.java @@ -1,9 +1,12 @@ package gc.mda.signal_batch.batch.job; import gc.mda.signal_batch.batch.listener.JobCompletionListener; +import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionListener; import org.springframework.batch.core.JobParametersValidator; import org.springframework.batch.core.job.DefaultJobParametersValidator; import org.springframework.batch.core.job.builder.JobBuilder; @@ -20,22 +23,48 @@ import org.springframework.context.annotation.Profile; @RequiredArgsConstructor @ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true) public class DailyAggregationJobConfig { - + private final JobRepository jobRepository; private final DailyAggregationStepConfig dailyAggregationStepConfig; private final JobCompletionListener jobCompletionListener; - + private final DailyTrackCacheManager dailyTrackCacheManager; + @Bean public Job dailyAggregationJob() { return new JobBuilder("dailyAggregationJob", jobRepository) .incrementer(new RunIdIncrementer()) .validator(dailyJobParametersValidator()) .listener(jobCompletionListener) + .listener(dailyCacheRefreshListener()) .start(dailyAggregationStepConfig.mergeDailyTracksStep()) .next(dailyAggregationStepConfig.gridDailySummaryStep()) .next(dailyAggregationStepConfig.areaDailySummaryStep()) .build(); } + + @Bean + public JobExecutionListener dailyCacheRefreshListener() { + return new JobExecutionListener() { + @Override + public void beforeJob(JobExecution jobExecution) { + // no-op + } + + @Override + public void afterJob(JobExecution jobExecution) { + if (jobExecution.getStatus().isUnsuccessful()) { + log.warn("Daily aggregation job failed, skipping cache refresh"); + return; + } + try { + log.info("Daily aggregation job completed, refreshing daily track cache"); + dailyTrackCacheManager.refreshAfterDailyJob(); + } catch (Exception e) { + log.error("Failed to refresh daily track cache after job: {}", e.getMessage()); + } + } + }; + } @Bean public JobParametersValidator dailyJobParametersValidator() { diff --git a/src/main/java/gc/mda/signal_batch/global/config/DailyTrackCacheProperties.java b/src/main/java/gc/mda/signal_batch/global/config/DailyTrackCacheProperties.java new file mode 100644 index 0000000..b64c094 --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/global/config/DailyTrackCacheProperties.java @@ -0,0 +1,19 @@ +package gc.mda.signal_batch.global.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@ConfigurationProperties(prefix = "cache.daily-track") +public class DailyTrackCacheProperties { + /** 캐시 활성화 여부 */ + private boolean enabled = false; + /** 캐시 보관 일수 (오늘 제외, D-1 ~ D-N) */ + private int retentionDays = 7; + /** 최대 메모리 사용량 (GB) */ + private int maxMemoryGb = 5; + /** 비동기 워밍업 여부 */ + private boolean warmupAsync = true; +} diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/DailyTrackCacheManager.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/DailyTrackCacheManager.java new file mode 100644 index 0000000..e4ef65f --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/DailyTrackCacheManager.java @@ -0,0 +1,618 @@ +package gc.mda.signal_batch.global.websocket.service; + +import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack; +import gc.mda.signal_batch.global.config.DailyTrackCacheProperties; +import gc.mda.signal_batch.global.util.ShipKindCodeConverter; +import gc.mda.signal_batch.domain.vessel.service.IntegrationVesselService; +import gc.mda.signal_batch.domain.vessel.dto.IntegrationVessel; +import gc.mda.signal_batch.global.util.IntegrationSignalConstants; +import lombok.extern.slf4j.Slf4j; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.LineString; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKTReader; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import javax.sql.DataSource; +import java.sql.*; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * 일일(Daily) 항적 데이터 인메모리 캐시 매니저 + *

+ * 어제(D-1) ~ 7일 전(D-7)의 daily 테이블 데이터를 메모리에 캐시하여 + * DB 조회를 생략하고 즉시 응답. 오늘(D-0) 데이터는 항상 DB 조회. + *

+ * 비동기 독립 실행: 캐시 로드는 별도 스레드에서 수행. + * 스케줄러, REST API, WebSocket 응답 모두 캐시 로드 완료를 기다리지 않음. + */ +@Slf4j +@Service +public class DailyTrackCacheManager { + + public enum CacheStatus { + NOT_STARTED, LOADING, PARTIAL, READY, DISABLED + } + + private final DataSource queryDataSource; + private final DailyTrackCacheProperties cacheProperties; + private final IntegrationVesselService integrationVesselService; + + // 날짜별 캐시 (D-1 ~ D-N) + private final ConcurrentHashMap cache = new ConcurrentHashMap<>(); + private final AtomicReference status = new AtomicReference<>(CacheStatus.NOT_STARTED); + + private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + // 스레드 로컬 WKTReader (thread-safe) + private static final ThreadLocal wktReaderLocal = ThreadLocal.withInitial(WKTReader::new); + + public DailyTrackCacheManager( + @Qualifier("queryDataSource") DataSource queryDataSource, + DailyTrackCacheProperties cacheProperties, + IntegrationVesselService integrationVesselService) { + this.queryDataSource = queryDataSource; + this.cacheProperties = cacheProperties; + this.integrationVesselService = integrationVesselService; + } + + /** + * 캐시된 날짜별 데이터 + */ + public static class DailyTrackData { + private final LocalDate date; + private final Map tracks; // key: "sigSrcCd_targetId" + private final long loadedAtMillis; + private final int vesselCount; + private final long memorySizeBytes; + + public DailyTrackData(LocalDate date, Map tracks, long memorySizeBytes) { + this.date = date; + this.tracks = tracks; + this.loadedAtMillis = System.currentTimeMillis(); + this.vesselCount = tracks.size(); + this.memorySizeBytes = memorySizeBytes; + } + + public LocalDate getDate() { return date; } + public Map getTracks() { return tracks; } + public long getLoadedAtMillis() { return loadedAtMillis; } + public int getVesselCount() { return vesselCount; } + public long getMemorySizeBytes() { return memorySizeBytes; } + } + + /** + * 쿼리 범위 분리 결과 + */ + public static class SplitQueryResult { + private final List cachedDates; // 캐시에서 가져올 날짜 + private final List dbRanges; // DB 조회 필요 범위 (연속 날짜 묶음) + private final DateRange todayRange; // 오늘 구간 (hourly/5min) + + public SplitQueryResult(List cachedDates, List dbRanges, DateRange todayRange) { + this.cachedDates = cachedDates; + this.dbRanges = dbRanges; + this.todayRange = todayRange; + } + + public List getCachedDates() { return cachedDates; } + public List getDbRanges() { return dbRanges; } + public DateRange getTodayRange() { return todayRange; } + public boolean hasCachedData() { return !cachedDates.isEmpty(); } + public boolean hasDbRanges() { return !dbRanges.isEmpty(); } + public boolean hasTodayRange() { return todayRange != null; } + } + + public static class DateRange { + private final LocalDateTime start; + private final LocalDateTime end; + + public DateRange(LocalDateTime start, LocalDateTime end) { + this.start = start; + this.end = end; + } + + public LocalDateTime getStart() { return start; } + public LocalDateTime getEnd() { return end; } + } + + // ── 비동기 캐시 워밍업 ── + + @EventListener(ApplicationReadyEvent.class) + public void onApplicationReady() { + if (!cacheProperties.isEnabled()) { + status.set(CacheStatus.DISABLED); + log.info("Daily track cache is disabled"); + return; + } + if (cacheProperties.isWarmupAsync()) { + warmUpCacheAsync(); + } else { + warmUpCache(); + } + } + + @Async("trackStreamingExecutor") + public void warmUpCacheAsync() { + warmUpCache(); + } + + /** + * 캐시 워밍업: D-1 → D-2 → ... → D-N 순서로 최근 우선 로드 + */ + public void warmUpCache() { + if (!cacheProperties.isEnabled()) { + status.set(CacheStatus.DISABLED); + return; + } + + status.set(CacheStatus.LOADING); + log.info("Daily track cache warmup started (async): retentionDays={}, maxMemoryGb={}", + cacheProperties.getRetentionDays(), cacheProperties.getMaxMemoryGb()); + + long totalStart = System.currentTimeMillis(); + long totalMemory = 0; + long maxMemoryBytes = (long) cacheProperties.getMaxMemoryGb() * 1024 * 1024 * 1024; + + LocalDate today = LocalDate.now(); + int loadedCount = 0; + + for (int daysBack = 1; daysBack <= cacheProperties.getRetentionDays(); daysBack++) { + LocalDate targetDate = today.minusDays(daysBack); + try { + long dateStart = System.currentTimeMillis(); + DailyTrackData data = loadDay(targetDate); + + if (data != null && data.getVesselCount() > 0) { + // 메모리 한도 체크 + if (totalMemory + data.getMemorySizeBytes() > maxMemoryBytes) { + log.warn("Cache memory limit reached: {}GB / {}GB. Stopping at D-{}", + totalMemory / (1024 * 1024 * 1024), cacheProperties.getMaxMemoryGb(), daysBack); + break; + } + + cache.put(targetDate, data); + totalMemory += data.getMemorySizeBytes(); + loadedCount++; + + long elapsed = System.currentTimeMillis() - dateStart; + log.info("Cached D-{} ({}): {} vessels, {} MB, {}ms", + daysBack, targetDate, data.getVesselCount(), + data.getMemorySizeBytes() / (1024 * 1024), elapsed); + + // 부분 로드 시점에 PARTIAL 상태로 전환 + if (status.get() == CacheStatus.LOADING) { + status.set(CacheStatus.PARTIAL); + } + } else { + log.info("No daily data for D-{} ({})", daysBack, targetDate); + } + } catch (Exception e) { + log.error("Failed to load cache for D-{} ({}): {}", daysBack, targetDate, e.getMessage()); + } + } + + status.set(CacheStatus.READY); + long totalElapsed = System.currentTimeMillis() - totalStart; + log.info("Daily track cache warmup completed: {} days loaded, total {} MB, {}ms", + loadedCount, totalMemory / (1024 * 1024), totalElapsed); + } + + /** + * 특정 날짜의 daily 테이블 전체 로드 → CompactVesselTrack 변환 + */ + public DailyTrackData loadDay(LocalDate date) { + LocalDateTime dayStart = date.atStartOfDay(); + LocalDateTime dayEnd = date.plusDays(1).atStartOfDay(); + + String sql = "SELECT sig_src_cd, target_id, time_bucket, " + + "public.ST_AsText(track_geom) as track_geom, " + + "distance_nm, avg_speed, max_speed, point_count, " + + "start_position->>'time' as start_time, " + + "end_position->>'time' as end_time " + + "FROM signal.t_vessel_tracks_daily " + + "WHERE time_bucket >= ? AND time_bucket < ? " + + "ORDER BY sig_src_cd, target_id"; + + Map vesselMap = new HashMap<>(50000); + long estimatedMemory = 0; + WKTReader wktReader = wktReaderLocal.get(); + + try (Connection conn = queryDataSource.getConnection(); + PreparedStatement ps = conn.prepareStatement(sql)) { + + ps.setTimestamp(1, Timestamp.valueOf(dayStart)); + ps.setTimestamp(2, Timestamp.valueOf(dayEnd)); + ps.setFetchSize(10000); + + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + String sigSrcCd = rs.getString("sig_src_cd"); + String targetId = rs.getString("target_id"); + String vesselId = sigSrcCd + "_" + targetId; + + VesselAccumulator acc = vesselMap.computeIfAbsent(vesselId, k -> { + VesselAccumulator a = new VesselAccumulator(); + a.sigSrcCd = sigSrcCd; + a.targetId = targetId; + return a; + }); + + String trackGeomWkt = rs.getString("track_geom"); + Timestamp timeBucket = rs.getTimestamp("time_bucket"); + String startTimeStr = null; + String endTimeStr = null; + try { startTimeStr = rs.getString("start_time"); } catch (SQLException ignored) {} + try { endTimeStr = rs.getString("end_time"); } catch (SQLException ignored) {} + + double distanceNm = rs.getDouble("distance_nm"); + double maxSpeed = rs.getDouble("max_speed"); + acc.totalDistance += distanceNm; + acc.maxSpeed = Math.max(acc.maxSpeed, maxSpeed); + + if (trackGeomWkt != null && !trackGeomWkt.isEmpty() && !"LINESTRING EMPTY".equals(trackGeomWkt)) { + try { + LineString lineString = (LineString) wktReader.read(trackGeomWkt); + if (lineString.getNumPoints() == 0) continue; + + LocalDateTime baseTime = timeBucket.toLocalDateTime(); + if (startTimeStr != null && !startTimeStr.isEmpty()) { + try { + baseTime = LocalDateTime.parse(startTimeStr, TIMESTAMP_FORMATTER); + } catch (Exception ignored) {} + } + + Coordinate[] coords = lineString.getCoordinates(); + for (Coordinate coord : coords) { + acc.geometry.add(new double[]{coord.x, coord.y}); + // M 값이 있으면 타임스탬프로 사용 + if (!Double.isNaN(coord.getM())) { + acc.timestamps.add(String.valueOf((long) coord.getM())); + } else { + acc.timestamps.add(String.valueOf(baseTime.toEpochSecond(java.time.ZoneOffset.of("+09:00")))); + } + // 속도 추산 (인접 좌표 간 거리/시간) + acc.speeds.add(0.0); + } + acc.pointCount += coords.length; + estimatedMemory += coords.length * 40L; // 좌표당 약 40바이트 추정 + } catch (ParseException e) { + log.debug("Failed to parse track_geom for vessel {}: {}", vesselId, e.getMessage()); + } + } + } + } + } catch (Exception e) { + log.error("Failed to load daily data for {}: {}", date, e.getMessage()); + return null; + } + + if (vesselMap.isEmpty()) { + return null; + } + + // VesselAccumulator → CompactVesselTrack 변환 + Map tracks = new HashMap<>(vesselMap.size()); + for (Map.Entry entry : vesselMap.entrySet()) { + VesselAccumulator acc = entry.getValue(); + if (acc.geometry.isEmpty()) continue; + + double avgSpeed = acc.pointCount > 0 ? acc.totalDistance / Math.max(1, acc.pointCount) * 60 : 0; + + // shipKindCode 계산 + String shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern( + acc.sigSrcCd, null, null, acc.targetId); + + // 통합선박 ID 조회 + String integrationTargetId = null; + try { + IntegrationVessel iv = integrationVesselService.findByVessel(acc.sigSrcCd, acc.targetId); + if (iv != null) { + integrationTargetId = iv.generateIntegrationId(); + } + } catch (Exception ignored) {} + + CompactVesselTrack track = CompactVesselTrack.builder() + .vesselId(entry.getKey()) + .sigSrcCd(acc.sigSrcCd) + .targetId(acc.targetId) + .shipKindCode(shipKindCode) + .integrationTargetId(integrationTargetId) + .geometry(acc.geometry) + .timestamps(acc.timestamps) + .speeds(acc.speeds) + .totalDistance(acc.totalDistance) + .avgSpeed(avgSpeed) + .maxSpeed(acc.maxSpeed) + .pointCount(acc.pointCount) + .build(); + + tracks.put(entry.getKey(), track); + } + + estimatedMemory += tracks.size() * 200L; // 객체 오버헤드 + return new DailyTrackData(date, tracks, estimatedMemory); + } + + // ── 캐시 조회 API ── + + /** + * 특정 날짜가 캐시되어 있는지 확인 + */ + public boolean isCached(LocalDate date) { + return cache.containsKey(date); + } + + /** + * 캐시에서 특정 날짜의 전체 항적 조회 + */ + public List getCachedTracks(LocalDate date) { + DailyTrackData data = cache.get(date); + if (data == null) return Collections.emptyList(); + return new ArrayList<>(data.getTracks().values()); + } + + /** + * 캐시에서 특정 날짜의 항적을 뷰포트로 필터링하여 조회 + */ + public List getCachedTracks(LocalDate date, double minLon, double minLat, double maxLon, double maxLat) { + DailyTrackData data = cache.get(date); + if (data == null) return Collections.emptyList(); + + return data.getTracks().values().stream() + .filter(track -> isInViewport(track, minLon, minLat, maxLon, maxLat)) + .collect(Collectors.toList()); + } + + /** + * 여러 날짜의 캐시 데이터를 vessel 기준으로 병합 + */ + public List getCachedTracksMultipleDays(List dates) { + Map merged = new HashMap<>(); + + for (LocalDate date : dates) { + DailyTrackData data = cache.get(date); + if (data == null) continue; + + for (Map.Entry entry : data.getTracks().entrySet()) { + String vesselId = entry.getKey(); + CompactVesselTrack track = entry.getValue(); + + CompactVesselTrack.CompactVesselTrackBuilder builder = merged.get(vesselId); + if (builder == null) { + // 첫 번째 날짜: 빌더 생성 + builder = CompactVesselTrack.builder() + .vesselId(vesselId) + .sigSrcCd(track.getSigSrcCd()) + .targetId(track.getTargetId()) + .nationalCode(track.getNationalCode()) + .shipName(track.getShipName()) + .shipType(track.getShipType()) + .shipKindCode(track.getShipKindCode()) + .integrationTargetId(track.getIntegrationTargetId()) + .geometry(new ArrayList<>(track.getGeometry())) + .timestamps(new ArrayList<>(track.getTimestamps())) + .speeds(new ArrayList<>(track.getSpeeds())) + .totalDistance(track.getTotalDistance()) + .avgSpeed(track.getAvgSpeed()) + .maxSpeed(track.getMaxSpeed()) + .pointCount(track.getPointCount()); + merged.put(vesselId, builder); + } else { + // 후속 날짜: 기존 빌더의 데이터가 이미 build 전이므로 + // 별도 AccumulatorTrack으로 처리 + CompactVesselTrack existing = builder.build(); + List geo = new ArrayList<>(existing.getGeometry()); + geo.addAll(track.getGeometry()); + List ts = new ArrayList<>(existing.getTimestamps()); + ts.addAll(track.getTimestamps()); + List sp = new ArrayList<>(existing.getSpeeds()); + sp.addAll(track.getSpeeds()); + + builder.geometry(geo) + .timestamps(ts) + .speeds(sp) + .totalDistance(existing.getTotalDistance() + track.getTotalDistance()) + .maxSpeed(Math.max(existing.getMaxSpeed(), track.getMaxSpeed())) + .pointCount(existing.getPointCount() + track.getPointCount()); + } + } + } + + return merged.values().stream() + .map(CompactVesselTrack.CompactVesselTrackBuilder::build) + .collect(Collectors.toList()); + } + + /** + * 요청 범위를 캐시 구간 / DB 구간으로 분리 + */ + public SplitQueryResult splitQueryRange(LocalDateTime startTime, LocalDateTime endTime) { + LocalDate today = LocalDate.now(); + List cachedDates = new ArrayList<>(); + List dbDates = new ArrayList<>(); + DateRange todayRange = null; + + // 요청 범위의 날짜별 분류 + LocalDate startDate = startTime.toLocalDate(); + LocalDate endDate = endTime.toLocalDate(); + + for (LocalDate d = startDate; !d.isAfter(endDate); d = d.plusDays(1)) { + if (d.equals(today)) { + // 오늘 → hourly/5min 테이블 조회 + LocalDateTime todayStart = today.atStartOfDay(); + LocalDateTime todayEnd = endTime.isAfter(LocalDateTime.now()) ? LocalDateTime.now() : endTime; + if (todayStart.isBefore(startTime)) todayStart = startTime; + if (todayEnd.isAfter(todayStart)) { + todayRange = new DateRange(todayStart, todayEnd); + } + } else if (d.isAfter(today)) { + // 미래 날짜 → 무시 + continue; + } else if (isCached(d)) { + cachedDates.add(d); + } else { + dbDates.add(d); + } + } + + // DB 조회 필요 날짜를 연속 범위로 묶기 + List dbRanges = mergeConsecutiveDates(dbDates, startTime, endTime); + + return new SplitQueryResult(cachedDates, dbRanges, todayRange); + } + + /** + * 연속된 날짜들을 DateRange로 묶기 + */ + private List mergeConsecutiveDates(List dates, LocalDateTime reqStart, LocalDateTime reqEnd) { + if (dates.isEmpty()) return Collections.emptyList(); + + Collections.sort(dates); + List ranges = new ArrayList<>(); + LocalDate rangeStart = dates.get(0); + LocalDate rangeLast = dates.get(0); + + for (int i = 1; i < dates.size(); i++) { + LocalDate d = dates.get(i); + if (d.equals(rangeLast.plusDays(1))) { + rangeLast = d; + } else { + ranges.add(toDateRange(rangeStart, rangeLast, reqStart, reqEnd)); + rangeStart = d; + rangeLast = d; + } + } + ranges.add(toDateRange(rangeStart, rangeLast, reqStart, reqEnd)); + return ranges; + } + + private DateRange toDateRange(LocalDate start, LocalDate end, LocalDateTime reqStart, LocalDateTime reqEnd) { + LocalDateTime s = start.atStartOfDay(); + LocalDateTime e = end.plusDays(1).atStartOfDay(); + // 요청 범위로 클리핑 + if (s.isBefore(reqStart)) s = reqStart; + if (e.isAfter(reqEnd)) e = reqEnd; + return new DateRange(s, e); + } + + // ── 캐시 갱신 ── + + /** + * 일일 배치 완료 후 캐시 갱신: 전날 로드 + 만료 데이터 제거 + */ + public void refreshAfterDailyJob() { + if (!cacheProperties.isEnabled()) return; + + LocalDate today = LocalDate.now(); + LocalDate yesterday = today.minusDays(1); + + log.info("Refreshing daily track cache after daily job: loading {}", yesterday); + + // 전날 데이터 (재)로드 + try { + DailyTrackData data = loadDay(yesterday); + if (data != null && data.getVesselCount() > 0) { + cache.put(yesterday, data); + log.info("Cache refreshed for {}: {} vessels, {} MB", + yesterday, data.getVesselCount(), data.getMemorySizeBytes() / (1024 * 1024)); + } + } catch (Exception e) { + log.error("Failed to refresh cache for {}: {}", yesterday, e.getMessage()); + } + + // 보관 기간 초과 데이터 제거 + LocalDate oldestAllowed = today.minusDays(cacheProperties.getRetentionDays()); + List toRemove = cache.keySet().stream() + .filter(d -> d.isBefore(oldestAllowed)) + .collect(Collectors.toList()); + + for (LocalDate d : toRemove) { + DailyTrackData removed = cache.remove(d); + if (removed != null) { + log.info("Evicted cache for {}: {} vessels, {} MB", + d, removed.getVesselCount(), removed.getMemorySizeBytes() / (1024 * 1024)); + } + } + } + + // ── 모니터링 ── + + /** + * 캐시 상태 정보 (모니터링용) + */ + public Map getCacheStatus() { + Map info = new LinkedHashMap<>(); + info.put("status", status.get().name()); + info.put("enabled", cacheProperties.isEnabled()); + info.put("retentionDays", cacheProperties.getRetentionDays()); + info.put("maxMemoryGb", cacheProperties.getMaxMemoryGb()); + info.put("cachedDays", cache.size()); + + long totalMemory = 0; + int totalVessels = 0; + List> dayDetails = new ArrayList<>(); + + for (Map.Entry entry : cache.entrySet()) { + DailyTrackData data = entry.getValue(); + totalMemory += data.getMemorySizeBytes(); + totalVessels += data.getVesselCount(); + + Map dayInfo = new LinkedHashMap<>(); + dayInfo.put("date", entry.getKey().toString()); + dayInfo.put("vesselCount", data.getVesselCount()); + dayInfo.put("memorySizeMb", data.getMemorySizeBytes() / (1024 * 1024)); + dayInfo.put("loadedAt", new java.util.Date(data.getLoadedAtMillis()).toString()); + dayDetails.add(dayInfo); + } + + info.put("totalVessels", totalVessels); + info.put("totalMemoryMb", totalMemory / (1024 * 1024)); + info.put("days", dayDetails); + return info; + } + + public CacheStatus getStatus() { + return status.get(); + } + + public boolean isEnabled() { + return cacheProperties.isEnabled(); + } + + // ── 내부 유틸 ── + + private boolean isInViewport(CompactVesselTrack track, double minLon, double minLat, double maxLon, double maxLat) { + if (track.getGeometry() == null || track.getGeometry().isEmpty()) return false; + for (double[] coord : track.getGeometry()) { + if (coord[0] >= minLon && coord[0] <= maxLon && coord[1] >= minLat && coord[1] <= maxLat) { + return true; + } + } + return false; + } + + /** + * 선박 데이터 누적용 내부 클래스 + */ + private static class VesselAccumulator { + String sigSrcCd; + String targetId; + List geometry = new ArrayList<>(500); + List timestamps = new ArrayList<>(500); + List speeds = new ArrayList<>(500); + double totalDistance = 0; + double maxSpeed = 0; + int pointCount = 0; + } +} diff --git a/src/main/java/gc/mda/signal_batch/monitoring/controller/WebSocketMonitoringController.java b/src/main/java/gc/mda/signal_batch/monitoring/controller/WebSocketMonitoringController.java index 5ab5502..94ff261 100644 --- a/src/main/java/gc/mda/signal_batch/monitoring/controller/WebSocketMonitoringController.java +++ b/src/main/java/gc/mda/signal_batch/monitoring/controller/WebSocketMonitoringController.java @@ -2,6 +2,7 @@ package gc.mda.signal_batch.monitoring.controller; import gc.mda.signal_batch.monitoring.service.TrackStreamingMetrics; import gc.mda.signal_batch.global.websocket.service.ActiveQueryManager; +import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager; import gc.mda.signal_batch.global.websocket.service.StompTrackStreamingService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -27,6 +28,7 @@ public class WebSocketMonitoringController { private final TrackStreamingMetrics trackStreamingMetrics; private final StompTrackStreamingService trackStreamingService; private final ActiveQueryManager activeQueryManager; + private final DailyTrackCacheManager dailyTrackCacheManager; /** * WebSocket 스트리밍 현황 조회 @@ -115,6 +117,15 @@ public class WebSocketMonitoringController { return ResponseEntity.ok(status); } + /** + * 일일 항적 캐시 상태 조회 + */ + @GetMapping("/daily-cache") + @Operation(summary = "일일 항적 캐시 현황", description = "일일 데이터 인메모리 캐시의 상태, 날짜별 선박 수, 메모리 사용량을 조회합니다") + public ResponseEntity> getDailyCacheStatus() { + return ResponseEntity.ok(dailyTrackCacheManager.getCacheStatus()); + } + /** * WebSocket 테스트 페이지로 리다이렉트 */