自定義函數(shù)開發(fā)指南
當(dāng)您通過函數(shù)計(jì)算消費(fèi)日志數(shù)據(jù)時(shí),在不同場景您可以選擇使用日志服務(wù)提供的函數(shù)模板或自定義函數(shù)。本文介紹如何構(gòu)建一個(gè)自定義函數(shù)。
函數(shù)Event
在通過函數(shù)計(jì)算消費(fèi)日志數(shù)據(jù)的過程中,步驟二需要配置函數(shù)的入口參數(shù)(函數(shù)Event),格式為一個(gè)JSON Object序列化后字符串。
參數(shù)說明
參數(shù)
說明
jobName
日志服務(wù)ETL Job名稱。函數(shù)計(jì)算服務(wù)上的日志服務(wù)觸發(fā)器對(duì)應(yīng)一個(gè)日志服務(wù)的ETL Job。
taskId
對(duì)于一個(gè)ETL Job,taskId是某一次確定性的函數(shù)調(diào)用標(biāo)識(shí)。
cursorTime
本次函數(shù)調(diào)用包括的數(shù)據(jù)中,最后一條日志到達(dá)日志服務(wù)的服務(wù)器端的unix_timestamp。
source
該字段由日志服務(wù)生成,日志服務(wù)根據(jù)ETL Job定義的任務(wù)間隔定時(shí)觸發(fā)函數(shù)執(zhí)行,source字段是函數(shù)Event中的重要組成部分,定義了本次函數(shù)調(diào)用的消費(fèi)范圍。
endpoint:Project所屬地域的服務(wù)入口,詳情請(qǐng)參見服務(wù)入口。
projectName:Project名稱。
logstoreName:Logstore名稱。
shardId:Logstore下的某一個(gè)確定Shard。
beginCursor:需要從Shard的什么位置開始消費(fèi)數(shù)據(jù)。
endCursor:需要消費(fèi)Shard數(shù)據(jù)到什么位置。
說明Shard對(duì)應(yīng)的[beginCursor,endCursor)是一個(gè)左閉右開區(qū)間。
parameter
JSON Object類型,在您創(chuàng)建觸發(fā)器函數(shù)配置時(shí)設(shè)置。自定義日志服務(wù)ETL函數(shù)運(yùn)行時(shí)解析該字段,可以獲取到函數(shù)所需要的運(yùn)行參數(shù)。詳情請(qǐng)參見SLS觸發(fā)器。
示例
{ "source": { "endpoint": "http://cn-shanghai-intranet.log.aliyuncs.com", "projectName": "fc-****************", "logstoreName": "demo", "shardId": 0, "beginCursor": "MTUwNTM5MDI3NTY1ODcwNzU2Ng==", "endCursor": "MTUwNTM5MDI3NTY1ODcwNzU2OA==" }, "parameter": { ... }, "jobName": "fedad35f51a2a97b466da57fd71f315f539d2234", "taskId": "9bc06c96-e364-4f41-85eb-b6e579214ae4", "cursorTime": 1511429883 }
在函數(shù)調(diào)試時(shí)候,您可以調(diào)用GetCursor - 通過時(shí)間查詢Cursor接口獲取cursor并按上述示例構(gòu)建一個(gè)函數(shù)Event用于測試。
函數(shù)開發(fā)
您可以通過Java、Python、Node.js等多種語言實(shí)現(xiàn)函數(shù)開發(fā),日志服務(wù)提供了相應(yīng)Runtime的SDK,以便您在函數(shù)中進(jìn)行集成,詳情請(qǐng)參見SDK參考。
以下內(nèi)容以Java 8 Runtime為例,介紹如何開發(fā)日志服務(wù)ETL函數(shù)。關(guān)于Java 8函數(shù)編程細(xì)節(jié),詳情請(qǐng)參見函數(shù)計(jì)算服務(wù)Java編程指南。
Java函數(shù)模板
目前,日志服務(wù)提供了基于Java8 Runtime的自定義數(shù)據(jù)模板,您可以在這基礎(chǔ)上完成自定義需求的實(shí)現(xiàn)。
模板已實(shí)現(xiàn)以下功能:
函數(shù)Event中source、taskId、jobName字段的解析。
根據(jù)source中定義的數(shù)據(jù)源,通過Log Service Java SDK獲取數(shù)據(jù),并對(duì)每一批數(shù)據(jù)調(diào)用processData接口進(jìn)行處理。
在模板中,您還需要實(shí)現(xiàn)以下功能:
函數(shù)Event中parameter字段的解析,通過
UserDefinedFunctionParameter.java
實(shí)現(xiàn)。函數(shù)內(nèi)針對(duì)數(shù)據(jù)的自定義業(yè)務(wù)邏輯,通過
UserDefinedFunction.java
的processData接口實(shí)現(xiàn)。為您的函數(shù)取一個(gè)可以描述功能的名字,替換
UserDefinedFunction
。
processData接口實(shí)現(xiàn)
您可以在processData內(nèi)完成對(duì)一批數(shù)據(jù)的消費(fèi)、加工、投遞。例如在LogstoreReplication中,實(shí)現(xiàn)了將數(shù)據(jù)從一個(gè)Logstore中讀出后寫到另一個(gè)Logstore。
說明processData處理數(shù)據(jù)成功則返回true,處理數(shù)據(jù)遇到異常無法重試成功則返回false,但此時(shí)函數(shù)還會(huì)繼續(xù)運(yùn)行下去,且日志服務(wù)判定這是一次成功的ETL任務(wù),會(huì)忽略其中處理異常的數(shù)據(jù)。
如果遇到致命錯(cuò)誤或業(yè)務(wù)邏輯上認(rèn)為有異常需要提前終止函數(shù)執(zhí)行,會(huì)通過throw Exception方式跳出函數(shù)運(yùn)行,日志服務(wù)可以據(jù)此判斷函數(shù)運(yùn)行異常,并會(huì)按照ETL Job設(shè)置的規(guī)則重新調(diào)用函數(shù)執(zhí)行。
如果Shard流量較大,請(qǐng)為函數(shù)配置足夠的內(nèi)存規(guī)格,以避免函數(shù)OOM導(dǎo)致異常終止。
如果在函數(shù)內(nèi)執(zhí)行耗時(shí)操作或者Shard流量較大,請(qǐng)您設(shè)置較短的函數(shù)觸發(fā)間隔和較長的函數(shù)運(yùn)行超時(shí)時(shí)間。
請(qǐng)您為函數(shù)服務(wù)配置足夠的權(quán)限,例如在函數(shù)內(nèi)寫OSS就需要配置OSS寫權(quán)限。
ETL日志
ETL調(diào)度日志
調(diào)度日志記錄ETL任務(wù)開始時(shí)間、結(jié)束時(shí)間、任務(wù)是否成功以及成功返回的信息。如果ETL任務(wù)出錯(cuò)會(huì)生成ETL出錯(cuò)日志,并向系統(tǒng)管理員發(fā)送報(bào)警郵件或短信。請(qǐng)您在創(chuàng)建觸發(fā)器時(shí)設(shè)置觸發(fā)器日志Logstore,并為該Logstore開啟并配置索引,詳情請(qǐng)參見創(chuàng)建索引。
對(duì)于函數(shù)執(zhí)行結(jié)果的統(tǒng)計(jì),可以通過函數(shù)返回,例如在Java8 Runtime函數(shù)的
outputStream
中體現(xiàn)。日志服務(wù)提供的函數(shù)模板會(huì)寫出一個(gè)JSON Object序列化后的字符串,該字符串將記錄在ETL任務(wù)調(diào)度日志中,方便您進(jìn)行統(tǒng)計(jì)、查詢。ETL過程日志
這一部分日志是在ETL執(zhí)行過程中每執(zhí)行一步記錄的關(guān)鍵點(diǎn)和錯(cuò)誤信息,包括某一步驟的開始和結(jié)束時(shí)間、初始化動(dòng)作完成情況、模塊出錯(cuò)信息等。ETL過程日志的意義是隨時(shí)可以感知ETL運(yùn)行情況,如果發(fā)生錯(cuò)誤,可以及時(shí)通過過程日志查找原因。
您可以通過
context.getLogger()
記錄過程日志并存放在日志服務(wù)指定Project的Logstore中,建議您為該Logstore開啟索引查詢功能。