feat(batch): 선박 마지막 위치 업데이트 프로세스 변경 #7
@ -1,6 +1,7 @@
|
||||
package com.snp.batch.jobs.aistargetdbsync.batch.config;
|
||||
|
||||
import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.AisTargetDbSyncTasklet;
|
||||
import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.ShipLastPositionSyncTasklet;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.JobExecution;
|
||||
@ -20,13 +21,14 @@ import org.springframework.transaction.PlatformTransactionManager;
|
||||
* API: 없음 (캐시 기반)
|
||||
*
|
||||
* 동작:
|
||||
* - Caffeine 캐시에서 최근 15분 이내 데이터 조회
|
||||
* - MMSI별 최신 위치 1건씩 DB에 UPSERT
|
||||
* - 1분 주기 aisTargetImportJob과 분리하여 DB 볼륨 최적화
|
||||
* - Step 1: Caffeine 캐시에서 최근 15분 이내 데이터 조회 → ais_target DB UPSERT
|
||||
* - Step 2: 캐시에서 IMO별 최신 위치 조회 → tb_ship_main_info 위치 컬럼 UPDATE
|
||||
*
|
||||
* 데이터 흐름:
|
||||
* - aisTargetImportJob (1분): API → 캐시 업데이트
|
||||
* - aisTargetDbSyncJob (15분): 캐시 → DB 저장 (이 Job)
|
||||
* - Step 1: 캐시 → ais_target UPSERT
|
||||
* - Step 2: 캐시 → tb_ship_main_info UPDATE (IMO별 최신 1건)
|
||||
*/
|
||||
@Slf4j
|
||||
@Configuration
|
||||
@ -35,14 +37,17 @@ public class AisTargetDbSyncJobConfig {
|
||||
private final JobRepository jobRepository;
|
||||
private final PlatformTransactionManager transactionManager;
|
||||
private final AisTargetDbSyncTasklet aisTargetDbSyncTasklet;
|
||||
private final ShipLastPositionSyncTasklet shipLastPositionSyncTasklet;
|
||||
|
||||
public AisTargetDbSyncJobConfig(
|
||||
JobRepository jobRepository,
|
||||
PlatformTransactionManager transactionManager,
|
||||
AisTargetDbSyncTasklet aisTargetDbSyncTasklet) {
|
||||
AisTargetDbSyncTasklet aisTargetDbSyncTasklet,
|
||||
ShipLastPositionSyncTasklet shipLastPositionSyncTasklet) {
|
||||
this.jobRepository = jobRepository;
|
||||
this.transactionManager = transactionManager;
|
||||
this.aisTargetDbSyncTasklet = aisTargetDbSyncTasklet;
|
||||
this.shipLastPositionSyncTasklet = shipLastPositionSyncTasklet;
|
||||
}
|
||||
|
||||
@Bean(name = "aisTargetDbSyncStep")
|
||||
@ -52,6 +57,13 @@ public class AisTargetDbSyncJobConfig {
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean(name = "shipLastPositionSyncStep")
|
||||
public Step shipLastPositionSyncStep() {
|
||||
return new StepBuilder("shipLastPositionSyncStep", jobRepository)
|
||||
.tasklet(shipLastPositionSyncTasklet, transactionManager)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean(name = "aisTargetDbSyncJob")
|
||||
public Job aisTargetDbSyncJob() {
|
||||
log.info("Job 생성: aisTargetDbSyncJob");
|
||||
@ -74,6 +86,7 @@ public class AisTargetDbSyncJobConfig {
|
||||
}
|
||||
})
|
||||
.start(aisTargetDbSyncStep())
|
||||
.next(shipLastPositionSyncStep())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,15 @@
|
||||
package com.snp.batch.jobs.aistargetdbsync.batch.repository;
|
||||
|
||||
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 캐시 기반 Ship Last Position 동기화 Repository
|
||||
*
|
||||
* AisTargetEntity(캐시 데이터)를 받아 tb_ship_main_info의 위치/항해 컬럼을 UPDATE
|
||||
*/
|
||||
public interface ShipLastPositionSyncRepository {
|
||||
|
||||
int updateLastPositions(List<AisTargetEntity> entities);
|
||||
}
|
||||
@ -0,0 +1,137 @@
|
||||
package com.snp.batch.jobs.aistargetdbsync.batch.repository;
|
||||
|
||||
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.Types;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 캐시 기반 Ship Last Position 동기화 Repository 구현
|
||||
*
|
||||
* AisTargetEntity의 필드를 tb_ship_main_info 컬럼에 매핑하여 UPDATE 수행.
|
||||
* 기존 ShipLastPositionRepositoryImpl과 동일한 SQL을 사용하되,
|
||||
* AisTargetEntity의 타입(OffsetDateTime, Integer 등)에 맞게 변환 처리.
|
||||
*/
|
||||
@Slf4j
|
||||
@Repository("shipLastPositionSyncRepository")
|
||||
public class ShipLastPositionSyncRepositoryImpl implements ShipLastPositionSyncRepository {
|
||||
|
||||
private static final int BATCH_SIZE = 1000;
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
|
||||
@Value("${app.batch.service-schema.name}")
|
||||
private String targetSchema;
|
||||
|
||||
@Value("${app.batch.service-schema.tables.service-001}")
|
||||
private String tableName;
|
||||
|
||||
public ShipLastPositionSyncRepositoryImpl(JdbcTemplate jdbcTemplate) {
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
}
|
||||
|
||||
private String getTableName() {
|
||||
return targetSchema + "." + tableName;
|
||||
}
|
||||
|
||||
private String getUpdateSql() {
|
||||
return """
|
||||
UPDATE %s
|
||||
SET last_cptr_hr_utc = ?::timestamptz,
|
||||
last_port = ?,
|
||||
now_position_lat = ?,
|
||||
now_position_lon = ?,
|
||||
ship_dest = ?,
|
||||
arvl_prnmnt_hr = ?::timestamptz,
|
||||
bow_drctn = ?,
|
||||
cog = ?,
|
||||
sog = ?,
|
||||
ship_nav_status = ?,
|
||||
cargo_ton = ?,
|
||||
add_info = ?,
|
||||
in_sts = ?,
|
||||
ancrg_yn = ?
|
||||
WHERE imo_no = ?
|
||||
""".formatted(getTableName());
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public int updateLastPositions(List<AisTargetEntity> entities) {
|
||||
if (entities == null || entities.isEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
String sql = getUpdateSql();
|
||||
int totalUpdated = 0;
|
||||
|
||||
int[][] batchResults = jdbcTemplate.batchUpdate(sql, entities, BATCH_SIZE,
|
||||
(ps, entity) -> setUpdateParameters(ps, entity));
|
||||
|
||||
for (int[] batchResult : batchResults) {
|
||||
for (int result : batchResult) {
|
||||
if (result > 0) {
|
||||
totalUpdated += result;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Ship Last Position 동기화 완료: 대상={} 건, 갱신={} 건", entities.size(), totalUpdated);
|
||||
return totalUpdated;
|
||||
}
|
||||
|
||||
private void setUpdateParameters(PreparedStatement ps, AisTargetEntity entity) throws java.sql.SQLException {
|
||||
int idx = 1;
|
||||
|
||||
// last_cptr_hr_utc ← receivedDate (OffsetDateTime → String for ::timestamptz)
|
||||
ps.setString(idx++, entity.getReceivedDate() != null ? entity.getReceivedDate().toString() : null);
|
||||
|
||||
// last_port ← lpcCode (Integer → String)
|
||||
ps.setString(idx++, entity.getLpcCode() != null ? String.valueOf(entity.getLpcCode()) : null);
|
||||
|
||||
// now_position_lat ← lat (Double, null-safe)
|
||||
ps.setObject(idx++, entity.getLat(), Types.DOUBLE);
|
||||
|
||||
// now_position_lon ← lon (Double, null-safe)
|
||||
ps.setObject(idx++, entity.getLon(), Types.DOUBLE);
|
||||
|
||||
// ship_dest ← destination (String)
|
||||
ps.setString(idx++, entity.getDestination());
|
||||
|
||||
// arvl_prnmnt_hr ← eta (OffsetDateTime → String for ::timestamptz)
|
||||
ps.setString(idx++, entity.getEta() != null ? entity.getEta().toString() : null);
|
||||
|
||||
// bow_drctn ← heading (Double, null-safe)
|
||||
ps.setObject(idx++, entity.getHeading(), Types.DOUBLE);
|
||||
|
||||
// cog ← cog (Double, null-safe)
|
||||
ps.setObject(idx++, entity.getCog(), Types.DOUBLE);
|
||||
|
||||
// sog ← sog (Double, null-safe)
|
||||
ps.setObject(idx++, entity.getSog(), Types.DOUBLE);
|
||||
|
||||
// ship_nav_status ← status (String)
|
||||
ps.setString(idx++, entity.getStatus());
|
||||
|
||||
// cargo_ton ← tonnesCargo (Integer, null-safe)
|
||||
ps.setObject(idx++, entity.getTonnesCargo(), Types.INTEGER);
|
||||
|
||||
// add_info ← extraInfo (String)
|
||||
ps.setString(idx++, entity.getExtraInfo());
|
||||
|
||||
// in_sts ← inSTS (Integer, null-safe)
|
||||
ps.setObject(idx++, entity.getInSTS(), Types.INTEGER);
|
||||
|
||||
// ancrg_yn ← onBerth (Boolean, null-safe)
|
||||
ps.setObject(idx++, entity.getOnBerth(), Types.BOOLEAN);
|
||||
|
||||
// WHERE imo_no ← imoVerified (String)
|
||||
ps.setString(idx++, entity.getImoVerified());
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,133 @@
|
||||
package com.snp.batch.jobs.aistargetdbsync.batch.tasklet;
|
||||
|
||||
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
|
||||
import com.snp.batch.jobs.aistargetdbsync.batch.repository.ShipLastPositionSyncRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.StepContribution;
|
||||
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Ship Last Position Sync Tasklet
|
||||
*
|
||||
* 동작:
|
||||
* - Caffeine 캐시에서 마지막 성공 이후 ~ 현재까지의 AIS 데이터를 조회
|
||||
* - imoVerified가 유효한 항목만 필터링
|
||||
* - IMO별 messageTimestamp 최신 1건만 선택
|
||||
* - tb_ship_main_info의 위치/항해 14개 컬럼을 UPDATE
|
||||
*
|
||||
* 시간 범위 결정 전략:
|
||||
* - AisTargetDbSyncTasklet과 동일 (lastSuccessTime 기반, fallback 사용)
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ShipLastPositionSyncTasklet implements Tasklet {
|
||||
|
||||
private final AisTargetCacheManager cacheManager;
|
||||
private final ShipLastPositionSyncRepository repository;
|
||||
private final int fallbackMinutes;
|
||||
|
||||
private final AtomicReference<Instant> lastSuccessTime = new AtomicReference<>();
|
||||
|
||||
public ShipLastPositionSyncTasklet(
|
||||
AisTargetCacheManager cacheManager,
|
||||
ShipLastPositionSyncRepository repository,
|
||||
@Value("${app.batch.ais-target-db-sync.time-range-minutes:15}") int fallbackMinutes) {
|
||||
this.cacheManager = cacheManager;
|
||||
this.repository = repository;
|
||||
this.fallbackMinutes = fallbackMinutes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
|
||||
Instant now = Instant.now();
|
||||
int rangeMinutes = resolveRangeMinutes(now);
|
||||
|
||||
log.info("========================================");
|
||||
log.info("Ship Last Position Sync 시작");
|
||||
log.info("조회 범위: 최근 {}분 (방식: {})", rangeMinutes,
|
||||
lastSuccessTime.get() != null ? "마지막 성공 기준" : "fallback");
|
||||
log.info("========================================");
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
// 1. 캐시에서 시간 범위 내 데이터 조회
|
||||
List<AisTargetEntity> entities = cacheManager.getByTimeRange(rangeMinutes);
|
||||
|
||||
if (entities.isEmpty()) {
|
||||
log.warn("캐시에서 조회된 데이터가 없습니다 (범위: {}분)", rangeMinutes);
|
||||
lastSuccessTime.set(now);
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
|
||||
// 2. imoVerified가 유효한 항목만 필터
|
||||
List<AisTargetEntity> imoFiltered = entities.stream()
|
||||
.filter(e -> isValidImo(e.getImoVerified()))
|
||||
.toList();
|
||||
|
||||
if (imoFiltered.isEmpty()) {
|
||||
log.info("유효한 IMO가 있는 데이터가 없습니다 (전체: {} 건)", entities.size());
|
||||
lastSuccessTime.set(now);
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
|
||||
// 3. IMO별 messageTimestamp 최신 1건만 선택
|
||||
List<AisTargetEntity> latestPerImo = imoFiltered.stream()
|
||||
.collect(Collectors.groupingBy(AisTargetEntity::getImoVerified))
|
||||
.values().stream()
|
||||
.map(group -> group.stream()
|
||||
.max(Comparator.comparing(
|
||||
AisTargetEntity::getMessageTimestamp,
|
||||
Comparator.nullsFirst(Comparator.naturalOrder())))
|
||||
.orElseThrow())
|
||||
.toList();
|
||||
|
||||
log.info("캐시 조회: {} 건 → IMO 유효: {} 건 → IMO별 최신: {} 건",
|
||||
entities.size(), imoFiltered.size(), latestPerImo.size());
|
||||
|
||||
// 4. DB UPDATE
|
||||
int updated = repository.updateLastPositions(latestPerImo);
|
||||
|
||||
long elapsed = System.currentTimeMillis() - startTime;
|
||||
|
||||
lastSuccessTime.set(now);
|
||||
|
||||
log.info("========================================");
|
||||
log.info("Ship Last Position Sync 완료");
|
||||
log.info("대상: {} 건, 갱신: {} 건", latestPerImo.size(), updated);
|
||||
log.info("소요 시간: {}ms", elapsed);
|
||||
log.info("========================================");
|
||||
|
||||
contribution.incrementWriteCount(updated);
|
||||
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
|
||||
private static final int MAX_RANGE_MINUTES = 60;
|
||||
|
||||
private int resolveRangeMinutes(Instant now) {
|
||||
Instant last = lastSuccessTime.get();
|
||||
if (last == null) {
|
||||
return Math.min(fallbackMinutes, MAX_RANGE_MINUTES);
|
||||
}
|
||||
|
||||
long elapsedMinutes = Duration.between(last, now).toMinutes();
|
||||
int range = (int) Math.max(elapsedMinutes + 1, 1);
|
||||
return Math.min(range, MAX_RANGE_MINUTES);
|
||||
}
|
||||
|
||||
private boolean isValidImo(String imoVerified) {
|
||||
return imoVerified != null && !imoVerified.isBlank() && !"0".equals(imoVerified);
|
||||
}
|
||||
}
|
||||
불러오는 중...
Reference in New Issue
Block a user