本文介紹了Serverless 工作流的回調功能。相比較輪詢,使用回調有效地降低了延遲、減少了輪詢對服務器造成的不必要壓力。另外,回調功能配合隊列可以實現對非FC任務的編排,將Serverless 工作流的編排范圍擴展到任意類型的計算資源。
簡介
長時間執行的任務通常會采用異步提交任務并返回任務標識(ID),判斷異步任務結束的方法通常有兩種:輪詢(polling)和回調(callback),在任務狀態輪詢中我們介紹了使用輪詢來判斷任務結束。Serverless 工作流的回調(callback)功能,覆蓋以下的痛點或場景:
- 消除輪詢周期長帶來的不必要延遲。
- 消除大流量場景下高并發的輪詢造成不必要的服務器資源壓力和浪費。
- 編排非FC Function的任務,例如運行在自建機房或ECS上的進程。
- 需要人工干預的步驟,例如通知審批通過。
下圖展示了使用MNS隊列服務集成結合回調API編排自建資源,拓寬Serverless 工作流的適用場景。
回調使用詳解
在Task步驟中指定pattern: waitForCallback
,如下圖狀態機所示:該步驟會在提交resourceArn
指定的任務后(如FC invocation)該步驟會將一個taskToken
存入到該步驟的context
對象并進入一個暫停的狀態,直到Serverless 工作流收到回調或指定的任務超時。將taskToken
傳入ReportTaskSucceed
或ReportTaskFailed
接口去回調會使得該步驟繼續執行。
- type: task
name: mytask
resourceArn: acs:fc:::services/{fc-service}/functions/{fc-function}
pattern: waitForCallback # 指定該Task步驟在提交任務后等待回調。
inputMappings:
- target: taskToken
source: $context.task.token # 將context中的taskToken作為input傳入resourceArn指定的函數。
outputMappings:
- target: k
source: $local.key # 將ReportTaskSucceeded中output {"key": "value"}映射成 {"k": "value"}并作為該步驟的輸出。
示例
該示例共分為以下3個步驟:
步驟1:準備Task Function
創建下面一個簡單的函數,該函數會將輸入直接返回。
- 服務:fnf-demo。
- 函數:echo。
- 運行環境:python2.7。
- 函數入口:index.handler。
#!/usr/bin/env python
import json
def handler(event, context):
return event
步驟2:開始工作流
創建流程,并開始執行。
- 流程名稱:fnf-demo-callback。
- 流程角色:配置一個有FC Invocation權限的角色。
version: v1
type: flow
steps:
- type: task
name: mytask
resourceArn: acs:fc:::services/fnf-demo/functions/echo
pattern: waitForCallback
inputMappings:
- target: taskToken
source: $context.task.token
outputMappings:
- target: s
source: $local.status
流程開始后可以看到mytask
步驟暫停在TaskSubmitted
事件,等待回調。該事件的output
中含有taskToken
作為回調任務的標識。
步驟3:回調
使用Serverless 工作流的Python SDK在本地(或其他可以運行Python的環境)運行
callback.py
腳本,將{task-token}替換為TaskSubmitted
事件中的值。cd /tmp
mkdir fnf-demo-callback
cd fnf-demo-callback
# 在虛擬環境中,安裝fnf python SDK。
virtualenv env
source env/bin/activate
pip install -t . aliyun-python-sdk-core
pip install -t . aliyun-python-sdk-fnf
# 執行worker進程。
export ACCOUNT_ID={your-account-id}; export AK_ID={your-ak-id}; export AK_SECRET={your-ak-secret}
python worker.py {task-token-from-TaskSubmitted}
# worker.py 代碼:
import os
import sys
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest
def main():
account_id = os.environ['ACCOUNT_ID']
akid = os.environ['AK_ID']
ak_secret = os.environ['AK_SECRET']
fnf_client = AcsClient(akid, ak_secret, "cn-hangzhou")
task_token = sys.argv[1]
print "task token " + task_token
try:
request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
request.set_Output("{\"status\": \"ok\"}")
request.set_TaskToken(task_token)
resp = fnf_client.do_action_with_exception(request)
print "Report task succeeded finished"
except ServerException as e:
print(e)
if __name__ == '__main__':
main()
上述腳本回調成功后可以看到
mytask
步驟繼續執行,ReportTaskSucceeded
中指定的輸出"{"status": "ok"}"經過outputMappings的映射后變成"{"s": "ok"}"。文檔內容是否對您有幫助?