package com.snp.batch.service; import com.snp.batch.global.dto.JobExecutionDetailDto; import com.snp.batch.global.model.BatchCollectionPeriod; import com.snp.batch.global.model.BatchLastExecution; import com.snp.batch.global.model.BatchRecollectionHistory; import com.snp.batch.global.model.BatchFailedRecord; import com.snp.batch.global.repository.BatchApiLogRepository; import com.snp.batch.global.repository.BatchCollectionPeriodRepository; import com.snp.batch.global.repository.BatchFailedRecordRepository; import com.snp.batch.global.repository.BatchLastExecutionRepository; import com.snp.batch.global.repository.BatchRecollectionHistoryRepository; import jakarta.persistence.criteria.Predicate; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.explore.JobExplorer; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.domain.Specification; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import com.snp.batch.global.model.BatchApiLog; import java.time.Duration; import java.time.LocalDateTime; import java.util.*; import java.util.stream.Collectors; @Slf4j @Service @RequiredArgsConstructor public class RecollectionHistoryService { private final BatchRecollectionHistoryRepository historyRepository; private final BatchCollectionPeriodRepository periodRepository; private final BatchLastExecutionRepository lastExecutionRepository; private final BatchApiLogRepository apiLogRepository; private final BatchFailedRecordRepository failedRecordRepository; private final JobExplorer jobExplorer; /** * 재수집 실행 시작 기록 * REQUIRES_NEW: Job 실패해도 이력은 보존 */ @Transactional(propagation = Propagation.REQUIRES_NEW) public BatchRecollectionHistory recordStart( String jobName, Long jobExecutionId, String apiKey, String executor, String reason) { if (apiKey == null || apiKey.isBlank()) { log.warn("[RecollectionHistory] apiKey가 null이므로 이력 미생성: jobName={}, executor={}", jobName, executor); return null; } boolean isRetryByRecordKeys = "AUTO_RETRY".equals(executor) || "MANUAL_RETRY".equals(executor); LocalDateTime rangeFrom = null; LocalDateTime rangeTo = null; String apiKeyName = null; boolean hasOverlap = false; String overlapIds = null; if (isRetryByRecordKeys) { // 실패 건 재수집 (자동/수동): 날짜 범위가 아닌 실패 레코드 키 기반이므로 날짜 없이 이력 생성 if (apiKey != null) { apiKeyName = periodRepository.findById(apiKey) .map(BatchCollectionPeriod::getApiKeyName) .orElse(null); } log.info("[RecollectionHistory] 실패 건 재수집 이력 생성 (날짜 범위 없음): executor={}, apiKey={}, apiKeyName={}", executor, apiKey, apiKeyName); } else { // 수동 재수집: BatchCollectionPeriod에서 날짜 범위 조회 Optional period = periodRepository.findById(apiKey); if (period.isEmpty()) { log.warn("[RecollectionHistory] apiKey {} 에 대한 수집기간 없음, 이력 미생성", apiKey); return null; } BatchCollectionPeriod cp = period.get(); rangeFrom = cp.getRangeFromDate(); rangeTo = cp.getRangeToDate(); apiKeyName = cp.getApiKeyName(); // 기간 중복 검출 List overlaps = historyRepository .findOverlappingHistories(apiKey, rangeFrom, rangeTo, -1L); hasOverlap = !overlaps.isEmpty(); overlapIds = overlaps.stream() .map(h -> String.valueOf(h.getHistoryId())) .collect(Collectors.joining(",")); if (overlapIds.length() > 490) { overlapIds = overlapIds.substring(0, 490) + "..."; } } LocalDateTime now = LocalDateTime.now(); BatchRecollectionHistory history = BatchRecollectionHistory.builder() .apiKey(apiKey) .apiKeyName(apiKeyName) .jobName(jobName) .jobExecutionId(jobExecutionId) .rangeFromDate(rangeFrom) .rangeToDate(rangeTo) .executionStatus("STARTED") .executionStartTime(now) .executor(executor != null ? executor : "SYSTEM") .recollectionReason(reason) .hasOverlap(hasOverlap) .overlappingHistoryIds(hasOverlap ? overlapIds : null) .createdAt(now) .updatedAt(now) .build(); BatchRecollectionHistory saved = historyRepository.save(history); log.info("[RecollectionHistory] 재수집 이력 생성: historyId={}, apiKey={}, jobExecutionId={}, range={}~{}", saved.getHistoryId(), apiKey, jobExecutionId, rangeFrom, rangeTo); return saved; } /** * 재수집 실행 완료 기록 */ @Transactional(propagation = Propagation.REQUIRES_NEW) public void recordCompletion( Long jobExecutionId, String status, Long readCount, Long writeCount, Long skipCount, Integer apiCallCount, Long totalResponseTimeMs, String failureReason) { Optional opt = historyRepository.findByJobExecutionId(jobExecutionId); if (opt.isEmpty()) { log.warn("[RecollectionHistory] jobExecutionId {} 에 해당하는 이력 없음", jobExecutionId); return; } BatchRecollectionHistory history = opt.get(); LocalDateTime now = LocalDateTime.now(); history.setExecutionStatus(status); history.setExecutionEndTime(now); history.setReadCount(readCount); history.setWriteCount(writeCount); history.setSkipCount(skipCount); history.setApiCallCount(apiCallCount); history.setTotalResponseTimeMs(totalResponseTimeMs); history.setFailureReason(failureReason); history.setUpdatedAt(now); if (history.getExecutionStartTime() != null) { history.setDurationMs(Duration.between(history.getExecutionStartTime(), now).toMillis()); } historyRepository.save(history); log.info("[RecollectionHistory] 재수집 완료 기록: jobExecutionId={}, status={}, read={}, write={}", jobExecutionId, status, readCount, writeCount); } /** * 동적 필터링 + 페이징 목록 조회 */ @Transactional(readOnly = true) public Page getHistories( String apiKey, String jobName, String status, LocalDateTime from, LocalDateTime to, Pageable pageable) { Specification spec = (root, query, cb) -> { List predicates = new ArrayList<>(); if (apiKey != null && !apiKey.isEmpty()) { predicates.add(cb.equal(root.get("apiKey"), apiKey)); } if (jobName != null && !jobName.isEmpty()) { predicates.add(cb.equal(root.get("jobName"), jobName)); } if (status != null && !status.isEmpty()) { predicates.add(cb.equal(root.get("executionStatus"), status)); } if (from != null) { predicates.add(cb.greaterThanOrEqualTo(root.get("executionStartTime"), from)); } if (to != null) { predicates.add(cb.lessThanOrEqualTo(root.get("executionStartTime"), to)); } query.orderBy(cb.desc(root.get("createdAt"))); return cb.and(predicates.toArray(new Predicate[0])); }; return historyRepository.findAll(spec, pageable); } /** * CSV 내보내기용 전체 목록 조회 (최대 10,000건) */ @Transactional(readOnly = true) public List getHistoriesForExport( String apiKey, String jobName, String status, LocalDateTime from, LocalDateTime to) { Specification spec = (root, query, cb) -> { List predicates = new ArrayList<>(); if (apiKey != null && !apiKey.isEmpty()) { predicates.add(cb.equal(root.get("apiKey"), apiKey)); } if (jobName != null && !jobName.isEmpty()) { predicates.add(cb.equal(root.get("jobName"), jobName)); } if (status != null && !status.isEmpty()) { predicates.add(cb.equal(root.get("executionStatus"), status)); } if (from != null) { predicates.add(cb.greaterThanOrEqualTo(root.get("executionStartTime"), from)); } if (to != null) { predicates.add(cb.lessThanOrEqualTo(root.get("executionStartTime"), to)); } query.orderBy(cb.desc(root.get("createdAt"))); return cb.and(predicates.toArray(new Predicate[0])); }; return historyRepository.findAll(spec, org.springframework.data.domain.PageRequest.of(0, 10000)).getContent(); } /** * 상세 조회 (중복 이력 실시간 재검사 포함) */ @Transactional(readOnly = true) public Map getHistoryDetail(Long historyId) { BatchRecollectionHistory history = historyRepository.findById(historyId) .orElseThrow(() -> new IllegalArgumentException("이력을 찾을 수 없습니다: " + historyId)); // 중복 이력 실시간 재검사 List currentOverlaps; if (history.getRangeFromDate() != null && history.getRangeToDate() != null) { currentOverlaps = historyRepository.findOverlappingHistories( history.getApiKey(), history.getRangeFromDate(), history.getRangeToDate(), history.getHistoryId()); } else { currentOverlaps = Collections.emptyList(); } // API 응답시간 통계 Map apiStats = null; if (history.getJobExecutionId() != null) { apiStats = getApiStats(history.getJobExecutionId()); } Map result = new LinkedHashMap<>(); result.put("history", history); result.put("overlappingHistories", currentOverlaps); result.put("apiStats", apiStats); return result; } /** * 상세 조회 + Step Execution + Collection Period 포함 * job_execution_id로 batch_step_execution, batch_collection_period를 조인 */ @Transactional(readOnly = true) public Map getHistoryDetailWithSteps(Long historyId) { BatchRecollectionHistory history = historyRepository.findById(historyId) .orElseThrow(() -> new IllegalArgumentException("이력을 찾을 수 없습니다: " + historyId)); // 중복 이력 실시간 재검사 List currentOverlaps; if (history.getRangeFromDate() != null && history.getRangeToDate() != null) { currentOverlaps = historyRepository.findOverlappingHistories( history.getApiKey(), history.getRangeFromDate(), history.getRangeToDate(), history.getHistoryId()); } else { currentOverlaps = Collections.emptyList(); } // API 응답시간 통계 Map apiStats = null; if (history.getJobExecutionId() != null) { apiStats = getApiStats(history.getJobExecutionId()); } // Collection Period 조회 BatchCollectionPeriod collectionPeriod = periodRepository .findById(history.getApiKey()).orElse(null); // Step Execution 조회 (job_execution_id 기반) List stepExecutions = new ArrayList<>(); if (history.getJobExecutionId() != null) { JobExecution jobExecution = jobExplorer.getJobExecution(history.getJobExecutionId()); if (jobExecution != null) { // N+1 방지: stepExecutionId 목록을 일괄 조회 후 Map으로 변환 List stepIds = jobExecution.getStepExecutions().stream() .map(StepExecution::getId) .toList(); Map> failedRecordsMap = failedRecordRepository .findByStepExecutionIdIn(stepIds).stream() .collect(Collectors.groupingBy(BatchFailedRecord::getStepExecutionId)); stepExecutions = jobExecution.getStepExecutions().stream() .map(step -> convertStepToDto(step, failedRecordsMap.getOrDefault(step.getId(), Collections.emptyList()))) .collect(Collectors.toList()); } } Map result = new LinkedHashMap<>(); result.put("history", history); result.put("overlappingHistories", currentOverlaps); result.put("apiStats", apiStats); result.put("collectionPeriod", collectionPeriod); result.put("stepExecutions", stepExecutions); return result; } private JobExecutionDetailDto.StepExecutionDto convertStepToDto(StepExecution stepExecution, List failedRecords) { Long duration = null; if (stepExecution.getStartTime() != null && stepExecution.getEndTime() != null) { duration = Duration.between(stepExecution.getStartTime(), stepExecution.getEndTime()).toMillis(); } // StepExecutionContext에서 API 정보 추출 (ExecutionDetail 호환) JobExecutionDetailDto.ApiCallInfo apiCallInfo = null; var context = stepExecution.getExecutionContext(); if (context.containsKey("apiUrl")) { apiCallInfo = JobExecutionDetailDto.ApiCallInfo.builder() .apiUrl(context.getString("apiUrl", "")) .method(context.getString("apiMethod", "")) .totalCalls(context.containsKey("totalApiCalls") ? context.getInt("totalApiCalls", 0) : null) .completedCalls(context.containsKey("completedApiCalls") ? context.getInt("completedApiCalls", 0) : null) .lastCallTime(context.containsKey("lastCallTime") ? context.getString("lastCallTime", "") : null) .build(); } // batch_api_log 테이블에서 Step별 API 로그 집계 + 개별 로그 조회 JobExecutionDetailDto.StepApiLogSummary apiLogSummary = buildStepApiLogSummary(stepExecution.getId()); // Step별 실패 레코드 DTO 변환 (사전 일괄 조회된 목록 사용) List failedRecordDtos = failedRecords.stream() .map(record -> JobExecutionDetailDto.FailedRecordDto.builder() .id(record.getId()) .jobName(record.getJobName()) .recordKey(record.getRecordKey()) .errorMessage(record.getErrorMessage()) .retryCount(record.getRetryCount()) .status(record.getStatus()) .createdAt(record.getCreatedAt()) .build()) .collect(Collectors.toList()); return JobExecutionDetailDto.StepExecutionDto.builder() .stepExecutionId(stepExecution.getId()) .stepName(stepExecution.getStepName()) .status(stepExecution.getStatus().name()) .startTime(stepExecution.getStartTime()) .endTime(stepExecution.getEndTime()) .readCount((int) stepExecution.getReadCount()) .writeCount((int) stepExecution.getWriteCount()) .commitCount((int) stepExecution.getCommitCount()) .rollbackCount((int) stepExecution.getRollbackCount()) .readSkipCount((int) stepExecution.getReadSkipCount()) .processSkipCount((int) stepExecution.getProcessSkipCount()) .writeSkipCount((int) stepExecution.getWriteSkipCount()) .filterCount((int) stepExecution.getFilterCount()) .exitCode(stepExecution.getExitStatus().getExitCode()) .exitMessage(stepExecution.getExitStatus().getExitDescription()) .duration(duration) .apiCallInfo(apiCallInfo) .apiLogSummary(apiLogSummary) .failedRecords(failedRecordDtos.isEmpty() ? null : failedRecordDtos) .build(); } /** * Step별 batch_api_log 통계 집계 (개별 로그는 별도 API로 페이징 조회) */ private JobExecutionDetailDto.StepApiLogSummary buildStepApiLogSummary(Long stepExecutionId) { List stats = apiLogRepository.getApiStatsByStepExecutionId(stepExecutionId); if (stats.isEmpty() || stats.get(0) == null || ((Number) stats.get(0)[0]).longValue() == 0L) { return null; } Object[] row = stats.get(0); return JobExecutionDetailDto.StepApiLogSummary.builder() .totalCalls(((Number) row[0]).longValue()) .successCount(((Number) row[1]).longValue()) .errorCount(((Number) row[2]).longValue()) .avgResponseMs(((Number) row[3]).doubleValue()) .maxResponseMs(((Number) row[4]).longValue()) .minResponseMs(((Number) row[5]).longValue()) .totalResponseMs(((Number) row[6]).longValue()) .totalRecordCount(((Number) row[7]).longValue()) .build(); } /** * 재수집 이력 목록의 jobExecutionId별 FAILED 상태 실패건수 조회 */ @Transactional(readOnly = true) public Map getFailedRecordCounts(List jobExecutionIds) { if (jobExecutionIds.isEmpty()) { return Collections.emptyMap(); } return failedRecordRepository.countFailedByJobExecutionIds(jobExecutionIds).stream() .collect(Collectors.toMap( row -> ((Number) row[0]).longValue(), row -> ((Number) row[1]).longValue() )); } /** * 대시보드용 최근 10건 */ @Transactional(readOnly = true) public List getRecentHistories() { return historyRepository.findTop10ByOrderByCreatedAtDesc(); } /** * 통계 조회 */ @Transactional(readOnly = true) public Map getHistoryStats() { Map stats = new LinkedHashMap<>(); stats.put("totalCount", historyRepository.count()); stats.put("completedCount", historyRepository.countByExecutionStatus("COMPLETED")); stats.put("failedCount", historyRepository.countByExecutionStatus("FAILED")); stats.put("runningCount", historyRepository.countByExecutionStatus("STARTED")); stats.put("overlapCount", historyRepository.countByHasOverlapTrue()); return stats; } /** * API 응답시간 통계 (BatchApiLog 집계) */ @Transactional(readOnly = true) public Map getApiStats(Long jobExecutionId) { List results = apiLogRepository.getApiStatsByJobExecutionId(jobExecutionId); if (results.isEmpty() || results.get(0) == null) { return null; } Object[] row = results.get(0); Map stats = new LinkedHashMap<>(); stats.put("callCount", row[0]); stats.put("totalMs", row[1]); stats.put("avgMs", row[2]); stats.put("maxMs", row[3]); stats.put("minMs", row[4]); return stats; } /** * jobName으로 BatchCollectionPeriod에서 apiKey를 조회합니다. */ @Transactional(readOnly = true) public String findApiKeyByJobName(String jobName) { return periodRepository.findByJobName(jobName) .map(BatchCollectionPeriod::getApiKey) .orElse(null); } /** * 재수집 실행 전: 현재 last_success_date 조회 (복원용) */ @Transactional(readOnly = true) public LocalDateTime getLastSuccessDate(String apiKey) { return lastExecutionRepository.findById(apiKey) .map(BatchLastExecution::getLastSuccessDate) .orElse(null); } /** * 재수집 실행 후: Tasklet이 업데이트한 last_success_date를 원래 값으로 복원 * 재수집은 과거 데이터 재처리이므로 last_success_date를 변경하면 안 됨 */ @Transactional(propagation = Propagation.REQUIRES_NEW) public void restoreLastSuccessDate(String apiKey, LocalDateTime originalDate) { if (originalDate == null) return; lastExecutionRepository.findById(apiKey).ifPresent(lastExec -> { LocalDateTime beforeDate = lastExec.getLastSuccessDate(); lastExec.setLastSuccessDate(originalDate); lastExec.setUpdatedAt(LocalDateTime.now()); lastExecutionRepository.save(lastExec); log.info("[RecollectionHistory] last_success_date 복원: apiKey={}, before={}, after={}", apiKey, beforeDate, originalDate); }); } /** * 수집 기간 전체 조회 */ @Transactional(readOnly = true) public List getAllCollectionPeriods() { return periodRepository.findAllByOrderByOrderSeqAsc(); } /** * 수집 기간 수정 */ @Transactional public BatchCollectionPeriod updateCollectionPeriod(String apiKey, LocalDateTime rangeFromDate, LocalDateTime rangeToDate) { BatchCollectionPeriod period = periodRepository.findById(apiKey) .orElseGet(() -> new BatchCollectionPeriod(apiKey, rangeFromDate, rangeToDate)); period.setRangeFromDate(rangeFromDate); period.setRangeToDate(rangeToDate); return periodRepository.save(period); } /** * 수집 기간 초기화 (rangeFromDate, rangeToDate를 null로) */ @Transactional public void resetCollectionPeriod(String apiKey) { periodRepository.findById(apiKey).ifPresent(period -> { period.setRangeFromDate(null); period.setRangeToDate(null); periodRepository.save(period); log.info("[RecollectionHistory] 수집 기간 초기화: apiKey={}", apiKey); }); } }