本文為您介紹數據攝入YAML的一些常見使用場景和最佳實踐,幫助您快速打造數據同步鏈路。
MySQL整庫同步Hologres
使用數據攝入YAML同步數據到Hologres搭建實時數倉,可以充分利用Flink強大的實時處理能力和Hologres提供的Binlog、行列共存和資源強隔離等能力,實現高效、可擴展的實時數據處理和分析。
最基本的MySQL整庫同步Hologres的數據攝入YAML作業如下所示。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
使用更寬容的類型映射
Hologres連接器無法處理列類型變更事件,但支持了多種類型映射關系。為了更好地支持數據源的變更,您可以通過將多個MySQL數據類型映射到更寬的Hologres類型,跳過不必要的類型變更事件,從而讓作業正常運行。您可以通過配置項sink.type-normalize-strategy
進行更改,默認值為STANDARD,詳情請見數據攝入YAML作業Hologres連接器類型映射。
例如,可以使用ONLY_BIGINT_OR_TEXT讓類型只對應到Hologres的int8和text類型。此時如果MySQL某個列的類型從INT改為BIGINT,Hologres將這兩種MySQL類型對應到int8類型,作業不會因為無法處理類型轉換而報錯。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
分區表寫入
數據攝入YAML在使用Hologres連接器作為目標端時支持寫入分區表,詳情請參見分區表寫入。
MySQL整庫同步Kafka
MySQL整庫同步Kafka通過SQL作業實現MySQL整庫同步到Kafka,通過數據攝入YAML作業實現的具體方法如下。
假設數據庫kafka_test中有兩張表customers和products,下面的作業可以分別將表數據同步到topic customers和products中。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: kafka_test.\.*
server-id: 8601-8604
sink:
type: upsert-kafka
name: Upsert Kafka Sink
properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}
route:
- source-table: kafka_test.customers
sink-table: customers
- source-table: kafka_test.products
sink-table: products
如果不使用route模塊,在Kafka中會使用database.table的格式創建topic。例如MySQL表
kafka_test.customers
在Kafka中對應的topic名稱為kafka_test.customers
。如果使用阿里云消息隊列Kafka版,需要配置
aliyun.kafka.accessKeyId
、aliyun.kafka.accessKeySecret
、aliyun.kafka.instanceId
、aliyun.kafka.endpoint
和aliyun.kafka.regionId
。阿里云消息隊列Kafka版默認不開啟自動創建Topic功能,參見自動化創建Topic相關問題,需要預先創建對應的Topic,詳情請參見步驟三:創建資源。數據攝入YAML同步的topic中存儲的不是原始Binlog數據,可以在Flink SQL作業中,使用Upsert Kafka連接器進行讀取。
同步MySQL Binlog數據到Kafka
MySQL整庫同步Kafka方案給您在Kafka中提供了MySQL表的快照,但在某些場景,您需要存儲原始的Binlog數據,方便后續的數據審計、數據重放等工作。
數據攝入YAML支持同步MySQL原始Binlog數據到Kafka,方便您分布式讀取Binlog數據,解決數據熱點問題。
假設數據庫kafka_test中有兩張表customers和products,下面的作業可以分別將表數據同步到topic customers和products中。
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: kafka_test.\.*
server-id: 8601-8604
metadata-column.include-list: op_ts
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}
properties.enable.idempotence: false
route:
- source-table: kafka_test.customers
sink-table: customers
- source-table: kafka_test.products
sink-table: products
customers表的一條Update語句產生Kafka消息的消息體格式如下:
// debezium-json
{
"before": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
},
"after": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
},
"op": "u",
"source": {
"db": null,
"table": "customers",
"ts_ms": 1728528674000
}
}
// canal-json
{
"old": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
}
],
"data": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
}
],
"type": "UPDATE",
"database": null,
"table": "customers",
"pkNames": [
"id"
],
"ts": 1728528674000
}
當配置了route模塊時,JSON數據database寫入的值為null。
寫入的Binlog格式支持canal-json和debezium-json(默認),詳情請參見消息隊列Kafka。
默認所有數據寫入Topic的分區0,可以使用
partition.strategy
配置進行調整,詳情請參見partition.strategy。例如可使用如下配置,每個表的數據會根據主鍵的哈希值將數據寫到多個分區,保證同一個主鍵的數據在同一個分區并且有序。source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: kafka_test.\.* server-id: 8601-8604 sink: type: kafka name: Kafka Sink properties.bootstrap.servers: ${kafka.bootstraps.server} properties.enable.idempotence: false partition.strategy: hash-by-key
阿里云消息隊列Kafka版不支持冪等和事務寫入,作為數據攝入目標端時,需要在數據攝入目標端添加配置項
properties.enable.idempotence: false
以關閉冪等寫入功能。如果僅希望將所有表的數據寫入到Kafka的同一個topic,可以使用
topic
配置設置寫入的topic,不需要額外的route模塊。例如可使用如下配置,將所有數據寫入到kafka_test這一個topic中。source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: kafka_test.\.* server-id: 8601-8604 sink: type: kafka name: Kafka Sink properties.bootstrap.servers: ${kafka.bootstraps.server} properties.enable.idempotence: false topic: kafka_test
Schema變更
數據攝入YAML作業會自動同步數據源的Schema變更到下游目標端。為了防止刪除表、清空表等一些高危操作對目標端產生影響,支持通過pipeline模塊的schema.change.behavior
配置項修改Schema變更的行為。schema.change.behavior
默認值為LENIENT,不允許刪除下游表或者清空下游表,并更寬容的支持某些變更,保證數據完整性,詳情請參見Schema變更行為配置。
如下代碼可以修改變更行為為EVOLVE。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres yaml job
schema.change.behavior: EVOLVE
在EVOLVE模式下,DROP TABLE 和 TRUNCATE TABLE都會直接同步數據到目標端。
此外,為了更靈活地控制不同變更事件,還支持在sink模塊使用include.schema.changes
和exclude.schema.changes
來應用和排除某些變更,詳情請參見控制目標端接收的Schema變更。
如下配置可以在同步數據時,跳過刪除表,刪除列和清空表操作。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
exclude.schema.changes: [drop, truncate.table]
pipeline:
name: MySQL to Hologres yaml job
schema.change.behavior: EVOLVE
新增表功能
在數據攝入YAML作業中,支持兩種情況下的新增表:
新增加的表為空表,不存在歷史數據,數據全部是新插入的。
新增加的表已經存在,需要同步歷史數據。
新增加的表為空表,不存在歷史數據
新增加的表為空表是指在作業運行期間,創建的能被作業匹配到的表,不需要同步歷史數據。這種場景下,數據攝入YAML支持不重啟新增表,需要在MySQL數據源開啟scan.binlog.newly-added-table.enabled
。
例如MySQL數據庫中有表customers,數據攝入YAML作業運行后,MySQL數據庫中又創建了表products,希望不重啟作業能同步products表,作業需要如下配置:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
scan.binlog.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
這樣配置的YAML作業運行后會自動在目標端創建holo_test數據庫下的全部新增表。
新增加的表已經存在,需要同步歷史數據
假設MySQL數據庫中有表customers和products,啟動時只需要同步customers表,作業配置如:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.customers
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
作業運行一段時間后,需要額外同步該數據庫下全部的表和歷史數據,則需要按照如下步驟操作:
保留Savepoint停止作業。
修改MySQL數據源tables配置為需要匹配的表,同時MySQL數據源開啟
scan.newly-added-table.enabled
。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
scan.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
從保留的Savepoint重啟作業。
不支持同時開啟scan.binlog.newly-added-table.enabled
和scan.newly-added-table.enabled
。
添加元數據列
寫入數據時,可以使用transform模塊添加元數據列到數據中。例如如下的同步到Hologres的作業,可以將操作類型寫入,詳情請見元數據列。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.customers
server-id: 8601-8604
transform:
- source-table: holo_test.customers
projection: \*, __data_event_type__ as op
description: add op
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
指定時間戳啟動
在無狀態啟動數據攝入YAML作業時,支持指定數據源的開始時間,幫助您從指定的Binlog位置恢復數據的讀取。