feat(shipdetail): 선박제원정보 배치 작업 병렬화 (Partitioned Step)
- IMO 목록을 N개 파티션으로 분할하여 병렬 API 호출 - ImoFetchTasklet으로 IMO 조회 단계 분리 - sourceStepExecutionId → sourceJobExecutionId 마이그레이션 - afterFetch 중복 실행 방지 플래그 추가 - partition-count 설정 외부화 (dev:2, prod:4)
This commit is contained in:
부모
865bb95fc3
커밋
59ecdd851e
@ -30,7 +30,7 @@ import java.util.Set;
|
||||
public class AutoRetryJobExecutionListener implements JobExecutionListener {
|
||||
|
||||
private static final String FAILED_RECORD_KEYS = "failedRecordKeys";
|
||||
private static final String FAILED_STEP_EXECUTION_ID = "failedStepExecutionId";
|
||||
private static final String FAILED_JOB_EXECUTION_ID = "failedJobExecutionId";
|
||||
private static final String FAILED_API_KEY = "failedApiKey";
|
||||
private static final int MAX_AUTO_RETRY_COUNT = 3;
|
||||
|
||||
@ -61,7 +61,7 @@ public class AutoRetryJobExecutionListener implements JobExecutionListener {
|
||||
|
||||
// 모든 Step의 failedRecordKeys를 Set으로 병합 (중복 제거)
|
||||
Set<String> mergedKeys = new LinkedHashSet<>();
|
||||
Long sourceStepExecutionId = null;
|
||||
Long sourceJobExecutionId = jobExecution.getId();
|
||||
String apiKey = null;
|
||||
|
||||
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
|
||||
@ -77,12 +77,6 @@ public class AutoRetryJobExecutionListener implements JobExecutionListener {
|
||||
.filter(key -> !key.isBlank())
|
||||
.forEach(mergedKeys::add);
|
||||
|
||||
// sourceStepExecutionId: 마지막 Step 것 사용
|
||||
sourceStepExecutionId = stepExecution.getExecutionContext()
|
||||
.containsKey(FAILED_STEP_EXECUTION_ID)
|
||||
? stepExecution.getExecutionContext().getLong(FAILED_STEP_EXECUTION_ID)
|
||||
: stepExecution.getId();
|
||||
|
||||
// apiKey: non-null인 것 중 첫 번째 사용
|
||||
if (apiKey == null) {
|
||||
apiKey = stepExecution.getExecutionContext()
|
||||
@ -112,8 +106,8 @@ public class AutoRetryJobExecutionListener implements JobExecutionListener {
|
||||
log.info("[AutoRetry] {} Job 완료 후 실패 건 {}건 감지 → 자동 재수집 트리거",
|
||||
jobName, mergedKeys.size());
|
||||
|
||||
// sourceStepExecutionId 기반으로 1회만 triggerRetryAsync 호출 (실패 키는 DB에서 직접 조회)
|
||||
// sourceJobExecutionId 기반으로 1회만 triggerRetryAsync 호출 (실패 키는 DB에서 직접 조회)
|
||||
autoRetryTriggerService.triggerRetryAsync(
|
||||
jobName, mergedKeys.size(), sourceStepExecutionId, apiKey);
|
||||
jobName, mergedKeys.size(), sourceJobExecutionId, apiKey);
|
||||
}
|
||||
}
|
||||
|
||||
@ -31,7 +31,7 @@ public class AutoRetryTriggerService {
|
||||
|
||||
@Async("autoRetryExecutor")
|
||||
public void triggerRetryAsync(String jobName, int failedCount,
|
||||
Long sourceStepExecutionId, String apiKey) {
|
||||
Long sourceJobExecutionId, String apiKey) {
|
||||
try {
|
||||
Job job = jobMap.get(jobName);
|
||||
if (job == null) {
|
||||
@ -41,7 +41,7 @@ public class AutoRetryTriggerService {
|
||||
|
||||
JobParametersBuilder builder = new JobParametersBuilder()
|
||||
.addLong("timestamp", System.currentTimeMillis())
|
||||
.addString("sourceStepExecutionId", String.valueOf(sourceStepExecutionId))
|
||||
.addString("sourceJobExecutionId", String.valueOf(sourceJobExecutionId))
|
||||
.addString("executionMode", "RECOLLECT")
|
||||
.addString("reason", "자동 재수집 (실패 건 자동 처리)")
|
||||
.addString("executor", "AUTO_RETRY");
|
||||
@ -52,8 +52,8 @@ public class AutoRetryTriggerService {
|
||||
|
||||
JobParameters retryParams = builder.toJobParameters();
|
||||
|
||||
log.info("[AutoRetry] 재수집 Job 실행 시작: jobName={}, 실패건={}, sourceStepExecutionId={}",
|
||||
jobName, failedCount, sourceStepExecutionId);
|
||||
log.info("[AutoRetry] 재수집 Job 실행 시작: jobName={}, 실패건={}, sourceJobExecutionId={}",
|
||||
jobName, failedCount, sourceJobExecutionId);
|
||||
|
||||
JobExecution retryExecution = jobLauncher.run(job, retryParams);
|
||||
|
||||
|
||||
@ -5,6 +5,8 @@ import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
@Configuration
|
||||
@ -36,4 +38,19 @@ public class AsyncConfig {
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* 배치 파티션 병렬 실행 전용 Executor.
|
||||
* ShipDetailUpdate 파티셔닝 등 배치 Step 병렬 처리에 사용.
|
||||
*/
|
||||
@Bean(name = "batchPartitionExecutor")
|
||||
public TaskExecutor batchPartitionExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(4); // 기본 파티션 수
|
||||
executor.setMaxPoolSize(8); // 최대 파티션 수
|
||||
executor.setQueueCapacity(20); // 대기 큐
|
||||
executor.setThreadNamePrefix("BatchPartition-");
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
}
|
||||
@ -59,6 +59,19 @@ public interface BatchFailedRecordRepository extends JpaRepository<BatchFailedRe
|
||||
@Param("recordKeys") List<String> recordKeys,
|
||||
@Param("resolvedAt") LocalDateTime resolvedAt);
|
||||
|
||||
/**
|
||||
* 특정 Job 실행의 실패 레코드를 RESOLVED로 벌크 업데이트
|
||||
*/
|
||||
@Modifying
|
||||
@Query("UPDATE BatchFailedRecord r SET r.status = 'RESOLVED', r.resolvedAt = :resolvedAt " +
|
||||
"WHERE r.jobName = :jobName AND r.jobExecutionId = :jobExecutionId " +
|
||||
"AND r.recordKey IN :recordKeys AND r.status = 'FAILED'")
|
||||
int resolveByJobExecutionIdAndRecordKeys(
|
||||
@Param("jobName") String jobName,
|
||||
@Param("jobExecutionId") Long jobExecutionId,
|
||||
@Param("recordKeys") List<String> recordKeys,
|
||||
@Param("resolvedAt") LocalDateTime resolvedAt);
|
||||
|
||||
/**
|
||||
* ID 목록으로 FAILED 상태 레코드를 일괄 RESOLVED 처리
|
||||
*/
|
||||
@ -89,6 +102,17 @@ public interface BatchFailedRecordRepository extends JpaRepository<BatchFailedRe
|
||||
@Param("jobName") String jobName,
|
||||
@Param("stepExecutionId") Long stepExecutionId);
|
||||
|
||||
/**
|
||||
* 특정 Job 실행의 미해결(FAILED) 실패 레코드 키 목록 조회.
|
||||
* 자동 재수집 시 JobParameter 대신 DB에서 직접 조회하여 VARCHAR(2500) 제한을 우회.
|
||||
*/
|
||||
@Query("SELECT r.recordKey FROM BatchFailedRecord r " +
|
||||
"WHERE r.jobName = :jobName AND r.jobExecutionId = :jobExecutionId " +
|
||||
"AND r.status = 'FAILED'")
|
||||
List<String> findFailedRecordKeysByJobExecutionId(
|
||||
@Param("jobName") String jobName,
|
||||
@Param("jobExecutionId") Long jobExecutionId);
|
||||
|
||||
/**
|
||||
* 동일 Job에서 이미 FAILED 상태인 recordKey 목록 조회 (중복 방지용)
|
||||
*/
|
||||
|
||||
@ -0,0 +1,172 @@
|
||||
package com.snp.batch.jobs.shipdetail.batch.config;
|
||||
|
||||
import com.snp.batch.global.model.BatchApiLog;
|
||||
import com.snp.batch.global.repository.BatchFailedRecordRepository;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDto;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipUpdateApiResponse;
|
||||
import com.snp.batch.service.BatchApiLogService;
|
||||
import com.snp.batch.service.BatchDateService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.JobExecution;
|
||||
import org.springframework.batch.core.StepContribution;
|
||||
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import org.springframework.web.util.UriComponentsBuilder;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 선박제원정보 변경 IMO 목록 조회 Tasklet.
|
||||
* NORMAL 모드: Maritime API에서 변경된 IMO 번호 조회
|
||||
* RECOLLECT 모드: DB에서 실패 IMO 번호 조회
|
||||
* 조회 결과를 JobExecutionContext에 저장하여 Partitioner가 사용할 수 있게 함.
|
||||
*/
|
||||
@Slf4j
|
||||
public class ShipDetailImoFetchTasklet implements Tasklet {
|
||||
|
||||
private static final String API_KEY = "SHIP_DETAIL_UPDATE_API";
|
||||
private static final String SHIP_UPDATE_API_PATH = "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipChangesByLastUpdateDateRange";
|
||||
private static final String JOB_NAME = "ShipDetailUpdateJob";
|
||||
|
||||
private final WebClient maritimeApiWebClient;
|
||||
private final BatchDateService batchDateService;
|
||||
private final BatchApiLogService batchApiLogService;
|
||||
private final BatchFailedRecordRepository batchFailedRecordRepository;
|
||||
private final String maritimeApiUrl;
|
||||
|
||||
public ShipDetailImoFetchTasklet(
|
||||
WebClient maritimeApiWebClient,
|
||||
BatchDateService batchDateService,
|
||||
BatchApiLogService batchApiLogService,
|
||||
BatchFailedRecordRepository batchFailedRecordRepository,
|
||||
String maritimeApiUrl) {
|
||||
this.maritimeApiWebClient = maritimeApiWebClient;
|
||||
this.batchDateService = batchDateService;
|
||||
this.batchApiLogService = batchApiLogService;
|
||||
this.batchFailedRecordRepository = batchFailedRecordRepository;
|
||||
this.maritimeApiUrl = maritimeApiUrl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
|
||||
JobExecution jobExecution = chunkContext.getStepContext()
|
||||
.getStepExecution().getJobExecution();
|
||||
String executionMode = jobExecution.getJobParameters()
|
||||
.getString("executionMode", "NORMAL");
|
||||
|
||||
List<String> imoNumbers;
|
||||
|
||||
if ("RECOLLECT".equals(executionMode)) {
|
||||
imoNumbers = fetchRecollectImoNumbers(jobExecution);
|
||||
} else {
|
||||
imoNumbers = fetchChangedImoNumbers(jobExecution);
|
||||
}
|
||||
|
||||
// JobExecutionContext에 저장
|
||||
jobExecution.getExecutionContext().putInt("totalImoCount", imoNumbers.size());
|
||||
|
||||
if (!imoNumbers.isEmpty()) {
|
||||
jobExecution.getExecutionContext().putString("allImoNumbers", String.join(",", imoNumbers));
|
||||
}
|
||||
|
||||
log.info("[ShipDetailImoFetchTasklet] {} 모드: IMO {} 건 조회 완료",
|
||||
executionMode, imoNumbers.size());
|
||||
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
|
||||
private List<String> fetchRecollectImoNumbers(JobExecution jobExecution) {
|
||||
String sourceJobExecutionIdParam = jobExecution.getJobParameters()
|
||||
.getString("sourceJobExecutionId");
|
||||
|
||||
if (sourceJobExecutionIdParam == null || sourceJobExecutionIdParam.isBlank()) {
|
||||
log.warn("[ShipDetailImoFetchTasklet] RECOLLECT 모드이나 sourceJobExecutionId가 없음");
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
Long sourceJobExecutionId = Long.parseLong(sourceJobExecutionIdParam);
|
||||
List<String> retryKeys = batchFailedRecordRepository.findFailedRecordKeysByJobExecutionId(
|
||||
JOB_NAME, sourceJobExecutionId);
|
||||
|
||||
log.info("[ShipDetailImoFetchTasklet] RECOLLECT: DB에서 {} 건의 실패 키 조회 (sourceJobExecutionId: {})",
|
||||
retryKeys.size(), sourceJobExecutionId);
|
||||
|
||||
return retryKeys;
|
||||
}
|
||||
|
||||
private List<String> fetchChangedImoNumbers(JobExecution jobExecution) {
|
||||
Map<String, String> params = batchDateService.getDateRangeWithoutTimeParams(API_KEY);
|
||||
|
||||
MultiValueMap<String, String> multiValueParams = new LinkedMultiValueMap<>();
|
||||
params.forEach((key, value) ->
|
||||
multiValueParams.put(key, Collections.singletonList(value)));
|
||||
|
||||
String fullUri = UriComponentsBuilder.fromHttpUrl(maritimeApiUrl)
|
||||
.path(SHIP_UPDATE_API_PATH)
|
||||
.queryParams(multiValueParams)
|
||||
.build()
|
||||
.toUriString();
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
int statusCode = 200;
|
||||
String errorMessage = null;
|
||||
Long responseSize = 0L;
|
||||
|
||||
try {
|
||||
log.info("[ShipDetailImoFetchTasklet] 변경 IMO 조회 API 호출: {}", fullUri);
|
||||
|
||||
ShipUpdateApiResponse response = maritimeApiWebClient.get()
|
||||
.uri(uriBuilder -> {
|
||||
uriBuilder.path(SHIP_UPDATE_API_PATH);
|
||||
params.forEach(uriBuilder::queryParam);
|
||||
return uriBuilder.build();
|
||||
})
|
||||
.retrieve()
|
||||
.bodyToMono(new ParameterizedTypeReference<ShipUpdateApiResponse>() {})
|
||||
.block();
|
||||
|
||||
if (response == null || response.getShips() == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
List<String> imoNumbers = response.getShips().stream()
|
||||
.map(ShipDto::getImoNumber)
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
responseSize = (long) imoNumbers.size();
|
||||
return imoNumbers;
|
||||
|
||||
} catch (Exception e) {
|
||||
statusCode = 500;
|
||||
errorMessage = e.getMessage();
|
||||
log.error("[ShipDetailImoFetchTasklet] API 호출 실패: {}", e.getMessage(), e);
|
||||
throw e;
|
||||
} finally {
|
||||
long duration = System.currentTimeMillis() - startTime;
|
||||
batchApiLogService.saveLog(BatchApiLog.builder()
|
||||
.apiRequestLocation("ShipDetailImoFetchTasklet")
|
||||
.jobExecutionId(jobExecution.getId())
|
||||
.stepExecutionId(jobExecution.getStepExecutions().stream()
|
||||
.findFirst().map(se -> se.getId()).orElse(null))
|
||||
.requestUri(fullUri)
|
||||
.httpMethod("GET")
|
||||
.statusCode(statusCode)
|
||||
.responseTimeMs(duration)
|
||||
.responseCount(responseSize)
|
||||
.errorMessage(errorMessage)
|
||||
.createdAt(LocalDateTime.now())
|
||||
.build());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,61 @@
|
||||
package com.snp.batch.jobs.shipdetail.batch.config;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.partition.support.Partitioner;
|
||||
import org.springframework.batch.item.ExecutionContext;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 선박제원정보 IMO 목록을 N개 파티션으로 분할하는 Partitioner.
|
||||
* JobExecutionContext의 allImoNumbers를 읽어 파티션별 ExecutionContext에 할당.
|
||||
*/
|
||||
@Slf4j
|
||||
public class ShipDetailPartitioner implements Partitioner {
|
||||
|
||||
private final List<String> allImoNumbers;
|
||||
private final int partitionCount;
|
||||
|
||||
public ShipDetailPartitioner(List<String> allImoNumbers, int partitionCount) {
|
||||
this.allImoNumbers = allImoNumbers;
|
||||
this.partitionCount = partitionCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, ExecutionContext> partition(int gridSize) {
|
||||
int totalSize = allImoNumbers.size();
|
||||
// 실제 파티션 수: IMO 수가 적으면 파티션 수를 줄임
|
||||
int actualPartitionCount = Math.min(partitionCount, Math.max(1, totalSize));
|
||||
|
||||
Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
|
||||
int partitionSize = (int) Math.ceil((double) totalSize / actualPartitionCount);
|
||||
|
||||
for (int i = 0; i < actualPartitionCount; i++) {
|
||||
int fromIndex = i * partitionSize;
|
||||
int toIndex = Math.min(fromIndex + partitionSize, totalSize);
|
||||
|
||||
if (fromIndex >= totalSize) {
|
||||
break;
|
||||
}
|
||||
|
||||
List<String> partitionImos = allImoNumbers.subList(fromIndex, toIndex);
|
||||
ExecutionContext context = new ExecutionContext();
|
||||
context.putString("partitionImoNumbers", String.join(",", partitionImos));
|
||||
context.putInt("partitionIndex", i);
|
||||
context.putInt("partitionSize", partitionImos.size());
|
||||
|
||||
String partitionKey = "partition" + i;
|
||||
partitions.put(partitionKey, context);
|
||||
|
||||
log.info("[ShipDetailPartitioner] {} : IMO {} 건 (index {}-{})",
|
||||
partitionKey, partitionImos.size(), fromIndex, toIndex - 1);
|
||||
}
|
||||
|
||||
log.info("[ShipDetailPartitioner] 총 {} 개 파티션 생성 (전체 IMO: {} 건)",
|
||||
partitions.size(), totalSize);
|
||||
|
||||
return partitions;
|
||||
}
|
||||
}
|
||||
@ -2,18 +2,19 @@ package com.snp.batch.jobs.shipdetail.batch.config;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
|
||||
import org.springframework.batch.core.JobExecutionListener;
|
||||
import com.snp.batch.global.repository.BatchFailedRecordRepository;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto;
|
||||
import com.snp.batch.jobs.shipdetail.batch.entity.ShipDetailEntity;
|
||||
import com.snp.batch.jobs.shipdetail.batch.processor.ShipDetailDataProcessor;
|
||||
import com.snp.batch.jobs.shipdetail.batch.reader.ShipDetailUpdateDataReader;
|
||||
import com.snp.batch.jobs.shipdetail.batch.writer.ShipDetailDataWriter;
|
||||
import com.snp.batch.global.repository.BatchFailedRecordRepository;
|
||||
import com.snp.batch.service.BatchApiLogService;
|
||||
import com.snp.batch.service.BatchDateService;
|
||||
import com.snp.batch.service.BatchFailedRecordService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.JobExecution;
|
||||
import org.springframework.batch.core.JobExecutionListener;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||
@ -22,20 +23,20 @@ import org.springframework.batch.core.job.flow.JobExecutionDecider;
|
||||
import org.springframework.batch.core.repository.JobRepository;
|
||||
import org.springframework.batch.core.step.builder.StepBuilder;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.batch.item.ItemReader;
|
||||
import org.springframework.batch.item.ItemWriter;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@ -53,6 +54,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
||||
private final BatchFailedRecordService batchFailedRecordService;
|
||||
private final BatchFailedRecordRepository batchFailedRecordRepository;
|
||||
private final JobExecutionListener autoRetryJobExecutionListener;
|
||||
private final TaskExecutor batchPartitionExecutor;
|
||||
|
||||
@Value("${app.batch.ship-api.url}")
|
||||
private String maritimeApiUrl;
|
||||
@ -75,8 +77,10 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
||||
@Value("${app.batch.last-execution-buffer-hours:24}")
|
||||
private int lastExecutionBufferHours;
|
||||
|
||||
protected String getApiKey() {return "SHIP_DETAIL_UPDATE_API";}
|
||||
@Value("${app.batch.ship-detail-update.partition-count:4}")
|
||||
private int partitionCount;
|
||||
|
||||
protected String getApiKey() {return "SHIP_DETAIL_UPDATE_API";}
|
||||
|
||||
public ShipDetailUpdateJobConfig(
|
||||
JobRepository jobRepository,
|
||||
@ -91,7 +95,8 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
||||
BatchApiLogService batchApiLogService,
|
||||
BatchFailedRecordService batchFailedRecordService,
|
||||
BatchFailedRecordRepository batchFailedRecordRepository,
|
||||
@Qualifier("autoRetryJobExecutionListener") JobExecutionListener autoRetryJobExecutionListener) {
|
||||
@Qualifier("autoRetryJobExecutionListener") JobExecutionListener autoRetryJobExecutionListener,
|
||||
@Qualifier("batchPartitionExecutor") TaskExecutor batchPartitionExecutor) {
|
||||
super(jobRepository, transactionManager);
|
||||
this.shipDetailDataProcessor = shipDetailDataProcessor;
|
||||
this.shipDetailDataWriter = shipDetailDataWriter;
|
||||
@ -104,6 +109,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
||||
this.batchFailedRecordService = batchFailedRecordService;
|
||||
this.batchFailedRecordRepository = batchFailedRecordRepository;
|
||||
this.autoRetryJobExecutionListener = autoRetryJobExecutionListener;
|
||||
this.batchPartitionExecutor = batchPartitionExecutor;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -121,41 +127,105 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
||||
return "ShipDetailUpdateStep";
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// Job Flow 정의
|
||||
// ========================================
|
||||
|
||||
@Override
|
||||
protected Job createJobFlow(JobBuilder jobBuilder) {
|
||||
return jobBuilder
|
||||
.start(ShipDetailUpdateStep())
|
||||
.next(emptyResponseDecider())
|
||||
.start(shipDetailImoFetchStep())
|
||||
.next(imoCountDecider())
|
||||
.on("EMPTY_RESPONSE").end()
|
||||
.from(emptyResponseDecider()).on("NORMAL").to(shipDetailLastExecutionUpdateStep())
|
||||
.from(imoCountDecider()).on("NORMAL").to(shipDetailUpdatePartitionedStep())
|
||||
.next(shipDetailLastExecutionUpdateStep())
|
||||
.end()
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 응답 데이터 건수 판별 Decider
|
||||
* 응답 0건이면 EMPTY_RESPONSE, 아니면 NORMAL 반환
|
||||
*/
|
||||
// ========================================
|
||||
// Step 0: IMO 목록 조회
|
||||
// ========================================
|
||||
|
||||
@Bean
|
||||
public JobExecutionDecider emptyResponseDecider() {
|
||||
public Tasklet shipDetailImoFetchTasklet() {
|
||||
return new ShipDetailImoFetchTasklet(
|
||||
maritimeApiWebClient, batchDateService, batchApiLogService,
|
||||
batchFailedRecordRepository, maritimeApiUrl);
|
||||
}
|
||||
|
||||
@Bean(name = "ShipDetailImoFetchStep")
|
||||
public Step shipDetailImoFetchStep() {
|
||||
return new StepBuilder("ShipDetailImoFetchStep", jobRepository)
|
||||
.tasklet(shipDetailImoFetchTasklet(), transactionManager)
|
||||
.build();
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// Decider: IMO 건수 확인
|
||||
// ========================================
|
||||
|
||||
@Bean
|
||||
public JobExecutionDecider imoCountDecider() {
|
||||
return (jobExecution, stepExecution) -> {
|
||||
if (stepExecution != null && stepExecution.getReadCount() == 0) {
|
||||
log.info("[ShipDetailUpdateJob] Decider: EMPTY_RESPONSE - 응답 데이터 0건으로 LAST_EXECUTION 업데이트 스킵 (다음 실행 시 동일 범위 재조회)");
|
||||
int totalImoCount = jobExecution.getExecutionContext().getInt("totalImoCount", 0);
|
||||
|
||||
if (totalImoCount == 0) {
|
||||
log.info("[ShipDetailUpdateJob] Decider: EMPTY_RESPONSE - IMO 0건으로 LAST_EXECUTION 업데이트 스킵");
|
||||
return new FlowExecutionStatus("EMPTY_RESPONSE");
|
||||
}
|
||||
|
||||
log.info("[ShipDetailUpdateJob] Decider: NORMAL 모드 - LAST_EXECUTION 업데이트 진행");
|
||||
log.info("[ShipDetailUpdateJob] Decider: NORMAL - IMO {} 건 처리 시작", totalImoCount);
|
||||
return new FlowExecutionStatus("NORMAL");
|
||||
};
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// Step 1: Partitioned Step (병렬 처리)
|
||||
// ========================================
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public ShipDetailPartitioner shipDetailPartitioner(
|
||||
@Value("#{jobExecutionContext['allImoNumbers']}") String allImoNumbersStr
|
||||
) {
|
||||
List<String> allImoNumbers = (allImoNumbersStr != null && !allImoNumbersStr.isBlank())
|
||||
? Arrays.asList(allImoNumbersStr.split(","))
|
||||
: Collections.emptyList();
|
||||
return new ShipDetailPartitioner(allImoNumbers, partitionCount);
|
||||
}
|
||||
|
||||
@Bean(name = "ShipDetailUpdatePartitionedStep")
|
||||
public Step shipDetailUpdatePartitionedStep() {
|
||||
return new StepBuilder("ShipDetailUpdatePartitionedStep", jobRepository)
|
||||
.partitioner("ShipDetailUpdateStep", shipDetailPartitioner(null))
|
||||
.step(shipDetailUpdateWorkerStep())
|
||||
.taskExecutor(batchPartitionExecutor)
|
||||
.gridSize(partitionCount)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Step shipDetailUpdateWorkerStep() {
|
||||
return new StepBuilder("ShipDetailUpdateStep", jobRepository)
|
||||
.<ShipDetailDto, ShipDetailEntity>chunk(getChunkSize(), transactionManager)
|
||||
.reader(shipDetailUpdateDataReader(null, null, null, null))
|
||||
.processor(shipDetailDataProcessor(null))
|
||||
.writer(shipDetailDataWriter)
|
||||
.build();
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// Reader / Processor (StepScope)
|
||||
// ========================================
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public ShipDetailUpdateDataReader shipDetailUpdateDataReader(
|
||||
@Value("#{stepExecution.jobExecution.id}") Long jobExecutionId,
|
||||
@Value("#{stepExecution.id}") Long stepExecutionId,
|
||||
@Value("#{jobParameters['executionMode']}") String executionMode,
|
||||
@Value("#{jobParameters['sourceStepExecutionId']}") String sourceStepExecutionIdParam
|
||||
@Value("#{stepExecutionContext['partitionImoNumbers']}") String partitionImoNumbers
|
||||
) {
|
||||
ShipDetailUpdateDataReader reader = new ShipDetailUpdateDataReader(
|
||||
maritimeApiWebClient, jdbcTemplate, objectMapper,
|
||||
@ -163,36 +233,15 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
||||
shipDetailBatchSize, delayOnSuccessMs, delayOnFailureMs, maxRetryCount
|
||||
);
|
||||
reader.setExecutionIds(jobExecutionId, stepExecutionId);
|
||||
reader.setPartitionImoNumbers(partitionImoNumbers);
|
||||
|
||||
// RECOLLECT 모드: DB에서 실패 키를 직접 조회하여 주입
|
||||
if ("RECOLLECT".equals(executionMode) && sourceStepExecutionIdParam != null && !sourceStepExecutionIdParam.isBlank()) {
|
||||
Long sourceStepExecutionId = Long.parseLong(sourceStepExecutionIdParam);
|
||||
List<String> retryKeys = batchFailedRecordRepository.findFailedRecordKeysByStepExecutionId(
|
||||
"ShipDetailUpdateJob", sourceStepExecutionId);
|
||||
|
||||
if (!retryKeys.isEmpty()) {
|
||||
reader.setRetryRecordKeys(retryKeys);
|
||||
reader.setSourceStepExecutionId(sourceStepExecutionId);
|
||||
log.info("[ShipDetailUpdateJob] Retry 모드 활성화: DB에서 {} 건의 실패 키 조회, sourceStepExecutionId: {}",
|
||||
retryKeys.size(), sourceStepExecutionId);
|
||||
} else {
|
||||
log.warn("[ShipDetailUpdateJob] RECOLLECT 모드이나 DB에서 실패 키를 찾을 수 없음 (sourceStepExecutionId: {})",
|
||||
sourceStepExecutionId);
|
||||
}
|
||||
if ("RECOLLECT".equals(executionMode)) {
|
||||
reader.setRecollectMode(true);
|
||||
}
|
||||
|
||||
return reader;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ItemReader<ShipDetailDto> createReader() { // 타입 변경
|
||||
return shipDetailUpdateDataReader;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ItemProcessor<ShipDetailDto, ShipDetailEntity> createProcessor() {
|
||||
return shipDetailDataProcessor;
|
||||
}
|
||||
@Bean
|
||||
@StepScope
|
||||
public ShipDetailDataProcessor shipDetailDataProcessor(
|
||||
@ -201,29 +250,24 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
||||
return new ShipDetailDataProcessor(jobExecutionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ItemWriter<ShipDetailEntity> createWriter() { // 타입 변경
|
||||
return shipDetailDataWriter;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getChunkSize() {
|
||||
return 20;
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// Job / Step Bean 등록
|
||||
// ========================================
|
||||
|
||||
@Bean(name = "ShipDetailUpdateJob")
|
||||
public Job ShipDetailUpdateJob() {
|
||||
return job();
|
||||
}
|
||||
|
||||
@Bean(name = "ShipDetailUpdateStep")
|
||||
public Step ShipDetailUpdateStep() {
|
||||
return step();
|
||||
}
|
||||
// ========================================
|
||||
// Step 2: LastExecution 업데이트
|
||||
// ========================================
|
||||
|
||||
/**
|
||||
* 2단계: 모든 스텝 성공 시 배치 실행 로그(날짜) 업데이트
|
||||
*/
|
||||
@Bean
|
||||
public Tasklet shipDetailLastExecutionUpdateTasklet() {
|
||||
return (contribution, chunkContext) -> {
|
||||
@ -257,6 +301,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
||||
return RepeatStatus.FINISHED;
|
||||
};
|
||||
}
|
||||
|
||||
@Bean(name = "ShipDetailLastExecutionUpdateStep")
|
||||
public Step shipDetailLastExecutionUpdateStep() {
|
||||
return new StepBuilder("ShipDetailLastExecutionUpdateStep", jobRepository)
|
||||
|
||||
@ -4,9 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.snp.batch.common.batch.reader.BaseApiReader;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailApiResponse;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDto;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipResultDto;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipUpdateApiResponse;
|
||||
import com.snp.batch.service.BatchApiLogService;
|
||||
import com.snp.batch.service.BatchDateService;
|
||||
import com.snp.batch.service.BatchFailedRecordService;
|
||||
@ -41,10 +39,11 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
||||
// 실패 IMO 추적
|
||||
private final List<String> failedImoNumbers = new ArrayList<>();
|
||||
private String lastErrorMessage;
|
||||
private boolean afterFetchCompleted = false;
|
||||
|
||||
// Retry 모드
|
||||
private List<String> retryRecordKeys;
|
||||
private Long sourceStepExecutionId;
|
||||
// 파티션 모드
|
||||
private String partitionImoNumbers;
|
||||
private boolean recollectMode = false;
|
||||
|
||||
public ShipDetailUpdateDataReader(
|
||||
WebClient webClient,
|
||||
@ -73,16 +72,12 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
||||
enableChunkMode();
|
||||
}
|
||||
|
||||
public void setRetryRecordKeys(List<String> retryRecordKeys) {
|
||||
this.retryRecordKeys = retryRecordKeys;
|
||||
public void setPartitionImoNumbers(String partitionImoNumbers) {
|
||||
this.partitionImoNumbers = partitionImoNumbers;
|
||||
}
|
||||
|
||||
public void setSourceStepExecutionId(Long sourceStepExecutionId) {
|
||||
this.sourceStepExecutionId = sourceStepExecutionId;
|
||||
}
|
||||
|
||||
private boolean isRetryMode() {
|
||||
return retryRecordKeys != null && !retryRecordKeys.isEmpty();
|
||||
public void setRecollectMode(boolean recollectMode) {
|
||||
this.recollectMode = recollectMode;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -90,10 +85,6 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
||||
return "ShipDetailUpdateDataReader";
|
||||
}
|
||||
|
||||
protected String getShipUpdateApiPath() {
|
||||
return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipChangesByLastUpdateDateRange";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getApiPath() {
|
||||
return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipsByIHSLRorIMONumbersAll";
|
||||
@ -109,25 +100,22 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
||||
this.allImoNumbers = null;
|
||||
this.failedImoNumbers.clear();
|
||||
this.lastErrorMessage = null;
|
||||
// retryRecordKeys는 JobConfig에서 주입하므로 초기화하지 않음
|
||||
this.afterFetchCompleted = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void beforeFetch() {
|
||||
if (isRetryMode()) {
|
||||
log.info("[{}] [RETRY MODE] 실패 건 재수집 모드 시작 - 대상 IMO: {} 건",
|
||||
getReaderName(), retryRecordKeys.size());
|
||||
allImoNumbers = new ArrayList<>(retryRecordKeys);
|
||||
log.info("[{}] [RETRY MODE] IMO 목록: {}", getReaderName(), allImoNumbers);
|
||||
// 파티션에서 할당받은 IMO 목록을 파싱
|
||||
if (partitionImoNumbers != null && !partitionImoNumbers.isBlank()) {
|
||||
allImoNumbers = new ArrayList<>(Arrays.asList(partitionImoNumbers.split(",")));
|
||||
} else {
|
||||
log.info("[{}] 변경된 IMO 번호 조회 시작...", getReaderName());
|
||||
ShipUpdateApiResponse response = callShipUpdateApi();
|
||||
allImoNumbers = extractUpdateImoNumbers(response);
|
||||
log.info("[{}] 총 {} 개의 변경된 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
|
||||
allImoNumbers = Collections.emptyList();
|
||||
}
|
||||
|
||||
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
|
||||
|
||||
log.info("[{}] 파티션 IMO {} 건 할당 (recollectMode: {})",
|
||||
getReaderName(), allImoNumbers.size(), recollectMode);
|
||||
log.info("[{}] 설정: batch-size={}, delay-success={}ms, delay-failure={}ms, max-retry={}",
|
||||
getReaderName(), batchSize, delayOnSuccessMs, delayOnFailureMs, maxRetryCount);
|
||||
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
|
||||
@ -255,46 +243,31 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
||||
);
|
||||
}
|
||||
|
||||
private ShipUpdateApiResponse callShipUpdateApi() {
|
||||
Map<String, String> params = batchDateService.getDateRangeWithoutTimeParams(getApiKey());
|
||||
return executeSingleApiCall(
|
||||
maritimeApiUrl,
|
||||
getShipUpdateApiPath(),
|
||||
params,
|
||||
new ParameterizedTypeReference<ShipUpdateApiResponse>() {},
|
||||
batchApiLogService,
|
||||
res -> res.getShips() != null ? (long) res.getShips().size() : 0L
|
||||
);
|
||||
}
|
||||
|
||||
private List<String> extractUpdateImoNumbers(ShipUpdateApiResponse response) {
|
||||
if (response.getShips() == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return response.getShips().stream()
|
||||
.map(ShipDto::getImoNumber)
|
||||
.filter(imoNumber -> imoNumber != null)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void afterFetch(List<ShipDetailDto> data) {
|
||||
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
|
||||
int totalBatches = allImoNumbers != null
|
||||
? (int) Math.ceil((double) allImoNumbers.size() / batchSize) : 0;
|
||||
try {
|
||||
if (data == null) {
|
||||
if (data == null && !afterFetchCompleted) {
|
||||
afterFetchCompleted = true;
|
||||
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
|
||||
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
|
||||
getReaderName(), allImoNumbers.size());
|
||||
getReaderName(), allImoNumbers != null ? allImoNumbers.size() : 0);
|
||||
|
||||
if (isRetryMode()) {
|
||||
// Retry 모드: 성공 건 RESOLVED 처리 + 재실패 건 새 FAILED 레코드 저장
|
||||
log.info("[{}] [RETRY MODE] 재수집 결과 처리 시작 (sourceStepExecutionId: {})",
|
||||
getReaderName(), sourceStepExecutionId);
|
||||
batchFailedRecordService.resolveSuccessfulRetries(
|
||||
"ShipDetailUpdateJob", sourceStepExecutionId, retryRecordKeys, failedImoNumbers);
|
||||
if (recollectMode) {
|
||||
// RECOLLECT 모드: 성공 건 RESOLVED 처리 + 재실패 건 새 FAILED 레코드 저장
|
||||
Long sourceJobExecutionId = getSourceJobExecutionId();
|
||||
log.info("[{}] [RECOLLECT] 재수집 결과 처리 시작 (sourceJobExecutionId: {})",
|
||||
getReaderName(), sourceJobExecutionId);
|
||||
|
||||
if (sourceJobExecutionId != null) {
|
||||
batchFailedRecordService.resolveSuccessfulRetries(
|
||||
"ShipDetailUpdateJob", sourceJobExecutionId,
|
||||
allImoNumbers, failedImoNumbers);
|
||||
}
|
||||
|
||||
if (!failedImoNumbers.isEmpty()) {
|
||||
log.warn("[{}] [RETRY MODE] 재실패 IMO 건수: {} 건", getReaderName(), failedImoNumbers.size());
|
||||
log.warn("[{}] [RECOLLECT] 재실패 IMO 건수: {} 건", getReaderName(), failedImoNumbers.size());
|
||||
batchFailedRecordService.saveFailedRecords(
|
||||
"ShipDetailUpdateJob",
|
||||
getJobExecutionId(),
|
||||
@ -304,11 +277,11 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
||||
lastErrorMessage
|
||||
);
|
||||
} else {
|
||||
log.info("[{}] [RETRY MODE] 모든 재수집 건 정상 처리 완료", getReaderName());
|
||||
log.info("[{}] [RECOLLECT] 모든 재수집 건 정상 처리 완료", getReaderName());
|
||||
}
|
||||
|
||||
int successCount = retryRecordKeys.size() - failedImoNumbers.size();
|
||||
log.info("[{}] [RETRY MODE] 결과: 성공 {} 건, 재실패 {} 건",
|
||||
int successCount = (allImoNumbers != null ? allImoNumbers.size() : 0) - failedImoNumbers.size();
|
||||
log.info("[{}] [RECOLLECT] 결과: 성공 {} 건, 재실패 {} 건",
|
||||
getReaderName(), successCount, failedImoNumbers.size());
|
||||
} else {
|
||||
// 일반 모드: 동기 저장 (자동 재수집 트리거 전에 커밋 보장)
|
||||
@ -333,11 +306,11 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
||||
log.error("[{}] afterFetch 처리 중 예외 발생: {}", getReaderName(), e.getMessage(), e);
|
||||
} finally {
|
||||
// 일반 모드에서만: 실패 건이 있으면 ExecutionContext에 저장 (자동 재수집 트리거용)
|
||||
if (!isRetryMode() && !failedImoNumbers.isEmpty() && stepExecution != null) {
|
||||
if (!recollectMode && !failedImoNumbers.isEmpty() && stepExecution != null) {
|
||||
try {
|
||||
String failedKeys = String.join(",", failedImoNumbers);
|
||||
stepExecution.getExecutionContext().putString("failedRecordKeys", failedKeys);
|
||||
stepExecution.getExecutionContext().putLong("failedStepExecutionId", getStepExecutionId());
|
||||
stepExecution.getExecutionContext().putLong("failedJobExecutionId", getJobExecutionId());
|
||||
stepExecution.getExecutionContext().putString("failedApiKey", getApiKey());
|
||||
log.info("[{}] 자동 재수집 대상 실패 키 {} 건 ExecutionContext에 저장",
|
||||
getReaderName(), failedImoNumbers.size());
|
||||
@ -347,4 +320,18 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* JobParameter에서 sourceJobExecutionId를 조회 (RECOLLECT 모드용)
|
||||
*/
|
||||
private Long getSourceJobExecutionId() {
|
||||
if (stepExecution != null) {
|
||||
String param = stepExecution.getJobExecution()
|
||||
.getJobParameters().getString("sourceJobExecutionId");
|
||||
if (param != null && !param.isBlank()) {
|
||||
return Long.parseLong(param);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -141,19 +141,19 @@ public class BatchFailedRecordService {
|
||||
|
||||
/**
|
||||
* 재수집 성공 건을 RESOLVED로 처리합니다.
|
||||
* 원본 stepExecutionId로 범위를 제한하여 해당 Step의 실패 건만 RESOLVED 처리합니다.
|
||||
* 원본 jobExecutionId로 범위를 제한하여 해당 Job의 실패 건만 RESOLVED 처리합니다.
|
||||
*/
|
||||
@Transactional(propagation = Propagation.REQUIRES_NEW)
|
||||
public void resolveSuccessfulRetries(String jobName, Long sourceStepExecutionId,
|
||||
public void resolveSuccessfulRetries(String jobName, Long sourceJobExecutionId,
|
||||
List<String> allRetryKeys, List<String> failedAgainKeys) {
|
||||
List<String> successfulKeys = allRetryKeys.stream()
|
||||
.filter(key -> !failedAgainKeys.contains(key))
|
||||
.toList();
|
||||
if (!successfulKeys.isEmpty()) {
|
||||
int resolved = batchFailedRecordRepository.resolveByStepExecutionIdAndRecordKeys(
|
||||
jobName, sourceStepExecutionId, successfulKeys, LocalDateTime.now());
|
||||
log.info("실패 레코드 RESOLVED 처리: {} 건 (job: {}, sourceStepExecutionId: {})",
|
||||
resolved, jobName, sourceStepExecutionId);
|
||||
int resolved = batchFailedRecordRepository.resolveByJobExecutionIdAndRecordKeys(
|
||||
jobName, sourceJobExecutionId, successfulKeys, LocalDateTime.now());
|
||||
log.info("실패 레코드 RESOLVED 처리: {} 건 (job: {}, sourceJobExecutionId: {})",
|
||||
resolved, jobName, sourceJobExecutionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -110,10 +110,11 @@ app:
|
||||
|
||||
# ShipDetailUpdate 배치 설정 (dev - 기존과 동일하게 20건 유지)
|
||||
ship-detail-update:
|
||||
batch-size: 20 # dev에서는 문제 없으므로 기존 20건 유지
|
||||
batch-size: 10 # dev에서는 문제 없으므로 기존 20건 유지
|
||||
delay-on-success-ms: 300
|
||||
delay-on-failure-ms: 2000
|
||||
max-retry-count: 3
|
||||
partition-count: 4 # dev: 2개 파티션
|
||||
|
||||
# AIS Target 배치 설정
|
||||
ais-target:
|
||||
|
||||
@ -115,6 +115,7 @@ app:
|
||||
delay-on-success-ms: 300 # 성공 시 딜레이 (ms)
|
||||
delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms)
|
||||
max-retry-count: 3 # 최대 재시도 횟수
|
||||
partition-count: 4 # prod: 4개 파티션
|
||||
|
||||
# AIS Target 배치 설정
|
||||
ais-target:
|
||||
|
||||
@ -172,6 +172,7 @@ app:
|
||||
delay-on-success-ms: 300 # 성공 시 딜레이 (ms)
|
||||
delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms)
|
||||
max-retry-count: 3 # 최대 재시도 횟수
|
||||
partition-count: 4 # 병렬 파티션 수
|
||||
|
||||
# AIS Target Import 배치 설정 (캐시 업데이트 전용)
|
||||
ais-target:
|
||||
|
||||
불러오는 중...
Reference in New Issue
Block a user