import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Optional
from zoneinfo import ZoneInfo

from apscheduler.schedulers.background import BackgroundScheduler

from config import settings
from fleet_tracker import GEAR_PATTERN

logger = logging.getLogger(__name__)

_KST = ZoneInfo('Asia/Seoul')

# Handle of the running scheduler; None while stopped.
_scheduler: Optional[BackgroundScheduler] = None

# Summary of the most recent analysis cycle, exposed via get_last_run().
_last_run: dict = {
    'timestamp': None,
    'duration_sec': 0,
    'vessel_count': 0,
    'upserted': 0,
    'error': None,
}

# Mutable state handed to detect_transshipment() on every cycle.
_transship_pair_history: dict = {}

# Korean vessel MMSI prefixes — fully excluded from dark detection.
_KR_DOMESTIC_PREFIXES = ('440', '441')

# Chinese MIDs: 412 (mainland) / 413 (Hong Kong) / 414 (Macau).
_CN_MID_PREFIXES = ('412', '413', '414')


def _is_dark_excluded(mmsi: str, name: str) -> tuple[bool, str]:
    """Decide whether a vessel is fully excluded from dark detection.

    Only Korean-domestic MMSIs and gear signals are filtered. (Original note:
    user alarms target vessels only, and Korean vessels are outside the
    coast guard's jurisdiction for this purpose.)

    Returns:
        ``(True, reason)`` with reason ``'kr_domestic'`` or ``'gear_signal'``
        when excluded, otherwise ``(False, '')``.
    """
    if mmsi.startswith(_KR_DOMESTIC_PREFIXES):
        return True, 'kr_domestic'
    if name and GEAR_PATTERN.match(name):
        return True, 'gear_signal'
    return False, ''


def _fetch_dark_history(kcg_conn, mmsi_list: list[str]) -> dict[str, dict]:
    """Aggregate per-MMSI ``is_dark=True`` history over the last 7 days.

    Queried once at the start of a cycle and reused during scoring.

    Args:
        kcg_conn: DB-API connection to the kcg database.
        mmsi_list: MMSIs to look up; an empty list short-circuits to ``{}``.

    Returns:
        Mapping ``mmsi -> {'count_7d', 'count_24h', 'last_at'}``. Any failure
        is logged as a warning and yields ``{}`` (best-effort lookup).
    """
    if not mmsi_list:
        return {}
    try:
        cur = kcg_conn.cursor()
        try:
            cur.execute(
                """
                SELECT mmsi,
                       count(DISTINCT analyzed_at::date) AS n7,
                       count(DISTINCT analyzed_at::date)
                           FILTER (WHERE analyzed_at > now() - interval '24 hours') AS n24,
                       max(analyzed_at) AS last_at
                FROM kcg.vessel_analysis_results
                WHERE is_dark = true
                  AND gap_duration_min >= 100
                  AND analyzed_at > now() - interval '7 days'
                  AND mmsi = ANY(%s)
                GROUP BY mmsi
                """,
                (list(mmsi_list),),
            )
            rows = cur.fetchall()
        finally:
            # Always release the cursor, including when execute/fetch raises.
            cur.close()
        return {
            str(m): {'count_7d': int(n7 or 0), 'count_24h': int(n24 or 0), 'last_at': t}
            for m, n7, n24, t in rows
        }
    except Exception as e:
        logger.warning('fetch_dark_history failed: %s', e)
        return {}


def _excluded_dark_features() -> dict:
    """Return the placeholder dark features stored for pre-filtered vessels."""
    return {
        'dark_suspicion_score': 0,
        'dark_patterns': [],
        'dark_tier': 'EXCLUDED',
        'dark_history_7d': 0,
        'dark_history_24h': 0,
    }


def _scored_dark_features(score, patterns, tier, history: dict, gap_info: dict) -> dict:
    """Assemble the dark feature dict from a suspicion score and its gap info."""
    return {
        'dark_suspicion_score': score,
        'dark_patterns': patterns,
        'dark_tier': tier,
        'dark_history_7d': int(history.get('count_7d', 0) or 0),
        'dark_history_24h': int(history.get('count_24h', 0) or 0),
        'gap_start_lat': gap_info.get('gap_start_lat'),
        'gap_start_lon': gap_info.get('gap_start_lon'),
        'gap_start_sog': gap_info.get('gap_start_sog'),
        'gap_start_state': gap_info.get('gap_start_state'),
    }


def _last_cog(df) -> Optional[float]:
    """Return the last row's ``cog`` as float, or None if absent/unparseable."""
    if df is None or len(df) == 0:
        return None
    try:
        return float(df.iloc[-1].get('cog', 0))
    except (TypeError, ValueError):
        # A None / non-numeric COG must not abort the whole analysis cycle.
        return None


def get_last_run() -> dict:
    """Return a shallow copy of the last-run summary."""
    return _last_run.copy()


def run_analysis_cycle() -> None:
    """Run one periodic analysis cycle on top of the in-memory vessel cache.

    (Original note: 5-minute period; the actual interval comes from
    ``settings.SCHEDULER_INTERVAL_MIN``.) Steps: incremental load, target
    selection, classification pipeline, fleet/gear/pair analysis, per-vessel
    scoring, lightweight scoring for unclassified vessels, transshipment
    detection, persistence, output modules and chat-context caching.

    Errors inside the cycle are logged and recorded in ``_last_run['error']``
    instead of being raised.
    """
    from cache.vessel_store import vessel_store
    from db import snpdb, kcgdb
    from pipeline.orchestrator import ChineseFishingVesselPipeline
    from algorithms.location import classify_zone
    from algorithms.fishing_pattern import compute_ucaf_score, compute_ucft_score
    from algorithms.dark_vessel import is_dark_vessel, analyze_dark_pattern, compute_dark_suspicion
    from algorithms.spoofing import compute_spoofing_score, count_speed_jumps, compute_bd09_offset
    from algorithms.risk import compute_vessel_risk_score
    from fleet_tracker import fleet_tracker
    from models.result import AnalysisResult

    start = time.time()
    _last_run['timestamp'] = datetime.now(timezone.utc).isoformat()
    _last_run['error'] = None

    try:
        # 1. Incremental load + stale eviction
        if vessel_store.last_bucket is None:
            logger.warning('last_bucket is None, skipping incremental fetch (initial load not complete)')
            df_new = None
        else:
            df_new = snpdb.fetch_incremental(vessel_store.last_bucket)

        if df_new is not None and len(df_new) > 0:
            vessel_store.merge_incremental(df_new)
        vessel_store.evict_stale(settings.CACHE_WINDOW_HOURS)

        # Periodic refresh of static info / permitted-vessel registry
        vessel_store.refresh_static_info()
        vessel_store.refresh_permit_registry()

        # Enrich static info from the signal-batch API
        # (shipKindCode, status, heading, draught, ...)
        vessel_store.enrich_from_signal_api(minutes=10)

        # 2. Select analysis targets (includes SOG/COG computation)
        df_targets = vessel_store.select_analysis_targets()
        if len(df_targets) == 0:
            logger.info('no analysis targets, skipping cycle')
            _last_run['vessel_count'] = 0
            return

        # 3. Run the 7-stage pipeline
        pipeline = ChineseFishingVesselPipeline()
        classifications, vessel_dfs = pipeline.run(df_targets)

        if not classifications:
            logger.info('no vessels classified, skipping')
            _last_run['vessel_count'] = 0
            return

        # 4. Fleet analysis based on the registered fleet
        # NOTE(review): steps 4.5 and 4.7 are kept inside this `with` because
        # 4.7 passes kcg_conn on; confirm this matches the intended scope.
        with kcgdb.get_conn() as kcg_conn:
            fleet_tracker.load_registry(kcg_conn)

            all_ais = []
            for mmsi, df in vessel_dfs.items():
                if len(df) > 0:
                    last = df.iloc[-1]
                    all_ais.append({
                        'mmsi': mmsi,
                        'name': vessel_store.get_vessel_info(mmsi).get('name', ''),
                        'lat': float(last['lat']),
                        'lon': float(last['lon']),
                    })

            fleet_tracker.match_ais_to_registry(all_ais, kcg_conn)

            gear_signals = [v for v in all_ais if GEAR_PATTERN.match(v.get('name', '') or '')]
            fleet_tracker.track_gear_identity(gear_signals, kcg_conn)

            fleet_roles = fleet_tracker.build_fleet_clusters(vessel_dfs)

            fleet_tracker.save_snapshot(vessel_dfs, kcg_conn)

            gear_groups = []
            # 4.5 Build and store group polygons
            try:
                from algorithms.polygon_builder import detect_gear_groups, build_all_group_snapshots

                company_vessels = fleet_tracker.get_company_vessels(vessel_dfs)
                gear_groups = detect_gear_groups(vessel_store)
                group_snapshots = build_all_group_snapshots(
                    vessel_store, company_vessels,
                    fleet_tracker._companies,
                )
                saved = kcgdb.save_group_snapshots(group_snapshots)
                cleaned = kcgdb.cleanup_group_snapshots(days=7)
                logger.info('group polygons: %d saved, %d cleaned, %d gear groups',
                            saved, cleaned, len(gear_groups))
            except Exception as e:
                logger.warning('group polygon generation failed: %s', e)

            # 4.7 Gear correlation analysis (multi-model pattern tracking)
            try:
                from algorithms.gear_correlation import run_gear_correlation
                from algorithms.gear_parent_inference import run_gear_parent_inference

                corr_result = run_gear_correlation(
                    vessel_store=vessel_store,
                    gear_groups=gear_groups,
                    conn=kcg_conn,
                )
                logger.info(
                    'gear correlation: %d scores updated, %d raw metrics, %d models',
                    corr_result['updated'], corr_result['raw_inserted'],
                    corr_result['models'],
                )

                inference_result = run_gear_parent_inference(
                    vessel_store=vessel_store,
                    gear_groups=gear_groups,
                    conn=kcg_conn,
                )
                logger.info(
                    'gear parent inference: %d groups, %d direct-match, %d candidates, %d promoted, %d review, %d skipped',
                    inference_result['groups'],
                    inference_result.get('direct_matched', 0),
                    inference_result['candidates'],
                    inference_result['promoted'],
                    inference_result['review_required'],
                    inference_result['skipped'],
                )
            except Exception as e:
                logger.warning('gear correlation failed: %s', e)

        # 4.9 Pair candidate search
        # (bbox first pass + trajectory similarity second pass -> G-06 pair_trawl verdict)
        pair_results: dict[str, dict] = {}
        try:
            from algorithms.pair_trawl import find_pair_candidates, detect_pair_trawl

            pt_registered = fleet_tracker.get_pt_registered_mmsis()
            pt_sub_registered: set[str] = set()  # TODO: distinguish fishery_code=PT-S
            base_mmsis: set[str] = {c['mmsi'] for c in classifications}
            base_mmsis |= pt_registered

            # The pool is the full 24h accumulated track set (original note:
            # ~55k incl. 8k+ Chinese plus Korean/Russian vessels). Even without
            # precomputed sog/cog, _trajectory_similarity computes them on demand.
            pool_tracks = getattr(vessel_store, '_tracks', {}) or vessel_dfs

            # Extend the base set with every Chinese vessel moving in the
            # fishing speed band (1.5-5.0 kn).
            for mmsi, df in pool_tracks.items():
                if not mmsi.startswith(_CN_MID_PREFIXES):
                    continue
                if df is None or df.empty:
                    continue
                sog_col = 'sog' if 'sog' in df.columns else ('raw_sog' if 'raw_sog' in df.columns else None)
                if sog_col is None:
                    continue
                try:
                    mean_sog = float(df[sog_col].tail(12).mean())
                    if 1.5 <= mean_sog <= 5.0:
                        base_mmsis.add(mmsi)
                except Exception:
                    # Best-effort: a vessel with an unusable speed column is
                    # simply not added to the base set.
                    continue

            pair_candidates = find_pair_candidates(
                base_mmsis=base_mmsis,
                vessel_dfs=pool_tracks,
                get_vessel_info=vessel_store.get_vessel_info,
                pt_registered=pt_registered,
                pt_sub_registered=pt_sub_registered,
                min_common_samples=4,
            )

            from algorithms.pair_trawl import REJECT_COUNTERS, reset_reject_counters
            reset_reject_counters()

            tier_counts = {'STRONG': 0, 'PROBABLE': 0, 'SUSPECT': 0}
            pt_det = 0
            coop_det = 0
            for cand in pair_candidates:
                ma, mb = cand['base_mmsi'], cand['target_mmsi']
                if ma not in pool_tracks or mb not in pool_tracks:
                    continue
                result = detect_pair_trawl(
                    pool_tracks[ma], pool_tracks[mb], ma, mb,
                    role_a=cand['base_role'], role_b=cand['target_role'],
                    similarity=cand['similarity'],
                )
                if not result.get('pair_detected'):
                    continue
                tier = result.get('tier') or 'UNKNOWN'
                tier_counts[tier] = tier_counts.get(tier, 0) + 1
                # Stored under both MMSIs so either side can look up its partner.
                pair_results[ma] = {**result, 'pair_mmsi': mb}
                pair_results[mb] = {**result, 'pair_mmsi': ma}
                if result.get('pair_type') == 'PT_REGISTERED':
                    pt_det += 1
                elif result.get('pair_type') == 'COOP_FISHING':
                    coop_det += 1

            logger.info(
                'pair detection: candidates=%d, detected=%d '
                '(STRONG=%d PROBABLE=%d SUSPECT=%d, pt=%d coop=%d) '
                'reject={empty=%d miss=%d insuf_align=%d no_sync=%d}',
                len(pair_candidates), len(pair_results) // 2,
                tier_counts['STRONG'], tier_counts['PROBABLE'], tier_counts['SUSPECT'],
                pt_det, coop_det,
                REJECT_COUNTERS['empty_df'], REJECT_COUNTERS['missing_columns'],
                REJECT_COUNTERS['insufficient_aligned'], REJECT_COUNTERS['no_sync_at_any_tier'],
            )
        except Exception as e:
            logger.warning('pair detection failed: %s', e)

        # 5. Per-vessel additional algorithms -> build AnalysisResult
        # Bulk-load the 7-day dark history once per cycle.
        now_kst_hour = datetime.now(_KST).hour
        all_chinese = vessel_store.get_chinese_mmsis()
        with kcgdb.get_conn() as hist_conn:
            dark_history_map = _fetch_dark_history(hist_conn, list(all_chinese))

        pipeline_dark_tiers = {'CRITICAL': 0, 'HIGH': 0, 'WATCH': 0, 'NONE': 0}
        pipeline_skip_counts = {'kr_domestic': 0, 'gear_signal': 0}

        # Loop-invariant work hoisted out of the per-vessel loop; the import
        # stays inside the outer try so a failure is still recorded in _last_run.
        from algorithms.gear_violation import classify_gear_violations
        gear_map = {'TRAWL': 'OT', 'PT': 'PT', 'PURSE': 'PS', 'LONGLINE': 'GN', 'TRAP': 'TRAP'}

        results = []
        for c in classifications:
            mmsi = c['mmsi']
            df_v = vessel_dfs.get(mmsi)
            if df_v is None or len(df_v) == 0:
                continue

            last_row = df_v.iloc[-1]
            ts = last_row.get('timestamp')

            zone_info = classify_zone(last_row['lat'], last_row['lon'])

            # fleet_registry gear_code C21 (pair trawl) overrides vessel_type to 'PT'
            vtype = c['vessel_type']
            registered_gear_code: Optional[str] = None
            if hasattr(fleet_tracker, 'get_vessel_gear_code'):
                registered_gear_code = fleet_tracker.get_vessel_gear_code(mmsi)
                if registered_gear_code == 'C21':
                    vtype = 'PT'
            gear = gear_map.get(vtype, 'OT')
            ucaf = compute_ucaf_score(df_v, gear)
            ucft = compute_ucft_score(df_v)

            # -- Dark: broad detection + scoring of suspected intentional AIS-off --
            vname = vessel_store.get_vessel_info(mmsi).get('name', '') or ''
            is_permitted = vessel_store.is_permitted(mmsi)
            dark_excluded, dark_skip_reason = _is_dark_excluded(mmsi, vname)
            if dark_excluded:
                pipeline_skip_counts[dark_skip_reason] = pipeline_skip_counts.get(dark_skip_reason, 0) + 1
                dark = False
                gap_min = 0
                dark_features: dict = _excluded_dark_features()
            else:
                gap_info = analyze_dark_pattern(df_v)
                dark = bool(gap_info.get('is_dark'))
                gap_min = int(gap_info.get('gap_min') or 0)
                history = dark_history_map.get(mmsi, {'count_7d': 0, 'count_24h': 0})
                v_info = vessel_store.get_vessel_info(mmsi)
                score, patterns, tier = compute_dark_suspicion(
                    gap_info, mmsi, is_permitted, history,
                    now_kst_hour, classify_zone,
                    ship_kind_code=v_info.get('ship_kind_code', ''),
                    nav_status=v_info.get('status', ''),
                    heading=v_info.get('heading'),
                    last_cog=_last_cog(df_v),
                )
                pipeline_dark_tiers[tier] = pipeline_dark_tiers.get(tier, 0) + 1
                dark_features = _scored_dark_features(score, patterns, tier, history, gap_info)

            spoof_score = compute_spoofing_score(df_v)
            speed_jumps = count_speed_jumps(df_v)
            bd09_offset = compute_bd09_offset(last_row['lat'], last_row['lon'])

            fleet_info = fleet_roles.get(mmsi, {})

            risk_score, risk_level = compute_vessel_risk_score(
                mmsi, df_v, zone_info,
                is_permitted=is_permitted,
            )

            activity = 'UNKNOWN'
            if 'state' in df_v.columns:
                # mode() is empty when every state is NaN; keep 'UNKNOWN' then
                # instead of raising IndexError and aborting the cycle.
                state_mode = df_v['state'].mode()
                if len(state_mode) > 0:
                    activity = state_mode.iloc[0]

            # -- Combined G-01/G-04/G-05/G-06 verdict (DAR-03) --
            pair_result = pair_results.get(mmsi)
            if pair_result and not pair_result.get('pair_detected'):
                pair_result = None
            # G-06 only counts STRONG/PROBABLE tiers with a valid pair_type.
            # SUSPECT keeps its flags (via raw_pair below) but is not judged.
            if pair_result:
                if pair_result.get('tier') not in ('STRONG', 'PROBABLE'):
                    pair_result = None
                elif pair_result.get('pair_type') not in (
                    'PT_REGISTERED', 'TRANSSHIP_LIKE', 'COOP_FISHING', 'GENERIC',
                ):
                    pair_result = None

            gear_episodes: list = []
            gear_positions: list = []
            permit_periods: list = []
            if gear in ('GN', 'TRAP', 'FYK', 'FPO', 'GNS', 'GND'):
                try:
                    with kcgdb.get_conn() as gv_conn:
                        gear_episodes = fleet_tracker.get_gear_episodes(mmsi, gv_conn, hours=24)
                    gear_positions = fleet_tracker.get_gear_positions(mmsi, df_v)
                except Exception as e:
                    logger.debug('gear episode/pos 조회 실패 [%s]: %s', mmsi, e)

            # G-02/G-03 inputs: permitted fishing periods + registered fishery
            # code (only for vessels matched to the registry)
            registered_fishery_code = fleet_tracker.get_registered_fishery_code(mmsi)
            if registered_fishery_code:
                try:
                    with kcgdb.get_conn() as gp_conn:
                        permit_periods = fleet_tracker.get_permit_periods(mmsi, gp_conn)
                except Exception as e:
                    logger.debug('permit_periods 조회 실패 [%s]: %s', mmsi, e)

            observation_ts = ts if isinstance(ts, datetime) else None
            if observation_ts is None and ts is not None:
                try:
                    import pandas as pd
                    observation_ts = pd.to_datetime(ts).to_pydatetime()
                except Exception:
                    observation_ts = None

            gv = classify_gear_violations(
                mmsi=mmsi, gear_type=gear, zone_info=zone_info,
                df_vessel=df_v, pair_result=pair_result,
                is_permitted=is_permitted,
                gear_episodes=gear_episodes or None,
                gear_positions=gear_positions or None,
                permit_periods=permit_periods or None,
                registered_fishery_code=registered_fishery_code,
                observation_ts=observation_ts,
            )
            g_codes = gv['g_codes']
            gear_judgment = gv['gear_judgment']
            gear_violation_score = gv['gear_violation_score']
            gear_violation_evidence = gv['evidence']
            zone_code = zone_info.get('zone', 'EEZ_OR_BEYOND')

            # Add the gear-violation score on top of risk_score (capped at 100);
            # the level is only raised by thresholds, otherwise kept as computed.
            final_risk = min(100, risk_score + gear_violation_score)
            final_risk_level = risk_level
            if final_risk >= 70:
                final_risk_level = 'CRITICAL'
            elif final_risk >= 50:
                final_risk_level = 'HIGH'
            elif final_risk >= 30:
                final_risk_level = 'MEDIUM'

            # pair_result is None for SUSPECT after the STRONG/PROBABLE filter;
            # read the raw pair result too so SUSPECT tiers remain in statistics.
            raw_pair = pair_results.get(mmsi) or {}
            merged_features = {
                **(c.get('features', {}) or {}),
                **dark_features,
                'g_codes': g_codes,
                'gear_violation_score': gear_violation_score,
                'gear_violation_evidence': gear_violation_evidence,
                'pair_trawl_detected': bool(pair_result and pair_result.get('pair_detected')),
                'pair_trawl_pair_mmsi': (pair_result or {}).get('pair_mmsi', ''),
                'pair_tier': raw_pair.get('tier') or '',
                'pair_type': raw_pair.get('pair_type') or '',
                'pair_reject_reason': raw_pair.get('reject_reason') or '',
                'similarity': raw_pair.get('similarity', 0),
                'confidence': raw_pair.get('confidence', 0),
                'registered_fishery_code': registered_fishery_code or '',
            }

            results.append(AnalysisResult(
                mmsi=mmsi,
                timestamp=ts,
                vessel_type=vtype,
                confidence=c['confidence'],
                fishing_pct=c['fishing_pct'],
                cluster_id=fleet_info.get('cluster_id', -1),
                season=c['season'],
                zone=zone_code,
                dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),
                activity_state=activity,
                ucaf_score=ucaf,
                ucft_score=ucft,
                is_dark=dark,
                gap_duration_min=gap_min,
                spoofing_score=spoof_score,
                bd09_offset_m=bd09_offset,
                speed_jump_count=speed_jumps,
                cluster_size=fleet_info.get('cluster_size', 0),
                is_leader=fleet_info.get('is_leader', False),
                fleet_role=fleet_info.get('fleet_role', 'NOISE'),
                risk_score=final_risk,
                risk_level=final_risk_level,
                features=merged_features,
                gear_judgment=gear_judgment,
                gear_code=registered_gear_code,
            ))

        logger.info(
            'pipeline dark: tiers=%s skip=%s',
            pipeline_dark_tiers, pipeline_skip_counts,
        )

        # -- 5.5 Lightweight analysis: 412* vessels that did not pass the pipeline --
        # Uses the 24h accumulated tracks in vessel_store._tracks directly so
        # dark/spoof signals are produced for them as well.
        from algorithms.risk import compute_lightweight_risk_score

        pipeline_mmsis = {c['mmsi'] for c in classifications}
        lightweight_mmsis = vessel_store.get_chinese_mmsis() - pipeline_mmsis

        if lightweight_mmsis:
            now = datetime.now(timezone.utc)
            all_positions = vessel_store.get_all_latest_positions()
            lw_count = 0
            lw_dark = 0
            lw_spoof = 0
            lw_dark_tiers = {'CRITICAL': 0, 'HIGH': 0, 'WATCH': 0, 'NONE': 0}
            lw_dark_skip = {'kr_domestic': 0, 'gear_signal': 0}
            for mmsi in lightweight_mmsis:
                pos = all_positions.get(mmsi)
                if pos is None or pos.get('lat') is None:
                    continue
                lat, lon = pos['lat'], pos['lon']
                sog = pos.get('sog', 0) or 0
                cog = pos.get('cog', 0) or 0
                ts = pos.get('timestamp', now)

                zone_info = classify_zone(lat, lon)
                if sog <= 1.0:
                    state = 'STATIONARY'
                elif sog <= 5.0:
                    state = 'FISHING'
                else:
                    state = 'SAILING'

                is_permitted = vessel_store.is_permitted(mmsi)
                vname = vessel_store.get_vessel_info(mmsi).get('name', '') or ''

                # -- Dark: pre-filter (gear signals / Korean vessels) --
                dark_excluded, dark_skip_reason = _is_dark_excluded(mmsi, vname)
                if dark_excluded:
                    lw_dark_skip[dark_skip_reason] = lw_dark_skip.get(dark_skip_reason, 0) + 1
                    dark = False
                    gap_min = 0
                    dark_features: dict = _excluded_dark_features()
                    spoof_score = 0.0
                    speed_jumps = 0
                else:
                    df_v = vessel_store._tracks.get(mmsi)
                    spoof_score = 0.0
                    speed_jumps = 0
                    gap_info = {'is_dark': False, 'gap_min': 0}
                    if df_v is not None and len(df_v) >= 2:
                        # Each signal is best-effort: a failure keeps the
                        # neutral default but is logged instead of swallowed.
                        try:
                            spoof_score = compute_spoofing_score(df_v)
                        except Exception as e:
                            logger.debug('lightweight spoofing score failed [%s]: %s', mmsi, e)
                        try:
                            speed_jumps = count_speed_jumps(df_v)
                        except Exception as e:
                            logger.debug('lightweight speed jump count failed [%s]: %s', mmsi, e)
                        try:
                            gap_info = analyze_dark_pattern(df_v)
                        except Exception as e:
                            logger.debug('lightweight dark pattern failed [%s]: %s', mmsi, e)
                            gap_info = {'is_dark': False, 'gap_min': 0}

                    dark = bool(gap_info.get('is_dark'))
                    gap_min = int(gap_info.get('gap_min') or 0)

                    history = dark_history_map.get(mmsi, {'count_7d': 0, 'count_24h': 0})
                    lw_info = vessel_store.get_vessel_info(mmsi)
                    score, patterns, tier = compute_dark_suspicion(
                        gap_info, mmsi, is_permitted, history,
                        now_kst_hour, classify_zone,
                        ship_kind_code=lw_info.get('ship_kind_code', ''),
                        nav_status=lw_info.get('status', ''),
                        heading=lw_info.get('heading'),
                        last_cog=_last_cog(df_v),
                    )
                    lw_dark_tiers[tier] = lw_dark_tiers.get(tier, 0) + 1
                    dark_features = _scored_dark_features(score, patterns, tier, history, gap_info)

                if dark:
                    lw_dark += 1
                if spoof_score > 0.5:
                    lw_spoof += 1

                risk_score, risk_level = compute_lightweight_risk_score(
                    zone_info, sog, is_permitted=is_permitted,
                    is_dark=dark, gap_duration_min=gap_min,
                    spoofing_score=spoof_score,
                )

                # BD-09 offset is left out here because these are Chinese
                # vessels (412* = China), per the original note.
                results.append(AnalysisResult(
                    mmsi=mmsi,
                    timestamp=ts,
                    vessel_type='UNKNOWN',
                    confidence=0.0,
                    fishing_pct=0.0,
                    zone=zone_info.get('zone', 'EEZ_OR_BEYOND'),
                    dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),
                    activity_state=state,
                    ucaf_score=0.0,
                    ucft_score=0.0,
                    is_dark=dark,
                    gap_duration_min=gap_min,
                    spoofing_score=spoof_score,
                    bd09_offset_m=0.0,
                    speed_jump_count=speed_jumps,
                    cluster_id=-1,
                    cluster_size=0,
                    is_leader=False,
                    fleet_role='NONE',
                    risk_score=risk_score,
                    risk_level=risk_level,
                    is_transship_suspect=False,
                    transship_pair_mmsi='',
                    transship_duration_min=0,
                    features=dark_features,
                    gear_code=(
                        fleet_tracker.get_vessel_gear_code(mmsi)
                        if hasattr(fleet_tracker, 'get_vessel_gear_code') else None
                    ),
                ))
                lw_count += 1
            logger.info(
                'lightweight analysis: %d vessels (dark=%d, spoof>0.5=%d, tiers=%s, skip=%s)',
                lw_count, lw_dark, lw_spoof, lw_dark_tiers, lw_dark_skip,
            )

        # 6. Suspected transshipment detection (score-based, expert-style filter)
        from algorithms.transshipment import detect_transshipment

        results_map = {r.mmsi: r for r in results}
        transship_items = detect_transshipment(
            df_targets,
            _transship_pair_history,
            get_vessel_info=vessel_store.get_vessel_info,
            is_permitted=vessel_store.is_permitted,
            classify_zone_fn=classify_zone,
            now_kst_hour=now_kst_hour,
        )
        for item in transship_items:
            a = item['pair_a']
            b = item['pair_b']
            dur = item['duration_min']
            tier = item['severity']
            if tier == 'WATCH':
                continue  # WATCH tier is not stored (log only)
            for m, pair in ((a, b), (b, a)):
                if m in results_map:
                    r_obj = results_map[m]
                    r_obj.is_transship_suspect = True
                    r_obj.transship_pair_mmsi = pair
                    r_obj.transship_duration_min = dur
                    r_obj.features = {
                        **(r_obj.features or {}),
                        'transship_tier': tier,
                        'transship_score': item['score'],
                    }

        # 7. Persist results
        upserted = kcgdb.upsert_results(results)
        kcgdb.cleanup_old(hours=48)

        # 8. Output modules (event generation, violation classification,
        #    KPI refresh, statistics aggregation, alerts) — non-fatal
        try:
            from output.violation_classifier import run_violation_classifier
            from output.event_generator import run_event_generator
            from output.kpi_writer import run_kpi_writer
            from output.stats_aggregator import aggregate_hourly, aggregate_daily
            from output.alert_dispatcher import run_alert_dispatcher

            from dataclasses import asdict
            results_dicts = [asdict(r) for r in results]
            # Field-name mapping (AnalysisResult -> format the output modules expect)
            for d in results_dicts:
                d['zone_code'] = d.pop('zone', None)
                d['gap_duration_min'] = d.get('gap_duration_min', 0)
                d['transship_suspect'] = d.pop('is_transship_suspect', False)
                d['fleet_is_leader'] = d.pop('is_leader', False)
                d['fleet_cluster_id'] = d.pop('cluster_id', None)
                d['speed_kn'] = None  # analysis results carry no speed
            run_violation_classifier(results_dicts)
            run_event_generator(results_dicts)
            run_kpi_writer()
            aggregate_hourly()
            aggregate_daily()
            run_alert_dispatcher()
            logger.info('output modules completed')
        except Exception as e:
            logger.warning('output modules failed (non-fatal): %s', e)

        # 9. Cache the analysis context in Redis (for chat)
        try:
            from chat.cache import cache_analysis_context

            risk_dist = {}
            zone_dist = {}
            dark_count = 0
            spoofing_count = 0
            transship_count = 0
            top_risk_list = []

            for r in results:
                risk_dist[r.risk_level] = risk_dist.get(r.risk_level, 0) + 1
                zone_dist[r.zone] = zone_dist.get(r.zone, 0) + 1
                if r.is_dark:
                    dark_count += 1
                if r.spoofing_score > 0.5:
                    spoofing_count += 1
                if r.is_transship_suspect:
                    transship_count += 1
                top_risk_list.append({
                    'mmsi': r.mmsi,
                    'name': vessel_store.get_vessel_info(r.mmsi).get('name', r.mmsi),
                    'risk_score': r.risk_score,
                    'risk_level': r.risk_level,
                    'zone': r.zone,
                    'is_dark': r.is_dark,
                    'is_transship': r.is_transship_suspect,
                    'activity_state': r.activity_state,
                })

            top_risk_list.sort(key=lambda x: x['risk_score'], reverse=True)

            cache_analysis_context({
                'vessel_stats': vessel_store.stats(),
                # NOTE(review): risk-level and zone counts share one dict here;
                # confirm the chat consumer expects them merged.
                'risk_distribution': {**risk_dist, **zone_dist},
                'dark_count': dark_count,
                'spoofing_count': spoofing_count,
                'transship_count': transship_count,
                'top_risk_vessels': top_risk_list[:10],
                'polygon_summary': kcgdb.fetch_polygon_summary(),
            })
        except Exception as e:
            logger.warning('failed to cache analysis context for chat: %s', e)

        elapsed = round(time.time() - start, 2)
        _last_run['duration_sec'] = elapsed
        _last_run['vessel_count'] = len(results)
        _last_run['upserted'] = upserted
        logger.info(
            'analysis cycle: %d vessels, %d upserted, %.2fs',
            len(results), upserted, elapsed,
        )

    except Exception as e:
        _last_run['error'] = str(e)
        logger.exception('analysis cycle failed: %s', e)


def start_scheduler() -> None:
    """Start the background scheduler with the analysis and maintenance jobs.

    Registers ``run_analysis_cycle`` on an interval of
    ``settings.SCHEDULER_INTERVAL_MIN`` minutes and ``maintain_partitions``
    daily at 04:00. Calling it while a scheduler is already held is a no-op,
    so a previously started scheduler is never orphaned.
    """
    global _scheduler
    if _scheduler is not None:
        logger.warning('scheduler already running, start request ignored')
        return

    scheduler = BackgroundScheduler()
    scheduler.add_job(
        run_analysis_cycle,
        'interval',
        minutes=settings.SCHEDULER_INTERVAL_MIN,
        id='vessel_analysis',
        max_instances=1,
        replace_existing=True,
    )
    # Partition maintenance (daily at 04:00)
    from db.partition_manager import maintain_partitions
    scheduler.add_job(
        maintain_partitions,
        'cron', hour=4, minute=0,
        id='partition_maintenance',
        max_instances=1,
        replace_existing=True,
    )
    scheduler.start()
    # Publish the handle only once the scheduler is actually running, so
    # stop_scheduler() never sees a half-initialised instance.
    _scheduler = scheduler
    logger.info('scheduler started (interval=%dm)', settings.SCHEDULER_INTERVAL_MIN)


def stop_scheduler() -> None:
    """Shut the scheduler down without waiting for running jobs and clear the handle."""
    global _scheduler
    if _scheduler:
        _scheduler.shutdown(wait=False)
        _scheduler = None
        logger.info('scheduler stopped')