snp-batch-validation/src/main/java/com/snp/batch/service/RecollectionHistoryService.java
HYOJIN ce67dcd7e3 feat(global): Job 한글 표시명 DB 관리 및 전체 화면 통합 (#45)
- job_display_name 테이블 신규 생성 (jobName, displayName, apiKey)
- 정적 Map 제거 → DB 캐시 기반 표시명 조회로 전환
- 초기 데이터 시드 20건 (테이블 비어있을 때 자동 삽입)
- 표시명 조회/수정 REST API 추가 (GET/PUT /api/batch/display-names)
- 재수집 이력 생성 시 displayName 우선 적용
- 전체 화면 displayName 통합 (Dashboard, Executions, Recollects, RecollectDetail, Schedules, Timeline)
2026-03-13 14:38:34 +09:00

548 lines
24 KiB
Java

package com.snp.batch.service;
import com.snp.batch.global.dto.JobExecutionDetailDto;
import com.snp.batch.global.model.BatchCollectionPeriod;
import com.snp.batch.global.model.BatchLastExecution;
import com.snp.batch.global.model.BatchRecollectionHistory;
import com.snp.batch.global.model.BatchFailedRecord;
import com.snp.batch.global.model.JobDisplayNameEntity;
import com.snp.batch.global.repository.BatchApiLogRepository;
import com.snp.batch.global.repository.BatchCollectionPeriodRepository;
import com.snp.batch.global.repository.BatchFailedRecordRepository;
import com.snp.batch.global.repository.BatchLastExecutionRepository;
import com.snp.batch.global.repository.BatchRecollectionHistoryRepository;
import com.snp.batch.global.repository.JobDisplayNameRepository;
import jakarta.persistence.criteria.Predicate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import com.snp.batch.global.model.BatchApiLog;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@Service
@RequiredArgsConstructor
public class RecollectionHistoryService {
// Recollection history rows: save, lookup by id / jobExecutionId, overlap and count queries.
private final BatchRecollectionHistoryRepository historyRepository;
// Collection-period settings looked up by apiKey or jobName; source of the recollection date range.
private final BatchCollectionPeriodRepository periodRepository;
// Last-execution rows looked up by apiKey; used to read and restore last_success_date.
private final BatchLastExecutionRepository lastExecutionRepository;
// Aggregated API-call statistics, queried per job execution and per step execution.
private final BatchApiLogRepository apiLogRepository;
// Failed records, fetched by step execution ids and counted per job execution id.
private final BatchFailedRecordRepository failedRecordRepository;
// Spring Batch explorer used to load a JobExecution together with its StepExecutions.
private final JobExplorer jobExplorer;
// Job display names, looked up by apiKey when a history row is created.
private final JobDisplayNameRepository jobDisplayNameRepository;
/**
 * Records the start of a recollection run as a new history row with status {@code STARTED}.
 *
 * <p>Runs in its own transaction ({@code REQUIRES_NEW}) so that the history row is kept
 * even when the job itself fails.
 *
 * <p>Two modes, selected by {@code executor}:
 * <ul>
 *   <li>{@code AUTO_RETRY} / {@code MANUAL_RETRY}: the retry is driven by failed record keys,
 *       so the row is created without a date range and without overlap detection.</li>
 *   <li>anything else: the date range is read from the collection period of {@code apiKey},
 *       and histories overlapping that range are detected and stored on the row.</li>
 * </ul>
 * In both modes the display name registered for {@code apiKey} is preferred for
 * {@code apiKeyName}; the collection period's name is the fallback.
 *
 * @param jobName        batch job name stored on the row
 * @param jobExecutionId job execution id stored on the row
 * @param apiKey         API key; when {@code null} or blank no row is created
 * @param executor       who triggered the run; stored as {@code "SYSTEM"} when {@code null}
 * @param reason         free-text recollection reason
 * @return the saved history, or {@code null} when {@code apiKey} is blank or (in the
 *         date-range mode) no collection period exists for {@code apiKey}
 */
@Transactional(propagation = Propagation.REQUIRES_NEW)
public BatchRecollectionHistory recordStart(
        String jobName,
        Long jobExecutionId,
        String apiKey,
        String executor,
        String reason) {
    if (apiKey == null || apiKey.isBlank()) {
        log.warn("[RecollectionHistory] apiKey가 null이므로 이력 미생성: jobName={}, executor={}", jobName, executor);
        return null;
    }

    boolean isRetryByRecordKeys = "AUTO_RETRY".equals(executor) || "MANUAL_RETRY".equals(executor);
    LocalDateTime rangeFrom = null;
    LocalDateTime rangeTo = null;
    String apiKeyName;
    boolean hasOverlap = false;
    String overlapIds = null;

    if (isRetryByRecordKeys) {
        // Retry of failed records (automatic or manual): keyed by failed record keys,
        // not by a date range, so the history is created without dates.
        // apiKey is known to be non-blank here, so no further null check is needed.
        apiKeyName = jobDisplayNameRepository.findByApiKey(apiKey)
                .map(JobDisplayNameEntity::getDisplayName)
                .orElseGet(() -> periodRepository.findById(apiKey)
                        .map(BatchCollectionPeriod::getApiKeyName)
                        .orElse(null));
        log.info("[RecollectionHistory] 실패 건 재수집 이력 생성 (날짜 범위 없음): executor={}, apiKey={}, apiKeyName={}", executor, apiKey, apiKeyName);
    } else {
        // Manual recollection: the date range comes from BatchCollectionPeriod.
        Optional<BatchCollectionPeriod> period = periodRepository.findById(apiKey);
        if (period.isEmpty()) {
            log.warn("[RecollectionHistory] apiKey {} 에 대한 수집기간 없음, 이력 미생성", apiKey);
            return null;
        }
        BatchCollectionPeriod cp = period.get();
        rangeFrom = cp.getRangeFromDate();
        rangeTo = cp.getRangeToDate();
        apiKeyName = jobDisplayNameRepository.findByApiKey(apiKey)
                .map(JobDisplayNameEntity::getDisplayName)
                .orElseGet(cp::getApiKeyName);

        // Overlap detection against existing histories of the same apiKey.
        // NOTE(review): -1L is presumably an "exclude this history id" argument that matches
        // no real row — confirm against the repository query.
        List<BatchRecollectionHistory> overlaps = historyRepository
                .findOverlappingHistories(apiKey, rangeFrom, rangeTo, -1L);
        hasOverlap = !overlaps.isEmpty();
        overlapIds = hasOverlap ? joinOverlapIds(overlaps) : null;
    }

    LocalDateTime now = LocalDateTime.now();
    BatchRecollectionHistory history = BatchRecollectionHistory.builder()
            .apiKey(apiKey)
            .apiKeyName(apiKeyName)
            .jobName(jobName)
            .jobExecutionId(jobExecutionId)
            .rangeFromDate(rangeFrom)
            .rangeToDate(rangeTo)
            .executionStatus("STARTED")
            .executionStartTime(now)
            .executor(executor != null ? executor : "SYSTEM")
            .recollectionReason(reason)
            .hasOverlap(hasOverlap)
            .overlappingHistoryIds(overlapIds)
            .createdAt(now)
            .updatedAt(now)
            .build();
    BatchRecollectionHistory saved = historyRepository.save(history);
    log.info("[RecollectionHistory] 재수집 이력 생성: historyId={}, apiKey={}, jobExecutionId={}, range={}~{}",
            saved.getHistoryId(), apiKey, jobExecutionId, rangeFrom, rangeTo);
    return saved;
}

/**
 * Joins the history ids of {@code overlaps} with commas, capped at 490 characters plus a
 * trailing {@code "..."} marker when the full list does not fit.
 *
 * <p>The cut is made on a comma boundary so the last id kept is always complete; cutting
 * at a fixed offset could leave a shortened number that reads like a different, valid id.
 */
private String joinOverlapIds(List<BatchRecollectionHistory> overlaps) {
    // NOTE(review): 490 presumably leaves headroom for the column length — confirm.
    final int maxLength = 490;
    String joined = overlaps.stream()
            .map(h -> String.valueOf(h.getHistoryId()))
            .collect(Collectors.joining(","));
    if (joined.length() <= maxLength) {
        return joined;
    }
    // Last comma at or before the limit; everything before it is a run of whole ids.
    int cut = joined.lastIndexOf(',', maxLength);
    return joined.substring(0, cut > 0 ? cut : maxLength) + "...";
}
/**
 * Records the outcome of a recollection run on the history row of the given job execution.
 *
 * <p>Runs in its own transaction ({@code REQUIRES_NEW}). When no history row exists for
 * {@code jobExecutionId}, a warning is logged and nothing is written. The duration is
 * filled in only when the row has a start time.
 *
 * @param jobExecutionId      job execution whose history row is updated
 * @param status              final execution status to store
 * @param readCount           number of items read
 * @param writeCount          number of items written
 * @param skipCount           number of items skipped
 * @param apiCallCount        number of API calls made
 * @param totalResponseTimeMs accumulated API response time in milliseconds
 * @param failureReason       failure description to store on the row
 */
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void recordCompletion(
        Long jobExecutionId,
        String status,
        Long readCount,
        Long writeCount,
        Long skipCount,
        Integer apiCallCount,
        Long totalResponseTimeMs,
        String failureReason) {
    BatchRecollectionHistory target = historyRepository.findByJobExecutionId(jobExecutionId).orElse(null);
    if (target == null) {
        log.warn("[RecollectionHistory] jobExecutionId {} 에 해당하는 이력 없음", jobExecutionId);
        return;
    }

    LocalDateTime finishedAt = LocalDateTime.now();

    // Outcome and timestamps
    target.setExecutionStatus(status);
    target.setExecutionEndTime(finishedAt);

    // Counters reported by the job
    target.setReadCount(readCount);
    target.setWriteCount(writeCount);
    target.setSkipCount(skipCount);
    target.setApiCallCount(apiCallCount);
    target.setTotalResponseTimeMs(totalResponseTimeMs);

    target.setFailureReason(failureReason);
    target.setUpdatedAt(finishedAt);

    LocalDateTime startedAt = target.getExecutionStartTime();
    if (startedAt != null) {
        long elapsedMs = Duration.between(startedAt, finishedAt).toMillis();
        target.setDurationMs(elapsedMs);
    }

    historyRepository.save(target);
    log.info("[RecollectionHistory] 재수집 완료 기록: jobExecutionId={}, status={}, read={}, write={}",
            jobExecutionId, status, readCount, writeCount);
}
/**
 * Returns one page of recollection histories matching the given optional filters.
 *
 * <p>A {@code null} or empty string filter is ignored, as is a {@code null} date bound.
 * {@code apiKey}, {@code jobName} and {@code status} are exact matches; {@code from} and
 * {@code to} are inclusive bounds on {@code executionStartTime}. The specification orders
 * by {@code createdAt} descending.
 *
 * @param apiKey   exact API key filter, optional
 * @param jobName  exact job name filter, optional
 * @param status   exact execution status filter, optional
 * @param from     inclusive lower bound on the execution start time, optional
 * @param to       inclusive upper bound on the execution start time, optional
 * @param pageable paging request passed to the repository
 * @return the matching page
 */
@Transactional(readOnly = true)
public Page<BatchRecollectionHistory> getHistories(
        String apiKey, String jobName, String status,
        LocalDateTime from, LocalDateTime to,
        Pageable pageable) {
    // Entity attribute -> requested value, in the order the predicates are added.
    Map<String, String> exactMatches = new LinkedHashMap<>();
    exactMatches.put("apiKey", apiKey);
    exactMatches.put("jobName", jobName);
    exactMatches.put("executionStatus", status);

    Specification<BatchRecollectionHistory> filter = (entity, criteria, builder) -> {
        List<Predicate> conditions = new ArrayList<>();

        for (Map.Entry<String, String> match : exactMatches.entrySet()) {
            String wanted = match.getValue();
            if (wanted == null || wanted.isEmpty()) {
                continue;
            }
            conditions.add(builder.equal(entity.get(match.getKey()), wanted));
        }

        if (from != null) {
            conditions.add(builder.greaterThanOrEqualTo(entity.get("executionStartTime"), from));
        }
        if (to != null) {
            conditions.add(builder.lessThanOrEqualTo(entity.get("executionStartTime"), to));
        }

        criteria.orderBy(builder.desc(entity.get("createdAt")));
        Predicate[] all = conditions.toArray(new Predicate[0]);
        return builder.and(all);
    };

    return historyRepository.findAll(filter, pageable);
}
/**
 * Returns the histories matching the given optional filters for CSV export, capped at
 * 10,000 rows.
 *
 * <p>The filtering and ordering rules are exactly those of
 * {@link #getHistories(String, String, String, LocalDateTime, LocalDateTime, Pageable)};
 * this method delegates to it with a single page of 10,000 rows so the two cannot drift
 * apart. The call is a plain in-class invocation, so it runs inside this method's own
 * read-only transaction.
 *
 * @param apiKey  exact API key filter, optional
 * @param jobName exact job name filter, optional
 * @param status  exact execution status filter, optional
 * @param from    inclusive lower bound on the execution start time, optional
 * @param to      inclusive upper bound on the execution start time, optional
 * @return at most 10,000 matching histories
 */
@Transactional(readOnly = true)
public List<BatchRecollectionHistory> getHistoriesForExport(
        String apiKey, String jobName, String status,
        LocalDateTime from, LocalDateTime to) {
    // First page of 10,000 rows = the export cap.
    Pageable exportWindow = org.springframework.data.domain.PageRequest.of(0, 10000);
    return getHistories(apiKey, jobName, status, from, to, exportWindow).getContent();
}
/**
 * Loads one history together with a fresh overlap check and its API response-time stats.
 *
 * <p>The overlap check is re-run at call time rather than read from the stored row. It is
 * skipped (empty list) when the history has no complete date range. API stats are
 * {@code null} when the history has no job execution id.
 *
 * @param historyId id of the history to load
 * @return map with keys {@code history}, {@code overlappingHistories}, {@code apiStats}
 * @throws IllegalArgumentException when no history exists for {@code historyId}
 */
@Transactional(readOnly = true)
public Map<String, Object> getHistoryDetail(Long historyId) {
    BatchRecollectionHistory found = historyRepository.findById(historyId)
            .orElseThrow(() -> new IllegalArgumentException("이력을 찾을 수 없습니다: " + historyId));

    // Live overlap re-check; only meaningful when both ends of the range are known.
    LocalDateTime rangeStart = found.getRangeFromDate();
    LocalDateTime rangeEnd = found.getRangeToDate();
    boolean hasFullRange = rangeStart != null && rangeEnd != null;
    List<BatchRecollectionHistory> overlapping = hasFullRange
            ? historyRepository.findOverlappingHistories(
                    found.getApiKey(), rangeStart, rangeEnd, found.getHistoryId())
            : Collections.emptyList();

    // API response-time statistics
    Long executionId = found.getJobExecutionId();
    Map<String, Object> responseStats = (executionId == null) ? null : getApiStats(executionId);

    Map<String, Object> detail = new LinkedHashMap<>();
    detail.put("history", found);
    detail.put("overlappingHistories", overlapping);
    detail.put("apiStats", responseStats);
    return detail;
}
/**
 * Loads one history with a fresh overlap check, API stats, its collection period and the
 * step executions of its job execution.
 *
 * <p>The overlap check is skipped (empty list) when the history has no complete date
 * range. API stats are {@code null} and the step list is empty when the history has no
 * job execution id. The collection period is {@code null} when none exists for the
 * history's apiKey.
 *
 * @param historyId id of the history to load
 * @return map with keys {@code history}, {@code overlappingHistories}, {@code apiStats},
 *         {@code collectionPeriod}, {@code stepExecutions}
 * @throws IllegalArgumentException when no history exists for {@code historyId}
 */
@Transactional(readOnly = true)
public Map<String, Object> getHistoryDetailWithSteps(Long historyId) {
    BatchRecollectionHistory history = historyRepository.findById(historyId)
            .orElseThrow(() -> new IllegalArgumentException("이력을 찾을 수 없습니다: " + historyId));

    // Live overlap re-check
    List<BatchRecollectionHistory> currentOverlaps = Collections.emptyList();
    if (history.getRangeFromDate() != null && history.getRangeToDate() != null) {
        currentOverlaps = historyRepository.findOverlappingHistories(
                history.getApiKey(), history.getRangeFromDate(), history.getRangeToDate(),
                history.getHistoryId());
    }

    // API response-time statistics
    Long jobExecutionId = history.getJobExecutionId();
    Map<String, Object> apiStats = jobExecutionId != null ? getApiStats(jobExecutionId) : null;

    // Collection period of the history's apiKey
    BatchCollectionPeriod collectionPeriod = periodRepository
            .findById(history.getApiKey()).orElse(null);

    // Step executions of the job execution
    List<JobExecutionDetailDto.StepExecutionDto> stepExecutions = loadStepExecutionDtos(jobExecutionId);

    Map<String, Object> result = new LinkedHashMap<>();
    result.put("history", history);
    result.put("overlappingHistories", currentOverlaps);
    result.put("apiStats", apiStats);
    result.put("collectionPeriod", collectionPeriod);
    result.put("stepExecutions", stepExecutions);
    return result;
}

/**
 * Converts the step executions of the given job execution to DTOs, attaching each step's
 * failed records. Returns an empty list when the id is {@code null}, the job execution is
 * not found, or it has no step executions.
 */
private List<JobExecutionDetailDto.StepExecutionDto> loadStepExecutionDtos(Long jobExecutionId) {
    if (jobExecutionId == null) {
        return new ArrayList<>();
    }
    JobExecution jobExecution = jobExplorer.getJobExecution(jobExecutionId);
    if (jobExecution == null || jobExecution.getStepExecutions().isEmpty()) {
        // No steps: skip the failed-record lookup instead of querying with an empty id list.
        return new ArrayList<>();
    }

    // Avoid one failed-record query per step: fetch for all step ids at once, then group.
    List<Long> stepIds = jobExecution.getStepExecutions().stream()
            .map(StepExecution::getId)
            .toList();
    Map<Long, List<BatchFailedRecord>> failedRecordsByStep = failedRecordRepository
            .findByStepExecutionIdIn(stepIds).stream()
            .collect(Collectors.groupingBy(BatchFailedRecord::getStepExecutionId));

    return jobExecution.getStepExecutions().stream()
            .map(step -> convertStepToDto(
                    step, failedRecordsByStep.getOrDefault(step.getId(), Collections.emptyList())))
            .collect(Collectors.toList());
}
/**
 * Builds the detail DTO for one step execution.
 *
 * <p>Combines the step's own counters and exit status with three optional parts: API call
 * info read from the step's execution context (only when it contains {@code apiUrl}), the
 * API log summary aggregated from the API log repository, and the supplied failed records
 * (set to {@code null} on the DTO when the list is empty).
 *
 * @param stepExecution step execution to convert
 * @param failedRecords failed records already fetched for this step
 * @return the populated DTO
 */
private JobExecutionDetailDto.StepExecutionDto convertStepToDto(StepExecution stepExecution,
        List<BatchFailedRecord> failedRecords) {
    // Elapsed time is only known once the step has both a start and an end time.
    boolean finished = stepExecution.getStartTime() != null && stepExecution.getEndTime() != null;
    Long elapsedMs = finished
            ? Duration.between(stepExecution.getStartTime(), stepExecution.getEndTime()).toMillis()
            : null;

    // API info stored in the step execution context (compatible with ExecutionDetail).
    var stepContext = stepExecution.getExecutionContext();
    JobExecutionDetailDto.ApiCallInfo callInfo = null;
    if (stepContext.containsKey("apiUrl")) {
        Integer totalCalls = null;
        if (stepContext.containsKey("totalApiCalls")) {
            totalCalls = stepContext.getInt("totalApiCalls", 0);
        }
        Integer completedCalls = null;
        if (stepContext.containsKey("completedApiCalls")) {
            completedCalls = stepContext.getInt("completedApiCalls", 0);
        }
        String lastCallTime = null;
        if (stepContext.containsKey("lastCallTime")) {
            lastCallTime = stepContext.getString("lastCallTime", "");
        }
        callInfo = JobExecutionDetailDto.ApiCallInfo.builder()
                .apiUrl(stepContext.getString("apiUrl", ""))
                .method(stepContext.getString("apiMethod", ""))
                .totalCalls(totalCalls)
                .completedCalls(completedCalls)
                .lastCallTime(lastCallTime)
                .build();
    }

    // Per-step aggregate of the API log table.
    JobExecutionDetailDto.StepApiLogSummary logSummary = buildStepApiLogSummary(stepExecution.getId());

    // Failed records for this step, taken from the pre-fetched list.
    List<JobExecutionDetailDto.FailedRecordDto> failedDtos = new ArrayList<>();
    for (BatchFailedRecord failed : failedRecords) {
        failedDtos.add(JobExecutionDetailDto.FailedRecordDto.builder()
                .id(failed.getId())
                .jobName(failed.getJobName())
                .recordKey(failed.getRecordKey())
                .errorMessage(failed.getErrorMessage())
                .retryCount(failed.getRetryCount())
                .status(failed.getStatus())
                .createdAt(failed.getCreatedAt())
                .build());
    }

    var exitStatus = stepExecution.getExitStatus();
    return JobExecutionDetailDto.StepExecutionDto.builder()
            .stepExecutionId(stepExecution.getId())
            .stepName(stepExecution.getStepName())
            .status(stepExecution.getStatus().name())
            .startTime(stepExecution.getStartTime())
            .endTime(stepExecution.getEndTime())
            .readCount((int) stepExecution.getReadCount())
            .writeCount((int) stepExecution.getWriteCount())
            .commitCount((int) stepExecution.getCommitCount())
            .rollbackCount((int) stepExecution.getRollbackCount())
            .readSkipCount((int) stepExecution.getReadSkipCount())
            .processSkipCount((int) stepExecution.getProcessSkipCount())
            .writeSkipCount((int) stepExecution.getWriteSkipCount())
            .filterCount((int) stepExecution.getFilterCount())
            .exitCode(exitStatus.getExitCode())
            .exitMessage(exitStatus.getExitDescription())
            .duration(elapsedMs)
            .apiCallInfo(callInfo)
            .apiLogSummary(logSummary)
            .failedRecords(failedDtos.isEmpty() ? null : failedDtos)
            .build();
}
/**
 * Builds the aggregated API log summary for one step execution.
 *
 * <p>Only the aggregate is produced here. Reads the first row returned by the repository's
 * per-step statistics query; its columns are consumed positionally as total calls, success
 * count, error count, average / max / min / total response time, and total record count.
 *
 * @param stepExecutionId step execution to aggregate
 * @return the summary, or {@code null} when there is no row or the total call count is zero
 */
private JobExecutionDetailDto.StepApiLogSummary buildStepApiLogSummary(Long stepExecutionId) {
    List<Object[]> rows = apiLogRepository.getApiStatsByStepExecutionId(stepExecutionId);
    if (rows.isEmpty()) {
        return null;
    }
    Object[] aggregate = rows.get(0);
    if (aggregate == null) {
        return null;
    }
    long totalCalls = ((Number) aggregate[0]).longValue();
    if (totalCalls == 0L) {
        // No calls logged for this step: nothing to summarise.
        return null;
    }

    long successCount = ((Number) aggregate[1]).longValue();
    long errorCount = ((Number) aggregate[2]).longValue();
    double avgResponseMs = ((Number) aggregate[3]).doubleValue();
    long maxResponseMs = ((Number) aggregate[4]).longValue();
    long minResponseMs = ((Number) aggregate[5]).longValue();
    long totalResponseMs = ((Number) aggregate[6]).longValue();
    long totalRecordCount = ((Number) aggregate[7]).longValue();

    return JobExecutionDetailDto.StepApiLogSummary.builder()
            .totalCalls(totalCalls)
            .successCount(successCount)
            .errorCount(errorCount)
            .avgResponseMs(avgResponseMs)
            .maxResponseMs(maxResponseMs)
            .minResponseMs(minResponseMs)
            .totalResponseMs(totalResponseMs)
            .totalRecordCount(totalRecordCount)
            .build();
}
/**
 * Returns the failed-record count per job execution id for the given history list.
 *
 * <p>Each row returned by the repository is read positionally: column 0 is the job
 * execution id, column 1 the count. Ids without a row are absent from the result.
 * NOTE(review): the repository method name suggests only records in FAILED status are
 * counted — confirm against the query.
 *
 * @param jobExecutionIds job execution ids to look up; {@code null} is treated like an
 *                        empty list
 * @return map from job execution id to failed-record count; empty when no ids are given
 */
@Transactional(readOnly = true)
public Map<Long, Long> getFailedRecordCounts(List<Long> jobExecutionIds) {
    // Null-safe guard: a caller with nothing to look up must not trigger an NPE or a query.
    if (jobExecutionIds == null || jobExecutionIds.isEmpty()) {
        return Collections.emptyMap();
    }
    return failedRecordRepository.countFailedByJobExecutionIds(jobExecutionIds).stream()
            .collect(Collectors.toMap(
                    row -> ((Number) row[0]).longValue(),
                    row -> ((Number) row[1]).longValue()
            ));
}
/**
 * Returns the 10 most recently created histories for the dashboard, newest first.
 *
 * @return up to 10 histories ordered by {@code createdAt} descending
 */
@Transactional(readOnly = true)
public List<BatchRecollectionHistory> getRecentHistories() {
    List<BatchRecollectionHistory> latest = historyRepository.findTop10ByOrderByCreatedAtDesc();
    return latest;
}
/**
 * Returns summary counters over all recollection histories.
 *
 * @return map with keys {@code totalCount}, {@code completedCount} (status COMPLETED),
 *         {@code failedCount} (status FAILED), {@code runningCount} (status STARTED) and
 *         {@code overlapCount} (rows flagged as overlapping), in that order
 */
@Transactional(readOnly = true)
public Map<String, Object> getHistoryStats() {
    // Counters are queried in the same order they appear in the result.
    var total = historyRepository.count();
    var completed = historyRepository.countByExecutionStatus("COMPLETED");
    var failed = historyRepository.countByExecutionStatus("FAILED");
    var running = historyRepository.countByExecutionStatus("STARTED");
    var overlapping = historyRepository.countByHasOverlapTrue();

    Map<String, Object> summary = new LinkedHashMap<>();
    summary.put("totalCount", total);
    summary.put("completedCount", completed);
    summary.put("failedCount", failed);
    summary.put("runningCount", running);
    summary.put("overlapCount", overlapping);
    return summary;
}
/**
 * Returns API response-time statistics aggregated over the API log of one job execution.
 *
 * <p>The first row of the repository result is read positionally and exposed unchanged
 * under the keys {@code callCount}, {@code totalMs}, {@code avgMs}, {@code maxMs},
 * {@code minMs}.
 *
 * @param jobExecutionId job execution to aggregate
 * @return the statistics map, or {@code null} when the query yields no row
 */
@Transactional(readOnly = true)
public Map<String, Object> getApiStats(Long jobExecutionId) {
    List<Object[]> rows = apiLogRepository.getApiStatsByJobExecutionId(jobExecutionId);
    Object[] aggregate = rows.isEmpty() ? null : rows.get(0);
    if (aggregate == null) {
        return null;
    }

    // Result keys, in the column order of the aggregate row.
    String[] keys = {"callCount", "totalMs", "avgMs", "maxMs", "minMs"};
    Map<String, Object> summary = new LinkedHashMap<>();
    for (int column = 0; column < keys.length; column++) {
        summary.put(keys[column], aggregate[column]);
    }
    return summary;
}
/**
 * Looks up the apiKey of the collection period registered for a job name.
 *
 * @param jobName job name to look up
 * @return the apiKey, or {@code null} when no collection period exists for {@code jobName}
 */
@Transactional(readOnly = true)
public String findApiKeyByJobName(String jobName) {
    Optional<BatchCollectionPeriod> registered = periodRepository.findByJobName(jobName);
    if (registered.isEmpty()) {
        return null;
    }
    return registered.get().getApiKey();
}
/**
 * Reads the current {@code last_success_date} of an apiKey before a recollection runs,
 * so that it can be restored afterwards.
 *
 * @param apiKey key of the last-execution row
 * @return the stored date, or {@code null} when no row exists for {@code apiKey}
 */
@Transactional(readOnly = true)
public LocalDateTime getLastSuccessDate(String apiKey) {
    Optional<BatchLastExecution> lastRun = lastExecutionRepository.findById(apiKey);
    if (lastRun.isEmpty()) {
        return null;
    }
    return lastRun.get().getLastSuccessDate();
}
/**
 * Puts {@code last_success_date} back to its pre-recollection value after the run.
 *
 * <p>A recollection reprocesses past data, so the date updated by the tasklet during the
 * run must not stick. Runs in its own transaction ({@code REQUIRES_NEW}). Does nothing when
 * {@code originalDate} is {@code null} or no last-execution row exists for {@code apiKey}.
 *
 * @param apiKey       key of the last-execution row
 * @param originalDate value captured before the run
 */
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void restoreLastSuccessDate(String apiKey, LocalDateTime originalDate) {
    if (originalDate == null) {
        return;
    }
    Optional<BatchLastExecution> existing = lastExecutionRepository.findById(apiKey);
    if (existing.isEmpty()) {
        return;
    }

    BatchLastExecution row = existing.get();
    // Keep the overwritten value for the log line below.
    LocalDateTime overwritten = row.getLastSuccessDate();
    row.setLastSuccessDate(originalDate);
    row.setUpdatedAt(LocalDateTime.now());
    lastExecutionRepository.save(row);
    log.info("[RecollectionHistory] last_success_date 복원: apiKey={}, before={}, after={}",
            apiKey, overwritten, originalDate);
}
/**
 * Returns every collection period, ordered by {@code orderSeq} ascending.
 *
 * @return all collection periods
 */
@Transactional(readOnly = true)
public List<BatchCollectionPeriod> getAllCollectionPeriods() {
    List<BatchCollectionPeriod> ordered = periodRepository.findAllByOrderByOrderSeqAsc();
    return ordered;
}
/**
 * Sets the date range of the collection period for an apiKey, creating the period when
 * none exists yet.
 *
 * @param apiKey        key of the collection period
 * @param rangeFromDate new start of the range
 * @param rangeToDate   new end of the range
 * @return the saved collection period
 */
@Transactional
public BatchCollectionPeriod updateCollectionPeriod(String apiKey,
        LocalDateTime rangeFromDate,
        LocalDateTime rangeToDate) {
    Optional<BatchCollectionPeriod> existing = periodRepository.findById(apiKey);
    BatchCollectionPeriod target = existing.isPresent()
            ? existing.get()
            : new BatchCollectionPeriod(apiKey, rangeFromDate, rangeToDate);

    // Applied to both the existing and the freshly created period.
    target.setRangeFromDate(rangeFromDate);
    target.setRangeToDate(rangeToDate);
    return periodRepository.save(target);
}
/**
 * Clears the date range of the collection period for an apiKey by setting both
 * {@code rangeFromDate} and {@code rangeToDate} to {@code null}.
 *
 * <p>Does nothing when no collection period exists for {@code apiKey}.
 *
 * @param apiKey key of the collection period to reset
 */
@Transactional
public void resetCollectionPeriod(String apiKey) {
    Optional<BatchCollectionPeriod> existing = periodRepository.findById(apiKey);
    if (existing.isEmpty()) {
        return;
    }
    BatchCollectionPeriod target = existing.get();
    target.setRangeFromDate(null);
    target.setRangeToDate(null);
    periodRepository.save(target);
    log.info("[RecollectionHistory] 수집 기간 초기화: apiKey={}", apiKey);
}
}