- V029: kcg.fishery_permit_cn 신규 테이블(연단위, permit_year+permit_no 복합 유니크) + fleet_vessels permit_year/fishery_code 컬럼 - load_fishery_permit_cn.py: xls → DB 적재 스크립트, 906척 + 497 신청인사 upsert - G-04/G-05/G-06 Dead code 해결: classify_gear_violations 호출 연결, dir() 버그 제거 - find_pair_candidates: bbox 1차 + 궤적 유사도(location/sog_corr/cog_alignment) 2차, role 가점 - spoofing 산식 교체: 1시간 윈도우 + teleport 절대 가점 + extreme 50kn 단독 0.6 확정 - transshipment 선종 완화: shipTy 부분일치 + 412* FISHING 간주 - gear_code DB write 경로 신설 + fleet_tracker API 3개 추가 - cron 스크립트: fishery_permit/pair_type/fleet_role 신규 섹션
106 lines
3.6 KiB
Python
106 lines
3.6 KiB
Python
import pandas as pd
|
|
from algorithms.location import haversine_nm, bd09_to_wgs84, compute_bd09_offset # noqa: F401
|
|
|
|
MAX_FISHING_SPEED_KNOTS = 25.0
|
|
EXTREME_SPEED_KNOTS = 50.0 # 물리적 불가능 속도 (단독으로 spoofing 확정)
|
|
RECENT_WINDOW_BUCKETS = 12 # 최근 12 x 5min = 1시간 윈도우로 분모 제한
|
|
|
|
|
|
def detect_teleportation(df_vessel: pd.DataFrame,
|
|
max_speed_knots: float = MAX_FISHING_SPEED_KNOTS) -> list[dict]:
|
|
"""연속 AIS 포인트 간 물리적 불가능 이동 탐지."""
|
|
if len(df_vessel) < 2:
|
|
return []
|
|
|
|
anomalies = []
|
|
records = df_vessel.sort_values('timestamp').to_dict('records')
|
|
|
|
for i in range(1, len(records)):
|
|
prev, curr = records[i - 1], records[i]
|
|
dist_nm = haversine_nm(prev['lat'], prev['lon'], curr['lat'], curr['lon'])
|
|
dt_hours = (
|
|
pd.Timestamp(curr['timestamp']) - pd.Timestamp(prev['timestamp'])
|
|
).total_seconds() / 3600
|
|
|
|
if dt_hours <= 0:
|
|
continue
|
|
|
|
implied_speed = dist_nm / dt_hours
|
|
|
|
if implied_speed > max_speed_knots:
|
|
anomalies.append({
|
|
'idx': i,
|
|
'dist_nm': round(dist_nm, 2),
|
|
'implied_kn': round(implied_speed, 1),
|
|
'type': 'TELEPORTATION',
|
|
'confidence': 'HIGH' if implied_speed > 50 else 'MED',
|
|
})
|
|
|
|
return anomalies
|
|
|
|
|
|
def count_speed_jumps(df_vessel: pd.DataFrame, threshold_knots: float = 10.0) -> int:
|
|
"""연속 SOG 급변 횟수."""
|
|
if len(df_vessel) < 2:
|
|
return 0
|
|
|
|
sog = df_vessel['sog'].values
|
|
jumps = 0
|
|
for i in range(1, len(sog)):
|
|
if abs(sog[i] - sog[i - 1]) > threshold_knots:
|
|
jumps += 1
|
|
return jumps
|
|
|
|
|
|
def compute_spoofing_score(df_vessel: pd.DataFrame) -> float:
|
|
"""종합 GPS 스푸핑 점수 (0~1).
|
|
|
|
분모를 24h 누적 길이가 아닌 최근 1시간 윈도우로 제한하고,
|
|
물리적 불가능 속도(>50kn)는 단독으로 강한 가점을 부여한다.
|
|
"""
|
|
if len(df_vessel) < 2:
|
|
return 0.0
|
|
|
|
# 최근 N 버킷(약 1시간) 기준으로 분모 고정
|
|
window_df = df_vessel.tail(RECENT_WINDOW_BUCKETS)
|
|
n_window = max(len(window_df), 2)
|
|
|
|
score = 0.0
|
|
|
|
# 1) 순간이동 — 절대 가점 (건당 0.2) + extreme 단독 확정 가점 0.6
|
|
teleports = detect_teleportation(window_df)
|
|
if teleports:
|
|
score += min(0.4, len(teleports) * 0.20)
|
|
extreme = any(t.get('implied_kn', 0) >= EXTREME_SPEED_KNOTS for t in teleports)
|
|
if extreme:
|
|
score = max(score, 0.6)
|
|
|
|
# 2) max_speed 컬럼이 있으면 window 내 최고속 직접 확인 (SNPDB 집계값 활용)
|
|
if 'max_speed' in window_df.columns:
|
|
try:
|
|
peak_kn = float(window_df['max_speed'].max())
|
|
if peak_kn >= EXTREME_SPEED_KNOTS:
|
|
score = max(score, 0.6)
|
|
elif peak_kn > MAX_FISHING_SPEED_KNOTS:
|
|
score += 0.15
|
|
except (TypeError, ValueError):
|
|
pass
|
|
|
|
# 3) SOG 급변 — 윈도우 비율 가점
|
|
jumps = count_speed_jumps(window_df)
|
|
if jumps > 0:
|
|
score += min(0.3, jumps / n_window * 3)
|
|
|
|
# 4) BD09 오프셋 — 비중국 선박만 (중국 412*는 좌표계 차이로 노이즈)
|
|
mmsi_str = str(df_vessel.iloc[0].get('mmsi', '')) if 'mmsi' in df_vessel.columns else ''
|
|
if not mmsi_str.startswith('412'):
|
|
mid_idx = len(df_vessel) // 2
|
|
row = df_vessel.iloc[mid_idx]
|
|
offset = compute_bd09_offset(row['lat'], row['lon'])
|
|
if offset > 300:
|
|
score += 0.3
|
|
elif offset > 100:
|
|
score += 0.1
|
|
|
|
return round(min(score, 1.0), 4)
|