diff --git a/src/main/java/gc/mda/signal_batch/batch/job/DailyAggregationJobConfig.java b/src/main/java/gc/mda/signal_batch/batch/job/DailyAggregationJobConfig.java index 3f29436..5b800b5 100644 --- a/src/main/java/gc/mda/signal_batch/batch/job/DailyAggregationJobConfig.java +++ b/src/main/java/gc/mda/signal_batch/batch/job/DailyAggregationJobConfig.java @@ -1,6 +1,7 @@ package gc.mda.signal_batch.batch.job; import gc.mda.signal_batch.batch.listener.JobCompletionListener; +import gc.mda.signal_batch.batch.reader.HourlyTrackCache; import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -17,9 +18,11 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; +import java.time.LocalDateTime; + @Slf4j @Configuration -@Profile("!query") // query 프로파일에서는 배치 작업 비활성화 +@Profile("!query") @RequiredArgsConstructor @ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true) public class DailyAggregationJobConfig { @@ -28,6 +31,7 @@ public class DailyAggregationJobConfig { private final DailyAggregationStepConfig dailyAggregationStepConfig; private final JobCompletionListener jobCompletionListener; private final DailyTrackCacheManager dailyTrackCacheManager; + private final HourlyTrackCache hourlyTrackCache; @Bean public Job dailyAggregationJob() { @@ -53,14 +57,26 @@ public class DailyAggregationJobConfig { @Override public void afterJob(JobExecution jobExecution) { if (jobExecution.getStatus().isUnsuccessful()) { - log.warn("Daily aggregation job failed, skipping cache refresh"); + log.warn("[CACHE-MONITOR] DailyJob FAILED — L2/L3 cleanup 건너뜀, status={}", + jobExecution.getStatus()); return; } try { - log.info("Daily aggregation job completed, refreshing daily track cache"); + log.info("[CACHE-MONITOR] DailyJob 완료 → L3 refresh 시작, L2 size={}", + hourlyTrackCache.size()); dailyTrackCacheManager.refreshAfterDailyJob(); + + // hourly 캐시에서 어제 범위 제거 + String startTime = jobExecution.getJobParameters().getString("startTime"); + String endTime = jobExecution.getJobParameters().getString("endTime"); + LocalDateTime start = LocalDateTime.parse(startTime); + LocalDateTime end = LocalDateTime.parse(endTime); + long l2Before = hourlyTrackCache.size(); + hourlyTrackCache.removeRange(start, end); + log.info("[CACHE-MONITOR] DailyJob → L2 cleanup [{}, {}): L2 before={}, after={}, L2 stats=[{}]", + start, end, l2Before, hourlyTrackCache.size(), hourlyTrackCache.getStats()); } catch (Exception e) { - log.error("Failed to refresh daily track cache after job: {}", e.getMessage()); + log.error("[CACHE-MONITOR] DailyJob 캐시 갱신/정리 실패: {}", e.getMessage()); } } }; diff --git a/src/main/java/gc/mda/signal_batch/batch/job/HourlyAggregationJobConfig.java b/src/main/java/gc/mda/signal_batch/batch/job/HourlyAggregationJobConfig.java index fdece49..f3bfe87 100644 --- a/src/main/java/gc/mda/signal_batch/batch/job/HourlyAggregationJobConfig.java +++ b/src/main/java/gc/mda/signal_batch/batch/job/HourlyAggregationJobConfig.java @@ -1,9 +1,12 @@ package gc.mda.signal_batch.batch.job; import gc.mda.signal_batch.batch.listener.JobCompletionListener; +import gc.mda.signal_batch.batch.reader.FiveMinTrackCache; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionListener; import org.springframework.batch.core.JobParametersValidator; import org.springframework.batch.core.job.DefaultJobParametersValidator; import org.springframework.batch.core.job.builder.JobBuilder; @@ -14,31 +17,61 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; +import java.time.LocalDateTime; + @Slf4j @Configuration -@Profile("!query") // query 프로파일에서는 배치 작업 비활성화 +@Profile("!query") @RequiredArgsConstructor @ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true) public class HourlyAggregationJobConfig { - + private final JobRepository jobRepository; private final HourlyAggregationStepConfig hourlyAggregationStepConfig; private final VesselStaticStepConfig vesselStaticStepConfig; private final JobCompletionListener jobCompletionListener; - + private final FiveMinTrackCache fiveMinTrackCache; + @Bean public Job hourlyAggregationJob() { return new JobBuilder("hourlyAggregationJob", jobRepository) .incrementer(new RunIdIncrementer()) .validator(hourlyJobParametersValidator()) .listener(jobCompletionListener) + .listener(hourlyFiveMinCleanupListener()) .start(hourlyAggregationStepConfig.mergeHourlyTracksStep()) .next(hourlyAggregationStepConfig.gridHourlySummaryStep()) .next(hourlyAggregationStepConfig.areaHourlySummaryStep()) .next(vesselStaticStepConfig.vesselStaticSyncStep()) .build(); } - + + @Bean + public JobExecutionListener hourlyFiveMinCleanupListener() { + return new JobExecutionListener() { + @Override + public void afterJob(JobExecution jobExecution) { + if (jobExecution.getStatus().isUnsuccessful()) { + log.info("[CACHE-MONITOR] HourlyJob FAILED — L1 cleanup 건너뜀, status={}", + jobExecution.getStatus()); + return; + } + try { + String startTime = jobExecution.getJobParameters().getString("startTime"); + String endTime = jobExecution.getJobParameters().getString("endTime"); + LocalDateTime start = LocalDateTime.parse(startTime); + LocalDateTime end = LocalDateTime.parse(endTime); + long l1Before = fiveMinTrackCache.size(); + fiveMinTrackCache.removeRange(start, end); + log.info("[CACHE-MONITOR] HourlyJob 완료 → L1 cleanup [{}, {}): L1 before={}, after={}, L1 stats=[{}]", + start, end, l1Before, fiveMinTrackCache.size(), fiveMinTrackCache.getStats()); + } catch (Exception e) { + log.error("[CACHE-MONITOR] L1 cleanup 실패: {}", e.getMessage()); + } + } + }; + } + @Bean public JobParametersValidator hourlyJobParametersValidator() { DefaultJobParametersValidator validator = new DefaultJobParametersValidator(); diff --git a/src/main/java/gc/mda/signal_batch/batch/job/HourlyAggregationStepConfig.java b/src/main/java/gc/mda/signal_batch/batch/job/HourlyAggregationStepConfig.java index 840ae7d..7718d1a 100644 --- a/src/main/java/gc/mda/signal_batch/batch/job/HourlyAggregationStepConfig.java +++ b/src/main/java/gc/mda/signal_batch/batch/job/HourlyAggregationStepConfig.java @@ -6,6 +6,7 @@ import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetecti import gc.mda.signal_batch.batch.processor.HourlyTrackMergeProcessor; import gc.mda.signal_batch.batch.reader.CacheBasedHourlyTrackReader; import gc.mda.signal_batch.batch.reader.FiveMinTrackCache; +import gc.mda.signal_batch.batch.reader.HourlyTrackCache; import gc.mda.signal_batch.batch.writer.VesselTrackBulkWriter; import gc.mda.signal_batch.batch.writer.AbnormalTrackWriter; import gc.mda.signal_batch.batch.writer.CompositeTrackWriter; @@ -44,6 +45,7 @@ public class HourlyAggregationStepConfig { private final AbnormalTrackWriter abnormalTrackWriter; private final AbnormalTrackDetector abnormalTrackDetector; private final FiveMinTrackCache fiveMinTrackCache; + private final HourlyTrackCache hourlyTrackCache; public HourlyAggregationStepConfig( JobRepository jobRepository, @@ -52,7 +54,8 @@ public class HourlyAggregationStepConfig { VesselTrackBulkWriter vesselTrackBulkWriter, AbnormalTrackWriter abnormalTrackWriter, AbnormalTrackDetector abnormalTrackDetector, - FiveMinTrackCache fiveMinTrackCache) { + FiveMinTrackCache fiveMinTrackCache, + HourlyTrackCache hourlyTrackCache) { this.jobRepository = jobRepository; this.queryDataSource = queryDataSource; this.transactionManager = transactionManager; @@ -60,6 +63,7 @@ public class HourlyAggregationStepConfig { this.abnormalTrackWriter = abnormalTrackWriter; this.abnormalTrackDetector = abnormalTrackDetector; this.fiveMinTrackCache = fiveMinTrackCache; + this.hourlyTrackCache = hourlyTrackCache; } @Value("${vessel.batch.chunk-size:5000}") @@ -72,11 +76,13 @@ public class HourlyAggregationStepConfig { @Bean public Step mergeHourlyTracksStep() { log.info("Building mergeHourlyTracksStep with cache-based in-memory merge"); + HourlyTrackMergeProcessor processor = hourlyTrackMergeProcessor(null); return new StepBuilder("mergeHourlyTracksStep", jobRepository) ., AbnormalDetectionResult>chunk(chunkSize, transactionManager) .reader(cacheBasedHourlyTrackReader(null, null)) - .processor(hourlyTrackMergeProcessor(null)) + .processor(processor) .writer(hourlyCompositeTrackWriter()) + .listener(processor) .build(); } @@ -115,7 +121,8 @@ public class HourlyAggregationStepConfig { return new CompositeTrackWriter( vesselTrackBulkWriter, abnormalTrackWriter, - "hourly" + "hourly", + hourlyTrackCache ); } diff --git a/src/main/java/gc/mda/signal_batch/batch/processor/HourlyTrackMergeProcessor.java b/src/main/java/gc/mda/signal_batch/batch/processor/HourlyTrackMergeProcessor.java index b0561bd..12d397a 100644 --- a/src/main/java/gc/mda/signal_batch/batch/processor/HourlyTrackMergeProcessor.java +++ b/src/main/java/gc/mda/signal_batch/batch/processor/HourlyTrackMergeProcessor.java @@ -5,6 +5,9 @@ import gc.mda.signal_batch.domain.vessel.model.VesselTrack; import gc.mda.signal_batch.global.util.LineStringMUtils; import gc.mda.signal_batch.global.util.TrackSimplificationUtils; import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.StepExecutionListener; import org.springframework.batch.item.ItemProcessor; import org.springframework.jdbc.core.JdbcTemplate; @@ -29,7 +32,8 @@ import java.util.regex.Pattern; * N+1 SQL 제거 → DB 쿼리 최대 1회 (비정상 검출용 이전 버킷 prefetch) */ @Slf4j -public class HourlyTrackMergeProcessor implements ItemProcessor, AbnormalDetectionResult> { +public class HourlyTrackMergeProcessor + implements ItemProcessor, AbnormalDetectionResult>, StepExecutionListener { private static final Pattern WKT_COORDS_PATTERN = Pattern.compile("LINESTRING M\\((.+)\\)"); private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @@ -42,6 +46,13 @@ public class HourlyTrackMergeProcessor implements ItemProcessor previousBucketCache; private boolean previousBucketLoaded = false; + // Step 레벨 집계 카운터 + private int totalProcessed = 0; + private int mergeFailCount = 0; + private int simplifiedCount = 0; + private int abnormalCount = 0; + private int avgSpeedFailCount = 0; + public HourlyTrackMergeProcessor( AbnormalTrackDetector abnormalTrackDetector, JdbcTemplate queryJdbcTemplate, @@ -58,11 +69,12 @@ public class HourlyTrackMergeProcessor implements ItemProcessor result = new HashMap<>(); + int[] parseFailCount = {0}; try { queryJdbcTemplate.query(sql, @@ -231,17 +247,24 @@ public class HourlyTrackMergeProcessor implements ItemProcessor { String mmsi = rs.getString("mmsi"); + VesselTrack.TrackPosition endPos = parseEndPosition(rs.getString("end_position")); + if (endPos == null && rs.getString("end_position") != null) { + parseFailCount[0]++; + } VesselTrack track = VesselTrack.builder() .mmsi(mmsi) .timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime()) .trackGeom(rs.getString("last_segment")) - .endPosition(parseEndPosition(rs.getString("end_position"))) + .endPosition(endPos) .build(); result.put(mmsi, track); }); log.info("이전 버킷 트랙 prefetch 완료: {} 선박 (기간: {} ~ {})", result.size(), prevStart, prevEnd); + if (parseFailCount[0] > 0) { + log.debug("end_position 파싱 실패: {} 건", parseFailCount[0]); + } } catch (Exception e) { log.warn("이전 버킷 트랙 prefetch 실패 (첫 실행일 수 있음): {}", e.getMessage()); } @@ -264,7 +287,6 @@ public class HourlyTrackMergeProcessor implements ItemProcessor """; Map> result = new LinkedHashMap<>(); + int[] parseFailCount = {0}; queryJdbcTemplate.query(sql, ps -> { @@ -139,6 +140,12 @@ public class CacheBasedHourlyTrackReader implements ItemReader }, rs -> { String mmsi = rs.getString("mmsi"); + VesselTrack.TrackPosition startPos = parseTrackPosition(rs.getString("start_position")); + VesselTrack.TrackPosition endPos = parseTrackPosition(rs.getString("end_position")); + if ((startPos == null && rs.getString("start_position") != null) + || (endPos == null && rs.getString("end_position") != null)) { + parseFailCount[0]++; + } VesselTrack track = VesselTrack.builder() .mmsi(mmsi) .timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime()) @@ -147,8 +154,8 @@ public class CacheBasedHourlyTrackReader implements ItemReader .avgSpeed(rs.getBigDecimal("avg_speed")) .maxSpeed(rs.getBigDecimal("max_speed")) .pointCount(rs.getInt("point_count")) - .startPosition(parseTrackPosition(rs.getString("start_position"))) - .endPosition(parseTrackPosition(rs.getString("end_position"))) + .startPosition(startPos) + .endPosition(endPos) .build(); result.computeIfAbsent(mmsi, k -> new ArrayList<>()).add(track); @@ -156,6 +163,9 @@ public class CacheBasedHourlyTrackReader implements ItemReader log.info("DB fallback 완료: {} 선박, {} 트랙", result.size(), result.values().stream().mapToInt(List::size).sum()); + if (parseFailCount[0] > 0) { + log.debug("TrackPosition 파싱 실패: {} 건", parseFailCount[0]); + } return result; } @@ -174,7 +184,6 @@ public class CacheBasedHourlyTrackReader implements ItemReader .sog(sogStr != null ? new BigDecimal(sogStr) : null) .build(); } catch (Exception e) { - log.debug("TrackPosition 파싱 실패: {}", json); return null; } } diff --git a/src/main/java/gc/mda/signal_batch/batch/reader/FiveMinTrackCache.java b/src/main/java/gc/mda/signal_batch/batch/reader/FiveMinTrackCache.java index 6790faf..769af5f 100644 --- a/src/main/java/gc/mda/signal_batch/batch/reader/FiveMinTrackCache.java +++ b/src/main/java/gc/mda/signal_batch/batch/reader/FiveMinTrackCache.java @@ -56,9 +56,12 @@ public class FiveMinTrackCache { public void putAll(List tracks) { if (tracks == null) return; + long beforeSize = cache.estimatedSize(); for (VesselTrack track : tracks) { put(track); } + log.info("[CACHE-MONITOR] L1.putAll: input={}, cacheBefore={}, cacheAfter={}, stats=[{}]", + tracks.size(), beforeSize, cache.estimatedSize(), getStats()); } /** @@ -85,9 +88,28 @@ public class FiveMinTrackCache { tracks.sort(Comparator.comparing(VesselTrack::getTimeBucket)); } + int totalTracks = result.values().stream().mapToInt(List::size).sum(); + log.info("[CACHE-MONITOR] L1.getTracksInRange [{}, {}): mmsi={}, tracks={}, cacheTotal={}", + start, end, result.size(), totalTracks, cache.estimatedSize()); return result; } + /** + * 지정 시간 범위의 캐시 항목 제거 (hourly merge 완료 후 호출) + */ + public void removeRange(LocalDateTime start, LocalDateTime end) { + long before = cache.estimatedSize(); + cache.asMap().entrySet().removeIf(entry -> { + VesselTrack track = entry.getValue(); + return track.getTimeBucket() != null + && !track.getTimeBucket().isBefore(start) + && track.getTimeBucket().isBefore(end); + }); + long after = cache.estimatedSize(); + log.info("[CACHE-MONITOR] L1.removeRange [{}, {}): removed={}, before={}, after={}, stats=[{}]", + start, end, before - after, before, after, getStats()); + } + public long size() { return cache.estimatedSize(); } diff --git a/src/main/java/gc/mda/signal_batch/batch/reader/HourlyTrackCache.java b/src/main/java/gc/mda/signal_batch/batch/reader/HourlyTrackCache.java new file mode 100644 index 0000000..a41aae8 --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/batch/reader/HourlyTrackCache.java @@ -0,0 +1,128 @@ +package gc.mda.signal_batch.batch.reader; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import gc.mda.signal_batch.domain.vessel.model.VesselTrack; +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * Hourly VesselTrack 인메모리 캐시 (L2) + * + * hourly 집계 후 DB 저장과 동시에 캐시에 보관. + * 조회 서비스(GisServiceV2)에서 오늘 정각 이전 구간을 DB 대신 캐시에서 즉시 응답. + * + * key: "mmsi::timeBucket" (예: "440123456::2026-02-19T09:00") + * value: VesselTrack + * TTL: 26시간 (24시간 + 2시간 여유, daily merge 후 removeRange로 제거) + */ +@Slf4j +@Component +public class HourlyTrackCache { + + private static final DateTimeFormatter KEY_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm"); + + private Cache cache; + + @Value("${app.cache.hourly-track.ttl-hours:26}") + private long ttlHours; + + @Value("${app.cache.hourly-track.max-size:780000}") + private int maxSize; + + @PostConstruct + public void init() { + this.cache = Caffeine.newBuilder() + .maximumSize(maxSize) + .expireAfterWrite(ttlHours, TimeUnit.HOURS) + .recordStats() + .build(); + log.info("HourlyTrackCache 초기화 — TTL: {}시간, maxSize: {}", ttlHours, maxSize); + } + + public void put(VesselTrack track) { + if (track == null || track.getMmsi() == null || track.getTimeBucket() == null) { + return; + } + cache.put(buildKey(track.getMmsi(), track.getTimeBucket()), track); + } + + public void putAll(List tracks) { + if (tracks == null) return; + long beforeSize = cache.estimatedSize(); + for (VesselTrack track : tracks) { + put(track); + } + log.info("[CACHE-MONITOR] L2.putAll: input={}, cacheBefore={}, cacheAfter={}, stats=[{}]", + tracks.size(), beforeSize, cache.estimatedSize(), getStats()); + } + + /** + * 지정 시간 범위의 트랙을 MMSI별로 그루핑하여 반환 + * + * @param start 시작 시각 (inclusive) + * @param end 종료 시각 (exclusive) + * @return Map> (시간순 정렬) + */ + public Map> getTracksInRange(LocalDateTime start, LocalDateTime end) { + Map> result = new LinkedHashMap<>(); + + for (Map.Entry entry : cache.asMap().entrySet()) { + VesselTrack track = entry.getValue(); + if (track.getTimeBucket() != null + && !track.getTimeBucket().isBefore(start) + && track.getTimeBucket().isBefore(end)) { + result.computeIfAbsent(track.getMmsi(), k -> new ArrayList<>()).add(track); + } + } + + for (List tracks : result.values()) { + tracks.sort(Comparator.comparing(VesselTrack::getTimeBucket)); + } + + int totalTracks = result.values().stream().mapToInt(List::size).sum(); + log.info("[CACHE-MONITOR] L2.getTracksInRange [{}, {}): mmsi={}, tracks={}, cacheTotal={}", + start, end, result.size(), totalTracks, cache.estimatedSize()); + return result; + } + + /** + * 지정 시간 범위의 캐시 항목 제거 (daily merge 완료 후 호출) + */ + public void removeRange(LocalDateTime start, LocalDateTime end) { + long before = cache.estimatedSize(); + cache.asMap().entrySet().removeIf(entry -> { + VesselTrack track = entry.getValue(); + return track.getTimeBucket() != null + && !track.getTimeBucket().isBefore(start) + && track.getTimeBucket().isBefore(end); + }); + long after = cache.estimatedSize(); + log.info("[CACHE-MONITOR] L2.removeRange [{}, {}): removed={}, before={}, after={}, stats=[{}]", + start, end, before - after, before, after, getStats()); + } + + public long size() { + return cache.estimatedSize(); + } + + public String getStats() { + var stats = cache.stats(); + return String.format("size=%d, hitRate=%.1f%%, hits=%d, misses=%d", + cache.estimatedSize(), + stats.hitRate() * 100, + stats.hitCount(), + stats.missCount()); + } + + private String buildKey(String mmsi, LocalDateTime timeBucket) { + return mmsi + "::" + timeBucket.format(KEY_FORMATTER); + } +} diff --git a/src/main/java/gc/mda/signal_batch/batch/writer/CompositeTrackWriter.java b/src/main/java/gc/mda/signal_batch/batch/writer/CompositeTrackWriter.java index 906f797..c8bb679 100644 --- a/src/main/java/gc/mda/signal_batch/batch/writer/CompositeTrackWriter.java +++ b/src/main/java/gc/mda/signal_batch/batch/writer/CompositeTrackWriter.java @@ -1,8 +1,8 @@ package gc.mda.signal_batch.batch.writer; +import gc.mda.signal_batch.batch.reader.HourlyTrackCache; import gc.mda.signal_batch.domain.vessel.model.VesselTrack; import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.annotation.BeforeStep; @@ -19,12 +19,28 @@ import java.util.List; */ @Slf4j @ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true) -@RequiredArgsConstructor public class CompositeTrackWriter implements ItemWriter { - + private final VesselTrackBulkWriter vesselTrackBulkWriter; private final AbnormalTrackWriter abnormalTrackWriter; private final String targetTable; + private final HourlyTrackCache hourlyTrackCache; // nullable (daily writer는 미사용) + + public CompositeTrackWriter(VesselTrackBulkWriter vesselTrackBulkWriter, + AbnormalTrackWriter abnormalTrackWriter, + String targetTable, + HourlyTrackCache hourlyTrackCache) { + this.vesselTrackBulkWriter = vesselTrackBulkWriter; + this.abnormalTrackWriter = abnormalTrackWriter; + this.targetTable = targetTable; + this.hourlyTrackCache = hourlyTrackCache; + } + + public CompositeTrackWriter(VesselTrackBulkWriter vesselTrackBulkWriter, + AbnormalTrackWriter abnormalTrackWriter, + String targetTable) { + this(vesselTrackBulkWriter, abnormalTrackWriter, targetTable, null); + } @BeforeStep public void beforeStep(StepExecution stepExecution) { @@ -67,6 +83,12 @@ public class CompositeTrackWriter implements ItemWriter if (!normalTracks.isEmpty()) { if ("hourly".equals(targetTable)) { vesselTrackBulkWriter.writeHourlyTracks(normalTracks); + if (hourlyTrackCache != null) { + long l2Before = hourlyTrackCache.size(); + hourlyTrackCache.putAll(normalTracks); + log.info("[CACHE-MONITOR] CompositeTrackWriter → L2.putAll: tracks={}, L2 before={}, after={}", + normalTracks.size(), l2Before, hourlyTrackCache.size()); + } } else if ("daily".equals(targetTable)) { vesselTrackBulkWriter.writeDailyTracks(normalTracks); } else { diff --git a/src/main/java/gc/mda/signal_batch/domain/gis/service/GisServiceV2.java b/src/main/java/gc/mda/signal_batch/domain/gis/service/GisServiceV2.java index 59ddfaa..055eeed 100644 --- a/src/main/java/gc/mda/signal_batch/domain/gis/service/GisServiceV2.java +++ b/src/main/java/gc/mda/signal_batch/domain/gis/service/GisServiceV2.java @@ -1,10 +1,14 @@ package gc.mda.signal_batch.domain.gis.service; +import gc.mda.signal_batch.batch.reader.FiveMinTrackCache; +import gc.mda.signal_batch.batch.reader.HourlyTrackCache; import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack; import gc.mda.signal_batch.domain.vessel.dto.TrackResponse; import gc.mda.signal_batch.domain.vessel.dto.VesselTracksRequest; +import gc.mda.signal_batch.domain.vessel.model.VesselTrack; import gc.mda.signal_batch.global.exception.QueryTimeoutException; import gc.mda.signal_batch.global.util.TrackConverter; +import gc.mda.signal_batch.global.util.VesselTrackToCompactConverter; import gc.mda.signal_batch.global.websocket.service.ActiveQueryManager; import gc.mda.signal_batch.global.websocket.service.CacheTrackSimplifier; import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager; @@ -39,6 +43,9 @@ public class GisServiceV2 { private final DailyTrackCacheManager dailyTrackCacheManager; private final CacheTrackSimplifier cacheTrackSimplifier; private final GisService gisService; + private final HourlyTrackCache hourlyTrackCache; + private final FiveMinTrackCache fiveMinTrackCache; + private final VesselTrackToCompactConverter vesselTrackToCompactConverter; @Value("${rest.v2.query.timeout-seconds:30}") private int restQueryTimeout; @@ -54,12 +61,18 @@ public class GisServiceV2 { ActiveQueryManager activeQueryManager, DailyTrackCacheManager dailyTrackCacheManager, CacheTrackSimplifier cacheTrackSimplifier, - GisService gisService) { + GisService gisService, + HourlyTrackCache hourlyTrackCache, + FiveMinTrackCache fiveMinTrackCache, + VesselTrackToCompactConverter vesselTrackToCompactConverter) { this.queryDataSource = queryDataSource; this.activeQueryManager = activeQueryManager; this.dailyTrackCacheManager = dailyTrackCacheManager; this.cacheTrackSimplifier = cacheTrackSimplifier; this.gisService = gisService; + this.hourlyTrackCache = hourlyTrackCache; + this.fiveMinTrackCache = fiveMinTrackCache; + this.vesselTrackToCompactConverter = vesselTrackToCompactConverter; } /** @@ -332,18 +345,52 @@ public class GisServiceV2 { } } - // 3. 오늘 구간 DB 조회 (hourly + 5min) - if (split.hasTodayRange()) { - DailyTrackCacheManager.DateRange today = split.getTodayRange(); - VesselTracksRequest todayRequest = VesselTracksRequest.builder() - .startTime(today.getStart()) - .endTime(today.getEnd()) - .vessels(request.getVessels()) - .build(); - List todayTracks = gisService.getVesselTracks(todayRequest); - allTracks.addAll(todayTracks); - log.debug("[CacheQuery] today {} ~ {} -> {} tracks", - today.getStart(), today.getEnd(), todayTracks.size()); + // 3-a. hourly 범위 → L2 캐시 → DB fallback + if (split.hasHourlyRange()) { + DailyTrackCacheManager.DateRange hr = split.getHourlyRange(); + Map> hourlyTracks = + hourlyTrackCache.getTracksInRange(hr.getStart(), hr.getEnd()); + + if (!hourlyTracks.isEmpty()) { + Map> filtered = filterByMmsi(hourlyTracks, requestedMmsis); + List converted = vesselTrackToCompactConverter.convert(filtered); + allTracks.addAll(converted); + int totalPts = converted.stream().mapToInt(CompactVesselTrack::getPointCount).sum(); + log.info("[CACHE-MONITOR] queryWithCache L2 HIT [{}, {}): cacheVessels={}, filteredVessels={}, compactTracks={}, points={}", + hr.getStart(), hr.getEnd(), hourlyTracks.size(), filtered.size(), converted.size(), totalPts); + } else { + VesselTracksRequest hourlyReq = VesselTracksRequest.builder() + .startTime(hr.getStart()).endTime(hr.getEnd()) + .vessels(request.getVessels()).build(); + List dbResult = gisService.getVesselTracks(hourlyReq); + allTracks.addAll(dbResult); + log.info("[CACHE-MONITOR] queryWithCache L2 MISS → DB fallback [{}, {}): dbTracks={}", + hr.getStart(), hr.getEnd(), dbResult.size()); + } + } + + // 3-b. 5min 범위 → L1 캐시 → DB fallback + if (split.hasFiveMinRange()) { + DailyTrackCacheManager.DateRange fr = split.getFiveMinRange(); + Map> fiveMinTracks = + fiveMinTrackCache.getTracksInRange(fr.getStart(), fr.getEnd()); + + if (!fiveMinTracks.isEmpty()) { + Map> filtered = filterByMmsi(fiveMinTracks, requestedMmsis); + List converted = vesselTrackToCompactConverter.convert(filtered); + allTracks.addAll(converted); + int totalPts = converted.stream().mapToInt(CompactVesselTrack::getPointCount).sum(); + log.info("[CACHE-MONITOR] queryWithCache L1 HIT [{}, {}): cacheVessels={}, filteredVessels={}, compactTracks={}, points={}", + fr.getStart(), fr.getEnd(), fiveMinTracks.size(), filtered.size(), converted.size(), totalPts); + } else { + VesselTracksRequest fiveMinReq = VesselTracksRequest.builder() + .startTime(fr.getStart()).endTime(fr.getEnd()) + .vessels(request.getVessels()).build(); + List dbResult = gisService.getVesselTracks(fiveMinReq); + allTracks.addAll(dbResult); + log.info("[CACHE-MONITOR] queryWithCache L1 MISS → DB fallback [{}, {}): dbTracks={}", + fr.getStart(), fr.getEnd(), dbResult.size()); + } } // 4. 동일 선박 병합 (캐시 + DB 결과) @@ -450,6 +497,22 @@ public class GisServiceV2 { return merged; } + // ── MMSI 필터링 ── + + private Map> filterByMmsi( + Map> tracksByMmsi, Set requestedMmsis) { + if (requestedMmsis == null || requestedMmsis.isEmpty()) { + return tracksByMmsi; + } + Map> filtered = new LinkedHashMap<>(); + for (Map.Entry> entry : tracksByMmsi.entrySet()) { + if (requestedMmsis.contains(entry.getKey())) { + filtered.put(entry.getKey(), entry.getValue()); + } + } + return filtered; + } + // ── 유틸리티 메서드 ── private TrackResponse mapTrackResponse(ResultSet rs, int rowNum) throws SQLException { diff --git a/src/main/java/gc/mda/signal_batch/global/config/AsyncConfig.java b/src/main/java/gc/mda/signal_batch/global/config/AsyncConfig.java index fe64c43..d19ad8d 100644 --- a/src/main/java/gc/mda/signal_batch/global/config/AsyncConfig.java +++ b/src/main/java/gc/mda/signal_batch/global/config/AsyncConfig.java @@ -33,6 +33,21 @@ public class AsyncConfig implements AsyncConfigurer { return executor; } + @Bean(name = "cacheWarmupExecutor") + public Executor getCacheWarmupExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(2); + executor.setMaxPoolSize(4); + executor.setQueueCapacity(10); + executor.setKeepAliveSeconds(60); + executor.setThreadNamePrefix("cache-warmup-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(120); + executor.initialize(); + return executor; + } + @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncUncaughtExceptionHandler() { diff --git a/src/main/java/gc/mda/signal_batch/global/config/CacheWarmupService.java b/src/main/java/gc/mda/signal_batch/global/config/CacheWarmupService.java new file mode 100644 index 0000000..28efd5b --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/global/config/CacheWarmupService.java @@ -0,0 +1,184 @@ +package gc.mda.signal_batch.global.config; + +import gc.mda.signal_batch.batch.reader.FiveMinTrackCache; +import gc.mda.signal_batch.batch.reader.HourlyTrackCache; +import gc.mda.signal_batch.domain.vessel.model.VesselTrack; +import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import javax.sql.DataSource; +import java.math.BigDecimal; +import java.sql.*; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; + +/** + * L1(5min)/L2(hourly)/L3(daily) 통합 캐시 워밍업 오케스트레이터 + * + * 기동 시 전용 cacheWarmupExecutor 스레드에서 비동기 실행. + * 스케줄러/API 스레드와 완전 독립 — 지연/실패해도 다른 서비스 무영향. + * + * 기존 DailyTrackCacheManager.onApplicationReady() 이벤트 리스너를 대체. + */ +@Slf4j +@Component +public class CacheWarmupService { + + private final FiveMinTrackCache fiveMinTrackCache; + private final HourlyTrackCache hourlyTrackCache; + private final DailyTrackCacheManager dailyTrackCacheManager; + private final DataSource queryDataSource; + private final DailyTrackCacheProperties cacheProperties; + + public CacheWarmupService( + FiveMinTrackCache fiveMinTrackCache, + HourlyTrackCache hourlyTrackCache, + DailyTrackCacheManager dailyTrackCacheManager, + @Qualifier("queryDataSource") DataSource queryDataSource, + DailyTrackCacheProperties cacheProperties) { + this.fiveMinTrackCache = fiveMinTrackCache; + this.hourlyTrackCache = hourlyTrackCache; + this.dailyTrackCacheManager = dailyTrackCacheManager; + this.queryDataSource = queryDataSource; + this.cacheProperties = cacheProperties; + } + + @EventListener(ApplicationReadyEvent.class) + public void onApplicationReady() { + if (!cacheProperties.isEnabled()) { + log.info("Cache warmup skipped (disabled)"); + return; + } + warmUpAllCachesAsync(); + } + + @Async("cacheWarmupExecutor") + public void warmUpAllCachesAsync() { + long totalStart = System.currentTimeMillis(); + log.info("[CACHE-MONITOR] === 전체 캐시 워밍업 시작 === thread={}, now={}", + Thread.currentThread().getName(), LocalDateTime.now()); + + LocalDateTime now = LocalDateTime.now(); + LocalDateTime currentHour = now.withMinute(0).withSecond(0).withNano(0); + LocalDateTime todayStart = now.toLocalDate().atStartOfDay(); + + // 1. L1 워밍업: 현재 시간대 5min 트랙 (예: 12:00~12:50) + long l1Start = System.currentTimeMillis(); + try { + warmUpFiveMinCache(currentHour, now); + } catch (Exception e) { + log.error("[CACHE-MONITOR] L1(5min) 워밍업 실패 — 계속 진행: {}", e.getMessage()); + } + long l1Elapsed = System.currentTimeMillis() - l1Start; + + // 2. L2 워밍업: 오늘 정각 이전 hourly 트랙 (예: 00:00~12:00) + long l2Start = System.currentTimeMillis(); + try { + warmUpHourlyCache(todayStart, currentHour); + } catch (Exception e) { + log.error("[CACHE-MONITOR] L2(hourly) 워밍업 실패 — 계속 진행: {}", e.getMessage()); + } + long l2Elapsed = System.currentTimeMillis() - l2Start; + + // 3. L3 워밍업: D-1 ~ D-N daily (기존 DailyTrackCacheManager 위임) + long l3Start = System.currentTimeMillis(); + try { + dailyTrackCacheManager.warmUpCache(); + } catch (Exception e) { + log.error("[CACHE-MONITOR] L3(daily) 워밍업 실패: {}", e.getMessage()); + } + long l3Elapsed = System.currentTimeMillis() - l3Start; + + long elapsed = System.currentTimeMillis() - totalStart; + log.info("[CACHE-MONITOR] === 전체 캐시 워밍업 완료 === totalElapsed={}ms, L1={}ms(size={}), L2={}ms(size={}), L3={}ms, thread={}", + elapsed, l1Elapsed, fiveMinTrackCache.size(), l2Elapsed, hourlyTrackCache.size(), l3Elapsed, Thread.currentThread().getName()); + } + + /** + * L1 워밍업: t_vessel_tracks_5min에서 현재 시간대 DB 로드 + * 예: 12:54 기동 → 12:00~12:50 (10건/MMSI) 로드 + */ + private void warmUpFiveMinCache(LocalDateTime hourStart, LocalDateTime now) { + String sql = "SELECT mmsi, time_bucket, " + + "public.ST_AsText(track_geom) as track_geom, " + + "distance_nm, avg_speed, max_speed, point_count " + + "FROM signal.t_vessel_tracks_5min " + + "WHERE time_bucket >= ? AND time_bucket < ?"; + + List tracks = loadVesselTracksFromDb(sql, hourStart, now); + if (!tracks.isEmpty()) { + long distinctMmsi = tracks.stream().map(VesselTrack::getMmsi).distinct().count(); + fiveMinTrackCache.putAll(tracks); + log.info("[CACHE-MONITOR] L1 워밍업 완료: [{}, {}) → {} 건, {} MMSI, cacheSize={}", + hourStart, now, tracks.size(), distinctMmsi, fiveMinTrackCache.size()); + } else { + log.info("[CACHE-MONITOR] L1 워밍업: [{}, {}) → 데이터 없음 (정상)", hourStart, now); + } + } + + /** + * L2 워밍업: t_vessel_tracks_hourly에서 오늘 정각 이전 DB 로드 + * 예: 12:54 기동 → 00:00~12:00 (최대 12건/MMSI) 로드 + */ + private void warmUpHourlyCache(LocalDateTime todayStart, LocalDateTime currentHour) { + if (!currentHour.isAfter(todayStart)) { + log.info("[CACHE-MONITOR] L2 워밍업: 자정 직후 — 건너뜀"); + return; + } + + String sql = "SELECT mmsi, time_bucket, " + + "public.ST_AsText(track_geom) as track_geom, " + + "distance_nm, avg_speed, max_speed, point_count " + + "FROM signal.t_vessel_tracks_hourly " + + "WHERE time_bucket >= ? AND time_bucket < ?"; + + List tracks = loadVesselTracksFromDb(sql, todayStart, currentHour); + if (!tracks.isEmpty()) { + long distinctMmsi = tracks.stream().map(VesselTrack::getMmsi).distinct().count(); + hourlyTrackCache.putAll(tracks); + log.info("[CACHE-MONITOR] L2 워밍업 완료: [{}, {}) → {} 건, {} MMSI, cacheSize={}", + todayStart, currentHour, tracks.size(), distinctMmsi, hourlyTrackCache.size()); + } else { + log.info("[CACHE-MONITOR] L2 워밍업: [{}, {}) → 데이터 없음 (정상)", + todayStart, currentHour); + } + } + + /** + * DB에서 VesselTrack 리스트 로드 (L1/L2 공용) + */ + private List loadVesselTracksFromDb( + String sql, LocalDateTime start, LocalDateTime end) { + List result = new ArrayList<>(); + try (Connection conn = queryDataSource.getConnection(); + PreparedStatement ps = conn.prepareStatement(sql)) { + ps.setTimestamp(1, Timestamp.valueOf(start)); + ps.setTimestamp(2, Timestamp.valueOf(end)); + ps.setFetchSize(10000); + + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + VesselTrack track = VesselTrack.builder() + .mmsi(rs.getString("mmsi")) + .timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime()) + .trackGeom(rs.getString("track_geom")) + .distanceNm(BigDecimal.valueOf(rs.getDouble("distance_nm"))) + .avgSpeed(BigDecimal.valueOf(rs.getDouble("avg_speed"))) + .maxSpeed(BigDecimal.valueOf(rs.getDouble("max_speed"))) + .pointCount(rs.getInt("point_count")) + .build(); + result.add(track); + } + } + } catch (Exception e) { + log.error("DB 워밍업 쿼리 실패 [{}, {}): {}", start, end, e.getMessage()); + } + return result; + } +} diff --git a/src/main/java/gc/mda/signal_batch/global/util/VesselTrackToCompactConverter.java b/src/main/java/gc/mda/signal_batch/global/util/VesselTrackToCompactConverter.java new file mode 100644 index 0000000..79a1abd --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/global/util/VesselTrackToCompactConverter.java @@ -0,0 +1,154 @@ +package gc.mda.signal_batch.global.util; + +import gc.mda.signal_batch.batch.reader.AisTargetCacheManager; +import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack; +import gc.mda.signal_batch.domain.vessel.model.AisTargetEntity; +import gc.mda.signal_batch.domain.vessel.model.VesselTrack; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.LineString; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKTReader; +import org.springframework.stereotype.Component; + +import java.util.*; + +/** + * VesselTrack(배치 도메인) → CompactVesselTrack(조회 응답 DTO) 변환 + * + * DailyTrackCacheManager.loadDay()의 VesselAccumulator 패턴 재사용. + * L1(5min)/L2(hourly) 캐시 데이터를 REST/WebSocket 응답 형태로 변환. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class VesselTrackToCompactConverter { + + private static final ThreadLocal wktReaderLocal = ThreadLocal.withInitial(WKTReader::new); + + private final AisTargetCacheManager aisTargetCacheManager; + + /** + * MMSI별 VesselTrack 리스트 → CompactVesselTrack 리스트 변환 + * + * @param tracksByMmsi MMSI → VesselTrack 리스트 (시간순 정렬 전제) + * @return CompactVesselTrack 리스트 (MMSI별 1건, geometry/timestamps/speeds 병합) + */ + public List convert(Map> tracksByMmsi) { + if (tracksByMmsi == null || tracksByMmsi.isEmpty()) { + return Collections.emptyList(); + } + + long startMs = System.currentTimeMillis(); + int inputTrackCount = tracksByMmsi.values().stream().mapToInt(List::size).sum(); + + // 선박 정보 일괄 조회 + List mmsiList = new ArrayList<>(tracksByMmsi.keySet()); + Map vesselInfoMap = aisTargetCacheManager.getAll(mmsiList); + + long vesselInfoMs = System.currentTimeMillis(); + + List result = new ArrayList<>(tracksByMmsi.size()); + WKTReader wktReader = wktReaderLocal.get(); + + for (Map.Entry> entry : tracksByMmsi.entrySet()) { + String mmsi = entry.getKey(); + List tracks = entry.getValue(); + + CompactVesselTrack compact = convertSingleVessel(mmsi, tracks, vesselInfoMap.get(mmsi), wktReader); + if (compact != null) { + result.add(compact); + } + } + + int totalPoints = result.stream().mapToInt(CompactVesselTrack::getPointCount).sum(); + long elapsed = System.currentTimeMillis() - startMs; + log.info("[CACHE-MONITOR] VesselTrackToCompactConverter.convert: inputMmsi={}, inputTracks={}, outputCompact={}, totalPoints={}, vesselInfoLookup={}ms, totalElapsed={}ms", + tracksByMmsi.size(), inputTrackCount, result.size(), totalPoints, vesselInfoMs - startMs, elapsed); + + return result; + } + + private CompactVesselTrack convertSingleVessel( + String mmsi, + List tracks, + AisTargetEntity vesselInfo, + WKTReader wktReader) { + + List geometry = new ArrayList<>(); + List timestamps = new ArrayList<>(); + List speeds = new ArrayList<>(); + double totalDistance = 0; + double maxSpeed = 0; + + for (VesselTrack track : tracks) { + String trackGeomWkt = track.getTrackGeom(); + if (trackGeomWkt == null || trackGeomWkt.isEmpty() || "LINESTRING EMPTY".equals(trackGeomWkt)) { + continue; + } + + try { + LineString lineString = (LineString) wktReader.read(trackGeomWkt); + if (lineString.getNumPoints() == 0) continue; + + Coordinate[] coords = lineString.getCoordinates(); + for (Coordinate coord : coords) { + geometry.add(new double[]{coord.x, coord.y}); + if (!Double.isNaN(coord.getM())) { + timestamps.add(String.valueOf((long) coord.getM())); + } else { + timestamps.add(String.valueOf( + track.getTimeBucket().toEpochSecond(java.time.ZoneOffset.of("+09:00")))); + } + speeds.add(0.0); + } + } catch (ParseException e) { + log.debug("WKT 파싱 실패 — mmsi={}: {}", mmsi, e.getMessage()); + } + + if (track.getDistanceNm() != null) { + totalDistance += track.getDistanceNm().doubleValue(); + } + if (track.getMaxSpeed() != null) { + maxSpeed = Math.max(maxSpeed, track.getMaxSpeed().doubleValue()); + } + } + + if (geometry.isEmpty()) { + return null; + } + + int pointCount = geometry.size(); + double avgSpeed = pointCount > 0 ? totalDistance / Math.max(1, pointCount) * 60 : 0; + + // 선박 정보 설정 + String shipName = null; + String shipType = null; + String shipKindCode = null; + if (vesselInfo != null) { + shipName = vesselInfo.getName(); + shipType = vesselInfo.getVesselType(); + shipKindCode = SignalKindCode.resolve(vesselInfo.getVesselType(), vesselInfo.getExtraInfo()).getCode(); + } else { + shipKindCode = SignalKindCode.resolve(null, null).getCode(); + } + + String nationalCode = mmsi.length() >= 3 ? mmsi.substring(0, 3) : mmsi; + + return CompactVesselTrack.builder() + .vesselId(mmsi) + .nationalCode(nationalCode) + .shipName(shipName) + .shipType(shipType) + .shipKindCode(shipKindCode) + .geometry(geometry) + .timestamps(timestamps) + .speeds(speeds) + .totalDistance(totalDistance) + .avgSpeed(avgSpeed) + .maxSpeed(maxSpeed) + .pointCount(pointCount) + .build(); + } +} diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/DailyTrackCacheManager.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/DailyTrackCacheManager.java index 49a124e..35c3884 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/DailyTrackCacheManager.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/DailyTrackCacheManager.java @@ -11,9 +11,6 @@ import org.locationtech.jts.index.strtree.STRtree; import org.locationtech.jts.io.ParseException; import org.locationtech.jts.io.WKTReader; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.event.EventListener; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import javax.sql.DataSource; @@ -100,20 +97,29 @@ public class DailyTrackCacheManager { public static class SplitQueryResult { private final List cachedDates; // 캐시에서 가져올 날짜 private final List dbRanges; // DB 조회 필요 범위 (연속 날짜 묶음) - private final DateRange todayRange; // 오늘 구간 (hourly/5min) + private final DateRange todayRange; // 오늘 구간 (fallback용 유지) + private final DateRange hourlyRange; // 오늘 정각 이전 구간 (L2 캐시) + private final DateRange fiveMinRange; // 현재 시각대 구간 (L1 캐시) - public SplitQueryResult(List cachedDates, List dbRanges, DateRange todayRange) { + public SplitQueryResult(List cachedDates, List dbRanges, + DateRange todayRange, DateRange hourlyRange, DateRange fiveMinRange) { this.cachedDates = cachedDates; this.dbRanges = dbRanges; this.todayRange = todayRange; + this.hourlyRange = hourlyRange; + this.fiveMinRange = fiveMinRange; } public List getCachedDates() { return cachedDates; } public List getDbRanges() { return dbRanges; } public DateRange getTodayRange() { return todayRange; } + public DateRange getHourlyRange() { return hourlyRange; } + public DateRange getFiveMinRange() { return fiveMinRange; } public boolean hasCachedData() { return !cachedDates.isEmpty(); } public boolean hasDbRanges() { return !dbRanges.isEmpty(); } public boolean hasTodayRange() { return todayRange != null; } + public boolean hasHourlyRange() { return hourlyRange != null; } + public boolean hasFiveMinRange() { return fiveMinRange != null; } } public static class DateRange { @@ -129,29 +135,11 @@ public class DailyTrackCacheManager { public LocalDateTime getEnd() { return end; } } - // ── 비동기 캐시 워밍업 ── - - @EventListener(ApplicationReadyEvent.class) - public void onApplicationReady() { - if (!cacheProperties.isEnabled()) { - status.set(CacheStatus.DISABLED); - log.info("Daily track cache is disabled"); - return; - } - if (cacheProperties.isWarmupAsync()) { - warmUpCacheAsync(); - } else { - warmUpCache(); - } - } - - @Async("trackStreamingExecutor") - public void warmUpCacheAsync() { - warmUpCache(); - } + // ── 캐시 워밍업 (CacheWarmupService에서 호출) ── /** * 캐시 워밍업: D-1 → D-2 → ... → D-N 순서로 최근 우선 로드 + * CacheWarmupService.warmUpAllCachesAsync()에서 L3 단계로 호출됨. */ public void warmUpCache() { if (!cacheProperties.isEnabled()) { @@ -441,6 +429,8 @@ public class DailyTrackCacheManager { List cachedDates = new ArrayList<>(); List dbDates = new ArrayList<>(); DateRange todayRange = null; + DateRange hourlyRange = null; + DateRange fiveMinRange = null; // 요청 범위의 날짜별 분류 LocalDate startDate = startTime.toLocalDate(); @@ -448,12 +438,25 @@ public class DailyTrackCacheManager { for (LocalDate d = startDate; !d.isAfter(endDate); d = d.plusDays(1)) { if (d.equals(today)) { - // 오늘 → hourly/5min 테이블 조회 LocalDateTime todayStart = today.atStartOfDay(); LocalDateTime todayEnd = endTime.isAfter(LocalDateTime.now()) ? LocalDateTime.now() : endTime; if (todayStart.isBefore(startTime)) todayStart = startTime; + + // 현재 시각의 정각 + LocalDateTime currentHour = LocalDateTime.now().withMinute(0).withSecond(0).withNano(0); + if (todayEnd.isAfter(todayStart)) { todayRange = new DateRange(todayStart, todayEnd); + + // hourlyRange: todayStart ~ currentHour (정각 이전, hourly 캐시에 있음) + if (currentHour.isAfter(todayStart)) { + hourlyRange = new DateRange(todayStart, currentHour); + } + + // fiveMinRange: currentHour ~ todayEnd (현재 시각대, 5min 캐시에 있음) + if (todayEnd.isAfter(currentHour)) { + fiveMinRange = new DateRange(currentHour, todayEnd); + } } } else if (d.isAfter(today)) { // 미래 날짜 → 무시 @@ -468,7 +471,15 @@ public class DailyTrackCacheManager { // DB 조회 필요 날짜를 연속 범위로 묶기 List dbRanges = mergeConsecutiveDates(dbDates, startTime, endTime); - return new SplitQueryResult(cachedDates, dbRanges, todayRange); + log.info("[CACHE-MONITOR] splitQueryRange [{}, {}): cachedDays={}, dbRanges={}, hourlyRange={}, fiveMinRange={}, todayRange={}", + startTime, endTime, + cachedDates.size(), + dbRanges.size(), + hourlyRange != null ? hourlyRange.getStart() + "~" + hourlyRange.getEnd() : "null", + fiveMinRange != null ? fiveMinRange.getStart() + "~" + fiveMinRange.getEnd() : "null", + todayRange != null ? todayRange.getStart() + "~" + todayRange.getEnd() : "null"); + + return new SplitQueryResult(cachedDates, dbRanges, todayRange, hourlyRange, fiveMinRange); } /** diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml index 62cbc62..937aa09 100644 --- a/src/main/resources/application-local.yml +++ b/src/main/resources/application-local.yml @@ -49,11 +49,11 @@ vessel: batch: scheduler: enabled: true - chunk-size: 1000 + chunk-size: 10000 partition-size: 4 fetch-size: 50000 bulk-insert: - batch-size: 1000 + batch-size: 10000 parallel-threads: 2 # ── AIS API 수집 설정 ── diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index f46b6ba..ebe3e3a 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -273,6 +273,14 @@ app: ais-target: ttl-minutes: 120 + five-min-track: + ttl-minutes: 75 + max-size: 500000 + + hourly-track: + ttl-hours: 26 + max-size: 780000 + # 일일 항적 데이터 인메모리 캐시 cache: daily-track: diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 32a3aae..36d4be7 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -272,6 +272,14 @@ app: ttl-minutes: 120 # 기본 TTL (프로파일별 오버라이드) max-size: 300000 # 최대 캐시 크기 (30만 건) + five-min-track: + ttl-minutes: 75 # TTL 75분 (1시간 + 15분 여유) + max-size: 500000 # 30K MMSI × 15 버킷 + + hourly-track: + ttl-hours: 26 # TTL 26시간 (24시간 + 2시간 여유) + max-size: 780000 # 30K MMSI × 26시간 + chnprmship: mmsi-resource-path: classpath:chnprmship-mmsi.txt ttl-days: 2