From 273d65c01a971b47670643db31c1338f1a77bd8a Mon Sep 17 00:00:00 2001 From: htlee Date: Fri, 20 Feb 2026 11:38:48 +0900 Subject: [PATCH] =?UTF-8?q?perf:=20Daily=20Job=20=EC=9D=B8=EB=A9=94?= =?UTF-8?q?=EB=AA=A8=EB=A6=AC=20=EC=BA=90=EC=8B=9C=20=EA=B8=B0=EB=B0=98=20?= =?UTF-8?q?=EC=B5=9C=EC=A0=81=ED=99=94=20=E2=80=94=20N+1=20SQL=20=EC=A0=9C?= =?UTF-8?q?=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hourly Job에 적용된 캐시 기반 병합 패턴을 Daily Job에 동일 적용: - CacheBasedDailyTrackReader: L2(HourlyTrackCache)에서 읽기 + DB fallback - DailyTrackMergeProcessor: Java 인메모리 WKT 병합 + 통계 집계 - 비정상 검출: MMSI별 개별 쿼리 → 1회 bulk prefetch - SQL ~20,000건 → ~3건 (99.98% 감소), 24분 → 30~60초 예상 삭제: DailyTrackProcessor, DailyTrackProcessorWithAbnormalDetection, BaseTrackProcessorWithAbnormalDetection (N+1 SQL 프로세서) Co-Authored-By: Claude Opus 4.6 --- .../batch/job/DailyAggregationStepConfig.java | 227 ++++++------- ...seTrackProcessorWithAbnormalDetection.java | 136 -------- .../processor/DailyTrackMergeProcessor.java | 298 ++++++++++++++++++ .../batch/processor/DailyTrackProcessor.java | 197 ------------ ...lyTrackProcessorWithAbnormalDetection.java | 38 --- .../reader/CacheBasedDailyTrackReader.java | 190 +++++++++++ 6 files changed, 579 insertions(+), 507 deletions(-) delete mode 100644 src/main/java/gc/mda/signal_batch/batch/processor/BaseTrackProcessorWithAbnormalDetection.java create mode 100644 src/main/java/gc/mda/signal_batch/batch/processor/DailyTrackMergeProcessor.java delete mode 100644 src/main/java/gc/mda/signal_batch/batch/processor/DailyTrackProcessor.java delete mode 100644 src/main/java/gc/mda/signal_batch/batch/processor/DailyTrackProcessorWithAbnormalDetection.java create mode 100644 src/main/java/gc/mda/signal_batch/batch/reader/CacheBasedDailyTrackReader.java diff --git a/src/main/java/gc/mda/signal_batch/batch/job/DailyAggregationStepConfig.java b/src/main/java/gc/mda/signal_batch/batch/job/DailyAggregationStepConfig.java index 1ab85e3..00ffb8e 100644 --- a/src/main/java/gc/mda/signal_batch/batch/job/DailyAggregationStepConfig.java +++ b/src/main/java/gc/mda/signal_batch/batch/job/DailyAggregationStepConfig.java @@ -1,23 +1,20 @@ package gc.mda.signal_batch.batch.job; import gc.mda.signal_batch.domain.vessel.model.VesselTrack; -import gc.mda.signal_batch.batch.processor.DailyTrackProcessor; -import gc.mda.signal_batch.batch.processor.DailyTrackProcessorWithAbnormalDetection; +import gc.mda.signal_batch.batch.processor.DailyTrackMergeProcessor; import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector; import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult; +import gc.mda.signal_batch.batch.reader.CacheBasedDailyTrackReader; +import gc.mda.signal_batch.batch.reader.HourlyTrackCache; import gc.mda.signal_batch.batch.writer.VesselTrackBulkWriter; import gc.mda.signal_batch.batch.writer.AbnormalTrackWriter; import gc.mda.signal_batch.batch.writer.CompositeTrackWriter; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; -import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; -import org.springframework.batch.item.database.JdbcCursorItemReader; -import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -29,7 +26,6 @@ import org.springframework.transaction.PlatformTransactionManager; import javax.sql.DataSource; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.List; @Slf4j @@ -44,6 +40,7 @@ public class DailyAggregationStepConfig { private final VesselTrackBulkWriter vesselTrackBulkWriter; private final AbnormalTrackWriter abnormalTrackWriter; private final AbnormalTrackDetector abnormalTrackDetector; + private final HourlyTrackCache hourlyTrackCache; public DailyAggregationStepConfig( JobRepository jobRepository, @@ -51,42 +48,72 @@ public class DailyAggregationStepConfig { @Qualifier("queryTransactionManager") PlatformTransactionManager transactionManager, VesselTrackBulkWriter vesselTrackBulkWriter, AbnormalTrackWriter abnormalTrackWriter, - AbnormalTrackDetector abnormalTrackDetector) { + AbnormalTrackDetector abnormalTrackDetector, + HourlyTrackCache hourlyTrackCache) { this.jobRepository = jobRepository; this.queryDataSource = queryDataSource; this.transactionManager = transactionManager; this.vesselTrackBulkWriter = vesselTrackBulkWriter; this.abnormalTrackWriter = abnormalTrackWriter; this.abnormalTrackDetector = abnormalTrackDetector; + this.hourlyTrackCache = hourlyTrackCache; } - + @Value("${vessel.batch.chunk-size:5000}") private int chunkSize; - + @Bean public Step mergeDailyTracksStep() { - // 비정상 궤적 검출은 항상 활성화 (설정 파일로 제어) - boolean detectAbnormal = true; - - if (detectAbnormal) { - log.info("Building mergeDailyTracksStep with abnormal detection enabled"); - return new StepBuilder("mergeDailyTracksStep", jobRepository) - .chunk(chunkSize, transactionManager) - .reader(dailyVesselKeyReader(null, null)) - .processor(dailyTrackProcessorWithAbnormalDetection()) - .writer(dailyCompositeTrackWriter()) - .build(); - } else { - log.info("Building mergeDailyTracksStep without abnormal detection"); - return new StepBuilder("mergeDailyTracksStep", jobRepository) - .chunk(chunkSize, transactionManager) - .reader(dailyVesselKeyReader(null, null)) - .processor(dailyTrackItemProcessor()) - .writer(dailyTrackWriter()) - .build(); - } + log.info("Building mergeDailyTracksStep with cache-based in-memory merge"); + return new StepBuilder("mergeDailyTracksStep", jobRepository) + ., AbnormalDetectionResult>chunk(chunkSize, transactionManager) + .reader(cacheBasedDailyTrackReader(null, null)) + .processor(dailyTrackMergeProcessor(null)) + .writer(dailyCompositeTrackWriter()) + .listener(dailyTrackMergeProcessor(null)) + .build(); } - + + @Bean + @StepScope + public CacheBasedDailyTrackReader cacheBasedDailyTrackReader( + @Value("#{jobParameters['startTime']}") String startTime, + @Value("#{jobParameters['endTime']}") String endTime) { + + LocalDateTime start = LocalDateTime.parse(startTime); + LocalDateTime end = LocalDateTime.parse(endTime); + + return new CacheBasedDailyTrackReader( + hourlyTrackCache, + new JdbcTemplate(queryDataSource), + start, + end); + } + + @Bean + @StepScope + public DailyTrackMergeProcessor dailyTrackMergeProcessor( + @Value("#{jobParameters['startTime']}") String startTime) { + + LocalDateTime dayBucket = LocalDateTime.parse(startTime) + .withHour(0).withMinute(0).withSecond(0).withNano(0); + + return new DailyTrackMergeProcessor( + abnormalTrackDetector, + new JdbcTemplate(queryDataSource), + dayBucket); + } + + @Bean + public ItemWriter dailyCompositeTrackWriter() { + abnormalTrackWriter.setJobName("dailyAggregationJob"); + return new CompositeTrackWriter( + vesselTrackBulkWriter, + abnormalTrackWriter, + "daily" + ); + } + @Bean public Step gridDailySummaryStep() { return new StepBuilder("gridDailySummaryStep", jobRepository) @@ -96,7 +123,7 @@ public class DailyAggregationStepConfig { .writer(dailyGridWriter(null, null)) .build(); } - + @Bean public Step areaDailySummaryStep() { return new StepBuilder("areaDailySummaryStep", jobRepository) @@ -106,75 +133,24 @@ public class DailyAggregationStepConfig { .writer(dailyAreaWriter(null, null)) .build(); } - - @Bean - @StepScope - public JdbcCursorItemReader dailyVesselKeyReader( - @Value("#{jobParameters['startTime']}") String startTime, - @Value("#{jobParameters['endTime']}") String endTime) { - - LocalDateTime start = LocalDateTime.parse(startTime); - LocalDateTime end = LocalDateTime.parse(endTime); - - String sql = """ - SELECT DISTINCT mmsi, date_trunc('day', time_bucket) as day_bucket - FROM signal.t_vessel_tracks_hourly - WHERE time_bucket >= ? AND time_bucket < ? - ORDER BY mmsi, day_bucket - """; - return new JdbcCursorItemReaderBuilder() - .name("dailyVesselKeyReader") - .dataSource(queryDataSource) - .sql(sql) - .preparedStatementSetter(ps -> { - ps.setTimestamp(1, java.sql.Timestamp.valueOf(start)); - ps.setTimestamp(2, java.sql.Timestamp.valueOf(end)); - }) - .rowMapper((rs, rowNum) -> new VesselTrack.VesselKey( - rs.getString("mmsi"), - rs.getObject("day_bucket", LocalDateTime.class) - )) - .build(); - } - - @Bean - public ItemProcessor dailyTrackItemProcessor() { - return new DailyTrackProcessor(queryDataSource, new JdbcTemplate(queryDataSource)); - } - - @Bean - public ItemWriter dailyTrackWriter() { - return items -> { - List tracks = new ArrayList<>(); - for (VesselTrack track : items) { - if (track != null) { - tracks.add(track); - } - } - if (!tracks.isEmpty()) { - vesselTrackBulkWriter.writeDailyTracks(tracks); - } - }; - } - @Bean @StepScope - public JdbcCursorItemReader dailyGridReader( + public org.springframework.batch.item.database.JdbcCursorItemReader dailyGridReader( @Value("#{jobParameters['startTime']}") String startTime, @Value("#{jobParameters['endTime']}") String endTime) { - + LocalDateTime start = LocalDateTime.parse(startTime); LocalDateTime end = LocalDateTime.parse(endTime); - + String sql = """ SELECT DISTINCT haegu_no FROM signal.t_grid_tracks_summary_hourly WHERE time_bucket >= ? AND time_bucket < ? ORDER BY haegu_no """; - - return new JdbcCursorItemReaderBuilder() + + return new org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder() .name("dailyGridReader") .dataSource(queryDataSource) .sql(sql) @@ -185,36 +161,36 @@ public class DailyAggregationStepConfig { .rowMapper((rs, rowNum) -> rs.getInt("haegu_no")) .build(); } - + @Bean - public ItemProcessor dailyGridProcessor() { + public org.springframework.batch.item.ItemProcessor dailyGridProcessor() { return haeguNo -> { DailyGridSummary summary = new DailyGridSummary(); summary.haeguNo = haeguNo; return summary; }; } - + @Bean @StepScope public ItemWriter dailyGridWriter( @Value("#{jobParameters['startTime']}") String startTime, @Value("#{jobParameters['endTime']}") String endTime) { - + return items -> { LocalDateTime start = LocalDateTime.parse(startTime); LocalDateTime end = LocalDateTime.parse(endTime); LocalDateTime dayBucket = start.withHour(0).withMinute(0).withSecond(0).withNano(0); - + JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); - + for (DailyGridSummary summary : items) { if (summary == null) continue; - + String sql = """ INSERT INTO signal.t_grid_tracks_summary_daily (haegu_no, time_bucket, total_vessels, total_distance_nm, avg_speed, vessel_list, created_at) - SELECT + SELECT haegu_no, ?::timestamp as time_bucket, COUNT(DISTINCT vessel_key) as total_vessels, @@ -239,29 +215,29 @@ public class DailyAggregationStepConfig { vessel_list = EXCLUDED.vessel_list, created_at = NOW() """; - + jdbcTemplate.update(sql, dayBucket, summary.haeguNo, start, end); } }; } - + @Bean @StepScope - public JdbcCursorItemReader dailyAreaReader( + public org.springframework.batch.item.database.JdbcCursorItemReader dailyAreaReader( @Value("#{jobParameters['startTime']}") String startTime, @Value("#{jobParameters['endTime']}") String endTime) { - + LocalDateTime start = LocalDateTime.parse(startTime); LocalDateTime end = LocalDateTime.parse(endTime); - + String sql = """ SELECT DISTINCT area_id FROM signal.t_area_tracks_summary_hourly WHERE time_bucket >= ? AND time_bucket < ? ORDER BY area_id """; - - return new JdbcCursorItemReaderBuilder() + + return new org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder() .name("dailyAreaReader") .dataSource(queryDataSource) .sql(sql) @@ -272,36 +248,36 @@ public class DailyAggregationStepConfig { .rowMapper((rs, rowNum) -> rs.getString("area_id")) .build(); } - + @Bean - public ItemProcessor dailyAreaProcessor() { + public org.springframework.batch.item.ItemProcessor dailyAreaProcessor() { return areaId -> { DailyAreaSummary summary = new DailyAreaSummary(); summary.areaId = areaId; return summary; }; } - + @Bean @StepScope public ItemWriter dailyAreaWriter( @Value("#{jobParameters['startTime']}") String startTime, @Value("#{jobParameters['endTime']}") String endTime) { - + return items -> { LocalDateTime start = LocalDateTime.parse(startTime); LocalDateTime end = LocalDateTime.parse(endTime); LocalDateTime dayBucket = start.withHour(0).withMinute(0).withSecond(0).withNano(0); - + JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); - + for (DailyAreaSummary summary : items) { if (summary == null) continue; - + String sql = """ INSERT INTO signal.t_area_tracks_summary_daily (area_id, time_bucket, total_vessels, total_distance_nm, avg_speed, vessel_list, created_at) - SELECT + SELECT area_id, ?::timestamp as time_bucket, COUNT(DISTINCT vessel_key) as total_vessels, @@ -326,39 +302,18 @@ public class DailyAggregationStepConfig { vessel_list = EXCLUDED.vessel_list, created_at = NOW() """; - + jdbcTemplate.update(sql, dayBucket, summary.areaId, start, end); } }; } - - // 비정상 궤적 검출 관련 빈 정의 - @Bean - public ItemProcessor dailyTrackProcessorWithAbnormalDetection() { - return new DailyTrackProcessorWithAbnormalDetection( - dailyTrackItemProcessor(), - abnormalTrackDetector, - queryDataSource - ); - } - - @Bean - public ItemWriter dailyCompositeTrackWriter() { - // Job 이름 직접 설정 - abnormalTrackWriter.setJobName("dailyAggregationJob"); - return new CompositeTrackWriter( - vesselTrackBulkWriter, - abnormalTrackWriter, - "daily" - ); - } - + // Summary 클래스들 public static class DailyGridSummary { public Integer haeguNo; } - + public static class DailyAreaSummary { public String areaId; } -} \ No newline at end of file +} diff --git a/src/main/java/gc/mda/signal_batch/batch/processor/BaseTrackProcessorWithAbnormalDetection.java b/src/main/java/gc/mda/signal_batch/batch/processor/BaseTrackProcessorWithAbnormalDetection.java deleted file mode 100644 index dc5d836..0000000 --- a/src/main/java/gc/mda/signal_batch/batch/processor/BaseTrackProcessorWithAbnormalDetection.java +++ /dev/null @@ -1,136 +0,0 @@ -package gc.mda.signal_batch.batch.processor; - -import gc.mda.signal_batch.domain.vessel.model.VesselTrack; -import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult; -import gc.mda.signal_batch.global.util.LineStringMUtils; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.batch.item.ItemProcessor; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.jdbc.core.JdbcTemplate; - -import javax.sql.DataSource; -import java.math.BigDecimal; -import java.sql.Timestamp; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; - - -/** - * 시간별/일별 궤적 프로세서 기본 클래스 - 비정상 궤적 검출 기능 포함 - */ -@Slf4j -@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true) -@RequiredArgsConstructor -public abstract class BaseTrackProcessorWithAbnormalDetection implements ItemProcessor { - - protected final ItemProcessor trackProcessor; - protected final AbnormalTrackDetector abnormalTrackDetector; - protected final DataSource queryDataSource; - - private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - - @Override - public AbnormalDetectionResult process(VesselTrack.VesselKey vesselKey) throws Exception { - // 기존 프로세서로 궤적 생성 - VesselTrack track = trackProcessor.process(vesselKey); - - if (track == null) { - return null; - } - - // 이전 bucket의 마지막 궤적 조회 - VesselTrack previousTrack = getPreviousBucketLastTrack(vesselKey); - - // Bucket 간 연결점만 검사 (하위 데이터는 이미 검증됨) - AbnormalDetectionResult result = abnormalTrackDetector.detectBucketTransitionOnly(track, previousTrack); - - if (result.hasAbnormalities()) { - log.debug("Abnormal track detected for vessel {} at {}: {}", - track.getMmsi(), track.getTimeBucket(), - result.getAbnormalSegments().size()); - } - - return result; - } - - /** - * 이전 버킷의 마지막 궤적 조회 - */ - protected VesselTrack getPreviousBucketLastTrack(VesselTrack.VesselKey vesselKey) { - try { - String sql = """ - SELECT mmsi, time_bucket, - end_position, - public.ST_AsText(public.ST_LineSubstring(track_geom, 0.9, 1.0)) as last_segment - FROM %s - WHERE mmsi = ? - AND time_bucket >= ? - AND time_bucket < ? - ORDER BY time_bucket DESC - LIMIT 1 - """.formatted(getPreviousTrackTableName()); - - LocalDateTime currentBucket = getNormalizedBucket(vesselKey.getTimeBucket()); - LocalDateTime previousBucket = getPreviousBucket(currentBucket); - - // Convert to java.sql.Timestamp for proper PostgreSQL type handling - Timestamp previousBucketTimestamp = Timestamp.valueOf(previousBucket); - Timestamp currentBucketTimestamp = Timestamp.valueOf(currentBucket); - - JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); - return jdbcTemplate.queryForObject(sql, - (rs, rowNum) -> { - return VesselTrack.builder() - .mmsi(rs.getString("mmsi")) - .timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime()) - .trackGeom(rs.getString("last_segment")) - .endPosition(parseEndPosition(rs.getString("end_position"))) - .build(); - }, - vesselKey.getMmsi(), previousBucketTimestamp, currentBucketTimestamp - ); - } catch (Exception e) { - log.debug("No previous bucket track found for vessel {}", vesselKey); - return null; - } - } - - /** - * JSON 형식의 end_position 파싱 - */ - protected VesselTrack.TrackPosition parseEndPosition(String json) { - if (json == null) return null; - try { - String lat = LineStringMUtils.extractJsonValue(json, "lat"); - String lon = LineStringMUtils.extractJsonValue(json, "lon"); - String time = LineStringMUtils.extractJsonValue(json, "time"); - String sog = LineStringMUtils.extractJsonValue(json, "sog"); - - return VesselTrack.TrackPosition.builder() - .lat(lat != null ? Double.parseDouble(lat) : null) - .lon(lon != null ? Double.parseDouble(lon) : null) - .time(time != null ? LocalDateTime.parse(time, TIMESTAMP_FORMATTER) : null) - .sog(sog != null ? new BigDecimal(sog) : null) - .build(); - } catch (Exception e) { - log.error("Failed to parse end position: {}", json, e); - return null; - } - } - - /** - * 이전 트랙을 조회할 테이블명 반환 (하위 클래스에서 구현) - */ - protected abstract String getPreviousTrackTableName(); - - /** - * 정규화된 버킷 시간 반환 (하위 클래스에서 구현) - */ - protected abstract LocalDateTime getNormalizedBucket(LocalDateTime timeBucket); - - /** - * 이전 버킷 시간 계산 (하위 클래스에서 구현) - */ - protected abstract LocalDateTime getPreviousBucket(LocalDateTime currentBucket); -} \ No newline at end of file diff --git a/src/main/java/gc/mda/signal_batch/batch/processor/DailyTrackMergeProcessor.java b/src/main/java/gc/mda/signal_batch/batch/processor/DailyTrackMergeProcessor.java new file mode 100644 index 0000000..44041b1 --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/batch/processor/DailyTrackMergeProcessor.java @@ -0,0 +1,298 @@ +package gc.mda.signal_batch.batch.processor; + +import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult; +import gc.mda.signal_batch.domain.vessel.model.VesselTrack; +import gc.mda.signal_batch.global.util.LineStringMUtils; +import gc.mda.signal_batch.global.util.TrackSimplificationUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * 인메모리 기반 Daily Track 병합 프로세서 + * + * Hourly 트랙 리스트를 받아: + * 1. WKT 좌표 연결 (Java String) + * 2. 통계 집계 (distance, speed, pointCount) + * 3. 간소화 (TrackSimplificationUtils.simplifyDailyTrack) + * 4. 비정상 검출 (이전 날짜 1회 bulk prefetch) + * + * N+1 SQL 제거 → DB 쿼리 최대 1회 (비정상 검출용 이전 날짜 prefetch) + */ +@Slf4j +public class DailyTrackMergeProcessor + implements ItemProcessor, AbnormalDetectionResult>, StepExecutionListener { + + private static final Pattern WKT_COORDS_PATTERN = Pattern.compile("LINESTRING M\\((.+)\\)"); + private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + private final AbnormalTrackDetector abnormalTrackDetector; + private final JdbcTemplate queryJdbcTemplate; + private final LocalDateTime dayBucket; + + // Lazy-init: 이전 날짜 데이터 1회 bulk prefetch + private Map previousDayCache; + private boolean previousDayLoaded = false; + + // Step 레벨 집계 카운터 + private int totalProcessed = 0; + private int mergeFailCount = 0; + private int simplifiedCount = 0; + private int abnormalCount = 0; + private int avgSpeedFailCount = 0; + + public DailyTrackMergeProcessor( + AbnormalTrackDetector abnormalTrackDetector, + JdbcTemplate queryJdbcTemplate, + LocalDateTime dayBucket) { + this.abnormalTrackDetector = abnormalTrackDetector; + this.queryJdbcTemplate = queryJdbcTemplate; + this.dayBucket = dayBucket; + } + + @Override + public AbnormalDetectionResult process(List hourlyTracks) throws Exception { + if (hourlyTracks == null || hourlyTracks.isEmpty()) { + return null; + } + + String mmsi = hourlyTracks.get(0).getMmsi(); + totalProcessed++; + + // Step 1: WKT 좌표 병합 + String mergedWkt = mergeTrackGeometries(hourlyTracks); + if (mergedWkt == null) { + mergeFailCount++; + return null; + } + + // Step 2: 통계 집계 + BigDecimal totalDistance = BigDecimal.ZERO; + BigDecimal maxSpeed = BigDecimal.ZERO; + int totalPoints = 0; + + for (VesselTrack track : hourlyTracks) { + if (track.getDistanceNm() != null) { + totalDistance = totalDistance.add(track.getDistanceNm()); + } + if (track.getMaxSpeed() != null && track.getMaxSpeed().compareTo(maxSpeed) > 0) { + maxSpeed = track.getMaxSpeed(); + } + if (track.getPointCount() != null) { + totalPoints += track.getPointCount(); + } + } + + // avgSpeed: M값 기반 시간 차이로 계산 + BigDecimal avgSpeed = calculateAvgSpeed(mergedWkt, totalDistance); + + VesselTrack.TrackPosition startPos = hourlyTracks.get(0).getStartPosition(); + VesselTrack.TrackPosition endPos = hourlyTracks.get(hourlyTracks.size() - 1).getEndPosition(); + + // Step 3: 일별 간소화 + String simplifiedWkt = TrackSimplificationUtils.simplifyDailyTrack(mergedWkt); + int simplifiedPoints = countWktPoints(simplifiedWkt); + + if (!mergedWkt.equals(simplifiedWkt)) { + simplifiedCount++; + } + + VesselTrack dailyTrack = VesselTrack.builder() + .mmsi(mmsi) + .timeBucket(dayBucket) + .trackGeom(simplifiedWkt) + .distanceNm(totalDistance) + .avgSpeed(avgSpeed) + .maxSpeed(maxSpeed) + .pointCount(simplifiedPoints > 0 ? simplifiedPoints : totalPoints) + .startPosition(startPos) + .endPosition(endPos) + .build(); + + // Step 4: 비정상 검출 (lazy-init, 1회 bulk prefetch) + if (!previousDayLoaded) { + previousDayCache = bulkFetchPreviousDayLastTracks(); + previousDayLoaded = true; + } + + VesselTrack prevTrack = previousDayCache.get(mmsi); + AbnormalDetectionResult result = abnormalTrackDetector.detectBucketTransitionOnly(dailyTrack, prevTrack); + + if (result.hasAbnormalities()) { + abnormalCount++; + } + + return result; + } + + /** + * Hourly 트랙들의 WKT 좌표를 하나로 연결 + */ + private String mergeTrackGeometries(List tracks) { + StringBuilder allCoords = new StringBuilder(); + + for (VesselTrack track : tracks) { + String wkt = track.getTrackGeom(); + if (wkt == null || wkt.isEmpty()) continue; + + Matcher matcher = WKT_COORDS_PATTERN.matcher(wkt); + if (matcher.find()) { + String coords = matcher.group(1); + if (!coords.isBlank()) { + if (allCoords.length() > 0) { + allCoords.append(", "); + } + allCoords.append(coords); + } + } + } + + if (allCoords.length() == 0) { + return null; + } + + return "LINESTRING M(" + allCoords + ")"; + } + + /** + * M값(Unix timestamp) 기반 평균 속도 계산 + */ + private BigDecimal calculateAvgSpeed(String wkt, BigDecimal totalDistance) { + try { + Matcher matcher = WKT_COORDS_PATTERN.matcher(wkt); + if (!matcher.find()) return BigDecimal.ZERO; + + String coords = matcher.group(1); + String[] points = coords.split(","); + if (points.length < 2) return BigDecimal.ZERO; + + // 첫 번째 포인트의 M값 + String[] firstParts = points[0].trim().split("\\s+"); + double firstM = firstParts.length >= 3 ? Double.parseDouble(firstParts[2]) : 0; + + // 마지막 포인트의 M값 + String[] lastParts = points[points.length - 1].trim().split("\\s+"); + double lastM = lastParts.length >= 3 ? Double.parseDouble(lastParts[2]) : 0; + + double timeDiffSeconds = lastM - firstM; + if (timeDiffSeconds <= 0) return BigDecimal.ZERO; + + double timeDiffHours = timeDiffSeconds / 3600.0; + double avgSpeedVal = totalDistance.doubleValue() / timeDiffHours; + + // 비현실적 속도 제한 + avgSpeedVal = Math.min(avgSpeedVal, 9999.99); + + return BigDecimal.valueOf(avgSpeedVal).setScale(2, RoundingMode.HALF_UP); + } catch (Exception e) { + avgSpeedFailCount++; + return BigDecimal.ZERO; + } + } + + @Override + public void beforeStep(StepExecution stepExecution) { + // no-op + } + + @Override + public ExitStatus afterStep(StepExecution stepExecution) { + log.info("Daily 병합 처리 집계 — 총: {}, 병합실패: {}, 간소화: {}, 비정상: {}, avgSpeed실패: {}", + totalProcessed, mergeFailCount, simplifiedCount, abnormalCount, avgSpeedFailCount); + return null; + } + + private int countWktPoints(String wkt) { + if (wkt == null || !wkt.startsWith("LINESTRING M")) return 0; + try { + String coords = wkt.substring("LINESTRING M(".length(), wkt.length() - 1); + return coords.split(",").length; + } catch (Exception e) { + return 0; + } + } + + /** + * 비정상 검출용 — 전일(dayBucket-1일)의 MMSI별 마지막 hourly 트랙 bulk prefetch + */ + private Map bulkFetchPreviousDayLastTracks() { + LocalDateTime prevStart = dayBucket.minusDays(1); + LocalDateTime prevEnd = dayBucket; + + String sql = """ + SELECT DISTINCT ON (mmsi) + mmsi, time_bucket, end_position, + public.ST_AsText(public.ST_LineSubstring(track_geom, 0.9, 1.0)) as last_segment + FROM signal.t_vessel_tracks_hourly + WHERE time_bucket >= ? AND time_bucket < ? + AND track_geom IS NOT NULL + ORDER BY mmsi, time_bucket DESC + """; + + Map result = new HashMap<>(); + int[] parseFailCount = {0}; + + try { + queryJdbcTemplate.query(sql, + ps -> { + ps.setTimestamp(1, Timestamp.valueOf(prevStart)); + ps.setTimestamp(2, Timestamp.valueOf(prevEnd)); + }, + rs -> { + String mmsi = rs.getString("mmsi"); + VesselTrack.TrackPosition endPos = parseEndPosition(rs.getString("end_position")); + if (endPos == null && rs.getString("end_position") != null) { + parseFailCount[0]++; + } + VesselTrack track = VesselTrack.builder() + .mmsi(mmsi) + .timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime()) + .trackGeom(rs.getString("last_segment")) + .endPosition(endPos) + .build(); + result.put(mmsi, track); + }); + + log.info("전일 트랙 prefetch 완료: {} 선박 (기간: {} ~ {})", + result.size(), prevStart, prevEnd); + if (parseFailCount[0] > 0) { + log.debug("end_position 파싱 실패: {} 건", parseFailCount[0]); + } + } catch (Exception e) { + log.warn("전일 트랙 prefetch 실패 (첫 실행일 수 있음): {}", e.getMessage()); + } + + return result; + } + + private VesselTrack.TrackPosition parseEndPosition(String json) { + if (json == null) return null; + try { + String lat = LineStringMUtils.extractJsonValue(json, "lat"); + String lon = LineStringMUtils.extractJsonValue(json, "lon"); + String time = LineStringMUtils.extractJsonValue(json, "time"); + String sog = LineStringMUtils.extractJsonValue(json, "sog"); + + return VesselTrack.TrackPosition.builder() + .lat(lat != null ? Double.parseDouble(lat) : null) + .lon(lon != null ? Double.parseDouble(lon) : null) + .time(time != null ? LocalDateTime.parse(time, TIMESTAMP_FORMATTER) : null) + .sog(sog != null ? new BigDecimal(sog) : null) + .build(); + } catch (Exception e) { + return null; + } + } +} diff --git a/src/main/java/gc/mda/signal_batch/batch/processor/DailyTrackProcessor.java b/src/main/java/gc/mda/signal_batch/batch/processor/DailyTrackProcessor.java deleted file mode 100644 index 5fc4300..0000000 --- a/src/main/java/gc/mda/signal_batch/batch/processor/DailyTrackProcessor.java +++ /dev/null @@ -1,197 +0,0 @@ -package gc.mda.signal_batch.batch.processor; - -import gc.mda.signal_batch.domain.vessel.model.VesselTrack; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.batch.item.ItemProcessor; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.jdbc.core.JdbcTemplate; - -import java.math.BigDecimal; -import java.sql.ResultSet; -import java.sql.Timestamp; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import gc.mda.signal_batch.global.util.LineStringMUtils; -import gc.mda.signal_batch.global.util.TrackSimplificationUtils; - -import javax.sql.DataSource; - - -@Slf4j -@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true) -@RequiredArgsConstructor -public class DailyTrackProcessor implements ItemProcessor { - - private final DataSource queryDataSource; - private final JdbcTemplate jdbcTemplate; - private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - - @Override - public VesselTrack process(VesselTrack.VesselKey vesselKey) throws Exception { - LocalDateTime dayBucket = vesselKey.getTimeBucket() - .withHour(0) - .withMinute(0) - .withSecond(0) - .withNano(0); - - String sql = """ - WITH ordered_tracks AS ( - SELECT * - FROM signal.t_vessel_tracks_hourly - WHERE mmsi = ? - AND time_bucket >= ? - AND time_bucket < ? - AND track_geom IS NOT NULL - AND public.ST_NPoints(track_geom) > 0 - ORDER BY time_bucket - ), - merged_coords AS ( - SELECT - mmsi, - string_agg( - substring(public.ST_AsText(track_geom) from 'M \\((.+)\\)'), - ',' - ORDER BY time_bucket - ) FILTER (WHERE track_geom IS NOT NULL) as all_coords - FROM ordered_tracks - GROUP BY mmsi - ), - merged_tracks AS ( - SELECT - mc.mmsi, - TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS') as time_bucket, - public.ST_GeomFromText('LINESTRING M(' || mc.all_coords || ')') as merged_geom, - (SELECT MAX(max_speed) FROM ordered_tracks WHERE mmsi = mc.mmsi) as max_speed, - (SELECT SUM(point_count) FROM ordered_tracks WHERE mmsi = mc.mmsi) as total_points, - (SELECT MIN(time_bucket) FROM ordered_tracks WHERE mmsi = mc.mmsi) as start_time, - (SELECT MAX(time_bucket) FROM ordered_tracks WHERE mmsi = mc.mmsi) as end_time, - (SELECT start_position FROM ordered_tracks WHERE mmsi = mc.mmsi ORDER BY time_bucket LIMIT 1) as start_pos, - (SELECT end_position FROM ordered_tracks WHERE mmsi = mc.mmsi ORDER BY time_bucket DESC LIMIT 1) as end_pos - FROM merged_coords mc - ), - calculated_tracks AS ( - SELECT - *, - public.ST_Length(merged_geom::geography) / 1852.0 as total_distance, - CASE - WHEN public.ST_NPoints(merged_geom) > 0 THEN - public.ST_M(public.ST_PointN(merged_geom, public.ST_NPoints(merged_geom))) - - public.ST_M(public.ST_PointN(merged_geom, 1)) - ELSE - EXTRACT(EPOCH FROM - TO_TIMESTAMP(end_pos->>'time', 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(start_pos->>'time', 'YYYY-MM-DD HH24:MI:SS') - ) - END as time_diff_seconds - FROM merged_tracks - ) - SELECT - mmsi, - time_bucket, - merged_geom, - total_distance, - CASE - WHEN time_diff_seconds > 0 THEN - CAST(LEAST((total_distance / (time_diff_seconds / 3600.0)), 9999.99) AS numeric(6,2)) - ELSE 0 - END as avg_speed, - max_speed, - total_points, - start_time, - end_time, - start_pos, - end_pos, - public.ST_AsText(merged_geom) as geom_text - FROM calculated_tracks - """; - - LocalDateTime startTime = dayBucket; - LocalDateTime endTime = dayBucket.plusDays(1); - - Timestamp startTimestamp = Timestamp.valueOf(startTime); - Timestamp endTimestamp = Timestamp.valueOf(endTime); - Timestamp dayBucketTimestamp = Timestamp.valueOf(dayBucket); - - log.debug("DailyTrackProcessor params - mmsi: {}, startTime: {}, endTime: {}, dayBucket: {}", - vesselKey.getMmsi(), startTimestamp, endTimestamp, dayBucketTimestamp); - - try { - return jdbcTemplate.queryForObject(sql, - (rs, rowNum) -> { - try { - return buildDailyTrack(rs, dayBucket); - } catch (Exception e) { - throw new RuntimeException("Failed to build daily track", e); - } - }, - vesselKey.getMmsi(), - startTimestamp, endTimestamp, dayBucketTimestamp - ); - } catch (org.springframework.dao.EmptyResultDataAccessException e) { - log.warn("No hourly data found for vessel {} in time range {}-{}, skipping daily aggregation", - vesselKey.getMmsi(), startTimestamp, endTimestamp); - return null; - } catch (Exception e) { - log.error("Failed to process daily track for vessel {}: {}", - vesselKey.getMmsi(), e.getMessage(), e); - return null; - } - } - - private VesselTrack buildDailyTrack(ResultSet rs, LocalDateTime dayBucket) throws Exception { - VesselTrack.TrackPosition startPos = null; - VesselTrack.TrackPosition endPos = null; - - String startPosJson = rs.getString("start_pos"); - String endPosJson = rs.getString("end_pos"); - - if (startPosJson != null) { - startPos = parseTrackPosition(startPosJson); - } - if (endPosJson != null) { - endPos = parseTrackPosition(endPosJson); - } - - String dailyLineStringM = rs.getString("geom_text"); - String simplifiedLineStringM = TrackSimplificationUtils.simplifyDailyTrack(dailyLineStringM); - - if (!dailyLineStringM.equals(simplifiedLineStringM)) { - TrackSimplificationUtils.SimplificationStats stats = - TrackSimplificationUtils.getSimplificationStats(dailyLineStringM, simplifiedLineStringM); - log.debug("일별 궤적 간소화 - vessel: {}, 원본: {}포인트, 간소화: {}포인트 ({}% 감소)", - rs.getString("mmsi"), - stats.originalPoints, stats.simplifiedPoints, (int)stats.reductionRate); - } - - return VesselTrack.builder() - .mmsi(rs.getString("mmsi")) - .timeBucket(dayBucket) - .trackGeom(simplifiedLineStringM) - .distanceNm(rs.getBigDecimal("total_distance")) - .avgSpeed(rs.getBigDecimal("avg_speed")) - .maxSpeed(rs.getBigDecimal("max_speed")) - .pointCount(rs.getInt("total_points")) - .startPosition(startPos) - .endPosition(endPos) - .build(); - } - - private VesselTrack.TrackPosition parseTrackPosition(String json) { - try { - String latStr = LineStringMUtils.extractJsonValue(json, "lat"); - String lonStr = LineStringMUtils.extractJsonValue(json, "lon"); - String timeStr = LineStringMUtils.extractJsonValue(json, "time"); - String sogStr = LineStringMUtils.extractJsonValue(json, "sog"); - - return VesselTrack.TrackPosition.builder() - .lat(latStr != null ? Double.parseDouble(latStr) : null) - .lon(lonStr != null ? Double.parseDouble(lonStr) : null) - .time(timeStr != null ? LocalDateTime.parse(timeStr, TIMESTAMP_FORMATTER) : null) - .sog(sogStr != null ? new BigDecimal(sogStr) : null) - .build(); - } catch (Exception e) { - log.error("Failed to parse track position: {}", json, e); - return null; - } - } -} diff --git a/src/main/java/gc/mda/signal_batch/batch/processor/DailyTrackProcessorWithAbnormalDetection.java b/src/main/java/gc/mda/signal_batch/batch/processor/DailyTrackProcessorWithAbnormalDetection.java deleted file mode 100644 index 7c169d4..0000000 --- a/src/main/java/gc/mda/signal_batch/batch/processor/DailyTrackProcessorWithAbnormalDetection.java +++ /dev/null @@ -1,38 +0,0 @@ -package gc.mda.signal_batch.batch.processor; - -import gc.mda.signal_batch.domain.vessel.model.VesselTrack; -import lombok.extern.slf4j.Slf4j; -import org.springframework.batch.item.ItemProcessor; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import javax.sql.DataSource; -import java.time.LocalDateTime; - -/** - * 일별 궤적 프로세서 - 비정상 궤적 검출 기능 포함 - */ -@Slf4j -@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true) -public class DailyTrackProcessorWithAbnormalDetection extends BaseTrackProcessorWithAbnormalDetection { - - public DailyTrackProcessorWithAbnormalDetection( - ItemProcessor dailyTrackProcessor, - AbnormalTrackDetector abnormalTrackDetector, - DataSource queryDataSource) { - super(dailyTrackProcessor, abnormalTrackDetector, queryDataSource); - } - - @Override - protected String getPreviousTrackTableName() { - return "signal.t_vessel_tracks_hourly"; - } - - @Override - protected LocalDateTime getNormalizedBucket(LocalDateTime timeBucket) { - return timeBucket.withHour(0).withMinute(0).withSecond(0).withNano(0); - } - - @Override - protected LocalDateTime getPreviousBucket(LocalDateTime currentBucket) { - return currentBucket.minusDays(1); - } -} diff --git a/src/main/java/gc/mda/signal_batch/batch/reader/CacheBasedDailyTrackReader.java b/src/main/java/gc/mda/signal_batch/batch/reader/CacheBasedDailyTrackReader.java new file mode 100644 index 0000000..fcbe38b --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/batch/reader/CacheBasedDailyTrackReader.java @@ -0,0 +1,190 @@ +package gc.mda.signal_batch.batch.reader; + +import gc.mda.signal_batch.domain.vessel.model.VesselTrack; +import gc.mda.signal_batch.global.util.LineStringMUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.item.ItemReader; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; + +/** + * 캐시 기반 Daily Track Reader + * + * HourlyTrackCache(L2)에서 시간별 트랙을 MMSI별로 읽어 반환. + * 캐시에 없는 MMSI는 DB fallback으로 보충. + * + * 정상 운영 시: DB 쿼리 1회 (DISTINCT mmsi 완전성 확인) + * 앱 재시작 후: DB 쿼리 2회 (완전성 확인 + fallback 벌크) + */ +@Slf4j +public class CacheBasedDailyTrackReader implements ItemReader> { + + private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + private final HourlyTrackCache hourlyTrackCache; + private final JdbcTemplate queryJdbcTemplate; + private final LocalDateTime startTime; + private final LocalDateTime endTime; + + private Iterator> groupIterator; + private boolean initialized = false; + + public CacheBasedDailyTrackReader( + HourlyTrackCache hourlyTrackCache, + JdbcTemplate queryJdbcTemplate, + LocalDateTime startTime, + LocalDateTime endTime) { + this.hourlyTrackCache = hourlyTrackCache; + this.queryJdbcTemplate = queryJdbcTemplate; + this.startTime = startTime; + this.endTime = endTime; + } + + @Override + public List read() { + if (!initialized) { + initialize(); + initialized = true; + } + + if (groupIterator != null && groupIterator.hasNext()) { + return groupIterator.next(); + } + return null; + } + + private void initialize() { + // 1. L2 캐시에서 데이터 로드 + Map> cacheData = hourlyTrackCache.getTracksInRange(startTime, endTime); + log.info("Daily Reader 초기화 — 캐시에서 {} 선박 로드 (기간: {} ~ {})", + cacheData.size(), startTime, endTime); + + // 2. DB에서 해당 기간 MMSI 목록 조회 (완전성 확인) + String distinctSql = """ + SELECT DISTINCT mmsi FROM signal.t_vessel_tracks_hourly + WHERE time_bucket >= ? AND time_bucket < ? + """; + + List dbMmsiList = queryJdbcTemplate.queryForList( + distinctSql, String.class, + Timestamp.valueOf(startTime), Timestamp.valueOf(endTime)); + Set dbMmsiSet = new HashSet<>(dbMmsiList); + + // 3. 캐시에 없는 MMSI 감지 + Set cacheMmsiSet = cacheData.keySet(); + Set missingMmsi = new HashSet<>(dbMmsiSet); + missingMmsi.removeAll(cacheMmsiSet); + + if (!missingMmsi.isEmpty()) { + log.info("캐시 미스 {} 선박 → DB fallback", missingMmsi.size()); + Map> fallbackData = fetchFromDb(missingMmsi); + + // 캐시 데이터와 병합 + Map> merged = new LinkedHashMap<>(cacheData); + merged.putAll(fallbackData); + cacheData = merged; + } + + // 캐시에만 있고 DB에 없는 경우 (stale 캐시) → 제거 + int staleCount = 0; + Iterator it = cacheData.keySet().iterator(); + while (it.hasNext()) { + if (!dbMmsiSet.contains(it.next())) { + it.remove(); + staleCount++; + } + } + if (staleCount > 0) { + log.debug("Stale 캐시 항목 {} 건 제거", staleCount); + } + + log.info("Daily Reader 준비 완료 — 총 {} 선박 (캐시: {}, DB fallback: {})", + cacheData.size(), cacheData.size() - missingMmsi.size(), missingMmsi.size()); + + groupIterator = cacheData.values().iterator(); + } + + /** + * 누락된 MMSI의 시간별 트랙을 DB에서 벌크 조회 + */ + private Map> fetchFromDb(Set mmsiSet) { + if (mmsiSet.isEmpty()) return Collections.emptyMap(); + + String[] mmsiArray = mmsiSet.toArray(new String[0]); + + String sql = """ + SELECT mmsi, time_bucket, + public.ST_AsText(track_geom) as geom_text, + distance_nm, avg_speed, max_speed, point_count, + start_position, end_position + FROM signal.t_vessel_tracks_hourly + WHERE mmsi = ANY(?) + AND time_bucket >= ? AND time_bucket < ? + AND track_geom IS NOT NULL + ORDER BY mmsi, time_bucket + """; + + Map> result = new LinkedHashMap<>(); + int[] parseFailCount = {0}; + + queryJdbcTemplate.query(sql, + ps -> { + ps.setArray(1, ps.getConnection().createArrayOf("varchar", mmsiArray)); + ps.setTimestamp(2, Timestamp.valueOf(startTime)); + ps.setTimestamp(3, Timestamp.valueOf(endTime)); + }, + rs -> { + String mmsi = rs.getString("mmsi"); + VesselTrack.TrackPosition startPos = parseTrackPosition(rs.getString("start_position")); + VesselTrack.TrackPosition endPos = parseTrackPosition(rs.getString("end_position")); + if ((startPos == null && rs.getString("start_position") != null) + || (endPos == null && rs.getString("end_position") != null)) { + parseFailCount[0]++; + } + VesselTrack track = VesselTrack.builder() + .mmsi(mmsi) + .timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime()) + .trackGeom(rs.getString("geom_text")) + .distanceNm(rs.getBigDecimal("distance_nm")) + .avgSpeed(rs.getBigDecimal("avg_speed")) + .maxSpeed(rs.getBigDecimal("max_speed")) + .pointCount(rs.getInt("point_count")) + .startPosition(startPos) + .endPosition(endPos) + .build(); + + result.computeIfAbsent(mmsi, k -> new ArrayList<>()).add(track); + }); + + log.info("DB fallback 완료: {} 선박, {} 트랙", + result.size(), result.values().stream().mapToInt(List::size).sum()); + if (parseFailCount[0] > 0) { + log.debug("TrackPosition 파싱 실패: {} 건", parseFailCount[0]); + } + return result; + } + + private VesselTrack.TrackPosition parseTrackPosition(String json) { + if (json == null) return null; + try { + String latStr = LineStringMUtils.extractJsonValue(json, "lat"); + String lonStr = LineStringMUtils.extractJsonValue(json, "lon"); + String timeStr = LineStringMUtils.extractJsonValue(json, "time"); + String sogStr = LineStringMUtils.extractJsonValue(json, "sog"); + + return VesselTrack.TrackPosition.builder() + .lat(latStr != null ? Double.parseDouble(latStr) : null) + .lon(lonStr != null ? Double.parseDouble(lonStr) : null) + .time(timeStr != null ? LocalDateTime.parse(timeStr, TIMESTAMP_FORMATTER) : null) + .sog(sogStr != null ? new BigDecimal(sogStr) : null) + .build(); + } catch (Exception e) { + return null; + } + } +}