Compare commits
3 커밋
bae2dbde08
...
3e29bc9995
| 작성자 | SHA1 | 날짜 | |
|---|---|---|---|
| 3e29bc9995 | |||
| a32d09f75a | |||
| 197da13826 |
@ -4,6 +4,9 @@
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
### 변경
|
||||||
|
- **Prediction 5분 사이클 스테이지 에러 경계 도입 (Phase 0-1)** — `prediction/pipeline/stage_runner.py` 신설해 `run_stage(name, fn, required=False)` 유틸 제공. `scheduler.py run_analysis_cycle()` 의 출력 6모듈(violation_classifier / event_generator / kpi_writer / stats_aggregate_hourly / stats_aggregate_daily / alert_dispatcher)을 한 try/except 로 묶던 구조를 스테이지별 독립 실행으로 분리, 한 모듈이 깨져도 다른 모듈이 계속 돌아가도록 개선. `upsert_results` 는 required=True 로 실패 시 사이클 abort. 내부 try/except 의 `logger.warning` 을 `logger.exception` 으로 업그레이드(fetch_dark_history / gear collision event promotion / group polygon / gear correlation / pair detection / chat cache)하여 `journalctl -u kcg-ai-prediction` 에서 stacktrace 로 실패 지점 즉시 특정 가능. (docs/prediction-analysis.md P1 권고)
|
||||||
|
|
||||||
### 문서
|
### 문서
|
||||||
- **Prediction 모듈 심층 분석 리포트 신설** — `docs/prediction-analysis.md` (9개 섹션, 250 라인). opus 4.7 독립 리뷰 관점에서 현재 17 알고리즘의 레이어 분리·5분 사이클 시퀀스·4대 도메인 커버리지를 평가하고, 6축(관심사 분리/재사용성/테스트 가능성/에러 격리/동시성/설정 가능성)으로 구조 채점 + P1~P4 개선 제안·임계값 전수표 제공
|
- **Prediction 모듈 심층 분석 리포트 신설** — `docs/prediction-analysis.md` (9개 섹션, 250 라인). opus 4.7 독립 리뷰 관점에서 현재 17 알고리즘의 레이어 분리·5분 사이클 시퀀스·4대 도메인 커버리지를 평가하고, 6축(관심사 분리/재사용성/테스트 가능성/에러 격리/동시성/설정 가능성)으로 구조 채점 + P1~P4 개선 제안·임계값 전수표 제공
|
||||||
- **루트·SFR 문서 drift 해소** — V001~V016 → V030 + 51 테이블, Python 3.9 → 3.11+, 14 → 17 알고리즘 모듈 실측 반영. SFR-10 에 GEAR_IDENTITY_COLLISION 패턴 + GearCollisionDetection 페이지 섹션 추가 (sfr-traceability/sfr-user-guide), `/gear-collision` 라우트 architecture.md 포함, system-flow-guide 노드 수 102→115 + V030 manifest 미반영 경고, backend/README "Phase 2 예정" 상태 → 실제 운영 구성 전면 재작성 (PR #79 hotfix 요구사항 명시)
|
- **루트·SFR 문서 drift 해소** — V001~V016 → V030 + 51 테이블, Python 3.9 → 3.11+, 14 → 17 알고리즘 모듈 실측 반영. SFR-10 에 GEAR_IDENTITY_COLLISION 패턴 + GearCollisionDetection 페이지 섹션 추가 (sfr-traceability/sfr-user-guide), `/gear-collision` 라우트 architecture.md 포함, system-flow-guide 노드 수 102→115 + V030 manifest 미반영 경고, backend/README "Phase 2 예정" 상태 → 실제 운영 구성 전면 재작성 (PR #79 hotfix 요구사항 명시)
|
||||||
|
|||||||
58
prediction/pipeline/stage_runner.py
Normal file
58
prediction/pipeline/stage_runner.py
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
"""사이클 스테이지 에러 경계 유틸.
|
||||||
|
|
||||||
|
`run_analysis_cycle` 내부의 각 스테이지를 한 지점에서 감싸서
|
||||||
|
실패 스테이지를 명시적으로 로깅하고, 부분 실패가 후속 스테이지를
|
||||||
|
막지 않도록 한다.
|
||||||
|
|
||||||
|
설계 원칙:
|
||||||
|
- 비필수 스테이지는 예외를 흡수하고 None 을 반환 → 호출자는
|
||||||
|
`if result is None` 로 건너뛰기 선택 가능
|
||||||
|
- 필수 스테이지(`required=True`)는 예외를 그대로 올려 상위
|
||||||
|
`run_analysis_cycle` 의 top-level try/except 가 잡도록 한다
|
||||||
|
- `logger.exception` 사용으로 stacktrace 가 저널에 남도록 하여
|
||||||
|
원격 서버(journalctl) 에서 실패 지점 특정 가능
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from typing import Any, Callable, TypeVar
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
T = TypeVar('T')
|
||||||
|
|
||||||
|
|
||||||
|
def run_stage(
|
||||||
|
name: str,
|
||||||
|
fn: Callable[..., T],
|
||||||
|
*args: Any,
|
||||||
|
required: bool = False,
|
||||||
|
**kwargs: Any,
|
||||||
|
) -> T | None:
|
||||||
|
"""스테이지 실행 + 지속시간 로깅 + 실패 격리.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: 스테이지 이름 (로그 라벨). 'fleet_tracking', 'pair_detection' 등
|
||||||
|
fn: 실행할 호출 가능 객체
|
||||||
|
*args, **kwargs: fn 에 전달
|
||||||
|
required: True 면 실패 시 예외를 re-raise. False 면 None 반환하고 계속
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
fn 의 반환값, 또는 실패 시 None (required=False 일 때)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
fn 이 던진 예외 (required=True 일 때만)
|
||||||
|
"""
|
||||||
|
t0 = time.time()
|
||||||
|
try:
|
||||||
|
result = fn(*args, **kwargs)
|
||||||
|
elapsed = time.time() - t0
|
||||||
|
logger.info('stage %s ok in %.2fs', name, elapsed)
|
||||||
|
return result
|
||||||
|
except Exception as e:
|
||||||
|
elapsed = time.time() - t0
|
||||||
|
logger.exception('stage %s failed after %.2fs: %s', name, elapsed, e)
|
||||||
|
if required:
|
||||||
|
raise
|
||||||
|
return None
|
||||||
@ -8,6 +8,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
|
|||||||
|
|
||||||
from config import settings
|
from config import settings
|
||||||
from fleet_tracker import GEAR_PATTERN
|
from fleet_tracker import GEAR_PATTERN
|
||||||
|
from pipeline.stage_runner import run_stage
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -69,7 +70,7 @@ def _fetch_dark_history(kcg_conn, mmsi_list: list[str]) -> dict[str, dict]:
|
|||||||
for m, n7, n24, t in cur.fetchall()
|
for m, n7, n24, t in cur.fetchall()
|
||||||
}
|
}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning('fetch_dark_history failed: %s', e)
|
logger.exception('fetch_dark_history failed: %s', e)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|
||||||
@ -170,7 +171,7 @@ def run_analysis_cycle():
|
|||||||
collision_events['skipped_low'],
|
collision_events['skipped_low'],
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning('gear collision event promotion failed: %s', e)
|
logger.exception('gear collision event promotion failed: %s', e)
|
||||||
|
|
||||||
fleet_roles = fleet_tracker.build_fleet_clusters(vessel_dfs)
|
fleet_roles = fleet_tracker.build_fleet_clusters(vessel_dfs)
|
||||||
|
|
||||||
@ -193,7 +194,7 @@ def run_analysis_cycle():
|
|||||||
logger.info('group polygons: %d saved, %d cleaned, %d gear groups',
|
logger.info('group polygons: %d saved, %d cleaned, %d gear groups',
|
||||||
saved, cleaned, len(gear_groups))
|
saved, cleaned, len(gear_groups))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning('group polygon generation failed: %s', e)
|
logger.exception('group polygon generation failed: %s', e)
|
||||||
|
|
||||||
# 4.7 어구 연관성 분석 (멀티모델 패턴 추적)
|
# 4.7 어구 연관성 분석 (멀티모델 패턴 추적)
|
||||||
try:
|
try:
|
||||||
@ -226,7 +227,7 @@ def run_analysis_cycle():
|
|||||||
inference_result['skipped'],
|
inference_result['skipped'],
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning('gear correlation failed: %s', e)
|
logger.exception('gear correlation failed: %s', e)
|
||||||
|
|
||||||
# 4.9 페어 후보 탐색 (bbox 1차 + 궤적 유사도 2차 → G-06 pair_trawl 판정)
|
# 4.9 페어 후보 탐색 (bbox 1차 + 궤적 유사도 2차 → G-06 pair_trawl 판정)
|
||||||
pair_results: dict[str, dict] = {}
|
pair_results: dict[str, dict] = {}
|
||||||
@ -300,7 +301,7 @@ def run_analysis_cycle():
|
|||||||
REJECT_COUNTERS['insufficient_aligned'], REJECT_COUNTERS['no_sync_at_any_tier'],
|
REJECT_COUNTERS['insufficient_aligned'], REJECT_COUNTERS['no_sync_at_any_tier'],
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning('pair detection failed: %s', e)
|
logger.exception('pair detection failed: %s', e)
|
||||||
|
|
||||||
# 5. 선박별 추가 알고리즘 → AnalysisResult 생성
|
# 5. 선박별 추가 알고리즘 → AnalysisResult 생성
|
||||||
# dark 이력 일괄 조회 (7일 history) — 사이클당 1회
|
# dark 이력 일괄 조회 (7일 history) — 사이클당 1회
|
||||||
@ -712,37 +713,36 @@ def run_analysis_cycle():
|
|||||||
'transship_score': item['score'],
|
'transship_score': item['score'],
|
||||||
}
|
}
|
||||||
|
|
||||||
# 7. 결과 저장
|
# 7. 결과 저장 (필수 — 실패 시 사이클 abort)
|
||||||
upserted = kcgdb.upsert_results(results)
|
upserted = run_stage('upsert_results', kcgdb.upsert_results, results, required=True)
|
||||||
kcgdb.cleanup_old(hours=48)
|
run_stage('cleanup_old', kcgdb.cleanup_old, hours=48)
|
||||||
|
|
||||||
# 8. 출력 모듈 (이벤트 생성, 위반 분류, KPI 갱신, 통계 집계, 경보)
|
# 8. 출력 모듈 — 각 단계를 독립적으로 실행해 실패 지점을 명시적으로 기록.
|
||||||
try:
|
# 한 모듈이 깨져도 다른 모듈은 계속 돌아가야 한다 (예: event_generator 는 실패했어도
|
||||||
from output.violation_classifier import run_violation_classifier
|
# kpi_writer / stats_aggregator / alert_dispatcher 는 이전 사이클 결과로 동작 가능).
|
||||||
from output.event_generator import run_event_generator
|
from output.violation_classifier import run_violation_classifier
|
||||||
from output.kpi_writer import run_kpi_writer
|
from output.event_generator import run_event_generator
|
||||||
from output.stats_aggregator import aggregate_hourly, aggregate_daily
|
from output.kpi_writer import run_kpi_writer
|
||||||
from output.alert_dispatcher import run_alert_dispatcher
|
from output.stats_aggregator import aggregate_hourly, aggregate_daily
|
||||||
|
from output.alert_dispatcher import run_alert_dispatcher
|
||||||
|
|
||||||
from dataclasses import asdict
|
from dataclasses import asdict
|
||||||
results_dicts = [asdict(r) for r in results]
|
results_dicts = [asdict(r) for r in results]
|
||||||
# 필드명 매핑 (AnalysisResult → 출력 모듈 기대 형식)
|
# 필드명 매핑 (AnalysisResult → 출력 모듈 기대 형식)
|
||||||
for d in results_dicts:
|
for d in results_dicts:
|
||||||
d['zone_code'] = d.pop('zone', None)
|
d['zone_code'] = d.pop('zone', None)
|
||||||
d['gap_duration_min'] = d.get('gap_duration_min', 0)
|
d['gap_duration_min'] = d.get('gap_duration_min', 0)
|
||||||
d['transship_suspect'] = d.pop('is_transship_suspect', False)
|
d['transship_suspect'] = d.pop('is_transship_suspect', False)
|
||||||
d['fleet_is_leader'] = d.pop('is_leader', False)
|
d['fleet_is_leader'] = d.pop('is_leader', False)
|
||||||
d['fleet_cluster_id'] = d.pop('cluster_id', None)
|
d['fleet_cluster_id'] = d.pop('cluster_id', None)
|
||||||
d['speed_kn'] = None # 분석 결과에 속도 없음
|
d['speed_kn'] = None # 분석 결과에 속도 없음
|
||||||
run_violation_classifier(results_dicts)
|
|
||||||
run_event_generator(results_dicts)
|
run_stage('violation_classifier', run_violation_classifier, results_dicts)
|
||||||
run_kpi_writer()
|
run_stage('event_generator', run_event_generator, results_dicts)
|
||||||
aggregate_hourly()
|
run_stage('kpi_writer', run_kpi_writer)
|
||||||
aggregate_daily()
|
run_stage('stats_aggregate_hourly', aggregate_hourly)
|
||||||
run_alert_dispatcher()
|
run_stage('stats_aggregate_daily', aggregate_daily)
|
||||||
logger.info('output modules completed')
|
run_stage('alert_dispatcher', run_alert_dispatcher)
|
||||||
except Exception as e:
|
|
||||||
logger.warning('output modules failed (non-fatal): %s', e)
|
|
||||||
|
|
||||||
# 9. Redis에 분석 컨텍스트 캐싱 (채팅용)
|
# 9. Redis에 분석 컨텍스트 캐싱 (채팅용)
|
||||||
try:
|
try:
|
||||||
@ -788,7 +788,7 @@ def run_analysis_cycle():
|
|||||||
'polygon_summary': kcgdb.fetch_polygon_summary(),
|
'polygon_summary': kcgdb.fetch_polygon_summary(),
|
||||||
})
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning('failed to cache analysis context for chat: %s', e)
|
logger.exception('failed to cache analysis context for chat: %s', e)
|
||||||
|
|
||||||
elapsed = round(time.time() - start, 2)
|
elapsed = round(time.time() - start, 2)
|
||||||
_last_run['duration_sec'] = elapsed
|
_last_run['duration_sec'] = elapsed
|
||||||
|
|||||||
불러오는 중...
Reference in New Issue
Block a user