kcg-ai-monitoring/prediction/db/signal_api.py

175 lines
6.3 KiB
Python

"""signal-batch API 클라이언트 — gc-signal-batch(192.168.1.18:18090) HTTP 연동.
데이터 소스: S&P Global AIS API → gc-signal-batch L0 캐시(Caffeine, 120분 TTL)
용도: 정적정보 보강 (shipKindCode, status, heading, draught 등)
궤적 소스는 SNPDB 유지 — 이 모듈은 정적정보 전용.
호출 전략:
- 초기 로드: minutes=120 (L0 캐시 전체, 412* ~3,150척)
- 5분 주기 보강: minutes=10 (직전 2사이클분, 412* ~728척)
- gc-signal-batch 수집 주기: 매 분 :45초 → 호출 시점 :50초 이후 권장
"""
import logging
from typing import Optional
import httpx
from config import settings
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Default search polygon used by fetch_area_tracks when the caller passes none.
# It is a closed rectangular ring (the first vertex is repeated as the last).
# NOTE(review): values look like [lon, lat] pairs in degrees — confirm
# against the gc-signal-batch API contract.
_KR_WATERS_POLYGON = [
    [122, 31],
    [132, 31],
    [132, 39],
    [122, 39],
    [122, 31],
]
class SignalBatchClient:
    """HTTP client for the gc-signal-batch service.

    Wraps the two POST endpoints used by this module
    (``recent-positions-detail`` and ``area-search``) and converts the
    detail response into a per-MMSI static-info mapping. Both fetch methods
    are best-effort: failures are logged and an empty container is returned
    instead of raising.
    """

    def __init__(
        self,
        base_url: str = '',
        timeout_sec: int = 60,
    ) -> None:
        """Store the service base URL and the per-request timeout.

        Args:
            base_url: Service root URL. When empty, falls back to
                ``settings.SIGNAL_BATCH_URL``. Trailing slashes are stripped.
            timeout_sec: Timeout handed to ``httpx.Client`` for each request.
        """
        self._base_url = (base_url or settings.SIGNAL_BATCH_URL).rstrip('/')
        self._timeout = timeout_sec

    def fetch_recent_detail(
        self,
        minutes: int = 10,
        polygon: Optional[list[list[float]]] = None,
    ) -> list[dict]:
        """POST /api/v1/vessels/recent-positions-detail.

        Queries the L0 cache (AisTargetCacheManager) directly. A spatial
        (polygon) filter can optionally be applied.

        Args:
            minutes: Look-back window, sent as ``minutes`` in the request body.
            polygon: Optional list of coordinate pairs, sent as
                ``coordinates``. Omitted from the request when None or empty.

        Returns:
            list[dict] — array of RecentPositionDetailResponse objects.
            Fields of each dict:
                mmsi, imo, lon, lat, sog, cog, heading,
                shipNm, shipTy (text ship type), shipKindCode (6-digit code),
                nationalCode (numeric MID), status (navigation status text),
                destination, eta, draught, length, width, lastUpdate
            An empty list is returned on timeout, HTTP error status,
            a non-list payload, or any other failure.
        """
        url = f'{self._base_url}/api/v1/vessels/recent-positions-detail'
        body: dict = {'minutes': minutes}
        if polygon:
            body['coordinates'] = polygon
        try:
            with httpx.Client(timeout=self._timeout) as client:
                resp = client.post(url, json=body)
                resp.raise_for_status()
                data = resp.json()
            # Guard the declared return type: an error object or null body
            # must not leak to callers that iterate the result as records.
            if not isinstance(data, list):
                logger.warning(
                    'fetch_recent_detail: unexpected payload type %s',
                    type(data).__name__,
                )
                return []
            logger.info(
                'fetch_recent_detail: %d vessels (minutes=%d)',
                len(data), minutes,
            )
            return data
        except httpx.TimeoutException:
            logger.warning('fetch_recent_detail timeout (%ds)', self._timeout)
            return []
        except httpx.HTTPStatusError as e:
            logger.warning('fetch_recent_detail HTTP %d: %s', e.response.status_code, e)
            return []
        except Exception as e:
            # Best-effort boundary: enrichment must never crash the caller.
            logger.error('fetch_recent_detail failed: %s', e)
            return []

    def fetch_area_tracks(
        self,
        start_time: str,
        end_time: str,
        polygon: Optional[list[list[float]]] = None,
    ) -> dict:
        """POST /api/v2/tracks/area-search.

        Track lookup backed by the D-1..D-7 in-memory cache. A 503 response
        means the cache is not ready yet.

        Args:
            start_time: Sent as ``startTime`` in the request body.
            end_time: Sent as ``endTime`` in the request body.
            polygon: Optional list of coordinate pairs for the search area.
                When None or empty, the module default ``_KR_WATERS_POLYGON``
                is used.

        Returns:
            dict — AreaSearchResponse {tracks, hitDetails, summary}.
            An empty dict is returned on timeout, HTTP error status
            (including 503), a non-object payload, or any other failure.
        """
        url = f'{self._base_url}/api/v2/tracks/area-search'
        body = {
            'startTime': start_time,
            'endTime': end_time,
            'polygons': [{
                'id': 'kr-waters',
                'name': '한국 해역',
                'coordinates': polygon or _KR_WATERS_POLYGON,
            }],
        }
        try:
            with httpx.Client(timeout=self._timeout) as client:
                resp = client.post(url, json=body)
                resp.raise_for_status()
                data = resp.json()
            if not isinstance(data, dict):
                logger.warning(
                    'fetch_area_tracks: unexpected payload type %s',
                    type(data).__name__,
                )
                return {}
            tracks = data.get('tracks', [])
            logger.info(
                'fetch_area_tracks: %d tracks (%s ~ %s)',
                len(tracks), start_time, end_time,
            )
            return data
        except httpx.TimeoutException:
            # Reported as a warning, matching fetch_recent_detail.
            logger.warning('fetch_area_tracks timeout (%ds)', self._timeout)
            return {}
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 503:
                logger.warning('area-search: cache not ready (503)')
            else:
                logger.warning('area-search HTTP %d', e.response.status_code)
            return {}
        except Exception as e:
            # Best-effort boundary: track lookup must never crash the caller.
            logger.error('fetch_area_tracks failed: %s', e)
            return {}

    @staticmethod
    def _to_float(value: object) -> Optional[float]:
        """Return *value* converted to float, or None if it is not numeric."""
        if value is None:
            return None
        try:
            return float(value)
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _positive_number(value: object) -> Optional[float]:
        """Return *value* as a finite float greater than zero, else None."""
        number = SignalBatchClient._to_float(value)
        # The chained comparison also rejects NaN (all comparisons are False).
        if number is not None and 0 < number < float('inf'):
            return number
        return None

    @staticmethod
    def parse_static_info(detail_response: list[dict]) -> dict[str, dict]:
        """Extract a static-info dict from a recent-positions-detail response.

        Malformed records are skipped individually so that one bad record
        cannot abort the whole batch: non-dict items, items whose ``mmsi`` is
        missing/None/empty, and numeric fields that cannot be converted are
        all ignored.

        Args:
            detail_response: Records as returned by ``fetch_recent_detail``.
                None is treated as an empty list.

        Returns:
            {mmsi: {ship_kind_code, ship_ty, national_code, status,
                    heading, draught, destination, name, length, width}}
            Keys are the stringified MMSI. Each inner dict holds only the
            fields that were present and usable; records yielding no fields
            are omitted.
        """
        to_float = SignalBatchClient._to_float
        positive = SignalBatchClient._positive_number

        result: dict[str, dict] = {}
        for item in detail_response or []:
            if not isinstance(item, dict):
                continue
            raw_mmsi = item.get('mmsi')
            # A JSON null must not become the literal key 'None'.
            if raw_mmsi is None:
                continue
            mmsi = str(raw_mmsi)
            if not mmsi:
                continue

            info: dict = {}
            # Ship kind (6-digit code: 000020=fishing, 000023=cargo,
            # 000028=unclassified)
            if item.get('shipKindCode'):
                info['ship_kind_code'] = item['shipKindCode']
            # Text ship type ("Cargo", "Tanker", "Vessel", "N/A")
            if item.get('shipTy'):
                info['ship_ty'] = item['shipTy']
            # Nationality MID ("412"=China, "440"=Korea)
            if item.get('nationalCode'):
                info['national_code'] = item['nationalCode']
            # Navigation status ("Under way using engine", "Anchored", "N/A")
            if item.get('status') and item['status'] != 'N/A':
                info['status'] = item['status']
            # Heading: 0 is a valid value, so only None/non-numeric is dropped.
            heading = to_float(item.get('heading'))
            if heading is not None:
                info['heading'] = heading
            # Draught: only positive values are kept.
            draught = positive(item.get('draught'))
            if draught is not None:
                info['draught'] = draught
            # Destination
            if item.get('destination'):
                info['destination'] = item['destination']
            # Basic info (same fields as the existing SNPDB static info)
            if item.get('shipNm'):
                info['name'] = item['shipNm']
            length = positive(item.get('length'))
            if length is not None:
                info['length'] = int(length)
            width = positive(item.get('width'))
            if width is not None:
                info['width'] = int(width)

            if info:
                result[mmsi] = info
        return result
# Module-level singleton: a shared client instance constructed with the
# default arguments at import time.
signal_client = SignalBatchClient()