日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

數據攝入YAML最佳實踐

本文為您介紹數據攝入YAML的一些常見使用場景和最佳實踐,幫助您快速打造數據同步鏈路。

MySQL整庫同步Hologres

使用數據攝入YAML同步數據到Hologres搭建實時數倉,可以充分利用Flink強大的實時處理能力和Hologres提供的Binlog、行列共存和資源強隔離等能力,實現高效、可擴展的實時數據處理和分析。

最基本的MySQL整庫同步Hologres的數據攝入YAML作業如下所示。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN

使用更寬容的類型映射

Hologres連接器無法處理列類型變更事件,但支持了多種類型映射關系。為了更好地支持數據源的變更,您可以通過將多個MySQL數據類型映射到更寬的Hologres類型,跳過不必要的類型變更事件,從而讓作業正常運行。您可以通過配置項sink.type-normalize-strategy進行更改,默認值為STANDARD,詳情請見數據攝入YAML作業Hologres連接器類型映射。

例如,可以使用ONLY_BIGINT_OR_TEXT讓類型只對應到Hologres的int8和text類型。此時如果MySQL某個列的類型從INT改為BIGINT,Hologres將這兩種MySQL類型對應到int8類型,作業不會因為無法處理類型轉換而報錯。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT

分區表寫入

數據攝入YAML在使用Hologres連接器作為目標端時支持寫入分區表,詳情請參見分區表寫入

MySQL整庫同步Kafka

MySQL整庫同步Kafka通過SQL作業實現MySQL整庫同步到Kafka,通過數據攝入YAML作業實現的具體方法如下。

假設數據庫kafka_test中有兩張表customers和products,下面的作業可以分別將表數據同步到topic customers和products中。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604

sink:
  type: upsert-kafka
  name: Upsert Kafka Sink
  properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
  aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
  aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
  aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
  aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
  aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}

route:
  - source-table: kafka_test.customers
    sink-table: customers
  - source-table: kafka_test.products
    sink-table: products
說明
  • 如果不使用route模塊,在Kafka中會使用database.table的格式創建topic。例如MySQL表kafka_test.customers在Kafka中對應的topic名稱為kafka_test.customers。

  • 如果使用阿里云消息隊列Kafka版,需要配置aliyun.kafka.accessKeyId、aliyun.kafka.accessKeySecret、aliyun.kafka.instanceId、aliyun.kafka.endpointaliyun.kafka.regionId。阿里云消息隊列Kafka版默認不開啟自動創建Topic功能,參見自動化創建Topic相關問題,需要預先創建對應的Topic,詳情請參見步驟三:創建資源。

  • 數據攝入YAML同步的topic中存儲的不是原始Binlog數據,可以在Flink SQL作業中,使用Upsert Kafka連接器進行讀取。

同步MySQL Binlog數據到Kafka

MySQL整庫同步Kafka方案給您在Kafka中提供了MySQL表的快照,但在某些場景,您需要存儲原始的Binlog數據,方便后續的數據審計、數據重放等工作。

數據攝入YAML支持同步MySQL原始Binlog數據到Kafka,方便您分布式讀取Binlog數據,解決數據熱點問題。

假設數據庫kafka_test中有兩張表customers和products,下面的作業可以分別將表數據同步到topic customers和products中。

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604
  metadata-column.include-list: op_ts

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  properties.enable.idempotence: false

route:
  - source-table: kafka_test.customers
    sink-table: customers
  - source-table: kafka_test.products
    sink-table: products

customers表的一條Update語句產生Kafka消息的消息體格式如下:

// debezium-json
{
  "before": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "2222",
    "age": 12
  },
  "after": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "1234",
    "age": 12
  },
  "op": "u",
  "source": {
    "db": null,
    "table": "customers",
    "ts_ms": 1728528674000
  }
}

// canal-json
{
  "old": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "2222",
      "age": 12
    }
  ],
  "data": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "1234",
      "age": 12
    }
  ],
  "type": "UPDATE",
  "database": null,
  "table": "customers",
  "pkNames": [
    "id"
  ],
  "ts": 1728528674000
}
說明
  • 當配置了route模塊時,JSON數據database寫入的值為null。

  • 寫入的Binlog格式支持canal-json和debezium-json(默認),詳情請參見消息隊列Kafka

  • 默認所有數據寫入Topic的分區0,可以使用partition.strategy配置進行調整,詳情請參見partition.strategy。例如可使用如下配置,每個表的數據會根據主鍵的哈希值將數據寫到多個分區,保證同一個主鍵的數據在同一個分區并且有序。

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: kafka_test.\.*
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      properties.enable.idempotence: false
      partition.strategy: hash-by-key
  • 阿里云消息隊列Kafka版不支持冪等和事務寫入,作為數據攝入目標端時,需要在數據攝入目標端添加配置項properties.enable.idempotence: false以關閉冪等寫入功能。

  • 如果僅希望將所有表的數據寫入到Kafka的同一個topic,可以使用topic配置設置寫入的topic,不需要額外的route模塊。例如可使用如下配置,將所有數據寫入到kafka_test這一個topic中。

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: kafka_test.\.*
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      properties.enable.idempotence: false
      topic: kafka_test

Schema變更

數據攝入YAML作業會自動同步數據源的Schema變更到下游目標端。為了防止刪除表、清空表等一些高危操作對目標端產生影響,支持通過pipeline模塊的schema.change.behavior配置項修改Schema變更的行為。schema.change.behavior默認值為LENIENT,不允許刪除下游表或者清空下游表,并更寬容的支持某些變更,保證數據完整性,詳情請參見Schema變更行為配置。

如下代碼可以修改變更行為為EVOLVE。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
  
pipeline:
  name: MySQL to Hologres yaml job
  schema.change.behavior: EVOLVE
重要

在EVOLVE模式下,DROP TABLE 和 TRUNCATE TABLE都會直接同步數據到目標端。

此外,為了更靈活地控制不同變更事件,還支持在sink模塊使用include.schema.changesexclude.schema.changes來應用和排除某些變更,詳情請參見控制目標端接收的Schema變更。

如下配置可以在同步數據時,跳過刪除表,刪除列和清空表操作。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
  exclude.schema.changes: [drop, truncate.table]
  
pipeline:
  name: MySQL to Hologres yaml job
  schema.change.behavior: EVOLVE

新增表功能

在數據攝入YAML作業中,支持兩種情況下的新增表:

  • 新增加的表為空表,不存在歷史數據,數據全部是新插入的。

  • 新增加的表已經存在,需要同步歷史數據。

新增加的表為空表,不存在歷史數據

新增加的表為空表是指在作業運行期間,創建的能被作業匹配到的表,不需要同步歷史數據。這種場景下,數據攝入YAML支持不重啟新增表,需要在MySQL數據源開啟scan.binlog.newly-added-table.enabled。

例如MySQL數據庫中有表customers,數據攝入YAML作業運行后,MySQL數據庫中又創建了表products,希望不重啟作業能同步products表,作業需要如下配置:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN

這樣配置的YAML作業運行后會自動在目標端創建holo_test數據庫下的全部新增表。

新增加的表已經存在,需要同步歷史數據

假設MySQL數據庫中有表customers和products,啟動時只需要同步customers表,作業配置如:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.customers
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN

作業運行一段時間后,需要額外同步該數據庫下全部的表和歷史數據,則需要按照如下步驟操作:

  1. 保留Savepoint停止作業。

  2. 修改MySQL數據源tables配置為需要匹配的表,同時MySQL數據源開啟scan.newly-added-table.enabled。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
  1. 從保留的Savepoint重啟作業。

重要

不支持同時開啟scan.binlog.newly-added-table.enabledscan.newly-added-table.enabled。

添加元數據列

寫入數據時,可以使用transform模塊添加元數據列到數據中。例如如下的同步到Hologres的作業,可以將操作類型寫入,詳情請見元數據列。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.customers
  server-id: 8601-8604

transform:
  - source-table: holo_test.customers
    projection: \*, __data_event_type__ as op
    description: add op

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}

指定時間戳啟動

在無狀態啟動數據攝入YAML作業時,支持指定數據源的開始時間,幫助您從指定的Binlog位置恢復數據的讀取。