本文介紹如何使用自定義SelectDB連接器寫入數據至云數據庫SelectDB版。
背景信息
云數據庫 SelectDB 版是新一代實時數據倉庫SelectDB在阿里云上的全托管服務,100%兼容Apache Doris。您可以在阿里云上便捷地購買SelectDB數倉服務,滿足海量數據分析需求,具體的產品優勢和應用場景請參見什么是云數據庫SelectDB版。
自定義SelectDB連接器支持的信息如下:
類別 | 詳情 |
支持類型 | 結果表 |
運行模式 | 流模式和批模式 |
數據格式 | JSON和CSV |
特有監控指標 | 無 |
API種類 | DataStream和SQL |
是否支持更新/刪除 | 是 |
特色功能
支持整庫數據同步。
SelectDB連接器提供Exactly-Once語義,保證數據不重復也不丟失。
兼容1.0及以上Apache Doris,可以使用Flink SelectDB自定義連接器同步數據至Apache Doris。
注意事項
使用方法
單擊JAR包獲取SelectDB自定義連接器(需要為1.15~1.17)。
在實時計算開發控制臺上,上傳SelectDB自定義連接器,詳情請參見管理自定義連接器。
在SQL作業中使用SelectDB自定義連接器,作業開發詳情請參見SQL作業開發。
具體的語法結構如下。
CREATE TABLE selectdb_sink ( emp_no INT , birth_date DATE, first_name STRING, last_name STRING, gender STRING, hire_date DATE ) WITH ( 'connector' = 'doris', 'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'test.employees', 'username' = 'admin', 'password' = '****', 'sink.enable-delete' = 'true' );
connector為表類型,固定值為
doris
。SelectDB自定義連接器結果表參數配置詳情請參見Sink配置項。
類型映射
使用示例
本文以MySQL數據寫入SelectDB為例為您詳細介紹如何使用SelectDB自定義連接器。
準備工作。
創建Flink工作空間、MySQL和SelectDB實例,詳情請參見開通實時計算Flink版、第一步:快捷創建RDS MySQL實例與配置數據庫和創建實例。
在MySQL中創建名稱為order_dw_mysql的數據庫和名稱為orders的表并導入測試數據。
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee decimal(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
通過DMS連接云數據庫SelectDB版實例后創建名稱為selectdb的數據庫和名稱為selecttable的表。
CREATE DATABASE selectdb; CREATE TABLE `selecttable` ( order_id bigint, user_id varchar(50), shop_id bigint, product_id bigint, buy_fee DECIMAL, create_time DATETIME, update_time DATETIME, state int )DISTRIBUTED BY HASH(order_id) BUCKETS 10;
在 實時計算開發控制臺上創建Flink SQL作業并啟動。
創建名稱為mysqlcatalog的MySQL Catalog,詳情請參見管理MySQL Catalog。
單擊JAR包獲取SelectDB自定義連接器(需要為1.15~1.17),注冊SelectDB自定義連接器,詳情請參見管理自定義連接器。
在
新建作業草稿,代碼示例如下。CREATE TEMPORARY TABLE selectdb_sink ( order_id BIGINT, user_id STRING, shop_id BIGINT, product_id BIGINT, buy_fee DECIMAL, create_time TIMESTAMP(6), update_time TIMESTAMP(6), state int ) WITH ( 'connector' = 'doris', 'fenodes' = 'selectdb-cn-jfj3z******.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'selectdb.selecttable', 'username' = 'admin', 'password' = '${secret_values.selectdb}', 'sink.enable-delete' = 'true' ); INSERT INTO selectdb_sink SELECT * FROM `mysqlcatalog`.`order_dw_mysql`.`orders`;
通過DMS連接云數據庫SelectDB版實例后,查詢名稱為selecttable的表數據。
SELECT * FROM `selecttable` ;