日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Dataphin Flink SQL詞法提效

Dataphin在Flink SQL基礎上遵循ANSI-SQL標準進行了部分語法改進,以幫助您提升開發效率。本文將為說明Dataphin Flink SQL支持的詞法新特性及使用方法。

支持跨項目空間引用

DDL與DML表名語法增強

通常一個標準的Flink SQL任務需要完整DDL與DML語句。示例如下:

--DDL
CREATE TABLE input_table (
  	field1 VARCHAR,
    field2 BIGINT
  ) WITH (
		type = 'datagen'
  );
CREATE TABLE output_table (
  	field1 VARCHAR,
    field2 BIGINT
  ) WITH (
		type = 'print'
  );
--DML
INSERT INTO output_table
SELECT field1, field2 FROM input_table;

以上語句的DDL在多個任務中需要重復定義。為避免重復聲明,提高開發效率。Dataphin對FlinkSQL的表名做了語法擴展增強。在Dataphin中定義元表(schema)后, 可以在任何任務中通過以下語法直接引用, 同時在多個任務中無需再次聲明, 并且支持跨項目空間引用。

<compoundTableName> ::= [projectName <DOT>] tableName

示例說明

以上一段SQL為例,定義好元表后,研發任務中只需要輸入以下語句。

INSERT INTO output_table -- 項目前綴缺省時默認是當前所在項目。
SELECT field1, field2 FROM ${project_name}.input_table;

內置函數與UDX語法增強

通常一個標準的Flink SQL任務使用UDX時,需要進行聲明。示例如下:

CREATE FUNCTION MY_UDX AS 'package.class';
...
CREATE VIEW my_view AS 
SELECT MY_UDX(args)
FROM ...

以上自定義UDX語句在多個任務中需要重復定義。為避免重復聲明,提高開發效率。Dataphin引入資源概念,您可以在Dataphin注冊函數后,在任何任務中通過以下語法直接引用,同時在多個任務中無需再次聲明,并且支持跨項目空間引用。

<compoundFunctionName> ::= [projectName <COLON>] functionName

示例說明

以上一段SQL為例,在Dataphin注冊函數后,研發任務中只需要輸入以下語句。

CREATE VIEW my_view AS SELECT ${project_name}:MY_UDX(args) -- 項目前綴缺省時默認是當前所在項目FROM。
FROM ...

特殊說明

Dataphin支持overload或override內置函數,且優先使用overload或override的內置函數。示例如下:

  • 示例一: 有項目前綴。

SELECT ${project_name}:SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin將默認使用已overload了的內置函數`SUBSTRING`, 若未在`${project_name}`下找到該自定義函數, 在預編譯時將無法通過。
  • 示例二: 無項目前綴。

SELECT SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin將優先在當前項目內尋找自定義的`SUBSTRING`函數, 若找不到, 則認為是內置函數。

使用UpdateStatement及TableHints設置任務級別DDL參數

在Dataphin中支持對已經創建的元表進行DDL的WITH參數配置,同時Dataphin支持配置任務粒度的參數,包括Watermark、計算列、主鍵、Header等,  且增加了SetOption語法支持。語法規范如下:

SET {
	[ projectName <DOT>] tableName <DOT> 
		{ 
			propertyName
			| WATERMARK
			| <computedColumn> 
			| <primaryKey> 
			| <procTime> 
			| {fieldName <DOT> <isHeader>} 
		}
} 	<EQ> {
		@variable = identifier
		| @variable = literal
		| @variable = expression
}

設置WITH參數時,建議將表DDL的公用WITH參數在元表中進行定義, 任務粒度的個性化參數可以通過SET語句或TableHints在代碼中定義, 多個SET語句間以;為分隔符。

重要

使用Flink SQL保留字時需要加(``)符號示例如下:

  • Watermark:SET語句使用SET [project.]table.`watermark` = ...

  • Partition:SET語句使用SET [project.]table.`partition` = ...

DDL參數優先級

任務級別的參數覆蓋

任務級別的參數設置會覆蓋元表中配置的參數設置(僅針對可覆蓋的參數)。例如,元表中設置表Tprimary keyfield1 , 任務中使用表T后, 如果存在任何primary key設置,都以任務中的設置為準 (即任務中設置表T的primary keyfield2, 則field1在該任務中不再是表T的主鍵)。

重要

基于Dataphin的安全和管理策略,元表上的部分參數不允許在SQL任務中通過SET覆蓋。例如MaxCompute元表上定義的表名,不能在SQL任務中通過SET語句覆蓋。不同類型的元表有不同的可覆蓋參數,具體以實際操作為準。

任務內SET參數

任務內SET參數具有全局優先級, TableHints的作用范圍是Context-specific, SET參數會補充到TableHints中不存在的參數項,相同參數項會被Shade。

例如Blink在2.2.5版本后, 增加了對維表設置INDEXUNIQUE INDEX的支持, 所以目前聲明JOIN KEY的關鍵字時存在PIRMARY KEYINDEXUNIQUE INDEX三種,故Dataphin會聯合做判斷。示例如下:

SET my_dim_table.primarykey = 'field1';

CREATE VIEW tmp1 AS 
SELECT my_source_table1.*
FROM my_source_table1 JOIN my_dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;

CREATE VIEW tmp2 AS 
SELECT my_source_table2.*
FROM my_source_table2 JOIN my_dim_table WITH (primarykey = 'field3') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;

以上代碼Dataphin將會理解為在生成tmp1時設置my_dim_tableprimary keyfield1(參數填充),indexfield2;在生成tmp2時設置my_dim_tableprimary keyfield3(參數Shade)。

更多示例說明:

  • 示例一:Flink SQL Source表參數。

    以創建kafka源表為例。

    -- vvr語法
    CREATE TEMPORARY TABLE kafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `category_id` BIGINT,
        `behavior` STRING,
        `topic` STRING METADATA VIRTUAL,
        `partition` BIGINT METADATA VIRTUAL
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_excellent_topic',
        'properties.bootstrap.servers' = 'mykafka:9092',
        'properties.group.id' = 'my_excellent_group'
        'format' = 'csv',
        'scan.startup.mode' = 'earliest-offset'
    )

    個性化參數轉換成Dataphin Flink SQL的SET表達式。

    -- 如果表屬性的名字是sql關鍵字,或者出現了其他字符,可以用反引號轉義。
    SET kafkaTable.`properties.group.id` = 'new_group_id';
    SET kafkaTable.`scan.startup.mode` = 'latest-offset';
  • 示例二:Flink SQL Sink表參數。

    以創建aliHBase結果表為例。

    create table hbase_output(
      rk  varchar,
      rk1 varchar,
      rk2 varchar,
      f bigint,
      PRIMARY KEY(rk)
    ) with (
      type='alihbase',
      diamondKey='xxxxxxx',
      diamondGroup='yyyyyyy',
      columnFamily='cf',
      tableName='blink_hbase_test',
      bufferSize=500;
    );

    若需要在任務粒度中Overwrite掉元表中的配置, 可做如下設置。

    SET hbase_output.bufferSize = 1000;
  • 示例三:Flink SQL Dim表參數。

    以創建MaxCompute維表為例。

     CREATE TABLE white_list (
      id varchar,
      name varchar,
      age int,
      PRIMARY KEY (id)
      --PERIOD FOR SYSTEM_TIME -- 備注: 從blink3.x開始維表DDL不需要PERIOD FOR SYSTEM_TIME標識。
    ) with (
      type = 'odps',
      endPoint = 'your_end_point_name',
      project = 'your_project_name',
      tableName = 'your_table_name',
      accessId = 'your_access_id',
      accessKey = 'your_access_key',
      `partition` = 'ds=20180905',
      cache = 'ALL'
    );

    轉換成Dataphin Flink SQL的SET表達式。

    SET white_list.cache='ALL';
    SET white_list.cacheTTLMs=86400000;
    --若任務粒度想特別設置該表的cache更新時間, 可在此增加SET。
  • 示例四:WATERMARK、計算列、HEADER、PROCTIME、PRIMARY KEY/(UNIQUE)INDEX設置。

    目前元表中只支持主鍵(Primary Key)和Header設置, 若需要使用WATERMARK、PROCTIME、計算列等,需要在任務粒度進行設置。語法規范如下:

    • 設置WATERMARK。

      語法如下:

      SET {
      	[projectName <DOT>] tableName <DOT> WATERMARK
      } 	<EQ> {
      		WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
      }

      下面以標準的Flink SQL任務DDL WATERMAKR為例。

      CREATE TABLE sls_stream(
      		a INT,
      		b BIGINT,
      		c VARCHAR,
      		ts TIMESTAMP,
      		WATERMARK FOR ts AS withOffset(ts, 1000)
      	  ) with (
      		type = 'sls',
        ...);

      轉換為Dataphin Flink SQL后WATERMARK設置方式。

      3.7 及以上版本,需要帶上單引號。
      SET sls_stream.`watermark`= 'WATERMARK FOR ts AS withOffset(ts, 1000)';
      3.6 版本
      SET sls_stream.`watermark`= WATERMARK FOR ts AS withOffset(ts, 1000);
    • 設置計算列。

      語法如下:

      SET {
      	[projectName <DOT>] tableName <DOT> <computedColumn>
      } 	<EQ> {
      		column_name AS computed_column_expression
          |
          {
          	<LPAREN>
          		column_name AS computed_column_expression
          		(<COMMA> column_name AS computed_column_expression)*
            <RPAREN>  
          }
      }

      下面以標準的Flink SQL任務DDL計算列為例。

      CREATE TABLE sls_stream(
      		a INT,
      		b BIGINT,
      		c VARCHAR,
      		ts AS to_timestamp(c, 'yyyyMMddHHmmss')
      	  ) with (
      		type = 'sls',
        ...);

      轉換為Dataphin Flink SQL后計算列設置方式。

      • 單個計算列方式。

        SET sls_stream.computedColumn= ts AS to_timestamp(c, 'yyyyMMddHHmmss');
      • 多個計算列方式。

        SET sls_stream.computedColumn= ( ts1 AS to_timestamp(c, 'yyyyMMddHHmmss'), ts2 AS to_timestamp(d, 'yyyy-MM-dd HH:mm:ss') );
    • 設置PROCTIME。

      語法如下:

      SET {
      	[projectName <DOT>] tableName <DOT> <procTime>
      } 	<EQ> {
      		columnName AS PROCTIME()
      }

      轉換為Dataphin Flink SQL后PROCTIME設置方式。

      SET sls_stream.procTime= d AS PROCTIME();
    • 設置PRIMARYKEY、INDEX、UNIQUE INDEX。

      重要

      Blink 3.6.0及以上版本, TableHints采用社區的Hints方式。Ververica Flink和開源Flink不支持INDEX和UNIQUE INDEX。

      語法如下:

      query :
            select /*+ hint_content */
            ...
            from
                table_name1 /*+ hint_content */
                join 
                table_name2 /*+ hint_content */
            ...
      hint_content :
             hint_item[, hint_item]*
      hint_item :
             hint_name
         |   hint_name(k1=v1 [ , k2=v2 ]*)
         |   hint_name(hint_opt [ ,hint_opt ]*)
      k :
            simple_identifier
      v :
            string_literal
      hint_opt :
            simple_identifier
         |  numeric_literal
         |  string_literal

      下面以標準的Flink SQL任務TableHints為例。

      INSERT INTO table
      SELECT source_table.*
      FROM source_table JOIN dim_table /*+ primarykey(field1) */ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;

      流表JOIN時Flink允許用戶根據自身情況聲明JOIN KEY的關鍵字,SET具有全局優先級,TableHints為context-specific

      SET dim_table.index = 'field1'; -- set pk和 set unique index 以此類推
      
      -- 等價于dim_table的DDL 聲明INDEX (field1)
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table JOIN dim_table FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- 等價于dim_table的DDL 聲明INDEX (field2)。
      INSERT INTO table2
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      
      -- 等價于dim_table的DDL 聲明PRIMARY KEY (field1), INDEX (field2)。
      INSERT INTO table3
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (primarykey = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- 等價于dim_table的DDL 聲明PRIMARY KEY (field1), INDEX (field1)。
      INSERT INTO table4
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (primarykey = 'field1') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- 等價于dim_table的DDL 聲明PRIMARY KEY (field1), UNIQUE INDEX (field2, field3)。
      INSERT INTO table5
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (UNIQUEINDEX = 'field2, field3') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      重要

      以上的join with語法,為Blink引擎早期使用方式,新的Blink、Ververica Flink、開源Flink請使用TableHint。

      Tablehint示例如下:

      -- 指定dim_table的主鍵是id字段 (blink、vvr、apache-flink都支持)。
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table 
      JOIN dim_table/*+primarykey(id)*/ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- 指定dim_table的維表主鍵是id(僅blink支持)。
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table 
      JOIN dim_table/*+index(id)*/ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- 指定dim_table的維表唯一主鍵是id(僅blink支持)。
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table 
      JOIN dim_table/*+uniqueindex(id)*/ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
    • 設置HEADER。

      語法如下:

      SET {
      	[projectName <DOT>] tableName <DOT> <isHeader>
      } 	<EQ> {
      		TRUE
      }

      示例如下:

      -- blink demo
      create table kafka_table (
          `message` VARBINARY,
          topic varchar HEADER
      ) WITH (
          type = 'kafka010',
          topic = 'test_kafka_topic',
          `group.id` = 'test_kafka_consumer_group',
          bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3'
      );
      
      -- vvr demo
      create table kafka_table (
          `message` VARBINARY,
          topic varchar METADATA VIRTUAL
      ) WITH (
          'connector' = 'kafka',
          'topic' = 'my_excellent_topic',
          'properties.bootstrap.servers' = 'mykafka:9092',
          'properties.group.id' = 'my_excellent_group'
          'format' = 'csv',
          'scan.startup.mode' = 'earliest-offset'
      );

      轉換為Dataphin SET支持以下兩種語法。

      SET kafka_table.topic.isHeader = true; --不加單引號。
      
      SET kafka_table.topic.isHeader = 'true'; --加單引號。
  • 示例五:TimeTunnel自動訂閱功能(TimeTunnel作為輸入)。

    示例代碼如下:

    set xx_project.xx_table.accessId='xxxxxx';
    set xx_project.xx_table.accessKey='xxxxxx';
    • 超級管理員賬號及無權限賬號不可自動訂閱,必須在SQL中配置。

    • 其他個人賬號可配置,也可以不配置如上SET信息。通常情況下,在無法獲取到JobName時會用默認的JobName名default去記錄存檔訂閱的Topic信息,因此對于不同的Job,同一Topic在單獨的預編譯功能中可能不會重復訂閱。提交和發布功能在當前Job和Topic下第一次使用時會自動訂閱,有存檔記錄后不會重復訂閱。

  • 示例六:表名沖突兼容功能。

    項目如果綁定了離線計算源,則有可能存在離線表和實時表名沖突的情況。針對此類情況,兼容邏輯如下:

    1. 當任務的SQL中使用了存在于實時和離線的同名表,則在運行預編譯相關功能時,會報出錯誤提示信息:Please set only one type for this table adi_ae_matrix_rt_slr_prim_cate like SET XXX.XXX.tableType = 'XX' style! The type list as follows: [CLOUD_HBASE, MAX_COMPUTE]

    2. 此時需要明確SQL中需要用到的表的數據類型,因為同一個project下同一種數據類型不會出現表名重復。可以根據報錯提示信息選擇CLOUD_HBASE或者 MAX_COMPUTE。示例代碼如下:

      -- 強制指定使用MaxCompute物理表。
      SET xx_project.xx_table.tableType = 'MAX_COMPUTE';
  • 示例七:MaxCompute(odps)數據源表支持。

    如果要使用MaxCompute數據源表,需要設置tableType='odps';否則會報參數缺失。

  • 示例八:維度表列裁剪。

    列裁剪可以通過以下SET語句手動開啟。

    SET {project}.{dimtable}.dataphinColumnPruning='true';
    說明

    裁剪只會限于維表,且只支持直接join表,不支持直接join子查詢。

  • 示例九:流批一體參數自定義。

    流批一體任務中會廣泛使用鏡像表,而鏡像表在最終使用時會翻譯為對應的流表或批表。

    為了適應流表/批表的多樣性(流表/批表的數據源可能不一樣,帶來with參數中key可能不一樣;流表/批表的某些設置可能不一樣,比如batchSize等),所以需要將使用TableHints進行流表/批表的對應。語法如下:

    set project.table.${mode}.${key}

    設置批任務的起停時間。示例如下:

    set project.table.batch.startTime='2020-11-11 00:00:00';
    set project.table.batch.endTime='2020-11-12 00:00:00';

    設置流的AccessKey。示例如下:

    set dwd_lux_trd_ord_subpay_mirror.`stream`.accessId='xxxxxx';
    set dwd_lux_trd_ord_subpay_mirror.`stream`.accessKey='xxxxxx';
  • 示例十:結果表列裁剪。

    通過hint方式,可以指定結果表的列裁剪。

    重要

    該特性只對HBase的Sink表生效。

    -- 假設hbase ddl的語句如下
    CREATE TABLE hbase_sink(
      rowkey INT,
      family1 ROW<q1 INT>,
      family2 ROW<q2 STRING, q3 BIGINT>
    ) with (
      'connector'='cloudhbase',
      'table-name'='<yourTableName>',
      'zookeeper.quorum'='<yourZookeeperQuorum>'
    );
    
    -- 報錯:因為表字段不匹配
    insert into hbase_sink select key, ROW(f1q1) from ...
    
    -- 通過:這里通過hint方式指定裁剪sink表,指定寫入rowkey和q1列
    insert into hbase_sink/*+dataphincolumns(rowkey,family1.q1)*/
    select key, ROW(f1q1) from ...
    
    -- 通過:這里通過hint方式指定裁剪sink表,指定寫入rowkey和q2、q3列
    insert into hbase_sink/*+dataphincolumns(rowkey,family2.q2,family2.q3)*/
    select key, ROW(f2q2,f2q3) from ...