From 0cdb46d0637d9825905f940f6bfe29a16518f046 Mon Sep 17 00:00:00 2001 From: htlee Date: Thu, 19 Feb 2026 20:24:28 +0900 Subject: [PATCH] =?UTF-8?q?perf:=20API=20=EC=9D=91=EB=8B=B5=20=EC=B5=9C?= =?UTF-8?q?=EC=A0=81=ED=99=94=20+=20=EC=A0=90=EC=A7=84=EC=A0=81=20?= =?UTF-8?q?=EB=A0=8C=EB=8D=94=EB=A7=81=20+=20=ED=95=B4=EA=B5=AC=20chorople?= =?UTF-8?q?th=20=EC=A7=80=EB=8F=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 백엔드: - haegu/realtime: DB 공간 JOIN(12s) → 인메모리 캐시 순회(~50ms) - batch/statistics: N+1 JobExplorer(1.1s) → 단일 SQL 집계(~100ms) - batch/daily-stats: N+1×7일(9s) → 직접 SQL 2쿼리(~200ms) - throughput: pg_total_relation_size 매번 호출(1.4s) → Caffeine 5분 캐시 - quality: 풀스캔(0.6s) → 24시간 범위 제한 프론트엔드: - Promise.allSettled 차단 → 개별 .then() 점진적 렌더링 - useCachedState 훅: 페이지 전환 시 이전 데이터 즉시 표시 - AreaStats: 해구 폴리곤 choropleth 지도 + 선박수 범례 추가 Co-Authored-By: Claude Opus 4.6 --- frontend/src/api/gisApi.ts | 4 +- frontend/src/hooks/useCachedState.ts | 26 ++ frontend/src/i18n/en.ts | 3 + frontend/src/i18n/ko.ts | 3 + frontend/src/pages/AbnormalTracks.tsx | 15 +- frontend/src/pages/ApiMetrics.tsx | 26 +- frontend/src/pages/AreaStats.tsx | 191 ++++++++++++- frontend/src/pages/Dashboard.tsx | 35 +-- frontend/src/pages/DataPipeline.tsx | 36 +-- .../controller/BatchAdminController.java | 269 ++++++++---------- .../controller/MonitoringController.java | 182 +++++++++--- 11 files changed, 524 insertions(+), 266 deletions(-) create mode 100644 frontend/src/hooks/useCachedState.ts diff --git a/frontend/src/api/gisApi.ts b/frontend/src/api/gisApi.ts index 0ab2d98..63ae787 100644 --- a/frontend/src/api/gisApi.ts +++ b/frontend/src/api/gisApi.ts @@ -2,8 +2,8 @@ import { fetchJson, postJson } from './httpClient.ts' export interface HaeguBoundary { haegu_no: number - haegu_name: string - boundary_wkt: string + area_name: string | null + geom_json: string center_lon: number center_lat: number } diff --git a/frontend/src/hooks/useCachedState.ts 
b/frontend/src/hooks/useCachedState.ts new file mode 100644 index 0000000..e7dc13b --- /dev/null +++ b/frontend/src/hooks/useCachedState.ts @@ -0,0 +1,26 @@ +import { useState, useCallback } from 'react' + +/** + * 모듈 레벨 캐시 — React 컴포넌트 라이프사이클 밖에 유지 + * 페이지 이동 후 복귀 시 이전 데이터를 즉시 복원 + */ +const cache = new Map() + +/** + * useState + 모듈 레벨 캐시 + * - 초기값: 캐시에 데이터가 있으면 캐시 값, 없으면 fallback + * - setState 시 캐시도 함께 갱신 + */ +export function useCachedState(key: string, fallback: T): [T, (v: T) => void] { + const [value, setValue] = useState(() => { + const cached = cache.get(key) + return cached !== undefined ? (cached as T) : fallback + }) + + const set = useCallback((v: T) => { + cache.set(key, v) + setValue(v) + }, [key]) + + return [value, set] +} diff --git a/frontend/src/i18n/en.ts b/frontend/src/i18n/en.ts index 5fee23c..719cb65 100644 --- a/frontend/src/i18n/en.ts +++ b/frontend/src/i18n/en.ts @@ -117,6 +117,9 @@ const en = { 'area.duplicates': 'Duplicate Tracks', 'area.stalePositions': 'Stale Positions', 'area.checkedAt': 'Checked at', + 'area.haeguMap': 'Vessel Distribution by Area', + 'area.mapLegend': 'Vessels', + 'area.vessels': ' vessels', // API Explorer 'explorer.title': 'API Explorer', diff --git a/frontend/src/i18n/ko.ts b/frontend/src/i18n/ko.ts index 7576d4f..cec7d0f 100644 --- a/frontend/src/i18n/ko.ts +++ b/frontend/src/i18n/ko.ts @@ -117,6 +117,9 @@ const ko = { 'area.duplicates': '중복 항적', 'area.stalePositions': '갱신 지연 위치', 'area.checkedAt': '검증 시각', + 'area.haeguMap': '해구별 선박 분포', + 'area.mapLegend': '선박 수', + 'area.vessels': '척', // API Explorer 'explorer.title': 'API 탐색기', diff --git a/frontend/src/pages/AbnormalTracks.tsx b/frontend/src/pages/AbnormalTracks.tsx index f78f5f1..7ab08ca 100644 --- a/frontend/src/pages/AbnormalTracks.tsx +++ b/frontend/src/pages/AbnormalTracks.tsx @@ -1,5 +1,6 @@ import { useState } from 'react' import { usePoller } from '../hooks/usePoller.ts' +import { useCachedState } from '../hooks/useCachedState.ts' import { 
useI18n } from '../hooks/useI18n.ts' import { abnormalApi } from '../api/abnormalApi.ts' import type { AbnormalTrack, AbnormalSummary } from '../api/abnormalApi.ts' @@ -24,17 +25,13 @@ const HOURS_OPTIONS = [6, 12, 24, 48, 72] export default function AbnormalTracks() { const { t } = useI18n() const [hours, setHours] = useState(24) - const [tracks, setTracks] = useState([]) - const [summary, setSummary] = useState(null) + const [tracks, setTracks] = useCachedState('abn.tracks', []) + const [summary, setSummary] = useCachedState('abn.summary', null) const [typeFilter, setTypeFilter] = useState('all') - usePoller(async () => { - const [tr, sm] = await Promise.allSettled([ - abnormalApi.getRecent(hours), - abnormalApi.getStatisticsSummary(7), - ]) - if (tr.status === 'fulfilled') setTracks(tr.value) - if (sm.status === 'fulfilled') setSummary(sm.value) + usePoller(() => { + abnormalApi.getRecent(hours).then(setTracks).catch(() => {}) + abnormalApi.getStatisticsSummary(7).then(setSummary).catch(() => {}) }, POLL_INTERVAL, [hours]) const filteredTracks = typeFilter === 'all' diff --git a/frontend/src/pages/ApiMetrics.tsx b/frontend/src/pages/ApiMetrics.tsx index 61d5e69..c114d06 100644 --- a/frontend/src/pages/ApiMetrics.tsx +++ b/frontend/src/pages/ApiMetrics.tsx @@ -1,5 +1,5 @@ -import { useState } from 'react' import { usePoller } from '../hooks/usePoller.ts' +import { useCachedState } from '../hooks/useCachedState.ts' import { useI18n } from '../hooks/useI18n.ts' import { monitorApi } from '../api/monitorApi.ts' import type { MetricsSummary, CacheStats, ProcessingDelay, CacheDetails } from '../api/types.ts' @@ -10,22 +10,16 @@ const POLL_INTERVAL = 10_000 export default function ApiMetrics() { const { t } = useI18n() - const [metrics, setMetrics] = useState(null) - const [cache, setCache] = useState(null) - const [cacheDetails, setCacheDetails] = useState(null) - const [delay, setDelay] = useState(null) + const [metrics, setMetrics] = useCachedState('api.metrics', 
null) + const [cache, setCache] = useCachedState('api.cache', null) + const [cacheDetails, setCacheDetails] = useCachedState('api.cacheDetail', null) + const [delay, setDelay] = useCachedState('api.delay', null) - usePoller(async () => { - const [m, c, cd, d] = await Promise.allSettled([ - monitorApi.getMetricsSummary(), - monitorApi.getCacheStats(), - monitorApi.getCacheDetails(), - monitorApi.getDelay(), - ]) - if (m.status === 'fulfilled') setMetrics(m.value) - if (c.status === 'fulfilled') setCache(c.value) - if (cd.status === 'fulfilled') setCacheDetails(cd.value) - if (d.status === 'fulfilled') setDelay(d.value) + usePoller(() => { + monitorApi.getMetricsSummary().then(setMetrics).catch(() => {}) + monitorApi.getCacheStats().then(setCache).catch(() => {}) + monitorApi.getCacheDetails().then(setCacheDetails).catch(() => {}) + monitorApi.getDelay().then(setDelay).catch(() => {}) }, POLL_INTERVAL) const memUsed = metrics?.memory.used ?? 0 diff --git a/frontend/src/pages/AreaStats.tsx b/frontend/src/pages/AreaStats.tsx index bc64e4f..a54c7b1 100644 --- a/frontend/src/pages/AreaStats.tsx +++ b/frontend/src/pages/AreaStats.tsx @@ -1,31 +1,173 @@ -import { useState } from 'react' +import { useEffect, useRef, useCallback } from 'react' +import maplibregl from 'maplibre-gl' import { usePoller } from '../hooks/usePoller.ts' +import { useCachedState } from '../hooks/useCachedState.ts' import { useI18n } from '../hooks/useI18n.ts' import { monitorApi } from '../api/monitorApi.ts' +import { gisApi } from '../api/gisApi.ts' +import type { HaeguBoundary } from '../api/gisApi.ts' import type { ThroughputMetrics, DataQuality, HaeguStat } from '../api/types.ts' +import MapContainer from '../components/map/MapContainer.tsx' import MetricCard from '../components/charts/MetricCard.tsx' import StatusBadge from '../components/common/StatusBadge.tsx' import { formatNumber, formatDateTime } from '../utils/formatters.ts' const POLL_INTERVAL = 30_000 +/** 범례에 표시할 항목 */ +const 
LEGEND_ITEMS = [ + { label: '1-9', color: 'rgba(59,130,246,0.45)' }, + { label: '10-49', color: 'rgba(16,185,129,0.6)' }, + { label: '50-99', color: 'rgba(245,158,11,0.7)' }, + { label: '100-199', color: 'rgba(239,68,68,0.7)' }, + { label: '200+', color: 'rgba(139,92,246,0.8)' }, +] + export default function AreaStats() { const { t } = useI18n() - const [haegu, setHaegu] = useState([]) - const [throughput, setThroughput] = useState(null) - const [quality, setQuality] = useState(null) + const [haegu, setHaegu] = useCachedState('area.haegu', []) + const [throughput, setThroughput] = useCachedState('area.throughput', null) + const [quality, setQuality] = useCachedState('area.quality', null) + const [boundaries, setBoundaries] = useCachedState('area.boundaries', []) - usePoller(async () => { - const [h, tp, q] = await Promise.allSettled([ - monitorApi.getHaeguRealtimeStats(), - monitorApi.getThroughput(), - monitorApi.getQuality(), - ]) - if (h.status === 'fulfilled') setHaegu(h.value) - if (tp.status === 'fulfilled') setThroughput(tp.value) - if (q.status === 'fulfilled') setQuality(q.value) + const mapRef = useRef(null) + const popupRef = useRef(null) + + usePoller(() => { + monitorApi.getHaeguRealtimeStats().then(setHaegu).catch(() => {}) + monitorApi.getThroughput().then(setThroughput).catch(() => {}) + monitorApi.getQuality().then(setQuality).catch(() => {}) }, POLL_INTERVAL) + // 경계 데이터는 1회만 로딩 + useEffect(() => { + if (boundaries.length === 0) { + gisApi.getHaeguBoundaries().then(setBoundaries).catch(() => {}) + } + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []) + + // GeoJSON 생성 함수 + const buildGeoJson = useCallback((): GeoJSON.FeatureCollection => { + const statsMap = new Map() + for (const h of haegu) { + statsMap.set(h.haegu_no, h) + } + + const features: GeoJSON.Feature[] = [] + for (const b of boundaries) { + if (!b.geom_json) continue + try { + const geometry = JSON.parse(b.geom_json) + const stat = statsMap.get(b.haegu_no) + 
features.push({ + type: 'Feature', + properties: { + haegu_no: b.haegu_no, + haegu_name: stat?.haegu_name ?? `대해구 ${b.haegu_no}`, + vessels: stat?.current_vessels ?? 0, + avg_speed: stat?.avg_speed ?? 0, + avg_density: stat?.avg_density ?? 0, + }, + geometry, + }) + } catch { + // skip invalid geom + } + } + return { type: 'FeatureCollection', features } + }, [boundaries, haegu]) + + // 지도 레이어 업데이트 + useEffect(() => { + const map = mapRef.current + if (!map || boundaries.length === 0) return + + const geojson = buildGeoJson() + + if (map.getSource('haegu')) { + (map.getSource('haegu') as maplibregl.GeoJSONSource).setData(geojson) + } else { + map.addSource('haegu', { type: 'geojson', data: geojson }) + + // fill 색상 step expression + // ['step', input, default, stop1, output1, stop2, output2, ...] + const fillColor = [ + 'step', + ['get', 'vessels'], + 'rgba(59,130,246,0.05)', // default (vessels < 0) + 0, 'rgba(59,130,246,0.25)', // 0 + 1, 'rgba(59,130,246,0.45)', // 1-9 + 10, 'rgba(16,185,129,0.55)', // 10-49 + 50, 'rgba(245,158,11,0.6)', // 50-99 + 100, 'rgba(239,68,68,0.6)', // 100-199 + 200, 'rgba(139,92,246,0.7)', // 200+ + ] as unknown as maplibregl.ExpressionSpecification + + map.addLayer({ + id: 'haegu-fill', + type: 'fill', + source: 'haegu', + paint: { + 'fill-color': fillColor, + 'fill-opacity': 0.8, + }, + }) + + map.addLayer({ + id: 'haegu-line', + type: 'line', + source: 'haegu', + paint: { + 'line-color': 'rgba(100,116,139,0.5)', + 'line-width': 1, + }, + }) + + // hover popup + const popup = new maplibregl.Popup({ + closeButton: false, + closeOnClick: false, + maxWidth: '220px', + }) + popupRef.current = popup + + map.on('mousemove', 'haegu-fill', (e) => { + if (!e.features || e.features.length === 0) return + map.getCanvas().style.cursor = 'pointer' + const props = e.features[0].properties + if (!props) return + + popup + .setLngLat(e.lngLat) + .setHTML( + `
+ ${props.haegu_name}
+ ${t('area.currentVessels')}: ${props.vessels}${t('area.vessels')}
+ ${t('area.avgSpeed')}: ${Number(props.avg_speed).toFixed(1)} kn +
`, + ) + .addTo(map) + }) + + map.on('mouseleave', 'haegu-fill', () => { + map.getCanvas().style.cursor = '' + popup.remove() + }) + } + }, [boundaries, haegu, buildGeoJson, t]) + + const handleMapReady = useCallback((map: maplibregl.Map) => { + mapRef.current = map + }, []) + + // cleanup popup on unmount + useEffect(() => { + return () => { + popupRef.current?.remove() + } + }, []) + return (

{t('area.title')}

@@ -54,6 +196,29 @@ export default function AreaStats() { />
+ {/* Haegu Map (choropleth) */} +
+
{t('area.haeguMap')}
+
+ + {/* Legend */} +
+
{t('area.mapLegend')}
+
+ {LEGEND_ITEMS.map((item) => ( +
+ + {item.label} +
+ ))} +
+
+
+
+ {/* Haegu Stats Table */}
{t('area.haeguStats')}
diff --git a/frontend/src/pages/Dashboard.tsx b/frontend/src/pages/Dashboard.tsx index 21a1b03..06d90c3 100644 --- a/frontend/src/pages/Dashboard.tsx +++ b/frontend/src/pages/Dashboard.tsx @@ -1,5 +1,6 @@ import { useState } from 'react' import { usePoller } from '../hooks/usePoller.ts' +import { useCachedState } from '../hooks/useCachedState.ts' import { useI18n } from '../hooks/useI18n.ts' import { batchApi } from '../api/batchApi.ts' import { monitorApi } from '../api/monitorApi.ts' @@ -21,29 +22,21 @@ const POLL_INTERVAL = 30_000 export default function Dashboard() { const { t } = useI18n() - const [stats, setStats] = useState(null) - const [metrics, setMetrics] = useState(null) - const [cache, setCache] = useState(null) - const [delay, setDelay] = useState(null) - const [daily, setDaily] = useState(null) - const [running, setRunning] = useState([]) + const [stats, setStats] = useCachedState('dash.stats', null) + const [metrics, setMetrics] = useCachedState('dash.metrics', null) + const [cache, setCache] = useCachedState('dash.cache', null) + const [delay, setDelay] = useCachedState('dash.delay', null) + const [daily, setDaily] = useCachedState('dash.daily', null) + const [running, setRunning] = useCachedState('dash.running', []) const [days, setDays] = useState(7) - usePoller(async () => { - const [s, m, c, d, ds, r] = await Promise.allSettled([ - batchApi.getStatistics(days), - monitorApi.getMetricsSummary(), - monitorApi.getCacheStats(), - monitorApi.getDelay(), - batchApi.getDailyStats(), - batchApi.getRunningJobs(), - ]) - if (s.status === 'fulfilled') setStats(s.value) - if (m.status === 'fulfilled') setMetrics(m.value) - if (c.status === 'fulfilled') setCache(c.value) - if (d.status === 'fulfilled') setDelay(d.value) - if (ds.status === 'fulfilled') setDaily(ds.value) - if (r.status === 'fulfilled') setRunning(r.value) + usePoller(() => { + batchApi.getStatistics(days).then(setStats).catch(() => {}) + 
monitorApi.getMetricsSummary().then(setMetrics).catch(() => {}) + monitorApi.getCacheStats().then(setCache).catch(() => {}) + monitorApi.getDelay().then(setDelay).catch(() => {}) + batchApi.getDailyStats().then(setDaily).catch(() => {}) + batchApi.getRunningJobs().then(setRunning).catch(() => {}) }, POLL_INTERVAL, [days]) const memUsage = metrics diff --git a/frontend/src/pages/DataPipeline.tsx b/frontend/src/pages/DataPipeline.tsx index 6ce4c2b..a0b02ee 100644 --- a/frontend/src/pages/DataPipeline.tsx +++ b/frontend/src/pages/DataPipeline.tsx @@ -1,5 +1,5 @@ -import { useState } from 'react' import { usePoller } from '../hooks/usePoller.ts' +import { useCachedState } from '../hooks/useCachedState.ts' import { useI18n } from '../hooks/useI18n.ts' import { batchApi } from '../api/batchApi.ts' import { monitorApi } from '../api/monitorApi.ts' @@ -28,28 +28,20 @@ const JOB_DISPLAY: Record = { export default function DataPipeline() { const { t } = useI18n() - const [stats, setStats] = useState(null) - const [daily, setDaily] = useState(null) - const [delay, setDelay] = useState(null) - const [cache, setCache] = useState(null) - const [cacheDetails, setCacheDetails] = useState(null) - const [recentJobs, setRecentJobs] = useState([]) + const [stats, setStats] = useCachedState('pipe.stats', null) + const [daily, setDaily] = useCachedState('pipe.daily', null) + const [delay, setDelay] = useCachedState('pipe.delay', null) + const [cache, setCache] = useCachedState('pipe.cache', null) + const [cacheDetails, setCacheDetails] = useCachedState('pipe.cacheDetail', null) + const [recentJobs, setRecentJobs] = useCachedState('pipe.jobs', []) - usePoller(async () => { - const [s, d, dl, c, cd, rj] = await Promise.allSettled([ - batchApi.getStatistics(7), - batchApi.getDailyStats(), - monitorApi.getDelay(), - monitorApi.getCacheStats(), - monitorApi.getCacheDetails(), - batchApi.getJobHistory(undefined, 20), - ]) - if (s.status === 'fulfilled') setStats(s.value) - if (d.status === 
'fulfilled') setDaily(d.value) - if (dl.status === 'fulfilled') setDelay(dl.value) - if (c.status === 'fulfilled') setCache(c.value) - if (cd.status === 'fulfilled') setCacheDetails(cd.value) - if (rj.status === 'fulfilled') setRecentJobs(rj.value) + usePoller(() => { + batchApi.getStatistics(7).then(setStats).catch(() => {}) + batchApi.getDailyStats().then(setDaily).catch(() => {}) + monitorApi.getDelay().then(setDelay).catch(() => {}) + monitorApi.getCacheStats().then(setCache).catch(() => {}) + monitorApi.getCacheDetails().then(setCacheDetails).catch(() => {}) + batchApi.getJobHistory(undefined, 20).then(setRecentJobs).catch(() => {}) }, POLL_INTERVAL) const jobCounts = stats?.byJob.executionCounts ?? {} diff --git a/src/main/java/gc/mda/signal_batch/monitoring/controller/BatchAdminController.java b/src/main/java/gc/mda/signal_batch/monitoring/controller/BatchAdminController.java index 730cf0b..bec4ffd 100644 --- a/src/main/java/gc/mda/signal_batch/monitoring/controller/BatchAdminController.java +++ b/src/main/java/gc/mda/signal_batch/monitoring/controller/BatchAdminController.java @@ -16,8 +16,10 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.bind.annotation.*; +import java.time.LocalDate; import java.time.LocalDateTime; import java.util.*; import java.util.Date; @@ -41,7 +43,11 @@ public class BatchAdminController { @Autowired @Qualifier("vesselTrackAggregationJob") private Job vesselTrackAggregationJob; - + + @Autowired + @Qualifier("queryJdbcTemplate") + private JdbcTemplate queryJdbcTemplate; + private final BatchMetadataCleanupService batchMetadataCleanupService; @Autowired @@ -529,7 +535,9 @@ public class BatchAdminController { } /** - * 배치 통계 + * 배치 통계 (단일 SQL 집계) + * 기존: JobExplorer 
N+1 루프 (~8000 쿼리, 1.1초) + * 개선: batch 메타 테이블 직접 GROUP BY (~1 쿼리) */ @GetMapping("/statistics") @Operation(summary = "배치 통계 조회", description = "지정된 기간의 배치 실행 통계를 조회합니다") @@ -537,86 +545,62 @@ public class BatchAdminController { @Parameter(description = "조회 기간 (일)") @RequestParam(defaultValue = "7") int days) { try { + long start = System.currentTimeMillis(); LocalDateTime since = LocalDateTime.now().minusDays(days); - Map statistics = new HashMap<>(); + + List> jobStats = queryJdbcTemplate.queryForList( + """ + SELECT + ji.JOB_NAME as job_name, + COUNT(*) as total_executions, + COUNT(*) FILTER (WHERE je.STATUS = 'COMPLETED') as successful, + COUNT(*) FILTER (WHERE je.STATUS = 'FAILED') as failed, + COALESCE(SUM(EXTRACT(EPOCH FROM (je.END_TIME - je.START_TIME)))::bigint, 0) as total_duration_sec, + COALESCE(SUM(se.total_read), 0) as total_records + FROM batch_job_execution je + JOIN batch_job_instance ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID + LEFT JOIN ( + SELECT JOB_EXECUTION_ID, SUM(READ_COUNT) as total_read + FROM batch_step_execution + GROUP BY JOB_EXECUTION_ID + ) se ON je.JOB_EXECUTION_ID = se.JOB_EXECUTION_ID + WHERE je.START_TIME > ? 
+ GROUP BY ji.JOB_NAME + """, + since + ); int totalExecutions = 0; int successfulExecutions = 0; int failedExecutions = 0; long totalRecordsProcessed = 0; long totalDuration = 0; - Map jobExecutionCounts = new HashMap<>(); Map jobProcessingTimes = new HashMap<>(); - for (String jobName : jobExplorer.getJobNames()) { - try { - List instances = jobExplorer.getJobInstances(jobName, 0, 1000); + for (Map row : jobStats) { + String jobName = (String) row.get("job_name"); + int total = ((Number) row.get("total_executions")).intValue(); + int successful = ((Number) row.get("successful")).intValue(); + int failed = ((Number) row.get("failed")).intValue(); + long duration = ((Number) row.get("total_duration_sec")).longValue(); + long records = ((Number) row.get("total_records")).longValue(); - for (JobInstance instance : instances) { - try { - List executions = jobExplorer.getJobExecutions(instance); - - for (JobExecution execution : executions) { - try { - if (execution.getStartTime() != null && - execution.getStartTime().isAfter(since)) { - - totalExecutions++; - - if (execution.getStatus() == BatchStatus.COMPLETED) { - successfulExecutions++; - } else if (execution.getStatus() == BatchStatus.FAILED) { - failedExecutions++; - } - - // 처리 레코드 수 - ExecutionContext 오류를 피하기 위해 안전하게 처리 - long records = 0; - try { - records = execution.getStepExecutions().stream() - .mapToLong(StepExecution::getReadCount) - .sum(); - } catch (Exception e) { - log.warn("Failed to get read count for execution {}: {}", execution.getId(), e.getMessage()); - // ExecutionContext 오류 시 기본값 0 사용 - } - totalRecordsProcessed += records; - - // 실행 시간 - if (execution.getEndTime() != null) { - long duration = java.time.Duration.between( - execution.getStartTime(), - execution.getEndTime() - ).getSeconds(); - totalDuration += duration; - - jobProcessingTimes.merge(jobName, duration, Long::sum); - } - - jobExecutionCounts.merge(jobName, 1, Integer::sum); - } - } catch (Exception e) { - log.warn("Failed to 
process execution {}: {}", execution.getId(), e.getMessage()); - // 개별 execution 처리 실패 시 건너뛰기 - } - } - } catch (Exception e) { - log.debug("Failed to get executions for instance {}: {}", instance.getId(), e.getMessage()); - // 개별 instance 처리 실패 시 건너뛰기 - DEBUG 레벨로 변경하여 로그 스팸 방지 - } - } - } catch (Exception e) { - log.warn("Failed to get instances for job {}: {}", jobName, e.getMessage()); - // 개별 job 처리 실패 시 건너뛰기 - } + totalExecutions += total; + successfulExecutions += successful; + failedExecutions += failed; + totalDuration += duration; + totalRecordsProcessed += records; + jobExecutionCounts.put(jobName, total); + jobProcessingTimes.put(jobName, duration); } + Map statistics = new HashMap<>(); statistics.put("period", Map.of( "start", since, "end", LocalDateTime.now(), "days", days )); - statistics.put("summary", Map.of( "totalExecutions", totalExecutions, "successful", successfulExecutions, @@ -630,12 +614,12 @@ public class BatchAdminController { "avgProcessingTimeSeconds", totalExecutions > 0 ? 
totalDuration / totalExecutions : 0 )); - statistics.put("byJob", Map.of( "executionCounts", jobExecutionCounts, "processingTimes", jobProcessingTimes )); + log.info("Batch statistics (direct SQL): {}ms", System.currentTimeMillis() - start); return ResponseEntity.ok(statistics); } catch (Exception e) { @@ -646,96 +630,95 @@ public class BatchAdminController { } /** - * 일별 처리 통계 (Dashboard 차트용) + * 일별 처리 통계 (Dashboard 차트용, 단일 SQL 집계) + * 기존: JobExplorer N+1 × 7일 루프 (~8800 쿼리, 9초) + * 개선: batch 메타 테이블 직접 GROUP BY (2 쿼리) */ @GetMapping("/daily-stats") @Operation(summary = "일별 처리 통계", description = "최근 7일간 일별 배치 처리 통계를 조회합니다 (대시보드 차트용)") public ResponseEntity> getDailyStatistics() { try { - Map result = new HashMap<>(); + long start = System.currentTimeMillis(); + + // 1) 7일간 일별 + Job별 집계 (단일 SQL) + List> rows = queryJdbcTemplate.queryForList( + """ + SELECT + DATE(je.START_TIME) as stat_date, + ji.JOB_NAME as job_name, + COUNT(*) as execution_count, + COALESCE(SUM(se.total_write), 0) as total_processed + FROM batch_job_execution je + JOIN batch_job_instance ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID + LEFT JOIN ( + SELECT JOB_EXECUTION_ID, SUM(WRITE_COUNT) as total_write + FROM batch_step_execution + GROUP BY JOB_EXECUTION_ID + ) se ON je.JOB_EXECUTION_ID = se.JOB_EXECUTION_ID + WHERE je.STATUS = 'COMPLETED' + AND je.START_TIME > NOW() - INTERVAL '7 days' + GROUP BY DATE(je.START_TIME), ji.JOB_NAME + ORDER BY stat_date + """ + ); + + // 일별 그룹핑: date -> [totalProcessed, vesselJobs, trackJobs] + Map dailyMap = new LinkedHashMap<>(); + for (Map row : rows) { + String date = row.get("stat_date").toString(); + String jobName = (String) row.get("job_name"); + int execCount = ((Number) row.get("execution_count")).intValue(); + long processed = ((Number) row.get("total_processed")).longValue(); + + long[] accum = dailyMap.computeIfAbsent(date, k -> new long[3]); + accum[0] += processed; + if (jobName.contains("vesselAggregation")) { + accum[1] += execCount; + } else if 
(jobName.contains("vesselTrack")) { + accum[2] += execCount; + } + } + + // 7일 전체 채우기 (데이터 없는 날도 포함) List> dailyStats = new ArrayList<>(); - - // 최근 7일간 일별 처리량 for (int i = 6; i >= 0; i--) { - LocalDateTime date = LocalDateTime.now().minusDays(i).withHour(0).withMinute(0).withSecond(0); - LocalDateTime nextDate = date.plusDays(1); + String date = LocalDate.now().minusDays(i).toString(); + long[] accum = dailyMap.getOrDefault(date, new long[3]); + Map dailyStat = new HashMap<>(); + dailyStat.put("date", date); + dailyStat.put("totalProcessed", accum[0]); + dailyStat.put("vesselJobs", (int) accum[1]); + dailyStat.put("trackJobs", (int) accum[2]); + dailyStats.add(dailyStat); + } - long totalProcessed = 0; - int vesselJobCount = 0; - int trackJobCount = 0; + // 2) 24시간 상태별 요약 (2번째 SQL) + Map statusSummary = new HashMap<>(); + statusSummary.put("completed", 0); + statusSummary.put("failed", 0); + statusSummary.put("stopped", 0); - // 모든 Job의 실행 이력 확인 - for (String jobName : jobExplorer.getJobNames()) { - List instances = jobExplorer.getJobInstances(jobName, 0, 1000); + List> statusRows = queryJdbcTemplate.queryForList( + """ + SELECT LOWER(je.STATUS) as status, COUNT(*) as cnt + FROM batch_job_execution je + WHERE je.START_TIME > NOW() - INTERVAL '24 hours' + GROUP BY je.STATUS + """ + ); + for (Map row : statusRows) { + String status = (String) row.get("status"); + int cnt = ((Number) row.get("cnt")).intValue(); + statusSummary.merge(status, cnt, Integer::sum); + } - for (JobInstance instance : instances) { - try { - List executions = jobExplorer.getJobExecutions(instance); + Map result = new HashMap<>(); + result.put("dailyStats", dailyStats); + result.put("statusSummary", statusSummary); - for (JobExecution execution : executions) { - if (execution.getStartTime() != null && - execution.getStartTime().isAfter(date) && - execution.getStartTime().isBefore(nextDate) && - execution.getStatus() == BatchStatus.COMPLETED) { + log.info("Daily statistics (direct SQL): {}ms", 
System.currentTimeMillis() - start); + return ResponseEntity.ok(result); - // 처리된 레코드 수 계산 - long records = execution.getStepExecutions().stream() - .mapToLong(StepExecution::getWriteCount) - .sum(); - totalProcessed += records; - - // Job 타입별 카운트 - if (jobName.contains("vesselAggregation")) { - vesselJobCount++; - } else if (jobName.contains("vesselTrack")) { - trackJobCount++; - } - } - } - } catch(Exception e){ - log.debug("Failed to get executions for instance {} in daily statistics: {}", instance.getId(), e.getMessage()); - // ExecutionContext 역직렬화 실패 시 해당 instance는 건너뛰기 - } - } - } - - Map dailyStat = new HashMap<>(); - dailyStat.put("date", date.toLocalDate().toString()); - dailyStat.put("totalProcessed", totalProcessed); - dailyStat.put("vesselJobs", vesselJobCount); - dailyStat.put("trackJobs", trackJobCount); - dailyStats.add(dailyStat); - } - - // Job 상태별 요약 (Status Distribution Chart용) - Map statusSummary = new HashMap<>(); - statusSummary.put("completed", 0); - statusSummary.put("failed", 0); - statusSummary.put("stopped", 0); - - // 최근 24시간 Job 상태 통계 - LocalDateTime since24h = LocalDateTime.now().minusHours(24); - for (String jobName : jobExplorer.getJobNames()) { - List instances = jobExplorer.getJobInstances(jobName, 0, 200); - - for (JobInstance instance : instances) { - List executions = jobExplorer.getJobExecutions(instance); - - for (JobExecution execution : executions) { - if (execution.getStartTime() != null && - execution.getStartTime().isAfter(since24h)) { - - String status = execution.getStatus().toString().toLowerCase(); - statusSummary.merge(status, 1, Integer::sum); - } - } - } - } - - result.put("dailyStats", dailyStats); - result.put("statusSummary", statusSummary); - - return ResponseEntity.ok(result); } catch (Exception e) { log.error("Failed to get daily statistics", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) diff --git a/src/main/java/gc/mda/signal_batch/monitoring/controller/MonitoringController.java 
b/src/main/java/gc/mda/signal_batch/monitoring/controller/MonitoringController.java index dcc29b0..f92abcc 100644 --- a/src/main/java/gc/mda/signal_batch/monitoring/controller/MonitoringController.java +++ b/src/main/java/gc/mda/signal_batch/monitoring/controller/MonitoringController.java @@ -1,25 +1,47 @@ package gc.mda.signal_batch.monitoring.controller; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import gc.mda.signal_batch.batch.reader.AisTargetCacheManager; +import gc.mda.signal_batch.domain.vessel.model.AisTargetEntity; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.bind.annotation.*; import java.time.LocalDateTime; +import java.time.OffsetDateTime; import java.util.*; +import java.util.concurrent.TimeUnit; @Slf4j @RestController @RequestMapping("/monitor") -@RequiredArgsConstructor @ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true) @Tag(name = "시스템 모니터링 API", description = "데이터 처리 지연, 해구별 현황, 처리량 및 데이터 품질 모니터링 API") public class MonitoringController { private final JdbcTemplate queryJdbcTemplate; + private final AisTargetCacheManager aisTargetCacheManager; + + /** 해구 경계 (1회 로딩, 변경 거의 없음) */ + private volatile List haeguBoundsList; + + /** throughput 파티션 크기 캐시 (5분 TTL) */ + private final Cache>> partitionSizeCache = Caffeine.newBuilder() + .expireAfterWrite(5, TimeUnit.MINUTES) + .maximumSize(1) + .build(); + + public MonitoringController( + @Qualifier("queryJdbcTemplate") JdbcTemplate queryJdbcTemplate, + AisTargetCacheManager aisTargetCacheManager) { + this.queryJdbcTemplate = queryJdbcTemplate; + 
this.aisTargetCacheManager = aisTargetCacheManager; + } /** * 데이터 처리 지연 상태 확인 (AIS 수집 vs 5분 집계 간 지연) @@ -82,39 +104,62 @@ public class MonitoringController { } /** - * 대해구별 실시간 선박 현황 (t_ais_position + t_haegu_definitions 공간 조인) + * 대해구별 실시간 선박 현황 (인메모리 캐시 기반) + * 기존: t_ais_position JOIN t_haegu_definitions (~12초) + * 개선: AisTargetCacheManager 인메모리 순회 + 해구 바운딩 박스 매칭 */ @GetMapping("/haegu/realtime") @Operation(summary = "대해구별 실시간 현황", description = "최신 AIS 위치 데이터 기준 대해구별 선박 통계를 조회합니다") public List> getRealtimeHaeguStatus() { - String sql = """ - SELECT - h.haegu_no, - CONCAT('대해구 ', h.haegu_no) as haegu_name, - COUNT(DISTINCT a.mmsi) as current_vessels, - ROUND(AVG(a.sog)::numeric, 1) as avg_speed, - ROUND((COUNT(DISTINCT a.mmsi)::numeric / - GREATEST((h.max_lat - h.min_lat) * (h.max_lon - h.min_lon) * 12321, 0.01))::numeric, - 4) as avg_density, - MAX(a.last_update) as last_update, - h.center_lon, - h.center_lat - FROM signal.t_haegu_definitions h - JOIN signal.t_ais_position a - ON a.lat BETWEEN h.min_lat AND h.max_lat - AND a.lon BETWEEN h.min_lon AND h.max_lon - WHERE a.last_update > NOW() - INTERVAL '30 minutes' - GROUP BY h.haegu_no, h.min_lat, h.min_lon, h.max_lat, h.max_lon, - h.center_lon, h.center_lat - HAVING COUNT(DISTINCT a.mmsi) > 0 - ORDER BY current_vessels DESC - LIMIT 50 - """; - try { long start = System.currentTimeMillis(); - List> result = queryJdbcTemplate.queryForList(sql); - log.info("Haegu realtime query: {} rows in {}ms", result.size(), System.currentTimeMillis() - start); + List bounds = getHaeguBounds(); + OffsetDateTime threshold = OffsetDateTime.now().minusMinutes(30); + + Map accumulators = new LinkedHashMap<>(); + for (HaeguBounds b : bounds) { + accumulators.put(b.haeguNo, new HaeguAccumulator()); + } + + for (AisTargetEntity entity : aisTargetCacheManager.getAllValues()) { + if (entity.getLat() == null || entity.getLon() == null) continue; + if (entity.getMessageTimestamp() == null || 
entity.getMessageTimestamp().isBefore(threshold)) continue; + + double lat = entity.getLat(); + double lon = entity.getLon(); + for (HaeguBounds b : bounds) { + if (lat >= b.minLat && lat <= b.maxLat && lon >= b.minLon && lon <= b.maxLon) { + accumulators.get(b.haeguNo).add(entity); + } + } + } + + List<Map<String, Object>> result = new ArrayList<>(); + for (HaeguBounds b : bounds) { + HaeguAccumulator acc = accumulators.get(b.haeguNo); + if (acc.vesselCount() == 0) continue; + + double area = Math.max((b.maxLat - b.minLat) * (b.maxLon - b.minLon) * 12321, 0.01); + Map<String, Object> row = new LinkedHashMap<>(); + row.put("haegu_no", b.haeguNo); + row.put("haegu_name", "대해구 " + b.haeguNo); + row.put("current_vessels", acc.vesselCount()); + row.put("avg_speed", Math.round(acc.avgSpeed() * 10.0) / 10.0); + row.put("avg_density", Math.round(acc.vesselCount() / area * 10000.0) / 10000.0); + row.put("last_update", acc.latestUpdate()); + row.put("center_lon", b.centerLon); + row.put("center_lat", b.centerLat); + result.add(row); + } + + result.sort((a, b2) -> Integer.compare( + ((Number) b2.get("current_vessels")).intValue(), + ((Number) a.get("current_vessels")).intValue())); + if (result.size() > 50) { + result = new ArrayList<>(result.subList(0, 50)); + } + + log.info("Haegu realtime (in-memory): {} rows in {}ms", result.size(), System.currentTimeMillis() - start); return result; } catch (Exception e) { log.error("Failed to get realtime haegu status: {}", e.getMessage(), e); @@ -158,17 +203,19 @@ public class MonitoringController { } - List<Map<String, Object>> partitionSizes = queryJdbcTemplate.queryForList( - """ - SELECT - tablename, - pg_size_pretty(pg_total_relation_size('signal.' || tablename)) as size, - pg_total_relation_size('signal.' 
|| tablename) as size_bytes - FROM pg_tables - WHERE schemaname = 'signal' - AND tablename LIKE 't_vessel_tracks_%' - ORDER BY tablename - """ + List<Map<String, Object>> partitionSizes = partitionSizeCache.get("sizes", k -> + queryJdbcTemplate.queryForList( + """ + SELECT + tablename, + pg_size_pretty(pg_total_relation_size('signal.' || tablename)) as size, + pg_total_relation_size('signal.' || tablename) as size_bytes + FROM pg_tables + WHERE schemaname = 'signal' + AND tablename LIKE 't_vessel_tracks_%' + ORDER BY tablename + """ + ) ); metrics.put("partitionSizes", partitionSizes); @@ -210,6 +257,7 @@ public class MonitoringController { SELECT COUNT(*) FROM signal.t_ais_position WHERE last_update < NOW() - INTERVAL '30 minutes' + AND last_update > NOW() - INTERVAL '24 hours' """, Integer.class ); @@ -232,6 +280,60 @@ public class MonitoringController { return quality; } + // ==================== 해구 인메모리 헬퍼 ==================== + + private List<HaeguBounds> getHaeguBounds() { + if (haeguBoundsList != null) { + return haeguBoundsList; + } + synchronized (this) { + if (haeguBoundsList != null) { + return haeguBoundsList; + } + haeguBoundsList = queryJdbcTemplate.query( + "SELECT haegu_no, min_lat, max_lat, min_lon, max_lon, center_lon, center_lat FROM signal.t_haegu_definitions", + (rs, rowNum) -> new HaeguBounds( + rs.getInt("haegu_no"), + rs.getDouble("min_lat"), rs.getDouble("max_lat"), + rs.getDouble("min_lon"), rs.getDouble("max_lon"), + rs.getDouble("center_lon"), rs.getDouble("center_lat") + ) + ); + log.info("해구 경계 로딩 완료: {}개", haeguBoundsList.size()); + return haeguBoundsList; + } + } + + private record HaeguBounds(int haeguNo, double minLat, double maxLat, + double minLon, double maxLon, + double centerLon, double centerLat) {} + + private static class HaeguAccumulator { + private final Set mmsiSet = new HashSet<>(); + private double sogSum = 0; + private int sogCount = 0; + private OffsetDateTime latest = null; + + void add(AisTargetEntity entity) { + mmsiSet.add(entity.getMmsi()); + 
if (entity.getSog() != null) { + sogSum += entity.getSog(); + sogCount++; + } + if (entity.getMessageTimestamp() != null) { + if (latest == null || entity.getMessageTimestamp().isAfter(latest)) { + latest = entity.getMessageTimestamp(); + } + } + } + + int vesselCount() { return mmsiSet.size(); } + double avgSpeed() { return sogCount > 0 ? sogSum / sogCount : 0; } + OffsetDateTime latestUpdate() { return latest; } + } + + // ==================== 유틸 ==================== + private LocalDateTime toLocalDateTime(Object raw) { if (raw == null) return null; if (raw instanceof LocalDateTime ldt) return ldt;