日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

消息重試

本文介紹 SOFAStack 消息隊列的消息重試機制和配置方式。

順序消息的重試

對于順序消息,當消費者消費消息失敗后,消息隊列會自動不斷地進行消息重試(每次間隔時間為 1 秒),這時,應用會出現消息消費被阻塞的情況。因此,建議您使用順序消息時,務必保證應用能夠及時監控并處理消費失敗的情況,避免阻塞現象的發生。

無序消息的重試

對于無序消息(普通、定時、延時、事務消息),當消費者消費消息失敗時,您可以通過設置返回狀態達到消息重試的結果。

無序消息的重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續消費新的消息。

說明

以下內容都只針對無序消息生效。

重試次數

消息隊列默認允許每條消息最多重試 16 次,每次重試的間隔時間如下:

第幾次重試

與上次重試的間隔時間

第幾次重試

與上次重試的間隔時間

1

10 秒

9

7 分鐘

2

30 秒

10

8 分鐘

3

1 分鐘

11

9 分鐘

4

2 分鐘

12

10 分鐘

5

3 分鐘

13

20 分鐘

6

4 分鐘

14

30 分鐘

7

5 分鐘

15

1 小時

8

6 分鐘

16

2 小時

如果消息重試 16 次后仍然失敗,消息將不再投遞。如果嚴格按照上述重試時間間隔計算,某條消息在一直消費失敗的前提下,將會在接下來的 4 小時 46 分鐘之內進行 16 次重試,超過這個時間范圍消息將不再重試投遞。

說明

一條消息無論重試多少次,這些重試消息的 Message ID 不會改變。

配置方式

消費失敗后,重試配置方式

集群消費方式下,消息消費失敗后期望消息重試,需要在消息監聽器接口的實現中明確進行配置(三種方式任選一種):

  • 返回 Action.ReconsumeLater(推薦)

  • 返回 Null

  • 拋出異常

示例代碼:

public class MessageListenerImpl implements MessageListener{
    @Override
    public Action consume(Message message,ConsumeContext context){
        //方法 3:消息處理邏輯拋出異常,消息將重試
        doConsumeMessage(message);
        //方式 1:返回 Action.ReconsumeLater,消息將重試
        return Action.ReconsumeLater;
        //方式 2:返回 null,消息將重試
        return null;
        //方式 3:直接拋出異常,消息將重試
        throw new RuntimeException("Consumer Message exceotion");
    }
}

消費失敗后,無需重試的配置方式

集群消費方式下,消息失敗后期望消息不重試,需要捕獲消費邏輯中可能拋出的異常,最終返回 Action.CommitMessage,此后這條消息將不會再重試。

示例代碼:

public class MessageListenerImpl implements MessageListener{
    @Override
    public Action consume(Message message,ConsumeContext context){
        try{
            doConsumeMessage(message);
        }catch(Throwable e){
            //捕獲消費邏輯中的所有異常,并返回 Action.CommitMessage;
            return Action.CommitMessage;
        }
        //消息處理正常,直接返回 Action.CommitMessage;
        return Action.CommitMessage;
    }
}

自定義消息最大重試次數

消息隊列允許 Consumer 啟動的時候設置最大重試次數,重試時間間隔將按照以下策略:

  • 最大重試次數小于等于 16 次,則重試時間間隔同上表描述。

  • 最大重試次數大于 16 次,超過 16 次的重試時間間隔均為每次 2 小時。

配置方式如下:

  • SOFABOOT

    // 配置對應 Group ID 的最大消息重試次數為 20 次
    @MessageConsumer(group = "GID_xxx", topic = "TP_xxx", maxRetryTimes = 20)
  • 非 SOFABOOT

    Properties properties = new Properties();
    // 配置對應 Group ID 的最大消息重試次數為 20 次
    properties.put(PropertyKeyConst.MAX_RECONSUME_TIMES, "20");
    Consumer consumer = OMS.builder().driver("sofamq").createConsumer(properties);
說明

  • 消息最大重試次數的設置對相同 Group ID 下的所有 Consumer 實例有效。

  • 如果只對相同 Group ID 下兩個 Consumer 實例中的其中一個設置了 MaxReconsumeTimes,那么該配置對兩個 Consumer 實例均生效。

  • 配置采用覆蓋的方式生效,即最后啟動的 Consumer 實例會覆蓋之前的啟動實例的配置。

獲取消息重試次數

消費者收到消息后,可按照以下方式獲取消息的重試次數:

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        //獲取消息的重試次數
        System.out.println(message.getReconsumeTimes());
        return Action.CommitMessage;
    }
}