iran prediction 47개 Python 파일을 prediction/ 디렉토리로 복제: - algorithms/ 14개 분석 알고리즘 (어구추론, 다크베셀, 스푸핑, 환적, 위험도 등) - pipeline/ 7단계 분류 파이프라인 - cache/vessel_store (24h 슬라이딩 윈도우) - db/ 어댑터 (snpdb 원본조회, kcgdb 결과저장) - chat/ AI 채팅 (Ollama, 후순위) - data/ 정적 데이터 (기선, 특정어업수역 GeoJSON) config.py를 kcgaidb로 재구성 (DB명, 사용자, 비밀번호) DB 연결 검증 완료 (kcgaidb 37개 테이블 접근 확인) Makefile에 dev-prediction / dev-all 타겟 추가 CLAUDE.md에 prediction 섹션 추가 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
421 lines
15 KiB
Python
421 lines
15 KiB
Python
"""LLM Tool Calling 실행기 — 사전 쿼리 + 동적 DB 조회."""
|
||
|
||
import json
|
||
import logging
|
||
import re
|
||
from typing import Optional
|
||
|
||
from config import qualified_table
|
||
|
||
logger = logging.getLogger(__name__)
|
||
VESSEL_ANALYSIS_RESULTS = qualified_table('vessel_analysis_results')
|
||
FLEET_VESSELS = qualified_table('fleet_vessels')
|
||
GROUP_POLYGON_SNAPSHOTS = qualified_table('group_polygon_snapshots')
|
||
GEAR_CORRELATION_SCORES = qualified_table('gear_correlation_scores')
|
||
CORRELATION_PARAM_MODELS = qualified_table('correlation_param_models')
|
||
|
||
# ── 사전 쿼리 패턴 (키워드 기반, 1회 왕복으로 해결) ──
|
||
|
||
_ZONE_MAP = {
|
||
'수역1': 'ZONE_I', '수역 1': 'ZONE_I', '수역I': 'ZONE_I', 'ZONE_I': 'ZONE_I', '수역i': 'ZONE_I',
|
||
'수역2': 'ZONE_II', '수역 2': 'ZONE_II', '수역II': 'ZONE_II', 'ZONE_II': 'ZONE_II',
|
||
'수역3': 'ZONE_III', '수역 3': 'ZONE_III', '수역III': 'ZONE_III', 'ZONE_III': 'ZONE_III',
|
||
'수역4': 'ZONE_IV', '수역 4': 'ZONE_IV', '수역IV': 'ZONE_IV', 'ZONE_IV': 'ZONE_IV',
|
||
'영해': 'TERRITORIAL_SEA', '접속수역': 'CONTIGUOUS_ZONE',
|
||
}
|
||
|
||
_ACTIVITY_MAP = {
|
||
'조업': 'FISHING', '어로': 'FISHING', '조업중': 'FISHING', '조업활동': 'FISHING',
|
||
'정박': 'STATIONARY', '정지': 'STATIONARY', '대기': 'STATIONARY',
|
||
'항행': 'SAILING', '이동': 'SAILING', '항해': 'SAILING',
|
||
}
|
||
|
||
_RISK_MAP = {
|
||
'크리티컬': 'CRITICAL', 'critical': 'CRITICAL', '긴급': 'CRITICAL',
|
||
'워치': 'HIGH', 'watch': 'HIGH', '경고': 'HIGH', '고위험': 'HIGH',
|
||
'모니터': 'MEDIUM', 'monitor': 'MEDIUM', '주의': 'MEDIUM',
|
||
'위험': None, # 위험 선박 → CRITICAL+HIGH
|
||
}
|
||
|
||
_DARK_KEYWORDS = ['다크', '다크베셀', 'dark', 'ais 차단', 'ais차단', '신호차단']
|
||
_TRANSSHIP_KEYWORDS = ['환적', 'transshipment', '전재']
|
||
_SPOOF_KEYWORDS = ['스푸핑', 'spoofing', 'gps 조작', 'gps조작', '위치조작']
|
||
|
||
|
||
def detect_prequery(message: str) -> Optional[dict]:
|
||
"""사용자 메시지에서 사전 쿼리 패턴을 감지하여 DB 조회 파라미터 반환."""
|
||
msg = message.lower().strip()
|
||
params: dict = {}
|
||
|
||
# 수역 감지
|
||
for keyword, zone in _ZONE_MAP.items():
|
||
if keyword.lower() in msg:
|
||
params['zone'] = zone
|
||
break
|
||
|
||
# 활동 감지
|
||
for keyword, activity in _ACTIVITY_MAP.items():
|
||
if keyword in msg:
|
||
params['activity'] = activity
|
||
break
|
||
|
||
# 위험도 감지
|
||
for keyword, level in _RISK_MAP.items():
|
||
if keyword in msg:
|
||
if level:
|
||
params['risk_level'] = level
|
||
else:
|
||
params['risk_levels'] = ['CRITICAL', 'HIGH']
|
||
break
|
||
|
||
# 다크베셀 감지
|
||
if any(k in msg for k in _DARK_KEYWORDS):
|
||
params['is_dark'] = True
|
||
|
||
# 환적 감지
|
||
if any(k in msg for k in _TRANSSHIP_KEYWORDS):
|
||
params['is_transship'] = True
|
||
|
||
# 스푸핑 감지
|
||
if any(k in msg for k in _SPOOF_KEYWORDS):
|
||
params['spoofing'] = True
|
||
|
||
return params if params else None
|
||
|
||
|
||
def execute_prequery(params: dict) -> str:
|
||
"""사전 쿼리 패턴에 해당하는 DB 조회를 실행하여 결과를 텍스트로 반환."""
|
||
try:
|
||
from db import kcgdb
|
||
|
||
conditions = ["analyzed_at > NOW() - INTERVAL '1 hour'"]
|
||
bind_params: list = []
|
||
|
||
if 'zone' in params:
|
||
conditions.append('zone = %s')
|
||
bind_params.append(params['zone'])
|
||
|
||
if 'activity' in params:
|
||
conditions.append('activity_state = %s')
|
||
bind_params.append(params['activity'])
|
||
|
||
if 'risk_level' in params:
|
||
conditions.append('risk_level = %s')
|
||
bind_params.append(params['risk_level'])
|
||
elif 'risk_levels' in params:
|
||
placeholders = ','.join(['%s'] * len(params['risk_levels']))
|
||
conditions.append(f'risk_level IN ({placeholders})')
|
||
bind_params.extend(params['risk_levels'])
|
||
|
||
if params.get('is_dark'):
|
||
conditions.append('is_dark = TRUE')
|
||
|
||
if params.get('is_transship'):
|
||
conditions.append('is_transship_suspect = TRUE')
|
||
|
||
if params.get('spoofing'):
|
||
conditions.append('spoofing_score > 0.5')
|
||
|
||
where = ' AND '.join(conditions)
|
||
|
||
query = f"""
|
||
SELECT v.mmsi, v.risk_score, v.risk_level, v.zone, v.activity_state,
|
||
v.vessel_type, v.is_dark, v.gap_duration_min, v.spoofing_score,
|
||
v.cluster_id, v.cluster_size, v.dist_to_baseline_nm,
|
||
v.is_transship_suspect, v.transship_pair_mmsi,
|
||
fv.permit_no, fv.name_cn, fv.gear_code
|
||
FROM {VESSEL_ANALYSIS_RESULTS} v
|
||
LEFT JOIN {FLEET_VESSELS} fv ON v.mmsi = fv.mmsi
|
||
WHERE {where}
|
||
ORDER BY v.risk_score DESC
|
||
LIMIT 30
|
||
"""
|
||
|
||
with kcgdb.get_conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(query, bind_params)
|
||
rows = cur.fetchall()
|
||
|
||
if not rows:
|
||
return '\n## 조회 결과\n해당 조건에 맞는 선박이 없습니다.\n'
|
||
|
||
# 결과를 간략 테이블로 구성 (토큰 절약)
|
||
lines = [f'\n## 조회 결과 ({len(rows)}척)']
|
||
lines.append('| MMSI | 점수 | 수역 | 활동 | 허가 | 다크 |')
|
||
lines.append('|---|---|---|---|---|---|')
|
||
|
||
for row in rows[:15]: # 최대 15척
|
||
mmsi, risk_score, risk_level, zone, activity, vtype, is_dark, gap, spoof, \
|
||
cid, csize, dist_nm, is_trans, trans_pair, permit, name_cn, gear = row
|
||
permit_str = 'Y' if permit else 'N'
|
||
dark_str = 'Y' if is_dark else '-'
|
||
lines.append(f'| {mmsi} | {risk_score} | {zone} | {activity} | {permit_str} | {dark_str} |')
|
||
|
||
return '\n'.join(lines)
|
||
except Exception as e:
|
||
logger.error('prequery execution failed: %s', e)
|
||
return f'\n(DB 조회 실패: {e})\n'
|
||
|
||
|
||
# ── LLM Tool Calling 응답 파싱 + 실행 ──
|
||
|
||
_TOOL_CALL_PATTERN = re.compile(
|
||
r'\{"tool"\s*:\s*"(\w+)"\s*,\s*"params"\s*:\s*(\{[^}]+\})\}',
|
||
)
|
||
|
||
|
||
def parse_tool_calls(llm_response: str) -> list[dict]:
|
||
"""LLM 응답에서 tool call JSON을 추출."""
|
||
calls = []
|
||
for match in _TOOL_CALL_PATTERN.finditer(llm_response):
|
||
try:
|
||
tool_name = match.group(1)
|
||
params = json.loads(match.group(2))
|
||
calls.append({'tool': tool_name, 'params': params})
|
||
except json.JSONDecodeError:
|
||
continue
|
||
return calls[:2] # 최대 2개
|
||
|
||
|
||
def execute_tool_call(call: dict) -> str:
|
||
"""단일 tool call 실행."""
|
||
tool = call.get('tool', '')
|
||
params = call.get('params', {})
|
||
|
||
if tool == 'query_vessels':
|
||
return execute_prequery(params)
|
||
|
||
if tool == 'query_vessel_detail':
|
||
mmsi = params.get('mmsi', '')
|
||
if mmsi:
|
||
from chat.context_builder import _build_vessel_detail
|
||
return _build_vessel_detail(mmsi)
|
||
return '(MMSI 미지정)'
|
||
|
||
if tool == 'query_fleet_group':
|
||
return _query_fleet_group(params)
|
||
|
||
if tool == 'query_vessel_history':
|
||
return _query_vessel_history(params)
|
||
|
||
if tool == 'query_vessel_static':
|
||
return _query_vessel_static(params)
|
||
|
||
if tool == 'get_knowledge':
|
||
return _get_knowledge(params)
|
||
|
||
if tool == 'query_gear_correlation':
|
||
return _query_gear_correlation(params)
|
||
|
||
return f'(알 수 없는 도구: {tool})'
|
||
|
||
|
||
def _get_knowledge(params: dict) -> str:
|
||
"""도메인 지식 섹션 조회."""
|
||
key = params.get('key', '')
|
||
if not key:
|
||
return '(key 미지정. 사용 가능: maritime_zones, fishing_agreement, algorithm_guide, response_guide, db_schema)'
|
||
from chat.domain_knowledge import get_knowledge_section
|
||
return get_knowledge_section(key)
|
||
|
||
|
||
def _query_fleet_group(params: dict) -> str:
|
||
"""선단/어구 그룹 조회."""
|
||
try:
|
||
from db import kcgdb
|
||
|
||
conditions = [f"snapshot_time = (SELECT MAX(snapshot_time) FROM {GROUP_POLYGON_SNAPSHOTS})"]
|
||
bind_params: list = []
|
||
|
||
if 'group_type' in params:
|
||
conditions.append('group_type = %s')
|
||
bind_params.append(params['group_type'])
|
||
if 'zone_id' in params:
|
||
conditions.append('zone_id = %s')
|
||
bind_params.append(params['zone_id'])
|
||
|
||
where = ' AND '.join(conditions)
|
||
query = f"""
|
||
SELECT group_type, group_key, group_label, member_count, zone_name, members
|
||
FROM {GROUP_POLYGON_SNAPSHOTS}
|
||
WHERE {where}
|
||
ORDER BY member_count DESC
|
||
LIMIT 20
|
||
"""
|
||
|
||
with kcgdb.get_conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(query, bind_params)
|
||
rows = cur.fetchall()
|
||
|
||
if not rows:
|
||
return '\n(해당 조건의 그룹 없음)\n'
|
||
|
||
lines = [f'\n## 그룹 조회 결과 ({len(rows)}건)']
|
||
lines.append('| 유형 | 키 | 라벨 | 선박수 | 수역 |')
|
||
lines.append('|---|---|---|---|---|')
|
||
for row in rows:
|
||
gtype, gkey, glabel, mcount, zname, members = row
|
||
lines.append(f'| {gtype} | {gkey} | {glabel or "-"} | {mcount} | {zname or "-"} |')
|
||
|
||
return '\n'.join(lines)
|
||
except Exception as e:
|
||
logger.error('fleet group query failed: %s', e)
|
||
return f'\n(그룹 조회 실패: {e})\n'
|
||
|
||
|
||
def _query_vessel_history(params: dict) -> str:
|
||
"""snpdb에서 선박 항적 이력 조회 (daily 집계)."""
|
||
try:
|
||
from db import snpdb
|
||
|
||
mmsi = params.get('mmsi', '')
|
||
days = min(params.get('days', 7), 30) # 최대 30일
|
||
|
||
if not mmsi:
|
||
return '(MMSI 미지정)'
|
||
|
||
query = """
|
||
SELECT time_bucket, distance_nm, avg_speed, max_speed, point_count,
|
||
start_position, end_position
|
||
FROM signal.t_vessel_tracks_daily
|
||
WHERE mmsi = %s AND time_bucket >= CURRENT_DATE - %s
|
||
ORDER BY time_bucket DESC
|
||
"""
|
||
|
||
with snpdb.get_conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(query, (mmsi, days))
|
||
rows = cur.fetchall()
|
||
|
||
if not rows:
|
||
return f'\n(MMSI {mmsi}: 최근 {days}일 항적 데이터 없음)\n'
|
||
|
||
lines = [f'\n## 항적 이력: {mmsi} (최근 {days}일)']
|
||
lines.append('| 날짜 | 이동거리(NM) | 평균속도 | 최대속도 | AIS포인트 |')
|
||
lines.append('|---|---|---|---|---|')
|
||
for row in rows:
|
||
dt, dist, avg_spd, max_spd, pts, start_pos, end_pos = row
|
||
lines.append(
|
||
f"| {dt} | {float(dist or 0):.1f} | {float(avg_spd or 0):.1f}kt "
|
||
f"| {float(max_spd or 0):.1f}kt | {pts or 0} |"
|
||
)
|
||
|
||
return '\n'.join(lines)
|
||
except Exception as e:
|
||
logger.error('vessel history query failed: %s', e)
|
||
return f'\n(항적 이력 조회 실패: {e})\n'
|
||
|
||
|
||
def _query_vessel_static(params: dict) -> str:
|
||
"""snpdb에서 선박 정적정보 + 변경 이력 조회."""
|
||
try:
|
||
from db import snpdb
|
||
|
||
mmsi = params.get('mmsi', '')
|
||
limit = min(params.get('limit', 10), 24)
|
||
|
||
if not mmsi:
|
||
return '(MMSI 미지정)'
|
||
|
||
query = """
|
||
SELECT time_bucket, name, vessel_type, length, width, draught,
|
||
destination, status, callsign, imo
|
||
FROM signal.t_vessel_static
|
||
WHERE mmsi = %s
|
||
ORDER BY time_bucket DESC
|
||
LIMIT %s
|
||
"""
|
||
|
||
with snpdb.get_conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(query, (mmsi, limit))
|
||
rows = cur.fetchall()
|
||
|
||
if not rows:
|
||
return f'\n(MMSI {mmsi}: 정적정보 없음)\n'
|
||
|
||
# 최신 정보
|
||
latest = rows[0]
|
||
lines = [f'\n## 선박 정적정보: {mmsi}']
|
||
lines.append(f'- 선명: {latest[1] or "N/A"}')
|
||
lines.append(f'- 선종: {latest[2] or "N/A"}')
|
||
lines.append(f'- 제원: L={latest[3] or 0}m × W={latest[4] or 0}m, 흘수={latest[5] or 0}m')
|
||
lines.append(f'- 목적지: {latest[6] or "N/A"}')
|
||
lines.append(f'- 상태: {latest[7] or "N/A"}')
|
||
lines.append(f'- 호출부호: {latest[8] or "N/A"}, IMO: {latest[9] or "N/A"}')
|
||
|
||
# 변경 이력 감지
|
||
changes = []
|
||
for i in range(len(rows) - 1):
|
||
curr, prev = rows[i], rows[i + 1]
|
||
diffs = []
|
||
if curr[1] != prev[1]:
|
||
diffs.append(f'선명: {prev[1]}→{curr[1]}')
|
||
if curr[6] != prev[6]:
|
||
diffs.append(f'목적지: {prev[6]}→{curr[6]}')
|
||
if curr[7] != prev[7]:
|
||
diffs.append(f'상태: {prev[7]}→{curr[7]}')
|
||
if diffs:
|
||
changes.append(f'- {curr[0].strftime("%m/%d %H:%M")}: {", ".join(diffs)}')
|
||
|
||
if changes:
|
||
lines.append(f'\n### 변경 이력 (최근 {len(changes)}건)')
|
||
lines.extend(changes[:10])
|
||
|
||
return '\n'.join(lines)
|
||
except Exception as e:
|
||
logger.error('vessel static query failed: %s', e)
|
||
return f'\n(정적정보 조회 실패: {e})\n'
|
||
|
||
|
||
def _query_gear_correlation(params: dict) -> str:
|
||
"""어구 그룹의 연관 선박/어구 조회."""
|
||
from db import kcgdb
|
||
|
||
group_key = params.get('group_key', '')
|
||
limit = int(params.get('limit', 10))
|
||
|
||
with kcgdb.get_conn() as conn:
|
||
cur = conn.cursor()
|
||
try:
|
||
cur.execute(
|
||
'SELECT target_name, target_mmsi, target_type, current_score, '
|
||
'streak_count, observation_count, proximity_ratio, visit_score, '
|
||
'heading_coherence, freeze_state '
|
||
f'FROM {GEAR_CORRELATION_SCORES} s '
|
||
f'JOIN {CORRELATION_PARAM_MODELS} m ON s.model_id = m.id '
|
||
'WHERE s.group_key = %s AND m.is_default = TRUE AND s.current_score >= 0.3 '
|
||
'ORDER BY s.current_score DESC LIMIT %s',
|
||
(group_key, limit),
|
||
)
|
||
rows = cur.fetchall()
|
||
except Exception:
|
||
return f'어구 그룹 "{group_key}"에 대한 연관성 데이터가 없습니다 (테이블 미생성).'
|
||
finally:
|
||
cur.close()
|
||
|
||
if not rows:
|
||
return f'어구 그룹 "{group_key}"에 대한 연관성 데이터가 없습니다.'
|
||
|
||
lines = [f'## {group_key} 연관 분석 (상위 {len(rows)}개, default 모델)']
|
||
for r in rows:
|
||
name, mmsi, ttype, score, streak, obs, prox, visit, heading, state = r
|
||
pct = score * 100
|
||
disp_name = name or mmsi
|
||
detail_parts = []
|
||
if prox is not None:
|
||
detail_parts.append(f'근접 {prox*100:.0f}%')
|
||
if visit is not None:
|
||
detail_parts.append(f'방문 {visit*100:.0f}%')
|
||
if heading is not None:
|
||
detail_parts.append(f'COG동조 {heading*100:.0f}%')
|
||
detail = ', '.join(detail_parts) if detail_parts else ''
|
||
|
||
lines.append(
|
||
f'- **{disp_name}** ({mmsi}, {ttype}): '
|
||
f'일치율 {pct:.1f}% (연속 {streak}회, 관측 {obs}회) '
|
||
f'[{detail}] 상태: {state}'
|
||
)
|
||
return '\n'.join(lines)
|