kcg-ai-monitoring/prediction/algorithms/gear_identity.py
htlee a4e29629fc feat(detection): GEAR_IDENTITY_COLLISION 탐지 패턴 추가
동일 어구 이름이 서로 다른 MMSI 로 같은 5분 사이클에 동시 AIS 송출되는
공존 케이스를 신규 탐지 패턴으로 분리해 기록·분류한다. 부수 효과로
fleet_tracker.track_gear_identity 의 PK 충돌로 인한 사이클 실패도 해소.

Prediction
- algorithms/gear_identity.py: detect_gear_name_collisions + classify_severity
- fleet_tracker.py: 공존/교체 분기 분리, UPSERT helper, savepoint 점수 이전
- output/event_generator.py: run_gear_identity_collision_events 추가
- scheduler.py: track_gear_identity 직후 이벤트 승격 호출

Backend (domain/analysis)
- GearIdentityCollision 엔티티 + Repository(Specification+stats)
- GearIdentityCollisionService (@Transactional readOnly / @Auditable resolve)
- GearCollisionController /api/analysis/gear-collisions (list/stats/detail/resolve)
- GearCollisionResponse / StatsResponse / ResolveRequest (record)

DB
- V030__gear_identity_collision.sql: gear_identity_collisions 테이블
  + auth_perm_tree 엔트리(detection:gear-collision nav_sort=950) + 역할별 권한

Frontend
- shared/constants/gearCollisionStatuses.ts + catalogRegistry 등록
- services/gearCollisionApi.ts (list/stats/get/resolve)
- features/detection/GearCollisionDetection.tsx (PageContainer+Section+DataTable
  + 분류 액션 폼, design system SSOT 준수)
- componentRegistry + feature index + i18n detection.json / common.json(ko/en)
2026-04-17 06:53:12 +09:00

158 lines
5.8 KiB
Python

"""
어구 정체성 충돌(GEAR_IDENTITY_COLLISION) 탐지 알고리즘.
동일 어구 이름이 서로 다른 MMSI 로 같은 5분 사이클 내 동시 AIS 송출되는 케이스를
스푸핑/복제 의심 패턴으로 탐지한다. fleet_tracker.track_gear_identity() 루프 진입
전에 사이클 단위로 사전 집계하는 데 사용된다.
"""
from datetime import datetime
from itertools import combinations
from typing import Optional
from algorithms.location import haversine_nm
# ──────────────────────────────────────────────────────────────────
# 공존 판정 · 심각도 임계
# ──────────────────────────────────────────────────────────────────
MIN_COEXISTENCE_GROUP = 2 # 같은 이름에 MMSI 2개 이상
IMPOSSIBLE_SPEED_KTS = 60.0 # 두 위치 이동에 필요한 속도가 이보다 크면 물리 불가능
CRITICAL_DISTANCE_KM = 50.0 # 단발이라도 이 거리 이상이면 즉시 CRITICAL
HIGH_DISTANCE_KM = 10.0 # HIGH 기준 거리
CRITICAL_COEXISTENCE_COUNT = 3 # 누적 공존 N회 이상이면 CRITICAL 승격
HIGH_COEXISTENCE_COUNT = 2 # 누적 공존 N회 이상이면 HIGH
NM_TO_KM = 1.852 # 1 nautical mile = 1.852 km
def detect_gear_name_collisions(
gear_signals: list[dict],
now: datetime,
) -> list[dict]:
"""동일 이름 · 다중 MMSI 공존 세트 추출.
Args:
gear_signals: [{mmsi, name, lat, lon}, ...] — track_gear_identity 와 동일 입력.
now: 사이클 기준 시각(UTC).
Returns:
공존 쌍 리스트. 세 개 이상 동시 송출 케이스는 모든 2-조합을 생성한다.
각 원소:
{
'name': str,
'mmsi_lo': str, # 사전순으로 작은 MMSI
'mmsi_hi': str,
'lat_lo', 'lon_lo': float,
'lat_hi', 'lon_hi': float,
'distance_km': float,
'parent_name': Optional[str], # 힌트 (GEAR_PATTERN parent 그룹, 있으면)
'observed_at': datetime,
}
"""
if not gear_signals:
return []
# 이름 기준 그룹핑
by_name: dict[str, list[dict]] = {}
for sig in gear_signals:
name = sig.get('name')
mmsi = sig.get('mmsi')
if not name or not mmsi:
continue
by_name.setdefault(name, []).append(sig)
collisions: list[dict] = []
for name, signals in by_name.items():
if len(signals) < MIN_COEXISTENCE_GROUP:
continue
# 같은 MMSI 중복은 제거 (한 cycle 에 동일 MMSI 가 다수 신호로 들어올 수 있음)
unique_by_mmsi: dict[str, dict] = {}
for sig in signals:
unique_by_mmsi.setdefault(sig['mmsi'], sig)
if len(unique_by_mmsi) < MIN_COEXISTENCE_GROUP:
continue
parent_name = _infer_parent_name(name)
mmsis = sorted(unique_by_mmsi.keys())
for a, b in combinations(mmsis, 2):
sa, sb = unique_by_mmsi[a], unique_by_mmsi[b]
dist_km = _haversine_km(
sa.get('lat'), sa.get('lon'),
sb.get('lat'), sb.get('lon'),
)
collisions.append({
'name': name,
'mmsi_lo': a,
'mmsi_hi': b,
'lat_lo': _to_float(sa.get('lat')),
'lon_lo': _to_float(sa.get('lon')),
'lat_hi': _to_float(sb.get('lat')),
'lon_hi': _to_float(sb.get('lon')),
'distance_km': dist_km,
'parent_name': parent_name,
'observed_at': now,
})
return collisions
def classify_severity(
coexistence_count: int,
max_distance_km: Optional[float],
swap_count: int = 0,
) -> str:
"""충돌 심각도 산정.
- CRITICAL: 거리 불가능 / 누적 공존 N회 이상
- HIGH: 상당 거리 / 2회 이상
- MEDIUM: 단발 근거리
- LOW: 근거리 + 거리 정보 없음
"""
distance = max_distance_km or 0.0
if distance >= CRITICAL_DISTANCE_KM:
return 'CRITICAL'
if coexistence_count >= CRITICAL_COEXISTENCE_COUNT:
return 'CRITICAL'
if distance >= HIGH_DISTANCE_KM:
return 'HIGH'
if coexistence_count >= HIGH_COEXISTENCE_COUNT:
return 'HIGH'
if swap_count >= HIGH_COEXISTENCE_COUNT:
return 'HIGH'
if max_distance_km is None or max_distance_km < 0.1:
return 'LOW'
return 'MEDIUM'
def _haversine_km(lat1, lon1, lat2, lon2) -> float:
"""두 좌표 사이 거리를 km 로 반환. 입력 누락 시 0.0."""
try:
if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
return 0.0
nm = haversine_nm(float(lat1), float(lon1), float(lat2), float(lon2))
return round(nm * NM_TO_KM, 2)
except (TypeError, ValueError):
return 0.0
def _to_float(val) -> Optional[float]:
if val is None:
return None
try:
return float(val)
except (TypeError, ValueError):
return None
def _infer_parent_name(gear_name: str) -> Optional[str]:
"""어구 이름에서 모선명 부분 추출 (느슨).
fleet_tracker 가 이미 GEAR_PATTERN 으로 정교하게 파싱하지만, 알고리즘 모듈 독립성을
위해 단순 휴리스틱만 유지한다. 값이 필요한 경우 fleet_tracker 호출부에서 덮어쓴다.
"""
if not gear_name:
return None
# '_숫자' 로 끝나는 서픽스 제거
base = gear_name
parts = base.rsplit('_', 2)
if len(parts) >= 2 and any(ch.isdigit() for ch in parts[-1]):
return parts[0]
return None