diff --git a/backend/src/main/resources/db/migration/V029__fishery_permit_cn.sql b/backend/src/main/resources/db/migration/V029__fishery_permit_cn.sql new file mode 100644 index 0000000..8939c4b --- /dev/null +++ b/backend/src/main/resources/db/migration/V029__fishery_permit_cn.sql @@ -0,0 +1,72 @@ +-- V026: 한중어업협정 중국어선 허가현황 원본 테이블 + fleet_vessels 연도 컬럼 +-- 출처: docs/중국어선_허가현황_YYYYMMDD.xls (연단위 갱신) + +-- ===== 1. fishery_permit_cn : 허가현황 원본 스냅샷 ===== +CREATE TABLE IF NOT EXISTS kcg.fishery_permit_cn ( + id BIGSERIAL PRIMARY KEY, + permit_year INTEGER NOT NULL, + permit_no VARCHAR(30) NOT NULL, + fishery_type VARCHAR(60), -- 업종 (2척식저인망어업 등) + fishery_code VARCHAR(10) NOT NULL, -- 업종코드 (PT/PT-S/GN/FC/PS/OT) + name_cn VARCHAR(100) NOT NULL, + name_en VARCHAR(200) NOT NULL, + applicant_cn VARCHAR(100), + applicant_en VARCHAR(200), + applicant_addr_cn VARCHAR(300), + applicant_addr_en VARCHAR(300), + registration_no VARCHAR(100), + tonnage NUMERIC(10,2), + port_cn VARCHAR(100), + port_en VARCHAR(200), + callsign VARCHAR(40), + engine_power NUMERIC(10,2), + length_m NUMERIC(6,2), + beam_m NUMERIC(6,2), + depth_m NUMERIC(6,2), + fishing_zones VARCHAR(30), -- Ⅱ,Ⅲ 등 + fishing_period_1 VARCHAR(50), + fishing_period_2 VARCHAR(50), + catch_quota_t NUMERIC(10,2), + cumulative_quota_t NUMERIC(10,2), + refrig_hold_count INTEGER, + freezer_hold_count INTEGER, + admin_sanction TEXT, + parent_permit_no VARCHAR(30), -- 부속선(PT-S)이 참조하는 본선 허가번호 + volume_enclosed NUMERIC(10,2), + volume_above_deck NUMERIC(10,2), + volume_below_deck NUMERIC(10,2), + volume_excluded NUMERIC(10,2), + raw_data JSONB, + source_file VARCHAR(255), + loaded_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (permit_year, permit_no) +); + +CREATE INDEX idx_fishery_permit_cn_name_cn ON kcg.fishery_permit_cn(permit_year, name_cn); +CREATE INDEX idx_fishery_permit_cn_name_en ON kcg.fishery_permit_cn(permit_year, LOWER(name_en)); +CREATE INDEX idx_fishery_permit_cn_code ON kcg.fishery_permit_cn(permit_year, fishery_code); +CREATE INDEX idx_fishery_permit_cn_parent ON kcg.fishery_permit_cn(permit_year, parent_permit_no); + +COMMENT ON TABLE kcg.fishery_permit_cn IS '한중어업협정 중국어선 허가현황 원본 스냅샷 (연단위 갱신)'; +COMMENT ON COLUMN kcg.fishery_permit_cn.permit_year IS '허가 연도 (파일명 YYYY에서 추출)'; +COMMENT ON COLUMN kcg.fishery_permit_cn.fishery_code IS 'PT(쌍끌이 본선)/PT-S(부속선)/GN(자망)/FC(운반)/PS(선망)/OT(외끌이)'; +COMMENT ON COLUMN kcg.fishery_permit_cn.parent_permit_no IS 'PT-S(부속선)가 소속된 본선의 허가번호'; + +-- ===== 2. fleet_vessels 확장 : 연도 + 업종코드 추적 ===== +ALTER TABLE kcg.fleet_vessels ADD COLUMN IF NOT EXISTS permit_year INTEGER; +ALTER TABLE kcg.fleet_vessels ADD COLUMN IF NOT EXISTS fishery_code VARCHAR(10); + +CREATE INDEX IF NOT EXISTS idx_fleet_vessels_permit_year ON kcg.fleet_vessels(permit_year); +CREATE INDEX IF NOT EXISTS idx_fleet_vessels_fishery_code ON kcg.fleet_vessels(fishery_code); + +COMMENT ON COLUMN kcg.fleet_vessels.permit_year IS '허가 연도. fleet_tracker는 현재 연도만 조회'; +COMMENT ON COLUMN kcg.fleet_vessels.fishery_code IS 'fishery_permit_cn.fishery_code 복제 (PT/PT-S/GN/FC/PS/OT)'; + +-- ===== 3. V014 데모 seed 제거 ===== +-- 기존 6행 데모 vessels 제거 (실제 허가현황만 남김). +-- fleet_companies id=1,2는 vessel_permit_master가 FK로 참조하여 삭제 불가 — 잔존 허용 +-- (loader 실행 시 실 허가 신청인 회사가 별도 id로 upsert됨) +DELETE FROM kcg.fleet_vessels WHERE permit_no IN ( + 'ZY-2024-001','ZY-2024-002','ZY-2024-003', + 'ZY-2024-010','ZY-2024-011','ZY-2024-012' +); diff --git a/docs/RELEASE-NOTES.md b/docs/RELEASE-NOTES.md index 2cf60ce..1beeeec 100644 --- a/docs/RELEASE-NOTES.md +++ b/docs/RELEASE-NOTES.md @@ -4,6 +4,26 @@ ## [Unreleased] +## [2026-04-16] + +### 추가 +- **한중어업협정 중국어선 허가현황 레지스트리** — `kcg.fishery_permit_cn` 신규 테이블(29컬럼, 연단위 스냅샷). V029 마이그레이션 + `fleet_vessels.permit_year/fishery_code` 컬럼 추가. `load_fishery_permit_cn.py`로 연도별 XLS → DB 적재(906척/497 신청인사) +- **페어 탐색 재설계** — `find_pair_candidates()` bbox 1차(인접 9 cell) + 궤적 유사도 2차(location/sog_corr/cog_alignment). 동종 어선 페어도 허용, role 가점(PT_REGISTERED/COOP_FISHING/TRANSSHIP_LIKE) +- **fleet_tracker API 3개** — `get_pt_registered_mmsis` / `get_gear_episodes` / `get_gear_positions` + +### 수정 +- **DAR-03 G-04/G-05/G-06 Dead code 해결** — `classify_gear_violations()` scheduler 호출 연결. `if 'pair_results' in dir()` 버그 제거. 사이클당 G-05 303건 / G-04 1건 탐지 시작 +- **spoofing 산식** — 24h 희석 버그 → 최근 1h 윈도우 + teleport 절대 가점(건당 0.20) + extreme(>50kn) 단독 발견 시 score=max(0.6) 확정 +- **gear_code DB write 경로** — `AnalysisResult.gear_code` 필드 + `kcgdb.upsert_results()` INSERT/UPDATE + scheduler 두 경로에서 `fleet_tracker.get_vessel_gear_code()` 호출 + +### 변경 +- **transshipment 선종 완화** — `_CARRIER_HINTS`(cargo/tanker/supply/carrier/reefer) 부분일치 + 412* 중국어선 FISHING 간주 +- **gear drift 임계** — 750m → **500m** (DAR-03 스펙 정합) +- **fleet_tracker 현재 연도 필터** — `WHERE permit_year = EXTRACT(YEAR FROM now())::int OR permit_year IS NULL` + +### 기타 +- cron 스크립트 신규 섹션: hourly P1~P5(허가/매칭/gear_code/fleet_role) + D3.5(pair_type) / diagnostic PART 7.5 + 4-5.5 + ## [2026-04-15] ### 추가 diff --git a/docs/중국어선_허가현황_20260106.xls b/docs/중국어선_허가현황_20260106.xls new file mode 100644 index 0000000..e01f2f7 Binary files /dev/null and b/docs/중국어선_허가현황_20260106.xls differ diff --git a/prediction/algorithms/gear_violation.py b/prediction/algorithms/gear_violation.py index 7ea2cdc..623e60f 100644 --- a/prediction/algorithms/gear_violation.py +++ b/prediction/algorithms/gear_violation.py @@ -27,7 +27,7 @@ SIGNAL_CYCLING_GAP_MIN = 30 # minutes SIGNAL_CYCLING_MIN_COUNT = 2 # G-05 thresholds -GEAR_DRIFT_THRESHOLD_NM = 0.405 # ≈ 750m (보수적, 조류보정 없음) +GEAR_DRIFT_THRESHOLD_NM = 0.270 # ≈ 500m (DAR-03 스펙, 조류 보정 전) # Fixed gear types (stow net, gillnet, trap) FIXED_GEAR_TYPES = {'GN', 'TRAP', 'FYK', 'FPO', 'GNS', 'GND'} diff --git a/prediction/algorithms/pair_trawl.py b/prediction/algorithms/pair_trawl.py index 472d746..683fb72 100644 --- a/prediction/algorithms/pair_trawl.py +++ b/prediction/algorithms/pair_trawl.py @@ -14,6 +14,7 @@ from __future__ import annotations import logging import math +from typing import Optional import pandas as pd @@ -133,6 +134,9 @@ def detect_pair_trawl( df_b: pd.DataFrame, mmsi_a: str, mmsi_b: str, + role_a: str = 'UNKNOWN', + role_b: str = 'UNKNOWN', + similarity: float = 0.0, ) -> dict: """쌍끌이 트롤 공조 탐지 (DAR-03 G-06). @@ -141,20 +145,11 @@ def detect_pair_trawl( df_b: 선박 B의 AIS DataFrame. 필수 컬럼: timestamp, lat, lon, sog, cog mmsi_a: 선박 A MMSI mmsi_b: 선박 B MMSI (결과의 pair_mmsi에 기록) + role_a/role_b: 'FISHING'|'CARRIER'|'PT_MAIN'|'PT_SUB'|'UNKNOWN' + similarity: find_pair_candidates가 계산한 1차 유사도 (0~1) Returns: - { - 'pair_detected': bool, - 'sync_duration_min': float, - 'max_sync_block_min': float, - 'mean_separation_nm': float, - 'sog_delta_mean': float, - 'cog_delta_mean': float, - 'simultaneous_gap_min': float, - 'g_codes': list[str], - 'confidence': float, - 'pair_mmsi': str, - } + 위 필드들 + role_a/role_b/similarity/bonus/pair_type """ required_cols = {'timestamp', 'lat', 'lon', 'sog', 'cog'} @@ -259,12 +254,24 @@ def detect_pair_trawl( 4, ) + # ── role 매칭 가점 (DAR-03 선종 정합) ───────────────────── + bonus = 0 + pair_type = 'GENERIC' + if role_a == 'PT_MAIN' and role_b == 'PT_SUB': + bonus = 15; pair_type = 'PT_REGISTERED' + elif role_b == 'PT_MAIN' and role_a == 'PT_SUB': + bonus = 15; pair_type = 'PT_REGISTERED' + elif role_a == 'FISHING' and role_b == 'FISHING': + bonus = 10; pair_type = 'COOP_FISHING' + elif {role_a, role_b} == {'FISHING', 'CARRIER'}: + bonus = 15; pair_type = 'TRANSSHIP_LIKE' + logger.info( 'pair_trawl(%s, %s): detected — sync=%.0fmin max_block=%.0fmin ' - 'sep=%.3fnm confidence=%.3f g_codes=%s', + 'sep=%.3fnm confidence=%.3f bonus=%d type=%s g_codes=%s', mmsi_a, mmsi_b, sync_duration_min, max_sync_block_min, - mean_separation_nm, confidence, g_codes, + mean_separation_nm, confidence, bonus, pair_type, g_codes, ) return { @@ -278,9 +285,209 @@ def detect_pair_trawl( 'g_codes': g_codes, 'confidence': confidence, 'pair_mmsi': mmsi_b, + 'role_a': role_a, + 'role_b': role_b, + 'similarity': round(similarity, 4), + 'bonus': bonus, + 'pair_type': pair_type, } +def _classify_role( + mmsi: str, + info: dict, + pt_registered: set[str], + pt_sub_registered: set[str], +) -> str: + """페어 후보 role 판정. 반환: PT_MAIN/PT_SUB/FISHING/CARRIER/UNKNOWN.""" + if mmsi in pt_sub_registered: + return 'PT_SUB' + if mmsi in pt_registered: + return 'PT_MAIN' + kind = info.get('ship_kind_code', '') if info else '' + ship_ty = (info.get('ship_ty') or info.get('vessel_type') or '') if info else '' + if kind == '000020': + return 'FISHING' + if kind in ('000023', '000024'): + return 'CARRIER' + # 중국 412* 허가 어선은 ship_kind 없어도 FISHING 간주 + if mmsi.startswith('412'): + return 'FISHING' + st = ship_ty.lower() if isinstance(ship_ty, str) else '' + if any(k in st for k in ('cargo', 'tanker', 'supply', 'carrier')): + return 'CARRIER' + if 'fishing' in st: + return 'FISHING' + return 'UNKNOWN' + + +def _trajectory_similarity( + df_a: pd.DataFrame, df_b: pd.DataFrame, min_samples: int = 6, +) -> tuple[float, int, dict]: + """두 선박의 공통 타임스탬프 기반 궤적 유사도 (0~1). + + Returns: (similarity, common_samples, breakdown) + breakdown = {location_score, sog_corr, cog_alignment} + """ + if df_a.empty or df_b.empty: + return 0.0, 0, {} + cols = {'timestamp', 'lat', 'lon', 'sog', 'cog'} + if cols - set(df_a.columns) or cols - set(df_b.columns): + return 0.0, 0, {} + try: + a = df_a[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy() + b = df_b[['timestamp', 'lat', 'lon', 'sog', 'cog']].copy() + a['timestamp'] = pd.to_datetime(a['timestamp']) + b['timestamp'] = pd.to_datetime(b['timestamp']) + m = pd.merge( + a.rename(columns={'lat': 'la', 'lon': 'oa', 'sog': 'sa', 'cog': 'ca'}), + b.rename(columns={'lat': 'lb', 'lon': 'ob', 'sog': 'sb', 'cog': 'cb'}), + on='timestamp', how='inner', + ) + except Exception: + return 0.0, 0, {} + n = len(m) + if n < min_samples: + return 0.0, n, {} + + # location: 평균 거리 기반 (500m 이내 1.0 → 1km 0.0 선형) + dists = [ + haversine_nm(r.la, r.oa, r.lb, r.ob) + for r in m.itertuples(index=False) + ] + mean_dist_nm = sum(dists) / len(dists) + location_score = max(0.0, min(1.0, 1.0 - (mean_dist_nm - PROXIMITY_NM) / PROXIMITY_NM)) + + # sog 상관 (Pearson) + try: + sa = m['sa'].astype(float) + sb = m['sb'].astype(float) + if sa.std() > 0 and sb.std() > 0: + sog_corr = abs(float(sa.corr(sb))) + else: + sog_corr = 1.0 if (sa - sb).abs().mean() < 0.5 else 0.0 + except Exception: + sog_corr = 0.0 + + # cog 방향 일치: 평균 각도차 → 10°=1.0, 90°=0.0 선형 + cog_deltas = [ + _cog_delta(float(r.ca), float(r.cb)) + for r in m.itertuples(index=False) + ] + mean_cog_delta = sum(cog_deltas) / len(cog_deltas) + cog_alignment = max(0.0, min(1.0, 1.0 - (mean_cog_delta - COG_DELTA_MAX) / 80.0)) + + similarity = 0.4 * location_score + 0.3 * sog_corr + 0.3 * cog_alignment + return round(similarity, 4), n, { + 'location_score': round(location_score, 3), + 'sog_corr': round(sog_corr, 3), + 'cog_alignment': round(cog_alignment, 3), + 'mean_distance_nm': round(mean_dist_nm, 4), + 'mean_cog_delta_deg': round(mean_cog_delta, 2), + } + + +# bbox 1차 탐색 반경 (도 단위) +BBOX_DEG = 0.01 # 약 1.1km — 주변 후보만 컷 +SIMILARITY_PAIR = 0.70 # 유력 페어 +SIMILARITY_OBSERVE = 0.50 # 관찰 페어 + + +def find_pair_candidates( + base_mmsis: set[str], + vessel_dfs: dict[str, pd.DataFrame], + get_vessel_info, + pt_registered: Optional[set[str]] = None, + pt_sub_registered: Optional[set[str]] = None, + bbox_deg: float = BBOX_DEG, + min_common_samples: int = 6, + similarity_threshold: float = SIMILARITY_OBSERVE, +) -> list[dict]: + """base 선박별로 bbox 1차 → 궤적 유사도 2차로 페어 후보 반환. + + base 조건: PT 등록 선박, 어선(000020), 중국 412* 허가선 등 최소 1척 어선 성격. + target: 인접 격자 내 모든 선박. 궤적 유사도 ≥ similarity_threshold 이면 후보. + + Returns: [{'base_mmsi', 'target_mmsi', 'similarity', 'common_samples', + 'base_role', 'target_role', 'breakdown'}, ...] + """ + pt_registered = pt_registered or set() + pt_sub_registered = pt_sub_registered or set() + + # 각 선박의 최근 위치 → grid 빌드 + last_pos: dict[str, dict] = {} + for mmsi, df in vessel_dfs.items(): + if df.empty or not all(c in df.columns for c in ('lat', 'lon')): + continue + try: + row = df.iloc[-1] + last_pos[mmsi] = {'lat': float(row['lat']), 'lon': float(row['lon'])} + except Exception: + continue + + def _cell(lat: float, lon: float) -> tuple[int, int]: + return (int(lat / bbox_deg), int(lon / bbox_deg)) + + grid: dict[tuple[int, int], list[str]] = {} + for mmsi, p in last_pos.items(): + grid.setdefault(_cell(p['lat'], p['lon']), []).append(mmsi) + + checked: set[tuple[str, str]] = set() + results: list[dict] = [] + + for base in base_mmsis: + if base not in last_pos: + continue + bp = last_pos[base] + bc = _cell(bp['lat'], bp['lon']) + # 인접 9 cell + neighbors: list[str] = [] + for dr in (-1, 0, 1): + for dc in (-1, 0, 1): + neighbors.extend(grid.get((bc[0] + dr, bc[1] + dc), [])) + for target in neighbors: + if target == base: + continue + key = (base, target) if base < target else (target, base) + if key in checked: + continue + checked.add(key) + + sim, n_common, breakdown = _trajectory_similarity( + vessel_dfs[base], vessel_dfs[target], min_common_samples, + ) + if sim < similarity_threshold: + continue + + info_a = get_vessel_info(base) if get_vessel_info else {} + info_b = get_vessel_info(target) if get_vessel_info else {} + role_a = _classify_role(base, info_a, pt_registered, pt_sub_registered) + role_b = _classify_role(target, info_b, pt_registered, pt_sub_registered) + + # 최소 1척이 어선 성격이어야 함 + fishing_like = {'FISHING', 'PT_MAIN', 'PT_SUB'} + if role_a not in fishing_like and role_b not in fishing_like: + continue + + results.append({ + 'base_mmsi': base, + 'target_mmsi': target, + 'similarity': sim, + 'common_samples': n_common, + 'base_role': role_a, + 'target_role': role_b, + 'breakdown': breakdown, + 'is_strong': sim >= SIMILARITY_PAIR, + }) + + logger.info( + 'find_pair_candidates: base=%d, pool=%d → candidates=%d (strong=%d)', + len(base_mmsis), len(last_pos), len(results), + sum(1 for r in results if r['is_strong']), + ) + return results + + def scan_unregistered_pairs( vessel_dfs: dict[str, pd.DataFrame], registered_pairs: set[tuple[str, str]], diff --git a/prediction/algorithms/spoofing.py b/prediction/algorithms/spoofing.py index a75db08..bbac4fb 100644 --- a/prediction/algorithms/spoofing.py +++ b/prediction/algorithms/spoofing.py @@ -2,6 +2,8 @@ import pandas as pd from algorithms.location import haversine_nm, bd09_to_wgs84, compute_bd09_offset # noqa: F401 MAX_FISHING_SPEED_KNOTS = 25.0 +EXTREME_SPEED_KNOTS = 50.0 # 물리적 불가능 속도 (단독으로 spoofing 확정) +RECENT_WINDOW_BUCKETS = 12 # 최근 12 x 5min = 1시간 윈도우로 분모 제한 def detect_teleportation(df_vessel: pd.DataFrame, @@ -51,24 +53,45 @@ def count_speed_jumps(df_vessel: pd.DataFrame, threshold_knots: float = 10.0) -> def compute_spoofing_score(df_vessel: pd.DataFrame) -> float: - """종합 GPS 스푸핑 점수 (0~1).""" + """종합 GPS 스푸핑 점수 (0~1). + + 분모를 24h 누적 길이가 아닌 최근 1시간 윈도우로 제한하고, + 물리적 불가능 속도(>50kn)는 단독으로 강한 가점을 부여한다. + """ if len(df_vessel) < 2: return 0.0 + # 최근 N 버킷(약 1시간) 기준으로 분모 고정 + window_df = df_vessel.tail(RECENT_WINDOW_BUCKETS) + n_window = max(len(window_df), 2) + score = 0.0 - n = len(df_vessel) - # 순간이동 비율 - teleports = detect_teleportation(df_vessel) + # 1) 순간이동 — 절대 가점 (건당 0.2) + extreme 단독 확정 가점 0.6 + teleports = detect_teleportation(window_df) if teleports: - score += min(0.4, len(teleports) / n * 10) + score += min(0.4, len(teleports) * 0.20) + extreme = any(t.get('implied_kn', 0) >= EXTREME_SPEED_KNOTS for t in teleports) + if extreme: + score = max(score, 0.6) - # SOG 급변 비율 - jumps = count_speed_jumps(df_vessel) + # 2) max_speed 컬럼이 있으면 window 내 최고속 직접 확인 (SNPDB 집계값 활용) + if 'max_speed' in window_df.columns: + try: + peak_kn = float(window_df['max_speed'].max()) + if peak_kn >= EXTREME_SPEED_KNOTS: + score = max(score, 0.6) + elif peak_kn > MAX_FISHING_SPEED_KNOTS: + score += 0.15 + except (TypeError, ValueError): + pass + + # 3) SOG 급변 — 윈도우 비율 가점 + jumps = count_speed_jumps(window_df) if jumps > 0: - score += min(0.3, jumps / n * 5) + score += min(0.3, jumps / n_window * 3) - # BD09 오프셋 — 중국 선박(412*)은 좌표계 차이로 항상 ~300m이므로 제외 + # 4) BD09 오프셋 — 비중국 선박만 (중국 412*는 좌표계 차이로 노이즈) mmsi_str = str(df_vessel.iloc[0].get('mmsi', '')) if 'mmsi' in df_vessel.columns else '' if not mmsi_str.startswith('412'): mid_idx = len(df_vessel) // 2 diff --git a/prediction/algorithms/transshipment.py b/prediction/algorithms/transshipment.py index 508bccb..040ba72 100644 --- a/prediction/algorithms/transshipment.py +++ b/prediction/algorithms/transshipment.py @@ -45,6 +45,8 @@ _EXCLUDED_SHIP_TY = frozenset({ 'Tug', 'Pilot Boat', 'Search And Rescue', 'Law Enforcement', 'AtoN', 'Anti Pollution', 'Passenger', 'Medical Transport', }) +# shipTy 텍스트에 포함되면 CARRIER 로 승격 (부분일치, 대소문자 무시) +_CARRIER_HINTS = ('cargo', 'tanker', 'supply', 'carrier', 'reefer') # ────────────────────────────────────────────────────────────── # 감시영역 로드 @@ -101,24 +103,27 @@ def _is_in_transship_zone(lat: float, lon: float) -> Optional[str]: def _classify_vessel_role( ship_kind_code: str, ship_ty: str, + mmsi: str = '', ) -> str: """선박 역할 분류: 'FISHING', 'CARRIER', 'EXCLUDED', 'UNKNOWN'""" + st_lower = ship_ty.lower() if isinstance(ship_ty, str) else '' if ship_kind_code in _FISHING_KINDS: return 'FISHING' if ship_kind_code in _CARRIER_KINDS: - # 화물선/유조선 — shipTy가 예인선/관공선이면 제외 if ship_ty in _EXCLUDED_SHIP_TY: return 'EXCLUDED' return 'CARRIER' - # 000027(기타) / 000028(미분류): shipTy 텍스트로 엄격 판정 - if ship_kind_code in ('000027', '000028'): - if ship_ty == 'Cargo': - return 'CARRIER' - if ship_ty == 'Tanker': - return 'CARRIER' + # 000027/000028/기타: shipTy 텍스트 부분일치로 완화 + if ship_kind_code in ('000027', '000028', ''): if ship_ty in _EXCLUDED_SHIP_TY: return 'EXCLUDED' - # N/A, Vessel, 기타 → UNKNOWN (환적 후보에서 제외) + if any(h in st_lower for h in _CARRIER_HINTS): + return 'CARRIER' + if 'fishing' in st_lower: + return 'FISHING' + # 중국 412* 허가어선은 ship_kind/shipTy 불명이어도 FISHING 간주 + if mmsi.startswith('412'): + return 'FISHING' return 'UNKNOWN' # 000021(함정), 000022(여객), 000025(관공) → 제외 if ship_kind_code in ('000021', '000022', '000025'): @@ -127,7 +132,7 @@ def _classify_vessel_role( def _is_transship_pair(role_a: str, role_b: str) -> bool: - """어선 + 운반선 조합만 True.""" + """이종 쌍(어선↔운반선)만 True. 동종 어선 공조는 pair_trawl 경로(COOP_FISHING)에서 처리.""" return (role_a == 'FISHING' and role_b == 'CARRIER') or \ (role_b == 'FISHING' and role_a == 'CARRIER') @@ -355,7 +360,7 @@ def detect_transshipment( info = get_vessel_info(mmsi) if get_vessel_info else {} kind = info.get('ship_kind_code', '') ship_ty = info.get('ship_ty', info.get('vessel_type', '')) - role = _classify_vessel_role(kind, ship_ty) + role = _classify_vessel_role(kind, ship_ty, mmsi) if role in ('EXCLUDED', 'UNKNOWN'): continue diff --git a/prediction/db/kcgdb.py b/prediction/db/kcgdb.py index cb0fc21..05d463d 100644 --- a/prediction/db/kcgdb.py +++ b/prediction/db/kcgdb.py @@ -79,7 +79,8 @@ def upsert_results(results: list['AnalysisResult']) -> int: risk_score, risk_level, transship_suspect, transship_pair_mmsi, transship_duration_min, features, - gear_judgment + gear_judgment, + gear_code ) VALUES %s ON CONFLICT (mmsi, analyzed_at) DO UPDATE SET vessel_type = EXCLUDED.vessel_type, @@ -106,7 +107,8 @@ def upsert_results(results: list['AnalysisResult']) -> int: transship_pair_mmsi = EXCLUDED.transship_pair_mmsi, transship_duration_min = EXCLUDED.transship_duration_min, features = EXCLUDED.features, - gear_judgment = EXCLUDED.gear_judgment + gear_judgment = EXCLUDED.gear_judgment, + gear_code = EXCLUDED.gear_code """ try: diff --git a/prediction/fleet_tracker.py b/prediction/fleet_tracker.py index 5f54bc6..7c49e29 100644 --- a/prediction/fleet_tracker.py +++ b/prediction/fleet_tracker.py @@ -43,10 +43,13 @@ class FleetTracker: cur.execute(f'SELECT id, name_cn, name_en FROM {FLEET_COMPANIES}') self._companies = {r[0]: {'name_cn': r[1], 'name_en': r[2]} for r in cur.fetchall()} + # 현재 연도 허가만 조회 (연단위 갱신 정책, permit_year NULL은 legacy 허용) cur.execute( f"""SELECT id, company_id, permit_no, name_cn, name_en, tonnage, gear_code, fleet_role, pair_vessel_id, mmsi - FROM {FLEET_VESSELS}""" + FROM {FLEET_VESSELS} + WHERE permit_year = EXTRACT(YEAR FROM now())::int + OR permit_year IS NULL""" ) self._vessels = {} self._name_cn_map = {} @@ -103,6 +106,59 @@ class FleetTracker: return None return self._vessels.get(vid, {}).get('gear_code') + def get_pt_registered_mmsis(self) -> set[str]: + """쌍끌이(PT / PT-S) 로 등록된 선박의 MMSI 집합. + + fishery_code 미도입 환경에서도 legacy gear_code='C21' 로 동작. + """ + result: set[str] = set() + for v in self._vessels.values(): + gc = v.get('gear_code') or '' + mmsi = v.get('mmsi') + if mmsi and gc in ('C21',): + result.add(mmsi) + return result + + def get_gear_episodes(self, mmsi: str, conn, hours: int = 24) -> list[dict]: + """gear_identity_log 에서 최근 N시간 신호 에피소드 목록을 반환. + + G-04 MMSI 조작 탐지용. first_seen_at / last_seen_at 포함. + """ + try: + cur = conn.cursor() + cur.execute( + f"""SELECT first_seen_at, last_seen_at + FROM {GEAR_IDENTITY_LOG} + WHERE mmsi = %s + AND first_seen_at > NOW() - (%s || ' hours')::interval + ORDER BY first_seen_at""", + (mmsi, str(hours)), + ) + rows = cur.fetchall() + cur.close() + return [{'first_seen_at': r[0], 'last_seen_at': r[1]} for r in rows] + except Exception as exc: + logger.warning('get_gear_episodes 실패 [mmsi=%s]: %s', mmsi, exc) + return [] + + def get_gear_positions( + self, mmsi: str, df_vessel: Optional[pd.DataFrame] = None, + ) -> list[tuple[float, float]]: + """고정어구 위치 목록(lat, lon). G-05 drift 탐지용. + + 현재 단계에서는 선박 DataFrame(track 좌표)을 그대로 전달하는 경로만 지원. + 향후 gear_tracking_snapshot 테이블이 도입되면 DB 경로 추가. + """ + if df_vessel is None or len(df_vessel) == 0: + return [] + try: + lats = df_vessel['lat'].tolist() + lons = df_vessel['lon'].tolist() + return list(zip(lats, lons)) + except Exception as exc: + logger.warning('get_gear_positions 실패 [mmsi=%s]: %s', mmsi, exc) + return [] + def match_ais_to_registry(self, ais_vessels: list[dict], conn) -> None: """AIS 선박을 등록 선단에 매칭. DB 업데이트. diff --git a/prediction/models/result.py b/prediction/models/result.py index 53c0883..91a22cb 100644 --- a/prediction/models/result.py +++ b/prediction/models/result.py @@ -55,6 +55,9 @@ class AnalysisResult: # ALGO 09: 어구 위반 판정 gear_judgment: str = '' + # 등록 선박 어구 코드 (fleet_vessels.gear_code: C21=PT, C22=OT 등) + gear_code: Optional[str] = None + # 메타 analyzed_at: Optional[datetime] = None @@ -120,4 +123,5 @@ class AnalysisResult: _i(self.transship_duration_min), json.dumps(safe_features), str(self.gear_judgment) if self.gear_judgment else None, + str(self.gear_code) if self.gear_code else None, ) diff --git a/prediction/output/violation_classifier.py b/prediction/output/violation_classifier.py index ac4f730..379a5f3 100644 --- a/prediction/output/violation_classifier.py +++ b/prediction/output/violation_classifier.py @@ -58,7 +58,7 @@ def classify_violations(result: dict) -> list[str]: if transship: violations.append('ILLEGAL_TRANSSHIP') - # 어구 불법 (gear_judgment이 있는 경우만 — 현재는 scheduler에서 채우지 않음) + # 어구 불법 (gear_judgment은 classify_gear_violations()로 채워짐: G-01/G-04/G-05/G-06) gear_judgment = result.get('gear_judgment', '') or '' if gear_judgment in ('NO_PERMIT', 'GEAR_MISMATCH', 'ZONE_VIOLATION', 'SEASON_VIOLATION', 'PAIR_TRAWL'): violations.append('ILLEGAL_GEAR') diff --git a/prediction/scheduler.py b/prediction/scheduler.py index 8d382c4..91e39f1 100644 --- a/prediction/scheduler.py +++ b/prediction/scheduler.py @@ -203,6 +203,50 @@ def run_analysis_cycle(): except Exception as e: logger.warning('gear correlation failed: %s', e) + # 4.9 페어 후보 탐색 (bbox 1차 + 궤적 유사도 2차 → G-06 pair_trawl 판정) + pair_results: dict[str, dict] = {} + try: + from algorithms.pair_trawl import find_pair_candidates, detect_pair_trawl + + pt_registered = fleet_tracker.get_pt_registered_mmsis() + pt_sub_registered: set[str] = set() # TODO: fishery_code=PT-S 구분 + base_mmsis: set[str] = {c['mmsi'] for c in classifications} + base_mmsis |= pt_registered + + # pool은 전체 24h 누적 tracks (중국 어선 중심 8k+ 선박) + pool_tracks = getattr(vessel_store, '_tracks', {}) or vessel_dfs + pair_candidates = find_pair_candidates( + base_mmsis=base_mmsis, + vessel_dfs=pool_tracks, + get_vessel_info=vessel_store.get_vessel_info, + pt_registered=pt_registered, + pt_sub_registered=pt_sub_registered, + ) + pt_det = 0; coop_det = 0 + for cand in pair_candidates: + ma, mb = cand['base_mmsi'], cand['target_mmsi'] + if ma not in pool_tracks or mb not in pool_tracks: + continue + result = detect_pair_trawl( + pool_tracks[ma], pool_tracks[mb], ma, mb, + role_a=cand['base_role'], role_b=cand['target_role'], + similarity=cand['similarity'], + ) + if not result.get('pair_detected'): + continue + pair_results[ma] = {**result, 'pair_mmsi': mb} + pair_results[mb] = {**result, 'pair_mmsi': ma} + if result.get('pair_type') == 'PT_REGISTERED': + pt_det += 1 + elif result.get('pair_type') == 'COOP_FISHING': + coop_det += 1 + logger.info( + 'pair detection: candidates=%d, detected=%d (pt=%d, coop=%d)', + len(pair_candidates), len(pair_results) // 2, pt_det, coop_det, + ) + except Exception as e: + logger.warning('pair detection failed: %s', e) + # 5. 선박별 추가 알고리즘 → AnalysisResult 생성 # dark 이력 일괄 조회 (7일 history) — 사이클당 1회 now_kst_hour = datetime.now(_KST).hour @@ -228,8 +272,11 @@ def run_analysis_cycle(): gear_map = {'TRAWL': 'OT', 'PT': 'PT', 'PURSE': 'PS', 'LONGLINE': 'GN', 'TRAP': 'TRAP'} # fleet_registry gear_code C21(쌍끌이) → vessel_type 'PT' 오버라이드 vtype = c['vessel_type'] - if hasattr(fleet_tracker, 'get_vessel_gear_code') and fleet_tracker.get_vessel_gear_code(mmsi) == 'C21': - vtype = 'PT' + registered_gear_code: Optional[str] = None + if hasattr(fleet_tracker, 'get_vessel_gear_code'): + registered_gear_code = fleet_tracker.get_vessel_gear_code(mmsi) + if registered_gear_code == 'C21': + vtype = 'PT' gear = gear_map.get(vtype, 'OT') ucaf = compute_ucaf_score(df_v, gear) ucft = compute_ucft_score(df_v) @@ -291,37 +338,38 @@ def run_analysis_cycle(): if 'state' in df_v.columns and len(df_v) > 0: activity = df_v['state'].mode().iloc[0] - # ── G-01 수역-어구 불일치 체크 ── - g_codes: list[str] = [] - gear_judgment = '' + # ── G-01/G-04/G-05/G-06 통합 판정 (DAR-03) ── + from algorithms.gear_violation import classify_gear_violations + + pair_result = pair_results.get(mmsi) + if pair_result and not pair_result.get('pair_detected'): + pair_result = None + # G-06 판정은 pair_type='PT_REGISTERED' 또는 엄격 sync 조건 만족일 때만 + if pair_result and pair_result.get('pair_type') not in ('PT_REGISTERED', 'TRANSSHIP_LIKE', 'COOP_FISHING', 'GENERIC'): + pair_result = None + + gear_episodes: list = [] + gear_positions: list = [] + if gear in ('GN', 'TRAP', 'FYK', 'FPO', 'GNS', 'GND'): + try: + with kcgdb.get_conn() as gv_conn: + gear_episodes = fleet_tracker.get_gear_episodes(mmsi, gv_conn, hours=24) + gear_positions = fleet_tracker.get_gear_positions(mmsi, df_v) + except Exception as e: + logger.debug('gear episode/pos 조회 실패 [%s]: %s', mmsi, e) + + gv = classify_gear_violations( + mmsi=mmsi, gear_type=gear, zone_info=zone_info, + df_vessel=df_v, pair_result=pair_result, + is_permitted=is_permitted, + gear_episodes=gear_episodes or None, + gear_positions=gear_positions or None, + ) + g_codes = gv['g_codes'] + gear_judgment = gv['gear_judgment'] + gear_violation_score = gv['gear_violation_score'] + gear_violation_evidence = gv['evidence'] zone_code = zone_info.get('zone', 'EEZ_OR_BEYOND') - allowed_gears = zone_info.get('allowed_gears', []) - if zone_code.startswith('ZONE_') and allowed_gears and gear not in allowed_gears: - g_codes.append('G-01') - gear_judgment = 'ZONE_VIOLATION' - - # pair_trawl 결과 병합 (Phase 2에서 pair_results dict 채워짐) - pair_result = pair_results.get(mmsi) if 'pair_results' in dir() else None - if pair_result and pair_result.get('pair_detected'): - g_codes.extend(pair_result.get('g_codes', [])) - if not gear_judgment: - gear_judgment = 'PAIR_TRAWL' - - gear_violation_score = 0 - gear_violation_evidence: dict = {} - if 'G-01' in g_codes: - gear_violation_score += 15 - gear_violation_evidence['G-01'] = { - 'zone': zone_code, 'gear': gear, - 'allowed': allowed_gears, - } - if 'G-06' in g_codes and pair_result: - gear_violation_score += 20 - gear_violation_evidence['G-06'] = { - 'sync_duration_min': pair_result.get('sync_duration_min', 0), - 'mean_separation_nm': pair_result.get('mean_separation_nm', 0), - 'pair_mmsi': pair_result.get('pair_mmsi', ''), - } # risk_score에 어구 위반 가산 final_risk = min(100, risk_score + gear_violation_score) @@ -368,6 +416,7 @@ def run_analysis_cycle(): risk_level=final_risk_level, features=merged_features, gear_judgment=gear_judgment, + gear_code=registered_gear_code, )) logger.info( @@ -511,6 +560,10 @@ def run_analysis_cycle(): transship_pair_mmsi='', transship_duration_min=0, features=dark_features, + gear_code=( + fleet_tracker.get_vessel_gear_code(mmsi) + if hasattr(fleet_tracker, 'get_vessel_gear_code') else None + ), )) lw_count += 1 logger.info( diff --git a/prediction/scripts/diagnostic-snapshot.sh b/prediction/scripts/diagnostic-snapshot.sh index 96fe03f..6bc7da7 100644 --- a/prediction/scripts/diagnostic-snapshot.sh +++ b/prediction/scripts/diagnostic-snapshot.sh @@ -229,6 +229,18 @@ WHERE analyzed_at > now() - interval '5 minutes' ORDER BY risk_score DESC LIMIT 10; SQL +echo "" +echo "--- 4-5.5 pair_type 분포 (DAR-03 base-target 탐색, 5min) ---" +$PSQL_TABLE << 'SQL' +SELECT coalesce(features->>'pair_type', '(none)') pair_type, + count(*) cnt, + round(avg((features->>'similarity')::numeric)::numeric, 3) avg_sim +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '5 minutes' + AND features->>'pair_trawl_detected' = 'true' +GROUP BY pair_type ORDER BY cnt DESC; +SQL + echo "" echo "--- 4-6. GEAR_ILLEGAL 이벤트 ---" $PSQL_TABLE << 'SQL' @@ -299,6 +311,57 @@ journalctl -u kcg-ai-prediction --since '6 minutes ago' --no-pager 2>/dev/null | grep -E 'analysis cycle:|lightweight|pipeline dark:|event_generator:|pair_trawl|gear_violation|GEAR_ILLEGAL|ERROR|Traceback' | \ tail -20 +#=================================================================== +# PART 7.5: 한중어업협정 레지스트리 매칭 (V029) +#=================================================================== +echo "" +echo "=================================================================" +echo "PART 7.5: FISHERY PERMIT CN REGISTRY 매칭 현황" +echo "=================================================================" + +echo "" +echo "--- 7.5-1. fleet_vessels 매칭 현황 (현재 연도) ---" +$PSQL_TABLE << 'SQL' +SELECT permit_year, + count(*) total, + count(mmsi) with_mmsi, + round(count(mmsi)::numeric / NULLIF(count(*),0) * 100, 1) match_pct +FROM kcg.fleet_vessels +WHERE permit_year IS NOT NULL +GROUP BY permit_year ORDER BY permit_year DESC; +SQL + +echo "" +echo "--- 7.5-2. fishery_code 별 매칭률 (현재 연도) ---" +$PSQL_TABLE << 'SQL' +SELECT fishery_code, count(*) total, count(mmsi) matched, + round(count(mmsi)::numeric / NULLIF(count(*),0) * 100, 1) pct +FROM kcg.fleet_vessels +WHERE permit_year = EXTRACT(YEAR FROM now())::int +GROUP BY fishery_code ORDER BY total DESC; +SQL + +echo "" +echo "--- 7.5-3. vessel_analysis_results.gear_code 분포 (last 5min) ---" +$PSQL_TABLE << 'SQL' +SELECT coalesce(gear_code, '(null)') gear_code, + count(*) cnt, + round(avg(risk_score)::numeric, 1) avg_risk +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '5 minutes' +GROUP BY gear_code ORDER BY cnt DESC LIMIT 15; +SQL + +echo "" +echo "--- 7.5-4. 최근 매칭된 선박 (top 10 by last_seen_at) ---" +$PSQL_TABLE << 'SQL' +SELECT permit_no, fishery_code, name_cn, mmsi, match_method, + match_confidence, last_seen_at +FROM kcg.fleet_vessels +WHERE permit_year = EXTRACT(YEAR FROM now())::int AND mmsi IS NOT NULL +ORDER BY last_seen_at DESC NULLS LAST LIMIT 10; +SQL + #=================================================================== # PART 8: 해역별 종합 교차 #=================================================================== diff --git a/prediction/scripts/hourly-analysis-snapshot.sh b/prediction/scripts/hourly-analysis-snapshot.sh index d7c5efa..4f7b450 100755 --- a/prediction/scripts/hourly-analysis-snapshot.sh +++ b/prediction/scripts/hourly-analysis-snapshot.sh @@ -130,6 +130,57 @@ SELECT FROM kcg.vessel_analysis_results WHERE analyzed_at > now() - interval '1 hour'; +\echo +\echo =================================================================== +\echo === FISHERY PERMIT CN REGISTRY (V029 - 한중어업협정) +\echo =================================================================== + +\echo +\echo === P1. fishery_permit_cn year-by-year === +SELECT permit_year, count(*) permits, + count(DISTINCT fishery_code) codes, + count(DISTINCT applicant_cn) applicants, + max(loaded_at) loaded_at +FROM kcg.fishery_permit_cn +GROUP BY permit_year ORDER BY permit_year DESC; + +\echo +\echo === P2. fleet_vessels matching (current year = registry) === +SELECT permit_year, + count(*) total, + count(mmsi) with_mmsi, + round(count(mmsi)::numeric / NULLIF(count(*),0) * 100, 1) match_pct, + max(last_seen_at) last_match +FROM kcg.fleet_vessels +WHERE permit_year IS NOT NULL +GROUP BY permit_year ORDER BY permit_year DESC; + +\echo +\echo === P3. fleet_vessels breakdown by fishery_code (current year) === +SELECT fishery_code, count(*) total, count(mmsi) matched, + round(count(mmsi)::numeric / NULLIF(count(*),0) * 100, 1) pct +FROM kcg.fleet_vessels +WHERE permit_year = EXTRACT(YEAR FROM now())::int +GROUP BY fishery_code ORDER BY total DESC; + +\echo +\echo === P4. vessel_analysis_results.gear_code distribution (last 1h) === +SELECT coalesce(gear_code, '(null)') gear_code, + count(*) cnt, + round(avg(risk_score)::numeric, 1) avg_risk +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour' +GROUP BY gear_code ORDER BY cnt DESC LIMIT 15; + +\echo +\echo === P5. fleet_role distribution (last 1h, from registry match) === +SELECT fleet_role, count(*) cnt, + count(*) FILTER (WHERE fleet_is_leader) is_leader, + round(avg(risk_score)::numeric, 1) avg_risk +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour' +GROUP BY fleet_role ORDER BY cnt DESC; + \echo \echo === G1. PIPELINE vessel_type distribution === SELECT vessel_type, count(*), @@ -196,6 +247,17 @@ WHERE analyzed_at > now() - interval '1 hour' AND vessel_type != 'UNKNOWN' AND zone_code LIKE 'ZONE_%' GROUP BY zone_code, vessel_type ORDER BY zone_code, vessel_type; +\echo +\echo === D3.5 pair_type distribution (DAR-03 base-target 탐색) === +SELECT coalesce(features->>'pair_type', '(none)') pair_type, + count(*) cnt, + round(avg((features->>'similarity')::numeric)::numeric, 3) avg_sim, + round(avg((features->>'confidence')::numeric)::numeric, 3) avg_conf +FROM kcg.vessel_analysis_results +WHERE analyzed_at > now() - interval '1 hour' + AND features->>'pair_trawl_detected' = 'true' +GROUP BY pair_type ORDER BY cnt DESC; + \echo \echo === D4. G-06 pair trawl detections === SELECT mmsi, zone_code, vessel_type, risk_score, diff --git a/prediction/scripts/load_fishery_permit_cn.py b/prediction/scripts/load_fishery_permit_cn.py new file mode 100644 index 0000000..4ee8bb1 --- /dev/null +++ b/prediction/scripts/load_fishery_permit_cn.py @@ -0,0 +1,358 @@ +"""한중어업협정 중국어선 허가현황 XLS → kcgdb 적재. + +Usage: + python3 prediction/scripts/load_fishery_permit_cn.py + # 또는 기본 경로(docs/중국어선_허가현황_YYYYMMDD.xls 최신) 자동 탐색 + python3 prediction/scripts/load_fishery_permit_cn.py + +수행: + 1) XLS 파싱 → kcg.fishery_permit_cn upsert (permit_year + permit_no 복합 유니크) + 2) 신청인(중국어) 기준 kcg.fleet_companies upsert + 3) 해당 연도 레코드를 kcg.fleet_vessels로 동기화 + - PT-S(부속선)는 parent_permit_no로 본선 pair_vessel_id 연결 + - fleet_role: FC=TRANSPORT, PT=MAIN, PT-S=CREW, 기타=MAIN +""" +from __future__ import annotations + +import json +import logging +import os +import re +import sys +from pathlib import Path +from typing import Optional + +import pandas as pd +import psycopg2 +import psycopg2.extras + +logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') +logger = logging.getLogger(__name__) + + +def _env(key: str, default: Optional[str] = None) -> str: + v = os.environ.get(key, default) + if v is None: + raise SystemExit(f'환경변수 {key} 가 필요합니다') + return v + + +def _find_latest_xls(docs_dir: Path) -> Path: + pattern = re.compile(r'중국어선_허가현황_(\d{8})\.xls$') + candidates = [] + for p in docs_dir.glob('중국어선_허가현황_*.xls'): + m = pattern.search(p.name) + if m: + candidates.append((m.group(1), p)) + if not candidates: + raise SystemExit(f'{docs_dir} 에서 허가현황 XLS를 찾지 못했습니다') + candidates.sort() + return candidates[-1][1] + + +def _extract_year(path: Path) -> int: + m = re.search(r'(\d{4})\d{4}', path.name) + if not m: + raise SystemExit(f'파일명에서 연도 추출 실패: {path.name}') + return int(m.group(1)) + + +FISHERY_ROLE = { + 'FC': 'TRANSPORT', + 'PT': 'MAIN', + 'PT-S': 'CREW', + 'GN': 'MAIN', + 'PS': 'MAIN', + 'OT': 'MAIN', +} + +# 업종코드 → fleet_tracker가 쓰는 레거시 gear_code 매핑 +# (vessel_type 오버라이드 로직이 C21을 체크하므로 PT만 C21) +GEAR_CODE_LEGACY = { + 'PT': 'C21', + 'PT-S': 'C21', + 'GN': 'C22', + 'OT': 'C22', + 'PS': 'PS', + 'FC': 'FC', +} + + +def _clean(v) -> Optional[str]: + if v is None: + return None + if isinstance(v, float) and pd.isna(v): + return None + s = str(v).strip() + return s if s else None + + +def _num(v) -> Optional[float]: + if v is None: + return None + try: + if isinstance(v, float) and pd.isna(v): + return None + return float(v) + except (TypeError, ValueError): + return None + + +def _int(v) -> Optional[int]: + f = _num(v) + return int(f) if f is not None else None + + +def parse_xls(path: Path) -> list[dict]: + df = pd.read_excel(path, engine='xlrd', header=1) + rows: list[dict] = [] + for _, r in df.iterrows(): + permit_no = _clean(r.get('허가번호')) + fishery_code = _clean(r.get('업종코드')) + name_cn = _clean(r.get('선박명(중국)')) + name_en = _clean(r.get('선박명(로마)')) + if not (permit_no and fishery_code and name_cn and name_en): + continue + rows.append({ + 'permit_no': permit_no, + 'fishery_type': _clean(r.get('업종')), + 'fishery_code': fishery_code, + 'name_cn': name_cn, + 'name_en': name_en, + 'applicant_cn': _clean(r.get('신청인(중국)')), + 'applicant_en': _clean(r.get('신청인(로마)')), + 'applicant_addr_cn': _clean(r.get('신청인주소(중국)')), + 'applicant_addr_en': _clean(r.get('신청인주소(로마)')), + 'registration_no': _clean(r.get('선박등록번호')), + 'tonnage': _num(r.get('톤수')), + 'port_cn': _clean(r.get('선적항(중국)')), + 'port_en': _clean(r.get('선적항(로마)')), + 'callsign': _clean(r.get('호출부호')), + 'engine_power': _num(r.get('기관출력(마력)')), + 'length_m': _num(r.get('길이(m)')), + 'beam_m': _num(r.get('폭(m)')), + 'depth_m': _num(r.get('깊이(m)')), + 'fishing_zones': _clean(r.get('조업수역')), + 'fishing_period_1': _clean(r.get('조업기간1')), + 'fishing_period_2': _clean(r.get('조업기간2')), + 'catch_quota_t': _num(r.get('어획할당량(톤)')), + 'cumulative_quota_t': _num(r.get('현어기의\n신규(첫)허가부터 \n누적 어획할당량(톤)')), + 'refrig_hold_count': _int(r.get('냉장 어창의 수')), + 'freezer_hold_count': _int(r.get('냉동 어창의 수')), + 'admin_sanction': _clean(r.get('행정처분일자/행정처분기관/위반내용')), + 'parent_permit_no': _clean(r.get('본선의\n허가번호')), + 'volume_enclosed': _num(r.get('용적\n폐위장소(㎥)')), + 'volume_above_deck': _num(r.get('용적\n상갑판 위(㎥)')), + 'volume_below_deck': _num(r.get('용적\n상갑판 아래(㎥)')), + 'volume_excluded': _num(r.get('용적\n제외장소(㎥)')), + 'raw': {k: (None if (isinstance(v, float) and pd.isna(v)) else v) for k, v in r.items()}, + }) + logger.info('파싱된 허가 수: %d', len(rows)) + return rows + + +def upsert_permits(cur, year: int, source_file: str, rows: list[dict]) -> None: + sql = """ + INSERT INTO kcg.fishery_permit_cn ( + permit_year, permit_no, fishery_type, fishery_code, + name_cn, name_en, applicant_cn, applicant_en, + applicant_addr_cn, applicant_addr_en, registration_no, tonnage, + port_cn, port_en, callsign, engine_power, + length_m, beam_m, depth_m, fishing_zones, + fishing_period_1, fishing_period_2, catch_quota_t, cumulative_quota_t, + refrig_hold_count, freezer_hold_count, admin_sanction, parent_permit_no, + volume_enclosed, volume_above_deck, volume_below_deck, volume_excluded, + raw_data, source_file + ) VALUES %s + ON CONFLICT (permit_year, permit_no) DO UPDATE SET + fishery_type = EXCLUDED.fishery_type, + fishery_code = EXCLUDED.fishery_code, + name_cn = EXCLUDED.name_cn, + name_en = EXCLUDED.name_en, + applicant_cn = EXCLUDED.applicant_cn, + applicant_en = EXCLUDED.applicant_en, + applicant_addr_cn = EXCLUDED.applicant_addr_cn, + applicant_addr_en = EXCLUDED.applicant_addr_en, + registration_no = EXCLUDED.registration_no, + tonnage = EXCLUDED.tonnage, + port_cn = EXCLUDED.port_cn, + port_en = EXCLUDED.port_en, + callsign = EXCLUDED.callsign, + engine_power = EXCLUDED.engine_power, + length_m = EXCLUDED.length_m, + beam_m = EXCLUDED.beam_m, + depth_m = EXCLUDED.depth_m, + fishing_zones = EXCLUDED.fishing_zones, + fishing_period_1 = EXCLUDED.fishing_period_1, + fishing_period_2 = EXCLUDED.fishing_period_2, + catch_quota_t = EXCLUDED.catch_quota_t, + cumulative_quota_t = EXCLUDED.cumulative_quota_t, + refrig_hold_count = EXCLUDED.refrig_hold_count, + freezer_hold_count = EXCLUDED.freezer_hold_count, + admin_sanction = EXCLUDED.admin_sanction, + parent_permit_no = EXCLUDED.parent_permit_no, + volume_enclosed = EXCLUDED.volume_enclosed, + volume_above_deck = EXCLUDED.volume_above_deck, + volume_below_deck = EXCLUDED.volume_below_deck, + volume_excluded = EXCLUDED.volume_excluded, + raw_data = EXCLUDED.raw_data, + source_file = EXCLUDED.source_file, + loaded_at = now() + """ + tuples = [ + ( + year, r['permit_no'], r['fishery_type'], r['fishery_code'], + r['name_cn'], r['name_en'], r['applicant_cn'], r['applicant_en'], + r['applicant_addr_cn'], r['applicant_addr_en'], r['registration_no'], r['tonnage'], + r['port_cn'], r['port_en'], r['callsign'], r['engine_power'], + r['length_m'], r['beam_m'], r['depth_m'], r['fishing_zones'], + r['fishing_period_1'], r['fishing_period_2'], r['catch_quota_t'], r['cumulative_quota_t'], + r['refrig_hold_count'], r['freezer_hold_count'], r['admin_sanction'], r['parent_permit_no'], + r['volume_enclosed'], r['volume_above_deck'], r['volume_below_deck'], r['volume_excluded'], + json.dumps({k: (v.isoformat() if hasattr(v, 'isoformat') else v) for k, v in r['raw'].items()}, ensure_ascii=False, default=str), + source_file, + ) + for r in rows + ] + psycopg2.extras.execute_values(cur, sql, tuples, page_size=200) + logger.info('fishery_permit_cn upsert: %d rows', len(tuples)) + + +def upsert_companies(cur, rows: list[dict]) -> dict[str, int]: + """신청인(중국어) 기준 fleet_companies upsert → {applicant_cn: company_id}.""" + applicants: dict[str, dict] = {} + for r in rows: + key = r['applicant_cn'] or r['applicant_en'] or 'UNKNOWN' + applicants.setdefault(key, {'name_cn': r['applicant_cn'], 'name_en': r['applicant_en']}) + result: dict[str, int] = {} + for key, meta in applicants.items(): + cur.execute( + """ + INSERT INTO kcg.fleet_companies (name_cn, name_en, country) + VALUES (%s, %s, 'CN') + ON CONFLICT DO NOTHING + """, + (meta['name_cn'], meta['name_en']), + ) + cur.execute( + 'SELECT id FROM kcg.fleet_companies WHERE name_cn IS NOT DISTINCT FROM %s AND name_en IS NOT DISTINCT FROM %s', + (meta['name_cn'], meta['name_en']), + ) + row = cur.fetchone() + if row: + result[key] = row[0] + logger.info('fleet_companies upsert: %d 신청인', len(result)) + return result + + +def sync_fleet_vessels(cur, year: int, rows: list[dict], company_map: dict[str, int]) -> None: + """해당 연도 허가를 fleet_vessels로 동기화. permit_no+permit_year 기준 upsert.""" + # 기존 연도 데이터 먼저 비우기 (허가 취소된 선박 정리) + cur.execute( + 'DELETE FROM kcg.fleet_vessels WHERE permit_year = %s AND permit_no NOT IN %s', + (year, tuple([r['permit_no'] for r in rows]) or ('',)), + ) + inserted = 0 + updated = 0 + permit_to_id: dict[str, int] = {} + for r in rows: + company_key = r['applicant_cn'] or r['applicant_en'] or 'UNKNOWN' + company_id = company_map.get(company_key) + if company_id is None: + continue + fishery_code = r['fishery_code'] + legacy_gear = GEAR_CODE_LEGACY.get(fishery_code, fishery_code) + fleet_role = FISHERY_ROLE.get(fishery_code, 'MAIN') + cur.execute( + """ + INSERT INTO kcg.fleet_vessels ( + company_id, permit_no, name_cn, name_en, tonnage, + gear_code, fishery_code, fleet_role, permit_year, updated_at + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, now()) + ON CONFLICT DO NOTHING + RETURNING id + """, + ( + company_id, r['permit_no'], r['name_cn'], r['name_en'], r['tonnage'], + legacy_gear, fishery_code, fleet_role, year, + ), + ) + ret = cur.fetchone() + if ret: + permit_to_id[r['permit_no']] = ret[0] + inserted += 1 + else: + cur.execute( + """ + UPDATE kcg.fleet_vessels SET + company_id = %s, name_cn = %s, name_en = %s, tonnage = %s, + gear_code = %s, fishery_code = %s, fleet_role = %s, updated_at = now() + WHERE permit_year = %s AND permit_no = %s + RETURNING id + """, + ( + company_id, r['name_cn'], r['name_en'], r['tonnage'], + legacy_gear, fishery_code, fleet_role, year, r['permit_no'], + ), + ) + ret = cur.fetchone() + if ret: + permit_to_id[r['permit_no']] = ret[0] + updated += 1 + logger.info('fleet_vessels 동기화: inserted=%d, updated=%d (year=%d)', inserted, updated, year) + + # PT-S(부속선) → 본선 pair_vessel_id 연결 + pair_linked = 0 + for r in rows: + if r['fishery_code'] != 'PT-S' or not r['parent_permit_no']: + continue + child_id = permit_to_id.get(r['permit_no']) + parent_id = permit_to_id.get(r['parent_permit_no']) + if child_id and parent_id: + cur.execute( + 'UPDATE kcg.fleet_vessels SET pair_vessel_id = %s WHERE id = %s', + (parent_id, child_id), + ) + cur.execute( + 'UPDATE kcg.fleet_vessels SET pair_vessel_id = %s WHERE id = %s AND pair_vessel_id IS NULL', + (child_id, parent_id), + ) + pair_linked += 1 + logger.info('PT-S pair 연결: %d 건', pair_linked) + + +def main() -> None: + repo_root = Path(__file__).resolve().parents[2] + if len(sys.argv) >= 2: + xls_path = Path(sys.argv[1]) + else: + xls_path = _find_latest_xls(repo_root / 'docs') + if not xls_path.exists(): + raise SystemExit(f'파일 없음: {xls_path}') + + year = _extract_year(xls_path) + logger.info('source=%s year=%d', xls_path.name, year) + + rows = parse_xls(xls_path) + + conn = psycopg2.connect( + host=_env('KCGDB_HOST'), + port=int(_env('KCGDB_PORT', '5432')), + dbname=_env('KCGDB_NAME'), + user=_env('KCGDB_USER'), + password=_env('KCGDB_PASSWORD'), + ) + try: + with conn: + with conn.cursor() as cur: + upsert_permits(cur, year, xls_path.name, rows) + company_map = upsert_companies(cur, rows) + sync_fleet_vessels(cur, year, rows, company_map) + logger.info('완료') + finally: + conn.close() + + +if __name__ == '__main__': + main()