作為流批一體的計算框架,Flink不僅能夠提供低延遲的流式數據處理(Streaming Data Processsing),也能進行高吞吐的批處理(Batch Data Processing)。實時計算Flink版對批處理能力進行了專門的支持,提供了包括作業開發、作業運維、作業編排、資源隊列管理、數據結果探查等能力,可以利用Flink批處理能力更好地解決業務需求。本文通過具體的示例為您介紹如何利用實時計算Flink版關鍵功能進行數據批處理。
功能介紹
實時計算Flink版提供了以下關鍵功能來支持Flink批處理:
SQL作業開發:在SQL開發頁面的作業草稿頁簽,可以創建批作業草稿,批作業草稿會以批作業的形式被部署和執行。
作業管理:在作業運維頁面,可以直接部署JAR或Python類型的批作業。在頂部下拉框中選擇批作業,查看已部署的批作業。展開目標批作業,可查看其作業實例列表。通常,一個批作業的不同作業實例具有相同的處理邏輯,但是采用不同的參數,例如處理的數據所屬日期。
查詢腳本:在SQL開發頁面的查詢腳本頁簽,可以執行一些DDL或短查詢,快速地進行數據管理和數據探查。這些短查詢執行在預創建的Flink Session中,通過資源復用,實現低延遲的簡單查詢。
管理元數據:在元數據管理頁面,可以創建和查看Catalog,包括其中的數據庫和表的信息。您也可以在SQL開發頁面的元數據頁簽進行查看,提高開發效率。
任務編排(公測):在任務編排頁面,可以定義工作流,通過可視化的操作方式,編排一系列批作業的執行依賴。工作流會作為一個整體,根據定義好的依賴關系執行包含的批作業。支持通過手動觸發或定時調度方式來執行創建好的工作流。
管理資源隊列:在隊列管理頁面,可以對工作空間中的資源進行劃分,從而避免流作業和批作業、以及不同優先級的作業間發生資源爭搶。
注意事項
已創建Flink工作空間,詳情請參見開通實時計算Flink版。
由于本文示例使用Apache Paimon存儲數據,僅實時計算引擎VVR 8.0.5及以上版本支持本文示例。
示例場景
本文以一個電子商務平臺的業務場景為例,使用Apache Paimon的湖倉格式對數據進行存儲。模擬了一個數據倉庫結構,包括ODS(操作數據存儲)、DWD(數據倉庫細節級)、DWS(數據倉庫匯總級)的存儲層級。通過Flink的批處理能力,對數據進行加工清洗后寫入Paimon表,從而實現數據分層結構的搭建。
準備工作
創建查詢腳本。
通過查詢腳本頁簽,您可以創建Catalog以及其中的數據庫和表,并且向表中插入一些模擬的數據。
創建Paimon Catalog。
在查詢腳本的文本編輯區域,輸入如下SQL語句。
CREATE CATALOG `my_catalog` WITH ( 'type' = 'paimon', 'metastore' = 'filesystem', 'warehouse' = '<warehouse>', 'fs.oss.endpoint' = '<fs.oss.endpoint>', 'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>', 'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>' );
參數配置項如下。
配置項
說明
是否必填
備注
type
Catalog類型。
是
固定值為Paimon。
metastore
元數據存儲類型。
是
本文示例填寫filesystem,其他類型詳情請參見管理Paimon Catalog。
warehouse
OSS服務中所指定的數倉目錄。
是
格式為oss://<bucket>/<object>。其中:
bucket:表示您創建的OSS Bucket名稱。
object:表示您存放數據的路徑。
請在OSS管理控制臺上查看您的Bucket和Object名稱。
fs.oss.endpoint
OSS服務的連接地址。
否
當warehouse指定的OSS Bucket與Flink工作空間不在同一地域,或使用其它賬號下的OSS bucket時需要填寫。
請參見訪問域名和數據中心。
fs.oss.accessKeyId
擁有讀寫OSS權限的阿里云賬號或RAM賬號的AccessKey。
否
當warehouse指定的OSS Bucket與Flink工作空間不在同一地域,或使用其它賬號下的OSS Bucket時需要填寫。獲取方法請參見創建AccessKey。
fs.oss.accessKeySecret
擁有讀寫OSS權限的阿里云賬號或RAM賬號的AccessKey Secret。
否
選中上述代碼,單擊左側的運行。
返回
The following statement has been executed successfully!
信息表示Catalog創建成功。此時可以在元數據管理頁面(或是SQL開發頁面的元數據子頁面),查看新創建的Catalog。
操作流程
步驟一:創建ODS表并插入測試數據
為了簡化本示例,我們直接向ODS表中插入了一些測試數據,用于后續的DWD/DWS表的數據生成。在實際生產中,一般會使用Flink流處理從外部數據源讀取數據并寫入到湖中作為ODS層,具體可以參見 Paimon快速開始:基本功能。
在查詢腳本文本編輯區域,輸入如下SQL語句并單擊左側的運行。
CREATE DATABASE `my_catalog`.`order_dw`; USE `my_catalog`.`order_dw`; CREATE TABLE orders ( order_id BIGINT, user_id STRING, shop_id BIGINT, product_id BIGINT, buy_fee BIGINT, create_time TIMESTAMP, update_time TIMESTAMP, state INT ); CREATE TABLE orders_pay ( pay_id BIGINT, order_id BIGINT, pay_platform INT, create_time TIMESTAMP ); CREATE TABLE product_catalog ( product_id BIGINT, catalog_name STRING ); -- 插入測試數據 INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000, TO_TIMESTAMP('2023-02-15 16:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100002, 'user_002', 12346, 2, 4000, TO_TIMESTAMP('2023-02-15 15:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100003, 'user_003', 12347, 3, 3000, TO_TIMESTAMP('2023-02-15 14:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100004, 'user_001', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 13:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100005, 'user_002', 12348, 5, 1000, TO_TIMESTAMP('2023-02-15 12:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100006, 'user_001', 12348, 1, 1000, TO_TIMESTAMP('2023-02-15 11:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100007, 'user_003', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 10:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, TO_TIMESTAMP('2023-02-15 17:40:56')), (2002, 100002, 1, TO_TIMESTAMP('2023-02-15 17:40:56')), (2003, 100003, 0, TO_TIMESTAMP('2023-02-15 17:40:56')), (2004, 100004, 0, TO_TIMESTAMP('2023-02-15 17:40:56')), (2005, 100005, 0, TO_TIMESTAMP('2023-02-15 18:40:56')), (2006, 100006, 0, TO_TIMESTAMP('2023-02-15 18:40:56')), (2007, 100007, 0, TO_TIMESTAMP('2023-02-15 18:40:56')); INSERT INTO product_catalog VALUES (1, 'phone_aaa'), (2, 'phone_bbb'), (3, 'phone_ccc'), (4, 'phone_ddd'), (5, 'phone_eee');
說明本文創建的是不帶主鍵的Paimon Append Only表,其相比于Paimon主鍵表具有更好的批量寫入性能,但不支持基于主鍵的更新操作。
執行結果會包含多個子標簽頁,返回
The following statement has been executed successfully!
信息表示對應的DDL語句執行成功。INSERT等DML語句則會返回一個JobId,表明生成了Flink作業并在Flink Session中執行,單擊結果欄左側的在Flink UI中查看,可觀察到這幾條SQL語句的執行情況,等待數秒至其執行完成。
探查ODS表數據。
在查詢腳本文本編輯區域,輸入如下SQL語句并單擊左側的運行。
SELECT count(*) as order_count FROM `my_catalog`.`order_dw`.`orders`; SELECT count(*) as pay_count FROM `my_catalog`.`order_dw`.`orders_pay`; SELECT * FROM `my_catalog`.`order_dw`.`product_catalog`;
這些SQL語句也會在Flink Session中執行,最終可以在3個查詢的結果頁面中查看返回結果。
步驟二:創建DWD和DWS表
在查詢腳本文本編輯區域,輸入如下SQL語句并單擊左側的運行。
USE `my_catalog`.`order_dw`;
CREATE TABLE dwd_orders (
order_id BIGINT,
order_user_id STRING,
order_shop_id BIGINT,
order_product_id BIGINT,
order_product_catalog_name STRING,
order_fee BIGINT,
order_create_time TIMESTAMP,
order_update_time TIMESTAMP,
order_state INT,
pay_id BIGINT,
pay_platform INT COMMENT 'platform 0: phone, 1: pc',
pay_create_time TIMESTAMP
) WITH (
'sink.parallelism' = '2'
);
CREATE TABLE dws_users (
user_id STRING,
ds STRING,
total_fee BIGINT COMMENT '當日完成支付的總金額'
) WITH (
'sink.parallelism' = '2'
);
CREATE TABLE dws_shops (
shop_id BIGINT,
ds STRING,
total_fee BIGINT COMMENT '當日完成支付總金額'
) WITH (
'sink.parallelism' = '2'
);
此處創建的仍然是Paimon Append Only表。Paimon表作為Flink Sink不支持自動并發推導,需要顯式設置其并發度,否則可能會報錯。
步驟三:創建與部署DWD和DWS作業
創建和部署DWD作業。
創建DWD表更新作業。
在
頁面新建空白的批作業草稿,命名為dwd_orders,將如下SQL語句復制到文本編輯區域中。由于DWD表是Paimon Append Only表,因此此處使用INSERT OVERWRITE語句進行整體的覆寫。INSERT OVERWRITE my_catalog.order_dw.dwd_orders SELECT o.order_id, o.user_id, o.shop_id, o.product_id, c.catalog_name, o.buy_fee, o.create_time, o.update_time, o.state, p.pay_id, p.pay_platform, p.create_time FROM my_catalog.order_dw.orders as o, my_catalog.order_dw.product_catalog as c, my_catalog.order_dw.orders_pay as p WHERE o.product_id = c.product_id AND o.order_id = p.order_id
單擊頁面右上方的部署,單擊確定,部署dwd_orders作業。
創建和部署DWS作業。
創建DWS表更新作業。
在
頁面新建兩個空白的批作業草稿,分別命名為dws_shops和dws_users,將下列SQL語句分別復制到對應草稿的文本編輯區域中。INSERT OVERWRITE my_catalog.order_dw.dws_shops SELECT order_shop_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds, SUM(order_fee) as total_fee FROM my_catalog.order_dw.dwd_orders WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL GROUP BY order_shop_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');
INSERT OVERWRITE my_catalog.order_dw.dws_users SELECT order_user_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds, SUM(order_fee) as total_fee FROM my_catalog.order_dw.dwd_orders WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL GROUP BY order_user_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');
單擊頁面右上方的部署,單擊確定,部署dws_shops和dws_users作業。
步驟四:啟動與查看DWD和DWS作業
啟動與查看DWD作業數據。
在
頁面,在下拉框中選擇批作業,單擊dwd_orders作業操作列下的啟動。對應批作業實例列表中,生成了一個啟動中的批作業實例,如下圖所示。
當該作業實例的狀態變為已完成時,表示數據處理完畢。
探查數據結果。
在查詢腳本文本編輯區域,輸入如下SQL語句并單擊左側的運行,查詢DWD表的數據。
SELECT * FROM `my_catalog`.`order_dw`.`dwd_orders`;
結果如下所示。
啟動與查看DWS作業數據。
在
頁面,在下拉框中選擇批作業,單擊dws_shops和dws_users作業操作列下的啟動。在查詢腳本文本編輯區域,輸入如下SQL語句并單擊左側的運行,查詢DWS表的數據。
SELECT * FROM `my_catalog`.`order_dw`.`dws_shops`; SELECT * FROM `my_catalog`.`order_dw`.`dws_users`;
結果如下所示。
步驟五:通過作業編排構建批處理鏈路
本部分將把前面創建的作業編排成一個工作流,使得它們可以被統一的觸發并有序的執行。
創建工作流。
單擊左側的
,單擊創建工作流。在彈出的面板中,填入名稱wf_orders,調度類型保持不變(默認為手動觸發),資源隊列選擇default-queue后,單擊創建,進入工作流編輯頁面。
編輯工作流。
單擊初始的節點,命名為v_dwd_orders,選取其作業為dwd_orders。
單擊添加節點,創建節點v_dws_shops,選取其作業為dws_shops,上游節點為v_dwd_orders。
再次單擊添加節點,創建節點v_dws_users,選取其作業為dws_users,上游節點為v_dwd_orders。
單擊右上角的保存并確定。
創建的工作流如下所示。
手動觸發工作流運行
說明工作流也可以被修改為定時調度的工作流,只需要在任務編排頁面,單擊工作流右側的編輯工作流,將調度模式修改為周期調度即可,詳情請參見任務編排(公測)。
在觸發工作流運行前,先給ODS表插入一些新數據,用于驗證工作流的執行結果。
在查詢腳本文本編輯區域,輸入如下SQL語句并單擊左側的運行。
USE `my_catalog`.`order_dw`; INSERT INTO orders VALUES (100008, 'user_001', 12346, 1, 10000, TO_TIMESTAMP('2023-02-15 17:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100009, 'user_002', 12347, 2, 20000, TO_TIMESTAMP('2023-02-15 18:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100010, 'user_003', 12348, 3, 30000, TO_TIMESTAMP('2023-02-15 19:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1); INSERT INTO orders_pay VALUES (2008, 100008, 1, TO_TIMESTAMP('2023-02-15 20:40:56')), (2009, 100009, 1, TO_TIMESTAMP('2023-02-15 20:40:56')), (2010, 100010, 1, TO_TIMESTAMP('2023-02-15 20:40:56'));
單擊結果欄左側的在Flink UI,觀察作業狀態。
在
頁面,單擊上一部分創建的工作流操作列下的觸發運行,單擊確定,觸發工作流運行。單擊工作流名稱,進入工作流實例列表與詳情頁面,可以看到工作流實例列表。
單擊運行中的工作流實例運行ID,即可進入工作流實例的執行詳情頁面,觀察到各個節點的執行狀態。等待整個工作流執行完成。
查看工作流執行結果
在查詢腳本文本編輯區域,輸入如下SQL語句并單擊左側的運行。
SELECT * FROM `my_catalog`.`order_dw`.`dws_shops`; SELECT * FROM `my_catalog`.`order_dw`.`dws_users`;
查看工作流的執行結果。
可以看到,ODS層新增數據經過處理已經寫入DWS表中。
相關文檔
如果您想要對Flink批處理的原理和配置調優有更多了解,請參見 Flink批處理調優指南
如果您想要使用Flink+Paimon搭建實時數倉,操作步驟詳情請參見基于Flink+Paimon搭建流式湖倉。
除了在實時計算開發控制臺進行Flink作業開發等操作,您同樣可以在本地進行,具體操作請參見VS Code本地開發插件。