feat(shipdetail): 선박제원정보 수집 배치 작업 병렬화 및 최적화 #70
@ -35,6 +35,7 @@
|
|||||||
- 스케줄 화면 검색/정렬/필터 기능 추가 및 UI 구조 개선 (#54)
|
- 스케줄 화면 검색/정렬/필터 기능 추가 및 UI 구조 개선 (#54)
|
||||||
- 재수집 이력 화면 개선: 배치 실행일시 추가, 작업명 잘림 해소, CSV 내보내기 제거 (#55)
|
- 재수집 이력 화면 개선: 배치 실행일시 추가, 작업명 잘림 해소, CSV 내보내기 제거 (#55)
|
||||||
- AIS API 응답 스트리밍 처리로 메모리 버퍼 제한 우회 (DataBufferLimitException 근본 해결)
|
- AIS API 응답 스트리밍 처리로 메모리 버퍼 제한 우회 (DataBufferLimitException 근본 해결)
|
||||||
|
- 선박제원정보 수집 배치 작업 병렬화 (Partitioned Step) (#64)
|
||||||
|
|
||||||
### 수정
|
### 수정
|
||||||
- 자동 재수집 JobParameter 오버플로우 수정 (VARCHAR 2500 제한 해결)
|
- 자동 재수집 JobParameter 오버플로우 수정 (VARCHAR 2500 제한 해결)
|
||||||
@ -53,8 +54,10 @@
|
|||||||
- CronPreview step=0 무한루프 방지
|
- CronPreview step=0 무한루프 방지
|
||||||
- AIS WebClient 버퍼 제한 50MB→100MB 확대 및 타임아웃 설정 추가 (DataBufferLimitException 해결)
|
- AIS WebClient 버퍼 제한 50MB→100MB 확대 및 타임아웃 설정 추가 (DataBufferLimitException 해결)
|
||||||
- ShipDetailUpdateDataReader beforeFetch에서 allImoNumbers 미할당으로 인한 NPE 수정
|
- ShipDetailUpdateDataReader beforeFetch에서 allImoNumbers 미할당으로 인한 NPE 수정
|
||||||
|
- 재수집 모드 afterFetch 중복 실행으로 인한 실패 레코드 중복 INSERT 수정 (#64)
|
||||||
|
|
||||||
### 변경
|
### 변경
|
||||||
|
- 재수집 실패건 추적 기준 sourceStepExecutionId → sourceJobExecutionId로 변경 (#64)
|
||||||
- 실패 레코드 Upsert 패턴 적용 (동일 키 중복 방지)
|
- 실패 레코드 Upsert 패턴 적용 (동일 키 중복 방지)
|
||||||
- 재시도 상태 배지 표시 (대기/재시도 N/3/재시도 초과)
|
- 재시도 상태 배지 표시 (대기/재시도 N/3/재시도 초과)
|
||||||
- 미사용 Dead Code 정리 (~1,200 LOC 삭제)
|
- 미사용 Dead Code 정리 (~1,200 LOC 삭제)
|
||||||
|
|||||||
@ -30,7 +30,7 @@ import java.util.Set;
|
|||||||
public class AutoRetryJobExecutionListener implements JobExecutionListener {
|
public class AutoRetryJobExecutionListener implements JobExecutionListener {
|
||||||
|
|
||||||
private static final String FAILED_RECORD_KEYS = "failedRecordKeys";
|
private static final String FAILED_RECORD_KEYS = "failedRecordKeys";
|
||||||
private static final String FAILED_STEP_EXECUTION_ID = "failedStepExecutionId";
|
private static final String FAILED_JOB_EXECUTION_ID = "failedJobExecutionId";
|
||||||
private static final String FAILED_API_KEY = "failedApiKey";
|
private static final String FAILED_API_KEY = "failedApiKey";
|
||||||
private static final int MAX_AUTO_RETRY_COUNT = 3;
|
private static final int MAX_AUTO_RETRY_COUNT = 3;
|
||||||
|
|
||||||
@ -61,7 +61,7 @@ public class AutoRetryJobExecutionListener implements JobExecutionListener {
|
|||||||
|
|
||||||
// 모든 Step의 failedRecordKeys를 Set으로 병합 (중복 제거)
|
// 모든 Step의 failedRecordKeys를 Set으로 병합 (중복 제거)
|
||||||
Set<String> mergedKeys = new LinkedHashSet<>();
|
Set<String> mergedKeys = new LinkedHashSet<>();
|
||||||
Long sourceStepExecutionId = null;
|
Long sourceJobExecutionId = jobExecution.getId();
|
||||||
String apiKey = null;
|
String apiKey = null;
|
||||||
|
|
||||||
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
|
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
|
||||||
@ -77,12 +77,6 @@ public class AutoRetryJobExecutionListener implements JobExecutionListener {
|
|||||||
.filter(key -> !key.isBlank())
|
.filter(key -> !key.isBlank())
|
||||||
.forEach(mergedKeys::add);
|
.forEach(mergedKeys::add);
|
||||||
|
|
||||||
// sourceStepExecutionId: 마지막 Step 것 사용
|
|
||||||
sourceStepExecutionId = stepExecution.getExecutionContext()
|
|
||||||
.containsKey(FAILED_STEP_EXECUTION_ID)
|
|
||||||
? stepExecution.getExecutionContext().getLong(FAILED_STEP_EXECUTION_ID)
|
|
||||||
: stepExecution.getId();
|
|
||||||
|
|
||||||
// apiKey: non-null인 것 중 첫 번째 사용
|
// apiKey: non-null인 것 중 첫 번째 사용
|
||||||
if (apiKey == null) {
|
if (apiKey == null) {
|
||||||
apiKey = stepExecution.getExecutionContext()
|
apiKey = stepExecution.getExecutionContext()
|
||||||
@ -112,8 +106,8 @@ public class AutoRetryJobExecutionListener implements JobExecutionListener {
|
|||||||
log.info("[AutoRetry] {} Job 완료 후 실패 건 {}건 감지 → 자동 재수집 트리거",
|
log.info("[AutoRetry] {} Job 완료 후 실패 건 {}건 감지 → 자동 재수집 트리거",
|
||||||
jobName, mergedKeys.size());
|
jobName, mergedKeys.size());
|
||||||
|
|
||||||
// sourceStepExecutionId 기반으로 1회만 triggerRetryAsync 호출 (실패 키는 DB에서 직접 조회)
|
// sourceJobExecutionId 기반으로 1회만 triggerRetryAsync 호출 (실패 키는 DB에서 직접 조회)
|
||||||
autoRetryTriggerService.triggerRetryAsync(
|
autoRetryTriggerService.triggerRetryAsync(
|
||||||
jobName, mergedKeys.size(), sourceStepExecutionId, apiKey);
|
jobName, mergedKeys.size(), sourceJobExecutionId, apiKey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -31,7 +31,7 @@ public class AutoRetryTriggerService {
|
|||||||
|
|
||||||
@Async("autoRetryExecutor")
|
@Async("autoRetryExecutor")
|
||||||
public void triggerRetryAsync(String jobName, int failedCount,
|
public void triggerRetryAsync(String jobName, int failedCount,
|
||||||
Long sourceStepExecutionId, String apiKey) {
|
Long sourceJobExecutionId, String apiKey) {
|
||||||
try {
|
try {
|
||||||
Job job = jobMap.get(jobName);
|
Job job = jobMap.get(jobName);
|
||||||
if (job == null) {
|
if (job == null) {
|
||||||
@ -41,7 +41,7 @@ public class AutoRetryTriggerService {
|
|||||||
|
|
||||||
JobParametersBuilder builder = new JobParametersBuilder()
|
JobParametersBuilder builder = new JobParametersBuilder()
|
||||||
.addLong("timestamp", System.currentTimeMillis())
|
.addLong("timestamp", System.currentTimeMillis())
|
||||||
.addString("sourceStepExecutionId", String.valueOf(sourceStepExecutionId))
|
.addString("sourceJobExecutionId", String.valueOf(sourceJobExecutionId))
|
||||||
.addString("executionMode", "RECOLLECT")
|
.addString("executionMode", "RECOLLECT")
|
||||||
.addString("reason", "자동 재수집 (실패 건 자동 처리)")
|
.addString("reason", "자동 재수집 (실패 건 자동 처리)")
|
||||||
.addString("executor", "AUTO_RETRY");
|
.addString("executor", "AUTO_RETRY");
|
||||||
@ -52,8 +52,8 @@ public class AutoRetryTriggerService {
|
|||||||
|
|
||||||
JobParameters retryParams = builder.toJobParameters();
|
JobParameters retryParams = builder.toJobParameters();
|
||||||
|
|
||||||
log.info("[AutoRetry] 재수집 Job 실행 시작: jobName={}, 실패건={}, sourceStepExecutionId={}",
|
log.info("[AutoRetry] 재수집 Job 실행 시작: jobName={}, 실패건={}, sourceJobExecutionId={}",
|
||||||
jobName, failedCount, sourceStepExecutionId);
|
jobName, failedCount, sourceJobExecutionId);
|
||||||
|
|
||||||
JobExecution retryExecution = jobLauncher.run(job, retryParams);
|
JobExecution retryExecution = jobLauncher.run(job, retryParams);
|
||||||
|
|
||||||
|
|||||||
@ -5,6 +5,8 @@ import org.springframework.context.annotation.Configuration;
|
|||||||
import org.springframework.scheduling.annotation.EnableAsync;
|
import org.springframework.scheduling.annotation.EnableAsync;
|
||||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
|
|
||||||
|
import org.springframework.core.task.TaskExecutor;
|
||||||
|
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@ -36,4 +38,19 @@ public class AsyncConfig {
|
|||||||
executor.initialize();
|
executor.initialize();
|
||||||
return executor;
|
return executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 배치 파티션 병렬 실행 전용 Executor.
|
||||||
|
* ShipDetailUpdate 파티셔닝 등 배치 Step 병렬 처리에 사용.
|
||||||
|
*/
|
||||||
|
@Bean(name = "batchPartitionExecutor")
|
||||||
|
public TaskExecutor batchPartitionExecutor() {
|
||||||
|
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||||
|
executor.setCorePoolSize(4); // 기본 파티션 수
|
||||||
|
executor.setMaxPoolSize(8); // 최대 파티션 수
|
||||||
|
executor.setQueueCapacity(20); // 대기 큐
|
||||||
|
executor.setThreadNamePrefix("BatchPartition-");
|
||||||
|
executor.initialize();
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@ -59,6 +59,19 @@ public interface BatchFailedRecordRepository extends JpaRepository<BatchFailedRe
|
|||||||
@Param("recordKeys") List<String> recordKeys,
|
@Param("recordKeys") List<String> recordKeys,
|
||||||
@Param("resolvedAt") LocalDateTime resolvedAt);
|
@Param("resolvedAt") LocalDateTime resolvedAt);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 특정 Job 실행의 실패 레코드를 RESOLVED로 벌크 업데이트
|
||||||
|
*/
|
||||||
|
@Modifying
|
||||||
|
@Query("UPDATE BatchFailedRecord r SET r.status = 'RESOLVED', r.resolvedAt = :resolvedAt " +
|
||||||
|
"WHERE r.jobName = :jobName AND r.jobExecutionId = :jobExecutionId " +
|
||||||
|
"AND r.recordKey IN :recordKeys AND r.status = 'FAILED'")
|
||||||
|
int resolveByJobExecutionIdAndRecordKeys(
|
||||||
|
@Param("jobName") String jobName,
|
||||||
|
@Param("jobExecutionId") Long jobExecutionId,
|
||||||
|
@Param("recordKeys") List<String> recordKeys,
|
||||||
|
@Param("resolvedAt") LocalDateTime resolvedAt);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ID 목록으로 FAILED 상태 레코드를 일괄 RESOLVED 처리
|
* ID 목록으로 FAILED 상태 레코드를 일괄 RESOLVED 처리
|
||||||
*/
|
*/
|
||||||
@ -89,6 +102,17 @@ public interface BatchFailedRecordRepository extends JpaRepository<BatchFailedRe
|
|||||||
@Param("jobName") String jobName,
|
@Param("jobName") String jobName,
|
||||||
@Param("stepExecutionId") Long stepExecutionId);
|
@Param("stepExecutionId") Long stepExecutionId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 특정 Job 실행의 미해결(FAILED) 실패 레코드 키 목록 조회.
|
||||||
|
* 자동 재수집 시 JobParameter 대신 DB에서 직접 조회하여 VARCHAR(2500) 제한을 우회.
|
||||||
|
*/
|
||||||
|
@Query("SELECT r.recordKey FROM BatchFailedRecord r " +
|
||||||
|
"WHERE r.jobName = :jobName AND r.jobExecutionId = :jobExecutionId " +
|
||||||
|
"AND r.status = 'FAILED'")
|
||||||
|
List<String> findFailedRecordKeysByJobExecutionId(
|
||||||
|
@Param("jobName") String jobName,
|
||||||
|
@Param("jobExecutionId") Long jobExecutionId);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 동일 Job에서 이미 FAILED 상태인 recordKey 목록 조회 (중복 방지용)
|
* 동일 Job에서 이미 FAILED 상태인 recordKey 목록 조회 (중복 방지용)
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -0,0 +1,172 @@
|
|||||||
|
package com.snp.batch.jobs.shipdetail.batch.config;
|
||||||
|
|
||||||
|
import com.snp.batch.global.model.BatchApiLog;
|
||||||
|
import com.snp.batch.global.repository.BatchFailedRecordRepository;
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDto;
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.dto.ShipUpdateApiResponse;
|
||||||
|
import com.snp.batch.service.BatchApiLogService;
|
||||||
|
import com.snp.batch.service.BatchDateService;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.batch.core.JobExecution;
|
||||||
|
import org.springframework.batch.core.StepContribution;
|
||||||
|
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||||
|
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||||
|
import org.springframework.batch.repeat.RepeatStatus;
|
||||||
|
import org.springframework.core.ParameterizedTypeReference;
|
||||||
|
import org.springframework.util.LinkedMultiValueMap;
|
||||||
|
import org.springframework.util.MultiValueMap;
|
||||||
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
|
import org.springframework.web.util.UriComponentsBuilder;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 선박제원정보 변경 IMO 목록 조회 Tasklet.
|
||||||
|
* NORMAL 모드: Maritime API에서 변경된 IMO 번호 조회
|
||||||
|
* RECOLLECT 모드: DB에서 실패 IMO 번호 조회
|
||||||
|
* 조회 결과를 JobExecutionContext에 저장하여 Partitioner가 사용할 수 있게 함.
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class ShipDetailImoFetchTasklet implements Tasklet {
|
||||||
|
|
||||||
|
private static final String API_KEY = "SHIP_DETAIL_UPDATE_API";
|
||||||
|
private static final String SHIP_UPDATE_API_PATH = "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipChangesByLastUpdateDateRange";
|
||||||
|
private static final String JOB_NAME = "ShipDetailUpdateJob";
|
||||||
|
|
||||||
|
private final WebClient maritimeApiWebClient;
|
||||||
|
private final BatchDateService batchDateService;
|
||||||
|
private final BatchApiLogService batchApiLogService;
|
||||||
|
private final BatchFailedRecordRepository batchFailedRecordRepository;
|
||||||
|
private final String maritimeApiUrl;
|
||||||
|
|
||||||
|
public ShipDetailImoFetchTasklet(
|
||||||
|
WebClient maritimeApiWebClient,
|
||||||
|
BatchDateService batchDateService,
|
||||||
|
BatchApiLogService batchApiLogService,
|
||||||
|
BatchFailedRecordRepository batchFailedRecordRepository,
|
||||||
|
String maritimeApiUrl) {
|
||||||
|
this.maritimeApiWebClient = maritimeApiWebClient;
|
||||||
|
this.batchDateService = batchDateService;
|
||||||
|
this.batchApiLogService = batchApiLogService;
|
||||||
|
this.batchFailedRecordRepository = batchFailedRecordRepository;
|
||||||
|
this.maritimeApiUrl = maritimeApiUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
|
||||||
|
JobExecution jobExecution = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution();
|
||||||
|
String executionMode = jobExecution.getJobParameters()
|
||||||
|
.getString("executionMode", "NORMAL");
|
||||||
|
|
||||||
|
List<String> imoNumbers;
|
||||||
|
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
imoNumbers = fetchRecollectImoNumbers(jobExecution);
|
||||||
|
} else {
|
||||||
|
imoNumbers = fetchChangedImoNumbers(jobExecution);
|
||||||
|
}
|
||||||
|
|
||||||
|
// JobExecutionContext에 저장
|
||||||
|
jobExecution.getExecutionContext().putInt("totalImoCount", imoNumbers.size());
|
||||||
|
|
||||||
|
if (!imoNumbers.isEmpty()) {
|
||||||
|
jobExecution.getExecutionContext().putString("allImoNumbers", String.join(",", imoNumbers));
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("[ShipDetailImoFetchTasklet] {} 모드: IMO {} 건 조회 완료",
|
||||||
|
executionMode, imoNumbers.size());
|
||||||
|
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<String> fetchRecollectImoNumbers(JobExecution jobExecution) {
|
||||||
|
String sourceJobExecutionIdParam = jobExecution.getJobParameters()
|
||||||
|
.getString("sourceJobExecutionId");
|
||||||
|
|
||||||
|
if (sourceJobExecutionIdParam == null || sourceJobExecutionIdParam.isBlank()) {
|
||||||
|
log.warn("[ShipDetailImoFetchTasklet] RECOLLECT 모드이나 sourceJobExecutionId가 없음");
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
Long sourceJobExecutionId = Long.parseLong(sourceJobExecutionIdParam);
|
||||||
|
List<String> retryKeys = batchFailedRecordRepository.findFailedRecordKeysByJobExecutionId(
|
||||||
|
JOB_NAME, sourceJobExecutionId);
|
||||||
|
|
||||||
|
log.info("[ShipDetailImoFetchTasklet] RECOLLECT: DB에서 {} 건의 실패 키 조회 (sourceJobExecutionId: {})",
|
||||||
|
retryKeys.size(), sourceJobExecutionId);
|
||||||
|
|
||||||
|
return retryKeys;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<String> fetchChangedImoNumbers(JobExecution jobExecution) {
|
||||||
|
Map<String, String> params = batchDateService.getDateRangeWithoutTimeParams(API_KEY);
|
||||||
|
|
||||||
|
MultiValueMap<String, String> multiValueParams = new LinkedMultiValueMap<>();
|
||||||
|
params.forEach((key, value) ->
|
||||||
|
multiValueParams.put(key, Collections.singletonList(value)));
|
||||||
|
|
||||||
|
String fullUri = UriComponentsBuilder.fromHttpUrl(maritimeApiUrl)
|
||||||
|
.path(SHIP_UPDATE_API_PATH)
|
||||||
|
.queryParams(multiValueParams)
|
||||||
|
.build()
|
||||||
|
.toUriString();
|
||||||
|
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
int statusCode = 200;
|
||||||
|
String errorMessage = null;
|
||||||
|
Long responseSize = 0L;
|
||||||
|
|
||||||
|
try {
|
||||||
|
log.info("[ShipDetailImoFetchTasklet] 변경 IMO 조회 API 호출: {}", fullUri);
|
||||||
|
|
||||||
|
ShipUpdateApiResponse response = maritimeApiWebClient.get()
|
||||||
|
.uri(uriBuilder -> {
|
||||||
|
uriBuilder.path(SHIP_UPDATE_API_PATH);
|
||||||
|
params.forEach(uriBuilder::queryParam);
|
||||||
|
return uriBuilder.build();
|
||||||
|
})
|
||||||
|
.retrieve()
|
||||||
|
.bodyToMono(new ParameterizedTypeReference<ShipUpdateApiResponse>() {})
|
||||||
|
.block();
|
||||||
|
|
||||||
|
if (response == null || response.getShips() == null) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
List<String> imoNumbers = response.getShips().stream()
|
||||||
|
.map(ShipDto::getImoNumber)
|
||||||
|
.filter(Objects::nonNull)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
responseSize = (long) imoNumbers.size();
|
||||||
|
return imoNumbers;
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
statusCode = 500;
|
||||||
|
errorMessage = e.getMessage();
|
||||||
|
log.error("[ShipDetailImoFetchTasklet] API 호출 실패: {}", e.getMessage(), e);
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
long duration = System.currentTimeMillis() - startTime;
|
||||||
|
batchApiLogService.saveLog(BatchApiLog.builder()
|
||||||
|
.apiRequestLocation("ShipDetailImoFetchTasklet")
|
||||||
|
.jobExecutionId(jobExecution.getId())
|
||||||
|
.stepExecutionId(jobExecution.getStepExecutions().stream()
|
||||||
|
.findFirst().map(se -> se.getId()).orElse(null))
|
||||||
|
.requestUri(fullUri)
|
||||||
|
.httpMethod("GET")
|
||||||
|
.statusCode(statusCode)
|
||||||
|
.responseTimeMs(duration)
|
||||||
|
.responseCount(responseSize)
|
||||||
|
.errorMessage(errorMessage)
|
||||||
|
.createdAt(LocalDateTime.now())
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,61 @@
|
|||||||
|
package com.snp.batch.jobs.shipdetail.batch.config;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.batch.core.partition.support.Partitioner;
|
||||||
|
import org.springframework.batch.item.ExecutionContext;
|
||||||
|
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 선박제원정보 IMO 목록을 N개 파티션으로 분할하는 Partitioner.
|
||||||
|
* JobExecutionContext의 allImoNumbers를 읽어 파티션별 ExecutionContext에 할당.
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class ShipDetailPartitioner implements Partitioner {
|
||||||
|
|
||||||
|
private final List<String> allImoNumbers;
|
||||||
|
private final int partitionCount;
|
||||||
|
|
||||||
|
public ShipDetailPartitioner(List<String> allImoNumbers, int partitionCount) {
|
||||||
|
this.allImoNumbers = allImoNumbers;
|
||||||
|
this.partitionCount = partitionCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, ExecutionContext> partition(int gridSize) {
|
||||||
|
int totalSize = allImoNumbers.size();
|
||||||
|
// 실제 파티션 수: IMO 수가 적으면 파티션 수를 줄임
|
||||||
|
int actualPartitionCount = Math.min(partitionCount, Math.max(1, totalSize));
|
||||||
|
|
||||||
|
Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
|
||||||
|
int partitionSize = (int) Math.ceil((double) totalSize / actualPartitionCount);
|
||||||
|
|
||||||
|
for (int i = 0; i < actualPartitionCount; i++) {
|
||||||
|
int fromIndex = i * partitionSize;
|
||||||
|
int toIndex = Math.min(fromIndex + partitionSize, totalSize);
|
||||||
|
|
||||||
|
if (fromIndex >= totalSize) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<String> partitionImos = allImoNumbers.subList(fromIndex, toIndex);
|
||||||
|
ExecutionContext context = new ExecutionContext();
|
||||||
|
context.putString("partitionImoNumbers", String.join(",", partitionImos));
|
||||||
|
context.putInt("partitionIndex", i);
|
||||||
|
context.putInt("partitionSize", partitionImos.size());
|
||||||
|
|
||||||
|
String partitionKey = "partition" + i;
|
||||||
|
partitions.put(partitionKey, context);
|
||||||
|
|
||||||
|
log.info("[ShipDetailPartitioner] {} : IMO {} 건 (index {}-{})",
|
||||||
|
partitionKey, partitionImos.size(), fromIndex, toIndex - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("[ShipDetailPartitioner] 총 {} 개 파티션 생성 (전체 IMO: {} 건)",
|
||||||
|
partitions.size(), totalSize);
|
||||||
|
|
||||||
|
return partitions;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -2,18 +2,19 @@ package com.snp.batch.jobs.shipdetail.batch.config;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
|
import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
|
||||||
import org.springframework.batch.core.JobExecutionListener;
|
import com.snp.batch.global.repository.BatchFailedRecordRepository;
|
||||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto;
|
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto;
|
||||||
import com.snp.batch.jobs.shipdetail.batch.entity.ShipDetailEntity;
|
import com.snp.batch.jobs.shipdetail.batch.entity.ShipDetailEntity;
|
||||||
import com.snp.batch.jobs.shipdetail.batch.processor.ShipDetailDataProcessor;
|
import com.snp.batch.jobs.shipdetail.batch.processor.ShipDetailDataProcessor;
|
||||||
import com.snp.batch.jobs.shipdetail.batch.reader.ShipDetailUpdateDataReader;
|
import com.snp.batch.jobs.shipdetail.batch.reader.ShipDetailUpdateDataReader;
|
||||||
import com.snp.batch.jobs.shipdetail.batch.writer.ShipDetailDataWriter;
|
import com.snp.batch.jobs.shipdetail.batch.writer.ShipDetailDataWriter;
|
||||||
import com.snp.batch.global.repository.BatchFailedRecordRepository;
|
|
||||||
import com.snp.batch.service.BatchApiLogService;
|
import com.snp.batch.service.BatchApiLogService;
|
||||||
import com.snp.batch.service.BatchDateService;
|
import com.snp.batch.service.BatchDateService;
|
||||||
import com.snp.batch.service.BatchFailedRecordService;
|
import com.snp.batch.service.BatchFailedRecordService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.batch.core.Job;
|
import org.springframework.batch.core.Job;
|
||||||
|
import org.springframework.batch.core.JobExecution;
|
||||||
|
import org.springframework.batch.core.JobExecutionListener;
|
||||||
import org.springframework.batch.core.Step;
|
import org.springframework.batch.core.Step;
|
||||||
import org.springframework.batch.core.configuration.annotation.StepScope;
|
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||||
import org.springframework.batch.core.job.builder.JobBuilder;
|
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||||
@ -22,20 +23,20 @@ import org.springframework.batch.core.job.flow.JobExecutionDecider;
|
|||||||
import org.springframework.batch.core.repository.JobRepository;
|
import org.springframework.batch.core.repository.JobRepository;
|
||||||
import org.springframework.batch.core.step.builder.StepBuilder;
|
import org.springframework.batch.core.step.builder.StepBuilder;
|
||||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||||
import org.springframework.batch.item.ItemProcessor;
|
|
||||||
import org.springframework.batch.item.ItemReader;
|
|
||||||
import org.springframework.batch.item.ItemWriter;
|
|
||||||
import org.springframework.batch.repeat.RepeatStatus;
|
import org.springframework.batch.repeat.RepeatStatus;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.core.task.TaskExecutor;
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
import org.springframework.transaction.PlatformTransactionManager;
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
import org.springframework.web.reactive.function.client.WebClient;
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
|
|
||||||
import java.sql.Timestamp;
|
import java.sql.Timestamp;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -53,6 +54,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
|||||||
private final BatchFailedRecordService batchFailedRecordService;
|
private final BatchFailedRecordService batchFailedRecordService;
|
||||||
private final BatchFailedRecordRepository batchFailedRecordRepository;
|
private final BatchFailedRecordRepository batchFailedRecordRepository;
|
||||||
private final JobExecutionListener autoRetryJobExecutionListener;
|
private final JobExecutionListener autoRetryJobExecutionListener;
|
||||||
|
private final TaskExecutor batchPartitionExecutor;
|
||||||
|
|
||||||
@Value("${app.batch.ship-api.url}")
|
@Value("${app.batch.ship-api.url}")
|
||||||
private String maritimeApiUrl;
|
private String maritimeApiUrl;
|
||||||
@ -75,8 +77,10 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
|||||||
@Value("${app.batch.last-execution-buffer-hours:24}")
|
@Value("${app.batch.last-execution-buffer-hours:24}")
|
||||||
private int lastExecutionBufferHours;
|
private int lastExecutionBufferHours;
|
||||||
|
|
||||||
protected String getApiKey() {return "SHIP_DETAIL_UPDATE_API";}
|
@Value("${app.batch.ship-detail-update.partition-count:4}")
|
||||||
|
private int partitionCount;
|
||||||
|
|
||||||
|
protected String getApiKey() {return "SHIP_DETAIL_UPDATE_API";}
|
||||||
|
|
||||||
public ShipDetailUpdateJobConfig(
|
public ShipDetailUpdateJobConfig(
|
||||||
JobRepository jobRepository,
|
JobRepository jobRepository,
|
||||||
@ -91,7 +95,8 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
|||||||
BatchApiLogService batchApiLogService,
|
BatchApiLogService batchApiLogService,
|
||||||
BatchFailedRecordService batchFailedRecordService,
|
BatchFailedRecordService batchFailedRecordService,
|
||||||
BatchFailedRecordRepository batchFailedRecordRepository,
|
BatchFailedRecordRepository batchFailedRecordRepository,
|
||||||
@Qualifier("autoRetryJobExecutionListener") JobExecutionListener autoRetryJobExecutionListener) {
|
@Qualifier("autoRetryJobExecutionListener") JobExecutionListener autoRetryJobExecutionListener,
|
||||||
|
@Qualifier("batchPartitionExecutor") TaskExecutor batchPartitionExecutor) {
|
||||||
super(jobRepository, transactionManager);
|
super(jobRepository, transactionManager);
|
||||||
this.shipDetailDataProcessor = shipDetailDataProcessor;
|
this.shipDetailDataProcessor = shipDetailDataProcessor;
|
||||||
this.shipDetailDataWriter = shipDetailDataWriter;
|
this.shipDetailDataWriter = shipDetailDataWriter;
|
||||||
@ -104,6 +109,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
|||||||
this.batchFailedRecordService = batchFailedRecordService;
|
this.batchFailedRecordService = batchFailedRecordService;
|
||||||
this.batchFailedRecordRepository = batchFailedRecordRepository;
|
this.batchFailedRecordRepository = batchFailedRecordRepository;
|
||||||
this.autoRetryJobExecutionListener = autoRetryJobExecutionListener;
|
this.autoRetryJobExecutionListener = autoRetryJobExecutionListener;
|
||||||
|
this.batchPartitionExecutor = batchPartitionExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -121,41 +127,105 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
|||||||
return "ShipDetailUpdateStep";
|
return "ShipDetailUpdateStep";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ========================================
|
||||||
|
// Job Flow 정의
|
||||||
|
// ========================================
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Job createJobFlow(JobBuilder jobBuilder) {
|
protected Job createJobFlow(JobBuilder jobBuilder) {
|
||||||
return jobBuilder
|
return jobBuilder
|
||||||
.start(ShipDetailUpdateStep())
|
.start(shipDetailImoFetchStep())
|
||||||
.next(emptyResponseDecider())
|
.next(imoCountDecider())
|
||||||
.on("EMPTY_RESPONSE").end()
|
.on("EMPTY_RESPONSE").end()
|
||||||
.from(emptyResponseDecider()).on("NORMAL").to(shipDetailLastExecutionUpdateStep())
|
.from(imoCountDecider()).on("NORMAL").to(shipDetailUpdatePartitionedStep())
|
||||||
|
.next(shipDetailLastExecutionUpdateStep())
|
||||||
.end()
|
.end()
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
// ========================================
|
||||||
* 응답 데이터 건수 판별 Decider
|
// Step 0: IMO 목록 조회
|
||||||
* 응답 0건이면 EMPTY_RESPONSE, 아니면 NORMAL 반환
|
// ========================================
|
||||||
*/
|
|
||||||
@Bean
|
@Bean
|
||||||
public JobExecutionDecider emptyResponseDecider() {
|
public Tasklet shipDetailImoFetchTasklet() {
|
||||||
|
return new ShipDetailImoFetchTasklet(
|
||||||
|
maritimeApiWebClient, batchDateService, batchApiLogService,
|
||||||
|
batchFailedRecordRepository, maritimeApiUrl);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(name = "ShipDetailImoFetchStep")
|
||||||
|
public Step shipDetailImoFetchStep() {
|
||||||
|
return new StepBuilder("ShipDetailImoFetchStep", jobRepository)
|
||||||
|
.tasklet(shipDetailImoFetchTasklet(), transactionManager)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ========================================
|
||||||
|
// Decider: IMO 건수 확인
|
||||||
|
// ========================================
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public JobExecutionDecider imoCountDecider() {
|
||||||
return (jobExecution, stepExecution) -> {
|
return (jobExecution, stepExecution) -> {
|
||||||
if (stepExecution != null && stepExecution.getReadCount() == 0) {
|
int totalImoCount = jobExecution.getExecutionContext().getInt("totalImoCount", 0);
|
||||||
log.info("[ShipDetailUpdateJob] Decider: EMPTY_RESPONSE - 응답 데이터 0건으로 LAST_EXECUTION 업데이트 스킵 (다음 실행 시 동일 범위 재조회)");
|
|
||||||
|
if (totalImoCount == 0) {
|
||||||
|
log.info("[ShipDetailUpdateJob] Decider: EMPTY_RESPONSE - IMO 0건으로 LAST_EXECUTION 업데이트 스킵");
|
||||||
return new FlowExecutionStatus("EMPTY_RESPONSE");
|
return new FlowExecutionStatus("EMPTY_RESPONSE");
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("[ShipDetailUpdateJob] Decider: NORMAL 모드 - LAST_EXECUTION 업데이트 진행");
|
log.info("[ShipDetailUpdateJob] Decider: NORMAL - IMO {} 건 처리 시작", totalImoCount);
|
||||||
return new FlowExecutionStatus("NORMAL");
|
return new FlowExecutionStatus("NORMAL");
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ========================================
|
||||||
|
// Step 1: Partitioned Step (병렬 처리)
|
||||||
|
// ========================================
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@StepScope
|
||||||
|
public ShipDetailPartitioner shipDetailPartitioner(
|
||||||
|
@Value("#{jobExecutionContext['allImoNumbers']}") String allImoNumbersStr
|
||||||
|
) {
|
||||||
|
List<String> allImoNumbers = (allImoNumbersStr != null && !allImoNumbersStr.isBlank())
|
||||||
|
? Arrays.asList(allImoNumbersStr.split(","))
|
||||||
|
: Collections.emptyList();
|
||||||
|
return new ShipDetailPartitioner(allImoNumbers, partitionCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(name = "ShipDetailUpdatePartitionedStep")
|
||||||
|
public Step shipDetailUpdatePartitionedStep() {
|
||||||
|
return new StepBuilder("ShipDetailUpdatePartitionedStep", jobRepository)
|
||||||
|
.partitioner("ShipDetailUpdateStep", shipDetailPartitioner(null))
|
||||||
|
.step(shipDetailUpdateWorkerStep())
|
||||||
|
.taskExecutor(batchPartitionExecutor)
|
||||||
|
.gridSize(partitionCount)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Step shipDetailUpdateWorkerStep() {
|
||||||
|
return new StepBuilder("ShipDetailUpdateStep", jobRepository)
|
||||||
|
.<ShipDetailDto, ShipDetailEntity>chunk(getChunkSize(), transactionManager)
|
||||||
|
.reader(shipDetailUpdateDataReader(null, null, null, null))
|
||||||
|
.processor(shipDetailDataProcessor(null))
|
||||||
|
.writer(shipDetailDataWriter)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ========================================
|
||||||
|
// Reader / Processor (StepScope)
|
||||||
|
// ========================================
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@StepScope
|
@StepScope
|
||||||
public ShipDetailUpdateDataReader shipDetailUpdateDataReader(
|
public ShipDetailUpdateDataReader shipDetailUpdateDataReader(
|
||||||
@Value("#{stepExecution.jobExecution.id}") Long jobExecutionId,
|
@Value("#{stepExecution.jobExecution.id}") Long jobExecutionId,
|
||||||
@Value("#{stepExecution.id}") Long stepExecutionId,
|
@Value("#{stepExecution.id}") Long stepExecutionId,
|
||||||
@Value("#{jobParameters['executionMode']}") String executionMode,
|
@Value("#{jobParameters['executionMode']}") String executionMode,
|
||||||
@Value("#{jobParameters['sourceStepExecutionId']}") String sourceStepExecutionIdParam
|
@Value("#{stepExecutionContext['partitionImoNumbers']}") String partitionImoNumbers
|
||||||
) {
|
) {
|
||||||
ShipDetailUpdateDataReader reader = new ShipDetailUpdateDataReader(
|
ShipDetailUpdateDataReader reader = new ShipDetailUpdateDataReader(
|
||||||
maritimeApiWebClient, jdbcTemplate, objectMapper,
|
maritimeApiWebClient, jdbcTemplate, objectMapper,
|
||||||
@ -163,36 +233,15 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
|||||||
shipDetailBatchSize, delayOnSuccessMs, delayOnFailureMs, maxRetryCount
|
shipDetailBatchSize, delayOnSuccessMs, delayOnFailureMs, maxRetryCount
|
||||||
);
|
);
|
||||||
reader.setExecutionIds(jobExecutionId, stepExecutionId);
|
reader.setExecutionIds(jobExecutionId, stepExecutionId);
|
||||||
|
reader.setPartitionImoNumbers(partitionImoNumbers);
|
||||||
|
|
||||||
// RECOLLECT 모드: DB에서 실패 키를 직접 조회하여 주입
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
if ("RECOLLECT".equals(executionMode) && sourceStepExecutionIdParam != null && !sourceStepExecutionIdParam.isBlank()) {
|
reader.setRecollectMode(true);
|
||||||
Long sourceStepExecutionId = Long.parseLong(sourceStepExecutionIdParam);
|
|
||||||
List<String> retryKeys = batchFailedRecordRepository.findFailedRecordKeysByStepExecutionId(
|
|
||||||
"ShipDetailUpdateJob", sourceStepExecutionId);
|
|
||||||
|
|
||||||
if (!retryKeys.isEmpty()) {
|
|
||||||
reader.setRetryRecordKeys(retryKeys);
|
|
||||||
reader.setSourceStepExecutionId(sourceStepExecutionId);
|
|
||||||
log.info("[ShipDetailUpdateJob] Retry 모드 활성화: DB에서 {} 건의 실패 키 조회, sourceStepExecutionId: {}",
|
|
||||||
retryKeys.size(), sourceStepExecutionId);
|
|
||||||
} else {
|
|
||||||
log.warn("[ShipDetailUpdateJob] RECOLLECT 모드이나 DB에서 실패 키를 찾을 수 없음 (sourceStepExecutionId: {})",
|
|
||||||
sourceStepExecutionId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return reader;
|
return reader;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ItemReader<ShipDetailDto> createReader() { // 타입 변경
|
|
||||||
return shipDetailUpdateDataReader;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ItemProcessor<ShipDetailDto, ShipDetailEntity> createProcessor() {
|
|
||||||
return shipDetailDataProcessor;
|
|
||||||
}
|
|
||||||
@Bean
|
@Bean
|
||||||
@StepScope
|
@StepScope
|
||||||
public ShipDetailDataProcessor shipDetailDataProcessor(
|
public ShipDetailDataProcessor shipDetailDataProcessor(
|
||||||
@ -201,29 +250,24 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
|||||||
return new ShipDetailDataProcessor(jobExecutionId);
|
return new ShipDetailDataProcessor(jobExecutionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ItemWriter<ShipDetailEntity> createWriter() { // 타입 변경
|
|
||||||
return shipDetailDataWriter;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected int getChunkSize() {
|
protected int getChunkSize() {
|
||||||
return 20;
|
return 20;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ========================================
|
||||||
|
// Job / Step Bean 등록
|
||||||
|
// ========================================
|
||||||
|
|
||||||
@Bean(name = "ShipDetailUpdateJob")
|
@Bean(name = "ShipDetailUpdateJob")
|
||||||
public Job ShipDetailUpdateJob() {
|
public Job ShipDetailUpdateJob() {
|
||||||
return job();
|
return job();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean(name = "ShipDetailUpdateStep")
|
// ========================================
|
||||||
public Step ShipDetailUpdateStep() {
|
// Step 2: LastExecution 업데이트
|
||||||
return step();
|
// ========================================
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 2단계: 모든 스텝 성공 시 배치 실행 로그(날짜) 업데이트
|
|
||||||
*/
|
|
||||||
@Bean
|
@Bean
|
||||||
public Tasklet shipDetailLastExecutionUpdateTasklet() {
|
public Tasklet shipDetailLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
@ -257,6 +301,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
|||||||
return RepeatStatus.FINISHED;
|
return RepeatStatus.FINISHED;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean(name = "ShipDetailLastExecutionUpdateStep")
|
@Bean(name = "ShipDetailLastExecutionUpdateStep")
|
||||||
public Step shipDetailLastExecutionUpdateStep() {
|
public Step shipDetailLastExecutionUpdateStep() {
|
||||||
return new StepBuilder("ShipDetailLastExecutionUpdateStep", jobRepository)
|
return new StepBuilder("ShipDetailLastExecutionUpdateStep", jobRepository)
|
||||||
|
|||||||
@ -4,9 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||||||
import com.snp.batch.common.batch.reader.BaseApiReader;
|
import com.snp.batch.common.batch.reader.BaseApiReader;
|
||||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailApiResponse;
|
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailApiResponse;
|
||||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto;
|
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto;
|
||||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDto;
|
|
||||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipResultDto;
|
import com.snp.batch.jobs.shipdetail.batch.dto.ShipResultDto;
|
||||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipUpdateApiResponse;
|
|
||||||
import com.snp.batch.service.BatchApiLogService;
|
import com.snp.batch.service.BatchApiLogService;
|
||||||
import com.snp.batch.service.BatchDateService;
|
import com.snp.batch.service.BatchDateService;
|
||||||
import com.snp.batch.service.BatchFailedRecordService;
|
import com.snp.batch.service.BatchFailedRecordService;
|
||||||
@ -41,10 +39,11 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
|||||||
// 실패 IMO 추적
|
// 실패 IMO 추적
|
||||||
private final List<String> failedImoNumbers = new ArrayList<>();
|
private final List<String> failedImoNumbers = new ArrayList<>();
|
||||||
private String lastErrorMessage;
|
private String lastErrorMessage;
|
||||||
|
private boolean afterFetchCompleted = false;
|
||||||
|
|
||||||
// Retry 모드
|
// 파티션 모드
|
||||||
private List<String> retryRecordKeys;
|
private String partitionImoNumbers;
|
||||||
private Long sourceStepExecutionId;
|
private boolean recollectMode = false;
|
||||||
|
|
||||||
public ShipDetailUpdateDataReader(
|
public ShipDetailUpdateDataReader(
|
||||||
WebClient webClient,
|
WebClient webClient,
|
||||||
@ -73,16 +72,12 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
|||||||
enableChunkMode();
|
enableChunkMode();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setRetryRecordKeys(List<String> retryRecordKeys) {
|
public void setPartitionImoNumbers(String partitionImoNumbers) {
|
||||||
this.retryRecordKeys = retryRecordKeys;
|
this.partitionImoNumbers = partitionImoNumbers;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setSourceStepExecutionId(Long sourceStepExecutionId) {
|
public void setRecollectMode(boolean recollectMode) {
|
||||||
this.sourceStepExecutionId = sourceStepExecutionId;
|
this.recollectMode = recollectMode;
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isRetryMode() {
|
|
||||||
return retryRecordKeys != null && !retryRecordKeys.isEmpty();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -90,10 +85,6 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
|||||||
return "ShipDetailUpdateDataReader";
|
return "ShipDetailUpdateDataReader";
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getShipUpdateApiPath() {
|
|
||||||
return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipChangesByLastUpdateDateRange";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String getApiPath() {
|
protected String getApiPath() {
|
||||||
return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipsByIHSLRorIMONumbersAll";
|
return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipsByIHSLRorIMONumbersAll";
|
||||||
@ -109,25 +100,22 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
|||||||
this.allImoNumbers = null;
|
this.allImoNumbers = null;
|
||||||
this.failedImoNumbers.clear();
|
this.failedImoNumbers.clear();
|
||||||
this.lastErrorMessage = null;
|
this.lastErrorMessage = null;
|
||||||
// retryRecordKeys는 JobConfig에서 주입하므로 초기화하지 않음
|
this.afterFetchCompleted = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void beforeFetch() {
|
protected void beforeFetch() {
|
||||||
if (isRetryMode()) {
|
// 파티션에서 할당받은 IMO 목록을 파싱
|
||||||
log.info("[{}] [RETRY MODE] 실패 건 재수집 모드 시작 - 대상 IMO: {} 건",
|
if (partitionImoNumbers != null && !partitionImoNumbers.isBlank()) {
|
||||||
getReaderName(), retryRecordKeys.size());
|
allImoNumbers = new ArrayList<>(Arrays.asList(partitionImoNumbers.split(",")));
|
||||||
allImoNumbers = new ArrayList<>(retryRecordKeys);
|
|
||||||
log.info("[{}] [RETRY MODE] IMO 목록: {}", getReaderName(), allImoNumbers);
|
|
||||||
} else {
|
} else {
|
||||||
log.info("[{}] 변경된 IMO 번호 조회 시작...", getReaderName());
|
allImoNumbers = Collections.emptyList();
|
||||||
ShipUpdateApiResponse response = callShipUpdateApi();
|
|
||||||
allImoNumbers = extractUpdateImoNumbers(response);
|
|
||||||
log.info("[{}] 총 {} 개의 변경된 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
|
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
|
||||||
|
|
||||||
|
log.info("[{}] 파티션 IMO {} 건 할당 (recollectMode: {})",
|
||||||
|
getReaderName(), allImoNumbers.size(), recollectMode);
|
||||||
log.info("[{}] 설정: batch-size={}, delay-success={}ms, delay-failure={}ms, max-retry={}",
|
log.info("[{}] 설정: batch-size={}, delay-success={}ms, delay-failure={}ms, max-retry={}",
|
||||||
getReaderName(), batchSize, delayOnSuccessMs, delayOnFailureMs, maxRetryCount);
|
getReaderName(), batchSize, delayOnSuccessMs, delayOnFailureMs, maxRetryCount);
|
||||||
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
|
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
|
||||||
@ -255,46 +243,31 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ShipUpdateApiResponse callShipUpdateApi() {
|
|
||||||
Map<String, String> params = batchDateService.getDateRangeWithoutTimeParams(getApiKey());
|
|
||||||
return executeSingleApiCall(
|
|
||||||
maritimeApiUrl,
|
|
||||||
getShipUpdateApiPath(),
|
|
||||||
params,
|
|
||||||
new ParameterizedTypeReference<ShipUpdateApiResponse>() {},
|
|
||||||
batchApiLogService,
|
|
||||||
res -> res.getShips() != null ? (long) res.getShips().size() : 0L
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<String> extractUpdateImoNumbers(ShipUpdateApiResponse response) {
|
|
||||||
if (response.getShips() == null) {
|
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
return response.getShips().stream()
|
|
||||||
.map(ShipDto::getImoNumber)
|
|
||||||
.filter(imoNumber -> imoNumber != null)
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void afterFetch(List<ShipDetailDto> data) {
|
protected void afterFetch(List<ShipDetailDto> data) {
|
||||||
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
|
int totalBatches = allImoNumbers != null
|
||||||
|
? (int) Math.ceil((double) allImoNumbers.size() / batchSize) : 0;
|
||||||
try {
|
try {
|
||||||
if (data == null) {
|
if (data == null && !afterFetchCompleted) {
|
||||||
|
afterFetchCompleted = true;
|
||||||
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
|
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
|
||||||
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
|
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
|
||||||
getReaderName(), allImoNumbers.size());
|
getReaderName(), allImoNumbers != null ? allImoNumbers.size() : 0);
|
||||||
|
|
||||||
if (isRetryMode()) {
|
if (recollectMode) {
|
||||||
// Retry 모드: 성공 건 RESOLVED 처리 + 재실패 건 새 FAILED 레코드 저장
|
// RECOLLECT 모드: 성공 건 RESOLVED 처리 + 재실패 건 새 FAILED 레코드 저장
|
||||||
log.info("[{}] [RETRY MODE] 재수집 결과 처리 시작 (sourceStepExecutionId: {})",
|
Long sourceJobExecutionId = getSourceJobExecutionId();
|
||||||
getReaderName(), sourceStepExecutionId);
|
log.info("[{}] [RECOLLECT] 재수집 결과 처리 시작 (sourceJobExecutionId: {})",
|
||||||
batchFailedRecordService.resolveSuccessfulRetries(
|
getReaderName(), sourceJobExecutionId);
|
||||||
"ShipDetailUpdateJob", sourceStepExecutionId, retryRecordKeys, failedImoNumbers);
|
|
||||||
|
if (sourceJobExecutionId != null) {
|
||||||
|
batchFailedRecordService.resolveSuccessfulRetries(
|
||||||
|
"ShipDetailUpdateJob", sourceJobExecutionId,
|
||||||
|
allImoNumbers, failedImoNumbers);
|
||||||
|
}
|
||||||
|
|
||||||
if (!failedImoNumbers.isEmpty()) {
|
if (!failedImoNumbers.isEmpty()) {
|
||||||
log.warn("[{}] [RETRY MODE] 재실패 IMO 건수: {} 건", getReaderName(), failedImoNumbers.size());
|
log.warn("[{}] [RECOLLECT] 재실패 IMO 건수: {} 건", getReaderName(), failedImoNumbers.size());
|
||||||
batchFailedRecordService.saveFailedRecords(
|
batchFailedRecordService.saveFailedRecords(
|
||||||
"ShipDetailUpdateJob",
|
"ShipDetailUpdateJob",
|
||||||
getJobExecutionId(),
|
getJobExecutionId(),
|
||||||
@ -304,11 +277,11 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
|||||||
lastErrorMessage
|
lastErrorMessage
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
log.info("[{}] [RETRY MODE] 모든 재수집 건 정상 처리 완료", getReaderName());
|
log.info("[{}] [RECOLLECT] 모든 재수집 건 정상 처리 완료", getReaderName());
|
||||||
}
|
}
|
||||||
|
|
||||||
int successCount = retryRecordKeys.size() - failedImoNumbers.size();
|
int successCount = (allImoNumbers != null ? allImoNumbers.size() : 0) - failedImoNumbers.size();
|
||||||
log.info("[{}] [RETRY MODE] 결과: 성공 {} 건, 재실패 {} 건",
|
log.info("[{}] [RECOLLECT] 결과: 성공 {} 건, 재실패 {} 건",
|
||||||
getReaderName(), successCount, failedImoNumbers.size());
|
getReaderName(), successCount, failedImoNumbers.size());
|
||||||
} else {
|
} else {
|
||||||
// 일반 모드: 동기 저장 (자동 재수집 트리거 전에 커밋 보장)
|
// 일반 모드: 동기 저장 (자동 재수집 트리거 전에 커밋 보장)
|
||||||
@ -333,11 +306,11 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
|||||||
log.error("[{}] afterFetch 처리 중 예외 발생: {}", getReaderName(), e.getMessage(), e);
|
log.error("[{}] afterFetch 처리 중 예외 발생: {}", getReaderName(), e.getMessage(), e);
|
||||||
} finally {
|
} finally {
|
||||||
// 일반 모드에서만: 실패 건이 있으면 ExecutionContext에 저장 (자동 재수집 트리거용)
|
// 일반 모드에서만: 실패 건이 있으면 ExecutionContext에 저장 (자동 재수집 트리거용)
|
||||||
if (!isRetryMode() && !failedImoNumbers.isEmpty() && stepExecution != null) {
|
if (!recollectMode && !failedImoNumbers.isEmpty() && stepExecution != null) {
|
||||||
try {
|
try {
|
||||||
String failedKeys = String.join(",", failedImoNumbers);
|
String failedKeys = String.join(",", failedImoNumbers);
|
||||||
stepExecution.getExecutionContext().putString("failedRecordKeys", failedKeys);
|
stepExecution.getExecutionContext().putString("failedRecordKeys", failedKeys);
|
||||||
stepExecution.getExecutionContext().putLong("failedStepExecutionId", getStepExecutionId());
|
stepExecution.getExecutionContext().putLong("failedJobExecutionId", getJobExecutionId());
|
||||||
stepExecution.getExecutionContext().putString("failedApiKey", getApiKey());
|
stepExecution.getExecutionContext().putString("failedApiKey", getApiKey());
|
||||||
log.info("[{}] 자동 재수집 대상 실패 키 {} 건 ExecutionContext에 저장",
|
log.info("[{}] 자동 재수집 대상 실패 키 {} 건 ExecutionContext에 저장",
|
||||||
getReaderName(), failedImoNumbers.size());
|
getReaderName(), failedImoNumbers.size());
|
||||||
@ -347,4 +320,18 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* JobParameter에서 sourceJobExecutionId를 조회 (RECOLLECT 모드용)
|
||||||
|
*/
|
||||||
|
private Long getSourceJobExecutionId() {
|
||||||
|
if (stepExecution != null) {
|
||||||
|
String param = stepExecution.getJobExecution()
|
||||||
|
.getJobParameters().getString("sourceJobExecutionId");
|
||||||
|
if (param != null && !param.isBlank()) {
|
||||||
|
return Long.parseLong(param);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -141,19 +141,19 @@ public class BatchFailedRecordService {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 재수집 성공 건을 RESOLVED로 처리합니다.
|
* 재수집 성공 건을 RESOLVED로 처리합니다.
|
||||||
* 원본 stepExecutionId로 범위를 제한하여 해당 Step의 실패 건만 RESOLVED 처리합니다.
|
* 원본 jobExecutionId로 범위를 제한하여 해당 Job의 실패 건만 RESOLVED 처리합니다.
|
||||||
*/
|
*/
|
||||||
@Transactional(propagation = Propagation.REQUIRES_NEW)
|
@Transactional(propagation = Propagation.REQUIRES_NEW)
|
||||||
public void resolveSuccessfulRetries(String jobName, Long sourceStepExecutionId,
|
public void resolveSuccessfulRetries(String jobName, Long sourceJobExecutionId,
|
||||||
List<String> allRetryKeys, List<String> failedAgainKeys) {
|
List<String> allRetryKeys, List<String> failedAgainKeys) {
|
||||||
List<String> successfulKeys = allRetryKeys.stream()
|
List<String> successfulKeys = allRetryKeys.stream()
|
||||||
.filter(key -> !failedAgainKeys.contains(key))
|
.filter(key -> !failedAgainKeys.contains(key))
|
||||||
.toList();
|
.toList();
|
||||||
if (!successfulKeys.isEmpty()) {
|
if (!successfulKeys.isEmpty()) {
|
||||||
int resolved = batchFailedRecordRepository.resolveByStepExecutionIdAndRecordKeys(
|
int resolved = batchFailedRecordRepository.resolveByJobExecutionIdAndRecordKeys(
|
||||||
jobName, sourceStepExecutionId, successfulKeys, LocalDateTime.now());
|
jobName, sourceJobExecutionId, successfulKeys, LocalDateTime.now());
|
||||||
log.info("실패 레코드 RESOLVED 처리: {} 건 (job: {}, sourceStepExecutionId: {})",
|
log.info("실패 레코드 RESOLVED 처리: {} 건 (job: {}, sourceJobExecutionId: {})",
|
||||||
resolved, jobName, sourceStepExecutionId);
|
resolved, jobName, sourceJobExecutionId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -110,10 +110,11 @@ app:
|
|||||||
|
|
||||||
# ShipDetailUpdate 배치 설정 (dev - 기존과 동일하게 20건 유지)
|
# ShipDetailUpdate 배치 설정 (dev - 기존과 동일하게 20건 유지)
|
||||||
ship-detail-update:
|
ship-detail-update:
|
||||||
batch-size: 20 # dev에서는 문제 없으므로 기존 20건 유지
|
batch-size: 10 # dev에서는 문제 없으므로 기존 20건 유지
|
||||||
delay-on-success-ms: 300
|
delay-on-success-ms: 300
|
||||||
delay-on-failure-ms: 2000
|
delay-on-failure-ms: 2000
|
||||||
max-retry-count: 3
|
max-retry-count: 3
|
||||||
|
partition-count: 4 # dev: 2개 파티션
|
||||||
|
|
||||||
# AIS Target 배치 설정
|
# AIS Target 배치 설정
|
||||||
ais-target:
|
ais-target:
|
||||||
|
|||||||
@ -115,6 +115,7 @@ app:
|
|||||||
delay-on-success-ms: 300 # 성공 시 딜레이 (ms)
|
delay-on-success-ms: 300 # 성공 시 딜레이 (ms)
|
||||||
delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms)
|
delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms)
|
||||||
max-retry-count: 3 # 최대 재시도 횟수
|
max-retry-count: 3 # 최대 재시도 횟수
|
||||||
|
partition-count: 4 # prod: 4개 파티션
|
||||||
|
|
||||||
# AIS Target 배치 설정
|
# AIS Target 배치 설정
|
||||||
ais-target:
|
ais-target:
|
||||||
|
|||||||
@ -172,6 +172,7 @@ app:
|
|||||||
delay-on-success-ms: 300 # 성공 시 딜레이 (ms)
|
delay-on-success-ms: 300 # 성공 시 딜레이 (ms)
|
||||||
delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms)
|
delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms)
|
||||||
max-retry-count: 3 # 최대 재시도 횟수
|
max-retry-count: 3 # 최대 재시도 횟수
|
||||||
|
partition-count: 4 # 병렬 파티션 수
|
||||||
|
|
||||||
# AIS Target Import 배치 설정 (캐시 업데이트 전용)
|
# AIS Target Import 배치 설정 (캐시 업데이트 전용)
|
||||||
ais-target:
|
ais-target:
|
||||||
|
|||||||
불러오는 중...
Reference in New Issue
Block a user