Spark Connector
MaxCompute開(kāi)放存儲(chǔ)支持Spark通過(guò)Connector調(diào)用Storage API,直接讀取MaxCompute的數(shù)據(jù),簡(jiǎn)化了讀取數(shù)據(jù)的過(guò)程,提高了數(shù)據(jù)訪問(wèn)性能。同時(shí),Spark集成MaxCompute的數(shù)據(jù)存儲(chǔ)能力,實(shí)現(xiàn)了高效、靈活和強(qiáng)大的數(shù)據(jù)處理和分析。
前提條件
已開(kāi)通MaxCompute服務(wù)并創(chuàng)建MaxCompute項(xiàng)目,詳情請(qǐng)參見(jiàn)開(kāi)通MaxCompute和創(chuàng)建MaxCompute項(xiàng)目。
使用限制
第三方引擎訪問(wèn)MaxCompute時(shí),支持讀取分區(qū)表、聚簇表、物化視圖;不支持讀取MaxCompute的外部表、邏輯視圖、Delta Table。
不支持讀JSON數(shù)據(jù)類型。
開(kāi)放存儲(chǔ)(按量付費(fèi))每個(gè)租戶的請(qǐng)求并發(fā)數(shù)限制默認(rèn)為1000個(gè),并且每個(gè)并發(fā)傳輸速率為10 MB/s。
操作步驟
部署Spark開(kāi)發(fā)環(huán)境。
本文的Spark開(kāi)發(fā)環(huán)境搭建在Linux操作系統(tǒng)下,詳情請(qǐng)參見(jiàn)搭建Linux開(kāi)發(fā)環(huán)境。您也可以選擇搭建在Windows操作系統(tǒng)下,詳情請(qǐng)參見(jiàn)搭建Windows開(kāi)發(fā)環(huán)境。
重要Spark包請(qǐng)使用Spark3.3.x版本,單擊Spark下載并解壓到本地目錄。
下載并編譯Spark Connector(當(dāng)前只支持Spark 3.3.x版本)。
## 下載Spark Connector: git clone https://github.com/aliyun/aliyun-maxcompute-data-collectors.git ## 切換到spark connector目錄cd cd aliyun-maxcompute-data-collectors/spark-connector ## 編譯 mvn clean package ## Datasource Jar包位置 datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar ## 將Datasource Jar包拷貝到 $SPARK_HOME/jars/目錄下 cp datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar $SPARK_HOME/jars/
配置MaxCompute賬號(hào)訪問(wèn)信息。
在Spark的
conf
目錄下創(chuàng)建spark-defaults.conf
文件:cd $SPARK_HOME/conf vim spark-defaults.conf
文件內(nèi)容示例如下:
## 在spark-defaults.conf配置賬號(hào) spark.hadoop.odps.project.name=doc_test spark.hadoop.odps.access.id=L******************** spark.hadoop.odps.access.key=******************* spark.hadoop.odps.end.point=http://service.cn-beijing.maxcompute.aliyun.com/api spark.hadoop.odps.tunnel.quota.name=ot_xxxx_p#ot_xxxx ##配置MaxCompute Catalog spark.sql.catalog.odps=org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog spark.sql.extensions=org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions
參數(shù)說(shuō)明如下:
分類
參數(shù)
是否必填
說(shuō)明
MaxCompute實(shí)例信息
spark.hadoop.odps.project.name
是
MaxCompute項(xiàng)目(Project)名稱。
此處為MaxCompute項(xiàng)目名稱,非工作空間名稱。您可以登錄MaxCompute控制臺(tái),左上角切換地域后,在左側(cè)導(dǎo)航欄選擇工作區(qū)>項(xiàng)目管理,查看具體的MaxCompute項(xiàng)目名稱。
spark.hadoop.odps.access.id
是
具備目標(biāo)MaxCompute項(xiàng)目訪問(wèn)權(quán)限的AccessKey ID。
您可以進(jìn)入AccessKey管理頁(yè)面獲取AccessKey ID。
spark.hadoop.odps.access.key
是
AccessKey ID對(duì)應(yīng)的AccessKey Secret。
spark.hadoop.odps.end.point
是
MaxCompute服務(wù)的連接地址。您需要根據(jù)創(chuàng)建MaxCompute項(xiàng)目時(shí)選擇的地域以及網(wǎng)絡(luò)連接方式配置Endpoint。各地域及網(wǎng)絡(luò)對(duì)應(yīng)的Endpoint值,請(qǐng)參見(jiàn)Endpoint。
說(shuō)明當(dāng)前僅支持使用阿里云VPC網(wǎng)絡(luò)。
spark.hadoop.odps.tunnel.quota.name
否
訪問(wèn)MaxCompute使用的Quota名稱。訪問(wèn)MaxCompute支持獨(dú)享數(shù)據(jù)傳輸服務(wù)資源組(包年包月)和開(kāi)放存儲(chǔ)(按量計(jì)費(fèi))兩種資源,獲取Quota名稱的方式分別如下:
獨(dú)享數(shù)據(jù)傳輸服務(wù)資源組:登錄MaxCompute控制臺(tái),左上角切換地域后,在左側(cè)導(dǎo)航欄選擇工作區(qū)>配額(Quota)管理,查看可使用的Quota列表。具體操作,請(qǐng)參見(jiàn)計(jì)算資源-Quota管理。
開(kāi)放存儲(chǔ):登錄MaxCompute控制臺(tái),在左側(cè)導(dǎo)航欄選擇租戶管理>租戶屬性,開(kāi)啟開(kāi)放存儲(chǔ)并進(jìn)行授權(quán)操作。具體操作,請(qǐng)參見(jiàn)使用開(kāi)放存儲(chǔ)(按量付費(fèi))。
開(kāi)放存儲(chǔ)資源名稱默認(rèn)為
pay-as-you-go
。
Catalog相關(guān)配置
spark.sql.catalog.odps
是
指定Spark Catalog,值需要設(shè)置為
org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog
。spark.sql.extensions
是
指定Spark會(huì)話擴(kuò)展,值需要設(shè)置為
org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions
。spark.sql.defaultCatalog
否
指定是否為默認(rèn)Catalog,如果需要將MaxCompute設(shè)置為默認(rèn)Catalog,需要將該參數(shù)值設(shè)置為
odps
。spark.sql.catalog.odps.enableVectorizedReader
否
指定是否開(kāi)啟向量化讀,默認(rèn)值為
true
。spark.sql.catalog.odps.columnarReaderBatchSize
否
指定向量化讀每個(gè)批處理包含的行數(shù),默認(rèn)值為
4096
。spark.sql.catalog.odps.splitSizeInMB
否
指定表切片大小,決定讀表并行度,單位為MB,默認(rèn)值為
256
。spark.sql.catalog.odps.enableNamespaceSchema
否
如果MaxCompute項(xiàng)目開(kāi)啟三層模型,值需要設(shè)置為
true
。說(shuō)明Spark通過(guò)Connector調(diào)用Storage API讀取MaxCompute的數(shù)據(jù)時(shí),Spark作業(yè)并發(fā)數(shù)決定可使用的MaxCompute Tunnel并發(fā)數(shù)。一個(gè)Spark并發(fā)讀或?qū)憣?duì)應(yīng)一個(gè)Tunnel并發(fā),而一個(gè)Spark并發(fā)可能同時(shí)讀寫(xiě),因此Tunnel并發(fā)數(shù)會(huì)介于Spark的并發(fā)數(shù)1~2倍之間。例如,Spark的任務(wù)拉起1000并發(fā),那么Tunnel并發(fā)數(shù)會(huì)介于1000到2000之間。
通過(guò)Spark Connector使用MaxCompute。
使用如下命令在Spark的
bin
目錄下啟動(dòng)Spark SQL客戶端:cd $SPARK_HOME/bin spark-sql
使用Spark SQL客戶端執(zhí)行命令示例如下:
查詢MaxCompute項(xiàng)目中存在的表:
SHOW tables in odps.doc_test;
doc_test
為示例MaxCompute項(xiàng)目名稱,實(shí)際運(yùn)行時(shí)請(qǐng)?zhí)鎿Q為您的MaxCompute項(xiàng)目名稱。創(chuàng)建表:
CREATE TABLE odps.doc_test.mc_test_table (name STRING, num BIGINT);
讀取表中數(shù)據(jù):
SELECT * FROM odps.doc_test.mc_test_table;
返回示例如下:
test1 1 test2 2 Time taken: 1.279 seconds, Fetched 2 row(s)
創(chuàng)建分區(qū)表:
CREATE TABLE odps.doc_test.mc_test_table_pt (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING);
讀取分區(qū)表中數(shù)據(jù):
SELECT * FROM odps.doc_test.mc_test_table_pt;
返回結(jié)果示例如下:
test1 1 2018 0601 test2 2 2018 0601 Time taken: 1.312 seconds, Fetched 2 row(s)
刪除表:
DROP TABLE IF EXISTS odps.doc_test.mc_test_table;