Merge pull request 'fix(배치): RECOLLECT 모드에서 Tasklet 자체 스킵으로 last_success_date 복원 로직 제거 (#50)' (#51) from bugfix/ISSUE-50-recollect-execution-error into develop
This commit is contained in:
커밋
45100a9db1
@ -44,6 +44,7 @@
|
|||||||
- 마지막 성공 일시 세팅 방법 수정 (#15)
|
- 마지막 성공 일시 세팅 방법 수정 (#15)
|
||||||
- 테스트용 IMO 목록 건수 제한 제거 (#32)
|
- 테스트용 IMO 목록 건수 제한 제거 (#32)
|
||||||
- 타임라인 상세 화면 이동 오류 수정 및 실행 중 작업 상세 버튼 추가 (#34)
|
- 타임라인 상세 화면 이동 오류 수정 및 실행 중 작업 상세 버튼 추가 (#34)
|
||||||
|
- RECOLLECT 모드에서 Tasklet 자체 스킵으로 last_success_date 복원 로직 제거 (#50)
|
||||||
|
|
||||||
### 변경
|
### 변경
|
||||||
- 실패 레코드 Upsert 패턴 적용 (동일 키 중복 방지)
|
- 실패 레코드 Upsert 패턴 적용 (동일 키 중복 방지)
|
||||||
|
|||||||
@ -8,15 +8,11 @@ import org.springframework.batch.core.JobExecutionListener;
|
|||||||
import org.springframework.batch.core.StepExecution;
|
import org.springframework.batch.core.StepExecution;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class RecollectionJobExecutionListener implements JobExecutionListener {
|
public class RecollectionJobExecutionListener implements JobExecutionListener {
|
||||||
|
|
||||||
private static final String ORIGINAL_LAST_SUCCESS_DATE_KEY = "originalLastSuccessDate";
|
|
||||||
|
|
||||||
private final RecollectionHistoryService recollectionHistoryService;
|
private final RecollectionHistoryService recollectionHistoryService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -35,18 +31,7 @@ public class RecollectionJobExecutionListener implements JobExecutionListener {
|
|||||||
String reason = jobExecution.getJobParameters().getString("reason");
|
String reason = jobExecution.getJobParameters().getString("reason");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 1. 현재 last_success_date를 JobExecutionContext에 저장 (afterJob에서 복원용)
|
// 재수집 이력 기록
|
||||||
if (apiKey != null) {
|
|
||||||
LocalDateTime originalDate = recollectionHistoryService.getLastSuccessDate(apiKey);
|
|
||||||
if (originalDate != null) {
|
|
||||||
jobExecution.getExecutionContext()
|
|
||||||
.putString(ORIGINAL_LAST_SUCCESS_DATE_KEY, originalDate.toString());
|
|
||||||
log.info("[RecollectionListener] 원본 last_success_date 저장: apiKey={}, date={}",
|
|
||||||
apiKey, originalDate);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. 재수집 이력 기록
|
|
||||||
recollectionHistoryService.recordStart(
|
recollectionHistoryService.recordStart(
|
||||||
jobName, jobExecutionId, apiKey, executor, reason);
|
jobName, jobExecutionId, apiKey, executor, reason);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -65,7 +50,6 @@ public class RecollectionJobExecutionListener implements JobExecutionListener {
|
|||||||
|
|
||||||
Long jobExecutionId = jobExecution.getId();
|
Long jobExecutionId = jobExecution.getId();
|
||||||
String status = jobExecution.getStatus().name();
|
String status = jobExecution.getStatus().name();
|
||||||
String apiKey = resolveApiKey(jobExecution);
|
|
||||||
|
|
||||||
// Step별 통계 집계
|
// Step별 통계 집계
|
||||||
long totalRead = 0;
|
long totalRead = 0;
|
||||||
@ -102,7 +86,7 @@ public class RecollectionJobExecutionListener implements JobExecutionListener {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1. 재수집 이력 완료 기록
|
// 재수집 이력 완료 기록
|
||||||
try {
|
try {
|
||||||
recollectionHistoryService.recordCompletion(
|
recollectionHistoryService.recordCompletion(
|
||||||
jobExecutionId, status,
|
jobExecutionId, status,
|
||||||
@ -112,37 +96,6 @@ public class RecollectionJobExecutionListener implements JobExecutionListener {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[RecollectionListener] 재수집 이력 완료 기록 실패: jobExecutionId={}", jobExecutionId, e);
|
log.error("[RecollectionListener] 재수집 이력 완료 기록 실패: jobExecutionId={}", jobExecutionId, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. last_success_date 복원 (Tasklet이 NOW()로 업데이트한 것을 되돌림)
|
|
||||||
// 재수집은 과거 데이터 재처리이므로 last_success_date를 변경하면 안 됨
|
|
||||||
// recordCompletion 실패와 무관하게 반드시 실행되어야 함
|
|
||||||
try {
|
|
||||||
if (apiKey != null) {
|
|
||||||
String originalDateStr = jobExecution.getExecutionContext()
|
|
||||||
.getString(ORIGINAL_LAST_SUCCESS_DATE_KEY, null);
|
|
||||||
log.info("[RecollectionListener] last_success_date 복원 시도: apiKey={}, originalDateStr={}",
|
|
||||||
apiKey, originalDateStr);
|
|
||||||
if (originalDateStr != null) {
|
|
||||||
LocalDateTime originalDate = LocalDateTime.parse(originalDateStr);
|
|
||||||
// 현재 DB 값이 원본보다 미래면(정상 수집 발생) 복원 스킵
|
|
||||||
LocalDateTime currentDate = recollectionHistoryService.getLastSuccessDate(apiKey);
|
|
||||||
if (currentDate != null && currentDate.isAfter(originalDate)) {
|
|
||||||
log.info("[RecollectionListener] last_success_date가 이미 갱신됨 (정상 수집 발생), 복원 스킵: apiKey={}, original={}, current={}",
|
|
||||||
apiKey, originalDate, currentDate);
|
|
||||||
} else {
|
|
||||||
recollectionHistoryService.restoreLastSuccessDate(apiKey, originalDate);
|
|
||||||
log.info("[RecollectionListener] last_success_date 복원 완료: apiKey={}, date={}",
|
|
||||||
apiKey, originalDate);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.warn("[RecollectionListener] originalLastSuccessDate가 ExecutionContext에 없음: apiKey={}",
|
|
||||||
apiKey);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("[RecollectionListener] last_success_date 복원 실패: apiKey={}, jobExecutionId={}",
|
|
||||||
apiKey, jobExecutionId, e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@ -212,6 +212,14 @@ public class CompanyComplianceImportRangeJobConfig extends BaseMultiStepJobConfi
|
|||||||
@Bean
|
@Bean
|
||||||
public Tasklet companyComplianceLastExecutionUpdateTasklet() {
|
public Tasklet companyComplianceLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
|
String executionMode = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executionMode", "NORMAL");
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
String toDateStr = chunkContext.getStepContext()
|
String toDateStr = chunkContext.getStepContext()
|
||||||
.getStepExecution().getJobExecution()
|
.getStepExecution().getJobExecution()
|
||||||
.getExecutionContext().getString("batchToDate", null);
|
.getExecutionContext().getString("batchToDate", null);
|
||||||
|
|||||||
@ -214,6 +214,14 @@ public class ComplianceImportRangeJobConfig extends BaseMultiStepJobConfig<Compl
|
|||||||
@Bean
|
@Bean
|
||||||
public Tasklet complianceLastExecutionUpdateTasklet() {
|
public Tasklet complianceLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
|
String executionMode = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executionMode", "NORMAL");
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
String toDateStr = chunkContext.getStepContext()
|
String toDateStr = chunkContext.getStepContext()
|
||||||
.getStepExecution().getJobExecution()
|
.getStepExecution().getJobExecution()
|
||||||
.getExecutionContext().getString("batchToDate", null);
|
.getExecutionContext().getString("batchToDate", null);
|
||||||
|
|||||||
@ -160,6 +160,14 @@ public class EventImportJobConfig extends BaseMultiStepJobConfig<EventDetailDto,
|
|||||||
@Bean
|
@Bean
|
||||||
public Tasklet eventLastExecutionUpdateTasklet() {
|
public Tasklet eventLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
|
String executionMode = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executionMode", "NORMAL");
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
String toDateStr = chunkContext.getStepContext()
|
String toDateStr = chunkContext.getStepContext()
|
||||||
.getStepExecution().getJobExecution()
|
.getStepExecution().getJobExecution()
|
||||||
.getExecutionContext().getString("batchToDate", null);
|
.getExecutionContext().getString("batchToDate", null);
|
||||||
|
|||||||
@ -150,6 +150,14 @@ public class AnchorageCallsRangeJobConfig extends BaseMultiStepJobConfig<Anchora
|
|||||||
@Bean
|
@Bean
|
||||||
public Tasklet anchorageCallsLastExecutionUpdateTasklet() {
|
public Tasklet anchorageCallsLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
|
String executionMode = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executionMode", "NORMAL");
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
String toDateStr = chunkContext.getStepContext()
|
String toDateStr = chunkContext.getStepContext()
|
||||||
.getStepExecution().getJobExecution()
|
.getStepExecution().getJobExecution()
|
||||||
.getExecutionContext().getString("batchToDate", null);
|
.getExecutionContext().getString("batchToDate", null);
|
||||||
|
|||||||
@ -144,6 +144,14 @@ public class BerthCallsRangJobConfig extends BaseMultiStepJobConfig<BerthCallsDt
|
|||||||
@Bean
|
@Bean
|
||||||
public Tasklet berthCallsLastExecutionUpdateTasklet() {
|
public Tasklet berthCallsLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
|
String executionMode = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executionMode", "NORMAL");
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
String toDateStr = chunkContext.getStepContext()
|
String toDateStr = chunkContext.getStepContext()
|
||||||
.getStepExecution().getJobExecution()
|
.getStepExecution().getJobExecution()
|
||||||
.getExecutionContext().getString("batchToDate", null);
|
.getExecutionContext().getString("batchToDate", null);
|
||||||
|
|||||||
@ -142,6 +142,14 @@ public class CurrentlyAtRangeJobConfig extends BaseMultiStepJobConfig<CurrentlyA
|
|||||||
@Bean
|
@Bean
|
||||||
public Tasklet currentlyAtLastExecutionUpdateTasklet() {
|
public Tasklet currentlyAtLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
|
String executionMode = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executionMode", "NORMAL");
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
String toDateStr = chunkContext.getStepContext()
|
String toDateStr = chunkContext.getStepContext()
|
||||||
.getStepExecution().getJobExecution()
|
.getStepExecution().getJobExecution()
|
||||||
.getExecutionContext().getString("batchToDate", null);
|
.getExecutionContext().getString("batchToDate", null);
|
||||||
|
|||||||
@ -145,6 +145,14 @@ public class DestinationsRangeJobConfig extends BaseMultiStepJobConfig<Destinati
|
|||||||
@Bean
|
@Bean
|
||||||
public Tasklet destinationsLastExecutionUpdateTasklet() {
|
public Tasklet destinationsLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
|
String executionMode = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executionMode", "NORMAL");
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
String toDateStr = chunkContext.getStepContext()
|
String toDateStr = chunkContext.getStepContext()
|
||||||
.getStepExecution().getJobExecution()
|
.getStepExecution().getJobExecution()
|
||||||
.getExecutionContext().getString("batchToDate", null);
|
.getExecutionContext().getString("batchToDate", null);
|
||||||
|
|||||||
@ -146,6 +146,14 @@ public class ShipPortCallsRangeJobConfig extends BaseMultiStepJobConfig<PortCall
|
|||||||
@Bean
|
@Bean
|
||||||
public Tasklet portCallsLastExecutionUpdateTasklet() {
|
public Tasklet portCallsLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
|
String executionMode = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executionMode", "NORMAL");
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
String toDateStr = chunkContext.getStepContext()
|
String toDateStr = chunkContext.getStepContext()
|
||||||
.getStepExecution().getJobExecution()
|
.getStepExecution().getJobExecution()
|
||||||
.getExecutionContext().getString("batchToDate", null);
|
.getExecutionContext().getString("batchToDate", null);
|
||||||
|
|||||||
@ -144,6 +144,14 @@ public class StsOperationRangeJobConfig extends BaseMultiStepJobConfig<StsOperat
|
|||||||
@Bean
|
@Bean
|
||||||
public Tasklet stsOperationLastExecutionUpdateTasklet() {
|
public Tasklet stsOperationLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
|
String executionMode = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executionMode", "NORMAL");
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
String toDateStr = chunkContext.getStepContext()
|
String toDateStr = chunkContext.getStepContext()
|
||||||
.getStepExecution().getJobExecution()
|
.getStepExecution().getJobExecution()
|
||||||
.getExecutionContext().getString("batchToDate", null);
|
.getExecutionContext().getString("batchToDate", null);
|
||||||
|
|||||||
@ -144,6 +144,14 @@ public class TerminalCallsRangeJobConfig extends BaseMultiStepJobConfig<Terminal
|
|||||||
@Bean
|
@Bean
|
||||||
public Tasklet terminalCallsLastExecutionUpdateTasklet() {
|
public Tasklet terminalCallsLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
|
String executionMode = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executionMode", "NORMAL");
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
String toDateStr = chunkContext.getStepContext()
|
String toDateStr = chunkContext.getStepContext()
|
||||||
.getStepExecution().getJobExecution()
|
.getStepExecution().getJobExecution()
|
||||||
.getExecutionContext().getString("batchToDate", null);
|
.getExecutionContext().getString("batchToDate", null);
|
||||||
|
|||||||
@ -142,6 +142,14 @@ public class TransitsRangeJobConfig extends BaseMultiStepJobConfig<TransitsDto,
|
|||||||
@Bean
|
@Bean
|
||||||
public Tasklet transitsLastExecutionUpdateTasklet() {
|
public Tasklet transitsLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
|
String executionMode = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executionMode", "NORMAL");
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
String toDateStr = chunkContext.getStepContext()
|
String toDateStr = chunkContext.getStepContext()
|
||||||
.getStepExecution().getJobExecution()
|
.getStepExecution().getJobExecution()
|
||||||
.getExecutionContext().getString("batchToDate", null);
|
.getExecutionContext().getString("batchToDate", null);
|
||||||
|
|||||||
@ -166,6 +166,14 @@ public class PscInspectionJobConfig extends BaseMultiStepJobConfig<PscInspection
|
|||||||
@Bean
|
@Bean
|
||||||
public Tasklet pscLastExecutionUpdateTasklet() {
|
public Tasklet pscLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
|
String executionMode = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executionMode", "NORMAL");
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
String toDateStr = chunkContext.getStepContext()
|
String toDateStr = chunkContext.getStepContext()
|
||||||
.getStepExecution().getJobExecution()
|
.getStepExecution().getJobExecution()
|
||||||
.getExecutionContext().getString("batchToDate", null);
|
.getExecutionContext().getString("batchToDate", null);
|
||||||
|
|||||||
@ -158,6 +158,14 @@ public class RiskImportRangeJobConfig extends BaseMultiStepJobConfig<RiskDto, Ri
|
|||||||
@Bean
|
@Bean
|
||||||
public Tasklet riskLastExecutionUpdateTasklet() {
|
public Tasklet riskLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
|
String executionMode = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executionMode", "NORMAL");
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
String toDateStr = chunkContext.getStepContext()
|
String toDateStr = chunkContext.getStepContext()
|
||||||
.getStepExecution().getJobExecution()
|
.getStepExecution().getJobExecution()
|
||||||
.getExecutionContext().getString("batchToDate", null);
|
.getExecutionContext().getString("batchToDate", null);
|
||||||
|
|||||||
@ -16,9 +16,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.springframework.batch.core.Job;
|
import org.springframework.batch.core.Job;
|
||||||
import org.springframework.batch.core.Step;
|
import org.springframework.batch.core.Step;
|
||||||
import org.springframework.batch.core.configuration.annotation.StepScope;
|
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||||
import org.springframework.batch.core.job.builder.FlowBuilder;
|
|
||||||
import org.springframework.batch.core.job.builder.JobBuilder;
|
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||||
import org.springframework.batch.core.job.flow.Flow;
|
|
||||||
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
|
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
|
||||||
import org.springframework.batch.core.job.flow.JobExecutionDecider;
|
import org.springframework.batch.core.job.flow.JobExecutionDecider;
|
||||||
import org.springframework.batch.core.repository.JobRepository;
|
import org.springframework.batch.core.repository.JobRepository;
|
||||||
@ -127,27 +125,20 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
|||||||
protected Job createJobFlow(JobBuilder jobBuilder) {
|
protected Job createJobFlow(JobBuilder jobBuilder) {
|
||||||
return jobBuilder
|
return jobBuilder
|
||||||
.start(ShipDetailUpdateStep())
|
.start(ShipDetailUpdateStep())
|
||||||
.next(retryModeDecider())
|
.next(emptyResponseDecider())
|
||||||
.on("RETRY").end()
|
.on("EMPTY_RESPONSE").end()
|
||||||
.from(retryModeDecider()).on("EMPTY_RESPONSE").end()
|
.from(emptyResponseDecider()).on("NORMAL").to(shipDetailLastExecutionUpdateStep())
|
||||||
.from(retryModeDecider()).on("NORMAL").to(shipDetailLastExecutionUpdateStep())
|
|
||||||
.end()
|
.end()
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retry 모드 판별 Decider
|
* 응답 데이터 건수 판별 Decider
|
||||||
* executionMode가 RECOLLECT이면 RETRY, 없으면 NORMAL 반환
|
* 응답 0건이면 EMPTY_RESPONSE, 아니면 NORMAL 반환
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public JobExecutionDecider retryModeDecider() {
|
public JobExecutionDecider emptyResponseDecider() {
|
||||||
return (jobExecution, stepExecution) -> {
|
return (jobExecution, stepExecution) -> {
|
||||||
String executionMode = jobExecution.getJobParameters().getString("executionMode");
|
|
||||||
if ("RECOLLECT".equals(executionMode)) {
|
|
||||||
log.info("[ShipDetailUpdateJob] Decider: RETRY 모드 - LAST_EXECUTION 업데이트 스킵");
|
|
||||||
return new FlowExecutionStatus("RETRY");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (stepExecution != null && stepExecution.getReadCount() == 0) {
|
if (stepExecution != null && stepExecution.getReadCount() == 0) {
|
||||||
log.info("[ShipDetailUpdateJob] Decider: EMPTY_RESPONSE - 응답 데이터 0건으로 LAST_EXECUTION 업데이트 스킵 (다음 실행 시 동일 범위 재조회)");
|
log.info("[ShipDetailUpdateJob] Decider: EMPTY_RESPONSE - 응답 데이터 0건으로 LAST_EXECUTION 업데이트 스킵 (다음 실행 시 동일 범위 재조회)");
|
||||||
return new FlowExecutionStatus("EMPTY_RESPONSE");
|
return new FlowExecutionStatus("EMPTY_RESPONSE");
|
||||||
@ -236,6 +227,14 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
|
|||||||
@Bean
|
@Bean
|
||||||
public Tasklet shipDetailLastExecutionUpdateTasklet() {
|
public Tasklet shipDetailLastExecutionUpdateTasklet() {
|
||||||
return (contribution, chunkContext) -> {
|
return (contribution, chunkContext) -> {
|
||||||
|
String executionMode = chunkContext.getStepContext()
|
||||||
|
.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executionMode", "NORMAL");
|
||||||
|
if ("RECOLLECT".equals(executionMode)) {
|
||||||
|
log.info(">>>>> RECOLLECT 모드 - LAST_EXECUTION 업데이트 스킵");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
String toDateStr = chunkContext.getStepContext()
|
String toDateStr = chunkContext.getStepContext()
|
||||||
.getStepExecution().getJobExecution()
|
.getStepExecution().getJobExecution()
|
||||||
.getExecutionContext().getString("batchToDate", null);
|
.getExecutionContext().getString("batchToDate", null);
|
||||||
|
|||||||
@ -123,8 +123,6 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
|
|||||||
log.info("[{}] 변경된 IMO 번호 조회 시작...", getReaderName());
|
log.info("[{}] 변경된 IMO 번호 조회 시작...", getReaderName());
|
||||||
ShipUpdateApiResponse response = callShipUpdateApi();
|
ShipUpdateApiResponse response = callShipUpdateApi();
|
||||||
List<String> fullList = extractUpdateImoNumbers(response);
|
List<String> fullList = extractUpdateImoNumbers(response);
|
||||||
allImoNumbers = new ArrayList<>(fullList);
|
|
||||||
log.info("[{}] 변경된 IMO 번호 수: {} 개", getReaderName(), response.getShipCount());
|
|
||||||
log.info("[{}] 총 {} 개의 변경된 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
|
log.info("[{}] 총 {} 개의 변경된 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -57,9 +57,28 @@ public class BatchDateService {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 현재 Step의 Job 파라미터에서 executor를 확인.
|
||||||
|
* AUTO_RETRY/MANUAL_RETRY이면 실패건 재수집이므로 기간 테이블을 사용하지 않는다.
|
||||||
|
*/
|
||||||
|
private boolean isFailedRecordRetry() {
|
||||||
|
try {
|
||||||
|
StepContext context = StepSynchronizationManager.getContext();
|
||||||
|
if (context != null && context.getStepExecution() != null) {
|
||||||
|
String executor = context.getStepExecution().getJobExecution()
|
||||||
|
.getJobParameters().getString("executor");
|
||||||
|
return "AUTO_RETRY".equals(executor) || "MANUAL_RETRY".equals(executor);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("executor 파라미터 확인 실패", e);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
public Map<String, String> getDateRangeWithoutTimeParams(String apiKey) {
|
public Map<String, String> getDateRangeWithoutTimeParams(String apiKey) {
|
||||||
// 재수집 모드: batch_collection_period에서 날짜 조회
|
// 기간 재수집 모드: batch_collection_period에서 날짜 조회
|
||||||
if ("RECOLLECT".equals(getExecutionMode())) {
|
// 실패건 재수집(AUTO_RETRY/MANUAL_RETRY)은 정상 모드와 동일하게 last_success_date 기반 사용
|
||||||
|
if ("RECOLLECT".equals(getExecutionMode()) && !isFailedRecordRetry()) {
|
||||||
return getCollectionPeriodDateParams(apiKey);
|
return getCollectionPeriodDateParams(apiKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -88,8 +107,9 @@ public class BatchDateService {
|
|||||||
public Map<String, String> getDateRangeWithTimezoneParams(String apiKey, String dateParam1, String dateParam2) {
|
public Map<String, String> getDateRangeWithTimezoneParams(String apiKey, String dateParam1, String dateParam2) {
|
||||||
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSX");
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSX");
|
||||||
|
|
||||||
// 재수집 모드: batch_collection_period에서 날짜 조회
|
// 기간 재수집 모드: batch_collection_period에서 날짜 조회
|
||||||
if ("RECOLLECT".equals(getExecutionMode())) {
|
// 실패건 재수집(AUTO_RETRY/MANUAL_RETRY)은 정상 모드와 동일하게 last_success_date 기반 사용
|
||||||
|
if ("RECOLLECT".equals(getExecutionMode()) && !isFailedRecordRetry()) {
|
||||||
return getCollectionPeriodTimezoneParams(apiKey, dateParam1, dateParam2, formatter);
|
return getCollectionPeriodTimezoneParams(apiKey, dateParam1, dateParam2, formatter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -2,7 +2,6 @@ package com.snp.batch.service;
|
|||||||
|
|
||||||
import com.snp.batch.global.dto.JobExecutionDetailDto;
|
import com.snp.batch.global.dto.JobExecutionDetailDto;
|
||||||
import com.snp.batch.global.model.BatchCollectionPeriod;
|
import com.snp.batch.global.model.BatchCollectionPeriod;
|
||||||
import com.snp.batch.global.model.BatchLastExecution;
|
|
||||||
import com.snp.batch.global.model.BatchRecollectionHistory;
|
import com.snp.batch.global.model.BatchRecollectionHistory;
|
||||||
import com.snp.batch.global.model.BatchFailedRecord;
|
import com.snp.batch.global.model.BatchFailedRecord;
|
||||||
import com.snp.batch.global.model.JobDisplayNameEntity;
|
import com.snp.batch.global.model.JobDisplayNameEntity;
|
||||||
@ -482,33 +481,6 @@ public class RecollectionHistoryService {
|
|||||||
.orElse(null);
|
.orElse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 재수집 실행 전: 현재 last_success_date 조회 (복원용)
|
|
||||||
*/
|
|
||||||
@Transactional(readOnly = true)
|
|
||||||
public LocalDateTime getLastSuccessDate(String apiKey) {
|
|
||||||
return lastExecutionRepository.findById(apiKey)
|
|
||||||
.map(BatchLastExecution::getLastSuccessDate)
|
|
||||||
.orElse(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 재수집 실행 후: Tasklet이 업데이트한 last_success_date를 원래 값으로 복원
|
|
||||||
* 재수집은 과거 데이터 재처리이므로 last_success_date를 변경하면 안 됨
|
|
||||||
*/
|
|
||||||
@Transactional(propagation = Propagation.REQUIRES_NEW)
|
|
||||||
public void restoreLastSuccessDate(String apiKey, LocalDateTime originalDate) {
|
|
||||||
if (originalDate == null) return;
|
|
||||||
lastExecutionRepository.findById(apiKey).ifPresent(lastExec -> {
|
|
||||||
LocalDateTime beforeDate = lastExec.getLastSuccessDate();
|
|
||||||
lastExec.setLastSuccessDate(originalDate);
|
|
||||||
lastExec.setUpdatedAt(LocalDateTime.now());
|
|
||||||
lastExecutionRepository.save(lastExec);
|
|
||||||
log.info("[RecollectionHistory] last_success_date 복원: apiKey={}, before={}, after={}",
|
|
||||||
apiKey, beforeDate, originalDate);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 수집 기간 전체 조회
|
* 수집 기간 전체 조회
|
||||||
*/
|
*/
|
||||||
|
|||||||
불러오는 중...
Reference in New Issue
Block a user