Argo Workflows是一個強大的工作流管理工具,廣泛應用于定時任務、機器學習和ETL數據處理等場景,但是使用YAML定義工作流程可能會增加學習難度。Hera Python SDK提供了一種簡潔易用的替代方案,Hera允許用戶以Python代碼構建工作流,支持復雜任務,易于測試,并與Python生態無縫集成,顯著降低了工作流設計的門檻。本文將介紹如何使用Python SDK構建大規模Argo Workflows。
背景信息
Argo Workflows是一個專為Kubernetes環境設計的開源工作流管理工具,它專注于實現復雜工作流程的自動化編排。允許用戶定義一系列任務,并靈活安排這些任務的執行順序及依賴關系,Argo Workflows助力用戶高效構建和管理高度定制化的自動化工作流程。
Argo Workflows的應用場景非常廣泛,包括定時任務、機器學習、仿真計算、科學計算、ETL數據處理、模型訓練、CI/CD等。Argo Workflows主要依賴YAML來定義工作流程,這種設計目的在于實現配置的清晰與簡潔。然而,對于初次接觸或不熟悉YAML的用戶來說,面對復雜的工作流設計時,YAML的嚴格縮進要求及層次化的結構可能會增加一定的學習曲線和配置難度。
Hera是一個專為構建和提交Argo工作流程設計的Python SDK框架,其主要目標是簡化工作流程的構建和提交,對于數據科學家而言,通過使用Python能更好地兼容平時的使用習慣,克服YAML的阻礙。使用Hera PythonSDK具有以下優勢。
簡潔性:Hera提供了易于理解和編寫的代碼,大幅提升了開發效率。
支持復雜工作流:在處理復雜工作流時,使用Hera可以有效避免YAML可能產生的語法錯誤。
Python生態集成:每個Function就是一個Template,與Python生態中的各種框架輕松集成。
可測試性:可直接利用Python的測試框架,有助于提高代碼的質量和可維護性。
ACK One Serverless Argo工作流集群托管了Argo Workflows,其架構如下所示:
步驟一:創建并獲取集群Token
開啟Argo Server訪問工作流集群,可通過以下兩種方式開啟。
開通Argo Server公網訪問。(專線用戶可選)
執行以下命令,創建并獲取集群Token。
kubectl create token default -n default
步驟二:開啟Hera PythonSDK之旅
執行以下命令, 安裝Hera。
pip install hera-workflows
編寫并提交Workflows。
場景一:Simple DAG Diamond
在Argo Workflows中,DAG(有向無環圖)常用于定義復雜的任務依賴關系,其中Diamond結構是一種常見的工作流模式,可以實現多個任務并行執行后,將它們的結果匯聚到一個共同的后續任務。這種結構在合并不同數據流或處理結果的場景中非常有效。下面是一個具體示例,展示如何使用Hera定義一個具有Diamond結構的工作流,其中兩個任務taskA和taskB并行運行,它們的輸出共同作為輸入傳遞給taskC。
使用以下內容,創建simpleDAG.py。
# 導入相關包。 from hera.workflows import DAG, Workflow, script from hera.shared import global_config import urllib3 urllib3.disable_warnings() # 配置訪問地址和Token。 global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746" global_config.token = "abcdefgxxxxxx" # 填入之前獲取的Token。 global_config.verify_ssl = "" # 裝飾器函數script是Hera實現近乎原生的Python函數編排的關鍵功能。 # 它允許您在Hera上下文管理器(例如Workflow或Steps上下文)下調用該函數。 # 該函數在任何Hera上下文之外仍將正常運行,這意味著您可以在給定函數上編寫單元測試。 # 該示例是打印輸入的信息。 @script() def echo(message: str): print(message) # 構建workflow,Workflow是Argo中的主要資源,也是Hera的關鍵類,負責保存模板、設置入口點和運行模板。 with Workflow( generate_name="dag-diamond-", entrypoint="diamond", ) as w: with DAG(name="diamond"): A = echo(name="A", arguments={"message": "A"}) # 構建Template。 B = echo(name="B", arguments={"message": "B"}) C = echo(name="C", arguments={"message": "C"}) D = echo(name="D", arguments={"message": "D"}) A >> [B, C] >> D # 構建依賴關系,B、C任務依賴A,D依賴B和C。 # 創建Workflow。 w.create()
執行以下命令, 提交工作流。
python simpleDAG.py
工作流運行后,您可以在工作流控制臺(Argo)查看任務DAG流程與運行結果。
場景二:Map-Reduce
在Argo Workflows中實現MapReduce風格的數據處理的關鍵在于有效利用其DAG(有向無環圖)模板,以組織和協調多個任務,從而模擬Map和Reduce階段。以下是一個更加詳細的示例,展示了如何使用Hera構建一個簡單的MapReduce工作流,用于處理文本文件的單詞計數任務。每一步都是一個Python函數,便于和Python生態進行集成。
使用以下內容,創建map-reduce.py。
from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, script from hera.shared import global_config import urllib3 urllib3.disable_warnings() # 設置訪問地址。 global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746" global_config.token = "abcdefgxxxxxx" # 填入之前獲取的Token。 global_config.verify_ssl = "" # 使用script裝飾函數時,將script參數傳遞給script裝飾器。這包括image、inputs、outputs、resources等。 @script( image="python:alpine3.6", inputs=Parameter(name="num_parts"), outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"), ) def split(num_parts: int) -> None: # 根據輸入參數num_parts創建多個文件,文件中寫入foo字符和parts編號 import json import os import sys os.mkdir("/mnt/out") part_ids = list(map(lambda x: str(x), range(num_parts))) for i, part_id in enumerate(part_ids, start=1): with open("/mnt/out/" + part_id + ".json", "w") as f: json.dump({"foo": i}, f) json.dump(part_ids, sys.stdout) # script中定義image、inputs、outputs @script( image="python:alpine3.6", inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json"),], outputs=OSSArtifact( name="part", path="/mnt/out/part.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json", ), ) def map_() -> None: # 根據文件中foo字符的個數,生成新文件,將foo內容parts編號乘以2,寫入bar內容 import json import os os.mkdir("/mnt/out") with open("/mnt/in/part.json") as f: part = json.load(f) with open("/mnt/out/part.json", "w") as f: json.dump({"bar": part["foo"] * 2}, f) # script中定義image、inputs、outputs、resources @script( image="python:alpine3.6", inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"), outputs=OSSArtifact( name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json" ), ) def reduce() -> None: # 計算每個parts對應bar值的總和。 import json import os os.mkdir("/mnt/out") total = 0 for f in list(map(lambda x: open("/mnt/in/" + x), os.listdir("/mnt/in"))): result = json.load(f) total = total + result["bar"] with open("/mnt/out/total.json", "w") as f: json.dump({"total": total}, f) # 構建workflow,輸入name、設置入口點、namespace、全局參數等。 with Workflow(generate_name="map-reduce-", entrypoint="main", namespace="default", arguments=Parameter(name="num_parts", value="4")) as w: with DAG(name="main"): s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}")) # 構建Templetes。 m = map_( with_param=s.result, arguments=[Parameter(name="part_id", value="{{item}}"), OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json"),], ) # 輸入參數并構建templetes。 s >> m >> reduce() # 構建任務依賴關系。 # 創建工作流。 w.create()
執行以下命令,提交工作流。
python map-reduce.py
工作流運行后,您可以在工作流控制臺(Argo)查看任務DAG流程與運行結果。
編輯方式對比
Argo Workflows的編輯方式主要包括YAML和Hera Framework。以下是這兩種方式的對比。
特性 | YAML | Hera Framework |
簡潔性 | 較高 | 高,代碼量少 |
復雜工作流編寫難易程度 | 難 | 易 |
Python生態集成難易程度 | 難 | 易,豐富的Python Lib |
可測試性 | 難,容易出現語法錯誤 | 易,可使用測試框架 |
Hera Framework以優雅的方式將Python生態體系與Argo Workflows框架結合,使繁瑣的工作流設計變得直觀簡明。它不僅為大規模任務編排提供了一條免受YAML復雜性困擾的通路,同時也有效連接了數據工程師與他們熟悉的Python語言,使得構建和優化機器學習工作流變得無縫和高效,迅速實現創意到部署的迭代循環,從而推動智能應用的快速落地與持續發展。
如果您對于ACK One有任何疑問,歡迎使用釘釘搜索釘釘群號35688562加入釘釘群。
相關文檔
Hera相關文檔。
如果您需要詳細了解Hera相關信息,請參見Hera概述。
若您想學習如何設置和使用Hera來進行LLM的訓練過程,請參見Train LLM with Hera。
YAML部署示例。
如果您想了解以YAML的方式部署simple-diamond,請參見dag-diamond.yaml。
如果您想了解以YAML的方式部署map-reduce,請參見 map-reduce.yaml。