"""PostgreSQL 연결 및 SCAT 데이터 upsert.""" from __future__ import annotations import json from typing import Optional import psycopg2 from psycopg2 import pool import config from models import CoastalSection # --------------------------------------------------------------------------- # 커넥션 풀 (싱글턴) # --------------------------------------------------------------------------- _pool: pool.ThreadedConnectionPool | None = None def get_pool() -> pool.ThreadedConnectionPool: global _pool if _pool is None: _pool = pool.ThreadedConnectionPool( minconn=1, maxconn=5, host=config.DB_HOST, port=config.DB_PORT, dbname=config.DB_NAME, user=config.DB_USER, password=config.DB_PASSWORD, options=f'-c search_path={config.DB_SCHEMA},public', ) return _pool def get_conn(): return get_pool().getconn() def put_conn(conn): get_pool().putconn(conn) # --------------------------------------------------------------------------- # Zone 관리 # --------------------------------------------------------------------------- def ensure_zone(zone_cd: str, zone_nm: str, jrsd_nm: str) -> int: """구역이 없으면 생성, 있으면 SN 반환.""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( 'SELECT cst_srvy_zone_sn FROM cst_srvy_zone WHERE zone_cd = %s', (zone_cd,), ) row = cur.fetchone() if row: return row[0] cur.execute( '''INSERT INTO cst_srvy_zone (zone_cd, zone_nm, jrsd_nm, sect_cnt) VALUES (%s, %s, %s, 0) RETURNING cst_srvy_zone_sn''', (zone_cd, zone_nm, jrsd_nm), ) sn = cur.fetchone()[0] conn.commit() return sn finally: put_conn(conn) def update_zone_sect_count(zone_sn: int): """구역의 구간 수를 갱신.""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( '''UPDATE cst_srvy_zone SET sect_cnt = (SELECT count(*) FROM cst_sect WHERE cst_srvy_zone_sn = %s) WHERE cst_srvy_zone_sn = %s''', (zone_sn, zone_sn), ) conn.commit() finally: put_conn(conn) def update_zone_center(zone_sn: int): """zone의 sections 좌표 평균으로 LAT_CENTER/LNG_CENTER 갱신.""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( '''UPDATE cst_srvy_zone SET lat_center = sub.avg_lat, lng_center = sub.avg_lng FROM ( SELECT AVG(lat) as avg_lat, AVG(lng) as avg_lng FROM cst_sect WHERE cst_srvy_zone_sn = %s AND lat IS NOT NULL ) sub WHERE cst_srvy_zone_sn = %s''', (zone_sn, zone_sn), ) conn.commit() finally: put_conn(conn) # --------------------------------------------------------------------------- # Section upsert # --------------------------------------------------------------------------- def upsert_section(zone_sn: int, section: CoastalSection) -> int: """구간 INSERT 또는 UPDATE (SECT_CD 기준 ON CONFLICT).""" conn = get_conn() try: sensitive = json.dumps( [item.model_dump() for item in section.sensitive_info], ensure_ascii=False, ) cleanup = json.dumps(section.cleanup_methods, ensure_ascii=False) end_crit = json.dumps(section.end_criteria, ensure_ascii=False) notes = json.dumps(section.notes, ensure_ascii=False) with conn.cursor() as cur: cur.execute( '''INSERT INTO cst_sect ( cst_srvy_zone_sn, sect_cd, sect_nm, cst_tp_cd, esi_cd, esi_num, shore_tp, len_m, lat, lng, access_dc, access_pt, sensitive_info, cleanup_methods, end_criteria, notes, srvy_stts_cd ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s::jsonb, %s::jsonb, %s::jsonb, '미조사' ) ON CONFLICT (sect_cd) DO UPDATE SET sect_nm = EXCLUDED.sect_nm, cst_tp_cd = EXCLUDED.cst_tp_cd, esi_cd = EXCLUDED.esi_cd, esi_num = EXCLUDED.esi_num, shore_tp = EXCLUDED.shore_tp, len_m = EXCLUDED.len_m, lat = EXCLUDED.lat, lng = EXCLUDED.lng, access_dc = EXCLUDED.access_dc, access_pt = EXCLUDED.access_pt, sensitive_info = EXCLUDED.sensitive_info, cleanup_methods = EXCLUDED.cleanup_methods, end_criteria = EXCLUDED.end_criteria, notes = EXCLUDED.notes RETURNING cst_sect_sn''', ( zone_sn, section.sect_cd, section.sect_nm, section.cst_tp_cd, section.esi_cd, section.esi_num, section.shore_tp, section.len_m, section.lat, section.lng, section.access_dc, section.access_pt, sensitive, cleanup, end_crit, notes, ), ) sn = cur.fetchone()[0] conn.commit() return sn finally: put_conn(conn) def close_pool(): global _pool if _pool: _pool.closeall() _pool = None