kcg-ai-monitoring/prediction/scheduler.py
htlee 197da13826 refactor(prediction): 사이클 스테이지 에러 경계 도입 (Phase 0-1)
docs/prediction-analysis.md P1 권고 반영. 5분 사이클의 각 스테이지를
한 try/except 로 뭉친 기존 구조를 스테이지 단위로 분리해 실패 지점을
명시적으로 특정하고 부분 실패 시에도 후속 스테이지가 계속 돌아가도록 개선.

- prediction/pipeline/stage_runner.py 신설
  - run_stage(name, fn, *args, required=False, **kwargs) 유틸
  - required=True 면 예외 re-raise (상위 사이클 try/except 가 잡도록)
  - required=False 면 logger.exception 으로 stacktrace 보존 + None 반환
  - 지속시간 로깅 포함

- prediction/scheduler.py run_analysis_cycle() 수정
  - 출력 단계 6모듈을 각각 run_stage() 로 분리:
    violation_classifier / event_generator / kpi_writer /
    stats_aggregate_hourly / stats_aggregate_daily / alert_dispatcher
  - upsert_results / cleanup_old 도 run_stage 로 래핑 (upsert 는 required=True)
  - 내부 try/except 의 logger.warning → logger.exception 으로 업그레이드
    (fetch_dark_history, gear collision event promotion, group polygon,
     gear correlation, pair detection, chat cache)
  - 스테이지 실패 시 journalctl -u kcg-ai-prediction 에서 stacktrace 로
    원인 바로 특정 가능 (기존은 "failed: X" 한 줄만 남아 디버깅 불가)

검증:
- python3 -c "import ast; ast.parse(...)" scheduler.py / stage_runner.py 통과
- run_stage smoke test (정상/실패 흡수/required 재raise 3가지) 통과

범위 밖 (후속):
- Phase 0-2 ILLEGAL_FISHING_PATTERN 전용 페이지 (다음 MR)
- Phase 0-3 Transshipment 전용 페이지 (다음 MR)
2026-04-17 11:28:30 +09:00

837 lines
37 KiB
Python

import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Optional
from zoneinfo import ZoneInfo
from apscheduler.schedulers.background import BackgroundScheduler
from config import settings
from fleet_tracker import GEAR_PATTERN
from pipeline.stage_runner import run_stage
logger = logging.getLogger(__name__)

# Korea Standard Time — used for hour-of-day context in dark-suspicion scoring.
_KST = ZoneInfo('Asia/Seoul')

# Singleton APScheduler instance, managed by start_scheduler()/stop_scheduler().
_scheduler: Optional[BackgroundScheduler] = None

# Status snapshot of the most recent analysis cycle, exposed via get_last_run().
_last_run: dict = {
    'timestamp': None,
    'duration_sec': 0,
    'vessel_count': 0,
    'upserted': 0,
    'error': None,
}

# Cross-cycle pairing history, passed into detect_transshipment() each cycle.
_transship_pair_history: dict = {}
# 한국 선박 MMSI prefix — dark 판별 완전 제외
_KR_DOMESTIC_PREFIXES = ('440', '441')
def _is_dark_excluded(mmsi: str, name: str) -> tuple[bool, str]:
"""dark 탐지 대상에서 완전 제외할지. 어구/한국선만 필터.
사용자 알람은 선박만 대상, 한국선은 해경 관할 아님.
"""
if any(mmsi.startswith(p) for p in _KR_DOMESTIC_PREFIXES):
return True, 'kr_domestic'
if name and GEAR_PATTERN.match(name):
return True, 'gear_signal'
return False, ''
def _fetch_dark_history(kcg_conn, mmsi_list: list[str]) -> dict[str, dict]:
    """Aggregate per-MMSI is_dark=True history over the last 7 days.

    Queried once at cycle start and reused by every score computation.
    Best-effort: returns {} for an empty input list or on any DB error.
    """
    if not mmsi_list:
        return {}
    history: dict[str, dict] = {}
    try:
        cursor = kcg_conn.cursor()
        cursor.execute(
            """
            SELECT mmsi,
            count(DISTINCT analyzed_at::date) AS n7,
            count(DISTINCT analyzed_at::date) FILTER (WHERE analyzed_at > now() - interval '24 hours') AS n24,
            max(analyzed_at) AS last_at
            FROM kcg.vessel_analysis_results
            WHERE is_dark = true
            AND gap_duration_min >= 100
            AND analyzed_at > now() - interval '7 days'
            AND mmsi = ANY(%s)
            GROUP BY mmsi
            """,
            (list(mmsi_list),),
        )
        for row_mmsi, n7, n24, last_at in cursor.fetchall():
            history[str(row_mmsi)] = {
                'count_7d': int(n7 or 0),
                'count_24h': int(n24 or 0),
                'last_at': last_at,
            }
    except Exception as err:
        # Best-effort lookup: keep the stacktrace, degrade to "no history".
        logger.exception('fetch_dark_history failed: %s', err)
        return {}
    return history
def get_last_run() -> dict:
    """Return a shallow copy of the most recent cycle's status."""
    return dict(_last_run)
def run_analysis_cycle():
    """5-minute periodic analysis cycle over the in-memory vessel cache.

    Stages (numbers match the inline section comments):
      1.   Incremental AIS load + stale eviction + static-info refresh
      2.   Analysis target selection (including SOG/COG computation)
      3.   7-stage classification pipeline
      4.   Registered-fleet matching, gear identity, group polygons,
           gear correlation / parent inference
      4.9  Pair-trawl candidate search and detection
      5.   Per-vessel algorithms -> AnalysisResult list
      5.5  Lightweight analysis for Chinese vessels not passing the pipeline
      6.   Transshipment suspicion tagging
      7.   DB upsert (required stage) + cleanup
      8.   Output modules, each wrapped in run_stage() so one failure does
           not stop the others
      9.   Redis chat-context caching (best-effort)

    All exceptions are caught at the cycle boundary; per-stage failures are
    logged with stacktraces via run_stage() / logger.exception.
    """
    from cache.vessel_store import vessel_store
    from db import snpdb, kcgdb
    from pipeline.orchestrator import ChineseFishingVesselPipeline
    from algorithms.location import classify_zone
    from algorithms.fishing_pattern import compute_ucaf_score, compute_ucft_score
    from algorithms.dark_vessel import is_dark_vessel, analyze_dark_pattern, compute_dark_suspicion
    from algorithms.spoofing import compute_spoofing_score, count_speed_jumps, compute_bd09_offset
    from algorithms.risk import compute_vessel_risk_score
    from fleet_tracker import fleet_tracker
    from models.result import AnalysisResult
    start = time.time()
    _last_run['timestamp'] = datetime.now(timezone.utc).isoformat()
    _last_run['error'] = None
    try:
        # 1. Incremental load + stale eviction
        if vessel_store.last_bucket is None:
            logger.warning('last_bucket is None, skipping incremental fetch (initial load not complete)')
            df_new = None
        else:
            df_new = snpdb.fetch_incremental(vessel_store.last_bucket)
        if df_new is not None and len(df_new) > 0:
            vessel_store.merge_incremental(df_new)
        vessel_store.evict_stale(settings.CACHE_WINDOW_HOURS)
        # Periodic refresh of static info / permitted-vessel registry
        vessel_store.refresh_static_info()
        vessel_store.refresh_permit_registry()
        # Static-info enrichment from the signal-batch API
        # (shipKindCode, status, heading, draught, ...)
        vessel_store.enrich_from_signal_api(minutes=10)
        # 2. Select analysis targets (includes SOG/COG computation)
        df_targets = vessel_store.select_analysis_targets()
        if len(df_targets) == 0:
            logger.info('no analysis targets, skipping cycle')
            _last_run['vessel_count'] = 0
            return
        # 3. Run the 7-stage pipeline
        pipeline = ChineseFishingVesselPipeline()
        classifications, vessel_dfs = pipeline.run(df_targets)
        if not classifications:
            logger.info('no vessels classified, skipping')
            _last_run['vessel_count'] = 0
            return
        # 4. Registered-fleet-based analysis
        with kcgdb.get_conn() as kcg_conn:
            fleet_tracker.load_registry(kcg_conn)
            # Registry matching targets every active Chinese-MID (412/413/414)
            # vessel, not just the ~500 classification survivors. Running it on
            # vessel_dfs only would sharply lower the match rate even though
            # ~95% of the 906 permitted vessels have AIS.
            all_ais = []
            all_tracks = getattr(vessel_store, '_tracks', {}) or vessel_dfs
            for mmsi, df in all_tracks.items():
                if not (mmsi.startswith('412') or mmsi.startswith('413') or mmsi.startswith('414')):
                    continue
                if df is None or len(df) == 0:
                    continue
                name = vessel_store.get_vessel_info(mmsi).get('name', '')
                if not name:
                    continue
                last = df.iloc[-1]
                all_ais.append({
                    'mmsi': mmsi,
                    'name': name,
                    'lat': float(last['lat']),
                    'lon': float(last['lon']),
                })
            fleet_tracker.match_ais_to_registry(all_ais, kcg_conn)
            gear_signals = [v for v in all_ais if GEAR_PATTERN.match(v.get('name', '') or '')]
            fleet_tracker.track_gear_identity(gear_signals, kcg_conn)
            # Promote gear-identity collisions updated this cycle to the event
            # hub (CRITICAL/HIGH only)
            collision_ids = fleet_tracker.get_recent_collision_ids()
            if collision_ids:
                try:
                    from output.event_generator import run_gear_identity_collision_events
                    collision_events = run_gear_identity_collision_events(collision_ids)
                    logger.info(
                        'gear collision events: generated=%d, skipped_dedup=%d, skipped_low=%d',
                        collision_events['generated'],
                        collision_events['skipped_dedup'],
                        collision_events['skipped_low'],
                    )
                except Exception as e:
                    logger.exception('gear collision event promotion failed: %s', e)
            fleet_roles = fleet_tracker.build_fleet_clusters(vessel_dfs)
            fleet_tracker.save_snapshot(vessel_dfs, kcg_conn)
            gear_groups = []
            # 4.5 Build and persist group polygons
            try:
                from algorithms.polygon_builder import detect_gear_groups, build_all_group_snapshots
                company_vessels = fleet_tracker.get_company_vessels(vessel_dfs)
                gear_groups = detect_gear_groups(vessel_store)
                group_snapshots = build_all_group_snapshots(
                    vessel_store, company_vessels,
                    fleet_tracker._companies,
                )
                saved = kcgdb.save_group_snapshots(group_snapshots)
                cleaned = kcgdb.cleanup_group_snapshots(days=7)
                logger.info('group polygons: %d saved, %d cleaned, %d gear groups',
                            saved, cleaned, len(gear_groups))
            except Exception as e:
                logger.exception('group polygon generation failed: %s', e)
            # 4.7 Gear correlation analysis (multi-model pattern tracking)
            try:
                from algorithms.gear_correlation import run_gear_correlation
                from algorithms.gear_parent_inference import run_gear_parent_inference
                corr_result = run_gear_correlation(
                    vessel_store=vessel_store,
                    gear_groups=gear_groups,
                    conn=kcg_conn,
                )
                logger.info(
                    'gear correlation: %d scores updated, %d raw metrics, %d models',
                    corr_result['updated'], corr_result['raw_inserted'],
                    corr_result['models'],
                )
                inference_result = run_gear_parent_inference(
                    vessel_store=vessel_store,
                    gear_groups=gear_groups,
                    conn=kcg_conn,
                )
                logger.info(
                    'gear parent inference: %d groups, %d direct-match, %d candidates, %d promoted, %d review, %d skipped',
                    inference_result['groups'],
                    inference_result.get('direct_matched', 0),
                    inference_result['candidates'],
                    inference_result['promoted'],
                    inference_result['review_required'],
                    inference_result['skipped'],
                )
            except Exception as e:
                logger.exception('gear correlation failed: %s', e)
        # 4.9 Pair candidate search
        # (bbox first pass + trajectory-similarity second pass -> G-06 pair_trawl)
        pair_results: dict[str, dict] = {}
        try:
            from algorithms.pair_trawl import find_pair_candidates, detect_pair_trawl
            pt_registered = fleet_tracker.get_pt_registered_mmsis()
            pt_sub_registered: set[str] = set()  # TODO: distinguish fishery_code=PT-S
            base_mmsis: set[str] = {c['mmsi'] for c in classifications}
            base_mmsis |= pt_registered
            # The pool is the full 24h accumulated tracks (8k+ Chinese plus
            # Korean/Russian, ~55k total). Even without precomputed sog/cog,
            # _trajectory_similarity computes them on demand.
            pool_tracks = getattr(vessel_store, '_tracks', {}) or vessel_dfs
            # Expand the base set to every Chinese vessel moving in the fishing
            # speed band (1.5–5.0 kn). Chinese MIDs: 412 (mainland) / 413
            # (Hong Kong) / 414 (Macau).
            for mmsi, df in pool_tracks.items():
                if not (mmsi.startswith('412') or mmsi.startswith('413') or mmsi.startswith('414')):
                    continue
                if df is None or df.empty:
                    continue
                sog_col = 'sog' if 'sog' in df.columns else ('raw_sog' if 'raw_sog' in df.columns else None)
                if sog_col is None:
                    continue
                try:
                    mean_sog = float(df[sog_col].tail(12).mean())
                    if 1.5 <= mean_sog <= 5.0:
                        base_mmsis.add(mmsi)
                except Exception:
                    continue
            pair_candidates = find_pair_candidates(
                base_mmsis=base_mmsis,
                vessel_dfs=pool_tracks,
                get_vessel_info=vessel_store.get_vessel_info,
                pt_registered=pt_registered,
                pt_sub_registered=pt_sub_registered,
                min_common_samples=4,
            )
            from algorithms.pair_trawl import REJECT_COUNTERS, reset_reject_counters
            reset_reject_counters()
            tier_counts = {'STRONG': 0, 'PROBABLE': 0, 'SUSPECT': 0}
            pt_det = 0; coop_det = 0
            for cand in pair_candidates:
                ma, mb = cand['base_mmsi'], cand['target_mmsi']
                if ma not in pool_tracks or mb not in pool_tracks:
                    continue
                result = detect_pair_trawl(
                    pool_tracks[ma], pool_tracks[mb], ma, mb,
                    role_a=cand['base_role'], role_b=cand['target_role'],
                    similarity=cand['similarity'],
                )
                if not result.get('pair_detected'):
                    continue
                tier = result.get('tier') or 'UNKNOWN'
                tier_counts[tier] = tier_counts.get(tier, 0) + 1
                # Record the detection symmetrically for both vessels.
                pair_results[ma] = {**result, 'pair_mmsi': mb}
                pair_results[mb] = {**result, 'pair_mmsi': ma}
                if result.get('pair_type') == 'PT_REGISTERED':
                    pt_det += 1
                elif result.get('pair_type') == 'COOP_FISHING':
                    coop_det += 1
            logger.info(
                'pair detection: candidates=%d, detected=%d '
                '(STRONG=%d PROBABLE=%d SUSPECT=%d, pt=%d coop=%d) '
                'reject={empty=%d miss=%d insuf_align=%d no_sync=%d}',
                len(pair_candidates), len(pair_results) // 2,
                tier_counts['STRONG'], tier_counts['PROBABLE'], tier_counts['SUSPECT'],
                pt_det, coop_det,
                REJECT_COUNTERS['empty_df'], REJECT_COUNTERS['missing_columns'],
                REJECT_COUNTERS['insufficient_aligned'], REJECT_COUNTERS['no_sync_at_any_tier'],
            )
        except Exception as e:
            logger.exception('pair detection failed: %s', e)
        # 5. Per-vessel additional algorithms -> AnalysisResult construction
        # Bulk dark-history lookup (7-day window) — once per cycle
        now_kst_hour = datetime.now(_KST).hour
        all_chinese = vessel_store.get_chinese_mmsis()
        with kcgdb.get_conn() as hist_conn:
            dark_history_map = _fetch_dark_history(hist_conn, list(all_chinese))
        pipeline_dark_tiers = {'CRITICAL': 0, 'HIGH': 0, 'WATCH': 0, 'NONE': 0}
        pipeline_skip_counts = {'kr_domestic': 0, 'gear_signal': 0}
        results = []
        for c in classifications:
            mmsi = c['mmsi']
            df_v = vessel_dfs.get(mmsi)
            if df_v is None or len(df_v) == 0:
                continue
            last_row = df_v.iloc[-1]
            ts = last_row.get('timestamp')
            zone_info = classify_zone(last_row['lat'], last_row['lon'])
            gear_map = {'TRAWL': 'OT', 'PT': 'PT', 'PURSE': 'PS', 'LONGLINE': 'GN', 'TRAP': 'TRAP'}
            # fleet_registry gear_code C21 (pair trawl) -> override vessel_type to 'PT'
            vtype = c['vessel_type']
            registered_gear_code: Optional[str] = None
            if hasattr(fleet_tracker, 'get_vessel_gear_code'):
                registered_gear_code = fleet_tracker.get_vessel_gear_code(mmsi)
                if registered_gear_code == 'C21':
                    vtype = 'PT'
            gear = gear_map.get(vtype, 'OT')
            ucaf = compute_ucaf_score(df_v, gear)
            ucft = compute_ucft_score(df_v)
            # ── Dark: broad detection + scoring of suspected intentional AIS-off ──
            vname = vessel_store.get_vessel_info(mmsi).get('name', '') or ''
            is_permitted = vessel_store.is_permitted(mmsi)
            dark_excluded, dark_skip_reason = _is_dark_excluded(mmsi, vname)
            if dark_excluded:
                pipeline_skip_counts[dark_skip_reason] = pipeline_skip_counts.get(dark_skip_reason, 0) + 1
                dark = False
                gap_min = 0
                dark_features: dict = {
                    'dark_suspicion_score': 0,
                    'dark_patterns': [],
                    'dark_tier': 'EXCLUDED',
                    'dark_history_7d': 0,
                    'dark_history_24h': 0,
                }
            else:
                gap_info = analyze_dark_pattern(df_v)
                dark = bool(gap_info.get('is_dark'))
                gap_min = int(gap_info.get('gap_min') or 0)
                history = dark_history_map.get(mmsi, {'count_7d': 0, 'count_24h': 0})
                v_info = vessel_store.get_vessel_info(mmsi)
                last_cog_val = float(df_v.iloc[-1].get('cog', 0)) if len(df_v) > 0 else None
                score, patterns, tier = compute_dark_suspicion(
                    gap_info, mmsi, is_permitted, history,
                    now_kst_hour, classify_zone,
                    ship_kind_code=v_info.get('ship_kind_code', ''),
                    nav_status=v_info.get('status', ''),
                    heading=v_info.get('heading'),
                    last_cog=last_cog_val,
                )
                pipeline_dark_tiers[tier] = pipeline_dark_tiers.get(tier, 0) + 1
                dark_features = {
                    'dark_suspicion_score': score,
                    'dark_patterns': patterns,
                    'dark_tier': tier,
                    'dark_history_7d': int(history.get('count_7d', 0) or 0),
                    'dark_history_24h': int(history.get('count_24h', 0) or 0),
                    'gap_start_lat': gap_info.get('gap_start_lat'),
                    'gap_start_lon': gap_info.get('gap_start_lon'),
                    'gap_start_sog': gap_info.get('gap_start_sog'),
                    'gap_start_state': gap_info.get('gap_start_state'),
                }
            spoof_score = compute_spoofing_score(df_v)
            speed_jumps = count_speed_jumps(df_v)
            bd09_offset = compute_bd09_offset(last_row['lat'], last_row['lon'])
            fleet_info = fleet_roles.get(mmsi, {})
            risk_score, risk_level = compute_vessel_risk_score(
                mmsi, df_v, zone_info, is_permitted=is_permitted,
            )
            activity = 'UNKNOWN'
            if 'state' in df_v.columns and len(df_v) > 0:
                activity = df_v['state'].mode().iloc[0]
            # ── G-01/G-04/G-05/G-06 combined judgment (DAR-03) ──
            from algorithms.gear_violation import classify_gear_violations
            pair_result = pair_results.get(mmsi)
            if pair_result and not pair_result.get('pair_detected'):
                pair_result = None
            # G-06 judgment only for STRONG/PROBABLE tiers with a valid
            # pair_type. SUSPECT keeps only the flag.
            if pair_result:
                if pair_result.get('tier') not in ('STRONG', 'PROBABLE'):
                    pair_result = None
                elif pair_result.get('pair_type') not in (
                    'PT_REGISTERED', 'TRANSSHIP_LIKE', 'COOP_FISHING', 'GENERIC',
                ):
                    pair_result = None
            gear_episodes: list = []
            gear_positions: list = []
            permit_periods: list = []
            if gear in ('GN', 'TRAP', 'FYK', 'FPO', 'GNS', 'GND'):
                try:
                    with kcgdb.get_conn() as gv_conn:
                        gear_episodes = fleet_tracker.get_gear_episodes(mmsi, gv_conn, hours=24)
                    gear_positions = fleet_tracker.get_gear_positions(mmsi, df_v)
                except Exception as e:
                    logger.debug('gear episode/pos 조회 실패 [%s]: %s', mmsi, e)
            # G-02/G-03 inputs: permitted fishing periods + registered fishery
            # code (registry-matched vessels only)
            registered_fishery_code = fleet_tracker.get_registered_fishery_code(mmsi)
            if registered_fishery_code:
                try:
                    with kcgdb.get_conn() as gp_conn:
                        permit_periods = fleet_tracker.get_permit_periods(mmsi, gp_conn)
                except Exception as e:
                    logger.debug('permit_periods 조회 실패 [%s]: %s', mmsi, e)
            observation_ts = ts if isinstance(ts, datetime) else None
            if observation_ts is None and ts is not None:
                # Best-effort coercion of a non-datetime timestamp (e.g. a
                # pandas Timestamp or string) to a plain datetime.
                try:
                    import pandas as pd
                    observation_ts = pd.to_datetime(ts).to_pydatetime()
                except Exception:
                    observation_ts = None
            gv = classify_gear_violations(
                mmsi=mmsi, gear_type=gear, zone_info=zone_info,
                df_vessel=df_v, pair_result=pair_result,
                is_permitted=is_permitted,
                gear_episodes=gear_episodes or None,
                gear_positions=gear_positions or None,
                permit_periods=permit_periods or None,
                registered_fishery_code=registered_fishery_code,
                observation_ts=observation_ts,
            )
            g_codes = gv['g_codes']
            gear_judgment = gv['gear_judgment']
            gear_violation_score = gv['gear_violation_score']
            gear_violation_evidence = gv['evidence']
            zone_code = zone_info.get('zone', 'EEZ_OR_BEYOND')  # NOTE(review): currently unused below
            # Add the gear-violation score on top of risk_score
            final_risk = min(100, risk_score + gear_violation_score)
            final_risk_level = risk_level
            if final_risk >= 70:
                final_risk_level = 'CRITICAL'
            elif final_risk >= 50:
                final_risk_level = 'HIGH'
            elif final_risk >= 30:
                final_risk_level = 'MEDIUM'
            # pair_result was filtered to STRONG/PROBABLE, so SUSPECT is None
            # there. To keep SUSPECT tiers in the statistics, also read the
            # raw pair result.
            raw_pair = pair_results.get(mmsi) or {}
            merged_features = {
                **(c.get('features', {}) or {}),
                **dark_features,
                'g_codes': g_codes,
                'gear_violation_score': gear_violation_score,
                'gear_violation_evidence': gear_violation_evidence,
                'pair_trawl_detected': bool(pair_result and pair_result.get('pair_detected')),
                'pair_trawl_pair_mmsi': (pair_result or {}).get('pair_mmsi', ''),
                'pair_tier': raw_pair.get('tier') or '',
                'pair_type': raw_pair.get('pair_type') or '',
                'pair_reject_reason': raw_pair.get('reject_reason') or '',
                'similarity': raw_pair.get('similarity', 0),
                'confidence': raw_pair.get('confidence', 0),
                'registered_fishery_code': registered_fishery_code or '',
            }
            # Vessel position at analysis time — stored in the DB as the
            # evidence coordinates for anomalous-navigation judgments
            lat_val = last_row.get('lat')
            lon_val = last_row.get('lon')
            results.append(AnalysisResult(
                mmsi=mmsi,
                timestamp=ts,
                vessel_type=vtype,
                confidence=c['confidence'],
                fishing_pct=c['fishing_pct'],
                cluster_id=fleet_info.get('cluster_id', -1),
                season=c['season'],
                lat=float(lat_val) if lat_val is not None else None,
                lon=float(lon_val) if lon_val is not None else None,
                zone=zone_info.get('zone', 'EEZ_OR_BEYOND'),
                dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),
                activity_state=activity,
                ucaf_score=ucaf,
                ucft_score=ucft,
                is_dark=dark,
                gap_duration_min=gap_min,
                spoofing_score=spoof_score,
                bd09_offset_m=bd09_offset,
                speed_jump_count=speed_jumps,
                cluster_size=fleet_info.get('cluster_size', 0),
                is_leader=fleet_info.get('is_leader', False),
                fleet_role=fleet_info.get('fleet_role', 'NOISE'),
                risk_score=final_risk,
                risk_level=final_risk_level,
                features=merged_features,
                gear_judgment=gear_judgment,
                gear_code=registered_gear_code,
            ))
        logger.info(
            'pipeline dark: tiers=%s skip=%s',
            pipeline_dark_tiers, pipeline_skip_counts,
        )
        # ── 5.5 Lightweight analysis — 412* vessels that did not pass the pipeline ──
        # Uses the 24h accumulated trajectories in vessel_store._tracks directly,
        # so dark/spoof signals are produced for these vessels too.
        from algorithms.risk import compute_lightweight_risk_score
        from algorithms.vessel_type_mapping import fishery_code_to_vessel_type
        pipeline_mmsis = {c['mmsi'] for c in classifications}
        lightweight_mmsis = vessel_store.get_chinese_mmsis() - pipeline_mmsis
        if lightweight_mmsis:
            now = datetime.now(timezone.utc)
            all_positions = vessel_store.get_all_latest_positions()
            lw_count = 0
            lw_dark = 0
            lw_spoof = 0
            lw_dark_tiers = {'CRITICAL': 0, 'HIGH': 0, 'WATCH': 0, 'NONE': 0}
            lw_dark_skip = {'kr_domestic': 0, 'gear_signal': 0}
            for mmsi in lightweight_mmsis:
                pos = all_positions.get(mmsi)
                if pos is None or pos.get('lat') is None:
                    continue
                lat, lon = pos['lat'], pos['lon']
                sog = pos.get('sog', 0) or 0
                cog = pos.get('cog', 0) or 0
                ts = pos.get('timestamp', now)
                zone_info = classify_zone(lat, lon)
                # Coarse activity state from SOG alone (no trajectory model here)
                if sog <= 1.0:
                    state = 'STATIONARY'
                elif sog <= 5.0:
                    state = 'FISHING'
                else:
                    state = 'SAILING'
                is_permitted = vessel_store.is_permitted(mmsi)
                vname = vessel_store.get_vessel_info(mmsi).get('name', '') or ''
                # ── Dark: pre-filter (gear signals / Korean vessels) ──
                dark_excluded, dark_skip_reason = _is_dark_excluded(mmsi, vname)
                if dark_excluded:
                    lw_dark_skip[dark_skip_reason] = lw_dark_skip.get(dark_skip_reason, 0) + 1
                    dark = False
                    gap_min = 0
                    dark_features: dict = {
                        'dark_suspicion_score': 0,
                        'dark_patterns': [],
                        'dark_tier': 'EXCLUDED',
                        'dark_history_7d': 0,
                        'dark_history_24h': 0,
                    }
                    spoof_score = 0.0
                    speed_jumps = 0
                else:
                    # Accessing the private _tracks cache directly — assumes it
                    # maps mmsi -> trajectory DataFrame (see section 4 above).
                    df_v = vessel_store._tracks.get(mmsi)
                    spoof_score = 0.0
                    speed_jumps = 0
                    if df_v is not None and len(df_v) >= 2:
                        # Each signal is best-effort; a failure leaves the
                        # neutral default in place.
                        try:
                            spoof_score = compute_spoofing_score(df_v)
                        except Exception:
                            pass
                        try:
                            speed_jumps = count_speed_jumps(df_v)
                        except Exception:
                            pass
                        try:
                            gap_info = analyze_dark_pattern(df_v)
                        except Exception:
                            gap_info = {'is_dark': False, 'gap_min': 0}
                    else:
                        gap_info = {'is_dark': False, 'gap_min': 0}
                    dark = bool(gap_info.get('is_dark'))
                    gap_min = int(gap_info.get('gap_min') or 0)
                    history = dark_history_map.get(mmsi, {'count_7d': 0, 'count_24h': 0})
                    lw_info = vessel_store.get_vessel_info(mmsi)
                    lw_last_cog = float(df_v.iloc[-1].get('cog', 0)) if df_v is not None and len(df_v) > 0 else None
                    score, patterns, tier = compute_dark_suspicion(
                        gap_info, mmsi, is_permitted, history,
                        now_kst_hour, classify_zone,
                        ship_kind_code=lw_info.get('ship_kind_code', ''),
                        nav_status=lw_info.get('status', ''),
                        heading=lw_info.get('heading'),
                        last_cog=lw_last_cog,
                    )
                    lw_dark_tiers[tier] = lw_dark_tiers.get(tier, 0) + 1
                    dark_features = {
                        'dark_suspicion_score': score,
                        'dark_patterns': patterns,
                        'dark_tier': tier,
                        'dark_history_7d': int(history.get('count_7d', 0) or 0),
                        'dark_history_24h': int(history.get('count_24h', 0) or 0),
                        'gap_start_lat': gap_info.get('gap_start_lat'),
                        'gap_start_lon': gap_info.get('gap_start_lon'),
                        'gap_start_sog': gap_info.get('gap_start_sog'),
                        'gap_start_state': gap_info.get('gap_start_state'),
                    }
                if dark:
                    lw_dark += 1
                if spoof_score > 0.5:
                    lw_spoof += 1
                # Feed the pattern-based score stored in dark_features plus the
                # repeat history directly into risk computation (fixes the
                # lightweight path saturating at 45 points).
                risk_score, risk_level = compute_lightweight_risk_score(
                    zone_info, sog, is_permitted=is_permitted,
                    is_dark=dark, gap_duration_min=gap_min,
                    spoofing_score=spoof_score,
                    dark_suspicion_score=int(dark_features.get('dark_suspicion_score', 0) or 0),
                    dist_from_baseline_nm=float(zone_info.get('dist_from_baseline_nm', 999.0) or 999.0),
                    dark_history_24h=int(dark_features.get('dark_history_24h', 0) or 0),
                )
                # Registered vessels get vessel_type from their fishery_code
                # (unregistered vessels stay UNKNOWN)
                registered_fc = (
                    fleet_tracker.get_registered_fishery_code(mmsi)
                    if hasattr(fleet_tracker, 'get_registered_fishery_code') else None
                )
                vessel_type = fishery_code_to_vessel_type(registered_fc)
                # BD-09 offset omitted since these are Chinese vessels (412* = China)
                results.append(AnalysisResult(
                    mmsi=mmsi,
                    timestamp=ts,
                    vessel_type=vessel_type,
                    confidence=0.0,
                    fishing_pct=0.0,
                    lat=float(lat) if lat is not None else None,
                    lon=float(lon) if lon is not None else None,
                    zone=zone_info.get('zone', 'EEZ_OR_BEYOND'),
                    dist_to_baseline_nm=zone_info.get('dist_from_baseline_nm', 999.0),
                    activity_state=state,
                    ucaf_score=0.0,
                    ucft_score=0.0,
                    is_dark=dark,
                    gap_duration_min=gap_min,
                    spoofing_score=spoof_score,
                    bd09_offset_m=0.0,
                    speed_jump_count=speed_jumps,
                    cluster_id=-1,
                    cluster_size=0,
                    is_leader=False,
                    fleet_role='NONE',
                    risk_score=risk_score,
                    risk_level=risk_level,
                    is_transship_suspect=False,
                    transship_pair_mmsi='',
                    transship_duration_min=0,
                    features=dark_features,
                    gear_code=(
                        fleet_tracker.get_vessel_gear_code(mmsi)
                        if hasattr(fleet_tracker, 'get_vessel_gear_code') else None
                    ),
                ))
                lw_count += 1
            logger.info(
                'lightweight analysis: %d vessels (dark=%d, spoof>0.5=%d, tiers=%s, skip=%s)',
                lw_count, lw_dark, lw_spoof, lw_dark_tiers, lw_dark_skip,
            )
        # 6. Transshipment suspicion detection (score-based, veteran-style filters)
        from algorithms.transshipment import detect_transshipment
        results_map = {r.mmsi: r for r in results}
        transship_items = detect_transshipment(
            df_targets,
            _transship_pair_history,
            get_vessel_info=vessel_store.get_vessel_info,
            is_permitted=vessel_store.is_permitted,
            classify_zone_fn=classify_zone,
            now_kst_hour=now_kst_hour,
        )
        for item in transship_items:
            a = item['pair_a']
            b = item['pair_b']
            dur = item['duration_min']
            tier = item['severity']
            if tier == 'WATCH':
                continue  # WATCH tier is not persisted (log only)
            # Tag both vessels of the pair symmetrically.
            for m, pair in ((a, b), (b, a)):
                if m in results_map:
                    r_obj = results_map[m]
                    r_obj.is_transship_suspect = True
                    r_obj.transship_pair_mmsi = pair
                    r_obj.transship_duration_min = dur
                    r_obj.features = {
                        **(r_obj.features or {}),
                        'transship_tier': tier,
                        'transship_score': item['score'],
                    }
        # 7. Persist results (required — abort the cycle on failure)
        upserted = run_stage('upsert_results', kcgdb.upsert_results, results, required=True)
        run_stage('cleanup_old', kcgdb.cleanup_old, hours=48)
        # 8. Output modules — each stage runs independently so the failure
        # point is recorded explicitly. One broken module must not stop the
        # others (e.g. even if event_generator fails, kpi_writer /
        # stats_aggregator / alert_dispatcher can still work off the previous
        # cycle's results).
        from output.violation_classifier import run_violation_classifier
        from output.event_generator import run_event_generator
        from output.kpi_writer import run_kpi_writer
        from output.stats_aggregator import aggregate_hourly, aggregate_daily
        from output.alert_dispatcher import run_alert_dispatcher
        from dataclasses import asdict
        results_dicts = [asdict(r) for r in results]
        # Field-name mapping (AnalysisResult -> format expected by output modules)
        for d in results_dicts:
            d['zone_code'] = d.pop('zone', None)
            d['gap_duration_min'] = d.get('gap_duration_min', 0)
            d['transship_suspect'] = d.pop('is_transship_suspect', False)
            d['fleet_is_leader'] = d.pop('is_leader', False)
            d['fleet_cluster_id'] = d.pop('cluster_id', None)
            d['speed_kn'] = None  # analysis results carry no speed
        run_stage('violation_classifier', run_violation_classifier, results_dicts)
        run_stage('event_generator', run_event_generator, results_dicts)
        run_stage('kpi_writer', run_kpi_writer)
        run_stage('stats_aggregate_hourly', aggregate_hourly)
        run_stage('stats_aggregate_daily', aggregate_daily)
        run_stage('alert_dispatcher', run_alert_dispatcher)
        # 9. Cache the analysis context in Redis (for chat)
        try:
            from chat.cache import cache_analysis_context
            results_map = {r.mmsi: r for r in results}
            risk_dist = {}
            zone_dist = {}
            dark_count = 0
            spoofing_count = 0
            transship_count = 0
            top_risk_list = []
            for r in results:
                risk_dist[r.risk_level] = risk_dist.get(r.risk_level, 0) + 1
                zone_dist[r.zone] = zone_dist.get(r.zone, 0) + 1
                if r.is_dark:
                    dark_count += 1
                if r.spoofing_score > 0.5:
                    spoofing_count += 1
                if r.is_transship_suspect:
                    transship_count += 1
                top_risk_list.append({
                    'mmsi': r.mmsi,
                    'name': vessel_store.get_vessel_info(r.mmsi).get('name', r.mmsi),
                    'risk_score': r.risk_score,
                    'risk_level': r.risk_level,
                    'zone': r.zone,
                    'is_dark': r.is_dark,
                    'is_transship': r.is_transship_suspect,
                    'activity_state': r.activity_state,
                })
            top_risk_list.sort(key=lambda x: x['risk_score'], reverse=True)
            # NOTE(review): risk levels and zone codes are merged into one
            # 'risk_distribution' dict — verify the chat consumer expects this.
            cache_analysis_context({
                'vessel_stats': vessel_store.stats(),
                'risk_distribution': {**risk_dist, **zone_dist},
                'dark_count': dark_count,
                'spoofing_count': spoofing_count,
                'transship_count': transship_count,
                'top_risk_vessels': top_risk_list[:10],
                'polygon_summary': kcgdb.fetch_polygon_summary(),
            })
        except Exception as e:
            logger.exception('failed to cache analysis context for chat: %s', e)
        elapsed = round(time.time() - start, 2)
        _last_run['duration_sec'] = elapsed
        _last_run['vessel_count'] = len(results)
        _last_run['upserted'] = upserted
        logger.info(
            'analysis cycle: %d vessels, %d upserted, %.2fs',
            len(results), upserted, elapsed,
        )
    except Exception as e:
        # Cycle-level boundary: record the error for get_last_run() and keep
        # the scheduler alive for the next interval.
        _last_run['error'] = str(e)
        logger.exception('analysis cycle failed: %s', e)
def start_scheduler():
    """Create the background scheduler, register both jobs, and start it."""
    global _scheduler
    from db.partition_manager import maintain_partitions

    _scheduler = BackgroundScheduler()
    # Two recurring jobs: the analysis cycle every SCHEDULER_INTERVAL_MIN
    # minutes, and partition maintenance daily at 04:00.
    job_specs = (
        dict(func=run_analysis_cycle, trigger='interval',
             minutes=settings.SCHEDULER_INTERVAL_MIN, id='vessel_analysis'),
        dict(func=maintain_partitions, trigger='cron',
             hour=4, minute=0, id='partition_maintenance'),
    )
    for spec in job_specs:
        # max_instances=1 prevents overlapping runs if a job overruns its slot.
        _scheduler.add_job(max_instances=1, replace_existing=True, **spec)
    _scheduler.start()
    logger.info('scheduler started (interval=%dm)', settings.SCHEDULER_INTERVAL_MIN)
def stop_scheduler():
    """Shut down the running scheduler, if any, without waiting for jobs."""
    global _scheduler
    if _scheduler is None:
        return
    _scheduler.shutdown(wait=False)
    _scheduler = None
    logger.info('scheduler stopped')