kcg-ai-monitoring/prediction/scheduler.py
htlee e5d123e4c5 feat(prediction): dark suspicion scoring + transshipment redesign from a veteran's perspective

After 12 hours of accumulated analysis, the dark/transship detectors were judged unusable in operation, so the detection philosophy is rebuilt from the ground up.

## Dark redesign: broad detection + scoring of intentional-OFF suspicion

Switched from the old "filter out" approach to "record broadly + compute a score + alert by tier".
Eight patterns, drawn from veteran Coast Guard experience, are summed as additive points into a 0-100 score.

- P1 OFF while underway (SOG > 2 kn just before the gap)
- P2 OFF near a sensitive-zone boundary (territorial sea / contiguous zone / designated fishing zone)
- P3 repeat history (recurrence within 7 days): the strongest signal
- P4 abnormal distance traveled across the gap (covert movement)
- P5 OFF during daytime fishing hours
- P6 anomalous behavior just before the gap (teleport / abrupt change)
- P7 unlicensed-vessel bonus
- P8 long gap (3 h / 6 h brackets)
- Deduction: gap starts outside Korean AIS reception coverage
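The additive scheme above can be sketched roughly as follows. This is a minimal illustration: the pattern keys, weights, and the deduction value are placeholders of my own, not the production values inside compute_dark_suspicion.

```python
# Illustrative additive pattern scoring with a 0-100 clamp.
# Pattern keys and weights are hypothetical, not the real tuning.
WEIGHTS = {
    'P1_moving_off': 15, 'P2_boundary_off': 15, 'P3_repeat_7d': 25,
    'P4_gap_jump': 15, 'P5_daytime_off': 10, 'P6_pre_gap_anomaly': 10,
    'P7_unlicensed': 10, 'P8_long_gap': 10,
}
COVERAGE_PENALTY = 20  # deduction when the gap starts outside AIS coverage


def score_patterns(hits: set[str], outside_coverage: bool) -> int:
    """Sum bonus points for matched patterns, apply the coverage
    deduction, and clamp the result into the 0-100 range."""
    raw = sum(WEIGHTS.get(p, 0) for p in hits)
    if outside_coverage:
        raw -= COVERAGE_PENALTY
    return max(0, min(100, raw))
```

The clamp matters because the weights can sum past 100 when many patterns fire at once.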

Fully excluded:
- fishing-gear AIS (GEAR_PATTERN match, fleet_tracker as SSOT)
- Korean vessels (MMSI 440*, 441*): outside KCG's scope here

Tiers: CRITICAL (70+) / HIGH (50-69) / WATCH (30-49) / NONE
Events are generated only for HIGH and above (WATCH is stored in the DB only).
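The tier cutoffs map onto scores like this (a minimal sketch; the actual mapping lives inside compute_dark_suspicion):

```python
def dark_tier(score: int) -> str:
    """Map a 0-100 dark suspicion score to an alert tier
    using the thresholds from the commit message."""
    if score >= 70:
        return 'CRITICAL'
    if score >= 50:
        return 'HIGH'
    if score >= 30:
        return 'WATCH'
    return 'NONE'
```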

New functions:
- algorithms/dark_vessel.py: analyze_dark_pattern, compute_dark_suspicion
- scheduler.py: _is_dark_excluded, _fetch_dark_history (one bulk 7-day history query per cycle)

The same logic applies to both the pipeline path and the lightweight path.
Results are stored in the features JSONB as {dark_suspicion_score, dark_patterns,
dark_tier, dark_history_7d, dark_history_24h, gap_start_*}.
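For orientation, a features payload with the keys listed above could look like this before serialization. The values here are invented; only the key names follow the commit message.

```python
import json

# Hypothetical example of the dark-related keys written to features JSONB.
features = {
    'dark_suspicion_score': 55,
    'dark_patterns': ['P1', 'P3'],
    'dark_tier': 'HIGH',
    'dark_history_7d': 2,
    'dark_history_24h': 1,
    'gap_start_lat': 33.1,
    'gap_start_lon': 124.7,
    'gap_start_sog': 4.2,
    'gap_start_state': 'SAILING',
}
payload = json.dumps(features)  # shape of what lands in the JSONB column
```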

## Transshipment redesign: veteran patrol-crew criteria

The goal is confidence high enough to commit limited patrol-vessel assets to an enforcement sortie.

Retuned constants:
- SOG_THRESHOLD_KN: 2.0 → 1.0 (fully stationary only)
- PROXIMITY_DEG: 0.001 → 0.0007 (~77 m)
- SUSPECT_DURATION_MIN: 60 → 45 (gap tolerance applies)
- PAIR_EXPIRY_MIN: 120 → 180
- GAP_TOLERANCE_CYCLES: 2, new (smooths GPS noise)
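As a sanity check on the ~77 m figure: one degree of latitude is roughly 111.32 km, and a degree of longitude shrinks by cos(latitude). A small conversion helper (illustrative only):

```python
import math

DEG_LAT_M = 111_320.0  # approximate meters per degree of latitude


def deg_to_meters(deg: float, lat: float = 0.0, axis: str = 'lat') -> float:
    """Approximate ground distance of an angular offset in degrees.
    Longitude offsets are scaled by cos(lat)."""
    if axis == 'lon':
        return deg * DEG_LAT_M * math.cos(math.radians(lat))
    return deg * DEG_LAT_M

# 0.0007 deg of latitude ≈ 78 m, matching the ~77 m noted above;
# at Yellow Sea latitudes the same longitude offset is somewhat shorter.
```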

Required conditions (all must hold):
- inside Korean EEZ jurisdiction
- ship types incapable of transshipment excluded (passenger/military/tanker/pilot/tug/sar)
- fishing-gear AIS excluded on both sides
- sustained for 45+ minutes (up to 2 missed cycles tolerated via miss_count)

Scoring (base 40):
- night (KST 20-04): +15
- unlicensed bonus: +20
- COG deviation > 45°: +20 (rules out fleets steaming on parallel headings)
- duration ≥ 90 min: +20
- territorial sea / contiguous zone position: +15

Tiers: CRITICAL (90+) / HIGH (70-89) / WATCH (50-69)
WATCH is logged only, not stored. Only HIGH/CRITICAL produce events.
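Putting base and bonuses together, the transshipment score and tier could be computed along these lines. This is a sketch; the parameter names are my own shorthand, not the real detect_transshipment interface.

```python
def transship_score(night: bool, unlicensed: bool, cog_dev_deg: float,
                    duration_min: int, in_sensitive_zone: bool) -> int:
    """Base 40 plus the bonuses from the commit message."""
    score = 40
    if night:
        score += 15
    if unlicensed:
        score += 20
    if cog_dev_deg > 45:
        score += 20  # excludes fleets steaming side by side
    if duration_min >= 90:
        score += 20
    if in_sensitive_zone:
        score += 15
    return score


def transship_tier(score: int) -> str:
    if score >= 90:
        return 'CRITICAL'
    if score >= 70:
        return 'HIGH'
    return 'WATCH' if score >= 50 else 'NONE'
```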

pair_history structure extended:
- before: {(a,b): datetime}
- after: {(a,b): {'first_seen', 'last_seen', 'miss_count', 'last_lat/lon/cog_a/cog_b'}}
- a pair is dropped only when miss_count > GAP_TOLERANCE_CYCLES (no instant reset)
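The miss_count handling can be sketched as follows. This helper and its entry shape are assumptions for illustration; the real bookkeeping lives in detect_transshipment.

```python
from datetime import datetime, timezone

GAP_TOLERANCE_CYCLES = 2


def update_pair(history: dict, key: tuple, seen: bool) -> None:
    """Advance one cycle for a pair: reset miss_count on a sighting,
    otherwise increment it, and drop the pair only once the
    tolerance is exceeded (no instant reset on a single miss)."""
    now = datetime.now(timezone.utc)
    entry = history.get(key)
    if seen:
        if entry is None:
            history[key] = {'first_seen': now, 'last_seen': now, 'miss_count': 0}
        else:
            entry['last_seen'] = now
            entry['miss_count'] = 0
    elif entry is not None:
        entry['miss_count'] += 1
        if entry['miss_count'] > GAP_TOLERANCE_CYCLES:
            del history[key]  # pair finally expires here
```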

## event_generator rule swap

- dark_vessel_long rule removed → dark_critical, dark_high (driven by features.dark_tier)
- transship rule removed → transship_critical, transship_high (driven by features.transship_tier)
- DEDUP windows: ILLEGAL_TRANSSHIP 67→181, DARK_VESSEL 127→131, ZONE_DEPARTURE 127→89

## Common cleanup

- removed _gear_re from scheduler.py; fleet_tracker.GEAR_PATTERN is now the single SSOT
2026-04-09 07:42:15 +09:00


import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Optional
from zoneinfo import ZoneInfo

from apscheduler.schedulers.background import BackgroundScheduler

from config import settings
from fleet_tracker import GEAR_PATTERN

logger = logging.getLogger(__name__)

_KST = ZoneInfo('Asia/Seoul')

_scheduler: Optional[BackgroundScheduler] = None

_last_run: dict = {
    'timestamp': None,
    'duration_sec': 0,
    'vessel_count': 0,
    'upserted': 0,
    'error': None,
}

_transship_pair_history: dict = {}

# Korean-vessel MMSI prefixes: fully excluded from dark detection
_KR_DOMESTIC_PREFIXES = ('440', '441')
def _is_dark_excluded(mmsi: str, name: str) -> tuple[bool, str]:
    """Whether to fully exclude a vessel from dark detection.
    Only gear AIS and Korean vessels are filtered: user alerts target
    actual vessels, and Korean vessels fall outside KCG's scope here.
    """
    if any(mmsi.startswith(p) for p in _KR_DOMESTIC_PREFIXES):
        return True, 'kr_domestic'
    if name and GEAR_PATTERN.match(name):
        return True, 'gear_signal'
    return False, ''
def _fetch_dark_history(kcg_conn, mmsi_list: list[str]) -> dict[str, dict]:
    """Aggregate is_dark=True history over the last 7 days, per MMSI.
    Queried once at the start of each cycle and reused during scoring.
    """
    if not mmsi_list:
        return {}
    try:
        cur = kcg_conn.cursor()
        cur.execute(
            """
            SELECT mmsi,
                   count(*) AS n7,
                   count(*) FILTER (WHERE analyzed_at > now() - interval '24 hours') AS n24,
                   max(analyzed_at) AS last_at
            FROM kcg.vessel_analysis_results
            WHERE is_dark = true
              AND analyzed_at > now() - interval '7 days'
              AND mmsi = ANY(%s)
            GROUP BY mmsi
            """,
            (list(mmsi_list),),
        )
        return {
            str(m): {'count_7d': int(n7 or 0), 'count_24h': int(n24 or 0), 'last_at': t}
            for m, n7, n24, t in cur.fetchall()
        }
    except Exception as e:
        logger.warning('fetch_dark_history failed: %s', e)
        return {}
def get_last_run() -> dict:
    return _last_run.copy()
def run_analysis_cycle():
    """5-minute analysis cycle, driven by the in-memory cache."""
    from cache.vessel_store import vessel_store
    from db import snpdb, kcgdb
    from pipeline.orchestrator import ChineseFishingVesselPipeline
    from algorithms.location import classify_zone
    from algorithms.fishing_pattern import compute_ucaf_score, compute_ucft_score
    from algorithms.dark_vessel import is_dark_vessel, analyze_dark_pattern, compute_dark_suspicion
    from algorithms.spoofing import compute_spoofing_score, count_speed_jumps, compute_bd09_offset
    from algorithms.risk import compute_vessel_risk_score
    from fleet_tracker import fleet_tracker
    from models.result import AnalysisResult

    start = time.time()
    _last_run['timestamp'] = datetime.now(timezone.utc).isoformat()
    _last_run['error'] = None
    try:
        # 1. Incremental load + stale eviction
        if vessel_store.last_bucket is None:
            logger.warning('last_bucket is None, skipping incremental fetch (initial load not complete)')
            df_new = None
        else:
            df_new = snpdb.fetch_incremental(vessel_store.last_bucket)
        if df_new is not None and len(df_new) > 0:
            vessel_store.merge_incremental(df_new)
        vessel_store.evict_stale(settings.CACHE_WINDOW_HOURS)
        # Periodic refresh of static info / licensed-vessel registry
        vessel_store.refresh_static_info()
        vessel_store.refresh_permit_registry()
        # 2. Select analysis targets (includes SOG/COG computation)
        df_targets = vessel_store.select_analysis_targets()
        if len(df_targets) == 0:
            logger.info('no analysis targets, skipping cycle')
            _last_run['vessel_count'] = 0
            return
        # 3. Run the 7-stage pipeline
        pipeline = ChineseFishingVesselPipeline()
        classifications, vessel_dfs = pipeline.run(df_targets)
        if not classifications:
            logger.info('no vessels classified, skipping')
            _last_run['vessel_count'] = 0
            return
        # 4. Fleet analysis against the registered-fleet registry
        with kcgdb.get_conn() as kcg_conn:
            fleet_tracker.load_registry(kcg_conn)
            all_ais = []
            for mmsi, df in vessel_dfs.items():
                if len(df) > 0:
                    last = df.iloc[-1]
                    all_ais.append({
                        'mmsi': mmsi,
                        'name': vessel_store.get_vessel_info(mmsi).get('name', ''),
                        'lat': float(last['lat']),
                        'lon': float(last['lon']),
                    })
            fleet_tracker.match_ais_to_registry(all_ais, kcg_conn)
            gear_signals = [v for v in all_ais if GEAR_PATTERN.match(v.get('name', '') or '')]
            fleet_tracker.track_gear_identity(gear_signals, kcg_conn)
            fleet_roles = fleet_tracker.build_fleet_clusters(vessel_dfs)
            fleet_tracker.save_snapshot(vessel_dfs, kcg_conn)
            gear_groups = []
            # 4.5 Build + persist group polygons
            try:
                from algorithms.polygon_builder import detect_gear_groups, build_all_group_snapshots
                company_vessels = fleet_tracker.get_company_vessels(vessel_dfs)
                gear_groups = detect_gear_groups(vessel_store)
                group_snapshots = build_all_group_snapshots(
                    vessel_store, company_vessels,
                    fleet_tracker._companies,
                )
                saved = kcgdb.save_group_snapshots(group_snapshots)
                cleaned = kcgdb.cleanup_group_snapshots(days=7)
                logger.info('group polygons: %d saved, %d cleaned, %d gear groups',
                            saved, cleaned, len(gear_groups))
            except Exception as e:
                logger.warning('group polygon generation failed: %s', e)
            # 4.7 Gear correlation analysis (multi-model pattern tracking)
            try:
                from algorithms.gear_correlation import run_gear_correlation
                from algorithms.gear_parent_inference import run_gear_parent_inference
                corr_result = run_gear_correlation(
                    vessel_store=vessel_store,
                    gear_groups=gear_groups,
                    conn=kcg_conn,
                )
                logger.info(
                    'gear correlation: %d scores updated, %d raw metrics, %d models',
                    corr_result['updated'], corr_result['raw_inserted'],
                    corr_result['models'],
                )
                inference_result = run_gear_parent_inference(
                    vessel_store=vessel_store,
                    gear_groups=gear_groups,
                    conn=kcg_conn,
                )
                logger.info(
                    'gear parent inference: %d groups, %d direct-match, %d candidates, %d promoted, %d review, %d skipped',
                    inference_result['groups'],
                    inference_result.get('direct_matched', 0),
                    inference_result['candidates'],
                    inference_result['promoted'],
                    inference_result['review_required'],
                    inference_result['skipped'],
                )
            except Exception as e:
                logger.warning('gear correlation failed: %s', e)
        # 5. Per-vessel extra algorithms → AnalysisResult
        # Bulk dark-history (7-day) lookup, once per cycle
        now_kst_hour = datetime.now(_KST).hour
        all_chinese = vessel_store.get_chinese_mmsis()
        with kcgdb.get_conn() as hist_conn:
            dark_history_map = _fetch_dark_history(hist_conn, list(all_chinese))
        pipeline_dark_tiers = {'CRITICAL': 0, 'HIGH': 0, 'WATCH': 0, 'NONE': 0}
        pipeline_skip_counts = {'kr_domestic': 0, 'gear_signal': 0}
        results = []
        for c in classifications:
            mmsi = c['mmsi']
            df_v = vessel_dfs.get(mmsi)
            if df_v is None or len(df_v) == 0:
                continue
            last_row = df_v.iloc[-1]
            ts = last_row.get('timestamp')
            zone_info = classify_zone(last_row['lat'], last_row['lon'])
            gear_map = {'TRAWL': 'OT', 'PURSE': 'PS', 'LONGLINE': 'GN', 'TRAP': 'TRAP'}
            gear = gear_map.get(c['vessel_type'], 'OT')
            ucaf = compute_ucaf_score(df_v, gear)
            ucft = compute_ucft_score(df_v)
            # ── Dark: broad detection + intentional-OFF suspicion scoring ──
            vname = vessel_store.get_vessel_info(mmsi).get('name', '') or ''
            is_permitted = vessel_store.is_permitted(mmsi)
            dark_excluded, dark_skip_reason = _is_dark_excluded(mmsi, vname)
            if dark_excluded:
                pipeline_skip_counts[dark_skip_reason] = pipeline_skip_counts.get(dark_skip_reason, 0) + 1
                dark = False
                gap_min = 0
                dark_features: dict = {
                    'dark_suspicion_score': 0,
                    'dark_patterns': [],
                    'dark_tier': 'EXCLUDED',
                    'dark_history_7d': 0,
                    'dark_history_24h': 0,
                }
            else:
                gap_info = analyze_dark_pattern(df_v)
                dark = bool(gap_info.get('is_dark'))
                gap_min = int(gap_info.get('gap_min') or 0)
                history = dark_history_map.get(mmsi, {'count_7d': 0, 'count_24h': 0})
                score, patterns, tier = compute_dark_suspicion(
                    gap_info, mmsi, is_permitted, history,
                    now_kst_hour, classify_zone,
                )
                pipeline_dark_tiers[tier] = pipeline_dark_tiers.get(tier, 0) + 1
                dark_features = {
                    'dark_suspicion_score': score,
                    'dark_patterns': patterns,
                    'dark_tier': tier,
                    'dark_history_7d': int(history.get('count_7d', 0) or 0),
                    'dark_history_24h': int(history.get('count_24h', 0) or 0),
                    'gap_start_lat': gap_info.get('gap_start_lat'),
                    'gap_start_lon': gap_info.get('gap_start_lon'),
                    'gap_start_sog': gap_info.get('gap_start_sog'),
                    'gap_start_state': gap_info.get('gap_start_state'),
                }
            spoof_score = compute_spoofing_score(df_v)
            speed_jumps = count_speed_jumps(df_v)
            bd09_offset = compute_bd09_offset(last_row['lat'], last_row['lon'])
            fleet_info = fleet_roles.get(mmsi, {})
            risk_score, risk_level = compute_vessel_risk_score(
                mmsi, df_v, zone_info, is_permitted=is_permitted,
            )
            activity = 'UNKNOWN'
            if 'state' in df_v.columns and len(df_v) > 0:
                activity = df_v['state'].mode().iloc[0]
            merged_features = {**(c.get('features', {}) or {}), **dark_features}
            results.append(AnalysisResult(
                mmsi=mmsi,
                timestamp=ts,
                vessel_type=c['vessel_type'],
                confidence=c['confidence'],
                fishing_pct=c['fishing_pct'],
                cluster_id=fleet_info.get('cluster_id', -1),
                season=c['season'],
                zone=zone_info.get('zone', 'EEZ_OR_BEYOND'),
                dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),
                activity_state=activity,
                ucaf_score=ucaf,
                ucft_score=ucft,
                is_dark=dark,
                gap_duration_min=gap_min,
                spoofing_score=spoof_score,
                bd09_offset_m=bd09_offset,
                speed_jump_count=speed_jumps,
                cluster_size=fleet_info.get('cluster_size', 0),
                is_leader=fleet_info.get('is_leader', False),
                fleet_role=fleet_info.get('fleet_role', 'NOISE'),
                risk_score=risk_score,
                risk_level=risk_level,
                features=merged_features,
            ))
        logger.info(
            'pipeline dark: tiers=%s skip=%s',
            pipeline_dark_tiers, pipeline_skip_counts,
        )
        # ── 5.5 Lightweight analysis: 412* vessels that did not pass the pipeline ──
        # Uses the 24h accumulated tracks in vessel_store._tracks directly,
        # so dark/spoof signals are produced here as well.
        from algorithms.risk import compute_lightweight_risk_score
        pipeline_mmsis = {c['mmsi'] for c in classifications}
        lightweight_mmsis = vessel_store.get_chinese_mmsis() - pipeline_mmsis
        if lightweight_mmsis:
            now = datetime.now(timezone.utc)
            all_positions = vessel_store.get_all_latest_positions()
            lw_count = 0
            lw_dark = 0
            lw_spoof = 0
            lw_dark_tiers = {'CRITICAL': 0, 'HIGH': 0, 'WATCH': 0, 'NONE': 0}
            lw_dark_skip = {'kr_domestic': 0, 'gear_signal': 0}
            for mmsi in lightweight_mmsis:
                pos = all_positions.get(mmsi)
                if pos is None or pos.get('lat') is None:
                    continue
                lat, lon = pos['lat'], pos['lon']
                sog = pos.get('sog', 0) or 0
                cog = pos.get('cog', 0) or 0
                ts = pos.get('timestamp', now)
                zone_info = classify_zone(lat, lon)
                if sog <= 1.0:
                    state = 'STATIONARY'
                elif sog <= 5.0:
                    state = 'FISHING'
                else:
                    state = 'SAILING'
                is_permitted = vessel_store.is_permitted(mmsi)
                vname = vessel_store.get_vessel_info(mmsi).get('name', '') or ''
                # ── Dark: pre-filter (gear AIS / Korean vessels) ──
                dark_excluded, dark_skip_reason = _is_dark_excluded(mmsi, vname)
                if dark_excluded:
                    lw_dark_skip[dark_skip_reason] = lw_dark_skip.get(dark_skip_reason, 0) + 1
                    dark = False
                    gap_min = 0
                    dark_features: dict = {
                        'dark_suspicion_score': 0,
                        'dark_patterns': [],
                        'dark_tier': 'EXCLUDED',
                        'dark_history_7d': 0,
                        'dark_history_24h': 0,
                    }
                    spoof_score = 0.0
                    speed_jumps = 0
                else:
                    df_v = vessel_store._tracks.get(mmsi)
                    spoof_score = 0.0
                    speed_jumps = 0
                    if df_v is not None and len(df_v) >= 2:
                        try:
                            spoof_score = compute_spoofing_score(df_v)
                        except Exception:
                            pass
                        try:
                            speed_jumps = count_speed_jumps(df_v)
                        except Exception:
                            pass
                        try:
                            gap_info = analyze_dark_pattern(df_v)
                        except Exception:
                            gap_info = {'is_dark': False, 'gap_min': 0}
                    else:
                        gap_info = {'is_dark': False, 'gap_min': 0}
                    dark = bool(gap_info.get('is_dark'))
                    gap_min = int(gap_info.get('gap_min') or 0)
                    history = dark_history_map.get(mmsi, {'count_7d': 0, 'count_24h': 0})
                    score, patterns, tier = compute_dark_suspicion(
                        gap_info, mmsi, is_permitted, history,
                        now_kst_hour, classify_zone,
                    )
                    lw_dark_tiers[tier] = lw_dark_tiers.get(tier, 0) + 1
                    dark_features = {
                        'dark_suspicion_score': score,
                        'dark_patterns': patterns,
                        'dark_tier': tier,
                        'dark_history_7d': int(history.get('count_7d', 0) or 0),
                        'dark_history_24h': int(history.get('count_24h', 0) or 0),
                        'gap_start_lat': gap_info.get('gap_start_lat'),
                        'gap_start_lon': gap_info.get('gap_start_lon'),
                        'gap_start_sog': gap_info.get('gap_start_sog'),
                        'gap_start_state': gap_info.get('gap_start_state'),
                    }
                if dark:
                    lw_dark += 1
                if spoof_score > 0.5:
                    lw_spoof += 1
                risk_score, risk_level = compute_lightweight_risk_score(
                    zone_info, sog, is_permitted=is_permitted,
                    is_dark=dark, gap_duration_min=gap_min,
                    spoofing_score=spoof_score,
                )
                # BD-09 offset left at 0 here since these are Chinese vessels (412* = China)
                results.append(AnalysisResult(
                    mmsi=mmsi,
                    timestamp=ts,
                    vessel_type='UNKNOWN',
                    confidence=0.0,
                    fishing_pct=0.0,
                    zone=zone_info.get('zone', 'EEZ_OR_BEYOND'),
                    dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),
                    activity_state=state,
                    ucaf_score=0.0,
                    ucft_score=0.0,
                    is_dark=dark,
                    gap_duration_min=gap_min,
                    spoofing_score=spoof_score,
                    bd09_offset_m=0.0,
                    speed_jump_count=speed_jumps,
                    cluster_id=-1,
                    cluster_size=0,
                    is_leader=False,
                    fleet_role='NONE',
                    risk_score=risk_score,
                    risk_level=risk_level,
                    is_transship_suspect=False,
                    transship_pair_mmsi='',
                    transship_duration_min=0,
                    features=dark_features,
                ))
                lw_count += 1
            logger.info(
                'lightweight analysis: %d vessels (dark=%d, spoof>0.5=%d, tiers=%s, skip=%s)',
                lw_count, lw_dark, lw_spoof, lw_dark_tiers, lw_dark_skip,
            )
        # 6. Transshipment suspect detection (score-based, veteran-perspective filters)
        from algorithms.transshipment import detect_transshipment
        results_map = {r.mmsi: r for r in results}
        transship_items = detect_transshipment(
            df_targets,
            _transship_pair_history,
            get_vessel_info=vessel_store.get_vessel_info,
            is_permitted=vessel_store.is_permitted,
            classify_zone_fn=classify_zone,
            now_kst_hour=now_kst_hour,
        )
        for item in transship_items:
            a = item['pair_a']
            b = item['pair_b']
            dur = item['duration_min']
            tier = item['severity']
            if tier == 'WATCH':
                continue  # WATCH tier is not stored (log only)
            for m, pair in ((a, b), (b, a)):
                if m in results_map:
                    r_obj = results_map[m]
                    r_obj.is_transship_suspect = True
                    r_obj.transship_pair_mmsi = pair
                    r_obj.transship_duration_min = dur
                    r_obj.features = {
                        **(r_obj.features or {}),
                        'transship_tier': tier,
                        'transship_score': item['score'],
                    }
        # 7. Persist results
        upserted = kcgdb.upsert_results(results)
        kcgdb.cleanup_old(hours=48)
        # 8. Output modules (event generation, violation classification, KPI refresh, stats aggregation, alerts)
        try:
            from output.violation_classifier import run_violation_classifier
            from output.event_generator import run_event_generator
            from output.kpi_writer import run_kpi_writer
            from output.stats_aggregator import aggregate_hourly, aggregate_daily
            from output.alert_dispatcher import run_alert_dispatcher
            from dataclasses import asdict
            results_dicts = [asdict(r) for r in results]
            # Field-name mapping (AnalysisResult → format the output modules expect)
            for d in results_dicts:
                d['zone_code'] = d.pop('zone', None)
                d['gap_duration_min'] = d.get('gap_duration_min', 0)
                d['transship_suspect'] = d.pop('is_transship_suspect', False)
                d['fleet_is_leader'] = d.pop('is_leader', False)
                d['fleet_cluster_id'] = d.pop('cluster_id', None)
                d['speed_kn'] = None  # analysis results carry no speed
            run_violation_classifier(results_dicts)
            run_event_generator(results_dicts)
            run_kpi_writer()
            aggregate_hourly()
            aggregate_daily()
            run_alert_dispatcher()
            logger.info('output modules completed')
        except Exception as e:
            logger.warning('output modules failed (non-fatal): %s', e)
        # 9. Cache analysis context in Redis (for chat)
        try:
            from chat.cache import cache_analysis_context
            risk_dist = {}
            zone_dist = {}
            dark_count = 0
            spoofing_count = 0
            transship_count = 0
            top_risk_list = []
            for r in results:
                risk_dist[r.risk_level] = risk_dist.get(r.risk_level, 0) + 1
                zone_dist[r.zone] = zone_dist.get(r.zone, 0) + 1
                if r.is_dark:
                    dark_count += 1
                if r.spoofing_score > 0.5:
                    spoofing_count += 1
                if r.is_transship_suspect:
                    transship_count += 1
                top_risk_list.append({
                    'mmsi': r.mmsi,
                    'name': vessel_store.get_vessel_info(r.mmsi).get('name', r.mmsi),
                    'risk_score': r.risk_score,
                    'risk_level': r.risk_level,
                    'zone': r.zone,
                    'is_dark': r.is_dark,
                    'is_transship': r.is_transship_suspect,
                    'activity_state': r.activity_state,
                })
            top_risk_list.sort(key=lambda x: x['risk_score'], reverse=True)
            cache_analysis_context({
                'vessel_stats': vessel_store.stats(),
                'risk_distribution': {**risk_dist, **zone_dist},
                'dark_count': dark_count,
                'spoofing_count': spoofing_count,
                'transship_count': transship_count,
                'top_risk_vessels': top_risk_list[:10],
                'polygon_summary': kcgdb.fetch_polygon_summary(),
            })
        except Exception as e:
            logger.warning('failed to cache analysis context for chat: %s', e)
        elapsed = round(time.time() - start, 2)
        _last_run['duration_sec'] = elapsed
        _last_run['vessel_count'] = len(results)
        _last_run['upserted'] = upserted
        logger.info(
            'analysis cycle: %d vessels, %d upserted, %.2fs',
            len(results), upserted, elapsed,
        )
    except Exception as e:
        _last_run['error'] = str(e)
        logger.exception('analysis cycle failed: %s', e)
def start_scheduler():
    global _scheduler
    _scheduler = BackgroundScheduler()
    _scheduler.add_job(
        run_analysis_cycle,
        'interval',
        minutes=settings.SCHEDULER_INTERVAL_MIN,
        id='vessel_analysis',
        max_instances=1,
        replace_existing=True,
    )
    # Partition maintenance (daily at 04:00)
    from db.partition_manager import maintain_partitions
    _scheduler.add_job(
        maintain_partitions,
        'cron', hour=4, minute=0,
        id='partition_maintenance',
        max_instances=1,
        replace_existing=True,
    )
    _scheduler.start()
    logger.info('scheduler started (interval=%dm)', settings.SCHEDULER_INTERVAL_MIN)


def stop_scheduler():
    global _scheduler
    if _scheduler:
        _scheduler.shutdown(wait=False)
        _scheduler = None
        logger.info('scheduler stopped')