wing-ops/prediction/scat/pdf_parser.py
leedano d9fb4506bc feat(scat): Pre-SCAT 관할서 필터링 + 해안조사 데이터 파이프라인 구축
- 백엔드: 관할서 목록 API, zone 필터링 쿼리 추가
- 프론트: ScatLeftPanel 관할서 드롭다운, ScatMap/ScatPopup 개선
- 기상탭: WeatherRightPanel 리팩토링
- prediction/scat: PDF 파싱 → 지오코딩 → ESI 매핑 파이프라인
- vite.config: proxy 설정 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 10:53:19 +09:00

391 lines
14 KiB
Python
Raw Blame 히스토리

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""PDF 텍스트 파싱 — 상태머신 방식으로 해안사전평가 정보를 추출한다."""
from __future__ import annotations
import re
from enum import Enum, auto
from pathlib import Path
from typing import List, Optional, Tuple, Union
import fitz # PyMuPDF
from models import CoastalSection, SensitiveItem, ParseResult
from esi_mapper import map_esi
# ---------------------------------------------------------------------------
# 상태머신 상태
# ---------------------------------------------------------------------------
class State(Enum):
HEADER = auto()
GENERAL = auto() # 해안 일반특성
ACCESS = auto() # 접근방법
SENSITIVE = auto() # 민감자원 정보
CLEANUP = auto() # 권장 방제 방법
END_CRITERIA = auto() # 권장 방제 중지 기준
CONSIDER = auto() # 해안 방제시 고려사항
# ---------------------------------------------------------------------------
# 섹션 시작 키워드 → 상태 매핑
# ---------------------------------------------------------------------------
_SECTION_KEYWORDS: list[tuple[str, State]] = [
('해안 일반특성', State.GENERAL),
('해안 일반 특성', State.GENERAL),
('접근방법', State.ACCESS),
('접근 방법', State.ACCESS),
('민감자원 정보', State.SENSITIVE),
('민감자원정보', State.SENSITIVE),
('권장 방제 방법', State.CLEANUP),
('권장방제방법', State.CLEANUP),
('권 장 방 제 방 법', State.CLEANUP),
('권장 방제 중지 기준', State.END_CRITERIA),
('권장방제중지기준', State.END_CRITERIA),
('권 장 방 제 중 지 기 준', State.END_CRITERIA),
('해안 방제시 고려사항', State.CONSIDER),
('해안 방제 시 고려사항', State.CONSIDER),
('해안방제시 고려사항', State.CONSIDER),
]
# 코드 패턴: SSDD-1, BRSM-12, DDIS-1 등
_CODE_RE = re.compile(r'\(([A-Z]{2,}-\d+)\)')
_NAME_CODE_RE = re.compile(r'(.+?)\s*\(([A-Z]{2,}-\d+)\)')
_LENGTH_RE = re.compile(r'\s*([\d,]+\.?\d*)\s*m\s*임?')
_WIDTH_RE = re.compile(r'폭[은는]?\s*약\s*([\d,]+\.?\d*)\s*m')
_NUMBER_RE = re.compile(r'^\d+$')
# ---------------------------------------------------------------------------
# 유틸 함수
# ---------------------------------------------------------------------------
def _clean_bullet(line: str) -> str:
"""불릿 접두사(Ÿ, ·, •, -) 제거 후 strip."""
return line.lstrip('Ÿ \t·•- ').strip()
def _is_bullet(line: str) -> bool:
"""불릿으로 시작하는 줄인지 확인."""
stripped = line.strip()
return stripped.startswith('Ÿ') or stripped.startswith('·') or stripped.startswith('')
def _is_sub_bullet(line: str) -> bool:
"""서브 불릿(- 접두사)으로 시작하는 줄인지 확인."""
stripped = line.strip()
return stripped.startswith('-') and len(stripped) > 1
def _is_end_criteria_item(text: str) -> bool:
"""방제 중지 기준 항목인지 판별 (조건문 패턴)."""
criteria_patterns = [
'없어야', '않아야', '미만', '이하', '이상',
'분포해야', '발생하지',
]
return any(p in text for p in criteria_patterns)
def _parse_measurement(text: str, pattern: re.Pattern) -> float | None:
"""정규식으로 수치 추출."""
m = pattern.search(text)
if m:
return float(m.group(1).replace(',', ''))
return None
def _detect_section_keyword(line: str) -> State | None:
"""줄이 섹션 시작 키워드를 포함하는지 확인."""
normalized = line.replace(' ', '')
for keyword, state in _SECTION_KEYWORDS:
if keyword.replace(' ', '') in normalized:
return state
return None
# ---------------------------------------------------------------------------
# 데이터 페이지 판별
# ---------------------------------------------------------------------------
def is_data_page(page: fitz.Page) -> bool:
"""데이터 페이지인지 판별 — 코드 패턴 + 키워드 존재 여부."""
text = page.get_text('text')
has_code = bool(_CODE_RE.search(text))
has_keyword = '일반특성' in text or '접근방법' in text or '방제 방법' in text
return has_code and has_keyword
# ---------------------------------------------------------------------------
# 단일 페이지 파싱
# ---------------------------------------------------------------------------
def _merge_bullet_lines(raw_lines: list) -> list:
"""F-series 형식: 'Ÿ' 단독 줄 + 다음 줄 텍스트를 병합."""
merged = []
i = 0
while i < len(raw_lines):
line = raw_lines[i].strip()
if line == 'Ÿ' and i + 1 < len(raw_lines):
# 다음 줄과 병합
merged.append('Ÿ ' + raw_lines[i + 1].strip())
i += 2
elif line:
merged.append(line)
i += 1
else:
i += 1
return merged
def parse_page(page: fitz.Page) -> CoastalSection | None:
"""데이터 페이지에서 CoastalSection 추출."""
text = page.get_text('text')
raw_lines = text.split('\n')
lines = _merge_bullet_lines(raw_lines)
section = CoastalSection()
state = State.HEADER
# 현재 섹션에 수집 중인 불릿 항목들
current_bullets: list[str] = []
# 민감자원 서브 섹션 추적
sensitive_sub: str = ''
sensitive_items: list[SensitiveItem] = []
# 방제방법+중지기준 두 컬럼 병합 모드
cleanup_merged = False
def _flush_bullets():
"""현재 상태의 불릿을 section에 반영."""
nonlocal current_bullets, sensitive_sub
if state == State.GENERAL:
_parse_general(section, current_bullets)
elif state == State.ACCESS:
_parse_access(section, current_bullets)
elif state == State.SENSITIVE:
if sensitive_sub and current_bullets:
sensitive_items.append(SensitiveItem(
t=sensitive_sub,
v='\n'.join(current_bullets),
))
elif state == State.CLEANUP:
section.cleanup_methods = current_bullets[:]
elif state == State.END_CRITERIA:
# 병합 모드: 불릿을 방제방법/중지기준으로 분류
if cleanup_merged:
_split_cleanup_and_criteria(section, current_bullets)
else:
section.end_criteria = current_bullets[:]
elif state == State.CONSIDER:
section.notes = current_bullets[:]
current_bullets = []
for line in lines:
# 페이지 헤더/푸터 스킵
if '해양경비안전서' in line and ('관할' in line or '정보집' in line):
continue
if '해안사전평가 정보' in line and '' in line:
continue
if '해양경찰서' in line and ('관할' in line or '정보집' in line):
continue
# 섹션 전환 감지
new_state = _detect_section_keyword(line)
if new_state and new_state != state:
# 방제방법→중지기준 헤더가 연속 (두 컬럼 레이아웃)
if state == State.CLEANUP and new_state == State.END_CRITERIA and not current_bullets:
cleanup_merged = True
state = State.END_CRITERIA
continue
_flush_bullets()
state = new_state
sensitive_sub = ''
continue
# HEADER 상태: 번호 + 지역명/코드명 추출
if state == State.HEADER:
if _NUMBER_RE.match(line):
section.section_number = int(line)
continue
m = _NAME_CODE_RE.search(line)
if m:
section.sect_nm = m.group(1).strip()
section.sect_cd = m.group(2).strip()
continue
continue
# 민감자원: 서브 섹션 감지 (Ÿ 불릿 또는 일반 텍스트)
if state == State.SENSITIVE:
cleaned_for_check = _clean_bullet(line) if _is_bullet(line) else line
if '경제적' in cleaned_for_check and '자원' in cleaned_for_check:
if sensitive_sub and current_bullets:
sensitive_items.append(SensitiveItem(
t=sensitive_sub, v='\n'.join(current_bullets),
))
sensitive_sub = '사회경제적'
current_bullets = []
continue
if '생물자원' in cleaned_for_check:
if sensitive_sub and current_bullets:
sensitive_items.append(SensitiveItem(
t=sensitive_sub, v='\n'.join(current_bullets),
))
sensitive_sub = '생물자원'
current_bullets = []
continue
# 불릿 항목 수집
if _is_bullet(line):
current_bullets.append(_clean_bullet(line))
elif _is_sub_bullet(line):
# "-" 접두사 서브 항목 (민감자원 상세 등)
cleaned = line.strip().lstrip('-').strip()
if cleaned:
current_bullets.append(cleaned)
elif current_bullets and line and not _detect_section_keyword(line):
# 연속행 (불릿 없이 이어지는 텍스트)
cleaned = line.strip()
if cleaned:
current_bullets[-1] += ' ' + cleaned
# 마지막 섹션 flush
_flush_bullets()
section.sensitive_info = sensitive_items
if not section.sect_cd:
return None
# ESI 등급 매핑 (cst_tp_cd 기반)
if section.cst_tp_cd:
section.esi_cd, section.esi_num = map_esi(section.cst_tp_cd)
return section
# ---------------------------------------------------------------------------
# 섹션별 파싱 헬퍼
# ---------------------------------------------------------------------------
def _parse_general(section: CoastalSection, bullets: list[str]):
"""해안 일반특성 불릿에서 shore_tp, cst_tp_cd, len_m, width_m 추출."""
for b in bullets:
if '형태' in b:
if '폐쇄' in b:
section.shore_tp = '폐쇄형'
elif '개방' in b:
section.shore_tp = '개방형'
elif '반폐쇄' in b or '반 폐쇄' in b:
section.shore_tp = '반폐쇄형'
elif '이루어져' in b or '조성' in b or '으로 이루어' in b:
# 해안 구성 추출
section.cst_tp_cd = _extract_coastal_type(b)
length = _parse_measurement(b, _LENGTH_RE)
if length and not section.len_m:
section.len_m = length
width = _parse_measurement(b, _WIDTH_RE)
if width:
section.width_m = width
def _extract_coastal_type(text: str) -> str:
"""해안 구성 유형 추출."""
types = [
'투과성 인공호안', '비투과성 인공호안', '인공호안',
'모래', '세립질 모래', '굵은 모래',
'자갈', '수직암반', '수평암반',
'갯벌', '습지', '사석',
'콘크리트', '테트라포드',
]
for t in types:
if t in text:
return t
# fallback: "해안은 XXX으로 이루어져" 패턴
m = re.search(r'해안은\s+(.+?)(?:으로|로)\s*이루어져', text)
if m:
return m.group(1).strip()
return text
def _split_cleanup_and_criteria(section: CoastalSection, bullets: list[str]):
"""두 컬럼이 병합된 불릿을 방제방법/중지기준으로 분류."""
cleanup = []
criteria = []
for b in bullets:
if _is_end_criteria_item(b):
criteria.append(b)
else:
cleanup.append(b)
section.cleanup_methods = cleanup
section.end_criteria = criteria
def _parse_access(section: CoastalSection, bullets: list[str]):
"""접근방법 불릿에서 access_dc, access_pt 추출."""
access_parts = []
for b in bullets:
if '주요접근지점' in b or '주요 접근지점' in b or '주요접근 지점' in b:
# "주요접근지점 : 부사방조제" 패턴
parts = re.split(r'[:]', b, maxsplit=1)
if len(parts) > 1:
section.access_pt = parts[1].strip()
else:
section.access_pt = b.replace('주요접근지점', '').strip()
else:
access_parts.append(b)
if access_parts:
section.access_dc = ' / '.join(access_parts)
# ---------------------------------------------------------------------------
# 전체 PDF 파싱
# ---------------------------------------------------------------------------
def parse_pdf(pdf_path: str | Path) -> ParseResult:
"""PDF 전체를 파싱하여 ParseResult 반환."""
pdf_path = Path(pdf_path)
doc = fitz.open(str(pdf_path))
result = ParseResult(
pdf_filename=pdf_path.name,
)
# 관할/구역명 추출 시도 (첫 30페이지 탐색)
for i in range(min(35, doc.page_count)):
text = doc[i].get_text('text')
if '관할' in text and '해안' in text:
# "보령 해양경비안전서 관할" 패턴
m = re.search(r'(\S+\s*해양경[비찰]\S*)\s*관할', text)
if m and not result.jurisdiction:
result.jurisdiction = m.group(1).strip()
# 구역명: "X. 충남 서천군 해안 사전평가 정보" 패턴
m = re.search(r'\d+\.\s*(.+?)\s*해안\s*사전평가', text)
if m and not result.zone_name:
result.zone_name = m.group(1).strip()
if result.jurisdiction and result.zone_name:
break
# 데이터 페이지 파싱
skipped = 0
for i in range(doc.page_count):
page = doc[i]
if not is_data_page(page):
skipped += 1
continue
section = parse_page(page)
if section:
result.sections.append(section)
result.total_sections = len(result.sections)
result.skipped_pages = skipped
doc.close()
return result
# ---------------------------------------------------------------------------
# CLI 실행
# ---------------------------------------------------------------------------
if __name__ == '__main__':
import sys
import json
if len(sys.argv) < 2:
print('Usage: python parser.py <pdf_path>')
sys.exit(1)
r = parse_pdf(sys.argv[1])
print(json.dumps(r.model_dump(), ensure_ascii=False, indent=2))