- 백엔드: 관할서 목록 API, zone 필터링 쿼리 추가 - 프론트: ScatLeftPanel 관할서 드롭다운, ScatMap/ScatPopup 개선 - 기상탭: WeatherRightPanel 리팩토링 - prediction/scat: PDF 파싱 → 지오코딩 → ESI 매핑 파이프라인 - vite.config: proxy 설정 추가 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
391 lines
14 KiB
Python
391 lines
14 KiB
Python
"""PDF 텍스트 파싱 — 상태머신 방식으로 해안사전평가 정보를 추출한다."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import re
|
||
from enum import Enum, auto
|
||
from pathlib import Path
|
||
from typing import List, Optional, Tuple, Union
|
||
|
||
import fitz # PyMuPDF
|
||
|
||
from models import CoastalSection, SensitiveItem, ParseResult
|
||
from esi_mapper import map_esi
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 상태머신 상태
|
||
# ---------------------------------------------------------------------------
|
||
class State(Enum):
|
||
HEADER = auto()
|
||
GENERAL = auto() # 해안 일반특성
|
||
ACCESS = auto() # 접근방법
|
||
SENSITIVE = auto() # 민감자원 정보
|
||
CLEANUP = auto() # 권장 방제 방법
|
||
END_CRITERIA = auto() # 권장 방제 중지 기준
|
||
CONSIDER = auto() # 해안 방제시 고려사항
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 섹션 시작 키워드 → 상태 매핑
|
||
# ---------------------------------------------------------------------------
|
||
_SECTION_KEYWORDS: list[tuple[str, State]] = [
|
||
('해안 일반특성', State.GENERAL),
|
||
('해안 일반 특성', State.GENERAL),
|
||
('접근방법', State.ACCESS),
|
||
('접근 방법', State.ACCESS),
|
||
('민감자원 정보', State.SENSITIVE),
|
||
('민감자원정보', State.SENSITIVE),
|
||
('권장 방제 방법', State.CLEANUP),
|
||
('권장방제방법', State.CLEANUP),
|
||
('권 장 방 제 방 법', State.CLEANUP),
|
||
('권장 방제 중지 기준', State.END_CRITERIA),
|
||
('권장방제중지기준', State.END_CRITERIA),
|
||
('권 장 방 제 중 지 기 준', State.END_CRITERIA),
|
||
('해안 방제시 고려사항', State.CONSIDER),
|
||
('해안 방제 시 고려사항', State.CONSIDER),
|
||
('해안방제시 고려사항', State.CONSIDER),
|
||
]
|
||
|
||
# 코드 패턴: SSDD-1, BRSM-12, DDIS-1 등
|
||
_CODE_RE = re.compile(r'\(([A-Z]{2,}-\d+)\)')
|
||
_NAME_CODE_RE = re.compile(r'(.+?)\s*\(([A-Z]{2,}-\d+)\)')
|
||
_LENGTH_RE = re.compile(r'약\s*([\d,]+\.?\d*)\s*m\s*임?')
|
||
_WIDTH_RE = re.compile(r'폭[은는]?\s*약\s*([\d,]+\.?\d*)\s*m')
|
||
_NUMBER_RE = re.compile(r'^\d+$')
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 유틸 함수
|
||
# ---------------------------------------------------------------------------
|
||
def _clean_bullet(line: str) -> str:
|
||
"""불릿 접두사(Ÿ, ·, •, -) 제거 후 strip."""
|
||
return line.lstrip('Ÿ \t·•- ').strip()
|
||
|
||
|
||
def _is_bullet(line: str) -> bool:
|
||
"""불릿으로 시작하는 줄인지 확인."""
|
||
stripped = line.strip()
|
||
return stripped.startswith('Ÿ') or stripped.startswith('·') or stripped.startswith('•')
|
||
|
||
|
||
def _is_sub_bullet(line: str) -> bool:
|
||
"""서브 불릿(- 접두사)으로 시작하는 줄인지 확인."""
|
||
stripped = line.strip()
|
||
return stripped.startswith('-') and len(stripped) > 1
|
||
|
||
|
||
def _is_end_criteria_item(text: str) -> bool:
|
||
"""방제 중지 기준 항목인지 판별 (조건문 패턴)."""
|
||
criteria_patterns = [
|
||
'없어야', '않아야', '미만', '이하', '이상',
|
||
'분포해야', '발생하지',
|
||
]
|
||
return any(p in text for p in criteria_patterns)
|
||
|
||
|
||
def _parse_measurement(text: str, pattern: re.Pattern) -> float | None:
|
||
"""정규식으로 수치 추출."""
|
||
m = pattern.search(text)
|
||
if m:
|
||
return float(m.group(1).replace(',', ''))
|
||
return None
|
||
|
||
|
||
def _detect_section_keyword(line: str) -> State | None:
|
||
"""줄이 섹션 시작 키워드를 포함하는지 확인."""
|
||
normalized = line.replace(' ', '')
|
||
for keyword, state in _SECTION_KEYWORDS:
|
||
if keyword.replace(' ', '') in normalized:
|
||
return state
|
||
return None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 데이터 페이지 판별
|
||
# ---------------------------------------------------------------------------
|
||
def is_data_page(page: fitz.Page) -> bool:
|
||
"""데이터 페이지인지 판별 — 코드 패턴 + 키워드 존재 여부."""
|
||
text = page.get_text('text')
|
||
has_code = bool(_CODE_RE.search(text))
|
||
has_keyword = '일반특성' in text or '접근방법' in text or '방제 방법' in text
|
||
return has_code and has_keyword
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 단일 페이지 파싱
|
||
# ---------------------------------------------------------------------------
|
||
def _merge_bullet_lines(raw_lines: list) -> list:
|
||
"""F-series 형식: 'Ÿ' 단독 줄 + 다음 줄 텍스트를 병합."""
|
||
merged = []
|
||
i = 0
|
||
while i < len(raw_lines):
|
||
line = raw_lines[i].strip()
|
||
if line == 'Ÿ' and i + 1 < len(raw_lines):
|
||
# 다음 줄과 병합
|
||
merged.append('Ÿ ' + raw_lines[i + 1].strip())
|
||
i += 2
|
||
elif line:
|
||
merged.append(line)
|
||
i += 1
|
||
else:
|
||
i += 1
|
||
return merged
|
||
|
||
|
||
def parse_page(page: fitz.Page) -> CoastalSection | None:
|
||
"""데이터 페이지에서 CoastalSection 추출."""
|
||
text = page.get_text('text')
|
||
raw_lines = text.split('\n')
|
||
lines = _merge_bullet_lines(raw_lines)
|
||
|
||
section = CoastalSection()
|
||
state = State.HEADER
|
||
|
||
# 현재 섹션에 수집 중인 불릿 항목들
|
||
current_bullets: list[str] = []
|
||
# 민감자원 서브 섹션 추적
|
||
sensitive_sub: str = ''
|
||
sensitive_items: list[SensitiveItem] = []
|
||
# 방제방법+중지기준 두 컬럼 병합 모드
|
||
cleanup_merged = False
|
||
|
||
def _flush_bullets():
|
||
"""현재 상태의 불릿을 section에 반영."""
|
||
nonlocal current_bullets, sensitive_sub
|
||
if state == State.GENERAL:
|
||
_parse_general(section, current_bullets)
|
||
elif state == State.ACCESS:
|
||
_parse_access(section, current_bullets)
|
||
elif state == State.SENSITIVE:
|
||
if sensitive_sub and current_bullets:
|
||
sensitive_items.append(SensitiveItem(
|
||
t=sensitive_sub,
|
||
v='\n'.join(current_bullets),
|
||
))
|
||
elif state == State.CLEANUP:
|
||
section.cleanup_methods = current_bullets[:]
|
||
elif state == State.END_CRITERIA:
|
||
# 병합 모드: 불릿을 방제방법/중지기준으로 분류
|
||
if cleanup_merged:
|
||
_split_cleanup_and_criteria(section, current_bullets)
|
||
else:
|
||
section.end_criteria = current_bullets[:]
|
||
elif state == State.CONSIDER:
|
||
section.notes = current_bullets[:]
|
||
current_bullets = []
|
||
|
||
for line in lines:
|
||
# 페이지 헤더/푸터 스킵
|
||
if '해양경비안전서' in line and ('관할' in line or '정보집' in line):
|
||
continue
|
||
if '해안사전평가 정보' in line and '∙' in line:
|
||
continue
|
||
if '해양경찰서' in line and ('관할' in line or '정보집' in line):
|
||
continue
|
||
|
||
# 섹션 전환 감지
|
||
new_state = _detect_section_keyword(line)
|
||
if new_state and new_state != state:
|
||
# 방제방법→중지기준 헤더가 연속 (두 컬럼 레이아웃)
|
||
if state == State.CLEANUP and new_state == State.END_CRITERIA and not current_bullets:
|
||
cleanup_merged = True
|
||
state = State.END_CRITERIA
|
||
continue
|
||
_flush_bullets()
|
||
state = new_state
|
||
sensitive_sub = ''
|
||
continue
|
||
|
||
# HEADER 상태: 번호 + 지역명/코드명 추출
|
||
if state == State.HEADER:
|
||
if _NUMBER_RE.match(line):
|
||
section.section_number = int(line)
|
||
continue
|
||
m = _NAME_CODE_RE.search(line)
|
||
if m:
|
||
section.sect_nm = m.group(1).strip()
|
||
section.sect_cd = m.group(2).strip()
|
||
continue
|
||
continue
|
||
|
||
# 민감자원: 서브 섹션 감지 (Ÿ 불릿 또는 일반 텍스트)
|
||
if state == State.SENSITIVE:
|
||
cleaned_for_check = _clean_bullet(line) if _is_bullet(line) else line
|
||
if '경제적' in cleaned_for_check and '자원' in cleaned_for_check:
|
||
if sensitive_sub and current_bullets:
|
||
sensitive_items.append(SensitiveItem(
|
||
t=sensitive_sub, v='\n'.join(current_bullets),
|
||
))
|
||
sensitive_sub = '사회경제적'
|
||
current_bullets = []
|
||
continue
|
||
if '생물자원' in cleaned_for_check:
|
||
if sensitive_sub and current_bullets:
|
||
sensitive_items.append(SensitiveItem(
|
||
t=sensitive_sub, v='\n'.join(current_bullets),
|
||
))
|
||
sensitive_sub = '생물자원'
|
||
current_bullets = []
|
||
continue
|
||
|
||
# 불릿 항목 수집
|
||
if _is_bullet(line):
|
||
current_bullets.append(_clean_bullet(line))
|
||
elif _is_sub_bullet(line):
|
||
# "-" 접두사 서브 항목 (민감자원 상세 등)
|
||
cleaned = line.strip().lstrip('-').strip()
|
||
if cleaned:
|
||
current_bullets.append(cleaned)
|
||
elif current_bullets and line and not _detect_section_keyword(line):
|
||
# 연속행 (불릿 없이 이어지는 텍스트)
|
||
cleaned = line.strip()
|
||
if cleaned:
|
||
current_bullets[-1] += ' ' + cleaned
|
||
|
||
# 마지막 섹션 flush
|
||
_flush_bullets()
|
||
section.sensitive_info = sensitive_items
|
||
|
||
if not section.sect_cd:
|
||
return None
|
||
|
||
# ESI 등급 매핑 (cst_tp_cd 기반)
|
||
if section.cst_tp_cd:
|
||
section.esi_cd, section.esi_num = map_esi(section.cst_tp_cd)
|
||
|
||
return section
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 섹션별 파싱 헬퍼
|
||
# ---------------------------------------------------------------------------
|
||
def _parse_general(section: CoastalSection, bullets: list[str]):
|
||
"""해안 일반특성 불릿에서 shore_tp, cst_tp_cd, len_m, width_m 추출."""
|
||
for b in bullets:
|
||
if '형태' in b:
|
||
if '폐쇄' in b:
|
||
section.shore_tp = '폐쇄형'
|
||
elif '개방' in b:
|
||
section.shore_tp = '개방형'
|
||
elif '반폐쇄' in b or '반 폐쇄' in b:
|
||
section.shore_tp = '반폐쇄형'
|
||
elif '이루어져' in b or '조성' in b or '으로 이루어' in b:
|
||
# 해안 구성 추출
|
||
section.cst_tp_cd = _extract_coastal_type(b)
|
||
length = _parse_measurement(b, _LENGTH_RE)
|
||
if length and not section.len_m:
|
||
section.len_m = length
|
||
width = _parse_measurement(b, _WIDTH_RE)
|
||
if width:
|
||
section.width_m = width
|
||
|
||
|
||
def _extract_coastal_type(text: str) -> str:
|
||
"""해안 구성 유형 추출."""
|
||
types = [
|
||
'투과성 인공호안', '비투과성 인공호안', '인공호안',
|
||
'모래', '세립질 모래', '굵은 모래',
|
||
'자갈', '수직암반', '수평암반',
|
||
'갯벌', '습지', '사석',
|
||
'콘크리트', '테트라포드',
|
||
]
|
||
for t in types:
|
||
if t in text:
|
||
return t
|
||
# fallback: "해안은 XXX으로 이루어져" 패턴
|
||
m = re.search(r'해안은\s+(.+?)(?:으로|로)\s*이루어져', text)
|
||
if m:
|
||
return m.group(1).strip()
|
||
return text
|
||
|
||
|
||
def _split_cleanup_and_criteria(section: CoastalSection, bullets: list[str]):
|
||
"""두 컬럼이 병합된 불릿을 방제방법/중지기준으로 분류."""
|
||
cleanup = []
|
||
criteria = []
|
||
for b in bullets:
|
||
if _is_end_criteria_item(b):
|
||
criteria.append(b)
|
||
else:
|
||
cleanup.append(b)
|
||
section.cleanup_methods = cleanup
|
||
section.end_criteria = criteria
|
||
|
||
|
||
def _parse_access(section: CoastalSection, bullets: list[str]):
|
||
"""접근방법 불릿에서 access_dc, access_pt 추출."""
|
||
access_parts = []
|
||
for b in bullets:
|
||
if '주요접근지점' in b or '주요 접근지점' in b or '주요접근 지점' in b:
|
||
# "주요접근지점 : 부사방조제" 패턴
|
||
parts = re.split(r'[::]', b, maxsplit=1)
|
||
if len(parts) > 1:
|
||
section.access_pt = parts[1].strip()
|
||
else:
|
||
section.access_pt = b.replace('주요접근지점', '').strip()
|
||
else:
|
||
access_parts.append(b)
|
||
if access_parts:
|
||
section.access_dc = ' / '.join(access_parts)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 전체 PDF 파싱
|
||
# ---------------------------------------------------------------------------
|
||
def parse_pdf(pdf_path: str | Path) -> ParseResult:
|
||
"""PDF 전체를 파싱하여 ParseResult 반환."""
|
||
pdf_path = Path(pdf_path)
|
||
doc = fitz.open(str(pdf_path))
|
||
|
||
result = ParseResult(
|
||
pdf_filename=pdf_path.name,
|
||
)
|
||
|
||
# 관할/구역명 추출 시도 (첫 30페이지 탐색)
|
||
for i in range(min(35, doc.page_count)):
|
||
text = doc[i].get_text('text')
|
||
if '관할' in text and '해안' in text:
|
||
# "보령 해양경비안전서 관할" 패턴
|
||
m = re.search(r'(\S+\s*해양경[비찰]\S*)\s*관할', text)
|
||
if m and not result.jurisdiction:
|
||
result.jurisdiction = m.group(1).strip()
|
||
# 구역명: "X. 충남 서천군 해안 사전평가 정보" 패턴
|
||
m = re.search(r'\d+\.\s*(.+?)\s*해안\s*사전평가', text)
|
||
if m and not result.zone_name:
|
||
result.zone_name = m.group(1).strip()
|
||
if result.jurisdiction and result.zone_name:
|
||
break
|
||
|
||
# 데이터 페이지 파싱
|
||
skipped = 0
|
||
for i in range(doc.page_count):
|
||
page = doc[i]
|
||
if not is_data_page(page):
|
||
skipped += 1
|
||
continue
|
||
section = parse_page(page)
|
||
if section:
|
||
result.sections.append(section)
|
||
|
||
result.total_sections = len(result.sections)
|
||
result.skipped_pages = skipped
|
||
doc.close()
|
||
|
||
return result
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# CLI 실행
|
||
# ---------------------------------------------------------------------------
|
||
if __name__ == '__main__':
|
||
import sys
|
||
import json
|
||
|
||
if len(sys.argv) < 2:
|
||
print('Usage: python parser.py <pdf_path>')
|
||
sys.exit(1)
|
||
|
||
r = parse_pdf(sys.argv[1])
|
||
print(json.dumps(r.model_dump(), ensure_ascii=False, indent=2))
|