- V029: kcg.fishery_permit_cn 신규 테이블(연단위, permit_year+permit_no 복합 유니크) + fleet_vessels permit_year/fishery_code 컬럼
- load_fishery_permit_cn.py: xls → DB 적재 스크립트, 906척 + 497 신청인사 upsert
- G-04/G-05/G-06 Dead code 해결: classify_gear_violations 호출 연결, dir() 버그 제거
- find_pair_candidates: bbox 1차 + 궤적 유사도(location/sog_corr/cog_alignment) 2차, role 가점
- spoofing 산식 교체: 1시간 윈도우 + teleport 절대 가점 + extreme 50kn 단독 0.6 확정
- transshipment 선종 완화: shipTy 부분일치 + 412* FISHING 간주
- gear_code DB write 경로 신설 + fleet_tracker API 3개 추가
- cron 스크립트: fishery_permit/pair_type/fleet_role 신규 섹션
359 lines
14 KiB
Python
359 lines
14 KiB
Python
"""한중어업협정 중국어선 허가현황 XLS → kcgdb 적재.
|
|
|
|
Usage:
|
|
python3 prediction/scripts/load_fishery_permit_cn.py <xls_path>
|
|
# 또는 기본 경로(docs/중국어선_허가현황_YYYYMMDD.xls 최신) 자동 탐색
|
|
python3 prediction/scripts/load_fishery_permit_cn.py
|
|
|
|
수행:
|
|
1) XLS 파싱 → kcg.fishery_permit_cn upsert (permit_year + permit_no 복합 유니크)
|
|
2) 신청인(중국어) 기준 kcg.fleet_companies upsert
|
|
3) 해당 연도 레코드를 kcg.fleet_vessels로 동기화
|
|
- PT-S(부속선)는 parent_permit_no로 본선 pair_vessel_id 연결
|
|
- fleet_role: FC=TRANSPORT, PT=MAIN, PT-S=CREW, 기타=MAIN
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import pandas as pd
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
|
|
# Script-wide logging: timestamped INFO-level records via the root logger.
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _env(key: str, default: Optional[str] = None) -> str:
    """Return environment variable ``key``, or ``default`` when it is unset.

    Raises:
        SystemExit: when the variable is unset and no default was supplied.
    """
    value = os.environ.get(key, default)
    if value is not None:
        return value
    raise SystemExit(f'환경변수 {key} 가 필요합니다')
|
|
|
|
|
|
def _find_latest_xls(docs_dir: Path) -> Path:
    """Return the permit XLS in ``docs_dir`` carrying the newest YYYYMMDD stamp.

    Only files whose name ends in ``중국어선_허가현황_<8 digits>.xls`` qualify;
    the stamp is compared as text, which orders YYYYMMDD chronologically.

    Raises:
        SystemExit: when no qualifying file exists.
    """
    stamp_re = re.compile(r'중국어선_허가현황_(\d{8})\.xls$')
    probed = ((stamp_re.search(xls.name), xls) for xls in docs_dir.glob('중국어선_허가현황_*.xls'))
    dated = [(hit.group(1), xls) for hit, xls in probed if hit]
    if not dated:
        raise SystemExit(f'{docs_dir} 에서 허가현황 XLS를 찾지 못했습니다')
    # (stamp, path) tuples: the largest stamp wins, ties broken by path.
    _, newest = max(dated)
    return newest
|
|
|
|
|
|
def _extract_year(path: Path) -> int:
    """Return the year taken from the first eight consecutive digits of ``path.name``.

    The first four of those digits are read as the year (file names are
    expected to carry a YYYYMMDD stamp).

    Raises:
        SystemExit: when the file name has no run of eight digits.
    """
    found = re.search(r'(\d{4})\d{4}', path.name)
    if found is None:
        raise SystemExit(f'파일명에서 연도 추출 실패: {path.name}')
    year_digits = found.group(1)
    return int(year_digits)
|
|
|
|
|
|
# 업종코드 (fishery code) → kcg.fleet_vessels.fleet_role.
# PT-S is the auxiliary vessel (부속선) of a PT pair; the remaining
# fishing codes are all MAIN vessels.
FISHERY_ROLE = {
    'FC': 'TRANSPORT',
    'PT': 'MAIN',
    'PT-S': 'CREW',
    **dict.fromkeys(('GN', 'PS', 'OT'), 'MAIN'),
}
|
|
|
|
# 업종코드 → legacy gear_code consumed by fleet_tracker. Its vessel_type
# override logic checks for C21, so the pair-trawl codes map to C21.
# NOTE(review): PT-S is mapped to C21 as well as PT — an earlier comment
# implied PT only; confirm PT-S should trigger the C21 override too.
GEAR_CODE_LEGACY = {
    **dict.fromkeys(('PT', 'PT-S'), 'C21'),
    **dict.fromkeys(('GN', 'OT'), 'C22'),
    'PS': 'PS',
    'FC': 'FC',
}
|
|
|
|
|
|
def _clean(v) -> Optional[str]:
    """Normalize a spreadsheet cell to a stripped string, or ``None`` when empty.

    Missing-value markers (``None``, float NaN, ``pd.NaT``, ``pd.NA``) and
    whitespace-only strings map to ``None``; anything else is ``str()``-ed
    and stripped.
    """
    if v is None:
        return None
    # pd.isna recognises every pandas/numpy missing marker, not only float NaN
    # (e.g. NaT in date-typed columns would otherwise become the text 'NaT').
    # Restrict it to scalars, where it returns a single bool rather than an array.
    if pd.api.types.is_scalar(v) and pd.isna(v):
        return None
    text = str(v).strip()
    return text or None
|
|
|
|
|
|
def _num(v) -> Optional[float]:
    """Convert a spreadsheet cell to ``float``; ``None`` when missing or non-numeric.

    A NaN result (an empty numeric cell, or text such as ``'nan'``) also maps
    to ``None``, so it is written as NULL instead of a NaN value and callers
    such as ``int()`` never receive NaN.
    """
    if v is None:
        return None
    try:
        number = float(v)
    except (TypeError, ValueError):
        # Non-numeric text and pandas markers such as pd.NA land here.
        return None
    return None if pd.isna(number) else number
|
|
|
|
|
|
def _int(v) -> Optional[int]:
    """Convert a spreadsheet cell to ``int`` (truncating); ``None`` when unusable.

    Builds on ``_num``; values that cannot be represented as an int
    (NaN, ±infinity) yield ``None`` instead of aborting the whole load.
    """
    number = _num(v)
    if number is None:
        return None
    try:
        return int(number)
    except (ValueError, OverflowError):
        # int(nan) raises ValueError, int(±inf) raises OverflowError.
        return None
|
|
|
|
|
|
def parse_xls(path: Path) -> list[dict]:
    """Read the permit workbook and return one dict per usable permit row.

    Column headers are taken from the second sheet row (``header=1``). A row
    is kept only when permit number, fishery code, Chinese name and romanized
    name are all non-empty. Each returned dict holds the normalized fields
    plus ``raw``: the complete source row with float-NaN cells set to ``None``.
    """
    # (output key, XLS column header, converter) in output-dict order.
    field_specs = (
        ('permit_no', '허가번호', _clean),
        ('fishery_type', '업종', _clean),
        ('fishery_code', '업종코드', _clean),
        ('name_cn', '선박명(중국)', _clean),
        ('name_en', '선박명(로마)', _clean),
        ('applicant_cn', '신청인(중국)', _clean),
        ('applicant_en', '신청인(로마)', _clean),
        ('applicant_addr_cn', '신청인주소(중국)', _clean),
        ('applicant_addr_en', '신청인주소(로마)', _clean),
        ('registration_no', '선박등록번호', _clean),
        ('tonnage', '톤수', _num),
        ('port_cn', '선적항(중국)', _clean),
        ('port_en', '선적항(로마)', _clean),
        ('callsign', '호출부호', _clean),
        ('engine_power', '기관출력(마력)', _num),
        ('length_m', '길이(m)', _num),
        ('beam_m', '폭(m)', _num),
        ('depth_m', '깊이(m)', _num),
        ('fishing_zones', '조업수역', _clean),
        ('fishing_period_1', '조업기간1', _clean),
        ('fishing_period_2', '조업기간2', _clean),
        ('catch_quota_t', '어획할당량(톤)', _num),
        ('cumulative_quota_t', '현어기의\n신규(첫)허가부터 \n누적 어획할당량(톤)', _num),
        ('refrig_hold_count', '냉장 어창의 수', _int),
        ('freezer_hold_count', '냉동 어창의 수', _int),
        ('admin_sanction', '행정처분일자/행정처분기관/위반내용', _clean),
        ('parent_permit_no', '본선의\n허가번호', _clean),
        ('volume_enclosed', '용적\n폐위장소(㎥)', _num),
        ('volume_above_deck', '용적\n상갑판 위(㎥)', _num),
        ('volume_below_deck', '용적\n상갑판 아래(㎥)', _num),
        ('volume_excluded', '용적\n제외장소(㎥)', _num),
    )
    # Identity fields are checked before any other conversion runs.
    required_keys = ('permit_no', 'fishery_code', 'name_cn', 'name_en')
    column_of = {key: column for key, column, _ in field_specs}

    sheet = pd.read_excel(path, engine='xlrd', header=1)
    parsed: list[dict] = []
    for _, cells in sheet.iterrows():
        identity = {key: _clean(cells.get(column_of[key])) for key in required_keys}
        if not all(identity.values()):
            continue
        record = {
            key: identity[key] if key in identity else convert(cells.get(column))
            for key, column, convert in field_specs
        }
        record['raw'] = {
            column: None if isinstance(value, float) and pd.isna(value) else value
            for column, value in cells.items()
        }
        parsed.append(record)

    logger.info('파싱된 허가 수: %d', len(parsed))
    return parsed
|
|
|
|
|
|
# Row-dict keys stored 1:1 in same-named kcg.fishery_permit_cn columns, in
# INSERT order (between permit_year/permit_no and raw_data/source_file).
_PERMIT_DATA_COLUMNS = (
    'fishery_type', 'fishery_code', 'name_cn', 'name_en',
    'applicant_cn', 'applicant_en', 'applicant_addr_cn', 'applicant_addr_en',
    'registration_no', 'tonnage', 'port_cn', 'port_en',
    'callsign', 'engine_power', 'length_m', 'beam_m',
    'depth_m', 'fishing_zones', 'fishing_period_1', 'fishing_period_2',
    'catch_quota_t', 'cumulative_quota_t', 'refrig_hold_count', 'freezer_hold_count',
    'admin_sanction', 'parent_permit_no', 'volume_enclosed', 'volume_above_deck',
    'volume_below_deck', 'volume_excluded',
)
_PERMIT_INSERT_COLUMNS = ('permit_year', 'permit_no', *_PERMIT_DATA_COLUMNS, 'raw_data', 'source_file')
_PERMIT_UPDATE_COLUMNS = (*_PERMIT_DATA_COLUMNS, 'raw_data', 'source_file')

# All identifiers come from the constant tuples above, so string-building the
# statement is safe; row values are still bound by execute_values at '%s'.
_PERMIT_UPSERT_SQL = (
    f"INSERT INTO kcg.fishery_permit_cn ({', '.join(_PERMIT_INSERT_COLUMNS)})\n"
    'VALUES %s\n'
    'ON CONFLICT (permit_year, permit_no) DO UPDATE SET\n    '
    + ',\n    '.join(f'{col} = EXCLUDED.{col}' for col in _PERMIT_UPDATE_COLUMNS)
    + ',\n    loaded_at = now()'
)


def _permit_tuples(year: int, source_file: str, rows: list[dict]) -> list[tuple]:
    """Build one INSERT value tuple (``_PERMIT_INSERT_COLUMNS`` order) per distinct permit_no.

    When a permit number repeats, the last row wins. A single
    ``INSERT ... ON CONFLICT DO UPDATE`` cannot update the same conflict key
    twice, so duplicates inside one execute_values page would abort the load;
    last-wins matches what already happened when duplicates fell into
    different pages.
    """
    latest: dict[str, dict] = {}
    for r in rows:
        latest[r['permit_no']] = r
    return [
        (
            year,
            r['permit_no'],
            *(r[col] for col in _PERMIT_DATA_COLUMNS),
            json.dumps(
                {k: (v.isoformat() if hasattr(v, 'isoformat') else v) for k, v in r['raw'].items()},
                ensure_ascii=False,
                default=str,
            ),
            source_file,
        )
        for r in latest.values()
    ]


def upsert_permits(cur, year: int, source_file: str, rows: list[dict]) -> None:
    """Upsert parsed permits into kcg.fishery_permit_cn keyed on (permit_year, permit_no).

    Args:
        cur: open psycopg2 cursor; this function does not commit.
        year: value written to ``permit_year``.
        source_file: file name written to ``source_file``.
        rows: dicts as produced by ``parse_xls`` (field keys plus ``raw``).
    """
    tuples = _permit_tuples(year, source_file, rows)
    duplicates = len(rows) - len(tuples)
    if duplicates:
        logger.warning('허가번호 중복 %d건: 마지막 행 기준으로 적재', duplicates)
    psycopg2.extras.execute_values(cur, _PERMIT_UPSERT_SQL, tuples, page_size=200)
    logger.info('fishery_permit_cn upsert: %d rows', len(tuples))
|
|
|
|
|
|
def upsert_companies(cur, rows: list[dict]) -> dict[str, int]:
    """Ensure one kcg.fleet_companies row per applicant; return ``{key: company_id}``.

    Applicants are keyed by Chinese name, falling back to the romanized name
    and then ``'UNKNOWN'``. The first row seen for a key supplies the
    (name_cn, name_en) pair stored for it.

    An existing company matching both names NULL-safely is reused before
    anything is inserted. The previous ``INSERT ... ON CONFLICT DO NOTHING``
    only deduplicated if a unique constraint covered the name pair, and such
    a constraint never fires when either name is NULL, because NULLs compare
    distinct. Re-runs could therefore add duplicate companies, and the
    follow-up lookup then returned an arbitrary one of them.
    """
    applicants: dict[str, tuple] = {}
    for r in rows:
        key = r['applicant_cn'] or r['applicant_en'] or 'UNKNOWN'
        applicants.setdefault(key, (r['applicant_cn'], r['applicant_en']))

    select_sql = (
        'SELECT id FROM kcg.fleet_companies '
        'WHERE name_cn IS NOT DISTINCT FROM %s AND name_en IS NOT DISTINCT FROM %s '
        'ORDER BY id LIMIT 1'
    )
    insert_sql = (
        'INSERT INTO kcg.fleet_companies (name_cn, name_en, country) '
        "VALUES (%s, %s, 'CN') "
        'ON CONFLICT DO NOTHING RETURNING id'
    )

    result: dict[str, int] = {}
    for key, names in applicants.items():
        cur.execute(select_sql, names)
        row = cur.fetchone()
        if row is None:
            cur.execute(insert_sql, names)
            row = cur.fetchone()
        if row is None:
            # The insert was suppressed by some unique constraint; re-read the winner.
            cur.execute(select_sql, names)
            row = cur.fetchone()
        if row:
            result[key] = row[0]
    logger.info('fleet_companies upsert: %d 신청인', len(result))
    return result
|
|
|
|
|
|
def _upsert_fleet_vessel(cur, year: int, r: dict, company_id: int) -> tuple[Optional[int], bool]:
    """Write one permit row into kcg.fleet_vessels keyed on (permit_year, permit_no).

    The existing row is updated first and a new one is inserted only when
    none matched. The outcome therefore no longer depends on a unique
    constraint over (permit_year, permit_no) being present.

    Returns:
        ``(vessel_id, inserted)``. ``vessel_id`` is ``None`` when the insert was
        suppressed by ``ON CONFLICT DO NOTHING`` (some other unique constraint).
    """
    fishery_code = r['fishery_code']
    # Unknown codes pass through unchanged as gear_code and default to MAIN.
    legacy_gear = GEAR_CODE_LEGACY.get(fishery_code, fishery_code)
    fleet_role = FISHERY_ROLE.get(fishery_code, 'MAIN')

    cur.execute(
        """
        UPDATE kcg.fleet_vessels SET
            company_id = %s, name_cn = %s, name_en = %s, tonnage = %s,
            gear_code = %s, fishery_code = %s, fleet_role = %s, updated_at = now()
        WHERE permit_year = %s AND permit_no = %s
        RETURNING id
        """,
        (
            company_id, r['name_cn'], r['name_en'], r['tonnage'],
            legacy_gear, fishery_code, fleet_role, year, r['permit_no'],
        ),
    )
    existing = cur.fetchone()
    if existing:
        return existing[0], False

    cur.execute(
        """
        INSERT INTO kcg.fleet_vessels (
            company_id, permit_no, name_cn, name_en, tonnage,
            gear_code, fishery_code, fleet_role, permit_year, updated_at
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, now())
        ON CONFLICT DO NOTHING
        RETURNING id
        """,
        (
            company_id, r['permit_no'], r['name_cn'], r['name_en'], r['tonnage'],
            legacy_gear, fishery_code, fleet_role, year,
        ),
    )
    created = cur.fetchone()
    return (created[0], True) if created else (None, False)


def _link_pair_vessels(cur, rows: list[dict], permit_to_id: dict[str, int]) -> int:
    """Cross-link each PT-S vessel with its parent through pair_vessel_id.

    The child always points at its parent. The parent is pointed back only
    while its pair_vessel_id is still NULL, so with several children (or a
    link left by an earlier run) it keeps the existing value. Returns the
    number of child rows linked.
    """
    linked = 0
    for r in rows:
        if r['fishery_code'] != 'PT-S' or not r['parent_permit_no']:
            continue
        child_id = permit_to_id.get(r['permit_no'])
        parent_id = permit_to_id.get(r['parent_permit_no'])
        if not (child_id and parent_id):
            continue
        cur.execute(
            'UPDATE kcg.fleet_vessels SET pair_vessel_id = %s WHERE id = %s',
            (parent_id, child_id),
        )
        cur.execute(
            'UPDATE kcg.fleet_vessels SET pair_vessel_id = %s WHERE id = %s AND pair_vessel_id IS NULL',
            (child_id, parent_id),
        )
        linked += 1
    return linked


def sync_fleet_vessels(cur, year: int, rows: list[dict], company_map: dict[str, int]) -> None:
    """Mirror one year's permits into kcg.fleet_vessels (upsert on permit_year + permit_no).

    Steps:
        1. Delete this year's vessels whose permit_no is absent from ``rows``
           (e.g. revoked permits). An empty ``rows`` clears the whole year.
        2. Upsert every row whose applicant resolved to a company in
           ``company_map``; other rows are skipped and counted.
        3. Link PT-S auxiliary vessels with their parent vessels.
    """
    # ('',) keeps the NOT IN list syntactically valid when rows is empty.
    cur.execute(
        'DELETE FROM kcg.fleet_vessels WHERE permit_year = %s AND permit_no NOT IN %s',
        (year, tuple(r['permit_no'] for r in rows) or ('',)),
    )

    inserted = updated = skipped = 0
    permit_to_id: dict[str, int] = {}
    for r in rows:
        company_id = company_map.get(r['applicant_cn'] or r['applicant_en'] or 'UNKNOWN')
        if company_id is None:
            skipped += 1
            continue
        vessel_id, was_inserted = _upsert_fleet_vessel(cur, year, r, company_id)
        if vessel_id is None:
            continue
        permit_to_id[r['permit_no']] = vessel_id
        if was_inserted:
            inserted += 1
        else:
            updated += 1
    logger.info('fleet_vessels 동기화: inserted=%d, updated=%d (year=%d)', inserted, updated, year)
    if skipped:
        logger.warning('신청인 회사 매핑 실패로 건너뛴 허가: %d 건', skipped)

    pair_linked = _link_pair_vessels(cur, rows, permit_to_id)
    logger.info('PT-S pair 연결: %d 건', pair_linked)
|
|
|
|
|
|
def main() -> None:
    """CLI entry point: load one permit XLS into kcgdb.

    The XLS path is ``argv[1`` ]`` or, when absent, the newest stamped file under
    ``<repo>/docs``. Permits, companies and fleet vessels are written in one
    transaction that commits only if every step succeeds.
    """
    # The script lives at <repo>/prediction/scripts/, two levels below the repo root.
    repo_root = Path(__file__).resolve().parents[2]
    xls_path = Path(sys.argv[1]) if len(sys.argv) >= 2 else _find_latest_xls(repo_root / 'docs')
    if not xls_path.exists():
        raise SystemExit(f'파일 없음: {xls_path}')

    year = _extract_year(xls_path)
    logger.info('source=%s year=%d', xls_path.name, year)

    rows = parse_xls(xls_path)
    if not rows:
        # sync_fleet_vessels deletes every vessel of `year` missing from `rows`,
        # so an empty parse (e.g. a changed header layout) would wipe the year.
        raise SystemExit(f'파싱된 허가가 없어 적재를 중단합니다: {xls_path.name}')

    conn = psycopg2.connect(
        host=_env('KCGDB_HOST'),
        port=int(_env('KCGDB_PORT', '5432')),
        dbname=_env('KCGDB_NAME'),
        user=_env('KCGDB_USER'),
        password=_env('KCGDB_PASSWORD'),
    )
    try:
        # psycopg2: leaving `with conn` commits on success and rolls back on error.
        with conn:
            with conn.cursor() as cur:
                upsert_permits(cur, year, xls_path.name, rows)
                company_map = upsert_companies(cur, rows)
                sync_fleet_vessels(cur, year, rows, company_map)
        logger.info('완료')
    finally:
        conn.close()
|
|
|
|
|
|
if __name__ == '__main__':
    # Run the loader only when executed as a script, not when imported.
    main()
|