依托阿里云函數計算服務,日志服務提供流式數據加工服務。您可以通過配置一個函數計算觸發器任務,定時獲取更新的數據并觸發函數的執行,進而增量消費日志服務Logstore的數據,并完成自定義加工任務。日志服務提供的函數模板或者用戶自定義函數均可作為數據加工函數。
前提條件
已為日志服務觸發函數執行授權。更多信息,請參見云資源訪問授權。
已創建Project和Logstore。更多信息,請參見創建Project和Logstore。
使用限制
單個日志項目(Project)關聯的SLS觸發器數量最大不得超過該Project下已有的Logstore數量的5倍。
建議每個Logstore配置的SLS觸發器數量不超過5個,否則可能會影響數據投遞到函數計算的效率。
適用場景
數據清洗、加工場景
通過日志服務,快速完成日志采集、加工、查詢及分析。
數據投遞場景
為數據的目的端提供支撐,構建云上大數據產品之間的數據管道。
數據加工函數
函數類型
模板函數
更多信息,請參見aliyun-log-fc-functions。
用戶自定義函數
函數配置的格式與函數的具體實現有關。更多信息,請參見ETL函數開發指南。
函數計算觸發機制
函數計算觸發器任務對應于函數計算的一個觸發器,當創建函數計算觸發器任務后,日志服務會根據該觸發器任務的配置啟動定時器,定時器輪詢Logstore中的Shard信息,當發現有新的數據寫入時,即生成
<shard_id,begin_cursor,end_cursor >
三元組信息作為函數Event,并觸發函數執行。說明當存儲系統升級時,即使沒有新數據寫入,也可能發生Cursor變化,在這種情況下,每個Shard會額外空觸發一次。針對這種情況,您可以在函數內通過Cursor嘗試獲取Shard的數據,如果獲取不到數據說明是一次空觸發,可以在函數內做忽略處理。更多信息,請參見自定義函數開發指南。
函數計算觸發器任務觸發機制是時間觸發。例如:您設置的函數計算觸發器任務觸發間隔為60秒,Logstore的Shard0一直有數據寫入,那么Shard0每60秒就會觸發一次函數執行(如果Shard沒有新的數據寫入則不會觸發函數執行),函數執行的輸入為最近60秒的Cursor區間。在函數內,可以根據Cursor讀取Shard0數據進行下一步處理。
步驟一:創建SLS觸發器
登錄函數計算控制臺,在左側導航欄,單擊函數。
在頂部菜單欄,選擇地域,然后在函數頁面,單擊目標函數。
在函數詳情頁面,選擇配置頁簽,在左側導航欄,單擊觸發器,然后單擊創建觸發器。
在創建觸發器面板,填寫相關信息,然后單擊確定。
配置項
操作
本文示例
觸發器類型
選擇日志服務 SLS。
日志服務SLS
名稱
填寫自定義的觸發器名稱。
log_trigger
版本或別名
默認值為LATEST,如果您需要創建其他版本或別名的觸發器,需先在函數詳情頁的右上角切換到該版本或別名。關于版本和別名的簡介,請參見版本管理和別名管理。
LATEST
日志項目
選擇已創建的日志項目。
aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****
日志庫
選擇已創建的日志庫,當前觸發器會定時從該日志庫中訂閱數據到函數服務進行自定義加工。
function-log
觸發間隔
填寫日志服務觸發函數運行的時間間隔。
取值范圍:[3,600],單位:秒。默認值:60。
60
重試次數
填寫單次觸發允許的最大重試次數。
取值范圍:[0,100]。默認值:3。
說明執行成功的情況為status=200并且header中參數
X-Fc-Error-Type
的值不是UnhandledInvocationError
和HandledInvocationError
的錯誤。其他情況表示執行失敗,會觸發重試。關于參數X-Fc-Error-Type
請參見返回數據。如果函數執行失敗,會一直重試當前請求,直到函數執行成功。首先會按照配置的重試次數進行重試,超過最大重試次數仍然無法成功的,會增加時間間隔進入退避重試。
3
觸發器日志
選擇已創建的日志庫,日志服務觸發函數執行過程的日志會記錄到該日志庫中。
function-log2
調用參數
如果您想傳入自定義參數,可以在此處配置。該參數將作為event的parameter參數傳入函數。該參數取值必須是JSON格式的字符串。
默認值為空。
無
角色名稱
選擇AliyunLogETLRole。
說明如果您第一次創建該類型的觸發器,則需要在單擊確定后,在彈出的對話框中選擇立即授權。
AliyunLogETLRole
創建完成后,在觸發器名稱列表中顯示已創建的觸發器。如需對創建的觸發器進行修改或刪除,具體操作,請參見觸發器管理。
步驟二:配置函數的入口參數
在函數詳情頁面的代碼頁簽,單擊測試函數右側的圖標,從下拉列表中,選擇配置測試參數。
在配置測試參數面板,選擇創建新測試事件或編輯已有測試事件,填寫事件名稱和事件內容,然后單擊確定。
event是函數計算的入口參數。具體格式如下:
{ "parameter": {}, "source": { "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com", "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****", "logstoreName": "function-log", "shardId": 0, "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==", "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA==" }, "jobName": "1f7043ced683de1a4e3d8d70b5a412843d81****", "taskId": "c2691505-38da-4d1b-998a-f1d4bb8c****", "cursorTime": 1529486425 }
參數
描述
本文示例
parameter
您配置觸發器時填寫的調用參數的值。
無。
source
設置函數讀取的日志塊信息。
endpoint:日志服務Project所屬的阿里云地域。
projectName:日志服務Project名稱。
logstoreName:Logstore名稱。
shardId:Logstore中一個確定的Shard。
beginCursor:開始消費數據的位置。
endCursor:停止消費數據的位置。
{ "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com", "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****", "logstoreName": "function-log", "shardId": 0, "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==", "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA==" }
jobName
日志服務ETL Job名字,函數配置的SLS觸發器對應一個日志服務的ETL Job。
1f7043ced683de1a4e3d8d70b5a412843d81****
taskId
對于ETL Job而言,taskId是一個確定性的函數調用標識。
c2691505-38da-4d1b-998a-f1d4bb8c****
cursorTime
最后一條日志到達日志服務端的Unix時間戳,單位:秒。
1529486425
步驟三:編寫函數并測試
完成創建日志觸發器后,您可以編寫函數代碼并測試以驗證代碼的正確性。在實際操作過程中,當日志服務收集增量日志時觸發該函數,函數計算獲取對應日志,然后打印收集的日志。
在函數詳情頁面的代碼頁簽,在代碼編輯器中編寫代碼,然后單擊部署代碼。
本文以Python函數代碼為例。以下示例代碼可以作為提取大部分邏輯日志的模板。其中
accessKeyId
和accessKey
可以從context
和creds
中獲取。""" 本代碼樣例主要實現以下功能: * 從 event 中解析出 SLS 事件觸發相關信息 * 根據以上獲取的信息,初始化 SLS 客戶端 * 從源 log store 獲取實時日志數據 This sample code is mainly doing the following things: * Get SLS processing related information from event * Initiate SLS client * Pull logs from source log store """ #!/usr/bin/env python # -*- coding: utf-8 -*- import logging import json import os from aliyun.log import LogClient logger = logging.getLogger() def handler(event, context): # 可以通過 context.credentials 獲取密鑰信息 # Access keys can be fetched through context.credentials print("The content in context entity is: \n") print(context) creds = context.credentials access_key_id = creds.access_key_id access_key_secret = creds.access_key_secret security_token = creds.security_token # 解析 event 參數至 object 格式 # parse event in object event_obj = json.loads(event.decode()) print("The content in event entity is: \n") print(event_obj) # 從 event.source 中獲取日志項目名稱、日志倉庫名稱、日志服務訪問 endpoint、日志起始游標、日志終點游標以及分區 id # Get the name of log project, the name of log store, the endpoint of sls, begin cursor, end cursor and shardId from event.source source = event_obj['source'] log_project = source['projectName'] log_store = source['logstoreName'] endpoint = source['endpoint'] begin_cursor = source['beginCursor'] end_cursor = source['endCursor'] shard_id = source['shardId'] # 初始化 sls 客戶端 # Initialize client of sls client = LogClient(endpoint=endpoint, accessKeyId=access_key_id, accessKey=access_key_secret, securityToken=security_token) # 基于日志的游標從源日志庫中讀取日志,本示例中的游標范圍包含了觸發本次執行的所有日志內容 # Read data from source logstore within cursor: [begin_cursor, end_cursor) in the example, which contains all the logs trigger the invocation while True: response = client.pull_logs(project_name=log_project, logstore_name=log_store, shard_id=shard_id, cursor=begin_cursor, count=100, end_cursor=end_cursor, compress=False) log_group_cnt = response.get_loggroup_count() if log_group_cnt == 0: break logger.info("get %d log group from %s" % (log_group_cnt, log_store)) logger.info(response.get_loggroup_list()) begin_cursor = response.get_next_cursor() return 'success'
單擊測試函數。
執行完成后,您可以在函數代碼頁簽的上方查看執行結果。
相關操作
常見問題
當您創建觸發器后但未觸發函數執行,如何解決?
您可以從以下兩個方面排查。
確認函數計算觸發器任務配置的Logstore是否有數據增量修改,當Shard數據有變化時會觸發函數執行。
查看觸發器日志、函數運行日志查看是否有異常。
為什么函數觸發頻次有時會高于預期的觸發頻次?
每個Shard是單獨觸發的,您看到的可能是一個Logstore整體觸發次數很多,但每個Shard實時觸發時間是符合間隔的。
單個Shard的觸發間隔和每次處理的數據范圍相同(時間區間)。觸發間隔在函數執行時分如下兩種情況,假設觸發間隔為60秒。
觸發沒有延遲:按照設定周期觸發,每60秒觸發一次,處理的數據范圍為
[now -60s, now)
。說明函數觸發是分Shard獨立進行的, 假設Logstore有10個Shard,在實時處理數據時(觸發無延遲),每60秒對應10次函數觸發執行。
觸發發生延遲(當前處理到的日志服務Shard位置落后于最新寫入數據超過10秒):觸發器會進行追趕,可能縮短到2秒觸發一次,每次處理的數據范圍仍是60秒窗口。