延時消息
如果您希望消息被投遞后延遲一段時間被消費者消費,您可以使用云消息隊列 RabbitMQ 版的延時消息。云消息隊列 RabbitMQ 版原生支持延時消息,使用方式比開源RabbitMQ更簡單。
什么是延時消息
延時消息是指在指定時間段之后才被消費者消費的消息。
應用場景
延時消息適用于以下場景:
對消息生產和消費有時間窗口要求的場景。例如,在電商交易中超時未支付關閉訂單的場景,在訂單創建時會發送一條延時消息。這條消息將會在30分鐘以后投遞給消費者,消費者收到此消息后需要判斷對應的訂單是否已完成支付。如支付未完成,則關閉訂單。如已完成支付則忽略。
通過消息觸發延時任務的場景。例如,在指定時間段之后向用戶發送提醒消息。
延時時間設置規則
方案對比
云消息隊列 RabbitMQ 版無需進行代碼改造,即可實現開源RabbitMQ的所有延時消息方案,詳情請參見下表:
項目 | 開源RabbitMQ | 云消息隊列 RabbitMQ 版 |
死信Exchange+Queue的消息存活時間 | 支持 | 支持 |
死信Exchange+消息的消息存活時間 | 支持 | 支持 |
支持 | 支持 | |
不支持 | 支持 |
開源延時消息插件方案
為了減少與開源RabbitMQ的差別,云消息隊列 RabbitMQ 版也基于原生的延時消息支持使用開源插件式的方式來使用延時消息,并免去插件的安裝。具體使用流程如下:
聲明
x-delayed-message
類型的Exchange,并填寫該Exchange的擴展參數x-delayed-type以指定Exchange的路由類型。示例如下:Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("ExchangeName", "x-delayed-message", true, false, args);
參數說明如下:
參數
說明
x-delayed-type
Exchange的類型,指定路由規則。取值說明如下:
direct
fanout
topic
headers
ExchangeName
Exchange的名稱。
說明請確保聲明的Exchange已存在。具體步驟,請參見Exchange管理。
x-delayed-message
指定Exchange類型,以支持投遞延時消息。
發送延時消息。在消息的Header屬性中增加一個鍵為x-delay,值為毫秒數的鍵值對,并且指定發送的目標Exchange為上一步已聲明的Exchange。示例如下:
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8"); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x-delay", 5000);//表示消息延時5000毫秒。 AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish("ExchangeName", "", props.build(), messageBodyBytes);
原生延時消息方案
云消息隊列 RabbitMQ 版通過對消息設置delay來實現延時效果。云消息隊列 RabbitMQ 版原生延時消息的流轉過程如下:
生產者向Exchange發布設置了delay屬性的消息。
Exchange將消息路由至Queue。
在設置的delay時間到期后,消費者才能從Queue消費消息。
原生延時消息最佳實踐
生產者客戶端
云消息隊列 RabbitMQ 版原生延時消息的使用方式非常簡單。您只需要在生產者客戶端發布消息時,通過delay為消息設置一個延時時間。
發布延時消息的Java示例代碼如下:
Map<String, Object> headers = new HashMap<>(); headers.put("delay", "5000");//表示消息延時5000毫秒。 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).headers(headers).build();
更多語言示例代碼,請參見AMQP Demos。
消費者客戶端
為保證延時消息時效性,建議您在消費消息時使用push模式的
basic.consume
方法,而不要使用pull模式的basic.get
方法。因為云消息隊列 RabbitMQ 版的消息是分布式存儲的,如果您使用pull模式的basic.get
方法獲取消息,并不能保證正好從存儲的節點獲取消息。
常見問題
為什么實際的延時時間大于設置的延時時間?
因為客戶端使用了pull模式的basic.get方法消費消息。云消息隊列 RabbitMQ 版的消息是集群存儲的,使用pull模式的basic.get方法路由到一臺Broker時,可能無法及時拉取存儲在其他Broker上的消息。