DTS(Data Transmission Service)作為事件源通過事件總線EventBridge與函數計算集成后,通過DTS觸發器能夠觸發關聯函數執行。通過函數可以對DTS數據訂閱獲取的數據庫實時增量數據進行自定義處理。本文介紹如何在函數計算控制臺創建DTS觸發器、配置入口參數以及編寫代碼并測試代碼。
功能簡介
您在函數計算的控制臺提交觸發器創建請求之后,函數計算會根據觸發器的配置信息,自動在事件總線EventBridge側創建事件流資源。
創建完成后,您可以在函數計算控制臺查看觸發器信息,同時也可以在事件總線EventBridge控制臺查看自動創建的資源信息。當DTS數據訂閱捕捉到數據庫的增量數據后,將會觸發函數執行,觸發時會根據您的攢批配置將一個或多個消息事件以批的形式推送到函數中進行處理。
注意事項
- 作為觸發源的DTS數據訂閱任務必須和函數計算的函數在相同的地域。
- 創建的事件流數量超過上限后,將無法再創建DTS觸發器。關于事件流的數量限制,請參見使用限制。
前提條件
- 事件總線EventBridge
- 函數計算
- 數據傳輸服務DTS
步驟一:創建DTS觸發器
- 登錄函數計算控制臺,在左側導航欄,單擊服務及函數。
- 在頂部菜單欄,選擇地域,然后在服務列表頁面,單擊目標服務。
在函數管理頁面,單擊目標函數名稱。
在函數詳情頁面,單擊觸發器管理頁簽,從版本或別名下拉列表選擇要創建觸發器的版本或別名,然后單擊創建觸發器。
- 在創建觸發器面板,填寫相關信息。然后單擊確定。基礎配置項說明如下所示。
配置項 取值說明 本文示例 觸發器類型 觸發器類型。關于支持的觸發器類型,請參見觸發器簡介。 DTS 名稱 自定義的觸發器名稱。 dts-trigger 版本或別名 默認值為LATEST,如果您需要創建其他版本或別名的觸發器,請先在函數詳情頁的右上角切換到該版本或別名。關于版本和別名的簡介,請參見管理版本和管理別名。 LATEST 數據訂閱任務 已創建的數據訂閱任務名稱。 dtsqntc2*** 消費組 已創建的用于消費訂閱任務數據的消費組名稱。 重要 請確保該消費組沒有在其他客戶端的實例上運行,否則可能導致傳入的消費位點失效。test 賬號 創建消費組時設置的賬號。 test 密碼 創建消費組時設置的密碼。 ****** 消費位點 期望消費第一條數據的時間戳。消費位點必須在訂閱實例的數據范圍之內。 說明 消費位點僅在新消費組第一次運行時生效。若后續任務重啟,會基于上次消費位點繼續消費。2022-06-21 00:00:00 調用方式 選擇函數調用方式。 同步調用 觸發器啟用狀態 創建觸發器后是否立即啟用。默認勾選啟用觸發器,即創建觸發器后立即啟用觸發器。 不涉及 關于推送配置、重試和死信等高級配置項說明,請參見觸發器高級功能。
創建完成后,在觸發器名稱列表中顯示已創建的觸發器。如需對創建的觸發器進行修改或刪除,具體操作,請參見觸發器管理。
步驟二:配置函數入口參數
DTS事件源會以event
的形式作為輸入參數傳遞給函數,您可以手動將event
傳給函數模擬觸發事件。
在函數詳情頁面,單擊函數代碼頁簽,然后單擊測試函數右側圖標,從下拉列表中,選擇配置測試參數。
- 在配置測試參數面板,選擇創建新測試事件或編輯已有測試事件頁簽,填寫事件名稱和事件內容。然后單擊確定。
event
格式如下所示:[ { "data": { "id": 321****, "topicPartition": { "hash": 0, "partition": 0, "topic": "cn_hangzhou_rm_1234****_test_version2" }, "offset": 3218099, "sourceTimestamp": 1654847757, "operationType": "UPDATE", "schema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou--test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "beforeImage": { "recordSchema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou-test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "values": [ { "data": 115 }, { "data": { "hb": [ 104, 101, 108, 108, 111 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 9, "capacity": 9, "address": 0 }, "charset": "utf8mb4" } ], "size": 45 }, "afterImage": { "recordSchema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou-test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "values": [ { "data": 115 }, { "data": { "hb": [ 98, 121, 101 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 11, "capacity": 11, "address": 0 }, "charset": "utf8mb4" } ], "size": 47 } }, "id": "12f701a43741d404fa9a7be89d9acae0-321****", "source": "DTSstreamDemo", "specversion": "1.0", "type": "dts:ConsumeMessage", "datacontenttype": "application/json; charset=utf-8", "time": "2022-06-10T07:55:57Z", "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro" } ]
CloudEvents規范中定義的參數解釋,請參見事件概述。
data字段包含的參數解釋如下表所示。
參數
類型
說明
id
String
DTS數據ID。
topicPartition
Array
Topic的分區信息。
hash
String
DTS底層存儲參數。
partition
String
Topic的分區。
topic
String
Topic的名稱。
offset
Int
DTS數據對應的消息存儲位點。
sourceTimestamp
Int
DTS數據生成時間戳。
operationType
String
DTS數據的操作類型。
schema
Array
數據庫表結構信息。
recordFields
Array
字段詳情記錄。
fieldName
String
字段名稱。
rawDataTypeNum
Int
字段類型映射值。
該值對應從數據訂閱通道中獲取的增量數據反序列化后的dataTypeNumber字段值,詳情請參見使用Kafka客戶端消費訂閱數據。
isPrimaryKey
Boolean
字段是否是主鍵。
isUniqueKey
Boolean
字段是否是唯一值。
fieldPosition
String
字段位置。
nameIndex
Array
命名索引。
schemaId
String
數據庫表結構信息的ID。
databaseName
String
數據庫名稱。
tableName
String
數據表名稱。
primaryIndexInfo
String
主鍵索引。
indexType
String
主鍵索引類型。
indexFields
Array
主鍵索引字段內容。
cardinality
String
主鍵基數。
nullable
Boolean
主鍵是否可為null。
isFirstUniqueIndex
Boolean
是否是第一個唯一索引。
uniqueIndexInfo
String
唯一索引。
foreignIndexInfo
String
外鍵索引。
normalIndexInfo
String
普通索引。
databaseInfo
Array
數據庫信息。
databaseType
String
數據庫類型。
version
String
數據庫版本。
totalRows
Int
數據表的總行數。
beforeImage
String
操作前記錄字段內容鏡像。
values
String
記錄字段的值。
size
Int
記錄字段大小。
afterImage
String
操作后記錄字段內容鏡像。
步驟三:編寫函數代碼并測試
觸發器創建完成后,您可以開始編寫并測試函數代碼,以驗證代碼的正確性。在實際操作過程中,當DTS數據訂閱捕捉到數據庫的增量數據后,觸發器會自動觸發函數的執行。
- 在函數詳情頁面,單擊函數代碼頁簽,在代碼編輯器中編寫代碼,然后單擊部署代碼。本文以Node.js函數代碼為例。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); //解析event參數,對event進行處理。 callback(null, 'return result'); }
單擊函數代碼頁簽的測試函數。
執行完成后,您可以在函數代碼頁簽的上方查看執行結果。
更多信息
除了函數計算控制臺,您還可通過以下方式配置觸發器:
通過Serverless Devs工具配置觸發器。更多操作,請參見Serverless Devs。
通過SDK配置觸發器。更多操作,請參見SDK列表。
如需對創建的觸發器進行修改或刪除,具體操作,請參見觸發器管理。