wing-ops/prediction/scat/db.py
leedano d9fb4506bc feat(scat): Pre-SCAT 관할서 필터링 + 해안조사 데이터 파이프라인 구축
- 백엔드: 관할서 목록 API, zone 필터링 쿼리 추가
- 프론트: ScatLeftPanel 관할서 드롭다운, ScatMap/ScatPopup 개선
- 기상탭: WeatherRightPanel 리팩토링
- prediction/scat: PDF 파싱 → 지오코딩 → ESI 매핑 파이프라인
- vite.config: proxy 설정 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 10:53:19 +09:00

181 lines
5.9 KiB
Python

"""PostgreSQL 연결 및 SCAT 데이터 upsert."""
from __future__ import annotations
import json
from typing import Optional
import psycopg2
from psycopg2 import pool
import config
from models import CoastalSection
# ---------------------------------------------------------------------------
# 커넥션 풀 (싱글턴)
# ---------------------------------------------------------------------------
# Module-wide pool instance; stays None until get_pool() builds it.
_pool: pool.ThreadedConnectionPool | None = None


def get_pool() -> pool.ThreadedConnectionPool:
    """Return the shared connection pool, creating it on the first call.

    The pool is built from the ``config.DB_*`` settings with a minimum of 1
    and a maximum of 5 connections. Each connection is opened with
    ``search_path`` set to ``config.DB_SCHEMA`` followed by ``public``.
    """
    global _pool
    if _pool is not None:
        return _pool

    connect_kwargs = {
        'host': config.DB_HOST,
        'port': config.DB_PORT,
        'dbname': config.DB_NAME,
        'user': config.DB_USER,
        'password': config.DB_PASSWORD,
        'options': f'-c search_path={config.DB_SCHEMA},public',
    }
    _pool = pool.ThreadedConnectionPool(minconn=1, maxconn=5, **connect_kwargs)
    return _pool
def get_conn():
    """Check a connection out of the shared pool and return it.

    Pair each call with put_conn() to hand the connection back.
    """
    shared_pool = get_pool()
    return shared_pool.getconn()
def put_conn(conn):
    """Hand *conn* back to the shared pool."""
    shared_pool = get_pool()
    shared_pool.putconn(conn)
# ---------------------------------------------------------------------------
# Zone 관리
# ---------------------------------------------------------------------------
def ensure_zone(zone_cd: str, zone_nm: str, jrsd_nm: str) -> int:
    """Return the SN of the zone with code *zone_cd*, creating it if missing.

    An existing zone is returned as-is (its name and jurisdiction are not
    touched). A new zone is inserted with ``sect_cnt`` 0 and committed.

    Args:
        zone_cd: Zone code used for the lookup.
        zone_nm: Zone name, stored only when a new row is inserted.
        jrsd_nm: Jurisdiction name, stored only when a new row is inserted.

    Returns:
        The ``cst_srvy_zone_sn`` of the existing or newly inserted zone.
    """
    lookup_sql = 'SELECT cst_srvy_zone_sn FROM cst_srvy_zone WHERE zone_cd = %s'
    insert_sql = '''INSERT INTO cst_srvy_zone (zone_cd, zone_nm, jrsd_nm, sect_cnt)
VALUES (%s, %s, %s, 0)
RETURNING cst_srvy_zone_sn'''

    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(lookup_sql, (zone_cd,))
            existing = cur.fetchone()
            if not existing:
                # No such zone yet: insert it and report the generated SN.
                cur.execute(insert_sql, (zone_cd, zone_nm, jrsd_nm))
                zone_sn = cur.fetchone()[0]
                conn.commit()
                return zone_sn
            return existing[0]
    finally:
        put_conn(conn)
def update_zone_sect_count(zone_sn: int):
    """Recompute and store the number of sections that belong to a zone.

    Sets ``cst_srvy_zone.sect_cnt`` to the count of ``cst_sect`` rows whose
    ``cst_srvy_zone_sn`` equals *zone_sn*, then commits.
    """
    recount_sql = '''UPDATE cst_srvy_zone
SET sect_cnt = (SELECT count(*) FROM cst_sect WHERE cst_srvy_zone_sn = %s)
WHERE cst_srvy_zone_sn = %s'''
    # The zone SN is bound twice: once in the subquery, once in the outer WHERE.
    params = (zone_sn,) * 2

    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(recount_sql, params)
            conn.commit()
    finally:
        put_conn(conn)
def update_zone_center(zone_sn: int):
    """Set a zone's center to the average coordinates of its sections.

    ``lat_center`` / ``lng_center`` are overwritten with ``AVG(lat)`` /
    ``AVG(lng)`` taken over the zone's ``cst_sect`` rows whose ``lat`` is not
    NULL, and the change is committed.
    """
    recenter_sql = '''UPDATE cst_srvy_zone SET
lat_center = sub.avg_lat,
lng_center = sub.avg_lng
FROM (
SELECT AVG(lat) as avg_lat, AVG(lng) as avg_lng
FROM cst_sect
WHERE cst_srvy_zone_sn = %s AND lat IS NOT NULL
) sub
WHERE cst_srvy_zone_sn = %s'''
    # The zone SN is bound twice: once in the subquery, once in the outer WHERE.
    params = (zone_sn,) * 2

    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(recenter_sql, params)
            conn.commit()
    finally:
        put_conn(conn)
# ---------------------------------------------------------------------------
# Section upsert
# ---------------------------------------------------------------------------
def upsert_section(zone_sn: int, section: CoastalSection) -> int:
    """Insert a coastal section, or update the row that has the same sect_cd.

    The structured fields (``sensitive_info``, ``cleanup_methods``,
    ``end_criteria``, ``notes``) are serialized to JSON text here and cast to
    ``jsonb`` by the statement. A newly inserted row gets the literal status
    '미조사' in ``srvy_stts_cd``. On a ``sect_cd`` conflict only the columns
    listed in the ``DO UPDATE`` clause are overwritten.

    Args:
        zone_sn: SN of the owning zone, written to ``cst_srvy_zone_sn`` on
            insert.
        section: Section data to persist.

    Returns:
        The ``cst_sect_sn`` of the inserted or updated row.
    """
    # NOTE(review): cst_srvy_zone_sn and srvy_stts_cd are not in the DO UPDATE
    # list, so an existing section keeps its stored zone and status — confirm
    # that this is intended for the zone column.
    upsert_sql = '''INSERT INTO cst_sect (
cst_srvy_zone_sn, sect_cd, sect_nm,
cst_tp_cd, esi_cd, esi_num, shore_tp, len_m,
lat, lng,
access_dc, access_pt,
sensitive_info, cleanup_methods, end_criteria, notes,
srvy_stts_cd
) VALUES (
%s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s,
%s, %s,
%s::jsonb, %s::jsonb, %s::jsonb, %s::jsonb,
'미조사'
)
ON CONFLICT (sect_cd) DO UPDATE SET
sect_nm = EXCLUDED.sect_nm,
cst_tp_cd = EXCLUDED.cst_tp_cd,
esi_cd = EXCLUDED.esi_cd,
esi_num = EXCLUDED.esi_num,
shore_tp = EXCLUDED.shore_tp,
len_m = EXCLUDED.len_m,
lat = EXCLUDED.lat,
lng = EXCLUDED.lng,
access_dc = EXCLUDED.access_dc,
access_pt = EXCLUDED.access_pt,
sensitive_info = EXCLUDED.sensitive_info,
cleanup_methods = EXCLUDED.cleanup_methods,
end_criteria = EXCLUDED.end_criteria,
notes = EXCLUDED.notes
RETURNING cst_sect_sn'''

    # Serialize and collect every parameter BEFORE borrowing a connection.
    # This is pure in-memory work that can raise on a malformed section; doing
    # it first means such a failure never checks out one of the few pooled
    # connections, and a connection is held only for the actual round trip.
    sensitive = json.dumps(
        [item.model_dump() for item in section.sensitive_info],
        ensure_ascii=False,
    )
    cleanup = json.dumps(section.cleanup_methods, ensure_ascii=False)
    end_crit = json.dumps(section.end_criteria, ensure_ascii=False)
    notes = json.dumps(section.notes, ensure_ascii=False)
    params = (
        zone_sn, section.sect_cd, section.sect_nm,
        section.cst_tp_cd, section.esi_cd, section.esi_num,
        section.shore_tp, section.len_m,
        section.lat, section.lng,
        section.access_dc, section.access_pt,
        sensitive, cleanup, end_crit, notes,
    )

    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(upsert_sql, params)
            sn = cur.fetchone()[0]
            conn.commit()
            return sn
    finally:
        put_conn(conn)
def close_pool():
    """Close all connections of the shared pool and drop the pool.

    Does nothing when no pool is currently held; afterwards get_pool() will
    build a new one on demand.
    """
    global _pool
    if not _pool:
        return
    _pool.closeall()
    _pool = None