示例代碼
更新時間:
云消息隊(duì)列 RocketMQ 版5.x版本實(shí)例支持C++ 1.x SDK客戶端接入,您可以使用1.x版本的SDK接入5.x實(shí)例進(jìn)行消息收發(fā)。本文為您介紹1.x版本下的C++ SDK消息收發(fā)示例代碼。
重要
- 推薦您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作為主力研發(fā)版本,和云消息隊(duì)列 RocketMQ 版5.x服務(wù)端完全兼容,提供了更全面的功能并支持更多增強(qiáng)特性。更多信息,請參見5.x系列SDK。
- RocketMQ 4.x/3.x系列SDK和ONS系列SDK后續(xù)僅做功能維護(hù),建議僅存量業(yè)務(wù)使用。
普通消息收發(fā)示例
發(fā)送普通消息
#include "ONSFactory.h"
#include "ONSClientException.h"
using namespace ons;
int main()
{
// 創(chuàng)建Producer,并配置發(fā)送消息所必需的信息。
ONSFactoryProperty factoryInfo;
// 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
// 設(shè)置為您從消息隊(duì)列RocketMQ版控制臺獲取的接入點(diǎn)信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標(biāo)識,也不要用IP解析地址。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// 您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
// 消息內(nèi)容。
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
/**
* 如果是使用公網(wǎng)接入點(diǎn)訪問,則必須設(shè)置AccessKey和SecretKey,里面填寫實(shí)例的用戶名和密碼。實(shí)例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務(wù)必區(qū)分開。
* 如果是在阿里云ECS內(nèi)網(wǎng)訪問,則無需配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
* 如果實(shí)例類型為Serverless實(shí)例,公網(wǎng)訪問必須設(shè)置實(shí)例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
*/
// 實(shí)例用戶名和密碼在消息隊(duì)列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 注意!!!訪問RocketMQ 5.x實(shí)例時,InstanceID屬性不需要設(shè)置,否則會導(dǎo)致失敗。
// 創(chuàng)建Producer;
Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);
// 在發(fā)送消息前,必須調(diào)用start方法來啟動Producer,只需調(diào)用一次即可。
pProducer->start();
Message msg(
// Message Topic。
factoryInfo.getPublishTopics(),
// Message Tag,可理解為Gmail中的標(biāo)簽,對消息進(jìn)行再歸類,方便Consumer指定過濾條件在消息隊(duì)列RocketMQ版的服務(wù)器過濾。
"TagA",
// Message Body,不能為空,消息隊(duì)列RocketMQ版不做任何干預(yù),需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式。
factoryInfo.getMessageContent()
);
// 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請盡可能全局唯一。
// 以方便您在無法正常收到消息情況下,可通過消息隊(duì)列RocketMQ版控制臺查詢消息并補(bǔ)發(fā)。
// 注意:不設(shè)置也不會影響消息正常收發(fā)。
msg.setKey("ORDERID_100");
// 發(fā)送消息,只要不拋出異常,就代表發(fā)送成功。
try
{
SendResultONS sendResult = pProducer->send(msg);
}
catch(ONSClientException & e)
{
// 自定義處理exception的細(xì)節(jié)。
}
// 在應(yīng)用退出前,必須銷毀Producer對象,否則會導(dǎo)致內(nèi)存泄露等問題。
pProducer->shutdown();
return 0;
}
訂閱普通消息
#include "ONSFactory.h"
#include <iostream>
#include <thread>
#include <mutex>
using namespace ons;
std::mutex console_mtx;
class ExampleMessageListener : public MessageListener {
public:
Action consume(Message& message, ConsumeContext& context) {
// 此處為具體的消息處理過程,確認(rèn)消息被處理成功請返回CommitMessage。
// 如果有消費(fèi)異常,或者期望重新消費(fèi),可以返回ReconsumeLater,消息將會在一段時間后重新投遞。
std::lock_guard<std::mutex> lk(console_mtx);
std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
<< message.getMsgID() << std::endl;
return CommitMessage;
}
};
int main(int argc, char* argv[]) {
std::cout << "=======Before consuming messages=======" << std::endl;
ONSFactoryProperty factoryInfo;
// 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Group ID。從實(shí)例化的版本開始,ProducerId和CounsumerId已經(jīng)統(tǒng)一,此處設(shè)置是為了接口保持向前兼容。
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
/**
* 如果是使用公網(wǎng)接入點(diǎn)訪問,則必須設(shè)置AccessKey和SecretKey,里面填寫實(shí)例的用戶名和密碼。實(shí)例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務(wù)必區(qū)分開。
* 如果是在阿里云ECS內(nèi)網(wǎng)訪問,則無需配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
* 如果實(shí)例類型為Serverless實(shí)例,公網(wǎng)訪問必須設(shè)置實(shí)例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
*/
// 實(shí)例用戶名和密碼在消息隊(duì)列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 設(shè)置為您從消息隊(duì)列RocketMQ版控制臺獲取的接入點(diǎn)信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標(biāo)識,也不要用IP解析地址。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// 注意!!!訪問RocketMQ 5.x實(shí)例時,InstanceID屬性不需要設(shè)置,否則會導(dǎo)致失敗。
PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
// 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺上創(chuàng)建的Topic。
const char* topic_1 = "topic-1";
// 訂閱topic-1中Tag消息屬性為tag-1的所有消息。
const char* tag_1 = "tag-1";
const char* topic_2 = "topic-2";
// 訂閱topic-2的所有消息。
const char* tag_2 = "*";
// 請注冊自定義偵聽函數(shù)用來處理接收到的消息,并返回響應(yīng)的處理結(jié)果。
ExampleMessageListener * message_listener = new ExampleMessageListener();
consumer->subscribe(topic_1, tag_1, message_listener);
consumer->subscribe(topic_2, tag_2, message_listener);
// 準(zhǔn)備工作完成,必須調(diào)用啟動函數(shù),才可以正常工作。
consumer->start();
// 請保持線程常駐,不要執(zhí)行shutdown操作。
std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
consumer->shutdown();
delete message_listener;
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}
順序消息收發(fā)示例
發(fā)送順序消息
#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;
int main()
{
// Producer創(chuàng)建和正常工作的參數(shù),必須輸入。
ONSFactoryProperty factoryInfo;
// 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
// 設(shè)置為您從消息隊(duì)列RocketMQ版控制臺獲取的接入點(diǎn)信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標(biāo)識,也不要用IP解析地址。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// 您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
// 消息內(nèi)容。
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
/**
* 如果是使用公網(wǎng)接入點(diǎn)訪問,則必須設(shè)置AccessKey和SecretKey,里面填寫實(shí)例的用戶名和密碼。實(shí)例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務(wù)必區(qū)分開。
* 如果是在阿里云ECS內(nèi)網(wǎng)訪問,則無需配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
* 如果實(shí)例類型為Serverless實(shí)例,公網(wǎng)訪問必須設(shè)置實(shí)例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
*/
// 實(shí)例用戶名和密碼在消息隊(duì)列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 注意!!!訪問RocketMQ 5.x實(shí)例時,InstanceID屬性不需要設(shè)置,否則會導(dǎo)致失敗。
// 創(chuàng)建Producer。
OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);
// 在發(fā)送消息前,必須調(diào)用start方法來啟動Producer,只需調(diào)用一次即可。
pProducer->start();
Message msg(
// Message Topic。
factoryInfo.getPublishTopics(),
// Message Tag,可理解為Gmail中的標(biāo)簽,對消息進(jìn)行再歸類,方便Consumer指定過濾條件在消息隊(duì)列RocketMQ版的服務(wù)器過濾。
"TagA",
// Message Body,任何二進(jìn)制形式的數(shù)據(jù),消息隊(duì)列RocketMQ版不做任何干預(yù),需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式。
factoryInfo.getMessageContent()
);
// 設(shè)置代表消息的業(yè)務(wù)屬性,請盡可能全局唯一。
// 以方便您在無法正常收到消息情況下,可通過消息隊(duì)列RocketMQ版控制臺查詢消息并補(bǔ)發(fā)。
// 注意:不設(shè)置也不會影響消息正常收發(fā)。
msg.setKey("ORDERID_100");
// 分區(qū)順序消息中區(qū)分不同分區(qū)的關(guān)鍵字段。
// 如果是全局順序消息,該字段可以設(shè)置為任意非空字符串。
std::string shardingKey = "abc";
// 帶有同一Sharding Key的消息會按照順序發(fā)送。
try
{
// 發(fā)送消息,只要不拋異常就是成功。
SendResultONS sendResult = pProducer->send(msg, shardingKey);
std::cout << "send success" << std::endl;
}
catch(ONSClientException & e)
{
// 添加對exception的處理。
}
// 在應(yīng)用退出前,必須銷毀Producer對象,否則會導(dǎo)致內(nèi)存泄露等問題。
pProducer->shutdown();
return 0;
}
訂閱順序消息
#include "ONSFactory.h"
using namespace std;
using namespace ons;
// 創(chuàng)建消費(fèi)消息的實(shí)例。
// pushConsumer拉取到消息后,會主動調(diào)用該實(shí)例的consumeMessage函數(shù)。
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
MyMsgListener()
{
}
virtual ~MyMsgListener()
{
}
virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
{
// 根據(jù)業(yè)務(wù)需求,消費(fèi)消息。
return Success; //CONSUME_SUCCESS;
}
};
int main(int argc, char* argv[])
{
// OrderConsumer創(chuàng)建和工作需要的參數(shù),必須輸入。
ONSFactoryProperty factoryInfo;
// 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");
// 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺上創(chuàng)建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
/**
* 如果是使用公網(wǎng)接入點(diǎn)訪問,則必須設(shè)置AccessKey和SecretKey,里面填寫實(shí)例的用戶名和密碼。實(shí)例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務(wù)必區(qū)分開。
* 如果是在阿里云ECS內(nèi)網(wǎng)訪問,則無需配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
* 如果實(shí)例類型為Serverless實(shí)例,公網(wǎng)訪問必須設(shè)置實(shí)例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
*/
// 實(shí)例用戶名和密碼在消息隊(duì)列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 設(shè)置為您從消息隊(duì)列RocketMQ版控制臺獲取的接入點(diǎn)信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標(biāo)識,也不要用IP解析地址。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// 注意!!!訪問RocketMQ 5.x實(shí)例時,InstanceID屬性不需要設(shè)置,否則會導(dǎo)致失敗。
// 創(chuàng)建orderConsumer。
OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
MyMsgListener msglistener;
// 指定orderConsumer訂閱的消息Topic和消息Tag。
orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
// 注冊消息監(jiān)聽的處理實(shí)例,orderConsumer拉取到消息后,會調(diào)用該類的consumeMessage函數(shù)。
// 啟動orderConsumer。
orderConsumer->start();
for(volatile int i = 0; i < 10; ++i) {
// wait
}
// 銷毀orderConsumer, 在應(yīng)用退出前,必須銷毀Consumer對象,否則會導(dǎo)致內(nèi)存泄露等問題。
orderConsumer->shutdown();
return 0;
}
定時/延時消息收發(fā)示例
發(fā)送定時/延時消息
#include "ONSFactory.h"
#include "ONSClientException.h"
#include <windows.h>
using namespace ons;
int main()
{
// 創(chuàng)建Producer,并配置發(fā)送消息所必需的信息。
ONSFactoryProperty factoryInfo;
// 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
// 設(shè)置為您從消息隊(duì)列RocketMQ版控制臺獲取的接入點(diǎn)信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標(biāo)識,也不要用IP解析地址。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// 您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
// 消息內(nèi)容。
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
/**
* 如果是使用公網(wǎng)接入點(diǎn)訪問,則必須設(shè)置AccessKey和SecretKey,里面填寫實(shí)例的用戶名和密碼。實(shí)例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務(wù)必區(qū)分開。
* 如果是在阿里云ECS內(nèi)網(wǎng)訪問,則無需配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
* 如果實(shí)例類型為Serverless實(shí)例,公網(wǎng)訪問必須設(shè)置實(shí)例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
*/
// 實(shí)例用戶名和密碼在消息隊(duì)列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 注意!!!訪問RocketMQ 5.x實(shí)例時,InstanceID屬性不需要設(shè)置,否則會導(dǎo)致失敗。
// 創(chuàng)建Producer。
Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);
// 在發(fā)送消息前,必須調(diào)用start方法來啟動Producer,只需調(diào)用一次即可。
pProducer->start();
Message msg(
// Message Topic。
factoryInfo.getPublishTopics(),
// Message Tag,可理解為Gmail中的標(biāo)簽,對消息進(jìn)行再歸類,方便Consumer指定過濾條件在消息隊(duì)列RocketMQ版的服務(wù)器過濾
"TagA",
// Message Body,不能為空,消息隊(duì)列RocketMQ版不做任何干預(yù),需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式。
factoryInfo.getMessageContent()
);
// 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請盡可能全局唯一。
// 以方便您在無法正常收到消息情況下,可通過消息隊(duì)列RocketMQ版控制臺查詢消息并補(bǔ)發(fā)。
// 注意:不設(shè)置也不會影響消息正常收發(fā)。
msg.setKey("ORDERID_100");
// deliver time單位ms,指定一個時刻,在這個時刻之后消息才能被消費(fèi),這個例子表示3s后才能被消費(fèi)。
long deliverTime = GetTickCount64() + 3000;
msg.setStartDeliverTime(deliverTime);
// 發(fā)送消息,只要不拋出異常,就代表發(fā)送成功。
try
{
SendResultONS sendResult = pProducer->send(msg);
}
catch(ONSClientException & e)
{
// 自定義處理exception的細(xì)節(jié)。
}
// 在應(yīng)用退出前,必須銷毀Producer對象,否則會導(dǎo)致內(nèi)存泄露等問題。
pProducer->shutdown();
return 0;
}
訂閱定時/延時消息
訂閱定時/延時消息的示例代碼和訂閱普通消息一樣,請參見訂閱普通消息。
事務(wù)消息收發(fā)示例
發(fā)送事務(wù)消息
發(fā)送半事務(wù)消息(Half Message)及執(zhí)行本地事務(wù),示例代碼如下。
#include "ONSFactory.h" #include "ONSClientException.h" using namespace ons; class MyLocalTransactionExecuter : LocalTransactionExecuter { MyLocalTransactionExecuter() { } ~MyLocalTransactionExecuter() { } virtual TransactionStatus execute(Message &value) { // 消息ID(有可能消息體一樣,但消息ID不一樣,當(dāng)前消息ID在消息隊(duì)列RocketMQ版控制臺無法查詢。) string msgId = value.getMsgID(); // 消息體內(nèi)容進(jìn)行crc32,也可以使用其它的如MD5。 // 消息ID和crc32id主要是用來防止消息重復(fù)。 // 如果業(yè)務(wù)本身是冪等的,可以忽略,否則需要利用msgId或crc32Id來做冪等。 // 如果要求消息絕對不重復(fù),推薦做法是對消息體body使用crc32或MD5來防止重復(fù)消息。 TransactionStatus transactionStatus = Unknow; try { boolean isCommit = 本地事務(wù)執(zhí)行結(jié)果; if (isCommit) { // 本地事務(wù)成功、提交消息。 transactionStatus = CommitTransaction; } else { // 本地事務(wù)失敗、回滾消息。 transactionStatus = RollbackTransaction; } } catch (...) { //exception handle } return transactionStatus; } } int main(int argc, char* argv[]) { //創(chuàng)建Producer和發(fā)送消息所必需的信息。 ONSFactoryProperty factoryInfo; // 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Group ID。 factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX"); // 設(shè)置為您從消息隊(duì)列RocketMQ版控制臺獲取的接入點(diǎn)信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。 // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標(biāo)識,也不要用IP解析地址。 factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); // 您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Topic。 factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" ); // 消息內(nèi)容。 factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX"); /** * 如果是使用公網(wǎng)接入點(diǎn)訪問,則必須設(shè)置AccessKey和SecretKey,里面填寫實(shí)例的用戶名和密碼。實(shí)例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。 * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務(wù)必區(qū)分開。 * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,則無需配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。 * 如果實(shí)例類型為Serverless實(shí)例,公網(wǎng)訪問必須設(shè)置實(shí)例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。 */ // 實(shí)例用戶名和密碼在消息隊(duì)列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。 factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME"); factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" ); // 注意!!!訪問RocketMQ 5.x實(shí)例時,InstanceID屬性不需要設(shè)置,否則會導(dǎo)致失敗。 // 創(chuàng)建producer,消息隊(duì)列RocketMQ版不負(fù)責(zé)pChecker的釋放,需要業(yè)務(wù)方自行釋放資源。 MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker(); g_producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo,pChecker); // 在發(fā)送消息前,必須調(diào)用start方法來啟動Producer,只需調(diào)用一次即可。 pProducer->start(); Message msg( // Message Topic。 factoryInfo.getPublishTopics(), // Message Tag,可理解為Gmail中的標(biāo)簽,對消息進(jìn)行再歸類,方便Consumer指定過濾條件在消息隊(duì)列RocketMQ版的服務(wù)器過濾。 "TagA", // Message Body,不能為空,消息隊(duì)列RocketMQ版不做任何干預(yù),需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式。 factoryInfo.getMessageContent() ); // 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請盡可能全局唯一。 // 以方便您在無法正常收到消息情況下,可通過消息隊(duì)列RocketMQ版控制臺查詢消息并補(bǔ)發(fā)。 // 注意:不設(shè)置也不會影響消息正常收發(fā)。 msg.setKey("ORDERID_100"); // 發(fā)送消息,只要不拋出異常,就代表發(fā)送成功。 try { //消息隊(duì)列RocketMQ版不負(fù)責(zé)pExecuter的釋放,需要業(yè)務(wù)方自行釋放資源。 MyLocalTransactionExecuter pExecuter = new MyLocalTransactionExecuter(); SendResultONS sendResult = pProducer->send(msg,pExecuter); } catch(ONSClientException & e) { // 自定義處理exception的細(xì)節(jié)。 } // 在應(yīng)用退出前,必須銷毀Producer對象,否則會導(dǎo)致內(nèi)存泄露等問題。 pProducer->shutdown(); return 0; }
提交事務(wù)消息狀態(tài),示例代碼如下。
class MyLocalTransactionChecker : LocalTransactionChecker { MyLocalTransactionChecker() { } ~MyLocalTransactionChecker() { } virtual TransactionStatus check(Message &value) { // 消息ID(有可能消息體一樣,但消息ID不一樣,當(dāng)前消息ID在消息隊(duì)列RocketMQ版控制臺無法查詢。) string msgId = value.getMsgID(); // 消息體內(nèi)容進(jìn)行crc32,也可以使用其它的如MD5。 // 消息ID和crc32id主要是用來防止消息重復(fù)。 // 如果業(yè)務(wù)本身是冪等的,可以忽略,否則需要利用msgId或crc32Id來做冪等。 // 如果要求消息絕對不重復(fù),推薦做法是對消息體body使用crc32或MD5來防止重復(fù)消息。 TransactionStatus transactionStatus = Unknow; try { boolean isCommit = 本地事務(wù)執(zhí)行結(jié)果; if (isCommit) { // 本地事務(wù)成功、提交消息。 transactionStatus = CommitTransaction; } else { // 本地事務(wù)失敗、回滾消息。 transactionStatus = RollbackTransaction; } } catch(...) { //exception error } return transactionStatus; } }
訂閱事務(wù)消息
訂閱事務(wù)消息的示例代碼和訂閱普通消息一樣,請參見訂閱普通消息。
文檔內(nèi)容是否對您有幫助?