From 732f69eb02de181c34481c2d2cb750ff0f00bc6b Mon Sep 17 00:00:00 2001 From: htlee Date: Thu, 19 Feb 2026 11:09:27 +0900 Subject: [PATCH 1/3] =?UTF-8?q?feat:=20Stale=20=EB=8D=B0=EC=9D=B4=ED=84=B0?= =?UTF-8?q?=20=EB=B9=84=EC=A0=95=EC=83=81=20=EA=B6=A4=EC=A0=81=20=EC=A0=84?= =?UTF-8?q?=ED=99=98=20=E2=80=94=20=EA=B3=BC=EA=B1=B0=20timestamp=20?= =?UTF-8?q?=EC=88=98=EC=8B=A0=20=EC=8B=9C=20=EC=A0=95=EB=B3=B4=20=EB=B3=B4?= =?UTF-8?q?=EC=A1=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CacheBasedVesselTrackDataReader: stale 그룹 스킵 제거 → 전체 통과 + 감지 로깅 - VesselTrackStepConfig: Processor에서 stale time_bucket 감지 시 비정상 궤적으로 전환 - abnormal_type: stale_timestamp (신규 분류) - time_bucket: 현재 5분 버킷으로 오버라이드 (파티션 존재 보장) - details: 원본 time_bucket, 지연 시간(분/시), 속도/거리 정보 Co-Authored-By: Claude Opus 4.6 --- .../batch/job/VesselTrackStepConfig.java | 79 ++++++++++++++++++- .../CacheBasedVesselTrackDataReader.java | 13 ++- 2 files changed, 81 insertions(+), 11 deletions(-) diff --git a/src/main/java/gc/mda/signal_batch/batch/job/VesselTrackStepConfig.java b/src/main/java/gc/mda/signal_batch/batch/job/VesselTrackStepConfig.java index 2047059..14e33eb 100644 --- a/src/main/java/gc/mda/signal_batch/batch/job/VesselTrackStepConfig.java +++ b/src/main/java/gc/mda/signal_batch/batch/job/VesselTrackStepConfig.java @@ -35,8 +35,11 @@ import org.springframework.context.annotation.Profile; import javax.sql.DataSource; import java.sql.Timestamp; +import java.time.Duration; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -93,9 +96,6 @@ public class VesselTrackStepConfig { @Value("${vessel.batch.chunk-size:1000}") private int chunkSize; - @Value("${partition.retention.tables.t_vessel_tracks_5min.retention-days:7}") - private int trackRetentionDays; - @PostConstruct public void init() { // 5분 Job의 이름을 명시적으로 설정 @@ -116,7 +116,7 @@ public class VesselTrackStepConfig { @Bean @StepScope public CacheBasedVesselTrackDataReader trackDataReader() { - return new CacheBasedVesselTrackDataReader(aisTargetCacheManager, trackRetentionDays); + return new CacheBasedVesselTrackDataReader(aisTargetCacheManager); } @Bean @@ -140,8 +140,15 @@ public class VesselTrackStepConfig { // 3. 강화된 비정상 궤적 필터링 (버킷 내 + 버킷 간 점프 검출) List filteredTracks = new ArrayList<>(); + LocalDateTime staleCutoff = LocalDateTime.now().toLocalDate().atStartOfDay(); for (VesselTrack track : tracks) { + // Stale 데이터 감지 → 비정상 궤적으로 전환 (정상 집계에서 누락) + if (track.getTimeBucket() != null && track.getTimeBucket().isBefore(staleCutoff)) { + saveStaleAbnormalTrack(track); + continue; + } + boolean isAbnormal = false; String abnormalReason = ""; @@ -285,6 +292,70 @@ public class VesselTrackStepConfig { } } + /** + * Stale 데이터(오늘 이전 time_bucket)를 비정상 궤적으로 전환 저장 + * - time_bucket: 현재 5분 버킷으로 오버라이드 (파티션 존재 보장) + * - abnormal_type: stale_timestamp + * - details: 원본 time_bucket, 지연 시간(분/시), 속도/거리 + */ + private void saveStaleAbnormalTrack(VesselTrack track) { + LocalDateTime now = LocalDateTime.now(); + LocalDateTime currentBucket = now.withSecond(0).withNano(0) + .minusMinutes(now.getMinute() % 5); + LocalDateTime originalTimeBucket = track.getTimeBucket(); + long delayMinutes = Duration.between(originalTimeBucket, now).toMinutes(); + + VesselTrack staleTrack = VesselTrack.builder() + .mmsi(track.getMmsi()) + .timeBucket(currentBucket) + .trackGeom(track.getTrackGeom()) + .distanceNm(track.getDistanceNm()) + .avgSpeed(track.getAvgSpeed()) + .maxSpeed(track.getMaxSpeed()) + .pointCount(track.getPointCount()) + .startPosition(track.getStartPosition()) + .endPosition(track.getEndPosition()) + .build(); + + log.info("Stale 데이터 비정상 전환: MMSI={}, 원본={}, 현재={}, 지연={}분", + track.getMmsi(), originalTimeBucket, currentBucket, delayMinutes); + + Map details = new HashMap<>(); + details.put("originalTimeBucket", originalTimeBucket.toString()); + details.put("currentTimeBucket", currentBucket.toString()); + details.put("delayMinutes", delayMinutes); + details.put("delayHours", delayMinutes / 60); + if (track.getAvgSpeed() != null) details.put("avgSpeed", track.getAvgSpeed()); + if (track.getDistanceNm() != null) details.put("distanceNm", track.getDistanceNm()); + if (track.getPointCount() != null) details.put("pointCount", track.getPointCount()); + + List segments = List.of( + AbnormalTrackDetector.AbnormalSegment.builder() + .type("stale_timestamp") + .startIndex(0) + .endIndex(track.getPointCount() != null + ? Math.max(track.getPointCount() - 1, 0) : 0) + .actualValue(delayMinutes) + .threshold(0) + .description(String.format("Stale 데이터: 원본 %s, 지연 %d분 (%d시간)", + originalTimeBucket, delayMinutes, delayMinutes / 60)) + .details(details) + .build()); + + AbnormalDetectionResult result = AbnormalDetectionResult.builder() + .originalTrack(staleTrack) + .correctedTrack(null) + .abnormalSegments(segments) + .hasAbnormalities(true) + .build(); + + try { + abnormalTrackWriter.write(new Chunk<>(List.of(result))); + } catch (Exception e) { + log.error("Stale 비정상 궤적 저장 실패: MMSI={}", track.getMmsi(), e); + } + } + // CompositeItemWriter로 3개 테이블에 동시 저장 @Bean @StepScope diff --git a/src/main/java/gc/mda/signal_batch/batch/reader/CacheBasedVesselTrackDataReader.java b/src/main/java/gc/mda/signal_batch/batch/reader/CacheBasedVesselTrackDataReader.java index ab7d28d..c1a0a3d 100644 --- a/src/main/java/gc/mda/signal_batch/batch/reader/CacheBasedVesselTrackDataReader.java +++ b/src/main/java/gc/mda/signal_batch/batch/reader/CacheBasedVesselTrackDataReader.java @@ -29,7 +29,6 @@ import java.util.stream.Collectors; public class CacheBasedVesselTrackDataReader implements ItemReader> { private final AisTargetCacheManager cacheManager; - private final int staleDataThresholdDays; private Iterator> groupIterator; private boolean initialized = false; @@ -59,11 +58,12 @@ public class CacheBasedVesselTrackDataReader implements ItemReader> allGroups = new ArrayList<>(); long totalPoints = 0; int totalVessels = 0; - int skippedStaleGroups = 0; + int staleGroups = 0; for (Map.Entry> entry : trackBuffer.entrySet()) { List vesselDataList = entry.getValue().stream() @@ -88,16 +88,15 @@ public class CacheBasedVesselTrackDataReader implements ItemReader> bucketEntry : bucketGroups.entrySet()) { if (bucketEntry.getKey().isBefore(staleCutoff)) { - skippedStaleGroups++; - continue; + staleGroups++; } allGroups.add(bucketEntry.getValue()); totalPoints += bucketEntry.getValue().size(); } } - if (skippedStaleGroups > 0) { - log.info("Stale data 필터: {}개 그룹 스킵 ({}일 이전 데이터)", skippedStaleGroups, staleDataThresholdDays); + if (staleGroups > 0) { + log.info("Stale 그룹 감지: {}건 (기준: {} 이전, 비정상 궤적으로 전환 예정)", staleGroups, staleCutoff); } log.info("트랙 버퍼 Reader 초기화: {} 선박, {} 그룹(MMSI×버킷), {} 포인트 (평균 {}pt/그룹)", totalVessels, allGroups.size(), totalPoints, -- 2.45.2 From a6d886c61b006116d6cdfe32931a2a1a522dbb32 Mon Sep 17 00:00:00 2001 From: htlee Date: Thu, 19 Feb 2026 11:12:49 +0900 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20hourly=20job=20timeBucket=20?= =?UTF-8?q?=ED=8C=8C=EB=9D=BC=EB=AF=B8=ED=84=B0=20=EB=AC=B8=EC=9E=90?= =?UTF-8?q?=EC=97=B4=20"hourly"=20=E2=86=92=20=EC=8B=A4=EC=A0=9C=20?= =?UTF-8?q?=EC=8B=9C=EA=B0=84=EA=B0=92=20=EC=A0=84=EB=8B=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit VesselBatchScheduler에서 hourlyAggregationJob 실행 시 timeBucket 파라미터에 "hourly" 문자열을 전달하여 HourlyTrackMergeProcessor의 LocalDateTime.parse() 실패. startTime (정시 값)을 전달하도록 수정. Co-Authored-By: Claude Opus 4.6 --- .../gc/mda/signal_batch/batch/job/VesselBatchScheduler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/gc/mda/signal_batch/batch/job/VesselBatchScheduler.java b/src/main/java/gc/mda/signal_batch/batch/job/VesselBatchScheduler.java index ab8fb31..c1dbf5d 100644 --- a/src/main/java/gc/mda/signal_batch/batch/job/VesselBatchScheduler.java +++ b/src/main/java/gc/mda/signal_batch/batch/job/VesselBatchScheduler.java @@ -162,7 +162,7 @@ public class VesselBatchScheduler { JobParameters params = new JobParametersBuilder() .addString("startTime", startTime.toString()) .addString("endTime", endTime.toString()) - .addString("timeBucket", "hourly") + .addString("timeBucket", startTime.toString()) .addString("executionTime", now.toString()) .addString("enableAbnormalDetection", String.valueOf(abnormalDetectionEnabled)) .toJobParameters(); -- 2.45.2 From 4dd40b7231a473c5391795d969bb57ea789ecb18 Mon Sep 17 00:00:00 2001 From: htlee Date: Thu, 19 Feb 2026 12:17:16 +0900 Subject: [PATCH 3/3] =?UTF-8?q?perf:=20vesselStaticSyncStep=20N+1=20?= =?UTF-8?q?=EC=BF=BC=EB=A6=AC=20=EC=A0=9C=EA=B1=B0=20=E2=80=94=20DISTINCT?= =?UTF-8?q?=20ON=20bulk=20SELECT=EB=A1=9C=20=EC=A0=84=ED=99=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 이전: MMSI별 개별 SELECT (~10만 쿼리) → 수 분 소요 이후: DISTINCT ON (mmsi) 1회 bulk SELECT → 인메모리 CDC 비교 Co-Authored-By: Claude Opus 4.6 --- .../batch/job/VesselStaticStepConfig.java | 76 +++++++++++++------ 1 file changed, 52 insertions(+), 24 deletions(-) diff --git a/src/main/java/gc/mda/signal_batch/batch/job/VesselStaticStepConfig.java b/src/main/java/gc/mda/signal_batch/batch/job/VesselStaticStepConfig.java index e282052..d7cee45 100644 --- a/src/main/java/gc/mda/signal_batch/batch/job/VesselStaticStepConfig.java +++ b/src/main/java/gc/mda/signal_batch/batch/job/VesselStaticStepConfig.java @@ -54,6 +54,8 @@ public class VesselStaticStepConfig { public Step vesselStaticSyncStep() { return new StepBuilder("vesselStaticSyncStep", jobRepository) .tasklet((contribution, chunkContext) -> { + long stepStart = System.currentTimeMillis(); + // 1. 캐시에서 전체 데이터 → MMSI별 그룹 Collection allEntities = cacheManager.getAllValues(); @@ -70,18 +72,16 @@ public class VesselStaticStepConfig { Map coalesced = coalesceByMmsi(allEntities); JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); + Timestamp hourBucketTs = Timestamp.valueOf(hourBucket); - // 2. CDC: 이전 레코드와 비교 → 변경 시에만 INSERT - String selectPrevSql = """ - SELECT imo, name, callsign, vessel_type, extra_info, - length, width, draught, destination, status, - signal_kind_code, class_type - FROM signal.t_vessel_static - WHERE mmsi = ? AND time_bucket <= ? - ORDER BY time_bucket DESC - LIMIT 1 - """; + // 2. CDC: bulk SELECT로 이전 레코드 전체 조회 (N+1 → 1회) + Map> prevRecords = bulkFetchPreviousRecords( + jdbcTemplate, hourBucketTs); + log.info("t_vessel_static CDC 비교 시작 — 현재: {} 선박, 이전: {} 레코드", + coalesced.size(), prevRecords.size()); + + // 3. 인메모리 비교 → 변경 시에만 INSERT String insertSql = """ INSERT INTO signal.t_vessel_static ( mmsi, time_bucket, imo, name, callsign, @@ -104,26 +104,16 @@ public class VesselStaticStepConfig { class_type = EXCLUDED.class_type """; - Timestamp hourBucketTs = Timestamp.valueOf(hourBucket); int inserted = 0; int skipped = 0; - List batchArgs = new ArrayList<>(); for (Map.Entry entry : coalesced.entrySet()) { String mmsi = entry.getKey(); AisTargetEntity current = entry.getValue(); - // 이전 레코드 조회 - boolean changed; - try { - Map prev = jdbcTemplate.queryForMap( - selectPrevSql, mmsi, hourBucketTs); - changed = hasStaticInfoChanged(current, prev); - } catch (org.springframework.dao.EmptyResultDataAccessException e) { - // 이전 레코드 없음 → 첫 INSERT - changed = true; - } + Map prev = prevRecords.get(mmsi); + boolean changed = (prev == null) || hasStaticInfoChanged(current, prev); if (changed) { Timestamp etaTs = current.getEta() != null @@ -148,14 +138,52 @@ public class VesselStaticStepConfig { jdbcTemplate.batchUpdate(insertSql, batchArgs); } - log.info("t_vessel_static 동기화 완료: 총 {} 선박, INSERT {} 건, CDC 스킵 {} 건", - coalesced.size(), inserted, skipped); + long elapsed = System.currentTimeMillis() - stepStart; + log.info("t_vessel_static 동기화 완료: 총 {} 선박, INSERT {} 건, CDC 스킵 {} 건 ({}ms)", + coalesced.size(), inserted, skipped, elapsed); return org.springframework.batch.repeat.RepeatStatus.FINISHED; }, transactionManager) .build(); } + /** + * DISTINCT ON (mmsi)로 전체 이전 레코드를 1회 bulk 조회 + * N+1 개별 SELECT → 1회 bulk SELECT로 최적화 + */ + private Map> bulkFetchPreviousRecords( + JdbcTemplate jdbcTemplate, Timestamp hourBucketTs) { + String sql = """ + SELECT DISTINCT ON (mmsi) + mmsi, imo, name, callsign, vessel_type, extra_info, + length, width, draught, destination, status, + signal_kind_code, class_type + FROM signal.t_vessel_static + WHERE time_bucket <= ? + ORDER BY mmsi, time_bucket DESC + """; + + Map> result = new HashMap<>(); + jdbcTemplate.query(sql, rs -> { + Map row = new HashMap<>(); + row.put("imo", rs.getObject("imo")); + row.put("name", rs.getString("name")); + row.put("callsign", rs.getString("callsign")); + row.put("vessel_type", rs.getString("vessel_type")); + row.put("extra_info", rs.getString("extra_info")); + row.put("length", rs.getObject("length")); + row.put("width", rs.getObject("width")); + row.put("draught", rs.getObject("draught")); + row.put("destination", rs.getString("destination")); + row.put("status", rs.getString("status")); + row.put("signal_kind_code", rs.getString("signal_kind_code")); + row.put("class_type", rs.getString("class_type")); + result.put(rs.getString("mmsi"), row); + }, hourBucketTs); + + return result; + } + /** * MMSI별 필드 COALESCE: 각 필드별 마지막 non-empty 값 조합 */ -- 2.45.2