創(chuàng)建作業(yè)(DAG 類(lèi)型)
create_job
參數(shù)說(shuō)明:
所有類(lèi)型的參數(shù)將被轉(zhuǎn)換為包含屬性信息的字典對(duì)象。
參數(shù) | 類(lèi)型 | 描述 |
job_desc | JobDescription object, str, dict | 作業(yè)的簡(jiǎn)單描述和作業(yè)對(duì)象中各個(gè)任務(wù)的描述信息,以及各個(gè)任務(wù)之間的DAG依賴(lài)關(guān)系 |
返回值說(shuō)明:
create_job 方法將返回一個(gè)CreateResponse對(duì)象, 以下是 CreateResponse 對(duì)象的屬性。可以通過(guò) response.Id 的方式獲取新任務(wù)的 ID。
屬性 | 類(lèi)型 | 描述 |
Id | str | 新任務(wù)的任務(wù)標(biāo)識(shí)符 |
e.g.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from batchcompute import Client, ClientError
from batchcompute import CN_ZHANGJIAKOU as REGION
from batchcompute.resources import (
ClusterDescription, GroupDescription, Configs, Networks, VPC,
JobDescription, TaskDescription, DAG,Mounts,
AutoCluster,Disks,Notification,
)
access_key_id = "" # your access key id
access_key_secret = "" # your access key secret
image_id = "m-8vbd8lo9xxxx" # the id of a image created before,鏡像需要確保已經(jīng)注冊(cè)給批量計(jì)算
instance_type = "ecs.sn1.medium" # instance type
inputOssPath = "oss://xxx/input/" # your input oss path
outputOssPath = "oss://xxx/output/" #your output oss path
stdoutOssPath = "oss://xxx/log/stdout/" #your stdout oss path
stderrOssPath = "oss://xxx/log/stderr/" #your stderr oss path
def getAutoClusterDesc():
auto_desc = AutoCluster()
auto_desc.ECSImageId = image_id
#任務(wù)失敗保留環(huán)境,程序調(diào)試階段設(shè)置。環(huán)境保留費(fèi)用會(huì)繼續(xù)產(chǎn)生請(qǐng)注意及時(shí)手動(dòng)清除環(huán)境任務(wù)失敗保留環(huán)境,
# 程序調(diào)試階段設(shè)置。環(huán)境保留費(fèi)用會(huì)繼續(xù)產(chǎn)生請(qǐng)注意及時(shí)手動(dòng)清除環(huán)境
auto_desc.ReserveOnFail = False
# 實(shí)例規(guī)格
auto_desc.InstanceType = instance_type
#case1 設(shè)置上限價(jià)格的競(jìng)價(jià)實(shí)例;
# auto_desc.ResourceType = "Spot"
# auto_desc.SpotStrategy = "SpotWithPriceLimit"
# auto_desc.SpotPriceLimit = 0.5
#case2 系統(tǒng)自動(dòng)出價(jià),最高按量付費(fèi)價(jià)格
# auto_desc.ResourceType = "Spot"
# auto_desc.SpotStrategy = "SpotAsPriceGo"
#case3 按量
auto_desc.ResourceType = "OnDemand"
#Configs
configs = Configs()
#Configs.Networks
networks = Networks()
vpc = VPC()
#case1 只給CidrBlock
vpc.CidrBlock = '192.168.0.0/16'
#case2 CidrBlock和VpcId 都傳入,必須保證VpcId的CidrBlock 和傳入的CidrBlock保持一致
# vpc.CidrBlock = '172.26.0.0/16'
# vpc.VpcId = "vpc-8vbfxdyhxxxx"
networks.VPC = vpc
configs.Networks = networks
# 設(shè)置系統(tǒng)盤(pán)type(cloud_efficiency/cloud_ssd)以及size(單位GB)
configs.add_system_disk(size=40, type_='cloud_efficiency')
#設(shè)置數(shù)據(jù)盤(pán)type(必須和系統(tǒng)盤(pán)type保持一致)size(單位GB)掛載點(diǎn)
# case1 linux環(huán)境
# configs.add_data_disk(size=40, type_='cloud_efficiency', mount_point='/path/to/mount/')
# case2 windows環(huán)境
# configs.add_data_disk(size=40, type_='cloud_efficiency', mount_point='E:')
# 設(shè)置節(jié)點(diǎn)個(gè)數(shù)
configs.InstanceCount = 1
auto_desc.Configs = configs
return auto_desc
def getDagJobDesc(clusterId = None):
job_desc = JobDescription()
dag_desc = DAG()
mounts_desc = Mounts()
job_desc.Name = "testBatchSdkJob"
job_desc.Description = "test job"
job_desc.Priority = 1
# 訂閱job完成或者失敗事件
noti_desc = Notification()
noti_desc.Topic['Name'] = "test-topic"
noti_desc.Topic['Endpoint'] = "http://[UserId].mns.[Region].aliyuncs.com/"
noti_desc.Topic['Events'] = ["OnJobFinished", "OnJobFailed"]
# job_desc.Notification = noti_desc
job_desc.JobFailOnInstanceFail = False
# 作業(yè)運(yùn)行成功后戶(hù)自動(dòng)會(huì)被立即釋放掉
job_desc.AutoRelease = False
job_desc.Type = "DAG"
echo_task = TaskDescription()
# 程序的輸入路徑映射,程序直接訪問(wèn)/home/test/input/來(lái)訪問(wèn)oss://xxx/input/中的文件
# 支持文件掛載,在程序中直接訪問(wèn)文件
# echo_task.InputMapping = {"oss://xxx/input/": "/home/test/input/",
# "oss://xxx/test/file": "/home/test/test/file"}
echo_task.InputMapping = {inputOssPath: "/home/test/input/"}
# 程序的輸出路徑映射,可執(zhí)行程序?qū)⒔Y(jié)果輸出到/home/test/output/,
# 程序執(zhí)行完畢后批量計(jì)算將/home/test/output/中的結(jié)果上傳到oss://xxx/output/中
# 輸入和輸出oss路徑不要有交叉,如輸入為oss://xxx/input/,輸出為oss://xxx/input/output/;
# 這樣會(huì)導(dǎo)致未定義行為程序執(zhí)行性能不能保證
echo_task.OutputMapping = {"/home/test/output/":outputOssPath}
#觸發(fā)程序運(yùn)行的命令行
#case1 執(zhí)行l(wèi)inux命令行
echo_task.Parameters.Command.CommandLine = "/bin/bash -c 'echo BatchcomputeService'"
#case2 執(zhí)行Windows CMD.exe
# echo_task.Parameters.Command.CommandLine = "cmd /c 'echo BatchcomputeService'"
#case3 輸入可執(zhí)行文件
# PackagePath存放commandLine中的可執(zhí)行文件或者二進(jìn)制包
# echo_task.Parameters.Command.PackagePath = "oss://xxx/package/test.sh"
# echo_task.Parameters.Command.CommandLine = "sh test.sh"
# 設(shè)置程序運(yùn)行過(guò)程中相關(guān)環(huán)境變量信息
echo_task.Parameters.Command.EnvVars["key1"] = "value1"
echo_task.Parameters.Command.EnvVars["key2"] = "value2"
# 設(shè)置docker參數(shù)
#case1 docker鏡像在oss registry上
# echo_task.Parameters.Command.EnvVars["BATCH_COMPUTE_DOCKER_IMAGE"] = "localhost:5000/yuorBucket/dockers:0.1"
# echo_task.Parameters.Command.EnvVars["BATCH_COMPUTE_DOCKER_REGISTRY_OSS_PATH"] = "oss://your-bucket/dockers"
#case2 docker鏡像在容器倉(cāng)庫(kù)
# echo_task.Parameters.Command.Docker.Image = "registry.cn-beijing.aliyuncs.com/demotest/test:0.1"
# 設(shè)置程序的標(biāo)準(zhǔn)輸出地址,程序中的print打印會(huì)實(shí)時(shí)上傳到指定的oss地址
echo_task.Parameters.StdoutRedirectPath = stdoutOssPath
# 設(shè)置程序的標(biāo)準(zhǔn)錯(cuò)誤輸出地址,程序拋出的異常錯(cuò)誤會(huì)實(shí)時(shí)上傳到指定的oss地址
echo_task.Parameters.StderrRedirectPath = stderrOssPath
# 設(shè)置任務(wù)的超時(shí)時(shí)間
echo_task.Timeout = 600
# 設(shè)置任務(wù)所需實(shí)例個(gè)數(shù)
# 環(huán)境變量BATCH_COMPUTE_INSTANCE_ID為0到InstanceCount-1
# 在執(zhí)行程序中訪問(wèn)BATCH_COMPUTE_INSTANCE_ID,實(shí)現(xiàn)數(shù)據(jù)訪問(wèn)的切片實(shí)現(xiàn)單任務(wù)并發(fā)執(zhí)行
echo_task.InstanceCount = 1
# 設(shè)置任務(wù)失敗后重試次數(shù)
echo_task.MaxRetryCount = 0
# NAS數(shù)據(jù)掛載
#采用NAS時(shí)必須保證網(wǎng)絡(luò)和NAS在同一個(gè)VPC內(nèi)
nasMountEntry = {
"Source": "nas://xxxx.nas.aliyuncs.com:/",
"Destination": "/home/mnt/",
"WriteSupport":True,
}
mounts_desc.add_entry(nasMountEntry)
mounts_desc.Locale = "utf-8"
mounts_desc.Lock = False
# echo_task.Mounts = mounts_desc
# 采用固定集群提交作業(yè)
# echo_task.ClusterId = clusterId
#采用auto集群提交作業(yè)
echo_task.AutoCluster = getAutoClusterDesc()
# 添加任務(wù)
dag_desc.add_task('echoTask', echo_task)
# 可以設(shè)置多個(gè)task,每個(gè)task可以根據(jù)需求進(jìn)行設(shè)置各項(xiàng)參數(shù)
# dag_desc.add_task('echoTask2', echo_task)
# Dependencies設(shè)置多個(gè)task之間的依賴(lài)關(guān)系,echoTask2依賴(lài)echoTask;echoTask3依賴(lài)echoTask2
# dag_desc.Dependencies = {"echoTask":["echoTask2"], "echoTask2":["echoTask3"]}
#
job_desc.DAG = dag_desc
return job_desc
if __name__ == "__main__":
client = Client(REGION, access_key_id, access_key_secret)
try:
job_desc = getDagJobDesc()
job_id = client.create_job(job_desc).Id
print('job created: %s' % job_id)
except ClientError,e:
print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
Notice: 關(guān)于Mounts的注意事項(xiàng) Job 級(jí)別的 Mounts 參數(shù)會(huì)覆蓋 Cluster 級(jí)別的配置信息; Modify Cluster 后,需要調(diào)用 RecreateInstance 接口才能使新指定的 Mounts 配置生效; 掛載 NAS 需要以 nas:// 做為 source 的前綴,否則會(huì)出錯(cuò); 每個(gè)類(lèi)的具體成員信息參見(jiàn)以下表格
(1)JobDescription 類(lèi)
參數(shù)說(shuō)明:
參數(shù) | 類(lèi)型 | 描述 |
properties | dict, str, JobDescription object | 包含作業(yè)描述信息的對(duì)象 |
屬性說(shuō)明:
序號(hào) | 屬性 | 類(lèi)型 | 描述 |
1. | Name | str | 作業(yè)名稱(chēng) |
2. | Description | str | 作業(yè)的簡(jiǎn)短描述信息 |
3. | Priority | int | 優(yōu)先級(jí)用一個(gè)[0,1000]范圍內(nèi)的整數(shù)指定,數(shù)值越高表示作業(yè)調(diào)度時(shí)的優(yōu)先級(jí)越高 |
4. | Notification | dict | 消息通知配置,可以配置 MNS 服務(wù)的 Topic 和 Job 相關(guān)事件 |
5. | JobFailOnInstanceFail | bool | Instance 失敗是否直接使 Job 失敗 |
6. | AutoRelease | boolean | 表示 Job 運(yùn)行成功自動(dòng)會(huì)被立即釋放(刪除)掉,默認(rèn)為 False |
7. | Type | str | 目前僅支持有向無(wú)環(huán)圖(directed acycline graph,DAG)形式描述任務(wù) |
8. | DAG | dict, DAG object | DAG 描述 |
(2)DAG 類(lèi)
參數(shù)說(shuō)明:
參數(shù) | 類(lèi)型 | 描述 |
properties | dict, str, DAG object | 所有任務(wù)的映射以及任務(wù)間依賴(lài)關(guān)系的描述信息 |
屬性說(shuō)明:
序號(hào) | 屬性 | 類(lèi)型 | 描述 |
1. | Tasks | dict | 所有任務(wù)名與任務(wù)描述的映射關(guān)系 |
2. | Dependencies | dict | 所有任務(wù)間的相互依賴(lài)關(guān)系 |
方法說(shuō)明 :
序號(hào) | 方法 | 描述 |
1. | add_task(task_name, task) | 增加一個(gè)任務(wù) |
2. | get_task(task_name) | 通過(guò)任務(wù)名獲取任務(wù)信息 |
3. | delete_task(task_name) | 刪除某個(gè)任務(wù) |
(3) TaskDescription 類(lèi)
參數(shù)說(shuō)明:
參數(shù) | 類(lèi)型 | 描述 |
properties | dict, str, TaskDescription object | 單個(gè)任務(wù)的描述信息 |
屬性說(shuō)明:
序號(hào) | 屬性 | 類(lèi)型 | 描述 |
1. | Parameters | dict, Parameters object | 任務(wù)參數(shù)詳情 |
2. | InputMapping | dict | OSS 到本地路徑的映射 |
3. | OutputMapping | dict | 本地路徑到 OSS 的映射 |
4. | LogMapping | dict | 本地日志路徑對(duì) OSS 映射 |
5. | Timeout | int | 任務(wù)超時(shí)時(shí)間 |
6. | InstanceCount | int | 任務(wù)中實(shí)例的個(gè)數(shù),正數(shù) |
7. | MaxRetryCount | int | 最大重試次數(shù),默認(rèn)為0 |
8. | ClusterId | str | 集群標(biāo)識(shí)符 |
9. | Mounts | dict, Mounts object | 實(shí)例的網(wǎng)絡(luò)掛載配置信息,由 Mounts 描述,目前支持 NAS 和 OSS 掛載。 |
10. | AutoCluster | dict, AutoCluster object | 匿名集群,和集群標(biāo)示符最多只能指定一個(gè) |
(4) Parameters 類(lèi)
參數(shù)說(shuō)明:
參數(shù) | 類(lèi)型 | 描述 |
properties | dict, str, Parameters object | 任務(wù)參數(shù)的描述信息 |
屬性說(shuō)明:
序號(hào) | 屬性 | 類(lèi)型 | 描述 |
1. | Command | dict, Command object | 用戶(hù)程序相關(guān)命令行參數(shù) |
2. | InputMappingConfig | dict, InputMappingConfig object | NFS 掛載服務(wù)配置項(xiàng) |
3. | StdoutRedirectPath | str | 標(biāo)準(zhǔn)輸出的 OSS 路徑 |
4. | StderrRedirectPath | str | 標(biāo)準(zhǔn)錯(cuò)誤的 OSS 路徑 |
(5) AutoCluster 類(lèi)
參數(shù)說(shuō)明:
參數(shù) | 類(lèi)型 | 描述 |
properties | dict, str, AutoCluster object | 匿名集群信息 |
屬性說(shuō)明:
序號(hào) | 屬性 | 類(lèi)型 | 描述 |
1. | ECSImageId | str | ECS 鏡像 ID,可以使用系統(tǒng)提供的鏡像 |
2. | InstanceType | str | 實(shí)例規(guī)格,實(shí)例類(lèi)型 |
3. | ResourceType | str | 資源類(lèi)型,目前支持:“OnDemand” 和 “Spot”,默認(rèn)為“OnDemand” |
4. | UserData | dict | 一個(gè) KeyValue 映射,用戶(hù)自定義的信息,使用 ECS 的 metaserver 獲取 |
5. | Configs | Configs object | 集群的配置信息, 詳見(jiàn)4.13 節(jié)中 ClusterDescription 的介紹 |
6. | SpotStrategy | str | 實(shí)例的競(jìng)價(jià)策略,只有在 ResourceType 為 Spot 的情況下有效。取值范圍:SpotWithPriceLimit:設(shè)置上限價(jià)格的競(jìng)價(jià)實(shí)例; SpotAsPriceGo:系統(tǒng)自動(dòng)出價(jià),最高按量付費(fèi)價(jià)格。 |
7. | SpotPriceLimit | float | 實(shí)例的每小時(shí)最高價(jià)格(每個(gè)實(shí)例規(guī)格的價(jià)格而非每核小時(shí)的價(jià)格)。支持最大 3 位小數(shù),SpotStrategy 為 SpotWithPriceLimit 生效。 |
8. | ReserveOnFail | bool | 任務(wù)失敗時(shí)不釋放相關(guān)的虛擬機(jī),會(huì)繼續(xù)收取這些資源的費(fèi)用直到用戶(hù)刪除作業(yè),默認(rèn)為 False,僅用于調(diào)查問(wèn)題。 |
9. | DependencyIsvService | string | 執(zhí)行程序依賴(lài)的阿里云提供的ISV服務(wù),目前提供的ISV服務(wù)有:“GTX”,默認(rèn)為””,不依賴(lài)任何ISV服務(wù)。 |
(6) Command 類(lèi)
參數(shù)說(shuō)明:
參數(shù) | 類(lèi)型 | 描述 |
properties | dict, str, Command object | 用戶(hù)程序相關(guān)命令行參數(shù) |
屬性說(shuō)明:
序號(hào) | 屬性 | 類(lèi)型 | 描述 |
1. | CommandLine | str | 執(zhí)行用戶(hù)程序的命令 |
2. | PackagePath | str | 用戶(hù)程序所在 OSS 路徑 |
3. | EnvVars | dict | 用戶(hù)程序執(zhí)行時(shí)的環(huán)境變量 |
(7) InputMappingConfig 類(lèi)
參數(shù)說(shuō)明:
參數(shù) | 類(lèi)型 | 描述 |
properties | dict, str, InputMappingConfig object | NFS 掛載服務(wù)配置項(xiàng) |
屬性說(shuō)明:
序號(hào) | 屬性 | 類(lèi)型 | 描述 |
1. | Locale | str | OSS object 掛載到本地時(shí)使用的字符集。可選范圍包括 GBK、GB2312-80、BIG5、ANSI、EUC-JP、EUC-TW、EUC-KR、SHIFT-JIS、KSC5601 等 |
2. | Lock | bool | NFS 掛載服務(wù)是否支持網(wǎng)絡(luò)文件鎖 |
(8) Notification 類(lèi)
參數(shù)說(shuō)明:
參數(shù) | 類(lèi)型 | 描述 |
properties | dict, str, Command object | 用戶(hù)程序相關(guān)命令行參數(shù) |
屬性說(shuō)明:
序號(hào) | 屬性 | 類(lèi)型 | 描述 |
1. | Topic | Topic Object | 消息 Topic |
(9) Topic 類(lèi)
參數(shù)說(shuō)明:
參數(shù) | 類(lèi)型 | 描述 |
properties | dict, str, Command object | 用戶(hù)程序相關(guān)命令行參數(shù) |
屬性說(shuō)明:
序號(hào) | 屬性 | 類(lèi)型 | 描述 |
1. | Endpoint | str | MNS 區(qū)域 endpoint,格式如: |
2. | Name | str | Topic 名稱(chēng)。 |
3. | Events | list | 事件列表,請(qǐng)?zhí)顚?xiě) cluster 相關(guān)的事件名。 |
(10) Mounts 類(lèi)
參數(shù)說(shuō)明:
參數(shù) | 類(lèi)型 | 描述 |
properties | dict, str, Command object | 創(chuàng)建集群時(shí)的網(wǎng)絡(luò)磁盤(pán)掛載配置信息。 |
屬性說(shuō)明:
序號(hào) | 屬性 | 類(lèi)型 | 描述 |
1. | Entries | array | 網(wǎng)絡(luò)磁盤(pán)掛載點(diǎn)信息列表, 由 MountPoint 描述。 |
2. | Locale | str | 掛載 OSS,NAS 存儲(chǔ)時(shí)語(yǔ)言選項(xiàng)。 |
3. | Lock | bool | 掛載 OSS,NAS 存儲(chǔ)時(shí)文件鎖支持選項(xiàng)。 |
4. | NAS | dict | NAS 配置信息。 |
5. | OSS | dict | OSS 配置信息。 |
(11) MountPoint 類(lèi)
參數(shù)說(shuō)明:
參數(shù) | 類(lèi)型 | 描述 |
properties | dict, str, Command object | 網(wǎng)絡(luò)掛載點(diǎn)。 |
屬性說(shuō)明:
序號(hào) | 屬性名稱(chēng) | 類(lèi)型 | 描述 |
1. | Source | str | 網(wǎng)絡(luò)磁盤(pán)掛載來(lái)源路徑,可以是 |
2. | Destination | str | 網(wǎng)絡(luò)磁盤(pán)本地掛載點(diǎn)路徑。 |
3. | WriteSupport | bool | 掛載點(diǎn)是否可寫(xiě)。 |
(12) NAS 類(lèi)
參數(shù)說(shuō)明:
參數(shù) | 類(lèi)型 | 描述 |
properties | dict, str, Command object | NAS 配置信息。 |
屬性說(shuō)明:
序號(hào) | 屬性名稱(chēng) | 類(lèi)型 | 描述 |
1. | AccessGroup | list | 需要將集群實(shí)例加入到的 NAS 訪問(wèn)組。 |
2. | FileSystem | list | 需要訪問(wèn)的文件系統(tǒng)。 |
(13) OSS 類(lèi)
參數(shù)說(shuō)明:
參數(shù) | 類(lèi)型 | 描述 |
properties | dict, str, Command object | OSS 配置信息。 |
屬性說(shuō)明:
序號(hào) | 屬性名稱(chēng) | 類(lèi)型 | 描述 |
1. | AccessKeyId | str | OSS掛載使用的 Access ID。 |
2. | AccessKeySecret | str | OSS掛載使用的 Access Secret。 |
3. | SecurityToken | str | OSS掛載使用的 Security Token。 |