本文介紹通過JDBC連接Spark Thrift Servert并成功提交Spark作業。
前提條件
連接Spark Thrift Server需要校驗用戶名和密碼,請進行用戶認證配置,請參見:用戶管理
DDI集群Spark Thrift Server默認端口號為10001,請確認成功添加安全組白名單,請參見:安全組白名單
背景信息
JDBC連接Spark Thrift Server如下:
Beeline:通過HiveServer2的JDBC客戶端進行連接。
Java:編寫Java代碼進行連接。
Python:編寫Python代碼進行連接。
Beeline客戶端連接Spark Thrift Server
執行如下命令,進入Beeline客戶端。
beeline
返回如下信息
Beeline version 2.3.7 by Apache Hive
執行如下命令,連接Spark Thrift Servert。
!connect jdbc:hive2://{ddi-header-ip}:10001/{db_name}
輸入用戶名和密碼。
Enter username for jdbc:hive2://ip:10001/beijing_dlf_db_test: username
Enter password for jdbc:hive2://ip:10001/beijing_dlf_db_test: ********
查詢數據,返回結果如下:
0: jdbc:hive2://ip:10001/beijing_d> select * from table_name limit 10;
+---------+-------------+
| action | date |
+---------+-------------+
| Close | 2016-07-27 |
| Open | 2016-07-27 |
| Close | 2016-07-27 |
| Close | 2016-07-27 |
| Open | 2016-07-27 |
| Close | 2016-07-27 |
| Open | 2016-07-27 |
| Open | 2016-07-27 |
| Open | 2016-07-27 |
| Open | 2016-07-27 |
+---------+-------------+
10 rows selected (15.853 seconds)
Java代碼連接Spark Thrift Server
在執行本操作前,確保您已安裝Java環境和Java編程工具,并且已配置環境變量
Java代碼連接Spark Thrift Server需要下載Databricks提供的依賴包,下載路徑:Databricks JDBC Driver
將項目依賴SparkJDBC42.jar添加到編程工具的Lib下,如圖:
編寫代碼,連接Spark Thrift Server并提交作業。
代碼如下:
import com.simba.spark.jdbc.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class SparkJDBC {
private static Connection connectViaDS() throws Exception {
Connection connection = null;
Class.forName("com.simba.spark.jdbc.Driver");
DataSource ds = new com.simba.spark.jdbc.DataSource();
ds.setURL("jdbc:spark://{ip}:10001/{db_name};AuthMech=3;UID=username;PWD=ps");
connection = ds.getConnection();
return connection;
}
public static void main(String[] args) throws Exception {
Connection connection = connectViaDS();
System.out.println("日志打印:"+connection.getClientInfo());
PreparedStatement tables = connection.prepareStatement("show databases ");
ResultSet res = tables.executeQuery();
while (res.next()){
System.out.println("database_name :"+ res.getString(1));
}
res.close();
tables.close();
connection.close();
}
}
本地測試是否正常執行。
注意
執行代碼過程中如果出現NoSuchFileException請聯系DDI運維人員。
Python代碼連接Spark Thrift Server
使用PyHive連接Spark Thrift Server需要依的賴包安裝如下:
pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive
編寫Python代碼,連接Spark Thrift Server并提交作業。
代碼如下:vim sparkJDBC.py
from pyhive import hive
conn = hive.Connection(host='host',database='db_name', port=10001, username='username',password='ps',auth="LDAP")
cursor=conn.cursor()
cursor.execute('select * from table_name limit 10')
for result in cursor.fetchall():print(result)
conn.close()
執行Python代碼結果如下:
python sparkJDBC.py
執行結果:
('Close', '2016-07-27')
('Open', '2016-07-27')
('Close', '2016-07-27')
('Close', '2016-07-27')
('Open', '2016-07-27')
('Close', '2016-07-27')
('Open', '2016-07-27')
('Open', '2016-07-27')
('Open', '2016-07-27')
('Open', '2016-07-27')
文檔內容是否對您有幫助?