- CollectDB 다중 신호 수집 → S&P Global AIS API 단일 수집으로 전환 - sig_src_cd + target_id 이중 식별자 → mmsi(VARCHAR) 단일 식별자 - t_vessel_latest_position → t_ais_position 테이블 전환 - 레거시 배치/유틸 ~30개 클래스 삭제 (VesselAggregationJobConfig, ShipKindCodeConverter 등) - AisTargetCacheManager 기반 캐시 이중 구조 (최신위치 + 트랙 버퍼) - CacheBasedVesselTrackDataReader + CacheBasedTrackJobListener 신규 추가 - VesselStaticStepConfig: 정적정보 CDC 변경 검출 + hourly job 편승 - SignalKindCode enum: vesselType/extraInfo 기반 선종 자동 분류 - WebSocket/STOMP 전체 mmsi 전환 (StompTrackStreamingService ~40곳) - 모니터링/성능 최적화 코드 mmsi 기반 전환 - DataSource 설정 통합 (snpdb 단일 DB) - AreaBoundaryCache Polygon→Geometry 캐스트 수정 (MULTIPOLYGON 지원) - ConcurrentHashMap 적용 (VesselTrackStepConfig 동시성 버그 수정) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
176 lines
6.5 KiB
Python
176 lines
6.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
WebSocket 부하 테스트 자동화 스크립트
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import time
|
|
import statistics
|
|
from datetime import datetime, timedelta
|
|
import websockets
|
|
import stomper
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
class WebSocketLoadTest:
    """Run concurrent STOMP-over-WebSocket clients and collect throughput stats.

    Attributes:
        base_url: WebSocket endpoint every simulated client connects to.
        results: One result dict per finished client (see
            ``single_client_test`` for the keys).
        active_connections: Number of clients that currently hold an open
            connection.
    """

    def __init__(self, base_url="ws://10.26.252.48:8090/ws-tracks"):
        self.base_url = base_url
        self.results = []
        self.active_connections = 0

    @staticmethod
    def _build_query_request():
        """Return the track query payload covering the last 24 hours."""
        now = datetime.now()
        return {
            "startTime": (now - timedelta(days=1)).isoformat(),
            "endTime": now.isoformat(),
            "viewport": {
                "minLon": 124.0,
                "maxLon": 132.0,
                "minLat": 33.0,
                "maxLat": 38.0,
            },
            "filters": {
                "minDistance": 10,
                "minSpeed": 5,
            },
            "chunkSize": 2000,
        }

    @staticmethod
    def _payload_size(message):
        """Return the size of a received frame in bytes.

        Text frames arrive as ``str``; their length in characters would
        under-report multi-byte content, so they are measured as UTF-8.
        """
        if isinstance(message, str):
            return len(message.encode("utf-8"))
        return len(message)

    async def single_client_test(self, client_id, duration_seconds=60):
        """Run one simulated client and record its statistics.

        The client connects, sends STOMP CONNECT / SUBSCRIBE frames, submits
        one track query and then reads frames until a message of type
        ``complete`` arrives, the connection closes, or ``duration_seconds``
        have elapsed.

        Args:
            client_id: Identifier used in log output and as subscription id.
            duration_seconds: Upper bound on how long the client keeps reading.

        Returns:
            A dict with the keys ``client_id``, ``duration``, ``messages``,
            ``bytes``, ``errors``, ``msg_per_sec`` and ``mbps``. The same dict
            is appended to ``self.results``.
        """
        # Monotonic clock: elapsed time must not jump if the wall clock is adjusted.
        started = time.monotonic()
        messages_received = 0
        bytes_received = 0
        errors = 0
        connected = False

        try:
            async with websockets.connect(self.base_url) as websocket:
                connected = True
                self.active_connections += 1
                print(f"Client {client_id}: Connected")

                # NOTE(review): confirm that the installed stomper version
                # accepts connect(host=..., accept_version=...) and provides
                # Frame.parse(); the calls are kept exactly as they were.
                # STOMP CONNECT
                connect_frame = stomper.connect(host='/', accept_version='1.2')
                await websocket.send(connect_frame)

                # Subscribe to data channel
                sub_frame = stomper.subscribe('/user/queue/tracks/data', client_id)
                await websocket.send(sub_frame)

                # Send query request
                query_request = self._build_query_request()
                send_frame = stomper.send('/app/tracks/query', json.dumps(query_request))
                await websocket.send(send_frame)

                # Receive messages
                while time.monotonic() - started < duration_seconds:
                    try:
                        message = await asyncio.wait_for(websocket.recv(), timeout=1.0)
                        messages_received += 1
                        bytes_received += self._payload_size(message)

                        # Parse STOMP frame
                        frame = stomper.Frame()
                        frame.parse(message)

                        if frame.cmd == 'MESSAGE':
                            data = json.loads(frame.body)
                            if data.get('type') == 'complete':
                                print(f"Client {client_id}: Query completed")
                                break

                    except asyncio.TimeoutError:
                        # No frame within one second; re-check the deadline.
                        continue
                    except websockets.ConnectionClosed as e:
                        # The socket is gone: every further recv() would fail
                        # immediately, so count the failure once and stop
                        # instead of spinning until the deadline.
                        errors += 1
                        print(f"Client {client_id}: Error - {e}")
                        break
                    except Exception as e:
                        # A single bad frame should not end the client.
                        errors += 1
                        print(f"Client {client_id}: Error - {e}")

        except Exception as e:
            errors += 1
            print(f"Client {client_id}: Connection error - {e}")
        finally:
            # Only undo the increment if it actually happened; a failed
            # connect must not push the counter below zero.
            if connected:
                self.active_connections -= 1

        # Calculate results
        elapsed_time = time.monotonic() - started
        result = {
            'client_id': client_id,
            'duration': elapsed_time,
            'messages': messages_received,
            'bytes': bytes_received,
            'errors': errors,
            'msg_per_sec': messages_received / elapsed_time if elapsed_time > 0 else 0,
            'mbps': (bytes_received / 1024 / 1024) / elapsed_time if elapsed_time > 0 else 0,
        }

        self.results.append(result)
        return result

    async def run_load_test(self, num_clients=10, duration=60):
        """Run ``num_clients`` clients in parallel and print a summary.

        Args:
            num_clients: Number of simulated clients to start.
            duration: Per-client time limit in seconds.
        """
        print(f"Starting load test with {num_clients} clients for {duration} seconds...")

        tasks = []
        for i in range(num_clients):
            tasks.append(asyncio.create_task(self.single_client_test(i, duration)))
            await asyncio.sleep(0.1)  # Stagger connections

        # Wait for all clients to complete
        await asyncio.gather(*tasks)

        # Print summary
        self.print_summary()

    def print_summary(self):
        """Print aggregate statistics for all recorded client results."""
        print("\n" + "="*60)
        print("LOAD TEST SUMMARY")
        print("="*60)

        client_count = len(self.results)
        if client_count == 0:
            # statistics.mean() raises on empty input; report and stop.
            print("No client results recorded.")
            print("="*60)
            return

        total_messages = sum(r['messages'] for r in self.results)
        total_bytes = sum(r['bytes'] for r in self.results)
        total_errors = sum(r['errors'] for r in self.results)
        avg_msg_per_sec = statistics.mean(r['msg_per_sec'] for r in self.results)
        avg_mbps = statistics.mean(r['mbps'] for r in self.results)

        print(f"Total Clients: {client_count}")
        print(f"Total Messages: {total_messages:,}")
        print(f"Total Data: {total_bytes/1024/1024:.2f} MB")
        print(f"Total Errors: {total_errors}")
        print(f"Avg Messages/sec per client: {avg_msg_per_sec:.2f}")
        print(f"Avg Throughput per client: {avg_mbps:.2f} MB/s")
        print(f"Total Throughput: {avg_mbps * client_count:.2f} MB/s")

        # Error rate: errors per client expressed as a percentage, so it can
        # exceed 100 when clients log more than one error each.
        error_rate = (total_errors / client_count) * 100
        print(f"Error Rate: {error_rate:.2f}%")

        # Success rate: share of clients that finished without any error.
        successful_clients = sum(1 for r in self.results if r['errors'] == 0)
        success_rate = (successful_clients / client_count) * 100
        print(f"Success Rate: {success_rate:.2f}%")

        print("="*60)
|
|
|
|
async def main():
    """Run the light, medium and heavy load scenarios one after another.

    Each scenario uses a fresh ``WebSocketLoadTest`` so its results are
    summarised separately. A 30-second pause separates consecutive
    scenarios; no pause follows the last one.
    """
    # Test scenarios
    scenarios = [
        {"clients": 10, "duration": 60, "name": "Light Load"},
        {"clients": 50, "duration": 120, "name": "Medium Load"},
        {"clients": 100, "duration": 180, "name": "Heavy Load"},
    ]

    last_index = len(scenarios) - 1
    for index, scenario in enumerate(scenarios):
        print(f"\n{'='*60}")
        print(f"Running scenario: {scenario['name']}")
        print(f"{'='*60}")

        tester = WebSocketLoadTest()
        await tester.run_load_test(
            num_clients=scenario['clients'],
            duration=scenario['duration'],
        )

        # Wait between scenarios; after the final one there is nothing to
        # wait for, so the script exits right away.
        if index < last_index:
            print("\nWaiting 30 seconds before next scenario...")
            await asyncio.sleep(30)
|
|
|
|
if __name__ == "__main__":
    # Start the scenarios only when executed as a script, not on import.
    asyncio.run(main())
|