kcg-monitoring/prediction/scheduler.py
htlee 1bf70f46ac feat: 분석 용어/색상 통일 + 경량 분석 + 항적 미니맵
- AI분석/현장분석/보고서 위험도 용어 통일 (HIGH→WATCH, MEDIUM→MONITOR, LOW→NORMAL)
- 공통 riskMapping.ts: ALERT_COLOR/EMOJI/LEVELS, RISK_TO_ALERT, STATS_KEY_MAP
- deck.gl 오버레이 색상 현장분석 팔레트로 통일
- Python 경량 분석: 파이프라인 미통과 412* 선박에 위치 기반 간이 AnalysisResult 생성
- 현장분석 fallback 제거: classifyStateFallback/classifyFishingZone → Python 결과 전용
- 보고서 위험 평가: Python riskCounts 실데이터 기반으로 전면 교체
- 현장분석 우측 패널: 항적 미니맵 (72시간, fetchVesselTrack API)
- 현장분석 좌측 패널: 위험도 점수 기준 섹션 추가
2026-03-25 12:39:22 +09:00

296 lines
11 KiB
Python

import logging
import time
from datetime import datetime, timezone
from typing import Optional
from apscheduler.schedulers.background import BackgroundScheduler
from config import settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# History handed to detect_transshipment() on every cycle; it lives at module
# level so that it is retained from one analysis cycle to the next.
_transship_pair_history: dict = {}

# Status of the most recent analysis cycle, exposed through get_last_run().
_last_run: dict = {
    # ISO-8601 UTC time at which the cycle started.
    'timestamp': None,
    # Duration of a cycle in seconds (rounded to 2 decimals).
    'duration_sec': 0,
    # Number of AnalysisResult rows produced by the cycle.
    'vessel_count': 0,
    # Value returned by kcgdb.upsert_results() for the cycle.
    'upserted': 0,
    # str() of the exception that aborted the cycle, or None.
    'error': None,
}

# The single background scheduler instance; None while no scheduler is active.
_scheduler: Optional[BackgroundScheduler] = None
def get_last_run() -> dict:
    """Return a shallow copy of the status of the most recent analysis cycle.

    A copy is handed out so that callers cannot mutate the module-level
    ``_last_run`` record.
    """
    snapshot = dict(_last_run)
    return snapshot
def run_analysis_cycle():
    """Run one vessel-analysis cycle against the in-memory vessel cache.

    This is the job registered by ``start_scheduler``. The steps are:

    1. Fetch incremental data (skipped while ``vessel_store.last_bucket`` is
       None), merge it into the cache, call ``evict_stale`` and refresh the
       static-info / permit registries.
    2. Select the analysis targets; stop early when there are none.
    3. Run ``ChineseFishingVesselPipeline``; stop early when nothing is
       classified.
    4. Fleet analysis via ``fleet_tracker`` on one ``kcgdb`` connection,
       followed by best-effort group-polygon snapshots (a failure there is
       logged as a warning and does not abort the cycle).
    5. Build one ``AnalysisResult`` per classified vessel, plus a lightweight
       result for every MMSI in ``vessel_store.get_chinese_mmsis()`` that the
       pipeline did not classify.
    6. Flag transshipment suspects on the collected results.
    7. Upsert the results and call ``kcgdb.cleanup_old(hours=48)``.

    Side effects:
        Updates the module-level ``_last_run`` record and passes
        ``_transship_pair_history`` to ``detect_transshipment``.
        ``_last_run['duration_sec']`` is written on every exit path
        (success, early skip, or failure).

    Returns:
        None.

    Raises:
        Nothing from inside the cycle: any ``Exception`` is logged with its
        traceback and stored as a string in ``_last_run['error']``. Failures
        of the deferred imports at the top of the function are outside that
        guard and do propagate.
    """
    # Imports are deferred to call time, as in the original implementation.
    import re as _re
    from cache.vessel_store import vessel_store
    from db import snpdb, kcgdb
    from pipeline.orchestrator import ChineseFishingVesselPipeline
    from algorithms.location import classify_zone
    from algorithms.fishing_pattern import compute_ucaf_score, compute_ucft_score
    from algorithms.dark_vessel import is_dark_vessel
    from algorithms.spoofing import compute_spoofing_score, count_speed_jumps, compute_bd09_offset
    from algorithms.risk import compute_vessel_risk_score
    from fleet_tracker import fleet_tracker
    from models.result import AnalysisResult

    start = time.time()
    _last_run['timestamp'] = datetime.now(timezone.utc).isoformat()
    _last_run['error'] = None

    try:
        # 1. Incremental load + stale eviction
        if vessel_store.last_bucket is None:
            logger.warning('last_bucket is None, skipping incremental fetch (initial load not complete)')
            df_new = None
        else:
            df_new = snpdb.fetch_incremental(vessel_store.last_bucket)

        if df_new is not None and len(df_new) > 0:
            vessel_store.merge_incremental(df_new)

        vessel_store.evict_stale(settings.CACHE_WINDOW_HOURS)

        # Periodic refresh of static info / permitted-vessel registry
        vessel_store.refresh_static_info()
        vessel_store.refresh_permit_registry()

        # 2. Select analysis targets (per the original note this step also
        #    covers the SOG/COG computation).
        df_targets = vessel_store.select_analysis_targets()
        # len() rather than truthiness: the container type is not visible here
        # and may not define a usable __bool__.
        if len(df_targets) == 0:
            logger.info('no analysis targets, skipping cycle')
            _last_run['vessel_count'] = 0
            return

        # 3. Run the pipeline (described as 7-stage in the original note)
        pipeline = ChineseFishingVesselPipeline()
        classifications, vessel_dfs = pipeline.run(df_targets)

        if not classifications:
            logger.info('no vessels classified, skipping')
            _last_run['vessel_count'] = 0
            return

        # 4. Fleet analysis based on the registered fleet registry
        # NOTE(review): with .match() the '%$' alternative can only fire for a
        # name that is exactly '%'. If names *ending* in '%' are meant to count
        # as gear signals this needs .search() — confirm before changing.
        _gear_re = _re.compile(r'^.+_\d+_\d*$|%$')
        with kcgdb.get_conn() as kcg_conn:
            fleet_tracker.load_registry(kcg_conn)

            # Latest known position of every vessel that has at least one row.
            all_ais = []
            for mmsi, df in vessel_dfs.items():
                if len(df) == 0:
                    continue
                last = df.iloc[-1]
                all_ais.append({
                    'mmsi': mmsi,
                    'name': vessel_store.get_vessel_info(mmsi).get('name', ''),
                    'lat': float(last['lat']),
                    'lon': float(last['lon']),
                })

            fleet_tracker.match_ais_to_registry(all_ais, kcg_conn)

            # Only string names are tested: a None / non-string name would make
            # the regex raise TypeError and abort the whole cycle.
            gear_signals = [
                v for v in all_ais
                if isinstance(v['name'], str) and _gear_re.match(v['name'])
            ]
            fleet_tracker.track_gear_identity(gear_signals, kcg_conn)

            fleet_roles = fleet_tracker.build_fleet_clusters(vessel_dfs)

            fleet_tracker.save_snapshot(vessel_dfs, kcg_conn)

        # 4.5 Build + save group polygons (best effort: failures are only logged)
        try:
            from algorithms.polygon_builder import detect_gear_groups, build_all_group_snapshots

            company_vessels = fleet_tracker.get_company_vessels(vessel_dfs)
            gear_groups = detect_gear_groups(vessel_store)
            group_snapshots = build_all_group_snapshots(
                vessel_store, company_vessels,
                fleet_tracker._companies,
            )
            saved = kcgdb.save_group_snapshots(group_snapshots)
            cleaned = kcgdb.cleanup_group_snapshots(days=7)
            logger.info('group polygons: %d saved, %d cleaned, %d gear groups',
                        saved, cleaned, len(gear_groups))
        except Exception as e:
            logger.warning('group polygon generation failed: %s', e)

        # 5. Per-vessel extra algorithms -> build AnalysisResult
        # Pipeline vessel type -> gear code passed to compute_ucaf_score();
        # unknown types fall back to 'OT'. Hoisted out of the loop because it
        # never changes between vessels.
        gear_map = {'TRAWL': 'OT', 'PURSE': 'PS', 'LONGLINE': 'GN', 'TRAP': 'TRAP'}

        results = []
        for c in classifications:
            mmsi = c['mmsi']
            df_v = vessel_dfs.get(mmsi)
            if df_v is None or len(df_v) == 0:
                continue

            last_row = df_v.iloc[-1]
            ts = last_row.get('timestamp')

            zone_info = classify_zone(last_row['lat'], last_row['lon'])

            gear = gear_map.get(c['vessel_type'], 'OT')
            ucaf = compute_ucaf_score(df_v, gear)
            ucft = compute_ucft_score(df_v)

            dark, gap_min = is_dark_vessel(df_v)

            spoof_score = compute_spoofing_score(df_v)
            speed_jumps = count_speed_jumps(df_v)
            bd09_offset = compute_bd09_offset(last_row['lat'], last_row['lon'])

            fleet_info = fleet_roles.get(mmsi, {})

            is_permitted = vessel_store.is_permitted(mmsi)
            risk_score, risk_level = compute_vessel_risk_score(
                mmsi, df_v, zone_info, is_permitted=is_permitted,
            )

            # Most frequent 'state' value, when there is one. mode() can come
            # back empty (e.g. a column with no non-null values); in that case
            # keep 'UNKNOWN' instead of failing on .iloc[0].
            activity = 'UNKNOWN'
            if 'state' in df_v.columns:
                state_modes = df_v['state'].mode()
                if len(state_modes) > 0:
                    activity = state_modes.iloc[0]

            results.append(AnalysisResult(
                mmsi=mmsi,
                timestamp=ts,
                vessel_type=c['vessel_type'],
                confidence=c['confidence'],
                fishing_pct=c['fishing_pct'],
                cluster_id=fleet_info.get('cluster_id', -1),
                season=c['season'],
                zone=zone_info.get('zone', 'EEZ_OR_BEYOND'),
                dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),
                activity_state=activity,
                ucaf_score=ucaf,
                ucft_score=ucft,
                is_dark=dark,
                gap_duration_min=gap_min,
                spoofing_score=spoof_score,
                bd09_offset_m=bd09_offset,
                speed_jump_count=speed_jumps,
                cluster_size=fleet_info.get('cluster_size', 0),
                is_leader=fleet_info.get('is_leader', False),
                fleet_role=fleet_info.get('fleet_role', 'NOISE'),
                risk_score=risk_score,
                risk_level=risk_level,
                features=c.get('features', {}),
            ))

        # 5.5 Lightweight analysis — 412* vessels that did not go through the pipeline
        from algorithms.risk import compute_lightweight_risk_score

        pipeline_mmsis = {c['mmsi'] for c in classifications}
        lightweight_mmsis = vessel_store.get_chinese_mmsis() - pipeline_mmsis

        if lightweight_mmsis:
            now = datetime.now(timezone.utc)
            all_positions = vessel_store.get_all_latest_positions()
            lw_count = 0
            for mmsi in lightweight_mmsis:
                pos = all_positions.get(mmsi)
                if pos is None:
                    continue
                lat, lon = pos.get('lat'), pos.get('lon')
                # Skip vessels without a complete position instead of letting a
                # missing coordinate abort the cycle.
                if lat is None or lon is None:
                    continue

                sog = pos.get('sog', 0) or 0  # missing / None speed counts as 0
                ts = pos.get('timestamp', now)

                zone_info = classify_zone(lat, lon)

                # Coarse activity state derived from speed over ground alone.
                if sog <= 1.0:
                    state = 'STATIONARY'
                elif sog <= 5.0:
                    state = 'FISHING'
                else:
                    state = 'SAILING'

                is_permitted = vessel_store.is_permitted(mmsi)
                risk_score, risk_level = compute_lightweight_risk_score(
                    zone_info, sog, is_permitted=is_permitted,
                )

                # BD-09 offset is skipped because these are Chinese vessels (412* = China)
                results.append(AnalysisResult(
                    mmsi=mmsi,
                    timestamp=ts,
                    vessel_type='UNKNOWN',
                    confidence=0.0,
                    fishing_pct=0.0,
                    zone=zone_info.get('zone', 'EEZ_OR_BEYOND'),
                    dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),
                    activity_state=state,
                    ucaf_score=0.0,
                    ucft_score=0.0,
                    is_dark=False,
                    gap_duration_min=0,
                    spoofing_score=0.0,
                    bd09_offset_m=0.0,
                    speed_jump_count=0,
                    cluster_id=-1,
                    cluster_size=0,
                    is_leader=False,
                    fleet_role='NONE',
                    risk_score=risk_score,
                    risk_level=risk_level,
                    is_transship_suspect=False,
                    transship_pair_mmsi='',
                    transship_duration_min=0,
                ))
                lw_count += 1
            logger.info('lightweight analysis: %d vessels', lw_count)

        # 6. Transshipment-suspect detection (pair history is kept at module
        #    level so it persists across cycles)
        from algorithms.transshipment import detect_transshipment

        results_map = {r.mmsi: r for r in results}
        transship_pairs = detect_transshipment(df_targets, _transship_pair_history)
        for mmsi_a, mmsi_b, dur in transship_pairs:
            # Flag each side of the pair that has a result in this cycle,
            # pointing it at its partner.
            for own, partner in ((mmsi_a, mmsi_b), (mmsi_b, mmsi_a)):
                result = results_map.get(own)
                if result is None:
                    continue
                result.is_transship_suspect = True
                result.transship_pair_mmsi = partner
                result.transship_duration_min = dur

        # 7. Persist results
        upserted = kcgdb.upsert_results(results)
        kcgdb.cleanup_old(hours=48)

        elapsed = round(time.time() - start, 2)
        _last_run['vessel_count'] = len(results)
        _last_run['upserted'] = upserted
        logger.info(
            'analysis cycle: %d vessels, %d upserted, %.2fs',
            len(results), upserted, elapsed,
        )

    except Exception as e:
        # Top-level boundary of the scheduled job: record and log, never raise.
        _last_run['error'] = str(e)
        logger.exception('analysis cycle failed: %s', e)
    finally:
        # Record the duration on every exit path so that it always belongs to
        # the same run as 'timestamp' (previously it stayed stale on skipped
        # or failed cycles).
        _last_run['duration_sec'] = round(time.time() - start, 2)
def start_scheduler():
    """Create and start the background scheduler that runs the analysis cycle.

    Registers ``run_analysis_cycle`` as an interval job (every
    ``settings.SCHEDULER_INTERVAL_MIN`` minutes, at most one instance at a
    time) under the id ``'vessel_analysis'``.

    Calling this while a scheduler is already active is a no-op: replacing the
    module-level reference would orphan the running scheduler and let two
    schedulers execute analysis cycles concurrently.

    Returns:
        None.
    """
    global _scheduler

    if _scheduler is not None:
        logger.warning('scheduler already started, ignoring duplicate start')
        return

    scheduler = BackgroundScheduler()
    scheduler.add_job(
        run_analysis_cycle,
        'interval',
        minutes=settings.SCHEDULER_INTERVAL_MIN,
        id='vessel_analysis',
        max_instances=1,
        replace_existing=True,
    )
    scheduler.start()

    # Publish the instance only once it is actually running, so a failed start
    # does not leave a dead scheduler blocking later start attempts.
    _scheduler = scheduler
    logger.info('scheduler started (interval=%dm)', settings.SCHEDULER_INTERVAL_MIN)
def stop_scheduler():
    """Shut down the background scheduler, if one is active.

    The scheduler is asked to shut down without waiting for running jobs
    (``wait=False``), the module-level reference is cleared, and the stop is
    logged. Does nothing when no scheduler is set.
    """
    global _scheduler

    # Nothing to stop.
    if not _scheduler:
        return

    _scheduler.shutdown(wait=False)
    _scheduler = None
    logger.info('scheduler stopped')