日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

消費者分類

云消息隊列 RocketMQ 版支持PushConsumer和SimpleConsumer這兩種類型的消費者,本文分別從使用方式、實現(xiàn)原理、可靠性重試和適用場景等方面為您介紹這兩種類型的消費者。

背景信息

云消息隊列 RocketMQ 版面向不同的業(yè)務(wù)場景提供了不同消費者類型,每種消費者類型的集成方式和控制方式都不一樣。了解如下問題,可以幫助您選擇更匹配業(yè)務(wù)場景的消費者類型。

  • 如何實現(xiàn)并發(fā)消費:消費者如何使用并發(fā)的多線程機制處理消息,以此提高消息處理效率?

  • 如何實現(xiàn)同步、異步消息處理:對于不同的集成場景,消費者獲取消息后可能會將消息異步分發(fā)到業(yè)務(wù)邏輯中處理,此時,消息異步化處理如何實現(xiàn)?

  • 如何實現(xiàn)消息可靠處理:消費者處理消息時如何返回響應(yīng)結(jié)果?如何在消息異常情況進行重試,保證消息的可靠處理?

以上問題的具體答案,請參考下文的PushConsumerSimpleConsumer

功能概述

消息消費流程

如上圖所示,云消息隊列 RocketMQ 版的消費者處理消息時主要經(jīng)過以下階段:消息獲取—>消息處理—>消費狀態(tài)提交。

針對以上幾個階段,云消息隊列 RocketMQ 版提供了不同的消費者類型:PushConsumer和SimpleConsumer。這兩種類型的消費者通過不同的實現(xiàn)方式和接口可滿足您在不同業(yè)務(wù)場景下的消費需求。具體差異如下:

說明

若您的業(yè)務(wù)場景發(fā)生變更,或您當前使用的消費者類型不適合當前業(yè)務(wù),您可以選擇變更消費者類型。變更消費者類型不影響當前云消息隊列 RocketMQ 版資源的使用和業(yè)務(wù)處理。

對比項

PushConsumer

SimpleConsumer

接口方式

使用監(jiān)聽器回調(diào)接口返回消費結(jié)果,消費者僅允許在監(jiān)聽器范圍內(nèi)處理消費邏輯。

業(yè)務(wù)方自行實現(xiàn)消息處理,并主動調(diào)用接口返回消費結(jié)果。

消費并發(fā)度管理

由SDK管理消費并發(fā)度。

由業(yè)務(wù)方消費邏輯自行管理消費線程。

接口靈活度

高度封裝,不夠靈活。

原子接口,可靈活自定義。

適用場景

適用于無自定義流程的開發(fā)場景。

適用于需要高度自定義業(yè)務(wù)流程的開發(fā)場景。

PushConsumer

PushConsumers是一種高度封裝的消費者類型,消費消息僅通過消費監(jiān)聽器處理業(yè)務(wù)并返回消費結(jié)果。消息的獲取、消費狀態(tài)提交以及消費重試都通過云消息隊列 RocketMQ 版的客戶端SDK完成。

使用方式

PushConsumer的使用方式比較固定,在消費者初始化時注冊一個消費監(jiān)聽器,并在消費監(jiān)聽器內(nèi)部實現(xiàn)消息處理邏輯。由云消息隊列 RocketMQ 版的SDK在后臺完成消息獲取、觸發(fā)監(jiān)聽器調(diào)用以及進行消息重試處理。

示例代碼如下:

//消費示例:使用PushConsumer消費普通消息。
ClientServiceProvider provider = ClientServiceProvider.loadService();
        String topic = "Your Topic";
        FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                //設(shè)置消費者分組。
                .setConsumerGroup("Your ConsumerGroup")
                //設(shè)置接入點。
                .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
                //設(shè)置預(yù)綁定的訂閱關(guān)系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                //設(shè)置消費監(jiān)聽器。
                .setMessageListener(new MessageListener() {
                    @Override
                    public ConsumeResult consume(MessageView messageView) {
                        //消費消息并返回處理結(jié)果。
                        return ConsumeResult.SUCCESS;
                    }
                })
                .build();
                

PushConsumer的消費監(jiān)聽器執(zhí)行結(jié)果分為以下三種情況:

  • 返回消費成功:以Java SDK為例,返回ConsumeResult.SUCCESS,表示該消息處理成功,服務(wù)端按照消費結(jié)果更新消費進度。

  • 返回消費失敗:以Java SDK為例,返回ConsumeResult.FAILURE,表示該消息處理失敗,需要根據(jù)消費重試邏輯判斷是否進行重試消費。

  • 出現(xiàn)非預(yù)期失敗:例如拋異常等行為,該結(jié)果按照消費失敗處理,需要根據(jù)消費重試邏輯判斷是否進行重試消費。

PushConsumer消費消息時,若消息處理邏輯出現(xiàn)預(yù)期之外的阻塞導(dǎo)致消息處理一直無法執(zhí)行成功,SDK會按照消費超時處理強制提交消費失敗結(jié)果,并按照消費重試邏輯進行處理。消息超時,請參見PushConsumer消費重試策略

說明

出現(xiàn)消費超時情況時,SDK雖然提交消費失敗結(jié)果,但是當前消費線程可能仍然無法響應(yīng)中斷,還會繼續(xù)處理消息。

內(nèi)部原理

在PushConsumer類型中,消息的實時處理能力是基于SDK內(nèi)部的典型Reactor線程模型實現(xiàn)的。如下圖所示,SDK內(nèi)置了一個長輪詢線程,先將消息異步拉取到SDK內(nèi)置的緩存隊列中,再分別提交到消費線程中,觸發(fā)監(jiān)聽器執(zhí)行本地消費邏輯。

PushConsumer原理

可靠性重試

PushConsumer消費者類型中,客戶端SDK和消費邏輯的唯一邊界是消費監(jiān)聽器接口。客戶端SDK嚴格按照監(jiān)聽器的返回結(jié)果判斷消息是否消費成功,并做可靠性重試。所有消息必須以同步方式進行消費處理,并在監(jiān)聽器接口結(jié)束時返回調(diào)用結(jié)果,不允許再做異步化分發(fā)。消息重試具體信息,請參見PushConsumer消費重試策略

使用PushConsumer消費者消費時,不允許使用以下方式處理消息,否則云消息隊列 RocketMQ 版無法保證消息的可靠性。

  • 錯誤方式一:消息還未處理完成,就提前返回消費成功結(jié)果。此時如果消息消費失敗,云消息隊列 RocketMQ 版服務(wù)端是無法感知的,因此不會進行消費重試。

  • 錯誤方式二:在消費監(jiān)聽器內(nèi)將消息再次分發(fā)到自定義的其他線程,消費監(jiān)聽器提前返回消費結(jié)果。此時如果消息消費失敗,云消息隊列 RocketMQ 版服務(wù)端同樣無法感知,因此也不會進行消費重試。

順序性保障

基于云消息隊列 RocketMQ 版順序消息的定義,如果消費者分組設(shè)置了順序消費模式,則PushConsumer在觸發(fā)消費監(jiān)聽器時,嚴格遵循消息的先后順序。業(yè)務(wù)處理邏輯無感知即可保證消息的消費順序。

說明

消息消費按照順序處理的前提是遵循同步提交原則,如果業(yè)務(wù)邏輯自定義實現(xiàn)了異步分發(fā),則云消息隊列 RocketMQ 版無法保證消息的順序性。

適用場景

PushConsumer嚴格限制了消息同步處理及每條消息的處理超時時間,適用于以下場景:

  • 消息處理時間可預(yù)估:如果不確定消息處理耗時,經(jīng)常有預(yù)期之外的長時間耗時的消息,PushConsumer的可靠性保證會頻繁觸發(fā)消息重試機制造成大量重復(fù)消息。

  • 無異步化、高級定制場景:PushConsumer限制了消費邏輯的線程模型,由客戶端SDK內(nèi)部按最大吞吐量觸發(fā)消息處理。該模型開發(fā)邏輯簡單,但是不允許使用異步化和自定義處理流程。

SimpleConsumer

SimpleConsumer是一種接口原子型的消費者類型,消息的獲取、消費狀態(tài)提交以及消費重試都是通過消費者業(yè)務(wù)邏輯主動發(fā)起調(diào)用完成。

使用方式

SimpleConsumer的使用涉及多個接口調(diào)用,由業(yè)務(wù)邏輯按需調(diào)用接口獲取消息,然后分發(fā)給業(yè)務(wù)線程處理消息,最后按照處理的結(jié)果調(diào)用提交接口,返回服務(wù)端當前消息的處理結(jié)果。示例如下:

 //消費示例:使用SimpleConsumer消費普通消息,主動獲取消息處理并提交。
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        String topic = "Your Topic";
        FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);

        SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
                //設(shè)置消費者分組。
                .setConsumerGroup("Your ConsumerGroup")
                //設(shè)置接入點。
                .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
                //設(shè)置預(yù)綁定的訂閱關(guān)系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        List<MessageView> messageViewList = null;
        try {
            //SimpleConsumer需要主動獲取消息,并處理。
            messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
            messageViewList.forEach(messageView -> {
                System.out.println(messageView);
                //消費處理完成后,需要主動調(diào)用ACK提交消費結(jié)果。
                try {
                    simpleConsumer.ack(messageView);
                } catch (ClientException e) {
                    e.printStackTrace();
                }
            });
        } catch (ClientException e) {
            //如果遇到系統(tǒng)流控等原因造成拉取失敗,需要重新發(fā)起獲取消息請求。
            e.printStackTrace();
        }

SimpleConsumer主要涉及以下幾個接口行為:

接口名稱

主要作用

可修改參數(shù)

ReceiveMessage

消費者主動調(diào)用該接口從服務(wù)端獲取消息。

說明

由于服務(wù)端存儲為分布式,可能會出現(xiàn)服務(wù)端實際有消息,但是返回為空的現(xiàn)象。

一般可通過重新發(fā)起ReceiveMessage調(diào)用或提高ReceiveMessage的并發(fā)度解決。

  • 批量拉取消息數(shù):SimpleConsumer可以一次性批量獲取多條消息實現(xiàn)批量消費,該接口可修改批量獲取的消息數(shù)量。

  • 消費不可見時間:消息的最長處理耗時,該參數(shù)用于控制消費失敗時的消息重試間隔。具體信息,請參見SimpleConsumer消費重試策略。消費者調(diào)用ReceiveMessage接口時需要指定消費不可見時間。

AckMessage

消費者成功消費消息后,主動調(diào)用該接口向服務(wù)端返回消費成功響應(yīng)。

ChangeInvisibleDuration

消費重試場景下,消費者可通過該接口修改消息處理時長,即控制消息的重試間隔。

消費不可見時間:調(diào)用本接口可修改ReceiveMessage接口預(yù)設(shè)的消費不可見時間的參數(shù)值。一般用于需要延長消息處理時長的場景。

可靠性重試

SimpleConsumer消費者類型中,客戶端SDK和服務(wù)端通過ReceiveMessageAckMessage接口通信。客戶端SDK如果處理消息成功則調(diào)用AckMessage接口;如果處理失敗只需要不回復(fù)ACK響應(yīng),即可在定義的消費不可見時間到達后觸發(fā)消費重試流程。更多信息,請參見SimpleConsumer消費重試策略

順序性保障

基于云消息隊列 RocketMQ 版順序消息的定義,SimpleConsumer在處理順序消息時,會按照消息存儲的先后順序獲取消息。即需要保持順序的一組消息中,如果前面的消息未處理完成,則無法獲取到后面的消息。

適用場景

SimpleConsumer提供原子接口,用于消息獲取和提交消費結(jié)果,相對于PushConsumer方式更加靈活。SimpleConsumer適用于以下場景:

  • 消息處理時長不可控:如果消息處理時長無法預(yù)估,經(jīng)常有長時間耗時的消息處理情況。建議使用SimpleConsumer消費類型,可以在消費時自定義消息的預(yù)估處理時長,若實際業(yè)務(wù)中預(yù)估的消息處理時長不符合預(yù)期,也可以通過接口提前修改。

  • 需要異步化、批量消費等高級定制場景:SimpleConsumer在SDK內(nèi)部沒有復(fù)雜的線程封裝,完全由業(yè)務(wù)邏輯自由定制,可以實現(xiàn)異步分發(fā)、批量消費等高級定制場景。

  • 需要自定義消費速率:SimpleConsumer是由業(yè)務(wù)邏輯主動調(diào)用接口獲取消息,因此可以自由調(diào)整獲取消息的頻率,自定義控制消費速率。

使用建議

PushConsumer合理控制消費耗時,避免無限阻塞

對于PushConsumer消費類型,需要嚴格控制消息的消費耗時,盡量避免出現(xiàn)消息處理超時導(dǎo)致消息重復(fù)。如果業(yè)務(wù)經(jīng)常會出現(xiàn)一些預(yù)期外的長時間耗時的消息,建議使用SimpleConsumer,并設(shè)置好消費不可見時間。