DTS觸發(fā)器
DTS(Data Transmission Service)作為事件源通過(guò)事件總線EventBridge與函數(shù)計(jì)算集成后,通過(guò)DTS觸發(fā)器能夠觸發(fā)關(guān)聯(lián)函數(shù)執(zhí)行。通過(guò)函數(shù)可以對(duì)DTS數(shù)據(jù)訂閱獲取的數(shù)據(jù)庫(kù)實(shí)時(shí)增量數(shù)據(jù)進(jìn)行自定義處理。本文介紹如何在函數(shù)計(jì)算控制臺(tái)創(chuàng)建DTS觸發(fā)器、配置入口參數(shù)以及編寫(xiě)代碼并測(cè)試代碼。
功能簡(jiǎn)介
您在函數(shù)計(jì)算的控制臺(tái)提交觸發(fā)器創(chuàng)建請(qǐng)求之后,函數(shù)計(jì)算會(huì)根據(jù)觸發(fā)器的配置信息,自動(dòng)在事件總線EventBridge側(cè)創(chuàng)建事件流資源。
創(chuàng)建完成后,您可以在函數(shù)計(jì)算控制臺(tái)查看觸發(fā)器信息,同時(shí)也可以在事件總線EventBridge控制臺(tái)查看自動(dòng)創(chuàng)建的資源信息。當(dāng)DTS數(shù)據(jù)訂閱捕捉到數(shù)據(jù)庫(kù)的增量數(shù)據(jù)后,將會(huì)觸發(fā)函數(shù)執(zhí)行,觸發(fā)時(shí)會(huì)根據(jù)您的攢批配置將一個(gè)或多個(gè)消息事件以批的形式推送到函數(shù)中進(jìn)行處理。
注意事項(xiàng)
作為觸發(fā)源的DTS數(shù)據(jù)訂閱任務(wù)必須和函數(shù)計(jì)算的函數(shù)在相同的地域。
創(chuàng)建的事件流數(shù)量超過(guò)上限后,將無(wú)法再創(chuàng)建DTS觸發(fā)器。關(guān)于事件流的數(shù)量限制,請(qǐng)參見(jiàn)使用限制。
前提條件
事件總線EventBridge
函數(shù)計(jì)算
數(shù)據(jù)傳輸服務(wù)DTS
步驟一:創(chuàng)建DTS觸發(fā)器
登錄函數(shù)計(jì)算控制臺(tái),在左側(cè)導(dǎo)航欄,單擊函數(shù)。
在頂部菜單欄,選擇地域,然后在函數(shù)頁(yè)面,單擊目標(biāo)函數(shù)。
在函數(shù)配置頁(yè)面,選擇配置頁(yè)簽,在左側(cè)導(dǎo)航欄,單擊觸發(fā)器,然后單擊創(chuàng)建觸發(fā)器。
在創(chuàng)建觸發(fā)器面板,填寫(xiě)相關(guān)信息,然后單擊確定。
基礎(chǔ)配置項(xiàng)說(shuō)明如下所示。
配置項(xiàng)
取值說(shuō)明
本文示例
觸發(fā)器類型
觸發(fā)器類型。關(guān)于支持的觸發(fā)器類型,請(qǐng)參見(jiàn)觸發(fā)器簡(jiǎn)介。
DTS
名稱
自定義的觸發(fā)器名稱。
dts-trigger
版本或別名
默認(rèn)值為LATEST,如果您需要?jiǎng)?chuàng)建其他版本或別名的觸發(fā)器,請(qǐng)先在函數(shù)詳情頁(yè)的右上角切換到該版本或別名。關(guān)于版本和別名的簡(jiǎn)介,請(qǐng)參見(jiàn)管理版本和管理別名。
LATEST
數(shù)據(jù)訂閱任務(wù)
已創(chuàng)建的數(shù)據(jù)訂閱任務(wù)名稱。
dtsqntc2***
消費(fèi)組
已創(chuàng)建的用于消費(fèi)訂閱任務(wù)數(shù)據(jù)的消費(fèi)組名稱。
重要請(qǐng)確保該消費(fèi)組沒(méi)有在其他客戶端的實(shí)例上運(yùn)行,否則可能導(dǎo)致傳入的消費(fèi)位點(diǎn)失效。
test
賬號(hào)
創(chuàng)建消費(fèi)組時(shí)設(shè)置的賬號(hào)。
test
密碼
創(chuàng)建消費(fèi)組時(shí)設(shè)置的密碼。
******
消費(fèi)位點(diǎn)
期望消費(fèi)第一條數(shù)據(jù)的時(shí)間戳。消費(fèi)位點(diǎn)必須在訂閱實(shí)例的數(shù)據(jù)范圍之內(nèi)。
說(shuō)明消費(fèi)位點(diǎn)僅在新消費(fèi)組第一次運(yùn)行時(shí)生效。若后續(xù)任務(wù)重啟,會(huì)基于上次消費(fèi)位點(diǎn)繼續(xù)消費(fèi)。
2022-06-21 00:00:00
調(diào)用方式
選擇函數(shù)調(diào)用方式。
取值說(shuō)明如下。
同步調(diào)用:適用于順序調(diào)用場(chǎng)景。單個(gè)(批)事件觸發(fā)函數(shù)調(diào)用,等待函數(shù)執(zhí)行完成返回結(jié)果后,再由下一個(gè)(批)事件繼續(xù)觸發(fā)函數(shù)調(diào)用。同步調(diào)用請(qǐng)求正文有效負(fù)載最大為32 MB。更多信息,請(qǐng)參見(jiàn)同步調(diào)用。
異步調(diào)用:可以快速消費(fèi)事件。單個(gè)(批)事件觸發(fā)函數(shù)調(diào)用,函數(shù)計(jì)算會(huì)立刻返回響應(yīng),再由下一個(gè)(批)事件繼續(xù)觸發(fā)函數(shù)調(diào)用。該過(guò)程中函數(shù)會(huì)異步執(zhí)行。異步調(diào)用請(qǐng)求正文有效負(fù)載最大為128 KB。更多信息,請(qǐng)參見(jiàn)功能概覽。
同步調(diào)用
觸發(fā)器啟用狀態(tài)
創(chuàng)建觸發(fā)器后是否立即啟用。默認(rèn)勾選啟用觸發(fā)器,即創(chuàng)建觸發(fā)器后立即啟用觸發(fā)器。
不涉及
關(guān)于推送配置、重試和死信等高級(jí)配置項(xiàng)說(shuō)明,請(qǐng)參見(jiàn)觸發(fā)器高級(jí)功能。
創(chuàng)建完成后,在觸發(fā)器名稱列表中顯示已創(chuàng)建的觸發(fā)器。如需對(duì)創(chuàng)建的觸發(fā)器進(jìn)行修改或刪除,具體操作,請(qǐng)參見(jiàn)觸發(fā)器管理。
步驟二:配置函數(shù)入口參數(shù)
DTS事件源會(huì)以event
的形式作為輸入?yún)?shù)傳遞給函數(shù),您可以手動(dòng)將event
傳給函數(shù)模擬觸發(fā)事件。
在函數(shù)配置頁(yè)面的代碼頁(yè)簽,單擊測(cè)試函數(shù)右側(cè)的圖標(biāo),從下拉列表中,選擇配置測(cè)試參數(shù)。
在配置測(cè)試參數(shù)面板,選擇創(chuàng)建新測(cè)試事件或編輯已有測(cè)試事件,填寫(xiě)事件名稱和事件內(nèi)容,然后單擊確定。
event
格式如下所示:[ { "data": { "id": 321****, "topicPartition": { "hash": 0, "partition": 0, "topic": "cn_hangzhou_rm_1234****_test_version2" }, "offset": 3218099, "sourceTimestamp": 1654847757, "operationType": "UPDATE", "schema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou--test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "beforeImage": { "recordSchema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou-test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "values": [ { "data": 115 }, { "data": { "hb": [ 104, 101, 108, 108, 111 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 9, "capacity": 9, "address": 0 }, "charset": "utf8mb4" } ], "size": 45 }, "afterImage": { "recordSchema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou-test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "values": [ { "data": 115 }, { "data": { "hb": [ 98, 121, 101 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 11, "capacity": 11, "address": 0 }, "charset": "utf8mb4" } ], "size": 47 } }, "id": "12f701a43741d404fa9a7be89d9acae0-321****", "source": "DTSstreamDemo", "specversion": "1.0", "type": "dts:ConsumeMessage", "datacontenttype": "application/json; charset=utf-8", "time": "2022-06-10T07:55:57Z", "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro" } ]
CloudEvents規(guī)范中定義的參數(shù)解釋,請(qǐng)參見(jiàn)事件概述。
data字段包含的參數(shù)解釋如下表所示。
參數(shù)
類型
說(shuō)明
id
String
DTS數(shù)據(jù)ID。
topicPartition
Array
Topic的分區(qū)信息。
hash
String
DTS底層存儲(chǔ)參數(shù)。
partition
String
Topic的分區(qū)。
topic
String
Topic的名稱。
offset
Int
DTS數(shù)據(jù)對(duì)應(yīng)的消息存儲(chǔ)位點(diǎn)。
sourceTimestamp
Int
DTS數(shù)據(jù)生成時(shí)間戳。
operationType
String
DTS數(shù)據(jù)的操作類型。
schema
Array
數(shù)據(jù)庫(kù)表結(jié)構(gòu)信息。
recordFields
Array
字段詳情記錄。
fieldName
String
字段名稱。
rawDataTypeNum
Int
字段類型映射值。
該值對(duì)應(yīng)從數(shù)據(jù)訂閱通道中獲取的增量數(shù)據(jù)反序列化后的dataTypeNumber字段值,詳情請(qǐng)參見(jiàn)使用Kafka客戶端消費(fèi)訂閱數(shù)據(jù)。
isPrimaryKey
Boolean
字段是否是主鍵。
isUniqueKey
Boolean
字段是否是唯一值。
fieldPosition
String
字段位置。
nameIndex
Array
命名索引。
schemaId
String
數(shù)據(jù)庫(kù)表結(jié)構(gòu)信息的ID。
databaseName
String
數(shù)據(jù)庫(kù)名稱。
tableName
String
數(shù)據(jù)表名稱。
primaryIndexInfo
String
主鍵索引。
indexType
String
主鍵索引類型。
indexFields
Array
主鍵索引字段內(nèi)容。
cardinality
String
主鍵基數(shù)。
nullable
Boolean
主鍵是否可為null。
isFirstUniqueIndex
Boolean
是否是第一個(gè)唯一索引。
uniqueIndexInfo
String
唯一索引。
foreignIndexInfo
String
外鍵索引。
normalIndexInfo
String
普通索引。
databaseInfo
Array
數(shù)據(jù)庫(kù)信息。
databaseType
String
數(shù)據(jù)庫(kù)類型。
version
String
數(shù)據(jù)庫(kù)版本。
totalRows
Int
數(shù)據(jù)表的總行數(shù)。
beforeImage
String
操作前記錄字段內(nèi)容鏡像。
values
String
記錄字段的值。
size
Int
記錄字段大小。
afterImage
String
操作后記錄字段內(nèi)容鏡像。
步驟三:編寫(xiě)函數(shù)代碼并測(cè)試
觸發(fā)器創(chuàng)建完成后,您可以開(kāi)始編寫(xiě)并測(cè)試函數(shù)代碼,以驗(yàn)證代碼的正確性。在實(shí)際操作過(guò)程中,當(dāng)DTS數(shù)據(jù)訂閱捕捉到數(shù)據(jù)庫(kù)的增量數(shù)據(jù)后,觸發(fā)器會(huì)自動(dòng)觸發(fā)函數(shù)的執(zhí)行。
在函數(shù)配置頁(yè)面的代碼頁(yè)簽,在代碼編輯器中編寫(xiě)代碼,然后單擊部署代碼。
本文以Node.js函數(shù)代碼為例。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); //解析event參數(shù),對(duì)event進(jìn)行處理。 callback(null, 'return result'); }
單擊測(cè)試函數(shù)。
更多信息
除了函數(shù)計(jì)算控制臺(tái),您還可通過(guò)以下方式配置觸發(fā)器:
通過(guò)SDK配置觸發(fā)器。更多操作,請(qǐng)參見(jiàn)SDK列表。
如需對(duì)創(chuàng)建的觸發(fā)器進(jìn)行修改或刪除,具體操作,請(qǐng)參見(jiàn)觸發(fā)器管理。