消息過濾
本文描述 SOFAStack 消息隊(duì)列的消費(fèi)者如何根據(jù) Tag 在消息隊(duì)列服務(wù)端完成消息過濾,以確保消費(fèi)者最終只消費(fèi)到其關(guān)注的消息類型。
Tag,即消息標(biāo)簽,用于對某個 Topic 下的消息進(jìn)行分類。消息隊(duì)列的生產(chǎn)者在發(fā)送消息時,已經(jīng)指定消息的 Tag,消費(fèi)者需根據(jù)已經(jīng)指定的 Tag 來進(jìn)行訂閱。
示例代碼
發(fā)送消息
發(fā)送消息時,每條消息必須指明 Tag:
producer.messageBuilder().withTopic("TP_XXX").withTags("TAGA").withValue(orderPojo).build()
訂閱所有 Tag
消費(fèi)者如需訂閱某 Topic 下所有類型的消息,Tag 用符號 * 表示:
SOFABOOT 示例
import com.alipay.sofa.sofamq.api.MessageConsumer; import com.alipay.sofa.sofamq.api.Messaging; // 請使用 xml 或注解將該類配置為 Bean,只有 @Messaging 掃描不到 @Messaging public class SomeClass { @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "*") public void someMethodReceivePojo(OrderPojo somePojo) { // do something } }
非 SOFABOOT 示例
consumer.subscribe("TP_XXX", "*", new GenericMessageListener<OrderPojo>() { @Override public Class<OrderPojo> payloadClass() { return OrderPojo.class; } @Override public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) { System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue()); // 如果想測試消息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater return Action.CommitMessage; } });
訂閱單個 Tag
消費(fèi)者如需訂閱某 Topic 下某一種類型的消息,請明確標(biāo)明 Tag:
SOFABOOT 示例
import com.alipay.sofa.sofamq.api.MessageConsumer; import com.alipay.sofa.sofamq.api.Messaging; // 請使用 xml 或注解將該類配置為 Bean,只有 @Messaging 掃描不到 @Messaging public class SomeClass { @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "TAGA") public void someMethodReceivePojo(OrderPojo somePojo) { // do something } }
非 SOFABOOT 示例
consumer.subscribe("TP_XXX", "TAGA", new GenericMessageListener<OrderPojo>() { @Override public Class<OrderPojo> payloadClass() { return OrderPojo.class; } @Override public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) { System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue()); // 如果想測試消息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater return Action.CommitMessage; } });
訂閱多個 Tag
消費(fèi)者如需訂閱某 Topic 下多種類型的消息,請?jiān)诙鄠€ Tag 之間用 || 分隔:
consumer.subscribe("MQ_TOPIC", "TagA||TagB", new GenericMessageListener<OrderPojo>() {
@Override
public Class<OrderPojo> payloadClass() {
return OrderPojo.class;
}
@Override
public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
// 如果想測試消息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater
return Action.CommitMessage;
}
});
SQL92 過濾
過濾表達(dá)式中的 TAGS 特指消息的 TAG,其他的變量取自消息屬性 Properties
,可通過 Message#putUserProperties
設(shè)置。
SOFABOOT 示例
import static com.alipay.sofa.sofamq.api.MessageConsumer.SQL_FILTER; import com.alipay.sofa.sofamq.api.MessageConsumer; import com.alipay.sofa.sofamq.api.Messaging; // 請使用 xml 或注解將該類配置為 Bean,只有 @Messaging 掃描不到 @Messaging public class SomeClass { @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "(TAGS in ('tag')) and a > 5", filterType = SQL_FILTER) public void someMethodReceivePojo(OrderPojo somePojo) { // do something } }
非 SOFABOOT 示例
consumer.subscribe(MqConfig.TOPIC, MessageSelector.bySql("(TAGS in ('tag')) and a > 5"), new MessageListenerImpl());
錯誤示例
同一個消費(fèi)者多次訂閱某個 Topic 下的 Tag,以最后一次訂閱的 Tag 為準(zhǔn):
//如下錯誤代碼中,Consumer 只能訂閱到 MQ_TOPIC 下 TagB 的消息,而不能訂閱 TagA 的消息。
consumer.subscribe("MQ_TOPIC", "TagA", new GenericMessageListener<OrderPojo>() {
@Override
public Class<OrderPojo> payloadClass() {
return OrderPojo.class;
}
@Override
public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
// 如果想測試消息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater
return Action.CommitMessage;
}
});
consumer.subscribe("MQ_TOPIC", "TagB", new GenericMessageListener<OrderPojo>() {
@Override
public Class<OrderPojo> payloadClass() {
return OrderPojo.class;
}
@Override
public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
// 如果想測試消息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater
return Action.CommitMessage;
}
});
更多信息
同一個 Group ID 下的消費(fèi)者實(shí)例與 Topic 的訂閱關(guān)系需保持一致,詳情請參見 訂閱關(guān)系一致。
合理使用 Topic 和 Tag 來過濾消息可以讓業(yè)務(wù)更清晰,詳情請參見 Topic 與 Tag。