kcg-ai-monitoring/prediction/algorithms/location.py
htlee e2fc355b2c feat: S2 prediction 분석 엔진 모노레포 이식
iran prediction 47개 Python 파일을 prediction/ 디렉토리로 복제:
- algorithms/ 14개 분석 알고리즘 (어구추론, 다크베셀, 스푸핑, 환적, 위험도 등)
- pipeline/ 7단계 분류 파이프라인
- cache/vessel_store (24h 슬라이딩 윈도우)
- db/ 어댑터 (snpdb 원본조회, kcgdb 결과저장)
- chat/ AI 채팅 (Ollama, 후순위)
- data/ 정적 데이터 (기선, 특정어업수역 GeoJSON)

config.py를 kcgaidb로 재구성 (DB명, 사용자, 비밀번호)
DB 연결 검증 완료 (kcgaidb 37개 테이블 접근 확인)
Makefile에 dev-prediction / dev-all 타겟 추가
CLAUDE.md에 prediction 섹션 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 12:56:51 +09:00

176 lines
6.0 KiB
Python

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import math
from pathlib import Path
from typing import List, Optional, Tuple
EARTH_RADIUS_NM = 3440.065
TERRITORIAL_SEA_NM = 12.0
CONTIGUOUS_ZONE_NM = 24.0
_baseline_points: Optional[List[Tuple[float, float]]] = None
_zone_polygons: Optional[list] = None
def _load_baseline() -> List[Tuple[float, float]]:
global _baseline_points
if _baseline_points is not None:
return _baseline_points
path = Path(__file__).parent.parent / 'data' / 'korea_baseline.json'
with open(path, 'r') as f:
data = json.load(f)
_baseline_points = [(p['lat'], p['lon']) for p in data['points']]
return _baseline_points
def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""두 좌표 간 거리 (해리)."""
R = EARTH_RADIUS_NM
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def dist_to_baseline(vessel_lat: float, vessel_lon: float,
baseline_points: Optional[List[Tuple[float, float]]] = None) -> float:
"""선박 좌표에서 기선까지 최소 거리 (NM)."""
if baseline_points is None:
baseline_points = _load_baseline()
min_dist = float('inf')
for bp_lat, bp_lon in baseline_points:
d = haversine_nm(vessel_lat, vessel_lon, bp_lat, bp_lon)
if d < min_dist:
min_dist = d
return min_dist
def _epsg3857_to_wgs84(x: float, y: float) -> Tuple[float, float]:
"""EPSG:3857 (Web Mercator) → WGS84 변환."""
lon = x / (math.pi * 6378137) * 180
lat = math.atan(math.exp(y / 6378137)) * 360 / math.pi - 90
return lat, lon
def _load_zone_polygons() -> list:
"""특정어업수역 ~Ⅳ GeoJSON 로드 + EPSG:3857→WGS84 변환."""
global _zone_polygons
if _zone_polygons is not None:
return _zone_polygons
zone_dir = Path(__file__).parent.parent / 'data' / 'zones'
zones_meta = [
('ZONE_I', '수역Ⅰ(동해)', ['PS', 'FC'], '특정어업수역Ⅰ.json'),
('ZONE_II', '수역Ⅱ(남해)', ['PT', 'OT', 'GN', 'PS', 'FC'], '특정어업수역Ⅱ.json'),
('ZONE_III', '수역Ⅲ(서남해)', ['PT', 'OT', 'GN', 'PS', 'FC'], '특정어업수역Ⅲ.json'),
('ZONE_IV', '수역Ⅳ(서해)', ['GN', 'PS', 'FC'], '특정어업수역Ⅳ.json'),
]
result = []
for zone_id, name, allowed, filename in zones_meta:
filepath = zone_dir / filename
if not filepath.exists():
continue
with open(filepath, 'r') as f:
data = json.load(f)
multi_coords = data['features'][0]['geometry']['coordinates']
wgs84_polys = []
for poly in multi_coords:
wgs84_rings = []
for ring in poly:
wgs84_rings.append([_epsg3857_to_wgs84(x, y) for x, y in ring])
wgs84_polys.append(wgs84_rings)
result.append({
'id': zone_id, 'name': name, 'allowed': allowed,
'polygons': wgs84_polys,
})
_zone_polygons = result
return result
def _point_in_polygon(lat: float, lon: float, ring: list) -> bool:
"""Ray-casting point-in-polygon."""
n = len(ring)
inside = False
j = n - 1
for i in range(n):
yi, xi = ring[i]
yj, xj = ring[j]
if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi):
inside = not inside
j = i
return inside
def _point_in_multipolygon(lat: float, lon: float, polygons: list) -> bool:
"""MultiPolygon 내 포함 여부 (외곽 링 in + 내곽 링 hole 제외)."""
for poly in polygons:
outer = poly[0]
if _point_in_polygon(lat, lon, outer):
for hole in poly[1:]:
if _point_in_polygon(lat, lon, hole):
return False
return True
return False
def classify_zone(vessel_lat: float, vessel_lon: float) -> dict:
"""선박 위치 수역 분류 — 특정어업수역 ~Ⅳ 폴리곤 기반."""
zones = _load_zone_polygons()
for z in zones:
if _point_in_multipolygon(vessel_lat, vessel_lon, z['polygons']):
dist = dist_to_baseline(vessel_lat, vessel_lon)
return {
'zone': z['id'],
'zone_name': z['name'],
'allowed_gears': z['allowed'],
'dist_from_baseline_nm': round(dist, 2),
'violation': False,
'alert_level': 'WATCH',
}
dist = dist_to_baseline(vessel_lat, vessel_lon)
if dist <= TERRITORIAL_SEA_NM:
return {
'zone': 'TERRITORIAL_SEA',
'dist_from_baseline_nm': round(dist, 2),
'violation': True,
'alert_level': 'CRITICAL',
}
elif dist <= CONTIGUOUS_ZONE_NM:
return {
'zone': 'CONTIGUOUS_ZONE',
'dist_from_baseline_nm': round(dist, 2),
'violation': False,
'alert_level': 'WATCH',
}
else:
return {
'zone': 'EEZ_OR_BEYOND',
'dist_from_baseline_nm': round(dist, 2),
'violation': False,
'alert_level': 'NORMAL',
}
def bd09_to_wgs84(bd_lat: float, bd_lon: float) -> tuple[float, float]:
"""BD-09 좌표계를 WGS84로 변환."""
x = bd_lon - 0.0065
y = bd_lat - 0.006
z = math.sqrt(x ** 2 + y ** 2) - 0.00002 * math.sin(y * 52.35987756)
theta = math.atan2(y, x) - 0.000003 * math.cos(x * 52.35987756)
gcj_lon = z * math.cos(theta)
gcj_lat = z * math.sin(theta)
wgs_lat = gcj_lat - 0.0023
wgs_lon = gcj_lon - 0.0059
return wgs_lat, wgs_lon
def compute_bd09_offset(lat: float, lon: float) -> float:
"""BD09 좌표와 WGS84 좌표 간 오프셋 (미터)."""
wgs_lat, wgs_lon = bd09_to_wgs84(lat, lon)
dist_nm = haversine_nm(lat, lon, wgs_lat, wgs_lon)
return round(dist_nm * 1852.0, 1) # NM to meters