日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

消息過濾

本文描述 SOFAStack 消息隊(duì)列的消費(fèi)者如何根據(jù) Tag 在消息隊(duì)列服務(wù)端完成消息過濾,以確保消費(fèi)者最終只消費(fèi)到其關(guān)注的消息類型。

Tag,即消息標(biāo)簽,用于對某個 Topic 下的消息進(jìn)行分類。消息隊(duì)列的生產(chǎn)者在發(fā)送消息時,已經(jīng)指定消息的 Tag,消費(fèi)者需根據(jù)已經(jīng)指定的 Tag 來進(jìn)行訂閱。

示例代碼

發(fā)送消息

發(fā)送消息時,每條消息必須指明 Tag:

producer.messageBuilder().withTopic("TP_XXX").withTags("TAGA").withValue(orderPojo).build()

訂閱所有 Tag

消費(fèi)者如需訂閱某 Topic 下所有類型的消息,Tag 用符號 * 表示:

  • SOFABOOT 示例

    import com.alipay.sofa.sofamq.api.MessageConsumer;
    import com.alipay.sofa.sofamq.api.Messaging;
    
    // 請使用 xml 或注解將該類配置為 Bean,只有 @Messaging 掃描不到
    @Messaging
    public class SomeClass {
        @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "*")
        public void someMethodReceivePojo(OrderPojo somePojo) {
            // do something
        }
    }
  • 非 SOFABOOT 示例

        consumer.subscribe("TP_XXX", "*", new GenericMessageListener<OrderPojo>() {
                @Override
                public Class<OrderPojo> payloadClass() {
                    return OrderPojo.class;
                }
    
                @Override
                public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
                    System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
                    // 如果想測試消息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater
                    return Action.CommitMessage;
                }
        });

訂閱單個 Tag

消費(fèi)者如需訂閱某 Topic 下某一種類型的消息,請明確標(biāo)明 Tag:

  • SOFABOOT 示例

    import com.alipay.sofa.sofamq.api.MessageConsumer;
    import com.alipay.sofa.sofamq.api.Messaging;
    
    // 請使用 xml 或注解將該類配置為 Bean,只有 @Messaging 掃描不到
    @Messaging
    public class SomeClass {
        @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "TAGA")
        public void someMethodReceivePojo(OrderPojo somePojo) {
            // do something
        }
    }
  • 非 SOFABOOT 示例

    consumer.subscribe("TP_XXX", "TAGA", new GenericMessageListener<OrderPojo>() {
        @Override
        public Class<OrderPojo> payloadClass() {
            return OrderPojo.class;
        }
    
        @Override
        public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
            System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
            // 如果想測試消息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater
            return Action.CommitMessage;
        }
    });

訂閱多個 Tag

消費(fèi)者如需訂閱某 Topic 下多種類型的消息,請?jiān)诙鄠€ Tag 之間用 || 分隔:

consumer.subscribe("MQ_TOPIC", "TagA||TagB", new GenericMessageListener<OrderPojo>() {
    @Override
    public Class<OrderPojo> payloadClass() {
        return OrderPojo.class;
    }

    @Override
    public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
        System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
        // 如果想測試消息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater
        return Action.CommitMessage;
    }
});

SQL92 過濾

過濾表達(dá)式中的 TAGS 特指消息的 TAG,其他的變量取自消息屬性 Properties,可通過 Message#putUserProperties 設(shè)置。

  • SOFABOOT 示例

    import static com.alipay.sofa.sofamq.api.MessageConsumer.SQL_FILTER;
    
    import com.alipay.sofa.sofamq.api.MessageConsumer;
    import com.alipay.sofa.sofamq.api.Messaging;
    
    // 請使用 xml 或注解將該類配置為 Bean,只有 @Messaging 掃描不到
    @Messaging
    public class SomeClass {
        @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "(TAGS in ('tag')) and a > 5", filterType = SQL_FILTER)
        public void someMethodReceivePojo(OrderPojo somePojo) {
            // do something
        }
    }
  • 非 SOFABOOT 示例

    consumer.subscribe(MqConfig.TOPIC, MessageSelector.bySql("(TAGS in ('tag')) and a > 5"), new MessageListenerImpl());

錯誤示例

同一個消費(fèi)者多次訂閱某個 Topic 下的 Tag,以最后一次訂閱的 Tag 為準(zhǔn):

//如下錯誤代碼中,Consumer 只能訂閱到 MQ_TOPIC 下 TagB 的消息,而不能訂閱 TagA 的消息。
consumer.subscribe("MQ_TOPIC", "TagA", new GenericMessageListener<OrderPojo>() {
    @Override
    public Class<OrderPojo> payloadClass() {
        return OrderPojo.class;
    }

    @Override
    public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
        System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
        // 如果想測試消息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater
        return Action.CommitMessage;
    }
});

consumer.subscribe("MQ_TOPIC", "TagB", new GenericMessageListener<OrderPojo>() {
    @Override
    public Class<OrderPojo> payloadClass() {
        return OrderPojo.class;
    }

    @Override
    public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
        System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
        // 如果想測試消息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater
        return Action.CommitMessage;
    }
});

更多信息

  • 同一個 Group ID 下的消費(fèi)者實(shí)例與 Topic 的訂閱關(guān)系需保持一致,詳情請參見 訂閱關(guān)系一致

  • 合理使用 Topic 和 Tag 來過濾消息可以讓業(yè)務(wù)更清晰,詳情請參見 Topic 與 Tag