日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過Spark導入數據

云數據庫 SelectDB 版兼容Apache Doris,支持通過Spark Doris Connector,利用Spark的分布式計算能力導入大批量數據。本文介紹使用Spark Doris Connector同步數據至云數據庫 SelectDB 版的基本原理和使用方式。

功能介紹

Spark Doris Connector是云數據庫 SelectDB 版導入大批量數據的方式之一。基于Spark的分布式計算能力,您可以將上游數據源(MySQL、PostgreSQL、HDFS、S3等)中的大量數據讀取到DataFrame中,再通過Spark Doris Connector導入到云數據庫 SelectDB 版表中。同時,您也可以使用Spark的JDBC方式來讀取云數據庫 SelectDB 版表中的數據。

工作原理

云數據庫 SelectDB 版通過Spark Doris Connector導入數據的工作原理如下圖所示。在這種架構下,Spark Doris Connector通常作為外部數據寫入到云數據庫 SelectDB 版的橋梁,利用其分布式計算集群對數據做預處理,加速了整個數據鏈路的數據流動,從而替代了傳統的低性能JDBC連接寫入方式。

image

前提條件

若使用Spark Doris Connector進行數據導入,必須確保使用的Connector包版本為 1.3.1 及之上。

引入Spark Doris Connector依賴

可以選擇如下任一的方式獲取Doris Connector依賴。

  • 采用Maven時,引入依賴的方式如下所示。更多依賴版本請參見Maven倉庫

    <dependency>
        <groupId>org.apache.doris</groupId>
        <artifactId>spark-doris-connector-3.2_2.12</artifactId>
        <version>1.3.2</version>
    </dependency>
  • 通過JAR包的方式引入Connector。

    以下為您提供了三個常用的Connector,建議根據Spark版本選擇對應的Connector包。更多依賴版本請參見Maven倉庫

    說明
    • 下述JAR包使用Java 8進行編譯,如果您有其他版本的需求,請聯系云數據庫 SelectDB 版技術支持。

    • 下述列表中Connector列從左到右版本依次含義為該jar包支持的Spark版本、使用的Scala版本以及Connector版本。

    Connector

    Runtime JAR

    2.4-2.12-1.3.2

    spark-doris-connector-2.4_2.12-1.3.2

    3.1-2.12-1.3.2

    spark-doris-connector-3.1_2.12-1.3.2

    3.2-2.12-1.3.2

    spark-doris-connector-3.2_2.12-1.3.2

    獲取到Jar包后,可通過如下方式使用:

    • Local方式運行Spark,將下載的JAR包放置于Spark安裝目錄的jars目錄下。

    • Yarn集群模式運行Spark,將JAR文件放入預部署包中。示例如下:

      1. 將spark-doris-connector-3.2_2.12-1.3.2.jar上傳到HDFS。

        hdfs dfs -mkdir /spark-jars/ 
        hdfs dfs -put /<your_local_path>/spark-doris-connector-3.2_2.12-1.3.2.jar/spark-jars/
      2. 在集群中添加spark-doris-connector-3.2_2.12-1.3.2.jar依賴。

        spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.3.2.jar

使用方式

在Spark Client運行Spark后,或者引入Connector包進入Spark開發環境后,您可以通過Spark SQL方式或者Dataframe方式進行數據的同步操作。以下為如何將上游的Spark數據同步到云數據庫 SelectDB 版的示例。

Spark SQL方式

val selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"
  
CREATE TEMPORARY VIEW test_order
USING doris
OPTIONS(
 "table.identifier"="${selectdbTable}",
 "fenodes"="${selectdbHttpPort}",
 "user"="${selectdbUser}",
 "password"="${selectdbPwd}",
 "sink.properties.format"="json"
);

INSERT INTO test_order SELECT order_id,order_amount,order_status FROM tmp_tb;

參數配置項說明

參數

默認值

是否必填

描述

fenodes

云數據庫 SelectDB 版實例的HTTP協議訪問地址。

您可以從云數據庫 SelectDB 版控制臺的實例詳情 > 網絡信息中獲取VPC地址(或公網地址)和HTTP協議端口

示例:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

table.identifier

云數據庫 SelectDB 版實例的表名,格式為:庫名.表名。例如:test_db.test_table

request.retries

3

向SelectDB發送請求的重試次數

request.connect.timeout.ms

30000

向SelectDB發送請求的連接超時時間

request.read.timeout.ms

30000

向SelectDB發送請求的讀取超時時間

request.query.timeout.s

3600

查詢SelectDB的超時時間,默認值為1小時,-1表示無超時限制

request.tablet.size

Integer.MAX_VALUE

一個RDD Partition對應的SelectDB Tablet個數。

此數值設置越小,則會生成越多的Partition。從而提升Spark側的并行度,但同時會對SelectDB造成更大的壓力。

read.field

讀取SelectDB表的列名列表,多列之間使用逗號分隔

batch.size

1024

一次從BE讀取數據的最大行數。增大此數值可減少Spark與SelectDB之間建立連接的次數。從而減輕網絡延遲所帶來的額外時間開銷。

exec.mem.limit

2147483648

單個查詢的內存限制。默認為 2GB,單位為字節。

deserialize.arrow.async

false

是否支持異步轉換Arrow格式到spark-doris-connector迭代所需的RowBatch。

deserialize.queue.size

64

異步轉換Arrow格式的內部處理隊列,當deserialize.arrow.async為true時生效。

write.fields

指定寫入SelectDB表的字段或者字段順序,多列之間使用逗號分隔。默認寫入時要按照SelectDB表字段順序寫入全部字段。

sink.batch.size

100000

單次寫BE的最大行數。

sink.max-retries

0

寫BE失敗之后的重試次數。

sink.properties.format

csv

Stream Load的數據格式。共支持3種格式:csv,json,arrow。

sink.properties.*

--

Stream Load的導入參數。例如:指定列分隔符:'sink.properties.column_separator' = ','。參數更多信息,請參見Stream Load

sink.task.partition.size

SelectDB寫入任務對應的Partition個數。Spark RDD經過過濾等操作,最后寫入的 Partition 數可能會比較大,但每個Partition對應的記錄數比較少,導致寫入頻率增加和計算資源浪費。

此數值設置越小,可以降低SelectDB寫入頻率,減少SelectDB合并壓力。該參數配合 sink.task.use.repartition使用。

sink.task.use.repartition

false

是否采用repartition方式控制SelectDB寫入Partition數。默認值為 false,采用coalesce方式控制(注意:如果在寫入之前沒有 Spark action 算子,可能會導致整個計算并行度降低)。

如果設置為true,則采用repartition方式(注意:可設置最后Partition數,但會額外增加shuffle開銷)。

sink.batch.interval.ms

50

每個批次sink的間隔時間,單位 ms。

sink.enable-2pc

false

是否開啟兩階段提交。開啟后將會在作業結束時提交事務,而部分任務失敗時會將所有預提交狀態的事務會滾。

sink.auto-redirect

true

是否重定向 StreamLoad 請求。開啟后 StreamLoad 將通過 FE 寫入,不再顯式獲取 BE 信息。

user

訪問云數據庫 SelectDB 版實例的用戶名。

password

訪問云數據庫 SelectDB 版實例的密碼。

filter.query.in.max.count

100

謂詞下推中,in表達式value列表元素最大數量。超過此數量,則in表達式條件過濾在Spark側處理。

ignore-type

指在定臨時視圖中,讀取 schema 時要忽略的字段類型。

例如:'ignore-type'='bitmap,hll'

DataFrame方式

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
  ("1", 100, "待付款"),
  ("2", 200, null),
  ("3", 300, "已收貨")
)).toDF("order_id", "order_amount", "order_status")

df.write
  .format("doris")
  .option("fenodes", selectdbHttpPort)
  .option("table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.batch.size", 100000)
  .option("sink.max-retries", 3)
  .option("sink.properties.file.column_separator", "\t")
  .option("sink.properties.file.line_delimiter", "\n")
  .save()

參數配置項說明

參數

默認值

是否必填

描述

fenodes

云數據庫 SelectDB 版實例的HTTP協議訪問地址。

您可以從云數據庫 SelectDB 版控制臺的實例詳情 > 網絡信息中獲取VPC地址(或公網地址)和HTTP協議端口

示例:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

table.identifier

云數據庫 SelectDB 版實例的表名,格式為:庫名.表名。例如:test_db.test_table

request.retries

3

向SelectDB發送請求的重試次數

request.connect.timeout.ms

30000

向SelectDB發送請求的連接超時時間

request.read.timeout.ms

30000

向SelectDB發送請求的讀取超時時間

request.query.timeout.s

3600

查詢SelectDB的超時時間,默認值為1小時,-1表示無超時限制

request.tablet.size

Integer.MAX_VALUE

一個RDD Partition對應的SelectDB Tablet個數。

此數值設置越小,則會生成越多的Partition。從而提升Spark側的并行度,但同時會對SelectDB造成更大的壓力。

read.field

讀取SelectDB表的列名列表,多列之間使用逗號分隔

batch.size

1024

一次從BE讀取數據的最大行數。增大此數值可減少Spark與SelectDB之間建立連接的次數。從而減輕網絡延遲所帶來的額外時間開銷。

exec.mem.limit

2147483648

單個查詢的內存限制。默認為 2GB,單位為字節。

deserialize.arrow.async

false

是否支持異步轉換Arrow格式到spark-doris-connector迭代所需的RowBatch。

deserialize.queue.size

64

異步轉換Arrow格式的內部處理隊列,當deserialize.arrow.async為true時生效。

write.fields

指定寫入SelectDB表的字段或者字段順序,多列之間使用逗號分隔。默認寫入時要按照SelectDB表字段順序寫入全部字段。

sink.batch.size

100000

單次寫BE的最大行數。

sink.max-retries

0

寫BE失敗之后的重試次數。

sink.properties.format

csv

Stream Load的數據格式。共支持3種格式:csv,json,arrow。

sink.properties.*

--

Stream Load的導入參數。例如:指定列分隔符:'sink.properties.column_separator' = ','。更多參數請參考:Stream Load

sink.task.partition.size

SelectDB寫入任務對應的Partition個數。Spark RDD經過過濾等操作,最后寫入的 Partition 數可能會比較大,但每個Partition對應的記錄數比較少,導致寫入頻率增加和計算資源浪費。

此數值設置越小,可以降低SelectDB寫入頻率,減少SelectDB合并壓力。該參數配合 sink.task.use.repartition使用。

sink.task.use.repartition

false

是否采用repartition方式控制SelectDB寫入Partition數。默認值為 false,采用coalesce方式控制(注意:如果在寫入之前沒有 Spark action 算子,可能會導致整個計算并行度降低)。

如果設置為true,則采用repartition方式(注意:設置最后Partition數,但會額外增加shuffle開銷)。

sink.batch.interval.ms

50

每個批次sink的間隔時間,單位 ms。

sink.enable-2pc

false

是否開啟兩階段提交。開啟后將會在作業結束時提交事務,而部分任務失敗時會將所有預提交狀態的事務會滾。

sink.auto-redirect

true

是否重定向 StreamLoad 請求。開啟后 StreamLoad 將通過 FE 寫入, 不再顯式獲取 BE 信息。

user

訪問云數據庫 SelectDB 版實例的用戶名。

password

訪問云數據庫 SelectDB 版實例的密碼。

filter.query.in.max.count

100

謂詞下推中,in表達式value列表元素最大數量。超過此數量,則in表達式條件過濾在Spark側處理。

ignore-type

指在定臨時視圖中,讀取 schema 時要忽略的字段類型。

例如:'ignore-type'='bitmap,hll'

sink.streaming.passthrough

false

將第一列的值不經過處理直接寫入。

使用示例

示例環境中各個軟件的版本如下:

軟件

Java

Spark

Scala

SelectDB

版本

1.8

3.1.2

2.12

3.0.4

環境準備

  • 配置Spark環境。

    1. 下載并解壓Spark安裝包。本示例中使用Spark安裝包:spark-3.1.2-bin-hadoop3.2.tgz。

      wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
      tar xvzf spark-3.1.2-bin-hadoop3.2.tgz
    2. 將spark-doris-connector-3.2_2.12-1.3.2.jar放到SPARK_HOME/jars目錄下。

  • 構造需要導入的數據。本文以MySQL為例,構造少量樣例數據來完成導入。

    1. 創建MySQL測試表。

      CREATE TABLE `employees` (
        `emp_no` int NOT NULL,
        `birth_date` date NOT NULL,
        `first_name` varchar(14) NOT NULL,
        `last_name` varchar(16) NOT NULL,
        `gender` enum('M','F') NOT NULL,
        `hire_date` date NOT NULL,
        PRIMARY KEY (`emp_no`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
    2. 使用DMS構建測試數據,詳情請參見測試數據構建

  • 配置云數據庫 SelectDB 版實例。

    1. 創建云數據庫 SelectDB 版實例,詳情請參見創建實例

    2. 通過MySQL協議連接云數據庫 SelectDB 版實例,詳情請參見連接實例

    3. 創建測試數據庫和測試表。

      1. 創建測試數據庫。

        CREATE DATABASE test_db;
      2. 創建測試表。

        USE test_db;
        CREATE TABLE employees (
            emp_no       int NOT NULL,
            birth_date   date,
            first_name   varchar(20),
            last_name    varchar(20),
            gender       char(2),
            hire_date    date
        )
        UNIQUE KEY(`emp_no`)
        DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;
    4. 開通云數據庫 SelectDB 版公網地址,詳情請參見申請和釋放公網地址

    5. 將Spark環境的公網IP添加到IP白名單中,詳情請參見設置白名單

同步MySQL數據到SelectDB

Spark SQL方式

本示例為如何使用Spark SQL方式將上游的MySQL數據導入至云數據庫 SelectDB 版

  1. 啟動spark-sql服務。

    bin/spark-sql
  2. 在spark-sql上提交任務。

    CREATE TEMPORARY VIEW mysql_tbl
    USING jdbc
    OPTIONS(
     "url"="jdbc:mysql://host:port/test_db",
     "dbtable"="employees",
     "driver"="com.mysql.jdbc.Driver",
     "user"="admin",
     "password"="****"
    );
    
    CREATE TEMPORARY VIEW selectdb_tbl
    USING doris
    OPTIONS(
     "table.identifier"="test_db.employees",
     "fenodes"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080",
     "user"="admin",
     "password"="****",
     "sink.properties.format"="json"
    );
    
    INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;
  3. Spark任務執行完成后,登錄云數據庫 SelectDB 版,查看通過Spark導入的數據。

DataFrame方式

本示例為如何使用DataFrame方式將上游的MySQL數據導入至云數據庫 SelectDB 版

  1. 啟動spark-shell服務。

    bin/spark-shell
  2. 在spark-shell上提交任務。

    val mysqlDF = spark.read.format("jdbc")
    	.option("url", "jdbc:mysql://host:port/test_db")
    	.option("dbtable", "employees")
    	.option("driver", "com.mysql.jdbc.Driver")
    	.option("user", "admin")
    	.option("password", "****")
    	.load()
    
    mysqlDF.write.format("doris")
    	.option("fenodes", "host:httpPort")
    	.option("table.identifier", "test_db.employees")
    	.option("user", "admin")
    	.option("password", "****")
    	.option("sink.batch.size", 100000)
    	.option("sink.max-retries", 3)
    	.option("sink.properties.format", "json")
    	.save()
  3. Spark任務執行完成后,登錄云數據庫 SelectDB 版,查看通過Spark導入的數據。