"""signal-batch API 클라이언트 — gc-signal-batch(192.168.1.18:18090) HTTP 연동. 데이터 소스: S&P Global AIS API → gc-signal-batch L0 캐시(Caffeine, 120분 TTL) 용도: 정적정보 보강 (shipKindCode, status, heading, draught 등) 궤적 소스는 SNPDB 유지 — 이 모듈은 정적정보 전용. 호출 전략: - 초기 로드: minutes=120 (L0 캐시 전체, 412* ~3,150척) - 5분 주기 보강: minutes=10 (직전 2사이클분, 412* ~728척) - gc-signal-batch 수집 주기: 매 분 :45초 → 호출 시점 :50초 이후 권장 """ import logging from typing import Optional import httpx from config import settings logger = logging.getLogger(__name__) _KR_WATERS_POLYGON = [[122, 31], [132, 31], [132, 39], [122, 39], [122, 31]] class SignalBatchClient: def __init__( self, base_url: str = '', timeout_sec: int = 60, ) -> None: self._base_url = (base_url or settings.SIGNAL_BATCH_URL).rstrip('/') self._timeout = timeout_sec def fetch_recent_detail( self, minutes: int = 10, polygon: Optional[list[list[float]]] = None, ) -> list[dict]: """POST /api/v1/vessels/recent-positions-detail L0 캐시(AisTargetCacheManager)에서 직접 조회. 공간 필터(폴리곤) 적용 가능. Returns: list[dict] — RecentPositionDetailResponse 배열 각 dict 필드: mmsi, imo, lon, lat, sog, cog, heading, shipNm, shipTy(텍스트 선종), shipKindCode(6자리 코드), nationalCode(MID 숫자), status(항해상태 텍스트), destination, eta, draught, length, width, lastUpdate """ url = f'{self._base_url}/api/v1/vessels/recent-positions-detail' body: dict = {'minutes': minutes} if polygon: body['coordinates'] = polygon try: with httpx.Client(timeout=self._timeout) as client: resp = client.post(url, json=body) resp.raise_for_status() data = resp.json() logger.info( 'fetch_recent_detail: %d vessels (minutes=%d)', len(data), minutes, ) return data except httpx.TimeoutException: logger.warning('fetch_recent_detail timeout (%ds)', self._timeout) return [] except httpx.HTTPStatusError as e: logger.warning('fetch_recent_detail HTTP %d: %s', e.response.status_code, e) return [] except Exception as e: logger.error('fetch_recent_detail failed: %s', e) return [] def fetch_area_tracks( self, start_time: str, end_time: str, polygon: Optional[list[list[float]]] = None, ) -> dict: """POST /api/v2/tracks/area-search D-1~D-7 인메모리 캐시 기반 항적 조회. 503 시 캐시 미준비 상태. Returns: dict — AreaSearchResponse {tracks, hitDetails, summary} """ url = f'{self._base_url}/api/v2/tracks/area-search' body = { 'startTime': start_time, 'endTime': end_time, 'polygons': [{ 'id': 'kr-waters', 'name': '한국 해역', 'coordinates': polygon or _KR_WATERS_POLYGON, }], } try: with httpx.Client(timeout=self._timeout) as client: resp = client.post(url, json=body) resp.raise_for_status() data = resp.json() tracks = data.get('tracks', []) logger.info( 'fetch_area_tracks: %d tracks (%s ~ %s)', len(tracks), start_time, end_time, ) return data except httpx.HTTPStatusError as e: if e.response.status_code == 503: logger.warning('area-search: cache not ready (503)') else: logger.warning('area-search HTTP %d', e.response.status_code) return {} except Exception as e: logger.error('fetch_area_tracks failed: %s', e) return {} @staticmethod def parse_static_info(detail_response: list[dict]) -> dict[str, dict]: """recent-positions-detail 응답에서 정적정보 dict 추출. Returns: {mmsi: {ship_kind_code, ship_ty, national_code, status, heading, draught, destination, name, length, width}} """ result: dict[str, dict] = {} for item in detail_response: mmsi = str(item.get('mmsi', '')) if not mmsi: continue info: dict = {} # 선종 (6자리 코드: 000020=어선, 000023=화물, 000028=미분류) if item.get('shipKindCode'): info['ship_kind_code'] = item['shipKindCode'] # 텍스트 선종 ("Cargo", "Tanker", "Vessel", "N/A") if item.get('shipTy'): info['ship_ty'] = item['shipTy'] # 국적 MID ("412"=중국, "440"=한국) if item.get('nationalCode'): info['national_code'] = item['nationalCode'] # 항해 상태 ("Under way using engine", "Anchored", "N/A") if item.get('status') and item['status'] != 'N/A': info['status'] = item['status'] # 선수 방향 if item.get('heading') is not None: info['heading'] = float(item['heading']) # 흘수 if item.get('draught') is not None and item['draught'] > 0: info['draught'] = float(item['draught']) # 목적지 if item.get('destination'): info['destination'] = item['destination'] # 기본 정보 (기존 SNPDB 정적정보와 동일 필드) if item.get('shipNm'): info['name'] = item['shipNm'] if item.get('length') and item['length'] > 0: info['length'] = int(item['length']) if item.get('width') and item['width'] > 0: info['width'] = int(item['width']) if info: result[mmsi] = info return result # 모듈 레벨 싱글턴 signal_client = SignalBatchClient()