release: 2026-03-26 (3건 커밋) #23
199
README.md
Normal file
199
README.md
Normal file
@ -0,0 +1,199 @@
|
||||
SNP Sync Batch
|
||||
|
||||
S&P Global 해양 데이터를 수집하여 PostgreSQL에 동기화하는 Spring Batch 시스템입니다.
|
||||
|
||||
## 기술 스택
|
||||
|
||||
| 구분 | 기술 |
|
||||
|------|------|
|
||||
| Language | Java 17 |
|
||||
| Framework | Spring Boot 3.2.1, Spring Batch 5.1.0 |
|
||||
| Scheduler | Quartz 2.5.0 (JDBC Store) |
|
||||
| Database | PostgreSQL (dual datasource) |
|
||||
| Cache | Caffeine |
|
||||
| Frontend | React + TypeScript (Vite) |
|
||||
| Build | Maven (frontend-maven-plugin 통합) |
|
||||
| CI/CD | Gitea Actions → systemd 자동 배포 |
|
||||
|
||||
## 아키텍처
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────┐
|
||||
│ SNP Sync Batch │
|
||||
│ │
|
||||
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
|
||||
│ │ Quartz │───►│ Reader │───►│ Processor │───►│ Writer │ │
|
||||
│ │ Scheduler │ │(BaseSyncR)│ │(DTO→Entity)│ │(SubChunk) │ │
|
||||
│ └──────────┘ └────┬─────┘ └──────────┘ └────┬─────┘ │
|
||||
│ │ │ │
|
||||
│ ┌────────▼────────┐ ┌───────▼───────┐ │
|
||||
│ │ Source DB │ │ Target DB │ │
|
||||
│ │ (std_snp_data) │ │ (std_snp_svc) │ │
|
||||
│ │ batch_flag 관리 │ │ UPSERT 저장 │ │
|
||||
│ └─────────────────┘ └───────────────┘ │
|
||||
│ │
|
||||
│ ┌──────────────────────────────────────────────────────────┐ │
|
||||
│ │ React Frontend (포트 8051, /snp-sync) │ │
|
||||
│ │ 대시보드 │ 동기화현황 │ 실행이력 │ 스케줄 │ 타임라인 │ │
|
||||
│ └──────────────────────────────────────────────────────────┘ │
|
||||
└─────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## 데이터베이스 구조
|
||||
|
||||
### Dual Datasource
|
||||
|
||||
| Datasource | 스키마 | 용도 |
|
||||
|------------|--------|------|
|
||||
| batchDataSource (Primary) | snp_batch | Spring Batch 메타데이터, Quartz 스케줄 |
|
||||
| businessDataSource | std_snp_data / std_snp_svc | 비즈니스 데이터 (소스/타겟) |
|
||||
|
||||
### 동기화 대상 테이블
|
||||
|
||||
| 도메인 | 소스 테이블 수 | 주요 내용 |
|
||||
|--------|--------------|----------|
|
||||
| Ship | 25개 | 선박 제원, 이력, 관계 |
|
||||
| Company | 1개 | 회사 상세 정보 |
|
||||
| Event | 4개 | 해양 사건/사고 |
|
||||
| Facility | 1개 | 항구 시설 |
|
||||
| PSC | 3개 | PSC 검사 |
|
||||
| Movements | 8개 | 선박 이동 (항적) |
|
||||
| Code | 2개 | 코드 (선종, 국적) |
|
||||
| Risk | 1개 | 위험 지표 (+ 값 변경 이력) |
|
||||
| Compliance | 2개 | 컴플라이언스 (+ 값 변경 이력) |
|
||||
|
||||
## 동기화 프로세스
|
||||
|
||||
### batch_flag 상태 흐름
|
||||
|
||||
```
|
||||
[N] 대기 ──── Reader ────► [P] 진행 ──── Writer 성공 ────► [S] 완료
|
||||
│ │
|
||||
│ Writer 실패 시
|
||||
│ P 상태 고착 (수동 리셋 필요)
|
||||
│
|
||||
batch_job_execution.status = 'COMPLETED' 인 데이터만 대상
|
||||
```
|
||||
|
||||
### Reader → Writer 흐름
|
||||
|
||||
```
|
||||
BaseSyncReader (while 루프)
|
||||
│
|
||||
├─ fetchNextGroup(11284): MIN(batch_flag='N') → 데이터 로드 → N→P
|
||||
├─ fetchNextGroup(11286): 다음 그룹 연속 로드 → N→P
|
||||
├─ fetchNextGroup(11317): 다음 그룹 연속 로드 → N→P
|
||||
└─ fetchNextGroup(): 데이터 없음 → return null → Step 종료
|
||||
│
|
||||
▼
|
||||
BaseChunkedWriter (sub-chunk 5000건 단위)
|
||||
│
|
||||
├─ Sub-Chunk 1: [1~5000건] → 독립 트랜잭션 커밋
|
||||
├─ Sub-Chunk 2: [5001~10000건] → 독립 트랜잭션 커밋
|
||||
└─ ...
|
||||
│
|
||||
▼
|
||||
BatchWriteListener.afterWrite()
|
||||
└─ 청크 내 모든 고유 job_execution_id → P→S 업데이트
|
||||
```
|
||||
|
||||
### 값 변경 이력 관리 (Risk, Compliance)
|
||||
|
||||
데이터 동기화 Job과 값 변경 이력 Job이 분리되어 운영됩니다:
|
||||
|
||||
```
|
||||
riskDataSyncJob (데이터 동기화)
|
||||
Source → Reader → Writer
|
||||
├─ UPSERT → tb_ship_risk_detail_info (최신 데이터)
|
||||
└─ UPSERT → tb_ship_risk_detail_hstry (스냅샷 이력)
|
||||
|
||||
riskDetailChangeDataSyncJob (값 변경 이력)
|
||||
tb_ship_risk_detail_hstry (스냅샷)
|
||||
→ imo_no별 시계열 비교
|
||||
→ indicator 컬럼 변경분 감지
|
||||
→ INSERT → tb_ship_risk_detail_info_hstry (컬럼명, 이전값, 이후값)
|
||||
```
|
||||
|
||||
## 배치 Job 목록
|
||||
|
||||
### 데이터 동기화 Job
|
||||
|
||||
| Job 이름 | 도메인 | 설명 |
|
||||
|----------|--------|------|
|
||||
| snpMdaDataSyncJob | Ship + Company | 선박/회사 데이터 동기화 (26 Step) |
|
||||
| eventDataSyncJob | Event | 사건 데이터 동기화 (4 Step) |
|
||||
| facilitySyncJob | Facility | 항구 시설 동기화 |
|
||||
| pscDataSyncJob | PSC | PSC 검사 동기화 (3 Step) |
|
||||
| anchorageCallDataSyncJob | Movements | 정박지 기항 |
|
||||
| berthCallDataSyncJob | Movements | 부두 기항 |
|
||||
| currentlyAtDataSyncJob | Movements | 현재 위치 |
|
||||
| destinationDataSyncJob | Movements | 목적지 |
|
||||
| portCallDataSyncJob | Movements | 항구 기항 |
|
||||
| stsOperationDataSyncJob | Movements | STS 작업 |
|
||||
| terminalCallDataSyncJob | Movements | 터미널 기항 |
|
||||
| transitDataSyncJob | Movements | 통과 |
|
||||
| codeDataSyncJob | Code | 코드 동기화 (2 Step) |
|
||||
| riskDataSyncJob | Risk | 위험 지표 동기화 |
|
||||
| shipComplianceDataSyncJob | Compliance | 선박 컴플라이언스 |
|
||||
| companyComplianceDataSyncJob | Compliance | 회사 컴플라이언스 |
|
||||
|
||||
### 값 변경 이력 Job
|
||||
|
||||
| Job 이름 | 설명 |
|
||||
|----------|------|
|
||||
| riskDetailChangeDataSyncJob | Risk indicator 값 변경 이력 |
|
||||
| shipComplianceChangeDataSyncJob | 선박 컴플라이언스 값 변경 이력 |
|
||||
| companyComplianceChangeDataSyncJob | 회사 컴플라이언스 값 변경 이력 |
|
||||
|
||||
### 유지보수 Job
|
||||
|
||||
| Job 이름 | 설명 |
|
||||
|----------|------|
|
||||
| batchLogCleanupJob | 배치 로그 정리 (90일 보관) |
|
||||
|
||||
## 프론트엔드
|
||||
|
||||
| 메뉴 | 경로 | 설명 |
|
||||
|------|------|------|
|
||||
| 대시보드 | `/` | 통계, 실행 중 작업, 최근 실행/실패 |
|
||||
| 동기화 현황 | `/sync-status` | 테이블별 N/P/S 건수, 데이터 미리보기, P 리셋 |
|
||||
| 실행 이력 | `/executions` | 배치 실행 검색, 필터, 상세 |
|
||||
| 작업 | `/jobs` | Job 목록, 수동 실행 |
|
||||
| 스케줄 | `/schedules` | Quartz 스케줄 CRUD, Cron 미리보기 |
|
||||
| 타임라인 | `/schedule-timeline` | 일/주/월별 실행 시각화 |
|
||||
|
||||
## CI/CD
|
||||
|
||||
```
|
||||
develop → main 머지
|
||||
→ Gitea Actions (빌드 + JAR 배포)
|
||||
→ systemd path watcher (.deploy-trigger 감지)
|
||||
→ restart.sh → 서비스 재시작
|
||||
```
|
||||
|
||||
### 배포 확인
|
||||
|
||||
```bash
|
||||
# 배포 로그
|
||||
cat /devdata/services/snp-sync-batch/backend/deploy.log
|
||||
|
||||
# 서비스 상태
|
||||
systemctl status snp-sync-batch
|
||||
|
||||
# 실시간 로그
|
||||
journalctl -u snp-sync-batch -n 100 -f
|
||||
```
|
||||
|
||||
## 빌드 및 실행
|
||||
|
||||
```bash
|
||||
# 개발 환경 실행
|
||||
mvn spring-boot:run
|
||||
|
||||
# 패키징
|
||||
mvn clean package -DskipTests
|
||||
|
||||
# 서버 포트: 8051
|
||||
# Context Path: /snp-sync
|
||||
# Swagger: /snp-sync/swagger-ui.html
|
||||
```
|
||||
@ -4,6 +4,18 @@
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [2026-03-26]
|
||||
|
||||
### 추가
|
||||
- Risk 데이터 동기화 대상 변경: tb_ship_risk_detail_info + 값 변경 이력 관리 (#3)
|
||||
- riskDetailChangeDataSyncJob: Risk indicator 값 변경 이력 Job 분리
|
||||
- 배치 로그 관리 정리 Job 구현: 90일 보관 기간 기준 자동 삭제 (#16)
|
||||
- README.md 프로젝트 문서 추가
|
||||
|
||||
### 변경
|
||||
- BaseSyncReader: while 루프 연속 그룹 로드 방식으로 변경 (multi-chunk Step 종료 문제 해결)
|
||||
- BatchWriteListener: 청크 내 모든 고유 job_execution_id P→S 업데이트
|
||||
|
||||
## [2026-03-25]
|
||||
|
||||
### 추가
|
||||
|
||||
@ -30,7 +30,6 @@ public abstract class BaseSyncReader<T extends JobExecutionGroupable> implements
|
||||
protected final JdbcTemplate businessJdbcTemplate;
|
||||
|
||||
private List<T> allDataBuffer = new ArrayList<>();
|
||||
private Long currentGroupId = null;
|
||||
|
||||
protected BaseSyncReader(DataSource businessDataSource, TableMetaInfo tableMetaInfo) {
|
||||
this.businessJdbcTemplate = new JdbcTemplate(businessDataSource);
|
||||
@ -53,20 +52,12 @@ public abstract class BaseSyncReader<T extends JobExecutionGroupable> implements
|
||||
|
||||
@Override
|
||||
public T read() throws Exception {
|
||||
if (allDataBuffer.isEmpty()) {
|
||||
// 이전 그룹 처리 완료 → null 반환하여 청크 종료
|
||||
// (Writer + afterWrite(P→S)가 실행된 후 다음 청크에서 다음 그룹 로드)
|
||||
if (currentGroupId != null) {
|
||||
currentGroupId = null;
|
||||
return null;
|
||||
}
|
||||
|
||||
// 다음 그룹 로드
|
||||
// buffer가 비어있으면 다음 그룹 로드 (연속)
|
||||
while (allDataBuffer.isEmpty()) {
|
||||
fetchNextGroup();
|
||||
}
|
||||
|
||||
if (allDataBuffer.isEmpty()) {
|
||||
return null; // 더 이상 처리할 데이터 없음 → Step 종료
|
||||
if (allDataBuffer.isEmpty()) {
|
||||
return null; // 더 이상 처리할 데이터 없음 → Step 종료
|
||||
}
|
||||
}
|
||||
|
||||
return allDataBuffer.remove(0);
|
||||
@ -78,10 +69,14 @@ public abstract class BaseSyncReader<T extends JobExecutionGroupable> implements
|
||||
nextTargetId = businessJdbcTemplate.queryForObject(
|
||||
CommonSql.getNextTargetQuery(getSourceTable()), Long.class);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] 다음 처리 대상 조회 실패: {}", getLogPrefix(), e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
if (nextTargetId == null) return;
|
||||
if (nextTargetId == null) {
|
||||
log.debug("[{}] 더 이상 처리할 데이터 없음", getLogPrefix());
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("[{}] 다음 처리 대상 ID 발견: {}", getLogPrefix(), nextTargetId);
|
||||
|
||||
@ -92,7 +87,5 @@ public abstract class BaseSyncReader<T extends JobExecutionGroupable> implements
|
||||
// N→P 전환
|
||||
String updateSql = CommonSql.getProcessBatchQuery(getSourceTable());
|
||||
businessJdbcTemplate.update(updateSql, nextTargetId);
|
||||
|
||||
currentGroupId = nextTargetId;
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,10 +5,12 @@ import org.springframework.batch.core.ItemWriteListener;
|
||||
import org.springframework.batch.item.Chunk;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Writer 성공 후 batch_flag P→S 업데이트 리스너
|
||||
*
|
||||
* SQL은 실행 시점에 생성 (CommonSql.SOURCE_SCHEMA 초기화 보장)
|
||||
* 청크 내 모든 고유 job_execution_id에 대해 S 업데이트
|
||||
*/
|
||||
@Slf4j
|
||||
public class BatchWriteListener<S extends JobExecutionGroupable> implements ItemWriteListener<S> {
|
||||
@ -25,25 +27,31 @@ public class BatchWriteListener<S extends JobExecutionGroupable> implements Item
|
||||
public void afterWrite(Chunk<? extends S> items) {
|
||||
if (items.isEmpty()) return;
|
||||
|
||||
Long jobExecutionId = items.getItems().get(0).getJobExecutionId();
|
||||
String sql = CommonSql.getCompleteBatchQuery(sourceTable);
|
||||
|
||||
try {
|
||||
// SQL을 실행 시점에 생성하여 SOURCE_SCHEMA null 문제 방지
|
||||
String sql = CommonSql.getCompleteBatchQuery(sourceTable);
|
||||
int updatedRows = businessJdbcTemplate.update(sql, jobExecutionId);
|
||||
log.info("[BatchWriteListener] Success update 'S'. jobExecutionId: {}, rows: {}", jobExecutionId, updatedRows);
|
||||
} catch (Exception e) {
|
||||
log.error("[BatchWriteListener] Update 'S' failed. jobExecutionId: {}", jobExecutionId, e);
|
||||
throw e;
|
||||
Set<Long> jobExecutionIds = items.getItems().stream()
|
||||
.map(JobExecutionGroupable::getJobExecutionId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
for (Long jobExecutionId : jobExecutionIds) {
|
||||
try {
|
||||
int updatedRows = businessJdbcTemplate.update(sql, jobExecutionId);
|
||||
log.info("[BatchWriteListener] Success update 'S'. jobExecutionId: {}, rows: {}", jobExecutionId, updatedRows);
|
||||
} catch (Exception e) {
|
||||
log.error("[BatchWriteListener] Update 'S' failed. jobExecutionId: {}", jobExecutionId, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWriteError(Exception exception, Chunk<? extends S> items) {
|
||||
if (!items.isEmpty()) {
|
||||
Long jobExecutionId = items.getItems().get(0).getJobExecutionId();
|
||||
log.error("[BatchWriteListener] Write Error Detected! jobExecutionId: {}. Status will NOT be updated to 'S'. Error: {}",
|
||||
jobExecutionId, exception.getMessage());
|
||||
Set<Long> ids = items.getItems().stream()
|
||||
.map(JobExecutionGroupable::getJobExecutionId)
|
||||
.collect(Collectors.toSet());
|
||||
log.error("[BatchWriteListener] Write Error Detected! jobExecutionIds: {}. Status will NOT be updated to 'S'. Error: {}",
|
||||
ids, exception.getMessage());
|
||||
}
|
||||
|
||||
if (exception instanceof RuntimeException) {
|
||||
|
||||
@ -309,10 +309,13 @@ public class TableMetaInfo {
|
||||
|
||||
// Risk & Compliance Tables
|
||||
@Value("${app.batch.target-schema.tables.risk-compliance-001}")
|
||||
public String targetTbShipRiskInfo;
|
||||
public String targetTbShipRiskDetailInfo;
|
||||
|
||||
@Value("${app.batch.target-schema.tables.risk-compliance-001-1}")
|
||||
public String targetTbShipRiskDetailInfoHstry;
|
||||
|
||||
@Value("${app.batch.target-schema.tables.risk-compliance-002}")
|
||||
public String targetTbShipRiskHstry;
|
||||
public String targetTbShipRiskDetailHstry;
|
||||
|
||||
@Value("${app.batch.target-schema.tables.risk-compliance-003}")
|
||||
public String targetTbShipComplianceInfo;
|
||||
|
||||
@ -0,0 +1,114 @@
|
||||
package com.snp.batch.jobs.datasync.batch.risk.config;
|
||||
|
||||
import com.snp.batch.common.batch.config.BaseJobConfig;
|
||||
import com.snp.batch.common.util.TableMetaInfo;
|
||||
import com.snp.batch.jobs.datasync.batch.risk.dto.RiskChangeDto;
|
||||
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
|
||||
import com.snp.batch.jobs.datasync.batch.risk.processor.RiskChangeProcessor;
|
||||
import com.snp.batch.jobs.datasync.batch.risk.reader.RiskChangeReader;
|
||||
import com.snp.batch.jobs.datasync.batch.risk.repository.RiskRepository;
|
||||
import com.snp.batch.jobs.datasync.batch.risk.writer.RiskChangeWriter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||
import org.springframework.batch.core.repository.JobRepository;
|
||||
import org.springframework.batch.core.step.builder.StepBuilder;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.batch.item.ItemReader;
|
||||
import org.springframework.batch.item.ItemWriter;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
|
||||
/**
|
||||
* Risk 값 변경 이력 관리 Job
|
||||
* tb_ship_risk_detail_hstry (스냅샷)에서 시계열 비교 → tb_ship_risk_detail_info_hstry에 변경분 저장
|
||||
*/
|
||||
@Slf4j
|
||||
@Configuration
|
||||
public class RiskDetailChangeSyncJobConfig extends BaseJobConfig<RiskChangeDto, RiskChangeEntity> {
|
||||
|
||||
private final TableMetaInfo tableMetaInfo;
|
||||
private final RiskRepository riskRepository;
|
||||
private final DataSource batchDataSource;
|
||||
private final String targetSchema;
|
||||
|
||||
private static final int CHUNK_SIZE = 1000;
|
||||
|
||||
public RiskDetailChangeSyncJobConfig(
|
||||
JobRepository jobRepository,
|
||||
PlatformTransactionManager transactionManager,
|
||||
RiskRepository riskRepository,
|
||||
TableMetaInfo tableMetaInfo,
|
||||
@Qualifier("batchDataSource") DataSource batchDataSource,
|
||||
@Value("${app.batch.target-schema.name}") String targetSchema) {
|
||||
super(jobRepository, transactionManager);
|
||||
this.riskRepository = riskRepository;
|
||||
this.tableMetaInfo = tableMetaInfo;
|
||||
this.batchDataSource = batchDataSource;
|
||||
this.targetSchema = targetSchema;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getJobName() {
|
||||
return "riskDetailChangeDataSyncJob";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getStepName() {
|
||||
return "riskDetailChangeSyncStep";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ItemReader<RiskChangeDto> createReader() {
|
||||
return riskChangeReader(batchDataSource, tableMetaInfo, targetSchema);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ItemProcessor<RiskChangeDto, RiskChangeEntity> createProcessor() {
|
||||
return new RiskChangeProcessor();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ItemWriter<RiskChangeEntity> createWriter() {
|
||||
return new RiskChangeWriter(riskRepository);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public ItemReader<RiskChangeDto> riskChangeReader(
|
||||
@Qualifier("batchDataSource") DataSource batchDataSource,
|
||||
TableMetaInfo tableMetaInfo,
|
||||
@Value("${app.batch.target-schema.name}") String targetSchema) {
|
||||
return new RiskChangeReader(batchDataSource, tableMetaInfo, targetSchema);
|
||||
}
|
||||
|
||||
@Bean(name = "riskDetailChangeSyncStep")
|
||||
public Step riskDetailChangeSyncStep() {
|
||||
log.info("Step 생성: riskDetailChangeSyncStep");
|
||||
return new StepBuilder(getStepName(), jobRepository)
|
||||
.<RiskChangeDto, RiskChangeEntity>chunk(CHUNK_SIZE, transactionManager)
|
||||
.reader(createReader())
|
||||
.processor(createProcessor())
|
||||
.writer(createWriter())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Job createJobFlow(JobBuilder jobBuilder) {
|
||||
return jobBuilder
|
||||
.start(riskDetailChangeSyncStep())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean(name = "riskDetailChangeDataSyncJob")
|
||||
public Job riskDetailChangeDataSyncJob() {
|
||||
return job();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,25 @@
|
||||
package com.snp.batch.jobs.datasync.batch.risk.dto;
|
||||
|
||||
import com.snp.batch.common.util.JobExecutionGroupable;
|
||||
import lombok.*;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class RiskChangeDto implements JobExecutionGroupable {
|
||||
private Long jobExecutionId;
|
||||
private String imoNo;
|
||||
private LocalDateTime lastMdfcnDt;
|
||||
private String flctnColNm;
|
||||
private String bfrVal;
|
||||
private String aftrVal;
|
||||
|
||||
@Override
|
||||
public Long getJobExecutionId() {
|
||||
return this.jobExecutionId;
|
||||
}
|
||||
}
|
||||
@ -1,6 +1,8 @@
|
||||
package com.snp.batch.jobs.datasync.batch.risk.dto;
|
||||
|
||||
import com.snp.batch.common.util.JobExecutionGroupable;
|
||||
import lombok.*;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Getter
|
||||
@ -12,46 +14,88 @@ public class RiskDto implements JobExecutionGroupable {
|
||||
private Long jobExecutionId;
|
||||
private String imoNo;
|
||||
private LocalDateTime lastMdfcnDt;
|
||||
private String riskDataMaint;
|
||||
private String aisNotrcvElpsDays;
|
||||
private String aisLwrnkDays;
|
||||
private String aisUpImoDesc;
|
||||
private String othrShipNmVoyYn;
|
||||
private String mmsiAnomMessage;
|
||||
private String recentDarkActv;
|
||||
private String portPrtcll;
|
||||
private String portRisk;
|
||||
private String stsJob;
|
||||
private String driftChg;
|
||||
private String riskEvent;
|
||||
private String ntnltyChg;
|
||||
private String ntnltyPrsMouPerf;
|
||||
private String ntnltyTkyMouPerf;
|
||||
private String ntnltyUscgMouPerf;
|
||||
private String uscgExclShipCert;
|
||||
private String pscInspectionElpsHr;
|
||||
private String pscInspection;
|
||||
private String pscDefect;
|
||||
private String pscDetained;
|
||||
private String nowSmgrcEvdc;
|
||||
private String doccChg;
|
||||
private String nowClfic;
|
||||
private String clficStatusChg;
|
||||
private String pniInsrnc;
|
||||
private String shipNmChg;
|
||||
private String gboChg;
|
||||
private String vslage;
|
||||
private String ilglFshrViol;
|
||||
private String draftChg;
|
||||
private String recentSanctionPrtcll;
|
||||
private String snglShipVoy;
|
||||
private String fltsfty;
|
||||
private String fltPsc;
|
||||
private String spcInspectionOvdue;
|
||||
private String ownrUnk;
|
||||
private String rssPortCall;
|
||||
private String rssOwnrReg;
|
||||
private String rssSts;
|
||||
private Integer riskDataMaint;
|
||||
private Integer aisNotrcvElpsDays;
|
||||
private String aisNotrcvElpsDaysDesc;
|
||||
private Integer aisLwrnkDays;
|
||||
private String aisLwrnkDaysDesc;
|
||||
private Integer aisUpImoDesc;
|
||||
private String aisUpImoDescVal;
|
||||
private Integer othrShipNmVoyYn;
|
||||
private String othrShipNmVoyYnDesc;
|
||||
private Integer mmsiAnomMessage;
|
||||
private String mmsiAnomMessageDesc;
|
||||
private Integer recentDarkActv;
|
||||
private String recentDarkActvDesc;
|
||||
private Integer portPrtcll;
|
||||
private String portPrtcllDesc;
|
||||
private Integer portRisk;
|
||||
private String portRiskDesc;
|
||||
private Integer stsJob;
|
||||
private String stsJobDesc;
|
||||
private Integer driftChg;
|
||||
private String driftChgDesc;
|
||||
private Integer riskEvent;
|
||||
private String riskEventDesc;
|
||||
private String riskEventDescExt;
|
||||
private Integer ntnltyChg;
|
||||
private String ntnltyChgDesc;
|
||||
private Integer ntnltyPrsMouPerf;
|
||||
private String ntnltyPrsMouPerfDesc;
|
||||
private Integer ntnltyTkyMouPerf;
|
||||
private String ntnltyTkyMouPerfDesc;
|
||||
private Integer ntnltyUscgMouPerf;
|
||||
private String ntnltyUscgMouPerfDesc;
|
||||
private Integer uscgExclShipCert;
|
||||
private String uscgExclShipCertDesc;
|
||||
private Integer pscInspectionElpsHr;
|
||||
private String pscInspectionElpsHrDesc;
|
||||
private Integer pscInspection;
|
||||
private String pscInspectionDesc;
|
||||
private Integer pscDefect;
|
||||
private String pscDefectDesc;
|
||||
private Integer pscDetained;
|
||||
private String pscDetainedDesc;
|
||||
private Integer nowSmgrcEvdc;
|
||||
private String nowSmgrcEvdcDesc;
|
||||
private Integer doccChg;
|
||||
private String doccChgDesc;
|
||||
private Integer nowClfic;
|
||||
private String nowClficDesc;
|
||||
private String nowClficDescExt;
|
||||
private Integer clficStatusChg;
|
||||
private String clficStatusChgDesc;
|
||||
private Integer pniInsrnc;
|
||||
private String pniInsrncDesc;
|
||||
private String pniInsrncDescExt;
|
||||
private Integer shipNmChg;
|
||||
private String shipNmChgDesc;
|
||||
private Integer gboChg;
|
||||
private String gboChgDesc;
|
||||
private Integer vslage;
|
||||
private String vslageDesc;
|
||||
private Integer ilglFshrViol;
|
||||
private String ilglFshrViolDesc;
|
||||
private Integer draftChg;
|
||||
private String draftChgDesc;
|
||||
private Integer recentSanctionPrtcll;
|
||||
private String recentSanctionPrtcllDesc;
|
||||
private Integer snglShipVoy;
|
||||
private String snglShipVoyDesc;
|
||||
private Integer fltsfty;
|
||||
private String fltsftyDesc;
|
||||
private Integer fltPsc;
|
||||
private String fltPscDesc;
|
||||
private Integer spcInspectionOvdue;
|
||||
private String spcInspectionOvdueDesc;
|
||||
private Integer ownrUnk;
|
||||
private String ownrUnkDesc;
|
||||
private Integer rssPortCall;
|
||||
private String rssPortCallDesc;
|
||||
private Integer rssOwnrReg;
|
||||
private String rssOwnrRegDesc;
|
||||
private Integer rssSts;
|
||||
private String rssStsDesc;
|
||||
|
||||
@Override
|
||||
public Long getJobExecutionId() {
|
||||
|
||||
@ -0,0 +1,25 @@
|
||||
package com.snp.batch.jobs.datasync.batch.risk.entity;
|
||||
|
||||
import com.snp.batch.common.util.JobExecutionGroupable;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Data
|
||||
@SuperBuilder
|
||||
@AllArgsConstructor
|
||||
public class RiskChangeEntity implements JobExecutionGroupable {
|
||||
private String imoNo;
|
||||
private LocalDateTime lastMdfcnDt;
|
||||
private String flctnColNm;
|
||||
private String bfrVal;
|
||||
private String aftrVal;
|
||||
|
||||
private Long jobExecutionId;
|
||||
|
||||
@Override
|
||||
public Long getJobExecutionId() {
|
||||
return this.jobExecutionId;
|
||||
}
|
||||
}
|
||||
@ -1,7 +1,9 @@
|
||||
package com.snp.batch.jobs.datasync.batch.risk.entity;
|
||||
|
||||
import com.snp.batch.common.util.JobExecutionGroupable;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Data
|
||||
@ -10,46 +12,88 @@ import java.time.LocalDateTime;
|
||||
public class RiskEntity implements JobExecutionGroupable {
|
||||
private String imoNo;
|
||||
private LocalDateTime lastMdfcnDt;
|
||||
private String riskDataMaint;
|
||||
private String aisNotrcvElpsDays;
|
||||
private String aisLwrnkDays;
|
||||
private String aisUpImoDesc;
|
||||
private String othrShipNmVoyYn;
|
||||
private String mmsiAnomMessage;
|
||||
private String recentDarkActv;
|
||||
private String portPrtcll;
|
||||
private String portRisk;
|
||||
private String stsJob;
|
||||
private String driftChg;
|
||||
private String riskEvent;
|
||||
private String ntnltyChg;
|
||||
private String ntnltyPrsMouPerf;
|
||||
private String ntnltyTkyMouPerf;
|
||||
private String ntnltyUscgMouPerf;
|
||||
private String uscgExclShipCert;
|
||||
private String pscInspectionElpsHr;
|
||||
private String pscInspection;
|
||||
private String pscDefect;
|
||||
private String pscDetained;
|
||||
private String nowSmgrcEvdc;
|
||||
private String doccChg;
|
||||
private String nowClfic;
|
||||
private String clficStatusChg;
|
||||
private String pniInsrnc;
|
||||
private String shipNmChg;
|
||||
private String gboChg;
|
||||
private String vslage;
|
||||
private String ilglFshrViol;
|
||||
private String draftChg;
|
||||
private String recentSanctionPrtcll;
|
||||
private String snglShipVoy;
|
||||
private String fltsfty;
|
||||
private String fltPsc;
|
||||
private String spcInspectionOvdue;
|
||||
private String ownrUnk;
|
||||
private String rssPortCall;
|
||||
private String rssOwnrReg;
|
||||
private String rssSts;
|
||||
private Integer riskDataMaint;
|
||||
private Integer aisNotrcvElpsDays;
|
||||
private String aisNotrcvElpsDaysDesc;
|
||||
private Integer aisLwrnkDays;
|
||||
private String aisLwrnkDaysDesc;
|
||||
private Integer aisUpImoDesc;
|
||||
private String aisUpImoDescVal;
|
||||
private Integer othrShipNmVoyYn;
|
||||
private String othrShipNmVoyYnDesc;
|
||||
private Integer mmsiAnomMessage;
|
||||
private String mmsiAnomMessageDesc;
|
||||
private Integer recentDarkActv;
|
||||
private String recentDarkActvDesc;
|
||||
private Integer portPrtcll;
|
||||
private String portPrtcllDesc;
|
||||
private Integer portRisk;
|
||||
private String portRiskDesc;
|
||||
private Integer stsJob;
|
||||
private String stsJobDesc;
|
||||
private Integer driftChg;
|
||||
private String driftChgDesc;
|
||||
private Integer riskEvent;
|
||||
private String riskEventDesc;
|
||||
private String riskEventDescExt;
|
||||
private Integer ntnltyChg;
|
||||
private String ntnltyChgDesc;
|
||||
private Integer ntnltyPrsMouPerf;
|
||||
private String ntnltyPrsMouPerfDesc;
|
||||
private Integer ntnltyTkyMouPerf;
|
||||
private String ntnltyTkyMouPerfDesc;
|
||||
private Integer ntnltyUscgMouPerf;
|
||||
private String ntnltyUscgMouPerfDesc;
|
||||
private Integer uscgExclShipCert;
|
||||
private String uscgExclShipCertDesc;
|
||||
private Integer pscInspectionElpsHr;
|
||||
private String pscInspectionElpsHrDesc;
|
||||
private Integer pscInspection;
|
||||
private String pscInspectionDesc;
|
||||
private Integer pscDefect;
|
||||
private String pscDefectDesc;
|
||||
private Integer pscDetained;
|
||||
private String pscDetainedDesc;
|
||||
private Integer nowSmgrcEvdc;
|
||||
private String nowSmgrcEvdcDesc;
|
||||
private Integer doccChg;
|
||||
private String doccChgDesc;
|
||||
private Integer nowClfic;
|
||||
private String nowClficDesc;
|
||||
private String nowClficDescExt;
|
||||
private Integer clficStatusChg;
|
||||
private String clficStatusChgDesc;
|
||||
private Integer pniInsrnc;
|
||||
private String pniInsrncDesc;
|
||||
private String pniInsrncDescExt;
|
||||
private Integer shipNmChg;
|
||||
private String shipNmChgDesc;
|
||||
private Integer gboChg;
|
||||
private String gboChgDesc;
|
||||
private Integer vslage;
|
||||
private String vslageDesc;
|
||||
private Integer ilglFshrViol;
|
||||
private String ilglFshrViolDesc;
|
||||
private Integer draftChg;
|
||||
private String draftChgDesc;
|
||||
private Integer recentSanctionPrtcll;
|
||||
private String recentSanctionPrtcllDesc;
|
||||
private Integer snglShipVoy;
|
||||
private String snglShipVoyDesc;
|
||||
private Integer fltsfty;
|
||||
private String fltsftyDesc;
|
||||
private Integer fltPsc;
|
||||
private String fltPscDesc;
|
||||
private Integer spcInspectionOvdue;
|
||||
private String spcInspectionOvdueDesc;
|
||||
private Integer ownrUnk;
|
||||
private String ownrUnkDesc;
|
||||
private Integer rssPortCall;
|
||||
private String rssPortCallDesc;
|
||||
private Integer rssOwnrReg;
|
||||
private String rssOwnrRegDesc;
|
||||
private Integer rssSts;
|
||||
private String rssStsDesc;
|
||||
|
||||
private Long jobExecutionId;
|
||||
|
||||
|
||||
@ -0,0 +1,21 @@
|
||||
package com.snp.batch.jobs.datasync.batch.risk.processor;
|
||||
|
||||
import com.snp.batch.common.batch.processor.BaseProcessor;
|
||||
import com.snp.batch.jobs.datasync.batch.risk.dto.RiskChangeDto;
|
||||
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class RiskChangeProcessor extends BaseProcessor<RiskChangeDto, RiskChangeEntity> {
|
||||
@Override
|
||||
protected RiskChangeEntity processItem(RiskChangeDto dto) throws Exception {
|
||||
return RiskChangeEntity.builder()
|
||||
.jobExecutionId(dto.getJobExecutionId())
|
||||
.imoNo(dto.getImoNo())
|
||||
.lastMdfcnDt(dto.getLastMdfcnDt())
|
||||
.flctnColNm(dto.getFlctnColNm())
|
||||
.bfrVal(dto.getBfrVal())
|
||||
.aftrVal(dto.getAftrVal())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@ -15,44 +15,86 @@ public class RiskProcessor extends BaseProcessor<RiskDto, RiskEntity> {
|
||||
.lastMdfcnDt(dto.getLastMdfcnDt())
|
||||
.riskDataMaint(dto.getRiskDataMaint())
|
||||
.aisNotrcvElpsDays(dto.getAisNotrcvElpsDays())
|
||||
.aisNotrcvElpsDaysDesc(dto.getAisNotrcvElpsDaysDesc())
|
||||
.aisLwrnkDays(dto.getAisLwrnkDays())
|
||||
.aisLwrnkDaysDesc(dto.getAisLwrnkDaysDesc())
|
||||
.aisUpImoDesc(dto.getAisUpImoDesc())
|
||||
.aisUpImoDescVal(dto.getAisUpImoDescVal())
|
||||
.othrShipNmVoyYn(dto.getOthrShipNmVoyYn())
|
||||
.othrShipNmVoyYnDesc(dto.getOthrShipNmVoyYnDesc())
|
||||
.mmsiAnomMessage(dto.getMmsiAnomMessage())
|
||||
.mmsiAnomMessageDesc(dto.getMmsiAnomMessageDesc())
|
||||
.recentDarkActv(dto.getRecentDarkActv())
|
||||
.recentDarkActvDesc(dto.getRecentDarkActvDesc())
|
||||
.portPrtcll(dto.getPortPrtcll())
|
||||
.portPrtcllDesc(dto.getPortPrtcllDesc())
|
||||
.portRisk(dto.getPortRisk())
|
||||
.portRiskDesc(dto.getPortRiskDesc())
|
||||
.stsJob(dto.getStsJob())
|
||||
.stsJobDesc(dto.getStsJobDesc())
|
||||
.driftChg(dto.getDriftChg())
|
||||
.driftChgDesc(dto.getDriftChgDesc())
|
||||
.riskEvent(dto.getRiskEvent())
|
||||
.riskEventDesc(dto.getRiskEventDesc())
|
||||
.riskEventDescExt(dto.getRiskEventDescExt())
|
||||
.ntnltyChg(dto.getNtnltyChg())
|
||||
.ntnltyChgDesc(dto.getNtnltyChgDesc())
|
||||
.ntnltyPrsMouPerf(dto.getNtnltyPrsMouPerf())
|
||||
.ntnltyPrsMouPerfDesc(dto.getNtnltyPrsMouPerfDesc())
|
||||
.ntnltyTkyMouPerf(dto.getNtnltyTkyMouPerf())
|
||||
.ntnltyTkyMouPerfDesc(dto.getNtnltyTkyMouPerfDesc())
|
||||
.ntnltyUscgMouPerf(dto.getNtnltyUscgMouPerf())
|
||||
.ntnltyUscgMouPerfDesc(dto.getNtnltyUscgMouPerfDesc())
|
||||
.uscgExclShipCert(dto.getUscgExclShipCert())
|
||||
.uscgExclShipCertDesc(dto.getUscgExclShipCertDesc())
|
||||
.pscInspectionElpsHr(dto.getPscInspectionElpsHr())
|
||||
.pscInspectionElpsHrDesc(dto.getPscInspectionElpsHrDesc())
|
||||
.pscInspection(dto.getPscInspection())
|
||||
.pscInspectionDesc(dto.getPscInspectionDesc())
|
||||
.pscDefect(dto.getPscDefect())
|
||||
.pscDefectDesc(dto.getPscDefectDesc())
|
||||
.pscDetained(dto.getPscDetained())
|
||||
.pscDetainedDesc(dto.getPscDetainedDesc())
|
||||
.nowSmgrcEvdc(dto.getNowSmgrcEvdc())
|
||||
.nowSmgrcEvdcDesc(dto.getNowSmgrcEvdcDesc())
|
||||
.doccChg(dto.getDoccChg())
|
||||
.doccChgDesc(dto.getDoccChgDesc())
|
||||
.nowClfic(dto.getNowClfic())
|
||||
.nowClficDesc(dto.getNowClficDesc())
|
||||
.nowClficDescExt(dto.getNowClficDescExt())
|
||||
.clficStatusChg(dto.getClficStatusChg())
|
||||
.clficStatusChgDesc(dto.getClficStatusChgDesc())
|
||||
.pniInsrnc(dto.getPniInsrnc())
|
||||
.pniInsrncDesc(dto.getPniInsrncDesc())
|
||||
.pniInsrncDescExt(dto.getPniInsrncDescExt())
|
||||
.shipNmChg(dto.getShipNmChg())
|
||||
.shipNmChgDesc(dto.getShipNmChgDesc())
|
||||
.gboChg(dto.getGboChg())
|
||||
.gboChgDesc(dto.getGboChgDesc())
|
||||
.vslage(dto.getVslage())
|
||||
.vslageDesc(dto.getVslageDesc())
|
||||
.ilglFshrViol(dto.getIlglFshrViol())
|
||||
.ilglFshrViolDesc(dto.getIlglFshrViolDesc())
|
||||
.draftChg(dto.getDraftChg())
|
||||
.draftChgDesc(dto.getDraftChgDesc())
|
||||
.recentSanctionPrtcll(dto.getRecentSanctionPrtcll())
|
||||
.recentSanctionPrtcllDesc(dto.getRecentSanctionPrtcllDesc())
|
||||
.snglShipVoy(dto.getSnglShipVoy())
|
||||
.snglShipVoyDesc(dto.getSnglShipVoyDesc())
|
||||
.fltsfty(dto.getFltsfty())
|
||||
.fltsftyDesc(dto.getFltsftyDesc())
|
||||
.fltPsc(dto.getFltPsc())
|
||||
.fltPscDesc(dto.getFltPscDesc())
|
||||
.spcInspectionOvdue(dto.getSpcInspectionOvdue())
|
||||
.spcInspectionOvdueDesc(dto.getSpcInspectionOvdueDesc())
|
||||
.ownrUnk(dto.getOwnrUnk())
|
||||
.ownrUnkDesc(dto.getOwnrUnkDesc())
|
||||
.rssPortCall(dto.getRssPortCall())
|
||||
.rssPortCallDesc(dto.getRssPortCallDesc())
|
||||
.rssOwnrReg(dto.getRssOwnrReg())
|
||||
.rssOwnrRegDesc(dto.getRssOwnrRegDesc())
|
||||
.rssSts(dto.getRssSts())
|
||||
.rssStsDesc(dto.getRssStsDesc())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,163 @@
|
||||
package com.snp.batch.jobs.datasync.batch.risk.reader;
|
||||
|
||||
import com.snp.batch.common.util.TableMetaInfo;
|
||||
import com.snp.batch.jobs.datasync.batch.risk.dto.RiskChangeDto;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.item.ItemReader;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Risk 값 변경 이력 Reader
|
||||
* tb_ship_risk_detail_hstry (스냅샷 이력)에서 imo_no별 시계열 비교하여 변경분 감지
|
||||
*/
|
||||
@Slf4j
|
||||
public class RiskChangeReader implements ItemReader<RiskChangeDto> {
|
||||
|
||||
private final TableMetaInfo tableMetaInfo;
|
||||
private final JdbcTemplate batchJdbcTemplate;
|
||||
private final String targetSchema;
|
||||
private List<RiskChangeDto> changeBuffer = new ArrayList<>();
|
||||
private boolean initialized = false;
|
||||
|
||||
private static final List<String> INDICATOR_COLUMNS = List.of(
|
||||
"ais_notrcv_elps_days", "ais_lwrnk_days", "ais_up_imo_desc",
|
||||
"othr_ship_nm_voy_yn", "mmsi_anom_message", "recent_dark_actv",
|
||||
"port_prtcll", "port_risk", "sts_job", "drift_chg",
|
||||
"risk_event", "ntnlty_chg", "ntnlty_prs_mou_perf",
|
||||
"ntnlty_tky_mou_perf", "ntnlty_uscg_mou_perf", "uscg_excl_ship_cert",
|
||||
"psc_inspection_elps_hr", "psc_inspection", "psc_defect", "psc_detained",
|
||||
"now_smgrc_evdc", "docc_chg", "now_clfic", "clfic_status_chg",
|
||||
"pni_insrnc", "ship_nm_chg", "gbo_chg", "vslage",
|
||||
"ilgl_fshr_viol", "draft_chg", "recent_sanction_prtcll", "sngl_ship_voy",
|
||||
"fltsfty", "flt_psc", "spc_inspection_ovdue", "ownr_unk",
|
||||
"rss_port_call", "rss_ownr_reg", "rss_sts"
|
||||
);
|
||||
|
||||
public RiskChangeReader(@Qualifier("batchDataSource") DataSource batchDataSource,
|
||||
TableMetaInfo tableMetaInfo,
|
||||
@Value("${app.batch.target-schema.name}") String targetSchema) {
|
||||
this.batchJdbcTemplate = new JdbcTemplate(batchDataSource);
|
||||
this.tableMetaInfo = tableMetaInfo;
|
||||
this.targetSchema = targetSchema;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RiskChangeDto read() throws Exception {
|
||||
if (!initialized) {
|
||||
initializeChangeBuffer();
|
||||
initialized = true;
|
||||
}
|
||||
if (changeBuffer.isEmpty()) return null;
|
||||
return changeBuffer.remove(0);
|
||||
}
|
||||
|
||||
private void initializeChangeBuffer() {
|
||||
log.info("[RiskChangeReader] 변경 이력 감지 시작");
|
||||
|
||||
// 스냅샷 이력에서 indicator 컬럼만 조회 (imo_no, last_mdfcn_dt 순 정렬)
|
||||
String columnList = String.join(", ", INDICATOR_COLUMNS);
|
||||
String sql = String.format("""
|
||||
SELECT imo_no, last_mdfcn_dt, %s
|
||||
FROM %s.%s
|
||||
ORDER BY imo_no, last_mdfcn_dt ASC
|
||||
""", columnList, targetSchema, tableMetaInfo.targetTbShipRiskDetailHstry);
|
||||
|
||||
List<Map<String, Object>> allData = batchJdbcTemplate.queryForList(sql);
|
||||
log.info("[RiskChangeReader] 스냅샷 이력 조회 완료: {} 건", allData.size());
|
||||
|
||||
// 기존 변경 이력 조회 (중복 방지)
|
||||
String existingSql = String.format("""
|
||||
SELECT imo_no, last_mdfcn_dt, flctn_col_nm
|
||||
FROM %s.%s
|
||||
""", targetSchema, tableMetaInfo.targetTbShipRiskDetailInfoHstry);
|
||||
|
||||
Set<String> existingKeys = new HashSet<>();
|
||||
try {
|
||||
List<Map<String, Object>> existingData = batchJdbcTemplate.queryForList(existingSql);
|
||||
for (Map<String, Object> row : existingData) {
|
||||
existingKeys.add(buildExistingKey(row));
|
||||
}
|
||||
log.info("[RiskChangeReader] 기존 변경 이력 데이터: {} 건", existingKeys.size());
|
||||
} catch (Exception e) {
|
||||
log.warn("[RiskChangeReader] 기존 데이터 조회 실패: {}", e.getMessage());
|
||||
}
|
||||
|
||||
// imo_no별 그룹핑
|
||||
Map<String, List<Map<String, Object>>> groupedByImo = new LinkedHashMap<>();
|
||||
for (Map<String, Object> row : allData) {
|
||||
String imoNo = (String) row.get("imo_no");
|
||||
groupedByImo.computeIfAbsent(imoNo, k -> new ArrayList<>()).add(row);
|
||||
}
|
||||
|
||||
// 각 imo_no별 시계열 비교
|
||||
long changeCount = 0;
|
||||
for (Map.Entry<String, List<Map<String, Object>>> entry : groupedByImo.entrySet()) {
|
||||
String imoNo = entry.getKey();
|
||||
List<Map<String, Object>> records = entry.getValue();
|
||||
|
||||
Map<String, Object> previousRecord = null;
|
||||
for (Map<String, Object> currentRecord : records) {
|
||||
if (previousRecord != null) {
|
||||
List<RiskChangeDto> changes = detectChanges(imoNo, previousRecord, currentRecord, existingKeys);
|
||||
changeBuffer.addAll(changes);
|
||||
changeCount += changes.size();
|
||||
}
|
||||
previousRecord = currentRecord;
|
||||
}
|
||||
}
|
||||
|
||||
log.info("[RiskChangeReader] 변경 이력 감지 완료: {} 건 (imo_no 그룹: {} 개)", changeCount, groupedByImo.size());
|
||||
}
|
||||
|
||||
private List<RiskChangeDto> detectChanges(String imoNo,
|
||||
Map<String, Object> previousRecord,
|
||||
Map<String, Object> currentRecord,
|
||||
Set<String> existingKeys) {
|
||||
List<RiskChangeDto> changes = new ArrayList<>();
|
||||
Timestamp currentTs = (Timestamp) currentRecord.get("last_mdfcn_dt");
|
||||
LocalDateTime lastMdfcnDt = currentTs != null ? currentTs.toLocalDateTime() : null;
|
||||
|
||||
for (String column : INDICATOR_COLUMNS) {
|
||||
Object prevValue = previousRecord.get(column);
|
||||
Object currValue = currentRecord.get(column);
|
||||
|
||||
if (!Objects.equals(prevValue, currValue)) {
|
||||
String existingKey = imoNo + "|" + lastMdfcnDt + "|" + column;
|
||||
if (existingKeys.contains(existingKey)) continue;
|
||||
|
||||
changes.add(RiskChangeDto.builder()
|
||||
.jobExecutionId(0L)
|
||||
.imoNo(imoNo)
|
||||
.lastMdfcnDt(lastMdfcnDt)
|
||||
.flctnColNm(column)
|
||||
.bfrVal(convertToString(prevValue))
|
||||
.aftrVal(convertToString(currValue))
|
||||
.build());
|
||||
}
|
||||
}
|
||||
return changes;
|
||||
}
|
||||
|
||||
private String buildExistingKey(Map<String, Object> row) {
|
||||
String imoNo = (String) row.get("imo_no");
|
||||
Object lastMdfcnDtObj = row.get("last_mdfcn_dt");
|
||||
LocalDateTime lastMdfcnDt = null;
|
||||
if (lastMdfcnDtObj instanceof Timestamp) {
|
||||
lastMdfcnDt = ((Timestamp) lastMdfcnDtObj).toLocalDateTime();
|
||||
}
|
||||
String flctnColNm = (String) row.get("flctn_col_nm");
|
||||
return imoNo + "|" + lastMdfcnDt + "|" + flctnColNm;
|
||||
}
|
||||
|
||||
private String convertToString(Object value) {
|
||||
if (value == null) return "null";
|
||||
return String.valueOf(value);
|
||||
}
|
||||
}
|
||||
@ -31,46 +31,88 @@ public class RiskReader extends BaseSyncReader<RiskDto> {
|
||||
.jobExecutionId(targetId)
|
||||
.imoNo(rs.getString("imo_no"))
|
||||
.lastMdfcnDt(lastMdfcnDtTs != null ? lastMdfcnDtTs.toLocalDateTime() : null)
|
||||
.riskDataMaint(rs.getString("risk_data_maint"))
|
||||
.aisNotrcvElpsDays(rs.getString("ais_notrcv_elps_days"))
|
||||
.aisLwrnkDays(rs.getString("ais_lwrnk_days"))
|
||||
.aisUpImoDesc(rs.getString("ais_up_imo_desc"))
|
||||
.othrShipNmVoyYn(rs.getString("othr_ship_nm_voy_yn"))
|
||||
.mmsiAnomMessage(rs.getString("mmsi_anom_message"))
|
||||
.recentDarkActv(rs.getString("recent_dark_actv"))
|
||||
.portPrtcll(rs.getString("port_prtcll"))
|
||||
.portRisk(rs.getString("port_risk"))
|
||||
.stsJob(rs.getString("sts_job"))
|
||||
.driftChg(rs.getString("drift_chg"))
|
||||
.riskEvent(rs.getString("risk_event"))
|
||||
.ntnltyChg(rs.getString("ntnlty_chg"))
|
||||
.ntnltyPrsMouPerf(rs.getString("ntnlty_prs_mou_perf"))
|
||||
.ntnltyTkyMouPerf(rs.getString("ntnlty_tky_mou_perf"))
|
||||
.ntnltyUscgMouPerf(rs.getString("ntnlty_uscg_mou_perf"))
|
||||
.uscgExclShipCert(rs.getString("uscg_excl_ship_cert"))
|
||||
.pscInspectionElpsHr(rs.getString("psc_inspection_elps_hr"))
|
||||
.pscInspection(rs.getString("psc_inspection"))
|
||||
.pscDefect(rs.getString("psc_defect"))
|
||||
.pscDetained(rs.getString("psc_detained"))
|
||||
.nowSmgrcEvdc(rs.getString("now_smgrc_evdc"))
|
||||
.doccChg(rs.getString("docc_chg"))
|
||||
.nowClfic(rs.getString("now_clfic"))
|
||||
.clficStatusChg(rs.getString("clfic_status_chg"))
|
||||
.pniInsrnc(rs.getString("pni_insrnc"))
|
||||
.shipNmChg(rs.getString("ship_nm_chg"))
|
||||
.gboChg(rs.getString("gbo_chg"))
|
||||
.vslage(rs.getString("vslage"))
|
||||
.ilglFshrViol(rs.getString("ilgl_fshr_viol"))
|
||||
.draftChg(rs.getString("draft_chg"))
|
||||
.recentSanctionPrtcll(rs.getString("recent_sanction_prtcll"))
|
||||
.snglShipVoy(rs.getString("sngl_ship_voy"))
|
||||
.fltsfty(rs.getString("fltsfty"))
|
||||
.fltPsc(rs.getString("flt_psc"))
|
||||
.spcInspectionOvdue(rs.getString("spc_inspection_ovdue"))
|
||||
.ownrUnk(rs.getString("ownr_unk"))
|
||||
.rssPortCall(rs.getString("rss_port_call"))
|
||||
.rssOwnrReg(rs.getString("rss_ownr_reg"))
|
||||
.rssSts(rs.getString("rss_sts"))
|
||||
.riskDataMaint(rs.getObject("risk_data_maint") != null ? rs.getInt("risk_data_maint") : null)
|
||||
.aisNotrcvElpsDays(rs.getObject("ais_notrcv_elps_days") != null ? rs.getInt("ais_notrcv_elps_days") : null)
|
||||
.aisNotrcvElpsDaysDesc(rs.getString("ais_notrcv_elps_days_desc"))
|
||||
.aisLwrnkDays(rs.getObject("ais_lwrnk_days") != null ? rs.getInt("ais_lwrnk_days") : null)
|
||||
.aisLwrnkDaysDesc(rs.getString("ais_lwrnk_days_desc"))
|
||||
.aisUpImoDesc(rs.getObject("ais_up_imo_desc") != null ? rs.getInt("ais_up_imo_desc") : null)
|
||||
.aisUpImoDescVal(rs.getString("ais_up_imo_desc_val"))
|
||||
.othrShipNmVoyYn(rs.getObject("othr_ship_nm_voy_yn") != null ? rs.getInt("othr_ship_nm_voy_yn") : null)
|
||||
.othrShipNmVoyYnDesc(rs.getString("othr_ship_nm_voy_yn_desc"))
|
||||
.mmsiAnomMessage(rs.getObject("mmsi_anom_message") != null ? rs.getInt("mmsi_anom_message") : null)
|
||||
.mmsiAnomMessageDesc(rs.getString("mmsi_anom_message_desc"))
|
||||
.recentDarkActv(rs.getObject("recent_dark_actv") != null ? rs.getInt("recent_dark_actv") : null)
|
||||
.recentDarkActvDesc(rs.getString("recent_dark_actv_desc"))
|
||||
.portPrtcll(rs.getObject("port_prtcll") != null ? rs.getInt("port_prtcll") : null)
|
||||
.portPrtcllDesc(rs.getString("port_prtcll_desc"))
|
||||
.portRisk(rs.getObject("port_risk") != null ? rs.getInt("port_risk") : null)
|
||||
.portRiskDesc(rs.getString("port_risk_desc"))
|
||||
.stsJob(rs.getObject("sts_job") != null ? rs.getInt("sts_job") : null)
|
||||
.stsJobDesc(rs.getString("sts_job_desc"))
|
||||
.driftChg(rs.getObject("drift_chg") != null ? rs.getInt("drift_chg") : null)
|
||||
.driftChgDesc(rs.getString("drift_chg_desc"))
|
||||
.riskEvent(rs.getObject("risk_event") != null ? rs.getInt("risk_event") : null)
|
||||
.riskEventDesc(rs.getString("risk_event_desc"))
|
||||
.riskEventDescExt(rs.getString("risk_event_desc_ext"))
|
||||
.ntnltyChg(rs.getObject("ntnlty_chg") != null ? rs.getInt("ntnlty_chg") : null)
|
||||
.ntnltyChgDesc(rs.getString("ntnlty_chg_desc"))
|
||||
.ntnltyPrsMouPerf(rs.getObject("ntnlty_prs_mou_perf") != null ? rs.getInt("ntnlty_prs_mou_perf") : null)
|
||||
.ntnltyPrsMouPerfDesc(rs.getString("ntnlty_prs_mou_perf_desc"))
|
||||
.ntnltyTkyMouPerf(rs.getObject("ntnlty_tky_mou_perf") != null ? rs.getInt("ntnlty_tky_mou_perf") : null)
|
||||
.ntnltyTkyMouPerfDesc(rs.getString("ntnlty_tky_mou_perf_desc"))
|
||||
.ntnltyUscgMouPerf(rs.getObject("ntnlty_uscg_mou_perf") != null ? rs.getInt("ntnlty_uscg_mou_perf") : null)
|
||||
.ntnltyUscgMouPerfDesc(rs.getString("ntnlty_uscg_mou_perf_desc"))
|
||||
.uscgExclShipCert(rs.getObject("uscg_excl_ship_cert") != null ? rs.getInt("uscg_excl_ship_cert") : null)
|
||||
.uscgExclShipCertDesc(rs.getString("uscg_excl_ship_cert_desc"))
|
||||
.pscInspectionElpsHr(rs.getObject("psc_inspection_elps_hr") != null ? rs.getInt("psc_inspection_elps_hr") : null)
|
||||
.pscInspectionElpsHrDesc(rs.getString("psc_inspection_elps_hr_desc"))
|
||||
.pscInspection(rs.getObject("psc_inspection") != null ? rs.getInt("psc_inspection") : null)
|
||||
.pscInspectionDesc(rs.getString("psc_inspection_desc"))
|
||||
.pscDefect(rs.getObject("psc_defect") != null ? rs.getInt("psc_defect") : null)
|
||||
.pscDefectDesc(rs.getString("psc_defect_desc"))
|
||||
.pscDetained(rs.getObject("psc_detained") != null ? rs.getInt("psc_detained") : null)
|
||||
.pscDetainedDesc(rs.getString("psc_detained_desc"))
|
||||
.nowSmgrcEvdc(rs.getObject("now_smgrc_evdc") != null ? rs.getInt("now_smgrc_evdc") : null)
|
||||
.nowSmgrcEvdcDesc(rs.getString("now_smgrc_evdc_desc"))
|
||||
.doccChg(rs.getObject("docc_chg") != null ? rs.getInt("docc_chg") : null)
|
||||
.doccChgDesc(rs.getString("docc_chg_desc"))
|
||||
.nowClfic(rs.getObject("now_clfic") != null ? rs.getInt("now_clfic") : null)
|
||||
.nowClficDesc(rs.getString("now_clfic_desc"))
|
||||
.nowClficDescExt(rs.getString("now_clfic_desc_ext"))
|
||||
.clficStatusChg(rs.getObject("clfic_status_chg") != null ? rs.getInt("clfic_status_chg") : null)
|
||||
.clficStatusChgDesc(rs.getString("clfic_status_chg_desc"))
|
||||
.pniInsrnc(rs.getObject("pni_insrnc") != null ? rs.getInt("pni_insrnc") : null)
|
||||
.pniInsrncDesc(rs.getString("pni_insrnc_desc"))
|
||||
.pniInsrncDescExt(rs.getString("pni_insrnc_desc_ext"))
|
||||
.shipNmChg(rs.getObject("ship_nm_chg") != null ? rs.getInt("ship_nm_chg") : null)
|
||||
.shipNmChgDesc(rs.getString("ship_nm_chg_desc"))
|
||||
.gboChg(rs.getObject("gbo_chg") != null ? rs.getInt("gbo_chg") : null)
|
||||
.gboChgDesc(rs.getString("gbo_chg_desc"))
|
||||
.vslage(rs.getObject("vslage") != null ? rs.getInt("vslage") : null)
|
||||
.vslageDesc(rs.getString("vslage_desc"))
|
||||
.ilglFshrViol(rs.getObject("ilgl_fshr_viol") != null ? rs.getInt("ilgl_fshr_viol") : null)
|
||||
.ilglFshrViolDesc(rs.getString("ilgl_fshr_viol_desc"))
|
||||
.draftChg(rs.getObject("draft_chg") != null ? rs.getInt("draft_chg") : null)
|
||||
.draftChgDesc(rs.getString("draft_chg_desc"))
|
||||
.recentSanctionPrtcll(rs.getObject("recent_sanction_prtcll") != null ? rs.getInt("recent_sanction_prtcll") : null)
|
||||
.recentSanctionPrtcllDesc(rs.getString("recent_sanction_prtcll_desc"))
|
||||
.snglShipVoy(rs.getObject("sngl_ship_voy") != null ? rs.getInt("sngl_ship_voy") : null)
|
||||
.snglShipVoyDesc(rs.getString("sngl_ship_voy_desc"))
|
||||
.fltsfty(rs.getObject("fltsfty") != null ? rs.getInt("fltsfty") : null)
|
||||
.fltsftyDesc(rs.getString("fltsfty_desc"))
|
||||
.fltPsc(rs.getObject("flt_psc") != null ? rs.getInt("flt_psc") : null)
|
||||
.fltPscDesc(rs.getString("flt_psc_desc"))
|
||||
.spcInspectionOvdue(rs.getObject("spc_inspection_ovdue") != null ? rs.getInt("spc_inspection_ovdue") : null)
|
||||
.spcInspectionOvdueDesc(rs.getString("spc_inspection_ovdue_desc"))
|
||||
.ownrUnk(rs.getObject("ownr_unk") != null ? rs.getInt("ownr_unk") : null)
|
||||
.ownrUnkDesc(rs.getString("ownr_unk_desc"))
|
||||
.rssPortCall(rs.getObject("rss_port_call") != null ? rs.getInt("rss_port_call") : null)
|
||||
.rssPortCallDesc(rs.getString("rss_port_call_desc"))
|
||||
.rssOwnrReg(rs.getObject("rss_ownr_reg") != null ? rs.getInt("rss_ownr_reg") : null)
|
||||
.rssOwnrRegDesc(rs.getString("rss_ownr_reg_desc"))
|
||||
.rssSts(rs.getObject("rss_sts") != null ? rs.getInt("rss_sts") : null)
|
||||
.rssStsDesc(rs.getString("rss_sts_desc"))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.snp.batch.jobs.datasync.batch.risk.repository;
|
||||
|
||||
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
|
||||
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskEntity;
|
||||
|
||||
import java.util.List;
|
||||
@ -9,6 +10,7 @@ import java.util.List;
|
||||
* 구현체: RiskRepositoryImpl (JdbcTemplate 기반)
|
||||
*/
|
||||
public interface RiskRepository {
|
||||
void saveRisk(List<RiskEntity> riskEntityList);
|
||||
void saveRiskHistory(List<RiskEntity> riskEntityList);
|
||||
void saveRiskDetail(List<RiskEntity> riskEntityList);
|
||||
void saveRiskDetailHistory(List<RiskEntity> riskEntityList);
|
||||
void saveRiskDetailChangeHistory(List<RiskChangeEntity> changeEntityList);
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@ package com.snp.batch.jobs.datasync.batch.risk.repository;
|
||||
|
||||
import com.snp.batch.common.batch.repository.MultiDataSourceJdbcRepository;
|
||||
import com.snp.batch.common.util.TableMetaInfo;
|
||||
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
|
||||
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskEntity;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
@ -12,6 +13,7 @@ import org.springframework.stereotype.Repository;
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.Timestamp;
|
||||
import java.sql.Types;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -21,145 +23,140 @@ import java.util.List;
|
||||
@Repository("riskRepository")
|
||||
public class RiskRepositoryImpl extends MultiDataSourceJdbcRepository<RiskEntity, Long> implements RiskRepository {
|
||||
|
||||
private DataSource batchDataSource;
|
||||
private DataSource businessDataSource;
|
||||
private final TableMetaInfo tableMetaInfo;
|
||||
|
||||
public RiskRepositoryImpl(@Qualifier("batchDataSource") DataSource batchDataSource,
|
||||
@Qualifier("businessDataSource") DataSource businessDataSource,
|
||||
TableMetaInfo tableMetaInfo) {
|
||||
|
||||
super(new JdbcTemplate(batchDataSource), new JdbcTemplate(businessDataSource));
|
||||
|
||||
this.batchDataSource = batchDataSource;
|
||||
this.businessDataSource = businessDataSource;
|
||||
this.tableMetaInfo = tableMetaInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getTableName() {
|
||||
return null;
|
||||
}
|
||||
@Override protected String getTableName() { return null; }
|
||||
@Override protected RowMapper<RiskEntity> getRowMapper() { return null; }
|
||||
@Override protected Long extractId(RiskEntity entity) { return null; }
|
||||
@Override protected String getInsertSql() { return null; }
|
||||
@Override protected String getUpdateSql() { return null; }
|
||||
@Override protected void setInsertParameters(PreparedStatement ps, RiskEntity entity) {}
|
||||
@Override protected void setUpdateParameters(PreparedStatement ps, RiskEntity entity) {}
|
||||
@Override protected String getEntityName() { return null; }
|
||||
|
||||
@Override
|
||||
protected RowMapper<RiskEntity> getRowMapper() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Long extractId(RiskEntity entity) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getInsertSql() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getUpdateSql() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setInsertParameters(PreparedStatement ps, RiskEntity entity) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setUpdateParameters(PreparedStatement ps, RiskEntity entity) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getEntityName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveRisk(List<RiskEntity> riskEntityList) {
|
||||
String sql = RiskSql.getRiskUpsertSql(tableMetaInfo.targetTbShipRiskInfo, "imo_no");
|
||||
if (riskEntityList == null || riskEntityList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
// log.debug("{} 배치 삽입 시작: {} 건", "RiskEntity", riskEntityList.size());
|
||||
|
||||
public void saveRiskDetail(List<RiskEntity> riskEntityList) {
|
||||
if (riskEntityList == null || riskEntityList.isEmpty()) return;
|
||||
String sql = RiskSql.getRiskDetailUpsertSql(tableMetaInfo.targetTbShipRiskDetailInfo, "imo_no");
|
||||
batchJdbcTemplate.batchUpdate(sql, riskEntityList, riskEntityList.size(),
|
||||
(ps, entity) -> {
|
||||
try {
|
||||
bindRisk(ps, entity);
|
||||
} catch (Exception e) {
|
||||
log.error("배치 삽입 파라미터 설정 실패", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
// log.debug("{} 배치 삽입 완료: {} 건", "RiskEntity", riskEntityList.size());
|
||||
(ps, entity) -> { try { bindRiskDetail(ps, entity); } catch (Exception e) { throw new RuntimeException(e); } });
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveRiskHistory(List<RiskEntity> riskEntityList) {
|
||||
String sql = RiskSql.getRiskUpsertSql(tableMetaInfo.targetTbShipRiskHstry, "imo_no, last_mdfcn_dt");
|
||||
if (riskEntityList == null || riskEntityList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
// log.debug("{} 배치 삽입 시작: {} 건", "RiskEntity", riskEntityList.size());
|
||||
|
||||
public void saveRiskDetailHistory(List<RiskEntity> riskEntityList) {
|
||||
if (riskEntityList == null || riskEntityList.isEmpty()) return;
|
||||
String sql = RiskSql.getRiskDetailUpsertSql(tableMetaInfo.targetTbShipRiskDetailHstry, "imo_no, last_mdfcn_dt");
|
||||
batchJdbcTemplate.batchUpdate(sql, riskEntityList, riskEntityList.size(),
|
||||
(ps, entity) -> {
|
||||
try {
|
||||
bindRisk(ps, entity);
|
||||
} catch (Exception e) {
|
||||
log.error("배치 삽입 파라미터 설정 실패", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
// log.debug("{} 배치 삽입 완료: {} 건", "RiskEntity", riskEntityList.size());
|
||||
(ps, entity) -> { try { bindRiskDetail(ps, entity); } catch (Exception e) { throw new RuntimeException(e); } });
|
||||
}
|
||||
|
||||
public void bindRisk(PreparedStatement pstmt, RiskEntity entity) throws Exception {
|
||||
@Override
|
||||
public void saveRiskDetailChangeHistory(List<RiskChangeEntity> changeEntityList) {
|
||||
if (changeEntityList == null || changeEntityList.isEmpty()) return;
|
||||
String sql = RiskSql.getChangeHistoryInsertSql(tableMetaInfo.targetTbShipRiskDetailInfoHstry);
|
||||
batchJdbcTemplate.batchUpdate(sql, changeEntityList, changeEntityList.size(),
|
||||
(ps, entity) -> {
|
||||
ps.setString(1, entity.getImoNo());
|
||||
ps.setTimestamp(2, entity.getLastMdfcnDt() != null ? Timestamp.valueOf(entity.getLastMdfcnDt()) : null);
|
||||
ps.setString(3, entity.getFlctnColNm());
|
||||
ps.setString(4, entity.getBfrVal());
|
||||
ps.setString(5, entity.getAftrVal());
|
||||
});
|
||||
}
|
||||
|
||||
private void bindRiskDetail(PreparedStatement ps, RiskEntity e) throws Exception {
|
||||
int idx = 1;
|
||||
pstmt.setString(idx++, "SYSTEM"); // 1. creatr_id
|
||||
pstmt.setString(idx++, entity.getImoNo()); // 2. imo_no
|
||||
pstmt.setTimestamp(idx++, entity.getLastMdfcnDt() != null ? Timestamp.valueOf(entity.getLastMdfcnDt()) : null); // 3. last_mdfcn_dt
|
||||
pstmt.setString(idx++, entity.getRiskDataMaint()); // 4. risk_data_maint
|
||||
pstmt.setString(idx++, entity.getAisNotrcvElpsDays()); // 5. ais_notrcv_elps_days
|
||||
pstmt.setString(idx++, entity.getAisLwrnkDays()); // 6. ais_lwrnk_days
|
||||
pstmt.setString(idx++, entity.getAisUpImoDesc()); // 7. ais_up_imo_desc
|
||||
pstmt.setString(idx++, entity.getOthrShipNmVoyYn()); // 8. othr_ship_nm_voy_yn
|
||||
pstmt.setString(idx++, entity.getMmsiAnomMessage()); // 9. mmsi_anom_message
|
||||
pstmt.setString(idx++, entity.getRecentDarkActv()); // 10. recent_dark_actv
|
||||
pstmt.setString(idx++, entity.getPortPrtcll()); // 11. port_prtcll
|
||||
pstmt.setString(idx++, entity.getPortRisk()); // 12. port_risk
|
||||
pstmt.setString(idx++, entity.getStsJob()); // 13. sts_job
|
||||
pstmt.setString(idx++, entity.getDriftChg()); // 14. drift_chg
|
||||
pstmt.setString(idx++, entity.getRiskEvent()); // 15. risk_event
|
||||
pstmt.setString(idx++, entity.getNtnltyChg()); // 16. ntnlty_chg
|
||||
pstmt.setString(idx++, entity.getNtnltyPrsMouPerf()); // 17. ntnlty_prs_mou_perf
|
||||
pstmt.setString(idx++, entity.getNtnltyTkyMouPerf()); // 18. ntnlty_tky_mou_perf
|
||||
pstmt.setString(idx++, entity.getNtnltyUscgMouPerf()); // 19. ntnlty_uscg_mou_perf
|
||||
pstmt.setString(idx++, entity.getUscgExclShipCert()); // 20. uscg_excl_ship_cert
|
||||
pstmt.setString(idx++, entity.getPscInspectionElpsHr()); // 21. psc_inspection_elps_hr
|
||||
pstmt.setString(idx++, entity.getPscInspection()); // 22. psc_inspection
|
||||
pstmt.setString(idx++, entity.getPscDefect()); // 23. psc_defect
|
||||
pstmt.setString(idx++, entity.getPscDetained()); // 24. psc_detained
|
||||
pstmt.setString(idx++, entity.getNowSmgrcEvdc()); // 25. now_smgrc_evdc
|
||||
pstmt.setString(idx++, entity.getDoccChg()); // 26. docc_chg
|
||||
pstmt.setString(idx++, entity.getNowClfic()); // 27. now_clfic
|
||||
pstmt.setString(idx++, entity.getClficStatusChg()); // 28. clfic_status_chg
|
||||
pstmt.setString(idx++, entity.getPniInsrnc()); // 29. pni_insrnc
|
||||
pstmt.setString(idx++, entity.getShipNmChg()); // 30. ship_nm_chg
|
||||
pstmt.setString(idx++, entity.getGboChg()); // 31. gbo_chg
|
||||
pstmt.setString(idx++, entity.getVslage()); // 32. vslage
|
||||
pstmt.setString(idx++, entity.getIlglFshrViol()); // 33. ilgl_fshr_viol
|
||||
pstmt.setString(idx++, entity.getDraftChg()); // 34. draft_chg
|
||||
pstmt.setString(idx++, entity.getRecentSanctionPrtcll()); // 35. recent_sanction_prtcll
|
||||
pstmt.setString(idx++, entity.getSnglShipVoy()); // 36. sngl_ship_voy
|
||||
pstmt.setString(idx++, entity.getFltsfty()); // 37. fltsfty
|
||||
pstmt.setString(idx++, entity.getFltPsc()); // 38. flt_psc
|
||||
pstmt.setString(idx++, entity.getSpcInspectionOvdue()); // 39. spc_inspection_ovdue
|
||||
pstmt.setString(idx++, entity.getOwnrUnk()); // 40. ownr_unk
|
||||
pstmt.setString(idx++, entity.getRssPortCall()); // 41. rss_port_call
|
||||
pstmt.setString(idx++, entity.getRssOwnrReg()); // 42. rss_ownr_reg
|
||||
pstmt.setString(idx++, entity.getRssSts()); // 43. rss_sts
|
||||
ps.setString(idx++, "SYSTEM");
|
||||
ps.setString(idx++, e.getImoNo());
|
||||
ps.setTimestamp(idx++, e.getLastMdfcnDt() != null ? Timestamp.valueOf(e.getLastMdfcnDt()) : null);
|
||||
ps.setObject(idx++, e.getRiskDataMaint(), Types.INTEGER);
|
||||
ps.setObject(idx++, e.getAisNotrcvElpsDays(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getAisNotrcvElpsDaysDesc());
|
||||
ps.setObject(idx++, e.getAisLwrnkDays(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getAisLwrnkDaysDesc());
|
||||
ps.setObject(idx++, e.getAisUpImoDesc(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getAisUpImoDescVal());
|
||||
ps.setObject(idx++, e.getOthrShipNmVoyYn(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getOthrShipNmVoyYnDesc());
|
||||
ps.setObject(idx++, e.getMmsiAnomMessage(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getMmsiAnomMessageDesc());
|
||||
ps.setObject(idx++, e.getRecentDarkActv(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getRecentDarkActvDesc());
|
||||
ps.setObject(idx++, e.getPortPrtcll(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getPortPrtcllDesc());
|
||||
ps.setObject(idx++, e.getPortRisk(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getPortRiskDesc());
|
||||
ps.setObject(idx++, e.getStsJob(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getStsJobDesc());
|
||||
ps.setObject(idx++, e.getDriftChg(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getDriftChgDesc());
|
||||
ps.setObject(idx++, e.getRiskEvent(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getRiskEventDesc());
|
||||
ps.setString(idx++, e.getRiskEventDescExt());
|
||||
ps.setObject(idx++, e.getNtnltyChg(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getNtnltyChgDesc());
|
||||
ps.setObject(idx++, e.getNtnltyPrsMouPerf(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getNtnltyPrsMouPerfDesc());
|
||||
ps.setObject(idx++, e.getNtnltyTkyMouPerf(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getNtnltyTkyMouPerfDesc());
|
||||
ps.setObject(idx++, e.getNtnltyUscgMouPerf(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getNtnltyUscgMouPerfDesc());
|
||||
ps.setObject(idx++, e.getUscgExclShipCert(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getUscgExclShipCertDesc());
|
||||
ps.setObject(idx++, e.getPscInspectionElpsHr(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getPscInspectionElpsHrDesc());
|
||||
ps.setObject(idx++, e.getPscInspection(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getPscInspectionDesc());
|
||||
ps.setObject(idx++, e.getPscDefect(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getPscDefectDesc());
|
||||
ps.setObject(idx++, e.getPscDetained(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getPscDetainedDesc());
|
||||
ps.setObject(idx++, e.getNowSmgrcEvdc(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getNowSmgrcEvdcDesc());
|
||||
ps.setObject(idx++, e.getDoccChg(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getDoccChgDesc());
|
||||
ps.setObject(idx++, e.getNowClfic(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getNowClficDesc());
|
||||
ps.setString(idx++, e.getNowClficDescExt());
|
||||
ps.setObject(idx++, e.getClficStatusChg(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getClficStatusChgDesc());
|
||||
ps.setObject(idx++, e.getPniInsrnc(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getPniInsrncDesc());
|
||||
ps.setString(idx++, e.getPniInsrncDescExt());
|
||||
ps.setObject(idx++, e.getShipNmChg(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getShipNmChgDesc());
|
||||
ps.setObject(idx++, e.getGboChg(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getGboChgDesc());
|
||||
ps.setObject(idx++, e.getVslage(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getVslageDesc());
|
||||
ps.setObject(idx++, e.getIlglFshrViol(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getIlglFshrViolDesc());
|
||||
ps.setObject(idx++, e.getDraftChg(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getDraftChgDesc());
|
||||
ps.setObject(idx++, e.getRecentSanctionPrtcll(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getRecentSanctionPrtcllDesc());
|
||||
ps.setObject(idx++, e.getSnglShipVoy(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getSnglShipVoyDesc());
|
||||
ps.setObject(idx++, e.getFltsfty(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getFltsftyDesc());
|
||||
ps.setObject(idx++, e.getFltPsc(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getFltPscDesc());
|
||||
ps.setObject(idx++, e.getSpcInspectionOvdue(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getSpcInspectionOvdueDesc());
|
||||
ps.setObject(idx++, e.getOwnrUnk(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getOwnrUnkDesc());
|
||||
ps.setObject(idx++, e.getRssPortCall(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getRssPortCallDesc());
|
||||
ps.setObject(idx++, e.getRssOwnrReg(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getRssOwnrRegDesc());
|
||||
ps.setObject(idx++, e.getRssSts(), Types.INTEGER);
|
||||
ps.setString(idx++, e.getRssStsDesc());
|
||||
}
|
||||
}
|
||||
|
||||
@ -6,85 +6,135 @@ import org.springframework.stereotype.Component;
|
||||
@Component
|
||||
public class RiskSql {
|
||||
private static String TARGET_SCHEMA;
|
||||
|
||||
public RiskSql(@Value("${app.batch.target-schema.name}") String targetSchema) {
|
||||
TARGET_SCHEMA = targetSchema;
|
||||
}
|
||||
|
||||
public static String getRiskUpsertSql(String targetTable, String targetIndex) {
|
||||
/**
|
||||
* Risk Detail UPSERT SQL (tb_ship_risk_detail_info, tb_ship_risk_detail_hstry 공통)
|
||||
*/
|
||||
public static String getRiskDetailUpsertSql(String targetTable, String targetIndex) {
|
||||
return """
|
||||
INSERT INTO %s.%s (
|
||||
crt_dt, creatr_id,
|
||||
imo_no, last_mdfcn_dt, risk_data_maint, ais_notrcv_elps_days,
|
||||
ais_lwrnk_days, ais_up_imo_desc, othr_ship_nm_voy_yn, mmsi_anom_message,
|
||||
recent_dark_actv, port_prtcll, port_risk, sts_job,
|
||||
drift_chg, risk_event, ntnlty_chg, ntnlty_prs_mou_perf,
|
||||
ntnlty_tky_mou_perf, ntnlty_uscg_mou_perf, uscg_excl_ship_cert, psc_inspection_elps_hr,
|
||||
psc_inspection, psc_defect, psc_detained, now_smgrc_evdc,
|
||||
docc_chg, now_clfic, clfic_status_chg, pni_insrnc,
|
||||
ship_nm_chg, gbo_chg, vslage, ilgl_fshr_viol,
|
||||
draft_chg, recent_sanction_prtcll, sngl_ship_voy, fltsfty,
|
||||
flt_psc, spc_inspection_ovdue, ownr_unk, rss_port_call,
|
||||
rss_ownr_reg, rss_sts
|
||||
imo_no, last_mdfcn_dt, risk_data_maint,
|
||||
ais_notrcv_elps_days, ais_notrcv_elps_days_desc,
|
||||
ais_lwrnk_days, ais_lwrnk_days_desc,
|
||||
ais_up_imo_desc, ais_up_imo_desc_val,
|
||||
othr_ship_nm_voy_yn, othr_ship_nm_voy_yn_desc,
|
||||
mmsi_anom_message, mmsi_anom_message_desc,
|
||||
recent_dark_actv, recent_dark_actv_desc,
|
||||
port_prtcll, port_prtcll_desc,
|
||||
port_risk, port_risk_desc,
|
||||
sts_job, sts_job_desc,
|
||||
drift_chg, drift_chg_desc,
|
||||
risk_event, risk_event_desc, risk_event_desc_ext,
|
||||
ntnlty_chg, ntnlty_chg_desc,
|
||||
ntnlty_prs_mou_perf, ntnlty_prs_mou_perf_desc,
|
||||
ntnlty_tky_mou_perf, ntnlty_tky_mou_perf_desc,
|
||||
ntnlty_uscg_mou_perf, ntnlty_uscg_mou_perf_desc,
|
||||
uscg_excl_ship_cert, uscg_excl_ship_cert_desc,
|
||||
psc_inspection_elps_hr, psc_inspection_elps_hr_desc,
|
||||
psc_inspection, psc_inspection_desc,
|
||||
psc_defect, psc_defect_desc,
|
||||
psc_detained, psc_detained_desc,
|
||||
now_smgrc_evdc, now_smgrc_evdc_desc,
|
||||
docc_chg, docc_chg_desc,
|
||||
now_clfic, now_clfic_desc, now_clfic_desc_ext,
|
||||
clfic_status_chg, clfic_status_chg_desc,
|
||||
pni_insrnc, pni_insrnc_desc, pni_insrnc_desc_ext,
|
||||
ship_nm_chg, ship_nm_chg_desc,
|
||||
gbo_chg, gbo_chg_desc,
|
||||
vslage, vslage_desc,
|
||||
ilgl_fshr_viol, ilgl_fshr_viol_desc,
|
||||
draft_chg, draft_chg_desc,
|
||||
recent_sanction_prtcll, recent_sanction_prtcll_desc,
|
||||
sngl_ship_voy, sngl_ship_voy_desc,
|
||||
fltsfty, fltsfty_desc,
|
||||
flt_psc, flt_psc_desc,
|
||||
spc_inspection_ovdue, spc_inspection_ovdue_desc,
|
||||
ownr_unk, ownr_unk_desc,
|
||||
rss_port_call, rss_port_call_desc,
|
||||
rss_ownr_reg, rss_ownr_reg_desc,
|
||||
rss_sts, rss_sts_desc
|
||||
)
|
||||
VALUES (
|
||||
CURRENT_TIMESTAMP, ?,
|
||||
?, ?, ?, ?,
|
||||
?, ?, ?, ?,
|
||||
?, ?, ?, ?,
|
||||
?, ?, ?, ?,
|
||||
?, ?, ?, ?,
|
||||
?, ?, ?, ?,
|
||||
?, ?, ?, ?,
|
||||
?, ?, ?, ?,
|
||||
?, ?, ?, ?,
|
||||
?, ?, ?, ?,
|
||||
?, ?
|
||||
?, ?, ?,
|
||||
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||||
?
|
||||
)
|
||||
ON CONFLICT (%s)
|
||||
DO UPDATE SET
|
||||
mdfcn_dt = CURRENT_TIMESTAMP,
|
||||
mdfr_id = 'SYSTEM',
|
||||
last_mdfcn_dt = EXCLUDED.last_mdfcn_dt,
|
||||
risk_data_maint = EXCLUDED.risk_data_maint,
|
||||
ais_notrcv_elps_days = EXCLUDED.ais_notrcv_elps_days,
|
||||
ais_lwrnk_days = EXCLUDED.ais_lwrnk_days,
|
||||
ais_up_imo_desc = EXCLUDED.ais_up_imo_desc,
|
||||
othr_ship_nm_voy_yn = EXCLUDED.othr_ship_nm_voy_yn,
|
||||
mmsi_anom_message = EXCLUDED.mmsi_anom_message,
|
||||
recent_dark_actv = EXCLUDED.recent_dark_actv,
|
||||
port_prtcll = EXCLUDED.port_prtcll,
|
||||
port_risk = EXCLUDED.port_risk,
|
||||
sts_job = EXCLUDED.sts_job,
|
||||
drift_chg = EXCLUDED.drift_chg,
|
||||
risk_event = EXCLUDED.risk_event,
|
||||
ntnlty_chg = EXCLUDED.ntnlty_chg,
|
||||
ntnlty_prs_mou_perf = EXCLUDED.ntnlty_prs_mou_perf,
|
||||
ntnlty_tky_mou_perf = EXCLUDED.ntnlty_tky_mou_perf,
|
||||
ntnlty_uscg_mou_perf = EXCLUDED.ntnlty_uscg_mou_perf,
|
||||
uscg_excl_ship_cert = EXCLUDED.uscg_excl_ship_cert,
|
||||
psc_inspection_elps_hr = EXCLUDED.psc_inspection_elps_hr,
|
||||
psc_inspection = EXCLUDED.psc_inspection,
|
||||
psc_defect = EXCLUDED.psc_defect,
|
||||
psc_detained = EXCLUDED.psc_detained,
|
||||
now_smgrc_evdc = EXCLUDED.now_smgrc_evdc,
|
||||
docc_chg = EXCLUDED.docc_chg,
|
||||
now_clfic = EXCLUDED.now_clfic,
|
||||
clfic_status_chg = EXCLUDED.clfic_status_chg,
|
||||
pni_insrnc = EXCLUDED.pni_insrnc,
|
||||
ship_nm_chg = EXCLUDED.ship_nm_chg,
|
||||
gbo_chg = EXCLUDED.gbo_chg,
|
||||
vslage = EXCLUDED.vslage,
|
||||
ilgl_fshr_viol = EXCLUDED.ilgl_fshr_viol,
|
||||
draft_chg = EXCLUDED.draft_chg,
|
||||
recent_sanction_prtcll = EXCLUDED.recent_sanction_prtcll,
|
||||
sngl_ship_voy = EXCLUDED.sngl_ship_voy,
|
||||
fltsfty = EXCLUDED.fltsfty,
|
||||
flt_psc = EXCLUDED.flt_psc,
|
||||
spc_inspection_ovdue = EXCLUDED.spc_inspection_ovdue,
|
||||
ownr_unk = EXCLUDED.ownr_unk,
|
||||
rss_port_call = EXCLUDED.rss_port_call,
|
||||
rss_ownr_reg = EXCLUDED.rss_ownr_reg,
|
||||
rss_sts = EXCLUDED.rss_sts;
|
||||
mdfcn_dt = CURRENT_TIMESTAMP, mdfr_id = 'SYSTEM',
|
||||
last_mdfcn_dt = EXCLUDED.last_mdfcn_dt, risk_data_maint = EXCLUDED.risk_data_maint,
|
||||
ais_notrcv_elps_days = EXCLUDED.ais_notrcv_elps_days, ais_notrcv_elps_days_desc = EXCLUDED.ais_notrcv_elps_days_desc,
|
||||
ais_lwrnk_days = EXCLUDED.ais_lwrnk_days, ais_lwrnk_days_desc = EXCLUDED.ais_lwrnk_days_desc,
|
||||
ais_up_imo_desc = EXCLUDED.ais_up_imo_desc, ais_up_imo_desc_val = EXCLUDED.ais_up_imo_desc_val,
|
||||
othr_ship_nm_voy_yn = EXCLUDED.othr_ship_nm_voy_yn, othr_ship_nm_voy_yn_desc = EXCLUDED.othr_ship_nm_voy_yn_desc,
|
||||
mmsi_anom_message = EXCLUDED.mmsi_anom_message, mmsi_anom_message_desc = EXCLUDED.mmsi_anom_message_desc,
|
||||
recent_dark_actv = EXCLUDED.recent_dark_actv, recent_dark_actv_desc = EXCLUDED.recent_dark_actv_desc,
|
||||
port_prtcll = EXCLUDED.port_prtcll, port_prtcll_desc = EXCLUDED.port_prtcll_desc,
|
||||
port_risk = EXCLUDED.port_risk, port_risk_desc = EXCLUDED.port_risk_desc,
|
||||
sts_job = EXCLUDED.sts_job, sts_job_desc = EXCLUDED.sts_job_desc,
|
||||
drift_chg = EXCLUDED.drift_chg, drift_chg_desc = EXCLUDED.drift_chg_desc,
|
||||
risk_event = EXCLUDED.risk_event, risk_event_desc = EXCLUDED.risk_event_desc, risk_event_desc_ext = EXCLUDED.risk_event_desc_ext,
|
||||
ntnlty_chg = EXCLUDED.ntnlty_chg, ntnlty_chg_desc = EXCLUDED.ntnlty_chg_desc,
|
||||
ntnlty_prs_mou_perf = EXCLUDED.ntnlty_prs_mou_perf, ntnlty_prs_mou_perf_desc = EXCLUDED.ntnlty_prs_mou_perf_desc,
|
||||
ntnlty_tky_mou_perf = EXCLUDED.ntnlty_tky_mou_perf, ntnlty_tky_mou_perf_desc = EXCLUDED.ntnlty_tky_mou_perf_desc,
|
||||
ntnlty_uscg_mou_perf = EXCLUDED.ntnlty_uscg_mou_perf, ntnlty_uscg_mou_perf_desc = EXCLUDED.ntnlty_uscg_mou_perf_desc,
|
||||
uscg_excl_ship_cert = EXCLUDED.uscg_excl_ship_cert, uscg_excl_ship_cert_desc = EXCLUDED.uscg_excl_ship_cert_desc,
|
||||
psc_inspection_elps_hr = EXCLUDED.psc_inspection_elps_hr, psc_inspection_elps_hr_desc = EXCLUDED.psc_inspection_elps_hr_desc,
|
||||
psc_inspection = EXCLUDED.psc_inspection, psc_inspection_desc = EXCLUDED.psc_inspection_desc,
|
||||
psc_defect = EXCLUDED.psc_defect, psc_defect_desc = EXCLUDED.psc_defect_desc,
|
||||
psc_detained = EXCLUDED.psc_detained, psc_detained_desc = EXCLUDED.psc_detained_desc,
|
||||
now_smgrc_evdc = EXCLUDED.now_smgrc_evdc, now_smgrc_evdc_desc = EXCLUDED.now_smgrc_evdc_desc,
|
||||
docc_chg = EXCLUDED.docc_chg, docc_chg_desc = EXCLUDED.docc_chg_desc,
|
||||
now_clfic = EXCLUDED.now_clfic, now_clfic_desc = EXCLUDED.now_clfic_desc, now_clfic_desc_ext = EXCLUDED.now_clfic_desc_ext,
|
||||
clfic_status_chg = EXCLUDED.clfic_status_chg, clfic_status_chg_desc = EXCLUDED.clfic_status_chg_desc,
|
||||
pni_insrnc = EXCLUDED.pni_insrnc, pni_insrnc_desc = EXCLUDED.pni_insrnc_desc, pni_insrnc_desc_ext = EXCLUDED.pni_insrnc_desc_ext,
|
||||
ship_nm_chg = EXCLUDED.ship_nm_chg, ship_nm_chg_desc = EXCLUDED.ship_nm_chg_desc,
|
||||
gbo_chg = EXCLUDED.gbo_chg, gbo_chg_desc = EXCLUDED.gbo_chg_desc,
|
||||
vslage = EXCLUDED.vslage, vslage_desc = EXCLUDED.vslage_desc,
|
||||
ilgl_fshr_viol = EXCLUDED.ilgl_fshr_viol, ilgl_fshr_viol_desc = EXCLUDED.ilgl_fshr_viol_desc,
|
||||
draft_chg = EXCLUDED.draft_chg, draft_chg_desc = EXCLUDED.draft_chg_desc,
|
||||
recent_sanction_prtcll = EXCLUDED.recent_sanction_prtcll, recent_sanction_prtcll_desc = EXCLUDED.recent_sanction_prtcll_desc,
|
||||
sngl_ship_voy = EXCLUDED.sngl_ship_voy, sngl_ship_voy_desc = EXCLUDED.sngl_ship_voy_desc,
|
||||
fltsfty = EXCLUDED.fltsfty, fltsfty_desc = EXCLUDED.fltsfty_desc,
|
||||
flt_psc = EXCLUDED.flt_psc, flt_psc_desc = EXCLUDED.flt_psc_desc,
|
||||
spc_inspection_ovdue = EXCLUDED.spc_inspection_ovdue, spc_inspection_ovdue_desc = EXCLUDED.spc_inspection_ovdue_desc,
|
||||
ownr_unk = EXCLUDED.ownr_unk, ownr_unk_desc = EXCLUDED.ownr_unk_desc,
|
||||
rss_port_call = EXCLUDED.rss_port_call, rss_port_call_desc = EXCLUDED.rss_port_call_desc,
|
||||
rss_ownr_reg = EXCLUDED.rss_ownr_reg, rss_ownr_reg_desc = EXCLUDED.rss_ownr_reg_desc,
|
||||
rss_sts = EXCLUDED.rss_sts, rss_sts_desc = EXCLUDED.rss_sts_desc;
|
||||
""".formatted(TARGET_SCHEMA, targetTable, targetIndex);
|
||||
}
|
||||
|
||||
/**
|
||||
* 값 변경 이력 INSERT SQL (tb_ship_risk_detail_info_hstry)
|
||||
*/
|
||||
public static String getChangeHistoryInsertSql(String targetTable) {
|
||||
return """
|
||||
INSERT INTO %s.%s (crt_dt, creatr_id, imo_no, last_mdfcn_dt, flctn_col_nm, bfr_val, aftr_val)
|
||||
VALUES (CURRENT_TIMESTAMP, 'SYSTEM', ?, ?, ?, ?, ?)
|
||||
ON CONFLICT (imo_no, last_mdfcn_dt, flctn_col_nm) DO NOTHING
|
||||
""".formatted(TARGET_SCHEMA, targetTable);
|
||||
}
|
||||
|
||||
/**
|
||||
* 기존 데이터 조회 SQL (변경 비교용)
|
||||
*/
|
||||
public static String getSelectCurrentSql(String targetTable) {
|
||||
return """
|
||||
SELECT * FROM %s.%s WHERE imo_no = ?
|
||||
""".formatted(TARGET_SCHEMA, targetTable);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,23 @@
|
||||
package com.snp.batch.jobs.datasync.batch.risk.writer;
|
||||
|
||||
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
|
||||
import com.snp.batch.jobs.datasync.batch.risk.repository.RiskRepository;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.item.Chunk;
|
||||
import org.springframework.batch.item.ItemWriter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class RiskChangeWriter implements ItemWriter<RiskChangeEntity> {
|
||||
|
||||
private final RiskRepository riskRepository;
|
||||
|
||||
@Override
|
||||
public void write(Chunk<? extends RiskChangeEntity> chunk) throws Exception {
|
||||
if (chunk.isEmpty()) return;
|
||||
riskRepository.saveRiskDetailChangeHistory(new ArrayList<>(chunk.getItems()));
|
||||
}
|
||||
}
|
||||
@ -19,10 +19,8 @@ public class RiskWriter extends BaseChunkedWriter<RiskEntity> {
|
||||
|
||||
@Override
|
||||
protected void writeItems(List<RiskEntity> items) throws Exception {
|
||||
if (items.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
riskRepository.saveRisk(items);
|
||||
riskRepository.saveRiskHistory(items);
|
||||
if (items.isEmpty()) return;
|
||||
riskRepository.saveRiskDetail(items); // 1. 최신 데이터 UPSERT
|
||||
riskRepository.saveRiskDetailHistory(items); // 2. 스냅샷 이력 UPSERT
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,63 @@
|
||||
package com.snp.batch.jobs.maintenance;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||
import org.springframework.batch.core.repository.JobRepository;
|
||||
import org.springframework.batch.core.step.builder.StepBuilder;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* 배치 로그 정리 Job
|
||||
* 보관 기간이 지난 Spring Batch 메타데이터를 삭제
|
||||
*
|
||||
* 실행 주기: 매주 일요일 00:00 (Quartz 스케줄)
|
||||
*/
|
||||
@Slf4j
|
||||
@Configuration
|
||||
@RequiredArgsConstructor
|
||||
public class BatchLogCleanupJobConfig {
|
||||
|
||||
private final JobRepository jobRepository;
|
||||
private final PlatformTransactionManager transactionManager;
|
||||
private final BatchLogCleanupRepository cleanupRepository;
|
||||
|
||||
@Value("${app.batch.log-retention-days:90}")
|
||||
private int retentionDays;
|
||||
|
||||
@Bean
|
||||
public Job batchLogCleanupJob() {
|
||||
return new JobBuilder("batchLogCleanupJob", jobRepository)
|
||||
.start(batchLogCleanupStep())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Step batchLogCleanupStep() {
|
||||
return new StepBuilder("batchLogCleanupStep", jobRepository)
|
||||
.tasklet(batchLogCleanupTasklet(), transactionManager)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Tasklet batchLogCleanupTasklet() {
|
||||
return (contribution, chunkContext) -> {
|
||||
LocalDateTime before = LocalDateTime.now().minusDays(retentionDays);
|
||||
log.info("[LogCleanup] 배치 로그 정리 시작 (보관 기간: {}일, 기준일시: {})", retentionDays, before);
|
||||
|
||||
int totalDeleted = cleanupRepository.deleteOldLogs(before);
|
||||
|
||||
log.info("[LogCleanup] 배치 로그 정리 완료 (총 {}건 삭제)", totalDeleted);
|
||||
return RepeatStatus.FINISHED;
|
||||
};
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,93 @@
|
||||
package com.snp.batch.jobs.maintenance;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* 배치 로그 삭제 Repository
|
||||
* FK 제약 조건 순서를 고려하여 삭제
|
||||
*/
|
||||
@Slf4j
|
||||
@Repository
|
||||
@RequiredArgsConstructor
|
||||
public class BatchLogCleanupRepository {
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
|
||||
/**
|
||||
* 보관 기간이 지난 배치 로그를 삭제
|
||||
* 삭제 순서: step_context → step_execution → job_context → job_params → job_execution → job_instance(고아)
|
||||
*
|
||||
* @param before 이 시점 이전의 로그를 삭제
|
||||
* @return 삭제된 총 행 수
|
||||
*/
|
||||
public int deleteOldLogs(LocalDateTime before) {
|
||||
int total = 0;
|
||||
|
||||
// 1. batch_step_execution_context (step_execution FK)
|
||||
int stepCtx = jdbcTemplate.update("""
|
||||
DELETE FROM batch_step_execution_context
|
||||
WHERE STEP_EXECUTION_ID IN (
|
||||
SELECT se.STEP_EXECUTION_ID
|
||||
FROM batch_step_execution se
|
||||
INNER JOIN batch_job_execution je ON se.JOB_EXECUTION_ID = je.JOB_EXECUTION_ID
|
||||
WHERE je.START_TIME < ?
|
||||
)
|
||||
""", before);
|
||||
log.info("[LogCleanup] batch_step_execution_context: {}건 삭제", stepCtx);
|
||||
total += stepCtx;
|
||||
|
||||
// 2. batch_step_execution
|
||||
int stepExec = jdbcTemplate.update("""
|
||||
DELETE FROM batch_step_execution
|
||||
WHERE JOB_EXECUTION_ID IN (
|
||||
SELECT JOB_EXECUTION_ID FROM batch_job_execution WHERE START_TIME < ?
|
||||
)
|
||||
""", before);
|
||||
log.info("[LogCleanup] batch_step_execution: {}건 삭제", stepExec);
|
||||
total += stepExec;
|
||||
|
||||
// 3. batch_job_execution_context
|
||||
int jobCtx = jdbcTemplate.update("""
|
||||
DELETE FROM batch_job_execution_context
|
||||
WHERE JOB_EXECUTION_ID IN (
|
||||
SELECT JOB_EXECUTION_ID FROM batch_job_execution WHERE START_TIME < ?
|
||||
)
|
||||
""", before);
|
||||
log.info("[LogCleanup] batch_job_execution_context: {}건 삭제", jobCtx);
|
||||
total += jobCtx;
|
||||
|
||||
// 4. batch_job_execution_params
|
||||
int jobParams = jdbcTemplate.update("""
|
||||
DELETE FROM batch_job_execution_params
|
||||
WHERE JOB_EXECUTION_ID IN (
|
||||
SELECT JOB_EXECUTION_ID FROM batch_job_execution WHERE START_TIME < ?
|
||||
)
|
||||
""", before);
|
||||
log.info("[LogCleanup] batch_job_execution_params: {}건 삭제", jobParams);
|
||||
total += jobParams;
|
||||
|
||||
// 5. batch_job_execution
|
||||
int jobExec = jdbcTemplate.update("""
|
||||
DELETE FROM batch_job_execution WHERE START_TIME < ?
|
||||
""", before);
|
||||
log.info("[LogCleanup] batch_job_execution: {}건 삭제", jobExec);
|
||||
total += jobExec;
|
||||
|
||||
// 6. batch_job_instance (고아 인스턴스)
|
||||
int jobInst = jdbcTemplate.update("""
|
||||
DELETE FROM batch_job_instance
|
||||
WHERE JOB_INSTANCE_ID NOT IN (
|
||||
SELECT JOB_INSTANCE_ID FROM batch_job_execution
|
||||
)
|
||||
""");
|
||||
log.info("[LogCleanup] batch_job_instance (고아): {}건 삭제", jobInst);
|
||||
total += jobInst;
|
||||
|
||||
return total;
|
||||
}
|
||||
}
|
||||
@ -104,6 +104,7 @@ app:
|
||||
batch:
|
||||
chunk-size: 10000
|
||||
sub-chunk-size: 5000 # Writer Sub-Chunk 분할 크기
|
||||
log-retention-days: 90 # 배치 로그 보관 기간 (일)
|
||||
source-schema:
|
||||
name: std_snp_data
|
||||
tables:
|
||||
@ -151,7 +152,7 @@ app:
|
||||
movements-008: tb_ship_trnst_hstry
|
||||
code-001: tb_ship_type_cd
|
||||
code-002: tb_ship_country_cd
|
||||
risk-compliance-001: tb_ship_risk_info
|
||||
risk-compliance-001: tb_ship_risk_detail_info
|
||||
risk-compliance-003: tb_ship_compliance_info
|
||||
risk-compliance-006: tb_company_compliance_info
|
||||
target-schema:
|
||||
@ -201,8 +202,9 @@ app:
|
||||
movements-008: tb_ship_trnst_hstry
|
||||
code-001: tb_ship_type_cd
|
||||
code-002: tb_ship_country_cd
|
||||
risk-compliance-001: tb_ship_risk_info
|
||||
risk-compliance-002: tb_ship_risk_hstry
|
||||
risk-compliance-001: tb_ship_risk_detail_info
|
||||
risk-compliance-001-1: tb_ship_risk_detail_info_hstry
|
||||
risk-compliance-002: tb_ship_risk_detail_hstry
|
||||
risk-compliance-003: tb_ship_compliance_info
|
||||
risk-compliance-004: tb_ship_compliance_hstry
|
||||
risk-compliance-005: tb_ship_compliance_info_hstry
|
||||
|
||||
불러오는 중...
Reference in New Issue
Block a user