日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Upsert Kafka

本文為您介紹如何使用Upsert Kafka連接器。

背景信息

Upsert Kafka連接器支持以upsert方式從Kafka topic中讀取數據并將數據寫入Kafka topic。

  • 作為源表,此連接器可以將Kafka中存儲的數據轉換為changelog流,其中每條數據記錄代表一個更新或刪除事件。更準確地說,數據記錄中的value被解釋為同一key的最后一個value的UPDATE,如果有這個key,如果不存在相應的key,則該更新被視為INSERT。用表來類比,changelog流中的數據記錄被解釋為UPSERT,也稱為INSERT或UPDATE,因為任何具有相同key的現有行都被覆蓋。另外,value為空的消息將會被視作為DELETE消息。

  • 作為結果表或數據攝入目標端,此連接器可以消費上游計算邏輯產生的changelog流。它會將INSERT或UPDATE_AFTER數據作為正常的Kafka消息寫入,并將DELETE數據以value為空的Kafka消息寫入,表示對應key的消息被刪除。Flink將根據主鍵列的值對數據進行分區,從而保證主鍵上的消息有序,因此同一主鍵上的更新或刪除消息將落在同一分區中。

類別

詳情

支持類型

源表和結果表,數據攝入目標端

運行模式

流模式

數據格式

avro、avro-confluent、csv、json和raw

特有監控指標

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • 結果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

API種類

SQL,數據攝入YAML作業

是否支持更新或刪除結果表數據

前提條件

使用限制

  • 僅Flink計算引擎VVR 2.0.0及以上版本支持消息隊列Kafka連接器。

  • 僅支持讀取和寫入Apache Kafka 0.10及以上版本的數據。

  • 僅支持Apache Kafka 2.8版本的客戶端配置項,詳情請參見Apache Kafka消費者生產者配置項文檔。

  • Upsert Kafka結果表在使用精確一次語義時,寫入的Kafka集群必須開啟事務功能,且僅支持Apache Kafka 0.11及以上版本的集群。

SQL

Upsert Kafka連接器支持以upsert方式從Kafka topic中讀取數據并將數據寫入Kafka topic。

語法結構

CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);

WITH參數

  • 通用

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    connector

    表類型。

    String

    固定值為upsert-kafka。

    properties.bootstrap.servers

    Kafka broker地址。

    String

    格式為host:port,host:port,host:port,以英文逗號(,)分割。

    properties.*

    對Kafka客戶端的直接配置。

    String

    后綴名必須是Kafka官方文檔中定義的生產者消費者配置。

    Flink會將properties.前綴移除,并將剩余的配置傳遞給Kafka客戶端。例如可以通過'properties.allow.auto.create.topics' = 'false' 來禁用自動創建topic。

    不能通過該方式修改以下配置,因為它們會被Kafka連接器覆蓋:

    • key.deserializer

    • value.deserializer

    key.format

    讀取或寫入Kafka消息key部分時使用的格式。

    String

    當使用該配置時,key.fieldskey.fields-prefix配置是必填的。

    參數取值如下:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    key.fields-prefix

    為所有Kafka消息key部分指定自定義前綴,以避免與消息value部分格式字段重名。

    String

    該配置項僅用于源表和結果表的列名區分,解析和生成Kafka消息key部分時,該前綴會被移除。

    說明

    使用該配置時,value.fields-include必須配置為EXCEPT_KEY。

    value.format

    讀取或寫入Kafka消息value部分時使用的格式。

    String

    該配置等同于format,因此formatvalue.format 只能配置其中一個,如果同時配置兩個會產生沖突。

    value.fields-include

    在解析或生成Kafka消息value部分時,是否要包含消息key部分對應的字段。

    String

    ALL

    參數取值如下:

    • ALL(默認值):所有列都會作為Kafka消息value部分處理。

    • EXCEPT_KEY:除去key.fields定義的字段,剩余字段作為Kafka消息value部分處理。

    topic

    讀取或寫入topic名稱。

    String

    無。

  • 結果表

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    sink.parallelism

    Kafka結果表算子的并發數。

    Integer

    上游算子的并發,由框架決定

    無。

    sink.buffer-flush.max-rows

    緩存刷新前,最多能緩存多少條記錄。

    Integer

    0(未開啟)

    當結果表收到很多同key上的更新時,緩存將保留同key的最后一條記錄,因此結果表緩存能幫助減少發往Kafka topic的數據量,以及避免發送潛在的tombstone消息。

    說明

    如果要開啟結果表緩存,需要同時設置sink.buffer-flush.max-rowssink.buffer-flush.interval兩個選項為大于零的值。

    sink.buffer-flush.interval

    緩存刷新的間隔時間。

    Duration

    0(未開啟)

    單位可以為毫秒(ms)、秒(s)、分鐘(min)或小時(h)。例如'sink.buffer-flush.interval'='1 s'。

    當結果表收到很多同key上的更新時,緩存將保留同key的最后一條記錄,因此結果表緩存能幫助減少發往Kafka topic的數據量,以及避免發送潛在的tombstone消息。

    說明

    如果要開啟結果表緩存,需要同時設置sink.buffer-flush.max-rowssink.buffer-flush.interval兩個選項為大于零的值。

數據攝入

Upsert Kafka連接器可以用于數據攝入YAML作業開發,作為目標端寫入。寫入時使用JSON格式,主鍵字段也會放入消息體中。

語法結構

sink:
  type: upsert-kafka
  name: upsert-kafka Sink
  properties.bootstrap.servers: localhost:9092
  # 阿里云消息隊列 Kafka 版
  aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
  aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
  aliyun.kafka.instanceId: ${instancd-id}
  aliyun.kafka.endpoint: ${endpoint}
  aliyun.kafka.regionId: ${region-id}

配置項

參數

說明

數據類型

是否必填

默認值

備注

type

目標端類型。

STRING

固定值為upsert-kafka。

name

目標端名稱。

STRING

無。

properties.bootstrap.servers

Kafka broker地址。

STRING

格式為host:port,host:port,host:port,以英文逗號(,)分割。

properties.*

對Kafka客戶端的直接配置。

STRING

后綴名必須是Kafka官方文檔中定義的生產者配置。

Flink會將properties.前綴移除,并將剩余的配置傳遞給Kafka客戶端。例如可以通過'properties.allow.auto.create.topics' = 'false' 來禁用自動創建topic。

sink.delivery-guarantee

寫入時的語義模式。

STRING

at-least-once

取值如下:

  • none:不保證任何語義,數據可能會丟失或重復。

  • at-least-once(默認值):保證數據不丟失,但可能會重復。

  • exactly-once:使用Kafka事務保證數據不會丟失和重復。

sink.add-tableId-to-header-enabled

是否將table信息寫入header。

BOOLEAN

false

開啟時,namespace、schemaName和tableName會分別寫入header。

aliyun.kafka.accessKeyId

阿里云賬號AccessKey ID。

STRING

詳情請參見創建AccessKey

說明

同步數據到阿里云消息隊列Kafka版時需要配置。

aliyun.kafka.accessKeySecret

阿里云賬號AccessKey Secret。

STRING

詳情請參見創建AccessKey。

說明

同步數據到阿里云消息隊列Kafka版時需要配置。

aliyun.kafka.instanceId

阿里云Kafka消息隊列實例ID。

STRING

請在阿里云Kafka實例詳情界面查看。

說明

同步數據到阿里云消息隊列Kafka版時需要配置。

aliyun.kafka.endpoint

阿里云Kafka API服務接入地址。

STRING

詳情請參見服務接入點。

說明

同步數據到阿里云消息隊列Kafka版時需要配置。

aliyun.kafka.regionId

Topic所在實例的地域ID。

STRING

詳情請參見服務接入點。

說明

同步數據到阿里云消息隊列Kafka版時需要配置。

使用示例

  • 源表

    創建Kafka數據源表,源表中包含網站用戶的瀏覽數據。

    CREATE TABLE pageviews(
    user_id BIGINT,
    page_id BIGINT,
    viewtime TIMESTAMP,
    user_region STRING,
    WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
    )WITH(
    'connector'='kafka',
    'topic'='<yourTopicName>',
    'properties.bootstrap.servers'='...',
    'format'='json'
    );
  • 結果表

    • 創建Upsert Kafka結果表。

      CREATE TABLE pageviews_per_region(
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY(user_region) NOT ENFORCED
      )WITH(
      'connector'='upsert-kafka',
      'topic'='<yourTopicName>',
      'properties.bootstrap.servers'='...',
      'key.format'='avro',
      'value.format'='avro'
      );
    • 將統計網站用戶的瀏覽數據寫入結果表中。

      INSERT INTO pageviews_per_region
      SELECT
      user_region,
      COUNT(*),
      COUNT(DISTINCTuser_id)
      FROM pageviews
      GROUP BY user_region;
  • 數據攝入目標端

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: upsert-kafka
      name: Upsert Kafka Sink
      properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
      aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
      aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
      aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
      aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
      aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${upsert.kafka.topic}

最佳實踐