feat(imometa): IMO Meta Table 관리 배치 작업 개발

Step 1: GetAllIMONumbers API → tb_ship_default_info UPSERT (17만건)
Step 2: GetAllIMONumbersToDelete API → umnged_ship_flag = 'Y' UPDATE

Closes #80

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
HYOJIN 2026-03-24 12:50:51 +09:00
부모 5f445381c6
커밋 757eb2617d
9개의 변경된 파일541개의 추가작업 그리고 0개의 파일을 삭제

파일 보기

@ -0,0 +1,216 @@
package com.snp.batch.jobs.imometa.batch.config;
import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
import com.snp.batch.global.model.BatchApiLog;
import com.snp.batch.jobs.imometa.batch.dto.ImoMetaDto;
import com.snp.batch.jobs.imometa.batch.dto.ImoMetaResponse;
import com.snp.batch.jobs.imometa.batch.entity.ImoMetaEntity;
import com.snp.batch.jobs.imometa.batch.reader.ImoMetaDataReader;
import com.snp.batch.jobs.imometa.batch.repository.ImoMetaRepository;
import com.snp.batch.jobs.imometa.batch.writer.ImoMetaDataWriter;
import com.snp.batch.service.BatchApiLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import org.springframework.web.util.UriComponentsBuilder;
import java.time.LocalDateTime;
import java.util.List;
@Slf4j
@Configuration
public class ImoMetaImportJobConfig extends BaseMultiStepJobConfig<ImoMetaDto, ImoMetaEntity> {
private static final String DELETE_IMO_PATH = "/MaritimeWCF/APSShipService.svc/RESTFul/GetAllIMONumbersToDelete";
private static final String CREATED_BY_SYSTEM = "SYSTEM";
private final ImoMetaRepository imoMetaRepository;
private final WebClient maritimeApiWebClient;
private final BatchApiLogService batchApiLogService;
private final ImoMetaDataWriter imoMetaDataWriter;
@Value("${app.batch.ship-api.url}")
private String maritimeApiUrl;
public ImoMetaImportJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ImoMetaRepository imoMetaRepository,
@Qualifier("maritimeApiWebClient") WebClient maritimeApiWebClient,
BatchApiLogService batchApiLogService,
ImoMetaDataWriter imoMetaDataWriter) {
super(jobRepository, transactionManager);
this.imoMetaRepository = imoMetaRepository;
this.maritimeApiWebClient = maritimeApiWebClient;
this.batchApiLogService = batchApiLogService;
this.imoMetaDataWriter = imoMetaDataWriter;
}
@Override
protected String getJobName() {
return "ImoMetaImportJob";
}
@Override
protected String getStepName() {
return "ImoMetaImportStep";
}
@Override
protected int getChunkSize() {
return 5000;
}
@Override
protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder
.start(imoMetaImportStep())
.next(imoMetaDeleteFlagStep())
.build();
}
// ========================================
// Step 1: IMO 전체 수집 (Chunk-oriented)
// ========================================
@Bean(name = "ImoMetaImportStep")
public Step imoMetaImportStep() {
return new StepBuilder("ImoMetaImportStep", jobRepository)
.<ImoMetaDto, ImoMetaEntity>chunk(getChunkSize(), transactionManager)
.reader(imoMetaDataReader(null, null))
.processor(imoMetaProcessor(null))
.writer(imoMetaDataWriter)
.build();
}
@Bean
@StepScope
public ImoMetaDataReader imoMetaDataReader(
@Value("#{stepExecution.jobExecution.id}") Long jobExecutionId,
@Value("#{stepExecution.id}") Long stepExecutionId) {
ImoMetaDataReader reader = new ImoMetaDataReader(maritimeApiWebClient, batchApiLogService, maritimeApiUrl);
reader.setExecutionIds(jobExecutionId, stepExecutionId);
return reader;
}
@Bean
@StepScope
public ItemProcessor<ImoMetaDto, ImoMetaEntity> imoMetaProcessor(
@Value("#{stepExecution.jobExecution.id}") Long jobExecutionId) {
return dto -> ImoMetaEntity.builder()
.imoNo(dto.getImoNo())
.coreShipInd(dto.getCoreShipInd())
.datasetVer(dto.getDataSetVersion() != null ? dto.getDataSetVersion().getDataSetVersion() : null)
.jobExecutionId(jobExecutionId)
.createdBy(CREATED_BY_SYSTEM)
.build();
}
// ========================================
// Step 2: 삭제 대상 IMO 플래그 업데이트 (Tasklet)
// ========================================
@Bean
public Tasklet imoMetaDeleteFlagTasklet() {
return (contribution, chunkContext) -> {
long jobExecutionId = chunkContext.getStepContext().getStepExecution()
.getJobExecution().getId();
long stepExecutionId = chunkContext.getStepContext().getStepExecution().getId();
String fullUri = UriComponentsBuilder.fromHttpUrl(maritimeApiUrl)
.path(DELETE_IMO_PATH)
.build()
.toUriString();
long startTime = System.currentTimeMillis();
int statusCode = 200;
String errorMessage = null;
long responseSize = 0L;
try {
log.info("[ImoMetaDeleteFlagTasklet] API 요청 시작: {}", fullUri);
ImoMetaResponse response = maritimeApiWebClient.get()
.uri(uriBuilder -> uriBuilder.path(DELETE_IMO_PATH).build())
.retrieve()
.bodyToMono(ImoMetaResponse.class)
.block();
List<String> imoNumbers = (response != null && response.getShips() != null)
? response.getShips().stream()
.map(ImoMetaDto::getImoNo)
.filter(imoNo -> imoNo != null && !imoNo.isBlank())
.toList()
: List.of();
responseSize = imoNumbers.size();
log.info("[ImoMetaDeleteFlagTasklet] 삭제 대상 IMO {}건 조회됨", responseSize);
if (!imoNumbers.isEmpty()) {
imoMetaRepository.updateDeleteFlag(imoNumbers);
log.info("[ImoMetaDeleteFlagTasklet] 삭제 플래그 업데이트 완료: {}건", imoNumbers.size());
} else {
log.info("[ImoMetaDeleteFlagTasklet] 삭제 대상 IMO 없음 → 스킵");
}
} catch (WebClientResponseException e) {
statusCode = e.getStatusCode().value();
errorMessage = String.format("API Error: %s", e.getResponseBodyAsString());
log.error("[ImoMetaDeleteFlagTasklet] API 호출 실패 (HTTP {}): {}", statusCode, errorMessage);
throw e;
} catch (Exception e) {
statusCode = 500;
errorMessage = String.format("System Error: %s", e.getMessage());
log.error("[ImoMetaDeleteFlagTasklet] API 호출 중 예외 발생: {}", errorMessage);
throw e;
} finally {
long duration = System.currentTimeMillis() - startTime;
batchApiLogService.saveLog(BatchApiLog.builder()
.apiRequestLocation("ImoMetaDeleteFlagTasklet")
.requestUri(fullUri)
.httpMethod("GET")
.statusCode(statusCode)
.responseTimeMs(duration)
.responseCount(responseSize)
.errorMessage(errorMessage)
.createdAt(LocalDateTime.now())
.jobExecutionId(jobExecutionId)
.stepExecutionId(stepExecutionId)
.build());
}
return RepeatStatus.FINISHED;
};
}
@Bean(name = "ImoMetaDeleteFlagStep")
public Step imoMetaDeleteFlagStep() {
return new StepBuilder("ImoMetaDeleteFlagStep", jobRepository)
.tasklet(imoMetaDeleteFlagTasklet(), transactionManager)
.build();
}
// ========================================
// Job Bean
// ========================================
@Bean(name = "ImoMetaImportJob")
public Job imoMetaImportJob() {
return job();
}
}

파일 보기

@ -0,0 +1,31 @@
package com.snp.batch.jobs.imometa.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ImoMetaDto {
@JsonProperty("CoreShipInd")
private String coreShipInd;
@JsonProperty("DataSetVersion")
private DataSetVersionWrapper dataSetVersion;
@JsonProperty("IHSLRorIMOShipNo")
private String imoNo;
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class DataSetVersionWrapper {
@JsonProperty("DataSetVersion")
private String dataSetVersion;
}
}

파일 보기

@ -0,0 +1,20 @@
package com.snp.batch.jobs.imometa.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ImoMetaResponse {
@JsonProperty("shipCount")
private int shipCount;
@JsonProperty("Ships")
private List<ImoMetaDto> ships;
}

파일 보기

@ -0,0 +1,20 @@
package com.snp.batch.jobs.imometa.batch.entity;
import com.snp.batch.common.batch.entity.BaseEntity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class ImoMetaEntity extends BaseEntity {
private String imoNo;
private String coreShipInd;
private String datasetVer;
}

파일 보기

@ -0,0 +1,141 @@
package com.snp.batch.jobs.imometa.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.global.model.BatchApiLog;
import com.snp.batch.jobs.imometa.batch.dto.ImoMetaDto;
import com.snp.batch.jobs.imometa.batch.dto.ImoMetaResponse;
import com.snp.batch.service.BatchApiLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import org.springframework.web.util.UriComponentsBuilder;
import java.time.LocalDateTime;
import java.util.List;
@Slf4j
public class ImoMetaDataReader extends BaseApiReader<ImoMetaDto> {
private static final String ALL_IMO_PATH = "/MaritimeWCF/APSShipService.svc/RESTFul/GetAllIMONumbers";
private static final int BATCH_SIZE = 5000;
private final BatchApiLogService batchApiLogService;
private final String maritimeApiUrl;
private List<ImoMetaDto> allData;
private int currentBatchIndex = 0;
public ImoMetaDataReader(WebClient webClient, BatchApiLogService batchApiLogService, String maritimeApiUrl) {
super(webClient);
this.batchApiLogService = batchApiLogService;
this.maritimeApiUrl = maritimeApiUrl;
enableChunkMode();
}
@Override
protected String getReaderName() {
return "ImoMetaDataReader";
}
@Override
protected String getApiPath() {
return ALL_IMO_PATH;
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allData = null;
}
@Override
protected List<ImoMetaDto> fetchNextBatch() throws Exception {
if (allData == null) {
allData = fetchAllImoData();
if (allData == null || allData.isEmpty()) {
log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
return null;
}
log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), BATCH_SIZE);
}
if (currentBatchIndex >= allData.size()) {
log.info("[{}] 모든 배치 처리 완료", getReaderName());
return null;
}
int end = Math.min(currentBatchIndex + BATCH_SIZE, allData.size());
List<ImoMetaDto> batch = allData.subList(currentBatchIndex, end);
int batchNum = (currentBatchIndex / BATCH_SIZE) + 1;
int totalBatches = (int) Math.ceil((double) allData.size() / BATCH_SIZE);
log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), batchNum, totalBatches, batch.size());
currentBatchIndex = end;
updateApiCallStats(totalBatches, batchNum);
return batch;
}
private List<ImoMetaDto> fetchAllImoData() {
String fullUri = UriComponentsBuilder.fromHttpUrl(maritimeApiUrl)
.path(ALL_IMO_PATH)
.queryParam("includeDeadShips", "0")
.build()
.toUriString();
long startTime = System.currentTimeMillis();
int statusCode = 200;
String errorMessage = null;
long responseSize = 0L;
try {
log.info("[{}] API 요청 시작: {}", getReaderName(), fullUri);
ImoMetaResponse response = webClient.get()
.uri(uriBuilder -> uriBuilder
.path(ALL_IMO_PATH)
.queryParam("includeDeadShips", "0")
.build())
.retrieve()
.bodyToMono(ImoMetaResponse.class)
.block();
List<ImoMetaDto> result = (response != null && response.getShips() != null)
? response.getShips()
: List.of();
responseSize = result.size();
log.info("[{}] API 응답 수신: {}건", getReaderName(), responseSize);
return result;
} catch (WebClientResponseException e) {
statusCode = e.getStatusCode().value();
errorMessage = String.format("API Error: %s", e.getResponseBodyAsString());
log.error("[{}] API 호출 실패 (HTTP {}): {}", getReaderName(), statusCode, errorMessage);
throw e;
} catch (Exception e) {
statusCode = 500;
errorMessage = String.format("System Error: %s", e.getMessage());
log.error("[{}] API 호출 중 예외 발생: {}", getReaderName(), errorMessage);
throw e;
} finally {
long duration = System.currentTimeMillis() - startTime;
batchApiLogService.saveLog(BatchApiLog.builder()
.apiRequestLocation(getReaderName())
.requestUri(fullUri)
.httpMethod("GET")
.statusCode(statusCode)
.responseTimeMs(duration)
.responseCount(responseSize)
.errorMessage(errorMessage)
.createdAt(LocalDateTime.now())
.jobExecutionId(getJobExecutionId())
.stepExecutionId(getStepExecutionId())
.build());
}
}
}

파일 보기

@ -0,0 +1,10 @@
package com.snp.batch.jobs.imometa.batch.repository;
import com.snp.batch.jobs.imometa.batch.entity.ImoMetaEntity;
import java.util.List;
public interface ImoMetaRepository {
void upsertAll(List<ImoMetaEntity> items);
void updateDeleteFlag(List<String> imoNumbers);
}

파일 보기

@ -0,0 +1,76 @@
package com.snp.batch.jobs.imometa.batch.repository;
import com.snp.batch.jobs.imometa.batch.entity.ImoMetaEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.sql.Types;
import java.util.List;
@Slf4j
@Repository
public class ImoMetaRepositoryImpl implements ImoMetaRepository {
private final JdbcTemplate jdbcTemplate;
@Value("${app.batch.target-schema.name}")
private String targetSchema;
@Value("${app.batch.target-schema.tables.imo-meta-001:tb_ship_default_info}")
private String tableName;
public ImoMetaRepositoryImpl(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
private String getTableName() {
return targetSchema + "." + tableName;
}
@Override
@Transactional
public void upsertAll(List<ImoMetaEntity> items) {
if (items == null || items.isEmpty()) return;
String sql = """
INSERT INTO %s (imo_no, core_ship_ind, dataset_ver, job_execution_id, creatr_id)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (imo_no) DO UPDATE SET
core_ship_ind = EXCLUDED.core_ship_ind,
dataset_ver = EXCLUDED.dataset_ver,
job_execution_id = EXCLUDED.job_execution_id,
mdfcn_dt = CURRENT_TIMESTAMP,
mdfr_id = EXCLUDED.creatr_id
""".formatted(getTableName());
jdbcTemplate.batchUpdate(sql, items, items.size(), (ps, entity) -> {
int idx = 1;
ps.setString(idx++, entity.getImoNo());
ps.setString(idx++, entity.getCoreShipInd());
ps.setString(idx++, entity.getDatasetVer());
ps.setObject(idx++, entity.getJobExecutionId(), Types.BIGINT);
ps.setString(idx, entity.getCreatedBy());
});
log.info("ImoMeta upsert 완료: {} 건", items.size());
}
@Override
@Transactional
public void updateDeleteFlag(List<String> imoNumbers) {
if (imoNumbers == null || imoNumbers.isEmpty()) return;
String sql = """
UPDATE %s SET umnged_ship_flag = 'Y', mdfcn_dt = CURRENT_TIMESTAMP, mdfr_id = 'SYSTEM'
WHERE imo_no = ?
""".formatted(getTableName());
jdbcTemplate.batchUpdate(sql, imoNumbers, imoNumbers.size(),
(ps, imoNo) -> ps.setString(1, imoNo));
log.info("ImoMeta delete flag 업데이트 완료: {} 건", imoNumbers.size());
}
}

파일 보기

@ -0,0 +1,26 @@
package com.snp.batch.jobs.imometa.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.imometa.batch.entity.ImoMetaEntity;
import com.snp.batch.jobs.imometa.batch.repository.ImoMetaRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class ImoMetaDataWriter extends BaseWriter<ImoMetaEntity> {
private final ImoMetaRepository imoMetaRepository;
public ImoMetaDataWriter(ImoMetaRepository imoMetaRepository) {
super("ImoMetaEntity");
this.imoMetaRepository = imoMetaRepository;
}
@Override
protected void writeItems(List<ImoMetaEntity> items) throws Exception {
imoMetaRepository.upsertAll(items);
}
}

파일 보기

@ -147,6 +147,7 @@ app:
risk-compliance-003: tb_company_compliance_info risk-compliance-003: tb_company_compliance_info
risk-detail-001: tb_ship_risk_detail_info risk-detail-001: tb_ship_risk_detail_info
ship-028: ship_detail_hash_json ship-028: ship_detail_hash_json
imo-meta-001: tb_ship_default_info
service-schema: service-schema:
name: std_snp_svc name: std_snp_svc
tables: tables: