云消息隊列 RocketMQ 版支持消息重試功能,即Consumer消費某條消息失敗或消費超時,云消息隊列 RocketMQ 版會根據消息重試機制重新投遞消息。本文介紹云消息隊列 RocketMQ 版分別在HTTP協議和TCP協議下的消息重試策略。
注意事項
一條消息無論重試多少次,這些重試消息的Message ID都不會改變。
消息重試只針對集群消費模式生效;廣播消費模式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續消費新的消息。
消息重試策略概述
云消息隊列 RocketMQ 版消息收發過程中,若Consumer消費某條消息失敗或消費超時,則云消息隊列 RocketMQ 版會在重試間隔時間后,將消息重新投遞給Consumer消費,若達到最大重試次數后消息還沒有成功被消費,則消息將被投遞至死信隊列。您可以通過消費死信隊列中的死信消息來恢復業務異常。
消息重試主要功能行為包括:
重試間隔:上一次消費失敗或超時后,距下次消息可被重新消費的間隔時間。
最大重試次數:消息消費失敗后,可被云消息隊列 RocketMQ 版重復投遞的最大次數。
對于TCP協議和HTTP協議,消息重試的重試間隔和最大重試次數有所不同,具體信息請參見下文的TCP協議重試策略和HTTP協議重試策略。
TCP協議重試策略
重試狀態機
Consumer消費消息時,消息的主要狀態變化如下:
Ready:已就緒狀態。
消息在云消息隊列 RocketMQ 版服務端已就緒,可以被消費者消費。
Inflight:處理中狀態。
消息被消費者客戶端獲取,處于消費中還未返回消費結果的狀態。
WaitingRetry:待重試狀態。
當消費者消息處理失敗或消費超時,會觸發消費重試邏輯判斷。如果當前重試次數未達到最大次數,則該消息變為待重試狀態,經過重試間隔后,消息將重新變為已就緒狀態可被重新消費。多次重試之間,可通過重試間隔進行延長,防止無效高頻的失敗。
Commit:提交狀態。
消費成功的狀態,消費者返回成功響應即可結束消息的狀態機。
DLQ:死信狀態。
消費邏輯的最終兜底機制,消息重試失敗且超過最大重試次數,若保存死信消息功能開啟,該失敗消息會被投遞至死信Topic。您可以通過消費死信Topic的消息進行業務恢復。具體信息,請參見死信隊列。
舉例:某條消息的消費重試流程如上圖所示,假設消息處于已就緒狀態的時長為5 s,消費耗時為6 s。
每次重試消息狀態都會經過已就緒->處理中->待重試的變化,消息的重試間隔指的是上一次消費失敗或超時后,距下次消息可被重新消費的間隔時間。實際消息兩次消費之間的間隔時間還包括消費耗時和已就緒狀態的持續時間。例如:
消息第一次消費時第0 s進入已就緒狀態。
受消費者處理速度的影響,到第5 s時才開始拉取消息消費,6 s后消息處理異常客戶端返回消費失敗。
此時還不能進行消費重試,需要等待重試間隔后才能開始再次消費。
等到第21 s時消息再次變為已就緒狀態。
5 s后客戶端才再次開始重新消費消息。
因此,實際消息兩次消費的間隔時間為:消費耗時+重試間隔+已就緒的持續時間=21 s。
重試間隔和重試次數
協議 | 消息類型 | 重試間隔 | 最大重試次數 |
TCP協議 | 順序消息 | 間隔時間可通過自定義參數suspendTimeMillis取值進行配置。參數取值范圍:10~30000,單位:毫秒,默認值:1000毫秒,即1秒。 | 最大重試次數可通過自定義參數MaxReconsumeTimes取值進行配置。該參數取值無最大限制。若未設置參數值,默認最大重試次數為Integer.MAX。 |
無序消息 | 間隔時間根據重試次數階梯變化,取值范圍:10秒~2小時。不支持自定義配置。
| 最大重試次數可通過自定義參數MaxReconsumeTimes取值進行配置。默認值為16次,該參數取值無最大限制,建議使用默認值。 |
表 1. TCP協議無序消息重試間隔
第幾次重試 | 重試間隔 | 第幾次重試 | 重試間隔 |
1 | 10秒 | 9 | 7分鐘 |
2 | 30秒 | 10 | 8分鐘 |
3 | 1分鐘 | 11 | 9分鐘 |
4 | 2分鐘 | 12 | 10分鐘 |
5 | 3分鐘 | 13 | 20分鐘 |
6 | 4分鐘 | 14 | 30分鐘 |
7 | 5分鐘 | 15 | 1小時 |
8 | 6分鐘 | 16 | 2小時 |
配置方式
以下配置方式僅適用于TCP協議,HTTP協議不涉及。
消息投遞失敗后需要重試
集群消費模式下,消息消費失敗后期望消息重試,需要在消息監聽器接口的實現中明確進行配置(三種方式任選一種):
方式1:返回Action.ReconsumeLater(推薦)
方式2:返回Null
方式3:拋出異常
示例代碼
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { //消息處理邏輯拋出異常,消息將重試。 doConsumeMessage(message); //方式1:返回Action.ReconsumeLater,消息將重試。 return Action.ReconsumeLater; //方式2:返回null,消息將重試。 return null; //方式3:直接拋出異常,消息將重試。 throw new RuntimeException("Consumer Message exception"); } }
消費投遞失敗后無需重試
集群消費模式下,消息失敗后期望消息不重試,需要捕獲消費邏輯中可能拋出的異常,最終返回Action.CommitMessage,此后這條消息將不會再重試。
示例代碼
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { try { doConsumeMessage(message); } catch (Throwable e) { //捕獲消費邏輯中的所有異常,并返回Action.CommitMessage; return Action.CommitMessage; } //消息處理正常,直接返回Action.CommitMessage; return Action.CommitMessage; } }
自定義消息最大重試次數和重試間隔
說明自定義云消息隊列 RocketMQ 版的客戶端日志配置,請升級TCP Java SDK到1.2.2或以上版本。更多信息,請參見版本說明。
云消息隊列 RocketMQ 版允許Consumer實例啟動的時候設置最大重試次數和重試間隔,無序消息重試間隔時間不支持自定義,以TCP協議無序消息重試間隔為準。
配置方式如下:
Properties properties = new Properties(); //配置對應Group ID的最大消息重試次數為20次,最大重試次數為字符串類型。 properties.put(PropertyKeyConst.MaxReconsumeTimes,"20"); //配置對應Group ID的消息重試間隔時間為3000毫秒,重試間隔時間為字符串類型。 properties.put(PropertyKeyConst.SuspendTimeMillis,"3000"); Consumer consumer = ONSFactory.createConsumer(properties);
重要配置采用覆蓋的方式生效,即最后啟動的Consumer實例會覆蓋之前啟動的實例的配置。因此,請確保同一Group ID下的所有Consumer實例設置的最大重試次數和重試間隔相同,否則各實例間的配置將會互相覆蓋。
獲取消息重試次數
Consumer收到消息后,可按照以下方式獲取消息的重試次數,消息重試間隔時間一般不需要獲取。
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { //獲取消息的重試次數。 System.out.println(message.getReconsumeTimes()); return Action.CommitMessage; } }
HTTP協議重試策略
協議 | 消息類型 | 重試間隔 | 最大重試次數 | 配置方式 |
HTTP協議 | 順序消息 | 1分鐘 | 288次 | 系統預設,不支持修改。 |
無序消息 | 5分鐘 | 288次 | 系統預設,不支持修改。 |