本文介紹基于Java SDK提供的隊列消息發送以及消費的并發測試用例。
并發測試說明
并發測試是一種性能測試方法,用于驗證消息傳遞系統在同時處理多個消息或多個用戶請求時的性能和穩定性,在并發測試中,您可以指定并發度、運行時間;通過發送總請求數除以運行時間計算得到QPS。
前提條件
配置properties文件
mns.accountendpoint=http://12xxxxxxxx.mns.cn-xxx.aliyuncs.com
mns.perf.queueName=Queue_Test # queue名稱
mns.perf.threadNum=2 # 并發線程數
mns.perf.durationTime=6 # 測試持續時間(秒)
示例代碼
示例代碼下載,請參見JavaSDKPerfTest。
package com.aliyun.mns.sample.scenarios.perf;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.http.ClientConfiguration;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.common.utils.ThreadUtil;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
/**
* 并發測試示例代碼
* 前置要求
* 1. 遵循阿里云規范,env設置ak、sk。
* 2. ${"user.home"}/.aliyun-mns.properties 文件配置如下:
* mns.endpoint=http://xxxxxxx
* mns.perf.queueName=JavaSDKPerfTestQueue # queue名稱
* mns.perf.threadNum=200 # 并發線程數
* mns.perf.durationTime=180 # 測試持續時間(秒)
*/
public class JavaSDKPerfTest {
private static MNSClient client = null;
private static String endpoint = null;
private static String queueName;
private static int threadNum;
/**
* 測試持續時間(秒)
*/
private static long durationTime;
public static void main(String[] args) throws InterruptedException {
if (!parseConf()) {
return;
}
// 1. init client
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setMaxConnections(threadNum);
clientConfiguration.setMaxConnectionsPerRoute(threadNum);
CloudAccount cloudAccount = new CloudAccount(endpoint, clientConfiguration);
client = cloudAccount.getMNSClient();
// 2. reCreateQueue
ReCreateUtil.reCreateQueue(client,queueName);
// 3. SendMessage
Function<CloudQueue,Message> sendFunction = new Function<CloudQueue, Message>() {
@Override
public Message apply(CloudQueue queue) {
Message message = new Message();
message.setMessageBody("BodyTest");
return queue.putMessage(message);
}
};
actionProcess("SendMessage", sendFunction , durationTime);
// 4. Now is the ReceiveMessage
Function<CloudQueue,Message> receiveFunction = new Function<CloudQueue, Message>() {
@Override
public Message apply(CloudQueue queue) {
Message message = queue.popMessage();
String handle = message == null?null:message.getReceiptHandle();
if (StringUtils.isNotBlank(handle)) {
queue.deleteMessage(handle);
}
return message;
}
};
actionProcess("ReceiveAndDelMessage", receiveFunction, durationTime);
client.close();
System.out.println("=======end=======");
}
private static void actionProcess(String actionName, final Function<CloudQueue, Message> function, final long durationSeconds) throws InterruptedException {
System.out.println(actionName +" start!");
final AtomicLong totalCount = new AtomicLong(0);
ThreadPoolExecutor executor = ThreadUtil.initThreadPoolExecutorAbort();
ThreadUtil.asyncWithReturn(executor, threadNum, new ThreadUtil.AsyncRunInterface() {
@Override
public void run() {
try {
String threadName = Thread.currentThread().getName();
CloudQueue queue = client.getQueueRef(queueName);
Message message = new Message();
message.setMessageBody("BodyTest");
long count = 0;
Date startDate = new Date();
long startTime = startDate.getTime();
System.out.printf("[Thread%s]startTime:%s %n", threadName, getBjTime(startDate));
long endTime = startTime + durationSeconds * 1000L;
while (true) {
for (int i = 0; i < 50; ++i) {
function.apply(queue);
}
count += 50;
if (System.currentTimeMillis() >= endTime) {
break;
}
}
System.out.printf("[Thread%s]endTime:%s,count:%d %n", threadName, getBjTime(new Date()),count);
totalCount.addAndGet(count);
} catch (Exception e) {
e.printStackTrace();
}
}
});
executor.shutdown();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
System.out.println(actionName +" QPS: "+(totalCount.get() / durationSeconds));
}
protected static boolean parseConf() {
// init the member parameters
endpoint = ServiceSettings.getMNSAccountEndpoint();
System.out.println("Endpoint: " + endpoint);
queueName = ServiceSettings.getMNSPropertyValue("perf.queueName","JavaSDKPerfTestQueue");
System.out.println("QueueName: " + queueName);
threadNum = Integer.parseInt(ServiceSettings.getMNSPropertyValue("perf.threadNum","2"));
System.out.println("ThreadNum: " + threadNum);
durationTime = Long.parseLong(ServiceSettings.getMNSPropertyValue("perf.totalSeconds","6"));
System.out.println("DurationTime: " + durationTime);
return true;
}
/**
* 獲取北京時間
*/
private static String getBjTime(Date date){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
return sdf.format(date);
}
public interface Function<T, R> {
/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
*/
R apply(T t);
}
}
文檔內容是否對您有幫助?