feat: Stale 데이터 비정상 궤적 전환 — 과거 timestamp 수신 시 정보 보존
- CacheBasedVesselTrackDataReader: stale 그룹 스킵 제거 → 전체 통과 + 감지 로깅 - VesselTrackStepConfig: Processor에서 stale time_bucket 감지 시 비정상 궤적으로 전환 - abnormal_type: stale_timestamp (신규 분류) - time_bucket: 현재 5분 버킷으로 오버라이드 (파티션 존재 보장) - details: 원본 time_bucket, 지연 시간(분/시), 속도/거리 정보 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
부모
82ae4d9ef5
커밋
732f69eb02
@ -35,8 +35,11 @@ import org.springframework.context.annotation.Profile;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -93,9 +96,6 @@ public class VesselTrackStepConfig {
|
||||
@Value("${vessel.batch.chunk-size:1000}")
|
||||
private int chunkSize;
|
||||
|
||||
@Value("${partition.retention.tables.t_vessel_tracks_5min.retention-days:7}")
|
||||
private int trackRetentionDays;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
// 5분 Job의 이름을 명시적으로 설정
|
||||
@ -116,7 +116,7 @@ public class VesselTrackStepConfig {
|
||||
@Bean
|
||||
@StepScope
|
||||
public CacheBasedVesselTrackDataReader trackDataReader() {
|
||||
return new CacheBasedVesselTrackDataReader(aisTargetCacheManager, trackRetentionDays);
|
||||
return new CacheBasedVesselTrackDataReader(aisTargetCacheManager);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ -140,8 +140,15 @@ public class VesselTrackStepConfig {
|
||||
|
||||
// 3. 강화된 비정상 궤적 필터링 (버킷 내 + 버킷 간 점프 검출)
|
||||
List<VesselTrack> filteredTracks = new ArrayList<>();
|
||||
LocalDateTime staleCutoff = LocalDateTime.now().toLocalDate().atStartOfDay();
|
||||
|
||||
for (VesselTrack track : tracks) {
|
||||
// Stale 데이터 감지 → 비정상 궤적으로 전환 (정상 집계에서 누락)
|
||||
if (track.getTimeBucket() != null && track.getTimeBucket().isBefore(staleCutoff)) {
|
||||
saveStaleAbnormalTrack(track);
|
||||
continue;
|
||||
}
|
||||
|
||||
boolean isAbnormal = false;
|
||||
String abnormalReason = "";
|
||||
|
||||
@ -285,6 +292,70 @@ public class VesselTrackStepConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stale 데이터(오늘 이전 time_bucket)를 비정상 궤적으로 전환 저장
|
||||
* - time_bucket: 현재 5분 버킷으로 오버라이드 (파티션 존재 보장)
|
||||
* - abnormal_type: stale_timestamp
|
||||
* - details: 원본 time_bucket, 지연 시간(분/시), 속도/거리
|
||||
*/
|
||||
private void saveStaleAbnormalTrack(VesselTrack track) {
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
LocalDateTime currentBucket = now.withSecond(0).withNano(0)
|
||||
.minusMinutes(now.getMinute() % 5);
|
||||
LocalDateTime originalTimeBucket = track.getTimeBucket();
|
||||
long delayMinutes = Duration.between(originalTimeBucket, now).toMinutes();
|
||||
|
||||
VesselTrack staleTrack = VesselTrack.builder()
|
||||
.mmsi(track.getMmsi())
|
||||
.timeBucket(currentBucket)
|
||||
.trackGeom(track.getTrackGeom())
|
||||
.distanceNm(track.getDistanceNm())
|
||||
.avgSpeed(track.getAvgSpeed())
|
||||
.maxSpeed(track.getMaxSpeed())
|
||||
.pointCount(track.getPointCount())
|
||||
.startPosition(track.getStartPosition())
|
||||
.endPosition(track.getEndPosition())
|
||||
.build();
|
||||
|
||||
log.info("Stale 데이터 비정상 전환: MMSI={}, 원본={}, 현재={}, 지연={}분",
|
||||
track.getMmsi(), originalTimeBucket, currentBucket, delayMinutes);
|
||||
|
||||
Map<String, Object> details = new HashMap<>();
|
||||
details.put("originalTimeBucket", originalTimeBucket.toString());
|
||||
details.put("currentTimeBucket", currentBucket.toString());
|
||||
details.put("delayMinutes", delayMinutes);
|
||||
details.put("delayHours", delayMinutes / 60);
|
||||
if (track.getAvgSpeed() != null) details.put("avgSpeed", track.getAvgSpeed());
|
||||
if (track.getDistanceNm() != null) details.put("distanceNm", track.getDistanceNm());
|
||||
if (track.getPointCount() != null) details.put("pointCount", track.getPointCount());
|
||||
|
||||
List<AbnormalTrackDetector.AbnormalSegment> segments = List.of(
|
||||
AbnormalTrackDetector.AbnormalSegment.builder()
|
||||
.type("stale_timestamp")
|
||||
.startIndex(0)
|
||||
.endIndex(track.getPointCount() != null
|
||||
? Math.max(track.getPointCount() - 1, 0) : 0)
|
||||
.actualValue(delayMinutes)
|
||||
.threshold(0)
|
||||
.description(String.format("Stale 데이터: 원본 %s, 지연 %d분 (%d시간)",
|
||||
originalTimeBucket, delayMinutes, delayMinutes / 60))
|
||||
.details(details)
|
||||
.build());
|
||||
|
||||
AbnormalDetectionResult result = AbnormalDetectionResult.builder()
|
||||
.originalTrack(staleTrack)
|
||||
.correctedTrack(null)
|
||||
.abnormalSegments(segments)
|
||||
.hasAbnormalities(true)
|
||||
.build();
|
||||
|
||||
try {
|
||||
abnormalTrackWriter.write(new Chunk<>(List.of(result)));
|
||||
} catch (Exception e) {
|
||||
log.error("Stale 비정상 궤적 저장 실패: MMSI={}", track.getMmsi(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// CompositeItemWriter로 3개 테이블에 동시 저장
|
||||
@Bean
|
||||
@StepScope
|
||||
|
||||
@ -29,7 +29,6 @@ import java.util.stream.Collectors;
|
||||
public class CacheBasedVesselTrackDataReader implements ItemReader<List<VesselData>> {
|
||||
|
||||
private final AisTargetCacheManager cacheManager;
|
||||
private final int staleDataThresholdDays;
|
||||
|
||||
private Iterator<List<VesselData>> groupIterator;
|
||||
private boolean initialized = false;
|
||||
@ -59,11 +58,12 @@ public class CacheBasedVesselTrackDataReader implements ItemReader<List<VesselDa
|
||||
}
|
||||
|
||||
// AisTargetEntity → VesselData 변환 + MMSI × 5분 time_bucket 이중 그룹화
|
||||
LocalDateTime staleCutoff = LocalDateTime.now().minusDays(staleDataThresholdDays);
|
||||
// 오늘 00:00 이전 데이터 필터 (파티션은 오늘부터만 존재)
|
||||
LocalDateTime staleCutoff = LocalDateTime.now().toLocalDate().atStartOfDay();
|
||||
List<List<VesselData>> allGroups = new ArrayList<>();
|
||||
long totalPoints = 0;
|
||||
int totalVessels = 0;
|
||||
int skippedStaleGroups = 0;
|
||||
int staleGroups = 0;
|
||||
|
||||
for (Map.Entry<String, List<AisTargetEntity>> entry : trackBuffer.entrySet()) {
|
||||
List<VesselData> vesselDataList = entry.getValue().stream()
|
||||
@ -88,16 +88,15 @@ public class CacheBasedVesselTrackDataReader implements ItemReader<List<VesselDa
|
||||
|
||||
for (Map.Entry<LocalDateTime, List<VesselData>> bucketEntry : bucketGroups.entrySet()) {
|
||||
if (bucketEntry.getKey().isBefore(staleCutoff)) {
|
||||
skippedStaleGroups++;
|
||||
continue;
|
||||
staleGroups++;
|
||||
}
|
||||
allGroups.add(bucketEntry.getValue());
|
||||
totalPoints += bucketEntry.getValue().size();
|
||||
}
|
||||
}
|
||||
|
||||
if (skippedStaleGroups > 0) {
|
||||
log.info("Stale data 필터: {}개 그룹 스킵 ({}일 이전 데이터)", skippedStaleGroups, staleDataThresholdDays);
|
||||
if (staleGroups > 0) {
|
||||
log.info("Stale 그룹 감지: {}건 (기준: {} 이전, 비정상 궤적으로 전환 예정)", staleGroups, staleCutoff);
|
||||
}
|
||||
log.info("트랙 버퍼 Reader 초기화: {} 선박, {} 그룹(MMSI×버킷), {} 포인트 (평균 {}pt/그룹)",
|
||||
totalVessels, allGroups.size(), totalPoints,
|
||||
|
||||
불러오는 중...
Reference in New Issue
Block a user