diff --git a/.gitattributes b/.gitattributes index 3b41682..5232288 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,2 +1,31 @@ +# 자동 줄바꿈 정규화 (checkout 시 OS에 맞게, commit 시 LF로 통일) +* text=auto + +# Java 소스 +*.java text +*.properties text +*.xml text +*.yml text +*.yaml text + +# Web 리소스 +*.html text +*.css text +*.js text +*.json text + +# Shell 스크립트 (Linux/Mac) +*.sh text eol=lf /mvnw text eol=lf + +# Windows 스크립트 *.cmd text eol=crlf +*.bat text eol=crlf + +# 바이너리 (줄바꿈 변환하지 않음) +*.jar binary +*.png binary +*.jpg binary +*.gif binary +*.ico binary +*.docx binary diff --git a/.gitignore b/.gitignore index d701169..98bed6e 100644 --- a/.gitignore +++ b/.gitignore @@ -34,7 +34,6 @@ build/ .vscode/ scripts/ .claude -.mvn/ logs/ *.log sql/ diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000..2f94e61 --- /dev/null +++ b/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +wrapperVersion=3.3.2 +distributionType=only-script +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.10/apache-maven-3.9.10-bin.zip diff --git a/docs/websocket-performance-improvement-report.md b/docs/websocket-performance-improvement-report.md index 89b5b31..d7fca5b 100644 --- a/docs/websocket-performance-improvement-report.md +++ b/docs/websocket-performance-improvement-report.md @@ -6,7 +6,7 @@ |------|------| | 작성일 | 2026-02-06 | | 대상 시스템 | Signal Batch — 선박 항적 조회/리플레이 WebSocket 서비스 | -| 운영 환경 | Linux, `vessel-batch-control.sh` → `run-on-query-server-dev.sh` (prod 프로파일) | +| 운영 환경 | Linux | | 문제 상황 | 다수 클라이언트의 리플레이 요청 동시 유입 시 서비스 장애 발생 | --- @@ -114,7 +114,7 @@ CompletableFuture.runAsync(() -> { **파일**: `global/config/AsyncConfig.java` ```java -// AS-IS: 고정 크기 스레드 풀 +// AS-IS: 소규모 스레드 풀 + 대용량 큐 @Bean(name = "trackStreamingExecutor") public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); @@ -124,6 +124,7 @@ public Executor getAsyncExecutor() { executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } +// TO-BE: core 40, max 120, queue 100 (대기열을 Semaphore로 대체) ``` #### (D) 정적 전송 지연 — `StompTrackStreamingService` @@ -347,8 +348,10 @@ CompletableFuture.runAsync(() -> { **TO-BE**: 서버 전체 동시 실행 쿼리 수를 Semaphore로 제한 + 대기 큐 구현 +> **구현 참고**: 실제 구현에서 글로벌 Semaphore는 `ActiveQueryManager`로 분리되었으며, `TrackQueryInterceptor`는 세션당 제한만 담당합니다. 아래 코드는 설계 의도를 보여주는 개념적 예시입니다. + ```java -// TO-BE: TrackQueryInterceptor.java — 글로벌 동시 제한 추가 +// TO-BE: TrackQueryInterceptor.java + ActiveQueryManager.java — 글로벌 동시 제한 추가 @Slf4j @Component @RequiredArgsConstructor @@ -357,10 +360,10 @@ public class TrackQueryInterceptor implements ChannelInterceptor { private final ConcurrentHashMap sessionQueries = new ConcurrentHashMap<>(); // ▼ [추가] 글로벌 동시 쿼리 제한 - @Value("${websocket.query.max-concurrent-global:20}") + @Value("${websocket.query.max-concurrent-global:60}") private int maxConcurrentGlobal; - @Value("${websocket.query.max-per-session:3}") + @Value("${websocket.query.max-per-session:20}") private int maxQueriesPerSession; @Value("${websocket.query.queue-timeout-seconds:30}") @@ -452,8 +455,8 @@ public class TrackQueryInterceptor implements ChannelInterceptor { ```yaml websocket: query: - max-concurrent-global: 20 # 서버 전체 동시 쿼리 상한 - max-per-session: 3 # 세션당 동시 쿼리 상한 + max-concurrent-global: 60 # 서버 전체 동시 쿼리 상한 + max-per-session: 20 # 세션당 동시 쿼리 상한 queue-timeout-seconds: 30 # 대기 큐 타임아웃 ``` @@ -710,8 +713,8 @@ public class WebSocketProperties { @Data public static class QueryProperties { - private int maxConcurrentGlobal = 20; - private int maxPerSession = 3; + private int maxConcurrentGlobal = 60; + private int maxPerSession = 20; private int queueTimeoutSeconds = 30; } @@ -741,8 +744,8 @@ public class WebSocketProperties { ```yaml websocket: query: - max-concurrent-global: 20 - max-per-session: 3 + max-concurrent-global: 60 + max-per-session: 20 queue-timeout-seconds: 30 transport: inbound-core-pool-size: 10 @@ -812,14 +815,14 @@ public class WebSocketMonitorController { ### 5.1 글로벌 동시 제한 수 결정 기준 ``` -maxConcurrentGlobal = min(DB커넥션풀 / 2, Async스레드풀max) - = min(60 / 2, 30) - = 20 (권장) +maxConcurrentGlobal = DB커넥션풀 / 평균쿼리당커넥션 + = 180 / 3 + = 60 ``` -- DB 커넥션 풀(60개)의 50%를 WebSocket 쿼리에 할당 -- 나머지 50%는 REST API, 배치 작업, 헬스체크 등에 예비 -- `trackStreamingExecutor` max(30) 이하로 설정하여 스레드 풀 포화 방지 +- DB 커넥션 풀(180개) 기준 쿼리당 평균 3개 커넥션 사용을 고려하여 60개 설정 +- REST API, 배치 작업 등은 별도 DataSource 풀(Collect 80, Batch 30)을 사용하므로 Query 풀 전체 활용 가능 +- `trackStreamingExecutor` max(120) 이내로 설정하여 스레드 풀 여유 확보 ### 5.2 대기 큐 타임아웃 @@ -880,10 +883,11 @@ tryAcquireSlotImmediate() | 설정 | AS-IS | TO-BE | 근거 | |------|-------|-------|------| -| Query DB 풀 | 120 | **180** | 동시 60쿼리 × 3커넥션 | -| max-concurrent-global | 30 | **60** | 180 / ~3 | +| Query DB 풀 | 60 | **180** | 동시 60쿼리 × 3커넥션 | +| max-concurrent-global | (없음) | **60** | 180 / ~3 | | max-per-session | 3 | **20** | 대기열 방식이므로 넉넉하게 | -| Executor max | 30 | **120** | 60실행 + 60대기 | +| Executor core/max | 15/30 | **40/120** | 60실행 + 60대기 | +| Executor queue | 500 | **100** | 대기열을 Semaphore로 대체 | | Session idle timeout | 60s | **15s** | 빠른 정리 | | Heartbeat | 10s/10s | **5s/5s** | 죽은 연결 빠른 감지 | | SockJS disconnect delay | 30s | **5s** | 빠른 해제 | @@ -925,7 +929,7 @@ DailyTrackCacheManager (@Service) |------|------| | Daily 테이블 1일분 | ~350MB (DB) | | 7일분 인메모리 추정 | ~4GB (Java 객체 오버헤드 포함) | -| 최대 메모리 한도 | 5GB (설정 가능) | +| 최대 메모리 한도 | 6GB (설정 가능) | | JVM 힙 (권장) | 12GB 이상 | ### 6.3 쿼리 라우팅 @@ -967,12 +971,59 @@ GET /api/websocket/daily-cache | 리소스 | AS-IS | TO-BE | 비고 | |--------|-------|-------|------| -| JVM Heap | 8~16GB | **12~16GB 권장** | 캐시 ~4GB + 운영 | -| DB Pool (Query) | max 120 | **max 180** | WebSocket + REST | -| DB Pool (Collect) | max 80 | max 80 | 배치 Reader | -| DB Pool (Batch) | max 30 | max 30 | 메타데이터 | -| max-concurrent-global | 30 | **60** | Query풀 180 / 3 | -| trackStreamingExecutor | core 15, max 30 | **core 40, max 120** | 대기열 + 실행 | +| JVM Heap | 8~16GB | **16~32GB** | 캐시 ~6GB + 운영 | +| DB Pool (Query) | max 60 | **max 180** | WebSocket + REST | +| DB Pool (Collect) | max 20 | **max 80** | 배치 Reader | +| DB Pool (Batch) | max 20 | **max 30** | 메타데이터 | +| max-concurrent-global | (없음) | **60** | Query풀 180 / 3 | +| trackStreamingExecutor | core 15, max 30, queue 500 | **core 40, max 120, queue 100** | 대기열 + 실행 | | Session idle timeout | 60s | **15s** | 빠른 정리 | | Heartbeat | 10s/10s | **5s/5s** | 빠른 감지 | -| Daily cache | - | **7일분 ~4GB** | 비동기 워밍업 | +| Send time limit | 120s | **30s** | 빠른 정리 | +| Daily cache | - | **7일분 ~6GB** | 비동기 워밍업 | + +## Phase 9. 인메모리 캐시 간소화 및 성능 정량 비교 + +### 9.1 개요 + +캐시 HIT 경로에 DB 경로 동등 수준의 3단계 간소화 파이프라인을 적용하여, +DB 커넥션 없이도 동일한 품질의 항적 데이터를 제공. + +**간소화 파이프라인**: Douglas-Peucker (DP) → 거리/시간 기반 샘플링 → 줌 레벨 샘플링 + +**벤치마크 로그**: `logs/cache-benchmark.log`에 DB/캐시 경로별 JSON 요약 자동 수집. + +### 9.2 AS-IS vs TO-BE 정량 비교 + +운영 환경에서 수집한 `cache-benchmark.log` 12건 기준 (2026-02-07 측정): + +| 지표 | AS-IS (DB 경로) | TO-BE (캐시 경로) | 개선율 | +|------|----------------|-------------------|--------| +| 응답 시간 (ms) | 7,221 ~ 8,195 | 575 ~ 1,439 | **5.7 ~ 12.6배** | +| 배치 전송 수 | 2 ~ 11 | 3 ~ 10 | 유사 (간소화로 감소) | +| DB 커넥션 사용 | 8 ~ 19건 | 2 ~ 3건 | **63 ~ 89% 절감** | +| DB 쿼리 시간 (ms) | 1,443 ~ 3,475 | 0 | **100% 절감** | +| 포인트 압축 | SQL ST_Simplify | 앱 레벨 95 ~ 99% | 동등 품질 | +| 간소화 CPU 시간 | - | 24 ~ 1,258ms | DB 대기 없음 | + +> 상세 데이터 및 경로별 분석은 별도 「일일 캐시 성능 벤치마크 보고서」 참조 + +### 9.3 데이터 품질 비교 + +- 간소화 후 포인트 수가 DB 경로(ST_Simplify)와 동등 수준 (99% 이상 감소) +- 선박 정보(`shipName`, `shipKindCode`, `nationalCode`) 정상 제공 확인 + - 캐시 빌드 시 `t_vessel_latest_position` 일괄 조회로 보강 + +### 9.4 선박 정보 보강 + +캐시 빌드(`DailyTrackCacheManager.loadDay()`) 시 `enrichVesselInfo()` 추가: +- `t_vessel_latest_position` 테이블에서 `ship_nm`, `ship_ty` IN 절 1000건 배치 조회 +- `nationalCode`: MMSI 기반 NationalCodeUtil 계산 +- `shipKindCode`: ShipKindCodeConverter에 shipType/shipName 전달하여 정확한 판별 + +### 9.5 시스템 효과 + +- **DB IO 제거**: 캐시 HIT 시 Query DB 커넥션 사용 0 → 커넥션 풀 여유 확보 +- **동시 사용자 수용**: DB 커넥션 경합 해소로 동시 처리 가능 수 증가 +- **간소화 비용**: 24~1,258ms (순수 CPU 연산, DB 대기 없음) +- **벤치마크 모니터링**: `cache-benchmark.log`로 경로별 성능 지표 상시 수집 diff --git a/mvnw b/mvnw old mode 100644 new mode 100755 diff --git a/src/main/java/gc/mda/signal_batch/domain/gis/controller/GisControllerV2.java b/src/main/java/gc/mda/signal_batch/domain/gis/controller/GisControllerV2.java index 81294d2..58960ca 100644 --- a/src/main/java/gc/mda/signal_batch/domain/gis/controller/GisControllerV2.java +++ b/src/main/java/gc/mda/signal_batch/domain/gis/controller/GisControllerV2.java @@ -181,7 +181,7 @@ public class GisControllerV2 { content = @Content(schema = @Schema(implementation = VesselTracksRequest.class)) ) @RequestBody VesselTracksRequest request) { - return gisService.getVesselTracks(request); + return gisServiceV2.getVesselTracksV2(request); } @GetMapping("/vessels/recent-positions") diff --git a/src/main/java/gc/mda/signal_batch/domain/gis/service/GisServiceV2.java b/src/main/java/gc/mda/signal_batch/domain/gis/service/GisServiceV2.java index f80dbb7..3f86820 100644 --- a/src/main/java/gc/mda/signal_batch/domain/gis/service/GisServiceV2.java +++ b/src/main/java/gc/mda/signal_batch/domain/gis/service/GisServiceV2.java @@ -2,12 +2,18 @@ package gc.mda.signal_batch.domain.gis.service; import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack; import gc.mda.signal_batch.domain.vessel.dto.TrackResponse; +import gc.mda.signal_batch.domain.vessel.dto.VesselTracksRequest; import gc.mda.signal_batch.domain.vessel.dto.IntegrationVessel; import gc.mda.signal_batch.domain.vessel.service.IntegrationVesselService; +import gc.mda.signal_batch.global.exception.QueryTimeoutException; import gc.mda.signal_batch.global.util.IntegrationSignalConstants; import gc.mda.signal_batch.global.util.TrackConverter; +import gc.mda.signal_batch.global.websocket.service.ActiveQueryManager; +import gc.mda.signal_batch.global.websocket.service.CacheTrackSimplifier; +import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; @@ -22,6 +28,11 @@ import java.util.stream.Collectors; /** * GIS 서비스 V2 - CompactVesselTrack 기반 응답 * WebSocket API와 동일한 응답 구조 제공 + * + * Phase: REST V2 캐시 + 부하 제어 + 응답 크기 제한 + * - Semaphore 기반 동시성 제어 (ActiveQueryManager 공유) + * - POST /vessels: DailyTrackCacheManager 캐시 우선 조회 + * - 2단계 간소화 파이프라인 (표준 간소화 + 포인트 버짓 강제) */ @Slf4j @Service @@ -29,37 +40,81 @@ public class GisServiceV2 { private final DataSource queryDataSource; private final IntegrationVesselService integrationVesselService; + private final ActiveQueryManager activeQueryManager; + private final DailyTrackCacheManager dailyTrackCacheManager; + private final CacheTrackSimplifier cacheTrackSimplifier; + private final GisService gisService; + + @Value("${rest.v2.query.timeout-seconds:30}") + private int restQueryTimeout; + + @Value("${rest.v2.query.max-total-points:500000}") + private int maxTotalPoints; // 선박 정보 캐시 (TTL: 1시간) private final ConcurrentHashMap vesselInfoCache = new ConcurrentHashMap<>(); private static final long VESSEL_CACHE_TTL = 3600_000; // 1시간 public GisServiceV2(@Qualifier("queryDataSource") DataSource queryDataSource, - IntegrationVesselService integrationVesselService) { + IntegrationVesselService integrationVesselService, + ActiveQueryManager activeQueryManager, + DailyTrackCacheManager dailyTrackCacheManager, + CacheTrackSimplifier cacheTrackSimplifier, + GisService gisService) { this.queryDataSource = queryDataSource; this.integrationVesselService = integrationVesselService; + this.activeQueryManager = activeQueryManager; + this.dailyTrackCacheManager = dailyTrackCacheManager; + this.cacheTrackSimplifier = cacheTrackSimplifier; + this.gisService = gisService; } /** * 해구별 선박 항적 조회 (V2 - CompactVesselTrack 반환) + * Semaphore 부하 제어 + 간소화 파이프라인 적용 */ public List getHaeguTracks(Integer haeguNo, int minutes, boolean filterByIntegration) { - JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); - List rawTracks = new ArrayList<>(); + String queryId = "rest-haegu-" + haeguNo + "-" + UUID.randomUUID().toString().substring(0, 8); + boolean slotAcquired = false; - LocalDateTime now = LocalDateTime.now(); - LocalDateTime startTime = now.minusMinutes(minutes); + try { + slotAcquired = acquireSlotWithWait(queryId); - if (minutes > 60) { - LocalDateTime currentHour = now.withMinute(0).withSecond(0).withNano(0); + JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); + List rawTracks = new ArrayList<>(); - if (minutes <= 1440) { - // hourly 테이블에서 과거 데이터 조회 - String hourlySql = """ + LocalDateTime now = LocalDateTime.now(); + LocalDateTime startTime = now.minusMinutes(minutes); + + if (minutes > 60) { + LocalDateTime currentHour = now.withMinute(0).withSecond(0).withNano(0); + + if (minutes <= 1440) { + String hourlySql = """ + SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket, + public.ST_AsText(t.track_geom) as track_geom, + t.distance_nm, t.avg_speed, t.max_speed, t.point_count + FROM signal.t_vessel_tracks_hourly t + WHERE EXISTS ( + SELECT 1 FROM signal.t_grid_vessel_tracks g + WHERE g.sig_src_cd = t.sig_src_cd + AND g.target_id = t.target_id + AND g.haegu_no = %d + AND g.time_bucket >= '%s' + ) + AND t.time_bucket >= '%s' + AND t.time_bucket < '%s' + ORDER BY t.sig_src_cd, t.target_id, t.time_bucket + """.formatted(haeguNo, startTime, startTime, currentHour); + + rawTracks.addAll(jdbcTemplate.query(hourlySql, this::mapTrackResponse)); + } + + String recentSql = """ SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket, public.ST_AsText(t.track_geom) as track_geom, t.distance_nm, t.avg_speed, t.max_speed, t.point_count - FROM signal.t_vessel_tracks_hourly t + FROM signal.t_vessel_tracks_5min t WHERE EXISTS ( SELECT 1 FROM signal.t_grid_vessel_tracks g WHERE g.sig_src_cd = t.sig_src_cd @@ -68,87 +123,97 @@ public class GisServiceV2 { AND g.time_bucket >= '%s' ) AND t.time_bucket >= '%s' - AND t.time_bucket < '%s' ORDER BY t.sig_src_cd, t.target_id, t.time_bucket - """.formatted(haeguNo, startTime, startTime, currentHour); + """.formatted(haeguNo, startTime, currentHour); - rawTracks.addAll(jdbcTemplate.query(hourlySql, this::mapTrackResponse)); + rawTracks.addAll(jdbcTemplate.query(recentSql, this::mapTrackResponse)); + + } else { + String sql = """ + SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket, + public.ST_AsText(t.track_geom) as track_geom, + t.distance_nm, t.avg_speed, t.max_speed, t.point_count + FROM signal.t_vessel_tracks_5min t + WHERE EXISTS ( + SELECT 1 FROM signal.t_grid_vessel_tracks g + WHERE g.sig_src_cd = t.sig_src_cd + AND g.target_id = t.target_id + AND g.haegu_no = %d + AND g.time_bucket >= NOW() - INTERVAL '%d minutes' + ) + AND t.time_bucket >= NOW() - INTERVAL '%d minutes' + ORDER BY t.sig_src_cd, t.target_id, t.time_bucket + """.formatted(haeguNo, minutes, minutes); + + rawTracks = jdbcTemplate.query(sql, this::mapTrackResponse); } - // 5min 테이블에서 최근 데이터 조회 - String recentSql = """ - SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket, - public.ST_AsText(t.track_geom) as track_geom, - t.distance_nm, t.avg_speed, t.max_speed, t.point_count - FROM signal.t_vessel_tracks_5min t - WHERE EXISTS ( - SELECT 1 FROM signal.t_grid_vessel_tracks g - WHERE g.sig_src_cd = t.sig_src_cd - AND g.target_id = t.target_id - AND g.haegu_no = %d - AND g.time_bucket >= '%s' - ) - AND t.time_bucket >= '%s' - ORDER BY t.sig_src_cd, t.target_id, t.time_bucket - """.formatted(haeguNo, startTime, currentHour); + List result = TrackConverter.convert(rawTracks, this::getVesselInfo); - rawTracks.addAll(jdbcTemplate.query(recentSql, this::mapTrackResponse)); + if (filterByIntegration && integrationVesselService.isEnabled()) { + result = filterByIntegration(result); + } - } else { - // 1시간 이하는 5분 테이블만 사용 - String sql = """ - SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket, - public.ST_AsText(t.track_geom) as track_geom, - t.distance_nm, t.avg_speed, t.max_speed, t.point_count - FROM signal.t_vessel_tracks_5min t - WHERE EXISTS ( - SELECT 1 FROM signal.t_grid_vessel_tracks g - WHERE g.sig_src_cd = t.sig_src_cd - AND g.target_id = t.target_id - AND g.haegu_no = %d - AND g.time_bucket >= NOW() - INTERVAL '%d minutes' - ) - AND t.time_bucket >= NOW() - INTERVAL '%d minutes' - ORDER BY t.sig_src_cd, t.target_id, t.time_bucket - """.formatted(haeguNo, minutes, minutes); + result = applySimplificationPipeline(result); - rawTracks = jdbcTemplate.query(sql, this::mapTrackResponse); + log.debug("V2 API: Fetched {} compact tracks for haegu {} in last {} minutes", + result.size(), haeguNo, minutes); + + return result; + + } finally { + if (slotAcquired) { + activeQueryManager.releaseQuerySlot(queryId); + } } - - // CompactVesselTrack으로 변환 - List result = TrackConverter.convert(rawTracks, this::getVesselInfo); - - // 통합선박 필터링 적용 - if (filterByIntegration && integrationVesselService.isEnabled()) { - result = filterByIntegration(result); - } - - log.debug("V2 API: Fetched {} compact tracks for haegu {} in last {} minutes", - result.size(), haeguNo, minutes); - - return result; } /** * 영역별 선박 항적 조회 (V2 - CompactVesselTrack 반환) + * Semaphore 부하 제어 + 간소화 파이프라인 적용 */ public List getAreaTracks(String areaId, int minutes, boolean filterByIntegration) { - JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); - List rawTracks = new ArrayList<>(); + String queryId = "rest-area-" + areaId + "-" + UUID.randomUUID().toString().substring(0, 8); + boolean slotAcquired = false; - LocalDateTime now = LocalDateTime.now(); - LocalDateTime startTime = now.minusMinutes(minutes); + try { + slotAcquired = acquireSlotWithWait(queryId); - if (minutes > 60) { - LocalDateTime currentHour = now.withMinute(0).withSecond(0).withNano(0); + JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); + List rawTracks = new ArrayList<>(); - if (minutes <= 1440) { - // hourly 테이블에서 과거 데이터 조회 - String hourlySql = """ + LocalDateTime now = LocalDateTime.now(); + LocalDateTime startTime = now.minusMinutes(minutes); + + if (minutes > 60) { + LocalDateTime currentHour = now.withMinute(0).withSecond(0).withNano(0); + + if (minutes <= 1440) { + String hourlySql = """ + SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket, + public.ST_AsText(t.track_geom) as track_geom, + t.distance_nm, t.avg_speed, t.max_speed, t.point_count + FROM signal.t_vessel_tracks_hourly t + WHERE EXISTS ( + SELECT 1 FROM signal.t_area_vessel_tracks a + WHERE a.sig_src_cd = t.sig_src_cd + AND a.target_id = t.target_id + AND a.area_id = '%s' + AND a.time_bucket >= '%s' + ) + AND t.time_bucket >= '%s' + AND t.time_bucket < '%s' + ORDER BY t.sig_src_cd, t.target_id, t.time_bucket + """.formatted(areaId, startTime, startTime, currentHour); + + rawTracks.addAll(jdbcTemplate.query(hourlySql, this::mapTrackResponse)); + } + + String recentSql = """ SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket, public.ST_AsText(t.track_geom) as track_geom, t.distance_nm, t.avg_speed, t.max_speed, t.point_count - FROM signal.t_vessel_tracks_hourly t + FROM signal.t_vessel_tracks_5min t WHERE EXISTS ( SELECT 1 FROM signal.t_area_vessel_tracks a WHERE a.sig_src_cd = t.sig_src_cd @@ -157,70 +222,284 @@ public class GisServiceV2 { AND a.time_bucket >= '%s' ) AND t.time_bucket >= '%s' - AND t.time_bucket < '%s' ORDER BY t.sig_src_cd, t.target_id, t.time_bucket - """.formatted(areaId, startTime, startTime, currentHour); + """.formatted(areaId, startTime, currentHour); - rawTracks.addAll(jdbcTemplate.query(hourlySql, this::mapTrackResponse)); + rawTracks.addAll(jdbcTemplate.query(recentSql, this::mapTrackResponse)); + + } else { + String sql = """ + SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket, + public.ST_AsText(t.track_geom) as track_geom, + t.distance_nm, t.avg_speed, t.max_speed, t.point_count + FROM signal.t_vessel_tracks_5min t + WHERE EXISTS ( + SELECT 1 FROM signal.t_area_vessel_tracks a + WHERE a.sig_src_cd = t.sig_src_cd + AND a.target_id = t.target_id + AND a.area_id = '%s' + AND a.time_bucket >= NOW() - INTERVAL '%d minutes' + ) + AND t.time_bucket >= NOW() - INTERVAL '%d minutes' + ORDER BY t.sig_src_cd, t.target_id, t.time_bucket + """.formatted(areaId, minutes, minutes); + + rawTracks = jdbcTemplate.query(sql, this::mapTrackResponse); } - // 5min 테이블에서 최근 데이터 조회 - String recentSql = """ - SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket, - public.ST_AsText(t.track_geom) as track_geom, - t.distance_nm, t.avg_speed, t.max_speed, t.point_count - FROM signal.t_vessel_tracks_5min t - WHERE EXISTS ( - SELECT 1 FROM signal.t_area_vessel_tracks a - WHERE a.sig_src_cd = t.sig_src_cd - AND a.target_id = t.target_id - AND a.area_id = '%s' - AND a.time_bucket >= '%s' - ) - AND t.time_bucket >= '%s' - ORDER BY t.sig_src_cd, t.target_id, t.time_bucket - """.formatted(areaId, startTime, currentHour); + List result = TrackConverter.convert(rawTracks, this::getVesselInfo); - rawTracks.addAll(jdbcTemplate.query(recentSql, this::mapTrackResponse)); + if (filterByIntegration && integrationVesselService.isEnabled()) { + result = filterByIntegration(result); + } - } else { - // 1시간 이하는 5분 테이블만 사용 - String sql = """ - SELECT DISTINCT t.sig_src_cd, t.target_id, t.time_bucket, - public.ST_AsText(t.track_geom) as track_geom, - t.distance_nm, t.avg_speed, t.max_speed, t.point_count - FROM signal.t_vessel_tracks_5min t - WHERE EXISTS ( - SELECT 1 FROM signal.t_area_vessel_tracks a - WHERE a.sig_src_cd = t.sig_src_cd - AND a.target_id = t.target_id - AND a.area_id = '%s' - AND a.time_bucket >= NOW() - INTERVAL '%d minutes' - ) - AND t.time_bucket >= NOW() - INTERVAL '%d minutes' - ORDER BY t.sig_src_cd, t.target_id, t.time_bucket - """.formatted(areaId, minutes, minutes); + result = applySimplificationPipeline(result); - rawTracks = jdbcTemplate.query(sql, this::mapTrackResponse); + log.debug("V2 API: Fetched {} compact tracks for area {} in last {} minutes", + result.size(), areaId, minutes); + + return result; + + } finally { + if (slotAcquired) { + activeQueryManager.releaseQuerySlot(queryId); + } } - - // CompactVesselTrack으로 변환 - List result = TrackConverter.convert(rawTracks, this::getVesselInfo); - - // 통합선박 필터링 적용 - if (filterByIntegration && integrationVesselService.isEnabled()) { - result = filterByIntegration(result); - } - - log.debug("V2 API: Fetched {} compact tracks for area {} in last {} minutes", - result.size(), areaId, minutes); - - return result; } /** - * TrackResponse 매핑 + * 선박별 항적 조회 V2 (캐시 + Semaphore + 간소화) + * DailyTrackCacheManager를 활용한 캐시 우선 조회 */ + public List getVesselTracksV2(VesselTracksRequest request) { + String queryId = "rest-vessels-" + UUID.randomUUID().toString().substring(0, 8); + boolean slotAcquired = false; + + try { + slotAcquired = acquireSlotWithWait(queryId); + + List result; + + if (dailyTrackCacheManager.isEnabled() && + dailyTrackCacheManager.getStatus() != DailyTrackCacheManager.CacheStatus.NOT_STARTED && + dailyTrackCacheManager.getStatus() != DailyTrackCacheManager.CacheStatus.DISABLED) { + + result = queryWithCache(request); + } else { + // 캐시 비활성화/미준비: 기존 GisService에 위임 + result = gisService.getVesselTracks(request); + } + + result = applySimplificationPipeline(result); + + log.debug("V2 API: Returned {} tracks for {} vessels (cache={})", + result.size(), request.getVessels().size(), dailyTrackCacheManager.isEnabled()); + + return result; + + } finally { + if (slotAcquired) { + activeQueryManager.releaseQuerySlot(queryId); + } + } + } + + // ── 캐시 조회 로직 ── + + /** + * splitQueryRange를 사용한 캐시 우선 조회 + * D-1부터 역순으로 캐시 존재 확인 → 캐시/DB 분리 조회 → 병합 + */ + private List queryWithCache(VesselTracksRequest request) { + LocalDateTime startTime = request.getStartTime(); + LocalDateTime endTime = request.getEndTime(); + + DailyTrackCacheManager.SplitQueryResult split = + dailyTrackCacheManager.splitQueryRange(startTime, endTime); + + List allTracks = new ArrayList<>(); + + // 요청 선박 ID 집합 구성 + Set requestedVesselKeys = request.getVessels().stream() + .map(v -> v.getSigSrcCd() + "_" + v.getTargetId()) + .collect(Collectors.toSet()); + + // 1. 캐시에서 조회 (캐시된 날짜) + if (split.hasCachedData()) { + List cachedTracks = + dailyTrackCacheManager.getCachedTracksMultipleDays(split.getCachedDates()); + + // 요청 선박만 필터링 + List filteredCached = cachedTracks.stream() + .filter(t -> requestedVesselKeys.contains(t.getSigSrcCd() + "_" + t.getTargetId())) + .collect(Collectors.toList()); + + allTracks.addAll(filteredCached); + log.debug("[CacheQuery] cached {} days -> {} tracks (filtered from {})", + split.getCachedDates().size(), filteredCached.size(), cachedTracks.size()); + } + + // 2. DB에서 조회 (캐시 미적중 과거 날짜) + if (split.hasDbRanges()) { + for (DailyTrackCacheManager.DateRange dbRange : split.getDbRanges()) { + VesselTracksRequest dbRequest = VesselTracksRequest.builder() + .startTime(dbRange.getStart()) + .endTime(dbRange.getEnd()) + .vessels(request.getVessels()) + .isIntegration(request.getIsIntegration()) + .build(); + List dbTracks = gisService.getVesselTracks(dbRequest); + allTracks.addAll(dbTracks); + log.debug("[CacheQuery] DB range {} ~ {} -> {} tracks", + dbRange.getStart(), dbRange.getEnd(), dbTracks.size()); + } + } + + // 3. 오늘 구간 DB 조회 (hourly + 5min) + if (split.hasTodayRange()) { + DailyTrackCacheManager.DateRange today = split.getTodayRange(); + VesselTracksRequest todayRequest = VesselTracksRequest.builder() + .startTime(today.getStart()) + .endTime(today.getEnd()) + .vessels(request.getVessels()) + .isIntegration(request.getIsIntegration()) + .build(); + List todayTracks = gisService.getVesselTracks(todayRequest); + allTracks.addAll(todayTracks); + log.debug("[CacheQuery] today {} ~ {} -> {} tracks", + today.getStart(), today.getEnd(), todayTracks.size()); + } + + // 4. 동일 선박 병합 (캐시 + DB 결과) + List merged = mergeTracksByVessel(allTracks); + + // 5. 통합선박 필터링 + if ("1".equals(request.getIsIntegration()) && integrationVesselService.isEnabled()) { + merged = filterByIntegration(merged); + } + + return merged; + } + + // ── Semaphore 슬롯 획득 ── + + /** + * REST V2 전용 슬롯 획득: 즉시 시도 → blocking 대기 → 타임아웃 시 예외 + */ + private boolean acquireSlotWithWait(String queryId) { + if (activeQueryManager.tryAcquireQuerySlotImmediate(queryId)) { + return true; + } + + try { + boolean acquired = activeQueryManager.tryAcquireQuerySlot(queryId); + if (!acquired) { + throw new QueryTimeoutException( + "서버가 과부하 상태입니다. " + restQueryTimeout + "초 대기 후 타임아웃되었습니다."); + } + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new QueryTimeoutException("쿼리 대기 중 인터럽트가 발생했습니다."); + } + } + + // ── 간소화 파이프라인 ── + + /** + * 2단계 간소화 파이프라인 + * [1단계] 표준 간소화 (DP + 거리/시간 + 줌) + * [2단계] 포인트 버짓 강제 (총 포인트 상한 초과 시 균일 Nth-point) + */ + private List applySimplificationPipeline(List tracks) { + if (tracks == null || tracks.isEmpty()) { + return tracks; + } + + // 1단계: 표준 간소화 + tracks = cacheTrackSimplifier.simplify(tracks, CacheTrackSimplifier.SimplificationConfig.builder().build()); + + // 2단계: 포인트 버짓 강제 + tracks = cacheTrackSimplifier.enforcePointBudget(tracks, maxTotalPoints); + + return tracks; + } + + // ── 선박별 트랙 병합 ── + + /** + * 동일 선박(vesselId)의 트랙을 병합 + * 캐시와 DB에서 동일 선박 데이터가 올 수 있으므로 geometry/timestamps/speeds 합산 + */ + private List mergeTracksByVessel(List tracks) { + if (tracks == null || tracks.size() <= 1) { + return tracks != null ? tracks : Collections.emptyList(); + } + + Map> grouped = tracks.stream() + .collect(Collectors.groupingBy(t -> t.getSigSrcCd() + "_" + t.getTargetId())); + + // 병합이 필요 없는 경우 (모든 선박이 1개씩만) + if (grouped.values().stream().allMatch(list -> list.size() == 1)) { + return tracks; + } + + List merged = new ArrayList<>(); + + for (Map.Entry> entry : grouped.entrySet()) { + List vesselTracks = entry.getValue(); + + if (vesselTracks.size() == 1) { + merged.add(vesselTracks.get(0)); + continue; + } + + // 첫 번째 트랙을 기준으로 병합 + CompactVesselTrack base = vesselTracks.get(0); + List allGeometry = new ArrayList<>(base.getGeometry() != null ? base.getGeometry() : Collections.emptyList()); + List allTimestamps = new ArrayList<>(base.getTimestamps() != null ? base.getTimestamps() : Collections.emptyList()); + List allSpeeds = new ArrayList<>(base.getSpeeds() != null ? base.getSpeeds() : Collections.emptyList()); + double totalDistance = base.getTotalDistance(); + double maxSpeed = base.getMaxSpeed(); + int totalPointCount = base.getPointCount(); + + for (int i = 1; i < vesselTracks.size(); i++) { + CompactVesselTrack t = vesselTracks.get(i); + if (t.getGeometry() != null) allGeometry.addAll(t.getGeometry()); + if (t.getTimestamps() != null) allTimestamps.addAll(t.getTimestamps()); + if (t.getSpeeds() != null) allSpeeds.addAll(t.getSpeeds()); + totalDistance += t.getTotalDistance(); + maxSpeed = Math.max(maxSpeed, t.getMaxSpeed()); + totalPointCount += t.getPointCount(); + } + + CompactVesselTrack mergedTrack = CompactVesselTrack.builder() + .vesselId(base.getVesselId()) + .sigSrcCd(base.getSigSrcCd()) + .targetId(base.getTargetId()) + .nationalCode(base.getNationalCode()) + .shipName(base.getShipName()) + .shipType(base.getShipType()) + .shipKindCode(base.getShipKindCode()) + .integrationTargetId(base.getIntegrationTargetId()) + .geometry(allGeometry) + .timestamps(allTimestamps) + .speeds(allSpeeds) + .totalDistance(totalDistance) + .avgSpeed(base.getAvgSpeed()) + .maxSpeed(maxSpeed) + .pointCount(totalPointCount) + .build(); + + merged.add(mergedTrack); + } + + log.debug("[MergeVessels] {} tracks -> {} merged vessels", tracks.size(), merged.size()); + return merged; + } + + // ── 기존 유틸리티 메서드 (변경 없음) ── + private TrackResponse mapTrackResponse(ResultSet rs, int rowNum) throws SQLException { return TrackResponse.builder() .sigSrcCd(rs.getString("sig_src_cd")) @@ -234,9 +513,6 @@ public class GisServiceV2 { .build(); } - /** - * 선박 정보 조회 (캐시 우선) - */ private TrackConverter.VesselInfo getVesselInfo(String sigSrcCd, String targetId) { String cacheKey = sigSrcCd + "_" + targetId; @@ -245,7 +521,6 @@ public class GisServiceV2 { return new TrackConverter.VesselInfo(cached.shipName, cached.shipType); } - // DB 조회 JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); try { String sql = """ @@ -259,7 +534,6 @@ public class GisServiceV2 { String shipName = result.get("ship_nm") != null ? result.get("ship_nm").toString() : "-"; String shipType = result.get("ship_ty") != null ? result.get("ship_ty").toString() : "-"; - // 캐시 저장 vesselInfoCache.put(cacheKey, new VesselInfoCache(shipName, shipType)); return new TrackConverter.VesselInfo(shipName, shipType); @@ -268,15 +542,11 @@ public class GisServiceV2 { } } - /** - * 통합선박 기준 필터링 - */ private List filterByIntegration(List tracks) { if (tracks == null || tracks.isEmpty()) { return tracks; } - // 1. 모든 트랙의 통합선박 정보 조회 Map vesselIntegrations = new HashMap<>(); for (CompactVesselTrack track : tracks) { String key = track.getSigSrcCd() + "_" + track.getTargetId(); @@ -288,7 +558,6 @@ public class GisServiceV2 { } } - // 2. 통합선박별 그룹핑 Map> groupedByIntegration = new HashMap<>(); Map integrationMap = new HashMap<>(); @@ -308,7 +577,6 @@ public class GisServiceV2 { groupedByIntegration.computeIfAbsent(seq, k -> new ArrayList<>()).add(track); } - // 3. 각 그룹에서 최고 우선순위 신호만 선택 List result = new ArrayList<>(); for (Map.Entry> entry : groupedByIntegration.entrySet()) { @@ -316,7 +584,6 @@ public class GisServiceV2 { List groupTracks = entry.getValue(); if (seq < 0) { - // 통합정보 없는 단독 선박 CompactVesselTrack firstTrack = groupTracks.get(0); String soloIntegrationId = IntegrationSignalConstants.generateSoloIntegrationId( firstTrack.getSigSrcCd(), @@ -325,7 +592,6 @@ public class GisServiceV2 { groupTracks.forEach(t -> t.setIntegrationTargetId(soloIntegrationId)); result.addAll(groupTracks); } else { - // 통합선박 → 존재하는 신호 중 최고 우선순위 선택 IntegrationVessel integration = integrationMap.get(seq); Set existingSigSrcCds = groupTracks.stream() @@ -349,9 +615,6 @@ public class GisServiceV2 { return result; } - /** - * 선박 정보 캐시 내부 클래스 - */ private static class VesselInfoCache { String shipName; String shipType; diff --git a/src/main/java/gc/mda/signal_batch/domain/vessel/dto/CompactVesselTrack.java b/src/main/java/gc/mda/signal_batch/domain/vessel/dto/CompactVesselTrack.java index d646ad2..c294896 100644 --- a/src/main/java/gc/mda/signal_batch/domain/vessel/dto/CompactVesselTrack.java +++ b/src/main/java/gc/mda/signal_batch/domain/vessel/dto/CompactVesselTrack.java @@ -16,7 +16,7 @@ import java.util.List; * LineStringM 대신 단순 배열로 전송하여 프론트엔드 파싱 부하 제거 */ @Data -@Builder +@Builder(toBuilder = true) @NoArgsConstructor @AllArgsConstructor @JsonInclude(JsonInclude.Include.NON_NULL) diff --git a/src/main/java/gc/mda/signal_batch/global/exception/QueryTimeoutException.java b/src/main/java/gc/mda/signal_batch/global/exception/QueryTimeoutException.java new file mode 100644 index 0000000..c4465d8 --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/global/exception/QueryTimeoutException.java @@ -0,0 +1,16 @@ +package gc.mda.signal_batch.global.exception; + +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; + +/** + * REST V2 쿼리 슬롯 대기 타임아웃 시 발생 + * 503 Service Unavailable 자동 반환 + */ +@ResponseStatus(HttpStatus.SERVICE_UNAVAILABLE) +public class QueryTimeoutException extends RuntimeException { + + public QueryTimeoutException(String message) { + super(message); + } +} diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/CacheTrackSimplifier.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/CacheTrackSimplifier.java new file mode 100644 index 0000000..a481735 --- /dev/null +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/CacheTrackSimplifier.java @@ -0,0 +1,430 @@ +package gc.mda.signal_batch.global.websocket.service; + +import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack; +import lombok.Builder; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; + +/** + * 인메모리 캐시 트랙 간소화 유틸리티 + * DB 경로의 ST_Simplify + 거리/시간 샘플링 + 줌 샘플링과 동등한 간소화를 캐시 경로에 적용 + */ +@Slf4j +@Component +public class CacheTrackSimplifier { + + private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private static final double EARTH_RADIUS_NM = 3440.065; + + // ── SimplificationConfig: 향후 요청 파라미터로 노출 가능한 구조 ── + @Data + @Builder + public static class SimplificationConfig { + /** Douglas-Peucker tolerance (ST_Simplify 대체, degree 단위) */ + @Builder.Default private double dpTolerance = 0.005; + /** 거리 기반 샘플링 최소 거리 (해리) */ + @Builder.Default private double minDistanceNm = 0.05; + /** 시간 기반 샘플링 최소 간격 (분) */ + @Builder.Default private long minIntervalMinutes = 60; + /** 저속 선박 속도 임계값 (knots) */ + @Builder.Default private double lowSpeedThreshold = 5.0; + /** 저속 선박 최소 거리 (해리) */ + @Builder.Default private double lowSpeedMinDistanceNm = 1.5; + /** 저속 선박 최소 시간 간격 (분) */ + @Builder.Default private long lowSpeedMinIntervalMinutes = 45; + /** 줌 기반 N번째 포인트 샘플링 (null = 미적용) */ + @Builder.Default private Integer zoomSampleRate = null; + } + + // ── zoom → config 팩토리 ── + public static SimplificationConfig forZoomLevel(Integer zoomLevel) { + return SimplificationConfig.builder() + .dpTolerance(getDpTolerance(zoomLevel)) + .zoomSampleRate(getZoomSampleRate(zoomLevel)) + .build(); + } + + private static double getDpTolerance(Integer zoom) { + if (zoom == null) zoom = 10; + return switch (zoom) { + case 0, 1, 2, 3, 4, 5 -> 0.05; + case 6, 7 -> 0.02; + case 8, 9 -> 0.01; + case 10, 11 -> 0.005; + default -> 0.002; + }; + } + + private static Integer getZoomSampleRate(Integer zoom) { + if (zoom == null || zoom >= 10) return null; + if (zoom < 6) return 10; + if (zoom < 8) return 5; + return 2; + } + + // ── 메인 API ── + + public List simplify(List tracks, Integer zoomLevel) { + return simplify(tracks, forZoomLevel(zoomLevel)); + } + + public List simplify(List tracks, SimplificationConfig config) { + if (tracks == null || tracks.isEmpty()) { + return tracks; + } + + long startTime = System.currentTimeMillis(); + int totalOriginalPoints = 0; + int totalAfterDp = 0; + int totalAfterDistTime = 0; + int totalAfterZoom = 0; + int simplifiedCount = 0; + int skippedCount = 0; + + for (int t = 0; t < tracks.size(); t++) { + CompactVesselTrack track = tracks.get(t); + if (track.getGeometry() == null || track.getGeometry().size() <= 2) { + skippedCount++; + continue; + } + + int originalSize = track.getGeometry().size(); + totalOriginalPoints += originalSize; + + // 1단계: Douglas-Peucker + applyDouglasPeucker(track, config.getDpTolerance()); + int afterDp = track.getGeometry().size(); + totalAfterDp += afterDp; + + // 2단계: 거리/시간 샘플링 + applyDistanceTimeSampling(track, config); + int afterDistTime = track.getGeometry().size(); + totalAfterDistTime += afterDistTime; + + // 3단계: 줌 기반 추가 샘플링 + if (config.getZoomSampleRate() != null) { + applyZoomSampling(track, config.getZoomSampleRate()); + } + int afterZoom = track.getGeometry().size(); + totalAfterZoom += afterZoom; + + track.setPointCount(afterZoom); + + // 처음 5개 선박 상세 로그 (debug 레벨) + if (simplifiedCount < 5) { + log.debug("[CacheSimplify] vessel={} original={} -> DP={} -> distTime={} -> zoom={} (avg={} kn)", + track.getVesselId(), originalSize, afterDp, afterDistTime, afterZoom, + track.getAvgSpeed() != null ? String.format("%.1f", track.getAvgSpeed()) : "N/A"); + } + simplifiedCount++; + } + + long elapsed = System.currentTimeMillis() - startTime; + + if (simplifiedCount > 0) { + double totalReduction = (1 - (double) totalAfterZoom / totalOriginalPoints) * 100; + + log.info("[CacheSimplify] tracks={} (skip={}), {} -> {} pts ({}% 감소), {}ms", + simplifiedCount, skippedCount, + totalOriginalPoints, totalAfterZoom, Math.round(totalReduction), elapsed); + } + + return tracks; + } + + // ── 1단계: Douglas-Peucker (ST_Simplify 대체) ── + + private void applyDouglasPeucker(CompactVesselTrack track, double tolerance) { + List geometry = track.getGeometry(); + int n = geometry.size(); + if (n <= 2) return; + + boolean[] keep = new boolean[n]; + keep[0] = true; + keep[n - 1] = true; + + douglasPeuckerRecursive(geometry, 0, n - 1, tolerance, keep); + + List newGeometry = new ArrayList<>(); + List newTimestamps = new ArrayList<>(); + List newSpeeds = new ArrayList<>(); + + List timestamps = track.getTimestamps(); + List speeds = track.getSpeeds(); + + for (int i = 0; i < n; i++) { + if (keep[i]) { + newGeometry.add(geometry.get(i)); + if (timestamps != null && i < timestamps.size()) { + newTimestamps.add(timestamps.get(i)); + } + if (speeds != null && i < speeds.size()) { + newSpeeds.add(speeds.get(i)); + } + } + } + + track.setGeometry(newGeometry); + track.setTimestamps(newTimestamps); + track.setSpeeds(newSpeeds); + } + + private void douglasPeuckerRecursive(List points, int start, int end, double tolerance, boolean[] keep) { + if (end - start < 2) return; + + double maxDist = 0; + int maxIndex = start; + + double[] p1 = points.get(start); + double[] p2 = points.get(end); + + for (int i = start + 1; i < end; i++) { + double dist = perpendicularDistance(points.get(i), p1, p2); + if (dist > maxDist) { + maxDist = dist; + maxIndex = i; + } + } + + if (maxDist > tolerance) { + keep[maxIndex] = true; + douglasPeuckerRecursive(points, start, maxIndex, tolerance, keep); + douglasPeuckerRecursive(points, maxIndex, end, tolerance, keep); + } + } + + /** + * 점-선분 직교 거리 (degree 단위, ST_Simplify와 동일) + */ + private double perpendicularDistance(double[] point, double[] lineStart, double[] lineEnd) { + double dx = lineEnd[0] - lineStart[0]; + double dy = lineEnd[1] - lineStart[1]; + + if (dx == 0 && dy == 0) { + // 선분이 점인 경우 + double pdx = point[0] - lineStart[0]; + double pdy = point[1] - lineStart[1]; + return Math.sqrt(pdx * pdx + pdy * pdy); + } + + double t = ((point[0] - lineStart[0]) * dx + (point[1] - lineStart[1]) * dy) / (dx * dx + dy * dy); + t = Math.max(0, Math.min(1, t)); + + double nearestX = lineStart[0] + t * dx; + double nearestY = lineStart[1] + t * dy; + + double distX = point[0] - nearestX; + double distY = point[1] - nearestY; + + return Math.sqrt(distX * distX + distY * distY); + } + + // ── 2단계: 거리/시간 기반 샘플링 ── + + private void applyDistanceTimeSampling(CompactVesselTrack track, SimplificationConfig config) { + List geometry = track.getGeometry(); + List timestamps = track.getTimestamps(); + List speeds = track.getSpeeds(); + + if (geometry.size() <= 2) return; + + List simplified = new ArrayList<>(); + List simplifiedTs = new ArrayList<>(); + List simplifiedSpd = new ArrayList<>(); + + double[] prevPoint = null; + LocalDateTime prevTime = null; + + for (int i = 0; i < geometry.size(); i++) { + double[] point = geometry.get(i); + boolean include = false; + + if (i == 0 || i == geometry.size() - 1) { + include = true; + } else if (prevPoint != null && prevTime != null && timestamps != null && i < timestamps.size()) { + LocalDateTime currentTime = parseTimestamp(timestamps.get(i)); + if (currentTime != null) { + double distance = calculateDistance(prevPoint[1], prevPoint[0], point[1], point[0]); + long minutesSincePrev = ChronoUnit.MINUTES.between(prevTime, currentTime); + + // DAILY 전략 기준 (DB 경로와 동일) + include = distance > config.getMinDistanceNm() || minutesSincePrev >= config.getMinIntervalMinutes(); + + // 저속 선박 강화 간소화 + if (track.getAvgSpeed() != null && track.getAvgSpeed() > 0 && track.getAvgSpeed() < config.getLowSpeedThreshold()) { + include = distance > config.getLowSpeedMinDistanceNm() || minutesSincePrev >= config.getLowSpeedMinIntervalMinutes(); + } + } + } + + if (include) { + simplified.add(point); + if (timestamps != null && i < timestamps.size()) { + simplifiedTs.add(timestamps.get(i)); + } + if (speeds != null && i < speeds.size()) { + simplifiedSpd.add(speeds.get(i)); + } + prevPoint = point; + if (timestamps != null && i < timestamps.size()) { + prevTime = parseTimestamp(timestamps.get(i)); + } + } + } + + track.setGeometry(simplified); + track.setTimestamps(simplifiedTs); + track.setSpeeds(simplifiedSpd); + } + + private LocalDateTime parseTimestamp(String tsStr) { + if (tsStr == null) return null; + try { + if (tsStr.matches("\\d{10,}")) { + return LocalDateTime.ofInstant( + java.time.Instant.ofEpochSecond(Long.parseLong(tsStr)), + java.time.ZoneId.systemDefault()); + } else { + return LocalDateTime.parse(tsStr, TIMESTAMP_FORMATTER); + } + } catch (Exception e) { + return null; + } + } + + // ── 3단계: 줌 기반 추가 샘플링 ── + + private void applyZoomSampling(CompactVesselTrack track, int sampleRate) { + List geometry = track.getGeometry(); + List timestamps = track.getTimestamps(); + List speeds = track.getSpeeds(); + + if (geometry.size() <= 2 || sampleRate <= 1) return; + + List sampled = new ArrayList<>(); + List sampledTs = new ArrayList<>(); + List sampledSpd = new ArrayList<>(); + + for (int i = 0; i < geometry.size(); i++) { + if (i % sampleRate == 0 || i == geometry.size() - 1) { + sampled.add(geometry.get(i)); + if (timestamps != null && i < timestamps.size()) { + sampledTs.add(timestamps.get(i)); + } + if (speeds != null && i < speeds.size()) { + sampledSpd.add(speeds.get(i)); + } + } + } + + track.setGeometry(sampled); + track.setTimestamps(sampledTs); + track.setSpeeds(sampledSpd); + } + + // ── 포인트 버짓 강제 (REST V2 응답 크기 제한) ── + + /** + * 전체 트랙의 총 포인트가 상한을 초과하면 비율 기반 균등 분배 샘플링 적용. + * 모든 트랙에 동일한 비율(ratio)을 적용하여 일관적 간소화 보장. + * + * 예시: 총 51만, 상한 50만 → ratio=0.98, 각 트랙 포인트 2%만 제거 → 결과 ≈50만 + * 예시: 총 100만, 상한 50만 → ratio=0.5, 각 트랙 포인트 50% 제거 → 결과 ≈50만 + */ + public List enforcePointBudget(List tracks, int maxTotalPoints) { + if (tracks == null || tracks.isEmpty() || maxTotalPoints <= 0) { + return tracks; + } + + int totalPoints = tracks.stream() + .mapToInt(t -> t.getGeometry() != null ? t.getGeometry().size() : 0) + .sum(); + + if (totalPoints <= maxTotalPoints) { + log.debug("[PointBudget] totalPoints={} <= max={}, no additional simplification needed", + totalPoints, maxTotalPoints); + return tracks; + } + + double ratio = (double) maxTotalPoints / totalPoints; + log.info("[PointBudget] totalPoints={} > max={}, applying uniform ratio={} to {} tracks", + totalPoints, maxTotalPoints, String.format("%.4f", ratio), tracks.size()); + + int resultPoints = 0; + for (CompactVesselTrack track : tracks) { + if (track.getGeometry() != null && track.getGeometry().size() > 2) { + int trackPoints = track.getGeometry().size(); + int targetPoints = Math.max(2, (int) Math.round(trackPoints * ratio)); + + if (targetPoints < trackPoints) { + applyProportionalSampling(track, targetPoints); + track.setPointCount(track.getGeometry().size()); + } + } + resultPoints += track.getGeometry() != null ? track.getGeometry().size() : 0; + } + + log.info("[PointBudget] reduced {} -> {} points ({} tracks)", + totalPoints, resultPoints, tracks.size()); + + return tracks; + } + + /** + * 비율 기반 균등 분배 샘플링: targetPoints 개수만큼 균등 간격으로 포인트 선택. + * 첫 포인트와 마지막 포인트는 항상 보존. + */ + private void applyProportionalSampling(CompactVesselTrack track, int targetPoints) { + List geometry = track.getGeometry(); + List timestamps = track.getTimestamps(); + List speeds = track.getSpeeds(); + int n = geometry.size(); + + if (targetPoints >= n || targetPoints < 2) return; + + List sampled = new ArrayList<>(targetPoints); + List sampledTs = (timestamps != null) ? new ArrayList<>(targetPoints) : null; + List sampledSpd = (speeds != null) ? new ArrayList<>(targetPoints) : null; + + double stride = (double) (n - 1) / (targetPoints - 1); + + for (int i = 0; i < targetPoints; i++) { + int index = (i == targetPoints - 1) ? n - 1 : (int) Math.round(i * stride); + + sampled.add(geometry.get(index)); + if (sampledTs != null && timestamps != null && index < timestamps.size()) { + sampledTs.add(timestamps.get(index)); + } + if (sampledSpd != null && speeds != null && index < speeds.size()) { + sampledSpd.add(speeds.get(index)); + } + } + + track.setGeometry(sampled); + if (sampledTs != null) track.setTimestamps(sampledTs); + if (sampledSpd != null) track.setSpeeds(sampledSpd); + } + + // ── 거리 계산 (Haversine, 해리 단위) ── + + private double calculateDistance(double lat1, double lon1, double lat2, double lon2) { + double lat1Rad = Math.toRadians(lat1); + double lat2Rad = Math.toRadians(lat2); + double deltaLat = lat2Rad - lat1Rad; + double deltaLon = Math.toRadians(lon2 - lon1); + + double a = Math.sin(deltaLat / 2) * Math.sin(deltaLat / 2) + + Math.cos(lat1Rad) * Math.cos(lat2Rad) * + Math.sin(deltaLon / 2) * Math.sin(deltaLon / 2); + double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); + + return EARTH_RADIUS_NM * c; + } +} diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java index 909ce56..27203d2 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java @@ -49,6 +49,8 @@ import java.util.concurrent.atomic.AtomicInteger; @Service public class ChunkedTrackStreamingService { + private static final org.slf4j.Logger benchmarkLog = org.slf4j.LoggerFactory.getLogger("CACHE_BENCHMARK"); // [BENCHMARK] + private final JdbcTemplate queryJdbcTemplate; private final DataSource queryDataSource; @SuppressWarnings("unused") @@ -57,6 +59,7 @@ public class ChunkedTrackStreamingService { private final IntegrationVesselService integrationVesselService; private final TrackQueryInterceptor trackQueryInterceptor; private final DailyTrackCacheManager dailyTrackCacheManager; + private final CacheTrackSimplifier cacheTrackSimplifier; private final WKTReader wktReader = new WKTReader(); @SuppressWarnings("unused") private final ExecutorService executorService = Executors.newFixedThreadPool(10); @@ -97,7 +100,8 @@ public class ChunkedTrackStreamingService { ActiveQueryManager activeQueryManager, IntegrationVesselService integrationVesselService, TrackQueryInterceptor trackQueryInterceptor, - DailyTrackCacheManager dailyTrackCacheManager) { + DailyTrackCacheManager dailyTrackCacheManager, + CacheTrackSimplifier cacheTrackSimplifier) { this.queryJdbcTemplate = queryJdbcTemplate; this.queryDataSource = queryDataSource; this.simplificationStrategy = simplificationStrategy; @@ -105,6 +109,7 @@ public class ChunkedTrackStreamingService { this.integrationVesselService = integrationVesselService; this.trackQueryInterceptor = trackQueryInterceptor; this.dailyTrackCacheManager = dailyTrackCacheManager; + this.cacheTrackSimplifier = cacheTrackSimplifier; } /** @@ -138,6 +143,61 @@ public class ChunkedTrackStreamingService { private volatile int dynamicChunkSizeKB = MAX_MESSAGE_SIZE_KB; } + // [BENCHMARK] 쿼리별 벤치마크 지표 수집용 내부 클래스 - 제거 시 이 클래스 전체 삭제 + private static class QueryBenchmark { + int cacheHitDays = 0; + int dbQueryDays = 0; + int totalTracks = 0; + int totalPointsBefore = 0; + int totalPointsAfter = 0; + int totalBatches = 0; + int batchesBeforeSimplify = 0; + long simplifyTimeMs = 0; + long dbQueryTimeMs = 0; + // [BENCHMARK] DB 커넥션 세분화 카운터 (기존 단일 dbConnectionCount 대체) + int connViewportPass1 = 0; // collectViewportVesselIds DB 쿼리 + int connDailyPages = 0; // streamDailyTableWithPagination 페이지 + int connVesselInfo = 0; // preloadVesselInfoWithSessionCache 배치 + int connHourly5min = 0; // processTableRange (5min/hourly) + int connTableCheck = 0; // hasDataInTable 존재 검증 + Integer zoomLevel; + + String determinePath() { + if (cacheHitDays > 0 && dbQueryDays > 0) return "HYBRID"; + if (cacheHitDays > 0) return "CACHE"; + return "DB"; + } + + int dbConnectionTotal() { // [BENCHMARK] + return connViewportPass1 + connDailyPages + connVesselInfo + connHourly5min + connTableCheck; + } + + String toJson(String queryId, int uniqueVessels, long totalElapsedMs) { + return String.format( + "{\"queryId\":\"%s\",\"timestamp\":\"%s\",\"path\":\"%s\"," + + "\"zoomLevel\":%s,\"dateRanges\":%d,\"cacheHitDays\":%d,\"dbQueryDays\":%d," + + "\"totalTracks\":%d,\"totalPointsBefore\":%d,\"totalPointsAfter\":%d," + + "\"pointReductionPct\":%d,\"totalBatches\":%d,\"batchesBeforeSimplify\":%d," + + "\"simplifyTimeMs\":%d,\"dbQueryTimeMs\":%d,\"totalElapsedMs\":%d," + + "\"dbConnectionTotal\":%d,\"dbConnViewportPass1\":%d,\"dbConnDailyPages\":%d," + + "\"dbConnVesselInfo\":%d,\"dbConnHourly5min\":%d,\"dbConnTableCheck\":%d," + + "\"uniqueVessels\":%d}", + queryId, + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), + determinePath(), + zoomLevel != null ? zoomLevel.toString() : "null", + cacheHitDays + dbQueryDays, + cacheHitDays, dbQueryDays, + totalTracks, totalPointsBefore, totalPointsAfter, + totalPointsBefore > 0 ? Math.round((1 - (double) totalPointsAfter / totalPointsBefore) * 100) : 0, + totalBatches, batchesBeforeSimplify, + simplifyTimeMs, dbQueryTimeMs, totalElapsedMs, + dbConnectionTotal(), connViewportPass1, connDailyPages, + connVesselInfo, connHourly5min, connTableCheck, + uniqueVessels); + } + } + /** * 테이블 범위 처리 - LineStringM을 압축된 배열로 변환 */ @@ -269,7 +329,11 @@ public class ChunkedTrackStreamingService { * 선박 정보 배치 프리로드 - 세션 캐시 사용 버전 * 순서: 세션 캐시 → 전역 캐시 → DB 조회 */ - private void preloadVesselInfoWithSessionCache(Set vesselIds, Map sessionCache) { + private void preloadVesselInfoWithSessionCache(Set vesselIds, Map sessionCache) { // [BENCHMARK] benchmark 파라미터 오버로드 아래 참조 + preloadVesselInfoWithSessionCache(vesselIds, sessionCache, null); + } + + private void preloadVesselInfoWithSessionCache(Set vesselIds, Map sessionCache, QueryBenchmark benchmark) { // [BENCHMARK] // 세션 캐시와 전역 캐시에 없는 선박만 필터링 List uncachedIds = vesselIds.stream() .filter(id -> { @@ -324,6 +388,7 @@ public class ChunkedTrackStreamingService { vesselInfoCache.put(visselId, info); foundIds.add(visselId); }, batch.toArray()); + if (benchmark != null) benchmark.connVesselInfo++; // [BENCHMARK] // DB에 없는 선박은 기본값으로 세션 캐시에 저장 (전역 캐시에는 저장 안함) for (String visselId : batch) { @@ -414,7 +479,11 @@ public class ChunkedTrackStreamingService { * 전체 쿼리 시간 범위에서 뷰포트 영역을 지나는 모든 선박을 식별하여, * Pass 2에서 해당 선박의 전체 항적(뷰포트 밖 포함)을 조회할 수 있도록 함 */ - private Set collectViewportVesselIds(Map> strategyMap, TrackQueryRequest request) { + private Set collectViewportVesselIds(Map> strategyMap, TrackQueryRequest request) { // [BENCHMARK] benchmark 파라미터 오버로드 아래 참조 + return collectViewportVesselIds(strategyMap, request, null); + } + + private Set collectViewportVesselIds(Map> strategyMap, TrackQueryRequest request, QueryBenchmark benchmark) { // [BENCHMARK] ViewportFilter viewport = request.getViewport(); if (viewport == null || !viewport.isValid()) { return null; @@ -473,6 +542,7 @@ public class ChunkedTrackStreamingService { vesselIds.add(rs.getString("sig_src_cd") + "_" + rs.getString("target_id")); } } + if (benchmark != null) benchmark.connViewportPass1++; // [BENCHMARK] } catch (SQLException e) { log.error("Error collecting viewport vessel IDs from {}: {}", tableName, e.getMessage()); } @@ -935,6 +1005,10 @@ public class ChunkedTrackStreamingService { queryStartTime = System.currentTimeMillis(); processedTimeRanges.clear(); + // [BENCHMARK] 벤치마크 지표 초기화 + QueryBenchmark benchmark = new QueryBenchmark(); + benchmark.zoomLevel = request.getZoomLevel(); + // 백프레셔 메트릭스 초기화 BackpressureMetrics metrics = new BackpressureMetrics(); queryMetrics.put(queryId, metrics); @@ -945,7 +1019,7 @@ public class ChunkedTrackStreamingService { // 시간 범위별 테이블 전략 분할 Map> strategyMap = splitTimeRangeByStrategy( - request.getStartTime(), request.getEndTime() + request.getStartTime(), request.getEndTime(), benchmark // [BENCHMARK] ); // 전체 시간 계산 @@ -953,7 +1027,7 @@ public class ChunkedTrackStreamingService { log.info("Total time range: {} minutes", estimatedTotalMinutes); // 2-pass 뷰포트 필터링: 전체 시간 범위에서 뷰포트 교차 선박 수집 - Set viewportVesselIds = collectViewportVesselIds(strategyMap, request); + Set viewportVesselIds = collectViewportVesselIds(strategyMap, request, benchmark); // [BENCHMARK] int globalChunkIndex = 0; int totalVessels = 0; @@ -976,7 +1050,7 @@ public class ChunkedTrackStreamingService { if (strategy == TableStrategy.DAILY) { // Daily는 기존 방식 유지 (이미 일 단위) processDailyStrategy(ranges, request, queryId, chunkConsumer, statusConsumer, - globalChunkIndex, uniqueVesselIds, viewportVesselIds); + globalChunkIndex, uniqueVesselIds, viewportVesselIds, benchmark); globalChunkIndex = getCurrentChunkIndex(); } else { // Hourly/5min은 6시간 단위로 그룹화하여 처리 @@ -1000,6 +1074,7 @@ public class ChunkedTrackStreamingService { List compactTracks = processTableRangeWithBaseTime( request, strategy, range, baseTime, viewportVesselIds); + if (benchmark != null) benchmark.connHourly5min++; // [BENCHMARK] // 선박별로 볕합 for (CompactVesselTrack track : compactTracks) { @@ -1217,6 +1292,15 @@ public class ChunkedTrackStreamingService { chunkConsumer.accept(lastChunkMarker); } + // [BENCHMARK] 벤치마크 JSON 기록 + long totalElapsedMs = System.currentTimeMillis() - queryStartTime; + benchmark.totalBatches = globalChunkIndex; + try { + benchmarkLog.info(benchmark.toJson(queryId, uniqueVesselIds.size(), totalElapsedMs)); + } catch (Exception e) { + log.debug("Failed to write benchmark log: {}", e.getMessage()); + } + log.info("Query {} completed: {} chunks, {} unique vessels", queryId, globalChunkIndex, uniqueVesselIds.size()); @@ -1390,7 +1474,11 @@ public class ChunkedTrackStreamingService { /** * 시간 범위별 테이블 선택 로직 (StompTrackStreamingService 참고) */ - private Map> splitTimeRangeByStrategy(LocalDateTime start, LocalDateTime end) { + private Map> splitTimeRangeByStrategy(LocalDateTime start, LocalDateTime end) { // [BENCHMARK] benchmark 파라미터 오버로드 아래 참조 + return splitTimeRangeByStrategy(start, end, null); + } + + private Map> splitTimeRangeByStrategy(LocalDateTime start, LocalDateTime end, QueryBenchmark benchmark) { // [BENCHMARK] Map> strategyMap = new LinkedHashMap<>(); LocalDateTime now = LocalDateTime.now(); @@ -1456,6 +1544,7 @@ public class ChunkedTrackStreamingService { // 각 일별 범위에 대해 데이터 존재 여부 확인 for (TimeRange dailyRange : mergedDailyRanges) { + if (benchmark != null) benchmark.connTableCheck++; // [BENCHMARK] if (hasDataInTable(TableStrategy.DAILY.getTableName(), dailyRange.getStart(), dailyRange.getEnd())) { validDailyRanges.add(dailyRange); log.debug("Daily data found for range: {}", dailyRange); @@ -1482,6 +1571,7 @@ public class ChunkedTrackStreamingService { List fallbackTo5Min = new ArrayList<>(); for (TimeRange range : hourlyRanges) { + if (benchmark != null) benchmark.connTableCheck++; // [BENCHMARK] if (hasDataInTable(TableStrategy.HOURLY.getTableName(), range.getStart(), range.getEnd())) { validHourlyRanges.add(range); } else { @@ -2403,7 +2493,8 @@ public class ChunkedTrackStreamingService { Consumer statusConsumer, int startChunkIndex, Set uniqueVesselIds, - Set viewportVesselIds) throws Exception { + Set viewportVesselIds, + QueryBenchmark benchmark) throws Exception { currentGlobalChunkIndex = startChunkIndex; @@ -2436,21 +2527,58 @@ public class ChunkedTrackStreamingService { } if (!cachedTracks.isEmpty()) { + // 간소화 전 지표 측정 + int preTrackCount = cachedTracks.size(); + int prePointCount = cachedTracks.stream() + .mapToInt(t -> t.getGeometry() != null ? t.getGeometry().size() : 0).sum(); + List> preBatches = splitByMessageSize(cachedTracks); + int preBatchCount = preBatches.size(); + + // 방어적 복사 후 간소화 (캐시 원본 보호) + long simplifyStart = System.currentTimeMillis(); + cachedTracks = cachedTracks.stream() + .map(t -> t.toBuilder().build()) + .collect(Collectors.toList()); + cachedTracks = cacheTrackSimplifier.simplify(cachedTracks, request.getZoomLevel()); + long simplifyElapsed = System.currentTimeMillis() - simplifyStart; + + // 간소화 후 지표 측정 + int postPointCount = cachedTracks.stream() + .mapToInt(t -> t.getGeometry() != null ? t.getGeometry().size() : 0).sum(); // 메시지 크기로 분할하여 전송 List> batches = splitByMessageSize(cachedTracks); for (List batch : batches) { sendChunkResponse(batch, queryId, chunkConsumer, uniqueVesselIds); } - log.info("Daily cache served {} tracks for {} in {} batches", - cachedTracks.size(), rangeDate, batches.size()); + + log.info("[CacheHIT] date={}, zoom={}, tracks={}, points: {} -> {} ({}% 감소), batches: {} -> {} ({}% 감소), simplify: {}ms", + rangeDate, request.getZoomLevel(), preTrackCount, + prePointCount, postPointCount, + prePointCount > 0 ? Math.round((1 - (double) postPointCount / prePointCount) * 100) : 0, + preBatchCount, batches.size(), + preBatchCount > 0 ? Math.round((1 - (double) batches.size() / preBatchCount) * 100) : 0, + simplifyElapsed); + + // [BENCHMARK] 벤치마크 누적 (캐시 경로) + if (benchmark != null) { + benchmark.cacheHitDays++; + benchmark.totalTracks += preTrackCount; + benchmark.totalPointsBefore += prePointCount; + benchmark.totalPointsAfter += postPointCount; + benchmark.batchesBeforeSimplify += preBatchCount; + benchmark.simplifyTimeMs += simplifyElapsed; + } } } else { // 캐시 미스: 기존 DB 페이지네이션으로 처리 if (dailyTrackCacheManager.isEnabled()) { log.info("Daily cache MISS for {}: falling back to DB", rangeDate); } - streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache, viewportVesselIds); + if (benchmark != null) { + benchmark.dbQueryDays++; // [BENCHMARK] + } + streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache, viewportVesselIds, benchmark); } } @@ -2489,7 +2617,8 @@ public class ChunkedTrackStreamingService { Consumer statusConsumer, Set uniqueVesselIds, Map sessionVesselCache, - Set viewportVesselIds) { + Set viewportVesselIds, + QueryBenchmark benchmark) { String tableName = TableStrategy.DAILY.getTableName(); // 줌 레벨에 따른 강화된 간소화 tolerance @@ -2548,7 +2677,14 @@ public class ChunkedTrackStreamingService { long queryStartTime = System.currentTimeMillis(); try (ResultSet rs = ps.executeQuery()) { long queryEndTime = System.currentTimeMillis(); - log.info("Daily page {} query executed in {}ms", pageNum + 1, queryEndTime - queryStartTime); + long pageQueryTime = queryEndTime - queryStartTime; + log.info("Daily page {} query executed in {}ms", pageNum + 1, pageQueryTime); + + // [BENCHMARK] DB 쿼리 시간 누적 + if (benchmark != null) { + benchmark.dbQueryTimeMs += pageQueryTime; + benchmark.connDailyPages++; // [BENCHMARK] + } int pageTrackCount = 0; String currentSigSrcCd = null; @@ -2582,7 +2718,7 @@ public class ChunkedTrackStreamingService { // 2단계: 선박 정보 배치 조회 (세션 캐시 → 전역 캐시 → DB 순서) long batchStartTime = System.currentTimeMillis(); - preloadVesselInfoWithSessionCache(vesselIdsInPage, sessionVesselCache); + preloadVesselInfoWithSessionCache(vesselIdsInPage, sessionVesselCache, benchmark); // [BENCHMARK] long batchEndTime = System.currentTimeMillis(); log.info("Daily page {} vessel info batch loaded in {}ms (session cache size: {})", pageNum + 1, batchEndTime - batchStartTime, sessionVesselCache.size()); @@ -2701,6 +2837,15 @@ public class ChunkedTrackStreamingService { totalVesselsSent += pageTracks.size(); + // [BENCHMARK] DB 경로 포인트 수 누적 + if (benchmark != null) { + int pagePoints = pageTracks.stream() + .mapToInt(t -> t.getPointCount() != null ? t.getPointCount() : 0).sum(); + benchmark.totalTracks += pageTracks.size(); + benchmark.totalPointsBefore += pagePoints; + benchmark.totalPointsAfter += pagePoints; // DB 경로는 ST_Simplify 적용 후이므로 동일 + } + // 메시지 크기로 분할하여 전송 List> batches = splitByMessageSize(pageTracks); for (List batch : batches) { diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/DailyTrackCacheManager.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/DailyTrackCacheManager.java index 34295de..72caae2 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/DailyTrackCacheManager.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/DailyTrackCacheManager.java @@ -2,6 +2,7 @@ package gc.mda.signal_batch.global.websocket.service; import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack; import gc.mda.signal_batch.global.config.DailyTrackCacheProperties; +import gc.mda.signal_batch.global.util.NationalCodeUtil; import gc.mda.signal_batch.global.util.ShipKindCodeConverter; import gc.mda.signal_batch.domain.vessel.service.IntegrationVesselService; import gc.mda.signal_batch.domain.vessel.dto.IntegrationVessel; @@ -310,6 +311,9 @@ public class DailyTrackCacheManager { return null; } + // 선박 정보 일괄 보강 (t_vessel_latest_position에서 shipName, shipType 조회) + enrichVesselInfo(vesselMap); + // VesselAccumulator → CompactVesselTrack 변환 Map tracks = new HashMap<>(vesselMap.size()); for (Map.Entry entry : vesselMap.entrySet()) { @@ -318,9 +322,13 @@ public class DailyTrackCacheManager { double avgSpeed = acc.pointCount > 0 ? acc.totalDistance / Math.max(1, acc.pointCount) * 60 : 0; - // shipKindCode 계산 + // shipKindCode 계산 (선박명 패턴 매칭 포함) String shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern( - acc.sigSrcCd, null, null, acc.targetId); + acc.sigSrcCd, acc.shipType, acc.shipName, acc.targetId); + + // nationalCode 계산 + String nationalCode = NationalCodeUtil.calculateNationalCode( + acc.sigSrcCd, acc.targetId); // 통합선박 ID 조회 String integrationTargetId = null; @@ -335,6 +343,9 @@ public class DailyTrackCacheManager { .vesselId(entry.getKey()) .sigSrcCd(acc.sigSrcCd) .targetId(acc.targetId) + .nationalCode(nationalCode) + .shipName(acc.shipName) + .shipType(acc.shipType) .shipKindCode(shipKindCode) .integrationTargetId(integrationTargetId) .geometry(acc.geometry) @@ -632,6 +643,49 @@ public class DailyTrackCacheManager { return false; } + /** + * 선박 정보 일괄 보강 (t_vessel_latest_position에서 ship_nm, ship_ty 조회) + * IN 절 1000건 배치로 처리 + */ + private void enrichVesselInfo(Map vesselMap) { + List vesselIds = new ArrayList<>(vesselMap.keySet()); + int batchSize = 1000; + int enriched = 0; + + for (int i = 0; i < vesselIds.size(); i += batchSize) { + List batch = vesselIds.subList(i, Math.min(i + batchSize, vesselIds.size())); + + try (Connection conn = queryDataSource.getConnection()) { + String placeholders = batch.stream().map(id -> "?").collect(Collectors.joining(",")); + String sql = "SELECT sig_src_cd, target_id, ship_nm, ship_ty " + + "FROM signal.t_vessel_latest_position " + + "WHERE sig_src_cd || '_' || target_id IN (" + placeholders + ")"; + + try (PreparedStatement ps = conn.prepareStatement(sql)) { + for (int j = 0; j < batch.size(); j++) { + ps.setString(j + 1, batch.get(j)); + } + + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + String vesselId = rs.getString("sig_src_cd") + "_" + rs.getString("target_id"); + VesselAccumulator acc = vesselMap.get(vesselId); + if (acc != null) { + acc.shipName = rs.getString("ship_nm"); + acc.shipType = rs.getString("ship_ty"); + enriched++; + } + } + } + } + } catch (Exception e) { + log.warn("Failed to enrich vessel info batch {}-{}: {}", i, i + batch.size(), e.getMessage()); + } + } + + log.info("Enriched vessel info: {}/{} vessels", enriched, vesselMap.size()); + } + /** * 항적 맵에서 STRtree 공간 인덱스 빌드 */ @@ -661,6 +715,8 @@ public class DailyTrackCacheManager { private static class VesselAccumulator { String sigSrcCd; String targetId; + String shipName; + String shipType; List geometry = new ArrayList<>(500); List timestamps = new ArrayList<>(500); List speeds = new ArrayList<>(500); diff --git a/src/main/resources/application-prod-mpr.yml b/src/main/resources/application-prod-mpr.yml index ff1ea83..e35ba5c 100644 --- a/src/main/resources/application-prod-mpr.yml +++ b/src/main/resources/application-prod-mpr.yml @@ -23,8 +23,8 @@ spring: connection-timeout: 30000 # 원격 연결이므로 타임아웃 증가 idle-timeout: 600000 max-lifetime: 1800000 - maximum-pool-size: 20 # 10 -> 20 증가 - minimum-idle: 5 # 2 -> 5 증가 + maximum-pool-size: 30 # 10 -> 30 증가 + minimum-idle: 10 # 2 -> 10 증가 # 원격 연결 안정성을 위한 추가 설정 connection-test-query: SELECT 1 validation-timeout: 5000 @@ -42,8 +42,8 @@ spring: connection-timeout: 5000 idle-timeout: 600000 max-lifetime: 1800000 - maximum-pool-size: 60 # 20 -> 40 증가 - minimum-idle: 10 # 5 -> 10 증가 + maximum-pool-size: 90 # 20 -> 90 증가 + minimum-idle: 15 # 5 -> 15 증가 connection-test-query: SELECT 1 validation-timeout: 5000 leak-detection-threshold: 60000 # 커넥션 누수 감지 (60초) @@ -262,6 +262,34 @@ vessel: # spring 하위가 아닌 최상위 레벨 t_abnormal_tracks: retention-months: 0 # 비정상 항적: 무한 보관 +# 일일 항적 데이터 인메모리 캐시 +cache: + daily-track: + enabled: true + retention-days: 7 # D-1 ~ D-7 (오늘 제외) + max-memory-gb: 6 # 최대 6GB + warmup-async: true # 비동기 워밍업 (배치 Job과 경합 방지) + +# WebSocket 부하 제어 설정 +websocket: + query: + max-concurrent-global: 40 # 배치 서버이므로 동시 쿼리 제한 (prod 60 대비 보수적) + max-per-session: 15 # 세션당 동시 쿼리 상한 + queue-timeout-seconds: 30 # 글로벌 대기 큐 타임아웃 + session: + idle-timeout-ms: 15000 + server-heartbeat-ms: 5000 + client-heartbeat-ms: 5000 + sockjs-disconnect-delay-ms: 5000 + send-time-limit-seconds: 30 + +# REST V2 부하 제어 설정 +rest: + v2: + query: + timeout-seconds: 30 # 슬롯 대기 타임아웃 (초) + max-total-points: 500000 # 응답 총 포인트 상한 + # 액추에이터 설정 management: endpoints: diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index d3411c2..92ddbb1 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -288,6 +288,13 @@ websocket: sockjs-disconnect-delay-ms: 5000 # SockJS 해제 지연 5초 (30s → 5s) send-time-limit-seconds: 30 # 메시지 전송 시간 제한 30초 (120s → 30s) +# REST V2 부하 제어 설정 +rest: + v2: + query: + timeout-seconds: 30 # 슬롯 대기 타임아웃 (초) + max-total-points: 500000 # 응답 총 포인트 상한 + # 액추에이터 설정 management: endpoints: diff --git a/src/main/resources/application-query.yml b/src/main/resources/application-query.yml index 0803bb3..799dbf1 100644 --- a/src/main/resources/application-query.yml +++ b/src/main/resources/application-query.yml @@ -89,6 +89,34 @@ vessel: page-size: 5000 partition-size: 12 +# 일일 항적 데이터 인메모리 캐시 (조회 전용 서버의 핵심 성능 설정) +cache: + daily-track: + enabled: true + retention-days: 7 # D-1 ~ D-7 (오늘 제외) + max-memory-gb: 6 # 최대 6GB (조회 전용이므로 배치 없이 메모리 여유) + warmup-async: true # 비동기 워밍업 (서버 시작 차단 없음) + +# WebSocket 부하 제어 설정 +websocket: + query: + max-concurrent-global: 40 # 조회 전용 서버 (배치 없으므로 prod-mpr보다 여유) + max-per-session: 15 # 세션당 동시 쿼리 상한 + queue-timeout-seconds: 30 # 글로벌 대기 큐 타임아웃 + session: + idle-timeout-ms: 15000 + server-heartbeat-ms: 5000 + client-heartbeat-ms: 5000 + sockjs-disconnect-delay-ms: 5000 + send-time-limit-seconds: 30 + +# REST V2 부하 제어 설정 +rest: + v2: + query: + timeout-seconds: 30 # 슬롯 대기 타임아웃 (초) + max-total-points: 500000 # 응답 총 포인트 상한 + # 액추에이터 설정 management: endpoints: diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml index 3166e05..d194e31 100644 --- a/src/main/resources/logback-spring.xml +++ b/src/main/resources/logback-spring.xml @@ -64,6 +64,25 @@ + + + + ${LOG_PATH}/cache-benchmark.log + + ${LOG_PATH}/cache-benchmark.log.%d{yyyy-MM-dd}.%i.gz + 200MB + 30 + 5GB + + + %d{yyyy-MM-dd HH:mm:ss.SSS} - %msg%n + + + + + + + diff --git a/src/test/java/gc/mda/signal_batch/SignalBatchApplicationTests.java b/src/test/java/gc/mda/signal_batch/SignalBatchApplicationTests.java index f07d62b..b210404 100644 --- a/src/test/java/gc/mda/signal_batch/SignalBatchApplicationTests.java +++ b/src/test/java/gc/mda/signal_batch/SignalBatchApplicationTests.java @@ -1,5 +1,6 @@ package gc.mda.signal_batch; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; @@ -15,6 +16,7 @@ import gc.mda.signal_batch.global.websocket.interceptor.TrackQueryInterceptor; "scheduler.enabled=false", "spring.websocket.enabled=false" }) +@Disabled("원격 DB 연결 필요 - 로컬 빌드 시 스킵") @ActiveProfiles("test") class SignalBatchApplicationTests { diff --git a/src/test/java/gc/mda/signal_batch/performance/IndexStatusTest.java b/src/test/java/gc/mda/signal_batch/performance/IndexStatusTest.java index dbc463a..69c977a 100644 --- a/src/test/java/gc/mda/signal_batch/performance/IndexStatusTest.java +++ b/src/test/java/gc/mda/signal_batch/performance/IndexStatusTest.java @@ -1,6 +1,7 @@ package gc.mda.signal_batch.performance; import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -17,6 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; * 실행: mvn test -Dtest=IndexStatusTest */ @Slf4j +@Disabled("원격 DB 연결 필요 - 수동 실행만 허용 (mvn test -Dtest=IndexStatusTest)") @SpringBootTest @ActiveProfiles("dev") public class IndexStatusTest { diff --git a/src/test/java/gc/mda/signal_batch/websocket/WebSocketLoadTest.java b/src/test/java/gc/mda/signal_batch/websocket/WebSocketLoadTest.java index 331a7c4..786377a 100644 --- a/src/test/java/gc/mda/signal_batch/websocket/WebSocketLoadTest.java +++ b/src/test/java/gc/mda/signal_batch/websocket/WebSocketLoadTest.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.*; @Slf4j +@Disabled("부하 테스트 - 원격 서버 연결 필요, 수동 실행만 허용") @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class WebSocketLoadTest {