- AI분석/현장분석/보고서 위험도 용어 통일 (HIGH→WATCH, MEDIUM→MONITOR, LOW→NORMAL) - 공통 riskMapping.ts: ALERT_COLOR/EMOJI/LEVELS, RISK_TO_ALERT, STATS_KEY_MAP - deck.gl 오버레이 색상 현장분석 팔레트로 통일 - Python 경량 분석: 파이프라인 미통과 412* 선박에 위치 기반 간이 AnalysisResult 생성 - 현장분석 fallback 제거: classifyStateFallback/classifyFishingZone → Python 결과 전용 - 보고서 위험 평가: Python riskCounts 실데이터 기반으로 전면 교체 - 현장분석 우측 패널: 항적 미니맵 (72시간, fetchVesselTrack API) - 현장분석 좌측 패널: 위험도 점수 기준 섹션 추가
127 lines
3.2 KiB
Python
127 lines
3.2 KiB
Python
from typing import Optional, Tuple
|
|
|
|
import pandas as pd
|
|
from algorithms.location import classify_zone
|
|
from algorithms.fishing_pattern import detect_fishing_segments, detect_trawl_uturn
|
|
from algorithms.dark_vessel import detect_ais_gaps
|
|
from algorithms.spoofing import detect_teleportation
|
|
|
|
|
|
def compute_lightweight_risk_score(
|
|
zone_info: dict,
|
|
sog: float,
|
|
is_permitted: Optional[bool] = None,
|
|
) -> Tuple[int, str]:
|
|
"""위치·허가 이력 기반 경량 위험도 (파이프라인 미통과 선박용).
|
|
|
|
compute_vessel_risk_score의 1번(위치)+4번(허가) 로직과 동일.
|
|
Returns: (risk_score, risk_level)
|
|
"""
|
|
score = 0
|
|
|
|
# 1. 위치 기반 (최대 40점)
|
|
zone = zone_info.get('zone', '')
|
|
if zone == 'TERRITORIAL_SEA':
|
|
score += 40
|
|
elif zone == 'CONTIGUOUS_ZONE':
|
|
score += 10
|
|
elif zone.startswith('ZONE_'):
|
|
if is_permitted is not None and not is_permitted:
|
|
score += 25
|
|
|
|
# 4. 허가 이력 (최대 20점)
|
|
if is_permitted is not None and not is_permitted:
|
|
score += 20
|
|
|
|
score = min(score, 100)
|
|
|
|
if score >= 70:
|
|
level = 'CRITICAL'
|
|
elif score >= 50:
|
|
level = 'HIGH'
|
|
elif score >= 30:
|
|
level = 'MEDIUM'
|
|
else:
|
|
level = 'LOW'
|
|
|
|
return score, level
|
|
|
|
|
|
def compute_vessel_risk_score(
|
|
mmsi: str,
|
|
df_vessel: pd.DataFrame,
|
|
zone_info: Optional[dict] = None,
|
|
is_permitted: Optional[bool] = None,
|
|
) -> Tuple[int, str]:
|
|
"""선박별 종합 위반 위험도 (0~100점).
|
|
|
|
Returns: (risk_score, risk_level)
|
|
"""
|
|
if len(df_vessel) == 0:
|
|
return 0, 'LOW'
|
|
|
|
score = 0
|
|
|
|
# 1. 위치 기반 (최대 40점)
|
|
if zone_info is None:
|
|
last = df_vessel.iloc[-1]
|
|
zone_info = classify_zone(last['lat'], last['lon'])
|
|
|
|
zone = zone_info.get('zone', '')
|
|
if zone == 'TERRITORIAL_SEA':
|
|
score += 40
|
|
elif zone == 'CONTIGUOUS_ZONE':
|
|
score += 10
|
|
elif zone.startswith('ZONE_'):
|
|
# 특정어업수역 내 — 무허가면 가산
|
|
if is_permitted is not None and not is_permitted:
|
|
score += 25
|
|
|
|
# 2. 조업 행위 (최대 30점)
|
|
segs = detect_fishing_segments(df_vessel)
|
|
ts_fishing = [s for s in segs if s.get('in_territorial_sea')]
|
|
if ts_fishing:
|
|
score += 20
|
|
elif segs:
|
|
score += 5
|
|
|
|
uturn = detect_trawl_uturn(df_vessel)
|
|
if uturn.get('trawl_suspected'):
|
|
score += 10
|
|
|
|
# 3. AIS 조작 (최대 35점)
|
|
teleports = detect_teleportation(df_vessel)
|
|
if teleports:
|
|
score += 20
|
|
|
|
from algorithms.spoofing import count_speed_jumps
|
|
jumps = count_speed_jumps(df_vessel)
|
|
if jumps >= 3:
|
|
score += 10
|
|
elif jumps >= 1:
|
|
score += 5
|
|
|
|
gaps = detect_ais_gaps(df_vessel)
|
|
critical_gaps = [g for g in gaps if g['gap_min'] >= 60]
|
|
if critical_gaps:
|
|
score += 15
|
|
elif gaps:
|
|
score += 5
|
|
|
|
# 4. 허가 이력 (최대 20점)
|
|
if is_permitted is not None and not is_permitted:
|
|
score += 20
|
|
|
|
score = min(score, 100)
|
|
|
|
if score >= 70:
|
|
level = 'CRITICAL'
|
|
elif score >= 50:
|
|
level = 'HIGH'
|
|
elif score >= 30:
|
|
level = 'MEDIUM'
|
|
else:
|
|
level = 'LOW'
|
|
|
|
return score, level
|