本文為您介紹如何使用Upsert Kafka連接器。
背景信息
Upsert Kafka連接器支持以upsert方式從Kafka topic中讀取數據并將數據寫入Kafka topic。
作為源表,此連接器可以將Kafka中存儲的數據轉換為changelog流,其中每條數據記錄代表一個更新或刪除事件。更準確地說,數據記錄中的value被解釋為同一key的最后一個value的UPDATE,如果有這個key,如果不存在相應的key,則該更新被視為INSERT。用表來類比,changelog流中的數據記錄被解釋為UPSERT,也稱為INSERT或UPDATE,因為任何具有相同key的現有行都被覆蓋。另外,value為空的消息將會被視作為DELETE消息。
作為結果表或數據攝入目標端,此連接器可以消費上游計算邏輯產生的changelog流。它會將INSERT或UPDATE_AFTER數據作為正常的Kafka消息寫入,并將DELETE數據以value為空的Kafka消息寫入,表示對應key的消息被刪除。Flink將根據主鍵列的值對數據進行分區,從而保證主鍵上的消息有序,因此同一主鍵上的更新或刪除消息將落在同一分區中。
類別 | 詳情 |
支持類型 | 源表和結果表,數據攝入目標端 |
運行模式 | 流模式 |
數據格式 | avro、avro-confluent、csv、json和raw |
特有監控指標 |
|
API種類 | SQL,數據攝入YAML作業 |
是否支持更新或刪除結果表數據 | 是 |
前提條件
您需要創建Kafka集群,詳情請參見創建DataFlow Kafka集群或在Kafka創建資源。
您需要連接實時計算Flink與Kafka集群之間網絡。Kafka on EMR可參見文檔配置創建和管理專有網絡和安全組概述,云消息隊列 Kafka 版需要配置白名單。
使用限制
SQL
Upsert Kafka連接器支持以upsert方式從Kafka topic中讀取數據并將數據寫入Kafka topic。
語法結構
CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);
WITH參數
通用
參數
說明
數據類型
是否必填
默認值
備注
connector
表類型。
String
是
無
固定值為upsert-kafka。
properties.bootstrap.servers
Kafka broker地址。
String
是
無
格式為
host:port,host:port,host:port
,以英文逗號(,)分割。properties.*
對Kafka客戶端的直接配置。
String
否
無
Flink會將properties.前綴移除,并將剩余的配置傳遞給Kafka客戶端。例如可以通過
'properties.allow.auto.create.topics' = 'false'
來禁用自動創建topic。不能通過該方式修改以下配置,因為它們會被Kafka連接器覆蓋:
key.deserializer
value.deserializer
key.format
讀取或寫入Kafka消息key部分時使用的格式。
String
是
無
當使用該配置時,key.fields或key.fields-prefix配置是必填的。
參數取值如下:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
key.fields-prefix
為所有Kafka消息key部分指定自定義前綴,以避免與消息value部分格式字段重名。
String
否
無
該配置項僅用于源表和結果表的列名區分,解析和生成Kafka消息key部分時,該前綴會被移除。
說明使用該配置時,value.fields-include必須配置為EXCEPT_KEY。
value.format
讀取或寫入Kafka消息value部分時使用的格式。
String
是
無
該配置等同于format,因此format和value.format 只能配置其中一個,如果同時配置兩個會產生沖突。
value.fields-include
在解析或生成Kafka消息value部分時,是否要包含消息key部分對應的字段。
String
是
ALL
參數取值如下:
ALL(默認值):所有列都會作為Kafka消息value部分處理。
EXCEPT_KEY:除去key.fields定義的字段,剩余字段作為Kafka消息value部分處理。
topic
讀取或寫入topic名稱。
String
是
無
無。
結果表
參數
說明
數據類型
是否必填
默認值
備注
sink.parallelism
Kafka結果表算子的并發數。
Integer
否
上游算子的并發,由框架決定
無。
sink.buffer-flush.max-rows
緩存刷新前,最多能緩存多少條記錄。
Integer
否
0(未開啟)
當結果表收到很多同key上的更新時,緩存將保留同key的最后一條記錄,因此結果表緩存能幫助減少發往Kafka topic的數據量,以及避免發送潛在的tombstone消息。
說明如果要開啟結果表緩存,需要同時設置sink.buffer-flush.max-rows和sink.buffer-flush.interval兩個選項為大于零的值。
sink.buffer-flush.interval
緩存刷新的間隔時間。
Duration
否
0(未開啟)
單位可以為毫秒(ms)、秒(s)、分鐘(min)或小時(h)。例如
'sink.buffer-flush.interval'='1 s'
。當結果表收到很多同key上的更新時,緩存將保留同key的最后一條記錄,因此結果表緩存能幫助減少發往Kafka topic的數據量,以及避免發送潛在的tombstone消息。
說明如果要開啟結果表緩存,需要同時設置sink.buffer-flush.max-rows和sink.buffer-flush.interval兩個選項為大于零的值。
數據攝入
Upsert Kafka連接器可以用于數據攝入YAML作業開發,作為目標端寫入。寫入時使用JSON格式,主鍵字段也會放入消息體中。
語法結構
sink:
type: upsert-kafka
name: upsert-kafka Sink
properties.bootstrap.servers: localhost:9092
# 阿里云消息隊列 Kafka 版
aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
aliyun.kafka.instanceId: ${instancd-id}
aliyun.kafka.endpoint: ${endpoint}
aliyun.kafka.regionId: ${region-id}
配置項
參數 | 說明 | 數據類型 | 是否必填 | 默認值 | 備注 |
type | 目標端類型。 | STRING | 是 | 無 | 固定值為upsert-kafka。 |
name | 目標端名稱。 | STRING | 否 | 無 | 無。 |
properties.bootstrap.servers | Kafka broker地址。 | STRING | 是 | 無 | 格式為 |
properties.* | 對Kafka客戶端的直接配置。 | STRING | 否 | 無 | 后綴名必須是Kafka官方文檔中定義的生產者配置。 Flink會將properties.前綴移除,并將剩余的配置傳遞給Kafka客戶端。例如可以通過 |
sink.delivery-guarantee | 寫入時的語義模式。 | STRING | 否 | at-least-once | 取值如下:
|
sink.add-tableId-to-header-enabled | 是否將table信息寫入header。 | BOOLEAN | 否 | false | 開啟時,namespace、schemaName和tableName會分別寫入header。 |
aliyun.kafka.accessKeyId | 阿里云賬號AccessKey ID。 | STRING | 否 | 無 | 詳情請參見創建AccessKey。 說明 同步數據到阿里云消息隊列Kafka版時需要配置。 |
aliyun.kafka.accessKeySecret | 阿里云賬號AccessKey Secret。 | STRING | 否 | 無 | 詳情請參見創建AccessKey。 說明 同步數據到阿里云消息隊列Kafka版時需要配置。 |
aliyun.kafka.instanceId | 阿里云Kafka消息隊列實例ID。 | STRING | 否 | 無 | 請在阿里云Kafka實例詳情界面查看。 說明 同步數據到阿里云消息隊列Kafka版時需要配置。 |
aliyun.kafka.endpoint | 阿里云Kafka API服務接入地址。 | STRING | 否 | 無 | 詳情請參見服務接入點。 說明 同步數據到阿里云消息隊列Kafka版時需要配置。 |
aliyun.kafka.regionId | Topic所在實例的地域ID。 | STRING | 否 | 無 | 詳情請參見服務接入點。 說明 同步數據到阿里云消息隊列Kafka版時需要配置。 |
使用示例
源表
創建Kafka數據源表,源表中包含網站用戶的瀏覽數據。
CREATE TABLE pageviews( user_id BIGINT, page_id BIGINT, viewtime TIMESTAMP, user_region STRING, WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND )WITH( 'connector'='kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'format'='json' );
結果表
創建Upsert Kafka結果表。
CREATE TABLE pageviews_per_region( user_region STRING, pv BIGINT, uv BIGINT, PRIMARY KEY(user_region) NOT ENFORCED )WITH( 'connector'='upsert-kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'key.format'='avro', 'value.format'='avro' );
將統計網站用戶的瀏覽數據寫入結果表中。
INSERT INTO pageviews_per_region SELECT user_region, COUNT(*), COUNT(DISTINCTuser_id) FROM pageviews GROUP BY user_region;
數據攝入目標端
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: upsert-kafka name: Upsert Kafka Sink properties.bootstrap.servers: ${upsert.kafka.bootstraps.server} aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak} aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk} aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid} aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint} aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid} route: - source-table: ${mysql.source.table} sink-table: ${upsert.kafka.topic}