Merge pull request 'perf: API 응답 최적화 + 점진적 렌더링 + 해구 choropleth 지도' (#39) from develop into main
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 2m35s

This commit is contained in:
htlee 2026-02-19 20:26:23 +09:00
커밋 5cf6e32d71
11개의 변경된 파일, 524줄 추가, 266줄 삭제

파일 보기

@ -2,8 +2,8 @@ import { fetchJson, postJson } from './httpClient.ts'
/** One haegu (sea-block) boundary row as returned by the GIS API. */
export interface HaeguBoundary {
// Sea-block number — unique identifier for the haegu
haegu_no: number
// Human-readable sea-block name
haegu_name: string
// Boundary as WKT text — presumably retained for older consumers; TODO confirm still needed
boundary_wkt: string
// Optional area/zone label; null when the block has no assigned area
area_name: string | null
// Boundary geometry serialized as a GeoJSON string (JSON.parse'd client-side for the choropleth)
geom_json: string
// Centroid longitude of the block
center_lon: number
// Centroid latitude of the block
center_lat: number
}

파일 보기

@ -0,0 +1,26 @@
import { useState, useCallback } from 'react'

/**
 * Module-level cache shared by every useCachedState consumer.
 * Entries survive component unmount/remount within the same page session,
 * so revisiting a view shows the last fetched data immediately instead of
 * an empty state while polling restarts.
 */
const cache = new Map<string, unknown>()

/**
 * Drop-in useState variant whose latest value is mirrored into the
 * module-level cache under `key`.
 * - Initial value: the cached entry when one exists, otherwise `fallback`.
 * - The returned setter writes to the cache before updating React state.
 */
export function useCachedState<T>(key: string, fallback: T): [T, (v: T) => void] {
  const [value, setValue] = useState<T>(() => {
    const hit = cache.get(key)
    return hit === undefined ? fallback : (hit as T)
  })
  const set = useCallback((v: T) => {
    cache.set(key, v)
    setValue(v)
  }, [key])
  return [value, set]
}

파일 보기

@ -117,6 +117,9 @@ const en = {
'area.duplicates': 'Duplicate Tracks',
'area.stalePositions': 'Stale Positions',
'area.checkedAt': 'Checked at',
'area.haeguMap': 'Vessel Distribution by Area',
'area.mapLegend': 'Vessels',
'area.vessels': ' vessels',
// API Explorer
'explorer.title': 'API Explorer',

파일 보기

@ -117,6 +117,9 @@ const ko = {
'area.duplicates': '중복 항적',
'area.stalePositions': '갱신 지연 위치',
'area.checkedAt': '검증 시각',
'area.haeguMap': '해구별 선박 분포',
'area.mapLegend': '선박 수',
'area.vessels': '척',
// API Explorer
'explorer.title': 'API 탐색기',

파일 보기

@ -1,5 +1,6 @@
import { useState } from 'react'
import { usePoller } from '../hooks/usePoller.ts'
import { useCachedState } from '../hooks/useCachedState.ts'
import { useI18n } from '../hooks/useI18n.ts'
import { abnormalApi } from '../api/abnormalApi.ts'
import type { AbnormalTrack, AbnormalSummary } from '../api/abnormalApi.ts'
@ -24,17 +25,13 @@ const HOURS_OPTIONS = [6, 12, 24, 48, 72]
export default function AbnormalTracks() {
const { t } = useI18n()
const [hours, setHours] = useState(24)
const [tracks, setTracks] = useState<AbnormalTrack[]>([])
const [summary, setSummary] = useState<AbnormalSummary | null>(null)
const [tracks, setTracks] = useCachedState<AbnormalTrack[]>('abn.tracks', [])
const [summary, setSummary] = useCachedState<AbnormalSummary | null>('abn.summary', null)
const [typeFilter, setTypeFilter] = useState<string>('all')
usePoller(async () => {
const [tr, sm] = await Promise.allSettled([
abnormalApi.getRecent(hours),
abnormalApi.getStatisticsSummary(7),
])
if (tr.status === 'fulfilled') setTracks(tr.value)
if (sm.status === 'fulfilled') setSummary(sm.value)
usePoller(() => {
abnormalApi.getRecent(hours).then(setTracks).catch(() => {})
abnormalApi.getStatisticsSummary(7).then(setSummary).catch(() => {})
}, POLL_INTERVAL, [hours])
const filteredTracks = typeFilter === 'all'

파일 보기

@ -1,5 +1,5 @@
import { useState } from 'react'
import { usePoller } from '../hooks/usePoller.ts'
import { useCachedState } from '../hooks/useCachedState.ts'
import { useI18n } from '../hooks/useI18n.ts'
import { monitorApi } from '../api/monitorApi.ts'
import type { MetricsSummary, CacheStats, ProcessingDelay, CacheDetails } from '../api/types.ts'
@ -10,22 +10,16 @@ const POLL_INTERVAL = 10_000
export default function ApiMetrics() {
const { t } = useI18n()
const [metrics, setMetrics] = useState<MetricsSummary | null>(null)
const [cache, setCache] = useState<CacheStats | null>(null)
const [cacheDetails, setCacheDetails] = useState<CacheDetails | null>(null)
const [delay, setDelay] = useState<ProcessingDelay | null>(null)
const [metrics, setMetrics] = useCachedState<MetricsSummary | null>('api.metrics', null)
const [cache, setCache] = useCachedState<CacheStats | null>('api.cache', null)
const [cacheDetails, setCacheDetails] = useCachedState<CacheDetails | null>('api.cacheDetail', null)
const [delay, setDelay] = useCachedState<ProcessingDelay | null>('api.delay', null)
usePoller(async () => {
const [m, c, cd, d] = await Promise.allSettled([
monitorApi.getMetricsSummary(),
monitorApi.getCacheStats(),
monitorApi.getCacheDetails(),
monitorApi.getDelay(),
])
if (m.status === 'fulfilled') setMetrics(m.value)
if (c.status === 'fulfilled') setCache(c.value)
if (cd.status === 'fulfilled') setCacheDetails(cd.value)
if (d.status === 'fulfilled') setDelay(d.value)
usePoller(() => {
monitorApi.getMetricsSummary().then(setMetrics).catch(() => {})
monitorApi.getCacheStats().then(setCache).catch(() => {})
monitorApi.getCacheDetails().then(setCacheDetails).catch(() => {})
monitorApi.getDelay().then(setDelay).catch(() => {})
}, POLL_INTERVAL)
const memUsed = metrics?.memory.used ?? 0

파일 보기

@ -1,31 +1,173 @@
import { useState } from 'react'
import { useEffect, useRef, useCallback } from 'react'
import maplibregl from 'maplibre-gl'
import { usePoller } from '../hooks/usePoller.ts'
import { useCachedState } from '../hooks/useCachedState.ts'
import { useI18n } from '../hooks/useI18n.ts'
import { monitorApi } from '../api/monitorApi.ts'
import { gisApi } from '../api/gisApi.ts'
import type { HaeguBoundary } from '../api/gisApi.ts'
import type { ThroughputMetrics, DataQuality, HaeguStat } from '../api/types.ts'
import MapContainer from '../components/map/MapContainer.tsx'
import MetricCard from '../components/charts/MetricCard.tsx'
import StatusBadge from '../components/common/StatusBadge.tsx'
import { formatNumber, formatDateTime } from '../utils/formatters.ts'
const POLL_INTERVAL = 30_000
/**
 * Legend entries for the haegu choropleth.
 * Colors must stay in sync with the 'step' fill expression of the
 * 'haegu-fill' layer (same alpha values), otherwise the legend swatches
 * misrepresent what is drawn on the map.
 */
const LEGEND_ITEMS = [
{ label: '1-9', color: 'rgba(59,130,246,0.45)' },
{ label: '10-49', color: 'rgba(16,185,129,0.55)' },
{ label: '50-99', color: 'rgba(245,158,11,0.6)' },
{ label: '100-199', color: 'rgba(239,68,68,0.6)' },
{ label: '200+', color: 'rgba(139,92,246,0.7)' },
]
export default function AreaStats() {
const { t } = useI18n()
const [haegu, setHaegu] = useState<HaeguStat[]>([])
const [throughput, setThroughput] = useState<ThroughputMetrics | null>(null)
const [quality, setQuality] = useState<DataQuality | null>(null)
const [haegu, setHaegu] = useCachedState<HaeguStat[]>('area.haegu', [])
const [throughput, setThroughput] = useCachedState<ThroughputMetrics | null>('area.throughput', null)
const [quality, setQuality] = useCachedState<DataQuality | null>('area.quality', null)
const [boundaries, setBoundaries] = useCachedState<HaeguBoundary[]>('area.boundaries', [])
usePoller(async () => {
const [h, tp, q] = await Promise.allSettled([
monitorApi.getHaeguRealtimeStats(),
monitorApi.getThroughput(),
monitorApi.getQuality(),
])
if (h.status === 'fulfilled') setHaegu(h.value)
if (tp.status === 'fulfilled') setThroughput(tp.value)
if (q.status === 'fulfilled') setQuality(q.value)
const mapRef = useRef<maplibregl.Map | null>(null)
const popupRef = useRef<maplibregl.Popup | null>(null)
usePoller(() => {
monitorApi.getHaeguRealtimeStats().then(setHaegu).catch(() => {})
monitorApi.getThroughput().then(setThroughput).catch(() => {})
monitorApi.getQuality().then(setQuality).catch(() => {})
}, POLL_INTERVAL)
// 경계 데이터는 1회만 로딩
useEffect(() => {
if (boundaries.length === 0) {
gisApi.getHaeguBoundaries().then(setBoundaries).catch(() => {})
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [])
// GeoJSON 생성 함수
const buildGeoJson = useCallback((): GeoJSON.FeatureCollection => {
const statsMap = new Map<number, HaeguStat>()
for (const h of haegu) {
statsMap.set(h.haegu_no, h)
}
const features: GeoJSON.Feature[] = []
for (const b of boundaries) {
if (!b.geom_json) continue
try {
const geometry = JSON.parse(b.geom_json)
const stat = statsMap.get(b.haegu_no)
features.push({
type: 'Feature',
properties: {
haegu_no: b.haegu_no,
haegu_name: stat?.haegu_name ?? `대해구 ${b.haegu_no}`,
vessels: stat?.current_vessels ?? 0,
avg_speed: stat?.avg_speed ?? 0,
avg_density: stat?.avg_density ?? 0,
},
geometry,
})
} catch {
// skip invalid geom
}
}
return { type: 'FeatureCollection', features }
}, [boundaries, haegu])
// 지도 레이어 업데이트
useEffect(() => {
const map = mapRef.current
if (!map || boundaries.length === 0) return
const geojson = buildGeoJson()
if (map.getSource('haegu')) {
(map.getSource('haegu') as maplibregl.GeoJSONSource).setData(geojson)
} else {
map.addSource('haegu', { type: 'geojson', data: geojson })
// fill 색상 step expression
// ['step', input, default, stop1, output1, stop2, output2, ...]
const fillColor = [
'step',
['get', 'vessels'],
'rgba(59,130,246,0.05)', // default (vessels < 0)
0, 'rgba(59,130,246,0.25)', // 0
1, 'rgba(59,130,246,0.45)', // 1-9
10, 'rgba(16,185,129,0.55)', // 10-49
50, 'rgba(245,158,11,0.6)', // 50-99
100, 'rgba(239,68,68,0.6)', // 100-199
200, 'rgba(139,92,246,0.7)', // 200+
] as unknown as maplibregl.ExpressionSpecification
map.addLayer({
id: 'haegu-fill',
type: 'fill',
source: 'haegu',
paint: {
'fill-color': fillColor,
'fill-opacity': 0.8,
},
})
map.addLayer({
id: 'haegu-line',
type: 'line',
source: 'haegu',
paint: {
'line-color': 'rgba(100,116,139,0.5)',
'line-width': 1,
},
})
// hover popup
const popup = new maplibregl.Popup({
closeButton: false,
closeOnClick: false,
maxWidth: '220px',
})
popupRef.current = popup
map.on('mousemove', 'haegu-fill', (e) => {
if (!e.features || e.features.length === 0) return
map.getCanvas().style.cursor = 'pointer'
const props = e.features[0].properties
if (!props) return
popup
.setLngLat(e.lngLat)
.setHTML(
`<div style="font-size:13px;line-height:1.5">
<strong>${props.haegu_name}</strong><br/>
${t('area.currentVessels')}: <b>${props.vessels}</b>${t('area.vessels')}<br/>
${t('area.avgSpeed')}: ${Number(props.avg_speed).toFixed(1)} kn
</div>`,
)
.addTo(map)
})
map.on('mouseleave', 'haegu-fill', () => {
map.getCanvas().style.cursor = ''
popup.remove()
})
}
}, [boundaries, haegu, buildGeoJson, t])
const handleMapReady = useCallback((map: maplibregl.Map) => {
mapRef.current = map
}, [])
// cleanup popup on unmount
useEffect(() => {
return () => {
popupRef.current?.remove()
}
}, [])
return (
<div className="space-y-6 fade-in">
<h1 className="text-2xl font-bold">{t('area.title')}</h1>
@ -54,6 +196,29 @@ export default function AreaStats() {
/>
</div>
{/* Haegu Map (choropleth) */}
<div className="sb-card">
<div className="sb-card-header">{t('area.haeguMap')}</div>
<div className="relative" style={{ height: 480 }}>
<MapContainer onMapReady={handleMapReady} />
{/* Legend */}
<div className="absolute bottom-4 left-4 rounded-lg bg-surface/90 px-3 py-2 shadow-lg backdrop-blur-sm">
<div className="mb-1.5 text-xs font-medium text-muted">{t('area.mapLegend')}</div>
<div className="space-y-1">
{LEGEND_ITEMS.map((item) => (
<div key={item.label} className="flex items-center gap-2 text-xs">
<span
className="inline-block h-3 w-5 rounded"
style={{ backgroundColor: item.color }}
/>
<span>{item.label}</span>
</div>
))}
</div>
</div>
</div>
</div>
{/* Haegu Stats Table */}
<div className="sb-card">
<div className="sb-card-header">{t('area.haeguStats')}</div>

파일 보기

@ -1,5 +1,6 @@
import { useState } from 'react'
import { usePoller } from '../hooks/usePoller.ts'
import { useCachedState } from '../hooks/useCachedState.ts'
import { useI18n } from '../hooks/useI18n.ts'
import { batchApi } from '../api/batchApi.ts'
import { monitorApi } from '../api/monitorApi.ts'
@ -21,29 +22,21 @@ const POLL_INTERVAL = 30_000
export default function Dashboard() {
const { t } = useI18n()
const [stats, setStats] = useState<BatchStatistics | null>(null)
const [metrics, setMetrics] = useState<MetricsSummary | null>(null)
const [cache, setCache] = useState<CacheStats | null>(null)
const [delay, setDelay] = useState<ProcessingDelay | null>(null)
const [daily, setDaily] = useState<DailyStats | null>(null)
const [running, setRunning] = useState<RunningJob[]>([])
const [stats, setStats] = useCachedState<BatchStatistics | null>('dash.stats', null)
const [metrics, setMetrics] = useCachedState<MetricsSummary | null>('dash.metrics', null)
const [cache, setCache] = useCachedState<CacheStats | null>('dash.cache', null)
const [delay, setDelay] = useCachedState<ProcessingDelay | null>('dash.delay', null)
const [daily, setDaily] = useCachedState<DailyStats | null>('dash.daily', null)
const [running, setRunning] = useCachedState<RunningJob[]>('dash.running', [])
const [days, setDays] = useState(7)
usePoller(async () => {
const [s, m, c, d, ds, r] = await Promise.allSettled([
batchApi.getStatistics(days),
monitorApi.getMetricsSummary(),
monitorApi.getCacheStats(),
monitorApi.getDelay(),
batchApi.getDailyStats(),
batchApi.getRunningJobs(),
])
if (s.status === 'fulfilled') setStats(s.value)
if (m.status === 'fulfilled') setMetrics(m.value)
if (c.status === 'fulfilled') setCache(c.value)
if (d.status === 'fulfilled') setDelay(d.value)
if (ds.status === 'fulfilled') setDaily(ds.value)
if (r.status === 'fulfilled') setRunning(r.value)
usePoller(() => {
batchApi.getStatistics(days).then(setStats).catch(() => {})
monitorApi.getMetricsSummary().then(setMetrics).catch(() => {})
monitorApi.getCacheStats().then(setCache).catch(() => {})
monitorApi.getDelay().then(setDelay).catch(() => {})
batchApi.getDailyStats().then(setDaily).catch(() => {})
batchApi.getRunningJobs().then(setRunning).catch(() => {})
}, POLL_INTERVAL, [days])
const memUsage = metrics

파일 보기

@ -1,5 +1,5 @@
import { useState } from 'react'
import { usePoller } from '../hooks/usePoller.ts'
import { useCachedState } from '../hooks/useCachedState.ts'
import { useI18n } from '../hooks/useI18n.ts'
import { batchApi } from '../api/batchApi.ts'
import { monitorApi } from '../api/monitorApi.ts'
@ -28,28 +28,20 @@ const JOB_DISPLAY: Record<string, string> = {
export default function DataPipeline() {
const { t } = useI18n()
const [stats, setStats] = useState<BatchStatistics | null>(null)
const [daily, setDaily] = useState<DailyStats | null>(null)
const [delay, setDelay] = useState<ProcessingDelay | null>(null)
const [cache, setCache] = useState<CacheStats | null>(null)
const [cacheDetails, setCacheDetails] = useState<CacheDetails | null>(null)
const [recentJobs, setRecentJobs] = useState<JobExecution[]>([])
const [stats, setStats] = useCachedState<BatchStatistics | null>('pipe.stats', null)
const [daily, setDaily] = useCachedState<DailyStats | null>('pipe.daily', null)
const [delay, setDelay] = useCachedState<ProcessingDelay | null>('pipe.delay', null)
const [cache, setCache] = useCachedState<CacheStats | null>('pipe.cache', null)
const [cacheDetails, setCacheDetails] = useCachedState<CacheDetails | null>('pipe.cacheDetail', null)
const [recentJobs, setRecentJobs] = useCachedState<JobExecution[]>('pipe.jobs', [])
usePoller(async () => {
const [s, d, dl, c, cd, rj] = await Promise.allSettled([
batchApi.getStatistics(7),
batchApi.getDailyStats(),
monitorApi.getDelay(),
monitorApi.getCacheStats(),
monitorApi.getCacheDetails(),
batchApi.getJobHistory(undefined, 20),
])
if (s.status === 'fulfilled') setStats(s.value)
if (d.status === 'fulfilled') setDaily(d.value)
if (dl.status === 'fulfilled') setDelay(dl.value)
if (c.status === 'fulfilled') setCache(c.value)
if (cd.status === 'fulfilled') setCacheDetails(cd.value)
if (rj.status === 'fulfilled') setRecentJobs(rj.value)
usePoller(() => {
batchApi.getStatistics(7).then(setStats).catch(() => {})
batchApi.getDailyStats().then(setDaily).catch(() => {})
monitorApi.getDelay().then(setDelay).catch(() => {})
monitorApi.getCacheStats().then(setCache).catch(() => {})
monitorApi.getCacheDetails().then(setCacheDetails).catch(() => {})
batchApi.getJobHistory(undefined, 20).then(setRecentJobs).catch(() => {})
}, POLL_INTERVAL)
const jobCounts = stats?.byJob.executionCounts ?? {}

파일 보기

@ -16,8 +16,10 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.*;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.*;
import java.util.Date;
@ -41,7 +43,11 @@ public class BatchAdminController {
@Autowired
@Qualifier("vesselTrackAggregationJob")
private Job vesselTrackAggregationJob;
@Autowired
@Qualifier("queryJdbcTemplate")
private JdbcTemplate queryJdbcTemplate;
private final BatchMetadataCleanupService batchMetadataCleanupService;
@Autowired
@ -529,7 +535,9 @@ public class BatchAdminController {
}
/**
* 배치 통계
* 배치 통계 (단일 SQL 집계)
* 기존: JobExplorer N+1 루프 (~8000 쿼리, 1.1초)
* 개선: batch 메타 테이블 직접 GROUP BY (~1 쿼리)
*/
@GetMapping("/statistics")
@Operation(summary = "배치 통계 조회", description = "지정된 기간의 배치 실행 통계를 조회합니다")
@ -537,86 +545,62 @@ public class BatchAdminController {
@Parameter(description = "조회 기간 (일)") @RequestParam(defaultValue = "7") int days) {
try {
long start = System.currentTimeMillis();
LocalDateTime since = LocalDateTime.now().minusDays(days);
Map<String, Object> statistics = new HashMap<>();
List<Map<String, Object>> jobStats = queryJdbcTemplate.queryForList(
"""
SELECT
ji.JOB_NAME as job_name,
COUNT(*) as total_executions,
COUNT(*) FILTER (WHERE je.STATUS = 'COMPLETED') as successful,
COUNT(*) FILTER (WHERE je.STATUS = 'FAILED') as failed,
COALESCE(SUM(EXTRACT(EPOCH FROM (je.END_TIME - je.START_TIME)))::bigint, 0) as total_duration_sec,
COALESCE(SUM(se.total_read), 0) as total_records
FROM batch_job_execution je
JOIN batch_job_instance ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
LEFT JOIN (
SELECT JOB_EXECUTION_ID, SUM(READ_COUNT) as total_read
FROM batch_step_execution
GROUP BY JOB_EXECUTION_ID
) se ON je.JOB_EXECUTION_ID = se.JOB_EXECUTION_ID
WHERE je.START_TIME > ?
GROUP BY ji.JOB_NAME
""",
since
);
int totalExecutions = 0;
int successfulExecutions = 0;
int failedExecutions = 0;
long totalRecordsProcessed = 0;
long totalDuration = 0;
Map<String, Integer> jobExecutionCounts = new HashMap<>();
Map<String, Long> jobProcessingTimes = new HashMap<>();
for (String jobName : jobExplorer.getJobNames()) {
try {
List<JobInstance> instances = jobExplorer.getJobInstances(jobName, 0, 1000);
for (Map<String, Object> row : jobStats) {
String jobName = (String) row.get("job_name");
int total = ((Number) row.get("total_executions")).intValue();
int successful = ((Number) row.get("successful")).intValue();
int failed = ((Number) row.get("failed")).intValue();
long duration = ((Number) row.get("total_duration_sec")).longValue();
long records = ((Number) row.get("total_records")).longValue();
for (JobInstance instance : instances) {
try {
List<JobExecution> executions = jobExplorer.getJobExecutions(instance);
for (JobExecution execution : executions) {
try {
if (execution.getStartTime() != null &&
execution.getStartTime().isAfter(since)) {
totalExecutions++;
if (execution.getStatus() == BatchStatus.COMPLETED) {
successfulExecutions++;
} else if (execution.getStatus() == BatchStatus.FAILED) {
failedExecutions++;
}
// 처리 레코드 - ExecutionContext 오류를 피하기 위해 안전하게 처리
long records = 0;
try {
records = execution.getStepExecutions().stream()
.mapToLong(StepExecution::getReadCount)
.sum();
} catch (Exception e) {
log.warn("Failed to get read count for execution {}: {}", execution.getId(), e.getMessage());
// ExecutionContext 오류 기본값 0 사용
}
totalRecordsProcessed += records;
// 실행 시간
if (execution.getEndTime() != null) {
long duration = java.time.Duration.between(
execution.getStartTime(),
execution.getEndTime()
).getSeconds();
totalDuration += duration;
jobProcessingTimes.merge(jobName, duration, Long::sum);
}
jobExecutionCounts.merge(jobName, 1, Integer::sum);
}
} catch (Exception e) {
log.warn("Failed to process execution {}: {}", execution.getId(), e.getMessage());
// 개별 execution 처리 실패 건너뛰기
}
}
} catch (Exception e) {
log.debug("Failed to get executions for instance {}: {}", instance.getId(), e.getMessage());
// 개별 instance 처리 실패 건너뛰기 - DEBUG 레벨로 변경하여 로그 스팸 방지
}
}
} catch (Exception e) {
log.warn("Failed to get instances for job {}: {}", jobName, e.getMessage());
// 개별 job 처리 실패 건너뛰기
}
totalExecutions += total;
successfulExecutions += successful;
failedExecutions += failed;
totalDuration += duration;
totalRecordsProcessed += records;
jobExecutionCounts.put(jobName, total);
jobProcessingTimes.put(jobName, duration);
}
Map<String, Object> statistics = new HashMap<>();
statistics.put("period", Map.of(
"start", since,
"end", LocalDateTime.now(),
"days", days
));
statistics.put("summary", Map.of(
"totalExecutions", totalExecutions,
"successful", successfulExecutions,
@ -630,12 +614,12 @@ public class BatchAdminController {
"avgProcessingTimeSeconds", totalExecutions > 0 ?
totalDuration / totalExecutions : 0
));
statistics.put("byJob", Map.of(
"executionCounts", jobExecutionCounts,
"processingTimes", jobProcessingTimes
));
log.info("Batch statistics (direct SQL): {}ms", System.currentTimeMillis() - start);
return ResponseEntity.ok(statistics);
} catch (Exception e) {
@ -646,96 +630,95 @@ public class BatchAdminController {
}
/**
* 일별 처리 통계 (Dashboard 차트용)
* 일별 처리 통계 (Dashboard 차트용, 단일 SQL 집계)
* 기존: JobExplorer N+1 × 7일 루프 (~8800 쿼리, 9초)
* 개선: batch 메타 테이블 직접 GROUP BY (2 쿼리)
*/
@GetMapping("/daily-stats")
@Operation(summary = "일별 처리 통계", description = "최근 7일간 일별 배치 처리 통계를 조회합니다 (대시보드 차트용)")
public ResponseEntity<Map<String, Object>> getDailyStatistics() {
try {
Map<String, Object> result = new HashMap<>();
long start = System.currentTimeMillis();
// 1) 7일간 일별 + Job별 집계 (단일 SQL)
List<Map<String, Object>> rows = queryJdbcTemplate.queryForList(
"""
SELECT
DATE(je.START_TIME) as stat_date,
ji.JOB_NAME as job_name,
COUNT(*) as execution_count,
COALESCE(SUM(se.total_write), 0) as total_processed
FROM batch_job_execution je
JOIN batch_job_instance ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
LEFT JOIN (
SELECT JOB_EXECUTION_ID, SUM(WRITE_COUNT) as total_write
FROM batch_step_execution
GROUP BY JOB_EXECUTION_ID
) se ON je.JOB_EXECUTION_ID = se.JOB_EXECUTION_ID
WHERE je.STATUS = 'COMPLETED'
AND je.START_TIME > NOW() - INTERVAL '7 days'
GROUP BY DATE(je.START_TIME), ji.JOB_NAME
ORDER BY stat_date
"""
);
// 일별 그룹핑: date -> [totalProcessed, vesselJobs, trackJobs]
Map<String, long[]> dailyMap = new LinkedHashMap<>();
for (Map<String, Object> row : rows) {
String date = row.get("stat_date").toString();
String jobName = (String) row.get("job_name");
int execCount = ((Number) row.get("execution_count")).intValue();
long processed = ((Number) row.get("total_processed")).longValue();
long[] accum = dailyMap.computeIfAbsent(date, k -> new long[3]);
accum[0] += processed;
if (jobName.contains("vesselAggregation")) {
accum[1] += execCount;
} else if (jobName.contains("vesselTrack")) {
accum[2] += execCount;
}
}
// 7일 전체 채우기 (데이터 없는 날도 포함)
List<Map<String, Object>> dailyStats = new ArrayList<>();
// 최근 7일간 일별 처리량
for (int i = 6; i >= 0; i--) {
LocalDateTime date = LocalDateTime.now().minusDays(i).withHour(0).withMinute(0).withSecond(0);
LocalDateTime nextDate = date.plusDays(1);
String date = LocalDate.now().minusDays(i).toString();
long[] accum = dailyMap.getOrDefault(date, new long[3]);
Map<String, Object> dailyStat = new HashMap<>();
dailyStat.put("date", date);
dailyStat.put("totalProcessed", accum[0]);
dailyStat.put("vesselJobs", (int) accum[1]);
dailyStat.put("trackJobs", (int) accum[2]);
dailyStats.add(dailyStat);
}
long totalProcessed = 0;
int vesselJobCount = 0;
int trackJobCount = 0;
// 2) 24시간 상태별 요약 (2번째 SQL)
Map<String, Integer> statusSummary = new HashMap<>();
statusSummary.put("completed", 0);
statusSummary.put("failed", 0);
statusSummary.put("stopped", 0);
// 모든 Job의 실행 이력 확인
for (String jobName : jobExplorer.getJobNames()) {
List<JobInstance> instances = jobExplorer.getJobInstances(jobName, 0, 1000);
List<Map<String, Object>> statusRows = queryJdbcTemplate.queryForList(
"""
SELECT LOWER(je.STATUS) as status, COUNT(*) as cnt
FROM batch_job_execution je
WHERE je.START_TIME > NOW() - INTERVAL '24 hours'
GROUP BY je.STATUS
"""
);
for (Map<String, Object> row : statusRows) {
String status = (String) row.get("status");
int cnt = ((Number) row.get("cnt")).intValue();
statusSummary.merge(status, cnt, Integer::sum);
}
for (JobInstance instance : instances) {
try {
List<JobExecution> executions = jobExplorer.getJobExecutions(instance);
Map<String, Object> result = new HashMap<>();
result.put("dailyStats", dailyStats);
result.put("statusSummary", statusSummary);
for (JobExecution execution : executions) {
if (execution.getStartTime() != null &&
execution.getStartTime().isAfter(date) &&
execution.getStartTime().isBefore(nextDate) &&
execution.getStatus() == BatchStatus.COMPLETED) {
log.info("Daily statistics (direct SQL): {}ms", System.currentTimeMillis() - start);
return ResponseEntity.ok(result);
// 처리된 레코드 계산
long records = execution.getStepExecutions().stream()
.mapToLong(StepExecution::getWriteCount)
.sum();
totalProcessed += records;
// Job 타입별 카운트
if (jobName.contains("vesselAggregation")) {
vesselJobCount++;
} else if (jobName.contains("vesselTrack")) {
trackJobCount++;
}
}
}
} catch(Exception e){
log.debug("Failed to get executions for instance {} in daily statistics: {}", instance.getId(), e.getMessage());
// ExecutionContext 역직렬화 실패 해당 instance는 건너뛰기
}
}
}
Map<String, Object> dailyStat = new HashMap<>();
dailyStat.put("date", date.toLocalDate().toString());
dailyStat.put("totalProcessed", totalProcessed);
dailyStat.put("vesselJobs", vesselJobCount);
dailyStat.put("trackJobs", trackJobCount);
dailyStats.add(dailyStat);
}
// Job 상태별 요약 (Status Distribution Chart용)
Map<String, Integer> statusSummary = new HashMap<>();
statusSummary.put("completed", 0);
statusSummary.put("failed", 0);
statusSummary.put("stopped", 0);
// 최근 24시간 Job 상태 통계
LocalDateTime since24h = LocalDateTime.now().minusHours(24);
for (String jobName : jobExplorer.getJobNames()) {
List<JobInstance> instances = jobExplorer.getJobInstances(jobName, 0, 200);
for (JobInstance instance : instances) {
List<JobExecution> executions = jobExplorer.getJobExecutions(instance);
for (JobExecution execution : executions) {
if (execution.getStartTime() != null &&
execution.getStartTime().isAfter(since24h)) {
String status = execution.getStatus().toString().toLowerCase();
statusSummary.merge(status, 1, Integer::sum);
}
}
}
}
result.put("dailyStats", dailyStats);
result.put("statusSummary", statusSummary);
return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("Failed to get daily statistics", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)

파일 보기

@ -1,25 +1,47 @@
package gc.mda.signal_batch.monitoring.controller;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import gc.mda.signal_batch.batch.reader.AisTargetCacheManager;
import gc.mda.signal_batch.domain.vessel.model.AisTargetEntity;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.*;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Slf4j
@RestController
@RequestMapping("/monitor")
@RequiredArgsConstructor
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
@Tag(name = "시스템 모니터링 API", description = "데이터 처리 지연, 해구별 현황, 처리량 및 데이터 품질 모니터링 API")
public class MonitoringController {
private final JdbcTemplate queryJdbcTemplate;
private final AisTargetCacheManager aisTargetCacheManager;
/** 해구 경계 (1회 로딩, 변경 거의 없음) */
private volatile List<HaeguBounds> haeguBoundsList;
/** throughput 파티션 크기 캐시 (5분 TTL) */
private final Cache<String, List<Map<String, Object>>> partitionSizeCache = Caffeine.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.maximumSize(1)
.build();
public MonitoringController(
@Qualifier("queryJdbcTemplate") JdbcTemplate queryJdbcTemplate,
AisTargetCacheManager aisTargetCacheManager) {
this.queryJdbcTemplate = queryJdbcTemplate;
this.aisTargetCacheManager = aisTargetCacheManager;
}
/**
* 데이터 처리 지연 상태 확인 (AIS 수집 vs 5분 집계 지연)
@ -82,39 +104,62 @@ public class MonitoringController {
}
/**
* 대해구별 실시간 선박 현황 (t_ais_position + t_haegu_definitions 공간 조인)
* 대해구별 실시간 선박 현황 (인메모리 캐시 기반)
* 기존: t_ais_position JOIN t_haegu_definitions (~12초)
* 개선: AisTargetCacheManager 인메모리 순회 + 해구 바운딩 박스 매칭
*/
@GetMapping("/haegu/realtime")
@Operation(summary = "대해구별 실시간 현황", description = "최신 AIS 위치 데이터 기준 대해구별 선박 통계를 조회합니다")
public List<Map<String, Object>> getRealtimeHaeguStatus() {
String sql = """
SELECT
h.haegu_no,
CONCAT('대해구 ', h.haegu_no) as haegu_name,
COUNT(DISTINCT a.mmsi) as current_vessels,
ROUND(AVG(a.sog)::numeric, 1) as avg_speed,
ROUND((COUNT(DISTINCT a.mmsi)::numeric /
GREATEST((h.max_lat - h.min_lat) * (h.max_lon - h.min_lon) * 12321, 0.01))::numeric,
4) as avg_density,
MAX(a.last_update) as last_update,
h.center_lon,
h.center_lat
FROM signal.t_haegu_definitions h
JOIN signal.t_ais_position a
ON a.lat BETWEEN h.min_lat AND h.max_lat
AND a.lon BETWEEN h.min_lon AND h.max_lon
WHERE a.last_update > NOW() - INTERVAL '30 minutes'
GROUP BY h.haegu_no, h.min_lat, h.min_lon, h.max_lat, h.max_lon,
h.center_lon, h.center_lat
HAVING COUNT(DISTINCT a.mmsi) > 0
ORDER BY current_vessels DESC
LIMIT 50
""";
try {
long start = System.currentTimeMillis();
List<Map<String, Object>> result = queryJdbcTemplate.queryForList(sql);
log.info("Haegu realtime query: {} rows in {}ms", result.size(), System.currentTimeMillis() - start);
List<HaeguBounds> bounds = getHaeguBounds();
OffsetDateTime threshold = OffsetDateTime.now().minusMinutes(30);
Map<Integer, HaeguAccumulator> accumulators = new LinkedHashMap<>();
for (HaeguBounds b : bounds) {
accumulators.put(b.haeguNo, new HaeguAccumulator());
}
for (AisTargetEntity entity : aisTargetCacheManager.getAllValues()) {
if (entity.getLat() == null || entity.getLon() == null) continue;
if (entity.getMessageTimestamp() == null || entity.getMessageTimestamp().isBefore(threshold)) continue;
double lat = entity.getLat();
double lon = entity.getLon();
for (HaeguBounds b : bounds) {
if (lat >= b.minLat && lat <= b.maxLat && lon >= b.minLon && lon <= b.maxLon) {
accumulators.get(b.haeguNo).add(entity);
}
}
}
List<Map<String, Object>> result = new ArrayList<>();
for (HaeguBounds b : bounds) {
HaeguAccumulator acc = accumulators.get(b.haeguNo);
if (acc.vesselCount() == 0) continue;
double area = Math.max((b.maxLat - b.minLat) * (b.maxLon - b.minLon) * 12321, 0.01);
Map<String, Object> row = new LinkedHashMap<>();
row.put("haegu_no", b.haeguNo);
row.put("haegu_name", "대해구 " + b.haeguNo);
row.put("current_vessels", acc.vesselCount());
row.put("avg_speed", Math.round(acc.avgSpeed() * 10.0) / 10.0);
row.put("avg_density", Math.round(acc.vesselCount() / area * 10000.0) / 10000.0);
row.put("last_update", acc.latestUpdate());
row.put("center_lon", b.centerLon);
row.put("center_lat", b.centerLat);
result.add(row);
}
result.sort((a, b2) -> Integer.compare(
((Number) b2.get("current_vessels")).intValue(),
((Number) a.get("current_vessels")).intValue()));
if (result.size() > 50) {
result = new ArrayList<>(result.subList(0, 50));
}
log.info("Haegu realtime (in-memory): {} rows in {}ms", result.size(), System.currentTimeMillis() - start);
return result;
} catch (Exception e) {
log.error("Failed to get realtime haegu status: {}", e.getMessage(), e);
@ -158,17 +203,19 @@ public class MonitoringController {
metrics.put("hourlyDetails", hourlyStats);
}
List<Map<String, Object>> partitionSizes = queryJdbcTemplate.queryForList(
"""
SELECT
tablename,
pg_size_pretty(pg_total_relation_size('signal.' || tablename)) as size,
pg_total_relation_size('signal.' || tablename) as size_bytes
FROM pg_tables
WHERE schemaname = 'signal'
AND tablename LIKE 't_vessel_tracks_%'
ORDER BY tablename
"""
List<Map<String, Object>> partitionSizes = partitionSizeCache.get("sizes", k ->
queryJdbcTemplate.queryForList(
"""
SELECT
tablename,
pg_size_pretty(pg_total_relation_size('signal.' || tablename)) as size,
pg_total_relation_size('signal.' || tablename) as size_bytes
FROM pg_tables
WHERE schemaname = 'signal'
AND tablename LIKE 't_vessel_tracks_%'
ORDER BY tablename
"""
)
);
metrics.put("partitionSizes", partitionSizes);
@ -210,6 +257,7 @@ public class MonitoringController {
SELECT COUNT(*)
FROM signal.t_ais_position
WHERE last_update < NOW() - INTERVAL '30 minutes'
AND last_update > NOW() - INTERVAL '24 hours'
""",
Integer.class
);
@ -232,6 +280,60 @@ public class MonitoringController {
return quality;
}
// ==================== In-memory haegu (sea-block) helpers ====================
/**
 * Returns the cached list of haegu bounding boxes, loading them from
 * signal.t_haegu_definitions on first use. Uses double-checked locking on
 * the volatile haeguBoundsList field so the query runs at most once even
 * under concurrent requests; boundary definitions rarely change, so no TTL.
 */
private List<HaeguBounds> getHaeguBounds() {
// Fast path: volatile read — already loaded, no lock needed.
if (haeguBoundsList != null) {
return haeguBoundsList;
}
synchronized (this) {
// Re-check under the lock: another thread may have loaded it meanwhile.
if (haeguBoundsList != null) {
return haeguBoundsList;
}
haeguBoundsList = queryJdbcTemplate.query(
"SELECT haegu_no, min_lat, max_lat, min_lon, max_lon, center_lon, center_lat FROM signal.t_haegu_definitions",
(rs, rowNum) -> new HaeguBounds(
rs.getInt("haegu_no"),
rs.getDouble("min_lat"), rs.getDouble("max_lat"),
rs.getDouble("min_lon"), rs.getDouble("max_lon"),
rs.getDouble("center_lon"), rs.getDouble("center_lat")
)
);
log.info("해구 경계 로딩 완료: {}개", haeguBoundsList.size());
return haeguBoundsList;
}
}
/** Immutable lat/lon bounding box plus centroid for one haegu (sea block). */
private record HaeguBounds(int haeguNo, double minLat, double maxLat,
double minLon, double maxLon,
double centerLon, double centerLat) {}
/**
 * Mutable per-haegu accumulator: collects distinct MMSIs, a running SOG
 * sum for the average speed, and the newest message timestamp observed.
 */
private static class HaeguAccumulator {
    private final Set<String> uniqueMmsi = new HashSet<>();
    private double speedTotal = 0;
    private int speedSamples = 0;
    private OffsetDateTime newestSeen = null;

    /** Folds one cached AIS target into the running statistics. */
    void add(AisTargetEntity entity) {
        uniqueMmsi.add(entity.getMmsi());
        if (entity.getSog() != null) {
            speedTotal += entity.getSog();
            speedSamples++;
        }
        OffsetDateTime ts = entity.getMessageTimestamp();
        if (ts != null && (newestSeen == null || ts.isAfter(newestSeen))) {
            newestSeen = ts;
        }
    }

    /** Number of distinct vessels (unique MMSIs) accumulated so far. */
    int vesselCount() { return uniqueMmsi.size(); }

    /** Mean SOG over entries that carried a speed; 0 when none did. */
    double avgSpeed() { return speedSamples > 0 ? speedTotal / speedSamples : 0; }

    /** Newest message timestamp seen, or null if no entry had one. */
    OffsetDateTime latestUpdate() { return newestSeen; }
}
// ==================== 유틸 ====================
private LocalDateTime toLocalDateTime(Object raw) {
if (raw == null) return null;
if (raw instanceof LocalDateTime ldt) return ldt;