wing-ops/prediction/scat/pdf_parser_b.py
leedano d9fb4506bc feat(scat): Pre-SCAT 관할서 필터링 + 해안조사 데이터 파이프라인 구축
- 백엔드: 관할서 목록 API, zone 필터링 쿼리 추가
- 프론트: ScatLeftPanel 관할서 드롭다운, ScatMap/ScatPopup 개선
- 기상탭: WeatherRightPanel 리팩토링
- prediction/scat: PDF 파싱 → 지오코딩 → ESI 매핑 파이프라인
- vite.config: proxy 설정 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 10:53:19 +09:00

342 lines
12 KiB
Python
Raw Blame 히스토리

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""PDF 텍스트 파싱 — Type B (방제정보집) 형식.
여수해경서 등의 '방제정보집' PDF에서 해안 구간 정보를 추출한다.
Type A(해안사전평가정보집)와 다른 레이아웃:
- 코드: HDPJ-1-M-E-R-P-L (괄호 없음)
- 페이지당 2개 구간
- '식별자 코드명' 라벨로 구간 시작
- '민감자원' 섹션 없음
- '초기 방제 및 고려사항' 섹션
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import List, Optional
import fitz # PyMuPDF
from models import CoastalSection, ParseResult
from esi_mapper import parse_esi_cd
# ---------------------------------------------------------------------------
# Regex patterns
# ---------------------------------------------------------------------------
# Section codes: HDPJ-1-M-E-R-P-L, HDDH-10-MI-E-R-P-L, etc.
_CODE_RE_B = re.compile(r'([A-Z]{2,}-\d+(?:-[A-Z]{1,2}){0,5}(?:-[A-Z])*)')
# 'ESI 등급 : <grade>' — captures tokens such as '3' or '3A'.
_ESI_RE = re.compile(r'ESI\s*등급\s*[:]\s*(\d+[A-Z]?)')
# '<length> / <width>' pair; width may be '-' when absent.
_LENGTH_RE = re.compile(r'([\d,.]+)\s*/\s*(-|[\d,.]+(?:\.\d+)?)')
# A bare 1-3 digit line: the section's running number.
_NUMBER_RE = re.compile(r'^(\d{1,3})$')
# Label that opens each section block.
_SECTION_START_RE = re.compile(r'식별자\s+코드명')
# ---------------------------------------------------------------------------
# Utilities
# ---------------------------------------------------------------------------
def _clean_bullet(line: str) -> str:
return line.lstrip('Ÿ \t·•- ').strip()
def _is_bullet(line: str) -> bool:
s = line.strip()
return s.startswith('Ÿ') or s.startswith('·') or s.startswith('')
# ---------------------------------------------------------------------------
# Data-page detection
# ---------------------------------------------------------------------------
def is_data_page_b(page: fitz.Page) -> bool:
    """Return True when the page carries Type-B section data.

    A data page must contain both the '식별자 코드명' label and at
    least one section code (e.g. HDPJ-1-M-E-R-P-L).
    """
    text = page.get_text('text')
    return bool(_SECTION_START_RE.search(text)) and bool(_CODE_RE_B.search(text))
# ---------------------------------------------------------------------------
# Single section-block parsing
# ---------------------------------------------------------------------------
def _parse_section_block(lines: list[str], area_name: str) -> CoastalSection | None:
    """Parse the text between one '식별자 코드명' label and the next.

    Runs a small phase machine: code → general → cleanup → consider.

    Args:
        lines: raw text lines of a single section block.
        area_name: area name propagated from the page header.

    Returns:
        A populated CoastalSection, or None when no section code was found.

    Fixes vs. original:
    - ``.split('')[-1]`` on the '형태'/'퇴적물' value lines raised
      ``ValueError: empty separator`` (the separator character was lost
      to encoding mangling); the value after ':' is now taken as-is.
      TODO(review): confirm the lost separator against a real PDF.
    - removed ``current_target``, which was assigned but never read.
    """
    section = CoastalSection()
    section.sect_nm = area_name
    phase = 'code'
    cleanup_items: list[str] = []
    end_criteria_items: list[str] = []
    consider_items: list[str] = []
    i = 0
    while i < len(lines):
        line = lines[i].strip()
        i += 1
        if not line:
            continue
        # --- Phase 'code': find the section code -------------------------
        if phase == 'code':
            m = _CODE_RE_B.search(line)
            if m:
                section.sect_cd = m.group(1)
                phase = 'general'
            else:
                # A bare small number is the section's running number.
                nm = _NUMBER_RE.match(line)
                if nm:
                    section.section_number = int(nm.group(1))
            continue
        # --- Phase 'general': shore type/sediment + ESI + length ---------
        if phase == 'general':
            # Shore form ("형태: …")
            if '형태' in line and ':' in line:
                val = line.split(':', 1)[1].strip()
                if val:
                    section.shore_tp = val
                continue
            # Sediment ("퇴적물: …")
            if '퇴적물' in line and ':' in line:
                val = line.split(':', 1)[1].strip()
                if val:
                    section.cst_tp_cd = val
                continue
            # ESI grade
            m = _ESI_RE.search(line)
            if m:
                section.esi_cd, section.esi_num = parse_esi_cd(m.group(1))
                continue
            # Length / width pair
            m = _LENGTH_RE.search(line)
            if m:
                try:
                    section.len_m = float(m.group(1).replace(',', ''))
                except ValueError:
                    pass
                width_str = m.group(2)
                if width_str != '-':
                    # Reject non-numeric trailers such as '2차선 도로'.
                    end_pos = m.end(2)
                    after = line[end_pos:end_pos + 1] if end_pos < len(line) else ''
                    if not after or after in (' ', '\t', '\n', ')'):
                        try:
                            section.width_m = float(width_str.replace(',', ''))
                        except ValueError:
                            pass
                continue
            # 'þ' check-marks belong to the accessibility column — skip.
            if 'þ' in line:
                continue
            # Section-transition detection
            normalized = line.replace(' ', '')
            if '권장방제방법' in normalized:
                phase = 'cleanup'
                continue
            if ('접근성' in normalized or '차량' in normalized
                    or '도로' in normalized or '도보' in normalized
                    or '선박' in normalized):
                continue
            if '해안길이' in normalized or '대표' in normalized or '사진' in normalized:
                continue
            continue
        # --- Phase 'cleanup': methods and end criteria (columns mixed) ---
        if phase == 'cleanup':
            normalized = line.replace(' ', '')
            if '방제종료기준' in normalized:
                # Header-only line; subsequent bullets are classified by
                # the heuristic below anyway.
                continue
            if '초기방제' in normalized and '고려사항' in normalized:
                phase = 'consider'
                continue
            if _is_bullet(line):
                text = _clean_bullet(line)
                if text:
                    # The two columns arrive interleaved — classify each
                    # bullet heuristically.
                    if _is_criteria(text):
                        end_criteria_items.append(text)
                    else:
                        cleanup_items.append(text)
            continue
        # --- Phase 'consider': considerations ----------------------------
        if phase == 'consider':
            if _is_bullet(line):
                text = _clean_bullet(line)
                if text:
                    consider_items.append(text)
            elif consider_items and line and not _SECTION_START_RE.search(line):
                # Continuation of the previous bullet.
                consider_items[-1] += ' ' + line
            continue
    section.cleanup_methods = cleanup_items
    section.end_criteria = end_criteria_items
    section.notes = consider_items
    if not section.sect_cd:
        return None
    return section
def _is_criteria(text: str) -> bool:
patterns = ['없어야', '않아야', '미만', '이하', '이상', '분포해야',
'분포하면', '발생하지', '묻어나지', '유출되지', '관찰되는']
return any(p in text for p in patterns)
# ---------------------------------------------------------------------------
# Accessibility extraction
# ---------------------------------------------------------------------------
def _extract_accessibility(text_block: str) -> str | None:
"""블록 텍스트에서 접근성(차량/도보/선박) 추출."""
lines = text_block.split('\n')
access_types = []
# 접근성 헤더 찾기
header_idx = -1
for idx, line in enumerate(lines):
s = line.replace(' ', '')
if '접근성' in s:
header_idx = idx
break
if header_idx < 0:
return None
# 헤더 이후에서 차량/도로/도보/선박 라벨과 þ 위치 매칭
labels = []
for idx in range(header_idx, min(header_idx + 5, len(lines))):
line = lines[idx]
for label in ['차량', '도로', '도보', '선박']:
if label in line.replace(' ', ''):
labels.append('도로' if label == '도로' else label)
# þ 마커 수 세기
check_count = 0
for idx in range(header_idx, min(header_idx + 8, len(lines))):
check_count += lines[idx].count('þ')
if labels and check_count > 0:
# þ 개수만큼 앞에서부터 접근 가능
accessed = labels[:check_count] if check_count <= len(labels) else labels
return ', '.join(accessed) + ' 접근 가능'
return None
# ---------------------------------------------------------------------------
# Page parsing (multiple sections)
# ---------------------------------------------------------------------------
def parse_page_b(page: fitz.Page) -> list[CoastalSection]:
    """Extract CoastalSection objects from a data page (usually two).

    Fixes vs. original:
    - the area-name filter included ``'' not in stripped`` — an
      always-False test (a character lost to encoding mangling), so the
      area name was never captured. TODO(review): confirm the intended
      filter character against a real PDF.
    - per-block accessibility was applied by *block* index into the
      *sections* list, which mis-aligns whenever a block fails to
      parse; accessibility is now resolved while each block is parsed.
    """
    text = page.get_text('text')
    lines = text.split('\n')
    # Area name: first meaningful header line that is not the booklet title.
    area_name = ''
    for line in lines[:3]:
        stripped = line.strip()
        if stripped and '정보집' not in stripped:
            area_name = stripped
            break
    # Page-level accessibility, used as a fallback per section.
    page_access = _extract_accessibility(text)
    # Block starts: each '식별자 코드명' label; the section number may
    # sit on the preceding line.
    block_starts: list[int] = []
    for idx, line in enumerate(lines):
        if _SECTION_START_RE.search(line):
            start = idx
            if idx > 0 and _NUMBER_RE.match(lines[idx - 1].strip()):
                start = idx - 1
            block_starts.append(start)
    if not block_starts:
        return []
    sections: list[CoastalSection] = []
    for i, start in enumerate(block_starts):
        end = block_starts[i + 1] if i + 1 < len(block_starts) else len(lines)
        block = lines[start:end]
        section = _parse_section_block(block, area_name)
        if section is None:
            continue
        # Accessibility can differ per section — prefer the block-local
        # value, fall back to the page-level one.
        block_access = _extract_accessibility('\n'.join(block))
        if block_access:
            section.access_dc = block_access
        elif page_access and not section.access_dc:
            section.access_dc = page_access
        sections.append(section)
    return sections
# ---------------------------------------------------------------------------
# Whole-PDF parsing
# ---------------------------------------------------------------------------
def parse_pdf_b(pdf_path: str | Path) -> ParseResult:
    """Parse an entire Type-B PDF and return a ParseResult.

    Scans the first pages for the jurisdiction ('…해경서 관할') and
    zone name ('N. <zone> 해안 사전 평가') headers, then parses every
    data page into sections.
    """
    pdf_path = Path(pdf_path)
    doc = fitz.open(str(pdf_path))
    result = ParseResult(pdf_filename=pdf_path.name)
    # Jurisdiction / zone name from headers within the first 40 pages.
    header_pages = min(40, doc.page_count)
    for page_no in range(header_pages):
        header_text = doc[page_no].get_text('text')
        if not result.jurisdiction:
            # e.g. "여수해경서 관할 …"
            m = re.search(r'(\S+해경서)\s*관할', header_text)
            if m:
                result.jurisdiction = m.group(1).strip()
        if not result.zone_name:
            # e.g. "2. 하동군 해안 사전 평가 정보"
            m = re.search(r'\d+\.\s*(.+?)\s*해안\s*사전\s*평가', header_text)
            if m:
                result.zone_name = m.group(1).strip()
        if result.jurisdiction and result.zone_name:
            break
    # Parse every data page; count the rest as skipped.
    skipped = 0
    for page_no in range(doc.page_count):
        page = doc[page_no]
        if is_data_page_b(page):
            result.sections.extend(parse_page_b(page))
        else:
            skipped += 1
    result.total_sections = len(result.sections)
    result.skipped_pages = skipped
    doc.close()
    return result
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    import sys
    import json
    import io

    # Windows consoles often default to a legacy code page; force UTF-8
    # so the Korean JSON output renders correctly.
    if sys.platform == 'win32':
        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    if len(sys.argv) < 2:
        print('Usage: python pdf_parser_b.py <pdf_path>')
        sys.exit(1)
    parsed = parse_pdf_b(sys.argv[1])
    print(json.dumps(parsed.model_dump(), ensure_ascii=False, indent=2))