diff --git a/frontend/src/api/batchApi.ts b/frontend/src/api/batchApi.ts index 35b9e89..18a186b 100644 --- a/frontend/src/api/batchApi.ts +++ b/frontend/src/api/batchApi.ts @@ -103,6 +103,7 @@ export interface StepExecutionDto { duration: number | null; apiCallInfo: ApiCallInfo | null; apiLogSummary: StepApiLogSummary | null; + failedRecords?: FailedRecordDto[] | null; } export interface ApiLogEntryDto { @@ -137,6 +138,16 @@ export interface ApiLogPageResponse { export type ApiLogStatus = 'ALL' | 'SUCCESS' | 'ERROR'; +export interface FailedRecordDto { + id: number; + jobName: string; + recordKey: string; + errorMessage: string | null; + retryCount: number; + status: string; + createdAt: string; +} + export interface JobExecutionDetailDto { executionId: number; jobName: string; @@ -330,6 +341,15 @@ export const batchApi = { `${BASE}/jobs/${jobName}/execute${qs}`); }, + retryFailedRecords: (jobName: string, recordKeys: string[], stepExecutionId: number) => { + const qs = new URLSearchParams({ + retryRecordKeys: recordKeys.join(','), + sourceStepExecutionId: String(stepExecutionId), + }); + return postJson<{ success: boolean; message: string; executionId?: number }>( + `${BASE}/jobs/${jobName}/execute?${qs.toString()}`); + }, + getJobExecutions: (jobName: string) => fetchJson(`${BASE}/jobs/${jobName}/executions`), diff --git a/frontend/src/pages/ExecutionDetail.tsx b/frontend/src/pages/ExecutionDetail.tsx index 81b1eef..34fd036 100644 --- a/frontend/src/pages/ExecutionDetail.tsx +++ b/frontend/src/pages/ExecutionDetail.tsx @@ -1,6 +1,6 @@ import { useState, useCallback, useEffect } from 'react'; import { useParams, useSearchParams, useNavigate } from 'react-router-dom'; -import { batchApi, type JobExecutionDetailDto, type StepExecutionDto, type ApiLogPageResponse, type ApiLogStatus } from '../api/batchApi'; +import { batchApi, type JobExecutionDetailDto, type StepExecutionDto, type FailedRecordDto, type ApiLogPageResponse, type ApiLogStatus } from '../api/batchApi'; import { formatDateTime, formatDuration, calculateDuration } from '../utils/formatters'; import { usePoller } from '../hooks/usePoller'; import StatusBadge from '../components/StatusBadge'; @@ -264,9 +264,10 @@ function ApiLogSection({ stepExecutionId, summary }: ApiLogSectionProps) { interface StepCardProps { step: StepExecutionDto; + jobName: string; } -function StepCard({ step }: StepCardProps) { +function StepCard({ step, jobName }: StepCardProps) { const stats = [ { label: '읽기', value: step.readCount }, { label: '쓰기', value: step.writeCount }, @@ -380,6 +381,11 @@ function StepCard({ step }: StepCardProps) { )} + {/* 호출 실패 데이터 토글 */} + {step.failedRecords && step.failedRecords.length > 0 && ( + + )} + {step.exitMessage && (

Exit Message

@@ -578,6 +584,7 @@ export default function ExecutionDetail() { ))}
@@ -587,6 +594,161 @@ export default function ExecutionDetail() { ); } +function FailedRecordsToggle({ records, jobName, stepExecutionId }: { records: FailedRecordDto[]; jobName: string; stepExecutionId: number }) { + const [open, setOpen] = useState(false); + const [showConfirm, setShowConfirm] = useState(false); + const [retrying, setRetrying] = useState(false); + const navigate = useNavigate(); + + const failedRecords = records.filter((r) => r.status === 'FAILED'); + + const statusColor = (status: string) => { + switch (status) { + case 'RESOLVED': return 'text-emerald-600 bg-emerald-50'; + case 'RETRY_PENDING': return 'text-amber-600 bg-amber-50'; + default: return 'text-red-600 bg-red-50'; + } + }; + + const handleRetry = async () => { + setRetrying(true); + try { + const keys = failedRecords.map((r) => r.recordKey); + const result = await batchApi.retryFailedRecords(jobName, keys, stepExecutionId); + if (result.success && result.executionId) { + setShowConfirm(false); + navigate(`/executions/${result.executionId}`); + } + } catch { + alert('재수집 실행에 실패했습니다.'); + } finally { + setRetrying(false); + } + }; + + return ( +
+
+ + + {failedRecords.length > 0 && ( + + )} +
+ + {open && ( +
+ + + + + + + + + + + + {records.map((record) => ( + + + + + + + + ))} + +
Record Key에러 메시지재시도상태생성 시간
+ {record.recordKey} + + {record.errorMessage || '-'} + + {record.retryCount} + + + {record.status} + + + {formatDateTime(record.createdAt)} +
+
+ )} + + {/* 확인 다이얼로그 */} + {showConfirm && ( +
+
+

+ 실패 건 재수집 확인 +

+

+ 다음 {failedRecords.length}건의 IMO에 대해 재수집을 실행합니다. +

+
+
+ {failedRecords.map((r) => ( + + {r.recordKey} + + ))} +
+
+
+ + +
+
+
+ )} +
+ ); +} + function InfoItem({ label, value }: { label: string; value: string }) { return (
diff --git a/frontend/src/pages/RecollectDetail.tsx b/frontend/src/pages/RecollectDetail.tsx index 07c7d6e..0524c19 100644 --- a/frontend/src/pages/RecollectDetail.tsx +++ b/frontend/src/pages/RecollectDetail.tsx @@ -4,6 +4,7 @@ import { batchApi, type RecollectionDetailResponse, type StepExecutionDto, + type FailedRecordDto, type ApiLogPageResponse, type ApiLogStatus, } from '../api/batchApi'; @@ -266,7 +267,7 @@ function ApiLogSection({ stepExecutionId, summary }: ApiLogSectionProps) { ); } -function StepCard({ step }: { step: StepExecutionDto }) { +function StepCard({ step, jobName }: { step: StepExecutionDto; jobName: string }) { const stats = [ { label: '읽기', value: step.readCount }, { label: '쓰기', value: step.writeCount }, @@ -358,6 +359,11 @@ function StepCard({ step }: { step: StepExecutionDto }) {
)} + {/* 호출 실패 데이터 토글 */} + {step.failedRecords && step.failedRecords.length > 0 && ( + + )} + {step.exitMessage && (

Exit Message

@@ -630,7 +636,7 @@ export default function RecollectDetail() { ) : (
{stepExecutions.map((step) => ( - + ))}
)} @@ -639,6 +645,161 @@ export default function RecollectDetail() { ); } +function FailedRecordsToggle({ records, jobName, stepExecutionId }: { records: FailedRecordDto[]; jobName: string; stepExecutionId: number }) { + const [open, setOpen] = useState(false); + const [showConfirm, setShowConfirm] = useState(false); + const [retrying, setRetrying] = useState(false); + const navigate = useNavigate(); + + const failedRecords = records.filter((r) => r.status === 'FAILED'); + + const statusColor = (status: string) => { + switch (status) { + case 'RESOLVED': return 'text-emerald-600 bg-emerald-50'; + case 'RETRY_PENDING': return 'text-amber-600 bg-amber-50'; + default: return 'text-red-600 bg-red-50'; + } + }; + + const handleRetry = async () => { + setRetrying(true); + try { + const keys = failedRecords.map((r) => r.recordKey); + const result = await batchApi.retryFailedRecords(jobName, keys, stepExecutionId); + if (result.success && result.executionId) { + setShowConfirm(false); + navigate(`/executions/${result.executionId}`); + } + } catch { + alert('재수집 실행에 실패했습니다.'); + } finally { + setRetrying(false); + } + }; + + return ( +
+
+ + + {failedRecords.length > 0 && ( + + )} +
+ + {open && ( +
+ + + + + + + + + + + + {records.map((record) => ( + + + + + + + + ))} + +
Record Key에러 메시지재시도상태생성 시간
+ {record.recordKey} + + {record.errorMessage || '-'} + + {record.retryCount} + + + {record.status} + + + {formatDateTime(record.createdAt)} +
+
+ )} + + {/* 확인 다이얼로그 */} + {showConfirm && ( +
+
+

+ 실패 건 재수집 확인 +

+

+ 다음 {failedRecords.length}건의 IMO에 대해 재수집을 실행합니다. +

+
+
+ {failedRecords.map((r) => ( + + {r.recordKey} + + ))} +
+
+
+ + +
+
+
+ )} +
+ ); +} + function InfoItem({ label, value }: { label: string; value: string }) { return (
diff --git a/src/main/java/com/snp/batch/common/batch/reader/BaseApiReader.java b/src/main/java/com/snp/batch/common/batch/reader/BaseApiReader.java index 03e813b..e3b64d2 100644 --- a/src/main/java/com/snp/batch/common/batch/reader/BaseApiReader.java +++ b/src/main/java/com/snp/batch/common/batch/reader/BaseApiReader.java @@ -89,6 +89,14 @@ public abstract class BaseApiReader implements ItemReader { this.jobExecutionId = jobExecutionId; this.stepExecutionId = stepExecutionId; } + + protected Long getJobExecutionId() { + return this.jobExecutionId; + } + + protected Long getStepExecutionId() { + return this.stepExecutionId; + } /** * 기본 생성자 (WebClient 없이 사용 - Mock 데이터용) */ diff --git a/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java b/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java index d05a2f9..0321ab3 100644 --- a/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java +++ b/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java @@ -1,10 +1,15 @@ package com.snp.batch.global.config; +import io.netty.channel.ChannelOption; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.web.reactive.function.client.WebClient; +import reactor.netty.http.client.HttpClient; + +import java.time.Duration; /** * Maritime API WebClient 설정 @@ -59,12 +64,17 @@ public class MaritimeApiWebClientConfig { log.info("Base URL: {}", maritimeApiUrl); log.info("========================================"); + HttpClient httpClient = HttpClient.create() + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) // 연결 타임아웃 10초 + .responseTimeout(Duration.ofSeconds(60)); // 응답 대기 60초 + return WebClient.builder() + .clientConnector(new ReactorClientHttpConnector(httpClient)) .baseUrl(maritimeApiUrl) .defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword)) .codecs(configurer -> configurer .defaultCodecs() - .maxInMemorySize(100 * 1024 * 1024)) // 30MB 버퍼 + .maxInMemorySize(100 * 1024 * 1024)) // 100MB 버퍼 .build(); } diff --git a/src/main/java/com/snp/batch/global/dto/JobExecutionDetailDto.java b/src/main/java/com/snp/batch/global/dto/JobExecutionDetailDto.java index d45b44e..7441a35 100644 --- a/src/main/java/com/snp/batch/global/dto/JobExecutionDetailDto.java +++ b/src/main/java/com/snp/batch/global/dto/JobExecutionDetailDto.java @@ -70,6 +70,7 @@ public class JobExecutionDetailDto { private Long duration; // 실행 시간 (ms) private ApiCallInfo apiCallInfo; // API 호출 정보 - StepExecutionContext 기반 (옵셔널) private StepApiLogSummary apiLogSummary; // API 호출 로그 요약 - batch_api_log 기반 (옵셔널) + private List failedRecords; // 실패 레코드 (옵셔널) } /** @@ -107,6 +108,23 @@ public class JobExecutionDetailDto { private Long totalRecordCount; // 총 반환 건수 } + /** + * 실패 레코드 DTO + */ + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class FailedRecordDto { + private Long id; + private String jobName; + private String recordKey; + private String errorMessage; + private Integer retryCount; + private String status; + private LocalDateTime createdAt; + } + /** * API 호출 로그 페이징 응답 */ diff --git a/src/main/java/com/snp/batch/global/model/BatchFailedRecord.java b/src/main/java/com/snp/batch/global/model/BatchFailedRecord.java new file mode 100644 index 0000000..8fd38cf --- /dev/null +++ b/src/main/java/com/snp/batch/global/model/BatchFailedRecord.java @@ -0,0 +1,46 @@ +package com.snp.batch.global.model; + +import jakarta.persistence.*; +import lombok.*; +import org.hibernate.annotations.CreationTimestamp; + +import java.time.LocalDateTime; + +@Entity +@Table(name = "batch_failed_record", schema = "t_std_snp_data") +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor +@Builder +public class BatchFailedRecord { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + @Column(nullable = false) + private String jobName; + + private Long jobExecutionId; + + private Long stepExecutionId; + + @Column(nullable = false) + private String recordKey; + + @Column(columnDefinition = "TEXT") + private String errorMessage; + + @Column(nullable = false) + private Integer retryCount; + + @Column(nullable = false, length = 20) + @Builder.Default + private String status = "FAILED"; + + @CreationTimestamp + @Column(updatable = false) + private LocalDateTime createdAt; + + private LocalDateTime resolvedAt; +} diff --git a/src/main/java/com/snp/batch/global/repository/BatchFailedRecordRepository.java b/src/main/java/com/snp/batch/global/repository/BatchFailedRecordRepository.java new file mode 100644 index 0000000..cde392c --- /dev/null +++ b/src/main/java/com/snp/batch/global/repository/BatchFailedRecordRepository.java @@ -0,0 +1,48 @@ +package com.snp.batch.global.repository; + +import com.snp.batch.global.model.BatchFailedRecord; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; +import org.springframework.stereotype.Repository; + +import java.time.LocalDateTime; +import java.util.List; + +@Repository +public interface BatchFailedRecordRepository extends JpaRepository { + + /** + * 특정 Job의 상태별 실패 건 조회 + */ + List findByJobNameAndStatus(String jobName, String status); + + /** + * 실행별 실패 레코드 조회 + */ + List findByJobExecutionId(Long jobExecutionId); + + /** + * Step별 실패 레코드 조회 + */ + List findByStepExecutionId(Long stepExecutionId); + + /** + * 실행별 실패 건수 + */ + long countByJobExecutionId(Long jobExecutionId); + + /** + * 특정 Step 실행의 실패 레코드를 RESOLVED로 벌크 업데이트 + */ + @Modifying + @Query("UPDATE BatchFailedRecord r SET r.status = 'RESOLVED', r.resolvedAt = :resolvedAt " + + "WHERE r.jobName = :jobName AND r.stepExecutionId = :stepExecutionId " + + "AND r.recordKey IN :recordKeys AND r.status = 'FAILED'") + int resolveByStepExecutionIdAndRecordKeys( + @Param("jobName") String jobName, + @Param("stepExecutionId") Long stepExecutionId, + @Param("recordKeys") List recordKeys, + @Param("resolvedAt") LocalDateTime resolvedAt); +} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java index 4862631..5b05301 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java @@ -9,11 +9,16 @@ import com.snp.batch.jobs.shipdetail.batch.reader.ShipDetailUpdateDataReader; import com.snp.batch.jobs.shipdetail.batch.writer.ShipDetailDataWriter; import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; +import com.snp.batch.service.BatchFailedRecordService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.job.flow.Flow; +import org.springframework.batch.core.job.flow.FlowExecutionStatus; +import org.springframework.batch.core.job.flow.JobExecutionDecider; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.core.step.tasklet.Tasklet; @@ -29,6 +34,9 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.WebClient; +import java.util.Arrays; +import java.util.List; + @Slf4j @Configuration public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig { @@ -41,6 +49,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig { + String retryKeys = jobExecution.getJobParameters().getString("retryRecordKeys"); + if (retryKeys != null && !retryKeys.isBlank()) { + log.info("[ShipDetailUpdateJob] Decider: RETRY 모드 - LAST_EXECUTION 업데이트 스킵"); + return new FlowExecutionStatus("RETRY"); + } + log.info("[ShipDetailUpdateJob] Decider: NORMAL 모드 - LAST_EXECUTION 업데이트 진행"); + return new FlowExecutionStatus("NORMAL"); + }; + } + @Bean @StepScope public ShipDetailUpdateDataReader shipDetailUpdateDataReader( - @Value("#{stepExecution.jobExecution.id}") Long jobExecutionId, // SpEL로 ID 추출 - @Value("#{stepExecution.id}") Long stepExecutionId + @Value("#{stepExecution.jobExecution.id}") Long jobExecutionId, + @Value("#{stepExecution.id}") Long stepExecutionId, + @Value("#{jobParameters['retryRecordKeys']}") String retryRecordKeysParam, + @Value("#{jobParameters['sourceStepExecutionId']}") String sourceStepExecutionIdParam ) { - ShipDetailUpdateDataReader reader = new ShipDetailUpdateDataReader(maritimeApiWebClient, jdbcTemplate, objectMapper, batchDateService, batchApiLogService, maritimeApiUrl); - reader.setExecutionIds(jobExecutionId, stepExecutionId); // ID 세팅 + ShipDetailUpdateDataReader reader = new ShipDetailUpdateDataReader( + maritimeApiWebClient, jdbcTemplate, objectMapper, + batchDateService, batchApiLogService, batchFailedRecordService, maritimeApiUrl, + shipDetailBatchSize, delayOnSuccessMs, delayOnFailureMs, maxRetryCount + ); + reader.setExecutionIds(jobExecutionId, stepExecutionId); + + // Retry 모드: retryRecordKeys 파라미터가 있으면 주입 + if (retryRecordKeysParam != null && !retryRecordKeysParam.isBlank()) { + List retryKeys = Arrays.stream(retryRecordKeysParam.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .toList(); + reader.setRetryRecordKeys(retryKeys); + + if (sourceStepExecutionIdParam != null && !sourceStepExecutionIdParam.isBlank()) { + reader.setSourceStepExecutionId(Long.parseLong(sourceStepExecutionIdParam)); + } + + log.info("[ShipDetailUpdateJob] Retry 모드 활성화: {} 건의 IMO 대상, sourceStepExecutionId: {}", + retryKeys.size(), sourceStepExecutionIdParam); + } + return reader; } diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailUpdateDataReader.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailUpdateDataReader.java index d070ab4..14c19b8 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailUpdateDataReader.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailUpdateDataReader.java @@ -9,6 +9,7 @@ import com.snp.batch.jobs.shipdetail.batch.dto.ShipResultDto; import com.snp.batch.jobs.shipdetail.batch.dto.ShipUpdateApiResponse; import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; +import com.snp.batch.service.BatchFailedRecordService; import lombok.extern.slf4j.Slf4j; import org.springframework.core.ParameterizedTypeReference; import org.springframework.jdbc.core.JdbcTemplate; @@ -20,85 +21,130 @@ import java.util.stream.Collectors; @Slf4j public class ShipDetailUpdateDataReader extends BaseApiReader { - private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + private final BatchDateService batchDateService; private final BatchApiLogService batchApiLogService; + private final BatchFailedRecordService batchFailedRecordService; private final String maritimeApiUrl; private final JdbcTemplate jdbcTemplate; private final ObjectMapper objectMapper; - // 배치 처리 상태 + // 외부 설정값 + private final int batchSize; + private final long delayOnSuccessMs; + private final long delayOnFailureMs; + private final int maxRetryCount; + + // 배치 처리 상태 private List allImoNumbers; - // DB 해시값을 저장할 맵 private int currentBatchIndex = 0; - private final int batchSize = 20; - public ShipDetailUpdateDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, ObjectMapper objectMapper,BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeApiUrl) { + + // 실패 IMO 추적 + private final List failedImoNumbers = new ArrayList<>(); + private String lastErrorMessage; + + // Retry 모드 + private List retryRecordKeys; + private Long sourceStepExecutionId; + + public ShipDetailUpdateDataReader( + WebClient webClient, + JdbcTemplate jdbcTemplate, + ObjectMapper objectMapper, + BatchDateService batchDateService, + BatchApiLogService batchApiLogService, + BatchFailedRecordService batchFailedRecordService, + String maritimeApiUrl, + int batchSize, + long delayOnSuccessMs, + long delayOnFailureMs, + int maxRetryCount + ) { super(webClient); this.jdbcTemplate = jdbcTemplate; this.objectMapper = objectMapper; this.batchDateService = batchDateService; this.batchApiLogService = batchApiLogService; + this.batchFailedRecordService = batchFailedRecordService; this.maritimeApiUrl = maritimeApiUrl; - enableChunkMode(); // ✨ Chunk 모드 활성화 + this.batchSize = batchSize; + this.delayOnSuccessMs = delayOnSuccessMs; + this.delayOnFailureMs = delayOnFailureMs; + this.maxRetryCount = maxRetryCount; + enableChunkMode(); + } + + public void setRetryRecordKeys(List retryRecordKeys) { + this.retryRecordKeys = retryRecordKeys; + } + + public void setSourceStepExecutionId(Long sourceStepExecutionId) { + this.sourceStepExecutionId = sourceStepExecutionId; + } + + private boolean isRetryMode() { + return retryRecordKeys != null && !retryRecordKeys.isEmpty(); } @Override protected String getReaderName() { return "ShipDetailUpdateDataReader"; } - protected String getShipUpdateApiPath(){ return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipChangesByLastUpdateDateRange"; } + + protected String getShipUpdateApiPath() { + return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipChangesByLastUpdateDateRange"; + } + @Override protected String getApiPath() { return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipsByIHSLRorIMONumbersAll"; } - protected String getApiKey() {return "SHIP_DETAIL_UPDATE_API";} + + protected String getApiKey() { + return "SHIP_DETAIL_UPDATE_API"; + } @Override protected void resetCustomState() { this.currentBatchIndex = 0; this.allImoNumbers = null; + this.failedImoNumbers.clear(); + this.lastErrorMessage = null; + // retryRecordKeys는 JobConfig에서 주입하므로 초기화하지 않음 } - /** - * 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회 - */ @Override protected void beforeFetch() { - // 💡 Step 1. 기간 내 변경된 IMO 번호 리스트 조회 - log.info("[{}] 변경된 IMO 번호 조회 시작...", getReaderName()); - ShipUpdateApiResponse response = callShipUpdateApi(); - allImoNumbers = extractUpdateImoNumbers(response); - log.info("[{}] 변경된 IMO 번호 수: {} 개", getReaderName(), response.getShipCount()); + if (isRetryMode()) { + log.info("[{}] [RETRY MODE] 실패 건 재수집 모드 시작 - 대상 IMO: {} 건", + getReaderName(), retryRecordKeys.size()); + allImoNumbers = new ArrayList<>(retryRecordKeys); + log.info("[{}] [RETRY MODE] IMO 목록: {}", getReaderName(), allImoNumbers); + } else { + log.info("[{}] 변경된 IMO 번호 조회 시작...", getReaderName()); + ShipUpdateApiResponse response = callShipUpdateApi(); + allImoNumbers = extractUpdateImoNumbers(response); + log.info("[{}] 변경된 IMO 번호 수: {} 개", getReaderName(), response.getShipCount()); + log.info("[{}] 총 {} 개의 변경된 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); + } int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); - log.info("[{}] 총 {} 개의 변경된 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); - log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); + log.info("[{}] 설정: batch-size={}, delay-success={}ms, delay-failure={}ms, max-retry={}", + getReaderName(), batchSize, delayOnSuccessMs, delayOnFailureMs, maxRetryCount); log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); - // API 통계 초기화 updateApiCallStats(totalBatches, 0); } - /** - * ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환 - * - * Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출 - * - * @return 다음 배치 100건 (더 이상 없으면 null) - */ @Override protected List fetchNextBatch() throws Exception { - // 모든 배치 처리 완료 확인 if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { - return null; // Job 종료 + return null; } - // 현재 배치의 시작/끝 인덱스 계산 int startIndex = currentBatchIndex; int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); - - // 현재 배치의 IMO 번호 추출 (100개) List currentBatch = allImoNumbers.subList(startIndex, endIndex); int currentBatchNumber = (currentBatchIndex / batchSize) + 1; @@ -107,65 +153,95 @@ public class ShipDetailUpdateDataReader extends BaseApiReader { log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); - try { - // IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...") - String imoParam = String.join(",", currentBatch); + // 다음 배치로 인덱스 이동 (성공/실패 무관하게 진행) + currentBatchIndex = endIndex; - // API 호출 - ShipDetailApiResponse response = callApiWithBatch(imoParam); + String imoParam = String.join(",", currentBatch); - // 다음 배치로 인덱스 이동 - currentBatchIndex = endIndex; + // Retry with exponential backoff + ShipDetailApiResponse response = callApiWithRetry(imoParam, currentBatch, currentBatchNumber, totalBatches); - // 응답 처리 - if (response != null && response.getShipResult() != null) { + // API 호출 통계 업데이트 + updateApiCallStats(totalBatches, currentBatchNumber); - List shipDetailDtoList = response.getShipResult().stream() - .map(ShipResultDto::getShipDetails) // result -> result.getShipDetail() - .filter(Objects::nonNull) // 데이터가 없는 경우 제외 - .collect(Collectors.toList()); + if (response != null && response.getShipResult() != null) { + List shipDetailDtoList = response.getShipResult().stream() + .map(ShipResultDto::getShipDetails) + .filter(Objects::nonNull) + .collect(Collectors.toList()); - log.info("[{}] 배치 {}/{} 완료: {} 건 조회", - getReaderName(), currentBatchNumber, totalBatches, shipDetailDtoList.size()); + log.info("[{}] 배치 {}/{} 완료: {} 건 조회", + getReaderName(), currentBatchNumber, totalBatches, shipDetailDtoList.size()); - // API 호출 통계 업데이트 - updateApiCallStats(totalBatches, currentBatchNumber); + // 성공 시 딜레이 + sleepIfNeeded(delayOnSuccessMs); - // API 과부하 방지 (다음 배치 전 0.5초 대기) - if (currentBatchIndex < allImoNumbers.size()) { - Thread.sleep(500); - } - - return shipDetailDtoList; - - } else { - log.warn("[{}] 배치 {}/{} 응답 없음", - getReaderName(), currentBatchNumber, totalBatches); - - // API 호출 통계 업데이트 (실패도 카운트) - updateApiCallStats(totalBatches, currentBatchNumber); - - return Collections.emptyList(); - } - - } catch (Exception e) { - log.error("[{}] 배치 {}/{} 처리 중 오류: {}", - getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e); - - // 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용) - currentBatchIndex = endIndex; - - // 빈 리스트 반환 (Job 계속 진행) + return shipDetailDtoList; + } else { + log.warn("[{}] 배치 {}/{} 응답 없음", getReaderName(), currentBatchNumber, totalBatches); return Collections.emptyList(); } } /** - * Query Parameter를 사용한 API 호출 - * - * @param imoNumbers 쉼표로 연결된 IMO 번호 (예: "1000019,1000021,...") - * @return API 응답 + * Retry with exponential backoff + * 최대 maxRetryCount 회 재시도, 대기: 2초 → 4초 → 8초 */ + private ShipDetailApiResponse callApiWithRetry( + String imoParam, + List currentBatch, + int currentBatchNumber, + int totalBatches + ) { + Exception lastException = null; + + for (int attempt = 1; attempt <= maxRetryCount; attempt++) { + try { + ShipDetailApiResponse response = callApiWithBatch(imoParam); + + if (attempt > 1) { + log.info("[{}] 배치 {}/{} 재시도 {}/{} 성공", + getReaderName(), currentBatchNumber, totalBatches, attempt, maxRetryCount); + } + + return response; + } catch (Exception e) { + lastException = e; + log.warn("[{}] 배치 {}/{} 재시도 {}/{} 실패: {}", + getReaderName(), currentBatchNumber, totalBatches, + attempt, maxRetryCount, e.getMessage()); + + if (attempt < maxRetryCount) { + long backoffMs = delayOnFailureMs * (1L << (attempt - 1)); // 2s, 4s, 8s + log.info("[{}] {}ms 후 재시도...", getReaderName(), backoffMs); + sleepIfNeeded(backoffMs); + } + } + } + + // 모든 재시도 실패 - 실패 IMO 기록 + failedImoNumbers.addAll(currentBatch); + lastErrorMessage = lastException != null ? lastException.getMessage() : "unknown"; + log.error("[{}] 배치 {}/{} 최종 실패 ({}회 재시도 소진). 실패 IMO {} 건 기록: {}", + getReaderName(), currentBatchNumber, totalBatches, maxRetryCount, + currentBatch.size(), lastErrorMessage); + + // 실패 후 딜레이 + sleepIfNeeded(delayOnFailureMs); + + return null; + } + + private void sleepIfNeeded(long ms) { + if (currentBatchIndex < allImoNumbers.size()) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + private ShipDetailApiResponse callApiWithBatch(String imoNumbers) { Map params = new HashMap<>(); params.put("IMONumbers", imoNumbers); @@ -176,12 +252,11 @@ public class ShipDetailUpdateDataReader extends BaseApiReader { params, new ParameterizedTypeReference() {}, batchApiLogService, - res -> res.getShipResult() != null ? (long) res.getShipResult().size() : 0L // 람다 적용 + res -> res.getShipResult() != null ? (long) res.getShipResult().size() : 0L ); } - private ShipUpdateApiResponse callShipUpdateApi(){ - // 1. BatchDateService를 통해 동적 날짜 파라미터 맵 조회 + private ShipUpdateApiResponse callShipUpdateApi() { Map params = batchDateService.getDateRangeWithoutTimeParams(getApiKey()); return executeSingleApiCall( maritimeApiUrl, @@ -189,7 +264,7 @@ public class ShipDetailUpdateDataReader extends BaseApiReader { params, new ParameterizedTypeReference() {}, batchApiLogService, - res -> res.getShips() != null ? (long) res.getShips().size() : 0L // 람다 적용 + res -> res.getShips() != null ? (long) res.getShips().size() : 0L ); } @@ -197,29 +272,68 @@ public class ShipDetailUpdateDataReader extends BaseApiReader { if (response.getShips() == null) { return Collections.emptyList(); } - return response.getShips() .stream() - // ShipDto 객체에서 imoNumber 필드 (String 타입)를 추출 + return response.getShips().stream() .map(ShipDto::getImoNumber) - // IMO 번호가 null이 아닌 경우만 필터링 (선택 사항이지만 안전성을 위해) .filter(imoNumber -> imoNumber != null) - // 추출된 String imoNumber들을 List으로 수집 .collect(Collectors.toList()); } @Override protected void afterFetch(List data) { int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); - try{ + try { if (data == null) { log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", getReaderName(), allImoNumbers.size()); + + if (isRetryMode()) { + // Retry 모드: 성공 건 RESOLVED 처리 + 재실패 건 새 FAILED 레코드 저장 + log.info("[{}] [RETRY MODE] 재수집 결과 처리 시작 (sourceStepExecutionId: {})", + getReaderName(), sourceStepExecutionId); + batchFailedRecordService.resolveSuccessfulRetries( + "ShipDetailUpdateJob", sourceStepExecutionId, retryRecordKeys, failedImoNumbers); + + if (!failedImoNumbers.isEmpty()) { + log.warn("[{}] [RETRY MODE] 재실패 IMO 건수: {} 건", getReaderName(), failedImoNumbers.size()); + batchFailedRecordService.saveFailedRecords( + "ShipDetailUpdateJob", + getJobExecutionId(), + getStepExecutionId(), + failedImoNumbers, + maxRetryCount, + lastErrorMessage + ); + } else { + log.info("[{}] [RETRY MODE] 모든 재수집 건 정상 처리 완료", getReaderName()); + } + + int successCount = retryRecordKeys.size() - failedImoNumbers.size(); + log.info("[{}] [RETRY MODE] 결과: 성공 {} 건, 재실패 {} 건", + getReaderName(), successCount, failedImoNumbers.size()); + } else { + // 일반 모드: 기존 로직 + if (!failedImoNumbers.isEmpty()) { + log.warn("[{}] 최종 실패 IMO 건수: {} 건", getReaderName(), failedImoNumbers.size()); + log.warn("[{}] 실패 IMO 목록: {}", getReaderName(), failedImoNumbers); + + batchFailedRecordService.saveFailedRecords( + "ShipDetailUpdateJob", + getJobExecutionId(), + getStepExecutionId(), + failedImoNumbers, + maxRetryCount, + lastErrorMessage + ); + } else { + log.info("[{}] 모든 배치 정상 처리 완료 (실패 건 없음)", getReaderName()); + } + } } - }catch (Exception e){ + } catch (Exception e) { log.info("[{}] 전체 {} 개 배치 처리 실패", getReaderName(), totalBatches); log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", getReaderName(), allImoNumbers.size()); } } - } diff --git a/src/main/java/com/snp/batch/service/BatchFailedRecordService.java b/src/main/java/com/snp/batch/service/BatchFailedRecordService.java new file mode 100644 index 0000000..c125c60 --- /dev/null +++ b/src/main/java/com/snp/batch/service/BatchFailedRecordService.java @@ -0,0 +1,74 @@ +package com.snp.batch.service; + +import com.snp.batch.global.model.BatchFailedRecord; +import com.snp.batch.global.repository.BatchFailedRecordRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.List; + +@Service +@RequiredArgsConstructor +@Slf4j +public class BatchFailedRecordService { + + private final BatchFailedRecordRepository batchFailedRecordRepository; + + /** + * 실패한 레코드 목록을 비동기로 DB에 저장합니다. + * REQUIRES_NEW를 사용하여 메인 배치 트랜잭션과 독립적으로 저장합니다. + */ + @Async("apiLogExecutor") + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void saveFailedRecords( + String jobName, + Long jobExecutionId, + Long stepExecutionId, + List failedRecordKeys, + int retryCount, + String errorMessage + ) { + try { + List records = failedRecordKeys.stream() + .map(recordKey -> BatchFailedRecord.builder() + .jobName(jobName) + .jobExecutionId(jobExecutionId) + .stepExecutionId(stepExecutionId) + .recordKey(recordKey) + .errorMessage(errorMessage) + .retryCount(retryCount) + .status("FAILED") + .build()) + .toList(); + + batchFailedRecordRepository.saveAll(records); + log.info("실패 레코드 {} 건 저장 완료 (job: {}, executionId: {})", + records.size(), jobName, jobExecutionId); + } catch (Exception e) { + log.error("실패 레코드 저장 실패: {}", e.getMessage()); + } + } + + /** + * 재수집 성공 건을 RESOLVED로 처리합니다. + * 원본 stepExecutionId로 범위를 제한하여 해당 Step의 실패 건만 RESOLVED 처리합니다. + */ + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void resolveSuccessfulRetries(String jobName, Long sourceStepExecutionId, + List allRetryKeys, List failedAgainKeys) { + List successfulKeys = allRetryKeys.stream() + .filter(key -> !failedAgainKeys.contains(key)) + .toList(); + if (!successfulKeys.isEmpty()) { + int resolved = batchFailedRecordRepository.resolveByStepExecutionIdAndRecordKeys( + jobName, sourceStepExecutionId, successfulKeys, LocalDateTime.now()); + log.info("실패 레코드 RESOLVED 처리: {} 건 (job: {}, sourceStepExecutionId: {})", + resolved, jobName, sourceStepExecutionId); + } + } +} diff --git a/src/main/java/com/snp/batch/service/BatchService.java b/src/main/java/com/snp/batch/service/BatchService.java index 26a411c..9b1f471 100644 --- a/src/main/java/com/snp/batch/service/BatchService.java +++ b/src/main/java/com/snp/batch/service/BatchService.java @@ -3,7 +3,9 @@ package com.snp.batch.service; import com.snp.batch.common.batch.listener.RecollectionJobExecutionListener; import com.snp.batch.global.dto.*; import com.snp.batch.global.model.BatchApiLog; +import com.snp.batch.global.model.BatchFailedRecord; import com.snp.batch.global.repository.BatchApiLogRepository; +import com.snp.batch.global.repository.BatchFailedRecordRepository; import com.snp.batch.global.repository.TimelineRepository; import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; @@ -40,6 +42,7 @@ public class BatchService { private final TimelineRepository timelineRepository; private final RecollectionJobExecutionListener recollectionJobExecutionListener; private final BatchApiLogRepository apiLogRepository; + private final BatchFailedRecordRepository failedRecordRepository; @Autowired public BatchService(JobLauncher jobLauncher, @@ -49,7 +52,8 @@ public class BatchService { @Lazy ScheduleService scheduleService, TimelineRepository timelineRepository, RecollectionJobExecutionListener recollectionJobExecutionListener, - BatchApiLogRepository apiLogRepository) { + BatchApiLogRepository apiLogRepository, + BatchFailedRecordRepository failedRecordRepository) { this.jobLauncher = jobLauncher; this.jobExplorer = jobExplorer; this.jobOperator = jobOperator; @@ -58,6 +62,7 @@ public class BatchService { this.timelineRepository = timelineRepository; this.recollectionJobExecutionListener = recollectionJobExecutionListener; this.apiLogRepository = apiLogRepository; + this.failedRecordRepository = failedRecordRepository; } /** @@ -238,6 +243,20 @@ public class BatchService { com.snp.batch.global.dto.JobExecutionDetailDto.StepApiLogSummary apiLogSummary = buildStepApiLogSummary(stepExecution.getId()); + // Step별 실패 레코드 조회 + List failedRecordDtos = + failedRecordRepository.findByStepExecutionId(stepExecution.getId()).stream() + .map(record -> JobExecutionDetailDto.FailedRecordDto.builder() + .id(record.getId()) + .jobName(record.getJobName()) + .recordKey(record.getRecordKey()) + .errorMessage(record.getErrorMessage()) + .retryCount(record.getRetryCount()) + .status(record.getStatus()) + .createdAt(record.getCreatedAt()) + .build()) + .collect(Collectors.toList()); + return com.snp.batch.global.dto.JobExecutionDetailDto.StepExecutionDto.builder() .stepExecutionId(stepExecution.getId()) .stepName(stepExecution.getStepName()) @@ -257,6 +276,7 @@ public class BatchService { .duration(duration) .apiCallInfo(apiCallInfo) .apiLogSummary(apiLogSummary) + .failedRecords(failedRecordDtos.isEmpty() ? null : failedRecordDtos) .build(); } diff --git a/src/main/java/com/snp/batch/service/RecollectionHistoryService.java b/src/main/java/com/snp/batch/service/RecollectionHistoryService.java index fd0a97b..2736c2f 100644 --- a/src/main/java/com/snp/batch/service/RecollectionHistoryService.java +++ b/src/main/java/com/snp/batch/service/RecollectionHistoryService.java @@ -4,8 +4,10 @@ import com.snp.batch.global.dto.JobExecutionDetailDto; import com.snp.batch.global.model.BatchCollectionPeriod; import com.snp.batch.global.model.BatchLastExecution; import com.snp.batch.global.model.BatchRecollectionHistory; +import com.snp.batch.global.model.BatchFailedRecord; import com.snp.batch.global.repository.BatchApiLogRepository; import com.snp.batch.global.repository.BatchCollectionPeriodRepository; +import com.snp.batch.global.repository.BatchFailedRecordRepository; import com.snp.batch.global.repository.BatchLastExecutionRepository; import com.snp.batch.global.repository.BatchRecollectionHistoryRepository; import jakarta.persistence.criteria.Predicate; @@ -37,6 +39,7 @@ public class RecollectionHistoryService { private final BatchCollectionPeriodRepository periodRepository; private final BatchLastExecutionRepository lastExecutionRepository; private final BatchApiLogRepository apiLogRepository; + private final BatchFailedRecordRepository failedRecordRepository; private final JobExplorer jobExplorer; /** @@ -267,6 +270,20 @@ public class RecollectionHistoryService { JobExecutionDetailDto.StepApiLogSummary apiLogSummary = buildStepApiLogSummary(stepExecution.getId()); + // Step별 실패 레코드 조회 + List failedRecordDtos = + failedRecordRepository.findByStepExecutionId(stepExecution.getId()).stream() + .map(record -> JobExecutionDetailDto.FailedRecordDto.builder() + .id(record.getId()) + .jobName(record.getJobName()) + .recordKey(record.getRecordKey()) + .errorMessage(record.getErrorMessage()) + .retryCount(record.getRetryCount()) + .status(record.getStatus()) + .createdAt(record.getCreatedAt()) + .build()) + .collect(Collectors.toList()); + return JobExecutionDetailDto.StepExecutionDto.builder() .stepExecutionId(stepExecution.getId()) .stepName(stepExecution.getStepName()) @@ -286,6 +303,7 @@ public class RecollectionHistoryService { .duration(duration) .apiCallInfo(apiCallInfo) .apiLogSummary(apiLogSummary) + .failedRecords(failedRecordDtos.isEmpty() ? null : failedRecordDtos) .build(); } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index c98e91f..b65a905 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -17,7 +17,7 @@ spring: jpa: hibernate: ddl-auto: update - show-sql: true + show-sql: false properties: hibernate: dialect: org.hibernate.dialect.PostgreSQLDialect @@ -110,6 +110,13 @@ app: enabled: true cron: "0 0 * * * ?" # Every hour + # ShipDetailUpdate 배치 설정 (dev - 기존과 동일하게 20건 유지) + ship-detail-update: + batch-size: 20 # dev에서는 문제 없으므로 기존 20건 유지 + delay-on-success-ms: 300 + delay-on-failure-ms: 2000 + max-retry-count: 3 + # AIS Target 배치 설정 ais-target: since-seconds: 60 # API 조회 범위 (초) diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index c3577c2..f980c6c 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -17,7 +17,7 @@ spring: jpa: hibernate: ddl-auto: update - show-sql: true + show-sql: false properties: hibernate: dialect: org.hibernate.dialect.PostgreSQLDialect @@ -72,7 +72,7 @@ spring: # Server Configuration server: - port: 8041 + port: 9000 # port: 8041 servlet: context-path: /snp-api @@ -112,6 +112,13 @@ app: enabled: true cron: "0 0 * * * ?" # Every hour + # ShipDetailUpdate 배치 설정 (prod 튜닝) + ship-detail-update: + batch-size: 10 # API 요청 당 IMO 건수 (프록시 타임아웃 방지) + delay-on-success-ms: 300 # 성공 시 딜레이 (ms) + delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms) + max-retry-count: 3 # 최대 재시도 횟수 + # AIS Target 배치 설정 ais-target: since-seconds: 60 # API 조회 범위 (초) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 46f9431..78f8155 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -165,6 +165,13 @@ app: enabled: true cron: "0 0 * * * ?" # Every hour + # ShipDetailUpdate 배치 설정 + ship-detail-update: + batch-size: 10 # API 요청 당 IMO 건수 + delay-on-success-ms: 300 # 성공 시 딜레이 (ms) + delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms) + max-retry-count: 3 # 최대 재시도 횟수 + # AIS Target Import 배치 설정 (캐시 업데이트 전용) ais-target: since-seconds: 60 # API 조회 범위 (초)