From 7bacd1a989f57ac62b43cc558650550c38b03c9b Mon Sep 17 00:00:00 2001 From: HYOJIN Date: Mon, 23 Mar 2026 10:48:42 +0900 Subject: [PATCH] =?UTF-8?q?refactor(batch):=20=ED=8C=8C=ED=8B=B0=EC=85=98?= =?UTF-8?q?=20=EC=8A=A4=ED=85=9D=20=ED=94=84=EB=A1=9C=EC=84=B8=EC=8A=A4=20?= =?UTF-8?q?=EA=B3=B5=ED=86=B5=20=EB=AA=A8=EB=93=88=ED=99=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit StringListPartitioner, LastExecutionUpdateTasklet, BasePartitionedJobConfig를 공통 모듈로 추출하고 ShipDetailUpdateJobConfig가 이를 사용하도록 리팩토링 Closes #73 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../config/BasePartitionedJobConfig.java | 82 +++++++++++++++++++ .../partition/StringListPartitioner.java | 61 ++++++++++++++ .../tasklet/LastExecutionUpdateTasklet.java | 73 +++++++++++++++++ .../batch/config/ShipDetailPartitioner.java | 53 ++---------- .../config/ShipDetailUpdateJobConfig.java | 70 +++------------- 5 files changed, 233 insertions(+), 106 deletions(-) create mode 100644 src/main/java/com/snp/batch/common/batch/config/BasePartitionedJobConfig.java create mode 100644 src/main/java/com/snp/batch/common/batch/partition/StringListPartitioner.java create mode 100644 src/main/java/com/snp/batch/common/batch/tasklet/LastExecutionUpdateTasklet.java diff --git a/src/main/java/com/snp/batch/common/batch/config/BasePartitionedJobConfig.java b/src/main/java/com/snp/batch/common/batch/config/BasePartitionedJobConfig.java new file mode 100644 index 0000000..7b5a119 --- /dev/null +++ b/src/main/java/com/snp/batch/common/batch/config/BasePartitionedJobConfig.java @@ -0,0 +1,82 @@ +package com.snp.batch.common.batch.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.job.flow.FlowExecutionStatus; +import org.springframework.batch.core.job.flow.JobExecutionDecider; +import org.springframework.batch.core.partition.support.Partitioner; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.core.task.TaskExecutor; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * 파티션 기반 병렬 처리 Job 구성을 위한 추상 클래스. + * 키 목록 조회 → 파티션 병렬 처리 → 후처리 패턴의 공통 인프라 제공. + * + * @param 입력 타입 (Reader 출력, Processor 입력) + * @param 출력 타입 (Processor 출력, Writer 입력) + */ +@Slf4j +public abstract class BasePartitionedJobConfig extends BaseMultiStepJobConfig { + + public BasePartitionedJobConfig(JobRepository jobRepository, PlatformTransactionManager transactionManager) { + super(jobRepository, transactionManager); + } + + /** + * 파티션 Step을 생성합니다. + * + * @param stepName 파티션 Step 이름 + * @param workerStepName Worker Step 이름 (partitioner 등록에 사용) + * @param partitioner Partitioner 인스턴스 + * @param workerStep Worker Step 인스턴스 + * @param taskExecutor 병렬 실행용 TaskExecutor + * @param gridSize 파티션 수 + * @return 구성된 파티션 Step + */ + protected Step createPartitionedStep(String stepName, String workerStepName, + Partitioner partitioner, Step workerStep, + TaskExecutor taskExecutor, int gridSize) { + return new StepBuilder(stepName, jobRepository) + .partitioner(workerStepName, partitioner) + .step(workerStep) + .taskExecutor(taskExecutor) + .gridSize(gridSize) + .build(); + } + + /** + * 키 건수 기반 Decider를 생성합니다. + * JobExecutionContext의 지정된 키 값이 0이면 EMPTY_RESPONSE, 아니면 NORMAL 반환. + * + * @param contextKey JobExecutionContext에서 조회할 int 키 이름 + * @param jobName 로그에 표시할 Job 이름 + * @return 키 건수 기반 JobExecutionDecider + */ + protected JobExecutionDecider createKeyCountDecider(String contextKey, String jobName) { + return (jobExecution, stepExecution) -> { + int totalCount = jobExecution.getExecutionContext().getInt(contextKey, 0); + if (totalCount == 0) { + log.info("[{}] Decider: EMPTY_RESPONSE - {} 0건으로 후속 스텝 스킵", jobName, contextKey); + return new FlowExecutionStatus("EMPTY_RESPONSE"); + } + log.info("[{}] Decider: NORMAL - {} {} 건 처리 시작", jobName, contextKey, totalCount); + return new FlowExecutionStatus("NORMAL"); + }; + } + + /** + * LastExecution 업데이트 Step을 생성합니다. + * + * @param stepName Step 이름 + * @param tasklet LastExecutionUpdateTasklet 인스턴스 + * @return 구성된 LastExecution 업데이트 Step + */ + protected Step createLastExecutionUpdateStep(String stepName, Tasklet tasklet) { + return new StepBuilder(stepName, jobRepository) + .tasklet(tasklet, transactionManager) + .build(); + } +} diff --git a/src/main/java/com/snp/batch/common/batch/partition/StringListPartitioner.java b/src/main/java/com/snp/batch/common/batch/partition/StringListPartitioner.java new file mode 100644 index 0000000..7a99493 --- /dev/null +++ b/src/main/java/com/snp/batch/common/batch/partition/StringListPartitioner.java @@ -0,0 +1,61 @@ +package com.snp.batch.common.batch.partition; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.partition.support.Partitioner; +import org.springframework.batch.item.ExecutionContext; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * 문자열 키 목록을 N개 파티션으로 균등 분할하는 범용 Partitioner. + * + *

각 파티션의 ExecutionContext에 다음 값을 저장한다.

+ *
    + *
  • {@code {contextKeyName}} — 해당 파티션에 할당된 키 목록 (CSV 형식)
  • + *
  • {@code partitionIndex} — 파티션 인덱스 (0-based)
  • + *
  • {@code partitionSize} — 해당 파티션의 키 수
  • + *
+ */ +@Slf4j +public class StringListPartitioner implements Partitioner { + + private final List allKeys; + private final int partitionCount; + private final String contextKeyName; + + public StringListPartitioner(List allKeys, int partitionCount, String contextKeyName) { + this.allKeys = allKeys; + this.partitionCount = partitionCount; + this.contextKeyName = contextKeyName; + } + + @Override + public Map partition(int gridSize) { + int totalSize = allKeys.size(); + int actualPartitionCount = Math.min(partitionCount, Math.max(1, totalSize)); + Map partitions = new LinkedHashMap<>(); + int partitionSize = (int) Math.ceil((double) totalSize / actualPartitionCount); + + for (int i = 0; i < actualPartitionCount; i++) { + int fromIndex = i * partitionSize; + int toIndex = Math.min(fromIndex + partitionSize, totalSize); + if (fromIndex >= totalSize) break; + + List partitionKeys = allKeys.subList(fromIndex, toIndex); + ExecutionContext context = new ExecutionContext(); + context.putString(contextKeyName, String.join(",", partitionKeys)); + context.putInt("partitionIndex", i); + context.putInt("partitionSize", partitionKeys.size()); + + String partitionKey = "partition" + i; + partitions.put(partitionKey, context); + log.info("[StringListPartitioner] {} : 키 {} 건 (index {}-{})", + partitionKey, partitionKeys.size(), fromIndex, toIndex - 1); + } + log.info("[StringListPartitioner] 총 {} 개 파티션 생성 (전체 키: {} 건)", + partitions.size(), totalSize); + return partitions; + } +} diff --git a/src/main/java/com/snp/batch/common/batch/tasklet/LastExecutionUpdateTasklet.java b/src/main/java/com/snp/batch/common/batch/tasklet/LastExecutionUpdateTasklet.java new file mode 100644 index 0000000..7ef7455 --- /dev/null +++ b/src/main/java/com/snp/batch/common/batch/tasklet/LastExecutionUpdateTasklet.java @@ -0,0 +1,73 @@ +package com.snp.batch.common.batch.tasklet; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.sql.Timestamp; +import java.time.LocalDateTime; + +/** + * 배치 작업 완료 후 BATCH_LAST_EXECUTION 테이블의 LAST_SUCCESS_DATE를 업데이트하는 공통 Tasklet. + * + *

RECOLLECT 모드일 경우 업데이트를 스킵하며, + * Job ExecutionContext에 저장된 {@code batchToDate}를 기준으로 성공 날짜를 계산합니다. + * {@code batchToDate}가 없을 경우 현재 시간에서 {@code bufferHours}를 차감하여 사용합니다.

+ */ +@Slf4j +public class LastExecutionUpdateTasklet implements Tasklet { + + private static final String RECOLLECT_MODE = "RECOLLECT"; + + private final JdbcTemplate jdbcTemplate; + private final String targetSchema; + private final String apiKey; + private final int bufferHours; + + public LastExecutionUpdateTasklet(JdbcTemplate jdbcTemplate, String targetSchema, + String apiKey, int bufferHours) { + this.jdbcTemplate = jdbcTemplate; + this.targetSchema = targetSchema; + this.apiKey = apiKey; + this.bufferHours = bufferHours; + } + + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) { + String executionMode = chunkContext.getStepContext() + .getStepExecution().getJobExecution() + .getJobParameters().getString("executionMode", "NORMAL"); + + if (RECOLLECT_MODE.equals(executionMode)) { + log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵"); + return RepeatStatus.FINISHED; + } + + String toDateStr = chunkContext.getStepContext() + .getStepExecution().getJobExecution() + .getExecutionContext().getString("batchToDate", null); + + LocalDateTime successDate; + if (toDateStr != null) { + successDate = LocalDateTime.parse(toDateStr).minusHours(bufferHours); + log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 시작 (캡처된 toDate - {}시간 버퍼: {})", + bufferHours, successDate); + } else { + successDate = LocalDateTime.now().minusHours(bufferHours); + log.warn(">>>>> batchToDate가 없어 현재 시간 - {}시간 버퍼 사용: {}", bufferHours, successDate); + } + + jdbcTemplate.update( + String.format( + "UPDATE %s.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = ?, UPDATED_AT = NOW() WHERE API_KEY = ?", + targetSchema), + Timestamp.valueOf(successDate), apiKey + ); + + log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료 (LAST_SUCCESS_DATE = {})", successDate); + return RepeatStatus.FINISHED; + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailPartitioner.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailPartitioner.java index b90b4f7..1cbaa3f 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailPartitioner.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailPartitioner.java @@ -1,61 +1,18 @@ package com.snp.batch.jobs.shipdetail.batch.config; -import lombok.extern.slf4j.Slf4j; -import org.springframework.batch.core.partition.support.Partitioner; -import org.springframework.batch.item.ExecutionContext; +import com.snp.batch.common.batch.partition.StringListPartitioner; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; /** * 선박제원정보 IMO 목록을 N개 파티션으로 분할하는 Partitioner. - * JobExecutionContext의 allImoNumbers를 읽어 파티션별 ExecutionContext에 할당. + * 공통 {@link StringListPartitioner}에 위임하며, ExecutionContext 키로 "partitionImoNumbers"를 사용. */ -@Slf4j -public class ShipDetailPartitioner implements Partitioner { +public class ShipDetailPartitioner extends StringListPartitioner { - private final List allImoNumbers; - private final int partitionCount; + public static final String CONTEXT_KEY = "partitionImoNumbers"; public ShipDetailPartitioner(List allImoNumbers, int partitionCount) { - this.allImoNumbers = allImoNumbers; - this.partitionCount = partitionCount; - } - - @Override - public Map partition(int gridSize) { - int totalSize = allImoNumbers.size(); - // 실제 파티션 수: IMO 수가 적으면 파티션 수를 줄임 - int actualPartitionCount = Math.min(partitionCount, Math.max(1, totalSize)); - - Map partitions = new LinkedHashMap<>(); - int partitionSize = (int) Math.ceil((double) totalSize / actualPartitionCount); - - for (int i = 0; i < actualPartitionCount; i++) { - int fromIndex = i * partitionSize; - int toIndex = Math.min(fromIndex + partitionSize, totalSize); - - if (fromIndex >= totalSize) { - break; - } - - List partitionImos = allImoNumbers.subList(fromIndex, toIndex); - ExecutionContext context = new ExecutionContext(); - context.putString("partitionImoNumbers", String.join(",", partitionImos)); - context.putInt("partitionIndex", i); - context.putInt("partitionSize", partitionImos.size()); - - String partitionKey = "partition" + i; - partitions.put(partitionKey, context); - - log.info("[ShipDetailPartitioner] {} : IMO {} 건 (index {}-{})", - partitionKey, partitionImos.size(), fromIndex, toIndex - 1); - } - - log.info("[ShipDetailPartitioner] 총 {} 개 파티션 생성 (전체 IMO: {} 건)", - partitions.size(), totalSize); - - return partitions; + super(allImoNumbers, partitionCount, CONTEXT_KEY); } } diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java index 55af4d5..1b911ba 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java @@ -1,7 +1,8 @@ package com.snp.batch.jobs.shipdetail.batch.config; import com.fasterxml.jackson.databind.ObjectMapper; -import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; +import com.snp.batch.common.batch.config.BasePartitionedJobConfig; +import com.snp.batch.common.batch.tasklet.LastExecutionUpdateTasklet; import com.snp.batch.global.repository.BatchFailedRecordRepository; import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto; import com.snp.batch.jobs.shipdetail.batch.entity.ShipDetailEntity; @@ -13,17 +14,14 @@ import com.snp.batch.service.BatchDateService; import com.snp.batch.service.BatchFailedRecordService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; -import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionListener; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.job.builder.JobBuilder; -import org.springframework.batch.core.job.flow.FlowExecutionStatus; import org.springframework.batch.core.job.flow.JobExecutionDecider; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.core.step.tasklet.Tasklet; -import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -33,15 +31,13 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.WebClient; -import java.sql.Timestamp; -import java.time.LocalDateTime; import java.util.Arrays; import java.util.Collections; import java.util.List; @Slf4j @Configuration -public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig { +public class ShipDetailUpdateJobConfig extends BasePartitionedJobConfig { private final ShipDetailDataProcessor shipDetailDataProcessor; private final ShipDetailDataWriter shipDetailDataWriter; @@ -167,17 +163,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig { - int totalImoCount = jobExecution.getExecutionContext().getInt("totalImoCount", 0); - - if (totalImoCount == 0) { - log.info("[ShipDetailUpdateJob] Decider: EMPTY_RESPONSE - IMO 0건으로 LAST_EXECUTION 업데이트 스킵"); - return new FlowExecutionStatus("EMPTY_RESPONSE"); - } - - log.info("[ShipDetailUpdateJob] Decider: NORMAL - IMO {} 건 처리 시작", totalImoCount); - return new FlowExecutionStatus("NORMAL"); - }; + return createKeyCountDecider("totalImoCount", getJobName()); } // ======================================== @@ -197,12 +183,10 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig { - String executionMode = chunkContext.getStepContext() - .getStepExecution().getJobExecution() - .getJobParameters().getString("executionMode", "NORMAL"); - if ("RECOLLECT".equals(executionMode)) { - log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵"); - return RepeatStatus.FINISHED; - } - - String toDateStr = chunkContext.getStepContext() - .getStepExecution().getJobExecution() - .getExecutionContext().getString("batchToDate", null); - - LocalDateTime successDate; - if (toDateStr != null) { - successDate = LocalDateTime.parse(toDateStr).minusHours(lastExecutionBufferHours); - log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 시작 (캡처된 toDate - {}시간 버퍼: {})", lastExecutionBufferHours, successDate); - } else { - successDate = LocalDateTime.now().minusHours(lastExecutionBufferHours); - log.warn(">>>>> batchToDate가 없어 현재 시간 - {}시간 버퍼 사용: {}", lastExecutionBufferHours, successDate); - } - - jdbcTemplate.update( - String.format("UPDATE %s.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = ?, UPDATED_AT = NOW() WHERE API_KEY = ?", targetSchema), - Timestamp.valueOf(successDate), getApiKey() - ); - - log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료 (LAST_SUCCESS_DATE = {})", successDate); - return RepeatStatus.FINISHED; - }; + return new LastExecutionUpdateTasklet(jdbcTemplate, targetSchema, getApiKey(), lastExecutionBufferHours); } @Bean(name = "ShipDetailLastExecutionUpdateStep") public Step shipDetailLastExecutionUpdateStep() { - return new StepBuilder("ShipDetailLastExecutionUpdateStep", jobRepository) - .tasklet(shipDetailLastExecutionUpdateTasklet(), transactionManager) - .build(); + return createLastExecutionUpdateStep("ShipDetailLastExecutionUpdateStep", + shipDetailLastExecutionUpdateTasklet()); } }