日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

利用Flink CDC實現數據同步至Delta Table

MaxCompute為您提供對接Flink CDC的新版插件Connector連接器。您可以通過對接Flink CDC,將數據源(例如MySQL)數據實時同步至MaxCompute的目標表(普通表或Delta表)。本文為您介紹MaxCompute新版插件的能力支持情況與主要操作流程。

Flink CDC背景介紹

Flink CDC是一個端到端的開源實時數據集成工具,定義了一套功能完整的編程接口和ETL數據處理框架,用戶可通過提交Flink作業使用其功能,詳情請參見Flink CDC。Flink CDC深度集成并由Apache Flink驅動,提供以下核心功能:

  • 端到端的數據集成框架。

  • 為數據集成的用戶提供了易于構建作業的API。

  • 支持在Source(數據源)和Sink(輸出端)中處理多個表。

  • 整庫同步。

  • 具備表結構變更自動同步的能力(Schema Evolution)。

前提條件

已創建MaxCompute項目,詳情請參見創建MaxCompute項目

注意事項

  • 數據同步Connector連接器支持自動建表,將MaxCompute表與源表的位置關系、數據類型進行自動映射。當源表有主鍵時,會自動創建Delta表,否則會創建MaxCompute普通表。映射詳情請參見表位置映射數據類型映射

  • 當數據寫入至普通表時,系統會忽略DELETE操作,UPDATE操作會被視為INSERT操作。

  • 目前僅支持at-least-once,Delta表由于主鍵特性能夠實現冪等寫。

  • 對于表結構變更同步。

  • 新增列只能添加到最后一列。

  • 修改列類型,只能修改為兼容的數據類型。數據類型兼容表詳情請參見更改列數據類型

快速開始

本文將基于Flink CDC,快速構建MySQL到MaxCompute的Streaming ETL作業(MySQL to MaxCompute),實現Flink CDC Pipeline的編寫。其中包含整庫同步、表結構變更同步和分庫分表同步功能。

環境準備

準備Flink Standalone集群

  1. 下載flink-1.18.0-bin-scala_2.12.tgz并解壓,解壓后得到flink-1.18.0目錄。進入flink-1.18.0目錄,執行以下命令,將FLINK_HOME設置為flink-1.18.0的安裝目錄。

    export FLINK_HOME=$(pwd)
  2. $flink-1.18.0/conf目錄下執行vim flink-conf.yaml命令,在配置文件中追加下列參數并保存。

    # 開啟checkpoint,每隔3秒做一次checkpoint
    # 僅作測試使用,實際作業checkpoint間隔時間不建議低于30s
    execution.checkpointing.interval: 3000
    
    # 由于flink-cdc-pipeline-connector-maxcompute依賴flink通信機制進行寫入同步,
    # 這里適當增大消息通信超時時間
    pekko.ask.timeout: 60s
  3. 執行如下命令,啟動Flink集群。

    ./bin/start-cluster.sh

    如啟動成功,可以在http://localhost:8081/(8081為默認端口)訪問到Flink Web UI。

    多次執行start-cluster.sh可以拉起多個TaskManager,用于并發執行。

準備MySQL環境

此處以Docker Compose的方式為例指導您準備MySQL環境。

  1. 啟動Docker鏡像后,創建一個名為docker-compose.yaml的文件,文件內容如下:

    version: '2.1'
    services:
      mysql:
        image: debezium/example-mysql:1.1
        ports:
          - "3306:3306"
        environment:
          - MYSQL_ROOT_PASSWORD=123456
          - MYSQL_USER=mysqluser
          - MYSQL_PASSWORD=mysqlpw

    參數說明:

    參數

    描述

    version

    Docker版本。

    image

    鏡像版本,配置為debezium/example-mysql:1.1。

    ports

    MySQL端口號。

    environment

    MySQL賬號密碼。

    該Docker Compose中包含的容器有:MySQL-包含商品信息的數據庫app_db。

  2. 在docker-compose.yaml所在目錄執行如下命令,啟動所需組件:

    docker-compose up -d

    該命令將以Detached模式自動啟動Docker Compose配置中定義的所有容器。您可以執行docker ps命令查看上述容器是否已正常啟動。

在MySQL數據庫中準備數據

  1. 執行如下命令,進入MySQL容器。

    docker-compose exec mysql mysql -uroot -p123456
  2. 在MySQL中創建數據庫,并準備表數據。

    1. 創建數據庫。

      CREATE DATABASE app_db;
      USE app_db;
    2. 準備表數據。

      • 創建orders表,并插入數據。

        CREATE TABLE `orders` (
        `id` INT NOT NULL,
        `price` DECIMAL(10,2) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- 插入數據
        INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
        INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);
      • 創建shipments表,并插入數據。

        CREATE TABLE `shipments` (
        `id` INT NOT NULL,
        `city` VARCHAR(255) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- 插入數據
        INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
        INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');
      • 創建products表,并插入數據。

        -- 
        CREATE TABLE `products` (
        `id` INT NOT NULL,
        `product` VARCHAR(255) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- 插入數據
        INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
        INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
        INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

通過Flink CDC CLI提交任務

  1. 下載所需JAR包:

    • flink-cdc包

      進入flink-cdc下載二進制壓縮包flink-cdc-3.1.1-bin.tar.gz,并解壓得到flink-cdc-3.1.1目錄,其中會包含bin、lib、log及conf四個目錄,將這四個目錄下的文件移動至flink-1.18.0對應的目錄下。

    • Connector包

      下載以下Connector包,并移動至flink-1.18.0/lib目錄下。

      說明

      下載鏈接只對已發布的版本有效, SNAPSHOT版本需要本地基于master或release-分支編譯。

    • Driver包

      下載MySQL Connector Java包,通過--jar參數將其傳入Flink CDC CLI,或將其放在$flink-1.18.0/lib目錄下并重啟Flink集群,因為CDC Connectors不再包含這些Drivers。

  2. 編寫任務配置YAML文件。下述為您提供一個整庫同步的示例文件mysql-to-maxcompute.yaml

    ################################################################################
    # Description: Sync MySQL all tables to MaxCompute
    ################################################################################
    source:
      type: mysql
      hostname: localhost
      port: 3306
      username: root
      password: 123456
      tables: app_db.\.*
      server-id: 5400-5404
      server-time-zone: UTC
    
    # accessId, accessKey, endpoint, project需要用戶自行填寫
    sink:
       type: maxcompute
       name: MaxComputeSink
       accessId: ${your_accessId}
       accessKey: ${your_accessKey}
       endpoint: ${your_maxcompute_endpoint}
       project: ${your_project}
       bucketsNum: 8
    
    pipeline:
      name: Sync MySQL Database to MaxCompute
      parallelism: 1
    

    參數說明:

  3. 執行下述命令,提交任務至Flink Standalone集群。

    ./bin/flink-cdc.sh mysql-to-maxcompute.yaml

    提交成功后,返回如下信息:

    Pipeline has been submitted to cluster.
    Job ID: f9f9689866946e25bf151ecc179ef46f
    Job Description: Sync MySQL Database to MaxCompute

    在Flink Web UI中,即可看到一個名為Sync MySQL Database to MaxCompute的任務正在運行。

  4. 在MaxCompute中執行如下SQL,查看orders、shipments及products三張表是否已被成功創建,并且可以進行數據寫入。

    -- 查看orders表
    read orders;
    
    -- 返回結果:
    +------------+------------+
    | id         | price      |
    +------------+------------+
    | 1          | 4          |
    | 2          | 100        |
    +------------+------------+
    
    -- 查看shipments表
    read shipments;
    
    -- 返回結果
    +------------+------------+
    | id         | city       |
    +------------+------------+
    | 1          | beijing    |
    | 2          | xian       |
    +------------+------------+
    
    -- 查看products表
    read products;
    
    -- 返回結果
    +------------+------------+
    | id         | product    |
    +------------+------------+
    | 3          | Peanut     |
    | 1          | Beer       |
    | 2          | Cap        |
    +------------+------------+

同步變更操作

此處以orders表為例,為您展示在修改MySQL數據庫中的源表數據時,MaxCompute中對應的目標表數據也會實時更新。

  1. 執行如下命令,進入MySQL容器。

    docker-compose exec mysql mysql -uroot -p123456
  2. 在MySQL的orders表中插入一條數據。

    INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);

    在MaxCompute中執行read orders;命令查詢orders表數據。返回結果如下:

    +------------+------------+
    | id         | price      |
    +------------+------------+
    | 3          | 100        |
    | 1          | 4          |
    | 2          | 100        |
    +------------+------------+
  3. 在MySQL的orders表中增加一個字段。

    ALTER TABLE app_db.orders ADD amount varchar(100) NULL;

    在MaxCompute中執行read orders;命令查詢orders表數據。返回結果如下:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 4          | NULL       |
    | 2          | 100        | NULL       |
    +------------+------------+------------+
  4. 在MySQL的orders表中更新一條數據。

    UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;

    在MaxCompute中執行read orders;命令查詢orders表數據。返回結果如下:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 100        | 100.00     |
    | 2          | 100        | NULL       |
    +------------+------------+------------+
  5. 在MySQL的orders表中刪除一條數據。

    DELETE FROM app_db.orders WHERE id=2;

    在MaxCompute中執行read orders;命令查詢orders表數據。返回結果如下:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 100        | 100.00     |
    +------------+------------+------------+

對于上述操作,在MySQL中每執行一步,就在MaxCompute中進行一次數據預覽,可以看到MaxCompute中顯示的orders表數據是實時更新的。

輪詢變更操作

Flink CDC提供了將源表的表結構或數據路由到其他表名的配置,借助這種能力,我們能夠實現表名、庫名替換,整庫同步等功能。 下面提供一個配置文件說明:

################################################################################
# Description: Sync MySQL all tables to MaxCompute
################################################################################
source:
   type: mysql
   hostname: localhost
   port: 3306
   username: root
   password: 123456
   tables: app_db.\.*
   server-id: 5400-5404
   server-time-zone: UTC

# accessId, accessKey, endpoint, project 需要用戶自行填寫
sink:
   type: maxcompute
   name: MaxComputeSink
   accessId: ${your_accessId}
   accessKey: ${your_accessKey}
   endpoint: ${your_maxcompute_endpoint}
   project: ${your_project}
   bucketsNum: 8

route:
   - source-table: app_db.orders
     sink-table: ods_db.ods_orders
   - source-table: app_db.shipments
     sink-table: ods_db.ods_shipments
   - source-table: app_db.products
     sink-table: ods_db.ods_products

pipeline:
   name: Sync MySQL Database to MaxCompute
   parallelism: 1

route部分的參數詳情請參見Flink CDC Route

通過上面的route配置,會將app_db.orders表的結構和數據同步至ods_db.ods_orders中。從而實現數據庫遷移的功能。 特別地,source-table支持正則表達式匹配多表,從而實現分庫分表同步的功能,例如下面的配置:

route:
  - source-table: app_db.order\.*
    sink-table: ods_db.ods_orders

這樣,就可以將諸如app_db.order01、app_db.order02、app_db.order03的表數據匯總到ods_db.ods_orders中。

說明

目前還不支持多表中存在相同主鍵數據的場景,將在后續版本支持。

環境清理

執行完上述操作后,您需要進行環境清理。

  1. 在docker-compose.yml文件所在的目錄下執行如下命令停止所有容器:

    docker-compose down
  2. 在Flink所在目錄flink-1.18.0下,執行如下命令停止Flink集群:

    ./bin/stop-cluster.sh

附錄

連接器Connector配置項

配置項

是否必填

默認值

類型

描述

type

none

String

指定要使用的連接器,這里需要設置成 maxcompute

name

none

String

Sink的名稱。

accessId

none

String

阿里云賬號或RAM用戶的AccessKey ID。您可以進入AccessKey管理頁面獲取AccessKey ID。

accessKey

none

String

AccessKey ID對應的AccessKey Secret。

endpoint

none

String

MaxCompute服務的連接地址。您需要根據創建MaxCompute項目時選擇的地域以及網絡連接方式配置Endpoint。各地域及網絡對應的Endpoint值,請參見 Endpoint

project

none

String

MaxCompute項目名稱。您可以登錄MaxCompute控制臺,在工作區>項目管理頁面獲取MaxCompute項目名稱。

tunnelEndpoint

none

String

MaxCompute Tunnel服務的連接地址,通常這項配置可以根據指定的項目所在的地域進行自動路由。僅在使用代理等特殊網絡環境下使用該配置。

quotaName

none

String

MaxCompute數據傳輸使用的獨享資源組名稱,如不指定該配置,則使用共享資源組。詳情可以參見購買與使用獨享數據傳輸服務資源組

stsToken

none

String

當使用RAM角色頒發的短時有效的訪問令牌(STS Token)進行鑒權時,需要指定該參數。

bucketsNum

16

Integer

自動創建MaxCompute Delta表時使用的桶數。使用方式請參見近實時數倉概述

compressAlgorithm

zlib

String

寫入MaxCompute時使用的數據壓縮算法,當前支持raw(不進行壓縮)、zlibsnappy

totalBatchSize

64MB

String

內存中緩沖的數據量大小,單位為分區級(非分區表單位為表級),不同分區(表)的緩沖區相互獨立,達到閾值后數據寫入到MaxCompute。

bucketBatchSize

4MB

String

內存中緩沖的數據量大小,單位為桶級,僅寫入Delta表時生效。不同數據桶的緩沖區相互獨立,達到閾值后將該桶數據寫入到MaxCompute。

numCommitThreads

16

Integer

Checkpoint階段,能夠同時處理的分區(表)數量。

numFlushConcurrent

4

Integer

寫入數據到MaxCompute時,能夠同時寫入的桶數量。僅寫入Delta表時生效。

retryTimes

3

Integer

當網絡鏈接發生錯誤時,進行重試的次數。

sleepMillis

true

Long

當網絡鏈接發生錯誤時,每次重試等待的時間,單位:毫秒。

表位置映射

連接器Connector自動建表時,使用如下映射關系,將源表的位置信息映射到MaxCompute表中。

重要

當MaxCompute項目不支持Schema模型時,每個同步任務僅能同步一個MySQL Database。(其他數據源同理,連接器Connector會忽略tableId.namespace信息)。

Flink CDC中對象

MaxCompute位置

MySQL位置

配置文件中project

Project

none

TableId.namespace

Schema(僅當MaxCompute項目支持Schema模型時,如不支持,將忽略該配置)

Database

TableId.tableName

Table

Table

數據類型映射

Flink Type

MaxCompute Type

CHAR/VARCHAR

STRING

BOOLEAN

BOOLEAN

BINARY/VARBINARY

BINARY

DECIMAL

DECIMAL

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

TIME_WITHOUT_TIME_ZONE

STRING

DATE

DATE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_NTZ

TIMESTAMP_WITH_LOCAL_TIME_ZONE

TIMESTAMP

TIMESTAMP_WITH_TIME_ZONE

TIMESTAMP

ARRAY

ARRAY

MAP

MAP

ROW

STRUCT