本文介紹了任務狀態輪詢和Serverless工作流實現的具體步驟。
簡介
在長時間任務的場景中如果任務結束后沒有回調機制,開發者通常會采用輪詢的方式來判斷任務的結束。可靠的輪詢實現需要維護狀態的持久化以保證即使當前輪詢進程失敗退出,進程恢復后輪詢也會繼續進行。本示例通過一個假設場景:用戶調用函數計算提交了一個多媒體處理任務,該任務耗時從1分鐘到幾小時不等,任務執行狀態可以通過API查詢,介紹如何使用Serverless工作流實現一個通用可靠的任務輪詢工作流。
Serverless工作流實現
下面的教程會將兩個FC函數編排成一個任務輪詢工作流,該示例需要以下3個步驟:
步驟1:創建FC函數
首先創建一個名為fnf-demo
的FC服務,并在該服務下創建兩個Python3.10的函數,詳細步驟,請參見快速創建函數。
StartJob函數:模擬通過調用API開始一個長時間的任務,返回一個任務ID。
import logging import uuid def handler(event, context): logger = logging.getLogger() id = uuid.uuid4() logger.info('Started job with ID %s' % id) return {"job_id": str(id)}
GetJobStatus函數:模擬通過調用API獲取指定任務的執行結果,比較當前的時間和函數第一次執行的時間的差值和輸入中
delay
的值,返回不同的狀態:“success”或“running”。import logging import uuid import time import json start_time = int(time.time()) def handler(event, context): evt = json.loads(event) logger = logging.getLogger() job_id = evt["job_id"] logger.info('Started job with ID %s' % job_id) now = int(time.time()) status = "running" delay = 60 if "delay" in evt: delay = evt["delay"] if now - start_time > delay: status = "success" try_count = 0 if "try_count" in evt: try_count = evt["try_count"] try_count = try_count + 1 logger.info('Job %s, status %s, try_count %d' % (job_id, status, try_count)) return {"job_id": job_id, "job_status":status, "try_count":try_count}
步驟2:創建Serverless工作流流程
在云工作流CloudFlow控制臺的流程中單擊創建流程,根據引導只填寫基本信息和流程定義,其它參數默認即可。
該流程的主要邏輯描述如下:
StartJob步驟:調用
StartJob
函數開始一個任務。Wait10s步驟:等待10秒。
GetJobStatus步驟:調用
GetJobStatus
。CheckJobComplete步驟:檢查
GetJobStatus
函數返回的結果:如果返回"success"整個流程執行成功。
如果輪詢嘗試次數大于3次,認為任務執行失敗,流程執行失敗。
如果返回"running"則跳回到
Wait10s
步驟,繼續執行。
version: v1 type: flow steps: - type: task name: StartJob resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/StartJob - type: pass name: Init outputMappings: - target: try_count source: 0 - type: wait name: Wait10s duration: 10 - type: task name: GetJobStatus resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/GetJobStatus inputMappings: - target: job_id source: $local.job_id - target: delay source: $input.delay - target: try_count source: $local.try_count - type: choice name: CheckJobComplete inputMappings: - target: status source: $local.job_status - target: try_count source: $local.try_count choices: - condition: $.status == "success" goto: JobSucceeded - condition: $.try_count > 3 goto: JobFailed - condition: $.status == "running" goto: Wait10s - type: succeed name: JobSucceeded - type: fail name: JobFailed
步驟3:開始執行并查看結果
在控制臺創建好的流程中單擊新執行并提供以下JSON對象作為輸入,其中delay
字段的值模擬任務完成需要的時間,這里預期任務在開始20秒后,GetJobStatus
函數返回“success”,在此之前均返回“running”,您可以調整delay
的值觀察不同的執行結果。
{
"delay": 20
}
下圖展示的是輪詢從開始到結束的流程執行可視化。
下圖展示的是任務需要20秒完成,可以看到流程執行歷史中第一次
GetJobStatus
返回“running”因此CheckJobComplete
的后續步驟眺回到Wait10s
進行等待和下一次查詢,第二次查詢返回“success”,流程執行結束。