feat(batch): 선박 마지막 위치 업데이트 프로세스 변경 #7
@ -1,6 +1,7 @@
|
||||
package com.snp.batch.jobs.aistargetdbsync.batch.config;
|
||||
|
||||
import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.AisTargetDbSyncTasklet;
|
||||
import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.ShipLastPositionSyncTasklet;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.JobExecution;
|
||||
@ -20,13 +21,14 @@ import org.springframework.transaction.PlatformTransactionManager;
|
||||
* API: 없음 (캐시 기반)
|
||||
*
|
||||
* 동작:
|
||||
* - Caffeine 캐시에서 최근 15분 이내 데이터 조회
|
||||
* - MMSI별 최신 위치 1건씩 DB에 UPSERT
|
||||
* - 1분 주기 aisTargetImportJob과 분리하여 DB 볼륨 최적화
|
||||
* - Step 1: Caffeine 캐시에서 최근 15분 이내 데이터 조회 → ais_target DB UPSERT
|
||||
* - Step 2: 캐시에서 IMO별 최신 위치 조회 → tb_ship_main_info 위치 컬럼 UPDATE
|
||||
*
|
||||
* 데이터 흐름:
|
||||
* - aisTargetImportJob (1분): API → 캐시 업데이트
|
||||
* - aisTargetDbSyncJob (15분): 캐시 → DB 저장 (이 Job)
|
||||
* - Step 1: 캐시 → ais_target UPSERT
|
||||
* - Step 2: 캐시 → tb_ship_main_info UPDATE (IMO별 최신 1건)
|
||||
*/
|
||||
@Slf4j
|
||||
@Configuration
|
||||
@ -35,14 +37,17 @@ public class AisTargetDbSyncJobConfig {
|
||||
private final JobRepository jobRepository;
|
||||
private final PlatformTransactionManager transactionManager;
|
||||
private final AisTargetDbSyncTasklet aisTargetDbSyncTasklet;
|
||||
private final ShipLastPositionSyncTasklet shipLastPositionSyncTasklet;
|
||||
|
||||
public AisTargetDbSyncJobConfig(
|
||||
JobRepository jobRepository,
|
||||
PlatformTransactionManager transactionManager,
|
||||
AisTargetDbSyncTasklet aisTargetDbSyncTasklet) {
|
||||
AisTargetDbSyncTasklet aisTargetDbSyncTasklet,
|
||||
ShipLastPositionSyncTasklet shipLastPositionSyncTasklet) {
|
||||
this.jobRepository = jobRepository;
|
||||
this.transactionManager = transactionManager;
|
||||
this.aisTargetDbSyncTasklet = aisTargetDbSyncTasklet;
|
||||
this.shipLastPositionSyncTasklet = shipLastPositionSyncTasklet;
|
||||
}
|
||||
|
||||
@Bean(name = "aisTargetDbSyncStep")
|
||||
@ -52,6 +57,13 @@ public class AisTargetDbSyncJobConfig {
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean(name = "shipLastPositionSyncStep")
|
||||
public Step shipLastPositionSyncStep() {
|
||||
return new StepBuilder("shipLastPositionSyncStep", jobRepository)
|
||||
.tasklet(shipLastPositionSyncTasklet, transactionManager)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean(name = "aisTargetDbSyncJob")
|
||||
public Job aisTargetDbSyncJob() {
|
||||
log.info("Job 생성: aisTargetDbSyncJob");
|
||||
@ -74,6 +86,7 @@ public class AisTargetDbSyncJobConfig {
|
||||
}
|
||||
})
|
||||
.start(aisTargetDbSyncStep())
|
||||
.next(shipLastPositionSyncStep())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,15 @@
|
||||
package com.snp.batch.jobs.aistargetdbsync.batch.repository;
|
||||
|
||||
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 캐시 기반 Ship Last Position 동기화 Repository
|
||||
*
|
||||
* AisTargetEntity(캐시 데이터)를 받아 tb_ship_main_info의 위치/항해 컬럼을 UPDATE
|
||||
*/
|
||||
public interface ShipLastPositionSyncRepository {
|
||||
|
||||
int updateLastPositions(List<AisTargetEntity> entities);
|
||||
}
|
||||
@ -0,0 +1,155 @@
|
||||
package com.snp.batch.jobs.aistargetdbsync.batch.repository;
|
||||
|
||||
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.Types;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 캐시 기반 Ship Last Position 동기화 Repository 구현
|
||||
*
|
||||
* AisTargetEntity의 필드를 tb_ship_main_info 컬럼에 매핑하여 UPDATE 수행.
|
||||
* 기존 ShipLastPositionRepositoryImpl과 동일한 SQL을 사용하되,
|
||||
* AisTargetEntity의 타입(OffsetDateTime, Integer 등)에 맞게 변환 처리.
|
||||
*/
|
||||
@Slf4j
|
||||
@Repository("shipLastPositionSyncRepository")
|
||||
public class ShipLastPositionSyncRepositoryImpl implements ShipLastPositionSyncRepository {
|
||||
|
||||
private static final int BATCH_SIZE = 1000;
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
|
||||
@Value("${app.batch.service-schema.name}")
|
||||
private String targetSchema;
|
||||
|
||||
@Value("${app.batch.service-schema.tables.service-001}")
|
||||
private String tableName;
|
||||
|
||||
public ShipLastPositionSyncRepositoryImpl(JdbcTemplate jdbcTemplate) {
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
}
|
||||
|
||||
private String getTableName() {
|
||||
return targetSchema + "." + tableName;
|
||||
}
|
||||
|
||||
private String getUpdateSql() {
|
||||
// return """
|
||||
// UPDATE %s
|
||||
// SET last_cptr_hr_utc = ?::timestamptz,
|
||||
// last_port = ?,
|
||||
// now_position_lat = ?,
|
||||
// now_position_lon = ?,
|
||||
// ship_dest = ?,
|
||||
// arvl_prnmnt_hr = ?::timestamptz,
|
||||
// bow_drctn = ?,
|
||||
// cog = ?,
|
||||
// sog = ?,
|
||||
// ship_nav_status = ?,
|
||||
// cargo_ton = ?,
|
||||
// add_info = ?,
|
||||
// in_sts = ?,
|
||||
// ancrg_yn = ?
|
||||
// WHERE imo_no = ?
|
||||
// """.formatted(getTableName());
|
||||
return """
|
||||
UPDATE new_snp.core20
|
||||
SET lastseen = ?::timestamptz,
|
||||
lastport = ?,
|
||||
position_latitude = ?,
|
||||
position_longitude = ?,
|
||||
destination = ?,
|
||||
eta = ?::timestamptz,
|
||||
heading = ?,
|
||||
cog = ?,
|
||||
speedservice = ?,
|
||||
navstat = ?,
|
||||
tonnes_cargo = ?,
|
||||
extra_info = ?,
|
||||
in_sts = ?,
|
||||
on_berth = ?
|
||||
WHERE lrno = ?;
|
||||
""";
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public int updateLastPositions(List<AisTargetEntity> entities) {
|
||||
if (entities == null || entities.isEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
String sql = getUpdateSql();
|
||||
int totalUpdated = 0;
|
||||
|
||||
int[][] batchResults = jdbcTemplate.batchUpdate(sql, entities, BATCH_SIZE,
|
||||
(ps, entity) -> setUpdateParameters(ps, entity));
|
||||
|
||||
for (int[] batchResult : batchResults) {
|
||||
for (int result : batchResult) {
|
||||
if (result > 0) {
|
||||
totalUpdated += result;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Ship Last Position 동기화 완료: 대상={} 건, 갱신={} 건", entities.size(), totalUpdated);
|
||||
return totalUpdated;
|
||||
}
|
||||
|
||||
private void setUpdateParameters(PreparedStatement ps, AisTargetEntity entity) throws java.sql.SQLException {
|
||||
int idx = 1;
|
||||
|
||||
// last_cptr_hr_utc ← receivedDate (OffsetDateTime → String for ::timestamptz)
|
||||
ps.setString(idx++, entity.getReceivedDate() != null ? entity.getReceivedDate().toString() : null);
|
||||
|
||||
// last_port ← lpcCode (Integer → String)
|
||||
ps.setString(idx++, entity.getLpcCode() != null ? String.valueOf(entity.getLpcCode()) : null);
|
||||
|
||||
// now_position_lat ← lat (Double, null-safe)
|
||||
ps.setObject(idx++, entity.getLat(), Types.DOUBLE);
|
||||
|
||||
// now_position_lon ← lon (Double, null-safe)
|
||||
ps.setObject(idx++, entity.getLon(), Types.DOUBLE);
|
||||
|
||||
// ship_dest ← destination (String)
|
||||
ps.setString(idx++, entity.getDestination());
|
||||
|
||||
// arvl_prnmnt_hr ← eta (OffsetDateTime → String for ::timestamptz)
|
||||
ps.setString(idx++, entity.getEta() != null ? entity.getEta().toString() : null);
|
||||
|
||||
// bow_drctn ← heading (Double, null-safe)
|
||||
ps.setObject(idx++, entity.getHeading(), Types.DOUBLE);
|
||||
|
||||
// cog ← cog (Double, null-safe)
|
||||
ps.setObject(idx++, entity.getCog(), Types.DOUBLE);
|
||||
|
||||
// sog ← sog (Double, null-safe)
|
||||
ps.setObject(idx++, entity.getSog(), Types.DOUBLE);
|
||||
|
||||
// ship_nav_status ← status (String)
|
||||
ps.setString(idx++, entity.getStatus());
|
||||
|
||||
// cargo_ton ← tonnesCargo (Integer, null-safe)
|
||||
ps.setObject(idx++, entity.getTonnesCargo(), Types.INTEGER);
|
||||
|
||||
// add_info ← extraInfo (String)
|
||||
ps.setString(idx++, entity.getExtraInfo());
|
||||
|
||||
// in_sts ← inSTS (Integer, null-safe)
|
||||
ps.setObject(idx++, entity.getInSTS(), Types.INTEGER);
|
||||
|
||||
// ancrg_yn ← onBerth (Boolean, null-safe)
|
||||
ps.setObject(idx++, entity.getOnBerth(), Types.BOOLEAN);
|
||||
|
||||
// WHERE imo_no ← imoVerified (String)
|
||||
ps.setString(idx++, entity.getImoVerified());
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,133 @@
|
||||
package com.snp.batch.jobs.aistargetdbsync.batch.tasklet;
|
||||
|
||||
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
|
||||
import com.snp.batch.jobs.aistargetdbsync.batch.repository.ShipLastPositionSyncRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.StepContribution;
|
||||
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Ship Last Position Sync Tasklet
|
||||
*
|
||||
* 동작:
|
||||
* - Caffeine 캐시에서 마지막 성공 이후 ~ 현재까지의 AIS 데이터를 조회
|
||||
* - imoVerified가 유효한 항목만 필터링
|
||||
* - IMO별 messageTimestamp 최신 1건만 선택
|
||||
* - tb_ship_main_info의 위치/항해 14개 컬럼을 UPDATE
|
||||
*
|
||||
* 시간 범위 결정 전략:
|
||||
* - AisTargetDbSyncTasklet과 동일 (lastSuccessTime 기반, fallback 사용)
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ShipLastPositionSyncTasklet implements Tasklet {
|
||||
|
||||
private final AisTargetCacheManager cacheManager;
|
||||
private final ShipLastPositionSyncRepository repository;
|
||||
private final int fallbackMinutes;
|
||||
|
||||
private final AtomicReference<Instant> lastSuccessTime = new AtomicReference<>();
|
||||
|
||||
public ShipLastPositionSyncTasklet(
|
||||
AisTargetCacheManager cacheManager,
|
||||
ShipLastPositionSyncRepository repository,
|
||||
@Value("${app.batch.ais-target-db-sync.time-range-minutes:15}") int fallbackMinutes) {
|
||||
this.cacheManager = cacheManager;
|
||||
this.repository = repository;
|
||||
this.fallbackMinutes = fallbackMinutes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
|
||||
Instant now = Instant.now();
|
||||
int rangeMinutes = resolveRangeMinutes(now);
|
||||
|
||||
log.info("========================================");
|
||||
log.info("Ship Last Position Sync 시작");
|
||||
log.info("조회 범위: 최근 {}분 (방식: {})", rangeMinutes,
|
||||
lastSuccessTime.get() != null ? "마지막 성공 기준" : "fallback");
|
||||
log.info("========================================");
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
// 1. 캐시에서 시간 범위 내 데이터 조회
|
||||
List<AisTargetEntity> entities = cacheManager.getByTimeRange(rangeMinutes);
|
||||
|
||||
if (entities.isEmpty()) {
|
||||
log.warn("캐시에서 조회된 데이터가 없습니다 (범위: {}분)", rangeMinutes);
|
||||
lastSuccessTime.set(now);
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
|
||||
// 2. imoVerified가 유효한 항목만 필터
|
||||
List<AisTargetEntity> imoFiltered = entities.stream()
|
||||
.filter(e -> isValidImo(e.getImoVerified()))
|
||||
.toList();
|
||||
|
||||
if (imoFiltered.isEmpty()) {
|
||||
log.info("유효한 IMO가 있는 데이터가 없습니다 (전체: {} 건)", entities.size());
|
||||
lastSuccessTime.set(now);
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
|
||||
// 3. IMO별 messageTimestamp 최신 1건만 선택
|
||||
List<AisTargetEntity> latestPerImo = imoFiltered.stream()
|
||||
.collect(Collectors.groupingBy(AisTargetEntity::getImoVerified))
|
||||
.values().stream()
|
||||
.map(group -> group.stream()
|
||||
.max(Comparator.comparing(
|
||||
AisTargetEntity::getMessageTimestamp,
|
||||
Comparator.nullsFirst(Comparator.naturalOrder())))
|
||||
.orElseThrow())
|
||||
.toList();
|
||||
|
||||
log.info("캐시 조회: {} 건 → IMO 유효: {} 건 → IMO별 최신: {} 건",
|
||||
entities.size(), imoFiltered.size(), latestPerImo.size());
|
||||
|
||||
// 4. DB UPDATE
|
||||
int updated = repository.updateLastPositions(latestPerImo);
|
||||
|
||||
long elapsed = System.currentTimeMillis() - startTime;
|
||||
|
||||
lastSuccessTime.set(now);
|
||||
|
||||
log.info("========================================");
|
||||
log.info("Ship Last Position Sync 완료");
|
||||
log.info("대상: {} 건, 갱신: {} 건", latestPerImo.size(), updated);
|
||||
log.info("소요 시간: {}ms", elapsed);
|
||||
log.info("========================================");
|
||||
|
||||
contribution.incrementWriteCount(updated);
|
||||
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
|
||||
private static final int MAX_RANGE_MINUTES = 60;
|
||||
|
||||
private int resolveRangeMinutes(Instant now) {
|
||||
Instant last = lastSuccessTime.get();
|
||||
if (last == null) {
|
||||
return Math.min(fallbackMinutes, MAX_RANGE_MINUTES);
|
||||
}
|
||||
|
||||
long elapsedMinutes = Duration.between(last, now).toMinutes();
|
||||
int range = (int) Math.max(elapsedMinutes + 1, 1);
|
||||
return Math.min(range, MAX_RANGE_MINUTES);
|
||||
}
|
||||
|
||||
private boolean isValidImo(String imoVerified) {
|
||||
return imoVerified != null && !imoVerified.isBlank() && !"0".equals(imoVerified);
|
||||
}
|
||||
}
|
||||
@ -1,90 +0,0 @@
|
||||
package com.snp.batch.jobs.shipdetail.batch.config;
|
||||
|
||||
import com.snp.batch.common.batch.config.BaseJobConfig;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto;
|
||||
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
||||
import com.snp.batch.jobs.shipdetail.batch.processor.ShipLastPositionDataProcessor;
|
||||
import com.snp.batch.jobs.shipdetail.batch.reader.ShipLastPositionDataReader;
|
||||
import com.snp.batch.jobs.shipdetail.batch.writer.ShipLastPositionDataWriter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.repository.JobRepository;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.batch.item.ItemReader;
|
||||
import org.springframework.batch.item.ItemWriter;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
@Slf4j
|
||||
@Configuration
|
||||
public class ShipLastPositionUpdateJobConfig extends BaseJobConfig<TargetEnhancedDto, TargetEnhancedEntity> {
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
private final WebClient maritimeAisApiWebClient;
|
||||
|
||||
private final ShipLastPositionDataProcessor shipLastPositionDataProcessor;
|
||||
|
||||
private final ShipLastPositionDataWriter shipLastPositionDataWriter;
|
||||
|
||||
@Value("${app.batch.service-schema.name}")
|
||||
private String targetSchema;
|
||||
|
||||
@Override
|
||||
protected int getChunkSize() {
|
||||
return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
|
||||
}
|
||||
public ShipLastPositionUpdateJobConfig(
|
||||
JobRepository jobRepository,
|
||||
PlatformTransactionManager transactionManager,
|
||||
ShipLastPositionDataProcessor shipLastPositionDataProcessor,
|
||||
ShipLastPositionDataWriter shipLastPositionDataWriter,
|
||||
JdbcTemplate jdbcTemplate,
|
||||
@Qualifier("maritimeAisApiWebClient")WebClient maritimeAisApiWebClient) {
|
||||
super(jobRepository, transactionManager);
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
this.maritimeAisApiWebClient = maritimeAisApiWebClient;
|
||||
this.shipLastPositionDataProcessor = shipLastPositionDataProcessor;
|
||||
this.shipLastPositionDataWriter = shipLastPositionDataWriter;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getJobName() {
|
||||
return "shipLastPositionUpdateJob";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getStepName() {
|
||||
return "shipLastPositionUpdateStep";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ItemReader<TargetEnhancedDto> createReader() {
|
||||
return new ShipLastPositionDataReader(maritimeAisApiWebClient, jdbcTemplate, targetSchema);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ItemProcessor<TargetEnhancedDto, TargetEnhancedEntity> createProcessor() {
|
||||
return shipLastPositionDataProcessor;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ItemWriter<TargetEnhancedEntity> createWriter() {
|
||||
return shipLastPositionDataWriter;
|
||||
}
|
||||
|
||||
@Bean(name = "shipLastPositionUpdateJob")
|
||||
public Job shipLastPositionUpdateJob() {
|
||||
return job();
|
||||
}
|
||||
|
||||
@Bean(name = "shipLastPositionUpdateStep")
|
||||
public Step shipLastPositionUpdateStep() {
|
||||
return step();
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,42 +0,0 @@
|
||||
package com.snp.batch.jobs.shipdetail.batch.dto;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Maritime API GetShipsByIHSLRorIMONumbers 응답 래퍼
|
||||
*
|
||||
* API 응답 구조:
|
||||
* {
|
||||
* "shipCount": 5,
|
||||
* "Ships": [...]
|
||||
* }
|
||||
*
|
||||
* Maritime API GetShipsByIHSLRorIMONumbersAll 응답 래퍼
|
||||
*
|
||||
* API 응답 구조:
|
||||
* {
|
||||
* "shipCount": 5,
|
||||
* "ShipResult": [...],
|
||||
* "APSStatus": {...}
|
||||
* }
|
||||
*
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class AisApiResponse {
|
||||
|
||||
@JsonProperty("ShipResult")
|
||||
private List<ShipResultDto> shipResult;
|
||||
|
||||
@JsonProperty("targetEnhancedArr")
|
||||
private List<TargetEnhancedDto> targetEnhancedArr;
|
||||
|
||||
}
|
||||
@ -1,185 +0,0 @@
|
||||
package com.snp.batch.jobs.shipdetail.batch.dto;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class TargetEnhancedDto {
|
||||
|
||||
@JsonProperty("DWT")
|
||||
private Integer dwt;
|
||||
|
||||
@JsonProperty("STAT5CODE")
|
||||
private String stat5Code;
|
||||
|
||||
@JsonProperty("Source")
|
||||
private String source;
|
||||
|
||||
@JsonProperty("CoG")
|
||||
private Double cog;
|
||||
|
||||
@JsonProperty("LastStaticUpdateReceived")
|
||||
private String lastStaticUpdateReceived;
|
||||
|
||||
@JsonProperty("ZoneId")
|
||||
private Integer zoneId;
|
||||
|
||||
@JsonProperty("LPCCode")
|
||||
private String lpcCode;
|
||||
|
||||
@JsonProperty("TonnesCargo")
|
||||
private Integer tonnesCargo;
|
||||
|
||||
@JsonProperty("StationId")
|
||||
private String stationId;
|
||||
|
||||
@JsonProperty("InSTS")
|
||||
private Object inSts; // JSON에서 null이므로 Object로 처리
|
||||
|
||||
@JsonProperty("ImoVerified")
|
||||
private String imoVerified;
|
||||
|
||||
@JsonProperty("OnBerth")
|
||||
private Object onBerth;
|
||||
|
||||
@JsonProperty("DestinationTidied")
|
||||
private String destinationTidied;
|
||||
|
||||
@JsonProperty("Anomalous")
|
||||
private Object anomalous; // JSON에서 null이므로 Object로 처리
|
||||
|
||||
@JsonProperty("MessageType")
|
||||
private Object messageType; // JSON에서 null이므로 Object로 처리
|
||||
|
||||
@JsonProperty("DestinationPortID")
|
||||
private Integer destinationPortID;
|
||||
|
||||
@JsonProperty("DestinationUNLOCODE")
|
||||
private String destinationUnlocode;
|
||||
|
||||
@JsonProperty("MMSI")
|
||||
private Integer mmsi;
|
||||
|
||||
@JsonProperty("IMO")
|
||||
private Integer imo;
|
||||
|
||||
@JsonProperty("AgeMinutes")
|
||||
private Double ageMinutes;
|
||||
|
||||
@JsonProperty("Lat")
|
||||
private Double lat;
|
||||
|
||||
@JsonProperty("Lon")
|
||||
private Double lon;
|
||||
|
||||
@JsonProperty("Heading")
|
||||
private Double heading;
|
||||
|
||||
@JsonProperty("SoG")
|
||||
private Double sog;
|
||||
|
||||
@JsonProperty("Width")
|
||||
private Integer width;
|
||||
|
||||
@JsonProperty("Length")
|
||||
private Integer length;
|
||||
|
||||
@JsonProperty("Draught")
|
||||
private Double draught;
|
||||
|
||||
@JsonProperty("Name")
|
||||
private String name;
|
||||
|
||||
@JsonProperty("Callsign")
|
||||
private String callsign;
|
||||
|
||||
@JsonProperty("Destination")
|
||||
private String destination;
|
||||
|
||||
@JsonProperty("ETA")
|
||||
private String eta;
|
||||
|
||||
@JsonProperty("Status")
|
||||
private String status;
|
||||
|
||||
@JsonProperty("VesselType")
|
||||
private String vesselType;
|
||||
|
||||
@JsonProperty("ExtraInfo")
|
||||
private String extraInfo;
|
||||
|
||||
@JsonProperty("PositionAccuracy")
|
||||
private Integer positionAccuracy;
|
||||
|
||||
@JsonProperty("RoT")
|
||||
private Integer rot;
|
||||
|
||||
@JsonProperty("TimestampUTC")
|
||||
private Integer timestampUtc;
|
||||
|
||||
@JsonProperty("RepeatIndicator")
|
||||
private Integer repeatIndicator;
|
||||
|
||||
@JsonProperty("RAIMFlag")
|
||||
private Integer raimFlag;
|
||||
|
||||
@JsonProperty("RadioStatus")
|
||||
private Integer radioStatus;
|
||||
|
||||
@JsonProperty("Regional")
|
||||
private Integer regional;
|
||||
|
||||
@JsonProperty("Regional2")
|
||||
private Integer regional2;
|
||||
|
||||
@JsonProperty("Spare")
|
||||
private Integer spare;
|
||||
|
||||
@JsonProperty("Spare2")
|
||||
private Integer spare2;
|
||||
|
||||
@JsonProperty("AISVersion")
|
||||
private Integer aisVersion;
|
||||
|
||||
@JsonProperty("PositionFixType")
|
||||
private Integer positionFixType;
|
||||
|
||||
@JsonProperty("DTE")
|
||||
private Integer dte;
|
||||
|
||||
@JsonProperty("BandFlag")
|
||||
private Integer bandFlag;
|
||||
|
||||
@JsonProperty("ReceivedDate")
|
||||
private String receivedDate;
|
||||
|
||||
@JsonProperty("MessageTimestamp")
|
||||
private String messageTimestamp;
|
||||
|
||||
@JsonProperty("LengthBow")
|
||||
private Integer lengthBow;
|
||||
|
||||
@JsonProperty("LengthStern")
|
||||
private Integer lengthStern;
|
||||
|
||||
@JsonProperty("WidthPort")
|
||||
private Integer widthPort;
|
||||
|
||||
@JsonProperty("WidthStarboard")
|
||||
private Integer widthStarboard;
|
||||
|
||||
// Getters and Setters (생략 - 필요에 따라 추가)
|
||||
// 예시:
|
||||
public Integer getDwt() {
|
||||
return dwt;
|
||||
}
|
||||
public void setDwt(Integer dwt) {
|
||||
this.dwt = dwt;
|
||||
}
|
||||
}
|
||||
@ -1,128 +0,0 @@
|
||||
package com.snp.batch.jobs.shipdetail.batch.entity;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.snp.batch.common.batch.entity.BaseEntity;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Data
|
||||
@SuperBuilder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class TargetEnhancedEntity extends BaseEntity {
|
||||
|
||||
private Integer dwt;
|
||||
|
||||
private String stat5Code;
|
||||
|
||||
private String source;
|
||||
|
||||
private Double cog;
|
||||
|
||||
private String lastStaticUpdateReceived;
|
||||
|
||||
private Integer zoneId;
|
||||
|
||||
private String lpcCode;
|
||||
|
||||
private Integer tonnesCargo;
|
||||
|
||||
private String stationId;
|
||||
|
||||
private Object inSts; // JSON에서 null이므로 Object로 처리
|
||||
|
||||
private String imoVerified;
|
||||
|
||||
private Object onBerth;
|
||||
|
||||
private String destinationTidied;
|
||||
|
||||
private Object anomalous; // JSON에서 null이므로 Object로 처리
|
||||
|
||||
private Object messageType; // JSON에서 null이므로 Object로 처리
|
||||
|
||||
private Integer destinationPortID;
|
||||
|
||||
private String destinationUnlocode;
|
||||
|
||||
private Integer mmsi;
|
||||
|
||||
private Integer imo;
|
||||
|
||||
private Double ageMinutes;
|
||||
|
||||
private Double lat;
|
||||
|
||||
private Double lon;
|
||||
|
||||
private Double heading;
|
||||
|
||||
private Double sog;
|
||||
|
||||
private Integer width;
|
||||
|
||||
private Integer length;
|
||||
|
||||
private Double draught;
|
||||
|
||||
private String name;
|
||||
|
||||
private String callsign;
|
||||
|
||||
private String destination;
|
||||
|
||||
private String eta;
|
||||
|
||||
private String status;
|
||||
|
||||
private String vesselType;
|
||||
|
||||
private String extraInfo;
|
||||
|
||||
private Integer positionAccuracy;
|
||||
|
||||
private Integer rot;
|
||||
|
||||
private Integer timestampUtc;
|
||||
|
||||
private Integer repeatIndicator;
|
||||
|
||||
private Integer raimFlag;
|
||||
|
||||
private Integer radioStatus;
|
||||
|
||||
private Integer regional;
|
||||
|
||||
private Integer regional2;
|
||||
|
||||
private Integer spare;
|
||||
|
||||
private Integer spare2;
|
||||
|
||||
private Integer aisVersion;
|
||||
|
||||
private Integer positionFixType;
|
||||
|
||||
private Integer dte;
|
||||
|
||||
private Integer bandFlag;
|
||||
|
||||
private String receivedDate;
|
||||
|
||||
private String messageTimestamp;
|
||||
|
||||
private Integer lengthBow;
|
||||
|
||||
private Integer lengthStern;
|
||||
|
||||
private Integer widthPort;
|
||||
|
||||
private Integer widthStarboard;
|
||||
|
||||
}
|
||||
@ -1,83 +0,0 @@
|
||||
package com.snp.batch.jobs.shipdetail.batch.processor;
|
||||
|
||||
import com.snp.batch.common.batch.processor.BaseProcessor;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto;
|
||||
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ShipLastPositionDataProcessor extends BaseProcessor<TargetEnhancedDto, TargetEnhancedEntity> {
|
||||
@Override
|
||||
protected TargetEnhancedEntity processItem(TargetEnhancedDto dto) throws Exception {
|
||||
log.debug("AIS 최신 항적 데이터 처리 시작: imoNumber={}, shipName={}",
|
||||
dto.getImoVerified(), dto.getName());
|
||||
|
||||
TargetEnhancedEntity entity = TargetEnhancedEntity.builder()
|
||||
.dwt(dto.getDwt())
|
||||
.stat5Code(dto.getStat5Code())
|
||||
.source(dto.getSource())
|
||||
.lpcCode(dto.getLpcCode())
|
||||
.tonnesCargo(dto.getTonnesCargo())
|
||||
.imoVerified(dto.getImoVerified())
|
||||
.lastStaticUpdateReceived(dto.getLastStaticUpdateReceived()) // Entity 필드 타입이 String인 경우
|
||||
// Position and Movement Data
|
||||
.cog(dto.getCog())
|
||||
.zoneId(dto.getZoneId())
|
||||
.stationId(dto.getStationId())
|
||||
.onBerth(dto.getOnBerth())
|
||||
.mmsi(dto.getMmsi())
|
||||
.imo(dto.getImo())
|
||||
.ageMinutes(dto.getAgeMinutes())
|
||||
.lat(dto.getLat())
|
||||
.lon(dto.getLon())
|
||||
.heading(dto.getHeading())
|
||||
.sog(dto.getSog())
|
||||
.positionAccuracy(dto.getPositionAccuracy())
|
||||
.rot(dto.getRot())
|
||||
.timestampUtc(dto.getTimestampUtc())
|
||||
.status(dto.getStatus())
|
||||
// Dimensions and Draught
|
||||
.width(dto.getWidth())
|
||||
.length(dto.getLength())
|
||||
.draught(dto.getDraught())
|
||||
.lengthBow(dto.getLengthBow())
|
||||
.lengthStern(dto.getLengthStern())
|
||||
.widthPort(dto.getWidthPort())
|
||||
.widthStarboard(dto.getWidthStarboard())
|
||||
// Voyage Data
|
||||
.name(dto.getName())
|
||||
.callsign(dto.getCallsign())
|
||||
.destination(dto.getDestination())
|
||||
.destinationTidied(dto.getDestinationTidied())
|
||||
.destinationPortID(dto.getDestinationPortID())
|
||||
.destinationUnlocode(dto.getDestinationUnlocode())
|
||||
.eta(dto.getEta()) // Entity 필드 타입이 String인 경우
|
||||
.vesselType(dto.getVesselType())
|
||||
.extraInfo(dto.getExtraInfo())
|
||||
// AIS Metadata Fields
|
||||
.repeatIndicator(dto.getRepeatIndicator())
|
||||
.raimFlag(dto.getRaimFlag())
|
||||
.radioStatus(dto.getRadioStatus())
|
||||
.regional(dto.getRegional())
|
||||
.regional2(dto.getRegional2())
|
||||
.spare(dto.getSpare())
|
||||
.spare2(dto.getSpare2())
|
||||
.aisVersion(dto.getAisVersion())
|
||||
.positionFixType(dto.getPositionFixType())
|
||||
.dte(dto.getDte())
|
||||
.bandFlag(dto.getBandFlag())
|
||||
.receivedDate(dto.getReceivedDate()) // Entity 필드 타입이 String인 경우
|
||||
.messageTimestamp(dto.getMessageTimestamp()) // Entity 필드 타입이 String인 경우
|
||||
// Null/Object Fields
|
||||
.inSts(dto.getInSts())
|
||||
.anomalous(dto.getAnomalous())
|
||||
.messageType(dto.getMessageType())
|
||||
.build();
|
||||
|
||||
log.debug("AIS 최신 항적 데이터 처리 완료: imoNumber={}", dto.getImoVerified());
|
||||
|
||||
return entity;
|
||||
}
|
||||
}
|
||||
@ -1,155 +0,0 @@
|
||||
package com.snp.batch.jobs.shipdetail.batch.reader;
|
||||
|
||||
import com.snp.batch.common.batch.reader.BaseApiReader;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.AisApiResponse;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
import java.util.*;
|
||||
@Slf4j
|
||||
public class ShipLastPositionDataReader extends BaseApiReader<TargetEnhancedDto> {
|
||||
|
||||
//TODO :
|
||||
// 1. Core20 IMO_NUMBER 전체 조회
|
||||
// 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복)
|
||||
// 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복)
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
private final String targetSchema;
|
||||
|
||||
private List<String> allImoNumbers;
|
||||
private int currentBatchIndex = 0;
|
||||
private final int batchSize = 5000;
|
||||
|
||||
public ShipLastPositionDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, String targetSchema) {
|
||||
super(webClient);
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
this.targetSchema = targetSchema;
|
||||
enableChunkMode(); // ✨ Chunk 모드 활성화
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getReaderName() {
|
||||
return "ShipLastPositionDataReader";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void resetCustomState() {
|
||||
this.currentBatchIndex = 0;
|
||||
this.allImoNumbers = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getApiPath() {
|
||||
return "/AisSvc.svc/AIS/GetTargetsByIMOsEnhanced";
|
||||
}
|
||||
|
||||
private String getTargetTable(){
|
||||
return targetSchema + ".tb_ship_main_info";
|
||||
}
|
||||
|
||||
private String getImoQuery() {
|
||||
return "SELECT imo_no FROM " + getTargetTable() + " ORDER BY imo_no";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void beforeFetch(){
|
||||
log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName());
|
||||
|
||||
allImoNumbers = jdbcTemplate.queryForList(getImoQuery(), String.class);
|
||||
|
||||
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
|
||||
|
||||
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
|
||||
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
|
||||
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
|
||||
|
||||
updateApiCallStats(totalBatches, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<TargetEnhancedDto> fetchNextBatch() throws Exception {
|
||||
// 모든 배치 처리 완료 확인
|
||||
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
|
||||
return null; // Job 종료
|
||||
}
|
||||
|
||||
// 현재 배치의 시작/끝 인덱스 계산
|
||||
int startIndex = currentBatchIndex;
|
||||
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
|
||||
|
||||
// 현재 배치의 IMO 번호 추출 (100개)
|
||||
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
|
||||
|
||||
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
|
||||
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
|
||||
|
||||
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
|
||||
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
|
||||
|
||||
try {
|
||||
// IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...")
|
||||
String imoParam = String.join(",", currentBatch);
|
||||
|
||||
// API 호출
|
||||
AisApiResponse response = callAisApiWithBatch(imoParam);
|
||||
|
||||
// 다음 배치로 인덱스 이동
|
||||
currentBatchIndex = endIndex;
|
||||
|
||||
// 응답 처리
|
||||
if (response != null && response.getTargetEnhancedArr() != null) {
|
||||
List<TargetEnhancedDto> targets = response.getTargetEnhancedArr();
|
||||
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
|
||||
getReaderName(), currentBatchNumber, totalBatches, targets.size());
|
||||
|
||||
// API 호출 통계 업데이트
|
||||
updateApiCallStats(totalBatches, currentBatchNumber);
|
||||
|
||||
// API 과부하 방지 (다음 배치 전 0.5초 대기)
|
||||
if (currentBatchIndex < allImoNumbers.size()) {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
return targets;
|
||||
} else {
|
||||
log.warn("[{}] 배치 {}/{} 응답 없음",
|
||||
getReaderName(), currentBatchNumber, totalBatches);
|
||||
|
||||
// API 호출 통계 업데이트 (실패도 카운트)
|
||||
updateApiCallStats(totalBatches, currentBatchNumber);
|
||||
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
|
||||
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
|
||||
|
||||
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
|
||||
currentBatchIndex = endIndex;
|
||||
|
||||
// 빈 리스트 반환 (Job 계속 진행)
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
private AisApiResponse callAisApiWithBatch(String imoNumbers) {
|
||||
String url = getApiPath();
|
||||
|
||||
Map<String, String> requestBody = Map.of("imo", imoNumbers);
|
||||
|
||||
log.debug("[{}] API 호출: {}", getReaderName(), url);
|
||||
log.debug("[{}] : {}", "requestBody", requestBody.toString());
|
||||
|
||||
return webClient.post()
|
||||
.uri(url)
|
||||
.bodyValue(requestBody)
|
||||
.retrieve()
|
||||
.bodyToMono(AisApiResponse.class)
|
||||
.block();
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,9 +0,0 @@
|
||||
package com.snp.batch.jobs.shipdetail.batch.repository;
|
||||
|
||||
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface ShipLastPositionRepository {
|
||||
void saveLastPositionAll(List<TargetEnhancedEntity> items);
|
||||
}
|
||||
@ -1,125 +0,0 @@
|
||||
package com.snp.batch.jobs.shipdetail.batch.repository;
|
||||
|
||||
import com.snp.batch.common.batch.repository.BaseJdbcRepository;
|
||||
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.jdbc.core.RowMapper;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.Types;
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Repository("shipLastPositionRepository")
|
||||
public class ShipLastPositionRepositoryImpl extends BaseJdbcRepository<TargetEnhancedEntity, Long> implements ShipLastPositionRepository {
|
||||
|
||||
@Value("${app.batch.service-schema.name}")
|
||||
private String targetSchema;
|
||||
|
||||
@Value("${app.batch.service-schema.tables.service-001}")
|
||||
private String tableName;
|
||||
|
||||
public ShipLastPositionRepositoryImpl(JdbcTemplate jdbcTemplate) {
|
||||
super(jdbcTemplate);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getTargetSchema() {
|
||||
return targetSchema;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getSimpleTableName() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RowMapper<TargetEnhancedEntity> getRowMapper() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Long extractId(TargetEnhancedEntity entity) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getInsertSql() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getUpdateSql() {
|
||||
return """
|
||||
UPDATE %s
|
||||
SET last_cptr_hr_utc = ?::timestamptz,
|
||||
last_port = ?,
|
||||
now_position_lat = ?,
|
||||
now_position_lon = ?,
|
||||
ship_dest = ?,
|
||||
arvl_prnmnt_hr = ?::timestamptz,
|
||||
bow_drctn = ?,
|
||||
cog = ?,
|
||||
sog = ?,
|
||||
ship_nav_status = ?,
|
||||
cargo_ton = ?,
|
||||
add_info = ?,
|
||||
in_sts = ?,
|
||||
ancrg_yn = ?
|
||||
WHERE imo_no = ?;
|
||||
""".formatted(getTableName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setInsertParameters(PreparedStatement ps, TargetEnhancedEntity entity) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setUpdateParameters(PreparedStatement ps, TargetEnhancedEntity entity) throws Exception {
|
||||
int idx = 1;
|
||||
ps.setString(idx++, entity.getReceivedDate()); // receivedDate : lastseen (Datetime)
|
||||
ps.setString(idx++, entity.getLpcCode()); // lpcCode : lastport (String)
|
||||
ps.setDouble(idx++, entity.getLat()); // lat : position_latitude (Double)
|
||||
ps.setDouble(idx++, entity.getLon()); // lon : position_longitude (Double)
|
||||
ps.setString(idx++, entity.getDestination()); // destination : destination (String)
|
||||
ps.setString(idx++, entity.getEta()); // eta : eta (Datetime)
|
||||
ps.setDouble(idx++, entity.getHeading()); // heading : heading (Double)
|
||||
ps.setDouble(idx++, entity.getSog()); // sog : speedservice (Double)
|
||||
ps.setDouble(idx++, entity.getCog()); // cog : cog (Double)
|
||||
ps.setString(idx++, entity.getStatus()); // status : navstat (String)
|
||||
ps.setInt(idx++, entity.getTonnesCargo()); // status : tonnes_cargo (Int)
|
||||
ps.setString(idx++, entity.getExtraInfo()); // status : extra_info (String)
|
||||
ps.setObject(idx++, entity.getInSts(), Types.INTEGER);
|
||||
ps.setObject(idx++, entity.getOnBerth(), Types.BOOLEAN);
|
||||
ps.setString(idx++, entity.getImoVerified()); // imoVerified : imo (String)
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getEntityName() {
|
||||
return "TargetEnhancedEntity";
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void saveLastPositionAll(List<TargetEnhancedEntity> items) {
|
||||
if (items == null || items.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
jdbcTemplate.batchUpdate(getUpdateSql(), items, items.size(),
|
||||
(ps, entity) -> {
|
||||
try {
|
||||
setUpdateParameters(ps, entity);
|
||||
} catch (Exception e) {
|
||||
log.error("배치 수정 파라미터 설정 실패", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size());
|
||||
}
|
||||
}
|
||||
@ -1,24 +0,0 @@
|
||||
package com.snp.batch.jobs.shipdetail.batch.writer;
|
||||
|
||||
import com.snp.batch.common.batch.writer.BaseWriter;
|
||||
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
||||
import com.snp.batch.jobs.shipdetail.batch.repository.ShipLastPositionRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ShipLastPositionDataWriter extends BaseWriter<TargetEnhancedEntity> {
|
||||
|
||||
private final ShipLastPositionRepository shipLastPositionRepository;
|
||||
public ShipLastPositionDataWriter(ShipLastPositionRepository shipLastPositionRepository) {
|
||||
super("ShipLastPosition");
|
||||
this.shipLastPositionRepository = shipLastPositionRepository;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeItems(List<TargetEnhancedEntity> items) throws Exception {
|
||||
shipLastPositionRepository.saveLastPositionAll(items);
|
||||
}
|
||||
}
|
||||
@ -134,10 +134,10 @@ app:
|
||||
|
||||
# Core20 캐시 테이블 설정 (환경별로 테이블/컬럼명이 다를 수 있음)
|
||||
core20:
|
||||
schema: t_std_snp_data # 스키마명
|
||||
table: tb_ship_info_mst # 테이블명
|
||||
imo-column: imo_no # IMO/LRNO 컬럼명 (PK, NOT NULL)
|
||||
mmsi-column: mmsi_no # MMSI 컬럼명 (NULLABLE)
|
||||
schema: new_snp # 스키마명
|
||||
table: core20 # 테이블명
|
||||
imo-column: lrno # IMO/LRNO 컬럼명 (PK, NOT NULL)
|
||||
mmsi-column: mmsi # MMSI 컬럼명 (NULLABLE)
|
||||
|
||||
# 파티션 관리 설정
|
||||
partition:
|
||||
|
||||
불러오는 중...
Reference in New Issue
Block a user