本文為您介紹Spark如何讀取Hologres表數據。

Hologres表全量數據

Spark讀取Hologres表全量數據通過JDBC接口,JDBC的Driver需要使用PostgreSQL驅動,請至官網下載PostgreSQL JDBC Driver,需要使用42.2.25以上版本的JDBC驅動,詳情請參見JDBC

使用spark-submit、spark-shell、spark-sql命令訪問Hologres時,需要加上PostgreSQL驅動依賴到classpath,即增加命令--driver-class-path <postgresql-**.jar>--jars <postgresql-**.jar>,其中<postgresql-**.jar>是下載的PostgreSQL JDBC Driver的路徑。

例如,下載的PostgreSQL驅動的路徑為/home/hadoop/postgresql-42.6.0.jar。

spark-submit

spark-submit --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar --class ***

spark-shell

spark-Shell --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar

spark-sql

spark-sql --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar

讀取Hologres表全量數據示例如下。

spark-scala-dataframe全量讀取

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db")
  .option("dbtable", "tablename")
  .option("user", "ram ak")
  .option("password", "ram ak secret")
  .load()
jdbcDF.show(1000)
部分參數含義如下:
  • url:本示例為jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db,其中hgpostcn****.hologres.aliyuncs.com:80為您Hologres實例的網絡地址。您可以在Hologres管理控制臺的實例詳情頁獲取網絡地址。Net IP
  • user:Hologres賬號的AccessKey ID。
  • password:Hologres賬號的AccessKey Secret。

option更多配置,請參見JDBC To Other Databases。

spark-sql全量讀取

CREATE TABLE holo_test
USING jdbc2
OPTIONS(url='jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db',
driver='org.postgresql.Driver',
dbtable='test_table',
user='ram ak',
password='ram ak secret'
);

desc holo_test;

select * from holo_test;

Hologres表增量數據

Hologres增量數據詳情,請參見訂閱Hologres Binlog。

spark-streaming訪問Hologres增量數據時,需要添加一些Hologres相關的依賴包到classpath上。添加內容如下所示。

Spark2依賴

--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar  --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar, /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar

Spark3依賴

--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar,/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar

讀取Hologres表增量數據示例如下。

spark-structured-streaming增量讀取

//讀取Hologres增量數據。
val df = spark
  .readStream
  .format("hologres")
  .option(url, 'jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db')
  .option(username, 'ram ak')
  .option(password, 'ram ak secret')
  .option(tablename, 'test_1')
  .option(starttime, '2022-04-19 10:00:00')
  .load()

//寫入到delta中。
df.writeStream
    .outputMode("append")
  .format("delta")
  .start()
                

spark-streaming-sql增量讀取

drop table if exists holo;
CREATE TABLE if not exists holo
USING hologres
OPTIONS(url='jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db',
    username='ram ak',
    password='ram ak secret',
    tablename='test_1',
    starttime='2022-04-19 10:00:00',
    max.offset.per.trigger="1");

desc holo;

drop table if exists holo_sink;
create table if not exists holo_sink(id int, name string) using delta;


create scan holo_scan
on holo
using stream
;

create stream holo_test
options(
checkpointLocation='file:///tmp/',
outputMode='Append',
triggerType='ProcessingTime',
triggerIntervalMs='3000')
insert into holo_sink
select  id,  name  from holo_scan;