Merge pull request 'release: 선단 등록 DB + 어구 추적' (#131) from develop into main
All checks were successful
Deploy KCG / deploy (push) Successful in 2m3s

This commit is contained in:
htlee 2026-03-20 18:07:46 +09:00
커밋 9cf2dbe58c
5개의 변경된 파일758개의 추가작업 그리고 4개의 파일을 삭제

파일 보기

@ -0,0 +1,74 @@
-- 선단 등록 + 어망/어구 정체성 추적 시스템
-- 1. 소유자/회사
CREATE TABLE IF NOT EXISTS kcg.fleet_companies (
id SERIAL PRIMARY KEY,
name_cn TEXT NOT NULL,
name_en TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- 2. 등록 선박 (906척 참고자료 기반)
CREATE TABLE IF NOT EXISTS kcg.fleet_vessels (
id SERIAL PRIMARY KEY,
company_id INT REFERENCES kcg.fleet_companies(id),
permit_no VARCHAR(20) NOT NULL,
name_cn TEXT NOT NULL,
name_en TEXT,
tonnage INT,
gear_code VARCHAR(10),
fleet_role VARCHAR(20),
pair_vessel_id INT REFERENCES kcg.fleet_vessels(id),
mmsi VARCHAR(15),
match_confidence REAL DEFAULT 0,
match_method VARCHAR(20),
last_seen_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_fleet_vessels_mmsi ON kcg.fleet_vessels(mmsi);
CREATE INDEX IF NOT EXISTS idx_fleet_vessels_name_en ON kcg.fleet_vessels(name_en);
CREATE INDEX IF NOT EXISTS idx_fleet_vessels_name_cn ON kcg.fleet_vessels(name_cn);
CREATE INDEX IF NOT EXISTS idx_fleet_vessels_company ON kcg.fleet_vessels(company_id);
-- 3. 어망/어구 정체성 이력
CREATE TABLE IF NOT EXISTS kcg.gear_identity_log (
id BIGSERIAL PRIMARY KEY,
mmsi VARCHAR(15) NOT NULL,
name TEXT NOT NULL,
parent_name TEXT,
parent_mmsi VARCHAR(15),
parent_vessel_id INT REFERENCES kcg.fleet_vessels(id),
gear_index_1 INT,
gear_index_2 INT,
lat DOUBLE PRECISION,
lon DOUBLE PRECISION,
match_method VARCHAR(30),
match_confidence REAL DEFAULT 0,
first_seen_at TIMESTAMPTZ NOT NULL,
last_seen_at TIMESTAMPTZ NOT NULL,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_gear_identity_mmsi ON kcg.gear_identity_log(mmsi);
CREATE INDEX IF NOT EXISTS idx_gear_identity_parent ON kcg.gear_identity_log(parent_mmsi);
CREATE INDEX IF NOT EXISTS idx_gear_identity_active ON kcg.gear_identity_log(is_active) WHERE is_active = TRUE;
-- 4. 선단 추적 스냅샷 (5분 주기)
CREATE TABLE IF NOT EXISTS kcg.fleet_tracking_snapshot (
id BIGSERIAL PRIMARY KEY,
company_id INT REFERENCES kcg.fleet_companies(id),
snapshot_time TIMESTAMPTZ NOT NULL,
total_vessels INT,
active_vessels INT,
in_zone_vessels INT,
operating_vessels INT,
gear_count INT,
fleet_status VARCHAR(20),
center_lat DOUBLE PRECISION,
center_lon DOUBLE PRECISION,
details JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_fleet_snapshot_time ON kcg.fleet_tracking_snapshot(snapshot_time DESC);
CREATE INDEX IF NOT EXISTS idx_fleet_snapshot_company ON kcg.fleet_tracking_snapshot(company_id);

파일 보기

@ -0,0 +1,160 @@
"""궤적 유사도 — DTW(Dynamic Time Warping) 기반."""
import math
_MAX_RESAMPLE_POINTS = 50
def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""두 좌표 간 거리 (미터)."""
R = 6371000
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _resample(track: list[tuple[float, float]], n: int) -> list[tuple[float, float]]:
"""궤적을 n 포인트로 균등 리샘플링 (선형 보간)."""
if len(track) == 0:
return []
if len(track) == 1:
return [track[0]] * n
if len(track) <= n:
return list(track)
# 누적 거리 계산
cumulative = [0.0]
for i in range(1, len(track)):
d = haversine_m(track[i - 1][0], track[i - 1][1], track[i][0], track[i][1])
cumulative.append(cumulative[-1] + d)
total_dist = cumulative[-1]
if total_dist == 0.0:
return [track[0]] * n
step = total_dist / (n - 1)
result: list[tuple[float, float]] = []
seg = 0
for k in range(n):
target = step * k
# 해당 target 거리에 해당하는 선분 찾기
while seg < len(cumulative) - 2 and cumulative[seg + 1] < target:
seg += 1
seg_len = cumulative[seg + 1] - cumulative[seg]
if seg_len == 0.0:
result.append(track[seg])
else:
t = (target - cumulative[seg]) / seg_len
lat = track[seg][0] + t * (track[seg + 1][0] - track[seg][0])
lon = track[seg][1] + t * (track[seg + 1][1] - track[seg][1])
result.append((lat, lon))
return result
def _dtw_distance(
track_a: list[tuple[float, float]],
track_b: list[tuple[float, float]],
) -> float:
"""두 궤적 간 DTW 거리 (미터 단위 평균 거리)."""
n, m = len(track_a), len(track_b)
if n == 0 or m == 0:
return float('inf')
INF = float('inf')
# 1D 롤링 DP (공간 최적화)
prev = [INF] * (m + 1)
prev[0] = 0.0
# 첫 행 초기화
row = [INF] * (m + 1)
row[0] = INF
dp_prev = [INF] * (m + 1)
dp_curr = [INF] * (m + 1)
dp_prev[0] = 0.0
for j in range(1, m + 1):
dp_prev[j] = INF
for i in range(1, n + 1):
dp_curr[0] = INF
for j in range(1, m + 1):
cost = haversine_m(track_a[i - 1][0], track_a[i - 1][1],
track_b[j - 1][0], track_b[j - 1][1])
min_prev = min(dp_curr[j - 1], dp_prev[j], dp_prev[j - 1])
dp_curr[j] = cost + min_prev
dp_prev, dp_curr = dp_curr, [INF] * (m + 1)
# dp_prev는 마지막으로 계산된 행
total = dp_prev[m]
if total == INF:
return INF
return total / (n + m)
def compute_track_similarity(
track_a: list[tuple[float, float]],
track_b: list[tuple[float, float]],
max_dist_m: float = 10000.0,
) -> float:
"""두 궤적의 DTW 거리 기반 유사도 (0~1).
track이 비어있으면 0.0 반환.
유사할수록 1.0 가까움.
"""
if not track_a or not track_b:
return 0.0
a = _resample(track_a, _MAX_RESAMPLE_POINTS)
b = _resample(track_b, _MAX_RESAMPLE_POINTS)
avg_dist = _dtw_distance(a, b)
if avg_dist == float('inf') or max_dist_m <= 0.0:
return 0.0
similarity = 1.0 - (avg_dist / max_dist_m)
return max(0.0, min(1.0, similarity))
def match_gear_by_track(
gear_tracks: dict[str, list[tuple[float, float]]],
vessel_tracks: dict[str, list[tuple[float, float]]],
threshold: float = 0.6,
) -> list[dict]:
"""어구 궤적을 선단 선박 궤적과 비교하여 매칭.
Args:
gear_tracks: mmsi [(lat, lon), ...] 어구 궤적
vessel_tracks: mmsi [(lat, lon), ...] 선박 궤적
threshold: 유사도 하한 (이상이면 매칭)
Returns:
[{gear_mmsi, vessel_mmsi, similarity, match_method: 'TRACK_SIMILAR'}]
"""
results: list[dict] = []
for gear_mmsi, g_track in gear_tracks.items():
if not g_track:
continue
best_mmsi: str | None = None
best_sim = -1.0
for vessel_mmsi, v_track in vessel_tracks.items():
if not v_track:
continue
sim = compute_track_similarity(g_track, v_track)
if sim > best_sim:
best_sim = sim
best_mmsi = vessel_mmsi
if best_mmsi is not None and best_sim >= threshold:
results.append({
'gear_mmsi': gear_mmsi,
'vessel_mmsi': best_mmsi,
'similarity': best_sim,
'match_method': 'TRACK_SIMILAR',
})
return results

322
prediction/fleet_tracker.py Normal file
파일 보기

@ -0,0 +1,322 @@
"""등록 선단 기반 추적기."""
import logging
import re
import time
from datetime import datetime, timezone
from typing import Optional
import pandas as pd
logger = logging.getLogger(__name__)
# 어구 이름 패턴
GEAR_PATTERN = re.compile(r'^(.+?)_(\d+)_(\d*)$')
GEAR_PATTERN_PCT = re.compile(r'^(.+?)%$')
_REGISTRY_CACHE_SEC = 3600
class FleetTracker:
def __init__(self) -> None:
self._companies: dict[int, dict] = {} # id → {name_cn, name_en}
self._vessels: dict[int, dict] = {} # id → {permit_no, name_cn, ...}
self._name_cn_map: dict[str, int] = {} # name_cn → vessel_id
self._name_en_map: dict[str, int] = {} # name_en(lowercase) → vessel_id
self._mmsi_to_vid: dict[str, int] = {} # mmsi → vessel_id (매칭된 것만)
self._gear_active: dict[str, dict] = {} # mmsi → {name, parent_mmsi, ...}
self._last_registry_load: float = 0.0
def load_registry(self, conn) -> None:
"""DB에서 fleet_companies + fleet_vessels 로드. 1시간 캐시."""
if time.time() - self._last_registry_load < _REGISTRY_CACHE_SEC:
return
cur = conn.cursor()
cur.execute('SELECT id, name_cn, name_en FROM kcg.fleet_companies')
self._companies = {r[0]: {'name_cn': r[1], 'name_en': r[2]} for r in cur.fetchall()}
cur.execute(
"""SELECT id, company_id, permit_no, name_cn, name_en, tonnage,
gear_code, fleet_role, pair_vessel_id, mmsi
FROM kcg.fleet_vessels"""
)
self._vessels = {}
self._name_cn_map = {}
self._name_en_map = {}
self._mmsi_to_vid = {}
for r in cur.fetchall():
vid = r[0]
v: dict = {
'id': vid,
'company_id': r[1],
'permit_no': r[2],
'name_cn': r[3],
'name_en': r[4],
'tonnage': r[5],
'gear_code': r[6],
'fleet_role': r[7],
'pair_vessel_id': r[8],
'mmsi': r[9],
}
self._vessels[vid] = v
if r[3]:
self._name_cn_map[r[3]] = vid
if r[4]:
self._name_en_map[r[4].lower().strip()] = vid
if r[9]:
self._mmsi_to_vid[r[9]] = vid
cur.close()
self._last_registry_load = time.time()
logger.info(
'fleet registry loaded: %d companies, %d vessels',
len(self._companies),
len(self._vessels),
)
def match_ais_to_registry(self, ais_vessels: list[dict], conn) -> None:
"""AIS 선박을 등록 선단에 매칭. DB 업데이트.
ais_vessels: [{mmsi, name, lat, lon, sog, cog}, ...]
"""
cur = conn.cursor()
matched = 0
for v in ais_vessels:
mmsi = v.get('mmsi', '')
name = v.get('name', '')
if not mmsi or not name:
continue
# 이미 매칭됨 → last_seen_at 업데이트
if mmsi in self._mmsi_to_vid:
cur.execute(
'UPDATE kcg.fleet_vessels SET last_seen_at = NOW() WHERE id = %s',
(self._mmsi_to_vid[mmsi],),
)
continue
# NAME_EXACT 매칭
vid: Optional[int] = self._name_cn_map.get(name)
if not vid:
vid = self._name_en_map.get(name.lower().strip())
if vid:
cur.execute(
"""UPDATE kcg.fleet_vessels
SET mmsi = %s, match_confidence = 0.95, match_method = 'NAME_EXACT',
last_seen_at = NOW(), updated_at = NOW()
WHERE id = %s AND (mmsi IS NULL OR mmsi = %s)""",
(mmsi, vid, mmsi),
)
self._mmsi_to_vid[mmsi] = vid
matched += 1
conn.commit()
cur.close()
if matched > 0:
logger.info('AIS→registry matched: %d vessels', matched)
def track_gear_identity(self, gear_signals: list[dict], conn) -> None:
"""어구/어망 정체성 추적.
gear_signals: [{mmsi, name, lat, lon}, ...] 이름이 XXX_숫자_숫자 패턴인 AIS 신호
"""
cur = conn.cursor()
now = datetime.now(timezone.utc)
for g in gear_signals:
mmsi = g['mmsi']
name = g['name']
lat = g.get('lat', 0)
lon = g.get('lon', 0)
# 모선명 + 인덱스 추출
parent_name: Optional[str] = None
idx1: Optional[int] = None
idx2: Optional[int] = None
m = GEAR_PATTERN.match(name)
if m:
parent_name = m.group(1).strip()
idx1 = int(m.group(2))
idx2 = int(m.group(3)) if m.group(3) else None
else:
m2 = GEAR_PATTERN_PCT.match(name)
if m2:
parent_name = m2.group(1).strip()
# 모선 매칭
parent_mmsi: Optional[str] = None
parent_vid: Optional[int] = None
if parent_name:
vid = self._name_cn_map.get(parent_name)
if not vid:
vid = self._name_en_map.get(parent_name.lower())
if vid:
parent_vid = vid
parent_mmsi = self._vessels[vid].get('mmsi')
match_method: Optional[str] = 'NAME_PARENT' if parent_vid else None
confidence = 0.9 if parent_vid else 0.0
# 기존 활성 행 조회
cur.execute(
"""SELECT id, name FROM kcg.gear_identity_log
WHERE mmsi = %s AND is_active = TRUE""",
(mmsi,),
)
existing = cur.fetchone()
if existing:
if existing[1] == name:
# 같은 MMSI + 같은 이름 → 위치/시간 업데이트
cur.execute(
"""UPDATE kcg.gear_identity_log
SET last_seen_at = %s, lat = %s, lon = %s
WHERE id = %s""",
(now, lat, lon, existing[0]),
)
else:
# 같은 MMSI + 다른 이름 → 이전 비활성화 + 새 행
cur.execute(
'UPDATE kcg.gear_identity_log SET is_active = FALSE WHERE id = %s',
(existing[0],),
)
cur.execute(
"""INSERT INTO kcg.gear_identity_log
(mmsi, name, parent_name, parent_mmsi, parent_vessel_id,
gear_index_1, gear_index_2, lat, lon,
match_method, match_confidence, first_seen_at, last_seen_at)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
(mmsi, name, parent_name, parent_mmsi, parent_vid,
idx1, idx2, lat, lon,
match_method, confidence, now, now),
)
else:
# 새 MMSI → 같은 이름이 다른 MMSI로 있는지 확인
cur.execute(
"""SELECT id, mmsi FROM kcg.gear_identity_log
WHERE name = %s AND is_active = TRUE AND mmsi != %s""",
(name, mmsi),
)
old_mmsi_row = cur.fetchone()
if old_mmsi_row:
# 같은 이름 + 다른 MMSI → MMSI 변경
cur.execute(
'UPDATE kcg.gear_identity_log SET is_active = FALSE WHERE id = %s',
(old_mmsi_row[0],),
)
logger.info('gear MMSI change: %s%s (name=%s)', old_mmsi_row[1], mmsi, name)
cur.execute(
"""INSERT INTO kcg.gear_identity_log
(mmsi, name, parent_name, parent_mmsi, parent_vessel_id,
gear_index_1, gear_index_2, lat, lon,
match_method, match_confidence, first_seen_at, last_seen_at)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
(mmsi, name, parent_name, parent_mmsi, parent_vid,
idx1, idx2, lat, lon,
match_method, confidence, now, now),
)
conn.commit()
cur.close()
def build_fleet_clusters(self, vessel_dfs: dict[str, pd.DataFrame]) -> dict[str, dict]:
"""등록 선단 기준으로 cluster 정보 구성.
Returns: {mmsi {cluster_id, cluster_size, is_leader, fleet_role}}
cluster_id = company_id (등록 선단 기준)
"""
results: dict[str, dict] = {}
# 회사별로 현재 AIS 수신 중인 선박 그룹핑
company_vessels: dict[int, list[str]] = {}
for mmsi, vid in self._mmsi_to_vid.items():
v = self._vessels.get(vid)
if not v or mmsi not in vessel_dfs:
continue
cid = v['company_id']
company_vessels.setdefault(cid, []).append(mmsi)
for cid, mmsis in company_vessels.items():
if len(mmsis) < 2:
# 단독 선박 → NOISE
for mmsi in mmsis:
v = self._vessels.get(self._mmsi_to_vid.get(mmsi, -1), {})
results[mmsi] = {
'cluster_id': -1,
'cluster_size': 1,
'is_leader': False,
'fleet_role': v.get('fleet_role', 'NOISE'),
}
continue
# 2척 이상 → 등록 선단 클러스터
for mmsi in mmsis:
vid = self._mmsi_to_vid[mmsi]
v = self._vessels[vid]
results[mmsi] = {
'cluster_id': cid,
'cluster_size': len(mmsis),
'is_leader': v['fleet_role'] == 'MAIN',
'fleet_role': v['fleet_role'],
}
# 매칭 안 된 선박 → NOISE
for mmsi in vessel_dfs:
if mmsi not in results:
results[mmsi] = {
'cluster_id': -1,
'cluster_size': 0,
'is_leader': False,
'fleet_role': 'NOISE',
}
return results
def save_snapshot(self, vessel_dfs: dict[str, pd.DataFrame], conn) -> None:
"""fleet_tracking_snapshot 저장."""
now = datetime.now(timezone.utc)
cur = conn.cursor()
company_vessels: dict[int, list[str]] = {}
for mmsi, vid in self._mmsi_to_vid.items():
v = self._vessels.get(vid)
if not v or mmsi not in vessel_dfs:
continue
company_vessels.setdefault(v['company_id'], []).append(mmsi)
for cid, mmsis in company_vessels.items():
active = len(mmsis)
total = sum(1 for v in self._vessels.values() if v['company_id'] == cid)
lats: list[float] = []
lons: list[float] = []
for mmsi in mmsis:
df = vessel_dfs.get(mmsi)
if df is not None and len(df) > 0:
last = df.iloc[-1]
lats.append(float(last['lat']))
lons.append(float(last['lon']))
center_lat = sum(lats) / len(lats) if lats else None
center_lon = sum(lons) / len(lons) if lons else None
cur.execute(
"""INSERT INTO kcg.fleet_tracking_snapshot
(company_id, snapshot_time, total_vessels, active_vessels,
center_lat, center_lon)
VALUES (%s, %s, %s, %s, %s, %s)""",
(cid, now, total, active, center_lat, center_lon),
)
conn.commit()
cur.close()
logger.info('fleet snapshot saved: %d companies', len(company_vessels))
# 싱글턴
fleet_tracker = FleetTracker()

파일 보기

@ -25,6 +25,7 @@ def get_last_run() -> dict:
def run_analysis_cycle(): def run_analysis_cycle():
"""5분 주기 분석 사이클 — 인메모리 캐시 기반.""" """5분 주기 분석 사이클 — 인메모리 캐시 기반."""
import re as _re
from cache.vessel_store import vessel_store from cache.vessel_store import vessel_store
from db import snpdb, kcgdb from db import snpdb, kcgdb
from pipeline.orchestrator import ChineseFishingVesselPipeline from pipeline.orchestrator import ChineseFishingVesselPipeline
@ -32,8 +33,8 @@ def run_analysis_cycle():
from algorithms.fishing_pattern import compute_ucaf_score, compute_ucft_score from algorithms.fishing_pattern import compute_ucaf_score, compute_ucft_score
from algorithms.dark_vessel import is_dark_vessel from algorithms.dark_vessel import is_dark_vessel
from algorithms.spoofing import compute_spoofing_score, count_speed_jumps, compute_bd09_offset from algorithms.spoofing import compute_spoofing_score, count_speed_jumps, compute_bd09_offset
from algorithms.fleet import assign_fleet_roles
from algorithms.risk import compute_vessel_risk_score from algorithms.risk import compute_vessel_risk_score
from fleet_tracker import fleet_tracker
from models.result import AnalysisResult from models.result import AnalysisResult
start = time.time() start = time.time()
@ -71,9 +72,30 @@ def run_analysis_cycle():
_last_run['vessel_count'] = 0 _last_run['vessel_count'] = 0
return return
# 4. 선단 역할 분석 # 4. 등록 선단 기반 fleet 분석
cluster_map = {c['mmsi']: c['cluster_id'] for c in classifications} _gear_re = _re.compile(r'^.+_\d+_\d*$|%$')
fleet_roles = assign_fleet_roles(vessel_dfs, cluster_map) with kcgdb.get_conn() as kcg_conn:
fleet_tracker.load_registry(kcg_conn)
all_ais = []
for mmsi, df in vessel_dfs.items():
if len(df) > 0:
last = df.iloc[-1]
all_ais.append({
'mmsi': mmsi,
'name': vessel_store.get_vessel_info(mmsi).get('name', ''),
'lat': float(last['lat']),
'lon': float(last['lon']),
})
fleet_tracker.match_ais_to_registry(all_ais, kcg_conn)
gear_signals = [v for v in all_ais if _gear_re.match(v.get('name', ''))]
fleet_tracker.track_gear_identity(gear_signals, kcg_conn)
fleet_roles = fleet_tracker.build_fleet_clusters(vessel_dfs)
fleet_tracker.save_snapshot(vessel_dfs, kcg_conn)
# 5. 선박별 추가 알고리즘 → AnalysisResult 생성 # 5. 선박별 추가 알고리즘 → AnalysisResult 생성
results = [] results = []

파일 보기

@ -0,0 +1,176 @@
"""선단 구성 JSX → kcgdb fleet_companies + fleet_vessels 적재.
Usage: python3 prediction/scripts/load_fleet_registry.py
"""
import json
import re
import sys
from pathlib import Path
import psycopg2
import psycopg2.extras
# JSX 파일에서 D 배열 추출
JSX_PATH = Path(__file__).parent.parent.parent.parent / 'gc-wing-dev' / 'legacy' / '선단구성_906척_어업수역 (1).jsx'
# kcgdb 접속 — prediction/.env 또는 환경변수
DB_HOST = '211.208.115.83'
DB_PORT = 5432
DB_NAME = 'kcgdb'
DB_USER = 'kcg_app'
DB_SCHEMA = 'kcg'
def parse_jsx(path: Path) -> list[list]:
"""JSX 파일에서 D=[ ... ] 배열을 파싱."""
text = path.read_text(encoding='utf-8')
# const D=[ 부터 ]; 까지 추출
m = re.search(r'const\s+D\s*=\s*\[', text)
if not m:
raise ValueError('D 배열을 찾을 수 없습니다')
start = m.end() - 1 # [ 위치
# 중첩 배열을 추적하여 닫는 ] 찾기
depth = 0
end = start
for i in range(start, len(text)):
if text[i] == '[':
depth += 1
elif text[i] == ']':
depth -= 1
if depth == 0:
end = i + 1
break
raw = text[start:end]
# JavaScript → JSON 변환 (trailing comma 제거)
raw = re.sub(r',\s*]', ']', raw)
raw = re.sub(r',\s*}', '}', raw)
return json.loads(raw)
def load_to_db(data: list[list], db_password: str):
"""파싱된 데이터를 DB에 적재."""
conn = psycopg2.connect(
host=DB_HOST, port=DB_PORT, dbname=DB_NAME,
user=DB_USER, password=db_password,
options=f'-c search_path={DB_SCHEMA}',
)
conn.autocommit = False
cur = conn.cursor()
try:
# 기존 데이터 초기화
cur.execute('DELETE FROM fleet_vessels')
cur.execute('DELETE FROM fleet_companies')
company_count = 0
vessel_count = 0
pair_links = [] # (vessel_id, pair_vessel_id) 후처리
for row in data:
if len(row) < 7:
continue
name_cn = row[0]
name_en = row[1]
# 회사 INSERT
cur.execute(
'INSERT INTO fleet_companies (name_cn, name_en) VALUES (%s, %s) RETURNING id',
(name_cn, name_en),
)
company_id = cur.fetchone()[0]
company_count += 1
# 인덱스: 0=own, 1=ownEn, 2=pairs, 3=gn, 4=ot, 5=ps, 6=fc, 7=upt, 8=upts
pairs = row[2] if len(row) > 2 and isinstance(row[2], list) else []
gn = row[3] if len(row) > 3 and isinstance(row[3], list) else []
ot = row[4] if len(row) > 4 and isinstance(row[4], list) else []
ps = row[5] if len(row) > 5 and isinstance(row[5], list) else []
fc = row[6] if len(row) > 6 and isinstance(row[6], list) else []
upt = row[7] if len(row) > 7 and isinstance(row[7], list) else []
upts = row[8] if len(row) > 8 and isinstance(row[8], list) else []
def insert_vessel(v, gear_code, role):
nonlocal vessel_count
if not isinstance(v, list) or len(v) < 4:
return None
cur.execute(
'''INSERT INTO fleet_vessels
(company_id, permit_no, name_cn, name_en, tonnage, gear_code, fleet_role)
VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id''',
(company_id, v[0], v[1], v[2], v[3], gear_code, role),
)
vessel_count += 1
return cur.fetchone()[0]
# PT 본선쌍 (pairs)
for pair in pairs:
if not isinstance(pair, list) or len(pair) < 2:
continue
main_id = insert_vessel(pair[0], 'C21', 'MAIN')
sub_id = insert_vessel(pair[1], 'C21', 'SUB')
if main_id and sub_id:
pair_links.append((main_id, sub_id))
# GN 유자망
for v in gn:
insert_vessel(v, 'C25', 'GN')
# OT 기타
for v in ot:
insert_vessel(v, 'C22', 'OT')
# PS 선망
for v in ps:
insert_vessel(v, 'C23', 'PS')
# FC 운반선
for v in fc:
insert_vessel(v, 'C40', 'FC')
# UPT 단독 본선
for v in upt:
insert_vessel(v, 'C21', 'MAIN_SOLO')
# UPTS 단독 부속선
for v in upts:
insert_vessel(v, 'C21', 'SUB_SOLO')
# PT 쌍 상호 참조 설정
for main_id, sub_id in pair_links:
cur.execute('UPDATE fleet_vessels SET pair_vessel_id = %s WHERE id = %s', (sub_id, main_id))
cur.execute('UPDATE fleet_vessels SET pair_vessel_id = %s WHERE id = %s', (main_id, sub_id))
conn.commit()
print(f'적재 완료: {company_count}개 회사, {vessel_count}척 선박, {len(pair_links)}쌍 PT')
except Exception as e:
conn.rollback()
print(f'적재 실패: {e}', file=sys.stderr)
raise
finally:
cur.close()
conn.close()
if __name__ == '__main__':
if not JSX_PATH.exists():
print(f'파일을 찾을 수 없습니다: {JSX_PATH}', file=sys.stderr)
sys.exit(1)
# DB 비밀번호 — 환경변수 또는 직접 입력
import os
password = os.environ.get('KCGDB_PASSWORD', 'Kcg2026monitor')
print(f'JSX 파싱: {JSX_PATH}')
data = parse_jsx(JSX_PATH)
print(f'파싱 완료: {len(data)}개 회사')
print('DB 적재 시작...')
load_to_db(data, password)