日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Spring 集成

更新時(shí)間:

本文介紹如何在 SpringBoot 框架下用 SOFAStack 消息隊(duì)列收發(fā)消息。

背景信息

主要包括以下三部分內(nèi)容:

  • 普通消息生產(chǎn)者和 Spring 集成

  • 事務(wù)消息生產(chǎn)者和 Spring 集成

  • 消息消費(fèi)者和 Spring 集成

請(qǐng)確保同一個(gè) Group ID 下所有 Consumer 實(shí)例的訂閱關(guān)系保持一致。詳情請(qǐng)參見 訂閱關(guān)系一致

SpringBoot 框架下支持的配置參數(shù)和 TCP Java 一致。詳情請(qǐng)參見 Java SDK 接口和參數(shù)說明

生產(chǎn)者與 Spring 集成

  1. 聲明生產(chǎn)者。

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
    import io.openmessaging.api.OMS;
    import io.openmessaging.api.Producer;
    
    @Configuration
    public class ProducerClient{
        @Autowired
        private MqConfig mqConfig;
    
        @Bean(initMethod ="start", destroyMethod ="shutdown")
        public Producer buildProducer(){
            Producer producer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties())
                    .createProducer(mqConfig.getMqProperties());
            return producer;
        }
    }
    
  2. 通過已經(jīng)與 Spring 集成好的生產(chǎn)者生產(chǎn)消息。

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
    
    import io.openmessaging.api.Message;
    import io.openmessaging.api.Producer;
    import io.openmessaging.api.SendResult;
    import io.openmessaging.api.exception.OMSRuntimeException;
    
    @Component
    public class SyncProducerTest{
        //普通消息的 Producer 已經(jīng)注冊(cè)到了 spring 容器中,后面需要使用時(shí)可以直接注入到其它類中
        @Autowired
        private Producer producer;
        @Autowired
        private MqConfig mqConfig;
        @Test
        public void testSend(){
                //循環(huán)發(fā)送消息
                for(int i =0; i <100; i++){
                    Message msg =new Message(
                    // Message 所屬的 Topic
                    mqConfig.getTopic(),
                    // Message Tag 可理解為 Gmail 中的標(biāo)簽,對(duì)消息進(jìn)行再歸類,方便 Consumer 指定過濾條件在 MQ 服務(wù)器過濾
                    mqConfig.getTag(),
                    // Message Body 可以是任何二進(jìn)制形式的數(shù)據(jù), MQ 不做任何干預(yù)
                    // 需要 Producer 與 Consumer 協(xié)商好一致的序列化和反序列化方式
                    "Hello MQ".getBytes());
                    // 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請(qǐng)盡可能全局唯一
                    // 以方便您在無法正常收到消息情況下,可通過 MQ 控制臺(tái)查詢消息并補(bǔ)發(fā)
                    // 注意:不設(shè)置也不會(huì)影響消息正常收發(fā)
                    msg.setKey("ORDERID_100");
                    // 發(fā)送消息,只要不拋異常就是成功
                    try{
                        SendResult sendResult = producer.send(msg);
                        assert sendResult !=null;
                        System.out.println(sendResult);
                    }catch(OMSRuntimeException e){
                        System.out.println("發(fā)送失敗");
                        //出現(xiàn)異常意味著發(fā)送失敗,為了避免消息丟失,建議緩存該消息然后進(jìn)行重試。
                    }
                }
        }
    }

事務(wù)消息生產(chǎn)者與 Spring 集成

事務(wù)消息的概念詳情請(qǐng)參見收發(fā)事務(wù)消息。

  1. 首先需要實(shí)現(xiàn)一個(gè) LocalTransactionChecker,如下所示。 一個(gè)消息生產(chǎn)者只能有一個(gè) LocalTransactionChecker

    import org.springframework.stereotype.Component;
    import io.openmessaging.api.Message;
    import io.openmessaging.api.transaction.LocalTransactionChecker;
    import io.openmessaging.api.transaction.TransactionStatus;
    
    @Component
    public class DemoLocalTransactionChecker implements LocalTransactionChecker{
        @Override
        public TransactionStatus check(Message msg){
            System.out.println("開始回查本地事務(wù)狀態(tài)");
            return TransactionStatus.CommitTransaction;//根據(jù)本地事務(wù)狀態(tài)檢查結(jié)果返回不同的 TransactionStatus
        }
    }
  2. 聲明生產(chǎn)者。

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
    import io.openmessaging.api.OMS;
    import io.openmessaging.api.transaction.TransactionProducer;
    
    @Configuration
    public class TransactionProducerClient{
        @Autowired
        private MqConfig  mqConfig;
        @Autowired
        private DemoLocalTransactionChecker localTransactionChecker;
        @Bean(initMethod ="start", destroyMethod ="shutdown")
        public TransactionProducer buildTransactionProducer(){
            TransactionProducer producer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties())
            .createTransactionProducer(mqConfig.getMqProperties(), localTransactionChecker);
            return producer;
        }
    }
  3. 通過已經(jīng)與 Spring 集成好的生產(chǎn)者生產(chǎn)事務(wù)消息。

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;

import io.openmessaging.api.Message;
import io.openmessaging.api.SendResult;
import io.openmessaging.api.transaction.LocalTransactionExecuter;
import io.openmessaging.api.transaction.TransactionProducer;
import io.openmessaging.api.transaction.TransactionStatus;

@RunWith(SpringRunner.class)
@SpringBootTest
public class TransactionProducerTest{

    //事務(wù)消息的 Producer 已經(jīng)注冊(cè)到了 spring 容器中,后面需要使用時(shí)可以直接注入到其它類中
    @Autowired
    private TransactionProducer transactionProducer;
    
    @Autowired
    private MqConfig mqConfig;

    @Test
    public void testSend(){
        Message msg =newMessage(mqConfig.getTopic(),"TagA","Hello MQ".getBytes());
        SendResult sendResult = transactionProducer.send(msg,newLocalTransactionExecuter(){
            @Override
            public TransactionStatus execute(Message msg,Object arg){
                System.out.println("執(zhí)行本地事務(wù)");
                return TransactionStatus.CommitTransaction;//根據(jù)本地事務(wù)執(zhí)行結(jié)果來返回不同的 TransactionStatus
            }
        },null);
        System.out.println(sendResult);
    }
}

消費(fèi)者與 SpringBoot 集成

  1. 創(chuàng)建 MessageListener,如下所示。

    import org.springframework.stereotype.Component;
    import io.openmessaging.api.Action;
    import io.openmessaging.api.ConsumeContext;
    import io.openmessaging.api.Message;
    import io.openmessaging.api.MessageListener;
    
    @Component
    public class DemoMessageListener implements MessageListener{
        @Override
        public Action consume(Message message,ConsumeContext context){
            System.out.println("Receive: "+ message);
            try{
                //do something..
                return Action.CommitMessage;
            }catch(Exception e){
                //消費(fèi)失敗
                return Action.ReconsumeLater;
            }
        }
    }
  2. 聲明消費(fèi)者。

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
    import io.openmessaging.api.Consumer;
    import io.openmessaging.api.OMS;
    import org.springframework.context.annotation.Configuration;
    @Configuration
    public class ConsumerClient{
        @Autowired
        private MqConfig mqConfig;
        @Autowired
        private DemoMessageListener messageListener;
        @Bean(initMethod ="start", destroyMethod ="shutdown")
        public Consumer buildConsumer(){
            Consumer consumer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties())
            .createConsumer(mqConfig.getMqProperties());
            consumer.subscribe(mqConfig.getTopic(), mqConfig.getTag(), messageListener);
            return consumer;
        }
    }