本文介紹TCP協議下Java SDK 2.x.x.Final的版本說明,包括使用限制、版本的基本信息、環境要求以及和歷史版本相比各功能特性的變更內容。
使用限制
目前僅華東1(杭州)、華北1(青島)、華北2(北京)、華北3(張家口)、華北5(呼和浩特)、華南1(深圳)、西南1(成都)、中國香港、德國(法蘭克福)和印度尼西亞(雅加達)地域支持將客戶端升級為2.x.x.Final版本,其他地域請勿將SDK升級到Java SDK 2.x.x Final版本,否則將無法訪問云消息隊列 RocketMQ 版服務。
Java SDK 2.x.x.Final僅支持通過VPC網絡訪問云消息隊列 RocketMQ 版,不支持經典網絡訪問。
若您使用存量云消息隊列 RocketMQ 版實例并通過經典網絡訪問,請勿將Java SDK升級到V2.x.x.Final版本,否則將無法訪問云消息隊列 RocketMQ 版實例。
Java SDK 2.x.x.Final僅支持有命名空間的實例,若您使用的實例無命名空間,請勿將客戶端版本升級到Java SDK 2.x.x.Final。
5.x版本實例默認都有命名空間,4.x版本實例可在云消息隊列 RocketMQ 版控制臺實例詳情頁面的基礎信息區域查看是否有命名空間。
版本信息
發布時間 | 版本號 | 下載鏈接 |
2023-02-23 | 2.0.5.Final | |
2022-08-17 | 2.0.3.Final | |
2022-06-16 | 2.0.2.Final | |
2021-11-29 | 2.0.1.Final | |
2021-10-18 | 2.0.0.Final |
環境要求
使用V2.x.x版本Java SDK,請確保您使用的JDK版本為Java 8及以上。
V2.0.5功能變更
功能優化
日志增加異步支持。
問題修復
修復批量消費等待時間無法指定的問題。
修復部分安全漏洞。
V2.0.3功能變更
問題修復
修復高版本JDK中,消費線程池無法調節至32個線程以上的問題。
V2.0.2功能變更
問題修復
修復消息發送時,有小概率觸發死鎖的問題。
V2.0.1功能變更
消息軌跡
補充消息軌跡數據。
V2.0.0功能變更
順序消息
順序消息的最大重試次數MaxReconsumeTimes參數的默認值從Integer.MAX變更為16次。超過最大重試次數消息還未被消費成功將直接被投遞至死信隊列。您可以通過自定義MaxReconsumeTimes參數值修改順序消息的最大重試次數。
事務消息
若生產者發送消息時LocalTransactionExecutor
類為null,則系統會拋出異常,歷史版本則不會拋出異常。
廣播消費
廣播消費模式下,支持使用offsetStore
接口的方式定制消費者啟動時的消費位點。若未設置,默認和歷史版本一致直接從最新消費位點開始消費。
示例代碼如下:
public class BroadcastingConsumerExample {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "MyGroupId");
properties.put(PropertyKeyConst.AccessKey, "MyAccessKey");
properties.put(PropertyKeyConst.SecretKey, "MySecretKey");
// 設置TCP協議的接入域名,進入消息隊列RocketMQ版控制臺實例詳情頁面的接入點區域查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
// 從OffsetStore讀取消費位點的功能僅支持廣播消費模式。
properties.put(PropertyKeyConst.MessageModel, MessageModel.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
// 參數1表示1秒鐘會調用一次AbstractOffsetStore方法來進行位點的持久化。
OffsetStore offsetStore = new AbstractOffsetStore(1) {
@Override
public Map<TopicPartition, Long> loadOffset() {
// 實現從外部存儲讀取位點的邏輯。
}
@Override
public void persistOffset(Map<TopicPartition, Long> offsetTable) {
// 持久化位點到外部存儲的邏輯。
}
};
offsetStore.start();
consumer.setOffsetStore(offsetStore);
consumer.subscribe("testBroadcastingTopic", "TagA", new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext context) {
// 實現消息消費相關邏輯。
return Action.CommitMessage;
}
});
consumer.start();
Thread.sleep(100000);
consumer.shutdown();
offsetStore.shutdown();
}
}
Push消費
暫不支持普通消息的批量消費功能。
如果設置的消費線程數不在合法區間[1,1000]內,系統會在創建消費者時拋出異常,而不是在啟動消費者時拋出異常。
新增消費速度限流功能。為了避免消息洪峰可能對消費端應用產生沖擊,您可通過該功能限制消息的消費速度,保護消費端應用。自定義消費速度的示例代碼如下:
說明順序消息的消息重試不受限流控制。
public class RateLimitConsumerExample { public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "MyGroupId"); properties.put(PropertyKeyConst.AccessKey, "MyAccessKey"); properties.put(PropertyKeyConst.SecretKey, "MySecretKey"); // 設置TCP協議的接入域名,進入消息隊列RocketMQ版控制臺實例詳情頁面的接入點區域查看。 properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); Consumer consumer = ONSFactory.createConsumer(properties); // 對testTopicA進行消費限流,限制消費端每秒鐘消費10條消息。 consumer.rateLimit("testTopicA", 10); consumer.subscribe("testTopic", "TagA", new MessageListener() { @Override public Action consume(Message message, ConsumeContext context) { // 實現消息消費相關邏輯。 return Action.CommitMessage; } }); consumer.start(); Thread.sleep(100000); consumer.shutdown(); } }
Pull消費
暫不支持Pull消費方式。
日志配置
日志默認路徑由~/logs/ons.log變更為~/logs/ons/ons-client.log。
日志級別與logback完全對齊。除已有的ERROR、WARN、INFO、DEBUG級別之外,增加對OFF、TRACE和ALL級別的支持。
所有日志相關參數的添加方式除了
-D
方式以外,增加對環境變量的支持。
客戶端創建
設置非法的接入點時,系統會在創建發送者或消費者時拋出異常,而不是在啟動發送者或消費者時拋出異常。
消息軌跡
使用最新版本SDK收發消息,查詢消息軌跡時,新增返回如下軌跡參數。
參數 | 說明 |
AccessKey | 您的阿里云賬號或RAM用戶的AccessKey ID,用于標識用戶。當您通過SDK或API調用云消息隊列 RocketMQ 版資源時,需要使用AccessKey ID進行身份驗證。 |
到達Server | 消息到達消息隊列RocketMQ版服務端的時間。 |
預設DeliverAt | 定時消息的預計投遞時間。 |
實際AvailableAt | 定時消息定時結束的時間。即消息可被消費者消費的開始時間。 |
Available Time | 消息可被消費者消費的開始時間。 |
提交/回滾時間 | 事務消息提交或回滾的時間。 |
到達消費端 | 消息到達消費者客戶端的時間。 |
等待處理耗時 | 消息到達消費者客戶端,等待線程池分配線程和分配處理資源的耗時。 |
V2.0.0問題修復
修復RAM角色實現跨云賬號STS授權場景下,updateCredential方法調用頻率較高時,三元組(AccessKey ID、AccessKey Secret和STS Token)更新不具備原子性而導致的鑒權失敗問題。
STS授權的SDK示例代碼,請參見STS Token配置示例。