176 lines
6.0 KiB
Python
176 lines
6.0 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
import math
|
||
from pathlib import Path
|
||
from typing import List, Optional, Tuple
|
||
|
||
EARTH_RADIUS_NM = 3440.065
|
||
TERRITORIAL_SEA_NM = 12.0
|
||
CONTIGUOUS_ZONE_NM = 24.0
|
||
|
||
_baseline_points: Optional[List[Tuple[float, float]]] = None
|
||
_zone_polygons: Optional[list] = None
|
||
|
||
|
||
def _load_baseline() -> List[Tuple[float, float]]:
|
||
global _baseline_points
|
||
if _baseline_points is not None:
|
||
return _baseline_points
|
||
path = Path(__file__).parent.parent / 'data' / 'korea_baseline.json'
|
||
with open(path, 'r') as f:
|
||
data = json.load(f)
|
||
_baseline_points = [(p['lat'], p['lon']) for p in data['points']]
|
||
return _baseline_points
|
||
|
||
|
||
def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
|
||
"""두 좌표 간 거리 (해리)."""
|
||
R = EARTH_RADIUS_NM
|
||
phi1, phi2 = math.radians(lat1), math.radians(lat2)
|
||
dphi = math.radians(lat2 - lat1)
|
||
dlam = math.radians(lon2 - lon1)
|
||
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
|
||
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
||
|
||
|
||
def dist_to_baseline(vessel_lat: float, vessel_lon: float,
|
||
baseline_points: Optional[List[Tuple[float, float]]] = None) -> float:
|
||
"""선박 좌표에서 기선까지 최소 거리 (NM)."""
|
||
if baseline_points is None:
|
||
baseline_points = _load_baseline()
|
||
min_dist = float('inf')
|
||
for bp_lat, bp_lon in baseline_points:
|
||
d = haversine_nm(vessel_lat, vessel_lon, bp_lat, bp_lon)
|
||
if d < min_dist:
|
||
min_dist = d
|
||
return min_dist
|
||
|
||
|
||
def _epsg3857_to_wgs84(x: float, y: float) -> Tuple[float, float]:
|
||
"""EPSG:3857 (Web Mercator) → WGS84 변환."""
|
||
lon = x / (math.pi * 6378137) * 180
|
||
lat = math.atan(math.exp(y / 6378137)) * 360 / math.pi - 90
|
||
return lat, lon
|
||
|
||
|
||
def _load_zone_polygons() -> list:
|
||
"""특정어업수역 Ⅰ~Ⅳ GeoJSON 로드 + EPSG:3857→WGS84 변환."""
|
||
global _zone_polygons
|
||
if _zone_polygons is not None:
|
||
return _zone_polygons
|
||
|
||
zone_dir = Path(__file__).parent.parent / 'data' / 'zones'
|
||
zones_meta = [
|
||
('ZONE_I', '수역Ⅰ(동해)', ['PS', 'FC'], '특정어업수역Ⅰ.json'),
|
||
('ZONE_II', '수역Ⅱ(남해)', ['PT', 'OT', 'GN', 'PS', 'FC'], '특정어업수역Ⅱ.json'),
|
||
('ZONE_III', '수역Ⅲ(서남해)', ['PT', 'OT', 'GN', 'PS', 'FC'], '특정어업수역Ⅲ.json'),
|
||
('ZONE_IV', '수역Ⅳ(서해)', ['GN', 'PS', 'FC'], '특정어업수역Ⅳ.json'),
|
||
]
|
||
result = []
|
||
for zone_id, name, allowed, filename in zones_meta:
|
||
filepath = zone_dir / filename
|
||
if not filepath.exists():
|
||
continue
|
||
with open(filepath, 'r') as f:
|
||
data = json.load(f)
|
||
multi_coords = data['features'][0]['geometry']['coordinates']
|
||
wgs84_polys = []
|
||
for poly in multi_coords:
|
||
wgs84_rings = []
|
||
for ring in poly:
|
||
wgs84_rings.append([_epsg3857_to_wgs84(x, y) for x, y in ring])
|
||
wgs84_polys.append(wgs84_rings)
|
||
result.append({
|
||
'id': zone_id, 'name': name, 'allowed': allowed,
|
||
'polygons': wgs84_polys,
|
||
})
|
||
_zone_polygons = result
|
||
return result
|
||
|
||
|
||
def _point_in_polygon(lat: float, lon: float, ring: list) -> bool:
|
||
"""Ray-casting point-in-polygon."""
|
||
n = len(ring)
|
||
inside = False
|
||
j = n - 1
|
||
for i in range(n):
|
||
yi, xi = ring[i]
|
||
yj, xj = ring[j]
|
||
if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi):
|
||
inside = not inside
|
||
j = i
|
||
return inside
|
||
|
||
|
||
def _point_in_multipolygon(lat: float, lon: float, polygons: list) -> bool:
|
||
"""MultiPolygon 내 포함 여부 (외곽 링 in + 내곽 링 hole 제외)."""
|
||
for poly in polygons:
|
||
outer = poly[0]
|
||
if _point_in_polygon(lat, lon, outer):
|
||
for hole in poly[1:]:
|
||
if _point_in_polygon(lat, lon, hole):
|
||
return False
|
||
return True
|
||
return False
|
||
|
||
|
||
def classify_zone(vessel_lat: float, vessel_lon: float) -> dict:
|
||
"""선박 위치 수역 분류 — 특정어업수역 Ⅰ~Ⅳ 폴리곤 기반."""
|
||
zones = _load_zone_polygons()
|
||
|
||
for z in zones:
|
||
if _point_in_multipolygon(vessel_lat, vessel_lon, z['polygons']):
|
||
dist = dist_to_baseline(vessel_lat, vessel_lon)
|
||
return {
|
||
'zone': z['id'],
|
||
'zone_name': z['name'],
|
||
'allowed_gears': z['allowed'],
|
||
'dist_from_baseline_nm': round(dist, 2),
|
||
'violation': False,
|
||
'alert_level': 'WATCH',
|
||
}
|
||
|
||
dist = dist_to_baseline(vessel_lat, vessel_lon)
|
||
if dist <= TERRITORIAL_SEA_NM:
|
||
return {
|
||
'zone': 'TERRITORIAL_SEA',
|
||
'dist_from_baseline_nm': round(dist, 2),
|
||
'violation': True,
|
||
'alert_level': 'CRITICAL',
|
||
}
|
||
elif dist <= CONTIGUOUS_ZONE_NM:
|
||
return {
|
||
'zone': 'CONTIGUOUS_ZONE',
|
||
'dist_from_baseline_nm': round(dist, 2),
|
||
'violation': False,
|
||
'alert_level': 'WATCH',
|
||
}
|
||
else:
|
||
return {
|
||
'zone': 'EEZ_OR_BEYOND',
|
||
'dist_from_baseline_nm': round(dist, 2),
|
||
'violation': False,
|
||
'alert_level': 'NORMAL',
|
||
}
|
||
|
||
|
||
def bd09_to_wgs84(bd_lat: float, bd_lon: float) -> tuple[float, float]:
|
||
"""BD-09 좌표계를 WGS84로 변환."""
|
||
x = bd_lon - 0.0065
|
||
y = bd_lat - 0.006
|
||
z = math.sqrt(x ** 2 + y ** 2) - 0.00002 * math.sin(y * 52.35987756)
|
||
theta = math.atan2(y, x) - 0.000003 * math.cos(x * 52.35987756)
|
||
gcj_lon = z * math.cos(theta)
|
||
gcj_lat = z * math.sin(theta)
|
||
wgs_lat = gcj_lat - 0.0023
|
||
wgs_lon = gcj_lon - 0.0059
|
||
return wgs_lat, wgs_lon
|
||
|
||
|
||
def compute_bd09_offset(lat: float, lon: float) -> float:
|
||
"""BD09 좌표와 WGS84 좌표 간 오프셋 (미터)."""
|
||
wgs_lat, wgs_lon = bd09_to_wgs84(lat, lon)
|
||
dist_nm = haversine_nm(lat, lon, wgs_lat, wgs_lon)
|
||
return round(dist_nm * 1852.0, 1) # NM to meters
|