云消息隊列 RocketMQ 版5.x版本實例可兼容4.x/3.x SDK客戶端接入,您可以使用4.x/3.x版本的SDK接入5.x實例進行消息收發。本文為您介紹4.x/3.x版本下的C++ SDK消息收發示例代碼。
重要
- 推薦您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作為主力研發版本,和云消息隊列 RocketMQ 版5.x服務端完全兼容,提供了更全面的功能并支持更多增強特性。更多信息,請參見5.x系列SDK。
- RocketMQ 4.x/3.x系列SDK和ONS系列SDK后續僅做功能維護,建議僅存量業務使用。
普通消息收發示例
發送普通消息
#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"
using namespace std;
using namespace rocketmq;
int main() {
std::cout << "=======Before sending messages=======" << std::endl;
// 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
DefaultMQProducer producer("GID_XXX");
// 設置為您從阿里云消息隊列RocketMQ版控制臺獲取的接入點信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意?。。≈苯犹顚懣刂婆_提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
producer.setNamesrvAddr("ACCESS POINT");
// 設置實例用戶名、實例密碼和用戶渠道。用戶渠道默認為“ALIYUN”。
// 若您使用公網接入消息隊列RocketMQ版實例,則必須配置實例的用戶名和密碼;若使用VPC接入,則無需配置。
// 如果實例類型為Serverless實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
// 注意?。?!訪問RocketMQ 5.0實例時,InstanceID屬性不需要設置,否則會導致失敗。
// 請確保參數設置完成之后啟動Producer。
producer.start();
auto start = std::chrono::system_clock::now();
int count = 32;
for (int i = 0; i < count; ++i) {
// 設置為您在消息隊列RocketMQ版控制臺上創建的普通消息類型的Topic。
MQMessage msg("YOURTOPIC","HiTAG","HelloCPPSDK.");
try {
SendResult sendResult = producer.send(msg);
std::cout <<"SendResult:"<<sendResult.getSendStatus()<< ", Message ID: " << sendResult.getMsgId() << std::endl;
this_thread::sleep_for(chrono::seconds(1));
} catch (MQException e) {
std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
}
}
auto interval = std::chrono::system_clock::now() - start;
std::cout << "Send " << count << " messages OK, costs "
<< std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
producer.shutdown();
std::cout << "=======After sending messages=======" << std::endl;
return 0;
}
訂閱普通消息
#include <iostream>
#include <thread>
#include "DefaultMQPushConsumer.h"
using namespace rocketmq;
class ExampleMessageListener : public MessageListenerConcurrently {
public:
ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
for (auto item = msgs.begin(); item != msgs.end(); item++) {
std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
}
return CONSUME_SUCCESS;
}
};
int main(int argc, char *argv[]) {
std::cout << "=======Before consuming messages=======" << std::endl;
// 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!??!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
consumer->setNamesrvAddr("ACCESS POINT");
// 設置實例用戶名、實例密碼和用戶渠道。用戶渠道默認為“ALIYUN”。
// 若您使用公網接入消息隊列RocketMQ版實例,則必須配置實例的用戶名和密碼;若使用VPC接入,則無需配置。
// 如果實例類型為Serverless實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
// 注意?。。≡L問RocketMQ 5.0實例時,InstanceID屬性不需要設置,否則會導致失敗。
auto start = std::chrono::system_clock::now();
// 請注冊自定義監聽函數用來處理接收到的消息,并返回響應的處理結果。
ExampleMessageListener *messageListener = new ExampleMessageListener();
consumer->subscribe("YOURTOPIC", "*");
consumer->registerMessageListener(messageListener);
// 準備工作完成,必須調用啟動函數,才可以正常工作。
// ********************************************
// 1.確保訂閱關系的設置在啟動之前完成。
// 2.確保相同消費者分組內的消費者的訂閱關系一致。
// *********************************************
consumer->start();
// 請保持線程常駐,不要執行shutdown操作。
std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
consumer->shutdown();
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}
順序消息收發示例
發送順序消息
#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"
using namespace std;
using namespace rocketmq;
class ExampleSelectMessageQueueByHash : public MessageQueueSelector {
public:
MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg) {
// 實現自定義分區邏輯,根據業務傳入arg參數即分區鍵,計算路由到哪個隊列,這里以arg為int型參數為例。
int orderId = *static_cast<int *>(arg);
int index = orderId % mqs.size();
return mqs[0];
}
};
int main() {
std::cout << "=======Before sending messages=======" << std::endl;
// 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
DefaultMQProducer producer("GID_XXX");
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意?。?!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
producer.setNamesrvAddr("ACCESS POINT");
// 設置實例用戶名、實例密碼和用戶渠道。用戶渠道默認為“ALIYUN”。
// 若您使用公網接入消息隊列RocketMQ版實例,則必須配置實例的用戶名和密碼;若使用VPC接入,則無需配置。
// 如果實例類型為Serverless實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
// 注意?。?!訪問RocketMQ 5.0實例時,InstanceID屬性不需要設置,否則會導致失敗。
// 請確保參數設置完成之后啟動Producer。
producer.start();
auto start = std::chrono::system_clock::now();
int count = 32;
// 可以設置發送重試的次數,確保發送成功。
int retryTimes = 1;
// 參考接口MessageQueueSelector,通過設置的自定義參數arg,計算發送到指定的路由隊列中,此處的arg便是分區ID。
ExampleSelectMessageQueueByHash *pSelector = new ExampleSelectMessageQueueByHash();
for (int i = 0; i < count; ++i) {
// 設置為您在消息隊列RocketMQ版控制臺上創建的順序消息類型的Topic。
MQMessage msg("YOUR ORDERLY TOPIC", "HiTAG", "Hello,CPP SDK, Orderly Message.");
try {
SendResult sendResult = producer.send(msg, pSelector, &i, 1, false);
std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
<< "MessageQueue:" << sendResult.getMessageQueue().toString() << std::endl;
this_thread::sleep_for(chrono::seconds(1));
} catch (MQException e) {
std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
}
}
auto interval = std::chrono::system_clock::now() - start;
std::cout << "Send " << count << " messages OK, costs "
<< std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
producer.shutdown();
std::cout << "=======After sending messages=======" << std::endl;
return 0;
}
訂閱順序消息
#include <iostream>
#include <thread>
#include "DefaultMQPushConsumer.h"
using namespace rocketmq;
class ExampleOrderlyMessageListener : public MessageListenerOrderly {
public:
ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
for (auto item = msgs.begin(); item != msgs.end(); item++) {
std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
}
return CONSUME_SUCCESS;
}
};
int main(int argc, char *argv[]) {
std::cout << "=======Before consuming messages=======" << std::endl;
// 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!??!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
consumer->setNamesrvAddr("ACCESS POINT");
// 設置實例用戶名、實例密碼和用戶渠道。用戶渠道默認為“ALIYUN”。
// 若您使用公網接入消息隊列RocketMQ版實例,則必須配置實例的用戶名和密碼;若使用VPC接入,則無需配置。
// 如果實例類型為Serverless實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
// 注意?。?!訪問RocketMQ 5.0實例時,InstanceID屬性不需要設置,否則會導致失敗。
auto start = std::chrono::system_clock::now();
// 請注冊自定義偵聽函數用來處理接收到的消息,并返回響應的處理結果。
ExampleOrderlyMessageListener *messageListener = new ExampleOrderlyMessageListener();
consumer->subscribe("YOUR ORDERLY TOPIC", "*");
consumer->registerMessageListener(messageListener);
// 準備工作完成,必須調用啟動函數,才可以正常工作。
// ********************************************
// 1.確保訂閱關系的設置在啟動之前完成。
// 2.確保相同消費者分組內的消費者的訂閱關系一致。
// *********************************************
consumer->start();
// 請保持線程常駐,不要執行shutdown操作。
std::this_thread::sleep_for(std::chrono::seconds (60 ));
consumer->shutdown();
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}
定時/延時消息收發示例
發送定時/延時消息
#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"
using namespace std;
using namespace rocketmq;
int main() {
std::cout << "=======Before sending messages=======" << std::endl;
// 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
DefaultMQProducer producer("GID_XXX");
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意?。?!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
producer.setNamesrvAddr("ACCESS POINT");
// 設置實例用戶名、實例密碼和用戶渠道。用戶渠道默認為“ALIYUN”。
// 若您使用公網接入消息隊列RocketMQ版實例,則必須配置實例的用戶名和密碼;若使用VPC接入,則無需配置。
// 如果實例類型為Serverless實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
// 注意!??!訪問RocketMQ 5.0實例時,InstanceID屬性不需要設置,否則會導致失敗。
//請確保參數設置完成之后啟動Producer。
producer.start();
auto start = std::chrono::system_clock::now();
int count = 32;
for (int i = 0; i < count; ++i) {
// 設置為您在消息隊列RocketMQ版控制臺上創建的定時/延時消息類型的Topic。
MQMessage msg("YOUR DELAY TOPIC", "HiTAG", "Hello,CPP SDK, Delay Message.");
chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
// 定時延時消息,單位毫秒(ms),在指定時間戳(當前時間之后)進行投遞,例如2022-08-07 16:21:00投遞。
// 如果設置成當前時間戳之前的某個時刻,消息將被立刻投遞給消費者。
// 設置需要延時或者定時的時間,例如當前時間延遲10秒后投遞。
long exp = mil.count() + 10000;
msg.setProperty("__STARTDELIVERTIME", to_string(exp));
std::cout << "Now: " << mil.count() << " Exp:" << exp << std::endl;
try {
SendResult sendResult = producer.send(msg);
std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
<< std::endl;
this_thread::sleep_for(chrono::seconds(1));
} catch (MQException e) {
std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
}
}
auto interval = std::chrono::system_clock::now() - start;
std::cout << "Send " << count << " messages OK, costs "
<< std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
producer.shutdown();
std::cout << "=======After sending messages=======" << std::endl;
return 0;
}
訂閱定時/延時消息
#include <iostream>
#include <thread>
#include <chrono>
#include "DefaultMQPushConsumer.h"
using namespace rocketmq;
using namespace std;
class ExampleDelayMessageListener : public MessageListenerConcurrently {
public:
ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
for (auto item = msgs.begin(); item != msgs.end(); item++) {
chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
std::cout << "Now: " << mil.count() << " Received Message Topic:" << item->getTopic() << ", MsgId:"
<< item->getMsgId() << " DelayTime:" << item->getProperty("__STARTDELIVERTIME") << std::endl;
}
return CONSUME_SUCCESS;
}
};
int main(int argc, char *argv[]) {
std::cout << "=======Before consuming messages=======" << std::endl;
// 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意?。。≈苯犹顚懣刂婆_提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
consumer->setNamesrvAddr("ACCESS POINT");
// 設置實例用戶名、實例密碼和用戶渠道。用戶渠道默認為“ALIYUN”。
// 若您使用公網接入消息隊列RocketMQ版實例,則必須配置實例的用戶名和密碼;若使用VPC接入,則無需配置。
// 如果實例類型為Serverless實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
// 注意?。。≡L問RocketMQ 5.0實例時,InstanceID屬性不需要設置,否則會導致失敗。
auto start = std::chrono::system_clock::now();
//register your own listener here to handle the messages received.
// 請注冊自定義偵聽函數用來處理接收到的消息,并返回響應的處理結果。
ExampleDelayMessageListener *messageListener = new ExampleDelayMessageListener();
consumer->subscribe("YOUR DELAY TOPIC", "*");
consumer->registerMessageListener(messageListener);
// 準備工作完成,必須調用啟動函數,才可以正常工作。
// ********************************************
// 1.確保訂閱關系的設置在啟動之前完成。
// 2.確保相同消費者分組內的消費者的訂閱關系一致。
// *********************************************
consumer->start();
// 請保持線程常駐,不要執行shutdown操作。
std::this_thread::sleep_for(std::chrono::seconds(600));
consumer->shutdown();
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}
事務消息
發送事務消息
#include <iostream>
#include <chrono>
#include <thread>
#include "TransactionMQProducer.h"
#include "MQClientException.h"
#include "TransactionListener.h"
using namespace std;
using namespace rocketmq;
class ExampleTransactionListener : public TransactionListener {
public:
LocalTransactionState executeLocalTransaction(const MQMessage &msg, void *arg) {
// 執行本地事務,成功返回COMMIT_MESSAGE,失敗返回ROLLBACK_MESSAGE,不確定返回UNKNOWN。
// 如果返回UNKNOWN,則會觸發定時任務回查狀態。
std::cout << "Execute Local Transaction,Received Message Topic:" << msg.getTopic() << ", MsgId:"
<< msg.getBody() << std::endl;
return UNKNOWN;
}
LocalTransactionState checkLocalTransaction(const MQMessageExt &msg) {
// 回查本地事務執行情況,成功返回COMMIT_MESSAGE,失敗返回ROLLBACK_MESSAGE,不確定返回UNKNOWN。
// 如果返回UNKNOWN,則等待下次定時任務回查。
std::cout << "Check Local Transaction,Received Message Topic:" << msg.getTopic() << ", MsgId:" << msg.getMsgId()
<< std::endl;
return COMMIT_MESSAGE;
}
};
int main() {
std::cout << "=======Before sending messages=======" << std::endl;
// 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
TransactionMQProducer producer("GID_XXX");
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。 // 注意?。?!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。producer.setNamesrvAddr("ACCESS POINT");
// 設置實例用戶名、實例密碼和用戶渠道。用戶渠道默認為“ALIYUN”。
// 若您使用公網接入消息隊列RocketMQ版實例,則必須配置實例的用戶名和密碼;若使用VPC接入,則無需配置。
// 如果實例類型為Serverless實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
// 注意!??!訪問RocketMQ 5.0實例時,InstanceID屬性不需要設置,否則會導致失敗。
// 本地事務執行和回查函數。
ExampleTransactionListener *exampleTransactionListener = new ExampleTransactionListener();
producer.setTransactionListener(exampleTransactionListener);
// 請確保參數設置完成之后啟動Producer。
producer.start();
auto start = std::chrono::system_clock::now();
int count = 3;
for (int i = 0; i < count; ++i) {
// 設置為您在消息隊列RocketMQ版控制臺上創建的事務消息類型的Topic。
MQMessage msg("YOUR TRANSACTION TOPIC", "HiTAG", "Hello,CPP SDK, Transaction Message.");
try {
SendResult sendResult = producer.sendMessageInTransaction(msg, &i);
std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
<< std::endl;
this_thread::sleep_for(chrono::seconds(1));
} catch (MQException e) {
std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
}
}
auto interval = std::chrono::system_clock::now() - start;
std::cout << "Send " << count << " messages OK, costs "
<< std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
std::cout << "Wait for local transaction check..... " << std::endl;
for (int i = 0; i < 6; ++i) {
this_thread::sleep_for(chrono::seconds(10));
std::cout << "Running "<< i*10 + 10 << " Seconds......"<< std::endl;
}
producer.shutdown();
std::cout << "=======After sending messages=======" << std::endl;
return 0;
}
訂閱事務消息
訂閱事務消息的示例代碼和訂閱普通消息一樣,請參見訂閱普通消息。
文檔內容是否對您有幫助?