kcg-monitoring/prediction/algorithms/location.py
2026-03-20 12:47:29 +09:00

176 lines
6.0 KiB
Python
Raw Blame 히스토리

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import math
from pathlib import Path
from typing import List, Optional, Tuple
EARTH_RADIUS_NM = 3440.065
TERRITORIAL_SEA_NM = 12.0
CONTIGUOUS_ZONE_NM = 24.0
_baseline_points: Optional[List[Tuple[float, float]]] = None
_zone_polygons: Optional[list] = None
def _load_baseline() -> List[Tuple[float, float]]:
global _baseline_points
if _baseline_points is not None:
return _baseline_points
path = Path(__file__).parent.parent / 'data' / 'korea_baseline.json'
with open(path, 'r') as f:
data = json.load(f)
_baseline_points = [(p['lat'], p['lon']) for p in data['points']]
return _baseline_points
def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""두 좌표 간 거리 (해리)."""
R = EARTH_RADIUS_NM
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def dist_to_baseline(vessel_lat: float, vessel_lon: float,
baseline_points: Optional[List[Tuple[float, float]]] = None) -> float:
"""선박 좌표에서 기선까지 최소 거리 (NM)."""
if baseline_points is None:
baseline_points = _load_baseline()
min_dist = float('inf')
for bp_lat, bp_lon in baseline_points:
d = haversine_nm(vessel_lat, vessel_lon, bp_lat, bp_lon)
if d < min_dist:
min_dist = d
return min_dist
def _epsg3857_to_wgs84(x: float, y: float) -> Tuple[float, float]:
"""EPSG:3857 (Web Mercator) → WGS84 변환."""
lon = x / (math.pi * 6378137) * 180
lat = math.atan(math.exp(y / 6378137)) * 360 / math.pi - 90
return lat, lon
def _load_zone_polygons() -> list:
"""특정어업수역 ~Ⅳ GeoJSON 로드 + EPSG:3857→WGS84 변환."""
global _zone_polygons
if _zone_polygons is not None:
return _zone_polygons
zone_dir = Path(__file__).parent.parent / 'data' / 'zones'
zones_meta = [
('ZONE_I', '수역Ⅰ(동해)', ['PS', 'FC'], '특정어업수역Ⅰ.json'),
('ZONE_II', '수역Ⅱ(남해)', ['PT', 'OT', 'GN', 'PS', 'FC'], '특정어업수역Ⅱ.json'),
('ZONE_III', '수역Ⅲ(서남해)', ['PT', 'OT', 'GN', 'PS', 'FC'], '특정어업수역Ⅲ.json'),
('ZONE_IV', '수역Ⅳ(서해)', ['GN', 'PS', 'FC'], '특정어업수역Ⅳ.json'),
]
result = []
for zone_id, name, allowed, filename in zones_meta:
filepath = zone_dir / filename
if not filepath.exists():
continue
with open(filepath, 'r') as f:
data = json.load(f)
multi_coords = data['features'][0]['geometry']['coordinates']
wgs84_polys = []
for poly in multi_coords:
wgs84_rings = []
for ring in poly:
wgs84_rings.append([_epsg3857_to_wgs84(x, y) for x, y in ring])
wgs84_polys.append(wgs84_rings)
result.append({
'id': zone_id, 'name': name, 'allowed': allowed,
'polygons': wgs84_polys,
})
_zone_polygons = result
return result
def _point_in_polygon(lat: float, lon: float, ring: list) -> bool:
"""Ray-casting point-in-polygon."""
n = len(ring)
inside = False
j = n - 1
for i in range(n):
yi, xi = ring[i]
yj, xj = ring[j]
if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi):
inside = not inside
j = i
return inside
def _point_in_multipolygon(lat: float, lon: float, polygons: list) -> bool:
"""MultiPolygon 내 포함 여부 (외곽 링 in + 내곽 링 hole 제외)."""
for poly in polygons:
outer = poly[0]
if _point_in_polygon(lat, lon, outer):
for hole in poly[1:]:
if _point_in_polygon(lat, lon, hole):
return False
return True
return False
def classify_zone(vessel_lat: float, vessel_lon: float) -> dict:
"""선박 위치 수역 분류 — 특정어업수역 ~Ⅳ 폴리곤 기반."""
zones = _load_zone_polygons()
for z in zones:
if _point_in_multipolygon(vessel_lat, vessel_lon, z['polygons']):
dist = dist_to_baseline(vessel_lat, vessel_lon)
return {
'zone': z['id'],
'zone_name': z['name'],
'allowed_gears': z['allowed'],
'dist_from_baseline_nm': round(dist, 2),
'violation': False,
'alert_level': 'WATCH',
}
dist = dist_to_baseline(vessel_lat, vessel_lon)
if dist <= TERRITORIAL_SEA_NM:
return {
'zone': 'TERRITORIAL_SEA',
'dist_from_baseline_nm': round(dist, 2),
'violation': True,
'alert_level': 'CRITICAL',
}
elif dist <= CONTIGUOUS_ZONE_NM:
return {
'zone': 'CONTIGUOUS_ZONE',
'dist_from_baseline_nm': round(dist, 2),
'violation': False,
'alert_level': 'WATCH',
}
else:
return {
'zone': 'EEZ_OR_BEYOND',
'dist_from_baseline_nm': round(dist, 2),
'violation': False,
'alert_level': 'NORMAL',
}
def bd09_to_wgs84(bd_lat: float, bd_lon: float) -> tuple[float, float]:
"""BD-09 좌표계를 WGS84로 변환."""
x = bd_lon - 0.0065
y = bd_lat - 0.006
z = math.sqrt(x ** 2 + y ** 2) - 0.00002 * math.sin(y * 52.35987756)
theta = math.atan2(y, x) - 0.000003 * math.cos(x * 52.35987756)
gcj_lon = z * math.cos(theta)
gcj_lat = z * math.sin(theta)
wgs_lat = gcj_lat - 0.0023
wgs_lon = gcj_lon - 0.0059
return wgs_lat, wgs_lon
def compute_bd09_offset(lat: float, lon: float) -> float:
"""BD09 좌표와 WGS84 좌표 간 오프셋 (미터)."""
wgs_lat, wgs_lon = bd09_to_wgs84(lat, lon)
dist_nm = haversine_nm(lat, lon, wgs_lat, wgs_lon)
return round(dist_nm * 1852.0, 1) # NM to meters