wing-ops/prediction/scat/run.py
leedano d9fb4506bc feat(scat): Pre-SCAT 관할서 필터링 + 해안조사 데이터 파이프라인 구축
- 백엔드: 관할서 목록 API, zone 필터링 쿼리 추가
- 프론트: ScatLeftPanel 관할서 드롭다운, ScatMap/ScatPopup 개선
- 기상탭: WeatherRightPanel 리팩토링
- prediction/scat: PDF 파싱 → 지오코딩 → ESI 매핑 파이프라인
- vite.config: proxy 설정 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 10:53:19 +09:00

379 lines
13 KiB
Python

"""SCAT PDF 파싱 CLI 도구.
사용법:
python run.py <pdf_path> # 단일 PDF 파싱
python run.py <directory_path> # 배치 파싱
python run.py --load-json output/ --geocode # JSON에 좌표 추가
python run.py --load-json output/ --save # JSON → DB 저장
python run.py --load-json output/ --save --dry-run # DB 저장 미리보기
"""
from __future__ import annotations
import argparse
import io
import json
import re
import sys
from collections import defaultdict
from pathlib import Path
# Windows cp949 대응
if sys.platform == 'win32':
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
import fitz
from pdf_parser import parse_pdf
from pdf_parser_b import parse_pdf_b
from models import CoastalSection, SensitiveItem
OUTPUT_DIR = Path(__file__).parent / 'output'
# ---------------------------------------------------------------------------
# PDF 형식 감지
# ---------------------------------------------------------------------------
def detect_pdf_type(pdf_path: Path) -> str:
"""PDF 형식 감지. 'A'(해안사전평가정보집) 또는 'B'(방제정보집) 반환."""
doc = fitz.open(str(pdf_path))
for i in range(min(30, doc.page_count)):
text = doc[i].get_text('text')
if '식별자' in text and '코드명' in text:
doc.close()
return 'B'
doc.close()
return 'A'
# ---------------------------------------------------------------------------
# PDF 파싱
# ---------------------------------------------------------------------------
def process_pdf(pdf_path: Path) -> dict:
"""단일 PDF를 파싱하고 JSON 파일로 저장한다."""
pdf_type = detect_pdf_type(pdf_path)
if pdf_type == 'B':
result = parse_pdf_b(str(pdf_path))
else:
result = parse_pdf(str(pdf_path))
data = result.model_dump()
for s in data['sections']:
s.pop('photos', None)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
out_path = OUTPUT_DIR / f'{pdf_path.stem}.json'
with open(out_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return {
'file': pdf_path.name,
'output': str(out_path),
'zone_name': result.zone_name,
'jurisdiction': result.jurisdiction,
'total_sections': result.total_sections,
'skipped_pages': result.skipped_pages,
}
def run_parse(target: Path):
"""PDF 파싱 실행."""
if target.is_file() and target.suffix.lower() == '.pdf':
pdf_files = [target]
elif target.is_dir():
pdf_files = sorted(target.glob('*.pdf'))
if not pdf_files:
print(f'PDF 파일을 찾을 수 없습니다: {target}')
sys.exit(1)
print(f'{len(pdf_files)}개 PDF 발견\n')
else:
print(f'유효하지 않은 경로: {target}')
sys.exit(1)
results = []
for i, pdf in enumerate(pdf_files, 1):
pdf_type = detect_pdf_type(pdf)
print(f'[{i}/{len(pdf_files)}] {pdf.name} (Type {pdf_type}) 파싱 중...')
try:
info = process_pdf(pdf)
results.append(info)
print(f' -> {info["total_sections"]}개 구간 | {info["zone_name"]} | {info["jurisdiction"]}')
print(f' -> 저장: {info["output"]}')
except Exception as e:
print(f' -> 오류: {e}')
results.append({'file': pdf.name, 'error': str(e)})
if len(results) > 1:
print(f'\n=== 요약 ===')
success = [r for r in results if 'error' not in r]
failed = [r for r in results if 'error' in r]
total_sections = sum(r['total_sections'] for r in success)
print(f'성공: {len(success)}개 / 실패: {len(failed)}개 / 총 구간: {total_sections}')
if failed:
print(f'실패 파일: {", ".join(r["file"] for r in failed)}')
# ---------------------------------------------------------------------------
# JSON → DB 저장
# ---------------------------------------------------------------------------
def _extract_zone_cd(sect_cd: str) -> str:
"""sect_cd에서 zone_cd 추출 (영문 접두사).
Type A: SSDD-1 → SSDD (하이픈 앞 영문)
Type B: BSBB-1-M-E-S-N → BSBB (첫 하이픈 앞 영문)
"""
m = re.match(r'^([A-Z]{2,})', sect_cd)
return m.group(1) if m else sect_cd
def _extract_jrsd_short(jurisdiction: str) -> str:
"""관할 기관명에서 짧은 이름 추출. 예: '보령 해양경비안전서''보령'"""
if not jurisdiction:
return ''
return jurisdiction.split()[0] if ' ' in jurisdiction else jurisdiction
def _dict_to_section(d: dict) -> CoastalSection:
"""JSON dict → CoastalSection 모델 변환."""
sensitive = [SensitiveItem(**item) for item in (d.get('sensitive_info') or [])]
return CoastalSection(
section_number=d.get('section_number', 0),
sect_nm=d.get('sect_nm', ''),
sect_cd=d.get('sect_cd', ''),
esi_cd=d.get('esi_cd'),
esi_num=d.get('esi_num'),
shore_tp=d.get('shore_tp'),
cst_tp_cd=d.get('cst_tp_cd'),
len_m=d.get('len_m'),
width_m=d.get('width_m'),
lat=d.get('lat'),
lng=d.get('lng'),
access_dc=d.get('access_dc'),
access_pt=d.get('access_pt'),
sensitive_info=sensitive,
cleanup_methods=d.get('cleanup_methods', []),
end_criteria=d.get('end_criteria', []),
notes=d.get('notes', []),
)
def load_json_files(json_dir: Path) -> list[dict]:
"""JSON 디렉토리에서 모든 파싱 결과를 로드한다."""
all_data = []
for f in sorted(json_dir.glob('*.json')):
with open(f, encoding='utf-8') as fp:
data = json.load(fp)
if data.get('total_sections', 0) > 0:
all_data.append(data)
return all_data
def group_by_zone(all_data: list[dict]) -> dict:
"""파싱 결과를 zone_cd로 그룹핑한다.
Returns:
{zone_cd: {
'zone_nm': str,
'jrsd_nm': str,
'sections': [dict, ...]
}}
"""
zones = defaultdict(lambda: {'zone_nm': '', 'jrsd_nm': '', 'sections': []})
for data in all_data:
zone_name = data.get('zone_name', '')
jrsd_nm = _extract_jrsd_short(data.get('jurisdiction', ''))
for sect in data['sections']:
zone_cd = _extract_zone_cd(sect['sect_cd'])
zone = zones[zone_cd]
if not zone['zone_nm']:
zone['zone_nm'] = zone_name
if not zone['jrsd_nm']:
zone['jrsd_nm'] = jrsd_nm
zone['sections'].append(sect)
return dict(zones)
def run_save(json_dir: Path, dry_run: bool = False):
"""JSON 파싱 결과를 DB에 저장한다."""
all_data = load_json_files(json_dir)
if not all_data:
print(f'유효한 JSON 파일을 찾을 수 없습니다: {json_dir}')
sys.exit(1)
zones = group_by_zone(all_data)
total_sections = sum(len(z['sections']) for z in zones.values())
print(f'=== DB 저장 {"미리보기" if dry_run else "시작"} ===')
print(f'{len(zones)}개 zone, {total_sections}개 구간\n')
for zone_cd, zone_info in sorted(zones.items()):
sect_count = len(zone_info['sections'])
print(f' {zone_cd:8s} | {zone_info["zone_nm"]:20s} | {zone_info["jrsd_nm"]:8s} | {sect_count}개 구간')
if dry_run:
print(f'\n(dry-run 모드 — DB에 저장하지 않음)')
return
# 실제 DB 저장
from db import ensure_zone, upsert_section, update_zone_sect_count, update_zone_center, close_pool
saved_zones = 0
saved_sections = 0
try:
for zone_cd, zone_info in sorted(zones.items()):
zone_sn = ensure_zone(zone_cd, zone_info['zone_nm'], zone_info['jrsd_nm'])
saved_zones += 1
for sect_dict in zone_info['sections']:
section = _dict_to_section(sect_dict)
upsert_section(zone_sn, section)
saved_sections += 1
update_zone_sect_count(zone_sn)
update_zone_center(zone_sn)
print(f'\n=== 완료 ===')
print(f'{saved_zones}개 zone, {saved_sections}개 구간 저장 완료')
except Exception as e:
print(f'\n오류 발생: {e}')
print(f'저장 진행: {saved_zones}개 zone, {saved_sections}개 구간까지 완료')
raise
finally:
close_pool()
# ---------------------------------------------------------------------------
# Geocoding
# ---------------------------------------------------------------------------
def run_geocode(json_dir: Path):
"""JSON 파싱 결과에 좌표를 추가한다."""
from geocoder import geocode_sections, load_cache, save_cache
load_cache()
json_files = sorted(json_dir.glob('*.json'))
json_files = [f for f in json_files if not f.name.startswith('.')]
if not json_files:
print(f'JSON 파일을 찾을 수 없습니다: {json_dir}')
sys.exit(1)
print(f'=== Geocoding 시작 ({len(json_files)}개 JSON) ===\n')
total_success = 0
total_fail = 0
for i, f in enumerate(json_files, 1):
with open(f, encoding='utf-8') as fp:
data = json.load(fp)
sections = data.get('sections', [])
if not sections:
continue
zone_name = data.get('zone_name', '')
print(f'[{i}/{len(json_files)}] {f.name} ({len(sections)}개 구간)...')
success, fail = geocode_sections(sections, zone_name)
total_success += success
total_fail += fail
# 좌표가 있는 구간 수
with_coords = sum(1 for s in sections if s.get('lat'))
print(f' -> 좌표: {with_coords}/{len(sections)}')
# JSON 업데이트 저장
with open(f, 'w', encoding='utf-8') as fp:
json.dump(data, fp, ensure_ascii=False, indent=2)
save_cache()
print(f'\n=== Geocoding 완료 ===')
print(f'성공: {total_success} / 실패: {total_fail}')
# ---------------------------------------------------------------------------
# 이미지 추출
# ---------------------------------------------------------------------------
def run_extract_images(target: Path):
"""PDF에서 해안사진을 추출하여 scat-photos/에 저장."""
from image_extractor import extract_images_from_pdf
if target.is_file() and target.suffix.lower() == '.pdf':
pdf_files = [target]
elif target.is_dir():
pdf_files = sorted(target.glob('*.pdf'))
if not pdf_files:
print(f'PDF 파일을 찾을 수 없습니다: {target}')
sys.exit(1)
print(f'{len(pdf_files)}개 PDF 발견\n')
else:
print(f'유효하지 않은 경로: {target}')
sys.exit(1)
total = 0
for i, pdf in enumerate(pdf_files, 1):
pdf_type = detect_pdf_type(pdf)
print(f'[{i}/{len(pdf_files)}] {pdf.name} (Type {pdf_type}) 이미지 추출 중...')
try:
count = extract_images_from_pdf(pdf, pdf_type=pdf_type)
total += count
print(f' -> {count}개 이미지 저장')
except Exception as e:
print(f' -> 오류: {e}')
print(f'\n=== 이미지 추출 완료: 총 {total}개 ===')
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description='SCAT PDF 파싱 CLI 도구')
parser.add_argument('target', nargs='?', help='PDF 파일 또는 디렉토리 경로')
parser.add_argument('--save', action='store_true', help='파싱 결과를 DB에 저장')
parser.add_argument('--load-json', type=Path, help='이미 파싱된 JSON 디렉토리에서 로드')
parser.add_argument('--geocode', action='store_true', help='JSON에 Kakao Geocoding으로 좌표 추가')
parser.add_argument('--extract-images', action='store_true', help='PDF에서 해안사진 추출 → scat-photos/')
parser.add_argument('--dry-run', action='store_true', help='DB 저장 미리보기 (실제 저장 안 함)')
args = parser.parse_args()
# JSON 로드 모드
if args.load_json:
if args.geocode:
run_geocode(args.load_json)
if args.save:
run_save(args.load_json, dry_run=args.dry_run)
if not args.geocode and not args.save:
print('--load-json은 --geocode 또는 --save와 함께 사용해야 합니다.')
sys.exit(1)
return
# PDF 파싱 모드
if not args.target:
parser.print_help()
sys.exit(1)
target = Path(args.target)
# 이미지 추출 모드
if args.extract_images:
run_extract_images(target)
return
run_parse(target)
# 파싱 후 바로 DB 저장
if args.save:
print('\n')
run_save(OUTPUT_DIR, dry_run=args.dry_run)
if __name__ == '__main__':
main()