Merge pull request 'perf: Daily Job 인메모리 캐시 기반 최적화 — N+1 SQL 제거' (#60) from develop into main
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 2m52s

This commit is contained in:
htlee 2026-02-20 11:39:54 +09:00
커밋 14e61e6bd0
6개의 변경된 파일579개의 추가작업 그리고 507개의 파일을 삭제

파일 보기

@ -1,23 +1,20 @@
package gc.mda.signal_batch.batch.job;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import gc.mda.signal_batch.batch.processor.DailyTrackProcessor;
import gc.mda.signal_batch.batch.processor.DailyTrackProcessorWithAbnormalDetection;
import gc.mda.signal_batch.batch.processor.DailyTrackMergeProcessor;
import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector;
import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult;
import gc.mda.signal_batch.batch.reader.CacheBasedDailyTrackReader;
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
import gc.mda.signal_batch.batch.writer.VesselTrackBulkWriter;
import gc.mda.signal_batch.batch.writer.AbnormalTrackWriter;
import gc.mda.signal_batch.batch.writer.CompositeTrackWriter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@ -29,7 +26,6 @@ import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@ -44,6 +40,7 @@ public class DailyAggregationStepConfig {
private final VesselTrackBulkWriter vesselTrackBulkWriter;
private final AbnormalTrackWriter abnormalTrackWriter;
private final AbnormalTrackDetector abnormalTrackDetector;
private final HourlyTrackCache hourlyTrackCache;
public DailyAggregationStepConfig(
JobRepository jobRepository,
@ -51,13 +48,15 @@ public class DailyAggregationStepConfig {
@Qualifier("queryTransactionManager") PlatformTransactionManager transactionManager,
VesselTrackBulkWriter vesselTrackBulkWriter,
AbnormalTrackWriter abnormalTrackWriter,
AbnormalTrackDetector abnormalTrackDetector) {
AbnormalTrackDetector abnormalTrackDetector,
HourlyTrackCache hourlyTrackCache) {
this.jobRepository = jobRepository;
this.queryDataSource = queryDataSource;
this.transactionManager = transactionManager;
this.vesselTrackBulkWriter = vesselTrackBulkWriter;
this.abnormalTrackWriter = abnormalTrackWriter;
this.abnormalTrackDetector = abnormalTrackDetector;
this.hourlyTrackCache = hourlyTrackCache;
}
@Value("${vessel.batch.chunk-size:5000}")
@ -65,26 +64,54 @@ public class DailyAggregationStepConfig {
@Bean
public Step mergeDailyTracksStep() {
// 비정상 궤적 검출은 항상 활성화 (설정 파일로 제어)
boolean detectAbnormal = true;
log.info("Building mergeDailyTracksStep with cache-based in-memory merge");
return new StepBuilder("mergeDailyTracksStep", jobRepository)
.<List<VesselTrack>, AbnormalDetectionResult>chunk(chunkSize, transactionManager)
.reader(cacheBasedDailyTrackReader(null, null))
.processor(dailyTrackMergeProcessor(null))
.writer(dailyCompositeTrackWriter())
.listener(dailyTrackMergeProcessor(null))
.build();
}
if (detectAbnormal) {
log.info("Building mergeDailyTracksStep with abnormal detection enabled");
return new StepBuilder("mergeDailyTracksStep", jobRepository)
.<VesselTrack.VesselKey, AbnormalDetectionResult>chunk(chunkSize, transactionManager)
.reader(dailyVesselKeyReader(null, null))
.processor(dailyTrackProcessorWithAbnormalDetection())
.writer(dailyCompositeTrackWriter())
.build();
} else {
log.info("Building mergeDailyTracksStep without abnormal detection");
return new StepBuilder("mergeDailyTracksStep", jobRepository)
.<VesselTrack.VesselKey, VesselTrack>chunk(chunkSize, transactionManager)
.reader(dailyVesselKeyReader(null, null))
.processor(dailyTrackItemProcessor())
.writer(dailyTrackWriter())
.build();
}
@Bean
@StepScope
public CacheBasedDailyTrackReader cacheBasedDailyTrackReader(
@Value("#{jobParameters['startTime']}") String startTime,
@Value("#{jobParameters['endTime']}") String endTime) {
LocalDateTime start = LocalDateTime.parse(startTime);
LocalDateTime end = LocalDateTime.parse(endTime);
return new CacheBasedDailyTrackReader(
hourlyTrackCache,
new JdbcTemplate(queryDataSource),
start,
end);
}
@Bean
@StepScope
public DailyTrackMergeProcessor dailyTrackMergeProcessor(
@Value("#{jobParameters['startTime']}") String startTime) {
LocalDateTime dayBucket = LocalDateTime.parse(startTime)
.withHour(0).withMinute(0).withSecond(0).withNano(0);
return new DailyTrackMergeProcessor(
abnormalTrackDetector,
new JdbcTemplate(queryDataSource),
dayBucket);
}
@Bean
public ItemWriter<AbnormalDetectionResult> dailyCompositeTrackWriter() {
abnormalTrackWriter.setJobName("dailyAggregationJob");
return new CompositeTrackWriter(
vesselTrackBulkWriter,
abnormalTrackWriter,
"daily"
);
}
@Bean
@ -109,58 +136,7 @@ public class DailyAggregationStepConfig {
@Bean
@StepScope
public JdbcCursorItemReader<VesselTrack.VesselKey> dailyVesselKeyReader(
@Value("#{jobParameters['startTime']}") String startTime,
@Value("#{jobParameters['endTime']}") String endTime) {
LocalDateTime start = LocalDateTime.parse(startTime);
LocalDateTime end = LocalDateTime.parse(endTime);
String sql = """
SELECT DISTINCT mmsi, date_trunc('day', time_bucket) as day_bucket
FROM signal.t_vessel_tracks_hourly
WHERE time_bucket >= ? AND time_bucket < ?
ORDER BY mmsi, day_bucket
""";
return new JdbcCursorItemReaderBuilder<VesselTrack.VesselKey>()
.name("dailyVesselKeyReader")
.dataSource(queryDataSource)
.sql(sql)
.preparedStatementSetter(ps -> {
ps.setTimestamp(1, java.sql.Timestamp.valueOf(start));
ps.setTimestamp(2, java.sql.Timestamp.valueOf(end));
})
.rowMapper((rs, rowNum) -> new VesselTrack.VesselKey(
rs.getString("mmsi"),
rs.getObject("day_bucket", LocalDateTime.class)
))
.build();
}
@Bean
public ItemProcessor<VesselTrack.VesselKey, VesselTrack> dailyTrackItemProcessor() {
return new DailyTrackProcessor(queryDataSource, new JdbcTemplate(queryDataSource));
}
@Bean
public ItemWriter<VesselTrack> dailyTrackWriter() {
return items -> {
List<VesselTrack> tracks = new ArrayList<>();
for (VesselTrack track : items) {
if (track != null) {
tracks.add(track);
}
}
if (!tracks.isEmpty()) {
vesselTrackBulkWriter.writeDailyTracks(tracks);
}
};
}
@Bean
@StepScope
public JdbcCursorItemReader<Integer> dailyGridReader(
public org.springframework.batch.item.database.JdbcCursorItemReader<Integer> dailyGridReader(
@Value("#{jobParameters['startTime']}") String startTime,
@Value("#{jobParameters['endTime']}") String endTime) {
@ -174,7 +150,7 @@ public class DailyAggregationStepConfig {
ORDER BY haegu_no
""";
return new JdbcCursorItemReaderBuilder<Integer>()
return new org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder<Integer>()
.name("dailyGridReader")
.dataSource(queryDataSource)
.sql(sql)
@ -187,7 +163,7 @@ public class DailyAggregationStepConfig {
}
@Bean
public ItemProcessor<Integer, DailyGridSummary> dailyGridProcessor() {
public org.springframework.batch.item.ItemProcessor<Integer, DailyGridSummary> dailyGridProcessor() {
return haeguNo -> {
DailyGridSummary summary = new DailyGridSummary();
summary.haeguNo = haeguNo;
@ -247,7 +223,7 @@ public class DailyAggregationStepConfig {
@Bean
@StepScope
public JdbcCursorItemReader<String> dailyAreaReader(
public org.springframework.batch.item.database.JdbcCursorItemReader<String> dailyAreaReader(
@Value("#{jobParameters['startTime']}") String startTime,
@Value("#{jobParameters['endTime']}") String endTime) {
@ -261,7 +237,7 @@ public class DailyAggregationStepConfig {
ORDER BY area_id
""";
return new JdbcCursorItemReaderBuilder<String>()
return new org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder<String>()
.name("dailyAreaReader")
.dataSource(queryDataSource)
.sql(sql)
@ -274,7 +250,7 @@ public class DailyAggregationStepConfig {
}
@Bean
public ItemProcessor<String, DailyAreaSummary> dailyAreaProcessor() {
public org.springframework.batch.item.ItemProcessor<String, DailyAreaSummary> dailyAreaProcessor() {
return areaId -> {
DailyAreaSummary summary = new DailyAreaSummary();
summary.areaId = areaId;
@ -332,27 +308,6 @@ public class DailyAggregationStepConfig {
};
}
// 비정상 궤적 검출 관련 정의
@Bean
public ItemProcessor<VesselTrack.VesselKey, AbnormalDetectionResult> dailyTrackProcessorWithAbnormalDetection() {
return new DailyTrackProcessorWithAbnormalDetection(
dailyTrackItemProcessor(),
abnormalTrackDetector,
queryDataSource
);
}
@Bean
public ItemWriter<AbnormalDetectionResult> dailyCompositeTrackWriter() {
// Job 이름 직접 설정
abnormalTrackWriter.setJobName("dailyAggregationJob");
return new CompositeTrackWriter(
vesselTrackBulkWriter,
abnormalTrackWriter,
"daily"
);
}
// Summary 클래스들
public static class DailyGridSummary {
public Integer haeguNo;

파일 보기

@ -1,136 +0,0 @@
package gc.mda.signal_batch.batch.processor;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult;
import gc.mda.signal_batch.global.util.LineStringMUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* 시간별/일별 궤적 프로세서 기본 클래스 - 비정상 궤적 검출 기능 포함
*/
@Slf4j
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
@RequiredArgsConstructor
public abstract class BaseTrackProcessorWithAbnormalDetection implements ItemProcessor<VesselTrack.VesselKey, AbnormalDetectionResult> {
protected final ItemProcessor<VesselTrack.VesselKey, VesselTrack> trackProcessor;
protected final AbnormalTrackDetector abnormalTrackDetector;
protected final DataSource queryDataSource;
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Override
public AbnormalDetectionResult process(VesselTrack.VesselKey vesselKey) throws Exception {
// 기존 프로세서로 궤적 생성
VesselTrack track = trackProcessor.process(vesselKey);
if (track == null) {
return null;
}
// 이전 bucket의 마지막 궤적 조회
VesselTrack previousTrack = getPreviousBucketLastTrack(vesselKey);
// Bucket 연결점만 검사 (하위 데이터는 이미 검증됨)
AbnormalDetectionResult result = abnormalTrackDetector.detectBucketTransitionOnly(track, previousTrack);
if (result.hasAbnormalities()) {
log.debug("Abnormal track detected for vessel {} at {}: {}",
track.getMmsi(), track.getTimeBucket(),
result.getAbnormalSegments().size());
}
return result;
}
/**
* 이전 버킷의 마지막 궤적 조회
*/
protected VesselTrack getPreviousBucketLastTrack(VesselTrack.VesselKey vesselKey) {
try {
String sql = """
SELECT mmsi, time_bucket,
end_position,
public.ST_AsText(public.ST_LineSubstring(track_geom, 0.9, 1.0)) as last_segment
FROM %s
WHERE mmsi = ?
AND time_bucket >= ?
AND time_bucket < ?
ORDER BY time_bucket DESC
LIMIT 1
""".formatted(getPreviousTrackTableName());
LocalDateTime currentBucket = getNormalizedBucket(vesselKey.getTimeBucket());
LocalDateTime previousBucket = getPreviousBucket(currentBucket);
// Convert to java.sql.Timestamp for proper PostgreSQL type handling
Timestamp previousBucketTimestamp = Timestamp.valueOf(previousBucket);
Timestamp currentBucketTimestamp = Timestamp.valueOf(currentBucket);
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
return jdbcTemplate.queryForObject(sql,
(rs, rowNum) -> {
return VesselTrack.builder()
.mmsi(rs.getString("mmsi"))
.timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime())
.trackGeom(rs.getString("last_segment"))
.endPosition(parseEndPosition(rs.getString("end_position")))
.build();
},
vesselKey.getMmsi(), previousBucketTimestamp, currentBucketTimestamp
);
} catch (Exception e) {
log.debug("No previous bucket track found for vessel {}", vesselKey);
return null;
}
}
/**
* JSON 형식의 end_position 파싱
*/
protected VesselTrack.TrackPosition parseEndPosition(String json) {
if (json == null) return null;
try {
String lat = LineStringMUtils.extractJsonValue(json, "lat");
String lon = LineStringMUtils.extractJsonValue(json, "lon");
String time = LineStringMUtils.extractJsonValue(json, "time");
String sog = LineStringMUtils.extractJsonValue(json, "sog");
return VesselTrack.TrackPosition.builder()
.lat(lat != null ? Double.parseDouble(lat) : null)
.lon(lon != null ? Double.parseDouble(lon) : null)
.time(time != null ? LocalDateTime.parse(time, TIMESTAMP_FORMATTER) : null)
.sog(sog != null ? new BigDecimal(sog) : null)
.build();
} catch (Exception e) {
log.error("Failed to parse end position: {}", json, e);
return null;
}
}
/**
* 이전 트랙을 조회할 테이블명 반환 (하위 클래스에서 구현)
*/
protected abstract String getPreviousTrackTableName();
/**
* 정규화된 버킷 시간 반환 (하위 클래스에서 구현)
*/
protected abstract LocalDateTime getNormalizedBucket(LocalDateTime timeBucket);
/**
* 이전 버킷 시간 계산 (하위 클래스에서 구현)
*/
protected abstract LocalDateTime getPreviousBucket(LocalDateTime currentBucket);
}

파일 보기

@ -0,0 +1,298 @@
package gc.mda.signal_batch.batch.processor;
import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import gc.mda.signal_batch.global.util.LineStringMUtils;
import gc.mda.signal_batch.global.util.TrackSimplificationUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.jdbc.core.JdbcTemplate;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 인메모리 기반 Daily Track 병합 프로세서
*
* Hourly 트랙 리스트를 받아:
* 1. WKT 좌표 연결 (Java String)
* 2. 통계 집계 (distance, speed, pointCount)
* 3. 간소화 (TrackSimplificationUtils.simplifyDailyTrack)
* 4. 비정상 검출 (이전 날짜 1회 bulk prefetch)
*
* N+1 SQL 제거 DB 쿼리 최대 1회 (비정상 검출용 이전 날짜 prefetch)
*/
@Slf4j
public class DailyTrackMergeProcessor
implements ItemProcessor<List<VesselTrack>, AbnormalDetectionResult>, StepExecutionListener {
private static final Pattern WKT_COORDS_PATTERN = Pattern.compile("LINESTRING M\\((.+)\\)");
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private final AbnormalTrackDetector abnormalTrackDetector;
private final JdbcTemplate queryJdbcTemplate;
private final LocalDateTime dayBucket;
// Lazy-init: 이전 날짜 데이터 1회 bulk prefetch
private Map<String, VesselTrack> previousDayCache;
private boolean previousDayLoaded = false;
// Step 레벨 집계 카운터
private int totalProcessed = 0;
private int mergeFailCount = 0;
private int simplifiedCount = 0;
private int abnormalCount = 0;
private int avgSpeedFailCount = 0;
public DailyTrackMergeProcessor(
AbnormalTrackDetector abnormalTrackDetector,
JdbcTemplate queryJdbcTemplate,
LocalDateTime dayBucket) {
this.abnormalTrackDetector = abnormalTrackDetector;
this.queryJdbcTemplate = queryJdbcTemplate;
this.dayBucket = dayBucket;
}
@Override
public AbnormalDetectionResult process(List<VesselTrack> hourlyTracks) throws Exception {
if (hourlyTracks == null || hourlyTracks.isEmpty()) {
return null;
}
String mmsi = hourlyTracks.get(0).getMmsi();
totalProcessed++;
// Step 1: WKT 좌표 병합
String mergedWkt = mergeTrackGeometries(hourlyTracks);
if (mergedWkt == null) {
mergeFailCount++;
return null;
}
// Step 2: 통계 집계
BigDecimal totalDistance = BigDecimal.ZERO;
BigDecimal maxSpeed = BigDecimal.ZERO;
int totalPoints = 0;
for (VesselTrack track : hourlyTracks) {
if (track.getDistanceNm() != null) {
totalDistance = totalDistance.add(track.getDistanceNm());
}
if (track.getMaxSpeed() != null && track.getMaxSpeed().compareTo(maxSpeed) > 0) {
maxSpeed = track.getMaxSpeed();
}
if (track.getPointCount() != null) {
totalPoints += track.getPointCount();
}
}
// avgSpeed: M값 기반 시간 차이로 계산
BigDecimal avgSpeed = calculateAvgSpeed(mergedWkt, totalDistance);
VesselTrack.TrackPosition startPos = hourlyTracks.get(0).getStartPosition();
VesselTrack.TrackPosition endPos = hourlyTracks.get(hourlyTracks.size() - 1).getEndPosition();
// Step 3: 일별 간소화
String simplifiedWkt = TrackSimplificationUtils.simplifyDailyTrack(mergedWkt);
int simplifiedPoints = countWktPoints(simplifiedWkt);
if (!mergedWkt.equals(simplifiedWkt)) {
simplifiedCount++;
}
VesselTrack dailyTrack = VesselTrack.builder()
.mmsi(mmsi)
.timeBucket(dayBucket)
.trackGeom(simplifiedWkt)
.distanceNm(totalDistance)
.avgSpeed(avgSpeed)
.maxSpeed(maxSpeed)
.pointCount(simplifiedPoints > 0 ? simplifiedPoints : totalPoints)
.startPosition(startPos)
.endPosition(endPos)
.build();
// Step 4: 비정상 검출 (lazy-init, 1회 bulk prefetch)
if (!previousDayLoaded) {
previousDayCache = bulkFetchPreviousDayLastTracks();
previousDayLoaded = true;
}
VesselTrack prevTrack = previousDayCache.get(mmsi);
AbnormalDetectionResult result = abnormalTrackDetector.detectBucketTransitionOnly(dailyTrack, prevTrack);
if (result.hasAbnormalities()) {
abnormalCount++;
}
return result;
}
/**
* Hourly 트랙들의 WKT 좌표를 하나로 연결
*/
private String mergeTrackGeometries(List<VesselTrack> tracks) {
StringBuilder allCoords = new StringBuilder();
for (VesselTrack track : tracks) {
String wkt = track.getTrackGeom();
if (wkt == null || wkt.isEmpty()) continue;
Matcher matcher = WKT_COORDS_PATTERN.matcher(wkt);
if (matcher.find()) {
String coords = matcher.group(1);
if (!coords.isBlank()) {
if (allCoords.length() > 0) {
allCoords.append(", ");
}
allCoords.append(coords);
}
}
}
if (allCoords.length() == 0) {
return null;
}
return "LINESTRING M(" + allCoords + ")";
}
/**
* M값(Unix timestamp) 기반 평균 속도 계산
*/
private BigDecimal calculateAvgSpeed(String wkt, BigDecimal totalDistance) {
try {
Matcher matcher = WKT_COORDS_PATTERN.matcher(wkt);
if (!matcher.find()) return BigDecimal.ZERO;
String coords = matcher.group(1);
String[] points = coords.split(",");
if (points.length < 2) return BigDecimal.ZERO;
// 번째 포인트의 M값
String[] firstParts = points[0].trim().split("\\s+");
double firstM = firstParts.length >= 3 ? Double.parseDouble(firstParts[2]) : 0;
// 마지막 포인트의 M값
String[] lastParts = points[points.length - 1].trim().split("\\s+");
double lastM = lastParts.length >= 3 ? Double.parseDouble(lastParts[2]) : 0;
double timeDiffSeconds = lastM - firstM;
if (timeDiffSeconds <= 0) return BigDecimal.ZERO;
double timeDiffHours = timeDiffSeconds / 3600.0;
double avgSpeedVal = totalDistance.doubleValue() / timeDiffHours;
// 비현실적 속도 제한
avgSpeedVal = Math.min(avgSpeedVal, 9999.99);
return BigDecimal.valueOf(avgSpeedVal).setScale(2, RoundingMode.HALF_UP);
} catch (Exception e) {
avgSpeedFailCount++;
return BigDecimal.ZERO;
}
}
@Override
public void beforeStep(StepExecution stepExecution) {
// no-op
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
log.info("Daily 병합 처리 집계 — 총: {}, 병합실패: {}, 간소화: {}, 비정상: {}, avgSpeed실패: {}",
totalProcessed, mergeFailCount, simplifiedCount, abnormalCount, avgSpeedFailCount);
return null;
}
private int countWktPoints(String wkt) {
if (wkt == null || !wkt.startsWith("LINESTRING M")) return 0;
try {
String coords = wkt.substring("LINESTRING M(".length(), wkt.length() - 1);
return coords.split(",").length;
} catch (Exception e) {
return 0;
}
}
/**
* 비정상 검출용 전일(dayBucket-1일) MMSI별 마지막 hourly 트랙 bulk prefetch
*/
private Map<String, VesselTrack> bulkFetchPreviousDayLastTracks() {
LocalDateTime prevStart = dayBucket.minusDays(1);
LocalDateTime prevEnd = dayBucket;
String sql = """
SELECT DISTINCT ON (mmsi)
mmsi, time_bucket, end_position,
public.ST_AsText(public.ST_LineSubstring(track_geom, 0.9, 1.0)) as last_segment
FROM signal.t_vessel_tracks_hourly
WHERE time_bucket >= ? AND time_bucket < ?
AND track_geom IS NOT NULL
ORDER BY mmsi, time_bucket DESC
""";
Map<String, VesselTrack> result = new HashMap<>();
int[] parseFailCount = {0};
try {
queryJdbcTemplate.query(sql,
ps -> {
ps.setTimestamp(1, Timestamp.valueOf(prevStart));
ps.setTimestamp(2, Timestamp.valueOf(prevEnd));
},
rs -> {
String mmsi = rs.getString("mmsi");
VesselTrack.TrackPosition endPos = parseEndPosition(rs.getString("end_position"));
if (endPos == null && rs.getString("end_position") != null) {
parseFailCount[0]++;
}
VesselTrack track = VesselTrack.builder()
.mmsi(mmsi)
.timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime())
.trackGeom(rs.getString("last_segment"))
.endPosition(endPos)
.build();
result.put(mmsi, track);
});
log.info("전일 트랙 prefetch 완료: {} 선박 (기간: {} ~ {})",
result.size(), prevStart, prevEnd);
if (parseFailCount[0] > 0) {
log.debug("end_position 파싱 실패: {} 건", parseFailCount[0]);
}
} catch (Exception e) {
log.warn("전일 트랙 prefetch 실패 (첫 실행일 수 있음): {}", e.getMessage());
}
return result;
}
private VesselTrack.TrackPosition parseEndPosition(String json) {
if (json == null) return null;
try {
String lat = LineStringMUtils.extractJsonValue(json, "lat");
String lon = LineStringMUtils.extractJsonValue(json, "lon");
String time = LineStringMUtils.extractJsonValue(json, "time");
String sog = LineStringMUtils.extractJsonValue(json, "sog");
return VesselTrack.TrackPosition.builder()
.lat(lat != null ? Double.parseDouble(lat) : null)
.lon(lon != null ? Double.parseDouble(lon) : null)
.time(time != null ? LocalDateTime.parse(time, TIMESTAMP_FORMATTER) : null)
.sog(sog != null ? new BigDecimal(sog) : null)
.build();
} catch (Exception e) {
return null;
}
}
}

파일 보기

@ -1,197 +0,0 @@
package gc.mda.signal_batch.batch.processor;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jdbc.core.JdbcTemplate;
import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import gc.mda.signal_batch.global.util.LineStringMUtils;
import gc.mda.signal_batch.global.util.TrackSimplificationUtils;
import javax.sql.DataSource;
@Slf4j
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
@RequiredArgsConstructor
public class DailyTrackProcessor implements ItemProcessor<VesselTrack.VesselKey, VesselTrack> {
private final DataSource queryDataSource;
private final JdbcTemplate jdbcTemplate;
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Override
public VesselTrack process(VesselTrack.VesselKey vesselKey) throws Exception {
LocalDateTime dayBucket = vesselKey.getTimeBucket()
.withHour(0)
.withMinute(0)
.withSecond(0)
.withNano(0);
String sql = """
WITH ordered_tracks AS (
SELECT *
FROM signal.t_vessel_tracks_hourly
WHERE mmsi = ?
AND time_bucket >= ?
AND time_bucket < ?
AND track_geom IS NOT NULL
AND public.ST_NPoints(track_geom) > 0
ORDER BY time_bucket
),
merged_coords AS (
SELECT
mmsi,
string_agg(
substring(public.ST_AsText(track_geom) from 'M \\((.+)\\)'),
','
ORDER BY time_bucket
) FILTER (WHERE track_geom IS NOT NULL) as all_coords
FROM ordered_tracks
GROUP BY mmsi
),
merged_tracks AS (
SELECT
mc.mmsi,
TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS') as time_bucket,
public.ST_GeomFromText('LINESTRING M(' || mc.all_coords || ')') as merged_geom,
(SELECT MAX(max_speed) FROM ordered_tracks WHERE mmsi = mc.mmsi) as max_speed,
(SELECT SUM(point_count) FROM ordered_tracks WHERE mmsi = mc.mmsi) as total_points,
(SELECT MIN(time_bucket) FROM ordered_tracks WHERE mmsi = mc.mmsi) as start_time,
(SELECT MAX(time_bucket) FROM ordered_tracks WHERE mmsi = mc.mmsi) as end_time,
(SELECT start_position FROM ordered_tracks WHERE mmsi = mc.mmsi ORDER BY time_bucket LIMIT 1) as start_pos,
(SELECT end_position FROM ordered_tracks WHERE mmsi = mc.mmsi ORDER BY time_bucket DESC LIMIT 1) as end_pos
FROM merged_coords mc
),
calculated_tracks AS (
SELECT
*,
public.ST_Length(merged_geom::geography) / 1852.0 as total_distance,
CASE
WHEN public.ST_NPoints(merged_geom) > 0 THEN
public.ST_M(public.ST_PointN(merged_geom, public.ST_NPoints(merged_geom))) -
public.ST_M(public.ST_PointN(merged_geom, 1))
ELSE
EXTRACT(EPOCH FROM
TO_TIMESTAMP(end_pos->>'time', 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(start_pos->>'time', 'YYYY-MM-DD HH24:MI:SS')
)
END as time_diff_seconds
FROM merged_tracks
)
SELECT
mmsi,
time_bucket,
merged_geom,
total_distance,
CASE
WHEN time_diff_seconds > 0 THEN
CAST(LEAST((total_distance / (time_diff_seconds / 3600.0)), 9999.99) AS numeric(6,2))
ELSE 0
END as avg_speed,
max_speed,
total_points,
start_time,
end_time,
start_pos,
end_pos,
public.ST_AsText(merged_geom) as geom_text
FROM calculated_tracks
""";
LocalDateTime startTime = dayBucket;
LocalDateTime endTime = dayBucket.plusDays(1);
Timestamp startTimestamp = Timestamp.valueOf(startTime);
Timestamp endTimestamp = Timestamp.valueOf(endTime);
Timestamp dayBucketTimestamp = Timestamp.valueOf(dayBucket);
log.debug("DailyTrackProcessor params - mmsi: {}, startTime: {}, endTime: {}, dayBucket: {}",
vesselKey.getMmsi(), startTimestamp, endTimestamp, dayBucketTimestamp);
try {
return jdbcTemplate.queryForObject(sql,
(rs, rowNum) -> {
try {
return buildDailyTrack(rs, dayBucket);
} catch (Exception e) {
throw new RuntimeException("Failed to build daily track", e);
}
},
vesselKey.getMmsi(),
startTimestamp, endTimestamp, dayBucketTimestamp
);
} catch (org.springframework.dao.EmptyResultDataAccessException e) {
log.warn("No hourly data found for vessel {} in time range {}-{}, skipping daily aggregation",
vesselKey.getMmsi(), startTimestamp, endTimestamp);
return null;
} catch (Exception e) {
log.error("Failed to process daily track for vessel {}: {}",
vesselKey.getMmsi(), e.getMessage(), e);
return null;
}
}
private VesselTrack buildDailyTrack(ResultSet rs, LocalDateTime dayBucket) throws Exception {
VesselTrack.TrackPosition startPos = null;
VesselTrack.TrackPosition endPos = null;
String startPosJson = rs.getString("start_pos");
String endPosJson = rs.getString("end_pos");
if (startPosJson != null) {
startPos = parseTrackPosition(startPosJson);
}
if (endPosJson != null) {
endPos = parseTrackPosition(endPosJson);
}
String dailyLineStringM = rs.getString("geom_text");
String simplifiedLineStringM = TrackSimplificationUtils.simplifyDailyTrack(dailyLineStringM);
if (!dailyLineStringM.equals(simplifiedLineStringM)) {
TrackSimplificationUtils.SimplificationStats stats =
TrackSimplificationUtils.getSimplificationStats(dailyLineStringM, simplifiedLineStringM);
log.debug("일별 궤적 간소화 - vessel: {}, 원본: {}포인트, 간소화: {}포인트 ({}% 감소)",
rs.getString("mmsi"),
stats.originalPoints, stats.simplifiedPoints, (int)stats.reductionRate);
}
return VesselTrack.builder()
.mmsi(rs.getString("mmsi"))
.timeBucket(dayBucket)
.trackGeom(simplifiedLineStringM)
.distanceNm(rs.getBigDecimal("total_distance"))
.avgSpeed(rs.getBigDecimal("avg_speed"))
.maxSpeed(rs.getBigDecimal("max_speed"))
.pointCount(rs.getInt("total_points"))
.startPosition(startPos)
.endPosition(endPos)
.build();
}
private VesselTrack.TrackPosition parseTrackPosition(String json) {
try {
String latStr = LineStringMUtils.extractJsonValue(json, "lat");
String lonStr = LineStringMUtils.extractJsonValue(json, "lon");
String timeStr = LineStringMUtils.extractJsonValue(json, "time");
String sogStr = LineStringMUtils.extractJsonValue(json, "sog");
return VesselTrack.TrackPosition.builder()
.lat(latStr != null ? Double.parseDouble(latStr) : null)
.lon(lonStr != null ? Double.parseDouble(lonStr) : null)
.time(timeStr != null ? LocalDateTime.parse(timeStr, TIMESTAMP_FORMATTER) : null)
.sog(sogStr != null ? new BigDecimal(sogStr) : null)
.build();
} catch (Exception e) {
log.error("Failed to parse track position: {}", json, e);
return null;
}
}
}

파일 보기

@ -1,38 +0,0 @@
package gc.mda.signal_batch.batch.processor;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import javax.sql.DataSource;
import java.time.LocalDateTime;
/**
* 일별 궤적 프로세서 - 비정상 궤적 검출 기능 포함
*/
@Slf4j
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
public class DailyTrackProcessorWithAbnormalDetection extends BaseTrackProcessorWithAbnormalDetection {
public DailyTrackProcessorWithAbnormalDetection(
ItemProcessor<VesselTrack.VesselKey, VesselTrack> dailyTrackProcessor,
AbnormalTrackDetector abnormalTrackDetector,
DataSource queryDataSource) {
super(dailyTrackProcessor, abnormalTrackDetector, queryDataSource);
}
@Override
protected String getPreviousTrackTableName() {
return "signal.t_vessel_tracks_hourly";
}
@Override
protected LocalDateTime getNormalizedBucket(LocalDateTime timeBucket) {
return timeBucket.withHour(0).withMinute(0).withSecond(0).withNano(0);
}
@Override
protected LocalDateTime getPreviousBucket(LocalDateTime currentBucket) {
return currentBucket.minusDays(1);
}
}

파일 보기

@ -0,0 +1,190 @@
package gc.mda.signal_batch.batch.reader;
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
import gc.mda.signal_batch.global.util.LineStringMUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import org.springframework.jdbc.core.JdbcTemplate;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
/**
* 캐시 기반 Daily Track Reader
*
* HourlyTrackCache(L2)에서 시간별 트랙을 MMSI별로 읽어 반환.
* 캐시에 없는 MMSI는 DB fallback으로 보충.
*
* 정상 운영 : DB 쿼리 1회 (DISTINCT mmsi 완전성 확인)
* 재시작 : DB 쿼리 2회 (완전성 확인 + fallback 벌크)
*/
@Slf4j
public class CacheBasedDailyTrackReader implements ItemReader<List<VesselTrack>> {
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private final HourlyTrackCache hourlyTrackCache;
private final JdbcTemplate queryJdbcTemplate;
private final LocalDateTime startTime;
private final LocalDateTime endTime;
private Iterator<List<VesselTrack>> groupIterator;
private boolean initialized = false;
public CacheBasedDailyTrackReader(
HourlyTrackCache hourlyTrackCache,
JdbcTemplate queryJdbcTemplate,
LocalDateTime startTime,
LocalDateTime endTime) {
this.hourlyTrackCache = hourlyTrackCache;
this.queryJdbcTemplate = queryJdbcTemplate;
this.startTime = startTime;
this.endTime = endTime;
}
@Override
public List<VesselTrack> read() {
if (!initialized) {
initialize();
initialized = true;
}
if (groupIterator != null && groupIterator.hasNext()) {
return groupIterator.next();
}
return null;
}
private void initialize() {
// 1. L2 캐시에서 데이터 로드
Map<String, List<VesselTrack>> cacheData = hourlyTrackCache.getTracksInRange(startTime, endTime);
log.info("Daily Reader 초기화 — 캐시에서 {} 선박 로드 (기간: {} ~ {})",
cacheData.size(), startTime, endTime);
// 2. DB에서 해당 기간 MMSI 목록 조회 (완전성 확인)
String distinctSql = """
SELECT DISTINCT mmsi FROM signal.t_vessel_tracks_hourly
WHERE time_bucket >= ? AND time_bucket < ?
""";
List<String> dbMmsiList = queryJdbcTemplate.queryForList(
distinctSql, String.class,
Timestamp.valueOf(startTime), Timestamp.valueOf(endTime));
Set<String> dbMmsiSet = new HashSet<>(dbMmsiList);
// 3. 캐시에 없는 MMSI 감지
Set<String> cacheMmsiSet = cacheData.keySet();
Set<String> missingMmsi = new HashSet<>(dbMmsiSet);
missingMmsi.removeAll(cacheMmsiSet);
if (!missingMmsi.isEmpty()) {
log.info("캐시 미스 {} 선박 → DB fallback", missingMmsi.size());
Map<String, List<VesselTrack>> fallbackData = fetchFromDb(missingMmsi);
// 캐시 데이터와 병합
Map<String, List<VesselTrack>> merged = new LinkedHashMap<>(cacheData);
merged.putAll(fallbackData);
cacheData = merged;
}
// 캐시에만 있고 DB에 없는 경우 (stale 캐시) 제거
int staleCount = 0;
Iterator<String> it = cacheData.keySet().iterator();
while (it.hasNext()) {
if (!dbMmsiSet.contains(it.next())) {
it.remove();
staleCount++;
}
}
if (staleCount > 0) {
log.debug("Stale 캐시 항목 {} 건 제거", staleCount);
}
log.info("Daily Reader 준비 완료 — 총 {} 선박 (캐시: {}, DB fallback: {})",
cacheData.size(), cacheData.size() - missingMmsi.size(), missingMmsi.size());
groupIterator = cacheData.values().iterator();
}
/**
* 누락된 MMSI의 시간별 트랙을 DB에서 벌크 조회
*/
private Map<String, List<VesselTrack>> fetchFromDb(Set<String> mmsiSet) {
if (mmsiSet.isEmpty()) return Collections.emptyMap();
String[] mmsiArray = mmsiSet.toArray(new String[0]);
String sql = """
SELECT mmsi, time_bucket,
public.ST_AsText(track_geom) as geom_text,
distance_nm, avg_speed, max_speed, point_count,
start_position, end_position
FROM signal.t_vessel_tracks_hourly
WHERE mmsi = ANY(?)
AND time_bucket >= ? AND time_bucket < ?
AND track_geom IS NOT NULL
ORDER BY mmsi, time_bucket
""";
Map<String, List<VesselTrack>> result = new LinkedHashMap<>();
int[] parseFailCount = {0};
queryJdbcTemplate.query(sql,
ps -> {
ps.setArray(1, ps.getConnection().createArrayOf("varchar", mmsiArray));
ps.setTimestamp(2, Timestamp.valueOf(startTime));
ps.setTimestamp(3, Timestamp.valueOf(endTime));
},
rs -> {
String mmsi = rs.getString("mmsi");
VesselTrack.TrackPosition startPos = parseTrackPosition(rs.getString("start_position"));
VesselTrack.TrackPosition endPos = parseTrackPosition(rs.getString("end_position"));
if ((startPos == null && rs.getString("start_position") != null)
|| (endPos == null && rs.getString("end_position") != null)) {
parseFailCount[0]++;
}
VesselTrack track = VesselTrack.builder()
.mmsi(mmsi)
.timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime())
.trackGeom(rs.getString("geom_text"))
.distanceNm(rs.getBigDecimal("distance_nm"))
.avgSpeed(rs.getBigDecimal("avg_speed"))
.maxSpeed(rs.getBigDecimal("max_speed"))
.pointCount(rs.getInt("point_count"))
.startPosition(startPos)
.endPosition(endPos)
.build();
result.computeIfAbsent(mmsi, k -> new ArrayList<>()).add(track);
});
log.info("DB fallback 완료: {} 선박, {} 트랙",
result.size(), result.values().stream().mapToInt(List::size).sum());
if (parseFailCount[0] > 0) {
log.debug("TrackPosition 파싱 실패: {} 건", parseFailCount[0]);
}
return result;
}
private VesselTrack.TrackPosition parseTrackPosition(String json) {
if (json == null) return null;
try {
String latStr = LineStringMUtils.extractJsonValue(json, "lat");
String lonStr = LineStringMUtils.extractJsonValue(json, "lon");
String timeStr = LineStringMUtils.extractJsonValue(json, "time");
String sogStr = LineStringMUtils.extractJsonValue(json, "sog");
return VesselTrack.TrackPosition.builder()
.lat(latStr != null ? Double.parseDouble(latStr) : null)
.lon(lonStr != null ? Double.parseDouble(lonStr) : null)
.time(timeStr != null ? LocalDateTime.parse(timeStr, TIMESTAMP_FORMATTER) : null)
.sog(sogStr != null ? new BigDecimal(sogStr) : null)
.build();
} catch (Exception e) {
return null;
}
}
}