✨ Core20 : AIS 신호 데이터 업데이트 Job
This commit is contained in:
부모
4ea0a565c5
커밋
094bd13e36
33
src/main/java/com/snp/batch/common/util/SafeGetDataUtil.java
Normal file
33
src/main/java/com/snp/batch/common/util/SafeGetDataUtil.java
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package com.snp.batch.common.util;
|
||||||
|
|
||||||
|
public class SafeGetDataUtil {
|
||||||
|
private String safeGetString(String value) {
|
||||||
|
if (value == null || value.trim().isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return value.trim();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Double safeGetDouble(String value) {
|
||||||
|
if (value == null || value.trim().isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return Double.parseDouble(value);
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Long safeGetLong(String value) {
|
||||||
|
if (value == null || value.trim().isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return Long.parseLong(value.trim());
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -29,6 +29,9 @@ public class MaritimeApiWebClientConfig {
|
|||||||
@Value("${app.batch.ship-api.url}")
|
@Value("${app.batch.ship-api.url}")
|
||||||
private String maritimeApiUrl;
|
private String maritimeApiUrl;
|
||||||
|
|
||||||
|
@Value("https://aisapi.maritime.spglobal.com")
|
||||||
|
private String maritimeAisApiUrl;
|
||||||
|
|
||||||
@Value("${app.batch.ship-api.username}")
|
@Value("${app.batch.ship-api.username}")
|
||||||
private String maritimeApiUsername;
|
private String maritimeApiUsername;
|
||||||
|
|
||||||
@ -60,6 +63,22 @@ public class MaritimeApiWebClientConfig {
|
|||||||
.maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼
|
.maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean(name = "maritimeAisApiWebClient")
|
||||||
|
public WebClient maritimeAisApiWebClient(){
|
||||||
|
log.info("========================================");
|
||||||
|
log.info("Maritime AIS API WebClient 생성");
|
||||||
|
log.info("Base URL: {}", maritimeAisApiUrl);
|
||||||
|
log.info("========================================");
|
||||||
|
|
||||||
|
return WebClient.builder()
|
||||||
|
.baseUrl(maritimeAisApiUrl)
|
||||||
|
.defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword))
|
||||||
|
.codecs(configurer -> configurer
|
||||||
|
.defaultCodecs()
|
||||||
|
.maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼
|
||||||
|
.build();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,85 @@
|
|||||||
|
package com.snp.batch.jobs.shipdetail.batch.config;
|
||||||
|
|
||||||
|
import com.snp.batch.common.batch.config.BaseJobConfig;
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto;
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.processor.ShipLastPositionDataProcessor;
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.reader.ShipLastPositionDataReader;
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.writer.ShipLastPositionDataWriter;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.batch.core.Job;
|
||||||
|
import org.springframework.batch.core.Step;
|
||||||
|
import org.springframework.batch.core.repository.JobRepository;
|
||||||
|
import org.springframework.batch.item.ItemProcessor;
|
||||||
|
import org.springframework.batch.item.ItemReader;
|
||||||
|
import org.springframework.batch.item.ItemWriter;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
|
@Slf4j
|
||||||
|
@Configuration
|
||||||
|
public class ShipLastPositionUpdateJobConfig extends BaseJobConfig<TargetEnhancedDto, TargetEnhancedEntity> {
|
||||||
|
private final JdbcTemplate jdbcTemplate;
|
||||||
|
private final WebClient maritimeAisApiWebClient;
|
||||||
|
|
||||||
|
private final ShipLastPositionDataProcessor shipLastPositionDataProcessor;
|
||||||
|
|
||||||
|
private final ShipLastPositionDataWriter shipLastPositionDataWriter;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getChunkSize() {
|
||||||
|
return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
|
||||||
|
}
|
||||||
|
public ShipLastPositionUpdateJobConfig(
|
||||||
|
JobRepository jobRepository,
|
||||||
|
PlatformTransactionManager transactionManager,
|
||||||
|
ShipLastPositionDataProcessor shipLastPositionDataProcessor,
|
||||||
|
ShipLastPositionDataWriter shipLastPositionDataWriter,
|
||||||
|
JdbcTemplate jdbcTemplate,
|
||||||
|
@Qualifier("maritimeAisApiWebClient")WebClient maritimeAisApiWebClient) {
|
||||||
|
super(jobRepository, transactionManager);
|
||||||
|
this.jdbcTemplate = jdbcTemplate;
|
||||||
|
this.maritimeAisApiWebClient = maritimeAisApiWebClient;
|
||||||
|
this.shipLastPositionDataProcessor = shipLastPositionDataProcessor;
|
||||||
|
this.shipLastPositionDataWriter = shipLastPositionDataWriter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getJobName() {
|
||||||
|
return "shipLastPositionUpdateJob";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getStepName() {
|
||||||
|
return "shipLastPositionUpdateStep";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ItemReader<TargetEnhancedDto> createReader() {
|
||||||
|
return new ShipLastPositionDataReader(maritimeAisApiWebClient, jdbcTemplate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ItemProcessor<TargetEnhancedDto, TargetEnhancedEntity> createProcessor() {
|
||||||
|
return shipLastPositionDataProcessor;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ItemWriter<TargetEnhancedEntity> createWriter() {
|
||||||
|
return shipLastPositionDataWriter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(name = "shipLastPositionUpdateJob")
|
||||||
|
public Job shipLastPositionUpdateJob() {
|
||||||
|
return job();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(name = "shipLastPositionUpdateStep")
|
||||||
|
public Step shipLastPositionUpdateStep() {
|
||||||
|
return step();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -0,0 +1,42 @@
|
|||||||
|
package com.snp.batch.jobs.shipdetail.batch.dto;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maritime API GetShipsByIHSLRorIMONumbers 응답 래퍼
|
||||||
|
*
|
||||||
|
* API 응답 구조:
|
||||||
|
* {
|
||||||
|
* "shipCount": 5,
|
||||||
|
* "Ships": [...]
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* Maritime API GetShipsByIHSLRorIMONumbersAll 응답 래퍼
|
||||||
|
*
|
||||||
|
* API 응답 구조:
|
||||||
|
* {
|
||||||
|
* "shipCount": 5,
|
||||||
|
* "ShipResult": [...],
|
||||||
|
* "APSStatus": {...}
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@Builder
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class AisApiResponse {
|
||||||
|
|
||||||
|
@JsonProperty("ShipResult")
|
||||||
|
private List<ShipResultDto> shipResult;
|
||||||
|
|
||||||
|
@JsonProperty("targetEnhancedArr")
|
||||||
|
private List<TargetEnhancedDto> targetEnhancedArr;
|
||||||
|
|
||||||
|
}
|
||||||
@ -0,0 +1,185 @@
|
|||||||
|
package com.snp.batch.jobs.shipdetail.batch.dto;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@Builder
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class TargetEnhancedDto {
|
||||||
|
|
||||||
|
@JsonProperty("DWT")
|
||||||
|
private Integer dwt;
|
||||||
|
|
||||||
|
@JsonProperty("STAT5CODE")
|
||||||
|
private String stat5Code;
|
||||||
|
|
||||||
|
@JsonProperty("Source")
|
||||||
|
private String source;
|
||||||
|
|
||||||
|
@JsonProperty("CoG")
|
||||||
|
private Double cog;
|
||||||
|
|
||||||
|
@JsonProperty("LastStaticUpdateReceived")
|
||||||
|
private String lastStaticUpdateReceived;
|
||||||
|
|
||||||
|
@JsonProperty("ZoneId")
|
||||||
|
private Integer zoneId;
|
||||||
|
|
||||||
|
@JsonProperty("LPCCode")
|
||||||
|
private String lpcCode;
|
||||||
|
|
||||||
|
@JsonProperty("TonnesCargo")
|
||||||
|
private Integer tonnesCargo;
|
||||||
|
|
||||||
|
@JsonProperty("StationId")
|
||||||
|
private String stationId;
|
||||||
|
|
||||||
|
@JsonProperty("InSTS")
|
||||||
|
private Object inSts; // JSON에서 null이므로 Object로 처리
|
||||||
|
|
||||||
|
@JsonProperty("ImoVerified")
|
||||||
|
private String imoVerified;
|
||||||
|
|
||||||
|
@JsonProperty("OnBerth")
|
||||||
|
private Boolean onBerth;
|
||||||
|
|
||||||
|
@JsonProperty("DestinationTidied")
|
||||||
|
private String destinationTidied;
|
||||||
|
|
||||||
|
@JsonProperty("Anomalous")
|
||||||
|
private Object anomalous; // JSON에서 null이므로 Object로 처리
|
||||||
|
|
||||||
|
@JsonProperty("MessageType")
|
||||||
|
private Object messageType; // JSON에서 null이므로 Object로 처리
|
||||||
|
|
||||||
|
@JsonProperty("DestinationPortID")
|
||||||
|
private Integer destinationPortID;
|
||||||
|
|
||||||
|
@JsonProperty("DestinationUNLOCODE")
|
||||||
|
private String destinationUnlocode;
|
||||||
|
|
||||||
|
@JsonProperty("MMSI")
|
||||||
|
private Integer mmsi;
|
||||||
|
|
||||||
|
@JsonProperty("IMO")
|
||||||
|
private Integer imo;
|
||||||
|
|
||||||
|
@JsonProperty("AgeMinutes")
|
||||||
|
private Double ageMinutes;
|
||||||
|
|
||||||
|
@JsonProperty("Lat")
|
||||||
|
private Double lat;
|
||||||
|
|
||||||
|
@JsonProperty("Lon")
|
||||||
|
private Double lon;
|
||||||
|
|
||||||
|
@JsonProperty("Heading")
|
||||||
|
private Double heading;
|
||||||
|
|
||||||
|
@JsonProperty("SoG")
|
||||||
|
private Double sog;
|
||||||
|
|
||||||
|
@JsonProperty("Width")
|
||||||
|
private Integer width;
|
||||||
|
|
||||||
|
@JsonProperty("Length")
|
||||||
|
private Integer length;
|
||||||
|
|
||||||
|
@JsonProperty("Draught")
|
||||||
|
private Double draught;
|
||||||
|
|
||||||
|
@JsonProperty("Name")
|
||||||
|
private String name;
|
||||||
|
|
||||||
|
@JsonProperty("Callsign")
|
||||||
|
private String callsign;
|
||||||
|
|
||||||
|
@JsonProperty("Destination")
|
||||||
|
private String destination;
|
||||||
|
|
||||||
|
@JsonProperty("ETA")
|
||||||
|
private String eta;
|
||||||
|
|
||||||
|
@JsonProperty("Status")
|
||||||
|
private String status;
|
||||||
|
|
||||||
|
@JsonProperty("VesselType")
|
||||||
|
private String vesselType;
|
||||||
|
|
||||||
|
@JsonProperty("ExtraInfo")
|
||||||
|
private String extraInfo;
|
||||||
|
|
||||||
|
@JsonProperty("PositionAccuracy")
|
||||||
|
private Integer positionAccuracy;
|
||||||
|
|
||||||
|
@JsonProperty("RoT")
|
||||||
|
private Integer rot;
|
||||||
|
|
||||||
|
@JsonProperty("TimestampUTC")
|
||||||
|
private Integer timestampUtc;
|
||||||
|
|
||||||
|
@JsonProperty("RepeatIndicator")
|
||||||
|
private Integer repeatIndicator;
|
||||||
|
|
||||||
|
@JsonProperty("RAIMFlag")
|
||||||
|
private Integer raimFlag;
|
||||||
|
|
||||||
|
@JsonProperty("RadioStatus")
|
||||||
|
private Integer radioStatus;
|
||||||
|
|
||||||
|
@JsonProperty("Regional")
|
||||||
|
private Integer regional;
|
||||||
|
|
||||||
|
@JsonProperty("Regional2")
|
||||||
|
private Integer regional2;
|
||||||
|
|
||||||
|
@JsonProperty("Spare")
|
||||||
|
private Integer spare;
|
||||||
|
|
||||||
|
@JsonProperty("Spare2")
|
||||||
|
private Integer spare2;
|
||||||
|
|
||||||
|
@JsonProperty("AISVersion")
|
||||||
|
private Integer aisVersion;
|
||||||
|
|
||||||
|
@JsonProperty("PositionFixType")
|
||||||
|
private Integer positionFixType;
|
||||||
|
|
||||||
|
@JsonProperty("DTE")
|
||||||
|
private Integer dte;
|
||||||
|
|
||||||
|
@JsonProperty("BandFlag")
|
||||||
|
private Integer bandFlag;
|
||||||
|
|
||||||
|
@JsonProperty("ReceivedDate")
|
||||||
|
private String receivedDate;
|
||||||
|
|
||||||
|
@JsonProperty("MessageTimestamp")
|
||||||
|
private String messageTimestamp;
|
||||||
|
|
||||||
|
@JsonProperty("LengthBow")
|
||||||
|
private Integer lengthBow;
|
||||||
|
|
||||||
|
@JsonProperty("LengthStern")
|
||||||
|
private Integer lengthStern;
|
||||||
|
|
||||||
|
@JsonProperty("WidthPort")
|
||||||
|
private Integer widthPort;
|
||||||
|
|
||||||
|
@JsonProperty("WidthStarboard")
|
||||||
|
private Integer widthStarboard;
|
||||||
|
|
||||||
|
// Getters and Setters (생략 - 필요에 따라 추가)
|
||||||
|
// 예시:
|
||||||
|
public Integer getDwt() {
|
||||||
|
return dwt;
|
||||||
|
}
|
||||||
|
public void setDwt(Integer dwt) {
|
||||||
|
this.dwt = dwt;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,128 @@
|
|||||||
|
package com.snp.batch.jobs.shipdetail.batch.entity;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.snp.batch.common.batch.entity.BaseEntity;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.EqualsAndHashCode;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
import lombok.experimental.SuperBuilder;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@SuperBuilder
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
@EqualsAndHashCode(callSuper = true)
|
||||||
|
public class TargetEnhancedEntity extends BaseEntity {
|
||||||
|
|
||||||
|
private Integer dwt;
|
||||||
|
|
||||||
|
private String stat5Code;
|
||||||
|
|
||||||
|
private String source;
|
||||||
|
|
||||||
|
private Double cog;
|
||||||
|
|
||||||
|
private String lastStaticUpdateReceived;
|
||||||
|
|
||||||
|
private Integer zoneId;
|
||||||
|
|
||||||
|
private String lpcCode;
|
||||||
|
|
||||||
|
private Integer tonnesCargo;
|
||||||
|
|
||||||
|
private String stationId;
|
||||||
|
|
||||||
|
private Object inSts; // JSON에서 null이므로 Object로 처리
|
||||||
|
|
||||||
|
private String imoVerified;
|
||||||
|
|
||||||
|
private Boolean onBerth;
|
||||||
|
|
||||||
|
private String destinationTidied;
|
||||||
|
|
||||||
|
private Object anomalous; // JSON에서 null이므로 Object로 처리
|
||||||
|
|
||||||
|
private Object messageType; // JSON에서 null이므로 Object로 처리
|
||||||
|
|
||||||
|
private Integer destinationPortID;
|
||||||
|
|
||||||
|
private String destinationUnlocode;
|
||||||
|
|
||||||
|
private Integer mmsi;
|
||||||
|
|
||||||
|
private Integer imo;
|
||||||
|
|
||||||
|
private Double ageMinutes;
|
||||||
|
|
||||||
|
private Double lat;
|
||||||
|
|
||||||
|
private Double lon;
|
||||||
|
|
||||||
|
private Double heading;
|
||||||
|
|
||||||
|
private Double sog;
|
||||||
|
|
||||||
|
private Integer width;
|
||||||
|
|
||||||
|
private Integer length;
|
||||||
|
|
||||||
|
private Double draught;
|
||||||
|
|
||||||
|
private String name;
|
||||||
|
|
||||||
|
private String callsign;
|
||||||
|
|
||||||
|
private String destination;
|
||||||
|
|
||||||
|
private String eta;
|
||||||
|
|
||||||
|
private String status;
|
||||||
|
|
||||||
|
private String vesselType;
|
||||||
|
|
||||||
|
private String extraInfo;
|
||||||
|
|
||||||
|
private Integer positionAccuracy;
|
||||||
|
|
||||||
|
private Integer rot;
|
||||||
|
|
||||||
|
private Integer timestampUtc;
|
||||||
|
|
||||||
|
private Integer repeatIndicator;
|
||||||
|
|
||||||
|
private Integer raimFlag;
|
||||||
|
|
||||||
|
private Integer radioStatus;
|
||||||
|
|
||||||
|
private Integer regional;
|
||||||
|
|
||||||
|
private Integer regional2;
|
||||||
|
|
||||||
|
private Integer spare;
|
||||||
|
|
||||||
|
private Integer spare2;
|
||||||
|
|
||||||
|
private Integer aisVersion;
|
||||||
|
|
||||||
|
private Integer positionFixType;
|
||||||
|
|
||||||
|
private Integer dte;
|
||||||
|
|
||||||
|
private Integer bandFlag;
|
||||||
|
|
||||||
|
private String receivedDate;
|
||||||
|
|
||||||
|
private String messageTimestamp;
|
||||||
|
|
||||||
|
private Integer lengthBow;
|
||||||
|
|
||||||
|
private Integer lengthStern;
|
||||||
|
|
||||||
|
private Integer widthPort;
|
||||||
|
|
||||||
|
private Integer widthStarboard;
|
||||||
|
|
||||||
|
}
|
||||||
@ -0,0 +1,83 @@
|
|||||||
|
package com.snp.batch.jobs.shipdetail.batch.processor;
|
||||||
|
|
||||||
|
import com.snp.batch.common.batch.processor.BaseProcessor;
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto;
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class ShipLastPositionDataProcessor extends BaseProcessor<TargetEnhancedDto, TargetEnhancedEntity> {
|
||||||
|
@Override
|
||||||
|
protected TargetEnhancedEntity processItem(TargetEnhancedDto dto) throws Exception {
|
||||||
|
log.debug("AIS 최신 항적 데이터 처리 시작: imoNumber={}, shipName={}",
|
||||||
|
dto.getImoVerified(), dto.getName());
|
||||||
|
|
||||||
|
TargetEnhancedEntity entity = TargetEnhancedEntity.builder()
|
||||||
|
.dwt(dto.getDwt())
|
||||||
|
.stat5Code(dto.getStat5Code())
|
||||||
|
.source(dto.getSource())
|
||||||
|
.lpcCode(dto.getLpcCode())
|
||||||
|
.tonnesCargo(dto.getTonnesCargo())
|
||||||
|
.imoVerified(dto.getImoVerified())
|
||||||
|
.lastStaticUpdateReceived(dto.getLastStaticUpdateReceived()) // Entity 필드 타입이 String인 경우
|
||||||
|
// Position and Movement Data
|
||||||
|
.cog(dto.getCog())
|
||||||
|
.zoneId(dto.getZoneId())
|
||||||
|
.stationId(dto.getStationId())
|
||||||
|
.onBerth(dto.getOnBerth())
|
||||||
|
.mmsi(dto.getMmsi())
|
||||||
|
.imo(dto.getImo())
|
||||||
|
.ageMinutes(dto.getAgeMinutes())
|
||||||
|
.lat(dto.getLat())
|
||||||
|
.lon(dto.getLon())
|
||||||
|
.heading(dto.getHeading())
|
||||||
|
.sog(dto.getSog())
|
||||||
|
.positionAccuracy(dto.getPositionAccuracy())
|
||||||
|
.rot(dto.getRot())
|
||||||
|
.timestampUtc(dto.getTimestampUtc())
|
||||||
|
.status(dto.getStatus())
|
||||||
|
// Dimensions and Draught
|
||||||
|
.width(dto.getWidth())
|
||||||
|
.length(dto.getLength())
|
||||||
|
.draught(dto.getDraught())
|
||||||
|
.lengthBow(dto.getLengthBow())
|
||||||
|
.lengthStern(dto.getLengthStern())
|
||||||
|
.widthPort(dto.getWidthPort())
|
||||||
|
.widthStarboard(dto.getWidthStarboard())
|
||||||
|
// Voyage Data
|
||||||
|
.name(dto.getName())
|
||||||
|
.callsign(dto.getCallsign())
|
||||||
|
.destination(dto.getDestination())
|
||||||
|
.destinationTidied(dto.getDestinationTidied())
|
||||||
|
.destinationPortID(dto.getDestinationPortID())
|
||||||
|
.destinationUnlocode(dto.getDestinationUnlocode())
|
||||||
|
.eta(dto.getEta()) // Entity 필드 타입이 String인 경우
|
||||||
|
.vesselType(dto.getVesselType())
|
||||||
|
.extraInfo(dto.getExtraInfo())
|
||||||
|
// AIS Metadata Fields
|
||||||
|
.repeatIndicator(dto.getRepeatIndicator())
|
||||||
|
.raimFlag(dto.getRaimFlag())
|
||||||
|
.radioStatus(dto.getRadioStatus())
|
||||||
|
.regional(dto.getRegional())
|
||||||
|
.regional2(dto.getRegional2())
|
||||||
|
.spare(dto.getSpare())
|
||||||
|
.spare2(dto.getSpare2())
|
||||||
|
.aisVersion(dto.getAisVersion())
|
||||||
|
.positionFixType(dto.getPositionFixType())
|
||||||
|
.dte(dto.getDte())
|
||||||
|
.bandFlag(dto.getBandFlag())
|
||||||
|
.receivedDate(dto.getReceivedDate()) // Entity 필드 타입이 String인 경우
|
||||||
|
.messageTimestamp(dto.getMessageTimestamp()) // Entity 필드 타입이 String인 경우
|
||||||
|
// Null/Object Fields
|
||||||
|
.inSts(dto.getInSts())
|
||||||
|
.anomalous(dto.getAnomalous())
|
||||||
|
.messageType(dto.getMessageType())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
log.debug("AIS 최신 항적 데이터 처리 완료: imoNumber={}", dto.getImoVerified());
|
||||||
|
|
||||||
|
return entity;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,150 @@
|
|||||||
|
package com.snp.batch.jobs.shipdetail.batch.reader;
|
||||||
|
|
||||||
|
import com.snp.batch.common.batch.reader.BaseApiReader;
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.dto.AisApiResponse;
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
@Slf4j
|
||||||
|
public class ShipLastPositionDataReader extends BaseApiReader<TargetEnhancedDto> {
|
||||||
|
|
||||||
|
//TODO :
|
||||||
|
// 1. Core20 IMO_NUMBER 전체 조회
|
||||||
|
// 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복)
|
||||||
|
// 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복)
|
||||||
|
|
||||||
|
private final JdbcTemplate jdbcTemplate;
|
||||||
|
|
||||||
|
private List<String> allImoNumbers;
|
||||||
|
private int currentBatchIndex = 0;
|
||||||
|
private final int batchSize = 5000;
|
||||||
|
|
||||||
|
public ShipLastPositionDataReader(WebClient webClient, JdbcTemplate jdbcTemplate) {
|
||||||
|
super(webClient);
|
||||||
|
this.jdbcTemplate = jdbcTemplate;
|
||||||
|
enableChunkMode(); // ✨ Chunk 모드 활성화
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getReaderName() {
|
||||||
|
return "ShipLastPositionDataReader";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getApiPath() {
|
||||||
|
return "/AisSvc.svc/AIS/GetTargetsByIMOsEnhanced";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getApiBaseUrl() {
|
||||||
|
return "https://aisapi.maritime.spglobal.com";
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getTargetTable(){
|
||||||
|
return "test_s_p.test_core20";
|
||||||
|
}
|
||||||
|
private String GET_CORE_IMO_LIST =
|
||||||
|
"SELECT ihslrorimoshipno FROM " + getTargetTable() + " ORDER BY ihslrorimoshipno";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void beforeFetch(){
|
||||||
|
log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName());
|
||||||
|
|
||||||
|
allImoNumbers = jdbcTemplate.queryForList(GET_CORE_IMO_LIST, String.class);
|
||||||
|
|
||||||
|
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
|
||||||
|
|
||||||
|
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
|
||||||
|
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
|
||||||
|
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
|
||||||
|
|
||||||
|
updateApiCallStats(totalBatches, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<TargetEnhancedDto> fetchNextBatch() throws Exception {
|
||||||
|
// 모든 배치 처리 완료 확인
|
||||||
|
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
|
||||||
|
return null; // Job 종료
|
||||||
|
}
|
||||||
|
|
||||||
|
// 현재 배치의 시작/끝 인덱스 계산
|
||||||
|
int startIndex = currentBatchIndex;
|
||||||
|
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
|
||||||
|
|
||||||
|
// 현재 배치의 IMO 번호 추출 (100개)
|
||||||
|
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
|
||||||
|
|
||||||
|
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
|
||||||
|
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
|
||||||
|
|
||||||
|
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
|
||||||
|
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
|
||||||
|
|
||||||
|
try {
|
||||||
|
// IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...")
|
||||||
|
String imoParam = String.join(",", currentBatch);
|
||||||
|
|
||||||
|
// API 호출
|
||||||
|
AisApiResponse response = callAisApiWithBatch(imoParam);
|
||||||
|
|
||||||
|
// 다음 배치로 인덱스 이동
|
||||||
|
currentBatchIndex = endIndex;
|
||||||
|
|
||||||
|
// 응답 처리
|
||||||
|
if (response != null && response.getTargetEnhancedArr() != null) {
|
||||||
|
List<TargetEnhancedDto> targets = response.getTargetEnhancedArr();
|
||||||
|
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
|
||||||
|
getReaderName(), currentBatchNumber, totalBatches, targets.size());
|
||||||
|
|
||||||
|
// API 호출 통계 업데이트
|
||||||
|
updateApiCallStats(totalBatches, currentBatchNumber);
|
||||||
|
|
||||||
|
// API 과부하 방지 (다음 배치 전 0.5초 대기)
|
||||||
|
if (currentBatchIndex < allImoNumbers.size()) {
|
||||||
|
Thread.sleep(500);
|
||||||
|
}
|
||||||
|
|
||||||
|
return targets;
|
||||||
|
} else {
|
||||||
|
log.warn("[{}] 배치 {}/{} 응답 없음",
|
||||||
|
getReaderName(), currentBatchNumber, totalBatches);
|
||||||
|
|
||||||
|
// API 호출 통계 업데이트 (실패도 카운트)
|
||||||
|
updateApiCallStats(totalBatches, currentBatchNumber);
|
||||||
|
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
|
||||||
|
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
|
||||||
|
|
||||||
|
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
|
||||||
|
currentBatchIndex = endIndex;
|
||||||
|
|
||||||
|
// 빈 리스트 반환 (Job 계속 진행)
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private AisApiResponse callAisApiWithBatch(String imoNumbers) {
|
||||||
|
String url = getApiPath();
|
||||||
|
|
||||||
|
Map<String, String> requestBody = Map.of("imo", imoNumbers);
|
||||||
|
|
||||||
|
log.debug("[{}] API 호출: {}", getReaderName(), url);
|
||||||
|
log.debug("[{}] : {}", "requestBody", requestBody.toString());
|
||||||
|
|
||||||
|
return webClient.post()
|
||||||
|
.uri(url)
|
||||||
|
.bodyValue(requestBody)
|
||||||
|
.retrieve()
|
||||||
|
.bodyToMono(AisApiResponse.class)
|
||||||
|
.block();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -0,0 +1,9 @@
|
|||||||
|
package com.snp.batch.jobs.shipdetail.batch.repository;
|
||||||
|
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public interface ShipLastPositionRepository {
|
||||||
|
void saveLastPositionAll(List<TargetEnhancedEntity> items);
|
||||||
|
}
|
||||||
@ -0,0 +1,98 @@
|
|||||||
|
package com.snp.batch.jobs.shipdetail.batch.repository;
|
||||||
|
|
||||||
|
import com.snp.batch.common.batch.repository.BaseJdbcRepository;
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.springframework.jdbc.core.RowMapper;
|
||||||
|
import org.springframework.stereotype.Repository;
|
||||||
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
|
import java.sql.PreparedStatement;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Repository("shipLastPositionRepository")
|
||||||
|
public class ShipLastPositionRepositoryImpl extends BaseJdbcRepository<TargetEnhancedEntity, Long> implements ShipLastPositionRepository {
|
||||||
|
public ShipLastPositionRepositoryImpl(JdbcTemplate jdbcTemplate) {
|
||||||
|
super(jdbcTemplate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getTableName() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RowMapper<TargetEnhancedEntity> getRowMapper() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Long extractId(TargetEnhancedEntity entity) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getInsertSql() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getUpdateSql() {
|
||||||
|
return """
|
||||||
|
UPDATE test_s_p.test_core20
|
||||||
|
SET lastseen = ?::timestamptz,
|
||||||
|
lastport = ?,
|
||||||
|
position_latitude = ?,
|
||||||
|
position_longitude = ?,
|
||||||
|
destination = ?,
|
||||||
|
eta = ?::timestamptz,
|
||||||
|
heading = ?,
|
||||||
|
batch_flag = 'N'
|
||||||
|
WHERE ihslrorimoshipno = ?;
|
||||||
|
""";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setInsertParameters(PreparedStatement ps, TargetEnhancedEntity entity) throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setUpdateParameters(PreparedStatement ps, TargetEnhancedEntity entity) throws Exception {
|
||||||
|
int idx = 1;
|
||||||
|
ps.setString(idx++, entity.getReceivedDate()); // receivedDate : lastseen (Datetime)
|
||||||
|
ps.setString(idx++, entity.getLpcCode()); // lpcCode : lastport (String)
|
||||||
|
ps.setDouble(idx++, entity.getLat()); // lat : position_latitude (Double)
|
||||||
|
ps.setDouble(idx++, entity.getLon()); // lon : position_longitude (Double)
|
||||||
|
ps.setString(idx++, entity.getDestination()); // destination : destination (String)
|
||||||
|
ps.setString(idx++, entity.getEta()); // eta : eta (Datetime)
|
||||||
|
ps.setDouble(idx++, entity.getHeading()); // heading : heading (Double)
|
||||||
|
ps.setString(idx++, entity.getImoVerified()); // imoVerified : imo (String)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getEntityName() {
|
||||||
|
return "TargetEnhancedEntity";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Transactional
|
||||||
|
public void saveLastPositionAll(List<TargetEnhancedEntity> items) {
|
||||||
|
if (items == null || items.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
jdbcTemplate.batchUpdate(getUpdateSql(), items, items.size(),
|
||||||
|
(ps, entity) -> {
|
||||||
|
try {
|
||||||
|
setUpdateParameters(ps, entity);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("배치 수정 파라미터 설정 실패", e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,24 @@
|
|||||||
|
package com.snp.batch.jobs.shipdetail.batch.writer;
|
||||||
|
|
||||||
|
import com.snp.batch.common.batch.writer.BaseWriter;
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity;
|
||||||
|
import com.snp.batch.jobs.shipdetail.batch.repository.ShipLastPositionRepository;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class ShipLastPositionDataWriter extends BaseWriter<TargetEnhancedEntity> {
|
||||||
|
|
||||||
|
private final ShipLastPositionRepository shipLastPositionRepository;
|
||||||
|
public ShipLastPositionDataWriter(ShipLastPositionRepository shipLastPositionRepository) {
|
||||||
|
super("ShipLastPosition");
|
||||||
|
this.shipLastPositionRepository = shipLastPositionRepository;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void writeItems(List<TargetEnhancedEntity> items) throws Exception {
|
||||||
|
shipLastPositionRepository.saveLastPositionAll(items);
|
||||||
|
}
|
||||||
|
}
|
||||||
불러오는 중...
Reference in New Issue
Block a user