부모
405f813c9f
커밋
d7b43359fb
199
README.md
Normal file
199
README.md
Normal file
@ -0,0 +1,199 @@
|
|||||||
|
SNP Sync Batch
|
||||||
|
|
||||||
|
S&P Global 해양 데이터를 수집하여 PostgreSQL에 동기화하는 Spring Batch 시스템입니다.
|
||||||
|
|
||||||
|
## 기술 스택
|
||||||
|
|
||||||
|
| 구분 | 기술 |
|
||||||
|
|------|------|
|
||||||
|
| Language | Java 17 |
|
||||||
|
| Framework | Spring Boot 3.2.1, Spring Batch 5.1.0 |
|
||||||
|
| Scheduler | Quartz 2.5.0 (JDBC Store) |
|
||||||
|
| Database | PostgreSQL (dual datasource) |
|
||||||
|
| Cache | Caffeine |
|
||||||
|
| Frontend | React + TypeScript (Vite) |
|
||||||
|
| Build | Maven (frontend-maven-plugin 통합) |
|
||||||
|
| CI/CD | Gitea Actions → systemd 자동 배포 |
|
||||||
|
|
||||||
|
## 아키텍처
|
||||||
|
|
||||||
|
```
|
||||||
|
┌─────────────────────────────────────────────────────────────────┐
|
||||||
|
│ SNP Sync Batch │
|
||||||
|
│ │
|
||||||
|
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
|
||||||
|
│ │ Quartz │───►│ Reader │───►│ Processor │───►│ Writer │ │
|
||||||
|
│ │ Scheduler │ │(BaseSyncR)│ │(DTO→Entity)│ │(SubChunk) │ │
|
||||||
|
│ └──────────┘ └────┬─────┘ └──────────┘ └────┬─────┘ │
|
||||||
|
│ │ │ │
|
||||||
|
│ ┌────────▼────────┐ ┌───────▼───────┐ │
|
||||||
|
│ │ Source DB │ │ Target DB │ │
|
||||||
|
│ │ (std_snp_data) │ │ (std_snp_svc) │ │
|
||||||
|
│ │ batch_flag 관리 │ │ UPSERT 저장 │ │
|
||||||
|
│ └─────────────────┘ └───────────────┘ │
|
||||||
|
│ │
|
||||||
|
│ ┌──────────────────────────────────────────────────────────┐ │
|
||||||
|
│ │ React Frontend (포트 8051, /snp-sync) │ │
|
||||||
|
│ │ 대시보드 │ 동기화현황 │ 실행이력 │ 스케줄 │ 타임라인 │ │
|
||||||
|
│ └──────────────────────────────────────────────────────────┘ │
|
||||||
|
└─────────────────────────────────────────────────────────────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
## 데이터베이스 구조
|
||||||
|
|
||||||
|
### Dual Datasource
|
||||||
|
|
||||||
|
| Datasource | 스키마 | 용도 |
|
||||||
|
|------------|--------|------|
|
||||||
|
| batchDataSource (Primary) | snp_batch | Spring Batch 메타데이터, Quartz 스케줄 |
|
||||||
|
| businessDataSource | std_snp_data / std_snp_svc | 비즈니스 데이터 (소스/타겟) |
|
||||||
|
|
||||||
|
### 동기화 대상 테이블
|
||||||
|
|
||||||
|
| 도메인 | 소스 테이블 수 | 주요 내용 |
|
||||||
|
|--------|--------------|----------|
|
||||||
|
| Ship | 25개 | 선박 제원, 이력, 관계 |
|
||||||
|
| Company | 1개 | 회사 상세 정보 |
|
||||||
|
| Event | 4개 | 해양 사건/사고 |
|
||||||
|
| Facility | 1개 | 항구 시설 |
|
||||||
|
| PSC | 3개 | PSC 검사 |
|
||||||
|
| Movements | 8개 | 선박 이동 (항적) |
|
||||||
|
| Code | 2개 | 코드 (선종, 국적) |
|
||||||
|
| Risk | 1개 | 위험 지표 (+ 값 변경 이력) |
|
||||||
|
| Compliance | 2개 | 컴플라이언스 (+ 값 변경 이력) |
|
||||||
|
|
||||||
|
## 동기화 프로세스
|
||||||
|
|
||||||
|
### batch_flag 상태 흐름
|
||||||
|
|
||||||
|
```
|
||||||
|
[N] 대기 ──── Reader ────► [P] 진행 ──── Writer 성공 ────► [S] 완료
|
||||||
|
│ │
|
||||||
|
│ Writer 실패 시
|
||||||
|
│ P 상태 고착 (수동 리셋 필요)
|
||||||
|
│
|
||||||
|
batch_job_execution.status = 'COMPLETED' 인 데이터만 대상
|
||||||
|
```
|
||||||
|
|
||||||
|
### Reader → Writer 흐름
|
||||||
|
|
||||||
|
```
|
||||||
|
BaseSyncReader (while 루프)
|
||||||
|
│
|
||||||
|
├─ fetchNextGroup(11284): MIN(batch_flag='N') → 데이터 로드 → N→P
|
||||||
|
├─ fetchNextGroup(11286): 다음 그룹 연속 로드 → N→P
|
||||||
|
├─ fetchNextGroup(11317): 다음 그룹 연속 로드 → N→P
|
||||||
|
└─ fetchNextGroup(): 데이터 없음 → return null → Step 종료
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
BaseChunkedWriter (sub-chunk 5000건 단위)
|
||||||
|
│
|
||||||
|
├─ Sub-Chunk 1: [1~5000건] → 독립 트랜잭션 커밋
|
||||||
|
├─ Sub-Chunk 2: [5001~10000건] → 독립 트랜잭션 커밋
|
||||||
|
└─ ...
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
BatchWriteListener.afterWrite()
|
||||||
|
└─ 청크 내 모든 고유 job_execution_id → P→S 업데이트
|
||||||
|
```
|
||||||
|
|
||||||
|
### 값 변경 이력 관리 (Risk, Compliance)
|
||||||
|
|
||||||
|
데이터 동기화 Job과 값 변경 이력 Job이 분리되어 운영됩니다:
|
||||||
|
|
||||||
|
```
|
||||||
|
riskDataSyncJob (데이터 동기화)
|
||||||
|
Source → Reader → Writer
|
||||||
|
├─ UPSERT → tb_ship_risk_detail_info (최신 데이터)
|
||||||
|
└─ UPSERT → tb_ship_risk_detail_hstry (스냅샷 이력)
|
||||||
|
|
||||||
|
riskDetailChangeDataSyncJob (값 변경 이력)
|
||||||
|
tb_ship_risk_detail_hstry (스냅샷)
|
||||||
|
→ imo_no별 시계열 비교
|
||||||
|
→ indicator 컬럼 변경분 감지
|
||||||
|
→ INSERT → tb_ship_risk_detail_info_hstry (컬럼명, 이전값, 이후값)
|
||||||
|
```
|
||||||
|
|
||||||
|
## 배치 Job 목록
|
||||||
|
|
||||||
|
### 데이터 동기화 Job
|
||||||
|
|
||||||
|
| Job 이름 | 도메인 | 설명 |
|
||||||
|
|----------|--------|------|
|
||||||
|
| snpMdaDataSyncJob | Ship + Company | 선박/회사 데이터 동기화 (26 Step) |
|
||||||
|
| eventDataSyncJob | Event | 사건 데이터 동기화 (4 Step) |
|
||||||
|
| facilitySyncJob | Facility | 항구 시설 동기화 |
|
||||||
|
| pscDataSyncJob | PSC | PSC 검사 동기화 (3 Step) |
|
||||||
|
| anchorageCallDataSyncJob | Movements | 정박지 기항 |
|
||||||
|
| berthCallDataSyncJob | Movements | 부두 기항 |
|
||||||
|
| currentlyAtDataSyncJob | Movements | 현재 위치 |
|
||||||
|
| destinationDataSyncJob | Movements | 목적지 |
|
||||||
|
| portCallDataSyncJob | Movements | 항구 기항 |
|
||||||
|
| stsOperationDataSyncJob | Movements | STS 작업 |
|
||||||
|
| terminalCallDataSyncJob | Movements | 터미널 기항 |
|
||||||
|
| transitDataSyncJob | Movements | 통과 |
|
||||||
|
| codeDataSyncJob | Code | 코드 동기화 (2 Step) |
|
||||||
|
| riskDataSyncJob | Risk | 위험 지표 동기화 |
|
||||||
|
| shipComplianceDataSyncJob | Compliance | 선박 컴플라이언스 |
|
||||||
|
| companyComplianceDataSyncJob | Compliance | 회사 컴플라이언스 |
|
||||||
|
|
||||||
|
### 값 변경 이력 Job
|
||||||
|
|
||||||
|
| Job 이름 | 설명 |
|
||||||
|
|----------|------|
|
||||||
|
| riskDetailChangeDataSyncJob | Risk indicator 값 변경 이력 |
|
||||||
|
| shipComplianceChangeDataSyncJob | 선박 컴플라이언스 값 변경 이력 |
|
||||||
|
| companyComplianceChangeDataSyncJob | 회사 컴플라이언스 값 변경 이력 |
|
||||||
|
|
||||||
|
### 유지보수 Job
|
||||||
|
|
||||||
|
| Job 이름 | 설명 |
|
||||||
|
|----------|------|
|
||||||
|
| batchLogCleanupJob | 배치 로그 정리 (90일 보관) |
|
||||||
|
|
||||||
|
## 프론트엔드
|
||||||
|
|
||||||
|
| 메뉴 | 경로 | 설명 |
|
||||||
|
|------|------|------|
|
||||||
|
| 대시보드 | `/` | 통계, 실행 중 작업, 최근 실행/실패 |
|
||||||
|
| 동기화 현황 | `/sync-status` | 테이블별 N/P/S 건수, 데이터 미리보기, P 리셋 |
|
||||||
|
| 실행 이력 | `/executions` | 배치 실행 검색, 필터, 상세 |
|
||||||
|
| 작업 | `/jobs` | Job 목록, 수동 실행 |
|
||||||
|
| 스케줄 | `/schedules` | Quartz 스케줄 CRUD, Cron 미리보기 |
|
||||||
|
| 타임라인 | `/schedule-timeline` | 일/주/월별 실행 시각화 |
|
||||||
|
|
||||||
|
## CI/CD
|
||||||
|
|
||||||
|
```
|
||||||
|
develop → main 머지
|
||||||
|
→ Gitea Actions (빌드 + JAR 배포)
|
||||||
|
→ systemd path watcher (.deploy-trigger 감지)
|
||||||
|
→ restart.sh → 서비스 재시작
|
||||||
|
```
|
||||||
|
|
||||||
|
### 배포 확인
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 배포 로그
|
||||||
|
cat /devdata/services/snp-sync-batch/backend/deploy.log
|
||||||
|
|
||||||
|
# 서비스 상태
|
||||||
|
systemctl status snp-sync-batch
|
||||||
|
|
||||||
|
# 실시간 로그
|
||||||
|
journalctl -u snp-sync-batch -n 100 -f
|
||||||
|
```
|
||||||
|
|
||||||
|
## 빌드 및 실행
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 개발 환경 실행
|
||||||
|
mvn spring-boot:run
|
||||||
|
|
||||||
|
# 패키징
|
||||||
|
mvn clean package -DskipTests
|
||||||
|
|
||||||
|
# 서버 포트: 8051
|
||||||
|
# Context Path: /snp-sync
|
||||||
|
# Swagger: /snp-sync/swagger-ui.html
|
||||||
|
```
|
||||||
@ -5,7 +5,14 @@
|
|||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
### 추가
|
### 추가
|
||||||
|
- Risk 데이터 동기화 대상 변경: tb_ship_risk_detail_info + 값 변경 이력 관리 (#3)
|
||||||
|
- riskDetailChangeDataSyncJob: Risk indicator 값 변경 이력 Job 분리
|
||||||
- 배치 로그 관리 정리 Job 구현: 90일 보관 기간 기준 자동 삭제 (#16)
|
- 배치 로그 관리 정리 Job 구현: 90일 보관 기간 기준 자동 삭제 (#16)
|
||||||
|
- README.md 프로젝트 문서 추가
|
||||||
|
|
||||||
|
### 변경
|
||||||
|
- BaseSyncReader: while 루프 연속 그룹 로드 방식으로 변경 (multi-chunk Step 종료 문제 해결)
|
||||||
|
- BatchWriteListener: 청크 내 모든 고유 job_execution_id P→S 업데이트
|
||||||
|
|
||||||
## [2026-03-25]
|
## [2026-03-25]
|
||||||
|
|
||||||
|
|||||||
@ -30,7 +30,6 @@ public abstract class BaseSyncReader<T extends JobExecutionGroupable> implements
|
|||||||
protected final JdbcTemplate businessJdbcTemplate;
|
protected final JdbcTemplate businessJdbcTemplate;
|
||||||
|
|
||||||
private List<T> allDataBuffer = new ArrayList<>();
|
private List<T> allDataBuffer = new ArrayList<>();
|
||||||
private Long currentGroupId = null;
|
|
||||||
|
|
||||||
protected BaseSyncReader(DataSource businessDataSource, TableMetaInfo tableMetaInfo) {
|
protected BaseSyncReader(DataSource businessDataSource, TableMetaInfo tableMetaInfo) {
|
||||||
this.businessJdbcTemplate = new JdbcTemplate(businessDataSource);
|
this.businessJdbcTemplate = new JdbcTemplate(businessDataSource);
|
||||||
@ -53,21 +52,13 @@ public abstract class BaseSyncReader<T extends JobExecutionGroupable> implements
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public T read() throws Exception {
|
public T read() throws Exception {
|
||||||
if (allDataBuffer.isEmpty()) {
|
// buffer가 비어있으면 다음 그룹 로드 (연속)
|
||||||
// 이전 그룹 처리 완료 → null 반환하여 청크 종료
|
while (allDataBuffer.isEmpty()) {
|
||||||
// (Writer + afterWrite(P→S)가 실행된 후 다음 청크에서 다음 그룹 로드)
|
|
||||||
if (currentGroupId != null) {
|
|
||||||
currentGroupId = null;
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 다음 그룹 로드
|
|
||||||
fetchNextGroup();
|
fetchNextGroup();
|
||||||
}
|
|
||||||
|
|
||||||
if (allDataBuffer.isEmpty()) {
|
if (allDataBuffer.isEmpty()) {
|
||||||
return null; // 더 이상 처리할 데이터 없음 → Step 종료
|
return null; // 더 이상 처리할 데이터 없음 → Step 종료
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return allDataBuffer.remove(0);
|
return allDataBuffer.remove(0);
|
||||||
}
|
}
|
||||||
@ -78,10 +69,14 @@ public abstract class BaseSyncReader<T extends JobExecutionGroupable> implements
|
|||||||
nextTargetId = businessJdbcTemplate.queryForObject(
|
nextTargetId = businessJdbcTemplate.queryForObject(
|
||||||
CommonSql.getNextTargetQuery(getSourceTable()), Long.class);
|
CommonSql.getNextTargetQuery(getSourceTable()), Long.class);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
log.warn("[{}] 다음 처리 대상 조회 실패: {}", getLogPrefix(), e.getMessage());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nextTargetId == null) return;
|
if (nextTargetId == null) {
|
||||||
|
log.debug("[{}] 더 이상 처리할 데이터 없음", getLogPrefix());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
log.info("[{}] 다음 처리 대상 ID 발견: {}", getLogPrefix(), nextTargetId);
|
log.info("[{}] 다음 처리 대상 ID 발견: {}", getLogPrefix(), nextTargetId);
|
||||||
|
|
||||||
@ -92,7 +87,5 @@ public abstract class BaseSyncReader<T extends JobExecutionGroupable> implements
|
|||||||
// N→P 전환
|
// N→P 전환
|
||||||
String updateSql = CommonSql.getProcessBatchQuery(getSourceTable());
|
String updateSql = CommonSql.getProcessBatchQuery(getSourceTable());
|
||||||
businessJdbcTemplate.update(updateSql, nextTargetId);
|
businessJdbcTemplate.update(updateSql, nextTargetId);
|
||||||
|
|
||||||
currentGroupId = nextTargetId;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -5,10 +5,12 @@ import org.springframework.batch.core.ItemWriteListener;
|
|||||||
import org.springframework.batch.item.Chunk;
|
import org.springframework.batch.item.Chunk;
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writer 성공 후 batch_flag P→S 업데이트 리스너
|
* Writer 성공 후 batch_flag P→S 업데이트 리스너
|
||||||
*
|
* 청크 내 모든 고유 job_execution_id에 대해 S 업데이트
|
||||||
* SQL은 실행 시점에 생성 (CommonSql.SOURCE_SCHEMA 초기화 보장)
|
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class BatchWriteListener<S extends JobExecutionGroupable> implements ItemWriteListener<S> {
|
public class BatchWriteListener<S extends JobExecutionGroupable> implements ItemWriteListener<S> {
|
||||||
@ -25,11 +27,14 @@ public class BatchWriteListener<S extends JobExecutionGroupable> implements Item
|
|||||||
public void afterWrite(Chunk<? extends S> items) {
|
public void afterWrite(Chunk<? extends S> items) {
|
||||||
if (items.isEmpty()) return;
|
if (items.isEmpty()) return;
|
||||||
|
|
||||||
Long jobExecutionId = items.getItems().get(0).getJobExecutionId();
|
|
||||||
|
|
||||||
try {
|
|
||||||
// SQL을 실행 시점에 생성하여 SOURCE_SCHEMA null 문제 방지
|
|
||||||
String sql = CommonSql.getCompleteBatchQuery(sourceTable);
|
String sql = CommonSql.getCompleteBatchQuery(sourceTable);
|
||||||
|
|
||||||
|
Set<Long> jobExecutionIds = items.getItems().stream()
|
||||||
|
.map(JobExecutionGroupable::getJobExecutionId)
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
|
for (Long jobExecutionId : jobExecutionIds) {
|
||||||
|
try {
|
||||||
int updatedRows = businessJdbcTemplate.update(sql, jobExecutionId);
|
int updatedRows = businessJdbcTemplate.update(sql, jobExecutionId);
|
||||||
log.info("[BatchWriteListener] Success update 'S'. jobExecutionId: {}, rows: {}", jobExecutionId, updatedRows);
|
log.info("[BatchWriteListener] Success update 'S'. jobExecutionId: {}, rows: {}", jobExecutionId, updatedRows);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -37,13 +42,16 @@ public class BatchWriteListener<S extends JobExecutionGroupable> implements Item
|
|||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onWriteError(Exception exception, Chunk<? extends S> items) {
|
public void onWriteError(Exception exception, Chunk<? extends S> items) {
|
||||||
if (!items.isEmpty()) {
|
if (!items.isEmpty()) {
|
||||||
Long jobExecutionId = items.getItems().get(0).getJobExecutionId();
|
Set<Long> ids = items.getItems().stream()
|
||||||
log.error("[BatchWriteListener] Write Error Detected! jobExecutionId: {}. Status will NOT be updated to 'S'. Error: {}",
|
.map(JobExecutionGroupable::getJobExecutionId)
|
||||||
jobExecutionId, exception.getMessage());
|
.collect(Collectors.toSet());
|
||||||
|
log.error("[BatchWriteListener] Write Error Detected! jobExecutionIds: {}. Status will NOT be updated to 'S'. Error: {}",
|
||||||
|
ids, exception.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (exception instanceof RuntimeException) {
|
if (exception instanceof RuntimeException) {
|
||||||
|
|||||||
@ -309,10 +309,13 @@ public class TableMetaInfo {
|
|||||||
|
|
||||||
// Risk & Compliance Tables
|
// Risk & Compliance Tables
|
||||||
@Value("${app.batch.target-schema.tables.risk-compliance-001}")
|
@Value("${app.batch.target-schema.tables.risk-compliance-001}")
|
||||||
public String targetTbShipRiskInfo;
|
public String targetTbShipRiskDetailInfo;
|
||||||
|
|
||||||
|
@Value("${app.batch.target-schema.tables.risk-compliance-001-1}")
|
||||||
|
public String targetTbShipRiskDetailInfoHstry;
|
||||||
|
|
||||||
@Value("${app.batch.target-schema.tables.risk-compliance-002}")
|
@Value("${app.batch.target-schema.tables.risk-compliance-002}")
|
||||||
public String targetTbShipRiskHstry;
|
public String targetTbShipRiskDetailHstry;
|
||||||
|
|
||||||
@Value("${app.batch.target-schema.tables.risk-compliance-003}")
|
@Value("${app.batch.target-schema.tables.risk-compliance-003}")
|
||||||
public String targetTbShipComplianceInfo;
|
public String targetTbShipComplianceInfo;
|
||||||
|
|||||||
@ -0,0 +1,114 @@
|
|||||||
|
package com.snp.batch.jobs.datasync.batch.risk.config;
|
||||||
|
|
||||||
|
import com.snp.batch.common.batch.config.BaseJobConfig;
|
||||||
|
import com.snp.batch.common.util.TableMetaInfo;
|
||||||
|
import com.snp.batch.jobs.datasync.batch.risk.dto.RiskChangeDto;
|
||||||
|
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
|
||||||
|
import com.snp.batch.jobs.datasync.batch.risk.processor.RiskChangeProcessor;
|
||||||
|
import com.snp.batch.jobs.datasync.batch.risk.reader.RiskChangeReader;
|
||||||
|
import com.snp.batch.jobs.datasync.batch.risk.repository.RiskRepository;
|
||||||
|
import com.snp.batch.jobs.datasync.batch.risk.writer.RiskChangeWriter;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.batch.core.Job;
|
||||||
|
import org.springframework.batch.core.Step;
|
||||||
|
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||||
|
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||||
|
import org.springframework.batch.core.repository.JobRepository;
|
||||||
|
import org.springframework.batch.core.step.builder.StepBuilder;
|
||||||
|
import org.springframework.batch.item.ItemProcessor;
|
||||||
|
import org.springframework.batch.item.ItemReader;
|
||||||
|
import org.springframework.batch.item.ItemWriter;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
|
|
||||||
|
import javax.sql.DataSource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Risk 값 변경 이력 관리 Job
|
||||||
|
* tb_ship_risk_detail_hstry (스냅샷)에서 시계열 비교 → tb_ship_risk_detail_info_hstry에 변경분 저장
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Configuration
|
||||||
|
public class RiskDetailChangeSyncJobConfig extends BaseJobConfig<RiskChangeDto, RiskChangeEntity> {
|
||||||
|
|
||||||
|
private final TableMetaInfo tableMetaInfo;
|
||||||
|
private final RiskRepository riskRepository;
|
||||||
|
private final DataSource batchDataSource;
|
||||||
|
private final String targetSchema;
|
||||||
|
|
||||||
|
private static final int CHUNK_SIZE = 1000;
|
||||||
|
|
||||||
|
public RiskDetailChangeSyncJobConfig(
|
||||||
|
JobRepository jobRepository,
|
||||||
|
PlatformTransactionManager transactionManager,
|
||||||
|
RiskRepository riskRepository,
|
||||||
|
TableMetaInfo tableMetaInfo,
|
||||||
|
@Qualifier("batchDataSource") DataSource batchDataSource,
|
||||||
|
@Value("${app.batch.target-schema.name}") String targetSchema) {
|
||||||
|
super(jobRepository, transactionManager);
|
||||||
|
this.riskRepository = riskRepository;
|
||||||
|
this.tableMetaInfo = tableMetaInfo;
|
||||||
|
this.batchDataSource = batchDataSource;
|
||||||
|
this.targetSchema = targetSchema;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getJobName() {
|
||||||
|
return "riskDetailChangeDataSyncJob";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getStepName() {
|
||||||
|
return "riskDetailChangeSyncStep";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ItemReader<RiskChangeDto> createReader() {
|
||||||
|
return riskChangeReader(batchDataSource, tableMetaInfo, targetSchema);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ItemProcessor<RiskChangeDto, RiskChangeEntity> createProcessor() {
|
||||||
|
return new RiskChangeProcessor();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ItemWriter<RiskChangeEntity> createWriter() {
|
||||||
|
return new RiskChangeWriter(riskRepository);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@StepScope
|
||||||
|
public ItemReader<RiskChangeDto> riskChangeReader(
|
||||||
|
@Qualifier("batchDataSource") DataSource batchDataSource,
|
||||||
|
TableMetaInfo tableMetaInfo,
|
||||||
|
@Value("${app.batch.target-schema.name}") String targetSchema) {
|
||||||
|
return new RiskChangeReader(batchDataSource, tableMetaInfo, targetSchema);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(name = "riskDetailChangeSyncStep")
|
||||||
|
public Step riskDetailChangeSyncStep() {
|
||||||
|
log.info("Step 생성: riskDetailChangeSyncStep");
|
||||||
|
return new StepBuilder(getStepName(), jobRepository)
|
||||||
|
.<RiskChangeDto, RiskChangeEntity>chunk(CHUNK_SIZE, transactionManager)
|
||||||
|
.reader(createReader())
|
||||||
|
.processor(createProcessor())
|
||||||
|
.writer(createWriter())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Job createJobFlow(JobBuilder jobBuilder) {
|
||||||
|
return jobBuilder
|
||||||
|
.start(riskDetailChangeSyncStep())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(name = "riskDetailChangeDataSyncJob")
|
||||||
|
public Job riskDetailChangeDataSyncJob() {
|
||||||
|
return job();
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,25 @@
|
|||||||
|
package com.snp.batch.jobs.datasync.batch.risk.dto;
|
||||||
|
|
||||||
|
import com.snp.batch.common.util.JobExecutionGroupable;
|
||||||
|
import lombok.*;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
@Builder
|
||||||
|
public class RiskChangeDto implements JobExecutionGroupable {
|
||||||
|
private Long jobExecutionId;
|
||||||
|
private String imoNo;
|
||||||
|
private LocalDateTime lastMdfcnDt;
|
||||||
|
private String flctnColNm;
|
||||||
|
private String bfrVal;
|
||||||
|
private String aftrVal;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long getJobExecutionId() {
|
||||||
|
return this.jobExecutionId;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,6 +1,8 @@
|
|||||||
package com.snp.batch.jobs.datasync.batch.risk.dto;
|
package com.snp.batch.jobs.datasync.batch.risk.dto;
|
||||||
|
|
||||||
import com.snp.batch.common.util.JobExecutionGroupable;
|
import com.snp.batch.common.util.JobExecutionGroupable;
|
||||||
import lombok.*;
|
import lombok.*;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
@ -12,46 +14,88 @@ public class RiskDto implements JobExecutionGroupable {
|
|||||||
private Long jobExecutionId;
|
private Long jobExecutionId;
|
||||||
private String imoNo;
|
private String imoNo;
|
||||||
private LocalDateTime lastMdfcnDt;
|
private LocalDateTime lastMdfcnDt;
|
||||||
private String riskDataMaint;
|
private Integer riskDataMaint;
|
||||||
private String aisNotrcvElpsDays;
|
private Integer aisNotrcvElpsDays;
|
||||||
private String aisLwrnkDays;
|
private String aisNotrcvElpsDaysDesc;
|
||||||
private String aisUpImoDesc;
|
private Integer aisLwrnkDays;
|
||||||
private String othrShipNmVoyYn;
|
private String aisLwrnkDaysDesc;
|
||||||
private String mmsiAnomMessage;
|
private Integer aisUpImoDesc;
|
||||||
private String recentDarkActv;
|
private String aisUpImoDescVal;
|
||||||
private String portPrtcll;
|
private Integer othrShipNmVoyYn;
|
||||||
private String portRisk;
|
private String othrShipNmVoyYnDesc;
|
||||||
private String stsJob;
|
private Integer mmsiAnomMessage;
|
||||||
private String driftChg;
|
private String mmsiAnomMessageDesc;
|
||||||
private String riskEvent;
|
private Integer recentDarkActv;
|
||||||
private String ntnltyChg;
|
private String recentDarkActvDesc;
|
||||||
private String ntnltyPrsMouPerf;
|
private Integer portPrtcll;
|
||||||
private String ntnltyTkyMouPerf;
|
private String portPrtcllDesc;
|
||||||
private String ntnltyUscgMouPerf;
|
private Integer portRisk;
|
||||||
private String uscgExclShipCert;
|
private String portRiskDesc;
|
||||||
private String pscInspectionElpsHr;
|
private Integer stsJob;
|
||||||
private String pscInspection;
|
private String stsJobDesc;
|
||||||
private String pscDefect;
|
private Integer driftChg;
|
||||||
private String pscDetained;
|
private String driftChgDesc;
|
||||||
private String nowSmgrcEvdc;
|
private Integer riskEvent;
|
||||||
private String doccChg;
|
private String riskEventDesc;
|
||||||
private String nowClfic;
|
private String riskEventDescExt;
|
||||||
private String clficStatusChg;
|
private Integer ntnltyChg;
|
||||||
private String pniInsrnc;
|
private String ntnltyChgDesc;
|
||||||
private String shipNmChg;
|
private Integer ntnltyPrsMouPerf;
|
||||||
private String gboChg;
|
private String ntnltyPrsMouPerfDesc;
|
||||||
private String vslage;
|
private Integer ntnltyTkyMouPerf;
|
||||||
private String ilglFshrViol;
|
private String ntnltyTkyMouPerfDesc;
|
||||||
private String draftChg;
|
private Integer ntnltyUscgMouPerf;
|
||||||
private String recentSanctionPrtcll;
|
private String ntnltyUscgMouPerfDesc;
|
||||||
private String snglShipVoy;
|
private Integer uscgExclShipCert;
|
||||||
private String fltsfty;
|
private String uscgExclShipCertDesc;
|
||||||
private String fltPsc;
|
private Integer pscInspectionElpsHr;
|
||||||
private String spcInspectionOvdue;
|
private String pscInspectionElpsHrDesc;
|
||||||
private String ownrUnk;
|
private Integer pscInspection;
|
||||||
private String rssPortCall;
|
private String pscInspectionDesc;
|
||||||
private String rssOwnrReg;
|
private Integer pscDefect;
|
||||||
private String rssSts;
|
private String pscDefectDesc;
|
||||||
|
private Integer pscDetained;
|
||||||
|
private String pscDetainedDesc;
|
||||||
|
private Integer nowSmgrcEvdc;
|
||||||
|
private String nowSmgrcEvdcDesc;
|
||||||
|
private Integer doccChg;
|
||||||
|
private String doccChgDesc;
|
||||||
|
private Integer nowClfic;
|
||||||
|
private String nowClficDesc;
|
||||||
|
private String nowClficDescExt;
|
||||||
|
private Integer clficStatusChg;
|
||||||
|
private String clficStatusChgDesc;
|
||||||
|
private Integer pniInsrnc;
|
||||||
|
private String pniInsrncDesc;
|
||||||
|
private String pniInsrncDescExt;
|
||||||
|
private Integer shipNmChg;
|
||||||
|
private String shipNmChgDesc;
|
||||||
|
private Integer gboChg;
|
||||||
|
private String gboChgDesc;
|
||||||
|
private Integer vslage;
|
||||||
|
private String vslageDesc;
|
||||||
|
private Integer ilglFshrViol;
|
||||||
|
private String ilglFshrViolDesc;
|
||||||
|
private Integer draftChg;
|
||||||
|
private String draftChgDesc;
|
||||||
|
private Integer recentSanctionPrtcll;
|
||||||
|
private String recentSanctionPrtcllDesc;
|
||||||
|
private Integer snglShipVoy;
|
||||||
|
private String snglShipVoyDesc;
|
||||||
|
private Integer fltsfty;
|
||||||
|
private String fltsftyDesc;
|
||||||
|
private Integer fltPsc;
|
||||||
|
private String fltPscDesc;
|
||||||
|
private Integer spcInspectionOvdue;
|
||||||
|
private String spcInspectionOvdueDesc;
|
||||||
|
private Integer ownrUnk;
|
||||||
|
private String ownrUnkDesc;
|
||||||
|
private Integer rssPortCall;
|
||||||
|
private String rssPortCallDesc;
|
||||||
|
private Integer rssOwnrReg;
|
||||||
|
private String rssOwnrRegDesc;
|
||||||
|
private Integer rssSts;
|
||||||
|
private String rssStsDesc;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Long getJobExecutionId() {
|
public Long getJobExecutionId() {
|
||||||
|
|||||||
@ -0,0 +1,25 @@
|
|||||||
|
package com.snp.batch.jobs.datasync.batch.risk.entity;
|
||||||
|
|
||||||
|
import com.snp.batch.common.util.JobExecutionGroupable;
|
||||||
|
import lombok.*;
|
||||||
|
import lombok.experimental.SuperBuilder;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@SuperBuilder
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class RiskChangeEntity implements JobExecutionGroupable {
|
||||||
|
private String imoNo;
|
||||||
|
private LocalDateTime lastMdfcnDt;
|
||||||
|
private String flctnColNm;
|
||||||
|
private String bfrVal;
|
||||||
|
private String aftrVal;
|
||||||
|
|
||||||
|
private Long jobExecutionId;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long getJobExecutionId() {
|
||||||
|
return this.jobExecutionId;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,7 +1,9 @@
|
|||||||
package com.snp.batch.jobs.datasync.batch.risk.entity;
|
package com.snp.batch.jobs.datasync.batch.risk.entity;
|
||||||
|
|
||||||
import com.snp.batch.common.util.JobExecutionGroupable;
|
import com.snp.batch.common.util.JobExecutionGroupable;
|
||||||
import lombok.*;
|
import lombok.*;
|
||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@ -10,46 +12,88 @@ import java.time.LocalDateTime;
|
|||||||
public class RiskEntity implements JobExecutionGroupable {
|
public class RiskEntity implements JobExecutionGroupable {
|
||||||
private String imoNo;
|
private String imoNo;
|
||||||
private LocalDateTime lastMdfcnDt;
|
private LocalDateTime lastMdfcnDt;
|
||||||
private String riskDataMaint;
|
private Integer riskDataMaint;
|
||||||
private String aisNotrcvElpsDays;
|
private Integer aisNotrcvElpsDays;
|
||||||
private String aisLwrnkDays;
|
private String aisNotrcvElpsDaysDesc;
|
||||||
private String aisUpImoDesc;
|
private Integer aisLwrnkDays;
|
||||||
private String othrShipNmVoyYn;
|
private String aisLwrnkDaysDesc;
|
||||||
private String mmsiAnomMessage;
|
private Integer aisUpImoDesc;
|
||||||
private String recentDarkActv;
|
private String aisUpImoDescVal;
|
||||||
private String portPrtcll;
|
private Integer othrShipNmVoyYn;
|
||||||
private String portRisk;
|
private String othrShipNmVoyYnDesc;
|
||||||
private String stsJob;
|
private Integer mmsiAnomMessage;
|
||||||
private String driftChg;
|
private String mmsiAnomMessageDesc;
|
||||||
private String riskEvent;
|
private Integer recentDarkActv;
|
||||||
private String ntnltyChg;
|
private String recentDarkActvDesc;
|
||||||
private String ntnltyPrsMouPerf;
|
private Integer portPrtcll;
|
||||||
private String ntnltyTkyMouPerf;
|
private String portPrtcllDesc;
|
||||||
private String ntnltyUscgMouPerf;
|
private Integer portRisk;
|
||||||
private String uscgExclShipCert;
|
private String portRiskDesc;
|
||||||
private String pscInspectionElpsHr;
|
private Integer stsJob;
|
||||||
private String pscInspection;
|
private String stsJobDesc;
|
||||||
private String pscDefect;
|
private Integer driftChg;
|
||||||
private String pscDetained;
|
private String driftChgDesc;
|
||||||
private String nowSmgrcEvdc;
|
private Integer riskEvent;
|
||||||
private String doccChg;
|
private String riskEventDesc;
|
||||||
private String nowClfic;
|
private String riskEventDescExt;
|
||||||
private String clficStatusChg;
|
private Integer ntnltyChg;
|
||||||
private String pniInsrnc;
|
private String ntnltyChgDesc;
|
||||||
private String shipNmChg;
|
private Integer ntnltyPrsMouPerf;
|
||||||
private String gboChg;
|
private String ntnltyPrsMouPerfDesc;
|
||||||
private String vslage;
|
private Integer ntnltyTkyMouPerf;
|
||||||
private String ilglFshrViol;
|
private String ntnltyTkyMouPerfDesc;
|
||||||
private String draftChg;
|
private Integer ntnltyUscgMouPerf;
|
||||||
private String recentSanctionPrtcll;
|
private String ntnltyUscgMouPerfDesc;
|
||||||
private String snglShipVoy;
|
private Integer uscgExclShipCert;
|
||||||
private String fltsfty;
|
private String uscgExclShipCertDesc;
|
||||||
private String fltPsc;
|
private Integer pscInspectionElpsHr;
|
||||||
private String spcInspectionOvdue;
|
private String pscInspectionElpsHrDesc;
|
||||||
private String ownrUnk;
|
private Integer pscInspection;
|
||||||
private String rssPortCall;
|
private String pscInspectionDesc;
|
||||||
private String rssOwnrReg;
|
private Integer pscDefect;
|
||||||
private String rssSts;
|
private String pscDefectDesc;
|
||||||
|
private Integer pscDetained;
|
||||||
|
private String pscDetainedDesc;
|
||||||
|
private Integer nowSmgrcEvdc;
|
||||||
|
private String nowSmgrcEvdcDesc;
|
||||||
|
private Integer doccChg;
|
||||||
|
private String doccChgDesc;
|
||||||
|
private Integer nowClfic;
|
||||||
|
private String nowClficDesc;
|
||||||
|
private String nowClficDescExt;
|
||||||
|
private Integer clficStatusChg;
|
||||||
|
private String clficStatusChgDesc;
|
||||||
|
private Integer pniInsrnc;
|
||||||
|
private String pniInsrncDesc;
|
||||||
|
private String pniInsrncDescExt;
|
||||||
|
private Integer shipNmChg;
|
||||||
|
private String shipNmChgDesc;
|
||||||
|
private Integer gboChg;
|
||||||
|
private String gboChgDesc;
|
||||||
|
private Integer vslage;
|
||||||
|
private String vslageDesc;
|
||||||
|
private Integer ilglFshrViol;
|
||||||
|
private String ilglFshrViolDesc;
|
||||||
|
private Integer draftChg;
|
||||||
|
private String draftChgDesc;
|
||||||
|
private Integer recentSanctionPrtcll;
|
||||||
|
private String recentSanctionPrtcllDesc;
|
||||||
|
private Integer snglShipVoy;
|
||||||
|
private String snglShipVoyDesc;
|
||||||
|
private Integer fltsfty;
|
||||||
|
private String fltsftyDesc;
|
||||||
|
private Integer fltPsc;
|
||||||
|
private String fltPscDesc;
|
||||||
|
private Integer spcInspectionOvdue;
|
||||||
|
private String spcInspectionOvdueDesc;
|
||||||
|
private Integer ownrUnk;
|
||||||
|
private String ownrUnkDesc;
|
||||||
|
private Integer rssPortCall;
|
||||||
|
private String rssPortCallDesc;
|
||||||
|
private Integer rssOwnrReg;
|
||||||
|
private String rssOwnrRegDesc;
|
||||||
|
private Integer rssSts;
|
||||||
|
private String rssStsDesc;
|
||||||
|
|
||||||
private Long jobExecutionId;
|
private Long jobExecutionId;
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,21 @@
|
|||||||
|
package com.snp.batch.jobs.datasync.batch.risk.processor;
|
||||||
|
|
||||||
|
import com.snp.batch.common.batch.processor.BaseProcessor;
|
||||||
|
import com.snp.batch.jobs.datasync.batch.risk.dto.RiskChangeDto;
|
||||||
|
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class RiskChangeProcessor extends BaseProcessor<RiskChangeDto, RiskChangeEntity> {
|
||||||
|
@Override
|
||||||
|
protected RiskChangeEntity processItem(RiskChangeDto dto) throws Exception {
|
||||||
|
return RiskChangeEntity.builder()
|
||||||
|
.jobExecutionId(dto.getJobExecutionId())
|
||||||
|
.imoNo(dto.getImoNo())
|
||||||
|
.lastMdfcnDt(dto.getLastMdfcnDt())
|
||||||
|
.flctnColNm(dto.getFlctnColNm())
|
||||||
|
.bfrVal(dto.getBfrVal())
|
||||||
|
.aftrVal(dto.getAftrVal())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -15,44 +15,86 @@ public class RiskProcessor extends BaseProcessor<RiskDto, RiskEntity> {
|
|||||||
.lastMdfcnDt(dto.getLastMdfcnDt())
|
.lastMdfcnDt(dto.getLastMdfcnDt())
|
||||||
.riskDataMaint(dto.getRiskDataMaint())
|
.riskDataMaint(dto.getRiskDataMaint())
|
||||||
.aisNotrcvElpsDays(dto.getAisNotrcvElpsDays())
|
.aisNotrcvElpsDays(dto.getAisNotrcvElpsDays())
|
||||||
|
.aisNotrcvElpsDaysDesc(dto.getAisNotrcvElpsDaysDesc())
|
||||||
.aisLwrnkDays(dto.getAisLwrnkDays())
|
.aisLwrnkDays(dto.getAisLwrnkDays())
|
||||||
|
.aisLwrnkDaysDesc(dto.getAisLwrnkDaysDesc())
|
||||||
.aisUpImoDesc(dto.getAisUpImoDesc())
|
.aisUpImoDesc(dto.getAisUpImoDesc())
|
||||||
|
.aisUpImoDescVal(dto.getAisUpImoDescVal())
|
||||||
.othrShipNmVoyYn(dto.getOthrShipNmVoyYn())
|
.othrShipNmVoyYn(dto.getOthrShipNmVoyYn())
|
||||||
|
.othrShipNmVoyYnDesc(dto.getOthrShipNmVoyYnDesc())
|
||||||
.mmsiAnomMessage(dto.getMmsiAnomMessage())
|
.mmsiAnomMessage(dto.getMmsiAnomMessage())
|
||||||
|
.mmsiAnomMessageDesc(dto.getMmsiAnomMessageDesc())
|
||||||
.recentDarkActv(dto.getRecentDarkActv())
|
.recentDarkActv(dto.getRecentDarkActv())
|
||||||
|
.recentDarkActvDesc(dto.getRecentDarkActvDesc())
|
||||||
.portPrtcll(dto.getPortPrtcll())
|
.portPrtcll(dto.getPortPrtcll())
|
||||||
|
.portPrtcllDesc(dto.getPortPrtcllDesc())
|
||||||
.portRisk(dto.getPortRisk())
|
.portRisk(dto.getPortRisk())
|
||||||
|
.portRiskDesc(dto.getPortRiskDesc())
|
||||||
.stsJob(dto.getStsJob())
|
.stsJob(dto.getStsJob())
|
||||||
|
.stsJobDesc(dto.getStsJobDesc())
|
||||||
.driftChg(dto.getDriftChg())
|
.driftChg(dto.getDriftChg())
|
||||||
|
.driftChgDesc(dto.getDriftChgDesc())
|
||||||
.riskEvent(dto.getRiskEvent())
|
.riskEvent(dto.getRiskEvent())
|
||||||
|
.riskEventDesc(dto.getRiskEventDesc())
|
||||||
|
.riskEventDescExt(dto.getRiskEventDescExt())
|
||||||
.ntnltyChg(dto.getNtnltyChg())
|
.ntnltyChg(dto.getNtnltyChg())
|
||||||
|
.ntnltyChgDesc(dto.getNtnltyChgDesc())
|
||||||
.ntnltyPrsMouPerf(dto.getNtnltyPrsMouPerf())
|
.ntnltyPrsMouPerf(dto.getNtnltyPrsMouPerf())
|
||||||
|
.ntnltyPrsMouPerfDesc(dto.getNtnltyPrsMouPerfDesc())
|
||||||
.ntnltyTkyMouPerf(dto.getNtnltyTkyMouPerf())
|
.ntnltyTkyMouPerf(dto.getNtnltyTkyMouPerf())
|
||||||
|
.ntnltyTkyMouPerfDesc(dto.getNtnltyTkyMouPerfDesc())
|
||||||
.ntnltyUscgMouPerf(dto.getNtnltyUscgMouPerf())
|
.ntnltyUscgMouPerf(dto.getNtnltyUscgMouPerf())
|
||||||
|
.ntnltyUscgMouPerfDesc(dto.getNtnltyUscgMouPerfDesc())
|
||||||
.uscgExclShipCert(dto.getUscgExclShipCert())
|
.uscgExclShipCert(dto.getUscgExclShipCert())
|
||||||
|
.uscgExclShipCertDesc(dto.getUscgExclShipCertDesc())
|
||||||
.pscInspectionElpsHr(dto.getPscInspectionElpsHr())
|
.pscInspectionElpsHr(dto.getPscInspectionElpsHr())
|
||||||
|
.pscInspectionElpsHrDesc(dto.getPscInspectionElpsHrDesc())
|
||||||
.pscInspection(dto.getPscInspection())
|
.pscInspection(dto.getPscInspection())
|
||||||
|
.pscInspectionDesc(dto.getPscInspectionDesc())
|
||||||
.pscDefect(dto.getPscDefect())
|
.pscDefect(dto.getPscDefect())
|
||||||
|
.pscDefectDesc(dto.getPscDefectDesc())
|
||||||
.pscDetained(dto.getPscDetained())
|
.pscDetained(dto.getPscDetained())
|
||||||
|
.pscDetainedDesc(dto.getPscDetainedDesc())
|
||||||
.nowSmgrcEvdc(dto.getNowSmgrcEvdc())
|
.nowSmgrcEvdc(dto.getNowSmgrcEvdc())
|
||||||
|
.nowSmgrcEvdcDesc(dto.getNowSmgrcEvdcDesc())
|
||||||
.doccChg(dto.getDoccChg())
|
.doccChg(dto.getDoccChg())
|
||||||
|
.doccChgDesc(dto.getDoccChgDesc())
|
||||||
.nowClfic(dto.getNowClfic())
|
.nowClfic(dto.getNowClfic())
|
||||||
|
.nowClficDesc(dto.getNowClficDesc())
|
||||||
|
.nowClficDescExt(dto.getNowClficDescExt())
|
||||||
.clficStatusChg(dto.getClficStatusChg())
|
.clficStatusChg(dto.getClficStatusChg())
|
||||||
|
.clficStatusChgDesc(dto.getClficStatusChgDesc())
|
||||||
.pniInsrnc(dto.getPniInsrnc())
|
.pniInsrnc(dto.getPniInsrnc())
|
||||||
|
.pniInsrncDesc(dto.getPniInsrncDesc())
|
||||||
|
.pniInsrncDescExt(dto.getPniInsrncDescExt())
|
||||||
.shipNmChg(dto.getShipNmChg())
|
.shipNmChg(dto.getShipNmChg())
|
||||||
|
.shipNmChgDesc(dto.getShipNmChgDesc())
|
||||||
.gboChg(dto.getGboChg())
|
.gboChg(dto.getGboChg())
|
||||||
|
.gboChgDesc(dto.getGboChgDesc())
|
||||||
.vslage(dto.getVslage())
|
.vslage(dto.getVslage())
|
||||||
|
.vslageDesc(dto.getVslageDesc())
|
||||||
.ilglFshrViol(dto.getIlglFshrViol())
|
.ilglFshrViol(dto.getIlglFshrViol())
|
||||||
|
.ilglFshrViolDesc(dto.getIlglFshrViolDesc())
|
||||||
.draftChg(dto.getDraftChg())
|
.draftChg(dto.getDraftChg())
|
||||||
|
.draftChgDesc(dto.getDraftChgDesc())
|
||||||
.recentSanctionPrtcll(dto.getRecentSanctionPrtcll())
|
.recentSanctionPrtcll(dto.getRecentSanctionPrtcll())
|
||||||
|
.recentSanctionPrtcllDesc(dto.getRecentSanctionPrtcllDesc())
|
||||||
.snglShipVoy(dto.getSnglShipVoy())
|
.snglShipVoy(dto.getSnglShipVoy())
|
||||||
|
.snglShipVoyDesc(dto.getSnglShipVoyDesc())
|
||||||
.fltsfty(dto.getFltsfty())
|
.fltsfty(dto.getFltsfty())
|
||||||
|
.fltsftyDesc(dto.getFltsftyDesc())
|
||||||
.fltPsc(dto.getFltPsc())
|
.fltPsc(dto.getFltPsc())
|
||||||
|
.fltPscDesc(dto.getFltPscDesc())
|
||||||
.spcInspectionOvdue(dto.getSpcInspectionOvdue())
|
.spcInspectionOvdue(dto.getSpcInspectionOvdue())
|
||||||
|
.spcInspectionOvdueDesc(dto.getSpcInspectionOvdueDesc())
|
||||||
.ownrUnk(dto.getOwnrUnk())
|
.ownrUnk(dto.getOwnrUnk())
|
||||||
|
.ownrUnkDesc(dto.getOwnrUnkDesc())
|
||||||
.rssPortCall(dto.getRssPortCall())
|
.rssPortCall(dto.getRssPortCall())
|
||||||
|
.rssPortCallDesc(dto.getRssPortCallDesc())
|
||||||
.rssOwnrReg(dto.getRssOwnrReg())
|
.rssOwnrReg(dto.getRssOwnrReg())
|
||||||
|
.rssOwnrRegDesc(dto.getRssOwnrRegDesc())
|
||||||
.rssSts(dto.getRssSts())
|
.rssSts(dto.getRssSts())
|
||||||
|
.rssStsDesc(dto.getRssStsDesc())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,163 @@
|
|||||||
|
package com.snp.batch.jobs.datasync.batch.risk.reader;
|
||||||
|
|
||||||
|
import com.snp.batch.common.util.TableMetaInfo;
|
||||||
|
import com.snp.batch.jobs.datasync.batch.risk.dto.RiskChangeDto;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.batch.item.ItemReader;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
|
||||||
|
import javax.sql.DataSource;
|
||||||
|
import java.sql.Timestamp;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Risk 값 변경 이력 Reader
|
||||||
|
* tb_ship_risk_detail_hstry (스냅샷 이력)에서 imo_no별 시계열 비교하여 변경분 감지
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class RiskChangeReader implements ItemReader<RiskChangeDto> {
|
||||||
|
|
||||||
|
private final TableMetaInfo tableMetaInfo;
|
||||||
|
private final JdbcTemplate batchJdbcTemplate;
|
||||||
|
private final String targetSchema;
|
||||||
|
private List<RiskChangeDto> changeBuffer = new ArrayList<>();
|
||||||
|
private boolean initialized = false;
|
||||||
|
|
||||||
|
private static final List<String> INDICATOR_COLUMNS = List.of(
|
||||||
|
"ais_notrcv_elps_days", "ais_lwrnk_days", "ais_up_imo_desc",
|
||||||
|
"othr_ship_nm_voy_yn", "mmsi_anom_message", "recent_dark_actv",
|
||||||
|
"port_prtcll", "port_risk", "sts_job", "drift_chg",
|
||||||
|
"risk_event", "ntnlty_chg", "ntnlty_prs_mou_perf",
|
||||||
|
"ntnlty_tky_mou_perf", "ntnlty_uscg_mou_perf", "uscg_excl_ship_cert",
|
||||||
|
"psc_inspection_elps_hr", "psc_inspection", "psc_defect", "psc_detained",
|
||||||
|
"now_smgrc_evdc", "docc_chg", "now_clfic", "clfic_status_chg",
|
||||||
|
"pni_insrnc", "ship_nm_chg", "gbo_chg", "vslage",
|
||||||
|
"ilgl_fshr_viol", "draft_chg", "recent_sanction_prtcll", "sngl_ship_voy",
|
||||||
|
"fltsfty", "flt_psc", "spc_inspection_ovdue", "ownr_unk",
|
||||||
|
"rss_port_call", "rss_ownr_reg", "rss_sts"
|
||||||
|
);
|
||||||
|
|
||||||
|
public RiskChangeReader(@Qualifier("batchDataSource") DataSource batchDataSource,
|
||||||
|
TableMetaInfo tableMetaInfo,
|
||||||
|
@Value("${app.batch.target-schema.name}") String targetSchema) {
|
||||||
|
this.batchJdbcTemplate = new JdbcTemplate(batchDataSource);
|
||||||
|
this.tableMetaInfo = tableMetaInfo;
|
||||||
|
this.targetSchema = targetSchema;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RiskChangeDto read() throws Exception {
|
||||||
|
if (!initialized) {
|
||||||
|
initializeChangeBuffer();
|
||||||
|
initialized = true;
|
||||||
|
}
|
||||||
|
if (changeBuffer.isEmpty()) return null;
|
||||||
|
return changeBuffer.remove(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeChangeBuffer() {
|
||||||
|
log.info("[RiskChangeReader] 변경 이력 감지 시작");
|
||||||
|
|
||||||
|
// 스냅샷 이력에서 indicator 컬럼만 조회 (imo_no, last_mdfcn_dt 순 정렬)
|
||||||
|
String columnList = String.join(", ", INDICATOR_COLUMNS);
|
||||||
|
String sql = String.format("""
|
||||||
|
SELECT imo_no, last_mdfcn_dt, %s
|
||||||
|
FROM %s.%s
|
||||||
|
ORDER BY imo_no, last_mdfcn_dt ASC
|
||||||
|
""", columnList, targetSchema, tableMetaInfo.targetTbShipRiskDetailHstry);
|
||||||
|
|
||||||
|
List<Map<String, Object>> allData = batchJdbcTemplate.queryForList(sql);
|
||||||
|
log.info("[RiskChangeReader] 스냅샷 이력 조회 완료: {} 건", allData.size());
|
||||||
|
|
||||||
|
// 기존 변경 이력 조회 (중복 방지)
|
||||||
|
String existingSql = String.format("""
|
||||||
|
SELECT imo_no, last_mdfcn_dt, flctn_col_nm
|
||||||
|
FROM %s.%s
|
||||||
|
""", targetSchema, tableMetaInfo.targetTbShipRiskDetailInfoHstry);
|
||||||
|
|
||||||
|
Set<String> existingKeys = new HashSet<>();
|
||||||
|
try {
|
||||||
|
List<Map<String, Object>> existingData = batchJdbcTemplate.queryForList(existingSql);
|
||||||
|
for (Map<String, Object> row : existingData) {
|
||||||
|
existingKeys.add(buildExistingKey(row));
|
||||||
|
}
|
||||||
|
log.info("[RiskChangeReader] 기존 변경 이력 데이터: {} 건", existingKeys.size());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("[RiskChangeReader] 기존 데이터 조회 실패: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
// imo_no별 그룹핑
|
||||||
|
Map<String, List<Map<String, Object>>> groupedByImo = new LinkedHashMap<>();
|
||||||
|
for (Map<String, Object> row : allData) {
|
||||||
|
String imoNo = (String) row.get("imo_no");
|
||||||
|
groupedByImo.computeIfAbsent(imoNo, k -> new ArrayList<>()).add(row);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 각 imo_no별 시계열 비교
|
||||||
|
long changeCount = 0;
|
||||||
|
for (Map.Entry<String, List<Map<String, Object>>> entry : groupedByImo.entrySet()) {
|
||||||
|
String imoNo = entry.getKey();
|
||||||
|
List<Map<String, Object>> records = entry.getValue();
|
||||||
|
|
||||||
|
Map<String, Object> previousRecord = null;
|
||||||
|
for (Map<String, Object> currentRecord : records) {
|
||||||
|
if (previousRecord != null) {
|
||||||
|
List<RiskChangeDto> changes = detectChanges(imoNo, previousRecord, currentRecord, existingKeys);
|
||||||
|
changeBuffer.addAll(changes);
|
||||||
|
changeCount += changes.size();
|
||||||
|
}
|
||||||
|
previousRecord = currentRecord;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("[RiskChangeReader] 변경 이력 감지 완료: {} 건 (imo_no 그룹: {} 개)", changeCount, groupedByImo.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<RiskChangeDto> detectChanges(String imoNo,
|
||||||
|
Map<String, Object> previousRecord,
|
||||||
|
Map<String, Object> currentRecord,
|
||||||
|
Set<String> existingKeys) {
|
||||||
|
List<RiskChangeDto> changes = new ArrayList<>();
|
||||||
|
Timestamp currentTs = (Timestamp) currentRecord.get("last_mdfcn_dt");
|
||||||
|
LocalDateTime lastMdfcnDt = currentTs != null ? currentTs.toLocalDateTime() : null;
|
||||||
|
|
||||||
|
for (String column : INDICATOR_COLUMNS) {
|
||||||
|
Object prevValue = previousRecord.get(column);
|
||||||
|
Object currValue = currentRecord.get(column);
|
||||||
|
|
||||||
|
if (!Objects.equals(prevValue, currValue)) {
|
||||||
|
String existingKey = imoNo + "|" + lastMdfcnDt + "|" + column;
|
||||||
|
if (existingKeys.contains(existingKey)) continue;
|
||||||
|
|
||||||
|
changes.add(RiskChangeDto.builder()
|
||||||
|
.jobExecutionId(0L)
|
||||||
|
.imoNo(imoNo)
|
||||||
|
.lastMdfcnDt(lastMdfcnDt)
|
||||||
|
.flctnColNm(column)
|
||||||
|
.bfrVal(convertToString(prevValue))
|
||||||
|
.aftrVal(convertToString(currValue))
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return changes;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String buildExistingKey(Map<String, Object> row) {
|
||||||
|
String imoNo = (String) row.get("imo_no");
|
||||||
|
Object lastMdfcnDtObj = row.get("last_mdfcn_dt");
|
||||||
|
LocalDateTime lastMdfcnDt = null;
|
||||||
|
if (lastMdfcnDtObj instanceof Timestamp) {
|
||||||
|
lastMdfcnDt = ((Timestamp) lastMdfcnDtObj).toLocalDateTime();
|
||||||
|
}
|
||||||
|
String flctnColNm = (String) row.get("flctn_col_nm");
|
||||||
|
return imoNo + "|" + lastMdfcnDt + "|" + flctnColNm;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String convertToString(Object value) {
|
||||||
|
if (value == null) return "null";
|
||||||
|
return String.valueOf(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -31,46 +31,88 @@ public class RiskReader extends BaseSyncReader<RiskDto> {
|
|||||||
.jobExecutionId(targetId)
|
.jobExecutionId(targetId)
|
||||||
.imoNo(rs.getString("imo_no"))
|
.imoNo(rs.getString("imo_no"))
|
||||||
.lastMdfcnDt(lastMdfcnDtTs != null ? lastMdfcnDtTs.toLocalDateTime() : null)
|
.lastMdfcnDt(lastMdfcnDtTs != null ? lastMdfcnDtTs.toLocalDateTime() : null)
|
||||||
.riskDataMaint(rs.getString("risk_data_maint"))
|
.riskDataMaint(rs.getObject("risk_data_maint") != null ? rs.getInt("risk_data_maint") : null)
|
||||||
.aisNotrcvElpsDays(rs.getString("ais_notrcv_elps_days"))
|
.aisNotrcvElpsDays(rs.getObject("ais_notrcv_elps_days") != null ? rs.getInt("ais_notrcv_elps_days") : null)
|
||||||
.aisLwrnkDays(rs.getString("ais_lwrnk_days"))
|
.aisNotrcvElpsDaysDesc(rs.getString("ais_notrcv_elps_days_desc"))
|
||||||
.aisUpImoDesc(rs.getString("ais_up_imo_desc"))
|
.aisLwrnkDays(rs.getObject("ais_lwrnk_days") != null ? rs.getInt("ais_lwrnk_days") : null)
|
||||||
.othrShipNmVoyYn(rs.getString("othr_ship_nm_voy_yn"))
|
.aisLwrnkDaysDesc(rs.getString("ais_lwrnk_days_desc"))
|
||||||
.mmsiAnomMessage(rs.getString("mmsi_anom_message"))
|
.aisUpImoDesc(rs.getObject("ais_up_imo_desc") != null ? rs.getInt("ais_up_imo_desc") : null)
|
||||||
.recentDarkActv(rs.getString("recent_dark_actv"))
|
.aisUpImoDescVal(rs.getString("ais_up_imo_desc_val"))
|
||||||
.portPrtcll(rs.getString("port_prtcll"))
|
.othrShipNmVoyYn(rs.getObject("othr_ship_nm_voy_yn") != null ? rs.getInt("othr_ship_nm_voy_yn") : null)
|
||||||
.portRisk(rs.getString("port_risk"))
|
.othrShipNmVoyYnDesc(rs.getString("othr_ship_nm_voy_yn_desc"))
|
||||||
.stsJob(rs.getString("sts_job"))
|
.mmsiAnomMessage(rs.getObject("mmsi_anom_message") != null ? rs.getInt("mmsi_anom_message") : null)
|
||||||
.driftChg(rs.getString("drift_chg"))
|
.mmsiAnomMessageDesc(rs.getString("mmsi_anom_message_desc"))
|
||||||
.riskEvent(rs.getString("risk_event"))
|
.recentDarkActv(rs.getObject("recent_dark_actv") != null ? rs.getInt("recent_dark_actv") : null)
|
||||||
.ntnltyChg(rs.getString("ntnlty_chg"))
|
.recentDarkActvDesc(rs.getString("recent_dark_actv_desc"))
|
||||||
.ntnltyPrsMouPerf(rs.getString("ntnlty_prs_mou_perf"))
|
.portPrtcll(rs.getObject("port_prtcll") != null ? rs.getInt("port_prtcll") : null)
|
||||||
.ntnltyTkyMouPerf(rs.getString("ntnlty_tky_mou_perf"))
|
.portPrtcllDesc(rs.getString("port_prtcll_desc"))
|
||||||
.ntnltyUscgMouPerf(rs.getString("ntnlty_uscg_mou_perf"))
|
.portRisk(rs.getObject("port_risk") != null ? rs.getInt("port_risk") : null)
|
||||||
.uscgExclShipCert(rs.getString("uscg_excl_ship_cert"))
|
.portRiskDesc(rs.getString("port_risk_desc"))
|
||||||
.pscInspectionElpsHr(rs.getString("psc_inspection_elps_hr"))
|
.stsJob(rs.getObject("sts_job") != null ? rs.getInt("sts_job") : null)
|
||||||
.pscInspection(rs.getString("psc_inspection"))
|
.stsJobDesc(rs.getString("sts_job_desc"))
|
||||||
.pscDefect(rs.getString("psc_defect"))
|
.driftChg(rs.getObject("drift_chg") != null ? rs.getInt("drift_chg") : null)
|
||||||
.pscDetained(rs.getString("psc_detained"))
|
.driftChgDesc(rs.getString("drift_chg_desc"))
|
||||||
.nowSmgrcEvdc(rs.getString("now_smgrc_evdc"))
|
.riskEvent(rs.getObject("risk_event") != null ? rs.getInt("risk_event") : null)
|
||||||
.doccChg(rs.getString("docc_chg"))
|
.riskEventDesc(rs.getString("risk_event_desc"))
|
||||||
.nowClfic(rs.getString("now_clfic"))
|
.riskEventDescExt(rs.getString("risk_event_desc_ext"))
|
||||||
.clficStatusChg(rs.getString("clfic_status_chg"))
|
.ntnltyChg(rs.getObject("ntnlty_chg") != null ? rs.getInt("ntnlty_chg") : null)
|
||||||
.pniInsrnc(rs.getString("pni_insrnc"))
|
.ntnltyChgDesc(rs.getString("ntnlty_chg_desc"))
|
||||||
.shipNmChg(rs.getString("ship_nm_chg"))
|
.ntnltyPrsMouPerf(rs.getObject("ntnlty_prs_mou_perf") != null ? rs.getInt("ntnlty_prs_mou_perf") : null)
|
||||||
.gboChg(rs.getString("gbo_chg"))
|
.ntnltyPrsMouPerfDesc(rs.getString("ntnlty_prs_mou_perf_desc"))
|
||||||
.vslage(rs.getString("vslage"))
|
.ntnltyTkyMouPerf(rs.getObject("ntnlty_tky_mou_perf") != null ? rs.getInt("ntnlty_tky_mou_perf") : null)
|
||||||
.ilglFshrViol(rs.getString("ilgl_fshr_viol"))
|
.ntnltyTkyMouPerfDesc(rs.getString("ntnlty_tky_mou_perf_desc"))
|
||||||
.draftChg(rs.getString("draft_chg"))
|
.ntnltyUscgMouPerf(rs.getObject("ntnlty_uscg_mou_perf") != null ? rs.getInt("ntnlty_uscg_mou_perf") : null)
|
||||||
.recentSanctionPrtcll(rs.getString("recent_sanction_prtcll"))
|
.ntnltyUscgMouPerfDesc(rs.getString("ntnlty_uscg_mou_perf_desc"))
|
||||||
.snglShipVoy(rs.getString("sngl_ship_voy"))
|
.uscgExclShipCert(rs.getObject("uscg_excl_ship_cert") != null ? rs.getInt("uscg_excl_ship_cert") : null)
|
||||||
.fltsfty(rs.getString("fltsfty"))
|
.uscgExclShipCertDesc(rs.getString("uscg_excl_ship_cert_desc"))
|
||||||
.fltPsc(rs.getString("flt_psc"))
|
.pscInspectionElpsHr(rs.getObject("psc_inspection_elps_hr") != null ? rs.getInt("psc_inspection_elps_hr") : null)
|
||||||
.spcInspectionOvdue(rs.getString("spc_inspection_ovdue"))
|
.pscInspectionElpsHrDesc(rs.getString("psc_inspection_elps_hr_desc"))
|
||||||
.ownrUnk(rs.getString("ownr_unk"))
|
.pscInspection(rs.getObject("psc_inspection") != null ? rs.getInt("psc_inspection") : null)
|
||||||
.rssPortCall(rs.getString("rss_port_call"))
|
.pscInspectionDesc(rs.getString("psc_inspection_desc"))
|
||||||
.rssOwnrReg(rs.getString("rss_ownr_reg"))
|
.pscDefect(rs.getObject("psc_defect") != null ? rs.getInt("psc_defect") : null)
|
||||||
.rssSts(rs.getString("rss_sts"))
|
.pscDefectDesc(rs.getString("psc_defect_desc"))
|
||||||
|
.pscDetained(rs.getObject("psc_detained") != null ? rs.getInt("psc_detained") : null)
|
||||||
|
.pscDetainedDesc(rs.getString("psc_detained_desc"))
|
||||||
|
.nowSmgrcEvdc(rs.getObject("now_smgrc_evdc") != null ? rs.getInt("now_smgrc_evdc") : null)
|
||||||
|
.nowSmgrcEvdcDesc(rs.getString("now_smgrc_evdc_desc"))
|
||||||
|
.doccChg(rs.getObject("docc_chg") != null ? rs.getInt("docc_chg") : null)
|
||||||
|
.doccChgDesc(rs.getString("docc_chg_desc"))
|
||||||
|
.nowClfic(rs.getObject("now_clfic") != null ? rs.getInt("now_clfic") : null)
|
||||||
|
.nowClficDesc(rs.getString("now_clfic_desc"))
|
||||||
|
.nowClficDescExt(rs.getString("now_clfic_desc_ext"))
|
||||||
|
.clficStatusChg(rs.getObject("clfic_status_chg") != null ? rs.getInt("clfic_status_chg") : null)
|
||||||
|
.clficStatusChgDesc(rs.getString("clfic_status_chg_desc"))
|
||||||
|
.pniInsrnc(rs.getObject("pni_insrnc") != null ? rs.getInt("pni_insrnc") : null)
|
||||||
|
.pniInsrncDesc(rs.getString("pni_insrnc_desc"))
|
||||||
|
.pniInsrncDescExt(rs.getString("pni_insrnc_desc_ext"))
|
||||||
|
.shipNmChg(rs.getObject("ship_nm_chg") != null ? rs.getInt("ship_nm_chg") : null)
|
||||||
|
.shipNmChgDesc(rs.getString("ship_nm_chg_desc"))
|
||||||
|
.gboChg(rs.getObject("gbo_chg") != null ? rs.getInt("gbo_chg") : null)
|
||||||
|
.gboChgDesc(rs.getString("gbo_chg_desc"))
|
||||||
|
.vslage(rs.getObject("vslage") != null ? rs.getInt("vslage") : null)
|
||||||
|
.vslageDesc(rs.getString("vslage_desc"))
|
||||||
|
.ilglFshrViol(rs.getObject("ilgl_fshr_viol") != null ? rs.getInt("ilgl_fshr_viol") : null)
|
||||||
|
.ilglFshrViolDesc(rs.getString("ilgl_fshr_viol_desc"))
|
||||||
|
.draftChg(rs.getObject("draft_chg") != null ? rs.getInt("draft_chg") : null)
|
||||||
|
.draftChgDesc(rs.getString("draft_chg_desc"))
|
||||||
|
.recentSanctionPrtcll(rs.getObject("recent_sanction_prtcll") != null ? rs.getInt("recent_sanction_prtcll") : null)
|
||||||
|
.recentSanctionPrtcllDesc(rs.getString("recent_sanction_prtcll_desc"))
|
||||||
|
.snglShipVoy(rs.getObject("sngl_ship_voy") != null ? rs.getInt("sngl_ship_voy") : null)
|
||||||
|
.snglShipVoyDesc(rs.getString("sngl_ship_voy_desc"))
|
||||||
|
.fltsfty(rs.getObject("fltsfty") != null ? rs.getInt("fltsfty") : null)
|
||||||
|
.fltsftyDesc(rs.getString("fltsfty_desc"))
|
||||||
|
.fltPsc(rs.getObject("flt_psc") != null ? rs.getInt("flt_psc") : null)
|
||||||
|
.fltPscDesc(rs.getString("flt_psc_desc"))
|
||||||
|
.spcInspectionOvdue(rs.getObject("spc_inspection_ovdue") != null ? rs.getInt("spc_inspection_ovdue") : null)
|
||||||
|
.spcInspectionOvdueDesc(rs.getString("spc_inspection_ovdue_desc"))
|
||||||
|
.ownrUnk(rs.getObject("ownr_unk") != null ? rs.getInt("ownr_unk") : null)
|
||||||
|
.ownrUnkDesc(rs.getString("ownr_unk_desc"))
|
||||||
|
.rssPortCall(rs.getObject("rss_port_call") != null ? rs.getInt("rss_port_call") : null)
|
||||||
|
.rssPortCallDesc(rs.getString("rss_port_call_desc"))
|
||||||
|
.rssOwnrReg(rs.getObject("rss_ownr_reg") != null ? rs.getInt("rss_ownr_reg") : null)
|
||||||
|
.rssOwnrRegDesc(rs.getString("rss_ownr_reg_desc"))
|
||||||
|
.rssSts(rs.getObject("rss_sts") != null ? rs.getInt("rss_sts") : null)
|
||||||
|
.rssStsDesc(rs.getString("rss_sts_desc"))
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
package com.snp.batch.jobs.datasync.batch.risk.repository;
|
package com.snp.batch.jobs.datasync.batch.risk.repository;
|
||||||
|
|
||||||
|
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
|
||||||
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskEntity;
|
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskEntity;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -9,6 +10,7 @@ import java.util.List;
|
|||||||
* 구현체: RiskRepositoryImpl (JdbcTemplate 기반)
|
* 구현체: RiskRepositoryImpl (JdbcTemplate 기반)
|
||||||
*/
|
*/
|
||||||
public interface RiskRepository {
|
public interface RiskRepository {
|
||||||
void saveRisk(List<RiskEntity> riskEntityList);
|
void saveRiskDetail(List<RiskEntity> riskEntityList);
|
||||||
void saveRiskHistory(List<RiskEntity> riskEntityList);
|
void saveRiskDetailHistory(List<RiskEntity> riskEntityList);
|
||||||
|
void saveRiskDetailChangeHistory(List<RiskChangeEntity> changeEntityList);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,6 +2,7 @@ package com.snp.batch.jobs.datasync.batch.risk.repository;
|
|||||||
|
|
||||||
import com.snp.batch.common.batch.repository.MultiDataSourceJdbcRepository;
|
import com.snp.batch.common.batch.repository.MultiDataSourceJdbcRepository;
|
||||||
import com.snp.batch.common.util.TableMetaInfo;
|
import com.snp.batch.common.util.TableMetaInfo;
|
||||||
|
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
|
||||||
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskEntity;
|
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskEntity;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
@ -12,6 +13,7 @@ import org.springframework.stereotype.Repository;
|
|||||||
import javax.sql.DataSource;
|
import javax.sql.DataSource;
|
||||||
import java.sql.PreparedStatement;
|
import java.sql.PreparedStatement;
|
||||||
import java.sql.Timestamp;
|
import java.sql.Timestamp;
|
||||||
|
import java.sql.Types;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -21,145 +23,140 @@ import java.util.List;
|
|||||||
@Repository("riskRepository")
|
@Repository("riskRepository")
|
||||||
public class RiskRepositoryImpl extends MultiDataSourceJdbcRepository<RiskEntity, Long> implements RiskRepository {
|
public class RiskRepositoryImpl extends MultiDataSourceJdbcRepository<RiskEntity, Long> implements RiskRepository {
|
||||||
|
|
||||||
private DataSource batchDataSource;
|
|
||||||
private DataSource businessDataSource;
|
|
||||||
private final TableMetaInfo tableMetaInfo;
|
private final TableMetaInfo tableMetaInfo;
|
||||||
|
|
||||||
public RiskRepositoryImpl(@Qualifier("batchDataSource") DataSource batchDataSource,
|
public RiskRepositoryImpl(@Qualifier("batchDataSource") DataSource batchDataSource,
|
||||||
@Qualifier("businessDataSource") DataSource businessDataSource,
|
@Qualifier("businessDataSource") DataSource businessDataSource,
|
||||||
TableMetaInfo tableMetaInfo) {
|
TableMetaInfo tableMetaInfo) {
|
||||||
|
|
||||||
super(new JdbcTemplate(batchDataSource), new JdbcTemplate(businessDataSource));
|
super(new JdbcTemplate(batchDataSource), new JdbcTemplate(businessDataSource));
|
||||||
|
|
||||||
this.batchDataSource = batchDataSource;
|
|
||||||
this.businessDataSource = businessDataSource;
|
|
||||||
this.tableMetaInfo = tableMetaInfo;
|
this.tableMetaInfo = tableMetaInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override protected String getTableName() { return null; }
|
||||||
protected String getTableName() {
|
@Override protected RowMapper<RiskEntity> getRowMapper() { return null; }
|
||||||
return null;
|
@Override protected Long extractId(RiskEntity entity) { return null; }
|
||||||
}
|
@Override protected String getInsertSql() { return null; }
|
||||||
|
@Override protected String getUpdateSql() { return null; }
|
||||||
|
@Override protected void setInsertParameters(PreparedStatement ps, RiskEntity entity) {}
|
||||||
|
@Override protected void setUpdateParameters(PreparedStatement ps, RiskEntity entity) {}
|
||||||
|
@Override protected String getEntityName() { return null; }
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected RowMapper<RiskEntity> getRowMapper() {
|
public void saveRiskDetail(List<RiskEntity> riskEntityList) {
|
||||||
return null;
|
if (riskEntityList == null || riskEntityList.isEmpty()) return;
|
||||||
}
|
String sql = RiskSql.getRiskDetailUpsertSql(tableMetaInfo.targetTbShipRiskDetailInfo, "imo_no");
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Long extractId(RiskEntity entity) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getInsertSql() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getUpdateSql() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void setInsertParameters(PreparedStatement ps, RiskEntity entity) throws Exception {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void setUpdateParameters(PreparedStatement ps, RiskEntity entity) throws Exception {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getEntityName() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void saveRisk(List<RiskEntity> riskEntityList) {
|
|
||||||
String sql = RiskSql.getRiskUpsertSql(tableMetaInfo.targetTbShipRiskInfo, "imo_no");
|
|
||||||
if (riskEntityList == null || riskEntityList.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// log.debug("{} 배치 삽입 시작: {} 건", "RiskEntity", riskEntityList.size());
|
|
||||||
|
|
||||||
batchJdbcTemplate.batchUpdate(sql, riskEntityList, riskEntityList.size(),
|
batchJdbcTemplate.batchUpdate(sql, riskEntityList, riskEntityList.size(),
|
||||||
(ps, entity) -> {
|
(ps, entity) -> { try { bindRiskDetail(ps, entity); } catch (Exception e) { throw new RuntimeException(e); } });
|
||||||
try {
|
|
||||||
bindRisk(ps, entity);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("배치 삽입 파라미터 설정 실패", e);
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// log.debug("{} 배치 삽입 완료: {} 건", "RiskEntity", riskEntityList.size());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void saveRiskHistory(List<RiskEntity> riskEntityList) {
|
public void saveRiskDetailHistory(List<RiskEntity> riskEntityList) {
|
||||||
String sql = RiskSql.getRiskUpsertSql(tableMetaInfo.targetTbShipRiskHstry, "imo_no, last_mdfcn_dt");
|
if (riskEntityList == null || riskEntityList.isEmpty()) return;
|
||||||
if (riskEntityList == null || riskEntityList.isEmpty()) {
|
String sql = RiskSql.getRiskDetailUpsertSql(tableMetaInfo.targetTbShipRiskDetailHstry, "imo_no, last_mdfcn_dt");
|
||||||
return;
|
|
||||||
}
|
|
||||||
// log.debug("{} 배치 삽입 시작: {} 건", "RiskEntity", riskEntityList.size());
|
|
||||||
|
|
||||||
batchJdbcTemplate.batchUpdate(sql, riskEntityList, riskEntityList.size(),
|
batchJdbcTemplate.batchUpdate(sql, riskEntityList, riskEntityList.size(),
|
||||||
|
(ps, entity) -> { try { bindRiskDetail(ps, entity); } catch (Exception e) { throw new RuntimeException(e); } });
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void saveRiskDetailChangeHistory(List<RiskChangeEntity> changeEntityList) {
|
||||||
|
if (changeEntityList == null || changeEntityList.isEmpty()) return;
|
||||||
|
String sql = RiskSql.getChangeHistoryInsertSql(tableMetaInfo.targetTbShipRiskDetailInfoHstry);
|
||||||
|
batchJdbcTemplate.batchUpdate(sql, changeEntityList, changeEntityList.size(),
|
||||||
(ps, entity) -> {
|
(ps, entity) -> {
|
||||||
try {
|
ps.setString(1, entity.getImoNo());
|
||||||
bindRisk(ps, entity);
|
ps.setTimestamp(2, entity.getLastMdfcnDt() != null ? Timestamp.valueOf(entity.getLastMdfcnDt()) : null);
|
||||||
} catch (Exception e) {
|
ps.setString(3, entity.getFlctnColNm());
|
||||||
log.error("배치 삽입 파라미터 설정 실패", e);
|
ps.setString(4, entity.getBfrVal());
|
||||||
throw new RuntimeException(e);
|
ps.setString(5, entity.getAftrVal());
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// log.debug("{} 배치 삽입 완료: {} 건", "RiskEntity", riskEntityList.size());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void bindRisk(PreparedStatement pstmt, RiskEntity entity) throws Exception {
|
private void bindRiskDetail(PreparedStatement ps, RiskEntity e) throws Exception {
|
||||||
int idx = 1;
|
int idx = 1;
|
||||||
pstmt.setString(idx++, "SYSTEM"); // 1. creatr_id
|
ps.setString(idx++, "SYSTEM");
|
||||||
pstmt.setString(idx++, entity.getImoNo()); // 2. imo_no
|
ps.setString(idx++, e.getImoNo());
|
||||||
pstmt.setTimestamp(idx++, entity.getLastMdfcnDt() != null ? Timestamp.valueOf(entity.getLastMdfcnDt()) : null); // 3. last_mdfcn_dt
|
ps.setTimestamp(idx++, e.getLastMdfcnDt() != null ? Timestamp.valueOf(e.getLastMdfcnDt()) : null);
|
||||||
pstmt.setString(idx++, entity.getRiskDataMaint()); // 4. risk_data_maint
|
ps.setObject(idx++, e.getRiskDataMaint(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getAisNotrcvElpsDays()); // 5. ais_notrcv_elps_days
|
ps.setObject(idx++, e.getAisNotrcvElpsDays(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getAisLwrnkDays()); // 6. ais_lwrnk_days
|
ps.setString(idx++, e.getAisNotrcvElpsDaysDesc());
|
||||||
pstmt.setString(idx++, entity.getAisUpImoDesc()); // 7. ais_up_imo_desc
|
ps.setObject(idx++, e.getAisLwrnkDays(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getOthrShipNmVoyYn()); // 8. othr_ship_nm_voy_yn
|
ps.setString(idx++, e.getAisLwrnkDaysDesc());
|
||||||
pstmt.setString(idx++, entity.getMmsiAnomMessage()); // 9. mmsi_anom_message
|
ps.setObject(idx++, e.getAisUpImoDesc(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getRecentDarkActv()); // 10. recent_dark_actv
|
ps.setString(idx++, e.getAisUpImoDescVal());
|
||||||
pstmt.setString(idx++, entity.getPortPrtcll()); // 11. port_prtcll
|
ps.setObject(idx++, e.getOthrShipNmVoyYn(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getPortRisk()); // 12. port_risk
|
ps.setString(idx++, e.getOthrShipNmVoyYnDesc());
|
||||||
pstmt.setString(idx++, entity.getStsJob()); // 13. sts_job
|
ps.setObject(idx++, e.getMmsiAnomMessage(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getDriftChg()); // 14. drift_chg
|
ps.setString(idx++, e.getMmsiAnomMessageDesc());
|
||||||
pstmt.setString(idx++, entity.getRiskEvent()); // 15. risk_event
|
ps.setObject(idx++, e.getRecentDarkActv(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getNtnltyChg()); // 16. ntnlty_chg
|
ps.setString(idx++, e.getRecentDarkActvDesc());
|
||||||
pstmt.setString(idx++, entity.getNtnltyPrsMouPerf()); // 17. ntnlty_prs_mou_perf
|
ps.setObject(idx++, e.getPortPrtcll(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getNtnltyTkyMouPerf()); // 18. ntnlty_tky_mou_perf
|
ps.setString(idx++, e.getPortPrtcllDesc());
|
||||||
pstmt.setString(idx++, entity.getNtnltyUscgMouPerf()); // 19. ntnlty_uscg_mou_perf
|
ps.setObject(idx++, e.getPortRisk(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getUscgExclShipCert()); // 20. uscg_excl_ship_cert
|
ps.setString(idx++, e.getPortRiskDesc());
|
||||||
pstmt.setString(idx++, entity.getPscInspectionElpsHr()); // 21. psc_inspection_elps_hr
|
ps.setObject(idx++, e.getStsJob(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getPscInspection()); // 22. psc_inspection
|
ps.setString(idx++, e.getStsJobDesc());
|
||||||
pstmt.setString(idx++, entity.getPscDefect()); // 23. psc_defect
|
ps.setObject(idx++, e.getDriftChg(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getPscDetained()); // 24. psc_detained
|
ps.setString(idx++, e.getDriftChgDesc());
|
||||||
pstmt.setString(idx++, entity.getNowSmgrcEvdc()); // 25. now_smgrc_evdc
|
ps.setObject(idx++, e.getRiskEvent(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getDoccChg()); // 26. docc_chg
|
ps.setString(idx++, e.getRiskEventDesc());
|
||||||
pstmt.setString(idx++, entity.getNowClfic()); // 27. now_clfic
|
ps.setString(idx++, e.getRiskEventDescExt());
|
||||||
pstmt.setString(idx++, entity.getClficStatusChg()); // 28. clfic_status_chg
|
ps.setObject(idx++, e.getNtnltyChg(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getPniInsrnc()); // 29. pni_insrnc
|
ps.setString(idx++, e.getNtnltyChgDesc());
|
||||||
pstmt.setString(idx++, entity.getShipNmChg()); // 30. ship_nm_chg
|
ps.setObject(idx++, e.getNtnltyPrsMouPerf(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getGboChg()); // 31. gbo_chg
|
ps.setString(idx++, e.getNtnltyPrsMouPerfDesc());
|
||||||
pstmt.setString(idx++, entity.getVslage()); // 32. vslage
|
ps.setObject(idx++, e.getNtnltyTkyMouPerf(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getIlglFshrViol()); // 33. ilgl_fshr_viol
|
ps.setString(idx++, e.getNtnltyTkyMouPerfDesc());
|
||||||
pstmt.setString(idx++, entity.getDraftChg()); // 34. draft_chg
|
ps.setObject(idx++, e.getNtnltyUscgMouPerf(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getRecentSanctionPrtcll()); // 35. recent_sanction_prtcll
|
ps.setString(idx++, e.getNtnltyUscgMouPerfDesc());
|
||||||
pstmt.setString(idx++, entity.getSnglShipVoy()); // 36. sngl_ship_voy
|
ps.setObject(idx++, e.getUscgExclShipCert(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getFltsfty()); // 37. fltsfty
|
ps.setString(idx++, e.getUscgExclShipCertDesc());
|
||||||
pstmt.setString(idx++, entity.getFltPsc()); // 38. flt_psc
|
ps.setObject(idx++, e.getPscInspectionElpsHr(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getSpcInspectionOvdue()); // 39. spc_inspection_ovdue
|
ps.setString(idx++, e.getPscInspectionElpsHrDesc());
|
||||||
pstmt.setString(idx++, entity.getOwnrUnk()); // 40. ownr_unk
|
ps.setObject(idx++, e.getPscInspection(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getRssPortCall()); // 41. rss_port_call
|
ps.setString(idx++, e.getPscInspectionDesc());
|
||||||
pstmt.setString(idx++, entity.getRssOwnrReg()); // 42. rss_ownr_reg
|
ps.setObject(idx++, e.getPscDefect(), Types.INTEGER);
|
||||||
pstmt.setString(idx++, entity.getRssSts()); // 43. rss_sts
|
ps.setString(idx++, e.getPscDefectDesc());
|
||||||
|
ps.setObject(idx++, e.getPscDetained(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getPscDetainedDesc());
|
||||||
|
ps.setObject(idx++, e.getNowSmgrcEvdc(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getNowSmgrcEvdcDesc());
|
||||||
|
ps.setObject(idx++, e.getDoccChg(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getDoccChgDesc());
|
||||||
|
ps.setObject(idx++, e.getNowClfic(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getNowClficDesc());
|
||||||
|
ps.setString(idx++, e.getNowClficDescExt());
|
||||||
|
ps.setObject(idx++, e.getClficStatusChg(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getClficStatusChgDesc());
|
||||||
|
ps.setObject(idx++, e.getPniInsrnc(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getPniInsrncDesc());
|
||||||
|
ps.setString(idx++, e.getPniInsrncDescExt());
|
||||||
|
ps.setObject(idx++, e.getShipNmChg(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getShipNmChgDesc());
|
||||||
|
ps.setObject(idx++, e.getGboChg(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getGboChgDesc());
|
||||||
|
ps.setObject(idx++, e.getVslage(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getVslageDesc());
|
||||||
|
ps.setObject(idx++, e.getIlglFshrViol(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getIlglFshrViolDesc());
|
||||||
|
ps.setObject(idx++, e.getDraftChg(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getDraftChgDesc());
|
||||||
|
ps.setObject(idx++, e.getRecentSanctionPrtcll(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getRecentSanctionPrtcllDesc());
|
||||||
|
ps.setObject(idx++, e.getSnglShipVoy(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getSnglShipVoyDesc());
|
||||||
|
ps.setObject(idx++, e.getFltsfty(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getFltsftyDesc());
|
||||||
|
ps.setObject(idx++, e.getFltPsc(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getFltPscDesc());
|
||||||
|
ps.setObject(idx++, e.getSpcInspectionOvdue(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getSpcInspectionOvdueDesc());
|
||||||
|
ps.setObject(idx++, e.getOwnrUnk(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getOwnrUnkDesc());
|
||||||
|
ps.setObject(idx++, e.getRssPortCall(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getRssPortCallDesc());
|
||||||
|
ps.setObject(idx++, e.getRssOwnrReg(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getRssOwnrRegDesc());
|
||||||
|
ps.setObject(idx++, e.getRssSts(), Types.INTEGER);
|
||||||
|
ps.setString(idx++, e.getRssStsDesc());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,85 +6,135 @@ import org.springframework.stereotype.Component;
|
|||||||
@Component
|
@Component
|
||||||
public class RiskSql {
|
public class RiskSql {
|
||||||
private static String TARGET_SCHEMA;
|
private static String TARGET_SCHEMA;
|
||||||
|
|
||||||
public RiskSql(@Value("${app.batch.target-schema.name}") String targetSchema) {
|
public RiskSql(@Value("${app.batch.target-schema.name}") String targetSchema) {
|
||||||
TARGET_SCHEMA = targetSchema;
|
TARGET_SCHEMA = targetSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getRiskUpsertSql(String targetTable, String targetIndex) {
|
/**
|
||||||
|
* Risk Detail UPSERT SQL (tb_ship_risk_detail_info, tb_ship_risk_detail_hstry 공통)
|
||||||
|
*/
|
||||||
|
public static String getRiskDetailUpsertSql(String targetTable, String targetIndex) {
|
||||||
return """
|
return """
|
||||||
INSERT INTO %s.%s (
|
INSERT INTO %s.%s (
|
||||||
crt_dt, creatr_id,
|
crt_dt, creatr_id,
|
||||||
imo_no, last_mdfcn_dt, risk_data_maint, ais_notrcv_elps_days,
|
imo_no, last_mdfcn_dt, risk_data_maint,
|
||||||
ais_lwrnk_days, ais_up_imo_desc, othr_ship_nm_voy_yn, mmsi_anom_message,
|
ais_notrcv_elps_days, ais_notrcv_elps_days_desc,
|
||||||
recent_dark_actv, port_prtcll, port_risk, sts_job,
|
ais_lwrnk_days, ais_lwrnk_days_desc,
|
||||||
drift_chg, risk_event, ntnlty_chg, ntnlty_prs_mou_perf,
|
ais_up_imo_desc, ais_up_imo_desc_val,
|
||||||
ntnlty_tky_mou_perf, ntnlty_uscg_mou_perf, uscg_excl_ship_cert, psc_inspection_elps_hr,
|
othr_ship_nm_voy_yn, othr_ship_nm_voy_yn_desc,
|
||||||
psc_inspection, psc_defect, psc_detained, now_smgrc_evdc,
|
mmsi_anom_message, mmsi_anom_message_desc,
|
||||||
docc_chg, now_clfic, clfic_status_chg, pni_insrnc,
|
recent_dark_actv, recent_dark_actv_desc,
|
||||||
ship_nm_chg, gbo_chg, vslage, ilgl_fshr_viol,
|
port_prtcll, port_prtcll_desc,
|
||||||
draft_chg, recent_sanction_prtcll, sngl_ship_voy, fltsfty,
|
port_risk, port_risk_desc,
|
||||||
flt_psc, spc_inspection_ovdue, ownr_unk, rss_port_call,
|
sts_job, sts_job_desc,
|
||||||
rss_ownr_reg, rss_sts
|
drift_chg, drift_chg_desc,
|
||||||
|
risk_event, risk_event_desc, risk_event_desc_ext,
|
||||||
|
ntnlty_chg, ntnlty_chg_desc,
|
||||||
|
ntnlty_prs_mou_perf, ntnlty_prs_mou_perf_desc,
|
||||||
|
ntnlty_tky_mou_perf, ntnlty_tky_mou_perf_desc,
|
||||||
|
ntnlty_uscg_mou_perf, ntnlty_uscg_mou_perf_desc,
|
||||||
|
uscg_excl_ship_cert, uscg_excl_ship_cert_desc,
|
||||||
|
psc_inspection_elps_hr, psc_inspection_elps_hr_desc,
|
||||||
|
psc_inspection, psc_inspection_desc,
|
||||||
|
psc_defect, psc_defect_desc,
|
||||||
|
psc_detained, psc_detained_desc,
|
||||||
|
now_smgrc_evdc, now_smgrc_evdc_desc,
|
||||||
|
docc_chg, docc_chg_desc,
|
||||||
|
now_clfic, now_clfic_desc, now_clfic_desc_ext,
|
||||||
|
clfic_status_chg, clfic_status_chg_desc,
|
||||||
|
pni_insrnc, pni_insrnc_desc, pni_insrnc_desc_ext,
|
||||||
|
ship_nm_chg, ship_nm_chg_desc,
|
||||||
|
gbo_chg, gbo_chg_desc,
|
||||||
|
vslage, vslage_desc,
|
||||||
|
ilgl_fshr_viol, ilgl_fshr_viol_desc,
|
||||||
|
draft_chg, draft_chg_desc,
|
||||||
|
recent_sanction_prtcll, recent_sanction_prtcll_desc,
|
||||||
|
sngl_ship_voy, sngl_ship_voy_desc,
|
||||||
|
fltsfty, fltsfty_desc,
|
||||||
|
flt_psc, flt_psc_desc,
|
||||||
|
spc_inspection_ovdue, spc_inspection_ovdue_desc,
|
||||||
|
ownr_unk, ownr_unk_desc,
|
||||||
|
rss_port_call, rss_port_call_desc,
|
||||||
|
rss_ownr_reg, rss_ownr_reg_desc,
|
||||||
|
rss_sts, rss_sts_desc
|
||||||
)
|
)
|
||||||
VALUES (
|
VALUES (
|
||||||
CURRENT_TIMESTAMP, ?,
|
CURRENT_TIMESTAMP, ?,
|
||||||
?, ?, ?, ?,
|
?, ?, ?,
|
||||||
?, ?, ?, ?,
|
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||||
?, ?, ?, ?,
|
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||||
?, ?, ?, ?,
|
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||||
?, ?, ?, ?,
|
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||||
?, ?, ?, ?,
|
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||||
?, ?, ?, ?,
|
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||||
?, ?, ?, ?,
|
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||||
?, ?, ?, ?,
|
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||||
?, ?, ?, ?,
|
?
|
||||||
?, ?
|
|
||||||
)
|
)
|
||||||
ON CONFLICT (%s)
|
ON CONFLICT (%s)
|
||||||
DO UPDATE SET
|
DO UPDATE SET
|
||||||
mdfcn_dt = CURRENT_TIMESTAMP,
|
mdfcn_dt = CURRENT_TIMESTAMP, mdfr_id = 'SYSTEM',
|
||||||
mdfr_id = 'SYSTEM',
|
last_mdfcn_dt = EXCLUDED.last_mdfcn_dt, risk_data_maint = EXCLUDED.risk_data_maint,
|
||||||
last_mdfcn_dt = EXCLUDED.last_mdfcn_dt,
|
ais_notrcv_elps_days = EXCLUDED.ais_notrcv_elps_days, ais_notrcv_elps_days_desc = EXCLUDED.ais_notrcv_elps_days_desc,
|
||||||
risk_data_maint = EXCLUDED.risk_data_maint,
|
ais_lwrnk_days = EXCLUDED.ais_lwrnk_days, ais_lwrnk_days_desc = EXCLUDED.ais_lwrnk_days_desc,
|
||||||
ais_notrcv_elps_days = EXCLUDED.ais_notrcv_elps_days,
|
ais_up_imo_desc = EXCLUDED.ais_up_imo_desc, ais_up_imo_desc_val = EXCLUDED.ais_up_imo_desc_val,
|
||||||
ais_lwrnk_days = EXCLUDED.ais_lwrnk_days,
|
othr_ship_nm_voy_yn = EXCLUDED.othr_ship_nm_voy_yn, othr_ship_nm_voy_yn_desc = EXCLUDED.othr_ship_nm_voy_yn_desc,
|
||||||
ais_up_imo_desc = EXCLUDED.ais_up_imo_desc,
|
mmsi_anom_message = EXCLUDED.mmsi_anom_message, mmsi_anom_message_desc = EXCLUDED.mmsi_anom_message_desc,
|
||||||
othr_ship_nm_voy_yn = EXCLUDED.othr_ship_nm_voy_yn,
|
recent_dark_actv = EXCLUDED.recent_dark_actv, recent_dark_actv_desc = EXCLUDED.recent_dark_actv_desc,
|
||||||
mmsi_anom_message = EXCLUDED.mmsi_anom_message,
|
port_prtcll = EXCLUDED.port_prtcll, port_prtcll_desc = EXCLUDED.port_prtcll_desc,
|
||||||
recent_dark_actv = EXCLUDED.recent_dark_actv,
|
port_risk = EXCLUDED.port_risk, port_risk_desc = EXCLUDED.port_risk_desc,
|
||||||
port_prtcll = EXCLUDED.port_prtcll,
|
sts_job = EXCLUDED.sts_job, sts_job_desc = EXCLUDED.sts_job_desc,
|
||||||
port_risk = EXCLUDED.port_risk,
|
drift_chg = EXCLUDED.drift_chg, drift_chg_desc = EXCLUDED.drift_chg_desc,
|
||||||
sts_job = EXCLUDED.sts_job,
|
risk_event = EXCLUDED.risk_event, risk_event_desc = EXCLUDED.risk_event_desc, risk_event_desc_ext = EXCLUDED.risk_event_desc_ext,
|
||||||
drift_chg = EXCLUDED.drift_chg,
|
ntnlty_chg = EXCLUDED.ntnlty_chg, ntnlty_chg_desc = EXCLUDED.ntnlty_chg_desc,
|
||||||
risk_event = EXCLUDED.risk_event,
|
ntnlty_prs_mou_perf = EXCLUDED.ntnlty_prs_mou_perf, ntnlty_prs_mou_perf_desc = EXCLUDED.ntnlty_prs_mou_perf_desc,
|
||||||
ntnlty_chg = EXCLUDED.ntnlty_chg,
|
ntnlty_tky_mou_perf = EXCLUDED.ntnlty_tky_mou_perf, ntnlty_tky_mou_perf_desc = EXCLUDED.ntnlty_tky_mou_perf_desc,
|
||||||
ntnlty_prs_mou_perf = EXCLUDED.ntnlty_prs_mou_perf,
|
ntnlty_uscg_mou_perf = EXCLUDED.ntnlty_uscg_mou_perf, ntnlty_uscg_mou_perf_desc = EXCLUDED.ntnlty_uscg_mou_perf_desc,
|
||||||
ntnlty_tky_mou_perf = EXCLUDED.ntnlty_tky_mou_perf,
|
uscg_excl_ship_cert = EXCLUDED.uscg_excl_ship_cert, uscg_excl_ship_cert_desc = EXCLUDED.uscg_excl_ship_cert_desc,
|
||||||
ntnlty_uscg_mou_perf = EXCLUDED.ntnlty_uscg_mou_perf,
|
psc_inspection_elps_hr = EXCLUDED.psc_inspection_elps_hr, psc_inspection_elps_hr_desc = EXCLUDED.psc_inspection_elps_hr_desc,
|
||||||
uscg_excl_ship_cert = EXCLUDED.uscg_excl_ship_cert,
|
psc_inspection = EXCLUDED.psc_inspection, psc_inspection_desc = EXCLUDED.psc_inspection_desc,
|
||||||
psc_inspection_elps_hr = EXCLUDED.psc_inspection_elps_hr,
|
psc_defect = EXCLUDED.psc_defect, psc_defect_desc = EXCLUDED.psc_defect_desc,
|
||||||
psc_inspection = EXCLUDED.psc_inspection,
|
psc_detained = EXCLUDED.psc_detained, psc_detained_desc = EXCLUDED.psc_detained_desc,
|
||||||
psc_defect = EXCLUDED.psc_defect,
|
now_smgrc_evdc = EXCLUDED.now_smgrc_evdc, now_smgrc_evdc_desc = EXCLUDED.now_smgrc_evdc_desc,
|
||||||
psc_detained = EXCLUDED.psc_detained,
|
docc_chg = EXCLUDED.docc_chg, docc_chg_desc = EXCLUDED.docc_chg_desc,
|
||||||
now_smgrc_evdc = EXCLUDED.now_smgrc_evdc,
|
now_clfic = EXCLUDED.now_clfic, now_clfic_desc = EXCLUDED.now_clfic_desc, now_clfic_desc_ext = EXCLUDED.now_clfic_desc_ext,
|
||||||
docc_chg = EXCLUDED.docc_chg,
|
clfic_status_chg = EXCLUDED.clfic_status_chg, clfic_status_chg_desc = EXCLUDED.clfic_status_chg_desc,
|
||||||
now_clfic = EXCLUDED.now_clfic,
|
pni_insrnc = EXCLUDED.pni_insrnc, pni_insrnc_desc = EXCLUDED.pni_insrnc_desc, pni_insrnc_desc_ext = EXCLUDED.pni_insrnc_desc_ext,
|
||||||
clfic_status_chg = EXCLUDED.clfic_status_chg,
|
ship_nm_chg = EXCLUDED.ship_nm_chg, ship_nm_chg_desc = EXCLUDED.ship_nm_chg_desc,
|
||||||
pni_insrnc = EXCLUDED.pni_insrnc,
|
gbo_chg = EXCLUDED.gbo_chg, gbo_chg_desc = EXCLUDED.gbo_chg_desc,
|
||||||
ship_nm_chg = EXCLUDED.ship_nm_chg,
|
vslage = EXCLUDED.vslage, vslage_desc = EXCLUDED.vslage_desc,
|
||||||
gbo_chg = EXCLUDED.gbo_chg,
|
ilgl_fshr_viol = EXCLUDED.ilgl_fshr_viol, ilgl_fshr_viol_desc = EXCLUDED.ilgl_fshr_viol_desc,
|
||||||
vslage = EXCLUDED.vslage,
|
draft_chg = EXCLUDED.draft_chg, draft_chg_desc = EXCLUDED.draft_chg_desc,
|
||||||
ilgl_fshr_viol = EXCLUDED.ilgl_fshr_viol,
|
recent_sanction_prtcll = EXCLUDED.recent_sanction_prtcll, recent_sanction_prtcll_desc = EXCLUDED.recent_sanction_prtcll_desc,
|
||||||
draft_chg = EXCLUDED.draft_chg,
|
sngl_ship_voy = EXCLUDED.sngl_ship_voy, sngl_ship_voy_desc = EXCLUDED.sngl_ship_voy_desc,
|
||||||
recent_sanction_prtcll = EXCLUDED.recent_sanction_prtcll,
|
fltsfty = EXCLUDED.fltsfty, fltsfty_desc = EXCLUDED.fltsfty_desc,
|
||||||
sngl_ship_voy = EXCLUDED.sngl_ship_voy,
|
flt_psc = EXCLUDED.flt_psc, flt_psc_desc = EXCLUDED.flt_psc_desc,
|
||||||
fltsfty = EXCLUDED.fltsfty,
|
spc_inspection_ovdue = EXCLUDED.spc_inspection_ovdue, spc_inspection_ovdue_desc = EXCLUDED.spc_inspection_ovdue_desc,
|
||||||
flt_psc = EXCLUDED.flt_psc,
|
ownr_unk = EXCLUDED.ownr_unk, ownr_unk_desc = EXCLUDED.ownr_unk_desc,
|
||||||
spc_inspection_ovdue = EXCLUDED.spc_inspection_ovdue,
|
rss_port_call = EXCLUDED.rss_port_call, rss_port_call_desc = EXCLUDED.rss_port_call_desc,
|
||||||
ownr_unk = EXCLUDED.ownr_unk,
|
rss_ownr_reg = EXCLUDED.rss_ownr_reg, rss_ownr_reg_desc = EXCLUDED.rss_ownr_reg_desc,
|
||||||
rss_port_call = EXCLUDED.rss_port_call,
|
rss_sts = EXCLUDED.rss_sts, rss_sts_desc = EXCLUDED.rss_sts_desc;
|
||||||
rss_ownr_reg = EXCLUDED.rss_ownr_reg,
|
|
||||||
rss_sts = EXCLUDED.rss_sts;
|
|
||||||
""".formatted(TARGET_SCHEMA, targetTable, targetIndex);
|
""".formatted(TARGET_SCHEMA, targetTable, targetIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 값 변경 이력 INSERT SQL (tb_ship_risk_detail_info_hstry)
|
||||||
|
*/
|
||||||
|
public static String getChangeHistoryInsertSql(String targetTable) {
|
||||||
|
return """
|
||||||
|
INSERT INTO %s.%s (crt_dt, creatr_id, imo_no, last_mdfcn_dt, flctn_col_nm, bfr_val, aftr_val)
|
||||||
|
VALUES (CURRENT_TIMESTAMP, 'SYSTEM', ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT (imo_no, last_mdfcn_dt, flctn_col_nm) DO NOTHING
|
||||||
|
""".formatted(TARGET_SCHEMA, targetTable);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 기존 데이터 조회 SQL (변경 비교용)
|
||||||
|
*/
|
||||||
|
public static String getSelectCurrentSql(String targetTable) {
|
||||||
|
return """
|
||||||
|
SELECT * FROM %s.%s WHERE imo_no = ?
|
||||||
|
""".formatted(TARGET_SCHEMA, targetTable);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,23 @@
|
|||||||
|
package com.snp.batch.jobs.datasync.batch.risk.writer;
|
||||||
|
|
||||||
|
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
|
||||||
|
import com.snp.batch.jobs.datasync.batch.risk.repository.RiskRepository;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.batch.item.Chunk;
|
||||||
|
import org.springframework.batch.item.ItemWriter;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class RiskChangeWriter implements ItemWriter<RiskChangeEntity> {
|
||||||
|
|
||||||
|
private final RiskRepository riskRepository;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(Chunk<? extends RiskChangeEntity> chunk) throws Exception {
|
||||||
|
if (chunk.isEmpty()) return;
|
||||||
|
riskRepository.saveRiskDetailChangeHistory(new ArrayList<>(chunk.getItems()));
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -19,10 +19,8 @@ public class RiskWriter extends BaseChunkedWriter<RiskEntity> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void writeItems(List<RiskEntity> items) throws Exception {
|
protected void writeItems(List<RiskEntity> items) throws Exception {
|
||||||
if (items.isEmpty()) {
|
if (items.isEmpty()) return;
|
||||||
return;
|
riskRepository.saveRiskDetail(items); // 1. 최신 데이터 UPSERT
|
||||||
}
|
riskRepository.saveRiskDetailHistory(items); // 2. 스냅샷 이력 UPSERT
|
||||||
riskRepository.saveRisk(items);
|
|
||||||
riskRepository.saveRiskHistory(items);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -152,7 +152,7 @@ app:
|
|||||||
movements-008: tb_ship_trnst_hstry
|
movements-008: tb_ship_trnst_hstry
|
||||||
code-001: tb_ship_type_cd
|
code-001: tb_ship_type_cd
|
||||||
code-002: tb_ship_country_cd
|
code-002: tb_ship_country_cd
|
||||||
risk-compliance-001: tb_ship_risk_info
|
risk-compliance-001: tb_ship_risk_detail_info
|
||||||
risk-compliance-003: tb_ship_compliance_info
|
risk-compliance-003: tb_ship_compliance_info
|
||||||
risk-compliance-006: tb_company_compliance_info
|
risk-compliance-006: tb_company_compliance_info
|
||||||
target-schema:
|
target-schema:
|
||||||
@ -202,8 +202,9 @@ app:
|
|||||||
movements-008: tb_ship_trnst_hstry
|
movements-008: tb_ship_trnst_hstry
|
||||||
code-001: tb_ship_type_cd
|
code-001: tb_ship_type_cd
|
||||||
code-002: tb_ship_country_cd
|
code-002: tb_ship_country_cd
|
||||||
risk-compliance-001: tb_ship_risk_info
|
risk-compliance-001: tb_ship_risk_detail_info
|
||||||
risk-compliance-002: tb_ship_risk_hstry
|
risk-compliance-001-1: tb_ship_risk_detail_info_hstry
|
||||||
|
risk-compliance-002: tb_ship_risk_detail_hstry
|
||||||
risk-compliance-003: tb_ship_compliance_info
|
risk-compliance-003: tb_ship_compliance_info
|
||||||
risk-compliance-004: tb_ship_compliance_hstry
|
risk-compliance-004: tb_ship_compliance_hstry
|
||||||
risk-compliance-005: tb_ship_compliance_info_hstry
|
risk-compliance-005: tb_ship_compliance_info_hstry
|
||||||
|
|||||||
불러오는 중...
Reference in New Issue
Block a user