refactor(batch): 파티션 스텝 프로세스 공통 모듈화

StringListPartitioner, LastExecutionUpdateTasklet, BasePartitionedJobConfig를
공통 모듈로 추출하고 ShipDetailUpdateJobConfig가 이를 사용하도록 리팩토링

Closes #73

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
HYOJIN 2026-03-23 10:48:42 +09:00
부모 e94e982ca9
커밋 7bacd1a989
5개의 변경된 파일에 233개의 추가작업 그리고 106개의 삭제

파일 보기

@@ -0,0 +1,82 @@
package com.snp.batch.common.batch.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.core.task.TaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
/**
 * Abstract base class for jobs that run a partitioned, parallel processing step.
 * Supplies the shared infrastructure for the "fetch list -> partition -> parallel
 * process -> post-process" pattern.
 *
 * @param <I> input type (reader output / processor input)
 * @param <O> output type (processor output / writer input)
 */
@Slf4j
public abstract class BasePartitionedJobConfig<I, O> extends BaseMultiStepJobConfig<I, O> {

    public BasePartitionedJobConfig(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        super(jobRepository, transactionManager);
    }

    /**
     * Builds a partitioned master Step that fans work out to parallel copies of the worker Step.
     *
     * @param stepName       name of the master (partitioned) step
     * @param workerStepName step name registered with the partitioner
     * @param partitioner    partitioner instance
     * @param workerStep     worker step executed once per partition
     * @param taskExecutor   executor used for parallel partition execution
     * @param gridSize       number of partitions requested from the partitioner
     * @return the configured partitioned step
     */
    protected Step createPartitionedStep(String stepName, String workerStepName,
                                         Partitioner partitioner, Step workerStep,
                                         TaskExecutor taskExecutor, int gridSize) {
        StepBuilder masterBuilder = new StepBuilder(stepName, jobRepository);
        return masterBuilder
                .partitioner(workerStepName, partitioner)
                .step(workerStep)
                .taskExecutor(taskExecutor)
                .gridSize(gridSize)
                .build();
    }

    /**
     * Builds a count-based decider. Reads the int stored under {@code contextKey}
     * in the JobExecutionContext: 0 yields EMPTY_RESPONSE, anything else NORMAL.
     *
     * @param contextKey name of the int value looked up in the JobExecutionContext
     * @param jobName    job name shown in log output
     * @return the count-based JobExecutionDecider
     */
    protected JobExecutionDecider createKeyCountDecider(String contextKey, String jobName) {
        return (jobExec, stepExec) -> {
            final int totalCount = jobExec.getExecutionContext().getInt(contextKey, 0);
            if (totalCount != 0) {
                log.info("[{}] Decider: NORMAL - {} {} 건 처리 시작", jobName, contextKey, totalCount);
                return new FlowExecutionStatus("NORMAL");
            }
            log.info("[{}] Decider: EMPTY_RESPONSE - {} 0건으로 후속 스텝 스킵", jobName, contextKey);
            return new FlowExecutionStatus("EMPTY_RESPONSE");
        };
    }

    /**
     * Builds the Step that runs the LastExecution bookkeeping tasklet.
     *
     * @param stepName step name
     * @param tasklet  LastExecutionUpdateTasklet instance
     * @return the configured LastExecution update step
     */
    protected Step createLastExecutionUpdateStep(String stepName, Tasklet tasklet) {
        return new StepBuilder(stepName, jobRepository)
                .tasklet(tasklet, transactionManager)
                .build();
    }
}

파일 보기

@@ -0,0 +1,61 @@
package com.snp.batch.common.batch.partition;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
 * Generic {@link Partitioner} that splits a string key list evenly into N partitions.
 *
 * <p>Each partition's ExecutionContext receives the following values:</p>
 * <ul>
 *   <li>{@code {contextKeyName}} - the keys assigned to the partition (CSV format)</li>
 *   <li>{@code partitionIndex} - partition index (0-based)</li>
 *   <li>{@code partitionSize} - number of keys in the partition</li>
 * </ul>
 *
 * <p>Note: the {@code gridSize} argument supplied by Spring Batch is ignored; the
 * partition count given to the constructor is authoritative.</p>
 */
@Slf4j
public class StringListPartitioner implements Partitioner {

    private final List<String> allKeys;
    private final int partitionCount;
    private final String contextKeyName;

    /**
     * @param allKeys        full key list to split (snapshotted at construction)
     * @param partitionCount desired number of partitions (must be >= 1)
     * @param contextKeyName ExecutionContext key under which each partition's CSV key list is stored
     * @throws IllegalArgumentException if partitionCount < 1 or contextKeyName is blank
     * @throws NullPointerException     if allKeys is null or contains null elements
     */
    public StringListPartitioner(List<String> allKeys, int partitionCount, String contextKeyName) {
        if (partitionCount < 1) {
            throw new IllegalArgumentException("partitionCount must be >= 1 but was " + partitionCount);
        }
        if (contextKeyName == null || contextKeyName.isBlank()) {
            throw new IllegalArgumentException("contextKeyName must not be blank");
        }
        // Defensive snapshot: later mutation of the caller's list must not skew partitioning.
        this.allKeys = List.copyOf(allKeys);
        this.partitionCount = partitionCount;
        this.contextKeyName = contextKeyName;
    }

    /**
     * Splits the key list into contiguous, near-equal chunks.
     *
     * @param gridSize ignored (the constructor-supplied partition count is used instead)
     * @return map of partition name ("partition0", "partition1", ...) to its ExecutionContext;
     *         empty when the key list is empty
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int totalSize = allKeys.size();
        // Never create more partitions than keys; floor at 1 to avoid division by zero below.
        int actualPartitionCount = Math.min(partitionCount, Math.max(1, totalSize));
        Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
        int partitionSize = (int) Math.ceil((double) totalSize / actualPartitionCount);
        for (int i = 0; i < actualPartitionCount; i++) {
            int fromIndex = i * partitionSize;
            int toIndex = Math.min(fromIndex + partitionSize, totalSize);
            // Ceil rounding can leave trailing empty slots; stop once the list is exhausted.
            if (fromIndex >= totalSize) break;
            List<String> partitionKeys = allKeys.subList(fromIndex, toIndex);
            ExecutionContext context = new ExecutionContext();
            context.putString(contextKeyName, String.join(",", partitionKeys));
            context.putInt("partitionIndex", i);
            context.putInt("partitionSize", partitionKeys.size());
            String partitionKey = "partition" + i;
            partitions.put(partitionKey, context);
            log.info("[StringListPartitioner] {} : 키 {} 건 (index {}-{})",
                    partitionKey, partitionKeys.size(), fromIndex, toIndex - 1);
        }
        log.info("[StringListPartitioner] 총 {} 개 파티션 생성 (전체 키: {} 건)",
                partitions.size(), totalSize);
        return partitions;
    }
}

파일 보기

@@ -0,0 +1,73 @@
package com.snp.batch.common.batch.tasklet;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.Timestamp;
import java.time.LocalDateTime;
/**
 * Common Tasklet that stamps LAST_SUCCESS_DATE on the BATCH_LAST_EXECUTION table
 * after a batch job completes.
 *
 * <p>The update is skipped in RECOLLECT mode. The success date is derived from
 * {@code batchToDate} stored in the Job ExecutionContext; when that value is absent,
 * the current time minus {@code bufferHours} is used instead.</p>
 */
@Slf4j
public class LastExecutionUpdateTasklet implements Tasklet {

    private static final String RECOLLECT_MODE = "RECOLLECT";

    private final JdbcTemplate jdbcTemplate;
    private final String targetSchema;
    private final String apiKey;
    private final int bufferHours;

    /**
     * @param jdbcTemplate JDBC template used for the update
     * @param targetSchema schema owning BATCH_LAST_EXECUTION; interpolated into the SQL
     *                     text, so it must come from trusted configuration, never user input
     * @param apiKey       API_KEY of the row to update
     * @param bufferHours  hours subtracted from the reference time when computing the success date
     */
    public LastExecutionUpdateTasklet(JdbcTemplate jdbcTemplate, String targetSchema,
                                      String apiKey, int bufferHours) {
        this.jdbcTemplate = jdbcTemplate;
        this.targetSchema = targetSchema;
        this.apiKey = apiKey;
        this.bufferHours = bufferHours;
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        // RECOLLECT runs re-process historical data, so the success watermark must not move.
        String executionMode = chunkContext.getStepContext()
                .getStepExecution().getJobExecution()
                .getJobParameters().getString("executionMode", "NORMAL");
        if (RECOLLECT_MODE.equals(executionMode)) {
            log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
            return RepeatStatus.FINISHED;
        }
        String toDateStr = chunkContext.getStepContext()
                .getStepExecution().getJobExecution()
                .getExecutionContext().getString("batchToDate", null);
        LocalDateTime successDate;
        if (toDateStr != null) {
            // Assumes batchToDate is ISO-8601 local date-time — TODO confirm producer format;
            // a mismatch throws DateTimeParseException and fails the step.
            successDate = LocalDateTime.parse(toDateStr).minusHours(bufferHours);
            log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 시작 (캡처된 toDate - {}시간 버퍼: {})",
                    bufferHours, successDate);
        } else {
            successDate = LocalDateTime.now().minusHours(bufferHours);
            log.warn(">>>>> batchToDate가 없어 현재 시간 - {}시간 버퍼 사용: {}", bufferHours, successDate);
        }
        int updated = jdbcTemplate.update(
                String.format(
                        "UPDATE %s.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = ?, UPDATED_AT = NOW() WHERE API_KEY = ?",
                        targetSchema),
                Timestamp.valueOf(successDate), apiKey
        );
        if (updated == 0) {
            // No matching row: surface the misconfiguration instead of silently doing nothing.
            log.warn(">>>>> BATCH_LAST_EXECUTION에 API_KEY={} 행이 없어 업데이트 0건", apiKey);
        }
        log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료 (LAST_SUCCESS_DATE = {})", successDate);
        return RepeatStatus.FINISHED;
    }
}

파일 보기

@@ -1,61 +1,18 @@
package com.snp.batch.jobs.shipdetail.batch.config; package com.snp.batch.jobs.shipdetail.batch.config;
import lombok.extern.slf4j.Slf4j; import com.snp.batch.common.batch.partition.StringListPartitioner;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* 선박제원정보 IMO 목록을 N개 파티션으로 분할하는 Partitioner. * 선박제원정보 IMO 목록을 N개 파티션으로 분할하는 Partitioner.
* JobExecutionContext의 allImoNumbers를 읽어 파티션별 ExecutionContext에 할당. * 공통 {@link StringListPartitioner} 위임하며, ExecutionContext 키로 "partitionImoNumbers" 사용.
*/ */
@Slf4j public class ShipDetailPartitioner extends StringListPartitioner {
public class ShipDetailPartitioner implements Partitioner {
private final List<String> allImoNumbers; public static final String CONTEXT_KEY = "partitionImoNumbers";
private final int partitionCount;
public ShipDetailPartitioner(List<String> allImoNumbers, int partitionCount) { public ShipDetailPartitioner(List<String> allImoNumbers, int partitionCount) {
this.allImoNumbers = allImoNumbers; super(allImoNumbers, partitionCount, CONTEXT_KEY);
this.partitionCount = partitionCount;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
int totalSize = allImoNumbers.size();
// 실제 파티션 : IMO 수가 적으면 파티션 수를 줄임
int actualPartitionCount = Math.min(partitionCount, Math.max(1, totalSize));
Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
int partitionSize = (int) Math.ceil((double) totalSize / actualPartitionCount);
for (int i = 0; i < actualPartitionCount; i++) {
int fromIndex = i * partitionSize;
int toIndex = Math.min(fromIndex + partitionSize, totalSize);
if (fromIndex >= totalSize) {
break;
}
List<String> partitionImos = allImoNumbers.subList(fromIndex, toIndex);
ExecutionContext context = new ExecutionContext();
context.putString("partitionImoNumbers", String.join(",", partitionImos));
context.putInt("partitionIndex", i);
context.putInt("partitionSize", partitionImos.size());
String partitionKey = "partition" + i;
partitions.put(partitionKey, context);
log.info("[ShipDetailPartitioner] {} : IMO {} 건 (index {}-{})",
partitionKey, partitionImos.size(), fromIndex, toIndex - 1);
}
log.info("[ShipDetailPartitioner] 총 {} 개 파티션 생성 (전체 IMO: {} 건)",
partitions.size(), totalSize);
return partitions;
} }
} }

파일 보기

@@ -1,7 +1,8 @@
package com.snp.batch.jobs.shipdetail.batch.config; package com.snp.batch.jobs.shipdetail.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; import com.snp.batch.common.batch.config.BasePartitionedJobConfig;
import com.snp.batch.common.batch.tasklet.LastExecutionUpdateTasklet;
import com.snp.batch.global.repository.BatchFailedRecordRepository; import com.snp.batch.global.repository.BatchFailedRecordRepository;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto; import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto;
import com.snp.batch.jobs.shipdetail.batch.entity.ShipDetailEntity; import com.snp.batch.jobs.shipdetail.batch.entity.ShipDetailEntity;
@ -13,17 +14,14 @@ import com.snp.batch.service.BatchDateService;
import com.snp.batch.service.BatchFailedRecordService; import com.snp.batch.service.BatchFailedRecordService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener; import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider; import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@ -33,15 +31,13 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@Slf4j @Slf4j
@Configuration @Configuration
public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetailDto, ShipDetailEntity> { public class ShipDetailUpdateJobConfig extends BasePartitionedJobConfig<ShipDetailDto, ShipDetailEntity> {
private final ShipDetailDataProcessor shipDetailDataProcessor; private final ShipDetailDataProcessor shipDetailDataProcessor;
private final ShipDetailDataWriter shipDetailDataWriter; private final ShipDetailDataWriter shipDetailDataWriter;
@ -167,17 +163,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
@Bean @Bean
public JobExecutionDecider imoCountDecider() { public JobExecutionDecider imoCountDecider() {
return (jobExecution, stepExecution) -> { return createKeyCountDecider("totalImoCount", getJobName());
int totalImoCount = jobExecution.getExecutionContext().getInt("totalImoCount", 0);
if (totalImoCount == 0) {
log.info("[ShipDetailUpdateJob] Decider: EMPTY_RESPONSE - IMO 0건으로 LAST_EXECUTION 업데이트 스킵");
return new FlowExecutionStatus("EMPTY_RESPONSE");
}
log.info("[ShipDetailUpdateJob] Decider: NORMAL - IMO {} 건 처리 시작", totalImoCount);
return new FlowExecutionStatus("NORMAL");
};
} }
// ======================================== // ========================================
@ -197,12 +183,10 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
@Bean(name = "ShipDetailUpdatePartitionedStep") @Bean(name = "ShipDetailUpdatePartitionedStep")
public Step shipDetailUpdatePartitionedStep() { public Step shipDetailUpdatePartitionedStep() {
return new StepBuilder("ShipDetailUpdatePartitionedStep", jobRepository) return createPartitionedStep(
.partitioner("ShipDetailUpdateStep", shipDetailPartitioner(null)) "ShipDetailUpdatePartitionedStep", "ShipDetailUpdateStep",
.step(shipDetailUpdateWorkerStep()) shipDetailPartitioner(null), shipDetailUpdateWorkerStep(),
.taskExecutor(batchPartitionExecutor) batchPartitionExecutor, partitionCount);
.gridSize(partitionCount)
.build();
} }
@Bean @Bean
@ -260,7 +244,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
// ======================================== // ========================================
@Bean(name = "ShipDetailUpdateJob") @Bean(name = "ShipDetailUpdateJob")
public Job ShipDetailUpdateJob() { public Job shipDetailUpdateJob() {
return job(); return job();
} }
@ -270,42 +254,12 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
@Bean @Bean
public Tasklet shipDetailLastExecutionUpdateTasklet() { public Tasklet shipDetailLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> { return new LastExecutionUpdateTasklet(jdbcTemplate, targetSchema, getApiKey(), lastExecutionBufferHours);
String executionMode = chunkContext.getStepContext()
.getStepExecution().getJobExecution()
.getJobParameters().getString("executionMode", "NORMAL");
if ("RECOLLECT".equals(executionMode)) {
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
return RepeatStatus.FINISHED;
}
String toDateStr = chunkContext.getStepContext()
.getStepExecution().getJobExecution()
.getExecutionContext().getString("batchToDate", null);
LocalDateTime successDate;
if (toDateStr != null) {
successDate = LocalDateTime.parse(toDateStr).minusHours(lastExecutionBufferHours);
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 시작 (캡처된 toDate - {}시간 버퍼: {})", lastExecutionBufferHours, successDate);
} else {
successDate = LocalDateTime.now().minusHours(lastExecutionBufferHours);
log.warn(">>>>> batchToDate가 없어 현재 시간 - {}시간 버퍼 사용: {}", lastExecutionBufferHours, successDate);
}
jdbcTemplate.update(
String.format("UPDATE %s.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = ?, UPDATED_AT = NOW() WHERE API_KEY = ?", targetSchema),
Timestamp.valueOf(successDate), getApiKey()
);
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료 (LAST_SUCCESS_DATE = {})", successDate);
return RepeatStatus.FINISHED;
};
} }
@Bean(name = "ShipDetailLastExecutionUpdateStep") @Bean(name = "ShipDetailLastExecutionUpdateStep")
public Step shipDetailLastExecutionUpdateStep() { public Step shipDetailLastExecutionUpdateStep() {
return new StepBuilder("ShipDetailLastExecutionUpdateStep", jobRepository) return createLastExecutionUpdateStep("ShipDetailLastExecutionUpdateStep",
.tasklet(shipDetailLastExecutionUpdateTasklet(), transactionManager) shipDetailLastExecutionUpdateTasklet());
.build();
} }
} }