本文提供使用TCP協議下的Java SDK收發事務消息的示例代碼。
云消息隊列 RocketMQ 版提供類似XA或Open XA的分布式事務功能,通過云消息隊列 RocketMQ 版事務消息,能達到分布式事務的最終一致。
說明
對于新手用戶,建議在正式收發消息前,閱讀Demo工程來了解搭建云消息隊列 RocketMQ 版工程的具體步驟。
交互流程
事務消息交互流程如下圖所示。
更多信息,請參見事務消息。
前提條件
您已完成以下操作:
發送事務消息
具體的示例代碼,請以云消息隊列 RocketMQ 版代碼庫為準。
package com.aliyun.openservices.tcp.example.producer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import java.util.Date;
import java.util.Properties;
public class SimpleTransactionProducer {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在消息隊列RocketMQ版控制臺創建的Group ID。注意:事務消息的Group ID不能與其他類型消息的Group ID共用。
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
// 請確保環境變量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設置。
// AccessKey ID,阿里云身份驗證標識。
properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
// AccessKey Secret,阿里云身份驗證密鑰。
properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// 設置TCP接入域名,進入消息隊列RocketMQ版控制臺實例詳情頁面的接入點頁簽查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
// 初始化事務消息Producer時,需要注冊一個本地事務狀態的Checker。
LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
transactionProducer.start();
Message msg = new Message("XXX","TagA","Hello MQ transaction===".getBytes());
for (int i = 0; i < 3; i++) {
try{
SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
System.out.println("執行本地事務,并根據本地事務的狀態提交TransactionStatus。");
return TransactionStatus.CommitTransaction;
}
}, null);
assert sendResult != null;
}catch (ONSClientException e){
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
System.out.println("Send transaction message success.");
}
}
// 本地事務檢查器。
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
@Override
public TransactionStatus check(Message msg) {
System.out.println("收到事務消息的回查請求,MsgId: " + msg.getMsgID());
return TransactionStatus.CommitTransaction;
}
}
事務回查機制說明
發送事務消息為什么必須要實現回查Check機制?
當半事務消息發送完成,但本地事務返回狀態為
TransactionStatus.Unknow
,或者應用退出導致本地事務未提交任何狀態時,從Broker的角度看,這條半事務消息的狀態是未知的。因此Broker會定期向消息發送方即消息生產者集群中的任意一生產者實例發起消息回查,要求發送方回查該Half狀態消息,并上報其最終狀態。Check被回調時,業務邏輯都需要做些什么?
事務消息的Check方法里面,應該寫一些檢查事務一致性的邏輯。云消息隊列 RocketMQ 版發送事務消息時需要實現
LocalTransactionChecker
接口,用來處理Broker主動發起的本地事務狀態回查請求,因此在事務消息的Check方法中,需要完成兩件事情:檢查該半事務消息對應的本地事務的狀態(committed or rollback)。
向Broker提交該半事務消息本地事務的狀態。
訂閱事務消息
事務消息的訂閱與普通消息訂閱一致,更多信息,請參見訂閱消息。
文檔內容是否對您有幫助?