from __future__ import annotations import json import math from pathlib import Path from typing import List, Optional, Tuple EARTH_RADIUS_NM = 3440.065 TERRITORIAL_SEA_NM = 12.0 CONTIGUOUS_ZONE_NM = 24.0 _baseline_points: Optional[List[Tuple[float, float]]] = None _zone_polygons: Optional[list] = None def _load_baseline() -> List[Tuple[float, float]]: global _baseline_points if _baseline_points is not None: return _baseline_points path = Path(__file__).parent.parent / 'data' / 'korea_baseline.json' with open(path, 'r') as f: data = json.load(f) _baseline_points = [(p['lat'], p['lon']) for p in data['points']] return _baseline_points def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """두 좌표 간 거리 (해리).""" R = EARTH_RADIUS_NM phi1, phi2 = math.radians(lat1), math.radians(lat2) dphi = math.radians(lat2 - lat1) dlam = math.radians(lon2 - lon1) a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2 return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) def dist_to_baseline(vessel_lat: float, vessel_lon: float, baseline_points: Optional[List[Tuple[float, float]]] = None) -> float: """선박 좌표에서 기선까지 최소 거리 (NM).""" if baseline_points is None: baseline_points = _load_baseline() min_dist = float('inf') for bp_lat, bp_lon in baseline_points: d = haversine_nm(vessel_lat, vessel_lon, bp_lat, bp_lon) if d < min_dist: min_dist = d return min_dist def _epsg3857_to_wgs84(x: float, y: float) -> Tuple[float, float]: """EPSG:3857 (Web Mercator) → WGS84 변환.""" lon = x / (math.pi * 6378137) * 180 lat = math.atan(math.exp(y / 6378137)) * 360 / math.pi - 90 return lat, lon def _load_zone_polygons() -> list: """특정어업수역 Ⅰ~Ⅳ GeoJSON 로드 + EPSG:3857→WGS84 변환.""" global _zone_polygons if _zone_polygons is not None: return _zone_polygons zone_dir = Path(__file__).parent.parent / 'data' / 'zones' zones_meta = [ ('ZONE_I', '수역Ⅰ(동해)', ['PS', 'FC'], '특정어업수역Ⅰ.json'), ('ZONE_II', '수역Ⅱ(남해)', ['PT', 'OT', 'GN', 'PS', 'FC'], '특정어업수역Ⅱ.json'), ('ZONE_III', '수역Ⅲ(서남해)', ['PT', 'OT', 'GN', 'PS', 'FC'], '특정어업수역Ⅲ.json'), ('ZONE_IV', '수역Ⅳ(서해)', ['GN', 'PS', 'FC'], '특정어업수역Ⅳ.json'), ] result = [] for zone_id, name, allowed, filename in zones_meta: filepath = zone_dir / filename if not filepath.exists(): continue with open(filepath, 'r') as f: data = json.load(f) multi_coords = data['features'][0]['geometry']['coordinates'] wgs84_polys = [] for poly in multi_coords: wgs84_rings = [] for ring in poly: wgs84_rings.append([_epsg3857_to_wgs84(x, y) for x, y in ring]) wgs84_polys.append(wgs84_rings) result.append({ 'id': zone_id, 'name': name, 'allowed': allowed, 'polygons': wgs84_polys, }) _zone_polygons = result return result def _point_in_polygon(lat: float, lon: float, ring: list) -> bool: """Ray-casting point-in-polygon.""" n = len(ring) inside = False j = n - 1 for i in range(n): yi, xi = ring[i] yj, xj = ring[j] if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi): inside = not inside j = i return inside def _point_in_multipolygon(lat: float, lon: float, polygons: list) -> bool: """MultiPolygon 내 포함 여부 (외곽 링 in + 내곽 링 hole 제외).""" for poly in polygons: outer = poly[0] if _point_in_polygon(lat, lon, outer): for hole in poly[1:]: if _point_in_polygon(lat, lon, hole): return False return True return False def classify_zone(vessel_lat: float, vessel_lon: float) -> dict: """선박 위치 수역 분류 — 특정어업수역 Ⅰ~Ⅳ 폴리곤 기반.""" zones = _load_zone_polygons() for z in zones: if _point_in_multipolygon(vessel_lat, vessel_lon, z['polygons']): dist = dist_to_baseline(vessel_lat, vessel_lon) return { 'zone': z['id'], 'zone_name': z['name'], 'allowed_gears': z['allowed'], 'dist_from_baseline_nm': round(dist, 2), 'violation': False, 'alert_level': 'WATCH', } dist = dist_to_baseline(vessel_lat, vessel_lon) if dist <= TERRITORIAL_SEA_NM: return { 'zone': 'TERRITORIAL_SEA', 'dist_from_baseline_nm': round(dist, 2), 'violation': True, 'alert_level': 'CRITICAL', } elif dist <= CONTIGUOUS_ZONE_NM: return { 'zone': 'CONTIGUOUS_ZONE', 'dist_from_baseline_nm': round(dist, 2), 'violation': False, 'alert_level': 'WATCH', } else: return { 'zone': 'EEZ_OR_BEYOND', 'dist_from_baseline_nm': round(dist, 2), 'violation': False, 'alert_level': 'NORMAL', } def bd09_to_wgs84(bd_lat: float, bd_lon: float) -> tuple[float, float]: """BD-09 좌표계를 WGS84로 변환.""" x = bd_lon - 0.0065 y = bd_lat - 0.006 z = math.sqrt(x ** 2 + y ** 2) - 0.00002 * math.sin(y * 52.35987756) theta = math.atan2(y, x) - 0.000003 * math.cos(x * 52.35987756) gcj_lon = z * math.cos(theta) gcj_lat = z * math.sin(theta) wgs_lat = gcj_lat - 0.0023 wgs_lon = gcj_lon - 0.0059 return wgs_lat, wgs_lon def compute_bd09_offset(lat: float, lon: float) -> float: """BD09 좌표와 WGS84 좌표 간 오프셋 (미터).""" wgs_lat, wgs_lon = bd09_to_wgs84(lat, lon) dist_nm = haversine_nm(lat, lon, wgs_lat, wgs_lon) return round(dist_nm * 1852.0, 1) # NM to meters