Compare commits
No commits in common. "84fa49189c6c37b0807745d699065897103cbb33" and "5f622c7520de274b3ed28eec1d4adcb7618c7ee7" have entirely different histories.
84fa49189c
...
5f622c7520
@ -4,11 +4,6 @@
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [2026-03-20.2]
|
||||
|
||||
### 변경
|
||||
- prediction/scat 파이프라인 제거 + SCAT/사고관리 UI 수정
|
||||
|
||||
## [2026-03-20]
|
||||
|
||||
### 추가
|
||||
|
||||
@ -18,7 +18,7 @@ export function TopBar({ activeTab, onTabChange }: TopBarProps) {
|
||||
const { menuConfig, isLoaded } = useMenuStore()
|
||||
const { mapToggles, toggleMap, mapTypes, measureMode, setMeasureMode } = useMapStore()
|
||||
|
||||
const MAP_TABS = new Set<string>(['prediction', 'hns', 'scat', 'incidents'])
|
||||
const MAP_TABS = new Set<string>(['prediction', 'hns', 'scat', 'weather'])
|
||||
const isMapTab = MAP_TABS.has(activeTab)
|
||||
|
||||
const handleToggleMeasure = (mode: 'distance' | 'area') => {
|
||||
|
||||
@ -12,10 +12,6 @@ import { fetchIncidents } from '../services/incidentsApi'
|
||||
import type { IncidentCompat } from '../services/incidentsApi'
|
||||
import { DischargeZonePanel } from './DischargeZonePanel'
|
||||
import { estimateDistanceFromCoast, getDischargeZoneLines } from '../utils/dischargeZoneData'
|
||||
import { useMapStore } from '@common/store/mapStore'
|
||||
import { useMeasureTool } from '@common/hooks/useMeasureTool'
|
||||
import { buildMeasureLayers } from '@common/components/map/measureLayers'
|
||||
import { MeasureOverlay } from '@common/components/map/MeasureOverlay'
|
||||
|
||||
// ── CartoDB Positron 베이스맵 (밝은 테마) ────────────────
|
||||
const BASE_STYLE: StyleSpecification = {
|
||||
@ -101,11 +97,6 @@ export function IncidentsView() {
|
||||
const [dischargeMode, setDischargeMode] = useState(false)
|
||||
const [dischargeInfo, setDischargeInfo] = useState<{ lat: number; lon: number; distanceNm: number } | null>(null)
|
||||
|
||||
// Measure tool
|
||||
const { handleMeasureClick, measureMode } = useMeasureTool()
|
||||
const measureInProgress = useMapStore((s) => s.measureInProgress)
|
||||
const measurements = useMapStore((s) => s.measurements)
|
||||
|
||||
// Analysis view mode
|
||||
const [viewMode, setViewMode] = useState<ViewMode>('overlay')
|
||||
const [analysisActive, setAnalysisActive] = useState(false)
|
||||
@ -259,15 +250,10 @@ export function IncidentsView() {
|
||||
)
|
||||
}, [dischargeMode])
|
||||
|
||||
const measureDeckLayers = useMemo(
|
||||
() => buildMeasureLayers(measureInProgress, measureMode, measurements),
|
||||
[measureInProgress, measureMode, measurements],
|
||||
)
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const deckLayers: any[] = useMemo(
|
||||
() => [incidentLayer, vesselIconLayer, ...dischargeZoneLayers, ...measureDeckLayers],
|
||||
[incidentLayer, vesselIconLayer, dischargeZoneLayers, measureDeckLayers],
|
||||
() => [incidentLayer, vesselIconLayer, ...dischargeZoneLayers],
|
||||
[incidentLayer, vesselIconLayer, dischargeZoneLayers],
|
||||
)
|
||||
|
||||
return (
|
||||
@ -364,10 +350,6 @@ export function IncidentsView() {
|
||||
style={{ width: '100%', height: '100%', background: '#f0f0f0' }}
|
||||
attributionControl={false}
|
||||
onClick={(e) => {
|
||||
if (measureMode !== null && e.lngLat) {
|
||||
handleMeasureClick(e.lngLat.lng, e.lngLat.lat)
|
||||
return
|
||||
}
|
||||
if (dischargeMode && e.lngLat) {
|
||||
const lat = e.lngLat.lat
|
||||
const lon = e.lngLat.lng
|
||||
@ -375,10 +357,9 @@ export function IncidentsView() {
|
||||
setDischargeInfo({ lat, lon, distanceNm })
|
||||
}
|
||||
}}
|
||||
cursor={(measureMode !== null || dischargeMode) ? 'crosshair' : undefined}
|
||||
cursor={dischargeMode ? 'crosshair' : undefined}
|
||||
>
|
||||
<DeckGLOverlay layers={deckLayers} />
|
||||
<MeasureOverlay />
|
||||
|
||||
{/* 사고 팝업 */}
|
||||
{incidentPopup && (
|
||||
|
||||
@ -73,7 +73,7 @@ export default function ScatRightPanel({ detail, loading, onOpenReport, onNewSur
|
||||
) : detail ? (
|
||||
<>
|
||||
{activeTab === 0 && <DetailTab detail={detail} />}
|
||||
{activeTab === 1 && <PhotoTab detail={detail} />}
|
||||
{activeTab === 1 && <PhotoTab />}
|
||||
{activeTab === 2 && <CleanupTab detail={detail} />}
|
||||
</>
|
||||
) : null}
|
||||
@ -137,33 +137,16 @@ function DetailTab({ detail }: { detail: ScatDetail }) {
|
||||
}
|
||||
|
||||
/* ═══ 탭 1: 현장 사진 ═══ */
|
||||
function PhotoTab({ detail }: { detail: ScatDetail }) {
|
||||
const [imgError, setImgError] = useState(false);
|
||||
const imgSrc = `/scat-img/${detail.code}-1.png`;
|
||||
|
||||
if (imgError) {
|
||||
return (
|
||||
<div className="flex flex-col items-center justify-center py-10 gap-3">
|
||||
<div className="text-3xl">📷</div>
|
||||
<div className="text-[11px] text-text-3 text-center leading-relaxed">
|
||||
해당 구간의 사진이<br />등록되지 않았습니다.
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
function PhotoTab() {
|
||||
return (
|
||||
<div className="flex flex-col gap-2">
|
||||
<div className="rounded-md overflow-hidden border border-border">
|
||||
<img
|
||||
src={imgSrc}
|
||||
alt={`${detail.name} 해안 사진`}
|
||||
className="w-full h-auto object-cover"
|
||||
onError={() => setImgError(true)}
|
||||
/>
|
||||
<div className="flex flex-col items-center justify-center py-10 gap-3">
|
||||
<div className="text-3xl">📷</div>
|
||||
<div className="text-[11px] text-text-3 text-center leading-relaxed">
|
||||
현장 사진 기능은<br />추후 업데이트 예정입니다.
|
||||
</div>
|
||||
<div className="text-[10px] text-text-3 text-center">
|
||||
{detail.code} 해안 조사 사진
|
||||
<div className="px-3 py-1.5 rounded text-[10px] text-text-3"
|
||||
style={{ background: 'rgba(6,182,212,.06)', border: '1px solid rgba(6,182,212,.15)' }}>
|
||||
사진 업로드 API 연동 예정
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
|
||||
29
prediction/scat/config.py
Normal file
29
prediction/scat/config.py
Normal file
@ -0,0 +1,29 @@
|
||||
"""SCAT PDF 파싱 설정."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
BASE_DIR = Path(__file__).parent
|
||||
|
||||
# 이미지 저장 경로
|
||||
IMAGE_OUTPUT_DIR = Path(os.getenv('SCAT_IMAGE_DIR', str(BASE_DIR / 'scat_images')))
|
||||
|
||||
# 파싱 결과 저장 경로
|
||||
OUTPUT_DIR = Path(os.getenv('SCAT_OUTPUT_DIR', str(BASE_DIR / 'output')))
|
||||
|
||||
# DB 설정 (wingDb.ts 기본값과 동일, 추후 사용)
|
||||
DB_HOST = os.getenv('DB_HOST', 'localhost')
|
||||
DB_PORT = int(os.getenv('DB_PORT', '5432'))
|
||||
DB_NAME = os.getenv('DB_NAME', 'wing')
|
||||
DB_USER = os.getenv('DB_USER', 'postgres')
|
||||
DB_PASSWORD = os.getenv('DB_PASSWORD', 'dano2030')
|
||||
DB_SCHEMA = 'wing'
|
||||
|
||||
# 해안 사진 저장 경로 (frontend public)
|
||||
SCAT_PHOTOS_DIR = Path(os.getenv(
|
||||
'SCAT_PHOTOS_DIR',
|
||||
str(BASE_DIR.parent.parent / 'frontend' / 'public' / 'scat-photos'),
|
||||
))
|
||||
|
||||
# Kakao Local API (Geocoding)
|
||||
KAKAO_REST_KEY = os.getenv('KAKAO_REST_KEY', '5e5dc9a10eb86b88d5106e508ec4d236')
|
||||
180
prediction/scat/db.py
Normal file
180
prediction/scat/db.py
Normal file
@ -0,0 +1,180 @@
|
||||
"""PostgreSQL 연결 및 SCAT 데이터 upsert."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
import psycopg2
|
||||
from psycopg2 import pool
|
||||
|
||||
import config
|
||||
from models import CoastalSection
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 커넥션 풀 (싱글턴)
|
||||
# ---------------------------------------------------------------------------
|
||||
_pool: pool.ThreadedConnectionPool | None = None
|
||||
|
||||
|
||||
def get_pool() -> pool.ThreadedConnectionPool:
|
||||
global _pool
|
||||
if _pool is None:
|
||||
_pool = pool.ThreadedConnectionPool(
|
||||
minconn=1,
|
||||
maxconn=5,
|
||||
host=config.DB_HOST,
|
||||
port=config.DB_PORT,
|
||||
dbname=config.DB_NAME,
|
||||
user=config.DB_USER,
|
||||
password=config.DB_PASSWORD,
|
||||
options=f'-c search_path={config.DB_SCHEMA},public',
|
||||
)
|
||||
return _pool
|
||||
|
||||
|
||||
def get_conn():
|
||||
return get_pool().getconn()
|
||||
|
||||
|
||||
def put_conn(conn):
|
||||
get_pool().putconn(conn)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Zone 관리
|
||||
# ---------------------------------------------------------------------------
|
||||
def ensure_zone(zone_cd: str, zone_nm: str, jrsd_nm: str) -> int:
|
||||
"""구역이 없으면 생성, 있으면 SN 반환."""
|
||||
conn = get_conn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
'SELECT cst_srvy_zone_sn FROM cst_srvy_zone WHERE zone_cd = %s',
|
||||
(zone_cd,),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if row:
|
||||
return row[0]
|
||||
|
||||
cur.execute(
|
||||
'''INSERT INTO cst_srvy_zone (zone_cd, zone_nm, jrsd_nm, sect_cnt)
|
||||
VALUES (%s, %s, %s, 0)
|
||||
RETURNING cst_srvy_zone_sn''',
|
||||
(zone_cd, zone_nm, jrsd_nm),
|
||||
)
|
||||
sn = cur.fetchone()[0]
|
||||
conn.commit()
|
||||
return sn
|
||||
finally:
|
||||
put_conn(conn)
|
||||
|
||||
|
||||
def update_zone_sect_count(zone_sn: int):
|
||||
"""구역의 구간 수를 갱신."""
|
||||
conn = get_conn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
'''UPDATE cst_srvy_zone
|
||||
SET sect_cnt = (SELECT count(*) FROM cst_sect WHERE cst_srvy_zone_sn = %s)
|
||||
WHERE cst_srvy_zone_sn = %s''',
|
||||
(zone_sn, zone_sn),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
put_conn(conn)
|
||||
|
||||
|
||||
def update_zone_center(zone_sn: int):
|
||||
"""zone의 sections 좌표 평균으로 LAT_CENTER/LNG_CENTER 갱신."""
|
||||
conn = get_conn()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
'''UPDATE cst_srvy_zone SET
|
||||
lat_center = sub.avg_lat,
|
||||
lng_center = sub.avg_lng
|
||||
FROM (
|
||||
SELECT AVG(lat) as avg_lat, AVG(lng) as avg_lng
|
||||
FROM cst_sect
|
||||
WHERE cst_srvy_zone_sn = %s AND lat IS NOT NULL
|
||||
) sub
|
||||
WHERE cst_srvy_zone_sn = %s''',
|
||||
(zone_sn, zone_sn),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
put_conn(conn)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Section upsert
|
||||
# ---------------------------------------------------------------------------
|
||||
def upsert_section(zone_sn: int, section: CoastalSection) -> int:
|
||||
"""구간 INSERT 또는 UPDATE (SECT_CD 기준 ON CONFLICT)."""
|
||||
conn = get_conn()
|
||||
try:
|
||||
sensitive = json.dumps(
|
||||
[item.model_dump() for item in section.sensitive_info],
|
||||
ensure_ascii=False,
|
||||
)
|
||||
cleanup = json.dumps(section.cleanup_methods, ensure_ascii=False)
|
||||
end_crit = json.dumps(section.end_criteria, ensure_ascii=False)
|
||||
notes = json.dumps(section.notes, ensure_ascii=False)
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
'''INSERT INTO cst_sect (
|
||||
cst_srvy_zone_sn, sect_cd, sect_nm,
|
||||
cst_tp_cd, esi_cd, esi_num, shore_tp, len_m,
|
||||
lat, lng,
|
||||
access_dc, access_pt,
|
||||
sensitive_info, cleanup_methods, end_criteria, notes,
|
||||
srvy_stts_cd
|
||||
) VALUES (
|
||||
%s, %s, %s,
|
||||
%s, %s, %s, %s, %s,
|
||||
%s, %s,
|
||||
%s, %s,
|
||||
%s::jsonb, %s::jsonb, %s::jsonb, %s::jsonb,
|
||||
'미조사'
|
||||
)
|
||||
ON CONFLICT (sect_cd) DO UPDATE SET
|
||||
sect_nm = EXCLUDED.sect_nm,
|
||||
cst_tp_cd = EXCLUDED.cst_tp_cd,
|
||||
esi_cd = EXCLUDED.esi_cd,
|
||||
esi_num = EXCLUDED.esi_num,
|
||||
shore_tp = EXCLUDED.shore_tp,
|
||||
len_m = EXCLUDED.len_m,
|
||||
lat = EXCLUDED.lat,
|
||||
lng = EXCLUDED.lng,
|
||||
access_dc = EXCLUDED.access_dc,
|
||||
access_pt = EXCLUDED.access_pt,
|
||||
sensitive_info = EXCLUDED.sensitive_info,
|
||||
cleanup_methods = EXCLUDED.cleanup_methods,
|
||||
end_criteria = EXCLUDED.end_criteria,
|
||||
notes = EXCLUDED.notes
|
||||
RETURNING cst_sect_sn''',
|
||||
(
|
||||
zone_sn, section.sect_cd, section.sect_nm,
|
||||
section.cst_tp_cd, section.esi_cd, section.esi_num,
|
||||
section.shore_tp, section.len_m,
|
||||
section.lat, section.lng,
|
||||
section.access_dc, section.access_pt,
|
||||
sensitive, cleanup, end_crit, notes,
|
||||
),
|
||||
)
|
||||
sn = cur.fetchone()[0]
|
||||
conn.commit()
|
||||
return sn
|
||||
finally:
|
||||
put_conn(conn)
|
||||
|
||||
|
||||
def close_pool():
|
||||
global _pool
|
||||
if _pool:
|
||||
_pool.closeall()
|
||||
_pool = None
|
||||
65
prediction/scat/esi_mapper.py
Normal file
65
prediction/scat/esi_mapper.py
Normal file
@ -0,0 +1,65 @@
|
||||
"""cst_tp_cd(해안 구성) → ESI 등급 매핑.
|
||||
|
||||
ESI(Environmental Sensitivity Index) 등급은 해안의 물리적 특성에 따라 분류된다.
|
||||
매핑 기준: NOAA(2010) + 성 등(2003) ESI 등급 표.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import Optional, Tuple
|
||||
|
||||
# 키워드 → (esi_cd, esi_num) 매핑 (구체적 키워드 우선)
|
||||
_ESI_RULES: list[tuple[list[str], str, int]] = [
|
||||
# 8B: 습지
|
||||
(['염습지', '습지'], '8B', 8),
|
||||
# 8A: 갯벌/점토질
|
||||
(['갯벌', '점토질', '점토'], '8A', 8),
|
||||
# 7: 반폐쇄형
|
||||
(['반폐쇄', '반 폐쇄'], '7', 7),
|
||||
# 6B: 투과성 인공호안/사석
|
||||
(['투과성 인공호안', '투과성 사석', '투과성 인공해안', '투과성 경사식'], '6B', 6),
|
||||
# 6A: 자갈/바위
|
||||
(['자갈', '왕자갈', '바위', '갈('], '6A', 6),
|
||||
# 5: 모래+자갈 혼합
|
||||
(['모래자갈', '모래+자갈', '혼합'], '5', 5),
|
||||
# 4: 굵은 모래
|
||||
(['굵은 모래', '조립질 모래'], '4', 4),
|
||||
# 3: 세립질 모래/모래(기본)
|
||||
(['세립질 모래', '세립질', '모래'], '3', 3),
|
||||
# 2: 수평암반/비투과성
|
||||
(['수평암반', '수평호안', '기반암', '비투과성 기질', '비투과성 인공호안',
|
||||
'비투과성 인공해안', '노출기반암', '수암반', '콘크리트'], '2', 2),
|
||||
# 1: 수직암반/인공구조물/계류안벽
|
||||
(['수직암반', '인공구조물', '직립호안', '절벽', '수직호안', '수진호안',
|
||||
'수직 계류', '인공호안', '계류 안벽', '안벽'], '1', 1),
|
||||
]
|
||||
|
||||
# ESI 코드 → 숫자 변환
|
||||
_ESI_NUM_RE = re.compile(r'^(\d+)')
|
||||
|
||||
|
||||
def map_esi(cst_tp_cd: Optional[str]) -> Tuple[Optional[str], Optional[int]]:
|
||||
"""cst_tp_cd(해안 구성 키워드)에서 ESI 등급을 매핑한다.
|
||||
|
||||
Returns:
|
||||
(esi_cd, esi_num) 튜플. 매핑 실패 시 (None, None).
|
||||
"""
|
||||
if not cst_tp_cd:
|
||||
return None, None
|
||||
|
||||
text = cst_tp_cd.strip()
|
||||
for keywords, esi_cd, esi_num in _ESI_RULES:
|
||||
for kw in keywords:
|
||||
if kw in text:
|
||||
return esi_cd, esi_num
|
||||
|
||||
return None, None
|
||||
|
||||
|
||||
def parse_esi_cd(esi_str: str) -> Tuple[str, int]:
|
||||
"""ESI 등급 문자열(e.g. '8A', '6B', '2')에서 (esi_cd, esi_num) 추출."""
|
||||
esi_cd = esi_str.strip()
|
||||
m = _ESI_NUM_RE.match(esi_cd)
|
||||
esi_num = int(m.group(1)) if m else 0
|
||||
return esi_cd, esi_num
|
||||
266
prediction/scat/geocoder.py
Normal file
266
prediction/scat/geocoder.py
Normal file
@ -0,0 +1,266 @@
|
||||
"""Kakao Local API 기반 Geocoding + 오프셋 분산.
|
||||
|
||||
환경변수 KAKAO_REST_KEY 필요.
|
||||
- access_pt 있는 구간: 주소/키워드 검색으로 정확한 좌표
|
||||
- access_pt 없는 구간: sect_nm 기반 대표 좌표 + 나선형 오프셋
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import math
|
||||
import re
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import requests
|
||||
|
||||
import config
|
||||
|
||||
_HEADERS = {
|
||||
'Authorization': f'KakaoAK {config.KAKAO_REST_KEY}',
|
||||
}
|
||||
|
||||
# Kakao API endpoints
|
||||
_ADDRESS_URL = 'https://dapi.kakao.com/v2/local/search/address.json'
|
||||
_KEYWORD_URL = 'https://dapi.kakao.com/v2/local/search/keyword.json'
|
||||
|
||||
# 캐시: query → (lat, lng) or None
|
||||
_cache: dict[str, Optional[Tuple[float, float]]] = {}
|
||||
|
||||
# API 호출 간격 (초)
|
||||
_RATE_LIMIT = 0.05
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Kakao API 호출
|
||||
# ---------------------------------------------------------------------------
|
||||
def _search_address(query: str) -> Optional[Tuple[float, float]]:
|
||||
"""Kakao 주소 검색 API."""
|
||||
try:
|
||||
resp = requests.get(
|
||||
_ADDRESS_URL,
|
||||
params={'query': query},
|
||||
headers=_HEADERS,
|
||||
timeout=5,
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
return None
|
||||
data = resp.json()
|
||||
docs = data.get('documents', [])
|
||||
if docs:
|
||||
return float(docs[0]['y']), float(docs[0]['x'])
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def _search_keyword(query: str) -> Optional[Tuple[float, float]]:
|
||||
"""Kakao 키워드 검색 API."""
|
||||
try:
|
||||
resp = requests.get(
|
||||
_KEYWORD_URL,
|
||||
params={'query': query},
|
||||
headers=_HEADERS,
|
||||
timeout=5,
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
return None
|
||||
data = resp.json()
|
||||
docs = data.get('documents', [])
|
||||
if docs:
|
||||
return float(docs[0]['y']), float(docs[0]['x'])
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 메인 Geocoding 함수
|
||||
# ---------------------------------------------------------------------------
|
||||
def geocode(query: str) -> Optional[Tuple[float, float]]:
|
||||
"""주소/키워드 → (lat, lng). 캐시 적용. 실패 시 None."""
|
||||
if not config.KAKAO_REST_KEY:
|
||||
return None
|
||||
|
||||
if query in _cache:
|
||||
return _cache[query]
|
||||
|
||||
time.sleep(_RATE_LIMIT)
|
||||
|
||||
# 1차: 주소 검색 (지번/도로명)
|
||||
result = _search_address(query)
|
||||
|
||||
# 2차: 키워드 검색 (장소명, 방조제, 해수욕장 등)
|
||||
if result is None:
|
||||
time.sleep(_RATE_LIMIT)
|
||||
result = _search_keyword(query)
|
||||
|
||||
_cache[query] = result
|
||||
return result
|
||||
|
||||
|
||||
def geocode_section(
|
||||
access_pt: Optional[str],
|
||||
sect_nm: str,
|
||||
zone_name: str,
|
||||
) -> Optional[Tuple[float, float]]:
|
||||
"""구간에 대한 최적 좌표 검색.
|
||||
|
||||
우선순위:
|
||||
1. access_pt (주소/지명)
|
||||
2. sect_nm에서 '인근 해안' 제거한 지역명
|
||||
3. zone_name (PDF 구역명)
|
||||
"""
|
||||
# 1. access_pt로 시도
|
||||
if access_pt:
|
||||
result = geocode(access_pt)
|
||||
if result:
|
||||
return result
|
||||
# access_pt + zone_name 조합 시도
|
||||
combined = f'{zone_name} {access_pt}' if zone_name else access_pt
|
||||
result = geocode(combined)
|
||||
if result:
|
||||
return result
|
||||
|
||||
# 2. sect_nm에서 지역명 추출
|
||||
area = _extract_area(sect_nm)
|
||||
if area:
|
||||
result = geocode(area)
|
||||
if result:
|
||||
return result
|
||||
|
||||
# 3. zone_name fallback
|
||||
if zone_name:
|
||||
result = geocode(zone_name)
|
||||
if result:
|
||||
return result
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _extract_area(sect_nm: str) -> str:
|
||||
"""sect_nm에서 geocoding용 지역명 추출.
|
||||
|
||||
예: '하동군 금남면 노량리 인근 해안' → '하동군 금남면 노량리'
|
||||
"""
|
||||
if not sect_nm:
|
||||
return ''
|
||||
# '인근 해안' 등 제거
|
||||
area = re.sub(r'\s*인근\s*해안\s*$', '', sect_nm).strip()
|
||||
# '해안' 단독 제거
|
||||
area = re.sub(r'\s*해안\s*$', '', area).strip()
|
||||
return area
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 오프셋 분산 (같은 좌표에 겹치는 구간들 분산)
|
||||
# ---------------------------------------------------------------------------
|
||||
def apply_spiral_offset(
|
||||
base_lat: float,
|
||||
base_lng: float,
|
||||
index: int,
|
||||
spacing: float = 0.0005,
|
||||
) -> Tuple[float, float]:
|
||||
"""나선형 오프셋 적용. index=0이면 원점, 1부터 나선.
|
||||
|
||||
spacing ~= 50m (위도 기준)
|
||||
"""
|
||||
if index == 0:
|
||||
return base_lat, base_lng
|
||||
|
||||
# 나선형: 각도와 반경 증가
|
||||
angle = index * 137.508 * math.pi / 180 # 황금각
|
||||
radius = spacing * math.sqrt(index)
|
||||
|
||||
lat = base_lat + radius * math.cos(angle)
|
||||
lng = base_lng + radius * math.sin(angle)
|
||||
|
||||
return round(lat, 6), round(lng, 6)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 배치 Geocoding
|
||||
# ---------------------------------------------------------------------------
|
||||
def geocode_sections(
|
||||
sections: list[dict],
|
||||
zone_name: str = '',
|
||||
) -> Tuple[int, int]:
|
||||
"""섹션 리스트에 lat/lng를 채운다.
|
||||
|
||||
Returns:
|
||||
(성공 수, 실패 수)
|
||||
"""
|
||||
# sect_nm 그룹별로 처리 (오프셋 적용)
|
||||
from collections import defaultdict
|
||||
|
||||
groups: dict[str, list[dict]] = defaultdict(list)
|
||||
for s in sections:
|
||||
groups[s.get('sect_nm', '')].append(s)
|
||||
|
||||
success = 0
|
||||
fail = 0
|
||||
|
||||
for sect_nm, group in groups.items():
|
||||
# 그룹 대표 좌표 구하기 (첫 번째 access_pt 있는 구간 또는 sect_nm)
|
||||
base_coord = None
|
||||
|
||||
# access_pt가 있는 구간에서 먼저 시도
|
||||
for s in group:
|
||||
if s.get('access_pt'):
|
||||
coord = geocode_section(s['access_pt'], sect_nm, zone_name)
|
||||
if coord:
|
||||
base_coord = coord
|
||||
break
|
||||
|
||||
# access_pt로 못 찾으면 sect_nm으로
|
||||
if base_coord is None:
|
||||
base_coord = geocode_section(None, sect_nm, zone_name)
|
||||
|
||||
if base_coord is None:
|
||||
fail += len(group)
|
||||
continue
|
||||
|
||||
# 그룹 내 구간별 좌표 할당
|
||||
# access_pt가 있는 구간은 개별 geocoding, 없으면 대표+오프셋
|
||||
offset_idx = 0
|
||||
for s in sorted(group, key=lambda x: x.get('section_number', 0)):
|
||||
if s.get('access_pt'):
|
||||
coord = geocode_section(s['access_pt'], sect_nm, zone_name)
|
||||
if coord:
|
||||
s['lat'], s['lng'] = coord
|
||||
success += 1
|
||||
continue
|
||||
|
||||
# 오프셋 적용
|
||||
lat, lng = apply_spiral_offset(base_coord[0], base_coord[1], offset_idx)
|
||||
s['lat'] = lat
|
||||
s['lng'] = lng
|
||||
offset_idx += 1
|
||||
success += 1
|
||||
|
||||
return success, fail
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 캐시 저장/로드
|
||||
# ---------------------------------------------------------------------------
|
||||
_CACHE_FILE = Path(__file__).parent / 'output' / '.geocode_cache.json'
|
||||
|
||||
|
||||
def save_cache():
|
||||
"""캐시를 파일로 저장."""
|
||||
_CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
serializable = {k: list(v) if v else None for k, v in _cache.items()}
|
||||
with open(_CACHE_FILE, 'w', encoding='utf-8') as f:
|
||||
json.dump(serializable, f, ensure_ascii=False, indent=2)
|
||||
|
||||
|
||||
def load_cache():
|
||||
"""캐시 파일 로드."""
|
||||
global _cache
|
||||
if _CACHE_FILE.exists():
|
||||
with open(_CACHE_FILE, encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
_cache = {k: tuple(v) if v else None for k, v in data.items()}
|
||||
124
prediction/scat/image_extractor.py
Normal file
124
prediction/scat/image_extractor.py
Normal file
@ -0,0 +1,124 @@
|
||||
"""PDF에서 해안조사 사진을 추출하여 scat-photos 폴더에 저장.
|
||||
|
||||
Type A (해안사전평가정보집): 1페이지 = 1구간 → 가장 큰 RGB 사진
|
||||
Type B (방제정보집): 1페이지 = 2구간 → RGB 사진 2장, 순서대로 매칭
|
||||
|
||||
저장 네이밍: {sect_cd}-1.png (ScatPopup에서 참조하는 형식)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple
|
||||
|
||||
import fitz # PyMuPDF
|
||||
|
||||
from pdf_parser import is_data_page, _CODE_RE
|
||||
from pdf_parser_b import is_data_page_b
|
||||
|
||||
import config
|
||||
|
||||
|
||||
def _get_page_photos(
|
||||
doc: fitz.Document, page: fitz.Page,
|
||||
) -> List[fitz.Pixmap]:
|
||||
"""페이지에서 RGB 사진만 추출 (배경 제외). 크기 내림차순."""
|
||||
photos: List[Tuple[fitz.Pixmap, int]] = []
|
||||
for img_info in page.get_images(full=True):
|
||||
xref = img_info[0]
|
||||
pix = fitz.Pixmap(doc, xref)
|
||||
# CMYK → RGB
|
||||
if pix.n > 4:
|
||||
pix = fitz.Pixmap(fitz.csRGB, pix)
|
||||
elif pix.n == 4:
|
||||
pix = fitz.Pixmap(fitz.csRGB, pix)
|
||||
# 흑백(n=1) 또는 배경(2000px 이상) 스킵
|
||||
if pix.n < 3 or pix.width >= 2000:
|
||||
continue
|
||||
photos.append((pix, pix.width * pix.height))
|
||||
photos.sort(key=lambda x: x[1], reverse=True)
|
||||
return photos
|
||||
|
||||
|
||||
def _extract_codes_type_a(page: fitz.Page) -> List[str]:
|
||||
"""Type A 페이지에서 sect_cd 추출."""
|
||||
text = page.get_text('text')
|
||||
return _CODE_RE.findall(text)
|
||||
|
||||
|
||||
def _extract_codes_type_b(page: fitz.Page) -> List[str]:
|
||||
"""Type B 페이지에서 sect_cd 추출 (순서 유지)."""
|
||||
text = page.get_text('text')
|
||||
codes = re.findall(r'([A-Z]{4,}-\d+(?:-[A-Z]){3,})', text)
|
||||
seen = set()
|
||||
unique = []
|
||||
for c in codes:
|
||||
if c not in seen:
|
||||
seen.add(c)
|
||||
unique.append(c)
|
||||
return unique
|
||||
|
||||
|
||||
def _save_pixmap(pix: fitz.Pixmap, output_path: Path):
|
||||
"""Pixmap을 PNG로 저장."""
|
||||
if pix.alpha:
|
||||
pix = fitz.Pixmap(fitz.csRGB, pix)
|
||||
pix.save(str(output_path))
|
||||
|
||||
|
||||
def extract_images_from_pdf(
|
||||
pdf_path: str | Path,
|
||||
output_dir: str | Path | None = None,
|
||||
pdf_type: str = 'A',
|
||||
) -> int:
|
||||
"""PDF에서 데이터 페이지별 대표 사진을 추출.
|
||||
|
||||
Args:
|
||||
pdf_path: PDF 파일 경로
|
||||
output_dir: 이미지 저장 경로 (기본: config.SCAT_PHOTOS_DIR)
|
||||
pdf_type: 'A' 또는 'B'
|
||||
|
||||
Returns:
|
||||
추출된 이미지 수
|
||||
"""
|
||||
pdf_path = Path(pdf_path)
|
||||
out_dir = Path(output_dir) if output_dir else config.SCAT_PHOTOS_DIR
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
doc = fitz.open(str(pdf_path))
|
||||
saved = 0
|
||||
|
||||
for i in range(doc.page_count):
|
||||
page = doc[i]
|
||||
|
||||
if pdf_type == 'A':
|
||||
if not is_data_page(page):
|
||||
continue
|
||||
codes = _extract_codes_type_a(page)
|
||||
photos = _get_page_photos(doc, page)
|
||||
if not codes or not photos:
|
||||
continue
|
||||
# 가장 작은 RGB 이미지 = 실제 현장 사진
|
||||
pix = photos[-1][0]
|
||||
out_path = out_dir / f'{codes[0]}-1.png'
|
||||
_save_pixmap(pix, out_path)
|
||||
saved += 1
|
||||
|
||||
elif pdf_type == 'B':
|
||||
if not is_data_page_b(page):
|
||||
continue
|
||||
codes = _extract_codes_type_b(page)
|
||||
photos = _get_page_photos(doc, page)
|
||||
if not codes or not photos:
|
||||
continue
|
||||
# Type B: 사진 크기 동일, 순서대로 매칭
|
||||
for idx, code in enumerate(codes):
|
||||
if idx < len(photos):
|
||||
pix = photos[idx][0]
|
||||
out_path = out_dir / f'{code}-1.png'
|
||||
_save_pixmap(pix, out_path)
|
||||
saved += 1
|
||||
|
||||
doc.close()
|
||||
return saved
|
||||
52
prediction/scat/models.py
Normal file
52
prediction/scat/models.py
Normal file
@ -0,0 +1,52 @@
|
||||
"""SCAT PDF 파싱 데이터 모델."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List, Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class SensitiveItem(BaseModel):
|
||||
"""민감자원 항목."""
|
||||
t: str # 유형 (사회경제적 / 생물자원)
|
||||
v: str # 내용
|
||||
|
||||
|
||||
class PhotoInfo(BaseModel):
|
||||
"""추출된 사진 정보."""
|
||||
filename: str
|
||||
page: int
|
||||
index: int
|
||||
|
||||
|
||||
class CoastalSection(BaseModel):
|
||||
"""해안 구간 1건 (PDF 1페이지)."""
|
||||
section_number: int = 0
|
||||
sect_nm: str = '' # 지역명
|
||||
sect_cd: str = '' # 코드명 (SSDD-1)
|
||||
esi_cd: Optional[str] = None # ESI 등급 (1, 2, 3, 6A, 6B, 8A, 8B 등)
|
||||
esi_num: Optional[int] = None # ESI 숫자 (1~8)
|
||||
shore_tp: Optional[str] = None # 해안 형태 (폐쇄형/개방형)
|
||||
cst_tp_cd: Optional[str] = None # 해안 구성 (투과성 인공호안, 모래 등)
|
||||
len_m: Optional[float] = None # 해안길이 (m)
|
||||
width_m: Optional[float] = None # 해안 폭 (m, 선택)
|
||||
lat: Optional[float] = None # 위도
|
||||
lng: Optional[float] = None # 경도
|
||||
access_dc: Optional[str] = None # 접근방법 설명
|
||||
access_pt: Optional[str] = None # 주요접근지점
|
||||
sensitive_info: List[SensitiveItem] = [] # 민감자원
|
||||
cleanup_methods: List[str] = [] # 권장 방제 방법
|
||||
end_criteria: List[str] = [] # 권장 방제 중지 기준
|
||||
notes: List[str] = [] # 해안 방제시 고려사항
|
||||
photos: List[PhotoInfo] = [] # 추출된 사진
|
||||
|
||||
|
||||
class ParseResult(BaseModel):
|
||||
"""PDF 파싱 전체 결과."""
|
||||
pdf_filename: str
|
||||
zone_name: str = '' # PDF 헤더에서 추출한 구역명
|
||||
jurisdiction: str = '' # 관할 (보령 해양경비안전서 등)
|
||||
total_sections: int = 0
|
||||
sections: List[CoastalSection] = []
|
||||
skipped_pages: int = 0
|
||||
8758
prediction/scat/output/.geocode_cache.json
Normal file
8758
prediction/scat/output/.geocode_cache.json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
2936
prediction/scat/output/01_(완료)여수해경서_방제정보집_(하동군-광양시).json
Normal file
2936
prediction/scat/output/01_(완료)여수해경서_방제정보집_(하동군-광양시).json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
9300
prediction/scat/output/02_여수해경서_방제정보집_(여수시 율촌산업단지-화양면 안포리).json
Normal file
9300
prediction/scat/output/02_여수해경서_방제정보집_(여수시 율촌산업단지-화양면 안포리).json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
8932
prediction/scat/output/03_여수해경서_방제정보집_(여수시 돌산읍).json
Normal file
8932
prediction/scat/output/03_여수해경서_방제정보집_(여수시 돌산읍).json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
5561
prediction/scat/output/04_여수해경서_방제정보집_(여수시 백야도- 율촌면 상봉리).json
Normal file
5561
prediction/scat/output/04_여수해경서_방제정보집_(여수시 백야도- 율촌면 상봉리).json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
3245
prediction/scat/output/05_여수해경서_방제정보집_(순천시-보성군).json
Normal file
3245
prediction/scat/output/05_여수해경서_방제정보집_(순천시-보성군).json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
6665
prediction/scat/output/06_여수해경서_방제정보집_(고흥군 동강면 죽암리- 포두면 남성리).json
Normal file
6665
prediction/scat/output/06_여수해경서_방제정보집_(고흥군 동강면 죽암리- 포두면 남성리).json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
5031
prediction/scat/output/07_여수해경서_방제정보집_(고흥군 나로도).json
Normal file
5031
prediction/scat/output/07_여수해경서_방제정보집_(고흥군 나로도).json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
7818
prediction/scat/output/07_충남 서천군해안사전평가정보집.json
Normal file
7818
prediction/scat/output/07_충남 서천군해안사전평가정보집.json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
12685
prediction/scat/output/08_여수해경서_방제정보집_(고흥군 도화면 덕중리-대서면 안남리)(1).json
Normal file
12685
prediction/scat/output/08_여수해경서_방제정보집_(고흥군 도화면 덕중리-대서면 안남리)(1).json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
6965
prediction/scat/output/08_전북 군산시해안사전평가정보집.json
Normal file
6965
prediction/scat/output/08_전북 군산시해안사전평가정보집.json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
5626
prediction/scat/output/09_여수해경서 방제정보집_(남해군).json
Normal file
5626
prediction/scat/output/09_여수해경서 방제정보집_(남해군).json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
6947
prediction/scat/output/09_전북 부안군해안사전평가정보집.json
Normal file
6947
prediction/scat/output/09_전북 부안군해안사전평가정보집.json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
4466
prediction/scat/output/10_전북 고창군해안사전평가정보집.json
Normal file
4466
prediction/scat/output/10_전북 고창군해안사전평가정보집.json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
15942
prediction/scat/output/F03_목포해경(신안군 흑산).json
Normal file
15942
prediction/scat/output/F03_목포해경(신안군 흑산).json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
25563
prediction/scat/output/F03_무안군_01.json
Normal file
25563
prediction/scat/output/F03_무안군_01.json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
17854
prediction/scat/output/F04_신안군_01.json
Normal file
17854
prediction/scat/output/F04_신안군_01.json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
6434
prediction/scat/output/F05_목포시영암군_01.json
Normal file
6434
prediction/scat/output/F05_목포시영암군_01.json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
17894
prediction/scat/output/F07_해남군_01.json
Normal file
17894
prediction/scat/output/F07_해남군_01.json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
15360
prediction/scat/output/F07_해남군_02.json
Normal file
15360
prediction/scat/output/F07_해남군_02.json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
26057
prediction/scat/output/F08_완도군_01.json
Normal file
26057
prediction/scat/output/F08_완도군_01.json
Normal file
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
Load Diff
390
prediction/scat/pdf_parser.py
Normal file
390
prediction/scat/pdf_parser.py
Normal file
@ -0,0 +1,390 @@
|
||||
"""PDF 텍스트 파싱 — 상태머신 방식으로 해안사전평가 정보를 추출한다."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from enum import Enum, auto
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Tuple, Union
|
||||
|
||||
import fitz # PyMuPDF
|
||||
|
||||
from models import CoastalSection, SensitiveItem, ParseResult
|
||||
from esi_mapper import map_esi
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 상태머신 상태
|
||||
# ---------------------------------------------------------------------------
|
||||
class State(Enum):
|
||||
HEADER = auto()
|
||||
GENERAL = auto() # 해안 일반특성
|
||||
ACCESS = auto() # 접근방법
|
||||
SENSITIVE = auto() # 민감자원 정보
|
||||
CLEANUP = auto() # 권장 방제 방법
|
||||
END_CRITERIA = auto() # 권장 방제 중지 기준
|
||||
CONSIDER = auto() # 해안 방제시 고려사항
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 섹션 시작 키워드 → 상태 매핑
|
||||
# ---------------------------------------------------------------------------
|
||||
_SECTION_KEYWORDS: list[tuple[str, State]] = [
|
||||
('해안 일반특성', State.GENERAL),
|
||||
('해안 일반 특성', State.GENERAL),
|
||||
('접근방법', State.ACCESS),
|
||||
('접근 방법', State.ACCESS),
|
||||
('민감자원 정보', State.SENSITIVE),
|
||||
('민감자원정보', State.SENSITIVE),
|
||||
('권장 방제 방법', State.CLEANUP),
|
||||
('권장방제방법', State.CLEANUP),
|
||||
('권 장 방 제 방 법', State.CLEANUP),
|
||||
('권장 방제 중지 기준', State.END_CRITERIA),
|
||||
('권장방제중지기준', State.END_CRITERIA),
|
||||
('권 장 방 제 중 지 기 준', State.END_CRITERIA),
|
||||
('해안 방제시 고려사항', State.CONSIDER),
|
||||
('해안 방제 시 고려사항', State.CONSIDER),
|
||||
('해안방제시 고려사항', State.CONSIDER),
|
||||
]
|
||||
|
||||
# 코드 패턴: SSDD-1, BRSM-12, DDIS-1 등
|
||||
_CODE_RE = re.compile(r'\(([A-Z]{2,}-\d+)\)')
|
||||
_NAME_CODE_RE = re.compile(r'(.+?)\s*\(([A-Z]{2,}-\d+)\)')
|
||||
_LENGTH_RE = re.compile(r'약\s*([\d,]+\.?\d*)\s*m\s*임?')
|
||||
_WIDTH_RE = re.compile(r'폭[은는]?\s*약\s*([\d,]+\.?\d*)\s*m')
|
||||
_NUMBER_RE = re.compile(r'^\d+$')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 유틸 함수
|
||||
# ---------------------------------------------------------------------------
|
||||
def _clean_bullet(line: str) -> str:
|
||||
"""불릿 접두사(Ÿ, ·, •, -) 제거 후 strip."""
|
||||
return line.lstrip('Ÿ \t·•- ').strip()
|
||||
|
||||
|
||||
def _is_bullet(line: str) -> bool:
|
||||
"""불릿으로 시작하는 줄인지 확인."""
|
||||
stripped = line.strip()
|
||||
return stripped.startswith('Ÿ') or stripped.startswith('·') or stripped.startswith('•')
|
||||
|
||||
|
||||
def _is_sub_bullet(line: str) -> bool:
|
||||
"""서브 불릿(- 접두사)으로 시작하는 줄인지 확인."""
|
||||
stripped = line.strip()
|
||||
return stripped.startswith('-') and len(stripped) > 1
|
||||
|
||||
|
||||
def _is_end_criteria_item(text: str) -> bool:
|
||||
"""방제 중지 기준 항목인지 판별 (조건문 패턴)."""
|
||||
criteria_patterns = [
|
||||
'없어야', '않아야', '미만', '이하', '이상',
|
||||
'분포해야', '발생하지',
|
||||
]
|
||||
return any(p in text for p in criteria_patterns)
|
||||
|
||||
|
||||
def _parse_measurement(text: str, pattern: re.Pattern) -> float | None:
|
||||
"""정규식으로 수치 추출."""
|
||||
m = pattern.search(text)
|
||||
if m:
|
||||
return float(m.group(1).replace(',', ''))
|
||||
return None
|
||||
|
||||
|
||||
def _detect_section_keyword(line: str) -> State | None:
|
||||
"""줄이 섹션 시작 키워드를 포함하는지 확인."""
|
||||
normalized = line.replace(' ', '')
|
||||
for keyword, state in _SECTION_KEYWORDS:
|
||||
if keyword.replace(' ', '') in normalized:
|
||||
return state
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 데이터 페이지 판별
|
||||
# ---------------------------------------------------------------------------
|
||||
def is_data_page(page: fitz.Page) -> bool:
|
||||
"""데이터 페이지인지 판별 — 코드 패턴 + 키워드 존재 여부."""
|
||||
text = page.get_text('text')
|
||||
has_code = bool(_CODE_RE.search(text))
|
||||
has_keyword = '일반특성' in text or '접근방법' in text or '방제 방법' in text
|
||||
return has_code and has_keyword
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 단일 페이지 파싱
|
||||
# ---------------------------------------------------------------------------
|
||||
def _merge_bullet_lines(raw_lines: list) -> list:
|
||||
"""F-series 형식: 'Ÿ' 단독 줄 + 다음 줄 텍스트를 병합."""
|
||||
merged = []
|
||||
i = 0
|
||||
while i < len(raw_lines):
|
||||
line = raw_lines[i].strip()
|
||||
if line == 'Ÿ' and i + 1 < len(raw_lines):
|
||||
# 다음 줄과 병합
|
||||
merged.append('Ÿ ' + raw_lines[i + 1].strip())
|
||||
i += 2
|
||||
elif line:
|
||||
merged.append(line)
|
||||
i += 1
|
||||
else:
|
||||
i += 1
|
||||
return merged
|
||||
|
||||
|
||||
def parse_page(page: fitz.Page) -> CoastalSection | None:
|
||||
"""데이터 페이지에서 CoastalSection 추출."""
|
||||
text = page.get_text('text')
|
||||
raw_lines = text.split('\n')
|
||||
lines = _merge_bullet_lines(raw_lines)
|
||||
|
||||
section = CoastalSection()
|
||||
state = State.HEADER
|
||||
|
||||
# 현재 섹션에 수집 중인 불릿 항목들
|
||||
current_bullets: list[str] = []
|
||||
# 민감자원 서브 섹션 추적
|
||||
sensitive_sub: str = ''
|
||||
sensitive_items: list[SensitiveItem] = []
|
||||
# 방제방법+중지기준 두 컬럼 병합 모드
|
||||
cleanup_merged = False
|
||||
|
||||
def _flush_bullets():
|
||||
"""현재 상태의 불릿을 section에 반영."""
|
||||
nonlocal current_bullets, sensitive_sub
|
||||
if state == State.GENERAL:
|
||||
_parse_general(section, current_bullets)
|
||||
elif state == State.ACCESS:
|
||||
_parse_access(section, current_bullets)
|
||||
elif state == State.SENSITIVE:
|
||||
if sensitive_sub and current_bullets:
|
||||
sensitive_items.append(SensitiveItem(
|
||||
t=sensitive_sub,
|
||||
v='\n'.join(current_bullets),
|
||||
))
|
||||
elif state == State.CLEANUP:
|
||||
section.cleanup_methods = current_bullets[:]
|
||||
elif state == State.END_CRITERIA:
|
||||
# 병합 모드: 불릿을 방제방법/중지기준으로 분류
|
||||
if cleanup_merged:
|
||||
_split_cleanup_and_criteria(section, current_bullets)
|
||||
else:
|
||||
section.end_criteria = current_bullets[:]
|
||||
elif state == State.CONSIDER:
|
||||
section.notes = current_bullets[:]
|
||||
current_bullets = []
|
||||
|
||||
for line in lines:
|
||||
# 페이지 헤더/푸터 스킵
|
||||
if '해양경비안전서' in line and ('관할' in line or '정보집' in line):
|
||||
continue
|
||||
if '해안사전평가 정보' in line and '∙' in line:
|
||||
continue
|
||||
if '해양경찰서' in line and ('관할' in line or '정보집' in line):
|
||||
continue
|
||||
|
||||
# 섹션 전환 감지
|
||||
new_state = _detect_section_keyword(line)
|
||||
if new_state and new_state != state:
|
||||
# 방제방법→중지기준 헤더가 연속 (두 컬럼 레이아웃)
|
||||
if state == State.CLEANUP and new_state == State.END_CRITERIA and not current_bullets:
|
||||
cleanup_merged = True
|
||||
state = State.END_CRITERIA
|
||||
continue
|
||||
_flush_bullets()
|
||||
state = new_state
|
||||
sensitive_sub = ''
|
||||
continue
|
||||
|
||||
# HEADER 상태: 번호 + 지역명/코드명 추출
|
||||
if state == State.HEADER:
|
||||
if _NUMBER_RE.match(line):
|
||||
section.section_number = int(line)
|
||||
continue
|
||||
m = _NAME_CODE_RE.search(line)
|
||||
if m:
|
||||
section.sect_nm = m.group(1).strip()
|
||||
section.sect_cd = m.group(2).strip()
|
||||
continue
|
||||
continue
|
||||
|
||||
# 민감자원: 서브 섹션 감지 (Ÿ 불릿 또는 일반 텍스트)
|
||||
if state == State.SENSITIVE:
|
||||
cleaned_for_check = _clean_bullet(line) if _is_bullet(line) else line
|
||||
if '경제적' in cleaned_for_check and '자원' in cleaned_for_check:
|
||||
if sensitive_sub and current_bullets:
|
||||
sensitive_items.append(SensitiveItem(
|
||||
t=sensitive_sub, v='\n'.join(current_bullets),
|
||||
))
|
||||
sensitive_sub = '사회경제적'
|
||||
current_bullets = []
|
||||
continue
|
||||
if '생물자원' in cleaned_for_check:
|
||||
if sensitive_sub and current_bullets:
|
||||
sensitive_items.append(SensitiveItem(
|
||||
t=sensitive_sub, v='\n'.join(current_bullets),
|
||||
))
|
||||
sensitive_sub = '생물자원'
|
||||
current_bullets = []
|
||||
continue
|
||||
|
||||
# 불릿 항목 수집
|
||||
if _is_bullet(line):
|
||||
current_bullets.append(_clean_bullet(line))
|
||||
elif _is_sub_bullet(line):
|
||||
# "-" 접두사 서브 항목 (민감자원 상세 등)
|
||||
cleaned = line.strip().lstrip('-').strip()
|
||||
if cleaned:
|
||||
current_bullets.append(cleaned)
|
||||
elif current_bullets and line and not _detect_section_keyword(line):
|
||||
# 연속행 (불릿 없이 이어지는 텍스트)
|
||||
cleaned = line.strip()
|
||||
if cleaned:
|
||||
current_bullets[-1] += ' ' + cleaned
|
||||
|
||||
# 마지막 섹션 flush
|
||||
_flush_bullets()
|
||||
section.sensitive_info = sensitive_items
|
||||
|
||||
if not section.sect_cd:
|
||||
return None
|
||||
|
||||
# ESI 등급 매핑 (cst_tp_cd 기반)
|
||||
if section.cst_tp_cd:
|
||||
section.esi_cd, section.esi_num = map_esi(section.cst_tp_cd)
|
||||
|
||||
return section
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 섹션별 파싱 헬퍼
|
||||
# ---------------------------------------------------------------------------
|
||||
def _parse_general(section: CoastalSection, bullets: list[str]):
|
||||
"""해안 일반특성 불릿에서 shore_tp, cst_tp_cd, len_m, width_m 추출."""
|
||||
for b in bullets:
|
||||
if '형태' in b:
|
||||
if '폐쇄' in b:
|
||||
section.shore_tp = '폐쇄형'
|
||||
elif '개방' in b:
|
||||
section.shore_tp = '개방형'
|
||||
elif '반폐쇄' in b or '반 폐쇄' in b:
|
||||
section.shore_tp = '반폐쇄형'
|
||||
elif '이루어져' in b or '조성' in b or '으로 이루어' in b:
|
||||
# 해안 구성 추출
|
||||
section.cst_tp_cd = _extract_coastal_type(b)
|
||||
length = _parse_measurement(b, _LENGTH_RE)
|
||||
if length and not section.len_m:
|
||||
section.len_m = length
|
||||
width = _parse_measurement(b, _WIDTH_RE)
|
||||
if width:
|
||||
section.width_m = width
|
||||
|
||||
|
||||
def _extract_coastal_type(text: str) -> str:
|
||||
"""해안 구성 유형 추출."""
|
||||
types = [
|
||||
'투과성 인공호안', '비투과성 인공호안', '인공호안',
|
||||
'모래', '세립질 모래', '굵은 모래',
|
||||
'자갈', '수직암반', '수평암반',
|
||||
'갯벌', '습지', '사석',
|
||||
'콘크리트', '테트라포드',
|
||||
]
|
||||
for t in types:
|
||||
if t in text:
|
||||
return t
|
||||
# fallback: "해안은 XXX으로 이루어져" 패턴
|
||||
m = re.search(r'해안은\s+(.+?)(?:으로|로)\s*이루어져', text)
|
||||
if m:
|
||||
return m.group(1).strip()
|
||||
return text
|
||||
|
||||
|
||||
def _split_cleanup_and_criteria(section: CoastalSection, bullets: list[str]):
|
||||
"""두 컬럼이 병합된 불릿을 방제방법/중지기준으로 분류."""
|
||||
cleanup = []
|
||||
criteria = []
|
||||
for b in bullets:
|
||||
if _is_end_criteria_item(b):
|
||||
criteria.append(b)
|
||||
else:
|
||||
cleanup.append(b)
|
||||
section.cleanup_methods = cleanup
|
||||
section.end_criteria = criteria
|
||||
|
||||
|
||||
def _parse_access(section: CoastalSection, bullets: list[str]):
|
||||
"""접근방법 불릿에서 access_dc, access_pt 추출."""
|
||||
access_parts = []
|
||||
for b in bullets:
|
||||
if '주요접근지점' in b or '주요 접근지점' in b or '주요접근 지점' in b:
|
||||
# "주요접근지점 : 부사방조제" 패턴
|
||||
parts = re.split(r'[::]', b, maxsplit=1)
|
||||
if len(parts) > 1:
|
||||
section.access_pt = parts[1].strip()
|
||||
else:
|
||||
section.access_pt = b.replace('주요접근지점', '').strip()
|
||||
else:
|
||||
access_parts.append(b)
|
||||
if access_parts:
|
||||
section.access_dc = ' / '.join(access_parts)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 전체 PDF 파싱
|
||||
# ---------------------------------------------------------------------------
|
||||
def parse_pdf(pdf_path: str | Path) -> ParseResult:
|
||||
"""PDF 전체를 파싱하여 ParseResult 반환."""
|
||||
pdf_path = Path(pdf_path)
|
||||
doc = fitz.open(str(pdf_path))
|
||||
|
||||
result = ParseResult(
|
||||
pdf_filename=pdf_path.name,
|
||||
)
|
||||
|
||||
# 관할/구역명 추출 시도 (첫 30페이지 탐색)
|
||||
for i in range(min(35, doc.page_count)):
|
||||
text = doc[i].get_text('text')
|
||||
if '관할' in text and '해안' in text:
|
||||
# "보령 해양경비안전서 관할" 패턴
|
||||
m = re.search(r'(\S+\s*해양경[비찰]\S*)\s*관할', text)
|
||||
if m and not result.jurisdiction:
|
||||
result.jurisdiction = m.group(1).strip()
|
||||
# 구역명: "X. 충남 서천군 해안 사전평가 정보" 패턴
|
||||
m = re.search(r'\d+\.\s*(.+?)\s*해안\s*사전평가', text)
|
||||
if m and not result.zone_name:
|
||||
result.zone_name = m.group(1).strip()
|
||||
if result.jurisdiction and result.zone_name:
|
||||
break
|
||||
|
||||
# 데이터 페이지 파싱
|
||||
skipped = 0
|
||||
for i in range(doc.page_count):
|
||||
page = doc[i]
|
||||
if not is_data_page(page):
|
||||
skipped += 1
|
||||
continue
|
||||
section = parse_page(page)
|
||||
if section:
|
||||
result.sections.append(section)
|
||||
|
||||
result.total_sections = len(result.sections)
|
||||
result.skipped_pages = skipped
|
||||
doc.close()
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI 실행
|
||||
# ---------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
import json
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
print('Usage: python parser.py <pdf_path>')
|
||||
sys.exit(1)
|
||||
|
||||
r = parse_pdf(sys.argv[1])
|
||||
print(json.dumps(r.model_dump(), ensure_ascii=False, indent=2))
|
||||
341
prediction/scat/pdf_parser_b.py
Normal file
341
prediction/scat/pdf_parser_b.py
Normal file
@ -0,0 +1,341 @@
|
||||
"""PDF 텍스트 파싱 — Type B (방제정보집) 형식.
|
||||
|
||||
여수해경서 등의 '방제정보집' PDF에서 해안 구간 정보를 추출한다.
|
||||
Type A(해안사전평가정보집)와 다른 레이아웃:
|
||||
- 코드: HDPJ-1-M-E-R-P-L (괄호 없음)
|
||||
- 페이지당 2개 구간
|
||||
- '식별자 코드명' 라벨로 구간 시작
|
||||
- '민감자원' 섹션 없음
|
||||
- '초기 방제 및 고려사항' 섹션
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
import fitz # PyMuPDF
|
||||
|
||||
from models import CoastalSection, ParseResult
|
||||
from esi_mapper import parse_esi_cd
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 정규식 패턴
|
||||
# ---------------------------------------------------------------------------
|
||||
# HDPJ-1-M-E-R-P-L, HDDH-10-MI-E-R-P-L 등
|
||||
_CODE_RE_B = re.compile(r'([A-Z]{2,}-\d+(?:-[A-Z]{1,2}){0,5}(?:-[A-Z])*)')
|
||||
_ESI_RE = re.compile(r'ESI\s*등급\s*[::]\s*(\d+[A-Z]?)')
|
||||
_LENGTH_RE = re.compile(r'([\d,.]+)\s*/\s*(-|[\d,.]+(?:\.\d+)?)')
|
||||
_NUMBER_RE = re.compile(r'^(\d{1,3})$')
|
||||
_SECTION_START_RE = re.compile(r'식별자\s+코드명')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 유틸
|
||||
# ---------------------------------------------------------------------------
|
||||
def _clean_bullet(line: str) -> str:
|
||||
return line.lstrip('Ÿ \t·•- ').strip()
|
||||
|
||||
|
||||
def _is_bullet(line: str) -> bool:
|
||||
s = line.strip()
|
||||
return s.startswith('Ÿ') or s.startswith('·') or s.startswith('•')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 데이터 페이지 판별
|
||||
# ---------------------------------------------------------------------------
|
||||
def is_data_page_b(page: fitz.Page) -> bool:
|
||||
text = page.get_text('text')
|
||||
has_identifier = bool(_SECTION_START_RE.search(text))
|
||||
has_code = bool(_CODE_RE_B.search(text))
|
||||
return has_identifier and has_code
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 단일 구간 블록 파싱
|
||||
# ---------------------------------------------------------------------------
|
||||
def _parse_section_block(lines: list[str], area_name: str) -> CoastalSection | None:
|
||||
"""식별자 코드명 ~ 다음 식별자 코드명 사이의 텍스트 블록을 파싱."""
|
||||
section = CoastalSection()
|
||||
section.sect_nm = area_name
|
||||
|
||||
# Phase: code → general → cleanup → end_criteria → consider
|
||||
phase = 'code'
|
||||
cleanup_items: list[str] = []
|
||||
end_criteria_items: list[str] = []
|
||||
consider_items: list[str] = []
|
||||
current_target: list[str] | None = None
|
||||
|
||||
i = 0
|
||||
while i < len(lines):
|
||||
line = lines[i].strip()
|
||||
i += 1
|
||||
|
||||
if not line:
|
||||
continue
|
||||
|
||||
# 코드 추출
|
||||
if phase == 'code':
|
||||
m = _CODE_RE_B.search(line)
|
||||
if m:
|
||||
section.sect_cd = m.group(1)
|
||||
phase = 'general'
|
||||
else:
|
||||
# 구간 번호
|
||||
nm = _NUMBER_RE.match(line)
|
||||
if nm:
|
||||
section.section_number = int(nm.group(1))
|
||||
continue
|
||||
|
||||
# 해안 형태/저질 특성 + ESI + 길이
|
||||
if phase == 'general':
|
||||
# 형태
|
||||
if '형태' in line and ':' in line:
|
||||
val = line.split(':', 1)[1].strip().split(':')[-1].strip()
|
||||
if val:
|
||||
section.shore_tp = val
|
||||
continue
|
||||
# 퇴적물
|
||||
if '퇴적물' in line and ':' in line:
|
||||
val = line.split(':', 1)[1].strip().split(':')[-1].strip()
|
||||
if val:
|
||||
section.cst_tp_cd = val
|
||||
continue
|
||||
# ESI
|
||||
m = _ESI_RE.search(line)
|
||||
if m:
|
||||
section.esi_cd, section.esi_num = parse_esi_cd(m.group(1))
|
||||
continue
|
||||
# 길이/폭
|
||||
m = _LENGTH_RE.search(line)
|
||||
if m:
|
||||
try:
|
||||
section.len_m = float(m.group(1).replace(',', ''))
|
||||
except ValueError:
|
||||
pass
|
||||
width_str = m.group(2)
|
||||
if width_str != '-':
|
||||
# '2차선 도로' 등 비숫자 후속 방지
|
||||
end_pos = m.end(2)
|
||||
after = line[end_pos:end_pos + 1] if end_pos < len(line) else ''
|
||||
if not after or after in (' ', '\t', '\n', ')', ''):
|
||||
try:
|
||||
section.width_m = float(width_str.replace(',', ''))
|
||||
except ValueError:
|
||||
pass
|
||||
continue
|
||||
# 접근성 — þ 마커들은 접근성 열에 해당
|
||||
if 'þ' in line:
|
||||
continue
|
||||
# 섹션 전환 감지
|
||||
normalized = line.replace(' ', '')
|
||||
if '권장방제방법' in normalized:
|
||||
phase = 'cleanup'
|
||||
current_target = cleanup_items
|
||||
continue
|
||||
if '접근성' in normalized or '차량' in normalized or '도로' in normalized or '도보' in normalized or '선박' in normalized:
|
||||
continue
|
||||
if '해안길이' in normalized or '대표' in normalized or '사진' in normalized:
|
||||
continue
|
||||
continue
|
||||
|
||||
# 방제 방법 / 종료 기준 (두 컬럼이 같은 줄에 섞여 나옴)
|
||||
if phase == 'cleanup':
|
||||
normalized = line.replace(' ', '')
|
||||
if '방제종료기준' in normalized:
|
||||
# 헤더만 있는 줄 — 이후 불릿은 종료기준
|
||||
current_target = end_criteria_items
|
||||
continue
|
||||
if '초기방제' in normalized and '고려사항' in normalized:
|
||||
phase = 'consider'
|
||||
current_target = consider_items
|
||||
continue
|
||||
if _is_bullet(line):
|
||||
text = _clean_bullet(line)
|
||||
if text:
|
||||
# 두 컬럼이 혼합된 경우 heuristic 분류
|
||||
if _is_criteria(text):
|
||||
end_criteria_items.append(text)
|
||||
else:
|
||||
cleanup_items.append(text)
|
||||
continue
|
||||
|
||||
# 고려사항
|
||||
if phase == 'consider':
|
||||
if _is_bullet(line):
|
||||
text = _clean_bullet(line)
|
||||
if text:
|
||||
consider_items.append(text)
|
||||
elif consider_items and line and not _SECTION_START_RE.search(line):
|
||||
# 연속행
|
||||
consider_items[-1] += ' ' + line
|
||||
continue
|
||||
|
||||
section.cleanup_methods = cleanup_items
|
||||
section.end_criteria = end_criteria_items
|
||||
section.notes = consider_items
|
||||
|
||||
if not section.sect_cd:
|
||||
return None
|
||||
return section
|
||||
|
||||
|
||||
def _is_criteria(text: str) -> bool:
|
||||
patterns = ['없어야', '않아야', '미만', '이하', '이상', '분포해야',
|
||||
'분포하면', '발생하지', '묻어나지', '유출되지', '관찰되는']
|
||||
return any(p in text for p in patterns)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 접근성 추출
|
||||
# ---------------------------------------------------------------------------
|
||||
def _extract_accessibility(text_block: str) -> str | None:
|
||||
"""블록 텍스트에서 접근성(차량/도보/선박) 추출."""
|
||||
lines = text_block.split('\n')
|
||||
access_types = []
|
||||
# 접근성 헤더 찾기
|
||||
header_idx = -1
|
||||
for idx, line in enumerate(lines):
|
||||
s = line.replace(' ', '')
|
||||
if '접근성' in s:
|
||||
header_idx = idx
|
||||
break
|
||||
|
||||
if header_idx < 0:
|
||||
return None
|
||||
|
||||
# 헤더 이후에서 차량/도로/도보/선박 라벨과 þ 위치 매칭
|
||||
labels = []
|
||||
for idx in range(header_idx, min(header_idx + 5, len(lines))):
|
||||
line = lines[idx]
|
||||
for label in ['차량', '도로', '도보', '선박']:
|
||||
if label in line.replace(' ', ''):
|
||||
labels.append('도로' if label == '도로' else label)
|
||||
|
||||
# þ 마커 수 세기
|
||||
check_count = 0
|
||||
for idx in range(header_idx, min(header_idx + 8, len(lines))):
|
||||
check_count += lines[idx].count('þ')
|
||||
|
||||
if labels and check_count > 0:
|
||||
# þ 개수만큼 앞에서부터 접근 가능
|
||||
accessed = labels[:check_count] if check_count <= len(labels) else labels
|
||||
return ', '.join(accessed) + ' 접근 가능'
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 페이지 파싱 (여러 구간)
|
||||
# ---------------------------------------------------------------------------
|
||||
def parse_page_b(page: fitz.Page) -> list[CoastalSection]:
|
||||
"""데이터 페이지에서 CoastalSection 목록 추출 (보통 2개)."""
|
||||
text = page.get_text('text')
|
||||
lines = text.split('\n')
|
||||
|
||||
# 지역명 추출 (첫 줄 또는 헤더)
|
||||
area_name = ''
|
||||
for line in lines[:3]:
|
||||
stripped = line.strip()
|
||||
if stripped and '정보집' not in stripped and '∙' not in stripped:
|
||||
area_name = stripped
|
||||
break
|
||||
|
||||
# 접근성 추출 (페이지 전체에서)
|
||||
accessibility = _extract_accessibility(text)
|
||||
|
||||
# 구간 블록 분리: "식별자 코드명" 기준
|
||||
block_starts: list[int] = []
|
||||
for idx, line in enumerate(lines):
|
||||
if _SECTION_START_RE.search(line):
|
||||
# 구간 번호는 이전 줄에 있을 수 있음
|
||||
start = idx
|
||||
if idx > 0 and _NUMBER_RE.match(lines[idx - 1].strip()):
|
||||
start = idx - 1
|
||||
block_starts.append(start)
|
||||
|
||||
if not block_starts:
|
||||
return []
|
||||
|
||||
sections: list[CoastalSection] = []
|
||||
for i, start in enumerate(block_starts):
|
||||
end = block_starts[i + 1] if i + 1 < len(block_starts) else len(lines)
|
||||
block = lines[start:end]
|
||||
section = _parse_section_block(block, area_name)
|
||||
if section:
|
||||
if accessibility and not section.access_dc:
|
||||
section.access_dc = accessibility
|
||||
sections.append(section)
|
||||
|
||||
# 접근성은 구간마다 다를 수 있음 — 각 블록에서 개별 추출
|
||||
for i, start in enumerate(block_starts):
|
||||
end = block_starts[i + 1] if i + 1 < len(block_starts) else len(lines)
|
||||
block_text = '\n'.join(lines[start:end])
|
||||
acc = _extract_accessibility(block_text)
|
||||
if acc and i < len(sections):
|
||||
sections[i].access_dc = acc
|
||||
|
||||
return sections
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 전체 PDF 파싱
|
||||
# ---------------------------------------------------------------------------
|
||||
def parse_pdf_b(pdf_path: str | Path) -> ParseResult:
|
||||
"""Type B PDF 전체를 파싱하여 ParseResult 반환."""
|
||||
pdf_path = Path(pdf_path)
|
||||
doc = fitz.open(str(pdf_path))
|
||||
|
||||
result = ParseResult(pdf_filename=pdf_path.name)
|
||||
|
||||
# 관할/구역명 추출 (페이지 헤더/푸터에서)
|
||||
for i in range(min(40, doc.page_count)):
|
||||
text = doc[i].get_text('text')
|
||||
# "여수해경서 관할 해안 사전 평가 정보집" 패턴
|
||||
m = re.search(r'(\S+해경서)\s*관할', text)
|
||||
if m and not result.jurisdiction:
|
||||
result.jurisdiction = m.group(1).strip()
|
||||
# "2. 하동군 해안 사전 평가 정보" 패턴
|
||||
m = re.search(r'\d+\.\s*(.+?)\s*해안\s*사전\s*평가', text)
|
||||
if m and not result.zone_name:
|
||||
result.zone_name = m.group(1).strip()
|
||||
if result.jurisdiction and result.zone_name:
|
||||
break
|
||||
|
||||
# 데이터 페이지 파싱
|
||||
skipped = 0
|
||||
for i in range(doc.page_count):
|
||||
page = doc[i]
|
||||
if not is_data_page_b(page):
|
||||
skipped += 1
|
||||
continue
|
||||
sections = parse_page_b(page)
|
||||
result.sections.extend(sections)
|
||||
|
||||
result.total_sections = len(result.sections)
|
||||
result.skipped_pages = skipped
|
||||
doc.close()
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
import json
|
||||
import io
|
||||
|
||||
if sys.platform == 'win32':
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
print('Usage: python pdf_parser_b.py <pdf_path>')
|
||||
sys.exit(1)
|
||||
|
||||
r = parse_pdf_b(sys.argv[1])
|
||||
print(json.dumps(r.model_dump(), ensure_ascii=False, indent=2))
|
||||
14
prediction/scat/requirements.txt
Normal file
14
prediction/scat/requirements.txt
Normal file
@ -0,0 +1,14 @@
|
||||
# PDF 파싱
|
||||
PyMuPDF==1.26.5
|
||||
|
||||
# 이미지 처리
|
||||
Pillow==10.3.0
|
||||
|
||||
# DB (추후 사용)
|
||||
psycopg2-binary==2.9.9
|
||||
|
||||
# 데이터 모델
|
||||
pydantic==2.7.0
|
||||
|
||||
# HTTP (Geocoding)
|
||||
requests>=2.31.0
|
||||
378
prediction/scat/run.py
Normal file
378
prediction/scat/run.py
Normal file
@ -0,0 +1,378 @@
|
||||
"""SCAT PDF 파싱 CLI 도구.
|
||||
|
||||
사용법:
|
||||
python run.py <pdf_path> # 단일 PDF 파싱
|
||||
python run.py <directory_path> # 배치 파싱
|
||||
python run.py --load-json output/ --geocode # JSON에 좌표 추가
|
||||
python run.py --load-json output/ --save # JSON → DB 저장
|
||||
python run.py --load-json output/ --save --dry-run # DB 저장 미리보기
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import io
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
|
||||
# Windows cp949 대응
|
||||
if sys.platform == 'win32':
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
|
||||
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
|
||||
|
||||
import fitz
|
||||
|
||||
from pdf_parser import parse_pdf
|
||||
from pdf_parser_b import parse_pdf_b
|
||||
from models import CoastalSection, SensitiveItem
|
||||
|
||||
OUTPUT_DIR = Path(__file__).parent / 'output'
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PDF 형식 감지
|
||||
# ---------------------------------------------------------------------------
|
||||
def detect_pdf_type(pdf_path: Path) -> str:
|
||||
"""PDF 형식 감지. 'A'(해안사전평가정보집) 또는 'B'(방제정보집) 반환."""
|
||||
doc = fitz.open(str(pdf_path))
|
||||
for i in range(min(30, doc.page_count)):
|
||||
text = doc[i].get_text('text')
|
||||
if '식별자' in text and '코드명' in text:
|
||||
doc.close()
|
||||
return 'B'
|
||||
doc.close()
|
||||
return 'A'
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PDF 파싱
|
||||
# ---------------------------------------------------------------------------
|
||||
def process_pdf(pdf_path: Path) -> dict:
|
||||
"""단일 PDF를 파싱하고 JSON 파일로 저장한다."""
|
||||
pdf_type = detect_pdf_type(pdf_path)
|
||||
if pdf_type == 'B':
|
||||
result = parse_pdf_b(str(pdf_path))
|
||||
else:
|
||||
result = parse_pdf(str(pdf_path))
|
||||
data = result.model_dump()
|
||||
|
||||
for s in data['sections']:
|
||||
s.pop('photos', None)
|
||||
|
||||
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
out_path = OUTPUT_DIR / f'{pdf_path.stem}.json'
|
||||
with open(out_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
|
||||
return {
|
||||
'file': pdf_path.name,
|
||||
'output': str(out_path),
|
||||
'zone_name': result.zone_name,
|
||||
'jurisdiction': result.jurisdiction,
|
||||
'total_sections': result.total_sections,
|
||||
'skipped_pages': result.skipped_pages,
|
||||
}
|
||||
|
||||
|
||||
def run_parse(target: Path):
|
||||
"""PDF 파싱 실행."""
|
||||
if target.is_file() and target.suffix.lower() == '.pdf':
|
||||
pdf_files = [target]
|
||||
elif target.is_dir():
|
||||
pdf_files = sorted(target.glob('*.pdf'))
|
||||
if not pdf_files:
|
||||
print(f'PDF 파일을 찾을 수 없습니다: {target}')
|
||||
sys.exit(1)
|
||||
print(f'{len(pdf_files)}개 PDF 발견\n')
|
||||
else:
|
||||
print(f'유효하지 않은 경로: {target}')
|
||||
sys.exit(1)
|
||||
|
||||
results = []
|
||||
for i, pdf in enumerate(pdf_files, 1):
|
||||
pdf_type = detect_pdf_type(pdf)
|
||||
print(f'[{i}/{len(pdf_files)}] {pdf.name} (Type {pdf_type}) 파싱 중...')
|
||||
try:
|
||||
info = process_pdf(pdf)
|
||||
results.append(info)
|
||||
print(f' -> {info["total_sections"]}개 구간 | {info["zone_name"]} | {info["jurisdiction"]}')
|
||||
print(f' -> 저장: {info["output"]}')
|
||||
except Exception as e:
|
||||
print(f' -> 오류: {e}')
|
||||
results.append({'file': pdf.name, 'error': str(e)})
|
||||
|
||||
if len(results) > 1:
|
||||
print(f'\n=== 요약 ===')
|
||||
success = [r for r in results if 'error' not in r]
|
||||
failed = [r for r in results if 'error' in r]
|
||||
total_sections = sum(r['total_sections'] for r in success)
|
||||
print(f'성공: {len(success)}개 / 실패: {len(failed)}개 / 총 구간: {total_sections}개')
|
||||
if failed:
|
||||
print(f'실패 파일: {", ".join(r["file"] for r in failed)}')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# JSON → DB 저장
|
||||
# ---------------------------------------------------------------------------
|
||||
def _extract_zone_cd(sect_cd: str) -> str:
|
||||
"""sect_cd에서 zone_cd 추출 (영문 접두사).
|
||||
|
||||
Type A: SSDD-1 → SSDD (하이픈 앞 영문)
|
||||
Type B: BSBB-1-M-E-S-N → BSBB (첫 하이픈 앞 영문)
|
||||
"""
|
||||
m = re.match(r'^([A-Z]{2,})', sect_cd)
|
||||
return m.group(1) if m else sect_cd
|
||||
|
||||
|
||||
def _extract_jrsd_short(jurisdiction: str) -> str:
|
||||
"""관할 기관명에서 짧은 이름 추출. 예: '보령 해양경비안전서' → '보령'"""
|
||||
if not jurisdiction:
|
||||
return ''
|
||||
return jurisdiction.split()[0] if ' ' in jurisdiction else jurisdiction
|
||||
|
||||
|
||||
def _dict_to_section(d: dict) -> CoastalSection:
|
||||
"""JSON dict → CoastalSection 모델 변환."""
|
||||
sensitive = [SensitiveItem(**item) for item in (d.get('sensitive_info') or [])]
|
||||
return CoastalSection(
|
||||
section_number=d.get('section_number', 0),
|
||||
sect_nm=d.get('sect_nm', ''),
|
||||
sect_cd=d.get('sect_cd', ''),
|
||||
esi_cd=d.get('esi_cd'),
|
||||
esi_num=d.get('esi_num'),
|
||||
shore_tp=d.get('shore_tp'),
|
||||
cst_tp_cd=d.get('cst_tp_cd'),
|
||||
len_m=d.get('len_m'),
|
||||
width_m=d.get('width_m'),
|
||||
lat=d.get('lat'),
|
||||
lng=d.get('lng'),
|
||||
access_dc=d.get('access_dc'),
|
||||
access_pt=d.get('access_pt'),
|
||||
sensitive_info=sensitive,
|
||||
cleanup_methods=d.get('cleanup_methods', []),
|
||||
end_criteria=d.get('end_criteria', []),
|
||||
notes=d.get('notes', []),
|
||||
)
|
||||
|
||||
|
||||
def load_json_files(json_dir: Path) -> list[dict]:
|
||||
"""JSON 디렉토리에서 모든 파싱 결과를 로드한다."""
|
||||
all_data = []
|
||||
for f in sorted(json_dir.glob('*.json')):
|
||||
with open(f, encoding='utf-8') as fp:
|
||||
data = json.load(fp)
|
||||
if data.get('total_sections', 0) > 0:
|
||||
all_data.append(data)
|
||||
return all_data
|
||||
|
||||
|
||||
def group_by_zone(all_data: list[dict]) -> dict:
|
||||
"""파싱 결과를 zone_cd로 그룹핑한다.
|
||||
|
||||
Returns:
|
||||
{zone_cd: {
|
||||
'zone_nm': str,
|
||||
'jrsd_nm': str,
|
||||
'sections': [dict, ...]
|
||||
}}
|
||||
"""
|
||||
zones = defaultdict(lambda: {'zone_nm': '', 'jrsd_nm': '', 'sections': []})
|
||||
|
||||
for data in all_data:
|
||||
zone_name = data.get('zone_name', '')
|
||||
jrsd_nm = _extract_jrsd_short(data.get('jurisdiction', ''))
|
||||
|
||||
for sect in data['sections']:
|
||||
zone_cd = _extract_zone_cd(sect['sect_cd'])
|
||||
zone = zones[zone_cd]
|
||||
if not zone['zone_nm']:
|
||||
zone['zone_nm'] = zone_name
|
||||
if not zone['jrsd_nm']:
|
||||
zone['jrsd_nm'] = jrsd_nm
|
||||
zone['sections'].append(sect)
|
||||
|
||||
return dict(zones)
|
||||
|
||||
|
||||
def run_save(json_dir: Path, dry_run: bool = False):
|
||||
"""JSON 파싱 결과를 DB에 저장한다."""
|
||||
all_data = load_json_files(json_dir)
|
||||
if not all_data:
|
||||
print(f'유효한 JSON 파일을 찾을 수 없습니다: {json_dir}')
|
||||
sys.exit(1)
|
||||
|
||||
zones = group_by_zone(all_data)
|
||||
total_sections = sum(len(z['sections']) for z in zones.values())
|
||||
|
||||
print(f'=== DB 저장 {"미리보기" if dry_run else "시작"} ===')
|
||||
print(f'총 {len(zones)}개 zone, {total_sections}개 구간\n')
|
||||
|
||||
for zone_cd, zone_info in sorted(zones.items()):
|
||||
sect_count = len(zone_info['sections'])
|
||||
print(f' {zone_cd:8s} | {zone_info["zone_nm"]:20s} | {zone_info["jrsd_nm"]:8s} | {sect_count}개 구간')
|
||||
|
||||
if dry_run:
|
||||
print(f'\n(dry-run 모드 — DB에 저장하지 않음)')
|
||||
return
|
||||
|
||||
# 실제 DB 저장
|
||||
from db import ensure_zone, upsert_section, update_zone_sect_count, update_zone_center, close_pool
|
||||
|
||||
saved_zones = 0
|
||||
saved_sections = 0
|
||||
|
||||
try:
|
||||
for zone_cd, zone_info in sorted(zones.items()):
|
||||
zone_sn = ensure_zone(zone_cd, zone_info['zone_nm'], zone_info['jrsd_nm'])
|
||||
saved_zones += 1
|
||||
|
||||
for sect_dict in zone_info['sections']:
|
||||
section = _dict_to_section(sect_dict)
|
||||
upsert_section(zone_sn, section)
|
||||
saved_sections += 1
|
||||
|
||||
update_zone_sect_count(zone_sn)
|
||||
update_zone_center(zone_sn)
|
||||
|
||||
print(f'\n=== 완료 ===')
|
||||
print(f'{saved_zones}개 zone, {saved_sections}개 구간 저장 완료')
|
||||
except Exception as e:
|
||||
print(f'\n오류 발생: {e}')
|
||||
print(f'저장 진행: {saved_zones}개 zone, {saved_sections}개 구간까지 완료')
|
||||
raise
|
||||
finally:
|
||||
close_pool()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Geocoding
|
||||
# ---------------------------------------------------------------------------
|
||||
def run_geocode(json_dir: Path):
|
||||
"""JSON 파싱 결과에 좌표를 추가한다."""
|
||||
from geocoder import geocode_sections, load_cache, save_cache
|
||||
|
||||
load_cache()
|
||||
|
||||
json_files = sorted(json_dir.glob('*.json'))
|
||||
json_files = [f for f in json_files if not f.name.startswith('.')]
|
||||
if not json_files:
|
||||
print(f'JSON 파일을 찾을 수 없습니다: {json_dir}')
|
||||
sys.exit(1)
|
||||
|
||||
print(f'=== Geocoding 시작 ({len(json_files)}개 JSON) ===\n')
|
||||
|
||||
total_success = 0
|
||||
total_fail = 0
|
||||
|
||||
for i, f in enumerate(json_files, 1):
|
||||
with open(f, encoding='utf-8') as fp:
|
||||
data = json.load(fp)
|
||||
|
||||
sections = data.get('sections', [])
|
||||
if not sections:
|
||||
continue
|
||||
|
||||
zone_name = data.get('zone_name', '')
|
||||
print(f'[{i}/{len(json_files)}] {f.name} ({len(sections)}개 구간)...')
|
||||
|
||||
success, fail = geocode_sections(sections, zone_name)
|
||||
total_success += success
|
||||
total_fail += fail
|
||||
|
||||
# 좌표가 있는 구간 수
|
||||
with_coords = sum(1 for s in sections if s.get('lat'))
|
||||
print(f' -> 좌표: {with_coords}/{len(sections)}')
|
||||
|
||||
# JSON 업데이트 저장
|
||||
with open(f, 'w', encoding='utf-8') as fp:
|
||||
json.dump(data, fp, ensure_ascii=False, indent=2)
|
||||
|
||||
save_cache()
|
||||
|
||||
print(f'\n=== Geocoding 완료 ===')
|
||||
print(f'성공: {total_success} / 실패: {total_fail}')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 이미지 추출
|
||||
# ---------------------------------------------------------------------------
|
||||
def run_extract_images(target: Path):
|
||||
"""PDF에서 해안사진을 추출하여 scat-photos/에 저장."""
|
||||
from image_extractor import extract_images_from_pdf
|
||||
|
||||
if target.is_file() and target.suffix.lower() == '.pdf':
|
||||
pdf_files = [target]
|
||||
elif target.is_dir():
|
||||
pdf_files = sorted(target.glob('*.pdf'))
|
||||
if not pdf_files:
|
||||
print(f'PDF 파일을 찾을 수 없습니다: {target}')
|
||||
sys.exit(1)
|
||||
print(f'{len(pdf_files)}개 PDF 발견\n')
|
||||
else:
|
||||
print(f'유효하지 않은 경로: {target}')
|
||||
sys.exit(1)
|
||||
|
||||
total = 0
|
||||
for i, pdf in enumerate(pdf_files, 1):
|
||||
pdf_type = detect_pdf_type(pdf)
|
||||
print(f'[{i}/{len(pdf_files)}] {pdf.name} (Type {pdf_type}) 이미지 추출 중...')
|
||||
try:
|
||||
count = extract_images_from_pdf(pdf, pdf_type=pdf_type)
|
||||
total += count
|
||||
print(f' -> {count}개 이미지 저장')
|
||||
except Exception as e:
|
||||
print(f' -> 오류: {e}')
|
||||
|
||||
print(f'\n=== 이미지 추출 완료: 총 {total}개 ===')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='SCAT PDF 파싱 CLI 도구')
|
||||
parser.add_argument('target', nargs='?', help='PDF 파일 또는 디렉토리 경로')
|
||||
parser.add_argument('--save', action='store_true', help='파싱 결과를 DB에 저장')
|
||||
parser.add_argument('--load-json', type=Path, help='이미 파싱된 JSON 디렉토리에서 로드')
|
||||
parser.add_argument('--geocode', action='store_true', help='JSON에 Kakao Geocoding으로 좌표 추가')
|
||||
parser.add_argument('--extract-images', action='store_true', help='PDF에서 해안사진 추출 → scat-photos/')
|
||||
parser.add_argument('--dry-run', action='store_true', help='DB 저장 미리보기 (실제 저장 안 함)')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# JSON 로드 모드
|
||||
if args.load_json:
|
||||
if args.geocode:
|
||||
run_geocode(args.load_json)
|
||||
if args.save:
|
||||
run_save(args.load_json, dry_run=args.dry_run)
|
||||
if not args.geocode and not args.save:
|
||||
print('--load-json은 --geocode 또는 --save와 함께 사용해야 합니다.')
|
||||
sys.exit(1)
|
||||
return
|
||||
|
||||
# PDF 파싱 모드
|
||||
if not args.target:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
target = Path(args.target)
|
||||
|
||||
# 이미지 추출 모드
|
||||
if args.extract_images:
|
||||
run_extract_images(target)
|
||||
return
|
||||
|
||||
run_parse(target)
|
||||
|
||||
# 파싱 후 바로 DB 저장
|
||||
if args.save:
|
||||
print('\n')
|
||||
run_save(OUTPUT_DIR, dry_run=args.dry_run)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
불러오는 중...
Reference in New Issue
Block a user