日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

收發(fā)事務(wù)消息

本文提供使用 TCP 協(xié)議下的 Java SDK 收發(fā)事務(wù)消息的示例代碼供您參考。

消息隊列提供類似 X/Open XA 的分布式事務(wù)功能,通過消息隊列事務(wù)消息,能達到分布式事務(wù)的最終一致。

說明

對于新手用戶,建議在正式收發(fā)消息前,閱讀 Demo 工程來了解搭建消息隊列工程的具體步驟。

交互流程

事務(wù)消息交互流程如下圖所示。

事務(wù)消息交互流程

詳情請參見 消息類型 > 事務(wù)消息

前提條件

您已完成以下操作:

發(fā)送事務(wù)消息

說明

具體的示例代碼,請以消息隊列代碼庫為準。

發(fā)送事務(wù)消息包含以下兩個步驟:

  1. 發(fā)送半事務(wù)消息(Half Message)及執(zhí)行本地事務(wù),示例代碼如下。

    import java.util.Properties;
    import com.alipay.sofa.sofamq.client.PropertyKeyConst;
    import io.openmessaging.api.Message;
    import io.openmessaging.api.MessagingAccessPoint;
    import io.openmessaging.api.OMS;
    import io.openmessaging.api.OMSBuiltinKeys;
    import io.openmessaging.api.transaction.LocalTransactionChecker;
    import io.openmessaging.api.transaction.LocalTransactionExecuter;
    import io.openmessaging.api.transaction.TransactionProducer;
    import io.openmessaging.api.transaction.TransactionStatus;
    
    public class TransactionProducerTest {
        public static void main(String... args) {
            Properties credentials = new Properties();
            // 阿里云賬號 AccessKey 擁有所有 API 的訪問權(quán)限,風險很高。強烈建議您創(chuàng)建并使用 RAM 用戶進行 API 訪問或日常運維,請登錄 RAM 控制臺創(chuàng)建 RAM 用戶。
            // 此處以把 AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。
            // 強烈建議不要把 AccessKey 和 AccessKeySecret 保存到代碼里,會存在密鑰泄漏風險
            credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV");        
            credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV");
    
            // 設(shè)置 TCP 接入域名,進入控制臺的概覽頁面查看接入點配置
            MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint").withCredentials(credentials).build();
    
            Properties properties = new Properties();
            // 設(shè)置用戶實例,進入控制臺的概覽頁面查看接入點配置
            properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
            properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
    
            TransactionProducer producer = accessPoint.createTransactionProducer(properties, newLocalTransactionChecker() {
                @Override
                public TransactionStatus check (Message msg){
                    // check business commit status
                    return TransactionStatus.CommitTransaction;
                }
            });
            producer.start();
    
            Message message = new Message("$topic", "YOUR_TAG", "hello world".getBytes());
            producer.send(message, new LocalTransactionExecuter() {
                @Override
                public TransactionStatus execute(Message msg, Object arg) {
                    // if business success, then commit; else rollback
                    return TransactionStatus.CommitTransaction;
                }
            }, null);
        }
    }
  2. 提交事務(wù)消息狀態(tài)。當本地事務(wù)執(zhí)行完成(執(zhí)行成功或執(zhí)行失敗),需要通知服務(wù)器當前消息的事務(wù)狀態(tài)。通知方式有以下兩種:

    • 執(zhí)行本地事務(wù)完成后提交。

    • 執(zhí)行本地事務(wù)一直沒提交狀態(tài),等待服務(wù)器回查消息的事務(wù)狀態(tài)。事務(wù)狀態(tài)有以下三種:

      • TransactionStatus.CommitTransaction 提交事務(wù),允許訂閱方消費該消息。

      • TransactionStatus.RollbackTransaction 回滾事務(wù),消息將被丟棄不允許消費。

      • TransactionStatus.Unknow 無法判斷狀態(tài),期待消息隊列的 Broker 向發(fā)送方再次詢問該消息對應(yīng)的本地事務(wù)的狀態(tài)。

事務(wù)回查機制說明

  • 發(fā)送事務(wù)消息為什么必須要實現(xiàn)回查 Check 機制?當步驟 1 中半事務(wù)消息發(fā)送完成,但本地事務(wù)返回狀態(tài)為 TransactionStatus.Unknow,或者應(yīng)用退出導(dǎo)致本地事務(wù)未提交任何狀態(tài)時,從 Broker 的角度看,這條 Half 狀態(tài)的消息的狀態(tài)是未知的。因此 Broker 會定期要求發(fā)送方能 Check 該 Half 狀態(tài)消息,并上報其最終狀態(tài)。

  • Check 被回調(diào)時,業(yè)務(wù)邏輯都需要做些什么?事務(wù)消息的 Check 方法里面,應(yīng)該寫一些檢查事務(wù)一致性的邏輯。消息隊列發(fā)送事務(wù)消息時需要實現(xiàn) LocalTransactionChecker 接口,用來處理 Broker 主動發(fā)起的本地事務(wù)狀態(tài)回查請求;因此在事務(wù)消息的 Check 方法中,需要完成兩件事情:

    1. 檢查該半事務(wù)消息對應(yīng)的本地事務(wù)的狀態(tài)(committed or rollback)。

    2. 向 Broker 提交該半事務(wù)消息本地事務(wù)的狀態(tài)。

訂閱事務(wù)消息

事務(wù)消息的訂閱與普通消息訂閱一致,詳情請參見 訂閱消息