kcg-monitoring/prediction/algorithms/location.py
htlee a68dfb21b2 feat: Python 어선 분류기 + 배포 설정 + 백엔드 모니터링 프록시
- prediction/: FastAPI 7단계 분류 파이프라인 + 6개 탐지 알고리즘
  - snpdb 궤적 조회 → 인메모리 캐시(13K척) → 분류 → kcgdb 저장
  - APScheduler 5분 주기, Python 3.9 호환
  - 버그 수정: @property last_bucket, SQL INTERVAL 바인딩, rollback, None 가드
  - 보안: DB 비밀번호 하드코딩 제거 → env 환경변수 필수
- deploy/kcg-prediction.service: systemd 서비스 (redis-211, 포트 8001)
- deploy.yml: prediction CI/CD 배포 단계 추가 (192.168.1.18:32023)
- backend: PredictionProxyController (health/status/trigger 프록시)
- backend: AppProperties predictionBaseUrl + AuthFilter 인증 예외

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 12:10:21 +09:00

94 lines
3.0 KiB
Python

from __future__ import annotations
import json
import math
from pathlib import Path
from typing import List, Optional, Tuple
# Earth radius expressed in nautical miles (about 6371 km / 1.852);
# haversine_nm multiplies the central angle by this to get a distance in NM.
EARTH_RADIUS_NM = 3440.065
# Distance thresholds (NM from the baseline) that classify_zone compares against.
TERRITORIAL_SEA_NM = 12.0
CONTIGUOUS_ZONE_NM = 24.0
# Module-level cache of baseline (lat, lon) points; stays None until
# _load_baseline() reads the JSON file for the first time.
_baseline_points: Optional[List[Tuple[float, float]]] = None
def _load_baseline() -> List[Tuple[float, float]]:
    """Return the baseline points, reading them from disk on first use.

    The points come from ``data/korea_baseline.json``, resolved two directory
    levels above this file. The parsed result is stored in the module-level
    ``_baseline_points`` so later calls return the cached list without
    touching the filesystem again.

    Returns:
        A list of ``(lat, lon)`` tuples built from the ``lat`` / ``lon``
        fields of every entry under the JSON document's ``points`` key.

    Raises:
        OSError: If the baseline file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If ``points`` or an entry's ``lat`` / ``lon`` is missing.
    """
    global _baseline_points
    if _baseline_points is None:
        path = Path(__file__).parent.parent / 'data' / 'korea_baseline.json'
        # Decode explicitly as UTF-8 so parsing does not depend on the
        # locale-specific default encoding of the host running the service.
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        _baseline_points = [(p['lat'], p['lon']) for p in data['points']]
    return _baseline_points
def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance between two coordinates in nautical miles.

    Uses the haversine formula on a sphere of radius ``EARTH_RADIUS_NM``.

    Args:
        lat1: Latitude of the first point, in degrees.
        lon1: Longitude of the first point, in degrees.
        lat2: Latitude of the second point, in degrees.
        lon2: Longitude of the second point, in degrees.

    Returns:
        The distance in nautical miles. NaN inputs propagate to a NaN result.
    """
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    half_dphi = math.radians(lat2 - lat1) / 2
    half_dlam = math.radians(lon2 - lon1) / 2
    a = math.sin(half_dphi) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(half_dlam) ** 2
    # Floating-point rounding can push `a` a hair above 1 for (near-)antipodal
    # points, which would make sqrt(1 - a) raise ValueError. Clamp only the
    # upper side with a comparison so that NaN still propagates unchanged.
    if a > 1.0:
        a = 1.0
    return EARTH_RADIUS_NM * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def dist_to_baseline(vessel_lat: float, vessel_lon: float,
                     baseline_points: Optional[List[Tuple[float, float]]] = None) -> float:
    """Return the smallest distance (NM) from a vessel to any baseline point.

    Args:
        vessel_lat: Vessel latitude in degrees.
        vessel_lon: Vessel longitude in degrees.
        baseline_points: ``(lat, lon)`` pairs to measure against. When
            ``None``, the points returned by ``_load_baseline()`` are used.

    Returns:
        The minimum of the haversine distances to each point, or
        ``float('inf')`` when there are no points to compare against.
    """
    points = _load_baseline() if baseline_points is None else baseline_points
    # Seeding the candidates with infinity keeps the "strictly smaller wins"
    # scan: min() only replaces its current best when a later item compares
    # less than it, so an empty input yields infinity.
    candidates = [float('inf')]
    candidates.extend(
        haversine_nm(vessel_lat, vessel_lon, point_lat, point_lon)
        for point_lat, point_lon in points
    )
    return min(candidates)
def classify_zone(vessel_lat: float, vessel_lon: float) -> dict:
    """Classify a vessel position into a maritime zone by baseline distance.

    The distance from ``dist_to_baseline`` is compared against
    ``TERRITORIAL_SEA_NM`` and then ``CONTIGUOUS_ZONE_NM`` (both inclusive).

    Args:
        vessel_lat: Vessel latitude in degrees.
        vessel_lon: Vessel longitude in degrees.

    Returns:
        A dict with the keys ``zone``, ``dist_from_baseline_nm`` (distance
        rounded to two decimals), ``violation`` and ``alert_level``. Only the
        territorial-sea zone sets ``violation`` to ``True``.
    """
    distance_nm = dist_to_baseline(vessel_lat, vessel_lon)

    # Pick the per-zone fields first; the result dict is assembled once below.
    if distance_nm <= TERRITORIAL_SEA_NM:
        zone, violation, alert_level = 'TERRITORIAL_SEA', True, 'CRITICAL'
    elif distance_nm <= CONTIGUOUS_ZONE_NM:
        zone, violation, alert_level = 'CONTIGUOUS_ZONE', False, 'WATCH'
    else:
        zone, violation, alert_level = 'EEZ_OR_BEYOND', False, 'NORMAL'

    return {
        'zone': zone,
        'dist_from_baseline_nm': round(distance_nm, 2),
        'violation': violation,
        'alert_level': alert_level,
    }
def bd09_to_wgs84(bd_lat: float, bd_lon: float) -> tuple[float, float]:
    """Convert a BD-09 coordinate to an approximate WGS84 coordinate.

    The input is first shifted by (0.006, 0.0065) degrees and corrected in
    polar form to obtain an intermediate (GCJ) position; a constant offset of
    (0.0023, 0.0059) degrees is then subtracted from that position.

    NOTE(review): the last step is a fixed offset, not a position-dependent
    transform, so the result should be treated as an approximation — confirm
    that this accuracy is sufficient for callers.

    Args:
        bd_lat: BD-09 latitude in degrees.
        bd_lon: BD-09 longitude in degrees.

    Returns:
        A ``(wgs_lat, wgs_lon)`` tuple in degrees.
    """
    # Numerically close to pi * 3000 / 180; presumably the usual BD-09 factor.
    x_pi = 52.35987756

    dx = bd_lon - 0.0065
    dy = bd_lat - 0.006

    # Polar form of the shifted point with small periodic corrections.
    radius = math.sqrt(dx ** 2 + dy ** 2) - 0.00002 * math.sin(dy * x_pi)
    angle = math.atan2(dy, dx) - 0.000003 * math.cos(dx * x_pi)

    # Back to Cartesian (intermediate position), then apply the fixed offsets.
    return radius * math.sin(angle) - 0.0023, radius * math.cos(angle) - 0.0059
def compute_bd09_offset(lat: float, lon: float) -> float:
    """Return the offset in meters between a BD-09 coordinate and its WGS84 conversion.

    The given coordinate is treated as BD-09, converted with
    ``bd09_to_wgs84``, and the haversine distance between the original and
    converted positions is reported.

    Args:
        lat: Latitude in degrees, interpreted as BD-09.
        lon: Longitude in degrees, interpreted as BD-09.

    Returns:
        The separation in meters, rounded to one decimal place.
    """
    converted_lat, converted_lon = bd09_to_wgs84(lat, lon)
    separation_nm = haversine_nm(lat, lon, converted_lat, converted_lon)
    # One nautical mile is 1852 meters.
    return round(separation_nm * 1852.0, 1)