From b31b67183dfe1ca0948dac974b958b92bf19c2d4 Mon Sep 17 00:00:00 2001 From: hyojin kim Date: Tue, 24 Feb 2026 13:50:51 +0900 Subject: [PATCH] feat: ship last position update step add --- .../config/AisTargetDbSyncJobConfig.java | 21 ++- .../ShipLastPositionSyncRepository.java | 15 ++ .../ShipLastPositionSyncRepositoryImpl.java | 137 ++++++++++++++++++ .../tasklet/ShipLastPositionSyncTasklet.java | 133 +++++++++++++++++ 4 files changed, 302 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/repository/ShipLastPositionSyncRepository.java create mode 100644 src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/repository/ShipLastPositionSyncRepositoryImpl.java create mode 100644 src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/tasklet/ShipLastPositionSyncTasklet.java diff --git a/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/config/AisTargetDbSyncJobConfig.java b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/config/AisTargetDbSyncJobConfig.java index 48e723a..bd32662 100644 --- a/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/config/AisTargetDbSyncJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/config/AisTargetDbSyncJobConfig.java @@ -1,6 +1,7 @@ package com.snp.batch.jobs.aistargetdbsync.batch.config; import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.AisTargetDbSyncTasklet; +import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.ShipLastPositionSyncTasklet; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; @@ -20,13 +21,14 @@ import org.springframework.transaction.PlatformTransactionManager; * API: 없음 (캐시 기반) * * 동작: - * - Caffeine 캐시에서 최근 15분 이내 데이터 조회 - * - MMSI별 최신 위치 1건씩 DB에 UPSERT - * - 1분 주기 aisTargetImportJob과 분리하여 DB 볼륨 최적화 + * - Step 1: Caffeine 캐시에서 최근 15분 이내 데이터 조회 → ais_target DB UPSERT + * - Step 2: 캐시에서 IMO별 최신 위치 조회 → tb_ship_main_info 위치 컬럼 UPDATE * * 데이터 흐름: * - aisTargetImportJob (1분): API → 캐시 업데이트 * - aisTargetDbSyncJob (15분): 캐시 → DB 저장 (이 Job) + * - Step 1: 캐시 → ais_target UPSERT + * - Step 2: 캐시 → tb_ship_main_info UPDATE (IMO별 최신 1건) */ @Slf4j @Configuration @@ -35,14 +37,17 @@ public class AisTargetDbSyncJobConfig { private final JobRepository jobRepository; private final PlatformTransactionManager transactionManager; private final AisTargetDbSyncTasklet aisTargetDbSyncTasklet; + private final ShipLastPositionSyncTasklet shipLastPositionSyncTasklet; public AisTargetDbSyncJobConfig( JobRepository jobRepository, PlatformTransactionManager transactionManager, - AisTargetDbSyncTasklet aisTargetDbSyncTasklet) { + AisTargetDbSyncTasklet aisTargetDbSyncTasklet, + ShipLastPositionSyncTasklet shipLastPositionSyncTasklet) { this.jobRepository = jobRepository; this.transactionManager = transactionManager; this.aisTargetDbSyncTasklet = aisTargetDbSyncTasklet; + this.shipLastPositionSyncTasklet = shipLastPositionSyncTasklet; } @Bean(name = "aisTargetDbSyncStep") @@ -52,6 +57,13 @@ public class AisTargetDbSyncJobConfig { .build(); } + @Bean(name = "shipLastPositionSyncStep") + public Step shipLastPositionSyncStep() { + return new StepBuilder("shipLastPositionSyncStep", jobRepository) + .tasklet(shipLastPositionSyncTasklet, transactionManager) + .build(); + } + @Bean(name = "aisTargetDbSyncJob") public Job aisTargetDbSyncJob() { log.info("Job 생성: aisTargetDbSyncJob"); @@ -74,6 +86,7 @@ public class AisTargetDbSyncJobConfig { } }) .start(aisTargetDbSyncStep()) + .next(shipLastPositionSyncStep()) .build(); } } diff --git a/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/repository/ShipLastPositionSyncRepository.java b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/repository/ShipLastPositionSyncRepository.java new file mode 100644 index 0000000..acc19f6 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/repository/ShipLastPositionSyncRepository.java @@ -0,0 +1,15 @@ +package com.snp.batch.jobs.aistargetdbsync.batch.repository; + +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; + +import java.util.List; + +/** + * 캐시 기반 Ship Last Position 동기화 Repository + * + * AisTargetEntity(캐시 데이터)를 받아 tb_ship_main_info의 위치/항해 컬럼을 UPDATE + */ +public interface ShipLastPositionSyncRepository { + + int updateLastPositions(List entities); +} diff --git a/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/repository/ShipLastPositionSyncRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/repository/ShipLastPositionSyncRepositoryImpl.java new file mode 100644 index 0000000..15958ed --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/repository/ShipLastPositionSyncRepositoryImpl.java @@ -0,0 +1,137 @@ +package com.snp.batch.jobs.aistargetdbsync.batch.repository; + +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; + +import java.sql.PreparedStatement; +import java.sql.Types; +import java.util.List; + +/** + * 캐시 기반 Ship Last Position 동기화 Repository 구현 + * + * AisTargetEntity의 필드를 tb_ship_main_info 컬럼에 매핑하여 UPDATE 수행. + * 기존 ShipLastPositionRepositoryImpl과 동일한 SQL을 사용하되, + * AisTargetEntity의 타입(OffsetDateTime, Integer 등)에 맞게 변환 처리. + */ +@Slf4j +@Repository("shipLastPositionSyncRepository") +public class ShipLastPositionSyncRepositoryImpl implements ShipLastPositionSyncRepository { + + private static final int BATCH_SIZE = 1000; + + private final JdbcTemplate jdbcTemplate; + + @Value("${app.batch.service-schema.name}") + private String targetSchema; + + @Value("${app.batch.service-schema.tables.service-001}") + private String tableName; + + public ShipLastPositionSyncRepositoryImpl(JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; + } + + private String getTableName() { + return targetSchema + "." + tableName; + } + + private String getUpdateSql() { + return """ + UPDATE %s + SET last_cptr_hr_utc = ?::timestamptz, + last_port = ?, + now_position_lat = ?, + now_position_lon = ?, + ship_dest = ?, + arvl_prnmnt_hr = ?::timestamptz, + bow_drctn = ?, + cog = ?, + sog = ?, + ship_nav_status = ?, + cargo_ton = ?, + add_info = ?, + in_sts = ?, + ancrg_yn = ? + WHERE imo_no = ? + """.formatted(getTableName()); + } + + @Override + @Transactional + public int updateLastPositions(List entities) { + if (entities == null || entities.isEmpty()) { + return 0; + } + + String sql = getUpdateSql(); + int totalUpdated = 0; + + int[][] batchResults = jdbcTemplate.batchUpdate(sql, entities, BATCH_SIZE, + (ps, entity) -> setUpdateParameters(ps, entity)); + + for (int[] batchResult : batchResults) { + for (int result : batchResult) { + if (result > 0) { + totalUpdated += result; + } + } + } + + log.info("Ship Last Position 동기화 완료: 대상={} 건, 갱신={} 건", entities.size(), totalUpdated); + return totalUpdated; + } + + private void setUpdateParameters(PreparedStatement ps, AisTargetEntity entity) throws java.sql.SQLException { + int idx = 1; + + // last_cptr_hr_utc ← receivedDate (OffsetDateTime → String for ::timestamptz) + ps.setString(idx++, entity.getReceivedDate() != null ? entity.getReceivedDate().toString() : null); + + // last_port ← lpcCode (Integer → String) + ps.setString(idx++, entity.getLpcCode() != null ? String.valueOf(entity.getLpcCode()) : null); + + // now_position_lat ← lat (Double, null-safe) + ps.setObject(idx++, entity.getLat(), Types.DOUBLE); + + // now_position_lon ← lon (Double, null-safe) + ps.setObject(idx++, entity.getLon(), Types.DOUBLE); + + // ship_dest ← destination (String) + ps.setString(idx++, entity.getDestination()); + + // arvl_prnmnt_hr ← eta (OffsetDateTime → String for ::timestamptz) + ps.setString(idx++, entity.getEta() != null ? entity.getEta().toString() : null); + + // bow_drctn ← heading (Double, null-safe) + ps.setObject(idx++, entity.getHeading(), Types.DOUBLE); + + // cog ← cog (Double, null-safe) + ps.setObject(idx++, entity.getCog(), Types.DOUBLE); + + // sog ← sog (Double, null-safe) + ps.setObject(idx++, entity.getSog(), Types.DOUBLE); + + // ship_nav_status ← status (String) + ps.setString(idx++, entity.getStatus()); + + // cargo_ton ← tonnesCargo (Integer, null-safe) + ps.setObject(idx++, entity.getTonnesCargo(), Types.INTEGER); + + // add_info ← extraInfo (String) + ps.setString(idx++, entity.getExtraInfo()); + + // in_sts ← inSTS (Integer, null-safe) + ps.setObject(idx++, entity.getInSTS(), Types.INTEGER); + + // ancrg_yn ← onBerth (Boolean, null-safe) + ps.setObject(idx++, entity.getOnBerth(), Types.BOOLEAN); + + // WHERE imo_no ← imoVerified (String) + ps.setString(idx++, entity.getImoVerified()); + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/tasklet/ShipLastPositionSyncTasklet.java b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/tasklet/ShipLastPositionSyncTasklet.java new file mode 100644 index 0000000..1a044b8 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/tasklet/ShipLastPositionSyncTasklet.java @@ -0,0 +1,133 @@ +package com.snp.batch.jobs.aistargetdbsync.batch.tasklet; + +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager; +import com.snp.batch.jobs.aistargetdbsync.batch.repository.ShipLastPositionSyncRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.time.Instant; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * Ship Last Position Sync Tasklet + * + * 동작: + * - Caffeine 캐시에서 마지막 성공 이후 ~ 현재까지의 AIS 데이터를 조회 + * - imoVerified가 유효한 항목만 필터링 + * - IMO별 messageTimestamp 최신 1건만 선택 + * - tb_ship_main_info의 위치/항해 14개 컬럼을 UPDATE + * + * 시간 범위 결정 전략: + * - AisTargetDbSyncTasklet과 동일 (lastSuccessTime 기반, fallback 사용) + */ +@Slf4j +@Component +public class ShipLastPositionSyncTasklet implements Tasklet { + + private final AisTargetCacheManager cacheManager; + private final ShipLastPositionSyncRepository repository; + private final int fallbackMinutes; + + private final AtomicReference lastSuccessTime = new AtomicReference<>(); + + public ShipLastPositionSyncTasklet( + AisTargetCacheManager cacheManager, + ShipLastPositionSyncRepository repository, + @Value("${app.batch.ais-target-db-sync.time-range-minutes:15}") int fallbackMinutes) { + this.cacheManager = cacheManager; + this.repository = repository; + this.fallbackMinutes = fallbackMinutes; + } + + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { + Instant now = Instant.now(); + int rangeMinutes = resolveRangeMinutes(now); + + log.info("========================================"); + log.info("Ship Last Position Sync 시작"); + log.info("조회 범위: 최근 {}분 (방식: {})", rangeMinutes, + lastSuccessTime.get() != null ? "마지막 성공 기준" : "fallback"); + log.info("========================================"); + + long startTime = System.currentTimeMillis(); + + // 1. 캐시에서 시간 범위 내 데이터 조회 + List entities = cacheManager.getByTimeRange(rangeMinutes); + + if (entities.isEmpty()) { + log.warn("캐시에서 조회된 데이터가 없습니다 (범위: {}분)", rangeMinutes); + lastSuccessTime.set(now); + return RepeatStatus.FINISHED; + } + + // 2. imoVerified가 유효한 항목만 필터 + List imoFiltered = entities.stream() + .filter(e -> isValidImo(e.getImoVerified())) + .toList(); + + if (imoFiltered.isEmpty()) { + log.info("유효한 IMO가 있는 데이터가 없습니다 (전체: {} 건)", entities.size()); + lastSuccessTime.set(now); + return RepeatStatus.FINISHED; + } + + // 3. IMO별 messageTimestamp 최신 1건만 선택 + List latestPerImo = imoFiltered.stream() + .collect(Collectors.groupingBy(AisTargetEntity::getImoVerified)) + .values().stream() + .map(group -> group.stream() + .max(Comparator.comparing( + AisTargetEntity::getMessageTimestamp, + Comparator.nullsFirst(Comparator.naturalOrder()))) + .orElseThrow()) + .toList(); + + log.info("캐시 조회: {} 건 → IMO 유효: {} 건 → IMO별 최신: {} 건", + entities.size(), imoFiltered.size(), latestPerImo.size()); + + // 4. DB UPDATE + int updated = repository.updateLastPositions(latestPerImo); + + long elapsed = System.currentTimeMillis() - startTime; + + lastSuccessTime.set(now); + + log.info("========================================"); + log.info("Ship Last Position Sync 완료"); + log.info("대상: {} 건, 갱신: {} 건", latestPerImo.size(), updated); + log.info("소요 시간: {}ms", elapsed); + log.info("========================================"); + + contribution.incrementWriteCount(updated); + + return RepeatStatus.FINISHED; + } + + private static final int MAX_RANGE_MINUTES = 60; + + private int resolveRangeMinutes(Instant now) { + Instant last = lastSuccessTime.get(); + if (last == null) { + return Math.min(fallbackMinutes, MAX_RANGE_MINUTES); + } + + long elapsedMinutes = Duration.between(last, now).toMinutes(); + int range = (int) Math.max(elapsedMinutes + 1, 1); + return Math.min(range, MAX_RANGE_MINUTES); + } + + private boolean isValidImo(String imoVerified) { + return imoVerified != null && !imoVerified.isBlank() && !"0".equals(imoVerified); + } +}