定時/延時消息
定時/延時消息為云消息隊列 RocketMQ 版中的高級特性消息,本文為您介紹定時/延時消息的應(yīng)用場景、功能原理、使用限制、使用方法和使用建議。
應(yīng)用場景
定時消息和延時消息本質(zhì)相同,都是服務(wù)端根據(jù)消息設(shè)置的定時時間在某一固定時刻將消息投遞給消費者消費。因此,下文統(tǒng)一用定時消息描述。
在分布式定時調(diào)度觸發(fā)、任務(wù)超時處理等場景,需要實現(xiàn)精準、可靠的定時事件觸發(fā)。使用云消息隊列 RocketMQ 版的定時消息可以簡化定時調(diào)度任務(wù)的開發(fā)邏輯,實現(xiàn)高性能、可擴展、高可靠的定時觸發(fā)能力。
典型場景一:分布式定時調(diào)度
在分布式定時調(diào)度場景下,需要實現(xiàn)各類精度的定時任務(wù),例如每天5點執(zhí)行文件清理,每隔2分鐘觸發(fā)一次消息推送等需求。傳統(tǒng)基于數(shù)據(jù)庫的定時調(diào)度方案在分布式場景下,性能不高,實現(xiàn)復(fù)雜。基于云消息隊列 RocketMQ 版的定時消息可以封裝出多種類型的定時觸發(fā)器。
典型場景二:任務(wù)超時處理
以電商交易場景為例,訂單下單后暫未支付,此時不可以直接關(guān)閉訂單,而是需要等待一段時間后才能關(guān)閉訂單。使用云消息隊列 RocketMQ 版定時消息可以實現(xiàn)超時任務(wù)的檢查觸發(fā)。
基于定時消息的超時任務(wù)處理具備如下優(yōu)勢:
精度高、開發(fā)門檻低:基于消息通知方式不存在定時階梯間隔。可以輕松實現(xiàn)任意精度事件觸發(fā),無需業(yè)務(wù)去重。
高性能可擴展:傳統(tǒng)的數(shù)據(jù)庫掃描方式較為復(fù)雜,需要頻繁調(diào)用接口掃描,容易產(chǎn)生性能瓶頸。云消息隊列 RocketMQ 版的定時消息具有高并發(fā)和水平擴展的能力。
功能原理
什么是定時消息
定時消息是云消息隊列 RocketMQ 版提供的一種高級消息類型,消息被發(fā)送至服務(wù)端后,在指定時間后才能被消費者消費。通過設(shè)置一定的定時時間可以實現(xiàn)分布式場景的延時調(diào)度觸發(fā)效果。
定時時間設(shè)置原則
云消息隊列 RocketMQ 版定時消息設(shè)置的定時時間是一個預(yù)期觸發(fā)的系統(tǒng)時間戳,延時時間也需要轉(zhuǎn)換成當(dāng)前系統(tǒng)時間后的某一個時間戳,而不是一段延時時長。
定時時間的格式為毫秒級的Unix時間戳,您需要將要設(shè)置的時刻轉(zhuǎn)換成時間戳形式。
定時時間必須設(shè)置在定時時長范圍內(nèi),超過范圍則定時不生效,服務(wù)端會立即投遞消息。
定時消息最大定時時長:
包年包月、按量付費標準版,Serverless標準版與專業(yè)版最大支持7天。
包年包月、按量付費專業(yè)版,鉑金版最大支持40天。
定時時間必須設(shè)置為當(dāng)前時間之后,若設(shè)置到當(dāng)前時間之前,則定時不生效,服務(wù)端會立即投遞消息。
示例如下:
定時消息:例如,當(dāng)前系統(tǒng)時間為2022-06-09 17:30:00,您希望消息在下午19:20:00定時投遞,則定時時間為2022-06-09 19:20:00,轉(zhuǎn)換成時間戳格式為1654773600000。
延時消息:例如,當(dāng)前系統(tǒng)時間為2022-06-09 17:30:00,您希望延時1個小時后投遞消息,則您需要根據(jù)當(dāng)前時間和延時時長換算成定時時刻,即消息投遞時間為2022-06-09 18:30:00,轉(zhuǎn)換為時間戳格式為1654770600000。
定時消息生命周期
初始化
消息被生產(chǎn)者構(gòu)建并完成初始化,待發(fā)送到服務(wù)端的狀態(tài)。
定時中
消息被發(fā)送到服務(wù)端,和普通消息不同的是,服務(wù)端不會直接構(gòu)建消息索引,而是會將定時消息單獨存儲在定時存儲系統(tǒng)中,等待定時時刻到達。
待消費
定時時刻到達后,服務(wù)端將消息重新寫入普通存儲引擎,對下游消費者可見,等待消費者消費的狀態(tài)。
消費中
消息被消費者獲取,并按照消費者本地的業(yè)務(wù)邏輯進行處理的過程。
此時服務(wù)端會等待消費者完成消費并提交消費結(jié)果,如果一定時間后沒有收到消費者的響應(yīng),云消息隊列 RocketMQ 版會對消息進行重試處理。具體信息,請參見消費重試。
消費提交
消費者完成消費處理,并向服務(wù)端提交消費結(jié)果,服務(wù)端標記當(dāng)前消息已經(jīng)被處理(包括消費成功和失敗)。
云消息隊列 RocketMQ 版默認支持保留所有消息,此時消息數(shù)據(jù)并不會立即被刪除,只是邏輯標記已消費。消息在保存時間到期或存儲空間不足被刪除前,消費者仍然可以回溯消息重新消費。
消息刪除
云消息隊列 RocketMQ 版按照消息保存機制滾動清理最早的消息數(shù)據(jù),將消息從物理文件中刪除。更多信息,請參見消息存儲和清理機制。
使用限制
消息類型一致性
定時消息僅支持在MessageType為Delay的主題內(nèi)使用,即定時消息只能發(fā)送至類型為定時消息的主題中,發(fā)送的消息的類型必須和主題的類型一致。
定時精度約束
云消息隊列 RocketMQ 版定時消息的定時時長參數(shù)精確到毫秒級,但是默認精度為1000ms,即定時消息為秒級精度。
云消息隊列 RocketMQ 版定時消息的狀態(tài)支持持久化存儲,系統(tǒng)由于故障重啟后,仍支持按照原來設(shè)置的定時時間觸發(fā)消息投遞。若存儲系統(tǒng)異常重啟,可能會導(dǎo)致定時消息投遞出現(xiàn)一定延遲。
使用示例
和普通消息相比,定時消費發(fā)送時,必須設(shè)置定時觸發(fā)的目標時間戳。
以Java語言為例,使用定時消息示例參考如下:
完整的消息收發(fā)示例代碼請參見RocketMQ 5.x系列SDK(推薦)。
示例代碼
定時/延時消息發(fā)送
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* 實例接入點,從控制臺實例詳情頁的接入點頁簽中獲取。
* 如果是在阿里云ECS內(nèi)網(wǎng)訪問,建議填寫VPC接入點。
* 如果是在本地公網(wǎng)訪問,或者是線下IDC環(huán)境訪問,可以使用公網(wǎng)接入點。使用公網(wǎng)接入點訪問,必須開啟實例的公網(wǎng)訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//消息發(fā)送的目標Topic名稱,需要提前在控制臺創(chuàng)建,如果不創(chuàng)建直接使用會返回報錯。
String topic = "Your Topic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網(wǎng)接入點訪問,configuration還需要設(shè)置實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 如果是在阿里云ECS內(nèi)網(wǎng)訪問,無需填寫該配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
* 如果實例類型為Serverless實例,公網(wǎng)訪問必須設(shè)置實例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration configuration = builder.build();
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
//定時/延時消息發(fā)送
//以下示例表示:延遲時間為10分鐘之后的Unix時間戳。
long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = provider.newMessageBuilder()
.setTopic("topic")
//設(shè)置消息索引鍵,可根據(jù)關(guān)鍵字精確查找某條消息。
.setKeys("messageKey")
//設(shè)置消息Tag,用于消費端根據(jù)指定Tag過濾消息。
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
//消息體
.setBody("messageBody".getBytes())
.build();
try {
//發(fā)送消息,需要關(guān)注發(fā)送結(jié)果,并捕獲失敗等異常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}
PushConsumer消費
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* 實例接入點,從控制臺實例詳情頁的接入點頁簽中獲取。
* 如果是在阿里云ECS內(nèi)網(wǎng)訪問,建議填寫VPC接入點。
* 如果是在本地公網(wǎng)訪問,或者是線下IDC環(huán)境訪問,可以使用公網(wǎng)接入點。使用公網(wǎng)接入點訪問,必須開啟實例的公網(wǎng)訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//指定需要訂閱哪個目標Topic,Topic需要提前在控制臺創(chuàng)建,如果不創(chuàng)建直接使用會返回報錯。
String topic = "Your Topic";
//為消費者指定所屬的消費者分組,Group需要提前在控制臺創(chuàng)建,如果不創(chuàng)建直接使用會返回報錯。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網(wǎng)接入點訪問,configuration還需要設(shè)置實例的用戶名和密碼。用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 如果是在阿里云ECS內(nèi)網(wǎng)訪問,無需填寫該配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
* 如果實例類型為Serverless實例,公網(wǎng)訪問必須設(shè)置實例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
//訂閱消息的過濾規(guī)則,表示訂閱所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
//初始化PushConsumer,需要綁定消費者分組ConsumerGroup、通信參數(shù)以及訂閱關(guān)系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//設(shè)置消費者分組。
.setConsumerGroup(consumerGroup)
//設(shè)置預(yù)綁定的訂閱關(guān)系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
//設(shè)置消費監(jiān)聽器。
.setMessageListener(messageView -> {
//處理消息并返回消費結(jié)果。
// LOGGER.info("Consume message={}", messageView);
System.out.println("Consume Message: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
//如果不需要再使用PushConsumer,可關(guān)閉該進程。
//pushConsumer.close();
}
}
SimpleConsumer消費
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
/**
* 實例接入點,從控制臺實例詳情頁的接入點頁簽中獲取。
* 如果是在阿里云ECS內(nèi)網(wǎng)訪問,建議填寫VPC接入點。
* 如果是在本地公網(wǎng)訪問,或者是線下IDC環(huán)境訪問,可以使用公網(wǎng)接入點。使用公網(wǎng)接入點訪問,必須開啟實例的公網(wǎng)訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//指定需要訂閱哪個目標Topic,Topic需要提前在控制臺創(chuàng)建,如果不創(chuàng)建直接使用會返回報錯。
String topic = "Your Topic";
//為消費者指定所屬的消費者分組,Group需要提前在控制臺創(chuàng)建,如果不創(chuàng)建直接使用會返回報錯。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網(wǎng)接入點訪問,configuration還需要設(shè)置實例的用戶名和密碼。用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 如果是在阿里云ECS內(nèi)網(wǎng)訪問,無需填寫該配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
* 如果實例類型為Serverless實例,公網(wǎng)訪問必須設(shè)置實例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
Duration awaitDuration = Duration.ofSeconds(10);
//訂閱消息的過濾規(guī)則,表示訂閱所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
//初始化SimpleConsumer,需要綁定消費者分組ConsumerGroup、通信參數(shù)以及訂閱關(guān)系。
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//設(shè)置消費者分組。
.setConsumerGroup(consumerGroup)
//設(shè)置長輪詢超時時間。
.setAwaitDuration(awaitDuration)
//設(shè)置預(yù)綁定的訂閱關(guān)系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
//設(shè)置本次拉取的最大消息條數(shù)。
int maxMessageNum = 16;
//設(shè)置消息的不可見時間。
Duration invisibleDuration = Duration.ofSeconds(10);
//SimpleConsumer需要客戶端一直主動循環(huán)獲取消息,并進行消費處理。
//如果需要提高消費實時性,建議多線程并發(fā)拉取。
while (true) {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
messages.forEach(messageView -> {
// LOGGER.info("Received message: {}", messageView);
System.out.println("Received message: " + messageView);
});
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
//消費處理完成后,需要主動調(diào)用ACK向服務(wù)端提交消費結(jié)果。
consumer.ack(message);
System.out.println("Message is acknowledged successfully, messageId= " + messageId);
//LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
t.printStackTrace();
//LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
}
// 如果不需要再使用SimpleConsumer,可關(guān)閉該進程。
// consumer.close();
}
}
使用建議
避免大量相同定時時刻的消息
定時消息的實現(xiàn)邏輯需要先經(jīng)過定時存儲等待觸發(fā),定時時間到達后才會被投遞給消費者。因此,如果將大量定時消息的定時時間設(shè)置為同一時刻,則到達該時刻后會有大量消息同時需要被處理,會造成系統(tǒng)壓力過大,導(dǎo)致消息分發(fā)延遲,影響定時精度。
定時/延時消息常見問題
定時消息在定時時間到達前可以撤回或修改定時時間嗎?
不支持。
定時時間設(shè)置一個已過去的時間會怎么樣?
定時不生效,消息會被立即投遞。
定時消息已發(fā)送成功為什么控制臺查詢不到?
定時消息到達定時時間后才對消費者可見,并在控制臺查詢到消息軌跡。