From 59ecdd851eb3c0b853e8873282f08011ba69ad69 Mon Sep 17 00:00:00 2001 From: HYOJIN Date: Fri, 20 Mar 2026 16:15:05 +0900 Subject: [PATCH] =?UTF-8?q?feat(shipdetail):=20=EC=84=A0=EB=B0=95=EC=A0=9C?= =?UTF-8?q?=EC=9B=90=EC=A0=95=EB=B3=B4=20=EB=B0=B0=EC=B9=98=20=EC=9E=91?= =?UTF-8?q?=EC=97=85=20=EB=B3=91=EB=A0=AC=ED=99=94=20(Partitioned=20Step)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - IMO 목록을 N개 파티션으로 분할하여 병렬 API 호출 - ImoFetchTasklet으로 IMO 조회 단계 분리 - sourceStepExecutionId → sourceJobExecutionId 마이그레이션 - afterFetch 중복 실행 방지 플래그 추가 - partition-count 설정 외부화 (dev:2, prod:4) --- .../AutoRetryJobExecutionListener.java | 14 +- .../listener/AutoRetryTriggerService.java | 8 +- .../snp/batch/global/config/AsyncConfig.java | 17 ++ .../BatchFailedRecordRepository.java | 24 +++ .../config/ShipDetailImoFetchTasklet.java | 172 ++++++++++++++++++ .../batch/config/ShipDetailPartitioner.java | 61 +++++++ .../config/ShipDetailUpdateJobConfig.java | 155 ++++++++++------ .../reader/ShipDetailUpdateDataReader.java | 115 ++++++------ .../service/BatchFailedRecordService.java | 12 +- src/main/resources/application-dev.yml | 3 +- src/main/resources/application-prod.yml | 1 + src/main/resources/application.yml | 1 + 12 files changed, 443 insertions(+), 140 deletions(-) create mode 100644 src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailImoFetchTasklet.java create mode 100644 src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailPartitioner.java diff --git a/src/main/java/com/snp/batch/common/batch/listener/AutoRetryJobExecutionListener.java b/src/main/java/com/snp/batch/common/batch/listener/AutoRetryJobExecutionListener.java index cb0ee31..70a1cf6 100644 --- a/src/main/java/com/snp/batch/common/batch/listener/AutoRetryJobExecutionListener.java +++ b/src/main/java/com/snp/batch/common/batch/listener/AutoRetryJobExecutionListener.java @@ -30,7 +30,7 @@ import java.util.Set; public class AutoRetryJobExecutionListener implements JobExecutionListener { private static final String FAILED_RECORD_KEYS = "failedRecordKeys"; - private static final String FAILED_STEP_EXECUTION_ID = "failedStepExecutionId"; + private static final String FAILED_JOB_EXECUTION_ID = "failedJobExecutionId"; private static final String FAILED_API_KEY = "failedApiKey"; private static final int MAX_AUTO_RETRY_COUNT = 3; @@ -61,7 +61,7 @@ public class AutoRetryJobExecutionListener implements JobExecutionListener { // 모든 Step의 failedRecordKeys를 Set으로 병합 (중복 제거) Set mergedKeys = new LinkedHashSet<>(); - Long sourceStepExecutionId = null; + Long sourceJobExecutionId = jobExecution.getId(); String apiKey = null; for (StepExecution stepExecution : jobExecution.getStepExecutions()) { @@ -77,12 +77,6 @@ public class AutoRetryJobExecutionListener implements JobExecutionListener { .filter(key -> !key.isBlank()) .forEach(mergedKeys::add); - // sourceStepExecutionId: 마지막 Step 것 사용 - sourceStepExecutionId = stepExecution.getExecutionContext() - .containsKey(FAILED_STEP_EXECUTION_ID) - ? stepExecution.getExecutionContext().getLong(FAILED_STEP_EXECUTION_ID) - : stepExecution.getId(); - // apiKey: non-null인 것 중 첫 번째 사용 if (apiKey == null) { apiKey = stepExecution.getExecutionContext() @@ -112,8 +106,8 @@ public class AutoRetryJobExecutionListener implements JobExecutionListener { log.info("[AutoRetry] {} Job 완료 후 실패 건 {}건 감지 → 자동 재수집 트리거", jobName, mergedKeys.size()); - // sourceStepExecutionId 기반으로 1회만 triggerRetryAsync 호출 (실패 키는 DB에서 직접 조회) + // sourceJobExecutionId 기반으로 1회만 triggerRetryAsync 호출 (실패 키는 DB에서 직접 조회) autoRetryTriggerService.triggerRetryAsync( - jobName, mergedKeys.size(), sourceStepExecutionId, apiKey); + jobName, mergedKeys.size(), sourceJobExecutionId, apiKey); } } diff --git a/src/main/java/com/snp/batch/common/batch/listener/AutoRetryTriggerService.java b/src/main/java/com/snp/batch/common/batch/listener/AutoRetryTriggerService.java index a6fa6c5..5485efb 100644 --- a/src/main/java/com/snp/batch/common/batch/listener/AutoRetryTriggerService.java +++ b/src/main/java/com/snp/batch/common/batch/listener/AutoRetryTriggerService.java @@ -31,7 +31,7 @@ public class AutoRetryTriggerService { @Async("autoRetryExecutor") public void triggerRetryAsync(String jobName, int failedCount, - Long sourceStepExecutionId, String apiKey) { + Long sourceJobExecutionId, String apiKey) { try { Job job = jobMap.get(jobName); if (job == null) { @@ -41,7 +41,7 @@ public class AutoRetryTriggerService { JobParametersBuilder builder = new JobParametersBuilder() .addLong("timestamp", System.currentTimeMillis()) - .addString("sourceStepExecutionId", String.valueOf(sourceStepExecutionId)) + .addString("sourceJobExecutionId", String.valueOf(sourceJobExecutionId)) .addString("executionMode", "RECOLLECT") .addString("reason", "자동 재수집 (실패 건 자동 처리)") .addString("executor", "AUTO_RETRY"); @@ -52,8 +52,8 @@ public class AutoRetryTriggerService { JobParameters retryParams = builder.toJobParameters(); - log.info("[AutoRetry] 재수집 Job 실행 시작: jobName={}, 실패건={}, sourceStepExecutionId={}", - jobName, failedCount, sourceStepExecutionId); + log.info("[AutoRetry] 재수집 Job 실행 시작: jobName={}, 실패건={}, sourceJobExecutionId={}", + jobName, failedCount, sourceJobExecutionId); JobExecution retryExecution = jobLauncher.run(job, retryParams); diff --git a/src/main/java/com/snp/batch/global/config/AsyncConfig.java b/src/main/java/com/snp/batch/global/config/AsyncConfig.java index 636d5e9..e9b273e 100644 --- a/src/main/java/com/snp/batch/global/config/AsyncConfig.java +++ b/src/main/java/com/snp/batch/global/config/AsyncConfig.java @@ -5,6 +5,8 @@ import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.core.task.TaskExecutor; + import java.util.concurrent.Executor; @Configuration @@ -36,4 +38,19 @@ public class AsyncConfig { executor.initialize(); return executor; } + + /** + * 배치 파티션 병렬 실행 전용 Executor. + * ShipDetailUpdate 파티셔닝 등 배치 Step 병렬 처리에 사용. + */ + @Bean(name = "batchPartitionExecutor") + public TaskExecutor batchPartitionExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(4); // 기본 파티션 수 + executor.setMaxPoolSize(8); // 최대 파티션 수 + executor.setQueueCapacity(20); // 대기 큐 + executor.setThreadNamePrefix("BatchPartition-"); + executor.initialize(); + return executor; + } } \ No newline at end of file diff --git a/src/main/java/com/snp/batch/global/repository/BatchFailedRecordRepository.java b/src/main/java/com/snp/batch/global/repository/BatchFailedRecordRepository.java index b775e73..d5c7a0c 100644 --- a/src/main/java/com/snp/batch/global/repository/BatchFailedRecordRepository.java +++ b/src/main/java/com/snp/batch/global/repository/BatchFailedRecordRepository.java @@ -59,6 +59,19 @@ public interface BatchFailedRecordRepository extends JpaRepository recordKeys, @Param("resolvedAt") LocalDateTime resolvedAt); + /** + * 특정 Job 실행의 실패 레코드를 RESOLVED로 벌크 업데이트 + */ + @Modifying + @Query("UPDATE BatchFailedRecord r SET r.status = 'RESOLVED', r.resolvedAt = :resolvedAt " + + "WHERE r.jobName = :jobName AND r.jobExecutionId = :jobExecutionId " + + "AND r.recordKey IN :recordKeys AND r.status = 'FAILED'") + int resolveByJobExecutionIdAndRecordKeys( + @Param("jobName") String jobName, + @Param("jobExecutionId") Long jobExecutionId, + @Param("recordKeys") List recordKeys, + @Param("resolvedAt") LocalDateTime resolvedAt); + /** * ID 목록으로 FAILED 상태 레코드를 일괄 RESOLVED 처리 */ @@ -89,6 +102,17 @@ public interface BatchFailedRecordRepository extends JpaRepository findFailedRecordKeysByJobExecutionId( + @Param("jobName") String jobName, + @Param("jobExecutionId") Long jobExecutionId); + /** * 동일 Job에서 이미 FAILED 상태인 recordKey 목록 조회 (중복 방지용) */ diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailImoFetchTasklet.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailImoFetchTasklet.java new file mode 100644 index 0000000..22ece70 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailImoFetchTasklet.java @@ -0,0 +1,172 @@ +package com.snp.batch.jobs.shipdetail.batch.config; + +import com.snp.batch.global.model.BatchApiLog; +import com.snp.batch.global.repository.BatchFailedRecordRepository; +import com.snp.batch.jobs.shipdetail.batch.dto.ShipDto; +import com.snp.batch.jobs.shipdetail.batch.dto.ShipUpdateApiResponse; +import com.snp.batch.service.BatchApiLogService; +import com.snp.batch.service.BatchDateService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.UriComponentsBuilder; + +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * 선박제원정보 변경 IMO 목록 조회 Tasklet. + * NORMAL 모드: Maritime API에서 변경된 IMO 번호 조회 + * RECOLLECT 모드: DB에서 실패 IMO 번호 조회 + * 조회 결과를 JobExecutionContext에 저장하여 Partitioner가 사용할 수 있게 함. + */ +@Slf4j +public class ShipDetailImoFetchTasklet implements Tasklet { + + private static final String API_KEY = "SHIP_DETAIL_UPDATE_API"; + private static final String SHIP_UPDATE_API_PATH = "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipChangesByLastUpdateDateRange"; + private static final String JOB_NAME = "ShipDetailUpdateJob"; + + private final WebClient maritimeApiWebClient; + private final BatchDateService batchDateService; + private final BatchApiLogService batchApiLogService; + private final BatchFailedRecordRepository batchFailedRecordRepository; + private final String maritimeApiUrl; + + public ShipDetailImoFetchTasklet( + WebClient maritimeApiWebClient, + BatchDateService batchDateService, + BatchApiLogService batchApiLogService, + BatchFailedRecordRepository batchFailedRecordRepository, + String maritimeApiUrl) { + this.maritimeApiWebClient = maritimeApiWebClient; + this.batchDateService = batchDateService; + this.batchApiLogService = batchApiLogService; + this.batchFailedRecordRepository = batchFailedRecordRepository; + this.maritimeApiUrl = maritimeApiUrl; + } + + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { + JobExecution jobExecution = chunkContext.getStepContext() + .getStepExecution().getJobExecution(); + String executionMode = jobExecution.getJobParameters() + .getString("executionMode", "NORMAL"); + + List imoNumbers; + + if ("RECOLLECT".equals(executionMode)) { + imoNumbers = fetchRecollectImoNumbers(jobExecution); + } else { + imoNumbers = fetchChangedImoNumbers(jobExecution); + } + + // JobExecutionContext에 저장 + jobExecution.getExecutionContext().putInt("totalImoCount", imoNumbers.size()); + + if (!imoNumbers.isEmpty()) { + jobExecution.getExecutionContext().putString("allImoNumbers", String.join(",", imoNumbers)); + } + + log.info("[ShipDetailImoFetchTasklet] {} 모드: IMO {} 건 조회 완료", + executionMode, imoNumbers.size()); + + return RepeatStatus.FINISHED; + } + + private List fetchRecollectImoNumbers(JobExecution jobExecution) { + String sourceJobExecutionIdParam = jobExecution.getJobParameters() + .getString("sourceJobExecutionId"); + + if (sourceJobExecutionIdParam == null || sourceJobExecutionIdParam.isBlank()) { + log.warn("[ShipDetailImoFetchTasklet] RECOLLECT 모드이나 sourceJobExecutionId가 없음"); + return Collections.emptyList(); + } + + Long sourceJobExecutionId = Long.parseLong(sourceJobExecutionIdParam); + List retryKeys = batchFailedRecordRepository.findFailedRecordKeysByJobExecutionId( + JOB_NAME, sourceJobExecutionId); + + log.info("[ShipDetailImoFetchTasklet] RECOLLECT: DB에서 {} 건의 실패 키 조회 (sourceJobExecutionId: {})", + retryKeys.size(), sourceJobExecutionId); + + return retryKeys; + } + + private List fetchChangedImoNumbers(JobExecution jobExecution) { + Map params = batchDateService.getDateRangeWithoutTimeParams(API_KEY); + + MultiValueMap multiValueParams = new LinkedMultiValueMap<>(); + params.forEach((key, value) -> + multiValueParams.put(key, Collections.singletonList(value))); + + String fullUri = UriComponentsBuilder.fromHttpUrl(maritimeApiUrl) + .path(SHIP_UPDATE_API_PATH) + .queryParams(multiValueParams) + .build() + .toUriString(); + + long startTime = System.currentTimeMillis(); + int statusCode = 200; + String errorMessage = null; + Long responseSize = 0L; + + try { + log.info("[ShipDetailImoFetchTasklet] 변경 IMO 조회 API 호출: {}", fullUri); + + ShipUpdateApiResponse response = maritimeApiWebClient.get() + .uri(uriBuilder -> { + uriBuilder.path(SHIP_UPDATE_API_PATH); + params.forEach(uriBuilder::queryParam); + return uriBuilder.build(); + }) + .retrieve() + .bodyToMono(new ParameterizedTypeReference() {}) + .block(); + + if (response == null || response.getShips() == null) { + return Collections.emptyList(); + } + + List imoNumbers = response.getShips().stream() + .map(ShipDto::getImoNumber) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + responseSize = (long) imoNumbers.size(); + return imoNumbers; + + } catch (Exception e) { + statusCode = 500; + errorMessage = e.getMessage(); + log.error("[ShipDetailImoFetchTasklet] API 호출 실패: {}", e.getMessage(), e); + throw e; + } finally { + long duration = System.currentTimeMillis() - startTime; + batchApiLogService.saveLog(BatchApiLog.builder() + .apiRequestLocation("ShipDetailImoFetchTasklet") + .jobExecutionId(jobExecution.getId()) + .stepExecutionId(jobExecution.getStepExecutions().stream() + .findFirst().map(se -> se.getId()).orElse(null)) + .requestUri(fullUri) + .httpMethod("GET") + .statusCode(statusCode) + .responseTimeMs(duration) + .responseCount(responseSize) + .errorMessage(errorMessage) + .createdAt(LocalDateTime.now()) + .build()); + } + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailPartitioner.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailPartitioner.java new file mode 100644 index 0000000..b90b4f7 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailPartitioner.java @@ -0,0 +1,61 @@ +package com.snp.batch.jobs.shipdetail.batch.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.partition.support.Partitioner; +import org.springframework.batch.item.ExecutionContext; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * 선박제원정보 IMO 목록을 N개 파티션으로 분할하는 Partitioner. + * JobExecutionContext의 allImoNumbers를 읽어 파티션별 ExecutionContext에 할당. + */ +@Slf4j +public class ShipDetailPartitioner implements Partitioner { + + private final List allImoNumbers; + private final int partitionCount; + + public ShipDetailPartitioner(List allImoNumbers, int partitionCount) { + this.allImoNumbers = allImoNumbers; + this.partitionCount = partitionCount; + } + + @Override + public Map partition(int gridSize) { + int totalSize = allImoNumbers.size(); + // 실제 파티션 수: IMO 수가 적으면 파티션 수를 줄임 + int actualPartitionCount = Math.min(partitionCount, Math.max(1, totalSize)); + + Map partitions = new LinkedHashMap<>(); + int partitionSize = (int) Math.ceil((double) totalSize / actualPartitionCount); + + for (int i = 0; i < actualPartitionCount; i++) { + int fromIndex = i * partitionSize; + int toIndex = Math.min(fromIndex + partitionSize, totalSize); + + if (fromIndex >= totalSize) { + break; + } + + List partitionImos = allImoNumbers.subList(fromIndex, toIndex); + ExecutionContext context = new ExecutionContext(); + context.putString("partitionImoNumbers", String.join(",", partitionImos)); + context.putInt("partitionIndex", i); + context.putInt("partitionSize", partitionImos.size()); + + String partitionKey = "partition" + i; + partitions.put(partitionKey, context); + + log.info("[ShipDetailPartitioner] {} : IMO {} 건 (index {}-{})", + partitionKey, partitionImos.size(), fromIndex, toIndex - 1); + } + + log.info("[ShipDetailPartitioner] 총 {} 개 파티션 생성 (전체 IMO: {} 건)", + partitions.size(), totalSize); + + return partitions; + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java index cd9cedf..55af4d5 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java @@ -2,18 +2,19 @@ package com.snp.batch.jobs.shipdetail.batch.config; import com.fasterxml.jackson.databind.ObjectMapper; import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; -import org.springframework.batch.core.JobExecutionListener; +import com.snp.batch.global.repository.BatchFailedRecordRepository; import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto; import com.snp.batch.jobs.shipdetail.batch.entity.ShipDetailEntity; import com.snp.batch.jobs.shipdetail.batch.processor.ShipDetailDataProcessor; import com.snp.batch.jobs.shipdetail.batch.reader.ShipDetailUpdateDataReader; import com.snp.batch.jobs.shipdetail.batch.writer.ShipDetailDataWriter; -import com.snp.batch.global.repository.BatchFailedRecordRepository; import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import com.snp.batch.service.BatchFailedRecordService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionListener; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.job.builder.JobBuilder; @@ -22,20 +23,20 @@ import org.springframework.batch.core.job.flow.JobExecutionDecider; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.core.step.tasklet.Tasklet; -import org.springframework.batch.item.ItemProcessor; -import org.springframework.batch.item.ItemReader; -import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskExecutor; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.WebClient; import java.sql.Timestamp; import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.Collections; import java.util.List; @Slf4j @@ -53,6 +54,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig { - if (stepExecution != null && stepExecution.getReadCount() == 0) { - log.info("[ShipDetailUpdateJob] Decider: EMPTY_RESPONSE - 응답 데이터 0건으로 LAST_EXECUTION 업데이트 스킵 (다음 실행 시 동일 범위 재조회)"); + int totalImoCount = jobExecution.getExecutionContext().getInt("totalImoCount", 0); + + if (totalImoCount == 0) { + log.info("[ShipDetailUpdateJob] Decider: EMPTY_RESPONSE - IMO 0건으로 LAST_EXECUTION 업데이트 스킵"); return new FlowExecutionStatus("EMPTY_RESPONSE"); } - log.info("[ShipDetailUpdateJob] Decider: NORMAL 모드 - LAST_EXECUTION 업데이트 진행"); + log.info("[ShipDetailUpdateJob] Decider: NORMAL - IMO {} 건 처리 시작", totalImoCount); return new FlowExecutionStatus("NORMAL"); }; } + // ======================================== + // Step 1: Partitioned Step (병렬 처리) + // ======================================== + + @Bean + @StepScope + public ShipDetailPartitioner shipDetailPartitioner( + @Value("#{jobExecutionContext['allImoNumbers']}") String allImoNumbersStr + ) { + List allImoNumbers = (allImoNumbersStr != null && !allImoNumbersStr.isBlank()) + ? Arrays.asList(allImoNumbersStr.split(",")) + : Collections.emptyList(); + return new ShipDetailPartitioner(allImoNumbers, partitionCount); + } + + @Bean(name = "ShipDetailUpdatePartitionedStep") + public Step shipDetailUpdatePartitionedStep() { + return new StepBuilder("ShipDetailUpdatePartitionedStep", jobRepository) + .partitioner("ShipDetailUpdateStep", shipDetailPartitioner(null)) + .step(shipDetailUpdateWorkerStep()) + .taskExecutor(batchPartitionExecutor) + .gridSize(partitionCount) + .build(); + } + + @Bean + public Step shipDetailUpdateWorkerStep() { + return new StepBuilder("ShipDetailUpdateStep", jobRepository) + .chunk(getChunkSize(), transactionManager) + .reader(shipDetailUpdateDataReader(null, null, null, null)) + .processor(shipDetailDataProcessor(null)) + .writer(shipDetailDataWriter) + .build(); + } + + // ======================================== + // Reader / Processor (StepScope) + // ======================================== + @Bean @StepScope public ShipDetailUpdateDataReader shipDetailUpdateDataReader( @Value("#{stepExecution.jobExecution.id}") Long jobExecutionId, @Value("#{stepExecution.id}") Long stepExecutionId, @Value("#{jobParameters['executionMode']}") String executionMode, - @Value("#{jobParameters['sourceStepExecutionId']}") String sourceStepExecutionIdParam + @Value("#{stepExecutionContext['partitionImoNumbers']}") String partitionImoNumbers ) { ShipDetailUpdateDataReader reader = new ShipDetailUpdateDataReader( maritimeApiWebClient, jdbcTemplate, objectMapper, @@ -163,36 +233,15 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig retryKeys = batchFailedRecordRepository.findFailedRecordKeysByStepExecutionId( - "ShipDetailUpdateJob", sourceStepExecutionId); - - if (!retryKeys.isEmpty()) { - reader.setRetryRecordKeys(retryKeys); - reader.setSourceStepExecutionId(sourceStepExecutionId); - log.info("[ShipDetailUpdateJob] Retry 모드 활성화: DB에서 {} 건의 실패 키 조회, sourceStepExecutionId: {}", - retryKeys.size(), sourceStepExecutionId); - } else { - log.warn("[ShipDetailUpdateJob] RECOLLECT 모드이나 DB에서 실패 키를 찾을 수 없음 (sourceStepExecutionId: {})", - sourceStepExecutionId); - } + if ("RECOLLECT".equals(executionMode)) { + reader.setRecollectMode(true); } return reader; } - @Override - protected ItemReader createReader() { // 타입 변경 - return shipDetailUpdateDataReader; - } - - @Override - protected ItemProcessor createProcessor() { - return shipDetailDataProcessor; - } @Bean @StepScope public ShipDetailDataProcessor shipDetailDataProcessor( @@ -201,29 +250,24 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig createWriter() { // 타입 변경 - return shipDetailDataWriter; - } - @Override protected int getChunkSize() { return 20; } + // ======================================== + // Job / Step Bean 등록 + // ======================================== + @Bean(name = "ShipDetailUpdateJob") public Job ShipDetailUpdateJob() { return job(); } - @Bean(name = "ShipDetailUpdateStep") - public Step ShipDetailUpdateStep() { - return step(); - } + // ======================================== + // Step 2: LastExecution 업데이트 + // ======================================== - /** - * 2단계: 모든 스텝 성공 시 배치 실행 로그(날짜) 업데이트 - */ @Bean public Tasklet shipDetailLastExecutionUpdateTasklet() { return (contribution, chunkContext) -> { @@ -257,6 +301,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig { // 실패 IMO 추적 private final List failedImoNumbers = new ArrayList<>(); private String lastErrorMessage; + private boolean afterFetchCompleted = false; - // Retry 모드 - private List retryRecordKeys; - private Long sourceStepExecutionId; + // 파티션 모드 + private String partitionImoNumbers; + private boolean recollectMode = false; public ShipDetailUpdateDataReader( WebClient webClient, @@ -73,16 +72,12 @@ public class ShipDetailUpdateDataReader extends BaseApiReader { enableChunkMode(); } - public void setRetryRecordKeys(List retryRecordKeys) { - this.retryRecordKeys = retryRecordKeys; + public void setPartitionImoNumbers(String partitionImoNumbers) { + this.partitionImoNumbers = partitionImoNumbers; } - public void setSourceStepExecutionId(Long sourceStepExecutionId) { - this.sourceStepExecutionId = sourceStepExecutionId; - } - - private boolean isRetryMode() { - return retryRecordKeys != null && !retryRecordKeys.isEmpty(); + public void setRecollectMode(boolean recollectMode) { + this.recollectMode = recollectMode; } @Override @@ -90,10 +85,6 @@ public class ShipDetailUpdateDataReader extends BaseApiReader { return "ShipDetailUpdateDataReader"; } - protected String getShipUpdateApiPath() { - return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipChangesByLastUpdateDateRange"; - } - @Override protected String getApiPath() { return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipsByIHSLRorIMONumbersAll"; @@ -109,25 +100,22 @@ public class ShipDetailUpdateDataReader extends BaseApiReader { this.allImoNumbers = null; this.failedImoNumbers.clear(); this.lastErrorMessage = null; - // retryRecordKeys는 JobConfig에서 주입하므로 초기화하지 않음 + this.afterFetchCompleted = false; } @Override protected void beforeFetch() { - if (isRetryMode()) { - log.info("[{}] [RETRY MODE] 실패 건 재수집 모드 시작 - 대상 IMO: {} 건", - getReaderName(), retryRecordKeys.size()); - allImoNumbers = new ArrayList<>(retryRecordKeys); - log.info("[{}] [RETRY MODE] IMO 목록: {}", getReaderName(), allImoNumbers); + // 파티션에서 할당받은 IMO 목록을 파싱 + if (partitionImoNumbers != null && !partitionImoNumbers.isBlank()) { + allImoNumbers = new ArrayList<>(Arrays.asList(partitionImoNumbers.split(","))); } else { - log.info("[{}] 변경된 IMO 번호 조회 시작...", getReaderName()); - ShipUpdateApiResponse response = callShipUpdateApi(); - allImoNumbers = extractUpdateImoNumbers(response); - log.info("[{}] 총 {} 개의 변경된 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); + allImoNumbers = Collections.emptyList(); } int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + log.info("[{}] 파티션 IMO {} 건 할당 (recollectMode: {})", + getReaderName(), allImoNumbers.size(), recollectMode); log.info("[{}] 설정: batch-size={}, delay-success={}ms, delay-failure={}ms, max-retry={}", getReaderName(), batchSize, delayOnSuccessMs, delayOnFailureMs, maxRetryCount); log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); @@ -255,46 +243,31 @@ public class ShipDetailUpdateDataReader extends BaseApiReader { ); } - private ShipUpdateApiResponse callShipUpdateApi() { - Map params = batchDateService.getDateRangeWithoutTimeParams(getApiKey()); - return executeSingleApiCall( - maritimeApiUrl, - getShipUpdateApiPath(), - params, - new ParameterizedTypeReference() {}, - batchApiLogService, - res -> res.getShips() != null ? (long) res.getShips().size() : 0L - ); - } - - private List extractUpdateImoNumbers(ShipUpdateApiResponse response) { - if (response.getShips() == null) { - return Collections.emptyList(); - } - return response.getShips().stream() - .map(ShipDto::getImoNumber) - .filter(imoNumber -> imoNumber != null) - .collect(Collectors.toList()); - } - @Override protected void afterFetch(List data) { - int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + int totalBatches = allImoNumbers != null + ? (int) Math.ceil((double) allImoNumbers.size() / batchSize) : 0; try { - if (data == null) { + if (data == null && !afterFetchCompleted) { + afterFetchCompleted = true; log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", - getReaderName(), allImoNumbers.size()); + getReaderName(), allImoNumbers != null ? allImoNumbers.size() : 0); - if (isRetryMode()) { - // Retry 모드: 성공 건 RESOLVED 처리 + 재실패 건 새 FAILED 레코드 저장 - log.info("[{}] [RETRY MODE] 재수집 결과 처리 시작 (sourceStepExecutionId: {})", - getReaderName(), sourceStepExecutionId); - batchFailedRecordService.resolveSuccessfulRetries( - "ShipDetailUpdateJob", sourceStepExecutionId, retryRecordKeys, failedImoNumbers); + if (recollectMode) { + // RECOLLECT 모드: 성공 건 RESOLVED 처리 + 재실패 건 새 FAILED 레코드 저장 + Long sourceJobExecutionId = getSourceJobExecutionId(); + log.info("[{}] [RECOLLECT] 재수집 결과 처리 시작 (sourceJobExecutionId: {})", + getReaderName(), sourceJobExecutionId); + + if (sourceJobExecutionId != null) { + batchFailedRecordService.resolveSuccessfulRetries( + "ShipDetailUpdateJob", sourceJobExecutionId, + allImoNumbers, failedImoNumbers); + } if (!failedImoNumbers.isEmpty()) { - log.warn("[{}] [RETRY MODE] 재실패 IMO 건수: {} 건", getReaderName(), failedImoNumbers.size()); + log.warn("[{}] [RECOLLECT] 재실패 IMO 건수: {} 건", getReaderName(), failedImoNumbers.size()); batchFailedRecordService.saveFailedRecords( "ShipDetailUpdateJob", getJobExecutionId(), @@ -304,11 +277,11 @@ public class ShipDetailUpdateDataReader extends BaseApiReader { lastErrorMessage ); } else { - log.info("[{}] [RETRY MODE] 모든 재수집 건 정상 처리 완료", getReaderName()); + log.info("[{}] [RECOLLECT] 모든 재수집 건 정상 처리 완료", getReaderName()); } - int successCount = retryRecordKeys.size() - failedImoNumbers.size(); - log.info("[{}] [RETRY MODE] 결과: 성공 {} 건, 재실패 {} 건", + int successCount = (allImoNumbers != null ? allImoNumbers.size() : 0) - failedImoNumbers.size(); + log.info("[{}] [RECOLLECT] 결과: 성공 {} 건, 재실패 {} 건", getReaderName(), successCount, failedImoNumbers.size()); } else { // 일반 모드: 동기 저장 (자동 재수집 트리거 전에 커밋 보장) @@ -333,11 +306,11 @@ public class ShipDetailUpdateDataReader extends BaseApiReader { log.error("[{}] afterFetch 처리 중 예외 발생: {}", getReaderName(), e.getMessage(), e); } finally { // 일반 모드에서만: 실패 건이 있으면 ExecutionContext에 저장 (자동 재수집 트리거용) - if (!isRetryMode() && !failedImoNumbers.isEmpty() && stepExecution != null) { + if (!recollectMode && !failedImoNumbers.isEmpty() && stepExecution != null) { try { String failedKeys = String.join(",", failedImoNumbers); stepExecution.getExecutionContext().putString("failedRecordKeys", failedKeys); - stepExecution.getExecutionContext().putLong("failedStepExecutionId", getStepExecutionId()); + stepExecution.getExecutionContext().putLong("failedJobExecutionId", getJobExecutionId()); stepExecution.getExecutionContext().putString("failedApiKey", getApiKey()); log.info("[{}] 자동 재수집 대상 실패 키 {} 건 ExecutionContext에 저장", getReaderName(), failedImoNumbers.size()); @@ -347,4 +320,18 @@ public class ShipDetailUpdateDataReader extends BaseApiReader { } } } + + /** + * JobParameter에서 sourceJobExecutionId를 조회 (RECOLLECT 모드용) + */ + private Long getSourceJobExecutionId() { + if (stepExecution != null) { + String param = stepExecution.getJobExecution() + .getJobParameters().getString("sourceJobExecutionId"); + if (param != null && !param.isBlank()) { + return Long.parseLong(param); + } + } + return null; + } } diff --git a/src/main/java/com/snp/batch/service/BatchFailedRecordService.java b/src/main/java/com/snp/batch/service/BatchFailedRecordService.java index 7d0cc34..27e1ed6 100644 --- a/src/main/java/com/snp/batch/service/BatchFailedRecordService.java +++ b/src/main/java/com/snp/batch/service/BatchFailedRecordService.java @@ -141,19 +141,19 @@ public class BatchFailedRecordService { /** * 재수집 성공 건을 RESOLVED로 처리합니다. - * 원본 stepExecutionId로 범위를 제한하여 해당 Step의 실패 건만 RESOLVED 처리합니다. + * 원본 jobExecutionId로 범위를 제한하여 해당 Job의 실패 건만 RESOLVED 처리합니다. */ @Transactional(propagation = Propagation.REQUIRES_NEW) - public void resolveSuccessfulRetries(String jobName, Long sourceStepExecutionId, + public void resolveSuccessfulRetries(String jobName, Long sourceJobExecutionId, List allRetryKeys, List failedAgainKeys) { List successfulKeys = allRetryKeys.stream() .filter(key -> !failedAgainKeys.contains(key)) .toList(); if (!successfulKeys.isEmpty()) { - int resolved = batchFailedRecordRepository.resolveByStepExecutionIdAndRecordKeys( - jobName, sourceStepExecutionId, successfulKeys, LocalDateTime.now()); - log.info("실패 레코드 RESOLVED 처리: {} 건 (job: {}, sourceStepExecutionId: {})", - resolved, jobName, sourceStepExecutionId); + int resolved = batchFailedRecordRepository.resolveByJobExecutionIdAndRecordKeys( + jobName, sourceJobExecutionId, successfulKeys, LocalDateTime.now()); + log.info("실패 레코드 RESOLVED 처리: {} 건 (job: {}, sourceJobExecutionId: {})", + resolved, jobName, sourceJobExecutionId); } } } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 2de9c77..c1d67eb 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -110,10 +110,11 @@ app: # ShipDetailUpdate 배치 설정 (dev - 기존과 동일하게 20건 유지) ship-detail-update: - batch-size: 20 # dev에서는 문제 없으므로 기존 20건 유지 + batch-size: 10 # dev에서는 문제 없으므로 기존 20건 유지 delay-on-success-ms: 300 delay-on-failure-ms: 2000 max-retry-count: 3 + partition-count: 4 # dev: 2개 파티션 # AIS Target 배치 설정 ais-target: diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index fe269bf..ed49c09 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -115,6 +115,7 @@ app: delay-on-success-ms: 300 # 성공 시 딜레이 (ms) delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms) max-retry-count: 3 # 최대 재시도 횟수 + partition-count: 4 # prod: 4개 파티션 # AIS Target 배치 설정 ais-target: diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index afdee62..46b9dc9 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -172,6 +172,7 @@ app: delay-on-success-ms: 300 # 성공 시 딜레이 (ms) delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms) max-retry-count: 3 # 최대 재시도 횟수 + partition-count: 4 # 병렬 파티션 수 # AIS Target Import 배치 설정 (캐시 업데이트 전용) ais-target: