From 324d20f8745cb8800442cc52004afff6fb9cbaae Mon Sep 17 00:00:00 2001 From: HYOJIN Date: Tue, 24 Mar 2026 11:01:32 +0900 Subject: [PATCH 1/2] =?UTF-8?q?feat(risk):=20Risk=20=EC=83=81=EC=84=B8=20?= =?UTF-8?q?=EB=8D=B0=EC=9D=B4=ED=84=B0=20=EC=88=98=EC=A7=91=20=EB=B0=B0?= =?UTF-8?q?=EC=B9=98=20=ED=94=84=EB=A1=9C=EC=84=B8=EC=8A=A4=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80=20(RisksByImos=20API)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit tb_ship_risk_info에서 전체 IMO 조회 → 파티션 병렬 처리로 RisksByImos API 호출 → tb_ship_risk_detail_info에 INSERT Closes #65 Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/ddl/tb_ship_risk_detail_info.sql | 102 ++++++++ .../config/RiskDetailImoFetchTasklet.java | 50 ++++ .../config/RiskDetailImportJobConfig.java | 221 ++++++++++++++++++ .../batch/reader/RiskDetailDataReader.java | 148 ++++++++++++ .../risk/batch/repository/RiskRepository.java | 1 + .../batch/repository/RiskRepositoryImpl.java | 204 ++++++++++++++++ .../batch/writer/RiskDetailDataWriter.java | 26 +++ src/main/resources/application.yml | 7 + 8 files changed, 759 insertions(+) create mode 100644 docs/ddl/tb_ship_risk_detail_info.sql create mode 100644 src/main/java/com/snp/batch/jobs/risk/batch/config/RiskDetailImoFetchTasklet.java create mode 100644 src/main/java/com/snp/batch/jobs/risk/batch/config/RiskDetailImportJobConfig.java create mode 100644 src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDetailDataReader.java create mode 100644 src/main/java/com/snp/batch/jobs/risk/batch/writer/RiskDetailDataWriter.java diff --git a/docs/ddl/tb_ship_risk_detail_info.sql b/docs/ddl/tb_ship_risk_detail_info.sql new file mode 100644 index 0000000..ceb50fe --- /dev/null +++ b/docs/ddl/tb_ship_risk_detail_info.sql @@ -0,0 +1,102 @@ +-- std_snp_data.tb_ship_risk_detail_info definition + +-- Drop table + +-- DROP TABLE std_snp_data.tb_ship_risk_detail_info; + +CREATE TABLE std_snp_data.tb_ship_risk_detail_info ( + row_index bigserial NOT NULL, + crt_dt timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL, + creatr_id varchar(34) NOT NULL, + mdfcn_dt timestamp NULL, + mdfr_id varchar(34) NULL, + job_execution_id int8 NOT NULL, + imo_no varchar(20) NULL, + last_mdfcn_dt timestamptz NULL, + risk_data_maint int4 NULL, + ais_notrcv_elps_days int4 NULL, + ais_notrcv_elps_days_desc varchar(500) NULL, + ais_lwrnk_days int4 NULL, + ais_lwrnk_days_desc varchar(500) NULL, + ais_up_imo_desc int4 NULL, + ais_up_imo_desc_val varchar(500) NULL, + othr_ship_nm_voy_yn int4 NULL, + othr_ship_nm_voy_yn_desc varchar(500) NULL, + mmsi_anom_message int4 NULL, + mmsi_anom_message_desc varchar(500) NULL, + recent_dark_actv int4 NULL, + recent_dark_actv_desc varchar(500) NULL, + port_prtcll int4 NULL, + port_prtcll_desc varchar(500) NULL, + port_risk int4 NULL, + port_risk_desc varchar(500) NULL, + sts_job int4 NULL, + sts_job_desc varchar(500) NULL, + drift_chg int4 NULL, + drift_chg_desc varchar(500) NULL, + risk_event int4 NULL, + risk_event_desc varchar(500) NULL, + risk_event_desc_ext varchar(500) NULL, + ntnlty_chg int4 NULL, + ntnlty_chg_desc varchar(500) NULL, + ntnlty_prs_mou_perf int4 NULL, + ntnlty_prs_mou_perf_desc varchar(500) NULL, + ntnlty_tky_mou_perf int4 NULL, + ntnlty_tky_mou_perf_desc varchar(500) NULL, + ntnlty_uscg_mou_perf int4 NULL, + ntnlty_uscg_mou_perf_desc varchar(500) NULL, + uscg_excl_ship_cert int4 NULL, + uscg_excl_ship_cert_desc varchar(500) NULL, + psc_inspection_elps_hr int4 NULL, + psc_inspection_elps_hr_desc varchar(500) NULL, + psc_inspection int4 NULL, + psc_inspection_desc varchar(500) NULL, + psc_defect int4 NULL, + psc_defect_desc varchar(500) NULL, + psc_detained int4 NULL, + psc_detained_desc varchar(500) NULL, + now_smgrc_evdc int4 NULL, + now_smgrc_evdc_desc varchar(500) NULL, + docc_chg int4 NULL, + docc_chg_desc varchar(500) NULL, + now_clfic int4 NULL, + now_clfic_desc varchar(500) NULL, + now_clfic_desc_ext varchar(500) NULL, + clfic_status_chg int4 NULL, + clfic_status_chg_desc varchar(500) NULL, + pni_insrnc int4 NULL, + pni_insrnc_desc varchar(500) NULL, + pni_insrnc_desc_ext varchar(500) NULL, + ship_nm_chg int4 NULL, + ship_nm_chg_desc varchar(500) NULL, + gbo_chg int4 NULL, + gbo_chg_desc varchar(500) NULL, + vslage int4 NULL, + vslage_desc varchar(500) NULL, + ilgl_fshr_viol int4 NULL, + ilgl_fshr_viol_desc varchar(500) NULL, + draft_chg int4 NULL, + draft_chg_desc varchar(500) NULL, + recent_sanction_prtcll int4 NULL, + recent_sanction_prtcll_desc varchar(500) NULL, + sngl_ship_voy int4 NULL, + sngl_ship_voy_desc varchar(500) NULL, + fltsfty int4 NULL, + fltsfty_desc varchar(500) NULL, + flt_psc int4 NULL, + flt_psc_desc varchar(500) NULL, + spc_inspection_ovdue int4 NULL, + spc_inspection_ovdue_desc varchar(500) NULL, + ownr_unk int4 NULL, + ownr_unk_desc varchar(500) NULL, + rss_port_call int4 NULL, + rss_port_call_desc varchar(500) NULL, + rss_ownr_reg int4 NULL, + rss_ownr_reg_desc varchar(500) NULL, + rss_sts int4 NULL, + rss_sts_desc varchar(500) NULL +); +COMMENT ON TABLE std_snp_data.tb_ship_risk_detail_info IS 'Risk 상세 데이터 (RisksByImos API)'; + +ALTER TABLE std_snp_data.tb_ship_risk_detail_info OWNER TO snp; +GRANT SELECT, UPDATE, TRUNCATE, TRIGGER, INSERT, DELETE, REFERENCES ON TABLE std_snp_data.tb_ship_risk_detail_info TO snp; diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskDetailImoFetchTasklet.java b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskDetailImoFetchTasklet.java new file mode 100644 index 0000000..13a285c --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskDetailImoFetchTasklet.java @@ -0,0 +1,50 @@ +package com.snp.batch.jobs.risk.batch.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.util.List; + +/** + * Risk 상세 데이터 수집을 위한 IMO 목록 조회 Tasklet. + * std_snp_svc.tb_ship_risk_info 테이블에서 전체 imo_no를 오름차순으로 조회하여 + * JobExecutionContext에 저장. + */ +@Slf4j +public class RiskDetailImoFetchTasklet implements Tasklet { + + private final JdbcTemplate jdbcTemplate; + private final String serviceSchema; + + public RiskDetailImoFetchTasklet(JdbcTemplate jdbcTemplate, String serviceSchema) { + this.jdbcTemplate = jdbcTemplate; + this.serviceSchema = serviceSchema; + } + + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) { + JobExecution jobExecution = chunkContext.getStepContext() + .getStepExecution().getJobExecution(); + + String sql = String.format( + "SELECT DISTINCT imo_no FROM %s.tb_ship_risk_info WHERE imo_no IS NOT NULL ORDER BY imo_no ASC", + serviceSchema); + + List imoNumbers = jdbcTemplate.queryForList(sql, String.class); + + jobExecution.getExecutionContext().putInt("totalImoCount", imoNumbers.size()); + if (!imoNumbers.isEmpty()) { + jobExecution.getExecutionContext().putString("allImoNumbers", String.join(",", imoNumbers)); + } + + log.info("[RiskDetailImoFetchTasklet] IMO {} 건 조회 완료 (from {}.tb_ship_risk_info)", + imoNumbers.size(), serviceSchema); + + return RepeatStatus.FINISHED; + } +} diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskDetailImportJobConfig.java b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskDetailImportJobConfig.java new file mode 100644 index 0000000..d9a351f --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskDetailImportJobConfig.java @@ -0,0 +1,221 @@ +package com.snp.batch.jobs.risk.batch.config; + +import com.snp.batch.common.batch.config.BasePartitionedJobConfig; +import com.snp.batch.common.batch.partition.StringListPartitioner; +import com.snp.batch.common.batch.tasklet.LastExecutionUpdateTasklet; +import com.snp.batch.jobs.risk.batch.dto.RiskDto; +import com.snp.batch.jobs.risk.batch.entity.RiskEntity; +import com.snp.batch.jobs.risk.batch.processor.RiskDataProcessor; +import com.snp.batch.jobs.risk.batch.reader.RiskDetailDataReader; +import com.snp.batch.jobs.risk.batch.writer.RiskDetailDataWriter; +import com.snp.batch.service.BatchApiLogService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.job.flow.JobExecutionDecider; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskExecutor; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +@Slf4j +@Configuration +public class RiskDetailImportJobConfig extends BasePartitionedJobConfig { + + private final RiskDetailDataWriter riskDetailDataWriter; + private final JdbcTemplate jdbcTemplate; + private final WebClient maritimeServiceApiWebClient; + private final BatchApiLogService batchApiLogService; + private final TaskExecutor batchPartitionExecutor; + + @Value("${app.batch.webservice-api.url}") + private String apiUrl; + + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.service-schema.name}") + private String serviceSchema; + + @Value("${app.batch.risk-detail.partition-count:4}") + private int partitionCount; + + @Value("${app.batch.risk-detail.delay-on-success-ms:300}") + private long delayOnSuccessMs; + + @Value("${app.batch.risk-detail.delay-on-failure-ms:2000}") + private long delayOnFailureMs; + + @Value("${app.batch.last-execution-buffer-hours:24}") + private int lastExecutionBufferHours; + + public RiskDetailImportJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + RiskDetailDataWriter riskDetailDataWriter, + JdbcTemplate jdbcTemplate, + @Qualifier("maritimeServiceApiWebClient") WebClient maritimeServiceApiWebClient, + BatchApiLogService batchApiLogService, + @Qualifier("batchPartitionExecutor") TaskExecutor batchPartitionExecutor) { + super(jobRepository, transactionManager); + this.riskDetailDataWriter = riskDetailDataWriter; + this.jdbcTemplate = jdbcTemplate; + this.maritimeServiceApiWebClient = maritimeServiceApiWebClient; + this.batchApiLogService = batchApiLogService; + this.batchPartitionExecutor = batchPartitionExecutor; + } + + protected String getApiKey() { + return "RISK_DETAIL_IMPORT_API"; + } + + @Override + protected String getJobName() { + return "RiskDetailImportJob"; + } + + @Override + protected String getStepName() { + return "RiskDetailImportStep"; + } + + @Override + protected int getChunkSize() { + return 100; + } + + // ======================================== + // Job Flow 정의 + // ======================================== + + @Override + protected Job createJobFlow(JobBuilder jobBuilder) { + return jobBuilder + .start(riskDetailImoFetchStep()) + .next(riskDetailImoCountDecider()) + .on("EMPTY_RESPONSE").end() + .from(riskDetailImoCountDecider()).on("NORMAL").to(riskDetailPartitionedStep()) + .next(riskDetailLastExecutionUpdateStep()) + .end() + .build(); + } + + // ======================================== + // Step 0: IMO 목록 조회 + // ======================================== + + @Bean + public Tasklet riskDetailImoFetchTasklet() { + return new RiskDetailImoFetchTasklet(jdbcTemplate, serviceSchema); + } + + @Bean(name = "RiskDetailImoFetchStep") + public Step riskDetailImoFetchStep() { + return new StepBuilder("RiskDetailImoFetchStep", jobRepository) + .tasklet(riskDetailImoFetchTasklet(), transactionManager) + .build(); + } + + // ======================================== + // Decider: IMO 건수 확인 + // ======================================== + + @Bean + public JobExecutionDecider riskDetailImoCountDecider() { + return createKeyCountDecider("totalImoCount", getJobName()); + } + + // ======================================== + // Step 1: Partitioned Step (병렬 처리) + // ======================================== + + @Bean + @StepScope + public StringListPartitioner riskDetailPartitioner( + @Value("#{jobExecutionContext['allImoNumbers']}") String allImoNumbersStr) { + List allImoNumbers = (allImoNumbersStr != null && !allImoNumbersStr.isBlank()) + ? Arrays.asList(allImoNumbersStr.split(",")) + : Collections.emptyList(); + return new StringListPartitioner(allImoNumbers, partitionCount, "partitionImoNumbers"); + } + + @Bean(name = "RiskDetailPartitionedStep") + public Step riskDetailPartitionedStep() { + return createPartitionedStep( + "RiskDetailPartitionedStep", "RiskDetailWorkerStep", + riskDetailPartitioner(null), riskDetailWorkerStep(), + batchPartitionExecutor, partitionCount); + } + + @Bean + public Step riskDetailWorkerStep() { + return new StepBuilder("RiskDetailWorkerStep", jobRepository) + .chunk(getChunkSize(), transactionManager) + .reader(riskDetailDataReader(null, null, null)) + .processor(riskDetailDataProcessor(null)) + .writer(riskDetailDataWriter) + .build(); + } + + // ======================================== + // Reader / Processor (StepScope) + // ======================================== + + @Bean + @StepScope + public RiskDetailDataReader riskDetailDataReader( + @Value("#{stepExecution.jobExecution.id}") Long jobExecutionId, + @Value("#{stepExecution.id}") Long stepExecutionId, + @Value("#{stepExecutionContext['partitionImoNumbers']}") String partitionImoNumbers) { + RiskDetailDataReader reader = new RiskDetailDataReader( + maritimeServiceApiWebClient, batchApiLogService, apiUrl, + delayOnSuccessMs, delayOnFailureMs); + reader.setExecutionIds(jobExecutionId, stepExecutionId); + reader.setPartitionImoNumbers(partitionImoNumbers); + return reader; + } + + @Bean + @StepScope + public RiskDataProcessor riskDetailDataProcessor( + @Value("#{stepExecution.jobExecution.id}") Long jobExecutionId) { + return new RiskDataProcessor(jobExecutionId); + } + + // ======================================== + // Job Bean 등록 + // ======================================== + + @Bean(name = "RiskDetailImportJob") + public Job riskDetailImportJob() { + return job(); + } + + // ======================================== + // Step 2: LastExecution 업데이트 + // ======================================== + + @Bean + public Tasklet riskDetailLastExecutionUpdateTasklet() { + return new LastExecutionUpdateTasklet(jdbcTemplate, targetSchema, getApiKey(), lastExecutionBufferHours); + } + + @Bean(name = "RiskDetailLastExecutionUpdateStep") + public Step riskDetailLastExecutionUpdateStep() { + return createLastExecutionUpdateStep("RiskDetailLastExecutionUpdateStep", + riskDetailLastExecutionUpdateTasklet()); + } +} diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDetailDataReader.java b/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDetailDataReader.java new file mode 100644 index 0000000..2e8e19a --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDetailDataReader.java @@ -0,0 +1,148 @@ +package com.snp.batch.jobs.risk.batch.reader; + +import com.snp.batch.global.model.BatchApiLog; +import com.snp.batch.jobs.risk.batch.dto.RiskDto; +import com.snp.batch.service.BatchApiLogService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.item.ItemReader; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.web.reactive.function.client.WebClient; + +import java.time.LocalDateTime; +import java.util.*; + +@Slf4j +public class RiskDetailDataReader implements ItemReader { + + private static final String API_PATH = "/RiskAndCompliance/RisksByImos"; + private static final int API_BATCH_SIZE = 100; // API 1회 최대 100개 IMO + + private final WebClient maritimeServiceApiWebClient; + private final BatchApiLogService batchApiLogService; + private final String apiUrl; + private final long delayOnSuccessMs; + private final long delayOnFailureMs; + + // 파티션에서 받은 IMO 목록 + private List partitionImoNumbers; + private int currentBatchIndex = 0; + + // 현재 배치 응답 데이터 (청크 내에서 하나씩 반환) + private Iterator currentIterator; + + // 실행 ID + private Long jobExecutionId; + private Long stepExecutionId; + + public RiskDetailDataReader(WebClient maritimeServiceApiWebClient, + BatchApiLogService batchApiLogService, + String apiUrl, + long delayOnSuccessMs, + long delayOnFailureMs) { + this.maritimeServiceApiWebClient = maritimeServiceApiWebClient; + this.batchApiLogService = batchApiLogService; + this.apiUrl = apiUrl; + this.delayOnSuccessMs = delayOnSuccessMs; + this.delayOnFailureMs = delayOnFailureMs; + } + + public void setPartitionImoNumbers(String partitionImoNumbersCsv) { + if (partitionImoNumbersCsv != null && !partitionImoNumbersCsv.isBlank()) { + this.partitionImoNumbers = new ArrayList<>(Arrays.asList(partitionImoNumbersCsv.split(","))); + } else { + this.partitionImoNumbers = Collections.emptyList(); + } + } + + public void setExecutionIds(Long jobExecutionId, Long stepExecutionId) { + this.jobExecutionId = jobExecutionId; + this.stepExecutionId = stepExecutionId; + } + + @Override + public RiskDto read() throws Exception { + // 현재 이터레이터에 남은 항목 반환 + if (currentIterator != null && currentIterator.hasNext()) { + return currentIterator.next(); + } + + // 모든 배치 처리 완료 + if (currentBatchIndex >= partitionImoNumbers.size()) { + return null; + } + + // 다음 배치 (100개씩) API 호출 + int fromIndex = currentBatchIndex; + int toIndex = Math.min(fromIndex + API_BATCH_SIZE, partitionImoNumbers.size()); + List batchImos = partitionImoNumbers.subList(fromIndex, toIndex); + currentBatchIndex = toIndex; + + String imosParam = String.join(",", batchImos); + String fullUri = apiUrl + API_PATH + "?imos=" + imosParam; + + long startTime = System.currentTimeMillis(); + int statusCode = 200; + String errorMessage = null; + long responseCount = 0; + + try { + log.info("[RiskDetailDataReader] API 호출: IMO {} 건 (index {}-{})", + batchImos.size(), fromIndex, toIndex - 1); + + List response = maritimeServiceApiWebClient.get() + .uri(uriBuilder -> uriBuilder + .path(API_PATH) + .queryParam("imos", imosParam) + .build()) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() {}) + .block(); + + if (response == null || response.isEmpty()) { + log.info("[RiskDetailDataReader] API 응답 없음 (index {}-{})", fromIndex, toIndex - 1); + if (delayOnSuccessMs > 0) Thread.sleep(delayOnSuccessMs); + return read(); // 다음 배치 시도 + } + + responseCount = response.size(); + log.info("[RiskDetailDataReader] API 응답: {} 건", responseCount); + + if (delayOnSuccessMs > 0) Thread.sleep(delayOnSuccessMs); + + currentIterator = response.iterator(); + return currentIterator.hasNext() ? currentIterator.next() : read(); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; + } catch (Exception e) { + statusCode = 500; + errorMessage = e.getMessage(); + log.error("[RiskDetailDataReader] API 호출 실패: {}", e.getMessage(), e); + + if (delayOnFailureMs > 0) { + try { + Thread.sleep(delayOnFailureMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + // 실패한 배치 건너뛰고 다음 배치로 + return read(); + } finally { + long duration = System.currentTimeMillis() - startTime; + batchApiLogService.saveLog(BatchApiLog.builder() + .apiRequestLocation("RiskDetailDataReader") + .jobExecutionId(jobExecutionId) + .stepExecutionId(stepExecutionId) + .requestUri(fullUri) + .httpMethod("GET") + .statusCode(statusCode) + .responseTimeMs(duration) + .responseCount(responseCount) + .errorMessage(errorMessage) + .createdAt(LocalDateTime.now()) + .build()); + } + } +} diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepository.java b/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepository.java index 21e3d0c..317e198 100644 --- a/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepository.java +++ b/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepository.java @@ -7,4 +7,5 @@ import java.util.List; public interface RiskRepository { void saveRiskAll(List items); // void saveRiskHistoryAll(List items); + void saveRiskDetailAll(List items); } diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java index 42d431d..e653b88 100644 --- a/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java @@ -22,6 +22,9 @@ public class RiskRepositoryImpl extends BaseJdbcRepository imp @Value("${app.batch.target-schema.tables.risk-compliance-001}") private String tableName; + @Value("${app.batch.target-schema.tables.risk-detail-001:tb_ship_risk_detail_info}") + private String riskDetailTableName; + public RiskRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } @@ -140,6 +143,189 @@ public class RiskRepositoryImpl extends BaseJdbcRepository imp return "RiskEntity"; } + private String getRiskDetailInsertSql() { + return """ + INSERT INTO %s.%s( + imo_no, last_mdfcn_dt, + risk_data_maint, + ais_notrcv_elps_days, ais_notrcv_elps_days_desc, + ais_lwrnk_days, ais_lwrnk_days_desc, + ais_up_imo_desc, ais_up_imo_desc_val, + othr_ship_nm_voy_yn, othr_ship_nm_voy_yn_desc, + mmsi_anom_message, mmsi_anom_message_desc, + recent_dark_actv, recent_dark_actv_desc, + port_prtcll, port_prtcll_desc, + port_risk, port_risk_desc, + sts_job, sts_job_desc, + drift_chg, drift_chg_desc, + risk_event, risk_event_desc, risk_event_desc_ext, + ntnlty_chg, ntnlty_chg_desc, + ntnlty_prs_mou_perf, ntnlty_prs_mou_perf_desc, + ntnlty_tky_mou_perf, ntnlty_tky_mou_perf_desc, + ntnlty_uscg_mou_perf, ntnlty_uscg_mou_perf_desc, + uscg_excl_ship_cert, uscg_excl_ship_cert_desc, + psc_inspection_elps_hr, psc_inspection_elps_hr_desc, + psc_inspection, psc_inspection_desc, + psc_defect, psc_defect_desc, + psc_detained, psc_detained_desc, + now_smgrc_evdc, now_smgrc_evdc_desc, + docc_chg, docc_chg_desc, + now_clfic, now_clfic_desc, now_clfic_desc_ext, + clfic_status_chg, clfic_status_chg_desc, + pni_insrnc, pni_insrnc_desc, pni_insrnc_desc_ext, + ship_nm_chg, ship_nm_chg_desc, + gbo_chg, gbo_chg_desc, + vslage, vslage_desc, + ilgl_fshr_viol, ilgl_fshr_viol_desc, + draft_chg, draft_chg_desc, + recent_sanction_prtcll, recent_sanction_prtcll_desc, + sngl_ship_voy, sngl_ship_voy_desc, + fltsfty, fltsfty_desc, + flt_psc, flt_psc_desc, + spc_inspection_ovdue, spc_inspection_ovdue_desc, + ownr_unk, ownr_unk_desc, + rss_port_call, rss_port_call_desc, + rss_ownr_reg, rss_ownr_reg_desc, + rss_sts, rss_sts_desc, + job_execution_id, creatr_id + ) + VALUES ( + ?, ?::timestamptz, -- 1-2: imo_no, last_mdfcn_dt + ?, -- 3: risk_data_maint + ?, ?, -- 4-5: ais_notrcv_elps_days + ?, ?, -- 6-7: ais_lwrnk_days + ?, ?, -- 8-9: ais_up_imo_desc + ?, ?, -- 10-11: othr_ship_nm_voy_yn + ?, ?, -- 12-13: mmsi_anom_message + ?, ?, -- 14-15: recent_dark_actv + ?, ?, -- 16-17: port_prtcll + ?, ?, -- 18-19: port_risk + ?, ?, -- 20-21: sts_job + ?, ?, -- 22-23: drift_chg + ?, ?, ?, -- 24-26: risk_event (+ext) + ?, ?, -- 27-28: ntnlty_chg + ?, ?, -- 29-30: ntnlty_prs_mou_perf + ?, ?, -- 31-32: ntnlty_tky_mou_perf + ?, ?, -- 33-34: ntnlty_uscg_mou_perf + ?, ?, -- 35-36: uscg_excl_ship_cert + ?, ?, -- 37-38: psc_inspection_elps_hr + ?, ?, -- 39-40: psc_inspection + ?, ?, -- 41-42: psc_defect + ?, ?, -- 43-44: psc_detained + ?, ?, -- 45-46: now_smgrc_evdc + ?, ?, -- 47-48: docc_chg + ?, ?, ?, -- 49-51: now_clfic (+ext) + ?, ?, -- 52-53: clfic_status_chg + ?, ?, ?, -- 54-56: pni_insrnc (+ext) + ?, ?, -- 57-58: ship_nm_chg + ?, ?, -- 59-60: gbo_chg + ?, ?, -- 61-62: vslage + ?, ?, -- 63-64: ilgl_fshr_viol + ?, ?, -- 65-66: draft_chg + ?, ?, -- 67-68: recent_sanction_prtcll + ?, ?, -- 69-70: sngl_ship_voy + ?, ?, -- 71-72: fltsfty + ?, ?, -- 73-74: flt_psc + ?, ?, -- 75-76: spc_inspection_ovdue + ?, ?, -- 77-78: ownr_unk + ?, ?, -- 79-80: rss_port_call + ?, ?, -- 81-82: rss_ownr_reg + ?, ?, -- 83-84: rss_sts + ?, ? -- 85-86: job_execution_id, creatr_id + ) + """.formatted(targetSchema, riskDetailTableName); + } + + private void setRiskDetailParameters(PreparedStatement ps, RiskEntity entity) throws Exception { + int idx = 1; + ps.setString(idx++, entity.getLrno()); + ps.setString(idx++, entity.getLastUpdated()); + ps.setObject(idx++, entity.getRiskDataMaintained(), Types.INTEGER); + ps.setObject(idx++, entity.getDaysSinceLastSeenOnAIS(), Types.INTEGER); + ps.setString(idx++, entity.getDaysSinceLastSeenOnAISNarrative()); + ps.setObject(idx++, entity.getDaysUnderAIS(), Types.INTEGER); + ps.setString(idx++, entity.getDaysUnderAISNarrative()); + ps.setObject(idx++, entity.getImoCorrectOnAIS(), Types.INTEGER); + ps.setString(idx++, entity.getImoCorrectOnAISNarrative()); + ps.setObject(idx++, entity.getSailingUnderName(), Types.INTEGER); + ps.setString(idx++, entity.getSailingUnderNameNarrative()); + ps.setObject(idx++, entity.getAnomalousMessagesFromMMSI(), Types.INTEGER); + ps.setString(idx++, entity.getAnomalousMessagesFromMMSINarrative()); + ps.setObject(idx++, entity.getMostRecentDarkActivity(), Types.INTEGER); + ps.setString(idx++, entity.getMostRecentDarkActivityNarrative()); + ps.setObject(idx++, entity.getPortCalls(), Types.INTEGER); + ps.setString(idx++, entity.getPortCallsNarrative()); + ps.setObject(idx++, entity.getPortRisk(), Types.INTEGER); + ps.setString(idx++, entity.getPortRiskNarrative()); + ps.setObject(idx++, entity.getStsOperations(), Types.INTEGER); + ps.setString(idx++, entity.getStsOperationsNarrative()); + ps.setObject(idx++, entity.getDriftingHighSeas(), Types.INTEGER); + ps.setString(idx++, entity.getDriftingHighSeasNarrative()); + ps.setObject(idx++, entity.getRiskEvents(), Types.INTEGER); + ps.setString(idx++, entity.getRiskEventNarrative()); + ps.setString(idx++, entity.getRiskEventNarrativeExtended()); + ps.setObject(idx++, entity.getFlagChanges(), Types.INTEGER); + ps.setString(idx++, entity.getFlagChangeNarrative()); + ps.setObject(idx++, entity.getFlagParisMOUPerformance(), Types.INTEGER); + ps.setString(idx++, entity.getFlagParisMOUPerformanceNarrative()); + ps.setObject(idx++, entity.getFlagTokyoMOUPeformance(), Types.INTEGER); + ps.setString(idx++, entity.getFlagTokyoMOUPeformanceNarrative()); + ps.setObject(idx++, entity.getFlagUSCGMOUPerformance(), Types.INTEGER); + ps.setString(idx++, entity.getFlagUSCGMOUPerformanceNarrative()); + ps.setObject(idx++, entity.getUscgQualship21(), Types.INTEGER); + ps.setString(idx++, entity.getUscgQualship21Narrative()); + ps.setObject(idx++, entity.getTimeSincePSCInspection(), Types.INTEGER); + ps.setString(idx++, entity.getTimeSincePSCInspectionNarrative()); + ps.setObject(idx++, entity.getPscInspections(), Types.INTEGER); + ps.setString(idx++, entity.getPscInspectionNarrative()); + ps.setObject(idx++, entity.getPscDefects(), Types.INTEGER); + ps.setString(idx++, entity.getPscDefectsNarrative()); + ps.setObject(idx++, entity.getPscDetentions(), Types.INTEGER); + ps.setString(idx++, entity.getPscDetentionsNarrative()); + ps.setObject(idx++, entity.getCurrentSMCCertificate(), Types.INTEGER); + ps.setString(idx++, entity.getCurrentSMCCertificateNarrative()); + ps.setObject(idx++, entity.getDocChanges(), Types.INTEGER); + ps.setString(idx++, entity.getDocChangesNarrative()); + ps.setObject(idx++, entity.getCurrentClass(), Types.INTEGER); + ps.setString(idx++, entity.getCurrentClassNarrative()); + ps.setString(idx++, entity.getCurrentClassNarrativeExtended()); + ps.setObject(idx++, entity.getClassStatusChanges(), Types.INTEGER); + ps.setString(idx++, entity.getClassStatusChangesNarrative()); + ps.setObject(idx++, entity.getPandICoverage(), Types.INTEGER); + ps.setString(idx++, entity.getPandICoverageNarrative()); + ps.setString(idx++, entity.getPandICoverageNarrativeExtended()); + ps.setObject(idx++, entity.getNameChanges(), Types.INTEGER); + ps.setString(idx++, entity.getNameChangesNarrative()); + ps.setObject(idx++, entity.getGboChanges(), Types.INTEGER); + ps.setString(idx++, entity.getGboChangesNarrative()); + ps.setObject(idx++, entity.getAgeOfShip(), Types.INTEGER); + ps.setString(idx++, entity.getAgeofShipNarrative()); + ps.setObject(idx++, entity.getIuuFishingViolation(), Types.INTEGER); + ps.setString(idx++, entity.getIuuFishingNarrative()); + ps.setObject(idx++, entity.getDraughtChanges(), Types.INTEGER); + ps.setString(idx++, entity.getDraughtChangesNarrative()); + ps.setObject(idx++, entity.getMostRecentSanctionedPortCall(), Types.INTEGER); + ps.setString(idx++, entity.getMostRecentSanctionedPortCallNarrative()); + ps.setObject(idx++, entity.getSingleShipOperation(), Types.INTEGER); + ps.setString(idx++, entity.getSingleShipOperationNarrative()); + ps.setObject(idx++, entity.getFleetSafety(), Types.INTEGER); + ps.setString(idx++, entity.getFleetSafetyNarrative()); + ps.setObject(idx++, entity.getFleetPSC(), Types.INTEGER); + ps.setString(idx++, entity.getFleetPSCNarrative()); + ps.setObject(idx++, entity.getSpecialSurveyOverdue(), Types.INTEGER); + ps.setString(idx++, entity.getSpecialSurveyOverdueNarrative()); + ps.setObject(idx++, entity.getOwnerUnknown(), Types.INTEGER); + ps.setString(idx++, entity.getOwnerUnknownNarrative()); + ps.setObject(idx++, entity.getRussianPortCall(), Types.INTEGER); + ps.setString(idx++, entity.getRussianPortCallNarrative()); + ps.setObject(idx++, entity.getRussianOwnerRegistration(), Types.INTEGER); + ps.setString(idx++, entity.getRussianOwnerRegistrationNarrative()); + ps.setObject(idx++, entity.getRussianSTS(), Types.INTEGER); + ps.setString(idx++, entity.getRussianSTSNarrative()); + ps.setObject(idx++, entity.getJobExecutionId(), Types.INTEGER); + ps.setString(idx++, entity.getCreatedBy()); + } + @Override public void saveRiskAll(List items) { if (items == null || items.isEmpty()) { @@ -158,4 +344,22 @@ public class RiskRepositoryImpl extends BaseJdbcRepository imp log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size()); } + @Override + public void saveRiskDetailAll(List items) { + if (items == null || items.isEmpty()) { + return; + } + jdbcTemplate.batchUpdate(getRiskDetailInsertSql(), items, items.size(), + (ps, entity) -> { + try { + setRiskDetailParameters(ps, entity); + } catch (Exception e) { + log.error("RiskDetail 배치 파라미터 설정 실패", e); + throw new RuntimeException(e); + } + }); + + log.info("RiskDetail 전체 저장 완료: 수정={} 건", items.size()); + } + } diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/writer/RiskDetailDataWriter.java b/src/main/java/com/snp/batch/jobs/risk/batch/writer/RiskDetailDataWriter.java new file mode 100644 index 0000000..fba0d13 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/writer/RiskDetailDataWriter.java @@ -0,0 +1,26 @@ +package com.snp.batch.jobs.risk.batch.writer; + +import com.snp.batch.common.batch.writer.BaseWriter; +import com.snp.batch.jobs.risk.batch.entity.RiskEntity; +import com.snp.batch.jobs.risk.batch.repository.RiskRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Slf4j +@Component +public class RiskDetailDataWriter extends BaseWriter { + + private final RiskRepository riskRepository; + + public RiskDetailDataWriter(RiskRepository riskRepository) { + super("riskDetailRepository"); + this.riskRepository = riskRepository; + } + + @Override + protected void writeItems(List items) throws Exception { + riskRepository.saveRiskDetailAll(items); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 46b9dc9..bcd8412 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -145,6 +145,7 @@ app: risk-compliance-001: tb_ship_risk_info risk-compliance-002: tb_ship_compliance_info risk-compliance-003: tb_company_compliance_info + risk-detail-001: tb_ship_risk_detail_info ship-028: ship_detail_hash_json service-schema: name: std_snp_svc @@ -166,6 +167,12 @@ app: # LAST_EXECUTION 버퍼 시간 (시간 단위) - 외부 DB 동기화 지연 대응 last-execution-buffer-hours: 24 + # RiskDetail 배치 설정 + risk-detail: + partition-count: 4 # 병렬 파티션 수 + delay-on-success-ms: 300 # 성공 시 딜레이 (ms) + delay-on-failure-ms: 2000 # 실패 시 딜레이 (ms) + # ShipDetailUpdate 배치 설정 ship-detail-update: batch-size: 10 # API 요청 당 IMO 건수 -- 2.45.2 From 9eda15b552e43d39dad82b940da1144fff534731 Mon Sep 17 00:00:00 2001 From: HYOJIN Date: Tue, 24 Mar 2026 11:02:10 +0900 Subject: [PATCH 2/2] =?UTF-8?q?docs:=20=EB=A6=B4=EB=A6=AC=EC=A6=88=20?= =?UTF-8?q?=EB=85=B8=ED=8A=B8=20=EC=97=85=EB=8D=B0=EC=9D=B4=ED=8A=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/RELEASE-NOTES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/RELEASE-NOTES.md b/docs/RELEASE-NOTES.md index 15a8999..3d30780 100644 --- a/docs/RELEASE-NOTES.md +++ b/docs/RELEASE-NOTES.md @@ -5,6 +5,7 @@ ## [Unreleased] ### 추가 +- Risk 상세 데이터 수집 배치 프로세스 추가 (RisksByImos API, 파티션 병렬 처리) (#65) - 배치 모니터링 React SPA 전환 및 10대 기능 강화 - AIS Target signalKindCode(MDA 범례코드) 치환 로직 추가 - 중국 허가선박 전용 API 및 DB Sync 동적 범위 개선 -- 2.45.2