import json import logging from contextlib import contextmanager from typing import TYPE_CHECKING, Optional import psycopg2 from psycopg2 import pool from psycopg2.extras import execute_values from config import settings if TYPE_CHECKING: from models.result import AnalysisResult logger = logging.getLogger(__name__) _pool: Optional[pool.ThreadedConnectionPool] = None def init_pool(): global _pool _pool = pool.ThreadedConnectionPool( minconn=1, maxconn=5, host=settings.KCGDB_HOST, port=settings.KCGDB_PORT, dbname=settings.KCGDB_NAME, user=settings.KCGDB_USER, password=settings.KCGDB_PASSWORD, options=f'-c search_path={settings.KCGDB_SCHEMA},public', ) logger.info('kcgdb connection pool initialized') def close_pool(): global _pool if _pool: _pool.closeall() _pool = None logger.info('kcgdb connection pool closed') @contextmanager def get_conn(): conn = _pool.getconn() try: yield conn except Exception: conn.rollback() raise finally: _pool.putconn(conn) def check_health() -> bool: try: with get_conn() as conn: with conn.cursor() as cur: cur.execute('SELECT 1') return True except Exception as e: logger.error('kcgdb health check failed: %s', e) return False def upsert_results(results: list['AnalysisResult']) -> int: """분석 결과를 vessel_analysis_results 테이블에 upsert.""" if not results: return 0 insert_sql = """ INSERT INTO vessel_analysis_results ( mmsi, timestamp, vessel_type, confidence, fishing_pct, cluster_id, season, zone, dist_to_baseline_nm, activity_state, ucaf_score, ucft_score, is_dark, gap_duration_min, spoofing_score, bd09_offset_m, speed_jump_count, cluster_size, is_leader, fleet_role, risk_score, risk_level, is_transship_suspect, transship_pair_mmsi, transship_duration_min, features, analyzed_at ) VALUES %s ON CONFLICT (mmsi, timestamp) DO UPDATE SET vessel_type = EXCLUDED.vessel_type, confidence = EXCLUDED.confidence, fishing_pct = EXCLUDED.fishing_pct, cluster_id = EXCLUDED.cluster_id, season = EXCLUDED.season, zone = EXCLUDED.zone, dist_to_baseline_nm = EXCLUDED.dist_to_baseline_nm, activity_state = EXCLUDED.activity_state, ucaf_score = EXCLUDED.ucaf_score, ucft_score = EXCLUDED.ucft_score, is_dark = EXCLUDED.is_dark, gap_duration_min = EXCLUDED.gap_duration_min, spoofing_score = EXCLUDED.spoofing_score, bd09_offset_m = EXCLUDED.bd09_offset_m, speed_jump_count = EXCLUDED.speed_jump_count, cluster_size = EXCLUDED.cluster_size, is_leader = EXCLUDED.is_leader, fleet_role = EXCLUDED.fleet_role, risk_score = EXCLUDED.risk_score, risk_level = EXCLUDED.risk_level, is_transship_suspect = EXCLUDED.is_transship_suspect, transship_pair_mmsi = EXCLUDED.transship_pair_mmsi, transship_duration_min = EXCLUDED.transship_duration_min, features = EXCLUDED.features, analyzed_at = EXCLUDED.analyzed_at """ try: with get_conn() as conn: with conn.cursor() as cur: tuples = [r.to_db_tuple() for r in results] execute_values(cur, insert_sql, tuples, page_size=100) conn.commit() count = len(tuples) logger.info('upserted %d analysis results', count) return count except Exception as e: logger.error('failed to upsert results: %s', e) return 0 def cleanup_old(hours: int = 48) -> int: """오래된 분석 결과 삭제.""" try: with get_conn() as conn: with conn.cursor() as cur: cur.execute( 'DELETE FROM vessel_analysis_results WHERE analyzed_at < NOW() - (%s * INTERVAL \'1 hour\')', (hours,), ) deleted = cur.rowcount conn.commit() if deleted > 0: logger.info('cleaned up %d old results (older than %dh)', deleted, hours) return deleted except Exception as e: logger.error('failed to cleanup old results: %s', e) return 0 def save_group_snapshots(snapshots: list[dict]) -> int: """group_polygon_snapshots에 폴리곤 스냅샷 배치 INSERT. snapshots: polygon_builder.build_all_group_snapshots() 결과 각 항목은: group_type, group_key, group_label, snapshot_time, polygon_wkt (str|None), center_wkt (str|None), area_sq_nm, member_count, zone_id, zone_name, members (list[dict]), color """ if not snapshots: return 0 insert_sql = """ INSERT INTO kcg.group_polygon_snapshots ( group_type, group_key, group_label, sub_cluster_id, resolution, snapshot_time, polygon, center_point, area_sq_nm, member_count, zone_id, zone_name, members, color ) VALUES ( %s, %s, %s, %s, %s, %s, ST_GeomFromText(%s, 4326), ST_GeomFromText(%s, 4326), %s, %s, %s, %s, %s::jsonb, %s ) """ inserted = 0 try: with get_conn() as conn: with conn.cursor() as cur: for s in snapshots: cur.execute( insert_sql, ( s['group_type'], s['group_key'], s['group_label'], s.get('sub_cluster_id', 0), s.get('resolution', '6h'), s['snapshot_time'], s.get('polygon_wkt'), s.get('center_wkt'), s.get('area_sq_nm'), s.get('member_count'), s.get('zone_id'), s.get('zone_name'), json.dumps(s.get('members', []), ensure_ascii=False), s.get('color'), ), ) inserted += 1 conn.commit() logger.info('saved %d group polygon snapshots', inserted) return inserted except Exception as e: logger.error('failed to save group snapshots: %s', e) return 0 def fetch_analysis_summary() -> dict: """최근 1시간 분석 결과 요약 (채팅 컨텍스트용).""" try: with get_conn() as conn: with conn.cursor() as cur: # 위험도 분포 cur.execute(""" SELECT risk_level, COUNT(*) FROM vessel_analysis_results WHERE analyzed_at > NOW() - INTERVAL '1 hour' GROUP BY risk_level """) risk_dist = {row[0]: row[1] for row in cur.fetchall()} # 수역별 분포 cur.execute(""" SELECT zone, COUNT(*) FROM vessel_analysis_results WHERE analyzed_at > NOW() - INTERVAL '1 hour' GROUP BY zone """) zone_dist = {row[0]: row[1] for row in cur.fetchall()} # 다크/스푸핑/환적 카운트 cur.execute(""" SELECT COUNT(*) FILTER (WHERE is_dark = TRUE) AS dark_count, COUNT(*) FILTER (WHERE spoofing_score > 0.5) AS spoofing_count, COUNT(*) FILTER (WHERE is_transship_suspect = TRUE) AS transship_count FROM vessel_analysis_results WHERE analyzed_at > NOW() - INTERVAL '1 hour' """) row = cur.fetchone() result = { 'risk_distribution': {**risk_dist, **zone_dist}, 'dark_count': row[0] if row else 0, 'spoofing_count': row[1] if row else 0, 'transship_count': row[2] if row else 0, } return result except Exception as e: logger.error('fetch_analysis_summary failed: %s', e) return {'risk_distribution': {}, 'dark_count': 0, 'spoofing_count': 0, 'transship_count': 0} def fetch_recent_high_risk(limit: int = 10) -> list[dict]: """위험도 상위 N척 선박 상세 (채팅 컨텍스트용).""" try: with get_conn() as conn: with conn.cursor() as cur: cur.execute(""" SELECT mmsi, risk_score, risk_level, zone, is_dark, is_transship_suspect, activity_state, spoofing_score FROM vessel_analysis_results WHERE analyzed_at > NOW() - INTERVAL '1 hour' ORDER BY risk_score DESC LIMIT %s """, (limit,)) rows = cur.fetchall() result = [] for row in rows: result.append({ 'mmsi': row[0], 'name': row[0], # vessel_store에서 이름 조회 필요시 보강 'risk_score': row[1], 'risk_level': row[2], 'zone': row[3], 'is_dark': row[4], 'is_transship': row[5], 'activity_state': row[6], 'spoofing_score': float(row[7]) if row[7] else 0.0, }) return result except Exception as e: logger.error('fetch_recent_high_risk failed: %s', e) return [] def fetch_polygon_summary() -> dict: """최신 그룹 폴리곤 요약 (채팅 컨텍스트용).""" try: with get_conn() as conn: with conn.cursor() as cur: cur.execute(""" SELECT group_type, COUNT(*), SUM(member_count) FROM kcg.group_polygon_snapshots WHERE snapshot_time = ( SELECT MAX(snapshot_time) FROM kcg.group_polygon_snapshots ) GROUP BY group_type """) rows = cur.fetchall() result = { 'fleet_count': 0, 'fleet_members': 0, 'gear_in_zone': 0, 'gear_out_zone': 0, } for row in rows: gtype, count, members = row[0], row[1], row[2] or 0 if gtype == 'FLEET': result['fleet_count'] = count result['fleet_members'] = members elif gtype == 'GEAR_IN_ZONE': result['gear_in_zone'] = count elif gtype == 'GEAR_OUT_ZONE': result['gear_out_zone'] = count return result except Exception as e: logger.error('fetch_polygon_summary failed: %s', e) return {'fleet_count': 0, 'fleet_members': 0, 'gear_in_zone': 0, 'gear_out_zone': 0} def cleanup_group_snapshots(days: int = 7) -> int: """오래된 그룹 폴리곤 스냅샷 삭제.""" try: with get_conn() as conn: with conn.cursor() as cur: cur.execute( f"DELETE FROM kcg.group_polygon_snapshots WHERE snapshot_time < NOW() - INTERVAL '{days} days'", ) deleted = cur.rowcount conn.commit() if deleted > 0: logger.info('cleaned up %d old group snapshots (older than %dd)', deleted, days) return deleted except Exception as e: logger.error('failed to cleanup group snapshots: %s', e) return 0