feat: 재수집 실패건 재시도 프로세스 추가 및 선박제원정보 수집 최적화 #10

병합
HYOJIN feature/ISSUE-001-retry-failed-request 에서 develop 로 1 commits 를 머지했습니다 2026-02-26 17:01:39 +09:00
16개의 변경된 파일890개의 추가작업 그리고 104개의 파일을 삭제

파일 보기

@ -103,6 +103,7 @@ export interface StepExecutionDto {
duration: number | null; duration: number | null;
apiCallInfo: ApiCallInfo | null; apiCallInfo: ApiCallInfo | null;
apiLogSummary: StepApiLogSummary | null; apiLogSummary: StepApiLogSummary | null;
failedRecords?: FailedRecordDto[] | null;
} }
export interface ApiLogEntryDto { export interface ApiLogEntryDto {
@ -137,6 +138,16 @@ export interface ApiLogPageResponse {
export type ApiLogStatus = 'ALL' | 'SUCCESS' | 'ERROR'; export type ApiLogStatus = 'ALL' | 'SUCCESS' | 'ERROR';
export interface FailedRecordDto {
id: number;
jobName: string;
recordKey: string;
errorMessage: string | null;
retryCount: number;
status: string;
createdAt: string;
}
export interface JobExecutionDetailDto { export interface JobExecutionDetailDto {
executionId: number; executionId: number;
jobName: string; jobName: string;
@ -330,6 +341,15 @@ export const batchApi = {
`${BASE}/jobs/${jobName}/execute${qs}`); `${BASE}/jobs/${jobName}/execute${qs}`);
}, },
retryFailedRecords: (jobName: string, recordKeys: string[], stepExecutionId: number) => {
const qs = new URLSearchParams({
retryRecordKeys: recordKeys.join(','),
sourceStepExecutionId: String(stepExecutionId),
});
return postJson<{ success: boolean; message: string; executionId?: number }>(
`${BASE}/jobs/${jobName}/execute?${qs.toString()}`);
},
getJobExecutions: (jobName: string) => getJobExecutions: (jobName: string) =>
fetchJson<JobExecutionDto[]>(`${BASE}/jobs/${jobName}/executions`), fetchJson<JobExecutionDto[]>(`${BASE}/jobs/${jobName}/executions`),

파일 보기

@ -1,6 +1,6 @@
import { useState, useCallback, useEffect } from 'react'; import { useState, useCallback, useEffect } from 'react';
import { useParams, useSearchParams, useNavigate } from 'react-router-dom'; import { useParams, useSearchParams, useNavigate } from 'react-router-dom';
import { batchApi, type JobExecutionDetailDto, type StepExecutionDto, type ApiLogPageResponse, type ApiLogStatus } from '../api/batchApi'; import { batchApi, type JobExecutionDetailDto, type StepExecutionDto, type FailedRecordDto, type ApiLogPageResponse, type ApiLogStatus } from '../api/batchApi';
import { formatDateTime, formatDuration, calculateDuration } from '../utils/formatters'; import { formatDateTime, formatDuration, calculateDuration } from '../utils/formatters';
import { usePoller } from '../hooks/usePoller'; import { usePoller } from '../hooks/usePoller';
import StatusBadge from '../components/StatusBadge'; import StatusBadge from '../components/StatusBadge';
@ -264,9 +264,10 @@ function ApiLogSection({ stepExecutionId, summary }: ApiLogSectionProps) {
interface StepCardProps { interface StepCardProps {
step: StepExecutionDto; step: StepExecutionDto;
jobName: string;
} }
function StepCard({ step }: StepCardProps) { function StepCard({ step, jobName }: StepCardProps) {
const stats = [ const stats = [
{ label: '읽기', value: step.readCount }, { label: '읽기', value: step.readCount },
{ label: '쓰기', value: step.writeCount }, { label: '쓰기', value: step.writeCount },
@ -380,6 +381,11 @@ function StepCard({ step }: StepCardProps) {
</div> </div>
)} )}
{/* 호출 실패 데이터 토글 */}
{step.failedRecords && step.failedRecords.length > 0 && (
<FailedRecordsToggle records={step.failedRecords} jobName={jobName} stepExecutionId={step.stepExecutionId} />
)}
{step.exitMessage && ( {step.exitMessage && (
<div className="mt-4 rounded-lg bg-red-50 border border-red-200 p-3"> <div className="mt-4 rounded-lg bg-red-50 border border-red-200 p-3">
<p className="text-xs font-medium text-red-700 mb-1">Exit Message</p> <p className="text-xs font-medium text-red-700 mb-1">Exit Message</p>
@ -578,6 +584,7 @@ export default function ExecutionDetail() {
<StepCard <StepCard
key={step.stepExecutionId} key={step.stepExecutionId}
step={step} step={step}
jobName={detail.jobName}
/> />
))} ))}
</div> </div>
@ -587,6 +594,161 @@ export default function ExecutionDetail() {
); );
} }
function FailedRecordsToggle({ records, jobName, stepExecutionId }: { records: FailedRecordDto[]; jobName: string; stepExecutionId: number }) {
const [open, setOpen] = useState(false);
const [showConfirm, setShowConfirm] = useState(false);
const [retrying, setRetrying] = useState(false);
const navigate = useNavigate();
const failedRecords = records.filter((r) => r.status === 'FAILED');
const statusColor = (status: string) => {
switch (status) {
case 'RESOLVED': return 'text-emerald-600 bg-emerald-50';
case 'RETRY_PENDING': return 'text-amber-600 bg-amber-50';
default: return 'text-red-600 bg-red-50';
}
};
const handleRetry = async () => {
setRetrying(true);
try {
const keys = failedRecords.map((r) => r.recordKey);
const result = await batchApi.retryFailedRecords(jobName, keys, stepExecutionId);
if (result.success && result.executionId) {
setShowConfirm(false);
navigate(`/executions/${result.executionId}`);
}
} catch {
alert('재수집 실행에 실패했습니다.');
} finally {
setRetrying(false);
}
};
return (
<div className="mt-4 rounded-lg bg-red-50 border border-red-200 p-3">
<div className="flex items-center justify-between">
<button
onClick={() => setOpen((v) => !v)}
className="inline-flex items-center gap-1 text-xs font-medium text-red-600 hover:text-red-800 transition-colors"
>
<svg
className={`w-3 h-3 transition-transform ${open ? 'rotate-90' : ''}`}
fill="none" viewBox="0 0 24 24" stroke="currentColor"
>
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M9 5l7 7-7 7" />
</svg>
({records.length.toLocaleString()})
</button>
{failedRecords.length > 0 && (
<button
onClick={() => setShowConfirm(true)}
className="inline-flex items-center gap-1 px-2.5 py-1 text-xs font-medium text-white bg-red-500 hover:bg-red-600 rounded-md transition-colors"
>
<svg className="w-3 h-3" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15" />
</svg>
({failedRecords.length})
</button>
)}
</div>
{open && (
<div className="mt-2 overflow-x-auto max-h-80 overflow-y-auto">
<table className="w-full text-xs text-left">
<thead className="bg-red-100 text-red-700 sticky top-0">
<tr>
<th className="px-2 py-1.5 font-medium">Record Key</th>
<th className="px-2 py-1.5 font-medium"> </th>
<th className="px-2 py-1.5 font-medium text-center"></th>
<th className="px-2 py-1.5 font-medium text-center"></th>
<th className="px-2 py-1.5 font-medium"> </th>
</tr>
</thead>
<tbody className="divide-y divide-red-100">
{records.map((record) => (
<tr
key={record.id}
className="bg-white hover:bg-red-50"
>
<td className="px-2 py-1.5 font-mono text-red-900">
{record.recordKey}
</td>
<td className="px-2 py-1.5 text-red-600 max-w-[200px] truncate" title={record.errorMessage || ''}>
{record.errorMessage || '-'}
</td>
<td className="px-2 py-1.5 text-center text-red-900">
{record.retryCount}
</td>
<td className="px-2 py-1.5 text-center">
<span className={`inline-flex px-1.5 py-0.5 text-[10px] font-medium rounded-full ${statusColor(record.status)}`}>
{record.status}
</span>
</td>
<td className="px-2 py-1.5 text-red-500 whitespace-nowrap">
{formatDateTime(record.createdAt)}
</td>
</tr>
))}
</tbody>
</table>
</div>
)}
{/* 확인 다이얼로그 */}
{showConfirm && (
<div className="fixed inset-0 z-50 flex items-center justify-center bg-black/40">
<div className="bg-white rounded-xl shadow-2xl p-6 w-full max-w-md mx-4">
<h3 className="text-lg font-semibold text-wing-text mb-2">
</h3>
<p className="text-sm text-wing-muted mb-3">
{failedRecords.length} IMO에 .
</p>
<div className="bg-gray-50 rounded-lg p-3 mb-4 max-h-40 overflow-y-auto">
<div className="flex flex-wrap gap-1">
{failedRecords.map((r) => (
<span
key={r.id}
className="inline-flex px-2 py-0.5 text-xs font-mono bg-red-100 text-red-700 rounded"
>
{r.recordKey}
</span>
))}
</div>
</div>
<div className="flex justify-end gap-2">
<button
onClick={() => setShowConfirm(false)}
disabled={retrying}
className="px-4 py-2 text-sm font-medium text-wing-muted bg-gray-100 hover:bg-gray-200 rounded-lg transition-colors disabled:opacity-50"
>
</button>
<button
onClick={handleRetry}
disabled={retrying}
className="px-4 py-2 text-sm font-medium text-white bg-red-500 hover:bg-red-600 rounded-lg transition-colors disabled:opacity-50 inline-flex items-center gap-1"
>
{retrying ? (
<>
<div className="h-3.5 w-3.5 animate-spin rounded-full border-2 border-white border-t-transparent" />
...
</>
) : (
'재수집 실행'
)}
</button>
</div>
</div>
</div>
)}
</div>
);
}
function InfoItem({ label, value }: { label: string; value: string }) { function InfoItem({ label, value }: { label: string; value: string }) {
return ( return (
<div> <div>

파일 보기

@ -4,6 +4,7 @@ import {
batchApi, batchApi,
type RecollectionDetailResponse, type RecollectionDetailResponse,
type StepExecutionDto, type StepExecutionDto,
type FailedRecordDto,
type ApiLogPageResponse, type ApiLogPageResponse,
type ApiLogStatus, type ApiLogStatus,
} from '../api/batchApi'; } from '../api/batchApi';
@ -266,7 +267,7 @@ function ApiLogSection({ stepExecutionId, summary }: ApiLogSectionProps) {
); );
} }
function StepCard({ step }: { step: StepExecutionDto }) { function StepCard({ step, jobName }: { step: StepExecutionDto; jobName: string }) {
const stats = [ const stats = [
{ label: '읽기', value: step.readCount }, { label: '읽기', value: step.readCount },
{ label: '쓰기', value: step.writeCount }, { label: '쓰기', value: step.writeCount },
@ -358,6 +359,11 @@ function StepCard({ step }: { step: StepExecutionDto }) {
</div> </div>
)} )}
{/* 호출 실패 데이터 토글 */}
{step.failedRecords && step.failedRecords.length > 0 && (
<FailedRecordsToggle records={step.failedRecords} jobName={jobName} stepExecutionId={step.stepExecutionId} />
)}
{step.exitMessage && ( {step.exitMessage && (
<div className="mt-4 rounded-lg bg-red-50 border border-red-200 p-3"> <div className="mt-4 rounded-lg bg-red-50 border border-red-200 p-3">
<p className="text-xs font-medium text-red-700 mb-1">Exit Message</p> <p className="text-xs font-medium text-red-700 mb-1">Exit Message</p>
@ -630,7 +636,7 @@ export default function RecollectDetail() {
) : ( ) : (
<div className="space-y-4"> <div className="space-y-4">
{stepExecutions.map((step) => ( {stepExecutions.map((step) => (
<StepCard key={step.stepExecutionId} step={step} /> <StepCard key={step.stepExecutionId} step={step} jobName={history.jobName} />
))} ))}
</div> </div>
)} )}
@ -639,6 +645,161 @@ export default function RecollectDetail() {
); );
} }
function FailedRecordsToggle({ records, jobName, stepExecutionId }: { records: FailedRecordDto[]; jobName: string; stepExecutionId: number }) {
const [open, setOpen] = useState(false);
const [showConfirm, setShowConfirm] = useState(false);
const [retrying, setRetrying] = useState(false);
const navigate = useNavigate();
const failedRecords = records.filter((r) => r.status === 'FAILED');
const statusColor = (status: string) => {
switch (status) {
case 'RESOLVED': return 'text-emerald-600 bg-emerald-50';
case 'RETRY_PENDING': return 'text-amber-600 bg-amber-50';
default: return 'text-red-600 bg-red-50';
}
};
const handleRetry = async () => {
setRetrying(true);
try {
const keys = failedRecords.map((r) => r.recordKey);
const result = await batchApi.retryFailedRecords(jobName, keys, stepExecutionId);
if (result.success && result.executionId) {
setShowConfirm(false);
navigate(`/executions/${result.executionId}`);
}
} catch {
alert('재수집 실행에 실패했습니다.');
} finally {
setRetrying(false);
}
};
return (
<div className="mt-4 rounded-lg bg-red-50 border border-red-200 p-3">
<div className="flex items-center justify-between">
<button
onClick={() => setOpen((v) => !v)}
className="inline-flex items-center gap-1 text-xs font-medium text-red-600 hover:text-red-800 transition-colors"
>
<svg
className={`w-3 h-3 transition-transform ${open ? 'rotate-90' : ''}`}
fill="none" viewBox="0 0 24 24" stroke="currentColor"
>
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M9 5l7 7-7 7" />
</svg>
({records.length.toLocaleString()})
</button>
{failedRecords.length > 0 && (
<button
onClick={() => setShowConfirm(true)}
className="inline-flex items-center gap-1 px-2.5 py-1 text-xs font-medium text-white bg-red-500 hover:bg-red-600 rounded-md transition-colors"
>
<svg className="w-3 h-3" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15" />
</svg>
({failedRecords.length})
</button>
)}
</div>
{open && (
<div className="mt-2 overflow-x-auto max-h-80 overflow-y-auto">
<table className="w-full text-xs text-left">
<thead className="bg-red-100 text-red-700 sticky top-0">
<tr>
<th className="px-2 py-1.5 font-medium">Record Key</th>
<th className="px-2 py-1.5 font-medium"> </th>
<th className="px-2 py-1.5 font-medium text-center"></th>
<th className="px-2 py-1.5 font-medium text-center"></th>
<th className="px-2 py-1.5 font-medium"> </th>
</tr>
</thead>
<tbody className="divide-y divide-red-100">
{records.map((record) => (
<tr
key={record.id}
className="bg-white hover:bg-red-50"
>
<td className="px-2 py-1.5 font-mono text-red-900">
{record.recordKey}
</td>
<td className="px-2 py-1.5 text-red-600 max-w-[200px] truncate" title={record.errorMessage || ''}>
{record.errorMessage || '-'}
</td>
<td className="px-2 py-1.5 text-center text-red-900">
{record.retryCount}
</td>
<td className="px-2 py-1.5 text-center">
<span className={`inline-flex px-1.5 py-0.5 text-[10px] font-medium rounded-full ${statusColor(record.status)}`}>
{record.status}
</span>
</td>
<td className="px-2 py-1.5 text-red-500 whitespace-nowrap">
{formatDateTime(record.createdAt)}
</td>
</tr>
))}
</tbody>
</table>
</div>
)}
{/* 확인 다이얼로그 */}
{showConfirm && (
<div className="fixed inset-0 z-50 flex items-center justify-center bg-black/40">
<div className="bg-white rounded-xl shadow-2xl p-6 w-full max-w-md mx-4">
<h3 className="text-lg font-semibold text-wing-text mb-2">
</h3>
<p className="text-sm text-wing-muted mb-3">
{failedRecords.length} IMO에 .
</p>
<div className="bg-gray-50 rounded-lg p-3 mb-4 max-h-40 overflow-y-auto">
<div className="flex flex-wrap gap-1">
{failedRecords.map((r) => (
<span
key={r.id}
className="inline-flex px-2 py-0.5 text-xs font-mono bg-red-100 text-red-700 rounded"
>
{r.recordKey}
</span>
))}
</div>
</div>
<div className="flex justify-end gap-2">
<button
onClick={() => setShowConfirm(false)}
disabled={retrying}
className="px-4 py-2 text-sm font-medium text-wing-muted bg-gray-100 hover:bg-gray-200 rounded-lg transition-colors disabled:opacity-50"
>
</button>
<button
onClick={handleRetry}
disabled={retrying}
className="px-4 py-2 text-sm font-medium text-white bg-red-500 hover:bg-red-600 rounded-lg transition-colors disabled:opacity-50 inline-flex items-center gap-1"
>
{retrying ? (
<>
<div className="h-3.5 w-3.5 animate-spin rounded-full border-2 border-white border-t-transparent" />
...
</>
) : (
'재수집 실행'
)}
</button>
</div>
</div>
</div>
)}
</div>
);
}
function InfoItem({ label, value }: { label: string; value: string }) { function InfoItem({ label, value }: { label: string; value: string }) {
return ( return (
<div> <div>

파일 보기

@ -89,6 +89,14 @@ public abstract class BaseApiReader<T> implements ItemReader<T> {
this.jobExecutionId = jobExecutionId; this.jobExecutionId = jobExecutionId;
this.stepExecutionId = stepExecutionId; this.stepExecutionId = stepExecutionId;
} }
protected Long getJobExecutionId() {
return this.jobExecutionId;
}
protected Long getStepExecutionId() {
return this.stepExecutionId;
}
/** /**
* 기본 생성자 (WebClient 없이 사용 - Mock 데이터용) * 기본 생성자 (WebClient 없이 사용 - Mock 데이터용)
*/ */

파일 보기

@ -1,10 +1,15 @@
package com.snp.batch.global.config; package com.snp.batch.global.config;
import io.netty.channel.ChannelOption;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import java.time.Duration;
/** /**
* Maritime API WebClient 설정 * Maritime API WebClient 설정
@ -59,12 +64,17 @@ public class MaritimeApiWebClientConfig {
log.info("Base URL: {}", maritimeApiUrl); log.info("Base URL: {}", maritimeApiUrl);
log.info("========================================"); log.info("========================================");
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) // 연결 타임아웃 10초
.responseTimeout(Duration.ofSeconds(60)); // 응답 대기 60초
return WebClient.builder() return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.baseUrl(maritimeApiUrl) .baseUrl(maritimeApiUrl)
.defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword)) .defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword))
.codecs(configurer -> configurer .codecs(configurer -> configurer
.defaultCodecs() .defaultCodecs()
.maxInMemorySize(100 * 1024 * 1024)) // 30MB 버퍼 .maxInMemorySize(100 * 1024 * 1024)) // 100MB 버퍼
.build(); .build();
} }

파일 보기

@ -70,6 +70,7 @@ public class JobExecutionDetailDto {
private Long duration; // 실행 시간 (ms) private Long duration; // 실행 시간 (ms)
private ApiCallInfo apiCallInfo; // API 호출 정보 - StepExecutionContext 기반 (옵셔널) private ApiCallInfo apiCallInfo; // API 호출 정보 - StepExecutionContext 기반 (옵셔널)
private StepApiLogSummary apiLogSummary; // API 호출 로그 요약 - batch_api_log 기반 (옵셔널) private StepApiLogSummary apiLogSummary; // API 호출 로그 요약 - batch_api_log 기반 (옵셔널)
private List<FailedRecordDto> failedRecords; // 실패 레코드 (옵셔널)
} }
/** /**
@ -107,6 +108,23 @@ public class JobExecutionDetailDto {
private Long totalRecordCount; // 반환 건수 private Long totalRecordCount; // 반환 건수
} }
/**
* 실패 레코드 DTO
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class FailedRecordDto {
private Long id;
private String jobName;
private String recordKey;
private String errorMessage;
private Integer retryCount;
private String status;
private LocalDateTime createdAt;
}
/** /**
* API 호출 로그 페이징 응답 * API 호출 로그 페이징 응답
*/ */

파일 보기

@ -0,0 +1,46 @@
package com.snp.batch.global.model;
import jakarta.persistence.*;
import lombok.*;
import org.hibernate.annotations.CreationTimestamp;
import java.time.LocalDateTime;
@Entity
@Table(name = "batch_failed_record", schema = "t_std_snp_data")
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Builder
public class BatchFailedRecord {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private String jobName;
private Long jobExecutionId;
private Long stepExecutionId;
@Column(nullable = false)
private String recordKey;
@Column(columnDefinition = "TEXT")
private String errorMessage;
@Column(nullable = false)
private Integer retryCount;
@Column(nullable = false, length = 20)
@Builder.Default
private String status = "FAILED";
@CreationTimestamp
@Column(updatable = false)
private LocalDateTime createdAt;
private LocalDateTime resolvedAt;
}

파일 보기

@ -0,0 +1,48 @@
package com.snp.batch.global.repository;
import com.snp.batch.global.model.BatchFailedRecord;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import java.time.LocalDateTime;
import java.util.List;
@Repository
public interface BatchFailedRecordRepository extends JpaRepository<BatchFailedRecord, Long> {
/**
* 특정 Job의 상태별 실패 조회
*/
List<BatchFailedRecord> findByJobNameAndStatus(String jobName, String status);
/**
* 실행별 실패 레코드 조회
*/
List<BatchFailedRecord> findByJobExecutionId(Long jobExecutionId);
/**
* Step별 실패 레코드 조회
*/
List<BatchFailedRecord> findByStepExecutionId(Long stepExecutionId);
/**
* 실행별 실패 건수
*/
long countByJobExecutionId(Long jobExecutionId);
/**
* 특정 Step 실행의 실패 레코드를 RESOLVED로 벌크 업데이트
*/
@Modifying
@Query("UPDATE BatchFailedRecord r SET r.status = 'RESOLVED', r.resolvedAt = :resolvedAt " +
"WHERE r.jobName = :jobName AND r.stepExecutionId = :stepExecutionId " +
"AND r.recordKey IN :recordKeys AND r.status = 'FAILED'")
int resolveByStepExecutionIdAndRecordKeys(
@Param("jobName") String jobName,
@Param("stepExecutionId") Long stepExecutionId,
@Param("recordKeys") List<String> recordKeys,
@Param("resolvedAt") LocalDateTime resolvedAt);
}

파일 보기

@ -9,11 +9,16 @@ import com.snp.batch.jobs.shipdetail.batch.reader.ShipDetailUpdateDataReader;
import com.snp.batch.jobs.shipdetail.batch.writer.ShipDetailDataWriter; import com.snp.batch.jobs.shipdetail.batch.writer.ShipDetailDataWriter;
import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchApiLogService;
import com.snp.batch.service.BatchDateService; import com.snp.batch.service.BatchDateService;
import com.snp.batch.service.BatchFailedRecordService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.core.step.tasklet.Tasklet;
@ -29,6 +34,9 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.util.Arrays;
import java.util.List;
@Slf4j @Slf4j
@Configuration @Configuration
public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetailDto, ShipDetailEntity> { public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetailDto, ShipDetailEntity> {
@ -41,6 +49,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
private final ObjectMapper objectMapper; // ObjectMapper 주입 추가 private final ObjectMapper objectMapper; // ObjectMapper 주입 추가
private final BatchDateService batchDateService; private final BatchDateService batchDateService;
private final BatchApiLogService batchApiLogService; private final BatchApiLogService batchApiLogService;
private final BatchFailedRecordService batchFailedRecordService;
@Value("${app.batch.ship-api.url}") @Value("${app.batch.ship-api.url}")
private String maritimeApiUrl; private String maritimeApiUrl;
@ -48,6 +57,18 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
@Value("${app.batch.target-schema.name}") @Value("${app.batch.target-schema.name}")
private String targetSchema; private String targetSchema;
@Value("${app.batch.ship-detail-update.batch-size:10}")
private int shipDetailBatchSize;
@Value("${app.batch.ship-detail-update.delay-on-success-ms:300}")
private long delayOnSuccessMs;
@Value("${app.batch.ship-detail-update.delay-on-failure-ms:2000}")
private long delayOnFailureMs;
@Value("${app.batch.ship-detail-update.max-retry-count:3}")
private int maxRetryCount;
protected String getApiKey() {return "SHIP_DETAIL_UPDATE_API";} protected String getApiKey() {return "SHIP_DETAIL_UPDATE_API";}
protected String getBatchUpdateSql() { protected String getBatchUpdateSql() {
return String.format("UPDATE %s.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", targetSchema, getApiKey());} return String.format("UPDATE %s.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", targetSchema, getApiKey());}
@ -63,16 +84,18 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
@Qualifier("maritimeApiWebClient") WebClient maritimeApiWebClient, @Qualifier("maritimeApiWebClient") WebClient maritimeApiWebClient,
ObjectMapper objectMapper, ObjectMapper objectMapper,
BatchDateService batchDateService, BatchDateService batchDateService,
BatchApiLogService batchApiLogService) { // ObjectMapper 주입 추가 BatchApiLogService batchApiLogService,
BatchFailedRecordService batchFailedRecordService) {
super(jobRepository, transactionManager); super(jobRepository, transactionManager);
this.shipDetailDataProcessor = shipDetailDataProcessor; this.shipDetailDataProcessor = shipDetailDataProcessor;
this.shipDetailDataWriter = shipDetailDataWriter; this.shipDetailDataWriter = shipDetailDataWriter;
this.shipDetailUpdateDataReader = shipDetailUpdateDataReader; this.shipDetailUpdateDataReader = shipDetailUpdateDataReader;
this.jdbcTemplate = jdbcTemplate; this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient; this.maritimeApiWebClient = maritimeApiWebClient;
this.objectMapper = objectMapper; // ObjectMapper 초기화 this.objectMapper = objectMapper;
this.batchDateService = batchDateService; this.batchDateService = batchDateService;
this.batchApiLogService = batchApiLogService; this.batchApiLogService = batchApiLogService;
this.batchFailedRecordService = batchFailedRecordService;
} }
@Override @Override
@ -89,18 +112,61 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetail
protected Job createJobFlow(JobBuilder jobBuilder) { protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder return jobBuilder
.start(ShipDetailUpdateStep()) .start(ShipDetailUpdateStep())
.next(shipDetailLastExecutionUpdateStep()) .next(retryModeDecider())
.on("RETRY").end()
.from(retryModeDecider()).on("NORMAL").to(shipDetailLastExecutionUpdateStep())
.end()
.build(); .build();
} }
/**
* Retry 모드 판별 Decider
* retryRecordKeys 파라미터가 존재하면 RETRY, 없으면 NORMAL 반환
*/
@Bean
public JobExecutionDecider retryModeDecider() {
return (jobExecution, stepExecution) -> {
String retryKeys = jobExecution.getJobParameters().getString("retryRecordKeys");
if (retryKeys != null && !retryKeys.isBlank()) {
log.info("[ShipDetailUpdateJob] Decider: RETRY 모드 - LAST_EXECUTION 업데이트 스킵");
return new FlowExecutionStatus("RETRY");
}
log.info("[ShipDetailUpdateJob] Decider: NORMAL 모드 - LAST_EXECUTION 업데이트 진행");
return new FlowExecutionStatus("NORMAL");
};
}
@Bean @Bean
@StepScope @StepScope
public ShipDetailUpdateDataReader shipDetailUpdateDataReader( public ShipDetailUpdateDataReader shipDetailUpdateDataReader(
@Value("#{stepExecution.jobExecution.id}") Long jobExecutionId, // SpEL로 ID 추출 @Value("#{stepExecution.jobExecution.id}") Long jobExecutionId,
@Value("#{stepExecution.id}") Long stepExecutionId @Value("#{stepExecution.id}") Long stepExecutionId,
@Value("#{jobParameters['retryRecordKeys']}") String retryRecordKeysParam,
@Value("#{jobParameters['sourceStepExecutionId']}") String sourceStepExecutionIdParam
) { ) {
ShipDetailUpdateDataReader reader = new ShipDetailUpdateDataReader(maritimeApiWebClient, jdbcTemplate, objectMapper, batchDateService, batchApiLogService, maritimeApiUrl); ShipDetailUpdateDataReader reader = new ShipDetailUpdateDataReader(
reader.setExecutionIds(jobExecutionId, stepExecutionId); // ID 세팅 maritimeApiWebClient, jdbcTemplate, objectMapper,
batchDateService, batchApiLogService, batchFailedRecordService, maritimeApiUrl,
shipDetailBatchSize, delayOnSuccessMs, delayOnFailureMs, maxRetryCount
);
reader.setExecutionIds(jobExecutionId, stepExecutionId);
// Retry 모드: retryRecordKeys 파라미터가 있으면 주입
if (retryRecordKeysParam != null && !retryRecordKeysParam.isBlank()) {
List<String> retryKeys = Arrays.stream(retryRecordKeysParam.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.toList();
reader.setRetryRecordKeys(retryKeys);
if (sourceStepExecutionIdParam != null && !sourceStepExecutionIdParam.isBlank()) {
reader.setSourceStepExecutionId(Long.parseLong(sourceStepExecutionIdParam));
}
log.info("[ShipDetailUpdateJob] Retry 모드 활성화: {} 건의 IMO 대상, sourceStepExecutionId: {}",
retryKeys.size(), sourceStepExecutionIdParam);
}
return reader; return reader;
} }

파일 보기

@ -9,6 +9,7 @@ import com.snp.batch.jobs.shipdetail.batch.dto.ShipResultDto;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipUpdateApiResponse; import com.snp.batch.jobs.shipdetail.batch.dto.ShipUpdateApiResponse;
import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchApiLogService;
import com.snp.batch.service.BatchDateService; import com.snp.batch.service.BatchDateService;
import com.snp.batch.service.BatchFailedRecordService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ParameterizedTypeReference;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
@ -20,85 +21,130 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> { public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
private final BatchDateService batchDateService; // BatchDateService 필드 추가 private final BatchDateService batchDateService;
private final BatchApiLogService batchApiLogService; private final BatchApiLogService batchApiLogService;
private final BatchFailedRecordService batchFailedRecordService;
private final String maritimeApiUrl; private final String maritimeApiUrl;
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
// 배치 처리 상태
// 외부 설정값
private final int batchSize;
private final long delayOnSuccessMs;
private final long delayOnFailureMs;
private final int maxRetryCount;
// 배치 처리 상태
private List<String> allImoNumbers; private List<String> allImoNumbers;
// DB 해시값을 저장할
private int currentBatchIndex = 0; private int currentBatchIndex = 0;
private final int batchSize = 20;
public ShipDetailUpdateDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, ObjectMapper objectMapper,BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeApiUrl) { // 실패 IMO 추적
private final List<String> failedImoNumbers = new ArrayList<>();
private String lastErrorMessage;
// Retry 모드
private List<String> retryRecordKeys;
private Long sourceStepExecutionId;
public ShipDetailUpdateDataReader(
WebClient webClient,
JdbcTemplate jdbcTemplate,
ObjectMapper objectMapper,
BatchDateService batchDateService,
BatchApiLogService batchApiLogService,
BatchFailedRecordService batchFailedRecordService,
String maritimeApiUrl,
int batchSize,
long delayOnSuccessMs,
long delayOnFailureMs,
int maxRetryCount
) {
super(webClient); super(webClient);
this.jdbcTemplate = jdbcTemplate; this.jdbcTemplate = jdbcTemplate;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
this.batchDateService = batchDateService; this.batchDateService = batchDateService;
this.batchApiLogService = batchApiLogService; this.batchApiLogService = batchApiLogService;
this.batchFailedRecordService = batchFailedRecordService;
this.maritimeApiUrl = maritimeApiUrl; this.maritimeApiUrl = maritimeApiUrl;
enableChunkMode(); // Chunk 모드 활성화 this.batchSize = batchSize;
this.delayOnSuccessMs = delayOnSuccessMs;
this.delayOnFailureMs = delayOnFailureMs;
this.maxRetryCount = maxRetryCount;
enableChunkMode();
}
public void setRetryRecordKeys(List<String> retryRecordKeys) {
this.retryRecordKeys = retryRecordKeys;
}
public void setSourceStepExecutionId(Long sourceStepExecutionId) {
this.sourceStepExecutionId = sourceStepExecutionId;
}
private boolean isRetryMode() {
return retryRecordKeys != null && !retryRecordKeys.isEmpty();
} }
@Override @Override
protected String getReaderName() { protected String getReaderName() {
return "ShipDetailUpdateDataReader"; return "ShipDetailUpdateDataReader";
} }
protected String getShipUpdateApiPath(){ return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipChangesByLastUpdateDateRange"; }
protected String getShipUpdateApiPath() {
return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipChangesByLastUpdateDateRange";
}
@Override @Override
protected String getApiPath() { protected String getApiPath() {
return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipsByIHSLRorIMONumbersAll"; return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipsByIHSLRorIMONumbersAll";
} }
protected String getApiKey() {return "SHIP_DETAIL_UPDATE_API";}
protected String getApiKey() {
return "SHIP_DETAIL_UPDATE_API";
}
@Override @Override
protected void resetCustomState() { protected void resetCustomState() {
this.currentBatchIndex = 0; this.currentBatchIndex = 0;
this.allImoNumbers = null; this.allImoNumbers = null;
this.failedImoNumbers.clear();
this.lastErrorMessage = null;
// retryRecordKeys는 JobConfig에서 주입하므로 초기화하지 않음
} }
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override @Override
protected void beforeFetch() { protected void beforeFetch() {
// 💡 Step 1. 기간 변경된 IMO 번호 리스트 조회 if (isRetryMode()) {
log.info("[{}] 변경된 IMO 번호 조회 시작...", getReaderName()); log.info("[{}] [RETRY MODE] 실패 건 재수집 모드 시작 - 대상 IMO: {} 건",
ShipUpdateApiResponse response = callShipUpdateApi(); getReaderName(), retryRecordKeys.size());
allImoNumbers = extractUpdateImoNumbers(response); allImoNumbers = new ArrayList<>(retryRecordKeys);
log.info("[{}] 변경된 IMO 번호 수: {} 개", getReaderName(), response.getShipCount()); log.info("[{}] [RETRY MODE] IMO 목록: {}", getReaderName(), allImoNumbers);
} else {
log.info("[{}] 변경된 IMO 번호 조회 시작...", getReaderName());
ShipUpdateApiResponse response = callShipUpdateApi();
allImoNumbers = extractUpdateImoNumbers(response);
log.info("[{}] 변경된 IMO 번호 수: {} 개", getReaderName(), response.getShipCount());
log.info("[{}] 총 {} 개의 변경된 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
}
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 변경된 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); log.info("[{}] 설정: batch-size={}, delay-success={}ms, delay-failure={}ms, max-retry={}",
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); getReaderName(), batchSize, delayOnSuccessMs, delayOnFailureMs, maxRetryCount);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
// API 통계 초기화
updateApiCallStats(totalBatches, 0); updateApiCallStats(totalBatches, 0);
} }
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
*
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override @Override
protected List<ShipDetailDto> fetchNextBatch() throws Exception { protected List<ShipDetailDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료 return null;
} }
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex; int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex); List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1; int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
@ -107,65 +153,95 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try { // 다음 배치로 인덱스 이동 (성공/실패 무관하게 진행)
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...") currentBatchIndex = endIndex;
String imoParam = String.join(",", currentBatch);
// API 호출 String imoParam = String.join(",", currentBatch);
ShipDetailApiResponse response = callApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동 // Retry with exponential backoff
currentBatchIndex = endIndex; ShipDetailApiResponse response = callApiWithRetry(imoParam, currentBatch, currentBatchNumber, totalBatches);
// 응답 처리 // API 호출 통계 업데이트
if (response != null && response.getShipResult() != null) { updateApiCallStats(totalBatches, currentBatchNumber);
List<ShipDetailDto> shipDetailDtoList = response.getShipResult().stream() if (response != null && response.getShipResult() != null) {
.map(ShipResultDto::getShipDetails) // result -> result.getShipDetail() List<ShipDetailDto> shipDetailDtoList = response.getShipResult().stream()
.filter(Objects::nonNull) // 데이터가 없는 경우 제외 .map(ShipResultDto::getShipDetails)
.collect(Collectors.toList()); .filter(Objects::nonNull)
.collect(Collectors.toList());
log.info("[{}] 배치 {}/{} 완료: {} 건 조회", log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, shipDetailDtoList.size()); getReaderName(), currentBatchNumber, totalBatches, shipDetailDtoList.size());
// API 호출 통계 업데이트 // 성공 딜레이
updateApiCallStats(totalBatches, currentBatchNumber); sleepIfNeeded(delayOnSuccessMs);
// API 과부하 방지 (다음 배치 0.5초 대기) return shipDetailDtoList;
if (currentBatchIndex < allImoNumbers.size()) { } else {
Thread.sleep(500); log.warn("[{}] 배치 {}/{} 응답 없음", getReaderName(), currentBatchNumber, totalBatches);
}
return shipDetailDtoList;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList(); return Collections.emptyList();
} }
} }
/** /**
* Query Parameter를 사용한 API 호출 * Retry with exponential backoff
* * 최대 maxRetryCount 재시도, 대기: 2초 4초 8초
* @param imoNumbers 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...")
* @return API 응답
*/ */
private ShipDetailApiResponse callApiWithRetry(
String imoParam,
List<String> currentBatch,
int currentBatchNumber,
int totalBatches
) {
Exception lastException = null;
for (int attempt = 1; attempt <= maxRetryCount; attempt++) {
try {
ShipDetailApiResponse response = callApiWithBatch(imoParam);
if (attempt > 1) {
log.info("[{}] 배치 {}/{} 재시도 {}/{} 성공",
getReaderName(), currentBatchNumber, totalBatches, attempt, maxRetryCount);
}
return response;
} catch (Exception e) {
lastException = e;
log.warn("[{}] 배치 {}/{} 재시도 {}/{} 실패: {}",
getReaderName(), currentBatchNumber, totalBatches,
attempt, maxRetryCount, e.getMessage());
if (attempt < maxRetryCount) {
long backoffMs = delayOnFailureMs * (1L << (attempt - 1)); // 2s, 4s, 8s
log.info("[{}] {}ms 후 재시도...", getReaderName(), backoffMs);
sleepIfNeeded(backoffMs);
}
}
}
// 모든 재시도 실패 - 실패 IMO 기록
failedImoNumbers.addAll(currentBatch);
lastErrorMessage = lastException != null ? lastException.getMessage() : "unknown";
log.error("[{}] 배치 {}/{} 최종 실패 ({}회 재시도 소진). 실패 IMO {} 건 기록: {}",
getReaderName(), currentBatchNumber, totalBatches, maxRetryCount,
currentBatch.size(), lastErrorMessage);
// 실패 딜레이
sleepIfNeeded(delayOnFailureMs);
return null;
}
private void sleepIfNeeded(long ms) {
if (currentBatchIndex < allImoNumbers.size()) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private ShipDetailApiResponse callApiWithBatch(String imoNumbers) { private ShipDetailApiResponse callApiWithBatch(String imoNumbers) {
Map<String, String> params = new HashMap<>(); Map<String, String> params = new HashMap<>();
params.put("IMONumbers", imoNumbers); params.put("IMONumbers", imoNumbers);
@ -176,12 +252,11 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
params, params,
new ParameterizedTypeReference<ShipDetailApiResponse>() {}, new ParameterizedTypeReference<ShipDetailApiResponse>() {},
batchApiLogService, batchApiLogService,
res -> res.getShipResult() != null ? (long) res.getShipResult().size() : 0L // 람다 적용 res -> res.getShipResult() != null ? (long) res.getShipResult().size() : 0L
); );
} }
private ShipUpdateApiResponse callShipUpdateApi(){ private ShipUpdateApiResponse callShipUpdateApi() {
// 1. BatchDateService를 통해 동적 날짜 파라미터 조회
Map<String, String> params = batchDateService.getDateRangeWithoutTimeParams(getApiKey()); Map<String, String> params = batchDateService.getDateRangeWithoutTimeParams(getApiKey());
return executeSingleApiCall( return executeSingleApiCall(
maritimeApiUrl, maritimeApiUrl,
@ -189,7 +264,7 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
params, params,
new ParameterizedTypeReference<ShipUpdateApiResponse>() {}, new ParameterizedTypeReference<ShipUpdateApiResponse>() {},
batchApiLogService, batchApiLogService,
res -> res.getShips() != null ? (long) res.getShips().size() : 0L // 람다 적용 res -> res.getShips() != null ? (long) res.getShips().size() : 0L
); );
} }
@ -197,29 +272,68 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailDto> {
if (response.getShips() == null) { if (response.getShips() == null) {
return Collections.emptyList(); return Collections.emptyList();
} }
return response.getShips() .stream() return response.getShips().stream()
// ShipDto 객체에서 imoNumber 필드 (String 타입) 추출
.map(ShipDto::getImoNumber) .map(ShipDto::getImoNumber)
// IMO 번호가 null이 아닌 경우만 필터링 (선택 사항이지만 안전성을 위해)
.filter(imoNumber -> imoNumber != null) .filter(imoNumber -> imoNumber != null)
// 추출된 String imoNumber들을 List<String>으로 수집
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@Override @Override
protected void afterFetch(List<ShipDetailDto> data) { protected void afterFetch(List<ShipDetailDto> data) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
try{ try {
if (data == null) { if (data == null) {
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size()); getReaderName(), allImoNumbers.size());
if (isRetryMode()) {
// Retry 모드: 성공 RESOLVED 처리 + 재실패 FAILED 레코드 저장
log.info("[{}] [RETRY MODE] 재수집 결과 처리 시작 (sourceStepExecutionId: {})",
getReaderName(), sourceStepExecutionId);
batchFailedRecordService.resolveSuccessfulRetries(
"ShipDetailUpdateJob", sourceStepExecutionId, retryRecordKeys, failedImoNumbers);
if (!failedImoNumbers.isEmpty()) {
log.warn("[{}] [RETRY MODE] 재실패 IMO 건수: {} 건", getReaderName(), failedImoNumbers.size());
batchFailedRecordService.saveFailedRecords(
"ShipDetailUpdateJob",
getJobExecutionId(),
getStepExecutionId(),
failedImoNumbers,
maxRetryCount,
lastErrorMessage
);
} else {
log.info("[{}] [RETRY MODE] 모든 재수집 건 정상 처리 완료", getReaderName());
}
int successCount = retryRecordKeys.size() - failedImoNumbers.size();
log.info("[{}] [RETRY MODE] 결과: 성공 {} 건, 재실패 {} 건",
getReaderName(), successCount, failedImoNumbers.size());
} else {
// 일반 모드: 기존 로직
if (!failedImoNumbers.isEmpty()) {
log.warn("[{}] 최종 실패 IMO 건수: {} 건", getReaderName(), failedImoNumbers.size());
log.warn("[{}] 실패 IMO 목록: {}", getReaderName(), failedImoNumbers);
batchFailedRecordService.saveFailedRecords(
"ShipDetailUpdateJob",
getJobExecutionId(),
getStepExecutionId(),
failedImoNumbers,
maxRetryCount,
lastErrorMessage
);
} else {
log.info("[{}] 모든 배치 정상 처리 완료 (실패 건 없음)", getReaderName());
}
}
} }
}catch (Exception e){ } catch (Exception e) {
log.info("[{}] 전체 {} 개 배치 처리 실패", getReaderName(), totalBatches); log.info("[{}] 전체 {} 개 배치 처리 실패", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size()); getReaderName(), allImoNumbers.size());
} }
} }
} }

파일 보기

@ -0,0 +1,74 @@
package com.snp.batch.service;
import com.snp.batch.global.model.BatchFailedRecord;
import com.snp.batch.global.repository.BatchFailedRecordRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
@Service
@RequiredArgsConstructor
@Slf4j
public class BatchFailedRecordService {
private final BatchFailedRecordRepository batchFailedRecordRepository;
/**
* 실패한 레코드 목록을 비동기로 DB에 저장합니다.
* REQUIRES_NEW를 사용하여 메인 배치 트랜잭션과 독립적으로 저장합니다.
*/
@Async("apiLogExecutor")
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void saveFailedRecords(
String jobName,
Long jobExecutionId,
Long stepExecutionId,
List<String> failedRecordKeys,
int retryCount,
String errorMessage
) {
try {
List<BatchFailedRecord> records = failedRecordKeys.stream()
.map(recordKey -> BatchFailedRecord.builder()
.jobName(jobName)
.jobExecutionId(jobExecutionId)
.stepExecutionId(stepExecutionId)
.recordKey(recordKey)
.errorMessage(errorMessage)
.retryCount(retryCount)
.status("FAILED")
.build())
.toList();
batchFailedRecordRepository.saveAll(records);
log.info("실패 레코드 {} 건 저장 완료 (job: {}, executionId: {})",
records.size(), jobName, jobExecutionId);
} catch (Exception e) {
log.error("실패 레코드 저장 실패: {}", e.getMessage());
}
}
/**
* 재수집 성공 건을 RESOLVED로 처리합니다.
* 원본 stepExecutionId로 범위를 제한하여 해당 Step의 실패 건만 RESOLVED 처리합니다.
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void resolveSuccessfulRetries(String jobName, Long sourceStepExecutionId,
List<String> allRetryKeys, List<String> failedAgainKeys) {
List<String> successfulKeys = allRetryKeys.stream()
.filter(key -> !failedAgainKeys.contains(key))
.toList();
if (!successfulKeys.isEmpty()) {
int resolved = batchFailedRecordRepository.resolveByStepExecutionIdAndRecordKeys(
jobName, sourceStepExecutionId, successfulKeys, LocalDateTime.now());
log.info("실패 레코드 RESOLVED 처리: {} 건 (job: {}, sourceStepExecutionId: {})",
resolved, jobName, sourceStepExecutionId);
}
}
}

파일 보기

@ -3,7 +3,9 @@ package com.snp.batch.service;
import com.snp.batch.common.batch.listener.RecollectionJobExecutionListener; import com.snp.batch.common.batch.listener.RecollectionJobExecutionListener;
import com.snp.batch.global.dto.*; import com.snp.batch.global.dto.*;
import com.snp.batch.global.model.BatchApiLog; import com.snp.batch.global.model.BatchApiLog;
import com.snp.batch.global.model.BatchFailedRecord;
import com.snp.batch.global.repository.BatchApiLogRepository; import com.snp.batch.global.repository.BatchApiLogRepository;
import com.snp.batch.global.repository.BatchFailedRecordRepository;
import com.snp.batch.global.repository.TimelineRepository; import com.snp.batch.global.repository.TimelineRepository;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -40,6 +42,7 @@ public class BatchService {
private final TimelineRepository timelineRepository; private final TimelineRepository timelineRepository;
private final RecollectionJobExecutionListener recollectionJobExecutionListener; private final RecollectionJobExecutionListener recollectionJobExecutionListener;
private final BatchApiLogRepository apiLogRepository; private final BatchApiLogRepository apiLogRepository;
private final BatchFailedRecordRepository failedRecordRepository;
@Autowired @Autowired
public BatchService(JobLauncher jobLauncher, public BatchService(JobLauncher jobLauncher,
@ -49,7 +52,8 @@ public class BatchService {
@Lazy ScheduleService scheduleService, @Lazy ScheduleService scheduleService,
TimelineRepository timelineRepository, TimelineRepository timelineRepository,
RecollectionJobExecutionListener recollectionJobExecutionListener, RecollectionJobExecutionListener recollectionJobExecutionListener,
BatchApiLogRepository apiLogRepository) { BatchApiLogRepository apiLogRepository,
BatchFailedRecordRepository failedRecordRepository) {
this.jobLauncher = jobLauncher; this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer; this.jobExplorer = jobExplorer;
this.jobOperator = jobOperator; this.jobOperator = jobOperator;
@ -58,6 +62,7 @@ public class BatchService {
this.timelineRepository = timelineRepository; this.timelineRepository = timelineRepository;
this.recollectionJobExecutionListener = recollectionJobExecutionListener; this.recollectionJobExecutionListener = recollectionJobExecutionListener;
this.apiLogRepository = apiLogRepository; this.apiLogRepository = apiLogRepository;
this.failedRecordRepository = failedRecordRepository;
} }
/** /**
@ -238,6 +243,20 @@ public class BatchService {
com.snp.batch.global.dto.JobExecutionDetailDto.StepApiLogSummary apiLogSummary = com.snp.batch.global.dto.JobExecutionDetailDto.StepApiLogSummary apiLogSummary =
buildStepApiLogSummary(stepExecution.getId()); buildStepApiLogSummary(stepExecution.getId());
// Step별 실패 레코드 조회
List<JobExecutionDetailDto.FailedRecordDto> failedRecordDtos =
failedRecordRepository.findByStepExecutionId(stepExecution.getId()).stream()
.map(record -> JobExecutionDetailDto.FailedRecordDto.builder()
.id(record.getId())
.jobName(record.getJobName())
.recordKey(record.getRecordKey())
.errorMessage(record.getErrorMessage())
.retryCount(record.getRetryCount())
.status(record.getStatus())
.createdAt(record.getCreatedAt())
.build())
.collect(Collectors.toList());
return com.snp.batch.global.dto.JobExecutionDetailDto.StepExecutionDto.builder() return com.snp.batch.global.dto.JobExecutionDetailDto.StepExecutionDto.builder()
.stepExecutionId(stepExecution.getId()) .stepExecutionId(stepExecution.getId())
.stepName(stepExecution.getStepName()) .stepName(stepExecution.getStepName())
@ -257,6 +276,7 @@ public class BatchService {
.duration(duration) .duration(duration)
.apiCallInfo(apiCallInfo) .apiCallInfo(apiCallInfo)
.apiLogSummary(apiLogSummary) .apiLogSummary(apiLogSummary)
.failedRecords(failedRecordDtos.isEmpty() ? null : failedRecordDtos)
.build(); .build();
} }

파일 보기

@ -4,8 +4,10 @@ import com.snp.batch.global.dto.JobExecutionDetailDto;
import com.snp.batch.global.model.BatchCollectionPeriod; import com.snp.batch.global.model.BatchCollectionPeriod;
import com.snp.batch.global.model.BatchLastExecution; import com.snp.batch.global.model.BatchLastExecution;
import com.snp.batch.global.model.BatchRecollectionHistory; import com.snp.batch.global.model.BatchRecollectionHistory;
import com.snp.batch.global.model.BatchFailedRecord;
import com.snp.batch.global.repository.BatchApiLogRepository; import com.snp.batch.global.repository.BatchApiLogRepository;
import com.snp.batch.global.repository.BatchCollectionPeriodRepository; import com.snp.batch.global.repository.BatchCollectionPeriodRepository;
import com.snp.batch.global.repository.BatchFailedRecordRepository;
import com.snp.batch.global.repository.BatchLastExecutionRepository; import com.snp.batch.global.repository.BatchLastExecutionRepository;
import com.snp.batch.global.repository.BatchRecollectionHistoryRepository; import com.snp.batch.global.repository.BatchRecollectionHistoryRepository;
import jakarta.persistence.criteria.Predicate; import jakarta.persistence.criteria.Predicate;
@ -37,6 +39,7 @@ public class RecollectionHistoryService {
private final BatchCollectionPeriodRepository periodRepository; private final BatchCollectionPeriodRepository periodRepository;
private final BatchLastExecutionRepository lastExecutionRepository; private final BatchLastExecutionRepository lastExecutionRepository;
private final BatchApiLogRepository apiLogRepository; private final BatchApiLogRepository apiLogRepository;
private final BatchFailedRecordRepository failedRecordRepository;
private final JobExplorer jobExplorer; private final JobExplorer jobExplorer;
/** /**
@ -267,6 +270,20 @@ public class RecollectionHistoryService {
JobExecutionDetailDto.StepApiLogSummary apiLogSummary = JobExecutionDetailDto.StepApiLogSummary apiLogSummary =
buildStepApiLogSummary(stepExecution.getId()); buildStepApiLogSummary(stepExecution.getId());
// Step별 실패 레코드 조회
List<JobExecutionDetailDto.FailedRecordDto> failedRecordDtos =
failedRecordRepository.findByStepExecutionId(stepExecution.getId()).stream()
.map(record -> JobExecutionDetailDto.FailedRecordDto.builder()
.id(record.getId())
.jobName(record.getJobName())
.recordKey(record.getRecordKey())
.errorMessage(record.getErrorMessage())
.retryCount(record.getRetryCount())
.status(record.getStatus())
.createdAt(record.getCreatedAt())
.build())
.collect(Collectors.toList());
return JobExecutionDetailDto.StepExecutionDto.builder() return JobExecutionDetailDto.StepExecutionDto.builder()
.stepExecutionId(stepExecution.getId()) .stepExecutionId(stepExecution.getId())
.stepName(stepExecution.getStepName()) .stepName(stepExecution.getStepName())
@ -286,6 +303,7 @@ public class RecollectionHistoryService {
.duration(duration) .duration(duration)
.apiCallInfo(apiCallInfo) .apiCallInfo(apiCallInfo)
.apiLogSummary(apiLogSummary) .apiLogSummary(apiLogSummary)
.failedRecords(failedRecordDtos.isEmpty() ? null : failedRecordDtos)
.build(); .build();
} }

파일 보기

@ -17,7 +17,7 @@ spring:
jpa: jpa:
hibernate: hibernate:
ddl-auto: update ddl-auto: update
show-sql: true show-sql: false
properties: properties:
hibernate: hibernate:
dialect: org.hibernate.dialect.PostgreSQLDialect dialect: org.hibernate.dialect.PostgreSQLDialect
@ -110,6 +110,13 @@ app:
enabled: true enabled: true
cron: "0 0 * * * ?" # Every hour cron: "0 0 * * * ?" # Every hour
# ShipDetailUpdate 배치 설정 (dev - 기존과 동일하게 20건 유지)
ship-detail-update:
batch-size: 20 # dev에서는 문제 없으므로 기존 20건 유지
delay-on-success-ms: 300
delay-on-failure-ms: 2000
max-retry-count: 3
# AIS Target 배치 설정 # AIS Target 배치 설정
ais-target: ais-target:
since-seconds: 60 # API 조회 범위 (초) since-seconds: 60 # API 조회 범위 (초)

파일 보기

@ -17,7 +17,7 @@ spring:
jpa: jpa:
hibernate: hibernate:
ddl-auto: update ddl-auto: update
show-sql: true show-sql: false
properties: properties:
hibernate: hibernate:
dialect: org.hibernate.dialect.PostgreSQLDialect dialect: org.hibernate.dialect.PostgreSQLDialect
@ -72,7 +72,7 @@ spring:
# Server Configuration # Server Configuration
server: server:
port: 8041 port: 9000
# port: 8041 # port: 8041
servlet: servlet:
context-path: /snp-api context-path: /snp-api
@ -112,6 +112,13 @@ app:
enabled: true enabled: true
cron: "0 0 * * * ?" # Every hour cron: "0 0 * * * ?" # Every hour
# ShipDetailUpdate 배치 설정 (prod 튜닝)
ship-detail-update:
batch-size: 10 # API 요청 당 IMO 건수 (프록시 타임아웃 방지)
delay-on-success-ms: 300 # 성공 시 딜레이 (ms)
delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms)
max-retry-count: 3 # 최대 재시도 횟수
# AIS Target 배치 설정 # AIS Target 배치 설정
ais-target: ais-target:
since-seconds: 60 # API 조회 범위 (초) since-seconds: 60 # API 조회 범위 (초)

파일 보기

@ -165,6 +165,13 @@ app:
enabled: true enabled: true
cron: "0 0 * * * ?" # Every hour cron: "0 0 * * * ?" # Every hour
# ShipDetailUpdate 배치 설정
ship-detail-update:
batch-size: 10 # API 요청 당 IMO 건수
delay-on-success-ms: 300 # 성공 시 딜레이 (ms)
delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms)
max-retry-count: 3 # 최대 재시도 횟수
# AIS Target Import 배치 설정 (캐시 업데이트 전용) # AIS Target Import 배치 설정 (캐시 업데이트 전용)
ais-target: ais-target:
since-seconds: 60 # API 조회 범위 (초) since-seconds: 60 # API 조회 범위 (초)