signal-batch/scripts/test-daily-aggregation-fixed.sql
htlee 2e9361ee58 refactor: SNP API 전환 및 레거시 코드 전면 정리
- CollectDB 다중 신호 수집 → S&P Global AIS API 단일 수집으로 전환
- sig_src_cd + target_id 이중 식별자 → mmsi(VARCHAR) 단일 식별자
- t_vessel_latest_position → t_ais_position 테이블 전환
- 레거시 배치/유틸 ~30개 클래스 삭제 (VesselAggregationJobConfig, ShipKindCodeConverter 등)
- AisTargetCacheManager 기반 캐시 이중 구조 (최신위치 + 트랙 버퍼)
- CacheBasedVesselTrackDataReader + CacheBasedTrackJobListener 신규 추가
- VesselStaticStepConfig: 정적정보 CDC 변경 검출 + hourly job 편승
- SignalKindCode enum: vesselType/extraInfo 기반 선종 자동 분류
- WebSocket/STOMP 전체 mmsi 전환 (StompTrackStreamingService ~40곳)
- 모니터링/성능 최적화 코드 mmsi 기반 전환
- DataSource 설정 통합 (snpdb 단일 DB)
- AreaBoundaryCache Polygon→Geometry 캐스트 수정 (MULTIPOLYGON 지원)
- ConcurrentHashMap 적용 (VesselTrackStepConfig 동시성 버그 수정)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 09:59:49 +09:00

497 lines
18 KiB
SQL

-- ========================================
-- 일별 집계 쿼리 검증 스크립트
-- CAST 및 타입 호환성 테스트
-- ========================================
-- 1. Create the scratch tables used by this script.
--    Any leftovers from a previous run are dropped first so the script can be
--    re-run. Both tables share the same column layout.
DROP TABLE IF EXISTS test_vessel_tracks_daily CASCADE;
DROP TABLE IF EXISTS test_vessel_tracks_hourly_for_daily CASCADE;

-- Input side: one row per vessel per hourly time bucket.
CREATE TABLE test_vessel_tracks_hourly_for_daily (
    sig_src_cd      VARCHAR(10),
    target_id       VARCHAR(20),
    time_bucket     TIMESTAMP,
    track_geom      geometry(LineStringM, 4326),
    distance_nm     NUMERIC(10,2),
    avg_speed       NUMERIC(6,2),
    max_speed       NUMERIC(6,2),
    point_count     INTEGER,
    start_position  JSONB,
    end_position    JSONB,
    PRIMARY KEY (sig_src_cd, target_id, time_bucket)
);

-- Output side: receives the aggregated rows inserted later in the script.
CREATE TABLE test_vessel_tracks_daily (
    sig_src_cd      VARCHAR(10),
    target_id       VARCHAR(20),
    time_bucket     TIMESTAMP,
    track_geom      geometry(LineStringM, 4326),
    distance_nm     NUMERIC(10,2),
    avg_speed       NUMERIC(6,2),
    max_speed       NUMERIC(6,2),
    point_count     INTEGER,
    start_position  JSONB,
    end_position    JSONB,
    PRIMARY KEY (sig_src_cd, target_id, time_bucket)
);
-- 2. Insert sample data (hourly rows for a single day).
--    Every INSERT names its target columns explicitly so the fixture does not
--    depend on the physical column order of the table.
--
--    NOTE(review): the M values start at 1736179200, which is
--    2025-01-06 16:00:00 UTC (2025-01-07 01:00 at UTC+9), while the matching
--    time_bucket / JSON "time" values say 2025-01-07 00:00:00. Confirm whether
--    that offset is intentional; the values are kept unchanged here.

-- Scenario 1: vessel under way (four consecutive hourly buckets)
INSERT INTO test_vessel_tracks_hourly_for_daily (
    sig_src_cd,
    target_id,
    time_bucket,
    track_geom,
    distance_nm,
    avg_speed,
    max_speed,
    point_count,
    start_position,
    end_position
) VALUES
(
    '000001',
    'TEST001',
    '2025-01-07 00:00:00',
    public.ST_GeomFromText('LINESTRING M(126.5 37.5 1736179200, 126.52 37.52 1736182800)', 4326),
    5.5,
    10.5,
    12.0,
    12,
    '{"lat": 37.5, "lon": 126.5, "time": "2025-01-07 00:00:00", "sog": 10.5}'::jsonb,
    '{"lat": 37.52, "lon": 126.52, "time": "2025-01-07 01:00:00", "sog": 11.0}'::jsonb
),
(
    '000001',
    'TEST001',
    '2025-01-07 01:00:00',
    public.ST_GeomFromText('LINESTRING M(126.52 37.52 1736182800, 126.54 37.54 1736186400)', 4326),
    6.0,
    11.0,
    13.0,
    12,
    '{"lat": 37.52, "lon": 126.52, "time": "2025-01-07 01:00:00", "sog": 11.0}'::jsonb,
    '{"lat": 37.54, "lon": 126.54, "time": "2025-01-07 02:00:00", "sog": 12.0}'::jsonb
),
(
    '000001',
    'TEST001',
    '2025-01-07 02:00:00',
    public.ST_GeomFromText('LINESTRING M(126.54 37.54 1736186400, 126.56 37.56 1736190000)', 4326),
    5.8,
    10.8,
    12.5,
    12,
    '{"lat": 37.54, "lon": 126.54, "time": "2025-01-07 02:00:00", "sog": 10.8}'::jsonb,
    '{"lat": 37.56, "lon": 126.56, "time": "2025-01-07 03:00:00", "sog": 11.5}'::jsonb
),
(
    '000001',
    'TEST001',
    '2025-01-07 03:00:00',
    public.ST_GeomFromText('LINESTRING M(126.56 37.56 1736190000, 126.58 37.58 1736193600)', 4326),
    6.2,
    11.2,
    13.5,
    12,
    '{"lat": 37.56, "lon": 126.56, "time": "2025-01-07 03:00:00", "sog": 11.2}'::jsonb,
    '{"lat": 37.58, "lon": 126.58, "time": "2025-01-07 04:00:00", "sog": 12.5}'::jsonb
);

-- Scenario 2: anchored vessel (same XY in every point, only M advances)
INSERT INTO test_vessel_tracks_hourly_for_daily (
    sig_src_cd,
    target_id,
    time_bucket,
    track_geom,
    distance_nm,
    avg_speed,
    max_speed,
    point_count,
    start_position,
    end_position
) VALUES
(
    '000002',
    'TEST002',
    '2025-01-07 00:00:00',
    public.ST_GeomFromText('LINESTRING M(129.0 35.0 1736179200, 129.0 35.0 1736182800)', 4326),
    0.0,
    0.0,
    0.5,
    24,
    '{"lat": 35.0, "lon": 129.0, "time": "2025-01-07 00:00:00", "sog": 0.0}'::jsonb,
    '{"lat": 35.0, "lon": 129.0, "time": "2025-01-07 01:00:00", "sog": 0.0}'::jsonb
),
(
    '000002',
    'TEST002',
    '2025-01-07 01:00:00',
    public.ST_GeomFromText('LINESTRING M(129.0 35.0 1736182800, 129.0 35.0 1736186400)', 4326),
    0.0,
    0.0,
    0.3,
    24,
    '{"lat": 35.0, "lon": 129.0, "time": "2025-01-07 01:00:00", "sog": 0.0}'::jsonb,
    '{"lat": 35.0, "lon": 129.0, "time": "2025-01-07 02:00:00", "sog": 0.0}'::jsonb
);

-- Scenario 3: a single hourly bucket whose two points are identical (XY and M)
INSERT INTO test_vessel_tracks_hourly_for_daily (
    sig_src_cd,
    target_id,
    time_bucket,
    track_geom,
    distance_nm,
    avg_speed,
    max_speed,
    point_count,
    start_position,
    end_position
) VALUES
(
    '000003',
    'TEST003',
    '2025-01-07 00:00:00',
    public.ST_GeomFromText('LINESTRING M(130.0 36.0 1736179200, 130.0 36.0 1736179200)', 4326),
    0.0,
    0.0,
    0.0,
    2,
    '{"lat": 36.0, "lon": 130.0, "time": "2025-01-07 00:00:00", "sog": 0.0}'::jsonb,
    '{"lat": 36.0, "lon": 130.0, "time": "2025-01-07 00:00:00", "sog": 0.0}'::jsonb
);
-- 3. Inspect the input fixture: point count, validity flag and WKT of every
--    hourly row, listed in primary-key order.
SELECT
    '=== INPUT DATA VALIDATION ===' AS section,
    h.sig_src_cd,
    h.target_id,
    h.time_bucket,
    public.ST_NPoints(h.track_geom) AS points,
    public.ST_IsValid(h.track_geom) AS is_valid,
    public.ST_AsText(h.track_geom)  AS wkt
FROM test_vessel_tracks_hourly_for_daily AS h
ORDER BY
    h.sig_src_cd,
    h.target_id,
    h.time_bucket;
-- 4. Run the DailyTrackProcessor aggregation SQL (CAST-based) as a plain SELECT.
--    Vessel: 000001_TEST001, Day: 2025-01-07
--
--    The regex literals use SINGLE backslashes. With the PostgreSQL default
--    standard_conforming_strings = on, an ordinary '...' literal passes
--    backslashes through unchanged, so the regex engine receives \s and \(
--    directly. Doubling them (as one would inside a Java string) makes the
--    pattern look for literal backslash characters, which never occur in
--    ST_AsText output and would leave merged_geom NULL.
WITH ordered_tracks AS (
    -- Hourly rows of the target vessel inside the half-open day window.
    SELECT *
    FROM test_vessel_tracks_hourly_for_daily
    WHERE sig_src_cd = '000001'
      AND target_id = 'TEST001'
      AND time_bucket >= CAST('2025-01-07 00:00:00' AS timestamp)
      AND time_bucket < CAST('2025-01-08 00:00:00' AS timestamp)
      AND track_geom IS NOT NULL
      AND public.ST_NPoints(track_geom) > 0
    ORDER BY time_bucket
),
merged_coords AS (
    -- Strip the WKT wrapper from each hourly line and concatenate the bare
    -- coordinate lists in time order. The second pattern is a fallback that
    -- takes whatever sits between the outermost parentheses.
    SELECT
        sig_src_cd,
        target_id,
        string_agg(
            COALESCE(
                substring(public.ST_AsText(track_geom) from 'LINESTRING\s*M\s*\((.+)\)'),
                substring(public.ST_AsText(track_geom) from '\((.+)\)')
            ),
            ','
            ORDER BY time_bucket
        ) FILTER (WHERE track_geom IS NOT NULL) AS all_coords
    FROM ordered_tracks
    GROUP BY sig_src_cd, target_id
),
merged_tracks AS (
    -- Rebuild one LINESTRING M for the day and pick up the per-day summary
    -- values from the hourly rows.
    SELECT
        mc.sig_src_cd,
        mc.target_id,
        CAST('2025-01-07 00:00:00' AS timestamp) AS time_bucket,
        public.ST_GeomFromText('LINESTRING M(' || mc.all_coords || ')', 4326) AS merged_geom,
        (SELECT MAX(max_speed) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS max_speed,
        (SELECT SUM(point_count) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS total_points,
        (SELECT MIN(time_bucket) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS start_time,
        (SELECT MAX(time_bucket) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS end_time,
        (SELECT start_position FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id ORDER BY time_bucket LIMIT 1) AS start_pos,
        (SELECT end_position FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id ORDER BY time_bucket DESC LIMIT 1) AS end_pos
    FROM merged_coords mc
),
calculated_tracks AS (
    SELECT
        *,
        -- Geodesic length in metres divided by 1852.0 (metres per nautical mile).
        public.ST_Length(merged_geom::geography) / 1852.0 AS total_distance,
        -- Elapsed seconds: last M minus first M of the merged line; falls back
        -- to the JSON "time" fields when the geometry has no points (or is NULL).
        CASE
            WHEN public.ST_NPoints(merged_geom) > 0 THEN
                public.ST_M(public.ST_PointN(merged_geom, public.ST_NPoints(merged_geom))) -
                public.ST_M(public.ST_PointN(merged_geom, 1))
            ELSE
                EXTRACT(EPOCH FROM
                    CAST(end_pos->>'time' AS timestamp) - CAST(start_pos->>'time' AS timestamp)
                )
        END AS time_diff_seconds
    FROM merged_tracks
)
SELECT
    '=== DAILY AGGREGATION RESULT (VESSEL 000001_TEST001) ===' AS section,
    sig_src_cd,
    target_id,
    time_bucket,
    public.ST_NPoints(merged_geom) AS merged_points,
    public.ST_IsValid(merged_geom) AS is_valid,
    total_distance,
    -- Distance per hour, capped at 9999.99 so it fits numeric(6,2);
    -- 0 when no time elapsed.
    CASE
        WHEN time_diff_seconds > 0 THEN
            CAST(LEAST((total_distance / (time_diff_seconds / 3600.0)), 9999.99) AS numeric(6,2))
        ELSE 0
    END AS avg_speed,
    max_speed,
    total_points,
    start_time,
    end_time,
    start_pos,
    end_pos,
    public.ST_AsText(merged_geom) AS geom_text
FROM calculated_tracks;
-- 5. INSERT test (CAST compatibility check) for vessel 000001_TEST001.
--    The target columns are listed explicitly so the insert does not depend on
--    the physical column order of test_vessel_tracks_daily.
--
--    The regex literals use SINGLE backslashes. With the PostgreSQL default
--    standard_conforming_strings = on, an ordinary '...' literal passes
--    backslashes through unchanged; doubled backslashes would make the pattern
--    look for literal backslash characters, never match ST_AsText output, and
--    insert a row whose track_geom is NULL.
INSERT INTO test_vessel_tracks_daily (
    sig_src_cd,
    target_id,
    time_bucket,
    track_geom,
    distance_nm,
    avg_speed,
    max_speed,
    point_count,
    start_position,
    end_position
)
WITH ordered_tracks AS (
    -- Hourly rows of the target vessel inside the half-open day window.
    SELECT *
    FROM test_vessel_tracks_hourly_for_daily
    WHERE sig_src_cd = '000001'
      AND target_id = 'TEST001'
      AND time_bucket >= CAST('2025-01-07 00:00:00' AS timestamp)
      AND time_bucket < CAST('2025-01-08 00:00:00' AS timestamp)
      AND track_geom IS NOT NULL
      AND public.ST_NPoints(track_geom) > 0
    ORDER BY time_bucket
),
merged_coords AS (
    -- Bare coordinate lists of every hourly line, concatenated in time order.
    SELECT
        sig_src_cd,
        target_id,
        string_agg(
            COALESCE(
                substring(public.ST_AsText(track_geom) from 'LINESTRING\s*M\s*\((.+)\)'),
                substring(public.ST_AsText(track_geom) from '\((.+)\)')
            ),
            ','
            ORDER BY time_bucket
        ) FILTER (WHERE track_geom IS NOT NULL) AS all_coords
    FROM ordered_tracks
    GROUP BY sig_src_cd, target_id
),
merged_tracks AS (
    -- One LINESTRING M for the day plus per-day summary values.
    SELECT
        mc.sig_src_cd,
        mc.target_id,
        CAST('2025-01-07 00:00:00' AS timestamp) AS time_bucket,
        public.ST_GeomFromText('LINESTRING M(' || mc.all_coords || ')', 4326) AS merged_geom,
        (SELECT MAX(max_speed) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS max_speed,
        (SELECT SUM(point_count) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS total_points,
        (SELECT MIN(time_bucket) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS start_time,
        (SELECT MAX(time_bucket) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS end_time,
        (SELECT start_position FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id ORDER BY time_bucket LIMIT 1) AS start_pos,
        (SELECT end_position FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id ORDER BY time_bucket DESC LIMIT 1) AS end_pos
    FROM merged_coords mc
),
calculated_tracks AS (
    SELECT
        *,
        -- Geodesic length in metres divided by 1852.0 (metres per nautical mile).
        public.ST_Length(merged_geom::geography) / 1852.0 AS total_distance,
        -- Elapsed seconds: last M minus first M; JSON "time" fields as fallback.
        CASE
            WHEN public.ST_NPoints(merged_geom) > 0 THEN
                public.ST_M(public.ST_PointN(merged_geom, public.ST_NPoints(merged_geom))) -
                public.ST_M(public.ST_PointN(merged_geom, 1))
            ELSE
                EXTRACT(EPOCH FROM
                    CAST(end_pos->>'time' AS timestamp) - CAST(start_pos->>'time' AS timestamp)
                )
        END AS time_diff_seconds
    FROM merged_tracks
)
SELECT
    sig_src_cd,
    target_id,
    time_bucket,
    merged_geom AS track_geom,
    total_distance AS distance_nm,
    -- Distance per hour, capped at 9999.99 so it fits numeric(6,2).
    CASE
        WHEN time_diff_seconds > 0 THEN
            CAST(LEAST((total_distance / (time_diff_seconds / 3600.0)), 9999.99) AS numeric(6,2))
        ELSE 0
    END AS avg_speed,
    max_speed,
    total_points AS point_count,
    start_pos AS start_position,
    end_pos AS end_position
FROM calculated_tracks;
-- 6. INSERT test for the anchored vessel 000002_TEST002.
--    The target columns are listed explicitly so the insert does not depend on
--    the physical column order of test_vessel_tracks_daily.
--
--    The regex literals use SINGLE backslashes. With the PostgreSQL default
--    standard_conforming_strings = on, an ordinary '...' literal passes
--    backslashes through unchanged; doubled backslashes would make the pattern
--    look for literal backslash characters, never match ST_AsText output, and
--    insert a row whose track_geom is NULL.
INSERT INTO test_vessel_tracks_daily (
    sig_src_cd,
    target_id,
    time_bucket,
    track_geom,
    distance_nm,
    avg_speed,
    max_speed,
    point_count,
    start_position,
    end_position
)
WITH ordered_tracks AS (
    -- Hourly rows of the target vessel inside the half-open day window.
    SELECT *
    FROM test_vessel_tracks_hourly_for_daily
    WHERE sig_src_cd = '000002'
      AND target_id = 'TEST002'
      AND time_bucket >= CAST('2025-01-07 00:00:00' AS timestamp)
      AND time_bucket < CAST('2025-01-08 00:00:00' AS timestamp)
      AND track_geom IS NOT NULL
      AND public.ST_NPoints(track_geom) > 0
    ORDER BY time_bucket
),
merged_coords AS (
    -- Bare coordinate lists of every hourly line, concatenated in time order.
    SELECT
        sig_src_cd,
        target_id,
        string_agg(
            COALESCE(
                substring(public.ST_AsText(track_geom) from 'LINESTRING\s*M\s*\((.+)\)'),
                substring(public.ST_AsText(track_geom) from '\((.+)\)')
            ),
            ','
            ORDER BY time_bucket
        ) FILTER (WHERE track_geom IS NOT NULL) AS all_coords
    FROM ordered_tracks
    GROUP BY sig_src_cd, target_id
),
merged_tracks AS (
    -- One LINESTRING M for the day plus per-day summary values.
    SELECT
        mc.sig_src_cd,
        mc.target_id,
        CAST('2025-01-07 00:00:00' AS timestamp) AS time_bucket,
        public.ST_GeomFromText('LINESTRING M(' || mc.all_coords || ')', 4326) AS merged_geom,
        (SELECT MAX(max_speed) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS max_speed,
        (SELECT SUM(point_count) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS total_points,
        (SELECT MIN(time_bucket) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS start_time,
        (SELECT MAX(time_bucket) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS end_time,
        (SELECT start_position FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id ORDER BY time_bucket LIMIT 1) AS start_pos,
        (SELECT end_position FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id ORDER BY time_bucket DESC LIMIT 1) AS end_pos
    FROM merged_coords mc
),
calculated_tracks AS (
    SELECT
        *,
        -- Geodesic length in metres divided by 1852.0 (metres per nautical mile).
        public.ST_Length(merged_geom::geography) / 1852.0 AS total_distance,
        -- Elapsed seconds: last M minus first M; JSON "time" fields as fallback.
        CASE
            WHEN public.ST_NPoints(merged_geom) > 0 THEN
                public.ST_M(public.ST_PointN(merged_geom, public.ST_NPoints(merged_geom))) -
                public.ST_M(public.ST_PointN(merged_geom, 1))
            ELSE
                EXTRACT(EPOCH FROM
                    CAST(end_pos->>'time' AS timestamp) - CAST(start_pos->>'time' AS timestamp)
                )
        END AS time_diff_seconds
    FROM merged_tracks
)
SELECT
    sig_src_cd,
    target_id,
    time_bucket,
    merged_geom AS track_geom,
    total_distance AS distance_nm,
    -- Distance per hour, capped at 9999.99 so it fits numeric(6,2).
    CASE
        WHEN time_diff_seconds > 0 THEN
            CAST(LEAST((total_distance / (time_diff_seconds / 3600.0)), 9999.99) AS numeric(6,2))
        ELSE 0
    END AS avg_speed,
    max_speed,
    total_points AS point_count,
    start_pos AS start_position,
    end_pos AS end_position
FROM calculated_tracks;
-- 7. INSERT test for the single-hour vessel 000003_TEST003.
--    The target columns are listed explicitly so the insert does not depend on
--    the physical column order of test_vessel_tracks_daily.
--
--    The regex literals use SINGLE backslashes. With the PostgreSQL default
--    standard_conforming_strings = on, an ordinary '...' literal passes
--    backslashes through unchanged; doubled backslashes would make the pattern
--    look for literal backslash characters, never match ST_AsText output, and
--    insert a row whose track_geom is NULL.
INSERT INTO test_vessel_tracks_daily (
    sig_src_cd,
    target_id,
    time_bucket,
    track_geom,
    distance_nm,
    avg_speed,
    max_speed,
    point_count,
    start_position,
    end_position
)
WITH ordered_tracks AS (
    -- Hourly rows of the target vessel inside the half-open day window.
    SELECT *
    FROM test_vessel_tracks_hourly_for_daily
    WHERE sig_src_cd = '000003'
      AND target_id = 'TEST003'
      AND time_bucket >= CAST('2025-01-07 00:00:00' AS timestamp)
      AND time_bucket < CAST('2025-01-08 00:00:00' AS timestamp)
      AND track_geom IS NOT NULL
      AND public.ST_NPoints(track_geom) > 0
    ORDER BY time_bucket
),
merged_coords AS (
    -- Bare coordinate lists of every hourly line, concatenated in time order.
    SELECT
        sig_src_cd,
        target_id,
        string_agg(
            COALESCE(
                substring(public.ST_AsText(track_geom) from 'LINESTRING\s*M\s*\((.+)\)'),
                substring(public.ST_AsText(track_geom) from '\((.+)\)')
            ),
            ','
            ORDER BY time_bucket
        ) FILTER (WHERE track_geom IS NOT NULL) AS all_coords
    FROM ordered_tracks
    GROUP BY sig_src_cd, target_id
),
merged_tracks AS (
    -- One LINESTRING M for the day plus per-day summary values.
    SELECT
        mc.sig_src_cd,
        mc.target_id,
        CAST('2025-01-07 00:00:00' AS timestamp) AS time_bucket,
        public.ST_GeomFromText('LINESTRING M(' || mc.all_coords || ')', 4326) AS merged_geom,
        (SELECT MAX(max_speed) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS max_speed,
        (SELECT SUM(point_count) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS total_points,
        (SELECT MIN(time_bucket) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS start_time,
        (SELECT MAX(time_bucket) FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id) AS end_time,
        (SELECT start_position FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id ORDER BY time_bucket LIMIT 1) AS start_pos,
        (SELECT end_position FROM ordered_tracks WHERE sig_src_cd = mc.sig_src_cd AND target_id = mc.target_id ORDER BY time_bucket DESC LIMIT 1) AS end_pos
    FROM merged_coords mc
),
calculated_tracks AS (
    SELECT
        *,
        -- Geodesic length in metres divided by 1852.0 (metres per nautical mile).
        public.ST_Length(merged_geom::geography) / 1852.0 AS total_distance,
        -- Elapsed seconds: last M minus first M; JSON "time" fields as fallback.
        CASE
            WHEN public.ST_NPoints(merged_geom) > 0 THEN
                public.ST_M(public.ST_PointN(merged_geom, public.ST_NPoints(merged_geom))) -
                public.ST_M(public.ST_PointN(merged_geom, 1))
            ELSE
                EXTRACT(EPOCH FROM
                    CAST(end_pos->>'time' AS timestamp) - CAST(start_pos->>'time' AS timestamp)
                )
        END AS time_diff_seconds
    FROM merged_tracks
)
SELECT
    sig_src_cd,
    target_id,
    time_bucket,
    merged_geom AS track_geom,
    total_distance AS distance_nm,
    -- Distance per hour, capped at 9999.99 so it fits numeric(6,2).
    CASE
        WHEN time_diff_seconds > 0 THEN
            CAST(LEAST((total_distance / (time_diff_seconds / 3600.0)), 9999.99) AS numeric(6,2))
        ELSE 0
    END AS avg_speed,
    max_speed,
    total_points AS point_count,
    start_pos AS start_position,
    end_pos AS end_position
FROM calculated_tracks;
-- 8. Review the aggregated rows: geometry point count, validity flag, the
--    stored summary columns and the WKT, ordered by vessel identifier.
SELECT
    '=== FINAL DAILY AGGREGATION RESULTS ===' AS section,
    d.sig_src_cd,
    d.target_id,
    d.time_bucket,
    public.ST_NPoints(d.track_geom) AS points,
    public.ST_IsValid(d.track_geom) AS is_valid,
    d.distance_nm,
    d.avg_speed,
    d.max_speed,
    d.point_count,
    public.ST_AsText(d.track_geom)  AS wkt
FROM test_vessel_tracks_daily AS d
ORDER BY
    d.sig_src_cd,
    d.target_id;
-- 9. Type check: report the data type of each stored column, sampled from a
--    single row of the daily table.
SELECT
    '=== DATA TYPE VALIDATION ===' AS section,
    pg_typeof(d.time_bucket)    AS time_bucket_type,
    pg_typeof(d.track_geom)     AS track_geom_type,
    pg_typeof(d.distance_nm)    AS distance_type,
    pg_typeof(d.avg_speed)      AS avg_speed_type,
    pg_typeof(d.max_speed)      AS max_speed_type,
    pg_typeof(d.point_count)    AS point_count_type,
    pg_typeof(d.start_position) AS start_position_type
FROM test_vessel_tracks_daily AS d
LIMIT 1;
-- 10. Time-ordering check: compare the M value of the first and last point of
--     each daily track. Reports 'PASS' when last >= first, otherwise 'FAIL'
--     (a NULL comparison also lands in 'FAIL'). Only the two endpoints are
--     compared, not every intermediate point.
SELECT
    '=== TIME ORDERING VALIDATION ===' AS section,
    m.sig_src_cd,
    m.target_id,
    m.first_m_value,
    m.last_m_value,
    CASE
        WHEN m.last_m_value >= m.first_m_value THEN 'PASS'
        ELSE 'FAIL'
    END AS time_order_check
FROM (
    -- Endpoint M values computed once per row and reused above.
    SELECT
        d.sig_src_cd,
        d.target_id,
        public.ST_M(public.ST_PointN(d.track_geom, 1)) AS first_m_value,
        public.ST_M(
            public.ST_PointN(d.track_geom, public.ST_NPoints(d.track_geom))
        ) AS last_m_value
    FROM test_vessel_tracks_daily AS d
) AS m;
-- 11. Cleanup: remove both scratch tables created by this script.
DROP TABLE IF EXISTS test_vessel_tracks_daily CASCADE;
DROP TABLE IF EXISTS test_vessel_tracks_hourly_for_daily CASCADE;

-- ========================================
-- Test complete.
-- If every INSERT succeeded and no type error was raised,
-- the CAST usage is working as intended.
-- ========================================