普通消息是指阿里云云消息隊列 RocketMQ 版中無特性的消息,區別于有特性的定時消息、順序消息和事務消息。本文提供使用TCP協議下的社區版C++ SDK收發普通消息的示例代碼供您參考。
前提條件
獲取阿里云訪問密鑰AccessKey ID和AccessKey Secret。更多信息,請參見創建AccessKey。
發送普通消息
將以下代碼拷貝到ProducerDemo.cpp文件中,修改相應的參數后,使用g++命令進行編譯,生成可執行文件,即可直接運行。
g++ -o producer_demo -std=c++11 -lz -lrocketmq ProducerDemo.cpp
發送普通消息的示例代碼如下。
#include <iostream> #include <chrono> #include <thread> #include "DefaultMQProducer.h" using namespace std; using namespace rocketmq; int main() { std::cout << "=======Before sending messages=======" << std::endl; //您在阿里云消息隊列RocketMQ控制臺上申請的GID。 DefaultMQProducer producer("GID_XXXXXXXXX"); //設置TCP協議接入點,從阿里云RocketMQ控制臺的實例詳情頁面獲取。 producer.setNamesrvAddr("http://MQ_INST_XXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); //請確保環境變量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設置。 //ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET分別為阿里云賬號的AccessKey ID和AccessKey Secret,用于身份驗證。 //用戶渠道,默認值為:ALIYUN。 producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"); //請確保參數設置完成之后啟動Producer。 producer.start(); auto start = std::chrono::system_clock::now(); int count = 32; for (int i = 0; i < count; ++i) { //發送消息時請設置您在阿里云消息隊列RocketMQ控制臺上申請的Topic。 MQMessage msg("YOURTOPIC","HiTAG","HelloCPPSDK."); try { SendResult sendResult = producer.send(msg); std::cout <<"SendResult:"<<sendResult.getSendStatus()<< ", Message ID: " << sendResult.getMsgId() << std::endl; this_thread::sleep_for(chrono::seconds(1)); } catch (MQException e) { std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl; } } auto interval = std::chrono::system_clock::now() - start; std::cout << "Send " << count << " messages OK, costs " << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl; producer.shutdown(); std::cout << "=======After sending messages=======" << std::endl; return 0; }
消費普通消息
將以下代碼拷貝到ConsumerDemo.cpp文件中,修改相應的參數后,使用g++命令進行編譯,生成可執行文件,即可直接運行。
g++ -o consumer_demo -std=c++11 -lz -lrocketmq ConsumerDemo.cpp
消費普通消息的示例代碼如下。
#include <iostream> #include <thread> #include "DefaultMQPushConsumer.h" using namespace rocketmq; class ExampleMessageListener : public MessageListenerConcurrently { public: ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) { for (auto item = msgs.begin(); item != msgs.end(); item++) { std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl; } return CONSUME_SUCCESS; } }; int main(int argc, char *argv[]) { std::cout << "=======Before consuming messages=======" << std::endl; //您在阿里云消息隊列RocketMQ控制臺上申請的GID。 DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXX"); //設置TCP協議接入點,從阿里云消息隊列RocketMQ控制臺的實例詳情頁面獲取。 consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); //請確保環境變量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設置。 //ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET分別為阿里云賬號的AccessKey ID和AccessKey Secret,用于身份驗證。 //用戶渠道,默認值為:ALIYUN。 consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"); auto start = std::chrono::system_clock::now(); //請注冊自定義偵聽函數用來處理接收到的消息,并返回響應的處理結果。 ExampleMessageListener *messageListener = new ExampleMessageListener(); consumer->subscribe("YOURTOPIC", "*"); consumer->registerMessageListener(messageListener); //準備工作完成,必須調用啟動函數,才可以正常工作。 // ******************************************** // 1.確保訂閱關系的設置在啟動之前完成。 // 2.確保相同GID下面的消費者的訂閱關系一致。 // ********************************************* consumer->start(); // 保持主線程運行直到進程結束。 std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000)); consumer->shutdown(); std::cout << "=======After consuming messages======" << std::endl; return 0; }
文檔內容是否對您有幫助?