日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

管理Kafka JSON Catalog

重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。

配置Kafka JSON Catalog后,您可以在Flink全托管作業開發中直接訪問Kafka集群中格式為JSON的Topic,無需再定義Schema。本文為您介紹如何在Flink全托管模式下創建、查看及刪除Kafka JSON Catalog。

背景信息

Kafka JSON Catalog通過自動解析JSON格式的消息來推導Topic的Schema,您無需在Flink SQL中聲明Kafka表的Schema便可以獲取消息的具體字段信息。Kafka JSON Catalog具有以下功能特點:

  • Kafka JSON Catalog的表名對應Kafka Topic名,無需再通過DDL語句手動注冊Kafka表,提升開發效率和正確性。

  • Kafka JSON Catalog提供的表可以直接作為Flink SQL作業中的源表使用。

  • Kafka JSON Catalog可以配合CREATE TABLE AS(CTAS)語句完成Schema變更的數據同步。

本文將從以下方面為您介紹如何管理Kafka JSON Catalog:

使用限制

  • Kafka JSON Catalog僅支持消息格式為JSON的Topic,暫不支持其他格式。

  • 僅Flink計算引擎VVR 6.0.2及以上版本支持配置Kafka JSON Catalog。

    說明

    如果您使用的是VVR 4.x版本,建議升級作業至VVR 6.0.2及以上版本后使用Kafka JSON Catalog。

  • 不支持通過DDL語句修改已有的Kafka JSON Catalog。

  • 僅支持查詢數據表,不支持創建、修改和刪除數據庫和表。

    說明

    CDAS或CTAS的Kafka JSON Catalog場景下,可以自動創建Topic。

  • Kafka JSON Catalog不支持讀取或寫入開啟了SSL或SASL認證的Kafka。

  • Kafka JSON Catalog提供的表可以直接作為Flink SQL作業中的源表,不支持作為結果表和Lookup維表。

  • 由于云消息隊列 Kafka 版暫不支持采用開源版Kafka相同的接口刪除Group,創建Kafka JSON Catalog時需要指定aliyun.kafka.instanceIdaliyun.kafka.accessKeyIdaliyun.kafka.accessKeySecretaliyun.kafka.endpointaliyun.kafka.regionId才能自動刪除Group ID,詳情請參見開源對比

創建Kafka JSON Catalog

  1. 數據查詢文本編輯區域,輸入以下配置Kafka JSON Catalog的命令。

    • 自建Kafka集群或EMR Kafka集群

      CREATE CATALOG <YourCatalogName> WITH(
       'type'='kafka',
       'properties.bootstrap.servers'='<brokers>',
       'format'='json',
       'default-database'='<dbName>',
       'key.fields-prefix'='<keyPrefix>',
       'value.fields-prefix'='<valuePrefix>',
       'timestamp-format.standard'='<timestampFormat>',
       'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
       'infer-schema.primitive-as-string'='<primitiveAsString>',
       'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>',
       'infer-schema.compacted-topic-as-upsert-table'='true',
       'max.fetch.records'='100'
      );
    • 阿里云消息隊列Kafka版

      CREATE CATALOG <YourCatalogName> WITH(
       'type'='kafka',
       'properties.bootstrap.servers'='<brokers>',
       'format'='json',
       'default-database'='<dbName>',
       'key.fields-prefix'='<keyPrefix>',
       'value.fields-prefix'='<valuePrefix>',
       'timestamp-format.standard'='<timestampFormat>',
       'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
       'infer-schema.primitive-as-string'='<primitiveAsString>',
       'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>',
       'infer-schema.compacted-topic-as-upsert-table'='true',
       'max.fetch.records'='100',
       'aliyun.kafka.accessKeyId'='<aliyunAccessKeyId>',
       'aliyun.kafka.accessKeySecret'='<aliyunAccessKeySecret>',
       'aliyun.kafka.instanceId'='<aliyunKafkaInstanceId>',
       'aliyun.kafka.endpoint'='<aliyunKafkaEndpoint>',
       'aliyun.kafka.regionId'='<aliyunKafkaRegionId>'
      );

    參數

    類型

    說明

    是否必填

    備注

    YourCatalogName

    String

    Kafka JSON Catalog名稱。

    請填寫為自定義的英文名。

    重要

    參數替換為您的Catalog名稱后,需要去掉尖括號(<>),否則語法檢查會報錯。

    type

    String

    Catalog類型。

    固定值為kafka。

    properties.bootstrap.servers

    String

    Kafka Broker地址。

    格式為host1:port1,host2:port2,host3:port3

    以英文逗號(,)分割。

    format

    String

    Kafka消息格式。

    目前只支持配置為JSON。Flink會解析JSON格式的Kafka消息,來獲取Schema。

    default-database

    String

    Kafka集群名稱。

    默認值為kafka。Catalog要求三層結構定位一張表,即catalog_name.db_name.table_name。此處是配置默認的db_name,由于Kafka沒有Database的概念,您可以在此處使用任意字符串指代Kafka集群作為database的定義。

    key.fields-prefix

    String

    自定義添加到消息鍵(Key)解析出字段名稱的前綴,來避免Kafka消息鍵解析后的命名沖突問題。

    默認值為key_。例如,如果您的key字段名為a,則系統默認解析key后的字段名稱為key_a。

    說明

    key.fields-prefix的配置值不可以是value.fields-prefix的配置值的前綴。例如value.fields-prefix配置為test1_value_,則key.fields-prefix不可以配置為test1_。

    value.fields-prefix

    String

    自定義添加到消息體(Value)解析出字段名稱的前綴,來避免Kafka消息體解析后的命名沖突問題。

    默認值為value_。例如,如果您的value字段名為b,則系統默認解析value后的字段名稱為value_b。

    說明

    value.fields-prefix的配置值不可以是key.fields-prefix的配置值的前綴。例如key.fields-prefix配置為test2_value_,則value.fields-prefix不可以配置為test2_。

    timestamp-format.standard

    String

    解析JSON格式消息中Timestamp類型字段的格式,首先嘗試通過您配置的格式去解析,解析失敗后再自動嘗試使用其他格式解析。

    可配置的值有以下兩種:

    • SQL(默認值)

    • ISO-8601

    infer-schema.flatten-nested-columns.enable

    Boolean

    解析JSON格式消息體(Value)時,是否遞歸式地展開JSON中的嵌套列。

    參數取值如下:

    • true:遞歸式展開。

      對于被展開的列,Flink使用索引該值的路徑作為名字。例如,對于{"nested": {"col": true}} 中的列col,它展開后的名字為nested.col。

      說明

      設置為true時,建議和CREATE TABLE AS(CTAS)語句配合使用,目前暫不支持其它DML語句自動展開嵌套列。

    • false(默認值):將嵌套類型當作String處理。

    infer-schema.primitive-as-string

    Boolean

    解析JSON格式消息體(Value)時,是否推導所有基本類型為String類型。

    參數取值如下:

    • true:推導所有基本類型為String。

    • false(默認值):按照基本規則進行推導。

    infer-schema.parse-key-error.field-name

    String

    解析JSON格式消息鍵(Key)時,如果消息鍵不為空,且解析失敗,會添加key.fields-prefix前綴拼接此配置項的值為列名,類型為VARBINARY的字段到表Schema,表示消息鍵部分的數據。

    默認值為col。如:消息體解析出的字段為value_name,消息鍵不為空但解析失敗,則默認返回的Schema包含兩個字段:key_col,value_name。

    infer-schema.compacted-topic-as-upsert-table

    Boolean

    當Kafka topic的日志清理策略為compact且消息鍵(Key)不為空時,是否作為Upsert Kafka表使用。

    默認值為true。使用CTAS或CDAS語法同步數據到阿里云消息隊列Kafka版時需要配置為true。

    說明

    僅實時計算引擎VVR 6.0.2及以上版本支持該參數。

    max.fetch.records

    Int

    解析JSON格式消息時,最多嘗試消費的消息數量。

    默認值為100。

    aliyun.kafka.accessKeyId

    String

    阿里云賬號AccessKey ID,詳情請參見創建AccessKey

    使用CTAS或CDAS語法同步數據到阿里云消息隊列Kafka版時需要配置。

    說明

    僅實時計算引擎VVR 6.0.2及以上版本支持該參數。

    aliyun.kafka.accessKeySecret

    String

    阿里云賬號AccessKey Secret,詳情請參見創建AccessKey

    使用CTAS或CDAS語法同步數據到阿里云消息隊列Kafka版時需要配置。

    說明

    僅實時計算引擎VVR 6.0.2及以上版本支持該參數。

    aliyun.kafka.instanceId

    String

    阿里云Kafka消息隊列實例ID,可在消息隊列Kafka實例詳情界面查看。

    使用CTAS或CDAS語法同步數據到阿里云消息隊列Kafka版時需要配置。

    說明

    僅實時計算引擎VVR 6.0.2及以上版本支持該參數。

    aliyun.kafka.endpoint

    String

    阿里云Kafka API服務接入地址,詳情請參見服務接入點

    使用CTAS或CDAS語法同步數據到阿里云消息隊列Kafka版時需要配置。

    說明

    僅實時計算引擎VVR 6.0.2及以上版本支持該參數。

    aliyun.kafka.regionId

    String

    Topic所在實例的地域ID,詳情請參見服務接入點

    使用CTAS或CDAS語法同步數據到阿里云消息隊列Kafka版時需要配置。

    說明

    僅實時計算引擎VVR 6.0.2及以上版本支持該參數。

  2. 選中創建Catalog的代碼后,單擊左側代碼行數上的運行

    image.png

  3. 在左側元數據區域,查看創建的Catalog。

查看Kafka JSON Catalog

  1. 數據查詢文本編輯區域,輸入以下命令。

    DESCRIBE `${catalog_name}`.`${db_name}`.`${topic_name}`;

    參數

    說明

    ${catalog_name}

    Kafka JSON Catalog名稱。

    ${db_name}

    Kafka集群名稱。

    ${topic_name}

    Kafka Topic名稱。

  2. 選中查看Catalog的代碼后,單擊左側代碼行數上的運行

    運行成功后,可以在運行結果中查看表的具體信息。表信息

使用Kafka JSON Catalog

  • 作為源表,從Kafka Topic中讀取數據。

    INSERT INTO ${other_sink_table}
    SELECT...
    FROM `${kafka_catalog}`.`${db_name}`.`${topic_name}`/*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
    說明

    如果Kafka JSON Catalog的表使用時需要指定其他WITH參數,則建議使用SQL Hints的方式來添加其他參數。例如,如上SQL使用了SQL Hints指定從最早的數據開始消費。其他參數詳情請參見消息隊列Kafka源表消息隊列Kafka結果表

  • 作為源表,使用CREATE TABLE AS(CTAS)語句將Kafka Topic中的數據同步至目標表中。

    • 單表同步,實時同步數據。

      CREATE TABLE IF NOT EXISTS `${target_table_name}`
      WITH(...)
      AS TABLE `${kafka_catalog}`.`${db_name}`.`${topic_name}`
      /*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
    • 在一個作業中同步多張表。

      BEGIN STATEMENT SET;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
      AS TABLE `kafka-catalog`.`kafka`.`topic0`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
      AS TABLE `kafka-catalog`.`kafka`.`topic1`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2`
      AS TABLE `kafka-catalog`.`kafka`.`topic2`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      END;

      結合Kafka JSON Catalog,您可以在同一個任務中同步多張Kafka表。但需要滿足以下條件:

      • 所有Kafka表均未配置topic-pattern參數。

      • 每張表關于Kafka的配置必須完全相同,即properties.*配置的屬性完全相同,包括properties.bootstrap.servers和properties.group.id。

      • 每張表的 scan.startup.mode配置必須完全相同,且只能配置為group-offsets、latest-offset或earliest-offset,不能配置為其他值。

      例如,下圖中上面兩張表滿足條件,下面兩張表違反了以上三個條件。示例

說明

完整的端到端的Kafka JSON Catalog使用示例詳情請參見日志實時入倉快速入門使用Flink讀取kafka Catalog源表數據后寫入下游Hologres表

刪除Kafka JSON Catalog

警告

刪除Kafka JSON Catalog不會影響已運行的作業,但會導致使用該Catalog下表的作業,在上線或重啟時報無法找到該表的錯誤,請您謹慎操作。

  1. 數據查詢文本編輯區域,輸入以下命令。

    DROP CATALOG ${catalog_name};

    其中${catalog_name}為您要刪除的目標Kafka JSON Catalog名稱。

  2. 選中刪除Catalog的命令,鼠標右鍵選擇運行

  3. 在左側元數據區域,查看目標Catalog是否已刪除。

從Kafka JSON Catalog獲取的表信息詳解

為了方便使用Kafka JSON Catalog獲取的表,Kafka JSON Catalog會在推導的表上添加默認的配置參數、元數據和主鍵信息。Kafka JSON Catalog獲取的表的詳細信息如下:

  • Kafka表的Schema推導

    Kafka JSON Catalog在解析JSON格式消息獲取Topic的Schema時,Catalog會嘗試消費最多max.fetch.records條消息,解析每條數據的Schema,解析規則與Kafka作為CTAS數據源時的基本規則相同,再將這些Schema合并作為最終的Schema。

    重要
    • Kafka JSON Catalog在推導Schema時,會建立消費組消費該Topic的數據,消費組名稱使用前綴表明是Catalog創建的。

    • 對于阿里云消息隊列Kafka版,建議在6.0.7及以上的版本使用Kafka JSON Catalog。6.0.7版本以前不會自動刪除消費組,將導致用戶收到消費組數據堆積告警。

    Schema主要包含以下幾個部分:

    • 推導的物理列(Physical Columns)

      Kafka JSON Catalog會從Kafka消息的消息鍵(Key)和消息體(Value)推導出消息的物理列,列名添加對應的前綴。

      如果消息鍵不為空但解析失敗,會返回列名為key.fields-prefix前綴和infer-schema.parse-key-error.field-name參數配置值的拼接結果,類型為VARBINARY的列。

      當拉取到一組Kafka消息后,Catalog會逐條解析Kafka消息并按以下規則合并解析出的物理列,從而作為整個Topic的Schema。合并規則如下:

      • 如果解析出的物理列中包含結果Schema中沒有的字段,則Kafka JSON Catalog會自動將這些字段加入到結果Schema。

      • 如果兩者出現了同名列,則按照以下場景進行處理:

        • 當類型相同且精度不同時,會取兩者中較大的精度的類型。

        • 當類型不同時,會按照如下圖的樹型結構找到最小父節點,作為該同名列的類型。但當Decimal和Float類型合并時,為了保留精度會合并為Double類型。Schema合并

      例如,對于下面包含三條數據的一個Kafka topic,Kafka JSON Catalog得到的Schema如下圖所示。Schema

    • 默認添加的元數據列(Metadata Column)

      Kafka JSON Catalog會默認添加partition,offset和timestamp三個有用的元數據列。詳情如下表所示。

      元數據名

      列名稱

      類型

      說明

      partition

      partition

      INT NOT NULL

      分區值。

      offset

      offset

      BIGINT NOT NULL

      偏移量。

      timestamp

      timestamp

      TIMESTAMP_LTZ(3) NOT NULL

      消息時間戳。

    • 默認添加的主鍵約束

      從Kafka JSON Catalog獲取的表,在作為源表消費時,會默認把元數據列partition和offset列作為主鍵,確保數據不重復。

    說明

    如果Kafka JSON Catalog推導出來的表Schema不符合預期,您可以通過CREATE TEMPORARY TABLE ... LIKE語法聲明臨時表來指定期望的表Schema。比如JSON數據中存在字段ts,字段格式是'2023-01-01 12:00:01',Kafka JSON Catalog會將ts字段自動推導成TIMESTAMP類型,如果希望ts字段作為STRING類型使用,可以通過CREATE TEMPORARY TABLE ... LIKE語法聲明該表進行使用。如下所示,由于默認配置中消息Value部分字段添加了value_前綴,此處字段名為value_ts。

    CREATE TEMPORARY TABLE tempTable (
        value_name STRING,
        value_ts STRING
    ) LIKE `kafkaJsonCatalog`.`kafka`.`testTopic`;
  • 默認添加的表參數

    參數

    說明

    備注

    connector

    Connector類型。

    固定值為kafka或upsert-kafka。

    topic

    對應的Topic名稱。

    聲明的表名。

    properties.bootstrap.servers

    Kafka Broker地址。

    對應Catalog的properties.bootstrap.servers參數配置值。

    value.format

    Flink Kafka Connector在序列化或反序列化Kafka的消息體(Value)時使用的格式。

    固定值為json。

    value.fields-prefix

    為所有Kafka消息體(Value)指定自定義前綴,以避免與消息鍵(Key)或Metadata字段重名。

    對應Catalog的value.fields-prefix參數配置值。

    value.json.infer-schema.flatten-nested-columns.enable

    Kafka消息體(Value)是否遞歸式地展開JSON中的嵌套列。

    對應Catalog的infer-schema.flatten-nested-columns.enable參數配置值。

    value.json.infer-schema.primitive-as-string

    Kafka消息體(Value)是否推導所有基本類型為String類型。

    對應Catalog的infer-schema.primitive-as-string參數配置值。

    value.fields-include

    定義消息體在處理消息鍵字段時的策略。

    固定值為EXCEPT_KEY。表示消息體中不包含消息鍵的字段。

    消息鍵(Key)不為空時配置該參數,消息鍵(Key)為空時不配置該參數。

    key.format

    Flink Kafka Connector在序列化/反序列化Kafka的消息鍵(Key)時使用的格式。

    固定值為json或raw。

    消息鍵(Key)不為空時配置該參數,消息鍵(Key)為空時不配置該參數。

    當消息鍵(Key)不為空但解析失敗時,配置為raw;解析成功時,配置為json。

    key.fields-prefix

    為所有Kafka消息鍵(Key)指定自定義前綴,以避免與消息體(Value)格式字段重名。

    對應Catalog的key.fields-prefix參數配置值。

    消息鍵(Key)不為空時配置該參數,消息鍵(Key)為空時不配置該參數。

    key.fields

    Kafka消息鍵(Key)解析出來的數據存放的字段。

    自動填寫解析出來的Key字段列表。

    消息鍵(Key)不為空且不是Upsert Kafka表時配置該參數,否則不配置該參數。