feat(ais): AIS 응답 스트리밍 처리 및 캐시 로그 명확화
This commit is contained in:
부모
a5228a8910
커밋
ae110bd91a
@ -1,5 +1,6 @@
|
||||
package com.snp.batch.jobs.aistarget.batch.config;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.snp.batch.common.batch.config.BaseJobConfig;
|
||||
import com.snp.batch.jobs.aistarget.batch.dto.AisTargetDto;
|
||||
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||
@ -44,6 +45,7 @@ public class AisTargetImportJobConfig extends BaseJobConfig<AisTargetDto, AisTar
|
||||
private final AisTargetDataProcessor aisTargetDataProcessor;
|
||||
private final AisTargetDataWriter aisTargetDataWriter;
|
||||
private final WebClient maritimeAisApiWebClient;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final Core20CacheManager core20CacheManager;
|
||||
|
||||
@Value("${app.batch.ais-target.since-seconds:60}")
|
||||
@ -58,11 +60,13 @@ public class AisTargetImportJobConfig extends BaseJobConfig<AisTargetDto, AisTar
|
||||
AisTargetDataProcessor aisTargetDataProcessor,
|
||||
AisTargetDataWriter aisTargetDataWriter,
|
||||
@Qualifier("maritimeAisApiWebClient") WebClient maritimeAisApiWebClient,
|
||||
ObjectMapper objectMapper,
|
||||
Core20CacheManager core20CacheManager) {
|
||||
super(jobRepository, transactionManager);
|
||||
this.aisTargetDataProcessor = aisTargetDataProcessor;
|
||||
this.aisTargetDataWriter = aisTargetDataWriter;
|
||||
this.maritimeAisApiWebClient = maritimeAisApiWebClient;
|
||||
this.objectMapper = objectMapper;
|
||||
this.core20CacheManager = core20CacheManager;
|
||||
}
|
||||
|
||||
@ -78,7 +82,7 @@ public class AisTargetImportJobConfig extends BaseJobConfig<AisTargetDto, AisTar
|
||||
|
||||
@Override
|
||||
protected ItemReader<AisTargetDto> createReader() {
|
||||
return new AisTargetDataReader(maritimeAisApiWebClient, sinceSeconds);
|
||||
return new AisTargetDataReader(maritimeAisApiWebClient, objectMapper, sinceSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -104,7 +108,7 @@ public class AisTargetImportJobConfig extends BaseJobConfig<AisTargetDto, AisTar
|
||||
// 배치 수집 시점 설정
|
||||
OffsetDateTime collectedAt = OffsetDateTime.now();
|
||||
aisTargetDataProcessor.setCollectedAt(collectedAt);
|
||||
log.info("[{}] Job 시작 - 수집 시점: {}", getJobName(), collectedAt);
|
||||
log.info("[{}] Job 시작 (API → 캐시) - 수집 시점: {}", getJobName(), collectedAt);
|
||||
|
||||
// Core20 캐시 관리
|
||||
// 1. 캐시가 비어있으면 즉시 로딩 (첫 실행 또는 재시작 시)
|
||||
@ -121,7 +125,7 @@ public class AisTargetImportJobConfig extends BaseJobConfig<AisTargetDto, AisTar
|
||||
|
||||
@Override
|
||||
public void afterJob(JobExecution jobExecution) {
|
||||
log.info("[{}] Job 완료 - 상태: {}, 처리 건수: {}, Core20 캐시 크기: {}",
|
||||
log.info("[{}] Job 완료 (API → 캐시) - 상태: {}, 캐시 적재 건수: {}, Core20 캐시 크기: {}",
|
||||
getJobName(),
|
||||
jobExecution.getStatus(),
|
||||
jobExecution.getStepExecutions().stream()
|
||||
|
||||
@ -1,11 +1,20 @@
|
||||
package com.snp.batch.jobs.aistarget.batch.reader;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.snp.batch.common.batch.reader.BaseApiReader;
|
||||
import com.snp.batch.jobs.aistarget.batch.dto.AisTargetApiResponse;
|
||||
import com.snp.batch.jobs.aistarget.batch.dto.AisTargetDto;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -13,21 +22,24 @@ import java.util.Map;
|
||||
/**
|
||||
* AIS Target 데이터 Reader
|
||||
*
|
||||
* API: POST /AisSvc.svc/AIS/GetTargets
|
||||
* API: POST /AisSvc.svc/AIS/GetTargetsEnhanced
|
||||
* Request: {"sinceSeconds": "60"}
|
||||
*
|
||||
* 동작:
|
||||
* - 매 분 15초에 실행 (Quartz 스케줄)
|
||||
* - 최근 60초 동안의 전체 선박 위치 정보 조회
|
||||
* - 약 33,000건/분 처리
|
||||
* - 대용량 응답을 임시 파일로 스트리밍하여 메모리 버퍼 제한 우회
|
||||
*/
|
||||
@Slf4j
|
||||
public class AisTargetDataReader extends BaseApiReader<AisTargetDto> {
|
||||
|
||||
private final int sinceSeconds;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public AisTargetDataReader(WebClient webClient, int sinceSeconds) {
|
||||
public AisTargetDataReader(WebClient webClient, ObjectMapper objectMapper, int sinceSeconds) {
|
||||
super(webClient);
|
||||
this.objectMapper = objectMapper;
|
||||
this.sinceSeconds = sinceSeconds;
|
||||
}
|
||||
|
||||
@ -63,15 +75,18 @@ public class AisTargetDataReader extends BaseApiReader<AisTargetDto> {
|
||||
|
||||
@Override
|
||||
protected List<AisTargetDto> fetchDataFromApi() {
|
||||
Path tempFile = null;
|
||||
try {
|
||||
log.info("[{}] API 호출 시작: {} {}", getReaderName(), getHttpMethod(), getApiPath());
|
||||
log.info("[{}] API 호출 시작 (스트리밍 모드): {} {}", getReaderName(), getHttpMethod(), getApiPath());
|
||||
|
||||
AisTargetApiResponse response = webClient.post()
|
||||
.uri(getApiPath())
|
||||
.bodyValue(getRequestBody())
|
||||
.retrieve()
|
||||
.bodyToMono(AisTargetApiResponse.class)
|
||||
.block();
|
||||
tempFile = Files.createTempFile("ais-response-", ".json");
|
||||
|
||||
// 응답을 DataBuffer 스트림으로 받아 임시 파일에 기록
|
||||
long bytesWritten = streamResponseToFile(tempFile);
|
||||
log.info("[{}] 응답 스트리밍 완료: {} bytes → {}", getReaderName(), bytesWritten, tempFile.getFileName());
|
||||
|
||||
// 임시 파일에서 JSON 파싱
|
||||
AisTargetApiResponse response = parseResponseFromFile(tempFile);
|
||||
|
||||
if (response != null && response.getTargetArr() != null) {
|
||||
List<AisTargetDto> targets = response.getTargetArr();
|
||||
@ -86,6 +101,51 @@ public class AisTargetDataReader extends BaseApiReader<AisTargetDto> {
|
||||
} catch (Exception e) {
|
||||
log.error("[{}] API 호출 실패: {}", getReaderName(), e.getMessage(), e);
|
||||
return handleApiError(e);
|
||||
} finally {
|
||||
deleteTempFile(tempFile);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* WebClient 응답을 DataBuffer 스트림으로 받아 임시 파일에 기록한다.
|
||||
* bodyToMono()와 달리 메모리 버퍼 제한(maxInMemorySize)의 영향을 받지 않는다.
|
||||
*/
|
||||
private long streamResponseToFile(Path tempFile) throws IOException {
|
||||
try (OutputStream os = Files.newOutputStream(tempFile, StandardOpenOption.WRITE)) {
|
||||
webClient.post()
|
||||
.uri(getApiPath())
|
||||
.bodyValue(getRequestBody())
|
||||
.retrieve()
|
||||
.bodyToFlux(DataBuffer.class)
|
||||
.doOnNext(dataBuffer -> {
|
||||
try {
|
||||
byte[] bytes = new byte[dataBuffer.readableByteCount()];
|
||||
dataBuffer.read(bytes);
|
||||
os.write(bytes);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("임시 파일 쓰기 실패", e);
|
||||
} finally {
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
}
|
||||
})
|
||||
.blockLast();
|
||||
}
|
||||
return Files.size(tempFile);
|
||||
}
|
||||
|
||||
private AisTargetApiResponse parseResponseFromFile(Path tempFile) throws IOException {
|
||||
try (InputStream is = Files.newInputStream(tempFile)) {
|
||||
return objectMapper.readValue(is, AisTargetApiResponse.class);
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteTempFile(Path tempFile) {
|
||||
if (tempFile != null) {
|
||||
try {
|
||||
Files.deleteIfExists(tempFile);
|
||||
} catch (IOException e) {
|
||||
log.warn("[{}] 임시 파일 삭제 실패: {}", getReaderName(), tempFile, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -52,7 +52,7 @@ public class AisTargetDataWriter extends BaseWriter<AisTargetEntity> {
|
||||
|
||||
@Override
|
||||
protected void writeItems(List<AisTargetEntity> items) throws Exception {
|
||||
log.debug("AIS Target 캐시 업데이트 시작: {} 건", items.size());
|
||||
log.info("AIS Target 캐시 업데이트 시작: {} 건", items.size());
|
||||
|
||||
// 1. ClassType 분류 (캐시 저장 전에 분류)
|
||||
// - Core20 캐시의 IMO와 매칭하여 classType(A/B), core20Mmsi 설정
|
||||
@ -67,7 +67,7 @@ public class AisTargetDataWriter extends BaseWriter<AisTargetEntity> {
|
||||
// 3. 캐시 업데이트 (classType, core20Mmsi, signalKindCode 포함)
|
||||
cacheManager.putAll(items);
|
||||
|
||||
log.debug("AIS Target 캐시 업데이트 완료: {} 건 (캐시 크기: {})",
|
||||
log.info("AIS Target 캐시 업데이트 완료: {} 건 (캐시 크기: {})",
|
||||
items.size(), cacheManager.size());
|
||||
|
||||
// 4. ChnPrmShip 전용 캐시 업데이트 (대상 MMSI만 필터)
|
||||
|
||||
불러오는 중...
Reference in New Issue
Block a user