日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

CREATE DATABASE AS(CDAS)語句

CDAS支持整庫級別的表結構和數據的實時同步,還支持表結構變更的同步。本文為您介紹CREATE DATABASE AS(CDAS)的使用方法,并提供了多種使用場景下的示例。

說明

數據攝入YAML作業是實時計算Flink產品中集成的最新Flink CDC功能,支持通過簡單的YAML語言編寫強大數據集成作業。

YAML作業覆蓋了CTAS和CDAS語句的關鍵能力,如整庫同步、schema evolution等,并能支持更多場景,如表結構變更立即同步,原始binlog同步,自動同步新增表等。建議使用YAML完成您的數據攝入作業邏輯開發,您可以參考數據攝入YAML最佳實踐了解更多案例。

背景信息

CDAS是CTAS語法的一個語法糖,用于實現整庫同步、多表同步的功能。阿里云Flink引擎會將CDAS語句中每個需要同步的表翻譯成一個對應的CTAS語句。因此,CDAS還擁有CTAS的數據同步和表結構變更同步的能力,常用于全自動化的數據集成場景。此外,阿里云Flink還能對源表進行優化,復用一個源表節點讀取多業務表的數據。這對于MySQL CDC數據源場景尤為適用,因為不僅可以減少數據庫的連接數,還能避免重復拉取Binlog數據,以降低數據庫的讀取壓力。

使用限制

  • 僅Flink計算引擎vvr-4.0.11-flink-1.13及以上版本支持CDAS語法。

    重要

    CDAS語法不支持進行調試。

  • 僅Flink計算引擎vvr-4.0.13-flink-1.13及以上版本支持分庫合并同步。

  • CDAS支持的上下游存儲列表如下。

    連接器名稱

    源表

    結果表

    備注

    MySQL

    ×

    不支持同步MySQL視圖。

    消息隊列Kafka

    ×

    無。

    MongoDB

    ×

    • 暫不支持分庫合并同步。

    • 暫不支持同步MongoDB元信息。

    • 支持通過CDAS語句將MongoDB中的數據及表結構變更同步至目標表。具體的配置要求請參見使用MongoDB Catalog

    Upsert Kafka

    ×

    無。

    實時數倉Hologres

    ×

    如果下游是Hologres,CDAS在默認情況下會為每個表創建相應數量(connectionSize參數值)個連接。此時您可以使用connectionPoolName參數,讓配置相同名稱連接池的表可以共享連接池。

    說明
    • 在將數據同步到Hologres時,如果您的上游源表包含了Fixed Plan不支持類型的數據,建議通過INSERT INTO語句的方式,在Flink內部做類型轉換后將數據同步到Hologres。不要用CDAS方式創建Sink結果表進行數據同步,因為這種方式會無法走Fixed Plan,寫入性能較差。

    • 實時計算Flink版僅支持讀寫Hologres內表,因此Hologres實例必須是獨占實例,不支持Hologres共享集群實例

    StarRocks

    ×

    僅支持EMR的StarRocks。

    流式數據湖倉Paimon

    ×

    • 僅實時計算引擎VVR 6.0.7及以上版本支持Paimon結果表。

    • 僅實時計算引擎VVR 8.0.10及以上版本支持同步到Paimon DLF 2.0結果表。

前提條件

  • 執行CDAS語法前,確保工作空間中已注冊目標端的Catalog,詳情請參見數據管理

  • 執行CDAS語法前,如果您需要訪問不同賬號下的上下游資源、以及使用RAM用戶或RAM角色等身份訪問時,請確保登錄Flink全托管的賬號具有讀寫上下游資源的權限,否則會因為權限不足導致讀寫操作失敗。

注意事項

  • 使用VVR 8.0.6及以上版本時,CDAS作業啟動后,支持添加新表后從作業快照重啟,從而捕獲到新的表。詳情請參見示例三:源庫新增表加入數據同步

  • 使用VVR 8.0.5及以下版本時,CDAS作業啟動后,作業同步的表已經確定,數據庫中新增的表不會自動捕捉,也無法通過重啟作業的方式捕獲到。如果需要同步新增的表,您可以選擇以下任一種方案:

    • 原有CDAS作業不變,啟動一個新的作業同步新增的表。例如

      // 新建CTAS作業同步新增加的表new_table
      CREATE TABLE IF NOT EXISTS new_table
      AS TABLE mysql.tpcds.new_table 
      /*+ OPTIONS('server-id'='8008-8010') */;
    • 停止現有CDAS作業,清理已同步的數據后,以全新狀態重啟CDAS作業來重新同步數據。

功能特性

功能

詳情

整庫同步

支持實時同步整庫(或者多張表)的全量和增量數據到每張對應的結果表中。

表結構變更同步

在實時同步整庫數據的同時,還支持將每張源表的表結構變更(加列等)實時同步到結果表中。

分庫合并同步

支持使用正則表達式定義庫名,匹配數據源的多個分庫下的源表,合并后同步到下游每張對應表名的結果表中。

源庫新增表加入數據同步

CDAS作業啟動后,如果源庫新增表,支持從作業快照重啟,從而捕獲到新的表,對新增表進行數據同步。

多CDAS&CTAS語句

支持使用STATEMENT SET語法將多個CDAS和CTAS語句作為一個作業一起提交,并支持對源表節點的合并復用,降低對數據源的壓力。

啟動流程

當執行CDAS語句時,阿里云Flink將會按照以下流程執行:

  1. 檢查目標存儲中是否存在目標庫和結果表。

    • 如果不存在目標庫,則通過目標端Catalog去目標存儲中創建相應的目標庫。

    • 如果存在目標庫,則跳過建庫,并檢查目標庫是否存在該結果表。

      • 如果不存在,則在目標庫中創建相應的結果表,該結果表具有和源庫中表相同的表名和Schema。

      • 如果存在,則跳過建表。

  2. 提交和啟動相應的數據同步作業。將源庫中的數據以及Schema變更同步到目標庫下的表中。

例如,從MySQL到Hologres的CDAS數據同步流程如下圖所示。CDAS示意圖

表結構變更同步策略

因為CDAS是CTAS語法的一個語法糖,所以表結構變更能力與CTAS一致,詳情請參見CREATE TABLE AS(CTAS)語句

基本語法

CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]
AS DATABASE <source_database>
INCLUDING { ALL TABLES | TABLE 'table_name' }
[EXCLUDING TABLE 'table_name']
[/*+ OPTIONS(key1=val1, key2=val2, ... ) */]

<target_database>:
  [catalog_name.]db_name

<source_database>:
  [catalog_name.]db_name

CDAS語法復用了CREATE DATABASE語法的基本結構,其中的參數解釋如下表所示。

參數

說明

target_database

數據同步的目標數據庫名,可以指定具體的Catalog名稱。

COMMENT

目標庫的描述,默認使用source_database的描述。

WITH

目標庫的參數,詳情請參見數據管理中對應的Catalog文檔。

說明

key和value都需要為字符串類型,例如'sink.parallelism' = '4'

source_database

數據同步的源庫名稱,可以指定具體的Catalog名稱。

INCLUDING ALL TABLES

同步源庫中的所有表。

INCLUDING TABLE

同步源庫中指定的表。支持使用豎線(|)分隔指定多個表,也可以使用正則表達式指定符合某一規則的表。例如INCLUDING TABLE 'web.*'表示要同步源庫中所有web開頭的表。

EXCLUDING TABLE

用于指定不需要同步的表,支持使用豎線(|)分隔指定多個表,也可以使用正則表達式指定符合某一規則的表,例如INCLUDING ALL TABLES EXCLUDING TABLE 'web.*'表示同步源庫中所有不是web開頭的表。

OPTIONS

源表的參數,詳情請參見對應連接器支持的源表WITH參數。

說明

key和value都需要為字符串類型,例如'server-id' = '65500'

說明

因為IF NOT EXISTS關鍵字為必填,所以如果目標庫或結果表在目標存儲中并不存在,則會先創建該目標庫和結果表,否則跳過創建步驟。創建的結果表Schema會使用源表的Schema,包括主鍵以及物理字段的字段名和字段類型,不包括計算列、meta字段、Watermark。其中源表到結果表的字段類型會經過類型映射,詳見對應連接器文檔中的類型映射。

示例

示例一:整庫同步

CDAS通常會配合數據源的Catalog和目標的Catalog一起使用。例如,MySQL Catalog和Hologres Catalog結合CDAS語法,完成MySQL到Hologres的全量和增量數據同步。使用MySQL Catalog可以自動解析源表的Schema及相應的參數,而不用手動編寫DDL。

假設已在工作空間中注冊了名為holo的Hologres Catalog和名為mysql的MySQL Catalog,MySQL中有一個名為tpcds的庫。您可以使用以下語句將tpcds庫下的24張表全部同步到Hologres中,包括未來的數據變更和表結構變更,無需提前在Hologres中創建表。

USE CATALOG holo;

CREATE DATABASE IF NOT EXISTS holo_tpcds  -- 在hologres中創建holo_tpcds庫。
WITH ('sink.parallelism' = '4') -- 可選,指定目標庫的參數,每個holo sink默認使用4并發。
AS DATABASE mysql.tpcds INCLUDING ALL TABLES  -- 同步mysql中tpcds庫下所有表。
/*+ OPTIONS('server-id'='8001-8004') */ ; -- 可選,指定mysql-cdc源表的額外參數。
說明

Hologres支持在創建目標Database時指定WITH參數,這些參數僅對當前作業生效,用于控制寫入結果表時的行為,不會持久化到Hologres中。支持的WITH參數詳情請參見實時數倉Hologres

示例二:分庫合并同步

對于分庫合并同步的場景,需要利用正則表達式的庫名來匹配所要同步的多個分庫。使用CDAS可以將上游多個分庫下相同表名的數據合并同步到Hologres目標庫對應表名的同一張表中,庫名和表名會作為額外的兩個字段寫入到每張結果表中。為保證主鍵唯一性,庫名、表名和原主鍵一起作為對應Hologres表的新聯合主鍵。

假設MySQL實例中有order_db01~order_db99多個分庫,每個分庫下都有order、order_detail等多張表。您可以使用以下語句將99個分庫下的order、order_detail等表全部同步到Hologres中,包括未來的數據變更和表結構變更,無需提前在Hologres中創建表。order1

USE CATALOG holo;

CREATE DATABASE IF NOT EXISTS holo_order--在Hologres中創建holo_order庫,包括mysql中order分庫的所有表。
WITH('sink.parallelism'='4')        --可選,指定目標庫的參數,每個HologresSink默認并發為4。
AS DATABASE mysql.`order_db[0-9]+` INCLUDING ALL TABLES --同步mysql中order_db分庫下所有表。
/*+OPTIONS('server-id'='8001-8004')*/;  --可選,指定mysql-cdc源表的額外參數。

示例三:源庫新增表加入數據同步

使用VVR 8.0.6及以上版本時,CDAS作業啟動后,如果源庫新增表,支持從作業快照重啟,從而捕獲到新的表,對新增表進行數據同步。

  1. SQL作業開發時需要增加以下語句,開啟CDAS新增表讀取功能。

    SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
  2. 當出現新增的表需要同步時,停止作業并勾選停止前創建一次快照

  3. SQL開發中,重新部署這個SQL作業。

  4. 作業運維頁面單擊目標作業名稱,狀態集管理頁簽,單擊歷史

  5. 作業快照列表中,找到停止作業時創建的快照。

  6. 單擊目標快照操作列,選擇更多 > 從該快照恢復作業

  7. 作業啟動配置對話框,配置作業啟動信息,詳情請參見作業啟動

重要

新增表功能只能用于默認的initial啟動模式。

示例四:多CDAS&CTAS語句

實時計算Flink版支持使用STATEMENT SET語法將多個CTAS語句作為一個作業一起提交,并且可以對Source進行優化,復用一個Source節點讀取多業務表的數據。這對于MySQL CDC數據源場景尤為適用,因為這可以減少server-id的使用,減少對數據庫的連接數和讀取壓力。

說明

對于Source復用優化,需要這些Source表的options保持完全一致,才能合并成功進行復用。

假設MySQL實例中有tpcds、tpch、user_db01~user_db99(分庫分表)多個庫。您可以通過組合多條CDAS和CTAS語句,將MySQL實例下的所有庫和表都同步到Hologres,只需一個Flink作業便能完成所有表的同步,只需一個Source便能讀取所有表的數據,代碼示例如下。

USE CATALOG holo;

BEGIN STATEMENT SET;

-- 同步user分庫分表。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

-- 同步TPCDS庫。
CREATE DATABASE IF NOT EXISTS holo_tpcds
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
-- 同步TPCH庫。
CREATE DATABASE IF NOT EXISTS holo_tpch
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

END;

示例五:多CDAS語句整庫同步到Kafka

在使用多個CDAS語句整庫同步到Kafka時,由于不同的數據庫中可能存在相同的表,為了防止topic沖突,需要使用cdas.topic.pattern配置。cdas.topic.pattern定義了創建topic的名稱的格式,其中可通過{table-name}占位符來替換為表名。如:當設置'cdas.topic.pattern'='db1-{table-name}',對于上游表名為table1的表,在Kafka中對應的topic名稱為db1-table1

假設MySQL實例中有tpcds、tpch多個庫。您可以通過如下方式將MySQL實例下的所有庫和表都同步到Kafka,避免topic沖突,代碼示例如下。

USE CATALOG kafkaCatalog;

BEGIN STATEMENT SET;

-- 同步TPCDS庫。
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpcds-{table-name}')
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

-- 同步TPCH庫。
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpch-{table-name}')
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

END;

實時計算Flink版提供MySQL整庫同步到Kafka的能力,通過引入Kafka作為中間層,并使用CDAS整庫同步或CTAS整表同步到Kafka來解決,具體操作請參見MySQL整庫同步Kafka

相關文檔