feat(risk): Risk 상세 데이터 수집 배치 프로세스 추가 #82
102
docs/ddl/tb_ship_risk_detail_info.sql
Normal file
102
docs/ddl/tb_ship_risk_detail_info.sql
Normal file
@ -0,0 +1,102 @@
|
||||
-- std_snp_data.tb_ship_risk_detail_info definition
|
||||
|
||||
-- Drop table
|
||||
|
||||
-- DROP TABLE std_snp_data.tb_ship_risk_detail_info;
|
||||
|
||||
CREATE TABLE std_snp_data.tb_ship_risk_detail_info (
|
||||
row_index bigserial NOT NULL,
|
||||
crt_dt timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
|
||||
creatr_id varchar(34) NOT NULL,
|
||||
mdfcn_dt timestamp NULL,
|
||||
mdfr_id varchar(34) NULL,
|
||||
job_execution_id int8 NOT NULL,
|
||||
imo_no varchar(20) NULL,
|
||||
last_mdfcn_dt timestamptz NULL,
|
||||
risk_data_maint int4 NULL,
|
||||
ais_notrcv_elps_days int4 NULL,
|
||||
ais_notrcv_elps_days_desc varchar(500) NULL,
|
||||
ais_lwrnk_days int4 NULL,
|
||||
ais_lwrnk_days_desc varchar(500) NULL,
|
||||
ais_up_imo_desc int4 NULL,
|
||||
ais_up_imo_desc_val varchar(500) NULL,
|
||||
othr_ship_nm_voy_yn int4 NULL,
|
||||
othr_ship_nm_voy_yn_desc varchar(500) NULL,
|
||||
mmsi_anom_message int4 NULL,
|
||||
mmsi_anom_message_desc varchar(500) NULL,
|
||||
recent_dark_actv int4 NULL,
|
||||
recent_dark_actv_desc varchar(500) NULL,
|
||||
port_prtcll int4 NULL,
|
||||
port_prtcll_desc varchar(500) NULL,
|
||||
port_risk int4 NULL,
|
||||
port_risk_desc varchar(500) NULL,
|
||||
sts_job int4 NULL,
|
||||
sts_job_desc varchar(500) NULL,
|
||||
drift_chg int4 NULL,
|
||||
drift_chg_desc varchar(500) NULL,
|
||||
risk_event int4 NULL,
|
||||
risk_event_desc varchar(500) NULL,
|
||||
risk_event_desc_ext varchar(500) NULL,
|
||||
ntnlty_chg int4 NULL,
|
||||
ntnlty_chg_desc varchar(500) NULL,
|
||||
ntnlty_prs_mou_perf int4 NULL,
|
||||
ntnlty_prs_mou_perf_desc varchar(500) NULL,
|
||||
ntnlty_tky_mou_perf int4 NULL,
|
||||
ntnlty_tky_mou_perf_desc varchar(500) NULL,
|
||||
ntnlty_uscg_mou_perf int4 NULL,
|
||||
ntnlty_uscg_mou_perf_desc varchar(500) NULL,
|
||||
uscg_excl_ship_cert int4 NULL,
|
||||
uscg_excl_ship_cert_desc varchar(500) NULL,
|
||||
psc_inspection_elps_hr int4 NULL,
|
||||
psc_inspection_elps_hr_desc varchar(500) NULL,
|
||||
psc_inspection int4 NULL,
|
||||
psc_inspection_desc varchar(500) NULL,
|
||||
psc_defect int4 NULL,
|
||||
psc_defect_desc varchar(500) NULL,
|
||||
psc_detained int4 NULL,
|
||||
psc_detained_desc varchar(500) NULL,
|
||||
now_smgrc_evdc int4 NULL,
|
||||
now_smgrc_evdc_desc varchar(500) NULL,
|
||||
docc_chg int4 NULL,
|
||||
docc_chg_desc varchar(500) NULL,
|
||||
now_clfic int4 NULL,
|
||||
now_clfic_desc varchar(500) NULL,
|
||||
now_clfic_desc_ext varchar(500) NULL,
|
||||
clfic_status_chg int4 NULL,
|
||||
clfic_status_chg_desc varchar(500) NULL,
|
||||
pni_insrnc int4 NULL,
|
||||
pni_insrnc_desc varchar(500) NULL,
|
||||
pni_insrnc_desc_ext varchar(500) NULL,
|
||||
ship_nm_chg int4 NULL,
|
||||
ship_nm_chg_desc varchar(500) NULL,
|
||||
gbo_chg int4 NULL,
|
||||
gbo_chg_desc varchar(500) NULL,
|
||||
vslage int4 NULL,
|
||||
vslage_desc varchar(500) NULL,
|
||||
ilgl_fshr_viol int4 NULL,
|
||||
ilgl_fshr_viol_desc varchar(500) NULL,
|
||||
draft_chg int4 NULL,
|
||||
draft_chg_desc varchar(500) NULL,
|
||||
recent_sanction_prtcll int4 NULL,
|
||||
recent_sanction_prtcll_desc varchar(500) NULL,
|
||||
sngl_ship_voy int4 NULL,
|
||||
sngl_ship_voy_desc varchar(500) NULL,
|
||||
fltsfty int4 NULL,
|
||||
fltsfty_desc varchar(500) NULL,
|
||||
flt_psc int4 NULL,
|
||||
flt_psc_desc varchar(500) NULL,
|
||||
spc_inspection_ovdue int4 NULL,
|
||||
spc_inspection_ovdue_desc varchar(500) NULL,
|
||||
ownr_unk int4 NULL,
|
||||
ownr_unk_desc varchar(500) NULL,
|
||||
rss_port_call int4 NULL,
|
||||
rss_port_call_desc varchar(500) NULL,
|
||||
rss_ownr_reg int4 NULL,
|
||||
rss_ownr_reg_desc varchar(500) NULL,
|
||||
rss_sts int4 NULL,
|
||||
rss_sts_desc varchar(500) NULL
|
||||
);
|
||||
COMMENT ON TABLE std_snp_data.tb_ship_risk_detail_info IS 'Risk 상세 데이터 (RisksByImos API)';
|
||||
|
||||
ALTER TABLE std_snp_data.tb_ship_risk_detail_info OWNER TO snp;
|
||||
GRANT SELECT, UPDATE, TRUNCATE, TRIGGER, INSERT, DELETE, REFERENCES ON TABLE std_snp_data.tb_ship_risk_detail_info TO snp;
|
||||
@ -0,0 +1,50 @@
|
||||
package com.snp.batch.jobs.risk.batch.config;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.JobExecution;
|
||||
import org.springframework.batch.core.StepContribution;
|
||||
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Risk 상세 데이터 수집을 위한 IMO 목록 조회 Tasklet.
|
||||
* std_snp_svc.tb_ship_risk_info 테이블에서 전체 imo_no를 오름차순으로 조회하여
|
||||
* JobExecutionContext에 저장.
|
||||
*/
|
||||
@Slf4j
|
||||
public class RiskDetailImoFetchTasklet implements Tasklet {
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
private final String serviceSchema;
|
||||
|
||||
public RiskDetailImoFetchTasklet(JdbcTemplate jdbcTemplate, String serviceSchema) {
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
this.serviceSchema = serviceSchema;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
|
||||
JobExecution jobExecution = chunkContext.getStepContext()
|
||||
.getStepExecution().getJobExecution();
|
||||
|
||||
String sql = String.format(
|
||||
"SELECT DISTINCT imo_no FROM %s.tb_ship_risk_info WHERE imo_no IS NOT NULL ORDER BY imo_no ASC",
|
||||
serviceSchema);
|
||||
|
||||
List<String> imoNumbers = jdbcTemplate.queryForList(sql, String.class);
|
||||
|
||||
jobExecution.getExecutionContext().putInt("totalImoCount", imoNumbers.size());
|
||||
if (!imoNumbers.isEmpty()) {
|
||||
jobExecution.getExecutionContext().putString("allImoNumbers", String.join(",", imoNumbers));
|
||||
}
|
||||
|
||||
log.info("[RiskDetailImoFetchTasklet] IMO {} 건 조회 완료 (from {}.tb_ship_risk_info)",
|
||||
imoNumbers.size(), serviceSchema);
|
||||
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,221 @@
|
||||
package com.snp.batch.jobs.risk.batch.config;
|
||||
|
||||
import com.snp.batch.common.batch.config.BasePartitionedJobConfig;
|
||||
import com.snp.batch.common.batch.partition.StringListPartitioner;
|
||||
import com.snp.batch.common.batch.tasklet.LastExecutionUpdateTasklet;
|
||||
import com.snp.batch.jobs.risk.batch.dto.RiskDto;
|
||||
import com.snp.batch.jobs.risk.batch.entity.RiskEntity;
|
||||
import com.snp.batch.jobs.risk.batch.processor.RiskDataProcessor;
|
||||
import com.snp.batch.jobs.risk.batch.reader.RiskDetailDataReader;
|
||||
import com.snp.batch.jobs.risk.batch.writer.RiskDetailDataWriter;
|
||||
import com.snp.batch.service.BatchApiLogService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||
import org.springframework.batch.core.job.flow.JobExecutionDecider;
|
||||
import org.springframework.batch.core.repository.JobRepository;
|
||||
import org.springframework.batch.core.step.builder.StepBuilder;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Configuration
|
||||
public class RiskDetailImportJobConfig extends BasePartitionedJobConfig<RiskDto, RiskEntity> {
|
||||
|
||||
private final RiskDetailDataWriter riskDetailDataWriter;
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
private final WebClient maritimeServiceApiWebClient;
|
||||
private final BatchApiLogService batchApiLogService;
|
||||
private final TaskExecutor batchPartitionExecutor;
|
||||
|
||||
@Value("${app.batch.webservice-api.url}")
|
||||
private String apiUrl;
|
||||
|
||||
@Value("${app.batch.target-schema.name}")
|
||||
private String targetSchema;
|
||||
|
||||
@Value("${app.batch.service-schema.name}")
|
||||
private String serviceSchema;
|
||||
|
||||
@Value("${app.batch.risk-detail.partition-count:4}")
|
||||
private int partitionCount;
|
||||
|
||||
@Value("${app.batch.risk-detail.delay-on-success-ms:300}")
|
||||
private long delayOnSuccessMs;
|
||||
|
||||
@Value("${app.batch.risk-detail.delay-on-failure-ms:2000}")
|
||||
private long delayOnFailureMs;
|
||||
|
||||
@Value("${app.batch.last-execution-buffer-hours:24}")
|
||||
private int lastExecutionBufferHours;
|
||||
|
||||
public RiskDetailImportJobConfig(
|
||||
JobRepository jobRepository,
|
||||
PlatformTransactionManager transactionManager,
|
||||
RiskDetailDataWriter riskDetailDataWriter,
|
||||
JdbcTemplate jdbcTemplate,
|
||||
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeServiceApiWebClient,
|
||||
BatchApiLogService batchApiLogService,
|
||||
@Qualifier("batchPartitionExecutor") TaskExecutor batchPartitionExecutor) {
|
||||
super(jobRepository, transactionManager);
|
||||
this.riskDetailDataWriter = riskDetailDataWriter;
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
this.maritimeServiceApiWebClient = maritimeServiceApiWebClient;
|
||||
this.batchApiLogService = batchApiLogService;
|
||||
this.batchPartitionExecutor = batchPartitionExecutor;
|
||||
}
|
||||
|
||||
protected String getApiKey() {
|
||||
return "RISK_DETAIL_IMPORT_API";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getJobName() {
|
||||
return "RiskDetailImportJob";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getStepName() {
|
||||
return "RiskDetailImportStep";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getChunkSize() {
|
||||
return 100;
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// Job Flow 정의
|
||||
// ========================================
|
||||
|
||||
@Override
|
||||
protected Job createJobFlow(JobBuilder jobBuilder) {
|
||||
return jobBuilder
|
||||
.start(riskDetailImoFetchStep())
|
||||
.next(riskDetailImoCountDecider())
|
||||
.on("EMPTY_RESPONSE").end()
|
||||
.from(riskDetailImoCountDecider()).on("NORMAL").to(riskDetailPartitionedStep())
|
||||
.next(riskDetailLastExecutionUpdateStep())
|
||||
.end()
|
||||
.build();
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// Step 0: IMO 목록 조회
|
||||
// ========================================
|
||||
|
||||
@Bean
|
||||
public Tasklet riskDetailImoFetchTasklet() {
|
||||
return new RiskDetailImoFetchTasklet(jdbcTemplate, serviceSchema);
|
||||
}
|
||||
|
||||
@Bean(name = "RiskDetailImoFetchStep")
|
||||
public Step riskDetailImoFetchStep() {
|
||||
return new StepBuilder("RiskDetailImoFetchStep", jobRepository)
|
||||
.tasklet(riskDetailImoFetchTasklet(), transactionManager)
|
||||
.build();
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// Decider: IMO 건수 확인
|
||||
// ========================================
|
||||
|
||||
@Bean
|
||||
public JobExecutionDecider riskDetailImoCountDecider() {
|
||||
return createKeyCountDecider("totalImoCount", getJobName());
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// Step 1: Partitioned Step (병렬 처리)
|
||||
// ========================================
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public StringListPartitioner riskDetailPartitioner(
|
||||
@Value("#{jobExecutionContext['allImoNumbers']}") String allImoNumbersStr) {
|
||||
List<String> allImoNumbers = (allImoNumbersStr != null && !allImoNumbersStr.isBlank())
|
||||
? Arrays.asList(allImoNumbersStr.split(","))
|
||||
: Collections.emptyList();
|
||||
return new StringListPartitioner(allImoNumbers, partitionCount, "partitionImoNumbers");
|
||||
}
|
||||
|
||||
@Bean(name = "RiskDetailPartitionedStep")
|
||||
public Step riskDetailPartitionedStep() {
|
||||
return createPartitionedStep(
|
||||
"RiskDetailPartitionedStep", "RiskDetailWorkerStep",
|
||||
riskDetailPartitioner(null), riskDetailWorkerStep(),
|
||||
batchPartitionExecutor, partitionCount);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Step riskDetailWorkerStep() {
|
||||
return new StepBuilder("RiskDetailWorkerStep", jobRepository)
|
||||
.<RiskDto, RiskEntity>chunk(getChunkSize(), transactionManager)
|
||||
.reader(riskDetailDataReader(null, null, null))
|
||||
.processor(riskDetailDataProcessor(null))
|
||||
.writer(riskDetailDataWriter)
|
||||
.build();
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// Reader / Processor (StepScope)
|
||||
// ========================================
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public RiskDetailDataReader riskDetailDataReader(
|
||||
@Value("#{stepExecution.jobExecution.id}") Long jobExecutionId,
|
||||
@Value("#{stepExecution.id}") Long stepExecutionId,
|
||||
@Value("#{stepExecutionContext['partitionImoNumbers']}") String partitionImoNumbers) {
|
||||
RiskDetailDataReader reader = new RiskDetailDataReader(
|
||||
maritimeServiceApiWebClient, batchApiLogService, apiUrl,
|
||||
delayOnSuccessMs, delayOnFailureMs);
|
||||
reader.setExecutionIds(jobExecutionId, stepExecutionId);
|
||||
reader.setPartitionImoNumbers(partitionImoNumbers);
|
||||
return reader;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public RiskDataProcessor riskDetailDataProcessor(
|
||||
@Value("#{stepExecution.jobExecution.id}") Long jobExecutionId) {
|
||||
return new RiskDataProcessor(jobExecutionId);
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// Job Bean 등록
|
||||
// ========================================
|
||||
|
||||
@Bean(name = "RiskDetailImportJob")
|
||||
public Job riskDetailImportJob() {
|
||||
return job();
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// Step 2: LastExecution 업데이트
|
||||
// ========================================
|
||||
|
||||
@Bean
|
||||
public Tasklet riskDetailLastExecutionUpdateTasklet() {
|
||||
return new LastExecutionUpdateTasklet(jdbcTemplate, targetSchema, getApiKey(), lastExecutionBufferHours);
|
||||
}
|
||||
|
||||
@Bean(name = "RiskDetailLastExecutionUpdateStep")
|
||||
public Step riskDetailLastExecutionUpdateStep() {
|
||||
return createLastExecutionUpdateStep("RiskDetailLastExecutionUpdateStep",
|
||||
riskDetailLastExecutionUpdateTasklet());
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,148 @@
|
||||
package com.snp.batch.jobs.risk.batch.reader;
|
||||
|
||||
import com.snp.batch.global.model.BatchApiLog;
|
||||
import com.snp.batch.jobs.risk.batch.dto.RiskDto;
|
||||
import com.snp.batch.service.BatchApiLogService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.item.ItemReader;
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
|
||||
@Slf4j
|
||||
public class RiskDetailDataReader implements ItemReader<RiskDto> {
|
||||
|
||||
private static final String API_PATH = "/RiskAndCompliance/RisksByImos";
|
||||
private static final int API_BATCH_SIZE = 100; // API 1회 최대 100개 IMO
|
||||
|
||||
private final WebClient maritimeServiceApiWebClient;
|
||||
private final BatchApiLogService batchApiLogService;
|
||||
private final String apiUrl;
|
||||
private final long delayOnSuccessMs;
|
||||
private final long delayOnFailureMs;
|
||||
|
||||
// 파티션에서 받은 IMO 목록
|
||||
private List<String> partitionImoNumbers;
|
||||
private int currentBatchIndex = 0;
|
||||
|
||||
// 현재 배치 응답 데이터 (청크 내에서 하나씩 반환)
|
||||
private Iterator<RiskDto> currentIterator;
|
||||
|
||||
// 실행 ID
|
||||
private Long jobExecutionId;
|
||||
private Long stepExecutionId;
|
||||
|
||||
public RiskDetailDataReader(WebClient maritimeServiceApiWebClient,
|
||||
BatchApiLogService batchApiLogService,
|
||||
String apiUrl,
|
||||
long delayOnSuccessMs,
|
||||
long delayOnFailureMs) {
|
||||
this.maritimeServiceApiWebClient = maritimeServiceApiWebClient;
|
||||
this.batchApiLogService = batchApiLogService;
|
||||
this.apiUrl = apiUrl;
|
||||
this.delayOnSuccessMs = delayOnSuccessMs;
|
||||
this.delayOnFailureMs = delayOnFailureMs;
|
||||
}
|
||||
|
||||
public void setPartitionImoNumbers(String partitionImoNumbersCsv) {
|
||||
if (partitionImoNumbersCsv != null && !partitionImoNumbersCsv.isBlank()) {
|
||||
this.partitionImoNumbers = new ArrayList<>(Arrays.asList(partitionImoNumbersCsv.split(",")));
|
||||
} else {
|
||||
this.partitionImoNumbers = Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
public void setExecutionIds(Long jobExecutionId, Long stepExecutionId) {
|
||||
this.jobExecutionId = jobExecutionId;
|
||||
this.stepExecutionId = stepExecutionId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RiskDto read() throws Exception {
|
||||
// 현재 이터레이터에 남은 항목 반환
|
||||
if (currentIterator != null && currentIterator.hasNext()) {
|
||||
return currentIterator.next();
|
||||
}
|
||||
|
||||
// 모든 배치 처리 완료
|
||||
if (currentBatchIndex >= partitionImoNumbers.size()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 다음 배치 (100개씩) API 호출
|
||||
int fromIndex = currentBatchIndex;
|
||||
int toIndex = Math.min(fromIndex + API_BATCH_SIZE, partitionImoNumbers.size());
|
||||
List<String> batchImos = partitionImoNumbers.subList(fromIndex, toIndex);
|
||||
currentBatchIndex = toIndex;
|
||||
|
||||
String imosParam = String.join(",", batchImos);
|
||||
String fullUri = apiUrl + API_PATH + "?imos=" + imosParam;
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
int statusCode = 200;
|
||||
String errorMessage = null;
|
||||
long responseCount = 0;
|
||||
|
||||
try {
|
||||
log.info("[RiskDetailDataReader] API 호출: IMO {} 건 (index {}-{})",
|
||||
batchImos.size(), fromIndex, toIndex - 1);
|
||||
|
||||
List<RiskDto> response = maritimeServiceApiWebClient.get()
|
||||
.uri(uriBuilder -> uriBuilder
|
||||
.path(API_PATH)
|
||||
.queryParam("imos", imosParam)
|
||||
.build())
|
||||
.retrieve()
|
||||
.bodyToMono(new ParameterizedTypeReference<List<RiskDto>>() {})
|
||||
.block();
|
||||
|
||||
if (response == null || response.isEmpty()) {
|
||||
log.info("[RiskDetailDataReader] API 응답 없음 (index {}-{})", fromIndex, toIndex - 1);
|
||||
if (delayOnSuccessMs > 0) Thread.sleep(delayOnSuccessMs);
|
||||
return read(); // 다음 배치 시도
|
||||
}
|
||||
|
||||
responseCount = response.size();
|
||||
log.info("[RiskDetailDataReader] API 응답: {} 건", responseCount);
|
||||
|
||||
if (delayOnSuccessMs > 0) Thread.sleep(delayOnSuccessMs);
|
||||
|
||||
currentIterator = response.iterator();
|
||||
return currentIterator.hasNext() ? currentIterator.next() : read();
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
statusCode = 500;
|
||||
errorMessage = e.getMessage();
|
||||
log.error("[RiskDetailDataReader] API 호출 실패: {}", e.getMessage(), e);
|
||||
|
||||
if (delayOnFailureMs > 0) {
|
||||
try {
|
||||
Thread.sleep(delayOnFailureMs);
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
// 실패한 배치 건너뛰고 다음 배치로
|
||||
return read();
|
||||
} finally {
|
||||
long duration = System.currentTimeMillis() - startTime;
|
||||
batchApiLogService.saveLog(BatchApiLog.builder()
|
||||
.apiRequestLocation("RiskDetailDataReader")
|
||||
.jobExecutionId(jobExecutionId)
|
||||
.stepExecutionId(stepExecutionId)
|
||||
.requestUri(fullUri)
|
||||
.httpMethod("GET")
|
||||
.statusCode(statusCode)
|
||||
.responseTimeMs(duration)
|
||||
.responseCount(responseCount)
|
||||
.errorMessage(errorMessage)
|
||||
.createdAt(LocalDateTime.now())
|
||||
.build());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -7,4 +7,5 @@ import java.util.List;
|
||||
public interface RiskRepository {
|
||||
void saveRiskAll(List<RiskEntity> items);
|
||||
// void saveRiskHistoryAll(List<RiskEntity> items);
|
||||
void saveRiskDetailAll(List<RiskEntity> items);
|
||||
}
|
||||
|
||||
@ -22,6 +22,9 @@ public class RiskRepositoryImpl extends BaseJdbcRepository<RiskEntity, Long> imp
|
||||
@Value("${app.batch.target-schema.tables.risk-compliance-001}")
|
||||
private String tableName;
|
||||
|
||||
@Value("${app.batch.target-schema.tables.risk-detail-001:tb_ship_risk_detail_info}")
|
||||
private String riskDetailTableName;
|
||||
|
||||
public RiskRepositoryImpl(JdbcTemplate jdbcTemplate) {
|
||||
super(jdbcTemplate);
|
||||
}
|
||||
@ -140,6 +143,189 @@ public class RiskRepositoryImpl extends BaseJdbcRepository<RiskEntity, Long> imp
|
||||
return "RiskEntity";
|
||||
}
|
||||
|
||||
private String getRiskDetailInsertSql() {
|
||||
return """
|
||||
INSERT INTO %s.%s(
|
||||
imo_no, last_mdfcn_dt,
|
||||
risk_data_maint,
|
||||
ais_notrcv_elps_days, ais_notrcv_elps_days_desc,
|
||||
ais_lwrnk_days, ais_lwrnk_days_desc,
|
||||
ais_up_imo_desc, ais_up_imo_desc_val,
|
||||
othr_ship_nm_voy_yn, othr_ship_nm_voy_yn_desc,
|
||||
mmsi_anom_message, mmsi_anom_message_desc,
|
||||
recent_dark_actv, recent_dark_actv_desc,
|
||||
port_prtcll, port_prtcll_desc,
|
||||
port_risk, port_risk_desc,
|
||||
sts_job, sts_job_desc,
|
||||
drift_chg, drift_chg_desc,
|
||||
risk_event, risk_event_desc, risk_event_desc_ext,
|
||||
ntnlty_chg, ntnlty_chg_desc,
|
||||
ntnlty_prs_mou_perf, ntnlty_prs_mou_perf_desc,
|
||||
ntnlty_tky_mou_perf, ntnlty_tky_mou_perf_desc,
|
||||
ntnlty_uscg_mou_perf, ntnlty_uscg_mou_perf_desc,
|
||||
uscg_excl_ship_cert, uscg_excl_ship_cert_desc,
|
||||
psc_inspection_elps_hr, psc_inspection_elps_hr_desc,
|
||||
psc_inspection, psc_inspection_desc,
|
||||
psc_defect, psc_defect_desc,
|
||||
psc_detained, psc_detained_desc,
|
||||
now_smgrc_evdc, now_smgrc_evdc_desc,
|
||||
docc_chg, docc_chg_desc,
|
||||
now_clfic, now_clfic_desc, now_clfic_desc_ext,
|
||||
clfic_status_chg, clfic_status_chg_desc,
|
||||
pni_insrnc, pni_insrnc_desc, pni_insrnc_desc_ext,
|
||||
ship_nm_chg, ship_nm_chg_desc,
|
||||
gbo_chg, gbo_chg_desc,
|
||||
vslage, vslage_desc,
|
||||
ilgl_fshr_viol, ilgl_fshr_viol_desc,
|
||||
draft_chg, draft_chg_desc,
|
||||
recent_sanction_prtcll, recent_sanction_prtcll_desc,
|
||||
sngl_ship_voy, sngl_ship_voy_desc,
|
||||
fltsfty, fltsfty_desc,
|
||||
flt_psc, flt_psc_desc,
|
||||
spc_inspection_ovdue, spc_inspection_ovdue_desc,
|
||||
ownr_unk, ownr_unk_desc,
|
||||
rss_port_call, rss_port_call_desc,
|
||||
rss_ownr_reg, rss_ownr_reg_desc,
|
||||
rss_sts, rss_sts_desc,
|
||||
job_execution_id, creatr_id
|
||||
)
|
||||
VALUES (
|
||||
?, ?::timestamptz, -- 1-2: imo_no, last_mdfcn_dt
|
||||
?, -- 3: risk_data_maint
|
||||
?, ?, -- 4-5: ais_notrcv_elps_days
|
||||
?, ?, -- 6-7: ais_lwrnk_days
|
||||
?, ?, -- 8-9: ais_up_imo_desc
|
||||
?, ?, -- 10-11: othr_ship_nm_voy_yn
|
||||
?, ?, -- 12-13: mmsi_anom_message
|
||||
?, ?, -- 14-15: recent_dark_actv
|
||||
?, ?, -- 16-17: port_prtcll
|
||||
?, ?, -- 18-19: port_risk
|
||||
?, ?, -- 20-21: sts_job
|
||||
?, ?, -- 22-23: drift_chg
|
||||
?, ?, ?, -- 24-26: risk_event (+ext)
|
||||
?, ?, -- 27-28: ntnlty_chg
|
||||
?, ?, -- 29-30: ntnlty_prs_mou_perf
|
||||
?, ?, -- 31-32: ntnlty_tky_mou_perf
|
||||
?, ?, -- 33-34: ntnlty_uscg_mou_perf
|
||||
?, ?, -- 35-36: uscg_excl_ship_cert
|
||||
?, ?, -- 37-38: psc_inspection_elps_hr
|
||||
?, ?, -- 39-40: psc_inspection
|
||||
?, ?, -- 41-42: psc_defect
|
||||
?, ?, -- 43-44: psc_detained
|
||||
?, ?, -- 45-46: now_smgrc_evdc
|
||||
?, ?, -- 47-48: docc_chg
|
||||
?, ?, ?, -- 49-51: now_clfic (+ext)
|
||||
?, ?, -- 52-53: clfic_status_chg
|
||||
?, ?, ?, -- 54-56: pni_insrnc (+ext)
|
||||
?, ?, -- 57-58: ship_nm_chg
|
||||
?, ?, -- 59-60: gbo_chg
|
||||
?, ?, -- 61-62: vslage
|
||||
?, ?, -- 63-64: ilgl_fshr_viol
|
||||
?, ?, -- 65-66: draft_chg
|
||||
?, ?, -- 67-68: recent_sanction_prtcll
|
||||
?, ?, -- 69-70: sngl_ship_voy
|
||||
?, ?, -- 71-72: fltsfty
|
||||
?, ?, -- 73-74: flt_psc
|
||||
?, ?, -- 75-76: spc_inspection_ovdue
|
||||
?, ?, -- 77-78: ownr_unk
|
||||
?, ?, -- 79-80: rss_port_call
|
||||
?, ?, -- 81-82: rss_ownr_reg
|
||||
?, ?, -- 83-84: rss_sts
|
||||
?, ? -- 85-86: job_execution_id, creatr_id
|
||||
)
|
||||
""".formatted(targetSchema, riskDetailTableName);
|
||||
}
|
||||
|
||||
private void setRiskDetailParameters(PreparedStatement ps, RiskEntity entity) throws Exception {
|
||||
int idx = 1;
|
||||
ps.setString(idx++, entity.getLrno());
|
||||
ps.setString(idx++, entity.getLastUpdated());
|
||||
ps.setObject(idx++, entity.getRiskDataMaintained(), Types.INTEGER);
|
||||
ps.setObject(idx++, entity.getDaysSinceLastSeenOnAIS(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getDaysSinceLastSeenOnAISNarrative());
|
||||
ps.setObject(idx++, entity.getDaysUnderAIS(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getDaysUnderAISNarrative());
|
||||
ps.setObject(idx++, entity.getImoCorrectOnAIS(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getImoCorrectOnAISNarrative());
|
||||
ps.setObject(idx++, entity.getSailingUnderName(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getSailingUnderNameNarrative());
|
||||
ps.setObject(idx++, entity.getAnomalousMessagesFromMMSI(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getAnomalousMessagesFromMMSINarrative());
|
||||
ps.setObject(idx++, entity.getMostRecentDarkActivity(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getMostRecentDarkActivityNarrative());
|
||||
ps.setObject(idx++, entity.getPortCalls(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getPortCallsNarrative());
|
||||
ps.setObject(idx++, entity.getPortRisk(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getPortRiskNarrative());
|
||||
ps.setObject(idx++, entity.getStsOperations(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getStsOperationsNarrative());
|
||||
ps.setObject(idx++, entity.getDriftingHighSeas(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getDriftingHighSeasNarrative());
|
||||
ps.setObject(idx++, entity.getRiskEvents(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getRiskEventNarrative());
|
||||
ps.setString(idx++, entity.getRiskEventNarrativeExtended());
|
||||
ps.setObject(idx++, entity.getFlagChanges(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getFlagChangeNarrative());
|
||||
ps.setObject(idx++, entity.getFlagParisMOUPerformance(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getFlagParisMOUPerformanceNarrative());
|
||||
ps.setObject(idx++, entity.getFlagTokyoMOUPeformance(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getFlagTokyoMOUPeformanceNarrative());
|
||||
ps.setObject(idx++, entity.getFlagUSCGMOUPerformance(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getFlagUSCGMOUPerformanceNarrative());
|
||||
ps.setObject(idx++, entity.getUscgQualship21(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getUscgQualship21Narrative());
|
||||
ps.setObject(idx++, entity.getTimeSincePSCInspection(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getTimeSincePSCInspectionNarrative());
|
||||
ps.setObject(idx++, entity.getPscInspections(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getPscInspectionNarrative());
|
||||
ps.setObject(idx++, entity.getPscDefects(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getPscDefectsNarrative());
|
||||
ps.setObject(idx++, entity.getPscDetentions(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getPscDetentionsNarrative());
|
||||
ps.setObject(idx++, entity.getCurrentSMCCertificate(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getCurrentSMCCertificateNarrative());
|
||||
ps.setObject(idx++, entity.getDocChanges(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getDocChangesNarrative());
|
||||
ps.setObject(idx++, entity.getCurrentClass(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getCurrentClassNarrative());
|
||||
ps.setString(idx++, entity.getCurrentClassNarrativeExtended());
|
||||
ps.setObject(idx++, entity.getClassStatusChanges(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getClassStatusChangesNarrative());
|
||||
ps.setObject(idx++, entity.getPandICoverage(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getPandICoverageNarrative());
|
||||
ps.setString(idx++, entity.getPandICoverageNarrativeExtended());
|
||||
ps.setObject(idx++, entity.getNameChanges(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getNameChangesNarrative());
|
||||
ps.setObject(idx++, entity.getGboChanges(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getGboChangesNarrative());
|
||||
ps.setObject(idx++, entity.getAgeOfShip(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getAgeofShipNarrative());
|
||||
ps.setObject(idx++, entity.getIuuFishingViolation(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getIuuFishingNarrative());
|
||||
ps.setObject(idx++, entity.getDraughtChanges(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getDraughtChangesNarrative());
|
||||
ps.setObject(idx++, entity.getMostRecentSanctionedPortCall(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getMostRecentSanctionedPortCallNarrative());
|
||||
ps.setObject(idx++, entity.getSingleShipOperation(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getSingleShipOperationNarrative());
|
||||
ps.setObject(idx++, entity.getFleetSafety(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getFleetSafetyNarrative());
|
||||
ps.setObject(idx++, entity.getFleetPSC(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getFleetPSCNarrative());
|
||||
ps.setObject(idx++, entity.getSpecialSurveyOverdue(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getSpecialSurveyOverdueNarrative());
|
||||
ps.setObject(idx++, entity.getOwnerUnknown(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getOwnerUnknownNarrative());
|
||||
ps.setObject(idx++, entity.getRussianPortCall(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getRussianPortCallNarrative());
|
||||
ps.setObject(idx++, entity.getRussianOwnerRegistration(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getRussianOwnerRegistrationNarrative());
|
||||
ps.setObject(idx++, entity.getRussianSTS(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getRussianSTSNarrative());
|
||||
ps.setObject(idx++, entity.getJobExecutionId(), Types.INTEGER);
|
||||
ps.setString(idx++, entity.getCreatedBy());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveRiskAll(List<RiskEntity> items) {
|
||||
if (items == null || items.isEmpty()) {
|
||||
@ -158,4 +344,22 @@ public class RiskRepositoryImpl extends BaseJdbcRepository<RiskEntity, Long> imp
|
||||
log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveRiskDetailAll(List<RiskEntity> items) {
|
||||
if (items == null || items.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
jdbcTemplate.batchUpdate(getRiskDetailInsertSql(), items, items.size(),
|
||||
(ps, entity) -> {
|
||||
try {
|
||||
setRiskDetailParameters(ps, entity);
|
||||
} catch (Exception e) {
|
||||
log.error("RiskDetail 배치 파라미터 설정 실패", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
log.info("RiskDetail 전체 저장 완료: 수정={} 건", items.size());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,26 @@
|
||||
package com.snp.batch.jobs.risk.batch.writer;
|
||||
|
||||
import com.snp.batch.common.batch.writer.BaseWriter;
|
||||
import com.snp.batch.jobs.risk.batch.entity.RiskEntity;
|
||||
import com.snp.batch.jobs.risk.batch.repository.RiskRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RiskDetailDataWriter extends BaseWriter<RiskEntity> {
|
||||
|
||||
private final RiskRepository riskRepository;
|
||||
|
||||
public RiskDetailDataWriter(RiskRepository riskRepository) {
|
||||
super("riskDetailRepository");
|
||||
this.riskRepository = riskRepository;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeItems(List<RiskEntity> items) throws Exception {
|
||||
riskRepository.saveRiskDetailAll(items);
|
||||
}
|
||||
}
|
||||
@ -145,6 +145,7 @@ app:
|
||||
risk-compliance-001: tb_ship_risk_info
|
||||
risk-compliance-002: tb_ship_compliance_info
|
||||
risk-compliance-003: tb_company_compliance_info
|
||||
risk-detail-001: tb_ship_risk_detail_info
|
||||
ship-028: ship_detail_hash_json
|
||||
service-schema:
|
||||
name: std_snp_svc
|
||||
@ -166,6 +167,12 @@ app:
|
||||
# LAST_EXECUTION 버퍼 시간 (시간 단위) - 외부 DB 동기화 지연 대응
|
||||
last-execution-buffer-hours: 24
|
||||
|
||||
# RiskDetail 배치 설정
|
||||
risk-detail:
|
||||
partition-count: 4 # 병렬 파티션 수
|
||||
delay-on-success-ms: 300 # 성공 시 딜레이 (ms)
|
||||
delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms)
|
||||
|
||||
# ShipDetailUpdate 배치 설정
|
||||
ship-detail-update:
|
||||
batch-size: 10 # API 요청 당 IMO 건수
|
||||
|
||||
불러오는 중...
Reference in New Issue
Block a user