收發(fā)事務(wù)消息
本文提供使用 TCP 協(xié)議下的 Java SDK 收發(fā)事務(wù)消息的示例代碼供您參考。
消息隊列提供類似 X/Open XA 的分布式事務(wù)功能,通過消息隊列事務(wù)消息,能達到分布式事務(wù)的最終一致。
對于新手用戶,建議在正式收發(fā)消息前,閱讀 Demo 工程來了解搭建消息隊列工程的具體步驟。
交互流程
事務(wù)消息交互流程如下圖所示。
詳情請參見 消息類型 > 事務(wù)消息。
前提條件
您已完成以下操作:
下載 Java SDK。
(可選)日志配置。
發(fā)送事務(wù)消息
具體的示例代碼,請以消息隊列代碼庫為準。
發(fā)送事務(wù)消息包含以下兩個步驟:
發(fā)送半事務(wù)消息(Half Message)及執(zhí)行本地事務(wù),示例代碼如下。
import java.util.Properties; import com.alipay.sofa.sofamq.client.PropertyKeyConst; import io.openmessaging.api.Message; import io.openmessaging.api.MessagingAccessPoint; import io.openmessaging.api.OMS; import io.openmessaging.api.OMSBuiltinKeys; import io.openmessaging.api.transaction.LocalTransactionChecker; import io.openmessaging.api.transaction.LocalTransactionExecuter; import io.openmessaging.api.transaction.TransactionProducer; import io.openmessaging.api.transaction.TransactionStatus; public class TransactionProducerTest { public static void main(String... args) { Properties credentials = new Properties(); // 阿里云賬號 AccessKey 擁有所有 API 的訪問權(quán)限,風險很高。強烈建議您創(chuàng)建并使用 RAM 用戶進行 API 訪問或日常運維,請登錄 RAM 控制臺創(chuàng)建 RAM 用戶。 // 此處以把 AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。 // 強烈建議不要把 AccessKey 和 AccessKeySecret 保存到代碼里,會存在密鑰泄漏風險 credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV"); credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV"); // 設(shè)置 TCP 接入域名,進入控制臺的概覽頁面查看接入點配置 MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint").withCredentials(credentials).build(); Properties properties = new Properties(); // 設(shè)置用戶實例,進入控制臺的概覽頁面查看接入點配置 properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId"); properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP"); TransactionProducer producer = accessPoint.createTransactionProducer(properties, newLocalTransactionChecker() { @Override public TransactionStatus check (Message msg){ // check business commit status return TransactionStatus.CommitTransaction; } }); producer.start(); Message message = new Message("$topic", "YOUR_TAG", "hello world".getBytes()); producer.send(message, new LocalTransactionExecuter() { @Override public TransactionStatus execute(Message msg, Object arg) { // if business success, then commit; else rollback return TransactionStatus.CommitTransaction; } }, null); } }
提交事務(wù)消息狀態(tài)。當本地事務(wù)執(zhí)行完成(執(zhí)行成功或執(zhí)行失敗),需要通知服務(wù)器當前消息的事務(wù)狀態(tài)。通知方式有以下兩種:
執(zhí)行本地事務(wù)完成后提交。
執(zhí)行本地事務(wù)一直沒提交狀態(tài),等待服務(wù)器回查消息的事務(wù)狀態(tài)。事務(wù)狀態(tài)有以下三種:
TransactionStatus.CommitTransaction
提交事務(wù),允許訂閱方消費該消息。TransactionStatus.RollbackTransaction
回滾事務(wù),消息將被丟棄不允許消費。TransactionStatus.Unknow
無法判斷狀態(tài),期待消息隊列的 Broker 向發(fā)送方再次詢問該消息對應(yīng)的本地事務(wù)的狀態(tài)。
事務(wù)回查機制說明
發(fā)送事務(wù)消息為什么必須要實現(xiàn)回查 Check 機制?當步驟 1 中半事務(wù)消息發(fā)送完成,但本地事務(wù)返回狀態(tài)為
TransactionStatus.Unknow
,或者應(yīng)用退出導(dǎo)致本地事務(wù)未提交任何狀態(tài)時,從 Broker 的角度看,這條 Half 狀態(tài)的消息的狀態(tài)是未知的。因此 Broker 會定期要求發(fā)送方能 Check 該 Half 狀態(tài)消息,并上報其最終狀態(tài)。Check 被回調(diào)時,業(yè)務(wù)邏輯都需要做些什么?事務(wù)消息的 Check 方法里面,應(yīng)該寫一些檢查事務(wù)一致性的邏輯。消息隊列發(fā)送事務(wù)消息時需要實現(xiàn)
LocalTransactionChecker
接口,用來處理 Broker 主動發(fā)起的本地事務(wù)狀態(tài)回查請求;因此在事務(wù)消息的 Check 方法中,需要完成兩件事情:檢查該半事務(wù)消息對應(yīng)的本地事務(wù)的狀態(tài)(committed or rollback)。
向 Broker 提交該半事務(wù)消息本地事務(wù)的狀態(tài)。
訂閱事務(wù)消息
事務(wù)消息的訂閱與普通消息訂閱一致,詳情請參見 訂閱消息。