From a81e68ad3030bcb4c7ef7f176b27a2c25c6dfe96 Mon Sep 17 00:00:00 2001 From: HYOJIN Date: Tue, 24 Mar 2026 09:33:44 +0900 Subject: [PATCH] =?UTF-8?q?fix(batch):=201=20chunk=20=3D=201=20job=5Fexecu?= =?UTF-8?q?tion=5Fid=20=EB=B3=B4=EC=9E=A5=20=EB=B0=8F=20BatchWriteListener?= =?UTF-8?q?=20SQL=20null=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - BaseSyncReader: 한 그룹 데이터 반환 후 null로 청크 종료하여 1 chunk = 1 job_execution_id 보장 - chunk(Integer.MAX_VALUE): Reader의 null로만 청크 경계 결정 - BatchWriteListener: SQL을 실행 시점에 생성하여 CommonSql.SOURCE_SCHEMA 초기화 전 null 참조 문제 해결 (빈 생성 시 → 실행 시 지연 생성) - JobConfig: BatchWriteListener에 SQL 대신 테이블명 전달 - Repository: 배치 삽입 시작/완료 로그 주석처리 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../common/batch/config/BaseJobConfig.java | 4 +- .../common/batch/processor/BaseProcessor.java | 2 +- .../common/batch/reader/BaseSyncReader.java | 77 ++++------ .../batch/repository/BaseJdbcRepository.java | 4 +- .../MultiDataSourceJdbcRepository.java | 4 +- .../batch/common/util/BatchWriteListener.java | 26 ++-- .../batch/code/config/CodeSyncJobConfig.java | 12 +- .../code/repository/CodeRepositoryImpl.java | 8 +- .../CompanyComplianceSyncJobConfig.java | 7 +- .../config/ShipComplianceSyncJobConfig.java | 7 +- .../repository/ComplianceRepositoryImpl.java | 16 +-- .../event/config/EventSyncJobConfig.java | 22 ++- .../event/repository/EventRepositoryImpl.java | 16 +-- .../config/FacilitySyncJobConfig.java | 7 +- .../repository/FacilityRepositoryImpl.java | 4 +- .../config/AnchorageCallSyncJobConfig.java | 7 +- .../config/BerthCallSyncJobConfig.java | 7 +- .../config/CurrentlyAtSyncJobConfig.java | 7 +- .../config/DestinationSyncJobConfig.java | 7 +- .../config/PortCallSyncJobConfig.java | 7 +- .../config/StsOperationSyncJobConfig.java | 7 +- .../config/TerminalCallSyncJobConfig.java | 7 +- .../movement/config/TransitSyncJobConfig.java | 7 +- .../repository/MovementRepositoryImpl.java | 32 ++--- .../batch/psc/config/PscSyncJobConfig.java | 17 +-- .../psc/repository/PscRepositoryImpl.java | 12 +- .../batch/risk/config/RiskSyncJobConfig.java | 7 +- .../risk/repository/RiskRepositoryImpl.java | 8 +- .../ship/config/ShipDetailSyncJobConfig.java | 132 +++++++----------- .../ship/repository/ShipRepositoryImpl.java | 100 ++++++------- 30 files changed, 254 insertions(+), 326 deletions(-) diff --git a/src/main/java/com/snp/batch/common/batch/config/BaseJobConfig.java b/src/main/java/com/snp/batch/common/batch/config/BaseJobConfig.java index bace6af..ba87605 100644 --- a/src/main/java/com/snp/batch/common/batch/config/BaseJobConfig.java +++ b/src/main/java/com/snp/batch/common/batch/config/BaseJobConfig.java @@ -94,7 +94,7 @@ public abstract class BaseJobConfig { if (processor != null) { var chunkBuilder = stepBuilder - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(processor) .writer(createWriter()); @@ -104,7 +104,7 @@ public abstract class BaseJobConfig { } else { @SuppressWarnings("unchecked") var chunkBuilder = stepBuilder - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .writer((ItemWriter) createWriter()); diff --git a/src/main/java/com/snp/batch/common/batch/processor/BaseProcessor.java b/src/main/java/com/snp/batch/common/batch/processor/BaseProcessor.java index 0add9cc..a9ad40c 100644 --- a/src/main/java/com/snp/batch/common/batch/processor/BaseProcessor.java +++ b/src/main/java/com/snp/batch/common/batch/processor/BaseProcessor.java @@ -55,7 +55,7 @@ public abstract class BaseProcessor implements ItemProcessor { return null; } - log.debug("데이터 처리 중: {}", item); +// log.debug("데이터 처리 중: {}", item); return processItem(item); } } diff --git a/src/main/java/com/snp/batch/common/batch/reader/BaseSyncReader.java b/src/main/java/com/snp/batch/common/batch/reader/BaseSyncReader.java index d89662b..7e47ecd 100644 --- a/src/main/java/com/snp/batch/common/batch/reader/BaseSyncReader.java +++ b/src/main/java/com/snp/batch/common/batch/reader/BaseSyncReader.java @@ -16,11 +16,10 @@ import java.util.List; /** * 동기화 Reader 추상 클래스 * - * job_execution_id 단위로 청크를 분리하는 2단계 read 방식: - * 1단계 (peek): 다음 그룹 ID만 조회, 현재 그룹과 다르면 null 반환 (청크 종료) - * 2단계 (fetch): 데이터 로드 + batch_flag N→P 전환 - * - * GroupByExecutionIdPolicy를 대체하여 Reader 자체에서 청크 경계를 제어한다. + * 1 chunk = 1 job_execution_id 보장: + * - 한 그룹의 데이터를 모두 반환한 후 null을 반환하여 청크 종료 + * - chunk(Integer.MAX_VALUE)와 함께 사용하여 Reader가 청크 경계를 제어 + * - 다음 그룹의 N→P 전환은 이전 그룹의 청크 처리(Write + P→S)가 완료된 후에만 발생 * * @param DTO 타입 (JobExecutionGroupable 구현 필요) */ @@ -32,7 +31,6 @@ public abstract class BaseSyncReader implements private List allDataBuffer = new ArrayList<>(); private Long currentGroupId = null; - private Long pendingGroupId = null; protected BaseSyncReader(DataSource businessDataSource, TableMetaInfo tableMetaInfo) { this.businessJdbcTemplate = new JdbcTemplate(businessDataSource); @@ -46,82 +44,55 @@ public abstract class BaseSyncReader implements /** * ResultSet → DTO 매핑 - * - * @param rs ResultSet (현재 row) - * @param targetId job_execution_id (DTO의 jobExecutionId 필드에 설정) - * @return 매핑된 DTO 객체 */ protected abstract T mapRow(ResultSet rs, Long targetId) throws SQLException; - /** - * 로그 접두사 (예: "IceClassReader") - */ protected String getLogPrefix() { return getClass().getSimpleName(); } @Override public T read() throws Exception { - // 1. buffer가 비어있으면 다음 그룹 확인 if (allDataBuffer.isEmpty()) { - // pending이 있으면 (이전 청크에서 감지된 다음 그룹) 바로 로드 - if (pendingGroupId != null) { - fetchAndTransition(pendingGroupId); - currentGroupId = pendingGroupId; - pendingGroupId = null; - } else { - // 다음 그룹 ID peek - Long nextId = peekNextGroupId(); - if (nextId == null) { - // 더 이상 처리할 데이터 없음 - currentGroupId = null; - return null; - } - - if (currentGroupId != null && !currentGroupId.equals(nextId)) { - // 다른 그룹 발견 → 현재 청크 종료, 다음 청크에서 처리 - pendingGroupId = nextId; - currentGroupId = null; - return null; - } - - // 같은 그룹이거나 첫 호출 → 로드 - fetchAndTransition(nextId); - currentGroupId = nextId; + // 이전 그룹 처리 완료 → null 반환하여 청크 종료 + // (Writer + afterWrite(P→S)가 실행된 후 다음 청크에서 다음 그룹 로드) + if (currentGroupId != null) { + currentGroupId = null; + return null; } + + // 다음 그룹 로드 + fetchNextGroup(); } if (allDataBuffer.isEmpty()) { - return null; + return null; // 더 이상 처리할 데이터 없음 → Step 종료 } return allDataBuffer.remove(0); } - /** - * 다음 처리 대상 job_execution_id 조회 (데이터 로드/전환 없음) - */ - private Long peekNextGroupId() { + private void fetchNextGroup() { + Long nextTargetId; try { - return businessJdbcTemplate.queryForObject( + nextTargetId = businessJdbcTemplate.queryForObject( CommonSql.getNextTargetQuery(getSourceTable()), Long.class); } catch (Exception e) { - return null; + return; } - } - /** - * 데이터 로드 + batch_flag N→P 전환 - */ - private void fetchAndTransition(Long targetId) { - log.info("[{}] 다음 처리 대상 ID 발견: {}", getLogPrefix(), targetId); + if (nextTargetId == null) return; + + log.info("[{}] 다음 처리 대상 ID 발견: {}", getLogPrefix(), nextTargetId); String sql = CommonSql.getTargetDataQuery(getSourceTable()); this.allDataBuffer = businessJdbcTemplate.query(sql, (rs, rowNum) -> - mapRow(rs, targetId), targetId); + mapRow(rs, nextTargetId), nextTargetId); // N→P 전환 String updateSql = CommonSql.getProcessBatchQuery(getSourceTable()); - businessJdbcTemplate.update(updateSql, targetId); + businessJdbcTemplate.update(updateSql, nextTargetId); + + currentGroupId = nextTargetId; } } diff --git a/src/main/java/com/snp/batch/common/batch/repository/BaseJdbcRepository.java b/src/main/java/com/snp/batch/common/batch/repository/BaseJdbcRepository.java index 76b0a5a..bf5797b 100644 --- a/src/main/java/com/snp/batch/common/batch/repository/BaseJdbcRepository.java +++ b/src/main/java/com/snp/batch/common/batch/repository/BaseJdbcRepository.java @@ -203,7 +203,7 @@ public abstract class BaseJdbcRepository { return; } - log.debug("{} 배치 삽입 시작: {} 건", getEntityName(), entities.size()); + // log.debug("{} 배치 삽입 시작: {} 건", getEntityName(), entities.size()); jdbcTemplate.batchUpdate(getInsertSql(), entities, entities.size(), (ps, entity) -> { @@ -215,7 +215,7 @@ public abstract class BaseJdbcRepository { } }); - log.debug("{} 배치 삽입 완료: {} 건", getEntityName(), entities.size()); + // log.debug("{} 배치 삽입 완료: {} 건", getEntityName(), entities.size()); } /** diff --git a/src/main/java/com/snp/batch/common/batch/repository/MultiDataSourceJdbcRepository.java b/src/main/java/com/snp/batch/common/batch/repository/MultiDataSourceJdbcRepository.java index c83e1b8..5fd4235 100644 --- a/src/main/java/com/snp/batch/common/batch/repository/MultiDataSourceJdbcRepository.java +++ b/src/main/java/com/snp/batch/common/batch/repository/MultiDataSourceJdbcRepository.java @@ -78,7 +78,7 @@ public abstract class MultiDataSourceJdbcRepository { return; } - log.debug("{} 배치 삽입 시작: {} 건 (Business DB)", getEntityName(), entities.size()); + // log.debug("{} 배치 삽입 시작: {} 건 (Business DB)", getEntityName(), entities.size()); // ⭐ businessJdbcTemplate 사용 businessJdbcTemplate.batchUpdate(getInsertSql(), entities, entities.size(), @@ -91,7 +91,7 @@ public abstract class MultiDataSourceJdbcRepository { } }); - log.debug("{} 배치 삽입 완료: {} 건", getEntityName(), entities.size()); + // log.debug("{} 배치 삽입 완료: {} 건", getEntityName(), entities.size()); } // ... (나머지 find, save, update, delete 메서드도 businessJdbcTemplate을 사용하여 구현합니다.) diff --git a/src/main/java/com/snp/batch/common/util/BatchWriteListener.java b/src/main/java/com/snp/batch/common/util/BatchWriteListener.java index 86daf25..cc4ffbc 100644 --- a/src/main/java/com/snp/batch/common/util/BatchWriteListener.java +++ b/src/main/java/com/snp/batch/common/util/BatchWriteListener.java @@ -1,51 +1,55 @@ package com.snp.batch.common.util; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.ItemWriteListener; import org.springframework.batch.item.Chunk; import org.springframework.jdbc.core.JdbcTemplate; +/** + * Writer 성공 후 batch_flag P→S 업데이트 리스너 + * + * SQL은 실행 시점에 생성 (CommonSql.SOURCE_SCHEMA 초기화 보장) + */ @Slf4j -@RequiredArgsConstructor public class BatchWriteListener implements ItemWriteListener { private final JdbcTemplate businessJdbcTemplate; - private final String updateSql; // 실행할 쿼리 (예: "UPDATE ... SET batch_flag = 'S' ...") + private final String sourceTable; + + public BatchWriteListener(JdbcTemplate businessJdbcTemplate, String sourceTable) { + this.businessJdbcTemplate = businessJdbcTemplate; + this.sourceTable = sourceTable; + } @Override public void afterWrite(Chunk items) { - // afterWrite는 Writer가 예외 없이 성공했을 때만 실행되는 것이 보장되어야 함 if (items.isEmpty()) return; Long jobExecutionId = items.getItems().get(0).getJobExecutionId(); try { - int updatedRows = businessJdbcTemplate.update(updateSql, jobExecutionId); + // SQL을 실행 시점에 생성하여 SOURCE_SCHEMA null 문제 방지 + String sql = CommonSql.getCompleteBatchQuery(sourceTable); + int updatedRows = businessJdbcTemplate.update(sql, jobExecutionId); log.info("[BatchWriteListener] Success update 'S'. jobExecutionId: {}, rows: {}", jobExecutionId, updatedRows); } catch (Exception e) { log.error("[BatchWriteListener] Update 'S' failed. jobExecutionId: {}", jobExecutionId, e); - // ❗중요: 리스너의 업데이트가 실패해도 배치를 중단시키려면 예외를 던져야 함 throw e; } } @Override public void onWriteError(Exception exception, Chunk items) { - // ⭐ Writer에서 에러가 발생하면 이 메서드가 호출됨 if (!items.isEmpty()) { Long jobExecutionId = items.getItems().get(0).getJobExecutionId(); log.error("[BatchWriteListener] Write Error Detected! jobExecutionId: {}. Status will NOT be updated to 'S'. Error: {}", jobExecutionId, exception.getMessage()); } - // ❗중요: 여기서 예외를 다시 던져야 배치가 중단(FAILED)됨 - // 만약 여기서 예외를 던지지 않으면 배치는 다음 청크를 계속 시도할 수 있음 if (exception instanceof RuntimeException) { throw (RuntimeException) exception; } else { throw new RuntimeException("Force stop batch due to write error", exception); } } - -} \ No newline at end of file +} diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/code/config/CodeSyncJobConfig.java b/src/main/java/com/snp/batch/jobs/datasync/batch/code/config/CodeSyncJobConfig.java index 60eab6a..815c89b 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/code/config/CodeSyncJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/code/config/CodeSyncJobConfig.java @@ -2,7 +2,7 @@ package com.snp.batch.jobs.datasync.batch.code.config; import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.util.BatchWriteListener; -import com.snp.batch.common.util.CommonSql; + import com.snp.batch.common.util.TableMetaInfo; import com.snp.batch.jobs.datasync.batch.code.dto.FlagCodeDto; import com.snp.batch.jobs.datasync.batch.code.dto.Stat5CodeDto; @@ -110,14 +110,12 @@ public class CodeSyncJobConfig extends BaseJobConfig flagCodeWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceFlagCode); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceFlagCode); } @Bean public BatchWriteListener stat5CodeWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceStat5Code); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceStat5Code); } // --- Steps --- @@ -126,7 +124,7 @@ public class CodeSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) @@ -138,7 +136,7 @@ public class CodeSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(stat5CodeReader(businessDataSource, tableMetaInfo)) .processor(new Stat5CodeProcessor()) .writer(new Stat5CodeWriter(codeRepository)) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/code/repository/CodeRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/datasync/batch/code/repository/CodeRepositoryImpl.java index 3890621..2553a7b 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/code/repository/CodeRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/code/repository/CodeRepositoryImpl.java @@ -80,7 +80,7 @@ public class CodeRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -92,7 +92,7 @@ public class CodeRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -123,7 +123,7 @@ public class CodeRepositoryImpl extends MultiDataSourceJdbcRepository companyComplianceWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceTbCompanyComplianceInfo); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceTbCompanyComplianceInfo); } // --- Steps --- @@ -106,7 +105,7 @@ public class CompanyComplianceSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/compliance/config/ShipComplianceSyncJobConfig.java b/src/main/java/com/snp/batch/jobs/datasync/batch/compliance/config/ShipComplianceSyncJobConfig.java index 4ecaca4..0916db8 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/compliance/config/ShipComplianceSyncJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/compliance/config/ShipComplianceSyncJobConfig.java @@ -2,7 +2,7 @@ package com.snp.batch.jobs.datasync.batch.compliance.config; import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.util.BatchWriteListener; -import com.snp.batch.common.util.CommonSql; + import com.snp.batch.common.util.TableMetaInfo; import com.snp.batch.jobs.datasync.batch.compliance.dto.ShipComplianceDto; import com.snp.batch.jobs.datasync.batch.compliance.entity.ShipComplianceEntity; @@ -96,8 +96,7 @@ public class ShipComplianceSyncJobConfig extends BaseJobConfig shipComplianceWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceCompliance); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceCompliance); } // --- Steps --- @@ -106,7 +105,7 @@ public class ShipComplianceSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/compliance/repository/ComplianceRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/datasync/batch/compliance/repository/ComplianceRepositoryImpl.java index e6b0251..3f6b343 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/compliance/repository/ComplianceRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/compliance/repository/ComplianceRepositoryImpl.java @@ -80,7 +80,7 @@ public class ComplianceRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -92,7 +92,7 @@ public class ComplianceRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -134,7 +134,7 @@ public class ComplianceRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -293,7 +293,7 @@ public class ComplianceRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -324,7 +324,7 @@ public class ComplianceRepositoryImpl extends MultiDataSourceJdbcRepository { @Bean public BatchWriteListener eventWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceEvent); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceEvent); } @Bean public BatchWriteListener eventCargoWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceEventCargo); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceEventCargo); } @Bean public BatchWriteListener eventHumanCasualtyWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceEventHumanCasualty); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceEventHumanCasualty); } @Bean public BatchWriteListener eventRelationshipWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceEventRelationship); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceEventRelationship); } // --- Steps --- @@ -172,7 +168,7 @@ public class EventSyncJobConfig extends BaseJobConfig { public Step eventSyncStep() { log.info("Step 생성: eventSyncStep"); return new StepBuilder(getStepName(), jobRepository) - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) @@ -184,7 +180,7 @@ public class EventSyncJobConfig extends BaseJobConfig { public Step eventCargoSyncStep() { log.info("Step 생성: eventCargoSyncStep"); return new StepBuilder("eventCargoSyncStep", jobRepository) - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(eventCargoReader(businessDataSource, tableMetaInfo)) .processor(new EventCargoProcessor()) .writer(new EventCargoWriter(eventRepository, transactionManager, subChunkSize)) @@ -196,7 +192,7 @@ public class EventSyncJobConfig extends BaseJobConfig { public Step eventHumanCasualtySyncStep() { log.info("Step 생성: eventHumanCasualtySyncStep"); return new StepBuilder("eventHumanCasualtySyncStep", jobRepository) - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(eventHumanCasualtyReader(businessDataSource, tableMetaInfo)) .processor(new EventHumanCasualtyProcessor()) .writer(new EventHumanCasualtyWriter(eventRepository, transactionManager, subChunkSize)) @@ -208,7 +204,7 @@ public class EventSyncJobConfig extends BaseJobConfig { public Step eventRelationshipSyncStep() { log.info("Step 생성: eventRelationshipSyncStep"); return new StepBuilder("eventRelationshipSyncStep", jobRepository) - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(eventRelationshipReader(businessDataSource, tableMetaInfo)) .processor(new EventRelationshipProcessor()) .writer(new EventRelationshipWriter(eventRepository, transactionManager, subChunkSize)) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/event/repository/EventRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/datasync/batch/event/repository/EventRepositoryImpl.java index 7c24111..b5ef42d 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/event/repository/EventRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/event/repository/EventRepositoryImpl.java @@ -84,7 +84,7 @@ public class EventRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -96,7 +96,7 @@ public class EventRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -164,7 +164,7 @@ public class EventRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -200,7 +200,7 @@ public class EventRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -231,7 +231,7 @@ public class EventRepositoryImpl extends MultiDataSourceJdbcRepository facilityPortWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceFacilityPort); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceFacilityPort); } // --- Steps --- @@ -109,7 +108,7 @@ public class FacilitySyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/facility/repository/FacilityRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/datasync/batch/facility/repository/FacilityRepositoryImpl.java index 67292e4..318ee4d 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/facility/repository/FacilityRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/facility/repository/FacilityRepositoryImpl.java @@ -81,7 +81,7 @@ public class FacilityRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -93,7 +93,7 @@ public class FacilityRepositoryImpl extends MultiDataSourceJdbcRepository anchorageCallWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceTAnchorageCall); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceTAnchorageCall); } @Bean(name = "anchorageCallSyncStep") public Step anchorageCallSyncStep() { log.info("Step 생성: anchorageCallSyncStep"); return new StepBuilder(getStepName(), jobRepository) - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/BerthCallSyncJobConfig.java b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/BerthCallSyncJobConfig.java index 2ce0451..3ee3251 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/BerthCallSyncJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/BerthCallSyncJobConfig.java @@ -2,7 +2,7 @@ package com.snp.batch.jobs.datasync.batch.movement.config; import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.util.BatchWriteListener; -import com.snp.batch.common.util.CommonSql; + import com.snp.batch.common.util.TableMetaInfo; import com.snp.batch.jobs.datasync.batch.movement.dto.BerthCallDto; import com.snp.batch.jobs.datasync.batch.movement.entity.BerthCallEntity; @@ -92,15 +92,14 @@ public class BerthCallSyncJobConfig extends BaseJobConfig berthCallWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceTBerthCall); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceTBerthCall); } @Bean(name = "berthCallSyncStep") public Step berthCallSyncStep() { log.info("Step 생성: berthCallSyncStep"); return new StepBuilder(getStepName(), jobRepository) - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/CurrentlyAtSyncJobConfig.java b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/CurrentlyAtSyncJobConfig.java index 603f9b7..c6a6e2b 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/CurrentlyAtSyncJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/CurrentlyAtSyncJobConfig.java @@ -2,7 +2,7 @@ package com.snp.batch.jobs.datasync.batch.movement.config; import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.util.BatchWriteListener; -import com.snp.batch.common.util.CommonSql; + import com.snp.batch.common.util.TableMetaInfo; import com.snp.batch.jobs.datasync.batch.movement.dto.CurrentlyAtDto; import com.snp.batch.jobs.datasync.batch.movement.entity.CurrentlyAtEntity; @@ -92,15 +92,14 @@ public class CurrentlyAtSyncJobConfig extends BaseJobConfig currentlyAtWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceTCurrentlyAt); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceTCurrentlyAt); } @Bean(name = "currentlyAtSyncStep") public Step currentlyAtSyncStep() { log.info("Step 생성: currentlyAtSyncStep"); return new StepBuilder(getStepName(), jobRepository) - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/DestinationSyncJobConfig.java b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/DestinationSyncJobConfig.java index 0c0b607..11f24c0 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/DestinationSyncJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/DestinationSyncJobConfig.java @@ -2,7 +2,7 @@ package com.snp.batch.jobs.datasync.batch.movement.config; import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.util.BatchWriteListener; -import com.snp.batch.common.util.CommonSql; + import com.snp.batch.common.util.TableMetaInfo; import com.snp.batch.jobs.datasync.batch.movement.dto.DestinationDto; import com.snp.batch.jobs.datasync.batch.movement.entity.DestinationEntity; @@ -92,15 +92,14 @@ public class DestinationSyncJobConfig extends BaseJobConfig destinationWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceTDestination); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceTDestination); } @Bean(name = "destinationSyncStep") public Step destinationSyncStep() { log.info("Step 생성: destinationSyncStep"); return new StepBuilder(getStepName(), jobRepository) - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/PortCallSyncJobConfig.java b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/PortCallSyncJobConfig.java index e922e76..1dfb761 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/PortCallSyncJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/PortCallSyncJobConfig.java @@ -2,7 +2,7 @@ package com.snp.batch.jobs.datasync.batch.movement.config; import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.util.BatchWriteListener; -import com.snp.batch.common.util.CommonSql; + import com.snp.batch.common.util.TableMetaInfo; import com.snp.batch.jobs.datasync.batch.movement.dto.PortCallDto; import com.snp.batch.jobs.datasync.batch.movement.entity.PortCallEntity; @@ -92,15 +92,14 @@ public class PortCallSyncJobConfig extends BaseJobConfig portCallWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceTShipStpovInfo); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceTShipStpovInfo); } @Bean(name = "portCallSyncStep") public Step portCallSyncStep() { log.info("Step 생성: portCallSyncStep"); return new StepBuilder(getStepName(), jobRepository) - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/StsOperationSyncJobConfig.java b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/StsOperationSyncJobConfig.java index 15d87ce..25f7199 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/StsOperationSyncJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/StsOperationSyncJobConfig.java @@ -2,7 +2,7 @@ package com.snp.batch.jobs.datasync.batch.movement.config; import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.util.BatchWriteListener; -import com.snp.batch.common.util.CommonSql; + import com.snp.batch.common.util.TableMetaInfo; import com.snp.batch.jobs.datasync.batch.movement.dto.StsOperationDto; import com.snp.batch.jobs.datasync.batch.movement.entity.StsOperationEntity; @@ -92,15 +92,14 @@ public class StsOperationSyncJobConfig extends BaseJobConfig stsOperationWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceTStsOperation); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceTStsOperation); } @Bean(name = "stsOperationSyncStep") public Step stsOperationSyncStep() { log.info("Step 생성: stsOperationSyncStep"); return new StepBuilder(getStepName(), jobRepository) - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/TerminalCallSyncJobConfig.java b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/TerminalCallSyncJobConfig.java index d4ea182..676c9b2 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/TerminalCallSyncJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/TerminalCallSyncJobConfig.java @@ -2,7 +2,7 @@ package com.snp.batch.jobs.datasync.batch.movement.config; import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.util.BatchWriteListener; -import com.snp.batch.common.util.CommonSql; + import com.snp.batch.common.util.TableMetaInfo; import com.snp.batch.jobs.datasync.batch.movement.dto.TerminalCallDto; import com.snp.batch.jobs.datasync.batch.movement.entity.TerminalCallEntity; @@ -92,15 +92,14 @@ public class TerminalCallSyncJobConfig extends BaseJobConfig terminalCallWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceTTerminalCall); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceTTerminalCall); } @Bean(name = "terminalCallSyncStep") public Step terminalCallSyncStep() { log.info("Step 생성: terminalCallSyncStep"); return new StepBuilder(getStepName(), jobRepository) - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/TransitSyncJobConfig.java b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/TransitSyncJobConfig.java index b2b8398..d330042 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/TransitSyncJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/config/TransitSyncJobConfig.java @@ -2,7 +2,7 @@ package com.snp.batch.jobs.datasync.batch.movement.config; import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.util.BatchWriteListener; -import com.snp.batch.common.util.CommonSql; + import com.snp.batch.common.util.TableMetaInfo; import com.snp.batch.jobs.datasync.batch.movement.dto.TransitDto; import com.snp.batch.jobs.datasync.batch.movement.entity.TransitEntity; @@ -92,15 +92,14 @@ public class TransitSyncJobConfig extends BaseJobConfig transitWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceTTransit); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceTTransit); } @Bean(name = "transitSyncStep") public Step transitSyncStep() { log.info("Step 생성: transitSyncStep"); return new StepBuilder(getStepName(), jobRepository) - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/repository/MovementRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/repository/MovementRepositoryImpl.java index 8a6b01f..b9a66bb 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/movement/repository/MovementRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/movement/repository/MovementRepositoryImpl.java @@ -80,7 +80,7 @@ public class MovementRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -92,7 +92,7 @@ public class MovementRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -137,7 +137,7 @@ public class MovementRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -182,7 +182,7 @@ public class MovementRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -230,7 +230,7 @@ public class MovementRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -269,7 +269,7 @@ public class MovementRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -317,7 +317,7 @@ public class MovementRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -363,7 +363,7 @@ public class MovementRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -411,7 +411,7 @@ public class MovementRepositoryImpl extends MultiDataSourceJdbcRepository pscDetailWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourcePscDetail); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourcePscDetail); } @Bean public BatchWriteListener pscDefectWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourcePscDefect); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourcePscDefect); } @Bean public BatchWriteListener pscAllCertificateWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourcePscAllCertificate); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourcePscAllCertificate); } // --- Steps --- @@ -151,7 +148,7 @@ public class PscSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) @@ -163,7 +160,7 @@ public class PscSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(pscDefectReader(businessDataSource, tableMetaInfo)) .processor(new PscDefectProcessor()) .writer(new PscDefectWriter(pscRepository, transactionManager, subChunkSize)) @@ -175,7 +172,7 @@ public class PscSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(pscAllCertificateReader(businessDataSource, tableMetaInfo)) .processor(new PscAllCertificateProcessor()) .writer(new PscAllCertificateWriter(pscRepository, transactionManager, subChunkSize)) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/psc/repository/PscRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/datasync/batch/psc/repository/PscRepositoryImpl.java index d6f292f..43e4dd4 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/psc/repository/PscRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/psc/repository/PscRepositoryImpl.java @@ -83,7 +83,7 @@ public class PscRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -95,7 +95,7 @@ public class PscRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -151,7 +151,7 @@ public class PscRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -201,7 +201,7 @@ public class PscRepositoryImpl extends MultiDataSourceJdbcRepository { @Bean public BatchWriteListener riskWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceRisk); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceRisk); } // --- Steps --- @@ -109,7 +108,7 @@ public class RiskSyncJobConfig extends BaseJobConfig { public Step riskSyncStep() { log.info("Step 생성: riskSyncStep"); return new StepBuilder(getStepName(), jobRepository) - .chunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskRepositoryImpl.java index fbc101f..9ac442b 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/risk/repository/RiskRepositoryImpl.java @@ -80,7 +80,7 @@ public class RiskRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -92,7 +92,7 @@ public class RiskRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -113,7 +113,7 @@ public class RiskRepositoryImpl extends MultiDataSourceJdbcRepository shipWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceShipDetailData); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceShipDetailData); } @Bean public BatchWriteListener ownerHistoryWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceOwnerHistory); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceOwnerHistory); } @Bean public BatchWriteListener shipAddInfoWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceAdditionalShipsData); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceAdditionalShipsData); } @Bean public BatchWriteListener bareboatCharterHistoryWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceBareboatCharterHistory); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceBareboatCharterHistory); } @Bean public BatchWriteListener callsignAndMmsiHistoryWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceCallsignAndMmsiHistory); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceCallsignAndMmsiHistory); } @Bean public BatchWriteListener classHistoryWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceClassHistory); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceClassHistory); } @Bean public BatchWriteListener companyVesselRelationshipsWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceCompanyVesselRelationships); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceCompanyVesselRelationships); } @Bean public BatchWriteListener crewListWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceCrewList); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceCrewList); } @Bean public BatchWriteListener darkActivityConfirmedWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceDarkActivityConfirmed); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceDarkActivityConfirmed); } @Bean public BatchWriteListener flagHistoryWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceFlagHistory); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceFlagHistory); } @Bean public BatchWriteListener groupBeneficialOwnerHistoryWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceGroupBeneficialOwnerHistory); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceGroupBeneficialOwnerHistory); } @Bean public BatchWriteListener iceClassWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceIceClass); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceIceClass); } @Bean public BatchWriteListener nameHistoryWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceNameHistory); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceNameHistory); } @Bean public BatchWriteListener operatorHistoryWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceOperatorHistory); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceOperatorHistory); } @Bean public BatchWriteListener pandIHistoryWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourcePandiHistory); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourcePandiHistory); } @Bean public BatchWriteListener safetyManagementCertificateHistWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceSafetyManagementCertificateHist); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceSafetyManagementCertificateHist); } @Bean public BatchWriteListener shipManagerHistoryWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceShipManagerHistory); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceShipManagerHistory); } @Bean public BatchWriteListener sisterShipLinksWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceSisterShipLinks); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceSisterShipLinks); } @Bean public BatchWriteListener specialFeatureWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceSpecialFeature); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceSpecialFeature); } @Bean public BatchWriteListener statusHistoryWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceStatusHistory); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceStatusHistory); } @Bean public BatchWriteListener stowageCommodityWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceStowageCommodity); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceStowageCommodity); } @Bean public BatchWriteListener surveyDatesWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceSurveyDates); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceSurveyDates); } @Bean public BatchWriteListener surveyDatesHistoryUniqueWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceSurveyDatesHistoryUnique); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceSurveyDatesHistoryUnique); } @Bean public BatchWriteListener technicalManagerHistoryWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceTechnicalManagerHistory); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceTechnicalManagerHistory); } @Bean public BatchWriteListener thrustersWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceThrusters); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceThrusters); } @Bean public BatchWriteListener tbCompanyDetailWriteListener() { - String sql = CommonSql.getCompleteBatchQuery(tableMetaInfo.sourceTbCompanyDetail); - return new BatchWriteListener<>(businessJdbcTemplate, sql); + return new BatchWriteListener<>(businessJdbcTemplate, tableMetaInfo.sourceTbCompanyDetail); } // --- Steps --- @@ -631,7 +605,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(createReader()) .processor(createProcessor()) .writer(createWriter()) @@ -643,7 +617,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(ownerHistoryReader(businessDataSource, tableMetaInfo)) .processor(new OwnerHistoryProcessor()) .writer(new OwnerHistoryWriter(shipRepository, transactionManager, subChunkSize)) @@ -655,7 +629,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(shipAddInfoReader(businessDataSource, tableMetaInfo)) .processor(new ShipAddInfoProcessor()) .writer(new ShipAddInfoWriter(shipRepository, transactionManager, subChunkSize)) @@ -667,7 +641,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(bareboatCharterHistoryReader(businessDataSource, tableMetaInfo)) .processor(new BareboatCharterHistoryProcessor()) .writer(new BareboatCharterHistoryWriter(shipRepository, transactionManager, subChunkSize)) @@ -679,7 +653,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(callsignAndMmsiHistoryReader(businessDataSource, tableMetaInfo)) .processor(new CallsignAndMmsiHistoryProcessor()) .writer(new CallsignAndMmsiHistoryWriter(shipRepository, transactionManager, subChunkSize)) @@ -691,7 +665,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(classHistoryReader(businessDataSource, tableMetaInfo)) .processor(new ClassHistoryProcessor()) .writer(new ClassHistoryWriter(shipRepository, transactionManager, subChunkSize)) @@ -703,7 +677,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(companyVesselRelationshipsReader(businessDataSource, tableMetaInfo)) .processor(new CompanyVesselRelationshipsProcessor()) .writer(new CompanyVesselRelationshipsWriter(shipRepository, transactionManager, subChunkSize)) @@ -715,7 +689,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(crewListReader(businessDataSource, tableMetaInfo)) .processor(new CrewListProcessor()) .writer(new CrewListWriter(shipRepository, transactionManager, subChunkSize)) @@ -727,7 +701,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(darkActivityConfirmedReader(businessDataSource, tableMetaInfo)) .processor(new DarkActivityConfirmedProcessor()) .writer(new DarkActivityConfirmedWriter(shipRepository, transactionManager, subChunkSize)) @@ -739,7 +713,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(flagHistoryReader(businessDataSource, tableMetaInfo)) .processor(new FlagHistoryProcessor()) .writer(new FlagHistoryWriter(shipRepository, transactionManager, subChunkSize)) @@ -751,7 +725,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(groupBeneficialOwnerHistoryReader(businessDataSource, tableMetaInfo)) .processor(new GroupBeneficialOwnerHistoryProcessor()) .writer(new GroupBeneficialOwnerHistoryWriter(shipRepository, transactionManager, subChunkSize)) @@ -763,7 +737,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(iceClassReader(businessDataSource, tableMetaInfo)) .processor(new IceClassProcessor()) .writer(new IceClassWriter(shipRepository, transactionManager, subChunkSize)) @@ -775,7 +749,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(nameHistoryReader(businessDataSource, tableMetaInfo)) .processor(new NameHistoryProcessor()) .writer(new NameHistoryWriter(shipRepository, transactionManager, subChunkSize)) @@ -787,7 +761,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(operatorHistoryReader(businessDataSource, tableMetaInfo)) .processor(new OperatorHistoryProcessor()) .writer(new OperatorHistoryWriter(shipRepository, transactionManager, subChunkSize)) @@ -799,7 +773,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(pandIHistoryReader(businessDataSource, tableMetaInfo)) .processor(new PandIHistoryProcessor()) .writer(new PandIHistoryWriter(shipRepository, transactionManager, subChunkSize)) @@ -811,7 +785,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(safetyManagementCertificateHistReader(businessDataSource, tableMetaInfo)) .processor(new SafetyManagementCertificateHistProcessor()) .writer(new SafetyManagementCertificateHistWriter(shipRepository, transactionManager, subChunkSize)) @@ -823,7 +797,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(shipManagerHistoryReader(businessDataSource, tableMetaInfo)) .processor(new ShipManagerHistoryProcessor()) .writer(new ShipManagerHistoryWriter(shipRepository, transactionManager, subChunkSize)) @@ -835,7 +809,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(sisterShipLinksReader(businessDataSource, tableMetaInfo)) .processor(new SisterShipLinksProcessor()) .writer(new SisterShipLinksWriter(shipRepository, transactionManager, subChunkSize)) @@ -847,7 +821,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(specialFeatureReader(businessDataSource, tableMetaInfo)) .processor(new SpecialFeatureProcessor()) .writer(new SpecialFeatureWriter(shipRepository, transactionManager, subChunkSize)) @@ -859,7 +833,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(statusHistoryReader(businessDataSource, tableMetaInfo)) .processor(new StatusHistoryProcessor()) .writer(new StatusHistoryWriter(shipRepository, transactionManager, subChunkSize)) @@ -871,7 +845,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(stowageCommodityReader(businessDataSource, tableMetaInfo)) .processor(new StowageCommodityProcessor()) .writer(new StowageCommodityWriter(shipRepository, transactionManager, subChunkSize)) @@ -883,7 +857,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(surveyDatesReader(businessDataSource, tableMetaInfo)) .processor(new SurveyDatesProcessor()) .writer(new SurveyDatesWriter(shipRepository, transactionManager, subChunkSize)) @@ -895,7 +869,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(surveyDatesHistoryUniqueReader(businessDataSource, tableMetaInfo)) .processor(new SurveyDatesHistoryUniqueProcessor()) .writer(new SurveyDatesHistoryUniqueWriter(shipRepository, transactionManager, subChunkSize)) @@ -907,7 +881,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(technicalManagerHistoryReader(businessDataSource, tableMetaInfo)) .processor(new TechnicalManagerHistoryProcessor()) .writer(new TechnicalManagerHistoryWriter(shipRepository, transactionManager, subChunkSize)) @@ -919,7 +893,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(thrustersReader(businessDataSource, tableMetaInfo)) .processor(new ThrustersProcessor()) .writer(new ThrustersWriter(shipRepository, transactionManager, subChunkSize)) @@ -931,7 +905,7 @@ public class ShipDetailSyncJobConfig extends BaseJobConfigchunk(getChunkSize(), transactionManager) + .chunk(Integer.MAX_VALUE, transactionManager) .reader(tbCompanyDetailReader(businessDataSource, tableMetaInfo)) .processor(new TbCompanyDetailProcessor()) .writer(new TbCompanyDetailWriter(shipRepository, transactionManager, subChunkSize)) diff --git a/src/main/java/com/snp/batch/jobs/datasync/batch/ship/repository/ShipRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/datasync/batch/ship/repository/ShipRepositoryImpl.java index 4ab0ee8..ce4ec40 100644 --- a/src/main/java/com/snp/batch/jobs/datasync/batch/ship/repository/ShipRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/datasync/batch/ship/repository/ShipRepositoryImpl.java @@ -237,7 +237,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -249,7 +249,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -288,7 +288,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -320,7 +320,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -352,7 +352,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -387,7 +387,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -435,7 +435,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -474,7 +474,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -527,7 +527,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -559,7 +559,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -592,7 +592,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -622,7 +622,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -653,7 +653,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -686,7 +686,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -719,7 +719,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -752,7 +752,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -793,7 +793,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -826,7 +826,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -855,7 +855,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -886,7 +886,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -918,7 +918,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -951,7 +951,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -986,7 +986,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -1018,7 +1018,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -1051,7 +1051,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository { @@ -1087,7 +1087,7 @@ public class ShipRepositoryImpl extends MultiDataSourceJdbcRepository