消息冪等
如果消息重復(fù)消費(fèi)會(huì)影響您的業(yè)務(wù)處理,請對消息做冪等處理。本文介紹消息冪等的概念、適用場景以及處理方法。
什么是消息冪等
在數(shù)學(xué)與計(jì)算機(jī)學(xué)中,冪等操作的特點(diǎn)是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。在消息領(lǐng)域,冪等是指Consumer重復(fù)消費(fèi)某條消息時(shí),重復(fù)消費(fèi)的結(jié)果與消費(fèi)一次的結(jié)果是相同的,并且多次消費(fèi)并未對業(yè)務(wù)系統(tǒng)產(chǎn)生任何負(fù)面影響。
例如,在支付場景下,Consumer消費(fèi)扣款消息,對一筆訂單執(zhí)行扣款操作,扣款金額為100元。如果因網(wǎng)絡(luò)不穩(wěn)定等原因?qū)е驴劭钕⒅貜?fù)投遞,Consumer重復(fù)消費(fèi)了該扣款消息,但最終的業(yè)務(wù)結(jié)果是只扣款一次,扣費(fèi)100元,且用戶的扣款記錄中對應(yīng)的訂單只有一條扣款流水,不會(huì)多次扣除費(fèi)用。那么這次扣款操作是符合要求的,整個(gè)消費(fèi)過程實(shí)現(xiàn)了消息冪等。
適用場景
在互聯(lián)網(wǎng)應(yīng)用中,尤其在網(wǎng)絡(luò)不穩(wěn)定的情況下,云消息隊(duì)列 RabbitMQ 版的消息有可能會(huì)出現(xiàn)重復(fù)。如果消息重復(fù)消費(fèi)會(huì)影響您的業(yè)務(wù)處理,請對消息做冪等處理。消息重復(fù)的可能原因如下:
發(fā)送時(shí)消息重復(fù)
當(dāng)一條消息已被成功發(fā)送到服務(wù)端并完成持久化,此時(shí)出現(xiàn)了網(wǎng)絡(luò)閃斷或者客戶端宕機(jī),導(dǎo)致服務(wù)端對客戶端應(yīng)答失敗。 如果此時(shí)Producer意識到消息發(fā)送失敗并嘗試再次發(fā)送消息,Consumer后續(xù)會(huì)收到兩條內(nèi)容相同并且Message ID也相同的消息。
投遞時(shí)消息重復(fù)
消息消費(fèi)的場景下,消息已投遞到Consumer并完成業(yè)務(wù)處理,當(dāng)客戶端給服務(wù)端反饋應(yīng)答的時(shí)候網(wǎng)絡(luò)閃斷。為了保證消息至少被消費(fèi)一次,云消息隊(duì)列 RabbitMQ 版的服務(wù)端將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過的消息,Consumer后續(xù)會(huì)收到兩條內(nèi)容相同并且Message ID也相同的消息。
負(fù)載均衡時(shí)消息重復(fù)(包括但不限于網(wǎng)絡(luò)抖動(dòng)、服務(wù)端重啟以及Consumer應(yīng)用重啟)
當(dāng)云消息隊(duì)列 RabbitMQ 版的服務(wù)端或客戶端重啟、擴(kuò)容或縮容時(shí),會(huì)觸發(fā)Rebalance,此時(shí)Consumer可能會(huì)收到重復(fù)消息。
處理方法
以Message ID為冪等鍵對消息進(jìn)行冪等處理的步驟如下:
在數(shù)據(jù)庫中創(chuàng)建一張unique key索引為唯一Message ID的表。
在Producer客戶端為每條消息設(shè)置唯一Message ID。
設(shè)置唯一Message ID的示例代碼如下:
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build(); channel.basicPublish("${ExchangeName}", "RoutingKey", true, props, ("消息發(fā)送Body" + i).getBytes(StandardCharsets.UTF_8));
了解更多Message ID相關(guān)信息,請參見如何設(shè)置Message ID。
在Consumer客戶端根據(jù)唯一Message ID對消息進(jìn)行冪等處理。
根據(jù)唯一Message ID進(jìn)行冪等處理的示例代碼如下:
channel.basicConsume(Producer.QueueName, false, "YourConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 1. 獲取業(yè)務(wù)唯一性索引數(shù)據(jù)。 try{ String messageId = properties.getMessageId(); // Message ID或者其他作為unique key的信息。 // 2. 開啟數(shù)據(jù)庫事務(wù)。 idempTable.insert(messageId); // 3. 對接收到的消息,進(jìn)行業(yè)務(wù)邏輯處理。 // 4. 提交或回滾事務(wù)。// 處理成功,則進(jìn)行ACK,否則不要進(jìn)行ACK。 channel.basicAck(envelope.getDeliveryTag(), false); } catch (數(shù)據(jù)庫主鍵沖突異常 e){ // 重復(fù)消息,直接確認(rèn)掉。 channel.basicAck(envelope.getDeliveryTag(), false); } } } );