日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Kafka觸發器

云消息隊列 Kafka 版觸發器(以下簡稱Kafka觸發器)是通過事件總線EventBridge云消息隊列 Kafka 版作為事件源與函數計算進行集成。創建完成后,您可以在函數計算控制臺和事件總線EventBridge控制臺查看創建的信息。當有消息入隊時,事件總線EventBridge會根據您的攢批配置將一個或多個消息事件以批的形式推送到函數中進行處理。

注意事項

  • 作為觸發源的云消息隊列 Kafka 版實例必須和函數計算的函數在相同的地域。

  • 創建的事件流數量超過上限后,將無法再創建Kafka觸發器。關于事件流數量的限制,請參見使用限制

前提條件

步驟一:創建Kafka觸發器

  1. 登錄函數計算控制臺,在左側導航欄,單擊服務及函數
  2. 在頂部菜單欄,選擇地域,然后在服務列表頁面,單擊目標服務。
  3. 函數管理頁面,單擊目標函數名稱。

  4. 在函數詳情頁面,單擊觸發器管理頁簽,從版本或別名下拉列表選擇要創建觸發器的版本或別名,然后單擊創建觸發器

  5. 在創建觸發器面板,填寫相關信息。然后單擊確定

    基礎配置項說明如下所示。

    配置項

    操作

    本文示例

    觸發器類型

    選擇消息隊列 Kafka 版

    消息隊列 Kafka 版

    名稱

    填寫自定義的觸發器名稱。

    kafka-trigger

    版本或別名

    默認值為LATEST,如果您需要創建其他版本或別名的觸發器,首先需要在函數詳情頁的右上角切換到該版本或別名。關于版本和別名的簡介,請參見管理版本管理別名

    LATEST

    Kafka 實例

    選擇已創建的云消息隊列 Kafka 版實例。

    alikafka_pre-cn-i7m2t7t1****

    Topic

    選擇已創建的云消息隊列 Kafka 版實例的Topic。

    topic1

    Group ID

    選擇已創建的云消息隊列 Kafka 版實例的Group ID。

    說明

    請使用獨立的Group ID來創建觸發器,不要與已有的業務混用Group ID,否則會影響已有的消息收發。

    GID_group1

    消費任務并發數

    消費者的并發數量,取值范圍為[1,Topic的分區數]。

    2

    消費位點

    選擇消息的消費位點,即云消息隊列 Kafka 版從事件總線開始拉取消息的位置。

    取值說明如下。

    • 最早位點:從最早位點開始消費。

    • 最新位點:從最新位點開始消費。

    最新位點

    調用方式

    選擇函數調用方式。

    取值說明如下。

    • 同步調用:適用于順序調用場景。單個(批)事件觸發函數調用,等待函數執行完成返回結果后,再由下一個(批)事件繼續觸發函數調用。同步調用請求正文有效負載最大為32 MB。更多信息,請參見同步調用

    • 異步調用:可以快速消費事件。單個(批)事件觸發函數調用,函數計算會立刻返回響應,再由下一個(批)事件繼續觸發函數調用。該過程中函數會異步執行。異步調用請求正文有效負載最大為128 KB。更多信息,請參見異步調用

    同步調用

    投遞并發最大值

    Kafka消息投遞到函數計算的并發最大值,取值范圍為1~300。該參數僅對同步調用生效。如果需要更高的并發,請進入EventBridge配額中心申請配額名稱為EventStreaming FC Sink 同步投遞最大并發數的配額。

    1

    觸發器啟用狀態

    創建觸發器后是否立即啟用。默認勾選啟用觸發器,即創建觸發器后立即啟用觸發器。

    不涉及

    關于推送配置、重試和死信等高級配置項說明,請參見觸發器高級功能

    創建完成后,在觸發器名稱列表中顯示已創建的觸發器。如需對創建的觸發器進行修改或刪除,具體操作,請參見觸發器管理

步驟二:配置函數入口參數

云消息隊列 Kafka 版事件源會以event的形式作為輸入參數傳遞給函數,您可以手動將event傳給函數模擬觸發事件。

  1. 在函數詳情頁面,單擊函數代碼頁簽,然后單擊測試函數右側xialatubiao圖標,從下拉列表中,選擇配置測試參數

  2. 配置測試參數面板,選擇創建新測試事件編輯已有測試事件頁簽,填寫事件名稱和事件內容。然后單擊確定

    event格式如下所示:

    [
        {
            "specversion":"1.0",
            "id":"8e215af8-ca18-4249-8645-f96c1026****",
            "source":"acs:alikafka",
            "type":"alikafka:Topic:Message",
            "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
            "datacontenttype":"application/json; charset=utf-8",
            "time":"2022-06-23T02:49:51.589Z",
            "aliyunaccountid":"164901546557****",
            "data":{
                "topic":"****",
                "partition":7,
                "offset":25,
                "timestamp":1655952591589,
                "headers":{
                    "headers":[
    
                    ],
                    "isReadOnly":false
                },
                "key":"keytest",
                "value":"hello kafka msg"
            }
        },
        {
            "specversion":"1.0",
            "id":"8e215af8-ca18-4249-8645-f96c1026****",
            "source":"acs:alikafka",
            "type":"alikafka:Topic:Message",
            "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
            "datacontenttype":"application/json; charset=utf-8",
            "time":"2022-06-23T02:49:51.589Z",
            "aliyunaccountid":"164901546557****",
            "data":{
                "topic":"****",
                "partition":7,
                "offset":25,
                "timestamp":1655952591589,
                "headers":{
                    "headers":[
    
                    ],
                    "isReadOnly":false
                },
                "key":"keytest",
                "value":"hello kafka msg"
            }
        }
    ]

    CloudEvents規范中定義的參數解釋,請參見事件概述

    data字段包含的參數解釋如下表所示。

    參數

    類型

    示例值

    描述

    topic

    String

    TopicName

    Topic的名稱。

    partition

    Int

    1

    云消息隊列 Kafka 版的消費分區信息。

    offset

    Int

    0

    云消息隊列 Kafka 版的消息位點。

    timestamp

    String

    1655952591589

    開始消費時間戳。

步驟三:編寫函數代碼并測試

觸發器創建完成后,您可以開始編寫并測試函數代碼,以驗證代碼的正確性。在實際操作過程中,當云消息隊列 Kafka 版事件發生時,觸發器會自動觸發函數的執行。

  1. 在函數詳情頁面,單擊函數代碼頁簽,在代碼編輯器中編寫代碼,然后單擊部署代碼

    本文以Node.js函數代碼為例。

    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      //解析event參數,對event進行處理。
      callback(null, 'return result');
    }
  2. 單擊函數代碼頁簽的測試函數

    執行完成后,您可以在函數代碼頁簽的上方查看執行結果。

常見問題

為什么Kafka觸發器觸發函數的次數與消息的條數不一致?

事件總線EventBridge會根據您的攢批配置將消息推送給函數處理,不能通過函數調用次數判斷消息是否全部被消費。您可以在函數中打印日志來確認消費的消息情況。更多關于攢批配置的信息請參見推送配置

函數計算支持處理自建的Kafka消息隊列產生的消息嗎?

暫時不支持。函數計算不會主動監測自建的Kafka消息隊列是否有新的消息,您可以使用HTTP函數,將消息附加到HTTP請求中主動觸發函數。