perf: Hourly Job 인메모리 병합 전환 — N+1 SQL 제거

Hourly 집계의 N+1 DB 쿼리 패턴(60K+)을 인메모리 병합으로 교체.
5분 트랙 적재 시 FiveMinTrackCache에 보관하고 hourly job에서 캐시 기반으로
좌표 병합/통계 집계/간소화를 수행하여 DB 쿼리를 0~2회로 감소.

- FiveMinTrackCache: Caffeine 캐시 (TTL 75분, maxSize 500K)
- CacheBasedHourlyTrackReader: 캐시 기반 Reader + DB fallback
- HourlyTrackMergeProcessor: Java WKT 병합 + 비정상 검출 bulk prefetch
- @ConfigurationProperties 중복 해결 (프로그래밍 방식 Hikari 바인딩)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
htlee 2026-02-19 10:11:55 +09:00
부모 2e9361ee58
커밋 82ae4d9ef5
12개의 변경된 파일, 731개의 추가 그리고 426개의 삭제

파일 보기

@ -1,14 +1,14 @@
package gc.mda.signal_batch.batch.job; package gc.mda.signal_batch.batch.job;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack; import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import gc.mda.signal_batch.batch.processor.HourlyTrackProcessor;
import gc.mda.signal_batch.batch.processor.HourlyTrackProcessorWithAbnormalDetection;
import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector; import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector;
import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult; import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult;
import gc.mda.signal_batch.batch.processor.HourlyTrackMergeProcessor;
import gc.mda.signal_batch.batch.reader.CacheBasedHourlyTrackReader;
import gc.mda.signal_batch.batch.reader.FiveMinTrackCache;
import gc.mda.signal_batch.batch.writer.VesselTrackBulkWriter; import gc.mda.signal_batch.batch.writer.VesselTrackBulkWriter;
import gc.mda.signal_batch.batch.writer.AbnormalTrackWriter; import gc.mda.signal_batch.batch.writer.AbnormalTrackWriter;
import gc.mda.signal_batch.batch.writer.CompositeTrackWriter; import gc.mda.signal_batch.batch.writer.CompositeTrackWriter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
@ -29,12 +29,11 @@ import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List; import java.util.List;
@Slf4j @Slf4j
@Configuration @Configuration
@Profile("!query") // query 프로파일에서는 배치 작업 비활성화 @Profile("!query")
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true) @ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
public class HourlyAggregationStepConfig { public class HourlyAggregationStepConfig {
@ -44,6 +43,7 @@ public class HourlyAggregationStepConfig {
private final VesselTrackBulkWriter vesselTrackBulkWriter; private final VesselTrackBulkWriter vesselTrackBulkWriter;
private final AbnormalTrackWriter abnormalTrackWriter; private final AbnormalTrackWriter abnormalTrackWriter;
private final AbnormalTrackDetector abnormalTrackDetector; private final AbnormalTrackDetector abnormalTrackDetector;
private final FiveMinTrackCache fiveMinTrackCache;
public HourlyAggregationStepConfig( public HourlyAggregationStepConfig(
JobRepository jobRepository, JobRepository jobRepository,
@ -51,42 +51,78 @@ public class HourlyAggregationStepConfig {
@Qualifier("queryTransactionManager") PlatformTransactionManager transactionManager, @Qualifier("queryTransactionManager") PlatformTransactionManager transactionManager,
VesselTrackBulkWriter vesselTrackBulkWriter, VesselTrackBulkWriter vesselTrackBulkWriter,
AbnormalTrackWriter abnormalTrackWriter, AbnormalTrackWriter abnormalTrackWriter,
AbnormalTrackDetector abnormalTrackDetector) { AbnormalTrackDetector abnormalTrackDetector,
FiveMinTrackCache fiveMinTrackCache) {
this.jobRepository = jobRepository; this.jobRepository = jobRepository;
this.queryDataSource = queryDataSource; this.queryDataSource = queryDataSource;
this.transactionManager = transactionManager; this.transactionManager = transactionManager;
this.vesselTrackBulkWriter = vesselTrackBulkWriter; this.vesselTrackBulkWriter = vesselTrackBulkWriter;
this.abnormalTrackWriter = abnormalTrackWriter; this.abnormalTrackWriter = abnormalTrackWriter;
this.abnormalTrackDetector = abnormalTrackDetector; this.abnormalTrackDetector = abnormalTrackDetector;
this.fiveMinTrackCache = fiveMinTrackCache;
} }
@Value("${vessel.batch.chunk-size:5000}") @Value("${vessel.batch.chunk-size:5000}")
private int chunkSize; private int chunkSize;
//
// Step 1: 5분 시간 병합 (인메모리 캐시 기반)
//
@Bean @Bean
public Step mergeHourlyTracksStep() { public Step mergeHourlyTracksStep() {
// 비정상 궤적 검출은 항상 활성화 (설정 파일로 제어) log.info("Building mergeHourlyTracksStep with cache-based in-memory merge");
boolean detectAbnormal = true; return new StepBuilder("mergeHourlyTracksStep", jobRepository)
.<List<VesselTrack>, AbnormalDetectionResult>chunk(chunkSize, transactionManager)
if (detectAbnormal) { .reader(cacheBasedHourlyTrackReader(null, null))
log.info("Building mergeHourlyTracksStep with abnormal detection enabled"); .processor(hourlyTrackMergeProcessor(null))
return new StepBuilder("mergeHourlyTracksStep", jobRepository) .writer(hourlyCompositeTrackWriter())
.<VesselTrack.VesselKey, AbnormalDetectionResult>chunk(chunkSize, transactionManager) .build();
.reader(hourlyVesselKeyReader(null, null))
.processor(hourlyTrackProcessorWithAbnormalDetection())
.writer(hourlyCompositeTrackWriter())
.build();
} else {
log.info("Building mergeHourlyTracksStep without abnormal detection");
return new StepBuilder("mergeHourlyTracksStep", jobRepository)
.<VesselTrack.VesselKey, VesselTrack>chunk(chunkSize, transactionManager)
.reader(hourlyVesselKeyReader(null, null))
.processor(hourlyTrackItemProcessor())
.writer(hourlyTrackWriter())
.build();
}
} }
/**
 * Step-scoped reader for the hourly merge step.
 *
 * Reads per-vessel lists of 5-minute tracks, preferring the in-memory
 * {@code FiveMinTrackCache} and falling back to the DB via the supplied
 * JdbcTemplate (per the class/commit description — confirm in reader source).
 *
 * @param startTime job parameter, ISO-8601 local date-time (parsed strictly; invalid input throws DateTimeParseException)
 * @param endTime   job parameter, ISO-8601 local date-time (exclusive upper bound — TODO confirm against reader)
 */
@Bean
@StepScope
public CacheBasedHourlyTrackReader cacheBasedHourlyTrackReader(
@Value("#{jobParameters['startTime']}") String startTime,
@Value("#{jobParameters['endTime']}") String endTime) {
LocalDateTime start = LocalDateTime.parse(startTime);
LocalDateTime end = LocalDateTime.parse(endTime);
return new CacheBasedHourlyTrackReader(
fiveMinTrackCache,
new JdbcTemplate(queryDataSource),
start, end);
}
/**
 * Step-scoped processor performing the in-memory hourly merge.
 *
 * The {@code timeBucket} job parameter is truncated to the top of the hour
 * so all merged tracks land in the same hourly bucket.
 *
 * @param timeBucket job parameter, ISO-8601 local date-time of the hour being aggregated
 */
@Bean
@StepScope
public HourlyTrackMergeProcessor hourlyTrackMergeProcessor(
@Value("#{jobParameters['timeBucket']}") String timeBucket) {
// Normalize to the hour boundary: minutes/seconds/nanos are zeroed.
LocalDateTime hourBucket = LocalDateTime.parse(timeBucket)
.withMinute(0).withSecond(0).withNano(0);
return new HourlyTrackMergeProcessor(
abnormalTrackDetector,
new JdbcTemplate(queryDataSource),
hourBucket);
}
/**
 * Composite writer: persists merged hourly tracks via {@code vesselTrackBulkWriter}
 * and abnormal-detection results via {@code abnormalTrackWriter}, tagged "hourly".
 *
 * NOTE(review): setJobName mutates the shared {@code abnormalTrackWriter} bean at
 * bean-creation time — confirm no other job relies on a different job name.
 */
@Bean
public ItemWriter<AbnormalDetectionResult> hourlyCompositeTrackWriter() {
abnormalTrackWriter.setJobName("hourlyAggregationJob");
return new CompositeTrackWriter(
vesselTrackBulkWriter,
abnormalTrackWriter,
"hourly"
);
}
//
// Step 2: Grid Hourly Summary
//
@Bean @Bean
public Step gridHourlySummaryStep() { public Step gridHourlySummaryStep() {
return new StepBuilder("gridHourlySummaryStep", jobRepository) return new StepBuilder("gridHourlySummaryStep", jobRepository)
@ -96,85 +132,23 @@ public class HourlyAggregationStepConfig {
.writer(hourlyGridWriter(null, null)) .writer(hourlyGridWriter(null, null))
.build(); .build();
} }
/**
 * Step aggregating hourly per-area summaries: reads distinct area ids,
 * wraps each in a {@code HourlyAreaSummary}, and writes via an upserting
 * SQL writer. Fixed chunk size of 100 area ids per transaction.
 */
@Bean
public Step areaHourlySummaryStep() {
return new StepBuilder("areaHourlySummaryStep", jobRepository)
.<String, HourlyAreaSummary>chunk(100, transactionManager)
.reader(hourlyAreaReader(null, null))
.processor(hourlyAreaProcessor())
.writer(hourlyAreaWriter(null, null))
.build();
}
/**
 * Cursor reader producing one {@code VesselKey} per (mmsi, hour) pair that has
 * 5-minute tracks in the given window. The hour bucket is derived in SQL via
 * {@code date_trunc('hour', time_bucket)}.
 *
 * @param startTime job parameter, ISO-8601 local date-time (inclusive lower bound)
 * @param endTime   job parameter, ISO-8601 local date-time (exclusive upper bound)
 */
@Bean
@StepScope
public JdbcCursorItemReader<VesselTrack.VesselKey> hourlyVesselKeyReader(
@Value("#{jobParameters['startTime']}") String startTime,
@Value("#{jobParameters['endTime']}") String endTime) {
LocalDateTime start = LocalDateTime.parse(startTime);
LocalDateTime end = LocalDateTime.parse(endTime);
String sql = """
SELECT DISTINCT mmsi, date_trunc('hour', time_bucket) as hour_bucket
FROM signal.t_vessel_tracks_5min
WHERE time_bucket >= ? AND time_bucket < ?
ORDER BY mmsi, hour_bucket
""";
return new JdbcCursorItemReaderBuilder<VesselTrack.VesselKey>()
.name("hourlyVesselKeyReader")
.dataSource(queryDataSource)
.sql(sql)
.preparedStatementSetter(ps -> {
ps.setTimestamp(1, java.sql.Timestamp.valueOf(start))
// (setter binds the [start, end) window on the cursor query)
;
ps.setTimestamp(2, java.sql.Timestamp.valueOf(end));
})
.rowMapper((rs, rowNum) -> new VesselTrack.VesselKey(
rs.getString("mmsi"),
rs.getObject("hour_bucket", LocalDateTime.class)
))
.build();
}
/**
 * Processor that loads and merges one vessel-hour's 5-minute tracks from the DB
 * (the per-key SQL lives in {@code HourlyTrackProcessor}).
 */
@Bean
public ItemProcessor<VesselTrack.VesselKey, VesselTrack> hourlyTrackItemProcessor() {
return new HourlyTrackProcessor(queryDataSource, new JdbcTemplate(queryDataSource));
}
/**
 * Writer for merged hourly tracks: drops nulls emitted by the processor and
 * bulk-writes whatever remains; an all-null chunk triggers no DB call.
 */
@Bean
public ItemWriter<VesselTrack> hourlyTrackWriter() {
return items -> {
List<VesselTrack> nonNullTracks = new ArrayList<>();
items.forEach(candidate -> {
if (candidate != null) {
nonNullTracks.add(candidate);
}
});
if (!nonNullTracks.isEmpty()) {
vesselTrackBulkWriter.writeHourlyTracks(nonNullTracks);
}
};
}
// Grid summary reader
@Bean @Bean
@StepScope @StepScope
public JdbcCursorItemReader<Integer> hourlyGridReader( public JdbcCursorItemReader<Integer> hourlyGridReader(
@Value("#{jobParameters['startTime']}") String startTime, @Value("#{jobParameters['startTime']}") String startTime,
@Value("#{jobParameters['endTime']}") String endTime) { @Value("#{jobParameters['endTime']}") String endTime) {
LocalDateTime start = LocalDateTime.parse(startTime); LocalDateTime start = LocalDateTime.parse(startTime);
LocalDateTime end = LocalDateTime.parse(endTime); LocalDateTime end = LocalDateTime.parse(endTime);
String sql = """ String sql = """
SELECT DISTINCT haegu_no SELECT DISTINCT haegu_no
FROM signal.t_grid_vessel_tracks FROM signal.t_grid_vessel_tracks
WHERE time_bucket >= ? AND time_bucket < ? WHERE time_bucket >= ? AND time_bucket < ?
ORDER BY haegu_no ORDER BY haegu_no
"""; """;
return new JdbcCursorItemReaderBuilder<Integer>() return new JdbcCursorItemReaderBuilder<Integer>()
.name("hourlyGridReader") .name("hourlyGridReader")
.dataSource(queryDataSource) .dataSource(queryDataSource)
@ -186,39 +160,36 @@ public class HourlyAggregationStepConfig {
.rowMapper((rs, rowNum) -> rs.getInt("haegu_no")) .rowMapper((rs, rowNum) -> rs.getInt("haegu_no"))
.build(); .build();
} }
@Bean @Bean
public ItemProcessor<Integer, HourlyGridSummary> hourlyGridProcessor() { public ItemProcessor<Integer, HourlyGridSummary> hourlyGridProcessor() {
return new ItemProcessor<Integer, HourlyGridSummary>() { return haeguNo -> {
@Override HourlyGridSummary summary = new HourlyGridSummary();
public HourlyGridSummary process(Integer haeguNo) throws Exception { summary.haeguNo = haeguNo;
HourlyGridSummary summary = new HourlyGridSummary(); return summary;
summary.haeguNo = haeguNo;
return summary;
}
}; };
} }
@Bean @Bean
@StepScope @StepScope
public ItemWriter<HourlyGridSummary> hourlyGridWriter( public ItemWriter<HourlyGridSummary> hourlyGridWriter(
@Value("#{jobParameters['startTime']}") String startTime, @Value("#{jobParameters['startTime']}") String startTime,
@Value("#{jobParameters['endTime']}") String endTime) { @Value("#{jobParameters['endTime']}") String endTime) {
return items -> { return items -> {
LocalDateTime start = LocalDateTime.parse(startTime); LocalDateTime start = LocalDateTime.parse(startTime);
LocalDateTime end = LocalDateTime.parse(endTime); LocalDateTime end = LocalDateTime.parse(endTime);
LocalDateTime hourBucket = start.withMinute(0).withSecond(0).withNano(0); LocalDateTime hourBucket = start.withMinute(0).withSecond(0).withNano(0);
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
for (HourlyGridSummary summary : items) { for (HourlyGridSummary summary : items) {
if (summary == null) continue; if (summary == null) continue;
String sql = """ String sql = """
INSERT INTO signal.t_grid_tracks_summary_hourly INSERT INTO signal.t_grid_tracks_summary_hourly
(haegu_no, time_bucket, total_vessels, total_distance_nm, avg_speed, vessel_list, created_at) (haegu_no, time_bucket, total_vessels, total_distance_nm, avg_speed, vessel_list, created_at)
SELECT SELECT
haegu_no, haegu_no,
?::timestamp as time_bucket, ?::timestamp as time_bucket,
COUNT(DISTINCT mmsi) as total_vessels, COUNT(DISTINCT mmsi) as total_vessels,
@ -242,29 +213,42 @@ public class HourlyAggregationStepConfig {
vessel_list = EXCLUDED.vessel_list, vessel_list = EXCLUDED.vessel_list,
created_at = NOW() created_at = NOW()
"""; """;
jdbcTemplate.update(sql, hourBucket, summary.haeguNo, start, end); jdbcTemplate.update(sql, hourBucket, summary.haeguNo, start, end);
} }
}; };
} }
// Area summary reader //
// Step 3: Area Hourly Summary
//
/**
 * Step aggregating hourly per-area summaries (reader → wrap area id →
 * upsert writer), chunked 100 area ids per transaction.
 */
@Bean
public Step areaHourlySummaryStep() {
return new StepBuilder("areaHourlySummaryStep", jobRepository)
.<String, HourlyAreaSummary>chunk(100, transactionManager)
.reader(hourlyAreaReader(null, null))
.processor(hourlyAreaProcessor())
.writer(hourlyAreaWriter(null, null))
.build();
}
@Bean @Bean
@StepScope @StepScope
public JdbcCursorItemReader<String> hourlyAreaReader( public JdbcCursorItemReader<String> hourlyAreaReader(
@Value("#{jobParameters['startTime']}") String startTime, @Value("#{jobParameters['startTime']}") String startTime,
@Value("#{jobParameters['endTime']}") String endTime) { @Value("#{jobParameters['endTime']}") String endTime) {
LocalDateTime start = LocalDateTime.parse(startTime); LocalDateTime start = LocalDateTime.parse(startTime);
LocalDateTime end = LocalDateTime.parse(endTime); LocalDateTime end = LocalDateTime.parse(endTime);
String sql = """ String sql = """
SELECT DISTINCT area_id SELECT DISTINCT area_id
FROM signal.t_area_vessel_tracks FROM signal.t_area_vessel_tracks
WHERE time_bucket >= ? AND time_bucket < ? WHERE time_bucket >= ? AND time_bucket < ?
ORDER BY area_id ORDER BY area_id
"""; """;
return new JdbcCursorItemReaderBuilder<String>() return new JdbcCursorItemReaderBuilder<String>()
.name("hourlyAreaReader") .name("hourlyAreaReader")
.dataSource(queryDataSource) .dataSource(queryDataSource)
@ -276,39 +260,36 @@ public class HourlyAggregationStepConfig {
.rowMapper((rs, rowNum) -> rs.getString("area_id")) .rowMapper((rs, rowNum) -> rs.getString("area_id"))
.build(); .build();
} }
@Bean @Bean
public ItemProcessor<String, HourlyAreaSummary> hourlyAreaProcessor() { public ItemProcessor<String, HourlyAreaSummary> hourlyAreaProcessor() {
return new ItemProcessor<String, HourlyAreaSummary>() { return areaId -> {
@Override HourlyAreaSummary summary = new HourlyAreaSummary();
public HourlyAreaSummary process(String areaId) throws Exception { summary.areaId = areaId;
HourlyAreaSummary summary = new HourlyAreaSummary(); return summary;
summary.areaId = areaId;
return summary;
}
}; };
} }
@Bean @Bean
@StepScope @StepScope
public ItemWriter<HourlyAreaSummary> hourlyAreaWriter( public ItemWriter<HourlyAreaSummary> hourlyAreaWriter(
@Value("#{jobParameters['startTime']}") String startTime, @Value("#{jobParameters['startTime']}") String startTime,
@Value("#{jobParameters['endTime']}") String endTime) { @Value("#{jobParameters['endTime']}") String endTime) {
return items -> { return items -> {
LocalDateTime start = LocalDateTime.parse(startTime); LocalDateTime start = LocalDateTime.parse(startTime);
LocalDateTime end = LocalDateTime.parse(endTime); LocalDateTime end = LocalDateTime.parse(endTime);
LocalDateTime hourBucket = start.withMinute(0).withSecond(0).withNano(0); LocalDateTime hourBucket = start.withMinute(0).withSecond(0).withNano(0);
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
for (HourlyAreaSummary summary : items) { for (HourlyAreaSummary summary : items) {
if (summary == null) continue; if (summary == null) continue;
String sql = """ String sql = """
INSERT INTO signal.t_area_tracks_summary_hourly INSERT INTO signal.t_area_tracks_summary_hourly
(area_id, time_bucket, total_vessels, total_distance_nm, avg_speed, vessel_list, created_at) (area_id, time_bucket, total_vessels, total_distance_nm, avg_speed, vessel_list, created_at)
SELECT SELECT
area_id, area_id,
?::timestamp as time_bucket, ?::timestamp as time_bucket,
COUNT(DISTINCT mmsi) as total_vessels, COUNT(DISTINCT mmsi) as total_vessels,
@ -332,39 +313,18 @@ public class HourlyAggregationStepConfig {
vessel_list = EXCLUDED.vessel_list, vessel_list = EXCLUDED.vessel_list,
created_at = NOW() created_at = NOW()
"""; """;
jdbcTemplate.update(sql, hourBucket, summary.areaId, start, end); jdbcTemplate.update(sql, hourBucket, summary.areaId, start, end);
} }
}; };
} }
// Abnormal-track-detection related definitions
/**
 * Decorator around {@code hourlyTrackItemProcessor()} that additionally runs
 * abnormal-track detection and emits an {@code AbnormalDetectionResult}.
 */
@Bean
public ItemProcessor<VesselTrack.VesselKey, AbnormalDetectionResult> hourlyTrackProcessorWithAbnormalDetection() {
return new HourlyTrackProcessorWithAbnormalDetection(
hourlyTrackItemProcessor(),
abnormalTrackDetector,
queryDataSource
);
}
/**
 * Composite writer combining the hourly bulk track writer and the abnormal
 * track writer, tagged "hourly".
 */
@Bean
public ItemWriter<AbnormalDetectionResult> hourlyCompositeTrackWriter() {
// Set the job name directly on the shared writer (bean-creation time)
abnormalTrackWriter.setJobName("hourlyAggregationJob");
return new CompositeTrackWriter(
vesselTrackBulkWriter,
abnormalTrackWriter,
"hourly"
);
}
// Summary 클래스들 // Summary 클래스들
public static class HourlyGridSummary { public static class HourlyGridSummary {
public Integer haeguNo; public Integer haeguNo;
} }
public static class HourlyAreaSummary { public static class HourlyAreaSummary {
public String areaId; public String areaId;
} }
} }

파일 보기

@ -9,6 +9,7 @@ import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector;
import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult; import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult;
import gc.mda.signal_batch.batch.reader.AisTargetCacheManager; import gc.mda.signal_batch.batch.reader.AisTargetCacheManager;
import gc.mda.signal_batch.batch.reader.CacheBasedVesselTrackDataReader; import gc.mda.signal_batch.batch.reader.CacheBasedVesselTrackDataReader;
import gc.mda.signal_batch.batch.reader.FiveMinTrackCache;
import gc.mda.signal_batch.global.util.TrackClippingUtils; import gc.mda.signal_batch.global.util.TrackClippingUtils;
import gc.mda.signal_batch.batch.writer.VesselTrackBulkWriter; import gc.mda.signal_batch.batch.writer.VesselTrackBulkWriter;
import gc.mda.signal_batch.batch.writer.AbnormalTrackWriter; import gc.mda.signal_batch.batch.writer.AbnormalTrackWriter;
@ -59,6 +60,7 @@ public class VesselTrackStepConfig {
private final AbnormalTrackDetector abnormalTrackDetector; private final AbnormalTrackDetector abnormalTrackDetector;
private final AbnormalTrackWriter abnormalTrackWriter; private final AbnormalTrackWriter abnormalTrackWriter;
private final VesselPreviousBucketCache previousBucketCache; private final VesselPreviousBucketCache previousBucketCache;
private final FiveMinTrackCache fiveMinTrackCache;
// 현재 처리 중인 버킷의 종료 위치 저장 (캐시 업데이트용) // 현재 처리 중인 버킷의 종료 위치 저장 (캐시 업데이트용)
private final Map<String, VesselBucketPositionDto> currentBucketEndPositions = new ConcurrentHashMap<>(); private final Map<String, VesselBucketPositionDto> currentBucketEndPositions = new ConcurrentHashMap<>();
@ -73,7 +75,8 @@ public class VesselTrackStepConfig {
TrackClippingUtils trackClippingUtils, TrackClippingUtils trackClippingUtils,
AbnormalTrackDetector abnormalTrackDetector, AbnormalTrackDetector abnormalTrackDetector,
AbnormalTrackWriter abnormalTrackWriter, AbnormalTrackWriter abnormalTrackWriter,
VesselPreviousBucketCache previousBucketCache) { VesselPreviousBucketCache previousBucketCache,
FiveMinTrackCache fiveMinTrackCache) {
this.jobRepository = jobRepository; this.jobRepository = jobRepository;
this.transactionManager = transactionManager; this.transactionManager = transactionManager;
this.queryDataSource = queryDataSource; this.queryDataSource = queryDataSource;
@ -84,6 +87,7 @@ public class VesselTrackStepConfig {
this.abnormalTrackDetector = abnormalTrackDetector; this.abnormalTrackDetector = abnormalTrackDetector;
this.abnormalTrackWriter = abnormalTrackWriter; this.abnormalTrackWriter = abnormalTrackWriter;
this.previousBucketCache = previousBucketCache; this.previousBucketCache = previousBucketCache;
this.fiveMinTrackCache = fiveMinTrackCache;
} }
@Value("${vessel.batch.chunk-size:1000}") @Value("${vessel.batch.chunk-size:1000}")
@ -302,7 +306,18 @@ public class VesselTrackStepConfig {
// 1. 기존 Writer로 DB 저장 // 1. 기존 Writer로 DB 저장
vesselTrackBulkWriter.write(chunk); vesselTrackBulkWriter.write(chunk);
// 2. 캐시 업데이트 (현재 버킷 종료 위치) // 2. FiveMinTrackCache에 저장 (hourly 인메모리 병합용)
int cachedCount = 0;
for (List<VesselTrack> trackGroup : chunk.getItems()) {
fiveMinTrackCache.putAll(trackGroup);
cachedCount += trackGroup.size();
}
if (cachedCount > 0) {
log.debug("FiveMinTrackCache 저장: {} 건 (총 캐시: {} 건)",
cachedCount, fiveMinTrackCache.size());
}
// 3. 이전 버킷 종료 위치 캐시 업데이트
if (!currentBucketEndPositions.isEmpty()) { if (!currentBucketEndPositions.isEmpty()) {
List<VesselBucketPositionDto> positions = new ArrayList<>(currentBucketEndPositions.values()); List<VesselBucketPositionDto> positions = new ArrayList<>(currentBucketEndPositions.values());
previousBucketCache.putAll(positions); previousBucketCache.putAll(positions);

파일 보기

@ -0,0 +1,271 @@
package gc.mda.signal_batch.batch.processor;
import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import gc.mda.signal_batch.global.util.LineStringMUtils;
import gc.mda.signal_batch.global.util.TrackSimplificationUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.jdbc.core.JdbcTemplate;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * In-memory hourly track merge processor.
 *
 * Receives the list of 5-minute tracks for one vessel and:
 * 1. concatenates their WKT coordinates (plain Java string handling),
 * 2. aggregates statistics (distance, speed, pointCount),
 * 3. simplifies the merged geometry (TrackSimplificationUtils),
 * 4. runs abnormal detection (previous bucket bulk-prefetched once).
 *
 * Replaces the former N+1 SQL pattern: at most one DB query per step
 * execution (the previous-bucket prefetch used for abnormal detection).
 *
 * NOTE(review): instances are stateful (lazy previous-bucket cache) and the
 * lazy init is not synchronized — assumes a single-threaded, step-scoped
 * lifecycle; confirm before enabling a multi-threaded step.
 */
@Slf4j
public class HourlyTrackMergeProcessor implements ItemProcessor<List<VesselTrack>, AbnormalDetectionResult> {
// Captures the coordinate list inside "LINESTRING M(...)"; compiled once (static final).
private static final Pattern WKT_COORDS_PATTERN = Pattern.compile("LINESTRING M\\((.+)\\)");
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private final AbnormalTrackDetector abnormalTrackDetector;
private final JdbcTemplate queryJdbcTemplate;
private final LocalDateTime hourBucket;
// Lazy-init: previous-bucket data is bulk-prefetched exactly once
private Map<String, VesselTrack> previousBucketCache;
private boolean previousBucketLoaded = false;
public HourlyTrackMergeProcessor(
AbnormalTrackDetector abnormalTrackDetector,
JdbcTemplate queryJdbcTemplate,
LocalDateTime hourBucket) {
this.abnormalTrackDetector = abnormalTrackDetector;
this.queryJdbcTemplate = queryJdbcTemplate;
this.hourBucket = hourBucket;
}
/**
 * Merges one vessel's 5-minute tracks into a single hourly track and runs
 * bucket-transition abnormal detection against the previous hour's track.
 *
 * @param fiveMinTracks all 5-minute tracks of one vessel for the hour
 *                      (assumes list is already time-ordered — TODO confirm reader guarantees this)
 * @return detection result carrying the merged hourly track, or null when
 *         input is empty or no valid coordinates could be merged
 */
@Override
public AbnormalDetectionResult process(List<VesselTrack> fiveMinTracks) throws Exception {
if (fiveMinTracks == null || fiveMinTracks.isEmpty()) {
return null;
}
String mmsi = fiveMinTracks.get(0).getMmsi();
// Step 1: merge WKT coordinates
String mergedWkt = mergeTrackGeometries(fiveMinTracks);
if (mergedWkt == null) {
log.debug("병합 실패 (유효한 좌표 없음): mmsi={}", mmsi);
return null;
}
// Step 2: aggregate statistics (sum distance, max speed, sum point count)
BigDecimal totalDistance = BigDecimal.ZERO;
BigDecimal maxSpeed = BigDecimal.ZERO;
int totalPoints = 0;
for (VesselTrack track : fiveMinTracks) {
if (track.getDistanceNm() != null) {
totalDistance = totalDistance.add(track.getDistanceNm());
}
if (track.getMaxSpeed() != null && track.getMaxSpeed().compareTo(maxSpeed) > 0) {
maxSpeed = track.getMaxSpeed();
}
if (track.getPointCount() != null) {
totalPoints += track.getPointCount();
}
}
// avgSpeed: derived from the time delta carried in the WKT M values
BigDecimal avgSpeed = calculateAvgSpeed(mergedWkt, totalDistance);
VesselTrack.TrackPosition startPos = fiveMinTracks.get(0).getStartPosition();
VesselTrack.TrackPosition endPos = fiveMinTracks.get(fiveMinTracks.size() - 1).getEndPosition();
// Step 3: simplification
String simplifiedWkt = TrackSimplificationUtils.simplifyHourlyTrack(mergedWkt);
int simplifiedPoints = countWktPoints(simplifiedWkt);
if (!mergedWkt.equals(simplifiedWkt)) {
TrackSimplificationUtils.SimplificationStats stats =
TrackSimplificationUtils.getSimplificationStats(mergedWkt, simplifiedWkt);
log.debug("시간별 궤적 간소화 - vessel: {}, 원본: {}포인트, 간소화: {}포인트 ({}% 감소)",
mmsi, stats.originalPoints, stats.simplifiedPoints, (int) stats.reductionRate);
}
VesselTrack hourlyTrack = VesselTrack.builder()
.mmsi(mmsi)
.timeBucket(hourBucket)
.trackGeom(simplifiedWkt)
.distanceNm(totalDistance)
.avgSpeed(avgSpeed)
.maxSpeed(maxSpeed)
.pointCount(simplifiedPoints > 0 ? simplifiedPoints : totalPoints)
.startPosition(startPos)
.endPosition(endPos)
.build();
// Step 4: abnormal detection (lazy-init, single bulk prefetch)
if (!previousBucketLoaded) {
previousBucketCache = bulkFetchPreviousBucketTracks();
previousBucketLoaded = true;
}
VesselTrack prevTrack = previousBucketCache.get(mmsi);
AbnormalDetectionResult result = abnormalTrackDetector.detectBucketTransitionOnly(hourlyTrack, prevTrack);
if (result.hasAbnormalities()) {
log.debug("Hourly 비정상 궤적 검출: mmsi={}, segments={}",
mmsi, result.getAbnormalSegments().size());
}
return result;
}
/**
 * Concatenates the WKT coordinate lists of the 5-minute tracks into one
 * LINESTRING M. Tracks with null/empty/non-matching geometry are skipped.
 *
 * @return merged WKT, or null when no track contributed any coordinates
 */
private String mergeTrackGeometries(List<VesselTrack> tracks) {
StringBuilder allCoords = new StringBuilder();
for (VesselTrack track : tracks) {
String wkt = track.getTrackGeom();
if (wkt == null || wkt.isEmpty()) continue;
Matcher matcher = WKT_COORDS_PATTERN.matcher(wkt);
if (matcher.find()) {
String coords = matcher.group(1);
if (!coords.isBlank()) {
if (allCoords.length() > 0) {
allCoords.append(", ");
}
allCoords.append(coords);
}
}
}
if (allCoords.length() == 0) {
return null;
}
return "LINESTRING M(" + allCoords + ")";
}
/**
 * Average speed from the M values (presumably Unix-epoch seconds — TODO confirm
 * upstream writer): totalDistance divided by (lastM - firstM) in hours.
 * Returns ZERO on any parse failure, fewer than two points, or non-positive
 * time delta; result is capped at 9999.99 and rounded to 2 decimals.
 */
private BigDecimal calculateAvgSpeed(String wkt, BigDecimal totalDistance) {
try {
Matcher matcher = WKT_COORDS_PATTERN.matcher(wkt);
if (!matcher.find()) return BigDecimal.ZERO;
String coords = matcher.group(1);
String[] points = coords.split(",");
if (points.length < 2) return BigDecimal.ZERO;
// M value of the first point
String[] firstParts = points[0].trim().split("\\s+");
double firstM = firstParts.length >= 3 ? Double.parseDouble(firstParts[2]) : 0;
// M value of the last point
String[] lastParts = points[points.length - 1].trim().split("\\s+");
double lastM = lastParts.length >= 3 ? Double.parseDouble(lastParts[2]) : 0;
double timeDiffSeconds = lastM - firstM;
if (timeDiffSeconds <= 0) return BigDecimal.ZERO;
double timeDiffHours = timeDiffSeconds / 3600.0;
double avgSpeedVal = totalDistance.doubleValue() / timeDiffHours;
// Cap unrealistic speeds
avgSpeedVal = Math.min(avgSpeedVal, 9999.99);
return BigDecimal.valueOf(avgSpeedVal).setScale(2, RoundingMode.HALF_UP);
} catch (Exception e) {
log.debug("avgSpeed 계산 실패: {}", e.getMessage());
return BigDecimal.ZERO;
}
}
// Counts comma-separated coordinate tuples in a "LINESTRING M(...)" WKT; 0 on any failure.
private int countWktPoints(String wkt) {
if (wkt == null || !wkt.startsWith("LINESTRING M")) return 0;
try {
String coords = wkt.substring("LINESTRING M(".length(), wkt.length() - 1);
return coords.split(",").length;
} catch (Exception e) {
return 0;
}
}
/**
 * Bulk-prefetches, per MMSI, the latest 5-minute track of the previous hour
 * (for abnormal detection). Only the last 10% of each geometry is fetched
 * (ST_LineSubstring 0.9..1.0). Returns an empty map on query failure
 * (e.g. on the very first run) — the error is logged, not rethrown.
 */
private Map<String, VesselTrack> bulkFetchPreviousBucketTracks() {
LocalDateTime prevStart = hourBucket.minusHours(1);
LocalDateTime prevEnd = hourBucket;
String sql = """
SELECT DISTINCT ON (mmsi)
mmsi, time_bucket, end_position,
public.ST_AsText(public.ST_LineSubstring(track_geom, 0.9, 1.0)) as last_segment
FROM signal.t_vessel_tracks_5min
WHERE time_bucket >= ? AND time_bucket < ?
AND track_geom IS NOT NULL
ORDER BY mmsi, time_bucket DESC
""";
Map<String, VesselTrack> result = new HashMap<>();
try {
queryJdbcTemplate.query(sql,
ps -> {
ps.setTimestamp(1, Timestamp.valueOf(prevStart));
ps.setTimestamp(2, Timestamp.valueOf(prevEnd));
},
rs -> {
String mmsi = rs.getString("mmsi");
VesselTrack track = VesselTrack.builder()
.mmsi(mmsi)
.timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime())
.trackGeom(rs.getString("last_segment"))
.endPosition(parseEndPosition(rs.getString("end_position")))
.build();
result.put(mmsi, track);
});
log.info("이전 버킷 트랙 prefetch 완료: {} 선박 (기간: {} ~ {})",
result.size(), prevStart, prevEnd);
} catch (Exception e) {
log.warn("이전 버킷 트랙 prefetch 실패 (첫 실행일 수 있음): {}", e.getMessage());
}
return result;
}
/**
 * Parses an end_position JSON blob into a TrackPosition via the lightweight
 * LineStringMUtils value extractor; returns null on null input or any parse
 * failure (logged at debug).
 */
private VesselTrack.TrackPosition parseEndPosition(String json) {
if (json == null) return null;
try {
String lat = LineStringMUtils.extractJsonValue(json, "lat");
String lon = LineStringMUtils.extractJsonValue(json, "lon");
String time = LineStringMUtils.extractJsonValue(json, "time");
String sog = LineStringMUtils.extractJsonValue(json, "sog");
return VesselTrack.TrackPosition.builder()
.lat(lat != null ? Double.parseDouble(lat) : null)
.lon(lon != null ? Double.parseDouble(lon) : null)
.time(time != null ? LocalDateTime.parse(time, TIMESTAMP_FORMATTER) : null)
.sog(sog != null ? new BigDecimal(sog) : null)
.build();
} catch (Exception e) {
log.debug("end_position 파싱 실패: {}", json);
return null;
}
}
}

파일 보기

@ -1,194 +0,0 @@
package gc.mda.signal_batch.batch.processor;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jdbc.core.JdbcTemplate;
import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import gc.mda.signal_batch.global.util.LineStringMUtils;
import gc.mda.signal_batch.global.util.TrackSimplificationUtils;
import javax.sql.DataSource;
/**
 * Legacy hourly-track processor: merges one vessel's 5-minute tracks for a
 * single hour into one hourly {@link VesselTrack} via a per-vessel SQL query.
 *
 * NOTE(review): this runs one merge query per {@code VesselKey} — the N+1
 * pattern that the cache-based in-memory merge replaces.
 */
@Slf4j
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
@RequiredArgsConstructor
public class HourlyTrackProcessor implements ItemProcessor<VesselTrack.VesselKey, VesselTrack> {
    // NOTE(review): injected but never referenced in this class — presumably kept
    // for constructor-signature compatibility with callers; confirm before removing.
    private final DataSource queryDataSource;
    private final JdbcTemplate jdbcTemplate;
    // Matches the DB-side 'YYYY-MM-DD HH24:MI:SS' format used in the position JSON.
    private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Merges the vessel's 5-minute tracks for the hour containing the key's
     * time bucket into a single hourly track.
     *
     * @param vesselKey MMSI plus any time bucket inside the target hour
     * @return merged hourly track, or {@code null} when no 5-minute data exists
     *         or the query fails (the item is then skipped)
     */
    @Override
    public VesselTrack process(VesselTrack.VesselKey vesselKey) throws Exception {
        // Truncate the key's bucket down to the containing hour.
        LocalDateTime hourBucket = vesselKey.getTimeBucket()
            .withMinute(0)
            .withSecond(0)
            .withNano(0);
        // Per-vessel merge query:
        //  1) ordered_tracks    — the vessel's non-empty 5-minute geometries in time order
        //  2) merged_coords     — concatenates each LINESTRING M's coordinate list
        //  3) merged_tracks     — rebuilds one LINESTRING M plus per-hour stats
        //  4) calculated_tracks — distance (nm) and elapsed time from the M values
        String sql = """
            WITH ordered_tracks AS (
                SELECT *
                FROM signal.t_vessel_tracks_5min
                WHERE mmsi = ?
                AND time_bucket >= ?
                AND time_bucket < ?
                AND track_geom IS NOT NULL
                AND public.ST_NPoints(track_geom) > 0
                ORDER BY time_bucket
            ),
            merged_coords AS (
                SELECT
                    mmsi,
                    string_agg(
                        substring(public.ST_AsText(track_geom) from 'M \\((.+)\\)'),
                        ','
                        ORDER BY time_bucket
                    ) FILTER (WHERE track_geom IS NOT NULL) as all_coords
                FROM ordered_tracks
                GROUP BY mmsi
            ),
            merged_tracks AS (
                SELECT
                    mc.mmsi,
                    TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS') as time_bucket,
                    public.ST_GeomFromText('LINESTRING M(' || mc.all_coords || ')') as merged_geom,
                    (SELECT MAX(max_speed) FROM ordered_tracks WHERE mmsi = mc.mmsi) as max_speed,
                    (SELECT SUM(point_count) FROM ordered_tracks WHERE mmsi = mc.mmsi) as total_points,
                    (SELECT MIN(time_bucket) FROM ordered_tracks WHERE mmsi = mc.mmsi) as start_time,
                    (SELECT MAX(time_bucket) FROM ordered_tracks WHERE mmsi = mc.mmsi) as end_time,
                    (SELECT start_position FROM ordered_tracks WHERE mmsi = mc.mmsi ORDER BY time_bucket LIMIT 1) as start_pos,
                    (SELECT end_position FROM ordered_tracks WHERE mmsi = mc.mmsi ORDER BY time_bucket DESC LIMIT 1) as end_pos
                FROM merged_coords mc
            ),
            calculated_tracks AS (
                SELECT
                    *,
                    public.ST_Length(merged_geom::geography) / 1852.0 as total_distance,
                    CASE
                        WHEN public.ST_NPoints(merged_geom) > 0 THEN
                            public.ST_M(public.ST_PointN(merged_geom, public.ST_NPoints(merged_geom))) -
                            public.ST_M(public.ST_PointN(merged_geom, 1))
                        ELSE
                            EXTRACT(EPOCH FROM
                                TO_TIMESTAMP(end_pos->>'time', 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(start_pos->>'time', 'YYYY-MM-DD HH24:MI:SS')
                            )
                    END as time_diff_seconds
                FROM merged_tracks
            )
            SELECT
                mmsi,
                time_bucket,
                merged_geom,
                total_distance,
                CASE
                    WHEN time_diff_seconds > 0 THEN
                        CAST(LEAST((total_distance / (time_diff_seconds / 3600.0)), 9999.99) AS numeric(6,2))
                    ELSE 0
                END as avg_speed,
                max_speed,
                total_points,
                start_time,
                end_time,
                start_pos,
                end_pos,
                public.ST_AsText(merged_geom) as geom_text
            FROM calculated_tracks
            """;
        LocalDateTime startTime = hourBucket;
        LocalDateTime endTime = hourBucket.plusHours(1);
        Timestamp startTimestamp = Timestamp.valueOf(startTime);
        Timestamp endTimestamp = Timestamp.valueOf(endTime);
        // NOTE(review): bound into TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS'),
        // which expects text — relies on an implicit timestamp->text cast; confirm.
        Timestamp hourBucketTimestamp = Timestamp.valueOf(hourBucket);
        log.debug("HourlyTrackProcessor params - mmsi: {}, startTime: {}, endTime: {}, hourBucket: {}",
            vesselKey.getMmsi(), startTimestamp, endTimestamp, hourBucketTimestamp);
        try {
            return jdbcTemplate.queryForObject(sql,
                (rs, rowNum) -> {
                    try {
                        return buildHourlyTrack(rs, hourBucket);
                    } catch (Exception e) {
                        // Surface mapping failures to the outer catch below.
                        throw new RuntimeException("Failed to build hourly track", e);
                    }
                },
                vesselKey.getMmsi(),
                startTimestamp, endTimestamp, hourBucketTimestamp
            );
        } catch (org.springframework.dao.EmptyResultDataAccessException e) {
            // No 5-minute rows for this vessel/hour: skip rather than fail the step.
            log.warn("No 5min data found for vessel {} in time range {}-{}, skipping hourly aggregation",
                vesselKey.getMmsi(), startTimestamp, endTimestamp);
            return null;
        } catch (Exception e) {
            // Any other failure is logged and the item is dropped (returns null).
            log.error("Failed to process hourly track for vessel {}: {}",
                vesselKey.getMmsi(), e.getMessage(), e);
            return null;
        }
    }

    /**
     * Maps one merged result row to a {@link VesselTrack}, simplifying the
     * hourly geometry before storing it.
     */
    private VesselTrack buildHourlyTrack(ResultSet rs, LocalDateTime hourBucket) throws Exception {
        VesselTrack.TrackPosition startPos = null;
        VesselTrack.TrackPosition endPos = null;
        String startPosJson = rs.getString("start_pos");
        String endPosJson = rs.getString("end_pos");
        if (startPosJson != null) {
            startPos = parseTrackPosition(startPosJson);
        }
        if (endPosJson != null) {
            endPos = parseTrackPosition(endPosJson);
        }
        // Simplify the merged LINESTRING M; log only when points were removed.
        String hourlyLineStringM = rs.getString("geom_text");
        String simplifiedLineStringM = TrackSimplificationUtils.simplifyHourlyTrack(hourlyLineStringM);
        if (!hourlyLineStringM.equals(simplifiedLineStringM)) {
            TrackSimplificationUtils.SimplificationStats stats =
                TrackSimplificationUtils.getSimplificationStats(hourlyLineStringM, simplifiedLineStringM);
            log.debug("시간별 궤적 간소화 - vessel: {}, 원본: {}포인트, 간소화: {}포인트 ({}% 감소)",
                rs.getString("mmsi"),
                stats.originalPoints, stats.simplifiedPoints, (int)stats.reductionRate);
        }
        return VesselTrack.builder()
            .mmsi(rs.getString("mmsi"))
            .timeBucket(hourBucket)
            .trackGeom(simplifiedLineStringM)
            .distanceNm(rs.getBigDecimal("total_distance"))
            .avgSpeed(rs.getBigDecimal("avg_speed"))
            .maxSpeed(rs.getBigDecimal("max_speed"))
            .pointCount(rs.getInt("total_points"))
            .startPosition(startPos)
            .endPosition(endPos)
            .build();
    }

    /**
     * Parses a start/end-position JSON snippet into a TrackPosition;
     * returns {@code null} (with an error log) on any parse failure.
     */
    private VesselTrack.TrackPosition parseTrackPosition(String json) {
        try {
            String latStr = LineStringMUtils.extractJsonValue(json, "lat");
            String lonStr = LineStringMUtils.extractJsonValue(json, "lon");
            String timeStr = LineStringMUtils.extractJsonValue(json, "time");
            String sogStr = LineStringMUtils.extractJsonValue(json, "sog");
            return VesselTrack.TrackPosition.builder()
                .lat(latStr != null ? Double.parseDouble(latStr) : null)
                .lon(lonStr != null ? Double.parseDouble(lonStr) : null)
                .time(timeStr != null ? LocalDateTime.parse(timeStr, TIMESTAMP_FORMATTER) : null)
                .sog(sogStr != null ? new BigDecimal(sogStr) : null)
                .build();
        } catch (Exception e) {
            log.error("Failed to parse track position: {}", json, e);
            return null;
        }
    }
}

파일 보기

@ -1,38 +0,0 @@
package gc.mda.signal_batch.batch.processor;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import javax.sql.DataSource;
import java.time.LocalDateTime;
/**
 * Hourly track processor with abnormal-trajectory detection.
 *
 * Delegates the actual hourly aggregation to the wrapped processor and layers
 * abnormal-track detection on top (behavior inherited from the base class);
 * the three overrides below supply the hourly-specific bucket semantics.
 */
@Slf4j
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
public class HourlyTrackProcessorWithAbnormalDetection extends BaseTrackProcessorWithAbnormalDetection {
    public HourlyTrackProcessorWithAbnormalDetection(
            ItemProcessor<VesselTrack.VesselKey, VesselTrack> hourlyTrackProcessor,
            AbnormalTrackDetector abnormalTrackDetector,
            DataSource queryDataSource) {
        super(hourlyTrackProcessor, abnormalTrackDetector, queryDataSource);
    }
    /** Previous-bucket lookups read from the 5-minute track table. */
    @Override
    protected String getPreviousTrackTableName() {
        return "signal.t_vessel_tracks_5min";
    }
    /** Hourly buckets are truncated to the top of the hour. */
    @Override
    protected LocalDateTime getNormalizedBucket(LocalDateTime timeBucket) {
        return timeBucket.withMinute(0).withSecond(0).withNano(0);
    }
    /** The "previous" bucket for continuity checks is exactly one hour earlier. */
    @Override
    protected LocalDateTime getPreviousBucket(LocalDateTime currentBucket) {
        return currentBucket.minusHours(1);
    }
}

파일 보기

@ -0,0 +1,181 @@
package gc.mda.signal_batch.batch.reader;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import gc.mda.signal_batch.global.util.LineStringMUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import org.springframework.jdbc.core.JdbcTemplate;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
/**
 * Cache-based hourly track reader.
 *
 * Reads each vessel's 5-minute tracks for the [startTime, endTime) window from
 * {@link FiveMinTrackCache} and emits them one MMSI group per {@code read()}
 * call; MMSIs present in the DB but missing from the cache are backfilled with
 * one bulk query.
 *
 * Normal operation: 1 DB query (DISTINCT-mmsi completeness check)
 * After a restart:  2 DB queries (completeness check + bulk fallback)
 *
 * NOTE(review): stateful (lazy {@code initialize()} + iterator) and not
 * synchronized — confirm the step using this reader runs single-threaded.
 */
@Slf4j
public class CacheBasedHourlyTrackReader implements ItemReader<List<VesselTrack>> {
    // Matches the 'YYYY-MM-DD HH24:MI:SS' timestamps inside the position JSON.
    private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private final FiveMinTrackCache fiveMinTrackCache;
    private final JdbcTemplate queryJdbcTemplate;
    // Half-open read window: [startTime, endTime)
    private final LocalDateTime startTime;
    private final LocalDateTime endTime;
    // Populated lazily on the first read(); iterates per-MMSI track groups.
    private Iterator<List<VesselTrack>> groupIterator;
    private boolean initialized = false;

    public CacheBasedHourlyTrackReader(
            FiveMinTrackCache fiveMinTrackCache,
            JdbcTemplate queryJdbcTemplate,
            LocalDateTime startTime,
            LocalDateTime endTime) {
        this.fiveMinTrackCache = fiveMinTrackCache;
        this.queryJdbcTemplate = queryJdbcTemplate;
        this.startTime = startTime;
        this.endTime = endTime;
    }

    /**
     * Returns the next vessel's time-ordered 5-minute tracks, or {@code null}
     * when every group has been emitted (Spring Batch end-of-data signal).
     */
    @Override
    public List<VesselTrack> read() {
        if (!initialized) {
            initialize();
            initialized = true;
        }
        if (groupIterator != null && groupIterator.hasNext()) {
            return groupIterator.next();
        }
        return null;
    }

    /**
     * Builds the per-MMSI group iterator: cache load, DB completeness check,
     * fallback fetch for cache misses, and stale-cache-entry pruning.
     */
    private void initialize() {
        // 1. Load the window's tracks from the cache, grouped by MMSI.
        Map<String, List<VesselTrack>> cacheData = fiveMinTrackCache.getTracksInRange(startTime, endTime);
        log.info("Hourly Reader 초기화 — 캐시에서 {} 선박 로드 (기간: {} ~ {})",
            cacheData.size(), startTime, endTime);
        // 2. Fetch the window's MMSI list from the DB (completeness check).
        String distinctSql = """
            SELECT DISTINCT mmsi FROM signal.t_vessel_tracks_5min
            WHERE time_bucket >= ? AND time_bucket < ?
            """;
        List<String> dbMmsiList = queryJdbcTemplate.queryForList(
            distinctSql, String.class,
            Timestamp.valueOf(startTime), Timestamp.valueOf(endTime));
        Set<String> dbMmsiSet = new HashSet<>(dbMmsiList);
        // 3. MMSIs the DB knows about but the cache does not (e.g. after restart).
        Set<String> cacheMmsiSet = cacheData.keySet();
        Set<String> missingMmsi = new HashSet<>(dbMmsiSet);
        missingMmsi.removeAll(cacheMmsiSet);
        if (!missingMmsi.isEmpty()) {
            log.info("캐시 미스 {} 선박 → DB fallback", missingMmsi.size());
            Map<String, List<VesselTrack>> fallbackData = fetchFromDb(missingMmsi);
            // Merge fallback rows into the cached data (fallback wins on overlap).
            Map<String, List<VesselTrack>> merged = new LinkedHashMap<>(cacheData);
            merged.putAll(fallbackData);
            cacheData = merged;
        }
        // Drop entries that exist only in the cache, not in the DB (stale cache).
        int staleCount = 0;
        Iterator<String> it = cacheData.keySet().iterator();
        while (it.hasNext()) {
            if (!dbMmsiSet.contains(it.next())) {
                it.remove();
                staleCount++;
            }
        }
        if (staleCount > 0) {
            log.debug("Stale 캐시 항목 {} 건 제거", staleCount);
        }
        // NOTE(review): the cache/fallback split logged here assumes the fallback
        // returned every missing MMSI; counts are approximate if it did not.
        log.info("Hourly Reader 준비 완료 — 총 {} 선박 (캐시: {}, DB fallback: {})",
            cacheData.size(), cacheData.size() - missingMmsi.size(), missingMmsi.size());
        groupIterator = cacheData.values().iterator();
    }

    /**
     * Bulk-fetches the 5-minute tracks for the given MMSIs from the DB,
     * grouped by MMSI and ordered by time bucket.
     */
    private Map<String, List<VesselTrack>> fetchFromDb(Set<String> mmsiSet) {
        if (mmsiSet.isEmpty()) return Collections.emptyMap();
        String[] mmsiArray = mmsiSet.toArray(new String[0]);
        String sql = """
            SELECT mmsi, time_bucket,
                   public.ST_AsText(track_geom) as geom_text,
                   distance_nm, avg_speed, max_speed, point_count,
                   start_position, end_position
            FROM signal.t_vessel_tracks_5min
            WHERE mmsi = ANY(?)
            AND time_bucket >= ? AND time_bucket < ?
            AND track_geom IS NOT NULL
            ORDER BY mmsi, time_bucket
            """;
        Map<String, List<VesselTrack>> result = new LinkedHashMap<>();
        queryJdbcTemplate.query(sql,
            ps -> {
                // NOTE(review): the created java.sql.Array is never free()d;
                // relies on the driver releasing it with the statement — confirm.
                ps.setArray(1, ps.getConnection().createArrayOf("varchar", mmsiArray));
                ps.setTimestamp(2, Timestamp.valueOf(startTime));
                ps.setTimestamp(3, Timestamp.valueOf(endTime));
            },
            rs -> {
                String mmsi = rs.getString("mmsi");
                VesselTrack track = VesselTrack.builder()
                    .mmsi(mmsi)
                    .timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime())
                    .trackGeom(rs.getString("geom_text"))
                    .distanceNm(rs.getBigDecimal("distance_nm"))
                    .avgSpeed(rs.getBigDecimal("avg_speed"))
                    .maxSpeed(rs.getBigDecimal("max_speed"))
                    .pointCount(rs.getInt("point_count"))
                    .startPosition(parseTrackPosition(rs.getString("start_position")))
                    .endPosition(parseTrackPosition(rs.getString("end_position")))
                    .build();
                result.computeIfAbsent(mmsi, k -> new ArrayList<>()).add(track);
            });
        log.info("DB fallback 완료: {} 선박, {} 트랙",
            result.size(), result.values().stream().mapToInt(List::size).sum());
        return result;
    }

    /**
     * Parses a position JSON snippet into a TrackPosition;
     * returns {@code null} on missing input or any parse failure.
     */
    private VesselTrack.TrackPosition parseTrackPosition(String json) {
        if (json == null) return null;
        try {
            String latStr = LineStringMUtils.extractJsonValue(json, "lat");
            String lonStr = LineStringMUtils.extractJsonValue(json, "lon");
            String timeStr = LineStringMUtils.extractJsonValue(json, "time");
            String sogStr = LineStringMUtils.extractJsonValue(json, "sog");
            return VesselTrack.TrackPosition.builder()
                .lat(latStr != null ? Double.parseDouble(latStr) : null)
                .lon(lonStr != null ? Double.parseDouble(lonStr) : null)
                .time(timeStr != null ? LocalDateTime.parse(timeStr, TIMESTAMP_FORMATTER) : null)
                .sog(sogStr != null ? new BigDecimal(sogStr) : null)
                .build();
        } catch (Exception e) {
            log.debug("TrackPosition 파싱 실패: {}", json);
            return null;
        }
    }
}

파일 보기

@ -0,0 +1,107 @@
package gc.mda.signal_batch.batch.reader;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
 * In-memory cache of 5-minute {@code VesselTrack} aggregates.
 *
 * Populated when the 5-minute aggregates are persisted, so the hourly job can
 * merge tracks without re-reading them from the DB. Entries evicted by TTL or
 * size cap are expected to be recovered by the hourly reader's DB fallback
 * (see CacheBasedHourlyTrackReader).
 *
 * key:   "mmsi::timeBucket" in full ISO-8601 (e.g. "440123456::2026-02-19T09:05")
 * value: VesselTrack
 * TTL:   75 minutes by default (1 hour + 15 minutes of slack)
 */
@Slf4j
@Component
public class FiveMinTrackCache {
    // Built in init(); Spring's bean lifecycle publishes it before any caller runs.
    private Cache<String, VesselTrack> cache;

    @Value("${app.cache.five-min-track.ttl-minutes:75}")
    private long ttlMinutes;

    @Value("${app.cache.five-min-track.max-size:500000}")
    private int maxSize;

    /** Builds the Caffeine cache once the @Value properties are bound. */
    @PostConstruct
    public void init() {
        this.cache = Caffeine.newBuilder()
            .maximumSize(maxSize)
            .expireAfterWrite(ttlMinutes, TimeUnit.MINUTES)
            .recordStats()
            .build();
        log.info("FiveMinTrackCache 초기화 — TTL: {}분, maxSize: {}", ttlMinutes, maxSize);
    }

    /** Stores one track; silently ignores entries missing mmsi or timeBucket. */
    public void put(VesselTrack track) {
        if (track == null || track.getMmsi() == null || track.getTimeBucket() == null) {
            return;
        }
        cache.put(buildKey(track.getMmsi(), track.getTimeBucket()), track);
    }

    /** Stores a batch of tracks; a null list is a no-op. */
    public void putAll(List<VesselTrack> tracks) {
        if (tracks == null) return;
        for (VesselTrack track : tracks) {
            put(track);
        }
    }

    /**
     * Returns the cached tracks in the given window, grouped by MMSI.
     *
     * @param start window start (inclusive)
     * @param end   window end (exclusive)
     * @return Map&lt;mmsi, List&lt;VesselTrack&gt;&gt;, each list sorted by time bucket
     *         (group order is unspecified — it follows Caffeine's internal map order)
     */
    public Map<String, List<VesselTrack>> getTracksInRange(LocalDateTime start, LocalDateTime end) {
        // Full scan of the cache — O(cache size) per call, once per hourly run.
        Map<String, List<VesselTrack>> result = new LinkedHashMap<>();
        for (Map.Entry<String, VesselTrack> entry : cache.asMap().entrySet()) {
            VesselTrack track = entry.getValue();
            if (track.getTimeBucket() != null
                    && !track.getTimeBucket().isBefore(start)
                    && track.getTimeBucket().isBefore(end)) {
                result.computeIfAbsent(track.getMmsi(), k -> new ArrayList<>()).add(track);
            }
        }
        // Sort each vessel's tracks chronologically.
        for (List<VesselTrack> tracks : result.values()) {
            tracks.sort(Comparator.comparing(VesselTrack::getTimeBucket));
        }
        return result;
    }

    /** Approximate number of cached entries (Caffeine estimate). */
    public long size() {
        return cache.estimatedSize();
    }

    /** Human-readable cache statistics for monitoring/log lines. */
    public String getStats() {
        var stats = cache.stats();
        return String.format("size=%d, hitRate=%.1f%%, hits=%d, misses=%d",
            cache.estimatedSize(),
            stats.hitRate() * 100,
            stats.hitCount(),
            stats.missCount());
    }

    /**
     * Internal cache key. Uses LocalDateTime.toString() (full ISO-8601 including
     * seconds/nanos) instead of a minute-truncated pattern, so two distinct
     * buckets can never collide on the same key and silently overwrite each
     * other. 5-minute buckets are normally minute-aligned, so existing keys
     * keep the same "mmsi::2026-02-19T09:05" shape.
     */
    private String buildKey(String mmsi, LocalDateTime timeBucket) {
        return mmsi + "::" + timeBucket;
    }
}

파일 보기

@ -20,5 +20,16 @@ public class DataSourceConfigProperties {
private String username; private String username;
private String password; private String password;
private String driverClassName = "org.postgresql.Driver"; private String driverClassName = "org.postgresql.Driver";
private HikariProperties hikari = new HikariProperties();
}
    @Data
    public static class HikariProperties {
        // Optional HikariCP pool tuning. Values are applied programmatically
        // (applyHikariProps) rather than via @ConfigurationProperties binding;
        // any field left null falls back to HikariCP's built-in default.
        private String poolName;
        private Integer maximumPoolSize;
        private Integer minimumIdle;
        // Timeouts/lifetime in milliseconds (HikariCP convention) — TODO confirm config units
        private Long connectionTimeout;
        private Long idleTimeout;
        private Long maxLifetime;
} }
} }

파일 보기

@ -4,7 +4,6 @@ import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Primary;
@ -28,7 +27,6 @@ public class DevDataSourceConfig {
} }
@Bean @Bean
@ConfigurationProperties(prefix = "spring.datasource.collect.hikari")
public HikariConfig collectHikariConfig() { public HikariConfig collectHikariConfig() {
HikariConfig config = new HikariConfig(); HikariConfig config = new HikariConfig();
applyConnectionProps(config, properties.getCollect()); applyConnectionProps(config, properties.getCollect());
@ -53,7 +51,6 @@ public class DevDataSourceConfig {
} }
@Bean(name = "devQueryHikariConfig") @Bean(name = "devQueryHikariConfig")
@ConfigurationProperties(prefix = "spring.datasource.query.hikari")
public HikariConfig devQueryHikariConfig() { public HikariConfig devQueryHikariConfig() {
HikariConfig config = new HikariConfig(); HikariConfig config = new HikariConfig();
applyConnectionProps(config, properties.getQuery()); applyConnectionProps(config, properties.getQuery());
@ -78,7 +75,6 @@ public class DevDataSourceConfig {
} }
@Bean @Bean
@ConfigurationProperties(prefix = "spring.datasource.batch.hikari")
public HikariConfig batchHikariConfig() { public HikariConfig batchHikariConfig() {
HikariConfig config = new HikariConfig(); HikariConfig config = new HikariConfig();
applyConnectionProps(config, properties.getBatch()); applyConnectionProps(config, properties.getBatch());
@ -154,5 +150,16 @@ public class DevDataSourceConfig {
config.setUsername(props.getUsername()); config.setUsername(props.getUsername());
config.setPassword(props.getPassword()); config.setPassword(props.getPassword());
config.setDriverClassName(props.getDriverClassName()); config.setDriverClassName(props.getDriverClassName());
applyHikariProps(config, props.getHikari());
}
    // Copies only explicitly-configured pool values onto the HikariConfig;
    // a missing nested "hikari" section or null fields keep HikariCP defaults.
    private void applyHikariProps(HikariConfig config, DataSourceConfigProperties.HikariProperties hikari) {
        if (hikari == null) return;
        if (hikari.getPoolName() != null) config.setPoolName(hikari.getPoolName());
        if (hikari.getMaximumPoolSize() != null) config.setMaximumPoolSize(hikari.getMaximumPoolSize());
        if (hikari.getMinimumIdle() != null) config.setMinimumIdle(hikari.getMinimumIdle());
        if (hikari.getConnectionTimeout() != null) config.setConnectionTimeout(hikari.getConnectionTimeout());
        if (hikari.getIdleTimeout() != null) config.setIdleTimeout(hikari.getIdleTimeout());
        if (hikari.getMaxLifetime() != null) config.setMaxLifetime(hikari.getMaxLifetime());
} }
} }

파일 보기

@ -4,7 +4,6 @@ import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Primary;
@ -29,7 +28,6 @@ public class LocalDataSourceConfig {
} }
@Bean @Bean
@ConfigurationProperties(prefix = "spring.datasource.collect.hikari")
public HikariConfig localCollectHikariConfig() { public HikariConfig localCollectHikariConfig() {
HikariConfig config = new HikariConfig(); HikariConfig config = new HikariConfig();
applyConnectionProps(config, properties.getCollect()); applyConnectionProps(config, properties.getCollect());
@ -45,7 +43,6 @@ public class LocalDataSourceConfig {
} }
@Bean(name = "localQueryHikariConfig") @Bean(name = "localQueryHikariConfig")
@ConfigurationProperties(prefix = "spring.datasource.query.hikari")
public HikariConfig localQueryHikariConfig() { public HikariConfig localQueryHikariConfig() {
HikariConfig config = new HikariConfig(); HikariConfig config = new HikariConfig();
applyConnectionProps(config, properties.getQuery()); applyConnectionProps(config, properties.getQuery());
@ -61,7 +58,6 @@ public class LocalDataSourceConfig {
} }
@Bean @Bean
@ConfigurationProperties(prefix = "spring.datasource.batch.hikari")
public HikariConfig localBatchHikariConfig() { public HikariConfig localBatchHikariConfig() {
HikariConfig config = new HikariConfig(); HikariConfig config = new HikariConfig();
applyConnectionProps(config, properties.getBatch()); applyConnectionProps(config, properties.getBatch());
@ -128,5 +124,15 @@ public class LocalDataSourceConfig {
config.setUsername(props.getUsername()); config.setUsername(props.getUsername());
config.setPassword(props.getPassword()); config.setPassword(props.getPassword());
config.setDriverClassName(props.getDriverClassName()); config.setDriverClassName(props.getDriverClassName());
applyHikariProps(config, props.getHikari());
}
private void applyHikariProps(HikariConfig config, DataSourceConfigProperties.HikariProperties hikari) {
if (hikari.getPoolName() != null) config.setPoolName(hikari.getPoolName());
if (hikari.getMaximumPoolSize() != null) config.setMaximumPoolSize(hikari.getMaximumPoolSize());
if (hikari.getMinimumIdle() != null) config.setMinimumIdle(hikari.getMinimumIdle());
if (hikari.getConnectionTimeout() != null) config.setConnectionTimeout(hikari.getConnectionTimeout());
if (hikari.getIdleTimeout() != null) config.setIdleTimeout(hikari.getIdleTimeout());
if (hikari.getMaxLifetime() != null) config.setMaxLifetime(hikari.getMaxLifetime());
} }
} }

파일 보기

@ -4,7 +4,6 @@ import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Primary;
@ -28,7 +27,6 @@ public class ProdDataSourceConfig {
} }
@Bean @Bean
@ConfigurationProperties(prefix = "spring.datasource.collect.hikari")
public HikariConfig prodCollectHikariConfig() { public HikariConfig prodCollectHikariConfig() {
HikariConfig config = new HikariConfig(); HikariConfig config = new HikariConfig();
applyConnectionProps(config, properties.getCollect()); applyConnectionProps(config, properties.getCollect());
@ -45,7 +43,6 @@ public class ProdDataSourceConfig {
} }
@Bean(name = "prodQueryHikariConfig") @Bean(name = "prodQueryHikariConfig")
@ConfigurationProperties(prefix = "spring.datasource.query.hikari")
public HikariConfig prodQueryHikariConfig() { public HikariConfig prodQueryHikariConfig() {
HikariConfig config = new HikariConfig(); HikariConfig config = new HikariConfig();
applyConnectionProps(config, properties.getQuery()); applyConnectionProps(config, properties.getQuery());
@ -62,7 +59,6 @@ public class ProdDataSourceConfig {
} }
@Bean @Bean
@ConfigurationProperties(prefix = "spring.datasource.batch.hikari")
public HikariConfig prodBatchHikariConfig() { public HikariConfig prodBatchHikariConfig() {
HikariConfig config = new HikariConfig(); HikariConfig config = new HikariConfig();
applyConnectionProps(config, properties.getBatch()); applyConnectionProps(config, properties.getBatch());
@ -130,5 +126,15 @@ public class ProdDataSourceConfig {
config.setUsername(props.getUsername()); config.setUsername(props.getUsername());
config.setPassword(props.getPassword()); config.setPassword(props.getPassword());
config.setDriverClassName(props.getDriverClassName()); config.setDriverClassName(props.getDriverClassName());
applyHikariProps(config, props.getHikari());
}
private void applyHikariProps(HikariConfig config, DataSourceConfigProperties.HikariProperties hikari) {
if (hikari.getPoolName() != null) config.setPoolName(hikari.getPoolName());
if (hikari.getMaximumPoolSize() != null) config.setMaximumPoolSize(hikari.getMaximumPoolSize());
if (hikari.getMinimumIdle() != null) config.setMinimumIdle(hikari.getMinimumIdle());
if (hikari.getConnectionTimeout() != null) config.setConnectionTimeout(hikari.getConnectionTimeout());
if (hikari.getIdleTimeout() != null) config.setIdleTimeout(hikari.getIdleTimeout());
if (hikari.getMaxLifetime() != null) config.setMaxLifetime(hikari.getMaxLifetime());
} }
} }

파일 보기

@ -236,33 +236,6 @@ class AisTargetCacheManagerTest {
assertThat(cacheManager.getAllValues()).hasSize(2); assertThat(cacheManager.getAllValues()).hasSize(2);
} }
    @Test
    @DisplayName("getByTimeRange — 시간 범위 내 데이터만 반환")
    void getByTimeRange_filtersCorrectly() {
        // One entity inside the 60-minute window, one well outside it.
        OffsetDateTime recent = OffsetDateTime.now(ZoneOffset.UTC).minusMinutes(5);
        OffsetDateTime old = OffsetDateTime.now(ZoneOffset.UTC).minusMinutes(120);
        cacheManager.put(createEntity("111111111", recent));
        cacheManager.put(createEntity("222222222", old));
        List<AisTargetEntity> result = cacheManager.getByTimeRange(60);
        // Only the recent entity should survive the time-range filter.
        assertThat(result).hasSize(1);
        assertThat(result.get(0).getMmsi()).isEqualTo("111111111");
    }
    @Test
    @DisplayName("getByTimeRange — timestamp=null 데이터 제외")
    void getByTimeRange_excludesNullTimestamp() {
        OffsetDateTime recent = OffsetDateTime.now(ZoneOffset.UTC).minusMinutes(5);
        cacheManager.put(createEntity("111111111", recent));
        // An entity without a timestamp must be excluded, not treated as recent.
        cacheManager.put(createEntity("222222222", null));
        List<AisTargetEntity> result = cacheManager.getByTimeRange(60);
        assertThat(result).hasSize(1);
    }
} }
@Nested @Nested