kcg-ai-monitoring/prediction/tests/test_gear_parent_episode.py
htlee e2fc355b2c feat: S2 prediction 분석 엔진 모노레포 이식
iran prediction 47개 Python 파일을 prediction/ 디렉토리로 복제:
- algorithms/ 14개 분석 알고리즘 (어구추론, 다크베셀, 스푸핑, 환적, 위험도 등)
- pipeline/ 7단계 분류 파이프라인
- cache/vessel_store (24h 슬라이딩 윈도우)
- db/ 어댑터 (snpdb 원본조회, kcgdb 결과저장)
- chat/ AI 채팅 (Ollama, 후순위)
- data/ 정적 데이터 (기선, 특정어업수역 GeoJSON)

config.py를 kcgaidb로 재구성 (DB명, 사용자, 비밀번호)
DB 연결 검증 완료 (kcgaidb 37개 테이블 접근 확인)
Makefile에 dev-prediction / dev-all 타겟 추가
CLAUDE.md에 prediction 섹션 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 12:56:51 +09:00

178 lines
6.2 KiB
Python

import unittest
import sys
import types
from datetime import datetime, timedelta, timezone
# Minimal in-process stand-in for the optional `pydantic_settings` package so
# the module under test can be imported without that dependency installed.
stub = types.ModuleType('pydantic_settings')


class BaseSettings:
    """Lightweight shim mimicking pydantic-settings' BaseSettings.

    Copies every UPPER_CASE class attribute onto the instance, letting each
    one be overridden through a keyword argument of the same name.
    """

    def __init__(self, **overrides):
        for attr, default in self.__class__.__dict__.items():
            if attr.isupper():
                setattr(self, attr, overrides.get(attr, default))


stub.BaseSettings = BaseSettings
# Register only when the real package is absent; a genuine install wins.
sys.modules.setdefault('pydantic_settings', stub)
from algorithms.gear_parent_episode import (
GroupEpisodeInput,
EpisodeState,
build_episode_plan,
compute_prior_bonus_components,
continuity_score,
)
class GearParentEpisodeTest(unittest.TestCase):
    """Tests for gear-parent episode continuity scoring, merge/split planning,
    and prior-bonus computation."""

    def test_continuity_score_prefers_member_overlap_and_near_center(self):
        """Strong MMSI overlap plus a nearby center should score as continuous."""
        snapshot = GroupEpisodeInput(
            group_key='ZHEDAIYU02394',
            normalized_parent_name='ZHEDAIYU02394',
            sub_cluster_id=1,
            member_mmsis=['100', '200', '300'],
            member_count=3,
            center_lat=35.0,
            center_lon=129.0,
        )
        prior = EpisodeState(
            episode_id='ep-prev',
            lineage_key='ZHEDAIYU02394',
            group_key='ZHEDAIYU02394',
            normalized_parent_name='ZHEDAIYU02394',
            current_sub_cluster_id=0,
            member_mmsis=['100', '200', '400'],
            member_count=3,
            center_lat=35.02,
            center_lon=129.01,
            last_snapshot_time=datetime.now(timezone.utc),
            status='ACTIVE',
        )
        score, shared_members, separation_nm = continuity_score(snapshot, prior)
        # Two of the three members persist and the centers sit close together.
        self.assertGreaterEqual(shared_members, 2)
        self.assertGreater(score, 0.45)
        self.assertLess(separation_nm, 12.0)

    def test_build_episode_plan_creates_merge_episode(self):
        """Two active episodes collapsing into one cluster become MERGE_NEW."""
        now = datetime.now(timezone.utc)
        recently = now - timedelta(minutes=5)
        combined = GroupEpisodeInput(
            group_key='JINSHI',
            normalized_parent_name='JINSHI',
            sub_cluster_id=0,
            member_mmsis=['a', 'b', 'c', 'd'],
            member_count=4,
            center_lat=35.0,
            center_lon=129.0,
        )
        episode_one = EpisodeState(
            episode_id='ep-a',
            lineage_key='JINSHI',
            group_key='JINSHI',
            normalized_parent_name='JINSHI',
            current_sub_cluster_id=1,
            member_mmsis=['a', 'b'],
            member_count=2,
            center_lat=35.0,
            center_lon=129.0,
            last_snapshot_time=recently,
            status='ACTIVE',
        )
        episode_two = EpisodeState(
            episode_id='ep-b',
            lineage_key='JINSHI',
            group_key='JINSHI',
            normalized_parent_name='JINSHI',
            current_sub_cluster_id=2,
            member_mmsis=['c', 'd'],
            member_count=2,
            center_lat=35.01,
            center_lon=129.01,
            last_snapshot_time=recently,
            status='ACTIVE',
        )
        plan = build_episode_plan([combined], {'JINSHI': [episode_one, episode_two]})
        assigned = plan.assignments[combined.key]
        self.assertEqual(assigned.continuity_source, 'MERGE_NEW')
        self.assertEqual(set(assigned.merged_from_episode_ids), {'ep-a', 'ep-b'})
        # Both source episodes must point at the freshly created merged episode.
        self.assertEqual(plan.merged_episode_targets['ep-a'], assigned.episode_id)
        self.assertEqual(plan.merged_episode_targets['ep-b'], assigned.episode_id)

    def test_build_episode_plan_marks_split_continue_and_split_new(self):
        """One episode splitting into two clusters yields one CONTINUE and one NEW."""
        now = datetime.now(timezone.utc)
        parent_episode = EpisodeState(
            episode_id='ep-prev',
            lineage_key='A01859',
            group_key='A01859',
            normalized_parent_name='A01859',
            current_sub_cluster_id=0,
            member_mmsis=['a', 'b', 'c', 'd'],
            member_count=4,
            center_lat=35.0,
            center_lon=129.0,
            last_snapshot_time=now - timedelta(minutes=5),
            status='ACTIVE',
        )
        larger_fragment = GroupEpisodeInput(
            group_key='A01859',
            normalized_parent_name='A01859',
            sub_cluster_id=1,
            member_mmsis=['a', 'b', 'c'],
            member_count=3,
            center_lat=35.0,
            center_lon=129.0,
        )
        smaller_fragment = GroupEpisodeInput(
            group_key='A01859',
            normalized_parent_name='A01859',
            sub_cluster_id=2,
            member_mmsis=['c', 'd'],
            member_count=2,
            center_lat=35.02,
            center_lon=129.02,
        )
        plan = build_episode_plan(
            [larger_fragment, smaller_fragment], {'A01859': [parent_episode]}
        )
        observed_sources = {
            plan.assignments[larger_fragment.key].continuity_source,
            plan.assignments[smaller_fragment.key].continuity_source,
        }
        self.assertIn('SPLIT_CONTINUE', observed_sources)
        self.assertIn('SPLIT_NEW', observed_sources)

    def test_compute_prior_bonus_components_caps_total_bonus(self):
        """Each prior channel contributes, but the combined bonus stays <= 0.20."""
        observed_at = datetime.now(timezone.utc)
        episode_stats = {
            ('ep-1', '412333326'): {
                'seen_count': 12,
                'top1_count': 5,
                'avg_score': 0.88,
                'last_seen_at': observed_at - timedelta(hours=1),
            },
        }
        lineage_stats = {
            ('JINSHI', '412333326'): {
                'seen_count': 24,
                'top1_count': 6,
                'top3_count': 10,
                'avg_score': 0.82,
                'last_seen_at': observed_at - timedelta(hours=3),
            },
        }
        label_stats = {
            ('JINSHI', '412333326'): {
                'session_count': 4,
                'last_labeled_at': observed_at - timedelta(days=1),
            },
        }
        bonuses = compute_prior_bonus_components(
            observed_at=observed_at,
            normalized_parent_name='JINSHI',
            episode_id='ep-1',
            candidate_mmsi='412333326',
            episode_prior_stats=episode_stats,
            lineage_prior_stats=lineage_stats,
            label_prior_stats=label_stats,
        )
        self.assertGreater(bonuses['episodePriorBonus'], 0.0)
        self.assertGreater(bonuses['lineagePriorBonus'], 0.0)
        self.assertGreater(bonuses['labelPriorBonus'], 0.0)
        self.assertLessEqual(bonuses['priorBonusTotal'], 0.20)
# Allow running this test module directly (e.g. `python test_gear_parent_episode.py`).
if __name__ == '__main__':
    unittest.main()