- V014: fleet_vessels, fleet_tracking_snapshot, gear_identity_log, gear_correlation_scores/raw_metrics, correlation_param_models, group_polygon_snapshots, gear_group_episodes/episode_snapshots, gear_group_parent_candidate_snapshots, gear_parent_label_tracking_cycles, system_config 테이블 추가 - V015: 점수/비율 NUMERIC precision 일괄 확대 (score→7,4 / pct→12,2) + vessel_analysis_results UNIQUE(mmsi, analyzed_at) 인덱스 추가 - prediction kcgdb.py: timestamp→analyzed_at, zone→zone_code, is_leader→fleet_is_leader, is_transship_suspect→transship_suspect 매핑 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
330 lines
12 KiB
Python
330 lines
12 KiB
Python
import json
|
|
import logging
|
|
from contextlib import contextmanager
|
|
from typing import TYPE_CHECKING, Optional
|
|
|
|
import psycopg2
|
|
from psycopg2 import pool
|
|
from psycopg2.extras import execute_values
|
|
|
|
from config import qualified_table, settings
|
|
|
|
if TYPE_CHECKING:
|
|
from models.result import AnalysisResult
|
|
|
|
logger = logging.getLogger(__name__)

# Module-wide connection pool singleton: created by init_pool(), torn down by
# close_pool(). None while uninitialized; get_conn() assumes init_pool() ran.
_pool: Optional[pool.ThreadedConnectionPool] = None

# Schema-qualified name of the polygon snapshot table (resolved via config).
GROUP_POLYGON_SNAPSHOTS = qualified_table('group_polygon_snapshots')
|
|
|
|
|
|
def init_pool():
    """Create the module-wide kcgdb connection pool (1-5 connections).

    Connection parameters come from settings; search_path is forced to the
    configured schema (falling back to public) for every pooled connection.
    """
    global _pool
    conn_kwargs = {
        'minconn': 1,
        'maxconn': 5,
        'host': settings.KCGDB_HOST,
        'port': settings.KCGDB_PORT,
        'dbname': settings.KCGDB_NAME,
        'user': settings.KCGDB_USER,
        'password': settings.KCGDB_PASSWORD,
        'options': f'-c search_path={settings.KCGDB_SCHEMA},public',
    }
    _pool = pool.ThreadedConnectionPool(**conn_kwargs)
    logger.info('kcgdb connection pool initialized')
|
|
|
|
|
|
def close_pool():
    """Close all pooled connections and drop the pool reference (idempotent)."""
    global _pool
    if _pool is None:
        return
    _pool.closeall()
    _pool = None
    logger.info('kcgdb connection pool closed')
|
|
|
|
|
|
@contextmanager
def get_conn():
    """Yield a pooled connection.

    On any exception the transaction is rolled back and the exception
    re-raised; the connection is always returned to the pool.
    Assumes init_pool() has been called.
    """
    connection = _pool.getconn()
    try:
        yield connection
    except Exception:
        connection.rollback()
        raise
    finally:
        _pool.putconn(connection)
|
|
|
|
|
|
def check_health() -> bool:
    """Return True when a trivial query succeeds against kcgdb, else False."""
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute('SELECT 1')
        return True
    except Exception as e:
        logger.error('kcgdb health check failed: %s', e)
        return False
|
|
|
|
|
|
def upsert_results(results: list['AnalysisResult']) -> int:
    """Upsert analysis results into the vessel_analysis_results table.

    Each result is serialized via AnalysisResult.to_db_tuple(); rows conflict
    on (mmsi, analyzed_at) and every analysis column is overwritten in place.
    Returns the number of rows sent (0 for empty input or on failure).
    """
    if not results:
        return 0

    # NOTE: column order must match AnalysisResult.to_db_tuple() exactly.
    insert_sql = """
        INSERT INTO vessel_analysis_results (
            mmsi, analyzed_at, vessel_type, confidence, fishing_pct,
            cluster_id, season, zone_code, dist_to_baseline_nm, activity_state,
            ucaf_score, ucft_score, is_dark, gap_duration_min,
            spoofing_score, bd09_offset_m, speed_jump_count,
            fleet_cluster_id, fleet_is_leader, fleet_role,
            risk_score, risk_level,
            transship_suspect, transship_pair_mmsi, transship_duration_min,
            features
        ) VALUES %s
        ON CONFLICT (mmsi, analyzed_at) DO UPDATE SET
            vessel_type = EXCLUDED.vessel_type,
            confidence = EXCLUDED.confidence,
            fishing_pct = EXCLUDED.fishing_pct,
            cluster_id = EXCLUDED.cluster_id,
            season = EXCLUDED.season,
            zone_code = EXCLUDED.zone_code,
            dist_to_baseline_nm = EXCLUDED.dist_to_baseline_nm,
            activity_state = EXCLUDED.activity_state,
            ucaf_score = EXCLUDED.ucaf_score,
            ucft_score = EXCLUDED.ucft_score,
            is_dark = EXCLUDED.is_dark,
            gap_duration_min = EXCLUDED.gap_duration_min,
            spoofing_score = EXCLUDED.spoofing_score,
            bd09_offset_m = EXCLUDED.bd09_offset_m,
            speed_jump_count = EXCLUDED.speed_jump_count,
            fleet_cluster_id = EXCLUDED.fleet_cluster_id,
            fleet_is_leader = EXCLUDED.fleet_is_leader,
            fleet_role = EXCLUDED.fleet_role,
            risk_score = EXCLUDED.risk_score,
            risk_level = EXCLUDED.risk_level,
            transship_suspect = EXCLUDED.transship_suspect,
            transship_pair_mmsi = EXCLUDED.transship_pair_mmsi,
            transship_duration_min = EXCLUDED.transship_duration_min,
            features = EXCLUDED.features
    """

    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                tuples = [r.to_db_tuple() for r in results]
                # execute_values batches rows into multi-VALUES statements,
                # 100 rows per round trip.
                execute_values(cur, insert_sql, tuples, page_size=100)
            conn.commit()
            count = len(tuples)
            logger.info('upserted %d analysis results', count)
            return count
    except Exception as e:
        # Best-effort: log and report 0 so the caller's cycle continues.
        logger.error('failed to upsert results: %s', e)
        return 0
|
|
|
|
|
|
def cleanup_old(hours: int = 48) -> int:
    """Delete analysis results older than *hours*; return rows removed (0 on error)."""
    delete_sql = (
        'DELETE FROM vessel_analysis_results '
        "WHERE analyzed_at < NOW() - (%s * INTERVAL '1 hour')"
    )
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(delete_sql, (hours,))
                removed = cur.rowcount
            conn.commit()
            if removed > 0:
                logger.info('cleaned up %d old results (older than %dh)', removed, hours)
            return removed
    except Exception as e:
        logger.error('failed to cleanup old results: %s', e)
        return 0
|
|
|
|
|
|
def save_group_snapshots(snapshots: list[dict]) -> int:
    """Batch-INSERT polygon snapshots into group_polygon_snapshots.

    snapshots: output of polygon_builder.build_all_group_snapshots().
    Each item carries: group_type, group_key, group_label, snapshot_time,
    polygon_wkt (str|None), center_wkt (str|None), area_sq_nm, member_count,
    zone_id, zone_name, members (list[dict]), color.

    Returns the number of rows inserted, or 0 if any insert failed
    (the whole batch is rolled back via get_conn()).
    """
    if not snapshots:
        return 0

    insert_sql = f"""
        INSERT INTO {GROUP_POLYGON_SNAPSHOTS} (
            group_type, group_key, group_label, sub_cluster_id, resolution, snapshot_time,
            polygon, center_point, area_sq_nm, member_count,
            zone_id, zone_name, members, color
        ) VALUES (
            %s, %s, %s, %s, %s, %s,
            ST_GeomFromText(%s, 4326), ST_GeomFromText(%s, 4326),
            %s, %s, %s, %s, %s::jsonb, %s
        )
    """

    def _row_params(snap: dict) -> tuple:
        # Order must line up with the column list / placeholders above.
        # WKT fields may be None; ST_GeomFromText(NULL, ...) yields NULL.
        return (
            snap['group_type'],
            snap['group_key'],
            snap['group_label'],
            snap.get('sub_cluster_id', 0),
            snap.get('resolution', '6h'),
            snap['snapshot_time'],
            snap.get('polygon_wkt'),
            snap.get('center_wkt'),
            snap.get('area_sq_nm'),
            snap.get('member_count'),
            snap.get('zone_id'),
            snap.get('zone_name'),
            json.dumps(snap.get('members', []), ensure_ascii=False),
            snap.get('color'),
        )

    saved = 0
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                for snap in snapshots:
                    cur.execute(insert_sql, _row_params(snap))
                    saved += 1
            conn.commit()
            logger.info('saved %d group polygon snapshots', saved)
            return saved
    except Exception as e:
        logger.error('failed to save group snapshots: %s', e)
        return 0
|
|
|
|
|
|
def fetch_analysis_summary() -> dict:
    """Summarize the last hour of analysis results (for chat context).

    Returns:
        dict with keys:
          - risk_distribution: {risk_level: count}
          - zone_distribution: {zone_code: count}
          - dark_count / spoofing_count / transship_count: ints
        On any error an all-empty/zero summary is returned.
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                # Risk-level distribution
                cur.execute("""
                    SELECT risk_level, COUNT(*) FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                    GROUP BY risk_level
                """)
                risk_dist = {row[0]: row[1] for row in cur.fetchall()}

                # Zone distribution.
                # BUG FIX: column was renamed zone -> zone_code (V015 schema,
                # matching upsert_results); the old name would fail the query.
                cur.execute("""
                    SELECT zone_code, COUNT(*) FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                    GROUP BY zone_code
                """)
                zone_dist = {row[0]: row[1] for row in cur.fetchall()}

                # Dark / spoofing / transshipment counts.
                # BUG FIX: is_transship_suspect -> transship_suspect (V015).
                cur.execute("""
                    SELECT
                        COUNT(*) FILTER (WHERE is_dark = TRUE) AS dark_count,
                        COUNT(*) FILTER (WHERE spoofing_score > 0.5) AS spoofing_count,
                        COUNT(*) FILTER (WHERE transship_suspect = TRUE) AS transship_count
                    FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                """)
                row = cur.fetchone()

                # BUG FIX: zone counts were previously merged into
                # risk_distribution, mixing two unrelated keyspaces; expose
                # them under their own key instead.
                return {
                    'risk_distribution': risk_dist,
                    'zone_distribution': zone_dist,
                    'dark_count': row[0] if row else 0,
                    'spoofing_count': row[1] if row else 0,
                    'transship_count': row[2] if row else 0,
                }
    except Exception as e:
        logger.error('fetch_analysis_summary failed: %s', e)
        return {
            'risk_distribution': {}, 'zone_distribution': {},
            'dark_count': 0, 'spoofing_count': 0, 'transship_count': 0,
        }
|
|
|
|
|
|
def fetch_recent_high_risk(limit: int = 10) -> list[dict]:
    """Top-N vessels by risk score from the last hour (for chat context).

    Args:
        limit: maximum number of vessels to return.

    Returns:
        list of per-vessel dicts (mmsi, name, risk_score, risk_level, zone,
        is_dark, is_transship, activity_state, spoofing_score); empty list
        on error. 'name' currently mirrors mmsi — enrich via vessel_store
        upstream when a display name is needed.
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                # BUG FIX: use V015 column names (zone -> zone_code,
                # is_transship_suspect -> transship_suspect), consistent with
                # upsert_results; the old names would fail the query.
                cur.execute("""
                    SELECT mmsi, risk_score, risk_level, zone_code, is_dark,
                           transship_suspect, activity_state, spoofing_score
                    FROM vessel_analysis_results
                    WHERE analyzed_at > NOW() - INTERVAL '1 hour'
                    ORDER BY risk_score DESC
                    LIMIT %s
                """, (limit,))
                rows = cur.fetchall()

        return [
            {
                'mmsi': r[0],
                'name': r[0],  # placeholder; look up real name in vessel_store if needed
                'risk_score': r[1],
                'risk_level': r[2],
                'zone': r[3],
                'is_dark': r[4],
                'is_transship': r[5],
                'activity_state': r[6],
                # Explicit None check so a legitimate 0 score is not
                # conflated with NULL (both still map to 0.0 here).
                'spoofing_score': float(r[7]) if r[7] is not None else 0.0,
            }
            for r in rows
        ]
    except Exception as e:
        logger.error('fetch_recent_high_risk failed: %s', e)
        return []
|
|
|
|
|
|
def fetch_polygon_summary() -> dict:
    """Summarize the latest group polygon snapshot batch (for chat context).

    Returns fleet_count / fleet_members / gear_in_zone / gear_out_zone;
    all zeros on error or when no snapshots exist.
    """
    summary = {
        'fleet_count': 0, 'fleet_members': 0,
        'gear_in_zone': 0, 'gear_out_zone': 0,
    }
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                # Only the most recent batch: rows sharing MAX(snapshot_time).
                cur.execute(f"""
                    SELECT group_type, COUNT(*), SUM(member_count)
                    FROM {GROUP_POLYGON_SNAPSHOTS}
                    WHERE snapshot_time = (
                        SELECT MAX(snapshot_time) FROM {GROUP_POLYGON_SNAPSHOTS}
                    )
                    GROUP BY group_type
                """)
                rows = cur.fetchall()

        for gtype, count, members in rows:
            members = members or 0  # SUM over zero rows is NULL
            if gtype == 'FLEET':
                summary['fleet_count'] = count
                summary['fleet_members'] = members
            elif gtype == 'GEAR_IN_ZONE':
                summary['gear_in_zone'] = count
            elif gtype == 'GEAR_OUT_ZONE':
                summary['gear_out_zone'] = count
        return summary
    except Exception as e:
        logger.error('fetch_polygon_summary failed: %s', e)
        return {'fleet_count': 0, 'fleet_members': 0, 'gear_in_zone': 0, 'gear_out_zone': 0}
|
|
|
|
|
|
def cleanup_group_snapshots(days: int = 7) -> int:
    """Delete polygon snapshots older than *days*; return rows removed (0 on error)."""
    delete_sql = (
        f"DELETE FROM {GROUP_POLYGON_SNAPSHOTS} "
        "WHERE snapshot_time < NOW() - (%s * INTERVAL '1 day')"
    )
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(delete_sql, (days,))
                removed = cur.rowcount
            conn.commit()
            if removed > 0:
                logger.info('cleaned up %d old group snapshots (older than %dd)', removed, days)
            return removed
    except Exception as e:
        logger.error('failed to cleanup group snapshots: %s', e)
        return 0
|