kcg-ai-monitoring/prediction/chat/context_builder.py
htlee e2fc355b2c feat: S2 prediction 분석 엔진 모노레포 이식
iran prediction 47개 Python 파일을 prediction/ 디렉토리로 복제:
- algorithms/ 14개 분석 알고리즘 (어구추론, 다크베셀, 스푸핑, 환적, 위험도 등)
- pipeline/ 7단계 분류 파이프라인
- cache/vessel_store (24h 슬라이딩 윈도우)
- db/ 어댑터 (snpdb 원본조회, kcgdb 결과저장)
- chat/ AI 채팅 (Ollama, 후순위)
- data/ 정적 데이터 (기선, 특정어업수역 GeoJSON)

config.py를 kcgaidb로 재구성 (DB명, 사용자, 비밀번호)
DB 연결 검증 완료 (kcgaidb 37개 테이블 접근 확인)
Makefile에 dev-prediction / dev-all 타겟 추가
CLAUDE.md에 prediction 섹션 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 12:56:51 +09:00

141 lines
5.7 KiB
Python

"""vessel_store + kcgdb 분석 데이터 + 도메인 지식을 기반으로 LLM 시스템 프롬프트를 구성."""
import logging
import re
from datetime import datetime, timezone
from chat.cache import get_cached_context
from chat.domain_knowledge import build_compact_prompt
logger = logging.getLogger(__name__)
def _build_realtime_context(ctx: dict) -> str:
"""Redis 캐시 데이터로 실시간 현황 프롬프트 구성 (간소화)."""
stats = ctx.get('vessel_stats', {})
risk = ctx.get('risk_distribution', {})
now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
return f"""## 현황 ({now})
전체 {stats.get('vessels', 0)}척, 중국 {stats.get('chinese', 0)}척, 분석완료 {stats.get('targets', 0)}척, 허가 {stats.get('permitted', 0)}/906척
CRITICAL {risk.get('CRITICAL', 0)} / HIGH {risk.get('HIGH', 0)} / MEDIUM {risk.get('MEDIUM', 0)} / LOW {risk.get('LOW', 0)}
다크 {ctx.get('dark_count', 0)} / 스푸핑 {ctx.get('spoofing_count', 0)} / 환적 {ctx.get('transship_count', 0)}
영해 {risk.get('TERRITORIAL_SEA', 0)} / 접속 {risk.get('CONTIGUOUS_ZONE', 0)} / I {risk.get('ZONE_I', 0)} / II {risk.get('ZONE_II', 0)} / III {risk.get('ZONE_III', 0)} / IV {risk.get('ZONE_IV', 0)} / EEZ {risk.get('EEZ_OR_BEYOND', 0)}
(상세 데이터는 query_vessels 도구로 조회)"""
def _build_fallback_context() -> str:
"""Redis 캐시가 없을 때 vessel_store + kcgdb에서 직접 구성."""
try:
from cache.vessel_store import vessel_store
stats = vessel_store.stats()
from db import kcgdb
summary = kcgdb.fetch_analysis_summary()
top_risk = kcgdb.fetch_recent_high_risk(10)
polygon_summary = kcgdb.fetch_polygon_summary()
ctx = {
'vessel_stats': stats,
'risk_distribution': summary.get('risk_distribution', {}),
'dark_count': summary.get('dark_count', 0),
'spoofing_count': summary.get('spoofing_count', 0),
'transship_count': summary.get('transship_count', 0),
'top_risk_vessels': top_risk,
'polygon_summary': polygon_summary,
}
from chat.cache import cache_analysis_context
cache_analysis_context(ctx)
return _build_realtime_context(ctx)
except Exception as e:
logger.error('fallback context build failed: %s', e)
return '(실시간 데이터를 불러올 수 없습니다. 일반 해양 감시 지식으로 답변합니다.)'
# ── RAG: 사용자 질문에서 MMSI를 추출하여 선박별 상세 컨텍스트 주입 ──
_MMSI_PATTERN = re.compile(r'\b(\d{9})\b')
def _extract_mmsis(text: str) -> list[str]:
"""사용자 메시지에서 9자리 MMSI 추출."""
return _MMSI_PATTERN.findall(text)
def _build_vessel_detail(mmsi: str) -> str:
"""특정 MMSI의 분석 결과를 상세 컨텍스트로 구성 (RAG)."""
try:
from cache.vessel_store import vessel_store
info = vessel_store.get_vessel_info(mmsi)
positions = vessel_store.get_all_latest_positions()
pos = positions.get(mmsi)
from db import kcgdb
high_risk = kcgdb.fetch_recent_high_risk(100)
vessel_data = next((v for v in high_risk if v['mmsi'] == mmsi), None)
if not vessel_data and not pos:
return f'\n(MMSI {mmsi}: 분석 데이터 없음)\n'
lines = [f'\n## 선박 상세: {mmsi}']
if info:
name = info.get('name', 'N/A')
lines.append(f'- 선명: {name}')
if pos:
lines.append(f"- 위치: {pos.get('lat', 'N/A')}°N, {pos.get('lon', 'N/A')}°E")
lines.append(f"- SOG: {pos.get('sog', 'N/A')} knots, COG: {pos.get('cog', 'N/A')}°")
is_permitted = vessel_store.is_permitted(mmsi)
lines.append(f"- 허가 여부: {'허가어선' if is_permitted else '미허가/미등록'}")
if vessel_data:
lines.append(f"- 위험도: {vessel_data.get('risk_score', 'N/A')}점 ({vessel_data.get('risk_level', 'N/A')})")
lines.append(f"- 수역: {vessel_data.get('zone', 'N/A')}")
lines.append(f"- 활동: {vessel_data.get('activity_state', 'N/A')}")
lines.append(f"- 다크베셀: {'Y' if vessel_data.get('is_dark') else 'N'}")
lines.append(f"- 환적 의심: {'Y' if vessel_data.get('is_transship') else 'N'}")
lines.append(f"- 스푸핑 점수: {vessel_data.get('spoofing_score', 0):.2f}")
return '\n'.join(lines)
except Exception as e:
logger.warning('vessel detail build failed for %s: %s', mmsi, e)
return f'\n(MMSI {mmsi}: 상세 조회 실패)\n'
class MaritimeContextBuilder:
"""도메인 지식 + 실시간 데이터 + 선박별 RAG를 결합하여 시스템 프롬프트 구성."""
def build_system_prompt(self, user_message: str = '') -> str:
"""시스템 프롬프트 구성.
구조:
1) 압축 도메인 지식 (~500토큰: 역할+핵심용어+도구목록)
2) 실시간 현황 (Redis 캐시 → DB fallback)
3) RAG: 사용자 질문에 포함된 MMSI의 선박별 상세 데이터
상세 도메인 지식은 LLM이 get_knowledge 도구로 필요 시 조회.
"""
parts = []
# 1) 압축 도메인 지식 (~500토큰)
parts.append(build_compact_prompt())
# 2) 실시간 현황
cached = get_cached_context()
if cached:
parts.append(_build_realtime_context(cached))
else:
parts.append(_build_fallback_context())
# 3) RAG: MMSI 기반 선박 상세
if user_message:
mmsis = _extract_mmsis(user_message)
for mmsi in mmsis[:3]: # 최대 3척
parts.append(_build_vessel_detail(mmsi))
return '\n\n'.join(parts)