package com.snp.batch.service; import com.snp.batch.global.dto.JobExecutionDetailDto; import com.snp.batch.global.model.BatchCollectionPeriod; import com.snp.batch.global.model.BatchLastExecution; import com.snp.batch.global.model.BatchRecollectionHistory; import com.snp.batch.global.repository.BatchApiLogRepository; import com.snp.batch.global.repository.BatchCollectionPeriodRepository; import com.snp.batch.global.repository.BatchLastExecutionRepository; import com.snp.batch.global.repository.BatchRecollectionHistoryRepository; import jakarta.persistence.criteria.Predicate; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.explore.JobExplorer; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.domain.Specification; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import com.snp.batch.global.model.BatchApiLog; import java.time.Duration; import java.time.LocalDateTime; import java.util.*; import java.util.stream.Collectors; @Slf4j @Service @RequiredArgsConstructor public class RecollectionHistoryService { private final BatchRecollectionHistoryRepository historyRepository; private final BatchCollectionPeriodRepository periodRepository; private final BatchLastExecutionRepository lastExecutionRepository; private final BatchApiLogRepository apiLogRepository; private final JobExplorer jobExplorer; /** * 재수집 실행 시작 기록 * REQUIRES_NEW: Job 실패해도 이력은 보존 */ @Transactional(propagation = Propagation.REQUIRES_NEW) public BatchRecollectionHistory recordStart( String jobName, Long jobExecutionId, String apiKey, String executor, String reason) { Optional period = periodRepository.findById(apiKey); if (period.isEmpty()) { log.warn("[RecollectionHistory] apiKey {} 에 대한 수집기간 없음, 이력 미생성", apiKey); return null; } BatchCollectionPeriod cp = period.get(); LocalDateTime rangeFrom = cp.getRangeFromDate(); LocalDateTime rangeTo = cp.getRangeToDate(); // 기간 중복 검출 List overlaps = historyRepository .findOverlappingHistories(apiKey, rangeFrom, rangeTo, -1L); boolean hasOverlap = !overlaps.isEmpty(); String overlapIds = overlaps.stream() .map(h -> String.valueOf(h.getHistoryId())) .collect(Collectors.joining(",")); LocalDateTime now = LocalDateTime.now(); BatchRecollectionHistory history = BatchRecollectionHistory.builder() .apiKey(apiKey) .apiKeyName(cp.getApiKeyName()) .jobName(jobName) .jobExecutionId(jobExecutionId) .rangeFromDate(rangeFrom) .rangeToDate(rangeTo) .executionStatus("STARTED") .executionStartTime(now) .executor(executor != null ? executor : "SYSTEM") .recollectionReason(reason) .hasOverlap(hasOverlap) .overlappingHistoryIds(hasOverlap ? overlapIds : null) .createdAt(now) .updatedAt(now) .build(); BatchRecollectionHistory saved = historyRepository.save(history); log.info("[RecollectionHistory] 재수집 이력 생성: historyId={}, apiKey={}, jobExecutionId={}, range={}~{}", saved.getHistoryId(), apiKey, jobExecutionId, rangeFrom, rangeTo); return saved; } /** * 재수집 실행 완료 기록 */ @Transactional(propagation = Propagation.REQUIRES_NEW) public void recordCompletion( Long jobExecutionId, String status, Long readCount, Long writeCount, Long skipCount, Integer apiCallCount, Long totalResponseTimeMs, String failureReason) { Optional opt = historyRepository.findByJobExecutionId(jobExecutionId); if (opt.isEmpty()) { log.warn("[RecollectionHistory] jobExecutionId {} 에 해당하는 이력 없음", jobExecutionId); return; } BatchRecollectionHistory history = opt.get(); LocalDateTime now = LocalDateTime.now(); history.setExecutionStatus(status); history.setExecutionEndTime(now); history.setReadCount(readCount); history.setWriteCount(writeCount); history.setSkipCount(skipCount); history.setApiCallCount(apiCallCount); history.setTotalResponseTimeMs(totalResponseTimeMs); history.setFailureReason(failureReason); history.setUpdatedAt(now); if (history.getExecutionStartTime() != null) { history.setDurationMs(Duration.between(history.getExecutionStartTime(), now).toMillis()); } historyRepository.save(history); log.info("[RecollectionHistory] 재수집 완료 기록: jobExecutionId={}, status={}, read={}, write={}", jobExecutionId, status, readCount, writeCount); } /** * 동적 필터링 + 페이징 목록 조회 */ @Transactional(readOnly = true) public Page getHistories( String apiKey, String jobName, String status, LocalDateTime from, LocalDateTime to, Pageable pageable) { Specification spec = (root, query, cb) -> { List predicates = new ArrayList<>(); if (apiKey != null && !apiKey.isEmpty()) { predicates.add(cb.equal(root.get("apiKey"), apiKey)); } if (jobName != null && !jobName.isEmpty()) { predicates.add(cb.equal(root.get("jobName"), jobName)); } if (status != null && !status.isEmpty()) { predicates.add(cb.equal(root.get("executionStatus"), status)); } if (from != null) { predicates.add(cb.greaterThanOrEqualTo(root.get("executionStartTime"), from)); } if (to != null) { predicates.add(cb.lessThanOrEqualTo(root.get("executionStartTime"), to)); } query.orderBy(cb.desc(root.get("createdAt"))); return cb.and(predicates.toArray(new Predicate[0])); }; return historyRepository.findAll(spec, pageable); } /** * 상세 조회 (중복 이력 실시간 재검사 포함) */ @Transactional(readOnly = true) public Map getHistoryDetail(Long historyId) { BatchRecollectionHistory history = historyRepository.findById(historyId) .orElseThrow(() -> new IllegalArgumentException("이력을 찾을 수 없습니다: " + historyId)); // 중복 이력 실시간 재검사 List currentOverlaps = historyRepository .findOverlappingHistories(history.getApiKey(), history.getRangeFromDate(), history.getRangeToDate(), history.getHistoryId()); // API 응답시간 통계 Map apiStats = null; if (history.getJobExecutionId() != null) { apiStats = getApiStats(history.getJobExecutionId()); } Map result = new LinkedHashMap<>(); result.put("history", history); result.put("overlappingHistories", currentOverlaps); result.put("apiStats", apiStats); return result; } /** * 상세 조회 + Step Execution + Collection Period 포함 * job_execution_id로 batch_step_execution, batch_collection_period를 조인 */ @Transactional(readOnly = true) public Map getHistoryDetailWithSteps(Long historyId) { BatchRecollectionHistory history = historyRepository.findById(historyId) .orElseThrow(() -> new IllegalArgumentException("이력을 찾을 수 없습니다: " + historyId)); // 중복 이력 실시간 재검사 List currentOverlaps = historyRepository .findOverlappingHistories(history.getApiKey(), history.getRangeFromDate(), history.getRangeToDate(), history.getHistoryId()); // API 응답시간 통계 Map apiStats = null; if (history.getJobExecutionId() != null) { apiStats = getApiStats(history.getJobExecutionId()); } // Collection Period 조회 BatchCollectionPeriod collectionPeriod = periodRepository .findById(history.getApiKey()).orElse(null); // Step Execution 조회 (job_execution_id 기반) List stepExecutions = new ArrayList<>(); if (history.getJobExecutionId() != null) { JobExecution jobExecution = jobExplorer.getJobExecution(history.getJobExecutionId()); if (jobExecution != null) { stepExecutions = jobExecution.getStepExecutions().stream() .map(this::convertStepToDto) .collect(Collectors.toList()); } } Map result = new LinkedHashMap<>(); result.put("history", history); result.put("overlappingHistories", currentOverlaps); result.put("apiStats", apiStats); result.put("collectionPeriod", collectionPeriod); result.put("stepExecutions", stepExecutions); return result; } private JobExecutionDetailDto.StepExecutionDto convertStepToDto(StepExecution stepExecution) { Long duration = null; if (stepExecution.getStartTime() != null && stepExecution.getEndTime() != null) { duration = Duration.between(stepExecution.getStartTime(), stepExecution.getEndTime()).toMillis(); } // StepExecutionContext에서 API 정보 추출 (ExecutionDetail 호환) JobExecutionDetailDto.ApiCallInfo apiCallInfo = null; var context = stepExecution.getExecutionContext(); if (context.containsKey("apiUrl")) { apiCallInfo = JobExecutionDetailDto.ApiCallInfo.builder() .apiUrl(context.getString("apiUrl", "")) .method(context.getString("apiMethod", "")) .totalCalls(context.containsKey("totalApiCalls") ? context.getInt("totalApiCalls", 0) : null) .completedCalls(context.containsKey("completedApiCalls") ? context.getInt("completedApiCalls", 0) : null) .lastCallTime(context.containsKey("lastCallTime") ? context.getString("lastCallTime", "") : null) .build(); } // batch_api_log 테이블에서 Step별 API 로그 집계 + 개별 로그 조회 JobExecutionDetailDto.StepApiLogSummary apiLogSummary = buildStepApiLogSummary(stepExecution.getId()); return JobExecutionDetailDto.StepExecutionDto.builder() .stepExecutionId(stepExecution.getId()) .stepName(stepExecution.getStepName()) .status(stepExecution.getStatus().name()) .startTime(stepExecution.getStartTime()) .endTime(stepExecution.getEndTime()) .readCount((int) stepExecution.getReadCount()) .writeCount((int) stepExecution.getWriteCount()) .commitCount((int) stepExecution.getCommitCount()) .rollbackCount((int) stepExecution.getRollbackCount()) .readSkipCount((int) stepExecution.getReadSkipCount()) .processSkipCount((int) stepExecution.getProcessSkipCount()) .writeSkipCount((int) stepExecution.getWriteSkipCount()) .filterCount((int) stepExecution.getFilterCount()) .exitCode(stepExecution.getExitStatus().getExitCode()) .exitMessage(stepExecution.getExitStatus().getExitDescription()) .duration(duration) .apiCallInfo(apiCallInfo) .apiLogSummary(apiLogSummary) .build(); } /** * Step별 batch_api_log 통계 집계 (개별 로그는 별도 API로 페이징 조회) */ private JobExecutionDetailDto.StepApiLogSummary buildStepApiLogSummary(Long stepExecutionId) { List stats = apiLogRepository.getApiStatsByStepExecutionId(stepExecutionId); if (stats.isEmpty() || stats.get(0) == null || ((Number) stats.get(0)[0]).longValue() == 0L) { return null; } Object[] row = stats.get(0); return JobExecutionDetailDto.StepApiLogSummary.builder() .totalCalls(((Number) row[0]).longValue()) .successCount(((Number) row[1]).longValue()) .errorCount(((Number) row[2]).longValue()) .avgResponseMs(((Number) row[3]).doubleValue()) .maxResponseMs(((Number) row[4]).longValue()) .minResponseMs(((Number) row[5]).longValue()) .totalResponseMs(((Number) row[6]).longValue()) .totalRecordCount(((Number) row[7]).longValue()) .build(); } /** * 대시보드용 최근 10건 */ @Transactional(readOnly = true) public List getRecentHistories() { return historyRepository.findTop10ByOrderByCreatedAtDesc(); } /** * 통계 조회 */ @Transactional(readOnly = true) public Map getHistoryStats() { Map stats = new LinkedHashMap<>(); stats.put("totalCount", historyRepository.count()); stats.put("completedCount", historyRepository.countByExecutionStatus("COMPLETED")); stats.put("failedCount", historyRepository.countByExecutionStatus("FAILED")); stats.put("runningCount", historyRepository.countByExecutionStatus("STARTED")); stats.put("overlapCount", historyRepository.countByHasOverlapTrue()); return stats; } /** * API 응답시간 통계 (BatchApiLog 집계) */ @Transactional(readOnly = true) public Map getApiStats(Long jobExecutionId) { List results = apiLogRepository.getApiStatsByJobExecutionId(jobExecutionId); if (results.isEmpty() || results.get(0) == null) { return null; } Object[] row = results.get(0); Map stats = new LinkedHashMap<>(); stats.put("callCount", row[0]); stats.put("totalMs", row[1]); stats.put("avgMs", row[2]); stats.put("maxMs", row[3]); stats.put("minMs", row[4]); return stats; } /** * 재수집 실행 전: 현재 last_success_date 조회 (복원용) */ @Transactional(readOnly = true) public LocalDateTime getLastSuccessDate(String apiKey) { return lastExecutionRepository.findById(apiKey) .map(BatchLastExecution::getLastSuccessDate) .orElse(null); } /** * 재수집 실행 후: Tasklet이 업데이트한 last_success_date를 원래 값으로 복원 * 재수집은 과거 데이터 재처리이므로 last_success_date를 변경하면 안 됨 */ @Transactional(propagation = Propagation.REQUIRES_NEW) public void restoreLastSuccessDate(String apiKey, LocalDateTime originalDate) { if (originalDate == null) return; lastExecutionRepository.findById(apiKey).ifPresent(lastExec -> { LocalDateTime beforeDate = lastExec.getLastSuccessDate(); lastExec.setLastSuccessDate(originalDate); lastExec.setUpdatedAt(LocalDateTime.now()); lastExecutionRepository.save(lastExec); log.info("[RecollectionHistory] last_success_date 복원: apiKey={}, before={}, after={}", apiKey, beforeDate, originalDate); }); } /** * 수집 기간 전체 조회 */ @Transactional(readOnly = true) public List getAllCollectionPeriods() { return periodRepository.findAllByOrderByOrderSeqAsc(); } /** * 수집 기간 수정 */ @Transactional public BatchCollectionPeriod updateCollectionPeriod(String apiKey, LocalDateTime rangeFromDate, LocalDateTime rangeToDate) { BatchCollectionPeriod period = periodRepository.findById(apiKey) .orElseGet(() -> new BatchCollectionPeriod(apiKey, rangeFromDate, rangeToDate)); period.setRangeFromDate(rangeFromDate); period.setRangeToDate(rangeToDate); return periodRepository.save(period); } /** * 수집 기간 초기화 (rangeFromDate, rangeToDate를 null로) */ @Transactional public void resetCollectionPeriod(String apiKey) { periodRepository.findById(apiKey).ifPresent(period -> { period.setRangeFromDate(null); period.setRangeToDate(null); periodRepository.save(period); log.info("[RecollectionHistory] 수집 기간 초기화: apiKey={}", apiKey); }); } }