feat: 다계층 인메모리 캐시(L1/L2/L3) 조회 통합 + CACHE-MONITOR 로그 #6
@ -1,6 +1,7 @@
|
||||
package gc.mda.signal_batch.batch.job;
|
||||
|
||||
import gc.mda.signal_batch.batch.listener.JobCompletionListener;
|
||||
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
|
||||
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -17,9 +18,11 @@ import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Slf4j
|
||||
@Configuration
|
||||
@Profile("!query") // query 프로파일에서는 배치 작업 비활성화
|
||||
@Profile("!query")
|
||||
@RequiredArgsConstructor
|
||||
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class DailyAggregationJobConfig {
|
||||
@ -28,6 +31,7 @@ public class DailyAggregationJobConfig {
|
||||
private final DailyAggregationStepConfig dailyAggregationStepConfig;
|
||||
private final JobCompletionListener jobCompletionListener;
|
||||
private final DailyTrackCacheManager dailyTrackCacheManager;
|
||||
private final HourlyTrackCache hourlyTrackCache;
|
||||
|
||||
@Bean
|
||||
public Job dailyAggregationJob() {
|
||||
@ -53,14 +57,26 @@ public class DailyAggregationJobConfig {
|
||||
@Override
|
||||
public void afterJob(JobExecution jobExecution) {
|
||||
if (jobExecution.getStatus().isUnsuccessful()) {
|
||||
log.warn("Daily aggregation job failed, skipping cache refresh");
|
||||
log.warn("[CACHE-MONITOR] DailyJob FAILED — L2/L3 cleanup 건너뜀, status={}",
|
||||
jobExecution.getStatus());
|
||||
return;
|
||||
}
|
||||
try {
|
||||
log.info("Daily aggregation job completed, refreshing daily track cache");
|
||||
log.info("[CACHE-MONITOR] DailyJob 완료 → L3 refresh 시작, L2 size={}",
|
||||
hourlyTrackCache.size());
|
||||
dailyTrackCacheManager.refreshAfterDailyJob();
|
||||
|
||||
// hourly 캐시에서 어제 범위 제거
|
||||
String startTime = jobExecution.getJobParameters().getString("startTime");
|
||||
String endTime = jobExecution.getJobParameters().getString("endTime");
|
||||
LocalDateTime start = LocalDateTime.parse(startTime);
|
||||
LocalDateTime end = LocalDateTime.parse(endTime);
|
||||
long l2Before = hourlyTrackCache.size();
|
||||
hourlyTrackCache.removeRange(start, end);
|
||||
log.info("[CACHE-MONITOR] DailyJob → L2 cleanup [{}, {}): L2 before={}, after={}, L2 stats=[{}]",
|
||||
start, end, l2Before, hourlyTrackCache.size(), hourlyTrackCache.getStats());
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to refresh daily track cache after job: {}", e.getMessage());
|
||||
log.error("[CACHE-MONITOR] DailyJob 캐시 갱신/정리 실패: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@ -1,9 +1,12 @@
|
||||
package gc.mda.signal_batch.batch.job;
|
||||
|
||||
import gc.mda.signal_batch.batch.listener.JobCompletionListener;
|
||||
import gc.mda.signal_batch.batch.reader.FiveMinTrackCache;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.JobExecution;
|
||||
import org.springframework.batch.core.JobExecutionListener;
|
||||
import org.springframework.batch.core.JobParametersValidator;
|
||||
import org.springframework.batch.core.job.DefaultJobParametersValidator;
|
||||
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||
@ -14,31 +17,61 @@ import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Slf4j
|
||||
@Configuration
|
||||
@Profile("!query") // query 프로파일에서는 배치 작업 비활성화
|
||||
@Profile("!query")
|
||||
@RequiredArgsConstructor
|
||||
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class HourlyAggregationJobConfig {
|
||||
|
||||
|
||||
private final JobRepository jobRepository;
|
||||
private final HourlyAggregationStepConfig hourlyAggregationStepConfig;
|
||||
private final VesselStaticStepConfig vesselStaticStepConfig;
|
||||
private final JobCompletionListener jobCompletionListener;
|
||||
|
||||
private final FiveMinTrackCache fiveMinTrackCache;
|
||||
|
||||
@Bean
|
||||
public Job hourlyAggregationJob() {
|
||||
return new JobBuilder("hourlyAggregationJob", jobRepository)
|
||||
.incrementer(new RunIdIncrementer())
|
||||
.validator(hourlyJobParametersValidator())
|
||||
.listener(jobCompletionListener)
|
||||
.listener(hourlyFiveMinCleanupListener())
|
||||
.start(hourlyAggregationStepConfig.mergeHourlyTracksStep())
|
||||
.next(hourlyAggregationStepConfig.gridHourlySummaryStep())
|
||||
.next(hourlyAggregationStepConfig.areaHourlySummaryStep())
|
||||
.next(vesselStaticStepConfig.vesselStaticSyncStep())
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public JobExecutionListener hourlyFiveMinCleanupListener() {
|
||||
return new JobExecutionListener() {
|
||||
@Override
|
||||
public void afterJob(JobExecution jobExecution) {
|
||||
if (jobExecution.getStatus().isUnsuccessful()) {
|
||||
log.info("[CACHE-MONITOR] HourlyJob FAILED — L1 cleanup 건너뜀, status={}",
|
||||
jobExecution.getStatus());
|
||||
return;
|
||||
}
|
||||
try {
|
||||
String startTime = jobExecution.getJobParameters().getString("startTime");
|
||||
String endTime = jobExecution.getJobParameters().getString("endTime");
|
||||
LocalDateTime start = LocalDateTime.parse(startTime);
|
||||
LocalDateTime end = LocalDateTime.parse(endTime);
|
||||
long l1Before = fiveMinTrackCache.size();
|
||||
fiveMinTrackCache.removeRange(start, end);
|
||||
log.info("[CACHE-MONITOR] HourlyJob 완료 → L1 cleanup [{}, {}): L1 before={}, after={}, L1 stats=[{}]",
|
||||
start, end, l1Before, fiveMinTrackCache.size(), fiveMinTrackCache.getStats());
|
||||
} catch (Exception e) {
|
||||
log.error("[CACHE-MONITOR] L1 cleanup 실패: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
public JobParametersValidator hourlyJobParametersValidator() {
|
||||
DefaultJobParametersValidator validator = new DefaultJobParametersValidator();
|
||||
|
||||
@ -6,6 +6,7 @@ import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetecti
|
||||
import gc.mda.signal_batch.batch.processor.HourlyTrackMergeProcessor;
|
||||
import gc.mda.signal_batch.batch.reader.CacheBasedHourlyTrackReader;
|
||||
import gc.mda.signal_batch.batch.reader.FiveMinTrackCache;
|
||||
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
|
||||
import gc.mda.signal_batch.batch.writer.VesselTrackBulkWriter;
|
||||
import gc.mda.signal_batch.batch.writer.AbnormalTrackWriter;
|
||||
import gc.mda.signal_batch.batch.writer.CompositeTrackWriter;
|
||||
@ -44,6 +45,7 @@ public class HourlyAggregationStepConfig {
|
||||
private final AbnormalTrackWriter abnormalTrackWriter;
|
||||
private final AbnormalTrackDetector abnormalTrackDetector;
|
||||
private final FiveMinTrackCache fiveMinTrackCache;
|
||||
private final HourlyTrackCache hourlyTrackCache;
|
||||
|
||||
public HourlyAggregationStepConfig(
|
||||
JobRepository jobRepository,
|
||||
@ -52,7 +54,8 @@ public class HourlyAggregationStepConfig {
|
||||
VesselTrackBulkWriter vesselTrackBulkWriter,
|
||||
AbnormalTrackWriter abnormalTrackWriter,
|
||||
AbnormalTrackDetector abnormalTrackDetector,
|
||||
FiveMinTrackCache fiveMinTrackCache) {
|
||||
FiveMinTrackCache fiveMinTrackCache,
|
||||
HourlyTrackCache hourlyTrackCache) {
|
||||
this.jobRepository = jobRepository;
|
||||
this.queryDataSource = queryDataSource;
|
||||
this.transactionManager = transactionManager;
|
||||
@ -60,6 +63,7 @@ public class HourlyAggregationStepConfig {
|
||||
this.abnormalTrackWriter = abnormalTrackWriter;
|
||||
this.abnormalTrackDetector = abnormalTrackDetector;
|
||||
this.fiveMinTrackCache = fiveMinTrackCache;
|
||||
this.hourlyTrackCache = hourlyTrackCache;
|
||||
}
|
||||
|
||||
@Value("${vessel.batch.chunk-size:5000}")
|
||||
@ -72,11 +76,13 @@ public class HourlyAggregationStepConfig {
|
||||
@Bean
|
||||
public Step mergeHourlyTracksStep() {
|
||||
log.info("Building mergeHourlyTracksStep with cache-based in-memory merge");
|
||||
HourlyTrackMergeProcessor processor = hourlyTrackMergeProcessor(null);
|
||||
return new StepBuilder("mergeHourlyTracksStep", jobRepository)
|
||||
.<List<VesselTrack>, AbnormalDetectionResult>chunk(chunkSize, transactionManager)
|
||||
.reader(cacheBasedHourlyTrackReader(null, null))
|
||||
.processor(hourlyTrackMergeProcessor(null))
|
||||
.processor(processor)
|
||||
.writer(hourlyCompositeTrackWriter())
|
||||
.listener(processor)
|
||||
.build();
|
||||
}
|
||||
|
||||
@ -115,7 +121,8 @@ public class HourlyAggregationStepConfig {
|
||||
return new CompositeTrackWriter(
|
||||
vesselTrackBulkWriter,
|
||||
abnormalTrackWriter,
|
||||
"hourly"
|
||||
"hourly",
|
||||
hourlyTrackCache
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@ -5,6 +5,9 @@ import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
|
||||
import gc.mda.signal_batch.global.util.LineStringMUtils;
|
||||
import gc.mda.signal_batch.global.util.TrackSimplificationUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.ExitStatus;
|
||||
import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.StepExecutionListener;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
@ -29,7 +32,8 @@ import java.util.regex.Pattern;
|
||||
* N+1 SQL 제거 → DB 쿼리 최대 1회 (비정상 검출용 이전 버킷 prefetch)
|
||||
*/
|
||||
@Slf4j
|
||||
public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack>, AbnormalDetectionResult> {
|
||||
public class HourlyTrackMergeProcessor
|
||||
implements ItemProcessor<List<VesselTrack>, AbnormalDetectionResult>, StepExecutionListener {
|
||||
|
||||
private static final Pattern WKT_COORDS_PATTERN = Pattern.compile("LINESTRING M\\((.+)\\)");
|
||||
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
@ -42,6 +46,13 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
|
||||
private Map<String, VesselTrack> previousBucketCache;
|
||||
private boolean previousBucketLoaded = false;
|
||||
|
||||
// Step 레벨 집계 카운터
|
||||
private int totalProcessed = 0;
|
||||
private int mergeFailCount = 0;
|
||||
private int simplifiedCount = 0;
|
||||
private int abnormalCount = 0;
|
||||
private int avgSpeedFailCount = 0;
|
||||
|
||||
public HourlyTrackMergeProcessor(
|
||||
AbnormalTrackDetector abnormalTrackDetector,
|
||||
JdbcTemplate queryJdbcTemplate,
|
||||
@ -58,11 +69,12 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
|
||||
}
|
||||
|
||||
String mmsi = fiveMinTracks.get(0).getMmsi();
|
||||
totalProcessed++;
|
||||
|
||||
// Step 1: WKT 좌표 병합
|
||||
String mergedWkt = mergeTrackGeometries(fiveMinTracks);
|
||||
if (mergedWkt == null) {
|
||||
log.debug("병합 실패 (유효한 좌표 없음): mmsi={}", mmsi);
|
||||
mergeFailCount++;
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -94,10 +106,7 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
|
||||
int simplifiedPoints = countWktPoints(simplifiedWkt);
|
||||
|
||||
if (!mergedWkt.equals(simplifiedWkt)) {
|
||||
TrackSimplificationUtils.SimplificationStats stats =
|
||||
TrackSimplificationUtils.getSimplificationStats(mergedWkt, simplifiedWkt);
|
||||
log.debug("시간별 궤적 간소화 - vessel: {}, 원본: {}포인트, 간소화: {}포인트 ({}% 감소)",
|
||||
mmsi, stats.originalPoints, stats.simplifiedPoints, (int) stats.reductionRate);
|
||||
simplifiedCount++;
|
||||
}
|
||||
|
||||
VesselTrack hourlyTrack = VesselTrack.builder()
|
||||
@ -122,8 +131,7 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
|
||||
AbnormalDetectionResult result = abnormalTrackDetector.detectBucketTransitionOnly(hourlyTrack, prevTrack);
|
||||
|
||||
if (result.hasAbnormalities()) {
|
||||
log.debug("Hourly 비정상 궤적 검출: mmsi={}, segments={}",
|
||||
mmsi, result.getAbnormalSegments().size());
|
||||
abnormalCount++;
|
||||
}
|
||||
|
||||
return result;
|
||||
@ -189,11 +197,18 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
|
||||
|
||||
return BigDecimal.valueOf(avgSpeedVal).setScale(2, RoundingMode.HALF_UP);
|
||||
} catch (Exception e) {
|
||||
log.debug("avgSpeed 계산 실패: {}", e.getMessage());
|
||||
avgSpeedFailCount++;
|
||||
return BigDecimal.ZERO;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExitStatus afterStep(StepExecution stepExecution) {
|
||||
log.debug("Hourly 병합 처리 집계 — 총: {}, 병합실패: {}, 간소화: {}, 비정상: {}, avgSpeed실패: {}",
|
||||
totalProcessed, mergeFailCount, simplifiedCount, abnormalCount, avgSpeedFailCount);
|
||||
return null;
|
||||
}
|
||||
|
||||
private int countWktPoints(String wkt) {
|
||||
if (wkt == null || !wkt.startsWith("LINESTRING M")) return 0;
|
||||
try {
|
||||
@ -222,6 +237,7 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
|
||||
""";
|
||||
|
||||
Map<String, VesselTrack> result = new HashMap<>();
|
||||
int[] parseFailCount = {0};
|
||||
|
||||
try {
|
||||
queryJdbcTemplate.query(sql,
|
||||
@ -231,17 +247,24 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
|
||||
},
|
||||
rs -> {
|
||||
String mmsi = rs.getString("mmsi");
|
||||
VesselTrack.TrackPosition endPos = parseEndPosition(rs.getString("end_position"));
|
||||
if (endPos == null && rs.getString("end_position") != null) {
|
||||
parseFailCount[0]++;
|
||||
}
|
||||
VesselTrack track = VesselTrack.builder()
|
||||
.mmsi(mmsi)
|
||||
.timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime())
|
||||
.trackGeom(rs.getString("last_segment"))
|
||||
.endPosition(parseEndPosition(rs.getString("end_position")))
|
||||
.endPosition(endPos)
|
||||
.build();
|
||||
result.put(mmsi, track);
|
||||
});
|
||||
|
||||
log.info("이전 버킷 트랙 prefetch 완료: {} 선박 (기간: {} ~ {})",
|
||||
result.size(), prevStart, prevEnd);
|
||||
if (parseFailCount[0] > 0) {
|
||||
log.debug("end_position 파싱 실패: {} 건", parseFailCount[0]);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("이전 버킷 트랙 prefetch 실패 (첫 실행일 수 있음): {}", e.getMessage());
|
||||
}
|
||||
@ -264,7 +287,6 @@ public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack
|
||||
.sog(sog != null ? new BigDecimal(sog) : null)
|
||||
.build();
|
||||
} catch (Exception e) {
|
||||
log.debug("end_position 파싱 실패: {}", json);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -130,6 +130,7 @@ public class CacheBasedHourlyTrackReader implements ItemReader<List<VesselTrack>
|
||||
""";
|
||||
|
||||
Map<String, List<VesselTrack>> result = new LinkedHashMap<>();
|
||||
int[] parseFailCount = {0};
|
||||
|
||||
queryJdbcTemplate.query(sql,
|
||||
ps -> {
|
||||
@ -139,6 +140,12 @@ public class CacheBasedHourlyTrackReader implements ItemReader<List<VesselTrack>
|
||||
},
|
||||
rs -> {
|
||||
String mmsi = rs.getString("mmsi");
|
||||
VesselTrack.TrackPosition startPos = parseTrackPosition(rs.getString("start_position"));
|
||||
VesselTrack.TrackPosition endPos = parseTrackPosition(rs.getString("end_position"));
|
||||
if ((startPos == null && rs.getString("start_position") != null)
|
||||
|| (endPos == null && rs.getString("end_position") != null)) {
|
||||
parseFailCount[0]++;
|
||||
}
|
||||
VesselTrack track = VesselTrack.builder()
|
||||
.mmsi(mmsi)
|
||||
.timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime())
|
||||
@ -147,8 +154,8 @@ public class CacheBasedHourlyTrackReader implements ItemReader<List<VesselTrack>
|
||||
.avgSpeed(rs.getBigDecimal("avg_speed"))
|
||||
.maxSpeed(rs.getBigDecimal("max_speed"))
|
||||
.pointCount(rs.getInt("point_count"))
|
||||
.startPosition(parseTrackPosition(rs.getString("start_position")))
|
||||
.endPosition(parseTrackPosition(rs.getString("end_position")))
|
||||
.startPosition(startPos)
|
||||
.endPosition(endPos)
|
||||
.build();
|
||||
|
||||
result.computeIfAbsent(mmsi, k -> new ArrayList<>()).add(track);
|
||||
@ -156,6 +163,9 @@ public class CacheBasedHourlyTrackReader implements ItemReader<List<VesselTrack>
|
||||
|
||||
log.info("DB fallback 완료: {} 선박, {} 트랙",
|
||||
result.size(), result.values().stream().mapToInt(List::size).sum());
|
||||
if (parseFailCount[0] > 0) {
|
||||
log.debug("TrackPosition 파싱 실패: {} 건", parseFailCount[0]);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -174,7 +184,6 @@ public class CacheBasedHourlyTrackReader implements ItemReader<List<VesselTrack>
|
||||
.sog(sogStr != null ? new BigDecimal(sogStr) : null)
|
||||
.build();
|
||||
} catch (Exception e) {
|
||||
log.debug("TrackPosition 파싱 실패: {}", json);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -56,9 +56,12 @@ public class FiveMinTrackCache {
|
||||
|
||||
public void putAll(List<VesselTrack> tracks) {
|
||||
if (tracks == null) return;
|
||||
long beforeSize = cache.estimatedSize();
|
||||
for (VesselTrack track : tracks) {
|
||||
put(track);
|
||||
}
|
||||
log.info("[CACHE-MONITOR] L1.putAll: input={}, cacheBefore={}, cacheAfter={}, stats=[{}]",
|
||||
tracks.size(), beforeSize, cache.estimatedSize(), getStats());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -85,9 +88,28 @@ public class FiveMinTrackCache {
|
||||
tracks.sort(Comparator.comparing(VesselTrack::getTimeBucket));
|
||||
}
|
||||
|
||||
int totalTracks = result.values().stream().mapToInt(List::size).sum();
|
||||
log.info("[CACHE-MONITOR] L1.getTracksInRange [{}, {}): mmsi={}, tracks={}, cacheTotal={}",
|
||||
start, end, result.size(), totalTracks, cache.estimatedSize());
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 지정 시간 범위의 캐시 항목 제거 (hourly merge 완료 후 호출)
|
||||
*/
|
||||
public void removeRange(LocalDateTime start, LocalDateTime end) {
|
||||
long before = cache.estimatedSize();
|
||||
cache.asMap().entrySet().removeIf(entry -> {
|
||||
VesselTrack track = entry.getValue();
|
||||
return track.getTimeBucket() != null
|
||||
&& !track.getTimeBucket().isBefore(start)
|
||||
&& track.getTimeBucket().isBefore(end);
|
||||
});
|
||||
long after = cache.estimatedSize();
|
||||
log.info("[CACHE-MONITOR] L1.removeRange [{}, {}): removed={}, before={}, after={}, stats=[{}]",
|
||||
start, end, before - after, before, after, getStats());
|
||||
}
|
||||
|
||||
public long size() {
|
||||
return cache.estimatedSize();
|
||||
}
|
||||
|
||||
@ -0,0 +1,128 @@
|
||||
package gc.mda.signal_batch.batch.reader;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Hourly VesselTrack 인메모리 캐시 (L2)
|
||||
*
|
||||
* hourly 집계 후 DB 저장과 동시에 캐시에 보관.
|
||||
* 조회 서비스(GisServiceV2)에서 오늘 정각 이전 구간을 DB 대신 캐시에서 즉시 응답.
|
||||
*
|
||||
* key: "mmsi::timeBucket" (예: "440123456::2026-02-19T09:00")
|
||||
* value: VesselTrack
|
||||
* TTL: 26시간 (24시간 + 2시간 여유, daily merge 후 removeRange로 제거)
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class HourlyTrackCache {
|
||||
|
||||
private static final DateTimeFormatter KEY_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm");
|
||||
|
||||
private Cache<String, VesselTrack> cache;
|
||||
|
||||
@Value("${app.cache.hourly-track.ttl-hours:26}")
|
||||
private long ttlHours;
|
||||
|
||||
@Value("${app.cache.hourly-track.max-size:780000}")
|
||||
private int maxSize;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
this.cache = Caffeine.newBuilder()
|
||||
.maximumSize(maxSize)
|
||||
.expireAfterWrite(ttlHours, TimeUnit.HOURS)
|
||||
.recordStats()
|
||||
.build();
|
||||
log.info("HourlyTrackCache 초기화 — TTL: {}시간, maxSize: {}", ttlHours, maxSize);
|
||||
}
|
||||
|
||||
public void put(VesselTrack track) {
|
||||
if (track == null || track.getMmsi() == null || track.getTimeBucket() == null) {
|
||||
return;
|
||||
}
|
||||
cache.put(buildKey(track.getMmsi(), track.getTimeBucket()), track);
|
||||
}
|
||||
|
||||
public void putAll(List<VesselTrack> tracks) {
|
||||
if (tracks == null) return;
|
||||
long beforeSize = cache.estimatedSize();
|
||||
for (VesselTrack track : tracks) {
|
||||
put(track);
|
||||
}
|
||||
log.info("[CACHE-MONITOR] L2.putAll: input={}, cacheBefore={}, cacheAfter={}, stats=[{}]",
|
||||
tracks.size(), beforeSize, cache.estimatedSize(), getStats());
|
||||
}
|
||||
|
||||
/**
|
||||
* 지정 시간 범위의 트랙을 MMSI별로 그루핑하여 반환
|
||||
*
|
||||
* @param start 시작 시각 (inclusive)
|
||||
* @param end 종료 시각 (exclusive)
|
||||
* @return Map<mmsi, List<VesselTrack>> (시간순 정렬)
|
||||
*/
|
||||
public Map<String, List<VesselTrack>> getTracksInRange(LocalDateTime start, LocalDateTime end) {
|
||||
Map<String, List<VesselTrack>> result = new LinkedHashMap<>();
|
||||
|
||||
for (Map.Entry<String, VesselTrack> entry : cache.asMap().entrySet()) {
|
||||
VesselTrack track = entry.getValue();
|
||||
if (track.getTimeBucket() != null
|
||||
&& !track.getTimeBucket().isBefore(start)
|
||||
&& track.getTimeBucket().isBefore(end)) {
|
||||
result.computeIfAbsent(track.getMmsi(), k -> new ArrayList<>()).add(track);
|
||||
}
|
||||
}
|
||||
|
||||
for (List<VesselTrack> tracks : result.values()) {
|
||||
tracks.sort(Comparator.comparing(VesselTrack::getTimeBucket));
|
||||
}
|
||||
|
||||
int totalTracks = result.values().stream().mapToInt(List::size).sum();
|
||||
log.info("[CACHE-MONITOR] L2.getTracksInRange [{}, {}): mmsi={}, tracks={}, cacheTotal={}",
|
||||
start, end, result.size(), totalTracks, cache.estimatedSize());
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 지정 시간 범위의 캐시 항목 제거 (daily merge 완료 후 호출)
|
||||
*/
|
||||
public void removeRange(LocalDateTime start, LocalDateTime end) {
|
||||
long before = cache.estimatedSize();
|
||||
cache.asMap().entrySet().removeIf(entry -> {
|
||||
VesselTrack track = entry.getValue();
|
||||
return track.getTimeBucket() != null
|
||||
&& !track.getTimeBucket().isBefore(start)
|
||||
&& track.getTimeBucket().isBefore(end);
|
||||
});
|
||||
long after = cache.estimatedSize();
|
||||
log.info("[CACHE-MONITOR] L2.removeRange [{}, {}): removed={}, before={}, after={}, stats=[{}]",
|
||||
start, end, before - after, before, after, getStats());
|
||||
}
|
||||
|
||||
public long size() {
|
||||
return cache.estimatedSize();
|
||||
}
|
||||
|
||||
public String getStats() {
|
||||
var stats = cache.stats();
|
||||
return String.format("size=%d, hitRate=%.1f%%, hits=%d, misses=%d",
|
||||
cache.estimatedSize(),
|
||||
stats.hitRate() * 100,
|
||||
stats.hitCount(),
|
||||
stats.missCount());
|
||||
}
|
||||
|
||||
private String buildKey(String mmsi, LocalDateTime timeBucket) {
|
||||
return mmsi + "::" + timeBucket.format(KEY_FORMATTER);
|
||||
}
|
||||
}
|
||||
@ -1,8 +1,8 @@
|
||||
package gc.mda.signal_batch.batch.writer;
|
||||
|
||||
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
|
||||
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
|
||||
import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.annotation.BeforeStep;
|
||||
@ -19,12 +19,28 @@ import java.util.List;
|
||||
*/
|
||||
@Slf4j
|
||||
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
|
||||
@RequiredArgsConstructor
|
||||
public class CompositeTrackWriter implements ItemWriter<AbnormalDetectionResult> {
|
||||
|
||||
|
||||
private final VesselTrackBulkWriter vesselTrackBulkWriter;
|
||||
private final AbnormalTrackWriter abnormalTrackWriter;
|
||||
private final String targetTable;
|
||||
private final HourlyTrackCache hourlyTrackCache; // nullable (daily writer는 미사용)
|
||||
|
||||
public CompositeTrackWriter(VesselTrackBulkWriter vesselTrackBulkWriter,
|
||||
AbnormalTrackWriter abnormalTrackWriter,
|
||||
String targetTable,
|
||||
HourlyTrackCache hourlyTrackCache) {
|
||||
this.vesselTrackBulkWriter = vesselTrackBulkWriter;
|
||||
this.abnormalTrackWriter = abnormalTrackWriter;
|
||||
this.targetTable = targetTable;
|
||||
this.hourlyTrackCache = hourlyTrackCache;
|
||||
}
|
||||
|
||||
public CompositeTrackWriter(VesselTrackBulkWriter vesselTrackBulkWriter,
|
||||
AbnormalTrackWriter abnormalTrackWriter,
|
||||
String targetTable) {
|
||||
this(vesselTrackBulkWriter, abnormalTrackWriter, targetTable, null);
|
||||
}
|
||||
|
||||
@BeforeStep
|
||||
public void beforeStep(StepExecution stepExecution) {
|
||||
@ -67,6 +83,12 @@ public class CompositeTrackWriter implements ItemWriter<AbnormalDetectionResult>
|
||||
if (!normalTracks.isEmpty()) {
|
||||
if ("hourly".equals(targetTable)) {
|
||||
vesselTrackBulkWriter.writeHourlyTracks(normalTracks);
|
||||
if (hourlyTrackCache != null) {
|
||||
long l2Before = hourlyTrackCache.size();
|
||||
hourlyTrackCache.putAll(normalTracks);
|
||||
log.info("[CACHE-MONITOR] CompositeTrackWriter → L2.putAll: tracks={}, L2 before={}, after={}",
|
||||
normalTracks.size(), l2Before, hourlyTrackCache.size());
|
||||
}
|
||||
} else if ("daily".equals(targetTable)) {
|
||||
vesselTrackBulkWriter.writeDailyTracks(normalTracks);
|
||||
} else {
|
||||
|
||||
@ -1,10 +1,14 @@
|
||||
package gc.mda.signal_batch.domain.gis.service;
|
||||
|
||||
import gc.mda.signal_batch.batch.reader.FiveMinTrackCache;
|
||||
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
|
||||
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
|
||||
import gc.mda.signal_batch.domain.vessel.dto.TrackResponse;
|
||||
import gc.mda.signal_batch.domain.vessel.dto.VesselTracksRequest;
|
||||
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
|
||||
import gc.mda.signal_batch.global.exception.QueryTimeoutException;
|
||||
import gc.mda.signal_batch.global.util.TrackConverter;
|
||||
import gc.mda.signal_batch.global.util.VesselTrackToCompactConverter;
|
||||
import gc.mda.signal_batch.global.websocket.service.ActiveQueryManager;
|
||||
import gc.mda.signal_batch.global.websocket.service.CacheTrackSimplifier;
|
||||
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
|
||||
@ -39,6 +43,9 @@ public class GisServiceV2 {
|
||||
private final DailyTrackCacheManager dailyTrackCacheManager;
|
||||
private final CacheTrackSimplifier cacheTrackSimplifier;
|
||||
private final GisService gisService;
|
||||
private final HourlyTrackCache hourlyTrackCache;
|
||||
private final FiveMinTrackCache fiveMinTrackCache;
|
||||
private final VesselTrackToCompactConverter vesselTrackToCompactConverter;
|
||||
|
||||
@Value("${rest.v2.query.timeout-seconds:30}")
|
||||
private int restQueryTimeout;
|
||||
@ -54,12 +61,18 @@ public class GisServiceV2 {
|
||||
ActiveQueryManager activeQueryManager,
|
||||
DailyTrackCacheManager dailyTrackCacheManager,
|
||||
CacheTrackSimplifier cacheTrackSimplifier,
|
||||
GisService gisService) {
|
||||
GisService gisService,
|
||||
HourlyTrackCache hourlyTrackCache,
|
||||
FiveMinTrackCache fiveMinTrackCache,
|
||||
VesselTrackToCompactConverter vesselTrackToCompactConverter) {
|
||||
this.queryDataSource = queryDataSource;
|
||||
this.activeQueryManager = activeQueryManager;
|
||||
this.dailyTrackCacheManager = dailyTrackCacheManager;
|
||||
this.cacheTrackSimplifier = cacheTrackSimplifier;
|
||||
this.gisService = gisService;
|
||||
this.hourlyTrackCache = hourlyTrackCache;
|
||||
this.fiveMinTrackCache = fiveMinTrackCache;
|
||||
this.vesselTrackToCompactConverter = vesselTrackToCompactConverter;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -332,18 +345,52 @@ public class GisServiceV2 {
|
||||
}
|
||||
}
|
||||
|
||||
// 3. 오늘 구간 DB 조회 (hourly + 5min)
|
||||
if (split.hasTodayRange()) {
|
||||
DailyTrackCacheManager.DateRange today = split.getTodayRange();
|
||||
VesselTracksRequest todayRequest = VesselTracksRequest.builder()
|
||||
.startTime(today.getStart())
|
||||
.endTime(today.getEnd())
|
||||
.vessels(request.getVessels())
|
||||
.build();
|
||||
List<CompactVesselTrack> todayTracks = gisService.getVesselTracks(todayRequest);
|
||||
allTracks.addAll(todayTracks);
|
||||
log.debug("[CacheQuery] today {} ~ {} -> {} tracks",
|
||||
today.getStart(), today.getEnd(), todayTracks.size());
|
||||
// 3-a. hourly 범위 → L2 캐시 → DB fallback
|
||||
if (split.hasHourlyRange()) {
|
||||
DailyTrackCacheManager.DateRange hr = split.getHourlyRange();
|
||||
Map<String, List<VesselTrack>> hourlyTracks =
|
||||
hourlyTrackCache.getTracksInRange(hr.getStart(), hr.getEnd());
|
||||
|
||||
if (!hourlyTracks.isEmpty()) {
|
||||
Map<String, List<VesselTrack>> filtered = filterByMmsi(hourlyTracks, requestedMmsis);
|
||||
List<CompactVesselTrack> converted = vesselTrackToCompactConverter.convert(filtered);
|
||||
allTracks.addAll(converted);
|
||||
int totalPts = converted.stream().mapToInt(CompactVesselTrack::getPointCount).sum();
|
||||
log.info("[CACHE-MONITOR] queryWithCache L2 HIT [{}, {}): cacheVessels={}, filteredVessels={}, compactTracks={}, points={}",
|
||||
hr.getStart(), hr.getEnd(), hourlyTracks.size(), filtered.size(), converted.size(), totalPts);
|
||||
} else {
|
||||
VesselTracksRequest hourlyReq = VesselTracksRequest.builder()
|
||||
.startTime(hr.getStart()).endTime(hr.getEnd())
|
||||
.vessels(request.getVessels()).build();
|
||||
List<CompactVesselTrack> dbResult = gisService.getVesselTracks(hourlyReq);
|
||||
allTracks.addAll(dbResult);
|
||||
log.info("[CACHE-MONITOR] queryWithCache L2 MISS → DB fallback [{}, {}): dbTracks={}",
|
||||
hr.getStart(), hr.getEnd(), dbResult.size());
|
||||
}
|
||||
}
|
||||
|
||||
// 3-b. 5min 범위 → L1 캐시 → DB fallback
|
||||
if (split.hasFiveMinRange()) {
|
||||
DailyTrackCacheManager.DateRange fr = split.getFiveMinRange();
|
||||
Map<String, List<VesselTrack>> fiveMinTracks =
|
||||
fiveMinTrackCache.getTracksInRange(fr.getStart(), fr.getEnd());
|
||||
|
||||
if (!fiveMinTracks.isEmpty()) {
|
||||
Map<String, List<VesselTrack>> filtered = filterByMmsi(fiveMinTracks, requestedMmsis);
|
||||
List<CompactVesselTrack> converted = vesselTrackToCompactConverter.convert(filtered);
|
||||
allTracks.addAll(converted);
|
||||
int totalPts = converted.stream().mapToInt(CompactVesselTrack::getPointCount).sum();
|
||||
log.info("[CACHE-MONITOR] queryWithCache L1 HIT [{}, {}): cacheVessels={}, filteredVessels={}, compactTracks={}, points={}",
|
||||
fr.getStart(), fr.getEnd(), fiveMinTracks.size(), filtered.size(), converted.size(), totalPts);
|
||||
} else {
|
||||
VesselTracksRequest fiveMinReq = VesselTracksRequest.builder()
|
||||
.startTime(fr.getStart()).endTime(fr.getEnd())
|
||||
.vessels(request.getVessels()).build();
|
||||
List<CompactVesselTrack> dbResult = gisService.getVesselTracks(fiveMinReq);
|
||||
allTracks.addAll(dbResult);
|
||||
log.info("[CACHE-MONITOR] queryWithCache L1 MISS → DB fallback [{}, {}): dbTracks={}",
|
||||
fr.getStart(), fr.getEnd(), dbResult.size());
|
||||
}
|
||||
}
|
||||
|
||||
// 4. 동일 선박 병합 (캐시 + DB 결과)
|
||||
@ -450,6 +497,22 @@ public class GisServiceV2 {
|
||||
return merged;
|
||||
}
|
||||
|
||||
// ── MMSI 필터링 ──
|
||||
|
||||
private Map<String, List<VesselTrack>> filterByMmsi(
|
||||
Map<String, List<VesselTrack>> tracksByMmsi, Set<String> requestedMmsis) {
|
||||
if (requestedMmsis == null || requestedMmsis.isEmpty()) {
|
||||
return tracksByMmsi;
|
||||
}
|
||||
Map<String, List<VesselTrack>> filtered = new LinkedHashMap<>();
|
||||
for (Map.Entry<String, List<VesselTrack>> entry : tracksByMmsi.entrySet()) {
|
||||
if (requestedMmsis.contains(entry.getKey())) {
|
||||
filtered.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
return filtered;
|
||||
}
|
||||
|
||||
// ── 유틸리티 메서드 ──
|
||||
|
||||
private TrackResponse mapTrackResponse(ResultSet rs, int rowNum) throws SQLException {
|
||||
|
||||
@ -33,6 +33,21 @@ public class AsyncConfig implements AsyncConfigurer {
|
||||
return executor;
|
||||
}
|
||||
|
||||
@Bean(name = "cacheWarmupExecutor")
|
||||
public Executor getCacheWarmupExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(2);
|
||||
executor.setMaxPoolSize(4);
|
||||
executor.setQueueCapacity(10);
|
||||
executor.setKeepAliveSeconds(60);
|
||||
executor.setThreadNamePrefix("cache-warmup-");
|
||||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
executor.setWaitForTasksToCompleteOnShutdown(true);
|
||||
executor.setAwaitTerminationSeconds(120);
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
|
||||
return new AsyncUncaughtExceptionHandler() {
|
||||
|
||||
@ -0,0 +1,184 @@
|
||||
package gc.mda.signal_batch.global.config;
|
||||
|
||||
import gc.mda.signal_batch.batch.reader.FiveMinTrackCache;
|
||||
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
|
||||
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
|
||||
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.*;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* L1(5min)/L2(hourly)/L3(daily) 통합 캐시 워밍업 오케스트레이터
|
||||
*
|
||||
* 기동 시 전용 cacheWarmupExecutor 스레드에서 비동기 실행.
|
||||
* 스케줄러/API 스레드와 완전 독립 — 지연/실패해도 다른 서비스 무영향.
|
||||
*
|
||||
* 기존 DailyTrackCacheManager.onApplicationReady() 이벤트 리스너를 대체.
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class CacheWarmupService {
|
||||
|
||||
private final FiveMinTrackCache fiveMinTrackCache;
|
||||
private final HourlyTrackCache hourlyTrackCache;
|
||||
private final DailyTrackCacheManager dailyTrackCacheManager;
|
||||
private final DataSource queryDataSource;
|
||||
private final DailyTrackCacheProperties cacheProperties;
|
||||
|
||||
public CacheWarmupService(
|
||||
FiveMinTrackCache fiveMinTrackCache,
|
||||
HourlyTrackCache hourlyTrackCache,
|
||||
DailyTrackCacheManager dailyTrackCacheManager,
|
||||
@Qualifier("queryDataSource") DataSource queryDataSource,
|
||||
DailyTrackCacheProperties cacheProperties) {
|
||||
this.fiveMinTrackCache = fiveMinTrackCache;
|
||||
this.hourlyTrackCache = hourlyTrackCache;
|
||||
this.dailyTrackCacheManager = dailyTrackCacheManager;
|
||||
this.queryDataSource = queryDataSource;
|
||||
this.cacheProperties = cacheProperties;
|
||||
}
|
||||
|
||||
@EventListener(ApplicationReadyEvent.class)
|
||||
public void onApplicationReady() {
|
||||
if (!cacheProperties.isEnabled()) {
|
||||
log.info("Cache warmup skipped (disabled)");
|
||||
return;
|
||||
}
|
||||
warmUpAllCachesAsync();
|
||||
}
|
||||
|
||||
@Async("cacheWarmupExecutor")
|
||||
public void warmUpAllCachesAsync() {
|
||||
long totalStart = System.currentTimeMillis();
|
||||
log.info("[CACHE-MONITOR] === 전체 캐시 워밍업 시작 === thread={}, now={}",
|
||||
Thread.currentThread().getName(), LocalDateTime.now());
|
||||
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
LocalDateTime currentHour = now.withMinute(0).withSecond(0).withNano(0);
|
||||
LocalDateTime todayStart = now.toLocalDate().atStartOfDay();
|
||||
|
||||
// 1. L1 워밍업: 현재 시간대 5min 트랙 (예: 12:00~12:50)
|
||||
long l1Start = System.currentTimeMillis();
|
||||
try {
|
||||
warmUpFiveMinCache(currentHour, now);
|
||||
} catch (Exception e) {
|
||||
log.error("[CACHE-MONITOR] L1(5min) 워밍업 실패 — 계속 진행: {}", e.getMessage());
|
||||
}
|
||||
long l1Elapsed = System.currentTimeMillis() - l1Start;
|
||||
|
||||
// 2. L2 워밍업: 오늘 정각 이전 hourly 트랙 (예: 00:00~12:00)
|
||||
long l2Start = System.currentTimeMillis();
|
||||
try {
|
||||
warmUpHourlyCache(todayStart, currentHour);
|
||||
} catch (Exception e) {
|
||||
log.error("[CACHE-MONITOR] L2(hourly) 워밍업 실패 — 계속 진행: {}", e.getMessage());
|
||||
}
|
||||
long l2Elapsed = System.currentTimeMillis() - l2Start;
|
||||
|
||||
// 3. L3 워밍업: D-1 ~ D-N daily (기존 DailyTrackCacheManager 위임)
|
||||
long l3Start = System.currentTimeMillis();
|
||||
try {
|
||||
dailyTrackCacheManager.warmUpCache();
|
||||
} catch (Exception e) {
|
||||
log.error("[CACHE-MONITOR] L3(daily) 워밍업 실패: {}", e.getMessage());
|
||||
}
|
||||
long l3Elapsed = System.currentTimeMillis() - l3Start;
|
||||
|
||||
long elapsed = System.currentTimeMillis() - totalStart;
|
||||
log.info("[CACHE-MONITOR] === 전체 캐시 워밍업 완료 === totalElapsed={}ms, L1={}ms(size={}), L2={}ms(size={}), L3={}ms, thread={}",
|
||||
elapsed, l1Elapsed, fiveMinTrackCache.size(), l2Elapsed, hourlyTrackCache.size(), l3Elapsed, Thread.currentThread().getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* L1 워밍업: t_vessel_tracks_5min에서 현재 시간대 DB 로드
|
||||
* 예: 12:54 기동 → 12:00~12:50 (10건/MMSI) 로드
|
||||
*/
|
||||
private void warmUpFiveMinCache(LocalDateTime hourStart, LocalDateTime now) {
|
||||
String sql = "SELECT mmsi, time_bucket, " +
|
||||
"public.ST_AsText(track_geom) as track_geom, " +
|
||||
"distance_nm, avg_speed, max_speed, point_count " +
|
||||
"FROM signal.t_vessel_tracks_5min " +
|
||||
"WHERE time_bucket >= ? AND time_bucket < ?";
|
||||
|
||||
List<VesselTrack> tracks = loadVesselTracksFromDb(sql, hourStart, now);
|
||||
if (!tracks.isEmpty()) {
|
||||
long distinctMmsi = tracks.stream().map(VesselTrack::getMmsi).distinct().count();
|
||||
fiveMinTrackCache.putAll(tracks);
|
||||
log.info("[CACHE-MONITOR] L1 워밍업 완료: [{}, {}) → {} 건, {} MMSI, cacheSize={}",
|
||||
hourStart, now, tracks.size(), distinctMmsi, fiveMinTrackCache.size());
|
||||
} else {
|
||||
log.info("[CACHE-MONITOR] L1 워밍업: [{}, {}) → 데이터 없음 (정상)", hourStart, now);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* L2 워밍업: t_vessel_tracks_hourly에서 오늘 정각 이전 DB 로드
|
||||
* 예: 12:54 기동 → 00:00~12:00 (최대 12건/MMSI) 로드
|
||||
*/
|
||||
private void warmUpHourlyCache(LocalDateTime todayStart, LocalDateTime currentHour) {
|
||||
if (!currentHour.isAfter(todayStart)) {
|
||||
log.info("[CACHE-MONITOR] L2 워밍업: 자정 직후 — 건너뜀");
|
||||
return;
|
||||
}
|
||||
|
||||
String sql = "SELECT mmsi, time_bucket, " +
|
||||
"public.ST_AsText(track_geom) as track_geom, " +
|
||||
"distance_nm, avg_speed, max_speed, point_count " +
|
||||
"FROM signal.t_vessel_tracks_hourly " +
|
||||
"WHERE time_bucket >= ? AND time_bucket < ?";
|
||||
|
||||
List<VesselTrack> tracks = loadVesselTracksFromDb(sql, todayStart, currentHour);
|
||||
if (!tracks.isEmpty()) {
|
||||
long distinctMmsi = tracks.stream().map(VesselTrack::getMmsi).distinct().count();
|
||||
hourlyTrackCache.putAll(tracks);
|
||||
log.info("[CACHE-MONITOR] L2 워밍업 완료: [{}, {}) → {} 건, {} MMSI, cacheSize={}",
|
||||
todayStart, currentHour, tracks.size(), distinctMmsi, hourlyTrackCache.size());
|
||||
} else {
|
||||
log.info("[CACHE-MONITOR] L2 워밍업: [{}, {}) → 데이터 없음 (정상)",
|
||||
todayStart, currentHour);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* DB에서 VesselTrack 리스트 로드 (L1/L2 공용)
|
||||
*/
|
||||
private List<VesselTrack> loadVesselTracksFromDb(
|
||||
String sql, LocalDateTime start, LocalDateTime end) {
|
||||
List<VesselTrack> result = new ArrayList<>();
|
||||
try (Connection conn = queryDataSource.getConnection();
|
||||
PreparedStatement ps = conn.prepareStatement(sql)) {
|
||||
ps.setTimestamp(1, Timestamp.valueOf(start));
|
||||
ps.setTimestamp(2, Timestamp.valueOf(end));
|
||||
ps.setFetchSize(10000);
|
||||
|
||||
try (ResultSet rs = ps.executeQuery()) {
|
||||
while (rs.next()) {
|
||||
VesselTrack track = VesselTrack.builder()
|
||||
.mmsi(rs.getString("mmsi"))
|
||||
.timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime())
|
||||
.trackGeom(rs.getString("track_geom"))
|
||||
.distanceNm(BigDecimal.valueOf(rs.getDouble("distance_nm")))
|
||||
.avgSpeed(BigDecimal.valueOf(rs.getDouble("avg_speed")))
|
||||
.maxSpeed(BigDecimal.valueOf(rs.getDouble("max_speed")))
|
||||
.pointCount(rs.getInt("point_count"))
|
||||
.build();
|
||||
result.add(track);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("DB 워밍업 쿼리 실패 [{}, {}): {}", start, end, e.getMessage());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,154 @@
|
||||
package gc.mda.signal_batch.global.util;
|
||||
|
||||
import gc.mda.signal_batch.batch.reader.AisTargetCacheManager;
|
||||
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
|
||||
import gc.mda.signal_batch.domain.vessel.model.AisTargetEntity;
|
||||
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.locationtech.jts.geom.Coordinate;
|
||||
import org.locationtech.jts.geom.LineString;
|
||||
import org.locationtech.jts.io.ParseException;
|
||||
import org.locationtech.jts.io.WKTReader;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* VesselTrack(배치 도메인) → CompactVesselTrack(조회 응답 DTO) 변환
|
||||
*
|
||||
* DailyTrackCacheManager.loadDay()의 VesselAccumulator 패턴 재사용.
|
||||
* L1(5min)/L2(hourly) 캐시 데이터를 REST/WebSocket 응답 형태로 변환.
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class VesselTrackToCompactConverter {
|
||||
|
||||
private static final ThreadLocal<WKTReader> wktReaderLocal = ThreadLocal.withInitial(WKTReader::new);
|
||||
|
||||
private final AisTargetCacheManager aisTargetCacheManager;
|
||||
|
||||
/**
|
||||
* MMSI별 VesselTrack 리스트 → CompactVesselTrack 리스트 변환
|
||||
*
|
||||
* @param tracksByMmsi MMSI → VesselTrack 리스트 (시간순 정렬 전제)
|
||||
* @return CompactVesselTrack 리스트 (MMSI별 1건, geometry/timestamps/speeds 병합)
|
||||
*/
|
||||
public List<CompactVesselTrack> convert(Map<String, List<VesselTrack>> tracksByMmsi) {
|
||||
if (tracksByMmsi == null || tracksByMmsi.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
long startMs = System.currentTimeMillis();
|
||||
int inputTrackCount = tracksByMmsi.values().stream().mapToInt(List::size).sum();
|
||||
|
||||
// 선박 정보 일괄 조회
|
||||
List<String> mmsiList = new ArrayList<>(tracksByMmsi.keySet());
|
||||
Map<String, AisTargetEntity> vesselInfoMap = aisTargetCacheManager.getAll(mmsiList);
|
||||
|
||||
long vesselInfoMs = System.currentTimeMillis();
|
||||
|
||||
List<CompactVesselTrack> result = new ArrayList<>(tracksByMmsi.size());
|
||||
WKTReader wktReader = wktReaderLocal.get();
|
||||
|
||||
for (Map.Entry<String, List<VesselTrack>> entry : tracksByMmsi.entrySet()) {
|
||||
String mmsi = entry.getKey();
|
||||
List<VesselTrack> tracks = entry.getValue();
|
||||
|
||||
CompactVesselTrack compact = convertSingleVessel(mmsi, tracks, vesselInfoMap.get(mmsi), wktReader);
|
||||
if (compact != null) {
|
||||
result.add(compact);
|
||||
}
|
||||
}
|
||||
|
||||
int totalPoints = result.stream().mapToInt(CompactVesselTrack::getPointCount).sum();
|
||||
long elapsed = System.currentTimeMillis() - startMs;
|
||||
log.info("[CACHE-MONITOR] VesselTrackToCompactConverter.convert: inputMmsi={}, inputTracks={}, outputCompact={}, totalPoints={}, vesselInfoLookup={}ms, totalElapsed={}ms",
|
||||
tracksByMmsi.size(), inputTrackCount, result.size(), totalPoints, vesselInfoMs - startMs, elapsed);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private CompactVesselTrack convertSingleVessel(
|
||||
String mmsi,
|
||||
List<VesselTrack> tracks,
|
||||
AisTargetEntity vesselInfo,
|
||||
WKTReader wktReader) {
|
||||
|
||||
List<double[]> geometry = new ArrayList<>();
|
||||
List<String> timestamps = new ArrayList<>();
|
||||
List<Double> speeds = new ArrayList<>();
|
||||
double totalDistance = 0;
|
||||
double maxSpeed = 0;
|
||||
|
||||
for (VesselTrack track : tracks) {
|
||||
String trackGeomWkt = track.getTrackGeom();
|
||||
if (trackGeomWkt == null || trackGeomWkt.isEmpty() || "LINESTRING EMPTY".equals(trackGeomWkt)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
LineString lineString = (LineString) wktReader.read(trackGeomWkt);
|
||||
if (lineString.getNumPoints() == 0) continue;
|
||||
|
||||
Coordinate[] coords = lineString.getCoordinates();
|
||||
for (Coordinate coord : coords) {
|
||||
geometry.add(new double[]{coord.x, coord.y});
|
||||
if (!Double.isNaN(coord.getM())) {
|
||||
timestamps.add(String.valueOf((long) coord.getM()));
|
||||
} else {
|
||||
timestamps.add(String.valueOf(
|
||||
track.getTimeBucket().toEpochSecond(java.time.ZoneOffset.of("+09:00"))));
|
||||
}
|
||||
speeds.add(0.0);
|
||||
}
|
||||
} catch (ParseException e) {
|
||||
log.debug("WKT 파싱 실패 — mmsi={}: {}", mmsi, e.getMessage());
|
||||
}
|
||||
|
||||
if (track.getDistanceNm() != null) {
|
||||
totalDistance += track.getDistanceNm().doubleValue();
|
||||
}
|
||||
if (track.getMaxSpeed() != null) {
|
||||
maxSpeed = Math.max(maxSpeed, track.getMaxSpeed().doubleValue());
|
||||
}
|
||||
}
|
||||
|
||||
if (geometry.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
int pointCount = geometry.size();
|
||||
double avgSpeed = pointCount > 0 ? totalDistance / Math.max(1, pointCount) * 60 : 0;
|
||||
|
||||
// 선박 정보 설정
|
||||
String shipName = null;
|
||||
String shipType = null;
|
||||
String shipKindCode = null;
|
||||
if (vesselInfo != null) {
|
||||
shipName = vesselInfo.getName();
|
||||
shipType = vesselInfo.getVesselType();
|
||||
shipKindCode = SignalKindCode.resolve(vesselInfo.getVesselType(), vesselInfo.getExtraInfo()).getCode();
|
||||
} else {
|
||||
shipKindCode = SignalKindCode.resolve(null, null).getCode();
|
||||
}
|
||||
|
||||
String nationalCode = mmsi.length() >= 3 ? mmsi.substring(0, 3) : mmsi;
|
||||
|
||||
return CompactVesselTrack.builder()
|
||||
.vesselId(mmsi)
|
||||
.nationalCode(nationalCode)
|
||||
.shipName(shipName)
|
||||
.shipType(shipType)
|
||||
.shipKindCode(shipKindCode)
|
||||
.geometry(geometry)
|
||||
.timestamps(timestamps)
|
||||
.speeds(speeds)
|
||||
.totalDistance(totalDistance)
|
||||
.avgSpeed(avgSpeed)
|
||||
.maxSpeed(maxSpeed)
|
||||
.pointCount(pointCount)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@ -11,9 +11,6 @@ import org.locationtech.jts.index.strtree.STRtree;
|
||||
import org.locationtech.jts.io.ParseException;
|
||||
import org.locationtech.jts.io.WKTReader;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
@ -100,20 +97,29 @@ public class DailyTrackCacheManager {
|
||||
public static class SplitQueryResult {
|
||||
private final List<LocalDate> cachedDates; // 캐시에서 가져올 날짜
|
||||
private final List<DateRange> dbRanges; // DB 조회 필요 범위 (연속 날짜 묶음)
|
||||
private final DateRange todayRange; // 오늘 구간 (hourly/5min)
|
||||
private final DateRange todayRange; // 오늘 구간 (fallback용 유지)
|
||||
private final DateRange hourlyRange; // 오늘 정각 이전 구간 (L2 캐시)
|
||||
private final DateRange fiveMinRange; // 현재 시각대 구간 (L1 캐시)
|
||||
|
||||
public SplitQueryResult(List<LocalDate> cachedDates, List<DateRange> dbRanges, DateRange todayRange) {
|
||||
public SplitQueryResult(List<LocalDate> cachedDates, List<DateRange> dbRanges,
|
||||
DateRange todayRange, DateRange hourlyRange, DateRange fiveMinRange) {
|
||||
this.cachedDates = cachedDates;
|
||||
this.dbRanges = dbRanges;
|
||||
this.todayRange = todayRange;
|
||||
this.hourlyRange = hourlyRange;
|
||||
this.fiveMinRange = fiveMinRange;
|
||||
}
|
||||
|
||||
public List<LocalDate> getCachedDates() { return cachedDates; }
|
||||
public List<DateRange> getDbRanges() { return dbRanges; }
|
||||
public DateRange getTodayRange() { return todayRange; }
|
||||
public DateRange getHourlyRange() { return hourlyRange; }
|
||||
public DateRange getFiveMinRange() { return fiveMinRange; }
|
||||
public boolean hasCachedData() { return !cachedDates.isEmpty(); }
|
||||
public boolean hasDbRanges() { return !dbRanges.isEmpty(); }
|
||||
public boolean hasTodayRange() { return todayRange != null; }
|
||||
public boolean hasHourlyRange() { return hourlyRange != null; }
|
||||
public boolean hasFiveMinRange() { return fiveMinRange != null; }
|
||||
}
|
||||
|
||||
public static class DateRange {
|
||||
@ -129,29 +135,11 @@ public class DailyTrackCacheManager {
|
||||
public LocalDateTime getEnd() { return end; }
|
||||
}
|
||||
|
||||
// ── 비동기 캐시 워밍업 ──
|
||||
|
||||
@EventListener(ApplicationReadyEvent.class)
|
||||
public void onApplicationReady() {
|
||||
if (!cacheProperties.isEnabled()) {
|
||||
status.set(CacheStatus.DISABLED);
|
||||
log.info("Daily track cache is disabled");
|
||||
return;
|
||||
}
|
||||
if (cacheProperties.isWarmupAsync()) {
|
||||
warmUpCacheAsync();
|
||||
} else {
|
||||
warmUpCache();
|
||||
}
|
||||
}
|
||||
|
||||
@Async("trackStreamingExecutor")
|
||||
public void warmUpCacheAsync() {
|
||||
warmUpCache();
|
||||
}
|
||||
// ── 캐시 워밍업 (CacheWarmupService에서 호출) ──
|
||||
|
||||
/**
|
||||
* 캐시 워밍업: D-1 → D-2 → ... → D-N 순서로 최근 우선 로드
|
||||
* CacheWarmupService.warmUpAllCachesAsync()에서 L3 단계로 호출됨.
|
||||
*/
|
||||
public void warmUpCache() {
|
||||
if (!cacheProperties.isEnabled()) {
|
||||
@ -441,6 +429,8 @@ public class DailyTrackCacheManager {
|
||||
List<LocalDate> cachedDates = new ArrayList<>();
|
||||
List<LocalDate> dbDates = new ArrayList<>();
|
||||
DateRange todayRange = null;
|
||||
DateRange hourlyRange = null;
|
||||
DateRange fiveMinRange = null;
|
||||
|
||||
// 요청 범위의 날짜별 분류
|
||||
LocalDate startDate = startTime.toLocalDate();
|
||||
@ -448,12 +438,25 @@ public class DailyTrackCacheManager {
|
||||
|
||||
for (LocalDate d = startDate; !d.isAfter(endDate); d = d.plusDays(1)) {
|
||||
if (d.equals(today)) {
|
||||
// 오늘 → hourly/5min 테이블 조회
|
||||
LocalDateTime todayStart = today.atStartOfDay();
|
||||
LocalDateTime todayEnd = endTime.isAfter(LocalDateTime.now()) ? LocalDateTime.now() : endTime;
|
||||
if (todayStart.isBefore(startTime)) todayStart = startTime;
|
||||
|
||||
// 현재 시각의 정각
|
||||
LocalDateTime currentHour = LocalDateTime.now().withMinute(0).withSecond(0).withNano(0);
|
||||
|
||||
if (todayEnd.isAfter(todayStart)) {
|
||||
todayRange = new DateRange(todayStart, todayEnd);
|
||||
|
||||
// hourlyRange: todayStart ~ currentHour (정각 이전, hourly 캐시에 있음)
|
||||
if (currentHour.isAfter(todayStart)) {
|
||||
hourlyRange = new DateRange(todayStart, currentHour);
|
||||
}
|
||||
|
||||
// fiveMinRange: currentHour ~ todayEnd (현재 시각대, 5min 캐시에 있음)
|
||||
if (todayEnd.isAfter(currentHour)) {
|
||||
fiveMinRange = new DateRange(currentHour, todayEnd);
|
||||
}
|
||||
}
|
||||
} else if (d.isAfter(today)) {
|
||||
// 미래 날짜 → 무시
|
||||
@ -468,7 +471,15 @@ public class DailyTrackCacheManager {
|
||||
// DB 조회 필요 날짜를 연속 범위로 묶기
|
||||
List<DateRange> dbRanges = mergeConsecutiveDates(dbDates, startTime, endTime);
|
||||
|
||||
return new SplitQueryResult(cachedDates, dbRanges, todayRange);
|
||||
log.info("[CACHE-MONITOR] splitQueryRange [{}, {}): cachedDays={}, dbRanges={}, hourlyRange={}, fiveMinRange={}, todayRange={}",
|
||||
startTime, endTime,
|
||||
cachedDates.size(),
|
||||
dbRanges.size(),
|
||||
hourlyRange != null ? hourlyRange.getStart() + "~" + hourlyRange.getEnd() : "null",
|
||||
fiveMinRange != null ? fiveMinRange.getStart() + "~" + fiveMinRange.getEnd() : "null",
|
||||
todayRange != null ? todayRange.getStart() + "~" + todayRange.getEnd() : "null");
|
||||
|
||||
return new SplitQueryResult(cachedDates, dbRanges, todayRange, hourlyRange, fiveMinRange);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -49,11 +49,11 @@ vessel:
|
||||
batch:
|
||||
scheduler:
|
||||
enabled: true
|
||||
chunk-size: 1000
|
||||
chunk-size: 10000
|
||||
partition-size: 4
|
||||
fetch-size: 50000
|
||||
bulk-insert:
|
||||
batch-size: 1000
|
||||
batch-size: 10000
|
||||
parallel-threads: 2
|
||||
|
||||
# ── AIS API 수집 설정 ──
|
||||
|
||||
@ -273,6 +273,14 @@ app:
|
||||
ais-target:
|
||||
ttl-minutes: 120
|
||||
|
||||
five-min-track:
|
||||
ttl-minutes: 75
|
||||
max-size: 500000
|
||||
|
||||
hourly-track:
|
||||
ttl-hours: 26
|
||||
max-size: 780000
|
||||
|
||||
# 일일 항적 데이터 인메모리 캐시
|
||||
cache:
|
||||
daily-track:
|
||||
|
||||
@ -272,6 +272,14 @@ app:
|
||||
ttl-minutes: 120 # 기본 TTL (프로파일별 오버라이드)
|
||||
max-size: 300000 # 최대 캐시 크기 (30만 건)
|
||||
|
||||
five-min-track:
|
||||
ttl-minutes: 75 # TTL 75분 (1시간 + 15분 여유)
|
||||
max-size: 500000 # 30K MMSI × 15 버킷
|
||||
|
||||
hourly-track:
|
||||
ttl-hours: 26 # TTL 26시간 (24시간 + 2시간 여유)
|
||||
max-size: 780000 # 30K MMSI × 26시간
|
||||
|
||||
chnprmship:
|
||||
mmsi-resource-path: classpath:chnprmship-mmsi.txt
|
||||
ttl-days: 2
|
||||
|
||||
불러오는 중...
Reference in New Issue
Block a user