日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

示例代碼

云消息隊(duì)列 RocketMQ 版5.x版本實(shí)例支持C++ 1.x SDK客戶端接入,您可以使用1.x版本的SDK接入5.x實(shí)例進(jìn)行消息收發(fā)。本文為您介紹1.x版本下的C++ SDK消息收發(fā)示例代碼。

重要
  • 推薦您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作為主力研發(fā)版本,和云消息隊(duì)列 RocketMQ 版5.x服務(wù)端完全兼容,提供了更全面的功能并支持更多增強(qiáng)特性。更多信息,請參見5.x系列SDK
  • RocketMQ 4.x/3.x系列SDK和ONS系列SDK后續(xù)僅做功能維護(hù),建議僅存量業(yè)務(wù)使用。

普通消息收發(fā)示例

發(fā)送普通消息

#include "ONSFactory.h"
#include "ONSClientException.h"

using namespace ons;

int main()
{

    // 創(chuàng)建Producer,并配置發(fā)送消息所必需的信息。
    ONSFactoryProperty factoryInfo;  
    // 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Group ID。 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    // 設(shè)置為您從消息隊(duì)列RocketMQ版控制臺獲取的接入點(diǎn)信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標(biāo)識,也不要用IP解析地址。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
    // 您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    // 消息內(nèi)容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    /**
    * 如果是使用公網(wǎng)接入點(diǎn)訪問,則必須設(shè)置AccessKey和SecretKey,里面填寫實(shí)例的用戶名和密碼。實(shí)例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
    * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務(wù)必區(qū)分開。
    * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,則無需配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
    * 如果實(shí)例類型為Serverless實(shí)例,公網(wǎng)訪問必須設(shè)置實(shí)例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
    */  
    // 實(shí)例用戶名和密碼在消息隊(duì)列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 注意!!!訪問RocketMQ 5.x實(shí)例時,InstanceID屬性不需要設(shè)置,否則會導(dǎo)致失敗。

    // 創(chuàng)建Producer;
    Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);

    // 在發(fā)送消息前,必須調(diào)用start方法來啟動Producer,只需調(diào)用一次即可。
    pProducer->start();

    Message msg(
            // Message Topic。
            factoryInfo.getPublishTopics(),
            // Message Tag,可理解為Gmail中的標(biāo)簽,對消息進(jìn)行再歸類,方便Consumer指定過濾條件在消息隊(duì)列RocketMQ版的服務(wù)器過濾。       
            "TagA",
            // Message Body,不能為空,消息隊(duì)列RocketMQ版不做任何干預(yù),需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式。
            factoryInfo.getMessageContent()
    );

    // 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請盡可能全局唯一。
    // 以方便您在無法正常收到消息情況下,可通過消息隊(duì)列RocketMQ版控制臺查詢消息并補(bǔ)發(fā)。
    // 注意:不設(shè)置也不會影響消息正常收發(fā)。
    msg.setKey("ORDERID_100");

    // 發(fā)送消息,只要不拋出異常,就代表發(fā)送成功。     
    try
    {
        SendResultONS sendResult = pProducer->send(msg);
    }
    catch(ONSClientException & e)
    {
        // 自定義處理exception的細(xì)節(jié)。
    }
    // 在應(yīng)用退出前,必須銷毀Producer對象,否則會導(dǎo)致內(nèi)存泄露等問題。
    pProducer->shutdown();

    return 0;
}

訂閱普通消息

#include "ONSFactory.h"

#include <iostream>
#include <thread>
#include <mutex>

using namespace ons;

std::mutex console_mtx;

class ExampleMessageListener : public MessageListener {
public:
    Action consume(Message& message, ConsumeContext& context) {
        // 此處為具體的消息處理過程,確認(rèn)消息被處理成功請返回CommitMessage。
        // 如果有消費(fèi)異常,或者期望重新消費(fèi),可以返回ReconsumeLater,消息將會在一段時間后重新投遞。
        std::lock_guard<std::mutex> lk(console_mtx);
        std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
        << message.getMsgID() << std::endl;
        return CommitMessage;
    }
};

int main(int argc, char* argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    ONSFactoryProperty factoryInfo;
    // 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Group ID。從實(shí)例化的版本開始,ProducerId和CounsumerId已經(jīng)統(tǒng)一,此處設(shè)置是為了接口保持向前兼容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
    /**
    * 如果是使用公網(wǎng)接入點(diǎn)訪問,則必須設(shè)置AccessKey和SecretKey,里面填寫實(shí)例的用戶名和密碼。實(shí)例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
    * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務(wù)必區(qū)分開。
    * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,則無需配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
    * 如果實(shí)例類型為Serverless實(shí)例,公網(wǎng)訪問必須設(shè)置實(shí)例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
    */
    // 實(shí)例用戶名和密碼在消息隊(duì)列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 設(shè)置為您從消息隊(duì)列RocketMQ版控制臺獲取的接入點(diǎn)信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標(biāo)識,也不要用IP解析地址。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
    // 注意!!!訪問RocketMQ 5.x實(shí)例時,InstanceID屬性不需要設(shè)置,否則會導(dǎo)致失敗。

    PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺上創(chuàng)建的Topic。
    const char* topic_1 = "topic-1";
    // 訂閱topic-1中Tag消息屬性為tag-1的所有消息。
    const char* tag_1 = "tag-1";

    const char* topic_2 = "topic-2";
    // 訂閱topic-2的所有消息。
    const char* tag_2 = "*";


    // 請注冊自定義偵聽函數(shù)用來處理接收到的消息,并返回響應(yīng)的處理結(jié)果。
    ExampleMessageListener * message_listener = new ExampleMessageListener();
    consumer->subscribe(topic_1, tag_1, message_listener);
    consumer->subscribe(topic_2, tag_2, message_listener);

    // 準(zhǔn)備工作完成,必須調(diào)用啟動函數(shù),才可以正常工作。
    consumer->start();

    // 請保持線程常駐,不要執(zhí)行shutdown操作。
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    delete message_listener;
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

順序消息收發(fā)示例

發(fā)送順序消息

#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;

int main()
{
    // Producer創(chuàng)建和正常工作的參數(shù),必須輸入。
    ONSFactoryProperty factoryInfo;
    // 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Group ID。 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    // 設(shè)置為您從消息隊(duì)列RocketMQ版控制臺獲取的接入點(diǎn)信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標(biāo)識,也不要用IP解析地址。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
    // 您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    // 消息內(nèi)容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    /**
    * 如果是使用公網(wǎng)接入點(diǎn)訪問,則必須設(shè)置AccessKey和SecretKey,里面填寫實(shí)例的用戶名和密碼。實(shí)例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
    * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務(wù)必區(qū)分開。
    * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,則無需配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
    * 如果實(shí)例類型為Serverless實(shí)例,公網(wǎng)訪問必須設(shè)置實(shí)例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
    */  
    // 實(shí)例用戶名和密碼在消息隊(duì)列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 注意!!!訪問RocketMQ 5.x實(shí)例時,InstanceID屬性不需要設(shè)置,否則會導(dǎo)致失敗。
    
    // 創(chuàng)建Producer。
    OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);


    // 在發(fā)送消息前,必須調(diào)用start方法來啟動Producer,只需調(diào)用一次即可。
    pProducer->start();

    Message msg(
                // Message Topic。
                factoryInfo.getPublishTopics(),
                // Message Tag,可理解為Gmail中的標(biāo)簽,對消息進(jìn)行再歸類,方便Consumer指定過濾條件在消息隊(duì)列RocketMQ版的服務(wù)器過濾。
                "TagA",
                // Message Body,任何二進(jìn)制形式的數(shù)據(jù),消息隊(duì)列RocketMQ版不做任何干預(yù),需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式。
                factoryInfo.getMessageContent()
    );

    // 設(shè)置代表消息的業(yè)務(wù)屬性,請盡可能全局唯一。
    // 以方便您在無法正常收到消息情況下,可通過消息隊(duì)列RocketMQ版控制臺查詢消息并補(bǔ)發(fā)。
    // 注意:不設(shè)置也不會影響消息正常收發(fā)。
    msg.setKey("ORDERID_100");
    // 分區(qū)順序消息中區(qū)分不同分區(qū)的關(guān)鍵字段。
    // 如果是全局順序消息,該字段可以設(shè)置為任意非空字符串。
    std::string shardingKey = "abc";  
    // 帶有同一Sharding Key的消息會按照順序發(fā)送。
    try
    {
        // 發(fā)送消息,只要不拋異常就是成功。
        SendResultONS sendResult = pProducer->send(msg, shardingKey);
    std::cout << "send success" << std::endl;
    }
    catch(ONSClientException & e)
    {
        // 添加對exception的處理。
    }
    // 在應(yīng)用退出前,必須銷毀Producer對象,否則會導(dǎo)致內(nèi)存泄露等問題。
    pProducer->shutdown();

    return 0;
}           

訂閱順序消息

#include "ONSFactory.h"
using namespace std;
using namespace ons;

// 創(chuàng)建消費(fèi)消息的實(shí)例。
// pushConsumer拉取到消息后,會主動調(diào)用該實(shí)例的consumeMessage函數(shù)。
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
    MyMsgListener()
    {
    }
    virtual ~MyMsgListener()
    {
    }

    virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
    {
        // 根據(jù)業(yè)務(wù)需求,消費(fèi)消息。
        return Success; //CONSUME_SUCCESS;
    }
};


int main(int argc, char* argv[])
{
    // OrderConsumer創(chuàng)建和工作需要的參數(shù),必須輸入。
    ONSFactoryProperty factoryInfo;
    // 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Group ID。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");
    // 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺上創(chuàng)建的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    /**
    * 如果是使用公網(wǎng)接入點(diǎn)訪問,則必須設(shè)置AccessKey和SecretKey,里面填寫實(shí)例的用戶名和密碼。實(shí)例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
    * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務(wù)必區(qū)分開。
    * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,則無需配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
    * 如果實(shí)例類型為Serverless實(shí)例,公網(wǎng)訪問必須設(shè)置實(shí)例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
    */  
    // 實(shí)例用戶名和密碼在消息隊(duì)列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 設(shè)置為您從消息隊(duì)列RocketMQ版控制臺獲取的接入點(diǎn)信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標(biāo)識,也不要用IP解析地址。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
    // 注意!!!訪問RocketMQ 5.x實(shí)例時,InstanceID屬性不需要設(shè)置,否則會導(dǎo)致失敗。

    // 創(chuàng)建orderConsumer。
    OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
    MyMsgListener  msglistener;
    // 指定orderConsumer訂閱的消息Topic和消息Tag。
    orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );

    // 注冊消息監(jiān)聽的處理實(shí)例,orderConsumer拉取到消息后,會調(diào)用該類的consumeMessage函數(shù)。

    // 啟動orderConsumer。
    orderConsumer->start();

    for(volatile int i = 0; i < 10; ++i) {
        // wait
    }

    // 銷毀orderConsumer, 在應(yīng)用退出前,必須銷毀Consumer對象,否則會導(dǎo)致內(nèi)存泄露等問題。
    orderConsumer->shutdown();

   return 0;
}        

定時/延時消息收發(fā)示例

發(fā)送定時/延時消息

#include "ONSFactory.h"
#include "ONSClientException.h"

#include <windows.h>
using namespace ons;
int main()
{

    // 創(chuàng)建Producer,并配置發(fā)送消息所必需的信息。
    ONSFactoryProperty factoryInfo;
    // 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Group ID。 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    // 設(shè)置為您從消息隊(duì)列RocketMQ版控制臺獲取的接入點(diǎn)信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標(biāo)識,也不要用IP解析地址。 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
    // 您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    // 消息內(nèi)容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    /**
    * 如果是使用公網(wǎng)接入點(diǎn)訪問,則必須設(shè)置AccessKey和SecretKey,里面填寫實(shí)例的用戶名和密碼。實(shí)例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
    * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務(wù)必區(qū)分開。
    * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,則無需配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
    * 如果實(shí)例類型為Serverless實(shí)例,公網(wǎng)訪問必須設(shè)置實(shí)例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
    */  
    // 實(shí)例用戶名和密碼在消息隊(duì)列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 注意!!!訪問RocketMQ 5.x實(shí)例時,InstanceID屬性不需要設(shè)置,否則會導(dǎo)致失敗。

    // 創(chuàng)建Producer。
    Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);

    // 在發(fā)送消息前,必須調(diào)用start方法來啟動Producer,只需調(diào)用一次即可。
    pProducer->start();

    Message msg(
            // Message Topic。
            factoryInfo.getPublishTopics(),
            // Message Tag,可理解為Gmail中的標(biāo)簽,對消息進(jìn)行再歸類,方便Consumer指定過濾條件在消息隊(duì)列RocketMQ版的服務(wù)器過濾       
            "TagA",
            // Message Body,不能為空,消息隊(duì)列RocketMQ版不做任何干預(yù),需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式。
            factoryInfo.getMessageContent()
    );

    // 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請盡可能全局唯一。
    // 以方便您在無法正常收到消息情況下,可通過消息隊(duì)列RocketMQ版控制臺查詢消息并補(bǔ)發(fā)。
    // 注意:不設(shè)置也不會影響消息正常收發(fā)。
    msg.setKey("ORDERID_100");

    // deliver time單位ms,指定一個時刻,在這個時刻之后消息才能被消費(fèi),這個例子表示3s后才能被消費(fèi)。
    long deliverTime = GetTickCount64() + 3000;
    msg.setStartDeliverTime(deliverTime);

    // 發(fā)送消息,只要不拋出異常,就代表發(fā)送成功。     
    try
    {
        SendResultONS sendResult = pProducer->send(msg);
    }
    catch(ONSClientException & e)
    {
        // 自定義處理exception的細(xì)節(jié)。
    }

    // 在應(yīng)用退出前,必須銷毀Producer對象,否則會導(dǎo)致內(nèi)存泄露等問題。
    pProducer->shutdown();

    return 0;
}
            

訂閱定時/延時消息

訂閱定時/延時消息的示例代碼和訂閱普通消息一樣,請參見訂閱普通消息

事務(wù)消息收發(fā)示例

發(fā)送事務(wù)消息

  1. 發(fā)送半事務(wù)消息(Half Message)及執(zhí)行本地事務(wù),示例代碼如下。

    #include "ONSFactory.h"
    #include "ONSClientException.h"
    using namespace ons;
    
        class MyLocalTransactionExecuter : LocalTransactionExecuter
        {
            MyLocalTransactionExecuter()
            {
            }
    
            ~MyLocalTransactionExecuter()
            {
            }
            virtual TransactionStatus execute(Message &value)
            {
                    // 消息ID(有可能消息體一樣,但消息ID不一樣,當(dāng)前消息ID在消息隊(duì)列RocketMQ版控制臺無法查詢。)
                    string msgId = value.getMsgID();
                    // 消息體內(nèi)容進(jìn)行crc32,也可以使用其它的如MD5。
                    // 消息ID和crc32id主要是用來防止消息重復(fù)。
                    // 如果業(yè)務(wù)本身是冪等的,可以忽略,否則需要利用msgId或crc32Id來做冪等。
                    // 如果要求消息絕對不重復(fù),推薦做法是對消息體body使用crc32或MD5來防止重復(fù)消息。
                    TransactionStatus transactionStatus = Unknow;
                    try {
                        boolean isCommit = 本地事務(wù)執(zhí)行結(jié)果;
                        if (isCommit) {
                            // 本地事務(wù)成功、提交消息。
                            transactionStatus = CommitTransaction;
                        } else {
                            // 本地事務(wù)失敗、回滾消息。
                            transactionStatus = RollbackTransaction;
                        }
                    } catch (...) {
                        //exception handle
                    }
                    return transactionStatus;
            }
        }
    
        int main(int argc, char* argv[])
        {
            //創(chuàng)建Producer和發(fā)送消息所必需的信息。
            ONSFactoryProperty factoryInfo;
            // 設(shè)置為您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Group ID。 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
            // 設(shè)置為您從消息隊(duì)列RocketMQ版控制臺獲取的接入點(diǎn)信息,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標(biāo)識,也不要用IP解析地址。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
            // 您在消息隊(duì)列RocketMQ版控制臺創(chuàng)建的Topic。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
            // 消息內(nèi)容。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
            /**
            * 如果是使用公網(wǎng)接入點(diǎn)訪問,則必須設(shè)置AccessKey和SecretKey,里面填寫實(shí)例的用戶名和密碼。實(shí)例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
            * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務(wù)必區(qū)分開。
            * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,則無需配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
            * 如果實(shí)例類型為Serverless實(shí)例,公網(wǎng)訪問必須設(shè)置實(shí)例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
            */  
            // 實(shí)例用戶名和密碼在消息隊(duì)列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
            factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
            // 注意!!!訪問RocketMQ 5.x實(shí)例時,InstanceID屬性不需要設(shè)置,否則會導(dǎo)致失敗。
    
            // 創(chuàng)建producer,消息隊(duì)列RocketMQ版不負(fù)責(zé)pChecker的釋放,需要業(yè)務(wù)方自行釋放資源。
            MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
            g_producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo,pChecker);
    
            // 在發(fā)送消息前,必須調(diào)用start方法來啟動Producer,只需調(diào)用一次即可。
            pProducer->start();
    
            Message msg(
                // Message Topic。
                factoryInfo.getPublishTopics(),
                // Message Tag,可理解為Gmail中的標(biāo)簽,對消息進(jìn)行再歸類,方便Consumer指定過濾條件在消息隊(duì)列RocketMQ版的服務(wù)器過濾。       
                "TagA",
                // Message Body,不能為空,消息隊(duì)列RocketMQ版不做任何干預(yù),需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式。
                factoryInfo.getMessageContent()
            );
    
            // 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請盡可能全局唯一。
            // 以方便您在無法正常收到消息情況下,可通過消息隊(duì)列RocketMQ版控制臺查詢消息并補(bǔ)發(fā)。
            // 注意:不設(shè)置也不會影響消息正常收發(fā)。
            msg.setKey("ORDERID_100");
    
            // 發(fā)送消息,只要不拋出異常,就代表發(fā)送成功。   
            try
            {
                //消息隊(duì)列RocketMQ版不負(fù)責(zé)pExecuter的釋放,需要業(yè)務(wù)方自行釋放資源。
                MyLocalTransactionExecuter pExecuter = new MyLocalTransactionExecuter();
                SendResultONS sendResult = pProducer->send(msg,pExecuter);
            }
            catch(ONSClientException & e)
            {
                // 自定義處理exception的細(xì)節(jié)。
            }
            // 在應(yīng)用退出前,必須銷毀Producer對象,否則會導(dǎo)致內(nèi)存泄露等問題。
            pProducer->shutdown();
    
            return 0;
    
        }        
  2. 提交事務(wù)消息狀態(tài),示例代碼如下。

     class MyLocalTransactionChecker : LocalTransactionChecker
     {
         MyLocalTransactionChecker()
         {
         }
    
         ~MyLocalTransactionChecker()
         {
         }
    
         virtual TransactionStatus check(Message &value)
         {
             // 消息ID(有可能消息體一樣,但消息ID不一樣,當(dāng)前消息ID在消息隊(duì)列RocketMQ版控制臺無法查詢。)
             string msgId = value.getMsgID();
             // 消息體內(nèi)容進(jìn)行crc32,也可以使用其它的如MD5。
             // 消息ID和crc32id主要是用來防止消息重復(fù)。
             // 如果業(yè)務(wù)本身是冪等的,可以忽略,否則需要利用msgId或crc32Id來做冪等。
             // 如果要求消息絕對不重復(fù),推薦做法是對消息體body使用crc32或MD5來防止重復(fù)消息。
             TransactionStatus transactionStatus = Unknow;
             try {
                 boolean isCommit = 本地事務(wù)執(zhí)行結(jié)果;
                 if (isCommit) {
                     // 本地事務(wù)成功、提交消息。
                     transactionStatus = CommitTransaction;
                 } else {
                     // 本地事務(wù)失敗、回滾消息。
                     transactionStatus = RollbackTransaction;
                 }
             } catch(...) {
                 //exception error
             }
             return transactionStatus;
         }
     }               

訂閱事務(wù)消息

訂閱事務(wù)消息的示例代碼和訂閱普通消息一樣,請參見訂閱普通消息