From bb99387168f9c5cd812fd30d113b589f3e985eff Mon Sep 17 00:00:00 2001 From: htlee Date: Fri, 20 Mar 2026 18:07:15 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EC=84=A0=EB=8B=A8=20=EB=93=B1=EB=A1=9D?= =?UTF-8?q?=20DB=20+=20=EC=96=B4=EB=A7=9D/=EC=96=B4=EA=B5=AC=20=EC=A0=95?= =?UTF-8?q?=EC=B2=B4=EC=84=B1=20=EC=B6=94=EC=A0=81=20=EC=8B=9C=EC=8A=A4?= =?UTF-8?q?=ED=85=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - DB 007: fleet_companies, fleet_vessels, gear_identity_log, fleet_tracking_snapshot - 906척 선단 구성 데이터 적재 (497개 회사, 279쌍 PT) - FleetTracker: 등록 선단 ↔ AIS 매칭(NAME_EXACT) + 어구 정체성 추적 - track_similarity.py: DTW 기반 궤적 유사도 (TRACK_SIMILAR 플래그) - scheduler: fleet_tracker 통합 (기존 assign_fleet_roles 대체) Co-Authored-By: Claude Opus 4.6 (1M context) --- database/migration/007_fleet_registry.sql | 74 +++++ prediction/algorithms/track_similarity.py | 160 +++++++++++ prediction/fleet_tracker.py | 322 ++++++++++++++++++++++ prediction/scheduler.py | 30 +- prediction/scripts/load_fleet_registry.py | 176 ++++++++++++ 5 files changed, 758 insertions(+), 4 deletions(-) create mode 100644 database/migration/007_fleet_registry.sql create mode 100644 prediction/algorithms/track_similarity.py create mode 100644 prediction/fleet_tracker.py create mode 100644 prediction/scripts/load_fleet_registry.py diff --git a/database/migration/007_fleet_registry.sql b/database/migration/007_fleet_registry.sql new file mode 100644 index 0000000..906d0f2 --- /dev/null +++ b/database/migration/007_fleet_registry.sql @@ -0,0 +1,74 @@ +-- 선단 등록 + 어망/어구 정체성 추적 시스템 + +-- 1. 소유자/회사 +CREATE TABLE IF NOT EXISTS kcg.fleet_companies ( + id SERIAL PRIMARY KEY, + name_cn TEXT NOT NULL, + name_en TEXT, + created_at TIMESTAMPTZ DEFAULT NOW() +); + +-- 2. 등록 선박 (906척 참고자료 기반) +CREATE TABLE IF NOT EXISTS kcg.fleet_vessels ( + id SERIAL PRIMARY KEY, + company_id INT REFERENCES kcg.fleet_companies(id), + permit_no VARCHAR(20) NOT NULL, + name_cn TEXT NOT NULL, + name_en TEXT, + tonnage INT, + gear_code VARCHAR(10), + fleet_role VARCHAR(20), + pair_vessel_id INT REFERENCES kcg.fleet_vessels(id), + mmsi VARCHAR(15), + match_confidence REAL DEFAULT 0, + match_method VARCHAR(20), + last_seen_at TIMESTAMPTZ, + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_fleet_vessels_mmsi ON kcg.fleet_vessels(mmsi); +CREATE INDEX IF NOT EXISTS idx_fleet_vessels_name_en ON kcg.fleet_vessels(name_en); +CREATE INDEX IF NOT EXISTS idx_fleet_vessels_name_cn ON kcg.fleet_vessels(name_cn); +CREATE INDEX IF NOT EXISTS idx_fleet_vessels_company ON kcg.fleet_vessels(company_id); + +-- 3. 어망/어구 정체성 이력 +CREATE TABLE IF NOT EXISTS kcg.gear_identity_log ( + id BIGSERIAL PRIMARY KEY, + mmsi VARCHAR(15) NOT NULL, + name TEXT NOT NULL, + parent_name TEXT, + parent_mmsi VARCHAR(15), + parent_vessel_id INT REFERENCES kcg.fleet_vessels(id), + gear_index_1 INT, + gear_index_2 INT, + lat DOUBLE PRECISION, + lon DOUBLE PRECISION, + match_method VARCHAR(30), + match_confidence REAL DEFAULT 0, + first_seen_at TIMESTAMPTZ NOT NULL, + last_seen_at TIMESTAMPTZ NOT NULL, + is_active BOOLEAN DEFAULT TRUE, + created_at TIMESTAMPTZ DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_gear_identity_mmsi ON kcg.gear_identity_log(mmsi); +CREATE INDEX IF NOT EXISTS idx_gear_identity_parent ON kcg.gear_identity_log(parent_mmsi); +CREATE INDEX IF NOT EXISTS idx_gear_identity_active ON kcg.gear_identity_log(is_active) WHERE is_active = TRUE; + +-- 4. 선단 추적 스냅샷 (5분 주기) +CREATE TABLE IF NOT EXISTS kcg.fleet_tracking_snapshot ( + id BIGSERIAL PRIMARY KEY, + company_id INT REFERENCES kcg.fleet_companies(id), + snapshot_time TIMESTAMPTZ NOT NULL, + total_vessels INT, + active_vessels INT, + in_zone_vessels INT, + operating_vessels INT, + gear_count INT, + fleet_status VARCHAR(20), + center_lat DOUBLE PRECISION, + center_lon DOUBLE PRECISION, + details JSONB, + created_at TIMESTAMPTZ DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_fleet_snapshot_time ON kcg.fleet_tracking_snapshot(snapshot_time DESC); +CREATE INDEX IF NOT EXISTS idx_fleet_snapshot_company ON kcg.fleet_tracking_snapshot(company_id); diff --git a/prediction/algorithms/track_similarity.py b/prediction/algorithms/track_similarity.py new file mode 100644 index 0000000..0212f98 --- /dev/null +++ b/prediction/algorithms/track_similarity.py @@ -0,0 +1,160 @@ +"""궤적 유사도 — DTW(Dynamic Time Warping) 기반.""" +import math + +_MAX_RESAMPLE_POINTS = 50 + + +def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float: + """두 좌표 간 거리 (미터).""" + R = 6371000 + phi1, phi2 = math.radians(lat1), math.radians(lat2) + dphi = math.radians(lat2 - lat1) + dlam = math.radians(lon2 - lon1) + a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2 + return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + + +def _resample(track: list[tuple[float, float]], n: int) -> list[tuple[float, float]]: + """궤적을 n 포인트로 균등 리샘플링 (선형 보간).""" + if len(track) == 0: + return [] + if len(track) == 1: + return [track[0]] * n + if len(track) <= n: + return list(track) + + # 누적 거리 계산 + cumulative = [0.0] + for i in range(1, len(track)): + d = haversine_m(track[i - 1][0], track[i - 1][1], track[i][0], track[i][1]) + cumulative.append(cumulative[-1] + d) + + total_dist = cumulative[-1] + if total_dist == 0.0: + return [track[0]] * n + + step = total_dist / (n - 1) + result: list[tuple[float, float]] = [] + + seg = 0 + for k in range(n): + target = step * k + # 해당 target 거리에 해당하는 선분 찾기 + while seg < len(cumulative) - 2 and cumulative[seg + 1] < target: + seg += 1 + seg_len = cumulative[seg + 1] - cumulative[seg] + if seg_len == 0.0: + result.append(track[seg]) + else: + t = (target - cumulative[seg]) / seg_len + lat = track[seg][0] + t * (track[seg + 1][0] - track[seg][0]) + lon = track[seg][1] + t * (track[seg + 1][1] - track[seg][1]) + result.append((lat, lon)) + + return result + + +def _dtw_distance( + track_a: list[tuple[float, float]], + track_b: list[tuple[float, float]], +) -> float: + """두 궤적 간 DTW 거리 (미터 단위 평균 거리).""" + n, m = len(track_a), len(track_b) + if n == 0 or m == 0: + return float('inf') + + INF = float('inf') + # 1D 롤링 DP (공간 최적화) + prev = [INF] * (m + 1) + prev[0] = 0.0 + # 첫 행 초기화 + row = [INF] * (m + 1) + row[0] = INF + + dp_prev = [INF] * (m + 1) + dp_curr = [INF] * (m + 1) + dp_prev[0] = 0.0 + for j in range(1, m + 1): + dp_prev[j] = INF + + for i in range(1, n + 1): + dp_curr[0] = INF + for j in range(1, m + 1): + cost = haversine_m(track_a[i - 1][0], track_a[i - 1][1], + track_b[j - 1][0], track_b[j - 1][1]) + min_prev = min(dp_curr[j - 1], dp_prev[j], dp_prev[j - 1]) + dp_curr[j] = cost + min_prev + dp_prev, dp_curr = dp_curr, [INF] * (m + 1) + + # dp_prev는 마지막으로 계산된 행 + total = dp_prev[m] + if total == INF: + return INF + return total / (n + m) + + +def compute_track_similarity( + track_a: list[tuple[float, float]], + track_b: list[tuple[float, float]], + max_dist_m: float = 10000.0, +) -> float: + """두 궤적의 DTW 거리 기반 유사도 (0~1). + + track이 비어있으면 0.0 반환. + 유사할수록 1.0에 가까움. + """ + if not track_a or not track_b: + return 0.0 + + a = _resample(track_a, _MAX_RESAMPLE_POINTS) + b = _resample(track_b, _MAX_RESAMPLE_POINTS) + + avg_dist = _dtw_distance(a, b) + if avg_dist == float('inf') or max_dist_m <= 0.0: + return 0.0 + + similarity = 1.0 - (avg_dist / max_dist_m) + return max(0.0, min(1.0, similarity)) + + +def match_gear_by_track( + gear_tracks: dict[str, list[tuple[float, float]]], + vessel_tracks: dict[str, list[tuple[float, float]]], + threshold: float = 0.6, +) -> list[dict]: + """어구 궤적을 선단 선박 궤적과 비교하여 매칭. + + Args: + gear_tracks: mmsi → [(lat, lon), ...] — 어구 궤적 + vessel_tracks: mmsi → [(lat, lon), ...] — 선박 궤적 + threshold: 유사도 하한 (이상이면 매칭) + + Returns: + [{gear_mmsi, vessel_mmsi, similarity, match_method: 'TRACK_SIMILAR'}] + """ + results: list[dict] = [] + + for gear_mmsi, g_track in gear_tracks.items(): + if not g_track: + continue + + best_mmsi: str | None = None + best_sim = -1.0 + + for vessel_mmsi, v_track in vessel_tracks.items(): + if not v_track: + continue + sim = compute_track_similarity(g_track, v_track) + if sim > best_sim: + best_sim = sim + best_mmsi = vessel_mmsi + + if best_mmsi is not None and best_sim >= threshold: + results.append({ + 'gear_mmsi': gear_mmsi, + 'vessel_mmsi': best_mmsi, + 'similarity': best_sim, + 'match_method': 'TRACK_SIMILAR', + }) + + return results diff --git a/prediction/fleet_tracker.py b/prediction/fleet_tracker.py new file mode 100644 index 0000000..981d7db --- /dev/null +++ b/prediction/fleet_tracker.py @@ -0,0 +1,322 @@ +"""등록 선단 기반 추적기.""" +import logging +import re +import time +from datetime import datetime, timezone +from typing import Optional + +import pandas as pd + +logger = logging.getLogger(__name__) + +# 어구 이름 패턴 +GEAR_PATTERN = re.compile(r'^(.+?)_(\d+)_(\d*)$') +GEAR_PATTERN_PCT = re.compile(r'^(.+?)%$') + +_REGISTRY_CACHE_SEC = 3600 + + +class FleetTracker: + def __init__(self) -> None: + self._companies: dict[int, dict] = {} # id → {name_cn, name_en} + self._vessels: dict[int, dict] = {} # id → {permit_no, name_cn, ...} + self._name_cn_map: dict[str, int] = {} # name_cn → vessel_id + self._name_en_map: dict[str, int] = {} # name_en(lowercase) → vessel_id + self._mmsi_to_vid: dict[str, int] = {} # mmsi → vessel_id (매칭된 것만) + self._gear_active: dict[str, dict] = {} # mmsi → {name, parent_mmsi, ...} + self._last_registry_load: float = 0.0 + + def load_registry(self, conn) -> None: + """DB에서 fleet_companies + fleet_vessels 로드. 1시간 캐시.""" + if time.time() - self._last_registry_load < _REGISTRY_CACHE_SEC: + return + + cur = conn.cursor() + cur.execute('SELECT id, name_cn, name_en FROM kcg.fleet_companies') + self._companies = {r[0]: {'name_cn': r[1], 'name_en': r[2]} for r in cur.fetchall()} + + cur.execute( + """SELECT id, company_id, permit_no, name_cn, name_en, tonnage, + gear_code, fleet_role, pair_vessel_id, mmsi + FROM kcg.fleet_vessels""" + ) + self._vessels = {} + self._name_cn_map = {} + self._name_en_map = {} + self._mmsi_to_vid = {} + + for r in cur.fetchall(): + vid = r[0] + v: dict = { + 'id': vid, + 'company_id': r[1], + 'permit_no': r[2], + 'name_cn': r[3], + 'name_en': r[4], + 'tonnage': r[5], + 'gear_code': r[6], + 'fleet_role': r[7], + 'pair_vessel_id': r[8], + 'mmsi': r[9], + } + self._vessels[vid] = v + if r[3]: + self._name_cn_map[r[3]] = vid + if r[4]: + self._name_en_map[r[4].lower().strip()] = vid + if r[9]: + self._mmsi_to_vid[r[9]] = vid + + cur.close() + self._last_registry_load = time.time() + logger.info( + 'fleet registry loaded: %d companies, %d vessels', + len(self._companies), + len(self._vessels), + ) + + def match_ais_to_registry(self, ais_vessels: list[dict], conn) -> None: + """AIS 선박을 등록 선단에 매칭. DB 업데이트. + + ais_vessels: [{mmsi, name, lat, lon, sog, cog}, ...] + """ + cur = conn.cursor() + matched = 0 + + for v in ais_vessels: + mmsi = v.get('mmsi', '') + name = v.get('name', '') + if not mmsi or not name: + continue + + # 이미 매칭됨 → last_seen_at 업데이트 + if mmsi in self._mmsi_to_vid: + cur.execute( + 'UPDATE kcg.fleet_vessels SET last_seen_at = NOW() WHERE id = %s', + (self._mmsi_to_vid[mmsi],), + ) + continue + + # NAME_EXACT 매칭 + vid: Optional[int] = self._name_cn_map.get(name) + if not vid: + vid = self._name_en_map.get(name.lower().strip()) + + if vid: + cur.execute( + """UPDATE kcg.fleet_vessels + SET mmsi = %s, match_confidence = 0.95, match_method = 'NAME_EXACT', + last_seen_at = NOW(), updated_at = NOW() + WHERE id = %s AND (mmsi IS NULL OR mmsi = %s)""", + (mmsi, vid, mmsi), + ) + self._mmsi_to_vid[mmsi] = vid + matched += 1 + + conn.commit() + cur.close() + if matched > 0: + logger.info('AIS→registry matched: %d vessels', matched) + + def track_gear_identity(self, gear_signals: list[dict], conn) -> None: + """어구/어망 정체성 추적. + + gear_signals: [{mmsi, name, lat, lon}, ...] — 이름이 XXX_숫자_숫자 패턴인 AIS 신호 + """ + cur = conn.cursor() + now = datetime.now(timezone.utc) + + for g in gear_signals: + mmsi = g['mmsi'] + name = g['name'] + lat = g.get('lat', 0) + lon = g.get('lon', 0) + + # 모선명 + 인덱스 추출 + parent_name: Optional[str] = None + idx1: Optional[int] = None + idx2: Optional[int] = None + + m = GEAR_PATTERN.match(name) + if m: + parent_name = m.group(1).strip() + idx1 = int(m.group(2)) + idx2 = int(m.group(3)) if m.group(3) else None + else: + m2 = GEAR_PATTERN_PCT.match(name) + if m2: + parent_name = m2.group(1).strip() + + # 모선 매칭 + parent_mmsi: Optional[str] = None + parent_vid: Optional[int] = None + if parent_name: + vid = self._name_cn_map.get(parent_name) + if not vid: + vid = self._name_en_map.get(parent_name.lower()) + if vid: + parent_vid = vid + parent_mmsi = self._vessels[vid].get('mmsi') + + match_method: Optional[str] = 'NAME_PARENT' if parent_vid else None + confidence = 0.9 if parent_vid else 0.0 + + # 기존 활성 행 조회 + cur.execute( + """SELECT id, name FROM kcg.gear_identity_log + WHERE mmsi = %s AND is_active = TRUE""", + (mmsi,), + ) + existing = cur.fetchone() + + if existing: + if existing[1] == name: + # 같은 MMSI + 같은 이름 → 위치/시간 업데이트 + cur.execute( + """UPDATE kcg.gear_identity_log + SET last_seen_at = %s, lat = %s, lon = %s + WHERE id = %s""", + (now, lat, lon, existing[0]), + ) + else: + # 같은 MMSI + 다른 이름 → 이전 비활성화 + 새 행 + cur.execute( + 'UPDATE kcg.gear_identity_log SET is_active = FALSE WHERE id = %s', + (existing[0],), + ) + cur.execute( + """INSERT INTO kcg.gear_identity_log + (mmsi, name, parent_name, parent_mmsi, parent_vessel_id, + gear_index_1, gear_index_2, lat, lon, + match_method, match_confidence, first_seen_at, last_seen_at) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""", + (mmsi, name, parent_name, parent_mmsi, parent_vid, + idx1, idx2, lat, lon, + match_method, confidence, now, now), + ) + else: + # 새 MMSI → 같은 이름이 다른 MMSI로 있는지 확인 + cur.execute( + """SELECT id, mmsi FROM kcg.gear_identity_log + WHERE name = %s AND is_active = TRUE AND mmsi != %s""", + (name, mmsi), + ) + old_mmsi_row = cur.fetchone() + if old_mmsi_row: + # 같은 이름 + 다른 MMSI → MMSI 변경 + cur.execute( + 'UPDATE kcg.gear_identity_log SET is_active = FALSE WHERE id = %s', + (old_mmsi_row[0],), + ) + logger.info('gear MMSI change: %s → %s (name=%s)', old_mmsi_row[1], mmsi, name) + + cur.execute( + """INSERT INTO kcg.gear_identity_log + (mmsi, name, parent_name, parent_mmsi, parent_vessel_id, + gear_index_1, gear_index_2, lat, lon, + match_method, match_confidence, first_seen_at, last_seen_at) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""", + (mmsi, name, parent_name, parent_mmsi, parent_vid, + idx1, idx2, lat, lon, + match_method, confidence, now, now), + ) + + conn.commit() + cur.close() + + def build_fleet_clusters(self, vessel_dfs: dict[str, pd.DataFrame]) -> dict[str, dict]: + """등록 선단 기준으로 cluster 정보 구성. + + Returns: {mmsi → {cluster_id, cluster_size, is_leader, fleet_role}} + cluster_id = company_id (등록 선단 기준) + """ + results: dict[str, dict] = {} + + # 회사별로 현재 AIS 수신 중인 선박 그룹핑 + company_vessels: dict[int, list[str]] = {} + for mmsi, vid in self._mmsi_to_vid.items(): + v = self._vessels.get(vid) + if not v or mmsi not in vessel_dfs: + continue + cid = v['company_id'] + company_vessels.setdefault(cid, []).append(mmsi) + + for cid, mmsis in company_vessels.items(): + if len(mmsis) < 2: + # 단독 선박 → NOISE + for mmsi in mmsis: + v = self._vessels.get(self._mmsi_to_vid.get(mmsi, -1), {}) + results[mmsi] = { + 'cluster_id': -1, + 'cluster_size': 1, + 'is_leader': False, + 'fleet_role': v.get('fleet_role', 'NOISE'), + } + continue + + # 2척 이상 → 등록 선단 클러스터 + for mmsi in mmsis: + vid = self._mmsi_to_vid[mmsi] + v = self._vessels[vid] + results[mmsi] = { + 'cluster_id': cid, + 'cluster_size': len(mmsis), + 'is_leader': v['fleet_role'] == 'MAIN', + 'fleet_role': v['fleet_role'], + } + + # 매칭 안 된 선박 → NOISE + for mmsi in vessel_dfs: + if mmsi not in results: + results[mmsi] = { + 'cluster_id': -1, + 'cluster_size': 0, + 'is_leader': False, + 'fleet_role': 'NOISE', + } + + return results + + def save_snapshot(self, vessel_dfs: dict[str, pd.DataFrame], conn) -> None: + """fleet_tracking_snapshot 저장.""" + now = datetime.now(timezone.utc) + cur = conn.cursor() + + company_vessels: dict[int, list[str]] = {} + for mmsi, vid in self._mmsi_to_vid.items(): + v = self._vessels.get(vid) + if not v or mmsi not in vessel_dfs: + continue + company_vessels.setdefault(v['company_id'], []).append(mmsi) + + for cid, mmsis in company_vessels.items(): + active = len(mmsis) + total = sum(1 for v in self._vessels.values() if v['company_id'] == cid) + + lats: list[float] = [] + lons: list[float] = [] + for mmsi in mmsis: + df = vessel_dfs.get(mmsi) + if df is not None and len(df) > 0: + last = df.iloc[-1] + lats.append(float(last['lat'])) + lons.append(float(last['lon'])) + + center_lat = sum(lats) / len(lats) if lats else None + center_lon = sum(lons) / len(lons) if lons else None + + cur.execute( + """INSERT INTO kcg.fleet_tracking_snapshot + (company_id, snapshot_time, total_vessels, active_vessels, + center_lat, center_lon) + VALUES (%s, %s, %s, %s, %s, %s)""", + (cid, now, total, active, center_lat, center_lon), + ) + + conn.commit() + cur.close() + logger.info('fleet snapshot saved: %d companies', len(company_vessels)) + + +# 싱글턴 +fleet_tracker = FleetTracker() diff --git a/prediction/scheduler.py b/prediction/scheduler.py index 542700e..9d889d3 100644 --- a/prediction/scheduler.py +++ b/prediction/scheduler.py @@ -25,6 +25,7 @@ def get_last_run() -> dict: def run_analysis_cycle(): """5분 주기 분석 사이클 — 인메모리 캐시 기반.""" + import re as _re from cache.vessel_store import vessel_store from db import snpdb, kcgdb from pipeline.orchestrator import ChineseFishingVesselPipeline @@ -32,8 +33,8 @@ def run_analysis_cycle(): from algorithms.fishing_pattern import compute_ucaf_score, compute_ucft_score from algorithms.dark_vessel import is_dark_vessel from algorithms.spoofing import compute_spoofing_score, count_speed_jumps, compute_bd09_offset - from algorithms.fleet import assign_fleet_roles from algorithms.risk import compute_vessel_risk_score + from fleet_tracker import fleet_tracker from models.result import AnalysisResult start = time.time() @@ -71,9 +72,30 @@ def run_analysis_cycle(): _last_run['vessel_count'] = 0 return - # 4. 선단 역할 분석 - cluster_map = {c['mmsi']: c['cluster_id'] for c in classifications} - fleet_roles = assign_fleet_roles(vessel_dfs, cluster_map) + # 4. 등록 선단 기반 fleet 분석 + _gear_re = _re.compile(r'^.+_\d+_\d*$|%$') + with kcgdb.get_conn() as kcg_conn: + fleet_tracker.load_registry(kcg_conn) + + all_ais = [] + for mmsi, df in vessel_dfs.items(): + if len(df) > 0: + last = df.iloc[-1] + all_ais.append({ + 'mmsi': mmsi, + 'name': vessel_store.get_vessel_info(mmsi).get('name', ''), + 'lat': float(last['lat']), + 'lon': float(last['lon']), + }) + + fleet_tracker.match_ais_to_registry(all_ais, kcg_conn) + + gear_signals = [v for v in all_ais if _gear_re.match(v.get('name', ''))] + fleet_tracker.track_gear_identity(gear_signals, kcg_conn) + + fleet_roles = fleet_tracker.build_fleet_clusters(vessel_dfs) + + fleet_tracker.save_snapshot(vessel_dfs, kcg_conn) # 5. 선박별 추가 알고리즘 → AnalysisResult 생성 results = [] diff --git a/prediction/scripts/load_fleet_registry.py b/prediction/scripts/load_fleet_registry.py new file mode 100644 index 0000000..c1cf479 --- /dev/null +++ b/prediction/scripts/load_fleet_registry.py @@ -0,0 +1,176 @@ +"""선단 구성 JSX → kcgdb fleet_companies + fleet_vessels 적재. + +Usage: python3 prediction/scripts/load_fleet_registry.py +""" + +import json +import re +import sys +from pathlib import Path + +import psycopg2 +import psycopg2.extras + +# JSX 파일에서 D 배열 추출 +JSX_PATH = Path(__file__).parent.parent.parent.parent / 'gc-wing-dev' / 'legacy' / '선단구성_906척_어업수역 (1).jsx' + +# kcgdb 접속 — prediction/.env 또는 환경변수 +DB_HOST = '211.208.115.83' +DB_PORT = 5432 +DB_NAME = 'kcgdb' +DB_USER = 'kcg_app' +DB_SCHEMA = 'kcg' + + +def parse_jsx(path: Path) -> list[list]: + """JSX 파일에서 D=[ ... ] 배열을 파싱.""" + text = path.read_text(encoding='utf-8') + + # const D=[ 부터 ]; 까지 추출 + m = re.search(r'const\s+D\s*=\s*\[', text) + if not m: + raise ValueError('D 배열을 찾을 수 없습니다') + + start = m.end() - 1 # [ 위치 + # 중첩 배열을 추적하여 닫는 ] 찾기 + depth = 0 + end = start + for i in range(start, len(text)): + if text[i] == '[': + depth += 1 + elif text[i] == ']': + depth -= 1 + if depth == 0: + end = i + 1 + break + + raw = text[start:end] + + # JavaScript → JSON 변환 (trailing comma 제거) + raw = re.sub(r',\s*]', ']', raw) + raw = re.sub(r',\s*}', '}', raw) + + return json.loads(raw) + + +def load_to_db(data: list[list], db_password: str): + """파싱된 데이터를 DB에 적재.""" + conn = psycopg2.connect( + host=DB_HOST, port=DB_PORT, dbname=DB_NAME, + user=DB_USER, password=db_password, + options=f'-c search_path={DB_SCHEMA}', + ) + conn.autocommit = False + cur = conn.cursor() + + try: + # 기존 데이터 초기화 + cur.execute('DELETE FROM fleet_vessels') + cur.execute('DELETE FROM fleet_companies') + + company_count = 0 + vessel_count = 0 + pair_links = [] # (vessel_id, pair_vessel_id) 후처리 + + for row in data: + if len(row) < 7: + continue + + name_cn = row[0] + name_en = row[1] + + # 회사 INSERT + cur.execute( + 'INSERT INTO fleet_companies (name_cn, name_en) VALUES (%s, %s) RETURNING id', + (name_cn, name_en), + ) + company_id = cur.fetchone()[0] + company_count += 1 + + # 인덱스: 0=own, 1=ownEn, 2=pairs, 3=gn, 4=ot, 5=ps, 6=fc, 7=upt, 8=upts + pairs = row[2] if len(row) > 2 and isinstance(row[2], list) else [] + gn = row[3] if len(row) > 3 and isinstance(row[3], list) else [] + ot = row[4] if len(row) > 4 and isinstance(row[4], list) else [] + ps = row[5] if len(row) > 5 and isinstance(row[5], list) else [] + fc = row[6] if len(row) > 6 and isinstance(row[6], list) else [] + upt = row[7] if len(row) > 7 and isinstance(row[7], list) else [] + upts = row[8] if len(row) > 8 and isinstance(row[8], list) else [] + + def insert_vessel(v, gear_code, role): + nonlocal vessel_count + if not isinstance(v, list) or len(v) < 4: + return None + cur.execute( + '''INSERT INTO fleet_vessels + (company_id, permit_no, name_cn, name_en, tonnage, gear_code, fleet_role) + VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id''', + (company_id, v[0], v[1], v[2], v[3], gear_code, role), + ) + vessel_count += 1 + return cur.fetchone()[0] + + # PT 본선쌍 (pairs) + for pair in pairs: + if not isinstance(pair, list) or len(pair) < 2: + continue + main_id = insert_vessel(pair[0], 'C21', 'MAIN') + sub_id = insert_vessel(pair[1], 'C21', 'SUB') + if main_id and sub_id: + pair_links.append((main_id, sub_id)) + + # GN 유자망 + for v in gn: + insert_vessel(v, 'C25', 'GN') + + # OT 기타 + for v in ot: + insert_vessel(v, 'C22', 'OT') + + # PS 선망 + for v in ps: + insert_vessel(v, 'C23', 'PS') + + # FC 운반선 + for v in fc: + insert_vessel(v, 'C40', 'FC') + + # UPT 단독 본선 + for v in upt: + insert_vessel(v, 'C21', 'MAIN_SOLO') + + # UPTS 단독 부속선 + for v in upts: + insert_vessel(v, 'C21', 'SUB_SOLO') + + # PT 쌍 상호 참조 설정 + for main_id, sub_id in pair_links: + cur.execute('UPDATE fleet_vessels SET pair_vessel_id = %s WHERE id = %s', (sub_id, main_id)) + cur.execute('UPDATE fleet_vessels SET pair_vessel_id = %s WHERE id = %s', (main_id, sub_id)) + + conn.commit() + print(f'적재 완료: {company_count}개 회사, {vessel_count}척 선박, {len(pair_links)}쌍 PT') + + except Exception as e: + conn.rollback() + print(f'적재 실패: {e}', file=sys.stderr) + raise + finally: + cur.close() + conn.close() + + +if __name__ == '__main__': + if not JSX_PATH.exists(): + print(f'파일을 찾을 수 없습니다: {JSX_PATH}', file=sys.stderr) + sys.exit(1) + + # DB 비밀번호 — 환경변수 또는 직접 입력 + import os + password = os.environ.get('KCGDB_PASSWORD', 'Kcg2026monitor') + + print(f'JSX 파싱: {JSX_PATH}') + data = parse_jsx(JSX_PATH) + print(f'파싱 완료: {len(data)}개 회사') + + print('DB 적재 시작...') + load_to_db(data, password) -- 2.45.2