feat(batch): 선박 마지막 위치 업데이트 프로세스 변경 #7

병합
HYOJIN feature/last-position-update 에서 develop 로 2 commits 를 머지했습니다 2026-02-24 14:17:39 +09:00
14개의 변경된 파일324개의 추가작업 그리고 849개의 파일을 삭제

파일 보기

@ -1,6 +1,7 @@
package com.snp.batch.jobs.aistargetdbsync.batch.config;
import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.AisTargetDbSyncTasklet;
import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.ShipLastPositionSyncTasklet;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
@ -20,13 +21,14 @@ import org.springframework.transaction.PlatformTransactionManager;
* API: 없음 (캐시 기반)
*
* 동작:
* - Caffeine 캐시에서 최근 15분 이내 데이터 조회
* - MMSI별 최신 위치 1건씩 DB에 UPSERT
* - 1분 주기 aisTargetImportJob과 분리하여 DB 볼륨 최적화
* - Step 1: Caffeine 캐시에서 최근 15분 이내 데이터 조회 ais_target DB UPSERT
* - Step 2: 캐시에서 IMO별 최신 위치 조회 tb_ship_main_info 위치 컬럼 UPDATE
*
* 데이터 흐름:
* - aisTargetImportJob (1분): API 캐시 업데이트
* - aisTargetDbSyncJob (15분): 캐시 DB 저장 ( Job)
* - Step 1: 캐시 ais_target UPSERT
* - Step 2: 캐시 tb_ship_main_info UPDATE (IMO별 최신 1건)
*/
@Slf4j
@Configuration
@ -35,14 +37,17 @@ public class AisTargetDbSyncJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final AisTargetDbSyncTasklet aisTargetDbSyncTasklet;
private final ShipLastPositionSyncTasklet shipLastPositionSyncTasklet;
public AisTargetDbSyncJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
AisTargetDbSyncTasklet aisTargetDbSyncTasklet) {
AisTargetDbSyncTasklet aisTargetDbSyncTasklet,
ShipLastPositionSyncTasklet shipLastPositionSyncTasklet) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
this.aisTargetDbSyncTasklet = aisTargetDbSyncTasklet;
this.shipLastPositionSyncTasklet = shipLastPositionSyncTasklet;
}
@Bean(name = "aisTargetDbSyncStep")
@ -52,6 +57,13 @@ public class AisTargetDbSyncJobConfig {
.build();
}
@Bean(name = "shipLastPositionSyncStep")
public Step shipLastPositionSyncStep() {
return new StepBuilder("shipLastPositionSyncStep", jobRepository)
.tasklet(shipLastPositionSyncTasklet, transactionManager)
.build();
}
@Bean(name = "aisTargetDbSyncJob")
public Job aisTargetDbSyncJob() {
log.info("Job 생성: aisTargetDbSyncJob");
@ -74,6 +86,7 @@ public class AisTargetDbSyncJobConfig {
}
})
.start(aisTargetDbSyncStep())
.next(shipLastPositionSyncStep())
.build();
}
}

파일 보기

@ -0,0 +1,15 @@
package com.snp.batch.jobs.aistargetdbsync.batch.repository;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import java.util.List;
/**
* 캐시 기반 Ship Last Position 동기화 Repository
*
* AisTargetEntity(캐시 데이터) 받아 tb_ship_main_info의 위치/항해 컬럼을 UPDATE
*/
public interface ShipLastPositionSyncRepository {
int updateLastPositions(List<AisTargetEntity> entities);
}

파일 보기

@ -0,0 +1,155 @@
package com.snp.batch.jobs.aistargetdbsync.batch.repository;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.sql.PreparedStatement;
import java.sql.Types;
import java.util.List;
/**
* 캐시 기반 Ship Last Position 동기화 Repository 구현
*
* AisTargetEntity의 필드를 tb_ship_main_info 컬럼에 매핑하여 UPDATE 수행.
* 기존 ShipLastPositionRepositoryImpl과 동일한 SQL을 사용하되,
* AisTargetEntity의 타입(OffsetDateTime, Integer ) 맞게 변환 처리.
*/
@Slf4j
@Repository("shipLastPositionSyncRepository")
public class ShipLastPositionSyncRepositoryImpl implements ShipLastPositionSyncRepository {
private static final int BATCH_SIZE = 1000;
private final JdbcTemplate jdbcTemplate;
@Value("${app.batch.service-schema.name}")
private String targetSchema;
@Value("${app.batch.service-schema.tables.service-001}")
private String tableName;
public ShipLastPositionSyncRepositoryImpl(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
private String getTableName() {
return targetSchema + "." + tableName;
}
private String getUpdateSql() {
// return """
// UPDATE %s
// SET last_cptr_hr_utc = ?::timestamptz,
// last_port = ?,
// now_position_lat = ?,
// now_position_lon = ?,
// ship_dest = ?,
// arvl_prnmnt_hr = ?::timestamptz,
// bow_drctn = ?,
// cog = ?,
// sog = ?,
// ship_nav_status = ?,
// cargo_ton = ?,
// add_info = ?,
// in_sts = ?,
// ancrg_yn = ?
// WHERE imo_no = ?
// """.formatted(getTableName());
return """
UPDATE new_snp.core20
SET lastseen = ?::timestamptz,
lastport = ?,
position_latitude = ?,
position_longitude = ?,
destination = ?,
eta = ?::timestamptz,
heading = ?,
cog = ?,
speedservice = ?,
navstat = ?,
tonnes_cargo = ?,
extra_info = ?,
in_sts = ?,
on_berth = ?
WHERE lrno = ?;
""";
}
@Override
@Transactional
public int updateLastPositions(List<AisTargetEntity> entities) {
if (entities == null || entities.isEmpty()) {
return 0;
}
String sql = getUpdateSql();
int totalUpdated = 0;
int[][] batchResults = jdbcTemplate.batchUpdate(sql, entities, BATCH_SIZE,
(ps, entity) -> setUpdateParameters(ps, entity));
for (int[] batchResult : batchResults) {
for (int result : batchResult) {
if (result > 0) {
totalUpdated += result;
}
}
}
log.info("Ship Last Position 동기화 완료: 대상={} 건, 갱신={} 건", entities.size(), totalUpdated);
return totalUpdated;
}
private void setUpdateParameters(PreparedStatement ps, AisTargetEntity entity) throws java.sql.SQLException {
int idx = 1;
// last_cptr_hr_utc receivedDate (OffsetDateTime String for ::timestamptz)
ps.setString(idx++, entity.getReceivedDate() != null ? entity.getReceivedDate().toString() : null);
// last_port lpcCode (Integer String)
ps.setString(idx++, entity.getLpcCode() != null ? String.valueOf(entity.getLpcCode()) : null);
// now_position_lat lat (Double, null-safe)
ps.setObject(idx++, entity.getLat(), Types.DOUBLE);
// now_position_lon lon (Double, null-safe)
ps.setObject(idx++, entity.getLon(), Types.DOUBLE);
// ship_dest destination (String)
ps.setString(idx++, entity.getDestination());
// arvl_prnmnt_hr eta (OffsetDateTime String for ::timestamptz)
ps.setString(idx++, entity.getEta() != null ? entity.getEta().toString() : null);
// bow_drctn heading (Double, null-safe)
ps.setObject(idx++, entity.getHeading(), Types.DOUBLE);
// cog cog (Double, null-safe)
ps.setObject(idx++, entity.getCog(), Types.DOUBLE);
// sog sog (Double, null-safe)
ps.setObject(idx++, entity.getSog(), Types.DOUBLE);
// ship_nav_status status (String)
ps.setString(idx++, entity.getStatus());
// cargo_ton tonnesCargo (Integer, null-safe)
ps.setObject(idx++, entity.getTonnesCargo(), Types.INTEGER);
// add_info extraInfo (String)
ps.setString(idx++, entity.getExtraInfo());
// in_sts inSTS (Integer, null-safe)
ps.setObject(idx++, entity.getInSTS(), Types.INTEGER);
// ancrg_yn onBerth (Boolean, null-safe)
ps.setObject(idx++, entity.getOnBerth(), Types.BOOLEAN);
// WHERE imo_no imoVerified (String)
ps.setString(idx++, entity.getImoVerified());
}
}

파일 보기

@ -0,0 +1,133 @@
package com.snp.batch.jobs.aistargetdbsync.batch.tasklet;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
import com.snp.batch.jobs.aistargetdbsync.batch.repository.ShipLastPositionSyncRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
* Ship Last Position Sync Tasklet
*
* 동작:
* - Caffeine 캐시에서 마지막 성공 이후 ~ 현재까지의 AIS 데이터를 조회
* - imoVerified가 유효한 항목만 필터링
* - IMO별 messageTimestamp 최신 1건만 선택
* - tb_ship_main_info의 위치/항해 14개 컬럼을 UPDATE
*
* 시간 범위 결정 전략:
* - AisTargetDbSyncTasklet과 동일 (lastSuccessTime 기반, fallback 사용)
*/
@Slf4j
@Component
public class ShipLastPositionSyncTasklet implements Tasklet {
private final AisTargetCacheManager cacheManager;
private final ShipLastPositionSyncRepository repository;
private final int fallbackMinutes;
private final AtomicReference<Instant> lastSuccessTime = new AtomicReference<>();
public ShipLastPositionSyncTasklet(
AisTargetCacheManager cacheManager,
ShipLastPositionSyncRepository repository,
@Value("${app.batch.ais-target-db-sync.time-range-minutes:15}") int fallbackMinutes) {
this.cacheManager = cacheManager;
this.repository = repository;
this.fallbackMinutes = fallbackMinutes;
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Instant now = Instant.now();
int rangeMinutes = resolveRangeMinutes(now);
log.info("========================================");
log.info("Ship Last Position Sync 시작");
log.info("조회 범위: 최근 {}분 (방식: {})", rangeMinutes,
lastSuccessTime.get() != null ? "마지막 성공 기준" : "fallback");
log.info("========================================");
long startTime = System.currentTimeMillis();
// 1. 캐시에서 시간 범위 데이터 조회
List<AisTargetEntity> entities = cacheManager.getByTimeRange(rangeMinutes);
if (entities.isEmpty()) {
log.warn("캐시에서 조회된 데이터가 없습니다 (범위: {}분)", rangeMinutes);
lastSuccessTime.set(now);
return RepeatStatus.FINISHED;
}
// 2. imoVerified가 유효한 항목만 필터
List<AisTargetEntity> imoFiltered = entities.stream()
.filter(e -> isValidImo(e.getImoVerified()))
.toList();
if (imoFiltered.isEmpty()) {
log.info("유효한 IMO가 있는 데이터가 없습니다 (전체: {} 건)", entities.size());
lastSuccessTime.set(now);
return RepeatStatus.FINISHED;
}
// 3. IMO별 messageTimestamp 최신 1건만 선택
List<AisTargetEntity> latestPerImo = imoFiltered.stream()
.collect(Collectors.groupingBy(AisTargetEntity::getImoVerified))
.values().stream()
.map(group -> group.stream()
.max(Comparator.comparing(
AisTargetEntity::getMessageTimestamp,
Comparator.nullsFirst(Comparator.naturalOrder())))
.orElseThrow())
.toList();
log.info("캐시 조회: {} 건 → IMO 유효: {} 건 → IMO별 최신: {} 건",
entities.size(), imoFiltered.size(), latestPerImo.size());
// 4. DB UPDATE
int updated = repository.updateLastPositions(latestPerImo);
long elapsed = System.currentTimeMillis() - startTime;
lastSuccessTime.set(now);
log.info("========================================");
log.info("Ship Last Position Sync 완료");
log.info("대상: {} 건, 갱신: {} 건", latestPerImo.size(), updated);
log.info("소요 시간: {}ms", elapsed);
log.info("========================================");
contribution.incrementWriteCount(updated);
return RepeatStatus.FINISHED;
}
private static final int MAX_RANGE_MINUTES = 60;
private int resolveRangeMinutes(Instant now) {
Instant last = lastSuccessTime.get();
if (last == null) {
return Math.min(fallbackMinutes, MAX_RANGE_MINUTES);
}
long elapsedMinutes = Duration.between(last, now).toMinutes();
int range = (int) Math.max(elapsedMinutes + 1, 1);
return Math.min(range, MAX_RANGE_MINUTES);
}
private boolean isValidImo(String imoVerified) {
return imoVerified != null && !imoVerified.isBlank() && !"0".equals(imoVerified);
}
}

파일 보기

@ -1,90 +0,0 @@
package com.snp.batch.jobs.shipdetail.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto;
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
import com.snp.batch.jobs.shipdetail.batch.processor.ShipLastPositionDataProcessor;
import com.snp.batch.jobs.shipdetail.batch.reader.ShipLastPositionDataReader;
import com.snp.batch.jobs.shipdetail.batch.writer.ShipLastPositionDataWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
@Slf4j
@Configuration
public class ShipLastPositionUpdateJobConfig extends BaseJobConfig<TargetEnhancedDto, TargetEnhancedEntity> {
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeAisApiWebClient;
private final ShipLastPositionDataProcessor shipLastPositionDataProcessor;
private final ShipLastPositionDataWriter shipLastPositionDataWriter;
@Value("${app.batch.service-schema.name}")
private String targetSchema;
@Override
protected int getChunkSize() {
return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
}
public ShipLastPositionUpdateJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ShipLastPositionDataProcessor shipLastPositionDataProcessor,
ShipLastPositionDataWriter shipLastPositionDataWriter,
JdbcTemplate jdbcTemplate,
@Qualifier("maritimeAisApiWebClient")WebClient maritimeAisApiWebClient) {
super(jobRepository, transactionManager);
this.jdbcTemplate = jdbcTemplate;
this.maritimeAisApiWebClient = maritimeAisApiWebClient;
this.shipLastPositionDataProcessor = shipLastPositionDataProcessor;
this.shipLastPositionDataWriter = shipLastPositionDataWriter;
}
@Override
protected String getJobName() {
return "shipLastPositionUpdateJob";
}
@Override
protected String getStepName() {
return "shipLastPositionUpdateStep";
}
@Override
protected ItemReader<TargetEnhancedDto> createReader() {
return new ShipLastPositionDataReader(maritimeAisApiWebClient, jdbcTemplate, targetSchema);
}
@Override
protected ItemProcessor<TargetEnhancedDto, TargetEnhancedEntity> createProcessor() {
return shipLastPositionDataProcessor;
}
@Override
protected ItemWriter<TargetEnhancedEntity> createWriter() {
return shipLastPositionDataWriter;
}
@Bean(name = "shipLastPositionUpdateJob")
public Job shipLastPositionUpdateJob() {
return job();
}
@Bean(name = "shipLastPositionUpdateStep")
public Step shipLastPositionUpdateStep() {
return step();
}
}

파일 보기

@ -1,42 +0,0 @@
package com.snp.batch.jobs.shipdetail.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* Maritime API GetShipsByIHSLRorIMONumbers 응답 래퍼
*
* API 응답 구조:
* {
* "shipCount": 5,
* "Ships": [...]
* }
*
* Maritime API GetShipsByIHSLRorIMONumbersAll 응답 래퍼
*
* API 응답 구조:
* {
* "shipCount": 5,
* "ShipResult": [...],
* "APSStatus": {...}
* }
*
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AisApiResponse {
@JsonProperty("ShipResult")
private List<ShipResultDto> shipResult;
@JsonProperty("targetEnhancedArr")
private List<TargetEnhancedDto> targetEnhancedArr;
}

파일 보기

@ -1,185 +0,0 @@
package com.snp.batch.jobs.shipdetail.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TargetEnhancedDto {
@JsonProperty("DWT")
private Integer dwt;
@JsonProperty("STAT5CODE")
private String stat5Code;
@JsonProperty("Source")
private String source;
@JsonProperty("CoG")
private Double cog;
@JsonProperty("LastStaticUpdateReceived")
private String lastStaticUpdateReceived;
@JsonProperty("ZoneId")
private Integer zoneId;
@JsonProperty("LPCCode")
private String lpcCode;
@JsonProperty("TonnesCargo")
private Integer tonnesCargo;
@JsonProperty("StationId")
private String stationId;
@JsonProperty("InSTS")
private Object inSts; // JSON에서 null이므로 Object로 처리
@JsonProperty("ImoVerified")
private String imoVerified;
@JsonProperty("OnBerth")
private Object onBerth;
@JsonProperty("DestinationTidied")
private String destinationTidied;
@JsonProperty("Anomalous")
private Object anomalous; // JSON에서 null이므로 Object로 처리
@JsonProperty("MessageType")
private Object messageType; // JSON에서 null이므로 Object로 처리
@JsonProperty("DestinationPortID")
private Integer destinationPortID;
@JsonProperty("DestinationUNLOCODE")
private String destinationUnlocode;
@JsonProperty("MMSI")
private Integer mmsi;
@JsonProperty("IMO")
private Integer imo;
@JsonProperty("AgeMinutes")
private Double ageMinutes;
@JsonProperty("Lat")
private Double lat;
@JsonProperty("Lon")
private Double lon;
@JsonProperty("Heading")
private Double heading;
@JsonProperty("SoG")
private Double sog;
@JsonProperty("Width")
private Integer width;
@JsonProperty("Length")
private Integer length;
@JsonProperty("Draught")
private Double draught;
@JsonProperty("Name")
private String name;
@JsonProperty("Callsign")
private String callsign;
@JsonProperty("Destination")
private String destination;
@JsonProperty("ETA")
private String eta;
@JsonProperty("Status")
private String status;
@JsonProperty("VesselType")
private String vesselType;
@JsonProperty("ExtraInfo")
private String extraInfo;
@JsonProperty("PositionAccuracy")
private Integer positionAccuracy;
@JsonProperty("RoT")
private Integer rot;
@JsonProperty("TimestampUTC")
private Integer timestampUtc;
@JsonProperty("RepeatIndicator")
private Integer repeatIndicator;
@JsonProperty("RAIMFlag")
private Integer raimFlag;
@JsonProperty("RadioStatus")
private Integer radioStatus;
@JsonProperty("Regional")
private Integer regional;
@JsonProperty("Regional2")
private Integer regional2;
@JsonProperty("Spare")
private Integer spare;
@JsonProperty("Spare2")
private Integer spare2;
@JsonProperty("AISVersion")
private Integer aisVersion;
@JsonProperty("PositionFixType")
private Integer positionFixType;
@JsonProperty("DTE")
private Integer dte;
@JsonProperty("BandFlag")
private Integer bandFlag;
@JsonProperty("ReceivedDate")
private String receivedDate;
@JsonProperty("MessageTimestamp")
private String messageTimestamp;
@JsonProperty("LengthBow")
private Integer lengthBow;
@JsonProperty("LengthStern")
private Integer lengthStern;
@JsonProperty("WidthPort")
private Integer widthPort;
@JsonProperty("WidthStarboard")
private Integer widthStarboard;
// Getters and Setters (생략 - 필요에 따라 추가)
// 예시:
public Integer getDwt() {
return dwt;
}
public void setDwt(Integer dwt) {
this.dwt = dwt;
}
}

파일 보기

@ -1,128 +0,0 @@
package com.snp.batch.jobs.shipdetail.batch.entity;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.snp.batch.common.batch.entity.BaseEntity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.time.LocalDateTime;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class TargetEnhancedEntity extends BaseEntity {
private Integer dwt;
private String stat5Code;
private String source;
private Double cog;
private String lastStaticUpdateReceived;
private Integer zoneId;
private String lpcCode;
private Integer tonnesCargo;
private String stationId;
private Object inSts; // JSON에서 null이므로 Object로 처리
private String imoVerified;
private Object onBerth;
private String destinationTidied;
private Object anomalous; // JSON에서 null이므로 Object로 처리
private Object messageType; // JSON에서 null이므로 Object로 처리
private Integer destinationPortID;
private String destinationUnlocode;
private Integer mmsi;
private Integer imo;
private Double ageMinutes;
private Double lat;
private Double lon;
private Double heading;
private Double sog;
private Integer width;
private Integer length;
private Double draught;
private String name;
private String callsign;
private String destination;
private String eta;
private String status;
private String vesselType;
private String extraInfo;
private Integer positionAccuracy;
private Integer rot;
private Integer timestampUtc;
private Integer repeatIndicator;
private Integer raimFlag;
private Integer radioStatus;
private Integer regional;
private Integer regional2;
private Integer spare;
private Integer spare2;
private Integer aisVersion;
private Integer positionFixType;
private Integer dte;
private Integer bandFlag;
private String receivedDate;
private String messageTimestamp;
private Integer lengthBow;
private Integer lengthStern;
private Integer widthPort;
private Integer widthStarboard;
}

파일 보기

@ -1,83 +0,0 @@
package com.snp.batch.jobs.shipdetail.batch.processor;
import com.snp.batch.common.batch.processor.BaseProcessor;
import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto;
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ShipLastPositionDataProcessor extends BaseProcessor<TargetEnhancedDto, TargetEnhancedEntity> {
@Override
protected TargetEnhancedEntity processItem(TargetEnhancedDto dto) throws Exception {
log.debug("AIS 최신 항적 데이터 처리 시작: imoNumber={}, shipName={}",
dto.getImoVerified(), dto.getName());
TargetEnhancedEntity entity = TargetEnhancedEntity.builder()
.dwt(dto.getDwt())
.stat5Code(dto.getStat5Code())
.source(dto.getSource())
.lpcCode(dto.getLpcCode())
.tonnesCargo(dto.getTonnesCargo())
.imoVerified(dto.getImoVerified())
.lastStaticUpdateReceived(dto.getLastStaticUpdateReceived()) // Entity 필드 타입이 String인 경우
// Position and Movement Data
.cog(dto.getCog())
.zoneId(dto.getZoneId())
.stationId(dto.getStationId())
.onBerth(dto.getOnBerth())
.mmsi(dto.getMmsi())
.imo(dto.getImo())
.ageMinutes(dto.getAgeMinutes())
.lat(dto.getLat())
.lon(dto.getLon())
.heading(dto.getHeading())
.sog(dto.getSog())
.positionAccuracy(dto.getPositionAccuracy())
.rot(dto.getRot())
.timestampUtc(dto.getTimestampUtc())
.status(dto.getStatus())
// Dimensions and Draught
.width(dto.getWidth())
.length(dto.getLength())
.draught(dto.getDraught())
.lengthBow(dto.getLengthBow())
.lengthStern(dto.getLengthStern())
.widthPort(dto.getWidthPort())
.widthStarboard(dto.getWidthStarboard())
// Voyage Data
.name(dto.getName())
.callsign(dto.getCallsign())
.destination(dto.getDestination())
.destinationTidied(dto.getDestinationTidied())
.destinationPortID(dto.getDestinationPortID())
.destinationUnlocode(dto.getDestinationUnlocode())
.eta(dto.getEta()) // Entity 필드 타입이 String인 경우
.vesselType(dto.getVesselType())
.extraInfo(dto.getExtraInfo())
// AIS Metadata Fields
.repeatIndicator(dto.getRepeatIndicator())
.raimFlag(dto.getRaimFlag())
.radioStatus(dto.getRadioStatus())
.regional(dto.getRegional())
.regional2(dto.getRegional2())
.spare(dto.getSpare())
.spare2(dto.getSpare2())
.aisVersion(dto.getAisVersion())
.positionFixType(dto.getPositionFixType())
.dte(dto.getDte())
.bandFlag(dto.getBandFlag())
.receivedDate(dto.getReceivedDate()) // Entity 필드 타입이 String인 경우
.messageTimestamp(dto.getMessageTimestamp()) // Entity 필드 타입이 String인 경우
// Null/Object Fields
.inSts(dto.getInSts())
.anomalous(dto.getAnomalous())
.messageType(dto.getMessageType())
.build();
log.debug("AIS 최신 항적 데이터 처리 완료: imoNumber={}", dto.getImoVerified());
return entity;
}
}

파일 보기

@ -1,155 +0,0 @@
package com.snp.batch.jobs.shipdetail.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipdetail.batch.dto.AisApiResponse;
import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.*;
@Slf4j
public class ShipLastPositionDataReader extends BaseApiReader<TargetEnhancedDto> {
//TODO :
// 1. Core20 IMO_NUMBER 전체 조회
// 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복)
// 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복)
private final JdbcTemplate jdbcTemplate;
private final String targetSchema;
private List<String> allImoNumbers;
private int currentBatchIndex = 0;
private final int batchSize = 5000;
public ShipLastPositionDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, String targetSchema) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
this.targetSchema = targetSchema;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "ShipLastPositionDataReader";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
}
@Override
protected String getApiPath() {
return "/AisSvc.svc/AIS/GetTargetsByIMOsEnhanced";
}
private String getTargetTable(){
return targetSchema + ".tb_ship_main_info";
}
private String getImoQuery() {
return "SELECT imo_no FROM " + getTargetTable() + " ORDER BY imo_no";
}
@Override
protected void beforeFetch(){
log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(getImoQuery(), String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
updateApiCallStats(totalBatches, 0);
}
@Override
protected List<TargetEnhancedDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
AisApiResponse response = callAisApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null && response.getTargetEnhancedArr() != null) {
List<TargetEnhancedDto> targets = response.getTargetEnhancedArr();
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, targets.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return targets;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
private AisApiResponse callAisApiWithBatch(String imoNumbers) {
String url = getApiPath();
Map<String, String> requestBody = Map.of("imo", imoNumbers);
log.debug("[{}] API 호출: {}", getReaderName(), url);
log.debug("[{}] : {}", "requestBody", requestBody.toString());
return webClient.post()
.uri(url)
.bodyValue(requestBody)
.retrieve()
.bodyToMono(AisApiResponse.class)
.block();
}
}

파일 보기

@ -1,9 +0,0 @@
package com.snp.batch.jobs.shipdetail.batch.repository;
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
import java.util.List;
public interface ShipLastPositionRepository {
void saveLastPositionAll(List<TargetEnhancedEntity> items);
}

파일 보기

@ -1,125 +0,0 @@
package com.snp.batch.jobs.shipdetail.batch.repository;
import com.snp.batch.common.batch.repository.BaseJdbcRepository;
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.sql.PreparedStatement;
import java.sql.Types;
import java.util.List;
@Slf4j
@Repository("shipLastPositionRepository")
public class ShipLastPositionRepositoryImpl extends BaseJdbcRepository<TargetEnhancedEntity, Long> implements ShipLastPositionRepository {
@Value("${app.batch.service-schema.name}")
private String targetSchema;
@Value("${app.batch.service-schema.tables.service-001}")
private String tableName;
public ShipLastPositionRepositoryImpl(JdbcTemplate jdbcTemplate) {
super(jdbcTemplate);
}
@Override
protected String getTargetSchema() {
return targetSchema;
}
@Override
protected String getSimpleTableName() {
return tableName;
}
@Override
protected RowMapper<TargetEnhancedEntity> getRowMapper() {
return null;
}
@Override
protected Long extractId(TargetEnhancedEntity entity) {
return null;
}
@Override
protected String getInsertSql() {
return null;
}
@Override
protected String getUpdateSql() {
return """
UPDATE %s
SET last_cptr_hr_utc = ?::timestamptz,
last_port = ?,
now_position_lat = ?,
now_position_lon = ?,
ship_dest = ?,
arvl_prnmnt_hr = ?::timestamptz,
bow_drctn = ?,
cog = ?,
sog = ?,
ship_nav_status = ?,
cargo_ton = ?,
add_info = ?,
in_sts = ?,
ancrg_yn = ?
WHERE imo_no = ?;
""".formatted(getTableName());
}
@Override
protected void setInsertParameters(PreparedStatement ps, TargetEnhancedEntity entity) throws Exception {
}
@Override
protected void setUpdateParameters(PreparedStatement ps, TargetEnhancedEntity entity) throws Exception {
int idx = 1;
ps.setString(idx++, entity.getReceivedDate()); // receivedDate : lastseen (Datetime)
ps.setString(idx++, entity.getLpcCode()); // lpcCode : lastport (String)
ps.setDouble(idx++, entity.getLat()); // lat : position_latitude (Double)
ps.setDouble(idx++, entity.getLon()); // lon : position_longitude (Double)
ps.setString(idx++, entity.getDestination()); // destination : destination (String)
ps.setString(idx++, entity.getEta()); // eta : eta (Datetime)
ps.setDouble(idx++, entity.getHeading()); // heading : heading (Double)
ps.setDouble(idx++, entity.getSog()); // sog : speedservice (Double)
ps.setDouble(idx++, entity.getCog()); // cog : cog (Double)
ps.setString(idx++, entity.getStatus()); // status : navstat (String)
ps.setInt(idx++, entity.getTonnesCargo()); // status : tonnes_cargo (Int)
ps.setString(idx++, entity.getExtraInfo()); // status : extra_info (String)
ps.setObject(idx++, entity.getInSts(), Types.INTEGER);
ps.setObject(idx++, entity.getOnBerth(), Types.BOOLEAN);
ps.setString(idx++, entity.getImoVerified()); // imoVerified : imo (String)
}
@Override
protected String getEntityName() {
return "TargetEnhancedEntity";
}
@Override
@Transactional
public void saveLastPositionAll(List<TargetEnhancedEntity> items) {
if (items == null || items.isEmpty()) {
return;
}
jdbcTemplate.batchUpdate(getUpdateSql(), items, items.size(),
(ps, entity) -> {
try {
setUpdateParameters(ps, entity);
} catch (Exception e) {
log.error("배치 수정 파라미터 설정 실패", e);
throw new RuntimeException(e);
}
});
log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size());
}
}

파일 보기

@ -1,24 +0,0 @@
package com.snp.batch.jobs.shipdetail.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
import com.snp.batch.jobs.shipdetail.batch.repository.ShipLastPositionRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class ShipLastPositionDataWriter extends BaseWriter<TargetEnhancedEntity> {
private final ShipLastPositionRepository shipLastPositionRepository;
public ShipLastPositionDataWriter(ShipLastPositionRepository shipLastPositionRepository) {
super("ShipLastPosition");
this.shipLastPositionRepository = shipLastPositionRepository;
}
@Override
protected void writeItems(List<TargetEnhancedEntity> items) throws Exception {
shipLastPositionRepository.saveLastPositionAll(items);
}
}

파일 보기

@ -134,10 +134,10 @@ app:
# Core20 캐시 테이블 설정 (환경별로 테이블/컬럼명이 다를 수 있음)
core20:
schema: t_std_snp_data # 스키마명
table: tb_ship_info_mst # 테이블명
imo-column: imo_no # IMO/LRNO 컬럼명 (PK, NOT NULL)
mmsi-column: mmsi_no # MMSI 컬럼명 (NULLABLE)
schema: new_snp # 스키마명
table: core20 # 테이블명
imo-column: lrno # IMO/LRNO 컬럼명 (PK, NOT NULL)
mmsi-column: mmsi # MMSI 컬럼명 (NULLABLE)
# 파티션 관리 설정
partition: