本文介紹 SOFAStack 消息隊列的消息重試機制和配置方式。
順序消息的重試
對于順序消息,當消費者消費消息失敗后,消息隊列會自動不斷地進行消息重試(每次間隔時間為 1 秒),這時,應用會出現消息消費被阻塞的情況。因此,建議您使用順序消息時,務必保證應用能夠及時監控并處理消費失敗的情況,避免阻塞現象的發生。
無序消息的重試
對于無序消息(普通、定時、延時、事務消息),當消費者消費消息失敗時,您可以通過設置返回狀態達到消息重試的結果。
無序消息的重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續消費新的消息。
以下內容都只針對無序消息生效。
重試次數
消息隊列默認允許每條消息最多重試 16 次,每次重試的間隔時間如下:
第幾次重試 | 與上次重試的間隔時間 | 第幾次重試 | 與上次重試的間隔時間 |
1 | 10 秒 | 9 | 7 分鐘 |
2 | 30 秒 | 10 | 8 分鐘 |
3 | 1 分鐘 | 11 | 9 分鐘 |
4 | 2 分鐘 | 12 | 10 分鐘 |
5 | 3 分鐘 | 13 | 20 分鐘 |
6 | 4 分鐘 | 14 | 30 分鐘 |
7 | 5 分鐘 | 15 | 1 小時 |
8 | 6 分鐘 | 16 | 2 小時 |
如果消息重試 16 次后仍然失敗,消息將不再投遞。如果嚴格按照上述重試時間間隔計算,某條消息在一直消費失敗的前提下,將會在接下來的 4 小時 46 分鐘之內進行 16 次重試,超過這個時間范圍消息將不再重試投遞。
一條消息無論重試多少次,這些重試消息的 Message ID 不會改變。
配置方式
消費失敗后,重試配置方式
集群消費方式下,消息消費失敗后期望消息重試,需要在消息監聽器接口的實現中明確進行配置(三種方式任選一種):
返回 Action.ReconsumeLater(推薦)
返回 Null
拋出異常
示例代碼:
public class MessageListenerImpl implements MessageListener{
@Override
public Action consume(Message message,ConsumeContext context){
//方法 3:消息處理邏輯拋出異常,消息將重試
doConsumeMessage(message);
//方式 1:返回 Action.ReconsumeLater,消息將重試
return Action.ReconsumeLater;
//方式 2:返回 null,消息將重試
return null;
//方式 3:直接拋出異常,消息將重試
throw new RuntimeException("Consumer Message exceotion");
}
}
消費失敗后,無需重試的配置方式
集群消費方式下,消息失敗后期望消息不重試,需要捕獲消費邏輯中可能拋出的異常,最終返回 Action.CommitMessage
,此后這條消息將不會再重試。
示例代碼:
public class MessageListenerImpl implements MessageListener{
@Override
public Action consume(Message message,ConsumeContext context){
try{
doConsumeMessage(message);
}catch(Throwable e){
//捕獲消費邏輯中的所有異常,并返回 Action.CommitMessage;
return Action.CommitMessage;
}
//消息處理正常,直接返回 Action.CommitMessage;
return Action.CommitMessage;
}
}
自定義消息最大重試次數
消息隊列允許 Consumer 啟動的時候設置最大重試次數,重試時間間隔將按照以下策略:
最大重試次數小于等于 16 次,則重試時間間隔同上表描述。
最大重試次數大于 16 次,超過 16 次的重試時間間隔均為每次 2 小時。
配置方式如下:
SOFABOOT
// 配置對應 Group ID 的最大消息重試次數為 20 次 @MessageConsumer(group = "GID_xxx", topic = "TP_xxx", maxRetryTimes = 20)
非 SOFABOOT
Properties properties = new Properties(); // 配置對應 Group ID 的最大消息重試次數為 20 次 properties.put(PropertyKeyConst.MAX_RECONSUME_TIMES, "20"); Consumer consumer = OMS.builder().driver("sofamq").createConsumer(properties);
消息最大重試次數的設置對相同 Group ID 下的所有 Consumer 實例有效。
如果只對相同 Group ID 下兩個 Consumer 實例中的其中一個設置了
MaxReconsumeTimes
,那么該配置對兩個 Consumer 實例均生效。配置采用覆蓋的方式生效,即最后啟動的 Consumer 實例會覆蓋之前的啟動實例的配置。
獲取消息重試次數
消費者收到消息后,可按照以下方式獲取消息的重試次數:
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//獲取消息的重試次數
System.out.println(message.getReconsumeTimes());
return Action.CommitMessage;
}
}