AHAS應用防護功能與消息隊列RocketMQ版組合,可以讓MQ消費端負載保持在消息處理水位之下,同時盡可能處理更多消息,達到削峰填谷的效果。本文以AHAS應用防護的勻速處理請求的能力為例,說明如何對RocketMQ消費端進行限流。
背景信息
在消息隊列RocketMQ版中,消費者消費消息時,很可能出現因消息發送量突增而消費者來不及處理的情況,導致消費方負載過高,進而導致影響系統穩定性。
在實際場景中,消息的到來具有瞬時性、不規律性,導致系統可能出現空閑資源。利用AHAS應用防護的勻速處理請求的能力,可以把超過消費端處理能力的消息(圖中黃色部分)均攤到后面系統空閑時去處理,讓系統負載處在一個穩定的水位,同時盡可能地處理更多消息,起到削峰填谷的作用。
AHAS應用防護在削峰填谷的場景時,以固定的間隔時間讓請求通過,以穩定的速度逐步處理這些請求,避免流量突刺造成系統負載過高。同時堆積的請求將會排隊,逐步進行處理;當請求排隊預計超過最大超時時長的時候則直接拒絕,而不是拒絕全部請求。
例如,在RocketMQ的場景下,配置勻速模式下請求QPS為8,則每200 ms處理一條消息,多余的處理任務將排隊;同時配置超時時間為8秒,預計的排隊時長超過8秒的處理任務將會被直接拒絕。
前提條件
- 已在消息隊列RocketMQ中發送和訂閱消息,請參見消息隊列RocketMQ版快速入門。
- 已開通AHAS,請參見開通AHAS。
步驟一:接入AHAS應用防護
下面將介紹如何快速在消息隊列RocketMQ Consumer(消費端)接入和使用AHAS應用防護服務 。您可以下載Demo工程來完成以下步驟。
-
在Consumer的pom文件中引入AHAS應用防護(即Sentinel)依賴。
<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>ahas-sentinel-client</artifactId> <version>x.y.z</version> </dependency>
說明 請在AHAS依賴倉庫查看依賴最新版本。 - 定義資源。
由于消息隊列RocketMQ Consumer未提供相應攔截機制,而且每次收到都可能是批量的消息,因此用戶需要在處理消息時手動定義資源。
定義消息處理邏輯為消息被拒絕后會記錄錯誤并觸發重新投遞,代碼示例如下。
private static Action handleMessage(Message message, String groupId, String topic) { Entry entry = null; try { // 定義資源。為了便于標識,資源名稱定義為Group ID和Topic的組合。Group ID和Topic可以通過消息隊列RocketMQ控制臺獲得。 entry = SphU.entry("handleMqMessage:" + groupId + ":" + topic); // 業務真實的消息處理邏輯。 System.out.println(System.currentTimeMillis() + " | handling message: " + message); return Action.CommitMessage; } catch (BlockException ex) { // 編寫被流控的消息的處理邏輯。示例:記錄錯誤或進行重試。 System.err.println("Blocked, will retry later: " + message); // 會觸發消息重新投遞 return Action.ReconsumeLater; } finally { if (entry != null) { entry.exit(); } } }
消費者訂閱消息的邏輯示例如下。
Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe(topic, "*", (message, context) -> { return handleMessage(message); }); consumer.start();
關于消息隊列RocketMQ版如何訂閱消息,請參見消息隊列RocketMQ版文檔。
- 登錄AHAS控制臺,獲取AHAS啟動參數。
- 在控制臺最上方地域列表中,選擇地域為公網。
- 在左側導航欄選擇新應用接入。 ,單擊左上角
- 選擇
啟動參數示例如下。
-Dproject.name=MqConsumerDemo -Dahas.license=<License>
其中
MqConsumerDemo
表示應用名,可自定義;<License>
表示您的授權證書,請修改為真實值。
頁簽,查看啟動參數。
- 在Consumer中添加啟動參數。
- 啟動Publisher開始發送消息,再啟動Consumer開始接收消息。
啟動Publisher、Consumer后,本地IDE的console區域開始打印消息發送日志、消息接收日志,通過查看日志判斷消息發送情況。
步驟二:配置削峰填谷規則
將應用接入AHAS應用防護后,需要為其配置規則來實現削峰填谷。
- 登錄AHAS控制臺,在左側導航欄選擇 。
- 在應用防護頁面單擊目標應用資源卡片,進入該應用管理界面。
- 在左側導航欄選擇應用概覽,在目標接口的操作列中,單擊流控,填寫流控規則,并單擊新建完成創建。詳情請參見配置流控規則。
- 流控效果:排隊等待。
- 單機QPS閾值:QPS,設置QPS閾值為10,代表每100 ms勻速通過一個請求。
- 超時時間:2000 ms,超出此超時時間的請求將立即被拒絕,不會進入隊列。
- 通過消息隊列RocketMQ Producer端向Consumer批量發送消息,查看流控效果。
- 在Consumer控制臺,通過觀察消息頭部的時間戳(如下所示),可以發現消息消費的速率是勻速的,大約每100毫秒消費一條消息。同時,不斷有排隊的處理任務完成,超出等待時長的處理請求直接被拒絕。
1550732955137 | handling message: Hello MQ 2453 1550732955236 | handling message: Hello MQ 9162 1550732955338 | handling message: Hello MQ 4944 1550732955438 | handling message: Hello MQ 5582 1550732955538 | handling message: Hello MQ 4493 1550732955637 | handling message: Hello MQ 3036 1550732955738 | handling message: Hello MQ 1381 1550732955834 | handling message: Hello MQ 1450 1550732955937 | handling message: Hello MQ 5871
說明 在處理被拒絕請求的時候,需要根據業務需求,決定是否重新消費消息。 - 在AHAS控制臺的應用詳情頁面,單擊監控詳情,查看消息處理的監控曲線。
如果沒有使用勻速限流模式,該消息處理的監控曲線會如下圖。
如果不開啟勻速模式,只會同時處理10條消息,其余的全部被拒絕。即使后面的時間系統資源充足,多余的請求也無法被處理,因而浪費了許多空閑資源。兩種模式對比說明勻速模式下消息處理能力得到了更好的利用。
- 在Consumer控制臺,通過觀察消息頭部的時間戳(如下所示),可以發現消息消費的速率是勻速的,大約每100毫秒消費一條消息。同時,不斷有排隊的處理任務完成,超出等待時長的處理請求直接被拒絕。
消息隊列Kafka Consumer接入示例
與消息隊列RocketMQ版類似,消息隊列Kafka Consumer也可通過類似方法接入和使用AHAS應用防護服務。
示例:在處理消息的邏輯處定義資源。
private static void handleMessage(ConsumerRecord<String, String> record, String groupId, String topic) {
pool.submit(() -> {
Entry entry = null;
try {
// 定義資源。為了便于標識,資源名稱定義為Group ID和Topic的組合。Group ID和Topic可以通過MQ控制臺獲得。
entry = SphU.entry("handleKafkaMessage:" + groupId + ":" + topic);
// 業務的消息處理邏輯。
System.out.printf("[%d] Receive new messages: %s%n", System.currentTimeMillis(), record.toString());
} catch (BlockException ex) {
// Blocked
// 在處理請求被拒絕的情況時候,需要根據業務需求,決定是否重新消費消息。
System.err.println("Blocked: " + record.toString());
} finally {
if (entry != null) {
entry.exit();
}
}
});
}
示例:消費者訂閱消息的邏輯如下。
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
// 必須在下次poll之前消費完這些數據, 且總耗時不得超過SESSION_TIMEOUT_MS_CONFIG。
// 建議開一個單獨的線程池來消費消息,然后異步返回結果。
for (ConsumerRecord<String, String> record : records) {
handleMessage(record, groupId, topic);
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
相關文檔
本文介紹的是AHAS應用防護服務的一個場景:請求勻速。AHAS應用防護服務還可以處理更復雜的各種情況。
- 流量控制:針對不同的調用關系,以不同的運行指標(如QPS、線程數、系統負載等)為基準,對資源調用進行流量控制,將隨機的請求調整成合適的形狀(請求勻速、Warm Up等)。
- 熔斷降級:當調用鏈路中某個資源出現不穩定的情況,如平均響應時間增高、異常比例升高的時候,使對此資源的調用請求快速失敗,避免影響其它的資源導致級聯失敗。
- 系統負載保護:從系統的維度提供保護。當系統負載較高的時候,提供保護機制,讓系統的入口流量和系統的負載達到一個平衡,保證系統在能力范圍之內處理最多的請求。
詳情請參見AHAS應用防護。