"""로컬 EasyOCR 기반 HNS 카드 이미지 파싱. 전용 venv(.venv)에 설치된 easyocr을 사용한다. 1. 이미지 → EasyOCR → (bbox, text, conf) 리스트 2. y좌표로 행 그룹화 후 각 행 내 x좌표 정렬 3. 레이블 키워드 기반 필드 매핑 (정규식) 4. 결과를 out/ocr.json 에 누적 저장 (재실행 가능) 실행: cd backend/scripts/hns-import source .venv/Scripts/activate # Windows Git Bash python ocr-local.py [--limit N] [--only 벤젠,톨루엔,...] """ from __future__ import annotations import argparse import io import json import os import re import sys from pathlib import Path from typing import Any sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') SCRIPT_DIR = Path(__file__).parent.resolve() OUT_DIR = SCRIPT_DIR / 'out' IMG_DIR = OUT_DIR / 'images' OCR_PATH_DEFAULT = OUT_DIR / 'ocr.json' FAIL_PATH_DEFAULT = OUT_DIR / 'ocr-failures.json' # ────────── 필드 레이블 패턴 (EasyOCR 오인식 변형 포함) ────────── # 각 필드의 후보 레이블 문자열(공백 제거 후 비교). 한글 OCR이 종종 비슷한 글자로 오인식되므로 # 대표적인 변형도 함께 등록 (예: "인화점" ↔ "인회점", "끓는점" ↔ "꿈는점" ↔ "끝는점"). LABEL_CANDIDATES: dict[str, list[str]] = { 'casNumber': ['CAS번호', 'CASNO', 'CAS'], 'unNumber': ['UN번호', 'UNNO', 'UN'], 'transportMethod': ['운송방법', '운승방벌', '운송방벌', '운송방립', '운송'], 'usage': ['용도'], 'state': ['성상', '상태', '형태'], 'color': ['색상', '색'], 'odor': ['냄새'], 'flashPoint': ['인화점', '인회점', '인하점', '인호점'], 'autoIgnition': ['발화점', '발회점', '발하점'], 'boilingPoint': ['끓는점', '꿈는점', '끝는점', '끊는점'], 'density': ['비중'], 'solubility': ['용해도', '용해'], 'vaporPressure': ['증기압', '증기압력'], 'vaporDensity': ['증기밀도'], 'explosionRange': ['폭발범위', '곡발범위', '폭범위', '폭발한계'], 'idlh': ['IDLH'], 'aegl2': ['AEGL-2', 'AEGL2'], 'erpg2': ['ERPG-2', 'ERPG2'], 'twa': ['TWA'], 'stel': ['STEL'], 'ergNumber': ['ERG번호', 'ERG'], 'hazardClass': ['위험분류', '위험', '분류'], 'synonymsKr': ['유사명'], 'responseDistanceFire': ['대피거리', '머피거리'], 'ppeClose': ['근거리(레벨A)', '근거리레벨A', '근거리', '레벨A'], 'ppeFar': ['원거리(레벨C)', '원거리레벨C', '원거리', '레벨C'], 'emsFire': ['화재(F-E)', '화재(F-C)', '화재(F-D)', '화재대응'], 'emsSpill': ['유출(S-U)', '유출(S-O)', '유출(S-D)', '해상유출'], 'marineResponse': ['해상대응', '해상'], } def _norm_label(s: str) -> str: """공백/특수문자 제거 후 비교용 정규화.""" return re.sub(r'[\s,.·()\[\]:;\'"-]+', '', s).strip() LABEL_INDEX: dict[str, str] = {} for _field, _candidates in LABEL_CANDIDATES.items(): for _cand in _candidates: LABEL_INDEX[_norm_label(_cand)] = _field # NFPA 셀 값(한 자릿수 0~4) 추출용 NFPA_VALUE_RE = re.compile(r'^[0-4]$') def group_rows(items: list[dict], y_tolerance_ratio: float = 0.6) -> list[list[dict]]: """텍스트 조각들을 y 좌표 기준으로 행 단위로 그룹화 (글자 높이 비례 허용치).""" if not items: return [] heights = [it['y1'] - it['y0'] for it in items] median_h = sorted(heights)[len(heights) // 2] y_tol = max(8, median_h * y_tolerance_ratio) sorted_items = sorted(items, key=lambda it: it['cy']) rows: list[list[dict]] = [] for it in sorted_items: if rows and abs(it['cy'] - rows[-1][-1]['cy']) <= y_tol: rows[-1].append(it) else: rows.append([it]) for row in rows: row.sort(key=lambda it: it['cx']) return rows def _match_label(text: str) -> str | None: key = _norm_label(text) if not key: return None # 정확 일치 우선 if key in LABEL_INDEX: return LABEL_INDEX[key] # 접두 일치 (OCR이 뒤에 잡티를 붙이는 경우) for cand_key, field in LABEL_INDEX.items(): if len(cand_key) >= 2 and key.startswith(cand_key): return field return None def parse_card(items: list[dict]) -> dict[str, Any]: """OCR 결과 목록을 필드 dict로 변환.""" rows = group_rows(items) result: dict[str, Any] = {} # 1) 행 내 "레이블 → 값" 쌍 추출 # 같은 행에서 레이블 바로 뒤의 첫 non-label 텍스트를 값으로 사용. for row in rows: # 여러 레이블이 같은 행에 있을 수 있음 (2컬럼 표 구조) idx = 0 while idx < len(row): field = _match_label(row[idx]['text']) if field: # 다음 non-label 조각을 값으로 취함 value_parts: list[str] = [] j = idx + 1 while j < len(row): nxt = row[j] if _match_label(nxt['text']): break value_parts.append(nxt['text']) j += 1 if value_parts and field not in result: value = ' '.join(value_parts).strip() if value and value not in ('-', '–', 'N/A'): result[field] = value idx = j else: idx += 1 # 2) NFPA 추출: "NFPA" 단어 주변의 0~4 숫자 3개 nfpa_idx_row: int | None = None for ri, row in enumerate(rows): for cell in row: if re.search(r'NFPA', cell['text']): nfpa_idx_row = ri break if nfpa_idx_row is not None: break if nfpa_idx_row is not None: # 해당 행 + 다음 2개 행에서 0~4 숫자 수집 candidates: list[int] = [] for ri in range(nfpa_idx_row, min(nfpa_idx_row + 3, len(rows))): for cell in rows[ri]: m = NFPA_VALUE_RE.match(cell['text'].strip()) if m: candidates.append(int(cell['text'].strip())) if len(candidates) >= 3: break if len(candidates) >= 3: break if len(candidates) >= 3: result['nfpa'] = { 'health': candidates[0], 'fire': candidates[1], 'reactivity': candidates[2], 'special': '', } # 3) EmS 코드 (F-x / S-x 패턴) all_text = ' '.join(cell['text'] for row in rows for cell in row) f_match = re.search(r'F\s*-\s*([A-Z])', all_text) s_match = re.search(r'S\s*-\s*([A-Z])', all_text) if f_match or s_match: parts = [] if f_match: parts.append(f'F-{f_match.group(1)}') if s_match: parts.append(f'S-{s_match.group(1)}') if parts: result['emsCode'] = ', '.join(parts) # 4) ERG 번호 (3자리 숫자, P 접미사 가능, "ERG" 키워드 근처) erg_match = re.search(r'ERG[^\d]{0,10}(\d{3}P?)', all_text) if erg_match: result['ergNumber'] = erg_match.group(1) # 5) EmS F-x / S-x 코드 뒤의 본문 (생략 - 이미지 내 텍스트 밀도가 낮아 행 단위로 이미 잡힘) return result def _preprocess_image(pil_img, upscale: float = 2.5): """한글 OCR 정확도 향상을 위한 업스케일 + 샤프닝 + 대비 향상.""" from PIL import Image, ImageEnhance, ImageFilter import numpy as np if pil_img.mode != 'RGB': pil_img = pil_img.convert('RGB') # 1) 업스케일 (LANCZOS) w, h = pil_img.size pil_img = pil_img.resize((int(w * upscale), int(h * upscale)), Image.LANCZOS) # 2) 대비 향상 pil_img = ImageEnhance.Contrast(pil_img).enhance(1.3) # 3) 샤프닝 pil_img = pil_img.filter(ImageFilter.UnsharpMask(radius=1.5, percent=150, threshold=2)) return np.array(pil_img) def run_ocr(image_path: Path, reader, upscale: float = 2.5) -> list[dict]: # OpenCV가 Windows에서 한글 경로를 못 읽으므로 PIL로 로드 후 전처리 from PIL import Image with Image.open(image_path) as pil: img = _preprocess_image(pil, upscale=upscale) raw = reader.readtext(img, detail=1, paragraph=False) items: list[dict] = [] for bbox, text, conf in raw: if not text or not str(text).strip(): continue xs = [p[0] for p in bbox] ys = [p[1] for p in bbox] items.append({ 'text': str(text).strip(), 'cx': sum(xs) / 4.0, 'cy': sum(ys) / 4.0, 'x0': min(xs), 'x1': max(xs), 'y0': min(ys), 'y1': max(ys), 'conf': float(conf), }) return items def load_json(path: Path, fallback): if not path.exists(): return fallback try: return json.loads(path.read_text(encoding='utf-8')) except Exception: return fallback def main() -> None: parser = argparse.ArgumentParser() parser.add_argument('--limit', type=int, default=None) parser.add_argument('--only', type=str, default=None, help='파이프(|)로 구분된 물질명 리스트') parser.add_argument('--img-dir', type=Path, default=IMG_DIR) parser.add_argument('--out', type=Path, default=OCR_PATH_DEFAULT) parser.add_argument('--fail', type=Path, default=FAIL_PATH_DEFAULT) parser.add_argument('--debug', action='store_true', help='파싱 중간 결과(row 단위) 함께 출력') args = parser.parse_args() import easyocr # noqa: WPS433 print('[로딩] EasyOCR 모델 (ko + en)... (최초 실행 시 수 분 소요)') reader = easyocr.Reader(['ko', 'en'], gpu=False, verbose=False) print('[로딩] 완료') images = sorted([p for p in args.img_dir.iterdir() if p.suffix.lower() in {'.png', '.jpg', '.jpeg'}]) if args.only: only_set = {s.strip() for s in args.only.split('|') if s.strip()} images = [p for p in images if p.stem in only_set] existing: dict[str, Any] = load_json(args.out, {}) failures: dict[str, str] = load_json(args.fail, {}) pending = [p for p in images if p.stem not in existing] if args.limit: pending = pending[: args.limit] print(f'[대상] {len(images)}개 중 대기 {len(pending)}개, 이미 처리 {len(existing)}개') ok = 0 fail = 0 for i, path in enumerate(pending, start=1): name = path.stem try: items = run_ocr(path, reader) parsed = parse_card(items) if args.debug: print(f'\n--- {name} (텍스트 {len(items)}개) ---') for row in group_rows(items): print(' |', ' │ '.join(f'{c["text"]}' for c in row)) print(f' → parsed: {parsed}') existing[name] = parsed if name in failures: del failures[name] ok += 1 except Exception as e: # noqa: BLE001 failures[name] = f'{type(e).__name__}: {e}'[:500] fail += 1 print(f'[실패] {name}: {e}') if i % 10 == 0 or i == len(pending): args.out.write_text(json.dumps(existing, ensure_ascii=False, indent=2), encoding='utf-8') args.fail.write_text(json.dumps(failures, ensure_ascii=False, indent=2), encoding='utf-8') print(f' 진행 {i}/{len(pending)} (성공 {ok}, 실패 {fail}) - 중간 저장') args.out.write_text(json.dumps(existing, ensure_ascii=False, indent=2), encoding='utf-8') args.fail.write_text(json.dumps(failures, ensure_ascii=False, indent=2), encoding='utf-8') print(f'\n[완료] 성공 {ok} / 실패 {fail}') print(f' 결과: {args.out}') if failures: print(f' 실패 목록: {args.fail}') if __name__ == '__main__': main()