package com.snp.batch.service; import com.snp.batch.global.config.BatchTableProperties; import com.snp.batch.global.dto.SyncDataPreviewResponse; import com.snp.batch.global.dto.SyncStatusResponse; import com.snp.batch.global.dto.SyncStatusResponse.SyncDomainGroup; import com.snp.batch.global.dto.SyncStatusResponse.SyncStatusSummary; import com.snp.batch.global.dto.SyncStatusResponse.SyncTableStatus; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; import javax.sql.DataSource; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; /** * 동기화 현황 조회 서비스 * - batch_flag 기반 테이블별 N/P/S 건수 집계 * - 타겟 스키마 데이터 미리보기 * - P 상태 고착 레코드 리셋 */ @Slf4j @Service public class SyncStatusService { private static final Map DOMAIN_LABELS = Map.of( "ship", "Ship (선박)", "company", "Company (회사)", "event", "Event (사건)", "facility", "Facility (시설)", "psc", "PSC (검사)", "movements", "Movements (이동)", "code", "Code (코드)", "risk-compliance", "Risk & Compliance" ); private static final List DOMAIN_ORDER = List.of( "ship", "company", "event", "facility", "psc", "movements", "code", "risk-compliance" ); private final JdbcTemplate businessJdbc; private final BatchTableProperties tableProps; private String sourceSchema; private String targetSchema; private Map sourceTables; private Map targetTables; public SyncStatusService(@Qualifier("businessDataSource") DataSource businessDataSource, BatchTableProperties tableProps) { this.businessJdbc = new JdbcTemplate(businessDataSource); this.tableProps = tableProps; this.sourceSchema = tableProps.getSourceSchema().getName(); this.targetSchema = tableProps.getTargetSchema().getName(); this.sourceTables = tableProps.getSourceSchema().getTables(); this.targetTables = tableProps.getTargetSchema().getTables(); } /** * 전체 동기화 현황 조회 */ public SyncStatusResponse getSyncStatus() { // 테이블을 병렬 조회 (HikariCP pool=10 기준 동시 10개) ExecutorService executor = Executors.newFixedThreadPool( Math.min(sourceTables.size(), 10)); // source와 target 양쪽 모두 매핑된 테이블만 조회 List> futures = sourceTables.entrySet().stream() .filter(entry -> targetTables.containsKey(entry.getKey())) .map(entry -> CompletableFuture.supplyAsync(() -> { String tableKey = entry.getKey(); String sourceTable = entry.getValue(); String targetTable = targetTables.get(tableKey); try { return queryTableStatus(tableKey, sourceTable, targetTable); } catch (Exception e) { log.warn("테이블 상태 조회 실패: {} ({})", tableKey, e.getMessage()); return SyncTableStatus.builder() .tableKey(tableKey) .sourceTable(sourceTable) .targetTable(targetTable) .domain(extractDomain(tableKey)) .pendingCount(0) .processingCount(0) .completedCount(0) .stuck(false) .build(); } }, executor)) .collect(Collectors.toList()); List allStatuses = futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); executor.shutdown(); // 도메인별 그룹핑 Map> grouped = allStatuses.stream() .collect(Collectors.groupingBy(SyncTableStatus::getDomain)); List domains = DOMAIN_ORDER.stream() .filter(grouped::containsKey) .map(domain -> SyncDomainGroup.builder() .domain(domain) .domainLabel(DOMAIN_LABELS.getOrDefault(domain, domain)) .tables(grouped.get(domain)) .build()) .collect(Collectors.toList()); // 요약 long totalPending = allStatuses.stream().mapToLong(SyncTableStatus::getPendingCount).sum(); long totalProcessing = allStatuses.stream().mapToLong(SyncTableStatus::getProcessingCount).sum(); long totalCompleted = allStatuses.stream().mapToLong(SyncTableStatus::getCompletedCount).sum(); int stuckTables = (int) allStatuses.stream().filter(SyncTableStatus::isStuck).count(); SyncStatusSummary summary = SyncStatusSummary.builder() .totalTables(allStatuses.size()) .pendingCount(totalPending) .processingCount(totalProcessing) .completedCount(totalCompleted) .stuckTables(stuckTables) .build(); return SyncStatusResponse.builder() .summary(summary) .domains(domains) .build(); } /** * 특정 테이블의 최근 동기화 데이터 미리보기 */ public SyncDataPreviewResponse getDataPreview(String tableKey, int limit) { String targetTable = targetTables.get(tableKey); if (targetTable == null) { throw new IllegalArgumentException("존재하지 않는 테이블 키: " + tableKey); } String countSql = "SELECT COUNT(*) FROM %s.%s".formatted(targetSchema, targetTable); Long totalCount = businessJdbc.queryForObject(countSql, Long.class); String sql = "SELECT * FROM %s.%s ORDER BY mdfcn_dt DESC NULLS LAST LIMIT %d" .formatted(targetSchema, targetTable, limit); List> rows = businessJdbc.queryForList(sql); List columns = rows.isEmpty() ? getTableColumns(targetTable) : new ArrayList<>(rows.get(0).keySet()); return SyncDataPreviewResponse.builder() .tableKey(tableKey) .targetTable(targetTable) .targetSchema(targetSchema) .columns(columns) .rows(rows) .totalCount(totalCount != null ? totalCount : 0) .build(); } /** * P 상태 고착 레코드 조회 */ public SyncDataPreviewResponse getStuckRecords(String tableKey, int limit) { String sourceTable = sourceTables.get(tableKey); if (sourceTable == null) { throw new IllegalArgumentException("존재하지 않는 테이블 키: " + tableKey); } String countSql = """ SELECT COUNT(*) FROM %s.%s a INNER JOIN %s.batch_job_execution b ON a.job_execution_id = b.job_execution_id AND b.status = 'COMPLETED' WHERE a.batch_flag = 'P' """.formatted(sourceSchema, sourceTable, sourceSchema); Long totalCount = businessJdbc.queryForObject(countSql, Long.class); String sql = """ SELECT a.* FROM %s.%s a INNER JOIN %s.batch_job_execution b ON a.job_execution_id = b.job_execution_id AND b.status = 'COMPLETED' WHERE a.batch_flag = 'P' ORDER BY a.mdfcn_dt DESC NULLS LAST LIMIT %d """.formatted(sourceSchema, sourceTable, sourceSchema, limit); List> rows = businessJdbc.queryForList(sql); List columns = rows.isEmpty() ? getTableColumns(sourceTable) : new ArrayList<>(rows.get(0).keySet()); return SyncDataPreviewResponse.builder() .tableKey(tableKey) .targetTable(sourceTable) .targetSchema(sourceSchema) .columns(columns) .rows(rows) .totalCount(totalCount != null ? totalCount : 0) .build(); } /** * P 상태 고착 레코드를 N으로 리셋 */ public int resetStuckRecords(String tableKey) { String sourceTable = sourceTables.get(tableKey); if (sourceTable == null) { throw new IllegalArgumentException("존재하지 않는 테이블 키: " + tableKey); } String sql = """ UPDATE %s.%s SET batch_flag = 'N' , mdfcn_dt = CURRENT_TIMESTAMP , mdfr_id = 'MANUAL_RESET' WHERE batch_flag = 'P' """.formatted(sourceSchema, sourceTable); int updated = businessJdbc.update(sql); log.info("P→N 리셋 완료: {} ({}) {}건", tableKey, sourceTable, updated); return updated; } private SyncTableStatus queryTableStatus(String tableKey, String sourceTable, String targetTable) { // batch_job_execution.status = 'COMPLETED'인 데이터만 집계 // (수집/적재가 완전히 완료된 데이터만 동기화 대상) String sql = """ SELECT a.batch_flag, COUNT(*) AS cnt FROM %s.%s a INNER JOIN %s.batch_job_execution b ON a.job_execution_id = b.job_execution_id AND b.status = 'COMPLETED' WHERE a.batch_flag IN ('N', 'P', 'S') GROUP BY a.batch_flag """.formatted(sourceSchema, sourceTable, sourceSchema); Map counts = new HashMap<>(); counts.put("N", 0L); counts.put("P", 0L); counts.put("S", 0L); businessJdbc.query(sql, rs -> { counts.put(rs.getString("batch_flag"), rs.getLong("cnt")); }); // 최근 동기화 시간 (COMPLETED된 job의 batch_flag='S'인 가장 최근 mdfcn_dt) String lastSyncSql = """ SELECT MAX(a.mdfcn_dt) FROM %s.%s a INNER JOIN %s.batch_job_execution b ON a.job_execution_id = b.job_execution_id AND b.status = 'COMPLETED' WHERE a.batch_flag = 'S' """.formatted(sourceSchema, sourceTable, sourceSchema); String lastSyncTime = null; try { lastSyncTime = businessJdbc.queryForObject(lastSyncSql, String.class); } catch (Exception e) { log.trace("최근 동기화 시간 조회 실패: {}", tableKey); } boolean stuck = counts.get("P") > 0; return SyncTableStatus.builder() .tableKey(tableKey) .sourceTable(sourceTable) .targetTable(targetTable) .domain(extractDomain(tableKey)) .pendingCount(counts.get("N")) .processingCount(counts.get("P")) .completedCount(counts.get("S")) .lastSyncTime(lastSyncTime) .stuck(stuck) .build(); } private String extractDomain(String tableKey) { int dashIndex = tableKey.indexOf('-'); if (dashIndex < 0) return tableKey; String prefix = tableKey.substring(0, dashIndex); // "risk" prefix → "risk-compliance" domain if ("risk".equals(prefix)) return "risk-compliance"; return prefix; } private List getTableColumns(String tableName) { String sql = """ SELECT column_name FROM information_schema.columns WHERE table_schema = ? AND table_name = ? ORDER BY ordinal_position """; return businessJdbc.queryForList(sql, String.class, targetSchema, tableName); } }