Python SDK 快速開始
本文檔將介紹如何使用 Python 版 SDK 來提交一個作業(yè),目的是統(tǒng)計一個日志文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出現(xiàn)的次數(shù)。
作業(yè)準(zhǔn)備
上傳數(shù)據(jù)文件到 OSS
上傳任務(wù)程序到 OSS
使用 SDK 創(chuàng)建(提交)作業(yè)
查看結(jié)果
1. 作業(yè)準(zhǔn)備
本作業(yè)是統(tǒng)計一個日志文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出現(xiàn)的次數(shù)。
該作業(yè)包含3個任務(wù):split, count 和 merge:
split 任務(wù)會把日志文件分成 3 份。
count 任務(wù)會統(tǒng)計每份日志文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出現(xiàn)的次數(shù)(count 任務(wù)需要配置 InstanceCount 為 3,表示同時啟動3臺機(jī)器運行個 count 程序)。
merge 任務(wù)會把 count 任務(wù)的結(jié)果統(tǒng)一合并起來。
DAG圖例:
(1) 上傳數(shù)據(jù)文件到OSS
下載本例子所需的數(shù)據(jù):log-count-data.txt
將 log-count-data.txt 上傳到:oss://your-bucket/log-count/log-count-data.txt
your-bucket 表示您自己創(chuàng)建的 bucket,本例子假設(shè) region 為:cn-shenzhen。
如何上傳到 OSS,請參考OSS文件上傳 。
(2) 上傳任務(wù)程序到OSS
本例的作業(yè)程序是使用 python 編寫的, 下載本例子所需的程序:log-count.tar.gz
本例不需要改動示例代碼。直接將 log-count.tar.gz 上傳到 oss,如上傳到:oss://your-bucket/log-count/log-count.tar.gz
如何上傳前面已經(jīng)講過。
BatchCompute 只支持以 tar.gz 為后綴的壓縮包, 請注意務(wù)必用以上方式(gzip)打包, 否則將會無法解析。
如果您要修改代碼,可以解壓后修改,然后要用下面的方法打包:
命令如下:
> cd log-count #進(jìn)入目錄 > tar -czf log-count.tar.gz * #打包,將所有這個目錄下的文件打包到 log-count.tar.gz
可以運行這條命令查看壓縮包內(nèi)容:
$ tar -tvf log-count.tar.gz
可以看到以下列表:
conf.py count.py merge.py split.py
2. 使用SDK創(chuàng)建(提交)作業(yè)
python SDK 的相關(guān)下載與安裝請參閱 相關(guān)下載與安裝。
v20151111 版本,提交作業(yè)需要指定集群 ID 或者使用匿名集群參數(shù)。本例子使用匿名集群方式進(jìn)行,匿名集群需要配置 2 個參數(shù), 其中:
可用的鏡像 ID, 可以使用系統(tǒng)提供的 Image,也可以自行制作鏡像 。
實例規(guī)格(InstanceType,實例類型),請參考 目前支持類型。
在 OSS 中創(chuàng)建存儲 StdoutRedirectPath(程序輸出結(jié)果)和 StderrRedirectPath(錯誤日志)的文件路徑,本例中創(chuàng)建的路徑為oss://your-bucket/log-count/logs/
如需運行本例,請按照上文所述的變量獲取以及與上文對應(yīng)的您的 OSS 路徑對程序中注釋中的變量進(jìn)行修改。
Python SDK 提交程序模板如下,程序中具體參數(shù)含義請參見 參數(shù)說明。
#encoding=utf-8
import sys
from batchcompute import Client, ClientError
from batchcompute import CN_SHENZHEN as REGION #這里的region根據(jù)實際情況填寫
from batchcompute.resources import (
JobDescription, TaskDescription, DAG, AutoCluster, Configs, Networks, VPC,
)
ACCESS_KEY_ID='' # 填寫您的 AK
ACCESS_KEY_SECRET='' # 填寫您的 AK
IMAGE_ID = 'img-ubuntu' #這里填寫您的鏡像 ID
INSTANCE_TYPE = 'ecs.sn1.medium' # 根據(jù)實際 region 支持的 InstanceType 填寫
WORKER_PATH = '' # 'oss://your-bucket/log-count/log-count.tar.gz' 這里填寫您上傳的 log-count.tar.gz 的 OSS 存儲路徑
LOG_PATH = '' # 'oss://your-bucket/log-count/logs/' 這里填寫您創(chuàng)建的錯誤反饋和 task 輸出的 OSS 存儲路徑
OSS_MOUNT= '' # 'oss://your-bucket/log-count/' 同時掛載到/home/inputs 和 /home/outputs
client = Client(REGION, ACCESS_KEY_ID, ACCESS_KEY_SECRET)
def main():
try:
job_desc = JobDescription()
# Create auto cluster.
cluster = AutoCluster()
cluster.InstanceType = INSTANCE_TYPE
cluster.ResourceType = "OnDemand"
cluster.ImageId = IMAGE_ID
configs = Configs()
networks = Networks()
vpc = VPC()
vpc.CidrBlock = '192.168.0.0/16'
# vpc.VpcId = "vpc-8vbfxdyhxxxx"
networks.VPC = vpc
configs.Networks = networks
# 設(shè)置系統(tǒng)盤type(cloud_efficiency/cloud_ssd)以及size(單位GB)
configs.add_system_disk(size=40, type_='cloud_efficiency')
configs.InstanceCount = 1
cluster.Configs = configs
# Create split task.
split_task = TaskDescription()
split_task.Parameters.Command.CommandLine = "python split.py"
split_task.Parameters.Command.PackagePath = WORKER_PATH
split_task.Parameters.StdoutRedirectPath = LOG_PATH
split_task.Parameters.StderrRedirectPath = LOG_PATH
split_task.InstanceCount = 1
split_task.AutoCluster = cluster
split_task.InputMapping[OSS_MOUNT]='/home/input'
split_task.OutputMapping['/home/output'] = OSS_MOUNT
# Create map task.
count_task = TaskDescription(split_task)
count_task.Parameters.Command.CommandLine = "python count.py"
count_task.InstanceCount = 3
count_task.InputMapping[OSS_MOUNT] = '/home/input'
count_task.OutputMapping['/home/output'] = OSS_MOUNT
# Create merge task
merge_task = TaskDescription(split_task)
merge_task.Parameters.Command.CommandLine = "python merge.py"
merge_task.InstanceCount = 1
merge_task.InputMapping[OSS_MOUNT] = '/home/input'
merge_task.OutputMapping['/home/output'] = OSS_MOUNT
# Create task dag.
task_dag = DAG()
task_dag.add_task(task_name="split", task=split_task)
task_dag.add_task(task_name="count", task=count_task)
task_dag.add_task(task_name="merge", task=merge_task)
task_dag.Dependencies = {
'split': ['count'],
'count': ['merge']
}
# Create job description.
job_desc.DAG = task_dag
job_desc.Priority = 99 # 0-1000
job_desc.Name = "log-count"
job_desc.Description = "PythonSDKDemo"
job_desc.JobFailOnInstanceFail = True
job_id = client.create_job(job_desc).Id
print('job created: %s' % job_id)
except ClientError, e:
print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
if __name__ == '__main__':
sys.exit(main())
3. 查看作業(yè)狀態(tài)
您可以用 SDK 中的 獲取作業(yè)信息 方法獲取作業(yè)狀態(tài):
jobInfo = client.get_job(job_id)
print (jobInfo.State)
State 狀態(tài)可能為:Waiting, Running, Finished, Failed, Stopped。
4. 查看結(jié)果
您可以登錄 OSS 控制臺 查看 your-bucket 下面的這個文件:/log-count/merge_result.json。
內(nèi)容應(yīng)該如下:
{"INFO": 2460, "WARN": 2448, "DEBUG": 2509, "ERROR": 2583}
您也可以使用OSS SDK 來獲取結(jié)果。