- 실시간 선박 13K: MapLibre symbol → deck.gl IconLayer (useShipDeckLayers + shipDeckStore) - 선단/어구 폴리곤: MapLibre Source/Layer → deck.gl GeoJsonLayer (useFleetClusterDeckLayers) - 선박 팝업: MapLibre Popup → React 오버레이 (ShipPopupOverlay + ShipHoverTooltip) - 리플레이 집중 모드 (focusMode), 라벨 클러스터링, fontScale 연동 - Python: group_key 고정 + sub_cluster_id 분리, 한국 국적 어구 오탐 제외 - DB: sub_cluster_id 컬럼 추가 + 기존 '#N' 데이터 마이그레이션 - Backend: DISTINCT ON CTE로 서브클러스터 중복 제거, subClusterId DTO 추가 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
327 lines
12 KiB
Python
import json
|
|
import logging
|
|
from contextlib import contextmanager
|
|
from typing import TYPE_CHECKING, Optional
|
|
|
|
import psycopg2
|
|
from psycopg2 import pool
|
|
from psycopg2.extras import execute_values
|
|
|
|
from config import settings
|
|
|
|
if TYPE_CHECKING:
|
|
from models.result import AnalysisResult
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Module-wide connection pool: created by init_pool(), torn down by close_pool().
_pool: Optional[pool.ThreadedConnectionPool] = None
|
|
|
|
|
|
def init_pool():
    """Initialize the global kcgdb connection pool.

    Idempotent: if a pool already exists it is closed first, so repeated
    calls (e.g. on app reload) do not leak database connections.
    """
    global _pool
    if _pool is not None:
        # Re-initialization: release the previous pool's connections first.
        _pool.closeall()
    _pool = pool.ThreadedConnectionPool(
        minconn=1,
        maxconn=5,
        host=settings.KCGDB_HOST,
        port=settings.KCGDB_PORT,
        dbname=settings.KCGDB_NAME,
        user=settings.KCGDB_USER,
        password=settings.KCGDB_PASSWORD,
        # Put the project schema ahead of public on the search path.
        options=f'-c search_path={settings.KCGDB_SCHEMA},public',
    )
    logger.info('kcgdb connection pool initialized')
|
|
|
|
|
|
def close_pool():
    """Shut down the kcgdb connection pool, if one exists."""
    global _pool
    if not _pool:
        return
    _pool.closeall()
    _pool = None
    logger.info('kcgdb connection pool closed')
|
|
|
|
|
|
@contextmanager
def get_conn():
    """Yield a pooled kcgdb connection and return it to the pool on exit.

    Rolls back on any exception so a connection is never handed back to
    the pool with an open failed transaction. Committing is the caller's
    responsibility.

    Raises:
        RuntimeError: if init_pool() has not been called yet (previously
            this surfaced as an opaque AttributeError on None).
    """
    if _pool is None:
        raise RuntimeError('kcgdb pool not initialized; call init_pool() first')
    conn = _pool.getconn()
    try:
        yield conn
    except Exception:
        # Leave no open transaction on a connection going back to the pool.
        conn.rollback()
        raise
    finally:
        _pool.putconn(conn)
|
|
|
|
|
|
def check_health() -> bool:
    """Return True if a trivial query against kcgdb succeeds."""
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute('SELECT 1')
        return True
    except Exception as exc:
        logger.error('kcgdb health check failed: %s', exc)
        return False
|
|
|
|
|
|
def upsert_results(results: list['AnalysisResult']) -> int:
    """Upsert analysis results into the vessel_analysis_results table.

    Args:
        results: AnalysisResult objects; each must provide to_db_tuple()
            whose element order matches the column list in insert_sql.

    Returns:
        Number of rows written, or 0 on failure or empty input. Errors are
        logged, not raised — persistence here is best-effort.
    """
    if not results:
        return 0

    # (mmsi, timestamp) is the conflict key; on re-analysis every other
    # column is overwritten with the newest values via EXCLUDED.
    insert_sql = """
        INSERT INTO vessel_analysis_results (
            mmsi, timestamp, vessel_type, confidence, fishing_pct,
            cluster_id, season, zone, dist_to_baseline_nm, activity_state,
            ucaf_score, ucft_score, is_dark, gap_duration_min,
            spoofing_score, bd09_offset_m, speed_jump_count,
            cluster_size, is_leader, fleet_role,
            risk_score, risk_level,
            is_transship_suspect, transship_pair_mmsi, transship_duration_min,
            features, analyzed_at
        ) VALUES %s
        ON CONFLICT (mmsi, timestamp) DO UPDATE SET
            vessel_type = EXCLUDED.vessel_type,
            confidence = EXCLUDED.confidence,
            fishing_pct = EXCLUDED.fishing_pct,
            cluster_id = EXCLUDED.cluster_id,
            season = EXCLUDED.season,
            zone = EXCLUDED.zone,
            dist_to_baseline_nm = EXCLUDED.dist_to_baseline_nm,
            activity_state = EXCLUDED.activity_state,
            ucaf_score = EXCLUDED.ucaf_score,
            ucft_score = EXCLUDED.ucft_score,
            is_dark = EXCLUDED.is_dark,
            gap_duration_min = EXCLUDED.gap_duration_min,
            spoofing_score = EXCLUDED.spoofing_score,
            bd09_offset_m = EXCLUDED.bd09_offset_m,
            speed_jump_count = EXCLUDED.speed_jump_count,
            cluster_size = EXCLUDED.cluster_size,
            is_leader = EXCLUDED.is_leader,
            fleet_role = EXCLUDED.fleet_role,
            risk_score = EXCLUDED.risk_score,
            risk_level = EXCLUDED.risk_level,
            is_transship_suspect = EXCLUDED.is_transship_suspect,
            transship_pair_mmsi = EXCLUDED.transship_pair_mmsi,
            transship_duration_min = EXCLUDED.transship_duration_min,
            features = EXCLUDED.features,
            analyzed_at = EXCLUDED.analyzed_at
    """

    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                tuples = [r.to_db_tuple() for r in results]
                # execute_values batches the VALUES %s expansion; page_size
                # bounds statement size for large result sets.
                execute_values(cur, insert_sql, tuples, page_size=100)
                conn.commit()
                count = len(tuples)
                logger.info('upserted %d analysis results', count)
                return count
    except Exception as e:
        logger.error('failed to upsert results: %s', e)
        return 0
|
|
|
|
|
|
def cleanup_old(hours: int = 48) -> int:
    """Delete analysis results older than ``hours`` hours; return rows removed."""
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    'DELETE FROM vessel_analysis_results WHERE analyzed_at < NOW() - (%s * INTERVAL \'1 hour\')',
                    (hours,),
                )
                deleted = cur.rowcount
            conn.commit()
        if deleted > 0:
            logger.info('cleaned up %d old results (older than %dh)', deleted, hours)
        return deleted
    except Exception as exc:
        logger.error('failed to cleanup old results: %s', exc)
        return 0
|
|
|
|
|
|
def save_group_snapshots(snapshots: list[dict]) -> int:
    """Batch-INSERT polygon snapshots into group_polygon_snapshots.

    Each item comes from polygon_builder.build_all_group_snapshots() and
    carries: group_type, group_key, group_label, snapshot_time,
    polygon_wkt (str|None), center_wkt (str|None), area_sq_nm,
    member_count, zone_id, zone_name, members (list[dict]), color.

    Returns the number of rows inserted, or 0 on failure.
    """
    if not snapshots:
        return 0

    insert_sql = """
        INSERT INTO kcg.group_polygon_snapshots (
            group_type, group_key, group_label, sub_cluster_id, snapshot_time,
            polygon, center_point, area_sq_nm, member_count,
            zone_id, zone_name, members, color
        ) VALUES (
            %s, %s, %s, %s, %s,
            ST_GeomFromText(%s, 4326), ST_GeomFromText(%s, 4326),
            %s, %s, %s, %s, %s::jsonb, %s
        )
    """

    def _row(snap: dict) -> tuple:
        # Element order must match the column list of insert_sql above.
        return (
            snap['group_type'],
            snap['group_key'],
            snap['group_label'],
            snap.get('sub_cluster_id', 0),
            snap['snapshot_time'],
            snap.get('polygon_wkt'),
            snap.get('center_wkt'),
            snap.get('area_sq_nm'),
            snap.get('member_count'),
            snap.get('zone_id'),
            snap.get('zone_name'),
            json.dumps(snap.get('members', []), ensure_ascii=False),
            snap.get('color'),
        )

    saved = 0
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                for snap in snapshots:
                    cur.execute(insert_sql, _row(snap))
                    saved += 1
            conn.commit()
        logger.info('saved %d group polygon snapshots', saved)
        return saved
    except Exception as exc:
        logger.error('failed to save group snapshots: %s', exc)
        return 0
|
|
|
|
|
|
def fetch_analysis_summary() -> dict:
    """Summarize the last hour of analysis results (for chat context).

    Returns:
        dict with:
            risk_distribution: {risk_level: count}
            zone_distribution: {zone: count}
            dark_count / spoofing_count / transship_count: ints
        All-zero/empty dict on failure.
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                # Risk-level distribution
                cur.execute("""
                    SELECT risk_level, COUNT(*) FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                    GROUP BY risk_level
                """)
                risk_dist = {row[0]: row[1] for row in cur.fetchall()}

                # Per-zone distribution
                cur.execute("""
                    SELECT zone, COUNT(*) FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                    GROUP BY zone
                """)
                zone_dist = {row[0]: row[1] for row in cur.fetchall()}

                # Dark / spoofing / transshipment counts
                cur.execute("""
                    SELECT
                        COUNT(*) FILTER (WHERE is_dark = TRUE) AS dark_count,
                        COUNT(*) FILTER (WHERE spoofing_score > 0.5) AS spoofing_count,
                        COUNT(*) FILTER (WHERE is_transship_suspect = TRUE) AS transship_count
                    FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                """)
                row = cur.fetchone()

        return {
            # BUG FIX: zone counts were previously merged into
            # risk_distribution ({**risk_dist, **zone_dist}), mislabeling
            # zone totals as risk levels; report each under its own key.
            'risk_distribution': risk_dist,
            'zone_distribution': zone_dist,
            'dark_count': row[0] if row else 0,
            'spoofing_count': row[1] if row else 0,
            'transship_count': row[2] if row else 0,
        }
    except Exception as e:
        logger.error('fetch_analysis_summary failed: %s', e)
        return {
            'risk_distribution': {},
            'zone_distribution': {},
            'dark_count': 0,
            'spoofing_count': 0,
            'transship_count': 0,
        }
|
|
|
|
|
|
def fetch_recent_high_risk(limit: int = 10) -> list[dict]:
    """Top-N vessels by risk score over the last hour (for chat context)."""
    query = """
        SELECT mmsi, risk_score, risk_level, zone, is_dark,
               is_transship_suspect, activity_state, spoofing_score
        FROM vessel_analysis_results
        WHERE analyzed_at > NOW() - INTERVAL '1 hour'
        ORDER BY risk_score DESC
        LIMIT %s
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(query, (limit,))
                rows = cur.fetchall()

        return [
            {
                'mmsi': mmsi,
                'name': mmsi,  # enrich from vessel_store when a real name is needed
                'risk_score': score,
                'risk_level': level,
                'zone': zone,
                'is_dark': dark,
                'is_transship': transship,
                'activity_state': state,
                'spoofing_score': float(spoof) if spoof else 0.0,
            }
            for mmsi, score, level, zone, dark, transship, state, spoof in rows
        ]
    except Exception as exc:
        logger.error('fetch_recent_high_risk failed: %s', exc)
        return []
|
|
|
|
|
|
def fetch_polygon_summary() -> dict:
    """Summarize the latest group polygon snapshot set (for chat context)."""
    summary = {
        'fleet_count': 0, 'fleet_members': 0,
        'gear_in_zone': 0, 'gear_out_zone': 0,
    }
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                # Only the most recent snapshot batch is summarized.
                cur.execute("""
                    SELECT group_type, COUNT(*), SUM(member_count)
                    FROM kcg.group_polygon_snapshots
                    WHERE snapshot_time = (
                        SELECT MAX(snapshot_time) FROM kcg.group_polygon_snapshots
                    )
                    GROUP BY group_type
                """)
                rows = cur.fetchall()

        for gtype, count, members in rows:
            if gtype == 'FLEET':
                summary['fleet_count'] = count
                summary['fleet_members'] = members or 0
            elif gtype == 'GEAR_IN_ZONE':
                summary['gear_in_zone'] = count
            elif gtype == 'GEAR_OUT_ZONE':
                summary['gear_out_zone'] = count
        return summary
    except Exception as exc:
        logger.error('fetch_polygon_summary failed: %s', exc)
        return {'fleet_count': 0, 'fleet_members': 0, 'gear_in_zone': 0, 'gear_out_zone': 0}
|
|
|
|
|
|
def cleanup_group_snapshots(days: int = 7) -> int:
    """Delete group polygon snapshots older than ``days`` days.

    Returns:
        Number of rows deleted, or 0 on failure (errors are logged).
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                # Parameterized query (was an f-string interval) for
                # consistency with cleanup_old() and to keep user-supplied
                # values out of SQL text.
                cur.execute(
                    "DELETE FROM kcg.group_polygon_snapshots WHERE snapshot_time < NOW() - (%s * INTERVAL '1 day')",
                    (days,),
                )
                deleted = cur.rowcount
                conn.commit()
                if deleted > 0:
                    logger.info('cleaned up %d old group snapshots (older than %dd)', deleted, days)
                return deleted
    except Exception as e:
        logger.error('failed to cleanup group snapshots: %s', e)
        return 0
|