feat: REST V2 캐시 우선 조회 + Semaphore 부하 제어 + 응답 포인트 상한
- POST /api/v2/tracks/vessels: DailyTrackCacheManager 캐시-DB 분리 조회 - 모든 V2 항적 엔드포인트에 ActiveQueryManager 공유 Semaphore 적용 - 포인트 버짓: 총 포인트 초과 시 비율 기반 균등 분배 간소화 - prod/prod-mpr/query 프로파일에 rest.v2.query 설정 추가 - 원격 DB 필요 테스트 @Disabled 처리, Maven Wrapper/gitattributes 정비 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
부모
121a7b7d01
커밋
1480990f4f
29
.gitattributes
vendored
29
.gitattributes
vendored
@ -1,2 +1,31 @@
|
|||||||
|
# 자동 줄바꿈 정규화 (checkout 시 OS에 맞게, commit 시 LF로 통일)
|
||||||
|
* text=auto
|
||||||
|
|
||||||
|
# Java 소스
|
||||||
|
*.java text
|
||||||
|
*.properties text
|
||||||
|
*.xml text
|
||||||
|
*.yml text
|
||||||
|
*.yaml text
|
||||||
|
|
||||||
|
# Web 리소스
|
||||||
|
*.html text
|
||||||
|
*.css text
|
||||||
|
*.js text
|
||||||
|
*.json text
|
||||||
|
|
||||||
|
# Shell 스크립트 (Linux/Mac)
|
||||||
|
*.sh text eol=lf
|
||||||
/mvnw text eol=lf
|
/mvnw text eol=lf
|
||||||
|
|
||||||
|
# Windows 스크립트
|
||||||
*.cmd text eol=crlf
|
*.cmd text eol=crlf
|
||||||
|
*.bat text eol=crlf
|
||||||
|
|
||||||
|
# 바이너리 (줄바꿈 변환하지 않음)
|
||||||
|
*.jar binary
|
||||||
|
*.png binary
|
||||||
|
*.jpg binary
|
||||||
|
*.gif binary
|
||||||
|
*.ico binary
|
||||||
|
*.docx binary
|
||||||
|
|||||||
1
.gitignore
vendored
1
.gitignore
vendored
@ -34,7 +34,6 @@ build/
|
|||||||
.vscode/
|
.vscode/
|
||||||
scripts/
|
scripts/
|
||||||
.claude
|
.claude
|
||||||
.mvn/
|
|
||||||
logs/
|
logs/
|
||||||
*.log
|
*.log
|
||||||
sql/
|
sql/
|
||||||
|
|||||||
19
.mvn/wrapper/maven-wrapper.properties
vendored
Normal file
19
.mvn/wrapper/maven-wrapper.properties
vendored
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing,
|
||||||
|
# software distributed under the License is distributed on an
|
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
# KIND, either express or implied. See the License for the
|
||||||
|
# specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
wrapperVersion=3.3.2
|
||||||
|
distributionType=only-script
|
||||||
|
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.10/apache-maven-3.9.10-bin.zip
|
||||||
@ -6,7 +6,7 @@
|
|||||||
|------|------|
|
|------|------|
|
||||||
| 작성일 | 2026-02-06 |
|
| 작성일 | 2026-02-06 |
|
||||||
| 대상 시스템 | Signal Batch — 선박 항적 조회/리플레이 WebSocket 서비스 |
|
| 대상 시스템 | Signal Batch — 선박 항적 조회/리플레이 WebSocket 서비스 |
|
||||||
| 운영 환경 | Linux, `vessel-batch-control.sh` → `run-on-query-server-dev.sh` (prod 프로파일) |
|
| 운영 환경 | Linux |
|
||||||
| 문제 상황 | 다수 클라이언트의 리플레이 요청 동시 유입 시 서비스 장애 발생 |
|
| 문제 상황 | 다수 클라이언트의 리플레이 요청 동시 유입 시 서비스 장애 발생 |
|
||||||
|
|
||||||
---
|
---
|
||||||
@ -114,7 +114,7 @@ CompletableFuture.runAsync(() -> {
|
|||||||
**파일**: `global/config/AsyncConfig.java`
|
**파일**: `global/config/AsyncConfig.java`
|
||||||
|
|
||||||
```java
|
```java
|
||||||
// AS-IS: 고정 크기 스레드 풀
|
// AS-IS: 소규모 스레드 풀 + 대용량 큐
|
||||||
@Bean(name = "trackStreamingExecutor")
|
@Bean(name = "trackStreamingExecutor")
|
||||||
public Executor getAsyncExecutor() {
|
public Executor getAsyncExecutor() {
|
||||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||||
@ -124,6 +124,7 @@ public Executor getAsyncExecutor() {
|
|||||||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||||
return executor;
|
return executor;
|
||||||
}
|
}
|
||||||
|
// TO-BE: core 40, max 120, queue 100 (대기열을 Semaphore로 대체)
|
||||||
```
|
```
|
||||||
|
|
||||||
#### (D) 정적 전송 지연 — `StompTrackStreamingService`
|
#### (D) 정적 전송 지연 — `StompTrackStreamingService`
|
||||||
@ -347,8 +348,10 @@ CompletableFuture.runAsync(() -> {
|
|||||||
|
|
||||||
**TO-BE**: 서버 전체 동시 실행 쿼리 수를 Semaphore로 제한 + 대기 큐 구현
|
**TO-BE**: 서버 전체 동시 실행 쿼리 수를 Semaphore로 제한 + 대기 큐 구현
|
||||||
|
|
||||||
|
> **구현 참고**: 실제 구현에서 글로벌 Semaphore는 `ActiveQueryManager`로 분리되었으며, `TrackQueryInterceptor`는 세션당 제한만 담당합니다. 아래 코드는 설계 의도를 보여주는 개념적 예시입니다.
|
||||||
|
|
||||||
```java
|
```java
|
||||||
// TO-BE: TrackQueryInterceptor.java — 글로벌 동시 제한 추가
|
// TO-BE: TrackQueryInterceptor.java + ActiveQueryManager.java — 글로벌 동시 제한 추가
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
@ -357,10 +360,10 @@ public class TrackQueryInterceptor implements ChannelInterceptor {
|
|||||||
private final ConcurrentHashMap<String, AtomicInteger> sessionQueries = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, AtomicInteger> sessionQueries = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
// ▼ [추가] 글로벌 동시 쿼리 제한
|
// ▼ [추가] 글로벌 동시 쿼리 제한
|
||||||
@Value("${websocket.query.max-concurrent-global:20}")
|
@Value("${websocket.query.max-concurrent-global:60}")
|
||||||
private int maxConcurrentGlobal;
|
private int maxConcurrentGlobal;
|
||||||
|
|
||||||
@Value("${websocket.query.max-per-session:3}")
|
@Value("${websocket.query.max-per-session:20}")
|
||||||
private int maxQueriesPerSession;
|
private int maxQueriesPerSession;
|
||||||
|
|
||||||
@Value("${websocket.query.queue-timeout-seconds:30}")
|
@Value("${websocket.query.queue-timeout-seconds:30}")
|
||||||
@ -452,8 +455,8 @@ public class TrackQueryInterceptor implements ChannelInterceptor {
|
|||||||
```yaml
|
```yaml
|
||||||
websocket:
|
websocket:
|
||||||
query:
|
query:
|
||||||
max-concurrent-global: 20 # 서버 전체 동시 쿼리 상한
|
max-concurrent-global: 60 # 서버 전체 동시 쿼리 상한
|
||||||
max-per-session: 3 # 세션당 동시 쿼리 상한
|
max-per-session: 20 # 세션당 동시 쿼리 상한
|
||||||
queue-timeout-seconds: 30 # 대기 큐 타임아웃
|
queue-timeout-seconds: 30 # 대기 큐 타임아웃
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -710,8 +713,8 @@ public class WebSocketProperties {
|
|||||||
|
|
||||||
@Data
|
@Data
|
||||||
public static class QueryProperties {
|
public static class QueryProperties {
|
||||||
private int maxConcurrentGlobal = 20;
|
private int maxConcurrentGlobal = 60;
|
||||||
private int maxPerSession = 3;
|
private int maxPerSession = 20;
|
||||||
private int queueTimeoutSeconds = 30;
|
private int queueTimeoutSeconds = 30;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -741,8 +744,8 @@ public class WebSocketProperties {
|
|||||||
```yaml
|
```yaml
|
||||||
websocket:
|
websocket:
|
||||||
query:
|
query:
|
||||||
max-concurrent-global: 20
|
max-concurrent-global: 60
|
||||||
max-per-session: 3
|
max-per-session: 20
|
||||||
queue-timeout-seconds: 30
|
queue-timeout-seconds: 30
|
||||||
transport:
|
transport:
|
||||||
inbound-core-pool-size: 10
|
inbound-core-pool-size: 10
|
||||||
@ -812,14 +815,14 @@ public class WebSocketMonitorController {
|
|||||||
### 5.1 글로벌 동시 제한 수 결정 기준
|
### 5.1 글로벌 동시 제한 수 결정 기준
|
||||||
|
|
||||||
```
|
```
|
||||||
maxConcurrentGlobal = min(DB커넥션풀 / 2, Async스레드풀max)
|
maxConcurrentGlobal = DB커넥션풀 / 평균쿼리당커넥션
|
||||||
= min(60 / 2, 30)
|
= 180 / 3
|
||||||
= 20 (권장)
|
= 60
|
||||||
```
|
```
|
||||||
|
|
||||||
- DB 커넥션 풀(60개)의 50%를 WebSocket 쿼리에 할당
|
- DB 커넥션 풀(180개) 기준 쿼리당 평균 3개 커넥션 사용을 고려하여 60개 설정
|
||||||
- 나머지 50%는 REST API, 배치 작업, 헬스체크 등에 예비
|
- REST API, 배치 작업 등은 별도 DataSource 풀(Collect 80, Batch 30)을 사용하므로 Query 풀 전체 활용 가능
|
||||||
- `trackStreamingExecutor` max(30) 이하로 설정하여 스레드 풀 포화 방지
|
- `trackStreamingExecutor` max(120) 이내로 설정하여 스레드 풀 여유 확보
|
||||||
|
|
||||||
### 5.2 대기 큐 타임아웃
|
### 5.2 대기 큐 타임아웃
|
||||||
|
|
||||||
@ -880,10 +883,11 @@ tryAcquireSlotImmediate()
|
|||||||
|
|
||||||
| 설정 | AS-IS | TO-BE | 근거 |
|
| 설정 | AS-IS | TO-BE | 근거 |
|
||||||
|------|-------|-------|------|
|
|------|-------|-------|------|
|
||||||
| Query DB 풀 | 120 | **180** | 동시 60쿼리 × 3커넥션 |
|
| Query DB 풀 | 60 | **180** | 동시 60쿼리 × 3커넥션 |
|
||||||
| max-concurrent-global | 30 | **60** | 180 / ~3 |
|
| max-concurrent-global | (없음) | **60** | 180 / ~3 |
|
||||||
| max-per-session | 3 | **20** | 대기열 방식이므로 넉넉하게 |
|
| max-per-session | 3 | **20** | 대기열 방식이므로 넉넉하게 |
|
||||||
| Executor max | 30 | **120** | 60실행 + 60대기 |
|
| Executor core/max | 15/30 | **40/120** | 60실행 + 60대기 |
|
||||||
|
| Executor queue | 500 | **100** | 대기열을 Semaphore로 대체 |
|
||||||
| Session idle timeout | 60s | **15s** | 빠른 정리 |
|
| Session idle timeout | 60s | **15s** | 빠른 정리 |
|
||||||
| Heartbeat | 10s/10s | **5s/5s** | 죽은 연결 빠른 감지 |
|
| Heartbeat | 10s/10s | **5s/5s** | 죽은 연결 빠른 감지 |
|
||||||
| SockJS disconnect delay | 30s | **5s** | 빠른 해제 |
|
| SockJS disconnect delay | 30s | **5s** | 빠른 해제 |
|
||||||
@ -925,7 +929,7 @@ DailyTrackCacheManager (@Service)
|
|||||||
|------|------|
|
|------|------|
|
||||||
| Daily 테이블 1일분 | ~350MB (DB) |
|
| Daily 테이블 1일분 | ~350MB (DB) |
|
||||||
| 7일분 인메모리 추정 | ~4GB (Java 객체 오버헤드 포함) |
|
| 7일분 인메모리 추정 | ~4GB (Java 객체 오버헤드 포함) |
|
||||||
| 최대 메모리 한도 | 5GB (설정 가능) |
|
| 최대 메모리 한도 | 6GB (설정 가능) |
|
||||||
| JVM 힙 (권장) | 12GB 이상 |
|
| JVM 힙 (권장) | 12GB 이상 |
|
||||||
|
|
||||||
### 6.3 쿼리 라우팅
|
### 6.3 쿼리 라우팅
|
||||||
@ -967,12 +971,59 @@ GET /api/websocket/daily-cache
|
|||||||
|
|
||||||
| 리소스 | AS-IS | TO-BE | 비고 |
|
| 리소스 | AS-IS | TO-BE | 비고 |
|
||||||
|--------|-------|-------|------|
|
|--------|-------|-------|------|
|
||||||
| JVM Heap | 8~16GB | **12~16GB 권장** | 캐시 ~4GB + 운영 |
|
| JVM Heap | 8~16GB | **16~32GB** | 캐시 ~6GB + 운영 |
|
||||||
| DB Pool (Query) | max 120 | **max 180** | WebSocket + REST |
|
| DB Pool (Query) | max 60 | **max 180** | WebSocket + REST |
|
||||||
| DB Pool (Collect) | max 80 | max 80 | 배치 Reader |
|
| DB Pool (Collect) | max 20 | **max 80** | 배치 Reader |
|
||||||
| DB Pool (Batch) | max 30 | max 30 | 메타데이터 |
|
| DB Pool (Batch) | max 20 | **max 30** | 메타데이터 |
|
||||||
| max-concurrent-global | 30 | **60** | Query풀 180 / 3 |
|
| max-concurrent-global | (없음) | **60** | Query풀 180 / 3 |
|
||||||
| trackStreamingExecutor | core 15, max 30 | **core 40, max 120** | 대기열 + 실행 |
|
| trackStreamingExecutor | core 15, max 30, queue 500 | **core 40, max 120, queue 100** | 대기열 + 실행 |
|
||||||
| Session idle timeout | 60s | **15s** | 빠른 정리 |
|
| Session idle timeout | 60s | **15s** | 빠른 정리 |
|
||||||
| Heartbeat | 10s/10s | **5s/5s** | 빠른 감지 |
|
| Heartbeat | 10s/10s | **5s/5s** | 빠른 감지 |
|
||||||
| Daily cache | - | **7일분 ~4GB** | 비동기 워밍업 |
|
| Send time limit | 120s | **30s** | 빠른 정리 |
|
||||||
|
| Daily cache | - | **7일분 ~6GB** | 비동기 워밍업 |
|
||||||
|
|
||||||
|
## Phase 9. 인메모리 캐시 간소화 및 성능 정량 비교
|
||||||
|
|
||||||
|
### 9.1 개요
|
||||||
|
|
||||||
|
캐시 HIT 경로에 DB 경로 동등 수준의 3단계 간소화 파이프라인을 적용하여,
|
||||||
|
DB 커넥션 없이도 동일한 품질의 항적 데이터를 제공.
|
||||||
|
|
||||||
|
**간소화 파이프라인**: Douglas-Peucker (DP) → 거리/시간 기반 샘플링 → 줌 레벨 샘플링
|
||||||
|
|
||||||
|
**벤치마크 로그**: `logs/cache-benchmark.log`에 DB/캐시 경로별 JSON 요약 자동 수집.
|
||||||
|
|
||||||
|
### 9.2 AS-IS vs TO-BE 정량 비교
|
||||||
|
|
||||||
|
운영 환경에서 수집한 `cache-benchmark.log` 12건 기준 (2026-02-07 측정):
|
||||||
|
|
||||||
|
| 지표 | AS-IS (DB 경로) | TO-BE (캐시 경로) | 개선율 |
|
||||||
|
|------|----------------|-------------------|--------|
|
||||||
|
| 응답 시간 (ms) | 7,221 ~ 8,195 | 575 ~ 1,439 | **5.7 ~ 12.6배** |
|
||||||
|
| 배치 전송 수 | 2 ~ 11 | 3 ~ 10 | 유사 (간소화로 감소) |
|
||||||
|
| DB 커넥션 사용 | 8 ~ 19건 | 2 ~ 3건 | **63 ~ 89% 절감** |
|
||||||
|
| DB 쿼리 시간 (ms) | 1,443 ~ 3,475 | 0 | **100% 절감** |
|
||||||
|
| 포인트 압축 | SQL ST_Simplify | 앱 레벨 95 ~ 99% | 동등 품질 |
|
||||||
|
| 간소화 CPU 시간 | - | 24 ~ 1,258ms | DB 대기 없음 |
|
||||||
|
|
||||||
|
> 상세 데이터 및 경로별 분석은 별도 「일일 캐시 성능 벤치마크 보고서」 참조
|
||||||
|
|
||||||
|
### 9.3 데이터 품질 비교
|
||||||
|
|
||||||
|
- 간소화 후 포인트 수가 DB 경로(ST_Simplify)와 동등 수준 (99% 이상 감소)
|
||||||
|
- 선박 정보(`shipName`, `shipKindCode`, `nationalCode`) 정상 제공 확인
|
||||||
|
- 캐시 빌드 시 `t_vessel_latest_position` 일괄 조회로 보강
|
||||||
|
|
||||||
|
### 9.4 선박 정보 보강
|
||||||
|
|
||||||
|
캐시 빌드(`DailyTrackCacheManager.loadDay()`) 시 `enrichVesselInfo()` 추가:
|
||||||
|
- `t_vessel_latest_position` 테이블에서 `ship_nm`, `ship_ty` IN 절 1000건 배치 조회
|
||||||
|
- `nationalCode`: MMSI 기반 NationalCodeUtil 계산
|
||||||
|
- `shipKindCode`: ShipKindCodeConverter에 shipType/shipName 전달하여 정확한 판별
|
||||||
|
|
||||||
|
### 9.5 시스템 효과
|
||||||
|
|
||||||
|
- **DB IO 제거**: 캐시 HIT 시 Query DB 커넥션 사용 0 → 커넥션 풀 여유 확보
|
||||||
|
- **동시 사용자 수용**: DB 커넥션 경합 해소로 동시 처리 가능 수 증가
|
||||||
|
- **간소화 비용**: 24~1,258ms (순수 CPU 연산, DB 대기 없음)
|
||||||
|
- **벤치마크 모니터링**: `cache-benchmark.log`로 경로별 성능 지표 상시 수집
|
||||||
|
|||||||
@ -181,7 +181,7 @@ public class GisControllerV2 {
|
|||||||
content = @Content(schema = @Schema(implementation = VesselTracksRequest.class))
|
content = @Content(schema = @Schema(implementation = VesselTracksRequest.class))
|
||||||
)
|
)
|
||||||
@RequestBody VesselTracksRequest request) {
|
@RequestBody VesselTracksRequest request) {
|
||||||
return gisService.getVesselTracks(request);
|
return gisServiceV2.getVesselTracksV2(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/vessels/recent-positions")
|
@GetMapping("/vessels/recent-positions")
|
||||||
|
|||||||
@ -2,12 +2,18 @@ package gc.mda.signal_batch.domain.gis.service;
|
|||||||
|
|
||||||
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
|
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
|
||||||
import gc.mda.signal_batch.domain.vessel.dto.TrackResponse;
|
import gc.mda.signal_batch.domain.vessel.dto.TrackResponse;
|
||||||
|
import gc.mda.signal_batch.domain.vessel.dto.VesselTracksRequest;
|
||||||
import gc.mda.signal_batch.domain.vessel.dto.IntegrationVessel;
|
import gc.mda.signal_batch.domain.vessel.dto.IntegrationVessel;
|
||||||
import gc.mda.signal_batch.domain.vessel.service.IntegrationVesselService;
|
import gc.mda.signal_batch.domain.vessel.service.IntegrationVesselService;
|
||||||
|
import gc.mda.signal_batch.global.exception.QueryTimeoutException;
|
||||||
import gc.mda.signal_batch.global.util.IntegrationSignalConstants;
|
import gc.mda.signal_batch.global.util.IntegrationSignalConstants;
|
||||||
import gc.mda.signal_batch.global.util.TrackConverter;
|
import gc.mda.signal_batch.global.util.TrackConverter;
|
||||||
|
import gc.mda.signal_batch.global.websocket.service.ActiveQueryManager;
|
||||||
|
import gc.mda.signal_batch.global.websocket.service.CacheTrackSimplifier;
|
||||||
|
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
@ -22,6 +28,11 @@ import java.util.stream.Collectors;
|
|||||||
/**
|
/**
|
||||||
* GIS 서비스 V2 - CompactVesselTrack 기반 응답
|
* GIS 서비스 V2 - CompactVesselTrack 기반 응답
|
||||||
* WebSocket API와 동일한 응답 구조 제공
|
* WebSocket API와 동일한 응답 구조 제공
|
||||||
|
*
|
||||||
|
* Phase: REST V2 캐시 + 부하 제어 + 응답 크기 제한
|
||||||
|
* - Semaphore 기반 동시성 제어 (ActiveQueryManager 공유)
|
||||||
|
* - POST /vessels: DailyTrackCacheManager 캐시 우선 조회
|
||||||
|
* - 2단계 간소화 파이프라인 (표준 간소화 + 포인트 버짓 강제)
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
@ -29,37 +40,81 @@ public class GisServiceV2 {
|
|||||||
|
|
||||||
private final DataSource queryDataSource;
|
private final DataSource queryDataSource;
|
||||||
private final IntegrationVesselService integrationVesselService;
|
private final IntegrationVesselService integrationVesselService;
|
||||||
|
private final ActiveQueryManager activeQueryManager;
|
||||||
|
private final DailyTrackCacheManager dailyTrackCacheManager;
|
||||||
|
private final CacheTrackSimplifier cacheTrackSimplifier;
|
||||||
|
private final GisService gisService;
|
||||||
|
|
||||||
|
@Value("${rest.v2.query.timeout-seconds:30}")
|
||||||
|
private int restQueryTimeout;
|
||||||
|
|
||||||
|
@Value("${rest.v2.query.max-total-points:500000}")
|
||||||
|
private int maxTotalPoints;
|
||||||
|
|
||||||
// 선박 정보 캐시 (TTL: 1시간)
|
// 선박 정보 캐시 (TTL: 1시간)
|
||||||
private final ConcurrentHashMap<String, VesselInfoCache> vesselInfoCache = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, VesselInfoCache> vesselInfoCache = new ConcurrentHashMap<>();
|
||||||
private static final long VESSEL_CACHE_TTL = 3600_000; // 1시간
|
private static final long VESSEL_CACHE_TTL = 3600_000; // 1시간
|
||||||
|
|
||||||
public GisServiceV2(@Qualifier("queryDataSource") DataSource queryDataSource,
|
public GisServiceV2(@Qualifier("queryDataSource") DataSource queryDataSource,
|
||||||
IntegrationVesselService integrationVesselService) {
|
IntegrationVesselService integrationVesselService,
|
||||||
|
ActiveQueryManager activeQueryManager,
|
||||||
|
DailyTrackCacheManager dailyTrackCacheManager,
|
||||||
|
CacheTrackSimplifier cacheTrackSimplifier,
|
||||||
|
GisService gisService) {
|
||||||
this.queryDataSource = queryDataSource;
|
this.queryDataSource = queryDataSource;
|
||||||
this.integrationVesselService = integrationVesselService;
|
this.integrationVesselService = integrationVesselService;
|
||||||
|
this.activeQueryManager = activeQueryManager;
|
||||||
|
this.dailyTrackCacheManager = dailyTrackCacheManager;
|
||||||
|
this.cacheTrackSimplifier = cacheTrackSimplifier;
|
||||||
|
this.gisService = gisService;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 해구별 선박 항적 조회 (V2 - CompactVesselTrack 반환)
|
* 해구별 선박 항적 조회 (V2 - CompactVesselTrack 반환)
|
||||||
|
* Semaphore 부하 제어 + 간소화 파이프라인 적용
|
||||||
*/
|
*/
|
||||||
public List<CompactVesselTrack> getHaeguTracks(Integer haeguNo, int minutes, boolean filterByIntegration) {
|
public List<CompactVesselTrack> getHaeguTracks(Integer haeguNo, int minutes, boolean filterByIntegration) {
|
||||||
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
|
String queryId = "rest-haegu-" + haeguNo + "-" + UUID.randomUUID().toString().substring(0, 8);
|
||||||
List<TrackResponse> rawTracks = new ArrayList<>();
|
boolean slotAcquired = false;
|
||||||
|
|
||||||
LocalDateTime now = LocalDateTime.now();
|
try {
|
||||||
LocalDateTime startTime = now.minusMinutes(minutes);
|
slotAcquired = acquireSlotWithWait(queryId);
|
||||||
|
|
||||||
if (minutes > 60) {
|
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
|
||||||
LocalDateTime currentHour = now.withMinute(0).withSecond(0).withNano(0);
|
List<TrackResponse> rawTracks = new ArrayList<>();
|
||||||
|
|
||||||
if (minutes <= 1440) {
|
LocalDateTime now = LocalDateTime.now();
|
||||||
// hourly 테이블에서 과거 데이터 조회
|
LocalDateTime startTime = now.minusMinutes(minutes);
|
||||||
String hourlySql = """
|
|
||||||
|
if (minutes > 60) {
|
||||||
|
LocalDateTime currentHour = now.withMinute(0).withSecond(0).withNano(0);
|
||||||
|
|
||||||
|
if (minutes <= 1440) {
|
||||||
|
String hourlySql = """
|
||||||
|
SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket,
|
||||||
|
public.ST_AsText(t.track_geom) as track_geom,
|
||||||
|
t.distance_nm, t.avg_speed, t.max_speed, t.point_count
|
||||||
|
FROM signal.t_vessel_tracks_hourly t
|
||||||
|
WHERE EXISTS (
|
||||||
|
SELECT 1 FROM signal.t_grid_vessel_tracks g
|
||||||
|
WHERE g.sig_src_cd = t.sig_src_cd
|
||||||
|
AND g.target_id = t.target_id
|
||||||
|
AND g.haegu_no = %d
|
||||||
|
AND g.time_bucket >= '%s'
|
||||||
|
)
|
||||||
|
AND t.time_bucket >= '%s'
|
||||||
|
AND t.time_bucket < '%s'
|
||||||
|
ORDER BY t.sig_src_cd, t.target_id, t.time_bucket
|
||||||
|
""".formatted(haeguNo, startTime, startTime, currentHour);
|
||||||
|
|
||||||
|
rawTracks.addAll(jdbcTemplate.query(hourlySql, this::mapTrackResponse));
|
||||||
|
}
|
||||||
|
|
||||||
|
String recentSql = """
|
||||||
SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket,
|
SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket,
|
||||||
public.ST_AsText(t.track_geom) as track_geom,
|
public.ST_AsText(t.track_geom) as track_geom,
|
||||||
t.distance_nm, t.avg_speed, t.max_speed, t.point_count
|
t.distance_nm, t.avg_speed, t.max_speed, t.point_count
|
||||||
FROM signal.t_vessel_tracks_hourly t
|
FROM signal.t_vessel_tracks_5min t
|
||||||
WHERE EXISTS (
|
WHERE EXISTS (
|
||||||
SELECT 1 FROM signal.t_grid_vessel_tracks g
|
SELECT 1 FROM signal.t_grid_vessel_tracks g
|
||||||
WHERE g.sig_src_cd = t.sig_src_cd
|
WHERE g.sig_src_cd = t.sig_src_cd
|
||||||
@ -68,87 +123,97 @@ public class GisServiceV2 {
|
|||||||
AND g.time_bucket >= '%s'
|
AND g.time_bucket >= '%s'
|
||||||
)
|
)
|
||||||
AND t.time_bucket >= '%s'
|
AND t.time_bucket >= '%s'
|
||||||
AND t.time_bucket < '%s'
|
|
||||||
ORDER BY t.sig_src_cd, t.target_id, t.time_bucket
|
ORDER BY t.sig_src_cd, t.target_id, t.time_bucket
|
||||||
""".formatted(haeguNo, startTime, startTime, currentHour);
|
""".formatted(haeguNo, startTime, currentHour);
|
||||||
|
|
||||||
rawTracks.addAll(jdbcTemplate.query(hourlySql, this::mapTrackResponse));
|
rawTracks.addAll(jdbcTemplate.query(recentSql, this::mapTrackResponse));
|
||||||
|
|
||||||
|
} else {
|
||||||
|
String sql = """
|
||||||
|
SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket,
|
||||||
|
public.ST_AsText(t.track_geom) as track_geom,
|
||||||
|
t.distance_nm, t.avg_speed, t.max_speed, t.point_count
|
||||||
|
FROM signal.t_vessel_tracks_5min t
|
||||||
|
WHERE EXISTS (
|
||||||
|
SELECT 1 FROM signal.t_grid_vessel_tracks g
|
||||||
|
WHERE g.sig_src_cd = t.sig_src_cd
|
||||||
|
AND g.target_id = t.target_id
|
||||||
|
AND g.haegu_no = %d
|
||||||
|
AND g.time_bucket >= NOW() - INTERVAL '%d minutes'
|
||||||
|
)
|
||||||
|
AND t.time_bucket >= NOW() - INTERVAL '%d minutes'
|
||||||
|
ORDER BY t.sig_src_cd, t.target_id, t.time_bucket
|
||||||
|
""".formatted(haeguNo, minutes, minutes);
|
||||||
|
|
||||||
|
rawTracks = jdbcTemplate.query(sql, this::mapTrackResponse);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5min 테이블에서 최근 데이터 조회
|
List<CompactVesselTrack> result = TrackConverter.convert(rawTracks, this::getVesselInfo);
|
||||||
String recentSql = """
|
|
||||||
SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket,
|
|
||||||
public.ST_AsText(t.track_geom) as track_geom,
|
|
||||||
t.distance_nm, t.avg_speed, t.max_speed, t.point_count
|
|
||||||
FROM signal.t_vessel_tracks_5min t
|
|
||||||
WHERE EXISTS (
|
|
||||||
SELECT 1 FROM signal.t_grid_vessel_tracks g
|
|
||||||
WHERE g.sig_src_cd = t.sig_src_cd
|
|
||||||
AND g.target_id = t.target_id
|
|
||||||
AND g.haegu_no = %d
|
|
||||||
AND g.time_bucket >= '%s'
|
|
||||||
)
|
|
||||||
AND t.time_bucket >= '%s'
|
|
||||||
ORDER BY t.sig_src_cd, t.target_id, t.time_bucket
|
|
||||||
""".formatted(haeguNo, startTime, currentHour);
|
|
||||||
|
|
||||||
rawTracks.addAll(jdbcTemplate.query(recentSql, this::mapTrackResponse));
|
if (filterByIntegration && integrationVesselService.isEnabled()) {
|
||||||
|
result = filterByIntegration(result);
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
result = applySimplificationPipeline(result);
|
||||||
// 1시간 이하는 5분 테이블만 사용
|
|
||||||
String sql = """
|
|
||||||
SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket,
|
|
||||||
public.ST_AsText(t.track_geom) as track_geom,
|
|
||||||
t.distance_nm, t.avg_speed, t.max_speed, t.point_count
|
|
||||||
FROM signal.t_vessel_tracks_5min t
|
|
||||||
WHERE EXISTS (
|
|
||||||
SELECT 1 FROM signal.t_grid_vessel_tracks g
|
|
||||||
WHERE g.sig_src_cd = t.sig_src_cd
|
|
||||||
AND g.target_id = t.target_id
|
|
||||||
AND g.haegu_no = %d
|
|
||||||
AND g.time_bucket >= NOW() - INTERVAL '%d minutes'
|
|
||||||
)
|
|
||||||
AND t.time_bucket >= NOW() - INTERVAL '%d minutes'
|
|
||||||
ORDER BY t.sig_src_cd, t.target_id, t.time_bucket
|
|
||||||
""".formatted(haeguNo, minutes, minutes);
|
|
||||||
|
|
||||||
rawTracks = jdbcTemplate.query(sql, this::mapTrackResponse);
|
log.debug("V2 API: Fetched {} compact tracks for haegu {} in last {} minutes",
|
||||||
|
result.size(), haeguNo, minutes);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (slotAcquired) {
|
||||||
|
activeQueryManager.releaseQuerySlot(queryId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CompactVesselTrack으로 변환
|
|
||||||
List<CompactVesselTrack> result = TrackConverter.convert(rawTracks, this::getVesselInfo);
|
|
||||||
|
|
||||||
// 통합선박 필터링 적용
|
|
||||||
if (filterByIntegration && integrationVesselService.isEnabled()) {
|
|
||||||
result = filterByIntegration(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
log.debug("V2 API: Fetched {} compact tracks for haegu {} in last {} minutes",
|
|
||||||
result.size(), haeguNo, minutes);
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 영역별 선박 항적 조회 (V2 - CompactVesselTrack 반환)
|
* 영역별 선박 항적 조회 (V2 - CompactVesselTrack 반환)
|
||||||
|
* Semaphore 부하 제어 + 간소화 파이프라인 적용
|
||||||
*/
|
*/
|
||||||
public List<CompactVesselTrack> getAreaTracks(String areaId, int minutes, boolean filterByIntegration) {
|
public List<CompactVesselTrack> getAreaTracks(String areaId, int minutes, boolean filterByIntegration) {
|
||||||
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
|
String queryId = "rest-area-" + areaId + "-" + UUID.randomUUID().toString().substring(0, 8);
|
||||||
List<TrackResponse> rawTracks = new ArrayList<>();
|
boolean slotAcquired = false;
|
||||||
|
|
||||||
LocalDateTime now = LocalDateTime.now();
|
try {
|
||||||
LocalDateTime startTime = now.minusMinutes(minutes);
|
slotAcquired = acquireSlotWithWait(queryId);
|
||||||
|
|
||||||
if (minutes > 60) {
|
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
|
||||||
LocalDateTime currentHour = now.withMinute(0).withSecond(0).withNano(0);
|
List<TrackResponse> rawTracks = new ArrayList<>();
|
||||||
|
|
||||||
if (minutes <= 1440) {
|
LocalDateTime now = LocalDateTime.now();
|
||||||
// hourly 테이블에서 과거 데이터 조회
|
LocalDateTime startTime = now.minusMinutes(minutes);
|
||||||
String hourlySql = """
|
|
||||||
|
if (minutes > 60) {
|
||||||
|
LocalDateTime currentHour = now.withMinute(0).withSecond(0).withNano(0);
|
||||||
|
|
||||||
|
if (minutes <= 1440) {
|
||||||
|
String hourlySql = """
|
||||||
|
SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket,
|
||||||
|
public.ST_AsText(t.track_geom) as track_geom,
|
||||||
|
t.distance_nm, t.avg_speed, t.max_speed, t.point_count
|
||||||
|
FROM signal.t_vessel_tracks_hourly t
|
||||||
|
WHERE EXISTS (
|
||||||
|
SELECT 1 FROM signal.t_area_vessel_tracks a
|
||||||
|
WHERE a.sig_src_cd = t.sig_src_cd
|
||||||
|
AND a.target_id = t.target_id
|
||||||
|
AND a.area_id = '%s'
|
||||||
|
AND a.time_bucket >= '%s'
|
||||||
|
)
|
||||||
|
AND t.time_bucket >= '%s'
|
||||||
|
AND t.time_bucket < '%s'
|
||||||
|
ORDER BY t.sig_src_cd, t.target_id, t.time_bucket
|
||||||
|
""".formatted(areaId, startTime, startTime, currentHour);
|
||||||
|
|
||||||
|
rawTracks.addAll(jdbcTemplate.query(hourlySql, this::mapTrackResponse));
|
||||||
|
}
|
||||||
|
|
||||||
|
String recentSql = """
|
||||||
SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket,
|
SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket,
|
||||||
public.ST_AsText(t.track_geom) as track_geom,
|
public.ST_AsText(t.track_geom) as track_geom,
|
||||||
t.distance_nm, t.avg_speed, t.max_speed, t.point_count
|
t.distance_nm, t.avg_speed, t.max_speed, t.point_count
|
||||||
FROM signal.t_vessel_tracks_hourly t
|
FROM signal.t_vessel_tracks_5min t
|
||||||
WHERE EXISTS (
|
WHERE EXISTS (
|
||||||
SELECT 1 FROM signal.t_area_vessel_tracks a
|
SELECT 1 FROM signal.t_area_vessel_tracks a
|
||||||
WHERE a.sig_src_cd = t.sig_src_cd
|
WHERE a.sig_src_cd = t.sig_src_cd
|
||||||
@ -157,70 +222,284 @@ public class GisServiceV2 {
|
|||||||
AND a.time_bucket >= '%s'
|
AND a.time_bucket >= '%s'
|
||||||
)
|
)
|
||||||
AND t.time_bucket >= '%s'
|
AND t.time_bucket >= '%s'
|
||||||
AND t.time_bucket < '%s'
|
|
||||||
ORDER BY t.sig_src_cd, t.target_id, t.time_bucket
|
ORDER BY t.sig_src_cd, t.target_id, t.time_bucket
|
||||||
""".formatted(areaId, startTime, startTime, currentHour);
|
""".formatted(areaId, startTime, currentHour);
|
||||||
|
|
||||||
rawTracks.addAll(jdbcTemplate.query(hourlySql, this::mapTrackResponse));
|
rawTracks.addAll(jdbcTemplate.query(recentSql, this::mapTrackResponse));
|
||||||
|
|
||||||
|
} else {
|
||||||
|
String sql = """
|
||||||
|
SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket,
|
||||||
|
public.ST_AsText(t.track_geom) as track_geom,
|
||||||
|
t.distance_nm, t.avg_speed, t.max_speed, t.point_count
|
||||||
|
FROM signal.t_vessel_tracks_5min t
|
||||||
|
WHERE EXISTS (
|
||||||
|
SELECT 1 FROM signal.t_area_vessel_tracks a
|
||||||
|
WHERE a.sig_src_cd = t.sig_src_cd
|
||||||
|
AND a.target_id = t.target_id
|
||||||
|
AND a.area_id = '%s'
|
||||||
|
AND a.time_bucket >= NOW() - INTERVAL '%d minutes'
|
||||||
|
)
|
||||||
|
AND t.time_bucket >= NOW() - INTERVAL '%d minutes'
|
||||||
|
ORDER BY t.sig_src_cd, t.target_id, t.time_bucket
|
||||||
|
""".formatted(areaId, minutes, minutes);
|
||||||
|
|
||||||
|
rawTracks = jdbcTemplate.query(sql, this::mapTrackResponse);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5min 테이블에서 최근 데이터 조회
|
List<CompactVesselTrack> result = TrackConverter.convert(rawTracks, this::getVesselInfo);
|
||||||
String recentSql = """
|
|
||||||
SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket,
|
|
||||||
public.ST_AsText(t.track_geom) as track_geom,
|
|
||||||
t.distance_nm, t.avg_speed, t.max_speed, t.point_count
|
|
||||||
FROM signal.t_vessel_tracks_5min t
|
|
||||||
WHERE EXISTS (
|
|
||||||
SELECT 1 FROM signal.t_area_vessel_tracks a
|
|
||||||
WHERE a.sig_src_cd = t.sig_src_cd
|
|
||||||
AND a.target_id = t.target_id
|
|
||||||
AND a.area_id = '%s'
|
|
||||||
AND a.time_bucket >= '%s'
|
|
||||||
)
|
|
||||||
AND t.time_bucket >= '%s'
|
|
||||||
ORDER BY t.sig_src_cd, t.target_id, t.time_bucket
|
|
||||||
""".formatted(areaId, startTime, currentHour);
|
|
||||||
|
|
||||||
rawTracks.addAll(jdbcTemplate.query(recentSql, this::mapTrackResponse));
|
if (filterByIntegration && integrationVesselService.isEnabled()) {
|
||||||
|
result = filterByIntegration(result);
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
result = applySimplificationPipeline(result);
|
||||||
// 1시간 이하는 5분 테이블만 사용
|
|
||||||
String sql = """
|
|
||||||
SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket,
|
|
||||||
public.ST_AsText(t.track_geom) as track_geom,
|
|
||||||
t.distance_nm, t.avg_speed, t.max_speed, t.point_count
|
|
||||||
FROM signal.t_vessel_tracks_5min t
|
|
||||||
WHERE EXISTS (
|
|
||||||
SELECT 1 FROM signal.t_area_vessel_tracks a
|
|
||||||
WHERE a.sig_src_cd = t.sig_src_cd
|
|
||||||
AND a.target_id = t.target_id
|
|
||||||
AND a.area_id = '%s'
|
|
||||||
AND a.time_bucket >= NOW() - INTERVAL '%d minutes'
|
|
||||||
)
|
|
||||||
AND t.time_bucket >= NOW() - INTERVAL '%d minutes'
|
|
||||||
ORDER BY t.sig_src_cd, t.target_id, t.time_bucket
|
|
||||||
""".formatted(areaId, minutes, minutes);
|
|
||||||
|
|
||||||
rawTracks = jdbcTemplate.query(sql, this::mapTrackResponse);
|
log.debug("V2 API: Fetched {} compact tracks for area {} in last {} minutes",
|
||||||
|
result.size(), areaId, minutes);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (slotAcquired) {
|
||||||
|
activeQueryManager.releaseQuerySlot(queryId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CompactVesselTrack으로 변환
|
|
||||||
List<CompactVesselTrack> result = TrackConverter.convert(rawTracks, this::getVesselInfo);
|
|
||||||
|
|
||||||
// 통합선박 필터링 적용
|
|
||||||
if (filterByIntegration && integrationVesselService.isEnabled()) {
|
|
||||||
result = filterByIntegration(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
log.debug("V2 API: Fetched {} compact tracks for area {} in last {} minutes",
|
|
||||||
result.size(), areaId, minutes);
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TrackResponse 매핑
|
* 선박별 항적 조회 V2 (캐시 + Semaphore + 간소화)
|
||||||
|
* DailyTrackCacheManager를 활용한 캐시 우선 조회
|
||||||
*/
|
*/
|
||||||
|
public List<CompactVesselTrack> getVesselTracksV2(VesselTracksRequest request) {
|
||||||
|
String queryId = "rest-vessels-" + UUID.randomUUID().toString().substring(0, 8);
|
||||||
|
boolean slotAcquired = false;
|
||||||
|
|
||||||
|
try {
|
||||||
|
slotAcquired = acquireSlotWithWait(queryId);
|
||||||
|
|
||||||
|
List<CompactVesselTrack> result;
|
||||||
|
|
||||||
|
if (dailyTrackCacheManager.isEnabled() &&
|
||||||
|
dailyTrackCacheManager.getStatus() != DailyTrackCacheManager.CacheStatus.NOT_STARTED &&
|
||||||
|
dailyTrackCacheManager.getStatus() != DailyTrackCacheManager.CacheStatus.DISABLED) {
|
||||||
|
|
||||||
|
result = queryWithCache(request);
|
||||||
|
} else {
|
||||||
|
// 캐시 비활성화/미준비: 기존 GisService에 위임
|
||||||
|
result = gisService.getVesselTracks(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
result = applySimplificationPipeline(result);
|
||||||
|
|
||||||
|
log.debug("V2 API: Returned {} tracks for {} vessels (cache={})",
|
||||||
|
result.size(), request.getVessels().size(), dailyTrackCacheManager.isEnabled());
|
||||||
|
|
||||||
|
return result;
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (slotAcquired) {
|
||||||
|
activeQueryManager.releaseQuerySlot(queryId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 캐시 조회 로직 ──
|
||||||
|
|
||||||
|
/**
|
||||||
|
* splitQueryRange를 사용한 캐시 우선 조회
|
||||||
|
* D-1부터 역순으로 캐시 존재 확인 → 캐시/DB 분리 조회 → 병합
|
||||||
|
*/
|
||||||
|
private List<CompactVesselTrack> queryWithCache(VesselTracksRequest request) {
|
||||||
|
LocalDateTime startTime = request.getStartTime();
|
||||||
|
LocalDateTime endTime = request.getEndTime();
|
||||||
|
|
||||||
|
DailyTrackCacheManager.SplitQueryResult split =
|
||||||
|
dailyTrackCacheManager.splitQueryRange(startTime, endTime);
|
||||||
|
|
||||||
|
List<CompactVesselTrack> allTracks = new ArrayList<>();
|
||||||
|
|
||||||
|
// 요청 선박 ID 집합 구성
|
||||||
|
Set<String> requestedVesselKeys = request.getVessels().stream()
|
||||||
|
.map(v -> v.getSigSrcCd() + "_" + v.getTargetId())
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
|
// 1. 캐시에서 조회 (캐시된 날짜)
|
||||||
|
if (split.hasCachedData()) {
|
||||||
|
List<CompactVesselTrack> cachedTracks =
|
||||||
|
dailyTrackCacheManager.getCachedTracksMultipleDays(split.getCachedDates());
|
||||||
|
|
||||||
|
// 요청 선박만 필터링
|
||||||
|
List<CompactVesselTrack> filteredCached = cachedTracks.stream()
|
||||||
|
.filter(t -> requestedVesselKeys.contains(t.getSigSrcCd() + "_" + t.getTargetId()))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
allTracks.addAll(filteredCached);
|
||||||
|
log.debug("[CacheQuery] cached {} days -> {} tracks (filtered from {})",
|
||||||
|
split.getCachedDates().size(), filteredCached.size(), cachedTracks.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. DB에서 조회 (캐시 미적중 과거 날짜)
|
||||||
|
if (split.hasDbRanges()) {
|
||||||
|
for (DailyTrackCacheManager.DateRange dbRange : split.getDbRanges()) {
|
||||||
|
VesselTracksRequest dbRequest = VesselTracksRequest.builder()
|
||||||
|
.startTime(dbRange.getStart())
|
||||||
|
.endTime(dbRange.getEnd())
|
||||||
|
.vessels(request.getVessels())
|
||||||
|
.isIntegration(request.getIsIntegration())
|
||||||
|
.build();
|
||||||
|
List<CompactVesselTrack> dbTracks = gisService.getVesselTracks(dbRequest);
|
||||||
|
allTracks.addAll(dbTracks);
|
||||||
|
log.debug("[CacheQuery] DB range {} ~ {} -> {} tracks",
|
||||||
|
dbRange.getStart(), dbRange.getEnd(), dbTracks.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 오늘 구간 DB 조회 (hourly + 5min)
|
||||||
|
if (split.hasTodayRange()) {
|
||||||
|
DailyTrackCacheManager.DateRange today = split.getTodayRange();
|
||||||
|
VesselTracksRequest todayRequest = VesselTracksRequest.builder()
|
||||||
|
.startTime(today.getStart())
|
||||||
|
.endTime(today.getEnd())
|
||||||
|
.vessels(request.getVessels())
|
||||||
|
.isIntegration(request.getIsIntegration())
|
||||||
|
.build();
|
||||||
|
List<CompactVesselTrack> todayTracks = gisService.getVesselTracks(todayRequest);
|
||||||
|
allTracks.addAll(todayTracks);
|
||||||
|
log.debug("[CacheQuery] today {} ~ {} -> {} tracks",
|
||||||
|
today.getStart(), today.getEnd(), todayTracks.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. 동일 선박 병합 (캐시 + DB 결과)
|
||||||
|
List<CompactVesselTrack> merged = mergeTracksByVessel(allTracks);
|
||||||
|
|
||||||
|
// 5. 통합선박 필터링
|
||||||
|
if ("1".equals(request.getIsIntegration()) && integrationVesselService.isEnabled()) {
|
||||||
|
merged = filterByIntegration(merged);
|
||||||
|
}
|
||||||
|
|
||||||
|
return merged;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Semaphore 슬롯 획득 ──
|
||||||
|
|
||||||
|
/**
|
||||||
|
* REST V2 전용 슬롯 획득: 즉시 시도 → blocking 대기 → 타임아웃 시 예외
|
||||||
|
*/
|
||||||
|
private boolean acquireSlotWithWait(String queryId) {
|
||||||
|
if (activeQueryManager.tryAcquireQuerySlotImmediate(queryId)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
boolean acquired = activeQueryManager.tryAcquireQuerySlot(queryId);
|
||||||
|
if (!acquired) {
|
||||||
|
throw new QueryTimeoutException(
|
||||||
|
"서버가 과부하 상태입니다. " + restQueryTimeout + "초 대기 후 타임아웃되었습니다.");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new QueryTimeoutException("쿼리 대기 중 인터럽트가 발생했습니다.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 간소화 파이프라인 ──
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 2단계 간소화 파이프라인
|
||||||
|
* [1단계] 표준 간소화 (DP + 거리/시간 + 줌)
|
||||||
|
* [2단계] 포인트 버짓 강제 (총 포인트 상한 초과 시 균일 Nth-point)
|
||||||
|
*/
|
||||||
|
private List<CompactVesselTrack> applySimplificationPipeline(List<CompactVesselTrack> tracks) {
|
||||||
|
if (tracks == null || tracks.isEmpty()) {
|
||||||
|
return tracks;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1단계: 표준 간소화
|
||||||
|
tracks = cacheTrackSimplifier.simplify(tracks, CacheTrackSimplifier.SimplificationConfig.builder().build());
|
||||||
|
|
||||||
|
// 2단계: 포인트 버짓 강제
|
||||||
|
tracks = cacheTrackSimplifier.enforcePointBudget(tracks, maxTotalPoints);
|
||||||
|
|
||||||
|
return tracks;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 선박별 트랙 병합 ──
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 동일 선박(vesselId)의 트랙을 병합
|
||||||
|
* 캐시와 DB에서 동일 선박 데이터가 올 수 있으므로 geometry/timestamps/speeds 합산
|
||||||
|
*/
|
||||||
|
private List<CompactVesselTrack> mergeTracksByVessel(List<CompactVesselTrack> tracks) {
|
||||||
|
if (tracks == null || tracks.size() <= 1) {
|
||||||
|
return tracks != null ? tracks : Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, List<CompactVesselTrack>> grouped = tracks.stream()
|
||||||
|
.collect(Collectors.groupingBy(t -> t.getSigSrcCd() + "_" + t.getTargetId()));
|
||||||
|
|
||||||
|
// 병합이 필요 없는 경우 (모든 선박이 1개씩만)
|
||||||
|
if (grouped.values().stream().allMatch(list -> list.size() == 1)) {
|
||||||
|
return tracks;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<CompactVesselTrack> merged = new ArrayList<>();
|
||||||
|
|
||||||
|
for (Map.Entry<String, List<CompactVesselTrack>> entry : grouped.entrySet()) {
|
||||||
|
List<CompactVesselTrack> vesselTracks = entry.getValue();
|
||||||
|
|
||||||
|
if (vesselTracks.size() == 1) {
|
||||||
|
merged.add(vesselTracks.get(0));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 첫 번째 트랙을 기준으로 병합
|
||||||
|
CompactVesselTrack base = vesselTracks.get(0);
|
||||||
|
List<double[]> allGeometry = new ArrayList<>(base.getGeometry() != null ? base.getGeometry() : Collections.emptyList());
|
||||||
|
List<String> allTimestamps = new ArrayList<>(base.getTimestamps() != null ? base.getTimestamps() : Collections.emptyList());
|
||||||
|
List<Double> allSpeeds = new ArrayList<>(base.getSpeeds() != null ? base.getSpeeds() : Collections.emptyList());
|
||||||
|
double totalDistance = base.getTotalDistance();
|
||||||
|
double maxSpeed = base.getMaxSpeed();
|
||||||
|
int totalPointCount = base.getPointCount();
|
||||||
|
|
||||||
|
for (int i = 1; i < vesselTracks.size(); i++) {
|
||||||
|
CompactVesselTrack t = vesselTracks.get(i);
|
||||||
|
if (t.getGeometry() != null) allGeometry.addAll(t.getGeometry());
|
||||||
|
if (t.getTimestamps() != null) allTimestamps.addAll(t.getTimestamps());
|
||||||
|
if (t.getSpeeds() != null) allSpeeds.addAll(t.getSpeeds());
|
||||||
|
totalDistance += t.getTotalDistance();
|
||||||
|
maxSpeed = Math.max(maxSpeed, t.getMaxSpeed());
|
||||||
|
totalPointCount += t.getPointCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
CompactVesselTrack mergedTrack = CompactVesselTrack.builder()
|
||||||
|
.vesselId(base.getVesselId())
|
||||||
|
.sigSrcCd(base.getSigSrcCd())
|
||||||
|
.targetId(base.getTargetId())
|
||||||
|
.nationalCode(base.getNationalCode())
|
||||||
|
.shipName(base.getShipName())
|
||||||
|
.shipType(base.getShipType())
|
||||||
|
.shipKindCode(base.getShipKindCode())
|
||||||
|
.integrationTargetId(base.getIntegrationTargetId())
|
||||||
|
.geometry(allGeometry)
|
||||||
|
.timestamps(allTimestamps)
|
||||||
|
.speeds(allSpeeds)
|
||||||
|
.totalDistance(totalDistance)
|
||||||
|
.avgSpeed(base.getAvgSpeed())
|
||||||
|
.maxSpeed(maxSpeed)
|
||||||
|
.pointCount(totalPointCount)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
merged.add(mergedTrack);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.debug("[MergeVessels] {} tracks -> {} merged vessels", tracks.size(), merged.size());
|
||||||
|
return merged;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 기존 유틸리티 메서드 (변경 없음) ──
|
||||||
|
|
||||||
private TrackResponse mapTrackResponse(ResultSet rs, int rowNum) throws SQLException {
|
private TrackResponse mapTrackResponse(ResultSet rs, int rowNum) throws SQLException {
|
||||||
return TrackResponse.builder()
|
return TrackResponse.builder()
|
||||||
.sigSrcCd(rs.getString("sig_src_cd"))
|
.sigSrcCd(rs.getString("sig_src_cd"))
|
||||||
@ -234,9 +513,6 @@ public class GisServiceV2 {
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 선박 정보 조회 (캐시 우선)
|
|
||||||
*/
|
|
||||||
private TrackConverter.VesselInfo getVesselInfo(String sigSrcCd, String targetId) {
|
private TrackConverter.VesselInfo getVesselInfo(String sigSrcCd, String targetId) {
|
||||||
String cacheKey = sigSrcCd + "_" + targetId;
|
String cacheKey = sigSrcCd + "_" + targetId;
|
||||||
|
|
||||||
@ -245,7 +521,6 @@ public class GisServiceV2 {
|
|||||||
return new TrackConverter.VesselInfo(cached.shipName, cached.shipType);
|
return new TrackConverter.VesselInfo(cached.shipName, cached.shipType);
|
||||||
}
|
}
|
||||||
|
|
||||||
// DB 조회
|
|
||||||
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
|
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
|
||||||
try {
|
try {
|
||||||
String sql = """
|
String sql = """
|
||||||
@ -259,7 +534,6 @@ public class GisServiceV2 {
|
|||||||
String shipName = result.get("ship_nm") != null ? result.get("ship_nm").toString() : "-";
|
String shipName = result.get("ship_nm") != null ? result.get("ship_nm").toString() : "-";
|
||||||
String shipType = result.get("ship_ty") != null ? result.get("ship_ty").toString() : "-";
|
String shipType = result.get("ship_ty") != null ? result.get("ship_ty").toString() : "-";
|
||||||
|
|
||||||
// 캐시 저장
|
|
||||||
vesselInfoCache.put(cacheKey, new VesselInfoCache(shipName, shipType));
|
vesselInfoCache.put(cacheKey, new VesselInfoCache(shipName, shipType));
|
||||||
|
|
||||||
return new TrackConverter.VesselInfo(shipName, shipType);
|
return new TrackConverter.VesselInfo(shipName, shipType);
|
||||||
@ -268,15 +542,11 @@ public class GisServiceV2 {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 통합선박 기준 필터링
|
|
||||||
*/
|
|
||||||
private List<CompactVesselTrack> filterByIntegration(List<CompactVesselTrack> tracks) {
|
private List<CompactVesselTrack> filterByIntegration(List<CompactVesselTrack> tracks) {
|
||||||
if (tracks == null || tracks.isEmpty()) {
|
if (tracks == null || tracks.isEmpty()) {
|
||||||
return tracks;
|
return tracks;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1. 모든 트랙의 통합선박 정보 조회
|
|
||||||
Map<String, IntegrationVessel> vesselIntegrations = new HashMap<>();
|
Map<String, IntegrationVessel> vesselIntegrations = new HashMap<>();
|
||||||
for (CompactVesselTrack track : tracks) {
|
for (CompactVesselTrack track : tracks) {
|
||||||
String key = track.getSigSrcCd() + "_" + track.getTargetId();
|
String key = track.getSigSrcCd() + "_" + track.getTargetId();
|
||||||
@ -288,7 +558,6 @@ public class GisServiceV2 {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. 통합선박별 그룹핑
|
|
||||||
Map<Long, List<CompactVesselTrack>> groupedByIntegration = new HashMap<>();
|
Map<Long, List<CompactVesselTrack>> groupedByIntegration = new HashMap<>();
|
||||||
Map<Long, IntegrationVessel> integrationMap = new HashMap<>();
|
Map<Long, IntegrationVessel> integrationMap = new HashMap<>();
|
||||||
|
|
||||||
@ -308,7 +577,6 @@ public class GisServiceV2 {
|
|||||||
groupedByIntegration.computeIfAbsent(seq, k -> new ArrayList<>()).add(track);
|
groupedByIntegration.computeIfAbsent(seq, k -> new ArrayList<>()).add(track);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. 각 그룹에서 최고 우선순위 신호만 선택
|
|
||||||
List<CompactVesselTrack> result = new ArrayList<>();
|
List<CompactVesselTrack> result = new ArrayList<>();
|
||||||
|
|
||||||
for (Map.Entry<Long, List<CompactVesselTrack>> entry : groupedByIntegration.entrySet()) {
|
for (Map.Entry<Long, List<CompactVesselTrack>> entry : groupedByIntegration.entrySet()) {
|
||||||
@ -316,7 +584,6 @@ public class GisServiceV2 {
|
|||||||
List<CompactVesselTrack> groupTracks = entry.getValue();
|
List<CompactVesselTrack> groupTracks = entry.getValue();
|
||||||
|
|
||||||
if (seq < 0) {
|
if (seq < 0) {
|
||||||
// 통합정보 없는 단독 선박
|
|
||||||
CompactVesselTrack firstTrack = groupTracks.get(0);
|
CompactVesselTrack firstTrack = groupTracks.get(0);
|
||||||
String soloIntegrationId = IntegrationSignalConstants.generateSoloIntegrationId(
|
String soloIntegrationId = IntegrationSignalConstants.generateSoloIntegrationId(
|
||||||
firstTrack.getSigSrcCd(),
|
firstTrack.getSigSrcCd(),
|
||||||
@ -325,7 +592,6 @@ public class GisServiceV2 {
|
|||||||
groupTracks.forEach(t -> t.setIntegrationTargetId(soloIntegrationId));
|
groupTracks.forEach(t -> t.setIntegrationTargetId(soloIntegrationId));
|
||||||
result.addAll(groupTracks);
|
result.addAll(groupTracks);
|
||||||
} else {
|
} else {
|
||||||
// 통합선박 → 존재하는 신호 중 최고 우선순위 선택
|
|
||||||
IntegrationVessel integration = integrationMap.get(seq);
|
IntegrationVessel integration = integrationMap.get(seq);
|
||||||
|
|
||||||
Set<String> existingSigSrcCds = groupTracks.stream()
|
Set<String> existingSigSrcCds = groupTracks.stream()
|
||||||
@ -349,9 +615,6 @@ public class GisServiceV2 {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 선박 정보 캐시 내부 클래스
|
|
||||||
*/
|
|
||||||
private static class VesselInfoCache {
|
private static class VesselInfoCache {
|
||||||
String shipName;
|
String shipName;
|
||||||
String shipType;
|
String shipType;
|
||||||
|
|||||||
@ -16,7 +16,7 @@ import java.util.List;
|
|||||||
* LineStringM 대신 단순 배열로 전송하여 프론트엔드 파싱 부하 제거
|
* LineStringM 대신 단순 배열로 전송하여 프론트엔드 파싱 부하 제거
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@Builder
|
@Builder(toBuilder = true)
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||||
|
|||||||
@ -0,0 +1,16 @@
|
|||||||
|
package gc.mda.signal_batch.global.exception;
|
||||||
|
|
||||||
|
import org.springframework.http.HttpStatus;
|
||||||
|
import org.springframework.web.bind.annotation.ResponseStatus;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* REST V2 쿼리 슬롯 대기 타임아웃 시 발생
|
||||||
|
* 503 Service Unavailable 자동 반환
|
||||||
|
*/
|
||||||
|
@ResponseStatus(HttpStatus.SERVICE_UNAVAILABLE)
|
||||||
|
public class QueryTimeoutException extends RuntimeException {
|
||||||
|
|
||||||
|
public QueryTimeoutException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,430 @@
|
|||||||
|
package gc.mda.signal_batch.global.websocket.service;
|
||||||
|
|
||||||
|
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.time.temporal.ChronoUnit;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 인메모리 캐시 트랙 간소화 유틸리티
|
||||||
|
* DB 경로의 ST_Simplify + 거리/시간 샘플링 + 줌 샘플링과 동등한 간소화를 캐시 경로에 적용
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class CacheTrackSimplifier {
|
||||||
|
|
||||||
|
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||||
|
private static final double EARTH_RADIUS_NM = 3440.065;
|
||||||
|
|
||||||
|
// ── SimplificationConfig: 향후 요청 파라미터로 노출 가능한 구조 ──
|
||||||
|
@Data
|
||||||
|
@Builder
|
||||||
|
public static class SimplificationConfig {
|
||||||
|
/** Douglas-Peucker tolerance (ST_Simplify 대체, degree 단위) */
|
||||||
|
@Builder.Default private double dpTolerance = 0.005;
|
||||||
|
/** 거리 기반 샘플링 최소 거리 (해리) */
|
||||||
|
@Builder.Default private double minDistanceNm = 0.05;
|
||||||
|
/** 시간 기반 샘플링 최소 간격 (분) */
|
||||||
|
@Builder.Default private long minIntervalMinutes = 60;
|
||||||
|
/** 저속 선박 속도 임계값 (knots) */
|
||||||
|
@Builder.Default private double lowSpeedThreshold = 5.0;
|
||||||
|
/** 저속 선박 최소 거리 (해리) */
|
||||||
|
@Builder.Default private double lowSpeedMinDistanceNm = 1.5;
|
||||||
|
/** 저속 선박 최소 시간 간격 (분) */
|
||||||
|
@Builder.Default private long lowSpeedMinIntervalMinutes = 45;
|
||||||
|
/** 줌 기반 N번째 포인트 샘플링 (null = 미적용) */
|
||||||
|
@Builder.Default private Integer zoomSampleRate = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── zoom → config 팩토리 ──
|
||||||
|
public static SimplificationConfig forZoomLevel(Integer zoomLevel) {
|
||||||
|
return SimplificationConfig.builder()
|
||||||
|
.dpTolerance(getDpTolerance(zoomLevel))
|
||||||
|
.zoomSampleRate(getZoomSampleRate(zoomLevel))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static double getDpTolerance(Integer zoom) {
|
||||||
|
if (zoom == null) zoom = 10;
|
||||||
|
return switch (zoom) {
|
||||||
|
case 0, 1, 2, 3, 4, 5 -> 0.05;
|
||||||
|
case 6, 7 -> 0.02;
|
||||||
|
case 8, 9 -> 0.01;
|
||||||
|
case 10, 11 -> 0.005;
|
||||||
|
default -> 0.002;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Integer getZoomSampleRate(Integer zoom) {
|
||||||
|
if (zoom == null || zoom >= 10) return null;
|
||||||
|
if (zoom < 6) return 10;
|
||||||
|
if (zoom < 8) return 5;
|
||||||
|
return 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 메인 API ──
|
||||||
|
|
||||||
|
public List<CompactVesselTrack> simplify(List<CompactVesselTrack> tracks, Integer zoomLevel) {
|
||||||
|
return simplify(tracks, forZoomLevel(zoomLevel));
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<CompactVesselTrack> simplify(List<CompactVesselTrack> tracks, SimplificationConfig config) {
|
||||||
|
if (tracks == null || tracks.isEmpty()) {
|
||||||
|
return tracks;
|
||||||
|
}
|
||||||
|
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
int totalOriginalPoints = 0;
|
||||||
|
int totalAfterDp = 0;
|
||||||
|
int totalAfterDistTime = 0;
|
||||||
|
int totalAfterZoom = 0;
|
||||||
|
int simplifiedCount = 0;
|
||||||
|
int skippedCount = 0;
|
||||||
|
|
||||||
|
for (int t = 0; t < tracks.size(); t++) {
|
||||||
|
CompactVesselTrack track = tracks.get(t);
|
||||||
|
if (track.getGeometry() == null || track.getGeometry().size() <= 2) {
|
||||||
|
skippedCount++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int originalSize = track.getGeometry().size();
|
||||||
|
totalOriginalPoints += originalSize;
|
||||||
|
|
||||||
|
// 1단계: Douglas-Peucker
|
||||||
|
applyDouglasPeucker(track, config.getDpTolerance());
|
||||||
|
int afterDp = track.getGeometry().size();
|
||||||
|
totalAfterDp += afterDp;
|
||||||
|
|
||||||
|
// 2단계: 거리/시간 샘플링
|
||||||
|
applyDistanceTimeSampling(track, config);
|
||||||
|
int afterDistTime = track.getGeometry().size();
|
||||||
|
totalAfterDistTime += afterDistTime;
|
||||||
|
|
||||||
|
// 3단계: 줌 기반 추가 샘플링
|
||||||
|
if (config.getZoomSampleRate() != null) {
|
||||||
|
applyZoomSampling(track, config.getZoomSampleRate());
|
||||||
|
}
|
||||||
|
int afterZoom = track.getGeometry().size();
|
||||||
|
totalAfterZoom += afterZoom;
|
||||||
|
|
||||||
|
track.setPointCount(afterZoom);
|
||||||
|
|
||||||
|
// 처음 5개 선박 상세 로그 (debug 레벨)
|
||||||
|
if (simplifiedCount < 5) {
|
||||||
|
log.debug("[CacheSimplify] vessel={} original={} -> DP={} -> distTime={} -> zoom={} (avg={} kn)",
|
||||||
|
track.getVesselId(), originalSize, afterDp, afterDistTime, afterZoom,
|
||||||
|
track.getAvgSpeed() != null ? String.format("%.1f", track.getAvgSpeed()) : "N/A");
|
||||||
|
}
|
||||||
|
simplifiedCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
long elapsed = System.currentTimeMillis() - startTime;
|
||||||
|
|
||||||
|
if (simplifiedCount > 0) {
|
||||||
|
double totalReduction = (1 - (double) totalAfterZoom / totalOriginalPoints) * 100;
|
||||||
|
|
||||||
|
log.info("[CacheSimplify] tracks={} (skip={}), {} -> {} pts ({}% 감소), {}ms",
|
||||||
|
simplifiedCount, skippedCount,
|
||||||
|
totalOriginalPoints, totalAfterZoom, Math.round(totalReduction), elapsed);
|
||||||
|
}
|
||||||
|
|
||||||
|
return tracks;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 1단계: Douglas-Peucker (ST_Simplify 대체) ──
|
||||||
|
|
||||||
|
private void applyDouglasPeucker(CompactVesselTrack track, double tolerance) {
|
||||||
|
List<double[]> geometry = track.getGeometry();
|
||||||
|
int n = geometry.size();
|
||||||
|
if (n <= 2) return;
|
||||||
|
|
||||||
|
boolean[] keep = new boolean[n];
|
||||||
|
keep[0] = true;
|
||||||
|
keep[n - 1] = true;
|
||||||
|
|
||||||
|
douglasPeuckerRecursive(geometry, 0, n - 1, tolerance, keep);
|
||||||
|
|
||||||
|
List<double[]> newGeometry = new ArrayList<>();
|
||||||
|
List<String> newTimestamps = new ArrayList<>();
|
||||||
|
List<Double> newSpeeds = new ArrayList<>();
|
||||||
|
|
||||||
|
List<String> timestamps = track.getTimestamps();
|
||||||
|
List<Double> speeds = track.getSpeeds();
|
||||||
|
|
||||||
|
for (int i = 0; i < n; i++) {
|
||||||
|
if (keep[i]) {
|
||||||
|
newGeometry.add(geometry.get(i));
|
||||||
|
if (timestamps != null && i < timestamps.size()) {
|
||||||
|
newTimestamps.add(timestamps.get(i));
|
||||||
|
}
|
||||||
|
if (speeds != null && i < speeds.size()) {
|
||||||
|
newSpeeds.add(speeds.get(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
track.setGeometry(newGeometry);
|
||||||
|
track.setTimestamps(newTimestamps);
|
||||||
|
track.setSpeeds(newSpeeds);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void douglasPeuckerRecursive(List<double[]> points, int start, int end, double tolerance, boolean[] keep) {
|
||||||
|
if (end - start < 2) return;
|
||||||
|
|
||||||
|
double maxDist = 0;
|
||||||
|
int maxIndex = start;
|
||||||
|
|
||||||
|
double[] p1 = points.get(start);
|
||||||
|
double[] p2 = points.get(end);
|
||||||
|
|
||||||
|
for (int i = start + 1; i < end; i++) {
|
||||||
|
double dist = perpendicularDistance(points.get(i), p1, p2);
|
||||||
|
if (dist > maxDist) {
|
||||||
|
maxDist = dist;
|
||||||
|
maxIndex = i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (maxDist > tolerance) {
|
||||||
|
keep[maxIndex] = true;
|
||||||
|
douglasPeuckerRecursive(points, start, maxIndex, tolerance, keep);
|
||||||
|
douglasPeuckerRecursive(points, maxIndex, end, tolerance, keep);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 점-선분 직교 거리 (degree 단위, ST_Simplify와 동일)
|
||||||
|
*/
|
||||||
|
private double perpendicularDistance(double[] point, double[] lineStart, double[] lineEnd) {
|
||||||
|
double dx = lineEnd[0] - lineStart[0];
|
||||||
|
double dy = lineEnd[1] - lineStart[1];
|
||||||
|
|
||||||
|
if (dx == 0 && dy == 0) {
|
||||||
|
// 선분이 점인 경우
|
||||||
|
double pdx = point[0] - lineStart[0];
|
||||||
|
double pdy = point[1] - lineStart[1];
|
||||||
|
return Math.sqrt(pdx * pdx + pdy * pdy);
|
||||||
|
}
|
||||||
|
|
||||||
|
double t = ((point[0] - lineStart[0]) * dx + (point[1] - lineStart[1]) * dy) / (dx * dx + dy * dy);
|
||||||
|
t = Math.max(0, Math.min(1, t));
|
||||||
|
|
||||||
|
double nearestX = lineStart[0] + t * dx;
|
||||||
|
double nearestY = lineStart[1] + t * dy;
|
||||||
|
|
||||||
|
double distX = point[0] - nearestX;
|
||||||
|
double distY = point[1] - nearestY;
|
||||||
|
|
||||||
|
return Math.sqrt(distX * distX + distY * distY);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 2단계: 거리/시간 기반 샘플링 ──
|
||||||
|
|
||||||
|
private void applyDistanceTimeSampling(CompactVesselTrack track, SimplificationConfig config) {
|
||||||
|
List<double[]> geometry = track.getGeometry();
|
||||||
|
List<String> timestamps = track.getTimestamps();
|
||||||
|
List<Double> speeds = track.getSpeeds();
|
||||||
|
|
||||||
|
if (geometry.size() <= 2) return;
|
||||||
|
|
||||||
|
List<double[]> simplified = new ArrayList<>();
|
||||||
|
List<String> simplifiedTs = new ArrayList<>();
|
||||||
|
List<Double> simplifiedSpd = new ArrayList<>();
|
||||||
|
|
||||||
|
double[] prevPoint = null;
|
||||||
|
LocalDateTime prevTime = null;
|
||||||
|
|
||||||
|
for (int i = 0; i < geometry.size(); i++) {
|
||||||
|
double[] point = geometry.get(i);
|
||||||
|
boolean include = false;
|
||||||
|
|
||||||
|
if (i == 0 || i == geometry.size() - 1) {
|
||||||
|
include = true;
|
||||||
|
} else if (prevPoint != null && prevTime != null && timestamps != null && i < timestamps.size()) {
|
||||||
|
LocalDateTime currentTime = parseTimestamp(timestamps.get(i));
|
||||||
|
if (currentTime != null) {
|
||||||
|
double distance = calculateDistance(prevPoint[1], prevPoint[0], point[1], point[0]);
|
||||||
|
long minutesSincePrev = ChronoUnit.MINUTES.between(prevTime, currentTime);
|
||||||
|
|
||||||
|
// DAILY 전략 기준 (DB 경로와 동일)
|
||||||
|
include = distance > config.getMinDistanceNm() || minutesSincePrev >= config.getMinIntervalMinutes();
|
||||||
|
|
||||||
|
// 저속 선박 강화 간소화
|
||||||
|
if (track.getAvgSpeed() != null && track.getAvgSpeed() > 0 && track.getAvgSpeed() < config.getLowSpeedThreshold()) {
|
||||||
|
include = distance > config.getLowSpeedMinDistanceNm() || minutesSincePrev >= config.getLowSpeedMinIntervalMinutes();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (include) {
|
||||||
|
simplified.add(point);
|
||||||
|
if (timestamps != null && i < timestamps.size()) {
|
||||||
|
simplifiedTs.add(timestamps.get(i));
|
||||||
|
}
|
||||||
|
if (speeds != null && i < speeds.size()) {
|
||||||
|
simplifiedSpd.add(speeds.get(i));
|
||||||
|
}
|
||||||
|
prevPoint = point;
|
||||||
|
if (timestamps != null && i < timestamps.size()) {
|
||||||
|
prevTime = parseTimestamp(timestamps.get(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
track.setGeometry(simplified);
|
||||||
|
track.setTimestamps(simplifiedTs);
|
||||||
|
track.setSpeeds(simplifiedSpd);
|
||||||
|
}
|
||||||
|
|
||||||
|
private LocalDateTime parseTimestamp(String tsStr) {
|
||||||
|
if (tsStr == null) return null;
|
||||||
|
try {
|
||||||
|
if (tsStr.matches("\\d{10,}")) {
|
||||||
|
return LocalDateTime.ofInstant(
|
||||||
|
java.time.Instant.ofEpochSecond(Long.parseLong(tsStr)),
|
||||||
|
java.time.ZoneId.systemDefault());
|
||||||
|
} else {
|
||||||
|
return LocalDateTime.parse(tsStr, TIMESTAMP_FORMATTER);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 3단계: 줌 기반 추가 샘플링 ──
|
||||||
|
|
||||||
|
private void applyZoomSampling(CompactVesselTrack track, int sampleRate) {
|
||||||
|
List<double[]> geometry = track.getGeometry();
|
||||||
|
List<String> timestamps = track.getTimestamps();
|
||||||
|
List<Double> speeds = track.getSpeeds();
|
||||||
|
|
||||||
|
if (geometry.size() <= 2 || sampleRate <= 1) return;
|
||||||
|
|
||||||
|
List<double[]> sampled = new ArrayList<>();
|
||||||
|
List<String> sampledTs = new ArrayList<>();
|
||||||
|
List<Double> sampledSpd = new ArrayList<>();
|
||||||
|
|
||||||
|
for (int i = 0; i < geometry.size(); i++) {
|
||||||
|
if (i % sampleRate == 0 || i == geometry.size() - 1) {
|
||||||
|
sampled.add(geometry.get(i));
|
||||||
|
if (timestamps != null && i < timestamps.size()) {
|
||||||
|
sampledTs.add(timestamps.get(i));
|
||||||
|
}
|
||||||
|
if (speeds != null && i < speeds.size()) {
|
||||||
|
sampledSpd.add(speeds.get(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
track.setGeometry(sampled);
|
||||||
|
track.setTimestamps(sampledTs);
|
||||||
|
track.setSpeeds(sampledSpd);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 포인트 버짓 강제 (REST V2 응답 크기 제한) ──
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 전체 트랙의 총 포인트가 상한을 초과하면 비율 기반 균등 분배 샘플링 적용.
|
||||||
|
* 모든 트랙에 동일한 비율(ratio)을 적용하여 일관적 간소화 보장.
|
||||||
|
*
|
||||||
|
* 예시: 총 51만, 상한 50만 → ratio=0.98, 각 트랙 포인트 2%만 제거 → 결과 ≈50만
|
||||||
|
* 예시: 총 100만, 상한 50만 → ratio=0.5, 각 트랙 포인트 50% 제거 → 결과 ≈50만
|
||||||
|
*/
|
||||||
|
public List<CompactVesselTrack> enforcePointBudget(List<CompactVesselTrack> tracks, int maxTotalPoints) {
|
||||||
|
if (tracks == null || tracks.isEmpty() || maxTotalPoints <= 0) {
|
||||||
|
return tracks;
|
||||||
|
}
|
||||||
|
|
||||||
|
int totalPoints = tracks.stream()
|
||||||
|
.mapToInt(t -> t.getGeometry() != null ? t.getGeometry().size() : 0)
|
||||||
|
.sum();
|
||||||
|
|
||||||
|
if (totalPoints <= maxTotalPoints) {
|
||||||
|
log.debug("[PointBudget] totalPoints={} <= max={}, no additional simplification needed",
|
||||||
|
totalPoints, maxTotalPoints);
|
||||||
|
return tracks;
|
||||||
|
}
|
||||||
|
|
||||||
|
double ratio = (double) maxTotalPoints / totalPoints;
|
||||||
|
log.info("[PointBudget] totalPoints={} > max={}, applying uniform ratio={} to {} tracks",
|
||||||
|
totalPoints, maxTotalPoints, String.format("%.4f", ratio), tracks.size());
|
||||||
|
|
||||||
|
int resultPoints = 0;
|
||||||
|
for (CompactVesselTrack track : tracks) {
|
||||||
|
if (track.getGeometry() != null && track.getGeometry().size() > 2) {
|
||||||
|
int trackPoints = track.getGeometry().size();
|
||||||
|
int targetPoints = Math.max(2, (int) Math.round(trackPoints * ratio));
|
||||||
|
|
||||||
|
if (targetPoints < trackPoints) {
|
||||||
|
applyProportionalSampling(track, targetPoints);
|
||||||
|
track.setPointCount(track.getGeometry().size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resultPoints += track.getGeometry() != null ? track.getGeometry().size() : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("[PointBudget] reduced {} -> {} points ({} tracks)",
|
||||||
|
totalPoints, resultPoints, tracks.size());
|
||||||
|
|
||||||
|
return tracks;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 비율 기반 균등 분배 샘플링: targetPoints 개수만큼 균등 간격으로 포인트 선택.
|
||||||
|
* 첫 포인트와 마지막 포인트는 항상 보존.
|
||||||
|
*/
|
||||||
|
private void applyProportionalSampling(CompactVesselTrack track, int targetPoints) {
|
||||||
|
List<double[]> geometry = track.getGeometry();
|
||||||
|
List<String> timestamps = track.getTimestamps();
|
||||||
|
List<Double> speeds = track.getSpeeds();
|
||||||
|
int n = geometry.size();
|
||||||
|
|
||||||
|
if (targetPoints >= n || targetPoints < 2) return;
|
||||||
|
|
||||||
|
List<double[]> sampled = new ArrayList<>(targetPoints);
|
||||||
|
List<String> sampledTs = (timestamps != null) ? new ArrayList<>(targetPoints) : null;
|
||||||
|
List<Double> sampledSpd = (speeds != null) ? new ArrayList<>(targetPoints) : null;
|
||||||
|
|
||||||
|
double stride = (double) (n - 1) / (targetPoints - 1);
|
||||||
|
|
||||||
|
for (int i = 0; i < targetPoints; i++) {
|
||||||
|
int index = (i == targetPoints - 1) ? n - 1 : (int) Math.round(i * stride);
|
||||||
|
|
||||||
|
sampled.add(geometry.get(index));
|
||||||
|
if (sampledTs != null && timestamps != null && index < timestamps.size()) {
|
||||||
|
sampledTs.add(timestamps.get(index));
|
||||||
|
}
|
||||||
|
if (sampledSpd != null && speeds != null && index < speeds.size()) {
|
||||||
|
sampledSpd.add(speeds.get(index));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
track.setGeometry(sampled);
|
||||||
|
if (sampledTs != null) track.setTimestamps(sampledTs);
|
||||||
|
if (sampledSpd != null) track.setSpeeds(sampledSpd);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 거리 계산 (Haversine, 해리 단위) ──
|
||||||
|
|
||||||
|
private double calculateDistance(double lat1, double lon1, double lat2, double lon2) {
|
||||||
|
double lat1Rad = Math.toRadians(lat1);
|
||||||
|
double lat2Rad = Math.toRadians(lat2);
|
||||||
|
double deltaLat = lat2Rad - lat1Rad;
|
||||||
|
double deltaLon = Math.toRadians(lon2 - lon1);
|
||||||
|
|
||||||
|
double a = Math.sin(deltaLat / 2) * Math.sin(deltaLat / 2) +
|
||||||
|
Math.cos(lat1Rad) * Math.cos(lat2Rad) *
|
||||||
|
Math.sin(deltaLon / 2) * Math.sin(deltaLon / 2);
|
||||||
|
double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
|
||||||
|
|
||||||
|
return EARTH_RADIUS_NM * c;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -49,6 +49,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||||||
@Service
|
@Service
|
||||||
public class ChunkedTrackStreamingService {
|
public class ChunkedTrackStreamingService {
|
||||||
|
|
||||||
|
private static final org.slf4j.Logger benchmarkLog = org.slf4j.LoggerFactory.getLogger("CACHE_BENCHMARK"); // [BENCHMARK]
|
||||||
|
|
||||||
private final JdbcTemplate queryJdbcTemplate;
|
private final JdbcTemplate queryJdbcTemplate;
|
||||||
private final DataSource queryDataSource;
|
private final DataSource queryDataSource;
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
@ -57,6 +59,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
private final IntegrationVesselService integrationVesselService;
|
private final IntegrationVesselService integrationVesselService;
|
||||||
private final TrackQueryInterceptor trackQueryInterceptor;
|
private final TrackQueryInterceptor trackQueryInterceptor;
|
||||||
private final DailyTrackCacheManager dailyTrackCacheManager;
|
private final DailyTrackCacheManager dailyTrackCacheManager;
|
||||||
|
private final CacheTrackSimplifier cacheTrackSimplifier;
|
||||||
private final WKTReader wktReader = new WKTReader();
|
private final WKTReader wktReader = new WKTReader();
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
|
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
|
||||||
@ -97,7 +100,8 @@ public class ChunkedTrackStreamingService {
|
|||||||
ActiveQueryManager activeQueryManager,
|
ActiveQueryManager activeQueryManager,
|
||||||
IntegrationVesselService integrationVesselService,
|
IntegrationVesselService integrationVesselService,
|
||||||
TrackQueryInterceptor trackQueryInterceptor,
|
TrackQueryInterceptor trackQueryInterceptor,
|
||||||
DailyTrackCacheManager dailyTrackCacheManager) {
|
DailyTrackCacheManager dailyTrackCacheManager,
|
||||||
|
CacheTrackSimplifier cacheTrackSimplifier) {
|
||||||
this.queryJdbcTemplate = queryJdbcTemplate;
|
this.queryJdbcTemplate = queryJdbcTemplate;
|
||||||
this.queryDataSource = queryDataSource;
|
this.queryDataSource = queryDataSource;
|
||||||
this.simplificationStrategy = simplificationStrategy;
|
this.simplificationStrategy = simplificationStrategy;
|
||||||
@ -105,6 +109,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
this.integrationVesselService = integrationVesselService;
|
this.integrationVesselService = integrationVesselService;
|
||||||
this.trackQueryInterceptor = trackQueryInterceptor;
|
this.trackQueryInterceptor = trackQueryInterceptor;
|
||||||
this.dailyTrackCacheManager = dailyTrackCacheManager;
|
this.dailyTrackCacheManager = dailyTrackCacheManager;
|
||||||
|
this.cacheTrackSimplifier = cacheTrackSimplifier;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -138,6 +143,61 @@ public class ChunkedTrackStreamingService {
|
|||||||
private volatile int dynamicChunkSizeKB = MAX_MESSAGE_SIZE_KB;
|
private volatile int dynamicChunkSizeKB = MAX_MESSAGE_SIZE_KB;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// [BENCHMARK] 쿼리별 벤치마크 지표 수집용 내부 클래스 - 제거 시 이 클래스 전체 삭제
|
||||||
|
private static class QueryBenchmark {
|
||||||
|
int cacheHitDays = 0;
|
||||||
|
int dbQueryDays = 0;
|
||||||
|
int totalTracks = 0;
|
||||||
|
int totalPointsBefore = 0;
|
||||||
|
int totalPointsAfter = 0;
|
||||||
|
int totalBatches = 0;
|
||||||
|
int batchesBeforeSimplify = 0;
|
||||||
|
long simplifyTimeMs = 0;
|
||||||
|
long dbQueryTimeMs = 0;
|
||||||
|
// [BENCHMARK] DB 커넥션 세분화 카운터 (기존 단일 dbConnectionCount 대체)
|
||||||
|
int connViewportPass1 = 0; // collectViewportVesselIds DB 쿼리
|
||||||
|
int connDailyPages = 0; // streamDailyTableWithPagination 페이지
|
||||||
|
int connVesselInfo = 0; // preloadVesselInfoWithSessionCache 배치
|
||||||
|
int connHourly5min = 0; // processTableRange (5min/hourly)
|
||||||
|
int connTableCheck = 0; // hasDataInTable 존재 검증
|
||||||
|
Integer zoomLevel;
|
||||||
|
|
||||||
|
String determinePath() {
|
||||||
|
if (cacheHitDays > 0 && dbQueryDays > 0) return "HYBRID";
|
||||||
|
if (cacheHitDays > 0) return "CACHE";
|
||||||
|
return "DB";
|
||||||
|
}
|
||||||
|
|
||||||
|
int dbConnectionTotal() { // [BENCHMARK]
|
||||||
|
return connViewportPass1 + connDailyPages + connVesselInfo + connHourly5min + connTableCheck;
|
||||||
|
}
|
||||||
|
|
||||||
|
String toJson(String queryId, int uniqueVessels, long totalElapsedMs) {
|
||||||
|
return String.format(
|
||||||
|
"{\"queryId\":\"%s\",\"timestamp\":\"%s\",\"path\":\"%s\"," +
|
||||||
|
"\"zoomLevel\":%s,\"dateRanges\":%d,\"cacheHitDays\":%d,\"dbQueryDays\":%d," +
|
||||||
|
"\"totalTracks\":%d,\"totalPointsBefore\":%d,\"totalPointsAfter\":%d," +
|
||||||
|
"\"pointReductionPct\":%d,\"totalBatches\":%d,\"batchesBeforeSimplify\":%d," +
|
||||||
|
"\"simplifyTimeMs\":%d,\"dbQueryTimeMs\":%d,\"totalElapsedMs\":%d," +
|
||||||
|
"\"dbConnectionTotal\":%d,\"dbConnViewportPass1\":%d,\"dbConnDailyPages\":%d," +
|
||||||
|
"\"dbConnVesselInfo\":%d,\"dbConnHourly5min\":%d,\"dbConnTableCheck\":%d," +
|
||||||
|
"\"uniqueVessels\":%d}",
|
||||||
|
queryId,
|
||||||
|
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
|
||||||
|
determinePath(),
|
||||||
|
zoomLevel != null ? zoomLevel.toString() : "null",
|
||||||
|
cacheHitDays + dbQueryDays,
|
||||||
|
cacheHitDays, dbQueryDays,
|
||||||
|
totalTracks, totalPointsBefore, totalPointsAfter,
|
||||||
|
totalPointsBefore > 0 ? Math.round((1 - (double) totalPointsAfter / totalPointsBefore) * 100) : 0,
|
||||||
|
totalBatches, batchesBeforeSimplify,
|
||||||
|
simplifyTimeMs, dbQueryTimeMs, totalElapsedMs,
|
||||||
|
dbConnectionTotal(), connViewportPass1, connDailyPages,
|
||||||
|
connVesselInfo, connHourly5min, connTableCheck,
|
||||||
|
uniqueVessels);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 테이블 범위 처리 - LineStringM을 압축된 배열로 변환
|
* 테이블 범위 처리 - LineStringM을 압축된 배열로 변환
|
||||||
*/
|
*/
|
||||||
@ -269,7 +329,11 @@ public class ChunkedTrackStreamingService {
|
|||||||
* 선박 정보 배치 프리로드 - 세션 캐시 사용 버전
|
* 선박 정보 배치 프리로드 - 세션 캐시 사용 버전
|
||||||
* 순서: 세션 캐시 → 전역 캐시 → DB 조회
|
* 순서: 세션 캐시 → 전역 캐시 → DB 조회
|
||||||
*/
|
*/
|
||||||
private void preloadVesselInfoWithSessionCache(Set<String> vesselIds, Map<String, VesselInfo> sessionCache) {
|
private void preloadVesselInfoWithSessionCache(Set<String> vesselIds, Map<String, VesselInfo> sessionCache) { // [BENCHMARK] benchmark 파라미터 오버로드 아래 참조
|
||||||
|
preloadVesselInfoWithSessionCache(vesselIds, sessionCache, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void preloadVesselInfoWithSessionCache(Set<String> vesselIds, Map<String, VesselInfo> sessionCache, QueryBenchmark benchmark) { // [BENCHMARK]
|
||||||
// 세션 캐시와 전역 캐시에 없는 선박만 필터링
|
// 세션 캐시와 전역 캐시에 없는 선박만 필터링
|
||||||
List<String> uncachedIds = vesselIds.stream()
|
List<String> uncachedIds = vesselIds.stream()
|
||||||
.filter(id -> {
|
.filter(id -> {
|
||||||
@ -324,6 +388,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
vesselInfoCache.put(visselId, info);
|
vesselInfoCache.put(visselId, info);
|
||||||
foundIds.add(visselId);
|
foundIds.add(visselId);
|
||||||
}, batch.toArray());
|
}, batch.toArray());
|
||||||
|
if (benchmark != null) benchmark.connVesselInfo++; // [BENCHMARK]
|
||||||
|
|
||||||
// DB에 없는 선박은 기본값으로 세션 캐시에 저장 (전역 캐시에는 저장 안함)
|
// DB에 없는 선박은 기본값으로 세션 캐시에 저장 (전역 캐시에는 저장 안함)
|
||||||
for (String visselId : batch) {
|
for (String visselId : batch) {
|
||||||
@ -414,7 +479,11 @@ public class ChunkedTrackStreamingService {
|
|||||||
* 전체 쿼리 시간 범위에서 뷰포트 영역을 지나는 모든 선박을 식별하여,
|
* 전체 쿼리 시간 범위에서 뷰포트 영역을 지나는 모든 선박을 식별하여,
|
||||||
* Pass 2에서 해당 선박의 전체 항적(뷰포트 밖 포함)을 조회할 수 있도록 함
|
* Pass 2에서 해당 선박의 전체 항적(뷰포트 밖 포함)을 조회할 수 있도록 함
|
||||||
*/
|
*/
|
||||||
private Set<String> collectViewportVesselIds(Map<TableStrategy, List<TimeRange>> strategyMap, TrackQueryRequest request) {
|
private Set<String> collectViewportVesselIds(Map<TableStrategy, List<TimeRange>> strategyMap, TrackQueryRequest request) { // [BENCHMARK] benchmark 파라미터 오버로드 아래 참조
|
||||||
|
return collectViewportVesselIds(strategyMap, request, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<String> collectViewportVesselIds(Map<TableStrategy, List<TimeRange>> strategyMap, TrackQueryRequest request, QueryBenchmark benchmark) { // [BENCHMARK]
|
||||||
ViewportFilter viewport = request.getViewport();
|
ViewportFilter viewport = request.getViewport();
|
||||||
if (viewport == null || !viewport.isValid()) {
|
if (viewport == null || !viewport.isValid()) {
|
||||||
return null;
|
return null;
|
||||||
@ -473,6 +542,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
vesselIds.add(rs.getString("sig_src_cd") + "_" + rs.getString("target_id"));
|
vesselIds.add(rs.getString("sig_src_cd") + "_" + rs.getString("target_id"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (benchmark != null) benchmark.connViewportPass1++; // [BENCHMARK]
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
log.error("Error collecting viewport vessel IDs from {}: {}", tableName, e.getMessage());
|
log.error("Error collecting viewport vessel IDs from {}: {}", tableName, e.getMessage());
|
||||||
}
|
}
|
||||||
@ -935,6 +1005,10 @@ public class ChunkedTrackStreamingService {
|
|||||||
queryStartTime = System.currentTimeMillis();
|
queryStartTime = System.currentTimeMillis();
|
||||||
processedTimeRanges.clear();
|
processedTimeRanges.clear();
|
||||||
|
|
||||||
|
// [BENCHMARK] 벤치마크 지표 초기화
|
||||||
|
QueryBenchmark benchmark = new QueryBenchmark();
|
||||||
|
benchmark.zoomLevel = request.getZoomLevel();
|
||||||
|
|
||||||
// 백프레셔 메트릭스 초기화
|
// 백프레셔 메트릭스 초기화
|
||||||
BackpressureMetrics metrics = new BackpressureMetrics();
|
BackpressureMetrics metrics = new BackpressureMetrics();
|
||||||
queryMetrics.put(queryId, metrics);
|
queryMetrics.put(queryId, metrics);
|
||||||
@ -945,7 +1019,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
|
|
||||||
// 시간 범위별 테이블 전략 분할
|
// 시간 범위별 테이블 전략 분할
|
||||||
Map<TableStrategy, List<TimeRange>> strategyMap = splitTimeRangeByStrategy(
|
Map<TableStrategy, List<TimeRange>> strategyMap = splitTimeRangeByStrategy(
|
||||||
request.getStartTime(), request.getEndTime()
|
request.getStartTime(), request.getEndTime(), benchmark // [BENCHMARK]
|
||||||
);
|
);
|
||||||
|
|
||||||
// 전체 시간 계산
|
// 전체 시간 계산
|
||||||
@ -953,7 +1027,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
log.info("Total time range: {} minutes", estimatedTotalMinutes);
|
log.info("Total time range: {} minutes", estimatedTotalMinutes);
|
||||||
|
|
||||||
// 2-pass 뷰포트 필터링: 전체 시간 범위에서 뷰포트 교차 선박 수집
|
// 2-pass 뷰포트 필터링: 전체 시간 범위에서 뷰포트 교차 선박 수집
|
||||||
Set<String> viewportVesselIds = collectViewportVesselIds(strategyMap, request);
|
Set<String> viewportVesselIds = collectViewportVesselIds(strategyMap, request, benchmark); // [BENCHMARK]
|
||||||
|
|
||||||
int globalChunkIndex = 0;
|
int globalChunkIndex = 0;
|
||||||
int totalVessels = 0;
|
int totalVessels = 0;
|
||||||
@ -976,7 +1050,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
if (strategy == TableStrategy.DAILY) {
|
if (strategy == TableStrategy.DAILY) {
|
||||||
// Daily는 기존 방식 유지 (이미 일 단위)
|
// Daily는 기존 방식 유지 (이미 일 단위)
|
||||||
processDailyStrategy(ranges, request, queryId, chunkConsumer, statusConsumer,
|
processDailyStrategy(ranges, request, queryId, chunkConsumer, statusConsumer,
|
||||||
globalChunkIndex, uniqueVesselIds, viewportVesselIds);
|
globalChunkIndex, uniqueVesselIds, viewportVesselIds, benchmark);
|
||||||
globalChunkIndex = getCurrentChunkIndex();
|
globalChunkIndex = getCurrentChunkIndex();
|
||||||
} else {
|
} else {
|
||||||
// Hourly/5min은 6시간 단위로 그룹화하여 처리
|
// Hourly/5min은 6시간 단위로 그룹화하여 처리
|
||||||
@ -1000,6 +1074,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
|
|
||||||
List<CompactVesselTrack> compactTracks = processTableRangeWithBaseTime(
|
List<CompactVesselTrack> compactTracks = processTableRangeWithBaseTime(
|
||||||
request, strategy, range, baseTime, viewportVesselIds);
|
request, strategy, range, baseTime, viewportVesselIds);
|
||||||
|
if (benchmark != null) benchmark.connHourly5min++; // [BENCHMARK]
|
||||||
|
|
||||||
// 선박별로 볕합
|
// 선박별로 볕합
|
||||||
for (CompactVesselTrack track : compactTracks) {
|
for (CompactVesselTrack track : compactTracks) {
|
||||||
@ -1217,6 +1292,15 @@ public class ChunkedTrackStreamingService {
|
|||||||
chunkConsumer.accept(lastChunkMarker);
|
chunkConsumer.accept(lastChunkMarker);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// [BENCHMARK] 벤치마크 JSON 기록
|
||||||
|
long totalElapsedMs = System.currentTimeMillis() - queryStartTime;
|
||||||
|
benchmark.totalBatches = globalChunkIndex;
|
||||||
|
try {
|
||||||
|
benchmarkLog.info(benchmark.toJson(queryId, uniqueVesselIds.size(), totalElapsedMs));
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("Failed to write benchmark log: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
log.info("Query {} completed: {} chunks, {} unique vessels",
|
log.info("Query {} completed: {} chunks, {} unique vessels",
|
||||||
queryId, globalChunkIndex, uniqueVesselIds.size());
|
queryId, globalChunkIndex, uniqueVesselIds.size());
|
||||||
|
|
||||||
@ -1390,7 +1474,11 @@ public class ChunkedTrackStreamingService {
|
|||||||
/**
|
/**
|
||||||
* 시간 범위별 테이블 선택 로직 (StompTrackStreamingService 참고)
|
* 시간 범위별 테이블 선택 로직 (StompTrackStreamingService 참고)
|
||||||
*/
|
*/
|
||||||
private Map<TableStrategy, List<TimeRange>> splitTimeRangeByStrategy(LocalDateTime start, LocalDateTime end) {
|
private Map<TableStrategy, List<TimeRange>> splitTimeRangeByStrategy(LocalDateTime start, LocalDateTime end) { // [BENCHMARK] benchmark 파라미터 오버로드 아래 참조
|
||||||
|
return splitTimeRangeByStrategy(start, end, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<TableStrategy, List<TimeRange>> splitTimeRangeByStrategy(LocalDateTime start, LocalDateTime end, QueryBenchmark benchmark) { // [BENCHMARK]
|
||||||
Map<TableStrategy, List<TimeRange>> strategyMap = new LinkedHashMap<>();
|
Map<TableStrategy, List<TimeRange>> strategyMap = new LinkedHashMap<>();
|
||||||
LocalDateTime now = LocalDateTime.now();
|
LocalDateTime now = LocalDateTime.now();
|
||||||
|
|
||||||
@ -1456,6 +1544,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
|
|
||||||
// 각 일별 범위에 대해 데이터 존재 여부 확인
|
// 각 일별 범위에 대해 데이터 존재 여부 확인
|
||||||
for (TimeRange dailyRange : mergedDailyRanges) {
|
for (TimeRange dailyRange : mergedDailyRanges) {
|
||||||
|
if (benchmark != null) benchmark.connTableCheck++; // [BENCHMARK]
|
||||||
if (hasDataInTable(TableStrategy.DAILY.getTableName(), dailyRange.getStart(), dailyRange.getEnd())) {
|
if (hasDataInTable(TableStrategy.DAILY.getTableName(), dailyRange.getStart(), dailyRange.getEnd())) {
|
||||||
validDailyRanges.add(dailyRange);
|
validDailyRanges.add(dailyRange);
|
||||||
log.debug("Daily data found for range: {}", dailyRange);
|
log.debug("Daily data found for range: {}", dailyRange);
|
||||||
@ -1482,6 +1571,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
List<TimeRange> fallbackTo5Min = new ArrayList<>();
|
List<TimeRange> fallbackTo5Min = new ArrayList<>();
|
||||||
|
|
||||||
for (TimeRange range : hourlyRanges) {
|
for (TimeRange range : hourlyRanges) {
|
||||||
|
if (benchmark != null) benchmark.connTableCheck++; // [BENCHMARK]
|
||||||
if (hasDataInTable(TableStrategy.HOURLY.getTableName(), range.getStart(), range.getEnd())) {
|
if (hasDataInTable(TableStrategy.HOURLY.getTableName(), range.getStart(), range.getEnd())) {
|
||||||
validHourlyRanges.add(range);
|
validHourlyRanges.add(range);
|
||||||
} else {
|
} else {
|
||||||
@ -2403,7 +2493,8 @@ public class ChunkedTrackStreamingService {
|
|||||||
Consumer<QueryStatusUpdate> statusConsumer,
|
Consumer<QueryStatusUpdate> statusConsumer,
|
||||||
int startChunkIndex,
|
int startChunkIndex,
|
||||||
Set<String> uniqueVesselIds,
|
Set<String> uniqueVesselIds,
|
||||||
Set<String> viewportVesselIds) throws Exception {
|
Set<String> viewportVesselIds,
|
||||||
|
QueryBenchmark benchmark) throws Exception {
|
||||||
|
|
||||||
currentGlobalChunkIndex = startChunkIndex;
|
currentGlobalChunkIndex = startChunkIndex;
|
||||||
|
|
||||||
@ -2436,21 +2527,58 @@ public class ChunkedTrackStreamingService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!cachedTracks.isEmpty()) {
|
if (!cachedTracks.isEmpty()) {
|
||||||
|
// 간소화 전 지표 측정
|
||||||
|
int preTrackCount = cachedTracks.size();
|
||||||
|
int prePointCount = cachedTracks.stream()
|
||||||
|
.mapToInt(t -> t.getGeometry() != null ? t.getGeometry().size() : 0).sum();
|
||||||
|
List<List<CompactVesselTrack>> preBatches = splitByMessageSize(cachedTracks);
|
||||||
|
int preBatchCount = preBatches.size();
|
||||||
|
|
||||||
|
// 방어적 복사 후 간소화 (캐시 원본 보호)
|
||||||
|
long simplifyStart = System.currentTimeMillis();
|
||||||
|
cachedTracks = cachedTracks.stream()
|
||||||
|
.map(t -> t.toBuilder().build())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
cachedTracks = cacheTrackSimplifier.simplify(cachedTracks, request.getZoomLevel());
|
||||||
|
long simplifyElapsed = System.currentTimeMillis() - simplifyStart;
|
||||||
|
|
||||||
|
// 간소화 후 지표 측정
|
||||||
|
int postPointCount = cachedTracks.stream()
|
||||||
|
.mapToInt(t -> t.getGeometry() != null ? t.getGeometry().size() : 0).sum();
|
||||||
|
|
||||||
// 메시지 크기로 분할하여 전송
|
// 메시지 크기로 분할하여 전송
|
||||||
List<List<CompactVesselTrack>> batches = splitByMessageSize(cachedTracks);
|
List<List<CompactVesselTrack>> batches = splitByMessageSize(cachedTracks);
|
||||||
for (List<CompactVesselTrack> batch : batches) {
|
for (List<CompactVesselTrack> batch : batches) {
|
||||||
sendChunkResponse(batch, queryId, chunkConsumer, uniqueVesselIds);
|
sendChunkResponse(batch, queryId, chunkConsumer, uniqueVesselIds);
|
||||||
}
|
}
|
||||||
log.info("Daily cache served {} tracks for {} in {} batches",
|
|
||||||
cachedTracks.size(), rangeDate, batches.size());
|
log.info("[CacheHIT] date={}, zoom={}, tracks={}, points: {} -> {} ({}% 감소), batches: {} -> {} ({}% 감소), simplify: {}ms",
|
||||||
|
rangeDate, request.getZoomLevel(), preTrackCount,
|
||||||
|
prePointCount, postPointCount,
|
||||||
|
prePointCount > 0 ? Math.round((1 - (double) postPointCount / prePointCount) * 100) : 0,
|
||||||
|
preBatchCount, batches.size(),
|
||||||
|
preBatchCount > 0 ? Math.round((1 - (double) batches.size() / preBatchCount) * 100) : 0,
|
||||||
|
simplifyElapsed);
|
||||||
|
|
||||||
|
// [BENCHMARK] 벤치마크 누적 (캐시 경로)
|
||||||
|
if (benchmark != null) {
|
||||||
|
benchmark.cacheHitDays++;
|
||||||
|
benchmark.totalTracks += preTrackCount;
|
||||||
|
benchmark.totalPointsBefore += prePointCount;
|
||||||
|
benchmark.totalPointsAfter += postPointCount;
|
||||||
|
benchmark.batchesBeforeSimplify += preBatchCount;
|
||||||
|
benchmark.simplifyTimeMs += simplifyElapsed;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// 캐시 미스: 기존 DB 페이지네이션으로 처리
|
// 캐시 미스: 기존 DB 페이지네이션으로 처리
|
||||||
if (dailyTrackCacheManager.isEnabled()) {
|
if (dailyTrackCacheManager.isEnabled()) {
|
||||||
log.info("Daily cache MISS for {}: falling back to DB", rangeDate);
|
log.info("Daily cache MISS for {}: falling back to DB", rangeDate);
|
||||||
}
|
}
|
||||||
streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache, viewportVesselIds);
|
if (benchmark != null) {
|
||||||
|
benchmark.dbQueryDays++; // [BENCHMARK]
|
||||||
|
}
|
||||||
|
streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache, viewportVesselIds, benchmark);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2489,7 +2617,8 @@ public class ChunkedTrackStreamingService {
|
|||||||
Consumer<QueryStatusUpdate> statusConsumer,
|
Consumer<QueryStatusUpdate> statusConsumer,
|
||||||
Set<String> uniqueVesselIds,
|
Set<String> uniqueVesselIds,
|
||||||
Map<String, VesselInfo> sessionVesselCache,
|
Map<String, VesselInfo> sessionVesselCache,
|
||||||
Set<String> viewportVesselIds) {
|
Set<String> viewportVesselIds,
|
||||||
|
QueryBenchmark benchmark) {
|
||||||
String tableName = TableStrategy.DAILY.getTableName();
|
String tableName = TableStrategy.DAILY.getTableName();
|
||||||
|
|
||||||
// 줌 레벨에 따른 강화된 간소화 tolerance
|
// 줌 레벨에 따른 강화된 간소화 tolerance
|
||||||
@ -2548,7 +2677,14 @@ public class ChunkedTrackStreamingService {
|
|||||||
long queryStartTime = System.currentTimeMillis();
|
long queryStartTime = System.currentTimeMillis();
|
||||||
try (ResultSet rs = ps.executeQuery()) {
|
try (ResultSet rs = ps.executeQuery()) {
|
||||||
long queryEndTime = System.currentTimeMillis();
|
long queryEndTime = System.currentTimeMillis();
|
||||||
log.info("Daily page {} query executed in {}ms", pageNum + 1, queryEndTime - queryStartTime);
|
long pageQueryTime = queryEndTime - queryStartTime;
|
||||||
|
log.info("Daily page {} query executed in {}ms", pageNum + 1, pageQueryTime);
|
||||||
|
|
||||||
|
// [BENCHMARK] DB 쿼리 시간 누적
|
||||||
|
if (benchmark != null) {
|
||||||
|
benchmark.dbQueryTimeMs += pageQueryTime;
|
||||||
|
benchmark.connDailyPages++; // [BENCHMARK]
|
||||||
|
}
|
||||||
|
|
||||||
int pageTrackCount = 0;
|
int pageTrackCount = 0;
|
||||||
String currentSigSrcCd = null;
|
String currentSigSrcCd = null;
|
||||||
@ -2582,7 +2718,7 @@ public class ChunkedTrackStreamingService {
|
|||||||
|
|
||||||
// 2단계: 선박 정보 배치 조회 (세션 캐시 → 전역 캐시 → DB 순서)
|
// 2단계: 선박 정보 배치 조회 (세션 캐시 → 전역 캐시 → DB 순서)
|
||||||
long batchStartTime = System.currentTimeMillis();
|
long batchStartTime = System.currentTimeMillis();
|
||||||
preloadVesselInfoWithSessionCache(vesselIdsInPage, sessionVesselCache);
|
preloadVesselInfoWithSessionCache(vesselIdsInPage, sessionVesselCache, benchmark); // [BENCHMARK]
|
||||||
long batchEndTime = System.currentTimeMillis();
|
long batchEndTime = System.currentTimeMillis();
|
||||||
log.info("Daily page {} vessel info batch loaded in {}ms (session cache size: {})",
|
log.info("Daily page {} vessel info batch loaded in {}ms (session cache size: {})",
|
||||||
pageNum + 1, batchEndTime - batchStartTime, sessionVesselCache.size());
|
pageNum + 1, batchEndTime - batchStartTime, sessionVesselCache.size());
|
||||||
@ -2701,6 +2837,15 @@ public class ChunkedTrackStreamingService {
|
|||||||
|
|
||||||
totalVesselsSent += pageTracks.size();
|
totalVesselsSent += pageTracks.size();
|
||||||
|
|
||||||
|
// [BENCHMARK] DB 경로 포인트 수 누적
|
||||||
|
if (benchmark != null) {
|
||||||
|
int pagePoints = pageTracks.stream()
|
||||||
|
.mapToInt(t -> t.getPointCount() != null ? t.getPointCount() : 0).sum();
|
||||||
|
benchmark.totalTracks += pageTracks.size();
|
||||||
|
benchmark.totalPointsBefore += pagePoints;
|
||||||
|
benchmark.totalPointsAfter += pagePoints; // DB 경로는 ST_Simplify 적용 후이므로 동일
|
||||||
|
}
|
||||||
|
|
||||||
// 메시지 크기로 분할하여 전송
|
// 메시지 크기로 분할하여 전송
|
||||||
List<List<CompactVesselTrack>> batches = splitByMessageSize(pageTracks);
|
List<List<CompactVesselTrack>> batches = splitByMessageSize(pageTracks);
|
||||||
for (List<CompactVesselTrack> batch : batches) {
|
for (List<CompactVesselTrack> batch : batches) {
|
||||||
|
|||||||
@ -2,6 +2,7 @@ package gc.mda.signal_batch.global.websocket.service;
|
|||||||
|
|
||||||
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
|
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
|
||||||
import gc.mda.signal_batch.global.config.DailyTrackCacheProperties;
|
import gc.mda.signal_batch.global.config.DailyTrackCacheProperties;
|
||||||
|
import gc.mda.signal_batch.global.util.NationalCodeUtil;
|
||||||
import gc.mda.signal_batch.global.util.ShipKindCodeConverter;
|
import gc.mda.signal_batch.global.util.ShipKindCodeConverter;
|
||||||
import gc.mda.signal_batch.domain.vessel.service.IntegrationVesselService;
|
import gc.mda.signal_batch.domain.vessel.service.IntegrationVesselService;
|
||||||
import gc.mda.signal_batch.domain.vessel.dto.IntegrationVessel;
|
import gc.mda.signal_batch.domain.vessel.dto.IntegrationVessel;
|
||||||
@ -310,6 +311,9 @@ public class DailyTrackCacheManager {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 선박 정보 일괄 보강 (t_vessel_latest_position에서 shipName, shipType 조회)
|
||||||
|
enrichVesselInfo(vesselMap);
|
||||||
|
|
||||||
// VesselAccumulator → CompactVesselTrack 변환
|
// VesselAccumulator → CompactVesselTrack 변환
|
||||||
Map<String, CompactVesselTrack> tracks = new HashMap<>(vesselMap.size());
|
Map<String, CompactVesselTrack> tracks = new HashMap<>(vesselMap.size());
|
||||||
for (Map.Entry<String, VesselAccumulator> entry : vesselMap.entrySet()) {
|
for (Map.Entry<String, VesselAccumulator> entry : vesselMap.entrySet()) {
|
||||||
@ -318,9 +322,13 @@ public class DailyTrackCacheManager {
|
|||||||
|
|
||||||
double avgSpeed = acc.pointCount > 0 ? acc.totalDistance / Math.max(1, acc.pointCount) * 60 : 0;
|
double avgSpeed = acc.pointCount > 0 ? acc.totalDistance / Math.max(1, acc.pointCount) * 60 : 0;
|
||||||
|
|
||||||
// shipKindCode 계산
|
// shipKindCode 계산 (선박명 패턴 매칭 포함)
|
||||||
String shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern(
|
String shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern(
|
||||||
acc.sigSrcCd, null, null, acc.targetId);
|
acc.sigSrcCd, acc.shipType, acc.shipName, acc.targetId);
|
||||||
|
|
||||||
|
// nationalCode 계산
|
||||||
|
String nationalCode = NationalCodeUtil.calculateNationalCode(
|
||||||
|
acc.sigSrcCd, acc.targetId);
|
||||||
|
|
||||||
// 통합선박 ID 조회
|
// 통합선박 ID 조회
|
||||||
String integrationTargetId = null;
|
String integrationTargetId = null;
|
||||||
@ -335,6 +343,9 @@ public class DailyTrackCacheManager {
|
|||||||
.vesselId(entry.getKey())
|
.vesselId(entry.getKey())
|
||||||
.sigSrcCd(acc.sigSrcCd)
|
.sigSrcCd(acc.sigSrcCd)
|
||||||
.targetId(acc.targetId)
|
.targetId(acc.targetId)
|
||||||
|
.nationalCode(nationalCode)
|
||||||
|
.shipName(acc.shipName)
|
||||||
|
.shipType(acc.shipType)
|
||||||
.shipKindCode(shipKindCode)
|
.shipKindCode(shipKindCode)
|
||||||
.integrationTargetId(integrationTargetId)
|
.integrationTargetId(integrationTargetId)
|
||||||
.geometry(acc.geometry)
|
.geometry(acc.geometry)
|
||||||
@ -632,6 +643,49 @@ public class DailyTrackCacheManager {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 선박 정보 일괄 보강 (t_vessel_latest_position에서 ship_nm, ship_ty 조회)
|
||||||
|
* IN 절 1000건 배치로 처리
|
||||||
|
*/
|
||||||
|
private void enrichVesselInfo(Map<String, VesselAccumulator> vesselMap) {
|
||||||
|
List<String> vesselIds = new ArrayList<>(vesselMap.keySet());
|
||||||
|
int batchSize = 1000;
|
||||||
|
int enriched = 0;
|
||||||
|
|
||||||
|
for (int i = 0; i < vesselIds.size(); i += batchSize) {
|
||||||
|
List<String> batch = vesselIds.subList(i, Math.min(i + batchSize, vesselIds.size()));
|
||||||
|
|
||||||
|
try (Connection conn = queryDataSource.getConnection()) {
|
||||||
|
String placeholders = batch.stream().map(id -> "?").collect(Collectors.joining(","));
|
||||||
|
String sql = "SELECT sig_src_cd, target_id, ship_nm, ship_ty " +
|
||||||
|
"FROM signal.t_vessel_latest_position " +
|
||||||
|
"WHERE sig_src_cd || '_' || target_id IN (" + placeholders + ")";
|
||||||
|
|
||||||
|
try (PreparedStatement ps = conn.prepareStatement(sql)) {
|
||||||
|
for (int j = 0; j < batch.size(); j++) {
|
||||||
|
ps.setString(j + 1, batch.get(j));
|
||||||
|
}
|
||||||
|
|
||||||
|
try (ResultSet rs = ps.executeQuery()) {
|
||||||
|
while (rs.next()) {
|
||||||
|
String vesselId = rs.getString("sig_src_cd") + "_" + rs.getString("target_id");
|
||||||
|
VesselAccumulator acc = vesselMap.get(vesselId);
|
||||||
|
if (acc != null) {
|
||||||
|
acc.shipName = rs.getString("ship_nm");
|
||||||
|
acc.shipType = rs.getString("ship_ty");
|
||||||
|
enriched++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("Failed to enrich vessel info batch {}-{}: {}", i, i + batch.size(), e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("Enriched vessel info: {}/{} vessels", enriched, vesselMap.size());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 항적 맵에서 STRtree 공간 인덱스 빌드
|
* 항적 맵에서 STRtree 공간 인덱스 빌드
|
||||||
*/
|
*/
|
||||||
@ -661,6 +715,8 @@ public class DailyTrackCacheManager {
|
|||||||
private static class VesselAccumulator {
|
private static class VesselAccumulator {
|
||||||
String sigSrcCd;
|
String sigSrcCd;
|
||||||
String targetId;
|
String targetId;
|
||||||
|
String shipName;
|
||||||
|
String shipType;
|
||||||
List<double[]> geometry = new ArrayList<>(500);
|
List<double[]> geometry = new ArrayList<>(500);
|
||||||
List<String> timestamps = new ArrayList<>(500);
|
List<String> timestamps = new ArrayList<>(500);
|
||||||
List<Double> speeds = new ArrayList<>(500);
|
List<Double> speeds = new ArrayList<>(500);
|
||||||
|
|||||||
@ -23,8 +23,8 @@ spring:
|
|||||||
connection-timeout: 30000 # 원격 연결이므로 타임아웃 증가
|
connection-timeout: 30000 # 원격 연결이므로 타임아웃 증가
|
||||||
idle-timeout: 600000
|
idle-timeout: 600000
|
||||||
max-lifetime: 1800000
|
max-lifetime: 1800000
|
||||||
maximum-pool-size: 20 # 10 -> 20 증가
|
maximum-pool-size: 30 # 10 -> 30 증가
|
||||||
minimum-idle: 5 # 2 -> 5 증가
|
minimum-idle: 10 # 2 -> 10 증가
|
||||||
# 원격 연결 안정성을 위한 추가 설정
|
# 원격 연결 안정성을 위한 추가 설정
|
||||||
connection-test-query: SELECT 1
|
connection-test-query: SELECT 1
|
||||||
validation-timeout: 5000
|
validation-timeout: 5000
|
||||||
@ -42,8 +42,8 @@ spring:
|
|||||||
connection-timeout: 5000
|
connection-timeout: 5000
|
||||||
idle-timeout: 600000
|
idle-timeout: 600000
|
||||||
max-lifetime: 1800000
|
max-lifetime: 1800000
|
||||||
maximum-pool-size: 60 # 20 -> 40 증가
|
maximum-pool-size: 90 # 20 -> 90 증가
|
||||||
minimum-idle: 10 # 5 -> 10 증가
|
minimum-idle: 15 # 5 -> 15 증가
|
||||||
connection-test-query: SELECT 1
|
connection-test-query: SELECT 1
|
||||||
validation-timeout: 5000
|
validation-timeout: 5000
|
||||||
leak-detection-threshold: 60000 # 커넥션 누수 감지 (60초)
|
leak-detection-threshold: 60000 # 커넥션 누수 감지 (60초)
|
||||||
@ -262,6 +262,34 @@ vessel: # spring 하위가 아닌 최상위 레벨
|
|||||||
t_abnormal_tracks:
|
t_abnormal_tracks:
|
||||||
retention-months: 0 # 비정상 항적: 무한 보관
|
retention-months: 0 # 비정상 항적: 무한 보관
|
||||||
|
|
||||||
|
# 일일 항적 데이터 인메모리 캐시
|
||||||
|
cache:
|
||||||
|
daily-track:
|
||||||
|
enabled: true
|
||||||
|
retention-days: 7 # D-1 ~ D-7 (오늘 제외)
|
||||||
|
max-memory-gb: 6 # 최대 6GB
|
||||||
|
warmup-async: true # 비동기 워밍업 (배치 Job과 경합 방지)
|
||||||
|
|
||||||
|
# WebSocket 부하 제어 설정
|
||||||
|
websocket:
|
||||||
|
query:
|
||||||
|
max-concurrent-global: 40 # 배치 서버이므로 동시 쿼리 제한 (prod 60 대비 보수적)
|
||||||
|
max-per-session: 15 # 세션당 동시 쿼리 상한
|
||||||
|
queue-timeout-seconds: 30 # 글로벌 대기 큐 타임아웃
|
||||||
|
session:
|
||||||
|
idle-timeout-ms: 15000
|
||||||
|
server-heartbeat-ms: 5000
|
||||||
|
client-heartbeat-ms: 5000
|
||||||
|
sockjs-disconnect-delay-ms: 5000
|
||||||
|
send-time-limit-seconds: 30
|
||||||
|
|
||||||
|
# REST V2 부하 제어 설정
|
||||||
|
rest:
|
||||||
|
v2:
|
||||||
|
query:
|
||||||
|
timeout-seconds: 30 # 슬롯 대기 타임아웃 (초)
|
||||||
|
max-total-points: 500000 # 응답 총 포인트 상한
|
||||||
|
|
||||||
# 액추에이터 설정
|
# 액추에이터 설정
|
||||||
management:
|
management:
|
||||||
endpoints:
|
endpoints:
|
||||||
|
|||||||
@ -288,6 +288,13 @@ websocket:
|
|||||||
sockjs-disconnect-delay-ms: 5000 # SockJS 해제 지연 5초 (30s → 5s)
|
sockjs-disconnect-delay-ms: 5000 # SockJS 해제 지연 5초 (30s → 5s)
|
||||||
send-time-limit-seconds: 30 # 메시지 전송 시간 제한 30초 (120s → 30s)
|
send-time-limit-seconds: 30 # 메시지 전송 시간 제한 30초 (120s → 30s)
|
||||||
|
|
||||||
|
# REST V2 부하 제어 설정
|
||||||
|
rest:
|
||||||
|
v2:
|
||||||
|
query:
|
||||||
|
timeout-seconds: 30 # 슬롯 대기 타임아웃 (초)
|
||||||
|
max-total-points: 500000 # 응답 총 포인트 상한
|
||||||
|
|
||||||
# 액추에이터 설정
|
# 액추에이터 설정
|
||||||
management:
|
management:
|
||||||
endpoints:
|
endpoints:
|
||||||
|
|||||||
@ -89,6 +89,34 @@ vessel:
|
|||||||
page-size: 5000
|
page-size: 5000
|
||||||
partition-size: 12
|
partition-size: 12
|
||||||
|
|
||||||
|
# 일일 항적 데이터 인메모리 캐시 (조회 전용 서버의 핵심 성능 설정)
|
||||||
|
cache:
|
||||||
|
daily-track:
|
||||||
|
enabled: true
|
||||||
|
retention-days: 7 # D-1 ~ D-7 (오늘 제외)
|
||||||
|
max-memory-gb: 6 # 최대 6GB (조회 전용이므로 배치 없이 메모리 여유)
|
||||||
|
warmup-async: true # 비동기 워밍업 (서버 시작 차단 없음)
|
||||||
|
|
||||||
|
# WebSocket 부하 제어 설정
|
||||||
|
websocket:
|
||||||
|
query:
|
||||||
|
max-concurrent-global: 40 # 조회 전용 서버 (배치 없으므로 prod-mpr보다 여유)
|
||||||
|
max-per-session: 15 # 세션당 동시 쿼리 상한
|
||||||
|
queue-timeout-seconds: 30 # 글로벌 대기 큐 타임아웃
|
||||||
|
session:
|
||||||
|
idle-timeout-ms: 15000
|
||||||
|
server-heartbeat-ms: 5000
|
||||||
|
client-heartbeat-ms: 5000
|
||||||
|
sockjs-disconnect-delay-ms: 5000
|
||||||
|
send-time-limit-seconds: 30
|
||||||
|
|
||||||
|
# REST V2 부하 제어 설정
|
||||||
|
rest:
|
||||||
|
v2:
|
||||||
|
query:
|
||||||
|
timeout-seconds: 30 # 슬롯 대기 타임아웃 (초)
|
||||||
|
max-total-points: 500000 # 응답 총 포인트 상한
|
||||||
|
|
||||||
# 액추에이터 설정
|
# 액추에이터 설정
|
||||||
management:
|
management:
|
||||||
endpoints:
|
endpoints:
|
||||||
|
|||||||
@ -65,6 +65,25 @@
|
|||||||
<appender-ref ref="WEBSOCKET_USAGE_FILE"/>
|
<appender-ref ref="WEBSOCKET_USAGE_FILE"/>
|
||||||
</logger>
|
</logger>
|
||||||
|
|
||||||
|
<!-- [BENCHMARK] 캐시 벤치마크 로그 전용 Appender - 벤치마크 제거 시 이 앱펜더와 아래 로거 삭제 -->
|
||||||
|
<appender name="CACHE_BENCHMARK_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||||
|
<file>${LOG_PATH}/cache-benchmark.log</file>
|
||||||
|
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
|
||||||
|
<fileNamePattern>${LOG_PATH}/cache-benchmark.log.%d{yyyy-MM-dd}.%i.gz</fileNamePattern>
|
||||||
|
<maxFileSize>200MB</maxFileSize>
|
||||||
|
<maxHistory>30</maxHistory>
|
||||||
|
<totalSizeCap>5GB</totalSizeCap>
|
||||||
|
</rollingPolicy>
|
||||||
|
<encoder>
|
||||||
|
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} - %msg%n</pattern>
|
||||||
|
</encoder>
|
||||||
|
</appender>
|
||||||
|
|
||||||
|
<!-- [BENCHMARK] CACHE_BENCHMARK 로거 (별도 파일에만 기록, 다른 로거로 전파 안함) - 벤치마크 제거 시 삭제 -->
|
||||||
|
<logger name="CACHE_BENCHMARK" level="INFO" additivity="false">
|
||||||
|
<appender-ref ref="CACHE_BENCHMARK_FILE"/>
|
||||||
|
</logger>
|
||||||
|
|
||||||
<!-- Root 로거 -->
|
<!-- Root 로거 -->
|
||||||
<root level="INFO">
|
<root level="INFO">
|
||||||
<appender-ref ref="CONSOLE"/>
|
<appender-ref ref="CONSOLE"/>
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
package gc.mda.signal_batch;
|
package gc.mda.signal_batch;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Disabled;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
import org.springframework.test.context.ActiveProfiles;
|
import org.springframework.test.context.ActiveProfiles;
|
||||||
@ -15,6 +16,7 @@ import gc.mda.signal_batch.global.websocket.interceptor.TrackQueryInterceptor;
|
|||||||
"scheduler.enabled=false",
|
"scheduler.enabled=false",
|
||||||
"spring.websocket.enabled=false"
|
"spring.websocket.enabled=false"
|
||||||
})
|
})
|
||||||
|
@Disabled("원격 DB 연결 필요 - 로컬 빌드 시 스킵")
|
||||||
@ActiveProfiles("test")
|
@ActiveProfiles("test")
|
||||||
class SignalBatchApplicationTests {
|
class SignalBatchApplicationTests {
|
||||||
|
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package gc.mda.signal_batch.performance;
|
package gc.mda.signal_batch.performance;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.junit.jupiter.api.Disabled;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
@ -17,6 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|||||||
* 실행: mvn test -Dtest=IndexStatusTest
|
* 실행: mvn test -Dtest=IndexStatusTest
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
@Disabled("원격 DB 연결 필요 - 수동 실행만 허용 (mvn test -Dtest=IndexStatusTest)")
|
||||||
@SpringBootTest
|
@SpringBootTest
|
||||||
@ActiveProfiles("dev")
|
@ActiveProfiles("dev")
|
||||||
public class IndexStatusTest {
|
public class IndexStatusTest {
|
||||||
|
|||||||
@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.*;
|
|||||||
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
@Disabled("부하 테스트 - 원격 서버 연결 필요, 수동 실행만 허용")
|
||||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||||
public class WebSocketLoadTest {
|
public class WebSocketLoadTest {
|
||||||
|
|
||||||
|
|||||||
불러오는 중...
Reference in New Issue
Block a user