SLS觸發(fā)器
通過配置日志服務(wù)SLS觸發(fā)器,您可以實現(xiàn)日志服務(wù)SLS與函數(shù)計算的集成。SLS觸發(fā)器能夠在新日志產(chǎn)生時自動觸發(fā)函數(shù)執(zhí)行,從而增量消費(fèi)日志服務(wù)Logstore的數(shù)據(jù),并完成自定義加工任務(wù)。
使用場景
數(shù)據(jù)清洗、加工場景
通過日志服務(wù),快速完成日志采集、加工、查詢、分析。
數(shù)據(jù)投遞場景
為數(shù)據(jù)的目的端落地提供支撐,構(gòu)建云上大數(shù)據(jù)產(chǎn)品間的數(shù)據(jù)管道。
數(shù)據(jù)加工函數(shù)
函數(shù)類型
模板函數(shù)
更多信息,請參見aliyun-log-fc-functions。
自定義函數(shù)
函數(shù)配置的格式與函數(shù)的具體實現(xiàn)有關(guān)。更多信息,請參見ETL函數(shù)開發(fā)指南。
函數(shù)計算觸發(fā)機(jī)制
日志服務(wù)ETL Job對應(yīng)于函數(shù)計算的一個觸發(fā)器,當(dāng)創(chuàng)建日志服務(wù)ETL Job后,日志服務(wù)會根據(jù)該ETL Job的配置啟動定時器,定時器輪詢Logstore中的Shard信息,當(dāng)發(fā)現(xiàn)有新的數(shù)據(jù)寫入時,即生成<shard_id,begin_cursor,end_cursor >
三元組信息作為函數(shù)Event,并觸發(fā)函數(shù)執(zhí)行。函數(shù)Event是日志服務(wù)SLS推送到函數(shù)計算。
當(dāng)存儲系統(tǒng)升級時,即使沒有新數(shù)據(jù)寫入,也可能發(fā)生Cursor變化,在這種情況下,每個Shard會額外空觸發(fā)一次。針對這種情況,您可以在函數(shù)內(nèi)通過Cursor嘗試獲取Shard的數(shù)據(jù),如果獲取不到數(shù)據(jù)說明是一次空觸發(fā),可以在函數(shù)內(nèi)做忽略處理。更多信息,請參見自定義函數(shù)開發(fā)指南。
日志服務(wù)的ETL任務(wù)觸發(fā)機(jī)制是時間觸發(fā)。例如:您設(shè)置的ETL Job觸發(fā)間隔為60秒,Logstore的Shard0一直有數(shù)據(jù)寫入,那么Shard每60秒就會觸發(fā)一次函數(shù)執(zhí)行(如果Shard沒有新的數(shù)據(jù)寫入則不會觸發(fā)函數(shù)執(zhí)行),函數(shù)執(zhí)行的輸入為最近60秒的Cursor區(qū)間。在函數(shù)內(nèi),可以根據(jù)Cursor讀取Shard0數(shù)據(jù)進(jìn)行下一步處理。
使用限制
單個日志項目(Project)關(guān)聯(lián)的SLS觸發(fā)器數(shù)量最大不得超過該P(yáng)roject下已有的Logstore數(shù)量的5倍。
建議每個Logstore配置的SLS觸發(fā)器數(shù)量不超過5個,否則可能會影響數(shù)據(jù)投遞到函數(shù)計算的效率。
示例場景
您可以配置一個SLS觸發(fā)器,該觸發(fā)器將定時獲取更新的數(shù)據(jù)并觸發(fā)函數(shù)執(zhí)行,增量消費(fèi)日志服務(wù)Logstore中的數(shù)據(jù),在函數(shù)里完成自定義加工任務(wù)(例如數(shù)據(jù)清洗和加工)以及將數(shù)據(jù)投遞給第三方服務(wù)。本示例中只演示如何獲取日志數(shù)據(jù)并打印。
用于數(shù)據(jù)加工的函數(shù)可以是日志服務(wù)提供的模板,也可以是您的自定義函數(shù)。
前提條件
函數(shù)計算
日志服務(wù)SLS
您需要創(chuàng)建一個日志項目和兩個日志庫。一個日志庫用于處理日志及數(shù)據(jù)源,另一個日志庫用于存儲函數(shù)計算產(chǎn)生的日志。
說明日志項目(Project)所在地域和函數(shù)計算服務(wù)所在地域必須一致。
步驟一:創(chuàng)建SLS觸發(fā)器
登錄函數(shù)計算控制臺,在左側(cè)導(dǎo)航欄,單擊函數(shù)。
在頂部菜單欄,選擇地域,然后在函數(shù)頁面,單擊目標(biāo)函數(shù)。
在函數(shù)配置頁面,選擇配置頁簽,在左側(cè)導(dǎo)航欄,單擊觸發(fā)器,然后單擊創(chuàng)建觸發(fā)器。
在創(chuàng)建觸發(fā)器面板,填寫相關(guān)信息,然后單擊確定。
配置項
操作
本文示例
觸發(fā)器類型
選擇日志服務(wù) SLS。
日志服務(wù)SLS
名稱
填寫自定義的觸發(fā)器名稱。
log_trigger
版本或別名
默認(rèn)值為LATEST,如果您需要創(chuàng)建其他版本或別名的觸發(fā)器,需先在函數(shù)詳情頁的右上角切換到該版本或別名。關(guān)于版本和別名的簡介,請參見管理版本和管理別名。
LATEST
日志項目
選擇已創(chuàng)建的日志項目。
aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****
日志庫
選擇已創(chuàng)建的日志庫,當(dāng)前觸發(fā)器會定時從該日志庫中訂閱數(shù)據(jù)到函數(shù)服務(wù)進(jìn)行自定義加工。
function-log
觸發(fā)間隔
填寫日志服務(wù)觸發(fā)函數(shù)運(yùn)行的時間間隔。
取值范圍:[3,600],單位:秒。默認(rèn)值:60。
60
重試次數(shù)
填寫單次觸發(fā)允許的最大重試次數(shù)。
取值范圍:[0,100]。默認(rèn)值:3。
說明執(zhí)行成功的情況為status=200并且header中參數(shù)
X-Fc-Error-Type
的值不是UnhandledInvocationError
和HandledInvocationError
的錯誤。其他情況表示執(zhí)行失敗,會觸發(fā)重試。關(guān)于參數(shù)X-Fc-Error-Type
請參見返回數(shù)據(jù)。如果函數(shù)執(zhí)行失敗,會一直重試當(dāng)前請求,直到函數(shù)執(zhí)行成功。首先會按照配置的重試次數(shù)進(jìn)行重試,超過最大重試次數(shù)仍然無法成功的,會增加時間間隔進(jìn)入退避重試。
3
觸發(fā)器日志
選擇已創(chuàng)建的日志庫,日志服務(wù)觸發(fā)函數(shù)執(zhí)行過程的日志會記錄到該日志庫中。
function-log2
調(diào)用參數(shù)
如果您想傳入自定義參數(shù),可以在此處配置。該參數(shù)將作為event的parameter參數(shù)傳入函數(shù)。該參數(shù)取值必須是JSON格式的字符串。
默認(rèn)值為空。
無
角色名稱
選擇AliyunLogETLRole。
說明如果您第一次創(chuàng)建該類型的觸發(fā)器,則需要在單擊確定后,在彈出的對話框中選擇立即授權(quán)。
AliyunLogETLRole
創(chuàng)建完成后,在觸發(fā)器名稱列表中顯示已創(chuàng)建的觸發(fā)器。如需對創(chuàng)建的觸發(fā)器進(jìn)行修改或刪除,具體操作,請參見觸發(fā)器管理。
步驟二:配置函數(shù)的入口參數(shù)
在函數(shù)配置頁面的代碼頁簽,單擊測試函數(shù)右側(cè)的圖標(biāo),從下拉列表中,選擇配置測試參數(shù)。
在配置測試參數(shù)面板,選擇創(chuàng)建新測試事件或編輯已有測試事件,填寫事件名稱和事件內(nèi)容,然后單擊確定。
event是函數(shù)計算的入口參數(shù)。具體格式如下:
{ "parameter": {}, "source": { "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com", "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****", "logstoreName": "function-log", "shardId": 0, "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==", "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA==" }, "jobName": "1f7043ced683de1a4e3d8d70b5a412843d81****", "taskId": "c2691505-38da-4d1b-998a-f1d4bb8c****", "cursorTime": 1529486425 }
參數(shù)
描述
本文示例
parameter
您配置觸發(fā)器時填寫的調(diào)用參數(shù)的值。
無。
source
設(shè)置函數(shù)讀取的日志塊信息。
endpoint:日志服務(wù)Project所屬的阿里云地域。
projectName:日志服務(wù)Project名稱。
logstoreName:Logstore名稱。
shardId:Logstore中一個確定的Shard。
beginCursor:開始消費(fèi)數(shù)據(jù)的位置。
endCursor:停止消費(fèi)數(shù)據(jù)的位置。
{ "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com", "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****", "logstoreName": "function-log", "shardId": 0, "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==", "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA==" }
jobName
日志服務(wù)ETL Job名字,函數(shù)配置的SLS觸發(fā)器對應(yīng)一個日志服務(wù)的ETL Job。
1f7043ced683de1a4e3d8d70b5a412843d81****
taskId
對于ETL Job而言,taskId是一個確定性的函數(shù)調(diào)用標(biāo)識。
c2691505-38da-4d1b-998a-f1d4bb8c****
cursorTime
最后一條日志到達(dá)日志服務(wù)端的Unix時間戳,單位:秒。
1529486425
步驟三:編寫函數(shù)并測試
完成創(chuàng)建日志觸發(fā)器后,您可以編寫函數(shù)代碼并測試以驗證代碼的正確性。在實際操作過程中,當(dāng)日志服務(wù)收集增量日志時觸發(fā)該函數(shù),函數(shù)計算獲取對應(yīng)日志,然后打印收集的日志。
在函數(shù)配置頁面的代碼頁簽,在代碼編輯器中編寫代碼,然后單擊部署代碼。
本文以Python函數(shù)代碼為例。以下示例代碼可以作為提取大部分邏輯日志的模板。其中
accessKeyId
和accessKey
可以從context
和creds
中獲取。""" 本代碼樣例主要實現(xiàn)以下功能: * 從 event 中解析出 SLS 事件觸發(fā)相關(guān)信息 * 根據(jù)以上獲取的信息,初始化 SLS 客戶端 * 從源 log store 獲取實時日志數(shù)據(jù) This sample code is mainly doing the following things: * Get SLS processing related information from event * Initiate SLS client * Pull logs from source log store """ #!/usr/bin/env python # -*- coding: utf-8 -*- import logging import json import os from aliyun.log import LogClient logger = logging.getLogger() def handler(event, context): # 可以通過 context.credentials 獲取密鑰信息 # Access keys can be fetched through context.credentials print("The content in context entity is: \n") print(context) creds = context.credentials access_key_id = creds.access_key_id access_key_secret = creds.access_key_secret security_token = creds.security_token # 解析 event 參數(shù)至 object 格式 # parse event in object event_obj = json.loads(event.decode()) print("The content in event entity is: \n") print(event_obj) # 從 event.source 中獲取日志項目名稱、日志倉庫名稱、日志服務(wù)訪問 endpoint、日志起始游標(biāo)、日志終點游標(biāo)以及分區(qū) id # Get the name of log project, the name of log store, the endpoint of sls, begin cursor, end cursor and shardId from event.source source = event_obj['source'] log_project = source['projectName'] log_store = source['logstoreName'] endpoint = source['endpoint'] begin_cursor = source['beginCursor'] end_cursor = source['endCursor'] shard_id = source['shardId'] # 初始化 sls 客戶端 # Initialize client of sls client = LogClient(endpoint=endpoint, accessKeyId=access_key_id, accessKey=access_key_secret, securityToken=security_token) # 基于日志的游標(biāo)從源日志庫中讀取日志,本示例中的游標(biāo)范圍包含了觸發(fā)本次執(zhí)行的所有日志內(nèi)容 # Read data from source logstore within cursor: [begin_cursor, end_cursor) in the example, which contains all the logs trigger the invocation while True: response = client.pull_logs(project_name=log_project, logstore_name=log_store, shard_id=shard_id, cursor=begin_cursor, count=100, end_cursor=end_cursor, compress=False) log_group_cnt = response.get_loggroup_count() if log_group_cnt == 0: break logger.info("get %d log group from %s" % (log_group_cnt, log_store)) logger.info(response.get_loggroup_list()) begin_cursor = response.get_next_cursor() return 'success'
單擊測試函數(shù)。
執(zhí)行完成后,您可以在函數(shù)代碼頁簽的上方查看執(zhí)行結(jié)果。
常見問題
在有新日志產(chǎn)生時,SLS觸發(fā)器未觸發(fā)函數(shù)執(zhí)行,解決方案請參考觸發(fā)器不能正常觸發(fā)函數(shù)執(zhí)行怎么辦?。
日志服務(wù)SLS的每個Shard在有新的數(shù)據(jù)寫入的情況下都會觸發(fā)函數(shù)執(zhí)行,所以您看到的觸發(fā)頻次是一個Logstore整體的觸發(fā)次數(shù)。同時當(dāng)觸發(fā)發(fā)生延遲時觸發(fā)器會追趕數(shù)據(jù),可能會縮短觸發(fā)間隔。更詳細(xì)的信息請參見為什么SLS觸發(fā)器觸發(fā)函數(shù)執(zhí)行的頻次有時高于預(yù)期?。