Dataphin在Flink SQL基礎上遵循ANSI-SQL標準進行了部分語法改進,以幫助您提升開發效率。本文將為說明Dataphin Flink SQL支持的詞法新特性及使用方法。
支持跨項目空間引用
DDL與DML表名語法增強
通常一個標準的Flink SQL任務需要完整DDL與DML語句。示例如下:
--DDL
CREATE TABLE input_table (
field1 VARCHAR,
field2 BIGINT
) WITH (
type = 'datagen'
);
CREATE TABLE output_table (
field1 VARCHAR,
field2 BIGINT
) WITH (
type = 'print'
);
--DML
INSERT INTO output_table
SELECT field1, field2 FROM input_table;
以上語句的DDL在多個任務中需要重復定義。為避免重復聲明,提高開發效率。Dataphin對FlinkSQL的表名做了語法擴展增強。在Dataphin中定義元表(schema)后, 可以在任何任務中通過以下語法直接引用, 同時在多個任務中無需再次聲明, 并且支持跨項目空間引用。
<compoundTableName> ::= [projectName <DOT>] tableName
示例說明
以上一段SQL為例,定義好元表后,研發任務中只需要輸入以下語句。
INSERT INTO output_table -- 項目前綴缺省時默認是當前所在項目。
SELECT field1, field2 FROM ${project_name}.input_table;
內置函數與UDX語法增強
通常一個標準的Flink SQL任務使用UDX時,需要進行聲明。示例如下:
CREATE FUNCTION MY_UDX AS 'package.class';
...
CREATE VIEW my_view AS
SELECT MY_UDX(args)
FROM ...
以上自定義UDX語句在多個任務中需要重復定義。為避免重復聲明,提高開發效率。Dataphin引入資源概念,您可以在Dataphin注冊函數后,在任何任務中通過以下語法直接引用,同時在多個任務中無需再次聲明,并且支持跨項目空間引用。
<compoundFunctionName> ::= [projectName <COLON>] functionName
示例說明
以上一段SQL為例,在Dataphin注冊函數后,研發任務中只需要輸入以下語句。
CREATE VIEW my_view AS SELECT ${project_name}:MY_UDX(args) -- 項目前綴缺省時默認是當前所在項目FROM。
FROM ...
特殊說明
Dataphin支持overload或override內置函數,且優先使用overload或override的內置函數。示例如下:
示例一: 有項目前綴。
SELECT ${project_name}:SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin將默認使用已overload了的內置函數`SUBSTRING`, 若未在`${project_name}`下找到該自定義函數, 在預編譯時將無法通過。
示例二: 無項目前綴。
SELECT SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin將優先在當前項目內尋找自定義的`SUBSTRING`函數, 若找不到, 則認為是內置函數。
使用UpdateStatement及TableHints設置任務級別DDL參數
在Dataphin中支持對已經創建的元表進行DDL的WITH參數配置,同時Dataphin支持配置任務粒度的參數,包括Watermark、計算列、主鍵、Header等, 且增加了SetOption
語法支持。語法規范如下:
SET {
[ projectName <DOT>] tableName <DOT>
{
propertyName
| WATERMARK
| <computedColumn>
| <primaryKey>
| <procTime>
| {fieldName <DOT> <isHeader>}
}
} <EQ> {
@variable = identifier
| @variable = literal
| @variable = expression
}
設置WITH參數時,建議將表DDL的公用WITH參數在元表中進行定義, 任務粒度的個性化參數可以通過SET
語句或TableHints
在代碼中定義, 多個SET語句間以;
為分隔符。
使用Flink SQL保留字時需要加(``)符號。示例如下:
Watermark:SET語句使用
SET [project.]table.`watermark` = ...
。Partition:SET語句使用
SET [project.]table.`partition` = ...
。
DDL參數優先級
任務級別的參數覆蓋
任務級別的參數設置會覆蓋元表中配置的參數設置(僅針對可覆蓋的參數)。例如,元表中設置表T
的primary key
為field1
, 任務中使用表T
后, 如果存在任何primary key
設置,都以任務中的設置為準 (即任務中設置表T的primary key
為field2
, 則field1
在該任務中不再是表T
的主鍵)。
基于Dataphin的安全和管理策略,元表上的部分參數不允許在SQL任務中通過SET覆蓋。例如MaxCompute元表上定義的表名,不能在SQL任務中通過SET語句覆蓋。不同類型的元表有不同的可覆蓋參數,具體以實際操作為準。
任務內SET參數
任務內SET參數具有全局優先級, TableHints的作用范圍是Context-specific
, SET參數會補充到TableHints中不存在的參數項,相同參數項會被Shade。
例如Blink在2.2.5版本后, 增加了對維表設置INDEX
和UNIQUE INDEX
的支持, 所以目前聲明JOIN KEY
的關鍵字時存在PIRMARY KEY
、INDEX
、UNIQUE INDEX
三種,故Dataphin會聯合做判斷。示例如下:
SET my_dim_table.primarykey = 'field1';
CREATE VIEW tmp1 AS
SELECT my_source_table1.*
FROM my_source_table1 JOIN my_dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;
CREATE VIEW tmp2 AS
SELECT my_source_table2.*
FROM my_source_table2 JOIN my_dim_table WITH (primarykey = 'field3') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;
以上代碼Dataphin將會理解為在生成tmp1
時設置my_dim_table
的primary key
是field1
(參數填充),index
是field2
;在生成tmp2
時設置my_dim_table
的primary key
是field3
(參數Shade)。
更多示例說明:
示例一:Flink SQL Source表參數。
以創建kafka源表為例。
-- vvr語法 CREATE TEMPORARY TABLE kafkaTable ( `user_id` BIGINT, `item_id` BIGINT, `category_id` BIGINT, `behavior` STRING, `topic` STRING METADATA VIRTUAL, `partition` BIGINT METADATA VIRTUAL ) WITH ( 'connector' = 'kafka', 'topic' = 'my_excellent_topic', 'properties.bootstrap.servers' = 'mykafka:9092', 'properties.group.id' = 'my_excellent_group' 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset' )
個性化參數轉換成Dataphin Flink SQL的SET表達式。
-- 如果表屬性的名字是sql關鍵字,或者出現了其他字符,可以用反引號轉義。 SET kafkaTable.`properties.group.id` = 'new_group_id'; SET kafkaTable.`scan.startup.mode` = 'latest-offset';
示例二:Flink SQL Sink表參數。
以創建aliHBase結果表為例。
create table hbase_output( rk varchar, rk1 varchar, rk2 varchar, f bigint, PRIMARY KEY(rk) ) with ( type='alihbase', diamondKey='xxxxxxx', diamondGroup='yyyyyyy', columnFamily='cf', tableName='blink_hbase_test', bufferSize=500; );
若需要在任務粒度中Overwrite掉元表中的配置, 可做如下設置。
SET hbase_output.bufferSize = 1000;
示例三:Flink SQL Dim表參數。
以創建MaxCompute維表為例。
CREATE TABLE white_list ( id varchar, name varchar, age int, PRIMARY KEY (id) --PERIOD FOR SYSTEM_TIME -- 備注: 從blink3.x開始維表DDL不需要PERIOD FOR SYSTEM_TIME標識。 ) with ( type = 'odps', endPoint = 'your_end_point_name', project = 'your_project_name', tableName = 'your_table_name', accessId = 'your_access_id', accessKey = 'your_access_key', `partition` = 'ds=20180905', cache = 'ALL' );
轉換成Dataphin Flink SQL的SET表達式。
SET white_list.cache='ALL'; SET white_list.cacheTTLMs=86400000; --若任務粒度想特別設置該表的cache更新時間, 可在此增加SET。
示例四:WATERMARK、計算列、HEADER、PROCTIME、PRIMARY KEY/(UNIQUE)INDEX設置。
目前元表中只支持主鍵(Primary Key)和Header設置, 若需要使用WATERMARK、PROCTIME、計算列等,需要在任務粒度進行設置。語法規范如下:
設置WATERMARK。
語法如下:
SET { [projectName <DOT>] tableName <DOT> WATERMARK } <EQ> { WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset) }
下面以標準的Flink SQL任務DDL WATERMAKR為例。
CREATE TABLE sls_stream( a INT, b BIGINT, c VARCHAR, ts TIMESTAMP, WATERMARK FOR ts AS withOffset(ts, 1000) ) with ( type = 'sls', ...);
轉換為Dataphin Flink SQL后WATERMARK設置方式。
3.7 及以上版本,需要帶上單引號。 SET sls_stream.`watermark`= 'WATERMARK FOR ts AS withOffset(ts, 1000)'; 3.6 版本 SET sls_stream.`watermark`= WATERMARK FOR ts AS withOffset(ts, 1000);
設置計算列。
語法如下:
SET { [projectName <DOT>] tableName <DOT> <computedColumn> } <EQ> { column_name AS computed_column_expression | { <LPAREN> column_name AS computed_column_expression (<COMMA> column_name AS computed_column_expression)* <RPAREN> } }
下面以標準的Flink SQL任務DDL計算列為例。
CREATE TABLE sls_stream( a INT, b BIGINT, c VARCHAR, ts AS to_timestamp(c, 'yyyyMMddHHmmss') ) with ( type = 'sls', ...);
轉換為Dataphin Flink SQL后計算列設置方式。
單個計算列方式。
SET sls_stream.computedColumn= ts AS to_timestamp(c, 'yyyyMMddHHmmss');
多個計算列方式。
SET sls_stream.computedColumn= ( ts1 AS to_timestamp(c, 'yyyyMMddHHmmss'), ts2 AS to_timestamp(d, 'yyyy-MM-dd HH:mm:ss') );
設置PROCTIME。
語法如下:
SET { [projectName <DOT>] tableName <DOT> <procTime> } <EQ> { columnName AS PROCTIME() }
轉換為Dataphin Flink SQL后PROCTIME設置方式。
SET sls_stream.procTime= d AS PROCTIME();
設置PRIMARYKEY、INDEX、UNIQUE INDEX。
重要Blink 3.6.0及以上版本, TableHints采用社區的Hints方式。Ververica Flink和開源Flink不支持INDEX和UNIQUE INDEX。
語法如下:
query : select /*+ hint_content */ ... from table_name1 /*+ hint_content */ join table_name2 /*+ hint_content */ ... hint_content : hint_item[, hint_item]* hint_item : hint_name | hint_name(k1=v1 [ , k2=v2 ]*) | hint_name(hint_opt [ ,hint_opt ]*) k : simple_identifier v : string_literal hint_opt : simple_identifier | numeric_literal | string_literal
下面以標準的Flink SQL任務TableHints為例。
INSERT INTO table SELECT source_table.* FROM source_table JOIN dim_table /*+ primarykey(field1) */ FOR SYSTEM_TIME AS OF PROCTIME() ON ...;
流表JOIN時Flink允許用戶根據自身情況聲明
JOIN KEY
的關鍵字,SET具有全局優先級,TableHints為context-specific
。SET dim_table.index = 'field1'; -- set pk和 set unique index 以此類推 -- 等價于dim_table的DDL 聲明INDEX (field1) INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- 等價于dim_table的DDL 聲明INDEX (field2)。 INSERT INTO table2 SELECT source_table.* FROM source_table JOIN dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- 等價于dim_table的DDL 聲明PRIMARY KEY (field1), INDEX (field2)。 INSERT INTO table3 SELECT source_table.* FROM source_table JOIN dim_table WITH (primarykey = 'field2') FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- 等價于dim_table的DDL 聲明PRIMARY KEY (field1), INDEX (field1)。 INSERT INTO table4 SELECT source_table.* FROM source_table JOIN dim_table WITH (primarykey = 'field1') FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- 等價于dim_table的DDL 聲明PRIMARY KEY (field1), UNIQUE INDEX (field2, field3)。 INSERT INTO table5 SELECT source_table.* FROM source_table JOIN dim_table WITH (UNIQUEINDEX = 'field2, field3') FOR SYSTEM_TIME AS OF PROCTIME() ON ...;
重要以上的
join with
語法,為Blink引擎早期使用方式,新的Blink、Ververica Flink、開源Flink請使用TableHint。Tablehint示例如下:
-- 指定dim_table的主鍵是id字段 (blink、vvr、apache-flink都支持)。 INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table/*+primarykey(id)*/ FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- 指定dim_table的維表主鍵是id(僅blink支持)。 INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table/*+index(id)*/ FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- 指定dim_table的維表唯一主鍵是id(僅blink支持)。 INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table/*+uniqueindex(id)*/ FOR SYSTEM_TIME AS OF PROCTIME() ON ...;
設置HEADER。
語法如下:
SET { [projectName <DOT>] tableName <DOT> <isHeader> } <EQ> { TRUE }
示例如下:
-- blink demo create table kafka_table ( `message` VARBINARY, topic varchar HEADER ) WITH ( type = 'kafka010', topic = 'test_kafka_topic', `group.id` = 'test_kafka_consumer_group', bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3' ); -- vvr demo create table kafka_table ( `message` VARBINARY, topic varchar METADATA VIRTUAL ) WITH ( 'connector' = 'kafka', 'topic' = 'my_excellent_topic', 'properties.bootstrap.servers' = 'mykafka:9092', 'properties.group.id' = 'my_excellent_group' 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset' );
轉換為Dataphin SET支持以下兩種語法。
SET kafka_table.topic.isHeader = true; --不加單引號。 SET kafka_table.topic.isHeader = 'true'; --加單引號。
示例五:TimeTunnel自動訂閱功能(TimeTunnel作為輸入)。
示例代碼如下:
set xx_project.xx_table.accessId='xxxxxx'; set xx_project.xx_table.accessKey='xxxxxx';
超級管理員賬號及無權限賬號不可自動訂閱,必須在SQL中配置。
其他個人賬號可配置,也可以不配置如上SET信息。通常情況下,在無法獲取到JobName時會用默認的JobName名
default
去記錄存檔訂閱的Topic信息,因此對于不同的Job,同一Topic在單獨的預編譯功能中可能不會重復訂閱。提交和發布功能在當前Job和Topic下第一次使用時會自動訂閱,有存檔記錄后不會重復訂閱。
示例六:表名沖突兼容功能。
項目如果綁定了離線計算源,則有可能存在離線表和實時表名沖突的情況。針對此類情況,兼容邏輯如下:
當任務的SQL中使用了存在于實時和離線的同名表,則在運行預編譯相關功能時,會報出錯誤提示信息:Please set only one type for this table adi_ae_matrix_rt_slr_prim_cate like SET XXX.XXX.tableType = 'XX' style! The type list as follows: [CLOUD_HBASE, MAX_COMPUTE]。
此時需要明確SQL中需要用到的表的數據類型,因為同一個
project
下同一種數據類型不會出現表名重復。可以根據報錯提示信息選擇CLOUD_HBASE或者 MAX_COMPUTE。示例代碼如下:-- 強制指定使用MaxCompute物理表。 SET xx_project.xx_table.tableType = 'MAX_COMPUTE';
示例七:MaxCompute(odps)數據源表支持。
如果要使用MaxCompute數據源表,需要設置
tableType='odps';
否則會報參數缺失。示例八:維度表列裁剪。
列裁剪可以通過以下SET語句手動開啟。
SET {project}.{dimtable}.dataphinColumnPruning='true';
說明裁剪只會限于維表,且只支持直接join表,不支持直接join子查詢。
示例九:流批一體參數自定義。
流批一體任務中會廣泛使用鏡像表,而鏡像表在最終使用時會翻譯為對應的流表或批表。
為了適應流表/批表的多樣性(流表/批表的數據源可能不一樣,帶來
with
參數中key
可能不一樣;流表/批表的某些設置可能不一樣,比如batchSize
等),所以需要將使用TableHints進行流表/批表的對應。語法如下:set project.table.${mode}.${key}
設置批任務的起停時間。示例如下:
set project.table.batch.startTime='2020-11-11 00:00:00'; set project.table.batch.endTime='2020-11-12 00:00:00';
設置流的AccessKey。示例如下:
set dwd_lux_trd_ord_subpay_mirror.`stream`.accessId='xxxxxx'; set dwd_lux_trd_ord_subpay_mirror.`stream`.accessKey='xxxxxx';
示例十:結果表列裁剪。
通過hint方式,可以指定結果表的列裁剪。
重要該特性只對HBase的Sink表生效。
-- 假設hbase ddl的語句如下 CREATE TABLE hbase_sink( rowkey INT, family1 ROW<q1 INT>, family2 ROW<q2 STRING, q3 BIGINT> ) with ( 'connector'='cloudhbase', 'table-name'='<yourTableName>', 'zookeeper.quorum'='<yourZookeeperQuorum>' ); -- 報錯:因為表字段不匹配 insert into hbase_sink select key, ROW(f1q1) from ... -- 通過:這里通過hint方式指定裁剪sink表,指定寫入rowkey和q1列 insert into hbase_sink/*+dataphincolumns(rowkey,family1.q1)*/ select key, ROW(f1q1) from ... -- 通過:這里通過hint方式指定裁剪sink表,指定寫入rowkey和q2、q3列 insert into hbase_sink/*+dataphincolumns(rowkey,family2.q2,family2.q3)*/ select key, ROW(f2q2,f2q3) from ...