Spring 集成
更新時(shí)間:
本文介紹如何在 SpringBoot 框架下用 SOFAStack 消息隊(duì)列收發(fā)消息。
背景信息
主要包括以下三部分內(nèi)容:
普通消息生產(chǎn)者和 Spring 集成
事務(wù)消息生產(chǎn)者和 Spring 集成
消息消費(fèi)者和 Spring 集成
請(qǐng)確保同一個(gè) Group ID 下所有 Consumer 實(shí)例的訂閱關(guān)系保持一致。詳情請(qǐng)參見 訂閱關(guān)系一致。
SpringBoot 框架下支持的配置參數(shù)和 TCP Java 一致。詳情請(qǐng)參見 Java SDK 接口和參數(shù)說明。
生產(chǎn)者與 Spring 集成
聲明生產(chǎn)者。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.alipay.sofa.sofamq.example.springboot.config.MqConfig; import io.openmessaging.api.OMS; import io.openmessaging.api.Producer; @Configuration public class ProducerClient{ @Autowired private MqConfig mqConfig; @Bean(initMethod ="start", destroyMethod ="shutdown") public Producer buildProducer(){ Producer producer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties()) .createProducer(mqConfig.getMqProperties()); return producer; } }
通過已經(jīng)與 Spring 集成好的生產(chǎn)者生產(chǎn)消息。
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.alipay.sofa.sofamq.example.springboot.config.MqConfig; import io.openmessaging.api.Message; import io.openmessaging.api.Producer; import io.openmessaging.api.SendResult; import io.openmessaging.api.exception.OMSRuntimeException; @Component public class SyncProducerTest{ //普通消息的 Producer 已經(jīng)注冊(cè)到了 spring 容器中,后面需要使用時(shí)可以直接注入到其它類中 @Autowired private Producer producer; @Autowired private MqConfig mqConfig; @Test public void testSend(){ //循環(huán)發(fā)送消息 for(int i =0; i <100; i++){ Message msg =new Message( // Message 所屬的 Topic mqConfig.getTopic(), // Message Tag 可理解為 Gmail 中的標(biāo)簽,對(duì)消息進(jìn)行再歸類,方便 Consumer 指定過濾條件在 MQ 服務(wù)器過濾 mqConfig.getTag(), // Message Body 可以是任何二進(jìn)制形式的數(shù)據(jù), MQ 不做任何干預(yù) // 需要 Producer 與 Consumer 協(xié)商好一致的序列化和反序列化方式 "Hello MQ".getBytes()); // 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請(qǐng)盡可能全局唯一 // 以方便您在無法正常收到消息情況下,可通過 MQ 控制臺(tái)查詢消息并補(bǔ)發(fā) // 注意:不設(shè)置也不會(huì)影響消息正常收發(fā) msg.setKey("ORDERID_100"); // 發(fā)送消息,只要不拋異常就是成功 try{ SendResult sendResult = producer.send(msg); assert sendResult !=null; System.out.println(sendResult); }catch(OMSRuntimeException e){ System.out.println("發(fā)送失敗"); //出現(xiàn)異常意味著發(fā)送失敗,為了避免消息丟失,建議緩存該消息然后進(jìn)行重試。 } } } }
事務(wù)消息生產(chǎn)者與 Spring 集成
事務(wù)消息的概念詳情請(qǐng)參見收發(fā)事務(wù)消息。
首先需要實(shí)現(xiàn)一個(gè)
LocalTransactionChecker
,如下所示。 一個(gè)消息生產(chǎn)者只能有一個(gè)LocalTransactionChecker
。import org.springframework.stereotype.Component; import io.openmessaging.api.Message; import io.openmessaging.api.transaction.LocalTransactionChecker; import io.openmessaging.api.transaction.TransactionStatus; @Component public class DemoLocalTransactionChecker implements LocalTransactionChecker{ @Override public TransactionStatus check(Message msg){ System.out.println("開始回查本地事務(wù)狀態(tài)"); return TransactionStatus.CommitTransaction;//根據(jù)本地事務(wù)狀態(tài)檢查結(jié)果返回不同的 TransactionStatus } }
聲明生產(chǎn)者。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.alipay.sofa.sofamq.example.springboot.config.MqConfig; import io.openmessaging.api.OMS; import io.openmessaging.api.transaction.TransactionProducer; @Configuration public class TransactionProducerClient{ @Autowired private MqConfig mqConfig; @Autowired private DemoLocalTransactionChecker localTransactionChecker; @Bean(initMethod ="start", destroyMethod ="shutdown") public TransactionProducer buildTransactionProducer(){ TransactionProducer producer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties()) .createTransactionProducer(mqConfig.getMqProperties(), localTransactionChecker); return producer; } }
通過已經(jīng)與 Spring 集成好的生產(chǎn)者生產(chǎn)事務(wù)消息。
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
import io.openmessaging.api.Message;
import io.openmessaging.api.SendResult;
import io.openmessaging.api.transaction.LocalTransactionExecuter;
import io.openmessaging.api.transaction.TransactionProducer;
import io.openmessaging.api.transaction.TransactionStatus;
@RunWith(SpringRunner.class)
@SpringBootTest
public class TransactionProducerTest{
//事務(wù)消息的 Producer 已經(jīng)注冊(cè)到了 spring 容器中,后面需要使用時(shí)可以直接注入到其它類中
@Autowired
private TransactionProducer transactionProducer;
@Autowired
private MqConfig mqConfig;
@Test
public void testSend(){
Message msg =newMessage(mqConfig.getTopic(),"TagA","Hello MQ".getBytes());
SendResult sendResult = transactionProducer.send(msg,newLocalTransactionExecuter(){
@Override
public TransactionStatus execute(Message msg,Object arg){
System.out.println("執(zhí)行本地事務(wù)");
return TransactionStatus.CommitTransaction;//根據(jù)本地事務(wù)執(zhí)行結(jié)果來返回不同的 TransactionStatus
}
},null);
System.out.println(sendResult);
}
}
消費(fèi)者與 SpringBoot 集成
創(chuàng)建
MessageListener
,如下所示。import org.springframework.stereotype.Component; import io.openmessaging.api.Action; import io.openmessaging.api.ConsumeContext; import io.openmessaging.api.Message; import io.openmessaging.api.MessageListener; @Component public class DemoMessageListener implements MessageListener{ @Override public Action consume(Message message,ConsumeContext context){ System.out.println("Receive: "+ message); try{ //do something.. return Action.CommitMessage; }catch(Exception e){ //消費(fèi)失敗 return Action.ReconsumeLater; } } }
聲明消費(fèi)者。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import com.alipay.sofa.sofamq.example.springboot.config.MqConfig; import io.openmessaging.api.Consumer; import io.openmessaging.api.OMS; import org.springframework.context.annotation.Configuration; @Configuration public class ConsumerClient{ @Autowired private MqConfig mqConfig; @Autowired private DemoMessageListener messageListener; @Bean(initMethod ="start", destroyMethod ="shutdown") public Consumer buildConsumer(){ Consumer consumer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties()) .createConsumer(mqConfig.getMqProperties()); consumer.subscribe(mqConfig.getTopic(), mqConfig.getTag(), messageListener); return consumer; } }
文檔內(nèi)容是否對(duì)您有幫助?