云數據庫 SelectDB 版兼容Apache Doris,支持通過Spark Doris Connector,利用Spark的分布式計算能力導入大批量數據。本文介紹使用Spark Doris Connector同步數據至云數據庫 SelectDB 版的基本原理和使用方式。
功能介紹
Spark Doris Connector是云數據庫 SelectDB 版導入大批量數據的方式之一。基于Spark的分布式計算能力,您可以將上游數據源(MySQL、PostgreSQL、HDFS、S3等)中的大量數據讀取到DataFrame中,再通過Spark Doris Connector導入到云數據庫 SelectDB 版表中。同時,您也可以使用Spark的JDBC方式來讀取云數據庫 SelectDB 版表中的數據。
工作原理
云數據庫 SelectDB 版通過Spark Doris Connector導入數據的工作原理如下圖所示。在這種架構下,Spark Doris Connector通常作為外部數據寫入到云數據庫 SelectDB 版的橋梁,利用其分布式計算集群對數據做預處理,加速了整個數據鏈路的數據流動,從而替代了傳統的低性能JDBC連接寫入方式。
前提條件
若使用Spark Doris Connector進行數據導入,必須確保使用的Connector包版本為 1.3.1 及之上。
引入Spark Doris Connector依賴
可以選擇如下任一的方式獲取Doris Connector依賴。
采用Maven時,引入依賴的方式如下所示。更多依賴版本請參見Maven倉庫。
<dependency> <groupId>org.apache.doris</groupId> <artifactId>spark-doris-connector-3.2_2.12</artifactId> <version>1.3.2</version> </dependency>
通過JAR包的方式引入Connector。
以下為您提供了三個常用的Connector,建議根據Spark版本選擇對應的Connector包。更多依賴版本請參見Maven倉庫。
說明下述JAR包使用Java 8進行編譯,如果您有其他版本的需求,請聯系云數據庫 SelectDB 版技術支持。
下述列表中Connector列從左到右版本依次含義為該jar包支持的Spark版本、使用的Scala版本以及Connector版本。
Connector
Runtime JAR
2.4-2.12-1.3.2
3.1-2.12-1.3.2
3.2-2.12-1.3.2
獲取到Jar包后,可通過如下方式使用:
Local方式運行Spark,將下載的JAR包放置于Spark安裝目錄的jars目錄下。
Yarn集群模式運行Spark,將JAR文件放入預部署包中。示例如下:
將spark-doris-connector-3.2_2.12-1.3.2.jar上傳到HDFS。
hdfs dfs -mkdir /spark-jars/ hdfs dfs -put /<your_local_path>/spark-doris-connector-3.2_2.12-1.3.2.jar/spark-jars/
在集群中添加spark-doris-connector-3.2_2.12-1.3.2.jar依賴。
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.3.2.jar
使用方式
在Spark Client運行Spark后,或者引入Connector包進入Spark開發環境后,您可以通過Spark SQL方式或者Dataframe方式進行數據的同步操作。以下為如何將上游的Spark數據同步到云數據庫 SelectDB 版的示例。
Spark SQL方式
val selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"
CREATE TEMPORARY VIEW test_order
USING doris
OPTIONS(
"table.identifier"="${selectdbTable}",
"fenodes"="${selectdbHttpPort}",
"user"="${selectdbUser}",
"password"="${selectdbPwd}",
"sink.properties.format"="json"
);
INSERT INTO test_order SELECT order_id,order_amount,order_status FROM tmp_tb;
參數配置項說明
參數 | 默認值 | 是否必填 | 描述 |
fenodes | 無 | 是 | 云數據庫 SelectDB 版實例的HTTP協議訪問地址。 您可以從云數據庫 SelectDB 版控制臺的實例詳情 > 網絡信息中獲取VPC地址(或公網地址)和HTTP協議端口。 示例: |
table.identifier | 無 | 是 | 云數據庫 SelectDB 版實例的表名,格式為: |
request.retries | 3 | 否 | 向SelectDB發送請求的重試次數 |
request.connect.timeout.ms | 30000 | 否 | 向SelectDB發送請求的連接超時時間 |
request.read.timeout.ms | 30000 | 否 | 向SelectDB發送請求的讀取超時時間 |
request.query.timeout.s | 3600 | 否 | 查詢SelectDB的超時時間,默認值為1小時,-1表示無超時限制 |
request.tablet.size | Integer.MAX_VALUE | 否 | 一個RDD Partition對應的SelectDB Tablet個數。 此數值設置越小,則會生成越多的Partition。從而提升Spark側的并行度,但同時會對SelectDB造成更大的壓力。 |
read.field | 無 | 否 | 讀取SelectDB表的列名列表,多列之間使用逗號分隔 |
batch.size | 1024 | 否 | 一次從BE讀取數據的最大行數。增大此數值可減少Spark與SelectDB之間建立連接的次數。從而減輕網絡延遲所帶來的額外時間開銷。 |
exec.mem.limit | 2147483648 | 否 | 單個查詢的內存限制。默認為 2GB,單位為字節。 |
deserialize.arrow.async | false | 否 | 是否支持異步轉換Arrow格式到spark-doris-connector迭代所需的RowBatch。 |
deserialize.queue.size | 64 | 否 | 異步轉換Arrow格式的內部處理隊列,當 |
write.fields | 無 | 否 | 指定寫入SelectDB表的字段或者字段順序,多列之間使用逗號分隔。默認寫入時要按照SelectDB表字段順序寫入全部字段。 |
sink.batch.size | 100000 | 否 | 單次寫BE的最大行數。 |
sink.max-retries | 0 | 否 | 寫BE失敗之后的重試次數。 |
sink.properties.format | csv | 否 | Stream Load的數據格式。共支持3種格式:csv,json,arrow。 |
sink.properties.* | -- | 否 | Stream Load的導入參數。例如:指定列分隔符: |
sink.task.partition.size | 無 | 否 | SelectDB寫入任務對應的Partition個數。Spark RDD經過過濾等操作,最后寫入的 Partition 數可能會比較大,但每個Partition對應的記錄數比較少,導致寫入頻率增加和計算資源浪費。 此數值設置越小,可以降低SelectDB寫入頻率,減少SelectDB合并壓力。該參數配合 |
sink.task.use.repartition | false | 否 | 是否采用repartition方式控制SelectDB寫入Partition數。默認值為 false,采用coalesce方式控制(注意:如果在寫入之前沒有 Spark action 算子,可能會導致整個計算并行度降低)。 如果設置為true,則采用repartition方式(注意:可設置最后Partition數,但會額外增加shuffle開銷)。 |
sink.batch.interval.ms | 50 | 否 | 每個批次sink的間隔時間,單位 ms。 |
sink.enable-2pc | false | 否 | 是否開啟兩階段提交。開啟后將會在作業結束時提交事務,而部分任務失敗時會將所有預提交狀態的事務會滾。 |
sink.auto-redirect | true | 否 | 是否重定向 StreamLoad 請求。開啟后 StreamLoad 將通過 FE 寫入,不再顯式獲取 BE 信息。 |
user | 無 | 是 | 訪問云數據庫 SelectDB 版實例的用戶名。 |
password | 無 | 是 | 訪問云數據庫 SelectDB 版實例的密碼。 |
filter.query.in.max.count | 100 | 否 | 謂詞下推中,in表達式value列表元素最大數量。超過此數量,則in表達式條件過濾在Spark側處理。 |
ignore-type | 無 | 否 | 指在定臨時視圖中,讀取 schema 時要忽略的字段類型。 例如: |
DataFrame方式
val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
("1", 100, "待付款"),
("2", 200, null),
("3", 300, "已收貨")
)).toDF("order_id", "order_amount", "order_status")
df.write
.format("doris")
.option("fenodes", selectdbHttpPort)
.option("table.identifier", selectdbTable)
.option("user", selectdbUser)
.option("password", selectdbPwd)
.option("sink.batch.size", 100000)
.option("sink.max-retries", 3)
.option("sink.properties.file.column_separator", "\t")
.option("sink.properties.file.line_delimiter", "\n")
.save()
參數配置項說明
參數 | 默認值 | 是否必填 | 描述 |
fenodes | 無 | 是 | 云數據庫 SelectDB 版實例的HTTP協議訪問地址。 您可以從云數據庫 SelectDB 版控制臺的實例詳情 > 網絡信息中獲取VPC地址(或公網地址)和HTTP協議端口。 示例: |
table.identifier | 無 | 是 | 云數據庫 SelectDB 版實例的表名,格式為: |
request.retries | 3 | 否 | 向SelectDB發送請求的重試次數 |
request.connect.timeout.ms | 30000 | 否 | 向SelectDB發送請求的連接超時時間 |
request.read.timeout.ms | 30000 | 否 | 向SelectDB發送請求的讀取超時時間 |
request.query.timeout.s | 3600 | 否 | 查詢SelectDB的超時時間,默認值為1小時,-1表示無超時限制 |
request.tablet.size | Integer.MAX_VALUE | 否 | 一個RDD Partition對應的SelectDB Tablet個數。 此數值設置越小,則會生成越多的Partition。從而提升Spark側的并行度,但同時會對SelectDB造成更大的壓力。 |
read.field | 無 | 否 | 讀取SelectDB表的列名列表,多列之間使用逗號分隔 |
batch.size | 1024 | 否 | 一次從BE讀取數據的最大行數。增大此數值可減少Spark與SelectDB之間建立連接的次數。從而減輕網絡延遲所帶來的額外時間開銷。 |
exec.mem.limit | 2147483648 | 否 | 單個查詢的內存限制。默認為 2GB,單位為字節。 |
deserialize.arrow.async | false | 否 | 是否支持異步轉換Arrow格式到spark-doris-connector迭代所需的RowBatch。 |
deserialize.queue.size | 64 | 否 | 異步轉換Arrow格式的內部處理隊列,當 |
write.fields | 無 | 否 | 指定寫入SelectDB表的字段或者字段順序,多列之間使用逗號分隔。默認寫入時要按照SelectDB表字段順序寫入全部字段。 |
sink.batch.size | 100000 | 否 | 單次寫BE的最大行數。 |
sink.max-retries | 0 | 否 | 寫BE失敗之后的重試次數。 |
sink.properties.format | csv | 否 | Stream Load的數據格式。共支持3種格式:csv,json,arrow。 |
sink.properties.* | -- | 否 | Stream Load的導入參數。例如:指定列分隔符: |
sink.task.partition.size | 無 | 否 | SelectDB寫入任務對應的Partition個數。Spark RDD經過過濾等操作,最后寫入的 Partition 數可能會比較大,但每個Partition對應的記錄數比較少,導致寫入頻率增加和計算資源浪費。 此數值設置越小,可以降低SelectDB寫入頻率,減少SelectDB合并壓力。該參數配合 |
sink.task.use.repartition | false | 否 | 是否采用repartition方式控制SelectDB寫入Partition數。默認值為 false,采用coalesce方式控制(注意:如果在寫入之前沒有 Spark action 算子,可能會導致整個計算并行度降低)。 如果設置為true,則采用repartition方式(注意:設置最后Partition數,但會額外增加shuffle開銷)。 |
sink.batch.interval.ms | 50 | 否 | 每個批次sink的間隔時間,單位 ms。 |
sink.enable-2pc | false | 否 | 是否開啟兩階段提交。開啟后將會在作業結束時提交事務,而部分任務失敗時會將所有預提交狀態的事務會滾。 |
sink.auto-redirect | true | 否 | 是否重定向 StreamLoad 請求。開啟后 StreamLoad 將通過 FE 寫入, 不再顯式獲取 BE 信息。 |
user | 無 | 是 | 訪問云數據庫 SelectDB 版實例的用戶名。 |
password | 無 | 是 | 訪問云數據庫 SelectDB 版實例的密碼。 |
filter.query.in.max.count | 100 | 否 | 謂詞下推中,in表達式value列表元素最大數量。超過此數量,則in表達式條件過濾在Spark側處理。 |
ignore-type | 無 | 否 | 指在定臨時視圖中,讀取 schema 時要忽略的字段類型。 例如: |
sink.streaming.passthrough | false | 否 | 將第一列的值不經過處理直接寫入。 |
使用示例
示例環境中各個軟件的版本如下:
軟件 | Java | Spark | Scala | SelectDB |
版本 | 1.8 | 3.1.2 | 2.12 | 3.0.4 |
環境準備
配置Spark環境。
下載并解壓Spark安裝包。本示例中使用Spark安裝包:spark-3.1.2-bin-hadoop3.2.tgz。
wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz tar xvzf spark-3.1.2-bin-hadoop3.2.tgz
將spark-doris-connector-3.2_2.12-1.3.2.jar放到SPARK_HOME/jars目錄下。
構造需要導入的數據。本文以MySQL為例,構造少量樣例數據來完成導入。
創建MySQL測試表。
CREATE TABLE `employees` ( `emp_no` int NOT NULL, `birth_date` date NOT NULL, `first_name` varchar(14) NOT NULL, `last_name` varchar(16) NOT NULL, `gender` enum('M','F') NOT NULL, `hire_date` date NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
使用DMS構建測試數據,詳情請參見測試數據構建。
配置云數據庫 SelectDB 版實例。
創建云數據庫 SelectDB 版實例,詳情請參見創建實例。
通過MySQL協議連接云數據庫 SelectDB 版實例,詳情請參見連接實例。
創建測試數據庫和測試表。
創建測試數據庫。
CREATE DATABASE test_db;
創建測試表。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;
開通云數據庫 SelectDB 版公網地址,詳情請參見申請和釋放公網地址。
將Spark環境的公網IP添加到IP白名單中,詳情請參見設置白名單。
同步MySQL數據到SelectDB
Spark SQL方式
本示例為如何使用Spark SQL方式將上游的MySQL數據導入至云數據庫 SelectDB 版。
啟動spark-sql服務。
bin/spark-sql
在spark-sql上提交任務。
CREATE TEMPORARY VIEW mysql_tbl USING jdbc OPTIONS( "url"="jdbc:mysql://host:port/test_db", "dbtable"="employees", "driver"="com.mysql.jdbc.Driver", "user"="admin", "password"="****" ); CREATE TEMPORARY VIEW selectdb_tbl USING doris OPTIONS( "table.identifier"="test_db.employees", "fenodes"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080", "user"="admin", "password"="****", "sink.properties.format"="json" ); INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;
Spark任務執行完成后,登錄云數據庫 SelectDB 版,查看通過Spark導入的數據。
DataFrame方式
本示例為如何使用DataFrame方式將上游的MySQL數據導入至云數據庫 SelectDB 版。
啟動spark-shell服務。
bin/spark-shell
在spark-shell上提交任務。
val mysqlDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://host:port/test_db") .option("dbtable", "employees") .option("driver", "com.mysql.jdbc.Driver") .option("user", "admin") .option("password", "****") .load() mysqlDF.write.format("doris") .option("fenodes", "host:httpPort") .option("table.identifier", "test_db.employees") .option("user", "admin") .option("password", "****") .option("sink.batch.size", 100000) .option("sink.max-retries", 3) .option("sink.properties.format", "json") .save()
Spark任務執行完成后,登錄云數據庫 SelectDB 版,查看通過Spark導入的數據。