Merge pull request 'feat(ais): AIS 응답 스트리밍 처리 및 캐시 로그 명확화' (#69) from feature/ais-streaming-response into develop
This commit is contained in:
커밋
865bb95fc3
@ -34,6 +34,7 @@
|
|||||||
- 각 화면별 사용자 가이드 추가 (#41)
|
- 각 화면별 사용자 가이드 추가 (#41)
|
||||||
- 스케줄 화면 검색/정렬/필터 기능 추가 및 UI 구조 개선 (#54)
|
- 스케줄 화면 검색/정렬/필터 기능 추가 및 UI 구조 개선 (#54)
|
||||||
- 재수집 이력 화면 개선: 배치 실행일시 추가, 작업명 잘림 해소, CSV 내보내기 제거 (#55)
|
- 재수집 이력 화면 개선: 배치 실행일시 추가, 작업명 잘림 해소, CSV 내보내기 제거 (#55)
|
||||||
|
- AIS API 응답 스트리밍 처리로 메모리 버퍼 제한 우회 (DataBufferLimitException 근본 해결)
|
||||||
|
|
||||||
### 수정
|
### 수정
|
||||||
- 자동 재수집 JobParameter 오버플로우 수정 (VARCHAR 2500 제한 해결)
|
- 자동 재수집 JobParameter 오버플로우 수정 (VARCHAR 2500 제한 해결)
|
||||||
@ -59,6 +60,7 @@
|
|||||||
- 미사용 Dead Code 정리 (~1,200 LOC 삭제)
|
- 미사용 Dead Code 정리 (~1,200 LOC 삭제)
|
||||||
- 미사용 배치 작업 13개 제거 (~4,000 LOC 삭제) (#40)
|
- 미사용 배치 작업 13개 제거 (~4,000 LOC 삭제) (#40)
|
||||||
- API 인증정보 공통화(api-auth) 및 환경별 중복 설정 제거 (#59)
|
- API 인증정보 공통화(api-auth) 및 환경별 중복 설정 제거 (#59)
|
||||||
|
- AIS Import Job 로그에 캐시 적재 흐름 명시 (`API → 캐시`)
|
||||||
|
|
||||||
### 기타
|
### 기타
|
||||||
- Gitea 팀 프로젝트 워크플로우 구조 적용
|
- Gitea 팀 프로젝트 워크플로우 구조 적용
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
package com.snp.batch.jobs.aistarget.batch.config;
|
package com.snp.batch.jobs.aistarget.batch.config;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.snp.batch.common.batch.config.BaseJobConfig;
|
import com.snp.batch.common.batch.config.BaseJobConfig;
|
||||||
import com.snp.batch.jobs.aistarget.batch.dto.AisTargetDto;
|
import com.snp.batch.jobs.aistarget.batch.dto.AisTargetDto;
|
||||||
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||||
@ -44,6 +45,7 @@ public class AisTargetImportJobConfig extends BaseJobConfig<AisTargetDto, AisTar
|
|||||||
private final AisTargetDataProcessor aisTargetDataProcessor;
|
private final AisTargetDataProcessor aisTargetDataProcessor;
|
||||||
private final AisTargetDataWriter aisTargetDataWriter;
|
private final AisTargetDataWriter aisTargetDataWriter;
|
||||||
private final WebClient maritimeAisApiWebClient;
|
private final WebClient maritimeAisApiWebClient;
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
private final Core20CacheManager core20CacheManager;
|
private final Core20CacheManager core20CacheManager;
|
||||||
|
|
||||||
@Value("${app.batch.ais-target.since-seconds:60}")
|
@Value("${app.batch.ais-target.since-seconds:60}")
|
||||||
@ -58,11 +60,13 @@ public class AisTargetImportJobConfig extends BaseJobConfig<AisTargetDto, AisTar
|
|||||||
AisTargetDataProcessor aisTargetDataProcessor,
|
AisTargetDataProcessor aisTargetDataProcessor,
|
||||||
AisTargetDataWriter aisTargetDataWriter,
|
AisTargetDataWriter aisTargetDataWriter,
|
||||||
@Qualifier("maritimeAisApiWebClient") WebClient maritimeAisApiWebClient,
|
@Qualifier("maritimeAisApiWebClient") WebClient maritimeAisApiWebClient,
|
||||||
|
ObjectMapper objectMapper,
|
||||||
Core20CacheManager core20CacheManager) {
|
Core20CacheManager core20CacheManager) {
|
||||||
super(jobRepository, transactionManager);
|
super(jobRepository, transactionManager);
|
||||||
this.aisTargetDataProcessor = aisTargetDataProcessor;
|
this.aisTargetDataProcessor = aisTargetDataProcessor;
|
||||||
this.aisTargetDataWriter = aisTargetDataWriter;
|
this.aisTargetDataWriter = aisTargetDataWriter;
|
||||||
this.maritimeAisApiWebClient = maritimeAisApiWebClient;
|
this.maritimeAisApiWebClient = maritimeAisApiWebClient;
|
||||||
|
this.objectMapper = objectMapper;
|
||||||
this.core20CacheManager = core20CacheManager;
|
this.core20CacheManager = core20CacheManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -78,7 +82,7 @@ public class AisTargetImportJobConfig extends BaseJobConfig<AisTargetDto, AisTar
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ItemReader<AisTargetDto> createReader() {
|
protected ItemReader<AisTargetDto> createReader() {
|
||||||
return new AisTargetDataReader(maritimeAisApiWebClient, sinceSeconds);
|
return new AisTargetDataReader(maritimeAisApiWebClient, objectMapper, sinceSeconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -104,7 +108,7 @@ public class AisTargetImportJobConfig extends BaseJobConfig<AisTargetDto, AisTar
|
|||||||
// 배치 수집 시점 설정
|
// 배치 수집 시점 설정
|
||||||
OffsetDateTime collectedAt = OffsetDateTime.now();
|
OffsetDateTime collectedAt = OffsetDateTime.now();
|
||||||
aisTargetDataProcessor.setCollectedAt(collectedAt);
|
aisTargetDataProcessor.setCollectedAt(collectedAt);
|
||||||
log.info("[{}] Job 시작 - 수집 시점: {}", getJobName(), collectedAt);
|
log.info("[{}] Job 시작 (API → 캐시) - 수집 시점: {}", getJobName(), collectedAt);
|
||||||
|
|
||||||
// Core20 캐시 관리
|
// Core20 캐시 관리
|
||||||
// 1. 캐시가 비어있으면 즉시 로딩 (첫 실행 또는 재시작 시)
|
// 1. 캐시가 비어있으면 즉시 로딩 (첫 실행 또는 재시작 시)
|
||||||
@ -121,7 +125,7 @@ public class AisTargetImportJobConfig extends BaseJobConfig<AisTargetDto, AisTar
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterJob(JobExecution jobExecution) {
|
public void afterJob(JobExecution jobExecution) {
|
||||||
log.info("[{}] Job 완료 - 상태: {}, 처리 건수: {}, Core20 캐시 크기: {}",
|
log.info("[{}] Job 완료 (API → 캐시) - 상태: {}, 캐시 적재 건수: {}, Core20 캐시 크기: {}",
|
||||||
getJobName(),
|
getJobName(),
|
||||||
jobExecution.getStatus(),
|
jobExecution.getStatus(),
|
||||||
jobExecution.getStepExecutions().stream()
|
jobExecution.getStepExecutions().stream()
|
||||||
|
|||||||
@ -1,11 +1,20 @@
|
|||||||
package com.snp.batch.jobs.aistarget.batch.reader;
|
package com.snp.batch.jobs.aistarget.batch.reader;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.snp.batch.common.batch.reader.BaseApiReader;
|
import com.snp.batch.common.batch.reader.BaseApiReader;
|
||||||
import com.snp.batch.jobs.aistarget.batch.dto.AisTargetApiResponse;
|
import com.snp.batch.jobs.aistarget.batch.dto.AisTargetApiResponse;
|
||||||
import com.snp.batch.jobs.aistarget.batch.dto.AisTargetDto;
|
import com.snp.batch.jobs.aistarget.batch.dto.AisTargetDto;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
|
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||||
import org.springframework.web.reactive.function.client.WebClient;
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.StandardOpenOption;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -13,21 +22,24 @@ import java.util.Map;
|
|||||||
/**
|
/**
|
||||||
* AIS Target 데이터 Reader
|
* AIS Target 데이터 Reader
|
||||||
*
|
*
|
||||||
* API: POST /AisSvc.svc/AIS/GetTargets
|
* API: POST /AisSvc.svc/AIS/GetTargetsEnhanced
|
||||||
* Request: {"sinceSeconds": "60"}
|
* Request: {"sinceSeconds": "60"}
|
||||||
*
|
*
|
||||||
* 동작:
|
* 동작:
|
||||||
* - 매 분 15초에 실행 (Quartz 스케줄)
|
* - 매 분 15초에 실행 (Quartz 스케줄)
|
||||||
* - 최근 60초 동안의 전체 선박 위치 정보 조회
|
* - 최근 60초 동안의 전체 선박 위치 정보 조회
|
||||||
* - 약 33,000건/분 처리
|
* - 약 33,000건/분 처리
|
||||||
|
* - 대용량 응답을 임시 파일로 스트리밍하여 메모리 버퍼 제한 우회
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class AisTargetDataReader extends BaseApiReader<AisTargetDto> {
|
public class AisTargetDataReader extends BaseApiReader<AisTargetDto> {
|
||||||
|
|
||||||
private final int sinceSeconds;
|
private final int sinceSeconds;
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
|
||||||
public AisTargetDataReader(WebClient webClient, int sinceSeconds) {
|
public AisTargetDataReader(WebClient webClient, ObjectMapper objectMapper, int sinceSeconds) {
|
||||||
super(webClient);
|
super(webClient);
|
||||||
|
this.objectMapper = objectMapper;
|
||||||
this.sinceSeconds = sinceSeconds;
|
this.sinceSeconds = sinceSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,15 +75,18 @@ public class AisTargetDataReader extends BaseApiReader<AisTargetDto> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<AisTargetDto> fetchDataFromApi() {
|
protected List<AisTargetDto> fetchDataFromApi() {
|
||||||
|
Path tempFile = null;
|
||||||
try {
|
try {
|
||||||
log.info("[{}] API 호출 시작: {} {}", getReaderName(), getHttpMethod(), getApiPath());
|
log.info("[{}] API 호출 시작 (스트리밍 모드): {} {}", getReaderName(), getHttpMethod(), getApiPath());
|
||||||
|
|
||||||
AisTargetApiResponse response = webClient.post()
|
tempFile = Files.createTempFile("ais-response-", ".json");
|
||||||
.uri(getApiPath())
|
|
||||||
.bodyValue(getRequestBody())
|
// 응답을 DataBuffer 스트림으로 받아 임시 파일에 기록
|
||||||
.retrieve()
|
long bytesWritten = streamResponseToFile(tempFile);
|
||||||
.bodyToMono(AisTargetApiResponse.class)
|
log.info("[{}] 응답 스트리밍 완료: {} bytes → {}", getReaderName(), bytesWritten, tempFile.getFileName());
|
||||||
.block();
|
|
||||||
|
// 임시 파일에서 JSON 파싱
|
||||||
|
AisTargetApiResponse response = parseResponseFromFile(tempFile);
|
||||||
|
|
||||||
if (response != null && response.getTargetArr() != null) {
|
if (response != null && response.getTargetArr() != null) {
|
||||||
List<AisTargetDto> targets = response.getTargetArr();
|
List<AisTargetDto> targets = response.getTargetArr();
|
||||||
@ -86,6 +101,51 @@ public class AisTargetDataReader extends BaseApiReader<AisTargetDto> {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[{}] API 호출 실패: {}", getReaderName(), e.getMessage(), e);
|
log.error("[{}] API 호출 실패: {}", getReaderName(), e.getMessage(), e);
|
||||||
return handleApiError(e);
|
return handleApiError(e);
|
||||||
|
} finally {
|
||||||
|
deleteTempFile(tempFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* WebClient 응답을 DataBuffer 스트림으로 받아 임시 파일에 기록한다.
|
||||||
|
* bodyToMono()와 달리 메모리 버퍼 제한(maxInMemorySize)의 영향을 받지 않는다.
|
||||||
|
*/
|
||||||
|
private long streamResponseToFile(Path tempFile) throws IOException {
|
||||||
|
try (OutputStream os = Files.newOutputStream(tempFile, StandardOpenOption.WRITE)) {
|
||||||
|
webClient.post()
|
||||||
|
.uri(getApiPath())
|
||||||
|
.bodyValue(getRequestBody())
|
||||||
|
.retrieve()
|
||||||
|
.bodyToFlux(DataBuffer.class)
|
||||||
|
.doOnNext(dataBuffer -> {
|
||||||
|
try {
|
||||||
|
byte[] bytes = new byte[dataBuffer.readableByteCount()];
|
||||||
|
dataBuffer.read(bytes);
|
||||||
|
os.write(bytes);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException("임시 파일 쓰기 실패", e);
|
||||||
|
} finally {
|
||||||
|
DataBufferUtils.release(dataBuffer);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.blockLast();
|
||||||
|
}
|
||||||
|
return Files.size(tempFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
private AisTargetApiResponse parseResponseFromFile(Path tempFile) throws IOException {
|
||||||
|
try (InputStream is = Files.newInputStream(tempFile)) {
|
||||||
|
return objectMapper.readValue(is, AisTargetApiResponse.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteTempFile(Path tempFile) {
|
||||||
|
if (tempFile != null) {
|
||||||
|
try {
|
||||||
|
Files.deleteIfExists(tempFile);
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.warn("[{}] 임시 파일 삭제 실패: {}", getReaderName(), tempFile, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -52,7 +52,7 @@ public class AisTargetDataWriter extends BaseWriter<AisTargetEntity> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void writeItems(List<AisTargetEntity> items) throws Exception {
|
protected void writeItems(List<AisTargetEntity> items) throws Exception {
|
||||||
log.debug("AIS Target 캐시 업데이트 시작: {} 건", items.size());
|
log.info("AIS Target 캐시 업데이트 시작: {} 건", items.size());
|
||||||
|
|
||||||
// 1. ClassType 분류 (캐시 저장 전에 분류)
|
// 1. ClassType 분류 (캐시 저장 전에 분류)
|
||||||
// - Core20 캐시의 IMO와 매칭하여 classType(A/B), core20Mmsi 설정
|
// - Core20 캐시의 IMO와 매칭하여 classType(A/B), core20Mmsi 설정
|
||||||
@ -67,7 +67,7 @@ public class AisTargetDataWriter extends BaseWriter<AisTargetEntity> {
|
|||||||
// 3. 캐시 업데이트 (classType, core20Mmsi, signalKindCode 포함)
|
// 3. 캐시 업데이트 (classType, core20Mmsi, signalKindCode 포함)
|
||||||
cacheManager.putAll(items);
|
cacheManager.putAll(items);
|
||||||
|
|
||||||
log.debug("AIS Target 캐시 업데이트 완료: {} 건 (캐시 크기: {})",
|
log.info("AIS Target 캐시 업데이트 완료: {} 건 (캐시 크기: {})",
|
||||||
items.size(), cacheManager.size());
|
items.size(), cacheManager.size());
|
||||||
|
|
||||||
// 4. ChnPrmShip 전용 캐시 업데이트 (대상 MMSI만 필터)
|
// 4. ChnPrmShip 전용 캐시 업데이트 (대상 MMSI만 필터)
|
||||||
|
|||||||
불러오는 중...
Reference in New Issue
Block a user