"""한중어업협정 중국어선 허가현황 XLS → kcgdb 적재. Usage: python3 prediction/scripts/load_fishery_permit_cn.py # 또는 기본 경로(docs/중국어선_허가현황_YYYYMMDD.xls 최신) 자동 탐색 python3 prediction/scripts/load_fishery_permit_cn.py 수행: 1) XLS 파싱 → kcg.fishery_permit_cn upsert (permit_year + permit_no 복합 유니크) 2) 신청인(중국어) 기준 kcg.fleet_companies upsert 3) 해당 연도 레코드를 kcg.fleet_vessels로 동기화 - PT-S(부속선)는 parent_permit_no로 본선 pair_vessel_id 연결 - fleet_role: FC=TRANSPORT, PT=MAIN, PT-S=CREW, 기타=MAIN """ from __future__ import annotations import json import logging import os import re import sys from pathlib import Path from typing import Optional import pandas as pd import psycopg2 import psycopg2.extras logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') logger = logging.getLogger(__name__) def _env(key: str, default: Optional[str] = None) -> str: v = os.environ.get(key, default) if v is None: raise SystemExit(f'환경변수 {key} 가 필요합니다') return v def _find_latest_xls(docs_dir: Path) -> Path: pattern = re.compile(r'중국어선_허가현황_(\d{8})\.xls$') candidates = [] for p in docs_dir.glob('중국어선_허가현황_*.xls'): m = pattern.search(p.name) if m: candidates.append((m.group(1), p)) if not candidates: raise SystemExit(f'{docs_dir} 에서 허가현황 XLS를 찾지 못했습니다') candidates.sort() return candidates[-1][1] def _extract_year(path: Path) -> int: m = re.search(r'(\d{4})\d{4}', path.name) if not m: raise SystemExit(f'파일명에서 연도 추출 실패: {path.name}') return int(m.group(1)) FISHERY_ROLE = { 'FC': 'TRANSPORT', 'PT': 'MAIN', 'PT-S': 'CREW', 'GN': 'MAIN', 'PS': 'MAIN', 'OT': 'MAIN', } # 업종코드 → fleet_tracker가 쓰는 레거시 gear_code 매핑 # (vessel_type 오버라이드 로직이 C21을 체크하므로 PT만 C21) GEAR_CODE_LEGACY = { 'PT': 'C21', 'PT-S': 'C21', 'GN': 'C22', 'OT': 'C22', 'PS': 'PS', 'FC': 'FC', } def _clean(v) -> Optional[str]: if v is None: return None if isinstance(v, float) and pd.isna(v): return None s = str(v).strip() return s if s else None def _num(v) -> Optional[float]: if v is None: return None try: if isinstance(v, float) and pd.isna(v): return None return float(v) except (TypeError, ValueError): return None def _int(v) -> Optional[int]: f = _num(v) return int(f) if f is not None else None def parse_xls(path: Path) -> list[dict]: df = pd.read_excel(path, engine='xlrd', header=1) rows: list[dict] = [] for _, r in df.iterrows(): permit_no = _clean(r.get('허가번호')) fishery_code = _clean(r.get('업종코드')) name_cn = _clean(r.get('선박명(중국)')) name_en = _clean(r.get('선박명(로마)')) if not (permit_no and fishery_code and name_cn and name_en): continue rows.append({ 'permit_no': permit_no, 'fishery_type': _clean(r.get('업종')), 'fishery_code': fishery_code, 'name_cn': name_cn, 'name_en': name_en, 'applicant_cn': _clean(r.get('신청인(중국)')), 'applicant_en': _clean(r.get('신청인(로마)')), 'applicant_addr_cn': _clean(r.get('신청인주소(중국)')), 'applicant_addr_en': _clean(r.get('신청인주소(로마)')), 'registration_no': _clean(r.get('선박등록번호')), 'tonnage': _num(r.get('톤수')), 'port_cn': _clean(r.get('선적항(중국)')), 'port_en': _clean(r.get('선적항(로마)')), 'callsign': _clean(r.get('호출부호')), 'engine_power': _num(r.get('기관출력(마력)')), 'length_m': _num(r.get('길이(m)')), 'beam_m': _num(r.get('폭(m)')), 'depth_m': _num(r.get('깊이(m)')), 'fishing_zones': _clean(r.get('조업수역')), 'fishing_period_1': _clean(r.get('조업기간1')), 'fishing_period_2': _clean(r.get('조업기간2')), 'catch_quota_t': _num(r.get('어획할당량(톤)')), 'cumulative_quota_t': _num(r.get('현어기의\n신규(첫)허가부터 \n누적 어획할당량(톤)')), 'refrig_hold_count': _int(r.get('냉장 어창의 수')), 'freezer_hold_count': _int(r.get('냉동 어창의 수')), 'admin_sanction': _clean(r.get('행정처분일자/행정처분기관/위반내용')), 'parent_permit_no': _clean(r.get('본선의\n허가번호')), 'volume_enclosed': _num(r.get('용적\n폐위장소(㎥)')), 'volume_above_deck': _num(r.get('용적\n상갑판 위(㎥)')), 'volume_below_deck': _num(r.get('용적\n상갑판 아래(㎥)')), 'volume_excluded': _num(r.get('용적\n제외장소(㎥)')), 'raw': {k: (None if (isinstance(v, float) and pd.isna(v)) else v) for k, v in r.items()}, }) logger.info('파싱된 허가 수: %d', len(rows)) return rows def upsert_permits(cur, year: int, source_file: str, rows: list[dict]) -> None: sql = """ INSERT INTO kcg.fishery_permit_cn ( permit_year, permit_no, fishery_type, fishery_code, name_cn, name_en, applicant_cn, applicant_en, applicant_addr_cn, applicant_addr_en, registration_no, tonnage, port_cn, port_en, callsign, engine_power, length_m, beam_m, depth_m, fishing_zones, fishing_period_1, fishing_period_2, catch_quota_t, cumulative_quota_t, refrig_hold_count, freezer_hold_count, admin_sanction, parent_permit_no, volume_enclosed, volume_above_deck, volume_below_deck, volume_excluded, raw_data, source_file ) VALUES %s ON CONFLICT (permit_year, permit_no) DO UPDATE SET fishery_type = EXCLUDED.fishery_type, fishery_code = EXCLUDED.fishery_code, name_cn = EXCLUDED.name_cn, name_en = EXCLUDED.name_en, applicant_cn = EXCLUDED.applicant_cn, applicant_en = EXCLUDED.applicant_en, applicant_addr_cn = EXCLUDED.applicant_addr_cn, applicant_addr_en = EXCLUDED.applicant_addr_en, registration_no = EXCLUDED.registration_no, tonnage = EXCLUDED.tonnage, port_cn = EXCLUDED.port_cn, port_en = EXCLUDED.port_en, callsign = EXCLUDED.callsign, engine_power = EXCLUDED.engine_power, length_m = EXCLUDED.length_m, beam_m = EXCLUDED.beam_m, depth_m = EXCLUDED.depth_m, fishing_zones = EXCLUDED.fishing_zones, fishing_period_1 = EXCLUDED.fishing_period_1, fishing_period_2 = EXCLUDED.fishing_period_2, catch_quota_t = EXCLUDED.catch_quota_t, cumulative_quota_t = EXCLUDED.cumulative_quota_t, refrig_hold_count = EXCLUDED.refrig_hold_count, freezer_hold_count = EXCLUDED.freezer_hold_count, admin_sanction = EXCLUDED.admin_sanction, parent_permit_no = EXCLUDED.parent_permit_no, volume_enclosed = EXCLUDED.volume_enclosed, volume_above_deck = EXCLUDED.volume_above_deck, volume_below_deck = EXCLUDED.volume_below_deck, volume_excluded = EXCLUDED.volume_excluded, raw_data = EXCLUDED.raw_data, source_file = EXCLUDED.source_file, loaded_at = now() """ tuples = [ ( year, r['permit_no'], r['fishery_type'], r['fishery_code'], r['name_cn'], r['name_en'], r['applicant_cn'], r['applicant_en'], r['applicant_addr_cn'], r['applicant_addr_en'], r['registration_no'], r['tonnage'], r['port_cn'], r['port_en'], r['callsign'], r['engine_power'], r['length_m'], r['beam_m'], r['depth_m'], r['fishing_zones'], r['fishing_period_1'], r['fishing_period_2'], r['catch_quota_t'], r['cumulative_quota_t'], r['refrig_hold_count'], r['freezer_hold_count'], r['admin_sanction'], r['parent_permit_no'], r['volume_enclosed'], r['volume_above_deck'], r['volume_below_deck'], r['volume_excluded'], json.dumps({k: (v.isoformat() if hasattr(v, 'isoformat') else v) for k, v in r['raw'].items()}, ensure_ascii=False, default=str), source_file, ) for r in rows ] psycopg2.extras.execute_values(cur, sql, tuples, page_size=200) logger.info('fishery_permit_cn upsert: %d rows', len(tuples)) def upsert_companies(cur, rows: list[dict]) -> dict[str, int]: """신청인(중국어) 기준 fleet_companies upsert → {applicant_cn: company_id}.""" applicants: dict[str, dict] = {} for r in rows: key = r['applicant_cn'] or r['applicant_en'] or 'UNKNOWN' applicants.setdefault(key, {'name_cn': r['applicant_cn'], 'name_en': r['applicant_en']}) result: dict[str, int] = {} for key, meta in applicants.items(): cur.execute( """ INSERT INTO kcg.fleet_companies (name_cn, name_en, country) VALUES (%s, %s, 'CN') ON CONFLICT DO NOTHING """, (meta['name_cn'], meta['name_en']), ) cur.execute( 'SELECT id FROM kcg.fleet_companies WHERE name_cn IS NOT DISTINCT FROM %s AND name_en IS NOT DISTINCT FROM %s', (meta['name_cn'], meta['name_en']), ) row = cur.fetchone() if row: result[key] = row[0] logger.info('fleet_companies upsert: %d 신청인', len(result)) return result def sync_fleet_vessels(cur, year: int, rows: list[dict], company_map: dict[str, int]) -> None: """해당 연도 허가를 fleet_vessels로 동기화. permit_no+permit_year 기준 upsert.""" # 기존 연도 데이터 먼저 비우기 (허가 취소된 선박 정리) cur.execute( 'DELETE FROM kcg.fleet_vessels WHERE permit_year = %s AND permit_no NOT IN %s', (year, tuple([r['permit_no'] for r in rows]) or ('',)), ) inserted = 0 updated = 0 permit_to_id: dict[str, int] = {} for r in rows: company_key = r['applicant_cn'] or r['applicant_en'] or 'UNKNOWN' company_id = company_map.get(company_key) if company_id is None: continue fishery_code = r['fishery_code'] legacy_gear = GEAR_CODE_LEGACY.get(fishery_code, fishery_code) fleet_role = FISHERY_ROLE.get(fishery_code, 'MAIN') cur.execute( """ INSERT INTO kcg.fleet_vessels ( company_id, permit_no, name_cn, name_en, tonnage, gear_code, fishery_code, fleet_role, permit_year, updated_at ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, now()) ON CONFLICT DO NOTHING RETURNING id """, ( company_id, r['permit_no'], r['name_cn'], r['name_en'], r['tonnage'], legacy_gear, fishery_code, fleet_role, year, ), ) ret = cur.fetchone() if ret: permit_to_id[r['permit_no']] = ret[0] inserted += 1 else: cur.execute( """ UPDATE kcg.fleet_vessels SET company_id = %s, name_cn = %s, name_en = %s, tonnage = %s, gear_code = %s, fishery_code = %s, fleet_role = %s, updated_at = now() WHERE permit_year = %s AND permit_no = %s RETURNING id """, ( company_id, r['name_cn'], r['name_en'], r['tonnage'], legacy_gear, fishery_code, fleet_role, year, r['permit_no'], ), ) ret = cur.fetchone() if ret: permit_to_id[r['permit_no']] = ret[0] updated += 1 logger.info('fleet_vessels 동기화: inserted=%d, updated=%d (year=%d)', inserted, updated, year) # PT-S(부속선) → 본선 pair_vessel_id 연결 pair_linked = 0 for r in rows: if r['fishery_code'] != 'PT-S' or not r['parent_permit_no']: continue child_id = permit_to_id.get(r['permit_no']) parent_id = permit_to_id.get(r['parent_permit_no']) if child_id and parent_id: cur.execute( 'UPDATE kcg.fleet_vessels SET pair_vessel_id = %s WHERE id = %s', (parent_id, child_id), ) cur.execute( 'UPDATE kcg.fleet_vessels SET pair_vessel_id = %s WHERE id = %s AND pair_vessel_id IS NULL', (child_id, parent_id), ) pair_linked += 1 logger.info('PT-S pair 연결: %d 건', pair_linked) def main() -> None: repo_root = Path(__file__).resolve().parents[2] if len(sys.argv) >= 2: xls_path = Path(sys.argv[1]) else: xls_path = _find_latest_xls(repo_root / 'docs') if not xls_path.exists(): raise SystemExit(f'파일 없음: {xls_path}') year = _extract_year(xls_path) logger.info('source=%s year=%d', xls_path.name, year) rows = parse_xls(xls_path) conn = psycopg2.connect( host=_env('KCGDB_HOST'), port=int(_env('KCGDB_PORT', '5432')), dbname=_env('KCGDB_NAME'), user=_env('KCGDB_USER'), password=_env('KCGDB_PASSWORD'), ) try: with conn: with conn.cursor() as cur: upsert_permits(cur, year, xls_path.name, rows) company_map = upsert_companies(cur, rows) sync_fleet_vessels(cur, year, rows, company_map) logger.info('완료') finally: conn.close() if __name__ == '__main__': main()