消費者分類
云消息隊列 RocketMQ 版支持PushConsumer和SimpleConsumer這兩種類型的消費者,本文分別從使用方式、實現(xiàn)原理、可靠性重試和適用場景等方面為您介紹這兩種類型的消費者。
背景信息
云消息隊列 RocketMQ 版面向不同的業(yè)務(wù)場景提供了不同消費者類型,每種消費者類型的集成方式和控制方式都不一樣。了解如下問題,可以幫助您選擇更匹配業(yè)務(wù)場景的消費者類型。
如何實現(xiàn)并發(fā)消費:消費者如何使用并發(fā)的多線程機制處理消息,以此提高消息處理效率?
如何實現(xiàn)同步、異步消息處理:對于不同的集成場景,消費者獲取消息后可能會將消息異步分發(fā)到業(yè)務(wù)邏輯中處理,此時,消息異步化處理如何實現(xiàn)?
如何實現(xiàn)消息可靠處理:消費者處理消息時如何返回響應(yīng)結(jié)果?如何在消息異常情況進行重試,保證消息的可靠處理?
以上問題的具體答案,請參考下文的PushConsumer和SimpleConsumer。
功能概述
如上圖所示,云消息隊列 RocketMQ 版的消費者處理消息時主要經(jīng)過以下階段:消息獲取—>消息處理—>消費狀態(tài)提交。
針對以上幾個階段,云消息隊列 RocketMQ 版提供了不同的消費者類型:PushConsumer和SimpleConsumer。這兩種類型的消費者通過不同的實現(xiàn)方式和接口可滿足您在不同業(yè)務(wù)場景下的消費需求。具體差異如下:
若您的業(yè)務(wù)場景發(fā)生變更,或您當前使用的消費者類型不適合當前業(yè)務(wù),您可以選擇變更消費者類型。變更消費者類型不影響當前云消息隊列 RocketMQ 版資源的使用和業(yè)務(wù)處理。
對比項 | PushConsumer | SimpleConsumer |
接口方式 | 使用監(jiān)聽器回調(diào)接口返回消費結(jié)果,消費者僅允許在監(jiān)聽器范圍內(nèi)處理消費邏輯。 | 業(yè)務(wù)方自行實現(xiàn)消息處理,并主動調(diào)用接口返回消費結(jié)果。 |
消費并發(fā)度管理 | 由SDK管理消費并發(fā)度。 | 由業(yè)務(wù)方消費邏輯自行管理消費線程。 |
接口靈活度 | 高度封裝,不夠靈活。 | 原子接口,可靈活自定義。 |
適用場景 | 適用于無自定義流程的開發(fā)場景。 | 適用于需要高度自定義業(yè)務(wù)流程的開發(fā)場景。 |
PushConsumer
PushConsumers是一種高度封裝的消費者類型,消費消息僅通過消費監(jiān)聽器處理業(yè)務(wù)并返回消費結(jié)果。消息的獲取、消費狀態(tài)提交以及消費重試都通過云消息隊列 RocketMQ 版的客戶端SDK完成。
使用方式
PushConsumer的使用方式比較固定,在消費者初始化時注冊一個消費監(jiān)聽器,并在消費監(jiān)聽器內(nèi)部實現(xiàn)消息處理邏輯。由云消息隊列 RocketMQ 版的SDK在后臺完成消息獲取、觸發(fā)監(jiān)聽器調(diào)用以及進行消息重試處理。
示例代碼如下:
//消費示例:使用PushConsumer消費普通消息。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "Your Topic";
FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
//設(shè)置消費者分組。
.setConsumerGroup("Your ConsumerGroup")
//設(shè)置接入點。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
//設(shè)置預(yù)綁定的訂閱關(guān)系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
//設(shè)置消費監(jiān)聽器。
.setMessageListener(new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
//消費消息并返回處理結(jié)果。
return ConsumeResult.SUCCESS;
}
})
.build();
PushConsumer的消費監(jiān)聽器執(zhí)行結(jié)果分為以下三種情況:
返回消費成功:以Java SDK為例,返回
ConsumeResult.SUCCESS
,表示該消息處理成功,服務(wù)端按照消費結(jié)果更新消費進度。返回消費失敗:以Java SDK為例,返回
ConsumeResult.FAILURE
,表示該消息處理失敗,需要根據(jù)消費重試邏輯判斷是否進行重試消費。出現(xiàn)非預(yù)期失敗:例如拋異常等行為,該結(jié)果按照消費失敗處理,需要根據(jù)消費重試邏輯判斷是否進行重試消費。
PushConsumer消費消息時,若消息處理邏輯出現(xiàn)預(yù)期之外的阻塞導(dǎo)致消息處理一直無法執(zhí)行成功,SDK會按照消費超時處理強制提交消費失敗結(jié)果,并按照消費重試邏輯進行處理。消息超時,請參見PushConsumer消費重試策略。
出現(xiàn)消費超時情況時,SDK雖然提交消費失敗結(jié)果,但是當前消費線程可能仍然無法響應(yīng)中斷,還會繼續(xù)處理消息。
內(nèi)部原理
在PushConsumer類型中,消息的實時處理能力是基于SDK內(nèi)部的典型Reactor線程模型實現(xiàn)的。如下圖所示,SDK內(nèi)置了一個長輪詢線程,先將消息異步拉取到SDK內(nèi)置的緩存隊列中,再分別提交到消費線程中,觸發(fā)監(jiān)聽器執(zhí)行本地消費邏輯。
可靠性重試
PushConsumer消費者類型中,客戶端SDK和消費邏輯的唯一邊界是消費監(jiān)聽器接口。客戶端SDK嚴格按照監(jiān)聽器的返回結(jié)果判斷消息是否消費成功,并做可靠性重試。所有消息必須以同步方式進行消費處理,并在監(jiān)聽器接口結(jié)束時返回調(diào)用結(jié)果,不允許再做異步化分發(fā)。消息重試具體信息,請參見PushConsumer消費重試策略。
使用PushConsumer消費者消費時,不允許使用以下方式處理消息,否則云消息隊列 RocketMQ 版無法保證消息的可靠性。
錯誤方式一:消息還未處理完成,就提前返回消費成功結(jié)果。此時如果消息消費失敗,云消息隊列 RocketMQ 版服務(wù)端是無法感知的,因此不會進行消費重試。
錯誤方式二:在消費監(jiān)聽器內(nèi)將消息再次分發(fā)到自定義的其他線程,消費監(jiān)聽器提前返回消費結(jié)果。此時如果消息消費失敗,云消息隊列 RocketMQ 版服務(wù)端同樣無法感知,因此也不會進行消費重試。
順序性保障
基于云消息隊列 RocketMQ 版順序消息的定義,如果消費者分組設(shè)置了順序消費模式,則PushConsumer在觸發(fā)消費監(jiān)聽器時,嚴格遵循消息的先后順序。業(yè)務(wù)處理邏輯無感知即可保證消息的消費順序。
消息消費按照順序處理的前提是遵循同步提交原則,如果業(yè)務(wù)邏輯自定義實現(xiàn)了異步分發(fā),則云消息隊列 RocketMQ 版無法保證消息的順序性。
適用場景
PushConsumer嚴格限制了消息同步處理及每條消息的處理超時時間,適用于以下場景:
消息處理時間可預(yù)估:如果不確定消息處理耗時,經(jīng)常有預(yù)期之外的長時間耗時的消息,PushConsumer的可靠性保證會頻繁觸發(fā)消息重試機制造成大量重復(fù)消息。
無異步化、高級定制場景:PushConsumer限制了消費邏輯的線程模型,由客戶端SDK內(nèi)部按最大吞吐量觸發(fā)消息處理。該模型開發(fā)邏輯簡單,但是不允許使用異步化和自定義處理流程。
SimpleConsumer
SimpleConsumer是一種接口原子型的消費者類型,消息的獲取、消費狀態(tài)提交以及消費重試都是通過消費者業(yè)務(wù)邏輯主動發(fā)起調(diào)用完成。
使用方式
SimpleConsumer的使用涉及多個接口調(diào)用,由業(yè)務(wù)邏輯按需調(diào)用接口獲取消息,然后分發(fā)給業(yè)務(wù)線程處理消息,最后按照處理的結(jié)果調(diào)用提交接口,返回服務(wù)端當前消息的處理結(jié)果。示例如下:
//消費示例:使用SimpleConsumer消費普通消息,主動獲取消息處理并提交。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "Your Topic";
FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
//設(shè)置消費者分組。
.setConsumerGroup("Your ConsumerGroup")
//設(shè)置接入點。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
//設(shè)置預(yù)綁定的訂閱關(guān)系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
List<MessageView> messageViewList = null;
try {
//SimpleConsumer需要主動獲取消息,并處理。
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消費處理完成后,需要主動調(diào)用ACK提交消費結(jié)果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系統(tǒng)流控等原因造成拉取失敗,需要重新發(fā)起獲取消息請求。
e.printStackTrace();
}
SimpleConsumer主要涉及以下幾個接口行為:
接口名稱 | 主要作用 | 可修改參數(shù) |
| 消費者主動調(diào)用該接口從服務(wù)端獲取消息。 說明 由于服務(wù)端存儲為分布式,可能會出現(xiàn)服務(wù)端實際有消息,但是返回為空的現(xiàn)象。 一般可通過重新發(fā)起ReceiveMessage調(diào)用或提高ReceiveMessage的并發(fā)度解決。 |
|
| 消費者成功消費消息后,主動調(diào)用該接口向服務(wù)端返回消費成功響應(yīng)。 | 無 |
| 消費重試場景下,消費者可通過該接口修改消息處理時長,即控制消息的重試間隔。 | 消費不可見時間:調(diào)用本接口可修改 |
可靠性重試
SimpleConsumer消費者類型中,客戶端SDK和服務(wù)端通過ReceiveMessage
和AckMessage
接口通信。客戶端SDK如果處理消息成功則調(diào)用AckMessage
接口;如果處理失敗只需要不回復(fù)ACK響應(yīng),即可在定義的消費不可見時間到達后觸發(fā)消費重試流程。更多信息,請參見SimpleConsumer消費重試策略。
順序性保障
基于云消息隊列 RocketMQ 版順序消息的定義,SimpleConsumer在處理順序消息時,會按照消息存儲的先后順序獲取消息。即需要保持順序的一組消息中,如果前面的消息未處理完成,則無法獲取到后面的消息。
適用場景
SimpleConsumer提供原子接口,用于消息獲取和提交消費結(jié)果,相對于PushConsumer方式更加靈活。SimpleConsumer適用于以下場景:
消息處理時長不可控:如果消息處理時長無法預(yù)估,經(jīng)常有長時間耗時的消息處理情況。建議使用SimpleConsumer消費類型,可以在消費時自定義消息的預(yù)估處理時長,若實際業(yè)務(wù)中預(yù)估的消息處理時長不符合預(yù)期,也可以通過接口提前修改。
需要異步化、批量消費等高級定制場景:SimpleConsumer在SDK內(nèi)部沒有復(fù)雜的線程封裝,完全由業(yè)務(wù)邏輯自由定制,可以實現(xiàn)異步分發(fā)、批量消費等高級定制場景。
需要自定義消費速率:SimpleConsumer是由業(yè)務(wù)邏輯主動調(diào)用接口獲取消息,因此可以自由調(diào)整獲取消息的頻率,自定義控制消費速率。
使用建議
PushConsumer合理控制消費耗時,避免無限阻塞
對于PushConsumer消費類型,需要嚴格控制消息的消費耗時,盡量避免出現(xiàn)消息處理超時導(dǎo)致消息重復(fù)。如果業(yè)務(wù)經(jīng)常會出現(xiàn)一些預(yù)期外的長時間耗時的消息,建議使用SimpleConsumer,并設(shè)置好消費不可見時間。