日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Apache Airflow調(diào)度MaxCompute

更新時(shí)間:

MaxCompute支持您使用Apache Airflow通過(guò)Python接口實(shí)現(xiàn)作業(yè)調(diào)度。本文為您介紹如何使用Apache Airflow的Python Operator調(diào)度MaxCompute作業(yè)。

背景信息

Apache Airflow是Airbnb開源的、基于Python編寫的調(diào)度工具,基于有向無(wú)環(huán)圖(DAG),可以定義一組有依賴的作業(yè),并按照依賴順序依次執(zhí)行作業(yè)。還支持通過(guò)Python定義子作業(yè),并支持各種Operators操作器,靈活性大,能滿足用戶的各種需求。更多Apache Airflow信息,請(qǐng)參見Apache Airflow。

前提條件

在執(zhí)行操作前,請(qǐng)確認(rèn)您已滿足如下條件:

  • 已安裝PyODPS。

    更多安裝PyODPS操作,請(qǐng)參見安裝PyODPS。

  • 已安裝并啟動(dòng)Apache Airflow。

    更多安裝及啟動(dòng)Apache Airflow操作,請(qǐng)參見Apache Airflow快速入門。

    本文中的Apache Airflow示例版本為1.10.7。

步驟一:在Apache Airflow家目錄編寫調(diào)度Python腳本

編寫作業(yè)調(diào)度Python腳本并保存為.py文件,腳本文件中會(huì)呈現(xiàn)完整的調(diào)度邏輯及對(duì)應(yīng)的調(diào)度作業(yè)名稱。假設(shè)Python腳本名稱為Airiflow_MC.py,腳本內(nèi)容示例如下:

# -*- coding: UTF-8 -*-
import sys
import os
from odps import ODPS
from odps import options
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from configparser import ConfigParser
import time
reload(sys)
sys.setdefaultencoding('utf8')
#修改系統(tǒng)默認(rèn)編碼
#MaxCompute參數(shù)設(shè)置
options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}
cfg = ConfigParser()
cfg.read("odps.ini")
print(cfg.items())
# 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環(huán)境變量設(shè)置為用戶 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 環(huán)境變量設(shè)置為用戶 Access Key Secret,
# 不建議直接使用 Access Key ID / Access Key Secret 字符串
odps = ODPS(cfg.get("odps",os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')),cfg.get("odps",os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')),cfg.get("odps","project"),cfg.get("odps","endpoint"))
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retry_delay': timedelta(minutes=5),
    'start_date':datetime(2020,1,15)
    # 'email': ['airflow@example.com'],
    # 'email_on_failure': False,
    # 'email_on_retry': False,
    # 'retries': 1,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
#調(diào)度流程
dag = DAG(
    'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))
def read_sql(sqlfile):
    with io.open(sqlfile, encoding='utf-8', mode='r') as f:
        sql=f.read()
    f.closed
    return sql
#調(diào)度作業(yè)
def get_time():
    print '當(dāng)前時(shí)間是{}'.format(time.time())
    return time.time()
#調(diào)度作業(yè)
def mc_job ():

    project = odps.get_project()  # 取到默認(rèn)項(xiàng)目。
    instance=odps.run_sql("select * from long_chinese;")
    print(instance.get_logview_address())
    instance.wait_for_success()
    with instance.open_reader() as reader:
        count = reader.count
    print("查詢表數(shù)據(jù)條數(shù):{}".format(count))
    for record in reader:
        print record
    return count
t1 = PythonOperator (
    task_id = 'get_time' ,
    provide_context = False ,
    python_callable = get_time,
    dag = dag )

t2 = PythonOperator (
    task_id = 'mc_job' ,
    provide_context = False ,
    python_callable = mc_job ,
    dag = dag )
t2.set_upstream(t1)

步驟二:提交調(diào)度腳本

  1. 在系統(tǒng)的命令行窗口執(zhí)行如下命令提交步驟一中編寫的調(diào)度作業(yè)Python腳本。

    python Airiflow_MC.py
  2. 在系統(tǒng)的命令行窗口執(zhí)行如下命令生成調(diào)度流程并測(cè)試調(diào)度作業(yè)。

    # print the list of active DAGs
    airflow list_dags
    
    # prints the list of tasks the "tutorial" dag_id
    airflow list_tasks Airiflow_MC
    
    # prints the hierarchy of tasks in the tutorial DAG
    airflow list_tasks Airiflow_MC --tree
    #測(cè)試task
    airflow test Airiflow_MC get_time 2010-01-16
    airflow test Airiflow_MC mc_job 2010-01-16

步驟三:運(yùn)行調(diào)度作業(yè)

您可以登錄Apache Airflow的Web界面,在DAGs頁(yè)簽,查找到提交的調(diào)度流程,單擊Links列的運(yùn)行圖標(biāo)即可運(yùn)行調(diào)度作業(yè)。

運(yùn)行調(diào)度作業(yè)

步驟四:查看調(diào)度作業(yè)運(yùn)行結(jié)果

您也可以單擊調(diào)度作業(yè)名稱,在Graph View頁(yè)簽查看到調(diào)度作業(yè)流程。單擊調(diào)度流程中的某個(gè)作業(yè),例如mc_job,您可以在mc_job對(duì)話框,單擊View Log,即可查看運(yùn)行結(jié)果。

調(diào)度流程