
AttachCluster Best Practices


0 Background

AttachCluster jobs are the newest job type in Batch Compute. They combine the advantages of fixed-cluster jobs and AutoCluster jobs: the cluster life cycle is managed automatically and resources scale elastically, while the distributed cache can still be used to save resources. This document describes how to run AttachCluster jobs on Alibaba Cloud Batch Compute.

1 Usage Limits

  • Custom system disk and data disk sizes are supported when creating the cluster
  • Custom system disks are not supported in jobs

    After the instance system disk size is set to SystemDiskSize in the default cluster, all AttachCluster jobs submitted to that cluster use that SystemDiskSize. When submitting a job, set this field to 0 or leave it empty.

  • Custom data disks are not supported in jobs

    After the instance data disk size is set to DataDiskSize in the default cluster, all AttachCluster jobs submitted to that cluster use that DataDiskSize. When submitting a job, set this field to 0 or leave it empty.

  • The APP job type is not supported; the DAG job type is supported

  • The image specified in the job must be an image whose ID starts with m-, not one starting with img (double-check this when submitting a job; see the sketch after this list)
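
The two disk rules and the image-ID rule above are easy to trip over. The following is a minimal pre-submission check, a sketch only: check_attach_cluster_job is a hypothetical helper written for this document, not part of the batchcompute SDK.

    def check_attach_cluster_job(image_id, system_disk_size=0, data_disk_size=0):
        """Report violations of the AttachCluster job limits listed above."""
        errors = []
        # The job image must be registered with Batch Compute (ID starts with "m-"),
        # not an "img-" prefixed image name.
        if not image_id.startswith("m-"):
            errors.append("image id must start with 'm-', got: %s" % image_id)
        # Disk sizes are inherited from the attached cluster; leave them at 0 or unset.
        if system_disk_size not in (0, None):
            errors.append("system disk size must be 0 or unset in an AttachCluster job")
        if data_disk_size not in (0, None):
            errors.append("data disk size must be 0 or unset in an AttachCluster job")
        return errors

    # Example: reports both an image-prefix problem and a disk-size problem.
    print(check_attach_cluster_job("img-ubuntu", system_disk_size=40))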

2 Preparation

2.1 Enable Alibaba Cloud Batch Compute

To use Batch Compute, follow the official documentation to activate the service and the services it depends on, such as OSS.

2.2 Upgrade the Python SDK

If you have not installed the Batch Compute Python SDK yet, install it by following the installation guide. If it is already installed, follow the Python SDK upgrade guide to upgrade it to the latest version.
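
If you are not sure which version is currently installed, a quick check like the one below can help. This is a sketch that assumes the SDK's distribution name on PyPI is batchcompute.

    import pkg_resources

    try:
        version = pkg_resources.get_distribution("batchcompute").version
        print("batchcompute SDK version:", version)
    except pkg_resources.DistributionNotFound:
        print("batchcompute SDK is not installed")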

3 Create a Cluster

When using AttachCluster jobs for the first time, you need to create a cluster; see the official documentation for the creation steps. The cluster has no special configuration requirements, and its instance count can be set to 0. The Python source code for creating a cluster follows.

    import time
    import random
    import string
    import batchcompute
    from batchcompute import CN_SHENZHEN as REGION
    from batchcompute import Client, ClientError
    from batchcompute.resources import (
        JobDescription, TaskDescription, DAG,
        GroupDescription, ClusterDescription,
        Configs, Networks, VPC, Classic, Mounts, Notification, Topic
    )

    ACCESS_KEY_ID = 'Your Access Key Id'
    ACCESS_KEY_SECRET = 'Your Access Key Secret'
    IMAGE_ID = 'img-ubuntu'
    INSTANCE_TYPE = 'ecs.sn2ne.large'

    client = Client(REGION, ACCESS_KEY_ID, ACCESS_KEY_SECRET)

    def create_cluster(idempotent_token=''):
        try:
            # Cluster description.
            cluster_desc = ClusterDescription()
            cluster_desc.Name = "test-cluster"
            cluster_desc.Description = "demo"
            cluster_desc.ImageId = IMAGE_ID
            cluster_desc.InstanceType = INSTANCE_TYPE
            # Group description.
            group_desc1 = GroupDescription()
            group_desc1.DesiredVMCount = 4
            group_desc1.InstanceType = 'ecs.sn1ne.large'  # group-specific instance type
            group_desc1.ResourceType = 'OnDemand'
            cluster_desc.add_group('group1', group_desc1)
            # cluster_desc.add_group('group2', group_desc2)
            # Configs
            configs = Configs()
            # Configs.Disks
            configs.add_system_disk(50, 'cloud_efficiency')
            configs.add_data_disk(500, 'cloud_efficiency', '/home/my-data-disk')
            # Configs.Networks
            networks = Networks()
            vpc = VPC()
            vpc.CidrBlock = '192.168.0.0/16'
            # vpc.VpcId = 'vpc-xxxxx'
            networks.VPC = vpc
            configs.Networks = networks
            cluster_desc.Configs = configs
            print(cluster_desc)
            rsp = client.create_cluster(cluster_desc, idempotent_token)
            # Return the cluster id; it is needed later for AttachCluster jobs.
            return rsp.Id
        except ClientError as e:
            print(e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
            return ""

    if __name__ == '__main__':
        # No idempotent token is used here.
        cluster_id = create_cluster()
        print(cluster_id)

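After create_cluster returns, the cluster still needs some time to start its instances. The sketch below polls the cluster state before any job is submitted; it reuses the client and the time import from the script above, and it assumes that get_cluster returns an object whose State field becomes 'Active' once the cluster is ready.

    def wait_for_cluster(client, cluster_id, interval=30):
        """Poll the cluster state until it becomes Active."""
        while True:
            state = client.get_cluster(cluster_id).State
            print("cluster %s state: %s" % (cluster_id, state))
            if state == 'Active':
                return
            time.sleep(interval)

    # Example:
    # wait_for_cluster(client, cluster_id)
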
4 Create a Job

When creating the job, you need the cluster Id obtained in section 3; fill it into the ClusterId field of the task's AutoCluster description. The Python source code for creating a job follows.

    from batchcompute import Client, ClientError
    from batchcompute import CN_SHENZHEN as REGION
    from batchcompute.resources import (
        ClusterDescription, GroupDescription, Configs, Networks, VPC,
        JobDescription, TaskDescription, DAG, Mounts,
        AutoCluster, Disks, Notification,
    )

    access_key_id = ""      # your access key id
    access_key_secret = ""  # your access key secret
    image_id = "m-8vbd8lo9xxxx"  # id of an image created before; it must be registered with Batch Compute and start with m-, not img
    instance_type = "ecs.sn1.medium"  # instance type
    inputOssPath = "oss://xxx/input/"        # your input oss path
    outputOssPath = "oss://xxx/output/"      # your output oss path
    stdoutOssPath = "oss://xxx/log/stdout/"  # your stdout oss path
    stderrOssPath = "oss://xxx/log/stderr/"  # your stderr oss path

    def getAutoClusterDesc():
        auto_desc = AutoCluster()
        # For an AttachCluster job, fill in the cluster id created in section 3.
        auto_desc.ClusterId = "cls-xxxxx"
        auto_desc.ImageId = image_id
        auto_desc.ReserveOnFail = False
        # Instance type
        auto_desc.InstanceType = instance_type
        # Case 1: spot instances with a price limit
        # auto_desc.ResourceType = "Spot"
        # auto_desc.SpotStrategy = "SpotWithPriceLimit"
        # auto_desc.SpotPriceLimit = 0.5
        # Case 2: spot instances with automatic bidding, capped at the pay-as-you-go price
        # auto_desc.ResourceType = "Spot"
        # auto_desc.SpotStrategy = "SpotAsPriceGo"
        # Case 3: pay-as-you-go
        auto_desc.ResourceType = "OnDemand"
        # Configs
        configs = Configs()
        # Configs.Networks
        networks = Networks()
        vpc = VPC()
        # Case 1: only a CidrBlock is given
        vpc.CidrBlock = '192.168.0.0/16'
        # Case 2: both CidrBlock and VpcId are given; the VPC's CIDR block must match the CidrBlock passed in
        # vpc.CidrBlock = '172.26.0.0/16'
        # vpc.VpcId = "vpc-8vbfxdyhxxxx"
        networks.VPC = vpc
        configs.Networks = networks
        # Setting a system disk is not supported
        # configs.add_system_disk(size=0, type_='cloud_efficiency')
        # Setting a data disk is not supported
        # Case 1: Linux
        # configs.add_data_disk(size=0, type_='cloud_efficiency', mount_point='/path/to/mount/')
        # Case 2: Windows
        # configs.add_data_disk(size=0, type_='cloud_efficiency', mount_point='E:')
        # Number of instances
        configs.InstanceCount = 1
        auto_desc.Configs = configs
        return auto_desc

    def getDagJobDesc(clusterId=None):
        job_desc = JobDescription()
        dag_desc = DAG()
        mounts_desc = Mounts()
        job_desc.Name = "testBatchSdkJob"
        job_desc.Description = "test job"
        job_desc.Priority = 1
        # Subscribe to job-finished and job-failed events.
        noti_desc = Notification()
        noti_desc.Topic['Name'] = "test-topic"
        noti_desc.Topic['Endpoint'] = "http://[UserId].mns.[Region].aliyuncs.com/"
        noti_desc.Topic['Events'] = ["OnJobFinished", "OnJobFailed"]
        # job_desc.Notification = noti_desc
        job_desc.JobFailOnInstanceFail = False
        # If True, the job is released automatically right after it succeeds.
        job_desc.AutoRelease = False
        job_desc.Type = "DAG"

        echo_task = TaskDescription()
        # echo_task.InputMapping = {"oss://xxx/input/": "/home/test/input/",
        #                           "oss://xxx/test/file": "/home/test/test/file"}
        echo_task.InputMapping = {inputOssPath: "/home/test/input/"}
        echo_task.OutputMapping = {"/home/test/output/": outputOssPath}
        # Command line that starts the program
        # Case 1: run a Linux command line
        echo_task.Parameters.Command.CommandLine = "/bin/bash -c 'echo BatchcomputeService'"
        # Case 2: run Windows cmd.exe
        # echo_task.Parameters.Command.CommandLine = "cmd /c 'echo BatchcomputeService'"
        # Case 3: run an executable file
        # PackagePath holds the executable or binary package referenced by CommandLine
        # echo_task.Parameters.Command.PackagePath = "oss://xxx/package/test.sh"
        # echo_task.Parameters.Command.CommandLine = "sh test.sh"
        # Environment variables available to the program at run time
        echo_task.Parameters.Command.EnvVars["key1"] = "value1"
        echo_task.Parameters.Command.EnvVars["key2"] = "value2"
        # Standard output location: print output is uploaded to this oss address in real time
        echo_task.Parameters.StdoutRedirectPath = stdoutOssPath
        # Standard error location: exceptions and errors are uploaded to this oss address in real time
        echo_task.Parameters.StderrRedirectPath = stderrOssPath
        # Task timeout in seconds
        echo_task.Timeout = 600
        # Number of instances for the task.
        # The environment variable BATCH_COMPUTE_INSTANCE_ID ranges from 0 to InstanceCount-1;
        # read it inside your program to shard the input data and run a single task concurrently.
        echo_task.InstanceCount = 1
        # Number of retries after the task fails
        echo_task.MaxRetryCount = 0

        # NAS data mount
        # When using NAS, the network and the NAS file system must be in the same VPC.
        nasMountEntry = {
            "Source": "nas://xxxx.nas.aliyuncs.com:/",
            "Destination": "/home/mnt/",
            "WriteSupport": True,
        }
        mounts_desc.add_entry(nasMountEntry)
        mounts_desc.Locale = "utf-8"
        mounts_desc.Lock = False
        # echo_task.Mounts = mounts_desc

        # For an AttachCluster job, leave the task's ClusterId field empty.
        echo_task.ClusterId = ""
        echo_task.AutoCluster = getAutoClusterDesc()
        # Add the task
        dag_desc.add_task('echoTask', echo_task)
        # Multiple tasks can be added, each with its own parameters.
        # dag_desc.add_task('echoTask2', echo_task)
        # Dependencies describe the relationships between tasks: echoTask2 depends on echoTask, echoTask3 depends on echoTask2.
        # dag_desc.Dependencies = {"echoTask": ["echoTask2"], "echoTask2": ["echoTask3"]}
        job_desc.DAG = dag_desc
        return job_desc

    if __name__ == "__main__":
        client = Client(REGION, access_key_id, access_key_secret)
        try:
            job_desc = getDagJobDesc()
            job_id = client.create_job(job_desc).Id
            print('job created: %s' % job_id)
        except ClientError as e:
            print(e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())

This completes the creation of an AttachCluster job.
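
After submission, you can optionally track the job until it ends. The sketch below assumes that get_job returns an object whose State field reaches 'Finished', 'Failed', or 'Stopped' when the job terminates, and it reuses the client and job_id from the script above.

    import time

    def wait_for_job(client, job_id, interval=30):
        """Poll the job state until it reaches a terminal state."""
        while True:
            state = client.get_job(job_id).State
            print("job %s state: %s" % (job_id, state))
            if state in ('Finished', 'Failed', 'Stopped'):
                return state
            time.sleep(interval)

    # Example:
    # final_state = wait_for_job(client, job_id)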