signal-batch/scripts/websocket-load-test.py
htlee 2e9361ee58 refactor: SNP API 전환 및 레거시 코드 전면 정리
- CollectDB 다중 신호 수집 → S&P Global AIS API 단일 수집으로 전환
- sig_src_cd + target_id 이중 식별자 → mmsi(VARCHAR) 단일 식별자
- t_vessel_latest_position → t_ais_position 테이블 전환
- 레거시 배치/유틸 ~30개 클래스 삭제 (VesselAggregationJobConfig, ShipKindCodeConverter 등)
- AisTargetCacheManager 기반 캐시 이중 구조 (최신위치 + 트랙 버퍼)
- CacheBasedVesselTrackDataReader + CacheBasedTrackJobListener 신규 추가
- VesselStaticStepConfig: 정적정보 CDC 변경 검출 + hourly job 편승
- SignalKindCode enum: vesselType/extraInfo 기반 선종 자동 분류
- WebSocket/STOMP 전체 mmsi 전환 (StompTrackStreamingService ~40곳)
- 모니터링/성능 최적화 코드 mmsi 기반 전환
- DataSource 설정 통합 (snpdb 단일 DB)
- AreaBoundaryCache Polygon→Geometry 캐스트 수정 (MULTIPOLYGON 지원)
- ConcurrentHashMap 적용 (VesselTrackStepConfig 동시성 버그 수정)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 09:59:49 +09:00

176 lines
6.5 KiB
Python

#!/usr/bin/env python3
"""
WebSocket 부하 테스트 자동화 스크립트
"""
import asyncio
import json
import time
import statistics
from datetime import datetime, timedelta
import websockets
import stomper
from concurrent.futures import ThreadPoolExecutor
class WebSocketLoadTest:
def __init__(self, base_url="ws://10.26.252.48:8090/ws-tracks"):
self.base_url = base_url
self.results = []
self.active_connections = 0
async def single_client_test(self, client_id, duration_seconds=60):
"""단일 클라이언트 테스트"""
start_time = time.time()
messages_received = 0
bytes_received = 0
errors = 0
try:
async with websockets.connect(self.base_url) as websocket:
self.active_connections += 1
print(f"Client {client_id}: Connected")
# STOMP CONNECT
connect_frame = stomper.connect(host='/', accept_version='1.2')
await websocket.send(connect_frame)
# Subscribe to data channel
sub_frame = stomper.subscribe('/user/queue/tracks/data', client_id)
await websocket.send(sub_frame)
# Send query request
query_request = {
"startTime": (datetime.now() - timedelta(days=1)).isoformat(),
"endTime": datetime.now().isoformat(),
"viewport": {
"minLon": 124.0,
"maxLon": 132.0,
"minLat": 33.0,
"maxLat": 38.0
},
"filters": {
"minDistance": 10,
"minSpeed": 5
},
"chunkSize": 2000
}
send_frame = stomper.send('/app/tracks/query', json.dumps(query_request))
await websocket.send(send_frame)
# Receive messages
while time.time() - start_time < duration_seconds:
try:
message = await asyncio.wait_for(websocket.recv(), timeout=1.0)
messages_received += 1
bytes_received += len(message)
# Parse STOMP frame
frame = stomper.Frame()
frame.parse(message)
if frame.cmd == 'MESSAGE':
data = json.loads(frame.body)
if data.get('type') == 'complete':
print(f"Client {client_id}: Query completed")
break
except asyncio.TimeoutError:
continue
except Exception as e:
errors += 1
print(f"Client {client_id}: Error - {e}")
except Exception as e:
errors += 1
print(f"Client {client_id}: Connection error - {e}")
finally:
self.active_connections -= 1
# Calculate results
elapsed_time = time.time() - start_time
result = {
'client_id': client_id,
'duration': elapsed_time,
'messages': messages_received,
'bytes': bytes_received,
'errors': errors,
'msg_per_sec': messages_received / elapsed_time if elapsed_time > 0 else 0,
'mbps': (bytes_received / 1024 / 1024) / elapsed_time if elapsed_time > 0 else 0
}
self.results.append(result)
return result
async def run_load_test(self, num_clients=10, duration=60):
"""병렬 부하 테스트 실행"""
print(f"Starting load test with {num_clients} clients for {duration} seconds...")
tasks = []
for i in range(num_clients):
task = asyncio.create_task(self.single_client_test(i, duration))
tasks.append(task)
await asyncio.sleep(0.1) # Stagger connections
# Wait for all clients to complete
await asyncio.gather(*tasks)
# Print summary
self.print_summary()
def print_summary(self):
"""테스트 결과 요약 출력"""
print("\n" + "="*60)
print("LOAD TEST SUMMARY")
print("="*60)
total_messages = sum(r['messages'] for r in self.results)
total_bytes = sum(r['bytes'] for r in self.results)
total_errors = sum(r['errors'] for r in self.results)
avg_msg_per_sec = statistics.mean(r['msg_per_sec'] for r in self.results)
avg_mbps = statistics.mean(r['mbps'] for r in self.results)
print(f"Total Clients: {len(self.results)}")
print(f"Total Messages: {total_messages:,}")
print(f"Total Data: {total_bytes/1024/1024:.2f} MB")
print(f"Total Errors: {total_errors}")
print(f"Avg Messages/sec per client: {avg_msg_per_sec:.2f}")
print(f"Avg Throughput per client: {avg_mbps:.2f} MB/s")
print(f"Total Throughput: {avg_mbps * len(self.results):.2f} MB/s")
# Error rate
error_rate = (total_errors / len(self.results)) * 100 if self.results else 0
print(f"Error Rate: {error_rate:.2f}%")
# Success rate
successful_clients = sum(1 for r in self.results if r['errors'] == 0)
success_rate = (successful_clients / len(self.results)) * 100 if self.results else 0
print(f"Success Rate: {success_rate:.2f}%")
print("="*60)
async def main():
# Test scenarios
scenarios = [
{"clients": 10, "duration": 60, "name": "Light Load"},
{"clients": 50, "duration": 120, "name": "Medium Load"},
{"clients": 100, "duration": 180, "name": "Heavy Load"}
]
for scenario in scenarios:
print(f"\n{'='*60}")
print(f"Running scenario: {scenario['name']}")
print(f"{'='*60}")
tester = WebSocketLoadTest()
await tester.run_load_test(
num_clients=scenario['clients'],
duration=scenario['duration']
)
# Wait between scenarios
print(f"\nWaiting 30 seconds before next scenario...")
await asyncio.sleep(30)
if __name__ == "__main__":
asyncio.run(main())