1 背景
由于歷史原因,提交了大量的批量計算 DAG 作業,但是作業沒有及時清理;或者單次提交了大量的作業,需要做清理操作。目前批量計算控制臺支持單頁批量刪除,但若存在幾十頁作業的情況,清理動作也比較麻煩。因此提供了一個批量清理批量計算作業的工具。
2 工具
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing
import threading
import sys
PY2 = sys.version_info[0] == 2
if PY2:
import Queue as bcQueue
else:
import queue as bcQueue
import json
import time
from batchcompute import Client, ClientError
import argparse
from functools import wraps
def retryWrapper(func):
    """Decorator: retry *func* with exponential backoff on transient errors.

    Retry policy:
      * ``ClientError`` with a 4xx status is a caller error and is
        re-raised immediately (no retry can help).
      * Any other failure (5xx ``ClientError``, unexpected status codes,
        network errors, ...) is retried with a ``0.5 * 2**attempt`` second
        backoff, up to 7 attempts in total.

    Raises:
        Exception: wrapping the last error message, either on a
        non-retryable 4xx status or once retries are exhausted.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        attempt = 0
        while True:
            try:
                return func(*args, **kwargs)
            except ClientError as e:
                status = e.get_status_code()
                # 4xx: the request itself is wrong -- do not retry.
                if 400 <= status < 500:
                    raise Exception(str(e))
                # 5xx and any other status: back off and retry.  (The
                # original fell through for statuses outside 400-599 and
                # busy-looped with no sleep and no attempt counter.)
                if attempt > 6:
                    raise Exception(str(e))
                time.sleep(0.5 * pow(2, attempt))
                attempt += 1
            except Exception as e:
                if attempt > 6:
                    raise Exception(str(e))
                time.sleep(0.5 * pow(2, attempt))
                attempt += 1
    return wrapper
class ShowProcess():
    """Console progress bar.

    Renders a bar such as
    ``[>>>>>>>>>>----------------------------------------]20.00%``
    in place (trailing ``\\r``, no newline) and prints *infoDone*
    once the step counter reaches *max_steps*.
    """
    i = 0           # current step count
    max_steps = 0   # total number of steps expected
    max_arrow = 50  # bar width in characters
    infoDone = 'done'

    def __init__(self, max_steps, infoDone='Done'):
        # max_steps: total number of items to process.
        # infoDone: message printed when the bar completes.
        self.max_steps = max_steps
        self.i = 0
        self.infoDone = infoDone

    def show_process(self, i=None):
        """Advance (or jump) the counter and redraw the bar.

        If *i* is given the counter is set to it, otherwise it is
        incremented by one.  Safe when ``max_steps`` is 0 (the original
        divided by ``max_steps`` and raised ZeroDivisionError); an empty
        workload is rendered as 100% complete.
        """
        if i is not None:
            self.i = i
        else:
            self.i += 1
        if self.max_steps <= 0:
            # Nothing to do: show a full bar instead of dividing by zero.
            num_arrow = self.max_arrow
            percent = 100.0
        else:
            num_arrow = int(self.i * self.max_arrow / self.max_steps)
            percent = self.i * 100.0 / self.max_steps
        num_line = self.max_arrow - num_arrow
        process_bar = ('[' + '>' * num_arrow + '-' * num_line + ']'
                       + '%.2f' % percent + '%' + '\r')
        sys.stdout.write(process_bar)
        sys.stdout.flush()
        if self.i >= self.max_steps:
            self.close()

    def close(self):
        """Finish the bar: newline, completion message, reset counter."""
        print('')
        print(self.infoDone)
        self.i = 0
class DeleteJob(threading.Thread):
    """Worker thread that drains a queue of job ids, deleting each one.

    Failures are reported through *error_queue* rather than raised, so a
    single bad job never stops the drain.
    """

    def __init__(self, client, jobQueue, error_queue):
        threading.Thread.__init__(self)
        self.jobQueue = jobQueue
        self.error_queue = error_queue
        self.client = client

    @retryWrapper
    def delteSpecJob(self, jobId):
        # Single delete call; retryWrapper supplies the backoff policy.
        self.client.delete_job(jobId)

    def run(self):
        # Non-blocking gets: the thread exits as soon as the queue is empty.
        while True:
            try:
                jobId = self.jobQueue.get(block=False)
            except bcQueue.Empty:
                return
            try:
                self.delteSpecJob(jobId)
            except Exception as e:
                self.error_queue.put('Delte Job(%s) exception %s' % (jobId, str(e)))
            finally:
                # Mark the item done whether or not the delete succeeded,
                # so jobQueue.join() in the caller can complete.
                self.jobQueue.task_done()
class DeleteJobs:
    """List finished BatchCompute jobs and delete them with worker threads.

    Jobs in the "Running" or "Waiting" state are never deleted; other
    jobs are deleted only when they are at least ``days`` days old
    (``days == 0`` deletes regardless of age).
    """
    def __init__(self, client, days):
        # client: batchcompute.Client used for the list/delete API calls.
        # days: minimum age in days for a job to be deleted (0 = any age).
        self.client = client
        self.days = days
    @retryWrapper
    def listSpecIdxJob(self, marker, max_item):
        # Fetch one page of the job listing; retryWrapper adds backoff.
        response = self.client.list_jobs(marker, max_item)
        return response
    def listAndDeleteJobs(self):
        """Collect deletable job ids and delete them concurrently.

        Returns a list of error strings, one per job whose deletion
        failed (empty list when everything succeeded).
        """
        marker = ""
        max_item = 100
        index = 0
        jobQueue = bcQueue.Queue(0)
        error_queue = bcQueue.Queue(0)
        print("Begin List Spec Jobs...")
        # Page through the listing: the service returns an empty
        # NextMarker on the last page; "index == 0" lets the first
        # request run with the empty initial marker.
        while marker or index == 0:
            response = self.listSpecIdxJob(marker, max_item)
            marker = response.NextMarker
            for job in response.Items:
                # Never touch jobs that are still active.
                if job.State == "Running" or job.State == "Waiting":
                    continue
                if self.days == 0:
                    jobQueue.put(job.Id)
                else:
                    # Convert the job's creation time to a Unix timestamp
                    # and enqueue only jobs older than the cutoff.
                    createTime = job.CreationTime.strftime("%Y-%m-%d %H:%M:%S.%f")
                    timestap = int(time.mktime(time.strptime(createTime, "%Y-%m-%d %H:%M:%S.%f")))
                    detaTime = self.days * 3600 * 24
                    if int(time.time()) - timestap >= detaTime:
                        jobQueue.put(job.Id)
                # print "jobId: %s" % job.Id
            index += 1
        print("List Spec Jobs Finish...")
        # One worker per CPU core, but never more workers than jobs.
        threadnum = multiprocessing.cpu_count()
        size = jobQueue.qsize()
        if size < threadnum:
            threadnum = size
        thread_pool = []
        for threadnum in range(threadnum):
            current = DeleteJob(self.client, jobQueue, error_queue)
            thread_pool.append(current)
            current.start()
        print("Begin Delete Jobs...")
        # Poll the queue size to drive the progress bar while the
        # worker threads drain it.
        process_bar = ShowProcess(size, 'OK')
        totalSize = size
        while size != 0:
            size = jobQueue.qsize()
            process_bar.show_process(totalSize - size)
            time.sleep(0.5)
        # Wait until every dequeued item has been marked task_done,
        # then wait for the workers themselves to exit.
        jobQueue.join()
        for thread in thread_pool:
            thread.join()
        # Drain the error queue into a plain list for the caller.
        errs = []
        while True:
            try:
                error = error_queue.get(block=False)
                errs.append(error)
            except bcQueue.Empty:
                break
        return errs
if __name__ == "__main__":
parser = argparse.ArgumentParser(
formatter_class = argparse.ArgumentDefaultsHelpFormatter,
description = 'python scripyt for Delete Batchcompute jobs',
usage='deleteJobs.py <positional argument> [<args>]',
)
region_surpport = ["cn-beijing", "cn-zhangjiakou", "cn-hangzhou", "cn-huhehaote", "cn-shanghai", "cn-shenzhen"]
parser.add_argument('--region', action='store', type = str, choices=region_surpport, help = 'the region info.')
parser.add_argument('--id', action='store', type = str, required=True, help = 'keyid.')
parser.add_argument('--key', action='store', type = str, required=True, help = 'key secrete.')
parser.add_argument('--day', action='store', type=int, required=True, help = 'delete the jobs which creating time before the spec day.')
args = parser.parse_args()
BatchHost = "batchcompute.%s.aliyuncs.com" % args.region
client = Client(BatchHost, args.id, args.key)
deleteJobs = DeleteJobs(client, args.day)
err = deleteJobs.listAndDeleteJobs()
if len(err) != 0 :
a = ""
print("\n".join(err))
else:
print("delete Jobs success!!!")
3 執行刪除動作
3.1 準備工作
復制腳本到執行機器,執行機器必須安裝批量計算的 SDK:
pip install batchcompute
若執行機器已經安裝批量計算的 SDK ,則建議更新到最新版本。
pip install --upgrade batchcompute
準備好阿里云 AK 信息,且該 AK 已經開通批量計算相關權限。
執行機器安裝好 Python,建議 Python 2.7及以上。
3.2 執行腳本
python delete.py --region cn-beijing --id xxxx --key xxx --day 10
id 表示 AK的 ID 信息
key 表示 AK的 KEY 信息
day 表示保留最近 N 天的意思
以上的示例表示:刪除北京 region 10天前創建的非等待和運行狀態的作業。
3.3 執行結果
文檔內容是否對您有幫助?