本文介紹TCP協議下C++ SDK v3.x.x的版本信息,包括使用限制、版本的基本信息、環境要求、編譯說明以及和歷史版本相比各功能特性的變更內容。
使用限制
C++ SDK v3.x.x版本僅支持有命名空間的實例,若您使用的實例無命名空間,請勿將客戶端版本升級到C++ SDK v3.x.x。
5.x版本實例默認都有命名空間,4.x版本實例可在云消息隊列 RocketMQ 版控制臺實例詳情頁面的基礎信息區域查看是否有命名空間。
版本信息
發布時間 | 版本號 | 下載鏈接 |
2021-10-18 | v3.x.x |
環境要求
ONS-Client-CPP是基于Apache RocketMQ 5.0協議原生實現的開源客戶端開發軟件包。Apache RocketMQ 5.0新的通信協議基于gRPC(HTTP 2.0/Protobuf)實現,因此v3.0.0版本的C++ SDK也需要依賴于grpc/grpc實現,滿足以下依賴和工具鏈的要求。
依賴
依賴 | 版本 |
grpc/grpc | 1.39.0 |
fmt | 8.0.1 |
spdlog | 1.9.2 |
filesystem | 1.5.0 |
asio | 1.18.2 |
cpp_httplib | 0.9.4 |
protobuf | 3.17.2 |
工具鏈
操作系統 | 工具鏈版本 |
Linux、macOS | GCC 4.9或以上版本、Clang 3.4或以上版本 |
Windows 7或以上版本 | Visual Studio 2015或以上版本 |
C++ Standard
SDK使用了C++ 11語法,因此需要啟用C++ 11或以上標準。
編譯說明
開源代碼編譯說明
參考Bazel安裝指南安裝Bazel工具。
說明使用Bazel 4.x版本,需要安裝Python 3.x.x。
下載開源代碼并解壓。可通過以下兩種方式下載:
通過GitHub克隆源代碼:執行
git clone https://github.com/aliyun-mq/ons-client-cpp.git
命令。本地下載:下載鏈接請參見版本信息。
在項目文件夾內執行以下命令,Bazel將自動下載所有第三方依賴。
bazel -c opt //dist/...
輸出示例如下:
INFO: From Action dist/libons_library.pic.a: starting to run shell INFO: Elapsed time: 39.480s, Critical Path: 38.89s INFO: 2044 processes: 1796 remote cache hit, 241 internal, 7 processwrapper-sandbox. INFO: Build completed successfully, 2044 total actions
編譯完成后,合并好的靜態庫在bazel-bin/dist/ons-dist.tar.gz文件內。
root@a36849cf2f24:~/ons-client-cpp# ls -lah bazel-bin/dist/ons-dist.tar.gz -r-xr-xr-x 1 root root 15M Oct 14 08:03 bazel-bin/dist/ons-dist.tar.gz
CentOS 7編譯說明
CentOS 7.x默認安裝GCC 4.8.5,不滿足工具鏈的要求。因此您需要安裝devtoolset-4,devtoolset-4提供的工具鏈版本為GCC 5.3.1。
wget https://copr.fedorainfracloud.org/coprs/vbatts/bazel/repo/epel-7/vbatts-bazel-epel-7.repo
cp vbatts-bazel-epel-7.repo /etc/yum.repos.d/
yum install devtoolset-4-gcc devtoolset-4-gcc-c++ bazel4 python3 git -y
scl enable devtoolset-4 bash
unlink /usr/bin/python && ln -s /usr/bin/python3 /usr/bin/python
git clone git@github.com:aliyun-mq/ons-client-cpp.git
cd ons-client-cpp && bazel build //dist/...
功能變更
順序消息
順序消息的最大重試次數MaxReconsumeTimes參數的默認值從Integer.MAX變更為16次。超過最大重試次數消息還未被消費成功將直接被投遞至死信隊列。您可以通過自定義MaxReconsumeTimes參數值修改順序消息的最大重試次數。
廣播消費
廣播消費模式下,支持使用offsetStore
接口的方式定制消費者啟動時的消費位點。若未設置,默認和歷史版本一致直接從最新消費位點開始消費。
示例代碼如下:
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include "ons/MessageModel.h"
#include "ons/ONSFactory.h"
#include "rocketmq/Logger.h"
using namespace std;
using namespace ons;
std::mutex console_mtx;
class ExampleMessageListener : public MessageListener {
public:
Action consume(const Message& message, ConsumeContext& context) noexcept override {
std::lock_guard<std::mutex> lk(console_mtx);
auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp();
auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp();
std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " << message.getMsgID()
<< ", Body-size: " << message.getBody().size()
<< ", Current - Store-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count()
<< "ms, Current - Born-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count()
<< "ms" << std::endl;
return Action::CommitMessage;
}
};
int main(int argc, char* argv[]) {
auto& logger = rocketmq::getLogger();
logger.setLevel(rocketmq::Level::Debug);
logger.init();
std::cout << "=======Before consuming messages=======" << std::endl;
ONSFactoryProperty factory_property;
//從OffsetStore讀取消費位點的功能僅支持廣播消費模式。
factory_property.setMessageModel(ONS_NAMESPACE::MessageModel::BROADCASTING);
factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard");
PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property);
const char* topic = "cpp_sdk_standard";
const char* tag = "*";
// register your own listener here to handle the messages received.
auto* messageListener = new ExampleMessageListener();
consumer->subscribe(topic, tag);
consumer->registerMessageListener(messageListener);
// Start this consumer
consumer->start();
// Keep main thread running until process finished.
std::this_thread::sleep_for(std::chrono::minutes(15));
consumer->shutdown();
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}
Push消費
如果設置的消費線程數不在合法區間[1,1000]內,系統會在創建消費者時拋出異常,而不是在啟動消費者時拋出異常。
新增消費速度限流功能。為了避免消息洪峰可能對消費端應用產生沖擊,您可通過該功能限制消息的消費速度,保護消費端應用。
說明順序消息的消息重試不受限流控制。
消費限流示例代碼如下:
#include <chrono> #include <iostream> #include <mutex> #include <thread> #include "ons/MessageModel.h" #include "ons/ONSFactory.h" #include "rocketmq/Logger.h" using namespace std; using namespace ons; std::mutex console_mtx; class ExampleMessageListener : public MessageListener { public: Action consume(const Message& message, ConsumeContext& context) noexcept override { std::lock_guard<std::mutex> lk(console_mtx); auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp(); auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp(); std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " << message.getMsgID() << ", Body-size: " << message.getBody().size() << ", Tag: " << message.getTag() << ", Current - Store-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count() << "ms, Current - Born-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count() << "ms" << std::endl; return Action::CommitMessage; } }; int main(int argc, char* argv[]) { auto& logger = rocketmq::getLogger(); logger.setLevel(rocketmq::Level::Debug); logger.init(); const char* topic = "cpp_sdk_standard"; const char* tag = "*"; std::cout << "=======Before consuming messages=======" << std::endl; ONSFactoryProperty factory_property; factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard"); // Client-side throttling. factory_property.throttle(topic, 16); PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property); // register your own listener here to handle the messages received. auto* messageListener = new ExampleMessageListener(); consumer->subscribe(topic, tag); consumer->registerMessageListener(messageListener); // Start this consumer. consumer->start(); // Keep main thread running until process finished. std::this_thread::sleep_for(std::chrono::minutes(15)); consumer->shutdown(); std::cout << "=======After consuming messages======" << std::endl; return 0; }
消息軌跡
參數 | 說明 |
AccessKey | 您的阿里云賬號或RAM用戶的AccessKey ID,用于標識用戶。當您通過SDK或API調用云消息隊列 RocketMQ 版資源時,需要使用AccessKey ID進行身份驗證。 |
到達Server | 消息到達消息隊列RocketMQ版服務端的時間。 |
預設DeliverAt | 定時消息的預計投遞時間。 |
實際AvailableAt | 定時消息定時結束的時間。即消息可被消費者消費的開始時間。 |
Available Time | 消息可被消費者消費的開始時間。 |
提交/回滾時間 | 事務消息提交或回滾的時間。 |
到達消費端 | 消息到達消費者客戶端的時間。 |
等待處理耗時 | 消息到達消費者客戶端,等待線程池分配線程和分配處理資源的耗時。 |
API接口變更
日志默認路徑由~/logs/rocketmqlogs/ons.log變更為~/logs/rocketmqlogs/ons.log。
枚舉Action從全局命名空間移動到ons命名空間下。
頭文件都存放到/ons路徑下。
Message#getStartDeliverTime參數返回值,從int64_t修改為std::chrono::system_clock::timepoint或std::chrono::milliseconds。
刪除了函數的throws聲明,該聲明在C++ 11標準下已不支持。
Producer
類提供noexcept
接口,用于禁用異常場景使用。枚舉類型都轉變為
namespace enum
,即enum class Type
。
SDK常見問題
在同一個進程內,新舊版本的SDK是否可以共存?會不會有符號沖突?
新版本SDK的預編譯靜態庫,符號在默認的命名空間ons下,會與歷史版本的符號沖突。若要實現同進程兩個版本兼容,您可以自行從源碼編譯,只要保證在編譯過程中,定義
ONS_NAMESPACE
這個宏為非ons的值即可。Bazel提供多種方式定義宏,可以通過.bazelrc、cc_library規則中的defines屬性定義或
cc_library#copts
屬性定義。調試代碼時,怎么編譯一個帶符號表的靜態庫?
我已經使用了Protobuf依賴,跟你們要求的依賴版本不一致,怎么解決依賴沖突?
ONS-Client-CPP對第三方采用源碼依賴的形式,您只需要調整ONS-Client-CPP項目的依賴版本與您自身的依賴一致即可。
ONS-Client-CPP依賴了RocketMQ-Client-CPP,您需要fork apache/rocketmq-client-cpp倉庫,將依賴描述文件ons-client-cpp/bazel/deps.bzl中的依賴指向地址修改為fork的地址。
本地使用了HTTP代理,并聲明了
http_proxy
、grpc_proxy
等環境變量,發現發送消息有很多超時,是什么原因?SDK基于gRPC實現,支持http_proxy、https_proxy和grpc_proxy等方式配置代理。如果不需要proxy,可以配置no_grpc_proxy或no_proxy環境變量。忽略代理站點,請參見gRPC環境變量列表說明。
我們項目使用的是C++ 98、C++ 03標準,能支持么?
不能。由于gRPC和Protobuf是目前項目的核心協議和依賴,本項目也依據這兩個依賴的標準,不再支持C++ 98和C++ 03標準。