實(shí)時(shí)消費(fèi)Kafka數(shù)據(jù)
當(dāng)您需要將Kafka數(shù)據(jù)寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版,且不希望使用其他數(shù)據(jù)集成工具時(shí),可以通過(guò)實(shí)時(shí)數(shù)據(jù)消費(fèi)功能直接消費(fèi)Kafka數(shù)據(jù),減少實(shí)時(shí)處理組件依賴,提升寫(xiě)入吞吐。
Apache Kafka是一個(gè)容錯(cuò)、低延遲、分布式的發(fā)布-訂閱消息系統(tǒng)。Streaming Server支持從Apache和Confluent Kafka發(fā)行版中加載Kafka數(shù)據(jù)。通過(guò)云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版可讀外表對(duì)Kafka數(shù)據(jù)進(jìn)行轉(zhuǎn)換,并將數(shù)據(jù)寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版目標(biāo)表中。
前提條件
Kafka服務(wù)與云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例需在同一專有網(wǎng)絡(luò)(VPC)。
重要如果Kafka服務(wù)與云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例屬于同一專有網(wǎng)絡(luò)但是不在同一交換機(jī)(vSwitch)下,您需要進(jìn)行如下操作:
將Kafka服務(wù)所在交換機(jī)的IPv4網(wǎng)段添加至云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例白名單中。具體操作,請(qǐng)參見(jiàn)設(shè)置白名單。
將云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例所在交換機(jī)的IPv4網(wǎng)段添加至Kafka服務(wù)白名單中。具體操作,請(qǐng)參見(jiàn)配置白名單。
已在Kafka服務(wù)中生成了大量樣例數(shù)據(jù)。本文以阿里云云消息隊(duì)列Kafka版為例,具體信息如下。
接入點(diǎn)信息:alikafka-post-cn-wwo3hflb****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflb****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflb****-3-vpc.alikafka.aliyuncs.com:9092
Topic:test_topic
consumer group:test_consumer_group
已在云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版中創(chuàng)建目標(biāo)用戶和目標(biāo)表,同時(shí)在任務(wù)中使用的數(shù)據(jù)庫(kù)用戶需要具備以下權(quán)限。
使用gpfdist協(xié)議創(chuàng)建只讀外表的權(quán)限。
任務(wù)中配置的數(shù)據(jù)庫(kù)Schema的USAGE和CREATE權(quán)限。
任務(wù)中配置的寫(xiě)入目標(biāo)表的SELECT和INSERT權(quán)限。
本文以
liss_test
用戶和liss_test.liss_test_plaintext
表為例。CREATE role liss_test with login; ALTER role liss_test with password 'lissTest****'; ALTER role liss_test CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist'); \c - liss_test CREATE DATABASE liss_test; \c liss_test CREATE SCHEMA liss_test; CREATE TABLE liss_test.liss_test_plaintext ( column_1 varchar(32), column_2 bigint, column_3 numeric, column_4 varchar(32), column_5 varchar(32) ) distributed by (column_1, column_2);
使用限制
云原生數(shù)據(jù)倉(cāng)庫(kù) AnalyticDB PostgreSQL 版6.0實(shí)例需為v6.6.0及以上版本。云原生數(shù)據(jù)倉(cāng)庫(kù) AnalyticDB PostgreSQL 版7.0實(shí)例需為v7.0.3及以上版本。AnalyticDB PostgreSQL版Serverless模式實(shí)例暫不支持。
實(shí)時(shí)數(shù)據(jù)消費(fèi)目前僅支持INSERT、MERGE(UPSERT)、UPDATE三種語(yǔ)法,暫不支持DELETE與READ。
使用MERGE(UPSERT)或UPDATE時(shí),需要云原生數(shù)據(jù)倉(cāng)庫(kù) AnalyticDB PostgreSQL 版表有主鍵索引。
使用實(shí)時(shí)數(shù)據(jù)消費(fèi),不同分區(qū)(Partition)之間需要使用主鍵列做分區(qū)因子,否則可能會(huì)造成全局死鎖錯(cuò)誤,導(dǎo)致部分?jǐn)?shù)據(jù)更新失敗。
實(shí)時(shí)數(shù)據(jù)消費(fèi)當(dāng)前僅支持Kafka消息隊(duì)列,暫不支持CDC格式的數(shù)據(jù)源。
當(dāng)前的版本向?qū)J街С諧SV和Delimited兩種格式的數(shù)據(jù)源,專業(yè)模式支持CSV、Delimited和protobuf三種格式的數(shù)據(jù)源。
操作步驟
步驟一:開(kāi)啟實(shí)時(shí)數(shù)據(jù)服務(wù)
在控制臺(tái)左上角,選擇實(shí)例所在地域。
找到目標(biāo)實(shí)例,單擊實(shí)例ID。
在控制臺(tái)左側(cè)導(dǎo)航欄單擊實(shí)時(shí)數(shù)據(jù)消費(fèi),再單擊左上角開(kāi)啟實(shí)時(shí)數(shù)據(jù)服務(wù)。
在彈出的對(duì)話框中填寫(xiě)名稱及服務(wù)描述并單擊確定。開(kāi)通完成后,可在控制臺(tái)看到服務(wù)狀態(tài)和連接信息。
說(shuō)明服務(wù)規(guī)格當(dāng)前不可選,默認(rèn)為8CU。
步驟二:新增實(shí)時(shí)數(shù)據(jù)源
- 登錄云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版控制臺(tái)。
- 在控制臺(tái)左上角,選擇實(shí)例所在地域。
- 找到目標(biāo)實(shí)例,單擊實(shí)例ID。
在左側(cè)導(dǎo)航欄,單擊實(shí)時(shí)數(shù)據(jù)消費(fèi)。
在實(shí)時(shí)數(shù)據(jù)源卡片中,單擊新增數(shù)據(jù)源,并完成以下配置。
配置項(xiàng)
描述
關(guān)聯(lián)數(shù)據(jù)服務(wù)
在下拉框中選擇已創(chuàng)建的實(shí)時(shí)數(shù)據(jù)服務(wù)。
數(shù)據(jù)源名稱
自定義數(shù)據(jù)源名稱。
數(shù)據(jù)源描述
自定義數(shù)據(jù)源描述。
數(shù)據(jù)源類型
目前僅支持Kafka。
brokers
Kafka接入點(diǎn)信息。
阿里云的Kafka服務(wù),可登錄阿里云控制臺(tái)獲取默認(rèn)接入點(diǎn)。具體操作,請(qǐng)參見(jiàn)查看接入點(diǎn)。
自建的kafka服務(wù),Brokers需要填寫(xiě)Kafka服務(wù)具體的
`hostname:port`
或`ip:port`
信息。
topic
Kafka的Topic名稱。
format
當(dāng)前版本向?qū)J街С諧SV和Delimited兩種格式的數(shù)據(jù)源,專業(yè)模式支持CSV、Delimited和protobuf三種格式的數(shù)據(jù)源。
列分隔符
可設(shè)置任意單字符分隔符。
單擊確定。
步驟三:新增實(shí)時(shí)任務(wù)
在實(shí)時(shí)任務(wù)卡片中,單擊新增實(shí)時(shí)任務(wù),并完成以下配置。
請(qǐng)根據(jù)業(yè)務(wù)需要選擇向?qū)J?/b>或專業(yè)模式。
向?qū)J剑嚎梢酝ㄟ^(guò)控制臺(tái)中的指引來(lái)快速搭建任務(wù)。
專業(yè)模式:可以通過(guò)提交YAML的方式向Streaming Server提交任務(wù),功能相比于向?qū)J礁S富。
向?qū)J?/b>
配置項(xiàng)
描述
基本信息
任務(wù)名稱
定義任務(wù)的名稱,任務(wù)名稱不可以重復(fù),必填。
任務(wù)描述
描述任務(wù)內(nèi)容,選填。
配置模式
向?qū)J健?/p>
源端配置
數(shù)據(jù)源
選擇在新增實(shí)時(shí)數(shù)據(jù)源中配置的數(shù)據(jù)源,目前僅支持Kafka為源的數(shù)據(jù)源。
group_name
Kafka的消費(fèi)者組。
failback_offset
消費(fèi)位點(diǎn)。
earliest:從最早可用位點(diǎn)消費(fèi)。
latest:從最新的位點(diǎn)開(kāi)始消費(fèi)。
投遞保證
流計(jì)算中的一致性語(yǔ)義,支持:
ATLEAST:在Kafka中的數(shù)據(jù)至少有一次被寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版。
EXACTLY:在Kafka中的數(shù)據(jù)有且僅有一次被寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版。
目標(biāo)端配置
目標(biāo)庫(kù)
需要寫(xiě)入的云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版目標(biāo)數(shù)據(jù)庫(kù)名稱。
Schema
云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版的模式名稱。
目標(biāo)表
需要寫(xiě)入的云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版目標(biāo)表名稱。
賬號(hào)
當(dāng)前任務(wù)使用的云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版數(shù)據(jù)庫(kù)賬號(hào)。
密碼
賬號(hào)密碼。
寫(xiě)入模式
目前僅支持INSERT、UPDATE和MERGE三種寫(xiě)入模式。
INSERT:將數(shù)據(jù)直接寫(xiě)入目標(biāo)表。
UPDATE:當(dāng)輸入列中的MatchColumns與目標(biāo)表中的列匹配,更新UpdateColumns中列出的目標(biāo)表列。
MERGE:當(dāng)寫(xiě)入數(shù)據(jù)與目標(biāo)表列的值相等時(shí),使用寫(xiě)入數(shù)據(jù)更新目標(biāo)表列的現(xiàn)有數(shù)據(jù)。當(dāng)寫(xiě)入數(shù)據(jù)與目標(biāo)表列的值不相等時(shí),直接將數(shù)據(jù)寫(xiě)入目標(biāo)表。MERGE寫(xiě)入模式可類比于UPSERT(UPDATE and INSERT),關(guān)于UPSERT的寫(xiě)入方式,請(qǐng)參見(jiàn)使用INSERT ON CONFLICT覆蓋寫(xiě)入數(shù)據(jù)。
說(shuō)明MatchColumns與UpdateColumns的含義請(qǐng)參見(jiàn)下文字段類型的描述。
ErrorLimitCount
錯(cuò)誤數(shù)據(jù)的容忍閾值。當(dāng)寫(xiě)入的錯(cuò)誤數(shù)據(jù)到達(dá)ErrorLimitCount時(shí),Streaming Server會(huì)自動(dòng)停止將數(shù)據(jù)源的數(shù)據(jù)寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版。0表示Streaming Server遇到第一次錯(cuò)誤數(shù)據(jù)時(shí)就會(huì)停止數(shù)據(jù)寫(xiě)入。目前該參數(shù)未啟用,填0即可。
字段映射
源字段
Kafka消息中的Value字段名,需要按照在Value中出現(xiàn)的順序指定所有的字段名。
目標(biāo)字段
云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版目標(biāo)表的字段名。
字段類型
目前支持以下三種類型:
MatchColumns:作為寫(xiě)入時(shí)的Join條件列作為更新條件,用于判斷目標(biāo)表中哪些行需要被更新。
UpdateColumns:如果某一行數(shù)據(jù)符合更新條件,那么在UpdateColumns中的列會(huì)被更新為新的值。
空(不填):即使某一行數(shù)據(jù)符合更新條件,該字段也不會(huì)被更新為新的值。
在UPDATE和MERGE寫(xiě)入時(shí),Streaming Server會(huì)先將數(shù)據(jù)寫(xiě)入一個(gè)臨時(shí)表,然后利用MatchColumns作為條件列與目標(biāo)表進(jìn)行Join:
如果有匹配的數(shù)據(jù),則會(huì)更新UpdateColumns中的數(shù)據(jù)。
如果沒(méi)有匹配的數(shù)據(jù)時(shí),則會(huì)根據(jù)寫(xiě)入模式有以下兩種情況:
寫(xiě)入模式為UPDATE時(shí),數(shù)據(jù)不會(huì)被寫(xiě)入。
寫(xiě)入模式為MERGE時(shí),數(shù)據(jù)會(huì)被寫(xiě)入。
專業(yè)模式
配置項(xiàng)
描述
基本信息
任務(wù)名稱
定義任務(wù)的名稱,任務(wù)名稱不可以重復(fù),必填。
任務(wù)描述
描述任務(wù)內(nèi)容,選填。
配置模式
專業(yè)模式。
數(shù)據(jù)源
選擇在新增實(shí)時(shí)數(shù)據(jù)源中配置的數(shù)據(jù)源,目前僅支持Kafka為源的數(shù)據(jù)源。
YAML
可以通過(guò)YAML配置更復(fù)雜的寫(xiě)入邏輯。本文的YAML配置示例如下。更多詳情,請(qǐng)參見(jiàn)附錄:YAML配置說(shuō)明。
DATABASE: liss_test USER: liss_test PASSWORD: lissTest**** HOST: gp-2ze517f9l7****-master.gpdb.rds-aliyun-pre.rds.aliyuncs.com PORT: 5432 KAFKA: INPUT: SOURCE: BROKERS: alikafka-post-cn-wwo3hflbo002-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflbo002-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflbo002-3-vpc.alikafka.aliyuncs.com:9092 TOPIC: test_topic FALLBACK_OFFSET: EARLIEST VALUE: COLUMNS: - NAME: column_1 TYPE: varchar(32) - NAME: column_2 TYPE: bigint - NAME: column_3 TYPE: numeric - NAME: column_4 TYPE: varchar(32) - NAME: column_5 TYPE: varchar(32) FORMAT: delimited DELIMITED_OPTION: DELIMITER: "|" ERROR_LIMIT: 20 OUTPUT: SCHEMA: liss_test TABLE: liss_test_plaintext MODE: MERGE MATCH_COLUMNS: - column_1 - column_2 UPDATE_COLUMNS: - column_3 - column_4 - column_5 MAPPING: - NAME: column_1 EXPRESSION: column_1 - NAME: column_2 EXPRESSION: column_2 - NAME: column_3 EXPRESSION: column_3 - NAME: column_4 EXPRESSION: column_4 - NAME: column_5 EXPRESSION: column_5 COMMIT: MAX_ROW: 1000 MINIMAL_INTERVAL: 1000 CONSISTENCY: ATLEAST POLL: BATCHSIZE: 1000 TIMEOUT: 1000 PROPERTIES: group.id: test_consumer_group
單擊確定,并等待實(shí)時(shí)任務(wù)狀態(tài)為運(yùn)行中,即可將數(shù)據(jù)源中的數(shù)據(jù)寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版。
在任務(wù)啟動(dòng)后會(huì)在目標(biāo)端配置的Schema(專業(yè)模式為METADATA.SCHEMA中配置的schema)下生成任務(wù)的兩種輔助表,其格式分別為:
lissext_$UID
:本任務(wù)定義的gpfdist外表,用于將數(shù)據(jù)寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版。lisskafka_mission_info_$UID
:用于存儲(chǔ)任務(wù)當(dāng)前位點(diǎn)推進(jìn)的情況,保障數(shù)據(jù)寫(xiě)入的一致性。目前為了保障寫(xiě)入任務(wù)的高可用,每個(gè)寫(xiě)入任務(wù)會(huì)生成4個(gè)子任務(wù),所以每啟動(dòng)一個(gè)寫(xiě)入任務(wù),會(huì)生成4張表。UID是每個(gè)寫(xiě)入任務(wù)的唯一標(biāo)識(shí)ID。
附錄:YAML配置說(shuō)明
YAML配置文件格式如下。
DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <adbpg_port>
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: <kafka_broker_host:broker_port> [, ... ]
TOPIC: <kafka_topic>
[PARTITIONS: (<partition_numbers>)]
[FALLBACK_OFFSET: { earliest | latest }]
[VALUE:
COLUMNS:
- NAME: { <column_name> }
TYPE: <column_data_type>
[ ... ]
FORMAT: <value_data_format>
[[DELIMITED_OPTION:
DELIMITER: <delimiter_string>
[QUOTE: <quote_char>]
[ESCAPE: <escape_char>] ] |
[CSV_OPTION:
[DELIMITER: <delim_char>]
[QUOTE: <quote_char>]
[NULL_STRING: <nullstr_val>]
[ESCAPE: <escape_char>]
[KEY:
COLUMNS:
- NAME: { <column_name> }
TYPE: <column_data_type>
[ ... ]
FORMAT: <key_data_format>
[[DELIMITED_OPTION:
DELIMITER: <delimiter_string> |
[QUOTE: <quote_char>]
[ESCAPE: <escape_char>] ] |
[CSV_OPTION:
[DELIMITER: <delim_char>]
[QUOTE: <quote_char>]
[NULL_STRING: <nullstr_val>]
[ESCAPE: <escape_char>]
[META:
COLUMNS:
- NAME: <meta_column_name>
TYPE: { json | jsonb }
FORMAT: json]
[ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
{ OUTPUT:
[SCHEMA: <output_schema_name>]
TABLE: <table_name>
[MODE: <mode>]
[MATCH_COLUMNS:
- <match_column_name>
[ ... ]]
[ORDER_COLUMNS:
- <order_column_name>
[ ... ]]
[UPDATE_COLUMNS:
- <update_column_name>
[ ... ]]
[MAPPING:
- NAME: <target_column_name>
EXPRESSION: { <source_column_name> | <expression> }
[ ... ]
|
<target_column_name> : { <source_column_name> | <expression> }
[ ... ] ] }
[METADATA:
[SCHEMA: <metadata_schema_name>]]
COMMIT:
MAX_ROW: <num_rows>
MINIMAL_INTERVAL: <wait_time>
CONSISTENCY: { strong | at-least | at-most | none }
[POLL:
BATCHSIZE: <num_records>
TIMEOUT: <poll_time>]
[PROPERTIES:
<kafka_property_name>: <kafka_property_value>
[ ... ]]
[SCHEDULE:
RETRY_INTERVAL: <retry_time>
MAX_RETRIES: <num_retries> ]
數(shù)據(jù)庫(kù)相關(guān)配置
參數(shù) | 描述 | 是否必填 |
DATABASE | 目標(biāo)端云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例的數(shù)據(jù)庫(kù)名稱。 | 是 |
USER | 云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例的賬號(hào)。 | 是 |
PASSWORD | 云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例的賬號(hào)密碼。 | 是 |
HOST | 目標(biāo)端云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例的內(nèi)網(wǎng)地址。 | 是 |
PORT | 云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版實(shí)例的端口號(hào)。 | 是 |
VERSION | 當(dāng)前采用的YAML文件格式版本,預(yù)留字段,無(wú)限制。 | 否 |
KAFKA:INPUT配置
KAFKA:INPUT:SOURCE
參數(shù) | 描述 | 是否必填 | 參數(shù)值限制 |
BROKERS | Kafka接入點(diǎn)信息。
如有多個(gè)使用英文逗號(hào)( | 是 | 對(duì)應(yīng)kafka consumer bootstrap.server 配置,需要填寫(xiě)有效的Brokers地址,否則會(huì)報(bào)錯(cuò)。 |
TOPIC | Kafka Topic名稱。 | 是 | 僅支持單個(gè)Topic。 |
PARTITIONS | 分區(qū)編號(hào)。 如有多個(gè)分區(qū)編號(hào),使用英文逗號(hào)( | 否 | 例如:1,2,3,4,5 |
FALLBACK_OFFSET | 消費(fèi)位點(diǎn)。
| 是 | 無(wú) |
KAFKA:INPUT:KEY和KAFKA:INPUT:VALUE
Kafka消息的Key值字段名稱、數(shù)據(jù)類型和數(shù)據(jù)格式。
Kafka消息的Value字段名稱、數(shù)據(jù)類型和數(shù)據(jù)格式。
必須按照在Key和Value中出現(xiàn)的順序指定所有Kafka數(shù)據(jù)元素。
KAFKA:INPUT:KEY
和KAFKA:INPUT:VALUE
至少需要配置一個(gè),如果兩個(gè)都未配置會(huì)報(bào)錯(cuò)。
參數(shù) | 描述 | 是否必填 | 參數(shù)值限制 |
COLUMNS | 如果定義 如果定義 | 是 | 無(wú) |
NAME | 定義Kafka消息中的列名。該列名主要在 | 是 | 無(wú) |
TYPE | 定義Kafka消息中列的類型,數(shù)據(jù)類型需要與這個(gè)列在目標(biāo)數(shù)據(jù)庫(kù)中的類型保持一致。 由于Kafka消息中Key和Value的格式不透明,因此當(dāng)前Streaming Server默認(rèn)從Kafka消息中獲取的數(shù)據(jù)格式為文本形式。 | 是 | 云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版支持的數(shù)據(jù)類型請(qǐng)參見(jiàn)數(shù)據(jù)類型。 如果Kafka消息的列與目標(biāo)列的類型不一致,請(qǐng)?jiān)贛apping中的expression部分對(duì)類型進(jìn)行轉(zhuǎn)換。 |
FORMAT | 定義Kafka消息數(shù)據(jù)的類型,當(dāng)前支持CSV、Delimited和protobuf。 | 是 | 無(wú) |
KAFKA:INPUT:META
META不是必填項(xiàng),當(dāng)您需要展示Message Meta信息時(shí)配置。
參數(shù) | 描述 | 是否必填 | 參數(shù)值限制 |
COLUMNS | 定義Meta信息,為一組NAME,TYPE。 | 是 | 無(wú) |
NAME | Meta名稱,可以指定為其他的名稱,默認(rèn)使用 | 是 | 無(wú) |
TYPE | 只能使用Text類型。 | 是 | Text |
FORMAT | 只能使用Text類型。 | 是 | Text |
KAFKA:INPUT:ERROR_LIMIT
錯(cuò)誤數(shù)據(jù)的容忍閾值。當(dāng)寫(xiě)入的錯(cuò)誤數(shù)據(jù)達(dá)到ERROR_LIMIT時(shí),Streaming Server會(huì)退出當(dāng)前任務(wù),自動(dòng)停止將數(shù)據(jù)源的數(shù)據(jù)寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版。默認(rèn)值為0,即Streaming Server會(huì)在出現(xiàn)第一次錯(cuò)誤數(shù)據(jù)時(shí)就退出當(dāng)前任務(wù),停止數(shù)據(jù)寫(xiě)入。ERROR_LIMIT值必須大于1。
目前該參數(shù)未啟用,不選擇或者填0即可。
KAFKA:OUTPUT配置
數(shù)據(jù)庫(kù)相關(guān)配置
數(shù)據(jù)寫(xiě)入到云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版數(shù)據(jù)庫(kù)的相關(guān)配置,包括Kafka值到目標(biāo)數(shù)據(jù)庫(kù)的映射、寫(xiě)入模式等。
參數(shù) | 描述 | 是否必填 |
SCHEMA | 寫(xiě)入云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版的目標(biāo)表所在的Schema。 | 是 |
TABLE | 目標(biāo)表的名稱。 | 是 |
MODE | 寫(xiě)入模式,目前支持INSERT、UPDATE和MERGE三種方式。 | 是 |
MATCH_COLUMNS | 當(dāng)寫(xiě)入模式為UPDATE和MERGE時(shí)生效。 指定目標(biāo)表的部分列,當(dāng)寫(xiě)入數(shù)據(jù)與目標(biāo)表數(shù)據(jù)匹配時(shí),目標(biāo)表中這部分?jǐn)?shù)據(jù)會(huì)根據(jù)UPDATE或MERGE模式對(duì)數(shù)據(jù)進(jìn)行更新。 建議MATCH_COLUMNS使用目標(biāo)表的主鍵或者唯一鍵。 | 否 |
ORDER_COLUMNS | 在寫(xiě)入模式(MODE)為MERGE時(shí)生效。 當(dāng)寫(xiě)入數(shù)據(jù)根據(jù)MATCH_COLUMNS存在多個(gè)匹配行時(shí),使用ORDER_COLUMNS對(duì)這些數(shù)據(jù)進(jìn)行排序,以確定具有最大值的輸入行,Streaming Server使用該行來(lái)更新目標(biāo)。 | 否 |
UPDATE_COLUMNS | 當(dāng)寫(xiě)入模式為UPDATE和MERGE時(shí)生效。 如果寫(xiě)入數(shù)據(jù)能夠根據(jù)MATCH_COLUMNS匹配到目標(biāo)表數(shù)據(jù),則會(huì)基于UPDATE_COLUMNS更新對(duì)應(yīng)的列。 | 否 |
在使用MERGE和UPDATE模式時(shí),如果不指定ORDER_COLUMNS,當(dāng)寫(xiě)入數(shù)據(jù)根據(jù)MATCH_COLUMNS匹配到多行相同時(shí),則會(huì)隨機(jī)取一條作為結(jié)果寫(xiě)入。
在指定了ORDER_COLUMNS后,其排序結(jié)果是
a desc,b desc,c desc
。
KAFKA:OUTPUT:MAPPING
參數(shù) | 描述 | 是否必填 |
NAME | 目標(biāo)列名稱。 | 是 |
EXPRESSION | 可以是源端的列名( | 是 |
KAFKA:METADATA配置
參數(shù) | 描述 | 是否必填 | 參數(shù)限制 |
schema | Streaming Server創(chuàng)建的外表和其他輔助表所在的Schema名稱。 | 否 | 默認(rèn)取值 |
KAFKA:COMMIT配置
COMMIT用于控制向數(shù)據(jù)庫(kù)提交數(shù)據(jù)的行為。
參數(shù) | 描述 | 是否必填 | 參數(shù)限制 |
MAX_ROW | 指定一次寫(xiě)入目標(biāo)庫(kù)的最大Batch Size。 | 否 | 單位為行,默認(rèn):500。 |
MINIMAL_INTERVAL | 在兩個(gè)Batch寫(xiě)入之間的等待時(shí)間。如果超過(guò)該時(shí)間,會(huì)嘗試再寫(xiě)一次。 | 否 | 單位為毫秒(ms),默認(rèn):1000。 |
CONSISTENCY | 數(shù)據(jù)一致性保證。 | 否 | 目前僅支持ATLEAST,即kafka中的數(shù)據(jù)至少會(huì)寫(xiě)入目標(biāo)數(shù)據(jù)庫(kù)一次。 |
KAFKA:POLL配置
POLL用于控制Kafka Consumer消費(fèi)數(shù)據(jù)的行為。
參數(shù) | 描述 | 是否必填 | 參數(shù)限制 |
BATCHSIZE | 一次從Topic中拿出的event數(shù)量。保留字段,目前沒(méi)有實(shí)現(xiàn)相關(guān)功能。 | 否 | 單位為行,默認(rèn):64。 |
TIMEOUT | Kafka Consumer從Kafka中獲取event等待的超時(shí)時(shí)間。 | 否 | 單位為毫秒(ms),默認(rèn):5000。 |
KAFKA:PROPERTIES配置
PROPERTIES用于配置Kafka Connect,當(dāng)前采用白名單制,僅支持配置group.id
,auto.offset.reset
和isolation.level
。詳細(xì)信息,請(qǐng)參見(jiàn)Kafka Connect Configs。