refactor(prediction): 사이클 스테이지 에러 경계 도입 (Phase 0-1)

docs/prediction-analysis.md P1 권고 반영. 5분 사이클의 각 스테이지를
한 try/except 로 뭉친 기존 구조를 스테이지 단위로 분리해 실패 지점을
명시적으로 특정하고 부분 실패 시에도 후속 스테이지가 계속 돌아가도록 개선.

- prediction/pipeline/stage_runner.py 신설
  - run_stage(name, fn, *args, required=False, **kwargs) 유틸
  - required=True 면 예외 re-raise (상위 사이클 try/except 가 잡도록)
  - required=False 면 logger.exception 으로 stacktrace 보존 + None 반환
  - 지속시간 로깅 포함

- prediction/scheduler.py run_analysis_cycle() 수정
  - 출력 단계 6모듈을 각각 run_stage() 로 분리:
    violation_classifier / event_generator / kpi_writer /
    stats_aggregate_hourly / stats_aggregate_daily / alert_dispatcher
  - upsert_results / cleanup_old 도 run_stage 로 래핑 (upsert 는 required=True)
  - 내부 try/except 의 logger.warning → logger.exception 으로 업그레이드
    (fetch_dark_history, gear collision event promotion, group polygon,
     gear correlation, pair detection, chat cache)
  - 스테이지 실패 시 journalctl -u kcg-ai-prediction 에서 stacktrace 로
    원인 바로 특정 가능 (기존은 "failed: X" 한 줄만 남아 디버깅 불가)

검증:
- python3 -c "import ast; ast.parse(...)" scheduler.py / stage_runner.py 통과
- run_stage smoke test (정상/실패 흡수/required 재raise 3가지) 통과

범위 밖 (후속):
- Phase 0-2 ILLEGAL_FISHING_PATTERN 전용 페이지 (다음 MR)
- Phase 0-3 Transshipment 전용 페이지 (다음 MR)
This commit is contained in:
htlee 2026-04-17 11:28:30 +09:00
부모 bae2dbde08
커밋 197da13826
2개의 변경된 파일93개의 추가작업 그리고 35개의 파일을 삭제

파일 보기

@ -0,0 +1,58 @@
"""사이클 스테이지 에러 경계 유틸.
`run_analysis_cycle` 내부의 스테이지를 지점에서 감싸서
실패 스테이지를 명시적으로 로깅하고, 부분 실패가 후속 스테이지를
막지 않도록 한다.
설계 원칙:
- 비필수 스테이지는 예외를 흡수하고 None 반환 호출자는
`if result is None` 건너뛰기 선택 가능
- 필수 스테이지(`required=True`) 예외를 그대로 올려 상위
`run_analysis_cycle` top-level try/except 잡도록 한다
- `logger.exception` 사용으로 stacktrace 저널에 남도록 하여
원격 서버(journalctl) 에서 실패 지점 특정 가능
"""
from __future__ import annotations
import logging
import time
from typing import Any, Callable, TypeVar
logger = logging.getLogger(__name__)
T = TypeVar('T')
def run_stage(
name: str,
fn: Callable[..., T],
*args: Any,
required: bool = False,
**kwargs: Any,
) -> T | None:
"""스테이지 실행 + 지속시간 로깅 + 실패 격리.
Args:
name: 스테이지 이름 (로그 라벨). 'fleet_tracking', 'pair_detection'
fn: 실행할 호출 가능 객체
*args, **kwargs: fn 전달
required: True 실패 예외를 re-raise. False None 반환하고 계속
Returns:
fn 반환값, 또는 실패 None (required=False )
Raises:
fn 던진 예외 (required=True 때만)
"""
t0 = time.time()
try:
result = fn(*args, **kwargs)
elapsed = time.time() - t0
logger.info('stage %s ok in %.2fs', name, elapsed)
return result
except Exception as e:
elapsed = time.time() - t0
logger.exception('stage %s failed after %.2fs: %s', name, elapsed, e)
if required:
raise
return None

파일 보기

@ -8,6 +8,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
from config import settings
from fleet_tracker import GEAR_PATTERN
from pipeline.stage_runner import run_stage
logger = logging.getLogger(__name__)
@ -69,7 +70,7 @@ def _fetch_dark_history(kcg_conn, mmsi_list: list[str]) -> dict[str, dict]:
for m, n7, n24, t in cur.fetchall()
}
except Exception as e:
logger.warning('fetch_dark_history failed: %s', e)
logger.exception('fetch_dark_history failed: %s', e)
return {}
@ -170,7 +171,7 @@ def run_analysis_cycle():
collision_events['skipped_low'],
)
except Exception as e:
logger.warning('gear collision event promotion failed: %s', e)
logger.exception('gear collision event promotion failed: %s', e)
fleet_roles = fleet_tracker.build_fleet_clusters(vessel_dfs)
@ -193,7 +194,7 @@ def run_analysis_cycle():
logger.info('group polygons: %d saved, %d cleaned, %d gear groups',
saved, cleaned, len(gear_groups))
except Exception as e:
logger.warning('group polygon generation failed: %s', e)
logger.exception('group polygon generation failed: %s', e)
# 4.7 어구 연관성 분석 (멀티모델 패턴 추적)
try:
@ -226,7 +227,7 @@ def run_analysis_cycle():
inference_result['skipped'],
)
except Exception as e:
logger.warning('gear correlation failed: %s', e)
logger.exception('gear correlation failed: %s', e)
# 4.9 페어 후보 탐색 (bbox 1차 + 궤적 유사도 2차 → G-06 pair_trawl 판정)
pair_results: dict[str, dict] = {}
@ -300,7 +301,7 @@ def run_analysis_cycle():
REJECT_COUNTERS['insufficient_aligned'], REJECT_COUNTERS['no_sync_at_any_tier'],
)
except Exception as e:
logger.warning('pair detection failed: %s', e)
logger.exception('pair detection failed: %s', e)
# 5. 선박별 추가 알고리즘 → AnalysisResult 생성
# dark 이력 일괄 조회 (7일 history) — 사이클당 1회
@ -712,12 +713,13 @@ def run_analysis_cycle():
'transship_score': item['score'],
}
# 7. 결과 저장
upserted = kcgdb.upsert_results(results)
kcgdb.cleanup_old(hours=48)
# 7. 결과 저장 (필수 — 실패 시 사이클 abort)
upserted = run_stage('upsert_results', kcgdb.upsert_results, results, required=True)
run_stage('cleanup_old', kcgdb.cleanup_old, hours=48)
# 8. 출력 모듈 (이벤트 생성, 위반 분류, KPI 갱신, 통계 집계, 경보)
try:
# 8. 출력 모듈 — 각 단계를 독립적으로 실행해 실패 지점을 명시적으로 기록.
# 한 모듈이 깨져도 다른 모듈은 계속 돌아가야 한다 (예: event_generator 는 실패했어도
# kpi_writer / stats_aggregator / alert_dispatcher 는 이전 사이클 결과로 동작 가능).
from output.violation_classifier import run_violation_classifier
from output.event_generator import run_event_generator
from output.kpi_writer import run_kpi_writer
@ -734,15 +736,13 @@ def run_analysis_cycle():
d['fleet_is_leader'] = d.pop('is_leader', False)
d['fleet_cluster_id'] = d.pop('cluster_id', None)
d['speed_kn'] = None # 분석 결과에 속도 없음
run_violation_classifier(results_dicts)
run_event_generator(results_dicts)
run_kpi_writer()
aggregate_hourly()
aggregate_daily()
run_alert_dispatcher()
logger.info('output modules completed')
except Exception as e:
logger.warning('output modules failed (non-fatal): %s', e)
run_stage('violation_classifier', run_violation_classifier, results_dicts)
run_stage('event_generator', run_event_generator, results_dicts)
run_stage('kpi_writer', run_kpi_writer)
run_stage('stats_aggregate_hourly', aggregate_hourly)
run_stage('stats_aggregate_daily', aggregate_daily)
run_stage('alert_dispatcher', run_alert_dispatcher)
# 9. Redis에 분석 컨텍스트 캐싱 (채팅용)
try:
@ -788,7 +788,7 @@ def run_analysis_cycle():
'polygon_summary': kcgdb.fetch_polygon_summary(),
})
except Exception as e:
logger.warning('failed to cache analysis context for chat: %s', e)
logger.exception('failed to cache analysis context for chat: %s', e)
elapsed = round(time.time() - start, 2)
_last_run['duration_sec'] = elapsed