Merge pull request 'feat: Stale 데이터 비정상 궤적 전환 + vesselStatic N+1 쿼리 제거' (#5) from feature/stale-data-abnormal-track into develop
This commit is contained in:
커밋
bfc4614ce7
@ -162,7 +162,7 @@ public class VesselBatchScheduler {
|
|||||||
JobParameters params = new JobParametersBuilder()
|
JobParameters params = new JobParametersBuilder()
|
||||||
.addString("startTime", startTime.toString())
|
.addString("startTime", startTime.toString())
|
||||||
.addString("endTime", endTime.toString())
|
.addString("endTime", endTime.toString())
|
||||||
.addString("timeBucket", "hourly")
|
.addString("timeBucket", startTime.toString())
|
||||||
.addString("executionTime", now.toString())
|
.addString("executionTime", now.toString())
|
||||||
.addString("enableAbnormalDetection", String.valueOf(abnormalDetectionEnabled))
|
.addString("enableAbnormalDetection", String.valueOf(abnormalDetectionEnabled))
|
||||||
.toJobParameters();
|
.toJobParameters();
|
||||||
|
|||||||
@ -54,6 +54,8 @@ public class VesselStaticStepConfig {
|
|||||||
public Step vesselStaticSyncStep() {
|
public Step vesselStaticSyncStep() {
|
||||||
return new StepBuilder("vesselStaticSyncStep", jobRepository)
|
return new StepBuilder("vesselStaticSyncStep", jobRepository)
|
||||||
.tasklet((contribution, chunkContext) -> {
|
.tasklet((contribution, chunkContext) -> {
|
||||||
|
long stepStart = System.currentTimeMillis();
|
||||||
|
|
||||||
// 1. 캐시에서 전체 데이터 → MMSI별 그룹
|
// 1. 캐시에서 전체 데이터 → MMSI별 그룹
|
||||||
Collection<AisTargetEntity> allEntities = cacheManager.getAllValues();
|
Collection<AisTargetEntity> allEntities = cacheManager.getAllValues();
|
||||||
|
|
||||||
@ -70,18 +72,16 @@ public class VesselStaticStepConfig {
|
|||||||
Map<String, AisTargetEntity> coalesced = coalesceByMmsi(allEntities);
|
Map<String, AisTargetEntity> coalesced = coalesceByMmsi(allEntities);
|
||||||
|
|
||||||
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
|
JdbcTemplate jdbcTemplate = new JdbcTemplate(queryDataSource);
|
||||||
|
Timestamp hourBucketTs = Timestamp.valueOf(hourBucket);
|
||||||
|
|
||||||
// 2. CDC: 이전 레코드와 비교 → 변경 시에만 INSERT
|
// 2. CDC: bulk SELECT로 이전 레코드 전체 조회 (N+1 → 1회)
|
||||||
String selectPrevSql = """
|
Map<String, Map<String, Object>> prevRecords = bulkFetchPreviousRecords(
|
||||||
SELECT imo, name, callsign, vessel_type, extra_info,
|
jdbcTemplate, hourBucketTs);
|
||||||
length, width, draught, destination, status,
|
|
||||||
signal_kind_code, class_type
|
|
||||||
FROM signal.t_vessel_static
|
|
||||||
WHERE mmsi = ? AND time_bucket <= ?
|
|
||||||
ORDER BY time_bucket DESC
|
|
||||||
LIMIT 1
|
|
||||||
""";
|
|
||||||
|
|
||||||
|
log.info("t_vessel_static CDC 비교 시작 — 현재: {} 선박, 이전: {} 레코드",
|
||||||
|
coalesced.size(), prevRecords.size());
|
||||||
|
|
||||||
|
// 3. 인메모리 비교 → 변경 시에만 INSERT
|
||||||
String insertSql = """
|
String insertSql = """
|
||||||
INSERT INTO signal.t_vessel_static (
|
INSERT INTO signal.t_vessel_static (
|
||||||
mmsi, time_bucket, imo, name, callsign,
|
mmsi, time_bucket, imo, name, callsign,
|
||||||
@ -104,26 +104,16 @@ public class VesselStaticStepConfig {
|
|||||||
class_type = EXCLUDED.class_type
|
class_type = EXCLUDED.class_type
|
||||||
""";
|
""";
|
||||||
|
|
||||||
Timestamp hourBucketTs = Timestamp.valueOf(hourBucket);
|
|
||||||
int inserted = 0;
|
int inserted = 0;
|
||||||
int skipped = 0;
|
int skipped = 0;
|
||||||
|
|
||||||
List<Object[]> batchArgs = new ArrayList<>();
|
List<Object[]> batchArgs = new ArrayList<>();
|
||||||
|
|
||||||
for (Map.Entry<String, AisTargetEntity> entry : coalesced.entrySet()) {
|
for (Map.Entry<String, AisTargetEntity> entry : coalesced.entrySet()) {
|
||||||
String mmsi = entry.getKey();
|
String mmsi = entry.getKey();
|
||||||
AisTargetEntity current = entry.getValue();
|
AisTargetEntity current = entry.getValue();
|
||||||
|
|
||||||
// 이전 레코드 조회
|
Map<String, Object> prev = prevRecords.get(mmsi);
|
||||||
boolean changed;
|
boolean changed = (prev == null) || hasStaticInfoChanged(current, prev);
|
||||||
try {
|
|
||||||
Map<String, Object> prev = jdbcTemplate.queryForMap(
|
|
||||||
selectPrevSql, mmsi, hourBucketTs);
|
|
||||||
changed = hasStaticInfoChanged(current, prev);
|
|
||||||
} catch (org.springframework.dao.EmptyResultDataAccessException e) {
|
|
||||||
// 이전 레코드 없음 → 첫 INSERT
|
|
||||||
changed = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (changed) {
|
if (changed) {
|
||||||
Timestamp etaTs = current.getEta() != null
|
Timestamp etaTs = current.getEta() != null
|
||||||
@ -148,14 +138,52 @@ public class VesselStaticStepConfig {
|
|||||||
jdbcTemplate.batchUpdate(insertSql, batchArgs);
|
jdbcTemplate.batchUpdate(insertSql, batchArgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("t_vessel_static 동기화 완료: 총 {} 선박, INSERT {} 건, CDC 스킵 {} 건",
|
long elapsed = System.currentTimeMillis() - stepStart;
|
||||||
coalesced.size(), inserted, skipped);
|
log.info("t_vessel_static 동기화 완료: 총 {} 선박, INSERT {} 건, CDC 스킵 {} 건 ({}ms)",
|
||||||
|
coalesced.size(), inserted, skipped, elapsed);
|
||||||
|
|
||||||
return org.springframework.batch.repeat.RepeatStatus.FINISHED;
|
return org.springframework.batch.repeat.RepeatStatus.FINISHED;
|
||||||
}, transactionManager)
|
}, transactionManager)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DISTINCT ON (mmsi)로 전체 이전 레코드를 1회 bulk 조회
|
||||||
|
* N+1 개별 SELECT → 1회 bulk SELECT로 최적화
|
||||||
|
*/
|
||||||
|
private Map<String, Map<String, Object>> bulkFetchPreviousRecords(
|
||||||
|
JdbcTemplate jdbcTemplate, Timestamp hourBucketTs) {
|
||||||
|
String sql = """
|
||||||
|
SELECT DISTINCT ON (mmsi)
|
||||||
|
mmsi, imo, name, callsign, vessel_type, extra_info,
|
||||||
|
length, width, draught, destination, status,
|
||||||
|
signal_kind_code, class_type
|
||||||
|
FROM signal.t_vessel_static
|
||||||
|
WHERE time_bucket <= ?
|
||||||
|
ORDER BY mmsi, time_bucket DESC
|
||||||
|
""";
|
||||||
|
|
||||||
|
Map<String, Map<String, Object>> result = new HashMap<>();
|
||||||
|
jdbcTemplate.query(sql, rs -> {
|
||||||
|
Map<String, Object> row = new HashMap<>();
|
||||||
|
row.put("imo", rs.getObject("imo"));
|
||||||
|
row.put("name", rs.getString("name"));
|
||||||
|
row.put("callsign", rs.getString("callsign"));
|
||||||
|
row.put("vessel_type", rs.getString("vessel_type"));
|
||||||
|
row.put("extra_info", rs.getString("extra_info"));
|
||||||
|
row.put("length", rs.getObject("length"));
|
||||||
|
row.put("width", rs.getObject("width"));
|
||||||
|
row.put("draught", rs.getObject("draught"));
|
||||||
|
row.put("destination", rs.getString("destination"));
|
||||||
|
row.put("status", rs.getString("status"));
|
||||||
|
row.put("signal_kind_code", rs.getString("signal_kind_code"));
|
||||||
|
row.put("class_type", rs.getString("class_type"));
|
||||||
|
result.put(rs.getString("mmsi"), row);
|
||||||
|
}, hourBucketTs);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* MMSI별 필드 COALESCE: 각 필드별 마지막 non-empty 값 조합
|
* MMSI별 필드 COALESCE: 각 필드별 마지막 non-empty 값 조합
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -35,8 +35,11 @@ import org.springframework.context.annotation.Profile;
|
|||||||
|
|
||||||
import javax.sql.DataSource;
|
import javax.sql.DataSource;
|
||||||
import java.sql.Timestamp;
|
import java.sql.Timestamp;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
@ -93,9 +96,6 @@ public class VesselTrackStepConfig {
|
|||||||
@Value("${vessel.batch.chunk-size:1000}")
|
@Value("${vessel.batch.chunk-size:1000}")
|
||||||
private int chunkSize;
|
private int chunkSize;
|
||||||
|
|
||||||
@Value("${partition.retention.tables.t_vessel_tracks_5min.retention-days:7}")
|
|
||||||
private int trackRetentionDays;
|
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
// 5분 Job의 이름을 명시적으로 설정
|
// 5분 Job의 이름을 명시적으로 설정
|
||||||
@ -116,7 +116,7 @@ public class VesselTrackStepConfig {
|
|||||||
@Bean
|
@Bean
|
||||||
@StepScope
|
@StepScope
|
||||||
public CacheBasedVesselTrackDataReader trackDataReader() {
|
public CacheBasedVesselTrackDataReader trackDataReader() {
|
||||||
return new CacheBasedVesselTrackDataReader(aisTargetCacheManager, trackRetentionDays);
|
return new CacheBasedVesselTrackDataReader(aisTargetCacheManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@ -140,8 +140,15 @@ public class VesselTrackStepConfig {
|
|||||||
|
|
||||||
// 3. 강화된 비정상 궤적 필터링 (버킷 내 + 버킷 간 점프 검출)
|
// 3. 강화된 비정상 궤적 필터링 (버킷 내 + 버킷 간 점프 검출)
|
||||||
List<VesselTrack> filteredTracks = new ArrayList<>();
|
List<VesselTrack> filteredTracks = new ArrayList<>();
|
||||||
|
LocalDateTime staleCutoff = LocalDateTime.now().toLocalDate().atStartOfDay();
|
||||||
|
|
||||||
for (VesselTrack track : tracks) {
|
for (VesselTrack track : tracks) {
|
||||||
|
// Stale 데이터 감지 → 비정상 궤적으로 전환 (정상 집계에서 누락)
|
||||||
|
if (track.getTimeBucket() != null && track.getTimeBucket().isBefore(staleCutoff)) {
|
||||||
|
saveStaleAbnormalTrack(track);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
boolean isAbnormal = false;
|
boolean isAbnormal = false;
|
||||||
String abnormalReason = "";
|
String abnormalReason = "";
|
||||||
|
|
||||||
@ -285,6 +292,70 @@ public class VesselTrackStepConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stale 데이터(오늘 이전 time_bucket)를 비정상 궤적으로 전환 저장
|
||||||
|
* - time_bucket: 현재 5분 버킷으로 오버라이드 (파티션 존재 보장)
|
||||||
|
* - abnormal_type: stale_timestamp
|
||||||
|
* - details: 원본 time_bucket, 지연 시간(분/시), 속도/거리
|
||||||
|
*/
|
||||||
|
private void saveStaleAbnormalTrack(VesselTrack track) {
|
||||||
|
LocalDateTime now = LocalDateTime.now();
|
||||||
|
LocalDateTime currentBucket = now.withSecond(0).withNano(0)
|
||||||
|
.minusMinutes(now.getMinute() % 5);
|
||||||
|
LocalDateTime originalTimeBucket = track.getTimeBucket();
|
||||||
|
long delayMinutes = Duration.between(originalTimeBucket, now).toMinutes();
|
||||||
|
|
||||||
|
VesselTrack staleTrack = VesselTrack.builder()
|
||||||
|
.mmsi(track.getMmsi())
|
||||||
|
.timeBucket(currentBucket)
|
||||||
|
.trackGeom(track.getTrackGeom())
|
||||||
|
.distanceNm(track.getDistanceNm())
|
||||||
|
.avgSpeed(track.getAvgSpeed())
|
||||||
|
.maxSpeed(track.getMaxSpeed())
|
||||||
|
.pointCount(track.getPointCount())
|
||||||
|
.startPosition(track.getStartPosition())
|
||||||
|
.endPosition(track.getEndPosition())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
log.info("Stale 데이터 비정상 전환: MMSI={}, 원본={}, 현재={}, 지연={}분",
|
||||||
|
track.getMmsi(), originalTimeBucket, currentBucket, delayMinutes);
|
||||||
|
|
||||||
|
Map<String, Object> details = new HashMap<>();
|
||||||
|
details.put("originalTimeBucket", originalTimeBucket.toString());
|
||||||
|
details.put("currentTimeBucket", currentBucket.toString());
|
||||||
|
details.put("delayMinutes", delayMinutes);
|
||||||
|
details.put("delayHours", delayMinutes / 60);
|
||||||
|
if (track.getAvgSpeed() != null) details.put("avgSpeed", track.getAvgSpeed());
|
||||||
|
if (track.getDistanceNm() != null) details.put("distanceNm", track.getDistanceNm());
|
||||||
|
if (track.getPointCount() != null) details.put("pointCount", track.getPointCount());
|
||||||
|
|
||||||
|
List<AbnormalTrackDetector.AbnormalSegment> segments = List.of(
|
||||||
|
AbnormalTrackDetector.AbnormalSegment.builder()
|
||||||
|
.type("stale_timestamp")
|
||||||
|
.startIndex(0)
|
||||||
|
.endIndex(track.getPointCount() != null
|
||||||
|
? Math.max(track.getPointCount() - 1, 0) : 0)
|
||||||
|
.actualValue(delayMinutes)
|
||||||
|
.threshold(0)
|
||||||
|
.description(String.format("Stale 데이터: 원본 %s, 지연 %d분 (%d시간)",
|
||||||
|
originalTimeBucket, delayMinutes, delayMinutes / 60))
|
||||||
|
.details(details)
|
||||||
|
.build());
|
||||||
|
|
||||||
|
AbnormalDetectionResult result = AbnormalDetectionResult.builder()
|
||||||
|
.originalTrack(staleTrack)
|
||||||
|
.correctedTrack(null)
|
||||||
|
.abnormalSegments(segments)
|
||||||
|
.hasAbnormalities(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
try {
|
||||||
|
abnormalTrackWriter.write(new Chunk<>(List.of(result)));
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Stale 비정상 궤적 저장 실패: MMSI={}", track.getMmsi(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// CompositeItemWriter로 3개 테이블에 동시 저장
|
// CompositeItemWriter로 3개 테이블에 동시 저장
|
||||||
@Bean
|
@Bean
|
||||||
@StepScope
|
@StepScope
|
||||||
|
|||||||
@ -29,7 +29,6 @@ import java.util.stream.Collectors;
|
|||||||
public class CacheBasedVesselTrackDataReader implements ItemReader<List<VesselData>> {
|
public class CacheBasedVesselTrackDataReader implements ItemReader<List<VesselData>> {
|
||||||
|
|
||||||
private final AisTargetCacheManager cacheManager;
|
private final AisTargetCacheManager cacheManager;
|
||||||
private final int staleDataThresholdDays;
|
|
||||||
|
|
||||||
private Iterator<List<VesselData>> groupIterator;
|
private Iterator<List<VesselData>> groupIterator;
|
||||||
private boolean initialized = false;
|
private boolean initialized = false;
|
||||||
@ -59,11 +58,12 @@ public class CacheBasedVesselTrackDataReader implements ItemReader<List<VesselDa
|
|||||||
}
|
}
|
||||||
|
|
||||||
// AisTargetEntity → VesselData 변환 + MMSI × 5분 time_bucket 이중 그룹화
|
// AisTargetEntity → VesselData 변환 + MMSI × 5분 time_bucket 이중 그룹화
|
||||||
LocalDateTime staleCutoff = LocalDateTime.now().minusDays(staleDataThresholdDays);
|
// 오늘 00:00 이전 데이터 필터 (파티션은 오늘부터만 존재)
|
||||||
|
LocalDateTime staleCutoff = LocalDateTime.now().toLocalDate().atStartOfDay();
|
||||||
List<List<VesselData>> allGroups = new ArrayList<>();
|
List<List<VesselData>> allGroups = new ArrayList<>();
|
||||||
long totalPoints = 0;
|
long totalPoints = 0;
|
||||||
int totalVessels = 0;
|
int totalVessels = 0;
|
||||||
int skippedStaleGroups = 0;
|
int staleGroups = 0;
|
||||||
|
|
||||||
for (Map.Entry<String, List<AisTargetEntity>> entry : trackBuffer.entrySet()) {
|
for (Map.Entry<String, List<AisTargetEntity>> entry : trackBuffer.entrySet()) {
|
||||||
List<VesselData> vesselDataList = entry.getValue().stream()
|
List<VesselData> vesselDataList = entry.getValue().stream()
|
||||||
@ -88,16 +88,15 @@ public class CacheBasedVesselTrackDataReader implements ItemReader<List<VesselDa
|
|||||||
|
|
||||||
for (Map.Entry<LocalDateTime, List<VesselData>> bucketEntry : bucketGroups.entrySet()) {
|
for (Map.Entry<LocalDateTime, List<VesselData>> bucketEntry : bucketGroups.entrySet()) {
|
||||||
if (bucketEntry.getKey().isBefore(staleCutoff)) {
|
if (bucketEntry.getKey().isBefore(staleCutoff)) {
|
||||||
skippedStaleGroups++;
|
staleGroups++;
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
allGroups.add(bucketEntry.getValue());
|
allGroups.add(bucketEntry.getValue());
|
||||||
totalPoints += bucketEntry.getValue().size();
|
totalPoints += bucketEntry.getValue().size();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (skippedStaleGroups > 0) {
|
if (staleGroups > 0) {
|
||||||
log.info("Stale data 필터: {}개 그룹 스킵 ({}일 이전 데이터)", skippedStaleGroups, staleDataThresholdDays);
|
log.info("Stale 그룹 감지: {}건 (기준: {} 이전, 비정상 궤적으로 전환 예정)", staleGroups, staleCutoff);
|
||||||
}
|
}
|
||||||
log.info("트랙 버퍼 Reader 초기화: {} 선박, {} 그룹(MMSI×버킷), {} 포인트 (평균 {}pt/그룹)",
|
log.info("트랙 버퍼 Reader 초기화: {} 선박, {} 그룹(MMSI×버킷), {} 포인트 (평균 {}pt/그룹)",
|
||||||
totalVessels, allGroups.size(), totalPoints,
|
totalVessels, allGroups.size(), totalPoints,
|
||||||
|
|||||||
불러오는 중...
Reference in New Issue
Block a user