日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

消息冪等

更新時(shí)間:

如果消息重復(fù)消費(fèi)會(huì)影響您的業(yè)務(wù)處理,請對消息做冪等處理。本文介紹消息冪等的概念、適用場景以及處理方法。

什么是消息冪等

在數(shù)學(xué)與計(jì)算機(jī)學(xué)中,冪等操作的特點(diǎn)是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。在消息領(lǐng)域,冪等是指Consumer重復(fù)消費(fèi)某條消息時(shí),重復(fù)消費(fèi)的結(jié)果與消費(fèi)一次的結(jié)果是相同的,并且多次消費(fèi)并未對業(yè)務(wù)系統(tǒng)產(chǎn)生任何負(fù)面影響。

例如,在支付場景下,Consumer消費(fèi)扣款消息,對一筆訂單執(zhí)行扣款操作,扣款金額為100。如果因網(wǎng)絡(luò)不穩(wěn)定等原因?qū)е驴劭钕⒅貜?fù)投遞,Consumer重復(fù)消費(fèi)了該扣款消息,但最終的業(yè)務(wù)結(jié)果是只扣款一次,扣費(fèi)100,且用戶的扣款記錄中對應(yīng)的訂單只有一條扣款流水,不會(huì)多次扣除費(fèi)用。那么這次扣款操作是符合要求的,整個(gè)消費(fèi)過程實(shí)現(xiàn)了消息冪等。

適用場景

在互聯(lián)網(wǎng)應(yīng)用中,尤其在網(wǎng)絡(luò)不穩(wěn)定的情況下,云消息隊(duì)列 RabbitMQ 版的消息有可能會(huì)出現(xiàn)重復(fù)。如果消息重復(fù)消費(fèi)會(huì)影響您的業(yè)務(wù)處理,請對消息做冪等處理。消息重復(fù)的可能原因如下:

  • 發(fā)送時(shí)消息重復(fù)

    當(dāng)一條消息已被成功發(fā)送到服務(wù)端并完成持久化,此時(shí)出現(xiàn)了網(wǎng)絡(luò)閃斷或者客戶端宕機(jī),導(dǎo)致服務(wù)端對客戶端應(yīng)答失敗。 如果此時(shí)Producer意識到消息發(fā)送失敗并嘗試再次發(fā)送消息,Consumer后續(xù)會(huì)收到兩條內(nèi)容相同并且Message ID也相同的消息。

  • 投遞時(shí)消息重復(fù)

    消息消費(fèi)的場景下,消息已投遞到Consumer并完成業(yè)務(wù)處理,當(dāng)客戶端給服務(wù)端反饋應(yīng)答的時(shí)候網(wǎng)絡(luò)閃斷。為了保證消息至少被消費(fèi)一次,云消息隊(duì)列 RabbitMQ 版的服務(wù)端將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過的消息,Consumer后續(xù)會(huì)收到兩條內(nèi)容相同并且Message ID也相同的消息。

  • 負(fù)載均衡時(shí)消息重復(fù)(包括但不限于網(wǎng)絡(luò)抖動(dòng)、服務(wù)端重啟以及Consumer應(yīng)用重啟)

    當(dāng)云消息隊(duì)列 RabbitMQ 版的服務(wù)端或客戶端重啟、擴(kuò)容或縮容時(shí),會(huì)觸發(fā)Rebalance,此時(shí)Consumer可能會(huì)收到重復(fù)消息。

處理方法

以Message ID為冪等鍵對消息進(jìn)行冪等處理的步驟如下:

  1. 在數(shù)據(jù)庫中創(chuàng)建一張unique key索引為唯一Message ID的表。

  2. 在Producer客戶端為每條消息設(shè)置唯一Message ID。

    設(shè)置唯一Message ID的示例代碼如下:

    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
    channel.basicPublish("${ExchangeName}", "RoutingKey", true, props, ("消息發(fā)送Body" + i).getBytes(StandardCharsets.UTF_8));

    了解更多Message ID相關(guān)信息,請參見如何設(shè)置Message ID

  3. 在Consumer客戶端根據(jù)唯一Message ID對消息進(jìn)行冪等處理。

    根據(jù)唯一Message ID進(jìn)行冪等處理的示例代碼如下:

    channel.basicConsume(Producer.QueueName, false, "YourConsumerTag",
        new DefaultConsumer(channel) {
        @Override public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
            // 1. 獲取業(yè)務(wù)唯一性索引數(shù)據(jù)。
            try{
                String messageId = properties.getMessageId();
                // Message ID或者其他作為unique key的信息。
                // 2. 開啟數(shù)據(jù)庫事務(wù)。
                idempTable.insert(messageId);
                // 3. 對接收到的消息,進(jìn)行業(yè)務(wù)邏輯處理。
                // 4. 提交或回滾事務(wù)。// 處理成功,則進(jìn)行ACK,否則不要進(jìn)行ACK。
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
            catch (數(shù)據(jù)庫主鍵沖突異常 e){
                // 重復(fù)消息,直接確認(rèn)掉。
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    }
    );