本文為您介紹Spark如何讀取Hologres表數據。
Hologres表全量數據
Spark讀取Hologres表全量數據通過JDBC接口,JDBC的Driver需要使用PostgreSQL驅動,請至官網下載PostgreSQL JDBC Driver,需要使用42.2.25以上版本的JDBC驅動,詳情請參見JDBC。
使用spark-submit、spark-shell、spark-sql命令訪問Hologres時,需要加上PostgreSQL驅動依賴到classpath,即增加命令--driver-class-path <postgresql-**.jar>--jars <postgresql-**.jar>
,其中<postgresql-**.jar>
是下載的PostgreSQL JDBC Driver的路徑。
例如,下載的PostgreSQL驅動的路徑為/home/hadoop/postgresql-42.6.0.jar
。
spark-submit
spark-submit --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar --class ***
spark-shell
spark-Shell --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar
spark-sql
spark-sql --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar
讀取Hologres表全量數據示例如下。
spark-scala-dataframe全量讀取
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db")
.option("dbtable", "tablename")
.option("user", "ram ak")
.option("password", "ram ak secret")
.load()
jdbcDF.show(1000)
部分參數含義如下:
url
:本示例為jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db
,其中hgpostcn****.hologres.aliyuncs.com:80
為您Hologres實例的網絡地址。您可以在Hologres管理控制臺的實例詳情頁獲取網絡地址。user
:Hologres賬號的AccessKey ID。password
:Hologres賬號的AccessKey Secret。
option
更多配置,請參見JDBC To Other Databases。
spark-sql全量讀取
CREATE TABLE holo_test
USING jdbc2
OPTIONS(url='jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db',
driver='org.postgresql.Driver',
dbtable='test_table',
user='ram ak',
password='ram ak secret'
);
desc holo_test;
select * from holo_test;
Hologres表增量數據
Hologres增量數據詳情,請參見訂閱Hologres Binlog。
spark-streaming訪問Hologres增量數據時,需要添加一些Hologres相關的依賴包到classpath上。添加內容如下所示。
Spark2依賴
--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar, /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar
Spark3依賴
--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar,/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar
讀取Hologres表增量數據示例如下。
spark-structured-streaming增量讀取
//讀取Hologres增量數據。
val df = spark
.readStream
.format("hologres")
.option(url, 'jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db')
.option(username, 'ram ak')
.option(password, 'ram ak secret')
.option(tablename, 'test_1')
.option(starttime, '2022-04-19 10:00:00')
.load()
//寫入到delta中。
df.writeStream
.outputMode("append")
.format("delta")
.start()
spark-streaming-sql增量讀取
drop table if exists holo;
CREATE TABLE if not exists holo
USING hologres
OPTIONS(url='jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db',
username='ram ak',
password='ram ak secret',
tablename='test_1',
starttime='2022-04-19 10:00:00',
max.offset.per.trigger="1");
desc holo;
drop table if exists holo_sink;
create table if not exists holo_sink(id int, name string) using delta;
create scan holo_scan
on holo
using stream
;
create stream holo_test
options(
checkpointLocation='file:///tmp/',
outputMode='Append',
triggerType='ProcessingTime',
triggerIntervalMs='3000')
insert into holo_sink
select id, name from holo_scan;