From 4dd40b7231a473c5391795d969bb57ea789ecb18 Mon Sep 17 00:00:00 2001 From: htlee Date: Thu, 19 Feb 2026 12:17:16 +0900 Subject: [PATCH] =?UTF-8?q?perf:=20vesselStaticSyncStep=20N+1=20=EC=BF=BC?= =?UTF-8?q?=EB=A6=AC=20=EC=A0=9C=EA=B1=B0=20=E2=80=94=20DISTINCT=20ON=20bu?= =?UTF-8?q?lk=20SELECT=EB=A1=9C=20=EC=A0=84=ED=99=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 이전: MMSI별 개별 SELECT (~10만 쿼리) → 수 분 소요 이후: DISTINCT ON (mmsi) 1회 bulk SELECT → 인메모리 CDC 비교 Co-Authored-By: Claude Opus 4.6 --- .../batch/job/VesselStaticStepConfig.java | 76 +++++++++++++------ 1 file changed, 52 insertions(+), 24 deletions(-) diff --git a/src/main/java/gc/mda/signal_batch/batch/job/VesselStaticStepConfig.java b/src/main/java/gc/mda/signal_batch/batch/job/VesselStaticStepConfig.java index e282052..d7cee45 100644 --- a/src/main/java/gc/mda/signal_batch/batch/job/VesselStaticStepConfig.java +++ b/src/main/java/gc/mda/signal_batch/batch/job/VesselStaticStepConfig.java @@ -54,6 +54,8 @@ public class VesselStaticStepConfig { public Step vesselStaticSyncStep() { return new StepBuilder("vesselStaticSyncStep", jobRepository) .tasklet((contribution, chunkContext) -> { + long stepStart = System.currentTimeMillis(); + // 1. 캐시에서 전체 데이터 → MMSI별 그룹 Collection allEntities = cacheManager.getAllValues(); @@ -70,18 +72,16 @@ public class VesselStaticStepConfig { Map coalesced = coalesceByMmsi(allEntities); JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); + Timestamp hourBucketTs = Timestamp.valueOf(hourBucket); - // 2. CDC: 이전 레코드와 비교 → 변경 시에만 INSERT - String selectPrevSql = """ - SELECT imo, name, callsign, vessel_type, extra_info, - length, width, draught, destination, status, - signal_kind_code, class_type - FROM signal.t_vessel_static - WHERE mmsi = ? AND time_bucket <= ? - ORDER BY time_bucket DESC - LIMIT 1 - """; + // 2. CDC: bulk SELECT로 이전 레코드 전체 조회 (N+1 → 1회) + Map> prevRecords = bulkFetchPreviousRecords( + jdbcTemplate, hourBucketTs); + log.info("t_vessel_static CDC 비교 시작 — 현재: {} 선박, 이전: {} 레코드", + coalesced.size(), prevRecords.size()); + + // 3. 인메모리 비교 → 변경 시에만 INSERT String insertSql = """ INSERT INTO signal.t_vessel_static ( mmsi, time_bucket, imo, name, callsign, @@ -104,26 +104,16 @@ public class VesselStaticStepConfig { class_type = EXCLUDED.class_type """; - Timestamp hourBucketTs = Timestamp.valueOf(hourBucket); int inserted = 0; int skipped = 0; - List batchArgs = new ArrayList<>(); for (Map.Entry entry : coalesced.entrySet()) { String mmsi = entry.getKey(); AisTargetEntity current = entry.getValue(); - // 이전 레코드 조회 - boolean changed; - try { - Map prev = jdbcTemplate.queryForMap( - selectPrevSql, mmsi, hourBucketTs); - changed = hasStaticInfoChanged(current, prev); - } catch (org.springframework.dao.EmptyResultDataAccessException e) { - // 이전 레코드 없음 → 첫 INSERT - changed = true; - } + Map prev = prevRecords.get(mmsi); + boolean changed = (prev == null) || hasStaticInfoChanged(current, prev); if (changed) { Timestamp etaTs = current.getEta() != null @@ -148,14 +138,52 @@ public class VesselStaticStepConfig { jdbcTemplate.batchUpdate(insertSql, batchArgs); } - log.info("t_vessel_static 동기화 완료: 총 {} 선박, INSERT {} 건, CDC 스킵 {} 건", - coalesced.size(), inserted, skipped); + long elapsed = System.currentTimeMillis() - stepStart; + log.info("t_vessel_static 동기화 완료: 총 {} 선박, INSERT {} 건, CDC 스킵 {} 건 ({}ms)", + coalesced.size(), inserted, skipped, elapsed); return org.springframework.batch.repeat.RepeatStatus.FINISHED; }, transactionManager) .build(); } + /** + * DISTINCT ON (mmsi)로 전체 이전 레코드를 1회 bulk 조회 + * N+1 개별 SELECT → 1회 bulk SELECT로 최적화 + */ + private Map> bulkFetchPreviousRecords( + JdbcTemplate jdbcTemplate, Timestamp hourBucketTs) { + String sql = """ + SELECT DISTINCT ON (mmsi) + mmsi, imo, name, callsign, vessel_type, extra_info, + length, width, draught, destination, status, + signal_kind_code, class_type + FROM signal.t_vessel_static + WHERE time_bucket <= ? + ORDER BY mmsi, time_bucket DESC + """; + + Map> result = new HashMap<>(); + jdbcTemplate.query(sql, rs -> { + Map row = new HashMap<>(); + row.put("imo", rs.getObject("imo")); + row.put("name", rs.getString("name")); + row.put("callsign", rs.getString("callsign")); + row.put("vessel_type", rs.getString("vessel_type")); + row.put("extra_info", rs.getString("extra_info")); + row.put("length", rs.getObject("length")); + row.put("width", rs.getObject("width")); + row.put("draught", rs.getObject("draught")); + row.put("destination", rs.getString("destination")); + row.put("status", rs.getString("status")); + row.put("signal_kind_code", rs.getString("signal_kind_code")); + row.put("class_type", rs.getString("class_type")); + result.put(rs.getString("mmsi"), row); + }, hourBucketTs); + + return result; + } + /** * MMSI별 필드 COALESCE: 각 필드별 마지막 non-empty 값 조합 */