wing-ops/prediction/scat/geocoder.py
leedano d9fb4506bc feat(scat): Pre-SCAT 관할서 필터링 + 해안조사 데이터 파이프라인 구축
- 백엔드: 관할서 목록 API, zone 필터링 쿼리 추가
- 프론트: ScatLeftPanel 관할서 드롭다운, ScatMap/ScatPopup 개선
- 기상탭: WeatherRightPanel 리팩토링
- prediction/scat: PDF 파싱 → 지오코딩 → ESI 매핑 파이프라인
- vite.config: proxy 설정 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 10:53:19 +09:00

267 lines
7.6 KiB
Python

"""Kakao Local API 기반 Geocoding + 오프셋 분산.
환경변수 KAKAO_REST_KEY 필요.
- access_pt 있는 구간: 주소/키워드 검색으로 정확한 좌표
- access_pt 없는 구간: sect_nm 기반 대표 좌표 + 나선형 오프셋
"""
from __future__ import annotations
import json
import math
import re
import time
from pathlib import Path
from typing import Optional, Tuple
import requests
import config
_HEADERS = {
'Authorization': f'KakaoAK {config.KAKAO_REST_KEY}',
}
# Kakao API endpoints
_ADDRESS_URL = 'https://dapi.kakao.com/v2/local/search/address.json'
_KEYWORD_URL = 'https://dapi.kakao.com/v2/local/search/keyword.json'
# 캐시: query → (lat, lng) or None
_cache: dict[str, Optional[Tuple[float, float]]] = {}
# API 호출 간격 (초)
_RATE_LIMIT = 0.05
# ---------------------------------------------------------------------------
# Kakao API 호출
# ---------------------------------------------------------------------------
def _search_address(query: str) -> Optional[Tuple[float, float]]:
"""Kakao 주소 검색 API."""
try:
resp = requests.get(
_ADDRESS_URL,
params={'query': query},
headers=_HEADERS,
timeout=5,
)
if resp.status_code != 200:
return None
data = resp.json()
docs = data.get('documents', [])
if docs:
return float(docs[0]['y']), float(docs[0]['x'])
except Exception:
pass
return None
def _search_keyword(query: str) -> Optional[Tuple[float, float]]:
"""Kakao 키워드 검색 API."""
try:
resp = requests.get(
_KEYWORD_URL,
params={'query': query},
headers=_HEADERS,
timeout=5,
)
if resp.status_code != 200:
return None
data = resp.json()
docs = data.get('documents', [])
if docs:
return float(docs[0]['y']), float(docs[0]['x'])
except Exception:
pass
return None
# ---------------------------------------------------------------------------
# 메인 Geocoding 함수
# ---------------------------------------------------------------------------
def geocode(query: str) -> Optional[Tuple[float, float]]:
"""주소/키워드 → (lat, lng). 캐시 적용. 실패 시 None."""
if not config.KAKAO_REST_KEY:
return None
if query in _cache:
return _cache[query]
time.sleep(_RATE_LIMIT)
# 1차: 주소 검색 (지번/도로명)
result = _search_address(query)
# 2차: 키워드 검색 (장소명, 방조제, 해수욕장 등)
if result is None:
time.sleep(_RATE_LIMIT)
result = _search_keyword(query)
_cache[query] = result
return result
def geocode_section(
access_pt: Optional[str],
sect_nm: str,
zone_name: str,
) -> Optional[Tuple[float, float]]:
"""구간에 대한 최적 좌표 검색.
우선순위:
1. access_pt (주소/지명)
2. sect_nm에서 '인근 해안' 제거한 지역명
3. zone_name (PDF 구역명)
"""
# 1. access_pt로 시도
if access_pt:
result = geocode(access_pt)
if result:
return result
# access_pt + zone_name 조합 시도
combined = f'{zone_name} {access_pt}' if zone_name else access_pt
result = geocode(combined)
if result:
return result
# 2. sect_nm에서 지역명 추출
area = _extract_area(sect_nm)
if area:
result = geocode(area)
if result:
return result
# 3. zone_name fallback
if zone_name:
result = geocode(zone_name)
if result:
return result
return None
def _extract_area(sect_nm: str) -> str:
"""sect_nm에서 geocoding용 지역명 추출.
예: '하동군 금남면 노량리 인근 해안''하동군 금남면 노량리'
"""
if not sect_nm:
return ''
# '인근 해안' 등 제거
area = re.sub(r'\s*인근\s*해안\s*$', '', sect_nm).strip()
# '해안' 단독 제거
area = re.sub(r'\s*해안\s*$', '', area).strip()
return area
# ---------------------------------------------------------------------------
# 오프셋 분산 (같은 좌표에 겹치는 구간들 분산)
# ---------------------------------------------------------------------------
def apply_spiral_offset(
base_lat: float,
base_lng: float,
index: int,
spacing: float = 0.0005,
) -> Tuple[float, float]:
"""나선형 오프셋 적용. index=0이면 원점, 1부터 나선.
spacing ~= 50m (위도 기준)
"""
if index == 0:
return base_lat, base_lng
# 나선형: 각도와 반경 증가
angle = index * 137.508 * math.pi / 180 # 황금각
radius = spacing * math.sqrt(index)
lat = base_lat + radius * math.cos(angle)
lng = base_lng + radius * math.sin(angle)
return round(lat, 6), round(lng, 6)
# ---------------------------------------------------------------------------
# 배치 Geocoding
# ---------------------------------------------------------------------------
def geocode_sections(
sections: list[dict],
zone_name: str = '',
) -> Tuple[int, int]:
"""섹션 리스트에 lat/lng를 채운다.
Returns:
(성공 수, 실패 수)
"""
# sect_nm 그룹별로 처리 (오프셋 적용)
from collections import defaultdict
groups: dict[str, list[dict]] = defaultdict(list)
for s in sections:
groups[s.get('sect_nm', '')].append(s)
success = 0
fail = 0
for sect_nm, group in groups.items():
# 그룹 대표 좌표 구하기 (첫 번째 access_pt 있는 구간 또는 sect_nm)
base_coord = None
# access_pt가 있는 구간에서 먼저 시도
for s in group:
if s.get('access_pt'):
coord = geocode_section(s['access_pt'], sect_nm, zone_name)
if coord:
base_coord = coord
break
# access_pt로 못 찾으면 sect_nm으로
if base_coord is None:
base_coord = geocode_section(None, sect_nm, zone_name)
if base_coord is None:
fail += len(group)
continue
# 그룹 내 구간별 좌표 할당
# access_pt가 있는 구간은 개별 geocoding, 없으면 대표+오프셋
offset_idx = 0
for s in sorted(group, key=lambda x: x.get('section_number', 0)):
if s.get('access_pt'):
coord = geocode_section(s['access_pt'], sect_nm, zone_name)
if coord:
s['lat'], s['lng'] = coord
success += 1
continue
# 오프셋 적용
lat, lng = apply_spiral_offset(base_coord[0], base_coord[1], offset_idx)
s['lat'] = lat
s['lng'] = lng
offset_idx += 1
success += 1
return success, fail
# ---------------------------------------------------------------------------
# 캐시 저장/로드
# ---------------------------------------------------------------------------
_CACHE_FILE = Path(__file__).parent / 'output' / '.geocode_cache.json'
def save_cache():
"""캐시를 파일로 저장."""
_CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
serializable = {k: list(v) if v else None for k, v in _cache.items()}
with open(_CACHE_FILE, 'w', encoding='utf-8') as f:
json.dump(serializable, f, ensure_ascii=False, indent=2)
def load_cache():
"""캐시 파일 로드."""
global _cache
if _CACHE_FILE.exists():
with open(_CACHE_FILE, encoding='utf-8') as f:
data = json.load(f)
_cache = {k: tuple(v) if v else None for k, v in data.items()}