feat: 항적 조회 안정성 개선 - 계층적 폴백, 선박 누락 방지, 2-pass 뷰포트 필터링

- GisService: 하위 구간 미집계 감지 시 상위 테이블에서 보완 집계하는
  계층적 폴백 로직 추가 (daily→hourly→5min)
- ChunkedTrackStreamingService: 빈 LineString으로 인한 선박 객체 누락 방지,
  MAX_TRACKS_PER_CHUNK를 500,000으로 증가
- ChunkedTrackStreamingService: 2-pass 뷰포트 필터링 구현
  Pass 1에서 뷰포트 교차 선박 ID를 수집하고, Pass 2에서 해당 선박의
  전체 항적을 viewport 필터 없이 조회하여 선박 소실 현상 해결
- TrackConverter: 빈 geometry에서도 선박 객체 반환하도록 수정

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
HeungTak Lee 2026-02-05 06:53:47 +09:00
부모 89482d854f
커밋 cc165fc36a
6개의 변경된 파일531개의 추가작업 그리고 199개의 파일을 삭제

35
.claude.bak/settings.json Normal file
파일 보기

@ -0,0 +1,35 @@
{
"hooks": {
"SessionStart": [
{
"hooks": [
{
"type": "prompt",
"prompt": "새 세션이 시작되었습니다. 다음 파일들을 확인하여 이전 작업 컨텍스트를 파악해주세요:\n\n1. .claude/SESSION_HANDOVER.md - 세션 이력 및 작업 현황\n2. git status - 현재 브랜치 및 변경사항\n\n파악한 내용을 간략히 요약하고, 미완료 작업이 있다면 안내해주세요."
}
]
}
],
"PreCompact": [
{
"matcher": "auto",
"hooks": [
{
"type": "prompt",
"prompt": "컨텍스트 압축 전입니다. .claude/SESSION_HANDOVER.md 파일을 현재 세션 작업 내용으로 업데이트해주세요. 세션 이력에 오늘 날짜와 완료된 작업을 추가하고, 진행 중인 작업이 있다면 미완료 섹션에 기록해주세요."
}
]
}
],
"SessionEnd": [
{
"hooks": [
{
"type": "prompt",
"prompt": "세션이 종료됩니다. .claude/SESSION_HANDOVER.md 파일이 최신 상태인지 확인하고, 업데이트가 필요하면 현재 세션 작업 내용을 반영해주세요."
}
]
}
]
}
}

파일 보기

@ -0,0 +1,47 @@
{
"permissions": {
"allow": [
"Bash(git add:*)",
"Bash(git commit:*)",
"Bash(git status:*)",
"Bash(git restore:*)",
"Bash(git reset:*)",
"Bash(git checkout:*)",
"Bash(git update-index:*)",
"Bash(mvn clean:*)",
"Bash(mvn compile:*)",
"Bash(mvn package:*)",
"Bash(mvn test:*)",
"Bash(mvn clean compile:*)",
"Bash(mvn clean package:*)",
"Bash(./mvnw:*)",
"Bash(ls:*)",
"Bash(find:*)",
"Bash(grep:*)",
"Bash(curl:*)",
"WebSearch",
"WebFetch(domain:github.com)",
"WebFetch(domain:docs.spring.io)",
"WebFetch(domain:baeldung.com)",
"Bash(cls)",
"Bash(clear)",
"Bash(./gradlew build:*)",
"Bash(gradlew.bat build:*)",
"Bash(cmd /c \"dir /b C:\\\\Users\\\\lht87\\\\IdeaProjects\\\\signal_batch\")",
"Bash(powershell -Command:*)",
"Bash(dir /b \"C:\\\\Users\\\\lht87\\\\.claude\"\" 2>nul || echo \"No .claude folder in home \")",
"Bash(cmd /c \"dir /b C:\\\\Users\\\\lht87\\\\.claude\\\\ 2>nul\")",
"Bash(cmd /c \"dir /s /b C:\\\\Users\\\\lht87\\\\IdeaProjects\\\\*.json 2>nul | findstr /i settings.json\")",
"Bash(powershell -NoProfile -Command:*)",
"Bash(claude doctor:*)",
"Bash(dir C:Userslht87.claude)",
"Bash(cat:*)",
"Bash(dir:*)",
"Bash(echo:*)",
"Bash(claude config:*)"
],
"deny": [],
"ask": [],
"defaultMode": "acceptEdits"
}
}

파일 보기

@ -10,6 +10,7 @@ import gc.mda.signal_batch.domain.vessel.service.IntegrationVesselService;
import gc.mda.signal_batch.global.util.IntegrationSignalConstants;
import gc.mda.signal_batch.global.util.NationalCodeUtil;
import gc.mda.signal_batch.global.util.ShipKindCodeConverter;
import gc.mda.signal_batch.global.util.TrackSimplificationUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.JdbcTemplate;
@ -20,11 +21,15 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -321,6 +326,15 @@ public class GisService {
return allTracks;
}
/**
* 선박별 항적 조회 (계층적 보완 조회 + 간소화)
*
* 조회 전략:
* 1. 상위 테이블(daily hourly 5min) 순서로 조회
* 2. 테이블에서 누락 구간 감지
* 3. 누락 구간은 하위 테이블에서 보완 조회 + 상위 수준으로 간소화
* 4. 전체 시간순 정렬
*/
public List<CompactVesselTrack> getVesselTracks(VesselTracksRequest request) {
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
List<CompactVesselTrack> results = new ArrayList<>();
@ -329,80 +343,8 @@ public class GisService {
LocalDateTime endTime = request.getEndTime();
for (VesselTracksRequest.VesselIdentifier vessel : request.getVessels()) {
// String vesselId = vessel.getSigSrcCd() + "_" + vessel.getTargetId();
// Determine which tables to query based on time range
Duration duration = Duration.between(startTime, endTime);
long hours = duration.toHours();
List<TrackResponse> tracks = new ArrayList<>();
LocalDateTime now = LocalDateTime.now();
LocalDateTime currentHour = now.withMinute(0).withSecond(0).withNano(0);
LocalDateTime currentDay = now.withHour(0).withMinute(0).withSecond(0).withNano(0);
// Query daily table first (oldest data)
if (hours > 24 && startTime.isBefore(currentDay)) {
// Query daily table - time_bucket is DATE type
String sqlDaily = """
SELECT sig_src_cd, target_id,
time_bucket::timestamp as time_bucket,
public.ST_AsText(track_geom) as track_geom,
distance_nm, avg_speed, max_speed, point_count
FROM signal.t_vessel_tracks_daily
WHERE sig_src_cd = ? AND target_id = ?
AND time_bucket BETWEEN ?::date AND ?::date
ORDER BY time_bucket
""";
LocalDateTime queryDailyEnd = endTime.isBefore(currentDay) ? endTime : currentDay.minusDays(1);
tracks.addAll(jdbcTemplate.query(sqlDaily, this::mapTrackResponse,
vessel.getSigSrcCd(), vessel.getTargetId(),
Timestamp.valueOf(startTime), Timestamp.valueOf(queryDailyEnd)));
}
// Query hourly table (middle-aged data)
if (hours > 1 && startTime.isBefore(currentHour)) {
// Query hourly table
String sqlHourly = """
SELECT sig_src_cd, target_id, time_bucket,
public.ST_AsText(track_geom) as track_geom,
distance_nm, avg_speed, max_speed, point_count
FROM signal.t_vessel_tracks_hourly
WHERE sig_src_cd = ? AND target_id = ?
AND time_bucket BETWEEN ? AND ?
ORDER BY time_bucket
""";
// Hourly data should start from currentDay if daily data was queried
LocalDateTime queryHourlyStart = hours > 24 ? currentDay : startTime;
LocalDateTime queryHourlyEnd = endTime.isBefore(currentHour) ? endTime : currentHour.minusHours(1);
if (queryHourlyEnd.isAfter(queryHourlyStart)) {
tracks.addAll(jdbcTemplate.query(sqlHourly, this::mapTrackResponse,
vessel.getSigSrcCd(), vessel.getTargetId(),
Timestamp.valueOf(queryHourlyStart), Timestamp.valueOf(queryHourlyEnd)));
}
}
// Query 5min table last (newest data)
if (hours <= 1 || endTime.isAfter(currentHour)) {
// Query 5min table for recent data
String sql5min = """
SELECT sig_src_cd, target_id, time_bucket,
public.ST_AsText(track_geom) as track_geom,
distance_nm, avg_speed, max_speed, point_count
FROM signal.t_vessel_tracks_5min
WHERE sig_src_cd = ? AND target_id = ?
AND time_bucket BETWEEN ? AND ?
ORDER BY time_bucket
""";
LocalDateTime query5minStart = startTime.isAfter(currentHour) ? startTime : currentHour;
if (endTime.isAfter(currentHour)) {
tracks.addAll(jdbcTemplate.query(sql5min, this::mapTrackResponse,
vessel.getSigSrcCd(), vessel.getTargetId(),
Timestamp.valueOf(query5minStart), Timestamp.valueOf(endTime)));
}
}
List<TrackResponse> tracks = queryVesselTracksWithFallback(
jdbcTemplate, vessel.getSigSrcCd(), vessel.getTargetId(), startTime, endTime);
// Sort all tracks by time_bucket to ensure proper ordering
tracks.sort((t1, t2) -> t1.getTimeBucket().compareTo(t2.getTimeBucket()));
@ -421,6 +363,234 @@ public class GisService {
return results;
}
/**
* 계층적 보완 조회 로직
* 상위 테이블에서 데이터가 없는 구간을 하위 테이블에서 보완
*/
private List<TrackResponse> queryVesselTracksWithFallback(
JdbcTemplate jdbcTemplate, String sigSrcCd, String targetId,
LocalDateTime startTime, LocalDateTime endTime) {
List<TrackResponse> allTracks = new ArrayList<>();
Duration duration = Duration.between(startTime, endTime);
long hours = duration.toHours();
LocalDateTime now = LocalDateTime.now();
// 배치 완료 여유 시간 (hourly 배치는 매시 10분 시작, 5분 소요)
LocalDateTime safeHourlyBoundary = now.withMinute(0).withSecond(0).withNano(0);
if (now.getMinute() < 15) {
safeHourlyBoundary = safeHourlyBoundary.minusHours(1);
}
LocalDateTime safeDailyBoundary = now.toLocalDate().atStartOfDay().minusDays(1);
// === 1단계: Daily 테이블 조회 ===
Set<LocalDate> coveredDays = new HashSet<>();
if (hours >= 24 && startTime.toLocalDate().isBefore(safeDailyBoundary.toLocalDate().plusDays(1))) {
LocalDate dailyStart = startTime.toLocalDate();
LocalDate dailyEnd = endTime.toLocalDate().isBefore(safeDailyBoundary.toLocalDate())
? endTime.toLocalDate()
: safeDailyBoundary.toLocalDate();
if (!dailyEnd.isBefore(dailyStart)) {
List<TrackResponse> dailyTracks = queryDailyTracks(
jdbcTemplate, sigSrcCd, targetId, dailyStart, dailyEnd);
for (TrackResponse track : dailyTracks) {
coveredDays.add(track.getTimeBucket().toLocalDate());
}
allTracks.addAll(dailyTracks);
log.debug("[FALLBACK] Daily: {} days covered for {}_{}",
coveredDays.size(), sigSrcCd, targetId);
}
}
// === 2단계: Daily 누락 구간 Hourly에서 보완 ===
if (hours >= 24) {
LocalDate dailyStart = startTime.toLocalDate();
LocalDate dailyEnd = endTime.toLocalDate().isBefore(safeDailyBoundary.toLocalDate())
? endTime.toLocalDate()
: safeDailyBoundary.toLocalDate();
List<LocalDate> missingDays = detectMissingDays(dailyStart, dailyEnd, coveredDays);
for (LocalDate missingDay : missingDays) {
LocalDateTime dayStart = missingDay.atStartOfDay();
LocalDateTime dayEnd = missingDay.plusDays(1).atStartOfDay();
// Hourly로 보완 조회 (Daily 수준으로 간소화)
List<TrackResponse> fallbackTracks = queryHourlyTracks(
jdbcTemplate, sigSrcCd, targetId, dayStart, dayEnd);
for (TrackResponse track : fallbackTracks) {
track.setTrackGeom(TrackSimplificationUtils.simplifyDailyTrack(track.getTrackGeom()));
}
allTracks.addAll(fallbackTracks);
log.debug("[FALLBACK] Daily missing {} → {} hourly segments (simplified to daily level)",
missingDay, fallbackTracks.size());
}
}
// === 3단계: Hourly 테이블 조회 ===
Set<LocalDateTime> coveredHours = new HashSet<>();
LocalDateTime hourlyStart = hours >= 24
? safeDailyBoundary.plusDays(1) // Daily 다음날부터
: startTime.withMinute(0).withSecond(0).withNano(0);
LocalDateTime hourlyEnd = endTime.isBefore(safeHourlyBoundary) ? endTime : safeHourlyBoundary;
if (hours > 1 && hourlyStart.isBefore(hourlyEnd)) {
List<TrackResponse> hourlyTracks = queryHourlyTracks(
jdbcTemplate, sigSrcCd, targetId, hourlyStart, hourlyEnd);
for (TrackResponse track : hourlyTracks) {
coveredHours.add(track.getTimeBucket().withMinute(0).withSecond(0).withNano(0));
}
allTracks.addAll(hourlyTracks);
log.debug("[FALLBACK] Hourly: {} hours covered for {}_{}",
coveredHours.size(), sigSrcCd, targetId);
}
// === 4단계: Hourly 누락 구간 5min에서 보완 ===
if (hours > 1 && hourlyStart.isBefore(hourlyEnd)) {
List<LocalDateTime> missingHours = detectMissingHours(hourlyStart, hourlyEnd, coveredHours);
for (LocalDateTime missingHour : missingHours) {
LocalDateTime hourStart = missingHour;
LocalDateTime hourEnd = missingHour.plusHours(1);
// 5min으로 보완 조회 (Hourly 수준으로 간소화)
List<TrackResponse> fallbackTracks = query5minTracks(
jdbcTemplate, sigSrcCd, targetId, hourStart, hourEnd);
for (TrackResponse track : fallbackTracks) {
track.setTrackGeom(TrackSimplificationUtils.simplifyHourlyTrack(track.getTrackGeom()));
}
allTracks.addAll(fallbackTracks);
log.debug("[FALLBACK] Hourly missing {} → {} 5min segments (simplified to hourly level)",
missingHour, fallbackTracks.size());
}
}
// === 5단계: 5min 테이블 조회 (최신 데이터) ===
LocalDateTime fiveMinStart = safeHourlyBoundary.isAfter(startTime) ? safeHourlyBoundary : startTime;
if (endTime.isAfter(fiveMinStart)) {
List<TrackResponse> fiveMinTracks = query5minTracks(
jdbcTemplate, sigSrcCd, targetId, fiveMinStart, endTime);
allTracks.addAll(fiveMinTracks);
log.debug("[FALLBACK] 5min: {} segments for {}_{} ({} ~ {})",
fiveMinTracks.size(), sigSrcCd, targetId, fiveMinStart, endTime);
}
return allTracks;
}
/**
* Daily 테이블 조회
*/
private List<TrackResponse> queryDailyTracks(
JdbcTemplate jdbcTemplate, String sigSrcCd, String targetId,
LocalDate startDate, LocalDate endDate) {
String sql = """
SELECT sig_src_cd, target_id,
time_bucket::timestamp as time_bucket,
public.ST_AsText(track_geom) as track_geom,
distance_nm, avg_speed, max_speed, point_count
FROM signal.t_vessel_tracks_daily
WHERE sig_src_cd = ? AND target_id = ?
AND time_bucket BETWEEN ?::date AND ?::date
ORDER BY time_bucket
""";
return jdbcTemplate.query(sql, this::mapTrackResponse,
sigSrcCd, targetId,
java.sql.Date.valueOf(startDate), java.sql.Date.valueOf(endDate));
}
/**
* Hourly 테이블 조회
*/
private List<TrackResponse> queryHourlyTracks(
JdbcTemplate jdbcTemplate, String sigSrcCd, String targetId,
LocalDateTime startTime, LocalDateTime endTime) {
String sql = """
SELECT sig_src_cd, target_id, time_bucket,
public.ST_AsText(track_geom) as track_geom,
distance_nm, avg_speed, max_speed, point_count
FROM signal.t_vessel_tracks_hourly
WHERE sig_src_cd = ? AND target_id = ?
AND time_bucket >= ? AND time_bucket < ?
ORDER BY time_bucket
""";
return jdbcTemplate.query(sql, this::mapTrackResponse,
sigSrcCd, targetId,
Timestamp.valueOf(startTime), Timestamp.valueOf(endTime));
}
/**
* 5min 테이블 조회
*/
private List<TrackResponse> query5minTracks(
JdbcTemplate jdbcTemplate, String sigSrcCd, String targetId,
LocalDateTime startTime, LocalDateTime endTime) {
String sql = """
SELECT sig_src_cd, target_id, time_bucket,
public.ST_AsText(track_geom) as track_geom,
distance_nm, avg_speed, max_speed, point_count
FROM signal.t_vessel_tracks_5min
WHERE sig_src_cd = ? AND target_id = ?
AND time_bucket >= ? AND time_bucket < ?
ORDER BY time_bucket
""";
return jdbcTemplate.query(sql, this::mapTrackResponse,
sigSrcCd, targetId,
Timestamp.valueOf(startTime), Timestamp.valueOf(endTime));
}
/**
* 누락된 일자 감지
*/
private List<LocalDate> detectMissingDays(LocalDate start, LocalDate end, Set<LocalDate> coveredDays) {
List<LocalDate> missingDays = new ArrayList<>();
LocalDate current = start;
while (!current.isAfter(end)) {
if (!coveredDays.contains(current)) {
missingDays.add(current);
}
current = current.plusDays(1);
}
return missingDays;
}
/**
* 누락된 시간 감지
*/
private List<LocalDateTime> detectMissingHours(
LocalDateTime start, LocalDateTime end, Set<LocalDateTime> coveredHours) {
List<LocalDateTime> missingHours = new ArrayList<>();
LocalDateTime current = start.withMinute(0).withSecond(0).withNano(0);
while (current.isBefore(end)) {
if (!coveredHours.contains(current)) {
missingHours.add(current);
}
current = current.plusHours(1);
}
return missingHours;
}
/**
* 통합선박 기준 필터링 (REST API용)
*/

파일 보기

@ -140,11 +140,7 @@ public class TrackConverter {
}
}
if (allGeometry.isEmpty()) {
return null;
}
// 선박 정보 조회
// 선박 정보 조회 (geometry가 비어있어도 선박 객체는 생성)
VesselInfo vesselInfo = vesselInfoProvider != null
? vesselInfoProvider.getVesselInfo(sigSrcCd, targetId)
: new VesselInfo("-", "-");

파일 보기

@ -57,7 +57,7 @@ public class ChunkedTrackStreamingService {
@SuppressWarnings("unused")
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final int MAX_TRACKS_PER_CHUNK = 20000; // 청크당 최대 트랙 (버퍼 오버플로우 방지)
private static final int MAX_TRACKS_PER_CHUNK = 500000; // 청크당 최대 트랙 (10만 선박 지원)
private static final int MAX_MESSAGE_SIZE_KB = 1024; // 메시지당 최대 크기 1MB
private static final int MIN_MESSAGE_SIZE_KB = 256; // 최소 메시지 크기 256KB
private static final int DAILY_PAGE_SIZE = 15000; // Daily 테이블 페이지네이션용 (선박 기준)
@ -398,11 +398,70 @@ public class ChunkedTrackStreamingService {
}
}
/**
* 2-pass 뷰포트 필터링 - Pass 1: 뷰포트에 교차하는 선박 ID 수집
* 전체 쿼리 시간 범위에서 뷰포트 영역을 지나는 모든 선박을 식별하여,
* Pass 2에서 해당 선박의 전체 항적(뷰포트 포함) 조회할 있도록
*/
private Set<String> collectViewportVesselIds(Map<TableStrategy, List<TimeRange>> strategyMap, TrackQueryRequest request) {
ViewportFilter viewport = request.getViewport();
if (viewport == null || !viewport.isValid()) {
return null;
}
long startMs = System.currentTimeMillis();
Set<String> vesselIds = new HashSet<>();
for (Map.Entry<TableStrategy, List<TimeRange>> entry : strategyMap.entrySet()) {
String tableName = entry.getKey().getTableName();
for (TimeRange range : entry.getValue()) {
StringBuilder sql = new StringBuilder();
sql.append("SELECT DISTINCT sig_src_cd, target_id FROM ").append(tableName);
sql.append(" WHERE time_bucket >= ? AND time_bucket < ?");
sql.append(" AND public.ST_Intersects(track_geom, public.ST_MakeEnvelope(?, ?, ?, ?, 4326))");
if (request.getMinAvgSpeed() != null) {
sql.append(" AND avg_speed >= ").append(request.getMinAvgSpeed());
}
if (request.getMaxAvgSpeed() != null) {
sql.append(" AND avg_speed <= ").append(request.getMaxAvgSpeed());
}
try (Connection conn = queryDataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql.toString())) {
ps.setTimestamp(1, Timestamp.valueOf(range.getStart()));
ps.setTimestamp(2, Timestamp.valueOf(range.getEnd()));
ps.setDouble(3, viewport.getMinLon());
ps.setDouble(4, viewport.getMinLat());
ps.setDouble(5, viewport.getMaxLon());
ps.setDouble(6, viewport.getMaxLat());
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
vesselIds.add(rs.getString("sig_src_cd") + "_" + rs.getString("target_id"));
}
}
} catch (SQLException e) {
log.error("Error collecting viewport vessel IDs from {}: {}", tableName, e.getMessage());
}
}
}
long elapsed = System.currentTimeMillis() - startMs;
log.info("2-pass viewport filter Pass 1: {} unique vessels found across {} tables in {}ms",
vesselIds.size(), strategyMap.size(), elapsed);
return vesselIds;
}
private List<CompactVesselTrack> processTableRange(TrackQueryRequest request, TableStrategy strategy, TimeRange range) {
return processTableRange(request, strategy, range, null);
return processTableRange(request, strategy, range, null, null);
}
private List<CompactVesselTrack> processTableRange(TrackQueryRequest request, TableStrategy strategy, TimeRange range, String sessionId) {
return processTableRange(request, strategy, range, sessionId, null);
}
private List<CompactVesselTrack> processTableRange(TrackQueryRequest request, TableStrategy strategy, TimeRange range, String sessionId, Set<String> viewportVesselIds) {
Map<String, VesselAccumulator> vesselMap = new HashMap<>(20000); // 예상 선박
String tableName = strategy.getTableName();
@ -413,7 +472,7 @@ public class ChunkedTrackStreamingService {
log.info("Using {} table with simplification level {} for range [{} - {}]",
strategy, simplificationLevel, range.getStart(), range.getEnd());
String sql = buildRangeQuery(tableName, request, range, simplificationLevel);
String sql = buildRangeQuery(tableName, request, range, simplificationLevel, viewportVesselIds);
// Connection을 직접 사용하여 스트리밍 처리
try (Connection conn = queryDataSource.getConnection();
@ -424,7 +483,11 @@ public class ChunkedTrackStreamingService {
ps.setTimestamp(paramIndex++, Timestamp.valueOf(range.getStart()));
ps.setTimestamp(paramIndex++, Timestamp.valueOf(range.getEnd()));
if (request.getViewport() != null) {
// 2-pass 뷰포트 필터: vessel ID 배열 또는 기존 viewport 좌표 바인딩
if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) {
Array vesselArray = conn.createArrayOf("text", viewportVesselIds.toArray(new String[0]));
ps.setArray(paramIndex++, vesselArray);
} else if (request.getViewport() != null) {
ViewportFilter vp = request.getViewport();
ps.setDouble(paramIndex++, vp.getMinLon());
ps.setDouble(paramIndex++, vp.getMinLat());
@ -467,12 +530,34 @@ public class ChunkedTrackStreamingService {
// 5min 테이블은 필드가 없을 있음
}
// 선박 객체는 geometry가 비어있어도 생성 (선박 누락 방지)
// 먼저 선박 객체 확보
VesselAccumulator accumulator = vesselMap.get(vesselId);
if (accumulator == null) {
vesselCount++; // 선박 추가 카운트
accumulator = new VesselAccumulator();
accumulator.sigSrcCd = sigSrcCd;
accumulator.targetId = targetId;
// 선박 정보 조회 (캐시 우선)
VesselInfo vesselInfo = getVesselInfo(sigSrcCd, targetId);
accumulator.shipName = vesselInfo.shipName;
accumulator.shipType = vesselInfo.shipType;
// shipKindCode 계산 (선박명 패턴 매칭 포함 - 어망/부이 판별)
accumulator.shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern(
sigSrcCd, vesselInfo.shipType, vesselInfo.shipName, targetId);
vesselMap.put(vesselId, accumulator);
}
if (trackGeomWkt != null && !trackGeomWkt.isEmpty() && !"LINESTRING EMPTY".equals(trackGeomWkt)) {
try {
LineString lineString = (LineString) wktReader.read(trackGeomWkt);
// LineString 처리
// LineString 건너뛰되, 선박 객체는 이미 생성됨
if (lineString.getNumPoints() == 0) {
trackCount++;
continue;
}
@ -522,32 +607,6 @@ public class ChunkedTrackStreamingService {
prevTimeMillis = currentTimeMillis;
}
// 선박별 데이터 병합
VesselAccumulator accumulator = vesselMap.get(vesselId);
if (accumulator == null) {
vesselCount++; // 선박 추가 카운트
accumulator = new VesselAccumulator();
accumulator.sigSrcCd = sigSrcCd;
accumulator.targetId = targetId;
// 선박 정보 조회 (캐시 우선)
VesselInfo vesselInfo = getVesselInfo(sigSrcCd, targetId);
accumulator.shipName = vesselInfo.shipName;
accumulator.shipType = vesselInfo.shipType;
// shipKindCode 계산 (선박명 패턴 매칭 포함 - 어망/부이 판별)
accumulator.shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern(
sigSrcCd, vesselInfo.shipType, vesselInfo.shipName, targetId);
// 테스트용 로그 - 처음 10개 선박만
// if (vesselCount <= 10) {
// log.info("[VESSEL_INFO] {} - Name: {}, Type: {}",
// vesselId, vesselInfo.shipName, vesselInfo.shipType);
// }
vesselMap.put(vesselId, accumulator);
}
// 데이터 직접 추가 (성능 개선)
accumulator.geometry.addAll(geometry);
accumulator.timestamps.addAll(timestamps);
@ -649,6 +708,9 @@ public class ChunkedTrackStreamingService {
log.info("Query {} using strategies: {}", queryId, strategyMap.keySet());
// 2-pass 뷰포트 필터링: 전체 시간 범위에서 뷰포트 교차 선박 수집
Set<String> viewportVesselIds = collectViewportVesselIds(strategyMap, request);
List<TrackChunkResponse> responses = new ArrayList<>();
int globalChunkIndex = 0;
Set<String> uniqueVesselIds = new HashSet<>();
@ -665,7 +727,7 @@ public class ChunkedTrackStreamingService {
for (TimeRange range : ranges) {
try {
// 선박 기준 페이지네이션으로 모든 데이터 수집 병합
List<CompactVesselTrack> compactTracks = processDailyTableWithPagination(request, range, null);
List<CompactVesselTrack> compactTracks = processDailyTableWithPagination(request, range, null, viewportVesselIds);
if (!compactTracks.isEmpty()) {
// 메시지 크기로 분할
@ -704,7 +766,7 @@ public class ChunkedTrackStreamingService {
for (TimeChunk chunk : chunks) {
try {
List<CompactVesselTrack> compactTracks = processTableRange(request, strategy,
new TimeRange(chunk.start, chunk.end));
new TimeRange(chunk.start, chunk.end), null, viewportVesselIds);
if (!compactTracks.isEmpty()) {
// 메시지 크기로 분할 (5min/hourly에도 적용)
@ -797,6 +859,9 @@ public class ChunkedTrackStreamingService {
estimatedTotalMinutes = (int)Duration.between(request.getStartTime(), request.getEndTime()).toMinutes();
log.info("Total time range: {} minutes", estimatedTotalMinutes);
// 2-pass 뷰포트 필터링: 전체 시간 범위에서 뷰포트 교차 선박 수집
Set<String> viewportVesselIds = collectViewportVesselIds(strategyMap, request);
int globalChunkIndex = 0;
int totalVessels = 0;
Set<String> uniqueVesselIds = new HashSet<>();
@ -811,7 +876,7 @@ public class ChunkedTrackStreamingService {
if (strategy == TableStrategy.DAILY) {
// Daily는 기존 방식 유지 (이미 단위)
processDailyStrategy(ranges, request, queryId, chunkConsumer, statusConsumer,
globalChunkIndex, uniqueVesselIds);
globalChunkIndex, uniqueVesselIds, viewportVesselIds);
globalChunkIndex = getCurrentChunkIndex();
} else {
// Hourly/5min은 6시간 단위로 그룹화하여 처리
@ -834,7 +899,7 @@ public class ChunkedTrackStreamingService {
}
List<CompactVesselTrack> compactTracks = processTableRangeWithBaseTime(
request, strategy, range, baseTime);
request, strategy, range, baseTime, viewportVesselIds);
// 선박별로 볕합
for (CompactVesselTrack track : compactTracks) {
@ -1533,9 +1598,10 @@ public class ChunkedTrackStreamingService {
}
/**
* 쿼리 생성 (간소화 적용)
* 쿼리 생성 (간소화 적용, 2-pass 뷰포트 필터링 지원)
*/
private String buildRangeQuery(String tableName, TrackQueryRequest request, TimeRange range, SimplificationLevel simplificationLevel) {
private String buildRangeQuery(String tableName, TrackQueryRequest request, TimeRange range,
SimplificationLevel simplificationLevel, Set<String> viewportVesselIds) {
StringBuilder sql = new StringBuilder();
sql.append("SELECT sig_src_cd, target_id, time_bucket, ");
@ -1562,8 +1628,10 @@ public class ChunkedTrackStreamingService {
sql.append("WHERE time_bucket >= ? ");
sql.append("AND time_bucket < ? ");
// Viewport 필터 - 파라미터 바인딩 사용
if (request.getViewport() != null) {
// 2-pass 뷰포트 필터: vessel ID 기반 필터 (Pass 2) 또는 기존 viewport 필터
if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) {
sql.append("AND sig_src_cd || '_' || target_id = ANY(?) ");
} else if (request.getViewport() != null) {
sql.append("AND public.ST_Intersects(track_geom, public.ST_MakeEnvelope(?, ?, ?, ?, 4326)) ");
}
@ -1586,7 +1654,7 @@ public class ChunkedTrackStreamingService {
* Daily 테이블용 페이지네이션 쿼리 생성
*/
private String buildDailyPaginationQuery(String tableName, TrackQueryRequest request, TimeRange range,
double tolerance, String lastSigSrcCd, String lastTargetId) {
double tolerance, String lastSigSrcCd, String lastTargetId, Set<String> viewportVesselIds) {
StringBuilder sql = new StringBuilder();
sql.append("SELECT sig_src_cd, target_id, time_bucket, ");
@ -1611,8 +1679,10 @@ public class ChunkedTrackStreamingService {
sql.append("AND (sig_src_cd, target_id) > (?, ?) ");
}
// Viewport 필터
if (request.getViewport() != null) {
// 2-pass 뷰포트 필터: vessel ID 기반 필터 (Pass 2) 또는 기존 viewport 필터
if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) {
sql.append("AND sig_src_cd || '_' || target_id = ANY(?) ");
} else if (request.getViewport() != null) {
sql.append("AND public.ST_Intersects(track_geom, public.ST_MakeEnvelope(?, ?, ?, ?, 4326)) ");
}
@ -1634,6 +1704,10 @@ public class ChunkedTrackStreamingService {
* Daily 테이블 처리 - 선박 기준 페이지네이션으로 모든 데이터 수집 병합
*/
private List<CompactVesselTrack> processDailyTableWithPagination(TrackQueryRequest request, TimeRange range, String sessionId) {
return processDailyTableWithPagination(request, range, sessionId, null);
}
private List<CompactVesselTrack> processDailyTableWithPagination(TrackQueryRequest request, TimeRange range, String sessionId, Set<String> viewportVesselIds) {
Map<String, VesselAccumulator> vesselMap = new HashMap<>(50000); // 예상 선박
String tableName = TableStrategy.DAILY.getTableName();
@ -1649,7 +1723,7 @@ public class ChunkedTrackStreamingService {
// 페이지네이션 루프
while (true) {
String sql = buildDailyPaginationQuery(tableName, request, range, tolerance, lastSigSrcCd, lastTargetId);
String sql = buildDailyPaginationQuery(tableName, request, range, tolerance, lastSigSrcCd, lastTargetId, viewportVesselIds);
try (Connection conn = queryDataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
@ -1664,8 +1738,11 @@ public class ChunkedTrackStreamingService {
ps.setString(paramIndex++, lastTargetId);
}
// Viewport 필터 파라미터
if (request.getViewport() != null) {
// 2-pass 뷰포트 필터: vessel ID 배열 또는 기존 viewport 좌표 바인딩
if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) {
Array vesselArray = conn.createArrayOf("text", viewportVesselIds.toArray(new String[0]));
ps.setArray(paramIndex++, vesselArray);
} else if (request.getViewport() != null) {
ViewportFilter vp = request.getViewport();
ps.setDouble(paramIndex++, vp.getMinLon());
ps.setDouble(paramIndex++, vp.getMinLat());
@ -1705,10 +1782,27 @@ public class ChunkedTrackStreamingService {
endTimeStr = rs.getString("end_time");
} catch (SQLException ignored) {}
// 선박 객체는 geometry가 비어있어도 생성 (선박 누락 방지)
final String finalSigSrcCd = currentSigSrcCd;
final String finalTargetId = currentTargetId;
VesselAccumulator accumulator = vesselMap.computeIfAbsent(vesselId, k -> {
VesselInfo info = getVesselInfo(finalSigSrcCd, finalTargetId);
VesselAccumulator acc = new VesselAccumulator();
acc.sigSrcCd = finalSigSrcCd;
acc.targetId = finalTargetId;
acc.shipName = info.shipName;
acc.shipType = info.shipType;
// shipKindCode 계산 (선박명 패턴 매칭 포함 - 어망/부이 판별)
acc.shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern(
finalSigSrcCd, info.shipType, info.shipName, finalTargetId);
return acc;
});
if (trackGeomWkt != null && !trackGeomWkt.isEmpty() && !"LINESTRING EMPTY".equals(trackGeomWkt)) {
try {
LineString lineString = (LineString) wktReader.read(trackGeomWkt);
// LineString은 건너뛰되, 선박 객체는 이미 생성됨
if (lineString.getNumPoints() == 0) {
pageTrackCount++;
continue;
@ -1754,22 +1848,6 @@ public class ChunkedTrackStreamingService {
prevTimeMillis = currentTimeMillis;
}
// VesselAccumulator에 누적
final String finalSigSrcCd = currentSigSrcCd;
final String finalTargetId = currentTargetId;
VesselAccumulator accumulator = vesselMap.computeIfAbsent(vesselId, k -> {
VesselInfo info = getVesselInfo(finalSigSrcCd, finalTargetId);
VesselAccumulator acc = new VesselAccumulator();
acc.sigSrcCd = finalSigSrcCd;
acc.targetId = finalTargetId;
acc.shipName = info.shipName;
acc.shipType = info.shipType;
// shipKindCode 계산 (선박명 패턴 매칭 포함 - 어망/부이 판별)
acc.shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern(
finalSigSrcCd, info.shipType, info.shipName, finalTargetId);
return acc;
});
accumulator.geometry.addAll(geometry);
accumulator.timestamps.addAll(timestamps);
accumulator.speeds.addAll(speeds);
@ -2068,10 +2146,11 @@ public class ChunkedTrackStreamingService {
* 기준 시간을 사용하여 테이블 범위 처리 (M값 재계산)
*/
private List<CompactVesselTrack> processTableRangeWithBaseTime(
TrackQueryRequest request, TableStrategy strategy, TimeRange range, LocalDateTime dayBaseTime) {
TrackQueryRequest request, TableStrategy strategy, TimeRange range, LocalDateTime dayBaseTime,
Set<String> viewportVesselIds) {
// 기본 처리 M값 보정
List<CompactVesselTrack> tracks = processTableRange(request, strategy, range);
List<CompactVesselTrack> tracks = processTableRange(request, strategy, range, null, viewportVesselIds);
// 트랙의 timestamp를 dayBaseTime 기준으로 재계산 간소화
for (CompactVesselTrack track : tracks) {
@ -2215,7 +2294,8 @@ public class ChunkedTrackStreamingService {
Consumer<TrackChunkResponse> chunkConsumer,
Consumer<QueryStatusUpdate> statusConsumer,
int startChunkIndex,
Set<String> uniqueVesselIds) throws Exception {
Set<String> uniqueVesselIds,
Set<String> viewportVesselIds) throws Exception {
currentGlobalChunkIndex = startChunkIndex;
@ -2225,7 +2305,7 @@ public class ChunkedTrackStreamingService {
for (TimeRange range : ranges) {
// 페이지별 즉시 스트리밍으로 처리 (세션 캐시 공유)
streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache);
streamDailyTableWithPagination(request, range, queryId, chunkConsumer, statusConsumer, uniqueVesselIds, sessionVesselCache, viewportVesselIds);
}
log.info("Session vessel cache final size: {} vessels cached", sessionVesselCache.size());
@ -2238,7 +2318,8 @@ public class ChunkedTrackStreamingService {
Consumer<TrackChunkResponse> chunkConsumer,
Consumer<QueryStatusUpdate> statusConsumer,
Set<String> uniqueVesselIds,
Map<String, VesselInfo> sessionVesselCache) {
Map<String, VesselInfo> sessionVesselCache,
Set<String> viewportVesselIds) {
String tableName = TableStrategy.DAILY.getTableName();
// 레벨에 따른 강화된 간소화 tolerance
@ -2265,7 +2346,7 @@ public class ChunkedTrackStreamingService {
}
Map<String, VesselAccumulator> pageVesselMap = new HashMap<>(20000);
String sql = buildDailyPaginationQuery(tableName, request, range, tolerance, lastSigSrcCd, lastTargetId);
String sql = buildDailyPaginationQuery(tableName, request, range, tolerance, lastSigSrcCd, lastTargetId, viewportVesselIds);
try (Connection conn = queryDataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
@ -2280,8 +2361,11 @@ public class ChunkedTrackStreamingService {
ps.setString(paramIndex++, lastTargetId);
}
// Viewport 필터 파라미터
if (request.getViewport() != null) {
// 2-pass 뷰포트 필터: vessel ID 배열 또는 기존 viewport 좌표 바인딩
if (viewportVesselIds != null && !viewportVesselIds.isEmpty()) {
Array vesselArray = conn.createArrayOf("text", viewportVesselIds.toArray(new String[0]));
ps.setArray(paramIndex++, vesselArray);
} else if (request.getViewport() != null) {
ViewportFilter vp = request.getViewport();
ps.setDouble(paramIndex++, vp.getMinLon());
ps.setDouble(paramIndex++, vp.getMinLat());
@ -2355,10 +2439,32 @@ public class ChunkedTrackStreamingService {
double distanceNm = Double.parseDouble(trackData[6]);
double maxSpeed = Double.parseDouble(trackData[7]);
// 선박 객체는 geometry가 비어있어도 생성 (선박 누락 방지)
final String finalSigSrcCd = currentSigSrcCd;
final String finalTargetId = currentTargetId;
final String finalVesselId = vesselId;
VesselAccumulator accumulator = pageVesselMap.computeIfAbsent(vesselId, k -> {
// 세션 캐시에서 조회 (이미 preload됨)
VesselInfo info = sessionVesselCache.get(finalVesselId);
if (info == null) {
info = new VesselInfo(null, null);
}
VesselAccumulator acc = new VesselAccumulator();
acc.sigSrcCd = finalSigSrcCd;
acc.targetId = finalTargetId;
acc.shipName = info.shipName;
acc.shipType = info.shipType;
// shipKindCode 계산 (선박명 패턴 매칭 포함 - 어망/부이 판별)
acc.shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern(
finalSigSrcCd, info.shipType, info.shipName, finalTargetId);
return acc;
});
if (trackGeomWkt != null && !trackGeomWkt.isEmpty() && !"LINESTRING EMPTY".equals(trackGeomWkt)) {
try {
LineString lineString = (LineString) wktReader.read(trackGeomWkt);
// LineString은 건너뛰되, 선박 객체는 이미 생성됨
if (lineString.getNumPoints() == 0) {
pageTrackCount++;
continue;
@ -2395,27 +2501,6 @@ public class ChunkedTrackStreamingService {
prevTimeMillis = currentTimeMillis;
}
// VesselAccumulator에 누적 (세션 캐시에서 조회 - 이미 배치로 로드됨)
final String finalSigSrcCd = currentSigSrcCd;
final String finalTargetId = currentTargetId;
final String finalVesselId = vesselId;
VesselAccumulator accumulator = pageVesselMap.computeIfAbsent(vesselId, k -> {
// 세션 캐시에서 조회 (이미 preload됨)
VesselInfo info = sessionVesselCache.get(finalVesselId);
if (info == null) {
info = new VesselInfo(null, null);
}
VesselAccumulator acc = new VesselAccumulator();
acc.sigSrcCd = finalSigSrcCd;
acc.targetId = finalTargetId;
acc.shipName = info.shipName;
acc.shipType = info.shipType;
// shipKindCode 계산 (선박명 패턴 매칭 포함 - 어망/부이 판별)
acc.shipKindCode = ShipKindCodeConverter.getShipKindCodeWithNamePattern(
finalSigSrcCd, info.shipType, info.shipName, finalTargetId);
return acc;
});
accumulator.geometry.addAll(geometry);
accumulator.timestamps.addAll(timestamps);
accumulator.speeds.addAll(speeds);

파일 보기

@ -32,42 +32,39 @@ spring:
datasource:
# 수집 DB
collect:
url: ${COLLECT_DB_URL:jdbc:postgresql://localhost:5432/wingdb?stringtype=unspecified&currentSchema=signal&TimeZone=Asia/Seoul}
jdbc-url: ${COLLECT_DB_URL:jdbc:postgresql://localhost:5432/wingdb?stringtype=unspecified&currentSchema=signal&TimeZone=Asia/Seoul}
username: ${COLLECT_DB_USER:collect_user}
password: ${COLLECT_DB_PASS:collect_pass}
driver-class-name: org.postgresql.Driver
hikari:
pool-name: CollectHikariPool
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
maximum-pool-size: 20
minimum-idle: 5
pool-name: CollectHikariPool
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
maximum-pool-size: 20
minimum-idle: 5
# 조회 DB
query:
url: ${QUERY_DB_URL:jdbc:postgresql://localhost:5432/wingdb?stringtype=unspecified&currentSchema=signal&TimeZone=Asia/Seoul}
jdbc-url: ${QUERY_DB_URL:jdbc:postgresql://localhost:5432/wingdb?stringtype=unspecified&currentSchema=signal&TimeZone=Asia/Seoul}
username: ${QUERY_DB_USER:query_user}
password: ${QUERY_DB_PASS:query_pass}
driver-class-name: org.postgresql.Driver
hikari:
pool-name: QueryHikariPool
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
maximum-pool-size: 30
minimum-idle: 10
pool-name: QueryHikariPool
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
maximum-pool-size: 30
minimum-idle: 10
# 배치 메타데이터 DB
batch:
url: ${BATCH_DB_URL:jdbc:postgresql://localhost:5432/wingdb?stringtype=unspecified&currentSchema=signal&TimeZone=Asia/Seoul}
jdbc-url: ${BATCH_DB_URL:jdbc:postgresql://localhost:5432/wingdb?stringtype=unspecified&currentSchema=signal&TimeZone=Asia/Seoul}
username: ${BATCH_DB_USER:batch_user}
password: ${BATCH_DB_PASS:batch_pass}
driver-class-name: org.postgresql.Driver
hikari:
pool-name: BatchHikariPool
maximum-pool-size: 10
minimum-idle: 2
pool-name: BatchHikariPool
maximum-pool-size: 10
minimum-idle: 2
# 로깅 설정
logging:
@ -84,8 +81,10 @@ logging:
file: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
file:
name: ${LOG_PATH:logs}/vessel-batch.log
max-size: 100MB
max-history: 30
logback:
rollingpolicy:
max-file-size: 100MB
max-history: 30
# 액추에이터 설정
management: