Spark Thrift Server是Apache Spark提供的一種服務,支持通過JDBC或ODBC連接并執行SQL查詢,從而便捷地將Spark環境與現有的商業智能(BI)工具、數據可視化工具及其他數據分析工具集成。本文主要為您介紹如何創建并連接Spark Thrift Server。
前提條件
已創建工作空間,詳情請參見管理工作空間。
創建Spark Thrift Server會話
Spark Thrift Server創建完成后,您可以在創建Spark SQL類型任務時選擇此會話。
進入會話管理頁面。
在左側導航欄,選擇
。在Spark頁面,單擊目標工作空間名稱。
在EMR Serverless Spark頁面,單擊左側導航欄中的會話管理。
在會話管理頁面,單擊Spark Thrift Server頁簽。
單擊創建Spark Thrift Server。
在創建Spark Thrift Server頁面,配置以下信息,單擊創建。
參數
說明
名稱
新建Spark Thrift Server的名稱。
長度限制為1~64個字符,僅支持字母、數字、短劃線(-)、下劃線(_)和空格。
部署隊列
請選擇合適的開發隊列部署會話。僅支持選擇開發或者開發和生產公用的隊列。
隊列更多信息,請參見管理資源隊列。
引擎版本
當前會話使用的引擎版本。引擎版本號含義等詳情請參見引擎版本介紹。
使用Fusion加速
Fusion可加速Spark負載的運行并降低任務的總成本。有關計費信息,請參見產品計費。有關Fusion引擎介紹,請參見Fusion引擎。
自動停止
默認開啟。45分鐘不活動后自動停止Spark Thrift Server會話。
Spark Thrift Server端口
默認443端口。
認證方式
僅支持Token方式。
spark.driver.cores
用于指定Spark應用程序中Driver進程所使用的CPU核心數量。默認值為1 CPU。
spark.driver.memory
用于指定Spark應用程序中Driver進程可以使用的內存量。默認值為3.5 GB。
spark.executor.cores
用于指定每個Executor進程可以使用的CPU核心數量。默認值為1 CPU。
spark.executor.memory
用于指定每個Executor進程可以使用的內存量。默認值為3.5 GB。
spark.executor.instances
Spark分配的執行器(Executor)數量。默認值為2。
動態資源分配
默認關閉。開啟后,需要配置以下參數:
executors數量下限:默認為2。
executors數量上限:如果未設置spark.executor.instances,則默認值為10。
更多內存配置
spark.driver.memoryOverhead:每個Driver可利用的非堆內存。如果未設置該參數,Spark會根據默認值自動分配,默認值為
max(384MB, 10% × spark.driver.memory)
。spark.executor.memoryOverhead:每個Executor可利用的非堆內存。如果未設置該參數,Spark會根據默認值自動分配,默認值為
max(384MB, 10% × spark.executor.memory)
。spark.memory.offHeap.size:Spark可用的堆外內存大小。默認值為1 GB。
僅在
spark.memory.offHeap.enabled
設置為true
時生效。默認情況下,當采用Fusion Engine時,該功能將處于啟用狀態,其非堆內存默認設置為1 GB。
Spark配置
填寫Spark配置信息,默認以空格符分隔,例如,
spark.sql.catalog.paimon.metastore dlf
。獲取Endpoint信息。
在Spark Thrift Server頁簽,單擊新增的Spark Thrift Server的名稱。
在總覽頁簽,復制Endpoint信息。
創建Token
Token使用時,請在請求的header中添加--header `x-acs-spark-livy-token: token`
。
在Spark Thrift Server頁簽,單擊新增的Spark Thrift Server的名稱。
單擊Token管理頁簽。
單擊創建Token。
在創建Token對話框中,配置以下信息,單擊確定。
參數
說明
名稱
新建Token的名稱。
過期時間
設置該Token的過期時間。設置的天數應大于或等于1。默認情況下為開啟狀態,365天后過期。
復制Token信息。
重要Token創建完成后,請務必立即復制新Token的信息,后續不支持查看。如果您的Token過期或遺失,請選擇新建Token或重置Token。
連接Spark Thrift Server
在連接Spark Thrift Server時,請根據您的實際情況替換以下信息:
<endpoint>
:您在總覽頁簽獲取的Endpoint信息。<username>
:您在Token管理頁簽新建的Token的名稱。<token>
:您在Token管理頁簽復制的Token信息。
使用Python連接Spark Thrift Server
執行以下命令,安裝PyHive和Thrift包。
pip install pyhive thrift
編寫Python腳本,連接Spark Thrift Server。
以下是一個Python腳本示例,展示如何連接到Hive并顯示數據庫列表。
from pyhive import hive if __name__ == '__main__': # 替換<endpoint>, <username>, 和 <token> 為您的實際信息。 cursor = hive.connect('<endpoint>', port=443, scheme='https', username='<username>', password='<token>').cursor() cursor.execute('show databases') print(cursor.fetchall()) cursor.close()
使用Java連接Spark Thrift Server
請在您的
pom.xml
中引入以下Maven依賴。<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.1.0</version> </dependency> </dependencies>
說明當前Serverless Spark內置的Hive版本為2.x,因此僅支持hive-jdbc 2.x版本。
編寫Java代碼,連接Spark Thrift Server。
以下是一個Sample Java代碼,用于連接到Spark Thrift Server,并查詢數據庫列表。
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; public class Main { public static void main(String[] args) throws Exception { String url = "jdbc:hive2://<endpoint>:443/;transportMode=http;httpPath=cliservice/token/<token>"; Class.forName("org.apache.hive.jdbc.HiveDriver"); Connection conn = DriverManager.getConnection(url); HiveStatement stmt = (HiveStatement) conn.createStatement(); String sql = "show databases"; System.out.println("Running " + sql); ResultSet res = stmt.executeQuery(sql); ResultSetMetaData md = res.getMetaData(); String[] columns = new String[md.getColumnCount()]; for (int i = 0; i < columns.length; i++) { columns[i] = md.getColumnName(i + 1); } while (res.next()) { System.out.print("Row " + res.getRow() + "=["); for (int i = 0; i < columns.length; i++) { if (i != 0) { System.out.print(", "); } System.out.print(columns[i] + "='" + res.getObject(i + 1) + "'"); } System.out.println(")]"); } conn.close(); } }
通過Spark Beeline連接Spark Thrift Server
(可選)如果您使用的是EMR on ECS的集群,建議您先進入Spark的
bin
目錄。cd /opt/apps/SPARK3/spark-3.4.2-hadoop3.2-1.0.3/bin/
使用Beeline客戶端連接到Spark Thrift Server。
beeline -u "jdbc:hive2://<endpoint>:443/;transportMode=http;httpPath=cliservice/token/<token>"
如果連接Serverless Spark Thrift Server時出現以下報錯,通常是由于Hive Beeline版本不兼容導致的。請確保您使用的是Spark Beeline。
24/08/22 15:09:11 [main]: ERROR jdbc.HiveConnection: Error opening session org.apache.thrift.transport.TTransportException: HTTP Response code: 404
配置Apache Superset以連接Spark Thrift Server
Apache Superset是一個現代數據探索和可視化平臺,具有豐富的從簡單的折線圖到高度詳細的地理空間圖表的圖表形態。更多Superset信息,請參見Superset。
安裝依賴。
請確保您已經安裝了高版本的
thrift
包(建議高于16.0.0)。如未安裝,您可以使用以下命令安裝。pip install thrift==20.0.0
啟動Superset,進入Superset界面。
更多啟動操作信息,請參見Superset文檔。
在頁面右上角單擊DATABASE,進入Connect a database頁面。
在Connect a database頁面,選擇Apache Spark SQL。
填寫連接字符串,然后配置相關數據源參數。
hive+https://<username>:<token>@<endpoint>:443/<db_name>
單擊FINISH,以確認成功連接和驗證。
配置Hue以連接Spark Thrift Server
Hue是一個流行的開源Web界面,可用于與Hadoop生態系統進行交互。關于Hue的更多介紹,請參見Hue官方文檔。
安裝依賴。
請確保您已經安裝了高版本的
thrift
包(建議高于16.0.0)。如未安裝,您可以使用以下命令安裝。pip install thrift==20.0.0
在Hue的配置文件中添加Spark SQL連接串。
請找到Hue的配置文件(通常位于
/etc/hue/hue.conf
),并在文件中添加以下內容。[[[sparksql]]] name = Spark Sql interface=sqlalchemy options='{"url": "hive+https://<username>:<token>@<endpoint>:443/"}'
重啟Hue。
修改配置后,您需要執行以下命令重啟Hue服務以使更改生效。
sudo service hue restart
驗證連接。
成功重啟后,訪問Hue界面,找到Spark SQL選項。如果配置正確,您應能夠成功連接到Spark Thrift Server并執行SQL查詢。
相關文檔
Fusion引擎更多介紹,請參見Fusion引擎。