feat(batch): 선박 마지막 위치 업데이트 프로세스 변경 #7
@ -1,6 +1,7 @@
|
|||||||
package com.snp.batch.jobs.aistargetdbsync.batch.config;
|
package com.snp.batch.jobs.aistargetdbsync.batch.config;
|
||||||
|
|
||||||
import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.AisTargetDbSyncTasklet;
|
import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.AisTargetDbSyncTasklet;
|
||||||
|
import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.ShipLastPositionSyncTasklet;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.batch.core.Job;
|
import org.springframework.batch.core.Job;
|
||||||
import org.springframework.batch.core.JobExecution;
|
import org.springframework.batch.core.JobExecution;
|
||||||
@ -20,13 +21,14 @@ import org.springframework.transaction.PlatformTransactionManager;
|
|||||||
* API: 없음 (캐시 기반)
|
* API: 없음 (캐시 기반)
|
||||||
*
|
*
|
||||||
* 동작:
|
* 동작:
|
||||||
* - Caffeine 캐시에서 최근 15분 이내 데이터 조회
|
* - Step 1: Caffeine 캐시에서 최근 15분 이내 데이터 조회 → ais_target DB UPSERT
|
||||||
* - MMSI별 최신 위치 1건씩 DB에 UPSERT
|
* - Step 2: 캐시에서 IMO별 최신 위치 조회 → tb_ship_main_info 위치 컬럼 UPDATE
|
||||||
* - 1분 주기 aisTargetImportJob과 분리하여 DB 볼륨 최적화
|
|
||||||
*
|
*
|
||||||
* 데이터 흐름:
|
* 데이터 흐름:
|
||||||
* - aisTargetImportJob (1분): API → 캐시 업데이트
|
* - aisTargetImportJob (1분): API → 캐시 업데이트
|
||||||
* - aisTargetDbSyncJob (15분): 캐시 → DB 저장 (이 Job)
|
* - aisTargetDbSyncJob (15분): 캐시 → DB 저장 (이 Job)
|
||||||
|
* - Step 1: 캐시 → ais_target UPSERT
|
||||||
|
* - Step 2: 캐시 → tb_ship_main_info UPDATE (IMO별 최신 1건)
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Configuration
|
@Configuration
|
||||||
@ -35,14 +37,17 @@ public class AisTargetDbSyncJobConfig {
|
|||||||
private final JobRepository jobRepository;
|
private final JobRepository jobRepository;
|
||||||
private final PlatformTransactionManager transactionManager;
|
private final PlatformTransactionManager transactionManager;
|
||||||
private final AisTargetDbSyncTasklet aisTargetDbSyncTasklet;
|
private final AisTargetDbSyncTasklet aisTargetDbSyncTasklet;
|
||||||
|
private final ShipLastPositionSyncTasklet shipLastPositionSyncTasklet;
|
||||||
|
|
||||||
public AisTargetDbSyncJobConfig(
|
public AisTargetDbSyncJobConfig(
|
||||||
JobRepository jobRepository,
|
JobRepository jobRepository,
|
||||||
PlatformTransactionManager transactionManager,
|
PlatformTransactionManager transactionManager,
|
||||||
AisTargetDbSyncTasklet aisTargetDbSyncTasklet) {
|
AisTargetDbSyncTasklet aisTargetDbSyncTasklet,
|
||||||
|
ShipLastPositionSyncTasklet shipLastPositionSyncTasklet) {
|
||||||
this.jobRepository = jobRepository;
|
this.jobRepository = jobRepository;
|
||||||
this.transactionManager = transactionManager;
|
this.transactionManager = transactionManager;
|
||||||
this.aisTargetDbSyncTasklet = aisTargetDbSyncTasklet;
|
this.aisTargetDbSyncTasklet = aisTargetDbSyncTasklet;
|
||||||
|
this.shipLastPositionSyncTasklet = shipLastPositionSyncTasklet;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean(name = "aisTargetDbSyncStep")
|
@Bean(name = "aisTargetDbSyncStep")
|
||||||
@ -52,6 +57,13 @@ public class AisTargetDbSyncJobConfig {
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean(name = "shipLastPositionSyncStep")
|
||||||
|
public Step shipLastPositionSyncStep() {
|
||||||
|
return new StepBuilder("shipLastPositionSyncStep", jobRepository)
|
||||||
|
.tasklet(shipLastPositionSyncTasklet, transactionManager)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
@Bean(name = "aisTargetDbSyncJob")
|
@Bean(name = "aisTargetDbSyncJob")
|
||||||
public Job aisTargetDbSyncJob() {
|
public Job aisTargetDbSyncJob() {
|
||||||
log.info("Job 생성: aisTargetDbSyncJob");
|
log.info("Job 생성: aisTargetDbSyncJob");
|
||||||
@ -74,6 +86,7 @@ public class AisTargetDbSyncJobConfig {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
.start(aisTargetDbSyncStep())
|
.start(aisTargetDbSyncStep())
|
||||||
|
.next(shipLastPositionSyncStep())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,15 @@
|
|||||||
|
package com.snp.batch.jobs.aistargetdbsync.batch.repository;
|
||||||
|
|
||||||
|
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 캐시 기반 Ship Last Position 동기화 Repository
|
||||||
|
*
|
||||||
|
* AisTargetEntity(캐시 데이터)를 받아 tb_ship_main_info의 위치/항해 컬럼을 UPDATE
|
||||||
|
*/
|
||||||
|
public interface ShipLastPositionSyncRepository {
|
||||||
|
|
||||||
|
int updateLastPositions(List<AisTargetEntity> entities);
|
||||||
|
}
|
||||||
@ -0,0 +1,155 @@
|
|||||||
|
package com.snp.batch.jobs.aistargetdbsync.batch.repository;
|
||||||
|
|
||||||
|
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.springframework.stereotype.Repository;
|
||||||
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
|
import java.sql.PreparedStatement;
|
||||||
|
import java.sql.Types;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 캐시 기반 Ship Last Position 동기화 Repository 구현
|
||||||
|
*
|
||||||
|
* AisTargetEntity의 필드를 tb_ship_main_info 컬럼에 매핑하여 UPDATE 수행.
|
||||||
|
* 기존 ShipLastPositionRepositoryImpl과 동일한 SQL을 사용하되,
|
||||||
|
* AisTargetEntity의 타입(OffsetDateTime, Integer 등)에 맞게 변환 처리.
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Repository("shipLastPositionSyncRepository")
|
||||||
|
public class ShipLastPositionSyncRepositoryImpl implements ShipLastPositionSyncRepository {
|
||||||
|
|
||||||
|
private static final int BATCH_SIZE = 1000;
|
||||||
|
|
||||||
|
private final JdbcTemplate jdbcTemplate;
|
||||||
|
|
||||||
|
@Value("${app.batch.service-schema.name}")
|
||||||
|
private String targetSchema;
|
||||||
|
|
||||||
|
@Value("${app.batch.service-schema.tables.service-001}")
|
||||||
|
private String tableName;
|
||||||
|
|
||||||
|
public ShipLastPositionSyncRepositoryImpl(JdbcTemplate jdbcTemplate) {
|
||||||
|
this.jdbcTemplate = jdbcTemplate;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getTableName() {
|
||||||
|
return targetSchema + "." + tableName;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getUpdateSql() {
|
||||||
|
// return """
|
||||||
|
// UPDATE %s
|
||||||
|
// SET last_cptr_hr_utc = ?::timestamptz,
|
||||||
|
// last_port = ?,
|
||||||
|
// now_position_lat = ?,
|
||||||
|
// now_position_lon = ?,
|
||||||
|
// ship_dest = ?,
|
||||||
|
// arvl_prnmnt_hr = ?::timestamptz,
|
||||||
|
// bow_drctn = ?,
|
||||||
|
// cog = ?,
|
||||||
|
// sog = ?,
|
||||||
|
// ship_nav_status = ?,
|
||||||
|
// cargo_ton = ?,
|
||||||
|
// add_info = ?,
|
||||||
|
// in_sts = ?,
|
||||||
|
// ancrg_yn = ?
|
||||||
|
// WHERE imo_no = ?
|
||||||
|
// """.formatted(getTableName());
|
||||||
|
return """
|
||||||
|
UPDATE new_snp.core20
|
||||||
|
SET lastseen = ?::timestamptz,
|
||||||
|
lastport = ?,
|
||||||
|
position_latitude = ?,
|
||||||
|
position_longitude = ?,
|
||||||
|
destination = ?,
|
||||||
|
eta = ?::timestamptz,
|
||||||
|
heading = ?,
|
||||||
|
cog = ?,
|
||||||
|
speedservice = ?,
|
||||||
|
navstat = ?,
|
||||||
|
tonnes_cargo = ?,
|
||||||
|
extra_info = ?,
|
||||||
|
in_sts = ?,
|
||||||
|
on_berth = ?
|
||||||
|
WHERE lrno = ?;
|
||||||
|
""";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Transactional
|
||||||
|
public int updateLastPositions(List<AisTargetEntity> entities) {
|
||||||
|
if (entities == null || entities.isEmpty()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
String sql = getUpdateSql();
|
||||||
|
int totalUpdated = 0;
|
||||||
|
|
||||||
|
int[][] batchResults = jdbcTemplate.batchUpdate(sql, entities, BATCH_SIZE,
|
||||||
|
(ps, entity) -> setUpdateParameters(ps, entity));
|
||||||
|
|
||||||
|
for (int[] batchResult : batchResults) {
|
||||||
|
for (int result : batchResult) {
|
||||||
|
if (result > 0) {
|
||||||
|
totalUpdated += result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("Ship Last Position 동기화 완료: 대상={} 건, 갱신={} 건", entities.size(), totalUpdated);
|
||||||
|
return totalUpdated;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setUpdateParameters(PreparedStatement ps, AisTargetEntity entity) throws java.sql.SQLException {
|
||||||
|
int idx = 1;
|
||||||
|
|
||||||
|
// last_cptr_hr_utc ← receivedDate (OffsetDateTime → String for ::timestamptz)
|
||||||
|
ps.setString(idx++, entity.getReceivedDate() != null ? entity.getReceivedDate().toString() : null);
|
||||||
|
|
||||||
|
// last_port ← lpcCode (Integer → String)
|
||||||
|
ps.setString(idx++, entity.getLpcCode() != null ? String.valueOf(entity.getLpcCode()) : null);
|
||||||
|
|
||||||
|
// now_position_lat ← lat (Double, null-safe)
|
||||||
|
ps.setObject(idx++, entity.getLat(), Types.DOUBLE);
|
||||||
|
|
||||||
|
// now_position_lon ← lon (Double, null-safe)
|
||||||
|
ps.setObject(idx++, entity.getLon(), Types.DOUBLE);
|
||||||
|
|
||||||
|
// ship_dest ← destination (String)
|
||||||
|
ps.setString(idx++, entity.getDestination());
|
||||||
|
|
||||||
|
// arvl_prnmnt_hr ← eta (OffsetDateTime → String for ::timestamptz)
|
||||||
|
ps.setString(idx++, entity.getEta() != null ? entity.getEta().toString() : null);
|
||||||
|
|
||||||
|
// bow_drctn ← heading (Double, null-safe)
|
||||||
|
ps.setObject(idx++, entity.getHeading(), Types.DOUBLE);
|
||||||
|
|
||||||
|
// cog ← cog (Double, null-safe)
|
||||||
|
ps.setObject(idx++, entity.getCog(), Types.DOUBLE);
|
||||||
|
|
||||||
|
// sog ← sog (Double, null-safe)
|
||||||
|
ps.setObject(idx++, entity.getSog(), Types.DOUBLE);
|
||||||
|
|
||||||
|
// ship_nav_status ← status (String)
|
||||||
|
ps.setString(idx++, entity.getStatus());
|
||||||
|
|
||||||
|
// cargo_ton ← tonnesCargo (Integer, null-safe)
|
||||||
|
ps.setObject(idx++, entity.getTonnesCargo(), Types.INTEGER);
|
||||||
|
|
||||||
|
// add_info ← extraInfo (String)
|
||||||
|
ps.setString(idx++, entity.getExtraInfo());
|
||||||
|
|
||||||
|
// in_sts ← inSTS (Integer, null-safe)
|
||||||
|
ps.setObject(idx++, entity.getInSTS(), Types.INTEGER);
|
||||||
|
|
||||||
|
// ancrg_yn ← onBerth (Boolean, null-safe)
|
||||||
|
ps.setObject(idx++, entity.getOnBerth(), Types.BOOLEAN);
|
||||||
|
|
||||||
|
// WHERE imo_no ← imoVerified (String)
|
||||||
|
ps.setString(idx++, entity.getImoVerified());
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,133 @@
|
|||||||
|
package com.snp.batch.jobs.aistargetdbsync.batch.tasklet;
|
||||||
|
|
||||||
|
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||||
|
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
|
||||||
|
import com.snp.batch.jobs.aistargetdbsync.batch.repository.ShipLastPositionSyncRepository;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.batch.core.StepContribution;
|
||||||
|
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||||
|
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||||
|
import org.springframework.batch.repeat.RepeatStatus;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ship Last Position Sync Tasklet
|
||||||
|
*
|
||||||
|
* 동작:
|
||||||
|
* - Caffeine 캐시에서 마지막 성공 이후 ~ 현재까지의 AIS 데이터를 조회
|
||||||
|
* - imoVerified가 유효한 항목만 필터링
|
||||||
|
* - IMO별 messageTimestamp 최신 1건만 선택
|
||||||
|
* - tb_ship_main_info의 위치/항해 14개 컬럼을 UPDATE
|
||||||
|
*
|
||||||
|
* 시간 범위 결정 전략:
|
||||||
|
* - AisTargetDbSyncTasklet과 동일 (lastSuccessTime 기반, fallback 사용)
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class ShipLastPositionSyncTasklet implements Tasklet {
|
||||||
|
|
||||||
|
private final AisTargetCacheManager cacheManager;
|
||||||
|
private final ShipLastPositionSyncRepository repository;
|
||||||
|
private final int fallbackMinutes;
|
||||||
|
|
||||||
|
private final AtomicReference<Instant> lastSuccessTime = new AtomicReference<>();
|
||||||
|
|
||||||
|
public ShipLastPositionSyncTasklet(
|
||||||
|
AisTargetCacheManager cacheManager,
|
||||||
|
ShipLastPositionSyncRepository repository,
|
||||||
|
@Value("${app.batch.ais-target-db-sync.time-range-minutes:15}") int fallbackMinutes) {
|
||||||
|
this.cacheManager = cacheManager;
|
||||||
|
this.repository = repository;
|
||||||
|
this.fallbackMinutes = fallbackMinutes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
|
||||||
|
Instant now = Instant.now();
|
||||||
|
int rangeMinutes = resolveRangeMinutes(now);
|
||||||
|
|
||||||
|
log.info("========================================");
|
||||||
|
log.info("Ship Last Position Sync 시작");
|
||||||
|
log.info("조회 범위: 최근 {}분 (방식: {})", rangeMinutes,
|
||||||
|
lastSuccessTime.get() != null ? "마지막 성공 기준" : "fallback");
|
||||||
|
log.info("========================================");
|
||||||
|
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// 1. 캐시에서 시간 범위 내 데이터 조회
|
||||||
|
List<AisTargetEntity> entities = cacheManager.getByTimeRange(rangeMinutes);
|
||||||
|
|
||||||
|
if (entities.isEmpty()) {
|
||||||
|
log.warn("캐시에서 조회된 데이터가 없습니다 (범위: {}분)", rangeMinutes);
|
||||||
|
lastSuccessTime.set(now);
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. imoVerified가 유효한 항목만 필터
|
||||||
|
List<AisTargetEntity> imoFiltered = entities.stream()
|
||||||
|
.filter(e -> isValidImo(e.getImoVerified()))
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
if (imoFiltered.isEmpty()) {
|
||||||
|
log.info("유효한 IMO가 있는 데이터가 없습니다 (전체: {} 건)", entities.size());
|
||||||
|
lastSuccessTime.set(now);
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. IMO별 messageTimestamp 최신 1건만 선택
|
||||||
|
List<AisTargetEntity> latestPerImo = imoFiltered.stream()
|
||||||
|
.collect(Collectors.groupingBy(AisTargetEntity::getImoVerified))
|
||||||
|
.values().stream()
|
||||||
|
.map(group -> group.stream()
|
||||||
|
.max(Comparator.comparing(
|
||||||
|
AisTargetEntity::getMessageTimestamp,
|
||||||
|
Comparator.nullsFirst(Comparator.naturalOrder())))
|
||||||
|
.orElseThrow())
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
log.info("캐시 조회: {} 건 → IMO 유효: {} 건 → IMO별 최신: {} 건",
|
||||||
|
entities.size(), imoFiltered.size(), latestPerImo.size());
|
||||||
|
|
||||||
|
// 4. DB UPDATE
|
||||||
|
int updated = repository.updateLastPositions(latestPerImo);
|
||||||
|
|
||||||
|
long elapsed = System.currentTimeMillis() - startTime;
|
||||||
|
|
||||||
|
lastSuccessTime.set(now);
|
||||||
|
|
||||||
|
log.info("========================================");
|
||||||
|
log.info("Ship Last Position Sync 완료");
|
||||||
|
log.info("대상: {} 건, 갱신: {} 건", latestPerImo.size(), updated);
|
||||||
|
log.info("소요 시간: {}ms", elapsed);
|
||||||
|
log.info("========================================");
|
||||||
|
|
||||||
|
contribution.incrementWriteCount(updated);
|
||||||
|
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final int MAX_RANGE_MINUTES = 60;
|
||||||
|
|
||||||
|
private int resolveRangeMinutes(Instant now) {
|
||||||
|
Instant last = lastSuccessTime.get();
|
||||||
|
if (last == null) {
|
||||||
|
return Math.min(fallbackMinutes, MAX_RANGE_MINUTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
long elapsedMinutes = Duration.between(last, now).toMinutes();
|
||||||
|
int range = (int) Math.max(elapsedMinutes + 1, 1);
|
||||||
|
return Math.min(range, MAX_RANGE_MINUTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isValidImo(String imoVerified) {
|
||||||
|
return imoVerified != null && !imoVerified.isBlank() && !"0".equals(imoVerified);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,90 +0,0 @@
|
|||||||
package com.snp.batch.jobs.shipdetail.batch.config;
|
|
||||||
|
|
||||||
import com.snp.batch.common.batch.config.BaseJobConfig;
|
|
||||||
import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto;
|
|
||||||
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
|
||||||
import com.snp.batch.jobs.shipdetail.batch.processor.ShipLastPositionDataProcessor;
|
|
||||||
import com.snp.batch.jobs.shipdetail.batch.reader.ShipLastPositionDataReader;
|
|
||||||
import com.snp.batch.jobs.shipdetail.batch.writer.ShipLastPositionDataWriter;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.batch.core.Job;
|
|
||||||
import org.springframework.batch.core.Step;
|
|
||||||
import org.springframework.batch.core.repository.JobRepository;
|
|
||||||
import org.springframework.batch.item.ItemProcessor;
|
|
||||||
import org.springframework.batch.item.ItemReader;
|
|
||||||
import org.springframework.batch.item.ItemWriter;
|
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
|
||||||
import org.springframework.transaction.PlatformTransactionManager;
|
|
||||||
import org.springframework.web.reactive.function.client.WebClient;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
@Configuration
|
|
||||||
public class ShipLastPositionUpdateJobConfig extends BaseJobConfig<TargetEnhancedDto, TargetEnhancedEntity> {
|
|
||||||
private final JdbcTemplate jdbcTemplate;
|
|
||||||
private final WebClient maritimeAisApiWebClient;
|
|
||||||
|
|
||||||
private final ShipLastPositionDataProcessor shipLastPositionDataProcessor;
|
|
||||||
|
|
||||||
private final ShipLastPositionDataWriter shipLastPositionDataWriter;
|
|
||||||
|
|
||||||
@Value("${app.batch.service-schema.name}")
|
|
||||||
private String targetSchema;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected int getChunkSize() {
|
|
||||||
return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
|
|
||||||
}
|
|
||||||
public ShipLastPositionUpdateJobConfig(
|
|
||||||
JobRepository jobRepository,
|
|
||||||
PlatformTransactionManager transactionManager,
|
|
||||||
ShipLastPositionDataProcessor shipLastPositionDataProcessor,
|
|
||||||
ShipLastPositionDataWriter shipLastPositionDataWriter,
|
|
||||||
JdbcTemplate jdbcTemplate,
|
|
||||||
@Qualifier("maritimeAisApiWebClient")WebClient maritimeAisApiWebClient) {
|
|
||||||
super(jobRepository, transactionManager);
|
|
||||||
this.jdbcTemplate = jdbcTemplate;
|
|
||||||
this.maritimeAisApiWebClient = maritimeAisApiWebClient;
|
|
||||||
this.shipLastPositionDataProcessor = shipLastPositionDataProcessor;
|
|
||||||
this.shipLastPositionDataWriter = shipLastPositionDataWriter;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getJobName() {
|
|
||||||
return "shipLastPositionUpdateJob";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getStepName() {
|
|
||||||
return "shipLastPositionUpdateStep";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ItemReader<TargetEnhancedDto> createReader() {
|
|
||||||
return new ShipLastPositionDataReader(maritimeAisApiWebClient, jdbcTemplate, targetSchema);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ItemProcessor<TargetEnhancedDto, TargetEnhancedEntity> createProcessor() {
|
|
||||||
return shipLastPositionDataProcessor;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ItemWriter<TargetEnhancedEntity> createWriter() {
|
|
||||||
return shipLastPositionDataWriter;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean(name = "shipLastPositionUpdateJob")
|
|
||||||
public Job shipLastPositionUpdateJob() {
|
|
||||||
return job();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean(name = "shipLastPositionUpdateStep")
|
|
||||||
public Step shipLastPositionUpdateStep() {
|
|
||||||
return step();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -1,42 +0,0 @@
|
|||||||
package com.snp.batch.jobs.shipdetail.batch.dto;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Builder;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Maritime API GetShipsByIHSLRorIMONumbers 응답 래퍼
|
|
||||||
*
|
|
||||||
* API 응답 구조:
|
|
||||||
* {
|
|
||||||
* "shipCount": 5,
|
|
||||||
* "Ships": [...]
|
|
||||||
* }
|
|
||||||
*
|
|
||||||
* Maritime API GetShipsByIHSLRorIMONumbersAll 응답 래퍼
|
|
||||||
*
|
|
||||||
* API 응답 구조:
|
|
||||||
* {
|
|
||||||
* "shipCount": 5,
|
|
||||||
* "ShipResult": [...],
|
|
||||||
* "APSStatus": {...}
|
|
||||||
* }
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
@Builder
|
|
||||||
@NoArgsConstructor
|
|
||||||
@AllArgsConstructor
|
|
||||||
public class AisApiResponse {
|
|
||||||
|
|
||||||
@JsonProperty("ShipResult")
|
|
||||||
private List<ShipResultDto> shipResult;
|
|
||||||
|
|
||||||
@JsonProperty("targetEnhancedArr")
|
|
||||||
private List<TargetEnhancedDto> targetEnhancedArr;
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -1,185 +0,0 @@
|
|||||||
package com.snp.batch.jobs.shipdetail.batch.dto;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Builder;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
|
|
||||||
@Data
|
|
||||||
@Builder
|
|
||||||
@NoArgsConstructor
|
|
||||||
@AllArgsConstructor
|
|
||||||
public class TargetEnhancedDto {
|
|
||||||
|
|
||||||
@JsonProperty("DWT")
|
|
||||||
private Integer dwt;
|
|
||||||
|
|
||||||
@JsonProperty("STAT5CODE")
|
|
||||||
private String stat5Code;
|
|
||||||
|
|
||||||
@JsonProperty("Source")
|
|
||||||
private String source;
|
|
||||||
|
|
||||||
@JsonProperty("CoG")
|
|
||||||
private Double cog;
|
|
||||||
|
|
||||||
@JsonProperty("LastStaticUpdateReceived")
|
|
||||||
private String lastStaticUpdateReceived;
|
|
||||||
|
|
||||||
@JsonProperty("ZoneId")
|
|
||||||
private Integer zoneId;
|
|
||||||
|
|
||||||
@JsonProperty("LPCCode")
|
|
||||||
private String lpcCode;
|
|
||||||
|
|
||||||
@JsonProperty("TonnesCargo")
|
|
||||||
private Integer tonnesCargo;
|
|
||||||
|
|
||||||
@JsonProperty("StationId")
|
|
||||||
private String stationId;
|
|
||||||
|
|
||||||
@JsonProperty("InSTS")
|
|
||||||
private Object inSts; // JSON에서 null이므로 Object로 처리
|
|
||||||
|
|
||||||
@JsonProperty("ImoVerified")
|
|
||||||
private String imoVerified;
|
|
||||||
|
|
||||||
@JsonProperty("OnBerth")
|
|
||||||
private Object onBerth;
|
|
||||||
|
|
||||||
@JsonProperty("DestinationTidied")
|
|
||||||
private String destinationTidied;
|
|
||||||
|
|
||||||
@JsonProperty("Anomalous")
|
|
||||||
private Object anomalous; // JSON에서 null이므로 Object로 처리
|
|
||||||
|
|
||||||
@JsonProperty("MessageType")
|
|
||||||
private Object messageType; // JSON에서 null이므로 Object로 처리
|
|
||||||
|
|
||||||
@JsonProperty("DestinationPortID")
|
|
||||||
private Integer destinationPortID;
|
|
||||||
|
|
||||||
@JsonProperty("DestinationUNLOCODE")
|
|
||||||
private String destinationUnlocode;
|
|
||||||
|
|
||||||
@JsonProperty("MMSI")
|
|
||||||
private Integer mmsi;
|
|
||||||
|
|
||||||
@JsonProperty("IMO")
|
|
||||||
private Integer imo;
|
|
||||||
|
|
||||||
@JsonProperty("AgeMinutes")
|
|
||||||
private Double ageMinutes;
|
|
||||||
|
|
||||||
@JsonProperty("Lat")
|
|
||||||
private Double lat;
|
|
||||||
|
|
||||||
@JsonProperty("Lon")
|
|
||||||
private Double lon;
|
|
||||||
|
|
||||||
@JsonProperty("Heading")
|
|
||||||
private Double heading;
|
|
||||||
|
|
||||||
@JsonProperty("SoG")
|
|
||||||
private Double sog;
|
|
||||||
|
|
||||||
@JsonProperty("Width")
|
|
||||||
private Integer width;
|
|
||||||
|
|
||||||
@JsonProperty("Length")
|
|
||||||
private Integer length;
|
|
||||||
|
|
||||||
@JsonProperty("Draught")
|
|
||||||
private Double draught;
|
|
||||||
|
|
||||||
@JsonProperty("Name")
|
|
||||||
private String name;
|
|
||||||
|
|
||||||
@JsonProperty("Callsign")
|
|
||||||
private String callsign;
|
|
||||||
|
|
||||||
@JsonProperty("Destination")
|
|
||||||
private String destination;
|
|
||||||
|
|
||||||
@JsonProperty("ETA")
|
|
||||||
private String eta;
|
|
||||||
|
|
||||||
@JsonProperty("Status")
|
|
||||||
private String status;
|
|
||||||
|
|
||||||
@JsonProperty("VesselType")
|
|
||||||
private String vesselType;
|
|
||||||
|
|
||||||
@JsonProperty("ExtraInfo")
|
|
||||||
private String extraInfo;
|
|
||||||
|
|
||||||
@JsonProperty("PositionAccuracy")
|
|
||||||
private Integer positionAccuracy;
|
|
||||||
|
|
||||||
@JsonProperty("RoT")
|
|
||||||
private Integer rot;
|
|
||||||
|
|
||||||
@JsonProperty("TimestampUTC")
|
|
||||||
private Integer timestampUtc;
|
|
||||||
|
|
||||||
@JsonProperty("RepeatIndicator")
|
|
||||||
private Integer repeatIndicator;
|
|
||||||
|
|
||||||
@JsonProperty("RAIMFlag")
|
|
||||||
private Integer raimFlag;
|
|
||||||
|
|
||||||
@JsonProperty("RadioStatus")
|
|
||||||
private Integer radioStatus;
|
|
||||||
|
|
||||||
@JsonProperty("Regional")
|
|
||||||
private Integer regional;
|
|
||||||
|
|
||||||
@JsonProperty("Regional2")
|
|
||||||
private Integer regional2;
|
|
||||||
|
|
||||||
@JsonProperty("Spare")
|
|
||||||
private Integer spare;
|
|
||||||
|
|
||||||
@JsonProperty("Spare2")
|
|
||||||
private Integer spare2;
|
|
||||||
|
|
||||||
@JsonProperty("AISVersion")
|
|
||||||
private Integer aisVersion;
|
|
||||||
|
|
||||||
@JsonProperty("PositionFixType")
|
|
||||||
private Integer positionFixType;
|
|
||||||
|
|
||||||
@JsonProperty("DTE")
|
|
||||||
private Integer dte;
|
|
||||||
|
|
||||||
@JsonProperty("BandFlag")
|
|
||||||
private Integer bandFlag;
|
|
||||||
|
|
||||||
@JsonProperty("ReceivedDate")
|
|
||||||
private String receivedDate;
|
|
||||||
|
|
||||||
@JsonProperty("MessageTimestamp")
|
|
||||||
private String messageTimestamp;
|
|
||||||
|
|
||||||
@JsonProperty("LengthBow")
|
|
||||||
private Integer lengthBow;
|
|
||||||
|
|
||||||
@JsonProperty("LengthStern")
|
|
||||||
private Integer lengthStern;
|
|
||||||
|
|
||||||
@JsonProperty("WidthPort")
|
|
||||||
private Integer widthPort;
|
|
||||||
|
|
||||||
@JsonProperty("WidthStarboard")
|
|
||||||
private Integer widthStarboard;
|
|
||||||
|
|
||||||
// Getters and Setters (생략 - 필요에 따라 추가)
|
|
||||||
// 예시:
|
|
||||||
public Integer getDwt() {
|
|
||||||
return dwt;
|
|
||||||
}
|
|
||||||
public void setDwt(Integer dwt) {
|
|
||||||
this.dwt = dwt;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,128 +0,0 @@
|
|||||||
package com.snp.batch.jobs.shipdetail.batch.entity;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
|
||||||
import com.snp.batch.common.batch.entity.BaseEntity;
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.EqualsAndHashCode;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
import lombok.experimental.SuperBuilder;
|
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
|
|
||||||
@Data
|
|
||||||
@SuperBuilder
|
|
||||||
@NoArgsConstructor
|
|
||||||
@AllArgsConstructor
|
|
||||||
@EqualsAndHashCode(callSuper = true)
|
|
||||||
public class TargetEnhancedEntity extends BaseEntity {
|
|
||||||
|
|
||||||
private Integer dwt;
|
|
||||||
|
|
||||||
private String stat5Code;
|
|
||||||
|
|
||||||
private String source;
|
|
||||||
|
|
||||||
private Double cog;
|
|
||||||
|
|
||||||
private String lastStaticUpdateReceived;
|
|
||||||
|
|
||||||
private Integer zoneId;
|
|
||||||
|
|
||||||
private String lpcCode;
|
|
||||||
|
|
||||||
private Integer tonnesCargo;
|
|
||||||
|
|
||||||
private String stationId;
|
|
||||||
|
|
||||||
private Object inSts; // JSON에서 null이므로 Object로 처리
|
|
||||||
|
|
||||||
private String imoVerified;
|
|
||||||
|
|
||||||
private Object onBerth;
|
|
||||||
|
|
||||||
private String destinationTidied;
|
|
||||||
|
|
||||||
private Object anomalous; // JSON에서 null이므로 Object로 처리
|
|
||||||
|
|
||||||
private Object messageType; // JSON에서 null이므로 Object로 처리
|
|
||||||
|
|
||||||
private Integer destinationPortID;
|
|
||||||
|
|
||||||
private String destinationUnlocode;
|
|
||||||
|
|
||||||
private Integer mmsi;
|
|
||||||
|
|
||||||
private Integer imo;
|
|
||||||
|
|
||||||
private Double ageMinutes;
|
|
||||||
|
|
||||||
private Double lat;
|
|
||||||
|
|
||||||
private Double lon;
|
|
||||||
|
|
||||||
private Double heading;
|
|
||||||
|
|
||||||
private Double sog;
|
|
||||||
|
|
||||||
private Integer width;
|
|
||||||
|
|
||||||
private Integer length;
|
|
||||||
|
|
||||||
private Double draught;
|
|
||||||
|
|
||||||
private String name;
|
|
||||||
|
|
||||||
private String callsign;
|
|
||||||
|
|
||||||
private String destination;
|
|
||||||
|
|
||||||
private String eta;
|
|
||||||
|
|
||||||
private String status;
|
|
||||||
|
|
||||||
private String vesselType;
|
|
||||||
|
|
||||||
private String extraInfo;
|
|
||||||
|
|
||||||
private Integer positionAccuracy;
|
|
||||||
|
|
||||||
private Integer rot;
|
|
||||||
|
|
||||||
private Integer timestampUtc;
|
|
||||||
|
|
||||||
private Integer repeatIndicator;
|
|
||||||
|
|
||||||
private Integer raimFlag;
|
|
||||||
|
|
||||||
private Integer radioStatus;
|
|
||||||
|
|
||||||
private Integer regional;
|
|
||||||
|
|
||||||
private Integer regional2;
|
|
||||||
|
|
||||||
private Integer spare;
|
|
||||||
|
|
||||||
private Integer spare2;
|
|
||||||
|
|
||||||
private Integer aisVersion;
|
|
||||||
|
|
||||||
private Integer positionFixType;
|
|
||||||
|
|
||||||
private Integer dte;
|
|
||||||
|
|
||||||
private Integer bandFlag;
|
|
||||||
|
|
||||||
private String receivedDate;
|
|
||||||
|
|
||||||
private String messageTimestamp;
|
|
||||||
|
|
||||||
private Integer lengthBow;
|
|
||||||
|
|
||||||
private Integer lengthStern;
|
|
||||||
|
|
||||||
private Integer widthPort;
|
|
||||||
|
|
||||||
private Integer widthStarboard;
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -1,83 +0,0 @@
|
|||||||
package com.snp.batch.jobs.shipdetail.batch.processor;
|
|
||||||
|
|
||||||
import com.snp.batch.common.batch.processor.BaseProcessor;
|
|
||||||
import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto;
|
|
||||||
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
@Component
|
|
||||||
public class ShipLastPositionDataProcessor extends BaseProcessor<TargetEnhancedDto, TargetEnhancedEntity> {
|
|
||||||
@Override
|
|
||||||
protected TargetEnhancedEntity processItem(TargetEnhancedDto dto) throws Exception {
|
|
||||||
log.debug("AIS 최신 항적 데이터 처리 시작: imoNumber={}, shipName={}",
|
|
||||||
dto.getImoVerified(), dto.getName());
|
|
||||||
|
|
||||||
TargetEnhancedEntity entity = TargetEnhancedEntity.builder()
|
|
||||||
.dwt(dto.getDwt())
|
|
||||||
.stat5Code(dto.getStat5Code())
|
|
||||||
.source(dto.getSource())
|
|
||||||
.lpcCode(dto.getLpcCode())
|
|
||||||
.tonnesCargo(dto.getTonnesCargo())
|
|
||||||
.imoVerified(dto.getImoVerified())
|
|
||||||
.lastStaticUpdateReceived(dto.getLastStaticUpdateReceived()) // Entity 필드 타입이 String인 경우
|
|
||||||
// Position and Movement Data
|
|
||||||
.cog(dto.getCog())
|
|
||||||
.zoneId(dto.getZoneId())
|
|
||||||
.stationId(dto.getStationId())
|
|
||||||
.onBerth(dto.getOnBerth())
|
|
||||||
.mmsi(dto.getMmsi())
|
|
||||||
.imo(dto.getImo())
|
|
||||||
.ageMinutes(dto.getAgeMinutes())
|
|
||||||
.lat(dto.getLat())
|
|
||||||
.lon(dto.getLon())
|
|
||||||
.heading(dto.getHeading())
|
|
||||||
.sog(dto.getSog())
|
|
||||||
.positionAccuracy(dto.getPositionAccuracy())
|
|
||||||
.rot(dto.getRot())
|
|
||||||
.timestampUtc(dto.getTimestampUtc())
|
|
||||||
.status(dto.getStatus())
|
|
||||||
// Dimensions and Draught
|
|
||||||
.width(dto.getWidth())
|
|
||||||
.length(dto.getLength())
|
|
||||||
.draught(dto.getDraught())
|
|
||||||
.lengthBow(dto.getLengthBow())
|
|
||||||
.lengthStern(dto.getLengthStern())
|
|
||||||
.widthPort(dto.getWidthPort())
|
|
||||||
.widthStarboard(dto.getWidthStarboard())
|
|
||||||
// Voyage Data
|
|
||||||
.name(dto.getName())
|
|
||||||
.callsign(dto.getCallsign())
|
|
||||||
.destination(dto.getDestination())
|
|
||||||
.destinationTidied(dto.getDestinationTidied())
|
|
||||||
.destinationPortID(dto.getDestinationPortID())
|
|
||||||
.destinationUnlocode(dto.getDestinationUnlocode())
|
|
||||||
.eta(dto.getEta()) // Entity 필드 타입이 String인 경우
|
|
||||||
.vesselType(dto.getVesselType())
|
|
||||||
.extraInfo(dto.getExtraInfo())
|
|
||||||
// AIS Metadata Fields
|
|
||||||
.repeatIndicator(dto.getRepeatIndicator())
|
|
||||||
.raimFlag(dto.getRaimFlag())
|
|
||||||
.radioStatus(dto.getRadioStatus())
|
|
||||||
.regional(dto.getRegional())
|
|
||||||
.regional2(dto.getRegional2())
|
|
||||||
.spare(dto.getSpare())
|
|
||||||
.spare2(dto.getSpare2())
|
|
||||||
.aisVersion(dto.getAisVersion())
|
|
||||||
.positionFixType(dto.getPositionFixType())
|
|
||||||
.dte(dto.getDte())
|
|
||||||
.bandFlag(dto.getBandFlag())
|
|
||||||
.receivedDate(dto.getReceivedDate()) // Entity 필드 타입이 String인 경우
|
|
||||||
.messageTimestamp(dto.getMessageTimestamp()) // Entity 필드 타입이 String인 경우
|
|
||||||
// Null/Object Fields
|
|
||||||
.inSts(dto.getInSts())
|
|
||||||
.anomalous(dto.getAnomalous())
|
|
||||||
.messageType(dto.getMessageType())
|
|
||||||
.build();
|
|
||||||
|
|
||||||
log.debug("AIS 최신 항적 데이터 처리 완료: imoNumber={}", dto.getImoVerified());
|
|
||||||
|
|
||||||
return entity;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,155 +0,0 @@
|
|||||||
package com.snp.batch.jobs.shipdetail.batch.reader;
|
|
||||||
|
|
||||||
import com.snp.batch.common.batch.reader.BaseApiReader;
|
|
||||||
import com.snp.batch.jobs.shipdetail.batch.dto.AisApiResponse;
|
|
||||||
import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
|
||||||
import org.springframework.web.reactive.function.client.WebClient;
|
|
||||||
|
|
||||||
import java.util.*;
|
|
||||||
@Slf4j
|
|
||||||
public class ShipLastPositionDataReader extends BaseApiReader<TargetEnhancedDto> {
|
|
||||||
|
|
||||||
//TODO :
|
|
||||||
// 1. Core20 IMO_NUMBER 전체 조회
|
|
||||||
// 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복)
|
|
||||||
// 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복)
|
|
||||||
|
|
||||||
private final JdbcTemplate jdbcTemplate;
|
|
||||||
private final String targetSchema;
|
|
||||||
|
|
||||||
private List<String> allImoNumbers;
|
|
||||||
private int currentBatchIndex = 0;
|
|
||||||
private final int batchSize = 5000;
|
|
||||||
|
|
||||||
public ShipLastPositionDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, String targetSchema) {
|
|
||||||
super(webClient);
|
|
||||||
this.jdbcTemplate = jdbcTemplate;
|
|
||||||
this.targetSchema = targetSchema;
|
|
||||||
enableChunkMode(); // ✨ Chunk 모드 활성화
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getReaderName() {
|
|
||||||
return "ShipLastPositionDataReader";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void resetCustomState() {
|
|
||||||
this.currentBatchIndex = 0;
|
|
||||||
this.allImoNumbers = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getApiPath() {
|
|
||||||
return "/AisSvc.svc/AIS/GetTargetsByIMOsEnhanced";
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getTargetTable(){
|
|
||||||
return targetSchema + ".tb_ship_main_info";
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getImoQuery() {
|
|
||||||
return "SELECT imo_no FROM " + getTargetTable() + " ORDER BY imo_no";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void beforeFetch(){
|
|
||||||
log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName());
|
|
||||||
|
|
||||||
allImoNumbers = jdbcTemplate.queryForList(getImoQuery(), String.class);
|
|
||||||
|
|
||||||
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
|
|
||||||
|
|
||||||
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
|
|
||||||
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
|
|
||||||
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
|
|
||||||
|
|
||||||
updateApiCallStats(totalBatches, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected List<TargetEnhancedDto> fetchNextBatch() throws Exception {
|
|
||||||
// 모든 배치 처리 완료 확인
|
|
||||||
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
|
|
||||||
return null; // Job 종료
|
|
||||||
}
|
|
||||||
|
|
||||||
// 현재 배치의 시작/끝 인덱스 계산
|
|
||||||
int startIndex = currentBatchIndex;
|
|
||||||
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
|
|
||||||
|
|
||||||
// 현재 배치의 IMO 번호 추출 (100개)
|
|
||||||
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
|
|
||||||
|
|
||||||
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
|
|
||||||
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
|
|
||||||
|
|
||||||
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
|
|
||||||
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
|
|
||||||
|
|
||||||
try {
|
|
||||||
// IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...")
|
|
||||||
String imoParam = String.join(",", currentBatch);
|
|
||||||
|
|
||||||
// API 호출
|
|
||||||
AisApiResponse response = callAisApiWithBatch(imoParam);
|
|
||||||
|
|
||||||
// 다음 배치로 인덱스 이동
|
|
||||||
currentBatchIndex = endIndex;
|
|
||||||
|
|
||||||
// 응답 처리
|
|
||||||
if (response != null && response.getTargetEnhancedArr() != null) {
|
|
||||||
List<TargetEnhancedDto> targets = response.getTargetEnhancedArr();
|
|
||||||
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
|
|
||||||
getReaderName(), currentBatchNumber, totalBatches, targets.size());
|
|
||||||
|
|
||||||
// API 호출 통계 업데이트
|
|
||||||
updateApiCallStats(totalBatches, currentBatchNumber);
|
|
||||||
|
|
||||||
// API 과부하 방지 (다음 배치 전 0.5초 대기)
|
|
||||||
if (currentBatchIndex < allImoNumbers.size()) {
|
|
||||||
Thread.sleep(500);
|
|
||||||
}
|
|
||||||
|
|
||||||
return targets;
|
|
||||||
} else {
|
|
||||||
log.warn("[{}] 배치 {}/{} 응답 없음",
|
|
||||||
getReaderName(), currentBatchNumber, totalBatches);
|
|
||||||
|
|
||||||
// API 호출 통계 업데이트 (실패도 카운트)
|
|
||||||
updateApiCallStats(totalBatches, currentBatchNumber);
|
|
||||||
|
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
|
|
||||||
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
|
|
||||||
|
|
||||||
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
|
|
||||||
currentBatchIndex = endIndex;
|
|
||||||
|
|
||||||
// 빈 리스트 반환 (Job 계속 진행)
|
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private AisApiResponse callAisApiWithBatch(String imoNumbers) {
|
|
||||||
String url = getApiPath();
|
|
||||||
|
|
||||||
Map<String, String> requestBody = Map.of("imo", imoNumbers);
|
|
||||||
|
|
||||||
log.debug("[{}] API 호출: {}", getReaderName(), url);
|
|
||||||
log.debug("[{}] : {}", "requestBody", requestBody.toString());
|
|
||||||
|
|
||||||
return webClient.post()
|
|
||||||
.uri(url)
|
|
||||||
.bodyValue(requestBody)
|
|
||||||
.retrieve()
|
|
||||||
.bodyToMono(AisApiResponse.class)
|
|
||||||
.block();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -1,9 +0,0 @@
|
|||||||
package com.snp.batch.jobs.shipdetail.batch.repository;
|
|
||||||
|
|
||||||
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
public interface ShipLastPositionRepository {
|
|
||||||
void saveLastPositionAll(List<TargetEnhancedEntity> items);
|
|
||||||
}
|
|
||||||
@ -1,125 +0,0 @@
|
|||||||
package com.snp.batch.jobs.shipdetail.batch.repository;
|
|
||||||
|
|
||||||
import com.snp.batch.common.batch.repository.BaseJdbcRepository;
|
|
||||||
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
|
||||||
import org.springframework.jdbc.core.RowMapper;
|
|
||||||
import org.springframework.stereotype.Repository;
|
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
|
||||||
|
|
||||||
import java.sql.PreparedStatement;
|
|
||||||
import java.sql.Types;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
@Repository("shipLastPositionRepository")
|
|
||||||
public class ShipLastPositionRepositoryImpl extends BaseJdbcRepository<TargetEnhancedEntity, Long> implements ShipLastPositionRepository {
|
|
||||||
|
|
||||||
@Value("${app.batch.service-schema.name}")
|
|
||||||
private String targetSchema;
|
|
||||||
|
|
||||||
@Value("${app.batch.service-schema.tables.service-001}")
|
|
||||||
private String tableName;
|
|
||||||
|
|
||||||
public ShipLastPositionRepositoryImpl(JdbcTemplate jdbcTemplate) {
|
|
||||||
super(jdbcTemplate);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getTargetSchema() {
|
|
||||||
return targetSchema;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getSimpleTableName() {
|
|
||||||
return tableName;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected RowMapper<TargetEnhancedEntity> getRowMapper() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Long extractId(TargetEnhancedEntity entity) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getInsertSql() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getUpdateSql() {
|
|
||||||
return """
|
|
||||||
UPDATE %s
|
|
||||||
SET last_cptr_hr_utc = ?::timestamptz,
|
|
||||||
last_port = ?,
|
|
||||||
now_position_lat = ?,
|
|
||||||
now_position_lon = ?,
|
|
||||||
ship_dest = ?,
|
|
||||||
arvl_prnmnt_hr = ?::timestamptz,
|
|
||||||
bow_drctn = ?,
|
|
||||||
cog = ?,
|
|
||||||
sog = ?,
|
|
||||||
ship_nav_status = ?,
|
|
||||||
cargo_ton = ?,
|
|
||||||
add_info = ?,
|
|
||||||
in_sts = ?,
|
|
||||||
ancrg_yn = ?
|
|
||||||
WHERE imo_no = ?;
|
|
||||||
""".formatted(getTableName());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void setInsertParameters(PreparedStatement ps, TargetEnhancedEntity entity) throws Exception {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void setUpdateParameters(PreparedStatement ps, TargetEnhancedEntity entity) throws Exception {
|
|
||||||
int idx = 1;
|
|
||||||
ps.setString(idx++, entity.getReceivedDate()); // receivedDate : lastseen (Datetime)
|
|
||||||
ps.setString(idx++, entity.getLpcCode()); // lpcCode : lastport (String)
|
|
||||||
ps.setDouble(idx++, entity.getLat()); // lat : position_latitude (Double)
|
|
||||||
ps.setDouble(idx++, entity.getLon()); // lon : position_longitude (Double)
|
|
||||||
ps.setString(idx++, entity.getDestination()); // destination : destination (String)
|
|
||||||
ps.setString(idx++, entity.getEta()); // eta : eta (Datetime)
|
|
||||||
ps.setDouble(idx++, entity.getHeading()); // heading : heading (Double)
|
|
||||||
ps.setDouble(idx++, entity.getSog()); // sog : speedservice (Double)
|
|
||||||
ps.setDouble(idx++, entity.getCog()); // cog : cog (Double)
|
|
||||||
ps.setString(idx++, entity.getStatus()); // status : navstat (String)
|
|
||||||
ps.setInt(idx++, entity.getTonnesCargo()); // status : tonnes_cargo (Int)
|
|
||||||
ps.setString(idx++, entity.getExtraInfo()); // status : extra_info (String)
|
|
||||||
ps.setObject(idx++, entity.getInSts(), Types.INTEGER);
|
|
||||||
ps.setObject(idx++, entity.getOnBerth(), Types.BOOLEAN);
|
|
||||||
ps.setString(idx++, entity.getImoVerified()); // imoVerified : imo (String)
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getEntityName() {
|
|
||||||
return "TargetEnhancedEntity";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Transactional
|
|
||||||
public void saveLastPositionAll(List<TargetEnhancedEntity> items) {
|
|
||||||
if (items == null || items.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
jdbcTemplate.batchUpdate(getUpdateSql(), items, items.size(),
|
|
||||||
(ps, entity) -> {
|
|
||||||
try {
|
|
||||||
setUpdateParameters(ps, entity);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("배치 수정 파라미터 설정 실패", e);
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,24 +0,0 @@
|
|||||||
package com.snp.batch.jobs.shipdetail.batch.writer;
|
|
||||||
|
|
||||||
import com.snp.batch.common.batch.writer.BaseWriter;
|
|
||||||
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
|
||||||
import com.snp.batch.jobs.shipdetail.batch.repository.ShipLastPositionRepository;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
@Slf4j
|
|
||||||
@Component
|
|
||||||
public class ShipLastPositionDataWriter extends BaseWriter<TargetEnhancedEntity> {
|
|
||||||
|
|
||||||
private final ShipLastPositionRepository shipLastPositionRepository;
|
|
||||||
public ShipLastPositionDataWriter(ShipLastPositionRepository shipLastPositionRepository) {
|
|
||||||
super("ShipLastPosition");
|
|
||||||
this.shipLastPositionRepository = shipLastPositionRepository;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void writeItems(List<TargetEnhancedEntity> items) throws Exception {
|
|
||||||
shipLastPositionRepository.saveLastPositionAll(items);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -134,10 +134,10 @@ app:
|
|||||||
|
|
||||||
# Core20 캐시 테이블 설정 (환경별로 테이블/컬럼명이 다를 수 있음)
|
# Core20 캐시 테이블 설정 (환경별로 테이블/컬럼명이 다를 수 있음)
|
||||||
core20:
|
core20:
|
||||||
schema: t_std_snp_data # 스키마명
|
schema: new_snp # 스키마명
|
||||||
table: tb_ship_info_mst # 테이블명
|
table: core20 # 테이블명
|
||||||
imo-column: imo_no # IMO/LRNO 컬럼명 (PK, NOT NULL)
|
imo-column: lrno # IMO/LRNO 컬럼명 (PK, NOT NULL)
|
||||||
mmsi-column: mmsi_no # MMSI 컬럼명 (NULLABLE)
|
mmsi-column: mmsi # MMSI 컬럼명 (NULLABLE)
|
||||||
|
|
||||||
# 파티션 관리 설정
|
# 파티션 관리 설정
|
||||||
partition:
|
partition:
|
||||||
|
|||||||
불러오는 중...
Reference in New Issue
Block a user