- 백엔드: 관할서 목록 API, zone 필터링 쿼리 추가 - 프론트: ScatLeftPanel 관할서 드롭다운, ScatMap/ScatPopup 개선 - 기상탭: WeatherRightPanel 리팩토링 - prediction/scat: PDF 파싱 → 지오코딩 → ESI 매핑 파이프라인 - vite.config: proxy 설정 추가 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
342 lines
12 KiB
Python
342 lines
12 KiB
Python
"""PDF 텍스트 파싱 — Type B (방제정보집) 형식.
|
||
|
||
여수해경서 등의 '방제정보집' PDF에서 해안 구간 정보를 추출한다.
|
||
Type A(해안사전평가정보집)와 다른 레이아웃:
|
||
- 코드: HDPJ-1-M-E-R-P-L (괄호 없음)
|
||
- 페이지당 2개 구간
|
||
- '식별자 코드명' 라벨로 구간 시작
|
||
- '민감자원' 섹션 없음
|
||
- '초기 방제 및 고려사항' 섹션
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import re
|
||
from pathlib import Path
|
||
from typing import List, Optional
|
||
|
||
import fitz # PyMuPDF
|
||
|
||
from models import CoastalSection, ParseResult
|
||
from esi_mapper import parse_esi_cd
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 정규식 패턴
|
||
# ---------------------------------------------------------------------------
|
||
# HDPJ-1-M-E-R-P-L, HDDH-10-MI-E-R-P-L 등
|
||
_CODE_RE_B = re.compile(r'([A-Z]{2,}-\d+(?:-[A-Z]{1,2}){0,5}(?:-[A-Z])*)')
|
||
_ESI_RE = re.compile(r'ESI\s*등급\s*[::]\s*(\d+[A-Z]?)')
|
||
_LENGTH_RE = re.compile(r'([\d,.]+)\s*/\s*(-|[\d,.]+(?:\.\d+)?)')
|
||
_NUMBER_RE = re.compile(r'^(\d{1,3})$')
|
||
_SECTION_START_RE = re.compile(r'식별자\s+코드명')
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 유틸
|
||
# ---------------------------------------------------------------------------
|
||
def _clean_bullet(line: str) -> str:
|
||
return line.lstrip('Ÿ \t·•- ').strip()
|
||
|
||
|
||
def _is_bullet(line: str) -> bool:
|
||
s = line.strip()
|
||
return s.startswith('Ÿ') or s.startswith('·') or s.startswith('•')
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 데이터 페이지 판별
|
||
# ---------------------------------------------------------------------------
|
||
def is_data_page_b(page: fitz.Page) -> bool:
|
||
text = page.get_text('text')
|
||
has_identifier = bool(_SECTION_START_RE.search(text))
|
||
has_code = bool(_CODE_RE_B.search(text))
|
||
return has_identifier and has_code
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 단일 구간 블록 파싱
|
||
# ---------------------------------------------------------------------------
|
||
def _parse_section_block(lines: list[str], area_name: str) -> CoastalSection | None:
|
||
"""식별자 코드명 ~ 다음 식별자 코드명 사이의 텍스트 블록을 파싱."""
|
||
section = CoastalSection()
|
||
section.sect_nm = area_name
|
||
|
||
# Phase: code → general → cleanup → end_criteria → consider
|
||
phase = 'code'
|
||
cleanup_items: list[str] = []
|
||
end_criteria_items: list[str] = []
|
||
consider_items: list[str] = []
|
||
current_target: list[str] | None = None
|
||
|
||
i = 0
|
||
while i < len(lines):
|
||
line = lines[i].strip()
|
||
i += 1
|
||
|
||
if not line:
|
||
continue
|
||
|
||
# 코드 추출
|
||
if phase == 'code':
|
||
m = _CODE_RE_B.search(line)
|
||
if m:
|
||
section.sect_cd = m.group(1)
|
||
phase = 'general'
|
||
else:
|
||
# 구간 번호
|
||
nm = _NUMBER_RE.match(line)
|
||
if nm:
|
||
section.section_number = int(nm.group(1))
|
||
continue
|
||
|
||
# 해안 형태/저질 특성 + ESI + 길이
|
||
if phase == 'general':
|
||
# 형태
|
||
if '형태' in line and ':' in line:
|
||
val = line.split(':', 1)[1].strip().split(':')[-1].strip()
|
||
if val:
|
||
section.shore_tp = val
|
||
continue
|
||
# 퇴적물
|
||
if '퇴적물' in line and ':' in line:
|
||
val = line.split(':', 1)[1].strip().split(':')[-1].strip()
|
||
if val:
|
||
section.cst_tp_cd = val
|
||
continue
|
||
# ESI
|
||
m = _ESI_RE.search(line)
|
||
if m:
|
||
section.esi_cd, section.esi_num = parse_esi_cd(m.group(1))
|
||
continue
|
||
# 길이/폭
|
||
m = _LENGTH_RE.search(line)
|
||
if m:
|
||
try:
|
||
section.len_m = float(m.group(1).replace(',', ''))
|
||
except ValueError:
|
||
pass
|
||
width_str = m.group(2)
|
||
if width_str != '-':
|
||
# '2차선 도로' 등 비숫자 후속 방지
|
||
end_pos = m.end(2)
|
||
after = line[end_pos:end_pos + 1] if end_pos < len(line) else ''
|
||
if not after or after in (' ', '\t', '\n', ')', ''):
|
||
try:
|
||
section.width_m = float(width_str.replace(',', ''))
|
||
except ValueError:
|
||
pass
|
||
continue
|
||
# 접근성 — þ 마커들은 접근성 열에 해당
|
||
if 'þ' in line:
|
||
continue
|
||
# 섹션 전환 감지
|
||
normalized = line.replace(' ', '')
|
||
if '권장방제방법' in normalized:
|
||
phase = 'cleanup'
|
||
current_target = cleanup_items
|
||
continue
|
||
if '접근성' in normalized or '차량' in normalized or '도로' in normalized or '도보' in normalized or '선박' in normalized:
|
||
continue
|
||
if '해안길이' in normalized or '대표' in normalized or '사진' in normalized:
|
||
continue
|
||
continue
|
||
|
||
# 방제 방법 / 종료 기준 (두 컬럼이 같은 줄에 섞여 나옴)
|
||
if phase == 'cleanup':
|
||
normalized = line.replace(' ', '')
|
||
if '방제종료기준' in normalized:
|
||
# 헤더만 있는 줄 — 이후 불릿은 종료기준
|
||
current_target = end_criteria_items
|
||
continue
|
||
if '초기방제' in normalized and '고려사항' in normalized:
|
||
phase = 'consider'
|
||
current_target = consider_items
|
||
continue
|
||
if _is_bullet(line):
|
||
text = _clean_bullet(line)
|
||
if text:
|
||
# 두 컬럼이 혼합된 경우 heuristic 분류
|
||
if _is_criteria(text):
|
||
end_criteria_items.append(text)
|
||
else:
|
||
cleanup_items.append(text)
|
||
continue
|
||
|
||
# 고려사항
|
||
if phase == 'consider':
|
||
if _is_bullet(line):
|
||
text = _clean_bullet(line)
|
||
if text:
|
||
consider_items.append(text)
|
||
elif consider_items and line and not _SECTION_START_RE.search(line):
|
||
# 연속행
|
||
consider_items[-1] += ' ' + line
|
||
continue
|
||
|
||
section.cleanup_methods = cleanup_items
|
||
section.end_criteria = end_criteria_items
|
||
section.notes = consider_items
|
||
|
||
if not section.sect_cd:
|
||
return None
|
||
return section
|
||
|
||
|
||
def _is_criteria(text: str) -> bool:
|
||
patterns = ['없어야', '않아야', '미만', '이하', '이상', '분포해야',
|
||
'분포하면', '발생하지', '묻어나지', '유출되지', '관찰되는']
|
||
return any(p in text for p in patterns)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 접근성 추출
|
||
# ---------------------------------------------------------------------------
|
||
def _extract_accessibility(text_block: str) -> str | None:
|
||
"""블록 텍스트에서 접근성(차량/도보/선박) 추출."""
|
||
lines = text_block.split('\n')
|
||
access_types = []
|
||
# 접근성 헤더 찾기
|
||
header_idx = -1
|
||
for idx, line in enumerate(lines):
|
||
s = line.replace(' ', '')
|
||
if '접근성' in s:
|
||
header_idx = idx
|
||
break
|
||
|
||
if header_idx < 0:
|
||
return None
|
||
|
||
# 헤더 이후에서 차량/도로/도보/선박 라벨과 þ 위치 매칭
|
||
labels = []
|
||
for idx in range(header_idx, min(header_idx + 5, len(lines))):
|
||
line = lines[idx]
|
||
for label in ['차량', '도로', '도보', '선박']:
|
||
if label in line.replace(' ', ''):
|
||
labels.append('도로' if label == '도로' else label)
|
||
|
||
# þ 마커 수 세기
|
||
check_count = 0
|
||
for idx in range(header_idx, min(header_idx + 8, len(lines))):
|
||
check_count += lines[idx].count('þ')
|
||
|
||
if labels and check_count > 0:
|
||
# þ 개수만큼 앞에서부터 접근 가능
|
||
accessed = labels[:check_count] if check_count <= len(labels) else labels
|
||
return ', '.join(accessed) + ' 접근 가능'
|
||
|
||
return None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 페이지 파싱 (여러 구간)
|
||
# ---------------------------------------------------------------------------
|
||
def parse_page_b(page: fitz.Page) -> list[CoastalSection]:
|
||
"""데이터 페이지에서 CoastalSection 목록 추출 (보통 2개)."""
|
||
text = page.get_text('text')
|
||
lines = text.split('\n')
|
||
|
||
# 지역명 추출 (첫 줄 또는 헤더)
|
||
area_name = ''
|
||
for line in lines[:3]:
|
||
stripped = line.strip()
|
||
if stripped and '정보집' not in stripped and '∙' not in stripped:
|
||
area_name = stripped
|
||
break
|
||
|
||
# 접근성 추출 (페이지 전체에서)
|
||
accessibility = _extract_accessibility(text)
|
||
|
||
# 구간 블록 분리: "식별자 코드명" 기준
|
||
block_starts: list[int] = []
|
||
for idx, line in enumerate(lines):
|
||
if _SECTION_START_RE.search(line):
|
||
# 구간 번호는 이전 줄에 있을 수 있음
|
||
start = idx
|
||
if idx > 0 and _NUMBER_RE.match(lines[idx - 1].strip()):
|
||
start = idx - 1
|
||
block_starts.append(start)
|
||
|
||
if not block_starts:
|
||
return []
|
||
|
||
sections: list[CoastalSection] = []
|
||
for i, start in enumerate(block_starts):
|
||
end = block_starts[i + 1] if i + 1 < len(block_starts) else len(lines)
|
||
block = lines[start:end]
|
||
section = _parse_section_block(block, area_name)
|
||
if section:
|
||
if accessibility and not section.access_dc:
|
||
section.access_dc = accessibility
|
||
sections.append(section)
|
||
|
||
# 접근성은 구간마다 다를 수 있음 — 각 블록에서 개별 추출
|
||
for i, start in enumerate(block_starts):
|
||
end = block_starts[i + 1] if i + 1 < len(block_starts) else len(lines)
|
||
block_text = '\n'.join(lines[start:end])
|
||
acc = _extract_accessibility(block_text)
|
||
if acc and i < len(sections):
|
||
sections[i].access_dc = acc
|
||
|
||
return sections
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 전체 PDF 파싱
|
||
# ---------------------------------------------------------------------------
|
||
def parse_pdf_b(pdf_path: str | Path) -> ParseResult:
|
||
"""Type B PDF 전체를 파싱하여 ParseResult 반환."""
|
||
pdf_path = Path(pdf_path)
|
||
doc = fitz.open(str(pdf_path))
|
||
|
||
result = ParseResult(pdf_filename=pdf_path.name)
|
||
|
||
# 관할/구역명 추출 (페이지 헤더/푸터에서)
|
||
for i in range(min(40, doc.page_count)):
|
||
text = doc[i].get_text('text')
|
||
# "여수해경서 관할 해안 사전 평가 정보집" 패턴
|
||
m = re.search(r'(\S+해경서)\s*관할', text)
|
||
if m and not result.jurisdiction:
|
||
result.jurisdiction = m.group(1).strip()
|
||
# "2. 하동군 해안 사전 평가 정보" 패턴
|
||
m = re.search(r'\d+\.\s*(.+?)\s*해안\s*사전\s*평가', text)
|
||
if m and not result.zone_name:
|
||
result.zone_name = m.group(1).strip()
|
||
if result.jurisdiction and result.zone_name:
|
||
break
|
||
|
||
# 데이터 페이지 파싱
|
||
skipped = 0
|
||
for i in range(doc.page_count):
|
||
page = doc[i]
|
||
if not is_data_page_b(page):
|
||
skipped += 1
|
||
continue
|
||
sections = parse_page_b(page)
|
||
result.sections.extend(sections)
|
||
|
||
result.total_sections = len(result.sections)
|
||
result.skipped_pages = skipped
|
||
doc.close()
|
||
|
||
return result
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# CLI
|
||
# ---------------------------------------------------------------------------
|
||
if __name__ == '__main__':
|
||
import sys
|
||
import json
|
||
import io
|
||
|
||
if sys.platform == 'win32':
|
||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
|
||
|
||
if len(sys.argv) < 2:
|
||
print('Usage: python pdf_parser_b.py <pdf_path>')
|
||
sys.exit(1)
|
||
|
||
r = parse_pdf_b(sys.argv[1])
|
||
print(json.dumps(r.model_dump(), ensure_ascii=False, indent=2))
|