阿里云表格存儲Tablestore是構建在阿里云飛天分布式系統之上的分布式NoSQL數據存儲服務。您可以通過創建Tablestore觸發器,將Tablestore作為事件源接入函數計算,當Tablestore中的數據更新時會自動觸發函數執行,從而完成對Tablestore變更數據的自定義處理。
使用場景
Tablestore觸發器典型的使用場景如下圖所示。
原始信號源數據存儲到原始Table A,當Table A中的數據發生變更時會觸發函數自定義清洗數據,將清洗后的數據存入Table B,您可以直接讀取Table B的數據統計展示,完成一個彈性可伸縮的Serverless Web應用。
前提條件
函數計算
表格存儲Tablestore
使用限制
目前支持Tablestore觸發器的地域為:華北2(北京)、華東1(杭州)、華東2(上海)、華南1(深圳)、日本(東京)、新加坡、德國(法蘭克福)、中國香港。
表格存儲數據表與函數計算服務要處于同一地域。
若您需要使用內網訪問Tablestore觸發器對應的函數,可以使用VPC Endpoint:{instance}.{region}.vpc.tablestore.aliyuncs.com,不可以使用Tablestore內網Endpoint。詳細信息,請參見開啟VPC功能對可用性的影響。
觸發的函數執行時間不能超過一分鐘。
注意事項
編寫函數的時候,請注意不要出現以下邏輯:表格存儲Table A觸發函數B,函數B又更新Table A的數據,從而造成函數無限循環調用。
若函數執行出現異常,函數將無限重試直到Tablestore中的日志數據過期。
步驟一:為數據表開啟Stream功能
使用觸發器功能需要先在表格存儲控制臺開啟數據表的Stream功能,才能在函數計算中處理寫入表格存儲中的增量數據。
登錄表格存儲控制臺。
在頁面上方,選擇地域。
在概覽頁面,單擊實例名稱或在實例操作列單擊實例管理。
在實例詳情頁簽的數據表列表頁簽,單擊數據表名稱后選擇實時消費通道頁簽或單擊后選擇實時消費通道。
在實時消費通道頁簽,單擊Stream信息對應的開啟。
在開啟Stream功能對話框,設置日志過期時長,單擊開啟。
日志過期時長取值為非零整數,單位為小時,最長時長為168小時。
重要日志過期時長設置后不能修改,請謹慎設置。
步驟二:創建Tablestore觸發器
登錄函數計算控制臺。
- 在左側導航欄,單擊服務及函數。
在頂部菜單欄,選擇地域。
在服務列表頁面,找到目標服務,在其右側操作列單擊函數管理。
在函數管理頁面,單擊目標函數名稱。
在函數詳情頁面,單擊觸發器管理頁簽,從版本或別名下拉列表選擇要創建觸發器的版本或別名,然后單擊創建觸發器。
在創建觸發器面板,填寫相關信息,然后單擊確定。
參數
操作
本文示例
觸發器類型
選擇表格存儲 Tablestore。
表格存儲Tablestore
名稱
自定義填寫觸發器名稱。
Tablestore-trigger
版本或別名
默認值為LATEST,如果您需要創建其他版本或別名的觸發器,需先在函數詳情頁的版本或別名下拉列表選擇該版本或別名。關于版本和別名的簡介,請參見管理版本和管理別名。
LATEST
實例
在列表中選擇已創建的Tablestore實例。
d00dd8xm****
表格
在列表中選擇已創建的表格。
mytable
角色名稱
選擇AliyunTableStoreStreamNotificationRole。
說明如果您第一次創建該類型的觸發器,則需要在單擊確定后,在彈出的對話框中選擇立即授權。
AliyunTableStoreStreamNotificationRole
創建完成后,在觸發器名稱列表中顯示已創建的觸發器。如需對創建的觸發器進行修改或刪除,具體操作,請參見觸發器管理。
步驟三:配置函數的入口參數
在函數詳情頁面,單擊函數代碼頁簽,然后單擊測試函數右側圖標,從下拉列表中,選擇配置測試參數。
在配置測試參數面板,選擇創建新測試事件或編輯已有測試事件頁簽,填寫事件名稱和事件內容。然后單擊確定。
表格存儲觸發器使用CBOR格式對增量數據進行編碼,構成函數計算的event。具體格式如下所示。
{ "Version": "Sync-v1", "Records": [ { "Type": "PutRow", "Info": { "Timestamp": 1506416585740836 }, "PrimaryKey": [ { "ColumnName": "pk_0", "Value": 1506416585881590900 }, { "ColumnName": "pk_1", "Value": "2017-09-26 17:03:05.8815909 +0800 CST" }, { "ColumnName": "pk_2", "Value": 1506416585741000 } ], "Columns": [ { "Type": "Put", "ColumnName": "attr_0", "Value": "hello_table_store", "Timestamp": 1506416585741 }, { "Type": "Put", "ColumnName": "attr_1", "Value": 1506416585881590900, "Timestamp": 1506416585741 } ] } ] }
event參數中不同屬性字段的解釋如下表所示。
參數
描述
Version
Payload版本號。示例如Sync-v1。類型為String。
Records
數據表中的增量數據行數組。包含如下內部成員:
Type:數據行類型,包含PutRow、UpdateRow和DeleteRow。類型為String。
Info:包含Timestamp內部成員。Timestamp表示該行的最后修改UTC時間。類型為Int64。
PrimaryKey
主鍵列數組。包含如下內部成員:
ColumnName:主鍵列名稱。類型為String。
Value:主鍵列內容。類型為formated_value,支持Integer、String和Blob。
Columns
屬性列數組。包括如下內部成員:
Type:屬性列類型,包含Put、DeleteOneVersion和DeleteAllVersions。類型為String。
ColumnName:屬性列名稱。類型為String。
Value:屬性列內容。類型為formated_value,支持Integer、Boolean、Double、String和Blob。
Timestamp:屬性列最后修改UTC時間。類型為Int64。
步驟四:編寫函數代碼并測試
完成Tablestore觸發器創建后,您可以開始編寫函數代碼并測試,以驗證代碼的正確性。在實際操作過程中當Tablestore中有數據更新時會自動觸發函數執行。
在函數詳情頁面,單擊函數代碼頁簽,在代碼編輯器中編寫代碼,然后單擊部署代碼。
本文以Python函數代碼為例。如果您想使用其他運行環境,更多代碼示例,請參見表格存儲觸發函數計算示例。
import logging import cbor import json def get_attribute_value(record, column): attrs = record[u'Columns'] for x in attrs: if x[u'ColumnName'] == column: return x['Value'] def get_pk_value(record, column): attrs = record[u'PrimaryKey'] for x in attrs: if x['ColumnName'] == column: return x['Value'] def handler(event, context): logger = logging.getLogger() logger.info("Begin to handle event") #records = cbor.loads(event) records = json.loads(event) for record in records['Records']: logger.info("Handle record: %s", record) pk_0 = get_pk_value(record, "pk_0") attr_0 = get_attribute_value(record, "attr_0") return 'OK'
單擊函數代碼頁簽的測試函數。
執行完成后,您可以在函數代碼頁簽的上方查看執行結果。
常見問題
如果您無法在某一地域創建Tablestore觸發器,請確認支持創建Tablestore觸發器的地域,具體請查看使用限制。
如果您在創建Tablestore觸發器時無法找到已經創建好的表格存儲數據表,請確認表格存儲數據表與函數計算服務是否處于同一地域。
使用Tablestore觸發器時,總是會報客戶端取消的報錯,一般是由于客戶端調用函數時設置的超時時間小于函數執行時間。建議您將客戶端超時時間調大,具體請參見客戶端斷開連接,報錯Invocation canceled by client怎么辦?。
如果Tablestore數據表中有新增的數據,但是Tablestore觸發器沒有被觸發,您可以從以下方面進行排查。關于觸發器不能正常觸發的詳細排查方案可參見觸發器不能正常觸發函數執行怎么辦?。
確認數據表是否開啟了Stream功能,具體請參見步驟一:為數據表開啟Stream功能。
確認在創建觸發器時配置的角色是否正確,您可以使用默認的觸發器角色
AliyunTableStoreStreamNotificationRole
,具體請參見步驟二:創建Tablestore觸發器。查看是否有函數運行日志,可以根據日志確認是否是函數執行失敗。函數執行失敗后,會一直重試直到Tablestore中的日志數據過期。