Merge pull request 'release: 2026-03-20.2 (129건 커밋)' (#111) from develop into main
All checks were successful
Build and Deploy Wing-Demo / build-and-deploy (push) Successful in 35s
All checks were successful
Build and Deploy Wing-Demo / build-and-deploy (push) Successful in 35s
This commit is contained in:
커밋
84fa49189c
@ -4,6 +4,11 @@
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [2026-03-20.2]
|
||||
|
||||
### 변경
|
||||
- prediction/scat 파이프라인 제거 + SCAT/사고관리 UI 수정
|
||||
|
||||
## [2026-03-20]
|
||||
|
||||
### 추가
|
||||
|
||||
@ -18,7 +18,7 @@ export function TopBar({ activeTab, onTabChange }: TopBarProps) {
|
||||
const { menuConfig, isLoaded } = useMenuStore()
|
||||
const { mapToggles, toggleMap, mapTypes, measureMode, setMeasureMode } = useMapStore()
|
||||
|
||||
const MAP_TABS = new Set<string>(['prediction', 'hns', 'scat', 'weather'])
|
||||
const MAP_TABS = new Set<string>(['prediction', 'hns', 'scat', 'incidents'])
|
||||
const isMapTab = MAP_TABS.has(activeTab)
|
||||
|
||||
const handleToggleMeasure = (mode: 'distance' | 'area') => {
|
||||
|
||||
@ -12,6 +12,10 @@ import { fetchIncidents } from '../services/incidentsApi'
|
||||
import type { IncidentCompat } from '../services/incidentsApi'
|
||||
import { DischargeZonePanel } from './DischargeZonePanel'
|
||||
import { estimateDistanceFromCoast, getDischargeZoneLines } from '../utils/dischargeZoneData'
|
||||
import { useMapStore } from '@common/store/mapStore'
|
||||
import { useMeasureTool } from '@common/hooks/useMeasureTool'
|
||||
import { buildMeasureLayers } from '@common/components/map/measureLayers'
|
||||
import { MeasureOverlay } from '@common/components/map/MeasureOverlay'
|
||||
|
||||
// ── CartoDB Positron 베이스맵 (밝은 테마) ────────────────
|
||||
const BASE_STYLE: StyleSpecification = {
|
||||
@ -97,6 +101,11 @@ export function IncidentsView() {
|
||||
const [dischargeMode, setDischargeMode] = useState(false)
|
||||
const [dischargeInfo, setDischargeInfo] = useState<{ lat: number; lon: number; distanceNm: number } | null>(null)
|
||||
|
||||
// Measure tool
|
||||
const { handleMeasureClick, measureMode } = useMeasureTool()
|
||||
const measureInProgress = useMapStore((s) => s.measureInProgress)
|
||||
const measurements = useMapStore((s) => s.measurements)
|
||||
|
||||
// Analysis view mode
|
||||
const [viewMode, setViewMode] = useState<ViewMode>('overlay')
|
||||
const [analysisActive, setAnalysisActive] = useState(false)
|
||||
@ -250,10 +259,15 @@ export function IncidentsView() {
|
||||
)
|
||||
}, [dischargeMode])
|
||||
|
||||
const measureDeckLayers = useMemo(
|
||||
() => buildMeasureLayers(measureInProgress, measureMode, measurements),
|
||||
[measureInProgress, measureMode, measurements],
|
||||
)
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const deckLayers: any[] = useMemo(
|
||||
() => [incidentLayer, vesselIconLayer, ...dischargeZoneLayers],
|
||||
[incidentLayer, vesselIconLayer, dischargeZoneLayers],
|
||||
() => [incidentLayer, vesselIconLayer, ...dischargeZoneLayers, ...measureDeckLayers],
|
||||
[incidentLayer, vesselIconLayer, dischargeZoneLayers, measureDeckLayers],
|
||||
)
|
||||
|
||||
return (
|
||||
@ -350,6 +364,10 @@ export function IncidentsView() {
|
||||
style={{ width: '100%', height: '100%', background: '#f0f0f0' }}
|
||||
attributionControl={false}
|
||||
onClick={(e) => {
|
||||
if (measureMode !== null && e.lngLat) {
|
||||
handleMeasureClick(e.lngLat.lng, e.lngLat.lat)
|
||||
return
|
||||
}
|
||||
if (dischargeMode && e.lngLat) {
|
||||
const lat = e.lngLat.lat
|
||||
const lon = e.lngLat.lng
|
||||
@ -357,9 +375,10 @@ export function IncidentsView() {
|
||||
setDischargeInfo({ lat, lon, distanceNm })
|
||||
}
|
||||
}}
|
||||
cursor={dischargeMode ? 'crosshair' : undefined}
|
||||
cursor={(measureMode !== null || dischargeMode) ? 'crosshair' : undefined}
|
||||
>
|
||||
<DeckGLOverlay layers={deckLayers} />
|
||||
<MeasureOverlay />
|
||||
|
||||
{/* 사고 팝업 */}
|
||||
{incidentPopup && (
|
||||
|
||||
@ -73,7 +73,7 @@ export default function ScatRightPanel({ detail, loading, onOpenReport, onNewSur
|
||||
) : detail ? (
|
||||
<>
|
||||
{activeTab === 0 && <DetailTab detail={detail} />}
|
||||
{activeTab === 1 && <PhotoTab />}
|
||||
{activeTab === 1 && <PhotoTab detail={detail} />}
|
||||
{activeTab === 2 && <CleanupTab detail={detail} />}
|
||||
</>
|
||||
) : null}
|
||||
@ -137,16 +137,33 @@ function DetailTab({ detail }: { detail: ScatDetail }) {
|
||||
}
|
||||
|
||||
/* ═══ 탭 1: 현장 사진 ═══ */
|
||||
function PhotoTab() {
|
||||
function PhotoTab({ detail }: { detail: ScatDetail }) {
|
||||
const [imgError, setImgError] = useState(false);
|
||||
const imgSrc = `/scat-img/${detail.code}-1.png`;
|
||||
|
||||
if (imgError) {
|
||||
return (
|
||||
<div className="flex flex-col items-center justify-center py-10 gap-3">
|
||||
<div className="text-3xl">📷</div>
|
||||
<div className="text-[11px] text-text-3 text-center leading-relaxed">
|
||||
현장 사진 기능은<br />추후 업데이트 예정입니다.
|
||||
해당 구간의 사진이<br />등록되지 않았습니다.
|
||||
</div>
|
||||
<div className="px-3 py-1.5 rounded text-[10px] text-text-3"
|
||||
style={{ background: 'rgba(6,182,212,.06)', border: '1px solid rgba(6,182,212,.15)' }}>
|
||||
사진 업로드 API 연동 예정
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
return (
|
||||
<div className="flex flex-col gap-2">
|
||||
<div className="rounded-md overflow-hidden border border-border">
|
||||
<img
|
||||
src={imgSrc}
|
||||
alt={`${detail.name} 해안 사진`}
|
||||
className="w-full h-auto object-cover"
|
||||
onError={() => setImgError(true)}
|
||||
/>
|
||||
</div>
|
||||
<div className="text-[10px] text-text-3 text-center">
|
||||
{detail.code} 해안 조사 사진
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
|
||||
@ -1,29 +0,0 @@
|
||||
"""SCAT PDF 파싱 설정."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
BASE_DIR = Path(__file__).parent
|
||||
|
||||
# 이미지 저장 경로
|
||||
IMAGE_OUTPUT_DIR = Path(os.getenv('SCAT_IMAGE_DIR', str(BASE_DIR / 'scat_images')))
|
||||
|
||||
# 파싱 결과 저장 경로
|
||||
OUTPUT_DIR = Path(os.getenv('SCAT_OUTPUT_DIR', str(BASE_DIR / 'output')))
|
||||
|
||||
# DB 설정 (wingDb.ts 기본값과 동일, 추후 사용)
|
||||
DB_HOST = os.getenv('DB_HOST', 'localhost')
|
||||
DB_PORT = int(os.getenv('DB_PORT', '5432'))
|
||||
DB_NAME = os.getenv('DB_NAME', 'wing')
|
||||
DB_USER = os.getenv('DB_USER', 'postgres')
|
||||
DB_PASSWORD = os.getenv('DB_PASSWORD', 'dano2030')
|
||||
DB_SCHEMA = 'wing'
|
||||
|
||||
# 해안 사진 저장 경로 (frontend public)
|
||||
SCAT_PHOTOS_DIR = Path(os.getenv(
|
||||
'SCAT_PHOTOS_DIR',
|
||||
str(BASE_DIR.parent.parent / 'frontend' / 'public' / 'scat-photos'),
|
||||
))
|
||||
|
||||
# Kakao Local API (Geocoding)
|
||||
KAKAO_REST_KEY = os.getenv('KAKAO_REST_KEY', '5e5dc9a10eb86b88d5106e508ec4d236')
|
||||
@ -1,180 +0,0 @@
|
||||
"""PostgreSQL 연결 및 SCAT 데이터 upsert."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
import psycopg2
|
||||
from psycopg2 import pool
|
||||
|
||||
import config
|
||||
from models import CoastalSection
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 커넥션 풀 (싱글턴)
|
||||
# ---------------------------------------------------------------------------
|
||||
_pool: pool.ThreadedConnectionPool | None = None
|
||||
|
||||
|
||||
def get_pool() -> pool.ThreadedConnectionPool:
|
||||
global _pool
|
||||
if _pool is None:
|
||||
_pool = pool.ThreadedConnectionPool(
|
||||
minconn=1,
|
||||
maxconn=5,
|
||||
host=config.DB_HOST,
|
||||
port=config.DB_PORT,
|
||||
dbname=config.DB_NAME,
|
||||
user=config.DB_USER,
|
||||
password=config.DB_PASSWORD,
|
||||
options=f'-c search_path={config.DB_SCHEMA},public',
|
||||
)
|
||||
return _pool
|
||||
|
||||
|
||||
def get_conn():
|
||||
return get_pool().getconn()
|
||||
|
||||
|
||||
def put_conn(conn):
|
||||
get_pool().putconn(conn)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Zone 관리
|
||||
# ---------------------------------------------------------------------------
|
||||
def ensure_zone(zone_cd: str, zone_nm: str, jrsd_nm: str) -> int:
|
||||
"""구역이 없으면 생성, 있으면 SN 반환."""
|
||||
conn = get_conn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
'SELECT cst_srvy_zone_sn FROM cst_srvy_zone WHERE zone_cd = %s',
|
||||
(zone_cd,),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if row:
|
||||
return row[0]
|
||||
|
||||
cur.execute(
|
||||
'''INSERT INTO cst_srvy_zone (zone_cd, zone_nm, jrsd_nm, sect_cnt)
|
||||
VALUES (%s, %s, %s, 0)
|
||||
RETURNING cst_srvy_zone_sn''',
|
||||
(zone_cd, zone_nm, jrsd_nm),
|
||||
)
|
||||
sn = cur.fetchone()[0]
|
||||
conn.commit()
|
||||
return sn
|
||||
finally:
|
||||
put_conn(conn)
|
||||
|
||||
|
||||
def update_zone_sect_count(zone_sn: int):
|
||||
"""구역의 구간 수를 갱신."""
|
||||
conn = get_conn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
'''UPDATE cst_srvy_zone
|
||||
SET sect_cnt = (SELECT count(*) FROM cst_sect WHERE cst_srvy_zone_sn = %s)
|
||||
WHERE cst_srvy_zone_sn = %s''',
|
||||
(zone_sn, zone_sn),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
put_conn(conn)
|
||||
|
||||
|
||||
def update_zone_center(zone_sn: int):
|
||||
"""zone의 sections 좌표 평균으로 LAT_CENTER/LNG_CENTER 갱신."""
|
||||
conn = get_conn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
'''UPDATE cst_srvy_zone SET
|
||||
lat_center = sub.avg_lat,
|
||||
lng_center = sub.avg_lng
|
||||
FROM (
|
||||
SELECT AVG(lat) as avg_lat, AVG(lng) as avg_lng
|
||||
FROM cst_sect
|
||||
WHERE cst_srvy_zone_sn = %s AND lat IS NOT NULL
|
||||
) sub
|
||||
WHERE cst_srvy_zone_sn = %s''',
|
||||
(zone_sn, zone_sn),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
put_conn(conn)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Section upsert
|
||||
# ---------------------------------------------------------------------------
|
||||
def upsert_section(zone_sn: int, section: CoastalSection) -> int:
|
||||
"""구간 INSERT 또는 UPDATE (SECT_CD 기준 ON CONFLICT)."""
|
||||
conn = get_conn()
|
||||
try:
|
||||
sensitive = json.dumps(
|
||||
[item.model_dump() for item in section.sensitive_info],
|
||||
ensure_ascii=False,
|
||||
)
|
||||
cleanup = json.dumps(section.cleanup_methods, ensure_ascii=False)
|
||||
end_crit = json.dumps(section.end_criteria, ensure_ascii=False)
|
||||
notes = json.dumps(section.notes, ensure_ascii=False)
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
'''INSERT INTO cst_sect (
|
||||
cst_srvy_zone_sn, sect_cd, sect_nm,
|
||||
cst_tp_cd, esi_cd, esi_num, shore_tp, len_m,
|
||||
lat, lng,
|
||||
access_dc, access_pt,
|
||||
sensitive_info, cleanup_methods, end_criteria, notes,
|
||||
srvy_stts_cd
|
||||
) VALUES (
|
||||
%s, %s, %s,
|
||||
%s, %s, %s, %s, %s,
|
||||
%s, %s,
|
||||
%s, %s,
|
||||
%s::jsonb, %s::jsonb, %s::jsonb, %s::jsonb,
|
||||
'미조사'
|
||||
)
|
||||
ON CONFLICT (sect_cd) DO UPDATE SET
|
||||
sect_nm = EXCLUDED.sect_nm,
|
||||
cst_tp_cd = EXCLUDED.cst_tp_cd,
|
||||
esi_cd = EXCLUDED.esi_cd,
|
||||
esi_num = EXCLUDED.esi_num,
|
||||
shore_tp = EXCLUDED.shore_tp,
|
||||
len_m = EXCLUDED.len_m,
|
||||
lat = EXCLUDED.lat,
|
||||
lng = EXCLUDED.lng,
|
||||
access_dc = EXCLUDED.access_dc,
|
||||
access_pt = EXCLUDED.access_pt,
|
||||
sensitive_info = EXCLUDED.sensitive_info,
|
||||
cleanup_methods = EXCLUDED.cleanup_methods,
|
||||
end_criteria = EXCLUDED.end_criteria,
|
||||
notes = EXCLUDED.notes
|
||||
RETURNING cst_sect_sn''',
|
||||
(
|
||||
zone_sn, section.sect_cd, section.sect_nm,
|
||||
section.cst_tp_cd, section.esi_cd, section.esi_num,
|
||||
section.shore_tp, section.len_m,
|
||||
section.lat, section.lng,
|
||||
section.access_dc, section.access_pt,
|
||||
sensitive, cleanup, end_crit, notes,
|
||||
),
|
||||
)
|
||||
sn = cur.fetchone()[0]
|
||||
conn.commit()
|
||||
return sn
|
||||
finally:
|
||||
put_conn(conn)
|
||||
|
||||
|
||||
def close_pool():
|
||||
global _pool
|
||||
if _pool:
|
||||
_pool.closeall()
|
||||
_pool = None
|
||||
@ -1,65 +0,0 @@
|
||||
"""cst_tp_cd(해안 구성) → ESI 등급 매핑.
|
||||
|
||||
ESI(Environmental Sensitivity Index) 등급은 해안의 물리적 특성에 따라 분류된다.
|
||||
매핑 기준: NOAA(2010) + 성 등(2003) ESI 등급 표.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import Optional, Tuple
|
||||
|
||||
# 키워드 → (esi_cd, esi_num) 매핑 (구체적 키워드 우선)
|
||||
_ESI_RULES: list[tuple[list[str], str, int]] = [
|
||||
# 8B: 습지
|
||||
(['염습지', '습지'], '8B', 8),
|
||||
# 8A: 갯벌/점토질
|
||||
(['갯벌', '점토질', '점토'], '8A', 8),
|
||||
# 7: 반폐쇄형
|
||||
(['반폐쇄', '반 폐쇄'], '7', 7),
|
||||
# 6B: 투과성 인공호안/사석
|
||||
(['투과성 인공호안', '투과성 사석', '투과성 인공해안', '투과성 경사식'], '6B', 6),
|
||||
# 6A: 자갈/바위
|
||||
(['자갈', '왕자갈', '바위', '갈('], '6A', 6),
|
||||
# 5: 모래+자갈 혼합
|
||||
(['모래자갈', '모래+자갈', '혼합'], '5', 5),
|
||||
# 4: 굵은 모래
|
||||
(['굵은 모래', '조립질 모래'], '4', 4),
|
||||
# 3: 세립질 모래/모래(기본)
|
||||
(['세립질 모래', '세립질', '모래'], '3', 3),
|
||||
# 2: 수평암반/비투과성
|
||||
(['수평암반', '수평호안', '기반암', '비투과성 기질', '비투과성 인공호안',
|
||||
'비투과성 인공해안', '노출기반암', '수암반', '콘크리트'], '2', 2),
|
||||
# 1: 수직암반/인공구조물/계류안벽
|
||||
(['수직암반', '인공구조물', '직립호안', '절벽', '수직호안', '수진호안',
|
||||
'수직 계류', '인공호안', '계류 안벽', '안벽'], '1', 1),
|
||||
]
|
||||
|
||||
# ESI 코드 → 숫자 변환
|
||||
_ESI_NUM_RE = re.compile(r'^(\d+)')
|
||||
|
||||
|
||||
def map_esi(cst_tp_cd: Optional[str]) -> Tuple[Optional[str], Optional[int]]:
|
||||
"""cst_tp_cd(해안 구성 키워드)에서 ESI 등급을 매핑한다.
|
||||
|
||||
Returns:
|
||||
(esi_cd, esi_num) 튜플. 매핑 실패 시 (None, None).
|
||||
"""
|
||||
if not cst_tp_cd:
|
||||
return None, None
|
||||
|
||||
text = cst_tp_cd.strip()
|
||||
for keywords, esi_cd, esi_num in _ESI_RULES:
|
||||
for kw in keywords:
|
||||
if kw in text:
|
||||
return esi_cd, esi_num
|
||||
|
||||
return None, None
|
||||
|
||||
|
||||
def parse_esi_cd(esi_str: str) -> Tuple[str, int]:
|
||||
"""ESI 등급 문자열(e.g. '8A', '6B', '2')에서 (esi_cd, esi_num) 추출."""
|
||||
esi_cd = esi_str.strip()
|
||||
m = _ESI_NUM_RE.match(esi_cd)
|
||||
esi_num = int(m.group(1)) if m else 0
|
||||
return esi_cd, esi_num
|
||||
@ -1,266 +0,0 @@
|
||||
"""Kakao Local API 기반 Geocoding + 오프셋 분산.
|
||||
|
||||
환경변수 KAKAO_REST_KEY 필요.
|
||||
- access_pt 있는 구간: 주소/키워드 검색으로 정확한 좌표
|
||||
- access_pt 없는 구간: sect_nm 기반 대표 좌표 + 나선형 오프셋
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import math
|
||||
import re
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import requests
|
||||
|
||||
import config
|
||||
|
||||
_HEADERS = {
|
||||
'Authorization': f'KakaoAK {config.KAKAO_REST_KEY}',
|
||||
}
|
||||
|
||||
# Kakao API endpoints
|
||||
_ADDRESS_URL = 'https://dapi.kakao.com/v2/local/search/address.json'
|
||||
_KEYWORD_URL = 'https://dapi.kakao.com/v2/local/search/keyword.json'
|
||||
|
||||
# 캐시: query → (lat, lng) or None
|
||||
_cache: dict[str, Optional[Tuple[float, float]]] = {}
|
||||
|
||||
# API 호출 간격 (초)
|
||||
_RATE_LIMIT = 0.05
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Kakao API 호출
|
||||
# ---------------------------------------------------------------------------
|
||||
def _search_address(query: str) -> Optional[Tuple[float, float]]:
|
||||
"""Kakao 주소 검색 API."""
|
||||
try:
|
||||
resp = requests.get(
|
||||
_ADDRESS_URL,
|
||||
params={'query': query},
|
||||
headers=_HEADERS,
|
||||
timeout=5,
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
return None
|
||||
data = resp.json()
|
||||
docs = data.get('documents', [])
|
||||
if docs:
|
||||
return float(docs[0]['y']), float(docs[0]['x'])
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def _search_keyword(query: str) -> Optional[Tuple[float, float]]:
|
||||
"""Kakao 키워드 검색 API."""
|
||||
try:
|
||||
resp = requests.get(
|
||||
_KEYWORD_URL,
|
||||
params={'query': query},
|
||||
headers=_HEADERS,
|
||||
timeout=5,
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
return None
|
||||
data = resp.json()
|
||||
docs = data.get('documents', [])
|
||||
if docs:
|
||||
return float(docs[0]['y']), float(docs[0]['x'])
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 메인 Geocoding 함수
|
||||
# ---------------------------------------------------------------------------
|
||||
def geocode(query: str) -> Optional[Tuple[float, float]]:
|
||||
"""주소/키워드 → (lat, lng). 캐시 적용. 실패 시 None."""
|
||||
if not config.KAKAO_REST_KEY:
|
||||
return None
|
||||
|
||||
if query in _cache:
|
||||
return _cache[query]
|
||||
|
||||
time.sleep(_RATE_LIMIT)
|
||||
|
||||
# 1차: 주소 검색 (지번/도로명)
|
||||
result = _search_address(query)
|
||||
|
||||
# 2차: 키워드 검색 (장소명, 방조제, 해수욕장 등)
|
||||
if result is None:
|
||||
time.sleep(_RATE_LIMIT)
|
||||
result = _search_keyword(query)
|
||||
|
||||
_cache[query] = result
|
||||
return result
|
||||
|
||||
|
||||
def geocode_section(
|
||||
access_pt: Optional[str],
|
||||
sect_nm: str,
|
||||
zone_name: str,
|
||||
) -> Optional[Tuple[float, float]]:
|
||||
"""구간에 대한 최적 좌표 검색.
|
||||
|
||||
우선순위:
|
||||
1. access_pt (주소/지명)
|
||||
2. sect_nm에서 '인근 해안' 제거한 지역명
|
||||
3. zone_name (PDF 구역명)
|
||||
"""
|
||||
# 1. access_pt로 시도
|
||||
if access_pt:
|
||||
result = geocode(access_pt)
|
||||
if result:
|
||||
return result
|
||||
# access_pt + zone_name 조합 시도
|
||||
combined = f'{zone_name} {access_pt}' if zone_name else access_pt
|
||||
result = geocode(combined)
|
||||
if result:
|
||||
return result
|
||||
|
||||
# 2. sect_nm에서 지역명 추출
|
||||
area = _extract_area(sect_nm)
|
||||
if area:
|
||||
result = geocode(area)
|
||||
if result:
|
||||
return result
|
||||
|
||||
# 3. zone_name fallback
|
||||
if zone_name:
|
||||
result = geocode(zone_name)
|
||||
if result:
|
||||
return result
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _extract_area(sect_nm: str) -> str:
|
||||
"""sect_nm에서 geocoding용 지역명 추출.
|
||||
|
||||
예: '하동군 금남면 노량리 인근 해안' → '하동군 금남면 노량리'
|
||||
"""
|
||||
if not sect_nm:
|
||||
return ''
|
||||
# '인근 해안' 등 제거
|
||||
area = re.sub(r'\s*인근\s*해안\s*$', '', sect_nm).strip()
|
||||
# '해안' 단독 제거
|
||||
area = re.sub(r'\s*해안\s*$', '', area).strip()
|
||||
return area
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 오프셋 분산 (같은 좌표에 겹치는 구간들 분산)
|
||||
# ---------------------------------------------------------------------------
|
||||
def apply_spiral_offset(
|
||||
base_lat: float,
|
||||
base_lng: float,
|
||||
index: int,
|
||||
spacing: float = 0.0005,
|
||||
) -> Tuple[float, float]:
|
||||
"""나선형 오프셋 적용. index=0이면 원점, 1부터 나선.
|
||||
|
||||
spacing ~= 50m (위도 기준)
|
||||
"""
|
||||
if index == 0:
|
||||
return base_lat, base_lng
|
||||
|
||||
# 나선형: 각도와 반경 증가
|
||||
angle = index * 137.508 * math.pi / 180 # 황금각
|
||||
radius = spacing * math.sqrt(index)
|
||||
|
||||
lat = base_lat + radius * math.cos(angle)
|
||||
lng = base_lng + radius * math.sin(angle)
|
||||
|
||||
return round(lat, 6), round(lng, 6)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 배치 Geocoding
|
||||
# ---------------------------------------------------------------------------
|
||||
def geocode_sections(
|
||||
sections: list[dict],
|
||||
zone_name: str = '',
|
||||
) -> Tuple[int, int]:
|
||||
"""섹션 리스트에 lat/lng를 채운다.
|
||||
|
||||
Returns:
|
||||
(성공 수, 실패 수)
|
||||
"""
|
||||
# sect_nm 그룹별로 처리 (오프셋 적용)
|
||||
from collections import defaultdict
|
||||
|
||||
groups: dict[str, list[dict]] = defaultdict(list)
|
||||
for s in sections:
|
||||
groups[s.get('sect_nm', '')].append(s)
|
||||
|
||||
success = 0
|
||||
fail = 0
|
||||
|
||||
for sect_nm, group in groups.items():
|
||||
# 그룹 대표 좌표 구하기 (첫 번째 access_pt 있는 구간 또는 sect_nm)
|
||||
base_coord = None
|
||||
|
||||
# access_pt가 있는 구간에서 먼저 시도
|
||||
for s in group:
|
||||
if s.get('access_pt'):
|
||||
coord = geocode_section(s['access_pt'], sect_nm, zone_name)
|
||||
if coord:
|
||||
base_coord = coord
|
||||
break
|
||||
|
||||
# access_pt로 못 찾으면 sect_nm으로
|
||||
if base_coord is None:
|
||||
base_coord = geocode_section(None, sect_nm, zone_name)
|
||||
|
||||
if base_coord is None:
|
||||
fail += len(group)
|
||||
continue
|
||||
|
||||
# 그룹 내 구간별 좌표 할당
|
||||
# access_pt가 있는 구간은 개별 geocoding, 없으면 대표+오프셋
|
||||
offset_idx = 0
|
||||
for s in sorted(group, key=lambda x: x.get('section_number', 0)):
|
||||
if s.get('access_pt'):
|
||||
coord = geocode_section(s['access_pt'], sect_nm, zone_name)
|
||||
if coord:
|
||||
s['lat'], s['lng'] = coord
|
||||
success += 1
|
||||
continue
|
||||
|
||||
# 오프셋 적용
|
||||
lat, lng = apply_spiral_offset(base_coord[0], base_coord[1], offset_idx)
|
||||
s['lat'] = lat
|
||||
s['lng'] = lng
|
||||
offset_idx += 1
|
||||
success += 1
|
||||
|
||||
return success, fail
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 캐시 저장/로드
|
||||
# ---------------------------------------------------------------------------
|
||||
_CACHE_FILE = Path(__file__).parent / 'output' / '.geocode_cache.json'
|
||||
|
||||
|
||||
def save_cache():
|
||||
"""캐시를 파일로 저장."""
|
||||
_CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
serializable = {k: list(v) if v else None for k, v in _cache.items()}
|
||||
with open(_CACHE_FILE, 'w', encoding='utf-8') as f:
|
||||
json.dump(serializable, f, ensure_ascii=False, indent=2)
|
||||
|
||||
|
||||
def load_cache():
|
||||
"""캐시 파일 로드."""
|
||||
global _cache
|
||||
if _CACHE_FILE.exists():
|
||||
with open(_CACHE_FILE, encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
_cache = {k: tuple(v) if v else None for k, v in data.items()}
|
||||
@ -1,124 +0,0 @@
|
||||
"""PDF에서 해안조사 사진을 추출하여 scat-photos 폴더에 저장.
|
||||
|
||||
Type A (해안사전평가정보집): 1페이지 = 1구간 → 가장 큰 RGB 사진
|
||||
Type B (방제정보집): 1페이지 = 2구간 → RGB 사진 2장, 순서대로 매칭
|
||||
|
||||
저장 네이밍: {sect_cd}-1.png (ScatPopup에서 참조하는 형식)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple
|
||||
|
||||
import fitz # PyMuPDF
|
||||
|
||||
from pdf_parser import is_data_page, _CODE_RE
|
||||
from pdf_parser_b import is_data_page_b
|
||||
|
||||
import config
|
||||
|
||||
|
||||
def _get_page_photos(
|
||||
doc: fitz.Document, page: fitz.Page,
|
||||
) -> List[fitz.Pixmap]:
|
||||
"""페이지에서 RGB 사진만 추출 (배경 제외). 크기 내림차순."""
|
||||
photos: List[Tuple[fitz.Pixmap, int]] = []
|
||||
for img_info in page.get_images(full=True):
|
||||
xref = img_info[0]
|
||||
pix = fitz.Pixmap(doc, xref)
|
||||
# CMYK → RGB
|
||||
if pix.n > 4:
|
||||
pix = fitz.Pixmap(fitz.csRGB, pix)
|
||||
elif pix.n == 4:
|
||||
pix = fitz.Pixmap(fitz.csRGB, pix)
|
||||
# 흑백(n=1) 또는 배경(2000px 이상) 스킵
|
||||
if pix.n < 3 or pix.width >= 2000:
|
||||
continue
|
||||
photos.append((pix, pix.width * pix.height))
|
||||
photos.sort(key=lambda x: x[1], reverse=True)
|
||||
return photos
|
||||
|
||||
|
||||
def _extract_codes_type_a(page: fitz.Page) -> List[str]:
|
||||
"""Type A 페이지에서 sect_cd 추출."""
|
||||
text = page.get_text('text')
|
||||
return _CODE_RE.findall(text)
|
||||
|
||||
|
||||
def _extract_codes_type_b(page: fitz.Page) -> List[str]:
|
||||
"""Type B 페이지에서 sect_cd 추출 (순서 유지)."""
|
||||
text = page.get_text('text')
|
||||
codes = re.findall(r'([A-Z]{4,}-\d+(?:-[A-Z]){3,})', text)
|
||||
seen = set()
|
||||
unique = []
|
||||
for c in codes:
|
||||
if c not in seen:
|
||||
seen.add(c)
|
||||
unique.append(c)
|
||||
return unique
|
||||
|
||||
|
||||
def _save_pixmap(pix: fitz.Pixmap, output_path: Path):
|
||||
"""Pixmap을 PNG로 저장."""
|
||||
if pix.alpha:
|
||||
pix = fitz.Pixmap(fitz.csRGB, pix)
|
||||
pix.save(str(output_path))
|
||||
|
||||
|
||||
def extract_images_from_pdf(
|
||||
pdf_path: str | Path,
|
||||
output_dir: str | Path | None = None,
|
||||
pdf_type: str = 'A',
|
||||
) -> int:
|
||||
"""PDF에서 데이터 페이지별 대표 사진을 추출.
|
||||
|
||||
Args:
|
||||
pdf_path: PDF 파일 경로
|
||||
output_dir: 이미지 저장 경로 (기본: config.SCAT_PHOTOS_DIR)
|
||||
pdf_type: 'A' 또는 'B'
|
||||
|
||||
Returns:
|
||||
추출된 이미지 수
|
||||
"""
|
||||
pdf_path = Path(pdf_path)
|
||||
out_dir = Path(output_dir) if output_dir else config.SCAT_PHOTOS_DIR
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
doc = fitz.open(str(pdf_path))
|
||||
saved = 0
|
||||
|
||||
for i in range(doc.page_count):
|
||||
page = doc[i]
|
||||
|
||||
if pdf_type == 'A':
|
||||
if not is_data_page(page):
|
||||
continue
|
||||
codes = _extract_codes_type_a(page)
|
||||
photos = _get_page_photos(doc, page)
|
||||
if not codes or not photos:
|
||||
continue
|
||||
# 가장 작은 RGB 이미지 = 실제 현장 사진
|
||||
pix = photos[-1][0]
|
||||
out_path = out_dir / f'{codes[0]}-1.png'
|
||||
_save_pixmap(pix, out_path)
|
||||
saved += 1
|
||||
|
||||
elif pdf_type == 'B':
|
||||
if not is_data_page_b(page):
|
||||
continue
|
||||
codes = _extract_codes_type_b(page)
|
||||
photos = _get_page_photos(doc, page)
|
||||
if not codes or not photos:
|
||||
continue
|
||||
# Type B: 사진 크기 동일, 순서대로 매칭
|
||||
for idx, code in enumerate(codes):
|
||||
if idx < len(photos):
|
||||
pix = photos[idx][0]
|
||||
out_path = out_dir / f'{code}-1.png'
|
||||
_save_pixmap(pix, out_path)
|
||||
saved += 1
|
||||
|
||||
doc.close()
|
||||
return saved
|
||||
@ -1,52 +0,0 @@
|
||||
"""SCAT PDF 파싱 데이터 모델."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List, Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class SensitiveItem(BaseModel):
|
||||
"""민감자원 항목."""
|
||||
t: str # 유형 (사회경제적 / 생물자원)
|
||||
v: str # 내용
|
||||
|
||||
|
||||
class PhotoInfo(BaseModel):
|
||||
"""추출된 사진 정보."""
|
||||
filename: str
|
||||
page: int
|
||||
index: int
|
||||
|
||||
|
||||
class CoastalSection(BaseModel):
|
||||
"""해안 구간 1건 (PDF 1페이지)."""
|
||||
section_number: int = 0
|
||||
sect_nm: str = '' # 지역명
|
||||
sect_cd: str = '' # 코드명 (SSDD-1)
|
||||
esi_cd: Optional[str] = None # ESI 등급 (1, 2, 3, 6A, 6B, 8A, 8B 등)
|
||||
esi_num: Optional[int] = None # ESI 숫자 (1~8)
|
||||
shore_tp: Optional[str] = None # 해안 형태 (폐쇄형/개방형)
|
||||
cst_tp_cd: Optional[str] = None # 해안 구성 (투과성 인공호안, 모래 등)
|
||||
len_m: Optional[float] = None # 해안길이 (m)
|
||||
width_m: Optional[float] = None # 해안 폭 (m, 선택)
|
||||
lat: Optional[float] = None # 위도
|
||||
lng: Optional[float] = None # 경도
|
||||
access_dc: Optional[str] = None # 접근방법 설명
|
||||
access_pt: Optional[str] = None # 주요접근지점
|
||||
sensitive_info: List[SensitiveItem] = [] # 민감자원
|
||||
cleanup_methods: List[str] = [] # 권장 방제 방법
|
||||
end_criteria: List[str] = [] # 권장 방제 중지 기준
|
||||
notes: List[str] = [] # 해안 방제시 고려사항
|
||||
photos: List[PhotoInfo] = [] # 추출된 사진
|
||||
|
||||
|
||||
class ParseResult(BaseModel):
|
||||
"""PDF 파싱 전체 결과."""
|
||||
pdf_filename: str
|
||||
zone_name: str = '' # PDF 헤더에서 추출한 구역명
|
||||
jurisdiction: str = '' # 관할 (보령 해양경비안전서 등)
|
||||
total_sections: int = 0
|
||||
sections: List[CoastalSection] = []
|
||||
skipped_pages: int = 0
|
||||
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
@ -1,390 +0,0 @@
|
||||
"""PDF 텍스트 파싱 — 상태머신 방식으로 해안사전평가 정보를 추출한다."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from enum import Enum, auto
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Tuple, Union
|
||||
|
||||
import fitz # PyMuPDF
|
||||
|
||||
from models import CoastalSection, SensitiveItem, ParseResult
|
||||
from esi_mapper import map_esi
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 상태머신 상태
|
||||
# ---------------------------------------------------------------------------
|
||||
class State(Enum):
|
||||
HEADER = auto()
|
||||
GENERAL = auto() # 해안 일반특성
|
||||
ACCESS = auto() # 접근방법
|
||||
SENSITIVE = auto() # 민감자원 정보
|
||||
CLEANUP = auto() # 권장 방제 방법
|
||||
END_CRITERIA = auto() # 권장 방제 중지 기준
|
||||
CONSIDER = auto() # 해안 방제시 고려사항
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 섹션 시작 키워드 → 상태 매핑
|
||||
# ---------------------------------------------------------------------------
|
||||
_SECTION_KEYWORDS: list[tuple[str, State]] = [
|
||||
('해안 일반특성', State.GENERAL),
|
||||
('해안 일반 특성', State.GENERAL),
|
||||
('접근방법', State.ACCESS),
|
||||
('접근 방법', State.ACCESS),
|
||||
('민감자원 정보', State.SENSITIVE),
|
||||
('민감자원정보', State.SENSITIVE),
|
||||
('권장 방제 방법', State.CLEANUP),
|
||||
('권장방제방법', State.CLEANUP),
|
||||
('권 장 방 제 방 법', State.CLEANUP),
|
||||
('권장 방제 중지 기준', State.END_CRITERIA),
|
||||
('권장방제중지기준', State.END_CRITERIA),
|
||||
('권 장 방 제 중 지 기 준', State.END_CRITERIA),
|
||||
('해안 방제시 고려사항', State.CONSIDER),
|
||||
('해안 방제 시 고려사항', State.CONSIDER),
|
||||
('해안방제시 고려사항', State.CONSIDER),
|
||||
]
|
||||
|
||||
# 코드 패턴: SSDD-1, BRSM-12, DDIS-1 등
|
||||
_CODE_RE = re.compile(r'\(([A-Z]{2,}-\d+)\)')
|
||||
_NAME_CODE_RE = re.compile(r'(.+?)\s*\(([A-Z]{2,}-\d+)\)')
|
||||
_LENGTH_RE = re.compile(r'약\s*([\d,]+\.?\d*)\s*m\s*임?')
|
||||
_WIDTH_RE = re.compile(r'폭[은는]?\s*약\s*([\d,]+\.?\d*)\s*m')
|
||||
_NUMBER_RE = re.compile(r'^\d+$')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 유틸 함수
|
||||
# ---------------------------------------------------------------------------
|
||||
def _clean_bullet(line: str) -> str:
|
||||
"""불릿 접두사(Ÿ, ·, •, -) 제거 후 strip."""
|
||||
return line.lstrip('Ÿ \t·•- ').strip()
|
||||
|
||||
|
||||
def _is_bullet(line: str) -> bool:
|
||||
"""불릿으로 시작하는 줄인지 확인."""
|
||||
stripped = line.strip()
|
||||
return stripped.startswith('Ÿ') or stripped.startswith('·') or stripped.startswith('•')
|
||||
|
||||
|
||||
def _is_sub_bullet(line: str) -> bool:
|
||||
"""서브 불릿(- 접두사)으로 시작하는 줄인지 확인."""
|
||||
stripped = line.strip()
|
||||
return stripped.startswith('-') and len(stripped) > 1
|
||||
|
||||
|
||||
def _is_end_criteria_item(text: str) -> bool:
|
||||
"""방제 중지 기준 항목인지 판별 (조건문 패턴)."""
|
||||
criteria_patterns = [
|
||||
'없어야', '않아야', '미만', '이하', '이상',
|
||||
'분포해야', '발생하지',
|
||||
]
|
||||
return any(p in text for p in criteria_patterns)
|
||||
|
||||
|
||||
def _parse_measurement(text: str, pattern: re.Pattern) -> float | None:
|
||||
"""정규식으로 수치 추출."""
|
||||
m = pattern.search(text)
|
||||
if m:
|
||||
return float(m.group(1).replace(',', ''))
|
||||
return None
|
||||
|
||||
|
||||
def _detect_section_keyword(line: str) -> State | None:
|
||||
"""줄이 섹션 시작 키워드를 포함하는지 확인."""
|
||||
normalized = line.replace(' ', '')
|
||||
for keyword, state in _SECTION_KEYWORDS:
|
||||
if keyword.replace(' ', '') in normalized:
|
||||
return state
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 데이터 페이지 판별
|
||||
# ---------------------------------------------------------------------------
|
||||
def is_data_page(page: fitz.Page) -> bool:
|
||||
"""데이터 페이지인지 판별 — 코드 패턴 + 키워드 존재 여부."""
|
||||
text = page.get_text('text')
|
||||
has_code = bool(_CODE_RE.search(text))
|
||||
has_keyword = '일반특성' in text or '접근방법' in text or '방제 방법' in text
|
||||
return has_code and has_keyword
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 단일 페이지 파싱
|
||||
# ---------------------------------------------------------------------------
|
||||
def _merge_bullet_lines(raw_lines: list) -> list:
|
||||
"""F-series 형식: 'Ÿ' 단독 줄 + 다음 줄 텍스트를 병합."""
|
||||
merged = []
|
||||
i = 0
|
||||
while i < len(raw_lines):
|
||||
line = raw_lines[i].strip()
|
||||
if line == 'Ÿ' and i + 1 < len(raw_lines):
|
||||
# 다음 줄과 병합
|
||||
merged.append('Ÿ ' + raw_lines[i + 1].strip())
|
||||
i += 2
|
||||
elif line:
|
||||
merged.append(line)
|
||||
i += 1
|
||||
else:
|
||||
i += 1
|
||||
return merged
|
||||
|
||||
|
||||
def parse_page(page: fitz.Page) -> CoastalSection | None:
|
||||
"""데이터 페이지에서 CoastalSection 추출."""
|
||||
text = page.get_text('text')
|
||||
raw_lines = text.split('\n')
|
||||
lines = _merge_bullet_lines(raw_lines)
|
||||
|
||||
section = CoastalSection()
|
||||
state = State.HEADER
|
||||
|
||||
# 현재 섹션에 수집 중인 불릿 항목들
|
||||
current_bullets: list[str] = []
|
||||
# 민감자원 서브 섹션 추적
|
||||
sensitive_sub: str = ''
|
||||
sensitive_items: list[SensitiveItem] = []
|
||||
# 방제방법+중지기준 두 컬럼 병합 모드
|
||||
cleanup_merged = False
|
||||
|
||||
def _flush_bullets():
|
||||
"""현재 상태의 불릿을 section에 반영."""
|
||||
nonlocal current_bullets, sensitive_sub
|
||||
if state == State.GENERAL:
|
||||
_parse_general(section, current_bullets)
|
||||
elif state == State.ACCESS:
|
||||
_parse_access(section, current_bullets)
|
||||
elif state == State.SENSITIVE:
|
||||
if sensitive_sub and current_bullets:
|
||||
sensitive_items.append(SensitiveItem(
|
||||
t=sensitive_sub,
|
||||
v='\n'.join(current_bullets),
|
||||
))
|
||||
elif state == State.CLEANUP:
|
||||
section.cleanup_methods = current_bullets[:]
|
||||
elif state == State.END_CRITERIA:
|
||||
# 병합 모드: 불릿을 방제방법/중지기준으로 분류
|
||||
if cleanup_merged:
|
||||
_split_cleanup_and_criteria(section, current_bullets)
|
||||
else:
|
||||
section.end_criteria = current_bullets[:]
|
||||
elif state == State.CONSIDER:
|
||||
section.notes = current_bullets[:]
|
||||
current_bullets = []
|
||||
|
||||
for line in lines:
|
||||
# 페이지 헤더/푸터 스킵
|
||||
if '해양경비안전서' in line and ('관할' in line or '정보집' in line):
|
||||
continue
|
||||
if '해안사전평가 정보' in line and '∙' in line:
|
||||
continue
|
||||
if '해양경찰서' in line and ('관할' in line or '정보집' in line):
|
||||
continue
|
||||
|
||||
# 섹션 전환 감지
|
||||
new_state = _detect_section_keyword(line)
|
||||
if new_state and new_state != state:
|
||||
# 방제방법→중지기준 헤더가 연속 (두 컬럼 레이아웃)
|
||||
if state == State.CLEANUP and new_state == State.END_CRITERIA and not current_bullets:
|
||||
cleanup_merged = True
|
||||
state = State.END_CRITERIA
|
||||
continue
|
||||
_flush_bullets()
|
||||
state = new_state
|
||||
sensitive_sub = ''
|
||||
continue
|
||||
|
||||
# HEADER 상태: 번호 + 지역명/코드명 추출
|
||||
if state == State.HEADER:
|
||||
if _NUMBER_RE.match(line):
|
||||
section.section_number = int(line)
|
||||
continue
|
||||
m = _NAME_CODE_RE.search(line)
|
||||
if m:
|
||||
section.sect_nm = m.group(1).strip()
|
||||
section.sect_cd = m.group(2).strip()
|
||||
continue
|
||||
continue
|
||||
|
||||
# 민감자원: 서브 섹션 감지 (Ÿ 불릿 또는 일반 텍스트)
|
||||
if state == State.SENSITIVE:
|
||||
cleaned_for_check = _clean_bullet(line) if _is_bullet(line) else line
|
||||
if '경제적' in cleaned_for_check and '자원' in cleaned_for_check:
|
||||
if sensitive_sub and current_bullets:
|
||||
sensitive_items.append(SensitiveItem(
|
||||
t=sensitive_sub, v='\n'.join(current_bullets),
|
||||
))
|
||||
sensitive_sub = '사회경제적'
|
||||
current_bullets = []
|
||||
continue
|
||||
if '생물자원' in cleaned_for_check:
|
||||
if sensitive_sub and current_bullets:
|
||||
sensitive_items.append(SensitiveItem(
|
||||
t=sensitive_sub, v='\n'.join(current_bullets),
|
||||
))
|
||||
sensitive_sub = '생물자원'
|
||||
current_bullets = []
|
||||
continue
|
||||
|
||||
# 불릿 항목 수집
|
||||
if _is_bullet(line):
|
||||
current_bullets.append(_clean_bullet(line))
|
||||
elif _is_sub_bullet(line):
|
||||
# "-" 접두사 서브 항목 (민감자원 상세 등)
|
||||
cleaned = line.strip().lstrip('-').strip()
|
||||
if cleaned:
|
||||
current_bullets.append(cleaned)
|
||||
elif current_bullets and line and not _detect_section_keyword(line):
|
||||
# 연속행 (불릿 없이 이어지는 텍스트)
|
||||
cleaned = line.strip()
|
||||
if cleaned:
|
||||
current_bullets[-1] += ' ' + cleaned
|
||||
|
||||
# 마지막 섹션 flush
|
||||
_flush_bullets()
|
||||
section.sensitive_info = sensitive_items
|
||||
|
||||
if not section.sect_cd:
|
||||
return None
|
||||
|
||||
# ESI 등급 매핑 (cst_tp_cd 기반)
|
||||
if section.cst_tp_cd:
|
||||
section.esi_cd, section.esi_num = map_esi(section.cst_tp_cd)
|
||||
|
||||
return section
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 섹션별 파싱 헬퍼
|
||||
# ---------------------------------------------------------------------------
|
||||
def _parse_general(section: CoastalSection, bullets: list[str]):
|
||||
"""해안 일반특성 불릿에서 shore_tp, cst_tp_cd, len_m, width_m 추출."""
|
||||
for b in bullets:
|
||||
if '형태' in b:
|
||||
if '폐쇄' in b:
|
||||
section.shore_tp = '폐쇄형'
|
||||
elif '개방' in b:
|
||||
section.shore_tp = '개방형'
|
||||
elif '반폐쇄' in b or '반 폐쇄' in b:
|
||||
section.shore_tp = '반폐쇄형'
|
||||
elif '이루어져' in b or '조성' in b or '으로 이루어' in b:
|
||||
# 해안 구성 추출
|
||||
section.cst_tp_cd = _extract_coastal_type(b)
|
||||
length = _parse_measurement(b, _LENGTH_RE)
|
||||
if length and not section.len_m:
|
||||
section.len_m = length
|
||||
width = _parse_measurement(b, _WIDTH_RE)
|
||||
if width:
|
||||
section.width_m = width
|
||||
|
||||
|
||||
def _extract_coastal_type(text: str) -> str:
|
||||
"""해안 구성 유형 추출."""
|
||||
types = [
|
||||
'투과성 인공호안', '비투과성 인공호안', '인공호안',
|
||||
'모래', '세립질 모래', '굵은 모래',
|
||||
'자갈', '수직암반', '수평암반',
|
||||
'갯벌', '습지', '사석',
|
||||
'콘크리트', '테트라포드',
|
||||
]
|
||||
for t in types:
|
||||
if t in text:
|
||||
return t
|
||||
# fallback: "해안은 XXX으로 이루어져" 패턴
|
||||
m = re.search(r'해안은\s+(.+?)(?:으로|로)\s*이루어져', text)
|
||||
if m:
|
||||
return m.group(1).strip()
|
||||
return text
|
||||
|
||||
|
||||
def _split_cleanup_and_criteria(section: CoastalSection, bullets: list[str]):
|
||||
"""두 컬럼이 병합된 불릿을 방제방법/중지기준으로 분류."""
|
||||
cleanup = []
|
||||
criteria = []
|
||||
for b in bullets:
|
||||
if _is_end_criteria_item(b):
|
||||
criteria.append(b)
|
||||
else:
|
||||
cleanup.append(b)
|
||||
section.cleanup_methods = cleanup
|
||||
section.end_criteria = criteria
|
||||
|
||||
|
||||
def _parse_access(section: CoastalSection, bullets: list[str]):
|
||||
"""접근방법 불릿에서 access_dc, access_pt 추출."""
|
||||
access_parts = []
|
||||
for b in bullets:
|
||||
if '주요접근지점' in b or '주요 접근지점' in b or '주요접근 지점' in b:
|
||||
# "주요접근지점 : 부사방조제" 패턴
|
||||
parts = re.split(r'[::]', b, maxsplit=1)
|
||||
if len(parts) > 1:
|
||||
section.access_pt = parts[1].strip()
|
||||
else:
|
||||
section.access_pt = b.replace('주요접근지점', '').strip()
|
||||
else:
|
||||
access_parts.append(b)
|
||||
if access_parts:
|
||||
section.access_dc = ' / '.join(access_parts)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 전체 PDF 파싱
|
||||
# ---------------------------------------------------------------------------
|
||||
def parse_pdf(pdf_path: str | Path) -> ParseResult:
|
||||
"""PDF 전체를 파싱하여 ParseResult 반환."""
|
||||
pdf_path = Path(pdf_path)
|
||||
doc = fitz.open(str(pdf_path))
|
||||
|
||||
result = ParseResult(
|
||||
pdf_filename=pdf_path.name,
|
||||
)
|
||||
|
||||
# 관할/구역명 추출 시도 (첫 30페이지 탐색)
|
||||
for i in range(min(35, doc.page_count)):
|
||||
text = doc[i].get_text('text')
|
||||
if '관할' in text and '해안' in text:
|
||||
# "보령 해양경비안전서 관할" 패턴
|
||||
m = re.search(r'(\S+\s*해양경[비찰]\S*)\s*관할', text)
|
||||
if m and not result.jurisdiction:
|
||||
result.jurisdiction = m.group(1).strip()
|
||||
# 구역명: "X. 충남 서천군 해안 사전평가 정보" 패턴
|
||||
m = re.search(r'\d+\.\s*(.+?)\s*해안\s*사전평가', text)
|
||||
if m and not result.zone_name:
|
||||
result.zone_name = m.group(1).strip()
|
||||
if result.jurisdiction and result.zone_name:
|
||||
break
|
||||
|
||||
# 데이터 페이지 파싱
|
||||
skipped = 0
|
||||
for i in range(doc.page_count):
|
||||
page = doc[i]
|
||||
if not is_data_page(page):
|
||||
skipped += 1
|
||||
continue
|
||||
section = parse_page(page)
|
||||
if section:
|
||||
result.sections.append(section)
|
||||
|
||||
result.total_sections = len(result.sections)
|
||||
result.skipped_pages = skipped
|
||||
doc.close()
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI 실행
|
||||
# ---------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
import json
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
print('Usage: python parser.py <pdf_path>')
|
||||
sys.exit(1)
|
||||
|
||||
r = parse_pdf(sys.argv[1])
|
||||
print(json.dumps(r.model_dump(), ensure_ascii=False, indent=2))
|
||||
@ -1,341 +0,0 @@
|
||||
"""PDF 텍스트 파싱 — Type B (방제정보집) 형식.
|
||||
|
||||
여수해경서 등의 '방제정보집' PDF에서 해안 구간 정보를 추출한다.
|
||||
Type A(해안사전평가정보집)와 다른 레이아웃:
|
||||
- 코드: HDPJ-1-M-E-R-P-L (괄호 없음)
|
||||
- 페이지당 2개 구간
|
||||
- '식별자 코드명' 라벨로 구간 시작
|
||||
- '민감자원' 섹션 없음
|
||||
- '초기 방제 및 고려사항' 섹션
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
import fitz # PyMuPDF
|
||||
|
||||
from models import CoastalSection, ParseResult
|
||||
from esi_mapper import parse_esi_cd
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 정규식 패턴
|
||||
# ---------------------------------------------------------------------------
|
||||
# HDPJ-1-M-E-R-P-L, HDDH-10-MI-E-R-P-L 등
|
||||
_CODE_RE_B = re.compile(r'([A-Z]{2,}-\d+(?:-[A-Z]{1,2}){0,5}(?:-[A-Z])*)')
|
||||
_ESI_RE = re.compile(r'ESI\s*등급\s*[::]\s*(\d+[A-Z]?)')
|
||||
_LENGTH_RE = re.compile(r'([\d,.]+)\s*/\s*(-|[\d,.]+(?:\.\d+)?)')
|
||||
_NUMBER_RE = re.compile(r'^(\d{1,3})$')
|
||||
_SECTION_START_RE = re.compile(r'식별자\s+코드명')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 유틸
|
||||
# ---------------------------------------------------------------------------
|
||||
def _clean_bullet(line: str) -> str:
|
||||
return line.lstrip('Ÿ \t·•- ').strip()
|
||||
|
||||
|
||||
def _is_bullet(line: str) -> bool:
|
||||
s = line.strip()
|
||||
return s.startswith('Ÿ') or s.startswith('·') or s.startswith('•')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 데이터 페이지 판별
|
||||
# ---------------------------------------------------------------------------
|
||||
def is_data_page_b(page: fitz.Page) -> bool:
|
||||
text = page.get_text('text')
|
||||
has_identifier = bool(_SECTION_START_RE.search(text))
|
||||
has_code = bool(_CODE_RE_B.search(text))
|
||||
return has_identifier and has_code
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 단일 구간 블록 파싱
|
||||
# ---------------------------------------------------------------------------
|
||||
def _parse_section_block(lines: list[str], area_name: str) -> CoastalSection | None:
|
||||
"""식별자 코드명 ~ 다음 식별자 코드명 사이의 텍스트 블록을 파싱."""
|
||||
section = CoastalSection()
|
||||
section.sect_nm = area_name
|
||||
|
||||
# Phase: code → general → cleanup → end_criteria → consider
|
||||
phase = 'code'
|
||||
cleanup_items: list[str] = []
|
||||
end_criteria_items: list[str] = []
|
||||
consider_items: list[str] = []
|
||||
current_target: list[str] | None = None
|
||||
|
||||
i = 0
|
||||
while i < len(lines):
|
||||
line = lines[i].strip()
|
||||
i += 1
|
||||
|
||||
if not line:
|
||||
continue
|
||||
|
||||
# 코드 추출
|
||||
if phase == 'code':
|
||||
m = _CODE_RE_B.search(line)
|
||||
if m:
|
||||
section.sect_cd = m.group(1)
|
||||
phase = 'general'
|
||||
else:
|
||||
# 구간 번호
|
||||
nm = _NUMBER_RE.match(line)
|
||||
if nm:
|
||||
section.section_number = int(nm.group(1))
|
||||
continue
|
||||
|
||||
# 해안 형태/저질 특성 + ESI + 길이
|
||||
if phase == 'general':
|
||||
# 형태
|
||||
if '형태' in line and ':' in line:
|
||||
val = line.split(':', 1)[1].strip().split(':')[-1].strip()
|
||||
if val:
|
||||
section.shore_tp = val
|
||||
continue
|
||||
# 퇴적물
|
||||
if '퇴적물' in line and ':' in line:
|
||||
val = line.split(':', 1)[1].strip().split(':')[-1].strip()
|
||||
if val:
|
||||
section.cst_tp_cd = val
|
||||
continue
|
||||
# ESI
|
||||
m = _ESI_RE.search(line)
|
||||
if m:
|
||||
section.esi_cd, section.esi_num = parse_esi_cd(m.group(1))
|
||||
continue
|
||||
# 길이/폭
|
||||
m = _LENGTH_RE.search(line)
|
||||
if m:
|
||||
try:
|
||||
section.len_m = float(m.group(1).replace(',', ''))
|
||||
except ValueError:
|
||||
pass
|
||||
width_str = m.group(2)
|
||||
if width_str != '-':
|
||||
# '2차선 도로' 등 비숫자 후속 방지
|
||||
end_pos = m.end(2)
|
||||
after = line[end_pos:end_pos + 1] if end_pos < len(line) else ''
|
||||
if not after or after in (' ', '\t', '\n', ')', ''):
|
||||
try:
|
||||
section.width_m = float(width_str.replace(',', ''))
|
||||
except ValueError:
|
||||
pass
|
||||
continue
|
||||
# 접근성 — þ 마커들은 접근성 열에 해당
|
||||
if 'þ' in line:
|
||||
continue
|
||||
# 섹션 전환 감지
|
||||
normalized = line.replace(' ', '')
|
||||
if '권장방제방법' in normalized:
|
||||
phase = 'cleanup'
|
||||
current_target = cleanup_items
|
||||
continue
|
||||
if '접근성' in normalized or '차량' in normalized or '도로' in normalized or '도보' in normalized or '선박' in normalized:
|
||||
continue
|
||||
if '해안길이' in normalized or '대표' in normalized or '사진' in normalized:
|
||||
continue
|
||||
continue
|
||||
|
||||
# 방제 방법 / 종료 기준 (두 컬럼이 같은 줄에 섞여 나옴)
|
||||
if phase == 'cleanup':
|
||||
normalized = line.replace(' ', '')
|
||||
if '방제종료기준' in normalized:
|
||||
# 헤더만 있는 줄 — 이후 불릿은 종료기준
|
||||
current_target = end_criteria_items
|
||||
continue
|
||||
if '초기방제' in normalized and '고려사항' in normalized:
|
||||
phase = 'consider'
|
||||
current_target = consider_items
|
||||
continue
|
||||
if _is_bullet(line):
|
||||
text = _clean_bullet(line)
|
||||
if text:
|
||||
# 두 컬럼이 혼합된 경우 heuristic 분류
|
||||
if _is_criteria(text):
|
||||
end_criteria_items.append(text)
|
||||
else:
|
||||
cleanup_items.append(text)
|
||||
continue
|
||||
|
||||
# 고려사항
|
||||
if phase == 'consider':
|
||||
if _is_bullet(line):
|
||||
text = _clean_bullet(line)
|
||||
if text:
|
||||
consider_items.append(text)
|
||||
elif consider_items and line and not _SECTION_START_RE.search(line):
|
||||
# 연속행
|
||||
consider_items[-1] += ' ' + line
|
||||
continue
|
||||
|
||||
section.cleanup_methods = cleanup_items
|
||||
section.end_criteria = end_criteria_items
|
||||
section.notes = consider_items
|
||||
|
||||
if not section.sect_cd:
|
||||
return None
|
||||
return section
|
||||
|
||||
|
||||
def _is_criteria(text: str) -> bool:
|
||||
patterns = ['없어야', '않아야', '미만', '이하', '이상', '분포해야',
|
||||
'분포하면', '발생하지', '묻어나지', '유출되지', '관찰되는']
|
||||
return any(p in text for p in patterns)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 접근성 추출
|
||||
# ---------------------------------------------------------------------------
|
||||
def _extract_accessibility(text_block: str) -> str | None:
|
||||
"""블록 텍스트에서 접근성(차량/도보/선박) 추출."""
|
||||
lines = text_block.split('\n')
|
||||
access_types = []
|
||||
# 접근성 헤더 찾기
|
||||
header_idx = -1
|
||||
for idx, line in enumerate(lines):
|
||||
s = line.replace(' ', '')
|
||||
if '접근성' in s:
|
||||
header_idx = idx
|
||||
break
|
||||
|
||||
if header_idx < 0:
|
||||
return None
|
||||
|
||||
# 헤더 이후에서 차량/도로/도보/선박 라벨과 þ 위치 매칭
|
||||
labels = []
|
||||
for idx in range(header_idx, min(header_idx + 5, len(lines))):
|
||||
line = lines[idx]
|
||||
for label in ['차량', '도로', '도보', '선박']:
|
||||
if label in line.replace(' ', ''):
|
||||
labels.append('도로' if label == '도로' else label)
|
||||
|
||||
# þ 마커 수 세기
|
||||
check_count = 0
|
||||
for idx in range(header_idx, min(header_idx + 8, len(lines))):
|
||||
check_count += lines[idx].count('þ')
|
||||
|
||||
if labels and check_count > 0:
|
||||
# þ 개수만큼 앞에서부터 접근 가능
|
||||
accessed = labels[:check_count] if check_count <= len(labels) else labels
|
||||
return ', '.join(accessed) + ' 접근 가능'
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 페이지 파싱 (여러 구간)
|
||||
# ---------------------------------------------------------------------------
|
||||
def parse_page_b(page: fitz.Page) -> list[CoastalSection]:
|
||||
"""데이터 페이지에서 CoastalSection 목록 추출 (보통 2개)."""
|
||||
text = page.get_text('text')
|
||||
lines = text.split('\n')
|
||||
|
||||
# 지역명 추출 (첫 줄 또는 헤더)
|
||||
area_name = ''
|
||||
for line in lines[:3]:
|
||||
stripped = line.strip()
|
||||
if stripped and '정보집' not in stripped and '∙' not in stripped:
|
||||
area_name = stripped
|
||||
break
|
||||
|
||||
# 접근성 추출 (페이지 전체에서)
|
||||
accessibility = _extract_accessibility(text)
|
||||
|
||||
# 구간 블록 분리: "식별자 코드명" 기준
|
||||
block_starts: list[int] = []
|
||||
for idx, line in enumerate(lines):
|
||||
if _SECTION_START_RE.search(line):
|
||||
# 구간 번호는 이전 줄에 있을 수 있음
|
||||
start = idx
|
||||
if idx > 0 and _NUMBER_RE.match(lines[idx - 1].strip()):
|
||||
start = idx - 1
|
||||
block_starts.append(start)
|
||||
|
||||
if not block_starts:
|
||||
return []
|
||||
|
||||
sections: list[CoastalSection] = []
|
||||
for i, start in enumerate(block_starts):
|
||||
end = block_starts[i + 1] if i + 1 < len(block_starts) else len(lines)
|
||||
block = lines[start:end]
|
||||
section = _parse_section_block(block, area_name)
|
||||
if section:
|
||||
if accessibility and not section.access_dc:
|
||||
section.access_dc = accessibility
|
||||
sections.append(section)
|
||||
|
||||
# 접근성은 구간마다 다를 수 있음 — 각 블록에서 개별 추출
|
||||
for i, start in enumerate(block_starts):
|
||||
end = block_starts[i + 1] if i + 1 < len(block_starts) else len(lines)
|
||||
block_text = '\n'.join(lines[start:end])
|
||||
acc = _extract_accessibility(block_text)
|
||||
if acc and i < len(sections):
|
||||
sections[i].access_dc = acc
|
||||
|
||||
return sections
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 전체 PDF 파싱
|
||||
# ---------------------------------------------------------------------------
|
||||
def parse_pdf_b(pdf_path: str | Path) -> ParseResult:
|
||||
"""Type B PDF 전체를 파싱하여 ParseResult 반환."""
|
||||
pdf_path = Path(pdf_path)
|
||||
doc = fitz.open(str(pdf_path))
|
||||
|
||||
result = ParseResult(pdf_filename=pdf_path.name)
|
||||
|
||||
# 관할/구역명 추출 (페이지 헤더/푸터에서)
|
||||
for i in range(min(40, doc.page_count)):
|
||||
text = doc[i].get_text('text')
|
||||
# "여수해경서 관할 해안 사전 평가 정보집" 패턴
|
||||
m = re.search(r'(\S+해경서)\s*관할', text)
|
||||
if m and not result.jurisdiction:
|
||||
result.jurisdiction = m.group(1).strip()
|
||||
# "2. 하동군 해안 사전 평가 정보" 패턴
|
||||
m = re.search(r'\d+\.\s*(.+?)\s*해안\s*사전\s*평가', text)
|
||||
if m and not result.zone_name:
|
||||
result.zone_name = m.group(1).strip()
|
||||
if result.jurisdiction and result.zone_name:
|
||||
break
|
||||
|
||||
# 데이터 페이지 파싱
|
||||
skipped = 0
|
||||
for i in range(doc.page_count):
|
||||
page = doc[i]
|
||||
if not is_data_page_b(page):
|
||||
skipped += 1
|
||||
continue
|
||||
sections = parse_page_b(page)
|
||||
result.sections.extend(sections)
|
||||
|
||||
result.total_sections = len(result.sections)
|
||||
result.skipped_pages = skipped
|
||||
doc.close()
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
import json
|
||||
import io
|
||||
|
||||
if sys.platform == 'win32':
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
print('Usage: python pdf_parser_b.py <pdf_path>')
|
||||
sys.exit(1)
|
||||
|
||||
r = parse_pdf_b(sys.argv[1])
|
||||
print(json.dumps(r.model_dump(), ensure_ascii=False, indent=2))
|
||||
@ -1,14 +0,0 @@
|
||||
# PDF 파싱
|
||||
PyMuPDF==1.26.5
|
||||
|
||||
# 이미지 처리
|
||||
Pillow==10.3.0
|
||||
|
||||
# DB (추후 사용)
|
||||
psycopg2-binary==2.9.9
|
||||
|
||||
# 데이터 모델
|
||||
pydantic==2.7.0
|
||||
|
||||
# HTTP (Geocoding)
|
||||
requests>=2.31.0
|
||||
@ -1,378 +0,0 @@
|
||||
"""SCAT PDF 파싱 CLI 도구.
|
||||
|
||||
사용법:
|
||||
python run.py <pdf_path> # 단일 PDF 파싱
|
||||
python run.py <directory_path> # 배치 파싱
|
||||
python run.py --load-json output/ --geocode # JSON에 좌표 추가
|
||||
python run.py --load-json output/ --save # JSON → DB 저장
|
||||
python run.py --load-json output/ --save --dry-run # DB 저장 미리보기
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import io
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
|
||||
# Windows cp949 대응
|
||||
if sys.platform == 'win32':
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
|
||||
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
|
||||
|
||||
import fitz
|
||||
|
||||
from pdf_parser import parse_pdf
|
||||
from pdf_parser_b import parse_pdf_b
|
||||
from models import CoastalSection, SensitiveItem
|
||||
|
||||
OUTPUT_DIR = Path(__file__).parent / 'output'
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PDF 형식 감지
|
||||
# ---------------------------------------------------------------------------
|
||||
def detect_pdf_type(pdf_path: Path) -> str:
|
||||
"""PDF 형식 감지. 'A'(해안사전평가정보집) 또는 'B'(방제정보집) 반환."""
|
||||
doc = fitz.open(str(pdf_path))
|
||||
for i in range(min(30, doc.page_count)):
|
||||
text = doc[i].get_text('text')
|
||||
if '식별자' in text and '코드명' in text:
|
||||
doc.close()
|
||||
return 'B'
|
||||
doc.close()
|
||||
return 'A'
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PDF 파싱
|
||||
# ---------------------------------------------------------------------------
|
||||
def process_pdf(pdf_path: Path) -> dict:
|
||||
"""단일 PDF를 파싱하고 JSON 파일로 저장한다."""
|
||||
pdf_type = detect_pdf_type(pdf_path)
|
||||
if pdf_type == 'B':
|
||||
result = parse_pdf_b(str(pdf_path))
|
||||
else:
|
||||
result = parse_pdf(str(pdf_path))
|
||||
data = result.model_dump()
|
||||
|
||||
for s in data['sections']:
|
||||
s.pop('photos', None)
|
||||
|
||||
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
out_path = OUTPUT_DIR / f'{pdf_path.stem}.json'
|
||||
with open(out_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
|
||||
return {
|
||||
'file': pdf_path.name,
|
||||
'output': str(out_path),
|
||||
'zone_name': result.zone_name,
|
||||
'jurisdiction': result.jurisdiction,
|
||||
'total_sections': result.total_sections,
|
||||
'skipped_pages': result.skipped_pages,
|
||||
}
|
||||
|
||||
|
||||
def run_parse(target: Path):
|
||||
"""PDF 파싱 실행."""
|
||||
if target.is_file() and target.suffix.lower() == '.pdf':
|
||||
pdf_files = [target]
|
||||
elif target.is_dir():
|
||||
pdf_files = sorted(target.glob('*.pdf'))
|
||||
if not pdf_files:
|
||||
print(f'PDF 파일을 찾을 수 없습니다: {target}')
|
||||
sys.exit(1)
|
||||
print(f'{len(pdf_files)}개 PDF 발견\n')
|
||||
else:
|
||||
print(f'유효하지 않은 경로: {target}')
|
||||
sys.exit(1)
|
||||
|
||||
results = []
|
||||
for i, pdf in enumerate(pdf_files, 1):
|
||||
pdf_type = detect_pdf_type(pdf)
|
||||
print(f'[{i}/{len(pdf_files)}] {pdf.name} (Type {pdf_type}) 파싱 중...')
|
||||
try:
|
||||
info = process_pdf(pdf)
|
||||
results.append(info)
|
||||
print(f' -> {info["total_sections"]}개 구간 | {info["zone_name"]} | {info["jurisdiction"]}')
|
||||
print(f' -> 저장: {info["output"]}')
|
||||
except Exception as e:
|
||||
print(f' -> 오류: {e}')
|
||||
results.append({'file': pdf.name, 'error': str(e)})
|
||||
|
||||
if len(results) > 1:
|
||||
print(f'\n=== 요약 ===')
|
||||
success = [r for r in results if 'error' not in r]
|
||||
failed = [r for r in results if 'error' in r]
|
||||
total_sections = sum(r['total_sections'] for r in success)
|
||||
print(f'성공: {len(success)}개 / 실패: {len(failed)}개 / 총 구간: {total_sections}개')
|
||||
if failed:
|
||||
print(f'실패 파일: {", ".join(r["file"] for r in failed)}')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# JSON → DB 저장
|
||||
# ---------------------------------------------------------------------------
|
||||
def _extract_zone_cd(sect_cd: str) -> str:
|
||||
"""sect_cd에서 zone_cd 추출 (영문 접두사).
|
||||
|
||||
Type A: SSDD-1 → SSDD (하이픈 앞 영문)
|
||||
Type B: BSBB-1-M-E-S-N → BSBB (첫 하이픈 앞 영문)
|
||||
"""
|
||||
m = re.match(r'^([A-Z]{2,})', sect_cd)
|
||||
return m.group(1) if m else sect_cd
|
||||
|
||||
|
||||
def _extract_jrsd_short(jurisdiction: str) -> str:
|
||||
"""관할 기관명에서 짧은 이름 추출. 예: '보령 해양경비안전서' → '보령'"""
|
||||
if not jurisdiction:
|
||||
return ''
|
||||
return jurisdiction.split()[0] if ' ' in jurisdiction else jurisdiction
|
||||
|
||||
|
||||
def _dict_to_section(d: dict) -> CoastalSection:
|
||||
"""JSON dict → CoastalSection 모델 변환."""
|
||||
sensitive = [SensitiveItem(**item) for item in (d.get('sensitive_info') or [])]
|
||||
return CoastalSection(
|
||||
section_number=d.get('section_number', 0),
|
||||
sect_nm=d.get('sect_nm', ''),
|
||||
sect_cd=d.get('sect_cd', ''),
|
||||
esi_cd=d.get('esi_cd'),
|
||||
esi_num=d.get('esi_num'),
|
||||
shore_tp=d.get('shore_tp'),
|
||||
cst_tp_cd=d.get('cst_tp_cd'),
|
||||
len_m=d.get('len_m'),
|
||||
width_m=d.get('width_m'),
|
||||
lat=d.get('lat'),
|
||||
lng=d.get('lng'),
|
||||
access_dc=d.get('access_dc'),
|
||||
access_pt=d.get('access_pt'),
|
||||
sensitive_info=sensitive,
|
||||
cleanup_methods=d.get('cleanup_methods', []),
|
||||
end_criteria=d.get('end_criteria', []),
|
||||
notes=d.get('notes', []),
|
||||
)
|
||||
|
||||
|
||||
def load_json_files(json_dir: Path) -> list[dict]:
|
||||
"""JSON 디렉토리에서 모든 파싱 결과를 로드한다."""
|
||||
all_data = []
|
||||
for f in sorted(json_dir.glob('*.json')):
|
||||
with open(f, encoding='utf-8') as fp:
|
||||
data = json.load(fp)
|
||||
if data.get('total_sections', 0) > 0:
|
||||
all_data.append(data)
|
||||
return all_data
|
||||
|
||||
|
||||
def group_by_zone(all_data: list[dict]) -> dict:
|
||||
"""파싱 결과를 zone_cd로 그룹핑한다.
|
||||
|
||||
Returns:
|
||||
{zone_cd: {
|
||||
'zone_nm': str,
|
||||
'jrsd_nm': str,
|
||||
'sections': [dict, ...]
|
||||
}}
|
||||
"""
|
||||
zones = defaultdict(lambda: {'zone_nm': '', 'jrsd_nm': '', 'sections': []})
|
||||
|
||||
for data in all_data:
|
||||
zone_name = data.get('zone_name', '')
|
||||
jrsd_nm = _extract_jrsd_short(data.get('jurisdiction', ''))
|
||||
|
||||
for sect in data['sections']:
|
||||
zone_cd = _extract_zone_cd(sect['sect_cd'])
|
||||
zone = zones[zone_cd]
|
||||
if not zone['zone_nm']:
|
||||
zone['zone_nm'] = zone_name
|
||||
if not zone['jrsd_nm']:
|
||||
zone['jrsd_nm'] = jrsd_nm
|
||||
zone['sections'].append(sect)
|
||||
|
||||
return dict(zones)
|
||||
|
||||
|
||||
def run_save(json_dir: Path, dry_run: bool = False):
|
||||
"""JSON 파싱 결과를 DB에 저장한다."""
|
||||
all_data = load_json_files(json_dir)
|
||||
if not all_data:
|
||||
print(f'유효한 JSON 파일을 찾을 수 없습니다: {json_dir}')
|
||||
sys.exit(1)
|
||||
|
||||
zones = group_by_zone(all_data)
|
||||
total_sections = sum(len(z['sections']) for z in zones.values())
|
||||
|
||||
print(f'=== DB 저장 {"미리보기" if dry_run else "시작"} ===')
|
||||
print(f'총 {len(zones)}개 zone, {total_sections}개 구간\n')
|
||||
|
||||
for zone_cd, zone_info in sorted(zones.items()):
|
||||
sect_count = len(zone_info['sections'])
|
||||
print(f' {zone_cd:8s} | {zone_info["zone_nm"]:20s} | {zone_info["jrsd_nm"]:8s} | {sect_count}개 구간')
|
||||
|
||||
if dry_run:
|
||||
print(f'\n(dry-run 모드 — DB에 저장하지 않음)')
|
||||
return
|
||||
|
||||
# 실제 DB 저장
|
||||
from db import ensure_zone, upsert_section, update_zone_sect_count, update_zone_center, close_pool
|
||||
|
||||
saved_zones = 0
|
||||
saved_sections = 0
|
||||
|
||||
try:
|
||||
for zone_cd, zone_info in sorted(zones.items()):
|
||||
zone_sn = ensure_zone(zone_cd, zone_info['zone_nm'], zone_info['jrsd_nm'])
|
||||
saved_zones += 1
|
||||
|
||||
for sect_dict in zone_info['sections']:
|
||||
section = _dict_to_section(sect_dict)
|
||||
upsert_section(zone_sn, section)
|
||||
saved_sections += 1
|
||||
|
||||
update_zone_sect_count(zone_sn)
|
||||
update_zone_center(zone_sn)
|
||||
|
||||
print(f'\n=== 완료 ===')
|
||||
print(f'{saved_zones}개 zone, {saved_sections}개 구간 저장 완료')
|
||||
except Exception as e:
|
||||
print(f'\n오류 발생: {e}')
|
||||
print(f'저장 진행: {saved_zones}개 zone, {saved_sections}개 구간까지 완료')
|
||||
raise
|
||||
finally:
|
||||
close_pool()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Geocoding
|
||||
# ---------------------------------------------------------------------------
|
||||
def run_geocode(json_dir: Path):
|
||||
"""JSON 파싱 결과에 좌표를 추가한다."""
|
||||
from geocoder import geocode_sections, load_cache, save_cache
|
||||
|
||||
load_cache()
|
||||
|
||||
json_files = sorted(json_dir.glob('*.json'))
|
||||
json_files = [f for f in json_files if not f.name.startswith('.')]
|
||||
if not json_files:
|
||||
print(f'JSON 파일을 찾을 수 없습니다: {json_dir}')
|
||||
sys.exit(1)
|
||||
|
||||
print(f'=== Geocoding 시작 ({len(json_files)}개 JSON) ===\n')
|
||||
|
||||
total_success = 0
|
||||
total_fail = 0
|
||||
|
||||
for i, f in enumerate(json_files, 1):
|
||||
with open(f, encoding='utf-8') as fp:
|
||||
data = json.load(fp)
|
||||
|
||||
sections = data.get('sections', [])
|
||||
if not sections:
|
||||
continue
|
||||
|
||||
zone_name = data.get('zone_name', '')
|
||||
print(f'[{i}/{len(json_files)}] {f.name} ({len(sections)}개 구간)...')
|
||||
|
||||
success, fail = geocode_sections(sections, zone_name)
|
||||
total_success += success
|
||||
total_fail += fail
|
||||
|
||||
# 좌표가 있는 구간 수
|
||||
with_coords = sum(1 for s in sections if s.get('lat'))
|
||||
print(f' -> 좌표: {with_coords}/{len(sections)}')
|
||||
|
||||
# JSON 업데이트 저장
|
||||
with open(f, 'w', encoding='utf-8') as fp:
|
||||
json.dump(data, fp, ensure_ascii=False, indent=2)
|
||||
|
||||
save_cache()
|
||||
|
||||
print(f'\n=== Geocoding 완료 ===')
|
||||
print(f'성공: {total_success} / 실패: {total_fail}')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 이미지 추출
|
||||
# ---------------------------------------------------------------------------
|
||||
def run_extract_images(target: Path):
|
||||
"""PDF에서 해안사진을 추출하여 scat-photos/에 저장."""
|
||||
from image_extractor import extract_images_from_pdf
|
||||
|
||||
if target.is_file() and target.suffix.lower() == '.pdf':
|
||||
pdf_files = [target]
|
||||
elif target.is_dir():
|
||||
pdf_files = sorted(target.glob('*.pdf'))
|
||||
if not pdf_files:
|
||||
print(f'PDF 파일을 찾을 수 없습니다: {target}')
|
||||
sys.exit(1)
|
||||
print(f'{len(pdf_files)}개 PDF 발견\n')
|
||||
else:
|
||||
print(f'유효하지 않은 경로: {target}')
|
||||
sys.exit(1)
|
||||
|
||||
total = 0
|
||||
for i, pdf in enumerate(pdf_files, 1):
|
||||
pdf_type = detect_pdf_type(pdf)
|
||||
print(f'[{i}/{len(pdf_files)}] {pdf.name} (Type {pdf_type}) 이미지 추출 중...')
|
||||
try:
|
||||
count = extract_images_from_pdf(pdf, pdf_type=pdf_type)
|
||||
total += count
|
||||
print(f' -> {count}개 이미지 저장')
|
||||
except Exception as e:
|
||||
print(f' -> 오류: {e}')
|
||||
|
||||
print(f'\n=== 이미지 추출 완료: 총 {total}개 ===')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='SCAT PDF 파싱 CLI 도구')
|
||||
parser.add_argument('target', nargs='?', help='PDF 파일 또는 디렉토리 경로')
|
||||
parser.add_argument('--save', action='store_true', help='파싱 결과를 DB에 저장')
|
||||
parser.add_argument('--load-json', type=Path, help='이미 파싱된 JSON 디렉토리에서 로드')
|
||||
parser.add_argument('--geocode', action='store_true', help='JSON에 Kakao Geocoding으로 좌표 추가')
|
||||
parser.add_argument('--extract-images', action='store_true', help='PDF에서 해안사진 추출 → scat-photos/')
|
||||
parser.add_argument('--dry-run', action='store_true', help='DB 저장 미리보기 (실제 저장 안 함)')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# JSON 로드 모드
|
||||
if args.load_json:
|
||||
if args.geocode:
|
||||
run_geocode(args.load_json)
|
||||
if args.save:
|
||||
run_save(args.load_json, dry_run=args.dry_run)
|
||||
if not args.geocode and not args.save:
|
||||
print('--load-json은 --geocode 또는 --save와 함께 사용해야 합니다.')
|
||||
sys.exit(1)
|
||||
return
|
||||
|
||||
# PDF 파싱 모드
|
||||
if not args.target:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
target = Path(args.target)
|
||||
|
||||
# 이미지 추출 모드
|
||||
if args.extract_images:
|
||||
run_extract_images(target)
|
||||
return
|
||||
|
||||
run_parse(target)
|
||||
|
||||
# 파싱 후 바로 DB 저장
|
||||
if args.save:
|
||||
print('\n')
|
||||
run_save(OUTPUT_DIR, dry_run=args.dry_run)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
불러오는 중...
Reference in New Issue
Block a user