signal-batch/sql/V2_snp_schema_migration.sql
htlee 2e9361ee58 refactor: SNP API 전환 및 레거시 코드 전면 정리
- CollectDB 다중 신호 수집 → S&P Global AIS API 단일 수집으로 전환
- sig_src_cd + target_id 이중 식별자 → mmsi(VARCHAR) 단일 식별자
- t_vessel_latest_position → t_ais_position 테이블 전환
- 레거시 배치/유틸 ~30개 클래스 삭제 (VesselAggregationJobConfig, ShipKindCodeConverter 등)
- AisTargetCacheManager 기반 캐시 이중 구조 (최신위치 + 트랙 버퍼)
- CacheBasedVesselTrackDataReader + CacheBasedTrackJobListener 신규 추가
- VesselStaticStepConfig: 정적정보 CDC 변경 검출 + hourly job 편승
- SignalKindCode enum: vesselType/extraInfo 기반 선종 자동 분류
- WebSocket/STOMP 전체 mmsi 전환 (StompTrackStreamingService ~40곳)
- 모니터링/성능 최적화 코드 mmsi 기반 전환
- DataSource 설정 통합 (snpdb 단일 DB)
- AreaBoundaryCache Polygon→Geometry 캐스트 수정 (MULTIPOLYGON 지원)
- ConcurrentHashMap 적용 (VesselTrackStepConfig 동시성 버그 수정)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 09:59:49 +09:00

585 lines
24 KiB
PL/PgSQL

-- ============================================================
-- gc-signal-batch V2: SNP API 기반 스키마 (신규 생성)
-- 타겟 DB: snpdb (211.208.115.83), 스키마: signal
--
-- 핵심 변경:
-- sig_src_cd + target_id → mmsi VARCHAR(20) 단일 식별자
-- t_vessel_latest_position → t_ais_position (새 구조)
-- 신규: t_vessel_static (정적 정보 이력)
--
-- 실행 전 확인:
-- 1. PostGIS 확장이 설치되어 있는지 확인
-- 2. signal 스키마가 존재하는지 확인
-- 3. 파티션 테이블은 PartitionManager가 런타임에 자동 생성
-- ============================================================
-- 스키마 생성
CREATE SCHEMA IF NOT EXISTS signal;
-- PostGIS 확장 활성화
CREATE EXTENSION IF NOT EXISTS postgis;
-- ============================================================
-- 1. AIS 위치/정적 정보 (SNP API 전용, 신규)
-- ============================================================
-- t_ais_position: AIS 최신 위치 (MMSI별 1건 UPSERT)
-- 용도: 캐시 복원, 타 프로세스 최신 위치 조회, API 불가 환경 대응
-- 갱신: 5분 집계 Job에서 캐시 스냅샷 UPSERT
CREATE TABLE IF NOT EXISTS signal.t_ais_position (
mmsi VARCHAR(20) PRIMARY KEY,
imo BIGINT,
name VARCHAR(50),
callsign VARCHAR(20),
vessel_type VARCHAR(50),
extra_info VARCHAR(200),
lat DOUBLE PRECISION NOT NULL,
lon DOUBLE PRECISION NOT NULL,
geom GEOMETRY(POINT, 4326),
heading DOUBLE PRECISION,
sog DOUBLE PRECISION,
cog DOUBLE PRECISION,
rot INTEGER,
length INTEGER,
width INTEGER,
draught DOUBLE PRECISION,
destination VARCHAR(200),
eta TIMESTAMPTZ,
status VARCHAR(50),
message_timestamp TIMESTAMPTZ NOT NULL,
signal_kind_code VARCHAR(10),
class_type VARCHAR(1),
last_update TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_ais_position_geom ON signal.t_ais_position USING GIST (geom);
CREATE INDEX IF NOT EXISTS idx_ais_position_signal_kind ON signal.t_ais_position (signal_kind_code);
CREATE INDEX IF NOT EXISTS idx_ais_position_timestamp ON signal.t_ais_position (message_timestamp);
COMMENT ON TABLE signal.t_ais_position IS 'AIS 최신 위치 (MMSI별 1건, 5분 집계 Job에서 UPSERT)';
COMMENT ON COLUMN signal.t_ais_position.mmsi IS 'MMSI (VARCHAR — 문자 혼합 MMSI 장비 지원)';
COMMENT ON COLUMN signal.t_ais_position.signal_kind_code IS 'MDA 범례코드 (SignalKindCode.resolve 결과)';
-- t_vessel_static: 정적 정보 이력 (위변조/흘수 변경 추적)
-- 전략: COALESCE + CDC 하이브리드 (HourlyJob에서 저장)
-- 보존: 90일
CREATE TABLE IF NOT EXISTS signal.t_vessel_static (
mmsi VARCHAR(20) NOT NULL,
time_bucket TIMESTAMPTZ NOT NULL,
imo BIGINT,
name VARCHAR(50),
callsign VARCHAR(20),
vessel_type VARCHAR(50),
extra_info VARCHAR(200),
length INTEGER,
width INTEGER,
draught DOUBLE PRECISION,
destination VARCHAR(200),
eta TIMESTAMPTZ,
status VARCHAR(50),
signal_kind_code VARCHAR(10),
class_type VARCHAR(1),
PRIMARY KEY (mmsi, time_bucket)
);
CREATE INDEX IF NOT EXISTS idx_vessel_static_mmsi ON signal.t_vessel_static (mmsi);
COMMENT ON TABLE signal.t_vessel_static IS '선박 정적 정보 이력 (시간별, COALESCE+CDC). 보존 90일';
-- ============================================================
-- 2. 핵심 항적 테이블 (5분/시간/일별 — 파티션)
-- ============================================================
-- t_vessel_tracks_5min: 5분 단위 항적 (일별 파티션)
CREATE TABLE IF NOT EXISTS signal.t_vessel_tracks_5min (
mmsi VARCHAR(20) NOT NULL,
time_bucket TIMESTAMP NOT NULL,
track_geom GEOMETRY(LINESTRINGM, 4326),
distance_nm NUMERIC(10,2),
avg_speed NUMERIC(6,2),
max_speed NUMERIC(6,2),
point_count INTEGER,
start_position JSONB,
end_position JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_vessel_tracks_5min_pkey PRIMARY KEY (mmsi, time_bucket)
) PARTITION BY RANGE (time_bucket);
CREATE INDEX IF NOT EXISTS idx_tracks_5min_mmsi ON signal.t_vessel_tracks_5min (mmsi);
CREATE INDEX IF NOT EXISTS idx_tracks_5min_bucket ON signal.t_vessel_tracks_5min (time_bucket);
COMMENT ON TABLE signal.t_vessel_tracks_5min IS '선박 항적 5분 단위 집계';
COMMENT ON COLUMN signal.t_vessel_tracks_5min.mmsi IS 'MMSI (VARCHAR)';
COMMENT ON COLUMN signal.t_vessel_tracks_5min.track_geom IS 'LineStringM 형식 항적 (M값은 첫 포인트 기준 상대시간 초)';
COMMENT ON COLUMN signal.t_vessel_tracks_5min.start_position IS '시작 위치 JSON {lat, lon, time, sog}';
COMMENT ON COLUMN signal.t_vessel_tracks_5min.end_position IS '종료 위치 JSON {lat, lon, time, sog}';
-- t_vessel_tracks_hourly: 시간별 항적 (월별 파티션)
CREATE TABLE IF NOT EXISTS signal.t_vessel_tracks_hourly (
mmsi VARCHAR(20) NOT NULL,
time_bucket TIMESTAMP NOT NULL,
track_geom GEOMETRY(LINESTRINGM, 4326),
distance_nm NUMERIC(10,2),
avg_speed NUMERIC(6,2),
max_speed NUMERIC(6,2),
point_count INTEGER,
start_position JSONB,
end_position JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_vessel_tracks_hourly_pkey PRIMARY KEY (mmsi, time_bucket)
) PARTITION BY RANGE (time_bucket);
CREATE INDEX IF NOT EXISTS idx_tracks_hourly_mmsi ON signal.t_vessel_tracks_hourly (mmsi);
CREATE INDEX IF NOT EXISTS idx_tracks_hourly_bucket ON signal.t_vessel_tracks_hourly (time_bucket);
CREATE INDEX IF NOT EXISTS idx_tracks_hourly_geom ON signal.t_vessel_tracks_hourly USING GIST (track_geom);
COMMENT ON TABLE signal.t_vessel_tracks_hourly IS '선박 항적 시간별 집계';
-- t_vessel_tracks_daily: 일별 항적 (월별 파티션)
CREATE TABLE IF NOT EXISTS signal.t_vessel_tracks_daily (
mmsi VARCHAR(20) NOT NULL,
time_bucket DATE NOT NULL,
track_geom GEOMETRY(LINESTRINGM, 4326),
distance_nm NUMERIC(10,2),
avg_speed NUMERIC(6,2),
max_speed NUMERIC(6,2),
point_count INTEGER,
operating_hours NUMERIC(4,2),
port_visits JSONB,
start_position JSONB,
end_position JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_vessel_tracks_daily_pkey PRIMARY KEY (mmsi, time_bucket)
) PARTITION BY RANGE (time_bucket);
CREATE INDEX IF NOT EXISTS idx_tracks_daily_mmsi ON signal.t_vessel_tracks_daily (mmsi);
CREATE INDEX IF NOT EXISTS idx_tracks_daily_bucket ON signal.t_vessel_tracks_daily (time_bucket);
CREATE INDEX IF NOT EXISTS idx_tracks_daily_geom ON signal.t_vessel_tracks_daily USING GIST (track_geom);
COMMENT ON TABLE signal.t_vessel_tracks_daily IS '선박 항적 일별 집계';
-- ============================================================
-- 3. 해구(Grid) 관련 테이블 — 파티션
-- ============================================================
-- t_haegu_definitions: 대해구 정의 (일반 테이블)
CREATE TABLE IF NOT EXISTS signal.t_haegu_definitions (
haegu_no INTEGER NOT NULL,
min_lat DOUBLE PRECISION NOT NULL,
min_lon DOUBLE PRECISION NOT NULL,
max_lat DOUBLE PRECISION NOT NULL,
max_lon DOUBLE PRECISION NOT NULL,
center_lat DOUBLE PRECISION NOT NULL,
center_lon DOUBLE PRECISION NOT NULL,
geom GEOMETRY(MULTIPOLYGON, 4326) NOT NULL,
center_point GEOMETRY(POINT, 4326) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_haegu_definitions_pkey PRIMARY KEY (haegu_no)
);
CREATE INDEX IF NOT EXISTS idx_haegu_definitions_geom ON signal.t_haegu_definitions USING GIST (geom);
COMMENT ON TABLE signal.t_haegu_definitions IS '대해구 정의 정보';
-- t_grid_tiles: 그리드 타일 정의 (일반 테이블)
CREATE TABLE IF NOT EXISTS signal.t_grid_tiles (
tile_id VARCHAR(50) NOT NULL,
tile_level INTEGER NOT NULL,
haegu_no INTEGER NOT NULL,
sohaegu_no INTEGER,
min_lat DOUBLE PRECISION NOT NULL,
min_lon DOUBLE PRECISION NOT NULL,
max_lat DOUBLE PRECISION NOT NULL,
max_lon DOUBLE PRECISION NOT NULL,
tile_geom GEOMETRY(POLYGON, 4326) NOT NULL,
center_point GEOMETRY(POINT, 4326) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_grid_tiles_pkey PRIMARY KEY (tile_id)
);
CREATE INDEX IF NOT EXISTS idx_grid_tiles_tile_geom ON signal.t_grid_tiles USING GIST (tile_geom);
CREATE INDEX IF NOT EXISTS idx_grid_tiles_haegu ON signal.t_grid_tiles (haegu_no);
CREATE INDEX IF NOT EXISTS idx_grid_tiles_level ON signal.t_grid_tiles (tile_level);
CREATE INDEX IF NOT EXISTS idx_grid_tiles_haegu_sohaegu ON signal.t_grid_tiles (haegu_no, sohaegu_no);
COMMENT ON TABLE signal.t_grid_tiles IS '그리드 타일 정의 (대해구/소해구)';
-- t_grid_vessel_tracks: 해구별 선박 항적 (5분, 일별 파티션)
CREATE TABLE IF NOT EXISTS signal.t_grid_vessel_tracks (
haegu_no INTEGER NOT NULL,
mmsi VARCHAR(20) NOT NULL,
time_bucket TIMESTAMP NOT NULL,
distance_nm NUMERIC(10,2),
avg_speed NUMERIC(6,2),
point_count INTEGER,
entry_time TIMESTAMP,
exit_time TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_grid_vessel_tracks_pkey PRIMARY KEY (haegu_no, mmsi, time_bucket)
) PARTITION BY RANGE (time_bucket);
CREATE INDEX IF NOT EXISTS idx_grid_vessel_tracks_mmsi_time ON signal.t_grid_vessel_tracks (mmsi, time_bucket DESC);
CREATE INDEX IF NOT EXISTS idx_grid_vessel_tracks_haegu_time ON signal.t_grid_vessel_tracks (haegu_no, time_bucket DESC);
COMMENT ON TABLE signal.t_grid_vessel_tracks IS '해구별 선박 항적 (5분 단위)';
-- t_grid_tracks_summary: 해구별 항적 요약 (5분, 일별 파티션)
CREATE TABLE IF NOT EXISTS signal.t_grid_tracks_summary (
haegu_no INTEGER NOT NULL,
time_bucket TIMESTAMP NOT NULL,
total_vessels INTEGER,
total_distance_nm NUMERIC(12,2),
avg_speed NUMERIC(6,2),
vessel_list JSONB,
traffic_density NUMERIC(10,4),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_grid_tracks_summary_pkey PRIMARY KEY (haegu_no, time_bucket)
) PARTITION BY RANGE (time_bucket);
COMMENT ON TABLE signal.t_grid_tracks_summary IS '해구별 5분 단위 항적 요약 통계';
COMMENT ON COLUMN signal.t_grid_tracks_summary.vessel_list IS '선박별 상세 정보 [{mmsi, distance_nm, avg_speed}]';
-- t_grid_tracks_summary_hourly: 해구별 시간별 요약 (월별 파티션)
CREATE TABLE IF NOT EXISTS signal.t_grid_tracks_summary_hourly (
haegu_no INTEGER NOT NULL,
time_bucket TIMESTAMP NOT NULL,
total_vessels INTEGER,
total_distance_nm NUMERIC(12,2),
avg_speed NUMERIC(6,2),
vessel_list JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_grid_tracks_summary_hourly_pkey PRIMARY KEY (haegu_no, time_bucket)
) PARTITION BY RANGE (time_bucket);
CREATE INDEX IF NOT EXISTS idx_grid_tracks_summary_hourly_time ON signal.t_grid_tracks_summary_hourly (time_bucket DESC, haegu_no);
COMMENT ON TABLE signal.t_grid_tracks_summary_hourly IS '해구별 시간별 항적 요약 통계';
-- t_grid_tracks_summary_daily: 해구별 일별 요약 (월별 파티션)
CREATE TABLE IF NOT EXISTS signal.t_grid_tracks_summary_daily (
haegu_no INTEGER NOT NULL,
time_bucket DATE NOT NULL,
total_vessels INTEGER,
total_distance_nm NUMERIC(12,2),
avg_speed NUMERIC(6,2),
vessel_list JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_grid_tracks_summary_daily_pkey PRIMARY KEY (haegu_no, time_bucket)
) PARTITION BY RANGE (time_bucket);
CREATE INDEX IF NOT EXISTS idx_grid_tracks_summary_daily_time ON signal.t_grid_tracks_summary_daily (time_bucket DESC, haegu_no);
COMMENT ON TABLE signal.t_grid_tracks_summary_daily IS '해구별 일일 항적 요약 통계';
-- ============================================================
-- 4. 영역(Area) 관련 테이블 — 파티션
-- ============================================================
-- t_areas: 사용자 정의 영역 (일반 테이블)
CREATE TABLE IF NOT EXISTS signal.t_areas (
area_id VARCHAR(50) NOT NULL,
area_name VARCHAR(100) NOT NULL,
area_type VARCHAR(20) NOT NULL,
area_geom GEOMETRY(MULTIPOLYGON, 4326) NOT NULL,
properties JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_areas_pkey PRIMARY KEY (area_id)
);
CREATE INDEX IF NOT EXISTS idx_t_areas_area_geom ON signal.t_areas USING GIST (area_geom);
COMMENT ON TABLE signal.t_areas IS '사용자 정의 영역 정보';
-- t_area_vessel_tracks: 영역별 선박 항적 (5분, 일별 파티션)
CREATE TABLE IF NOT EXISTS signal.t_area_vessel_tracks (
area_id VARCHAR(50) NOT NULL,
mmsi VARCHAR(20) NOT NULL,
time_bucket TIMESTAMP NOT NULL,
distance_nm NUMERIC(10,2),
avg_speed NUMERIC(6,2),
point_count INTEGER,
metrics JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_area_vessel_tracks_pkey PRIMARY KEY (area_id, mmsi, time_bucket)
) PARTITION BY RANGE (time_bucket);
CREATE INDEX IF NOT EXISTS idx_area_vessel_tracks_mmsi_time ON signal.t_area_vessel_tracks (mmsi, time_bucket DESC);
CREATE INDEX IF NOT EXISTS idx_area_vessel_tracks_area_time ON signal.t_area_vessel_tracks (area_id, time_bucket DESC);
COMMENT ON TABLE signal.t_area_vessel_tracks IS '영역별 선박 항적 (5분 단위)';
-- t_area_tracks_summary: 영역별 항적 요약 (5분, 일별 파티션)
CREATE TABLE IF NOT EXISTS signal.t_area_tracks_summary (
area_id VARCHAR(50) NOT NULL,
time_bucket TIMESTAMP NOT NULL,
total_vessels INTEGER,
total_distance_nm NUMERIC(12,2),
avg_speed NUMERIC(6,2),
vessel_list JSONB,
metrics_summary JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_area_tracks_summary_pkey PRIMARY KEY (area_id, time_bucket)
) PARTITION BY RANGE (time_bucket);
COMMENT ON TABLE signal.t_area_tracks_summary IS '영역별 5분 단위 항적 요약 통계';
COMMENT ON COLUMN signal.t_area_tracks_summary.vessel_list IS '선박별 상세 정보 [{mmsi, distance_nm, avg_speed}]';
-- t_area_tracks_summary_hourly: 영역별 시간별 요약 (월별 파티션)
CREATE TABLE IF NOT EXISTS signal.t_area_tracks_summary_hourly (
area_id VARCHAR(50) NOT NULL,
time_bucket TIMESTAMP NOT NULL,
total_vessels INTEGER,
total_distance_nm NUMERIC(12,2),
avg_speed NUMERIC(6,2),
vessel_list JSONB,
metrics_summary JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_area_tracks_summary_hourly_pkey PRIMARY KEY (area_id, time_bucket)
) PARTITION BY RANGE (time_bucket);
CREATE INDEX IF NOT EXISTS idx_area_tracks_summary_hourly_time ON signal.t_area_tracks_summary_hourly (time_bucket DESC, area_id);
COMMENT ON TABLE signal.t_area_tracks_summary_hourly IS '영역별 시간별 항적 요약 통계';
-- t_area_tracks_summary_daily: 영역별 일별 요약 (월별 파티션)
CREATE TABLE IF NOT EXISTS signal.t_area_tracks_summary_daily (
area_id VARCHAR(50) NOT NULL,
time_bucket DATE NOT NULL,
total_vessels INTEGER,
total_distance_nm NUMERIC(12,2),
avg_speed NUMERIC(6,2),
vessel_list JSONB,
metrics_summary JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_area_tracks_summary_daily_pkey PRIMARY KEY (area_id, time_bucket)
) PARTITION BY RANGE (time_bucket);
CREATE INDEX IF NOT EXISTS idx_area_tracks_summary_daily_time ON signal.t_area_tracks_summary_daily (time_bucket DESC, area_id);
COMMENT ON TABLE signal.t_area_tracks_summary_daily IS '영역별 일일 항적 요약 통계';
-- t_area_statistics: 영역별 선박 통계 (5분, 일별 파티션)
CREATE TABLE IF NOT EXISTS signal.t_area_statistics (
area_id VARCHAR(50) NOT NULL,
time_bucket TIMESTAMP NOT NULL,
vessel_count INTEGER DEFAULT 0,
in_count INTEGER DEFAULT 0,
out_count INTEGER DEFAULT 0,
transit_vessels JSONB,
stationary_vessels JSONB,
avg_sog NUMERIC(25,1),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT t_area_statistics_pkey PRIMARY KEY (area_id, time_bucket)
) PARTITION BY RANGE (time_bucket);
CREATE INDEX IF NOT EXISTS idx_area_stats_lookup ON signal.t_area_statistics (area_id, time_bucket DESC);
COMMENT ON TABLE signal.t_area_statistics IS '영역별 5분 단위 선박 통계';
-- ============================================================
-- 5. 비정상 항적 테이블 — 파티션
-- ============================================================
-- t_abnormal_tracks: 비정상 항적 (월별 파티션)
-- id는 GENERATED ALWAYS로 자동 생성
CREATE TABLE IF NOT EXISTS signal.t_abnormal_tracks (
id BIGINT GENERATED ALWAYS AS IDENTITY,
mmsi VARCHAR(20) NOT NULL,
time_bucket TIMESTAMP NOT NULL,
track_geom GEOMETRY(LINESTRINGM, 4326),
abnormal_type VARCHAR(50) NOT NULL,
abnormal_reason JSONB NOT NULL,
distance_nm NUMERIC(10,2),
avg_speed NUMERIC(6,2),
max_speed NUMERIC(6,2),
point_count INTEGER,
source_table VARCHAR(50) NOT NULL,
detected_at TIMESTAMP DEFAULT NOW(),
CONSTRAINT t_abnormal_tracks_pkey PRIMARY KEY (id, time_bucket)
) PARTITION BY RANGE (time_bucket);
-- ON CONFLICT (mmsi, time_bucket, source_table) 지원
CREATE UNIQUE INDEX IF NOT EXISTS abnormal_tracks_uk ON signal.t_abnormal_tracks (mmsi, time_bucket, source_table);
CREATE INDEX IF NOT EXISTS idx_abnormal_tracks_mmsi ON signal.t_abnormal_tracks (mmsi);
CREATE INDEX IF NOT EXISTS idx_abnormal_tracks_time ON signal.t_abnormal_tracks (time_bucket);
CREATE INDEX IF NOT EXISTS idx_abnormal_tracks_type ON signal.t_abnormal_tracks (abnormal_type);
CREATE INDEX IF NOT EXISTS idx_abnormal_tracks_geom ON signal.t_abnormal_tracks USING GIST (track_geom);
COMMENT ON TABLE signal.t_abnormal_tracks IS '비정상 선박 항적';
COMMENT ON COLUMN signal.t_abnormal_tracks.mmsi IS 'MMSI (VARCHAR)';
COMMENT ON COLUMN signal.t_abnormal_tracks.abnormal_type IS '비정상 유형 (excessive_speed, teleport, impossible_distance, excessive_avg_speed, gap_jump)';
COMMENT ON COLUMN signal.t_abnormal_tracks.source_table IS '검출 원본 테이블 (t_vessel_tracks_5min/hourly/daily)';
-- t_abnormal_track_stats: 비정상 항적 일별 통계 (일반 테이블)
CREATE TABLE IF NOT EXISTS signal.t_abnormal_track_stats (
stat_date DATE NOT NULL,
abnormal_type VARCHAR(50) NOT NULL,
vessel_count INTEGER NOT NULL,
track_count INTEGER NOT NULL,
total_points INTEGER,
avg_deviation NUMERIC(10,2),
max_deviation NUMERIC(10,2),
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
CONSTRAINT t_abnormal_track_stats_pkey PRIMARY KEY (stat_date, abnormal_type)
);
CREATE INDEX IF NOT EXISTS idx_abnormal_track_stats_date ON signal.t_abnormal_track_stats (stat_date);
COMMENT ON TABLE signal.t_abnormal_track_stats IS '비정상 항적 일별 통계';
-- ============================================================
-- 6. 타일 요약 테이블 — 파티션
-- ============================================================
-- t_tile_summary: 타일별 선박 요약 (5분, 일별 파티션)
-- ON CONFLICT (tile_id, time_bucket) 지원을 위해 UNIQUE 추가
CREATE TABLE IF NOT EXISTS signal.t_tile_summary (
tile_id VARCHAR(50) NOT NULL,
tile_level INTEGER NOT NULL,
time_bucket TIMESTAMP NOT NULL,
vessel_count INTEGER DEFAULT 0,
unique_vessels JSONB,
total_points BIGINT DEFAULT 0,
avg_sog NUMERIC(25,1),
max_sog NUMERIC(25,1),
vessel_density NUMERIC(10,6),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
haegu_no INTEGER,
sohaegu_no INTEGER,
CONSTRAINT t_tile_summary_pkey PRIMARY KEY (tile_id, time_bucket, tile_level)
) PARTITION BY RANGE (time_bucket);
-- ConcurrentUpdateManager에서 ON CONFLICT (tile_id, time_bucket) 사용
CREATE UNIQUE INDEX IF NOT EXISTS idx_tile_summary_tile_time_uk ON signal.t_tile_summary (tile_id, time_bucket);
CREATE INDEX IF NOT EXISTS idx_tile_summary_time ON signal.t_tile_summary (time_bucket DESC);
CREATE INDEX IF NOT EXISTS idx_tile_summary_vessel_count ON signal.t_tile_summary (vessel_count DESC);
CREATE INDEX IF NOT EXISTS idx_tile_summary_tile_level ON signal.t_tile_summary (tile_level);
COMMENT ON TABLE signal.t_tile_summary IS '타일별 5분 단위 선박 요약 통계';
COMMENT ON COLUMN signal.t_tile_summary.unique_vessels IS '고유 선박 목록 [{mmsi}]';
-- ============================================================
-- 7. 배치 성능 메트릭 (일반 테이블)
-- ============================================================
CREATE TABLE IF NOT EXISTS signal.t_batch_performance_metrics (
id SERIAL PRIMARY KEY,
job_name VARCHAR(100) NOT NULL,
execution_id BIGINT NOT NULL,
start_time TIMESTAMP NOT NULL,
end_time TIMESTAMP,
duration_seconds BIGINT,
total_read BIGINT,
total_write BIGINT,
throughput_per_sec NUMERIC(10,2),
status VARCHAR(20),
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_batch_metrics_job ON signal.t_batch_performance_metrics (job_name, start_time DESC);
CREATE INDEX IF NOT EXISTS idx_batch_metrics_status ON signal.t_batch_performance_metrics (status) WHERE status != 'COMPLETED';
COMMENT ON TABLE signal.t_batch_performance_metrics IS '배치 작업 성능 메트릭';
-- ============================================================
-- 8. 초기 파티션 생성 (수동 실행용)
-- PartitionManager가 런타임에 자동 생성하지만,
-- 최초 배포 시 수동으로 미리 생성할 수 있음.
-- ============================================================
-- 일별 파티션 생성 함수
CREATE OR REPLACE FUNCTION signal.create_daily_partition(
parent_table TEXT,
target_date DATE
) RETURNS VOID AS $$
DECLARE
partition_name TEXT;
start_date DATE;
end_date DATE;
BEGIN
partition_name := parent_table || '_' || to_char(target_date, 'YYMMDD');
start_date := target_date;
end_date := target_date + INTERVAL '1 day';
EXECUTE format(
'CREATE TABLE IF NOT EXISTS signal.%I PARTITION OF signal.%I FOR VALUES FROM (%L) TO (%L)',
partition_name, parent_table, start_date, end_date
);
END;
$$ LANGUAGE plpgsql;
-- 월별 파티션 생성 함수
CREATE OR REPLACE FUNCTION signal.create_monthly_partition(
parent_table TEXT,
target_date DATE
) RETURNS VOID AS $$
DECLARE
partition_name TEXT;
start_date DATE;
end_date DATE;
BEGIN
partition_name := parent_table || '_' || to_char(target_date, 'YYYY_MM');
start_date := date_trunc('month', target_date);
end_date := date_trunc('month', target_date) + INTERVAL '1 month';
EXECUTE format(
'CREATE TABLE IF NOT EXISTS signal.%I PARTITION OF signal.%I FOR VALUES FROM (%L) TO (%L)',
partition_name, parent_table, start_date, end_date
);
END;
$$ LANGUAGE plpgsql;
-- 현재 월 + 다음 달 파티션 일괄 생성
DO $$
DECLARE
today DATE := CURRENT_DATE;
day_offset INTEGER;
daily_tables TEXT[] := ARRAY[
't_vessel_tracks_5min',
't_grid_vessel_tracks',
't_grid_tracks_summary',
't_area_vessel_tracks',
't_area_tracks_summary',
't_tile_summary',
't_area_statistics'
];
monthly_tables TEXT[] := ARRAY[
't_vessel_tracks_hourly',
't_vessel_tracks_daily',
't_grid_tracks_summary_hourly',
't_grid_tracks_summary_daily',
't_area_tracks_summary_hourly',
't_area_tracks_summary_daily',
't_abnormal_tracks'
];
tbl TEXT;
BEGIN
-- 일별 파티션: 오늘부터 7일간
FOREACH tbl IN ARRAY daily_tables LOOP
FOR day_offset IN 0..6 LOOP
PERFORM signal.create_daily_partition(tbl, today + day_offset);
END LOOP;
END LOOP;
-- 월별 파티션: 이번 달 + 다음 달
FOREACH tbl IN ARRAY monthly_tables LOOP
PERFORM signal.create_monthly_partition(tbl, today);
PERFORM signal.create_monthly_partition(tbl, (today + INTERVAL '1 month')::DATE);
END LOOP;
RAISE NOTICE 'Initial partitions created successfully';
END;
$$;
-- ============================================================
-- 9. ANALYZE (통계 수집)
-- ============================================================
ANALYZE signal.t_ais_position;
ANALYZE signal.t_haegu_definitions;
ANALYZE signal.t_grid_tiles;
ANALYZE signal.t_areas;
ANALYZE signal.t_abnormal_track_stats;
ANALYZE signal.t_batch_performance_metrics;