"""PDF 텍스트 파싱 — Type B (방제정보집) 형식. 여수해경서 등의 '방제정보집' PDF에서 해안 구간 정보를 추출한다. Type A(해안사전평가정보집)와 다른 레이아웃: - 코드: HDPJ-1-M-E-R-P-L (괄호 없음) - 페이지당 2개 구간 - '식별자 코드명' 라벨로 구간 시작 - '민감자원' 섹션 없음 - '초기 방제 및 고려사항' 섹션 """ from __future__ import annotations import re from pathlib import Path from typing import List, Optional import fitz # PyMuPDF from models import CoastalSection, ParseResult from esi_mapper import parse_esi_cd # --------------------------------------------------------------------------- # 정규식 패턴 # --------------------------------------------------------------------------- # HDPJ-1-M-E-R-P-L, HDDH-10-MI-E-R-P-L 등 _CODE_RE_B = re.compile(r'([A-Z]{2,}-\d+(?:-[A-Z]{1,2}){0,5}(?:-[A-Z])*)') _ESI_RE = re.compile(r'ESI\s*등급\s*[::]\s*(\d+[A-Z]?)') _LENGTH_RE = re.compile(r'([\d,.]+)\s*/\s*(-|[\d,.]+(?:\.\d+)?)') _NUMBER_RE = re.compile(r'^(\d{1,3})$') _SECTION_START_RE = re.compile(r'식별자\s+코드명') # --------------------------------------------------------------------------- # 유틸 # --------------------------------------------------------------------------- def _clean_bullet(line: str) -> str: return line.lstrip('Ÿ \t·•- ').strip() def _is_bullet(line: str) -> bool: s = line.strip() return s.startswith('Ÿ') or s.startswith('·') or s.startswith('•') # --------------------------------------------------------------------------- # 데이터 페이지 판별 # --------------------------------------------------------------------------- def is_data_page_b(page: fitz.Page) -> bool: text = page.get_text('text') has_identifier = bool(_SECTION_START_RE.search(text)) has_code = bool(_CODE_RE_B.search(text)) return has_identifier and has_code # --------------------------------------------------------------------------- # 단일 구간 블록 파싱 # --------------------------------------------------------------------------- def _parse_section_block(lines: list[str], area_name: str) -> CoastalSection | None: """식별자 코드명 ~ 다음 식별자 코드명 사이의 텍스트 블록을 파싱.""" section = CoastalSection() section.sect_nm = area_name # Phase: code → general → cleanup → end_criteria → consider phase = 'code' cleanup_items: list[str] = [] end_criteria_items: list[str] = [] consider_items: list[str] = [] current_target: list[str] | None = None i = 0 while i < len(lines): line = lines[i].strip() i += 1 if not line: continue # 코드 추출 if phase == 'code': m = _CODE_RE_B.search(line) if m: section.sect_cd = m.group(1) phase = 'general' else: # 구간 번호 nm = _NUMBER_RE.match(line) if nm: section.section_number = int(nm.group(1)) continue # 해안 형태/저질 특성 + ESI + 길이 if phase == 'general': # 형태 if '형태' in line and ':' in line: val = line.split(':', 1)[1].strip().split(':')[-1].strip() if val: section.shore_tp = val continue # 퇴적물 if '퇴적물' in line and ':' in line: val = line.split(':', 1)[1].strip().split(':')[-1].strip() if val: section.cst_tp_cd = val continue # ESI m = _ESI_RE.search(line) if m: section.esi_cd, section.esi_num = parse_esi_cd(m.group(1)) continue # 길이/폭 m = _LENGTH_RE.search(line) if m: try: section.len_m = float(m.group(1).replace(',', '')) except ValueError: pass width_str = m.group(2) if width_str != '-': # '2차선 도로' 등 비숫자 후속 방지 end_pos = m.end(2) after = line[end_pos:end_pos + 1] if end_pos < len(line) else '' if not after or after in (' ', '\t', '\n', ')', ''): try: section.width_m = float(width_str.replace(',', '')) except ValueError: pass continue # 접근성 — þ 마커들은 접근성 열에 해당 if 'þ' in line: continue # 섹션 전환 감지 normalized = line.replace(' ', '') if '권장방제방법' in normalized: phase = 'cleanup' current_target = cleanup_items continue if '접근성' in normalized or '차량' in normalized or '도로' in normalized or '도보' in normalized or '선박' in normalized: continue if '해안길이' in normalized or '대표' in normalized or '사진' in normalized: continue continue # 방제 방법 / 종료 기준 (두 컬럼이 같은 줄에 섞여 나옴) if phase == 'cleanup': normalized = line.replace(' ', '') if '방제종료기준' in normalized: # 헤더만 있는 줄 — 이후 불릿은 종료기준 current_target = end_criteria_items continue if '초기방제' in normalized and '고려사항' in normalized: phase = 'consider' current_target = consider_items continue if _is_bullet(line): text = _clean_bullet(line) if text: # 두 컬럼이 혼합된 경우 heuristic 분류 if _is_criteria(text): end_criteria_items.append(text) else: cleanup_items.append(text) continue # 고려사항 if phase == 'consider': if _is_bullet(line): text = _clean_bullet(line) if text: consider_items.append(text) elif consider_items and line and not _SECTION_START_RE.search(line): # 연속행 consider_items[-1] += ' ' + line continue section.cleanup_methods = cleanup_items section.end_criteria = end_criteria_items section.notes = consider_items if not section.sect_cd: return None return section def _is_criteria(text: str) -> bool: patterns = ['없어야', '않아야', '미만', '이하', '이상', '분포해야', '분포하면', '발생하지', '묻어나지', '유출되지', '관찰되는'] return any(p in text for p in patterns) # --------------------------------------------------------------------------- # 접근성 추출 # --------------------------------------------------------------------------- def _extract_accessibility(text_block: str) -> str | None: """블록 텍스트에서 접근성(차량/도보/선박) 추출.""" lines = text_block.split('\n') access_types = [] # 접근성 헤더 찾기 header_idx = -1 for idx, line in enumerate(lines): s = line.replace(' ', '') if '접근성' in s: header_idx = idx break if header_idx < 0: return None # 헤더 이후에서 차량/도로/도보/선박 라벨과 þ 위치 매칭 labels = [] for idx in range(header_idx, min(header_idx + 5, len(lines))): line = lines[idx] for label in ['차량', '도로', '도보', '선박']: if label in line.replace(' ', ''): labels.append('도로' if label == '도로' else label) # þ 마커 수 세기 check_count = 0 for idx in range(header_idx, min(header_idx + 8, len(lines))): check_count += lines[idx].count('þ') if labels and check_count > 0: # þ 개수만큼 앞에서부터 접근 가능 accessed = labels[:check_count] if check_count <= len(labels) else labels return ', '.join(accessed) + ' 접근 가능' return None # --------------------------------------------------------------------------- # 페이지 파싱 (여러 구간) # --------------------------------------------------------------------------- def parse_page_b(page: fitz.Page) -> list[CoastalSection]: """데이터 페이지에서 CoastalSection 목록 추출 (보통 2개).""" text = page.get_text('text') lines = text.split('\n') # 지역명 추출 (첫 줄 또는 헤더) area_name = '' for line in lines[:3]: stripped = line.strip() if stripped and '정보집' not in stripped and '∙' not in stripped: area_name = stripped break # 접근성 추출 (페이지 전체에서) accessibility = _extract_accessibility(text) # 구간 블록 분리: "식별자 코드명" 기준 block_starts: list[int] = [] for idx, line in enumerate(lines): if _SECTION_START_RE.search(line): # 구간 번호는 이전 줄에 있을 수 있음 start = idx if idx > 0 and _NUMBER_RE.match(lines[idx - 1].strip()): start = idx - 1 block_starts.append(start) if not block_starts: return [] sections: list[CoastalSection] = [] for i, start in enumerate(block_starts): end = block_starts[i + 1] if i + 1 < len(block_starts) else len(lines) block = lines[start:end] section = _parse_section_block(block, area_name) if section: if accessibility and not section.access_dc: section.access_dc = accessibility sections.append(section) # 접근성은 구간마다 다를 수 있음 — 각 블록에서 개별 추출 for i, start in enumerate(block_starts): end = block_starts[i + 1] if i + 1 < len(block_starts) else len(lines) block_text = '\n'.join(lines[start:end]) acc = _extract_accessibility(block_text) if acc and i < len(sections): sections[i].access_dc = acc return sections # --------------------------------------------------------------------------- # 전체 PDF 파싱 # --------------------------------------------------------------------------- def parse_pdf_b(pdf_path: str | Path) -> ParseResult: """Type B PDF 전체를 파싱하여 ParseResult 반환.""" pdf_path = Path(pdf_path) doc = fitz.open(str(pdf_path)) result = ParseResult(pdf_filename=pdf_path.name) # 관할/구역명 추출 (페이지 헤더/푸터에서) for i in range(min(40, doc.page_count)): text = doc[i].get_text('text') # "여수해경서 관할 해안 사전 평가 정보집" 패턴 m = re.search(r'(\S+해경서)\s*관할', text) if m and not result.jurisdiction: result.jurisdiction = m.group(1).strip() # "2. 하동군 해안 사전 평가 정보" 패턴 m = re.search(r'\d+\.\s*(.+?)\s*해안\s*사전\s*평가', text) if m and not result.zone_name: result.zone_name = m.group(1).strip() if result.jurisdiction and result.zone_name: break # 데이터 페이지 파싱 skipped = 0 for i in range(doc.page_count): page = doc[i] if not is_data_page_b(page): skipped += 1 continue sections = parse_page_b(page) result.sections.extend(sections) result.total_sections = len(result.sections) result.skipped_pages = skipped doc.close() return result # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- if __name__ == '__main__': import sys import json import io if sys.platform == 'win32': sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') if len(sys.argv) < 2: print('Usage: python pdf_parser_b.py ') sys.exit(1) r = parse_pdf_b(sys.argv[1]) print(json.dumps(r.model_dump(), ensure_ascii=False, indent=2))