E-MapReduce支持通過Flink SQL對Paimon進行讀寫操作。本文通過示例為您介紹如何通過Flink SQL對Paimon進行讀寫操作。
前提條件
已創建選擇了Flink和Paimon的DataFlow或Custom類型的集群,創建集群詳情請參見創建集群。
如果您需要使用Hive Catalog的方式,則只能創建選擇了Flink、Paimon和Hive的Custom類型的集群,且元數據類型僅可選擇自建RDS或內置MySQL。
使用限制
EMR-3.46.0版本,暫不支持DLF Catalog和Hive Catalog的方式。
EMR-3.46.0至EMR-3.50.X版本、EMR-5.12.0至EMR-5.16.X版本的集群,支持使用Flink SQL對Paimon進行讀寫操作。
說明EMR-3.51.X及其后續版本、EMR-5.17.X及其后續版本,建議您參考Paimon社區文檔,在EMR集群中自行配置。
操作步驟
步驟一:配置依賴
本文介紹了通過Flink SQL對Paimon進行讀寫操作的三種方法:Filesystem Catalog、Hive Catalog和DLF Catalog。每種方法針對不同的應用場景和環境需求,請根據所選方法配置相應的依賴。
Filesystem Catalog
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
Hive Catalog
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/
創建DLF Catalog
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/PAIMON/paimon-current/lib/jackson/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/METASTORE/metastore-*/hive2/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/
步驟二:啟動集群
本文以Session模式為例,其余模式請參見基礎使用。
執行以下命令,啟動YARN Session。
yarn-session.sh --detached
步驟三:創建Catalog
Paimon將數據和元數據都保存在文件系統(例如,HDFS)或對象存儲(例如,OSS-HDFS)中,存儲的根路徑由warehouse參數指定。如果指定的warehouse路徑不存在,將會自動創建該路徑;如果指定的warehouse路徑存在,則可以通過該Catalog訪問路徑中已有的表。
您還可以將元數據額外同步到Hive或DLF中,方便其他服務訪問Paimon。
EMR-3.46.0和EMR-5.17.0版本,暫不支持DLF Catalog和Hive Catalog的方式。
創建Filesystem Catalog
Filesystem Catalog僅將元數據保存在文件系統或對象存儲中。
執行以下命令,啟動Flink SQL。
sql-client.sh
執行以下Flink SQL語句,創建Filesystem Catalog。
CREATE CATALOG test_catalog WITH ( 'type' = 'paimon', 'metastore' = 'filesystem', 'warehouse' = 'oss://<yourBucketName>/warehouse' );
創建Hive Catalog
Hive Catalog會同步元數據到Hive MetaStore中。在Hive Catalog中創建的表可以直接在Hive中查詢。
Hive查詢Paimon,詳情請參見Paimon與Hive集成。
執行以下命令,啟動Flink SQL。
sql-client.sh
說明即使您使用的是Hive3,也無需修改啟動命令。
執行以下Flink SQL語句,創建Hive Catalog。
CREATE CATALOG test_catalog WITH ( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://master-1-1:9083', -- uri參數指向Hive metastore service的地址。 'warehouse' = 'oss://<yourBucketName>/warehouse' );
創建DLF Catalog
DLF Catalog會將元數據同步到DLF中。
創建集群時,元數據必須為DLF統一元數據。
執行以下命令,啟動Flink SQL。
sql-client.sh
說明即使您使用的是Hive3,也無需修改啟動命令。
執行以下Flink SQL語句,創建DLF Catalog。
CREATE CATALOG test_catalog WITH ( 'type' = 'paimon', 'metastore' = 'dlf', 'hive-conf-dir' = '/etc/taihao-apps/flink-conf', 'warehouse' = 'oss://<yourBucketName>/warehouse' );
步驟四 :流作業讀寫Paimon
執行以下Flink SQL語句,在Catalog中創建一張表,并讀寫表中的數據。
-- 設置為流作業。
SET 'execution.runtime-mode' = 'streaming';
-- Paimon在流作業中需要設置checkpoint。
SET 'execution.checkpointing.interval' = '10s';
-- 使用之前創建的catalog。
USE CATALOG test_catalog;
-- 創建并使用一個測試database。
CREATE DATABASE test_db;
USE test_db;
-- 用datagen產生隨機數據。
CREATE TEMPORARY TABLE datagen_source (
uuid int,
kind int,
price int
) WITH (
'connector' = 'datagen',
'fields.kind.min' = '0',
'fields.kind.max' = '9',
'rows-per-second' = '10'
);
-- 創建Paimon表。
CREATE TABLE test_tbl (
uuid int,
kind int,
price int,
PRIMARY KEY (uuid) NOT ENFORCED
);
-- 向Paimon中寫入數據。
INSERT INTO test_tbl SELECT * FROM datagen_source;
-- 讀取表中的數據。
-- 流式查詢作業運行的過程中,上面觸發的流式寫入作業仍在運行。
-- 您需要保證Flink集群有足夠的資源(task slot)同時運行兩個作業,否則無法查到數據。
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;
步驟五:OLAP查詢Paimon
執行以下Flink SQL語句,對剛才創建的表進行OLAP查詢。
-- 設置為批作業。
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';
-- 使用tableau展示模式,在命令行中直接打出結果。
SET 'sql-client.execution.result-mode' = 'tableau';
-- 對表中數據進行查詢。
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;
步驟六:清理資源
完成測試后,請手動停止流式寫入Paimon的作業,防止資源泄露。
停止作業后,執行以下Flink SQL語句,刪除剛才創建的表。
DROP TABLE test_tbl;