"""선단/어구그룹 폴리곤 생성기. 프론트엔드 FleetClusterLayer.tsx의 어구그룹 탐지 + convexHull/padPolygon 로직을 Python으로 이관한다. Shapely 라이브러리로 폴리곤 생성. """ from __future__ import annotations import logging import math import re from datetime import datetime, timezone from typing import Optional from zoneinfo import ZoneInfo import pandas as pd try: from shapely.geometry import MultiPoint, Point from shapely import wkt as shapely_wkt _SHAPELY_AVAILABLE = True except ImportError: _SHAPELY_AVAILABLE = False from algorithms.location import classify_zone logger = logging.getLogger(__name__) # 어구 이름 패턴 — _ 필수 (공백만으로는 어구 미판정, fleet_tracker.py와 동일) GEAR_PATTERN = re.compile(r'^(.+?)_(?=\S*\d)\S+(?:[_ ]\S*)*[_ ]*$|^(\d+)$') MAX_DIST_DEG = 0.15 # ~10NM STALE_SEC = 21600 # 6시간 (어구 P75 갭 3.5h, P90 갭 8h 커버) — 그룹 멤버 탐색용 DISPLAY_STALE_SEC = 3600 # 1시간 — 폴리곤 스냅샷 노출 기준 (프론트엔드 초기 로드 minutes=60과 동기화) # time_bucket(적재시간) 기반 필터링 — AIS 원본 timestamp는 부표 시계 오류로 부정확할 수 있음 FLEET_BUFFER_DEG = 0.02 GEAR_BUFFER_DEG = 0.01 MIN_GEAR_GROUP_SIZE = 2 # 최소 어구 수 (비허가 구역 외) _KST = ZoneInfo('Asia/Seoul') def _get_time_bucket_age(mmsi: str, all_positions: dict, now: datetime) -> float: """MMSI의 time_bucket 기반 age(초) 반환. 실패 시 inf.""" pos = all_positions.get(mmsi) tb = pos.get('time_bucket') if pos else None if tb is None: return float('inf') try: tb_dt = pd.Timestamp(tb) if tb_dt.tzinfo is None: tb_dt = tb_dt.tz_localize(_KST).tz_convert(timezone.utc) return (now - tb_dt.to_pydatetime()).total_seconds() except Exception: return float('inf') # 수역 내 어구 색상, 수역 외 어구 색상 _COLOR_GEAR_IN_ZONE = '#ef4444' _COLOR_GEAR_OUT_ZONE = '#f97316' # classify_zone이 수역 내로 판정하는 zone 값 목록 _IN_ZONE_PREFIXES = ('ZONE_',) def _is_in_zone(zone_info: dict) -> bool: """classify_zone 결과가 특정어업수역 내인지 판별.""" zone = zone_info.get('zone', '') return any(zone.startswith(prefix) for prefix in _IN_ZONE_PREFIXES) def _cluster_color(seed: int) -> str: """프론트 clusterColor(id) 이관 — hsl({(seed * 137) % 360}, 80%, 55%).""" h = (seed * 137) % 360 return f'hsl({h}, 80%, 55%)' def compute_area_sq_nm(polygon, center_lat: float) -> float: """Shapely Polygon의 면적(degrees²) → 제곱 해리 변환. 1도 위도 ≈ 60 NM, 1도 경도 ≈ 60 * cos(lat) NM sq_nm = area_deg2 * 60 * 60 * cos(center_lat_rad) """ area_deg2 = polygon.area center_lat_rad = math.radians(center_lat) sq_nm = area_deg2 * 60.0 * 60.0 * math.cos(center_lat_rad) return round(sq_nm, 4) def build_group_polygon( points: list[tuple[float, float]], buffer_deg: float, ) -> tuple[Optional[str], Optional[str], float, float, float]: """좌표 목록으로 버퍼 폴리곤을 생성한다. Args: points: (lon, lat) 좌표 목록 — Shapely (x, y) 순서. buffer_deg: 버퍼 크기(도). Returns: (polygon_wkt, center_wkt, area_sq_nm, center_lat, center_lon) — polygon_wkt/center_wkt: ST_GeomFromText에 사용할 WKT 문자열. — 좌표가 없거나 Shapely 미설치 시 (None, None, 0.0, 0.0, 0.0). """ if not _SHAPELY_AVAILABLE: logger.warning('shapely 미설치 — build_group_polygon 건너뜀') return None, None, 0.0, 0.0, 0.0 if not points: return None, None, 0.0, 0.0, 0.0 if len(points) == 1: geom = Point(points[0]).buffer(buffer_deg) elif len(points) == 2: # LineString → buffer로 Polygon 생성 from shapely.geometry import LineString geom = LineString(points).buffer(buffer_deg) else: # 3점 이상 → convex_hull → buffer geom = MultiPoint(points).convex_hull.buffer(buffer_deg) # 중심 계산 centroid = geom.centroid center_lon = centroid.x center_lat = centroid.y area_sq_nm = compute_area_sq_nm(geom, center_lat) polygon_wkt = shapely_wkt.dumps(geom, rounding_precision=6) center_wkt = f'POINT({center_lon:.6f} {center_lat:.6f})' return polygon_wkt, center_wkt, area_sq_nm, center_lat, center_lon def detect_gear_groups( vessel_store, now: Optional[datetime] = None, ) -> list[dict]: """어구 이름 패턴으로 어구그룹을 탐지한다. 프론트엔드 FleetClusterLayer.tsx gearGroupMap useMemo 로직 이관. 전체 AIS 선박(vessel_store._tracks)에서 어구 패턴을 탐지한다. Args: vessel_store: VesselStore — get_all_latest_positions() + get_vessel_info(). now: 기준 시각 (None이면 UTC now). Returns: [{parent_name, parent_mmsi, members: [{mmsi, name, lat, lon, sog, cog}]}] """ if now is None: now = datetime.now(timezone.utc) # 전체 선박의 최신 위치 가져오기 all_positions = vessel_store.get_all_latest_positions() # 선박명 → mmsi 맵 (모선 탐색용, 어구 패턴이 아닌 선박만) # 정규화 키(공백 제거) + 원본 이름 모두 등록 name_to_mmsi: dict[str, str] = {} for mmsi, pos in all_positions.items(): name = (pos.get('name') or '').strip() if name and not GEAR_PATTERN.match(name): name_to_mmsi[name] = mmsi name_to_mmsi[name.replace(' ', '')] = mmsi # parent 이름 정규화 — 공백 제거 후 같은 모선은 하나로 통합 def _normalize_parent(raw: str) -> str: return raw.replace(' ', '') # 1단계: 같은 모선명 어구 수집 (60분 이내만, 공백 정규화) raw_groups: dict[str, list[dict]] = {} parent_display: dict[str, str] = {} # normalized → 대표 원본 이름 for mmsi, pos in all_positions.items(): name = (pos.get('name') or '').strip() if not name: continue # staleness 체크 ts = pos.get('timestamp') if ts is not None: if isinstance(ts, datetime): last_dt = ts if ts.tzinfo is not None else ts.replace(tzinfo=timezone.utc) else: try: last_dt = pd.Timestamp(ts).to_pydatetime() if last_dt.tzinfo is None: last_dt = last_dt.replace(tzinfo=timezone.utc) except Exception: continue age_sec = (now - last_dt).total_seconds() if age_sec > STALE_SEC: continue m = GEAR_PATTERN.match(name) if not m: continue # 한국 국적 선박(MMSI 440/441)은 어구 AIS 미사용 → 제외 if mmsi.startswith('440') or mmsi.startswith('441'): continue parent_raw = (m.group(1) or name).strip() parent_key = _normalize_parent(parent_raw) # 대표 이름: 공백 없는 버전 우선 (더 정규화된 형태) if parent_key not in parent_display or ' ' not in parent_raw: parent_display[parent_key] = parent_raw entry = { 'mmsi': mmsi, 'name': name, 'lat': pos['lat'], 'lon': pos['lon'], 'sog': pos.get('sog', 0), 'cog': pos.get('cog', 0), 'timestamp': ts, } raw_groups.setdefault(parent_key, []).append(entry) # 2단계: 연결 기반 서브 클러스터링 (각 어구가 클러스터 내 최소 1개와 MAX_DIST_DEG 이내) # 같은 parent 이름이라도 거리가 먼 어구들은 별도 서브그룹으로 분리 results: list[dict] = [] for parent_key, gears in raw_groups.items(): parent_mmsi = name_to_mmsi.get(parent_key) display_name = parent_display.get(parent_key, parent_key) if not gears: continue # 모선 위치 (있으면 시드 포인트로 활용) seed_lat: Optional[float] = None seed_lon: Optional[float] = None if parent_mmsi and parent_mmsi in all_positions: p = all_positions[parent_mmsi] seed_lat, seed_lon = p['lat'], p['lon'] # 연결 기반 클러스터링 (Union-Find 방식) n = len(gears) parent_uf = list(range(n)) def find(x: int) -> int: while parent_uf[x] != x: parent_uf[x] = parent_uf[parent_uf[x]] x = parent_uf[x] return x def union(a: int, b: int) -> None: ra, rb = find(a), find(b) if ra != rb: parent_uf[ra] = rb for i in range(n): for j in range(i + 1, n): if (abs(gears[i]['lat'] - gears[j]['lat']) <= MAX_DIST_DEG and abs(gears[i]['lon'] - gears[j]['lon']) <= MAX_DIST_DEG): union(i, j) # 클러스터별 그룹화 clusters: dict[int, list[int]] = {} for i in range(n): clusters.setdefault(find(i), []).append(i) # 모선이 있으면 모선과 가장 가까운 클러스터에 연결 (MAX_DIST_DEG 이내만) seed_cluster_root: Optional[int] = None if seed_lat is not None and seed_lon is not None: best_dist = float('inf') for root, idxs in clusters.items(): for i in idxs: d = abs(gears[i]['lat'] - seed_lat) + abs(gears[i]['lon'] - seed_lon) if d < best_dist: best_dist = d seed_cluster_root = root # 모선이 어느 클러스터와도 MAX_DIST_DEG 초과 → 연결하지 않음 if best_dist > MAX_DIST_DEG * 2: seed_cluster_root = None # 클러스터마다 서브그룹 생성 (최소 2개 이상이거나 모선 포함) for ci, (root, idxs) in enumerate(clusters.items()): has_seed = (root == seed_cluster_root) if len(idxs) < 2 and not has_seed: continue members = [ {'mmsi': gears[i]['mmsi'], 'name': gears[i]['name'], 'lat': gears[i]['lat'], 'lon': gears[i]['lon'], 'sog': gears[i]['sog'], 'cog': gears[i]['cog']} for i in idxs ] # group_key는 항상 원본명 유지, 서브클러스터는 별도 ID로 구분 sub_cluster_id = 0 if len(clusters) == 1 else (ci + 1) sub_mmsi = parent_mmsi if has_seed else None results.append({ 'parent_name': display_name, 'parent_key': parent_key, 'parent_mmsi': sub_mmsi, 'sub_cluster_id': sub_cluster_id, 'members': members, }) # 3단계: 동일 parent_key 서브그룹 간 근접 병합 (거리 이내 시) # prefix 기반 병합은 과도한 그룹화 유발 → 동일 키만 병합 def _groups_nearby(a: dict, b: dict) -> bool: for ma in a['members']: for mb in b['members']: if abs(ma['lat'] - mb['lat']) <= MAX_DIST_DEG and abs(ma['lon'] - mb['lon']) <= MAX_DIST_DEG: return True return False merged: list[dict] = [] skip: set[int] = set() results.sort(key=lambda g: len(g['members']), reverse=True) for i, big in enumerate(results): if i in skip: continue for j, small in enumerate(results): if j <= i or j in skip: continue # 동일 parent_key만 병합 (prefix 매칭 제거 — 과도한 병합 방지) if big['parent_key'] == small['parent_key'] and _groups_nearby(big, small): existing_mmsis = {m['mmsi'] for m in big['members']} for m in small['members']: if m['mmsi'] not in existing_mmsis: big['members'].append(m) existing_mmsis.add(m['mmsi']) if not big['parent_mmsi'] and small['parent_mmsi']: big['parent_mmsi'] = small['parent_mmsi'] big['sub_cluster_id'] = 0 # 병합됨 → 단일 클러스터 skip.add(j) del big['parent_key'] merged.append(big) return merged def build_all_group_snapshots( vessel_store, company_vessels: dict[int, list[str]], companies: dict[int, dict], ) -> list[dict]: """선단(FLEET) + 어구그룹(GEAR) 폴리곤 스냅샷을 생성한다. Shapely 미설치 시 빈 리스트를 반환한다. Args: vessel_store: VesselStore — get_all_latest_positions() + get_vessel_info(). company_vessels: {company_id: [mmsi_list]}. companies: {id: {name_cn, name_en}}. Returns: DB INSERT용 dict 목록. """ if not _SHAPELY_AVAILABLE: logger.warning('shapely 미설치 — build_all_group_snapshots 빈 리스트 반환') return [] now = datetime.now(timezone.utc) snapshots: list[dict] = [] all_positions = vessel_store.get_all_latest_positions() # ── FLEET 타입: company_vessels 순회 ────────────────────────── for company_id, mmsi_list in company_vessels.items(): company_info = companies.get(company_id, {}) group_label = company_info.get('name_cn') or company_info.get('name_en') or str(company_id) # 각 선박의 최신 좌표 추출 points: list[tuple[float, float]] = [] members: list[dict] = [] for mmsi in mmsi_list: pos = all_positions.get(mmsi) if not pos: continue lat = pos['lat'] lon = pos['lon'] sog = pos.get('sog', 0) cog = pos.get('cog', 0) points.append((lon, lat)) members.append({ 'mmsi': mmsi, 'name': pos.get('name', ''), 'lat': lat, 'lon': lon, 'sog': sog, 'cog': cog, 'role': 'LEADER' if mmsi == mmsi_list[0] else 'MEMBER', 'isParent': False, }) newest_age = min( (_get_time_bucket_age(m['mmsi'], all_positions, now) for m in members), default=float('inf'), ) # 2척 미만 또는 최근 적재가 DISPLAY_STALE_SEC 초과 → 폴리곤 미생성 if len(points) < 2 or newest_age > DISPLAY_STALE_SEC: continue polygon_wkt, center_wkt, area_sq_nm, center_lat, center_lon = build_group_polygon( points, FLEET_BUFFER_DEG ) snapshots.append({ 'group_type': 'FLEET', 'group_key': str(company_id), 'group_label': group_label, 'resolution': '1h', 'snapshot_time': now, 'polygon_wkt': polygon_wkt, 'center_wkt': center_wkt, 'area_sq_nm': area_sq_nm, 'member_count': len(members), 'zone_id': None, 'zone_name': None, 'members': members, 'color': _cluster_color(company_id), }) # ── GEAR 타입: detect_gear_groups 결과 → 1h/6h 듀얼 스냅샷 ──── gear_groups = detect_gear_groups(vessel_store, now=now) # parent_name 기준 전체 1h 활성 멤버 합산 (서브클러스터 분리 전) parent_active_1h: dict[str, int] = {} for group in gear_groups: pn = group['parent_name'] cnt = sum( 1 for gm in group['members'] if _get_time_bucket_age(gm.get('mmsi'), all_positions, now) <= DISPLAY_STALE_SEC ) parent_active_1h[pn] = parent_active_1h.get(pn, 0) + cnt for group in gear_groups: parent_name: str = group['parent_name'] parent_mmsi: Optional[str] = group['parent_mmsi'] gear_members: list[dict] = group['members'] # 6h STALE 기반 전체 멤버 if not gear_members: continue # ── 1h 활성 멤버 필터 (이 서브클러스터 내) ── active_members_1h = [ gm for gm in gear_members if _get_time_bucket_age(gm.get('mmsi'), all_positions, now) <= DISPLAY_STALE_SEC ] # fallback: 서브클러스터 내 1h < 2이면 time_bucket 최신 2개 유지 display_members_1h = active_members_1h if len(active_members_1h) < 2 and len(gear_members) >= 2: sorted_by_age = sorted( gear_members, key=lambda gm: _get_time_bucket_age(gm.get('mmsi'), all_positions, now), ) display_members_1h = sorted_by_age[:2] # ── 6h 전체 멤버 노출 조건: 최신 적재가 STALE_SEC 이내 ── newest_age_6h = min( (_get_time_bucket_age(gm.get('mmsi'), all_positions, now) for gm in gear_members), default=float('inf'), ) display_members_6h = gear_members # ── resolution별 스냅샷 생성 ── # 1h-fb: parent_name 전체 1h 활성 < 2 → 리플레이/일치율 추적용, 라이브 현황에서 제외 # parent_name 전체 기준으로 판단 (서브클러스터 분리로 개별 멤버가 적어져도 그룹 전체가 활성이면 1h) res_1h = '1h' if parent_active_1h.get(parent_name, 0) >= 2 else '1h-fb' for resolution, members_for_snap in [(res_1h, display_members_1h), ('6h', display_members_6h)]: if len(members_for_snap) < 2: continue # 6h: 최신 적재가 STALE_SEC(6h) 초과 시 스킵 if resolution == '6h' and newest_age_6h > STALE_SEC: continue # 수역 분류: anchor(모선 or 첫 멤버) 위치 기준 anchor_lat: Optional[float] = None anchor_lon: Optional[float] = None if parent_mmsi and parent_mmsi in all_positions: parent_pos = all_positions[parent_mmsi] anchor_lat = parent_pos['lat'] anchor_lon = parent_pos['lon'] if anchor_lat is None and members_for_snap: anchor_lat = members_for_snap[0]['lat'] anchor_lon = members_for_snap[0]['lon'] if anchor_lat is None: continue zone_info = classify_zone(float(anchor_lat), float(anchor_lon)) in_zone = _is_in_zone(zone_info) zone_id = zone_info.get('zone') if in_zone else None zone_name = zone_info.get('zone_name') if in_zone else None # 비허가(수역 외) 어구: MIN_GEAR_GROUP_SIZE 미만 제외 if not in_zone and len(members_for_snap) < MIN_GEAR_GROUP_SIZE: continue # 폴리곤 points: 멤버 좌표 + 모선 좌표 (근접 시에만) points = [(g['lon'], g['lat']) for g in members_for_snap] parent_nearby = False if parent_mmsi and parent_mmsi in all_positions: parent_pos = all_positions[parent_mmsi] p_lon, p_lat = parent_pos['lon'], parent_pos['lat'] if any(abs(g['lat'] - p_lat) <= MAX_DIST_DEG * 2 and abs(g['lon'] - p_lon) <= MAX_DIST_DEG * 2 for g in members_for_snap): if (p_lon, p_lat) not in points: points.append((p_lon, p_lat)) parent_nearby = True polygon_wkt, center_wkt, area_sq_nm, _clat, _clon = build_group_polygon( points, GEAR_BUFFER_DEG ) # members JSONB 구성 members_out: list[dict] = [] if parent_nearby and parent_mmsi and parent_mmsi in all_positions: parent_pos = all_positions[parent_mmsi] members_out.append({ 'mmsi': parent_mmsi, 'name': parent_name, 'lat': parent_pos['lat'], 'lon': parent_pos['lon'], 'sog': parent_pos.get('sog', 0), 'cog': parent_pos.get('cog', 0), 'role': 'PARENT', 'isParent': True, }) for g in members_for_snap: members_out.append({ 'mmsi': g['mmsi'], 'name': g['name'], 'lat': g['lat'], 'lon': g['lon'], 'sog': g['sog'], 'cog': g['cog'], 'role': 'GEAR', 'isParent': False, }) color = _COLOR_GEAR_IN_ZONE if in_zone else _COLOR_GEAR_OUT_ZONE snapshots.append({ 'group_type': 'GEAR_IN_ZONE' if in_zone else 'GEAR_OUT_ZONE', 'group_key': parent_name, 'group_label': parent_name, 'sub_cluster_id': group.get('sub_cluster_id', 0), 'resolution': resolution, 'snapshot_time': now, 'polygon_wkt': polygon_wkt, 'center_wkt': center_wkt, 'area_sq_nm': area_sq_nm, 'member_count': len(members_out), 'zone_id': zone_id, 'zone_name': zone_name, 'members': members_out, 'color': color, }) return snapshots