日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

實(shí)時(shí)消費(fèi)Kafka數(shù)據(jù)

更新時(shí)間:

當(dāng)您需要將Kafka數(shù)據(jù)寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版,且不希望使用其他數(shù)據(jù)集成工具時(shí),可以通過(guò)實(shí)時(shí)數(shù)據(jù)消費(fèi)功能直接消費(fèi)Kafka數(shù)據(jù),減少實(shí)時(shí)處理組件依賴,提升寫(xiě)入吞吐。

Apache Kafka是一個(gè)容錯(cuò)、低延遲、分布式的發(fā)布-訂閱消息系統(tǒng)。Streaming Server支持從Apache和Confluent Kafka發(fā)行版中加載Kafka數(shù)據(jù)。通過(guò)云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版可讀外表對(duì)Kafka數(shù)據(jù)進(jìn)行轉(zhuǎn)換,并將數(shù)據(jù)寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版目標(biāo)表中。

前提條件

  • Kafka服務(wù)與云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例需在同一專有網(wǎng)絡(luò)(VPC)。

    重要

    如果Kafka服務(wù)與云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例屬于同一專有網(wǎng)絡(luò)但是不在同一交換機(jī)(vSwitch)下,您需要進(jìn)行如下操作:

    • 將Kafka服務(wù)所在交換機(jī)的IPv4網(wǎng)段添加至云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例白名單中。具體操作,請(qǐng)參見(jiàn)設(shè)置白名單

    • 云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例所在交換機(jī)的IPv4網(wǎng)段添加至Kafka服務(wù)白名單中。具體操作,請(qǐng)參見(jiàn)配置白名單

  • 已在Kafka服務(wù)中生成了大量樣例數(shù)據(jù)。本文以阿里云云消息隊(duì)列Kafka版為例,具體信息如下。

    • 接入點(diǎn)信息:alikafka-post-cn-wwo3hflb****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflb****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflb****-3-vpc.alikafka.aliyuncs.com:9092

    • Topic:test_topic

    • consumer group:test_consumer_group

  • 已在云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版中創(chuàng)建目標(biāo)用戶和目標(biāo)表,同時(shí)在任務(wù)中使用的數(shù)據(jù)庫(kù)用戶需要具備以下權(quán)限。

    • 使用gpfdist協(xié)議創(chuàng)建只讀外表的權(quán)限。

    • 任務(wù)中配置的數(shù)據(jù)庫(kù)Schema的USAGE和CREATE權(quán)限。

    • 任務(wù)中配置的寫(xiě)入目標(biāo)表的SELECT和INSERT權(quán)限。

    本文以liss_test用戶和liss_test.liss_test_plaintext表為例。

    CREATE role liss_test with login;
    ALTER role liss_test with password 'lissTest****';
    ALTER role liss_test CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist'); 
    
    \c - liss_test
    CREATE DATABASE liss_test;
    \c liss_test
    CREATE SCHEMA liss_test;
    
    CREATE TABLE liss_test.liss_test_plaintext (
    column_1 varchar(32),
    column_2 bigint,
    column_3 numeric,
    column_4 varchar(32),
    column_5 varchar(32)
    ) distributed by (column_1, column_2);

使用限制

  • 云原生數(shù)據(jù)倉(cāng)庫(kù) AnalyticDB PostgreSQL 版6.0實(shí)例需為v6.6.0及以上版本。云原生數(shù)據(jù)倉(cāng)庫(kù) AnalyticDB PostgreSQL 版7.0實(shí)例需為v7.0.3及以上版本。AnalyticDB PostgreSQL版Serverless模式實(shí)例暫不支持。

  • 實(shí)時(shí)數(shù)據(jù)消費(fèi)目前僅支持INSERT、MERGE(UPSERT)、UPDATE三種語(yǔ)法,暫不支持DELETE與READ。

  • 使用MERGE(UPSERT)或UPDATE時(shí),需要云原生數(shù)據(jù)倉(cāng)庫(kù) AnalyticDB PostgreSQL 版表有主鍵索引。

  • 使用實(shí)時(shí)數(shù)據(jù)消費(fèi),不同分區(qū)(Partition)之間需要使用主鍵列做分區(qū)因子,否則可能會(huì)造成全局死鎖錯(cuò)誤,導(dǎo)致部分?jǐn)?shù)據(jù)更新失敗。

  • 實(shí)時(shí)數(shù)據(jù)消費(fèi)當(dāng)前僅支持Kafka消息隊(duì)列,暫不支持CDC格式的數(shù)據(jù)源。

  • 當(dāng)前的版本向?qū)J街С諧SV和Delimited兩種格式的數(shù)據(jù)源,專業(yè)模式支持CSV、Delimited和protobuf三種格式的數(shù)據(jù)源。

操作步驟

步驟一:開(kāi)啟實(shí)時(shí)數(shù)據(jù)服務(wù)

  1. 登錄云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版控制臺(tái)

  2. 在控制臺(tái)左上角,選擇實(shí)例所在地域。

  3. 找到目標(biāo)實(shí)例,單擊實(shí)例ID。

  4. 在控制臺(tái)左側(cè)導(dǎo)航欄單擊實(shí)時(shí)數(shù)據(jù)消費(fèi),再單擊左上角開(kāi)啟實(shí)時(shí)數(shù)據(jù)服務(wù)

    image

  5. 在彈出的對(duì)話框中填寫(xiě)名稱服務(wù)描述并單擊確定。開(kāi)通完成后,可在控制臺(tái)看到服務(wù)狀態(tài)連接信息

    image

    說(shuō)明

    服務(wù)規(guī)格當(dāng)前不可選,默認(rèn)為8CU。

步驟二:新增實(shí)時(shí)數(shù)據(jù)源

  1. 登錄云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版控制臺(tái)
  2. 在控制臺(tái)左上角,選擇實(shí)例所在地域。
  3. 找到目標(biāo)實(shí)例,單擊實(shí)例ID。
  4. 在左側(cè)導(dǎo)航欄,單擊實(shí)時(shí)數(shù)據(jù)消費(fèi)

  5. 實(shí)時(shí)數(shù)據(jù)源卡片中,單擊新增數(shù)據(jù)源,并完成以下配置。

    配置項(xiàng)

    描述

    關(guān)聯(lián)數(shù)據(jù)服務(wù)

    在下拉框中選擇已創(chuàng)建的實(shí)時(shí)數(shù)據(jù)服務(wù)。

    數(shù)據(jù)源名稱

    自定義數(shù)據(jù)源名稱。

    數(shù)據(jù)源描述

    自定義數(shù)據(jù)源描述。

    數(shù)據(jù)源類型

    目前僅支持Kafka。

    brokers

    Kafka接入點(diǎn)信息。

    • 阿里云的Kafka服務(wù),可登錄阿里云控制臺(tái)獲取默認(rèn)接入點(diǎn)。具體操作,請(qǐng)參見(jiàn)查看接入點(diǎn)

    • 自建的kafka服務(wù),Brokers需要填寫(xiě)Kafka服務(wù)具體的`hostname:port``ip:port`信息。

    topic

    Kafka的Topic名稱。

    format

    當(dāng)前版本向?qū)J街С諧SV和Delimited兩種格式的數(shù)據(jù)源,專業(yè)模式支持CSV、Delimited和protobuf三種格式的數(shù)據(jù)源。

    列分隔符

    可設(shè)置任意單字符分隔符。

  6. 單擊確定

步驟三:新增實(shí)時(shí)任務(wù)

  1. 實(shí)時(shí)任務(wù)卡片中,單擊新增實(shí)時(shí)任務(wù),并完成以下配置。

    請(qǐng)根據(jù)業(yè)務(wù)需要選擇向?qū)J?/b>或專業(yè)模式

    向?qū)J剑嚎梢酝ㄟ^(guò)控制臺(tái)中的指引來(lái)快速搭建任務(wù)。

    專業(yè)模式:可以通過(guò)提交YAML的方式向Streaming Server提交任務(wù),功能相比于向?qū)J礁S富。

    向?qū)J?/b>

    配置項(xiàng)

    描述

    基本信息

    任務(wù)名稱

    定義任務(wù)的名稱,任務(wù)名稱不可以重復(fù),必填。

    任務(wù)描述

    描述任務(wù)內(nèi)容,選填。

    配置模式

    向?qū)J健?/p>

    源端配置

    數(shù)據(jù)源

    選擇在新增實(shí)時(shí)數(shù)據(jù)源中配置的數(shù)據(jù)源,目前僅支持Kafka為源的數(shù)據(jù)源。

    group_name

    Kafka的消費(fèi)者組。

    failback_offset

    消費(fèi)位點(diǎn)。

    • earliest:從最早可用位點(diǎn)消費(fèi)。

    • latest:從最新的位點(diǎn)開(kāi)始消費(fèi)。

    投遞保證

    流計(jì)算中的一致性語(yǔ)義,支持:

    • ATLEAST:在Kafka中的數(shù)據(jù)至少有一次被寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版

    • EXACTLY:在Kafka中的數(shù)據(jù)有且僅有一次被寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版

    目標(biāo)端配置

    目標(biāo)庫(kù)

    需要寫(xiě)入的云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版目標(biāo)數(shù)據(jù)庫(kù)名稱。

    Schema

    云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版的模式名稱。

    目標(biāo)表

    需要寫(xiě)入的云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版目標(biāo)表名稱。

    賬號(hào)

    當(dāng)前任務(wù)使用的云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版數(shù)據(jù)庫(kù)賬號(hào)。

    密碼

    賬號(hào)密碼。

    寫(xiě)入模式

    目前僅支持INSERT、UPDATE和MERGE三種寫(xiě)入模式。

    • INSERT:將數(shù)據(jù)直接寫(xiě)入目標(biāo)表。

    • UPDATE:當(dāng)輸入列中的MatchColumns與目標(biāo)表中的列匹配,更新UpdateColumns中列出的目標(biāo)表列。

    • MERGE:當(dāng)寫(xiě)入數(shù)據(jù)與目標(biāo)表列的值相等時(shí),使用寫(xiě)入數(shù)據(jù)更新目標(biāo)表列的現(xiàn)有數(shù)據(jù)。當(dāng)寫(xiě)入數(shù)據(jù)與目標(biāo)表列的值不相等時(shí),直接將數(shù)據(jù)寫(xiě)入目標(biāo)表。MERGE寫(xiě)入模式可類比于UPSERT(UPDATE and INSERT),關(guān)于UPSERT的寫(xiě)入方式,請(qǐng)參見(jiàn)使用INSERT ON CONFLICT覆蓋寫(xiě)入數(shù)據(jù)

    說(shuō)明

    MatchColumns與UpdateColumns的含義請(qǐng)參見(jiàn)下文字段類型的描述。

    ErrorLimitCount

    錯(cuò)誤數(shù)據(jù)的容忍閾值。當(dāng)寫(xiě)入的錯(cuò)誤數(shù)據(jù)到達(dá)ErrorLimitCount時(shí),Streaming Server會(huì)自動(dòng)停止將數(shù)據(jù)源的數(shù)據(jù)寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版。0表示Streaming Server遇到第一次錯(cuò)誤數(shù)據(jù)時(shí)就會(huì)停止數(shù)據(jù)寫(xiě)入。目前該參數(shù)未啟用,填0即可。

    字段映射

    源字段

    Kafka消息中的Value字段名,需要按照在Value中出現(xiàn)的順序指定所有的字段名。

    目標(biāo)字段

    云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版目標(biāo)表的字段名。

    字段類型

    目前支持以下三種類型:

    • MatchColumns:作為寫(xiě)入時(shí)的Join條件列作為更新條件,用于判斷目標(biāo)表中哪些行需要被更新。

    • UpdateColumns:如果某一行數(shù)據(jù)符合更新條件,那么在UpdateColumns中的列會(huì)被更新為新的值。

    • 空(不填):即使某一行數(shù)據(jù)符合更新條件,該字段也不會(huì)被更新為新的值。

    在UPDATE和MERGE寫(xiě)入時(shí),Streaming Server會(huì)先將數(shù)據(jù)寫(xiě)入一個(gè)臨時(shí)表,然后利用MatchColumns作為條件列與目標(biāo)表進(jìn)行Join:

    • 如果有匹配的數(shù)據(jù),則會(huì)更新UpdateColumns中的數(shù)據(jù)。

    • 如果沒(méi)有匹配的數(shù)據(jù)時(shí),則會(huì)根據(jù)寫(xiě)入模式有以下兩種情況:

      • 寫(xiě)入模式為UPDATE時(shí),數(shù)據(jù)不會(huì)被寫(xiě)入。

      • 寫(xiě)入模式為MERGE時(shí),數(shù)據(jù)會(huì)被寫(xiě)入。

    專業(yè)模式

    配置項(xiàng)

    描述

    基本信息

    任務(wù)名稱

    定義任務(wù)的名稱,任務(wù)名稱不可以重復(fù),必填。

    任務(wù)描述

    描述任務(wù)內(nèi)容,選填。

    配置模式

    專業(yè)模式。

    數(shù)據(jù)源

    選擇在新增實(shí)時(shí)數(shù)據(jù)源中配置的數(shù)據(jù)源,目前僅支持Kafka為源的數(shù)據(jù)源。

    YAML

    可以通過(guò)YAML配置更復(fù)雜的寫(xiě)入邏輯。本文的YAML配置示例如下。更多詳情,請(qǐng)參見(jiàn)附錄:YAML配置說(shuō)明

    DATABASE: liss_test
    USER: liss_test
    PASSWORD: lissTest****
    HOST: gp-2ze517f9l7****-master.gpdb.rds-aliyun-pre.rds.aliyuncs.com
    PORT: 5432
    KAFKA:
      INPUT:
        SOURCE:
          BROKERS: alikafka-post-cn-wwo3hflbo002-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflbo002-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflbo002-3-vpc.alikafka.aliyuncs.com:9092
          TOPIC: test_topic
          FALLBACK_OFFSET: EARLIEST
        VALUE:
          COLUMNS:
          - NAME: column_1
            TYPE: varchar(32)
          - NAME: column_2
            TYPE: bigint
          - NAME: column_3
            TYPE: numeric
          - NAME: column_4
            TYPE: varchar(32)
          - NAME: column_5
            TYPE: varchar(32)
          FORMAT: delimited
          DELIMITED_OPTION:
            DELIMITER: "|"
        ERROR_LIMIT: 20
      OUTPUT:
        SCHEMA: liss_test
        TABLE: liss_test_plaintext
        MODE: MERGE
        MATCH_COLUMNS:
        - column_1
        - column_2
        UPDATE_COLUMNS:
        - column_3
        - column_4
        - column_5
        MAPPING:
        - NAME: column_1
          EXPRESSION: column_1
        - NAME: column_2
          EXPRESSION: column_2
        - NAME: column_3
          EXPRESSION: column_3
        - NAME: column_4
          EXPRESSION: column_4
        - NAME: column_5
          EXPRESSION: column_5
      COMMIT:
        MAX_ROW: 1000
        MINIMAL_INTERVAL: 1000
        CONSISTENCY: ATLEAST
      POLL:
        BATCHSIZE: 1000
        TIMEOUT: 1000
      PROPERTIES:
        group.id: test_consumer_group
  2. 單擊確定,并等待實(shí)時(shí)任務(wù)狀態(tài)為運(yùn)行中,即可將數(shù)據(jù)源中的數(shù)據(jù)寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版

在任務(wù)啟動(dòng)后會(huì)在目標(biāo)端配置的Schema(專業(yè)模式為METADATA.SCHEMA中配置的schema)下生成任務(wù)的兩種輔助表,其格式分別為:

  • lissext_$UID:本任務(wù)定義的gpfdist外表,用于將數(shù)據(jù)寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版

  • lisskafka_mission_info_$UID:用于存儲(chǔ)任務(wù)當(dāng)前位點(diǎn)推進(jìn)的情況,保障數(shù)據(jù)寫(xiě)入的一致性。目前為了保障寫(xiě)入任務(wù)的高可用,每個(gè)寫(xiě)入任務(wù)會(huì)生成4個(gè)子任務(wù),所以每啟動(dòng)一個(gè)寫(xiě)入任務(wù),會(huì)生成4張表。

  • UID是每個(gè)寫(xiě)入任務(wù)的唯一標(biāo)識(shí)ID。

附錄:YAML配置說(shuō)明

YAML配置文件格式如下。

DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <adbpg_port>
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: <kafka_broker_host:broker_port> [, ... ]
        TOPIC: <kafka_topic>
        [PARTITIONS: (<partition_numbers>)]
        [FALLBACK_OFFSET: { earliest | latest }]
      [VALUE:
        COLUMNS:
           - NAME: { <column_name> }
             TYPE: <column_data_type>
           [ ... ]
         FORMAT: <value_data_format>
         [[DELIMITED_OPTION:
            DELIMITER: <delimiter_string>
            [QUOTE: <quote_char>]
            [ESCAPE: <escape_char>] ] |
         [CSV_OPTION:
            [DELIMITER: <delim_char>]
            [QUOTE: <quote_char>]
            [NULL_STRING: <nullstr_val>]
            [ESCAPE: <escape_char>]
      [KEY:
        COLUMNS:
           - NAME: { <column_name> }
             TYPE: <column_data_type>
           [ ... ]
         FORMAT: <key_data_format>
         [[DELIMITED_OPTION:
            DELIMITER: <delimiter_string> |
            [QUOTE: <quote_char>]
            [ESCAPE: <escape_char>] ] |
         [CSV_OPTION:
            [DELIMITER: <delim_char>]
            [QUOTE: <quote_char>]
            [NULL_STRING: <nullstr_val>]
            [ESCAPE: <escape_char>]
      [META:
         COLUMNS:
            - NAME: <meta_column_name>
              TYPE: { json | jsonb }
         FORMAT: json]
      [ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
   { OUTPUT:
      [SCHEMA: <output_schema_name>]
      TABLE: <table_name>
      [MODE: <mode>]
      [MATCH_COLUMNS: 
         - <match_column_name>
         [ ... ]]
      [ORDER_COLUMNS: 
         - <order_column_name>
         [ ... ]]
      [UPDATE_COLUMNS: 
         - <update_column_name>
         [ ... ]]
      [MAPPING: 
         - NAME: <target_column_name>
           EXPRESSION: { <source_column_name> | <expression> } 
         [ ... ]
           |
         <target_column_name> : { <source_column_name> | <expression> }
         [ ... ] ] }
   [METADATA:
      [SCHEMA: <metadata_schema_name>]]
   COMMIT:
      MAX_ROW: <num_rows>
      MINIMAL_INTERVAL: <wait_time>
      CONSISTENCY: { strong | at-least | at-most | none }
   [POLL:
      BATCHSIZE: <num_records>
      TIMEOUT: <poll_time>]
   [PROPERTIES:
      <kafka_property_name>: <kafka_property_value>
      [ ... ]]
[SCHEDULE:
   RETRY_INTERVAL: <retry_time>
   MAX_RETRIES: <num_retries> ]

數(shù)據(jù)庫(kù)相關(guān)配置

參數(shù)

描述

是否必填

DATABASE

目標(biāo)端云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例的數(shù)據(jù)庫(kù)名稱。

USER

云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例的賬號(hào)。

PASSWORD

云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例的賬號(hào)密碼。

HOST

目標(biāo)端云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例的內(nèi)網(wǎng)地址。

PORT

云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例的端口號(hào)。

VERSION

當(dāng)前采用的YAML文件格式版本,預(yù)留字段,無(wú)限制。

KAFKA:INPUT配置

KAFKA:INPUT:SOURCE

參數(shù)

描述

是否必填

參數(shù)值限制

BROKERS

Kafka接入點(diǎn)信息。

  • 阿里云的Kafka服務(wù),可登錄阿里云控制臺(tái)獲取默認(rèn)接入點(diǎn)。具體操作,請(qǐng)參見(jiàn)查看接入點(diǎn)

  • 自建的Kafka服務(wù),Brokers需要填寫(xiě)Kafka服務(wù)具體的ip:port信息。

如有多個(gè)使用英文逗號(hào)(,)進(jìn)行分隔。

對(duì)應(yīng)kafka consumer bootstrap.server 配置,需要填寫(xiě)有效的Brokers地址,否則會(huì)報(bào)錯(cuò)。

TOPIC

Kafka Topic名稱。

僅支持單個(gè)Topic。

PARTITIONS

分區(qū)編號(hào)。

如有多個(gè)分區(qū)編號(hào),使用英文逗號(hào)(,)進(jìn)行分隔。如果在PROPERTIES中配置了group.id,那么該參數(shù)會(huì)被忽略。

例如:1,2,3,4,5

FALLBACK_OFFSET

消費(fèi)位點(diǎn)。

  • earliest:從最早可用位點(diǎn)消費(fèi)。

  • latest:從最新的位點(diǎn)開(kāi)始消費(fèi)。

無(wú)

KAFKA:INPUT:KEY和KAFKA:INPUT:VALUE

Kafka消息的Key值字段名稱、數(shù)據(jù)類型和數(shù)據(jù)格式。

Kafka消息的Value字段名稱、數(shù)據(jù)類型和數(shù)據(jù)格式。

必須按照在Key和Value中出現(xiàn)的順序指定所有Kafka數(shù)據(jù)元素。

KAFKA:INPUT:KEYKAFKA:INPUT:VALUE至少需要配置一個(gè),如果兩個(gè)都未配置會(huì)報(bào)錯(cuò)。

參數(shù)

描述

是否必填

參數(shù)值限制

COLUMNS

如果定義KAFKA:INPUT:KEY,則用于定義Kafka消息中Key部分的列名與類型;

如果定義KAFKA:INPUT:VALUE,則用于定義Kafka消息中Value部分的列名與類型。

無(wú)

NAME

定義Kafka消息中的列名。該列名主要在KAFKA:OUTPUT:MAPPING中使用,用于標(biāo)記Kafka消息中的數(shù)據(jù)列。

無(wú)

TYPE

定義Kafka消息中列的類型,數(shù)據(jù)類型需要與這個(gè)列在目標(biāo)數(shù)據(jù)庫(kù)中的類型保持一致。

由于Kafka消息中Key和Value的格式不透明,因此當(dāng)前Streaming Server默認(rèn)從Kafka消息中獲取的數(shù)據(jù)格式為文本形式。

云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版支持的數(shù)據(jù)類型請(qǐng)參見(jiàn)數(shù)據(jù)類型

如果Kafka消息的列與目標(biāo)列的類型不一致,請(qǐng)?jiān)贛apping中的expression部分對(duì)類型進(jìn)行轉(zhuǎn)換。

FORMAT

定義Kafka消息數(shù)據(jù)的類型,當(dāng)前支持CSV、Delimited和protobuf。

無(wú)

KAFKA:INPUT:META

META不是必填項(xiàng),當(dāng)您需要展示Message Meta信息時(shí)配置。

參數(shù)

描述

是否必填

參數(shù)值限制

COLUMNS

定義Meta信息,為一組NAME,TYPE。

無(wú)

NAME

Meta名稱,可以指定為其他的名稱,默認(rèn)使用meta

無(wú)

TYPE

只能使用Text類型。

Text

FORMAT

只能使用Text類型。

Text

KAFKA:INPUT:ERROR_LIMIT

錯(cuò)誤數(shù)據(jù)的容忍閾值。當(dāng)寫(xiě)入的錯(cuò)誤數(shù)據(jù)達(dá)到ERROR_LIMIT時(shí),Streaming Server會(huì)退出當(dāng)前任務(wù),自動(dòng)停止將數(shù)據(jù)源的數(shù)據(jù)寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版。默認(rèn)值為0,即Streaming Server會(huì)在出現(xiàn)第一次錯(cuò)誤數(shù)據(jù)時(shí)就退出當(dāng)前任務(wù),停止數(shù)據(jù)寫(xiě)入。ERROR_LIMIT值必須大于1。

目前該參數(shù)未啟用,不選擇或者填0即可。

KAFKA:OUTPUT配置

數(shù)據(jù)庫(kù)相關(guān)配置

數(shù)據(jù)寫(xiě)入到云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版數(shù)據(jù)庫(kù)的相關(guān)配置,包括Kafka值到目標(biāo)數(shù)據(jù)庫(kù)的映射、寫(xiě)入模式等。

參數(shù)

描述

是否必填

SCHEMA

寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版的目標(biāo)表所在的Schema。

TABLE

目標(biāo)表的名稱。

MODE

寫(xiě)入模式,目前支持INSERT、UPDATE和MERGE三種方式。

MATCH_COLUMNS

當(dāng)寫(xiě)入模式為UPDATE和MERGE時(shí)生效。

指定目標(biāo)表的部分列,當(dāng)寫(xiě)入數(shù)據(jù)與目標(biāo)表數(shù)據(jù)匹配時(shí),目標(biāo)表中這部分?jǐn)?shù)據(jù)會(huì)根據(jù)UPDATE或MERGE模式對(duì)數(shù)據(jù)進(jìn)行更新。

建議MATCH_COLUMNS使用目標(biāo)表的主鍵或者唯一鍵。

ORDER_COLUMNS

在寫(xiě)入模式(MODE)為MERGE時(shí)生效。

當(dāng)寫(xiě)入數(shù)據(jù)根據(jù)MATCH_COLUMNS存在多個(gè)匹配行時(shí),使用ORDER_COLUMNS對(duì)這些數(shù)據(jù)進(jìn)行排序,以確定具有最大值的輸入行,Streaming Server使用該行來(lái)更新目標(biāo)。

UPDATE_COLUMNS

當(dāng)寫(xiě)入模式為UPDATE和MERGE時(shí)生效。

如果寫(xiě)入數(shù)據(jù)能夠根據(jù)MATCH_COLUMNS匹配到目標(biāo)表數(shù)據(jù),則會(huì)基于UPDATE_COLUMNS更新對(duì)應(yīng)的列。

說(shuō)明
  • 在使用MERGE和UPDATE模式時(shí),如果不指定ORDER_COLUMNS,當(dāng)寫(xiě)入數(shù)據(jù)根據(jù)MATCH_COLUMNS匹配到多行相同時(shí),則會(huì)隨機(jī)取一條作為結(jié)果寫(xiě)入。

  • 在指定了ORDER_COLUMNS后,其排序結(jié)果是a desc,b desc,c desc

KAFKA:OUTPUT:MAPPING

參數(shù)

描述

是否必填

NAME

目標(biāo)列名稱。

EXPRESSION

可以是源端的列名(KAFKA:INPUT:VALUE:COLUMNS中定義的列名),或者一個(gè)表達(dá)式。例如NAME : targetColumnName Expression: input + 1,其效果等效于SELECT input + 1 AS targetColumnName FROM xxx

KAFKA:METADATA配置

參數(shù)

描述

是否必填

參數(shù)限制

schema

Streaming Server創(chuàng)建的外表和其他輔助表所在的Schema名稱。

默認(rèn)取值KAFKA:OUTPUT中的Schema。

KAFKA:COMMIT配置

COMMIT用于控制向數(shù)據(jù)庫(kù)提交數(shù)據(jù)的行為。

參數(shù)

描述

是否必填

參數(shù)限制

MAX_ROW

指定一次寫(xiě)入目標(biāo)庫(kù)的最大Batch Size。

單位為行,默認(rèn):500。

MINIMAL_INTERVAL

在兩個(gè)Batch寫(xiě)入之間的等待時(shí)間。如果超過(guò)該時(shí)間,會(huì)嘗試再寫(xiě)一次。

單位為毫秒(ms),默認(rèn):1000。

CONSISTENCY

數(shù)據(jù)一致性保證。

目前僅支持ATLEAST,即kafka中的數(shù)據(jù)至少會(huì)寫(xiě)入目標(biāo)數(shù)據(jù)庫(kù)一次。

KAFKA:POLL配置

POLL用于控制Kafka Consumer消費(fèi)數(shù)據(jù)的行為。

參數(shù)

描述

是否必填

參數(shù)限制

BATCHSIZE

一次從Topic中拿出的event數(shù)量。保留字段,目前沒(méi)有實(shí)現(xiàn)相關(guān)功能。

單位為行,默認(rèn):64。

TIMEOUT

Kafka Consumer從Kafka中獲取event等待的超時(shí)時(shí)間。

單位為毫秒(ms),默認(rèn):5000。

KAFKA:PROPERTIES配置

PROPERTIES用于配置Kafka Connect,當(dāng)前采用白名單制,僅支持配置group.idauto.offset.resetisolation.level。詳細(xì)信息,請(qǐng)參見(jiàn)Kafka Connect Configs