signal-batch/scripts/test-with-real-data.sql
htlee 2e9361ee58 refactor: SNP API 전환 및 레거시 코드 전면 정리
- CollectDB 다중 신호 수집 → S&P Global AIS API 단일 수집으로 전환
- sig_src_cd + target_id 이중 식별자 → mmsi(VARCHAR) 단일 식별자
- t_vessel_latest_position → t_ais_position 테이블 전환
- 레거시 배치/유틸 ~30개 클래스 삭제 (VesselAggregationJobConfig, ShipKindCodeConverter 등)
- AisTargetCacheManager 기반 캐시 이중 구조 (최신위치 + 트랙 버퍼)
- CacheBasedVesselTrackDataReader + CacheBasedTrackJobListener 신규 추가
- VesselStaticStepConfig: 정적정보 CDC 변경 검출 + hourly job 편승
- SignalKindCode enum: vesselType/extraInfo 기반 선종 자동 분류
- WebSocket/STOMP 전체 mmsi 전환 (StompTrackStreamingService ~40곳)
- 모니터링/성능 최적화 코드 mmsi 기반 전환
- DataSource 설정 통합 (snpdb 단일 DB)
- AreaBoundaryCache Polygon→Geometry 캐스트 수정 (MULTIPOLYGON 지원)
- ConcurrentHashMap 적용 (VesselTrackStepConfig 동시성 버그 수정)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 09:59:49 +09:00

275 lines
9.8 KiB
SQL

-- ========================================
-- 실제 테이블 데이터로 CAST 호환성 테스트
-- ========================================
-- 1. Sample of the most recent 5-minute track rows (at most 100).
--    Only rows that have a geometry with at least one point are listed,
--    together with the point count and the PostGIS validity flag.
WITH non_empty_tracks AS (
    SELECT
        t.sig_src_cd,
        t.target_id,
        t.time_bucket,
        t.track_geom
    FROM signal.t_vessel_tracks_5min AS t
    WHERE t.track_geom IS NOT NULL
      AND public.ST_NPoints(t.track_geom) > 0
)
SELECT
    '=== SAMPLE 5MIN DATA ===' AS section,
    net.sig_src_cd,
    net.target_id,
    net.time_bucket,
    public.ST_NPoints(net.track_geom) AS points,
    public.ST_IsValid(net.track_geom) AS is_valid
FROM non_empty_tracks AS net
ORDER BY net.time_bucket DESC
LIMIT 100;
-- 2. Pick candidate vessels for the tests below.
--    A candidate is a (sig_src_cd, target_id, hour) group from the last 24 hours
--    that contains at least two non-empty 5-minute track rows.
--    The 10 most recent groups are returned. Ordering and LIMIT are applied on
--    the final SELECT, with tie-breakers, because an ORDER BY inside a CTE does
--    not guarantee the order of the outer result.
WITH recent_vessels AS (
    SELECT
        sig_src_cd,
        target_id,
        DATE_TRUNC('hour', time_bucket) AS hour_bucket,
        COUNT(*) AS record_count,
        MIN(time_bucket) AS min_time,
        MAX(time_bucket) AS max_time
    FROM signal.t_vessel_tracks_5min
    WHERE time_bucket >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
      AND track_geom IS NOT NULL
      AND public.ST_NPoints(track_geom) > 0
    GROUP BY sig_src_cd, target_id, DATE_TRUNC('hour', time_bucket)
    HAVING COUNT(*) >= 2
)
SELECT
    '=== TEST CANDIDATE VESSELS ===' AS section,
    sig_src_cd,
    target_id,
    hour_bucket,
    record_count,
    min_time,
    max_time
FROM recent_vessels
ORDER BY hour_bucket DESC, sig_src_cd, target_id
LIMIT 10;
-- 3. Detailed 5-minute rows for one specific vessel.
-- Edit the values below using a row chosen from the result of query 2.
-- Example: sig_src_cd = '000019', target_id = '111440547', hour_bucket = '2025-01-07 10:00:00'
-- The two identifier variables are stored unquoted and quoted at use (:'name').
-- The two hour variables carry their own single quotes inside the value, so the
-- queries interpolate them as-is (:name) inside CAST(... AS timestamp).
-- The queries treat the hour range as half-open: start inclusive, end exclusive.
\set test_sig_src_cd '000019'
\set test_target_id '111440547'
\set test_hour_start '''2025-01-07 10:00:00'''
\set test_hour_end '''2025-01-07 11:00:00'''
-- Per-row detail of the 5-minute tracks of the test vessel inside the test hour,
-- including the coordinate text extracted from the WKT.
--   regex_v1: coordinate list captured from a "LINESTRING M (...)" WKT.
--   regex_v2: same, falling back to whatever is inside the outermost parentheses.
-- The patterns are E'' escape strings: the doubled backslashes collapse to single
-- ones before the regex engine sees them, independent of the
-- standard_conforming_strings setting. As plain literals under the default
-- setting the doubled backslashes were taken literally, the patterns never
-- matched real WKT and both columns were always NULL.
-- Requires psql variables: test_sig_src_cd, test_target_id, test_hour_start, test_hour_end.
SELECT
    '=== 5MIN DATA FOR TEST VESSEL ===' AS section,
    sig_src_cd,
    target_id,
    time_bucket,
    public.ST_NPoints(track_geom) AS points,
    public.ST_IsValid(track_geom) AS is_valid,
    public.ST_GeometryType(track_geom) AS geom_type,
    public.ST_AsText(track_geom) AS wkt,
    substring(public.ST_AsText(track_geom) FROM E'LINESTRING\\s*M\\s*\\((.+)\\)') AS regex_v1,
    COALESCE(
        substring(public.ST_AsText(track_geom) FROM E'LINESTRING\\s*M\\s*\\((.+)\\)'),
        substring(public.ST_AsText(track_geom) FROM E'\\((.+)\\)')
    ) AS regex_v2
FROM signal.t_vessel_tracks_5min
WHERE sig_src_cd = :'test_sig_src_cd'
  AND target_id = :'test_target_id'
  AND time_bucket >= CAST(:test_hour_start AS timestamp)
  AND time_bucket < CAST(:test_hour_end AS timestamp)
  AND track_geom IS NOT NULL
  AND public.ST_NPoints(track_geom) > 0
ORDER BY time_bucket;
-- 4. Check the string_agg result: the coordinate lists of all 5-minute tracks of
--    the test vessel inside the test hour, concatenated with commas in
--    time_bucket order, plus the number of rows that were merged.
--    The regex patterns are E'' escape strings so the doubled backslashes reach
--    the regex engine as single ones regardless of standard_conforming_strings;
--    as plain literals they never matched and all_coords was always NULL.
--    Requires psql variables: test_sig_src_cd, test_target_id, test_hour_start, test_hour_end.
SELECT
    '=== STRING_AGG TEST ===' AS section,
    sig_src_cd,
    target_id,
    string_agg(
        COALESCE(
            substring(public.ST_AsText(track_geom) FROM E'LINESTRING\\s*M\\s*\\((.+)\\)'),
            substring(public.ST_AsText(track_geom) FROM E'\\((.+)\\)')
        ),
        ','
        ORDER BY time_bucket
    ) FILTER (WHERE track_geom IS NOT NULL) AS all_coords,  -- FILTER repeats the WHERE condition below
    COUNT(*) AS track_count
FROM signal.t_vessel_tracks_5min
WHERE sig_src_cd = :'test_sig_src_cd'
  AND target_id = :'test_target_id'
  AND time_bucket >= CAST(:test_hour_start AS timestamp)
  AND time_bucket < CAST(:test_hour_end AS timestamp)
  AND track_geom IS NOT NULL
  AND public.ST_NPoints(track_geom) > 0
GROUP BY sig_src_cd, target_id;
-- 5. Build a geometry from the merged WKT and inspect it.
--    Pipeline: ordered_tracks -> merged_coords -> merged_geoms.
--    The regex patterns are E'' escape strings so the doubled backslashes reach
--    the regex engine as single ones regardless of standard_conforming_strings;
--    as plain literals they never matched, all_coords was NULL and the NULL
--    propagated into ST_GeomFromText.
--    The merged geometry is parsed once in merged_geoms instead of three times.
--    Requires psql variables: test_sig_src_cd, test_target_id, test_hour_start, test_hour_end.
WITH ordered_tracks AS (
    -- Non-empty 5-minute rows of the test vessel inside the half-open test hour.
    -- Ordering is applied inside string_agg, so no ORDER BY is needed here.
    SELECT
        sig_src_cd,
        target_id,
        time_bucket,
        track_geom
    FROM signal.t_vessel_tracks_5min
    WHERE sig_src_cd = :'test_sig_src_cd'
      AND target_id = :'test_target_id'
      AND time_bucket >= CAST(:test_hour_start AS timestamp)
      AND time_bucket < CAST(:test_hour_end AS timestamp)
      AND track_geom IS NOT NULL
      AND public.ST_NPoints(track_geom) > 0
),
merged_coords AS (
    -- One comma-separated coordinate list per vessel, in time_bucket order.
    SELECT
        sig_src_cd,
        target_id,
        string_agg(
            COALESCE(
                substring(public.ST_AsText(track_geom) FROM E'LINESTRING\\s*M\\s*\\((.+)\\)'),
                substring(public.ST_AsText(track_geom) FROM E'\\((.+)\\)')
            ),
            ','
            ORDER BY time_bucket
        ) FILTER (WHERE track_geom IS NOT NULL) AS all_coords
    FROM ordered_tracks
    GROUP BY sig_src_cd, target_id
),
merged_geoms AS (
    -- Wrap the coordinate list back into a LINESTRING M WKT and parse it with SRID 4326.
    SELECT
        sig_src_cd,
        target_id,
        all_coords,
        'LINESTRING M(' || all_coords || ')' AS full_wkt,
        public.ST_GeomFromText('LINESTRING M(' || all_coords || ')', 4326) AS test_geom
    FROM merged_coords
)
SELECT
    '=== WKT GENERATION TEST ===' AS section,
    sig_src_cd,
    target_id,
    full_wkt,
    LENGTH(all_coords) AS coords_length,
    test_geom,
    public.ST_NPoints(test_geom) AS merged_points,
    public.ST_IsValid(test_geom) AS is_valid
FROM merged_geoms;
-- 6. Dry run of the full hourly aggregation for the test vessel
--    (SELECT only, nothing is inserted).
--    Pipeline: ordered_tracks -> merged_coords -> merged_tracks -> calculated_tracks.
--    The regex patterns are E'' escape strings so the doubled backslashes reach
--    the regex engine as single ones regardless of standard_conforming_strings;
--    as plain literals they never matched and merged_geom was always NULL.
--    Requires psql variables: test_sig_src_cd, test_target_id, test_hour_start, test_hour_end.
WITH ordered_tracks AS (
    -- Non-empty 5-minute rows of the test vessel inside the half-open test hour.
    -- Only the columns used further down are selected.
    SELECT
        sig_src_cd,
        target_id,
        time_bucket,
        track_geom,
        max_speed,
        point_count,
        start_position,
        end_position
    FROM signal.t_vessel_tracks_5min
    WHERE sig_src_cd = :'test_sig_src_cd'
      AND target_id = :'test_target_id'
      AND time_bucket >= CAST(:test_hour_start AS timestamp)
      AND time_bucket < CAST(:test_hour_end AS timestamp)
      AND track_geom IS NOT NULL
      AND public.ST_NPoints(track_geom) > 0
),
merged_coords AS (
    -- One comma-separated coordinate list per vessel, in time_bucket order.
    SELECT
        sig_src_cd,
        target_id,
        string_agg(
            COALESCE(
                substring(public.ST_AsText(track_geom) FROM E'LINESTRING\\s*M\\s*\\((.+)\\)'),
                substring(public.ST_AsText(track_geom) FROM E'\\((.+)\\)')
            ),
            ','
            ORDER BY time_bucket
        ) FILTER (WHERE track_geom IS NOT NULL) AS all_coords
    FROM ordered_tracks
    GROUP BY sig_src_cd, target_id
),
merged_tracks AS (
    -- Merged geometry plus per-vessel summary values looked up from ordered_tracks.
    -- time_bucket of the hourly row is the start of the test hour.
    SELECT
        mc.sig_src_cd,
        mc.target_id,
        CAST(:test_hour_start AS timestamp) AS time_bucket,
        public.ST_GeomFromText('LINESTRING M(' || mc.all_coords || ')', 4326) AS merged_geom,
        (SELECT MAX(ot.max_speed) FROM ordered_tracks AS ot
          WHERE ot.sig_src_cd = mc.sig_src_cd AND ot.target_id = mc.target_id) AS max_speed,
        (SELECT SUM(ot.point_count) FROM ordered_tracks AS ot
          WHERE ot.sig_src_cd = mc.sig_src_cd AND ot.target_id = mc.target_id) AS total_points,
        (SELECT MIN(ot.time_bucket) FROM ordered_tracks AS ot
          WHERE ot.sig_src_cd = mc.sig_src_cd AND ot.target_id = mc.target_id) AS start_time,
        (SELECT MAX(ot.time_bucket) FROM ordered_tracks AS ot
          WHERE ot.sig_src_cd = mc.sig_src_cd AND ot.target_id = mc.target_id) AS end_time,
        -- start position of the earliest row / end position of the latest row
        (SELECT ot.start_position FROM ordered_tracks AS ot
          WHERE ot.sig_src_cd = mc.sig_src_cd AND ot.target_id = mc.target_id
          ORDER BY ot.time_bucket LIMIT 1) AS start_pos,
        (SELECT ot.end_position FROM ordered_tracks AS ot
          WHERE ot.sig_src_cd = mc.sig_src_cd AND ot.target_id = mc.target_id
          ORDER BY ot.time_bucket DESC LIMIT 1) AS end_pos
    FROM merged_coords AS mc
),
calculated_tracks AS (
    SELECT
        sig_src_cd,
        target_id,
        time_bucket,
        merged_geom,
        max_speed,
        total_points,
        start_time,
        end_time,
        start_pos,
        end_pos,
        -- Geography length divided by 1852.0
        -- (presumably metres to nautical miles; confirm against the production job).
        public.ST_Length(merged_geom::geography) / 1852.0 AS total_distance,
        -- Elapsed time: M of the last point minus M of the first point.
        -- When the geometry has no points (or is NULL) fall back to the
        -- difference of the time fields of the end and start positions.
        CASE
            WHEN public.ST_NPoints(merged_geom) > 0 THEN
                public.ST_M(public.ST_PointN(merged_geom, public.ST_NPoints(merged_geom))) -
                public.ST_M(public.ST_PointN(merged_geom, 1))
            ELSE
                EXTRACT(EPOCH FROM
                    CAST(end_pos->>'time' AS timestamp) - CAST(start_pos->>'time' AS timestamp)
                )
        END AS time_diff_seconds
    FROM merged_tracks
)
SELECT
    '=== FULL HOURLY AGGREGATION TEST ===' AS section,
    sig_src_cd,
    target_id,
    time_bucket,
    public.ST_NPoints(merged_geom) AS merged_points,
    public.ST_IsValid(merged_geom) AS is_valid,
    total_distance,
    -- Distance per hour, capped at 9999.99 so it fits numeric(6,2); 0 when no time elapsed.
    CASE
        WHEN time_diff_seconds > 0 THEN
            CAST(LEAST((total_distance / (time_diff_seconds / 3600.0)), 9999.99) AS numeric(6,2))
        ELSE 0
    END AS avg_speed,
    max_speed,
    total_points,
    start_time,
    end_time,
    start_pos,
    end_pos,
    public.ST_AsText(merged_geom) AS geom_text,
    time_diff_seconds
FROM calculated_tracks;
-- 7. Verify the time ordering of the M values in the merged geometry.
--    Reports the M value of the first and last point, both also converted with
--    to_timestamp, and PASS when the last M is not smaller than the first.
--    Only the two end points are compared; intermediate points are not checked.
--    A NULL merged geometry yields FAIL because the comparison is then NULL.
--    The regex patterns are E'' escape strings so the doubled backslashes reach
--    the regex engine as single ones regardless of standard_conforming_strings;
--    as plain literals they never matched and the check always reported FAIL.
--    Requires psql variables: test_sig_src_cd, test_target_id, test_hour_start, test_hour_end.
WITH ordered_tracks AS (
    -- Non-empty 5-minute rows of the test vessel inside the half-open test hour.
    SELECT
        sig_src_cd,
        target_id,
        time_bucket,
        track_geom
    FROM signal.t_vessel_tracks_5min
    WHERE sig_src_cd = :'test_sig_src_cd'
      AND target_id = :'test_target_id'
      AND time_bucket >= CAST(:test_hour_start AS timestamp)
      AND time_bucket < CAST(:test_hour_end AS timestamp)
      AND track_geom IS NOT NULL
      AND public.ST_NPoints(track_geom) > 0
),
merged_coords AS (
    -- One comma-separated coordinate list per vessel, in time_bucket order.
    SELECT
        sig_src_cd,
        target_id,
        string_agg(
            COALESCE(
                substring(public.ST_AsText(track_geom) FROM E'LINESTRING\\s*M\\s*\\((.+)\\)'),
                substring(public.ST_AsText(track_geom) FROM E'\\((.+)\\)')
            ),
            ','
            ORDER BY time_bucket
        ) FILTER (WHERE track_geom IS NOT NULL) AS all_coords
    FROM ordered_tracks
    GROUP BY sig_src_cd, target_id
),
merged_tracks AS (
    SELECT
        mc.sig_src_cd,
        mc.target_id,
        public.ST_GeomFromText('LINESTRING M(' || mc.all_coords || ')', 4326) AS merged_geom
    FROM merged_coords AS mc
)
SELECT
    '=== TIME ORDERING CHECK ===' AS section,
    sig_src_cd,
    target_id,
    public.ST_M(public.ST_PointN(merged_geom, 1)) AS first_m_value,
    to_timestamp(public.ST_M(public.ST_PointN(merged_geom, 1))) AS first_time,
    public.ST_M(public.ST_PointN(merged_geom, public.ST_NPoints(merged_geom))) AS last_m_value,
    to_timestamp(public.ST_M(public.ST_PointN(merged_geom, public.ST_NPoints(merged_geom)))) AS last_time,
    CASE
        WHEN public.ST_M(public.ST_PointN(merged_geom, public.ST_NPoints(merged_geom))) >=
             public.ST_M(public.ST_PointN(merged_geom, 1))
        THEN 'PASS'
        ELSE 'FAIL'
    END AS time_order_check
FROM merged_tracks;
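-- ========================================
-- How to use:
-- 1. Run query 2 first and choose a vessel and hour to test
-- 2. Edit the four \set variable values (located just before query 3)
-- 3. Run the whole script
-- 4. Check the result of each section
-- ========================================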