"""PDF 텍스트 파싱 — 상태머신 방식으로 해안사전평가 정보를 추출한다.""" from __future__ import annotations import re from enum import Enum, auto from pathlib import Path from typing import List, Optional, Tuple, Union import fitz # PyMuPDF from models import CoastalSection, SensitiveItem, ParseResult from esi_mapper import map_esi # --------------------------------------------------------------------------- # 상태머신 상태 # --------------------------------------------------------------------------- class State(Enum): HEADER = auto() GENERAL = auto() # 해안 일반특성 ACCESS = auto() # 접근방법 SENSITIVE = auto() # 민감자원 정보 CLEANUP = auto() # 권장 방제 방법 END_CRITERIA = auto() # 권장 방제 중지 기준 CONSIDER = auto() # 해안 방제시 고려사항 # --------------------------------------------------------------------------- # 섹션 시작 키워드 → 상태 매핑 # --------------------------------------------------------------------------- _SECTION_KEYWORDS: list[tuple[str, State]] = [ ('해안 일반특성', State.GENERAL), ('해안 일반 특성', State.GENERAL), ('접근방법', State.ACCESS), ('접근 방법', State.ACCESS), ('민감자원 정보', State.SENSITIVE), ('민감자원정보', State.SENSITIVE), ('권장 방제 방법', State.CLEANUP), ('권장방제방법', State.CLEANUP), ('권 장 방 제 방 법', State.CLEANUP), ('권장 방제 중지 기준', State.END_CRITERIA), ('권장방제중지기준', State.END_CRITERIA), ('권 장 방 제 중 지 기 준', State.END_CRITERIA), ('해안 방제시 고려사항', State.CONSIDER), ('해안 방제 시 고려사항', State.CONSIDER), ('해안방제시 고려사항', State.CONSIDER), ] # 코드 패턴: SSDD-1, BRSM-12, DDIS-1 등 _CODE_RE = re.compile(r'\(([A-Z]{2,}-\d+)\)') _NAME_CODE_RE = re.compile(r'(.+?)\s*\(([A-Z]{2,}-\d+)\)') _LENGTH_RE = re.compile(r'약\s*([\d,]+\.?\d*)\s*m\s*임?') _WIDTH_RE = re.compile(r'폭[은는]?\s*약\s*([\d,]+\.?\d*)\s*m') _NUMBER_RE = re.compile(r'^\d+$') # --------------------------------------------------------------------------- # 유틸 함수 # --------------------------------------------------------------------------- def _clean_bullet(line: str) -> str: """불릿 접두사(Ÿ, ·, •, -) 제거 후 strip.""" return line.lstrip('Ÿ \t·•- ').strip() def _is_bullet(line: str) -> bool: """불릿으로 시작하는 줄인지 확인.""" stripped = line.strip() return stripped.startswith('Ÿ') or stripped.startswith('·') or stripped.startswith('•') def _is_sub_bullet(line: str) -> bool: """서브 불릿(- 접두사)으로 시작하는 줄인지 확인.""" stripped = line.strip() return stripped.startswith('-') and len(stripped) > 1 def _is_end_criteria_item(text: str) -> bool: """방제 중지 기준 항목인지 판별 (조건문 패턴).""" criteria_patterns = [ '없어야', '않아야', '미만', '이하', '이상', '분포해야', '발생하지', ] return any(p in text for p in criteria_patterns) def _parse_measurement(text: str, pattern: re.Pattern) -> float | None: """정규식으로 수치 추출.""" m = pattern.search(text) if m: return float(m.group(1).replace(',', '')) return None def _detect_section_keyword(line: str) -> State | None: """줄이 섹션 시작 키워드를 포함하는지 확인.""" normalized = line.replace(' ', '') for keyword, state in _SECTION_KEYWORDS: if keyword.replace(' ', '') in normalized: return state return None # --------------------------------------------------------------------------- # 데이터 페이지 판별 # --------------------------------------------------------------------------- def is_data_page(page: fitz.Page) -> bool: """데이터 페이지인지 판별 — 코드 패턴 + 키워드 존재 여부.""" text = page.get_text('text') has_code = bool(_CODE_RE.search(text)) has_keyword = '일반특성' in text or '접근방법' in text or '방제 방법' in text return has_code and has_keyword # --------------------------------------------------------------------------- # 단일 페이지 파싱 # --------------------------------------------------------------------------- def _merge_bullet_lines(raw_lines: list) -> list: """F-series 형식: 'Ÿ' 단독 줄 + 다음 줄 텍스트를 병합.""" merged = [] i = 0 while i < len(raw_lines): line = raw_lines[i].strip() if line == 'Ÿ' and i + 1 < len(raw_lines): # 다음 줄과 병합 merged.append('Ÿ ' + raw_lines[i + 1].strip()) i += 2 elif line: merged.append(line) i += 1 else: i += 1 return merged def parse_page(page: fitz.Page) -> CoastalSection | None: """데이터 페이지에서 CoastalSection 추출.""" text = page.get_text('text') raw_lines = text.split('\n') lines = _merge_bullet_lines(raw_lines) section = CoastalSection() state = State.HEADER # 현재 섹션에 수집 중인 불릿 항목들 current_bullets: list[str] = [] # 민감자원 서브 섹션 추적 sensitive_sub: str = '' sensitive_items: list[SensitiveItem] = [] # 방제방법+중지기준 두 컬럼 병합 모드 cleanup_merged = False def _flush_bullets(): """현재 상태의 불릿을 section에 반영.""" nonlocal current_bullets, sensitive_sub if state == State.GENERAL: _parse_general(section, current_bullets) elif state == State.ACCESS: _parse_access(section, current_bullets) elif state == State.SENSITIVE: if sensitive_sub and current_bullets: sensitive_items.append(SensitiveItem( t=sensitive_sub, v='\n'.join(current_bullets), )) elif state == State.CLEANUP: section.cleanup_methods = current_bullets[:] elif state == State.END_CRITERIA: # 병합 모드: 불릿을 방제방법/중지기준으로 분류 if cleanup_merged: _split_cleanup_and_criteria(section, current_bullets) else: section.end_criteria = current_bullets[:] elif state == State.CONSIDER: section.notes = current_bullets[:] current_bullets = [] for line in lines: # 페이지 헤더/푸터 스킵 if '해양경비안전서' in line and ('관할' in line or '정보집' in line): continue if '해안사전평가 정보' in line and '∙' in line: continue if '해양경찰서' in line and ('관할' in line or '정보집' in line): continue # 섹션 전환 감지 new_state = _detect_section_keyword(line) if new_state and new_state != state: # 방제방법→중지기준 헤더가 연속 (두 컬럼 레이아웃) if state == State.CLEANUP and new_state == State.END_CRITERIA and not current_bullets: cleanup_merged = True state = State.END_CRITERIA continue _flush_bullets() state = new_state sensitive_sub = '' continue # HEADER 상태: 번호 + 지역명/코드명 추출 if state == State.HEADER: if _NUMBER_RE.match(line): section.section_number = int(line) continue m = _NAME_CODE_RE.search(line) if m: section.sect_nm = m.group(1).strip() section.sect_cd = m.group(2).strip() continue continue # 민감자원: 서브 섹션 감지 (Ÿ 불릿 또는 일반 텍스트) if state == State.SENSITIVE: cleaned_for_check = _clean_bullet(line) if _is_bullet(line) else line if '경제적' in cleaned_for_check and '자원' in cleaned_for_check: if sensitive_sub and current_bullets: sensitive_items.append(SensitiveItem( t=sensitive_sub, v='\n'.join(current_bullets), )) sensitive_sub = '사회경제적' current_bullets = [] continue if '생물자원' in cleaned_for_check: if sensitive_sub and current_bullets: sensitive_items.append(SensitiveItem( t=sensitive_sub, v='\n'.join(current_bullets), )) sensitive_sub = '생물자원' current_bullets = [] continue # 불릿 항목 수집 if _is_bullet(line): current_bullets.append(_clean_bullet(line)) elif _is_sub_bullet(line): # "-" 접두사 서브 항목 (민감자원 상세 등) cleaned = line.strip().lstrip('-').strip() if cleaned: current_bullets.append(cleaned) elif current_bullets and line and not _detect_section_keyword(line): # 연속행 (불릿 없이 이어지는 텍스트) cleaned = line.strip() if cleaned: current_bullets[-1] += ' ' + cleaned # 마지막 섹션 flush _flush_bullets() section.sensitive_info = sensitive_items if not section.sect_cd: return None # ESI 등급 매핑 (cst_tp_cd 기반) if section.cst_tp_cd: section.esi_cd, section.esi_num = map_esi(section.cst_tp_cd) return section # --------------------------------------------------------------------------- # 섹션별 파싱 헬퍼 # --------------------------------------------------------------------------- def _parse_general(section: CoastalSection, bullets: list[str]): """해안 일반특성 불릿에서 shore_tp, cst_tp_cd, len_m, width_m 추출.""" for b in bullets: if '형태' in b: if '폐쇄' in b: section.shore_tp = '폐쇄형' elif '개방' in b: section.shore_tp = '개방형' elif '반폐쇄' in b or '반 폐쇄' in b: section.shore_tp = '반폐쇄형' elif '이루어져' in b or '조성' in b or '으로 이루어' in b: # 해안 구성 추출 section.cst_tp_cd = _extract_coastal_type(b) length = _parse_measurement(b, _LENGTH_RE) if length and not section.len_m: section.len_m = length width = _parse_measurement(b, _WIDTH_RE) if width: section.width_m = width def _extract_coastal_type(text: str) -> str: """해안 구성 유형 추출.""" types = [ '투과성 인공호안', '비투과성 인공호안', '인공호안', '모래', '세립질 모래', '굵은 모래', '자갈', '수직암반', '수평암반', '갯벌', '습지', '사석', '콘크리트', '테트라포드', ] for t in types: if t in text: return t # fallback: "해안은 XXX으로 이루어져" 패턴 m = re.search(r'해안은\s+(.+?)(?:으로|로)\s*이루어져', text) if m: return m.group(1).strip() return text def _split_cleanup_and_criteria(section: CoastalSection, bullets: list[str]): """두 컬럼이 병합된 불릿을 방제방법/중지기준으로 분류.""" cleanup = [] criteria = [] for b in bullets: if _is_end_criteria_item(b): criteria.append(b) else: cleanup.append(b) section.cleanup_methods = cleanup section.end_criteria = criteria def _parse_access(section: CoastalSection, bullets: list[str]): """접근방법 불릿에서 access_dc, access_pt 추출.""" access_parts = [] for b in bullets: if '주요접근지점' in b or '주요 접근지점' in b or '주요접근 지점' in b: # "주요접근지점 : 부사방조제" 패턴 parts = re.split(r'[::]', b, maxsplit=1) if len(parts) > 1: section.access_pt = parts[1].strip() else: section.access_pt = b.replace('주요접근지점', '').strip() else: access_parts.append(b) if access_parts: section.access_dc = ' / '.join(access_parts) # --------------------------------------------------------------------------- # 전체 PDF 파싱 # --------------------------------------------------------------------------- def parse_pdf(pdf_path: str | Path) -> ParseResult: """PDF 전체를 파싱하여 ParseResult 반환.""" pdf_path = Path(pdf_path) doc = fitz.open(str(pdf_path)) result = ParseResult( pdf_filename=pdf_path.name, ) # 관할/구역명 추출 시도 (첫 30페이지 탐색) for i in range(min(35, doc.page_count)): text = doc[i].get_text('text') if '관할' in text and '해안' in text: # "보령 해양경비안전서 관할" 패턴 m = re.search(r'(\S+\s*해양경[비찰]\S*)\s*관할', text) if m and not result.jurisdiction: result.jurisdiction = m.group(1).strip() # 구역명: "X. 충남 서천군 해안 사전평가 정보" 패턴 m = re.search(r'\d+\.\s*(.+?)\s*해안\s*사전평가', text) if m and not result.zone_name: result.zone_name = m.group(1).strip() if result.jurisdiction and result.zone_name: break # 데이터 페이지 파싱 skipped = 0 for i in range(doc.page_count): page = doc[i] if not is_data_page(page): skipped += 1 continue section = parse_page(page) if section: result.sections.append(section) result.total_sections = len(result.sections) result.skipped_pages = skipped doc.close() return result # --------------------------------------------------------------------------- # CLI 실행 # --------------------------------------------------------------------------- if __name__ == '__main__': import sys import json if len(sys.argv) < 2: print('Usage: python parser.py ') sys.exit(1) r = parse_pdf(sys.argv[1]) print(json.dumps(r.model_dump(), ensure_ascii=False, indent=2))