feat(batch): 비정상 궤적 포함 저장 플래그 추가 — 강화학습 데이터 수집용
GPS 스푸핑 등 비정상 운항 패턴의 강화학습 분류기 고도화를 위해, 비정상 궤적을 정상 테이블(5min/hourly/daily)과 캐시(L1/L2)에도 포함 저장하는 설정 플래그 추가. - vessel.batch.track.include-abnormal-in-tracks 플래그 (기본 false) - 5min: isAbnormal 시에도 filteredTracks에 포함 (플래그 true) - Hourly/Daily: correctedTrack null 시에도 originalTrack 포함 (플래그 true) - 비정상 검출 + t_abnormal_tracks 기록은 플래그와 무관하게 항상 유지 - prod 환경 true 설정 (강화학습 데이터 수집) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
부모
da6db06dcc
커밋
da7e8f0e41
@ -62,6 +62,9 @@ public class DailyAggregationStepConfig {
|
||||
@Value("${vessel.batch.chunk-size:5000}")
|
||||
private int chunkSize;
|
||||
|
||||
@Value("${vessel.batch.track.include-abnormal-in-tracks:false}")
|
||||
private boolean includeAbnormalInTracks;
|
||||
|
||||
@Bean
|
||||
public Step mergeDailyTracksStep() {
|
||||
log.info("Building mergeDailyTracksStep with cache-based in-memory merge");
|
||||
@ -110,7 +113,9 @@ public class DailyAggregationStepConfig {
|
||||
return new CompositeTrackWriter(
|
||||
vesselTrackBulkWriter,
|
||||
abnormalTrackWriter,
|
||||
"daily"
|
||||
"daily",
|
||||
null,
|
||||
includeAbnormalInTracks
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@ -69,6 +69,9 @@ public class HourlyAggregationStepConfig {
|
||||
@Value("${vessel.batch.chunk-size:5000}")
|
||||
private int chunkSize;
|
||||
|
||||
@Value("${vessel.batch.track.include-abnormal-in-tracks:false}")
|
||||
private boolean includeAbnormalInTracks;
|
||||
|
||||
// ──────────────────────────────────────────────
|
||||
// Step 1: 5분 → 시간 병합 (인메모리 캐시 기반)
|
||||
// ──────────────────────────────────────────────
|
||||
@ -122,7 +125,8 @@ public class HourlyAggregationStepConfig {
|
||||
vesselTrackBulkWriter,
|
||||
abnormalTrackWriter,
|
||||
"hourly",
|
||||
hourlyTrackCache
|
||||
hourlyTrackCache,
|
||||
includeAbnormalInTracks
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@ -96,6 +96,9 @@ public class VesselTrackStepConfig {
|
||||
@Value("${vessel.batch.chunk-size:1000}")
|
||||
private int chunkSize;
|
||||
|
||||
@Value("${vessel.batch.track.include-abnormal-in-tracks:false}")
|
||||
private boolean includeAbnormalInTracks;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
// 5분 Job의 이름을 명시적으로 설정
|
||||
@ -203,11 +206,15 @@ public class VesselTrackStepConfig {
|
||||
log.warn("비정상 궤적 감지 [{}]: vessel={}, avg_speed={}, distance={}",
|
||||
abnormalReason, track.getVesselKey(), track.getAvgSpeed(), track.getDistanceNm());
|
||||
saveAbnormalTrack(track, abnormalReason);
|
||||
if (includeAbnormalInTracks) {
|
||||
filteredTracks.add(track); // 플래그 true → 정상 테이블+캐시에도 포함
|
||||
}
|
||||
} else {
|
||||
filteredTracks.add(track);
|
||||
}
|
||||
|
||||
// 정상 궤적의 종료 위치 저장 (캐시 업데이트용)
|
||||
if (track.getEndPosition() != null) {
|
||||
// 궤적의 종료 위치 저장 (캐시 업데이트용) — 비정상 포함 시에도 위치 추적
|
||||
if (filteredTracks.contains(track) && track.getEndPosition() != null) {
|
||||
currentBucketEndPositions.put(track.getMmsi(), VesselBucketPositionDto.builder()
|
||||
.mmsi(track.getMmsi())
|
||||
.endLon(track.getEndPosition().getLon())
|
||||
@ -216,7 +223,6 @@ public class VesselTrackStepConfig {
|
||||
.build());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return filteredTracks.isEmpty() ? null : filteredTracks;
|
||||
};
|
||||
|
||||
@ -25,21 +25,24 @@ public class CompositeTrackWriter implements ItemWriter<AbnormalDetectionResult>
|
||||
private final AbnormalTrackWriter abnormalTrackWriter;
|
||||
private final String targetTable;
|
||||
private final HourlyTrackCache hourlyTrackCache; // nullable (daily writer는 미사용)
|
||||
private final boolean includeAbnormalInTracks;
|
||||
|
||||
public CompositeTrackWriter(VesselTrackBulkWriter vesselTrackBulkWriter,
|
||||
AbnormalTrackWriter abnormalTrackWriter,
|
||||
String targetTable,
|
||||
HourlyTrackCache hourlyTrackCache) {
|
||||
HourlyTrackCache hourlyTrackCache,
|
||||
boolean includeAbnormalInTracks) {
|
||||
this.vesselTrackBulkWriter = vesselTrackBulkWriter;
|
||||
this.abnormalTrackWriter = abnormalTrackWriter;
|
||||
this.targetTable = targetTable;
|
||||
this.hourlyTrackCache = hourlyTrackCache;
|
||||
this.includeAbnormalInTracks = includeAbnormalInTracks;
|
||||
}
|
||||
|
||||
public CompositeTrackWriter(VesselTrackBulkWriter vesselTrackBulkWriter,
|
||||
AbnormalTrackWriter abnormalTrackWriter,
|
||||
String targetTable) {
|
||||
this(vesselTrackBulkWriter, abnormalTrackWriter, targetTable, null);
|
||||
this(vesselTrackBulkWriter, abnormalTrackWriter, targetTable, null, false);
|
||||
}
|
||||
|
||||
@BeforeStep
|
||||
@ -66,9 +69,11 @@ public class CompositeTrackWriter implements ItemWriter<AbnormalDetectionResult>
|
||||
abnormalResults.add(result);
|
||||
|
||||
// 정정된 궤적이 있으면 정상 궤적으로 저장
|
||||
// null이면 전체 궤적이 비정상이므로 제외
|
||||
// null이면 전체 궤적이 비정상이므로 제외 (플래그 true면 원본 포함)
|
||||
if (result.getCorrectedTrack() != null) {
|
||||
normalTracks.add(result.getCorrectedTrack());
|
||||
} else if (includeAbnormalInTracks) {
|
||||
normalTracks.add(result.getOriginalTrack());
|
||||
} else {
|
||||
log.debug("비정상 궤적 전체 제외: vessel={}",
|
||||
result.getOriginalTrack().getVesselKey());
|
||||
|
||||
@ -173,6 +173,7 @@ vessel: # spring 하위가 아닌 최상위 레벨
|
||||
|
||||
# 궤적 비정상 검출 설정
|
||||
track:
|
||||
include-abnormal-in-tracks: true # 비정상 궤적도 정상 테이블+캐시에 포함 (강화학습 데이터 수집용)
|
||||
abnormal-detection:
|
||||
large-gap-threshold-hours: 4 # 이 시간 이상 gap은 연결 안함
|
||||
extreme-speed-threshold: 1000 # 이 속도 이상은 무조건 비정상 (knots)
|
||||
|
||||
@ -159,6 +159,8 @@ vessel:
|
||||
page-size: ${BATCH_PAGE_SIZE:10000}
|
||||
partition-size: ${BATCH_PARTITION_SIZE:24}
|
||||
skip-limit: 100
|
||||
track:
|
||||
include-abnormal-in-tracks: false # true: 비정상 궤적도 정상 테이블+캐시에 포함 (강화학습 데이터 수집용)
|
||||
retry-limit: 3
|
||||
# Reader 설정
|
||||
use-cursor-reader: true # Cursor Reader 사용 여부
|
||||
|
||||
불러오는 중...
Reference in New Issue
Block a user