消息冪等
為了防止消息重復(fù)消費(fèi)導(dǎo)致業(yè)務(wù)處理異常,SOFAStack 消息隊列的消費(fèi)者在接收到消息后,有必要根據(jù)業(yè)務(wù)上的唯一 Key 對消息做冪等處理。本文介紹消息冪等的概念、適用場景以及處理方法。
什么是消息冪等
當(dāng)出現(xiàn)消費(fèi)者對某條消息重復(fù)消費(fèi)的情況時,重復(fù)消費(fèi)的結(jié)果與消費(fèi)一次的結(jié)果是相同的,并且多次消費(fèi)并未對業(yè)務(wù)系統(tǒng)產(chǎn)生任何負(fù)面影響,那么這整個過程就實(shí)現(xiàn)了消息冪等。
例如,在支付場景下,消費(fèi)者消費(fèi)扣款消息,對一筆訂單執(zhí)行扣款操作,扣款金額為 100 元。如果因網(wǎng)絡(luò)不穩(wěn)定等原因?qū)е驴劭钕⒅貜?fù)投遞,消費(fèi)者重復(fù)消費(fèi)了該扣款消息,但最終的業(yè)務(wù)結(jié)果是只扣款一次,扣費(fèi) 100 元,且用戶的扣款記錄中對應(yīng)的訂單只有一條扣款流水,不會多次扣除費(fèi)用。那么這次扣款操作是符合要求的,整個消費(fèi)過程實(shí)現(xiàn)了消費(fèi)冪等。
適用場景
在互聯(lián)網(wǎng)應(yīng)用中,尤其在網(wǎng)絡(luò)不穩(wěn)定的情況下,消息隊列的消息有可能會出現(xiàn)重復(fù)。如果消息重復(fù)會影響您的業(yè)務(wù)處理,請對消息做冪等處理。
消息重復(fù)的場景如下:
發(fā)送時消息重復(fù)。當(dāng)一條消息已被成功發(fā)送到服務(wù)端并完成持久化,此時出現(xiàn)了網(wǎng)絡(luò)閃斷或者客戶端宕機(jī),導(dǎo)致服務(wù)端對客戶端應(yīng)答失敗。 如果此時生產(chǎn)者意識到消息發(fā)送失敗并嘗試再次發(fā)送消息,消費(fèi)者后續(xù)會收到兩條內(nèi)容相同并且 Message ID 也相同的消息。
投遞時消息重復(fù)消息消費(fèi)的場景下,消息已投遞到消費(fèi)者并完成業(yè)務(wù)處理,當(dāng)客戶端給服務(wù)端反饋應(yīng)答的時候網(wǎng)絡(luò)閃斷。為了保證消息至少被消費(fèi)一次,消息隊列的服務(wù)端將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過的消息,消費(fèi)者后續(xù)會收到兩條內(nèi)容相同并且 Message ID 也會收到相同的消息。
負(fù)載均衡時消息重復(fù)(包括但不限于網(wǎng)絡(luò)抖動、Broker 重啟以及消費(fèi)者應(yīng)用重啟)。當(dāng)消息隊列的 Broker 或客戶端重啟、擴(kuò)容或縮容時,會觸發(fā) Rebalance,此時消費(fèi)者可能會收到重復(fù)消息。
處理方法
因為 Message ID 有可能出現(xiàn)沖突(重復(fù))的情況,所以真正安全的冪等處理,不建議以 Message ID 作為處理依據(jù)。最好的方式是以業(yè)務(wù)唯一標(biāo)識作為冪等處理的關(guān)鍵依據(jù),而業(yè)務(wù)的唯一標(biāo)識可以通過消息 Key 設(shè)置。
以支付場景為例,可以將消息的 Key 設(shè)置為訂單號,作為冪等處理的依據(jù)。具體代碼示例如下:
OrderPojo orderPojo = getPojo();
Message message = producer.messageBuilder().withTopic("TP_XXX").withValue(orderPojo).build();
// set unique key
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
消費(fèi)者收到消息時可以根據(jù)消息的 Key,即訂單號來實(shí)現(xiàn)消息冪等:
consumer.subscribe("ons_test", "*", new GenericMessageListener<OrderPojo>() {
@Override
public Class<OrderPojo> payloadClass() {
return OrderPojo.class;
}
@Override
public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
String key = message.getKey();
// 根據(jù)業(yè)務(wù)唯一標(biāo)識的 Key 做冪等處理
return Action.CommitMessage;
}
});