325 lines
12 KiB
Python
325 lines
12 KiB
Python
"""로컬 EasyOCR 기반 HNS 카드 이미지 파싱.
|
||
|
||
전용 venv(.venv)에 설치된 easyocr을 사용한다.
|
||
|
||
1. 이미지 → EasyOCR → (bbox, text, conf) 리스트
|
||
2. y좌표로 행 그룹화 후 각 행 내 x좌표 정렬
|
||
3. 레이블 키워드 기반 필드 매핑 (정규식)
|
||
4. 결과를 out/ocr.json 에 누적 저장 (재실행 가능)
|
||
|
||
실행:
|
||
cd backend/scripts/hns-import
|
||
source .venv/Scripts/activate # Windows Git Bash
|
||
python ocr-local.py [--limit N] [--only 벤젠,톨루엔,...]
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import io
|
||
import json
|
||
import os
|
||
import re
|
||
import sys
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
|
||
|
||
SCRIPT_DIR = Path(__file__).parent.resolve()
|
||
OUT_DIR = SCRIPT_DIR / 'out'
|
||
IMG_DIR = OUT_DIR / 'images'
|
||
OCR_PATH_DEFAULT = OUT_DIR / 'ocr.json'
|
||
FAIL_PATH_DEFAULT = OUT_DIR / 'ocr-failures.json'
|
||
|
||
|
||
# ────────── 필드 레이블 패턴 (EasyOCR 오인식 변형 포함) ──────────
|
||
# 각 필드의 후보 레이블 문자열(공백 제거 후 비교). 한글 OCR이 종종 비슷한 글자로 오인식되므로
|
||
# 대표적인 변형도 함께 등록 (예: "인화점" ↔ "인회점", "끓는점" ↔ "꿈는점" ↔ "끝는점").
|
||
LABEL_CANDIDATES: dict[str, list[str]] = {
|
||
'casNumber': ['CAS번호', 'CASNO', 'CAS'],
|
||
'unNumber': ['UN번호', 'UNNO', 'UN'],
|
||
'transportMethod': ['운송방법', '운승방벌', '운송방벌', '운송방립', '운송'],
|
||
'usage': ['용도'],
|
||
'state': ['성상', '상태', '형태'],
|
||
'color': ['색상', '색'],
|
||
'odor': ['냄새'],
|
||
'flashPoint': ['인화점', '인회점', '인하점', '인호점'],
|
||
'autoIgnition': ['발화점', '발회점', '발하점'],
|
||
'boilingPoint': ['끓는점', '꿈는점', '끝는점', '끊는점'],
|
||
'density': ['비중'],
|
||
'solubility': ['용해도', '용해'],
|
||
'vaporPressure': ['증기압', '증기압력'],
|
||
'vaporDensity': ['증기밀도'],
|
||
'explosionRange': ['폭발범위', '곡발범위', '폭범위', '폭발한계'],
|
||
'idlh': ['IDLH'],
|
||
'aegl2': ['AEGL-2', 'AEGL2'],
|
||
'erpg2': ['ERPG-2', 'ERPG2'],
|
||
'twa': ['TWA'],
|
||
'stel': ['STEL'],
|
||
'ergNumber': ['ERG번호', 'ERG'],
|
||
'hazardClass': ['위험분류', '위험', '분류'],
|
||
'synonymsKr': ['유사명'],
|
||
'responseDistanceFire': ['대피거리', '머피거리'],
|
||
'ppeClose': ['근거리(레벨A)', '근거리레벨A', '근거리', '레벨A'],
|
||
'ppeFar': ['원거리(레벨C)', '원거리레벨C', '원거리', '레벨C'],
|
||
'emsFire': ['화재(F-E)', '화재(F-C)', '화재(F-D)', '화재대응'],
|
||
'emsSpill': ['유출(S-U)', '유출(S-O)', '유출(S-D)', '해상유출'],
|
||
'marineResponse': ['해상대응', '해상'],
|
||
}
|
||
|
||
|
||
def _norm_label(s: str) -> str:
|
||
"""공백/특수문자 제거 후 비교용 정규화."""
|
||
return re.sub(r'[\s,.·()\[\]:;\'"-]+', '', s).strip()
|
||
|
||
|
||
LABEL_INDEX: dict[str, str] = {}
|
||
for _field, _candidates in LABEL_CANDIDATES.items():
|
||
for _cand in _candidates:
|
||
LABEL_INDEX[_norm_label(_cand)] = _field
|
||
|
||
# NFPA 셀 값(한 자릿수 0~4) 추출용
|
||
NFPA_VALUE_RE = re.compile(r'^[0-4]$')
|
||
|
||
|
||
def group_rows(items: list[dict], y_tolerance_ratio: float = 0.6) -> list[list[dict]]:
|
||
"""텍스트 조각들을 y 좌표 기준으로 행 단위로 그룹화 (글자 높이 비례 허용치)."""
|
||
if not items:
|
||
return []
|
||
heights = [it['y1'] - it['y0'] for it in items]
|
||
median_h = sorted(heights)[len(heights) // 2]
|
||
y_tol = max(8, median_h * y_tolerance_ratio)
|
||
|
||
sorted_items = sorted(items, key=lambda it: it['cy'])
|
||
rows: list[list[dict]] = []
|
||
for it in sorted_items:
|
||
if rows and abs(it['cy'] - rows[-1][-1]['cy']) <= y_tol:
|
||
rows[-1].append(it)
|
||
else:
|
||
rows.append([it])
|
||
for row in rows:
|
||
row.sort(key=lambda it: it['cx'])
|
||
return rows
|
||
|
||
|
||
def _match_label(text: str) -> str | None:
|
||
key = _norm_label(text)
|
||
if not key:
|
||
return None
|
||
# 정확 일치 우선
|
||
if key in LABEL_INDEX:
|
||
return LABEL_INDEX[key]
|
||
# 접두 일치 (OCR이 뒤에 잡티를 붙이는 경우)
|
||
for cand_key, field in LABEL_INDEX.items():
|
||
if len(cand_key) >= 2 and key.startswith(cand_key):
|
||
return field
|
||
return None
|
||
|
||
|
||
def parse_card(items: list[dict]) -> dict[str, Any]:
|
||
"""OCR 결과 목록을 필드 dict로 변환."""
|
||
rows = group_rows(items)
|
||
result: dict[str, Any] = {}
|
||
|
||
# 1) 행 내 "레이블 → 값" 쌍 추출
|
||
# 같은 행에서 레이블 바로 뒤의 첫 non-label 텍스트를 값으로 사용.
|
||
for row in rows:
|
||
# 여러 레이블이 같은 행에 있을 수 있음 (2컬럼 표 구조)
|
||
idx = 0
|
||
while idx < len(row):
|
||
field = _match_label(row[idx]['text'])
|
||
if field:
|
||
# 다음 non-label 조각을 값으로 취함
|
||
value_parts: list[str] = []
|
||
j = idx + 1
|
||
while j < len(row):
|
||
nxt = row[j]
|
||
if _match_label(nxt['text']):
|
||
break
|
||
value_parts.append(nxt['text'])
|
||
j += 1
|
||
if value_parts and field not in result:
|
||
value = ' '.join(value_parts).strip()
|
||
if value and value not in ('-', '–', 'N/A'):
|
||
result[field] = value
|
||
idx = j
|
||
else:
|
||
idx += 1
|
||
|
||
# 2) NFPA 추출: "NFPA" 단어 주변의 0~4 숫자 3개
|
||
nfpa_idx_row: int | None = None
|
||
for ri, row in enumerate(rows):
|
||
for cell in row:
|
||
if re.search(r'NFPA', cell['text']):
|
||
nfpa_idx_row = ri
|
||
break
|
||
if nfpa_idx_row is not None:
|
||
break
|
||
if nfpa_idx_row is not None:
|
||
# 해당 행 + 다음 2개 행에서 0~4 숫자 수집
|
||
candidates: list[int] = []
|
||
for ri in range(nfpa_idx_row, min(nfpa_idx_row + 3, len(rows))):
|
||
for cell in rows[ri]:
|
||
m = NFPA_VALUE_RE.match(cell['text'].strip())
|
||
if m:
|
||
candidates.append(int(cell['text'].strip()))
|
||
if len(candidates) >= 3:
|
||
break
|
||
if len(candidates) >= 3:
|
||
break
|
||
if len(candidates) >= 3:
|
||
result['nfpa'] = {
|
||
'health': candidates[0],
|
||
'fire': candidates[1],
|
||
'reactivity': candidates[2],
|
||
'special': '',
|
||
}
|
||
|
||
# 3) EmS 코드 (F-x / S-x 패턴)
|
||
all_text = ' '.join(cell['text'] for row in rows for cell in row)
|
||
f_match = re.search(r'F\s*-\s*([A-Z])', all_text)
|
||
s_match = re.search(r'S\s*-\s*([A-Z])', all_text)
|
||
if f_match or s_match:
|
||
parts = []
|
||
if f_match:
|
||
parts.append(f'F-{f_match.group(1)}')
|
||
if s_match:
|
||
parts.append(f'S-{s_match.group(1)}')
|
||
if parts:
|
||
result['emsCode'] = ', '.join(parts)
|
||
|
||
# 4) ERG 번호 (3자리 숫자, P 접미사 가능, "ERG" 키워드 근처)
|
||
erg_match = re.search(r'ERG[^\d]{0,10}(\d{3}P?)', all_text)
|
||
if erg_match:
|
||
result['ergNumber'] = erg_match.group(1)
|
||
|
||
# 5) EmS F-x / S-x 코드 뒤의 본문 (생략 - 이미지 내 텍스트 밀도가 낮아 행 단위로 이미 잡힘)
|
||
|
||
return result
|
||
|
||
|
||
def _preprocess_image(pil_img, upscale: float = 2.5):
|
||
"""한글 OCR 정확도 향상을 위한 업스케일 + 샤프닝 + 대비 향상."""
|
||
from PIL import Image, ImageEnhance, ImageFilter
|
||
import numpy as np
|
||
|
||
if pil_img.mode != 'RGB':
|
||
pil_img = pil_img.convert('RGB')
|
||
|
||
# 1) 업스케일 (LANCZOS)
|
||
w, h = pil_img.size
|
||
pil_img = pil_img.resize((int(w * upscale), int(h * upscale)), Image.LANCZOS)
|
||
|
||
# 2) 대비 향상
|
||
pil_img = ImageEnhance.Contrast(pil_img).enhance(1.3)
|
||
|
||
# 3) 샤프닝
|
||
pil_img = pil_img.filter(ImageFilter.UnsharpMask(radius=1.5, percent=150, threshold=2))
|
||
|
||
return np.array(pil_img)
|
||
|
||
|
||
def run_ocr(image_path: Path, reader, upscale: float = 2.5) -> list[dict]:
|
||
# OpenCV가 Windows에서 한글 경로를 못 읽으므로 PIL로 로드 후 전처리
|
||
from PIL import Image
|
||
with Image.open(image_path) as pil:
|
||
img = _preprocess_image(pil, upscale=upscale)
|
||
raw = reader.readtext(img, detail=1, paragraph=False)
|
||
items: list[dict] = []
|
||
for bbox, text, conf in raw:
|
||
if not text or not str(text).strip():
|
||
continue
|
||
xs = [p[0] for p in bbox]
|
||
ys = [p[1] for p in bbox]
|
||
items.append({
|
||
'text': str(text).strip(),
|
||
'cx': sum(xs) / 4.0,
|
||
'cy': sum(ys) / 4.0,
|
||
'x0': min(xs),
|
||
'x1': max(xs),
|
||
'y0': min(ys),
|
||
'y1': max(ys),
|
||
'conf': float(conf),
|
||
})
|
||
return items
|
||
|
||
|
||
def load_json(path: Path, fallback):
|
||
if not path.exists():
|
||
return fallback
|
||
try:
|
||
return json.loads(path.read_text(encoding='utf-8'))
|
||
except Exception:
|
||
return fallback
|
||
|
||
|
||
def main() -> None:
|
||
parser = argparse.ArgumentParser()
|
||
parser.add_argument('--limit', type=int, default=None)
|
||
parser.add_argument('--only', type=str, default=None,
|
||
help='파이프(|)로 구분된 물질명 리스트')
|
||
parser.add_argument('--img-dir', type=Path, default=IMG_DIR)
|
||
parser.add_argument('--out', type=Path, default=OCR_PATH_DEFAULT)
|
||
parser.add_argument('--fail', type=Path, default=FAIL_PATH_DEFAULT)
|
||
parser.add_argument('--debug', action='store_true',
|
||
help='파싱 중간 결과(row 단위) 함께 출력')
|
||
args = parser.parse_args()
|
||
|
||
import easyocr # noqa: WPS433
|
||
|
||
print('[로딩] EasyOCR 모델 (ko + en)... (최초 실행 시 수 분 소요)')
|
||
reader = easyocr.Reader(['ko', 'en'], gpu=False, verbose=False)
|
||
print('[로딩] 완료')
|
||
|
||
images = sorted([p for p in args.img_dir.iterdir() if p.suffix.lower() in {'.png', '.jpg', '.jpeg'}])
|
||
if args.only:
|
||
only_set = {s.strip() for s in args.only.split('|') if s.strip()}
|
||
images = [p for p in images if p.stem in only_set]
|
||
|
||
existing: dict[str, Any] = load_json(args.out, {})
|
||
failures: dict[str, str] = load_json(args.fail, {})
|
||
|
||
pending = [p for p in images if p.stem not in existing]
|
||
if args.limit:
|
||
pending = pending[: args.limit]
|
||
|
||
print(f'[대상] {len(images)}개 중 대기 {len(pending)}개, 이미 처리 {len(existing)}개')
|
||
|
||
ok = 0
|
||
fail = 0
|
||
for i, path in enumerate(pending, start=1):
|
||
name = path.stem
|
||
try:
|
||
items = run_ocr(path, reader)
|
||
parsed = parse_card(items)
|
||
if args.debug:
|
||
print(f'\n--- {name} (텍스트 {len(items)}개) ---')
|
||
for row in group_rows(items):
|
||
print(' |', ' │ '.join(f'{c["text"]}' for c in row))
|
||
print(f' → parsed: {parsed}')
|
||
existing[name] = parsed
|
||
if name in failures:
|
||
del failures[name]
|
||
ok += 1
|
||
except Exception as e: # noqa: BLE001
|
||
failures[name] = f'{type(e).__name__}: {e}'[:500]
|
||
fail += 1
|
||
print(f'[실패] {name}: {e}')
|
||
|
||
if i % 10 == 0 or i == len(pending):
|
||
args.out.write_text(json.dumps(existing, ensure_ascii=False, indent=2), encoding='utf-8')
|
||
args.fail.write_text(json.dumps(failures, ensure_ascii=False, indent=2), encoding='utf-8')
|
||
print(f' 진행 {i}/{len(pending)} (성공 {ok}, 실패 {fail}) - 중간 저장')
|
||
|
||
args.out.write_text(json.dumps(existing, ensure_ascii=False, indent=2), encoding='utf-8')
|
||
args.fail.write_text(json.dumps(failures, ensure_ascii=False, indent=2), encoding='utf-8')
|
||
|
||
print(f'\n[완료] 성공 {ok} / 실패 {fail}')
|
||
print(f' 결과: {args.out}')
|
||
if failures:
|
||
print(f' 실패 목록: {args.fail}')
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|