refactor: 미사용 배치 작업 13개 제거 (#40) #44

병합
HYOJIN feature/ISSUE-40-remove-unused-jobs 에서 develop 로 2 commits 를 머지했습니다 2026-03-13 13:02:13 +09:00
35개의 변경된 파일2개의 추가작업 그리고 4070개의 파일을 삭제

파일 보기

@ -46,6 +46,7 @@
- 실패 레코드 Upsert 패턴 적용 (동일 키 중복 방지) - 실패 레코드 Upsert 패턴 적용 (동일 키 중복 방지)
- 재시도 상태 배지 표시 (대기/재시도 N/3/재시도 초과) - 재시도 상태 배지 표시 (대기/재시도 N/3/재시도 초과)
- 미사용 Dead Code 정리 (~1,200 LOC 삭제) - 미사용 Dead Code 정리 (~1,200 LOC 삭제)
- 미사용 배치 작업 13개 제거 (~4,000 LOC 삭제) (#40)
### 기타 ### 기타
- Gitea 팀 프로젝트 워크플로우 구조 적용 - Gitea 팀 프로젝트 워크플로우 구조 적용

파일 보기

@ -20,8 +20,7 @@ import java.time.Duration;
* - 설정 변경 곳에서만 수정 * - 설정 변경 곳에서만 수정
* *
* 사용 Job: * 사용 Job:
* - shipDataImportJob: IMO 번호 조회 * - 도메인 Job에서 공통으로 재사용
* - shipDetailImportJob: 선박 상세 정보 조회
* *
* 다른 API 서버 추가 : * 다른 API 서버 추가 :
* - 새로운 Config 클래스 생성 (: OtherApiWebClientConfig) * - 새로운 Config 클래스 생성 (: OtherApiWebClientConfig)

파일 보기

@ -1,90 +0,0 @@
package com.snp.batch.jobs.compliance.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.compliance.batch.dto.ComplianceDto;
import com.snp.batch.jobs.compliance.batch.entity.ComplianceEntity;
import com.snp.batch.jobs.compliance.batch.processor.ComplianceDataProcessor;
import com.snp.batch.jobs.compliance.batch.reader.ComplianceDataReader;
import com.snp.batch.jobs.compliance.batch.writer.ComplianceDataWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
@Slf4j
@Configuration
public class ComplianceImportJobConfig extends BaseJobConfig<ComplianceDto, ComplianceEntity> {
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeServiceApiWebClient;
private final ComplianceDataProcessor complianceDataProcessor;
private final ComplianceDataWriter complianceDataWriter;
@Value("${app.batch.target-schema.name}")
private String targetSchema;
@Override
protected int getChunkSize() {
return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
}
public ComplianceImportJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ComplianceDataProcessor complianceDataProcessor,
ComplianceDataWriter complianceDataWriter,
JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient")WebClient maritimeServiceApiWebClient) {
super(jobRepository, transactionManager);
this.jdbcTemplate = jdbcTemplate;
this.maritimeServiceApiWebClient = maritimeServiceApiWebClient;
this.complianceDataProcessor = complianceDataProcessor;
this.complianceDataWriter = complianceDataWriter;
}
@Override
protected String getJobName() {
return "ComplianceImportJob";
}
@Override
protected String getStepName() {
return "ComplianceImportStep";
}
@Override
protected ItemReader<ComplianceDto> createReader() {
return new ComplianceDataReader(maritimeServiceApiWebClient, jdbcTemplate, targetSchema);
}
@Override
protected ItemProcessor<ComplianceDto, ComplianceEntity> createProcessor() {
return complianceDataProcessor;
}
@Override
protected ItemWriter<ComplianceEntity> createWriter() {
return complianceDataWriter;
}
@Bean(name = "ComplianceImportJob")
public Job complianceImportJob() {
return job();
}
@Bean(name = "ComplianceImportStep")
public Step complianceImportStep() {
return step();
}
}

파일 보기

@ -1,150 +0,0 @@
package com.snp.batch.jobs.compliance.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.compliance.batch.dto.ComplianceDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
@Slf4j
public class ComplianceDataReader extends BaseApiReader<ComplianceDto> {
//TODO :
// 1. Core20 IMO_NUMBER 전체 조회
// 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복)
// 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복)
private final JdbcTemplate jdbcTemplate;
private final String targetSchema;
private List<String> allImoNumbers;
private int currentBatchIndex = 0;
private final int batchSize = 100;
public ComplianceDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, String targetSchema) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
this.targetSchema = targetSchema;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "ComplianceDataReader";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
}
@Override
protected String getApiPath() {
return "/RiskAndCompliance/CompliancesByImos";
}
private String getTargetTable(){
return targetSchema + ".ship_data";
}
private String getImoQuery() {
return "select imo_number as ihslrorimoshipno from " + getTargetTable() + " order by imo_number";
}
@Override
protected void beforeFetch(){
log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(getImoQuery(), String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
updateApiCallStats(totalBatches, 0);
}
@Override
protected List<ComplianceDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
List<ComplianceDto> response = callAisApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null) {
// List<ComplianceDto> targets = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, response.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return response;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
private List<ComplianceDto> callAisApiWithBatch(String imoNumbers) {
String url = getApiPath() + "?imos=" + imoNumbers;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<List<ComplianceDto>>() {})
.block();
}
}

파일 보기

@ -2,7 +2,6 @@ package com.snp.batch.jobs.facility.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.facility.batch.dto.PortDto; import com.snp.batch.jobs.facility.batch.dto.PortDto;
import com.snp.batch.jobs.shipimport.batch.dto.ShipApiResponse;
import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchApiLogService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ParameterizedTypeReference;

파일 보기

@ -1,104 +0,0 @@
package com.snp.batch.jobs.movement.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.movement.batch.entity.AnchorageCallsEntity;
import com.snp.batch.jobs.movement.batch.processor.AnchorageCallsProcessor;
import com.snp.batch.jobs.movement.batch.reader.AnchorageCallsReader;
import com.snp.batch.jobs.movement.batch.writer.AnchorageCallsWriter;
import com.snp.batch.jobs.movement.batch.dto.AnchorageCallsDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* AnchorageCallsReader (ship_data Maritime API)
* (AnchorageCallsDto)
* AnchorageCallsProcessor
* (AnchorageCallsEntity)
* AnchorageCallsWriter
* (t_anchoragecall 테이블)
*/
@Slf4j
@Configuration
public class AnchorageCallsJobConfig extends BaseJobConfig<AnchorageCallsDto, AnchorageCallsEntity> {
private final AnchorageCallsProcessor anchorageCallsProcessor;
private final AnchorageCallsWriter anchorageCallsWriter;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient;
public AnchorageCallsJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
AnchorageCallsProcessor anchorageCallsProcessor,
AnchorageCallsWriter anchorageCallsWriter, JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient
) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager);
this.anchorageCallsProcessor = anchorageCallsProcessor;
this.anchorageCallsWriter = anchorageCallsWriter;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient;
}
@Override
protected String getJobName() {
return "AnchorageCallsImportJob";
}
@Override
protected String getStepName() {
return "AnchorageCallsImportStep";
}
@Override
protected ItemReader<AnchorageCallsDto> createReader() { // 타입 변경
return new AnchorageCallsReader(maritimeApiWebClient, jdbcTemplate);
}
@Override
protected ItemProcessor<AnchorageCallsDto, AnchorageCallsEntity> createProcessor() {
return anchorageCallsProcessor;
}
@Override
protected ItemWriter<AnchorageCallsEntity> createWriter() { // 타입 변경
return anchorageCallsWriter;
}
@Override
protected int getChunkSize() {
return 50; // API에서 100개씩 가져오므로 chunk도 100으로 설정
}
@Bean(name = "AnchorageCallsImportJob")
public Job anchorageCallsImportJob() {
return job();
}
@Bean(name = "AnchorageCallsImportStep")
public Step anchorageCallsImportStep() {
return step();
}
}

파일 보기

@ -1,107 +0,0 @@
package com.snp.batch.jobs.movement.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.movement.batch.entity.BerthCallsEntity;
import com.snp.batch.jobs.movement.batch.processor.BerthCallsProcessor;
import com.snp.batch.jobs.movement.batch.writer.BerthCallsWriter;
import com.snp.batch.jobs.movement.batch.dto.BerthCallsDto;
import com.snp.batch.jobs.movement.batch.reader.BerthCallsReader;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* ShipMovementReader (ship_data Maritime API)
* (PortCallDto)
* ShipMovementProcessor
* (ShipMovementEntity)
* ShipDetailDataWriter
* (ship_movement 테이블)
*/
@Slf4j
@Configuration
public class BerthCallsJobConfig extends BaseJobConfig<BerthCallsDto, BerthCallsEntity> {
private final BerthCallsProcessor berthCallsProcessor;
private final BerthCallsWriter berthCallsWriter;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient;
private final ObjectMapper objectMapper; // ObjectMapper 주입 추가
public BerthCallsJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
BerthCallsProcessor berthCallsProcessor,
BerthCallsWriter berthCallsWriter, JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient,
ObjectMapper objectMapper) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager);
this.berthCallsProcessor = berthCallsProcessor;
this.berthCallsWriter = berthCallsWriter;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient;
this.objectMapper = objectMapper; // ObjectMapper 초기화
}
@Override
protected String getJobName() {
return "BerthCallsImportJob";
}
@Override
protected String getStepName() {
return "BerthCallsImportStep";
}
@Override
protected ItemReader<BerthCallsDto> createReader() { // 타입 변경
return new BerthCallsReader(maritimeApiWebClient, jdbcTemplate);
}
@Override
protected ItemProcessor<BerthCallsDto, BerthCallsEntity> createProcessor() {
return berthCallsProcessor;
}
@Override
protected ItemWriter<BerthCallsEntity> createWriter() { // 타입 변경
return berthCallsWriter;
}
@Override
protected int getChunkSize() {
return 200; // API에서 100개씩 가져오므로 chunk도 100으로 설정
}
@Bean(name = "BerthCallsImportJob")
public Job berthCallsImportJob() {
return job();
}
@Bean(name = "BerthCallsImportStep")
public Step berthCallsImportStep() {
return step();
}
}

파일 보기

@ -1,103 +0,0 @@
package com.snp.batch.jobs.movement.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.movement.batch.processor.CurrentlyAtProcessor;
import com.snp.batch.jobs.movement.batch.reader.CurrentlyAtReader;
import com.snp.batch.jobs.movement.batch.writer.CurrentlyAtWriter;
import com.snp.batch.jobs.movement.batch.dto.CurrentlyAtDto;
import com.snp.batch.jobs.movement.batch.entity.CurrentlyAtEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* CurrentlyAtReader (ship_data Maritime API)
* (CurrentlyAtDto)
* CurrentlyAtProcessor
* (CurrentlyAtEntity)
* CurrentlyAtWriter
* (currentlyat 테이블)
*/
@Slf4j
@Configuration
public class CurrentlyAtJobConfig extends BaseJobConfig<CurrentlyAtDto, CurrentlyAtEntity> {
private final CurrentlyAtProcessor currentlyAtProcessor;
private final CurrentlyAtWriter currentlyAtWriter;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient;
public CurrentlyAtJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
CurrentlyAtProcessor currentlyAtProcessor,
CurrentlyAtWriter currentlyAtWriter, JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager);
this.currentlyAtProcessor = currentlyAtProcessor;
this.currentlyAtWriter = currentlyAtWriter;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient;
}
@Override
protected String getJobName() {
return "CurrentlyAtImportJob";
}
@Override
protected String getStepName() {
return "CurrentlyAtImportStep";
}
@Override
protected ItemReader<CurrentlyAtDto> createReader() { // 타입 변경
return new CurrentlyAtReader(maritimeApiWebClient, jdbcTemplate);
}
@Override
protected ItemProcessor<CurrentlyAtDto, CurrentlyAtEntity> createProcessor() {
return currentlyAtProcessor;
}
@Override
protected ItemWriter<CurrentlyAtEntity> createWriter() { // 타입 변경
return currentlyAtWriter;
}
@Override
protected int getChunkSize() {
return 50; // API에서 100개씩 가져오므로 chunk도 100으로 설정
}
@Bean(name = "CurrentlyAtImportJob")
public Job currentlyAtImportJob() {
return job();
}
@Bean(name = "CurrentlyAtImportStep")
public Step currentlyAtImportStep() {
return step();
}
}

파일 보기

@ -1,103 +0,0 @@
package com.snp.batch.jobs.movement.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.movement.batch.entity.DestinationEntity;
import com.snp.batch.jobs.movement.batch.processor.DestinationProcessor;
import com.snp.batch.jobs.movement.batch.reader.DestinationReader;
import com.snp.batch.jobs.movement.batch.writer.DestinationWriter;
import com.snp.batch.jobs.movement.batch.dto.DestinationDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* DestinationReader (ship_data Maritime API)
* (DestinationDto)
* DestinationProcessor
* (DestinationEntity)
* DestinationProcessor
* (t_destination 테이블)
*/
@Slf4j
@Configuration
public class DestinationsJobConfig extends BaseJobConfig<DestinationDto, DestinationEntity> {
private final DestinationProcessor destinationProcessor;
private final DestinationWriter destinationWriter;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient;
public DestinationsJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
DestinationProcessor destinationProcessor,
DestinationWriter destinationWriter, JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager);
this.destinationProcessor = destinationProcessor;
this.destinationWriter = destinationWriter;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient;
}
@Override
protected String getJobName() {
return "DestinationsImportJob";
}
@Override
protected String getStepName() {
return "DestinationsImportStep";
}
@Override
protected ItemReader<DestinationDto> createReader() { // 타입 변경
return new DestinationReader(maritimeApiWebClient, jdbcTemplate);
}
@Override
protected ItemProcessor<DestinationDto, DestinationEntity> createProcessor() {
return destinationProcessor;
}
@Override
protected ItemWriter<DestinationEntity> createWriter() { // 타입 변경
return destinationWriter;
}
@Override
protected int getChunkSize() {
return 1000; // API에서 100개씩 가져오므로 chunk도 100으로 설정
}
@Bean(name = "DestinationsImportJob")
public Job destinationsImportJob() {
return job();
}
@Bean(name = "DestinationsImportStep")
public Step destinationsImportStep() {
return step();
}
}

파일 보기

@ -1,132 +0,0 @@
package com.snp.batch.jobs.movement.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.movement.batch.dto.PortCallsDto;
import com.snp.batch.jobs.movement.batch.entity.PortCallsEntity;
import com.snp.batch.jobs.movement.batch.processor.PortCallsProcessor;
import com.snp.batch.jobs.movement.batch.reader.PortCallsReader;
import com.snp.batch.jobs.movement.batch.writer.PortCallsWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* PortCallsReader (ship_data Maritime API)
* (PortCallDto)
* PortCallsProcessor
* (PortCallsEntity)
* ShipDetailDataWriter
* (ship_movement 테이블)
*/
@Slf4j
@Configuration
public class ShipPortCallsJobConfig extends BaseJobConfig<PortCallsDto, PortCallsEntity> {
private final PortCallsProcessor portCallsProcessor;
private final PortCallsWriter portCallsWriter;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient;
private final ObjectMapper objectMapper; // ObjectMapper 주입 추가
public ShipPortCallsJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
PortCallsProcessor portCallsProcessor,
PortCallsWriter portCallsWriter, JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient,
ObjectMapper objectMapper) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager);
this.portCallsProcessor = portCallsProcessor;
this.portCallsWriter = portCallsWriter;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient;
this.objectMapper = objectMapper; // ObjectMapper 초기화
}
@Override
protected String getJobName() {
return "PortCallsImportJob";
}
@Override
protected String getStepName() {
return "PortCallsImportStep";
}
@Bean
@StepScope
public PortCallsReader portCallsReader(
@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['stopDate']}") String stopDate) {
if (startDate == null || startDate.isBlank() ||
stopDate == null || stopDate.isBlank()) {
LocalDate yesterday = LocalDate.now().minusDays(1);
startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
}
PortCallsReader reader = new PortCallsReader(maritimeApiWebClient, jdbcTemplate, objectMapper);
reader.setStartDate(startDate);
reader.setStopDate(stopDate);
return reader;
}
@Override
protected ItemReader<PortCallsDto> createReader() { // 타입 변경
// Reader 생성자 수정: ObjectMapper를 전달합니다.
return portCallsReader( null, null);
//return new PortCallsReader(maritimeApiWebClient, jdbcTemplate, objectMapper);
}
@Override
protected ItemProcessor<PortCallsDto, PortCallsEntity> createProcessor() {
return portCallsProcessor;
}
@Override
protected ItemWriter<PortCallsEntity> createWriter() { // 타입 변경
return portCallsWriter;
}
@Override
protected int getChunkSize() {
return 1000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
}
@Bean(name = "PortCallsImportJob")
public Job portCallsImportJob() {
return job();
}
@Bean(name = "PortCallsImportStep")
public Step portCallsImportStep() {
return step();
}
}

파일 보기

@ -1,104 +0,0 @@
package com.snp.batch.jobs.movement.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.movement.batch.processor.StsOperationProcessor;
import com.snp.batch.jobs.movement.batch.reader.StsOperationReader;
import com.snp.batch.jobs.movement.batch.writer.StsOperationWriter;
import com.snp.batch.jobs.movement.batch.dto.StsOperationDto;
import com.snp.batch.jobs.movement.batch.entity.StsOperationEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* StsOperationReader (ship_data Maritime API)
* (StsOperationDto)
* StsOperationProcessor
* (StsOperationEntity)
* StsOperationWriter
* (t_stsoperation 테이블)
*/
@Slf4j
@Configuration
public class StsOperationJobConfig extends BaseJobConfig<StsOperationDto, StsOperationEntity> {
private final StsOperationProcessor stsOperationProcessor;
private final StsOperationWriter stsOperationWriter;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient;
public StsOperationJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
StsOperationProcessor stsOperationProcessor,
StsOperationWriter stsOperationWriter, JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager);
this.stsOperationProcessor = stsOperationProcessor;
this.stsOperationWriter = stsOperationWriter;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient;
}
@Override
protected String getJobName() {
return "STSOperationImportJob";
}
@Override
protected String getStepName() {
return "STSOperationImportStep";
}
@Override
protected ItemReader<StsOperationDto> createReader() { // 타입 변경
// Reader 생성자 수정: ObjectMapper를 전달합니다.
return new StsOperationReader(maritimeApiWebClient, jdbcTemplate);
}
@Override
protected ItemProcessor<StsOperationDto, StsOperationEntity> createProcessor() {
return stsOperationProcessor;
}
@Override
protected ItemWriter<StsOperationEntity> createWriter() { // 타입 변경
return stsOperationWriter;
}
@Override
protected int getChunkSize() {
return 200; // API에서 100개씩 가져오므로 chunk도 100으로 설정
}
@Bean(name = "STSOperationImportJob")
public Job stsOperationImportJob() {
return job();
}
@Bean(name = "STSOperationImportStep")
public Step stsOperationImportStep() {
return step();
}
}

파일 보기

@ -1,103 +0,0 @@
package com.snp.batch.jobs.movement.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.movement.batch.entity.TerminalCallsEntity;
import com.snp.batch.jobs.movement.batch.processor.TerminalCallsProcessor;
import com.snp.batch.jobs.movement.batch.reader.TerminalCallsReader;
import com.snp.batch.jobs.movement.batch.writer.TerminalCallsWriter;
import com.snp.batch.jobs.movement.batch.dto.TerminalCallsDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* TerminalCallsReader (ship_data Maritime API)
* (TerminalCallsDto)
* TerminalCallsProcessor
* (TerminalCallsEntity)
* TerminalCallsWriter
* (t_terminalcall 테이블)
*/
@Slf4j
@Configuration
public class TerminalCallsJobConfig extends BaseJobConfig<TerminalCallsDto, TerminalCallsEntity> {
private final TerminalCallsProcessor terminalCallsProcessor;
private final TerminalCallsWriter terminalCallsWriter;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient;
public TerminalCallsJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
TerminalCallsProcessor terminalCallsProcessor,
TerminalCallsWriter terminalCallsWriter, JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager);
this.terminalCallsProcessor = terminalCallsProcessor;
this.terminalCallsWriter = terminalCallsWriter;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient;
}
@Override
protected String getJobName() {
return "TerminalCallsImportJob";
}
@Override
protected String getStepName() {
return "TerminalCallImportStep";
}
@Override
protected ItemReader<TerminalCallsDto> createReader() { // 타입 변경
return new TerminalCallsReader(maritimeApiWebClient, jdbcTemplate);
}
@Override
protected ItemProcessor<TerminalCallsDto, TerminalCallsEntity> createProcessor() {
return terminalCallsProcessor;
}
@Override
protected ItemWriter<TerminalCallsEntity> createWriter() { // 타입 변경
return terminalCallsWriter;
}
@Override
protected int getChunkSize() {
return 1000; // API에서 100개씩 가져오므로 chunk도 100으로 설정
}
@Bean(name = "TerminalCallsImportJob")
public Job terminalCallsImportJob() {
return job();
}
@Bean(name = "TerminalCallImportStep")
public Step terminalCallImportStep() {
return step();
}
}

파일 보기

@ -1,103 +0,0 @@
package com.snp.batch.jobs.movement.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.movement.batch.dto.TransitsDto;
import com.snp.batch.jobs.movement.batch.entity.TransitsEntity;
import com.snp.batch.jobs.movement.batch.processor.TransitsProcessor;
import com.snp.batch.jobs.movement.batch.reader.TransitsReader;
import com.snp.batch.jobs.movement.batch.writer.TransitsWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* TransitsReader (ship_data Maritime API)
* (TransitsDto)
* TransitsProcessor
* (TransitsEntity)
* TransitsWriter
* (t_transit 테이블)
*/
@Slf4j
@Configuration
public class TransitsJobConfig extends BaseJobConfig<TransitsDto, TransitsEntity> {
private final TransitsProcessor transitsProcessor;
private final TransitsWriter transitsWriter;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient;
public TransitsJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
TransitsProcessor TransitsProcessor,
TransitsWriter transitsWriter, JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager);
this.transitsProcessor = TransitsProcessor;
this.transitsWriter = transitsWriter;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient;
}
@Override
protected String getJobName() {
return "TransitsImportJob";
}
@Override
protected String getStepName() {
return "TransitsImportStep";
}
@Override
protected ItemReader<TransitsDto> createReader() { // 타입 변경
return new TransitsReader(maritimeApiWebClient, jdbcTemplate);
}
@Override
protected ItemProcessor<TransitsDto, TransitsEntity> createProcessor() {
return transitsProcessor;
}
@Override
protected ItemWriter<TransitsEntity> createWriter() { // 타입 변경
return transitsWriter;
}
@Override
protected int getChunkSize() {
return 1000; // API에서 100개씩 가져오므로 chunk도 100으로 설정
}
@Bean(name = "TransitsImportJob")
public Job transitsImportJob() {
return job();
}
@Bean(name = "TransitsImportStep")
public Step transitsImportStep() {
return step();
}
}

파일 보기

@ -1,216 +0,0 @@
package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.AnchorageCallsDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j
@StepScope
public class AnchorageCallsReader extends BaseApiReader<AnchorageCallsDto> {
private final JdbcTemplate jdbcTemplate;
// 배치 처리 상태
private List<String> allImoNumbers;
// DB 해시값을 저장할
private Map<String, String> dbMasterHashes;
private int currentBatchIndex = 0;
private final int batchSize = 5;
// @Value("#{jobParameters['startDate']}")
// private String startDate;
private String startDate = "2025-01-01";
// @Value("#{jobParameters['stopDate']}")
// private String stopDate;
private String stopDate = "2025-12-31";
public AnchorageCallsReader(WebClient webClient, JdbcTemplate jdbcTemplate ) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "AnchorageCallsReader";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
this.dbMasterHashes = null;
}
@Override
protected String getApiPath() {
return "/Movements/AnchorageCalls";
}
@Override
protected String getApiBaseUrl() {
return "https://webservices.maritime.spglobal.com";
}
private static final String GET_ALL_IMO_QUERY =
"SELECT imo_number FROM ship_data ORDER BY id";
// "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_anchoragecall) ORDER BY imo_number";
private static final String FETCH_ALL_HASHES_QUERY =
"SELECT imo_number, ship_detail_hash FROM ship_detail_hash_json ORDER BY imo_number";
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override
protected void beforeFetch() {
// 전처리 과정
// Step 1. IMO 전체 번호 조회
log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
// API 통계 초기화
updateApiCallStats(totalBatches, 0);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
*
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override
protected List<AnchorageCallsDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
List<AnchorageCallsDto> response = callApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null ) {
List<AnchorageCallsDto> anchorageCalls = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, anchorageCalls.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return anchorageCalls;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
/**
* Query Parameter를 사용한 API 호출
*
* @param lrno 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...")
* @return API 응답
*/
private List<AnchorageCallsDto> callApiWithBatch(String lrno) {
String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToFlux(AnchorageCallsDto.class)
.collectList()
.block();
}
@Override
protected void afterFetch(List<AnchorageCallsDto> data) {
if (data == null) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size());
}
}
}

파일 보기

@ -1,213 +0,0 @@
package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.BerthCallsDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j
@StepScope
public class BerthCallsReader extends BaseApiReader<BerthCallsDto> {
private final JdbcTemplate jdbcTemplate;
// 배치 처리 상태
private List<String> allImoNumbers;
// DB 해시값을 저장할
private Map<String, String> dbMasterHashes;
private int currentBatchIndex = 0;
private final int batchSize = 5;
// @Value("#{jobParameters['startDate']}")
// private String startDate;
private String startDate = "2025-01-01";
// @Value("#{jobParameters['stopDate']}")
// private String stopDate;
private String stopDate = "2025-12-31";
public BerthCallsReader(WebClient webClient, JdbcTemplate jdbcTemplate ) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "BerthCallsReader";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
this.dbMasterHashes = null;
}
@Override
protected String getApiPath() {
return "/Movements/BerthCalls";
}
@Override
protected String getApiBaseUrl() {
return "https://webservices.maritime.spglobal.com";
}
private static final String GET_ALL_IMO_QUERY =
"SELECT imo_number FROM ship_data ORDER BY id";
// "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_berthcall) ORDER BY imo_number";
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override
protected void beforeFetch() {
// 전처리 과정
// Step 1. IMO 전체 번호 조회
log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
// API 통계 초기화
updateApiCallStats(totalBatches, 0);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
*
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override
protected List<BerthCallsDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
List<BerthCallsDto> response = callApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null ) {
List<BerthCallsDto> berthCalls = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, berthCalls.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return berthCalls;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
/**
* Query Parameter를 사용한 API 호출
*
* @param lrno 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...")
* @return API 응답
*/
private List<BerthCallsDto> callApiWithBatch(String lrno) {
String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToFlux(BerthCallsDto.class)
.collectList()
.block();
}
@Override
protected void afterFetch(List<BerthCallsDto> data) {
if (data == null) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size());
}
}
}

파일 보기

@ -1,211 +0,0 @@
package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.CurrentlyAtDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
* <p>
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
* <p>
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
* <p>
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j
@StepScope
public class CurrentlyAtReader extends BaseApiReader<CurrentlyAtDto> {
private final JdbcTemplate jdbcTemplate;
// 배치 처리 상태
private List<String> allImoNumbers;
// DB 해시값을 저장할
private int currentBatchIndex = 0;
private final int batchSize = 10;
// @Value("#{jobParameters['startDate']}")
// private String startDate;
// private String startDate = "2025-01-01";
// @Value("#{jobParameters['stopDate']}")
// private String stopDate;
// private String stopDate = "2024-12-31";
public CurrentlyAtReader(WebClient webClient, JdbcTemplate jdbcTemplate) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "CurrentlyAtReader";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
}
@Override
protected String getApiPath() {
return "/Movements/CurrentlyAt";
}
@Override
protected String getApiBaseUrl() {
return "https://webservices.maritime.spglobal.com";
}
private static final String GET_ALL_IMO_QUERY =
"SELECT imo_number FROM ship_data ORDER BY id";
// "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_currentlyat) ORDER BY imo_number";
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override
protected void beforeFetch() {
// 전처리 과정
// Step 1. IMO 전체 번호 조회
log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
// API 통계 초기화
updateApiCallStats(totalBatches, 0);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
* <p>
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override
protected List<CurrentlyAtDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
List<CurrentlyAtDto> response = callApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null) {
List<CurrentlyAtDto> portCalls = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, portCalls.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return portCalls;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
/**
* Query Parameter를 사용한 API 호출
*
* @param lrno 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...")
* @return API 응답
*/
private List<CurrentlyAtDto> callApiWithBatch(String lrno) {
String url = getApiPath() + "?lrno=" + lrno;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToFlux(CurrentlyAtDto.class)
.collectList()
.block();
}
@Override
protected void afterFetch(List<CurrentlyAtDto> data) {
if (data == null) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size());
}
}
}

파일 보기

@ -1,210 +0,0 @@
package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.DestinationDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j
@StepScope
public class DestinationReader extends BaseApiReader<DestinationDto> {
private final JdbcTemplate jdbcTemplate;
// 배치 처리 상태
private List<String> allImoNumbers;
// DB 해시값을 저장할
private int currentBatchIndex = 0;
private final int batchSize = 5;
// @Value("#{jobParameters['startDate']}")
// private String startDate;
private String startDate = "2025-01-01";
// @Value("#{jobParameters['stopDate']}")
// private String stopDate;
private String stopDate = "2025-12-31";
public DestinationReader(WebClient webClient, JdbcTemplate jdbcTemplate ) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "Destinations";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
}
@Override
protected String getApiPath() {
return "/Movements/Destinations";
}
@Override
protected String getApiBaseUrl() {
return "https://webservices.maritime.spglobal.com";
}
private static final String GET_ALL_IMO_QUERY =
"SELECT imo_number FROM ship_data ORDER BY id";
// "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_destination) ORDER BY imo_number";
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override
protected void beforeFetch() {
// 전처리 과정
// Step 1. IMO 전체 번호 조회
log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
// API 통계 초기화
updateApiCallStats(totalBatches, 0);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
*
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override
protected List<DestinationDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
List<DestinationDto> response = callApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null ) {
List<DestinationDto> destinations = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, destinations.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return destinations;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
/**
* Query Parameter를 사용한 API 호출
*
* @param lrno 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...")
* @return API 응답
*/
private List<DestinationDto> callApiWithBatch(String lrno) {
String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToFlux(DestinationDto.class)
.collectList()
.block();
}
@Override
protected void afterFetch(List<DestinationDto> data) {
if (data == null) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size());
}
}
}

파일 보기

@ -1,216 +0,0 @@
package com.snp.batch.jobs.movement.batch.reader;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.PortCallsDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.*;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j
@StepScope
public class PortCallsReader extends BaseApiReader<PortCallsDto> {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
// 배치 처리 상태
private List<String> allImoNumbers;
// DB 해시값을 저장할
private Map<String, String> dbMasterHashes;
private int currentBatchIndex = 0;
private final int batchSize = 10;
// @Value("#{jobParameters['startDate']}")
// private String startDate;
private String startDate = "2025-01-01";
// @Value("#{jobParameters['stopDate']}")
// private String stopDate;
private String stopDate = "2025-12-31";
public void setStartDate(String startDate) {this.startDate = startDate;}
public void setStopDate(String stopDate){this.stopDate=stopDate;}
public PortCallsReader(WebClient webClient, JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
this.objectMapper = objectMapper;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "ShipMovementReader";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
this.dbMasterHashes = null;
}
@Override
protected String getApiPath() {
return "/Movements/PortCalls";
}
@Override
protected String getApiBaseUrl() {
return "https://webservices.maritime.spglobal.com";
}
private static final String GET_ALL_IMO_QUERY =
"SELECT imo_number FROM ship_data ORDER BY id";
// "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_ship_stpov_info) ORDER BY imo_number";
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override
protected void beforeFetch() {
// 전처리 과정
// Step 1. IMO 전체 번호 조회
log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
// API 통계 초기화
updateApiCallStats(totalBatches, 0);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
*
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override
protected List<PortCallsDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
// ShipMovementApiResponse response = callApiWithBatch(imoParam);
List<PortCallsDto> response= callApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null) {
List<PortCallsDto> portCalls = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, portCalls.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return portCalls;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
/**
* Query Parameter를 사용한 API 호출
*
* @param lrno 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...")
* @return API 응답
*/
private List<PortCallsDto> callApiWithBatch(String lrno) {
String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToFlux(PortCallsDto.class)
.collectList()
.block();
}
@Override
protected void afterFetch(List<PortCallsDto> data) {
if (data == null) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size());
}
}
}

파일 보기

@ -1,213 +0,0 @@
package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.StsOperationDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j
@StepScope
public class StsOperationReader extends BaseApiReader<StsOperationDto> {
private final JdbcTemplate jdbcTemplate;
// 배치 처리 상태
private List<String> allImoNumbers;
// DB 해시값을 저장할
private Map<String, String> dbMasterHashes;
private int currentBatchIndex = 0;
private final int batchSize = 5;
// @Value("#{jobParameters['startDate']}")
// private String startDate;
private String startDate = "2025-01-01";
// @Value("#{jobParameters['stopDate']}")
// private String stopDate;
private String stopDate = "2025-12-31";
public StsOperationReader(WebClient webClient, JdbcTemplate jdbcTemplate ) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "StsOperationReader";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
this.dbMasterHashes = null;
}
@Override
protected String getApiPath() {
return "/Movements/StsOperations";
}
@Override
protected String getApiBaseUrl() {
return "https://webservices.maritime.spglobal.com";
}
private static final String GET_ALL_IMO_QUERY =
"SELECT imo_number FROM ship_data ORDER BY id";
// "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_stsoperation) ORDER BY imo_number";
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override
protected void beforeFetch() {
// 전처리 과정
// Step 1. IMO 전체 번호 조회
log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
// API 통계 초기화
updateApiCallStats(totalBatches, 0);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
*
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override
protected List<StsOperationDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
List<StsOperationDto> response = callApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null ) {
List<StsOperationDto> responseList = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, responseList.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return responseList;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
/**
* Query Parameter를 사용한 API 호출
*
* @param lrno 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...")
* @return API 응답
*/
private List<StsOperationDto> callApiWithBatch(String lrno) {
String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToFlux(StsOperationDto.class)
.collectList()
.block();
}
@Override
protected void afterFetch(List<StsOperationDto> data) {
if (data == null) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size());
}
}
}

파일 보기

@ -1,213 +0,0 @@
package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.TerminalCallsDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j
@StepScope
public class TerminalCallsReader extends BaseApiReader<TerminalCallsDto> {
private final JdbcTemplate jdbcTemplate;
// 배치 처리 상태
private List<String> allImoNumbers;
// DB 해시값을 저장할
private Map<String, String> dbMasterHashes;
private int currentBatchIndex = 0;
private final int batchSize = 5;
// @Value("#{jobParameters['startDate']}")
// private String startDate;
private String startDate = "2025-01-01";
// @Value("#{jobParameters['stopDate']}")
// private String stopDate;
private String stopDate = "2025-12-31";
public TerminalCallsReader(WebClient webClient, JdbcTemplate jdbcTemplate ) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "TerminalCalls";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
this.dbMasterHashes = null;
}
@Override
protected String getApiPath() {
return "/Movements/TerminalCalls";
}
@Override
protected String getApiBaseUrl() {
return "https://webservices.maritime.spglobal.com";
}
private static final String GET_ALL_IMO_QUERY =
"SELECT imo_number FROM ship_data ORDER BY id";
// "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_terminalcall) ORDER BY imo_number";
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override
protected void beforeFetch() {
// 전처리 과정
// Step 1. IMO 전체 번호 조회
log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
// API 통계 초기화
updateApiCallStats(totalBatches, 0);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
*
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override
protected List<TerminalCallsDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
List<TerminalCallsDto> response = callApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null ) {
List<TerminalCallsDto> terminalCalls = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, terminalCalls.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return terminalCalls;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
/**
* Query Parameter를 사용한 API 호출
*
* @param lrno 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...")
* @return API 응답
*/
private List<TerminalCallsDto> callApiWithBatch(String lrno) {
String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToFlux(TerminalCallsDto.class)
.collectList()
.block();
}
@Override
protected void afterFetch(List<TerminalCallsDto> data) {
if (data == null) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size());
}
}
}

파일 보기

@ -1,210 +0,0 @@
package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.TransitsDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j
@StepScope
public class TransitsReader extends BaseApiReader<TransitsDto> {
private final JdbcTemplate jdbcTemplate;
// 배치 처리 상태
private List<String> allImoNumbers;
// DB 해시값을 저장할
private int currentBatchIndex = 0;
private final int batchSize = 5;
// @Value("#{jobParameters['startDate']}")
// private String startDate;
private String startDate = "2025-01-01";
// @Value("#{jobParameters['stopDate']}")
// private String stopDate;
private String stopDate = "2025-12-31";
public TransitsReader(WebClient webClient, JdbcTemplate jdbcTemplate ) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "Transits";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
}
@Override
protected String getApiPath() {
return "/Movements/Transits";
}
@Override
protected String getApiBaseUrl() {
return "https://webservices.maritime.spglobal.com";
}
private static final String GET_ALL_IMO_QUERY =
"SELECT imo_number FROM ship_data ORDER BY id";
// "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_transit) ORDER BY imo_number";
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override
protected void beforeFetch() {
// 전처리 과정
// Step 1. IMO 전체 번호 조회
log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
// API 통계 초기화
updateApiCallStats(totalBatches, 0);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
*
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override
protected List<TransitsDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
List<TransitsDto> response = callApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null ) {
List<TransitsDto> transits = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, transits.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return transits;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
/**
* Query Parameter를 사용한 API 호출
*
* @param lrno 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...")
* @return API 응답
*/
private List<TransitsDto> callApiWithBatch(String lrno) {
String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToFlux(TransitsDto.class)
.collectList()
.block();
}
@Override
protected void afterFetch(List<TransitsDto> data) {
if (data == null) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size());
}
}
}

파일 보기

@ -1,88 +0,0 @@
package com.snp.batch.jobs.risk.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.risk.batch.dto.RiskDto;
import com.snp.batch.jobs.risk.batch.entity.RiskEntity;
import com.snp.batch.jobs.risk.batch.processor.RiskDataProcessor;
import com.snp.batch.jobs.risk.batch.reader.RiskDataReader;
import com.snp.batch.jobs.risk.batch.writer.RiskDataWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
@Slf4j
@Configuration
public class RiskImportJobConfig extends BaseJobConfig<RiskDto, RiskEntity> {
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeServiceApiWebClient;
private final RiskDataProcessor riskDataProcessor;
private final RiskDataWriter riskDataWriter;
@Value("${app.batch.target-schema.name}")
private String targetSchema;
@Override
protected int getChunkSize() {
return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
}
public RiskImportJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
RiskDataProcessor riskDataProcessor,
RiskDataWriter riskDataWriter,
JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient")WebClient maritimeServiceApiWebClient) {
super(jobRepository, transactionManager);
this.jdbcTemplate = jdbcTemplate;
this.maritimeServiceApiWebClient = maritimeServiceApiWebClient;
this.riskDataProcessor = riskDataProcessor;
this.riskDataWriter = riskDataWriter;
}
@Override
protected String getJobName() {
return "RiskImportJob";
}
@Override
protected String getStepName() {
return "RiskImportStep";
}
@Override
protected ItemReader<RiskDto> createReader() {
return new RiskDataReader(maritimeServiceApiWebClient, jdbcTemplate, targetSchema);
}
@Override
protected ItemProcessor<RiskDto, RiskEntity> createProcessor() {
return riskDataProcessor;
}
@Override
protected ItemWriter<RiskEntity> createWriter() { return riskDataWriter; }
@Bean(name = "RiskImportJob")
public Job riskImportJob() {
return job();
}
@Bean(name = "RiskImportStep")
public Step riskImportStep() {
return step();
}
}

파일 보기

@ -1,150 +0,0 @@
package com.snp.batch.jobs.risk.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.risk.batch.dto.RiskDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
@Slf4j
public class RiskDataReader extends BaseApiReader<RiskDto> {
//TODO :
// 1. Core20 IMO_NUMBER 전체 조회
// 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복)
// 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복)
private final JdbcTemplate jdbcTemplate;
private final String targetSchema;
private List<String> allImoNumbers;
private int currentBatchIndex = 0;
private final int batchSize = 100;
public RiskDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, String targetSchema) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
this.targetSchema = targetSchema;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "riskDataReader";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
}
@Override
protected String getApiPath() {
return "/RiskAndCompliance/RisksByImos";
}
private String getTargetTable(){
return targetSchema + ".ship_data";
}
private String getImoQuery() {
return "select imo_number as ihslrorimoshipno from " + getTargetTable() + " order by imo_number";
}
@Override
protected void beforeFetch(){
log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(getImoQuery(), String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
updateApiCallStats(totalBatches, 0);
}
@Override
protected List<RiskDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
List<RiskDto> response = callAisApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null) {
// List<ComplianceDto> targets = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, response.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return response;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
private List<RiskDto> callAisApiWithBatch(String imoNumbers) {
String url = getApiPath() + "?imos=" + imoNumbers;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<List<RiskDto>>() {})
.block();
}
}

파일 보기

@ -1,115 +0,0 @@
package com.snp.batch.jobs.shipdetail.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto;
import com.snp.batch.jobs.shipdetail.batch.entity.ShipDetailEntity;
import com.snp.batch.jobs.shipdetail.batch.processor.ShipDetailDataProcessor;
import com.snp.batch.jobs.shipdetail.batch.reader.ShipDetailDataReader;
import com.snp.batch.jobs.shipdetail.batch.writer.ShipDetailDataWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbersAll 호출
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* ShipDetailDataReader (ship_data Maritime API)
* (ShipDetailDto)
* ShipDetailDataProcessor
* (ShipDetailEntity)
* ShipDetailDataWriter
* (ship_detail 테이블)
*/
/**
* 선박 상세 정보 Import Job Config
* I: ShipDetailDto (Reader 출력)
* O: ShipDetailEntity (Processor 출력)
*/
@Slf4j
@Configuration
public class ShipDetailImportJobConfig extends BaseJobConfig<ShipDetailDto, ShipDetailEntity> {
private final ShipDetailDataProcessor shipDetailDataProcessor;
private final ShipDetailDataWriter shipDetailDataWriter;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient;
private final ObjectMapper objectMapper;
@Value("${app.batch.target-schema.name}")
private String targetSchema;
public ShipDetailImportJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ShipDetailDataProcessor shipDetailDataProcessor,
ShipDetailDataWriter shipDetailDataWriter,
JdbcTemplate jdbcTemplate,
@Qualifier("maritimeApiWebClient") WebClient maritimeApiWebClient,
ObjectMapper objectMapper) {
super(jobRepository, transactionManager);
this.shipDetailDataProcessor = shipDetailDataProcessor;
this.shipDetailDataWriter = shipDetailDataWriter;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient;
this.objectMapper = objectMapper;
}
@Override
protected String getJobName() {
return "shipDetailImportJob";
}
@Override
protected String getStepName() {
return "shipDetailImportStep";
}
@Override
protected ItemReader<ShipDetailDto> createReader() {
return new ShipDetailDataReader(maritimeApiWebClient, jdbcTemplate, objectMapper, targetSchema);
}
@Override
protected ItemProcessor<ShipDetailDto, ShipDetailEntity> createProcessor() {
return shipDetailDataProcessor;
}
@Override
protected ItemWriter<ShipDetailEntity> createWriter() { // 타입 변경
return shipDetailDataWriter;
}
@Override
protected int getChunkSize() {
return 30; // API에서 100개씩 가져오므로 chunk도 100으로 설정
}
@Bean(name = "shipDetailImportJob")
public Job shipDetailImportJob() {
return job();
}
@Bean(name = "shipDetailImportStep")
public Step shipDetailImportStep() {
return step();
}
}

파일 보기

@ -1,176 +0,0 @@
package com.snp.batch.jobs.shipdetail.batch.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.beans.factory.annotation.Value;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
@Slf4j
@Configuration
public class ShipDetailSyncJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final JdbcTemplate jdbcTemplate;
@Value("${app.batch.target-schema.name}")
private String targetSchema;
@Value("${app.batch.last-execution-buffer-hours:24}")
private int lastExecutionBufferHours;
// API 정의 (배치 로그 관리용)
protected String getApiKey() {
return "SHIP_DETAIL_SYNC_API";
}
public ShipDetailSyncJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
JdbcTemplate jdbcTemplate) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
this.jdbcTemplate = jdbcTemplate;
}
/**
* 동기화 대상 25개 테이블 리스트
*/
private static final List<String> SYNC_TABLES = Arrays.asList(
"additionalshipsdata", "bareboatcharterhistory",
"callsignandmmsihistory", "classhistory", "companycompliancedetails",
"companyvesselrelationships", "crewlist", "darkactivityconfirmed",
"flaghistory", "groupbeneficialownerhistory", "iceclass", "namehistory",
"operatorhistory", "ownerhistory", "pandihistory", "safetymanagementcertificatehist",
"shipmanagerhistory", "sistershiplinks", "specialfeature", "statushistory",
"stowagecommodity", "surveydates", "surveydateshistoryunique",
"technicalmanagerhistory", "thrusters"
);
/**
* Job 구성: 모든 테이블 동기화 마지막 업데이트 실행
*/
@Bean(name = "ShipDetailSyncJob")
public Job shipDetailSyncJob() {
return new JobBuilder("ShipDetailSyncJob", jobRepository)
.start(shipMasterAndCoreSyncStep()) // 1단계: Ship_Detail_Data, Core20 테이블 동기화
.next(shipDetailSyncStep()) // 2단계: 선박제원정보 종속 25개 테이블 순차 동기화
.next(shipDetailSyncLastExecutionUpdateStep()) // 3단계: 최종 성공 시간 업데이트
.build();
}
/**
* 1단계: Ship_Detail_Data, Core20 테이블 동기화
*/
@Bean
public Tasklet shipMasterAndCoreSyncTasklet() {
return (contribution, chunkContext) -> {
// 배치 시작 시점 캡처 (LastExecutionUpdate에서 사용)
LocalDateTime toDate = LocalDateTime.now();
chunkContext.getStepContext()
.getStepExecution().getJobExecution()
.getExecutionContext().put("batchToDate", toDate.toString());
log.info(">>>>> batchToDate 캡처: {}", toDate);
log.info(">>>>> SHIP MASTER & CORE20 동기화 프로시저 호출 시작");
// PostgreSQL 기준 프로시저 호출 (CALL)
String procedureCall = String.format("CALL %s.proc_sync_ship_master_and_core()", targetSchema);
jdbcTemplate.execute(procedureCall);
log.info(">>>>> SHIP MASTER & CORE20 동기화 프로시저 호출 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "ShipMasterAndCoreSyncStep")
public Step shipMasterAndCoreSyncStep() {
return new StepBuilder("ShipMasterAndCoreSyncStep", jobRepository)
.tasklet(shipMasterAndCoreSyncTasklet(), transactionManager)
.build();
}
/**
* 2단계: 25개 테이블 동기화 Tasklet
*/
@Bean
public Tasklet shipDetailSyncTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> [시작] 25개 테이블 동기화 프로세스");
for (String tableName : SYNC_TABLES) {
try {
log.info("테이블 동기화 중: {}", tableName);
// 이전에 생성한 동적 프로시저 호출
String procedureCall = String.format("CALL %s.proc_sync_ship_detail('%s')", targetSchema, tableName);
jdbcTemplate.execute(procedureCall);
} catch (Exception e) {
log.error("테이블 동기화 실패: {}. 에러: {}", tableName, e.getMessage());
// 특정 테이블 실패 중단할지, 계속 진행할지에 따라 throw 여부 결정
throw e; // 중단하려면 주석 해제
}
}
log.info(">>>>> [완료] 25개 테이블 동기화 프로세스");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "ShipDetailSyncStep")
public Step shipDetailSyncStep() {
return new StepBuilder("ShipDetailSyncStep", jobRepository)
.tasklet(shipDetailSyncTasklet(), transactionManager)
.build();
}
/**
* 3단계: 모든 스텝 성공 배치 실행 로그 업데이트
*/
@Bean
public Tasklet shipDetailSyncLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> {
String toDateStr = chunkContext.getStepContext()
.getStepExecution().getJobExecution()
.getExecutionContext().getString("batchToDate", null);
LocalDateTime successDate;
if (toDateStr != null) {
successDate = LocalDateTime.parse(toDateStr).minusHours(lastExecutionBufferHours);
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 시작 (캡처된 toDate - {}시간 버퍼: {})", lastExecutionBufferHours, successDate);
} else {
successDate = LocalDateTime.now().minusHours(lastExecutionBufferHours);
log.warn(">>>>> batchToDate가 없어 현재 시간 - {}시간 버퍼 사용: {}", lastExecutionBufferHours, successDate);
}
jdbcTemplate.update(
String.format("UPDATE %s.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = ?, UPDATED_AT = NOW() WHERE API_KEY = ?", targetSchema),
Timestamp.valueOf(successDate), getApiKey()
);
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료 (LAST_SUCCESS_DATE = {})", successDate);
return RepeatStatus.FINISHED;
};
}
@Bean(name = "ShipDetailSyncLastExecutionUpdateStep")
public Step shipDetailSyncLastExecutionUpdateStep() {
return new StepBuilder("ShipDetailSyncLastExecutionUpdateStep", jobRepository)
.tasklet(shipDetailSyncLastExecutionUpdateTasklet(), transactionManager)
.build();
}
}

파일 보기

@ -1,213 +0,0 @@
package com.snp.batch.jobs.shipdetail.batch.reader;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailApiResponse;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipResultDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j
public class ShipDetailDataReader extends BaseApiReader<ShipDetailDto> {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
private final String targetSchema;
// 배치 처리 상태
private List<String> allImoNumbers;
private int currentBatchIndex = 0;
private final int batchSize = 30;
public ShipDetailDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, ObjectMapper objectMapper, String targetSchema) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
this.objectMapper = objectMapper;
this.targetSchema = targetSchema;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "ShipDetailDataReader";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
}
@Override
protected String getApiPath() {
return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipsByIHSLRorIMONumbersAll";
}
/**
* IMO 번호 조회 쿼리 생성 (스키마 동적 적용)
*/
private String getImoQuery() {
return "select imo_no from " + targetSchema + ".tb_ship_default_info order by imo_no";
}
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override
protected void beforeFetch() {
// Step 1. IMO 전체 번호 조회
log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(getImoQuery(), String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
// API 통계 초기화
updateApiCallStats(totalBatches, 0);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
*
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override
protected List<ShipDetailDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
ShipDetailApiResponse response = callApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null && response.getShipResult() != null) {
List<ShipDetailDto> shipDetailDtoList = response.getShipResult().stream()
.map(ShipResultDto::getShipDetails) // result -> result.getShipDetail()
.filter(Objects::nonNull) // 데이터가 없는 경우 제외
.collect(Collectors.toList());
log.info("[{}] 배치 {}/{} 완료: {} 건 조회 (Result 그룹 수: {})",
getReaderName(), currentBatchNumber, totalBatches,
shipDetailDtoList.size(), response.getShipResult().size());
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, shipDetailDtoList.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return shipDetailDtoList;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
/**
* Query Parameter를 사용한 API 호출
*
* @param imoNumbers 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...")
* @return API 응답
*/
private ShipDetailApiResponse callApiWithBatch(String imoNumbers) {
String url = getApiPath() + "?IMONumbers=" + imoNumbers;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(ShipDetailApiResponse.class)
.block();
}
@Override
protected void afterFetch(List<ShipDetailDto> data) {
if (data == null) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size());
}
}
}

파일 보기

@ -1,136 +0,0 @@
package com.snp.batch.jobs.shipimport.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.common.batch.processor.FlagCodeDataProcessor;
import com.snp.batch.jobs.common.batch.reader.FlagCodeDataReader;
import com.snp.batch.jobs.shipimport.batch.dto.ShipDto;
import com.snp.batch.jobs.shipimport.batch.entity.ShipEntity;
import com.snp.batch.jobs.shipimport.batch.processor.ShipDataProcessor;
import com.snp.batch.jobs.shipimport.batch.reader.ShipDataReader;
import com.snp.batch.jobs.shipimport.batch.repository.ShipRepository;
import com.snp.batch.jobs.shipimport.batch.writer.ShipDataWriter;
import com.snp.batch.service.BatchApiLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
* Ship Data Import Job 설정
* BaseJobConfig를 상속하여 구현
*
* Maritime API에서 선박 데이터를 받아 PostgreSQL에 저장하는 배치 작업:
* - Maritime API에서 170,000+ 선박 IMO 번호 조회
* - 중복 체크 업데이트 로직
* - PostgreSQL에 저장
*/
@Slf4j
@Configuration
public class ShipImportJobConfig extends BaseJobConfig<ShipDto, ShipEntity> {
private final ShipRepository shipRepository;
private final WebClient maritimeApiWebClient;
private final ShipDataProcessor shipDataProcessor;
private final ShipDataReader shipDataReader;
private final BatchApiLogService batchApiLogService;
@Value("${app.batch.ship-api.url}")
private String maritimeApiUrl;
@Value("${app.batch.chunk-size:1000}")
private int chunkSize;
/**
* 생성자 주입
* maritimeApiWebClient: MaritimeApiWebClientConfig에서 등록한 Bean 주입
*/
public ShipImportJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ShipRepository shipRepository,
@Qualifier("maritimeApiWebClient") WebClient maritimeApiWebClient,
ShipDataProcessor shipDataProcessor,
ShipDataReader shipDataReader,
BatchApiLogService batchApiLogService) {
super(jobRepository, transactionManager);
this.shipRepository = shipRepository;
this.maritimeApiWebClient = maritimeApiWebClient;
this.shipDataProcessor = shipDataProcessor;
this.shipDataReader = shipDataReader;
this.batchApiLogService = batchApiLogService;
}
@Override
protected String getJobName() {
return "shipDataImportJob";
}
@Override
protected String getStepName() {
return "shipDataImportStep";
}
@Override
protected ItemReader<ShipDto> createReader() {
return shipDataReader;
}
@Bean
@StepScope
public ShipDataReader shipDataReader(
@Value("#{stepExecution.jobExecution.id}") Long jobExecutionId, // SpEL로 ID 추출
@Value("#{stepExecution.id}") Long stepExecutionId
) {
ShipDataReader reader = new ShipDataReader(maritimeApiWebClient, batchApiLogService, maritimeApiUrl);
reader.setExecutionIds(jobExecutionId, stepExecutionId); // ID 세팅
return reader;
}
@Override
protected ItemProcessor<ShipDto, ShipEntity> createProcessor() {
return shipDataProcessor;
}
@Bean
@StepScope
public ShipDataProcessor shipDataProcessor(
@Value("#{stepExecution.jobExecution.id}") Long jobExecutionId) {
return new ShipDataProcessor(jobExecutionId);
}
@Override
protected ItemWriter<ShipEntity> createWriter() {
return new ShipDataWriter(shipRepository);
}
@Override
protected int getChunkSize() {
return chunkSize;
}
/**
* Job Bean 등록
*/
@Bean(name = "shipDataImportJob")
public Job shipDataImportJob() {
return job();
}
/**
* Step Bean 등록
*/
@Bean(name = "shipDataImportStep")
public Step shipDataImportStep() {
return step();
}
}

파일 보기

@ -1,43 +0,0 @@
package com.snp.batch.jobs.shipimport.batch.dto;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class ShipApiResponse {
@JsonProperty("shipCount")
private Integer shipCount;
@JsonProperty("Ships")
private List<ShipDto> ships;
@JsonProperty("APSStatus")
private APSStatus apsStatus;
@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public static class APSStatus {
@JsonProperty("SystemVersion")
private String systemVersion;
@JsonProperty("SystemDate")
private String systemDate;
@JsonProperty("JobRunDate")
private String jobRunDate;
@JsonProperty("CompletedOK")
private Boolean completedOK;
}
}

파일 보기

@ -1,34 +0,0 @@
package com.snp.batch.jobs.shipimport.batch.dto;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class ShipDto {
@JsonProperty("DataSetVersion")
private DataSetVersion dataSetVersion;
@JsonProperty("CoreShipInd")
private String coreShipInd;
@JsonProperty("IHSLRorIMOShipNo")
private String imoNumber;
@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public static class DataSetVersion {
@JsonProperty("DataSetVersion")
private String version;
}
}

파일 보기

@ -1,55 +0,0 @@
package com.snp.batch.jobs.shipimport.batch.entity;
import com.snp.batch.common.batch.entity.BaseEntity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.time.LocalDateTime;
/**
* 선박 엔티티 - JDBC 전용
* Maritime API 데이터 저장
*
* 테이블: ship_data
*/
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class ShipEntity extends BaseEntity {
/**
* 기본 (자동 생성)
* 컬럼: id (BIGSERIAL)
*/
private Long id;
/**
* IMO 번호 (선박 고유 식별번호)
* 컬럼: imo_number (VARCHAR(20), UNIQUE, NOT NULL)
* 인덱스: idx_imo_number (UNIQUE)
*/
private String imoNumber;
/**
* Core Ship 여부
* 컬럼: core_ship_ind (VARCHAR(10))
*/
private String coreShipInd;
/**
* 데이터셋 버전
* 컬럼: dataset_version (VARCHAR(20))
*/
private String datasetVersion;
/**
* Import 일시
* 컬럼: import_date (TIMESTAMP)
*/
private LocalDateTime importDate;
}

파일 보기

@ -1,46 +0,0 @@
package com.snp.batch.jobs.shipimport.batch.processor;
import com.snp.batch.common.batch.processor.BaseProcessor;
import com.snp.batch.jobs.shipimport.batch.dto.ShipDto;
import com.snp.batch.jobs.shipimport.batch.entity.ShipEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
/**
* ShipDto를 ShipEntity로 변환하는 Processor
* BaseProcessor를 상속하여 공통 변환 패턴 적용
* 중복 체크 업데이트 로직 포함
*/
@Slf4j
public class ShipDataProcessor extends BaseProcessor<ShipDto, ShipEntity> {
private final Long jobExecutionId;
public ShipDataProcessor(@Value("#{stepExecution.jobExecution.id}") Long jobExecutionId) {
this.jobExecutionId = jobExecutionId;
}
@Override
protected ShipEntity processItem(ShipDto item) throws Exception {
if (item.getImoNumber() == null || item.getImoNumber().trim().isEmpty()) {
log.warn("IMO 번호가 없는 선박 데이터 스킵");
return null; // 스킵
}
ShipEntity entity = ShipEntity.builder()
.imoNumber(item.getImoNumber())
.coreShipInd(item.getCoreShipInd())
.datasetVersion(
item.getDataSetVersion() != null ?
item.getDataSetVersion().getVersion() : null
)
.jobExecutionId(jobExecutionId)
.createdBy("SYSTEM")
.build();
log.debug("선박 데이터 처리 완료: FlagCode={}", item.getImoNumber());
return entity;
}
}

파일 보기

@ -1,65 +0,0 @@
package com.snp.batch.jobs.shipimport.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.common.batch.dto.FlagCodeApiResponse;
import com.snp.batch.jobs.common.batch.dto.FlagCodeDto;
import com.snp.batch.jobs.shipimport.batch.dto.ShipApiResponse;
import com.snp.batch.jobs.shipimport.batch.dto.ShipDto;
import com.snp.batch.service.BatchApiLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* Maritime API에서 선박 데이터를 읽어오는 ItemReader
* BaseApiReader v2.0을 상속하여 공통 API 호출 패턴 적용
*/
@Slf4j
public class ShipDataReader extends BaseApiReader<ShipDto> {
private final BatchApiLogService batchApiLogService;
String maritimeApiUrl;
public ShipDataReader(WebClient webClient, BatchApiLogService batchApiLogService, String maritimeApiUrl) {
super(webClient); // BaseApiReader에 WebClient 전달
this.batchApiLogService = batchApiLogService;
this.maritimeApiUrl = maritimeApiUrl;
}
// ========================================
// 필수 구현 메서드
// ========================================
@Override
protected String getReaderName() {
return "ShipDataReader";
}
@Override
protected String getApiPath() {
return "/MaritimeWCF/APSShipService.svc/RESTFul/GetAllIMONumbers";
}
@Override
protected List<ShipDto> fetchDataFromApi() {
try {
log.info("선박 API 호출 시작");
// 공통 함수 호출
List<ShipDto> result = executeWrapperApiCall(
maritimeApiUrl, // 베이스 URL (필드 또는 설정값)
getApiPath(), // API 경로
ShipApiResponse.class, // 응답을 받을 래퍼 클래스
ShipApiResponse::getShips, // 리스트 추출 함수 (메서드 참조)
batchApiLogService // 로그 서비스
);
// 결과가 null일 경우 리스트 반환 (안전장치)
return result != null ? result : Collections.emptyList();
} catch (Exception e) {
log.error("선박 데이터 API 호출 실패", e);
log.error("에러 메시지: {}", e.getMessage());
return new ArrayList<>();
}
}
}

파일 보기

@ -1,13 +0,0 @@
package com.snp.batch.jobs.shipimport.batch.repository;
import com.snp.batch.jobs.shipimport.batch.entity.ShipEntity;
import java.util.List;
/**
* ShipEntity Repository 인터페이스
* 구현체: ShipRepositoryImpl (JdbcTemplate 기반)
*/
public interface ShipRepository {
void saveAllShipData(List<ShipEntity> entities);
}

파일 보기

@ -1,104 +0,0 @@
package com.snp.batch.jobs.shipimport.batch.repository;
import com.snp.batch.common.batch.repository.BaseJdbcRepository;
import com.snp.batch.jobs.shipimport.batch.entity.ShipEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import java.sql.PreparedStatement;
import java.sql.Types;
import java.util.List;
/**
* ShipEntity Repository (JdbcTemplate 기반)
*/
@Slf4j
@Repository("shipRepository")
public class ShipRepositoryImpl extends BaseJdbcRepository<ShipEntity, Long> implements ShipRepository {
@Value("${app.batch.target-schema.name}")
private String targetSchema;
@Value("${app.batch.target-schema.tables.ship-001}")
private String tableName;
public ShipRepositoryImpl(JdbcTemplate jdbcTemplate) {
super(jdbcTemplate);
}
@Override
protected String getTargetSchema() {
return targetSchema;
}
@Override
protected String getSimpleTableName() {
return tableName;
}
@Override
protected String getEntityName() {
return "Ship";
}
@Override
protected RowMapper<ShipEntity> getRowMapper() {
return null;
}
@Override
protected Long extractId(ShipEntity entity) {
return null;
}
@Override
protected String getInsertSql() {
return null;
}
@Override
protected String getUpdateSql() {
return """
INSERT INTO %s(
imo_no, core_ship_ind, dataset_ver,
job_execution_id, creatr_id
) VALUES (?, ?, ?, ?, ?)
""".formatted(getTableName());
}
@Override
protected void setInsertParameters(PreparedStatement ps, ShipEntity entity) throws Exception {
}
@Override
protected void setUpdateParameters(PreparedStatement ps, ShipEntity entity) throws Exception {
int idx = 1;
ps.setString(idx++, entity.getImoNumber());
ps.setString(idx++, entity.getCoreShipInd());
ps.setString(idx++, entity.getDatasetVersion());
ps.setObject(idx++, entity.getJobExecutionId(), Types.INTEGER);
ps.setString(idx++, entity.getCreatedBy());
}
@Override
public void saveAllShipData(List<ShipEntity> entities) {
if (entities == null || entities.isEmpty()) {
return;
}
jdbcTemplate.batchUpdate(getUpdateSql(), entities, entities.size(),
(ps, entity) -> {
try {
setUpdateParameters(ps, entity);
} catch (Exception e) {
log.error("배치 삽입 파라미터 설정 실패", e);
throw new RuntimeException(e);
}
});
log.info("{} 전체 저장 완료: {} 건", getEntityName(), entities.size());
}
}

파일 보기

@ -1,28 +0,0 @@
package com.snp.batch.jobs.shipimport.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.shipimport.batch.entity.ShipEntity;
import com.snp.batch.jobs.shipimport.batch.repository.ShipRepository;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* ShipEntity를 DB에 저장하는 ItemWriter
* BaseWriter를 상속하여 공통 저장 패턴 적용
*/
@Slf4j
public class ShipDataWriter extends BaseWriter<ShipEntity> {
private final ShipRepository shipRepository;
public ShipDataWriter(ShipRepository shipRepository) {
super("Ship");
this.shipRepository = shipRepository;
}
@Override
protected void writeItems(List<ShipEntity> items) throws Exception {
shipRepository.saveAllShipData(items);
}
}