消費者訂閱了某個Topic后,云消息隊列 RocketMQ 版會將該Topic中的所有消息投遞給消費端進行消費。若消費者只需要關注部分消息,可通過設置過濾條件在云消息隊列 RocketMQ 版服務端完成消息過濾,只消費需要關注的消息。本文介紹消息過濾的功能描述、應用場景、使用限制、配置方式及示例代碼。
功能描述
消息過濾功能指消息生產者向Topic中發送消息時,設置消息屬性對消息進行分類,消費者訂閱Topic時,根據消息屬性設置過濾條件對消息進行過濾,只有符合過濾條件的消息才會被投遞到消費端進行消費。
消費者訂閱Topic時若未設置過濾條件,無論消息發送時是否有設置過濾屬性,Topic中的所有消息都將被投遞到消費端進行消費。
云消息隊列 RocketMQ 版支持的消息過濾方式如下:
過濾方式 | 說明 | 場景 | 實例限制 | 協議限制 |
Tag過濾(默認過濾方式) |
訂閱的Tag和發送者設置的消息Tag一致,則消息被投遞給消費端進行消費。 | 簡單過濾場景。 一條消息支持設置一個Tag,僅需要對Topic中的消息進行一級分類并過濾時使用此方式。 | 無。 | 無。 |
SQL屬性過濾 |
滿足過濾表達式的消息被投遞給消費端進行消費。 | 復雜過濾場景。 一條消息支持設置多個自定義屬性,可根據SQL語法自定義組合多種類型的表達式對消息進行多級分類并實現多維度的過濾。 | 僅企業鉑金版實例支持該功能。 | 僅商業版TCP協議的SDK支持該功能。 |
Tag過濾
Tag,即消息標簽,用于對某個Topic下的消息進行分類。云消息隊列 RocketMQ 版的生產者在發送消息時,指定消息的Tag,消費者需根據已經指定的Tag來進行訂閱。
場景示例
以下圖電商交易場景為例,從客戶下單到收到商品這一過程會生產一系列消息,以如下消息為例:
訂單消息
支付消息
物流消息
這些消息會發送到名稱為Trade_Topic的Topic中,被各個不同的系統所訂閱,以如下系統為例:
支付系統:只需訂閱支付消息。
物流系統:只需訂閱物流消息。
交易成功率分析系統:需訂閱訂單和支付消息。
實時計算系統:需要訂閱所有和交易相關的消息。
過濾示意圖如下所示。
配置方式
云消息隊列 RocketMQ 版支持通過SDK配置Tag過濾功能,分別在消息發送和訂閱代碼中設置消息Tag和訂閱消息Tag。SDK詳細信息,請參見SDK參考概述。消息發送端和消費端的代碼配置方法如下:
發送消息
發送消息時,每條消息必須指明Tag。
Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());
訂閱所有Tag
消費者如需訂閱某Topic下所有類型的消息,Tag用星號(*)表示。
consumer.subscribe("MQ_TOPIC", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
訂閱單個Tag
消費者如需訂閱某Topic下某一種類型的消息,請明確標明Tag。
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
訂閱多個Tag
消費者如需訂閱某Topic下多種類型的消息,請在多個Tag之間用兩個豎線(||)分隔。
consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
錯誤示例
同一個消費者多次訂閱某個Topic下的Tag,以最后一次訂閱的Tag為準。
//如下錯誤代碼中,Consumer只能訂閱到MQ_TOPIC下TagB的消息,而不能訂閱TagA的消息。 consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } }); consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
SQL屬性過濾
SQL屬性過濾是在消息發送時設置消息的自定義屬性,消費者訂閱時使用SQL語法設置過濾表達式,根據自定義屬性過濾消息,云消息隊列 RocketMQ 版根據表達式的邏輯進行計算,將符合條件的消息投遞到消費端。
Tag屬于一種特殊的消息屬性,所以SQL過濾方式也兼容Tag過濾方法,支持通過Tag屬性過濾消息。在SQL語法中,Tag的屬性值為TAGS。
使用限制
使用SQL屬性過濾消息時,有以下限制:
只有企業鉑金版實例支持SQL屬性過濾,標準版實例不支持該功能。
只有TCP協議的客戶端支持SQL屬性過濾,HTTP協議的客戶端不支持該功能。
若服務端不支持SQL過濾時,客戶端使用SQL過濾消息,則客戶端啟動會報錯或收不到消息。
場景示例
以下圖電商交易場景為例,從客戶下單到收到商品這一過程會生產一系列消息,按照類型將消息分為訂單消息和物流消息,其中給物流消息定義地域屬性,按照地域分為杭州和上海:
訂單消息
物流消息
物流消息且地域為杭州
物流消息且地域為上海
這些消息會發送到名稱為Trade_Topic的Topic中,被各個不同的系統所訂閱,以如下系統為例:
物流系統1:只需訂閱物流消息且消息地域為杭州。
物流系統2:只需訂閱物流消息且消息地域為杭州或上海。
訂單跟蹤系統:只需訂閱訂單消息。
實時計算系統:需要訂閱所有和交易相關的消息。
過濾示意圖如下所示。
配置方式
云消息隊列 RocketMQ 版支持通過SDK配置SQL屬性過濾。發送端需要在發送消息的代碼中設置消息的自定義屬性;消費端需要在訂閱消息代碼中設置SQL語法的過濾表達式。SDK詳細信息,請參見SDK參考概述。消息發送端和消費端的代碼配置方法如下:
消息發送端:
設置消息的自定義屬性。
Message msg = new Message("topic", "tagA", "Hello MQ".getBytes()); // 設置自定義屬性A,屬性值為1。 msg.putUserProperties("A", "1");
消息消費端
使用SQL語法設置過濾表達式,并根據自定義屬性過濾消息。
重要使用屬性時,需要先判斷屬性是否存在。若屬性不存在則過濾表達式的計算結果為NULL,消息不會被投遞到消費端。
// 訂閱自定義屬性A存在且屬性值為1的消息。 consumer.subscribe("topic", MessageSelector.bySql("A IS NOT NULL AND TAGS IS NOT NULL AND A = '1'"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });
SQL語法如下:
語法 | 說明 | 示例 |
IS NULL | 判斷屬性不存在。 |
|
IS NOT NULL | 判斷屬性存在。 |
|
| 用于比較數字,不能用于比較字符串,否則Consumer客戶端啟動會報錯。 說明 可轉化為數字的字符串也被認為是數字。 |
|
BETWEEN xxx AND xxx | 用于比較數字,不能用于比較字符串,否則Consumer客戶端啟動會報錯。等價于>= xxx AND <= xxx。表示屬性值在兩個數字之間。 |
|
NOT BETWEEN xxx AND xxx | 用于比較數字,不能用于比較字符串,否則Consumer客戶端啟動會報錯。等價于< xxx OR > xxx,表示屬性值在兩個值的區間之外。 |
|
IN (xxx, xxx) | 表示屬性的值在某個集合內。集合的元素只能是字符串。 |
|
| 等于和不等于。可用于比較數字和字符串。 |
|
| 邏輯與和邏輯或。可用于組合任意簡單的邏輯判斷,需要將每個邏輯判斷內容放入括號內。 |
|
由于SQL屬性過濾是發送端定義消息屬性,消費端設置SQL過濾條件,因此過濾條件的計算結果具有不確定性,服務端的處理方式為:
如果過濾條件的表達式計算拋異常,消息默認被過濾,不會被投遞給消費端。例如比較數字和非數字類型的值。
如果過濾條件的表達式計算值為null或不是布爾類型(true和false),則消息默認被過濾,不會被投遞給消費端。例如發送消息時未定義某個屬性,在訂閱時過濾條件中直接使用該屬性,則過濾條件的表達式計算結果為null。
如果消息自定義屬性為浮點型,但過濾條件中使用整數進行判斷,則消息默認被過濾,不會被投遞給消費端。
示例代碼
發送消息
同時設置消息Tag和自定義屬性。
Producer producer = ONSFactory.createProducer(properties); // 設置Tag的值為tagA。 Message msg = new Message("topicA", "tagA", "Hello MQ".getBytes()); // 設置自定義屬性region為hangzhou。 msg.putUserProperties("region", "hangzhou"); // 設置自定義屬性price為50。 msg.putUserProperties("price", "50"); SendResult sendResult = producer.send(msg);
根據單個自定義屬性訂閱消息。
Consumer consumer = ONSFactory.createConsumer(properties); // 只訂閱屬性region為hangzhou的消息,若消息中未定義屬性region或屬性值不是hangzhou,則消息不會被投遞到消費端。 consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND region = 'hangzhou'"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });
預期結果:示例中發送的消息屬性符合訂閱的過濾條件,消息被投遞到消費端。
同時根據Tag和自定義屬性訂閱消息。
Consumer consumer = ONSFactory.createConsumer(properties); // 只訂閱Tag的值為tagA且屬性price大于30的消息。 consumer.subscribe("topicA", MessageSelector.bySql("TAGS IS NOT NULL AND price IS NOT NULL AND TAGS = 'tagA' AND price > 30 "), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });
預期結果:示例中發送的消息Tag和自定義屬性符合訂閱的過濾條件,消息被投遞到消費端。
同時根據多個自定義屬性訂閱消息。
Consumer consumer = ONSFactory.createConsumer(properties); // 只訂閱region為hangzhou且屬性price小于20的消息。 consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND price IS NOT NULL AND region = 'hangzhou' AND price < 20"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });
預期結果:消息不會被投遞到消費端。訂閱的過濾條件中price小于20,發送的消息中price屬性值為50,不符合訂閱過濾條件。
訂閱Topic中的所有消息,不進行過濾。
Consumer consumer = ONSFactory.createConsumer(properties); // 若需要訂閱Topic中的所有消息,需要將SQL表達式的值設置為“TRUE”。 consumer.subscribe("topicA", MessageSelector.bySql("TRUE"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });
預期結果:Topic中的所有消息都將被投遞到消費端進行消費。
錯誤示例
消息發送時未自定義某屬性,消費端在訂閱時未判斷該屬性是否存在直接使用,則消息不會被投遞給消費端。
Consumer consumer = ONSFactory.createConsumer(properties); // 屬性product在發送消息時未定義,過濾失敗,消息不會被投遞至消費端。 consumer.subscribe("topicA", MessageSelector.bySql("product = 'MQ'"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });
更多信息
同一個Group ID下的消費者實例與Topic的訂閱關系需保持一致,更多信息,請參見訂閱關系一致。
合理使用Topic和Tag來過濾消息可以讓業務更清晰,更多信息,請參見Topic與Tag最佳實踐。