- prediction/algorithms/transshipment.py 신규: 그리드 공간인덱스 O(n log n) 환적 쌍 탐지 → 후보 필터(sog<2, tanker/cargo/fishing, 외국해안 제외) + 110m 근접 + 60분 지속 - prediction/scheduler.py: 8단계 환적탐지 사이클 추가, pair_history 영속화 - prediction/models/result.py: is_transship_suspect, transship_pair_mmsi, transship_duration_min - prediction/db/kcgdb.py: UPSERT 쿼리에 3개 컬럼 추가 - database/migration/008_transshipment.sql: ALTER TABLE 3개 컬럼 추가 - backend VesselAnalysisResult + VesselAnalysisDto: TransshipInfo 중첩 DTO 추가 - frontend types.ts: algorithms.transship 타입 추가 - frontend useKoreaFilters.ts: O(n²) 65줄 → analysisMap 소비 8줄 → currentTime 매초 의존성 제거, proximityStartRef 제거 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
217 lines
7.7 KiB
Python
217 lines
7.7 KiB
Python
import logging
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
|
|
from config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Singleton background scheduler; created by start_scheduler(), cleared by stop_scheduler().
_scheduler: Optional[BackgroundScheduler] = None

# Metadata about the most recent analysis cycle; exposed read-only via get_last_run().
_last_run: dict = {
    'timestamp': None,    # ISO-8601 UTC start time of the last cycle, or None if never run
    'duration_sec': 0,    # wall-clock duration of the last completed cycle
    'vessel_count': 0,    # number of vessels analyzed in the last cycle
    'upserted': 0,        # number of result rows upserted into the KCG DB
    'error': None,        # stringified exception from the last cycle, or None on success
}

# Transshipment pair history kept at module level so proximity durations can
# accumulate across scheduler cycles; passed into detect_transshipment().
_transship_pair_history: dict = {}
|
|
def get_last_run() -> dict:
    """Return a shallow copy of the most recent cycle's run metadata.

    Copying prevents callers from mutating the module-level state.
    """
    return dict(_last_run)
|
|
def run_analysis_cycle():
    """Periodic (5-minute) analysis cycle — driven by the in-memory cache.

    Per cycle: incremental AIS load -> target selection -> classification
    pipeline -> fleet analysis -> per-vessel scoring -> transshipment
    detection -> persist results. Run stats are recorded in the module-level
    _last_run dict; any exception is caught and logged so the scheduler job
    keeps firing on subsequent intervals.
    """
    # Local imports keep this module importable without loading the heavy
    # analysis stack at module import time (and avoid import cycles).
    import re as _re
    from cache.vessel_store import vessel_store
    from db import snpdb, kcgdb
    from pipeline.orchestrator import ChineseFishingVesselPipeline
    from algorithms.location import classify_zone
    from algorithms.fishing_pattern import compute_ucaf_score, compute_ucft_score
    from algorithms.dark_vessel import is_dark_vessel
    from algorithms.spoofing import compute_spoofing_score, count_speed_jumps, compute_bd09_offset
    from algorithms.risk import compute_vessel_risk_score
    from fleet_tracker import fleet_tracker
    from models.result import AnalysisResult

    start = time.time()
    _last_run['timestamp'] = datetime.now(timezone.utc).isoformat()
    _last_run['error'] = None

    try:
        # 1. Incremental load + stale eviction
        if vessel_store.last_bucket is None:
            logger.warning('last_bucket is None, skipping incremental fetch (initial load not complete)')
            df_new = None
        else:
            df_new = snpdb.fetch_incremental(vessel_store.last_bucket)
        if df_new is not None and len(df_new) > 0:
            vessel_store.merge_incremental(df_new)
        vessel_store.evict_stale(settings.CACHE_WINDOW_HOURS)

        # Periodic refresh of vessel static info and the permitted-vessel registry
        vessel_store.refresh_static_info()
        vessel_store.refresh_permit_registry()

        # 2. Select analysis targets (includes SOG/COG computation)
        df_targets = vessel_store.select_analysis_targets()
        if len(df_targets) == 0:
            logger.info('no analysis targets, skipping cycle')
            _last_run['vessel_count'] = 0
            return

        # 3. Run the 7-stage pipeline
        pipeline = ChineseFishingVesselPipeline()
        classifications, vessel_dfs = pipeline.run(df_targets)

        if not classifications:
            logger.info('no vessels classified, skipping')
            _last_run['vessel_count'] = 0
            return

        # 4. Fleet analysis based on the registered fleet
        # Matches AIS names shaped like "<prefix>_<digits>_<digits?>" or ending
        # in '%' — NOTE(review): presumably gear-beacon naming; confirm the
        # intent of the '%$' alternative against track_gear_identity().
        _gear_re = _re.compile(r'^.+_\d+_\d*$|%$')
        with kcgdb.get_conn() as kcg_conn:
            fleet_tracker.load_registry(kcg_conn)

            # Latest known position (and cached name) per vessel, used for
            # registry matching below.
            all_ais = []
            for mmsi, df in vessel_dfs.items():
                if len(df) > 0:
                    last = df.iloc[-1]
                    all_ais.append({
                        'mmsi': mmsi,
                        'name': vessel_store.get_vessel_info(mmsi).get('name', ''),
                        'lat': float(last['lat']),
                        'lon': float(last['lon']),
                    })

            fleet_tracker.match_ais_to_registry(all_ais, kcg_conn)

            # Subset of vessels whose AIS name matches the gear pattern above.
            gear_signals = [v for v in all_ais if _gear_re.match(v.get('name', ''))]
            fleet_tracker.track_gear_identity(gear_signals, kcg_conn)

            fleet_roles = fleet_tracker.build_fleet_clusters(vessel_dfs)

            fleet_tracker.save_snapshot(vessel_dfs, kcg_conn)

        # 5. Per-vessel extra algorithms -> build AnalysisResult records
        results = []
        for c in classifications:
            mmsi = c['mmsi']
            df_v = vessel_dfs.get(mmsi)
            if df_v is None or len(df_v) == 0:
                continue

            last_row = df_v.iloc[-1]
            ts = last_row.get('timestamp')

            zone_info = classify_zone(last_row['lat'], last_row['lon'])

            # Map classifier vessel types to gear codes for UCAF scoring;
            # unknown types fall back to 'OT'.
            gear_map = {'TRAWL': 'OT', 'PURSE': 'PS', 'LONGLINE': 'GN', 'TRAP': 'TRAP'}
            gear = gear_map.get(c['vessel_type'], 'OT')
            ucaf = compute_ucaf_score(df_v, gear)
            ucft = compute_ucft_score(df_v)

            dark, gap_min = is_dark_vessel(df_v)

            spoof_score = compute_spoofing_score(df_v)
            speed_jumps = count_speed_jumps(df_v)
            bd09_offset = compute_bd09_offset(last_row['lat'], last_row['lon'])

            fleet_info = fleet_roles.get(mmsi, {})

            is_permitted = vessel_store.is_permitted(mmsi)
            risk_score, risk_level = compute_vessel_risk_score(
                mmsi, df_v, zone_info, is_permitted=is_permitted,
            )

            # Most frequent activity state over the vessel's window;
            # 'UNKNOWN' when the column is absent.
            activity = 'UNKNOWN'
            if 'state' in df_v.columns and len(df_v) > 0:
                activity = df_v['state'].mode().iloc[0]

            results.append(AnalysisResult(
                mmsi=mmsi,
                timestamp=ts,
                vessel_type=c['vessel_type'],
                confidence=c['confidence'],
                fishing_pct=c['fishing_pct'],
                cluster_id=fleet_info.get('cluster_id', -1),
                season=c['season'],
                zone=zone_info.get('zone', 'EEZ_OR_BEYOND'),
                dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),
                activity_state=activity,
                ucaf_score=ucaf,
                ucft_score=ucft,
                is_dark=dark,
                gap_duration_min=gap_min,
                spoofing_score=spoof_score,
                bd09_offset_m=bd09_offset,
                speed_jump_count=speed_jumps,
                cluster_size=fleet_info.get('cluster_size', 0),
                is_leader=fleet_info.get('is_leader', False),
                fleet_role=fleet_info.get('fleet_role', 'NOISE'),
                risk_score=risk_score,
                risk_level=risk_level,
                features=c.get('features', {}),
            ))

        # 6. Transshipment-suspect detection (pair_history lives at module
        #    level so proximity durations persist across cycles)
        from algorithms.transshipment import detect_transshipment

        results_map = {r.mmsi: r for r in results}
        transship_pairs = detect_transshipment(df_targets, _transship_pair_history)
        # Mark both sides of each suspect pair on their AnalysisResult.
        for mmsi_a, mmsi_b, dur in transship_pairs:
            if mmsi_a in results_map:
                results_map[mmsi_a].is_transship_suspect = True
                results_map[mmsi_a].transship_pair_mmsi = mmsi_b
                results_map[mmsi_a].transship_duration_min = dur
            if mmsi_b in results_map:
                results_map[mmsi_b].is_transship_suspect = True
                results_map[mmsi_b].transship_pair_mmsi = mmsi_a
                results_map[mmsi_b].transship_duration_min = dur

        # 7. Persist results, then drop rows older than the retention window
        upserted = kcgdb.upsert_results(results)
        kcgdb.cleanup_old(hours=48)

        elapsed = round(time.time() - start, 2)
        _last_run['duration_sec'] = elapsed
        _last_run['vessel_count'] = len(results)
        _last_run['upserted'] = upserted
        logger.info(
            'analysis cycle: %d vessels, %d upserted, %.2fs',
            len(results), upserted, elapsed,
        )

    except Exception as e:
        # Record the failure in _last_run but never let it kill the scheduler job.
        _last_run['error'] = str(e)
        logger.exception('analysis cycle failed: %s', e)
|
|
|
def start_scheduler():
    """Create the background scheduler and register the periodic analysis job.

    The job runs run_analysis_cycle() every SCHEDULER_INTERVAL_MIN minutes,
    with at most one instance in flight at a time.
    """
    global _scheduler

    _scheduler = BackgroundScheduler()
    interval_min = settings.SCHEDULER_INTERVAL_MIN
    _scheduler.add_job(
        run_analysis_cycle,
        'interval',
        minutes=interval_min,
        id='vessel_analysis',
        max_instances=1,
        replace_existing=True,
    )
    _scheduler.start()
    logger.info('scheduler started (interval=%dm)', interval_min)
|
|
|
def stop_scheduler():
    """Shut down the background scheduler if one is running; no-op otherwise."""
    global _scheduler
    if not _scheduler:
        return
    _scheduler.shutdown(wait=False)
    _scheduler = None
    logger.info('scheduler stopped')
|