事務消息為云消息隊列 RocketMQ 版中的高級特性消息,本文為您介紹事務消息的應用場景、功能原理、使用限制、使用方法和使用建議。
應用場景
分布式事務的訴求
分布式系統調用的特點為一個核心業務邏輯的執行,同時需要調用多個下游業務進行處理。因此,如何保證核心業務和多個下游業務的執行結果完全一致,是分布式事務需要解決的主要問題。
以電商交易場景為例,用戶支付訂單這一核心操作的同時會涉及到下游物流發貨、積分變更、購物車狀態清空等多個子系統的變更。當前業務的處理分支包括:
主分支訂單系統狀態更新:由未支付變更為支付成功。
物流系統狀態新增:新增待發貨物流記錄,創建訂單物流記錄。
積分系統狀態變更:變更用戶積分,更新用戶積分表。
購物車系統狀態變更:清空購物車,更新用戶購物車記錄。
傳統XA事務方案:性能不足
為了保證上述四個分支的執行結果一致性,典型方案是基于XA協議的分布式事務系統來實現。將四個調用分支封裝成包含四個獨立事務分支的大事務。基于XA分布式事務的方案可以滿足業務處理結果的正確性,但最大的缺點是多分支環境下資源鎖定范圍大,并發度低,隨著下游分支的增加,系統性能會越來越差。
基于普通消息方案:一致性保障困難
將上述基于XA事務的方案進行簡化,將訂單系統變更作為本地事務,剩下的系統變更作為普通消息的下游來執行,事務分支簡化成普通消息+訂單表事務,充分利用消息異步化的能力縮短鏈路,提高并發度。
該方案中消息下游分支和訂單系統變更的主分支很容易出現不一致的現象,例如:
消息發送成功,訂單沒有執行成功,需要回滾整個事務。
訂單執行成功,消息沒有發送成功,需要額外補償才能發現不一致。
消息發送超時未知,此時無法判斷需要回滾訂單還是提交訂單變更。
基于云消息隊列 RocketMQ 版分布式事務消息:支持最終一致性
上述普通消息方案中,普通消息和訂單事務無法保證一致的原因,本質上是由于普通消息無法像單機數據庫事務一樣,具備提交、回滾和統一協調的能力。
而基于云消息隊列 RocketMQ 版實現的分布式事務消息功能,在普通消息基礎上,支持二階段的提交能力。將二階段提交和本地事務綁定,實現全局提交結果的一致性。
云消息隊列 RocketMQ 版事務消息的方案,具備高性能、可擴展、業務開發簡單的優勢。具體事務消息的原理和流程,請參見下文的功能原理。
功能原理
什么是事務消息
事務消息是云消息隊列 RocketMQ 版提供的一種高級消息類型,支持在分布式場景下保障消息生產和本地事務的最終一致性。
事務消息處理流程
事務消息交互流程如下圖所示。
生產者將消息發送至云消息隊列 RocketMQ 版服務端。
云消息隊列 RocketMQ 版服務端將消息持久化成功之后,向生產者返回Ack確認消息已經發送成功,此時消息被標記為“暫不能投遞”,這種狀態下的消息即為半事務消息。
生產者開始執行本地事務邏輯。
生產者根據本地事務執行結果向服務端提交二次確認結果(Commit或是Rollback),服務端收到確認結果后處理邏輯如下:
二次確認結果為Commit:服務端將半事務消息標記為可投遞,并投遞給消費者。
二次確認結果為Rollback:服務端將回滾事務,不會將半事務消息投遞給消費者。
在斷網或者是生產者應用重啟的特殊情況下,若服務端未收到發送者提交的二次確認結果,或服務端收到的二次確認結果為Unknown未知狀態,經過固定時間后,服務端將對消息生產者即生產者集群中任一生產者實例發起消息回查。
說明服務端回查的間隔時間和最大回查次數,請參見參數限制。
生產者收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
生產者根據檢查到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟4對半事務消息進行處理。
事務消息生命周期
初始化
半事務消息被生產者構建并完成初始化,待發送到服務端的狀態。
事務待提交
半事務消息被發送到服務端,和普通消息不同,并不會直接被服務端持久化,而是會被單獨存儲到事務存儲系統中,等待第二階段本地事務返回執行結果后再提交。此時消息對下游消費者不可見。
消息回滾
第二階段如果事務執行結果明確為回滾,服務端會將半事務消息回滾,該事務消息流程終止。
提交待消費
第二階段如果事務執行結果明確為提交,服務端會將半事務消息重新存儲到普通存儲系統中,此時消息對下游消費者可見,等待被消費者獲取并消費。
消費中
消息被消費者獲取,并按照消費者本地的業務邏輯進行處理的過程。
此時服務端會等待消費者完成消費并提交消費結果,如果一定時間后沒有收到消費者的響應,云消息隊列 RocketMQ 版會對消息進行重試處理。具體信息,請參見消費重試。
消費提交
消費者完成消費處理,并向服務端提交消費結果,服務端標記當前消息已經被處理(包括消費成功和失敗)。
云消息隊列 RocketMQ 版默認支持保留所有消息,此時消息數據并不會立即被刪除,只是邏輯標記已消費。消息在保存時間到期或存儲空間不足被刪除前,消費者仍然可以回溯消息重新消費。
消息刪除
云消息隊列 RocketMQ 版按照消息保存機制滾動清理最早的消息數據,將消息從物理文件中刪除。更多信息,請參見消息存儲和清理機制。
使用限制
消息類型一致性
事務消息僅支持在MessageType為Transaction的主題內使用,即事務消息只能發送至類型為事務消息的主題中,發送的消息的類型必須和主題的類型一致。
消費事務性
云消息隊列 RocketMQ 版事務消息保證本地主分支事務和下游消息發送事務的一致性,但不保證消息消費結果和上游事務的一致性。因此需要下游業務分支自行保證消息正確處理,建議消費端做好消費重試,如果有短暫失敗可以利用重試機制保證最終處理成功。
中間狀態可見性
云消息隊列 RocketMQ 版事務消息為最終一致性,即在消息提交到下游消費端處理完成之前,下游分支和上游事務之間的狀態會不一致。因此,事務消息僅適合接受異步執行的事務場景。
事務超時機制
云消息隊列 RocketMQ 版事務消息的生命周期存在超時機制,即半事務消息被生產者發送服務端后,如果在指定時間內服務端無法確認提交或者回滾狀態,則消息默認會被回滾。事務超時時間,請參見參數限制。
不支持多個sendReceipt
事務消息在一個事務中僅允許一個sendReceipt,不支持多個sendReceipt。
使用示例
事務消息相比普通消息發送時需要修改以下幾點:
發送事務消息前,需要開啟事務并關聯本地的事務執行。
為保證事務一致性,在構建生產者時,必須設置事務檢查器和預綁定事務消息發送的主題列表,客戶端內置的事務檢查器會對綁定的事務主題做異常狀態恢復。
以Java語言為例,使用事務消息示例參考如下:
完整的消息收發示例代碼請參見RocketMQ 5.x系列SDK(推薦)。
示例代碼
import java.time.Duration;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.shaded.com.google.common.base.Strings;
public class ProducerTransactionMessageExample {
/**
* 演示demo,模擬訂單表查詢服務,用來確認訂單事務是否提交成功。
*/
private static boolean checkOrderById(String orderId) {
return true;
}
/**
* 演示demo,模擬本地事務的執行結果。
*/
private static boolean doLocalTransaction() {
return true;
}
public static void main(String[] args) throws ClientException {
/**
* 實例接入點,從控制臺實例詳情頁的接入點頁簽中獲取。
* 如果是在阿里云ECS內網訪問,建議填寫VPC接入點。
* 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網接入點。使用公網接入點訪問,必須開啟實例的公網訪問功能。
*/
String endpoints = "xxx-hangzhou.rmq.aliyuncs.com:8080";
//消息發送的目標Topic名稱,需要提前在控制臺創建,如果不創建直接使用會返回報錯。
String topic = "topic1";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網接入點訪問,configuration還需要設置實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 如果是在阿里云ECS內網訪問,無需填寫該配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverlesss實例,則不管是公網訪問還是內網訪問都必須設置實例的用戶名密碼。
*/
builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
builder.setRequestTimeout(Duration.ofMillis(5000));
ClientConfiguration configuration = builder.build();
//構造事務生產者:事務消息需要生產者構建一個事務檢查器,用于檢查確認異常半事務的中間狀態。
Producer producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> {
/**
* 事務檢查器一般是根據業務的ID去檢查本地事務是否正確提交還是回滾,此處以訂單ID屬性為例。
* 在訂單表找到了這個訂單,說明本地事務插入訂單的操作已經正確提交;如果訂單表沒有訂單,說明本地事務已經回滾。
*/
final String orderId = messageView.getProperties().get("OrderId");
if (Strings.isNullOrEmpty(orderId)) {
// 錯誤的消息,直接返回Rollback。
return TransactionResolution.ROLLBACK;
}
return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
}).setTopics(topic)
.setClientConfiguration(configuration)
.build();
//開啟事務分支。
final Transaction transaction;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
//事務分支開啟失敗,直接退出。
return;
}
Message message = provider.newMessageBuilder()
.setTopic(topic)
//設置消息索引鍵,可根據關鍵字精確查找某條消息。
.setKeys("messageKey1")
//設置消息Tag,用于消費端根據指定Tag過濾消息。
.setTag("messageTag")
//一般事務消息都會設置一個本地事務關聯的唯一ID,用來做本地事務回查的校驗。
.addProperty("OrderId", "xxx")
//消息體。
.setBody("messageBody".getBytes())
.build();
//發送半事務消息
final SendReceipt sendReceipt;
try {
sendReceipt = producer.send(message, transaction);
} catch (ClientException e) {
//半事務消息發送失敗,事務可以直接退出并回滾。
return;
}
/**
* 執行本地事務,并確定本地事務結果。
* 1. 如果本地事務提交成功,則提交消息事務。
* 2. 如果本地事務提交失敗,則回滾消息事務。
* 3. 如果本地事務未知異常,則不處理,等待事務消息回查。
*
*/
boolean localTransactionOk = doLocalTransaction();
if (localTransactionOk) {
try {
transaction.commit();
} catch (ClientException e) {
// 業務可以自身對實時性的要求選擇是否重試,如果放棄重試,可以依賴事務消息回查機制進行事務狀態的提交。
e.printStackTrace();
}
} else {
try {
transaction.rollback();
} catch (ClientException e) {
// 建議記錄異常信息,回滾異常時可以無需重試,依賴事務消息回查機制進行事務狀態的提交。
e.printStackTrace();
}
}
}
}
使用建議
避免大量未決事務導致超時
云消息隊列 RocketMQ 版支持在事務提交階段異常的情況下發起事務回查,保證事務一致性。但生產者應該盡量避免本地事務返回未知結果。大量的事務檢查會導致系統性能受損,容易導致事務處理延遲。
正確處理“進行中”的事務
消息回查時,對于正在進行中的事務不要返回Rollback或Commit結果,應繼續保持Unknown的狀態。
一般出現消息回查時事務正在處理的原因為:事務執行較慢,消息回查太快。解決方案如下:
將第一次事務回查時間設置較大一些,但可能導致依賴回查的事務提交延遲較大。
程序能正確識別正在進行中的事務。