本文介紹如何使用隊列實現按順序發送和消費消息。

背景信息

消息服務MNS提供的隊列(Queue)主要的特點是高可靠、高可用、高并發。每個隊列的數據都會被持久化三份到阿里云的飛天分布式平臺。其中每個隊列至少有兩臺服務器向外提供服務,同時每臺服務器都支持高并發訪問。這些分布式特性導致了消息服務MNS的隊列無法像傳統單機隊列嚴格保證消息FIFO,只能做到基本有序。

隊列如果同時有多個消息發送者(Sender),由于并發和網絡延遲等問題,根本無法獲知消息的實際發送順序和消息到達服務器端的真實順序。同理,當有多個接收者并發接收消息時,其真正的處理順序也不可獲知。

綜上所述,只有一個發送者(一個進程,可以是多個線程)、 一個接收者時,消息順序才有意義,也只有在這種情況下才能感知和記錄消息的真實發送和接收順序。

解決方案

基于上述假設,同時為了滿足部分用戶對于消息消費順序性的要求,設計了以下方案,確保消息按照用戶發送順序被接收和消費。

  1. 消息在發送端進行染色,加上SeqId(例如:#num#)。
  2. 消息在接收端進行還原,并根據SeqId排序后返回給上層,同時對于已經接收的消息會有后臺線程保證消息不會被重復消費。
  3. 為了避免因為發送者或者接收者fail導致SeqId丟失。SeqId會被持久存儲到本地磁盤文件,或者其他存儲和數據庫,例如OSS、OTS或RDS。流程圖

注意事項

  • 本文的主要目的是展示順序消息的解決方案,不建議不加測試直接用于生產環境。
  • 正常情況下,發送端和接收端的SeqId應該和隊列中的消息(染色)匹配。當出現刪除隊列重新創建等操作時,請注意磁盤文件中的SeqId是否和隊列中的真實情況相符,同時建議不要往染色的消息隊列里發送非染色消息。
  • 隊列的消息有效期設置過短或者每條消息的實際處理結果都有可能會對消息有序性造成影響,在您的程序中需要對這些情況所導致的亂序現象進行處理。

示例代碼

本文以Python版的方案實現為例,下載地址:有序隊列Python示例代碼(依賴MNS Python SDK)。其中,主要提供了OrderedQueueWrapper類(ordered_queue.py文件),可以將普通隊列包裝成有序隊列。

OrderedQueueWrapper提供SendMessageInOrder()和ReceiveMessageInOrder()方法:

  • 發送時對消息進行染色。
  • 接收時還原消息,并且按順序返回給接收者。

另外,send_message_in_order.pyreceive_message_in_order.py提供發送者和接收者使用OrderedQueueWrapper的程序示例。

send_message_in_order.py:
    #init orderedQueue
    seqIdConfig = {"localFileName":"/tmp/mns_send_message_seq_id"}   # 指定持久化發送SeqId的磁盤文件。
    seqIdPS = LocalDiskStorage(seqIdConfig)
    orderedQueue = OrderedQueueWrapper(myQueue, sendSeqIdPersistStorage = seqIdPS)
    orderedQueue.SendMessageInOrder(message)

receive_message_in_order.py:
    #init orderedQueue
    seqIdConfig = {"localFileName":"/tmp/mns_receive_message_seq_id"} # 指定持久化接收SeqId的磁盤文件。
    seqIdPS = LocalDiskStorage(seqIdConfig)
    orderedQueue = OrderedQueueWrapper(myQueue, receiveSeqIdPersistStorage = seqIdPS)
    recv_msg = orderedQueue.ReceiveMessageInOrder(wait_seconds)

運行方法

  1. 配置send_message_in_order.pyreceive_message_in_order.py中的配置項g_endpointg_accessKeyIdg_accessKeySecret以及g_testQueueName
  2. 運行send_message_in_order.py
    python send_message_in_order.py
  3. 運行receive_message_in_order.py
    python receive_message_in_order.py

    發送程序會發送20條消息,接收程序會按順序消費這20條消息。

    順序消費

運行ordered_queue.py的測試結果對比普通隊列,運行命令如下:

python ordered_queue.py
說明 ordered_queue.py需配置Endpoint和AccessKey。

運行結果

運行結果如下:

  • 非嚴格有序非嚴格有序

    整體有序,部分相鄰消息無序,說明單個隊列有多個服務器在同時服務。

  • 嚴格有序嚴格有序