日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

CREATE TABLE AS(CTAS)語(yǔ)句

更新時(shí)間:

通過(guò)CTAS語(yǔ)句,在實(shí)時(shí)同步數(shù)據(jù)的同時(shí),還能實(shí)時(shí)將上游表結(jié)構(gòu)(Schema)的變更同步到下游表,提高您在目標(biāo)存儲(chǔ)中創(chuàng)建表和維護(hù)源表結(jié)構(gòu)變更的效率。本文為您介紹CREATE TABLE AS(CTAS)的使用方法,并提供了多種使用場(chǎng)景下的示例。

說(shuō)明

數(shù)據(jù)攝入YAML作業(yè)是實(shí)時(shí)計(jì)算Flink產(chǎn)品中集成的最新Flink CDC功能,支持通過(guò)簡(jiǎn)單的YAML語(yǔ)言編寫強(qiáng)大數(shù)據(jù)集成作業(yè)。

YAML作業(yè)覆蓋了CTAS和CDAS語(yǔ)句的關(guān)鍵能力,如整庫(kù)同步、schema evolution等,并能支持更多場(chǎng)景,如表結(jié)構(gòu)變更立即同步,原始binlog同步、自動(dòng)同步新增表等。建議使用YAML完成您的數(shù)據(jù)攝入作業(yè)邏輯開(kāi)發(fā),您可以參考數(shù)據(jù)攝入YAML最佳實(shí)踐了解更多案例。

前提條件

執(zhí)行CTAS語(yǔ)法前,確保工作空間中已注冊(cè)目標(biāo)端的Catalog。詳情請(qǐng)參見(jiàn)數(shù)據(jù)管理。

使用限制

  • 僅Flink計(jì)算引擎vvr-4.0.11-flink-1.13及以上版本支持CTAS語(yǔ)法。

    重要

    CTAS語(yǔ)法不支持進(jìn)行調(diào)試。

  • 僅Flink計(jì)算引擎vvr-4.0.12-flink-1.13及以上版本支持同步自定義計(jì)算列。

  • VVR 4.0.16以下版本,不支持在一個(gè)作業(yè)中使用多個(gè)CTAS語(yǔ)句將同一張數(shù)據(jù)源表同步到不同的結(jié)果表。

  • 不支持在同一作業(yè)中混合使用CTAS和INSERT INTO語(yǔ)句。

  • CTAS支持的上下游存儲(chǔ)列表如下,您可以從下表的源表和結(jié)果表中各選一個(gè)進(jìn)行組合。

    連接器名稱

    源表

    結(jié)果表

    備注

    MySQL

    ×

    • 分庫(kù)分表合并同步時(shí),默認(rèn)會(huì)同步上游存儲(chǔ)的數(shù)據(jù)庫(kù)名稱和表名稱。

    • 單表同步時(shí),不會(huì)同步數(shù)據(jù)庫(kù)名稱和表名稱。如果您需要同步數(shù)據(jù)庫(kù)名稱和表名稱,請(qǐng)使用SQL命令創(chuàng)建Catalog,并添加catalog.table.metadata-columns參數(shù)。詳情請(qǐng)參見(jiàn)SQL命令。

    • 不支持同步MySQL視圖。

    消息隊(duì)列Kafka

    ×

    無(wú)。

    MongoDB

    ×

    • 暫不支持分庫(kù)分表合并同步。

    • 暫不支持同步MongoDB元信息。

    • 暫不支持CTAS新增表功能。

    • 支持通過(guò)CTAS語(yǔ)句將MongoDB中的數(shù)據(jù)及表結(jié)構(gòu)變更同步至目標(biāo)表,示例可參考示例九。

    Upsert Kafka

    ×

    無(wú)。

    StarRocks

    ×

    僅支持EMR的StarRocks。

    實(shí)時(shí)數(shù)倉(cāng)Hologres

    ×

    如果下游是Hologres,CTAS在默認(rèn)情況下會(huì)為每個(gè)表創(chuàng)建相應(yīng)數(shù)量(connectionSize參數(shù)值)個(gè)連接。此時(shí)您就可以使用connectionPoolName參數(shù),讓配置相同名稱連接池的表可以共享連接池。

    說(shuō)明

    在將數(shù)據(jù)同步到Hologres時(shí),如果您的上游源表包含了Fixed Plan不支持類型的數(shù)據(jù),建議通過(guò)INSERT INTO語(yǔ)句的方式,在Flink內(nèi)部做類型轉(zhuǎn)換后將數(shù)據(jù)同步到Hologres。不要用CTAS方式創(chuàng)建Sink結(jié)果表進(jìn)行數(shù)據(jù)同步,因?yàn)檫@種方式會(huì)無(wú)法走Fixed Plan,寫入性能較差。

    流式數(shù)據(jù)湖倉(cāng)Paimon

    ×

    • 僅Flink計(jì)算引擎vvr-6.0.7-flink-1.15及以上版本支持Paimon結(jié)果表。

    • 僅實(shí)時(shí)計(jì)算引擎VVR 8.0.10及以上版本支持同步到Paimon DLF 2.0結(jié)果表。

功能特性

功能

詳情

單表同步

支持實(shí)時(shí)同步源表的全量和增量數(shù)據(jù)到結(jié)果表中。

表結(jié)構(gòu)變更同步

在實(shí)時(shí)同步數(shù)據(jù)的同時(shí),還支持將源表的表結(jié)構(gòu)變更(增加列信息等)實(shí)時(shí)同步到結(jié)果表中。

分庫(kù)分表合并同步

支持使用正則表達(dá)式定義庫(kù)名和表名,匹配數(shù)據(jù)源的多張分庫(kù)分表,合并后同步到下游的一張表中。

說(shuō)明

正則匹配時(shí),不支持使用^進(jìn)行表開(kāi)頭的匹配。

自定義計(jì)算列同步

支持在源表上新增計(jì)算列,以支持您對(duì)源表的某些列進(jìn)行轉(zhuǎn)換計(jì)算。計(jì)算列可以使用系統(tǒng)函數(shù)或自定義函數(shù),允許指定新增列的位置,并將其作為結(jié)果表的物理列,實(shí)時(shí)地將計(jì)算列的結(jié)果同步到結(jié)果表中。

多CTAS語(yǔ)句

支持使用STATEMENT SET語(yǔ)法將多個(gè)CTAS語(yǔ)句作為一個(gè)作業(yè)一起提交,并支持對(duì)Source節(jié)點(diǎn)的合并復(fù)用,降低對(duì)數(shù)據(jù)源的壓力。

多CTAS語(yǔ)句作業(yè),支持新增CTAS語(yǔ)句加入新增表到同步作業(yè)中,詳見(jiàn)示例六。

啟動(dòng)流程

當(dāng)執(zhí)行CTAS語(yǔ)句時(shí),將會(huì)按照以下流程執(zhí)行:

  1. 檢查目標(biāo)存儲(chǔ)中是否存在該結(jié)果表。

    • 如果不存在,則通過(guò)目標(biāo)端Catalog去目標(biāo)存儲(chǔ)中創(chuàng)建相應(yīng)的結(jié)果表,該結(jié)果表具有和數(shù)據(jù)源相同的Schema。

    • 如果存在,則跳過(guò)建表。

    • 如果已存在的結(jié)果表與源表Schema不一致,則會(huì)報(bào)錯(cuò)提示。

  2. 提交和啟動(dòng)相應(yīng)的數(shù)據(jù)同步作業(yè)。

    將數(shù)據(jù)源的數(shù)據(jù)以及Schema的變更同步到結(jié)果表中。

例如,從MySQL到Hologres同步CTAS數(shù)據(jù)流程如下圖所示。同步示意圖

表結(jié)構(gòu)變更同步策略

通過(guò)CTAS語(yǔ)句,在實(shí)時(shí)同步數(shù)據(jù)的同時(shí),還能將源表Schema的變更同步到結(jié)果表中。Schema變更包括初始的表創(chuàng)建以及未來(lái)的表變更。

  • 當(dāng)前支持的Schema變更策略詳情如下:

    • 添加可空列:會(huì)自動(dòng)在結(jié)果表Schema末尾添加對(duì)應(yīng)的列,并自動(dòng)同步新增列的數(shù)據(jù)。

    • 刪除可空列:不會(huì)直接在結(jié)果表中刪除該列,而是將該列的數(shù)據(jù)自動(dòng)填充為NULL值。

    • 添加非空列:會(huì)自動(dòng)在結(jié)果表Schema末尾添加對(duì)應(yīng)的列,并自動(dòng)同步新增列的數(shù)據(jù),新增的列會(huì)默認(rèn)設(shè)置為可空列,對(duì)于添加列發(fā)生之前的數(shù)據(jù)自動(dòng)設(shè)置為NULL值。

    • 重命名列:被看作為添加列和刪除列。直接在結(jié)果表中末尾添加重命名后的列,并將重命名前的列數(shù)據(jù)自動(dòng)填充為NULL值。例如,如果col_a重命名為col_b,則會(huì)在結(jié)果表末尾添加col_b,并自動(dòng)將col_a的數(shù)據(jù)填充為NULL值。

    • 列類型變更:

      • 對(duì)于支持列類型變更的下游系統(tǒng),在下游Sink支持處理列類型變更后,CTAS 支持普通列的類型變更,例如,從INT類型變更到BIGINT類型。此類變更依賴于下游Sink支持的列類型變更規(guī)則,不同的結(jié)果表支持的列類型變更規(guī)則也不相同,請(qǐng)參考結(jié)果表文檔獲取其支持的列類型變更規(guī)則,目前只有Paimon支持處理列類型變更。

      • 對(duì)于不支持列類型變更的下游系統(tǒng),比如Hologres,CTAS無(wú)法支持列類型變更。此類場(chǎng)景可以使用寬容模式同步,即在CTAS作業(yè)啟動(dòng)時(shí)在下游系統(tǒng)建立類型更加寬泛的表,在列類型變更發(fā)生時(shí)判斷該類變更下游Sink是否可以接受來(lái)實(shí)現(xiàn)寬容的列類型變更支持,詳情請(qǐng)參見(jiàn)示例八:CTAS語(yǔ)句使用字段類型寬容模式同步數(shù)據(jù)到Hologres表。目前只有Hologres支持寬容模式處理列類型變更。寬容模式應(yīng)該在首次啟動(dòng)CTAS作業(yè)時(shí)開(kāi)啟,如果在首次啟動(dòng)時(shí)未開(kāi)啟寬容模式,需要?jiǎng)h除下游表并且將作業(yè)無(wú)狀態(tài)重啟才能生效。

  • 暫不支持同步以下Schema的變更:

    • 主鍵或索引等約束的變更。

    • 非空列的刪除。

    • 從NOT NULL轉(zhuǎn)為NULLABLE變更。

重要
  • 如果遇到以上不支持的Schema變更,需要您手動(dòng)刪除下游結(jié)果表,重新啟動(dòng)CTAS作業(yè),即重新創(chuàng)建結(jié)果表并重新同步歷史數(shù)據(jù)。

  • CTAS不會(huì)去識(shí)別具體的DDL類型,而是對(duì)比前后兩條數(shù)據(jù)的Schema差異。因此,如果您先刪除了某列后,又加回了該列,且這兩個(gè)DDL之間無(wú)數(shù)據(jù)變化,那么CTAS會(huì)認(rèn)為沒(méi)有發(fā)生結(jié)構(gòu)變更。同理,如果您添加了一列,直到該表有數(shù)據(jù)變化,CTAS才會(huì)感知到結(jié)構(gòu)變更,才會(huì)同步結(jié)構(gòu)變更到結(jié)果表。

基本語(yǔ)法

CREATE TABLE IF NOT EXISTS <sink_table>
[COMMENT table_comment]
WITH (key1=val1, key2=val2, ...)
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];

<sink_table>:
  [catalog_name.][db_name.]table_name

<source_table>:
  [catalog_name.][db_name.]table_name

<column_component>:
  computed_column_definition [FIRST | AFTER column_name]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

CTAS語(yǔ)法復(fù)用了CREATE TABLE語(yǔ)法的基本結(jié)構(gòu),其中的參數(shù)解釋如下表所示。

參數(shù)

說(shuō)明

sink_table

數(shù)據(jù)同步的結(jié)果表名,可以指定具體的Catalog名稱和數(shù)據(jù)庫(kù)名稱。

COMMENT

結(jié)果表的描述,默認(rèn)使用source_table的描述。

WITH

結(jié)果表參數(shù),可填入結(jié)果表支持的WITH參數(shù)。支持的WITH參數(shù)詳情請(qǐng)參見(jiàn)Upsert Kafka WITH參數(shù)、Hologres WITH參數(shù)、StarRocks WITH參數(shù)Paimon WITH參數(shù)。

說(shuō)明

key和value都需要為字符串類型,例如'jdbcWriteBatchSize' = '1024'。

source_table

數(shù)據(jù)同步的源表表名,可指定具體的Catalog名稱和Database名稱。

OPTIONS

源表的參數(shù),可填入源表支持的WITH參數(shù)。支持的WITH參數(shù)詳情請(qǐng)參見(jiàn)MySQL WITH參數(shù)Kafka WITH參數(shù)。

說(shuō)明

key和value都需要為字符串類型,例如'server-id' = '65500'

ADD COLUMN

同步到結(jié)果表時(shí),相對(duì)于源表新增的列,僅支持計(jì)算列。

column_component

新增列的描述。

computed_column_definition

計(jì)算列表達(dá)式的描述。

FIRST

新增列作為源表的第一個(gè)字段。如果不添加該參數(shù),則新增列會(huì)默認(rèn)作為源表的最后一個(gè)字段。

AFTER

新增列放在源表指定字段后面。

PARTITION BY

系統(tǒng)支持根據(jù)某列進(jìn)行分區(qū),創(chuàng)建分區(qū)表。

說(shuō)明

因?yàn)镮F NOT EXISTS關(guān)鍵字為必填,所以如果結(jié)果表在目標(biāo)存儲(chǔ)中并不存在,則會(huì)先創(chuàng)建該結(jié)果表,否則跳過(guò)創(chuàng)建步驟。創(chuàng)建的結(jié)果表Schema會(huì)使用源表的Schema,包括主鍵以及物理字段的字段名和字段類型,不包括計(jì)算列、meta字段、Watermark。其中源表到結(jié)果表的字段類型會(huì)經(jīng)過(guò)類型映射,詳情請(qǐng)參見(jiàn)對(duì)應(yīng)連接器文檔中的類型映射。

代碼示例

示例一:?jiǎn)伪硗?/b>

通常,CTAS都會(huì)配合數(shù)據(jù)源的Catalog和目標(biāo)的Catalog一起使用,例如MySQL Catalog和Hologres Catalog結(jié)合CTAS語(yǔ)法,來(lái)完成MySQL到Hologres的全量和增量數(shù)據(jù)同步。使用MySQL Catalog可以自動(dòng)解析源表的Schema及相應(yīng)的參數(shù),而不用手動(dòng)編寫DDL 。

假設(shè)已在工作空間中注冊(cè)了名為holo的Hologres Catalog和名為mysql的MySQL Catalog。將MySQL中的web_sales表同步到Hologres中,代碼示例如下。

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS web_sales  
WITH ('jdbcWriteBatchSize' = '1024')   -- 可選,指定結(jié)果表的參數(shù)。
AS TABLE mysql.tpcds.web_sales   
/*+ OPTIONS('server-id'='8001-8004') */;  -- 指定mysql-cdc源表的額外參數(shù)。

示例二:分庫(kù)分表合并同步

對(duì)于分庫(kù)分表合并同步的場(chǎng)景,您可以結(jié)合MySQL Catalog,利用正則表達(dá)式的表名和庫(kù)名來(lái)匹配所要同步的多張表。使用CTAS可以將這多張分庫(kù)分表合并到一張Hologres表中,庫(kù)名和表名會(huì)作為額外的兩個(gè)字段寫入到該表中,為保證主鍵唯一性,庫(kù)名、表名和原主鍵一起作為該Hologres表的新聯(lián)合主鍵。

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`  
/*+ OPTIONS('server-id'='8001-8004') */;

其合并的效果如下圖所示。效果如果在user02表中新增一列age,并插入一條數(shù)據(jù)。此時(shí)雖然多張分表的Schema并不一致,但是user02表后續(xù)的數(shù)據(jù)和Schema變更都能實(shí)時(shí)地自動(dòng)同步到下游表中。

ALTER TABLE `user02` ADD COLUMN `age` INT;
INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);

image

示例三:自定義計(jì)算列同步

本示例以u(píng)ser分庫(kù)分表合并同步作為基礎(chǔ),介紹在分庫(kù)分表合并的過(guò)程中,如何進(jìn)行一些轉(zhuǎn)換計(jì)算。

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */
ADD COLUMN (
  `c_id` AS `id` + 10 AFTER `id`,
  `calss` AS 3  AFTER `id`
);

新增計(jì)算列同步的效果如下圖所示。

image

示例四:多個(gè)CTAS語(yǔ)句作為一個(gè)作業(yè)提交

實(shí)時(shí)計(jì)算Flink版支持使用STATEMENT SET語(yǔ)法將多個(gè)CTAS語(yǔ)句作為一個(gè)作業(yè)一起提交,并且可以對(duì)Source進(jìn)行優(yōu)化,復(fù)用一個(gè)Source節(jié)點(diǎn)讀取多業(yè)務(wù)表的數(shù)據(jù)。這對(duì)于MySQL CDC數(shù)據(jù)源場(chǎng)景尤為適用,因?yàn)檫@可以減少server-id的使用,減少對(duì)數(shù)據(jù)庫(kù)的連接數(shù)和讀取壓力。

重要

對(duì)于Source復(fù)用優(yōu)化,需要這些Source表的options保持完全一致,才能合并成功進(jìn)行復(fù)用。

例如示例一同步了web_sales表,示例二同步了user分庫(kù)分表,您可以使用STATEMENT SET語(yǔ)法將它們作為一個(gè)作業(yè)提交。

USE CATALOG holo;

BEGIN STATEMENT SET;

-- 同步web_sales表。
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;

-- 同步user分庫(kù)分表。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

示例五:多個(gè)CTAS語(yǔ)句將同一張數(shù)據(jù)源表同步到不同的結(jié)果表

4.0.16以上版本中,在不添加計(jì)算列時(shí),可以將同一張數(shù)據(jù)源表同步到不同的結(jié)果表。

USE CATALOG `holo`;

BEGIN STATEMENT SET;

-- 通過(guò)CTAS語(yǔ)句同步MySQL的user表到Holo數(shù)倉(cāng)database1的user表中
CREATE TABLE IF NOT EXISTS `database1`.`user`
AS TABLE `mysql`.`tpcds`.`user`
/*+ OPTIONS('server-id'='8001-8004') */;

-- 通過(guò)CTAS語(yǔ)句同步MySQL的user表到Holo數(shù)倉(cāng)database2的user表中
CREATE TABLE IF NOT EXISTS `database2`.`user`
AS TABLE `mysql`.`tpcds`.`user`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

如果結(jié)果表需要添加計(jì)算列,則應(yīng)按照如下方式進(jìn)行同步:

-- 基于源表user創(chuàng)建臨時(shí)表user_with_changed_id,支持定義計(jì)算列,例如這里的computed_id是基于源表的id計(jì)算獲得。
CREATE TEMPORARY TABLE `user_with_changed_id` (
  `computed_id` AS `id` + 1000
) LIKE `mysql`.`tpcds`.`user`;

-- 基于源表user創(chuàng)建臨時(shí)表user_with_changed_age,支持定義計(jì)算列,例如這里的computed_age是基于源表的age計(jì)算獲得。
CREATE TEMPORARY TABLE `user_with_changed_age` (
  `computed_age` AS `age` + 1
) LIKE `mysql`.`tpcds`.`user`;

BEGIN STATEMENT SET;

-- 通過(guò)CTAS語(yǔ)句同步MySQL的user表到Holo數(shù)倉(cāng)的user_with_changed_id表中,表中會(huì)包含通過(guò)計(jì)算獲得的id,即computed_id列。 
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id`
AS TABLE `user_with_changed_id`
/*+ OPTIONS('server-id'='8001-8004') */;

-- 通過(guò)CTAS語(yǔ)句同步MySQL的user表到Holo數(shù)倉(cāng)的user_with_changed_age表中,表中會(huì)包含通過(guò)計(jì)算獲得的age,即computed_age列。 
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age`
AS TABLE `user_with_changed_age`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

示例六:多個(gè)CTAS語(yǔ)句時(shí),新增CTAS語(yǔ)句加入數(shù)據(jù)同步作業(yè)

使用VVR 8.0.1及以上版本時(shí),多個(gè)CTAS語(yǔ)句的作業(yè)啟動(dòng)后,如果新增CTAS語(yǔ)句,支持從作業(yè)快照重啟,從而捕獲到新的表,對(duì)新增表進(jìn)行數(shù)據(jù)同步。

  1. SQL作業(yè)開(kāi)發(fā)時(shí)需要增加以下語(yǔ)句,開(kāi)啟新增表讀取功能。

    SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
  2. 當(dāng)需要新增CTAS語(yǔ)句時(shí),在作業(yè)運(yùn)維頁(yè)面停止作業(yè)并勾選停止前創(chuàng)建一次快照

  3. SQL開(kāi)發(fā)中,增加CTAS語(yǔ)句,并重新部署這個(gè)SQL作業(yè)。

  4. 作業(yè)運(yùn)維頁(yè)面單擊目標(biāo)作業(yè)名稱,狀態(tài)集管理頁(yè)簽,單擊歷史。

  5. 作業(yè)快照列表中,找到停止作業(yè)時(shí)創(chuàng)建的快照。

  6. 單擊目標(biāo)快照操作列,選擇更多 > 從該快照恢復(fù)作業(yè)。

  7. 作業(yè)啟動(dòng)配置對(duì)話框,配置作業(yè)啟動(dòng)信息,詳情請(qǐng)參見(jiàn)作業(yè)啟動(dòng)。

重要

新增CTAS語(yǔ)句使用時(shí),存在以下限制:

  • 使用CDC源表同步時(shí),僅支持源表啟動(dòng)模式為initial的作業(yè)使用新增表功能。

  • 新增的CTAS語(yǔ)句的對(duì)應(yīng)Source必須能夠復(fù)用優(yōu)化,也就是新增的源表配置需要和原有的源表配置保持完全一致。

  • 新增CTAS語(yǔ)句前后,作業(yè)不能有其他參數(shù)的變更,比如更改啟動(dòng)模式。

示例七:通過(guò)CTAS語(yǔ)句將MySQL數(shù)據(jù)源表同步到Hologres分區(qū)表

Hologres分區(qū)表建表時(shí),如果Hologres表存在主鍵,則要求分區(qū)字段必須是主鍵中的字段。假設(shè)有一張MySQL表需要同步到Hologres,其建表語(yǔ)句如下。

CREATE TABLE orders (
    order_id INTEGER NOT NULL,
    product_id INTEGER NOT NULL,
    city VARCHAR(100) NOT NULL
    order_date DATE,
    purchaser INTEGER,
    PRIMARY KEY(order_id, product_id)
);

當(dāng)使用CTAS同步數(shù)據(jù)源表到Hologres的分區(qū)表中時(shí):

  • 如果上游表的主鍵包含分區(qū)字段,例如Hologres表的分區(qū)字段是product_id,可以通過(guò)如下SQL實(shí)現(xiàn)。

    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
    PARTITIONED BY (product_id)
    AS TABLE `mysql`.`tpcds`.`orders`;
  • 如果上游表的主鍵不包含分區(qū)字段,例如Hologres表的分區(qū)字段是city,創(chuàng)建Hologres表時(shí)會(huì)使用MySQL表中的主鍵,由于上游表的主鍵不包含分區(qū)字段,作業(yè)會(huì)出錯(cuò)。此時(shí),您可以在CTAS中通過(guò)聲明主鍵的方式,重新指定目標(biāo)Hologres分區(qū)表的主鍵,使得任務(wù)正常運(yùn)行,示例如下。

    -- 可以通過(guò)如下SQL指定Hologres分區(qū)表的主鍵為order_id,product_id和city。
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`(
        CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED
    )
    PARTITIONED BY (city)
    AS TABLE `mysql`.`tpcds`.`orders`;

示例八:CTAS語(yǔ)句使用字段類型寬容模式同步數(shù)據(jù)到Hologres表

在CTAS場(chǎng)景中,可能需要調(diào)整已有字段數(shù)據(jù)類型的精度(例如,從VARCHAR(10)到VARCHAR(20))。

  • Flink計(jì)算引擎VVR 6.0.5-Flink 1.15以下版本,上游修改數(shù)據(jù)類型可能導(dǎo)致CTAS任務(wù)失敗,只能重建結(jié)果表。

  • Flink計(jì)算引擎VVR 6.0.5-Flink 1.15及以上版本,在同步數(shù)據(jù)到Hologres表時(shí),支持使用類型寬容模式。寬容模式應(yīng)該在首次啟動(dòng)CTAS作業(yè)時(shí)開(kāi)啟,如果在首次啟動(dòng)時(shí)未開(kāi)啟寬容模式,需要?jiǎng)h除下游表并且將作業(yè)無(wú)狀態(tài)重啟才能生效。

    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders` 
    WITH (
    'connector' = 'hologres', 
    'enableTypeNormalization' = 'true' -- 使用字段類型寬容模式。
    ) AS TABLE `mysql`.`tpcds`.`orders`;

    在上游發(fā)生數(shù)據(jù)類型修改事件時(shí),只要所修改類型與原類型的歸一化類型相同,都視作修改成功。目前類型歸一化規(guī)則如下:

    • TINYINT、SMALLINT、INT和BIGINT歸一化為BIGINT。

    • CHAR、VARCHAR和STRING歸一化為STRING。

    • FLOAT和DOUBLE歸一化為DOUBLE。

    • 其他數(shù)據(jù)類型按照原本的類型映射規(guī)則創(chuàng)建,詳情參見(jiàn)類型映射

    例如:

    • SMALLINT修改為INT,兩者的歸一化類型都是BIGINT,視為修改成功,CTAS作業(yè)正常運(yùn)行。

    • 從FLOAT改為BIGINT,兩者的歸一化類型分別為DOUBLE和BIGINT,屬于不兼容的情況,會(huì)拋出異常。

示例九:通過(guò)CTAS語(yǔ)句將MongoDB數(shù)據(jù)源表同步到Hologres表

實(shí)時(shí)計(jì)算Flink VVR 8.0.6及以上版本,CTAS語(yǔ)句支持同步MongoDB數(shù)據(jù)源表,能夠在實(shí)時(shí)同步MongoDB數(shù)據(jù)的同時(shí)將上游表結(jié)構(gòu)變更同步到下游表??梢耘浜螹ongoDB Catalog使用,無(wú)需手動(dòng)定義Schema,MongoDB Catalog詳情可參考管理MongoDB Catalog

這里以使用CTAS語(yǔ)句同步MongoDB數(shù)據(jù)源表數(shù)據(jù)到Hologres表為例:

BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

END;
重要

使用CTAS或CDAS語(yǔ)句將MongoDB中的數(shù)據(jù)同步至目標(biāo)表時(shí),必須滿足以下要求:

  • 實(shí)時(shí)計(jì)算Flink VVR版本必須為8.0.6及以上,MongoDB數(shù)據(jù)庫(kù)版本必須為6.0及以上。

  • 在SQL Hints中已將scan.incremental.snapshot.enabled和scan.full-changelog參數(shù)都設(shè)置為true。

  • MongoDB數(shù)據(jù)庫(kù)已開(kāi)啟前像后像(Pre- and Post-images)記錄功能,開(kāi)啟方法參見(jiàn)Document Preimages。

當(dāng)使用同一個(gè)作業(yè)同步多個(gè)MongoDB集合時(shí),需要滿足以下條件:

  • 每張表關(guān)于MongoDB的配置必須完全相同,包括hosts、scheme、username、password、connectionOptions。

  • 每張表的scan.startup.mode配置必須完全相同。

示例十:MySQL整庫(kù)同步Kafka

在實(shí)際使用中,同一張MySQL表可能被多個(gè)作業(yè)依賴,當(dāng)多個(gè)任務(wù)使用同一張MySQL表做處理時(shí),MySQL數(shù)據(jù)庫(kù)會(huì)啟動(dòng)多個(gè)連接,對(duì)MySQL服務(wù)器和網(wǎng)絡(luò)造成很大的壓力。為了緩解對(duì)上游MySQL數(shù)據(jù)庫(kù)的壓力,實(shí)時(shí)計(jì)算Flink版提供MySQL整庫(kù)同步到Kafka的能力,通過(guò)引入Kafka作為中間層,并使用CDAS整庫(kù)同步或CTAS整表同步到Kafka來(lái)解決。具體操作請(qǐng)參見(jiàn)MySQL整庫(kù)同步Kafka。

相關(guān)文檔