kcg-ai-monitoring/prediction/scripts/load_fishery_permit_cn.py
htlee 8ff04a8cca feat(prediction): DAR-03 탐지 로직 보강 + 한중어업협정 906척 레지스트리 적재
- V029: kcg.fishery_permit_cn 신규 테이블(연단위, permit_year+permit_no 복합 유니크) + fleet_vessels permit_year/fishery_code 컬럼
- load_fishery_permit_cn.py: xls → DB 적재 스크립트, 906척 + 497 신청인사 upsert
- G-04/G-05/G-06 Dead code 해결: classify_gear_violations 호출 연결, dir() 버그 제거
- find_pair_candidates: bbox 1차 + 궤적 유사도(location/sog_corr/cog_alignment) 2차, role 가점
- spoofing 산식 교체: 1시간 윈도우 + teleport 절대 가점 + extreme 50kn 단독 0.6 확정
- transshipment 선종 완화: shipTy 부분일치 + 412* FISHING 간주
- gear_code DB write 경로 신설 + fleet_tracker API 3개 추가
- cron 스크립트: fishery_permit/pair_type/fleet_role 신규 섹션
2026-04-16 07:43:24 +09:00

359 lines
14 KiB
Python

"""한중어업협정 중국어선 허가현황 XLS → kcgdb 적재.
Usage:
python3 prediction/scripts/load_fishery_permit_cn.py <xls_path>
# 또는 기본 경로(docs/중국어선_허가현황_YYYYMMDD.xls 최신) 자동 탐색
python3 prediction/scripts/load_fishery_permit_cn.py
수행:
1) XLS 파싱 → kcg.fishery_permit_cn upsert (permit_year + permit_no 복합 유니크)
2) 신청인(중국어) 기준 kcg.fleet_companies upsert
3) 해당 연도 레코드를 kcg.fleet_vessels로 동기화
- PT-S(부속선)는 parent_permit_no로 본선 pair_vessel_id 연결
- fleet_role: FC=TRANSPORT, PT=MAIN, PT-S=CREW, 기타=MAIN
"""
from __future__ import annotations
import json
import logging
import os
import re
import sys
from pathlib import Path
from typing import Optional
import pandas as pd
import psycopg2
import psycopg2.extras
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)
def _env(key: str, default: Optional[str] = None) -> str:
    """Return the value of environment variable *key*.

    Falls back to *default* when the variable is unset. When the variable is
    unset and no default is given, the process exits with an error message.
    """
    value = os.environ.get(key, default)
    if value is not None:
        return value
    raise SystemExit(f'환경변수 {key} 가 필요합니다')
def _find_latest_xls(docs_dir: Path) -> Path:
    """Return the permit-status XLS in *docs_dir* with the greatest date stamp.

    Only files named ``중국어선_허가현황_<8 digits>.xls`` are considered; the
    8-digit stamps are compared as strings. Exits the process when no such
    file exists.
    """
    stamp_re = re.compile(r'중국어선_허가현황_(\d{8})\.xls$')
    hits = (
        (stamp_re.search(xls.name), xls)
        for xls in docs_dir.glob('중국어선_허가현황_*.xls')
    )
    dated = sorted((hit.group(1), xls) for hit, xls in hits if hit)
    if not dated:
        raise SystemExit(f'{docs_dir} 에서 허가현황 XLS를 찾지 못했습니다')
    _, newest = dated[-1]
    return newest
def _extract_year(path: Path) -> int:
    """Return the leading four digits of the first 8-digit run in the file name.

    Exits the process when the file name contains no 8-digit run.
    """
    found = re.search(r'(\d{4})\d{4}', path.name)
    if found is None:
        raise SystemExit(f'파일명에서 연도 추출 실패: {path.name}')
    year_text = found.group(1)
    return int(year_text)
# Fishery code (the '업종코드' column of the permit sheet) -> fleet_role written
# to kcg.fleet_vessels. Codes missing from this table are treated as 'MAIN'
# at lookup time in sync_fleet_vessels.
FISHERY_ROLE = {
    'FC': 'TRANSPORT',
    'PT': 'MAIN',
    'PT-S': 'CREW',  # auxiliary vessel, linked to its main vessel via parent_permit_no
    'GN': 'MAIN',
    'PS': 'MAIN',
    'OT': 'MAIN',
}
# Fishery code -> legacy gear_code used by fleet_tracker.
# The vessel_type override logic checks for C21, so only the PT codes
# (PT and PT-S) map to C21. Codes missing from this table are stored
# unchanged by sync_fleet_vessels.
GEAR_CODE_LEGACY = {
    'PT': 'C21',
    'PT-S': 'C21',
    'GN': 'C22',
    'OT': 'C22',
    'PS': 'PS',
    'FC': 'FC',
}
def _clean(v) -> Optional[str]:
    """Normalize a spreadsheet cell to a stripped string, or None when empty.

    Returns None for ``None``, for any pandas missing-value scalar (float NaN,
    ``pd.NaT``, ``pd.NA``) and for values whose string form is blank.
    Everything else is returned as ``str(v).strip()``.
    """
    if v is None:
        return None
    # pd.isna() returns an array for array-likes, so only ask it about scalars.
    # This also catches NaT/NA, which would otherwise be stored as the literal
    # strings 'NaT' / '<NA>'.
    if pd.api.types.is_scalar(v) and pd.isna(v):
        return None
    text = str(v).strip()
    return text or None
def _num(v) -> Optional[float]:
    """Convert a spreadsheet cell to ``float``, or None when it is not a number.

    Returns None for ``None``, for values ``float()`` cannot convert, and for
    anything that converts to NaN. That includes NaN given as a string
    (``'nan'``) or as a non-``float`` scalar, so NaN never reaches the DB.
    """
    if v is None:
        return None
    try:
        number = float(v)
    except (TypeError, ValueError):
        return None
    # NaN is the only float that is not equal to itself.
    if number != number:
        return None
    return number
def _int(v) -> Optional[int]:
    """Convert a spreadsheet cell to ``int`` (truncating toward zero), or None.

    Returns None when ``_num`` cannot produce a number and when the number has
    no integer representation (NaN or an infinity).
    """
    number = _num(v)
    if number is None:
        return None
    try:
        return int(number)
    except (ValueError, OverflowError):
        # int() raises ValueError for NaN and OverflowError for +/-inf.
        return None
def parse_xls(path: Path) -> list[dict]:
    """Parse the permit-status XLS into a list of row dicts.

    The sheet is read with ``header=1`` (the second sheet row holds the column
    names). Rows lacking any of permit number, fishery code, Chinese vessel
    name or romanized vessel name are skipped. Each returned dict holds the
    normalized fields listed below plus ``'raw'``, the untouched row with
    float NaN replaced by None.
    """
    # (output key, sheet column header, converter). Headers containing '\n'
    # are multi-line header cells and must match the sheet exactly.
    columns = (
        ('permit_no', '허가번호', _clean),
        ('fishery_type', '업종', _clean),
        ('fishery_code', '업종코드', _clean),
        ('name_cn', '선박명(중국)', _clean),
        ('name_en', '선박명(로마)', _clean),
        ('applicant_cn', '신청인(중국)', _clean),
        ('applicant_en', '신청인(로마)', _clean),
        ('applicant_addr_cn', '신청인주소(중국)', _clean),
        ('applicant_addr_en', '신청인주소(로마)', _clean),
        ('registration_no', '선박등록번호', _clean),
        ('tonnage', '톤수', _num),
        ('port_cn', '선적항(중국)', _clean),
        ('port_en', '선적항(로마)', _clean),
        ('callsign', '호출부호', _clean),
        ('engine_power', '기관출력(마력)', _num),
        ('length_m', '길이(m)', _num),
        ('beam_m', '폭(m)', _num),
        ('depth_m', '깊이(m)', _num),
        ('fishing_zones', '조업수역', _clean),
        ('fishing_period_1', '조업기간1', _clean),
        ('fishing_period_2', '조업기간2', _clean),
        ('catch_quota_t', '어획할당량(톤)', _num),
        ('cumulative_quota_t', '현어기의\n신규(첫)허가부터 \n누적 어획할당량(톤)', _num),
        ('refrig_hold_count', '냉장 어창의 수', _int),
        ('freezer_hold_count', '냉동 어창의 수', _int),
        ('admin_sanction', '행정처분일자/행정처분기관/위반내용', _clean),
        ('parent_permit_no', '본선의\n허가번호', _clean),
        ('volume_enclosed', '용적\n폐위장소(㎥)', _num),
        ('volume_above_deck', '용적\n상갑판 위(㎥)', _num),
        ('volume_below_deck', '용적\n상갑판 아래(㎥)', _num),
        ('volume_excluded', '용적\n제외장소(㎥)', _num),
    )
    # A row is kept only when every one of these cells is non-empty.
    required_headers = ('허가번호', '업종코드', '선박명(중국)', '선박명(로마)')

    sheet = pd.read_excel(path, engine='xlrd', header=1)
    rows: list[dict] = []
    for _, series in sheet.iterrows():
        required_cells = [_clean(series.get(header)) for header in required_headers]
        if not all(required_cells):
            continue
        record = {key: convert(series.get(header)) for key, header, convert in columns}
        record['raw'] = {
            column: (None if (isinstance(cell, float) and pd.isna(cell)) else cell)
            for column, cell in series.items()
        }
        rows.append(record)
    logger.info('파싱된 허가 수: %d', len(rows))
    return rows
def upsert_permits(cur, year: int, source_file: str, rows: list[dict]) -> None:
    """Upsert parsed permit rows into kcg.fishery_permit_cn.

    Rows are keyed by (permit_year, permit_no). On conflict every other column
    is overwritten and ``loaded_at`` is refreshed.

    Args:
        cur: open psycopg2 cursor; the caller owns the transaction.
        year: permit year stored in ``permit_year`` for every row.
        source_file: name of the XLS the rows came from.
        rows: dicts as produced by ``parse_xls``.
    """
    sql = """
        INSERT INTO kcg.fishery_permit_cn (
            permit_year, permit_no, fishery_type, fishery_code,
            name_cn, name_en, applicant_cn, applicant_en,
            applicant_addr_cn, applicant_addr_en, registration_no, tonnage,
            port_cn, port_en, callsign, engine_power,
            length_m, beam_m, depth_m, fishing_zones,
            fishing_period_1, fishing_period_2, catch_quota_t, cumulative_quota_t,
            refrig_hold_count, freezer_hold_count, admin_sanction, parent_permit_no,
            volume_enclosed, volume_above_deck, volume_below_deck, volume_excluded,
            raw_data, source_file
        ) VALUES %s
        ON CONFLICT (permit_year, permit_no) DO UPDATE SET
            fishery_type = EXCLUDED.fishery_type,
            fishery_code = EXCLUDED.fishery_code,
            name_cn = EXCLUDED.name_cn,
            name_en = EXCLUDED.name_en,
            applicant_cn = EXCLUDED.applicant_cn,
            applicant_en = EXCLUDED.applicant_en,
            applicant_addr_cn = EXCLUDED.applicant_addr_cn,
            applicant_addr_en = EXCLUDED.applicant_addr_en,
            registration_no = EXCLUDED.registration_no,
            tonnage = EXCLUDED.tonnage,
            port_cn = EXCLUDED.port_cn,
            port_en = EXCLUDED.port_en,
            callsign = EXCLUDED.callsign,
            engine_power = EXCLUDED.engine_power,
            length_m = EXCLUDED.length_m,
            beam_m = EXCLUDED.beam_m,
            depth_m = EXCLUDED.depth_m,
            fishing_zones = EXCLUDED.fishing_zones,
            fishing_period_1 = EXCLUDED.fishing_period_1,
            fishing_period_2 = EXCLUDED.fishing_period_2,
            catch_quota_t = EXCLUDED.catch_quota_t,
            cumulative_quota_t = EXCLUDED.cumulative_quota_t,
            refrig_hold_count = EXCLUDED.refrig_hold_count,
            freezer_hold_count = EXCLUDED.freezer_hold_count,
            admin_sanction = EXCLUDED.admin_sanction,
            parent_permit_no = EXCLUDED.parent_permit_no,
            volume_enclosed = EXCLUDED.volume_enclosed,
            volume_above_deck = EXCLUDED.volume_above_deck,
            volume_below_deck = EXCLUDED.volume_below_deck,
            volume_excluded = EXCLUDED.volume_excluded,
            raw_data = EXCLUDED.raw_data,
            source_file = EXCLUDED.source_file,
            loaded_at = now()
    """
    # Row keys in the same order as the SQL column list, between permit_year
    # and raw_data.
    row_fields = (
        'permit_no', 'fishery_type', 'fishery_code',
        'name_cn', 'name_en', 'applicant_cn', 'applicant_en',
        'applicant_addr_cn', 'applicant_addr_en', 'registration_no', 'tonnage',
        'port_cn', 'port_en', 'callsign', 'engine_power',
        'length_m', 'beam_m', 'depth_m', 'fishing_zones',
        'fishing_period_1', 'fishing_period_2', 'catch_quota_t', 'cumulative_quota_t',
        'refrig_hold_count', 'freezer_hold_count', 'admin_sanction', 'parent_permit_no',
        'volume_enclosed', 'volume_above_deck', 'volume_below_deck', 'volume_excluded',
    )

    # PostgreSQL aborts an INSERT ... ON CONFLICT DO UPDATE that would touch
    # the same target row twice in one statement, so a permit_no repeated in
    # the sheet must be collapsed first. The last occurrence wins, matching
    # what sequential upserts would produce.
    unique_rows: dict[str, dict] = {}
    for r in rows:
        unique_rows[r['permit_no']] = r
    duplicates = len(rows) - len(unique_rows)
    if duplicates:
        logger.warning(
            'fishery_permit_cn: dropped %d duplicate permit_no rows (last occurrence kept)',
            duplicates,
        )

    tuples = [
        (year, *(r[field] for field in row_fields), _permit_raw_json(r['raw']), source_file)
        for r in unique_rows.values()
    ]
    psycopg2.extras.execute_values(cur, sql, tuples, page_size=200)
    logger.info('fishery_permit_cn upsert: %d rows', len(tuples))


def _permit_raw_json(raw: dict) -> str:
    """Serialize the untouched sheet row for the raw_data column.

    Values exposing ``isoformat`` (dates, timestamps) are written in ISO form;
    anything else json cannot encode falls back to ``str``.
    """
    return json.dumps(
        {k: (v.isoformat() if hasattr(v, 'isoformat') else v) for k, v in raw.items()},
        ensure_ascii=False,
        default=str,
    )
def upsert_companies(cur, rows: list[dict]) -> dict[str, int]:
    """Ensure a kcg.fleet_companies row exists for every applicant.

    Applicants are keyed by Chinese name, falling back to the romanized name
    and then to the literal ``'UNKNOWN'``. The first row seen for a key
    supplies the names that are stored.

    Args:
        cur: open psycopg2 cursor; the caller owns the transaction.
        rows: dicts as produced by ``parse_xls``.

    Returns:
        Mapping of applicant key to company id. Applicants that could be
        neither found nor inserted are absent from the mapping.
    """
    lookup_sql = (
        'SELECT id FROM kcg.fleet_companies '
        'WHERE name_cn IS NOT DISTINCT FROM %s AND name_en IS NOT DISTINCT FROM %s '
        'ORDER BY id LIMIT 1'
    )
    insert_sql = """
        INSERT INTO kcg.fleet_companies (name_cn, name_en, country)
        VALUES (%s, %s, 'CN')
        ON CONFLICT DO NOTHING
        RETURNING id
    """

    applicants: dict[str, tuple] = {}
    for r in rows:
        key = r['applicant_cn'] or r['applicant_en'] or 'UNKNOWN'
        applicants.setdefault(key, (r['applicant_cn'], r['applicant_en']))

    result: dict[str, int] = {}
    unresolved: list[str] = []
    for key, names in applicants.items():
        # Look up before inserting. ON CONFLICT only fires when a unique index
        # matches, and NULL names never conflict, so a blind INSERT would add
        # a duplicate company on every run in those cases.
        cur.execute(lookup_sql, names)
        found = cur.fetchone()
        if found is None:
            cur.execute(insert_sql, names)
            # No row comes back when an existing row blocked the insert
            # through some other unique constraint.
            found = cur.fetchone()
        if found is None:
            unresolved.append(key)
            continue
        result[key] = found[0]

    if unresolved:
        logger.warning(
            'fleet_companies: %d applicants could not be resolved to a company id: %s',
            len(unresolved), unresolved[:10],
        )
    logger.info('fleet_companies upsert: %d 신청인', len(result))
    return result
def sync_fleet_vessels(cur, year: int, rows: list[dict], company_map: dict[str, int]) -> None:
    """Synchronize one permit year into kcg.fleet_vessels.

    Steps:
      1. Delete the year's vessels whose permit number is no longer in *rows*
         (cancelled permits). With empty *rows* this removes the whole year.
      2. Insert each vessel, or update it when the insert is blocked by an
         existing row.
      3. Link every PT-S (auxiliary) vessel to its main vessel through
         ``pair_vessel_id``.

    Args:
        cur: open psycopg2 cursor; the caller owns the transaction.
        year: permit year being synchronized.
        rows: dicts as produced by ``parse_xls``.
        company_map: applicant key -> company id, from ``upsert_companies``.
    """
    # psycopg2 adapts a tuple to an SQL list; an empty list would be a syntax
    # error, hence the ('',) placeholder.
    permit_nos = tuple(r['permit_no'] for r in rows) or ('',)
    cur.execute(
        'DELETE FROM kcg.fleet_vessels WHERE permit_year = %s AND permit_no NOT IN %s',
        (year, permit_nos),
    )

    inserted = 0
    updated = 0
    missing_company = 0
    unmatched = 0
    permit_to_id: dict[str, int] = {}
    for r in rows:
        company_key = r['applicant_cn'] or r['applicant_en'] or 'UNKNOWN'
        company_id = company_map.get(company_key)
        if company_id is None:
            missing_company += 1
            continue
        vessel_id, created = _upsert_fleet_vessel(cur, year, r, company_id)
        if vessel_id is None:
            unmatched += 1
            continue
        permit_to_id[r['permit_no']] = vessel_id
        if created:
            inserted += 1
        else:
            updated += 1
    logger.info('fleet_vessels 동기화: inserted=%d, updated=%d (year=%d)', inserted, updated, year)
    if missing_company:
        logger.warning('fleet_vessels: skipped %d vessels without a company id', missing_company)
    if unmatched:
        logger.warning(
            'fleet_vessels: %d vessels were neither inserted nor matched by (permit_year, permit_no)',
            unmatched,
        )

    pair_linked, pair_unresolved = _link_pair_vessels(cur, rows, permit_to_id)
    logger.info('PT-S pair 연결: %d', pair_linked)
    if pair_unresolved:
        logger.warning('PT-S pair: %d vessels whose own or parent permit was not synced', pair_unresolved)


def _upsert_fleet_vessel(cur, year: int, r: dict, company_id: int) -> tuple:
    """Insert one vessel or, failing that, update it; return ``(id, created)``.

    ``id`` is None when the insert was blocked and no row matched
    (permit_year, permit_no) for the update.
    """
    fishery_code = r['fishery_code']
    legacy_gear = GEAR_CODE_LEGACY.get(fishery_code, fishery_code)
    fleet_role = FISHERY_ROLE.get(fishery_code, 'MAIN')

    cur.execute(
        """
        INSERT INTO kcg.fleet_vessels (
            company_id, permit_no, name_cn, name_en, tonnage,
            gear_code, fishery_code, fleet_role, permit_year, updated_at
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, now())
        ON CONFLICT DO NOTHING
        RETURNING id
        """,
        (
            company_id, r['permit_no'], r['name_cn'], r['name_en'], r['tonnage'],
            legacy_gear, fishery_code, fleet_role, year,
        ),
    )
    created = cur.fetchone()
    if created:
        return created[0], True

    # The insert returned nothing, so a conflicting row exists; refresh it.
    cur.execute(
        """
        UPDATE kcg.fleet_vessels SET
            company_id = %s, name_cn = %s, name_en = %s, tonnage = %s,
            gear_code = %s, fishery_code = %s, fleet_role = %s, updated_at = now()
        WHERE permit_year = %s AND permit_no = %s
        RETURNING id
        """,
        (
            company_id, r['name_cn'], r['name_en'], r['tonnage'],
            legacy_gear, fishery_code, fleet_role, year, r['permit_no'],
        ),
    )
    existing = cur.fetchone()
    return (existing[0] if existing else None), False


def _link_pair_vessels(cur, rows: list[dict], permit_to_id: dict[str, int]) -> tuple:
    """Point each PT-S vessel at its main vessel; return ``(linked, unresolved)``.

    The main vessel is pointed back at the PT-S vessel only when it has no
    pair yet.
    """
    linked = 0
    unresolved = 0
    for r in rows:
        if r['fishery_code'] != 'PT-S' or not r['parent_permit_no']:
            continue
        child_id = permit_to_id.get(r['permit_no'])
        parent_id = permit_to_id.get(r['parent_permit_no'])
        if child_id is None or parent_id is None:
            unresolved += 1
            continue
        cur.execute(
            'UPDATE kcg.fleet_vessels SET pair_vessel_id = %s WHERE id = %s',
            (parent_id, child_id),
        )
        cur.execute(
            'UPDATE kcg.fleet_vessels SET pair_vessel_id = %s WHERE id = %s AND pair_vessel_id IS NULL',
            (child_id, parent_id),
        )
        linked += 1
    return linked, unresolved
def main() -> None:
    """Load one permit-status XLS into the database.

    The XLS path comes from the first command-line argument, or defaults to
    the newest matching file under ``<repo root>/docs``. The permit year is
    taken from the file name. All three load steps run in one transaction,
    which is committed on success and rolled back on error.
    """
    repo_root = Path(__file__).resolve().parents[2]
    if len(sys.argv) >= 2:
        xls_path = Path(sys.argv[1])
    else:
        xls_path = _find_latest_xls(repo_root / 'docs')
    if not xls_path.exists():
        raise SystemExit(f'파일 없음: {xls_path}')

    year = _extract_year(xls_path)
    logger.info('source=%s year=%d', xls_path.name, year)

    rows = parse_xls(xls_path)
    if not rows:
        # With no rows, sync_fleet_vessels would delete every vessel of this
        # year. An empty parse almost certainly means the sheet layout or
        # headers changed, so stop before touching the DB.
        raise SystemExit(f'파싱된 허가가 0건입니다 (헤더/형식 확인 필요): {xls_path.name}')

    conn = psycopg2.connect(
        host=_env('KCGDB_HOST'),
        port=int(_env('KCGDB_PORT', '5432')),
        dbname=_env('KCGDB_NAME'),
        user=_env('KCGDB_USER'),
        password=_env('KCGDB_PASSWORD'),
    )
    try:
        # `with conn` commits on success and rolls back on exception; it does
        # not close the connection, hence the finally.
        with conn:
            with conn.cursor() as cur:
                upsert_permits(cur, year, xls_path.name, rows)
                company_map = upsert_companies(cur, rows)
                sync_fleet_vessels(cur, year, rows, company_map)
        logger.info('완료')
    finally:
        conn.close()


if __name__ == '__main__':
    main()