From 22a3a87145f9e1ebd9aa879b8b73a34363730e46 Mon Sep 17 00:00:00 2001 From: HYOJIN Date: Thu, 26 Mar 2026 13:37:03 +0900 Subject: [PATCH 1/2] =?UTF-8?q?feat(risk):=20Risk=20=EB=8D=B0=EC=9D=B4?= =?UTF-8?q?=ED=84=B0=20=EB=8F=99=EA=B8=B0=ED=99=94=20=EB=8C=80=EC=83=81=20?= =?UTF-8?q?=EB=B3=80=EA=B2=BD=20=EB=B0=8F=20=EA=B0=92=20=EB=B3=80=EA=B2=BD?= =?UTF-8?q?=20=EC=9D=B4=EB=A0=A5=20=EA=B4=80=EB=A6=AC=20(#3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 테이블 변경: - source: tb_ship_risk_info → tb_ship_risk_detail_info - target: tb_ship_risk_info → tb_ship_risk_detail_info - target history: tb_ship_risk_hstry → tb_ship_risk_detail_hstry - target 추가: tb_ship_risk_detail_info_hstry (indicator 값 변경 이력) Job 분리: - riskDataSyncJob: 데이터 동기화 + 스냅샷 이력 - riskDetailChangeDataSyncJob: 스냅샷 시계열 비교 → 값 변경 이력 RiskDto/Entity: indicator(Integer) + desc(String) 컬럼 추가 (42→83 필드) BaseSyncReader: while 루프로 모든 그룹 연속 로드 (null=Step종료 문제 해결) BatchWriteListener: 청크 내 모든 고유 job_execution_id P→S 업데이트 README.md 추가 Co-Authored-By: Claude Opus 4.6 (1M context) --- README.md | 199 +++++++++++++++ .../common/batch/reader/BaseSyncReader.java | 27 +- .../batch/common/util/BatchWriteListener.java | 36 ++- .../snp/batch/common/util/TableMetaInfo.java | 7 +- .../config/RiskDetailChangeSyncJobConfig.java | 114 +++++++++ .../batch/risk/dto/RiskChangeDto.java | 25 ++ .../jobs/datasync/batch/risk/dto/RiskDto.java | 124 ++++++--- .../batch/risk/entity/RiskChangeEntity.java | 25 ++ .../batch/risk/entity/RiskEntity.java | 124 ++++++--- .../risk/processor/RiskChangeProcessor.java | 21 ++ .../batch/risk/processor/RiskProcessor.java | 42 +++ .../batch/risk/reader/RiskChangeReader.java | 163 ++++++++++++ .../batch/risk/reader/RiskReader.java | 122 ++++++--- .../batch/risk/repository/RiskRepository.java | 6 +- .../risk/repository/RiskRepositoryImpl.java | 239 +++++++++--------- .../batch/risk/repository/RiskSql.java | 182 ++++++++----- .../batch/risk/writer/RiskChangeWriter.java | 23 ++ .../batch/risk/writer/RiskWriter.java | 8 +- src/main/resources/application.yml | 7 +- 19 files changed, 1144 insertions(+), 350 deletions(-) create mode 100644 README.md create mode 100644 src/main/java/com/snp/batch/jobs/datasync/batch/risk/config/RiskDetailChangeSyncJobConfig.java create mode 100644 src/main/java/com/snp/batch/jobs/datasync/batch/risk/dto/RiskChangeDto.java create mode 100644 src/main/java/com/snp/batch/jobs/datasync/batch/risk/entity/RiskChangeEntity.java create mode 100644 src/main/java/com/snp/batch/jobs/datasync/batch/risk/processor/RiskChangeProcessor.java create mode 100644 src/main/java/com/snp/batch/jobs/datasync/batch/risk/reader/RiskChangeReader.java create mode 100644 src/main/java/com/snp/batch/jobs/datasync/batch/risk/writer/RiskChangeWriter.java diff --git a/README.md b/README.md new file mode 100644 index 0000000..92d3885 --- /dev/null +++ b/README.md @@ -0,0 +1,199 @@ +SNP Sync Batch + +S&P Global 해양 데이터를 수집하여 PostgreSQL에 동기화하는 Spring Batch 시스템입니다. + +## 기술 스택 + +| 구분 | 기술 | +|------|------| +| Language | Java 17 | +| Framework | Spring Boot 3.2.1, Spring Batch 5.1.0 | +| Scheduler | Quartz 2.5.0 (JDBC Store) | +| Database | PostgreSQL (dual datasource) | +| Cache | Caffeine | +| Frontend | React + TypeScript (Vite) | +| Build | Maven (frontend-maven-plugin 통합) | +| CI/CD | Gitea Actions → systemd 자동 배포 | + +## 아키텍처 + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ SNP Sync Batch │ +│ │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ Quartz │───►│ Reader │───►│ Processor │───►│ Writer │ │ +│ │ Scheduler │ │(BaseSyncR)│ │(DTO→Entity)│ │(SubChunk) │ │ +│ └──────────┘ └────┬─────┘ └──────────┘ └────┬─────┘ │ +│ │ │ │ +│ ┌────────▼────────┐ ┌───────▼───────┐ │ +│ │ Source DB │ │ Target DB │ │ +│ │ (std_snp_data) │ │ (std_snp_svc) │ │ +│ │ batch_flag 관리 │ │ UPSERT 저장 │ │ +│ └─────────────────┘ └───────────────┘ │ +│ │ +│ ┌──────────────────────────────────────────────────────────┐ │ +│ │ React Frontend (포트 8051, /snp-sync) │ │ +│ │ 대시보드 │ 동기화현황 │ 실행이력 │ 스케줄 │ 타임라인 │ │ +│ └──────────────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +## 데이터베이스 구조 + +### Dual Datasource + +| Datasource | 스키마 | 용도 | +|------------|--------|------| +| batchDataSource (Primary) | snp_batch | Spring Batch 메타데이터, Quartz 스케줄 | +| businessDataSource | std_snp_data / std_snp_svc | 비즈니스 데이터 (소스/타겟) | + +### 동기화 대상 테이블 + +| 도메인 | 소스 테이블 수 | 주요 내용 | +|--------|--------------|----------| +| Ship | 25개 | 선박 제원, 이력, 관계 | +| Company | 1개 | 회사 상세 정보 | +| Event | 4개 | 해양 사건/사고 | +| Facility | 1개 | 항구 시설 | +| PSC | 3개 | PSC 검사 | +| Movements | 8개 | 선박 이동 (항적) | +| Code | 2개 | 코드 (선종, 국적) | +| Risk | 1개 | 위험 지표 (+ 값 변경 이력) | +| Compliance | 2개 | 컴플라이언스 (+ 값 변경 이력) | + +## 동기화 프로세스 + +### batch_flag 상태 흐름 + +``` +[N] 대기 ──── Reader ────► [P] 진행 ──── Writer 성공 ────► [S] 완료 + │ │ + │ Writer 실패 시 + │ P 상태 고착 (수동 리셋 필요) + │ + batch_job_execution.status = 'COMPLETED' 인 데이터만 대상 +``` + +### Reader → Writer 흐름 + +``` +BaseSyncReader (while 루프) + │ + ├─ fetchNextGroup(11284): MIN(batch_flag='N') → 데이터 로드 → N→P + ├─ fetchNextGroup(11286): 다음 그룹 연속 로드 → N→P + ├─ fetchNextGroup(11317): 다음 그룹 연속 로드 → N→P + └─ fetchNextGroup(): 데이터 없음 → return null → Step 종료 + │ + ▼ +BaseChunkedWriter (sub-chunk 5000건 단위) + │ + ├─ Sub-Chunk 1: [1~5000건] → 독립 트랜잭션 커밋 + ├─ Sub-Chunk 2: [5001~10000건] → 독립 트랜잭션 커밋 + └─ ... + │ + ▼ +BatchWriteListener.afterWrite() + └─ 청크 내 모든 고유 job_execution_id → P→S 업데이트 +``` + +### 값 변경 이력 관리 (Risk, Compliance) + +데이터 동기화 Job과 값 변경 이력 Job이 분리되어 운영됩니다: + +``` +riskDataSyncJob (데이터 동기화) + Source → Reader → Writer + ├─ UPSERT → tb_ship_risk_detail_info (최신 데이터) + └─ UPSERT → tb_ship_risk_detail_hstry (스냅샷 이력) + +riskDetailChangeDataSyncJob (값 변경 이력) + tb_ship_risk_detail_hstry (스냅샷) + → imo_no별 시계열 비교 + → indicator 컬럼 변경분 감지 + → INSERT → tb_ship_risk_detail_info_hstry (컬럼명, 이전값, 이후값) +``` + +## 배치 Job 목록 + +### 데이터 동기화 Job + +| Job 이름 | 도메인 | 설명 | +|----------|--------|------| +| snpMdaDataSyncJob | Ship + Company | 선박/회사 데이터 동기화 (26 Step) | +| eventDataSyncJob | Event | 사건 데이터 동기화 (4 Step) | +| facilitySyncJob | Facility | 항구 시설 동기화 | +| pscDataSyncJob | PSC | PSC 검사 동기화 (3 Step) | +| anchorageCallDataSyncJob | Movements | 정박지 기항 | +| berthCallDataSyncJob | Movements | 부두 기항 | +| currentlyAtDataSyncJob | Movements | 현재 위치 | +| destinationDataSyncJob | Movements | 목적지 | +| portCallDataSyncJob | Movements | 항구 기항 | +| stsOperationDataSyncJob | Movements | STS 작업 | +| terminalCallDataSyncJob | Movements | 터미널 기항 | +| transitDataSyncJob | Movements | 통과 | +| codeDataSyncJob | Code | 코드 동기화 (2 Step) | +| riskDataSyncJob | Risk | 위험 지표 동기화 | +| shipComplianceDataSyncJob | Compliance | 선박 컴플라이언스 | +| companyComplianceDataSyncJob | Compliance | 회사 컴플라이언스 | + +### 값 변경 이력 Job + +| Job 이름 | 설명 | +|----------|------| +| riskDetailChangeDataSyncJob | Risk indicator 값 변경 이력 | +| shipComplianceChangeDataSyncJob | 선박 컴플라이언스 값 변경 이력 | +| companyComplianceChangeDataSyncJob | 회사 컴플라이언스 값 변경 이력 | + +### 유지보수 Job + +| Job 이름 | 설명 | +|----------|------| +| batchLogCleanupJob | 배치 로그 정리 (90일 보관) | + +## 프론트엔드 + +| 메뉴 | 경로 | 설명 | +|------|------|------| +| 대시보드 | `/` | 통계, 실행 중 작업, 최근 실행/실패 | +| 동기화 현황 | `/sync-status` | 테이블별 N/P/S 건수, 데이터 미리보기, P 리셋 | +| 실행 이력 | `/executions` | 배치 실행 검색, 필터, 상세 | +| 작업 | `/jobs` | Job 목록, 수동 실행 | +| 스케줄 | `/schedules` | Quartz 스케줄 CRUD, Cron 미리보기 | +| 타임라인 | `/schedule-timeline` | 일/주/월별 실행 시각화 | + +## CI/CD + +``` +develop → main 머지 + → Gitea Actions (빌드 + JAR 배포) + → systemd path watcher (.deploy-trigger 감지) + → restart.sh → 서비스 재시작 +``` + +### 배포 확인 + +```bash +# 배포 로그 +cat /devdata/services/snp-sync-batch/backend/deploy.log + +# 서비스 상태 +systemctl status snp-sync-batch + +# 실시간 로그 +journalctl -u snp-sync-batch -n 100 -f +``` + +## 빌드 및 실행 + +```bash +# 개발 환경 실행 +mvn spring-boot:run + +# 패키징 +mvn clean package -DskipTests + +# 서버 포트: 8051 +# Context Path: /snp-sync +# Swagger: /snp-sync/swagger-ui.html +``` diff --git a/src/main/java/com/snp/batch/common/batch/reader/BaseSyncReader.java b/src/main/java/com/snp/batch/common/batch/reader/BaseSyncReader.java index 7e47ecd..1648276 100644 --- a/src/main/java/com/snp/batch/common/batch/reader/BaseSyncReader.java +++ b/src/main/java/com/snp/batch/common/batch/reader/BaseSyncReader.java @@ -30,7 +30,6 @@ public abstract class BaseSyncReader implements protected final JdbcTemplate businessJdbcTemplate; private List allDataBuffer = new ArrayList<>(); - private Long currentGroupId = null; protected BaseSyncReader(DataSource businessDataSource, TableMetaInfo tableMetaInfo) { this.businessJdbcTemplate = new JdbcTemplate(businessDataSource); @@ -53,20 +52,12 @@ public abstract class BaseSyncReader implements @Override public T read() throws Exception { - if (allDataBuffer.isEmpty()) { - // 이전 그룹 처리 완료 → null 반환하여 청크 종료 - // (Writer + afterWrite(P→S)가 실행된 후 다음 청크에서 다음 그룹 로드) - if (currentGroupId != null) { - currentGroupId = null; - return null; - } - - // 다음 그룹 로드 + // buffer가 비어있으면 다음 그룹 로드 (연속) + while (allDataBuffer.isEmpty()) { fetchNextGroup(); - } - - if (allDataBuffer.isEmpty()) { - return null; // 더 이상 처리할 데이터 없음 → Step 종료 + if (allDataBuffer.isEmpty()) { + return null; // 더 이상 처리할 데이터 없음 → Step 종료 + } } return allDataBuffer.remove(0); @@ -78,10 +69,14 @@ public abstract class BaseSyncReader implements nextTargetId = businessJdbcTemplate.queryForObject( CommonSql.getNextTargetQuery(getSourceTable()), Long.class); } catch (Exception e) { + log.warn("[{}] 다음 처리 대상 조회 실패: {}", getLogPrefix(), e.getMessage()); return; } - if (nextTargetId == null) return; + if (nextTargetId == null) { + log.debug("[{}] 더 이상 처리할 데이터 없음", getLogPrefix()); + return; + } log.info("[{}] 다음 처리 대상 ID 발견: {}", getLogPrefix(), nextTargetId); @@ -92,7 +87,5 @@ public abstract class BaseSyncReader implements // N→P 전환 String updateSql = CommonSql.getProcessBatchQuery(getSourceTable()); businessJdbcTemplate.update(updateSql, nextTargetId); - - currentGroupId = nextTargetId; } } diff --git a/src/main/java/com/snp/batch/common/util/BatchWriteListener.java b/src/main/java/com/snp/batch/common/util/BatchWriteListener.java index cc4ffbc..9489a3c 100644 --- a/src/main/java/com/snp/batch/common/util/BatchWriteListener.java +++ b/src/main/java/com/snp/batch/common/util/BatchWriteListener.java @@ -5,10 +5,12 @@ import org.springframework.batch.core.ItemWriteListener; import org.springframework.batch.item.Chunk; import org.springframework.jdbc.core.JdbcTemplate; +import java.util.Set; +import java.util.stream.Collectors; + /** * Writer 성공 후 batch_flag P→S 업데이트 리스너 - * - * SQL은 실행 시점에 생성 (CommonSql.SOURCE_SCHEMA 초기화 보장) + * 청크 내 모든 고유 job_execution_id에 대해 S 업데이트 */ @Slf4j public class BatchWriteListener implements ItemWriteListener { @@ -25,25 +27,31 @@ public class BatchWriteListener implements Item public void afterWrite(Chunk items) { if (items.isEmpty()) return; - Long jobExecutionId = items.getItems().get(0).getJobExecutionId(); + String sql = CommonSql.getCompleteBatchQuery(sourceTable); - try { - // SQL을 실행 시점에 생성하여 SOURCE_SCHEMA null 문제 방지 - String sql = CommonSql.getCompleteBatchQuery(sourceTable); - int updatedRows = businessJdbcTemplate.update(sql, jobExecutionId); - log.info("[BatchWriteListener] Success update 'S'. jobExecutionId: {}, rows: {}", jobExecutionId, updatedRows); - } catch (Exception e) { - log.error("[BatchWriteListener] Update 'S' failed. jobExecutionId: {}", jobExecutionId, e); - throw e; + Set jobExecutionIds = items.getItems().stream() + .map(JobExecutionGroupable::getJobExecutionId) + .collect(Collectors.toSet()); + + for (Long jobExecutionId : jobExecutionIds) { + try { + int updatedRows = businessJdbcTemplate.update(sql, jobExecutionId); + log.info("[BatchWriteListener] Success update 'S'. jobExecutionId: {}, rows: {}", jobExecutionId, updatedRows); + } catch (Exception e) { + log.error("[BatchWriteListener] Update 'S' failed. jobExecutionId: {}", jobExecutionId, e); + throw e; + } } } @Override public void onWriteError(Exception exception, Chunk items) { if (!items.isEmpty()) { - Long jobExecutionId = items.getItems().get(0).getJobExecutionId(); - log.error("[BatchWriteListener] Write Error Detected! jobExecutionId: {}. Status will NOT be updated to 'S'. Error: {}", - jobExecutionId, exception.getMessage()); + Set ids = items.getItems().stream() + .map(JobExecutionGroupable::getJobExecutionId) + .collect(Collectors.toSet()); + log.error("[BatchWriteListener] Write Error Detected! jobExecutionIds: {}. Status will NOT be updated to 'S'. Error: {}", + ids, exception.getMessage()); } if (exception instanceof RuntimeException) { diff --git a/src/main/java/com/snp/batch/common/util/TableMetaInfo.java b/src/main/java/com/snp/batch/common/util/TableMetaInfo.java index 1bc9bfc..3b608b6 100644 --- a/src/main/java/com/snp/batch/common/util/TableMetaInfo.java +++ b/src/main/java/com/snp/batch/common/util/TableMetaInfo.java @@ -309,10 +309,13 @@ public class TableMetaInfo { // Risk & Compliance Tables @Value("${app.batch.target-schema.tables.risk-compliance-001}") - public String targetTbShipRiskInfo; + public String targetTbShipRiskDetailInfo; + + @Value("${app.batch.target-schema.tables.risk-compliance-001-1}") + public String targetTbShipRiskDetailInfoHstry; @Value("${app.batch.target-schema.tables.risk-compliance-002}") - public String targetTbShipRiskHstry; + public String targetTbShipRiskDetailHstry; @Value("${app.batch.target-schema.tables.risk-compliance-003}") public String targetTbShipComplianceInfo; diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/config/RiskDetailChangeSyncJobConfig.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/config/RiskDetailChangeSyncJobConfig.java new file mode 100644 index 0000000..7d2b5e4 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/config/RiskDetailChangeSyncJobConfig.java @@ -0,0 +1,114 @@ +package com.snp.batch.jobs.datasync.batch.risk.config; + +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.common.util.TableMetaInfo; +import com.snp.batch.jobs.datasync.batch.risk.dto.RiskChangeDto; +import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity; +import com.snp.batch.jobs.datasync.batch.risk.processor.RiskChangeProcessor; +import com.snp.batch.jobs.datasync.batch.risk.reader.RiskChangeReader; +import com.snp.batch.jobs.datasync.batch.risk.repository.RiskRepository; +import com.snp.batch.jobs.datasync.batch.risk.writer.RiskChangeWriter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +import javax.sql.DataSource; + +/** + * Risk 값 변경 이력 관리 Job + * tb_ship_risk_detail_hstry (스냅샷)에서 시계열 비교 → tb_ship_risk_detail_info_hstry에 변경분 저장 + */ +@Slf4j +@Configuration +public class RiskDetailChangeSyncJobConfig extends BaseJobConfig { + + private final TableMetaInfo tableMetaInfo; + private final RiskRepository riskRepository; + private final DataSource batchDataSource; + private final String targetSchema; + + private static final int CHUNK_SIZE = 1000; + + public RiskDetailChangeSyncJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + RiskRepository riskRepository, + TableMetaInfo tableMetaInfo, + @Qualifier("batchDataSource") DataSource batchDataSource, + @Value("${app.batch.target-schema.name}") String targetSchema) { + super(jobRepository, transactionManager); + this.riskRepository = riskRepository; + this.tableMetaInfo = tableMetaInfo; + this.batchDataSource = batchDataSource; + this.targetSchema = targetSchema; + } + + @Override + protected String getJobName() { + return "riskDetailChangeDataSyncJob"; + } + + @Override + protected String getStepName() { + return "riskDetailChangeSyncStep"; + } + + @Override + protected ItemReader createReader() { + return riskChangeReader(batchDataSource, tableMetaInfo, targetSchema); + } + + @Override + protected ItemProcessor createProcessor() { + return new RiskChangeProcessor(); + } + + @Override + protected ItemWriter createWriter() { + return new RiskChangeWriter(riskRepository); + } + + @Bean + @StepScope + public ItemReader riskChangeReader( + @Qualifier("batchDataSource") DataSource batchDataSource, + TableMetaInfo tableMetaInfo, + @Value("${app.batch.target-schema.name}") String targetSchema) { + return new RiskChangeReader(batchDataSource, tableMetaInfo, targetSchema); + } + + @Bean(name = "riskDetailChangeSyncStep") + public Step riskDetailChangeSyncStep() { + log.info("Step 생성: riskDetailChangeSyncStep"); + return new StepBuilder(getStepName(), jobRepository) + .chunk(CHUNK_SIZE, transactionManager) + .reader(createReader()) + .processor(createProcessor()) + .writer(createWriter()) + .build(); + } + + @Override + protected Job createJobFlow(JobBuilder jobBuilder) { + return jobBuilder + .start(riskDetailChangeSyncStep()) + .build(); + } + + @Bean(name = "riskDetailChangeDataSyncJob") + public Job riskDetailChangeDataSyncJob() { + return job(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/dto/RiskChangeDto.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/dto/RiskChangeDto.java new file mode 100644 index 0000000..af5dd98 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/dto/RiskChangeDto.java @@ -0,0 +1,25 @@ +package com.snp.batch.jobs.datasync.batch.risk.dto; + +import com.snp.batch.common.util.JobExecutionGroupable; +import lombok.*; + +import java.time.LocalDateTime; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class RiskChangeDto implements JobExecutionGroupable { + private Long jobExecutionId; + private String imoNo; + private LocalDateTime lastMdfcnDt; + private String flctnColNm; + private String bfrVal; + private String aftrVal; + + @Override + public Long getJobExecutionId() { + return this.jobExecutionId; + } +} diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/dto/RiskDto.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/dto/RiskDto.java index 5479872..acd7b6a 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/dto/RiskDto.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/dto/RiskDto.java @@ -1,6 +1,8 @@ package com.snp.batch.jobs.datasync.batch.risk.dto; + import com.snp.batch.common.util.JobExecutionGroupable; import lombok.*; + import java.time.LocalDateTime; @Getter @@ -12,46 +14,88 @@ public class RiskDto implements JobExecutionGroupable { private Long jobExecutionId; private String imoNo; private LocalDateTime lastMdfcnDt; - private String riskDataMaint; - private String aisNotrcvElpsDays; - private String aisLwrnkDays; - private String aisUpImoDesc; - private String othrShipNmVoyYn; - private String mmsiAnomMessage; - private String recentDarkActv; - private String portPrtcll; - private String portRisk; - private String stsJob; - private String driftChg; - private String riskEvent; - private String ntnltyChg; - private String ntnltyPrsMouPerf; - private String ntnltyTkyMouPerf; - private String ntnltyUscgMouPerf; - private String uscgExclShipCert; - private String pscInspectionElpsHr; - private String pscInspection; - private String pscDefect; - private String pscDetained; - private String nowSmgrcEvdc; - private String doccChg; - private String nowClfic; - private String clficStatusChg; - private String pniInsrnc; - private String shipNmChg; - private String gboChg; - private String vslage; - private String ilglFshrViol; - private String draftChg; - private String recentSanctionPrtcll; - private String snglShipVoy; - private String fltsfty; - private String fltPsc; - private String spcInspectionOvdue; - private String ownrUnk; - private String rssPortCall; - private String rssOwnrReg; - private String rssSts; + private Integer riskDataMaint; + private Integer aisNotrcvElpsDays; + private String aisNotrcvElpsDaysDesc; + private Integer aisLwrnkDays; + private String aisLwrnkDaysDesc; + private Integer aisUpImoDesc; + private String aisUpImoDescVal; + private Integer othrShipNmVoyYn; + private String othrShipNmVoyYnDesc; + private Integer mmsiAnomMessage; + private String mmsiAnomMessageDesc; + private Integer recentDarkActv; + private String recentDarkActvDesc; + private Integer portPrtcll; + private String portPrtcllDesc; + private Integer portRisk; + private String portRiskDesc; + private Integer stsJob; + private String stsJobDesc; + private Integer driftChg; + private String driftChgDesc; + private Integer riskEvent; + private String riskEventDesc; + private String riskEventDescExt; + private Integer ntnltyChg; + private String ntnltyChgDesc; + private Integer ntnltyPrsMouPerf; + private String ntnltyPrsMouPerfDesc; + private Integer ntnltyTkyMouPerf; + private String ntnltyTkyMouPerfDesc; + private Integer ntnltyUscgMouPerf; + private String ntnltyUscgMouPerfDesc; + private Integer uscgExclShipCert; + private String uscgExclShipCertDesc; + private Integer pscInspectionElpsHr; + private String pscInspectionElpsHrDesc; + private Integer pscInspection; + private String pscInspectionDesc; + private Integer pscDefect; + private String pscDefectDesc; + private Integer pscDetained; + private String pscDetainedDesc; + private Integer nowSmgrcEvdc; + private String nowSmgrcEvdcDesc; + private Integer doccChg; + private String doccChgDesc; + private Integer nowClfic; + private String nowClficDesc; + private String nowClficDescExt; + private Integer clficStatusChg; + private String clficStatusChgDesc; + private Integer pniInsrnc; + private String pniInsrncDesc; + private String pniInsrncDescExt; + private Integer shipNmChg; + private String shipNmChgDesc; + private Integer gboChg; + private String gboChgDesc; + private Integer vslage; + private String vslageDesc; + private Integer ilglFshrViol; + private String ilglFshrViolDesc; + private Integer draftChg; + private String draftChgDesc; + private Integer recentSanctionPrtcll; + private String recentSanctionPrtcllDesc; + private Integer snglShipVoy; + private String snglShipVoyDesc; + private Integer fltsfty; + private String fltsftyDesc; + private Integer fltPsc; + private String fltPscDesc; + private Integer spcInspectionOvdue; + private String spcInspectionOvdueDesc; + private Integer ownrUnk; + private String ownrUnkDesc; + private Integer rssPortCall; + private String rssPortCallDesc; + private Integer rssOwnrReg; + private String rssOwnrRegDesc; + private Integer rssSts; + private String rssStsDesc; @Override public Long getJobExecutionId() { diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/entity/RiskChangeEntity.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/entity/RiskChangeEntity.java new file mode 100644 index 0000000..f8efe75 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/entity/RiskChangeEntity.java @@ -0,0 +1,25 @@ +package com.snp.batch.jobs.datasync.batch.risk.entity; + +import com.snp.batch.common.util.JobExecutionGroupable; +import lombok.*; +import lombok.experimental.SuperBuilder; + +import java.time.LocalDateTime; + +@Data +@SuperBuilder +@AllArgsConstructor +public class RiskChangeEntity implements JobExecutionGroupable { + private String imoNo; + private LocalDateTime lastMdfcnDt; + private String flctnColNm; + private String bfrVal; + private String aftrVal; + + private Long jobExecutionId; + + @Override + public Long getJobExecutionId() { + return this.jobExecutionId; + } +} diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/entity/RiskEntity.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/entity/RiskEntity.java index 5ccb1c4..04ae397 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/entity/RiskEntity.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/entity/RiskEntity.java @@ -1,7 +1,9 @@ package com.snp.batch.jobs.datasync.batch.risk.entity; + import com.snp.batch.common.util.JobExecutionGroupable; import lombok.*; import lombok.experimental.SuperBuilder; + import java.time.LocalDateTime; @Data @@ -10,46 +12,88 @@ import java.time.LocalDateTime; public class RiskEntity implements JobExecutionGroupable { private String imoNo; private LocalDateTime lastMdfcnDt; - private String riskDataMaint; - private String aisNotrcvElpsDays; - private String aisLwrnkDays; - private String aisUpImoDesc; - private String othrShipNmVoyYn; - private String mmsiAnomMessage; - private String recentDarkActv; - private String portPrtcll; - private String portRisk; - private String stsJob; - private String driftChg; - private String riskEvent; - private String ntnltyChg; - private String ntnltyPrsMouPerf; - private String ntnltyTkyMouPerf; - private String ntnltyUscgMouPerf; - private String uscgExclShipCert; - private String pscInspectionElpsHr; - private String pscInspection; - private String pscDefect; - private String pscDetained; - private String nowSmgrcEvdc; - private String doccChg; - private String nowClfic; - private String clficStatusChg; - private String pniInsrnc; - private String shipNmChg; - private String gboChg; - private String vslage; - private String ilglFshrViol; - private String draftChg; - private String recentSanctionPrtcll; - private String snglShipVoy; - private String fltsfty; - private String fltPsc; - private String spcInspectionOvdue; - private String ownrUnk; - private String rssPortCall; - private String rssOwnrReg; - private String rssSts; + private Integer riskDataMaint; + private Integer aisNotrcvElpsDays; + private String aisNotrcvElpsDaysDesc; + private Integer aisLwrnkDays; + private String aisLwrnkDaysDesc; + private Integer aisUpImoDesc; + private String aisUpImoDescVal; + private Integer othrShipNmVoyYn; + private String othrShipNmVoyYnDesc; + private Integer mmsiAnomMessage; + private String mmsiAnomMessageDesc; + private Integer recentDarkActv; + private String recentDarkActvDesc; + private Integer portPrtcll; + private String portPrtcllDesc; + private Integer portRisk; + private String portRiskDesc; + private Integer stsJob; + private String stsJobDesc; + private Integer driftChg; + private String driftChgDesc; + private Integer riskEvent; + private String riskEventDesc; + private String riskEventDescExt; + private Integer ntnltyChg; + private String ntnltyChgDesc; + private Integer ntnltyPrsMouPerf; + private String ntnltyPrsMouPerfDesc; + private Integer ntnltyTkyMouPerf; + private String ntnltyTkyMouPerfDesc; + private Integer ntnltyUscgMouPerf; + private String ntnltyUscgMouPerfDesc; + private Integer uscgExclShipCert; + private String uscgExclShipCertDesc; + private Integer pscInspectionElpsHr; + private String pscInspectionElpsHrDesc; + private Integer pscInspection; + private String pscInspectionDesc; + private Integer pscDefect; + private String pscDefectDesc; + private Integer pscDetained; + private String pscDetainedDesc; + private Integer nowSmgrcEvdc; + private String nowSmgrcEvdcDesc; + private Integer doccChg; + private String doccChgDesc; + private Integer nowClfic; + private String nowClficDesc; + private String nowClficDescExt; + private Integer clficStatusChg; + private String clficStatusChgDesc; + private Integer pniInsrnc; + private String pniInsrncDesc; + private String pniInsrncDescExt; + private Integer shipNmChg; + private String shipNmChgDesc; + private Integer gboChg; + private String gboChgDesc; + private Integer vslage; + private String vslageDesc; + private Integer ilglFshrViol; + private String ilglFshrViolDesc; + private Integer draftChg; + private String draftChgDesc; + private Integer recentSanctionPrtcll; + private String recentSanctionPrtcllDesc; + private Integer snglShipVoy; + private String snglShipVoyDesc; + private Integer fltsfty; + private String fltsftyDesc; + private Integer fltPsc; + private String fltPscDesc; + private Integer spcInspectionOvdue; + private String spcInspectionOvdueDesc; + private Integer ownrUnk; + private String ownrUnkDesc; + private Integer rssPortCall; + private String rssPortCallDesc; + private Integer rssOwnrReg; + private String rssOwnrRegDesc; + private Integer rssSts; + private String rssStsDesc; private Long jobExecutionId; diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/processor/RiskChangeProcessor.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/processor/RiskChangeProcessor.java new file mode 100644 index 0000000..6f4efdf --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/processor/RiskChangeProcessor.java @@ -0,0 +1,21 @@ +package com.snp.batch.jobs.datasync.batch.risk.processor; + +import com.snp.batch.common.batch.processor.BaseProcessor; +import com.snp.batch.jobs.datasync.batch.risk.dto.RiskChangeDto; +import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class RiskChangeProcessor extends BaseProcessor { + @Override + protected RiskChangeEntity processItem(RiskChangeDto dto) throws Exception { + return RiskChangeEntity.builder() + .jobExecutionId(dto.getJobExecutionId()) + .imoNo(dto.getImoNo()) + .lastMdfcnDt(dto.getLastMdfcnDt()) + .flctnColNm(dto.getFlctnColNm()) + .bfrVal(dto.getBfrVal()) + .aftrVal(dto.getAftrVal()) + .build(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/processor/RiskProcessor.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/processor/RiskProcessor.java index 2e5c3dd..f5c6aa8 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/processor/RiskProcessor.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/processor/RiskProcessor.java @@ -15,44 +15,86 @@ public class RiskProcessor extends BaseProcessor { .lastMdfcnDt(dto.getLastMdfcnDt()) .riskDataMaint(dto.getRiskDataMaint()) .aisNotrcvElpsDays(dto.getAisNotrcvElpsDays()) + .aisNotrcvElpsDaysDesc(dto.getAisNotrcvElpsDaysDesc()) .aisLwrnkDays(dto.getAisLwrnkDays()) + .aisLwrnkDaysDesc(dto.getAisLwrnkDaysDesc()) .aisUpImoDesc(dto.getAisUpImoDesc()) + .aisUpImoDescVal(dto.getAisUpImoDescVal()) .othrShipNmVoyYn(dto.getOthrShipNmVoyYn()) + .othrShipNmVoyYnDesc(dto.getOthrShipNmVoyYnDesc()) .mmsiAnomMessage(dto.getMmsiAnomMessage()) + .mmsiAnomMessageDesc(dto.getMmsiAnomMessageDesc()) .recentDarkActv(dto.getRecentDarkActv()) + .recentDarkActvDesc(dto.getRecentDarkActvDesc()) .portPrtcll(dto.getPortPrtcll()) + .portPrtcllDesc(dto.getPortPrtcllDesc()) .portRisk(dto.getPortRisk()) + .portRiskDesc(dto.getPortRiskDesc()) .stsJob(dto.getStsJob()) + .stsJobDesc(dto.getStsJobDesc()) .driftChg(dto.getDriftChg()) + .driftChgDesc(dto.getDriftChgDesc()) .riskEvent(dto.getRiskEvent()) + .riskEventDesc(dto.getRiskEventDesc()) + .riskEventDescExt(dto.getRiskEventDescExt()) .ntnltyChg(dto.getNtnltyChg()) + .ntnltyChgDesc(dto.getNtnltyChgDesc()) .ntnltyPrsMouPerf(dto.getNtnltyPrsMouPerf()) + .ntnltyPrsMouPerfDesc(dto.getNtnltyPrsMouPerfDesc()) .ntnltyTkyMouPerf(dto.getNtnltyTkyMouPerf()) + .ntnltyTkyMouPerfDesc(dto.getNtnltyTkyMouPerfDesc()) .ntnltyUscgMouPerf(dto.getNtnltyUscgMouPerf()) + .ntnltyUscgMouPerfDesc(dto.getNtnltyUscgMouPerfDesc()) .uscgExclShipCert(dto.getUscgExclShipCert()) + .uscgExclShipCertDesc(dto.getUscgExclShipCertDesc()) .pscInspectionElpsHr(dto.getPscInspectionElpsHr()) + .pscInspectionElpsHrDesc(dto.getPscInspectionElpsHrDesc()) .pscInspection(dto.getPscInspection()) + .pscInspectionDesc(dto.getPscInspectionDesc()) .pscDefect(dto.getPscDefect()) + .pscDefectDesc(dto.getPscDefectDesc()) .pscDetained(dto.getPscDetained()) + .pscDetainedDesc(dto.getPscDetainedDesc()) .nowSmgrcEvdc(dto.getNowSmgrcEvdc()) + .nowSmgrcEvdcDesc(dto.getNowSmgrcEvdcDesc()) .doccChg(dto.getDoccChg()) + .doccChgDesc(dto.getDoccChgDesc()) .nowClfic(dto.getNowClfic()) + .nowClficDesc(dto.getNowClficDesc()) + .nowClficDescExt(dto.getNowClficDescExt()) .clficStatusChg(dto.getClficStatusChg()) + .clficStatusChgDesc(dto.getClficStatusChgDesc()) .pniInsrnc(dto.getPniInsrnc()) + .pniInsrncDesc(dto.getPniInsrncDesc()) + .pniInsrncDescExt(dto.getPniInsrncDescExt()) .shipNmChg(dto.getShipNmChg()) + .shipNmChgDesc(dto.getShipNmChgDesc()) .gboChg(dto.getGboChg()) + .gboChgDesc(dto.getGboChgDesc()) .vslage(dto.getVslage()) + .vslageDesc(dto.getVslageDesc()) .ilglFshrViol(dto.getIlglFshrViol()) + .ilglFshrViolDesc(dto.getIlglFshrViolDesc()) .draftChg(dto.getDraftChg()) + .draftChgDesc(dto.getDraftChgDesc()) .recentSanctionPrtcll(dto.getRecentSanctionPrtcll()) + .recentSanctionPrtcllDesc(dto.getRecentSanctionPrtcllDesc()) .snglShipVoy(dto.getSnglShipVoy()) + .snglShipVoyDesc(dto.getSnglShipVoyDesc()) .fltsfty(dto.getFltsfty()) + .fltsftyDesc(dto.getFltsftyDesc()) .fltPsc(dto.getFltPsc()) + .fltPscDesc(dto.getFltPscDesc()) .spcInspectionOvdue(dto.getSpcInspectionOvdue()) + .spcInspectionOvdueDesc(dto.getSpcInspectionOvdueDesc()) .ownrUnk(dto.getOwnrUnk()) + .ownrUnkDesc(dto.getOwnrUnkDesc()) .rssPortCall(dto.getRssPortCall()) + .rssPortCallDesc(dto.getRssPortCallDesc()) .rssOwnrReg(dto.getRssOwnrReg()) + .rssOwnrRegDesc(dto.getRssOwnrRegDesc()) .rssSts(dto.getRssSts()) + .rssStsDesc(dto.getRssStsDesc()) .build(); } } diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/reader/RiskChangeReader.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/reader/RiskChangeReader.java new file mode 100644 index 0000000..9121a1e --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/reader/RiskChangeReader.java @@ -0,0 +1,163 @@ +package com.snp.batch.jobs.datasync.batch.risk.reader; + +import com.snp.batch.common.util.TableMetaInfo; +import com.snp.batch.jobs.datasync.batch.risk.dto.RiskChangeDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.item.ItemReader; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.jdbc.core.JdbcTemplate; + +import javax.sql.DataSource; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.*; + +/** + * Risk 값 변경 이력 Reader + * tb_ship_risk_detail_hstry (스냅샷 이력)에서 imo_no별 시계열 비교하여 변경분 감지 + */ +@Slf4j +public class RiskChangeReader implements ItemReader { + + private final TableMetaInfo tableMetaInfo; + private final JdbcTemplate batchJdbcTemplate; + private final String targetSchema; + private List changeBuffer = new ArrayList<>(); + private boolean initialized = false; + + private static final List INDICATOR_COLUMNS = List.of( + "ais_notrcv_elps_days", "ais_lwrnk_days", "ais_up_imo_desc", + "othr_ship_nm_voy_yn", "mmsi_anom_message", "recent_dark_actv", + "port_prtcll", "port_risk", "sts_job", "drift_chg", + "risk_event", "ntnlty_chg", "ntnlty_prs_mou_perf", + "ntnlty_tky_mou_perf", "ntnlty_uscg_mou_perf", "uscg_excl_ship_cert", + "psc_inspection_elps_hr", "psc_inspection", "psc_defect", "psc_detained", + "now_smgrc_evdc", "docc_chg", "now_clfic", "clfic_status_chg", + "pni_insrnc", "ship_nm_chg", "gbo_chg", "vslage", + "ilgl_fshr_viol", "draft_chg", "recent_sanction_prtcll", "sngl_ship_voy", + "fltsfty", "flt_psc", "spc_inspection_ovdue", "ownr_unk", + "rss_port_call", "rss_ownr_reg", "rss_sts" + ); + + public RiskChangeReader(@Qualifier("batchDataSource") DataSource batchDataSource, + TableMetaInfo tableMetaInfo, + @Value("${app.batch.target-schema.name}") String targetSchema) { + this.batchJdbcTemplate = new JdbcTemplate(batchDataSource); + this.tableMetaInfo = tableMetaInfo; + this.targetSchema = targetSchema; + } + + @Override + public RiskChangeDto read() throws Exception { + if (!initialized) { + initializeChangeBuffer(); + initialized = true; + } + if (changeBuffer.isEmpty()) return null; + return changeBuffer.remove(0); + } + + private void initializeChangeBuffer() { + log.info("[RiskChangeReader] 변경 이력 감지 시작"); + + // 스냅샷 이력에서 indicator 컬럼만 조회 (imo_no, last_mdfcn_dt 순 정렬) + String columnList = String.join(", ", INDICATOR_COLUMNS); + String sql = String.format(""" + SELECT imo_no, last_mdfcn_dt, %s + FROM %s.%s + ORDER BY imo_no, last_mdfcn_dt ASC + """, columnList, targetSchema, tableMetaInfo.targetTbShipRiskDetailHstry); + + List> allData = batchJdbcTemplate.queryForList(sql); + log.info("[RiskChangeReader] 스냅샷 이력 조회 완료: {} 건", allData.size()); + + // 기존 변경 이력 조회 (중복 방지) + String existingSql = String.format(""" + SELECT imo_no, last_mdfcn_dt, flctn_col_nm + FROM %s.%s + """, targetSchema, tableMetaInfo.targetTbShipRiskDetailInfoHstry); + + Set existingKeys = new HashSet<>(); + try { + List> existingData = batchJdbcTemplate.queryForList(existingSql); + for (Map row : existingData) { + existingKeys.add(buildExistingKey(row)); + } + log.info("[RiskChangeReader] 기존 변경 이력 데이터: {} 건", existingKeys.size()); + } catch (Exception e) { + log.warn("[RiskChangeReader] 기존 데이터 조회 실패: {}", e.getMessage()); + } + + // imo_no별 그룹핑 + Map>> groupedByImo = new LinkedHashMap<>(); + for (Map row : allData) { + String imoNo = (String) row.get("imo_no"); + groupedByImo.computeIfAbsent(imoNo, k -> new ArrayList<>()).add(row); + } + + // 각 imo_no별 시계열 비교 + long changeCount = 0; + for (Map.Entry>> entry : groupedByImo.entrySet()) { + String imoNo = entry.getKey(); + List> records = entry.getValue(); + + Map previousRecord = null; + for (Map currentRecord : records) { + if (previousRecord != null) { + List changes = detectChanges(imoNo, previousRecord, currentRecord, existingKeys); + changeBuffer.addAll(changes); + changeCount += changes.size(); + } + previousRecord = currentRecord; + } + } + + log.info("[RiskChangeReader] 변경 이력 감지 완료: {} 건 (imo_no 그룹: {} 개)", changeCount, groupedByImo.size()); + } + + private List detectChanges(String imoNo, + Map previousRecord, + Map currentRecord, + Set existingKeys) { + List changes = new ArrayList<>(); + Timestamp currentTs = (Timestamp) currentRecord.get("last_mdfcn_dt"); + LocalDateTime lastMdfcnDt = currentTs != null ? currentTs.toLocalDateTime() : null; + + for (String column : INDICATOR_COLUMNS) { + Object prevValue = previousRecord.get(column); + Object currValue = currentRecord.get(column); + + if (!Objects.equals(prevValue, currValue)) { + String existingKey = imoNo + "|" + lastMdfcnDt + "|" + column; + if (existingKeys.contains(existingKey)) continue; + + changes.add(RiskChangeDto.builder() + .jobExecutionId(0L) + .imoNo(imoNo) + .lastMdfcnDt(lastMdfcnDt) + .flctnColNm(column) + .bfrVal(convertToString(prevValue)) + .aftrVal(convertToString(currValue)) + .build()); + } + } + return changes; + } + + private String buildExistingKey(Map row) { + String imoNo = (String) row.get("imo_no"); + Object lastMdfcnDtObj = row.get("last_mdfcn_dt"); + LocalDateTime lastMdfcnDt = null; + if (lastMdfcnDtObj instanceof Timestamp) { + lastMdfcnDt = ((Timestamp) lastMdfcnDtObj).toLocalDateTime(); + } + String flctnColNm = (String) row.get("flctn_col_nm"); + return imoNo + "|" + lastMdfcnDt + "|" + flctnColNm; + } + + private String convertToString(Object value) { + if (value == null) return "null"; + return String.valueOf(value); + } +} diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/reader/RiskReader.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/reader/RiskReader.java index 59ad094..f0ad5cb 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/reader/RiskReader.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/reader/RiskReader.java @@ -31,46 +31,88 @@ public class RiskReader extends BaseSyncReader { .jobExecutionId(targetId) .imoNo(rs.getString("imo_no")) .lastMdfcnDt(lastMdfcnDtTs != null ? lastMdfcnDtTs.toLocalDateTime() : null) - .riskDataMaint(rs.getString("risk_data_maint")) - .aisNotrcvElpsDays(rs.getString("ais_notrcv_elps_days")) - .aisLwrnkDays(rs.getString("ais_lwrnk_days")) - .aisUpImoDesc(rs.getString("ais_up_imo_desc")) - .othrShipNmVoyYn(rs.getString("othr_ship_nm_voy_yn")) - .mmsiAnomMessage(rs.getString("mmsi_anom_message")) - .recentDarkActv(rs.getString("recent_dark_actv")) - .portPrtcll(rs.getString("port_prtcll")) - .portRisk(rs.getString("port_risk")) - .stsJob(rs.getString("sts_job")) - .driftChg(rs.getString("drift_chg")) - .riskEvent(rs.getString("risk_event")) - .ntnltyChg(rs.getString("ntnlty_chg")) - .ntnltyPrsMouPerf(rs.getString("ntnlty_prs_mou_perf")) - .ntnltyTkyMouPerf(rs.getString("ntnlty_tky_mou_perf")) - .ntnltyUscgMouPerf(rs.getString("ntnlty_uscg_mou_perf")) - .uscgExclShipCert(rs.getString("uscg_excl_ship_cert")) - .pscInspectionElpsHr(rs.getString("psc_inspection_elps_hr")) - .pscInspection(rs.getString("psc_inspection")) - .pscDefect(rs.getString("psc_defect")) - .pscDetained(rs.getString("psc_detained")) - .nowSmgrcEvdc(rs.getString("now_smgrc_evdc")) - .doccChg(rs.getString("docc_chg")) - .nowClfic(rs.getString("now_clfic")) - .clficStatusChg(rs.getString("clfic_status_chg")) - .pniInsrnc(rs.getString("pni_insrnc")) - .shipNmChg(rs.getString("ship_nm_chg")) - .gboChg(rs.getString("gbo_chg")) - .vslage(rs.getString("vslage")) - .ilglFshrViol(rs.getString("ilgl_fshr_viol")) - .draftChg(rs.getString("draft_chg")) - .recentSanctionPrtcll(rs.getString("recent_sanction_prtcll")) - .snglShipVoy(rs.getString("sngl_ship_voy")) - .fltsfty(rs.getString("fltsfty")) - .fltPsc(rs.getString("flt_psc")) - .spcInspectionOvdue(rs.getString("spc_inspection_ovdue")) - .ownrUnk(rs.getString("ownr_unk")) - .rssPortCall(rs.getString("rss_port_call")) - .rssOwnrReg(rs.getString("rss_ownr_reg")) - .rssSts(rs.getString("rss_sts")) + .riskDataMaint(rs.getObject("risk_data_maint") != null ? rs.getInt("risk_data_maint") : null) + .aisNotrcvElpsDays(rs.getObject("ais_notrcv_elps_days") != null ? rs.getInt("ais_notrcv_elps_days") : null) + .aisNotrcvElpsDaysDesc(rs.getString("ais_notrcv_elps_days_desc")) + .aisLwrnkDays(rs.getObject("ais_lwrnk_days") != null ? rs.getInt("ais_lwrnk_days") : null) + .aisLwrnkDaysDesc(rs.getString("ais_lwrnk_days_desc")) + .aisUpImoDesc(rs.getObject("ais_up_imo_desc") != null ? rs.getInt("ais_up_imo_desc") : null) + .aisUpImoDescVal(rs.getString("ais_up_imo_desc_val")) + .othrShipNmVoyYn(rs.getObject("othr_ship_nm_voy_yn") != null ? rs.getInt("othr_ship_nm_voy_yn") : null) + .othrShipNmVoyYnDesc(rs.getString("othr_ship_nm_voy_yn_desc")) + .mmsiAnomMessage(rs.getObject("mmsi_anom_message") != null ? rs.getInt("mmsi_anom_message") : null) + .mmsiAnomMessageDesc(rs.getString("mmsi_anom_message_desc")) + .recentDarkActv(rs.getObject("recent_dark_actv") != null ? rs.getInt("recent_dark_actv") : null) + .recentDarkActvDesc(rs.getString("recent_dark_actv_desc")) + .portPrtcll(rs.getObject("port_prtcll") != null ? rs.getInt("port_prtcll") : null) + .portPrtcllDesc(rs.getString("port_prtcll_desc")) + .portRisk(rs.getObject("port_risk") != null ? rs.getInt("port_risk") : null) + .portRiskDesc(rs.getString("port_risk_desc")) + .stsJob(rs.getObject("sts_job") != null ? rs.getInt("sts_job") : null) + .stsJobDesc(rs.getString("sts_job_desc")) + .driftChg(rs.getObject("drift_chg") != null ? rs.getInt("drift_chg") : null) + .driftChgDesc(rs.getString("drift_chg_desc")) + .riskEvent(rs.getObject("risk_event") != null ? rs.getInt("risk_event") : null) + .riskEventDesc(rs.getString("risk_event_desc")) + .riskEventDescExt(rs.getString("risk_event_desc_ext")) + .ntnltyChg(rs.getObject("ntnlty_chg") != null ? rs.getInt("ntnlty_chg") : null) + .ntnltyChgDesc(rs.getString("ntnlty_chg_desc")) + .ntnltyPrsMouPerf(rs.getObject("ntnlty_prs_mou_perf") != null ? rs.getInt("ntnlty_prs_mou_perf") : null) + .ntnltyPrsMouPerfDesc(rs.getString("ntnlty_prs_mou_perf_desc")) + .ntnltyTkyMouPerf(rs.getObject("ntnlty_tky_mou_perf") != null ? rs.getInt("ntnlty_tky_mou_perf") : null) + .ntnltyTkyMouPerfDesc(rs.getString("ntnlty_tky_mou_perf_desc")) + .ntnltyUscgMouPerf(rs.getObject("ntnlty_uscg_mou_perf") != null ? rs.getInt("ntnlty_uscg_mou_perf") : null) + .ntnltyUscgMouPerfDesc(rs.getString("ntnlty_uscg_mou_perf_desc")) + .uscgExclShipCert(rs.getObject("uscg_excl_ship_cert") != null ? rs.getInt("uscg_excl_ship_cert") : null) + .uscgExclShipCertDesc(rs.getString("uscg_excl_ship_cert_desc")) + .pscInspectionElpsHr(rs.getObject("psc_inspection_elps_hr") != null ? rs.getInt("psc_inspection_elps_hr") : null) + .pscInspectionElpsHrDesc(rs.getString("psc_inspection_elps_hr_desc")) + .pscInspection(rs.getObject("psc_inspection") != null ? rs.getInt("psc_inspection") : null) + .pscInspectionDesc(rs.getString("psc_inspection_desc")) + .pscDefect(rs.getObject("psc_defect") != null ? rs.getInt("psc_defect") : null) + .pscDefectDesc(rs.getString("psc_defect_desc")) + .pscDetained(rs.getObject("psc_detained") != null ? rs.getInt("psc_detained") : null) + .pscDetainedDesc(rs.getString("psc_detained_desc")) + .nowSmgrcEvdc(rs.getObject("now_smgrc_evdc") != null ? rs.getInt("now_smgrc_evdc") : null) + .nowSmgrcEvdcDesc(rs.getString("now_smgrc_evdc_desc")) + .doccChg(rs.getObject("docc_chg") != null ? rs.getInt("docc_chg") : null) + .doccChgDesc(rs.getString("docc_chg_desc")) + .nowClfic(rs.getObject("now_clfic") != null ? rs.getInt("now_clfic") : null) + .nowClficDesc(rs.getString("now_clfic_desc")) + .nowClficDescExt(rs.getString("now_clfic_desc_ext")) + .clficStatusChg(rs.getObject("clfic_status_chg") != null ? rs.getInt("clfic_status_chg") : null) + .clficStatusChgDesc(rs.getString("clfic_status_chg_desc")) + .pniInsrnc(rs.getObject("pni_insrnc") != null ? rs.getInt("pni_insrnc") : null) + .pniInsrncDesc(rs.getString("pni_insrnc_desc")) + .pniInsrncDescExt(rs.getString("pni_insrnc_desc_ext")) + .shipNmChg(rs.getObject("ship_nm_chg") != null ? rs.getInt("ship_nm_chg") : null) + .shipNmChgDesc(rs.getString("ship_nm_chg_desc")) + .gboChg(rs.getObject("gbo_chg") != null ? rs.getInt("gbo_chg") : null) + .gboChgDesc(rs.getString("gbo_chg_desc")) + .vslage(rs.getObject("vslage") != null ? rs.getInt("vslage") : null) + .vslageDesc(rs.getString("vslage_desc")) + .ilglFshrViol(rs.getObject("ilgl_fshr_viol") != null ? rs.getInt("ilgl_fshr_viol") : null) + .ilglFshrViolDesc(rs.getString("ilgl_fshr_viol_desc")) + .draftChg(rs.getObject("draft_chg") != null ? rs.getInt("draft_chg") : null) + .draftChgDesc(rs.getString("draft_chg_desc")) + .recentSanctionPrtcll(rs.getObject("recent_sanction_prtcll") != null ? rs.getInt("recent_sanction_prtcll") : null) + .recentSanctionPrtcllDesc(rs.getString("recent_sanction_prtcll_desc")) + .snglShipVoy(rs.getObject("sngl_ship_voy") != null ? rs.getInt("sngl_ship_voy") : null) + .snglShipVoyDesc(rs.getString("sngl_ship_voy_desc")) + .fltsfty(rs.getObject("fltsfty") != null ? rs.getInt("fltsfty") : null) + .fltsftyDesc(rs.getString("fltsfty_desc")) + .fltPsc(rs.getObject("flt_psc") != null ? rs.getInt("flt_psc") : null) + .fltPscDesc(rs.getString("flt_psc_desc")) + .spcInspectionOvdue(rs.getObject("spc_inspection_ovdue") != null ? rs.getInt("spc_inspection_ovdue") : null) + .spcInspectionOvdueDesc(rs.getString("spc_inspection_ovdue_desc")) + .ownrUnk(rs.getObject("ownr_unk") != null ? rs.getInt("ownr_unk") : null) + .ownrUnkDesc(rs.getString("ownr_unk_desc")) + .rssPortCall(rs.getObject("rss_port_call") != null ? rs.getInt("rss_port_call") : null) + .rssPortCallDesc(rs.getString("rss_port_call_desc")) + .rssOwnrReg(rs.getObject("rss_ownr_reg") != null ? rs.getInt("rss_ownr_reg") : null) + .rssOwnrRegDesc(rs.getString("rss_ownr_reg_desc")) + .rssSts(rs.getObject("rss_sts") != null ? rs.getInt("rss_sts") : null) + .rssStsDesc(rs.getString("rss_sts_desc")) .build(); } } diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskRepository.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskRepository.java index 485f564..3f613e4 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskRepository.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskRepository.java @@ -1,5 +1,6 @@ package com.snp.batch.jobs.datasync.batch.risk.repository; +import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity; import com.snp.batch.jobs.datasync.batch.risk.entity.RiskEntity; import java.util.List; @@ -9,6 +10,7 @@ import java.util.List; * 구현체: RiskRepositoryImpl (JdbcTemplate 기반) */ public interface RiskRepository { - void saveRisk(List riskEntityList); - void saveRiskHistory(List riskEntityList); + void saveRiskDetail(List riskEntityList); + void saveRiskDetailHistory(List riskEntityList); + void saveRiskDetailChangeHistory(List changeEntityList); } diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskRepositoryImpl.java index 9ac442b..7c57adf 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskRepositoryImpl.java @@ -2,6 +2,7 @@ package com.snp.batch.jobs.datasync.batch.risk.repository; import com.snp.batch.common.batch.repository.MultiDataSourceJdbcRepository; import com.snp.batch.common.util.TableMetaInfo; +import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity; import com.snp.batch.jobs.datasync.batch.risk.entity.RiskEntity; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; @@ -12,6 +13,7 @@ import org.springframework.stereotype.Repository; import javax.sql.DataSource; import java.sql.PreparedStatement; import java.sql.Timestamp; +import java.sql.Types; import java.util.List; /** @@ -21,145 +23,140 @@ import java.util.List; @Repository("riskRepository") public class RiskRepositoryImpl extends MultiDataSourceJdbcRepository implements RiskRepository { - private DataSource batchDataSource; - private DataSource businessDataSource; private final TableMetaInfo tableMetaInfo; public RiskRepositoryImpl(@Qualifier("batchDataSource") DataSource batchDataSource, @Qualifier("businessDataSource") DataSource businessDataSource, TableMetaInfo tableMetaInfo) { - super(new JdbcTemplate(batchDataSource), new JdbcTemplate(businessDataSource)); - - this.batchDataSource = batchDataSource; - this.businessDataSource = businessDataSource; this.tableMetaInfo = tableMetaInfo; } - @Override - protected String getTableName() { - return null; - } + @Override protected String getTableName() { return null; } + @Override protected RowMapper getRowMapper() { return null; } + @Override protected Long extractId(RiskEntity entity) { return null; } + @Override protected String getInsertSql() { return null; } + @Override protected String getUpdateSql() { return null; } + @Override protected void setInsertParameters(PreparedStatement ps, RiskEntity entity) {} + @Override protected void setUpdateParameters(PreparedStatement ps, RiskEntity entity) {} + @Override protected String getEntityName() { return null; } @Override - protected RowMapper getRowMapper() { - return null; - } - - @Override - protected Long extractId(RiskEntity entity) { - return null; - } - - @Override - protected String getInsertSql() { - return null; - } - - @Override - protected String getUpdateSql() { - return null; - } - - @Override - protected void setInsertParameters(PreparedStatement ps, RiskEntity entity) throws Exception { - } - - @Override - protected void setUpdateParameters(PreparedStatement ps, RiskEntity entity) throws Exception { - } - - @Override - protected String getEntityName() { - return null; - } - - @Override - public void saveRisk(List riskEntityList) { - String sql = RiskSql.getRiskUpsertSql(tableMetaInfo.targetTbShipRiskInfo, "imo_no"); - if (riskEntityList == null || riskEntityList.isEmpty()) { - return; - } - // log.debug("{} 배치 삽입 시작: {} 건", "RiskEntity", riskEntityList.size()); - + public void saveRiskDetail(List riskEntityList) { + if (riskEntityList == null || riskEntityList.isEmpty()) return; + String sql = RiskSql.getRiskDetailUpsertSql(tableMetaInfo.targetTbShipRiskDetailInfo, "imo_no"); batchJdbcTemplate.batchUpdate(sql, riskEntityList, riskEntityList.size(), - (ps, entity) -> { - try { - bindRisk(ps, entity); - } catch (Exception e) { - log.error("배치 삽입 파라미터 설정 실패", e); - throw new RuntimeException(e); - } - }); - - // log.debug("{} 배치 삽입 완료: {} 건", "RiskEntity", riskEntityList.size()); + (ps, entity) -> { try { bindRiskDetail(ps, entity); } catch (Exception e) { throw new RuntimeException(e); } }); } @Override - public void saveRiskHistory(List riskEntityList) { - String sql = RiskSql.getRiskUpsertSql(tableMetaInfo.targetTbShipRiskHstry, "imo_no, last_mdfcn_dt"); - if (riskEntityList == null || riskEntityList.isEmpty()) { - return; - } - // log.debug("{} 배치 삽입 시작: {} 건", "RiskEntity", riskEntityList.size()); - + public void saveRiskDetailHistory(List riskEntityList) { + if (riskEntityList == null || riskEntityList.isEmpty()) return; + String sql = RiskSql.getRiskDetailUpsertSql(tableMetaInfo.targetTbShipRiskDetailHstry, "imo_no, last_mdfcn_dt"); batchJdbcTemplate.batchUpdate(sql, riskEntityList, riskEntityList.size(), - (ps, entity) -> { - try { - bindRisk(ps, entity); - } catch (Exception e) { - log.error("배치 삽입 파라미터 설정 실패", e); - throw new RuntimeException(e); - } - }); - - // log.debug("{} 배치 삽입 완료: {} 건", "RiskEntity", riskEntityList.size()); + (ps, entity) -> { try { bindRiskDetail(ps, entity); } catch (Exception e) { throw new RuntimeException(e); } }); } - public void bindRisk(PreparedStatement pstmt, RiskEntity entity) throws Exception { + @Override + public void saveRiskDetailChangeHistory(List changeEntityList) { + if (changeEntityList == null || changeEntityList.isEmpty()) return; + String sql = RiskSql.getChangeHistoryInsertSql(tableMetaInfo.targetTbShipRiskDetailInfoHstry); + batchJdbcTemplate.batchUpdate(sql, changeEntityList, changeEntityList.size(), + (ps, entity) -> { + ps.setString(1, entity.getImoNo()); + ps.setTimestamp(2, entity.getLastMdfcnDt() != null ? Timestamp.valueOf(entity.getLastMdfcnDt()) : null); + ps.setString(3, entity.getFlctnColNm()); + ps.setString(4, entity.getBfrVal()); + ps.setString(5, entity.getAftrVal()); + }); + } + + private void bindRiskDetail(PreparedStatement ps, RiskEntity e) throws Exception { int idx = 1; - pstmt.setString(idx++, "SYSTEM"); // 1. creatr_id - pstmt.setString(idx++, entity.getImoNo()); // 2. imo_no - pstmt.setTimestamp(idx++, entity.getLastMdfcnDt() != null ? Timestamp.valueOf(entity.getLastMdfcnDt()) : null); // 3. last_mdfcn_dt - pstmt.setString(idx++, entity.getRiskDataMaint()); // 4. risk_data_maint - pstmt.setString(idx++, entity.getAisNotrcvElpsDays()); // 5. ais_notrcv_elps_days - pstmt.setString(idx++, entity.getAisLwrnkDays()); // 6. ais_lwrnk_days - pstmt.setString(idx++, entity.getAisUpImoDesc()); // 7. ais_up_imo_desc - pstmt.setString(idx++, entity.getOthrShipNmVoyYn()); // 8. othr_ship_nm_voy_yn - pstmt.setString(idx++, entity.getMmsiAnomMessage()); // 9. mmsi_anom_message - pstmt.setString(idx++, entity.getRecentDarkActv()); // 10. recent_dark_actv - pstmt.setString(idx++, entity.getPortPrtcll()); // 11. port_prtcll - pstmt.setString(idx++, entity.getPortRisk()); // 12. port_risk - pstmt.setString(idx++, entity.getStsJob()); // 13. sts_job - pstmt.setString(idx++, entity.getDriftChg()); // 14. drift_chg - pstmt.setString(idx++, entity.getRiskEvent()); // 15. risk_event - pstmt.setString(idx++, entity.getNtnltyChg()); // 16. ntnlty_chg - pstmt.setString(idx++, entity.getNtnltyPrsMouPerf()); // 17. ntnlty_prs_mou_perf - pstmt.setString(idx++, entity.getNtnltyTkyMouPerf()); // 18. ntnlty_tky_mou_perf - pstmt.setString(idx++, entity.getNtnltyUscgMouPerf()); // 19. ntnlty_uscg_mou_perf - pstmt.setString(idx++, entity.getUscgExclShipCert()); // 20. uscg_excl_ship_cert - pstmt.setString(idx++, entity.getPscInspectionElpsHr()); // 21. psc_inspection_elps_hr - pstmt.setString(idx++, entity.getPscInspection()); // 22. psc_inspection - pstmt.setString(idx++, entity.getPscDefect()); // 23. psc_defect - pstmt.setString(idx++, entity.getPscDetained()); // 24. psc_detained - pstmt.setString(idx++, entity.getNowSmgrcEvdc()); // 25. now_smgrc_evdc - pstmt.setString(idx++, entity.getDoccChg()); // 26. docc_chg - pstmt.setString(idx++, entity.getNowClfic()); // 27. now_clfic - pstmt.setString(idx++, entity.getClficStatusChg()); // 28. clfic_status_chg - pstmt.setString(idx++, entity.getPniInsrnc()); // 29. pni_insrnc - pstmt.setString(idx++, entity.getShipNmChg()); // 30. ship_nm_chg - pstmt.setString(idx++, entity.getGboChg()); // 31. gbo_chg - pstmt.setString(idx++, entity.getVslage()); // 32. vslage - pstmt.setString(idx++, entity.getIlglFshrViol()); // 33. ilgl_fshr_viol - pstmt.setString(idx++, entity.getDraftChg()); // 34. draft_chg - pstmt.setString(idx++, entity.getRecentSanctionPrtcll()); // 35. recent_sanction_prtcll - pstmt.setString(idx++, entity.getSnglShipVoy()); // 36. sngl_ship_voy - pstmt.setString(idx++, entity.getFltsfty()); // 37. fltsfty - pstmt.setString(idx++, entity.getFltPsc()); // 38. flt_psc - pstmt.setString(idx++, entity.getSpcInspectionOvdue()); // 39. spc_inspection_ovdue - pstmt.setString(idx++, entity.getOwnrUnk()); // 40. ownr_unk - pstmt.setString(idx++, entity.getRssPortCall()); // 41. rss_port_call - pstmt.setString(idx++, entity.getRssOwnrReg()); // 42. rss_ownr_reg - pstmt.setString(idx++, entity.getRssSts()); // 43. rss_sts + ps.setString(idx++, "SYSTEM"); + ps.setString(idx++, e.getImoNo()); + ps.setTimestamp(idx++, e.getLastMdfcnDt() != null ? Timestamp.valueOf(e.getLastMdfcnDt()) : null); + ps.setObject(idx++, e.getRiskDataMaint(), Types.INTEGER); + ps.setObject(idx++, e.getAisNotrcvElpsDays(), Types.INTEGER); + ps.setString(idx++, e.getAisNotrcvElpsDaysDesc()); + ps.setObject(idx++, e.getAisLwrnkDays(), Types.INTEGER); + ps.setString(idx++, e.getAisLwrnkDaysDesc()); + ps.setObject(idx++, e.getAisUpImoDesc(), Types.INTEGER); + ps.setString(idx++, e.getAisUpImoDescVal()); + ps.setObject(idx++, e.getOthrShipNmVoyYn(), Types.INTEGER); + ps.setString(idx++, e.getOthrShipNmVoyYnDesc()); + ps.setObject(idx++, e.getMmsiAnomMessage(), Types.INTEGER); + ps.setString(idx++, e.getMmsiAnomMessageDesc()); + ps.setObject(idx++, e.getRecentDarkActv(), Types.INTEGER); + ps.setString(idx++, e.getRecentDarkActvDesc()); + ps.setObject(idx++, e.getPortPrtcll(), Types.INTEGER); + ps.setString(idx++, e.getPortPrtcllDesc()); + ps.setObject(idx++, e.getPortRisk(), Types.INTEGER); + ps.setString(idx++, e.getPortRiskDesc()); + ps.setObject(idx++, e.getStsJob(), Types.INTEGER); + ps.setString(idx++, e.getStsJobDesc()); + ps.setObject(idx++, e.getDriftChg(), Types.INTEGER); + ps.setString(idx++, e.getDriftChgDesc()); + ps.setObject(idx++, e.getRiskEvent(), Types.INTEGER); + ps.setString(idx++, e.getRiskEventDesc()); + ps.setString(idx++, e.getRiskEventDescExt()); + ps.setObject(idx++, e.getNtnltyChg(), Types.INTEGER); + ps.setString(idx++, e.getNtnltyChgDesc()); + ps.setObject(idx++, e.getNtnltyPrsMouPerf(), Types.INTEGER); + ps.setString(idx++, e.getNtnltyPrsMouPerfDesc()); + ps.setObject(idx++, e.getNtnltyTkyMouPerf(), Types.INTEGER); + ps.setString(idx++, e.getNtnltyTkyMouPerfDesc()); + ps.setObject(idx++, e.getNtnltyUscgMouPerf(), Types.INTEGER); + ps.setString(idx++, e.getNtnltyUscgMouPerfDesc()); + ps.setObject(idx++, e.getUscgExclShipCert(), Types.INTEGER); + ps.setString(idx++, e.getUscgExclShipCertDesc()); + ps.setObject(idx++, e.getPscInspectionElpsHr(), Types.INTEGER); + ps.setString(idx++, e.getPscInspectionElpsHrDesc()); + ps.setObject(idx++, e.getPscInspection(), Types.INTEGER); + ps.setString(idx++, e.getPscInspectionDesc()); + ps.setObject(idx++, e.getPscDefect(), Types.INTEGER); + ps.setString(idx++, e.getPscDefectDesc()); + ps.setObject(idx++, e.getPscDetained(), Types.INTEGER); + ps.setString(idx++, e.getPscDetainedDesc()); + ps.setObject(idx++, e.getNowSmgrcEvdc(), Types.INTEGER); + ps.setString(idx++, e.getNowSmgrcEvdcDesc()); + ps.setObject(idx++, e.getDoccChg(), Types.INTEGER); + ps.setString(idx++, e.getDoccChgDesc()); + ps.setObject(idx++, e.getNowClfic(), Types.INTEGER); + ps.setString(idx++, e.getNowClficDesc()); + ps.setString(idx++, e.getNowClficDescExt()); + ps.setObject(idx++, e.getClficStatusChg(), Types.INTEGER); + ps.setString(idx++, e.getClficStatusChgDesc()); + ps.setObject(idx++, e.getPniInsrnc(), Types.INTEGER); + ps.setString(idx++, e.getPniInsrncDesc()); + ps.setString(idx++, e.getPniInsrncDescExt()); + ps.setObject(idx++, e.getShipNmChg(), Types.INTEGER); + ps.setString(idx++, e.getShipNmChgDesc()); + ps.setObject(idx++, e.getGboChg(), Types.INTEGER); + ps.setString(idx++, e.getGboChgDesc()); + ps.setObject(idx++, e.getVslage(), Types.INTEGER); + ps.setString(idx++, e.getVslageDesc()); + ps.setObject(idx++, e.getIlglFshrViol(), Types.INTEGER); + ps.setString(idx++, e.getIlglFshrViolDesc()); + ps.setObject(idx++, e.getDraftChg(), Types.INTEGER); + ps.setString(idx++, e.getDraftChgDesc()); + ps.setObject(idx++, e.getRecentSanctionPrtcll(), Types.INTEGER); + ps.setString(idx++, e.getRecentSanctionPrtcllDesc()); + ps.setObject(idx++, e.getSnglShipVoy(), Types.INTEGER); + ps.setString(idx++, e.getSnglShipVoyDesc()); + ps.setObject(idx++, e.getFltsfty(), Types.INTEGER); + ps.setString(idx++, e.getFltsftyDesc()); + ps.setObject(idx++, e.getFltPsc(), Types.INTEGER); + ps.setString(idx++, e.getFltPscDesc()); + ps.setObject(idx++, e.getSpcInspectionOvdue(), Types.INTEGER); + ps.setString(idx++, e.getSpcInspectionOvdueDesc()); + ps.setObject(idx++, e.getOwnrUnk(), Types.INTEGER); + ps.setString(idx++, e.getOwnrUnkDesc()); + ps.setObject(idx++, e.getRssPortCall(), Types.INTEGER); + ps.setString(idx++, e.getRssPortCallDesc()); + ps.setObject(idx++, e.getRssOwnrReg(), Types.INTEGER); + ps.setString(idx++, e.getRssOwnrRegDesc()); + ps.setObject(idx++, e.getRssSts(), Types.INTEGER); + ps.setString(idx++, e.getRssStsDesc()); } } diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskSql.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskSql.java index bf62205..2b0f163 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskSql.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskSql.java @@ -6,85 +6,135 @@ import org.springframework.stereotype.Component; @Component public class RiskSql { private static String TARGET_SCHEMA; + public RiskSql(@Value("${app.batch.target-schema.name}") String targetSchema) { TARGET_SCHEMA = targetSchema; } - public static String getRiskUpsertSql(String targetTable, String targetIndex) { + /** + * Risk Detail UPSERT SQL (tb_ship_risk_detail_info, tb_ship_risk_detail_hstry 공통) + */ + public static String getRiskDetailUpsertSql(String targetTable, String targetIndex) { return """ INSERT INTO %s.%s ( crt_dt, creatr_id, - imo_no, last_mdfcn_dt, risk_data_maint, ais_notrcv_elps_days, - ais_lwrnk_days, ais_up_imo_desc, othr_ship_nm_voy_yn, mmsi_anom_message, - recent_dark_actv, port_prtcll, port_risk, sts_job, - drift_chg, risk_event, ntnlty_chg, ntnlty_prs_mou_perf, - ntnlty_tky_mou_perf, ntnlty_uscg_mou_perf, uscg_excl_ship_cert, psc_inspection_elps_hr, - psc_inspection, psc_defect, psc_detained, now_smgrc_evdc, - docc_chg, now_clfic, clfic_status_chg, pni_insrnc, - ship_nm_chg, gbo_chg, vslage, ilgl_fshr_viol, - draft_chg, recent_sanction_prtcll, sngl_ship_voy, fltsfty, - flt_psc, spc_inspection_ovdue, ownr_unk, rss_port_call, - rss_ownr_reg, rss_sts + imo_no, last_mdfcn_dt, risk_data_maint, + ais_notrcv_elps_days, ais_notrcv_elps_days_desc, + ais_lwrnk_days, ais_lwrnk_days_desc, + ais_up_imo_desc, ais_up_imo_desc_val, + othr_ship_nm_voy_yn, othr_ship_nm_voy_yn_desc, + mmsi_anom_message, mmsi_anom_message_desc, + recent_dark_actv, recent_dark_actv_desc, + port_prtcll, port_prtcll_desc, + port_risk, port_risk_desc, + sts_job, sts_job_desc, + drift_chg, drift_chg_desc, + risk_event, risk_event_desc, risk_event_desc_ext, + ntnlty_chg, ntnlty_chg_desc, + ntnlty_prs_mou_perf, ntnlty_prs_mou_perf_desc, + ntnlty_tky_mou_perf, ntnlty_tky_mou_perf_desc, + ntnlty_uscg_mou_perf, ntnlty_uscg_mou_perf_desc, + uscg_excl_ship_cert, uscg_excl_ship_cert_desc, + psc_inspection_elps_hr, psc_inspection_elps_hr_desc, + psc_inspection, psc_inspection_desc, + psc_defect, psc_defect_desc, + psc_detained, psc_detained_desc, + now_smgrc_evdc, now_smgrc_evdc_desc, + docc_chg, docc_chg_desc, + now_clfic, now_clfic_desc, now_clfic_desc_ext, + clfic_status_chg, clfic_status_chg_desc, + pni_insrnc, pni_insrnc_desc, pni_insrnc_desc_ext, + ship_nm_chg, ship_nm_chg_desc, + gbo_chg, gbo_chg_desc, + vslage, vslage_desc, + ilgl_fshr_viol, ilgl_fshr_viol_desc, + draft_chg, draft_chg_desc, + recent_sanction_prtcll, recent_sanction_prtcll_desc, + sngl_ship_voy, sngl_ship_voy_desc, + fltsfty, fltsfty_desc, + flt_psc, flt_psc_desc, + spc_inspection_ovdue, spc_inspection_ovdue_desc, + ownr_unk, ownr_unk_desc, + rss_port_call, rss_port_call_desc, + rss_ownr_reg, rss_ownr_reg_desc, + rss_sts, rss_sts_desc ) VALUES ( CURRENT_TIMESTAMP, ?, - ?, ?, ?, ?, - ?, ?, ?, ?, - ?, ?, ?, ?, - ?, ?, ?, ?, - ?, ?, ?, ?, - ?, ?, ?, ?, - ?, ?, ?, ?, - ?, ?, ?, ?, - ?, ?, ?, ?, - ?, ?, ?, ?, - ?, ? + ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ? ) ON CONFLICT (%s) DO UPDATE SET - mdfcn_dt = CURRENT_TIMESTAMP, - mdfr_id = 'SYSTEM', - last_mdfcn_dt = EXCLUDED.last_mdfcn_dt, - risk_data_maint = EXCLUDED.risk_data_maint, - ais_notrcv_elps_days = EXCLUDED.ais_notrcv_elps_days, - ais_lwrnk_days = EXCLUDED.ais_lwrnk_days, - ais_up_imo_desc = EXCLUDED.ais_up_imo_desc, - othr_ship_nm_voy_yn = EXCLUDED.othr_ship_nm_voy_yn, - mmsi_anom_message = EXCLUDED.mmsi_anom_message, - recent_dark_actv = EXCLUDED.recent_dark_actv, - port_prtcll = EXCLUDED.port_prtcll, - port_risk = EXCLUDED.port_risk, - sts_job = EXCLUDED.sts_job, - drift_chg = EXCLUDED.drift_chg, - risk_event = EXCLUDED.risk_event, - ntnlty_chg = EXCLUDED.ntnlty_chg, - ntnlty_prs_mou_perf = EXCLUDED.ntnlty_prs_mou_perf, - ntnlty_tky_mou_perf = EXCLUDED.ntnlty_tky_mou_perf, - ntnlty_uscg_mou_perf = EXCLUDED.ntnlty_uscg_mou_perf, - uscg_excl_ship_cert = EXCLUDED.uscg_excl_ship_cert, - psc_inspection_elps_hr = EXCLUDED.psc_inspection_elps_hr, - psc_inspection = EXCLUDED.psc_inspection, - psc_defect = EXCLUDED.psc_defect, - psc_detained = EXCLUDED.psc_detained, - now_smgrc_evdc = EXCLUDED.now_smgrc_evdc, - docc_chg = EXCLUDED.docc_chg, - now_clfic = EXCLUDED.now_clfic, - clfic_status_chg = EXCLUDED.clfic_status_chg, - pni_insrnc = EXCLUDED.pni_insrnc, - ship_nm_chg = EXCLUDED.ship_nm_chg, - gbo_chg = EXCLUDED.gbo_chg, - vslage = EXCLUDED.vslage, - ilgl_fshr_viol = EXCLUDED.ilgl_fshr_viol, - draft_chg = EXCLUDED.draft_chg, - recent_sanction_prtcll = EXCLUDED.recent_sanction_prtcll, - sngl_ship_voy = EXCLUDED.sngl_ship_voy, - fltsfty = EXCLUDED.fltsfty, - flt_psc = EXCLUDED.flt_psc, - spc_inspection_ovdue = EXCLUDED.spc_inspection_ovdue, - ownr_unk = EXCLUDED.ownr_unk, - rss_port_call = EXCLUDED.rss_port_call, - rss_ownr_reg = EXCLUDED.rss_ownr_reg, - rss_sts = EXCLUDED.rss_sts; + mdfcn_dt = CURRENT_TIMESTAMP, mdfr_id = 'SYSTEM', + last_mdfcn_dt = EXCLUDED.last_mdfcn_dt, risk_data_maint = EXCLUDED.risk_data_maint, + ais_notrcv_elps_days = EXCLUDED.ais_notrcv_elps_days, ais_notrcv_elps_days_desc = EXCLUDED.ais_notrcv_elps_days_desc, + ais_lwrnk_days = EXCLUDED.ais_lwrnk_days, ais_lwrnk_days_desc = EXCLUDED.ais_lwrnk_days_desc, + ais_up_imo_desc = EXCLUDED.ais_up_imo_desc, ais_up_imo_desc_val = EXCLUDED.ais_up_imo_desc_val, + othr_ship_nm_voy_yn = EXCLUDED.othr_ship_nm_voy_yn, othr_ship_nm_voy_yn_desc = EXCLUDED.othr_ship_nm_voy_yn_desc, + mmsi_anom_message = EXCLUDED.mmsi_anom_message, mmsi_anom_message_desc = EXCLUDED.mmsi_anom_message_desc, + recent_dark_actv = EXCLUDED.recent_dark_actv, recent_dark_actv_desc = EXCLUDED.recent_dark_actv_desc, + port_prtcll = EXCLUDED.port_prtcll, port_prtcll_desc = EXCLUDED.port_prtcll_desc, + port_risk = EXCLUDED.port_risk, port_risk_desc = EXCLUDED.port_risk_desc, + sts_job = EXCLUDED.sts_job, sts_job_desc = EXCLUDED.sts_job_desc, + drift_chg = EXCLUDED.drift_chg, drift_chg_desc = EXCLUDED.drift_chg_desc, + risk_event = EXCLUDED.risk_event, risk_event_desc = EXCLUDED.risk_event_desc, risk_event_desc_ext = EXCLUDED.risk_event_desc_ext, + ntnlty_chg = EXCLUDED.ntnlty_chg, ntnlty_chg_desc = EXCLUDED.ntnlty_chg_desc, + ntnlty_prs_mou_perf = EXCLUDED.ntnlty_prs_mou_perf, ntnlty_prs_mou_perf_desc = EXCLUDED.ntnlty_prs_mou_perf_desc, + ntnlty_tky_mou_perf = EXCLUDED.ntnlty_tky_mou_perf, ntnlty_tky_mou_perf_desc = EXCLUDED.ntnlty_tky_mou_perf_desc, + ntnlty_uscg_mou_perf = EXCLUDED.ntnlty_uscg_mou_perf, ntnlty_uscg_mou_perf_desc = EXCLUDED.ntnlty_uscg_mou_perf_desc, + uscg_excl_ship_cert = EXCLUDED.uscg_excl_ship_cert, uscg_excl_ship_cert_desc = EXCLUDED.uscg_excl_ship_cert_desc, + psc_inspection_elps_hr = EXCLUDED.psc_inspection_elps_hr, psc_inspection_elps_hr_desc = EXCLUDED.psc_inspection_elps_hr_desc, + psc_inspection = EXCLUDED.psc_inspection, psc_inspection_desc = EXCLUDED.psc_inspection_desc, + psc_defect = EXCLUDED.psc_defect, psc_defect_desc = EXCLUDED.psc_defect_desc, + psc_detained = EXCLUDED.psc_detained, psc_detained_desc = EXCLUDED.psc_detained_desc, + now_smgrc_evdc = EXCLUDED.now_smgrc_evdc, now_smgrc_evdc_desc = EXCLUDED.now_smgrc_evdc_desc, + docc_chg = EXCLUDED.docc_chg, docc_chg_desc = EXCLUDED.docc_chg_desc, + now_clfic = EXCLUDED.now_clfic, now_clfic_desc = EXCLUDED.now_clfic_desc, now_clfic_desc_ext = EXCLUDED.now_clfic_desc_ext, + clfic_status_chg = EXCLUDED.clfic_status_chg, clfic_status_chg_desc = EXCLUDED.clfic_status_chg_desc, + pni_insrnc = EXCLUDED.pni_insrnc, pni_insrnc_desc = EXCLUDED.pni_insrnc_desc, pni_insrnc_desc_ext = EXCLUDED.pni_insrnc_desc_ext, + ship_nm_chg = EXCLUDED.ship_nm_chg, ship_nm_chg_desc = EXCLUDED.ship_nm_chg_desc, + gbo_chg = EXCLUDED.gbo_chg, gbo_chg_desc = EXCLUDED.gbo_chg_desc, + vslage = EXCLUDED.vslage, vslage_desc = EXCLUDED.vslage_desc, + ilgl_fshr_viol = EXCLUDED.ilgl_fshr_viol, ilgl_fshr_viol_desc = EXCLUDED.ilgl_fshr_viol_desc, + draft_chg = EXCLUDED.draft_chg, draft_chg_desc = EXCLUDED.draft_chg_desc, + recent_sanction_prtcll = EXCLUDED.recent_sanction_prtcll, recent_sanction_prtcll_desc = EXCLUDED.recent_sanction_prtcll_desc, + sngl_ship_voy = EXCLUDED.sngl_ship_voy, sngl_ship_voy_desc = EXCLUDED.sngl_ship_voy_desc, + fltsfty = EXCLUDED.fltsfty, fltsfty_desc = EXCLUDED.fltsfty_desc, + flt_psc = EXCLUDED.flt_psc, flt_psc_desc = EXCLUDED.flt_psc_desc, + spc_inspection_ovdue = EXCLUDED.spc_inspection_ovdue, spc_inspection_ovdue_desc = EXCLUDED.spc_inspection_ovdue_desc, + ownr_unk = EXCLUDED.ownr_unk, ownr_unk_desc = EXCLUDED.ownr_unk_desc, + rss_port_call = EXCLUDED.rss_port_call, rss_port_call_desc = EXCLUDED.rss_port_call_desc, + rss_ownr_reg = EXCLUDED.rss_ownr_reg, rss_ownr_reg_desc = EXCLUDED.rss_ownr_reg_desc, + rss_sts = EXCLUDED.rss_sts, rss_sts_desc = EXCLUDED.rss_sts_desc; """.formatted(TARGET_SCHEMA, targetTable, targetIndex); } + + /** + * 값 변경 이력 INSERT SQL (tb_ship_risk_detail_info_hstry) + */ + public static String getChangeHistoryInsertSql(String targetTable) { + return """ + INSERT INTO %s.%s (crt_dt, creatr_id, imo_no, last_mdfcn_dt, flctn_col_nm, bfr_val, aftr_val) + VALUES (CURRENT_TIMESTAMP, 'SYSTEM', ?, ?, ?, ?, ?) + ON CONFLICT (imo_no, last_mdfcn_dt, flctn_col_nm) DO NOTHING + """.formatted(TARGET_SCHEMA, targetTable); + } + + /** + * 기존 데이터 조회 SQL (변경 비교용) + */ + public static String getSelectCurrentSql(String targetTable) { + return """ + SELECT * FROM %s.%s WHERE imo_no = ? + """.formatted(TARGET_SCHEMA, targetTable); + } } diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/writer/RiskChangeWriter.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/writer/RiskChangeWriter.java new file mode 100644 index 0000000..a1ed700 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/writer/RiskChangeWriter.java @@ -0,0 +1,23 @@ +package com.snp.batch.jobs.datasync.batch.risk.writer; + +import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity; +import com.snp.batch.jobs.datasync.batch.risk.repository.RiskRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; + +import java.util.ArrayList; + +@Slf4j +@RequiredArgsConstructor +public class RiskChangeWriter implements ItemWriter { + + private final RiskRepository riskRepository; + + @Override + public void write(Chunk chunk) throws Exception { + if (chunk.isEmpty()) return; + riskRepository.saveRiskDetailChangeHistory(new ArrayList<>(chunk.getItems())); + } +} diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/writer/RiskWriter.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/writer/RiskWriter.java index 0a5e4cf..ec396a4 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/writer/RiskWriter.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/writer/RiskWriter.java @@ -19,10 +19,8 @@ public class RiskWriter extends BaseChunkedWriter { @Override protected void writeItems(List items) throws Exception { - if (items.isEmpty()) { - return; - } - riskRepository.saveRisk(items); - riskRepository.saveRiskHistory(items); + if (items.isEmpty()) return; + riskRepository.saveRiskDetail(items); // 1. 최신 데이터 UPSERT + riskRepository.saveRiskDetailHistory(items); // 2. 스냅샷 이력 UPSERT } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3e19c09..dd65879 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -152,7 +152,7 @@ app: movements-008: tb_ship_trnst_hstry code-001: tb_ship_type_cd code-002: tb_ship_country_cd - risk-compliance-001: tb_ship_risk_info + risk-compliance-001: tb_ship_risk_detail_info risk-compliance-003: tb_ship_compliance_info risk-compliance-006: tb_company_compliance_info target-schema: @@ -202,8 +202,9 @@ app: movements-008: tb_ship_trnst_hstry code-001: tb_ship_type_cd code-002: tb_ship_country_cd - risk-compliance-001: tb_ship_risk_info - risk-compliance-002: tb_ship_risk_hstry + risk-compliance-001: tb_ship_risk_detail_info + risk-compliance-001-1: tb_ship_risk_detail_info_hstry + risk-compliance-002: tb_ship_risk_detail_hstry risk-compliance-003: tb_ship_compliance_info risk-compliance-004: tb_ship_compliance_hstry risk-compliance-005: tb_ship_compliance_info_hstry -- 2.45.2 From eb16aa8b535b76f03e806ae9679023358ea5ef45 Mon Sep 17 00:00:00 2001 From: HYOJIN Date: Thu, 26 Mar 2026 13:37:36 +0900 Subject: [PATCH 2/2] =?UTF-8?q?docs:=20=EB=A6=B4=EB=A6=AC=EC=A6=88=20?= =?UTF-8?q?=EB=85=B8=ED=8A=B8=20=EC=97=85=EB=8D=B0=EC=9D=B4=ED=8A=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/RELEASE-NOTES.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/RELEASE-NOTES.md b/docs/RELEASE-NOTES.md index e8e96da..9294e82 100644 --- a/docs/RELEASE-NOTES.md +++ b/docs/RELEASE-NOTES.md @@ -5,7 +5,14 @@ ## [Unreleased] ### 추가 +- Risk 데이터 동기화 대상 변경: tb_ship_risk_detail_info + 값 변경 이력 관리 (#3) +- riskDetailChangeDataSyncJob: Risk indicator 값 변경 이력 Job 분리 - 배치 로그 관리 정리 Job 구현: 90일 보관 기간 기준 자동 삭제 (#16) +- README.md 프로젝트 문서 추가 + +### 변경 +- BaseSyncReader: while 루프 연속 그룹 로드 방식으로 변경 (multi-chunk Step 종료 문제 해결) +- BatchWriteListener: 청크 내 모든 고유 job_execution_id P→S 업데이트 ## [2026-03-25] -- 2.45.2