Merge pull request 'fix: shipimg path conflict' (#59) from feature/dashboard-phase-1 into develop
This commit is contained in:
커밋
fc7beac9f7
@ -1,23 +1,20 @@
|
||||
package gc.mda.signal_batch.batch.job;
|
||||
|
||||
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
|
||||
import gc.mda.signal_batch.batch.processor.DailyTrackProcessor;
|
||||
import gc.mda.signal_batch.batch.processor.DailyTrackProcessorWithAbnormalDetection;
|
||||
import gc.mda.signal_batch.batch.processor.DailyTrackMergeProcessor;
|
||||
import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector;
|
||||
import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult;
|
||||
import gc.mda.signal_batch.batch.reader.CacheBasedDailyTrackReader;
|
||||
import gc.mda.signal_batch.batch.reader.HourlyTrackCache;
|
||||
import gc.mda.signal_batch.batch.writer.VesselTrackBulkWriter;
|
||||
import gc.mda.signal_batch.batch.writer.AbnormalTrackWriter;
|
||||
import gc.mda.signal_batch.batch.writer.CompositeTrackWriter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||
import org.springframework.batch.core.repository.JobRepository;
|
||||
import org.springframework.batch.core.step.builder.StepBuilder;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.batch.item.ItemWriter;
|
||||
import org.springframework.batch.item.database.JdbcCursorItemReader;
|
||||
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
@ -29,7 +26,6 @@ import org.springframework.transaction.PlatformTransactionManager;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@ -44,6 +40,7 @@ public class DailyAggregationStepConfig {
|
||||
private final VesselTrackBulkWriter vesselTrackBulkWriter;
|
||||
private final AbnormalTrackWriter abnormalTrackWriter;
|
||||
private final AbnormalTrackDetector abnormalTrackDetector;
|
||||
private final HourlyTrackCache hourlyTrackCache;
|
||||
|
||||
public DailyAggregationStepConfig(
|
||||
JobRepository jobRepository,
|
||||
@ -51,42 +48,72 @@ public class DailyAggregationStepConfig {
|
||||
@Qualifier("queryTransactionManager") PlatformTransactionManager transactionManager,
|
||||
VesselTrackBulkWriter vesselTrackBulkWriter,
|
||||
AbnormalTrackWriter abnormalTrackWriter,
|
||||
AbnormalTrackDetector abnormalTrackDetector) {
|
||||
AbnormalTrackDetector abnormalTrackDetector,
|
||||
HourlyTrackCache hourlyTrackCache) {
|
||||
this.jobRepository = jobRepository;
|
||||
this.queryDataSource = queryDataSource;
|
||||
this.transactionManager = transactionManager;
|
||||
this.vesselTrackBulkWriter = vesselTrackBulkWriter;
|
||||
this.abnormalTrackWriter = abnormalTrackWriter;
|
||||
this.abnormalTrackDetector = abnormalTrackDetector;
|
||||
this.hourlyTrackCache = hourlyTrackCache;
|
||||
}
|
||||
|
||||
|
||||
@Value("${vessel.batch.chunk-size:5000}")
|
||||
private int chunkSize;
|
||||
|
||||
|
||||
@Bean
|
||||
public Step mergeDailyTracksStep() {
|
||||
// 비정상 궤적 검출은 항상 활성화 (설정 파일로 제어)
|
||||
boolean detectAbnormal = true;
|
||||
|
||||
if (detectAbnormal) {
|
||||
log.info("Building mergeDailyTracksStep with abnormal detection enabled");
|
||||
return new StepBuilder("mergeDailyTracksStep", jobRepository)
|
||||
.<VesselTrack.VesselKey, AbnormalDetectionResult>chunk(chunkSize, transactionManager)
|
||||
.reader(dailyVesselKeyReader(null, null))
|
||||
.processor(dailyTrackProcessorWithAbnormalDetection())
|
||||
.writer(dailyCompositeTrackWriter())
|
||||
.build();
|
||||
} else {
|
||||
log.info("Building mergeDailyTracksStep without abnormal detection");
|
||||
return new StepBuilder("mergeDailyTracksStep", jobRepository)
|
||||
.<VesselTrack.VesselKey, VesselTrack>chunk(chunkSize, transactionManager)
|
||||
.reader(dailyVesselKeyReader(null, null))
|
||||
.processor(dailyTrackItemProcessor())
|
||||
.writer(dailyTrackWriter())
|
||||
.build();
|
||||
}
|
||||
log.info("Building mergeDailyTracksStep with cache-based in-memory merge");
|
||||
return new StepBuilder("mergeDailyTracksStep", jobRepository)
|
||||
.<List<VesselTrack>, AbnormalDetectionResult>chunk(chunkSize, transactionManager)
|
||||
.reader(cacheBasedDailyTrackReader(null, null))
|
||||
.processor(dailyTrackMergeProcessor(null))
|
||||
.writer(dailyCompositeTrackWriter())
|
||||
.listener(dailyTrackMergeProcessor(null))
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public CacheBasedDailyTrackReader cacheBasedDailyTrackReader(
|
||||
@Value("#{jobParameters['startTime']}") String startTime,
|
||||
@Value("#{jobParameters['endTime']}") String endTime) {
|
||||
|
||||
LocalDateTime start = LocalDateTime.parse(startTime);
|
||||
LocalDateTime end = LocalDateTime.parse(endTime);
|
||||
|
||||
return new CacheBasedDailyTrackReader(
|
||||
hourlyTrackCache,
|
||||
new JdbcTemplate(queryDataSource),
|
||||
start,
|
||||
end);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public DailyTrackMergeProcessor dailyTrackMergeProcessor(
|
||||
@Value("#{jobParameters['startTime']}") String startTime) {
|
||||
|
||||
LocalDateTime dayBucket = LocalDateTime.parse(startTime)
|
||||
.withHour(0).withMinute(0).withSecond(0).withNano(0);
|
||||
|
||||
return new DailyTrackMergeProcessor(
|
||||
abnormalTrackDetector,
|
||||
new JdbcTemplate(queryDataSource),
|
||||
dayBucket);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ItemWriter<AbnormalDetectionResult> dailyCompositeTrackWriter() {
|
||||
abnormalTrackWriter.setJobName("dailyAggregationJob");
|
||||
return new CompositeTrackWriter(
|
||||
vesselTrackBulkWriter,
|
||||
abnormalTrackWriter,
|
||||
"daily"
|
||||
);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Step gridDailySummaryStep() {
|
||||
return new StepBuilder("gridDailySummaryStep", jobRepository)
|
||||
@ -96,7 +123,7 @@ public class DailyAggregationStepConfig {
|
||||
.writer(dailyGridWriter(null, null))
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Step areaDailySummaryStep() {
|
||||
return new StepBuilder("areaDailySummaryStep", jobRepository)
|
||||
@ -106,75 +133,24 @@ public class DailyAggregationStepConfig {
|
||||
.writer(dailyAreaWriter(null, null))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public JdbcCursorItemReader<VesselTrack.VesselKey> dailyVesselKeyReader(
|
||||
@Value("#{jobParameters['startTime']}") String startTime,
|
||||
@Value("#{jobParameters['endTime']}") String endTime) {
|
||||
|
||||
LocalDateTime start = LocalDateTime.parse(startTime);
|
||||
LocalDateTime end = LocalDateTime.parse(endTime);
|
||||
|
||||
String sql = """
|
||||
SELECT DISTINCT mmsi, date_trunc('day', time_bucket) as day_bucket
|
||||
FROM signal.t_vessel_tracks_hourly
|
||||
WHERE time_bucket >= ? AND time_bucket < ?
|
||||
ORDER BY mmsi, day_bucket
|
||||
""";
|
||||
|
||||
return new JdbcCursorItemReaderBuilder<VesselTrack.VesselKey>()
|
||||
.name("dailyVesselKeyReader")
|
||||
.dataSource(queryDataSource)
|
||||
.sql(sql)
|
||||
.preparedStatementSetter(ps -> {
|
||||
ps.setTimestamp(1, java.sql.Timestamp.valueOf(start));
|
||||
ps.setTimestamp(2, java.sql.Timestamp.valueOf(end));
|
||||
})
|
||||
.rowMapper((rs, rowNum) -> new VesselTrack.VesselKey(
|
||||
rs.getString("mmsi"),
|
||||
rs.getObject("day_bucket", LocalDateTime.class)
|
||||
))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ItemProcessor<VesselTrack.VesselKey, VesselTrack> dailyTrackItemProcessor() {
|
||||
return new DailyTrackProcessor(queryDataSource, new JdbcTemplate(queryDataSource));
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ItemWriter<VesselTrack> dailyTrackWriter() {
|
||||
return items -> {
|
||||
List<VesselTrack> tracks = new ArrayList<>();
|
||||
for (VesselTrack track : items) {
|
||||
if (track != null) {
|
||||
tracks.add(track);
|
||||
}
|
||||
}
|
||||
if (!tracks.isEmpty()) {
|
||||
vesselTrackBulkWriter.writeDailyTracks(tracks);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public JdbcCursorItemReader<Integer> dailyGridReader(
|
||||
public org.springframework.batch.item.database.JdbcCursorItemReader<Integer> dailyGridReader(
|
||||
@Value("#{jobParameters['startTime']}") String startTime,
|
||||
@Value("#{jobParameters['endTime']}") String endTime) {
|
||||
|
||||
|
||||
LocalDateTime start = LocalDateTime.parse(startTime);
|
||||
LocalDateTime end = LocalDateTime.parse(endTime);
|
||||
|
||||
|
||||
String sql = """
|
||||
SELECT DISTINCT haegu_no
|
||||
FROM signal.t_grid_tracks_summary_hourly
|
||||
WHERE time_bucket >= ? AND time_bucket < ?
|
||||
ORDER BY haegu_no
|
||||
""";
|
||||
|
||||
return new JdbcCursorItemReaderBuilder<Integer>()
|
||||
|
||||
return new org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder<Integer>()
|
||||
.name("dailyGridReader")
|
||||
.dataSource(queryDataSource)
|
||||
.sql(sql)
|
||||
@ -185,36 +161,36 @@ public class DailyAggregationStepConfig {
|
||||
.rowMapper((rs, rowNum) -> rs.getInt("haegu_no"))
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public ItemProcessor<Integer, DailyGridSummary> dailyGridProcessor() {
|
||||
public org.springframework.batch.item.ItemProcessor<Integer, DailyGridSummary> dailyGridProcessor() {
|
||||
return haeguNo -> {
|
||||
DailyGridSummary summary = new DailyGridSummary();
|
||||
summary.haeguNo = haeguNo;
|
||||
return summary;
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public ItemWriter<DailyGridSummary> dailyGridWriter(
|
||||
@Value("#{jobParameters['startTime']}") String startTime,
|
||||
@Value("#{jobParameters['endTime']}") String endTime) {
|
||||
|
||||
|
||||
return items -> {
|
||||
LocalDateTime start = LocalDateTime.parse(startTime);
|
||||
LocalDateTime end = LocalDateTime.parse(endTime);
|
||||
LocalDateTime dayBucket = start.withHour(0).withMinute(0).withSecond(0).withNano(0);
|
||||
|
||||
|
||||
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
|
||||
|
||||
|
||||
for (DailyGridSummary summary : items) {
|
||||
if (summary == null) continue;
|
||||
|
||||
|
||||
String sql = """
|
||||
INSERT INTO signal.t_grid_tracks_summary_daily
|
||||
(haegu_no, time_bucket, total_vessels, total_distance_nm, avg_speed, vessel_list, created_at)
|
||||
SELECT
|
||||
SELECT
|
||||
haegu_no,
|
||||
?::timestamp as time_bucket,
|
||||
COUNT(DISTINCT vessel_key) as total_vessels,
|
||||
@ -239,29 +215,29 @@ public class DailyAggregationStepConfig {
|
||||
vessel_list = EXCLUDED.vessel_list,
|
||||
created_at = NOW()
|
||||
""";
|
||||
|
||||
|
||||
jdbcTemplate.update(sql, dayBucket, summary.haeguNo, start, end);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public JdbcCursorItemReader<String> dailyAreaReader(
|
||||
public org.springframework.batch.item.database.JdbcCursorItemReader<String> dailyAreaReader(
|
||||
@Value("#{jobParameters['startTime']}") String startTime,
|
||||
@Value("#{jobParameters['endTime']}") String endTime) {
|
||||
|
||||
|
||||
LocalDateTime start = LocalDateTime.parse(startTime);
|
||||
LocalDateTime end = LocalDateTime.parse(endTime);
|
||||
|
||||
|
||||
String sql = """
|
||||
SELECT DISTINCT area_id
|
||||
FROM signal.t_area_tracks_summary_hourly
|
||||
WHERE time_bucket >= ? AND time_bucket < ?
|
||||
ORDER BY area_id
|
||||
""";
|
||||
|
||||
return new JdbcCursorItemReaderBuilder<String>()
|
||||
|
||||
return new org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder<String>()
|
||||
.name("dailyAreaReader")
|
||||
.dataSource(queryDataSource)
|
||||
.sql(sql)
|
||||
@ -272,36 +248,36 @@ public class DailyAggregationStepConfig {
|
||||
.rowMapper((rs, rowNum) -> rs.getString("area_id"))
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public ItemProcessor<String, DailyAreaSummary> dailyAreaProcessor() {
|
||||
public org.springframework.batch.item.ItemProcessor<String, DailyAreaSummary> dailyAreaProcessor() {
|
||||
return areaId -> {
|
||||
DailyAreaSummary summary = new DailyAreaSummary();
|
||||
summary.areaId = areaId;
|
||||
return summary;
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public ItemWriter<DailyAreaSummary> dailyAreaWriter(
|
||||
@Value("#{jobParameters['startTime']}") String startTime,
|
||||
@Value("#{jobParameters['endTime']}") String endTime) {
|
||||
|
||||
|
||||
return items -> {
|
||||
LocalDateTime start = LocalDateTime.parse(startTime);
|
||||
LocalDateTime end = LocalDateTime.parse(endTime);
|
||||
LocalDateTime dayBucket = start.withHour(0).withMinute(0).withSecond(0).withNano(0);
|
||||
|
||||
|
||||
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
|
||||
|
||||
|
||||
for (DailyAreaSummary summary : items) {
|
||||
if (summary == null) continue;
|
||||
|
||||
|
||||
String sql = """
|
||||
INSERT INTO signal.t_area_tracks_summary_daily
|
||||
(area_id, time_bucket, total_vessels, total_distance_nm, avg_speed, vessel_list, created_at)
|
||||
SELECT
|
||||
SELECT
|
||||
area_id,
|
||||
?::timestamp as time_bucket,
|
||||
COUNT(DISTINCT vessel_key) as total_vessels,
|
||||
@ -326,39 +302,18 @@ public class DailyAggregationStepConfig {
|
||||
vessel_list = EXCLUDED.vessel_list,
|
||||
created_at = NOW()
|
||||
""";
|
||||
|
||||
|
||||
jdbcTemplate.update(sql, dayBucket, summary.areaId, start, end);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// 비정상 궤적 검출 관련 빈 정의
|
||||
@Bean
|
||||
public ItemProcessor<VesselTrack.VesselKey, AbnormalDetectionResult> dailyTrackProcessorWithAbnormalDetection() {
|
||||
return new DailyTrackProcessorWithAbnormalDetection(
|
||||
dailyTrackItemProcessor(),
|
||||
abnormalTrackDetector,
|
||||
queryDataSource
|
||||
);
|
||||
}
|
||||
|
||||
// NOTE(review): this is a second, identical definition of
// dailyCompositeTrackWriter() — residue of the merge overlay (the other copy
// appears earlier in this file). One of the two must be removed or the class
// will not compile; behavior is otherwise the same.
@Bean
public ItemWriter<AbnormalDetectionResult> dailyCompositeTrackWriter() {
    // Set the job name directly on the abnormal-track writer.
    abnormalTrackWriter.setJobName("dailyAggregationJob");
    return new CompositeTrackWriter(
            vesselTrackBulkWriter,
            abnormalTrackWriter,
            "daily"
    );
}
|
||||
|
||||
|
||||
// Summary holder classes
|
||||
// Key holder produced by dailyGridProcessor; carries only the grid (haegu)
// number — the writer's SQL computes the actual daily aggregates.
public static class DailyGridSummary {
    public Integer haeguNo;
}
|
||||
|
||||
|
||||
// Key holder produced by dailyAreaProcessor; carries only the area id —
// the writer's SQL computes the actual daily aggregates.
public static class DailyAreaSummary {
    public String areaId;
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,136 +0,0 @@
|
||||
package gc.mda.signal_batch.batch.processor;
|
||||
|
||||
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
|
||||
import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult;
|
||||
import gc.mda.signal_batch.global.util.LineStringMUtils;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
|
||||
/**
 * Base class for hourly/daily track processors that adds abnormal-track
 * detection on top of a wrapped track processor.
 *
 * <p>Processing: build the merged track via the delegate processor, fetch the
 * previous bucket's last track, and run the detector on the bucket-transition
 * joint only. Subclasses supply the table name and bucket arithmetic.
 */
@Slf4j
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
@RequiredArgsConstructor
public abstract class BaseTrackProcessorWithAbnormalDetection implements ItemProcessor<VesselTrack.VesselKey, AbnormalDetectionResult> {

    // Delegate that merges raw rows into a single VesselTrack for the key.
    protected final ItemProcessor<VesselTrack.VesselKey, VesselTrack> trackProcessor;
    protected final AbnormalTrackDetector abnormalTrackDetector;
    protected final DataSource queryDataSource;

    // Format used by the end_position JSON "time" field.
    private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Builds the track for {@code vesselKey} and checks only the transition
     * from the previous bucket's last track for abnormalities.
     *
     * @param vesselKey (mmsi, time-bucket) key to process
     * @return detection result, or {@code null} when the delegate produced no track
     * @throws Exception propagated from the delegate processor
     */
    @Override
    public AbnormalDetectionResult process(VesselTrack.VesselKey vesselKey) throws Exception {
        // Build the merged track with the existing (wrapped) processor.
        VesselTrack track = trackProcessor.process(vesselKey);

        if (track == null) {
            return null;
        }

        // Look up the last track of the previous bucket.
        VesselTrack previousTrack = getPreviousBucketLastTrack(vesselKey);

        // Check only the joint between buckets (inner data was validated upstream).
        AbnormalDetectionResult result = abnormalTrackDetector.detectBucketTransitionOnly(track, previousTrack);

        if (result.hasAbnormalities()) {
            log.debug("Abnormal track detected for vessel {} at {}: {}",
                    track.getMmsi(), track.getTimeBucket(),
                    result.getAbnormalSegments().size());
        }

        return result;
    }

    /**
     * Fetches the last track of the previous bucket for the given key.
     *
     * <p>Returns {@code null} on any failure, including "no row found" —
     * queryForObject throws when the result is empty and the broad catch
     * deliberately converts that into a missing-previous-track answer.
     */
    protected VesselTrack getPreviousBucketLastTrack(VesselTrack.VesselKey vesselKey) {
        try {
            // Only the tail (last 10%) of the previous geometry is needed for
            // the transition check, hence ST_LineSubstring(0.9, 1.0).
            String sql = """
                    SELECT mmsi, time_bucket,
                           end_position,
                           public.ST_AsText(public.ST_LineSubstring(track_geom, 0.9, 1.0)) as last_segment
                    FROM %s
                    WHERE mmsi = ?
                      AND time_bucket >= ?
                      AND time_bucket < ?
                    ORDER BY time_bucket DESC
                    LIMIT 1
                    """.formatted(getPreviousTrackTableName());

            LocalDateTime currentBucket = getNormalizedBucket(vesselKey.getTimeBucket());
            LocalDateTime previousBucket = getPreviousBucket(currentBucket);

            // Convert to java.sql.Timestamp for proper PostgreSQL type handling
            Timestamp previousBucketTimestamp = Timestamp.valueOf(previousBucket);
            Timestamp currentBucketTimestamp = Timestamp.valueOf(currentBucket);

            JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
            return jdbcTemplate.queryForObject(sql,
                    (rs, rowNum) -> {
                        return VesselTrack.builder()
                                .mmsi(rs.getString("mmsi"))
                                .timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime())
                                .trackGeom(rs.getString("last_segment"))
                                .endPosition(parseEndPosition(rs.getString("end_position")))
                                .build();
                    },
                    vesselKey.getMmsi(), previousBucketTimestamp, currentBucketTimestamp
            );
        } catch (Exception e) {
            log.debug("No previous bucket track found for vessel {}", vesselKey);
            return null;
        }
    }

    /**
     * Parses the JSON-formatted end_position column.
     *
     * @param json raw JSON text, may be {@code null}
     * @return parsed position, or {@code null} if input is null or unparsable
     */
    protected VesselTrack.TrackPosition parseEndPosition(String json) {
        if (json == null) return null;
        try {
            String lat = LineStringMUtils.extractJsonValue(json, "lat");
            String lon = LineStringMUtils.extractJsonValue(json, "lon");
            String time = LineStringMUtils.extractJsonValue(json, "time");
            String sog = LineStringMUtils.extractJsonValue(json, "sog");

            return VesselTrack.TrackPosition.builder()
                    .lat(lat != null ? Double.parseDouble(lat) : null)
                    .lon(lon != null ? Double.parseDouble(lon) : null)
                    .time(time != null ? LocalDateTime.parse(time, TIMESTAMP_FORMATTER) : null)
                    .sog(sog != null ? new BigDecimal(sog) : null)
                    .build();
        } catch (Exception e) {
            log.error("Failed to parse end position: {}", json, e);
            return null;
        }
    }

    /**
     * Table name to query for the previous track (implemented by subclasses).
     */
    protected abstract String getPreviousTrackTableName();

    /**
     * Normalized bucket time for the key (implemented by subclasses).
     */
    protected abstract LocalDateTime getNormalizedBucket(LocalDateTime timeBucket);

    /**
     * Previous bucket time relative to {@code currentBucket} (implemented by subclasses).
     */
    protected abstract LocalDateTime getPreviousBucket(LocalDateTime currentBucket);
}
|
||||
@ -0,0 +1,298 @@
|
||||
package gc.mda.signal_batch.batch.processor;
|
||||
|
||||
import gc.mda.signal_batch.batch.processor.AbnormalTrackDetector.AbnormalDetectionResult;
|
||||
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
|
||||
import gc.mda.signal_batch.global.util.LineStringMUtils;
|
||||
import gc.mda.signal_batch.global.util.TrackSimplificationUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.ExitStatus;
|
||||
import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.StepExecutionListener;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.RoundingMode;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
 * In-memory daily track merge processor.
 *
 * Given the list of a vessel's hourly tracks it:
 * 1. concatenates the WKT coordinates (plain Java string work)
 * 2. aggregates statistics (distance, speed, pointCount)
 * 3. simplifies the geometry (TrackSimplificationUtils.simplifyDailyTrack)
 * 4. runs abnormal detection (previous day bulk-prefetched once)
 *
 * Removes the N+1 SQL pattern — at most one DB query (the previous-day
 * prefetch for abnormal detection).
 */
@Slf4j
public class DailyTrackMergeProcessor
        implements ItemProcessor<List<VesselTrack>, AbnormalDetectionResult>, StepExecutionListener {

    // Captures the coordinate list inside "LINESTRING M(...)".
    private static final Pattern WKT_COORDS_PATTERN = Pattern.compile("LINESTRING M\\((.+)\\)");
    private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private final AbnormalTrackDetector abnormalTrackDetector;
    private final JdbcTemplate queryJdbcTemplate;
    // Midnight of the day being aggregated; stamped onto every merged track.
    private final LocalDateTime dayBucket;

    // Lazy-init: previous-day data is bulk-prefetched exactly once.
    private Map<String, VesselTrack> previousDayCache;
    private boolean previousDayLoaded = false;

    // Step-level aggregate counters (logged in afterStep).
    private int totalProcessed = 0;
    private int mergeFailCount = 0;
    private int simplifiedCount = 0;
    private int abnormalCount = 0;
    private int avgSpeedFailCount = 0;

    public DailyTrackMergeProcessor(
            AbnormalTrackDetector abnormalTrackDetector,
            JdbcTemplate queryJdbcTemplate,
            LocalDateTime dayBucket) {
        this.abnormalTrackDetector = abnormalTrackDetector;
        this.queryJdbcTemplate = queryJdbcTemplate;
        this.dayBucket = dayBucket;
    }

    /**
     * Merges one vessel's hourly tracks into a daily track and runs
     * bucket-transition abnormal detection against the previous day.
     *
     * @param hourlyTracks hourly tracks for a single vessel, in time order
     *                     (assumed — TODO confirm ordering guarantee from reader)
     * @return detection result, or {@code null} for empty input / failed merge
     * @throws Exception propagated from detection
     */
    @Override
    public AbnormalDetectionResult process(List<VesselTrack> hourlyTracks) throws Exception {
        if (hourlyTracks == null || hourlyTracks.isEmpty()) {
            return null;
        }

        String mmsi = hourlyTracks.get(0).getMmsi();
        totalProcessed++;

        // Step 1: merge WKT coordinates
        String mergedWkt = mergeTrackGeometries(hourlyTracks);
        if (mergedWkt == null) {
            mergeFailCount++;
            return null;
        }

        // Step 2: aggregate statistics
        BigDecimal totalDistance = BigDecimal.ZERO;
        BigDecimal maxSpeed = BigDecimal.ZERO;
        int totalPoints = 0;

        for (VesselTrack track : hourlyTracks) {
            if (track.getDistanceNm() != null) {
                totalDistance = totalDistance.add(track.getDistanceNm());
            }
            if (track.getMaxSpeed() != null && track.getMaxSpeed().compareTo(maxSpeed) > 0) {
                maxSpeed = track.getMaxSpeed();
            }
            if (track.getPointCount() != null) {
                totalPoints += track.getPointCount();
            }
        }

        // avgSpeed: computed from the time delta carried in the WKT M values
        BigDecimal avgSpeed = calculateAvgSpeed(mergedWkt, totalDistance);

        VesselTrack.TrackPosition startPos = hourlyTracks.get(0).getStartPosition();
        VesselTrack.TrackPosition endPos = hourlyTracks.get(hourlyTracks.size() - 1).getEndPosition();

        // Step 3: daily simplification
        String simplifiedWkt = TrackSimplificationUtils.simplifyDailyTrack(mergedWkt);
        int simplifiedPoints = countWktPoints(simplifiedWkt);

        if (!mergedWkt.equals(simplifiedWkt)) {
            simplifiedCount++;
        }

        VesselTrack dailyTrack = VesselTrack.builder()
                .mmsi(mmsi)
                .timeBucket(dayBucket)
                .trackGeom(simplifiedWkt)
                .distanceNm(totalDistance)
                .avgSpeed(avgSpeed)
                .maxSpeed(maxSpeed)
                .pointCount(simplifiedPoints > 0 ? simplifiedPoints : totalPoints)
                .startPosition(startPos)
                .endPosition(endPos)
                .build();

        // Step 4: abnormal detection (lazy-init, single bulk prefetch)
        if (!previousDayLoaded) {
            previousDayCache = bulkFetchPreviousDayLastTracks();
            previousDayLoaded = true;
        }

        VesselTrack prevTrack = previousDayCache.get(mmsi);
        AbnormalDetectionResult result = abnormalTrackDetector.detectBucketTransitionOnly(dailyTrack, prevTrack);

        if (result.hasAbnormalities()) {
            abnormalCount++;
        }

        return result;
    }

    /**
     * Concatenates the hourly tracks' WKT coordinates into a single
     * "LINESTRING M(...)" string; returns {@code null} when no coordinates
     * could be extracted.
     */
    private String mergeTrackGeometries(List<VesselTrack> tracks) {
        StringBuilder allCoords = new StringBuilder();

        for (VesselTrack track : tracks) {
            String wkt = track.getTrackGeom();
            if (wkt == null || wkt.isEmpty()) continue;

            Matcher matcher = WKT_COORDS_PATTERN.matcher(wkt);
            if (matcher.find()) {
                String coords = matcher.group(1);
                if (!coords.isBlank()) {
                    if (allCoords.length() > 0) {
                        allCoords.append(", ");
                    }
                    allCoords.append(coords);
                }
            }
        }

        if (allCoords.length() == 0) {
            return null;
        }

        return "LINESTRING M(" + allCoords + ")";
    }

    /**
     * Average speed from the M values (Unix timestamps) of the first and last
     * WKT points: distance / elapsed-hours, capped and rounded to 2 decimals.
     * Returns ZERO on any parse failure or non-positive time delta.
     */
    private BigDecimal calculateAvgSpeed(String wkt, BigDecimal totalDistance) {
        try {
            Matcher matcher = WKT_COORDS_PATTERN.matcher(wkt);
            if (!matcher.find()) return BigDecimal.ZERO;

            String coords = matcher.group(1);
            String[] points = coords.split(",");
            if (points.length < 2) return BigDecimal.ZERO;

            // M value of the first point
            String[] firstParts = points[0].trim().split("\\s+");
            double firstM = firstParts.length >= 3 ? Double.parseDouble(firstParts[2]) : 0;

            // M value of the last point
            String[] lastParts = points[points.length - 1].trim().split("\\s+");
            double lastM = lastParts.length >= 3 ? Double.parseDouble(lastParts[2]) : 0;

            double timeDiffSeconds = lastM - firstM;
            if (timeDiffSeconds <= 0) return BigDecimal.ZERO;

            double timeDiffHours = timeDiffSeconds / 3600.0;
            double avgSpeedVal = totalDistance.doubleValue() / timeDiffHours;

            // Cap unrealistic speeds
            avgSpeedVal = Math.min(avgSpeedVal, 9999.99);

            return BigDecimal.valueOf(avgSpeedVal).setScale(2, RoundingMode.HALF_UP);
        } catch (Exception e) {
            avgSpeedFailCount++;
            return BigDecimal.ZERO;
        }
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // no-op
    }

    /**
     * Logs the step-level counters; returning null leaves the step's exit
     * status unchanged (per StepExecutionListener contract).
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        log.info("Daily 병합 처리 집계 — 총: {}, 병합실패: {}, 간소화: {}, 비정상: {}, avgSpeed실패: {}",
                totalProcessed, mergeFailCount, simplifiedCount, abnormalCount, avgSpeedFailCount);
        return null;
    }

    // Counts coordinate tuples in a "LINESTRING M(...)" WKT; 0 for anything else.
    private int countWktPoints(String wkt) {
        if (wkt == null || !wkt.startsWith("LINESTRING M")) return 0;
        try {
            String coords = wkt.substring("LINESTRING M(".length(), wkt.length() - 1);
            return coords.split(",").length;
        } catch (Exception e) {
            return 0;
        }
    }

    /**
     * For abnormal detection — bulk-prefetches the last hourly track per MMSI
     * of the previous day (dayBucket minus one day). Failures are logged and
     * yield an empty map (e.g. the very first run has no previous day).
     */
    private Map<String, VesselTrack> bulkFetchPreviousDayLastTracks() {
        LocalDateTime prevStart = dayBucket.minusDays(1);
        LocalDateTime prevEnd = dayBucket;

        // DISTINCT ON + ORDER BY time_bucket DESC keeps only the latest row per mmsi.
        String sql = """
                SELECT DISTINCT ON (mmsi)
                    mmsi, time_bucket, end_position,
                    public.ST_AsText(public.ST_LineSubstring(track_geom, 0.9, 1.0)) as last_segment
                FROM signal.t_vessel_tracks_hourly
                WHERE time_bucket >= ? AND time_bucket < ?
                  AND track_geom IS NOT NULL
                ORDER BY mmsi, time_bucket DESC
                """;

        Map<String, VesselTrack> result = new HashMap<>();
        // Single-element array so the row callback lambda can mutate the counter.
        int[] parseFailCount = {0};

        try {
            queryJdbcTemplate.query(sql,
                    ps -> {
                        ps.setTimestamp(1, Timestamp.valueOf(prevStart));
                        ps.setTimestamp(2, Timestamp.valueOf(prevEnd));
                    },
                    rs -> {
                        String mmsi = rs.getString("mmsi");
                        VesselTrack.TrackPosition endPos = parseEndPosition(rs.getString("end_position"));
                        if (endPos == null && rs.getString("end_position") != null) {
                            parseFailCount[0]++;
                        }
                        VesselTrack track = VesselTrack.builder()
                                .mmsi(mmsi)
                                .timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime())
                                .trackGeom(rs.getString("last_segment"))
                                .endPosition(endPos)
                                .build();
                        result.put(mmsi, track);
                    });

            log.info("전일 트랙 prefetch 완료: {} 선박 (기간: {} ~ {})",
                    result.size(), prevStart, prevEnd);
            if (parseFailCount[0] > 0) {
                log.debug("end_position 파싱 실패: {} 건", parseFailCount[0]);
            }
        } catch (Exception e) {
            log.warn("전일 트랙 prefetch 실패 (첫 실행일 수 있음): {}", e.getMessage());
        }

        return result;
    }

    // Parses the JSON end_position column; null input or parse failure → null.
    private VesselTrack.TrackPosition parseEndPosition(String json) {
        if (json == null) return null;
        try {
            String lat = LineStringMUtils.extractJsonValue(json, "lat");
            String lon = LineStringMUtils.extractJsonValue(json, "lon");
            String time = LineStringMUtils.extractJsonValue(json, "time");
            String sog = LineStringMUtils.extractJsonValue(json, "sog");

            return VesselTrack.TrackPosition.builder()
                    .lat(lat != null ? Double.parseDouble(lat) : null)
                    .lon(lon != null ? Double.parseDouble(lon) : null)
                    .time(time != null ? LocalDateTime.parse(time, TIMESTAMP_FORMATTER) : null)
                    .sog(sog != null ? new BigDecimal(sog) : null)
                    .build();
        } catch (Exception e) {
            return null;
        }
    }
}
|
||||
@ -1,197 +0,0 @@
|
||||
package gc.mda.signal_batch.batch.processor;
|
||||
|
||||
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import gc.mda.signal_batch.global.util.LineStringMUtils;
|
||||
import gc.mda.signal_batch.global.util.TrackSimplificationUtils;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
|
||||
|
||||
@Slf4j
|
||||
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
|
||||
@RequiredArgsConstructor
|
||||
public class DailyTrackProcessor implements ItemProcessor<VesselTrack.VesselKey, VesselTrack> {
|
||||
|
||||
private final DataSource queryDataSource;
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
@Override
|
||||
public VesselTrack process(VesselTrack.VesselKey vesselKey) throws Exception {
|
||||
LocalDateTime dayBucket = vesselKey.getTimeBucket()
|
||||
.withHour(0)
|
||||
.withMinute(0)
|
||||
.withSecond(0)
|
||||
.withNano(0);
|
||||
|
||||
String sql = """
|
||||
WITH ordered_tracks AS (
|
||||
SELECT *
|
||||
FROM signal.t_vessel_tracks_hourly
|
||||
WHERE mmsi = ?
|
||||
AND time_bucket >= ?
|
||||
AND time_bucket < ?
|
||||
AND track_geom IS NOT NULL
|
||||
AND public.ST_NPoints(track_geom) > 0
|
||||
ORDER BY time_bucket
|
||||
),
|
||||
merged_coords AS (
|
||||
SELECT
|
||||
mmsi,
|
||||
string_agg(
|
||||
substring(public.ST_AsText(track_geom) from 'M \\((.+)\\)'),
|
||||
','
|
||||
ORDER BY time_bucket
|
||||
) FILTER (WHERE track_geom IS NOT NULL) as all_coords
|
||||
FROM ordered_tracks
|
||||
GROUP BY mmsi
|
||||
),
|
||||
merged_tracks AS (
|
||||
SELECT
|
||||
mc.mmsi,
|
||||
TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS') as time_bucket,
|
||||
public.ST_GeomFromText('LINESTRING M(' || mc.all_coords || ')') as merged_geom,
|
||||
(SELECT MAX(max_speed) FROM ordered_tracks WHERE mmsi = mc.mmsi) as max_speed,
|
||||
(SELECT SUM(point_count) FROM ordered_tracks WHERE mmsi = mc.mmsi) as total_points,
|
||||
(SELECT MIN(time_bucket) FROM ordered_tracks WHERE mmsi = mc.mmsi) as start_time,
|
||||
(SELECT MAX(time_bucket) FROM ordered_tracks WHERE mmsi = mc.mmsi) as end_time,
|
||||
(SELECT start_position FROM ordered_tracks WHERE mmsi = mc.mmsi ORDER BY time_bucket LIMIT 1) as start_pos,
|
||||
(SELECT end_position FROM ordered_tracks WHERE mmsi = mc.mmsi ORDER BY time_bucket DESC LIMIT 1) as end_pos
|
||||
FROM merged_coords mc
|
||||
),
|
||||
calculated_tracks AS (
|
||||
SELECT
|
||||
*,
|
||||
public.ST_Length(merged_geom::geography) / 1852.0 as total_distance,
|
||||
CASE
|
||||
WHEN public.ST_NPoints(merged_geom) > 0 THEN
|
||||
public.ST_M(public.ST_PointN(merged_geom, public.ST_NPoints(merged_geom))) -
|
||||
public.ST_M(public.ST_PointN(merged_geom, 1))
|
||||
ELSE
|
||||
EXTRACT(EPOCH FROM
|
||||
TO_TIMESTAMP(end_pos->>'time', 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(start_pos->>'time', 'YYYY-MM-DD HH24:MI:SS')
|
||||
)
|
||||
END as time_diff_seconds
|
||||
FROM merged_tracks
|
||||
)
|
||||
SELECT
|
||||
mmsi,
|
||||
time_bucket,
|
||||
merged_geom,
|
||||
total_distance,
|
||||
CASE
|
||||
WHEN time_diff_seconds > 0 THEN
|
||||
CAST(LEAST((total_distance / (time_diff_seconds / 3600.0)), 9999.99) AS numeric(6,2))
|
||||
ELSE 0
|
||||
END as avg_speed,
|
||||
max_speed,
|
||||
total_points,
|
||||
start_time,
|
||||
end_time,
|
||||
start_pos,
|
||||
end_pos,
|
||||
public.ST_AsText(merged_geom) as geom_text
|
||||
FROM calculated_tracks
|
||||
""";
|
||||
|
||||
LocalDateTime startTime = dayBucket;
|
||||
LocalDateTime endTime = dayBucket.plusDays(1);
|
||||
|
||||
Timestamp startTimestamp = Timestamp.valueOf(startTime);
|
||||
Timestamp endTimestamp = Timestamp.valueOf(endTime);
|
||||
Timestamp dayBucketTimestamp = Timestamp.valueOf(dayBucket);
|
||||
|
||||
log.debug("DailyTrackProcessor params - mmsi: {}, startTime: {}, endTime: {}, dayBucket: {}",
|
||||
vesselKey.getMmsi(), startTimestamp, endTimestamp, dayBucketTimestamp);
|
||||
|
||||
try {
|
||||
return jdbcTemplate.queryForObject(sql,
|
||||
(rs, rowNum) -> {
|
||||
try {
|
||||
return buildDailyTrack(rs, dayBucket);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to build daily track", e);
|
||||
}
|
||||
},
|
||||
vesselKey.getMmsi(),
|
||||
startTimestamp, endTimestamp, dayBucketTimestamp
|
||||
);
|
||||
} catch (org.springframework.dao.EmptyResultDataAccessException e) {
|
||||
log.warn("No hourly data found for vessel {} in time range {}-{}, skipping daily aggregation",
|
||||
vesselKey.getMmsi(), startTimestamp, endTimestamp);
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to process daily track for vessel {}: {}",
|
||||
vesselKey.getMmsi(), e.getMessage(), e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private VesselTrack buildDailyTrack(ResultSet rs, LocalDateTime dayBucket) throws Exception {
|
||||
VesselTrack.TrackPosition startPos = null;
|
||||
VesselTrack.TrackPosition endPos = null;
|
||||
|
||||
String startPosJson = rs.getString("start_pos");
|
||||
String endPosJson = rs.getString("end_pos");
|
||||
|
||||
if (startPosJson != null) {
|
||||
startPos = parseTrackPosition(startPosJson);
|
||||
}
|
||||
if (endPosJson != null) {
|
||||
endPos = parseTrackPosition(endPosJson);
|
||||
}
|
||||
|
||||
String dailyLineStringM = rs.getString("geom_text");
|
||||
String simplifiedLineStringM = TrackSimplificationUtils.simplifyDailyTrack(dailyLineStringM);
|
||||
|
||||
if (!dailyLineStringM.equals(simplifiedLineStringM)) {
|
||||
TrackSimplificationUtils.SimplificationStats stats =
|
||||
TrackSimplificationUtils.getSimplificationStats(dailyLineStringM, simplifiedLineStringM);
|
||||
log.debug("일별 궤적 간소화 - vessel: {}, 원본: {}포인트, 간소화: {}포인트 ({}% 감소)",
|
||||
rs.getString("mmsi"),
|
||||
stats.originalPoints, stats.simplifiedPoints, (int)stats.reductionRate);
|
||||
}
|
||||
|
||||
return VesselTrack.builder()
|
||||
.mmsi(rs.getString("mmsi"))
|
||||
.timeBucket(dayBucket)
|
||||
.trackGeom(simplifiedLineStringM)
|
||||
.distanceNm(rs.getBigDecimal("total_distance"))
|
||||
.avgSpeed(rs.getBigDecimal("avg_speed"))
|
||||
.maxSpeed(rs.getBigDecimal("max_speed"))
|
||||
.pointCount(rs.getInt("total_points"))
|
||||
.startPosition(startPos)
|
||||
.endPosition(endPos)
|
||||
.build();
|
||||
}
|
||||
|
||||
private VesselTrack.TrackPosition parseTrackPosition(String json) {
|
||||
try {
|
||||
String latStr = LineStringMUtils.extractJsonValue(json, "lat");
|
||||
String lonStr = LineStringMUtils.extractJsonValue(json, "lon");
|
||||
String timeStr = LineStringMUtils.extractJsonValue(json, "time");
|
||||
String sogStr = LineStringMUtils.extractJsonValue(json, "sog");
|
||||
|
||||
return VesselTrack.TrackPosition.builder()
|
||||
.lat(latStr != null ? Double.parseDouble(latStr) : null)
|
||||
.lon(lonStr != null ? Double.parseDouble(lonStr) : null)
|
||||
.time(timeStr != null ? LocalDateTime.parse(timeStr, TIMESTAMP_FORMATTER) : null)
|
||||
.sog(sogStr != null ? new BigDecimal(sogStr) : null)
|
||||
.build();
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to parse track position: {}", json, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,38 +0,0 @@
|
||||
package gc.mda.signal_batch.batch.processor;
|
||||
|
||||
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import javax.sql.DataSource;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* 일별 궤적 프로세서 - 비정상 궤적 검출 기능 포함
|
||||
*/
|
||||
@Slf4j
|
||||
@ConditionalOnProperty(name = "vessel.batch.scheduler.enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class DailyTrackProcessorWithAbnormalDetection extends BaseTrackProcessorWithAbnormalDetection {
|
||||
|
||||
public DailyTrackProcessorWithAbnormalDetection(
|
||||
ItemProcessor<VesselTrack.VesselKey, VesselTrack> dailyTrackProcessor,
|
||||
AbnormalTrackDetector abnormalTrackDetector,
|
||||
DataSource queryDataSource) {
|
||||
super(dailyTrackProcessor, abnormalTrackDetector, queryDataSource);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getPreviousTrackTableName() {
|
||||
return "signal.t_vessel_tracks_hourly";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected LocalDateTime getNormalizedBucket(LocalDateTime timeBucket) {
|
||||
return timeBucket.withHour(0).withMinute(0).withSecond(0).withNano(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected LocalDateTime getPreviousBucket(LocalDateTime currentBucket) {
|
||||
return currentBucket.minusDays(1);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,190 @@
|
||||
package gc.mda.signal_batch.batch.reader;
|
||||
|
||||
import gc.mda.signal_batch.domain.vessel.model.VesselTrack;
|
||||
import gc.mda.signal_batch.global.util.LineStringMUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.item.ItemReader;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* 캐시 기반 Daily Track Reader
|
||||
*
|
||||
* HourlyTrackCache(L2)에서 시간별 트랙을 MMSI별로 읽어 반환.
|
||||
* 캐시에 없는 MMSI는 DB fallback으로 보충.
|
||||
*
|
||||
* 정상 운영 시: DB 쿼리 1회 (DISTINCT mmsi 완전성 확인)
|
||||
* 앱 재시작 후: DB 쿼리 2회 (완전성 확인 + fallback 벌크)
|
||||
*/
|
||||
@Slf4j
|
||||
public class CacheBasedDailyTrackReader implements ItemReader<List<VesselTrack>> {
|
||||
|
||||
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
private final HourlyTrackCache hourlyTrackCache;
|
||||
private final JdbcTemplate queryJdbcTemplate;
|
||||
private final LocalDateTime startTime;
|
||||
private final LocalDateTime endTime;
|
||||
|
||||
private Iterator<List<VesselTrack>> groupIterator;
|
||||
private boolean initialized = false;
|
||||
|
||||
public CacheBasedDailyTrackReader(
|
||||
HourlyTrackCache hourlyTrackCache,
|
||||
JdbcTemplate queryJdbcTemplate,
|
||||
LocalDateTime startTime,
|
||||
LocalDateTime endTime) {
|
||||
this.hourlyTrackCache = hourlyTrackCache;
|
||||
this.queryJdbcTemplate = queryJdbcTemplate;
|
||||
this.startTime = startTime;
|
||||
this.endTime = endTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<VesselTrack> read() {
|
||||
if (!initialized) {
|
||||
initialize();
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
if (groupIterator != null && groupIterator.hasNext()) {
|
||||
return groupIterator.next();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void initialize() {
|
||||
// 1. L2 캐시에서 데이터 로드
|
||||
Map<String, List<VesselTrack>> cacheData = hourlyTrackCache.getTracksInRange(startTime, endTime);
|
||||
log.info("Daily Reader 초기화 — 캐시에서 {} 선박 로드 (기간: {} ~ {})",
|
||||
cacheData.size(), startTime, endTime);
|
||||
|
||||
// 2. DB에서 해당 기간 MMSI 목록 조회 (완전성 확인)
|
||||
String distinctSql = """
|
||||
SELECT DISTINCT mmsi FROM signal.t_vessel_tracks_hourly
|
||||
WHERE time_bucket >= ? AND time_bucket < ?
|
||||
""";
|
||||
|
||||
List<String> dbMmsiList = queryJdbcTemplate.queryForList(
|
||||
distinctSql, String.class,
|
||||
Timestamp.valueOf(startTime), Timestamp.valueOf(endTime));
|
||||
Set<String> dbMmsiSet = new HashSet<>(dbMmsiList);
|
||||
|
||||
// 3. 캐시에 없는 MMSI 감지
|
||||
Set<String> cacheMmsiSet = cacheData.keySet();
|
||||
Set<String> missingMmsi = new HashSet<>(dbMmsiSet);
|
||||
missingMmsi.removeAll(cacheMmsiSet);
|
||||
|
||||
if (!missingMmsi.isEmpty()) {
|
||||
log.info("캐시 미스 {} 선박 → DB fallback", missingMmsi.size());
|
||||
Map<String, List<VesselTrack>> fallbackData = fetchFromDb(missingMmsi);
|
||||
|
||||
// 캐시 데이터와 병합
|
||||
Map<String, List<VesselTrack>> merged = new LinkedHashMap<>(cacheData);
|
||||
merged.putAll(fallbackData);
|
||||
cacheData = merged;
|
||||
}
|
||||
|
||||
// 캐시에만 있고 DB에 없는 경우 (stale 캐시) → 제거
|
||||
int staleCount = 0;
|
||||
Iterator<String> it = cacheData.keySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
if (!dbMmsiSet.contains(it.next())) {
|
||||
it.remove();
|
||||
staleCount++;
|
||||
}
|
||||
}
|
||||
if (staleCount > 0) {
|
||||
log.debug("Stale 캐시 항목 {} 건 제거", staleCount);
|
||||
}
|
||||
|
||||
log.info("Daily Reader 준비 완료 — 총 {} 선박 (캐시: {}, DB fallback: {})",
|
||||
cacheData.size(), cacheData.size() - missingMmsi.size(), missingMmsi.size());
|
||||
|
||||
groupIterator = cacheData.values().iterator();
|
||||
}
|
||||
|
||||
/**
|
||||
* 누락된 MMSI의 시간별 트랙을 DB에서 벌크 조회
|
||||
*/
|
||||
private Map<String, List<VesselTrack>> fetchFromDb(Set<String> mmsiSet) {
|
||||
if (mmsiSet.isEmpty()) return Collections.emptyMap();
|
||||
|
||||
String[] mmsiArray = mmsiSet.toArray(new String[0]);
|
||||
|
||||
String sql = """
|
||||
SELECT mmsi, time_bucket,
|
||||
public.ST_AsText(track_geom) as geom_text,
|
||||
distance_nm, avg_speed, max_speed, point_count,
|
||||
start_position, end_position
|
||||
FROM signal.t_vessel_tracks_hourly
|
||||
WHERE mmsi = ANY(?)
|
||||
AND time_bucket >= ? AND time_bucket < ?
|
||||
AND track_geom IS NOT NULL
|
||||
ORDER BY mmsi, time_bucket
|
||||
""";
|
||||
|
||||
Map<String, List<VesselTrack>> result = new LinkedHashMap<>();
|
||||
int[] parseFailCount = {0};
|
||||
|
||||
queryJdbcTemplate.query(sql,
|
||||
ps -> {
|
||||
ps.setArray(1, ps.getConnection().createArrayOf("varchar", mmsiArray));
|
||||
ps.setTimestamp(2, Timestamp.valueOf(startTime));
|
||||
ps.setTimestamp(3, Timestamp.valueOf(endTime));
|
||||
},
|
||||
rs -> {
|
||||
String mmsi = rs.getString("mmsi");
|
||||
VesselTrack.TrackPosition startPos = parseTrackPosition(rs.getString("start_position"));
|
||||
VesselTrack.TrackPosition endPos = parseTrackPosition(rs.getString("end_position"));
|
||||
if ((startPos == null && rs.getString("start_position") != null)
|
||||
|| (endPos == null && rs.getString("end_position") != null)) {
|
||||
parseFailCount[0]++;
|
||||
}
|
||||
VesselTrack track = VesselTrack.builder()
|
||||
.mmsi(mmsi)
|
||||
.timeBucket(rs.getTimestamp("time_bucket").toLocalDateTime())
|
||||
.trackGeom(rs.getString("geom_text"))
|
||||
.distanceNm(rs.getBigDecimal("distance_nm"))
|
||||
.avgSpeed(rs.getBigDecimal("avg_speed"))
|
||||
.maxSpeed(rs.getBigDecimal("max_speed"))
|
||||
.pointCount(rs.getInt("point_count"))
|
||||
.startPosition(startPos)
|
||||
.endPosition(endPos)
|
||||
.build();
|
||||
|
||||
result.computeIfAbsent(mmsi, k -> new ArrayList<>()).add(track);
|
||||
});
|
||||
|
||||
log.info("DB fallback 완료: {} 선박, {} 트랙",
|
||||
result.size(), result.values().stream().mapToInt(List::size).sum());
|
||||
if (parseFailCount[0] > 0) {
|
||||
log.debug("TrackPosition 파싱 실패: {} 건", parseFailCount[0]);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private VesselTrack.TrackPosition parseTrackPosition(String json) {
|
||||
if (json == null) return null;
|
||||
try {
|
||||
String latStr = LineStringMUtils.extractJsonValue(json, "lat");
|
||||
String lonStr = LineStringMUtils.extractJsonValue(json, "lon");
|
||||
String timeStr = LineStringMUtils.extractJsonValue(json, "time");
|
||||
String sogStr = LineStringMUtils.extractJsonValue(json, "sog");
|
||||
|
||||
return VesselTrack.TrackPosition.builder()
|
||||
.lat(latStr != null ? Double.parseDouble(latStr) : null)
|
||||
.lon(lonStr != null ? Double.parseDouble(lonStr) : null)
|
||||
.time(timeStr != null ? LocalDateTime.parse(timeStr, TIMESTAMP_FORMATTER) : null)
|
||||
.sog(sogStr != null ? new BigDecimal(sogStr) : null)
|
||||
.build();
|
||||
} catch (Exception e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
불러오는 중...
Reference in New Issue
Block a user