diff --git a/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/config/AisTargetDbSyncJobConfig.java b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/config/AisTargetDbSyncJobConfig.java index 48e723a..bd32662 100644 --- a/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/config/AisTargetDbSyncJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/config/AisTargetDbSyncJobConfig.java @@ -1,6 +1,7 @@ package com.snp.batch.jobs.aistargetdbsync.batch.config; import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.AisTargetDbSyncTasklet; +import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.ShipLastPositionSyncTasklet; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; @@ -20,13 +21,14 @@ import org.springframework.transaction.PlatformTransactionManager; * API: 없음 (캐시 기반) * * 동작: - * - Caffeine 캐시에서 최근 15분 이내 데이터 조회 - * - MMSI별 최신 위치 1건씩 DB에 UPSERT - * - 1분 주기 aisTargetImportJob과 분리하여 DB 볼륨 최적화 + * - Step 1: Caffeine 캐시에서 최근 15분 이내 데이터 조회 → ais_target DB UPSERT + * - Step 2: 캐시에서 IMO별 최신 위치 조회 → tb_ship_main_info 위치 컬럼 UPDATE * * 데이터 흐름: * - aisTargetImportJob (1분): API → 캐시 업데이트 * - aisTargetDbSyncJob (15분): 캐시 → DB 저장 (이 Job) + * - Step 1: 캐시 → ais_target UPSERT + * - Step 2: 캐시 → tb_ship_main_info UPDATE (IMO별 최신 1건) */ @Slf4j @Configuration @@ -35,14 +37,17 @@ public class AisTargetDbSyncJobConfig { private final JobRepository jobRepository; private final PlatformTransactionManager transactionManager; private final AisTargetDbSyncTasklet aisTargetDbSyncTasklet; + private final ShipLastPositionSyncTasklet shipLastPositionSyncTasklet; public AisTargetDbSyncJobConfig( JobRepository jobRepository, PlatformTransactionManager transactionManager, - AisTargetDbSyncTasklet aisTargetDbSyncTasklet) { + AisTargetDbSyncTasklet aisTargetDbSyncTasklet, + ShipLastPositionSyncTasklet shipLastPositionSyncTasklet) { this.jobRepository = jobRepository; this.transactionManager = transactionManager; this.aisTargetDbSyncTasklet = aisTargetDbSyncTasklet; + this.shipLastPositionSyncTasklet = shipLastPositionSyncTasklet; } @Bean(name = "aisTargetDbSyncStep") @@ -52,6 +57,13 @@ public class AisTargetDbSyncJobConfig { .build(); } + @Bean(name = "shipLastPositionSyncStep") + public Step shipLastPositionSyncStep() { + return new StepBuilder("shipLastPositionSyncStep", jobRepository) + .tasklet(shipLastPositionSyncTasklet, transactionManager) + .build(); + } + @Bean(name = "aisTargetDbSyncJob") public Job aisTargetDbSyncJob() { log.info("Job 생성: aisTargetDbSyncJob"); @@ -74,6 +86,7 @@ public class AisTargetDbSyncJobConfig { } }) .start(aisTargetDbSyncStep()) + .next(shipLastPositionSyncStep()) .build(); } } diff --git a/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/repository/ShipLastPositionSyncRepository.java b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/repository/ShipLastPositionSyncRepository.java new file mode 100644 index 0000000..acc19f6 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/repository/ShipLastPositionSyncRepository.java @@ -0,0 +1,15 @@ +package com.snp.batch.jobs.aistargetdbsync.batch.repository; + +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; + +import java.util.List; + +/** + * 캐시 기반 Ship Last Position 동기화 Repository + * + * AisTargetEntity(캐시 데이터)를 받아 tb_ship_main_info의 위치/항해 컬럼을 UPDATE + */ +public interface ShipLastPositionSyncRepository { + + int updateLastPositions(List entities); +} diff --git a/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/repository/ShipLastPositionSyncRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/repository/ShipLastPositionSyncRepositoryImpl.java new file mode 100644 index 0000000..57af690 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/repository/ShipLastPositionSyncRepositoryImpl.java @@ -0,0 +1,155 @@ +package com.snp.batch.jobs.aistargetdbsync.batch.repository; + +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; + +import java.sql.PreparedStatement; +import java.sql.Types; +import java.util.List; + +/** + * 캐시 기반 Ship Last Position 동기화 Repository 구현 + * + * AisTargetEntity의 필드를 tb_ship_main_info 컬럼에 매핑하여 UPDATE 수행. + * 기존 ShipLastPositionRepositoryImpl과 동일한 SQL을 사용하되, + * AisTargetEntity의 타입(OffsetDateTime, Integer 등)에 맞게 변환 처리. + */ +@Slf4j +@Repository("shipLastPositionSyncRepository") +public class ShipLastPositionSyncRepositoryImpl implements ShipLastPositionSyncRepository { + + private static final int BATCH_SIZE = 1000; + + private final JdbcTemplate jdbcTemplate; + + @Value("${app.batch.service-schema.name}") + private String targetSchema; + + @Value("${app.batch.service-schema.tables.service-001}") + private String tableName; + + public ShipLastPositionSyncRepositoryImpl(JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; + } + + private String getTableName() { + return targetSchema + "." + tableName; + } + + private String getUpdateSql() { +// return """ +// UPDATE %s +// SET last_cptr_hr_utc = ?::timestamptz, +// last_port = ?, +// now_position_lat = ?, +// now_position_lon = ?, +// ship_dest = ?, +// arvl_prnmnt_hr = ?::timestamptz, +// bow_drctn = ?, +// cog = ?, +// sog = ?, +// ship_nav_status = ?, +// cargo_ton = ?, +// add_info = ?, +// in_sts = ?, +// ancrg_yn = ? +// WHERE imo_no = ? +// """.formatted(getTableName()); + return """ + UPDATE new_snp.core20 + SET lastseen = ?::timestamptz, + lastport = ?, + position_latitude = ?, + position_longitude = ?, + destination = ?, + eta = ?::timestamptz, + heading = ?, + cog = ?, + speedservice = ?, + navstat = ?, + tonnes_cargo = ?, + extra_info = ?, + in_sts = ?, + on_berth = ? + WHERE lrno = ?; + """; + } + + @Override + @Transactional + public int updateLastPositions(List entities) { + if (entities == null || entities.isEmpty()) { + return 0; + } + + String sql = getUpdateSql(); + int totalUpdated = 0; + + int[][] batchResults = jdbcTemplate.batchUpdate(sql, entities, BATCH_SIZE, + (ps, entity) -> setUpdateParameters(ps, entity)); + + for (int[] batchResult : batchResults) { + for (int result : batchResult) { + if (result > 0) { + totalUpdated += result; + } + } + } + + log.info("Ship Last Position 동기화 완료: 대상={} 건, 갱신={} 건", entities.size(), totalUpdated); + return totalUpdated; + } + + private void setUpdateParameters(PreparedStatement ps, AisTargetEntity entity) throws java.sql.SQLException { + int idx = 1; + + // last_cptr_hr_utc ← receivedDate (OffsetDateTime → String for ::timestamptz) + ps.setString(idx++, entity.getReceivedDate() != null ? entity.getReceivedDate().toString() : null); + + // last_port ← lpcCode (Integer → String) + ps.setString(idx++, entity.getLpcCode() != null ? String.valueOf(entity.getLpcCode()) : null); + + // now_position_lat ← lat (Double, null-safe) + ps.setObject(idx++, entity.getLat(), Types.DOUBLE); + + // now_position_lon ← lon (Double, null-safe) + ps.setObject(idx++, entity.getLon(), Types.DOUBLE); + + // ship_dest ← destination (String) + ps.setString(idx++, entity.getDestination()); + + // arvl_prnmnt_hr ← eta (OffsetDateTime → String for ::timestamptz) + ps.setString(idx++, entity.getEta() != null ? entity.getEta().toString() : null); + + // bow_drctn ← heading (Double, null-safe) + ps.setObject(idx++, entity.getHeading(), Types.DOUBLE); + + // cog ← cog (Double, null-safe) + ps.setObject(idx++, entity.getCog(), Types.DOUBLE); + + // sog ← sog (Double, null-safe) + ps.setObject(idx++, entity.getSog(), Types.DOUBLE); + + // ship_nav_status ← status (String) + ps.setString(idx++, entity.getStatus()); + + // cargo_ton ← tonnesCargo (Integer, null-safe) + ps.setObject(idx++, entity.getTonnesCargo(), Types.INTEGER); + + // add_info ← extraInfo (String) + ps.setString(idx++, entity.getExtraInfo()); + + // in_sts ← inSTS (Integer, null-safe) + ps.setObject(idx++, entity.getInSTS(), Types.INTEGER); + + // ancrg_yn ← onBerth (Boolean, null-safe) + ps.setObject(idx++, entity.getOnBerth(), Types.BOOLEAN); + + // WHERE imo_no ← imoVerified (String) + ps.setString(idx++, entity.getImoVerified()); + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/tasklet/ShipLastPositionSyncTasklet.java b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/tasklet/ShipLastPositionSyncTasklet.java new file mode 100644 index 0000000..1a044b8 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/tasklet/ShipLastPositionSyncTasklet.java @@ -0,0 +1,133 @@ +package com.snp.batch.jobs.aistargetdbsync.batch.tasklet; + +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager; +import com.snp.batch.jobs.aistargetdbsync.batch.repository.ShipLastPositionSyncRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.time.Instant; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * Ship Last Position Sync Tasklet + * + * 동작: + * - Caffeine 캐시에서 마지막 성공 이후 ~ 현재까지의 AIS 데이터를 조회 + * - imoVerified가 유효한 항목만 필터링 + * - IMO별 messageTimestamp 최신 1건만 선택 + * - tb_ship_main_info의 위치/항해 14개 컬럼을 UPDATE + * + * 시간 범위 결정 전략: + * - AisTargetDbSyncTasklet과 동일 (lastSuccessTime 기반, fallback 사용) + */ +@Slf4j +@Component +public class ShipLastPositionSyncTasklet implements Tasklet { + + private final AisTargetCacheManager cacheManager; + private final ShipLastPositionSyncRepository repository; + private final int fallbackMinutes; + + private final AtomicReference lastSuccessTime = new AtomicReference<>(); + + public ShipLastPositionSyncTasklet( + AisTargetCacheManager cacheManager, + ShipLastPositionSyncRepository repository, + @Value("${app.batch.ais-target-db-sync.time-range-minutes:15}") int fallbackMinutes) { + this.cacheManager = cacheManager; + this.repository = repository; + this.fallbackMinutes = fallbackMinutes; + } + + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { + Instant now = Instant.now(); + int rangeMinutes = resolveRangeMinutes(now); + + log.info("========================================"); + log.info("Ship Last Position Sync 시작"); + log.info("조회 범위: 최근 {}분 (방식: {})", rangeMinutes, + lastSuccessTime.get() != null ? "마지막 성공 기준" : "fallback"); + log.info("========================================"); + + long startTime = System.currentTimeMillis(); + + // 1. 캐시에서 시간 범위 내 데이터 조회 + List entities = cacheManager.getByTimeRange(rangeMinutes); + + if (entities.isEmpty()) { + log.warn("캐시에서 조회된 데이터가 없습니다 (범위: {}분)", rangeMinutes); + lastSuccessTime.set(now); + return RepeatStatus.FINISHED; + } + + // 2. imoVerified가 유효한 항목만 필터 + List imoFiltered = entities.stream() + .filter(e -> isValidImo(e.getImoVerified())) + .toList(); + + if (imoFiltered.isEmpty()) { + log.info("유효한 IMO가 있는 데이터가 없습니다 (전체: {} 건)", entities.size()); + lastSuccessTime.set(now); + return RepeatStatus.FINISHED; + } + + // 3. IMO별 messageTimestamp 최신 1건만 선택 + List latestPerImo = imoFiltered.stream() + .collect(Collectors.groupingBy(AisTargetEntity::getImoVerified)) + .values().stream() + .map(group -> group.stream() + .max(Comparator.comparing( + AisTargetEntity::getMessageTimestamp, + Comparator.nullsFirst(Comparator.naturalOrder()))) + .orElseThrow()) + .toList(); + + log.info("캐시 조회: {} 건 → IMO 유효: {} 건 → IMO별 최신: {} 건", + entities.size(), imoFiltered.size(), latestPerImo.size()); + + // 4. DB UPDATE + int updated = repository.updateLastPositions(latestPerImo); + + long elapsed = System.currentTimeMillis() - startTime; + + lastSuccessTime.set(now); + + log.info("========================================"); + log.info("Ship Last Position Sync 완료"); + log.info("대상: {} 건, 갱신: {} 건", latestPerImo.size(), updated); + log.info("소요 시간: {}ms", elapsed); + log.info("========================================"); + + contribution.incrementWriteCount(updated); + + return RepeatStatus.FINISHED; + } + + private static final int MAX_RANGE_MINUTES = 60; + + private int resolveRangeMinutes(Instant now) { + Instant last = lastSuccessTime.get(); + if (last == null) { + return Math.min(fallbackMinutes, MAX_RANGE_MINUTES); + } + + long elapsedMinutes = Duration.between(last, now).toMinutes(); + int range = (int) Math.max(elapsedMinutes + 1, 1); + return Math.min(range, MAX_RANGE_MINUTES); + } + + private boolean isValidImo(String imoVerified) { + return imoVerified != null && !imoVerified.isBlank() && !"0".equals(imoVerified); + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipLastPositionUpdateJobConfig.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipLastPositionUpdateJobConfig.java deleted file mode 100644 index 4136ed3..0000000 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipLastPositionUpdateJobConfig.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.snp.batch.jobs.shipdetail.batch.config; - -import com.snp.batch.common.batch.config.BaseJobConfig; -import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto; -import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity; -import com.snp.batch.jobs.shipdetail.batch.processor.ShipLastPositionDataProcessor; -import com.snp.batch.jobs.shipdetail.batch.reader.ShipLastPositionDataReader; -import com.snp.batch.jobs.shipdetail.batch.writer.ShipLastPositionDataWriter; -import lombok.extern.slf4j.Slf4j; -import org.springframework.batch.core.Job; -import org.springframework.batch.core.Step; -import org.springframework.batch.core.repository.JobRepository; -import org.springframework.batch.item.ItemProcessor; -import org.springframework.batch.item.ItemReader; -import org.springframework.batch.item.ItemWriter; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.web.reactive.function.client.WebClient; - -@Slf4j -@Configuration -public class ShipLastPositionUpdateJobConfig extends BaseJobConfig { - private final JdbcTemplate jdbcTemplate; - private final WebClient maritimeAisApiWebClient; - - private final ShipLastPositionDataProcessor shipLastPositionDataProcessor; - - private final ShipLastPositionDataWriter shipLastPositionDataWriter; - - @Value("${app.batch.service-schema.name}") - private String targetSchema; - - @Override - protected int getChunkSize() { - return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정 - } - public ShipLastPositionUpdateJobConfig( - JobRepository jobRepository, - PlatformTransactionManager transactionManager, - ShipLastPositionDataProcessor shipLastPositionDataProcessor, - ShipLastPositionDataWriter shipLastPositionDataWriter, - JdbcTemplate jdbcTemplate, - @Qualifier("maritimeAisApiWebClient")WebClient maritimeAisApiWebClient) { - super(jobRepository, transactionManager); - this.jdbcTemplate = jdbcTemplate; - this.maritimeAisApiWebClient = maritimeAisApiWebClient; - this.shipLastPositionDataProcessor = shipLastPositionDataProcessor; - this.shipLastPositionDataWriter = shipLastPositionDataWriter; - } - - @Override - protected String getJobName() { - return "shipLastPositionUpdateJob"; - } - - @Override - protected String getStepName() { - return "shipLastPositionUpdateStep"; - } - - @Override - protected ItemReader createReader() { - return new ShipLastPositionDataReader(maritimeAisApiWebClient, jdbcTemplate, targetSchema); - } - - @Override - protected ItemProcessor createProcessor() { - return shipLastPositionDataProcessor; - } - - @Override - protected ItemWriter createWriter() { - return shipLastPositionDataWriter; - } - - @Bean(name = "shipLastPositionUpdateJob") - public Job shipLastPositionUpdateJob() { - return job(); - } - - @Bean(name = "shipLastPositionUpdateStep") - public Step shipLastPositionUpdateStep() { - return step(); - } - -} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/dto/AisApiResponse.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/dto/AisApiResponse.java deleted file mode 100644 index 6af9b7d..0000000 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/dto/AisApiResponse.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.snp.batch.jobs.shipdetail.batch.dto; - -import com.fasterxml.jackson.annotation.JsonProperty; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.List; - -/** - * Maritime API GetShipsByIHSLRorIMONumbers 응답 래퍼 - * - * API 응답 구조: - * { - * "shipCount": 5, - * "Ships": [...] - * } - * - * Maritime API GetShipsByIHSLRorIMONumbersAll 응답 래퍼 - * - * API 응답 구조: - * { - * "shipCount": 5, - * "ShipResult": [...], - * "APSStatus": {...} - * } - * - */ -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class AisApiResponse { - - @JsonProperty("ShipResult") - private List shipResult; - - @JsonProperty("targetEnhancedArr") - private List targetEnhancedArr; - -} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/dto/TargetEnhancedDto.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/dto/TargetEnhancedDto.java deleted file mode 100644 index 6422d83..0000000 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/dto/TargetEnhancedDto.java +++ /dev/null @@ -1,185 +0,0 @@ -package com.snp.batch.jobs.shipdetail.batch.dto; - -import com.fasterxml.jackson.annotation.JsonProperty; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class TargetEnhancedDto { - - @JsonProperty("DWT") - private Integer dwt; - - @JsonProperty("STAT5CODE") - private String stat5Code; - - @JsonProperty("Source") - private String source; - - @JsonProperty("CoG") - private Double cog; - - @JsonProperty("LastStaticUpdateReceived") - private String lastStaticUpdateReceived; - - @JsonProperty("ZoneId") - private Integer zoneId; - - @JsonProperty("LPCCode") - private String lpcCode; - - @JsonProperty("TonnesCargo") - private Integer tonnesCargo; - - @JsonProperty("StationId") - private String stationId; - - @JsonProperty("InSTS") - private Object inSts; // JSON에서 null이므로 Object로 처리 - - @JsonProperty("ImoVerified") - private String imoVerified; - - @JsonProperty("OnBerth") - private Object onBerth; - - @JsonProperty("DestinationTidied") - private String destinationTidied; - - @JsonProperty("Anomalous") - private Object anomalous; // JSON에서 null이므로 Object로 처리 - - @JsonProperty("MessageType") - private Object messageType; // JSON에서 null이므로 Object로 처리 - - @JsonProperty("DestinationPortID") - private Integer destinationPortID; - - @JsonProperty("DestinationUNLOCODE") - private String destinationUnlocode; - - @JsonProperty("MMSI") - private Integer mmsi; - - @JsonProperty("IMO") - private Integer imo; - - @JsonProperty("AgeMinutes") - private Double ageMinutes; - - @JsonProperty("Lat") - private Double lat; - - @JsonProperty("Lon") - private Double lon; - - @JsonProperty("Heading") - private Double heading; - - @JsonProperty("SoG") - private Double sog; - - @JsonProperty("Width") - private Integer width; - - @JsonProperty("Length") - private Integer length; - - @JsonProperty("Draught") - private Double draught; - - @JsonProperty("Name") - private String name; - - @JsonProperty("Callsign") - private String callsign; - - @JsonProperty("Destination") - private String destination; - - @JsonProperty("ETA") - private String eta; - - @JsonProperty("Status") - private String status; - - @JsonProperty("VesselType") - private String vesselType; - - @JsonProperty("ExtraInfo") - private String extraInfo; - - @JsonProperty("PositionAccuracy") - private Integer positionAccuracy; - - @JsonProperty("RoT") - private Integer rot; - - @JsonProperty("TimestampUTC") - private Integer timestampUtc; - - @JsonProperty("RepeatIndicator") - private Integer repeatIndicator; - - @JsonProperty("RAIMFlag") - private Integer raimFlag; - - @JsonProperty("RadioStatus") - private Integer radioStatus; - - @JsonProperty("Regional") - private Integer regional; - - @JsonProperty("Regional2") - private Integer regional2; - - @JsonProperty("Spare") - private Integer spare; - - @JsonProperty("Spare2") - private Integer spare2; - - @JsonProperty("AISVersion") - private Integer aisVersion; - - @JsonProperty("PositionFixType") - private Integer positionFixType; - - @JsonProperty("DTE") - private Integer dte; - - @JsonProperty("BandFlag") - private Integer bandFlag; - - @JsonProperty("ReceivedDate") - private String receivedDate; - - @JsonProperty("MessageTimestamp") - private String messageTimestamp; - - @JsonProperty("LengthBow") - private Integer lengthBow; - - @JsonProperty("LengthStern") - private Integer lengthStern; - - @JsonProperty("WidthPort") - private Integer widthPort; - - @JsonProperty("WidthStarboard") - private Integer widthStarboard; - - // Getters and Setters (생략 - 필요에 따라 추가) - // 예시: - public Integer getDwt() { - return dwt; - } - public void setDwt(Integer dwt) { - this.dwt = dwt; - } -} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/entity/TargetEnhancedEntity.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/entity/TargetEnhancedEntity.java deleted file mode 100644 index 3ad9c91..0000000 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/entity/TargetEnhancedEntity.java +++ /dev/null @@ -1,128 +0,0 @@ -package com.snp.batch.jobs.shipdetail.batch.entity; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.snp.batch.common.batch.entity.BaseEntity; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; -import lombok.experimental.SuperBuilder; - -import java.time.LocalDateTime; - -@Data -@SuperBuilder -@NoArgsConstructor -@AllArgsConstructor -@EqualsAndHashCode(callSuper = true) -public class TargetEnhancedEntity extends BaseEntity { - - private Integer dwt; - - private String stat5Code; - - private String source; - - private Double cog; - - private String lastStaticUpdateReceived; - - private Integer zoneId; - - private String lpcCode; - - private Integer tonnesCargo; - - private String stationId; - - private Object inSts; // JSON에서 null이므로 Object로 처리 - - private String imoVerified; - - private Object onBerth; - - private String destinationTidied; - - private Object anomalous; // JSON에서 null이므로 Object로 처리 - - private Object messageType; // JSON에서 null이므로 Object로 처리 - - private Integer destinationPortID; - - private String destinationUnlocode; - - private Integer mmsi; - - private Integer imo; - - private Double ageMinutes; - - private Double lat; - - private Double lon; - - private Double heading; - - private Double sog; - - private Integer width; - - private Integer length; - - private Double draught; - - private String name; - - private String callsign; - - private String destination; - - private String eta; - - private String status; - - private String vesselType; - - private String extraInfo; - - private Integer positionAccuracy; - - private Integer rot; - - private Integer timestampUtc; - - private Integer repeatIndicator; - - private Integer raimFlag; - - private Integer radioStatus; - - private Integer regional; - - private Integer regional2; - - private Integer spare; - - private Integer spare2; - - private Integer aisVersion; - - private Integer positionFixType; - - private Integer dte; - - private Integer bandFlag; - - private String receivedDate; - - private String messageTimestamp; - - private Integer lengthBow; - - private Integer lengthStern; - - private Integer widthPort; - - private Integer widthStarboard; - -} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/processor/ShipLastPositionDataProcessor.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/processor/ShipLastPositionDataProcessor.java deleted file mode 100644 index f3dc3ef..0000000 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/processor/ShipLastPositionDataProcessor.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.snp.batch.jobs.shipdetail.batch.processor; - -import com.snp.batch.common.batch.processor.BaseProcessor; -import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto; -import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -public class ShipLastPositionDataProcessor extends BaseProcessor { - @Override - protected TargetEnhancedEntity processItem(TargetEnhancedDto dto) throws Exception { - log.debug("AIS 최신 항적 데이터 처리 시작: imoNumber={}, shipName={}", - dto.getImoVerified(), dto.getName()); - - TargetEnhancedEntity entity = TargetEnhancedEntity.builder() - .dwt(dto.getDwt()) - .stat5Code(dto.getStat5Code()) - .source(dto.getSource()) - .lpcCode(dto.getLpcCode()) - .tonnesCargo(dto.getTonnesCargo()) - .imoVerified(dto.getImoVerified()) - .lastStaticUpdateReceived(dto.getLastStaticUpdateReceived()) // Entity 필드 타입이 String인 경우 - // Position and Movement Data - .cog(dto.getCog()) - .zoneId(dto.getZoneId()) - .stationId(dto.getStationId()) - .onBerth(dto.getOnBerth()) - .mmsi(dto.getMmsi()) - .imo(dto.getImo()) - .ageMinutes(dto.getAgeMinutes()) - .lat(dto.getLat()) - .lon(dto.getLon()) - .heading(dto.getHeading()) - .sog(dto.getSog()) - .positionAccuracy(dto.getPositionAccuracy()) - .rot(dto.getRot()) - .timestampUtc(dto.getTimestampUtc()) - .status(dto.getStatus()) - // Dimensions and Draught - .width(dto.getWidth()) - .length(dto.getLength()) - .draught(dto.getDraught()) - .lengthBow(dto.getLengthBow()) - .lengthStern(dto.getLengthStern()) - .widthPort(dto.getWidthPort()) - .widthStarboard(dto.getWidthStarboard()) - // Voyage Data - .name(dto.getName()) - .callsign(dto.getCallsign()) - .destination(dto.getDestination()) - .destinationTidied(dto.getDestinationTidied()) - .destinationPortID(dto.getDestinationPortID()) - .destinationUnlocode(dto.getDestinationUnlocode()) - .eta(dto.getEta()) // Entity 필드 타입이 String인 경우 - .vesselType(dto.getVesselType()) - .extraInfo(dto.getExtraInfo()) - // AIS Metadata Fields - .repeatIndicator(dto.getRepeatIndicator()) - .raimFlag(dto.getRaimFlag()) - .radioStatus(dto.getRadioStatus()) - .regional(dto.getRegional()) - .regional2(dto.getRegional2()) - .spare(dto.getSpare()) - .spare2(dto.getSpare2()) - .aisVersion(dto.getAisVersion()) - .positionFixType(dto.getPositionFixType()) - .dte(dto.getDte()) - .bandFlag(dto.getBandFlag()) - .receivedDate(dto.getReceivedDate()) // Entity 필드 타입이 String인 경우 - .messageTimestamp(dto.getMessageTimestamp()) // Entity 필드 타입이 String인 경우 - // Null/Object Fields - .inSts(dto.getInSts()) - .anomalous(dto.getAnomalous()) - .messageType(dto.getMessageType()) - .build(); - - log.debug("AIS 최신 항적 데이터 처리 완료: imoNumber={}", dto.getImoVerified()); - - return entity; - } -} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipLastPositionDataReader.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipLastPositionDataReader.java deleted file mode 100644 index 0bc8c23..0000000 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipLastPositionDataReader.java +++ /dev/null @@ -1,155 +0,0 @@ -package com.snp.batch.jobs.shipdetail.batch.reader; - -import com.snp.batch.common.batch.reader.BaseApiReader; -import com.snp.batch.jobs.shipdetail.batch.dto.AisApiResponse; -import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto; -import lombok.extern.slf4j.Slf4j; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.web.reactive.function.client.WebClient; - -import java.util.*; -@Slf4j -public class ShipLastPositionDataReader extends BaseApiReader { - - //TODO : - // 1. Core20 IMO_NUMBER 전체 조회 - // 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복) - // 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복) - - private final JdbcTemplate jdbcTemplate; - private final String targetSchema; - - private List allImoNumbers; - private int currentBatchIndex = 0; - private final int batchSize = 5000; - - public ShipLastPositionDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, String targetSchema) { - super(webClient); - this.jdbcTemplate = jdbcTemplate; - this.targetSchema = targetSchema; - enableChunkMode(); // ✨ Chunk 모드 활성화 - } - - @Override - protected String getReaderName() { - return "ShipLastPositionDataReader"; - } - - @Override - protected void resetCustomState() { - this.currentBatchIndex = 0; - this.allImoNumbers = null; - } - - @Override - protected String getApiPath() { - return "/AisSvc.svc/AIS/GetTargetsByIMOsEnhanced"; - } - - private String getTargetTable(){ - return targetSchema + ".tb_ship_main_info"; - } - - private String getImoQuery() { - return "SELECT imo_no FROM " + getTargetTable() + " ORDER BY imo_no"; - } - - @Override - protected void beforeFetch(){ - log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName()); - - allImoNumbers = jdbcTemplate.queryForList(getImoQuery(), String.class); - - int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); - - log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); - log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); - log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); - - updateApiCallStats(totalBatches, 0); - } - - @Override - protected List fetchNextBatch() throws Exception { - // 모든 배치 처리 완료 확인 - if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { - return null; // Job 종료 - } - - // 현재 배치의 시작/끝 인덱스 계산 - int startIndex = currentBatchIndex; - int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); - - // 현재 배치의 IMO 번호 추출 (100개) - List currentBatch = allImoNumbers.subList(startIndex, endIndex); - - int currentBatchNumber = (currentBatchIndex / batchSize) + 1; - int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); - - log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", - getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); - - try { - // IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...") - String imoParam = String.join(",", currentBatch); - - // API 호출 - AisApiResponse response = callAisApiWithBatch(imoParam); - - // 다음 배치로 인덱스 이동 - currentBatchIndex = endIndex; - - // 응답 처리 - if (response != null && response.getTargetEnhancedArr() != null) { - List targets = response.getTargetEnhancedArr(); - log.info("[{}] 배치 {}/{} 완료: {} 건 조회", - getReaderName(), currentBatchNumber, totalBatches, targets.size()); - - // API 호출 통계 업데이트 - updateApiCallStats(totalBatches, currentBatchNumber); - - // API 과부하 방지 (다음 배치 전 0.5초 대기) - if (currentBatchIndex < allImoNumbers.size()) { - Thread.sleep(500); - } - - return targets; - } else { - log.warn("[{}] 배치 {}/{} 응답 없음", - getReaderName(), currentBatchNumber, totalBatches); - - // API 호출 통계 업데이트 (실패도 카운트) - updateApiCallStats(totalBatches, currentBatchNumber); - - return Collections.emptyList(); - } - - } catch (Exception e) { - log.error("[{}] 배치 {}/{} 처리 중 오류: {}", - getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e); - - // 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용) - currentBatchIndex = endIndex; - - // 빈 리스트 반환 (Job 계속 진행) - return Collections.emptyList(); - } - } - - private AisApiResponse callAisApiWithBatch(String imoNumbers) { - String url = getApiPath(); - - Map requestBody = Map.of("imo", imoNumbers); - - log.debug("[{}] API 호출: {}", getReaderName(), url); - log.debug("[{}] : {}", "requestBody", requestBody.toString()); - - return webClient.post() - .uri(url) - .bodyValue(requestBody) - .retrieve() - .bodyToMono(AisApiResponse.class) - .block(); - } - -} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipLastPositionRepository.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipLastPositionRepository.java deleted file mode 100644 index 7630460..0000000 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipLastPositionRepository.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.snp.batch.jobs.shipdetail.batch.repository; - -import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity; - -import java.util.List; - -public interface ShipLastPositionRepository { - void saveLastPositionAll(List items); -} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipLastPositionRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipLastPositionRepositoryImpl.java deleted file mode 100644 index 6c8673e..0000000 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipLastPositionRepositoryImpl.java +++ /dev/null @@ -1,125 +0,0 @@ -package com.snp.batch.jobs.shipdetail.batch.repository; - -import com.snp.batch.common.batch.repository.BaseJdbcRepository; -import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.core.RowMapper; -import org.springframework.stereotype.Repository; -import org.springframework.transaction.annotation.Transactional; - -import java.sql.PreparedStatement; -import java.sql.Types; -import java.util.List; - -@Slf4j -@Repository("shipLastPositionRepository") -public class ShipLastPositionRepositoryImpl extends BaseJdbcRepository implements ShipLastPositionRepository { - - @Value("${app.batch.service-schema.name}") - private String targetSchema; - - @Value("${app.batch.service-schema.tables.service-001}") - private String tableName; - - public ShipLastPositionRepositoryImpl(JdbcTemplate jdbcTemplate) { - super(jdbcTemplate); - } - - @Override - protected String getTargetSchema() { - return targetSchema; - } - - @Override - protected String getSimpleTableName() { - return tableName; - } - - @Override - protected RowMapper getRowMapper() { - return null; - } - - @Override - protected Long extractId(TargetEnhancedEntity entity) { - return null; - } - - @Override - protected String getInsertSql() { - return null; - } - - @Override - protected String getUpdateSql() { - return """ - UPDATE %s - SET last_cptr_hr_utc = ?::timestamptz, - last_port = ?, - now_position_lat = ?, - now_position_lon = ?, - ship_dest = ?, - arvl_prnmnt_hr = ?::timestamptz, - bow_drctn = ?, - cog = ?, - sog = ?, - ship_nav_status = ?, - cargo_ton = ?, - add_info = ?, - in_sts = ?, - ancrg_yn = ? - WHERE imo_no = ?; - """.formatted(getTableName()); - } - - @Override - protected void setInsertParameters(PreparedStatement ps, TargetEnhancedEntity entity) throws Exception { - - } - - @Override - protected void setUpdateParameters(PreparedStatement ps, TargetEnhancedEntity entity) throws Exception { - int idx = 1; - ps.setString(idx++, entity.getReceivedDate()); // receivedDate : lastseen (Datetime) - ps.setString(idx++, entity.getLpcCode()); // lpcCode : lastport (String) - ps.setDouble(idx++, entity.getLat()); // lat : position_latitude (Double) - ps.setDouble(idx++, entity.getLon()); // lon : position_longitude (Double) - ps.setString(idx++, entity.getDestination()); // destination : destination (String) - ps.setString(idx++, entity.getEta()); // eta : eta (Datetime) - ps.setDouble(idx++, entity.getHeading()); // heading : heading (Double) - ps.setDouble(idx++, entity.getSog()); // sog : speedservice (Double) - ps.setDouble(idx++, entity.getCog()); // cog : cog (Double) - ps.setString(idx++, entity.getStatus()); // status : navstat (String) - ps.setInt(idx++, entity.getTonnesCargo()); // status : tonnes_cargo (Int) - ps.setString(idx++, entity.getExtraInfo()); // status : extra_info (String) - ps.setObject(idx++, entity.getInSts(), Types.INTEGER); - ps.setObject(idx++, entity.getOnBerth(), Types.BOOLEAN); - ps.setString(idx++, entity.getImoVerified()); // imoVerified : imo (String) - } - - @Override - protected String getEntityName() { - return "TargetEnhancedEntity"; - } - - @Override - @Transactional - public void saveLastPositionAll(List items) { - if (items == null || items.isEmpty()) { - return; - } - jdbcTemplate.batchUpdate(getUpdateSql(), items, items.size(), - (ps, entity) -> { - try { - setUpdateParameters(ps, entity); - } catch (Exception e) { - log.error("배치 수정 파라미터 설정 실패", e); - throw new RuntimeException(e); - } - }); - - log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size()); - } -} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/writer/ShipLastPositionDataWriter.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/writer/ShipLastPositionDataWriter.java deleted file mode 100644 index 5c5ef33..0000000 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/writer/ShipLastPositionDataWriter.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.snp.batch.jobs.shipdetail.batch.writer; - -import com.snp.batch.common.batch.writer.BaseWriter; -import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity; -import com.snp.batch.jobs.shipdetail.batch.repository.ShipLastPositionRepository; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.List; -@Slf4j -@Component -public class ShipLastPositionDataWriter extends BaseWriter { - - private final ShipLastPositionRepository shipLastPositionRepository; - public ShipLastPositionDataWriter(ShipLastPositionRepository shipLastPositionRepository) { - super("ShipLastPosition"); - this.shipLastPositionRepository = shipLastPositionRepository; - } - - @Override - protected void writeItems(List items) throws Exception { - shipLastPositionRepository.saveLastPositionAll(items); - } -} diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index 1848454..c3577c2 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -134,10 +134,10 @@ app: # Core20 캐시 테이블 설정 (환경별로 테이블/컬럼명이 다를 수 있음) core20: - schema: t_std_snp_data # 스키마명 - table: tb_ship_info_mst # 테이블명 - imo-column: imo_no # IMO/LRNO 컬럼명 (PK, NOT NULL) - mmsi-column: mmsi_no # MMSI 컬럼명 (NULLABLE) + schema: new_snp # 스키마명 + table: core20 # 테이블명 + imo-column: lrno # IMO/LRNO 컬럼명 (PK, NOT NULL) + mmsi-column: mmsi # MMSI 컬럼명 (NULLABLE) # 파티션 관리 설정 partition: