From 2441e3068abec3397f36acd67d577df3e3213f69 Mon Sep 17 00:00:00 2001 From: htlee Date: Tue, 24 Mar 2026 13:30:31 +0900 Subject: [PATCH] =?UTF-8?q?feat(prediction):=20=EC=84=A0=EB=8B=A8/?= =?UTF-8?q?=EC=96=B4=EA=B5=AC=EA=B7=B8=EB=A3=B9=20=ED=8F=B4=EB=A6=AC?= =?UTF-8?q?=EA=B3=A4=20=EC=84=9C=EB=B2=84=EC=82=AC=EC=9D=B4=EB=93=9C=20?= =?UTF-8?q?=EC=83=9D=EC=84=B1=20+=20PostGIS=20=EC=A0=80=EC=9E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - DB migration 009: group_polygon_snapshots 테이블 (PostGIS geometry) - polygon_builder.py: Shapely 기반 convex hull + buffer 폴리곤 생성 - scheduler.py: 5분 주기 분석 사이클에 폴리곤 생성 Step 4.5 통합 - fleet_tracker.py: get_company_vessels() 메서드 추가 - kcgdb.py: save_group_snapshots(), cleanup_group_snapshots() 추가 - requirements.txt: shapely>=2.0 추가 --- database/migration/009_group_polygons.sql | 49 +++ prediction/algorithms/polygon_builder.py | 422 ++++++++++++++++++++++ prediction/db/kcgdb.py | 74 ++++ prediction/fleet_tracker.py | 13 + prediction/requirements.txt | 1 + prediction/scheduler.py | 17 + 6 files changed, 576 insertions(+) create mode 100644 database/migration/009_group_polygons.sql create mode 100644 prediction/algorithms/polygon_builder.py diff --git a/database/migration/009_group_polygons.sql b/database/migration/009_group_polygons.sql new file mode 100644 index 0000000..e8b68b0 --- /dev/null +++ b/database/migration/009_group_polygons.sql @@ -0,0 +1,49 @@ +-- 009: 선단/어구그룹 폴리곤 스냅샷 테이블 +-- 5분 주기 APPEND, 7일 보존 + +SET search_path TO kcg, public; + +CREATE TABLE IF NOT EXISTS kcg.group_polygon_snapshots ( + id BIGSERIAL PRIMARY KEY, + + -- 그룹 식별 + group_type VARCHAR(20) NOT NULL, -- FLEET | GEAR_IN_ZONE | GEAR_OUT_ZONE + group_key VARCHAR(100) NOT NULL, -- fleet: company_id, gear: parent_name + group_label TEXT, -- 표시명 (회사명 또는 모선명) + + -- 스냅샷 시각 + snapshot_time TIMESTAMPTZ NOT NULL, + + -- PostGIS geometry + polygon geometry(Polygon, 4326), -- convex hull + buffer (3점 미만 시 NULL) + center_point geometry(Point, 4326), -- 중심점 + + -- 지표 + area_sq_nm DOUBLE PRECISION DEFAULT 0, -- 면적 (제곱 해리) + member_count INT NOT NULL DEFAULT 0, -- 소속 선박/어구 수 + + -- 수역 분류 (어구그룹용) + zone_id VARCHAR(20), -- ZONE_I ~ ZONE_IV | OUTSIDE + zone_name TEXT, + + -- 멤버 상세 (JSONB 배열) + members JSONB NOT NULL DEFAULT '[]', + -- [{mmsi, name, lat, lon, sog, cog, role, isParent}] + + -- 색상 힌트 + color VARCHAR(20), + + created_at TIMESTAMPTZ DEFAULT NOW() +); + +-- 조회 성능 인덱스 +CREATE INDEX IF NOT EXISTS idx_gps_type_time + ON kcg.group_polygon_snapshots(group_type, snapshot_time DESC); +CREATE INDEX IF NOT EXISTS idx_gps_key_time + ON kcg.group_polygon_snapshots(group_key, snapshot_time DESC); +CREATE INDEX IF NOT EXISTS idx_gps_snapshot_time + ON kcg.group_polygon_snapshots(snapshot_time DESC); + +-- 공간 인덱스 +CREATE INDEX IF NOT EXISTS idx_gps_polygon_gist + ON kcg.group_polygon_snapshots USING GIST(polygon); diff --git a/prediction/algorithms/polygon_builder.py b/prediction/algorithms/polygon_builder.py new file mode 100644 index 0000000..5f014ac --- /dev/null +++ b/prediction/algorithms/polygon_builder.py @@ -0,0 +1,422 @@ +"""선단/어구그룹 폴리곤 생성기. + +프론트엔드 FleetClusterLayer.tsx의 어구그룹 탐지 + convexHull/padPolygon 로직을 +Python으로 이관한다. Shapely 라이브러리로 폴리곤 생성. +""" + +from __future__ import annotations + +import logging +import math +import re +from datetime import datetime, timezone +from typing import Optional + +try: + from shapely.geometry import MultiPoint, Point + from shapely import wkt as shapely_wkt + _SHAPELY_AVAILABLE = True +except ImportError: + _SHAPELY_AVAILABLE = False + +from algorithms.location import classify_zone + +logger = logging.getLogger(__name__) + +# 프론트 FleetClusterLayer.tsx gearGroupMap 패턴과 동일 +GEAR_PATTERN = re.compile(r'^(.+?)_\d+_\d+_?$') +MAX_DIST_DEG = 0.15 # ~10NM +STALE_SEC = 3600 # 60분 +FLEET_BUFFER_DEG = 0.02 +GEAR_BUFFER_DEG = 0.01 +MIN_GEAR_GROUP_SIZE = 2 # 최소 어구 수 (비허가 구역 외) + +# 수역 내 어구 색상, 수역 외 어구 색상 +_COLOR_GEAR_IN_ZONE = '#ef4444' +_COLOR_GEAR_OUT_ZONE = '#f97316' + +# classify_zone이 수역 내로 판정하는 zone 값 목록 +_IN_ZONE_PREFIXES = ('ZONE_',) + + +def _is_in_zone(zone_info: dict) -> bool: + """classify_zone 결과가 특정어업수역 내인지 판별.""" + zone = zone_info.get('zone', '') + return any(zone.startswith(prefix) for prefix in _IN_ZONE_PREFIXES) + + +def _cluster_color(seed: int) -> str: + """프론트 clusterColor(id) 이관 — hsl({(seed * 137) % 360}, 80%, 55%).""" + h = (seed * 137) % 360 + return f'hsl({h}, 80%, 55%)' + + +def compute_area_sq_nm(polygon, center_lat: float) -> float: + """Shapely Polygon의 면적(degrees²) → 제곱 해리 변환. + + 1도 위도 ≈ 60 NM, 1도 경도 ≈ 60 * cos(lat) NM + sq_nm = area_deg2 * 60 * 60 * cos(center_lat_rad) + """ + area_deg2 = polygon.area + center_lat_rad = math.radians(center_lat) + sq_nm = area_deg2 * 60.0 * 60.0 * math.cos(center_lat_rad) + return round(sq_nm, 4) + + +def build_group_polygon( + points: list[tuple[float, float]], + buffer_deg: float, +) -> tuple[Optional[str], Optional[str], float, float, float]: + """좌표 목록으로 버퍼 폴리곤을 생성한다. + + Args: + points: (lon, lat) 좌표 목록 — Shapely (x, y) 순서. + buffer_deg: 버퍼 크기(도). + + Returns: + (polygon_wkt, center_wkt, area_sq_nm, center_lat, center_lon) + — polygon_wkt/center_wkt: ST_GeomFromText에 사용할 WKT 문자열. + — 좌표가 없거나 Shapely 미설치 시 (None, None, 0.0, 0.0, 0.0). + """ + if not _SHAPELY_AVAILABLE: + logger.warning('shapely 미설치 — build_group_polygon 건너뜀') + return None, None, 0.0, 0.0, 0.0 + + if not points: + return None, None, 0.0, 0.0, 0.0 + + if len(points) == 1: + geom = Point(points[0]).buffer(buffer_deg) + elif len(points) == 2: + # LineString → buffer로 Polygon 생성 + from shapely.geometry import LineString + geom = LineString(points).buffer(buffer_deg) + else: + # 3점 이상 → convex_hull → buffer + geom = MultiPoint(points).convex_hull.buffer(buffer_deg) + + # 중심 계산 + centroid = geom.centroid + center_lon = centroid.x + center_lat = centroid.y + + area_sq_nm = compute_area_sq_nm(geom, center_lat) + polygon_wkt = shapely_wkt.dumps(geom, rounding_precision=6) + center_wkt = f'POINT({center_lon:.6f} {center_lat:.6f})' + + return polygon_wkt, center_wkt, area_sq_nm, center_lat, center_lon + + +def detect_gear_groups( + vessel_dfs: dict, + vessel_store, + now: Optional[datetime] = None, +) -> list[dict]: + """어구 이름 패턴으로 어구그룹을 탐지한다. + + 프론트엔드 FleetClusterLayer.tsx gearGroupMap useMemo 로직 이관. + + Args: + vessel_dfs: {mmsi: DataFrame} — 각 DataFrame은 lat, lon, sog, cog, timestamp 칼럼. + vessel_store: VesselStore — get_vessel_info(mmsi) → {name, ...}. + now: 기준 시각 (None이면 UTC now). + + Returns: + [{parent_name, parent_mmsi, members: [{mmsi, name, lat, lon, sog, cog}]}] + """ + if now is None: + now = datetime.now(timezone.utc) + + # 선박명 → mmsi 맵 (모선 탐색용) + name_to_mmsi: dict[str, str] = {} + for mmsi, df in vessel_dfs.items(): + if df is None or len(df) == 0: + continue + info = vessel_store.get_vessel_info(mmsi) + name: str = (info or {}).get('name', '') or '' + name = name.strip() + if name and not GEAR_PATTERN.match(name): + name_to_mmsi[name] = mmsi + + # 1단계: 같은 모선명 어구 수집 (60분 이내만) + raw_groups: dict[str, list[dict]] = {} + for mmsi, df in vessel_dfs.items(): + if df is None or len(df) == 0: + continue + last = df.iloc[-1] + ts = last.get('timestamp') if hasattr(last, 'get') else last['timestamp'] + + # timestamp → datetime 변환 + if isinstance(ts, datetime): + last_dt = ts if ts.tzinfo is not None else ts.replace(tzinfo=timezone.utc) + else: + # pandas Timestamp 또는 숫자(unix seconds) + try: + import pandas as pd + last_dt = pd.Timestamp(ts).to_pydatetime() + if last_dt.tzinfo is None: + last_dt = last_dt.replace(tzinfo=timezone.utc) + except Exception: + continue + + age_sec = (now - last_dt).total_seconds() + if age_sec > STALE_SEC: + continue + + info = vessel_store.get_vessel_info(mmsi) + name = (info or {}).get('name', '') or '' + name = name.strip() + + m = GEAR_PATTERN.match(name) + if not m: + continue + + parent_name = m.group(1).strip() + entry = { + 'mmsi': mmsi, + 'name': name, + 'lat': float(last['lat']), + 'lon': float(last['lon']), + 'sog': float(last.get('sog', 0) if hasattr(last, 'get') else last['sog']), + 'cog': float(last.get('cog', 0) if hasattr(last, 'get') else last['cog']), + } + raw_groups.setdefault(parent_name, []).append(entry) + + # 2단계: 거리 기반 서브 클러스터링 (anchor 기준 MAX_DIST_DEG 이내만) + results: list[dict] = [] + for parent_name, gears in raw_groups.items(): + parent_mmsi = name_to_mmsi.get(parent_name) + + # 기준점(anchor): 모선 있으면 모선 위치, 없으면 첫 어구 + anchor_lat: Optional[float] = None + anchor_lon: Optional[float] = None + + if parent_mmsi and parent_mmsi in vessel_dfs: + parent_df = vessel_dfs[parent_mmsi] + if parent_df is not None and len(parent_df) > 0: + parent_last = parent_df.iloc[-1] + anchor_lat = float(parent_last['lat']) + anchor_lon = float(parent_last['lon']) + + if anchor_lat is None and gears: + anchor_lat = gears[0]['lat'] + anchor_lon = gears[0]['lon'] + + if anchor_lat is None or anchor_lon is None: + continue + + # MAX_DIST_DEG 이내 어구만 포함 + _anchor_lat: float = anchor_lat + _anchor_lon: float = anchor_lon + nearby = [ + g for g in gears + if abs(g['lat'] - _anchor_lat) <= MAX_DIST_DEG + and abs(g['lon'] - _anchor_lon) <= MAX_DIST_DEG + ] + + if not nearby: + continue + + # members 구성: 어구 목록 + members = [ + { + 'mmsi': g['mmsi'], + 'name': g['name'], + 'lat': g['lat'], + 'lon': g['lon'], + 'sog': g['sog'], + 'cog': g['cog'], + } + for g in nearby + ] + + results.append({ + 'parent_name': parent_name, + 'parent_mmsi': parent_mmsi, + 'members': members, + }) + + return results + + +def build_all_group_snapshots( + vessel_dfs: dict, + vessel_store, + company_vessels: dict[int, list[str]], + companies: dict[int, dict], +) -> list[dict]: + """선단(FLEET) + 어구그룹(GEAR) 폴리곤 스냅샷을 생성한다. + + Shapely 미설치 시 빈 리스트를 반환한다. + + Args: + vessel_dfs: {mmsi: DataFrame}. + vessel_store: VesselStore — get_vessel_info(mmsi). + company_vessels: {company_id: [mmsi_list]}. + companies: {id: {name_cn, name_en}}. + + Returns: + DB INSERT용 dict 목록. + """ + if not _SHAPELY_AVAILABLE: + logger.warning('shapely 미설치 — build_all_group_snapshots 빈 리스트 반환') + return [] + + now = datetime.now(timezone.utc) + snapshots: list[dict] = [] + + # ── FLEET 타입: company_vessels 순회 ────────────────────────── + for company_id, mmsi_list in company_vessels.items(): + company_info = companies.get(company_id, {}) + group_label = company_info.get('name_cn') or company_info.get('name_en') or str(company_id) + + # 각 선박의 최신 좌표 추출 + points: list[tuple[float, float]] = [] + members: list[dict] = [] + + for mmsi in mmsi_list: + df = vessel_dfs.get(mmsi) + if df is None or len(df) == 0: + continue + last = df.iloc[-1] + lat = float(last['lat']) + lon = float(last['lon']) + sog_val = last.get('sog', 0) if hasattr(last, 'get') else last['sog'] + cog_val = last.get('cog', 0) if hasattr(last, 'get') else last['cog'] + sog = float(sog_val) + cog = float(cog_val) + points.append((lon, lat)) + members.append({ + 'mmsi': mmsi, + 'name': (vessel_store.get_vessel_info(mmsi) or {}).get('name', ''), + 'lat': lat, + 'lon': lon, + 'sog': sog, + 'cog': cog, + 'role': 'LEADER' if mmsi == mmsi_list[0] else 'MEMBER', + 'isParent': False, + }) + + # 2척 미만은 폴리곤 미생성 + if len(points) < 2: + continue + + polygon_wkt, center_wkt, area_sq_nm, center_lat, center_lon = build_group_polygon( + points, FLEET_BUFFER_DEG + ) + + snapshots.append({ + 'group_type': 'FLEET', + 'group_key': str(company_id), + 'group_label': group_label, + 'snapshot_time': now, + 'polygon_wkt': polygon_wkt, + 'center_wkt': center_wkt, + 'area_sq_nm': area_sq_nm, + 'member_count': len(members), + 'zone_id': None, + 'zone_name': None, + 'members': members, + 'color': _cluster_color(company_id), + }) + + # ── GEAR 타입: detect_gear_groups 결과 순회 ─────────────────── + gear_groups = detect_gear_groups(vessel_dfs, vessel_store, now=now) + + for group in gear_groups: + parent_name: str = group['parent_name'] + parent_mmsi: Optional[str] = group['parent_mmsi'] + gear_members: list[dict] = group['members'] + + # 수역 분류: anchor(모선 or 첫 어구) 위치 기준 + anchor_lat: Optional[float] = None + anchor_lon: Optional[float] = None + + if parent_mmsi and parent_mmsi in vessel_dfs: + parent_df = vessel_dfs.get(parent_mmsi) + if parent_df is not None and len(parent_df) > 0: + p_last = parent_df.iloc[-1] + anchor_lat = float(p_last['lat']) + anchor_lon = float(p_last['lon']) + + if anchor_lat is None and gear_members: + anchor_lat = gear_members[0]['lat'] + anchor_lon = gear_members[0]['lon'] + + if anchor_lat is None: + continue + + zone_info = classify_zone(float(anchor_lat), float(anchor_lon)) + in_zone = _is_in_zone(zone_info) + zone_id = zone_info.get('zone') if in_zone else None + zone_name = zone_info.get('zone_name') if in_zone else None + + # 비허가(수역 외) 어구: MIN_GEAR_GROUP_SIZE 미만 제외 + if not in_zone and len(gear_members) < MIN_GEAR_GROUP_SIZE: + continue + + # 폴리곤 points: 어구 좌표 + 모선 좌표 + points = [(g['lon'], g['lat']) for g in gear_members] + if parent_mmsi and parent_mmsi in vessel_dfs: + parent_df = vessel_dfs.get(parent_mmsi) + if parent_df is not None and len(parent_df) > 0: + p_last = parent_df.iloc[-1] + p_lat = float(p_last['lat']) + p_lon = float(p_last['lon']) + if (p_lon, p_lat) not in points: + points.append((p_lon, p_lat)) + + polygon_wkt, center_wkt, area_sq_nm, _clat, _clon = build_group_polygon( + points, GEAR_BUFFER_DEG + ) + + # members JSONB 구성 + members_out: list[dict] = [] + # 모선 먼저 + if parent_mmsi and parent_mmsi in vessel_dfs: + parent_df = vessel_dfs.get(parent_mmsi) + if parent_df is not None and len(parent_df) > 0: + p_last = parent_df.iloc[-1] + p_sog = float(p_last.get('sog', 0) if hasattr(p_last, 'get') else p_last['sog']) + p_cog = float(p_last.get('cog', 0) if hasattr(p_last, 'get') else p_last['cog']) + members_out.append({ + 'mmsi': parent_mmsi, + 'name': parent_name, + 'lat': float(p_last['lat']), + 'lon': float(p_last['lon']), + 'sog': p_sog, + 'cog': p_cog, + 'role': 'PARENT', + 'isParent': True, + }) + # 어구 목록 + for g in gear_members: + members_out.append({ + 'mmsi': g['mmsi'], + 'name': g['name'], + 'lat': g['lat'], + 'lon': g['lon'], + 'sog': g['sog'], + 'cog': g['cog'], + 'role': 'GEAR', + 'isParent': False, + }) + + color = _COLOR_GEAR_IN_ZONE if in_zone else _COLOR_GEAR_OUT_ZONE + + snapshots.append({ + 'group_type': 'GEAR_IN_ZONE' if in_zone else 'GEAR_OUT_ZONE', + 'group_key': parent_name, + 'group_label': parent_name, + 'snapshot_time': now, + 'polygon_wkt': polygon_wkt, + 'center_wkt': center_wkt, + 'area_sq_nm': area_sq_nm, + 'member_count': len(members_out), + 'zone_id': zone_id, + 'zone_name': zone_name, + 'members': members_out, + 'color': color, + }) + + return snapshots diff --git a/prediction/db/kcgdb.py b/prediction/db/kcgdb.py index 12a362b..ba4282f 100644 --- a/prediction/db/kcgdb.py +++ b/prediction/db/kcgdb.py @@ -1,3 +1,4 @@ +import json import logging from contextlib import contextmanager from typing import TYPE_CHECKING, Optional @@ -137,3 +138,76 @@ def cleanup_old(hours: int = 48) -> int: except Exception as e: logger.error('failed to cleanup old results: %s', e) return 0 + + +def save_group_snapshots(snapshots: list[dict]) -> int: + """group_polygon_snapshots에 폴리곤 스냅샷 배치 INSERT. + + snapshots: polygon_builder.build_all_group_snapshots() 결과 + 각 항목은: group_type, group_key, group_label, snapshot_time, + polygon_wkt (str|None), center_wkt (str|None), + area_sq_nm, member_count, zone_id, zone_name, + members (list[dict]), color + """ + if not snapshots: + return 0 + + insert_sql = """ + INSERT INTO kcg.group_polygon_snapshots ( + group_type, group_key, group_label, snapshot_time, + polygon, center_point, area_sq_nm, member_count, + zone_id, zone_name, members, color + ) VALUES ( + %s, %s, %s, %s, + ST_GeomFromText(%s, 4326), ST_GeomFromText(%s, 4326), + %s, %s, %s, %s, %s::jsonb, %s + ) + """ + + inserted = 0 + try: + with get_conn() as conn: + with conn.cursor() as cur: + for s in snapshots: + cur.execute( + insert_sql, + ( + s['group_type'], + s['group_key'], + s['group_label'], + s['snapshot_time'], + s.get('polygon_wkt'), + s.get('center_wkt'), + s.get('area_sq_nm'), + s.get('member_count'), + s.get('zone_id'), + s.get('zone_name'), + json.dumps(s.get('members', []), ensure_ascii=False), + s.get('color'), + ), + ) + inserted += 1 + conn.commit() + logger.info('saved %d group polygon snapshots', inserted) + return inserted + except Exception as e: + logger.error('failed to save group snapshots: %s', e) + return 0 + + +def cleanup_group_snapshots(days: int = 7) -> int: + """오래된 그룹 폴리곤 스냅샷 삭제.""" + try: + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute( + f"DELETE FROM kcg.group_polygon_snapshots WHERE snapshot_time < NOW() - INTERVAL '{days} days'", + ) + deleted = cur.rowcount + conn.commit() + if deleted > 0: + logger.info('cleaned up %d old group snapshots (older than %dd)', deleted, days) + return deleted + except Exception as e: + logger.error('failed to cleanup group snapshots: %s', e) + return 0 diff --git a/prediction/fleet_tracker.py b/prediction/fleet_tracker.py index 981d7db..26788f3 100644 --- a/prediction/fleet_tracker.py +++ b/prediction/fleet_tracker.py @@ -317,6 +317,19 @@ class FleetTracker: cur.close() logger.info('fleet snapshot saved: %d companies', len(company_vessels)) + def get_company_vessels(self, vessel_dfs: dict[str, 'pd.DataFrame']) -> dict[int, list[str]]: + """현재 AIS 수신 중인 등록 선단의 회사별 MMSI 목록 반환. + + Returns: {company_id: [mmsi, ...]} + """ + result: dict[int, list[str]] = {} + for mmsi, vid in self._mmsi_to_vid.items(): + v = self._vessels.get(vid) + if not v or mmsi not in vessel_dfs: + continue + result.setdefault(v['company_id'], []).append(mmsi) + return result + # 싱글턴 fleet_tracker = FleetTracker() diff --git a/prediction/requirements.txt b/prediction/requirements.txt index 7268415..46d3abc 100644 --- a/prediction/requirements.txt +++ b/prediction/requirements.txt @@ -6,3 +6,4 @@ numpy>=1.26 pandas>=2.2 scikit-learn>=1.5 apscheduler>=3.10 +shapely>=2.0 diff --git a/prediction/scheduler.py b/prediction/scheduler.py index ff48c3b..1cbd9a7 100644 --- a/prediction/scheduler.py +++ b/prediction/scheduler.py @@ -99,6 +99,23 @@ def run_analysis_cycle(): fleet_tracker.save_snapshot(vessel_dfs, kcg_conn) + # 4.5 그룹 폴리곤 생성 + 저장 + try: + from algorithms.polygon_builder import detect_gear_groups, build_all_group_snapshots + + company_vessels = fleet_tracker.get_company_vessels(vessel_dfs) + gear_groups = detect_gear_groups(vessel_dfs, vessel_store) + group_snapshots = build_all_group_snapshots( + vessel_dfs, vessel_store, company_vessels, + fleet_tracker._companies, + ) + saved = kcgdb.save_group_snapshots(group_snapshots) + cleaned = kcgdb.cleanup_group_snapshots(days=7) + logger.info('group polygons: %d saved, %d cleaned, %d gear groups', + saved, cleaned, len(gear_groups)) + except Exception as e: + logger.warning('group polygon generation failed: %s', e) + # 5. 선박별 추가 알고리즘 → AnalysisResult 생성 results = [] for c in classifications: