FleetClusterLayer.tsx 2357줄 → 10개 파일 분리: - fleetClusterTypes/Utils/Constants: 타입, 기하 함수, 모델 상수 - useFleetClusterGeoJson: 27개 useMemo GeoJSON 훅 - FleetClusterMapLayers: MapLibre Source/Layer JSX - CorrelationPanel/HistoryReplayController: 패널 서브컴포넌트 - GearGroupSection/FleetGearListPanel: 좌측 목록 (DRY) - FleetClusterLayer: 오케스트레이터 524줄 deck.gl + Zustand 리플레이 기반 (Phase 0~2): - zustand 5.0.12, @deck.gl/geo-layers 9.2.11 설치 - gearReplayStore: Zustand + rAF 애니메이션 루프 - gearReplayPreprocess: TripsLayer 전처리 + cursor O(1) 보간 - useGearReplayLayers: deck.gl 레이어 빌더 (10fps 스로틀) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
477 lines
17 KiB
Python
477 lines
17 KiB
Python
"""선단/어구그룹 폴리곤 생성기.
|
|
|
|
프론트엔드 FleetClusterLayer.tsx의 어구그룹 탐지 + convexHull/padPolygon 로직을
|
|
Python으로 이관한다. Shapely 라이브러리로 폴리곤 생성.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import math
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
try:
|
|
from shapely.geometry import MultiPoint, Point
|
|
from shapely import wkt as shapely_wkt
|
|
_SHAPELY_AVAILABLE = True
|
|
except ImportError:
|
|
_SHAPELY_AVAILABLE = False
|
|
|
|
from algorithms.location import classify_zone
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# 어구 이름 패턴 — _ 필수 (공백만으로는 어구 미판정, fleet_tracker.py와 동일)
|
|
GEAR_PATTERN = re.compile(r'^(.+?)_(?=\S*\d)\S+(?:[_ ]\S*)*[_ ]*$|^(\d+)$')
|
|
MAX_DIST_DEG = 0.15 # ~10NM
|
|
STALE_SEC = 21600 # 6시간 (어구 P75 갭 3.5h, P90 갭 8h 커버)
|
|
FLEET_BUFFER_DEG = 0.02
|
|
GEAR_BUFFER_DEG = 0.01
|
|
MIN_GEAR_GROUP_SIZE = 2 # 최소 어구 수 (비허가 구역 외)
|
|
|
|
# 수역 내 어구 색상, 수역 외 어구 색상
|
|
_COLOR_GEAR_IN_ZONE = '#ef4444'
|
|
_COLOR_GEAR_OUT_ZONE = '#f97316'
|
|
|
|
# classify_zone이 수역 내로 판정하는 zone 값 목록
|
|
_IN_ZONE_PREFIXES = ('ZONE_',)
|
|
|
|
|
|
def _is_in_zone(zone_info: dict) -> bool:
|
|
"""classify_zone 결과가 특정어업수역 내인지 판별."""
|
|
zone = zone_info.get('zone', '')
|
|
return any(zone.startswith(prefix) for prefix in _IN_ZONE_PREFIXES)
|
|
|
|
|
|
def _cluster_color(seed: int) -> str:
|
|
"""프론트 clusterColor(id) 이관 — hsl({(seed * 137) % 360}, 80%, 55%)."""
|
|
h = (seed * 137) % 360
|
|
return f'hsl({h}, 80%, 55%)'
|
|
|
|
|
|
def compute_area_sq_nm(polygon, center_lat: float) -> float:
|
|
"""Shapely Polygon의 면적(degrees²) → 제곱 해리 변환.
|
|
|
|
1도 위도 ≈ 60 NM, 1도 경도 ≈ 60 * cos(lat) NM
|
|
sq_nm = area_deg2 * 60 * 60 * cos(center_lat_rad)
|
|
"""
|
|
area_deg2 = polygon.area
|
|
center_lat_rad = math.radians(center_lat)
|
|
sq_nm = area_deg2 * 60.0 * 60.0 * math.cos(center_lat_rad)
|
|
return round(sq_nm, 4)
|
|
|
|
|
|
def build_group_polygon(
|
|
points: list[tuple[float, float]],
|
|
buffer_deg: float,
|
|
) -> tuple[Optional[str], Optional[str], float, float, float]:
|
|
"""좌표 목록으로 버퍼 폴리곤을 생성한다.
|
|
|
|
Args:
|
|
points: (lon, lat) 좌표 목록 — Shapely (x, y) 순서.
|
|
buffer_deg: 버퍼 크기(도).
|
|
|
|
Returns:
|
|
(polygon_wkt, center_wkt, area_sq_nm, center_lat, center_lon)
|
|
— polygon_wkt/center_wkt: ST_GeomFromText에 사용할 WKT 문자열.
|
|
— 좌표가 없거나 Shapely 미설치 시 (None, None, 0.0, 0.0, 0.0).
|
|
"""
|
|
if not _SHAPELY_AVAILABLE:
|
|
logger.warning('shapely 미설치 — build_group_polygon 건너뜀')
|
|
return None, None, 0.0, 0.0, 0.0
|
|
|
|
if not points:
|
|
return None, None, 0.0, 0.0, 0.0
|
|
|
|
if len(points) == 1:
|
|
geom = Point(points[0]).buffer(buffer_deg)
|
|
elif len(points) == 2:
|
|
# LineString → buffer로 Polygon 생성
|
|
from shapely.geometry import LineString
|
|
geom = LineString(points).buffer(buffer_deg)
|
|
else:
|
|
# 3점 이상 → convex_hull → buffer
|
|
geom = MultiPoint(points).convex_hull.buffer(buffer_deg)
|
|
|
|
# 중심 계산
|
|
centroid = geom.centroid
|
|
center_lon = centroid.x
|
|
center_lat = centroid.y
|
|
|
|
area_sq_nm = compute_area_sq_nm(geom, center_lat)
|
|
polygon_wkt = shapely_wkt.dumps(geom, rounding_precision=6)
|
|
center_wkt = f'POINT({center_lon:.6f} {center_lat:.6f})'
|
|
|
|
return polygon_wkt, center_wkt, area_sq_nm, center_lat, center_lon
|
|
|
|
|
|
def detect_gear_groups(
|
|
vessel_store,
|
|
now: Optional[datetime] = None,
|
|
) -> list[dict]:
|
|
"""어구 이름 패턴으로 어구그룹을 탐지한다.
|
|
|
|
프론트엔드 FleetClusterLayer.tsx gearGroupMap useMemo 로직 이관.
|
|
전체 AIS 선박(vessel_store._tracks)에서 어구 패턴을 탐지한다.
|
|
|
|
Args:
|
|
vessel_store: VesselStore — get_all_latest_positions() + get_vessel_info().
|
|
now: 기준 시각 (None이면 UTC now).
|
|
|
|
Returns:
|
|
[{parent_name, parent_mmsi, members: [{mmsi, name, lat, lon, sog, cog}]}]
|
|
"""
|
|
if now is None:
|
|
now = datetime.now(timezone.utc)
|
|
|
|
# 전체 선박의 최신 위치 가져오기
|
|
all_positions = vessel_store.get_all_latest_positions()
|
|
|
|
# 선박명 → mmsi 맵 (모선 탐색용, 어구 패턴이 아닌 선박만)
|
|
# 정규화 키(공백 제거) + 원본 이름 모두 등록
|
|
name_to_mmsi: dict[str, str] = {}
|
|
for mmsi, pos in all_positions.items():
|
|
name = (pos.get('name') or '').strip()
|
|
if name and not GEAR_PATTERN.match(name):
|
|
name_to_mmsi[name] = mmsi
|
|
name_to_mmsi[name.replace(' ', '')] = mmsi
|
|
|
|
# parent 이름 정규화 — 공백 제거 후 같은 모선은 하나로 통합
|
|
def _normalize_parent(raw: str) -> str:
|
|
return raw.replace(' ', '')
|
|
|
|
# 1단계: 같은 모선명 어구 수집 (60분 이내만, 공백 정규화)
|
|
raw_groups: dict[str, list[dict]] = {}
|
|
parent_display: dict[str, str] = {} # normalized → 대표 원본 이름
|
|
for mmsi, pos in all_positions.items():
|
|
name = (pos.get('name') or '').strip()
|
|
if not name:
|
|
continue
|
|
|
|
# staleness 체크
|
|
ts = pos.get('timestamp')
|
|
if ts is not None:
|
|
if isinstance(ts, datetime):
|
|
last_dt = ts if ts.tzinfo is not None else ts.replace(tzinfo=timezone.utc)
|
|
else:
|
|
try:
|
|
import pandas as pd
|
|
last_dt = pd.Timestamp(ts).to_pydatetime()
|
|
if last_dt.tzinfo is None:
|
|
last_dt = last_dt.replace(tzinfo=timezone.utc)
|
|
except Exception:
|
|
continue
|
|
age_sec = (now - last_dt).total_seconds()
|
|
if age_sec > STALE_SEC:
|
|
continue
|
|
|
|
m = GEAR_PATTERN.match(name)
|
|
if not m:
|
|
continue
|
|
|
|
parent_raw = (m.group(1) or name).strip()
|
|
parent_key = _normalize_parent(parent_raw)
|
|
# 대표 이름: 공백 없는 버전 우선 (더 정규화된 형태)
|
|
if parent_key not in parent_display or ' ' not in parent_raw:
|
|
parent_display[parent_key] = parent_raw
|
|
entry = {
|
|
'mmsi': mmsi,
|
|
'name': name,
|
|
'lat': pos['lat'],
|
|
'lon': pos['lon'],
|
|
'sog': pos.get('sog', 0),
|
|
'cog': pos.get('cog', 0),
|
|
}
|
|
raw_groups.setdefault(parent_key, []).append(entry)
|
|
|
|
# 2단계: 연결 기반 서브 클러스터링 (각 어구가 클러스터 내 최소 1개와 MAX_DIST_DEG 이내)
|
|
# 같은 parent 이름이라도 거리가 먼 어구들은 별도 서브그룹으로 분리
|
|
results: list[dict] = []
|
|
for parent_key, gears in raw_groups.items():
|
|
parent_mmsi = name_to_mmsi.get(parent_key)
|
|
display_name = parent_display.get(parent_key, parent_key)
|
|
|
|
if not gears:
|
|
continue
|
|
|
|
# 모선 위치 (있으면 시드 포인트로 활용)
|
|
seed_lat: Optional[float] = None
|
|
seed_lon: Optional[float] = None
|
|
if parent_mmsi and parent_mmsi in all_positions:
|
|
p = all_positions[parent_mmsi]
|
|
seed_lat, seed_lon = p['lat'], p['lon']
|
|
|
|
# 연결 기반 클러스터링 (Union-Find 방식)
|
|
n = len(gears)
|
|
parent_uf = list(range(n))
|
|
|
|
def find(x: int) -> int:
|
|
while parent_uf[x] != x:
|
|
parent_uf[x] = parent_uf[parent_uf[x]]
|
|
x = parent_uf[x]
|
|
return x
|
|
|
|
def union(a: int, b: int) -> None:
|
|
ra, rb = find(a), find(b)
|
|
if ra != rb:
|
|
parent_uf[ra] = rb
|
|
|
|
for i in range(n):
|
|
for j in range(i + 1, n):
|
|
if (abs(gears[i]['lat'] - gears[j]['lat']) <= MAX_DIST_DEG
|
|
and abs(gears[i]['lon'] - gears[j]['lon']) <= MAX_DIST_DEG):
|
|
union(i, j)
|
|
|
|
# 클러스터별 그룹화
|
|
clusters: dict[int, list[int]] = {}
|
|
for i in range(n):
|
|
clusters.setdefault(find(i), []).append(i)
|
|
|
|
# 모선이 있으면 모선과 가장 가까운 클러스터에 연결 (MAX_DIST_DEG 이내만)
|
|
seed_cluster_root: Optional[int] = None
|
|
if seed_lat is not None and seed_lon is not None:
|
|
best_dist = float('inf')
|
|
for root, idxs in clusters.items():
|
|
for i in idxs:
|
|
d = abs(gears[i]['lat'] - seed_lat) + abs(gears[i]['lon'] - seed_lon)
|
|
if d < best_dist:
|
|
best_dist = d
|
|
seed_cluster_root = root
|
|
# 모선이 어느 클러스터와도 MAX_DIST_DEG 초과 → 연결하지 않음
|
|
if best_dist > MAX_DIST_DEG * 2:
|
|
seed_cluster_root = None
|
|
|
|
# 클러스터마다 서브그룹 생성 (최소 2개 이상이거나 모선 포함)
|
|
for ci, (root, idxs) in enumerate(clusters.items()):
|
|
has_seed = (root == seed_cluster_root)
|
|
if len(idxs) < 2 and not has_seed:
|
|
continue
|
|
|
|
members = [
|
|
{'mmsi': gears[i]['mmsi'], 'name': gears[i]['name'],
|
|
'lat': gears[i]['lat'], 'lon': gears[i]['lon'],
|
|
'sog': gears[i]['sog'], 'cog': gears[i]['cog']}
|
|
for i in idxs
|
|
]
|
|
|
|
# 서브그룹 이름: 1개면 원본, 2개 이상이면 #1, #2
|
|
sub_name = display_name if len(clusters) == 1 else f'{display_name}#{ci + 1}'
|
|
sub_mmsi = parent_mmsi if has_seed else None
|
|
|
|
results.append({
|
|
'parent_name': sub_name,
|
|
'parent_key': parent_key,
|
|
'parent_mmsi': sub_mmsi,
|
|
'members': members,
|
|
})
|
|
|
|
# 3단계: 동일 parent_key 서브그룹 간 근접 병합 (거리 이내 시)
|
|
# prefix 기반 병합은 과도한 그룹화 유발 → 동일 키만 병합
|
|
def _groups_nearby(a: dict, b: dict) -> bool:
|
|
for ma in a['members']:
|
|
for mb in b['members']:
|
|
if abs(ma['lat'] - mb['lat']) <= MAX_DIST_DEG and abs(ma['lon'] - mb['lon']) <= MAX_DIST_DEG:
|
|
return True
|
|
return False
|
|
|
|
merged: list[dict] = []
|
|
skip: set[int] = set()
|
|
results.sort(key=lambda g: len(g['members']), reverse=True)
|
|
for i, big in enumerate(results):
|
|
if i in skip:
|
|
continue
|
|
for j, small in enumerate(results):
|
|
if j <= i or j in skip:
|
|
continue
|
|
# 동일 parent_key만 병합 (prefix 매칭 제거 — 과도한 병합 방지)
|
|
if big['parent_key'] == small['parent_key'] and _groups_nearby(big, small):
|
|
existing_mmsis = {m['mmsi'] for m in big['members']}
|
|
for m in small['members']:
|
|
if m['mmsi'] not in existing_mmsis:
|
|
big['members'].append(m)
|
|
existing_mmsis.add(m['mmsi'])
|
|
if not big['parent_mmsi'] and small['parent_mmsi']:
|
|
big['parent_mmsi'] = small['parent_mmsi']
|
|
skip.add(j)
|
|
del big['parent_key']
|
|
merged.append(big)
|
|
|
|
return merged
|
|
|
|
|
|
def build_all_group_snapshots(
|
|
vessel_store,
|
|
company_vessels: dict[int, list[str]],
|
|
companies: dict[int, dict],
|
|
) -> list[dict]:
|
|
"""선단(FLEET) + 어구그룹(GEAR) 폴리곤 스냅샷을 생성한다.
|
|
|
|
Shapely 미설치 시 빈 리스트를 반환한다.
|
|
|
|
Args:
|
|
vessel_store: VesselStore — get_all_latest_positions() + get_vessel_info().
|
|
company_vessels: {company_id: [mmsi_list]}.
|
|
companies: {id: {name_cn, name_en}}.
|
|
|
|
Returns:
|
|
DB INSERT용 dict 목록.
|
|
"""
|
|
if not _SHAPELY_AVAILABLE:
|
|
logger.warning('shapely 미설치 — build_all_group_snapshots 빈 리스트 반환')
|
|
return []
|
|
|
|
now = datetime.now(timezone.utc)
|
|
snapshots: list[dict] = []
|
|
all_positions = vessel_store.get_all_latest_positions()
|
|
|
|
# ── FLEET 타입: company_vessels 순회 ──────────────────────────
|
|
for company_id, mmsi_list in company_vessels.items():
|
|
company_info = companies.get(company_id, {})
|
|
group_label = company_info.get('name_cn') or company_info.get('name_en') or str(company_id)
|
|
|
|
# 각 선박의 최신 좌표 추출
|
|
points: list[tuple[float, float]] = []
|
|
members: list[dict] = []
|
|
|
|
for mmsi in mmsi_list:
|
|
pos = all_positions.get(mmsi)
|
|
if not pos:
|
|
continue
|
|
lat = pos['lat']
|
|
lon = pos['lon']
|
|
sog = pos.get('sog', 0)
|
|
cog = pos.get('cog', 0)
|
|
points.append((lon, lat))
|
|
members.append({
|
|
'mmsi': mmsi,
|
|
'name': pos.get('name', ''),
|
|
'lat': lat,
|
|
'lon': lon,
|
|
'sog': sog,
|
|
'cog': cog,
|
|
'role': 'LEADER' if mmsi == mmsi_list[0] else 'MEMBER',
|
|
'isParent': False,
|
|
})
|
|
|
|
# 2척 미만은 폴리곤 미생성
|
|
if len(points) < 2:
|
|
continue
|
|
|
|
polygon_wkt, center_wkt, area_sq_nm, center_lat, center_lon = build_group_polygon(
|
|
points, FLEET_BUFFER_DEG
|
|
)
|
|
|
|
snapshots.append({
|
|
'group_type': 'FLEET',
|
|
'group_key': str(company_id),
|
|
'group_label': group_label,
|
|
'snapshot_time': now,
|
|
'polygon_wkt': polygon_wkt,
|
|
'center_wkt': center_wkt,
|
|
'area_sq_nm': area_sq_nm,
|
|
'member_count': len(members),
|
|
'zone_id': None,
|
|
'zone_name': None,
|
|
'members': members,
|
|
'color': _cluster_color(company_id),
|
|
})
|
|
|
|
# ── GEAR 타입: detect_gear_groups 결과 순회 ───────────────────
|
|
gear_groups = detect_gear_groups(vessel_store, now=now)
|
|
|
|
for group in gear_groups:
|
|
parent_name: str = group['parent_name']
|
|
parent_mmsi: Optional[str] = group['parent_mmsi']
|
|
gear_members: list[dict] = group['members']
|
|
|
|
# 수역 분류: anchor(모선 or 첫 어구) 위치 기준
|
|
anchor_lat: Optional[float] = None
|
|
anchor_lon: Optional[float] = None
|
|
|
|
if parent_mmsi and parent_mmsi in all_positions:
|
|
parent_pos = all_positions[parent_mmsi]
|
|
anchor_lat = parent_pos['lat']
|
|
anchor_lon = parent_pos['lon']
|
|
|
|
if anchor_lat is None and gear_members:
|
|
anchor_lat = gear_members[0]['lat']
|
|
anchor_lon = gear_members[0]['lon']
|
|
|
|
if anchor_lat is None:
|
|
continue
|
|
|
|
zone_info = classify_zone(float(anchor_lat), float(anchor_lon))
|
|
in_zone = _is_in_zone(zone_info)
|
|
zone_id = zone_info.get('zone') if in_zone else None
|
|
zone_name = zone_info.get('zone_name') if in_zone else None
|
|
|
|
# 비허가(수역 외) 어구: MIN_GEAR_GROUP_SIZE 미만 제외
|
|
if not in_zone and len(gear_members) < MIN_GEAR_GROUP_SIZE:
|
|
continue
|
|
|
|
# 폴리곤 points: 어구 좌표 + 모선 좌표 (근접 시에만)
|
|
points = [(g['lon'], g['lat']) for g in gear_members]
|
|
parent_nearby = False
|
|
if parent_mmsi and parent_mmsi in all_positions:
|
|
parent_pos = all_positions[parent_mmsi]
|
|
p_lon, p_lat = parent_pos['lon'], parent_pos['lat']
|
|
# 모선이 어구 클러스터 내 최소 1개와 MAX_DIST_DEG*2 이내일 때만 포함
|
|
if any(abs(g['lat'] - p_lat) <= MAX_DIST_DEG * 2
|
|
and abs(g['lon'] - p_lon) <= MAX_DIST_DEG * 2 for g in gear_members):
|
|
if (p_lon, p_lat) not in points:
|
|
points.append((p_lon, p_lat))
|
|
parent_nearby = True
|
|
|
|
polygon_wkt, center_wkt, area_sq_nm, _clat, _clon = build_group_polygon(
|
|
points, GEAR_BUFFER_DEG
|
|
)
|
|
|
|
# members JSONB 구성
|
|
members_out: list[dict] = []
|
|
# 모선 먼저 (근접 시에만)
|
|
if parent_nearby and parent_mmsi and parent_mmsi in all_positions:
|
|
parent_pos = all_positions[parent_mmsi]
|
|
members_out.append({
|
|
'mmsi': parent_mmsi,
|
|
'name': parent_name,
|
|
'lat': parent_pos['lat'],
|
|
'lon': parent_pos['lon'],
|
|
'sog': parent_pos.get('sog', 0),
|
|
'cog': parent_pos.get('cog', 0),
|
|
'role': 'PARENT',
|
|
'isParent': True,
|
|
})
|
|
# 어구 목록
|
|
for g in gear_members:
|
|
members_out.append({
|
|
'mmsi': g['mmsi'],
|
|
'name': g['name'],
|
|
'lat': g['lat'],
|
|
'lon': g['lon'],
|
|
'sog': g['sog'],
|
|
'cog': g['cog'],
|
|
'role': 'GEAR',
|
|
'isParent': False,
|
|
})
|
|
|
|
color = _COLOR_GEAR_IN_ZONE if in_zone else _COLOR_GEAR_OUT_ZONE
|
|
|
|
snapshots.append({
|
|
'group_type': 'GEAR_IN_ZONE' if in_zone else 'GEAR_OUT_ZONE',
|
|
'group_key': parent_name,
|
|
'group_label': parent_name,
|
|
'snapshot_time': now,
|
|
'polygon_wkt': polygon_wkt,
|
|
'center_wkt': center_wkt,
|
|
'area_sq_nm': area_sq_nm,
|
|
'member_count': len(members_out),
|
|
'zone_id': zone_id,
|
|
'zone_name': zone_name,
|
|
'members': members_out,
|
|
'color': color,
|
|
})
|
|
|
|
return snapshots
|