diff --git a/src/main/java/gc/mda/signal_batch/batch/job/HourlyAggregationStepConfig.java b/src/main/java/gc/mda/signal_batch/batch/job/HourlyAggregationStepConfig.java index b1b8f80..840ae7d 100644 --- a/src/main/java/gc/mda/signal_batch/batch/job/HourlyAggregationStepConfig.java +++ b/src/main/java/gc/mda/signal_batch/batch/job/HourlyAggregationStepConfig.java @@ -1,14 +1,14 @@ package gc.mda.signal_batch.batch.job; import gc.mda.signal_batch.domain.vessel.model.VesselTrack; -import gc.mda.signal_batch.batch.processor.HourlyTrackProcessor; -import gc.mda.signal_batch.batch.processor.HourlyTrackProcessorWithAbnormalDetection; import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector; import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult; +import gc.mda.signal_batch.batch.processor.HourlyTrackMergeProcessor; +import gc.mda.signal_batch.batch.reader.CacheBasedHourlyTrackReader; +import gc.mda.signal_batch.batch.reader.FiveMinTrackCache; import gc.mda.signal_batch.batch.writer.VesselTrackBulkWriter; import gc.mda.signal_batch.batch.writer.AbnormalTrackWriter; import gc.mda.signal_batch.batch.writer.CompositeTrackWriter; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; @@ -29,12 +29,11 @@ import org.springframework.transaction.PlatformTransactionManager; import javax.sql.DataSource; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.List; @Slf4j @Configuration -@Profile("!query") // query 프로파일에서는 배치 작업 비활성화 +@Profile("!query") @ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true) public class HourlyAggregationStepConfig { @@ -44,6 +43,7 @@ public class HourlyAggregationStepConfig { private final VesselTrackBulkWriter vesselTrackBulkWriter; private final AbnormalTrackWriter abnormalTrackWriter; private final AbnormalTrackDetector abnormalTrackDetector; + private final FiveMinTrackCache fiveMinTrackCache; public HourlyAggregationStepConfig( JobRepository jobRepository, @@ -51,42 +51,78 @@ public class HourlyAggregationStepConfig { @Qualifier("queryTransactionManager") PlatformTransactionManager transactionManager, VesselTrackBulkWriter vesselTrackBulkWriter, AbnormalTrackWriter abnormalTrackWriter, - AbnormalTrackDetector abnormalTrackDetector) { + AbnormalTrackDetector abnormalTrackDetector, + FiveMinTrackCache fiveMinTrackCache) { this.jobRepository = jobRepository; this.queryDataSource = queryDataSource; this.transactionManager = transactionManager; this.vesselTrackBulkWriter = vesselTrackBulkWriter; this.abnormalTrackWriter = abnormalTrackWriter; this.abnormalTrackDetector = abnormalTrackDetector; + this.fiveMinTrackCache = fiveMinTrackCache; } - + @Value("${vessel.batch.chunk-size:5000}") private int chunkSize; - + + // ────────────────────────────────────────────── + // Step 1: 5분 → 시간 병합 (인메모리 캐시 기반) + // ────────────────────────────────────────────── + @Bean public Step mergeHourlyTracksStep() { - // 비정상 궤적 검출은 항상 활성화 (설정 파일로 제어) - boolean detectAbnormal = true; - - if (detectAbnormal) { - log.info("Building mergeHourlyTracksStep with abnormal detection enabled"); - return new StepBuilder("mergeHourlyTracksStep", jobRepository) - .chunk(chunkSize, transactionManager) - .reader(hourlyVesselKeyReader(null, null)) - .processor(hourlyTrackProcessorWithAbnormalDetection()) - .writer(hourlyCompositeTrackWriter()) - .build(); - } else { - log.info("Building mergeHourlyTracksStep without abnormal detection"); - return new StepBuilder("mergeHourlyTracksStep", jobRepository) - .chunk(chunkSize, transactionManager) - .reader(hourlyVesselKeyReader(null, null)) - .processor(hourlyTrackItemProcessor()) - .writer(hourlyTrackWriter()) - .build(); - } + log.info("Building mergeHourlyTracksStep with cache-based in-memory merge"); + return new StepBuilder("mergeHourlyTracksStep", jobRepository) + ., AbnormalDetectionResult>chunk(chunkSize, transactionManager) + .reader(cacheBasedHourlyTrackReader(null, null)) + .processor(hourlyTrackMergeProcessor(null)) + .writer(hourlyCompositeTrackWriter()) + .build(); } - + + @Bean + @StepScope + public CacheBasedHourlyTrackReader cacheBasedHourlyTrackReader( + @Value("#{jobParameters['startTime']}") String startTime, + @Value("#{jobParameters['endTime']}") String endTime) { + + LocalDateTime start = LocalDateTime.parse(startTime); + LocalDateTime end = LocalDateTime.parse(endTime); + + return new CacheBasedHourlyTrackReader( + fiveMinTrackCache, + new JdbcTemplate(queryDataSource), + start, end); + } + + @Bean + @StepScope + public HourlyTrackMergeProcessor hourlyTrackMergeProcessor( + @Value("#{jobParameters['timeBucket']}") String timeBucket) { + + LocalDateTime hourBucket = LocalDateTime.parse(timeBucket) + .withMinute(0).withSecond(0).withNano(0); + + return new HourlyTrackMergeProcessor( + abnormalTrackDetector, + new JdbcTemplate(queryDataSource), + hourBucket); + } + + @Bean + public ItemWriter hourlyCompositeTrackWriter() { + abnormalTrackWriter.setJobName("hourlyAggregationJob"); + return new CompositeTrackWriter( + vesselTrackBulkWriter, + abnormalTrackWriter, + "hourly" + ); + } + + // ────────────────────────────────────────────── + // Step 2: Grid Hourly Summary + // ────────────────────────────────────────────── + @Bean public Step gridHourlySummaryStep() { return new StepBuilder("gridHourlySummaryStep", jobRepository) @@ -96,85 +132,23 @@ public class HourlyAggregationStepConfig { .writer(hourlyGridWriter(null, null)) .build(); } - - @Bean - public Step areaHourlySummaryStep() { - return new StepBuilder("areaHourlySummaryStep", jobRepository) - .chunk(100, transactionManager) - .reader(hourlyAreaReader(null, null)) - .processor(hourlyAreaProcessor()) - .writer(hourlyAreaWriter(null, null)) - .build(); - } - - @Bean - @StepScope - public JdbcCursorItemReader hourlyVesselKeyReader( - @Value("#{jobParameters['startTime']}") String startTime, - @Value("#{jobParameters['endTime']}") String endTime) { - - LocalDateTime start = LocalDateTime.parse(startTime); - LocalDateTime end = LocalDateTime.parse(endTime); - - String sql = """ - SELECT DISTINCT mmsi, date_trunc('hour', time_bucket) as hour_bucket - FROM signal.t_vessel_tracks_5min - WHERE time_bucket >= ? AND time_bucket < ? - ORDER BY mmsi, hour_bucket - """; - return new JdbcCursorItemReaderBuilder() - .name("hourlyVesselKeyReader") - .dataSource(queryDataSource) - .sql(sql) - .preparedStatementSetter(ps -> { - ps.setTimestamp(1, java.sql.Timestamp.valueOf(start)); - ps.setTimestamp(2, java.sql.Timestamp.valueOf(end)); - }) - .rowMapper((rs, rowNum) -> new VesselTrack.VesselKey( - rs.getString("mmsi"), - rs.getObject("hour_bucket", LocalDateTime.class) - )) - .build(); - } - - @Bean - public ItemProcessor hourlyTrackItemProcessor() { - return new HourlyTrackProcessor(queryDataSource, new JdbcTemplate(queryDataSource)); - } - - @Bean - public ItemWriter hourlyTrackWriter() { - return items -> { - List tracks = new ArrayList<>(); - for (VesselTrack track : items) { - if (track != null) { - tracks.add(track); - } - } - if (!tracks.isEmpty()) { - vesselTrackBulkWriter.writeHourlyTracks(tracks); - } - }; - } - - // Grid summary reader @Bean @StepScope public JdbcCursorItemReader hourlyGridReader( @Value("#{jobParameters['startTime']}") String startTime, @Value("#{jobParameters['endTime']}") String endTime) { - + LocalDateTime start = LocalDateTime.parse(startTime); LocalDateTime end = LocalDateTime.parse(endTime); - + String sql = """ SELECT DISTINCT haegu_no FROM signal.t_grid_vessel_tracks WHERE time_bucket >= ? AND time_bucket < ? ORDER BY haegu_no """; - + return new JdbcCursorItemReaderBuilder() .name("hourlyGridReader") .dataSource(queryDataSource) @@ -186,39 +160,36 @@ public class HourlyAggregationStepConfig { .rowMapper((rs, rowNum) -> rs.getInt("haegu_no")) .build(); } - + @Bean public ItemProcessor hourlyGridProcessor() { - return new ItemProcessor() { - @Override - public HourlyGridSummary process(Integer haeguNo) throws Exception { - HourlyGridSummary summary = new HourlyGridSummary(); - summary.haeguNo = haeguNo; - return summary; - } + return haeguNo -> { + HourlyGridSummary summary = new HourlyGridSummary(); + summary.haeguNo = haeguNo; + return summary; }; } - + @Bean @StepScope public ItemWriter hourlyGridWriter( @Value("#{jobParameters['startTime']}") String startTime, @Value("#{jobParameters['endTime']}") String endTime) { - + return items -> { LocalDateTime start = LocalDateTime.parse(startTime); LocalDateTime end = LocalDateTime.parse(endTime); LocalDateTime hourBucket = start.withMinute(0).withSecond(0).withNano(0); - + JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); - + for (HourlyGridSummary summary : items) { if (summary == null) continue; - + String sql = """ INSERT INTO signal.t_grid_tracks_summary_hourly (haegu_no, time_bucket, total_vessels, total_distance_nm, avg_speed, vessel_list, created_at) - SELECT + SELECT haegu_no, ?::timestamp as time_bucket, COUNT(DISTINCT mmsi) as total_vessels, @@ -242,29 +213,42 @@ public class HourlyAggregationStepConfig { vessel_list = EXCLUDED.vessel_list, created_at = NOW() """; - + jdbcTemplate.update(sql, hourBucket, summary.haeguNo, start, end); } }; } - - // Area summary reader + + // ────────────────────────────────────────────── + // Step 3: Area Hourly Summary + // ────────────────────────────────────────────── + + @Bean + public Step areaHourlySummaryStep() { + return new StepBuilder("areaHourlySummaryStep", jobRepository) + .chunk(100, transactionManager) + .reader(hourlyAreaReader(null, null)) + .processor(hourlyAreaProcessor()) + .writer(hourlyAreaWriter(null, null)) + .build(); + } + @Bean @StepScope public JdbcCursorItemReader hourlyAreaReader( @Value("#{jobParameters['startTime']}") String startTime, @Value("#{jobParameters['endTime']}") String endTime) { - + LocalDateTime start = LocalDateTime.parse(startTime); LocalDateTime end = LocalDateTime.parse(endTime); - + String sql = """ SELECT DISTINCT area_id FROM signal.t_area_vessel_tracks WHERE time_bucket >= ? AND time_bucket < ? ORDER BY area_id """; - + return new JdbcCursorItemReaderBuilder() .name("hourlyAreaReader") .dataSource(queryDataSource) @@ -276,39 +260,36 @@ public class HourlyAggregationStepConfig { .rowMapper((rs, rowNum) -> rs.getString("area_id")) .build(); } - + @Bean public ItemProcessor hourlyAreaProcessor() { - return new ItemProcessor() { - @Override - public HourlyAreaSummary process(String areaId) throws Exception { - HourlyAreaSummary summary = new HourlyAreaSummary(); - summary.areaId = areaId; - return summary; - } + return areaId -> { + HourlyAreaSummary summary = new HourlyAreaSummary(); + summary.areaId = areaId; + return summary; }; } - + @Bean @StepScope public ItemWriter hourlyAreaWriter( @Value("#{jobParameters['startTime']}") String startTime, @Value("#{jobParameters['endTime']}") String endTime) { - + return items -> { LocalDateTime start = LocalDateTime.parse(startTime); LocalDateTime end = LocalDateTime.parse(endTime); LocalDateTime hourBucket = start.withMinute(0).withSecond(0).withNano(0); - + JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); - + for (HourlyAreaSummary summary : items) { if (summary == null) continue; - + String sql = """ INSERT INTO signal.t_area_tracks_summary_hourly (area_id, time_bucket, total_vessels, total_distance_nm, avg_speed, vessel_list, created_at) - SELECT + SELECT area_id, ?::timestamp as time_bucket, COUNT(DISTINCT mmsi) as total_vessels, @@ -332,39 +313,18 @@ public class HourlyAggregationStepConfig { vessel_list = EXCLUDED.vessel_list, created_at = NOW() """; - + jdbcTemplate.update(sql, hourBucket, summary.areaId, start, end); } }; } - - // 비정상 궤적 검출 관련 빈 정의 - @Bean - public ItemProcessor hourlyTrackProcessorWithAbnormalDetection() { - return new HourlyTrackProcessorWithAbnormalDetection( - hourlyTrackItemProcessor(), - abnormalTrackDetector, - queryDataSource - ); - } - - @Bean - public ItemWriter hourlyCompositeTrackWriter() { - // Job 이름 직접 설정 - abnormalTrackWriter.setJobName("hourlyAggregationJob"); - return new CompositeTrackWriter( - vesselTrackBulkWriter, - abnormalTrackWriter, - "hourly" - ); - } - + // Summary 클래스들 public static class HourlyGridSummary { public Integer haeguNo; } - + public static class HourlyAreaSummary { public String areaId; } -} \ No newline at end of file +} diff --git a/src/main/java/gc/mda/signal_batch/batch/job/VesselTrackStepConfig.java b/src/main/java/gc/mda/signal_batch/batch/job/VesselTrackStepConfig.java index 2211d32..2047059 100644 --- a/src/main/java/gc/mda/signal_batch/batch/job/VesselTrackStepConfig.java +++ b/src/main/java/gc/mda/signal_batch/batch/job/VesselTrackStepConfig.java @@ -9,6 +9,7 @@ import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector; import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult; import gc.mda.signal_batch.batch.reader.AisTargetCacheManager; import gc.mda.signal_batch.batch.reader.CacheBasedVesselTrackDataReader; +import gc.mda.signal_batch.batch.reader.FiveMinTrackCache; import gc.mda.signal_batch.global.util.TrackClippingUtils; import gc.mda.signal_batch.batch.writer.VesselTrackBulkWriter; import gc.mda.signal_batch.batch.writer.AbnormalTrackWriter; @@ -59,6 +60,7 @@ public class VesselTrackStepConfig { private final AbnormalTrackDetector abnormalTrackDetector; private final AbnormalTrackWriter abnormalTrackWriter; private final VesselPreviousBucketCache previousBucketCache; + private final FiveMinTrackCache fiveMinTrackCache; // 현재 처리 중인 버킷의 종료 위치 저장 (캐시 업데이트용) private final Map currentBucketEndPositions = new ConcurrentHashMap<>(); @@ -73,7 +75,8 @@ public class VesselTrackStepConfig { TrackClippingUtils trackClippingUtils, AbnormalTrackDetector abnormalTrackDetector, AbnormalTrackWriter abnormalTrackWriter, - VesselPreviousBucketCache previousBucketCache) { + VesselPreviousBucketCache previousBucketCache, + FiveMinTrackCache fiveMinTrackCache) { this.jobRepository = jobRepository; this.transactionManager = transactionManager; this.queryDataSource = queryDataSource; @@ -84,6 +87,7 @@ public class VesselTrackStepConfig { this.abnormalTrackDetector = abnormalTrackDetector; this.abnormalTrackWriter = abnormalTrackWriter; this.previousBucketCache = previousBucketCache; + this.fiveMinTrackCache = fiveMinTrackCache; } @Value("${vessel.batch.chunk-size:1000}") @@ -302,7 +306,18 @@ public class VesselTrackStepConfig { // 1. 기존 Writer로 DB 저장 vesselTrackBulkWriter.write(chunk); - // 2. 캐시 업데이트 (현재 버킷 종료 위치) + // 2. FiveMinTrackCache에 저장 (hourly 인메모리 병합용) + int cachedCount = 0; + for (List trackGroup : chunk.getItems()) { + fiveMinTrackCache.putAll(trackGroup); + cachedCount += trackGroup.size(); + } + if (cachedCount > 0) { + log.debug("FiveMinTrackCache 저장: {} 건 (총 캐시: {} 건)", + cachedCount, fiveMinTrackCache.size()); + } + + // 3. 이전 버킷 종료 위치 캐시 업데이트 if (!currentBucketEndPositions.isEmpty()) { List positions = new ArrayList<>(currentBucketEndPositions.values()); previousBucketCache.putAll(positions); diff --git a/src/main/java/gc/mda/signal_batch/batch/processor/HourlyTrackMergeProcessor.java b/src/main/java/gc/mda/signal_batch/batch/processor/HourlyTrackMergeProcessor.java new file mode 100644 index 0000000..b0561bd --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/batch/processor/HourlyTrackMergeProcessor.java @@ -0,0 +1,271 @@ +package gc.mda.signal_batch.batch.processor; + +import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult; +import gc.mda.signal_batch.domain.vessel.model.VesselTrack; +import gc.mda.signal_batch.global.util.LineStringMUtils; +import gc.mda.signal_batch.global.util.TrackSimplificationUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * 인메모리 기반 Hourly Track 병합 프로세서 + * + * 5분 트랙 리스트를 받아: + * 1. WKT 좌표 연결 (Java String) + * 2. 통계 집계 (distance, speed, pointCount) + * 3. 간소화 (TrackSimplificationUtils) + * 4. 비정상 검출 (이전 버킷 1회 bulk prefetch) + * + * N+1 SQL 제거 → DB 쿼리 최대 1회 (비정상 검출용 이전 버킷 prefetch) + */ +@Slf4j +public class HourlyTrackMergeProcessor implements ItemProcessor, AbnormalDetectionResult> { + + private static final Pattern WKT_COORDS_PATTERN = Pattern.compile("LINESTRING M\\((.+)\\)"); + private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + private final AbnormalTrackDetector abnormalTrackDetector; + private final JdbcTemplate queryJdbcTemplate; + private final LocalDateTime hourBucket; + + // Lazy-init: 이전 버킷 데이터 1회 bulk prefetch + private Map previousBucketCache; + private boolean previousBucketLoaded = false; + + public HourlyTrackMergeProcessor( + AbnormalTrackDetector abnormalTrackDetector, + JdbcTemplate queryJdbcTemplate, + LocalDateTime hourBucket) { + this.abnormalTrackDetector = abnormalTrackDetector; + this.queryJdbcTemplate = queryJdbcTemplate; + this.hourBucket = hourBucket; + } + + @Override + public AbnormalDetectionResult process(List fiveMinTracks) throws Exception { + if (fiveMinTracks == null || fiveMinTracks.isEmpty()) { + return null; + } + + String mmsi = fiveMinTracks.get(0).getMmsi(); + + // Step 1: WKT 좌표 병합 + String mergedWkt = mergeTrackGeometries(fiveMinTracks); + if (mergedWkt == null) { + log.debug("병합 실패 (유효한 좌표 없음): mmsi={}", mmsi); + return null; + } + + // Step 2: 통계 집계 + BigDecimal totalDistance = BigDecimal.ZERO; + BigDecimal maxSpeed = BigDecimal.ZERO; + int totalPoints = 0; + + for (VesselTrack track : fiveMinTracks) { + if (track.getDistanceNm() != null) { + totalDistance = totalDistance.add(track.getDistanceNm()); + } + if (track.getMaxSpeed() != null && track.getMaxSpeed().compareTo(maxSpeed) > 0) { + maxSpeed = track.getMaxSpeed(); + } + if (track.getPointCount() != null) { + totalPoints += track.getPointCount(); + } + } + + // avgSpeed: M값 기반 시간 차이로 계산 + BigDecimal avgSpeed = calculateAvgSpeed(mergedWkt, totalDistance); + + VesselTrack.TrackPosition startPos = fiveMinTracks.get(0).getStartPosition(); + VesselTrack.TrackPosition endPos = fiveMinTracks.get(fiveMinTracks.size() - 1).getEndPosition(); + + // Step 3: 간소화 + String simplifiedWkt = TrackSimplificationUtils.simplifyHourlyTrack(mergedWkt); + int simplifiedPoints = countWktPoints(simplifiedWkt); + + if (!mergedWkt.equals(simplifiedWkt)) { + TrackSimplificationUtils.SimplificationStats stats = + TrackSimplificationUtils.getSimplificationStats(mergedWkt, simplifiedWkt); + log.debug("시간별 궤적 간소화 - vessel: {}, 원본: {}포인트, 간소화: {}포인트 ({}% 감소)", + mmsi, stats.originalPoints, stats.simplifiedPoints, (int) stats.reductionRate); + } + + VesselTrack hourlyTrack = VesselTrack.builder() + .mmsi(mmsi) + .timeBucket(hourBucket) + .trackGeom(simplifiedWkt) + .distanceNm(totalDistance) + .avgSpeed(avgSpeed) + .maxSpeed(maxSpeed) + .pointCount(simplifiedPoints > 0 ? simplifiedPoints : totalPoints) + .startPosition(startPos) + .endPosition(endPos) + .build(); + + // Step 4: 비정상 검출 (lazy-init, 1회 bulk prefetch) + if (!previousBucketLoaded) { + previousBucketCache = bulkFetchPreviousBucketTracks(); + previousBucketLoaded = true; + } + + VesselTrack prevTrack = previousBucketCache.get(mmsi); + AbnormalDetectionResult result = abnormalTrackDetector.detectBucketTransitionOnly(hourlyTrack, prevTrack); + + if (result.hasAbnormalities()) { + log.debug("Hourly 비정상 궤적 검출: mmsi={}, segments={}", + mmsi, result.getAbnormalSegments().size()); + } + + return result; + } + + /** + * 5분 트랙들의 WKT 좌표를 하나로 연결 + */ + private String mergeTrackGeometries(List tracks) { + StringBuilder allCoords = new StringBuilder(); + + for (VesselTrack track : tracks) { + String wkt = track.getTrackGeom(); + if (wkt == null || wkt.isEmpty()) continue; + + Matcher matcher = WKT_COORDS_PATTERN.matcher(wkt); + if (matcher.find()) { + String coords = matcher.group(1); + if (!coords.isBlank()) { + if (allCoords.length() > 0) { + allCoords.append(", "); + } + allCoords.append(coords); + } + } + } + + if (allCoords.length() == 0) { + return null; + } + + return "LINESTRING M(" + allCoords + ")"; + } + + /** + * M값(Unix timestamp) 기반 평균 속도 계산 + */ + private BigDecimal calculateAvgSpeed(String wkt, BigDecimal totalDistance) { + try { + Matcher matcher = WKT_COORDS_PATTERN.matcher(wkt); + if (!matcher.find()) return BigDecimal.ZERO; + + String coords = matcher.group(1); + String[] points = coords.split(","); + if (points.length < 2) return BigDecimal.ZERO; + + // 첫 번째 포인트의 M값 + String[] firstParts = points[0].trim().split("\\s+"); + double firstM = firstParts.length >= 3 ? Double.parseDouble(firstParts[2]) : 0; + + // 마지막 포인트의 M값 + String[] lastParts = points[points.length - 1].trim().split("\\s+"); + double lastM = lastParts.length >= 3 ? Double.parseDouble(lastParts[2]) : 0; + + double timeDiffSeconds = lastM - firstM; + if (timeDiffSeconds <= 0) return BigDecimal.ZERO; + + double timeDiffHours = timeDiffSeconds / 3600.0; + double avgSpeedVal = totalDistance.doubleValue() / timeDiffHours; + + // 비현실적 속도 제한 + avgSpeedVal = Math.min(avgSpeedVal, 9999.99); + + return BigDecimal.valueOf(avgSpeedVal).setScale(2, RoundingMode.HALF_UP); + } catch (Exception e) { + log.debug("avgSpeed 계산 실패: {}", e.getMessage()); + return BigDecimal.ZERO; + } + } + + private int countWktPoints(String wkt) { + if (wkt == null || !wkt.startsWith("LINESTRING M")) return 0; + try { + String coords = wkt.substring("LINESTRING M(".length(), wkt.length() - 1); + return coords.split(",").length; + } catch (Exception e) { + return 0; + } + } + + /** + * 비정상 검출용 — 이전 1시간의 MMSI별 마지막 5분 트랙 bulk prefetch + */ + private Map bulkFetchPreviousBucketTracks() { + LocalDateTime prevStart = hourBucket.minusHours(1); + LocalDateTime prevEnd = hourBucket; + + String sql = """ + SELECT DISTINCT ON (mmsi) + mmsi, time_bucket, end_position, + public.ST_AsText(public.ST_LineSubstring(track_geom, 0.9, 1.0)) as last_segment + FROM signal.t_vessel_tracks_5min + WHERE time_bucket >= ? AND time_bucket < ? + AND track_geom IS NOT NULL + ORDER BY mmsi, time_bucket DESC + """; + + Map result = new HashMap<>(); + + try { + queryJdbcTemplate.query(sql, + ps -> { + ps.setTimestamp(1, Timestamp.valueOf(prevStart)); + ps.setTimestamp(2, Timestamp.valueOf(prevEnd)); + }, + rs -> { + String mmsi = rs.getString("mmsi"); + VesselTrack track = VesselTrack.builder() + .mmsi(mmsi) + .timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime()) + .trackGeom(rs.getString("last_segment")) + .endPosition(parseEndPosition(rs.getString("end_position"))) + .build(); + result.put(mmsi, track); + }); + + log.info("이전 버킷 트랙 prefetch 완료: {} 선박 (기간: {} ~ {})", + result.size(), prevStart, prevEnd); + } catch (Exception e) { + log.warn("이전 버킷 트랙 prefetch 실패 (첫 실행일 수 있음): {}", e.getMessage()); + } + + return result; + } + + private VesselTrack.TrackPosition parseEndPosition(String json) { + if (json == null) return null; + try { + String lat = LineStringMUtils.extractJsonValue(json, "lat"); + String lon = LineStringMUtils.extractJsonValue(json, "lon"); + String time = LineStringMUtils.extractJsonValue(json, "time"); + String sog = LineStringMUtils.extractJsonValue(json, "sog"); + + return VesselTrack.TrackPosition.builder() + .lat(lat != null ? Double.parseDouble(lat) : null) + .lon(lon != null ? Double.parseDouble(lon) : null) + .time(time != null ? LocalDateTime.parse(time, TIMESTAMP_FORMATTER) : null) + .sog(sog != null ? new BigDecimal(sog) : null) + .build(); + } catch (Exception e) { + log.debug("end_position 파싱 실패: {}", json); + return null; + } + } +} diff --git a/src/main/java/gc/mda/signal_batch/batch/processor/HourlyTrackProcessor.java b/src/main/java/gc/mda/signal_batch/batch/processor/HourlyTrackProcessor.java deleted file mode 100644 index a32de02..0000000 --- a/src/main/java/gc/mda/signal_batch/batch/processor/HourlyTrackProcessor.java +++ /dev/null @@ -1,194 +0,0 @@ -package gc.mda.signal_batch.batch.processor; - -import gc.mda.signal_batch.domain.vessel.model.VesselTrack; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.batch.item.ItemProcessor; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.jdbc.core.JdbcTemplate; - -import java.math.BigDecimal; -import java.sql.ResultSet; -import java.sql.Timestamp; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import gc.mda.signal_batch.global.util.LineStringMUtils; -import gc.mda.signal_batch.global.util.TrackSimplificationUtils; -import javax.sql.DataSource; - -@Slf4j -@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true) -@RequiredArgsConstructor -public class HourlyTrackProcessor implements ItemProcessor { - - private final DataSource queryDataSource; - private final JdbcTemplate jdbcTemplate; - private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - - @Override - public VesselTrack process(VesselTrack.VesselKey vesselKey) throws Exception { - LocalDateTime hourBucket = vesselKey.getTimeBucket() - .withMinute(0) - .withSecond(0) - .withNano(0); - - String sql = """ - WITH ordered_tracks AS ( - SELECT * - FROM signal.t_vessel_tracks_5min - WHERE mmsi = ? - AND time_bucket >= ? - AND time_bucket < ? - AND track_geom IS NOT NULL - AND public.ST_NPoints(track_geom) > 0 - ORDER BY time_bucket - ), - merged_coords AS ( - SELECT - mmsi, - string_agg( - substring(public.ST_AsText(track_geom) from 'M \\((.+)\\)'), - ',' - ORDER BY time_bucket - ) FILTER (WHERE track_geom IS NOT NULL) as all_coords - FROM ordered_tracks - GROUP BY mmsi - ), - merged_tracks AS ( - SELECT - mc.mmsi, - TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS') as time_bucket, - public.ST_GeomFromText('LINESTRING M(' || mc.all_coords || ')') as merged_geom, - (SELECT MAX(max_speed) FROM ordered_tracks WHERE mmsi = mc.mmsi) as max_speed, - (SELECT SUM(point_count) FROM ordered_tracks WHERE mmsi = mc.mmsi) as total_points, - (SELECT MIN(time_bucket) FROM ordered_tracks WHERE mmsi = mc.mmsi) as start_time, - (SELECT MAX(time_bucket) FROM ordered_tracks WHERE mmsi = mc.mmsi) as end_time, - (SELECT start_position FROM ordered_tracks WHERE mmsi = mc.mmsi ORDER BY time_bucket LIMIT 1) as start_pos, - (SELECT end_position FROM ordered_tracks WHERE mmsi = mc.mmsi ORDER BY time_bucket DESC LIMIT 1) as end_pos - FROM merged_coords mc - ), - calculated_tracks AS ( - SELECT - *, - public.ST_Length(merged_geom::geography) / 1852.0 as total_distance, - CASE - WHEN public.ST_NPoints(merged_geom) > 0 THEN - public.ST_M(public.ST_PointN(merged_geom, public.ST_NPoints(merged_geom))) - - public.ST_M(public.ST_PointN(merged_geom, 1)) - ELSE - EXTRACT(EPOCH FROM - TO_TIMESTAMP(end_pos->>'time', 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(start_pos->>'time', 'YYYY-MM-DD HH24:MI:SS') - ) - END as time_diff_seconds - FROM merged_tracks - ) - SELECT - mmsi, - time_bucket, - merged_geom, - total_distance, - CASE - WHEN time_diff_seconds > 0 THEN - CAST(LEAST((total_distance / (time_diff_seconds / 3600.0)), 9999.99) AS numeric(6,2)) - ELSE 0 - END as avg_speed, - max_speed, - total_points, - start_time, - end_time, - start_pos, - end_pos, - public.ST_AsText(merged_geom) as geom_text - FROM calculated_tracks - """; - - LocalDateTime startTime = hourBucket; - LocalDateTime endTime = hourBucket.plusHours(1); - - Timestamp startTimestamp = Timestamp.valueOf(startTime); - Timestamp endTimestamp = Timestamp.valueOf(endTime); - Timestamp hourBucketTimestamp = Timestamp.valueOf(hourBucket); - - log.debug("HourlyTrackProcessor params - mmsi: {}, startTime: {}, endTime: {}, hourBucket: {}", - vesselKey.getMmsi(), startTimestamp, endTimestamp, hourBucketTimestamp); - - try { - return jdbcTemplate.queryForObject(sql, - (rs, rowNum) -> { - try { - return buildHourlyTrack(rs, hourBucket); - } catch (Exception e) { - throw new RuntimeException("Failed to build hourly track", e); - } - }, - vesselKey.getMmsi(), - startTimestamp, endTimestamp, hourBucketTimestamp - ); - } catch (org.springframework.dao.EmptyResultDataAccessException e) { - log.warn("No 5min data found for vessel {} in time range {}-{}, skipping hourly aggregation", - vesselKey.getMmsi(), startTimestamp, endTimestamp); - return null; - } catch (Exception e) { - log.error("Failed to process hourly track for vessel {}: {}", - vesselKey.getMmsi(), e.getMessage(), e); - return null; - } - } - - private VesselTrack buildHourlyTrack(ResultSet rs, LocalDateTime hourBucket) throws Exception { - VesselTrack.TrackPosition startPos = null; - VesselTrack.TrackPosition endPos = null; - - String startPosJson = rs.getString("start_pos"); - String endPosJson = rs.getString("end_pos"); - - if (startPosJson != null) { - startPos = parseTrackPosition(startPosJson); - } - if (endPosJson != null) { - endPos = parseTrackPosition(endPosJson); - } - - String hourlyLineStringM = rs.getString("geom_text"); - String simplifiedLineStringM = TrackSimplificationUtils.simplifyHourlyTrack(hourlyLineStringM); - - if (!hourlyLineStringM.equals(simplifiedLineStringM)) { - TrackSimplificationUtils.SimplificationStats stats = - TrackSimplificationUtils.getSimplificationStats(hourlyLineStringM, simplifiedLineStringM); - log.debug("시간별 궤적 간소화 - vessel: {}, 원본: {}포인트, 간소화: {}포인트 ({}% 감소)", - rs.getString("mmsi"), - stats.originalPoints, stats.simplifiedPoints, (int)stats.reductionRate); - } - - return VesselTrack.builder() - .mmsi(rs.getString("mmsi")) - .timeBucket(hourBucket) - .trackGeom(simplifiedLineStringM) - .distanceNm(rs.getBigDecimal("total_distance")) - .avgSpeed(rs.getBigDecimal("avg_speed")) - .maxSpeed(rs.getBigDecimal("max_speed")) - .pointCount(rs.getInt("total_points")) - .startPosition(startPos) - .endPosition(endPos) - .build(); - } - - private VesselTrack.TrackPosition parseTrackPosition(String json) { - try { - String latStr = LineStringMUtils.extractJsonValue(json, "lat"); - String lonStr = LineStringMUtils.extractJsonValue(json, "lon"); - String timeStr = LineStringMUtils.extractJsonValue(json, "time"); - String sogStr = LineStringMUtils.extractJsonValue(json, "sog"); - - return VesselTrack.TrackPosition.builder() - .lat(latStr != null ? Double.parseDouble(latStr) : null) - .lon(lonStr != null ? Double.parseDouble(lonStr) : null) - .time(timeStr != null ? LocalDateTime.parse(timeStr, TIMESTAMP_FORMATTER) : null) - .sog(sogStr != null ? new BigDecimal(sogStr) : null) - .build(); - } catch (Exception e) { - log.error("Failed to parse track position: {}", json, e); - return null; - } - } -} diff --git a/src/main/java/gc/mda/signal_batch/batch/processor/HourlyTrackProcessorWithAbnormalDetection.java b/src/main/java/gc/mda/signal_batch/batch/processor/HourlyTrackProcessorWithAbnormalDetection.java deleted file mode 100644 index 1fc7337..0000000 --- a/src/main/java/gc/mda/signal_batch/batch/processor/HourlyTrackProcessorWithAbnormalDetection.java +++ /dev/null @@ -1,38 +0,0 @@ -package gc.mda.signal_batch.batch.processor; - -import gc.mda.signal_batch.domain.vessel.model.VesselTrack; -import lombok.extern.slf4j.Slf4j; -import org.springframework.batch.item.ItemProcessor; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import javax.sql.DataSource; -import java.time.LocalDateTime; - -/** - * 시간별 궤적 프로세서 - 비정상 궤적 검출 기능 포함 - */ -@Slf4j -@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true) -public class HourlyTrackProcessorWithAbnormalDetection extends BaseTrackProcessorWithAbnormalDetection { - - public HourlyTrackProcessorWithAbnormalDetection( - ItemProcessor hourlyTrackProcessor, - AbnormalTrackDetector abnormalTrackDetector, - DataSource queryDataSource) { - super(hourlyTrackProcessor, abnormalTrackDetector, queryDataSource); - } - - @Override - protected String getPreviousTrackTableName() { - return "signal.t_vessel_tracks_5min"; - } - - @Override - protected LocalDateTime getNormalizedBucket(LocalDateTime timeBucket) { - return timeBucket.withMinute(0).withSecond(0).withNano(0); - } - - @Override - protected LocalDateTime getPreviousBucket(LocalDateTime currentBucket) { - return currentBucket.minusHours(1); - } -} \ No newline at end of file diff --git a/src/main/java/gc/mda/signal_batch/batch/reader/CacheBasedHourlyTrackReader.java b/src/main/java/gc/mda/signal_batch/batch/reader/CacheBasedHourlyTrackReader.java new file mode 100644 index 0000000..409116f --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/batch/reader/CacheBasedHourlyTrackReader.java @@ -0,0 +1,181 @@ +package gc.mda.signal_batch.batch.reader; + +import gc.mda.signal_batch.domain.vessel.model.VesselTrack; +import gc.mda.signal_batch.global.util.LineStringMUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.item.ItemReader; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; + +/** + * 캐시 기반 Hourly Track Reader + * + * FiveMinTrackCache에서 5분 트랙을 MMSI별로 읽어 반환. + * 캐시에 없는 MMSI는 DB fallback으로 보충. + * + * 정상 운영 시: DB 쿼리 1회 (DISTINCT mmsi 완전성 확인) + * 앱 재시작 후: DB 쿼리 2회 (완전성 확인 + fallback 벌크) + */ +@Slf4j +public class CacheBasedHourlyTrackReader implements ItemReader> { + + private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + private final FiveMinTrackCache fiveMinTrackCache; + private final JdbcTemplate queryJdbcTemplate; + private final LocalDateTime startTime; + private final LocalDateTime endTime; + + private Iterator> groupIterator; + private boolean initialized = false; + + public CacheBasedHourlyTrackReader( + FiveMinTrackCache fiveMinTrackCache, + JdbcTemplate queryJdbcTemplate, + LocalDateTime startTime, + LocalDateTime endTime) { + this.fiveMinTrackCache = fiveMinTrackCache; + this.queryJdbcTemplate = queryJdbcTemplate; + this.startTime = startTime; + this.endTime = endTime; + } + + @Override + public List read() { + if (!initialized) { + initialize(); + initialized = true; + } + + if (groupIterator != null && groupIterator.hasNext()) { + return groupIterator.next(); + } + return null; + } + + private void initialize() { + // 1. 캐시에서 데이터 로드 + Map> cacheData = fiveMinTrackCache.getTracksInRange(startTime, endTime); + log.info("Hourly Reader 초기화 — 캐시에서 {} 선박 로드 (기간: {} ~ {})", + cacheData.size(), startTime, endTime); + + // 2. DB에서 해당 기간 MMSI 목록 조회 (완전성 확인) + String distinctSql = """ + SELECT DISTINCT mmsi FROM signal.t_vessel_tracks_5min + WHERE time_bucket >= ? AND time_bucket < ? + """; + + List dbMmsiList = queryJdbcTemplate.queryForList( + distinctSql, String.class, + Timestamp.valueOf(startTime), Timestamp.valueOf(endTime)); + Set dbMmsiSet = new HashSet<>(dbMmsiList); + + // 3. 캐시에 없는 MMSI 감지 + Set cacheMmsiSet = cacheData.keySet(); + Set missingMmsi = new HashSet<>(dbMmsiSet); + missingMmsi.removeAll(cacheMmsiSet); + + if (!missingMmsi.isEmpty()) { + log.info("캐시 미스 {} 선박 → DB fallback", missingMmsi.size()); + Map> fallbackData = fetchFromDb(missingMmsi); + + // 캐시 데이터와 병합 + Map> merged = new LinkedHashMap<>(cacheData); + merged.putAll(fallbackData); + cacheData = merged; + } + + // 캐시에만 있고 DB에 없는 경우 (stale 캐시) → 제거 + int staleCount = 0; + Iterator it = cacheData.keySet().iterator(); + while (it.hasNext()) { + if (!dbMmsiSet.contains(it.next())) { + it.remove(); + staleCount++; + } + } + if (staleCount > 0) { + log.debug("Stale 캐시 항목 {} 건 제거", staleCount); + } + + log.info("Hourly Reader 준비 완료 — 총 {} 선박 (캐시: {}, DB fallback: {})", + cacheData.size(), cacheData.size() - missingMmsi.size(), missingMmsi.size()); + + groupIterator = cacheData.values().iterator(); + } + + /** + * 누락된 MMSI의 5분 트랙을 DB에서 벌크 조회 + */ + private Map> fetchFromDb(Set mmsiSet) { + if (mmsiSet.isEmpty()) return Collections.emptyMap(); + + String[] mmsiArray = mmsiSet.toArray(new String[0]); + + String sql = """ + SELECT mmsi, time_bucket, + public.ST_AsText(track_geom) as geom_text, + distance_nm, avg_speed, max_speed, point_count, + start_position, end_position + FROM signal.t_vessel_tracks_5min + WHERE mmsi = ANY(?) + AND time_bucket >= ? AND time_bucket < ? + AND track_geom IS NOT NULL + ORDER BY mmsi, time_bucket + """; + + Map> result = new LinkedHashMap<>(); + + queryJdbcTemplate.query(sql, + ps -> { + ps.setArray(1, ps.getConnection().createArrayOf("varchar", mmsiArray)); + ps.setTimestamp(2, Timestamp.valueOf(startTime)); + ps.setTimestamp(3, Timestamp.valueOf(endTime)); + }, + rs -> { + String mmsi = rs.getString("mmsi"); + VesselTrack track = VesselTrack.builder() + .mmsi(mmsi) + .timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime()) + .trackGeom(rs.getString("geom_text")) + .distanceNm(rs.getBigDecimal("distance_nm")) + .avgSpeed(rs.getBigDecimal("avg_speed")) + .maxSpeed(rs.getBigDecimal("max_speed")) + .pointCount(rs.getInt("point_count")) + .startPosition(parseTrackPosition(rs.getString("start_position"))) + .endPosition(parseTrackPosition(rs.getString("end_position"))) + .build(); + + result.computeIfAbsent(mmsi, k -> new ArrayList<>()).add(track); + }); + + log.info("DB fallback 완료: {} 선박, {} 트랙", + result.size(), result.values().stream().mapToInt(List::size).sum()); + return result; + } + + private VesselTrack.TrackPosition parseTrackPosition(String json) { + if (json == null) return null; + try { + String latStr = LineStringMUtils.extractJsonValue(json, "lat"); + String lonStr = LineStringMUtils.extractJsonValue(json, "lon"); + String timeStr = LineStringMUtils.extractJsonValue(json, "time"); + String sogStr = LineStringMUtils.extractJsonValue(json, "sog"); + + return VesselTrack.TrackPosition.builder() + .lat(latStr != null ? Double.parseDouble(latStr) : null) + .lon(lonStr != null ? Double.parseDouble(lonStr) : null) + .time(timeStr != null ? LocalDateTime.parse(timeStr, TIMESTAMP_FORMATTER) : null) + .sog(sogStr != null ? new BigDecimal(sogStr) : null) + .build(); + } catch (Exception e) { + log.debug("TrackPosition 파싱 실패: {}", json); + return null; + } + } +} diff --git a/src/main/java/gc/mda/signal_batch/batch/reader/FiveMinTrackCache.java b/src/main/java/gc/mda/signal_batch/batch/reader/FiveMinTrackCache.java new file mode 100644 index 0000000..6790faf --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/batch/reader/FiveMinTrackCache.java @@ -0,0 +1,107 @@ +package gc.mda.signal_batch.batch.reader; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import gc.mda.signal_batch.domain.vessel.model.VesselTrack; +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * 5분 VesselTrack 인메모리 캐시 + * + * 5분 집계 후 DB 저장과 동시에 캐시에 보관. + * hourly job에서 DB를 거치지 않고 직접 병합에 사용. + * + * key: "mmsi::timeBucket" (예: "440123456::2026-02-19T09:05") + * value: VesselTrack + * TTL: 75분 (1시간 + 15분 여유) + */ +@Slf4j +@Component +public class FiveMinTrackCache { + + private static final DateTimeFormatter KEY_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm"); + + private Cache cache; + + @Value("${app.cache.five-min-track.ttl-minutes:75}") + private long ttlMinutes; + + @Value("${app.cache.five-min-track.max-size:500000}") + private int maxSize; + + @PostConstruct + public void init() { + this.cache = Caffeine.newBuilder() + .maximumSize(maxSize) + .expireAfterWrite(ttlMinutes, TimeUnit.MINUTES) + .recordStats() + .build(); + log.info("FiveMinTrackCache 초기화 — TTL: {}분, maxSize: {}", ttlMinutes, maxSize); + } + + public void put(VesselTrack track) { + if (track == null || track.getMmsi() == null || track.getTimeBucket() == null) { + return; + } + cache.put(buildKey(track.getMmsi(), track.getTimeBucket()), track); + } + + public void putAll(List tracks) { + if (tracks == null) return; + for (VesselTrack track : tracks) { + put(track); + } + } + + /** + * 지정 시간 범위의 트랙을 MMSI별로 그루핑하여 반환 + * + * @param start 시작 시각 (inclusive) + * @param end 종료 시각 (exclusive) + * @return Map> (시간순 정렬) + */ + public Map> getTracksInRange(LocalDateTime start, LocalDateTime end) { + Map> result = new LinkedHashMap<>(); + + for (Map.Entry entry : cache.asMap().entrySet()) { + VesselTrack track = entry.getValue(); + if (track.getTimeBucket() != null + && !track.getTimeBucket().isBefore(start) + && track.getTimeBucket().isBefore(end)) { + result.computeIfAbsent(track.getMmsi(), k -> new ArrayList<>()).add(track); + } + } + + // MMSI별 시간순 정렬 + for (List tracks : result.values()) { + tracks.sort(Comparator.comparing(VesselTrack::getTimeBucket)); + } + + return result; + } + + public long size() { + return cache.estimatedSize(); + } + + public String getStats() { + var stats = cache.stats(); + return String.format("size=%d, hitRate=%.1f%%, hits=%d, misses=%d", + cache.estimatedSize(), + stats.hitRate() * 100, + stats.hitCount(), + stats.missCount()); + } + + private String buildKey(String mmsi, LocalDateTime timeBucket) { + return mmsi + "::" + timeBucket.format(KEY_FORMATTER); + } +} diff --git a/src/main/java/gc/mda/signal_batch/global/config/DataSourceConfigProperties.java b/src/main/java/gc/mda/signal_batch/global/config/DataSourceConfigProperties.java index a7cf0b6..e5f56b4 100644 --- a/src/main/java/gc/mda/signal_batch/global/config/DataSourceConfigProperties.java +++ b/src/main/java/gc/mda/signal_batch/global/config/DataSourceConfigProperties.java @@ -20,5 +20,16 @@ public class DataSourceConfigProperties { private String username; private String password; private String driverClassName = "org.postgresql.Driver"; + private HikariProperties hikari = new HikariProperties(); + } + + @Data + public static class HikariProperties { + private String poolName; + private Integer maximumPoolSize; + private Integer minimumIdle; + private Long connectionTimeout; + private Long idleTimeout; + private Long maxLifetime; } } \ No newline at end of file diff --git a/src/main/java/gc/mda/signal_batch/global/config/DevDataSourceConfig.java b/src/main/java/gc/mda/signal_batch/global/config/DevDataSourceConfig.java index e20d7bd..84fe0d4 100644 --- a/src/main/java/gc/mda/signal_batch/global/config/DevDataSourceConfig.java +++ b/src/main/java/gc/mda/signal_batch/global/config/DevDataSourceConfig.java @@ -4,7 +4,6 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; @@ -28,7 +27,6 @@ public class DevDataSourceConfig { } @Bean - @ConfigurationProperties(prefix = "spring.datasource.collect.hikari") public HikariConfig collectHikariConfig() { HikariConfig config = new HikariConfig(); applyConnectionProps(config, properties.getCollect()); @@ -53,7 +51,6 @@ public class DevDataSourceConfig { } @Bean(name = "devQueryHikariConfig") - @ConfigurationProperties(prefix = "spring.datasource.query.hikari") public HikariConfig devQueryHikariConfig() { HikariConfig config = new HikariConfig(); applyConnectionProps(config, properties.getQuery()); @@ -78,7 +75,6 @@ public class DevDataSourceConfig { } @Bean - @ConfigurationProperties(prefix = "spring.datasource.batch.hikari") public HikariConfig batchHikariConfig() { HikariConfig config = new HikariConfig(); applyConnectionProps(config, properties.getBatch()); @@ -154,5 +150,16 @@ public class DevDataSourceConfig { config.setUsername(props.getUsername()); config.setPassword(props.getPassword()); config.setDriverClassName(props.getDriverClassName()); + applyHikariProps(config, props.getHikari()); + } + + private void applyHikariProps(HikariConfig config, DataSourceConfigProperties.HikariProperties hikari) { + if (hikari == null) return; + if (hikari.getPoolName() != null) config.setPoolName(hikari.getPoolName()); + if (hikari.getMaximumPoolSize() != null) config.setMaximumPoolSize(hikari.getMaximumPoolSize()); + if (hikari.getMinimumIdle() != null) config.setMinimumIdle(hikari.getMinimumIdle()); + if (hikari.getConnectionTimeout() != null) config.setConnectionTimeout(hikari.getConnectionTimeout()); + if (hikari.getIdleTimeout() != null) config.setIdleTimeout(hikari.getIdleTimeout()); + if (hikari.getMaxLifetime() != null) config.setMaxLifetime(hikari.getMaxLifetime()); } } diff --git a/src/main/java/gc/mda/signal_batch/global/config/LocalDataSourceConfig.java b/src/main/java/gc/mda/signal_batch/global/config/LocalDataSourceConfig.java index 405108a..31dcf21 100644 --- a/src/main/java/gc/mda/signal_batch/global/config/LocalDataSourceConfig.java +++ b/src/main/java/gc/mda/signal_batch/global/config/LocalDataSourceConfig.java @@ -4,7 +4,6 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; @@ -29,7 +28,6 @@ public class LocalDataSourceConfig { } @Bean - @ConfigurationProperties(prefix = "spring.datasource.collect.hikari") public HikariConfig localCollectHikariConfig() { HikariConfig config = new HikariConfig(); applyConnectionProps(config, properties.getCollect()); @@ -45,7 +43,6 @@ public class LocalDataSourceConfig { } @Bean(name = "localQueryHikariConfig") - @ConfigurationProperties(prefix = "spring.datasource.query.hikari") public HikariConfig localQueryHikariConfig() { HikariConfig config = new HikariConfig(); applyConnectionProps(config, properties.getQuery()); @@ -61,7 +58,6 @@ public class LocalDataSourceConfig { } @Bean - @ConfigurationProperties(prefix = "spring.datasource.batch.hikari") public HikariConfig localBatchHikariConfig() { HikariConfig config = new HikariConfig(); applyConnectionProps(config, properties.getBatch()); @@ -128,5 +124,15 @@ public class LocalDataSourceConfig { config.setUsername(props.getUsername()); config.setPassword(props.getPassword()); config.setDriverClassName(props.getDriverClassName()); + applyHikariProps(config, props.getHikari()); + } + + private void applyHikariProps(HikariConfig config, DataSourceConfigProperties.HikariProperties hikari) { + if (hikari.getPoolName() != null) config.setPoolName(hikari.getPoolName()); + if (hikari.getMaximumPoolSize() != null) config.setMaximumPoolSize(hikari.getMaximumPoolSize()); + if (hikari.getMinimumIdle() != null) config.setMinimumIdle(hikari.getMinimumIdle()); + if (hikari.getConnectionTimeout() != null) config.setConnectionTimeout(hikari.getConnectionTimeout()); + if (hikari.getIdleTimeout() != null) config.setIdleTimeout(hikari.getIdleTimeout()); + if (hikari.getMaxLifetime() != null) config.setMaxLifetime(hikari.getMaxLifetime()); } } diff --git a/src/main/java/gc/mda/signal_batch/global/config/ProdDataSourceConfig.java b/src/main/java/gc/mda/signal_batch/global/config/ProdDataSourceConfig.java index 05574b6..80531ea 100644 --- a/src/main/java/gc/mda/signal_batch/global/config/ProdDataSourceConfig.java +++ b/src/main/java/gc/mda/signal_batch/global/config/ProdDataSourceConfig.java @@ -4,7 +4,6 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; @@ -28,7 +27,6 @@ public class ProdDataSourceConfig { } @Bean - @ConfigurationProperties(prefix = "spring.datasource.collect.hikari") public HikariConfig prodCollectHikariConfig() { HikariConfig config = new HikariConfig(); applyConnectionProps(config, properties.getCollect()); @@ -45,7 +43,6 @@ public class ProdDataSourceConfig { } @Bean(name = "prodQueryHikariConfig") - @ConfigurationProperties(prefix = "spring.datasource.query.hikari") public HikariConfig prodQueryHikariConfig() { HikariConfig config = new HikariConfig(); applyConnectionProps(config, properties.getQuery()); @@ -62,7 +59,6 @@ public class ProdDataSourceConfig { } @Bean - @ConfigurationProperties(prefix = "spring.datasource.batch.hikari") public HikariConfig prodBatchHikariConfig() { HikariConfig config = new HikariConfig(); applyConnectionProps(config, properties.getBatch()); @@ -130,5 +126,15 @@ public class ProdDataSourceConfig { config.setUsername(props.getUsername()); config.setPassword(props.getPassword()); config.setDriverClassName(props.getDriverClassName()); + applyHikariProps(config, props.getHikari()); + } + + private void applyHikariProps(HikariConfig config, DataSourceConfigProperties.HikariProperties hikari) { + if (hikari.getPoolName() != null) config.setPoolName(hikari.getPoolName()); + if (hikari.getMaximumPoolSize() != null) config.setMaximumPoolSize(hikari.getMaximumPoolSize()); + if (hikari.getMinimumIdle() != null) config.setMinimumIdle(hikari.getMinimumIdle()); + if (hikari.getConnectionTimeout() != null) config.setConnectionTimeout(hikari.getConnectionTimeout()); + if (hikari.getIdleTimeout() != null) config.setIdleTimeout(hikari.getIdleTimeout()); + if (hikari.getMaxLifetime() != null) config.setMaxLifetime(hikari.getMaxLifetime()); } } diff --git a/src/test/java/gc/mda/signal_batch/batch/reader/AisTargetCacheManagerTest.java b/src/test/java/gc/mda/signal_batch/batch/reader/AisTargetCacheManagerTest.java index 9eb87e2..e7ed887 100644 --- a/src/test/java/gc/mda/signal_batch/batch/reader/AisTargetCacheManagerTest.java +++ b/src/test/java/gc/mda/signal_batch/batch/reader/AisTargetCacheManagerTest.java @@ -236,33 +236,6 @@ class AisTargetCacheManagerTest { assertThat(cacheManager.getAllValues()).hasSize(2); } - @Test - @DisplayName("getByTimeRange — 시간 범위 내 데이터만 반환") - void getByTimeRange_filtersCorrectly() { - OffsetDateTime recent = OffsetDateTime.now(ZoneOffset.UTC).minusMinutes(5); - OffsetDateTime old = OffsetDateTime.now(ZoneOffset.UTC).minusMinutes(120); - - cacheManager.put(createEntity("111111111", recent)); - cacheManager.put(createEntity("222222222", old)); - - List result = cacheManager.getByTimeRange(60); - - assertThat(result).hasSize(1); - assertThat(result.get(0).getMmsi()).isEqualTo("111111111"); - } - - @Test - @DisplayName("getByTimeRange — timestamp=null 데이터 제외") - void getByTimeRange_excludesNullTimestamp() { - OffsetDateTime recent = OffsetDateTime.now(ZoneOffset.UTC).minusMinutes(5); - - cacheManager.put(createEntity("111111111", recent)); - cacheManager.put(createEntity("222222222", null)); - - List result = cacheManager.getByTimeRange(60); - - assertThat(result).hasSize(1); - } } @Nested