"""Connection-pool helpers for the kcgdb PostgreSQL database.

Provides a module-level ``ThreadedConnectionPool`` together with helpers to
check connectivity, upsert rows into ``vessel_analysis_results`` and delete
old rows from that table.
"""

import logging
from contextlib import contextmanager
from typing import TYPE_CHECKING, Iterator, Optional

import psycopg2
from psycopg2 import pool
from psycopg2.extras import execute_values

from config import settings

if TYPE_CHECKING:
    from models.result import AnalysisResult

logger = logging.getLogger(__name__)

# Module-wide pool; None until init_pool() runs and again after close_pool().
_pool: Optional[pool.ThreadedConnectionPool] = None

# Upsert statement for execute_values(): the single %s is expanded into the
# VALUES list. Every column except the conflict key (mmsi, timestamp) is
# overwritten with the incoming row on conflict.
_UPSERT_SQL = """
    INSERT INTO vessel_analysis_results (
        mmsi, timestamp, vessel_type, confidence, fishing_pct,
        cluster_id, season, zone, dist_to_baseline_nm, activity_state,
        ucaf_score, ucft_score, is_dark, gap_duration_min,
        spoofing_score, bd09_offset_m, speed_jump_count,
        cluster_size, is_leader, fleet_role,
        risk_score, risk_level,
        is_transship_suspect, transship_pair_mmsi, transship_duration_min,
        features, analyzed_at
    ) VALUES %s
    ON CONFLICT (mmsi, timestamp) DO UPDATE SET
        vessel_type = EXCLUDED.vessel_type,
        confidence = EXCLUDED.confidence,
        fishing_pct = EXCLUDED.fishing_pct,
        cluster_id = EXCLUDED.cluster_id,
        season = EXCLUDED.season,
        zone = EXCLUDED.zone,
        dist_to_baseline_nm = EXCLUDED.dist_to_baseline_nm,
        activity_state = EXCLUDED.activity_state,
        ucaf_score = EXCLUDED.ucaf_score,
        ucft_score = EXCLUDED.ucft_score,
        is_dark = EXCLUDED.is_dark,
        gap_duration_min = EXCLUDED.gap_duration_min,
        spoofing_score = EXCLUDED.spoofing_score,
        bd09_offset_m = EXCLUDED.bd09_offset_m,
        speed_jump_count = EXCLUDED.speed_jump_count,
        cluster_size = EXCLUDED.cluster_size,
        is_leader = EXCLUDED.is_leader,
        fleet_role = EXCLUDED.fleet_role,
        risk_score = EXCLUDED.risk_score,
        risk_level = EXCLUDED.risk_level,
        is_transship_suspect = EXCLUDED.is_transship_suspect,
        transship_pair_mmsi = EXCLUDED.transship_pair_mmsi,
        transship_duration_min = EXCLUDED.transship_duration_min,
        features = EXCLUDED.features,
        analyzed_at = EXCLUDED.analyzed_at
"""


def init_pool() -> None:
    """Create the module-level connection pool from ``settings``.

    If a pool already exists it is replaced: the new pool is built first (so
    a failed re-initialization leaves the old pool in place) and the old one
    is closed afterwards instead of being leaked.

    Raises:
        psycopg2.Error: if the initial connection(s) cannot be opened.
    """
    global _pool
    previous = _pool
    _pool = pool.ThreadedConnectionPool(
        minconn=1,
        maxconn=3,
        host=settings.KCGDB_HOST,
        port=settings.KCGDB_PORT,
        dbname=settings.KCGDB_NAME,
        user=settings.KCGDB_USER,
        password=settings.KCGDB_PASSWORD,
        options=f'-c search_path={settings.KCGDB_SCHEMA},public',
    )
    if previous is not None:
        try:
            previous.closeall()
        except psycopg2.Error:
            # The old pool may already be closed; nothing more to release.
            logger.warning('failed to close previous kcgdb pool', exc_info=True)
    logger.info('kcgdb connection pool initialized')


def close_pool() -> None:
    """Close every pooled connection and drop the pool; no-op if not initialized."""
    global _pool
    if _pool is not None:
        try:
            _pool.closeall()
        finally:
            # Never keep a reference to a pool whose shutdown was attempted.
            _pool = None
        logger.info('kcgdb connection pool closed')


@contextmanager
def get_conn() -> Iterator['psycopg2.extensions.connection']:
    """Yield a pooled connection and return it to the pool on exit.

    The caller is responsible for ``commit()``. If the ``with`` body raises an
    ``Exception`` the transaction is rolled back and the exception re-raised.

    Raises:
        RuntimeError: if ``init_pool()`` has not been called.
    """
    # Bind the pool once so the connection goes back to the pool it came
    # from even if the global is swapped or cleared while it is checked out.
    active_pool = _pool
    if active_pool is None:
        raise RuntimeError('kcgdb connection pool is not initialized')

    conn = active_pool.getconn()
    try:
        yield conn
    except Exception:
        try:
            conn.rollback()
        except psycopg2.Error:
            # A broken connection must not mask the error that got us here.
            logger.warning('kcgdb rollback failed', exc_info=True)
        raise
    finally:
        active_pool.putconn(conn)


def check_health() -> bool:
    """Return True if ``SELECT 1`` succeeds on a pooled connection, else False."""
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute('SELECT 1')
        return True
    except Exception as e:
        logger.error('kcgdb health check failed: %s', e)
        return False


def upsert_results(results: list['AnalysisResult']) -> int:
    """Upsert analysis results into the vessel_analysis_results table.

    Args:
        results: objects exposing ``to_db_tuple()``; each tuple is sent as one
            row of the VALUES list.

    Returns:
        The number of rows submitted, or 0 when ``results`` is empty or the
        operation failed (failures are logged, not raised).
    """
    if not results:
        return 0

    try:
        # Convert before checking out a connection so a slow or failing
        # conversion does not hold one of the few pooled connections.
        rows = [r.to_db_tuple() for r in results]
        with get_conn() as conn:
            with conn.cursor() as cur:
                execute_values(cur, _UPSERT_SQL, rows, page_size=100)
            conn.commit()
        count = len(rows)
        logger.info('upserted %d analysis results', count)
        return count
    except Exception as e:
        logger.exception('failed to upsert results: %s', e)
        return 0


def cleanup_old(hours: int = 48) -> int:
    """Delete analysis results whose ``analyzed_at`` is older than ``hours``.

    Args:
        hours: age threshold in hours. Negative values are rejected because
            they would move the cutoff into the future and delete every row.

    Returns:
        The number of rows deleted, or 0 on rejection or failure (failures
        are logged, not raised).
    """
    try:
        if hours < 0:
            logger.error('cleanup_old called with negative hours=%r; skipping', hours)
            return 0
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "DELETE FROM vessel_analysis_results "
                    "WHERE analyzed_at < NOW() - (%s * INTERVAL '1 hour')",
                    (hours,),
                )
                deleted = cur.rowcount
            conn.commit()
        if deleted > 0:
            logger.info('cleaned up %d old results (older than %dh)', deleted, hours)
        return deleted
    except Exception as e:
        logger.exception('failed to cleanup old results: %s', e)
        return 0