本文為您介紹如何通過阿里云E-MapReduce(簡稱EMR)控制臺,快速創建EMR Studio集群并開展交互式開發和工作流調度工作。

背景信息

如果您想了解更多Zeppelin、Jupyter和Airflow的信息,請參見以下內容:

前提條件

  • 已申請體驗EMR Studio的資格。
  • 已創建EMR Studio集群,詳情請參見創建EMR Studio集群
    說明 如果您是第一次創建EMR Studio集群,在創建EMR Studio集群時,控制臺會彈出系統角色授權窗口,請您使用主賬號對系統角色AliyunECSInstanceForEMRStudioRole進行授權。角色授權詳情請參見角色授權
  • 安全組規則已開啟8000、8081和8443端口。

    添加安全組規則,詳情請參見添加安全組規則

  • 已添加用戶,詳情請參見添加用戶

使用限制

EMR Studio僅支持與同一VPC下的EMR計算集群進行關聯。

操作流程

  1. 步驟一:進入數據開發工作臺
  2. 步驟二:關聯計算集群
  3. 步驟三:使用Zeppelin交互式開發作業
  4. 步驟四:編寫Airflow Python腳本
  5. 步驟五:使用Airflow調度Notebook作業

步驟一:進入數據開發工作臺

  1. 進入詳情頁面。
    1. 登錄阿里云E-MapReduce控制臺
    2. 在頂部菜單欄處,根據實際情況選擇地域和資源組
    3. 單擊上方的集群管理頁簽。
    4. 集群管理頁面,單擊相應集群所在行的詳情
  2. 添加Linux用戶。
    您可以通過用戶管理功能添加用戶,詳情請參見添加用戶
  3. 在左側導航欄中,選擇訪問鏈接與端口
  4. 單擊Studio Workspace UI所在行的鏈接。
    輸入步驟2中添加的用戶名和密碼,即可正常訪問Web UI頁面。
    說明 請稍等幾秒即可看到Studio Workspace UI的信息。

步驟二:關聯計算集群

重要 EMR Studio僅支持與同一VPC下的EMR計算集群進行關聯。

關聯集群后,EMR Studio集群可以訪問對應EMR集群上的資源并提交Job。

  1. 關聯集群頁面,單擊可關聯集群頁簽。
    說明 僅顯示同一個VPC下的EMR集群。支持關聯Hadoop集群、Dataflow集群和DataScience集群三種集群類型。
  2. 選擇待關聯集群的集群類型。
  3. 單擊待關聯集群操作列的關聯集群
  4. 關聯集群對話框中,單擊綁定
    已關聯集群頁簽,顯示關聯的集群信息時,表示關聯成功。
    說明 綁定集群過程大約需要1~2分鐘,請您耐心等待。

步驟三:使用Zeppelin交互式開發作業

關聯集群后,您可以在Zeppelin Notebook中進行交互式開發。EMR數據開發集群自帶教程,本文以Airflow調度教程1為例介紹。

  1. 在左側導航欄中,單擊Zeppelin
  2. Zeppelin頁面,選擇阿里云EMR數據開發教程 > Airflow教程 > Airflow調度教程1
    頁面展示如下圖所示。Airflow1
    區域 描述
    Markdown語言(以%md開頭),主要是為了添加一些說明文字。
    Shell腳本語言(以%sh開頭),您可以運行Shell腳本,生成數據。
    Hive(以%hive開頭),您可以執行任意的Hive SQL語句。
    單擊drop-down圖標,可以切換關聯的集群。
    單擊run圖標,運行當前段落。
    說明 部分解釋器(Interpreter)在第一次使用的時候會去下載依賴,如果提示信息類似Interpreter Setting 'hive' is not ready,說明下載過程還沒結束,請稍后重試。

步驟四:編寫Airflow Python腳本

Airflow的調度需要手動編寫Python腳本來構建DAG,EMR Studio自動將指定OSS路徑內的Python腳本同步至Airflow DAGs,因此,您可以在編輯和上傳完DAG腳本之后,進入數據開發工作臺,在左側導航欄中,單擊Airflow,即可進入DAGs頁面查看您創建的DAG。

EMR Studio自帶調度教程,您可以在Zeppelin頁面,選擇阿里云EMR數據開發教程 > Airflow教程 > Airflow調度教程1查看。Airflow的基本用法,請參見Apache AirflowAirflow dag
說明 EMR Studio自帶用于調度Zeppelin Notebook的Operator(ZeppelinOperator),并提供Note和Paragraph兩種級別調度方式。更多信息,請參見定期調度Zeppelin中的作業
本文以Paragraph級別調度為例,為每個Paragraph構建一個Task,然后串聯起來構成一個鏈式DAG。代碼示例如下。
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.zeppelin.operators.zeppelin_operator import ZeppelinOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}


with DAG('zeppelin_example_paragraph_dag',
         max_active_runs=5,
         schedule_interval='0 0 * * *',
         default_args=default_args) as dag:

    execution_date = "{{ ds }}"

    raw_data_task = ZeppelinOperator(
        task_id='raw_data_task',
        conn_id='zeppelin_default',
        note_id='2FXADMCY9',
        paragraph_id='paragraph_1613703810025_1489985581',
        params= {'dt' : execution_date}
    )

    csv_data_task = ZeppelinOperator(
        task_id='csv_data_task',
        conn_id='zeppelin_default',
        note_id='2FXADMCY9',
        paragraph_id='paragraph_1613705325984_828524031',
        params= {'dt' : execution_date}
    )

    parquet_data_task = ZeppelinOperator(
        task_id='parquet_data_task',
        conn_id='zeppelin_default',
        note_id='2FXADMCY9',
        paragraph_id='paragraph_1613709950691_1420422787',
        params= {'dt' : execution_date}
    )

    query_task = ZeppelinOperator(
        task_id='query_task',
        conn_id='zeppelin_default',
        note_id='2FXADMCY9',
        paragraph_id='paragraph_1613703819960_454383326',
        params= {'dt' : execution_date}
    )

    raw_data_task >> csv_data_task >> parquet_data_task >> query_task
 

步驟五:使用Airflow調度Notebook作業

  • 啟用DAG調度
    DAGs頁面,打開off開關,即可啟用DAG調度。DAG
  • 查看調度任務的詳細信息

    DAGs頁面,單擊待查看調度任務的DAG,進入Tree View頁面。

    Tree View頁面,您可以查看對應DAG運行調度任務的詳細信息。DAG_info
  • Task的運行情況

    Tree View頁面,單擊task圖標,即可查看對應Task的運行情況。

    詳細信息如下圖所示。task_log