From 197da138267ccd2db77f839ca6706f29fe03dfe1 Mon Sep 17 00:00:00 2001 From: htlee Date: Fri, 17 Apr 2026 11:28:30 +0900 Subject: [PATCH] =?UTF-8?q?refactor(prediction):=20=EC=82=AC=EC=9D=B4?= =?UTF-8?q?=ED=81=B4=20=EC=8A=A4=ED=85=8C=EC=9D=B4=EC=A7=80=20=EC=97=90?= =?UTF-8?q?=EB=9F=AC=20=EA=B2=BD=EA=B3=84=20=EB=8F=84=EC=9E=85=20(Phase=20?= =?UTF-8?q?0-1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit docs/prediction-analysis.md P1 권고 반영. 5분 사이클의 각 스테이지를 한 try/except 로 뭉친 기존 구조를 스테이지 단위로 분리해 실패 지점을 명시적으로 특정하고 부분 실패 시에도 후속 스테이지가 계속 돌아가도록 개선. - prediction/pipeline/stage_runner.py 신설 - run_stage(name, fn, *args, required=False, **kwargs) 유틸 - required=True 면 예외 re-raise (상위 사이클 try/except 가 잡도록) - required=False 면 logger.exception 으로 stacktrace 보존 + None 반환 - 지속시간 로깅 포함 - prediction/scheduler.py run_analysis_cycle() 수정 - 출력 단계 6모듈을 각각 run_stage() 로 분리: violation_classifier / event_generator / kpi_writer / stats_aggregate_hourly / stats_aggregate_daily / alert_dispatcher - upsert_results / cleanup_old 도 run_stage 로 래핑 (upsert 는 required=True) - 내부 try/except 의 logger.warning → logger.exception 으로 업그레이드 (fetch_dark_history, gear collision event promotion, group polygon, gear correlation, pair detection, chat cache) - 스테이지 실패 시 journalctl -u kcg-ai-prediction 에서 stacktrace 로 원인 바로 특정 가능 (기존은 "failed: X" 한 줄만 남아 디버깅 불가) 검증: - python3 -c "import ast; ast.parse(...)" scheduler.py / stage_runner.py 통과 - run_stage smoke test (정상/실패 흡수/required 재raise 3가지) 통과 범위 밖 (후속): - Phase 0-2 ILLEGAL_FISHING_PATTERN 전용 페이지 (다음 MR) - Phase 0-3 Transshipment 전용 페이지 (다음 MR) --- prediction/pipeline/stage_runner.py | 58 ++++++++++++++++++++++++ prediction/scheduler.py | 70 ++++++++++++++--------------- 2 files changed, 93 insertions(+), 35 deletions(-) create mode 100644 prediction/pipeline/stage_runner.py diff --git a/prediction/pipeline/stage_runner.py b/prediction/pipeline/stage_runner.py new file mode 100644 index 0000000..b402a7b --- /dev/null +++ b/prediction/pipeline/stage_runner.py @@ -0,0 +1,58 @@ +"""사이클 스테이지 에러 경계 유틸. + +`run_analysis_cycle` 내부의 각 스테이지를 한 지점에서 감싸서 +실패 스테이지를 명시적으로 로깅하고, 부분 실패가 후속 스테이지를 +막지 않도록 한다. + +설계 원칙: +- 비필수 스테이지는 예외를 흡수하고 None 을 반환 → 호출자는 + `if result is None` 로 건너뛰기 선택 가능 +- 필수 스테이지(`required=True`)는 예외를 그대로 올려 상위 + `run_analysis_cycle` 의 top-level try/except 가 잡도록 한다 +- `logger.exception` 사용으로 stacktrace 가 저널에 남도록 하여 + 원격 서버(journalctl) 에서 실패 지점 특정 가능 +""" +from __future__ import annotations + +import logging +import time +from typing import Any, Callable, TypeVar + +logger = logging.getLogger(__name__) + +T = TypeVar('T') + + +def run_stage( + name: str, + fn: Callable[..., T], + *args: Any, + required: bool = False, + **kwargs: Any, +) -> T | None: + """스테이지 실행 + 지속시간 로깅 + 실패 격리. + + Args: + name: 스테이지 이름 (로그 라벨). 'fleet_tracking', 'pair_detection' 등 + fn: 실행할 호출 가능 객체 + *args, **kwargs: fn 에 전달 + required: True 면 실패 시 예외를 re-raise. False 면 None 반환하고 계속 + + Returns: + fn 의 반환값, 또는 실패 시 None (required=False 일 때) + + Raises: + fn 이 던진 예외 (required=True 일 때만) + """ + t0 = time.time() + try: + result = fn(*args, **kwargs) + elapsed = time.time() - t0 + logger.info('stage %s ok in %.2fs', name, elapsed) + return result + except Exception as e: + elapsed = time.time() - t0 + logger.exception('stage %s failed after %.2fs: %s', name, elapsed, e) + if required: + raise + return None diff --git a/prediction/scheduler.py b/prediction/scheduler.py index 08c3f22..fd9a030 100644 --- a/prediction/scheduler.py +++ b/prediction/scheduler.py @@ -8,6 +8,7 @@ from apscheduler.schedulers.background import BackgroundScheduler from config import settings from fleet_tracker import GEAR_PATTERN +from pipeline.stage_runner import run_stage logger = logging.getLogger(__name__) @@ -69,7 +70,7 @@ def _fetch_dark_history(kcg_conn, mmsi_list: list[str]) -> dict[str, dict]: for m, n7, n24, t in cur.fetchall() } except Exception as e: - logger.warning('fetch_dark_history failed: %s', e) + logger.exception('fetch_dark_history failed: %s', e) return {} @@ -170,7 +171,7 @@ def run_analysis_cycle(): collision_events['skipped_low'], ) except Exception as e: - logger.warning('gear collision event promotion failed: %s', e) + logger.exception('gear collision event promotion failed: %s', e) fleet_roles = fleet_tracker.build_fleet_clusters(vessel_dfs) @@ -193,7 +194,7 @@ def run_analysis_cycle(): logger.info('group polygons: %d saved, %d cleaned, %d gear groups', saved, cleaned, len(gear_groups)) except Exception as e: - logger.warning('group polygon generation failed: %s', e) + logger.exception('group polygon generation failed: %s', e) # 4.7 어구 연관성 분석 (멀티모델 패턴 추적) try: @@ -226,7 +227,7 @@ def run_analysis_cycle(): inference_result['skipped'], ) except Exception as e: - logger.warning('gear correlation failed: %s', e) + logger.exception('gear correlation failed: %s', e) # 4.9 페어 후보 탐색 (bbox 1차 + 궤적 유사도 2차 → G-06 pair_trawl 판정) pair_results: dict[str, dict] = {} @@ -300,7 +301,7 @@ def run_analysis_cycle(): REJECT_COUNTERS['insufficient_aligned'], REJECT_COUNTERS['no_sync_at_any_tier'], ) except Exception as e: - logger.warning('pair detection failed: %s', e) + logger.exception('pair detection failed: %s', e) # 5. 선박별 추가 알고리즘 → AnalysisResult 생성 # dark 이력 일괄 조회 (7일 history) — 사이클당 1회 @@ -712,37 +713,36 @@ def run_analysis_cycle(): 'transship_score': item['score'], } - # 7. 결과 저장 - upserted = kcgdb.upsert_results(results) - kcgdb.cleanup_old(hours=48) + # 7. 결과 저장 (필수 — 실패 시 사이클 abort) + upserted = run_stage('upsert_results', kcgdb.upsert_results, results, required=True) + run_stage('cleanup_old', kcgdb.cleanup_old, hours=48) - # 8. 출력 모듈 (이벤트 생성, 위반 분류, KPI 갱신, 통계 집계, 경보) - try: - from output.violation_classifier import run_violation_classifier - from output.event_generator import run_event_generator - from output.kpi_writer import run_kpi_writer - from output.stats_aggregator import aggregate_hourly, aggregate_daily - from output.alert_dispatcher import run_alert_dispatcher + # 8. 출력 모듈 — 각 단계를 독립적으로 실행해 실패 지점을 명시적으로 기록. + # 한 모듈이 깨져도 다른 모듈은 계속 돌아가야 한다 (예: event_generator 는 실패했어도 + # kpi_writer / stats_aggregator / alert_dispatcher 는 이전 사이클 결과로 동작 가능). + from output.violation_classifier import run_violation_classifier + from output.event_generator import run_event_generator + from output.kpi_writer import run_kpi_writer + from output.stats_aggregator import aggregate_hourly, aggregate_daily + from output.alert_dispatcher import run_alert_dispatcher - from dataclasses import asdict - results_dicts = [asdict(r) for r in results] - # 필드명 매핑 (AnalysisResult → 출력 모듈 기대 형식) - for d in results_dicts: - d['zone_code'] = d.pop('zone', None) - d['gap_duration_min'] = d.get('gap_duration_min', 0) - d['transship_suspect'] = d.pop('is_transship_suspect', False) - d['fleet_is_leader'] = d.pop('is_leader', False) - d['fleet_cluster_id'] = d.pop('cluster_id', None) - d['speed_kn'] = None # 분석 결과에 속도 없음 - run_violation_classifier(results_dicts) - run_event_generator(results_dicts) - run_kpi_writer() - aggregate_hourly() - aggregate_daily() - run_alert_dispatcher() - logger.info('output modules completed') - except Exception as e: - logger.warning('output modules failed (non-fatal): %s', e) + from dataclasses import asdict + results_dicts = [asdict(r) for r in results] + # 필드명 매핑 (AnalysisResult → 출력 모듈 기대 형식) + for d in results_dicts: + d['zone_code'] = d.pop('zone', None) + d['gap_duration_min'] = d.get('gap_duration_min', 0) + d['transship_suspect'] = d.pop('is_transship_suspect', False) + d['fleet_is_leader'] = d.pop('is_leader', False) + d['fleet_cluster_id'] = d.pop('cluster_id', None) + d['speed_kn'] = None # 분석 결과에 속도 없음 + + run_stage('violation_classifier', run_violation_classifier, results_dicts) + run_stage('event_generator', run_event_generator, results_dicts) + run_stage('kpi_writer', run_kpi_writer) + run_stage('stats_aggregate_hourly', aggregate_hourly) + run_stage('stats_aggregate_daily', aggregate_daily) + run_stage('alert_dispatcher', run_alert_dispatcher) # 9. Redis에 분석 컨텍스트 캐싱 (채팅용) try: @@ -788,7 +788,7 @@ def run_analysis_cycle(): 'polygon_summary': kcgdb.fetch_polygon_summary(), }) except Exception as e: - logger.warning('failed to cache analysis context for chat: %s', e) + logger.exception('failed to cache analysis context for chat: %s', e) elapsed = round(time.time() - start, 2) _last_run['duration_sec'] = elapsed