Spark實(shí)時(shí)同步
本文為您介紹如何通過Spark讀取或?qū)懭霐?shù)據(jù)至Hologres的操作方法。
背景信息
Spark是用于大規(guī)模數(shù)據(jù)處理的統(tǒng)一分析引擎,Hologres已經(jīng)與Spark(社區(qū)版以及EMR Spark版)高效打通,快速助力企業(yè)搭建數(shù)據(jù)倉庫。Hologres提供的Spark Connector,支持Spark以批處理的方式將數(shù)據(jù)寫入Hologres,同時(shí)Spark支持讀取多種數(shù)據(jù)源(例如文件、Hive、MySQL、PostgreSQL等)。
Hologres兼容PostgreSQL,因此Spark也可以用讀取PostgreSQL的方式直接讀取Hologres數(shù)據(jù),進(jìn)行ETL處理,再寫入Hologres及其他數(shù)據(jù)源,完成大數(shù)據(jù)開發(fā)抽取、處理、加載的完整閉環(huán)。
前提條件
實(shí)例版本需為V0.9及以上版本。請?jiān)贖ologres管控臺的實(shí)例詳情頁查看當(dāng)前實(shí)例版本,如實(shí)例是V0.9以下版本,請您使用自助升級或加入Hologres釘釘交流群反饋,詳情請參見如何獲取更多的在線支持?。
需要安裝對應(yīng)版本的Spark環(huán)境,能夠運(yùn)行
spark-shell
命令。
連接數(shù)使用
Hologres Spark Connector在進(jìn)行讀寫時(shí),會(huì)使用一定的JDBC連接數(shù)。可能受到如下因素影響:
Spark的并發(fā)特性,在作業(yè)運(yùn)行過程中,可以通過Spark UI觀察到并行執(zhí)行的Task數(shù)量。
Connector在操作時(shí),對于固定復(fù)制(fixed copy)方式的寫入,每個(gè)并發(fā)操作都只使用一個(gè)JDBC連接。而采用INSERT方式寫入時(shí),每個(gè)并發(fā)則會(huì)利用write_thread_size數(shù)量的JDBC連接。在進(jìn)行讀取操作時(shí),每個(gè)并發(fā)同樣使用一個(gè)JDBC連接。
其他方面可能使用的連接數(shù):在作業(yè)啟動(dòng)時(shí),可能會(huì)有獲取Schema信息的操作,這可能會(huì)短暫地建立1個(gè)連接。
因此作業(yè)使用的總連接數(shù)可以通過如下公式計(jì)算:
fixed copy模式:
parallelism*1+1
普通INSERT模式:
parallelism*write_thread_size+1
Spark Task并發(fā)可能受到用戶設(shè)置的參數(shù)影響,也可能受到Hadoop對文件分塊策略的影響。
通過Spark Connector寫入(推薦使用)
Hologres當(dāng)前支持使用內(nèi)置的Spark Connector將Spark數(shù)據(jù)寫入Hologres,相比其他寫入方式,調(diào)用基于Holo Client實(shí)現(xiàn)Connector寫入的方式性能更優(yōu)。具體操作步驟如下,阿里云也為您提供了相關(guān)的使用示例,詳情請參見通過Spark Connector寫入使用示例。
準(zhǔn)備工作
獲取JAR包。
Spark2和Spark3上均已支持Connector寫入,Spark寫入Hologres時(shí)需要引用connector的JAR包,當(dāng)前已經(jīng)發(fā)布到Maven中央倉庫,在項(xiàng)目中參照如下pom文件進(jìn)行配置。
說明相關(guān)Connector也已開源,詳情請參見hologres-connectors。
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-connector-spark-3.x</artifactId> <version>1.4.0</version> <classifier>jar-with-dependencies</classifier> </dependency>
當(dāng)前Hologres已自動(dòng)生成JAR文件,下載鏈接如下。
使用JAR包。
啟動(dòng)Spark時(shí)執(zhí)行以下命令。
spark-shell --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
或者使用pyspark:
pyspark --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
通過Spark Connector寫入使用示例
根據(jù)如下示例步驟為您介紹,如何通過Spark Connector將數(shù)據(jù)寫入Hologres。
創(chuàng)建Hologres表。
在Hologres中執(zhí)行如下SQL命令創(chuàng)建目標(biāo)表,用來接收數(shù)據(jù)。
CREATE TABLE tb008 ( id BIGINT primary key, counts INT, name TEXT, price NUMERIC(38, 18), out_of_stock BOOL, weight DOUBLE PRECISION, thick FLOAT, time TIMESTAMPTZ, dt DATE, by bytea, inta int4[], longa int8[], floata float4[], doublea float8[], boola boolean[], stringa text[] );
Spark準(zhǔn)備數(shù)據(jù)并寫入Hologres。
在命令行運(yùn)行命令開啟Spark。
spark-shell --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
在spark-shell里使用命令
load spark-test.scala
執(zhí)行測試文件,加載測試示例。spark-test.scala文件示例如下。
import java.sql.{Timestamp, Date} import org.apache.spark.sql.types._ import org.apache.spark.sql.Row val byteArray = Array(1.toByte, 2.toByte, 3.toByte, 'b'.toByte, 'a'.toByte) val intArray = Array(1, 2, 3) val longArray = Array(1L, 2L, 3L) val floatArray = Array(1.2F, 2.44F, 3.77F) val doubleArray = Array(1.222, 2.333, 3.444) val booleanArray = Array(true, false, false) val stringArray = Array("abcd", "bcde", "defg") val data = Seq( Row(-7L, 100, "phone1", BigDecimal(1234.567891234), false, 199.35, 6.7F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("2021-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray), Row(6L, -10, "phone2", BigDecimal(1234.56), true, 188.45, 7.8F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("1970-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray), Row(1L, 10, "phone3\"", BigDecimal(1234.56), true, 111.45, null, Timestamp.valueOf("2020-02-29 00:12:33"), Date.valueOf("2020-07-23"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray) ) val schema = StructType(Array( StructField("id", LongType), StructField("counts", IntegerType), StructField("name", StringType, false), //false表示此Field不允許為null StructField("price", DecimalType(38, 12)), StructField("out_of_stock", BooleanType), StructField("weight", DoubleType), StructField("thick", FloatType), StructField("time", TimestampType), StructField("dt", DateType), StructField("by", BinaryType), StructField("inta", ArrayType(IntegerType)), StructField("longa", ArrayType(LongType)), StructField("floata", ArrayType(FloatType)), StructField("doublea", ArrayType(DoubleType)), StructField("boola", ArrayType(BooleanType)), StructField("stringa", ArrayType(StringType)) )) val df = spark.createDataFrame( spark.sparkContext.parallelize(data), schema ) df.show() //配置導(dǎo)入數(shù)據(jù)至Hologres的信息。 df.write.format("hologres") //必須配置為hologres .option("username", "your_username") //阿里云賬號的AccessKey ID。 .option("password", "your_password") //阿里云賬號的Accesskey SECRET。 .option("endpoint", "Ip:Port") //Hologres的Ip和Port。 .option("database", "test_database") //Hologres的數(shù)據(jù)庫名稱,示例為test_database。 .option("table", "tb008") //Hologres用于接收數(shù)據(jù)的表名稱,示例為tb008。 .option("write_batch_size", 512) // 寫入攢批大小,詳見下方參數(shù)介紹 .option("input_data_schema_ddl", df.schema.toDDL) // Dataframe對應(yīng)的DDL,僅spark3.x需要 .mode(SaveMode.Append) // spark DataFrameWriter接口的SaveMode, 必須為Append;注意與WRITE_MODE不是同一個(gè)參數(shù), 自hologres-connector1.3.3版本開始,支持SaveMode.OverWrite,會(huì)清理原始表中的數(shù)據(jù),請謹(jǐn)慎使用 .save()
查詢寫入的數(shù)據(jù)。
在Hologres側(cè)查詢目標(biāo)表,即可確認(rèn)寫入的數(shù)據(jù),示例如下圖所示。
使用pyspark加載Connector寫入示例
啟動(dòng)pyspark并加載Connector。
pyspark --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
與spark-shell類似,使用元數(shù)據(jù)創(chuàng)建DataFrame之后調(diào)用Connector進(jìn)行寫入。
data = [[1, "Elia"], [2, "Teo"], [3, "Fang"]] df = spark.createDataFrame(data, schema="id LONG, name STRING") df.show() df2.write.format("hologres").option( "username", "your_username").option( "password", "your_password").option( "endpoint", "hologres_endpoint").option( "database", "test_database").option( "table", "tb008").save()
使用Spark SQL加載Connector進(jìn)行寫入
僅Spark3版本的Connector支持SQL方式。
啟動(dòng)Spark SQL并加載Connector。
spark-sql --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
通過Spark SQL DDL,分別創(chuàng)建CSV和Hologres View,進(jìn)行寫入。
CREATE TEMPORARY VIEW csvTable ( c_custkey bigint, c_name string, c_address string, c_nationkey int, c_phone string, c_acctbal decimal(15, 2), c_mktsegment string, c_comment string) USING csv OPTIONS ( path "resources/customer1.tbl", sep "|" ); CREATE TEMPORARY VIEW hologresTable ( c_custkey bigint, c_name string, c_address string, c_nationkey int, c_phone string, c_acctbal decimal(15, 2), c_mktsegment string, c_comment string) USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hologres_endpoint/test_database", username "your_username", password "your_password", table "customer_holo_table", copy_write_mode "true", bulk_load "true", copy_write_format "text" ); -- 目前通過sql創(chuàng)建的hologres view不支持寫入部分列(如insert into hologresTable(c_custkey) select c_custkey from csvTable),寫入時(shí)需要寫入DDL中聲明的所有字段。如果希望寫入部分列,可以建表時(shí)僅聲明需要寫入的字段。 INSERT INTO hologresTable SELECT * FROM csvTable;
通過Spark讀取數(shù)據(jù)源數(shù)據(jù)并寫入Hologres
Spark從數(shù)據(jù)源讀取數(shù)據(jù)。
Spark支持從不同數(shù)據(jù)源讀取數(shù)據(jù),具體數(shù)據(jù)源分類如下。
Spark支持以Hologres為數(shù)據(jù)源。
Hologres兼容PostgreSQL,因?yàn)镾park可以用讀取PostgreSQL的方式讀取Hologres中的數(shù)據(jù)。讀取代碼如下。
說明在使用JDBC方式進(jìn)行讀取前,請前往官網(wǎng)下載Postgresql JDBC Jar,本文以
postgresql-42.2.18
版本為例,在spark-shell啟動(dòng)時(shí)執(zhí)行./bin/spark-shell --jars /path/to/postgresql-42.2.18.jar
加載該jar,可以與hologres-connector的jar包一同加載。// Read from some table, for example: tb008 val readDf = spark.read .format("jdbc") //使用postgresql jdbc driver讀取holo .option("driver","org.postgresql.Driver") .option("url", "jdbc:postgresql://Ip:Por/test_database") .option("dbtable", "tb008") .option("user", "your_username") .option("password", "your_password") .load()
Spark Connector從V1.3.2版本開始支持讀取Hologres,并提供了優(yōu)化的并發(fā)讀取功能。相較于使用PostgreSQL JDBC驅(qū)動(dòng)的方法,還可以設(shè)置讀取的并發(fā)參數(shù),根據(jù)Hologres表的Shard進(jìn)行分片,從而實(shí)現(xiàn)并發(fā)讀取,顯著提升了性能。示例如下。
val spark = SparkSession .builder .appName("ReadFromHologres") .master("local[*]") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val schema = StructType(Array( StructField("id", LongType), StructField("counts", IntegerType), StructField("name", StringType, false), StructField("price", DecimalType(38, 12)), StructField("out_of_stock", BooleanType) )) val readDf = spark.read .format("hologres") .schema(schema) // 可選,如果不指定schema,默認(rèn)讀取holo表全部字段 .option("username", "your_username") .option("password", "your_password") .option("jdbcurl", "jdbc:postgresql://hologres_endpoint/test_db") .option("table", "tb008") .option("scan_parallelism", "10") //讀取Hologres時(shí)的默認(rèn)并發(fā)數(shù),最大為holo表的shardcount .load()
Spark支持其他數(shù)據(jù)源(如Parquet格式的文件)。
Spark支持從其他數(shù)據(jù)源中讀取數(shù)據(jù)寫入Hologres中,例如使用Spark從Hive中讀取數(shù)據(jù),代碼如下。
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val hiveContext = new HiveContext(sc) // Read from some table, for example: phone val readDf = hiveContext.sql("select * from hive_database.phone")
Spark將讀到的數(shù)據(jù)寫入Hologres。
import com.alibaba.hologres.spark2.sink.SourceProvider -- Write to hologres table df.write .format("hologres") .option(SourceProvider.USERNAME, "your_username") .option(SourceProvider.PASSWORD, "your_password") .option(SourceProvider.ENDPOINT, "Ip:Port") .option(SourceProvider.DATABASE, "test_database") .option(SourceProvider.TABLE, table) .option(SourceProvider.WRITE_BATCH_SIZE, 512) -- 寫入攢批大小 .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) -- 僅spark3.x需要 .mode(SaveMode.Append) // 僅spark3.x需要 .save()
通過Spark實(shí)時(shí)寫入數(shù)據(jù)至Hologres
在Hologres創(chuàng)建一張表,用于接收數(shù)據(jù),創(chuàng)建代碼如下。
CREATE TABLE test_table_stream ( value text, count bigint );
讀取本地端口輸入行,進(jìn)行詞頻統(tǒng)計(jì)并實(shí)時(shí)寫入Hologres中,相關(guān)示例代碼如下。
代碼:
val spark = SparkSession .builder .appName("StreamToHologres") .master("local[*]") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() -- Split the lines into words val words = lines.as[String].flatMap(_.split(" ")) -- Generate running word count val wordCounts = words.groupBy("value").count() wordCounts.writeStream .outputMode(OutputMode.Complete()) .format("hologres") .option(SourceProvider.USERNAME, "your_username") .option(SourceProvider.PASSWORD, "your_password") .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/dbname") .option(SourceProvider.TABLE, "test_table_stream") .option("batchsize", 1) .option("isolationLevel", "NONE") .option("checkpointLocation", checkpointLocation) .start() .awaitTermination()
參數(shù)釋義:
參數(shù)名
默認(rèn)值
是否必填
參數(shù)描述
username
無
是
登錄Hologres賬號的AccessKey ID。您可以單擊AccessKey 管理來獲取。
建議您使用環(huán)境變量的方式調(diào)用用戶名和密碼,降低密碼泄露風(fēng)險(xiǎn)。
password
無
是
登錄Hologres賬號的AccessKey Secret。您可以單擊AccessKey 管理來獲取。
建議您使用環(huán)境變量的方式調(diào)用用戶名和密碼,降低密碼泄露風(fēng)險(xiǎn)。
table
無
是
Hologres用于接收數(shù)據(jù)的表名稱。
endpoint
無
與JDBCURL二選一
Hologres實(shí)例的網(wǎng)絡(luò)域名。
您可以進(jìn)入Hologres管理控制臺實(shí)例詳情頁,從網(wǎng)絡(luò)信息獲取主機(jī)和端口號。
database
無
與JDBCURL二選一
Hologres接收數(shù)據(jù)的表所在數(shù)據(jù)庫名稱。
jdbcurl
無
與ENDPOINT+DATABASE組合設(shè)置二選一
Hologres的JDBCURL。
copy_write_mode
true
否
是否使用Fixed Copy方式寫入,F(xiàn)ixed Copy是Hologres V1.3新增的能力,相比INSERT方法,F(xiàn)ixed Copy方式可以更高的吞吐(因?yàn)槭橇髂J剑偷臄?shù)據(jù)延時(shí),更低的客戶端內(nèi)存消耗(因?yàn)椴粩€批)。
說明需要Connector為V1.3.0及以上版本,Hologres引擎版本為V1.3.34及以上版本。
copy_write_format
false
否
僅Copy模式生效,是否進(jìn)行臟數(shù)據(jù)校驗(yàn),打開之后如果有臟數(shù)據(jù),可以定位到寫入失敗的具體行。
說明RecordChecker會(huì)對寫入性能造成一定影響,非排查環(huán)節(jié)不建議開啟。
bulk_load
true
否
是否采用批量Copy方式寫入(與fixed copy不同,fixed copy是流式的)。
說明Hologre V2.1版本對無主鍵表的寫入性能進(jìn)行了優(yōu)化。在Hologre V2.1版本中,無主鍵表的批量寫入操作不再會(huì)導(dǎo)致表鎖,而是采用了行鎖機(jī)制。這可以使其能夠與Fixed Plan并行執(zhí)行,從而提高了數(shù)據(jù)處理的效率和并發(fā)性。
Connector為V1.4.0及以上版本,Hologres引擎需要V2.1.0及以上版本。
max_cell_buffer_size
20971520(20 MB)
否
使用Copy模式寫入時(shí),單個(gè)字段的最大長度。
copy_write_dirty_data_check
false
否
是否進(jìn)行臟數(shù)據(jù)校驗(yàn),打開之后如果有臟數(shù)據(jù),可以定位到寫入失敗的具體行,RecordChecker會(huì)對寫入性能造成一定影響,非排查環(huán)節(jié)不建議開啟。
說明僅Copy模式生效。
copy_write_direct_connect
對于可以直連的環(huán)境會(huì)默認(rèn)使用直連。
否
僅Copy模式生效,Copy的瓶頸往往是VPC Endpoint的網(wǎng)絡(luò)吞吐,因此Hologres會(huì)測試當(dāng)前環(huán)境能否直連holo fe,如果支持則默認(rèn)使用直連。此參數(shù)設(shè)置為false表示不使用直連。
input_data_schema_ddl
無
spark3.x必填,值為
<your_DataFrame>.schema.toDDL
。Spark中DataFrame的DDL。
write_mode
INSERT_OR_REPLACE
否
當(dāng)INSERT目標(biāo)表為有主鍵的表時(shí)采用不同策略。
INSERT_OR_IGNORE:當(dāng)主鍵沖突時(shí),不寫入。
INSERT_OR_UPDATE:當(dāng)主鍵沖突時(shí),更新相應(yīng)列。
INSERT_OR_REPLACE:當(dāng)主鍵沖突時(shí),更新所有列。
write_batch_size
512
否
每個(gè)寫入線程的最大批次大小,在經(jīng)過write_mode合并后的Put數(shù)量達(dá)到write_batch_size時(shí)進(jìn)行一次批量提交。
write_batch_byte_size
2 MB
否
每個(gè)寫入線程的最大批次Byte大小,在經(jīng)過WRITE_MODE合并后的Put數(shù)據(jù)字節(jié)數(shù)達(dá)到WRITE_BATCH_BYTE_SIZE時(shí)進(jìn)行一次批量提交。
write_max_interval_ms
10000 ms
否
距離上次提交超過write_max_interval_ms會(huì)觸發(fā)一次批量提交。
write_fail_strategy
TYR_ONE_BY_ONE
否
當(dāng)某一批次提交失敗時(shí),會(huì)將批次內(nèi)的記錄逐條提交(保序),單條提交失敗的記錄將會(huì)跟隨異常被拋出。
write_thread_size
1
否
寫入并發(fā)線程數(shù)(每個(gè)并發(fā)占用1個(gè)數(shù)據(jù)庫連接)。
在一個(gè)Spark作業(yè)中,占用的總連接數(shù)與Spark并發(fā)相關(guān),關(guān)系為
總連接數(shù)= spark.default.parallelism * WRITE_THREAD_SIZE
。dynamic_partition
false
否
若為true,寫入分區(qū)表父表時(shí),當(dāng)分區(qū)不存在時(shí)自動(dòng)創(chuàng)建分區(qū)。
retry_count
3
否
當(dāng)連接故障時(shí),寫入和查詢的重試次數(shù)。
retry_sleep_init_ms
1000 ms
否
每次重試的等待時(shí)間=retry_sleep_init_ms+retry_count*retry_sleep_step_ms。
retry_sleep_step_ms
10*1000 ms
否
每次重試的等待時(shí)間=retry_sleep_init_ms+retry_count*retry_sleep_step_ms。
connection_max_idle_ms
60000 ms
否
寫入線程和點(diǎn)查線程數(shù)據(jù)庫連接的最大IDLE時(shí)間,超過此時(shí)間的連接將被釋放。
fixed_connection_mode
false
否
非Copy模式(如INSERT默認(rèn))下,寫入和點(diǎn)查場景不占用連接數(shù)。
說明Beta功能,需要Connector版本為V1.2.0及以上版本,Hologres引擎版本為V1.3.0及以上版本。
scan_batch_size
256
否
在從Hologres讀取數(shù)據(jù)時(shí),Scan操作一次獲取的行數(shù)。
scan_timeout_seconds
60
否
在從Hologres讀取數(shù)據(jù)時(shí),Scan操作的超時(shí)時(shí)間,單位:秒(s)。
scan_parallelism
10
否
讀取Hologres時(shí)的分片數(shù)量,最大為Hologres表的Shardcount。作業(yè)運(yùn)行時(shí),這些分片會(huì)被分配到Spark Task上進(jìn)行讀取。
數(shù)據(jù)類型映射
Spark與Hologres的數(shù)據(jù)類型映射如下表所示。
Spark類型 | Hologres類型 |
ShortType | SMALLINT |
IntegerType | INT |
LongType | BIGINT |
StringType | TEXT、JSONB、JSON |
DecimalType | NUMERIC(38, 18) |
BooleanType | BOOL |
DoubleType | DOUBLE PRECISION |
FloatType | FLOAT |
TimestampType | TIMESTAMPTZ |
DateType | DATE |
BinaryType | BYTEA、ROARINGBITMAP |
ArrayType(IntegerType) | int4[] |
ArrayType(LongType) | int8[] |
ArrayType(FloatType | float4[] |
ArrayType(DoubleType) | float8[] |
ArrayType(BooleanType) | boolean[] |
ArrayType(StringType) | text[] |