wing-ops/backend/scripts/hns-import/ocr-local.py

325 lines
12 KiB
Python
Raw Blame 히스토리

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""로컬 EasyOCR 기반 HNS 카드 이미지 파싱.
전용 venv(.venv)에 설치된 easyocr을 사용한다.
1. 이미지 → EasyOCR → (bbox, text, conf) 리스트
2. y좌표로 행 그룹화 후 각 행 내 x좌표 정렬
3. 레이블 키워드 기반 필드 매핑 (정규식)
4. 결과를 out/ocr.json 에 누적 저장 (재실행 가능)
실행:
cd backend/scripts/hns-import
source .venv/Scripts/activate # Windows Git Bash
python ocr-local.py [--limit N] [--only 벤젠,톨루엔,...]
"""
from __future__ import annotations
import argparse
import io
import json
import os
import re
import sys
from pathlib import Path
from typing import Any
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
SCRIPT_DIR = Path(__file__).parent.resolve()
OUT_DIR = SCRIPT_DIR / 'out'
IMG_DIR = OUT_DIR / 'images'
OCR_PATH_DEFAULT = OUT_DIR / 'ocr.json'
FAIL_PATH_DEFAULT = OUT_DIR / 'ocr-failures.json'
# ────────── 필드 레이블 패턴 (EasyOCR 오인식 변형 포함) ──────────
# 각 필드의 후보 레이블 문자열(공백 제거 후 비교). 한글 OCR이 종종 비슷한 글자로 오인식되므로
# 대표적인 변형도 함께 등록 (예: "인화점" ↔ "인회점", "끓는점" ↔ "꿈는점" ↔ "끝는점").
LABEL_CANDIDATES: dict[str, list[str]] = {
'casNumber': ['CAS번호', 'CASNO', 'CAS'],
'unNumber': ['UN번호', 'UNNO', 'UN'],
'transportMethod': ['운송방법', '운승방벌', '운송방벌', '운송방립', '운송'],
'usage': ['용도'],
'state': ['성상', '상태', '형태'],
'color': ['색상', ''],
'odor': ['냄새'],
'flashPoint': ['인화점', '인회점', '인하점', '인호점'],
'autoIgnition': ['발화점', '발회점', '발하점'],
'boilingPoint': ['끓는점', '꿈는점', '끝는점', '끊는점'],
'density': ['비중'],
'solubility': ['용해도', '용해'],
'vaporPressure': ['증기압', '증기압력'],
'vaporDensity': ['증기밀도'],
'explosionRange': ['폭발범위', '곡발범위', '폭범위', '폭발한계'],
'idlh': ['IDLH'],
'aegl2': ['AEGL-2', 'AEGL2'],
'erpg2': ['ERPG-2', 'ERPG2'],
'twa': ['TWA'],
'stel': ['STEL'],
'ergNumber': ['ERG번호', 'ERG'],
'hazardClass': ['위험분류', '위험', '분류'],
'synonymsKr': ['유사명'],
'responseDistanceFire': ['대피거리', '머피거리'],
'ppeClose': ['근거리(레벨A)', '근거리레벨A', '근거리', '레벨A'],
'ppeFar': ['원거리(레벨C)', '원거리레벨C', '원거리', '레벨C'],
'emsFire': ['화재(F-E)', '화재(F-C)', '화재(F-D)', '화재대응'],
'emsSpill': ['유출(S-U)', '유출(S-O)', '유출(S-D)', '해상유출'],
'marineResponse': ['해상대응', '해상'],
}
def _norm_label(s: str) -> str:
"""공백/특수문자 제거 후 비교용 정규화."""
return re.sub(r'[\s,.·()\[\]:;\'"-]+', '', s).strip()
LABEL_INDEX: dict[str, str] = {}
for _field, _candidates in LABEL_CANDIDATES.items():
for _cand in _candidates:
LABEL_INDEX[_norm_label(_cand)] = _field
# NFPA 셀 값(한 자릿수 0~4) 추출용
NFPA_VALUE_RE = re.compile(r'^[0-4]$')
def group_rows(items: list[dict], y_tolerance_ratio: float = 0.6) -> list[list[dict]]:
"""텍스트 조각들을 y 좌표 기준으로 행 단위로 그룹화 (글자 높이 비례 허용치)."""
if not items:
return []
heights = [it['y1'] - it['y0'] for it in items]
median_h = sorted(heights)[len(heights) // 2]
y_tol = max(8, median_h * y_tolerance_ratio)
sorted_items = sorted(items, key=lambda it: it['cy'])
rows: list[list[dict]] = []
for it in sorted_items:
if rows and abs(it['cy'] - rows[-1][-1]['cy']) <= y_tol:
rows[-1].append(it)
else:
rows.append([it])
for row in rows:
row.sort(key=lambda it: it['cx'])
return rows
def _match_label(text: str) -> str | None:
key = _norm_label(text)
if not key:
return None
# 정확 일치 우선
if key in LABEL_INDEX:
return LABEL_INDEX[key]
# 접두 일치 (OCR이 뒤에 잡티를 붙이는 경우)
for cand_key, field in LABEL_INDEX.items():
if len(cand_key) >= 2 and key.startswith(cand_key):
return field
return None
def parse_card(items: list[dict]) -> dict[str, Any]:
"""OCR 결과 목록을 필드 dict로 변환."""
rows = group_rows(items)
result: dict[str, Any] = {}
# 1) 행 내 "레이블 → 값" 쌍 추출
# 같은 행에서 레이블 바로 뒤의 첫 non-label 텍스트를 값으로 사용.
for row in rows:
# 여러 레이블이 같은 행에 있을 수 있음 (2컬럼 표 구조)
idx = 0
while idx < len(row):
field = _match_label(row[idx]['text'])
if field:
# 다음 non-label 조각을 값으로 취함
value_parts: list[str] = []
j = idx + 1
while j < len(row):
nxt = row[j]
if _match_label(nxt['text']):
break
value_parts.append(nxt['text'])
j += 1
if value_parts and field not in result:
value = ' '.join(value_parts).strip()
if value and value not in ('-', '', 'N/A'):
result[field] = value
idx = j
else:
idx += 1
# 2) NFPA 추출: "NFPA" 단어 주변의 0~4 숫자 3개
nfpa_idx_row: int | None = None
for ri, row in enumerate(rows):
for cell in row:
if re.search(r'NFPA', cell['text']):
nfpa_idx_row = ri
break
if nfpa_idx_row is not None:
break
if nfpa_idx_row is not None:
# 해당 행 + 다음 2개 행에서 0~4 숫자 수집
candidates: list[int] = []
for ri in range(nfpa_idx_row, min(nfpa_idx_row + 3, len(rows))):
for cell in rows[ri]:
m = NFPA_VALUE_RE.match(cell['text'].strip())
if m:
candidates.append(int(cell['text'].strip()))
if len(candidates) >= 3:
break
if len(candidates) >= 3:
break
if len(candidates) >= 3:
result['nfpa'] = {
'health': candidates[0],
'fire': candidates[1],
'reactivity': candidates[2],
'special': '',
}
# 3) EmS 코드 (F-x / S-x 패턴)
all_text = ' '.join(cell['text'] for row in rows for cell in row)
f_match = re.search(r'F\s*-\s*([A-Z])', all_text)
s_match = re.search(r'S\s*-\s*([A-Z])', all_text)
if f_match or s_match:
parts = []
if f_match:
parts.append(f'F-{f_match.group(1)}')
if s_match:
parts.append(f'S-{s_match.group(1)}')
if parts:
result['emsCode'] = ', '.join(parts)
# 4) ERG 번호 (3자리 숫자, P 접미사 가능, "ERG" 키워드 근처)
erg_match = re.search(r'ERG[^\d]{0,10}(\d{3}P?)', all_text)
if erg_match:
result['ergNumber'] = erg_match.group(1)
# 5) EmS F-x / S-x 코드 뒤의 본문 (생략 - 이미지 내 텍스트 밀도가 낮아 행 단위로 이미 잡힘)
return result
def _preprocess_image(pil_img, upscale: float = 2.5):
"""한글 OCR 정확도 향상을 위한 업스케일 + 샤프닝 + 대비 향상."""
from PIL import Image, ImageEnhance, ImageFilter
import numpy as np
if pil_img.mode != 'RGB':
pil_img = pil_img.convert('RGB')
# 1) 업스케일 (LANCZOS)
w, h = pil_img.size
pil_img = pil_img.resize((int(w * upscale), int(h * upscale)), Image.LANCZOS)
# 2) 대비 향상
pil_img = ImageEnhance.Contrast(pil_img).enhance(1.3)
# 3) 샤프닝
pil_img = pil_img.filter(ImageFilter.UnsharpMask(radius=1.5, percent=150, threshold=2))
return np.array(pil_img)
def run_ocr(image_path: Path, reader, upscale: float = 2.5) -> list[dict]:
# OpenCV가 Windows에서 한글 경로를 못 읽으므로 PIL로 로드 후 전처리
from PIL import Image
with Image.open(image_path) as pil:
img = _preprocess_image(pil, upscale=upscale)
raw = reader.readtext(img, detail=1, paragraph=False)
items: list[dict] = []
for bbox, text, conf in raw:
if not text or not str(text).strip():
continue
xs = [p[0] for p in bbox]
ys = [p[1] for p in bbox]
items.append({
'text': str(text).strip(),
'cx': sum(xs) / 4.0,
'cy': sum(ys) / 4.0,
'x0': min(xs),
'x1': max(xs),
'y0': min(ys),
'y1': max(ys),
'conf': float(conf),
})
return items
def load_json(path: Path, fallback):
if not path.exists():
return fallback
try:
return json.loads(path.read_text(encoding='utf-8'))
except Exception:
return fallback
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('--limit', type=int, default=None)
parser.add_argument('--only', type=str, default=None,
help='파이프(|)로 구분된 물질명 리스트')
parser.add_argument('--img-dir', type=Path, default=IMG_DIR)
parser.add_argument('--out', type=Path, default=OCR_PATH_DEFAULT)
parser.add_argument('--fail', type=Path, default=FAIL_PATH_DEFAULT)
parser.add_argument('--debug', action='store_true',
help='파싱 중간 결과(row 단위) 함께 출력')
args = parser.parse_args()
import easyocr # noqa: WPS433
print('[로딩] EasyOCR 모델 (ko + en)... (최초 실행 시 수 분 소요)')
reader = easyocr.Reader(['ko', 'en'], gpu=False, verbose=False)
print('[로딩] 완료')
images = sorted([p for p in args.img_dir.iterdir() if p.suffix.lower() in {'.png', '.jpg', '.jpeg'}])
if args.only:
only_set = {s.strip() for s in args.only.split('|') if s.strip()}
images = [p for p in images if p.stem in only_set]
existing: dict[str, Any] = load_json(args.out, {})
failures: dict[str, str] = load_json(args.fail, {})
pending = [p for p in images if p.stem not in existing]
if args.limit:
pending = pending[: args.limit]
print(f'[대상] {len(images)}개 중 대기 {len(pending)}개, 이미 처리 {len(existing)}')
ok = 0
fail = 0
for i, path in enumerate(pending, start=1):
name = path.stem
try:
items = run_ocr(path, reader)
parsed = parse_card(items)
if args.debug:
print(f'\n--- {name} (텍스트 {len(items)}개) ---')
for row in group_rows(items):
print(' |', ''.join(f'{c["text"]}' for c in row))
print(f' → parsed: {parsed}')
existing[name] = parsed
if name in failures:
del failures[name]
ok += 1
except Exception as e: # noqa: BLE001
failures[name] = f'{type(e).__name__}: {e}'[:500]
fail += 1
print(f'[실패] {name}: {e}')
if i % 10 == 0 or i == len(pending):
args.out.write_text(json.dumps(existing, ensure_ascii=False, indent=2), encoding='utf-8')
args.fail.write_text(json.dumps(failures, ensure_ascii=False, indent=2), encoding='utf-8')
print(f' 진행 {i}/{len(pending)} (성공 {ok}, 실패 {fail}) - 중간 저장')
args.out.write_text(json.dumps(existing, ensure_ascii=False, indent=2), encoding='utf-8')
args.fail.write_text(json.dumps(failures, ensure_ascii=False, indent=2), encoding='utf-8')
print(f'\n[완료] 성공 {ok} / 실패 {fail}')
print(f' 결과: {args.out}')
if failures:
print(f' 실패 목록: {args.fail}')
if __name__ == '__main__':
main()