Dataphin的Flink_SQL任務支持多種開發方式,包括原生DDL+DML開發、使用Dataphin元表進行開發、使用Dataphin數據源表進行開發、以及使用計算源物理表進行開發,且不同開發方式所創建的表支持任意混用,包括鏡像表。不同開發方式的使用方法、使用場景以及優缺點不同。本文將為您介紹各開發方式幫助您更好的完成Flink_SQL任務開發。
Dataphin計算源物理表開發方式
Dataphin計算源物理表開發方式是指您在開發Flink_SQL任務時,您可以直接通過寫項目名.表名稱
的方式直接訪問計算源中的物理表。并且支持跨項目訪問,訪問其他項目綁定的物理表。
目前支持訪問Hologres計算源的物理表數據。
訪問的物理表所在的項目已綁定支持的計算源。
使用示例
若您需要將example
項目計算源中的test
物理表數據插入到test_demo
物理表中。您可以參考以下示例代碼:
insert into test_demo select id,name from example.test;
Dataphin數據源表開發方式
Dataphin數據源表開發方式是指您在開發Flink_SQL任務時,可以直接訪問在Dataphin中所創建的數據源中的表進行任務開發。如果您希望使用此種方式,需要先在數據源上配置數據源編碼。具體操作,請參見數據源管理概述。
配置數據源編碼后,可在Flink SQL任務中通過數據源編碼.table
或數據源編碼.schema.table
的格式引用數據源中的表;如果需要根據所處環境自動訪問對應環境的數據源,請通過${數據源編碼}.table或${數據源編碼}.schema.table
的格式訪問;下面以MySQL和Hologres數據源為例說明訪問方式:
目前僅支持MySQL、Hologres、MaxCompute、StarRocks數據源。
MySQL、StarRocks數據源表:支持通過
數據源編碼.表名稱
的形式訪問數據源中的物理表。Hologres數據源表:支持通過
數據源編碼.schema名稱.表名稱
的形式訪問Hologres數據源中的物理表。
使用示例
若您需要將MySQL數據源(數據源編碼為ds_demo_mysql
)的demo_mysql
物理表的數據插入至test_demo
物理表中。您可以參考以下代碼完成開發。
insert into test_demo select id,name from ds_demo_mysql.demo_mysql;
若您需要將Hologres數據源(數據源編碼為ds_demo_hologres
、schema的名稱為hologres
)的demo_hologres
物理表的數據插入至test_demo
物理表中。您可以參考以下代碼完成開發。
insert into test_demo select id,name from ds_demo_hologres.hologres.demo_hologres;
Dataphin元表開發方式
在Dataphin中,元表是在原生DDL+DML開發上更高一層的邏輯概念,元表是通過數據管理的跨存儲類型表。開發過程中所用到的輸入表、輸出表、維表可以通過創建元表方式來進行創建和管理,以支持您通過引用元表的方式來創建其他數據表。這種方式可以使您一次建表,可多次引用。您無需重復編寫DDL語句,無需進行繁雜的輸入、輸出、維表映射,從而簡化開發,提升效率和體驗。同時通過元表可以有效避免直接編寫原生Flink DDL語句導致的敏感信息透出等問題。
使用示例
若您需要創建demo01
和demo02
數據表,并將demo01
的數據插入至demo02
。您可以參考以下步驟完成開發。
通過Dataphin元表功能,創建
demo01
和demo02
數據表,具體操作,請參見新建元表。在Flink_SQL任務中編寫插入語句,示例代碼如下:
INSERT into demo02 select * from demo01;
原生DDL+DML開發方式
原生DDL開發是指在Flink_SQL任務使用Flink SQL語句直接創建和管理數據表的開發方式。如使用CREATE TABLE/CREATE TEMPORARY TABLE
創建表。這種開發方式通過代碼定義表結構并通過SQL語句來創建和管理表。
原生DDL+DML開發方式因為需要在代碼中編寫明文的用戶名和密碼,導致數據不安全,可能造成數據泄露,請謹慎使用。
使用示例
若您需要在Flink_SQL任務中使用原生DDL+DML開發方式,您可以參考以下示例代碼進行創建。以下示例代碼實現了模擬數據的輸入輸出(從t1表中讀數據寫到t2表中)。
使用原生DDL+DML開發語句創建表,您需關閉Dataphin編碼規范中的禁止使用Flink原生DDL語句設置。具體操作,請參見編碼規范。
create temporary table t1 (
id bigint,
name varchar
) with (
'connector' = 'datagen',
'rows-per-second' = '1'
);
create temporary table t2 (
id bigint,
name varchar
) with (
'connector' = 'print'
);
-- begin statement set;
insert into t2 select id,replace(name, '\"', '"') as name from t1;
-- set;