refactor(websocket): ChunkedTrackStreamingService 전수 최적화 #89

병합
htlee feature/websocket-replay-optimization 에서 develop 로 2 commits 를 머지했습니다 2026-03-02 15:34:41 +09:00
6개의 변경된 파일469개의 추가작업 그리고 608개의 파일을 삭제

파일 보기

@ -7,6 +7,7 @@
### 변경
- SignalKindCode 매핑 규칙 개선 — aton/tug/tender→DEFAULT, shipName BUOY 검출 추가
- 응답 경로 signal_kind_code 치환 1회화 — 캐시 저장 시 치환, 응답 시 DB/캐시 값 직접 사용
- ChunkedTrackStreamingService 전수 최적화 — isQueryCancelled 버그수정, QueryContext 스레드 안전성, 쿼리 메트릭 DB 저장, 데드코드 400줄 삭제, VesselInfo N+1 해소
## [2026-03-02]

파일 보기

@ -316,4 +316,11 @@ public class ActiveQueryManager {
public int getMaxConcurrentGlobal() {
return maxConcurrentGlobal;
}
/**
* 대기열 타임아웃 ()
*/
public int getQueueTimeoutSeconds() {
return queueTimeoutSeconds;
}
}

파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다. Load Diff

파일 보기

@ -0,0 +1,40 @@
package gc.mda.signal_batch.monitoring.controller;
import gc.mda.signal_batch.monitoring.service.QueryMetricsService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
/**
* 쿼리 메트릭 조회 API
*
* WebSocket/REST 쿼리 실행 이력 성능 통계를 제공한다.
* ApiMetrics 프론트엔드 페이지의 데이터 소스.
*/
@RestController
@RequestMapping("/api/monitoring/query-metrics")
@RequiredArgsConstructor
@Tag(name = "Query Metrics", description = "쿼리 실행 메트릭 조회 API")
public class QueryMetricsController {
private final QueryMetricsService queryMetricsService;
@GetMapping
@Operation(summary = "최근 쿼리 메트릭 조회", description = "최근 N건의 쿼리 실행 메트릭을 조회합니다")
public ResponseEntity<List<Map<String, Object>>> getRecentMetrics(
@RequestParam(defaultValue = "50") int limit) {
return ResponseEntity.ok(queryMetricsService.getRecentMetrics(Math.min(limit, 200)));
}
@GetMapping("/stats")
@Operation(summary = "쿼리 메트릭 통계", description = "기간별 쿼리 성능 통계 (평균 응답시간, 캐시 비율, 느린 쿼리 등)")
public ResponseEntity<Map<String, Object>> getStats(
@RequestParam(defaultValue = "7") int days) {
return ResponseEntity.ok(queryMetricsService.getStats(Math.min(days, 90)));
}
}

파일 보기

@ -0,0 +1,180 @@
package gc.mda.signal_batch.monitoring.service;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* 쿼리 실행 메트릭 DB 저장/조회 서비스
*
* WebSocket 리플레이 REST API 쿼리의 성능 메트릭을 signal.t_query_metrics에 저장.
* streamChunkedTracks() finally 블록에서 비동기 INSERT 호출하여 응답 지연 없이 기록.
*/
@Slf4j
@Service
public class QueryMetricsService {
private final JdbcTemplate queryJdbcTemplate;
private static final String INSERT_SQL = """
INSERT INTO signal.t_query_metrics (
query_id, session_id, query_type, created_at,
start_time, end_time, zoom_level, viewport_bounds, requested_mmsi,
data_path, cache_hit_days, db_query_days, db_conn_total,
unique_vessels, total_tracks, total_points, points_after_simplify,
total_chunks, response_bytes,
elapsed_ms, db_query_ms, simplify_ms, backpressure_events,
status
) VALUES (
?, ?, ?, now(),
?, ?, ?, ?, ?,
?, ?, ?, ?,
?, ?, ?, ?,
?, ?,
?, ?, ?, ?,
?
)
""";
public QueryMetricsService(@Qualifier("queryJdbcTemplate") JdbcTemplate queryJdbcTemplate) {
this.queryJdbcTemplate = queryJdbcTemplate;
}
/**
* 쿼리 메트릭 비동기 저장 쿼리 응답에 영향 없음
*/
@Async("trackStreamingExecutor")
public void saveAsync(QueryMetric metric) {
try {
queryJdbcTemplate.update(INSERT_SQL,
metric.queryId, metric.sessionId, metric.queryType,
metric.startTime != null ? Timestamp.valueOf(metric.startTime) : null,
metric.endTime != null ? Timestamp.valueOf(metric.endTime) : null,
metric.zoomLevel, metric.viewportBounds, metric.requestedMmsi,
metric.dataPath, metric.cacheHitDays, metric.dbQueryDays, metric.dbConnTotal,
metric.uniqueVessels, metric.totalTracks, metric.totalPoints, metric.pointsAfterSimplify,
metric.totalChunks, metric.responseBytes,
metric.elapsedMs, metric.dbQueryMs, metric.simplifyMs, metric.backpressureEvents,
metric.status
);
log.debug("Query metric saved: queryId={}, elapsed={}ms, status={}",
metric.queryId, metric.elapsedMs, metric.status);
} catch (Exception e) {
log.warn("Failed to save query metric: queryId={}, error={}", metric.queryId, e.getMessage());
}
}
/**
* 최근 쿼리 메트릭 조회
*/
public List<Map<String, Object>> getRecentMetrics(int limit) {
return queryJdbcTemplate.queryForList("""
SELECT query_id, session_id, query_type, created_at,
start_time, end_time, zoom_level, viewport_bounds,
data_path, cache_hit_days, db_query_days, db_conn_total,
unique_vessels, total_tracks, total_points, points_after_simplify,
total_chunks, response_bytes,
elapsed_ms, db_query_ms, simplify_ms, backpressure_events, status
FROM signal.t_query_metrics
ORDER BY created_at DESC
LIMIT ?
""", limit);
}
/**
* 기간별 쿼리 메트릭 통계
*/
public Map<String, Object> getStats(int days) {
Map<String, Object> stats = new LinkedHashMap<>();
// 전체 통계
Map<String, Object> summary = queryJdbcTemplate.queryForMap("""
SELECT
COUNT(*) AS total_queries,
ROUND(AVG(elapsed_ms)) AS avg_elapsed_ms,
MAX(elapsed_ms) AS max_elapsed_ms,
ROUND(AVG(unique_vessels)) AS avg_vessels,
ROUND(AVG(total_points)) AS avg_points,
SUM(CASE WHEN data_path = 'CACHE' THEN 1 ELSE 0 END) AS cache_only,
SUM(CASE WHEN data_path = 'HYBRID' THEN 1 ELSE 0 END) AS hybrid,
SUM(CASE WHEN data_path = 'DB' THEN 1 ELSE 0 END) AS db_only,
SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) AS completed,
SUM(CASE WHEN status = 'CANCELLED' THEN 1 ELSE 0 END) AS cancelled,
SUM(CASE WHEN status = 'ERROR' THEN 1 ELSE 0 END) AS errors,
SUM(CASE WHEN status = 'TIMEOUT' THEN 1 ELSE 0 END) AS timeouts
FROM signal.t_query_metrics
WHERE created_at >= now() - INTERVAL '%d days'
""".formatted(days));
stats.put("summary", summary);
// 일별 추이
List<Map<String, Object>> daily = queryJdbcTemplate.queryForList("""
SELECT
DATE(created_at) AS date,
COUNT(*) AS query_count,
ROUND(AVG(elapsed_ms)) AS avg_elapsed_ms,
ROUND(AVG(unique_vessels)) AS avg_vessels,
SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) AS completed,
SUM(CASE WHEN status != 'COMPLETED' THEN 1 ELSE 0 END) AS failed
FROM signal.t_query_metrics
WHERE created_at >= now() - INTERVAL '%d days'
GROUP BY DATE(created_at)
ORDER BY date DESC
""".formatted(days));
stats.put("dailyTrend", daily);
// 느린 쿼리 TOP 10
List<Map<String, Object>> slowQueries = queryJdbcTemplate.queryForList("""
SELECT query_id, created_at, elapsed_ms, unique_vessels, total_points,
data_path, db_conn_total, zoom_level, status
FROM signal.t_query_metrics
WHERE created_at >= now() - INTERVAL '%d days'
ORDER BY elapsed_ms DESC
LIMIT 10
""".formatted(days));
stats.put("slowQueries", slowQueries);
return stats;
}
/**
* 쿼리 메트릭 데이터 클래스
*/
@Getter
@Builder
public static class QueryMetric {
private final String queryId;
private final String sessionId;
private final String queryType;
private final LocalDateTime startTime;
private final LocalDateTime endTime;
private final Integer zoomLevel;
private final String viewportBounds;
private final int requestedMmsi;
private final String dataPath;
private final int cacheHitDays;
private final int dbQueryDays;
private final int dbConnTotal;
private final int uniqueVessels;
private final int totalTracks;
private final int totalPoints;
private final int pointsAfterSimplify;
private final int totalChunks;
private final long responseBytes;
private final long elapsedMs;
private final long dbQueryMs;
private final long simplifyMs;
private final int backpressureEvents;
private final String status;
}
}

파일 보기

@ -0,0 +1,42 @@
-- 쿼리 실행 메트릭 테이블
-- WebSocket/REST 쿼리의 성능 지표를 기록하여 ApiMetrics 페이지에서 조회
CREATE TABLE IF NOT EXISTS signal.t_query_metrics (
id BIGSERIAL PRIMARY KEY,
query_id VARCHAR(64) NOT NULL,
session_id VARCHAR(64),
query_type VARCHAR(20) NOT NULL, -- 'WEBSOCKET' | 'REST_V1' | 'REST_V2'
created_at TIMESTAMP NOT NULL DEFAULT now(),
-- 요청 파라미터
start_time TIMESTAMP,
end_time TIMESTAMP,
zoom_level INTEGER,
viewport_bounds VARCHAR(200), -- "minLon,minLat,maxLon,maxLat"
requested_mmsi INTEGER DEFAULT 0,
-- 처리 경로
data_path VARCHAR(10), -- 'CACHE' | 'DB' | 'HYBRID'
cache_hit_days INTEGER DEFAULT 0,
db_query_days INTEGER DEFAULT 0,
db_conn_total INTEGER DEFAULT 0,
-- 결과 통계
unique_vessels INTEGER DEFAULT 0,
total_tracks INTEGER DEFAULT 0,
total_points INTEGER DEFAULT 0,
points_after_simplify INTEGER DEFAULT 0,
total_chunks INTEGER DEFAULT 0,
response_bytes BIGINT DEFAULT 0,
-- 성능
elapsed_ms BIGINT DEFAULT 0,
db_query_ms BIGINT DEFAULT 0,
simplify_ms BIGINT DEFAULT 0,
backpressure_events INTEGER DEFAULT 0,
-- 결과 상태
status VARCHAR(20) DEFAULT 'COMPLETED' -- 'COMPLETED' | 'CANCELLED' | 'ERROR' | 'TIMEOUT'
);
CREATE INDEX IF NOT EXISTS idx_query_metrics_created ON signal.t_query_metrics(created_at);
CREATE INDEX IF NOT EXISTS idx_query_metrics_type ON signal.t_query_metrics(query_type, created_at);