kcg-ai-monitoring/prediction/chat/tools.py
htlee e2fc355b2c feat: S2 prediction 분석 엔진 모노레포 이식
iran prediction 47개 Python 파일을 prediction/ 디렉토리로 복제:
- algorithms/ 14개 분석 알고리즘 (어구추론, 다크베셀, 스푸핑, 환적, 위험도 등)
- pipeline/ 7단계 분류 파이프라인
- cache/vessel_store (24h 슬라이딩 윈도우)
- db/ 어댑터 (snpdb 원본조회, kcgdb 결과저장)
- chat/ AI 채팅 (Ollama, 후순위)
- data/ 정적 데이터 (기선, 특정어업수역 GeoJSON)

config.py를 kcgaidb로 재구성 (DB명, 사용자, 비밀번호)
DB 연결 검증 완료 (kcgaidb 37개 테이블 접근 확인)
Makefile에 dev-prediction / dev-all 타겟 추가
CLAUDE.md에 prediction 섹션 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 12:56:51 +09:00

421 lines
15 KiB
Python

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""LLM Tool Calling 실행기 — 사전 쿼리 + 동적 DB 조회."""
import json
import logging
import re
from typing import Optional
from config import qualified_table
logger = logging.getLogger(__name__)

# Table names used by the kcgdb queries below, resolved once at import time
# through config.qualified_table (presumably schema-qualified — confirm in config).
(
    VESSEL_ANALYSIS_RESULTS,
    FLEET_VESSELS,
    GROUP_POLYGON_SNAPSHOTS,
    GEAR_CORRELATION_SCORES,
    CORRELATION_PARAM_MODELS,
) = map(qualified_table, (
    'vessel_analysis_results',
    'fleet_vessels',
    'group_polygon_snapshots',
    'gear_correlation_scores',
    'correlation_param_models',
))
# ── Pre-query patterns (keyword based, answered in a single DB round trip) ──
_ZONE_MAP = {
    '수역1': 'ZONE_I', '수역 1': 'ZONE_I', '수역I': 'ZONE_I', 'ZONE_I': 'ZONE_I', '수역i': 'ZONE_I',
    '수역2': 'ZONE_II', '수역 2': 'ZONE_II', '수역II': 'ZONE_II', 'ZONE_II': 'ZONE_II',
    '수역3': 'ZONE_III', '수역 3': 'ZONE_III', '수역III': 'ZONE_III', 'ZONE_III': 'ZONE_III',
    '수역4': 'ZONE_IV', '수역 4': 'ZONE_IV', '수역IV': 'ZONE_IV', 'ZONE_IV': 'ZONE_IV',
    '영해': 'TERRITORIAL_SEA', '접속수역': 'CONTIGUOUS_ZONE',
}
_ACTIVITY_MAP = {
    '조업': 'FISHING', '어로': 'FISHING', '조업중': 'FISHING', '조업활동': 'FISHING',
    '정박': 'STATIONARY', '정지': 'STATIONARY', '대기': 'STATIONARY',
    '항행': 'SAILING', '이동': 'SAILING', '항해': 'SAILING',
}
_RISK_MAP = {
    '크리티컬': 'CRITICAL', 'critical': 'CRITICAL', '긴급': 'CRITICAL',
    '워치': 'HIGH', 'watch': 'HIGH', '경고': 'HIGH', '고위험': 'HIGH',
    '모니터': 'MEDIUM', 'monitor': 'MEDIUM', '주의': 'MEDIUM',
    '위험': None,  # generic "risky vessels" → CRITICAL + HIGH
}
_DARK_KEYWORDS = ['다크', '다크베셀', 'dark', 'ais 차단', 'ais차단', '신호차단']
_TRANSSHIP_KEYWORDS = ['환적', 'transshipment', '전재']
_SPOOF_KEYWORDS = ['스푸핑', 'spoofing', 'gps 조작', 'gps조작', '위치조작']


def detect_prequery(message: str) -> Optional[dict]:
    """Detect pre-query keyword patterns in a user message.

    The message is lower-cased and scanned for zone, activity, risk-level,
    dark-vessel, transshipment and spoofing keywords.

    Args:
        message: Raw user chat message.

    Returns:
        A dict of DB filter params with any of the keys ``zone``,
        ``activity``, ``risk_level`` / ``risk_levels``, ``is_dark``,
        ``is_transship`` and ``spoofing``; ``None`` if nothing matched.
    """
    msg = message.lower().strip()
    params: dict = {}

    # Zone: try longer keywords first. After lower-casing several keys are
    # substrings of others ('수역i' ⊂ '수역ii'/'수역iii'/'수역iv',
    # 'zone_i' ⊂ 'zone_ii'/...), so plain insertion order would resolve
    # "수역II" or "ZONE_III" to ZONE_I. sorted() is stable, so keys of equal
    # length keep their declared priority.
    zone_keywords = sorted(_ZONE_MAP.items(), key=lambda kv: len(kv[0]), reverse=True)
    for keyword, zone in zone_keywords:
        if keyword.lower() in msg:
            params['zone'] = zone
            break

    # Activity state: first matching keyword wins.
    for keyword, activity in _ACTIVITY_MAP.items():
        if keyword in msg:
            params['activity'] = activity
            break

    # Risk level: a None mapping means "any elevated risk" (CRITICAL + HIGH).
    for keyword, level in _RISK_MAP.items():
        if keyword in msg:
            if level:
                params['risk_level'] = level
            else:
                params['risk_levels'] = ['CRITICAL', 'HIGH']
            break

    # Boolean behaviour flags.
    if any(k in msg for k in _DARK_KEYWORDS):
        params['is_dark'] = True
    if any(k in msg for k in _TRANSSHIP_KEYWORDS):
        params['is_transship'] = True
    if any(k in msg for k in _SPOOF_KEYWORDS):
        params['spoofing'] = True

    return params if params else None
def execute_prequery(params: dict) -> str:
"""사전 쿼리 패턴에 해당하는 DB 조회를 실행하여 결과를 텍스트로 반환."""
try:
from db import kcgdb
conditions = ["analyzed_at > NOW() - INTERVAL '1 hour'"]
bind_params: list = []
if 'zone' in params:
conditions.append('zone = %s')
bind_params.append(params['zone'])
if 'activity' in params:
conditions.append('activity_state = %s')
bind_params.append(params['activity'])
if 'risk_level' in params:
conditions.append('risk_level = %s')
bind_params.append(params['risk_level'])
elif 'risk_levels' in params:
placeholders = ','.join(['%s'] * len(params['risk_levels']))
conditions.append(f'risk_level IN ({placeholders})')
bind_params.extend(params['risk_levels'])
if params.get('is_dark'):
conditions.append('is_dark = TRUE')
if params.get('is_transship'):
conditions.append('is_transship_suspect = TRUE')
if params.get('spoofing'):
conditions.append('spoofing_score > 0.5')
where = ' AND '.join(conditions)
query = f"""
SELECT v.mmsi, v.risk_score, v.risk_level, v.zone, v.activity_state,
v.vessel_type, v.is_dark, v.gap_duration_min, v.spoofing_score,
v.cluster_id, v.cluster_size, v.dist_to_baseline_nm,
v.is_transship_suspect, v.transship_pair_mmsi,
fv.permit_no, fv.name_cn, fv.gear_code
FROM {VESSEL_ANALYSIS_RESULTS} v
LEFT JOIN {FLEET_VESSELS} fv ON v.mmsi = fv.mmsi
WHERE {where}
ORDER BY v.risk_score DESC
LIMIT 30
"""
with kcgdb.get_conn() as conn:
with conn.cursor() as cur:
cur.execute(query, bind_params)
rows = cur.fetchall()
if not rows:
return '\n## 조회 결과\n해당 조건에 맞는 선박이 없습니다.\n'
# 결과를 간략 테이블로 구성 (토큰 절약)
lines = [f'\n## 조회 결과 ({len(rows)}척)']
lines.append('| MMSI | 점수 | 수역 | 활동 | 허가 | 다크 |')
lines.append('|---|---|---|---|---|---|')
for row in rows[:15]: # 최대 15척
mmsi, risk_score, risk_level, zone, activity, vtype, is_dark, gap, spoof, \
cid, csize, dist_nm, is_trans, trans_pair, permit, name_cn, gear = row
permit_str = 'Y' if permit else 'N'
dark_str = 'Y' if is_dark else '-'
lines.append(f'| {mmsi} | {risk_score} | {zone} | {activity} | {permit_str} | {dark_str} |')
return '\n'.join(lines)
except Exception as e:
logger.error('prequery execution failed: %s', e)
return f'\n(DB 조회 실패: {e})\n'
# ── LLM tool-call response parsing + execution ──
# Matches {"tool": "<name>", "params": {...}}. Whitespace/newlines are allowed
# between tokens (pretty-printed JSON) and params may be an empty object.
# Nested objects inside params are not supported by this pattern.
_TOOL_CALL_PATTERN = re.compile(
    r'\{\s*"tool"\s*:\s*"(\w+)"\s*,\s*"params"\s*:\s*(\{[^}]*\})\s*\}',
)


def parse_tool_calls(llm_response: str) -> list[dict]:
    """Extract tool-call JSON objects from an LLM response.

    Args:
        llm_response: Raw text produced by the LLM.

    Returns:
        Up to two ``{'tool': name, 'params': dict}`` entries in the order they
        appear; calls whose params are not valid JSON are skipped.
    """
    calls: list[dict] = []
    for match in _TOOL_CALL_PATTERN.finditer(llm_response):
        try:
            params = json.loads(match.group(2))
        except json.JSONDecodeError:
            continue  # malformed params: ignore this call
        calls.append({'tool': match.group(1), 'params': params})
        if len(calls) == 2:  # at most two tool calls per response
            break
    return calls
def execute_tool_call(call: dict) -> str:
    """Dispatch one parsed tool call to its handler and return the text result.

    Args:
        call: ``{'tool': name, 'params': dict}`` as produced by
            ``parse_tool_calls``.

    Returns:
        The handler's text output, ``'(MMSI 미지정)'`` when
        ``query_vessel_detail`` has no MMSI, or an "unknown tool" note.
    """
    tool = call.get('tool', '')
    params = call.get('params', {})

    # The detail lookup needs a guard and a lazy import, so handle it up front.
    if tool == 'query_vessel_detail':
        mmsi = params.get('mmsi', '')
        if not mmsi:
            return '(MMSI 미지정)'
        from chat.context_builder import _build_vessel_detail
        return _build_vessel_detail(mmsi)

    # (tool name, handler) pairs; each handler takes the params dict.
    dispatch = (
        ('query_vessels', execute_prequery),
        ('query_fleet_group', _query_fleet_group),
        ('query_vessel_history', _query_vessel_history),
        ('query_vessel_static', _query_vessel_static),
        ('get_knowledge', _get_knowledge),
        ('query_gear_correlation', _query_gear_correlation),
    )
    for name, handler in dispatch:
        if tool == name:
            return handler(params)
    return f'(알 수 없는 도구: {tool})'
def _get_knowledge(params: dict) -> str:
"""도메인 지식 섹션 조회."""
key = params.get('key', '')
if not key:
return '(key 미지정. 사용 가능: maritime_zones, fishing_agreement, algorithm_guide, response_guide, db_schema)'
from chat.domain_knowledge import get_knowledge_section
return get_knowledge_section(key)
def _query_fleet_group(params: dict) -> str:
    """List fleet/gear groups from the most recent polygon snapshot.

    Args:
        params: Tool params. Optional ``group_type`` and ``zone_id`` narrow
            the result; a missing or null value applies no filter.

    Returns:
        A markdown table of up to 20 groups ordered by member count, a
        "no groups" note, or an error note when the query fails.
    """
    try:
        from db import kcgdb
        # Restrict to the latest snapshot only.
        conditions = [f"snapshot_time = (SELECT MAX(snapshot_time) FROM {GROUP_POLYGON_SNAPSHOTS})"]
        bind_params: list = []
        # Column names come from this fixed tuple, never from user input.
        for column in ('group_type', 'zone_id'):
            value = params.get(column)
            # `column = NULL` is never true in SQL, so a null filter sent by
            # the LLM would silently return nothing; treat it as "no filter".
            if value is not None:
                conditions.append(f'{column} = %s')
                bind_params.append(value)
        where = ' AND '.join(conditions)
        # `members` is not rendered, so it is not fetched.
        query = f"""
            SELECT group_type, group_key, group_label, member_count, zone_name
            FROM {GROUP_POLYGON_SNAPSHOTS}
            WHERE {where}
            ORDER BY member_count DESC
            LIMIT 20
        """
        with kcgdb.get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(query, bind_params)
                rows = cur.fetchall()
        if not rows:
            return '\n(해당 조건의 그룹 없음)\n'
        lines = [f'\n## 그룹 조회 결과 ({len(rows)}건)']
        lines.append('| 유형 | 키 | 라벨 | 선박수 | 수역 |')
        lines.append('|---|---|---|---|---|')
        for gtype, gkey, glabel, mcount, zname in rows:
            lines.append(f'| {gtype} | {gkey} | {glabel or "-"} | {mcount} | {zname or "-"} |')
        return '\n'.join(lines)
    except Exception as e:
        logger.error('fleet group query failed: %s', e)
        return f'\n(그룹 조회 실패: {e})\n'
def _query_vessel_history(params: dict) -> str:
"""snpdb에서 선박 항적 이력 조회 (daily 집계)."""
try:
from db import snpdb
mmsi = params.get('mmsi', '')
days = min(params.get('days', 7), 30) # 최대 30일
if not mmsi:
return '(MMSI 미지정)'
query = """
SELECT time_bucket, distance_nm, avg_speed, max_speed, point_count,
start_position, end_position
FROM signal.t_vessel_tracks_daily
WHERE mmsi = %s AND time_bucket >= CURRENT_DATE - %s
ORDER BY time_bucket DESC
"""
with snpdb.get_conn() as conn:
with conn.cursor() as cur:
cur.execute(query, (mmsi, days))
rows = cur.fetchall()
if not rows:
return f'\n(MMSI {mmsi}: 최근 {days}일 항적 데이터 없음)\n'
lines = [f'\n## 항적 이력: {mmsi} (최근 {days}일)']
lines.append('| 날짜 | 이동거리(NM) | 평균속도 | 최대속도 | AIS포인트 |')
lines.append('|---|---|---|---|---|')
for row in rows:
dt, dist, avg_spd, max_spd, pts, start_pos, end_pos = row
lines.append(
f"| {dt} | {float(dist or 0):.1f} | {float(avg_spd or 0):.1f}kt "
f"| {float(max_spd or 0):.1f}kt | {pts or 0} |"
)
return '\n'.join(lines)
except Exception as e:
logger.error('vessel history query failed: %s', e)
return f'\n(항적 이력 조회 실패: {e})\n'
def _query_vessel_static(params: dict) -> str:
"""snpdb에서 선박 정적정보 + 변경 이력 조회."""
try:
from db import snpdb
mmsi = params.get('mmsi', '')
limit = min(params.get('limit', 10), 24)
if not mmsi:
return '(MMSI 미지정)'
query = """
SELECT time_bucket, name, vessel_type, length, width, draught,
destination, status, callsign, imo
FROM signal.t_vessel_static
WHERE mmsi = %s
ORDER BY time_bucket DESC
LIMIT %s
"""
with snpdb.get_conn() as conn:
with conn.cursor() as cur:
cur.execute(query, (mmsi, limit))
rows = cur.fetchall()
if not rows:
return f'\n(MMSI {mmsi}: 정적정보 없음)\n'
# 최신 정보
latest = rows[0]
lines = [f'\n## 선박 정적정보: {mmsi}']
lines.append(f'- 선명: {latest[1] or "N/A"}')
lines.append(f'- 선종: {latest[2] or "N/A"}')
lines.append(f'- 제원: L={latest[3] or 0}m × W={latest[4] or 0}m, 흘수={latest[5] or 0}m')
lines.append(f'- 목적지: {latest[6] or "N/A"}')
lines.append(f'- 상태: {latest[7] or "N/A"}')
lines.append(f'- 호출부호: {latest[8] or "N/A"}, IMO: {latest[9] or "N/A"}')
# 변경 이력 감지
changes = []
for i in range(len(rows) - 1):
curr, prev = rows[i], rows[i + 1]
diffs = []
if curr[1] != prev[1]:
diffs.append(f'선명: {prev[1]}{curr[1]}')
if curr[6] != prev[6]:
diffs.append(f'목적지: {prev[6]}{curr[6]}')
if curr[7] != prev[7]:
diffs.append(f'상태: {prev[7]}{curr[7]}')
if diffs:
changes.append(f'- {curr[0].strftime("%m/%d %H:%M")}: {", ".join(diffs)}')
if changes:
lines.append(f'\n### 변경 이력 (최근 {len(changes)}건)')
lines.extend(changes[:10])
return '\n'.join(lines)
except Exception as e:
logger.error('vessel static query failed: %s', e)
return f'\n(정적정보 조회 실패: {e})\n'
def _query_gear_correlation(params: dict) -> str:
"""어구 그룹의 연관 선박/어구 조회."""
from db import kcgdb
group_key = params.get('group_key', '')
limit = int(params.get('limit', 10))
with kcgdb.get_conn() as conn:
cur = conn.cursor()
try:
cur.execute(
'SELECT target_name, target_mmsi, target_type, current_score, '
'streak_count, observation_count, proximity_ratio, visit_score, '
'heading_coherence, freeze_state '
f'FROM {GEAR_CORRELATION_SCORES} s '
f'JOIN {CORRELATION_PARAM_MODELS} m ON s.model_id = m.id '
'WHERE s.group_key = %s AND m.is_default = TRUE AND s.current_score >= 0.3 '
'ORDER BY s.current_score DESC LIMIT %s',
(group_key, limit),
)
rows = cur.fetchall()
except Exception:
return f'어구 그룹 "{group_key}"에 대한 연관성 데이터가 없습니다 (테이블 미생성).'
finally:
cur.close()
if not rows:
return f'어구 그룹 "{group_key}"에 대한 연관성 데이터가 없습니다.'
lines = [f'## {group_key} 연관 분석 (상위 {len(rows)}개, default 모델)']
for r in rows:
name, mmsi, ttype, score, streak, obs, prox, visit, heading, state = r
pct = score * 100
disp_name = name or mmsi
detail_parts = []
if prox is not None:
detail_parts.append(f'근접 {prox*100:.0f}%')
if visit is not None:
detail_parts.append(f'방문 {visit*100:.0f}%')
if heading is not None:
detail_parts.append(f'COG동조 {heading*100:.0f}%')
detail = ', '.join(detail_parts) if detail_parts else ''
lines.append(
f'- **{disp_name}** ({mmsi}, {ttype}): '
f'일치율 {pct:.1f}% (연속 {streak}회, 관측 {obs}회) '
f'[{detail}] 상태: {state}'
)
return '\n'.join(lines)