SeaTunnel集成云數據庫 SelectDB 版,支持使用SeaTunnel SelectDB Sink導入表數據至云數據庫 SelectDB 版。本文將為您介紹使用SeaTunnel SelectDB Sink同步數據至云數據庫 SelectDB 版的使用方式。
概述
SeaTunnel是一款簡單易用、高性能的分布式數據集成平臺,支持海量數據實時同步。您可以通過SeaTunnel平臺讀取MySQL、Hive、Kafka等數據源中的海量數據,然后由SeaTunnel SelectDB Sink將數據寫入到云數據庫 SelectDB 版中。
前提條件
SeaTunnel 2.3.1版本及以上。
使用方式
SeaTunnel支持以JSON格式或CSV格式將上游數據寫入到云數據庫 SelectDB 版,不同寫入方式的配置語法如下。
JSON格式
sink {
SelectDB {
load-url="ip:http_port"
jdbc-url="ip:mysql_port"
cluster-name="Cluster"
table.identifier="test_db.test_table"
username="admin"
password="****"
selectdb.config {
file.type="json"
}
}
}
CSV格式
sink {
SelectDB {
load-url="ip:http_port"
jdbc-url="ip:mysql_port"
cluster-name="Cluster"
table.identifier="test_db.test_table"
username="admin"
password="****"
selectdb.config {
file.type="csv"
file.column_separator=","
file.line_delimiter="\n"
}
}
}
參數說明如下。
參數 | 是否必填 | 說明 |
load-url | 是 | 云數據庫 SelectDB 版實例的訪問地址和HTTP協議端口。 您可以從云數據庫 SelectDB 版控制臺的實例詳情 > 網絡信息中獲取VPC地址(或公網地址)和HTTP協議端口。 示例: |
jdbc-url | 是 | 云數據庫 SelectDB 版實例的訪問地址和MySQL協議端口。 您可以從云數據庫 SelectDB 版控制臺的實例詳情 > 網絡信息中獲取VPC地址(或公網地址)和MySQL協議端口。 示例: |
cluster-name | 是 | 云數據庫 SelectDB 版實例中的集群名稱。實例中可能包含多個集群,可按需選擇。 |
username | 是 | 云數據庫 SelectDB 版實例的用戶名。 |
password | 是 | 云數據庫 SelectDB 版實例對應用戶名的密碼。 |
table.identifier | 是 | 云數據庫 SelectDB 版實例的表名,格式為 |
selectdb.config | 是 | 寫入任務的屬性配置。
|
sink.enable-delete | 否 | 是否開啟批量刪除功能(僅支持Unique表)。 |
sink.buffer-size | 否 | 緩存的最大容量,單位字節,默認為:10MB,當緩存超過最大容量時,會將緩存中的內容全部flush到對象存儲上,不建議修改。 |
sink.buffer-count | 否 | 緩存的最大條數,默認為:10000,當緩存超過最大條數時,會將緩存中的內容全部flush到對象存儲上,不建議修改。 |
sink.max-retries | 否 | Commit階段的最大重試次數。默認3次。 |
sink.enable-2pc | 否 | 是否啟用兩階段提交,以確保exact-once語義。默認為true。 |
使用示例
以MySQL數據源為例,為您介紹如何通過SeaTunnel將上游的MySQL數據導入至云數據庫 SelectDB 版。示例中各軟件版本如下:
環境 | 版本 |
JDK | 1.8 |
SeaTunnel | 2.3.3 |
SelectDB | 3.0.4 |
環境準備
配置SeaTunnel環境。
下載并解壓SeaTunnel安裝包。本示例中使用SeaTunnel安裝包:apache-seatunnel-2.3.3-bin.tar.gz。
wget https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz tar -xzvf apache-seatunnel-2.3.3-bin.tar.gz
修改SEATUNNEL_HOME/config/plugin_config配置文件,保留需要的Connector插件。
--connectors-v2-- connector-cdc-mysql connector-selectdb-cloud connector-jdbc connector-fake connector-console connector-assert --end--
安裝SeaTunnel Connector插件。
sh bin/install-plugin.sh
下載MySQL驅動并放至SEATUNNEL_HOME/jar目錄。
cd lib/ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
構造需要導入的數據。本文以MySQL為例,構造少量樣例數據來完成導入。
創建MySQL測試表。
CREATE TABLE `employees` ( `emp_no` INT NOT NULL, `birth_date` DATE NOT NULL, `first_name` VARCHAR(14) NOT NULL, `last_name` VARCHAR(16) NOT NULL, `gender` ENUM('M','F') NOT NULL, `hire_date` DATE NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
使用DMS構建測試數據,詳情請參見測試數據構建。
配置云數據庫 SelectDB 版實例。
創建云數據庫 SelectDB 版實例,詳情請參見創建實例。
通過MySQL協議連接云數據庫 SelectDB 版實例,詳情請參見連接實例。
創建測試數據庫和測試表。
創建測試數據庫。
CREATE DATABASE test_db;
創建測試表。
USE test_db; CREATE TABLE employees ( emp_no INT NOT NULL, birth_date DATE, first_name VARCHAR(20), last_name VARCHAR(20), gender CHAR(2), hire_date DATE ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
開通云數據庫 SelectDB 版公網地址,詳情請參見申請和釋放公網地址。
將SeaTunnel環境的公網IP添加到IP白名單中,詳情請參見設置白名單。
通過SuaTunnel本地引擎同步MySQL數據到SelectDB
創建配置文件
mysqlToSelectDB.conf
,配置任務信息。env { execution.parallelism = 2 job.mode = "BATCH" checkpoint.interval = 10000 } source{ jdbc { url = "jdbc:mysql://host:ip/test_db" driver = "com.mysql.cj.jdbc.Driver" user = "admin" password = "****" query = "select * from employees" } } sink { SelectDBCloud { load-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:8080" jdbc-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:9030" cluster-name="new_cluster" table.identifier="test_db.employees" username="admin" password="****" selectdb.config { file.type="json" } } }
命令行提交任務。
sh ./bin/seatunnel.sh --config ./mysqlToSelectDB.conf -e local