消息隊列RocketMQ版作為事件源通過事件總線EventBridge函數計算集成后,通過消息隊列RocketMQ版觸發器(以下簡稱RocketMQ觸發器)能夠觸發關聯函數執行,通過函數可以對發布到消息隊列RocketMQ版中的消息進行自定義處理。本文介紹如何在函數計算控制臺創建RocketMQ觸發器、配置函數入口參數和編寫代碼并測試。

功能簡介

您在函數計算的控制臺提交觸發器創建請求之后,函數計算會根據觸發器的配置信息,自動創建事件總線EventBridge相關的資源。目前提供事件模式事件流模式兩種消息推送模式,每個模式創建的資源如下:
創建完成后,您可以在函數計算控制臺查看觸發器信息,同時也可以在事件總線EventBridge控制臺查看自動創建的資源信息。當源消息隊列RocketMQ版實例中有消息入隊時,將會觸發函數計算執行,不同消息推送模式支持的參數內容也不同。
  • 事件模式:每次會將單個消息作為事件參數傳入函數中,事件遵循CloudEvents規范。消息內容和CloudEvents的關系,請參見參數內容
  • 事件流模式:根據您的攢批配置,將一個或多個消息事件以批的形式推送到函數中進行處理,適合端到端的流式數據處理場景。

注意事項

  • 作為觸發源的消息隊列RocketMQ版的實例必須和函數計算的函數在相同的地域。
  • 創建的自定義總線以及事件規則的數量超過上限后,將無法再創建事件模式的RocketMQ觸發器。
  • 創建的事件流數量超過上限后,將無法再創建事件流模式的RocketMQ觸發器。

在單個阿里云賬號單個地域維度下,關于創建觸發器涉及的資源數量的限制,請參見使用限制

前提條件

步驟一:創建觸發器

  1. 登錄函數計算控制臺,在左側導航欄,單擊服務及函數
  2. 在頂部菜單欄,選擇地域,然后在服務列表頁面,單擊目標服務。
  3. 函數管理頁面,單擊目標函數名稱。
  4. 在函數詳情頁面,單擊觸發器管理頁簽,從版本或別名下拉列表選擇要創建觸發器的版本或別名,然后單擊創建觸發器
  5. 在創建觸發器面板,填寫相關信息。然后單擊確定
    基礎配置項說明如下所示。
    配置項操作本文示例
    觸發器類型選擇消息隊列 RocketMQ 版消息隊列 RocketMQ 版
    名稱填寫自定義的觸發器名稱。rocketmq-trigger
    版本或別名默認值為LATEST,如果您需要創建其他版本或別名的觸發器,首先需要在函數詳情頁的右上角切換到該版本或別名。關于版本和別名的簡介,請參見管理版本管理別名LATEST
    RocketMQ 實例選擇已創建的消息隊列RocketMQ版的實例。MQ_INST_164901546557****_BX7****
    Topic選擇已創建的消息隊列RocketMQ版實例的Topic。topic1
    Tag填寫消息過濾標簽。

    只有收到包含此處設置的過濾標簽字符串的消息時,才會觸發函數執行。

    tag
    Group ID選擇已創建的消息隊列RocketMQ版實例的Group ID。GID_group1
    消費位點選擇消息的消費位點,即消息隊列RocketMQ版從事件總線開始拉取消息的位置。取值說明如下。
    • 最新位點:從最新位點開始消費。
    • 最早位點:從最早位點開始消費。
    • 指定時間戳:從指定時間戳開始消費。
    最新位點
    調用方式選擇函數調用方式。
    取值說明如下。
    • 同步調用:適用于順序調用場景。單個(批)事件觸發函數調用,等待函數執行完成返回結果后,再由下一個(批)事件繼續觸發函數調用。同步調用請求正文有效負載最大為32 MB。更多信息,請參見同步調用
    • 異步調用:可以快速消費事件。單個(批)事件觸發函數調用,函數計算會立刻返回響應,再由下一個(批)事件繼續觸發函數調用。該過程中函數會異步執行。異步調用請求正文有效負載最大為128 KB。更多信息,請參見異步調用
    同步調用
    消息推送模式消息數據推送到函數計算時的底層應用模式。
    取值說明如下。
    • 事件流模式:會根據您的攢批配置將一個或多個消息事件以批的形式推送到函數中進行處理,適合端到端的流式數據處理場景。
    • 事件模式:每次會將單個消息作為事件參數傳入函數中,事件遵循CloudEvents規范。消息內容和CloudEvents的關系,請參見步驟二:配置函數入口參數
    事件模
    觸發器啟用狀態創建觸發器后是否立即啟用。默認勾選啟用觸發器,即創建觸發器后立即啟用觸發器。不涉及

    關于推送配置、重試和死信等高級配置項說明,請參見觸發器高級功能

    創建完成后,在觸發器名稱列表中顯示已創建的觸發器。如需對創建的觸發器進行修改或刪除,具體操作,請參見觸發器管理

步驟二:配置函數入口參數

消息隊列RocketMQ版事件源會以event的形式作為輸入參數傳遞給函數,您可以手動將event傳給函數模擬觸發事件,測試函數代碼是否正確。

  1. 在函數詳情頁面,單擊函數代碼頁簽,然后單擊xialatubiao圖標,從下拉列表中,選擇配置測試參數
  2. 配置測試參數面板,選擇創建新測試事件編輯已有測試事件頁簽,填寫事件名稱和事件內容。然后單擊確定
    事件模式的event格式如下所示。
    {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-Function-rocketmq-trigger",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2021-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{
    
            },
            "body":"TEST"
        }
    }
    事件流模式的event格式如下所示。
    [
        {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-Function-rocketmq-trigger",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2021-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{
    
            },
            "body":"TEST"
        }
        },
        {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-Function-rocketmq-trigger",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2021-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{
    
            },
            "body":"TEST"
        }
        }
    ]
    data字段包含的參數解釋如下表所示。關于CloudEvents規范中定義的參數解釋,請參見事件概述
    參數類型示例值描述
    topicStringTopicNameTopic名稱。
    systemPropertiesMap系統屬性。
    MIN_OFFSETInt0最低位點。
    TRACE_ONBooleantrue是否有消息軌跡。取值說明如下:
    • true:有消息軌跡。
    • false:無消息軌跡。
    MAX_OFFSETInt8最高位點。
    MSG_REGIONStringcn-hangzhou發送消息的地域。
    KEYSStringsystemProperties.KEYS過濾屬性。
    CONSUME_START_TIMELong1628577790396開始消費時間。單位:毫秒。
    UNIQ_KEYStringAC14C305069E1B28CDFA3181CDA2****消息唯一鍵。
    TAGSStringsystemProperties.TAGS過濾屬性。
    INSTANCE_IDStringMQ_INST_123456789098****_BXhFHryi實例ID。
    userPropertiesMap用戶屬性。
    bodyStringTEST消息內容。

步驟三:編寫函數代碼并測試

完成觸發器創建后,您可以開始編寫并測試函數代碼,以驗證代碼的正確性。在實際操作過程中,當消息隊列RocketMQ版事件通過事件總線EventBridge投遞到函數計算時,觸發器會自動觸發函數的執行。

  1. 在函數詳情頁面,單擊函數代碼頁簽,在代碼編輯器中編寫代碼,然后單擊部署代碼
    本文以Node.js函數代碼為例。
    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      //解析event參數,對event進行處理。
      callback(null, 'return result');
    }
  2. 單擊函數代碼頁簽的測試函數
    執行完成后,您可以在函數代碼頁簽的上方查看執行結果。

更多信息

如需對創建的觸發器進行修改或刪除,具體操作,請參見觸發器管理