日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

定時/延時消息

定時/延時消息為云消息隊列 RocketMQ 版中的高級特性消息,本文為您介紹定時/延時消息的應(yīng)用場景、功能原理、使用限制、使用方法和使用建議。

應(yīng)用場景

說明

定時消息和延時消息本質(zhì)相同,都是服務(wù)端根據(jù)消息設(shè)置的定時時間在某一固定時刻將消息投遞給消費者消費。因此,下文統(tǒng)一用定時消息描述。

在分布式定時調(diào)度觸發(fā)、任務(wù)超時處理等場景,需要實現(xiàn)精準、可靠的定時事件觸發(fā)。使用云消息隊列 RocketMQ 版的定時消息可以簡化定時調(diào)度任務(wù)的開發(fā)邏輯,實現(xiàn)高性能、可擴展、高可靠的定時觸發(fā)能力。

典型場景一:分布式定時調(diào)度

定時消息

在分布式定時調(diào)度場景下,需要實現(xiàn)各類精度的定時任務(wù),例如每天5點執(zhí)行文件清理,每隔2分鐘觸發(fā)一次消息推送等需求。傳統(tǒng)基于數(shù)據(jù)庫的定時調(diào)度方案在分布式場景下,性能不高,實現(xiàn)復(fù)雜。基于云消息隊列 RocketMQ 版的定時消息可以封裝出多種類型的定時觸發(fā)器。

典型場景二:任務(wù)超時處理

超時任務(wù)處理

以電商交易場景為例,訂單下單后暫未支付,此時不可以直接關(guān)閉訂單,而是需要等待一段時間后才能關(guān)閉訂單。使用云消息隊列 RocketMQ 版定時消息可以實現(xiàn)超時任務(wù)的檢查觸發(fā)。

基于定時消息的超時任務(wù)處理具備如下優(yōu)勢:

  • 精度高、開發(fā)門檻低:基于消息通知方式不存在定時階梯間隔。可以輕松實現(xiàn)任意精度事件觸發(fā),無需業(yè)務(wù)去重。

  • 高性能可擴展:傳統(tǒng)的數(shù)據(jù)庫掃描方式較為復(fù)雜,需要頻繁調(diào)用接口掃描,容易產(chǎn)生性能瓶頸。云消息隊列 RocketMQ 版的定時消息具有高并發(fā)和水平擴展的能力。

功能原理

什么是定時消息

定時消息是云消息隊列 RocketMQ 版提供的一種高級消息類型,消息被發(fā)送至服務(wù)端后,在指定時間后才能被消費者消費。通過設(shè)置一定的定時時間可以實現(xiàn)分布式場景的延時調(diào)度觸發(fā)效果。

定時時間設(shè)置原則

  • 云消息隊列 RocketMQ 版定時消息設(shè)置的定時時間是一個預(yù)期觸發(fā)的系統(tǒng)時間戳,延時時間也需要轉(zhuǎn)換成當(dāng)前系統(tǒng)時間后的某一個時間戳,而不是一段延時時長。

  • 定時時間的格式為毫秒級的Unix時間戳,您需要將要設(shè)置的時刻轉(zhuǎn)換成時間戳形式。

  • 定時時間必須設(shè)置在定時時長范圍內(nèi),超過范圍則定時不生效,服務(wù)端會立即投遞消息。

  • 定時消息最大定時時長:

    • 包年包月、按量付費標準版,Serverless標準版與專業(yè)版最大支持7天。

    • 包年包月、按量付費專業(yè)版,鉑金版最大支持40天。

  • 定時時間必須設(shè)置為當(dāng)前時間之后,若設(shè)置到當(dāng)前時間之前,則定時不生效,服務(wù)端會立即投遞消息。

示例如下:

  • 定時消息:例如,當(dāng)前系統(tǒng)時間為2022-06-09 17:30:00,您希望消息在下午19:20:00定時投遞,則定時時間為2022-06-09 19:20:00,轉(zhuǎn)換成時間戳格式為1654773600000。

  • 延時消息:例如,當(dāng)前系統(tǒng)時間為2022-06-09 17:30:00,您希望延時1個小時后投遞消息,則您需要根據(jù)當(dāng)前時間和延時時長換算成定時時刻,即消息投遞時間為2022-06-09 18:30:00,轉(zhuǎn)換為時間戳格式為1654770600000。

定時消息生命周期

定時消息生命周期

  • 初始化

    消息被生產(chǎn)者構(gòu)建并完成初始化,待發(fā)送到服務(wù)端的狀態(tài)。

  • 定時中

    消息被發(fā)送到服務(wù)端,和普通消息不同的是,服務(wù)端不會直接構(gòu)建消息索引,而是會將定時消息單獨存儲在定時存儲系統(tǒng)中,等待定時時刻到達。

  • 待消費

    定時時刻到達后,服務(wù)端將消息重新寫入普通存儲引擎,對下游消費者可見,等待消費者消費的狀態(tài)。

  • 消費中

    消息被消費者獲取,并按照消費者本地的業(yè)務(wù)邏輯進行處理的過程。

    此時服務(wù)端會等待消費者完成消費并提交消費結(jié)果,如果一定時間后沒有收到消費者的響應(yīng),云消息隊列 RocketMQ 版會對消息進行重試處理。具體信息,請參見消費重試

  • 消費提交

    消費者完成消費處理,并向服務(wù)端提交消費結(jié)果,服務(wù)端標記當(dāng)前消息已經(jīng)被處理(包括消費成功和失敗)。

    云消息隊列 RocketMQ 版默認支持保留所有消息,此時消息數(shù)據(jù)并不會立即被刪除,只是邏輯標記已消費。消息在保存時間到期或存儲空間不足被刪除前,消費者仍然可以回溯消息重新消費。

  • 消息刪除

    云消息隊列 RocketMQ 版按照消息保存機制滾動清理最早的消息數(shù)據(jù),將消息從物理文件中刪除。更多信息,請參見消息存儲和清理機制

使用限制

消息類型一致性

定時消息僅支持在MessageTypeDelay的主題內(nèi)使用,即定時消息只能發(fā)送至類型為定時消息的主題中,發(fā)送的消息的類型必須和主題的類型一致。

定時精度約束

云消息隊列 RocketMQ 版定時消息的定時時長參數(shù)精確到毫秒級,但是默認精度為1000ms,即定時消息為秒級精度。

云消息隊列 RocketMQ 版定時消息的狀態(tài)支持持久化存儲,系統(tǒng)由于故障重啟后,仍支持按照原來設(shè)置的定時時間觸發(fā)消息投遞。若存儲系統(tǒng)異常重啟,可能會導(dǎo)致定時消息投遞出現(xiàn)一定延遲。

使用示例

和普通消息相比,定時消費發(fā)送時,必須設(shè)置定時觸發(fā)的目標時間戳。

以Java語言為例,使用定時消息示例參考如下:

完整的消息收發(fā)示例代碼請參見RocketMQ 5.x系列SDK(推薦)

示例代碼

定時/延時消息發(fā)送

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;


public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        /**
         * 實例接入點,從控制臺實例詳情頁的接入點頁簽中獲取。
         * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,建議填寫VPC接入點。
         * 如果是在本地公網(wǎng)訪問,或者是線下IDC環(huán)境訪問,可以使用公網(wǎng)接入點。使用公網(wǎng)接入點訪問,必須開啟實例的公網(wǎng)訪問功能。
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        //消息發(fā)送的目標Topic名稱,需要提前在控制臺創(chuàng)建,如果不創(chuàng)建直接使用會返回報錯。
        String topic = "Your Topic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * 如果是使用公網(wǎng)接入點訪問,configuration還需要設(shè)置實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
         * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,無需填寫該配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
         * 如果實例類型為Serverless實例,公網(wǎng)訪問必須設(shè)置實例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        ClientConfiguration configuration = builder.build();
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        //定時/延時消息發(fā)送
        //以下示例表示:延遲時間為10分鐘之后的Unix時間戳。
        long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
        Message message = provider.newMessageBuilder()
                .setTopic("topic")
                //設(shè)置消息索引鍵,可根據(jù)關(guān)鍵字精確查找某條消息。
                .setKeys("messageKey")
                //設(shè)置消息Tag,用于消費端根據(jù)指定Tag過濾消息。
                .setTag("messageTag")
                .setDeliveryTimestamp(deliverTimeStamp)
                //消息體
                .setBody("messageBody".getBytes())
                .build();
        try {
            //發(fā)送消息,需要關(guān)注發(fā)送結(jié)果,并捕獲失敗等異常。
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}

PushConsumer消費

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        /**
         * 實例接入點,從控制臺實例詳情頁的接入點頁簽中獲取。
         * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,建議填寫VPC接入點。
         * 如果是在本地公網(wǎng)訪問,或者是線下IDC環(huán)境訪問,可以使用公網(wǎng)接入點。使用公網(wǎng)接入點訪問,必須開啟實例的公網(wǎng)訪問功能。
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        //指定需要訂閱哪個目標Topic,Topic需要提前在控制臺創(chuàng)建,如果不創(chuàng)建直接使用會返回報錯。
        String topic = "Your Topic";
        //為消費者指定所屬的消費者分組,Group需要提前在控制臺創(chuàng)建,如果不創(chuàng)建直接使用會返回報錯。
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * 如果是使用公網(wǎng)接入點訪問,configuration還需要設(shè)置實例的用戶名和密碼。用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
         * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,無需填寫該配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
         * 如果實例類型為Serverless實例,公網(wǎng)訪問必須設(shè)置實例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();
        //訂閱消息的過濾規(guī)則,表示訂閱所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        //初始化PushConsumer,需要綁定消費者分組ConsumerGroup、通信參數(shù)以及訂閱關(guān)系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                //設(shè)置消費者分組。
                .setConsumerGroup(consumerGroup)
                //設(shè)置預(yù)綁定的訂閱關(guān)系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                //設(shè)置消費監(jiān)聽器。
                .setMessageListener(messageView -> {
                    //處理消息并返回消費結(jié)果。
                    // LOGGER.info("Consume message={}", messageView);
                    System.out.println("Consume Message: " + messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        //如果不需要再使用PushConsumer,可關(guān)閉該進程。
        //pushConsumer.close();
    }
}                                                 

SimpleConsumer消費

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        /**
         * 實例接入點,從控制臺實例詳情頁的接入點頁簽中獲取。
         * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,建議填寫VPC接入點。
         * 如果是在本地公網(wǎng)訪問,或者是線下IDC環(huán)境訪問,可以使用公網(wǎng)接入點。使用公網(wǎng)接入點訪問,必須開啟實例的公網(wǎng)訪問功能。
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        //指定需要訂閱哪個目標Topic,Topic需要提前在控制臺創(chuàng)建,如果不創(chuàng)建直接使用會返回報錯。
        String topic = "Your Topic";
        //為消費者指定所屬的消費者分組,Group需要提前在控制臺創(chuàng)建,如果不創(chuàng)建直接使用會返回報錯。
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * 如果是使用公網(wǎng)接入點訪問,configuration還需要設(shè)置實例的用戶名和密碼。用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
         * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,無需填寫該配置,服務(wù)端會根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
         * 如果實例類型為Serverless實例,公網(wǎng)訪問必須設(shè)置實例的用戶名密碼,當(dāng)開啟內(nèi)網(wǎng)免身份識別時,內(nèi)網(wǎng)訪問可以不設(shè)置用戶名和密碼。
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();

        Duration awaitDuration = Duration.ofSeconds(10);
        //訂閱消息的過濾規(guī)則,表示訂閱所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        //初始化SimpleConsumer,需要綁定消費者分組ConsumerGroup、通信參數(shù)以及訂閱關(guān)系。
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                //設(shè)置消費者分組。
                .setConsumerGroup(consumerGroup)
                //設(shè)置長輪詢超時時間。
                .setAwaitDuration(awaitDuration)
                //設(shè)置預(yù)綁定的訂閱關(guān)系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        //設(shè)置本次拉取的最大消息條數(shù)。
        int maxMessageNum = 16;
        //設(shè)置消息的不可見時間。
        Duration invisibleDuration = Duration.ofSeconds(10);
        //SimpleConsumer需要客戶端一直主動循環(huán)獲取消息,并進行消費處理。
        //如果需要提高消費實時性,建議多線程并發(fā)拉取。
        while (true) {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            messages.forEach(messageView -> {
                // LOGGER.info("Received message: {}", messageView);
                System.out.println("Received message: " + messageView);
            });
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    //消費處理完成后,需要主動調(diào)用ACK向服務(wù)端提交消費結(jié)果。
                    consumer.ack(message);
                    System.out.println("Message is acknowledged successfully, messageId= " + messageId);
                    //LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    t.printStackTrace();
                    //LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        }
        // 如果不需要再使用SimpleConsumer,可關(guān)閉該進程。
        // consumer.close();
    }
}                                           

使用建議

避免大量相同定時時刻的消息

定時消息的實現(xiàn)邏輯需要先經(jīng)過定時存儲等待觸發(fā),定時時間到達后才會被投遞給消費者。因此,如果將大量定時消息的定時時間設(shè)置為同一時刻,則到達該時刻后會有大量消息同時需要被處理,會造成系統(tǒng)壓力過大,導(dǎo)致消息分發(fā)延遲,影響定時精度。

定時/延時消息常見問題

定時消息在定時時間到達前可以撤回或修改定時時間嗎?

不支持。

定時時間設(shè)置一個已過去的時間會怎么樣?

定時不生效,消息會被立即投遞。

定時消息已發(fā)送成功為什么控制臺查詢不到?

定時消息到達定時時間后才對消費者可見,并在控制臺查詢到消息軌跡。