E-MapReduce的Flink Table Store服務支持通過Spark SQL對Flink Table Store進行讀寫操作。本文通過示例為您介紹如何通過Spark SQL對Flink Table Store進行讀寫操作。
使用限制
僅EMR-3.45.0版本、EMR-5.11.0版本的集群,支持Spark SQL和Spark CLI對Flink Table Store進行讀寫操作。
僅Spark3的Spark SQL可以通過Catalog讀寫Flink Table Store。
Spark CLI只能通過文件系統或對象存儲的路徑讀取Flink Table Store。
操作步驟
步驟一:創建Catalog
Flink Table Store將數據和元數據都保存在文件系統或對象存儲中,存儲的根路徑由spark.sql.catalog.tablestore.warehouse參數指定。如果指定的warehouse路徑不存在,將會自動創建該路徑;如果指定的warehouse路徑存在,您可以通過該Catalog訪問路徑中已有的表。
您還可以同步元數據到Hive或DLF中,方便其它服務訪問Flink Table Store。
創建Filesystem Catalog。
Filesystem Catalog僅將元數據保存在文件系統或對象存儲中。
spark-sql --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog --conf spark.sql.catalog.tablestore.metastore=filesystem --conf spark.sql.catalog.tablestore.warehouse=oss://oss-bucket/warehouse
創建Hive Catalog
Hive Catalog會同步元數據到Hive MetaStore中。在Hive Catalog中創建的表可以直接在Hive中查詢。
Hive查詢Flink Table Store,詳情請參見Flink Table Store與Hive集成。
spark-sql --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog --conf spark.sql.catalog.tablestore.metastore=hive --conf spark.sql.catalog.tablestore.uri=thrift://master-1-1:9083 --conf spark.sql.catalog.tablestore.warehouse=oss://oss-bucket/warehouse
說明spark.sql.catalog.tablestore.uri
為Hive MetaStore Service的地址。創建DLF Catalog
DLF Catalog會同步元數據到DLF中。
重要創建集群時,元數據須為DLF統一元數據。
spark-sql --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog --conf spark.sql.catalog.tablestore.metastore=dlf --conf spark.sql.catalog.tablestore.warehouse=oss://oss-bucket/warehouse
步驟二 :通過Spark SQL讀寫Flink Table Store中的數據
執行以下Spark SQL語句,在Catalog中創建一張表,并讀寫表中的數據。
-- 在創建的Catalog中,創建并使用一個測試database。
CREATE DATABASE tablestore.test_db;
USE tablestore.test_db;
-- 創建Flink Table Store表。
CREATE TABLE test_tbl (
uuid int,
name string,
price double
) TBLPROPERTIES (
'primary-key' = 'uuid'
);
-- 向Flink Table Store中寫入數據。
INSERT INTO test_tbl VALUES (1, 'apple', 3.5), (2, 'banana', 4.0), (3, 'cherry', 20.5);
-- 讀取表中的數據。
SELECT * FROM test_tbl;
步驟三:通過Spark CLI讀Flink Table Store中的數據
執行以下命令,啟動Spark CLI。
spark-shell
在Spark CLI中運行以下Scala代碼,查詢指定目錄下存儲的Flink Table Store表。
val dataset = spark.read.format("tablestore").load("oss://oss-bucket/warehouse/test_db.db/test_tbl") dataset.createOrReplaceTempView("test_tbl") spark.sql("SELECT * FROM test_tbl").show()