阿里云實時計算Flink版結合DLF Paimon Catalog,實現Flink作業結果到數據湖的高效寫入和元數據同步,支持無縫對接多種計算引擎并優化數據湖管理,本文為您介紹具體的操作流程。
背景信息
阿里云實時計算Flink版是一套基于Apache Flink構建的實時大數據分析平臺,支持多種數據源和結果表類型。Flink任務可以利用數據湖統一存儲的優勢,使用Paimon表,將作業的結果輸出到數據湖中,實現數據湖分析。在寫入數據湖的過程中,Flink可以通過設置DLF Catalog,將表的元數據同步到數據湖構建(DLF)中。依托數據湖構建產品(DLF)提供的企業級統一元數據能力,Flink+DLF方案可以實現寫入的數據湖表無縫對接阿里云上的計算引擎,如EMR、MaxCompute、Hologres等。也可以通過DLF提供的豐富的數據湖管理能力,實現數據湖生命周期管理和湖格式的優化。
前提條件
已創建Flink全托管工作空間,引擎版本須為VVR 8.0.9及以上。如未創建,詳情請參見開通實時計算Flink版。
已創建DLF 2.0數據目錄。如未創建,詳情請參見創建數據目錄。
說明如果是RAM用戶,在進行數據操作之前,需要先授予相應的資源權限。詳情請參見授權管理。
本文以MySQL數據源為例,需要創建RDS MySQL,詳情請參見快速創建RDS MySQL實例。
創建的RDS MySQL需要和實時計算Flink版在同一個地域、同一個VPC內,RDS MySQL須為5.7及以上版本。
操作流程
步驟1:Flink創建DLF Paimon Catalog
登錄Flink控制臺。
進入創建Catalog頁面。
在Flink全托管頁簽,單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊元數據管理。
單擊創建Catalog。
創建DLF Paimon Catalog。
在創建Catalog頁面,類型選擇Apache Paimon Catalog。
metastore選擇dlf,下拉列表中選擇標有v2.0的Catalog,Catalog需要提前在DLF控制臺創建。
步驟2:準備MySQL數據
登錄準備好的MySQL實例,詳情請參見通過DMS登錄RDS MySQL。
執行如下命令,在已有數據庫下創建一張表,并插入若干測試數據。
CREATE TABLE `student` ( `id` bigint(20) NOT NULL, `name` varchar(256) DEFAULT NULL, `age` bigint(20) DEFAULT NULL, PRIMARY KEY (`id`) ); INSERT INTO student VALUES (1,'name1',10); INSERT INTO student VALUES (2,'name2',20);
步驟3:創建Flink入湖作業
登錄Flink控制臺。
創建數據源表和目標表。
在Flink全托管頁簽,單擊目標工作空間操作列下的控制臺。
在左側導航欄,選擇數據開發 > 數據查詢。
單擊新建,創建一個新的查詢腳本,執行如下SQL。
說明關于MySQL源表的參數設置和使用條件,請參考MySQL的CDC源表。
-- Catalog名為步驟1創建的DLF Catalog Name,本例中填dlf_clg_test CREATE DATABASE IF NOT EXISTS dlf_clg_test.dlf_testdb; --創建目標表 CREATE TABLE IF NOT EXISTS dlf_clg_test.dlf_testdb.student_paimon ( id BIGINT PRIMARY KEY NOT ENFORCED, name STRING, age BIGINT ) WITH( 'merge-engine' = 'partial-update', -- 使用部分更新數據合并機制產生寬表 'changelog-producer' = 'lookup', -- 使用lookup增量數據產生機制以低延時產出變更數據 'bucket' = '3' );
創建Flink入湖任務。
在SQL開發頁面,創建一個新的SQL流作業,執行如下SQL。
-- 創建CDC源表 CREATE TEMPORARY TABLE student_source ( id INT, name VARCHAR (256), age INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', -- hostname替換為RDS的連接地址 'hostname' = 'rm-xxxxxxxx.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = '<RDS username>', 'password' = '<RDS password>', 'database-name' = '<RDS database>', -- table-name為數據源表,本例中填步驟2創建的student表 'table-name' = 'student' ); INSERT INTO dlf_clg_test.dlf_testdb.student_paimon SELECT * FROM student_source;
b. 單擊保存。
c. 單擊部署。
在左側導航欄,單擊運維中心 > 作業運維。找到上面創建的SQL流作業,單擊啟動。
啟動成功后一段時間,可以看到作業的狀態變成運行中。
步驟4:使用Flink進行數據分析
執行數據查詢和分析。
在Flink全托管頁簽,單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊數據開發 > 數據查詢。
在SQL開發頁面,創建一個新的查詢腳本,執行如下SQL。
SELECT count(1) FROM dlf_clg_test.dlf_testdb.student_paimon; SELECT * FROM dlf_clg_test.dlf_testdb.student_paimon;
單擊運行,可以直接對Flink寫入數據湖的數據進行查詢和分析。