import { Client, type IMessage } from '@stomp/stompjs'
import { useMergedTrackStore } from '../stores/mergedTrackStore'
import { useReplayStore } from '../stores/replayStore'
import type { ViewportBounds } from '../../vessel-map'

const CONNECTION_TIMEOUT = 10_000
const QUERY_TIMEOUT = 300_000

export interface TrackQueryRequest {
  startTime: string
  endTime: string
  viewport?: {
    minLon: number
    maxLon: number
    minLat: number
    maxLat: number
  }
  chunkedMode: boolean
  chunkSize: number
  simplificationMode: string
  zoomLevel: number
}

export interface TrackChunkResponse {
  queryId: string
  chunkIndex: number
  totalChunks?: number | null
  tracks?: TrackChunkData[]
  mergedTracks?: TrackChunkData[]
  compactTracks?: TrackChunkData[]
  isLastChunk?: boolean
}

export interface TrackChunkData {
  vesselId: string
  shipName?: string
  shipKindCode?: string
  nationalCode?: string
  geometry?: [number, number][]
  timestamps?: (string | number)[]
  speeds?: number[]
  totalDistance?: number
  maxSpeed?: number
  avgSpeed?: number
}

/** Normalize a timestamp to epoch milliseconds. Returns 0 if it cannot be parsed. */
function parseTimestamp(ts: string | number): number {
  // Numeric values below 1e12 are treated as epoch seconds, otherwise as milliseconds.
  if (typeof ts === 'number') {
    return ts < 1e12 ? ts * 1000 : ts
  }
  // All-digit strings follow the same seconds-vs-milliseconds rule as numbers,
  // so a 13-digit millisecond string is not multiplied by 1000 again.
  if (/^\d{10,}$/.test(ts)) {
    const n = parseInt(ts, 10)
    return n < 1e12 ? n * 1000 : n
  }
  // "YYYY-MM-DD HH:mm:ss" is rewritten to "YYYY-MM-DDTHH:mm:ss" before parsing.
  let iso = ts
  if (ts.includes(' ') && !ts.includes('T')) {
    const [datePart, timePart] = ts.split(' ')
    iso = `${datePart}T${timePart}`
  }
  const parsed = new Date(iso).getTime()
  return Number.isNaN(parsed) ? 0 : parsed
}

/**
 * STOMP WebSocket replay service.
 * Singleton: connect / disconnect / executeQuery / cancelQuery
 */
class ReplayWebSocketService {
  private client: Client | null = null
  private currentQueryId: string | null = null
  private queryTimeoutId: ReturnType<typeof setTimeout> | null = null

  /** Open the WebSocket connection. */
  connect(wsUrl: string): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      if (this.client?.connected) {
        resolve()
        return
      }

      useReplayStore.getState().setConnectionState('connecting')

      this.client = new Client({
        brokerURL: wsUrl,
        reconnectDelay: 0, // no automatic reconnect
        connectionTimeout: CONNECTION_TIMEOUT,
        heartbeatIncoming: 10_000,
        heartbeatOutgoing: 10_000,
        onConnect: () => {
          useReplayStore.getState().setConnectionState('connected')
          this.setupSubscriptions()
          resolve()
        },
        onStompError: (frame) => {
          console.error('[ReplayWS] STOMP error:', frame.headers.message, frame.body)
          // Read the store inside the handler so `querying` reflects the current
          // state rather than a snapshot taken when connect() was called.
          const replayStore = useReplayStore.getState()
          if (replayStore.querying) {
            replayStore.completeQuery()
          }
          replayStore.setConnectionState('error')
          reject(new Error(frame.headers.message))
        },
        onWebSocketError: (evt) => {
          console.error('[ReplayWS] WebSocket error:', evt)
          useReplayStore.getState().setConnectionState('error')
          reject(new Error('WebSocket connection failed'))
        },
        onDisconnect: () => {
          useReplayStore.getState().setConnectionState('disconnected')
        },
      })

      this.client.activate()
    })
  }

  /** Close the WebSocket connection. */
  disconnect(): void {
    this.clearQueryTimeout()
    if (this.client) {
      this.client.deactivate()
      this.client = null
    }
    useReplayStore.getState().setConnectionState('disconnected')
  }

  /** Run a track query. */
  executeQuery(
    startTime: string,
    endTime: string,
    viewport: ViewportBounds,
    zoomLevel: number,
  ): void {
    if (!this.client?.connected) {
      console.error('[ReplayWS] Not connected')
      return
    }

    // Clean up the previous query.
    if (this.currentQueryId) {
      this.cancelQuery()
    }
    // Also drop a timer left by a query whose ID never arrived, so it cannot
    // fire later and complete the new query early.
    this.clearQueryTimeout()

    useMergedTrackStore.getState().clear()
    useReplayStore.getState().startQuery()

    const request: TrackQueryRequest = {
      startTime,
      endTime,
      viewport: {
        minLon: viewport.west,
        maxLon: viewport.east,
        minLat: viewport.south,
        maxLat: viewport.north,
      },
      chunkedMode: true,
      chunkSize: 20_000,
      simplificationMode: 'AUTO',
      zoomLevel,
    }

    console.log('[ReplayWS] Sending query:', request.startTime, '~', request.endTime, 'zoom:', request.zoomLevel)

    this.client.publish({
      destination: '/app/tracks/query',
      body: JSON.stringify(request),
    })

    this.queryTimeoutId = setTimeout(() => {
      console.warn('[ReplayWS] Query timeout')
      this.queryTimeoutId = null
      useReplayStore.getState().completeQuery()
    }, QUERY_TIMEOUT)
  }

  /** Cancel the in-flight query. */
  cancelQuery(): void {
    if (this.currentQueryId && this.client?.connected) {
      this.client.publish({
        destination: `/app/tracks/cancel/${this.currentQueryId}`,
        body: '',
      })
    }
    this.clearQueryTimeout()
    this.currentQueryId = null
    useReplayStore.getState().completeQuery()
  }

  private setupSubscriptions(): void {
    if (!this.client) return

    this.client.subscribe('/user/queue/tracks/chunk', (msg: IMessage) => {
      this.handleChunkMessage(msg)
    })

    this.client.subscribe('/user/queue/tracks/status', (msg: IMessage) => {
      this.handleStatusMessage(msg)
    })

    this.client.subscribe('/user/queue/tracks/response', (msg: IMessage) => {
      try {
        const data = JSON.parse(msg.body)
        console.log('[ReplayWS] Response:', data.status, data.queryId)
        // Remember the server-assigned ID so the query can be cancelled later.
        if (data.queryId) {
          this.currentQueryId = data.queryId
        }
        if (data.status === 'ERROR') {
          console.error('[ReplayWS] Query error:', data.message)
          this.clearQueryTimeout()
          useReplayStore.getState().completeQuery()
        }
      } catch {
        /* ignore malformed response bodies */
      }
    })
  }

  private handleChunkMessage(msg: IMessage): void {
    try {
      const chunk = JSON.parse(msg.body) as TrackChunkResponse
      // The server may deliver tracks under any one of three keys.
      const tracks = chunk.tracks || chunk.mergedTracks || chunk.compactTracks || []
      if (tracks.length === 0) return

      // Normalize timestamps.
      const normalizedTracks = tracks.map((t) => ({
        ...t,
        timestampsMs: (t.timestamps || []).map(parseTimestamp),
      }))

      useMergedTrackStore.getState().addChunk(normalizedTracks)

      const replayStore = useReplayStore.getState()
      replayStore.updateProgress(
        replayStore.receivedChunks + 1,
        chunk.totalChunks ?? replayStore.totalChunks,
      )

      if (chunk.isLastChunk) {
        this.clearQueryTimeout()
        replayStore.completeQuery()
      }
    } catch (err) {
      console.error('[ReplayWS] Chunk parse error:', err)
    }
  }

  private handleStatusMessage(msg: IMessage): void {
    try {
      const data = JSON.parse(msg.body)
      if (data.status === 'COMPLETED' || data.status === 'ERROR') {
        this.clearQueryTimeout()
        useReplayStore.getState().completeQuery()
      }
    } catch {
      /* ignore malformed status bodies */
    }
  }

  private clearQueryTimeout(): void {
    if (this.queryTimeoutId) {
      clearTimeout(this.queryTimeoutId)
      this.queryTimeoutId = null
    }
  }
}

export const replayWebSocket = new ReplayWebSocketService()