perf: vesselStaticSyncStep N+1 쿼리 제거 — DISTINCT ON bulk SELECT로 전환

이전: MMSI별 개별 SELECT (~10만 쿼리) → 수 분 소요
이후: DISTINCT ON (mmsi) 1회 bulk SELECT → 인메모리 CDC 비교

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
htlee 2026-02-19 12:17:16 +09:00
부모 a6d886c61b
커밋 4dd40b7231

파일 보기

@ -54,6 +54,8 @@ public class VesselStaticStepConfig {
public Step vesselStaticSyncStep() {
return new StepBuilder("vesselStaticSyncStep", jobRepository)
.tasklet((contribution, chunkContext) -> {
long stepStart = System.currentTimeMillis();
// 1. 캐시에서 전체 데이터 MMSI별 그룹
Collection<AisTargetEntity> allEntities = cacheManager.getAllValues();
@ -70,18 +72,16 @@ public class VesselStaticStepConfig {
Map<String, AisTargetEntity> coalesced = coalesceByMmsi(allEntities);
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
Timestamp hourBucketTs = Timestamp.valueOf(hourBucket);
// 2. CDC: 이전 레코드와 비교 변경 시에만 INSERT
String selectPrevSql = """
SELECT imo, name, callsign, vessel_type, extra_info,
length, width, draught, destination, status,
signal_kind_code, class_type
FROM signal.t_vessel_static
WHERE mmsi = ? AND time_bucket <= ?
ORDER BY time_bucket DESC
LIMIT 1
""";
// 2. CDC: bulk SELECT로 이전 레코드 전체 조회 (N+1 1회)
Map<String, Map<String, Object>> prevRecords = bulkFetchPreviousRecords(
jdbcTemplate, hourBucketTs);
log.info("t_vessel_static CDC 비교 시작 — 현재: {} 선박, 이전: {} 레코드",
coalesced.size(), prevRecords.size());
// 3. 인메모리 비교 변경 시에만 INSERT
String insertSql = """
INSERT INTO signal.t_vessel_static (
mmsi, time_bucket, imo, name, callsign,
@ -104,26 +104,16 @@ public class VesselStaticStepConfig {
class_type = EXCLUDED.class_type
""";
Timestamp hourBucketTs = Timestamp.valueOf(hourBucket);
int inserted = 0;
int skipped = 0;
List<Object[]> batchArgs = new ArrayList<>();
for (Map.Entry<String, AisTargetEntity> entry : coalesced.entrySet()) {
String mmsi = entry.getKey();
AisTargetEntity current = entry.getValue();
// 이전 레코드 조회
boolean changed;
try {
Map<String, Object> prev = jdbcTemplate.queryForMap(
selectPrevSql, mmsi, hourBucketTs);
changed = hasStaticInfoChanged(current, prev);
} catch (org.springframework.dao.EmptyResultDataAccessException e) {
// 이전 레코드 없음 INSERT
changed = true;
}
Map<String, Object> prev = prevRecords.get(mmsi);
boolean changed = (prev == null) || hasStaticInfoChanged(current, prev);
if (changed) {
Timestamp etaTs = current.getEta() != null
@ -148,14 +138,52 @@ public class VesselStaticStepConfig {
jdbcTemplate.batchUpdate(insertSql, batchArgs);
}
log.info("t_vessel_static 동기화 완료: 총 {} 선박, INSERT {} 건, CDC 스킵 {} 건",
coalesced.size(), inserted, skipped);
long elapsed = System.currentTimeMillis() - stepStart;
log.info("t_vessel_static 동기화 완료: 총 {} 선박, INSERT {} 건, CDC 스킵 {} 건 ({}ms)",
coalesced.size(), inserted, skipped, elapsed);
return org.springframework.batch.repeat.RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
/**
* DISTINCT ON (mmsi) 전체 이전 레코드를 1회 bulk 조회
* N+1 개별 SELECT 1회 bulk SELECT로 최적화
*/
private Map<String, Map<String, Object>> bulkFetchPreviousRecords(
JdbcTemplate jdbcTemplate, Timestamp hourBucketTs) {
String sql = """
SELECT DISTINCT ON (mmsi)
mmsi, imo, name, callsign, vessel_type, extra_info,
length, width, draught, destination, status,
signal_kind_code, class_type
FROM signal.t_vessel_static
WHERE time_bucket <= ?
ORDER BY mmsi, time_bucket DESC
""";
Map<String, Map<String, Object>> result = new HashMap<>();
jdbcTemplate.query(sql, rs -> {
Map<String, Object> row = new HashMap<>();
row.put("imo", rs.getObject("imo"));
row.put("name", rs.getString("name"));
row.put("callsign", rs.getString("callsign"));
row.put("vessel_type", rs.getString("vessel_type"));
row.put("extra_info", rs.getString("extra_info"));
row.put("length", rs.getObject("length"));
row.put("width", rs.getObject("width"));
row.put("draught", rs.getObject("draught"));
row.put("destination", rs.getString("destination"));
row.put("status", rs.getString("status"));
row.put("signal_kind_code", rs.getString("signal_kind_code"));
row.put("class_type", rs.getString("class_type"));
result.put(rs.getString("mmsi"), row);
}, hourBucketTs);
return result;
}
/**
* MMSI별 필드 COALESCE: 필드별 마지막 non-empty 조합
*/