Merge pull request 'feat: 다중구역/STS API 최적화 + ChnPrmShip 전용 필터' (#103) from feature/multi-zone-optimization into develop

This commit is contained in:
htlee 2026-03-13 10:18:06 +09:00
커밋 f405149340
17개의 변경된 파일461개의 추가작업 그리고 297개의 파일을 삭제

파일 보기

@ -4,6 +4,13 @@
## [Unreleased]
### 추가
- 다중구역/STS API 최적화 — AreaSearch/VesselContact 동시성·메모리 관리 통합, 순차 통과 SQL 동적 N-구역(2~10) 확장, chnPrmShipOnly 파라미터 추가
### 변경
- 성능 최적화 — ArrayList 사전 할당, JTS Coordinate 재사용, equirectangular 거리 근사, stream→단일 루프 전환
- DataPipeline 대시보드 차트 시각화 개선
## [2026-03-10.2]
### 추가

파일 보기

@ -207,6 +207,7 @@ export interface QueryMetricRow {
simplify_ms: number
cache_hit_days: number
db_query_days: number
client_ip: string | null
}
export interface QueryMetricsPage {

파일 보기

@ -198,6 +198,8 @@ const en = {
'metrics.allTypes': 'All',
'metrics.allPaths': 'All',
'metrics.resetFilters': 'Reset Filters',
'metrics.responseSize': 'Size',
'metrics.clientIp': 'IP',
// Time Range
'range.1d': '1D',

파일 보기

@ -198,6 +198,8 @@ const ko = {
'metrics.allTypes': '전체',
'metrics.allPaths': '전체',
'metrics.resetFilters': '필터 초기화',
'metrics.responseSize': '응답 크기',
'metrics.clientIp': 'IP',
// Time Range
'range.1d': '1일',

파일 보기

@ -6,7 +6,7 @@ import { monitorApi } from '../api/monitorApi.ts'
import type { MetricsSummary, CacheStats, ProcessingDelay, CacheDetails, QueryMetricsPage, QueryMetricsSummary, QueryMetricsParams, QueryMetricRow } from '../api/types.ts'
import MetricCard from '../components/charts/MetricCard.tsx'
import DataTable, { type Column } from '../components/common/DataTable.tsx'
import { formatNumber } from '../utils/formatters.ts'
import { formatNumber, formatBytes } from '../utils/formatters.ts'
const POLL_INTERVAL = 10_000
const QUERY_POLL_INTERVAL = 30_000
@ -68,8 +68,16 @@ export default function ApiMetrics() {
{
key: 'created_at', label: t('metrics.queryTime'), sortable: false,
render: (row) => {
const ts = row.created_at ?? ''
return ts.length >= 19 ? ts.substring(5, 19) : ts
if (!row.created_at) return '-'
const d = new Date(row.created_at)
// UTC → KST (+9h)
const kst = new Date(d.getTime() + 9 * 60 * 60 * 1000)
const mm = String(kst.getUTCMonth() + 1).padStart(2, '0')
const dd = String(kst.getUTCDate()).padStart(2, '0')
const hh = String(kst.getUTCHours()).padStart(2, '0')
const mi = String(kst.getUTCMinutes()).padStart(2, '0')
const ss = String(kst.getUTCSeconds()).padStart(2, '0')
return `${mm}-${dd} ${hh}:${mi}:${ss}`
},
},
{
@ -120,6 +128,14 @@ export default function ApiMetrics() {
return <span className={`font-mono font-medium ${color}`}>{ms < 1000 ? `${ms}ms` : `${(ms / 1000).toFixed(1)}s`}</span>
},
},
{
key: 'response_bytes', label: t('metrics.responseSize'), align: 'right' as const, sortable: false,
render: (row) => row.response_bytes ? formatBytes(row.response_bytes) : '-',
},
{
key: 'client_ip', label: t('metrics.clientIp'), sortable: false,
render: (row) => row.client_ip ? <span className="font-mono text-xs">{row.client_ip}</span> : '-',
},
]
return (

파일 보기

@ -46,7 +46,7 @@ public class AisTargetCacheManager {
@Value("${app.cache.ais-target.ttl-minutes:120}")
private long ttlMinutes;
@Value("${app.cache.ais-target.max-size:300000}")
@Value("${app.cache.ais-target.max-size:500000}")
private int maxSize;
@PostConstruct

파일 보기

@ -6,6 +6,7 @@ import gc.mda.signal_batch.domain.gis.dto.VesselContactRequest;
import gc.mda.signal_batch.domain.gis.dto.VesselContactResponse;
import gc.mda.signal_batch.domain.gis.service.AreaSearchService;
import gc.mda.signal_batch.domain.gis.service.VesselContactService;
import gc.mda.signal_batch.global.exception.QueryTimeoutException;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.ExampleObject;
@ -219,4 +220,11 @@ public class AreaSearchController {
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
.body(Map.of("error", e.getMessage()));
}
@ExceptionHandler(QueryTimeoutException.class)
public ResponseEntity<Map<String, String>> handleQueryTimeout(QueryTimeoutException e) {
log.warn("Area search query timeout: {}", e.getMessage());
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
.body(Map.of("error", e.getMessage()));
}
}

파일 보기

@ -42,6 +42,10 @@ public class AreaSearchRequest {
@Schema(description = "탐색 대상 폴리곤 영역 목록 (1~10개)", requiredMode = Schema.RequiredMode.REQUIRED)
private List<SearchPolygon> polygons;
@Schema(description = "true 시 중국허가선박(~1,400척)만 분석 대상으로 필터링", example = "false")
@Builder.Default
private boolean chnPrmShipOnly = false;
@Schema(description = "검색 모드 (폴리곤이 2개 이상일 때 적용)")
public enum SearchMode {
@Schema(description = "합집합: 어느 한 영역이라도 통과한 선박")

파일 보기

@ -47,6 +47,10 @@ public class VesselContactRequest {
@Schema(description = "최대 접촉 판정 거리 (미터, 50~5000)", example = "1000", requiredMode = Schema.RequiredMode.REQUIRED)
private Double maxContactDistanceMeters;
@Schema(description = "true 시 중국허가선박만 대상으로 접촉 분석", example = "false")
@Builder.Default
private boolean chnPrmShipOnly = false;
@Data
@Builder
@NoArgsConstructor

파일 보기

@ -16,10 +16,10 @@ import java.util.List;
@Schema(description = "비정상 접촉 선박 탐색 응답")
public class VesselContactResponse {
@Schema(description = "접촉 선박 쌍 목록")
@Schema(description = "접촉 선박 쌍 목록 — 동일 선박 쌍이 시간 갭(20분 이상)으로 분리된 여러 접촉 세그먼트를 가질 수 있음")
private List<VesselContactPair> contacts;
@Schema(description = "관련 선박의 전체 기간 항적 (CompactVesselTrack)")
@Schema(description = "관련 선박의 전체 기간 항적 — 선박당 1건으로 중복 제거됨 (CompactVesselTrack)")
private List<CompactVesselTrack> tracks;
@Schema(description = "탐색 요약 정보")

파일 보기

@ -6,9 +6,14 @@ import gc.mda.signal_batch.domain.gis.dto.AreaSearchRequest.SearchPolygon;
import gc.mda.signal_batch.domain.gis.dto.AreaSearchResponse;
import gc.mda.signal_batch.domain.gis.dto.AreaSearchResponse.AreaSearchSummary;
import gc.mda.signal_batch.domain.gis.dto.AreaSearchResponse.PolygonHitDetail;
import gc.mda.signal_batch.batch.reader.ChnPrmShipProperties;
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
import gc.mda.signal_batch.global.exception.QueryTimeoutException;
import gc.mda.signal_batch.global.util.TrackMemoryEstimator;
import gc.mda.signal_batch.global.websocket.service.ActiveQueryManager;
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager.DailyTrackData;
import gc.mda.signal_batch.global.websocket.service.TrackMemoryBudgetManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.locationtech.jts.geom.*;
@ -28,6 +33,9 @@ import java.util.stream.Collectors;
public class AreaSearchService {
private final DailyTrackCacheManager cacheManager;
private final ActiveQueryManager activeQueryManager;
private final TrackMemoryBudgetManager memoryBudgetManager;
private final ChnPrmShipProperties chnPrmShipProperties;
private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory();
/**
@ -45,82 +53,115 @@ public class AreaSearchService {
return buildEmptyResponse(request, startMs);
}
// 3. 다일 데이터 선박별 단일 트랙 병합
Map<String, CompactVesselTrack> mergedTracks = mergeMultipleDays(targetDates);
if (mergedTracks.isEmpty()) {
return buildEmptyResponse(request, startMs);
// 3. 동시성·메모리 관리 (데이터 로딩 슬롯/예산 확보)
String queryId = "area-search-" + Long.toHexString(System.nanoTime());
boolean slotAcquired = false, memoryReserved = false;
try {
if (!activeQueryManager.tryAcquireQuerySlotImmediate(queryId)) {
if (!activeQueryManager.tryAcquireQuerySlot(queryId)) {
throw new QueryTimeoutException("서버 과부하: area-search 슬롯 대기 타임아웃");
}
}
slotAcquired = true;
long estimatedBytes = TrackMemoryEstimator.estimateQueryBytes(targetDates.size(), 2000);
memoryBudgetManager.reserveQueryMemory(queryId, estimatedBytes, 30_000L);
memoryReserved = true;
// 4. 다일 데이터 선박별 단일 트랙 병합
Map<String, CompactVesselTrack> mergedTracks = mergeMultipleDays(targetDates);
if (mergedTracks.isEmpty()) {
return buildEmptyResponse(request, startMs);
}
// 4-1. ChnPrmShip 필터링
if (request.isChnPrmShipOnly()) {
int totalBefore = mergedTracks.size();
Set<String> chnPrmMmsiSet = chnPrmShipProperties.getMmsiSet();
mergedTracks.entrySet().removeIf(e -> !chnPrmMmsiSet.contains(e.getKey()));
log.debug("ChnPrmShip 필터 적용: {} → {} 선박", totalBefore, mergedTracks.size());
if (mergedTracks.isEmpty()) {
return buildEmptyResponse(request, startMs);
}
}
// 5. 좌표 JTS Polygon 변환
List<Polygon> jtsPolygons = convertToJtsPolygons(request.getPolygons());
// 6. 병합된 트랙으로 STRtree 빌드
STRtree spatialIndex = buildSpatialIndex(mergedTracks);
// 7. 폴리곤별 히트 선박 + 개별 방문(trip) 수집
List<Map<String, List<PolygonHitDetail>>> perPolygonHits = new ArrayList<>();
for (int i = 0; i < jtsPolygons.size(); i++) {
Polygon polygon = jtsPolygons.get(i);
SearchPolygon searchPolygon = request.getPolygons().get(i);
Map<String, List<PolygonHitDetail>> hits = findHitsForPolygon(
polygon, searchPolygon, mergedTracks, spatialIndex);
perPolygonHits.add(hits);
}
// 8. 모드별 결과 합산
SearchMode mode = request.getPolygons().size() == 1 ? SearchMode.ANY : request.getMode();
Map<String, List<PolygonHitDetail>> resultHits;
switch (mode) {
case ALL:
resultHits = processAllMode(perPolygonHits);
break;
case SEQUENTIAL:
resultHits = processSequentialMode(perPolygonHits);
break;
default:
resultHits = processAnyMode(perPolygonHits);
break;
}
// 9. 결과 선박의 전체 기간 트랙 + 히트 메타 반환
List<CompactVesselTrack> resultTracks = resultHits.keySet().stream()
.map(mergedTracks::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
long totalPoints = resultHits.values().stream()
.flatMap(Collection::stream)
.mapToLong(h -> h.getHitPointCount() != null ? h.getHitPointCount() : 0)
.sum();
int totalCachedVessels = targetDates.stream()
.mapToInt(d -> {
DailyTrackData data = cacheManager.getDailyTrackData(d);
return data != null ? data.getVesselCount() : 0;
})
.sum();
long elapsedMs = System.currentTimeMillis() - startMs;
log.info("Area search completed: mode={}, polygons={}, hitVessels={}, totalPoints={}, chnPrmOnly={}, elapsed={}ms",
mode, request.getPolygons().size(), resultHits.size(), totalPoints, request.isChnPrmShipOnly(), elapsedMs);
return AreaSearchResponse.builder()
.tracks(resultTracks)
.hitDetails(resultHits)
.summary(AreaSearchSummary.builder()
.totalVessels(resultHits.size())
.totalPoints(totalPoints)
.mode(mode)
.polygonIds(request.getPolygons().stream()
.map(SearchPolygon::getId)
.collect(Collectors.toList()))
.processingTimeMs(elapsedMs)
.cachedDates(targetDates.stream()
.map(LocalDate::toString)
.collect(Collectors.toList()))
.totalCachedVessels(totalCachedVessels)
.build())
.build();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new QueryTimeoutException("area-search 슬롯 대기 중 인터럽트");
} finally {
if (memoryReserved) memoryBudgetManager.releaseQueryMemory(queryId);
if (slotAcquired) activeQueryManager.releaseQuerySlot(queryId);
}
// 4. 좌표 JTS Polygon 변환
List<Polygon> jtsPolygons = convertToJtsPolygons(request.getPolygons());
// 5. 병합된 트랙으로 STRtree 빌드
STRtree spatialIndex = buildSpatialIndex(mergedTracks);
// 6. 폴리곤별 히트 선박 + 개별 방문(trip) 수집
List<Map<String, List<PolygonHitDetail>>> perPolygonHits = new ArrayList<>();
for (int i = 0; i < jtsPolygons.size(); i++) {
Polygon polygon = jtsPolygons.get(i);
SearchPolygon searchPolygon = request.getPolygons().get(i);
Map<String, List<PolygonHitDetail>> hits = findHitsForPolygon(
polygon, searchPolygon, mergedTracks, spatialIndex);
perPolygonHits.add(hits);
}
// 7. 모드별 결과 합산
SearchMode mode = request.getPolygons().size() == 1 ? SearchMode.ANY : request.getMode();
Map<String, List<PolygonHitDetail>> resultHits;
switch (mode) {
case ALL:
resultHits = processAllMode(perPolygonHits);
break;
case SEQUENTIAL:
resultHits = processSequentialMode(perPolygonHits);
break;
default:
resultHits = processAnyMode(perPolygonHits);
break;
}
// 8. 결과 선박의 전체 기간 트랙 + 히트 메타 반환
List<CompactVesselTrack> resultTracks = resultHits.keySet().stream()
.map(mergedTracks::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
long totalPoints = resultHits.values().stream()
.flatMap(Collection::stream)
.mapToLong(h -> h.getHitPointCount() != null ? h.getHitPointCount() : 0)
.sum();
int totalCachedVessels = targetDates.stream()
.mapToInt(d -> {
DailyTrackData data = cacheManager.getDailyTrackData(d);
return data != null ? data.getVesselCount() : 0;
})
.sum();
long elapsedMs = System.currentTimeMillis() - startMs;
log.info("Area search completed: mode={}, polygons={}, hitVessels={}, totalPoints={}, elapsed={}ms",
mode, request.getPolygons().size(), resultHits.size(), totalPoints, elapsedMs);
return AreaSearchResponse.builder()
.tracks(resultTracks)
.hitDetails(resultHits)
.summary(AreaSearchSummary.builder()
.totalVessels(resultHits.size())
.totalPoints(totalPoints)
.mode(mode)
.polygonIds(request.getPolygons().stream()
.map(SearchPolygon::getId)
.collect(Collectors.toList()))
.processingTimeMs(elapsedMs)
.cachedDates(targetDates.stream()
.map(LocalDate::toString)
.collect(Collectors.toList()))
.totalCachedVessels(totalCachedVessels)
.build())
.build();
}
// 입력 검증
@ -244,9 +285,11 @@ public class AreaSearchService {
// 여러 날짜 병합
CompactVesselTrack first = trackList.get(0);
List<double[]> geo = new ArrayList<>();
List<String> ts = new ArrayList<>();
List<Double> sp = new ArrayList<>();
int totalPoints = trackList.stream()
.mapToInt(t -> t.getPointCount() != null ? t.getPointCount() : 0).sum();
List<double[]> geo = new ArrayList<>(totalPoints);
List<String> ts = new ArrayList<>(totalPoints);
List<Double> sp = new ArrayList<>(totalPoints);
double totalDist = 0;
double maxSpeed = 0;
int pointCount = 0;
@ -347,10 +390,13 @@ public class AreaSearchService {
long currentExit = 0;
int currentHitCount = 0;
int visitIndex = 0;
Coordinate reusable = new Coordinate();
for (int i = 0; i < geometry.size(); i++) {
double[] coord = geometry.get(i);
Point point = GEOMETRY_FACTORY.createPoint(new Coordinate(coord[0], coord[1]));
reusable.x = coord[0];
reusable.y = coord[1];
Point point = GEOMETRY_FACTORY.createPoint(reusable);
boolean isInside = prepared.contains(point);
if (isInside) {

파일 보기

@ -1,10 +1,15 @@
package gc.mda.signal_batch.domain.gis.service;
import gc.mda.signal_batch.batch.reader.ChnPrmShipProperties;
import gc.mda.signal_batch.domain.gis.dto.VesselContactRequest;
import gc.mda.signal_batch.domain.gis.dto.VesselContactResponse;
import gc.mda.signal_batch.domain.gis.dto.VesselContactResponse.*;
import gc.mda.signal_batch.domain.vessel.dto.CompactVesselTrack;
import gc.mda.signal_batch.global.exception.QueryTimeoutException;
import gc.mda.signal_batch.global.util.TrackMemoryEstimator;
import gc.mda.signal_batch.global.websocket.service.ActiveQueryManager;
import gc.mda.signal_batch.global.websocket.service.DailyTrackCacheManager;
import gc.mda.signal_batch.global.websocket.service.TrackMemoryBudgetManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.locationtech.jts.geom.*;
@ -24,6 +29,9 @@ public class VesselContactService {
private final AreaSearchService areaSearchService;
private final DailyTrackCacheManager cacheManager;
private final ActiveQueryManager activeQueryManager;
private final TrackMemoryBudgetManager memoryBudgetManager;
private final ChnPrmShipProperties chnPrmShipProperties;
private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory();
private static final double EARTH_RADIUS_M = 6_371_000.0;
@ -49,103 +57,133 @@ public class VesselContactService {
return buildEmptyResponse(request, targetDates, startMs);
}
Map<String, CompactVesselTrack> mergedTracks = areaSearchService.mergeMultipleDays(targetDates);
if (mergedTracks.isEmpty()) {
return buildEmptyResponse(request, targetDates, startMs);
}
// 3. 병합된 트랙을 직접 사용 (단일 수집원이므로 필터 불필요)
Map<String, CompactVesselTrack> filtered = mergedTracks;
// 4. JTS Polygon + PreparedGeometry
VesselContactRequest.SearchPolygon poly = request.getPolygon();
Polygon jtsPolygon = areaSearchService.toJtsPolygon(poly.getCoordinates());
PreparedGeometry prepared = PreparedGeometryFactory.prepare(jtsPolygon);
// 5. STRtree 후보 필터링 + 폴리곤 내부 포인트 수집
STRtree spatialIndex = areaSearchService.buildSpatialIndex(filtered);
Envelope mbr = jtsPolygon.getEnvelopeInternal();
@SuppressWarnings("unchecked")
List<String> candidates = spatialIndex.query(mbr);
long minDurationSec = request.getMinContactDurationMinutes() * 60L;
double maxDistanceMeters = request.getMaxContactDistanceMeters();
Map<String, List<InsidePosition>> insidePositions = new HashMap<>();
for (String vesselId : candidates) {
CompactVesselTrack track = filtered.get(vesselId);
if (track == null || track.getGeometry() == null) continue;
List<InsidePosition> inside = collectInsidePositions(track, prepared);
if (!inside.isEmpty()) {
insidePositions.put(vesselId, inside);
}
}
int totalVesselsInPolygon = insidePositions.size();
log.info("Vessel contact: filtered={}, insidePolygon={}, dates={}",
filtered.size(), totalVesselsInPolygon, targetDates.size());
// 6. 시간 범위 겹침 사전 필터 + 선박 쌍별 접촉 판정
List<String> vesselIds = new ArrayList<>(insidePositions.keySet());
List<VesselContactPair> contactPairs = new ArrayList<>();
Set<String> involvedVessels = new HashSet<>();
for (int i = 0; i < vesselIds.size(); i++) {
String idA = vesselIds.get(i);
List<InsidePosition> posA = insidePositions.get(idA);
long minTsA = posA.get(0).timestamp;
long maxTsA = posA.get(posA.size() - 1).timestamp;
for (int j = i + 1; j < vesselIds.size(); j++) {
String idB = vesselIds.get(j);
List<InsidePosition> posB = insidePositions.get(idB);
long minTsB = posB.get(0).timestamp;
long maxTsB = posB.get(posB.size() - 1).timestamp;
// 시간 겹침 사전 필터 (minContactDuration 반영)
long overlap = Math.min(maxTsA, maxTsB) - Math.max(minTsA, minTsB);
if (overlap < minDurationSec) continue;
// Two-pointer 접촉 판정
List<VesselContactPair> pairs = detectContacts(
idA, posA, idB, posB,
filtered.get(idA), filtered.get(idB),
minDurationSec, maxDistanceMeters);
if (!pairs.isEmpty()) {
contactPairs.addAll(pairs);
involvedVessels.add(idA);
involvedVessels.add(idB);
// 3. 동시성·메모리 관리
String queryId = "contact-search-" + Long.toHexString(System.nanoTime());
boolean slotAcquired = false, memoryReserved = false;
try {
if (!activeQueryManager.tryAcquireQuerySlotImmediate(queryId)) {
if (!activeQueryManager.tryAcquireQuerySlot(queryId)) {
throw new QueryTimeoutException("서버 과부하: contact-search 슬롯 대기 타임아웃");
}
}
slotAcquired = true;
long estimatedBytes = TrackMemoryEstimator.estimateQueryBytes(targetDates.size(), 2000);
memoryBudgetManager.reserveQueryMemory(queryId, estimatedBytes, 30_000L);
memoryReserved = true;
Map<String, CompactVesselTrack> mergedTracks = areaSearchService.mergeMultipleDays(targetDates);
if (mergedTracks.isEmpty()) {
return buildEmptyResponse(request, targetDates, startMs);
}
// 3-1. ChnPrmShip 필터링
if (request.isChnPrmShipOnly()) {
int totalBefore = mergedTracks.size();
Set<String> chnPrmMmsiSet = chnPrmShipProperties.getMmsiSet();
mergedTracks.entrySet().removeIf(e -> !chnPrmMmsiSet.contains(e.getKey()));
log.debug("ChnPrmShip 필터 적용: {} → {} 선박", totalBefore, mergedTracks.size());
if (mergedTracks.isEmpty()) {
return buildEmptyResponse(request, targetDates, startMs);
}
}
// 4. JTS Polygon + PreparedGeometry
VesselContactRequest.SearchPolygon poly = request.getPolygon();
Polygon jtsPolygon = areaSearchService.toJtsPolygon(poly.getCoordinates());
PreparedGeometry prepared = PreparedGeometryFactory.prepare(jtsPolygon);
// 5. STRtree 후보 필터링 + 폴리곤 내부 포인트 수집
STRtree spatialIndex = areaSearchService.buildSpatialIndex(mergedTracks);
Envelope mbr = jtsPolygon.getEnvelopeInternal();
@SuppressWarnings("unchecked")
List<String> candidates = spatialIndex.query(mbr);
long minDurationSec = request.getMinContactDurationMinutes() * 60L;
double maxDistanceMeters = request.getMaxContactDistanceMeters();
Map<String, List<InsidePosition>> insidePositions = new HashMap<>();
for (String vesselId : candidates) {
CompactVesselTrack track = mergedTracks.get(vesselId);
if (track == null || track.getGeometry() == null) continue;
List<InsidePosition> inside = collectInsidePositions(track, prepared);
if (!inside.isEmpty()) {
insidePositions.put(vesselId, inside);
}
}
int totalVesselsInPolygon = insidePositions.size();
log.info("Vessel contact: merged={}, insidePolygon={}, chnPrmOnly={}, dates={}",
mergedTracks.size(), totalVesselsInPolygon, request.isChnPrmShipOnly(), targetDates.size());
// 6. 시간 범위 겹침 사전 필터 + 선박 쌍별 접촉 판정
List<String> vesselIds = new ArrayList<>(insidePositions.keySet());
List<VesselContactPair> contactPairs = new ArrayList<>();
Set<String> involvedVessels = new HashSet<>();
for (int i = 0; i < vesselIds.size(); i++) {
String idA = vesselIds.get(i);
List<InsidePosition> posA = insidePositions.get(idA);
long minTsA = posA.get(0).timestamp;
long maxTsA = posA.get(posA.size() - 1).timestamp;
for (int j = i + 1; j < vesselIds.size(); j++) {
String idB = vesselIds.get(j);
List<InsidePosition> posB = insidePositions.get(idB);
long minTsB = posB.get(0).timestamp;
long maxTsB = posB.get(posB.size() - 1).timestamp;
// 시간 겹침 사전 필터 (minContactDuration 반영)
long overlap = Math.min(maxTsA, maxTsB) - Math.max(minTsA, minTsB);
if (overlap < minDurationSec) continue;
// Two-pointer 접촉 판정
List<VesselContactPair> pairs = detectContacts(
idA, posA, idB, posB,
mergedTracks.get(idA), mergedTracks.get(idB),
minDurationSec, maxDistanceMeters);
if (!pairs.isEmpty()) {
contactPairs.addAll(pairs);
involvedVessels.add(idA);
involvedVessels.add(idB);
}
}
}
// 7. 관련 선박 트랙 수집
List<CompactVesselTrack> resultTracks = involvedVessels.stream()
.map(mergedTracks::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
long elapsedMs = System.currentTimeMillis() - startMs;
log.info("Vessel contact completed: pairs={}, vessels={}, elapsed={}ms",
contactPairs.size(), involvedVessels.size(), elapsedMs);
return VesselContactResponse.builder()
.contacts(contactPairs)
.tracks(resultTracks)
.summary(VesselContactSummary.builder()
.totalContactPairs(contactPairs.size())
.totalVesselsInvolved(involvedVessels.size())
.totalVesselsInPolygon(totalVesselsInPolygon)
.processingTimeMs(elapsedMs)
.polygonId(poly.getId())
.cachedDates(targetDates.stream()
.map(LocalDate::toString)
.collect(Collectors.toList()))
.build())
.build();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new QueryTimeoutException("contact-search 슬롯 대기 중 인터럽트");
} finally {
if (memoryReserved) memoryBudgetManager.releaseQueryMemory(queryId);
if (slotAcquired) activeQueryManager.releaseQuerySlot(queryId);
}
// 7. 관련 선박 트랙 수집
List<CompactVesselTrack> resultTracks = involvedVessels.stream()
.map(mergedTracks::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
long elapsedMs = System.currentTimeMillis() - startMs;
log.info("Vessel contact completed: pairs={}, vessels={}, elapsed={}ms",
contactPairs.size(), involvedVessels.size(), elapsedMs);
return VesselContactResponse.builder()
.contacts(contactPairs)
.tracks(resultTracks)
.summary(VesselContactSummary.builder()
.totalContactPairs(contactPairs.size())
.totalVesselsInvolved(involvedVessels.size())
.totalVesselsInPolygon(totalVesselsInPolygon)
.processingTimeMs(elapsedMs)
.polygonId(poly.getId())
.cachedDates(targetDates.stream()
.map(LocalDate::toString)
.collect(Collectors.toList()))
.build())
.build();
}
// 입력 검증
@ -173,10 +211,13 @@ public class VesselContactService {
List<double[]> geometry = track.getGeometry();
List<String> timestamps = track.getTimestamps();
List<InsidePosition> inside = new ArrayList<>();
Coordinate reusable = new Coordinate();
for (int i = 0; i < geometry.size(); i++) {
double[] coord = geometry.get(i);
Point point = GEOMETRY_FACTORY.createPoint(new Coordinate(coord[0], coord[1]));
reusable.x = coord[0];
reusable.y = coord[1];
Point point = GEOMETRY_FACTORY.createPoint(reusable);
if (prepared.contains(point)) {
long ts = parseTimestamp(timestamps, i);
inside.add(new InsidePosition(ts, coord[0], coord[1]));
@ -232,7 +273,7 @@ public class VesselContactService {
long diff = Math.abs(a.timestamp - b.timestamp);
if (diff <= SYNC_TOLERANCE_SEC) {
double dist = haversineMeters(a.lat, a.lon, b.lat, b.lon);
double dist = equirectangularMeters(a.lat, a.lon, b.lat, b.lon);
long ts = Math.min(a.timestamp, b.timestamp) + diff / 2; // 중간 시각
matched.add(new MatchedPoint(ts, dist, a, b));
pA++;
@ -278,13 +319,19 @@ public class VesselContactService {
long contactEnd = segment.get(segment.size() - 1).timestamp;
long durationMin = (contactEnd - contactStart) / 60;
DoubleSummaryStatistics distStats = segment.stream()
.mapToDouble(p -> p.distanceMeters)
.summaryStatistics();
// 접촉 중심점 계산
double centerLon = segment.stream().mapToDouble(p -> (p.posA.lon + p.posB.lon) / 2).average().orElse(0);
double centerLat = segment.stream().mapToDouble(p -> (p.posA.lat + p.posB.lat) / 2).average().orElse(0);
// 단일 루프로 거리/중심점 동시 계산
double minDist = Double.MAX_VALUE, maxDist = 0, sumDist = 0;
double sumCenterLon = 0, sumCenterLat = 0;
for (MatchedPoint p : segment) {
if (p.distanceMeters < minDist) minDist = p.distanceMeters;
if (p.distanceMeters > maxDist) maxDist = p.distanceMeters;
sumDist += p.distanceMeters;
sumCenterLon += (p.posA.lon + p.posB.lon) / 2;
sumCenterLat += (p.posA.lat + p.posB.lat) / 2;
}
double avgDist = sumDist / segment.size();
double centerLon = sumCenterLon / segment.size();
double centerLat = sumCenterLat / segment.size();
// 선박의 접촉 구간 inside 포인트로 추정 속도 계산
double speedA = estimateAvgSpeed(insidePosA, contactStart, contactEnd);
@ -299,9 +346,9 @@ public class VesselContactService {
.contactStartTimestamp(contactStart)
.contactEndTimestamp(contactEnd)
.contactDurationMinutes(durationMin)
.minDistanceMeters(Math.round(distStats.getMin() * 10.0) / 10.0)
.avgDistanceMeters(Math.round(distStats.getAverage() * 10.0) / 10.0)
.maxDistanceMeters(Math.round(distStats.getMax() * 10.0) / 10.0)
.minDistanceMeters(Math.round(minDist * 10.0) / 10.0)
.avgDistanceMeters(Math.round(avgDist * 10.0) / 10.0)
.maxDistanceMeters(Math.round(maxDist * 10.0) / 10.0)
.contactCenterPoint(new double[]{
Math.round(centerLon * 1_000_000.0) / 1_000_000.0,
Math.round(centerLat * 1_000_000.0) / 1_000_000.0})
@ -407,16 +454,16 @@ public class VesselContactService {
return totalHours > 0 ? totalDistNm / totalHours : 0.0;
}
// Haversine 거리 계산
// 거리 계산
private double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
/**
* Equirectangular 근사 접촉 거리 판정용 (10km 이내 오차 < 0.1%)
* Haversine 대비 ~2배 빠름 (Math.cos 1회 + Math.sqrt 1회)
*/
private double equirectangularMeters(double lat1, double lon1, double lat2, double lon2) {
double dLat = Math.toRadians(lat2 - lat1);
double dLon = Math.toRadians(lon2 - lon1);
double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
+ Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
* Math.sin(dLon / 2) * Math.sin(dLon / 2);
double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
return EARTH_RADIUS_M * c;
double dLon = Math.toRadians(lon2 - lon1) * Math.cos(Math.toRadians((lat1 + lat2) / 2));
return EARTH_RADIUS_M * Math.sqrt(dLat * dLat + dLon * dLon);
}
private double haversineNm(double lat1, double lon1, double lat2, double lon2) {

파일 보기

@ -72,10 +72,10 @@ public class SequentialPassageController {
.collect(Collectors.toList());
results = trackingService.findSequentialGridPassages(
haeguNumbers, request.getStartTime(), request.getEndTime());
haeguNumbers, request.getStartTime(), request.getEndTime(), request.isChnPrmShipOnly());
} else {
results = trackingService.findSequentialAreaPassages(
request.getZoneIds(), request.getStartTime(), request.getEndTime());
request.getZoneIds(), request.getStartTime(), request.getEndTime(), request.isChnPrmShipOnly());
}
// 응답 구성

파일 보기

@ -58,6 +58,10 @@ public class SequentialPassageRequest {
@Builder.Default
private Boolean sequentialOnly = true;
@Schema(description = "true 시 중국허가선박만 대상으로 순차 통과 조회", example = "false")
@Builder.Default
private boolean chnPrmShipOnly = false;
public enum PassageType {
GRID, AREA
}

파일 보기

@ -1,5 +1,6 @@
package gc.mda.signal_batch.domain.passage.service;
import gc.mda.signal_batch.batch.reader.ChnPrmShipProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.JdbcTemplate;
@ -8,8 +9,10 @@ import org.springframework.stereotype.Service;
import javax.sql.DataSource;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* 순차 구역 통과 선박 조회 최적화 서비스
@ -22,120 +25,140 @@ import java.util.Map;
public class SequentialAreaTrackingService {
private final DataSource queryDataSource;
private final ChnPrmShipProperties chnPrmShipProperties;
public SequentialAreaTrackingService(@Qualifier("queryDataSource") DataSource queryDataSource) {
public SequentialAreaTrackingService(@Qualifier("queryDataSource") DataSource queryDataSource,
ChnPrmShipProperties chnPrmShipProperties) {
this.queryDataSource = queryDataSource;
this.chnPrmShipProperties = chnPrmShipProperties;
}
/**
* 순차적으로 지정된 구역들을 통과한 선박 조회 (Grid)
* 동적 N-구역 SQL JOIN 생성 (2~10개)
*/
public List<Map<String, Object>> findSequentialGridPassages(
List<Integer> haeguNumbers,
LocalDateTime startTime,
LocalDateTime endTime) {
LocalDateTime endTime,
boolean chnPrmShipOnly) {
int n = haeguNumbers.size();
if (n < 2 || n > 10) {
throw new IllegalArgumentException("구역은 2~10개까지 지정 가능합니다: " + n);
}
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
// MATERIALIZED CTE 사용으로 중간 결과 고정
String sql = """
WITH vessel_passages AS (
SELECT DISTINCT
mmsi,
haegu_no,
FIRST_VALUE(time_bucket) OVER (
PARTITION BY mmsi, haegu_no
ORDER BY time_bucket
) as entry_time,
LAST_VALUE(time_bucket) OVER (
PARTITION BY mmsi, haegu_no
ORDER BY time_bucket
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) as exit_time
FROM signal.t_grid_vessel_tracks
WHERE time_bucket BETWEEN ? AND ?
AND haegu_no = ANY(ARRAY[?]::integer[])
)
SELECT
v1.mmsi,
v1.entry_time as haegu1_entry,
v1.exit_time as haegu1_exit,
v2.entry_time as haegu2_entry,
v2.exit_time as haegu2_exit,
v3.entry_time as haegu3_entry,
v3.exit_time as haegu3_exit
FROM vessel_passages v1
JOIN vessel_passages v2 ON v1.mmsi = v2.mmsi
AND v2.haegu_no = ? AND v2.entry_time > v1.exit_time
JOIN vessel_passages v3 ON v2.mmsi = v3.mmsi
AND v3.haegu_no = ? AND v3.entry_time > v2.exit_time
WHERE v1.haegu_no = ?
ORDER BY v1.entry_time
""";
StringBuilder sql = new StringBuilder();
sql.append("WITH vessel_passages AS (\n");
sql.append(" SELECT DISTINCT mmsi, haegu_no,\n");
sql.append(" FIRST_VALUE(time_bucket) OVER (PARTITION BY mmsi, haegu_no ORDER BY time_bucket) as entry_time,\n");
sql.append(" LAST_VALUE(time_bucket) OVER (PARTITION BY mmsi, haegu_no ORDER BY time_bucket ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as exit_time\n");
sql.append(" FROM signal.t_grid_vessel_tracks\n");
sql.append(" WHERE time_bucket BETWEEN ? AND ?\n");
sql.append(" AND haegu_no = ANY(ARRAY[?]::integer[])\n");
if (chnPrmShipOnly) {
sql.append(" AND mmsi = ANY(ARRAY[?]::varchar[])\n");
}
sql.append(")\n");
return jdbcTemplate.queryForList(sql,
Timestamp.valueOf(startTime),
Timestamp.valueOf(endTime),
haeguNumbers.toArray(Integer[]::new),
haeguNumbers.get(1),
haeguNumbers.get(2),
haeguNumbers.get(0)
);
// SELECT 컬럼 동적 생성
sql.append("SELECT v1.mmsi");
for (int i = 1; i <= n; i++) {
sql.append(String.format(", v%d.entry_time as haegu%d_entry, v%d.exit_time as haegu%d_exit", i, i, i, i));
}
sql.append("\nFROM vessel_passages v1\n");
// JOIN 동적 생성 (v2~vN)
for (int i = 2; i <= n; i++) {
sql.append(String.format("JOIN vessel_passages v%d ON v%d.mmsi = v1.mmsi AND v%d.haegu_no = ? AND v%d.entry_time > v%d.exit_time\n",
i, i, i, i, i - 1));
}
sql.append("WHERE v1.haegu_no = ?\n");
sql.append("ORDER BY v1.entry_time");
// 파라미터 구성
List<Object> params = new ArrayList<>();
params.add(Timestamp.valueOf(startTime));
params.add(Timestamp.valueOf(endTime));
params.add(haeguNumbers.toArray(Integer[]::new));
if (chnPrmShipOnly) {
Set<String> mmsiSet = chnPrmShipProperties.getMmsiSet();
params.add(mmsiSet.toArray(String[]::new));
}
// v2~vN의 haegu_no 파라미터
for (int i = 1; i < n; i++) {
params.add(haeguNumbers.get(i));
}
// v1의 haegu_no WHERE 조건
params.add(haeguNumbers.get(0));
return jdbcTemplate.queryForList(sql.toString(), params.toArray());
}
/**
* 순차적으로 지정된 구역들을 통과한 선박 조회 (Area)
* 동적 N-구역 SQL JOIN 생성 (2~10개)
*/
public List<Map<String, Object>> findSequentialAreaPassages(
List<String> areaIds,
LocalDateTime startTime,
LocalDateTime endTime) {
LocalDateTime endTime,
boolean chnPrmShipOnly) {
int n = areaIds.size();
if (n < 2 || n > 10) {
throw new IllegalArgumentException("구역은 2~10개까지 지정 가능합니다: " + n);
}
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
String sql = """
WITH area_passages AS (
SELECT DISTINCT
mmsi,
area_id,
FIRST_VALUE(time_bucket) OVER (
PARTITION BY mmsi, area_id
ORDER BY time_bucket
) as entry_time,
LAST_VALUE(time_bucket) OVER (
PARTITION BY mmsi, area_id
ORDER BY time_bucket
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) as exit_time
FROM signal.t_area_vessel_tracks
WHERE time_bucket BETWEEN ? AND ?
AND area_id = ANY(ARRAY[?]::varchar[])
)
SELECT
a1.mmsi,
a1.entry_time as area1_entry,
a1.exit_time as area1_exit,
a2.entry_time as area2_entry,
a2.exit_time as area2_exit,
a3.entry_time as area3_entry,
a3.exit_time as area3_exit
FROM area_passages a1
JOIN area_passages a2 ON a1.mmsi = a2.mmsi
AND a2.area_id = ? AND a2.entry_time > a1.exit_time
JOIN area_passages a3 ON a2.mmsi = a3.mmsi
AND a3.area_id = ? AND a3.entry_time > a2.exit_time
WHERE a1.area_id = ?
ORDER BY a1.entry_time
""";
StringBuilder sql = new StringBuilder();
sql.append("WITH area_passages AS (\n");
sql.append(" SELECT DISTINCT mmsi, area_id,\n");
sql.append(" FIRST_VALUE(time_bucket) OVER (PARTITION BY mmsi, area_id ORDER BY time_bucket) as entry_time,\n");
sql.append(" LAST_VALUE(time_bucket) OVER (PARTITION BY mmsi, area_id ORDER BY time_bucket ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as exit_time\n");
sql.append(" FROM signal.t_area_vessel_tracks\n");
sql.append(" WHERE time_bucket BETWEEN ? AND ?\n");
sql.append(" AND area_id = ANY(ARRAY[?]::varchar[])\n");
if (chnPrmShipOnly) {
sql.append(" AND mmsi = ANY(ARRAY[?]::varchar[])\n");
}
sql.append(")\n");
return jdbcTemplate.queryForList(sql,
Timestamp.valueOf(startTime),
Timestamp.valueOf(endTime),
areaIds.toArray(String[]::new),
areaIds.get(1),
areaIds.get(2),
areaIds.get(0)
);
// SELECT 컬럼 동적 생성
sql.append("SELECT a1.mmsi");
for (int i = 1; i <= n; i++) {
sql.append(String.format(", a%d.entry_time as area%d_entry, a%d.exit_time as area%d_exit", i, i, i, i));
}
sql.append("\nFROM area_passages a1\n");
// JOIN 동적 생성 (a2~aN)
for (int i = 2; i <= n; i++) {
sql.append(String.format("JOIN area_passages a%d ON a%d.mmsi = a1.mmsi AND a%d.area_id = ? AND a%d.entry_time > a%d.exit_time\n",
i, i, i, i, i - 1));
}
sql.append("WHERE a1.area_id = ?\n");
sql.append("ORDER BY a1.entry_time");
// 파라미터 구성
List<Object> params = new ArrayList<>();
params.add(Timestamp.valueOf(startTime));
params.add(Timestamp.valueOf(endTime));
params.add(areaIds.toArray(String[]::new));
if (chnPrmShipOnly) {
Set<String> mmsiSet = chnPrmShipProperties.getMmsiSet();
params.add(mmsiSet.toArray(String[]::new));
}
// a2~aN의 area_id 파라미터
for (int i = 1; i < n; i++) {
params.add(areaIds.get(i));
}
// a1의 area_id WHERE 조건
params.add(areaIds.get(0));
return jdbcTemplate.queryForList(sql.toString(), params.toArray());
}
/**

파일 보기

@ -106,7 +106,7 @@ public class QueryMetricsController {
zoom_level, requested_mmsi, unique_vessels, total_tracks,
total_points, points_after_simplify, total_chunks,
response_bytes, elapsed_ms, db_query_ms, simplify_ms,
cache_hit_days, db_query_days
cache_hit_days, db_query_days, client_ip
FROM signal.t_query_metrics
""" + whereClause +
" ORDER BY " + sortBy + " " + direction +

파일 보기

@ -291,7 +291,7 @@ app:
cache:
ais-target:
ttl-minutes: 120 # 기본 TTL (프로파일별 오버라이드)
max-size: 300000 # 최대 캐시 크기 (30만 건)
max-size: 500000 # 최대 캐시 크기 (50만 건)
five-min-track:
ttl-minutes: 75 # TTL 75분 (1시간 + 15분 여유)