日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Flink批處理快速入門

作為流批一體的計算框架,Flink不僅能夠提供低延遲的流式數據處理(Streaming Data Processsing),也能進行高吞吐的批處理(Batch Data Processing)。實時計算Flink版對批處理能力進行了專門的支持,提供了包括作業開發、作業運維、作業編排、資源隊列管理、數據結果探查等能力,可以利用Flink批處理能力更好地解決業務需求。本文通過具體的示例為您介紹如何利用實時計算Flink版關鍵功能進行數據批處理。

功能介紹

實時計算Flink版提供了以下關鍵功能來支持Flink批處理:

  • SQL作業開發:在SQL開發頁面的作業草稿頁簽,可以創建批作業草稿,批作業草稿會以批作業的形式被部署和執行。

  • 作業管理:在作業運維頁面,可以直接部署JAR或Python類型的批作業。在頂部下拉框中選擇批作業,查看已部署的批作業。展開目標批作業,可查看其作業實例列表。通常,一個批作業的不同作業實例具有相同的處理邏輯,但是采用不同的參數,例如處理的數據所屬日期。

  • 查詢腳本:在SQL開發頁面的查詢腳本頁簽,可以執行一些DDL或短查詢,快速地進行數據管理和數據探查。這些短查詢執行在預創建的Flink Session中,通過資源復用,實現低延遲的簡單查詢。

  • 管理元數據:在元數據管理頁面,可以創建和查看Catalog,包括其中的數據庫和表的信息。您也可以在SQL開發頁面的元數據頁簽進行查看,提高開發效率。

  • 任務編排(公測):在任務編排頁面,可以定義工作流,通過可視化的操作方式,編排一系列批作業的執行依賴。工作流會作為一個整體,根據定義好的依賴關系執行包含的批作業。支持通過手動觸發或定時調度方式來執行創建好的工作流。

  • 管理資源隊列:在隊列管理頁面,可以對工作空間中的資源進行劃分,從而避免流作業和批作業、以及不同優先級的作業間發生資源爭搶。

注意事項

  • 已創建Flink工作空間,詳情請參見開通實時計算Flink版

  • 已開通對象存儲OSS,詳情請參見控制臺快速入門。OSS Bucket的存儲類型需要為標準存儲,詳情請參見存儲類型概述。

  • 由于本文示例使用Apache Paimon存儲數據,僅實時計算引擎VVR 8.0.5及以上版本支持本文示例。

示例場景

本文以一個電子商務平臺的業務場景為例,使用Apache Paimon的湖倉格式對數據進行存儲。模擬了一個數據倉庫結構,包括ODS(操作數據存儲)、DWD(數據倉庫細節級)、DWS(數據倉庫匯總級)的存儲層級。通過Flink的批處理能力,對數據進行加工清洗后寫入Paimon表,從而實現數據分層結構的搭建。

image

準備工作

  1. 創建查詢腳本。

    通過查詢腳本頁簽,您可以創建Catalog以及其中的數據庫和表,并且向表中插入一些模擬的數據。

  2. 創建Paimon Catalog。

    1. 查詢腳本的文本編輯區域,輸入如下SQL語句。

      CREATE CATALOG `my_catalog` WITH (
        'type' = 'paimon',
        'metastore' = 'filesystem',
        'warehouse' = '<warehouse>',
        'fs.oss.endpoint' = '<fs.oss.endpoint>',
        'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
        'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
      );

      參數配置項如下。

      配置項

      說明

      是否必填

      備注

      type

      Catalog類型。

      固定值為Paimon。

      metastore

      元數據存儲類型。

      本文示例填寫filesystem,其他類型詳情請參見管理Paimon Catalog

      warehouse

      OSS服務中所指定的數倉目錄。

      格式為oss://<bucket>/<object>。其中:

      • bucket:表示您創建的OSS Bucket名稱。

      • object:表示您存放數據的路徑。

      請在OSS管理控制臺上查看您的Bucket和Object名稱。

      fs.oss.endpoint

      OSS服務的連接地址。

      當warehouse指定的OSS Bucket與Flink工作空間不在同一地域,或使用其它賬號下的OSS bucket時需要填寫。

      請參見訪問域名和數據中心。

      fs.oss.accessKeyId

      擁有讀寫OSS權限的阿里云賬號或RAM賬號的AccessKey。

      當warehouse指定的OSS Bucket與Flink工作空間不在同一地域,或使用其它賬號下的OSS Bucket時需要填寫。獲取方法請參見創建AccessKey。

      fs.oss.accessKeySecret

      擁有讀寫OSS權限的阿里云賬號或RAM賬號的AccessKey Secret。

    2. 選中上述代碼,單擊左側的運行

      返回The following statement has been executed successfully!信息表示Catalog創建成功。此時可以在元數據管理頁面(或是SQL開發頁面的元數據子頁面),查看新創建的Catalog。

      image.png

操作流程

步驟一:創建ODS表并插入測試數據

說明

為了簡化本示例,我們直接向ODS表中插入了一些測試數據,用于后續的DWD/DWS表的數據生成。在實際生產中,一般會使用Flink流處理從外部數據源讀取數據并寫入到湖中作為ODS層,具體可以參見 Paimon快速開始:基本功能。

  1. 查詢腳本文本編輯區域,輸入如下SQL語句并單擊左側的運行。

    CREATE DATABASE `my_catalog`.`order_dw`;
    
    USE `my_catalog`.`order_dw`;
    
    CREATE TABLE orders (
      order_id BIGINT,
      user_id STRING,
      shop_id BIGINT,
      product_id BIGINT,
      buy_fee BIGINT,   
      create_time TIMESTAMP,
      update_time TIMESTAMP,
      state INT
    );
    
    CREATE TABLE orders_pay (
      pay_id BIGINT,
      order_id BIGINT,
      pay_platform INT, 
      create_time TIMESTAMP
    );
    
    CREATE TABLE product_catalog (
      product_id BIGINT,
      catalog_name STRING
    );
    
    -- 插入測試數據
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000, TO_TIMESTAMP('2023-02-15 16:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100002, 'user_002', 12346, 2, 4000, TO_TIMESTAMP('2023-02-15 15:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100003, 'user_003', 12347, 3, 3000, TO_TIMESTAMP('2023-02-15 14:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100004, 'user_001', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 13:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100005, 'user_002', 12348, 5, 1000, TO_TIMESTAMP('2023-02-15 12:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100006, 'user_001', 12348, 1, 1000, TO_TIMESTAMP('2023-02-15 11:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100007, 'user_003', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 10:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, TO_TIMESTAMP('2023-02-15 17:40:56')),
    (2002, 100002, 1, TO_TIMESTAMP('2023-02-15 17:40:56')),
    (2003, 100003, 0, TO_TIMESTAMP('2023-02-15 17:40:56')),
    (2004, 100004, 0, TO_TIMESTAMP('2023-02-15 17:40:56')),
    (2005, 100005, 0, TO_TIMESTAMP('2023-02-15 18:40:56')),
    (2006, 100006, 0, TO_TIMESTAMP('2023-02-15 18:40:56')),
    (2007, 100007, 0, TO_TIMESTAMP('2023-02-15 18:40:56'));
    
    INSERT INTO product_catalog VALUES
      (1, 'phone_aaa'),
      (2, 'phone_bbb'),
      (3, 'phone_ccc'),
      (4, 'phone_ddd'),
      (5, 'phone_eee');
    說明

    本文創建的是不帶主鍵的Paimon Append Only表,其相比于Paimon主鍵表具有更好的批量寫入性能,但不支持基于主鍵的更新操作。

    執行結果會包含多個子標簽頁,返回The following statement has been executed successfully!信息表示對應的DDL語句執行成功。

    INSERT等DML語句則會返回一個JobId,表明生成了Flink作業并在Flink Session中執行,單擊結果欄左側的在Flink UI中查看,可觀察到這幾條SQL語句的執行情況,等待數秒至其執行完成。

  2. 探查ODS表數據。

    查詢腳本文本編輯區域,輸入如下SQL語句并單擊左側的運行

    SELECT count(*) as order_count FROM `my_catalog`.`order_dw`.`orders`;
    SELECT count(*) as pay_count FROM `my_catalog`.`order_dw`.`orders_pay`;
    SELECT * FROM `my_catalog`.`order_dw`.`product_catalog`;

    這些SQL語句也會在Flink Session中執行,最終可以在3個查詢的結果頁面中查看返回結果。

    image.png image.png image.png

步驟二:創建DWD和DWS表

查詢腳本文本編輯區域,輸入如下SQL語句并單擊左側的運行。

USE `my_catalog`.`order_dw`;

CREATE TABLE dwd_orders (
    order_id BIGINT,
    order_user_id STRING,
    order_shop_id BIGINT,
    order_product_id BIGINT,
    order_product_catalog_name STRING,
    order_fee BIGINT,
    order_create_time TIMESTAMP,
    order_update_time TIMESTAMP,
    order_state INT,
    pay_id BIGINT,
    pay_platform INT COMMENT 'platform 0: phone, 1: pc',
    pay_create_time TIMESTAMP
) WITH (
    'sink.parallelism' = '2'
);

CREATE TABLE dws_users (
    user_id STRING,
    ds STRING,
    total_fee BIGINT COMMENT '當日完成支付的總金額'
) WITH (
    'sink.parallelism' = '2'
);

CREATE TABLE dws_shops (
    shop_id BIGINT,
    ds STRING,
    total_fee BIGINT COMMENT '當日完成支付總金額'
) WITH (
    'sink.parallelism' = '2'
);
說明

此處創建的仍然是Paimon Append Only表。Paimon表作為Flink Sink不支持自動并發推導,需要顯式設置其并發度,否則可能會報錯。

步驟三:創建與部署DWD和DWS作業

  1. 創建和部署DWD作業。

    1. 創建DWD表更新作業。

      數據開發 > ETL頁面新建空白的批作業草稿,命名為dwd_orders,將如下SQL語句復制到文本編輯區域中。由于DWD表是Paimon Append Only表,因此此處使用INSERT OVERWRITE語句進行整體的覆寫。

      INSERT OVERWRITE my_catalog.order_dw.dwd_orders
      SELECT 
          o.order_id,
          o.user_id,
          o.shop_id,
          o.product_id,
          c.catalog_name,
          o.buy_fee,
          o.create_time,
          o.update_time,
          o.state,
          p.pay_id,
          p.pay_platform,
          p.create_time
      FROM 
          my_catalog.order_dw.orders as o, 
          my_catalog.order_dw.product_catalog as c, 
          my_catalog.order_dw.orders_pay as p
      WHERE o.product_id = c.product_id AND o.order_id = p.order_id
    2. 單擊頁面右上方的部署,單擊確定,部署dwd_orders作業。

  2. 創建和部署DWS作業。

    1. 創建DWS表更新作業。

      數據開發 > ETL頁面新建兩個空白的批作業草稿,分別命名為dws_shops和dws_users,將下列SQL語句分別復制到對應草稿的文本編輯區域中。

      INSERT OVERWRITE my_catalog.order_dw.dws_shops
      SELECT 
          order_shop_id,
          DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds,
          SUM(order_fee) as total_fee
      FROM my_catalog.order_dw.dwd_orders
      WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL
      GROUP BY order_shop_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');
      INSERT OVERWRITE my_catalog.order_dw.dws_users
      SELECT 
          order_user_id,
          DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds,
          SUM(order_fee) as total_fee
      FROM my_catalog.order_dw.dwd_orders
      WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL
      GROUP BY order_user_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');
    2. 單擊頁面右上方的部署,單擊確定,部署dws_shops和dws_users作業。

步驟四:啟動與查看DWD和DWS作業

  • 啟動與查看DWD作業數據。

    1. 運維中心 > 作業運維頁面,在下拉框中選擇批作業,單擊dwd_orders作業操作列下的啟動。

      對應批作業實例列表中,生成了一個啟動中的批作業實例,如下圖所示。

      image.png

      當該作業實例的狀態變為已完成時,表示數據處理完畢。

    2. 探查數據結果。

      查詢腳本文本編輯區域,輸入如下SQL語句并單擊左側的運行,查詢DWD表的數據。

      SELECT * FROM `my_catalog`.`order_dw`.`dwd_orders`;

      結果如下所示。

      image

  • 啟動與查看DWS作業數據。

    1. 運維中心 > 作業運維頁面,在下拉框中選擇批作業,單擊dws_shops和dws_users作業操作列下的啟動。

    2. 查詢腳本文本編輯區域,輸入如下SQL語句并單擊左側的運行,查詢DWS表的數據。

      SELECT * FROM `my_catalog`.`order_dw`.`dws_shops`;
      SELECT * FROM `my_catalog`.`order_dw`.`dws_users`;

      結果如下所示。

      image.png image.png

步驟五:通過作業編排構建批處理鏈路

本部分將把前面創建的作業編排成一個工作流,使得它們可以被統一的觸發并有序的執行。

  1. 創建工作流。

    1. 單擊左側的運維中心 > 任務編排,單擊創建工作流。

    2. 在彈出的面板中,填入名稱wf_orders,調度類型保持不變(默認為手動觸發),資源隊列選擇default-queue后,單擊創建,進入工作流編輯頁面。

    3. 編輯工作流。

      1. 單擊初始的節點,命名為v_dwd_orders,選取其作業為dwd_orders。

      2. 單擊添加節點,創建節點v_dws_shops,選取其作業為dws_shops,上游節點為v_dwd_orders。

      3. 再次單擊添加節點,創建節點v_dws_users,選取其作業為dws_users,上游節點為v_dwd_orders。

      4. 單擊右上角的保存確定。

        創建的工作流如下所示。

        image.png

  2. 手動觸發工作流運行

    說明

    工作流也可以被修改為定時調度的工作流,只需要在任務編排頁面,單擊工作流右側的編輯工作流,將調度模式修改為周期調度即可,詳情請參見任務編排(公測)。

    1. 在觸發工作流運行前,先給ODS表插入一些新數據,用于驗證工作流的執行結果。

      查詢腳本文本編輯區域,輸入如下SQL語句并單擊左側的運行。

      USE `my_catalog`.`order_dw`;
      
      INSERT INTO orders VALUES
      (100008, 'user_001', 12346, 1, 10000, TO_TIMESTAMP('2023-02-15 17:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
      (100009, 'user_002', 12347, 2, 20000, TO_TIMESTAMP('2023-02-15 18:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
      (100010, 'user_003', 12348, 3, 30000, TO_TIMESTAMP('2023-02-15 19:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1);
      
      INSERT INTO orders_pay VALUES
      (2008, 100008, 1, TO_TIMESTAMP('2023-02-15 20:40:56')),
      (2009, 100009, 1, TO_TIMESTAMP('2023-02-15 20:40:56')),
      (2010, 100010, 1, TO_TIMESTAMP('2023-02-15 20:40:56'));

      單擊結果欄左側的在Flink UI,觀察作業狀態。

    2. 運維中心 > 任務編排頁面,單擊上一部分創建的工作流操作列下的觸發運行,單擊確定,觸發工作流運行。

      image.png

      單擊工作流名稱,進入工作流實例列表與詳情頁面,可以看到工作流實例列表。

      image.png

      單擊運行中的工作流實例運行ID,即可進入工作流實例的執行詳情頁面,觀察到各個節點的執行狀態。等待整個工作流執行完成。

      image.png

  3. 查看工作流執行結果

    1. 查詢腳本文本編輯區域,輸入如下SQL語句并單擊左側的運行

      SELECT * FROM `my_catalog`.`order_dw`.`dws_shops`;
      SELECT * FROM `my_catalog`.`order_dw`.`dws_users`;
    2. 查看工作流的執行結果。

      可以看到,ODS層新增數據經過處理已經寫入DWS表中。

      image.png image.png

相關文檔