import { useState } from 'react' import { usePoller } from '../hooks/usePoller.ts' import { useI18n } from '../hooks/useI18n.ts' import { batchApi } from '../api/batchApi.ts' import { monitorApi } from '../api/monitorApi.ts' import type { BatchStatistics, CacheStats, DailyStats, JobExecution, ProcessingDelay, } from '../api/types.ts' import type { CacheDetails } from '../api/types.ts' import PipelineChart from '../components/charts/PipelineChart.tsx' import LineChart from '../components/charts/LineChart.tsx' import MetricCard from '../components/charts/MetricCard.tsx' import StatusBadge from '../components/common/StatusBadge.tsx' import { formatNumber, formatDuration, formatDateTime, formatPercent } from '../utils/formatters.ts' const POLL_INTERVAL = 30_000 const JOB_DISPLAY: Record = { aisTargetImportJob: 'AIS 수집 (1분)', vesselTrackAggregationJob: 'Track (5분)', hourlyAggregationJob: 'Hourly (1시간)', dailyAggregationJob: 'Daily (1일)', } export default function DataPipeline() { const { t } = useI18n() const [stats, setStats] = useState(null) const [daily, setDaily] = useState(null) const [delay, setDelay] = useState(null) const [cache, setCache] = useState(null) const [cacheDetails, setCacheDetails] = useState(null) const [recentJobs, setRecentJobs] = useState([]) usePoller(async () => { const [s, d, dl, c, cd, rj] = await Promise.allSettled([ batchApi.getStatistics(7), batchApi.getDailyStats(), monitorApi.getDelay(), monitorApi.getCacheStats(), monitorApi.getCacheDetails(), batchApi.getJobHistory(undefined, 20), ]) if (s.status === 'fulfilled') setStats(s.value) if (d.status === 'fulfilled') setDaily(d.value) if (dl.status === 'fulfilled') setDelay(dl.value) if (c.status === 'fulfilled') setCache(c.value) if (cd.status === 'fulfilled') setCacheDetails(cd.value) if (rj.status === 'fulfilled') setRecentJobs(rj.value) }, POLL_INTERVAL) const jobCounts = stats?.byJob.executionCounts ?? {} const jobTimes = stats?.byJob.processingTimes ?? {} const getStageStatus = (jobName: string) => { const recent = recentJobs.find(j => j.jobName === jobName) if (!recent) return 'idle' as const if (recent.status === 'STARTED' || recent.status === 'STARTING') return 'active' as const if (recent.status === 'FAILED') return 'error' as const return 'active' as const } const stages = [ { label: 'AIS API', sublabel: t('pipeline.collect1min'), count: formatNumber(jobCounts['aisTargetImportJob'] ?? 0), status: getStageStatus('aisTargetImportJob'), }, { label: 'Track 5min', sublabel: t('pipeline.aggregate5min'), count: formatNumber(jobCounts['vesselTrackAggregationJob'] ?? 0), status: getStageStatus('vesselTrackAggregationJob'), }, { label: 'Hourly', sublabel: t('pipeline.mergeHourly'), count: formatNumber(jobCounts['hourlyAggregationJob'] ?? 0), status: getStageStatus('hourlyAggregationJob'), }, { label: 'Daily', sublabel: t('pipeline.mergeDaily'), count: formatNumber(jobCounts['dailyAggregationJob'] ?? 0), status: getStageStatus('dailyAggregationJob'), }, ] /* 일별 처리량 라인 차트 데이터 */ const dailyChartData = daily?.dailyStats.map(d => ({ date: d.date.slice(5), processed: d.totalProcessed, vessel: d.vesselJobs, track: d.trackJobs, })) ?? [] return (
{/* Header */}

{t('pipeline.title')}

{/* Pipeline Flow */}
{t('pipeline.flowTitle')}
{/* Stage Metric Cards */}
{Object.entries(JOB_DISPLAY).map(([jobName, label]) => ( ))}
{/* Processing Delay + Cache Status */}
{t('pipeline.processingDelay')}
{delay ? (
{delay.delayMinutes ?? 0} {t('pipeline.delayMin')}
{t('pipeline.aisLatest')}
{formatDateTime(delay.aisLatestTime)}
{t('pipeline.processLatest')}
{formatDateTime(delay.queryLatestTime)}
) : (
{t('common.loading')}
)}
{t('pipeline.cacheOverview')}
{cacheDetails ? (
L1 (5min)
{formatNumber(cacheDetails.l1_fiveMin?.size)}
/ {formatNumber(cacheDetails.l1_fiveMin?.maxSize)}
L2 (Hourly)
{formatNumber(cacheDetails.l2_hourly?.size)}
/ {formatNumber(cacheDetails.l2_hourly?.maxSize)}
L3 (Daily)
{cacheDetails.l3_daily?.cachedDays ?? 0}
{t('pipeline.cachedDays')}
{cache && (
{t('pipeline.totalHitRate')}
{cache.hitRate}
)}
) : (
{t('common.loading')}
)}
{/* Daily Throughput Timeline */} {dailyChartData.length > 0 && (
{t('pipeline.dailyThroughput')}
)} {/* Recent Job Executions */}
{t('pipeline.recentJobs')}
{recentJobs.length === 0 ? (
{t('common.noData')}
) : ( recentJobs.slice(0, 10).map(job => (
{JOB_DISPLAY[job.jobName] ?? job.jobName}
#{job.executionId} · {formatDateTime(job.startTime)}
{formatDuration(job.durationSeconds)}
R:{formatNumber(job.totalRead)} W:{formatNumber(job.totalWrite)}
)) )}
{/* Summary Stats */} {stats && (
)}
) }