Merge pull request 'feat: 다계층 인메모리 캐시(L1/L2/L3) 조회 통합 + CACHE-MONITOR 로그' (#6) from feature/multilevel-track-cache into develop

This commit is contained in:
htlee 2026-02-19 13:34:22 +09:00
커밋 29bf116246
16개의 변경된 파일772개의 추가작업 그리고 70개의 파일을 삭제

파일 보기

@ -1,6 +1,7 @@
package gc.mda.signal_batch.batch.job;
import gc.mda.signal_batch.batch.listener.JobCompletionListener;
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -17,9 +18,11 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import java.time.LocalDateTime;
@Slf4j
@Configuration
@Profile("!query") // query 프로파일에서는 배치 작업 비활성화
@Profile("!query")
@RequiredArgsConstructor
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
public class DailyAggregationJobConfig {
@ -28,6 +31,7 @@ public class DailyAggregationJobConfig {
private final DailyAggregationStepConfig dailyAggregationStepConfig;
private final JobCompletionListener jobCompletionListener;
private final DailyTrackCacheManager dailyTrackCacheManager;
private final HourlyTrackCache hourlyTrackCache;
@Bean
public Job dailyAggregationJob() {
@ -53,14 +57,26 @@ public class DailyAggregationJobConfig {
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus().isUnsuccessful()) {
log.warn("Daily aggregation job failed, skipping cache refresh");
log.warn("[CACHE-MONITOR] DailyJob FAILED — L2/L3 cleanup 건너뜀, status={}",
jobExecution.getStatus());
return;
}
try {
log.info("Daily aggregation job completed, refreshing daily track cache");
log.info("[CACHE-MONITOR] DailyJob 완료 → L3 refresh 시작, L2 size={}",
hourlyTrackCache.size());
dailyTrackCacheManager.refreshAfterDailyJob();
// hourly 캐시에서 어제 범위 제거
String startTime = jobExecution.getJobParameters().getString("startTime");
String endTime = jobExecution.getJobParameters().getString("endTime");
LocalDateTime start = LocalDateTime.parse(startTime);
LocalDateTime end = LocalDateTime.parse(endTime);
long l2Before = hourlyTrackCache.size();
hourlyTrackCache.removeRange(start, end);
log.info("[CACHE-MONITOR] DailyJob → L2 cleanup [{}, {}): L2 before={}, after={}, L2 stats=[{}]",
start, end, l2Before, hourlyTrackCache.size(), hourlyTrackCache.getStats());
} catch (Exception e) {
log.error("Failed to refresh daily track cache after job: {}", e.getMessage());
log.error("[CACHE-MONITOR] DailyJob 캐시 갱신/정리 실패: {}", e.getMessage());
}
}
};

파일 보기

@ -1,9 +1,12 @@
package gc.mda.signal_batch.batch.job;
import gc.mda.signal_batch.batch.listener.JobCompletionListener;
import gc.mda.signal_batch.batch.reader.FiveMinTrackCache;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.JobParametersValidator;
import org.springframework.batch.core.job.DefaultJobParametersValidator;
import org.springframework.batch.core.job.builder.JobBuilder;
@ -14,31 +17,61 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import java.time.LocalDateTime;
@Slf4j
@Configuration
@Profile("!query") // query 프로파일에서는 배치 작업 비활성화
@Profile("!query")
@RequiredArgsConstructor
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
public class HourlyAggregationJobConfig {
private final JobRepository jobRepository;
private final HourlyAggregationStepConfig hourlyAggregationStepConfig;
private final VesselStaticStepConfig vesselStaticStepConfig;
private final JobCompletionListener jobCompletionListener;
private final FiveMinTrackCache fiveMinTrackCache;
@Bean
public Job hourlyAggregationJob() {
return new JobBuilder("hourlyAggregationJob", jobRepository)
.incrementer(new RunIdIncrementer())
.validator(hourlyJobParametersValidator())
.listener(jobCompletionListener)
.listener(hourlyFiveMinCleanupListener())
.start(hourlyAggregationStepConfig.mergeHourlyTracksStep())
.next(hourlyAggregationStepConfig.gridHourlySummaryStep())
.next(hourlyAggregationStepConfig.areaHourlySummaryStep())
.next(vesselStaticStepConfig.vesselStaticSyncStep())
.build();
}
@Bean
public JobExecutionListener hourlyFiveMinCleanupListener() {
return new JobExecutionListener() {
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus().isUnsuccessful()) {
log.info("[CACHE-MONITOR] HourlyJob FAILED — L1 cleanup 건너뜀, status={}",
jobExecution.getStatus());
return;
}
try {
String startTime = jobExecution.getJobParameters().getString("startTime");
String endTime = jobExecution.getJobParameters().getString("endTime");
LocalDateTime start = LocalDateTime.parse(startTime);
LocalDateTime end = LocalDateTime.parse(endTime);
long l1Before = fiveMinTrackCache.size();
fiveMinTrackCache.removeRange(start, end);
log.info("[CACHE-MONITOR] HourlyJob 완료 → L1 cleanup [{}, {}): L1 before={}, after={}, L1 stats=[{}]",
start, end, l1Before, fiveMinTrackCache.size(), fiveMinTrackCache.getStats());
} catch (Exception e) {
log.error("[CACHE-MONITOR] L1 cleanup 실패: {}", e.getMessage());
}
}
};
}
@Bean
public JobParametersValidator hourlyJobParametersValidator() {
DefaultJobParametersValidator validator = new DefaultJobParametersValidator();

파일 보기

@ -6,6 +6,7 @@ import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetecti
import gc.mda.signal_batch.batch.processor.HourlyTrackMergeProcessor;
import gc.mda.signal_batch.batch.reader.CacheBasedHourlyTrackReader;
import gc.mda.signal_batch.batch.reader.FiveMinTrackCache;
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
import gc.mda.signal_batch.batch.writer.VesselTrackBulkWriter;
import gc.mda.signal_batch.batch.writer.AbnormalTrackWriter;
import gc.mda.signal_batch.batch.writer.CompositeTrackWriter;
@ -44,6 +45,7 @@ public class HourlyAggregationStepConfig {
private final AbnormalTrackWriter abnormalTrackWriter;
private final AbnormalTrackDetector abnormalTrackDetector;
private final FiveMinTrackCache fiveMinTrackCache;
private final HourlyTrackCache hourlyTrackCache;
public HourlyAggregationStepConfig(
JobRepository jobRepository,
@ -52,7 +54,8 @@ public class HourlyAggregationStepConfig {
VesselTrackBulkWriter vesselTrackBulkWriter,
AbnormalTrackWriter abnormalTrackWriter,
AbnormalTrackDetector abnormalTrackDetector,
FiveMinTrackCache fiveMinTrackCache) {
FiveMinTrackCache fiveMinTrackCache,
HourlyTrackCache hourlyTrackCache) {
this.jobRepository = jobRepository;
this.queryDataSource = queryDataSource;
this.transactionManager = transactionManager;
@ -60,6 +63,7 @@ public class HourlyAggregationStepConfig {
this.abnormalTrackWriter = abnormalTrackWriter;
this.abnormalTrackDetector = abnormalTrackDetector;
this.fiveMinTrackCache = fiveMinTrackCache;
this.hourlyTrackCache = hourlyTrackCache;
}
@Value("${vessel.batch.chunk-size:5000}")
@ -72,11 +76,13 @@ public class HourlyAggregationStepConfig {
@Bean
public Step mergeHourlyTracksStep() {
log.info("Building mergeHourlyTracksStep with cache-based in-memory merge");
HourlyTrackMergeProcessor processor = hourlyTrackMergeProcessor(null);
return new StepBuilder("mergeHourlyTracksStep", jobRepository)
.<List<VesselTrack>, AbnormalDetectionResult>chunk(chunkSize, transactionManager)
.reader(cacheBasedHourlyTrackReader(null, null))
.processor(hourlyTrackMergeProcessor(null))
.processor(processor)
.writer(hourlyCompositeTrackWriter())
.listener(processor)
.build();
}
@ -115,7 +121,8 @@ public class HourlyAggregationStepConfig {
return new CompositeTrackWriter(
vesselTrackBulkWriter,
abnormalTrackWriter,
"hourly"
"hourly",
hourlyTrackCache
);
}

파일 보기

@ -5,6 +5,9 @@ import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import gc.mda.signal_batch.global.util.LineStringMUtils;
import gc.mda.signal_batch.global.util.TrackSimplificationUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.jdbc.core.JdbcTemplate;
@ -29,7 +32,8 @@ import java.util.regex.Pattern;
* N+1 SQL 제거 DB 쿼리 최대 1회 (비정상 검출용 이전 버킷 prefetch)
*/
@Slf4j
public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack>, AbnormalDetectionResult> {
public class HourlyTrackMergeProcessor
implements ItemProcessor<List<VesselTrack>, AbnormalDetectionResult>, StepExecutionListener {
private static final Pattern WKT_COORDS_PATTERN = Pattern.compile("LINESTRING M\\((.+)\\)");
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@ -42,6 +46,13 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
private Map<String, VesselTrack> previousBucketCache;
private boolean previousBucketLoaded = false;
// Step 레벨 집계 카운터
private int totalProcessed = 0;
private int mergeFailCount = 0;
private int simplifiedCount = 0;
private int abnormalCount = 0;
private int avgSpeedFailCount = 0;
public HourlyTrackMergeProcessor(
AbnormalTrackDetector abnormalTrackDetector,
JdbcTemplate queryJdbcTemplate,
@ -58,11 +69,12 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
}
String mmsi = fiveMinTracks.get(0).getMmsi();
totalProcessed++;
// Step 1: WKT 좌표 병합
String mergedWkt = mergeTrackGeometries(fiveMinTracks);
if (mergedWkt == null) {
log.debug("병합 실패 (유효한 좌표 없음): mmsi={}", mmsi);
mergeFailCount++;
return null;
}
@ -94,10 +106,7 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
int simplifiedPoints = countWktPoints(simplifiedWkt);
if (!mergedWkt.equals(simplifiedWkt)) {
TrackSimplificationUtils.SimplificationStats stats =
TrackSimplificationUtils.getSimplificationStats(mergedWkt, simplifiedWkt);
log.debug("시간별 궤적 간소화 - vessel: {}, 원본: {}포인트, 간소화: {}포인트 ({}% 감소)",
mmsi, stats.originalPoints, stats.simplifiedPoints, (int) stats.reductionRate);
simplifiedCount++;
}
VesselTrack hourlyTrack = VesselTrack.builder()
@ -122,8 +131,7 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
AbnormalDetectionResult result = abnormalTrackDetector.detectBucketTransitionOnly(hourlyTrack, prevTrack);
if (result.hasAbnormalities()) {
log.debug("Hourly 비정상 궤적 검출: mmsi={}, segments={}",
mmsi, result.getAbnormalSegments().size());
abnormalCount++;
}
return result;
@ -189,11 +197,18 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
return BigDecimal.valueOf(avgSpeedVal).setScale(2, RoundingMode.HALF_UP);
} catch (Exception e) {
log.debug("avgSpeed 계산 실패: {}", e.getMessage());
avgSpeedFailCount++;
return BigDecimal.ZERO;
}
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
log.debug("Hourly 병합 처리 집계 — 총: {}, 병합실패: {}, 간소화: {}, 비정상: {}, avgSpeed실패: {}",
totalProcessed, mergeFailCount, simplifiedCount, abnormalCount, avgSpeedFailCount);
return null;
}
private int countWktPoints(String wkt) {
if (wkt == null || !wkt.startsWith("LINESTRING M")) return 0;
try {
@ -222,6 +237,7 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
""";
Map<String, VesselTrack> result = new HashMap<>();
int[] parseFailCount = {0};
try {
queryJdbcTemplate.query(sql,
@ -231,17 +247,24 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
},
rs -> {
String mmsi = rs.getString("mmsi");
VesselTrack.TrackPosition endPos = parseEndPosition(rs.getString("end_position"));
if (endPos == null && rs.getString("end_position") != null) {
parseFailCount[0]++;
}
VesselTrack track = VesselTrack.builder()
.mmsi(mmsi)
.timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime())
.trackGeom(rs.getString("last_segment"))
.endPosition(parseEndPosition(rs.getString("end_position")))
.endPosition(endPos)
.build();
result.put(mmsi, track);
});
log.info("이전 버킷 트랙 prefetch 완료: {} 선박 (기간: {} ~ {})",
result.size(), prevStart, prevEnd);
if (parseFailCount[0] > 0) {
log.debug("end_position 파싱 실패: {} 건", parseFailCount[0]);
}
} catch (Exception e) {
log.warn("이전 버킷 트랙 prefetch 실패 (첫 실행일 수 있음): {}", e.getMessage());
}
@ -264,7 +287,6 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
.sog(sog != null ? new BigDecimal(sog) : null)
.build();
} catch (Exception e) {
log.debug("end_position 파싱 실패: {}", json);
return null;
}
}

파일 보기

@ -130,6 +130,7 @@ public class CacheBasedHourlyTrackReader implements ItemReader<List<VesselTrack>
""";
Map<String, List<VesselTrack>> result = new LinkedHashMap<>();
int[] parseFailCount = {0};
queryJdbcTemplate.query(sql,
ps -> {
@ -139,6 +140,12 @@ public class CacheBasedHourlyTrackReader implements ItemReader<List<VesselTrack>
},
rs -> {
String mmsi = rs.getString("mmsi");
VesselTrack.TrackPosition startPos = parseTrackPosition(rs.getString("start_position"));
VesselTrack.TrackPosition endPos = parseTrackPosition(rs.getString("end_position"));
if ((startPos == null && rs.getString("start_position") != null)
|| (endPos == null && rs.getString("end_position") != null)) {
parseFailCount[0]++;
}
VesselTrack track = VesselTrack.builder()
.mmsi(mmsi)
.timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime())
@ -147,8 +154,8 @@ public class CacheBasedHourlyTrackReader implements ItemReader<List<VesselTrack>
.avgSpeed(rs.getBigDecimal("avg_speed"))
.maxSpeed(rs.getBigDecimal("max_speed"))
.pointCount(rs.getInt("point_count"))
.startPosition(parseTrackPosition(rs.getString("start_position")))
.endPosition(parseTrackPosition(rs.getString("end_position")))
.startPosition(startPos)
.endPosition(endPos)
.build();
result.computeIfAbsent(mmsi, k -> new ArrayList<>()).add(track);
@ -156,6 +163,9 @@ public class CacheBasedHourlyTrackReader implements ItemReader<List<VesselTrack>
log.info("DB fallback 완료: {} 선박, {} 트랙",
result.size(), result.values().stream().mapToInt(List::size).sum());
if (parseFailCount[0] > 0) {
log.debug("TrackPosition 파싱 실패: {} 건", parseFailCount[0]);
}
return result;
}
@ -174,7 +184,6 @@ public class CacheBasedHourlyTrackReader implements ItemReader<List<VesselTrack>
.sog(sogStr != null ? new BigDecimal(sogStr) : null)
.build();
} catch (Exception e) {
log.debug("TrackPosition 파싱 실패: {}", json);
return null;
}
}

파일 보기

@ -56,9 +56,12 @@ public class FiveMinTrackCache {
public void putAll(List<VesselTrack> tracks) {
if (tracks == null) return;
long beforeSize = cache.estimatedSize();
for (VesselTrack track : tracks) {
put(track);
}
log.info("[CACHE-MONITOR] L1.putAll: input={}, cacheBefore={}, cacheAfter={}, stats=[{}]",
tracks.size(), beforeSize, cache.estimatedSize(), getStats());
}
/**
@ -85,9 +88,28 @@ public class FiveMinTrackCache {
tracks.sort(Comparator.comparing(VesselTrack::getTimeBucket));
}
int totalTracks = result.values().stream().mapToInt(List::size).sum();
log.info("[CACHE-MONITOR] L1.getTracksInRange [{}, {}): mmsi={}, tracks={}, cacheTotal={}",
start, end, result.size(), totalTracks, cache.estimatedSize());
return result;
}
/**
* 지정 시간 범위의 캐시 항목 제거 (hourly merge 완료 호출)
*/
public void removeRange(LocalDateTime start, LocalDateTime end) {
long before = cache.estimatedSize();
cache.asMap().entrySet().removeIf(entry -> {
VesselTrack track = entry.getValue();
return track.getTimeBucket() != null
&& !track.getTimeBucket().isBefore(start)
&& track.getTimeBucket().isBefore(end);
});
long after = cache.estimatedSize();
log.info("[CACHE-MONITOR] L1.removeRange [{}, {}): removed={}, before={}, after={}, stats=[{}]",
start, end, before - after, before, after, getStats());
}
public long size() {
return cache.estimatedSize();
}

파일 보기

@ -0,0 +1,128 @@
package gc.mda.signal_batch.batch.reader;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* Hourly VesselTrack 인메모리 캐시 (L2)
*
* hourly 집계 DB 저장과 동시에 캐시에 보관.
* 조회 서비스(GisServiceV2)에서 오늘 정각 이전 구간을 DB 대신 캐시에서 즉시 응답.
*
* key: "mmsi::timeBucket" (: "440123456::2026-02-19T09:00")
* value: VesselTrack
* TTL: 26시간 (24시간 + 2시간 여유, daily merge removeRange로 제거)
*/
@Slf4j
@Component
public class HourlyTrackCache {
private static final DateTimeFormatter KEY_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm");
private Cache<String, VesselTrack> cache;
@Value("${app.cache.hourly-track.ttl-hours:26}")
private long ttlHours;
@Value("${app.cache.hourly-track.max-size:780000}")
private int maxSize;
@PostConstruct
public void init() {
this.cache = Caffeine.newBuilder()
.maximumSize(maxSize)
.expireAfterWrite(ttlHours, TimeUnit.HOURS)
.recordStats()
.build();
log.info("HourlyTrackCache 초기화 — TTL: {}시간, maxSize: {}", ttlHours, maxSize);
}
public void put(VesselTrack track) {
if (track == null || track.getMmsi() == null || track.getTimeBucket() == null) {
return;
}
cache.put(buildKey(track.getMmsi(), track.getTimeBucket()), track);
}
public void putAll(List<VesselTrack> tracks) {
if (tracks == null) return;
long beforeSize = cache.estimatedSize();
for (VesselTrack track : tracks) {
put(track);
}
log.info("[CACHE-MONITOR] L2.putAll: input={}, cacheBefore={}, cacheAfter={}, stats=[{}]",
tracks.size(), beforeSize, cache.estimatedSize(), getStats());
}
/**
* 지정 시간 범위의 트랙을 MMSI별로 그루핑하여 반환
*
* @param start 시작 시각 (inclusive)
* @param end 종료 시각 (exclusive)
* @return Map<mmsi, List<VesselTrack>> (시간순 정렬)
*/
public Map<String, List<VesselTrack>> getTracksInRange(LocalDateTime start, LocalDateTime end) {
Map<String, List<VesselTrack>> result = new LinkedHashMap<>();
for (Map.Entry<String, VesselTrack> entry : cache.asMap().entrySet()) {
VesselTrack track = entry.getValue();
if (track.getTimeBucket() != null
&& !track.getTimeBucket().isBefore(start)
&& track.getTimeBucket().isBefore(end)) {
result.computeIfAbsent(track.getMmsi(), k -> new ArrayList<>()).add(track);
}
}
for (List<VesselTrack> tracks : result.values()) {
tracks.sort(Comparator.comparing(VesselTrack::getTimeBucket));
}
int totalTracks = result.values().stream().mapToInt(List::size).sum();
log.info("[CACHE-MONITOR] L2.getTracksInRange [{}, {}): mmsi={}, tracks={}, cacheTotal={}",
start, end, result.size(), totalTracks, cache.estimatedSize());
return result;
}
/**
* 지정 시간 범위의 캐시 항목 제거 (daily merge 완료 호출)
*/
public void removeRange(LocalDateTime start, LocalDateTime end) {
long before = cache.estimatedSize();
cache.asMap().entrySet().removeIf(entry -> {
VesselTrack track = entry.getValue();
return track.getTimeBucket() != null
&& !track.getTimeBucket().isBefore(start)
&& track.getTimeBucket().isBefore(end);
});
long after = cache.estimatedSize();
log.info("[CACHE-MONITOR] L2.removeRange [{}, {}): removed={}, before={}, after={}, stats=[{}]",
start, end, before - after, before, after, getStats());
}
public long size() {
return cache.estimatedSize();
}
public String getStats() {
var stats = cache.stats();
return String.format("size=%d, hitRate=%.1f%%, hits=%d, misses=%d",
cache.estimatedSize(),
stats.hitRate() * 100,
stats.hitCount(),
stats.missCount());
}
private String buildKey(String mmsi, LocalDateTime timeBucket) {
return mmsi + "::" + timeBucket.format(KEY_FORMATTER);
}
}

파일 보기

@ -1,8 +1,8 @@
package gc.mda.signal_batch.batch.writer;
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
@ -19,12 +19,28 @@ import java.util.List;
*/
@Slf4j
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
@RequiredArgsConstructor
public class CompositeTrackWriter implements ItemWriter<AbnormalDetectionResult> {
private final VesselTrackBulkWriter vesselTrackBulkWriter;
private final AbnormalTrackWriter abnormalTrackWriter;
private final String targetTable;
private final HourlyTrackCache hourlyTrackCache; // nullable (daily writer는 미사용)
public CompositeTrackWriter(VesselTrackBulkWriter vesselTrackBulkWriter,
AbnormalTrackWriter abnormalTrackWriter,
String targetTable,
HourlyTrackCache hourlyTrackCache) {
this.vesselTrackBulkWriter = vesselTrackBulkWriter;
this.abnormalTrackWriter = abnormalTrackWriter;
this.targetTable = targetTable;
this.hourlyTrackCache = hourlyTrackCache;
}
public CompositeTrackWriter(VesselTrackBulkWriter vesselTrackBulkWriter,
AbnormalTrackWriter abnormalTrackWriter,
String targetTable) {
this(vesselTrackBulkWriter, abnormalTrackWriter, targetTable, null);
}
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
@ -67,6 +83,12 @@ public class CompositeTrackWriter implements ItemWriter<AbnormalDetectionResult>
if (!normalTracks.isEmpty()) {
if ("hourly".equals(targetTable)) {
vesselTrackBulkWriter.writeHourlyTracks(normalTracks);
if (hourlyTrackCache != null) {
long l2Before = hourlyTrackCache.size();
hourlyTrackCache.putAll(normalTracks);
log.info("[CACHE-MONITOR] CompositeTrackWriter → L2.putAll: tracks={}, L2 before={}, after={}",
normalTracks.size(), l2Before, hourlyTrackCache.size());
}
} else if ("daily".equals(targetTable)) {
vesselTrackBulkWriter.writeDailyTracks(normalTracks);
} else {

파일 보기

@ -1,10 +1,14 @@
package gc.mda.signal_batch.domain.gis.service;
import gc.mda.signal_batch.batch.reader.FiveMinTrackCache;
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
import gc.mda.signal_batch.domain.vessel.dto.TrackResponse;
import gc.mda.signal_batch.domain.vessel.dto.VesselTracksRequest;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import gc.mda.signal_batch.global.exception.QueryTimeoutException;
import gc.mda.signal_batch.global.util.TrackConverter;
import gc.mda.signal_batch.global.util.VesselTrackToCompactConverter;
import gc.mda.signal_batch.global.websocket.service.ActiveQueryManager;
import gc.mda.signal_batch.global.websocket.service.CacheTrackSimplifier;
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
@ -39,6 +43,9 @@ public class GisServiceV2 {
private final DailyTrackCacheManager dailyTrackCacheManager;
private final CacheTrackSimplifier cacheTrackSimplifier;
private final GisService gisService;
private final HourlyTrackCache hourlyTrackCache;
private final FiveMinTrackCache fiveMinTrackCache;
private final VesselTrackToCompactConverter vesselTrackToCompactConverter;
@Value("${rest.v2.query.timeout-seconds:30}")
private int restQueryTimeout;
@ -54,12 +61,18 @@ public class GisServiceV2 {
ActiveQueryManager activeQueryManager,
DailyTrackCacheManager dailyTrackCacheManager,
CacheTrackSimplifier cacheTrackSimplifier,
GisService gisService) {
GisService gisService,
HourlyTrackCache hourlyTrackCache,
FiveMinTrackCache fiveMinTrackCache,
VesselTrackToCompactConverter vesselTrackToCompactConverter) {
this.queryDataSource = queryDataSource;
this.activeQueryManager = activeQueryManager;
this.dailyTrackCacheManager = dailyTrackCacheManager;
this.cacheTrackSimplifier = cacheTrackSimplifier;
this.gisService = gisService;
this.hourlyTrackCache = hourlyTrackCache;
this.fiveMinTrackCache = fiveMinTrackCache;
this.vesselTrackToCompactConverter = vesselTrackToCompactConverter;
}
/**
@ -332,18 +345,52 @@ public class GisServiceV2 {
}
}
// 3. 오늘 구간 DB 조회 (hourly + 5min)
if (split.hasTodayRange()) {
DailyTrackCacheManager.DateRange today = split.getTodayRange();
VesselTracksRequest todayRequest = VesselTracksRequest.builder()
.startTime(today.getStart())
.endTime(today.getEnd())
.vessels(request.getVessels())
.build();
List<CompactVesselTrack> todayTracks = gisService.getVesselTracks(todayRequest);
allTracks.addAll(todayTracks);
log.debug("[CacheQuery] today {} ~ {} -> {} tracks",
today.getStart(), today.getEnd(), todayTracks.size());
// 3-a. hourly 범위 L2 캐시 DB fallback
if (split.hasHourlyRange()) {
DailyTrackCacheManager.DateRange hr = split.getHourlyRange();
Map<String, List<VesselTrack>> hourlyTracks =
hourlyTrackCache.getTracksInRange(hr.getStart(), hr.getEnd());
if (!hourlyTracks.isEmpty()) {
Map<String, List<VesselTrack>> filtered = filterByMmsi(hourlyTracks, requestedMmsis);
List<CompactVesselTrack> converted = vesselTrackToCompactConverter.convert(filtered);
allTracks.addAll(converted);
int totalPts = converted.stream().mapToInt(CompactVesselTrack::getPointCount).sum();
log.info("[CACHE-MONITOR] queryWithCache L2 HIT [{}, {}): cacheVessels={}, filteredVessels={}, compactTracks={}, points={}",
hr.getStart(), hr.getEnd(), hourlyTracks.size(), filtered.size(), converted.size(), totalPts);
} else {
VesselTracksRequest hourlyReq = VesselTracksRequest.builder()
.startTime(hr.getStart()).endTime(hr.getEnd())
.vessels(request.getVessels()).build();
List<CompactVesselTrack> dbResult = gisService.getVesselTracks(hourlyReq);
allTracks.addAll(dbResult);
log.info("[CACHE-MONITOR] queryWithCache L2 MISS → DB fallback [{}, {}): dbTracks={}",
hr.getStart(), hr.getEnd(), dbResult.size());
}
}
// 3-b. 5min 범위 L1 캐시 DB fallback
if (split.hasFiveMinRange()) {
DailyTrackCacheManager.DateRange fr = split.getFiveMinRange();
Map<String, List<VesselTrack>> fiveMinTracks =
fiveMinTrackCache.getTracksInRange(fr.getStart(), fr.getEnd());
if (!fiveMinTracks.isEmpty()) {
Map<String, List<VesselTrack>> filtered = filterByMmsi(fiveMinTracks, requestedMmsis);
List<CompactVesselTrack> converted = vesselTrackToCompactConverter.convert(filtered);
allTracks.addAll(converted);
int totalPts = converted.stream().mapToInt(CompactVesselTrack::getPointCount).sum();
log.info("[CACHE-MONITOR] queryWithCache L1 HIT [{}, {}): cacheVessels={}, filteredVessels={}, compactTracks={}, points={}",
fr.getStart(), fr.getEnd(), fiveMinTracks.size(), filtered.size(), converted.size(), totalPts);
} else {
VesselTracksRequest fiveMinReq = VesselTracksRequest.builder()
.startTime(fr.getStart()).endTime(fr.getEnd())
.vessels(request.getVessels()).build();
List<CompactVesselTrack> dbResult = gisService.getVesselTracks(fiveMinReq);
allTracks.addAll(dbResult);
log.info("[CACHE-MONITOR] queryWithCache L1 MISS → DB fallback [{}, {}): dbTracks={}",
fr.getStart(), fr.getEnd(), dbResult.size());
}
}
// 4. 동일 선박 병합 (캐시 + DB 결과)
@ -450,6 +497,22 @@ public class GisServiceV2 {
return merged;
}
// MMSI 필터링
private Map<String, List<VesselTrack>> filterByMmsi(
Map<String, List<VesselTrack>> tracksByMmsi, Set<String> requestedMmsis) {
if (requestedMmsis == null || requestedMmsis.isEmpty()) {
return tracksByMmsi;
}
Map<String, List<VesselTrack>> filtered = new LinkedHashMap<>();
for (Map.Entry<String, List<VesselTrack>> entry : tracksByMmsi.entrySet()) {
if (requestedMmsis.contains(entry.getKey())) {
filtered.put(entry.getKey(), entry.getValue());
}
}
return filtered;
}
// 유틸리티 메서드
private TrackResponse mapTrackResponse(ResultSet rs, int rowNum) throws SQLException {

파일 보기

@ -33,6 +33,21 @@ public class AsyncConfig implements AsyncConfigurer {
return executor;
}
@Bean(name = "cacheWarmupExecutor")
public Executor getCacheWarmupExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("cache-warmup-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(120);
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {

파일 보기

@ -0,0 +1,184 @@
package gc.mda.signal_batch.global.config;
import gc.mda.signal_batch.batch.reader.FiveMinTrackCache;
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import java.math.BigDecimal;
import java.sql.*;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/**
* L1(5min)/L2(hourly)/L3(daily) 통합 캐시 워밍업 오케스트레이터
*
* 기동 전용 cacheWarmupExecutor 스레드에서 비동기 실행.
* 스케줄러/API 스레드와 완전 독립 지연/실패해도 다른 서비스 무영향.
*
* 기존 DailyTrackCacheManager.onApplicationReady() 이벤트 리스너를 대체.
*/
@Slf4j
@Component
public class CacheWarmupService {
private final FiveMinTrackCache fiveMinTrackCache;
private final HourlyTrackCache hourlyTrackCache;
private final DailyTrackCacheManager dailyTrackCacheManager;
private final DataSource queryDataSource;
private final DailyTrackCacheProperties cacheProperties;
public CacheWarmupService(
FiveMinTrackCache fiveMinTrackCache,
HourlyTrackCache hourlyTrackCache,
DailyTrackCacheManager dailyTrackCacheManager,
@Qualifier("queryDataSource") DataSource queryDataSource,
DailyTrackCacheProperties cacheProperties) {
this.fiveMinTrackCache = fiveMinTrackCache;
this.hourlyTrackCache = hourlyTrackCache;
this.dailyTrackCacheManager = dailyTrackCacheManager;
this.queryDataSource = queryDataSource;
this.cacheProperties = cacheProperties;
}
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
if (!cacheProperties.isEnabled()) {
log.info("Cache warmup skipped (disabled)");
return;
}
warmUpAllCachesAsync();
}
@Async("cacheWarmupExecutor")
public void warmUpAllCachesAsync() {
long totalStart = System.currentTimeMillis();
log.info("[CACHE-MONITOR] === 전체 캐시 워밍업 시작 === thread={}, now={}",
Thread.currentThread().getName(), LocalDateTime.now());
LocalDateTime now = LocalDateTime.now();
LocalDateTime currentHour = now.withMinute(0).withSecond(0).withNano(0);
LocalDateTime todayStart = now.toLocalDate().atStartOfDay();
// 1. L1 워밍업: 현재 시간대 5min 트랙 (: 12:00~12:50)
long l1Start = System.currentTimeMillis();
try {
warmUpFiveMinCache(currentHour, now);
} catch (Exception e) {
log.error("[CACHE-MONITOR] L1(5min) 워밍업 실패 — 계속 진행: {}", e.getMessage());
}
long l1Elapsed = System.currentTimeMillis() - l1Start;
// 2. L2 워밍업: 오늘 정각 이전 hourly 트랙 (: 00:00~12:00)
long l2Start = System.currentTimeMillis();
try {
warmUpHourlyCache(todayStart, currentHour);
} catch (Exception e) {
log.error("[CACHE-MONITOR] L2(hourly) 워밍업 실패 — 계속 진행: {}", e.getMessage());
}
long l2Elapsed = System.currentTimeMillis() - l2Start;
// 3. L3 워밍업: D-1 ~ D-N daily (기존 DailyTrackCacheManager 위임)
long l3Start = System.currentTimeMillis();
try {
dailyTrackCacheManager.warmUpCache();
} catch (Exception e) {
log.error("[CACHE-MONITOR] L3(daily) 워밍업 실패: {}", e.getMessage());
}
long l3Elapsed = System.currentTimeMillis() - l3Start;
long elapsed = System.currentTimeMillis() - totalStart;
log.info("[CACHE-MONITOR] === 전체 캐시 워밍업 완료 === totalElapsed={}ms, L1={}ms(size={}), L2={}ms(size={}), L3={}ms, thread={}",
elapsed, l1Elapsed, fiveMinTrackCache.size(), l2Elapsed, hourlyTrackCache.size(), l3Elapsed, Thread.currentThread().getName());
}
/**
* L1 워밍업: t_vessel_tracks_5min에서 현재 시간대 DB 로드
* : 12:54 기동 12:00~12:50 (10건/MMSI) 로드
*/
private void warmUpFiveMinCache(LocalDateTime hourStart, LocalDateTime now) {
String sql = "SELECT mmsi, time_bucket, " +
"public.ST_AsText(track_geom) as track_geom, " +
"distance_nm, avg_speed, max_speed, point_count " +
"FROM signal.t_vessel_tracks_5min " +
"WHERE time_bucket >= ? AND time_bucket < ?";
List<VesselTrack> tracks = loadVesselTracksFromDb(sql, hourStart, now);
if (!tracks.isEmpty()) {
long distinctMmsi = tracks.stream().map(VesselTrack::getMmsi).distinct().count();
fiveMinTrackCache.putAll(tracks);
log.info("[CACHE-MONITOR] L1 워밍업 완료: [{}, {}) → {} 건, {} MMSI, cacheSize={}",
hourStart, now, tracks.size(), distinctMmsi, fiveMinTrackCache.size());
} else {
log.info("[CACHE-MONITOR] L1 워밍업: [{}, {}) → 데이터 없음 (정상)", hourStart, now);
}
}
/**
* L2 워밍업: t_vessel_tracks_hourly에서 오늘 정각 이전 DB 로드
* : 12:54 기동 00:00~12:00 (최대 12건/MMSI) 로드
*/
private void warmUpHourlyCache(LocalDateTime todayStart, LocalDateTime currentHour) {
if (!currentHour.isAfter(todayStart)) {
log.info("[CACHE-MONITOR] L2 워밍업: 자정 직후 — 건너뜀");
return;
}
String sql = "SELECT mmsi, time_bucket, " +
"public.ST_AsText(track_geom) as track_geom, " +
"distance_nm, avg_speed, max_speed, point_count " +
"FROM signal.t_vessel_tracks_hourly " +
"WHERE time_bucket >= ? AND time_bucket < ?";
List<VesselTrack> tracks = loadVesselTracksFromDb(sql, todayStart, currentHour);
if (!tracks.isEmpty()) {
long distinctMmsi = tracks.stream().map(VesselTrack::getMmsi).distinct().count();
hourlyTrackCache.putAll(tracks);
log.info("[CACHE-MONITOR] L2 워밍업 완료: [{}, {}) → {} 건, {} MMSI, cacheSize={}",
todayStart, currentHour, tracks.size(), distinctMmsi, hourlyTrackCache.size());
} else {
log.info("[CACHE-MONITOR] L2 워밍업: [{}, {}) → 데이터 없음 (정상)",
todayStart, currentHour);
}
}
/**
* DB에서 VesselTrack 리스트 로드 (L1/L2 공용)
*/
private List<VesselTrack> loadVesselTracksFromDb(
String sql, LocalDateTime start, LocalDateTime end) {
List<VesselTrack> result = new ArrayList<>();
try (Connection conn = queryDataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setTimestamp(1, Timestamp.valueOf(start));
ps.setTimestamp(2, Timestamp.valueOf(end));
ps.setFetchSize(10000);
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
VesselTrack track = VesselTrack.builder()
.mmsi(rs.getString("mmsi"))
.timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime())
.trackGeom(rs.getString("track_geom"))
.distanceNm(BigDecimal.valueOf(rs.getDouble("distance_nm")))
.avgSpeed(BigDecimal.valueOf(rs.getDouble("avg_speed")))
.maxSpeed(BigDecimal.valueOf(rs.getDouble("max_speed")))
.pointCount(rs.getInt("point_count"))
.build();
result.add(track);
}
}
} catch (Exception e) {
log.error("DB 워밍업 쿼리 실패 [{}, {}): {}", start, end, e.getMessage());
}
return result;
}
}

파일 보기

@ -0,0 +1,154 @@
package gc.mda.signal_batch.global.util;
import gc.mda.signal_batch.batch.reader.AisTargetCacheManager;
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
import gc.mda.signal_batch.domain.vessel.model.AisTargetEntity;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.LineString;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKTReader;
import org.springframework.stereotype.Component;
import java.util.*;
/**
* VesselTrack(배치 도메인) CompactVesselTrack(조회 응답 DTO) 변환
*
* DailyTrackCacheManager.loadDay() VesselAccumulator 패턴 재사용.
* L1(5min)/L2(hourly) 캐시 데이터를 REST/WebSocket 응답 형태로 변환.
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class VesselTrackToCompactConverter {
private static final ThreadLocal<WKTReader> wktReaderLocal = ThreadLocal.withInitial(WKTReader::new);
private final AisTargetCacheManager aisTargetCacheManager;
/**
* MMSI별 VesselTrack 리스트 CompactVesselTrack 리스트 변환
*
* @param tracksByMmsi MMSI VesselTrack 리스트 (시간순 정렬 전제)
* @return CompactVesselTrack 리스트 (MMSI별 1건, geometry/timestamps/speeds 병합)
*/
public List<CompactVesselTrack> convert(Map<String, List<VesselTrack>> tracksByMmsi) {
if (tracksByMmsi == null || tracksByMmsi.isEmpty()) {
return Collections.emptyList();
}
long startMs = System.currentTimeMillis();
int inputTrackCount = tracksByMmsi.values().stream().mapToInt(List::size).sum();
// 선박 정보 일괄 조회
List<String> mmsiList = new ArrayList<>(tracksByMmsi.keySet());
Map<String, AisTargetEntity> vesselInfoMap = aisTargetCacheManager.getAll(mmsiList);
long vesselInfoMs = System.currentTimeMillis();
List<CompactVesselTrack> result = new ArrayList<>(tracksByMmsi.size());
WKTReader wktReader = wktReaderLocal.get();
for (Map.Entry<String, List<VesselTrack>> entry : tracksByMmsi.entrySet()) {
String mmsi = entry.getKey();
List<VesselTrack> tracks = entry.getValue();
CompactVesselTrack compact = convertSingleVessel(mmsi, tracks, vesselInfoMap.get(mmsi), wktReader);
if (compact != null) {
result.add(compact);
}
}
int totalPoints = result.stream().mapToInt(CompactVesselTrack::getPointCount).sum();
long elapsed = System.currentTimeMillis() - startMs;
log.info("[CACHE-MONITOR] VesselTrackToCompactConverter.convert: inputMmsi={}, inputTracks={}, outputCompact={}, totalPoints={}, vesselInfoLookup={}ms, totalElapsed={}ms",
tracksByMmsi.size(), inputTrackCount, result.size(), totalPoints, vesselInfoMs - startMs, elapsed);
return result;
}
private CompactVesselTrack convertSingleVessel(
String mmsi,
List<VesselTrack> tracks,
AisTargetEntity vesselInfo,
WKTReader wktReader) {
List<double[]> geometry = new ArrayList<>();
List<String> timestamps = new ArrayList<>();
List<Double> speeds = new ArrayList<>();
double totalDistance = 0;
double maxSpeed = 0;
for (VesselTrack track : tracks) {
String trackGeomWkt = track.getTrackGeom();
if (trackGeomWkt == null || trackGeomWkt.isEmpty() || "LINESTRING EMPTY".equals(trackGeomWkt)) {
continue;
}
try {
LineString lineString = (LineString) wktReader.read(trackGeomWkt);
if (lineString.getNumPoints() == 0) continue;
Coordinate[] coords = lineString.getCoordinates();
for (Coordinate coord : coords) {
geometry.add(new double[]{coord.x, coord.y});
if (!Double.isNaN(coord.getM())) {
timestamps.add(String.valueOf((long) coord.getM()));
} else {
timestamps.add(String.valueOf(
track.getTimeBucket().toEpochSecond(java.time.ZoneOffset.of("+09:00"))));
}
speeds.add(0.0);
}
} catch (ParseException e) {
log.debug("WKT 파싱 실패 — mmsi={}: {}", mmsi, e.getMessage());
}
if (track.getDistanceNm() != null) {
totalDistance += track.getDistanceNm().doubleValue();
}
if (track.getMaxSpeed() != null) {
maxSpeed = Math.max(maxSpeed, track.getMaxSpeed().doubleValue());
}
}
if (geometry.isEmpty()) {
return null;
}
int pointCount = geometry.size();
double avgSpeed = pointCount > 0 ? totalDistance / Math.max(1, pointCount) * 60 : 0;
// 선박 정보 설정
String shipName = null;
String shipType = null;
String shipKindCode = null;
if (vesselInfo != null) {
shipName = vesselInfo.getName();
shipType = vesselInfo.getVesselType();
shipKindCode = SignalKindCode.resolve(vesselInfo.getVesselType(), vesselInfo.getExtraInfo()).getCode();
} else {
shipKindCode = SignalKindCode.resolve(null, null).getCode();
}
String nationalCode = mmsi.length() >= 3 ? mmsi.substring(0, 3) : mmsi;
return CompactVesselTrack.builder()
.vesselId(mmsi)
.nationalCode(nationalCode)
.shipName(shipName)
.shipType(shipType)
.shipKindCode(shipKindCode)
.geometry(geometry)
.timestamps(timestamps)
.speeds(speeds)
.totalDistance(totalDistance)
.avgSpeed(avgSpeed)
.maxSpeed(maxSpeed)
.pointCount(pointCount)
.build();
}
}

파일 보기

@ -11,9 +11,6 @@ import org.locationtech.jts.index.strtree.STRtree;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKTReader;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.sql.DataSource;
@ -100,20 +97,29 @@ public class DailyTrackCacheManager {
public static class SplitQueryResult {
private final List<LocalDate> cachedDates; // 캐시에서 가져올 날짜
private final List<DateRange> dbRanges; // DB 조회 필요 범위 (연속 날짜 묶음)
private final DateRange todayRange; // 오늘 구간 (hourly/5min)
private final DateRange todayRange; // 오늘 구간 (fallback용 유지)
private final DateRange hourlyRange; // 오늘 정각 이전 구간 (L2 캐시)
private final DateRange fiveMinRange; // 현재 시각대 구간 (L1 캐시)
public SplitQueryResult(List<LocalDate> cachedDates, List<DateRange> dbRanges, DateRange todayRange) {
public SplitQueryResult(List<LocalDate> cachedDates, List<DateRange> dbRanges,
DateRange todayRange, DateRange hourlyRange, DateRange fiveMinRange) {
this.cachedDates = cachedDates;
this.dbRanges = dbRanges;
this.todayRange = todayRange;
this.hourlyRange = hourlyRange;
this.fiveMinRange = fiveMinRange;
}
public List<LocalDate> getCachedDates() { return cachedDates; }
public List<DateRange> getDbRanges() { return dbRanges; }
public DateRange getTodayRange() { return todayRange; }
public DateRange getHourlyRange() { return hourlyRange; }
public DateRange getFiveMinRange() { return fiveMinRange; }
public boolean hasCachedData() { return !cachedDates.isEmpty(); }
public boolean hasDbRanges() { return !dbRanges.isEmpty(); }
public boolean hasTodayRange() { return todayRange != null; }
public boolean hasHourlyRange() { return hourlyRange != null; }
public boolean hasFiveMinRange() { return fiveMinRange != null; }
}
public static class DateRange {
@ -129,29 +135,11 @@ public class DailyTrackCacheManager {
public LocalDateTime getEnd() { return end; }
}
// 비동기 캐시 워밍업
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
if (!cacheProperties.isEnabled()) {
status.set(CacheStatus.DISABLED);
log.info("Daily track cache is disabled");
return;
}
if (cacheProperties.isWarmupAsync()) {
warmUpCacheAsync();
} else {
warmUpCache();
}
}
@Async("trackStreamingExecutor")
public void warmUpCacheAsync() {
warmUpCache();
}
// 캐시 워밍업 (CacheWarmupService에서 호출)
/**
* 캐시 워밍업: D-1 D-2 ... D-N 순서로 최근 우선 로드
* CacheWarmupService.warmUpAllCachesAsync()에서 L3 단계로 호출됨.
*/
public void warmUpCache() {
if (!cacheProperties.isEnabled()) {
@ -441,6 +429,8 @@ public class DailyTrackCacheManager {
List<LocalDate> cachedDates = new ArrayList<>();
List<LocalDate> dbDates = new ArrayList<>();
DateRange todayRange = null;
DateRange hourlyRange = null;
DateRange fiveMinRange = null;
// 요청 범위의 날짜별 분류
LocalDate startDate = startTime.toLocalDate();
@ -448,12 +438,25 @@ public class DailyTrackCacheManager {
for (LocalDate d = startDate; !d.isAfter(endDate); d = d.plusDays(1)) {
if (d.equals(today)) {
// 오늘 hourly/5min 테이블 조회
LocalDateTime todayStart = today.atStartOfDay();
LocalDateTime todayEnd = endTime.isAfter(LocalDateTime.now()) ? LocalDateTime.now() : endTime;
if (todayStart.isBefore(startTime)) todayStart = startTime;
// 현재 시각의 정각
LocalDateTime currentHour = LocalDateTime.now().withMinute(0).withSecond(0).withNano(0);
if (todayEnd.isAfter(todayStart)) {
todayRange = new DateRange(todayStart, todayEnd);
// hourlyRange: todayStart ~ currentHour (정각 이전, hourly 캐시에 있음)
if (currentHour.isAfter(todayStart)) {
hourlyRange = new DateRange(todayStart, currentHour);
}
// fiveMinRange: currentHour ~ todayEnd (현재 시각대, 5min 캐시에 있음)
if (todayEnd.isAfter(currentHour)) {
fiveMinRange = new DateRange(currentHour, todayEnd);
}
}
} else if (d.isAfter(today)) {
// 미래 날짜 무시
@ -468,7 +471,15 @@ public class DailyTrackCacheManager {
// DB 조회 필요 날짜를 연속 범위로 묶기
List<DateRange> dbRanges = mergeConsecutiveDates(dbDates, startTime, endTime);
return new SplitQueryResult(cachedDates, dbRanges, todayRange);
log.info("[CACHE-MONITOR] splitQueryRange [{}, {}): cachedDays={}, dbRanges={}, hourlyRange={}, fiveMinRange={}, todayRange={}",
startTime, endTime,
cachedDates.size(),
dbRanges.size(),
hourlyRange != null ? hourlyRange.getStart() + "~" + hourlyRange.getEnd() : "null",
fiveMinRange != null ? fiveMinRange.getStart() + "~" + fiveMinRange.getEnd() : "null",
todayRange != null ? todayRange.getStart() + "~" + todayRange.getEnd() : "null");
return new SplitQueryResult(cachedDates, dbRanges, todayRange, hourlyRange, fiveMinRange);
}
/**

파일 보기

@ -49,11 +49,11 @@ vessel:
batch:
scheduler:
enabled: true
chunk-size: 1000
chunk-size: 10000
partition-size: 4
fetch-size: 50000
bulk-insert:
batch-size: 1000
batch-size: 10000
parallel-threads: 2
# ── AIS API 수집 설정 ──

파일 보기

@ -273,6 +273,14 @@ app:
ais-target:
ttl-minutes: 120
five-min-track:
ttl-minutes: 75
max-size: 500000
hourly-track:
ttl-hours: 26
max-size: 780000
# 일일 항적 데이터 인메모리 캐시
cache:
daily-track:

파일 보기

@ -272,6 +272,14 @@ app:
ttl-minutes: 120 # 기본 TTL (프로파일별 오버라이드)
max-size: 300000 # 최대 캐시 크기 (30만 건)
five-min-track:
ttl-minutes: 75 # TTL 75분 (1시간 + 15분 여유)
max-size: 500000 # 30K MMSI × 15 버킷
hourly-track:
ttl-hours: 26 # TTL 26시간 (24시간 + 2시간 여유)
max-size: 780000 # 30K MMSI × 26시간
chnprmship:
mmsi-resource-path: classpath:chnprmship-mmsi.txt
ttl-days: 2