diff --git a/.claude.bak/settings.json b/.claude.bak/settings.json new file mode 100644 index 0000000..7243506 --- /dev/null +++ b/.claude.bak/settings.json @@ -0,0 +1,35 @@ +{ + "hooks": { + "SessionStart": [ + { + "hooks": [ + { + "type": "prompt", + "prompt": "새 세션이 시작되었습니다. 다음 파일들을 확인하여 이전 작업 컨텍스트를 파악해주세요:\n\n1. .claude/SESSION_HANDOVER.md - 세션 이력 및 작업 현황\n2. git status - 현재 브랜치 및 변경사항\n\n파악한 내용을 간략히 요약하고, 미완료 작업이 있다면 안내해주세요." + } + ] + } + ], + "PreCompact": [ + { + "matcher": "auto", + "hooks": [ + { + "type": "prompt", + "prompt": "컨텍스트 압축 전입니다. .claude/SESSION_HANDOVER.md 파일을 현재 세션 작업 내용으로 업데이트해주세요. 세션 이력에 오늘 날짜와 완료된 작업을 추가하고, 진행 중인 작업이 있다면 미완료 섹션에 기록해주세요." + } + ] + } + ], + "SessionEnd": [ + { + "hooks": [ + { + "type": "prompt", + "prompt": "세션이 종료됩니다. .claude/SESSION_HANDOVER.md 파일이 최신 상태인지 확인하고, 업데이트가 필요하면 현재 세션 작업 내용을 반영해주세요." + } + ] + } + ] + } +} diff --git a/.claude.bak/settings.local.json b/.claude.bak/settings.local.json new file mode 100644 index 0000000..2e6a45c --- /dev/null +++ b/.claude.bak/settings.local.json @@ -0,0 +1,47 @@ +{ + "permissions": { + "allow": [ + "Bash(git add:*)", + "Bash(git commit:*)", + "Bash(git status:*)", + "Bash(git restore:*)", + "Bash(git reset:*)", + "Bash(git checkout:*)", + "Bash(git update-index:*)", + "Bash(mvn clean:*)", + "Bash(mvn compile:*)", + "Bash(mvn package:*)", + "Bash(mvn test:*)", + "Bash(mvn clean compile:*)", + "Bash(mvn clean package:*)", + "Bash(./mvnw:*)", + "Bash(ls:*)", + "Bash(find:*)", + "Bash(grep:*)", + "Bash(curl:*)", + "WebSearch", + "WebFetch(domain:github.com)", + "WebFetch(domain:docs.spring.io)", + "WebFetch(domain:baeldung.com)", + "Bash(cls)", + "Bash(clear)", + "Bash(./gradlew build:*)", + "Bash(gradlew.bat build:*)", + "Bash(cmd /c \"dir /b C:\\\\Users\\\\lht87\\\\IdeaProjects\\\\signal_batch\")", + "Bash(powershell -Command:*)", + "Bash(dir /b \"C:\\\\Users\\\\lht87\\\\.claude\"\" 2>nul || echo \"No .claude folder in home \")", + "Bash(cmd /c \"dir /b C:\\\\Users\\\\lht87\\\\.claude\\\\ 2>nul\")", + "Bash(cmd /c \"dir /s /b C:\\\\Users\\\\lht87\\\\IdeaProjects\\\\*.json 2>nul | findstr /i settings.json\")", + "Bash(powershell -NoProfile -Command:*)", + "Bash(claude doctor:*)", + "Bash(dir C:Userslht87.claude)", + "Bash(cat:*)", + "Bash(dir:*)", + "Bash(echo:*)", + "Bash(claude config:*)" + ], + "deny": [], + "ask": [], + "defaultMode": "acceptEdits" + } +} diff --git a/src/main/java/gc/mda/signal_batch/domain/gis/service/GisService.java b/src/main/java/gc/mda/signal_batch/domain/gis/service/GisService.java index 37c79f7..0c91413 100644 --- a/src/main/java/gc/mda/signal_batch/domain/gis/service/GisService.java +++ b/src/main/java/gc/mda/signal_batch/domain/gis/service/GisService.java @@ -10,6 +10,7 @@ import gc.mda.signal_batch.domain.vessel.service.IntegrationVesselService; import gc.mda.signal_batch.global.util.IntegrationSignalConstants; import gc.mda.signal_batch.global.util.NationalCodeUtil; import gc.mda.signal_batch.global.util.ShipKindCodeConverter; +import gc.mda.signal_batch.global.util.TrackSimplificationUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jdbc.core.JdbcTemplate; @@ -20,11 +21,15 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; import java.time.Duration; +import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; @@ -321,92 +326,29 @@ public class GisService { return allTracks; } + /** + * 선박별 항적 조회 (계층적 보완 조회 + 간소화) + * + * 조회 전략: + * 1. 상위 테이블(daily → hourly → 5min) 순서로 조회 + * 2. 각 테이블에서 누락 구간 감지 + * 3. 누락 구간은 하위 테이블에서 보완 조회 + 상위 수준으로 간소화 + * 4. 전체 시간순 정렬 + */ public List getVesselTracks(VesselTracksRequest request) { JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource); List results = new ArrayList<>(); - + LocalDateTime startTime = request.getStartTime(); LocalDateTime endTime = request.getEndTime(); - + for (VesselTracksRequest.VesselIdentifier vessel : request.getVessels()) { - // String vesselId = vessel.getSigSrcCd() + "_" + vessel.getTargetId(); - - // Determine which tables to query based on time range - Duration duration = Duration.between(startTime, endTime); - long hours = duration.toHours(); - - List tracks = new ArrayList<>(); - LocalDateTime now = LocalDateTime.now(); - LocalDateTime currentHour = now.withMinute(0).withSecond(0).withNano(0); - LocalDateTime currentDay = now.withHour(0).withMinute(0).withSecond(0).withNano(0); - - // Query daily table first (oldest data) - if (hours > 24 && startTime.isBefore(currentDay)) { - // Query daily table - time_bucket is DATE type - String sqlDaily = """ - SELECT sig_src_cd, target_id, - time_bucket::timestamp as time_bucket, - public.ST_AsText(track_geom) as track_geom, - distance_nm, avg_speed, max_speed, point_count - FROM signal.t_vessel_tracks_daily - WHERE sig_src_cd = ? AND target_id = ? - AND time_bucket BETWEEN ?::date AND ?::date - ORDER BY time_bucket - """; - - LocalDateTime queryDailyEnd = endTime.isBefore(currentDay) ? endTime : currentDay.minusDays(1); - tracks.addAll(jdbcTemplate.query(sqlDaily, this::mapTrackResponse, - vessel.getSigSrcCd(), vessel.getTargetId(), - Timestamp.valueOf(startTime), Timestamp.valueOf(queryDailyEnd))); - } - - // Query hourly table (middle-aged data) - if (hours > 1 && startTime.isBefore(currentHour)) { - // Query hourly table - String sqlHourly = """ - SELECT sig_src_cd, target_id, time_bucket, - public.ST_AsText(track_geom) as track_geom, - distance_nm, avg_speed, max_speed, point_count - FROM signal.t_vessel_tracks_hourly - WHERE sig_src_cd = ? AND target_id = ? - AND time_bucket BETWEEN ? AND ? - ORDER BY time_bucket - """; - - // Hourly data should start from currentDay if daily data was queried - LocalDateTime queryHourlyStart = hours > 24 ? currentDay : startTime; - LocalDateTime queryHourlyEnd = endTime.isBefore(currentHour) ? endTime : currentHour.minusHours(1); - if (queryHourlyEnd.isAfter(queryHourlyStart)) { - tracks.addAll(jdbcTemplate.query(sqlHourly, this::mapTrackResponse, - vessel.getSigSrcCd(), vessel.getTargetId(), - Timestamp.valueOf(queryHourlyStart), Timestamp.valueOf(queryHourlyEnd))); - } - } - - // Query 5min table last (newest data) - if (hours <= 1 || endTime.isAfter(currentHour)) { - // Query 5min table for recent data - String sql5min = """ - SELECT sig_src_cd, target_id, time_bucket, - public.ST_AsText(track_geom) as track_geom, - distance_nm, avg_speed, max_speed, point_count - FROM signal.t_vessel_tracks_5min - WHERE sig_src_cd = ? AND target_id = ? - AND time_bucket BETWEEN ? AND ? - ORDER BY time_bucket - """; - - LocalDateTime query5minStart = startTime.isAfter(currentHour) ? startTime : currentHour; - if (endTime.isAfter(currentHour)) { - tracks.addAll(jdbcTemplate.query(sql5min, this::mapTrackResponse, - vessel.getSigSrcCd(), vessel.getTargetId(), - Timestamp.valueOf(query5minStart), Timestamp.valueOf(endTime))); - } - } - + List tracks = queryVesselTracksWithFallback( + jdbcTemplate, vessel.getSigSrcCd(), vessel.getTargetId(), startTime, endTime); + // Sort all tracks by time_bucket to ensure proper ordering tracks.sort((t1, t2) -> t1.getTimeBucket().compareTo(t2.getTimeBucket())); - + if (!tracks.isEmpty()) { CompactVesselTrack compactTrack = buildCompactVesselTrack(vessel, tracks); results.add(compactTrack); @@ -421,6 +363,234 @@ public class GisService { return results; } + /** + * 계층적 보완 조회 로직 + * 상위 테이블에서 데이터가 없는 구간을 하위 테이블에서 보완 + */ + private List queryVesselTracksWithFallback( + JdbcTemplate jdbcTemplate, String sigSrcCd, String targetId, + LocalDateTime startTime, LocalDateTime endTime) { + + List allTracks = new ArrayList<>(); + Duration duration = Duration.between(startTime, endTime); + long hours = duration.toHours(); + + LocalDateTime now = LocalDateTime.now(); + // 배치 완료 여유 시간 (hourly 배치는 매시 10분 시작, 약 5분 소요) + LocalDateTime safeHourlyBoundary = now.withMinute(0).withSecond(0).withNano(0); + if (now.getMinute() < 15) { + safeHourlyBoundary = safeHourlyBoundary.minusHours(1); + } + LocalDateTime safeDailyBoundary = now.toLocalDate().atStartOfDay().minusDays(1); + + // === 1단계: Daily 테이블 조회 === + Set coveredDays = new HashSet<>(); + if (hours >= 24 && startTime.toLocalDate().isBefore(safeDailyBoundary.toLocalDate().plusDays(1))) { + LocalDate dailyStart = startTime.toLocalDate(); + LocalDate dailyEnd = endTime.toLocalDate().isBefore(safeDailyBoundary.toLocalDate()) + ? endTime.toLocalDate() + : safeDailyBoundary.toLocalDate(); + + if (!dailyEnd.isBefore(dailyStart)) { + List dailyTracks = queryDailyTracks( + jdbcTemplate, sigSrcCd, targetId, dailyStart, dailyEnd); + + for (TrackResponse track : dailyTracks) { + coveredDays.add(track.getTimeBucket().toLocalDate()); + } + allTracks.addAll(dailyTracks); + + log.debug("[FALLBACK] Daily: {} days covered for {}_{}", + coveredDays.size(), sigSrcCd, targetId); + } + } + + // === 2단계: Daily 누락 구간 → Hourly에서 보완 === + if (hours >= 24) { + LocalDate dailyStart = startTime.toLocalDate(); + LocalDate dailyEnd = endTime.toLocalDate().isBefore(safeDailyBoundary.toLocalDate()) + ? endTime.toLocalDate() + : safeDailyBoundary.toLocalDate(); + + List missingDays = detectMissingDays(dailyStart, dailyEnd, coveredDays); + + for (LocalDate missingDay : missingDays) { + LocalDateTime dayStart = missingDay.atStartOfDay(); + LocalDateTime dayEnd = missingDay.plusDays(1).atStartOfDay(); + + // Hourly로 보완 조회 (Daily 수준으로 간소화) + List fallbackTracks = queryHourlyTracks( + jdbcTemplate, sigSrcCd, targetId, dayStart, dayEnd); + + for (TrackResponse track : fallbackTracks) { + track.setTrackGeom(TrackSimplificationUtils.simplifyDailyTrack(track.getTrackGeom())); + } + allTracks.addAll(fallbackTracks); + + log.debug("[FALLBACK] Daily missing {} → {} hourly segments (simplified to daily level)", + missingDay, fallbackTracks.size()); + } + } + + // === 3단계: Hourly 테이블 조회 === + Set coveredHours = new HashSet<>(); + LocalDateTime hourlyStart = hours >= 24 + ? safeDailyBoundary.plusDays(1) // Daily 다음날부터 + : startTime.withMinute(0).withSecond(0).withNano(0); + LocalDateTime hourlyEnd = endTime.isBefore(safeHourlyBoundary) ? endTime : safeHourlyBoundary; + + if (hours > 1 && hourlyStart.isBefore(hourlyEnd)) { + List hourlyTracks = queryHourlyTracks( + jdbcTemplate, sigSrcCd, targetId, hourlyStart, hourlyEnd); + + for (TrackResponse track : hourlyTracks) { + coveredHours.add(track.getTimeBucket().withMinute(0).withSecond(0).withNano(0)); + } + allTracks.addAll(hourlyTracks); + + log.debug("[FALLBACK] Hourly: {} hours covered for {}_{}", + coveredHours.size(), sigSrcCd, targetId); + } + + // === 4단계: Hourly 누락 구간 → 5min에서 보완 === + if (hours > 1 && hourlyStart.isBefore(hourlyEnd)) { + List missingHours = detectMissingHours(hourlyStart, hourlyEnd, coveredHours); + + for (LocalDateTime missingHour : missingHours) { + LocalDateTime hourStart = missingHour; + LocalDateTime hourEnd = missingHour.plusHours(1); + + // 5min으로 보완 조회 (Hourly 수준으로 간소화) + List fallbackTracks = query5minTracks( + jdbcTemplate, sigSrcCd, targetId, hourStart, hourEnd); + + for (TrackResponse track : fallbackTracks) { + track.setTrackGeom(TrackSimplificationUtils.simplifyHourlyTrack(track.getTrackGeom())); + } + allTracks.addAll(fallbackTracks); + + log.debug("[FALLBACK] Hourly missing {} → {} 5min segments (simplified to hourly level)", + missingHour, fallbackTracks.size()); + } + } + + // === 5단계: 5min 테이블 조회 (최신 데이터) === + LocalDateTime fiveMinStart = safeHourlyBoundary.isAfter(startTime) ? safeHourlyBoundary : startTime; + if (endTime.isAfter(fiveMinStart)) { + List fiveMinTracks = query5minTracks( + jdbcTemplate, sigSrcCd, targetId, fiveMinStart, endTime); + allTracks.addAll(fiveMinTracks); + + log.debug("[FALLBACK] 5min: {} segments for {}_{} ({} ~ {})", + fiveMinTracks.size(), sigSrcCd, targetId, fiveMinStart, endTime); + } + + return allTracks; + } + + /** + * Daily 테이블 조회 + */ + private List queryDailyTracks( + JdbcTemplate jdbcTemplate, String sigSrcCd, String targetId, + LocalDate startDate, LocalDate endDate) { + + String sql = """ + SELECT sig_src_cd, target_id, + time_bucket::timestamp as time_bucket, + public.ST_AsText(track_geom) as track_geom, + distance_nm, avg_speed, max_speed, point_count + FROM signal.t_vessel_tracks_daily + WHERE sig_src_cd = ? AND target_id = ? + AND time_bucket BETWEEN ?::date AND ?::date + ORDER BY time_bucket + """; + + return jdbcTemplate.query(sql, this::mapTrackResponse, + sigSrcCd, targetId, + java.sql.Date.valueOf(startDate), java.sql.Date.valueOf(endDate)); + } + + /** + * Hourly 테이블 조회 + */ + private List queryHourlyTracks( + JdbcTemplate jdbcTemplate, String sigSrcCd, String targetId, + LocalDateTime startTime, LocalDateTime endTime) { + + String sql = """ + SELECT sig_src_cd, target_id, time_bucket, + public.ST_AsText(track_geom) as track_geom, + distance_nm, avg_speed, max_speed, point_count + FROM signal.t_vessel_tracks_hourly + WHERE sig_src_cd = ? AND target_id = ? + AND time_bucket >= ? AND time_bucket < ? + ORDER BY time_bucket + """; + + return jdbcTemplate.query(sql, this::mapTrackResponse, + sigSrcCd, targetId, + Timestamp.valueOf(startTime), Timestamp.valueOf(endTime)); + } + + /** + * 5min 테이블 조회 + */ + private List query5minTracks( + JdbcTemplate jdbcTemplate, String sigSrcCd, String targetId, + LocalDateTime startTime, LocalDateTime endTime) { + + String sql = """ + SELECT sig_src_cd, target_id, time_bucket, + public.ST_AsText(track_geom) as track_geom, + distance_nm, avg_speed, max_speed, point_count + FROM signal.t_vessel_tracks_5min + WHERE sig_src_cd = ? AND target_id = ? + AND time_bucket >= ? AND time_bucket < ? + ORDER BY time_bucket + """; + + return jdbcTemplate.query(sql, this::mapTrackResponse, + sigSrcCd, targetId, + Timestamp.valueOf(startTime), Timestamp.valueOf(endTime)); + } + + /** + * 누락된 일자 감지 + */ + private List detectMissingDays(LocalDate start, LocalDate end, Set coveredDays) { + List missingDays = new ArrayList<>(); + LocalDate current = start; + + while (!current.isAfter(end)) { + if (!coveredDays.contains(current)) { + missingDays.add(current); + } + current = current.plusDays(1); + } + + return missingDays; + } + + /** + * 누락된 시간 감지 + */ + private List detectMissingHours( + LocalDateTime start, LocalDateTime end, Set coveredHours) { + + List missingHours = new ArrayList<>(); + LocalDateTime current = start.withMinute(0).withSecond(0).withNano(0); + + while (current.isBefore(end)) { + if (!coveredHours.contains(current)) { + missingHours.add(current); + } + current = current.plusHours(1); + } + + return missingHours; + } + /** * 통합선박 기준 필터링 (REST API용) */ diff --git a/src/main/java/gc/mda/signal_batch/global/util/TrackConverter.java b/src/main/java/gc/mda/signal_batch/global/util/TrackConverter.java index 2486205..be80c1b 100644 --- a/src/main/java/gc/mda/signal_batch/global/util/TrackConverter.java +++ b/src/main/java/gc/mda/signal_batch/global/util/TrackConverter.java @@ -140,11 +140,7 @@ public class TrackConverter { } } - if (allGeometry.isEmpty()) { - return null; - } - - // 선박 정보 조회 + // 선박 정보 조회 (geometry가 비어있어도 선박 객체는 생성) VesselInfo vesselInfo = vesselInfoProvider != null ? vesselInfoProvider.getVesselInfo(sigSrcCd, targetId) : new VesselInfo("-", "-"); diff --git a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java index fd759f7..f94ffce 100644 --- a/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java +++ b/src/main/java/gc/mda/signal_batch/global/websocket/service/ChunkedTrackStreamingService.java @@ -57,7 +57,7 @@ public class ChunkedTrackStreamingService { @SuppressWarnings("unused") private final ExecutorService executorService = Executors.newFixedThreadPool(10); private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private static final int MAX_TRACKS_PER_CHUNK = 20000; // 청크당 최대 트랙 수 (버퍼 오버플로우 방지) + private static final int MAX_TRACKS_PER_CHUNK = 500000; // 청크당 최대 트랙 수 (10만 선박 지원) private static final int MAX_MESSAGE_SIZE_KB = 1024; // 메시지당 최대 크기 1MB private static final int MIN_MESSAGE_SIZE_KB = 256; // 최소 메시지 크기 256KB private static final int DAILY_PAGE_SIZE = 15000; // Daily 테이블 페이지네이션용 (선박 기준) @@ -398,11 +398,70 @@ public class ChunkedTrackStreamingService { } } + /** + * 2-pass 뷰포트 필터링 - Pass 1: 뷰포트에 교차하는 선박 ID 수집 + * 전체 쿼리 시간 범위에서 뷰포트 영역을 지나는 모든 선박을 식별하여, + * Pass 2에서 해당 선박의 전체 항적(뷰포트 밖 포함)을 조회할 수 있도록 함 + */ + private Set collectViewportVesselIds(Map> strategyMap, TrackQueryRequest request) { + ViewportFilter viewport = request.getViewport(); + if (viewport == null || !viewport.isValid()) { + return null; + } + + long startMs = System.currentTimeMillis(); + Set vesselIds = new HashSet<>(); + + for (Map.Entry> entry : strategyMap.entrySet()) { + String tableName = entry.getKey().getTableName(); + for (TimeRange range : entry.getValue()) { + StringBuilder sql = new StringBuilder(); + sql.append("SELECT DISTINCT sig_src_cd, target_id FROM ").append(tableName); + sql.append(" WHERE time_bucket >= ? AND time_bucket < ?"); + sql.append(" AND public.ST_Intersects(track_geom, public.ST_MakeEnvelope(?, ?, ?, ?, 4326))"); + + if (request.getMinAvgSpeed() != null) { + sql.append(" AND avg_speed >= ").append(request.getMinAvgSpeed()); + } + if (request.getMaxAvgSpeed() != null) { + sql.append(" AND avg_speed <= ").append(request.getMaxAvgSpeed()); + } + + try (Connection conn = queryDataSource.getConnection(); + PreparedStatement ps = conn.prepareStatement(sql.toString())) { + ps.setTimestamp(1, Timestamp.valueOf(range.getStart())); + ps.setTimestamp(2, Timestamp.valueOf(range.getEnd())); + ps.setDouble(3, viewport.getMinLon()); + ps.setDouble(4, viewport.getMinLat()); + ps.setDouble(5, viewport.getMaxLon()); + ps.setDouble(6, viewport.getMaxLat()); + + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + vesselIds.add(rs.getString("sig_src_cd") + "_" + rs.getString("target_id")); + } + } + } catch (SQLException e) { + log.error("Error collecting viewport vessel IDs from {}: {}", tableName, e.getMessage()); + } + } + } + + long elapsed = System.currentTimeMillis() - startMs; + log.info("2-pass viewport filter Pass 1: {} unique vessels found across {} tables in {}ms", + vesselIds.size(), strategyMap.size(), elapsed); + return vesselIds; + } + private List processTableRange(TrackQueryRequest request, TableStrategy strategy, TimeRange range) { - return processTableRange(request, strategy, range, null); + return processTableRange(request, strategy, range, null, null); } private List processTableRange(TrackQueryRequest request, TableStrategy strategy, TimeRange range, String sessionId) { + return processTableRange(request, strategy, range, sessionId, null); + } + + private List processTableRange(TrackQueryRequest request, TableStrategy strategy, TimeRange range, String sessionId, Set viewportVesselIds) { Map vesselMap = new HashMap<>(20000); // 예상 선박 수 String tableName = strategy.getTableName(); @@ -413,7 +472,7 @@ public class ChunkedTrackStreamingService { log.info("Using {} table with simplification level {} for range [{} - {}]", strategy, simplificationLevel, range.getStart(), range.getEnd()); - String sql = buildRangeQuery(tableName, request, range, simplificationLevel); + String sql = buildRangeQuery(tableName, request, range, simplificationLevel, viewportVesselIds); // Connection을 직접 사용하여 스트리밍 처리 try (Connection conn = queryDataSource.getConnection(); @@ -424,7 +483,11 @@ public class ChunkedTrackStreamingService { ps.setTimestamp(paramIndex++, Timestamp.valueOf(range.getStart())); ps.setTimestamp(paramIndex++, Timestamp.valueOf(range.getEnd())); - if (request.getViewport() != null) { + // 2-pass 뷰포트 필터: vessel ID 배열 또는 기존 viewport 좌표 바인딩 + if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) { + Array vesselArray = conn.createArrayOf("text", viewportVesselIds.toArray(new String[0])); + ps.setArray(paramIndex++, vesselArray); + } else if (request.getViewport() != null) { ViewportFilter vp = request.getViewport(); ps.setDouble(paramIndex++, vp.getMinLon()); ps.setDouble(paramIndex++, vp.getMinLat()); @@ -467,12 +530,34 @@ public class ChunkedTrackStreamingService { // 5min 테이블은 이 필드가 없을 수 있음 } + // 선박 객체는 geometry가 비어있어도 생성 (선박 누락 방지) + // 먼저 선박 객체 확보 + VesselAccumulator accumulator = vesselMap.get(vesselId); + if (accumulator == null) { + vesselCount++; // 새 선박 추가 시 카운트 + accumulator = new VesselAccumulator(); + accumulator.sigSrcCd = sigSrcCd; + accumulator.targetId = targetId; + + // 선박 정보 조회 (캐시 우선) + VesselInfo vesselInfo = getVesselInfo(sigSrcCd, targetId); + accumulator.shipName = vesselInfo.shipName; + accumulator.shipType = vesselInfo.shipType; + + // shipKindCode 계산 (선박명 패턴 매칭 포함 - 어망/부이 판별) + accumulator.shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern( + sigSrcCd, vesselInfo.shipType, vesselInfo.shipName, targetId); + + vesselMap.put(vesselId, accumulator); + } + if (trackGeomWkt != null && !trackGeomWkt.isEmpty() && !"LINESTRING EMPTY".equals(trackGeomWkt)) { try { LineString lineString = (LineString) wktReader.read(trackGeomWkt); - // 빈 LineString 처리 + // 빈 LineString은 건너뛰되, 선박 객체는 이미 생성됨 if (lineString.getNumPoints() == 0) { + trackCount++; continue; } @@ -505,10 +590,10 @@ public class ChunkedTrackStreamingService { // 속도 계산 (두 점 사이의 거리와 시간차를 이용) double speed = 0.0; long currentTimeMillis; - + // Unix timestamp (초 단위를 밀리초로 변환) currentTimeMillis = (long)coord.getM() * 1000; - + if (prevCoord != null && i > 0) { double distance = calculateDistance(prevCoord, coord); // 해리(nm) double timeDiff = (currentTimeMillis - prevTimeMillis) / 3600000.0; // 시간(hour) @@ -522,32 +607,6 @@ public class ChunkedTrackStreamingService { prevTimeMillis = currentTimeMillis; } - // 선박별 데이터 병합 - VesselAccumulator accumulator = vesselMap.get(vesselId); - if (accumulator == null) { - vesselCount++; // 새 선박 추가 시 카운트 - accumulator = new VesselAccumulator(); - accumulator.sigSrcCd = sigSrcCd; - accumulator.targetId = targetId; - - // 선박 정보 조회 (캐시 우선) - VesselInfo vesselInfo = getVesselInfo(sigSrcCd, targetId); - accumulator.shipName = vesselInfo.shipName; - accumulator.shipType = vesselInfo.shipType; - - // shipKindCode 계산 (선박명 패턴 매칭 포함 - 어망/부이 판별) - accumulator.shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern( - sigSrcCd, vesselInfo.shipType, vesselInfo.shipName, targetId); - - // 테스트용 로그 - 처음 10개 선박만 -// if (vesselCount <= 10) { -// log.info("[VESSEL_INFO] {} - Name: {}, Type: {}", -// vesselId, vesselInfo.shipName, vesselInfo.shipType); -// } - - vesselMap.put(vesselId, accumulator); - } - // 데이터 직접 추가 (성능 개선) accumulator.geometry.addAll(geometry); accumulator.timestamps.addAll(timestamps); @@ -649,6 +708,9 @@ public class ChunkedTrackStreamingService { log.info("Query {} using strategies: {}", queryId, strategyMap.keySet()); + // 2-pass 뷰포트 필터링: 전체 시간 범위에서 뷰포트 교차 선박 수집 + Set viewportVesselIds = collectViewportVesselIds(strategyMap, request); + List responses = new ArrayList<>(); int globalChunkIndex = 0; Set uniqueVesselIds = new HashSet<>(); @@ -665,7 +727,7 @@ public class ChunkedTrackStreamingService { for (TimeRange range : ranges) { try { // 선박 기준 페이지네이션으로 모든 데이터 수집 후 병합 - List compactTracks = processDailyTableWithPagination(request, range, null); + List compactTracks = processDailyTableWithPagination(request, range, null, viewportVesselIds); if (!compactTracks.isEmpty()) { // 메시지 크기로 분할 @@ -704,7 +766,7 @@ public class ChunkedTrackStreamingService { for (TimeChunk chunk : chunks) { try { List compactTracks = processTableRange(request, strategy, - new TimeRange(chunk.start, chunk.end)); + new TimeRange(chunk.start, chunk.end), null, viewportVesselIds); if (!compactTracks.isEmpty()) { // 메시지 크기로 분할 (5min/hourly에도 적용) @@ -797,6 +859,9 @@ public class ChunkedTrackStreamingService { estimatedTotalMinutes = (int)Duration.between(request.getStartTime(), request.getEndTime()).toMinutes(); log.info("Total time range: {} minutes", estimatedTotalMinutes); + // 2-pass 뷰포트 필터링: 전체 시간 범위에서 뷰포트 교차 선박 수집 + Set viewportVesselIds = collectViewportVesselIds(strategyMap, request); + int globalChunkIndex = 0; int totalVessels = 0; Set uniqueVesselIds = new HashSet<>(); @@ -811,7 +876,7 @@ public class ChunkedTrackStreamingService { if (strategy == TableStrategy.DAILY) { // Daily는 기존 방식 유지 (이미 일 단위) processDailyStrategy(ranges, request, queryId, chunkConsumer, statusConsumer, - globalChunkIndex, uniqueVesselIds); + globalChunkIndex, uniqueVesselIds, viewportVesselIds); globalChunkIndex = getCurrentChunkIndex(); } else { // Hourly/5min은 6시간 단위로 그룹화하여 처리 @@ -834,7 +899,7 @@ public class ChunkedTrackStreamingService { } List compactTracks = processTableRangeWithBaseTime( - request, strategy, range, baseTime); + request, strategy, range, baseTime, viewportVesselIds); // 선박별로 볕합 for (CompactVesselTrack track : compactTracks) { @@ -1533,14 +1598,15 @@ public class ChunkedTrackStreamingService { } /** - * 쿼리 생성 (간소화 적용) + * 쿼리 생성 (간소화 적용, 2-pass 뷰포트 필터링 지원) */ - private String buildRangeQuery(String tableName, TrackQueryRequest request, TimeRange range, SimplificationLevel simplificationLevel) { + private String buildRangeQuery(String tableName, TrackQueryRequest request, TimeRange range, + SimplificationLevel simplificationLevel, Set viewportVesselIds) { StringBuilder sql = new StringBuilder(); sql.append("SELECT sig_src_cd, target_id, time_bucket, "); // track_geom 고정 사용 - + // 간소화 적용 if (simplificationLevel != SimplificationLevel.NONE && simplificationLevel.getTolerance() > 0) { sql.append("public.ST_AsText(public.ST_Simplify(track_geom, ").append(simplificationLevel.getTolerance()) @@ -1562,8 +1628,10 @@ public class ChunkedTrackStreamingService { sql.append("WHERE time_bucket >= ? "); sql.append("AND time_bucket < ? "); - // Viewport 필터 - 파라미터 바인딩 사용 - if (request.getViewport() != null) { + // 2-pass 뷰포트 필터: vessel ID 기반 필터 (Pass 2) 또는 기존 viewport 필터 + if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) { + sql.append("AND sig_src_cd || '_' || target_id = ANY(?) "); + } else if (request.getViewport() != null) { sql.append("AND public.ST_Intersects(track_geom, public.ST_MakeEnvelope(?, ?, ?, ?, 4326)) "); } @@ -1586,7 +1654,7 @@ public class ChunkedTrackStreamingService { * Daily 테이블용 페이지네이션 쿼리 생성 */ private String buildDailyPaginationQuery(String tableName, TrackQueryRequest request, TimeRange range, - double tolerance, String lastSigSrcCd, String lastTargetId) { + double tolerance, String lastSigSrcCd, String lastTargetId, Set viewportVesselIds) { StringBuilder sql = new StringBuilder(); sql.append("SELECT sig_src_cd, target_id, time_bucket, "); @@ -1611,8 +1679,10 @@ public class ChunkedTrackStreamingService { sql.append("AND (sig_src_cd, target_id) > (?, ?) "); } - // Viewport 필터 - if (request.getViewport() != null) { + // 2-pass 뷰포트 필터: vessel ID 기반 필터 (Pass 2) 또는 기존 viewport 필터 + if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) { + sql.append("AND sig_src_cd || '_' || target_id = ANY(?) "); + } else if (request.getViewport() != null) { sql.append("AND public.ST_Intersects(track_geom, public.ST_MakeEnvelope(?, ?, ?, ?, 4326)) "); } @@ -1634,6 +1704,10 @@ public class ChunkedTrackStreamingService { * Daily 테이블 처리 - 선박 기준 페이지네이션으로 모든 데이터 수집 후 병합 */ private List processDailyTableWithPagination(TrackQueryRequest request, TimeRange range, String sessionId) { + return processDailyTableWithPagination(request, range, sessionId, null); + } + + private List processDailyTableWithPagination(TrackQueryRequest request, TimeRange range, String sessionId, Set viewportVesselIds) { Map vesselMap = new HashMap<>(50000); // 예상 선박 수 String tableName = TableStrategy.DAILY.getTableName(); @@ -1649,7 +1723,7 @@ public class ChunkedTrackStreamingService { // 페이지네이션 루프 while (true) { - String sql = buildDailyPaginationQuery(tableName, request, range, tolerance, lastSigSrcCd, lastTargetId); + String sql = buildDailyPaginationQuery(tableName, request, range, tolerance, lastSigSrcCd, lastTargetId, viewportVesselIds); try (Connection conn = queryDataSource.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { @@ -1664,8 +1738,11 @@ public class ChunkedTrackStreamingService { ps.setString(paramIndex++, lastTargetId); } - // Viewport 필터 파라미터 - if (request.getViewport() != null) { + // 2-pass 뷰포트 필터: vessel ID 배열 또는 기존 viewport 좌표 바인딩 + if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) { + Array vesselArray = conn.createArrayOf("text", viewportVesselIds.toArray(new String[0])); + ps.setArray(paramIndex++, vesselArray); + } else if (request.getViewport() != null) { ViewportFilter vp = request.getViewport(); ps.setDouble(paramIndex++, vp.getMinLon()); ps.setDouble(paramIndex++, vp.getMinLat()); @@ -1705,10 +1782,27 @@ public class ChunkedTrackStreamingService { endTimeStr = rs.getString("end_time"); } catch (SQLException ignored) {} + // 선박 객체는 geometry가 비어있어도 생성 (선박 누락 방지) + final String finalSigSrcCd = currentSigSrcCd; + final String finalTargetId = currentTargetId; + VesselAccumulator accumulator = vesselMap.computeIfAbsent(vesselId, k -> { + VesselInfo info = getVesselInfo(finalSigSrcCd, finalTargetId); + VesselAccumulator acc = new VesselAccumulator(); + acc.sigSrcCd = finalSigSrcCd; + acc.targetId = finalTargetId; + acc.shipName = info.shipName; + acc.shipType = info.shipType; + // shipKindCode 계산 (선박명 패턴 매칭 포함 - 어망/부이 판별) + acc.shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern( + finalSigSrcCd, info.shipType, info.shipName, finalTargetId); + return acc; + }); + if (trackGeomWkt != null && !trackGeomWkt.isEmpty() && !"LINESTRING EMPTY".equals(trackGeomWkt)) { try { LineString lineString = (LineString) wktReader.read(trackGeomWkt); + // 빈 LineString은 건너뛰되, 선박 객체는 이미 생성됨 if (lineString.getNumPoints() == 0) { pageTrackCount++; continue; @@ -1754,22 +1848,6 @@ public class ChunkedTrackStreamingService { prevTimeMillis = currentTimeMillis; } - // VesselAccumulator에 누적 - final String finalSigSrcCd = currentSigSrcCd; - final String finalTargetId = currentTargetId; - VesselAccumulator accumulator = vesselMap.computeIfAbsent(vesselId, k -> { - VesselInfo info = getVesselInfo(finalSigSrcCd, finalTargetId); - VesselAccumulator acc = new VesselAccumulator(); - acc.sigSrcCd = finalSigSrcCd; - acc.targetId = finalTargetId; - acc.shipName = info.shipName; - acc.shipType = info.shipType; - // shipKindCode 계산 (선박명 패턴 매칭 포함 - 어망/부이 판별) - acc.shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern( - finalSigSrcCd, info.shipType, info.shipName, finalTargetId); - return acc; - }); - accumulator.geometry.addAll(geometry); accumulator.timestamps.addAll(timestamps); accumulator.speeds.addAll(speeds); @@ -2068,10 +2146,11 @@ public class ChunkedTrackStreamingService { * 기준 시간을 사용하여 테이블 범위 처리 (M값 재계산) */ private List processTableRangeWithBaseTime( - TrackQueryRequest request, TableStrategy strategy, TimeRange range, LocalDateTime dayBaseTime) { + TrackQueryRequest request, TableStrategy strategy, TimeRange range, LocalDateTime dayBaseTime, + Set viewportVesselIds) { // 기본 처리 후 M값 보정 - List tracks = processTableRange(request, strategy, range); + List tracks = processTableRange(request, strategy, range, null, viewportVesselIds); // 각 트랙의 timestamp를 dayBaseTime 기준으로 재계산 및 간소화 for (CompactVesselTrack track : tracks) { @@ -2215,7 +2294,8 @@ public class ChunkedTrackStreamingService { Consumer chunkConsumer, Consumer statusConsumer, int startChunkIndex, - Set uniqueVesselIds) throws Exception { + Set uniqueVesselIds, + Set viewportVesselIds) throws Exception { currentGlobalChunkIndex = startChunkIndex; @@ -2225,7 +2305,7 @@ public class ChunkedTrackStreamingService { for (TimeRange range : ranges) { // 페이지별 즉시 스트리밍으로 처리 (세션 캐시 공유) - streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache); + streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache, viewportVesselIds); } log.info("Session vessel cache final size: {} vessels cached", sessionVesselCache.size()); @@ -2238,7 +2318,8 @@ public class ChunkedTrackStreamingService { Consumer chunkConsumer, Consumer statusConsumer, Set uniqueVesselIds, - Map sessionVesselCache) { + Map sessionVesselCache, + Set viewportVesselIds) { String tableName = TableStrategy.DAILY.getTableName(); // 줌 레벨에 따른 강화된 간소화 tolerance @@ -2265,7 +2346,7 @@ public class ChunkedTrackStreamingService { } Map pageVesselMap = new HashMap<>(20000); - String sql = buildDailyPaginationQuery(tableName, request, range, tolerance, lastSigSrcCd, lastTargetId); + String sql = buildDailyPaginationQuery(tableName, request, range, tolerance, lastSigSrcCd, lastTargetId, viewportVesselIds); try (Connection conn = queryDataSource.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { @@ -2280,8 +2361,11 @@ public class ChunkedTrackStreamingService { ps.setString(paramIndex++, lastTargetId); } - // Viewport 필터 파라미터 - if (request.getViewport() != null) { + // 2-pass 뷰포트 필터: vessel ID 배열 또는 기존 viewport 좌표 바인딩 + if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) { + Array vesselArray = conn.createArrayOf("text", viewportVesselIds.toArray(new String[0])); + ps.setArray(paramIndex++, vesselArray); + } else if (request.getViewport() != null) { ViewportFilter vp = request.getViewport(); ps.setDouble(paramIndex++, vp.getMinLon()); ps.setDouble(paramIndex++, vp.getMinLat()); @@ -2355,10 +2439,32 @@ public class ChunkedTrackStreamingService { double distanceNm = Double.parseDouble(trackData[6]); double maxSpeed = Double.parseDouble(trackData[7]); + // 선박 객체는 geometry가 비어있어도 생성 (선박 누락 방지) + final String finalSigSrcCd = currentSigSrcCd; + final String finalTargetId = currentTargetId; + final String finalVesselId = vesselId; + VesselAccumulator accumulator = pageVesselMap.computeIfAbsent(vesselId, k -> { + // 세션 캐시에서 조회 (이미 preload됨) + VesselInfo info = sessionVesselCache.get(finalVesselId); + if (info == null) { + info = new VesselInfo(null, null); + } + VesselAccumulator acc = new VesselAccumulator(); + acc.sigSrcCd = finalSigSrcCd; + acc.targetId = finalTargetId; + acc.shipName = info.shipName; + acc.shipType = info.shipType; + // shipKindCode 계산 (선박명 패턴 매칭 포함 - 어망/부이 판별) + acc.shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern( + finalSigSrcCd, info.shipType, info.shipName, finalTargetId); + return acc; + }); + if (trackGeomWkt != null && !trackGeomWkt.isEmpty() && !"LINESTRING EMPTY".equals(trackGeomWkt)) { try { LineString lineString = (LineString) wktReader.read(trackGeomWkt); + // 빈 LineString은 건너뛰되, 선박 객체는 이미 생성됨 if (lineString.getNumPoints() == 0) { pageTrackCount++; continue; @@ -2395,27 +2501,6 @@ public class ChunkedTrackStreamingService { prevTimeMillis = currentTimeMillis; } - // VesselAccumulator에 누적 (세션 캐시에서 조회 - 이미 배치로 로드됨) - final String finalSigSrcCd = currentSigSrcCd; - final String finalTargetId = currentTargetId; - final String finalVesselId = vesselId; - VesselAccumulator accumulator = pageVesselMap.computeIfAbsent(vesselId, k -> { - // 세션 캐시에서 조회 (이미 preload됨) - VesselInfo info = sessionVesselCache.get(finalVesselId); - if (info == null) { - info = new VesselInfo(null, null); - } - VesselAccumulator acc = new VesselAccumulator(); - acc.sigSrcCd = finalSigSrcCd; - acc.targetId = finalTargetId; - acc.shipName = info.shipName; - acc.shipType = info.shipType; - // shipKindCode 계산 (선박명 패턴 매칭 포함 - 어망/부이 판별) - acc.shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern( - finalSigSrcCd, info.shipType, info.shipName, finalTargetId); - return acc; - }); - accumulator.geometry.addAll(geometry); accumulator.timestamps.addAll(timestamps); accumulator.speeds.addAll(speeds); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 394fc36..41a5e7b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -32,42 +32,39 @@ spring: datasource: # 수집 DB collect: - url: ${COLLECT_DB_URL:jdbc:postgresql://localhost:5432/wingdb?stringtype=unspecified¤tSchema=signal&TimeZone=Asia/Seoul} + jdbc-url: ${COLLECT_DB_URL:jdbc:postgresql://localhost:5432/wingdb?stringtype=unspecified¤tSchema=signal&TimeZone=Asia/Seoul} username: ${COLLECT_DB_USER:collect_user} password: ${COLLECT_DB_PASS:collect_pass} driver-class-name: org.postgresql.Driver - hikari: - pool-name: CollectHikariPool - connection-timeout: 30000 - idle-timeout: 600000 - max-lifetime: 1800000 - maximum-pool-size: 20 - minimum-idle: 5 + pool-name: CollectHikariPool + connection-timeout: 30000 + idle-timeout: 600000 + max-lifetime: 1800000 + maximum-pool-size: 20 + minimum-idle: 5 # 조회 DB query: - url: ${QUERY_DB_URL:jdbc:postgresql://localhost:5432/wingdb?stringtype=unspecified¤tSchema=signal&TimeZone=Asia/Seoul} + jdbc-url: ${QUERY_DB_URL:jdbc:postgresql://localhost:5432/wingdb?stringtype=unspecified¤tSchema=signal&TimeZone=Asia/Seoul} username: ${QUERY_DB_USER:query_user} password: ${QUERY_DB_PASS:query_pass} driver-class-name: org.postgresql.Driver - hikari: - pool-name: QueryHikariPool - connection-timeout: 30000 - idle-timeout: 600000 - max-lifetime: 1800000 - maximum-pool-size: 30 - minimum-idle: 10 + pool-name: QueryHikariPool + connection-timeout: 30000 + idle-timeout: 600000 + max-lifetime: 1800000 + maximum-pool-size: 30 + minimum-idle: 10 # 배치 메타데이터 DB batch: - url: ${BATCH_DB_URL:jdbc:postgresql://localhost:5432/wingdb?stringtype=unspecified¤tSchema=signal&TimeZone=Asia/Seoul} + jdbc-url: ${BATCH_DB_URL:jdbc:postgresql://localhost:5432/wingdb?stringtype=unspecified¤tSchema=signal&TimeZone=Asia/Seoul} username: ${BATCH_DB_USER:batch_user} password: ${BATCH_DB_PASS:batch_pass} driver-class-name: org.postgresql.Driver - hikari: - pool-name: BatchHikariPool - maximum-pool-size: 10 - minimum-idle: 2 + pool-name: BatchHikariPool + maximum-pool-size: 10 + minimum-idle: 2 # 로깅 설정 logging: @@ -84,8 +81,10 @@ logging: file: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n" file: name: ${LOG_PATH:logs}/vessel-batch.log - max-size: 100MB - max-history: 30 + logback: + rollingpolicy: + max-file-size: 100MB + max-history: 30 # 액추에이터 설정 management: