Merge pull request 'release: 2026-03-26 (3건 커밋)' (#23) from develop into main
All checks were successful
Build and Deploy SNP Sync Batch / build-and-deploy (push) Successful in 36s

This commit is contained in:
HYOJIN 2026-03-26 13:41:25 +09:00
커밋 042cb54c4a
22개의 변경된 파일1313개의 추가작업 그리고 350개의 파일을 삭제

199
README.md Normal file
파일 보기

@ -0,0 +1,199 @@
SNP Sync Batch
S&P Global 해양 데이터를 수집하여 PostgreSQL에 동기화하는 Spring Batch 시스템입니다.
## 기술 스택
| 구분 | 기술 |
|------|------|
| Language | Java 17 |
| Framework | Spring Boot 3.2.1, Spring Batch 5.1.0 |
| Scheduler | Quartz 2.5.0 (JDBC Store) |
| Database | PostgreSQL (dual datasource) |
| Cache | Caffeine |
| Frontend | React + TypeScript (Vite) |
| Build | Maven (frontend-maven-plugin 통합) |
| CI/CD | Gitea Actions → systemd 자동 배포 |
## 아키텍처
```
┌─────────────────────────────────────────────────────────────────┐
│ SNP Sync Batch │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Quartz │───►│ Reader │───►│ Processor │───►│ Writer │ │
│ │ Scheduler │ │(BaseSyncR)│ │(DTO→Entity)│ │(SubChunk) │ │
│ └──────────┘ └────┬─────┘ └──────────┘ └────┬─────┘ │
│ │ │ │
│ ┌────────▼────────┐ ┌───────▼───────┐ │
│ │ Source DB │ │ Target DB │ │
│ │ (std_snp_data) │ │ (std_snp_svc) │ │
│ │ batch_flag 관리 │ │ UPSERT 저장 │ │
│ └─────────────────┘ └───────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ React Frontend (포트 8051, /snp-sync) │ │
│ │ 대시보드 │ 동기화현황 │ 실행이력 │ 스케줄 │ 타임라인 │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
## 데이터베이스 구조
### Dual Datasource
| Datasource | 스키마 | 용도 |
|------------|--------|------|
| batchDataSource (Primary) | snp_batch | Spring Batch 메타데이터, Quartz 스케줄 |
| businessDataSource | std_snp_data / std_snp_svc | 비즈니스 데이터 (소스/타겟) |
### 동기화 대상 테이블
| 도메인 | 소스 테이블 수 | 주요 내용 |
|--------|--------------|----------|
| Ship | 25개 | 선박 제원, 이력, 관계 |
| Company | 1개 | 회사 상세 정보 |
| Event | 4개 | 해양 사건/사고 |
| Facility | 1개 | 항구 시설 |
| PSC | 3개 | PSC 검사 |
| Movements | 8개 | 선박 이동 (항적) |
| Code | 2개 | 코드 (선종, 국적) |
| Risk | 1개 | 위험 지표 (+ 값 변경 이력) |
| Compliance | 2개 | 컴플라이언스 (+ 값 변경 이력) |
## 동기화 프로세스
### batch_flag 상태 흐름
```
[N] 대기 ──── Reader ────► [P] 진행 ──── Writer 성공 ────► [S] 완료
│ │
│ Writer 실패 시
│ P 상태 고착 (수동 리셋 필요)
batch_job_execution.status = 'COMPLETED' 인 데이터만 대상
```
### Reader → Writer 흐름
```
BaseSyncReader (while 루프)
├─ fetchNextGroup(11284): MIN(batch_flag='N') → 데이터 로드 → N→P
├─ fetchNextGroup(11286): 다음 그룹 연속 로드 → N→P
├─ fetchNextGroup(11317): 다음 그룹 연속 로드 → N→P
└─ fetchNextGroup(): 데이터 없음 → return null → Step 종료
BaseChunkedWriter (sub-chunk 5000건 단위)
├─ Sub-Chunk 1: [1~5000건] → 독립 트랜잭션 커밋
├─ Sub-Chunk 2: [5001~10000건] → 독립 트랜잭션 커밋
└─ ...
BatchWriteListener.afterWrite()
└─ 청크 내 모든 고유 job_execution_id → P→S 업데이트
```
### 값 변경 이력 관리 (Risk, Compliance)
데이터 동기화 Job과 값 변경 이력 Job이 분리되어 운영됩니다:
```
riskDataSyncJob (데이터 동기화)
Source → Reader → Writer
├─ UPSERT → tb_ship_risk_detail_info (최신 데이터)
└─ UPSERT → tb_ship_risk_detail_hstry (스냅샷 이력)
riskDetailChangeDataSyncJob (값 변경 이력)
tb_ship_risk_detail_hstry (스냅샷)
→ imo_no별 시계열 비교
→ indicator 컬럼 변경분 감지
→ INSERT → tb_ship_risk_detail_info_hstry (컬럼명, 이전값, 이후값)
```
## 배치 Job 목록
### 데이터 동기화 Job
| Job 이름 | 도메인 | 설명 |
|----------|--------|------|
| snpMdaDataSyncJob | Ship + Company | 선박/회사 데이터 동기화 (26 Step) |
| eventDataSyncJob | Event | 사건 데이터 동기화 (4 Step) |
| facilitySyncJob | Facility | 항구 시설 동기화 |
| pscDataSyncJob | PSC | PSC 검사 동기화 (3 Step) |
| anchorageCallDataSyncJob | Movements | 정박지 기항 |
| berthCallDataSyncJob | Movements | 부두 기항 |
| currentlyAtDataSyncJob | Movements | 현재 위치 |
| destinationDataSyncJob | Movements | 목적지 |
| portCallDataSyncJob | Movements | 항구 기항 |
| stsOperationDataSyncJob | Movements | STS 작업 |
| terminalCallDataSyncJob | Movements | 터미널 기항 |
| transitDataSyncJob | Movements | 통과 |
| codeDataSyncJob | Code | 코드 동기화 (2 Step) |
| riskDataSyncJob | Risk | 위험 지표 동기화 |
| shipComplianceDataSyncJob | Compliance | 선박 컴플라이언스 |
| companyComplianceDataSyncJob | Compliance | 회사 컴플라이언스 |
### 값 변경 이력 Job
| Job 이름 | 설명 |
|----------|------|
| riskDetailChangeDataSyncJob | Risk indicator 값 변경 이력 |
| shipComplianceChangeDataSyncJob | 선박 컴플라이언스 값 변경 이력 |
| companyComplianceChangeDataSyncJob | 회사 컴플라이언스 값 변경 이력 |
### 유지보수 Job
| Job 이름 | 설명 |
|----------|------|
| batchLogCleanupJob | 배치 로그 정리 (90일 보관) |
## 프론트엔드
| 메뉴 | 경로 | 설명 |
|------|------|------|
| 대시보드 | `/` | 통계, 실행 중 작업, 최근 실행/실패 |
| 동기화 현황 | `/sync-status` | 테이블별 N/P/S 건수, 데이터 미리보기, P 리셋 |
| 실행 이력 | `/executions` | 배치 실행 검색, 필터, 상세 |
| 작업 | `/jobs` | Job 목록, 수동 실행 |
| 스케줄 | `/schedules` | Quartz 스케줄 CRUD, Cron 미리보기 |
| 타임라인 | `/schedule-timeline` | 일/주/월별 실행 시각화 |
## CI/CD
```
develop → main 머지
→ Gitea Actions (빌드 + JAR 배포)
→ systemd path watcher (.deploy-trigger 감지)
→ restart.sh → 서비스 재시작
```
### 배포 확인
```bash
# 배포 로그
cat /devdata/services/snp-sync-batch/backend/deploy.log
# 서비스 상태
systemctl status snp-sync-batch
# 실시간 로그
journalctl -u snp-sync-batch -n 100 -f
```
## 빌드 및 실행
```bash
# 개발 환경 실행
mvn spring-boot:run
# 패키징
mvn clean package -DskipTests
# 서버 포트: 8051
# Context Path: /snp-sync
# Swagger: /snp-sync/swagger-ui.html
```

파일 보기

@ -4,6 +4,18 @@
## [Unreleased] ## [Unreleased]
## [2026-03-26]
### 추가
- Risk 데이터 동기화 대상 변경: tb_ship_risk_detail_info + 값 변경 이력 관리 (#3)
- riskDetailChangeDataSyncJob: Risk indicator 값 변경 이력 Job 분리
- 배치 로그 관리 정리 Job 구현: 90일 보관 기간 기준 자동 삭제 (#16)
- README.md 프로젝트 문서 추가
### 변경
- BaseSyncReader: while 루프 연속 그룹 로드 방식으로 변경 (multi-chunk Step 종료 문제 해결)
- BatchWriteListener: 청크 내 모든 고유 job_execution_id P→S 업데이트
## [2026-03-25] ## [2026-03-25]
### 추가 ### 추가

파일 보기

@ -30,7 +30,6 @@ public abstract class BaseSyncReader<T extends JobExecutionGroupable> implements
protected final JdbcTemplate businessJdbcTemplate; protected final JdbcTemplate businessJdbcTemplate;
private List<T> allDataBuffer = new ArrayList<>(); private List<T> allDataBuffer = new ArrayList<>();
private Long currentGroupId = null;
protected BaseSyncReader(DataSource businessDataSource, TableMetaInfo tableMetaInfo) { protected BaseSyncReader(DataSource businessDataSource, TableMetaInfo tableMetaInfo) {
this.businessJdbcTemplate = new JdbcTemplate(businessDataSource); this.businessJdbcTemplate = new JdbcTemplate(businessDataSource);
@ -53,21 +52,13 @@ public abstract class BaseSyncReader<T extends JobExecutionGroupable> implements
@Override @Override
public T read() throws Exception { public T read() throws Exception {
if (allDataBuffer.isEmpty()) { // buffer가 비어있으면 다음 그룹 로드 (연속)
// 이전 그룹 처리 완료 null 반환하여 청크 종료 while (allDataBuffer.isEmpty()) {
// (Writer + afterWrite(PS) 실행된 다음 청크에서 다음 그룹 로드)
if (currentGroupId != null) {
currentGroupId = null;
return null;
}
// 다음 그룹 로드
fetchNextGroup(); fetchNextGroup();
}
if (allDataBuffer.isEmpty()) { if (allDataBuffer.isEmpty()) {
return null; // 이상 처리할 데이터 없음 Step 종료 return null; // 이상 처리할 데이터 없음 Step 종료
} }
}
return allDataBuffer.remove(0); return allDataBuffer.remove(0);
} }
@ -78,10 +69,14 @@ public abstract class BaseSyncReader<T extends JobExecutionGroupable> implements
nextTargetId = businessJdbcTemplate.queryForObject( nextTargetId = businessJdbcTemplate.queryForObject(
CommonSql.getNextTargetQuery(getSourceTable()), Long.class); CommonSql.getNextTargetQuery(getSourceTable()), Long.class);
} catch (Exception e) { } catch (Exception e) {
log.warn("[{}] 다음 처리 대상 조회 실패: {}", getLogPrefix(), e.getMessage());
return; return;
} }
if (nextTargetId == null) return; if (nextTargetId == null) {
log.debug("[{}] 더 이상 처리할 데이터 없음", getLogPrefix());
return;
}
log.info("[{}] 다음 처리 대상 ID 발견: {}", getLogPrefix(), nextTargetId); log.info("[{}] 다음 처리 대상 ID 발견: {}", getLogPrefix(), nextTargetId);
@ -92,7 +87,5 @@ public abstract class BaseSyncReader<T extends JobExecutionGroupable> implements
// NP 전환 // NP 전환
String updateSql = CommonSql.getProcessBatchQuery(getSourceTable()); String updateSql = CommonSql.getProcessBatchQuery(getSourceTable());
businessJdbcTemplate.update(updateSql, nextTargetId); businessJdbcTemplate.update(updateSql, nextTargetId);
currentGroupId = nextTargetId;
} }
} }

파일 보기

@ -5,10 +5,12 @@ import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.item.Chunk; import org.springframework.batch.item.Chunk;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import java.util.Set;
import java.util.stream.Collectors;
/** /**
* Writer 성공 batch_flag PS 업데이트 리스너 * Writer 성공 batch_flag PS 업데이트 리스너
* * 청크 모든 고유 job_execution_id에 대해 S 업데이트
* SQL은 실행 시점에 생성 (CommonSql.SOURCE_SCHEMA 초기화 보장)
*/ */
@Slf4j @Slf4j
public class BatchWriteListener<S extends JobExecutionGroupable> implements ItemWriteListener<S> { public class BatchWriteListener<S extends JobExecutionGroupable> implements ItemWriteListener<S> {
@ -25,11 +27,14 @@ public class BatchWriteListener<S extends JobExecutionGroupable> implements Item
public void afterWrite(Chunk<? extends S> items) { public void afterWrite(Chunk<? extends S> items) {
if (items.isEmpty()) return; if (items.isEmpty()) return;
Long jobExecutionId = items.getItems().get(0).getJobExecutionId();
try {
// SQL을 실행 시점에 생성하여 SOURCE_SCHEMA null 문제 방지
String sql = CommonSql.getCompleteBatchQuery(sourceTable); String sql = CommonSql.getCompleteBatchQuery(sourceTable);
Set<Long> jobExecutionIds = items.getItems().stream()
.map(JobExecutionGroupable::getJobExecutionId)
.collect(Collectors.toSet());
for (Long jobExecutionId : jobExecutionIds) {
try {
int updatedRows = businessJdbcTemplate.update(sql, jobExecutionId); int updatedRows = businessJdbcTemplate.update(sql, jobExecutionId);
log.info("[BatchWriteListener] Success update 'S'. jobExecutionId: {}, rows: {}", jobExecutionId, updatedRows); log.info("[BatchWriteListener] Success update 'S'. jobExecutionId: {}, rows: {}", jobExecutionId, updatedRows);
} catch (Exception e) { } catch (Exception e) {
@ -37,13 +42,16 @@ public class BatchWriteListener<S extends JobExecutionGroupable> implements Item
throw e; throw e;
} }
} }
}
@Override @Override
public void onWriteError(Exception exception, Chunk<? extends S> items) { public void onWriteError(Exception exception, Chunk<? extends S> items) {
if (!items.isEmpty()) { if (!items.isEmpty()) {
Long jobExecutionId = items.getItems().get(0).getJobExecutionId(); Set<Long> ids = items.getItems().stream()
log.error("[BatchWriteListener] Write Error Detected! jobExecutionId: {}. Status will NOT be updated to 'S'. Error: {}", .map(JobExecutionGroupable::getJobExecutionId)
jobExecutionId, exception.getMessage()); .collect(Collectors.toSet());
log.error("[BatchWriteListener] Write Error Detected! jobExecutionIds: {}. Status will NOT be updated to 'S'. Error: {}",
ids, exception.getMessage());
} }
if (exception instanceof RuntimeException) { if (exception instanceof RuntimeException) {

파일 보기

@ -309,10 +309,13 @@ public class TableMetaInfo {
// Risk & Compliance Tables // Risk & Compliance Tables
@Value("${app.batch.target-schema.tables.risk-compliance-001}") @Value("${app.batch.target-schema.tables.risk-compliance-001}")
public String targetTbShipRiskInfo; public String targetTbShipRiskDetailInfo;
@Value("${app.batch.target-schema.tables.risk-compliance-001-1}")
public String targetTbShipRiskDetailInfoHstry;
@Value("${app.batch.target-schema.tables.risk-compliance-002}") @Value("${app.batch.target-schema.tables.risk-compliance-002}")
public String targetTbShipRiskHstry; public String targetTbShipRiskDetailHstry;
@Value("${app.batch.target-schema.tables.risk-compliance-003}") @Value("${app.batch.target-schema.tables.risk-compliance-003}")
public String targetTbShipComplianceInfo; public String targetTbShipComplianceInfo;

파일 보기

@ -0,0 +1,114 @@
package com.snp.batch.jobs.datasync.batch.risk.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.common.util.TableMetaInfo;
import com.snp.batch.jobs.datasync.batch.risk.dto.RiskChangeDto;
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
import com.snp.batch.jobs.datasync.batch.risk.processor.RiskChangeProcessor;
import com.snp.batch.jobs.datasync.batch.risk.reader.RiskChangeReader;
import com.snp.batch.jobs.datasync.batch.risk.repository.RiskRepository;
import com.snp.batch.jobs.datasync.batch.risk.writer.RiskChangeWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
/**
* Risk 변경 이력 관리 Job
* tb_ship_risk_detail_hstry (스냅샷)에서 시계열 비교 tb_ship_risk_detail_info_hstry에 변경분 저장
*/
@Slf4j
@Configuration
public class RiskDetailChangeSyncJobConfig extends BaseJobConfig<RiskChangeDto, RiskChangeEntity> {
private final TableMetaInfo tableMetaInfo;
private final RiskRepository riskRepository;
private final DataSource batchDataSource;
private final String targetSchema;
private static final int CHUNK_SIZE = 1000;
public RiskDetailChangeSyncJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
RiskRepository riskRepository,
TableMetaInfo tableMetaInfo,
@Qualifier("batchDataSource") DataSource batchDataSource,
@Value("${app.batch.target-schema.name}") String targetSchema) {
super(jobRepository, transactionManager);
this.riskRepository = riskRepository;
this.tableMetaInfo = tableMetaInfo;
this.batchDataSource = batchDataSource;
this.targetSchema = targetSchema;
}
@Override
protected String getJobName() {
return "riskDetailChangeDataSyncJob";
}
@Override
protected String getStepName() {
return "riskDetailChangeSyncStep";
}
@Override
protected ItemReader<RiskChangeDto> createReader() {
return riskChangeReader(batchDataSource, tableMetaInfo, targetSchema);
}
@Override
protected ItemProcessor<RiskChangeDto, RiskChangeEntity> createProcessor() {
return new RiskChangeProcessor();
}
@Override
protected ItemWriter<RiskChangeEntity> createWriter() {
return new RiskChangeWriter(riskRepository);
}
@Bean
@StepScope
public ItemReader<RiskChangeDto> riskChangeReader(
@Qualifier("batchDataSource") DataSource batchDataSource,
TableMetaInfo tableMetaInfo,
@Value("${app.batch.target-schema.name}") String targetSchema) {
return new RiskChangeReader(batchDataSource, tableMetaInfo, targetSchema);
}
@Bean(name = "riskDetailChangeSyncStep")
public Step riskDetailChangeSyncStep() {
log.info("Step 생성: riskDetailChangeSyncStep");
return new StepBuilder(getStepName(), jobRepository)
.<RiskChangeDto, RiskChangeEntity>chunk(CHUNK_SIZE, transactionManager)
.reader(createReader())
.processor(createProcessor())
.writer(createWriter())
.build();
}
@Override
protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder
.start(riskDetailChangeSyncStep())
.build();
}
@Bean(name = "riskDetailChangeDataSyncJob")
public Job riskDetailChangeDataSyncJob() {
return job();
}
}

파일 보기

@ -0,0 +1,25 @@
package com.snp.batch.jobs.datasync.batch.risk.dto;
import com.snp.batch.common.util.JobExecutionGroupable;
import lombok.*;
import java.time.LocalDateTime;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RiskChangeDto implements JobExecutionGroupable {
private Long jobExecutionId;
private String imoNo;
private LocalDateTime lastMdfcnDt;
private String flctnColNm;
private String bfrVal;
private String aftrVal;
@Override
public Long getJobExecutionId() {
return this.jobExecutionId;
}
}

파일 보기

@ -1,6 +1,8 @@
package com.snp.batch.jobs.datasync.batch.risk.dto; package com.snp.batch.jobs.datasync.batch.risk.dto;
import com.snp.batch.common.util.JobExecutionGroupable; import com.snp.batch.common.util.JobExecutionGroupable;
import lombok.*; import lombok.*;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@Getter @Getter
@ -12,46 +14,88 @@ public class RiskDto implements JobExecutionGroupable {
private Long jobExecutionId; private Long jobExecutionId;
private String imoNo; private String imoNo;
private LocalDateTime lastMdfcnDt; private LocalDateTime lastMdfcnDt;
private String riskDataMaint; private Integer riskDataMaint;
private String aisNotrcvElpsDays; private Integer aisNotrcvElpsDays;
private String aisLwrnkDays; private String aisNotrcvElpsDaysDesc;
private String aisUpImoDesc; private Integer aisLwrnkDays;
private String othrShipNmVoyYn; private String aisLwrnkDaysDesc;
private String mmsiAnomMessage; private Integer aisUpImoDesc;
private String recentDarkActv; private String aisUpImoDescVal;
private String portPrtcll; private Integer othrShipNmVoyYn;
private String portRisk; private String othrShipNmVoyYnDesc;
private String stsJob; private Integer mmsiAnomMessage;
private String driftChg; private String mmsiAnomMessageDesc;
private String riskEvent; private Integer recentDarkActv;
private String ntnltyChg; private String recentDarkActvDesc;
private String ntnltyPrsMouPerf; private Integer portPrtcll;
private String ntnltyTkyMouPerf; private String portPrtcllDesc;
private String ntnltyUscgMouPerf; private Integer portRisk;
private String uscgExclShipCert; private String portRiskDesc;
private String pscInspectionElpsHr; private Integer stsJob;
private String pscInspection; private String stsJobDesc;
private String pscDefect; private Integer driftChg;
private String pscDetained; private String driftChgDesc;
private String nowSmgrcEvdc; private Integer riskEvent;
private String doccChg; private String riskEventDesc;
private String nowClfic; private String riskEventDescExt;
private String clficStatusChg; private Integer ntnltyChg;
private String pniInsrnc; private String ntnltyChgDesc;
private String shipNmChg; private Integer ntnltyPrsMouPerf;
private String gboChg; private String ntnltyPrsMouPerfDesc;
private String vslage; private Integer ntnltyTkyMouPerf;
private String ilglFshrViol; private String ntnltyTkyMouPerfDesc;
private String draftChg; private Integer ntnltyUscgMouPerf;
private String recentSanctionPrtcll; private String ntnltyUscgMouPerfDesc;
private String snglShipVoy; private Integer uscgExclShipCert;
private String fltsfty; private String uscgExclShipCertDesc;
private String fltPsc; private Integer pscInspectionElpsHr;
private String spcInspectionOvdue; private String pscInspectionElpsHrDesc;
private String ownrUnk; private Integer pscInspection;
private String rssPortCall; private String pscInspectionDesc;
private String rssOwnrReg; private Integer pscDefect;
private String rssSts; private String pscDefectDesc;
private Integer pscDetained;
private String pscDetainedDesc;
private Integer nowSmgrcEvdc;
private String nowSmgrcEvdcDesc;
private Integer doccChg;
private String doccChgDesc;
private Integer nowClfic;
private String nowClficDesc;
private String nowClficDescExt;
private Integer clficStatusChg;
private String clficStatusChgDesc;
private Integer pniInsrnc;
private String pniInsrncDesc;
private String pniInsrncDescExt;
private Integer shipNmChg;
private String shipNmChgDesc;
private Integer gboChg;
private String gboChgDesc;
private Integer vslage;
private String vslageDesc;
private Integer ilglFshrViol;
private String ilglFshrViolDesc;
private Integer draftChg;
private String draftChgDesc;
private Integer recentSanctionPrtcll;
private String recentSanctionPrtcllDesc;
private Integer snglShipVoy;
private String snglShipVoyDesc;
private Integer fltsfty;
private String fltsftyDesc;
private Integer fltPsc;
private String fltPscDesc;
private Integer spcInspectionOvdue;
private String spcInspectionOvdueDesc;
private Integer ownrUnk;
private String ownrUnkDesc;
private Integer rssPortCall;
private String rssPortCallDesc;
private Integer rssOwnrReg;
private String rssOwnrRegDesc;
private Integer rssSts;
private String rssStsDesc;
@Override @Override
public Long getJobExecutionId() { public Long getJobExecutionId() {

파일 보기

@ -0,0 +1,25 @@
package com.snp.batch.jobs.datasync.batch.risk.entity;
import com.snp.batch.common.util.JobExecutionGroupable;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.time.LocalDateTime;
@Data
@SuperBuilder
@AllArgsConstructor
public class RiskChangeEntity implements JobExecutionGroupable {
private String imoNo;
private LocalDateTime lastMdfcnDt;
private String flctnColNm;
private String bfrVal;
private String aftrVal;
private Long jobExecutionId;
@Override
public Long getJobExecutionId() {
return this.jobExecutionId;
}
}

파일 보기

@ -1,7 +1,9 @@
package com.snp.batch.jobs.datasync.batch.risk.entity; package com.snp.batch.jobs.datasync.batch.risk.entity;
import com.snp.batch.common.util.JobExecutionGroupable; import com.snp.batch.common.util.JobExecutionGroupable;
import lombok.*; import lombok.*;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@Data @Data
@ -10,46 +12,88 @@ import java.time.LocalDateTime;
public class RiskEntity implements JobExecutionGroupable { public class RiskEntity implements JobExecutionGroupable {
private String imoNo; private String imoNo;
private LocalDateTime lastMdfcnDt; private LocalDateTime lastMdfcnDt;
private String riskDataMaint; private Integer riskDataMaint;
private String aisNotrcvElpsDays; private Integer aisNotrcvElpsDays;
private String aisLwrnkDays; private String aisNotrcvElpsDaysDesc;
private String aisUpImoDesc; private Integer aisLwrnkDays;
private String othrShipNmVoyYn; private String aisLwrnkDaysDesc;
private String mmsiAnomMessage; private Integer aisUpImoDesc;
private String recentDarkActv; private String aisUpImoDescVal;
private String portPrtcll; private Integer othrShipNmVoyYn;
private String portRisk; private String othrShipNmVoyYnDesc;
private String stsJob; private Integer mmsiAnomMessage;
private String driftChg; private String mmsiAnomMessageDesc;
private String riskEvent; private Integer recentDarkActv;
private String ntnltyChg; private String recentDarkActvDesc;
private String ntnltyPrsMouPerf; private Integer portPrtcll;
private String ntnltyTkyMouPerf; private String portPrtcllDesc;
private String ntnltyUscgMouPerf; private Integer portRisk;
private String uscgExclShipCert; private String portRiskDesc;
private String pscInspectionElpsHr; private Integer stsJob;
private String pscInspection; private String stsJobDesc;
private String pscDefect; private Integer driftChg;
private String pscDetained; private String driftChgDesc;
private String nowSmgrcEvdc; private Integer riskEvent;
private String doccChg; private String riskEventDesc;
private String nowClfic; private String riskEventDescExt;
private String clficStatusChg; private Integer ntnltyChg;
private String pniInsrnc; private String ntnltyChgDesc;
private String shipNmChg; private Integer ntnltyPrsMouPerf;
private String gboChg; private String ntnltyPrsMouPerfDesc;
private String vslage; private Integer ntnltyTkyMouPerf;
private String ilglFshrViol; private String ntnltyTkyMouPerfDesc;
private String draftChg; private Integer ntnltyUscgMouPerf;
private String recentSanctionPrtcll; private String ntnltyUscgMouPerfDesc;
private String snglShipVoy; private Integer uscgExclShipCert;
private String fltsfty; private String uscgExclShipCertDesc;
private String fltPsc; private Integer pscInspectionElpsHr;
private String spcInspectionOvdue; private String pscInspectionElpsHrDesc;
private String ownrUnk; private Integer pscInspection;
private String rssPortCall; private String pscInspectionDesc;
private String rssOwnrReg; private Integer pscDefect;
private String rssSts; private String pscDefectDesc;
private Integer pscDetained;
private String pscDetainedDesc;
private Integer nowSmgrcEvdc;
private String nowSmgrcEvdcDesc;
private Integer doccChg;
private String doccChgDesc;
private Integer nowClfic;
private String nowClficDesc;
private String nowClficDescExt;
private Integer clficStatusChg;
private String clficStatusChgDesc;
private Integer pniInsrnc;
private String pniInsrncDesc;
private String pniInsrncDescExt;
private Integer shipNmChg;
private String shipNmChgDesc;
private Integer gboChg;
private String gboChgDesc;
private Integer vslage;
private String vslageDesc;
private Integer ilglFshrViol;
private String ilglFshrViolDesc;
private Integer draftChg;
private String draftChgDesc;
private Integer recentSanctionPrtcll;
private String recentSanctionPrtcllDesc;
private Integer snglShipVoy;
private String snglShipVoyDesc;
private Integer fltsfty;
private String fltsftyDesc;
private Integer fltPsc;
private String fltPscDesc;
private Integer spcInspectionOvdue;
private String spcInspectionOvdueDesc;
private Integer ownrUnk;
private String ownrUnkDesc;
private Integer rssPortCall;
private String rssPortCallDesc;
private Integer rssOwnrReg;
private String rssOwnrRegDesc;
private Integer rssSts;
private String rssStsDesc;
private Long jobExecutionId; private Long jobExecutionId;

파일 보기

@ -0,0 +1,21 @@
package com.snp.batch.jobs.datasync.batch.risk.processor;
import com.snp.batch.common.batch.processor.BaseProcessor;
import com.snp.batch.jobs.datasync.batch.risk.dto.RiskChangeDto;
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RiskChangeProcessor extends BaseProcessor<RiskChangeDto, RiskChangeEntity> {
@Override
protected RiskChangeEntity processItem(RiskChangeDto dto) throws Exception {
return RiskChangeEntity.builder()
.jobExecutionId(dto.getJobExecutionId())
.imoNo(dto.getImoNo())
.lastMdfcnDt(dto.getLastMdfcnDt())
.flctnColNm(dto.getFlctnColNm())
.bfrVal(dto.getBfrVal())
.aftrVal(dto.getAftrVal())
.build();
}
}

파일 보기

@ -15,44 +15,86 @@ public class RiskProcessor extends BaseProcessor<RiskDto, RiskEntity> {
.lastMdfcnDt(dto.getLastMdfcnDt()) .lastMdfcnDt(dto.getLastMdfcnDt())
.riskDataMaint(dto.getRiskDataMaint()) .riskDataMaint(dto.getRiskDataMaint())
.aisNotrcvElpsDays(dto.getAisNotrcvElpsDays()) .aisNotrcvElpsDays(dto.getAisNotrcvElpsDays())
.aisNotrcvElpsDaysDesc(dto.getAisNotrcvElpsDaysDesc())
.aisLwrnkDays(dto.getAisLwrnkDays()) .aisLwrnkDays(dto.getAisLwrnkDays())
.aisLwrnkDaysDesc(dto.getAisLwrnkDaysDesc())
.aisUpImoDesc(dto.getAisUpImoDesc()) .aisUpImoDesc(dto.getAisUpImoDesc())
.aisUpImoDescVal(dto.getAisUpImoDescVal())
.othrShipNmVoyYn(dto.getOthrShipNmVoyYn()) .othrShipNmVoyYn(dto.getOthrShipNmVoyYn())
.othrShipNmVoyYnDesc(dto.getOthrShipNmVoyYnDesc())
.mmsiAnomMessage(dto.getMmsiAnomMessage()) .mmsiAnomMessage(dto.getMmsiAnomMessage())
.mmsiAnomMessageDesc(dto.getMmsiAnomMessageDesc())
.recentDarkActv(dto.getRecentDarkActv()) .recentDarkActv(dto.getRecentDarkActv())
.recentDarkActvDesc(dto.getRecentDarkActvDesc())
.portPrtcll(dto.getPortPrtcll()) .portPrtcll(dto.getPortPrtcll())
.portPrtcllDesc(dto.getPortPrtcllDesc())
.portRisk(dto.getPortRisk()) .portRisk(dto.getPortRisk())
.portRiskDesc(dto.getPortRiskDesc())
.stsJob(dto.getStsJob()) .stsJob(dto.getStsJob())
.stsJobDesc(dto.getStsJobDesc())
.driftChg(dto.getDriftChg()) .driftChg(dto.getDriftChg())
.driftChgDesc(dto.getDriftChgDesc())
.riskEvent(dto.getRiskEvent()) .riskEvent(dto.getRiskEvent())
.riskEventDesc(dto.getRiskEventDesc())
.riskEventDescExt(dto.getRiskEventDescExt())
.ntnltyChg(dto.getNtnltyChg()) .ntnltyChg(dto.getNtnltyChg())
.ntnltyChgDesc(dto.getNtnltyChgDesc())
.ntnltyPrsMouPerf(dto.getNtnltyPrsMouPerf()) .ntnltyPrsMouPerf(dto.getNtnltyPrsMouPerf())
.ntnltyPrsMouPerfDesc(dto.getNtnltyPrsMouPerfDesc())
.ntnltyTkyMouPerf(dto.getNtnltyTkyMouPerf()) .ntnltyTkyMouPerf(dto.getNtnltyTkyMouPerf())
.ntnltyTkyMouPerfDesc(dto.getNtnltyTkyMouPerfDesc())
.ntnltyUscgMouPerf(dto.getNtnltyUscgMouPerf()) .ntnltyUscgMouPerf(dto.getNtnltyUscgMouPerf())
.ntnltyUscgMouPerfDesc(dto.getNtnltyUscgMouPerfDesc())
.uscgExclShipCert(dto.getUscgExclShipCert()) .uscgExclShipCert(dto.getUscgExclShipCert())
.uscgExclShipCertDesc(dto.getUscgExclShipCertDesc())
.pscInspectionElpsHr(dto.getPscInspectionElpsHr()) .pscInspectionElpsHr(dto.getPscInspectionElpsHr())
.pscInspectionElpsHrDesc(dto.getPscInspectionElpsHrDesc())
.pscInspection(dto.getPscInspection()) .pscInspection(dto.getPscInspection())
.pscInspectionDesc(dto.getPscInspectionDesc())
.pscDefect(dto.getPscDefect()) .pscDefect(dto.getPscDefect())
.pscDefectDesc(dto.getPscDefectDesc())
.pscDetained(dto.getPscDetained()) .pscDetained(dto.getPscDetained())
.pscDetainedDesc(dto.getPscDetainedDesc())
.nowSmgrcEvdc(dto.getNowSmgrcEvdc()) .nowSmgrcEvdc(dto.getNowSmgrcEvdc())
.nowSmgrcEvdcDesc(dto.getNowSmgrcEvdcDesc())
.doccChg(dto.getDoccChg()) .doccChg(dto.getDoccChg())
.doccChgDesc(dto.getDoccChgDesc())
.nowClfic(dto.getNowClfic()) .nowClfic(dto.getNowClfic())
.nowClficDesc(dto.getNowClficDesc())
.nowClficDescExt(dto.getNowClficDescExt())
.clficStatusChg(dto.getClficStatusChg()) .clficStatusChg(dto.getClficStatusChg())
.clficStatusChgDesc(dto.getClficStatusChgDesc())
.pniInsrnc(dto.getPniInsrnc()) .pniInsrnc(dto.getPniInsrnc())
.pniInsrncDesc(dto.getPniInsrncDesc())
.pniInsrncDescExt(dto.getPniInsrncDescExt())
.shipNmChg(dto.getShipNmChg()) .shipNmChg(dto.getShipNmChg())
.shipNmChgDesc(dto.getShipNmChgDesc())
.gboChg(dto.getGboChg()) .gboChg(dto.getGboChg())
.gboChgDesc(dto.getGboChgDesc())
.vslage(dto.getVslage()) .vslage(dto.getVslage())
.vslageDesc(dto.getVslageDesc())
.ilglFshrViol(dto.getIlglFshrViol()) .ilglFshrViol(dto.getIlglFshrViol())
.ilglFshrViolDesc(dto.getIlglFshrViolDesc())
.draftChg(dto.getDraftChg()) .draftChg(dto.getDraftChg())
.draftChgDesc(dto.getDraftChgDesc())
.recentSanctionPrtcll(dto.getRecentSanctionPrtcll()) .recentSanctionPrtcll(dto.getRecentSanctionPrtcll())
.recentSanctionPrtcllDesc(dto.getRecentSanctionPrtcllDesc())
.snglShipVoy(dto.getSnglShipVoy()) .snglShipVoy(dto.getSnglShipVoy())
.snglShipVoyDesc(dto.getSnglShipVoyDesc())
.fltsfty(dto.getFltsfty()) .fltsfty(dto.getFltsfty())
.fltsftyDesc(dto.getFltsftyDesc())
.fltPsc(dto.getFltPsc()) .fltPsc(dto.getFltPsc())
.fltPscDesc(dto.getFltPscDesc())
.spcInspectionOvdue(dto.getSpcInspectionOvdue()) .spcInspectionOvdue(dto.getSpcInspectionOvdue())
.spcInspectionOvdueDesc(dto.getSpcInspectionOvdueDesc())
.ownrUnk(dto.getOwnrUnk()) .ownrUnk(dto.getOwnrUnk())
.ownrUnkDesc(dto.getOwnrUnkDesc())
.rssPortCall(dto.getRssPortCall()) .rssPortCall(dto.getRssPortCall())
.rssPortCallDesc(dto.getRssPortCallDesc())
.rssOwnrReg(dto.getRssOwnrReg()) .rssOwnrReg(dto.getRssOwnrReg())
.rssOwnrRegDesc(dto.getRssOwnrRegDesc())
.rssSts(dto.getRssSts()) .rssSts(dto.getRssSts())
.rssStsDesc(dto.getRssStsDesc())
.build(); .build();
} }
} }

파일 보기

@ -0,0 +1,163 @@
package com.snp.batch.jobs.datasync.batch.risk.reader;
import com.snp.batch.common.util.TableMetaInfo;
import com.snp.batch.jobs.datasync.batch.risk.dto.RiskChangeDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.*;
/**
* Risk 변경 이력 Reader
* tb_ship_risk_detail_hstry (스냅샷 이력)에서 imo_no별 시계열 비교하여 변경분 감지
*/
@Slf4j
public class RiskChangeReader implements ItemReader<RiskChangeDto> {
private final TableMetaInfo tableMetaInfo;
private final JdbcTemplate batchJdbcTemplate;
private final String targetSchema;
private List<RiskChangeDto> changeBuffer = new ArrayList<>();
private boolean initialized = false;
private static final List<String> INDICATOR_COLUMNS = List.of(
"ais_notrcv_elps_days", "ais_lwrnk_days", "ais_up_imo_desc",
"othr_ship_nm_voy_yn", "mmsi_anom_message", "recent_dark_actv",
"port_prtcll", "port_risk", "sts_job", "drift_chg",
"risk_event", "ntnlty_chg", "ntnlty_prs_mou_perf",
"ntnlty_tky_mou_perf", "ntnlty_uscg_mou_perf", "uscg_excl_ship_cert",
"psc_inspection_elps_hr", "psc_inspection", "psc_defect", "psc_detained",
"now_smgrc_evdc", "docc_chg", "now_clfic", "clfic_status_chg",
"pni_insrnc", "ship_nm_chg", "gbo_chg", "vslage",
"ilgl_fshr_viol", "draft_chg", "recent_sanction_prtcll", "sngl_ship_voy",
"fltsfty", "flt_psc", "spc_inspection_ovdue", "ownr_unk",
"rss_port_call", "rss_ownr_reg", "rss_sts"
);
public RiskChangeReader(@Qualifier("batchDataSource") DataSource batchDataSource,
TableMetaInfo tableMetaInfo,
@Value("${app.batch.target-schema.name}") String targetSchema) {
this.batchJdbcTemplate = new JdbcTemplate(batchDataSource);
this.tableMetaInfo = tableMetaInfo;
this.targetSchema = targetSchema;
}
@Override
public RiskChangeDto read() throws Exception {
if (!initialized) {
initializeChangeBuffer();
initialized = true;
}
if (changeBuffer.isEmpty()) return null;
return changeBuffer.remove(0);
}
private void initializeChangeBuffer() {
log.info("[RiskChangeReader] 변경 이력 감지 시작");
// 스냅샷 이력에서 indicator 컬럼만 조회 (imo_no, last_mdfcn_dt 정렬)
String columnList = String.join(", ", INDICATOR_COLUMNS);
String sql = String.format("""
SELECT imo_no, last_mdfcn_dt, %s
FROM %s.%s
ORDER BY imo_no, last_mdfcn_dt ASC
""", columnList, targetSchema, tableMetaInfo.targetTbShipRiskDetailHstry);
List<Map<String, Object>> allData = batchJdbcTemplate.queryForList(sql);
log.info("[RiskChangeReader] 스냅샷 이력 조회 완료: {} 건", allData.size());
// 기존 변경 이력 조회 (중복 방지)
String existingSql = String.format("""
SELECT imo_no, last_mdfcn_dt, flctn_col_nm
FROM %s.%s
""", targetSchema, tableMetaInfo.targetTbShipRiskDetailInfoHstry);
Set<String> existingKeys = new HashSet<>();
try {
List<Map<String, Object>> existingData = batchJdbcTemplate.queryForList(existingSql);
for (Map<String, Object> row : existingData) {
existingKeys.add(buildExistingKey(row));
}
log.info("[RiskChangeReader] 기존 변경 이력 데이터: {} 건", existingKeys.size());
} catch (Exception e) {
log.warn("[RiskChangeReader] 기존 데이터 조회 실패: {}", e.getMessage());
}
// imo_no별 그룹핑
Map<String, List<Map<String, Object>>> groupedByImo = new LinkedHashMap<>();
for (Map<String, Object> row : allData) {
String imoNo = (String) row.get("imo_no");
groupedByImo.computeIfAbsent(imoNo, k -> new ArrayList<>()).add(row);
}
// imo_no별 시계열 비교
long changeCount = 0;
for (Map.Entry<String, List<Map<String, Object>>> entry : groupedByImo.entrySet()) {
String imoNo = entry.getKey();
List<Map<String, Object>> records = entry.getValue();
Map<String, Object> previousRecord = null;
for (Map<String, Object> currentRecord : records) {
if (previousRecord != null) {
List<RiskChangeDto> changes = detectChanges(imoNo, previousRecord, currentRecord, existingKeys);
changeBuffer.addAll(changes);
changeCount += changes.size();
}
previousRecord = currentRecord;
}
}
log.info("[RiskChangeReader] 변경 이력 감지 완료: {} 건 (imo_no 그룹: {} 개)", changeCount, groupedByImo.size());
}
private List<RiskChangeDto> detectChanges(String imoNo,
Map<String, Object> previousRecord,
Map<String, Object> currentRecord,
Set<String> existingKeys) {
List<RiskChangeDto> changes = new ArrayList<>();
Timestamp currentTs = (Timestamp) currentRecord.get("last_mdfcn_dt");
LocalDateTime lastMdfcnDt = currentTs != null ? currentTs.toLocalDateTime() : null;
for (String column : INDICATOR_COLUMNS) {
Object prevValue = previousRecord.get(column);
Object currValue = currentRecord.get(column);
if (!Objects.equals(prevValue, currValue)) {
String existingKey = imoNo + "|" + lastMdfcnDt + "|" + column;
if (existingKeys.contains(existingKey)) continue;
changes.add(RiskChangeDto.builder()
.jobExecutionId(0L)
.imoNo(imoNo)
.lastMdfcnDt(lastMdfcnDt)
.flctnColNm(column)
.bfrVal(convertToString(prevValue))
.aftrVal(convertToString(currValue))
.build());
}
}
return changes;
}
private String buildExistingKey(Map<String, Object> row) {
String imoNo = (String) row.get("imo_no");
Object lastMdfcnDtObj = row.get("last_mdfcn_dt");
LocalDateTime lastMdfcnDt = null;
if (lastMdfcnDtObj instanceof Timestamp) {
lastMdfcnDt = ((Timestamp) lastMdfcnDtObj).toLocalDateTime();
}
String flctnColNm = (String) row.get("flctn_col_nm");
return imoNo + "|" + lastMdfcnDt + "|" + flctnColNm;
}
private String convertToString(Object value) {
if (value == null) return "null";
return String.valueOf(value);
}
}

파일 보기

@ -31,46 +31,88 @@ public class RiskReader extends BaseSyncReader<RiskDto> {
.jobExecutionId(targetId) .jobExecutionId(targetId)
.imoNo(rs.getString("imo_no")) .imoNo(rs.getString("imo_no"))
.lastMdfcnDt(lastMdfcnDtTs != null ? lastMdfcnDtTs.toLocalDateTime() : null) .lastMdfcnDt(lastMdfcnDtTs != null ? lastMdfcnDtTs.toLocalDateTime() : null)
.riskDataMaint(rs.getString("risk_data_maint")) .riskDataMaint(rs.getObject("risk_data_maint") != null ? rs.getInt("risk_data_maint") : null)
.aisNotrcvElpsDays(rs.getString("ais_notrcv_elps_days")) .aisNotrcvElpsDays(rs.getObject("ais_notrcv_elps_days") != null ? rs.getInt("ais_notrcv_elps_days") : null)
.aisLwrnkDays(rs.getString("ais_lwrnk_days")) .aisNotrcvElpsDaysDesc(rs.getString("ais_notrcv_elps_days_desc"))
.aisUpImoDesc(rs.getString("ais_up_imo_desc")) .aisLwrnkDays(rs.getObject("ais_lwrnk_days") != null ? rs.getInt("ais_lwrnk_days") : null)
.othrShipNmVoyYn(rs.getString("othr_ship_nm_voy_yn")) .aisLwrnkDaysDesc(rs.getString("ais_lwrnk_days_desc"))
.mmsiAnomMessage(rs.getString("mmsi_anom_message")) .aisUpImoDesc(rs.getObject("ais_up_imo_desc") != null ? rs.getInt("ais_up_imo_desc") : null)
.recentDarkActv(rs.getString("recent_dark_actv")) .aisUpImoDescVal(rs.getString("ais_up_imo_desc_val"))
.portPrtcll(rs.getString("port_prtcll")) .othrShipNmVoyYn(rs.getObject("othr_ship_nm_voy_yn") != null ? rs.getInt("othr_ship_nm_voy_yn") : null)
.portRisk(rs.getString("port_risk")) .othrShipNmVoyYnDesc(rs.getString("othr_ship_nm_voy_yn_desc"))
.stsJob(rs.getString("sts_job")) .mmsiAnomMessage(rs.getObject("mmsi_anom_message") != null ? rs.getInt("mmsi_anom_message") : null)
.driftChg(rs.getString("drift_chg")) .mmsiAnomMessageDesc(rs.getString("mmsi_anom_message_desc"))
.riskEvent(rs.getString("risk_event")) .recentDarkActv(rs.getObject("recent_dark_actv") != null ? rs.getInt("recent_dark_actv") : null)
.ntnltyChg(rs.getString("ntnlty_chg")) .recentDarkActvDesc(rs.getString("recent_dark_actv_desc"))
.ntnltyPrsMouPerf(rs.getString("ntnlty_prs_mou_perf")) .portPrtcll(rs.getObject("port_prtcll") != null ? rs.getInt("port_prtcll") : null)
.ntnltyTkyMouPerf(rs.getString("ntnlty_tky_mou_perf")) .portPrtcllDesc(rs.getString("port_prtcll_desc"))
.ntnltyUscgMouPerf(rs.getString("ntnlty_uscg_mou_perf")) .portRisk(rs.getObject("port_risk") != null ? rs.getInt("port_risk") : null)
.uscgExclShipCert(rs.getString("uscg_excl_ship_cert")) .portRiskDesc(rs.getString("port_risk_desc"))
.pscInspectionElpsHr(rs.getString("psc_inspection_elps_hr")) .stsJob(rs.getObject("sts_job") != null ? rs.getInt("sts_job") : null)
.pscInspection(rs.getString("psc_inspection")) .stsJobDesc(rs.getString("sts_job_desc"))
.pscDefect(rs.getString("psc_defect")) .driftChg(rs.getObject("drift_chg") != null ? rs.getInt("drift_chg") : null)
.pscDetained(rs.getString("psc_detained")) .driftChgDesc(rs.getString("drift_chg_desc"))
.nowSmgrcEvdc(rs.getString("now_smgrc_evdc")) .riskEvent(rs.getObject("risk_event") != null ? rs.getInt("risk_event") : null)
.doccChg(rs.getString("docc_chg")) .riskEventDesc(rs.getString("risk_event_desc"))
.nowClfic(rs.getString("now_clfic")) .riskEventDescExt(rs.getString("risk_event_desc_ext"))
.clficStatusChg(rs.getString("clfic_status_chg")) .ntnltyChg(rs.getObject("ntnlty_chg") != null ? rs.getInt("ntnlty_chg") : null)
.pniInsrnc(rs.getString("pni_insrnc")) .ntnltyChgDesc(rs.getString("ntnlty_chg_desc"))
.shipNmChg(rs.getString("ship_nm_chg")) .ntnltyPrsMouPerf(rs.getObject("ntnlty_prs_mou_perf") != null ? rs.getInt("ntnlty_prs_mou_perf") : null)
.gboChg(rs.getString("gbo_chg")) .ntnltyPrsMouPerfDesc(rs.getString("ntnlty_prs_mou_perf_desc"))
.vslage(rs.getString("vslage")) .ntnltyTkyMouPerf(rs.getObject("ntnlty_tky_mou_perf") != null ? rs.getInt("ntnlty_tky_mou_perf") : null)
.ilglFshrViol(rs.getString("ilgl_fshr_viol")) .ntnltyTkyMouPerfDesc(rs.getString("ntnlty_tky_mou_perf_desc"))
.draftChg(rs.getString("draft_chg")) .ntnltyUscgMouPerf(rs.getObject("ntnlty_uscg_mou_perf") != null ? rs.getInt("ntnlty_uscg_mou_perf") : null)
.recentSanctionPrtcll(rs.getString("recent_sanction_prtcll")) .ntnltyUscgMouPerfDesc(rs.getString("ntnlty_uscg_mou_perf_desc"))
.snglShipVoy(rs.getString("sngl_ship_voy")) .uscgExclShipCert(rs.getObject("uscg_excl_ship_cert") != null ? rs.getInt("uscg_excl_ship_cert") : null)
.fltsfty(rs.getString("fltsfty")) .uscgExclShipCertDesc(rs.getString("uscg_excl_ship_cert_desc"))
.fltPsc(rs.getString("flt_psc")) .pscInspectionElpsHr(rs.getObject("psc_inspection_elps_hr") != null ? rs.getInt("psc_inspection_elps_hr") : null)
.spcInspectionOvdue(rs.getString("spc_inspection_ovdue")) .pscInspectionElpsHrDesc(rs.getString("psc_inspection_elps_hr_desc"))
.ownrUnk(rs.getString("ownr_unk")) .pscInspection(rs.getObject("psc_inspection") != null ? rs.getInt("psc_inspection") : null)
.rssPortCall(rs.getString("rss_port_call")) .pscInspectionDesc(rs.getString("psc_inspection_desc"))
.rssOwnrReg(rs.getString("rss_ownr_reg")) .pscDefect(rs.getObject("psc_defect") != null ? rs.getInt("psc_defect") : null)
.rssSts(rs.getString("rss_sts")) .pscDefectDesc(rs.getString("psc_defect_desc"))
.pscDetained(rs.getObject("psc_detained") != null ? rs.getInt("psc_detained") : null)
.pscDetainedDesc(rs.getString("psc_detained_desc"))
.nowSmgrcEvdc(rs.getObject("now_smgrc_evdc") != null ? rs.getInt("now_smgrc_evdc") : null)
.nowSmgrcEvdcDesc(rs.getString("now_smgrc_evdc_desc"))
.doccChg(rs.getObject("docc_chg") != null ? rs.getInt("docc_chg") : null)
.doccChgDesc(rs.getString("docc_chg_desc"))
.nowClfic(rs.getObject("now_clfic") != null ? rs.getInt("now_clfic") : null)
.nowClficDesc(rs.getString("now_clfic_desc"))
.nowClficDescExt(rs.getString("now_clfic_desc_ext"))
.clficStatusChg(rs.getObject("clfic_status_chg") != null ? rs.getInt("clfic_status_chg") : null)
.clficStatusChgDesc(rs.getString("clfic_status_chg_desc"))
.pniInsrnc(rs.getObject("pni_insrnc") != null ? rs.getInt("pni_insrnc") : null)
.pniInsrncDesc(rs.getString("pni_insrnc_desc"))
.pniInsrncDescExt(rs.getString("pni_insrnc_desc_ext"))
.shipNmChg(rs.getObject("ship_nm_chg") != null ? rs.getInt("ship_nm_chg") : null)
.shipNmChgDesc(rs.getString("ship_nm_chg_desc"))
.gboChg(rs.getObject("gbo_chg") != null ? rs.getInt("gbo_chg") : null)
.gboChgDesc(rs.getString("gbo_chg_desc"))
.vslage(rs.getObject("vslage") != null ? rs.getInt("vslage") : null)
.vslageDesc(rs.getString("vslage_desc"))
.ilglFshrViol(rs.getObject("ilgl_fshr_viol") != null ? rs.getInt("ilgl_fshr_viol") : null)
.ilglFshrViolDesc(rs.getString("ilgl_fshr_viol_desc"))
.draftChg(rs.getObject("draft_chg") != null ? rs.getInt("draft_chg") : null)
.draftChgDesc(rs.getString("draft_chg_desc"))
.recentSanctionPrtcll(rs.getObject("recent_sanction_prtcll") != null ? rs.getInt("recent_sanction_prtcll") : null)
.recentSanctionPrtcllDesc(rs.getString("recent_sanction_prtcll_desc"))
.snglShipVoy(rs.getObject("sngl_ship_voy") != null ? rs.getInt("sngl_ship_voy") : null)
.snglShipVoyDesc(rs.getString("sngl_ship_voy_desc"))
.fltsfty(rs.getObject("fltsfty") != null ? rs.getInt("fltsfty") : null)
.fltsftyDesc(rs.getString("fltsfty_desc"))
.fltPsc(rs.getObject("flt_psc") != null ? rs.getInt("flt_psc") : null)
.fltPscDesc(rs.getString("flt_psc_desc"))
.spcInspectionOvdue(rs.getObject("spc_inspection_ovdue") != null ? rs.getInt("spc_inspection_ovdue") : null)
.spcInspectionOvdueDesc(rs.getString("spc_inspection_ovdue_desc"))
.ownrUnk(rs.getObject("ownr_unk") != null ? rs.getInt("ownr_unk") : null)
.ownrUnkDesc(rs.getString("ownr_unk_desc"))
.rssPortCall(rs.getObject("rss_port_call") != null ? rs.getInt("rss_port_call") : null)
.rssPortCallDesc(rs.getString("rss_port_call_desc"))
.rssOwnrReg(rs.getObject("rss_ownr_reg") != null ? rs.getInt("rss_ownr_reg") : null)
.rssOwnrRegDesc(rs.getString("rss_ownr_reg_desc"))
.rssSts(rs.getObject("rss_sts") != null ? rs.getInt("rss_sts") : null)
.rssStsDesc(rs.getString("rss_sts_desc"))
.build(); .build();
} }
} }

파일 보기

@ -1,5 +1,6 @@
package com.snp.batch.jobs.datasync.batch.risk.repository; package com.snp.batch.jobs.datasync.batch.risk.repository;
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskEntity; import com.snp.batch.jobs.datasync.batch.risk.entity.RiskEntity;
import java.util.List; import java.util.List;
@ -9,6 +10,7 @@ import java.util.List;
* 구현체: RiskRepositoryImpl (JdbcTemplate 기반) * 구현체: RiskRepositoryImpl (JdbcTemplate 기반)
*/ */
public interface RiskRepository { public interface RiskRepository {
void saveRisk(List<RiskEntity> riskEntityList); void saveRiskDetail(List<RiskEntity> riskEntityList);
void saveRiskHistory(List<RiskEntity> riskEntityList); void saveRiskDetailHistory(List<RiskEntity> riskEntityList);
void saveRiskDetailChangeHistory(List<RiskChangeEntity> changeEntityList);
} }

파일 보기

@ -2,6 +2,7 @@ package com.snp.batch.jobs.datasync.batch.risk.repository;
import com.snp.batch.common.batch.repository.MultiDataSourceJdbcRepository; import com.snp.batch.common.batch.repository.MultiDataSourceJdbcRepository;
import com.snp.batch.common.util.TableMetaInfo; import com.snp.batch.common.util.TableMetaInfo;
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskEntity; import com.snp.batch.jobs.datasync.batch.risk.entity.RiskEntity;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
@ -12,6 +13,7 @@ import org.springframework.stereotype.Repository;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.sql.Types;
import java.util.List; import java.util.List;
/** /**
@ -21,145 +23,140 @@ import java.util.List;
@Repository("riskRepository") @Repository("riskRepository")
public class RiskRepositoryImpl extends MultiDataSourceJdbcRepository<RiskEntity, Long> implements RiskRepository { public class RiskRepositoryImpl extends MultiDataSourceJdbcRepository<RiskEntity, Long> implements RiskRepository {
private DataSource batchDataSource;
private DataSource businessDataSource;
private final TableMetaInfo tableMetaInfo; private final TableMetaInfo tableMetaInfo;
public RiskRepositoryImpl(@Qualifier("batchDataSource") DataSource batchDataSource, public RiskRepositoryImpl(@Qualifier("batchDataSource") DataSource batchDataSource,
@Qualifier("businessDataSource") DataSource businessDataSource, @Qualifier("businessDataSource") DataSource businessDataSource,
TableMetaInfo tableMetaInfo) { TableMetaInfo tableMetaInfo) {
super(new JdbcTemplate(batchDataSource), new JdbcTemplate(businessDataSource)); super(new JdbcTemplate(batchDataSource), new JdbcTemplate(businessDataSource));
this.batchDataSource = batchDataSource;
this.businessDataSource = businessDataSource;
this.tableMetaInfo = tableMetaInfo; this.tableMetaInfo = tableMetaInfo;
} }
@Override @Override protected String getTableName() { return null; }
protected String getTableName() { @Override protected RowMapper<RiskEntity> getRowMapper() { return null; }
return null; @Override protected Long extractId(RiskEntity entity) { return null; }
} @Override protected String getInsertSql() { return null; }
@Override protected String getUpdateSql() { return null; }
@Override protected void setInsertParameters(PreparedStatement ps, RiskEntity entity) {}
@Override protected void setUpdateParameters(PreparedStatement ps, RiskEntity entity) {}
@Override protected String getEntityName() { return null; }
@Override @Override
protected RowMapper<RiskEntity> getRowMapper() { public void saveRiskDetail(List<RiskEntity> riskEntityList) {
return null; if (riskEntityList == null || riskEntityList.isEmpty()) return;
} String sql = RiskSql.getRiskDetailUpsertSql(tableMetaInfo.targetTbShipRiskDetailInfo, "imo_no");
@Override
protected Long extractId(RiskEntity entity) {
return null;
}
@Override
protected String getInsertSql() {
return null;
}
@Override
protected String getUpdateSql() {
return null;
}
@Override
protected void setInsertParameters(PreparedStatement ps, RiskEntity entity) throws Exception {
}
@Override
protected void setUpdateParameters(PreparedStatement ps, RiskEntity entity) throws Exception {
}
@Override
protected String getEntityName() {
return null;
}
@Override
public void saveRisk(List<RiskEntity> riskEntityList) {
String sql = RiskSql.getRiskUpsertSql(tableMetaInfo.targetTbShipRiskInfo, "imo_no");
if (riskEntityList == null || riskEntityList.isEmpty()) {
return;
}
// log.debug("{} 배치 삽입 시작: {} 건", "RiskEntity", riskEntityList.size());
batchJdbcTemplate.batchUpdate(sql, riskEntityList, riskEntityList.size(), batchJdbcTemplate.batchUpdate(sql, riskEntityList, riskEntityList.size(),
(ps, entity) -> { (ps, entity) -> { try { bindRiskDetail(ps, entity); } catch (Exception e) { throw new RuntimeException(e); } });
try {
bindRisk(ps, entity);
} catch (Exception e) {
log.error("배치 삽입 파라미터 설정 실패", e);
throw new RuntimeException(e);
}
});
// log.debug("{} 배치 삽입 완료: {} 건", "RiskEntity", riskEntityList.size());
} }
@Override @Override
public void saveRiskHistory(List<RiskEntity> riskEntityList) { public void saveRiskDetailHistory(List<RiskEntity> riskEntityList) {
String sql = RiskSql.getRiskUpsertSql(tableMetaInfo.targetTbShipRiskHstry, "imo_no, last_mdfcn_dt"); if (riskEntityList == null || riskEntityList.isEmpty()) return;
if (riskEntityList == null || riskEntityList.isEmpty()) { String sql = RiskSql.getRiskDetailUpsertSql(tableMetaInfo.targetTbShipRiskDetailHstry, "imo_no, last_mdfcn_dt");
return;
}
// log.debug("{} 배치 삽입 시작: {} 건", "RiskEntity", riskEntityList.size());
batchJdbcTemplate.batchUpdate(sql, riskEntityList, riskEntityList.size(), batchJdbcTemplate.batchUpdate(sql, riskEntityList, riskEntityList.size(),
(ps, entity) -> { try { bindRiskDetail(ps, entity); } catch (Exception e) { throw new RuntimeException(e); } });
}
@Override
public void saveRiskDetailChangeHistory(List<RiskChangeEntity> changeEntityList) {
if (changeEntityList == null || changeEntityList.isEmpty()) return;
String sql = RiskSql.getChangeHistoryInsertSql(tableMetaInfo.targetTbShipRiskDetailInfoHstry);
batchJdbcTemplate.batchUpdate(sql, changeEntityList, changeEntityList.size(),
(ps, entity) -> { (ps, entity) -> {
try { ps.setString(1, entity.getImoNo());
bindRisk(ps, entity); ps.setTimestamp(2, entity.getLastMdfcnDt() != null ? Timestamp.valueOf(entity.getLastMdfcnDt()) : null);
} catch (Exception e) { ps.setString(3, entity.getFlctnColNm());
log.error("배치 삽입 파라미터 설정 실패", e); ps.setString(4, entity.getBfrVal());
throw new RuntimeException(e); ps.setString(5, entity.getAftrVal());
}
}); });
// log.debug("{} 배치 삽입 완료: {} 건", "RiskEntity", riskEntityList.size());
} }
public void bindRisk(PreparedStatement pstmt, RiskEntity entity) throws Exception { private void bindRiskDetail(PreparedStatement ps, RiskEntity e) throws Exception {
int idx = 1; int idx = 1;
pstmt.setString(idx++, "SYSTEM"); // 1. creatr_id ps.setString(idx++, "SYSTEM");
pstmt.setString(idx++, entity.getImoNo()); // 2. imo_no ps.setString(idx++, e.getImoNo());
pstmt.setTimestamp(idx++, entity.getLastMdfcnDt() != null ? Timestamp.valueOf(entity.getLastMdfcnDt()) : null); // 3. last_mdfcn_dt ps.setTimestamp(idx++, e.getLastMdfcnDt() != null ? Timestamp.valueOf(e.getLastMdfcnDt()) : null);
pstmt.setString(idx++, entity.getRiskDataMaint()); // 4. risk_data_maint ps.setObject(idx++, e.getRiskDataMaint(), Types.INTEGER);
pstmt.setString(idx++, entity.getAisNotrcvElpsDays()); // 5. ais_notrcv_elps_days ps.setObject(idx++, e.getAisNotrcvElpsDays(), Types.INTEGER);
pstmt.setString(idx++, entity.getAisLwrnkDays()); // 6. ais_lwrnk_days ps.setString(idx++, e.getAisNotrcvElpsDaysDesc());
pstmt.setString(idx++, entity.getAisUpImoDesc()); // 7. ais_up_imo_desc ps.setObject(idx++, e.getAisLwrnkDays(), Types.INTEGER);
pstmt.setString(idx++, entity.getOthrShipNmVoyYn()); // 8. othr_ship_nm_voy_yn ps.setString(idx++, e.getAisLwrnkDaysDesc());
pstmt.setString(idx++, entity.getMmsiAnomMessage()); // 9. mmsi_anom_message ps.setObject(idx++, e.getAisUpImoDesc(), Types.INTEGER);
pstmt.setString(idx++, entity.getRecentDarkActv()); // 10. recent_dark_actv ps.setString(idx++, e.getAisUpImoDescVal());
pstmt.setString(idx++, entity.getPortPrtcll()); // 11. port_prtcll ps.setObject(idx++, e.getOthrShipNmVoyYn(), Types.INTEGER);
pstmt.setString(idx++, entity.getPortRisk()); // 12. port_risk ps.setString(idx++, e.getOthrShipNmVoyYnDesc());
pstmt.setString(idx++, entity.getStsJob()); // 13. sts_job ps.setObject(idx++, e.getMmsiAnomMessage(), Types.INTEGER);
pstmt.setString(idx++, entity.getDriftChg()); // 14. drift_chg ps.setString(idx++, e.getMmsiAnomMessageDesc());
pstmt.setString(idx++, entity.getRiskEvent()); // 15. risk_event ps.setObject(idx++, e.getRecentDarkActv(), Types.INTEGER);
pstmt.setString(idx++, entity.getNtnltyChg()); // 16. ntnlty_chg ps.setString(idx++, e.getRecentDarkActvDesc());
pstmt.setString(idx++, entity.getNtnltyPrsMouPerf()); // 17. ntnlty_prs_mou_perf ps.setObject(idx++, e.getPortPrtcll(), Types.INTEGER);
pstmt.setString(idx++, entity.getNtnltyTkyMouPerf()); // 18. ntnlty_tky_mou_perf ps.setString(idx++, e.getPortPrtcllDesc());
pstmt.setString(idx++, entity.getNtnltyUscgMouPerf()); // 19. ntnlty_uscg_mou_perf ps.setObject(idx++, e.getPortRisk(), Types.INTEGER);
pstmt.setString(idx++, entity.getUscgExclShipCert()); // 20. uscg_excl_ship_cert ps.setString(idx++, e.getPortRiskDesc());
pstmt.setString(idx++, entity.getPscInspectionElpsHr()); // 21. psc_inspection_elps_hr ps.setObject(idx++, e.getStsJob(), Types.INTEGER);
pstmt.setString(idx++, entity.getPscInspection()); // 22. psc_inspection ps.setString(idx++, e.getStsJobDesc());
pstmt.setString(idx++, entity.getPscDefect()); // 23. psc_defect ps.setObject(idx++, e.getDriftChg(), Types.INTEGER);
pstmt.setString(idx++, entity.getPscDetained()); // 24. psc_detained ps.setString(idx++, e.getDriftChgDesc());
pstmt.setString(idx++, entity.getNowSmgrcEvdc()); // 25. now_smgrc_evdc ps.setObject(idx++, e.getRiskEvent(), Types.INTEGER);
pstmt.setString(idx++, entity.getDoccChg()); // 26. docc_chg ps.setString(idx++, e.getRiskEventDesc());
pstmt.setString(idx++, entity.getNowClfic()); // 27. now_clfic ps.setString(idx++, e.getRiskEventDescExt());
pstmt.setString(idx++, entity.getClficStatusChg()); // 28. clfic_status_chg ps.setObject(idx++, e.getNtnltyChg(), Types.INTEGER);
pstmt.setString(idx++, entity.getPniInsrnc()); // 29. pni_insrnc ps.setString(idx++, e.getNtnltyChgDesc());
pstmt.setString(idx++, entity.getShipNmChg()); // 30. ship_nm_chg ps.setObject(idx++, e.getNtnltyPrsMouPerf(), Types.INTEGER);
pstmt.setString(idx++, entity.getGboChg()); // 31. gbo_chg ps.setString(idx++, e.getNtnltyPrsMouPerfDesc());
pstmt.setString(idx++, entity.getVslage()); // 32. vslage ps.setObject(idx++, e.getNtnltyTkyMouPerf(), Types.INTEGER);
pstmt.setString(idx++, entity.getIlglFshrViol()); // 33. ilgl_fshr_viol ps.setString(idx++, e.getNtnltyTkyMouPerfDesc());
pstmt.setString(idx++, entity.getDraftChg()); // 34. draft_chg ps.setObject(idx++, e.getNtnltyUscgMouPerf(), Types.INTEGER);
pstmt.setString(idx++, entity.getRecentSanctionPrtcll()); // 35. recent_sanction_prtcll ps.setString(idx++, e.getNtnltyUscgMouPerfDesc());
pstmt.setString(idx++, entity.getSnglShipVoy()); // 36. sngl_ship_voy ps.setObject(idx++, e.getUscgExclShipCert(), Types.INTEGER);
pstmt.setString(idx++, entity.getFltsfty()); // 37. fltsfty ps.setString(idx++, e.getUscgExclShipCertDesc());
pstmt.setString(idx++, entity.getFltPsc()); // 38. flt_psc ps.setObject(idx++, e.getPscInspectionElpsHr(), Types.INTEGER);
pstmt.setString(idx++, entity.getSpcInspectionOvdue()); // 39. spc_inspection_ovdue ps.setString(idx++, e.getPscInspectionElpsHrDesc());
pstmt.setString(idx++, entity.getOwnrUnk()); // 40. ownr_unk ps.setObject(idx++, e.getPscInspection(), Types.INTEGER);
pstmt.setString(idx++, entity.getRssPortCall()); // 41. rss_port_call ps.setString(idx++, e.getPscInspectionDesc());
pstmt.setString(idx++, entity.getRssOwnrReg()); // 42. rss_ownr_reg ps.setObject(idx++, e.getPscDefect(), Types.INTEGER);
pstmt.setString(idx++, entity.getRssSts()); // 43. rss_sts ps.setString(idx++, e.getPscDefectDesc());
ps.setObject(idx++, e.getPscDetained(), Types.INTEGER);
ps.setString(idx++, e.getPscDetainedDesc());
ps.setObject(idx++, e.getNowSmgrcEvdc(), Types.INTEGER);
ps.setString(idx++, e.getNowSmgrcEvdcDesc());
ps.setObject(idx++, e.getDoccChg(), Types.INTEGER);
ps.setString(idx++, e.getDoccChgDesc());
ps.setObject(idx++, e.getNowClfic(), Types.INTEGER);
ps.setString(idx++, e.getNowClficDesc());
ps.setString(idx++, e.getNowClficDescExt());
ps.setObject(idx++, e.getClficStatusChg(), Types.INTEGER);
ps.setString(idx++, e.getClficStatusChgDesc());
ps.setObject(idx++, e.getPniInsrnc(), Types.INTEGER);
ps.setString(idx++, e.getPniInsrncDesc());
ps.setString(idx++, e.getPniInsrncDescExt());
ps.setObject(idx++, e.getShipNmChg(), Types.INTEGER);
ps.setString(idx++, e.getShipNmChgDesc());
ps.setObject(idx++, e.getGboChg(), Types.INTEGER);
ps.setString(idx++, e.getGboChgDesc());
ps.setObject(idx++, e.getVslage(), Types.INTEGER);
ps.setString(idx++, e.getVslageDesc());
ps.setObject(idx++, e.getIlglFshrViol(), Types.INTEGER);
ps.setString(idx++, e.getIlglFshrViolDesc());
ps.setObject(idx++, e.getDraftChg(), Types.INTEGER);
ps.setString(idx++, e.getDraftChgDesc());
ps.setObject(idx++, e.getRecentSanctionPrtcll(), Types.INTEGER);
ps.setString(idx++, e.getRecentSanctionPrtcllDesc());
ps.setObject(idx++, e.getSnglShipVoy(), Types.INTEGER);
ps.setString(idx++, e.getSnglShipVoyDesc());
ps.setObject(idx++, e.getFltsfty(), Types.INTEGER);
ps.setString(idx++, e.getFltsftyDesc());
ps.setObject(idx++, e.getFltPsc(), Types.INTEGER);
ps.setString(idx++, e.getFltPscDesc());
ps.setObject(idx++, e.getSpcInspectionOvdue(), Types.INTEGER);
ps.setString(idx++, e.getSpcInspectionOvdueDesc());
ps.setObject(idx++, e.getOwnrUnk(), Types.INTEGER);
ps.setString(idx++, e.getOwnrUnkDesc());
ps.setObject(idx++, e.getRssPortCall(), Types.INTEGER);
ps.setString(idx++, e.getRssPortCallDesc());
ps.setObject(idx++, e.getRssOwnrReg(), Types.INTEGER);
ps.setString(idx++, e.getRssOwnrRegDesc());
ps.setObject(idx++, e.getRssSts(), Types.INTEGER);
ps.setString(idx++, e.getRssStsDesc());
} }
} }

파일 보기

@ -6,85 +6,135 @@ import org.springframework.stereotype.Component;
@Component @Component
public class RiskSql { public class RiskSql {
private static String TARGET_SCHEMA; private static String TARGET_SCHEMA;
public RiskSql(@Value("${app.batch.target-schema.name}") String targetSchema) { public RiskSql(@Value("${app.batch.target-schema.name}") String targetSchema) {
TARGET_SCHEMA = targetSchema; TARGET_SCHEMA = targetSchema;
} }
public static String getRiskUpsertSql(String targetTable, String targetIndex) { /**
* Risk Detail UPSERT SQL (tb_ship_risk_detail_info, tb_ship_risk_detail_hstry 공통)
*/
public static String getRiskDetailUpsertSql(String targetTable, String targetIndex) {
return """ return """
INSERT INTO %s.%s ( INSERT INTO %s.%s (
crt_dt, creatr_id, crt_dt, creatr_id,
imo_no, last_mdfcn_dt, risk_data_maint, ais_notrcv_elps_days, imo_no, last_mdfcn_dt, risk_data_maint,
ais_lwrnk_days, ais_up_imo_desc, othr_ship_nm_voy_yn, mmsi_anom_message, ais_notrcv_elps_days, ais_notrcv_elps_days_desc,
recent_dark_actv, port_prtcll, port_risk, sts_job, ais_lwrnk_days, ais_lwrnk_days_desc,
drift_chg, risk_event, ntnlty_chg, ntnlty_prs_mou_perf, ais_up_imo_desc, ais_up_imo_desc_val,
ntnlty_tky_mou_perf, ntnlty_uscg_mou_perf, uscg_excl_ship_cert, psc_inspection_elps_hr, othr_ship_nm_voy_yn, othr_ship_nm_voy_yn_desc,
psc_inspection, psc_defect, psc_detained, now_smgrc_evdc, mmsi_anom_message, mmsi_anom_message_desc,
docc_chg, now_clfic, clfic_status_chg, pni_insrnc, recent_dark_actv, recent_dark_actv_desc,
ship_nm_chg, gbo_chg, vslage, ilgl_fshr_viol, port_prtcll, port_prtcll_desc,
draft_chg, recent_sanction_prtcll, sngl_ship_voy, fltsfty, port_risk, port_risk_desc,
flt_psc, spc_inspection_ovdue, ownr_unk, rss_port_call, sts_job, sts_job_desc,
rss_ownr_reg, rss_sts drift_chg, drift_chg_desc,
risk_event, risk_event_desc, risk_event_desc_ext,
ntnlty_chg, ntnlty_chg_desc,
ntnlty_prs_mou_perf, ntnlty_prs_mou_perf_desc,
ntnlty_tky_mou_perf, ntnlty_tky_mou_perf_desc,
ntnlty_uscg_mou_perf, ntnlty_uscg_mou_perf_desc,
uscg_excl_ship_cert, uscg_excl_ship_cert_desc,
psc_inspection_elps_hr, psc_inspection_elps_hr_desc,
psc_inspection, psc_inspection_desc,
psc_defect, psc_defect_desc,
psc_detained, psc_detained_desc,
now_smgrc_evdc, now_smgrc_evdc_desc,
docc_chg, docc_chg_desc,
now_clfic, now_clfic_desc, now_clfic_desc_ext,
clfic_status_chg, clfic_status_chg_desc,
pni_insrnc, pni_insrnc_desc, pni_insrnc_desc_ext,
ship_nm_chg, ship_nm_chg_desc,
gbo_chg, gbo_chg_desc,
vslage, vslage_desc,
ilgl_fshr_viol, ilgl_fshr_viol_desc,
draft_chg, draft_chg_desc,
recent_sanction_prtcll, recent_sanction_prtcll_desc,
sngl_ship_voy, sngl_ship_voy_desc,
fltsfty, fltsfty_desc,
flt_psc, flt_psc_desc,
spc_inspection_ovdue, spc_inspection_ovdue_desc,
ownr_unk, ownr_unk_desc,
rss_port_call, rss_port_call_desc,
rss_ownr_reg, rss_ownr_reg_desc,
rss_sts, rss_sts_desc
) )
VALUES ( VALUES (
CURRENT_TIMESTAMP, ?, CURRENT_TIMESTAMP, ?,
?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?
?, ?
) )
ON CONFLICT (%s) ON CONFLICT (%s)
DO UPDATE SET DO UPDATE SET
mdfcn_dt = CURRENT_TIMESTAMP, mdfcn_dt = CURRENT_TIMESTAMP, mdfr_id = 'SYSTEM',
mdfr_id = 'SYSTEM', last_mdfcn_dt = EXCLUDED.last_mdfcn_dt, risk_data_maint = EXCLUDED.risk_data_maint,
last_mdfcn_dt = EXCLUDED.last_mdfcn_dt, ais_notrcv_elps_days = EXCLUDED.ais_notrcv_elps_days, ais_notrcv_elps_days_desc = EXCLUDED.ais_notrcv_elps_days_desc,
risk_data_maint = EXCLUDED.risk_data_maint, ais_lwrnk_days = EXCLUDED.ais_lwrnk_days, ais_lwrnk_days_desc = EXCLUDED.ais_lwrnk_days_desc,
ais_notrcv_elps_days = EXCLUDED.ais_notrcv_elps_days, ais_up_imo_desc = EXCLUDED.ais_up_imo_desc, ais_up_imo_desc_val = EXCLUDED.ais_up_imo_desc_val,
ais_lwrnk_days = EXCLUDED.ais_lwrnk_days, othr_ship_nm_voy_yn = EXCLUDED.othr_ship_nm_voy_yn, othr_ship_nm_voy_yn_desc = EXCLUDED.othr_ship_nm_voy_yn_desc,
ais_up_imo_desc = EXCLUDED.ais_up_imo_desc, mmsi_anom_message = EXCLUDED.mmsi_anom_message, mmsi_anom_message_desc = EXCLUDED.mmsi_anom_message_desc,
othr_ship_nm_voy_yn = EXCLUDED.othr_ship_nm_voy_yn, recent_dark_actv = EXCLUDED.recent_dark_actv, recent_dark_actv_desc = EXCLUDED.recent_dark_actv_desc,
mmsi_anom_message = EXCLUDED.mmsi_anom_message, port_prtcll = EXCLUDED.port_prtcll, port_prtcll_desc = EXCLUDED.port_prtcll_desc,
recent_dark_actv = EXCLUDED.recent_dark_actv, port_risk = EXCLUDED.port_risk, port_risk_desc = EXCLUDED.port_risk_desc,
port_prtcll = EXCLUDED.port_prtcll, sts_job = EXCLUDED.sts_job, sts_job_desc = EXCLUDED.sts_job_desc,
port_risk = EXCLUDED.port_risk, drift_chg = EXCLUDED.drift_chg, drift_chg_desc = EXCLUDED.drift_chg_desc,
sts_job = EXCLUDED.sts_job, risk_event = EXCLUDED.risk_event, risk_event_desc = EXCLUDED.risk_event_desc, risk_event_desc_ext = EXCLUDED.risk_event_desc_ext,
drift_chg = EXCLUDED.drift_chg, ntnlty_chg = EXCLUDED.ntnlty_chg, ntnlty_chg_desc = EXCLUDED.ntnlty_chg_desc,
risk_event = EXCLUDED.risk_event, ntnlty_prs_mou_perf = EXCLUDED.ntnlty_prs_mou_perf, ntnlty_prs_mou_perf_desc = EXCLUDED.ntnlty_prs_mou_perf_desc,
ntnlty_chg = EXCLUDED.ntnlty_chg, ntnlty_tky_mou_perf = EXCLUDED.ntnlty_tky_mou_perf, ntnlty_tky_mou_perf_desc = EXCLUDED.ntnlty_tky_mou_perf_desc,
ntnlty_prs_mou_perf = EXCLUDED.ntnlty_prs_mou_perf, ntnlty_uscg_mou_perf = EXCLUDED.ntnlty_uscg_mou_perf, ntnlty_uscg_mou_perf_desc = EXCLUDED.ntnlty_uscg_mou_perf_desc,
ntnlty_tky_mou_perf = EXCLUDED.ntnlty_tky_mou_perf, uscg_excl_ship_cert = EXCLUDED.uscg_excl_ship_cert, uscg_excl_ship_cert_desc = EXCLUDED.uscg_excl_ship_cert_desc,
ntnlty_uscg_mou_perf = EXCLUDED.ntnlty_uscg_mou_perf, psc_inspection_elps_hr = EXCLUDED.psc_inspection_elps_hr, psc_inspection_elps_hr_desc = EXCLUDED.psc_inspection_elps_hr_desc,
uscg_excl_ship_cert = EXCLUDED.uscg_excl_ship_cert, psc_inspection = EXCLUDED.psc_inspection, psc_inspection_desc = EXCLUDED.psc_inspection_desc,
psc_inspection_elps_hr = EXCLUDED.psc_inspection_elps_hr, psc_defect = EXCLUDED.psc_defect, psc_defect_desc = EXCLUDED.psc_defect_desc,
psc_inspection = EXCLUDED.psc_inspection, psc_detained = EXCLUDED.psc_detained, psc_detained_desc = EXCLUDED.psc_detained_desc,
psc_defect = EXCLUDED.psc_defect, now_smgrc_evdc = EXCLUDED.now_smgrc_evdc, now_smgrc_evdc_desc = EXCLUDED.now_smgrc_evdc_desc,
psc_detained = EXCLUDED.psc_detained, docc_chg = EXCLUDED.docc_chg, docc_chg_desc = EXCLUDED.docc_chg_desc,
now_smgrc_evdc = EXCLUDED.now_smgrc_evdc, now_clfic = EXCLUDED.now_clfic, now_clfic_desc = EXCLUDED.now_clfic_desc, now_clfic_desc_ext = EXCLUDED.now_clfic_desc_ext,
docc_chg = EXCLUDED.docc_chg, clfic_status_chg = EXCLUDED.clfic_status_chg, clfic_status_chg_desc = EXCLUDED.clfic_status_chg_desc,
now_clfic = EXCLUDED.now_clfic, pni_insrnc = EXCLUDED.pni_insrnc, pni_insrnc_desc = EXCLUDED.pni_insrnc_desc, pni_insrnc_desc_ext = EXCLUDED.pni_insrnc_desc_ext,
clfic_status_chg = EXCLUDED.clfic_status_chg, ship_nm_chg = EXCLUDED.ship_nm_chg, ship_nm_chg_desc = EXCLUDED.ship_nm_chg_desc,
pni_insrnc = EXCLUDED.pni_insrnc, gbo_chg = EXCLUDED.gbo_chg, gbo_chg_desc = EXCLUDED.gbo_chg_desc,
ship_nm_chg = EXCLUDED.ship_nm_chg, vslage = EXCLUDED.vslage, vslage_desc = EXCLUDED.vslage_desc,
gbo_chg = EXCLUDED.gbo_chg, ilgl_fshr_viol = EXCLUDED.ilgl_fshr_viol, ilgl_fshr_viol_desc = EXCLUDED.ilgl_fshr_viol_desc,
vslage = EXCLUDED.vslage, draft_chg = EXCLUDED.draft_chg, draft_chg_desc = EXCLUDED.draft_chg_desc,
ilgl_fshr_viol = EXCLUDED.ilgl_fshr_viol, recent_sanction_prtcll = EXCLUDED.recent_sanction_prtcll, recent_sanction_prtcll_desc = EXCLUDED.recent_sanction_prtcll_desc,
draft_chg = EXCLUDED.draft_chg, sngl_ship_voy = EXCLUDED.sngl_ship_voy, sngl_ship_voy_desc = EXCLUDED.sngl_ship_voy_desc,
recent_sanction_prtcll = EXCLUDED.recent_sanction_prtcll, fltsfty = EXCLUDED.fltsfty, fltsfty_desc = EXCLUDED.fltsfty_desc,
sngl_ship_voy = EXCLUDED.sngl_ship_voy, flt_psc = EXCLUDED.flt_psc, flt_psc_desc = EXCLUDED.flt_psc_desc,
fltsfty = EXCLUDED.fltsfty, spc_inspection_ovdue = EXCLUDED.spc_inspection_ovdue, spc_inspection_ovdue_desc = EXCLUDED.spc_inspection_ovdue_desc,
flt_psc = EXCLUDED.flt_psc, ownr_unk = EXCLUDED.ownr_unk, ownr_unk_desc = EXCLUDED.ownr_unk_desc,
spc_inspection_ovdue = EXCLUDED.spc_inspection_ovdue, rss_port_call = EXCLUDED.rss_port_call, rss_port_call_desc = EXCLUDED.rss_port_call_desc,
ownr_unk = EXCLUDED.ownr_unk, rss_ownr_reg = EXCLUDED.rss_ownr_reg, rss_ownr_reg_desc = EXCLUDED.rss_ownr_reg_desc,
rss_port_call = EXCLUDED.rss_port_call, rss_sts = EXCLUDED.rss_sts, rss_sts_desc = EXCLUDED.rss_sts_desc;
rss_ownr_reg = EXCLUDED.rss_ownr_reg,
rss_sts = EXCLUDED.rss_sts;
""".formatted(TARGET_SCHEMA, targetTable, targetIndex); """.formatted(TARGET_SCHEMA, targetTable, targetIndex);
} }
/**
* 변경 이력 INSERT SQL (tb_ship_risk_detail_info_hstry)
*/
public static String getChangeHistoryInsertSql(String targetTable) {
return """
INSERT INTO %s.%s (crt_dt, creatr_id, imo_no, last_mdfcn_dt, flctn_col_nm, bfr_val, aftr_val)
VALUES (CURRENT_TIMESTAMP, 'SYSTEM', ?, ?, ?, ?, ?)
ON CONFLICT (imo_no, last_mdfcn_dt, flctn_col_nm) DO NOTHING
""".formatted(TARGET_SCHEMA, targetTable);
}
/**
* 기존 데이터 조회 SQL (변경 비교용)
*/
public static String getSelectCurrentSql(String targetTable) {
return """
SELECT * FROM %s.%s WHERE imo_no = ?
""".formatted(TARGET_SCHEMA, targetTable);
}
} }

파일 보기

@ -0,0 +1,23 @@
package com.snp.batch.jobs.datasync.batch.risk.writer;
import com.snp.batch.jobs.datasync.batch.risk.entity.RiskChangeEntity;
import com.snp.batch.jobs.datasync.batch.risk.repository.RiskRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import java.util.ArrayList;
@Slf4j
@RequiredArgsConstructor
public class RiskChangeWriter implements ItemWriter<RiskChangeEntity> {
private final RiskRepository riskRepository;
@Override
public void write(Chunk<? extends RiskChangeEntity> chunk) throws Exception {
if (chunk.isEmpty()) return;
riskRepository.saveRiskDetailChangeHistory(new ArrayList<>(chunk.getItems()));
}
}

파일 보기

@ -19,10 +19,8 @@ public class RiskWriter extends BaseChunkedWriter<RiskEntity> {
@Override @Override
protected void writeItems(List<RiskEntity> items) throws Exception { protected void writeItems(List<RiskEntity> items) throws Exception {
if (items.isEmpty()) { if (items.isEmpty()) return;
return; riskRepository.saveRiskDetail(items); // 1. 최신 데이터 UPSERT
} riskRepository.saveRiskDetailHistory(items); // 2. 스냅샷 이력 UPSERT
riskRepository.saveRisk(items);
riskRepository.saveRiskHistory(items);
} }
} }

파일 보기

@ -0,0 +1,63 @@
package com.snp.batch.jobs.maintenance;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import java.time.LocalDateTime;
/**
* 배치 로그 정리 Job
* 보관 기간이 지난 Spring Batch 메타데이터를 삭제
*
* 실행 주기: 매주 일요일 00:00 (Quartz 스케줄)
*/
@Slf4j
@Configuration
@RequiredArgsConstructor
public class BatchLogCleanupJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final BatchLogCleanupRepository cleanupRepository;
@Value("${app.batch.log-retention-days:90}")
private int retentionDays;
@Bean
public Job batchLogCleanupJob() {
return new JobBuilder("batchLogCleanupJob", jobRepository)
.start(batchLogCleanupStep())
.build();
}
@Bean
public Step batchLogCleanupStep() {
return new StepBuilder("batchLogCleanupStep", jobRepository)
.tasklet(batchLogCleanupTasklet(), transactionManager)
.build();
}
@Bean
public Tasklet batchLogCleanupTasklet() {
return (contribution, chunkContext) -> {
LocalDateTime before = LocalDateTime.now().minusDays(retentionDays);
log.info("[LogCleanup] 배치 로그 정리 시작 (보관 기간: {}일, 기준일시: {})", retentionDays, before);
int totalDeleted = cleanupRepository.deleteOldLogs(before);
log.info("[LogCleanup] 배치 로그 정리 완료 (총 {}건 삭제)", totalDeleted);
return RepeatStatus.FINISHED;
};
}
}

파일 보기

@ -0,0 +1,93 @@
package com.snp.batch.jobs.maintenance;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.time.LocalDateTime;
/**
* 배치 로그 삭제 Repository
* FK 제약 조건 순서를 고려하여 삭제
*/
@Slf4j
@Repository
@RequiredArgsConstructor
public class BatchLogCleanupRepository {
private final JdbcTemplate jdbcTemplate;
/**
* 보관 기간이 지난 배치 로그를 삭제
* 삭제 순서: step_context step_execution job_context job_params job_execution job_instance(고아)
*
* @param before 시점 이전의 로그를 삭제
* @return 삭제된
*/
public int deleteOldLogs(LocalDateTime before) {
int total = 0;
// 1. batch_step_execution_context (step_execution FK)
int stepCtx = jdbcTemplate.update("""
DELETE FROM batch_step_execution_context
WHERE STEP_EXECUTION_ID IN (
SELECT se.STEP_EXECUTION_ID
FROM batch_step_execution se
INNER JOIN batch_job_execution je ON se.JOB_EXECUTION_ID = je.JOB_EXECUTION_ID
WHERE je.START_TIME < ?
)
""", before);
log.info("[LogCleanup] batch_step_execution_context: {}건 삭제", stepCtx);
total += stepCtx;
// 2. batch_step_execution
int stepExec = jdbcTemplate.update("""
DELETE FROM batch_step_execution
WHERE JOB_EXECUTION_ID IN (
SELECT JOB_EXECUTION_ID FROM batch_job_execution WHERE START_TIME < ?
)
""", before);
log.info("[LogCleanup] batch_step_execution: {}건 삭제", stepExec);
total += stepExec;
// 3. batch_job_execution_context
int jobCtx = jdbcTemplate.update("""
DELETE FROM batch_job_execution_context
WHERE JOB_EXECUTION_ID IN (
SELECT JOB_EXECUTION_ID FROM batch_job_execution WHERE START_TIME < ?
)
""", before);
log.info("[LogCleanup] batch_job_execution_context: {}건 삭제", jobCtx);
total += jobCtx;
// 4. batch_job_execution_params
int jobParams = jdbcTemplate.update("""
DELETE FROM batch_job_execution_params
WHERE JOB_EXECUTION_ID IN (
SELECT JOB_EXECUTION_ID FROM batch_job_execution WHERE START_TIME < ?
)
""", before);
log.info("[LogCleanup] batch_job_execution_params: {}건 삭제", jobParams);
total += jobParams;
// 5. batch_job_execution
int jobExec = jdbcTemplate.update("""
DELETE FROM batch_job_execution WHERE START_TIME < ?
""", before);
log.info("[LogCleanup] batch_job_execution: {}건 삭제", jobExec);
total += jobExec;
// 6. batch_job_instance (고아 인스턴스)
int jobInst = jdbcTemplate.update("""
DELETE FROM batch_job_instance
WHERE JOB_INSTANCE_ID NOT IN (
SELECT JOB_INSTANCE_ID FROM batch_job_execution
)
""");
log.info("[LogCleanup] batch_job_instance (고아): {}건 삭제", jobInst);
total += jobInst;
return total;
}
}

파일 보기

@ -104,6 +104,7 @@ app:
batch: batch:
chunk-size: 10000 chunk-size: 10000
sub-chunk-size: 5000 # Writer Sub-Chunk 분할 크기 sub-chunk-size: 5000 # Writer Sub-Chunk 분할 크기
log-retention-days: 90 # 배치 로그 보관 기간 (일)
source-schema: source-schema:
name: std_snp_data name: std_snp_data
tables: tables:
@ -151,7 +152,7 @@ app:
movements-008: tb_ship_trnst_hstry movements-008: tb_ship_trnst_hstry
code-001: tb_ship_type_cd code-001: tb_ship_type_cd
code-002: tb_ship_country_cd code-002: tb_ship_country_cd
risk-compliance-001: tb_ship_risk_info risk-compliance-001: tb_ship_risk_detail_info
risk-compliance-003: tb_ship_compliance_info risk-compliance-003: tb_ship_compliance_info
risk-compliance-006: tb_company_compliance_info risk-compliance-006: tb_company_compliance_info
target-schema: target-schema:
@ -201,8 +202,9 @@ app:
movements-008: tb_ship_trnst_hstry movements-008: tb_ship_trnst_hstry
code-001: tb_ship_type_cd code-001: tb_ship_type_cd
code-002: tb_ship_country_cd code-002: tb_ship_country_cd
risk-compliance-001: tb_ship_risk_info risk-compliance-001: tb_ship_risk_detail_info
risk-compliance-002: tb_ship_risk_hstry risk-compliance-001-1: tb_ship_risk_detail_info_hstry
risk-compliance-002: tb_ship_risk_detail_hstry
risk-compliance-003: tb_ship_compliance_info risk-compliance-003: tb_ship_compliance_info
risk-compliance-004: tb_ship_compliance_hstry risk-compliance-004: tb_ship_compliance_hstry
risk-compliance-005: tb_ship_compliance_info_hstry risk-compliance-005: tb_ship_compliance_info_hstry