Merge pull request 'feat(shipdetail): 선박제원정보 수집 배치 작업 병렬화 및 최적화' (#70) from feature/ISSUE-64-ship-batch-parallel-optimization into develop

This commit is contained in:
HYOJIN 2026-03-20 16:26:24 +09:00
커밋 859d290164
13개의 변경된 파일, 446개의 추가 그리고 140개의 삭제

파일 보기

@ -35,6 +35,7 @@
- 스케줄 화면 검색/정렬/필터 기능 추가 및 UI 구조 개선 (#54)
- 재수집 이력 화면 개선: 배치 실행일시 추가, 작업명 잘림 해소, CSV 내보내기 제거 (#55)
- AIS API 응답 스트리밍 처리로 메모리 버퍼 제한 우회 (DataBufferLimitException 근본 해결)
- 선박제원정보 수집 배치 작업 병렬화 (Partitioned Step) (#64)
### 수정
- 자동 재수집 JobParameter 오버플로우 수정 (VARCHAR 2500 제한 해결)
@ -53,8 +54,10 @@
- CronPreview step=0 무한루프 방지
- AIS WebClient 버퍼 제한 50MB→100MB 확대 및 타임아웃 설정 추가 (DataBufferLimitException 해결)
- ShipDetailUpdateDataReader beforeFetch에서 allImoNumbers 미할당으로 인한 NPE 수정
- 재수집 모드 afterFetch 중복 실행으로 인한 실패 레코드 중복 INSERT 수정 (#64)
### 변경
- 재수집 실패건 추적 기준 sourceStepExecutionId → sourceJobExecutionId로 변경 (#64)
- 실패 레코드 Upsert 패턴 적용 (동일 키 중복 방지)
- 재시도 상태 배지 표시 (대기/재시도 N/3/재시도 초과)
- 미사용 Dead Code 정리 (~1,200 LOC 삭제)

파일 보기

@ -30,7 +30,7 @@ import java.util.Set;
public class AutoRetryJobExecutionListener implements JobExecutionListener {
private static final String FAILED_RECORD_KEYS = "failedRecordKeys";
private static final String FAILED_STEP_EXECUTION_ID = "failedStepExecutionId";
private static final String FAILED_JOB_EXECUTION_ID = "failedJobExecutionId";
private static final String FAILED_API_KEY = "failedApiKey";
private static final int MAX_AUTO_RETRY_COUNT = 3;
@ -61,7 +61,7 @@ public class AutoRetryJobExecutionListener implements JobExecutionListener {
// 모든 Step의 failedRecordKeys를 Set으로 병합 (중복 제거)
Set<String> mergedKeys = new LinkedHashSet<>();
Long sourceStepExecutionId = null;
Long sourceJobExecutionId = jobExecution.getId();
String apiKey = null;
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
@ -77,12 +77,6 @@ public class AutoRetryJobExecutionListener implements JobExecutionListener {
.filter(key -> !key.isBlank())
.forEach(mergedKeys::add);
// sourceStepExecutionId: 마지막 Step 사용
sourceStepExecutionId = stepExecution.getExecutionContext()
.containsKey(FAILED_STEP_EXECUTION_ID)
? stepExecution.getExecutionContext().getLong(FAILED_STEP_EXECUTION_ID)
: stepExecution.getId();
// apiKey: non-null인 첫 번째 값 사용
if (apiKey == null) {
apiKey = stepExecution.getExecutionContext()
@ -112,8 +106,8 @@ public class AutoRetryJobExecutionListener implements JobExecutionListener {
log.info("[AutoRetry] {} Job 완료 후 실패 건 {}건 감지 → 자동 재수집 트리거",
jobName, mergedKeys.size());
// sourceStepExecutionId 기반으로 1회만 triggerRetryAsync 호출 (실패 키는 DB에서 직접 조회)
// sourceJobExecutionId 기반으로 1회만 triggerRetryAsync 호출 (실패 키는 DB에서 직접 조회)
autoRetryTriggerService.triggerRetryAsync(
jobName, mergedKeys.size(), sourceStepExecutionId, apiKey);
jobName, mergedKeys.size(), sourceJobExecutionId, apiKey);
}
}

파일 보기

@ -31,7 +31,7 @@ public class AutoRetryTriggerService {
@Async("autoRetryExecutor")
public void triggerRetryAsync(String jobName, int failedCount,
Long sourceStepExecutionId, String apiKey) {
Long sourceJobExecutionId, String apiKey) {
try {
Job job = jobMap.get(jobName);
if (job == null) {
@ -41,7 +41,7 @@ public class AutoRetryTriggerService {
JobParametersBuilder builder = new JobParametersBuilder()
.addLong("timestamp", System.currentTimeMillis())
.addString("sourceStepExecutionId", String.valueOf(sourceStepExecutionId))
.addString("sourceJobExecutionId", String.valueOf(sourceJobExecutionId))
.addString("executionMode", "RECOLLECT")
.addString("reason", "자동 재수집 (실패 건 자동 처리)")
.addString("executor", "AUTO_RETRY");
@ -52,8 +52,8 @@ public class AutoRetryTriggerService {
JobParameters retryParams = builder.toJobParameters();
log.info("[AutoRetry] 재수집 Job 실행 시작: jobName={}, 실패건={}, sourceStepExecutionId={}",
jobName, failedCount, sourceStepExecutionId);
log.info("[AutoRetry] 재수집 Job 실행 시작: jobName={}, 실패건={}, sourceJobExecutionId={}",
jobName, failedCount, sourceJobExecutionId);
JobExecution retryExecution = jobLauncher.run(job, retryParams);

파일 보기

@ -5,6 +5,8 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@ -36,4 +38,19 @@ public class AsyncConfig {
executor.initialize();
return executor;
}
/**
* 배치 파티션 병렬 실행 전용 Executor.
* ShipDetailUpdate 파티셔닝 배치 Step 병렬 처리에 사용.
*/
@Bean(name = "batchPartitionExecutor")
public TaskExecutor batchPartitionExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4); // 기본 파티션
executor.setMaxPoolSize(8); // 최대 파티션
executor.setQueueCapacity(20); // 대기
executor.setThreadNamePrefix("BatchPartition-");
executor.initialize();
return executor;
}
}

파일 보기

@ -59,6 +59,19 @@ public interface BatchFailedRecordRepository extends JpaRepository<BatchFailedRe
@Param("recordKeys") List<String> recordKeys,
@Param("resolvedAt") LocalDateTime resolvedAt);
/**
* 특정 Job 실행의 실패 레코드를 RESOLVED로 벌크 업데이트
*/
@Modifying
@Query("UPDATE BatchFailedRecord r SET r.status = 'RESOLVED', r.resolvedAt = :resolvedAt " +
"WHERE r.jobName = :jobName AND r.jobExecutionId = :jobExecutionId " +
"AND r.recordKey IN :recordKeys AND r.status = 'FAILED'")
int resolveByJobExecutionIdAndRecordKeys(
@Param("jobName") String jobName,
@Param("jobExecutionId") Long jobExecutionId,
@Param("recordKeys") List<String> recordKeys,
@Param("resolvedAt") LocalDateTime resolvedAt);
/**
* ID 목록으로 FAILED 상태 레코드를 일괄 RESOLVED 처리
*/
@ -89,6 +102,17 @@ public interface BatchFailedRecordRepository extends JpaRepository<BatchFailedRe
@Param("jobName") String jobName,
@Param("stepExecutionId") Long stepExecutionId);
/**
* 특정 Job 실행의 미해결(FAILED) 실패 레코드 목록 조회.
* 자동 재수집 JobParameter 대신 DB에서 직접 조회하여 VARCHAR(2500) 제한을 우회.
*/
@Query("SELECT r.recordKey FROM BatchFailedRecord r " +
"WHERE r.jobName = :jobName AND r.jobExecutionId = :jobExecutionId " +
"AND r.status = 'FAILED'")
List<String> findFailedRecordKeysByJobExecutionId(
@Param("jobName") String jobName,
@Param("jobExecutionId") Long jobExecutionId);
/**
* 동일 Job에서 이미 FAILED 상태인 recordKey 목록 조회 (중복 방지용)
*/

파일 보기

@ -0,0 +1,172 @@
package com.snp.batch.jobs.shipdetail.batch.config;
import com.snp.batch.global.model.BatchApiLog;
import com.snp.batch.global.repository.BatchFailedRecordRepository;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDto;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipUpdateApiResponse;
import com.snp.batch.service.BatchApiLogService;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
 * Tasklet that resolves the target IMO number list for the ship-detail update job.
 * <p>
 * NORMAL mode: fetches the changed IMO numbers from the Maritime API.<br>
 * RECOLLECT mode: fetches previously failed IMO numbers from the DB.
 * <p>
 * The result is published into the JobExecutionContext ({@code totalImoCount},
 * {@code allImoNumbers}) so the downstream Partitioner can consume it.
 */
@Slf4j
public class ShipDetailImoFetchTasklet implements Tasklet {

    private static final String API_KEY = "SHIP_DETAIL_UPDATE_API";
    private static final String SHIP_UPDATE_API_PATH = "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipChangesByLastUpdateDateRange";
    private static final String JOB_NAME = "ShipDetailUpdateJob";

    private final WebClient maritimeApiWebClient;
    private final BatchDateService batchDateService;
    private final BatchApiLogService batchApiLogService;
    private final BatchFailedRecordRepository batchFailedRecordRepository;
    // Base URL used to build the URI recorded in the API log; the actual request
    // goes through maritimeApiWebClient's own configured base URL — presumably the
    // same host; TODO confirm they stay in sync.
    private final String maritimeApiUrl;

    public ShipDetailImoFetchTasklet(
            WebClient maritimeApiWebClient,
            BatchDateService batchDateService,
            BatchApiLogService batchApiLogService,
            BatchFailedRecordRepository batchFailedRecordRepository,
            String maritimeApiUrl) {
        this.maritimeApiWebClient = maritimeApiWebClient;
        this.batchDateService = batchDateService;
        this.batchApiLogService = batchApiLogService;
        this.batchFailedRecordRepository = batchFailedRecordRepository;
        this.maritimeApiUrl = maritimeApiUrl;
    }

    /**
     * Resolves the IMO list according to the {@code executionMode} job parameter
     * (defaults to {@code NORMAL}) and stores the outcome in the
     * JobExecutionContext: {@code totalImoCount} is always written;
     * {@code allImoNumbers} (comma-joined) only when the list is non-empty.
     *
     * @return always {@link RepeatStatus#FINISHED}; failures propagate as exceptions
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        JobExecution jobExecution = chunkContext.getStepContext()
                .getStepExecution().getJobExecution();
        String executionMode = jobExecution.getJobParameters()
                .getString("executionMode", "NORMAL");
        List<String> imoNumbers;
        if ("RECOLLECT".equals(executionMode)) {
            imoNumbers = fetchRecollectImoNumbers(jobExecution);
        } else {
            imoNumbers = fetchChangedImoNumbers(jobExecution);
        }
        // Store the result in the JobExecutionContext for the Partitioner.
        jobExecution.getExecutionContext().putInt("totalImoCount", imoNumbers.size());
        if (!imoNumbers.isEmpty()) {
            jobExecution.getExecutionContext().putString("allImoNumbers", String.join(",", imoNumbers));
        }
        log.info("[ShipDetailImoFetchTasklet] {} 모드: IMO {} 건 조회 완료",
                executionMode, imoNumbers.size());
        return RepeatStatus.FINISHED;
    }

    /**
     * RECOLLECT mode: reads the failed record keys of a previous job execution
     * directly from the DB (avoids passing the key list through a JobParameter).
     *
     * @return the FAILED record keys of the source execution, or an empty list
     *         when the {@code sourceJobExecutionId} parameter is missing/blank
     */
    private List<String> fetchRecollectImoNumbers(JobExecution jobExecution) {
        String sourceJobExecutionIdParam = jobExecution.getJobParameters()
                .getString("sourceJobExecutionId");
        if (sourceJobExecutionIdParam == null || sourceJobExecutionIdParam.isBlank()) {
            log.warn("[ShipDetailImoFetchTasklet] RECOLLECT 모드이나 sourceJobExecutionId가 없음");
            return Collections.emptyList();
        }
        // NOTE(review): a non-numeric parameter makes Long.parseLong throw
        // NumberFormatException and fails the step — intentional fail-fast, presumably.
        Long sourceJobExecutionId = Long.parseLong(sourceJobExecutionIdParam);
        List<String> retryKeys = batchFailedRecordRepository.findFailedRecordKeysByJobExecutionId(
                JOB_NAME, sourceJobExecutionId);
        log.info("[ShipDetailImoFetchTasklet] RECOLLECT: DB에서 {} 건의 실패 키 조회 (sourceJobExecutionId: {})",
                retryKeys.size(), sourceJobExecutionId);
        return retryKeys;
    }

    /**
     * NORMAL mode: calls the Maritime "ship changes by date range" API
     * synchronously and extracts the non-null IMO numbers from the response.
     * The call is always recorded via {@link BatchApiLogService} in the finally
     * block, including the failure case (statusCode 500, error message captured).
     *
     * @return the changed IMO numbers; empty when the response carries no ships
     */
    private List<String> fetchChangedImoNumbers(JobExecution jobExecution) {
        Map<String, String> params = batchDateService.getDateRangeWithoutTimeParams(API_KEY);
        MultiValueMap<String, String> multiValueParams = new LinkedMultiValueMap<>();
        params.forEach((key, value) ->
                multiValueParams.put(key, Collections.singletonList(value)));
        // fullUri is only used for logging; the request itself is built below.
        String fullUri = UriComponentsBuilder.fromHttpUrl(maritimeApiUrl)
                .path(SHIP_UPDATE_API_PATH)
                .queryParams(multiValueParams)
                .build()
                .toUriString();
        long startTime = System.currentTimeMillis();
        int statusCode = 200;
        String errorMessage = null;
        Long responseSize = 0L;
        try {
            log.info("[ShipDetailImoFetchTasklet] 변경 IMO 조회 API 호출: {}", fullUri);
            ShipUpdateApiResponse response = maritimeApiWebClient.get()
                    .uri(uriBuilder -> {
                        uriBuilder.path(SHIP_UPDATE_API_PATH);
                        params.forEach(uriBuilder::queryParam);
                        return uriBuilder.build();
                    })
                    .retrieve()
                    .bodyToMono(new ParameterizedTypeReference<ShipUpdateApiResponse>() {})
                    .block();
            if (response == null || response.getShips() == null) {
                return Collections.emptyList();
            }
            List<String> imoNumbers = response.getShips().stream()
                    .map(ShipDto::getImoNumber)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
            // Logged responseCount is the extracted IMO count, not the raw payload size.
            responseSize = (long) imoNumbers.size();
            return imoNumbers;
        } catch (Exception e) {
            statusCode = 500;
            errorMessage = e.getMessage();
            log.error("[ShipDetailImoFetchTasklet] API 호출 실패: {}", e.getMessage(), e);
            throw e;
        } finally {
            long duration = System.currentTimeMillis() - startTime;
            batchApiLogService.saveLog(BatchApiLog.builder()
                    .apiRequestLocation("ShipDetailImoFetchTasklet")
                    .jobExecutionId(jobExecution.getId())
                    // Picks an arbitrary "first" step execution of the job;
                    // assumes this tasklet runs as the job's first step — TODO confirm.
                    .stepExecutionId(jobExecution.getStepExecutions().stream()
                            .findFirst().map(se -> se.getId()).orElse(null))
                    .requestUri(fullUri)
                    .httpMethod("GET")
                    .statusCode(statusCode)
                    .responseTimeMs(duration)
                    .responseCount(responseSize)
                    .errorMessage(errorMessage)
                    .createdAt(LocalDateTime.now())
                    .build());
        }
    }
}

파일 보기

@ -0,0 +1,61 @@
package com.snp.batch.jobs.shipdetail.batch.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
 * Splits the ship-detail IMO number list into at most N partitions.
 * Each partition's ExecutionContext receives a comma-joined slice of the full
 * IMO list ({@code partitionImoNumbers}) plus its index and size, for use by
 * the partitioned worker step.
 */
@Slf4j
public class ShipDetailPartitioner implements Partitioner {

    private final List<String> allImoNumbers;
    private final int partitionCount;

    public ShipDetailPartitioner(List<String> allImoNumbers, int partitionCount) {
        this.allImoNumbers = allImoNumbers;
        this.partitionCount = partitionCount;
    }

    /**
     * Builds the partition map. The configured {@code partitionCount} takes
     * precedence over {@code gridSize}; when fewer IMO numbers exist than
     * partitions, the effective partition count shrinks accordingly.
     *
     * @return insertion-ordered map of partition key to its ExecutionContext
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int total = allImoNumbers.size();
        // Effective count: never more partitions than IMO numbers, at least 1.
        int effectiveCount = Math.min(partitionCount, Math.max(1, total));
        int sliceSize = (int) Math.ceil((double) total / effectiveCount);
        Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
        for (int idx = 0; idx < effectiveCount; idx++) {
            int start = idx * sliceSize;
            if (start >= total) {
                break; // any further partition would be empty
            }
            int end = Math.min(start + sliceSize, total);
            List<String> slice = allImoNumbers.subList(start, end);
            ExecutionContext ctx = new ExecutionContext();
            ctx.putString("partitionImoNumbers", String.join(",", slice));
            ctx.putInt("partitionIndex", idx);
            ctx.putInt("partitionSize", slice.size());
            String key = "partition" + idx;
            partitions.put(key, ctx);
            log.info("[ShipDetailPartitioner] {} : IMO {} 건 (index {}-{})",
                    key, slice.size(), start, end - 1);
        }
        log.info("[ShipDetailPartitioner] 총 {} 개 파티션 생성 (전체 IMO: {} 건)",
                partitions.size(), total);
        return partitions;
    }
}

파일 보기

@ -2,18 +2,19 @@ package com.snp.batch.jobs.shipdetail.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
import org.springframework.batch.core.JobExecutionListener;
import com.snp.batch.global.repository.BatchFailedRecordRepository;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto;
import com.snp.batch.jobs.shipdetail.batch.entity.ShipDetailEntity;
import com.snp.batch.jobs.shipdetail.batch.processor.ShipDetailDataProcessor;
import com.snp.batch.jobs.shipdetail.batch.reader.ShipDetailUpdateDataReader;
import com.snp.batch.jobs.shipdetail.batch.writer.ShipDetailDataWriter;
import com.snp.batch.global.repository.BatchFailedRecordRepository;
import com.snp.batch.service.BatchApiLogService;
import com.snp.batch.service.BatchDateService;
import com.snp.batch.service.BatchFailedRecordService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
@ -22,20 +23,20 @@ import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@Slf4j
@ -53,6 +54,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
private final BatchFailedRecordService batchFailedRecordService;
private final BatchFailedRecordRepository batchFailedRecordRepository;
private final JobExecutionListener autoRetryJobExecutionListener;
private final TaskExecutor batchPartitionExecutor;
@Value("${app.batch.ship-api.url}")
private String maritimeApiUrl;
@ -75,8 +77,10 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
@Value("${app.batch.last-execution-buffer-hours:24}")
private int lastExecutionBufferHours;
protected String getApiKey() {return "SHIP_DETAIL_UPDATE_API";}
@Value("${app.batch.ship-detail-update.partition-count:4}")
private int partitionCount;
protected String getApiKey() {return "SHIP_DETAIL_UPDATE_API";}
public ShipDetailUpdateJobConfig(
JobRepository jobRepository,
@ -91,7 +95,8 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
BatchApiLogService batchApiLogService,
BatchFailedRecordService batchFailedRecordService,
BatchFailedRecordRepository batchFailedRecordRepository,
@Qualifier("autoRetryJobExecutionListener") JobExecutionListener autoRetryJobExecutionListener) {
@Qualifier("autoRetryJobExecutionListener") JobExecutionListener autoRetryJobExecutionListener,
@Qualifier("batchPartitionExecutor") TaskExecutor batchPartitionExecutor) {
super(jobRepository, transactionManager);
this.shipDetailDataProcessor = shipDetailDataProcessor;
this.shipDetailDataWriter = shipDetailDataWriter;
@ -104,6 +109,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
this.batchFailedRecordService = batchFailedRecordService;
this.batchFailedRecordRepository = batchFailedRecordRepository;
this.autoRetryJobExecutionListener = autoRetryJobExecutionListener;
this.batchPartitionExecutor = batchPartitionExecutor;
}
@Override
@ -121,41 +127,105 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
return "ShipDetailUpdateStep";
}
// ========================================
// Job Flow 정의
// ========================================
@Override
protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder
.start(ShipDetailUpdateStep())
.next(emptyResponseDecider())
.start(shipDetailImoFetchStep())
.next(imoCountDecider())
.on("EMPTY_RESPONSE").end()
.from(emptyResponseDecider()).on("NORMAL").to(shipDetailLastExecutionUpdateStep())
.from(imoCountDecider()).on("NORMAL").to(shipDetailUpdatePartitionedStep())
.next(shipDetailLastExecutionUpdateStep())
.end()
.build();
}
/**
* 응답 데이터 건수 판별 Decider
* 응답 0건이면 EMPTY_RESPONSE, 아니면 NORMAL 반환
*/
// ========================================
// Step 0: IMO 목록 조회
// ========================================
@Bean
public JobExecutionDecider emptyResponseDecider() {
public Tasklet shipDetailImoFetchTasklet() {
return new ShipDetailImoFetchTasklet(
maritimeApiWebClient, batchDateService, batchApiLogService,
batchFailedRecordRepository, maritimeApiUrl);
}
@Bean(name = "ShipDetailImoFetchStep")
public Step shipDetailImoFetchStep() {
return new StepBuilder("ShipDetailImoFetchStep", jobRepository)
.tasklet(shipDetailImoFetchTasklet(), transactionManager)
.build();
}
// ========================================
// Decider: IMO 건수 확인
// ========================================
@Bean
public JobExecutionDecider imoCountDecider() {
return (jobExecution, stepExecution) -> {
if (stepExecution != null && stepExecution.getReadCount() == 0) {
log.info("[ShipDetailUpdateJob] Decider: EMPTY_RESPONSE - 응답 데이터 0건으로 LAST_EXECUTION 업데이트 스킵 (다음 실행 시 동일 범위 재조회)");
int totalImoCount = jobExecution.getExecutionContext().getInt("totalImoCount", 0);
if (totalImoCount == 0) {
log.info("[ShipDetailUpdateJob] Decider: EMPTY_RESPONSE - IMO 0건으로 LAST_EXECUTION 업데이트 스킵");
return new FlowExecutionStatus("EMPTY_RESPONSE");
}
log.info("[ShipDetailUpdateJob] Decider: NORMAL 모드 - LAST_EXECUTION 업데이트 진행");
log.info("[ShipDetailUpdateJob] Decider: NORMAL - IMO {} 건 처리 시작", totalImoCount);
return new FlowExecutionStatus("NORMAL");
};
}
// ========================================
// Step 1: Partitioned Step (병렬 처리)
// ========================================
@Bean
@StepScope
public ShipDetailPartitioner shipDetailPartitioner(
@Value("#{jobExecutionContext['allImoNumbers']}") String allImoNumbersStr
) {
List<String> allImoNumbers = (allImoNumbersStr != null && !allImoNumbersStr.isBlank())
? Arrays.asList(allImoNumbersStr.split(","))
: Collections.emptyList();
return new ShipDetailPartitioner(allImoNumbers, partitionCount);
}
@Bean(name = "ShipDetailUpdatePartitionedStep")
public Step shipDetailUpdatePartitionedStep() {
return new StepBuilder("ShipDetailUpdatePartitionedStep", jobRepository)
.partitioner("ShipDetailUpdateStep", shipDetailPartitioner(null))
.step(shipDetailUpdateWorkerStep())
.taskExecutor(batchPartitionExecutor)
.gridSize(partitionCount)
.build();
}
@Bean
public Step shipDetailUpdateWorkerStep() {
return new StepBuilder("ShipDetailUpdateStep", jobRepository)
.<ShipDetailDto, ShipDetailEntity>chunk(getChunkSize(), transactionManager)
.reader(shipDetailUpdateDataReader(null, null, null, null))
.processor(shipDetailDataProcessor(null))
.writer(shipDetailDataWriter)
.build();
}
// ========================================
// Reader / Processor (StepScope)
// ========================================
@Bean
@StepScope
public ShipDetailUpdateDataReader shipDetailUpdateDataReader(
@Value("#{stepExecution.jobExecution.id}") Long jobExecutionId,
@Value("#{stepExecution.id}") Long stepExecutionId,
@Value("#{jobParameters['executionMode']}") String executionMode,
@Value("#{jobParameters['sourceStepExecutionId']}") String sourceStepExecutionIdParam
@Value("#{stepExecutionContext['partitionImoNumbers']}") String partitionImoNumbers
) {
ShipDetailUpdateDataReader reader = new ShipDetailUpdateDataReader(
maritimeApiWebClient, jdbcTemplate, objectMapper,
@ -163,36 +233,15 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
shipDetailBatchSize, delayOnSuccessMs, delayOnFailureMs, maxRetryCount
);
reader.setExecutionIds(jobExecutionId, stepExecutionId);
reader.setPartitionImoNumbers(partitionImoNumbers);
// RECOLLECT 모드: DB에서 실패 키를 직접 조회하여 주입
if ("RECOLLECT".equals(executionMode) && sourceStepExecutionIdParam != null && !sourceStepExecutionIdParam.isBlank()) {
Long sourceStepExecutionId = Long.parseLong(sourceStepExecutionIdParam);
List<String> retryKeys = batchFailedRecordRepository.findFailedRecordKeysByStepExecutionId(
"ShipDetailUpdateJob", sourceStepExecutionId);
if (!retryKeys.isEmpty()) {
reader.setRetryRecordKeys(retryKeys);
reader.setSourceStepExecutionId(sourceStepExecutionId);
log.info("[ShipDetailUpdateJob] Retry 모드 활성화: DB에서 {} 건의 실패 키 조회, sourceStepExecutionId: {}",
retryKeys.size(), sourceStepExecutionId);
} else {
log.warn("[ShipDetailUpdateJob] RECOLLECT 모드이나 DB에서 실패 키를 찾을 수 없음 (sourceStepExecutionId: {})",
sourceStepExecutionId);
}
if ("RECOLLECT".equals(executionMode)) {
reader.setRecollectMode(true);
}
return reader;
}
@Override
protected ItemReader<ShipDetailDto> createReader() { // 타입 변경
return shipDetailUpdateDataReader;
}
@Override
protected ItemProcessor<ShipDetailDto, ShipDetailEntity> createProcessor() {
return shipDetailDataProcessor;
}
@Bean
@StepScope
public ShipDetailDataProcessor shipDetailDataProcessor(
@ -201,29 +250,24 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
return new ShipDetailDataProcessor(jobExecutionId);
}
@Override
protected ItemWriter<ShipDetailEntity> createWriter() { // 타입 변경
return shipDetailDataWriter;
}
@Override
protected int getChunkSize() {
return 20;
}
// ========================================
// Job / Step Bean 등록
// ========================================
@Bean(name = "ShipDetailUpdateJob")
public Job ShipDetailUpdateJob() {
return job();
}
@Bean(name = "ShipDetailUpdateStep")
public Step ShipDetailUpdateStep() {
return step();
}
// ========================================
// Step 2: LastExecution 업데이트
// ========================================
/**
* 2단계: 모든 스텝 성공 배치 실행 로그(날짜) 업데이트
*/
@Bean
public Tasklet shipDetailLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> {
@ -257,6 +301,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
return RepeatStatus.FINISHED;
};
}
@Bean(name = "ShipDetailLastExecutionUpdateStep")
public Step shipDetailLastExecutionUpdateStep() {
return new StepBuilder("ShipDetailLastExecutionUpdateStep", jobRepository)

파일 보기

@ -4,9 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailApiResponse;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDto;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipResultDto;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipUpdateApiResponse;
import com.snp.batch.service.BatchApiLogService;
import com.snp.batch.service.BatchDateService;
import com.snp.batch.service.BatchFailedRecordService;
@ -41,10 +39,11 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
// 실패 IMO 추적
private final List<String> failedImoNumbers = new ArrayList<>();
private String lastErrorMessage;
private boolean afterFetchCompleted = false;
// Retry 모드
private List<String> retryRecordKeys;
private Long sourceStepExecutionId;
// 파티션 모드
private String partitionImoNumbers;
private boolean recollectMode = false;
public ShipDetailUpdateDataReader(
WebClient webClient,
@ -73,16 +72,12 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
enableChunkMode();
}
public void setRetryRecordKeys(List<String> retryRecordKeys) {
this.retryRecordKeys = retryRecordKeys;
public void setPartitionImoNumbers(String partitionImoNumbers) {
this.partitionImoNumbers = partitionImoNumbers;
}
public void setSourceStepExecutionId(Long sourceStepExecutionId) {
this.sourceStepExecutionId = sourceStepExecutionId;
}
private boolean isRetryMode() {
return retryRecordKeys != null && !retryRecordKeys.isEmpty();
public void setRecollectMode(boolean recollectMode) {
this.recollectMode = recollectMode;
}
@Override
@ -90,10 +85,6 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
return "ShipDetailUpdateDataReader";
}
protected String getShipUpdateApiPath() {
return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipChangesByLastUpdateDateRange";
}
@Override
protected String getApiPath() {
return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipsByIHSLRorIMONumbersAll";
@ -109,25 +100,22 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
this.allImoNumbers = null;
this.failedImoNumbers.clear();
this.lastErrorMessage = null;
// retryRecordKeys는 JobConfig에서 주입하므로 초기화하지 않음
this.afterFetchCompleted = false;
}
@Override
protected void beforeFetch() {
if (isRetryMode()) {
log.info("[{}] [RETRY MODE] 실패 건 재수집 모드 시작 - 대상 IMO: {} 건",
getReaderName(), retryRecordKeys.size());
allImoNumbers = new ArrayList<>(retryRecordKeys);
log.info("[{}] [RETRY MODE] IMO 목록: {}", getReaderName(), allImoNumbers);
// 파티션에서 할당받은 IMO 목록을 파싱
if (partitionImoNumbers != null && !partitionImoNumbers.isBlank()) {
allImoNumbers = new ArrayList<>(Arrays.asList(partitionImoNumbers.split(",")));
} else {
log.info("[{}] 변경된 IMO 번호 조회 시작...", getReaderName());
ShipUpdateApiResponse response = callShipUpdateApi();
allImoNumbers = extractUpdateImoNumbers(response);
log.info("[{}] 총 {} 개의 변경된 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
allImoNumbers = Collections.emptyList();
}
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 파티션 IMO {} 건 할당 (recollectMode: {})",
getReaderName(), allImoNumbers.size(), recollectMode);
log.info("[{}] 설정: batch-size={}, delay-success={}ms, delay-failure={}ms, max-retry={}",
getReaderName(), batchSize, delayOnSuccessMs, delayOnFailureMs, maxRetryCount);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
@ -255,46 +243,31 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
);
}
private ShipUpdateApiResponse callShipUpdateApi() {
Map<String, String> params = batchDateService.getDateRangeWithoutTimeParams(getApiKey());
return executeSingleApiCall(
maritimeApiUrl,
getShipUpdateApiPath(),
params,
new ParameterizedTypeReference<ShipUpdateApiResponse>() {},
batchApiLogService,
res -> res.getShips() != null ? (long) res.getShips().size() : 0L
);
}
private List<String> extractUpdateImoNumbers(ShipUpdateApiResponse response) {
if (response.getShips() == null) {
return Collections.emptyList();
}
return response.getShips().stream()
.map(ShipDto::getImoNumber)
.filter(imoNumber -> imoNumber != null)
.collect(Collectors.toList());
}
@Override
protected void afterFetch(List<ShipDetailDto> data) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
int totalBatches = allImoNumbers != null
? (int) Math.ceil((double) allImoNumbers.size() / batchSize) : 0;
try {
if (data == null) {
if (data == null && !afterFetchCompleted) {
afterFetchCompleted = true;
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size());
getReaderName(), allImoNumbers != null ? allImoNumbers.size() : 0);
if (isRetryMode()) {
// Retry 모드: 성공 RESOLVED 처리 + 재실패 FAILED 레코드 저장
log.info("[{}] [RETRY MODE] 재수집 결과 처리 시작 (sourceStepExecutionId: {})",
getReaderName(), sourceStepExecutionId);
batchFailedRecordService.resolveSuccessfulRetries(
"ShipDetailUpdateJob", sourceStepExecutionId, retryRecordKeys, failedImoNumbers);
if (recollectMode) {
// RECOLLECT 모드: 성공 RESOLVED 처리 + 재실패 FAILED 레코드 저장
Long sourceJobExecutionId = getSourceJobExecutionId();
log.info("[{}] [RECOLLECT] 재수집 결과 처리 시작 (sourceJobExecutionId: {})",
getReaderName(), sourceJobExecutionId);
if (sourceJobExecutionId != null) {
batchFailedRecordService.resolveSuccessfulRetries(
"ShipDetailUpdateJob", sourceJobExecutionId,
allImoNumbers, failedImoNumbers);
}
if (!failedImoNumbers.isEmpty()) {
log.warn("[{}] [RETRY MODE] 재실패 IMO 건수: {} 건", getReaderName(), failedImoNumbers.size());
log.warn("[{}] [RECOLLECT] 재실패 IMO 건수: {} 건", getReaderName(), failedImoNumbers.size());
batchFailedRecordService.saveFailedRecords(
"ShipDetailUpdateJob",
getJobExecutionId(),
@ -304,11 +277,11 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
lastErrorMessage
);
} else {
log.info("[{}] [RETRY MODE] 모든 재수집 건 정상 처리 완료", getReaderName());
log.info("[{}] [RECOLLECT] 모든 재수집 건 정상 처리 완료", getReaderName());
}
int successCount = retryRecordKeys.size() - failedImoNumbers.size();
log.info("[{}] [RETRY MODE] 결과: 성공 {} 건, 재실패 {} 건",
int successCount = (allImoNumbers != null ? allImoNumbers.size() : 0) - failedImoNumbers.size();
log.info("[{}] [RECOLLECT] 결과: 성공 {} 건, 재실패 {} 건",
getReaderName(), successCount, failedImoNumbers.size());
} else {
// 일반 모드: 동기 저장 (자동 재수집 트리거 전에 커밋 보장)
@ -333,11 +306,11 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
log.error("[{}] afterFetch 처리 중 예외 발생: {}", getReaderName(), e.getMessage(), e);
} finally {
// 일반 모드에서만: 실패 건이 있으면 ExecutionContext에 저장 (자동 재수집 트리거용)
if (!isRetryMode() && !failedImoNumbers.isEmpty() && stepExecution != null) {
if (!recollectMode && !failedImoNumbers.isEmpty() && stepExecution != null) {
try {
String failedKeys = String.join(",", failedImoNumbers);
stepExecution.getExecutionContext().putString("failedRecordKeys", failedKeys);
stepExecution.getExecutionContext().putLong("failedStepExecutionId", getStepExecutionId());
stepExecution.getExecutionContext().putLong("failedJobExecutionId", getJobExecutionId());
stepExecution.getExecutionContext().putString("failedApiKey", getApiKey());
log.info("[{}] 자동 재수집 대상 실패 키 {} 건 ExecutionContext에 저장",
getReaderName(), failedImoNumbers.size());
@ -347,4 +320,18 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
}
}
}
/**
* JobParameter에서 sourceJobExecutionId를 조회 (RECOLLECT 모드용)
*/
private Long getSourceJobExecutionId() {
if (stepExecution != null) {
String param = stepExecution.getJobExecution()
.getJobParameters().getString("sourceJobExecutionId");
if (param != null && !param.isBlank()) {
return Long.parseLong(param);
}
}
return null;
}
}

파일 보기

@ -141,19 +141,19 @@ public class BatchFailedRecordService {
/**
* 재수집 성공 건을 RESOLVED로 처리합니다.
* 원본 stepExecutionId로 범위를 제한하여 해당 Step 실패 건만 RESOLVED 처리합니다.
* 원본 jobExecutionId로 범위를 제한하여 해당 Job 실패 건만 RESOLVED 처리합니다.
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void resolveSuccessfulRetries(String jobName, Long sourceStepExecutionId,
public void resolveSuccessfulRetries(String jobName, Long sourceJobExecutionId,
List<String> allRetryKeys, List<String> failedAgainKeys) {
List<String> successfulKeys = allRetryKeys.stream()
.filter(key -> !failedAgainKeys.contains(key))
.toList();
if (!successfulKeys.isEmpty()) {
int resolved = batchFailedRecordRepository.resolveByStepExecutionIdAndRecordKeys(
jobName, sourceStepExecutionId, successfulKeys, LocalDateTime.now());
log.info("실패 레코드 RESOLVED 처리: {} 건 (job: {}, sourceStepExecutionId: {})",
resolved, jobName, sourceStepExecutionId);
int resolved = batchFailedRecordRepository.resolveByJobExecutionIdAndRecordKeys(
jobName, sourceJobExecutionId, successfulKeys, LocalDateTime.now());
log.info("실패 레코드 RESOLVED 처리: {} 건 (job: {}, sourceJobExecutionId: {})",
resolved, jobName, sourceJobExecutionId);
}
}
}

파일 보기

@ -110,10 +110,11 @@ app:
# ShipDetailUpdate 배치 설정 (dev - 기존과 동일하게 20건 유지)
ship-detail-update:
batch-size: 20 # dev에서는 문제 없으므로 기존 20건 유지
batch-size: 10 # dev: 파티션 병렬화에 맞춰 10건으로 조정
delay-on-success-ms: 300
delay-on-failure-ms: 2000
max-retry-count: 3
partition-count: 4 # dev: 4개 파티션
# AIS Target 배치 설정
ais-target:

파일 보기

@ -115,6 +115,7 @@ app:
delay-on-success-ms: 300 # 성공 시 딜레이 (ms)
delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms)
max-retry-count: 3 # 최대 재시도 횟수
partition-count: 4 # prod: 4개 파티션
# AIS Target 배치 설정
ais-target:

파일 보기

@ -172,6 +172,7 @@ app:
delay-on-success-ms: 300 # 성공 시 딜레이 (ms)
delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms)
max-retry-count: 3 # 최대 재시도 횟수
partition-count: 4 # 병렬 파티션 수
# AIS Target Import 배치 설정 (캐시 업데이트 전용)
ais-target: