MaxCompute為您提供對接Flink CDC的新版插件Connector連接器。您可以通過對接Flink CDC,將數據源(例如MySQL)數據實時同步至MaxCompute的目標表(普通表或Delta表)。本文為您介紹MaxCompute新版插件的能力支持情況與主要操作流程。
Flink CDC背景介紹
Flink CDC是一個端到端的開源實時數據集成工具,定義了一套功能完整的編程接口和ETL數據處理框架,用戶可通過提交Flink作業使用其功能,詳情請參見Flink CDC。Flink CDC深度集成并由Apache Flink驅動,提供以下核心功能:
端到端的數據集成框架。
為數據集成的用戶提供了易于構建作業的API。
支持在Source(數據源)和Sink(輸出端)中處理多個表。
整庫同步。
具備表結構變更自動同步的能力(Schema Evolution)。
前提條件
已創建MaxCompute項目,詳情請參見創建MaxCompute項目。
注意事項
快速開始
本文將基于Flink CDC,快速構建MySQL到MaxCompute的Streaming ETL作業(MySQL to MaxCompute),實現Flink CDC Pipeline的編寫。其中包含整庫同步、表結構變更同步和分庫分表同步功能。
環境準備
準備Flink Standalone集群
下載flink-1.18.0-bin-scala_2.12.tgz并解壓,解壓后得到
flink-1.18.0
目錄。進入flink-1.18.0
目錄,執行以下命令,將FLINK_HOME設置為flink-1.18.0的安裝目錄。export FLINK_HOME=$(pwd)
在
$flink-1.18.0/conf
目錄下執行vim flink-conf.yaml
命令,在配置文件中追加下列參數并保存。# 開啟checkpoint,每隔3秒做一次checkpoint # 僅作測試使用,實際作業checkpoint間隔時間不建議低于30s execution.checkpointing.interval: 3000 # 由于flink-cdc-pipeline-connector-maxcompute依賴flink通信機制進行寫入同步, # 這里適當增大消息通信超時時間 pekko.ask.timeout: 60s
執行如下命令,啟動Flink集群。
./bin/start-cluster.sh
如啟動成功,可以在http://localhost:8081/(8081為默認端口)訪問到Flink Web UI。
多次執行start-cluster.sh可以拉起多個TaskManager,用于并發執行。
準備MySQL環境
此處以Docker Compose的方式為例指導您準備MySQL環境。
啟動Docker鏡像后,創建一個名為
docker-compose.yaml
的文件,文件內容如下:version: '2.1' services: mysql: image: debezium/example-mysql:1.1 ports: - "3306:3306" environment: - MYSQL_ROOT_PASSWORD=123456 - MYSQL_USER=mysqluser - MYSQL_PASSWORD=mysqlpw
參數說明:
參數
描述
version
Docker版本。
image
鏡像版本,配置為debezium/example-mysql:1.1。
ports
MySQL端口號。
environment
MySQL賬號密碼。
該Docker Compose中包含的容器有:MySQL-包含商品信息的數據庫app_db。
在docker-compose.yaml所在目錄執行如下命令,啟動所需組件:
docker-compose up -d
該命令將以Detached模式自動啟動Docker Compose配置中定義的所有容器。您可以執行
docker ps
命令查看上述容器是否已正常啟動。
在MySQL數據庫中準備數據
執行如下命令,進入MySQL容器。
docker-compose exec mysql mysql -uroot -p123456
在MySQL中創建數據庫,并準備表數據。
創建數據庫。
CREATE DATABASE app_db; USE app_db;
準備表數據。
創建orders表,并插入數據。
CREATE TABLE `orders` ( `id` INT NOT NULL, `price` DECIMAL(10,2) NOT NULL, PRIMARY KEY (`id`) ); -- 插入數據 INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00); INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);
創建shipments表,并插入數據。
CREATE TABLE `shipments` ( `id` INT NOT NULL, `city` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- 插入數據 INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing'); INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');
創建products表,并插入數據。
-- CREATE TABLE `products` ( `id` INT NOT NULL, `product` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- 插入數據 INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer'); INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap'); INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
通過Flink CDC CLI提交任務
下載所需JAR包:
flink-cdc包
進入flink-cdc下載二進制壓縮包flink-cdc-3.1.1-bin.tar.gz,并解壓得到
flink-cdc-3.1.1
目錄,其中會包含bin、lib、log及conf四個目錄,將這四個目錄下的文件移動至flink-1.18.0對應的目錄下。Connector包
下載以下Connector包,并移動至
flink-1.18.0/lib
目錄下。說明下載鏈接只對已發布的版本有效, SNAPSHOT版本需要本地基于master或release-分支編譯。
Driver包
下載MySQL Connector Java包,通過--jar參數將其傳入Flink CDC CLI,或將其放在
$flink-1.18.0/lib
目錄下并重啟Flink集群,因為CDC Connectors不再包含這些Drivers。
編寫任務配置YAML文件。下述為您提供一個整庫同步的示例文件
mysql-to-maxcompute.yaml
:################################################################################ # Description: Sync MySQL all tables to MaxCompute ################################################################################ source: type: mysql hostname: localhost port: 3306 username: root password: 123456 tables: app_db.\.* server-id: 5400-5404 server-time-zone: UTC # accessId, accessKey, endpoint, project需要用戶自行填寫 sink: type: maxcompute name: MaxComputeSink accessId: ${your_accessId} accessKey: ${your_accessKey} endpoint: ${your_maxcompute_endpoint} project: ${your_project} bucketsNum: 8 pipeline: name: Sync MySQL Database to MaxCompute parallelism: 1
參數說明:
Source部分的參數配置詳情請參見Apache Flink CDC MySQL Connector。
Sink部分的參數配置方式請參見連接器Connector配置項。
執行下述命令,提交任務至Flink Standalone集群。
./bin/flink-cdc.sh mysql-to-maxcompute.yaml
提交成功后,返回如下信息:
Pipeline has been submitted to cluster. Job ID: f9f9689866946e25bf151ecc179ef46f Job Description: Sync MySQL Database to MaxCompute
在Flink Web UI中,即可看到一個名為
Sync MySQL Database to MaxCompute
的任務正在運行。在MaxCompute中執行如下SQL,查看orders、shipments及products三張表是否已被成功創建,并且可以進行數據寫入。
-- 查看orders表 read orders; -- 返回結果: +------------+------------+ | id | price | +------------+------------+ | 1 | 4 | | 2 | 100 | +------------+------------+ -- 查看shipments表 read shipments; -- 返回結果 +------------+------------+ | id | city | +------------+------------+ | 1 | beijing | | 2 | xian | +------------+------------+ -- 查看products表 read products; -- 返回結果 +------------+------------+ | id | product | +------------+------------+ | 3 | Peanut | | 1 | Beer | | 2 | Cap | +------------+------------+
同步變更操作
此處以orders表為例,為您展示在修改MySQL數據庫中的源表數據時,MaxCompute中對應的目標表數據也會實時更新。
執行如下命令,進入MySQL容器。
docker-compose exec mysql mysql -uroot -p123456
在MySQL的orders表中插入一條數據。
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
在MaxCompute中執行
read orders;
命令查詢orders表數據。返回結果如下:+------------+------------+ | id | price | +------------+------------+ | 3 | 100 | | 1 | 4 | | 2 | 100 | +------------+------------+
在MySQL的orders表中增加一個字段。
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
在MaxCompute中執行
read orders;
命令查詢orders表數據。返回結果如下:+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 4 | NULL | | 2 | 100 | NULL | +------------+------------+------------+
在MySQL的orders表中更新一條數據。
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
在MaxCompute中執行
read orders;
命令查詢orders表數據。返回結果如下:+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 100 | 100.00 | | 2 | 100 | NULL | +------------+------------+------------+
在MySQL的orders表中刪除一條數據。
DELETE FROM app_db.orders WHERE id=2;
在MaxCompute中執行
read orders;
命令查詢orders表數據。返回結果如下:+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 100 | 100.00 | +------------+------------+------------+
對于上述操作,在MySQL中每執行一步,就在MaxCompute中進行一次數據預覽,可以看到MaxCompute中顯示的orders表數據是實時更新的。
輪詢變更操作
Flink CDC提供了將源表的表結構或數據路由到其他表名的配置,借助這種能力,我們能夠實現表名、庫名替換,整庫同步等功能。 下面提供一個配置文件說明:
################################################################################
# Description: Sync MySQL all tables to MaxCompute
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
# accessId, accessKey, endpoint, project 需要用戶自行填寫
sink:
type: maxcompute
name: MaxComputeSink
accessId: ${your_accessId}
accessKey: ${your_accessKey}
endpoint: ${your_maxcompute_endpoint}
project: ${your_project}
bucketsNum: 8
route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders
- source-table: app_db.shipments
sink-table: ods_db.ods_shipments
- source-table: app_db.products
sink-table: ods_db.ods_products
pipeline:
name: Sync MySQL Database to MaxCompute
parallelism: 1
route部分的參數詳情請參見Flink CDC Route。
通過上面的route配置,會將app_db.orders表的結構和數據同步至ods_db.ods_orders中。從而實現數據庫遷移的功能。 特別地,source-table支持正則表達式匹配多表,從而實現分庫分表同步的功能,例如下面的配置:
route:
- source-table: app_db.order\.*
sink-table: ods_db.ods_orders
這樣,就可以將諸如app_db.order01、app_db.order02、app_db.order03的表數據匯總到ods_db.ods_orders中。
目前還不支持多表中存在相同主鍵數據的場景,將在后續版本支持。
環境清理
執行完上述操作后,您需要進行環境清理。
在docker-compose.yml文件所在的目錄下執行如下命令停止所有容器:
docker-compose down
在Flink所在目錄flink-1.18.0下,執行如下命令停止Flink集群:
./bin/stop-cluster.sh
附錄
連接器Connector配置項
配置項 | 是否必填 | 默認值 | 類型 | 描述 |
type | 是 | none | String | 指定要使用的連接器,這里需要設置成 |
name | 否 | none | String | Sink的名稱。 |
accessId | 是 | none | String | 阿里云賬號或RAM用戶的AccessKey ID。您可以進入AccessKey管理頁面獲取AccessKey ID。 |
accessKey | 是 | none | String | AccessKey ID對應的AccessKey Secret。 |
endpoint | 是 | none | String | MaxCompute服務的連接地址。您需要根據創建MaxCompute項目時選擇的地域以及網絡連接方式配置Endpoint。各地域及網絡對應的Endpoint值,請參見 Endpoint。 |
project | 是 | none | String | MaxCompute項目名稱。您可以登錄MaxCompute控制臺,在工作區>項目管理頁面獲取MaxCompute項目名稱。 |
tunnelEndpoint | 否 | none | String | MaxCompute Tunnel服務的連接地址,通常這項配置可以根據指定的項目所在的地域進行自動路由。僅在使用代理等特殊網絡環境下使用該配置。 |
quotaName | 否 | none | String | MaxCompute數據傳輸使用的獨享資源組名稱,如不指定該配置,則使用共享資源組。詳情可以參見購買與使用獨享數據傳輸服務資源組。 |
stsToken | 否 | none | String | 當使用RAM角色頒發的短時有效的訪問令牌(STS Token)進行鑒權時,需要指定該參數。 |
bucketsNum | 否 | 16 | Integer | 自動創建MaxCompute Delta表時使用的桶數。使用方式請參見近實時數倉概述。 |
compressAlgorithm | 否 | zlib | String | 寫入MaxCompute時使用的數據壓縮算法,當前支持 |
totalBatchSize | 否 | 64MB | String | 內存中緩沖的數據量大小,單位為分區級(非分區表單位為表級),不同分區(表)的緩沖區相互獨立,達到閾值后數據寫入到MaxCompute。 |
bucketBatchSize | 否 | 4MB | String | 內存中緩沖的數據量大小,單位為桶級,僅寫入Delta表時生效。不同數據桶的緩沖區相互獨立,達到閾值后將該桶數據寫入到MaxCompute。 |
numCommitThreads | 否 | 16 | Integer | Checkpoint階段,能夠同時處理的分區(表)數量。 |
numFlushConcurrent | 否 | 4 | Integer | 寫入數據到MaxCompute時,能夠同時寫入的桶數量。僅寫入Delta表時生效。 |
retryTimes | 否 | 3 | Integer | 當網絡鏈接發生錯誤時,進行重試的次數。 |
sleepMillis | 否 | true | Long | 當網絡鏈接發生錯誤時,每次重試等待的時間,單位:毫秒。 |
表位置映射
連接器Connector自動建表時,使用如下映射關系,將源表的位置信息映射到MaxCompute表中。
當MaxCompute項目不支持Schema模型時,每個同步任務僅能同步一個MySQL Database。(其他數據源同理,連接器Connector會忽略tableId.namespace信息)。
Flink CDC中對象 | MaxCompute位置 | MySQL位置 |
配置文件中project | Project | none |
TableId.namespace | Schema(僅當MaxCompute項目支持Schema模型時,如不支持,將忽略該配置) | Database |
TableId.tableName | Table | Table |
數據類型映射
Flink Type | MaxCompute Type |
CHAR/VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |