AHAS應用防護功能與消息隊列RocketMQ版組合,可以讓MQ消費端負載保持在消息處理水位之下,同時盡可能處理更多消息,達到削峰填谷的效果。本文以AHAS應用防護的勻速處理請求的能力為例,說明如何對RocketMQ消費端進行限流。

背景信息

在消息隊列RocketMQ版中,消費者消費消息時,很可能出現因消息發送量突增而消費者來不及處理的情況,導致消費方負載過高,進而導致影響系統穩定性。

在實際場景中,消息的到來具有瞬時性、不規律性,導致系統可能出現空閑資源。利用AHAS應用防護的勻速處理請求的能力,可以把超過消費端處理能力的消息(圖中黃色部分)均攤到后面系統空閑時去處理,讓系統負載處在一個穩定的水位,同時盡可能地處理更多消息,起到削峰填谷的作用。

削峰填谷

AHAS應用防護在削峰填谷的場景時,以固定的間隔時間讓請求通過,以穩定的速度逐步處理這些請求,避免流量突刺造成系統負載過高。同時堆積的請求將會排隊,逐步進行處理;當請求排隊預計超過最大超時時長的時候則直接拒絕,而不是拒絕全部請求。

例如,在RocketMQ的場景下,配置勻速模式下請求QPS為8,則每200 ms處理一條消息,多余的處理任務將排隊;同時配置超時時間為8秒,預計的排隊時長超過8秒的處理任務將會被直接拒絕。

原理圖

前提條件

步驟一:接入AHAS應用防護

下面將介紹如何快速在消息隊列RocketMQ Consumer(消費端)接入和使用AHAS應用防護服務 。您可以下載Demo工程來完成以下步驟。

  1. 在Consumer的pom文件中引入AHAS應用防護(即Sentinel)依賴。

    <dependency>
        <groupId>com.alibaba.csp</groupId>
        <artifactId>ahas-sentinel-client</artifactId>
        <version>x.y.z</version>
    </dependency>
    說明 請在AHAS依賴倉庫查看依賴最新版本。
  2. 定義資源。

    由于消息隊列RocketMQ Consumer未提供相應攔截機制,而且每次收到都可能是批量的消息,因此用戶需要在處理消息時手動定義資源。

    定義消息處理邏輯為消息被拒絕后會記錄錯誤并觸發重新投遞,代碼示例如下。

    private static Action handleMessage(Message message, String groupId, String topic) {
            Entry entry = null;
            try {
                // 定義資源。為了便于標識,資源名稱定義為Group ID和Topic的組合。Group ID和Topic可以通過消息隊列RocketMQ控制臺獲得。
                entry = SphU.entry("handleMqMessage:" + groupId + ":" + topic);
    
                // 業務真實的消息處理邏輯。
                System.out.println(System.currentTimeMillis() + " | handling message: " + message);
                return Action.CommitMessage;
            } catch (BlockException ex) {
                // 編寫被流控的消息的處理邏輯。示例:記錄錯誤或進行重試。
                System.err.println("Blocked, will retry later: " + message);
                // 會觸發消息重新投遞
                return Action.ReconsumeLater; 
            } finally {
                if (entry != null) {
                    entry.exit();
                }
            }
        }

    消費者訂閱消息的邏輯示例如下。

    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe(topic, "*", (message, context) -> {
        return handleMessage(message);
    });
    consumer.start();

    關于消息隊列RocketMQ版如何訂閱消息,請參見消息隊列RocketMQ版文檔

  3. 登錄AHAS控制臺,獲取AHAS啟動參數。
    1. 在控制臺最上方地域列表中,選擇地域為公網
    2. 在左側導航欄選擇流量防護 > 應用防護,單擊左上角新應用接入
    3. 選擇SDK接入 > 自定義埋點頁簽,查看啟動參數。

      啟動參數示例如下。

      -Dproject.name=MqConsumerDemo -Dahas.license=<License>

      其中MqConsumerDemo表示應用名,可自定義;<License>表示您的授權證書,請修改為真實值。

  4. 在Consumer中添加啟動參數。
  5. 啟動Publisher開始發送消息,再啟動Consumer開始接收消息。

    啟動Publisher、Consumer后,本地IDE的console區域開始打印消息發送日志、消息接收日志,通過查看日志判斷消息發送情況。

步驟二:配置削峰填谷規則

將應用接入AHAS應用防護后,需要為其配置規則來實現削峰填谷。

  1. 登錄AHAS控制臺,在左側導航欄選擇流量防護 > 應用防護
  2. 應用防護頁面單擊目標應用資源卡片,進入該應用管理界面。
  3. 在左側導航欄選擇應用概覽,在目標接口的操作列中,單擊流控,填寫流控規則,并單擊新建完成創建。詳情請參見配置流控規則
    • 流控效果:排隊等待。
    • 單機QPS閾值:QPS,設置QPS閾值為10,代表每100 ms勻速通過一個請求。
    • 超時時間:2000 ms,超出此超時時間的請求將立即被拒絕,不會進入隊列。
  4. 通過消息隊列RocketMQ Producer端向Consumer批量發送消息,查看流控效果。
    • 在Consumer控制臺,通過觀察消息頭部的時間戳(如下所示),可以發現消息消費的速率是勻速的,大約每100毫秒消費一條消息。同時,不斷有排隊的處理任務完成,超出等待時長的處理請求直接被拒絕。
      1550732955137 | handling message: Hello MQ 2453
      1550732955236 | handling message: Hello MQ 9162
      1550732955338 | handling message: Hello MQ 4944
      1550732955438 | handling message: Hello MQ 5582
      1550732955538 | handling message: Hello MQ 4493
      1550732955637 | handling message: Hello MQ 3036
      1550732955738 | handling message: Hello MQ 1381
      1550732955834 | handling message: Hello MQ 1450
      1550732955937 | handling message: Hello MQ 5871
      說明 在處理被拒絕請求的時候,需要根據業務需求,決定是否重新消費消息。
    • AHAS控制臺的應用詳情頁面,單擊監控詳情,查看消息處理的監控曲線。pg_sentinel_app_monitor.png

      如果沒有使用勻速限流模式,該消息處理的監控曲線會如下圖。

      如果不開啟勻速模式,只會同時處理10條消息,其余的全部被拒絕。即使后面的時間系統資源充足,多余的請求也無法被處理,因而浪費了許多空閑資源。兩種模式對比說明勻速模式下消息處理能力得到了更好的利用。

      pg_sentinel_app_monitor_vs.png

消息隊列Kafka Consumer接入示例

與消息隊列RocketMQ版類似,消息隊列Kafka Consumer也可通過類似方法接入和使用AHAS應用防護服務。

示例:在處理消息的邏輯處定義資源。

private static void handleMessage(ConsumerRecord<String, String> record, String groupId, String topic) {
    pool.submit(() -> {
        Entry entry = null;
        try {
            // 定義資源。為了便于標識,資源名稱定義為Group ID和Topic的組合。Group ID和Topic可以通過MQ控制臺獲得。
            entry = SphU.entry("handleKafkaMessage:" + groupId + ":" + topic);

            // 業務的消息處理邏輯。
            System.out.printf("[%d] Receive new messages: %s%n", System.currentTimeMillis(), record.toString());
        } catch (BlockException ex) {
            // Blocked
            // 在處理請求被拒絕的情況時候,需要根據業務需求,決定是否重新消費消息。
             System.err.println("Blocked: " + record.toString());
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
    });
}

示例:消費者訂閱消息的邏輯如下。

while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        // 必須在下次poll之前消費完這些數據, 且總耗時不得超過SESSION_TIMEOUT_MS_CONFIG。
        // 建議開一個單獨的線程池來消費消息,然后異步返回結果。
        for (ConsumerRecord<String, String> record : records) {
            handleMessage(record, groupId, topic);
        }
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (Throwable ignore) {
        }
        e.printStackTrace();
    }
}

相關文檔

本文介紹的是AHAS應用防護服務的一個場景:請求勻速。AHAS應用防護服務還可以處理更復雜的各種情況。

  • 流量控制:針對不同的調用關系,以不同的運行指標(如QPS、線程數、系統負載等)為基準,對資源調用進行流量控制,將隨機的請求調整成合適的形狀(請求勻速、Warm Up等)。
  • 熔斷降級:當調用鏈路中某個資源出現不穩定的情況,如平均響應時間增高、異常比例升高的時候,使對此資源的調用請求快速失敗,避免影響其它的資源導致級聯失敗。
  • 系統負載保護:從系統的維度提供保護。當系統負載較高的時候,提供保護機制,讓系統的入口流量和系統的負載達到一個平衡,保證系統在能力范圍之內處理最多的請求。

詳情請參見AHAS應用防護