使用函數(shù)計算清洗數(shù)據(jù)
表格存儲高并發(fā)的寫入性能以及低廉的存儲成本非常適合物聯(lián)網(wǎng)、日志、監(jiān)控數(shù)據(jù)的存儲。將數(shù)據(jù)寫入到表格存儲時,您可以通過函數(shù)計算對新增的數(shù)據(jù)做簡單的清洗,將清洗后的數(shù)據(jù)寫回到表格存儲的另一種數(shù)據(jù)表中。同時,您也可以實時訪問表格存儲中的原始數(shù)據(jù)和結(jié)果數(shù)據(jù)。
樣例場景
假設(shè)寫入表格存儲的為日志數(shù)據(jù),且日志數(shù)據(jù)包括如下三個字段。為了便于日志查詢,您需要將level>1的日志寫入到表格存儲的另一張數(shù)據(jù)表result中。
字段名稱 | 類型 | 說明 |
id | 整型 | 日志ID。 |
level | 整型 | 日志的等級,數(shù)值越大表示日志等級越高。 |
message | 字符串 | 日志的內(nèi)容。 |
步驟一:為數(shù)據(jù)表開啟Stream功能
使用觸發(fā)器功能需要先在表格存儲控制臺開啟數(shù)據(jù)表的Stream功能,才能在函數(shù)計算中處理寫入表格存儲中的增量數(shù)據(jù)。
登錄表格存儲控制臺。
在頁面上方,選擇地域。
在概覽頁面,單擊實例名稱或在實例操作列單擊實例管理。
在實例詳情頁簽的數(shù)據(jù)表列表頁簽,單擊數(shù)據(jù)表名稱后選擇實時消費通道頁簽或單擊后選擇實時消費通道。
在實時消費通道頁簽,單擊Stream信息對應的開啟。
在開啟Stream功能對話框,設(shè)置日志過期時長,單擊開啟。
日志過期時長取值為非零整數(shù),單位為小時,最長時長為168小時。
重要日志過期時長設(shè)置后不能修改,請謹慎設(shè)置。
步驟二:創(chuàng)建函數(shù)和Tablestore觸發(fā)器
創(chuàng)建函數(shù)。
登錄函數(shù)計算控制臺。
可選:在頁面右上角,單擊體驗函數(shù)計算3.0。
說明函數(shù)計算3.0進行了多項功能改進,本文采用函數(shù)計算3.0進行函數(shù)計算的使用介紹。
若您已進入新版控制臺頁面(頁面右上角的按鈕為返回函數(shù)計算2.0),則無需執(zhí)行此操作。
在左側(cè)導航欄,單擊函數(shù)。
在頂部菜單欄,選擇地域,然后在函數(shù)頁面,單擊創(chuàng)建函數(shù)。
在創(chuàng)建函數(shù)頁面,按需選擇創(chuàng)建函數(shù)的方式,配置以下配置項,然后單擊創(chuàng)建。
此處以創(chuàng)建事件函數(shù)為例,介紹對表格存儲中數(shù)據(jù)修改進行實時計算的操作。
說明使用函數(shù)計算時,您可以通過創(chuàng)建事件函數(shù)、Web函數(shù)或任務函數(shù)實現(xiàn)對表格存儲中數(shù)據(jù)的處理。更多信息,請參見函數(shù)類型選型。
如果要Tablestore中的數(shù)據(jù)變更自動觸發(fā)數(shù)據(jù)處理,請創(chuàng)建事件函數(shù)。具體操作,請參見創(chuàng)建事件函數(shù)。
如果要通過特定HTTP請求觸發(fā)數(shù)據(jù)處理,請創(chuàng)建Web函數(shù)。具體操作,請參見創(chuàng)建Web函數(shù)。
如果要定時或異步觸發(fā)數(shù)據(jù)處理,請創(chuàng)建任務函數(shù)。具體操作,請參見創(chuàng)建任務函數(shù)。
基本設(shè)置:設(shè)置函數(shù)名稱。
函數(shù)代碼:配置函數(shù)的運行環(huán)境和代碼相關(guān)信息。
配置項
說明
示例
運行環(huán)境
選擇您熟悉的語言,例如Python、Java、PHP、Node.js或自定義容器等。
自定義容器鏡像。
此處選擇Python 3.9。
代碼上傳方式
選擇代碼上傳到函數(shù)計算的方式。
使用示例代碼:默認方式,您可以根據(jù)業(yè)務需要選擇函數(shù)計算為您提供的創(chuàng)建函數(shù)的示例代碼。
通過 ZIP 包上傳代碼:選擇函數(shù)代碼ZIP包并上傳。
通過文件夾上傳代碼:選擇包含函數(shù)代碼的文件夾并上傳。
通過 OSS 上傳代碼:選擇上傳函數(shù)代碼的Bucket 名稱和文件名稱。
此處請選擇使用示例代碼后,在示例代碼列表中選擇Hello, world! 示例。
高級配置:配置函數(shù)的實例相關(guān)信息和函數(shù)執(zhí)行超時時間等。
配置項
說明
示例
規(guī)格方案
根據(jù)您的業(yè)務情況,選擇或手動輸入合理的vCPU規(guī)格和內(nèi)存規(guī)格組合。關(guān)于各資源使用的計費詳情,請參見計費概述。
說明vCPU大小(單位為核)與內(nèi)存大小(單位為GB)的比例必須設(shè)置在1:1到1:4之間。
0.35核,512 MB
臨時硬盤大小
根據(jù)您的業(yè)務情況,選擇臨時存儲文件的硬盤大小。
取值說明如下。
512 MB:默認值。不計費,函數(shù)計算為您提供512 MB以內(nèi)的硬盤免費使用額度。
10 GB:按9.5 GB進行計費。
說明臨時硬盤中所有目錄可寫,共享臨時硬盤的空間。
臨時硬盤大小與底層執(zhí)行函數(shù)的實例生命周期一致,實例被系統(tǒng)回收后,硬盤上的數(shù)據(jù)也會消失。如您需要對文件進行持久化保存,可以選擇掛載NAS或OSS。具體操作,請參見配置NAS文件系統(tǒng)和配置OSS對象存儲。
512 MB
執(zhí)行超時時間
設(shè)置超時時間。執(zhí)行超時時間默認為180秒,最長為86400秒。
180
請求處理程序
設(shè)置請求處理程序,函數(shù)計算的運行時會加載并調(diào)用您的請求處理程序處理請求。創(chuàng)建函數(shù)的方式選擇Web函數(shù)時,無需設(shè)置此配置項。
說明代碼上傳方式選擇使用示例代碼時,不需要修改請求處理程序。當選擇其他代碼上傳方式時,則需要根據(jù)實際情況修改請求處理程序,否則函數(shù)執(zhí)行時會報錯。
index.handler
時區(qū)
選擇函數(shù)的時區(qū)。此處設(shè)置函數(shù)的時區(qū)后,將自動為函數(shù)添加一條環(huán)境變量TZ,其值為您設(shè)置的目標時區(qū)。
UTC
函數(shù)角色
函數(shù)計算平臺會使用這個RAM角色來生成訪問您的阿里云資源的臨時密鑰,并傳遞給您的代碼。
重要需授予函數(shù)角色訪問表格存儲服務的權(quán)限。更多信息,請參見附錄:授予函數(shù)計算訪問表格存儲的權(quán)限。
AliyunFCDefaultRole
允許訪問 VPC
是否允許函數(shù)訪問VPC內(nèi)資源。更多信息,請參見配置網(wǎng)絡(luò)。
是
專有網(wǎng)絡(luò)
允許訪問 VPC選擇是時必填。創(chuàng)建新的VPC或在下拉列表中選擇要訪問的VPC ID。
fc.auto.create.vpc.1632317****
交換機
允許訪問 VPC選擇是時必填。創(chuàng)建新的交換機或在下拉列表中選擇交換機ID。
fc.auto.create.vswitch.vpc-bp1p8248****
安全組
允許訪問 VPC選擇是時必填。創(chuàng)建新的安全組或在下拉列表中選擇安全組。
fc.auto.create.SecurityGroup.vsw-bp15ftbbbbd****
允許函數(shù)默認網(wǎng)卡訪問公網(wǎng)
是否允許函數(shù)可以通過默認網(wǎng)卡訪問公網(wǎng)。關(guān)閉后,當前服務中的函數(shù)將無法通過函數(shù)計算的默認網(wǎng)卡訪問公網(wǎng)。
重要使用固定公網(wǎng)IP地址功能時,您必須關(guān)閉允許函數(shù)默認網(wǎng)卡訪問公網(wǎng),否則配置的固定公網(wǎng)IP地址不生效。更多信息,請參見配置固定公網(wǎng)IP地址。
是
日志功能
是否啟用阿里云日志服務。取值說明如下:
啟用:函數(shù)的執(zhí)行日志被持久化保存到日志服務,方便您代碼調(diào)試、故障分析和數(shù)據(jù)分析等。
說明啟用日志功能后,函數(shù)中打印到 stdout 的內(nèi)容就會被阿里云日志服務采集到。然后您可以查看函數(shù)的執(zhí)行日志,從而方便您的代碼調(diào)試、故障分析、數(shù)據(jù)分析等操作。
點擊配置日志查看更多詳情。
函數(shù)計算在后臺為您創(chuàng)建的日志服務資源會產(chǎn)生費用,詳情請參見按使用功能計費模式計費項。
禁用:函數(shù)的執(zhí)行日志將無法通過日志服務存儲和查詢。
啟用
(可選)環(huán)境變量:設(shè)置函數(shù)運行環(huán)境中的環(huán)境變量。更多信息,請參見配置環(huán)境變量。
創(chuàng)建Tablestore觸發(fā)器。
在函數(shù)詳情頁簽,選擇配置頁簽,在左側(cè)導航欄,單擊觸發(fā)器,然后單擊創(chuàng)建觸發(fā)器。
在創(chuàng)建觸發(fā)器面板,填寫相關(guān)信息,然后單擊確定。
配置項
操作
本文示例
觸發(fā)器類型
選擇表格存儲 Tablestore。
表格存儲Tablestore
名稱
自定義填寫觸發(fā)器名稱。
Tablestore-trigger
版本或別名
默認值為LATEST,如果您需要創(chuàng)建其他版本或別名的觸發(fā)器,需先在函數(shù)詳情頁的版本或別名下拉列表選擇該版本或別名。關(guān)于版本和別名的簡介,請參見版本管理和別名管理。
LATEST
實例
在列表中選擇已創(chuàng)建的Tablestore實例。
d00dd8xm****
表格
在列表中選擇已創(chuàng)建的表格。
mytable
角色名稱
選擇AliyunTableStoreStreamNotificationRole。
說明如果您第一次創(chuàng)建該類型的觸發(fā)器,則需要在單擊確定后,在彈出的對話框中選擇立即授權(quán)。
AliyunTableStoreStreamNotificationRole
創(chuàng)建完成后,在觸發(fā)器名稱列表中顯示已創(chuàng)建的觸發(fā)器。如需對創(chuàng)建的觸發(fā)器進行修改或刪除,具體操作,請參見觸發(fā)器管理。
步驟三:驗證測試
創(chuàng)建觸發(fā)器后,通過在表格存儲中寫入和查詢數(shù)據(jù)驗證數(shù)據(jù)清洗是否成功。
在函數(shù)詳情頁簽的代碼頁簽,使用代碼編輯器中編寫代碼。
此處以Python函數(shù)代碼為例介紹。其中INSTANCE_NAME(表格存儲的實例名稱)、REGION(使用的地域)、ENDPOINT(服務地址)和RESULT_TABLENAME(結(jié)果表)需要根據(jù)情況進行修改。
#!/usr/bin/env python # -*- coding: utf-8 -*- import cbor import json import tablestore as ots INSTANCE_NAME = 'distribute-test' REGION = 'cn-shanghai' ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com' % (INSTANCE_NAME, REGION) RESULT_TABLENAME = 'result' def get_attrbute_value(record, column): attrs = record[u'Columns'] for x in attrs: if x[u'ColumnName'] == column: return x['Value'] def get_pk_value(record, column): attrs = record[u'PrimaryKey'] for x in attrs: if x['ColumnName'] == column: return x['Value'] # 由于已經(jīng)授權(quán)了AliyunOTSFullAccess權(quán)限,此處獲取的credentials具有訪問表格存儲的權(quán)限。 def get_ots_client(context): creds = context.credentials client = ots.OTSClient(ENDPOINT, creds.access_key_id, creds.access_key_secret, INSTANCE_NAME, sts_token=creds.security_token) return client def save_to_ots(client, record): id = int(get_pk_value(record, 'id')) level = int(get_attrbute_value(record, 'level')) msg = get_attrbute_value(record, 'message') pk = [('id', id), ] attr = [('level', level), ('message', msg), ] row = ots.Row(pk, attr) client.put_row(RESULT_TABLENAME, row) def handler(event, context): records = cbor.loads(event) # records = json.loads(event) client = get_ots_client(context) for record in records['Records']: level = int(get_attrbute_value(record, 'level')) if level > 1: save_to_ots(client, record) else: print("level <= 1, ignore.")
向source_data數(shù)據(jù)表中寫入數(shù)據(jù),依次填入id、level和message信息,并在result表中查詢清洗后的數(shù)據(jù)。
當向source_data表中寫入level>1的數(shù)據(jù)時,數(shù)據(jù)會同步到result表中。
當向source_data表中寫入level<=1的數(shù)據(jù)時,數(shù)據(jù)不會同步到result表中。
常見問題
如果您無法在某一地域創(chuàng)建Tablestore觸發(fā)器,請確認支持創(chuàng)建Tablestore觸發(fā)器的地域,具體請參見注意事項。
如果您在創(chuàng)建Tablestore觸發(fā)器時無法找到已經(jīng)創(chuàng)建好的表格存儲數(shù)據(jù)表,請確認表格存儲數(shù)據(jù)表與函數(shù)計算服務是否處于同一地域。
使用Tablestore觸發(fā)器時,總是會報客戶端取消的報錯,一般是由于客戶端調(diào)用函數(shù)時設(shè)置的超時時間小于函數(shù)執(zhí)行時間。建議您將客戶端超時時間調(diào)大,具體請參見客戶端斷開連接,報錯Invocation canceled by client怎么辦?。
如果Tablestore數(shù)據(jù)表中有新增的數(shù)據(jù),但是Tablestore觸發(fā)器沒有被觸發(fā),您可以從以下方面進行排查。關(guān)于觸發(fā)器不能正常觸發(fā)的詳細排查方案可參見觸發(fā)器不能正常觸發(fā)函數(shù)執(zhí)行怎么辦?。
確認數(shù)據(jù)表是否開啟了Stream功能,具體請參見為數(shù)據(jù)表開啟Stream功能。
確認在創(chuàng)建觸發(fā)器時配置的角色是否正確,您可以使用默認的觸發(fā)器角色
AliyunTableStoreStreamNotificationRole
,具體請參見創(chuàng)建Tablestore觸發(fā)器。查看是否有函數(shù)運行日志,可以根據(jù)日志確認是否是函數(shù)執(zhí)行失敗。函數(shù)執(zhí)行失敗后,會一直重試直到Tablestore中的日志數(shù)據(jù)過期。
如果函數(shù)執(zhí)行時報錯“access_key_id is None or empty.”,請確認配置的函數(shù)角色是否擁有訪問表格存儲的權(quán)限,具體請參見附錄:授予函數(shù)計算訪問表格存儲的權(quán)限。