云數據庫 SelectDB 版集成BitSail,支持使用SelectDB Sink導入表數據至云數據庫 SelectDB 版。本文將為您介紹使用SelectDB Sink同步數據至云數據庫 SelectDB 版的使用方式。
概述
BitSail是一款基于分布式架構的高性能數據集成引擎,支持多種異構數據源間的數據同步,并提供離線、實時、全量、增量場景下的全域數據集成解決方案。您可以通過BitSail引擎讀取MySQL、Hive、Kafka等數據源中的海量數據,然后由SelectDB Sink將數據寫入到云數據庫 SelectDB 版。
前提條件
BitSail 0.1.0版本及以上。
使用方式
BitSail的job.writer
中配置SelectDB連接器參數,示例如下。
{
"job": {
"writer": {
"class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink",
"load_url": "<selectdb_http_address>",
"jdbc_url": "<selectdb_mysql_address>",
"cluster_name": "<selectdb_cluster_name>",
"user": "<username>",
"password": "<password>",
"table_identifier": "<selectdb_table_identifier>",
"columns": [
{
"index": 0,
"name": "id",
"type": "int"
},
{
"index": 1,
"name": "bigint_type",
"type": "bigint"
},
{
"index": 2,
"name": "string_type",
"type": "varchar"
},
{
"index": 3,
"name": "double_type",
"type": "double"
},
{
"index": 4,
"name": "date_type",
"type": "date"
}
]
}
}
}
參數說明如下。
參數名稱 | 是否必填 | 參數含義 |
class | 是 | 云數據庫 SelectDB 版寫連接器類型,默認為: |
load_url | 是 | 云數據庫 SelectDB 版實例的訪問地址和HTTP協議端口。 您可以從云數據庫 SelectDB 版控制臺的實例詳情 > 網絡信息中獲取VPC地址(或公網地址)和HTTP協議端口。 示例: |
jdbc_url | 是 | 云數據庫 SelectDB 版實例的訪問地址和MySQL協議端口。 您可以從云數據庫 SelectDB 版控制臺的實例詳情 > 網絡信息中獲取VPC地址(或公網地址)和MySQL協議端口。 示例: |
cluster_name | 是 | 云數據庫 SelectDB 版實例的集群名。實例中可能包含多個集群,可按需選擇。 |
user | 是 | 云數據庫 SelectDB 版實例的用戶名。 |
password | 是 | 云數據庫 SelectDB 版實例對應用戶名的密碼。 |
table_identifier | 是 | 云數據庫 SelectDB 版實例的表名,格式為 |
writer_parallelism_num | 否 | 指定SelectDB寫并發數量。 |
sink_flush_interval_ms | 否 | Upsert模式下的flush間隔,單位毫秒;默認為:5000。 |
sink_max_retries | 否 | 寫入的最大重試次數,默認為:3。 |
sink_buffer_size | 否 | 寫入buffer最大值,單位字節;默認為:1048576 (1 MB)。 |
sink_buffer_count | 否 | 初始化buffer的數量,默認為:3。 |
sink_enable_delete | 否 | 是否支持Delete事件同步。 |
sink_write_mode | 否 | 寫入模式,目前僅支持BATCH_UPSERT。 |
stream_load_properties | 否 | 追加在Stream Load URL后的參數,以Map<String,String>格式組成。 |
load_contend_type | 否 | copy-into使用的格式。取值范圍為CSV或JSON。默認JSON。 |
csv_field_delimiter | 否 | CSV格式的行內分隔符,默認為:逗號","。 |
csv_line_delimiter | 否 | CSV格式的行間分隔符,默認為:"\n"。 |
使用示例
下文使用BItSail構造數據源為例,為您介紹如何通過BitSail將上游數據導入至云數據庫 SelectDB 版。
環境準備
配置BitSail環境,下載并解壓BitSail安裝包。
wget feilun-justtmp.oss-cn-hongkong.aliyuncs.com/bitsail.tar.gz tar -zxvf bitsail.tar.gz
配置云數據庫 SelectDB 版實例。
創建云數據庫 SelectDB 版實例,詳情請參見創建實例。
通過MySQL協議連接云數據庫 SelectDB 版實例,詳情請參見連接實例。
創建測試數據庫和測試表。
創建測試數據庫。
CREATE DATABASE test_db;
創建測試表。
CREATE TABLE `test_table` ( `id` BIGINT(20) NULL, `bigint_type` BIGINT(20) NULL, `string_type` VARCHAR(100) NULL, `double_type` DOUBLE NULL, `decimal_type` DECIMALV3(27, 9) NULL, `date_type` DATEV2 NULL, `partition_date` DATEV2 NULL ) ENGINE=OLAP DUPLICATE KEY(`id`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) BUCKETS 10 PROPERTIES ( "light_schema_change" = "true" );
開通云數據庫 SelectDB 版公網地址,詳情請參見申請和釋放公網地址。
將BitSail環境的公網IP添加到IP白名單中,詳情請參見設置白名單。
通過BitSail本地任務同步數據到SelectDB
創建配置文件
test.json
,配置任務信息。通過bitsail包中的FakeSource類構造本地數據進行導入。{ "job": { "common": { "job_id": -2413, "job_name": "bitsail_fake_to_selectdb_test", "instance_id": -20413, "user_name": "user" }, "reader": { "class": "com.bytedance.bitsail.connector.legacy.fake.source.FakeSource", "total_count": 300, "rate": 10000, "random_null_rate": 0, "unique_fields": "id", "columns_with_fixed_value": [ { "name": "partition_date", "fixed_value": "2022-10-10" } ], "columns": [ { "index": 0, "name": "id", "type": "long" }, { "index": 1, "name": "bigint_type", "type": "long" }, { "index": 2, "name": "string_type", "type": "string" }, { "index": 3, "name": "double_type", "type": "double" }, { "index": 4, "name": "decimal_type", "type": "double" }, { "index": 5, "name": "date_type", "type": "date.date" }, { "index": 6, "name": "partition_date", "type": "string" } ] }, "writer": { "class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink", "load_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080", "jdbc_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030", "cluster_name": "new_cluster", "user": "admin", "password": "****", "table_identifier": "test_db.test_table", "columns": [ { "index": 0, "name": "id", "type": "bigint" }, { "index": 1, "name": "bigint_type", "type": "bigint" }, { "index": 2, "name": "string_type", "type": "varchar" }, { "index": 3, "name": "double_type", "type": "double" }, { "index": 4, "name": "decimal_type", "type": "double" }, { "index": 5, "name": "date_type", "type": "date" }, { "index": 6, "name": "partition_date", "type": "date" } ] } } }
命令行提交任務。
bash bin/bitsail run --engine flink --execution-mode run --deployment-mode local --conf test.json