CREATE TABLE AS(CTAS)語(yǔ)句
通過(guò)CTAS語(yǔ)句,在實(shí)時(shí)同步數(shù)據(jù)的同時(shí),還能實(shí)時(shí)將上游表結(jié)構(gòu)(Schema)的變更同步到下游表,提高您在目標(biāo)存儲(chǔ)中創(chuàng)建表和維護(hù)源表結(jié)構(gòu)變更的效率。本文為您介紹CREATE TABLE AS(CTAS)的使用方法,并提供了多種使用場(chǎng)景下的示例。
數(shù)據(jù)攝入YAML作業(yè)是實(shí)時(shí)計(jì)算Flink產(chǎn)品中集成的最新Flink CDC功能,支持通過(guò)簡(jiǎn)單的YAML語(yǔ)言編寫強(qiáng)大數(shù)據(jù)集成作業(yè)。
YAML作業(yè)覆蓋了CTAS和CDAS語(yǔ)句的關(guān)鍵能力,如整庫(kù)同步、schema evolution等,并能支持更多場(chǎng)景,如表結(jié)構(gòu)變更立即同步,原始binlog同步、自動(dòng)同步新增表等。建議使用YAML完成您的數(shù)據(jù)攝入作業(yè)邏輯開(kāi)發(fā),您可以參考數(shù)據(jù)攝入YAML最佳實(shí)踐了解更多案例。
前提條件
執(zhí)行CTAS語(yǔ)法前,確保工作空間中已注冊(cè)目標(biāo)端的Catalog。詳情請(qǐng)參見(jiàn)數(shù)據(jù)管理。
使用限制
僅Flink計(jì)算引擎vvr-4.0.11-flink-1.13及以上版本支持CTAS語(yǔ)法。
重要CTAS語(yǔ)法不支持進(jìn)行調(diào)試。
僅Flink計(jì)算引擎vvr-4.0.12-flink-1.13及以上版本支持同步自定義計(jì)算列。
VVR 4.0.16以下版本,不支持在一個(gè)作業(yè)中使用多個(gè)CTAS語(yǔ)句將同一張數(shù)據(jù)源表同步到不同的結(jié)果表。
不支持在同一作業(yè)中混合使用CTAS和INSERT INTO語(yǔ)句。
CTAS支持的上下游存儲(chǔ)列表如下,您可以從下表的源表和結(jié)果表中各選一個(gè)進(jìn)行組合。
連接器名稱
源表
結(jié)果表
備注
√
×
分庫(kù)分表合并同步時(shí),默認(rèn)會(huì)同步上游存儲(chǔ)的數(shù)據(jù)庫(kù)名稱和表名稱。
單表同步時(shí),不會(huì)同步數(shù)據(jù)庫(kù)名稱和表名稱。如果您需要同步數(shù)據(jù)庫(kù)名稱和表名稱,請(qǐng)使用SQL命令創(chuàng)建Catalog,并添加catalog.table.metadata-columns參數(shù)。詳情請(qǐng)參見(jiàn)SQL命令。
不支持同步MySQL視圖。
√
×
無(wú)。
√
×
暫不支持分庫(kù)分表合并同步。
暫不支持同步MongoDB元信息。
暫不支持CTAS新增表功能。
支持通過(guò)CTAS語(yǔ)句將MongoDB中的數(shù)據(jù)及表結(jié)構(gòu)變更同步至目標(biāo)表,示例可參考示例九。
×
√
無(wú)。
×
√
僅支持EMR的StarRocks。
×
√
如果下游是Hologres,CTAS在默認(rèn)情況下會(huì)為每個(gè)表創(chuàng)建相應(yīng)數(shù)量(connectionSize參數(shù)值)個(gè)連接。此時(shí)您就可以使用connectionPoolName參數(shù),讓配置相同名稱連接池的表可以共享連接池。
說(shuō)明在將數(shù)據(jù)同步到Hologres時(shí),如果您的上游源表包含了Fixed Plan不支持類型的數(shù)據(jù),建議通過(guò)INSERT INTO語(yǔ)句的方式,在Flink內(nèi)部做類型轉(zhuǎn)換后將數(shù)據(jù)同步到Hologres。不要用CTAS方式創(chuàng)建Sink結(jié)果表進(jìn)行數(shù)據(jù)同步,因?yàn)檫@種方式會(huì)無(wú)法走Fixed Plan,寫入性能較差。
×
√
僅Flink計(jì)算引擎vvr-6.0.7-flink-1.15及以上版本支持Paimon結(jié)果表。
僅實(shí)時(shí)計(jì)算引擎VVR 8.0.10及以上版本支持同步到Paimon DLF 2.0結(jié)果表。
功能特性
功能 | 詳情 |
單表同步 | 支持實(shí)時(shí)同步源表的全量和增量數(shù)據(jù)到結(jié)果表中。 |
表結(jié)構(gòu)變更同步 | 在實(shí)時(shí)同步數(shù)據(jù)的同時(shí),還支持將源表的表結(jié)構(gòu)變更(增加列信息等)實(shí)時(shí)同步到結(jié)果表中。 |
分庫(kù)分表合并同步 | 支持使用正則表達(dá)式定義庫(kù)名和表名,匹配數(shù)據(jù)源的多張分庫(kù)分表,合并后同步到下游的一張表中。 說(shuō)明 正則匹配時(shí),不支持使用^進(jìn)行表開(kāi)頭的匹配。 |
自定義計(jì)算列同步 | 支持在源表上新增計(jì)算列,以支持您對(duì)源表的某些列進(jìn)行轉(zhuǎn)換計(jì)算。計(jì)算列可以使用系統(tǒng)函數(shù)或自定義函數(shù),允許指定新增列的位置,并將其作為結(jié)果表的物理列,實(shí)時(shí)地將計(jì)算列的結(jié)果同步到結(jié)果表中。 |
多CTAS語(yǔ)句 | 支持使用STATEMENT SET語(yǔ)法將多個(gè)CTAS語(yǔ)句作為一個(gè)作業(yè)一起提交,并支持對(duì)Source節(jié)點(diǎn)的合并復(fù)用,降低對(duì)數(shù)據(jù)源的壓力。 多CTAS語(yǔ)句作業(yè),支持新增CTAS語(yǔ)句加入新增表到同步作業(yè)中,詳見(jiàn)示例六。 |
啟動(dòng)流程
當(dāng)執(zhí)行CTAS語(yǔ)句時(shí),將會(huì)按照以下流程執(zhí)行:
檢查目標(biāo)存儲(chǔ)中是否存在該結(jié)果表。
如果不存在,則通過(guò)目標(biāo)端Catalog去目標(biāo)存儲(chǔ)中創(chuàng)建相應(yīng)的結(jié)果表,該結(jié)果表具有和數(shù)據(jù)源相同的Schema。
如果存在,則跳過(guò)建表。
如果已存在的結(jié)果表與源表Schema不一致,則會(huì)報(bào)錯(cuò)提示。
提交和啟動(dòng)相應(yīng)的數(shù)據(jù)同步作業(yè)。
將數(shù)據(jù)源的數(shù)據(jù)以及Schema的變更同步到結(jié)果表中。
例如,從MySQL到Hologres同步CTAS數(shù)據(jù)流程如下圖所示。
表結(jié)構(gòu)變更同步策略
通過(guò)CTAS語(yǔ)句,在實(shí)時(shí)同步數(shù)據(jù)的同時(shí),還能將源表Schema的變更同步到結(jié)果表中。Schema變更包括初始的表創(chuàng)建以及未來(lái)的表變更。
當(dāng)前支持的Schema變更策略詳情如下:
添加可空列:會(huì)自動(dòng)在結(jié)果表Schema末尾添加對(duì)應(yīng)的列,并自動(dòng)同步新增列的數(shù)據(jù)。
刪除可空列:不會(huì)直接在結(jié)果表中刪除該列,而是將該列的數(shù)據(jù)自動(dòng)填充為NULL值。
添加非空列:會(huì)自動(dòng)在結(jié)果表Schema末尾添加對(duì)應(yīng)的列,并自動(dòng)同步新增列的數(shù)據(jù),新增的列會(huì)默認(rèn)設(shè)置為可空列,對(duì)于添加列發(fā)生之前的數(shù)據(jù)自動(dòng)設(shè)置為NULL值。
重命名列:被看作為添加列和刪除列。直接在結(jié)果表中末尾添加重命名后的列,并將重命名前的列數(shù)據(jù)自動(dòng)填充為NULL值。例如,如果col_a重命名為col_b,則會(huì)在結(jié)果表末尾添加col_b,并自動(dòng)將col_a的數(shù)據(jù)填充為NULL值。
列類型變更:
對(duì)于支持列類型變更的下游系統(tǒng),在下游Sink支持處理列類型變更后,CTAS 支持普通列的類型變更,例如,從INT類型變更到BIGINT類型。此類變更依賴于下游Sink支持的列類型變更規(guī)則,不同的結(jié)果表支持的列類型變更規(guī)則也不相同,請(qǐng)參考結(jié)果表文檔獲取其支持的列類型變更規(guī)則,目前只有Paimon支持處理列類型變更。
對(duì)于不支持列類型變更的下游系統(tǒng),比如Hologres,CTAS無(wú)法支持列類型變更。此類場(chǎng)景可以使用寬容模式同步,即在CTAS作業(yè)啟動(dòng)時(shí)在下游系統(tǒng)建立類型更加寬泛的表,在列類型變更發(fā)生時(shí)判斷該類變更下游Sink是否可以接受來(lái)實(shí)現(xiàn)寬容的列類型變更支持,詳情請(qǐng)參見(jiàn)示例八:CTAS語(yǔ)句使用字段類型寬容模式同步數(shù)據(jù)到Hologres表。目前只有Hologres支持寬容模式處理列類型變更。寬容模式應(yīng)該在首次啟動(dòng)CTAS作業(yè)時(shí)開(kāi)啟,如果在首次啟動(dòng)時(shí)未開(kāi)啟寬容模式,需要?jiǎng)h除下游表并且將作業(yè)無(wú)狀態(tài)重啟才能生效。
暫不支持同步以下Schema的變更:
主鍵或索引等約束的變更。
非空列的刪除。
從NOT NULL轉(zhuǎn)為NULLABLE變更。
如果遇到以上不支持的Schema變更,需要您手動(dòng)刪除下游結(jié)果表,重新啟動(dòng)CTAS作業(yè),即重新創(chuàng)建結(jié)果表并重新同步歷史數(shù)據(jù)。
CTAS不會(huì)去識(shí)別具體的DDL類型,而是對(duì)比前后兩條數(shù)據(jù)的Schema差異。因此,如果您先刪除了某列后,又加回了該列,且這兩個(gè)DDL之間無(wú)數(shù)據(jù)變化,那么CTAS會(huì)認(rèn)為沒(méi)有發(fā)生結(jié)構(gòu)變更。同理,如果您添加了一列,直到該表有數(shù)據(jù)變化,CTAS才會(huì)感知到結(jié)構(gòu)變更,才會(huì)同步結(jié)構(gòu)變更到結(jié)果表。
基本語(yǔ)法
CREATE TABLE IF NOT EXISTS <sink_table>
[COMMENT table_comment]
WITH (key1=val1, key2=val2, ...)
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];
<sink_table>:
[catalog_name.][db_name.]table_name
<source_table>:
[catalog_name.][db_name.]table_name
<column_component>:
computed_column_definition [FIRST | AFTER column_name]
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
CTAS語(yǔ)法復(fù)用了CREATE TABLE語(yǔ)法的基本結(jié)構(gòu),其中的參數(shù)解釋如下表所示。
參數(shù) | 說(shuō)明 |
sink_table | 數(shù)據(jù)同步的結(jié)果表名,可以指定具體的Catalog名稱和數(shù)據(jù)庫(kù)名稱。 |
COMMENT | 結(jié)果表的描述,默認(rèn)使用source_table的描述。 |
WITH | 結(jié)果表參數(shù),可填入結(jié)果表支持的WITH參數(shù)。支持的WITH參數(shù)詳情請(qǐng)參見(jiàn)Upsert Kafka WITH參數(shù)、Hologres WITH參數(shù)、StarRocks WITH參數(shù)或Paimon WITH參數(shù)。 說(shuō)明 key和value都需要為字符串類型,例如 |
source_table | 數(shù)據(jù)同步的源表表名,可指定具體的Catalog名稱和Database名稱。 |
OPTIONS | 源表的參數(shù),可填入源表支持的WITH參數(shù)。支持的WITH參數(shù)詳情請(qǐng)參見(jiàn)MySQL WITH參數(shù)和Kafka WITH參數(shù)。 說(shuō)明 key和value都需要為字符串類型,例如'server-id' = '65500'。 |
ADD COLUMN | 同步到結(jié)果表時(shí),相對(duì)于源表新增的列,僅支持計(jì)算列。 |
column_component | 新增列的描述。 |
computed_column_definition | 計(jì)算列表達(dá)式的描述。 |
FIRST | 新增列作為源表的第一個(gè)字段。如果不添加該參數(shù),則新增列會(huì)默認(rèn)作為源表的最后一個(gè)字段。 |
AFTER | 新增列放在源表指定字段后面。 |
PARTITION BY | 系統(tǒng)支持根據(jù)某列進(jìn)行分區(qū),創(chuàng)建分區(qū)表。 |
因?yàn)镮F NOT EXISTS關(guān)鍵字為必填,所以如果結(jié)果表在目標(biāo)存儲(chǔ)中并不存在,則會(huì)先創(chuàng)建該結(jié)果表,否則跳過(guò)創(chuàng)建步驟。創(chuàng)建的結(jié)果表Schema會(huì)使用源表的Schema,包括主鍵以及物理字段的字段名和字段類型,不包括計(jì)算列、meta字段、Watermark。其中源表到結(jié)果表的字段類型會(huì)經(jīng)過(guò)類型映射,詳情請(qǐng)參見(jiàn)對(duì)應(yīng)連接器文檔中的類型映射。
代碼示例
示例一:?jiǎn)伪硗?/b>
通常,CTAS都會(huì)配合數(shù)據(jù)源的Catalog和目標(biāo)的Catalog一起使用,例如MySQL Catalog和Hologres Catalog結(jié)合CTAS語(yǔ)法,來(lái)完成MySQL到Hologres的全量和增量數(shù)據(jù)同步。使用MySQL Catalog可以自動(dòng)解析源表的Schema及相應(yīng)的參數(shù),而不用手動(dòng)編寫DDL 。
假設(shè)已在工作空間中注冊(cè)了名為holo的Hologres Catalog和名為mysql的MySQL Catalog。將MySQL中的web_sales表同步到Hologres中,代碼示例如下。
USE CATALOG holo;
CREATE TABLE IF NOT EXISTS web_sales
WITH ('jdbcWriteBatchSize' = '1024') -- 可選,指定結(jié)果表的參數(shù)。
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */; -- 指定mysql-cdc源表的額外參數(shù)。
示例二:分庫(kù)分表合并同步
對(duì)于分庫(kù)分表合并同步的場(chǎng)景,您可以結(jié)合MySQL Catalog,利用正則表達(dá)式的表名和庫(kù)名來(lái)匹配所要同步的多張表。使用CTAS可以將這多張分庫(kù)分表合并到一張Hologres表中,庫(kù)名和表名會(huì)作為額外的兩個(gè)字段寫入到該表中,為保證主鍵唯一性,庫(kù)名、表名和原主鍵一起作為該Hologres表的新聯(lián)合主鍵。
USE CATALOG holo;
CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;
其合并的效果如下圖所示。如果在user02表中新增一列age,并插入一條數(shù)據(jù)。此時(shí)雖然多張分表的Schema并不一致,但是user02表后續(xù)的數(shù)據(jù)和Schema變更都能實(shí)時(shí)地自動(dòng)同步到下游表中。
ALTER TABLE `user02` ADD COLUMN `age` INT;
INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);
示例三:自定義計(jì)算列同步
本示例以u(píng)ser分庫(kù)分表合并同步作為基礎(chǔ),介紹在分庫(kù)分表合并的過(guò)程中,如何進(jìn)行一些轉(zhuǎn)換計(jì)算。
USE CATALOG holo;
CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */
ADD COLUMN (
`c_id` AS `id` + 10 AFTER `id`,
`calss` AS 3 AFTER `id`
);
新增計(jì)算列同步的效果如下圖所示。
示例四:多個(gè)CTAS語(yǔ)句作為一個(gè)作業(yè)提交
實(shí)時(shí)計(jì)算Flink版支持使用STATEMENT SET語(yǔ)法將多個(gè)CTAS語(yǔ)句作為一個(gè)作業(yè)一起提交,并且可以對(duì)Source進(jìn)行優(yōu)化,復(fù)用一個(gè)Source節(jié)點(diǎn)讀取多業(yè)務(wù)表的數(shù)據(jù)。這對(duì)于MySQL CDC數(shù)據(jù)源場(chǎng)景尤為適用,因?yàn)檫@可以減少server-id的使用,減少對(duì)數(shù)據(jù)庫(kù)的連接數(shù)和讀取壓力。
對(duì)于Source復(fù)用優(yōu)化,需要這些Source表的options保持完全一致,才能合并成功進(jìn)行復(fù)用。
例如示例一同步了web_sales表,示例二同步了user分庫(kù)分表,您可以使用STATEMENT SET語(yǔ)法將它們作為一個(gè)作業(yè)提交。
USE CATALOG holo;
BEGIN STATEMENT SET;
-- 同步web_sales表。
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;
-- 同步user分庫(kù)分表。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;
END;
示例五:多個(gè)CTAS語(yǔ)句將同一張數(shù)據(jù)源表同步到不同的結(jié)果表
4.0.16以上版本中,在不添加計(jì)算列時(shí),可以將同一張數(shù)據(jù)源表同步到不同的結(jié)果表。
USE CATALOG `holo`;
BEGIN STATEMENT SET;
-- 通過(guò)CTAS語(yǔ)句同步MySQL的user表到Holo數(shù)倉(cāng)database1的user表中
CREATE TABLE IF NOT EXISTS `database1`.`user`
AS TABLE `mysql`.`tpcds`.`user`
/*+ OPTIONS('server-id'='8001-8004') */;
-- 通過(guò)CTAS語(yǔ)句同步MySQL的user表到Holo數(shù)倉(cāng)database2的user表中
CREATE TABLE IF NOT EXISTS `database2`.`user`
AS TABLE `mysql`.`tpcds`.`user`
/*+ OPTIONS('server-id'='8001-8004') */;
END;
如果結(jié)果表需要添加計(jì)算列,則應(yīng)按照如下方式進(jìn)行同步:
-- 基于源表user創(chuàng)建臨時(shí)表user_with_changed_id,支持定義計(jì)算列,例如這里的computed_id是基于源表的id計(jì)算獲得。
CREATE TEMPORARY TABLE `user_with_changed_id` (
`computed_id` AS `id` + 1000
) LIKE `mysql`.`tpcds`.`user`;
-- 基于源表user創(chuàng)建臨時(shí)表user_with_changed_age,支持定義計(jì)算列,例如這里的computed_age是基于源表的age計(jì)算獲得。
CREATE TEMPORARY TABLE `user_with_changed_age` (
`computed_age` AS `age` + 1
) LIKE `mysql`.`tpcds`.`user`;
BEGIN STATEMENT SET;
-- 通過(guò)CTAS語(yǔ)句同步MySQL的user表到Holo數(shù)倉(cāng)的user_with_changed_id表中,表中會(huì)包含通過(guò)計(jì)算獲得的id,即computed_id列。
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id`
AS TABLE `user_with_changed_id`
/*+ OPTIONS('server-id'='8001-8004') */;
-- 通過(guò)CTAS語(yǔ)句同步MySQL的user表到Holo數(shù)倉(cāng)的user_with_changed_age表中,表中會(huì)包含通過(guò)計(jì)算獲得的age,即computed_age列。
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age`
AS TABLE `user_with_changed_age`
/*+ OPTIONS('server-id'='8001-8004') */;
END;
示例六:多個(gè)CTAS語(yǔ)句時(shí),新增CTAS語(yǔ)句加入數(shù)據(jù)同步作業(yè)
使用VVR 8.0.1及以上版本時(shí),多個(gè)CTAS語(yǔ)句的作業(yè)啟動(dòng)后,如果新增CTAS語(yǔ)句,支持從作業(yè)快照重啟,從而捕獲到新的表,對(duì)新增表進(jìn)行數(shù)據(jù)同步。
SQL作業(yè)開(kāi)發(fā)時(shí)需要增加以下語(yǔ)句,開(kāi)啟新增表讀取功能。
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
當(dāng)需要新增CTAS語(yǔ)句時(shí),在作業(yè)運(yùn)維頁(yè)面停止作業(yè)并勾選停止前創(chuàng)建一次快照。
在SQL開(kāi)發(fā)中,增加CTAS語(yǔ)句,并重新部署這個(gè)SQL作業(yè)。
在作業(yè)運(yùn)維頁(yè)面單擊目標(biāo)作業(yè)名稱,狀態(tài)集管理頁(yè)簽,單擊歷史。
在作業(yè)快照列表中,找到停止作業(yè)時(shí)創(chuàng)建的快照。
單擊目標(biāo)快照操作列,選擇
。在作業(yè)啟動(dòng)配置對(duì)話框,配置作業(yè)啟動(dòng)信息,詳情請(qǐng)參見(jiàn)作業(yè)啟動(dòng)。
新增CTAS語(yǔ)句使用時(shí),存在以下限制:
使用CDC源表同步時(shí),僅支持源表啟動(dòng)模式為initial的作業(yè)使用新增表功能。
新增的CTAS語(yǔ)句的對(duì)應(yīng)Source必須能夠復(fù)用優(yōu)化,也就是新增的源表配置需要和原有的源表配置保持完全一致。
新增CTAS語(yǔ)句前后,作業(yè)不能有其他參數(shù)的變更,比如更改啟動(dòng)模式。
示例七:通過(guò)CTAS語(yǔ)句將MySQL數(shù)據(jù)源表同步到Hologres分區(qū)表
Hologres分區(qū)表建表時(shí),如果Hologres表存在主鍵,則要求分區(qū)字段必須是主鍵中的字段。假設(shè)有一張MySQL表需要同步到Hologres,其建表語(yǔ)句如下。
CREATE TABLE orders (
order_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
city VARCHAR(100) NOT NULL
order_date DATE,
purchaser INTEGER,
PRIMARY KEY(order_id, product_id)
);
當(dāng)使用CTAS同步數(shù)據(jù)源表到Hologres的分區(qū)表中時(shí):
如果上游表的主鍵包含分區(qū)字段,例如Hologres表的分區(qū)字段是product_id,可以通過(guò)如下SQL實(shí)現(xiàn)。
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders` PARTITIONED BY (product_id) AS TABLE `mysql`.`tpcds`.`orders`;
如果上游表的主鍵不包含分區(qū)字段,例如Hologres表的分區(qū)字段是city,創(chuàng)建Hologres表時(shí)會(huì)使用MySQL表中的主鍵,由于上游表的主鍵不包含分區(qū)字段,作業(yè)會(huì)出錯(cuò)。此時(shí),您可以在CTAS中通過(guò)聲明主鍵的方式,重新指定目標(biāo)Hologres分區(qū)表的主鍵,使得任務(wù)正常運(yùn)行,示例如下。
-- 可以通過(guò)如下SQL指定Hologres分區(qū)表的主鍵為order_id,product_id和city。 CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`( CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED ) PARTITIONED BY (city) AS TABLE `mysql`.`tpcds`.`orders`;
示例八:CTAS語(yǔ)句使用字段類型寬容模式同步數(shù)據(jù)到Hologres表
在CTAS場(chǎng)景中,可能需要調(diào)整已有字段數(shù)據(jù)類型的精度(例如,從VARCHAR(10)到VARCHAR(20))。
Flink計(jì)算引擎VVR 6.0.5-Flink 1.15以下版本,上游修改數(shù)據(jù)類型可能導(dǎo)致CTAS任務(wù)失敗,只能重建結(jié)果表。
Flink計(jì)算引擎VVR 6.0.5-Flink 1.15及以上版本,在同步數(shù)據(jù)到Hologres表時(shí),支持使用類型寬容模式。寬容模式應(yīng)該在首次啟動(dòng)CTAS作業(yè)時(shí)開(kāi)啟,如果在首次啟動(dòng)時(shí)未開(kāi)啟寬容模式,需要?jiǎng)h除下游表并且將作業(yè)無(wú)狀態(tài)重啟才能生效。
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders` WITH ( 'connector' = 'hologres', 'enableTypeNormalization' = 'true' -- 使用字段類型寬容模式。 ) AS TABLE `mysql`.`tpcds`.`orders`;
在上游發(fā)生數(shù)據(jù)類型修改事件時(shí),只要所修改類型與原類型的歸一化類型相同,都視作修改成功。目前類型歸一化規(guī)則如下:
TINYINT、SMALLINT、INT和BIGINT歸一化為BIGINT。
CHAR、VARCHAR和STRING歸一化為STRING。
FLOAT和DOUBLE歸一化為DOUBLE。
其他數(shù)據(jù)類型按照原本的類型映射規(guī)則創(chuàng)建,詳情參見(jiàn)類型映射。
例如:
SMALLINT修改為INT,兩者的歸一化類型都是BIGINT,視為修改成功,CTAS作業(yè)正常運(yùn)行。
從FLOAT改為BIGINT,兩者的歸一化類型分別為DOUBLE和BIGINT,屬于不兼容的情況,會(huì)拋出異常。
示例九:通過(guò)CTAS語(yǔ)句將MongoDB數(shù)據(jù)源表同步到Hologres表
實(shí)時(shí)計(jì)算Flink VVR 8.0.6及以上版本,CTAS語(yǔ)句支持同步MongoDB數(shù)據(jù)源表,能夠在實(shí)時(shí)同步MongoDB數(shù)據(jù)的同時(shí)將上游表結(jié)構(gòu)變更同步到下游表??梢耘浜螹ongoDB Catalog使用,無(wú)需手動(dòng)定義Schema,MongoDB Catalog詳情可參考管理MongoDB Catalog。
這里以使用CTAS語(yǔ)句同步MongoDB數(shù)據(jù)源表數(shù)據(jù)到Hologres表為例:
BEGIN STATEMENT SET;
CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;
CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;
END;
使用CTAS或CDAS語(yǔ)句將MongoDB中的數(shù)據(jù)同步至目標(biāo)表時(shí),必須滿足以下要求:
實(shí)時(shí)計(jì)算Flink VVR版本必須為8.0.6及以上,MongoDB數(shù)據(jù)庫(kù)版本必須為6.0及以上。
在SQL Hints中已將scan.incremental.snapshot.enabled和scan.full-changelog參數(shù)都設(shè)置為true。
MongoDB數(shù)據(jù)庫(kù)已開(kāi)啟前像后像(Pre- and Post-images)記錄功能,開(kāi)啟方法參見(jiàn)Document Preimages。
當(dāng)使用同一個(gè)作業(yè)同步多個(gè)MongoDB集合時(shí),需要滿足以下條件:
每張表關(guān)于MongoDB的配置必須完全相同,包括hosts、scheme、username、password、connectionOptions。
每張表的scan.startup.mode配置必須完全相同。
示例十:MySQL整庫(kù)同步Kafka
在實(shí)際使用中,同一張MySQL表可能被多個(gè)作業(yè)依賴,當(dāng)多個(gè)任務(wù)使用同一張MySQL表做處理時(shí),MySQL數(shù)據(jù)庫(kù)會(huì)啟動(dòng)多個(gè)連接,對(duì)MySQL服務(wù)器和網(wǎng)絡(luò)造成很大的壓力。為了緩解對(duì)上游MySQL數(shù)據(jù)庫(kù)的壓力,實(shí)時(shí)計(jì)算Flink版提供MySQL整庫(kù)同步到Kafka的能力,通過(guò)引入Kafka作為中間層,并使用CDAS整庫(kù)同步或CTAS整表同步到Kafka來(lái)解決。具體操作請(qǐng)參見(jiàn)MySQL整庫(kù)同步Kafka。
相關(guān)文檔
如果您需要進(jìn)行整庫(kù)同步、分庫(kù)合并或源庫(kù)新增表同步,詳情請(qǐng)參見(jiàn)CREATE DATABASE AS(CDAS)語(yǔ)句。
使用CTAS和CDAS實(shí)現(xiàn)數(shù)據(jù)同步的教程詳情,請(qǐng)參見(jiàn)數(shù)據(jù)庫(kù)實(shí)時(shí)入倉(cāng)快速入門、基于Flink+Hologres搭建實(shí)時(shí)數(shù)倉(cāng)或基于Flink搭建流式湖倉(cāng)OpenLake方案。