本文介紹自定義連接器中Workers的概念及配置。
背景信息
Workers是運行連接器邏輯的Java虛擬機 (JVM) 進程。每個Worker創建一組并行線程中的Tasks,并完成復制數據的工作。
Workers中的Tasks不存儲狀態,可以隨時啟動、停止或重新啟動。SAE將提供彈性和可擴展的數據管道,通過CPU或者Memory水位閾值判斷并在指定范圍內自動進行彈性擴縮,滿足Workers的彈性訴求。
Workers配置總覽
Workers配置參數與開源Kafka Connect的配置參數兼容,配置全集請參見Confluent Kafka Connect配置。
默認Workers配置
云消息隊列 Kafka 版提供了對Confluent Kafka Connect進行半托管的一站式平臺,提供如下默認配置:
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=true
offset.flush.interval.ms=60000
request.timeout.ms=40000
task.shutdown.graceful.timeout.ms=10000
plugin.path=/opt/kafka/connect/plugins
rest.advertised.port=8083
topic.creation.enable=false
listeners=http://:8083
可自定義的Workers配置
創建任務時允許您自定義以下參數值,這些自定義配置將覆蓋云消息隊列 Kafka 版提供的默認配置。
- 必填參數配置項(控制臺會預設必填配置項):
配置項 說明 示例 bootstrap.servers Kafka實例的接入點。用于與Kafka實例相連接。 alikafka-post-cn-7mz301t5****.alikafka.aliyuncs.com:9092 offset.storage.topic 存儲offsets信息的Topic名稱。 topic_offset config.storage.topic 存儲配置信息的Topic名稱。 topic_config status.storage.topic 存儲狀態信息的Topic名稱。 topic_status group.id 標識此Worker所屬的Connect集群。 test 控制臺預設默認配置值:group.id=connect-eb-cluster-35345 offset.storage.topic=connect-eb-offset-35345 config.storage.topic=connect-eb-config-35345 status.storage.topic=connect-eb-status-35345 consumer.group.id=connector-eb-cluster-mongo-sink bootstrap.servers=alikafka-pre-cn-zpr3156gn006-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zpr3156gn006-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zpr3156gn006-3-vpc.alikafka.aliyuncs.com:9092
- 選填參數配置項:
key.converter key.converter.schemas.enable value.converter value.converter.schemas.enable exactly.once.source.support heartbeat.interval.ms rebalance.timeout.ms session.timeout.ms client.dns.lookup connections.max.idle.ms connector.client.config.override.policy receive.buffer.bytes request.timeout.ms send.buffer.bytes worker.sync.timeout.ms worker.unsync.backoff.ms access.control.allow.methods access.control.allow.origin admin.listeners client.id config.providers connect.protocol header.converter metadata.max.age.ms offset.flush.interval.ms offset.flush.timeout.ms reconnect.backoff.max.ms reconnect.backoff.ms retry.backoff.ms scheduled.rebalance.max.delay.ms task.shutdown.graceful.timeout.ms topic.tracking.allow.reset topic.tracking.enable
不可自定義的Workers配置
以下配置項不支持自定義設置。
- 使用云消息隊列 Kafka 版提供默認值的配置項:
plugin.path rest.advertised.port topic.creation.enable listeners
- 不會傳輸至Kafka Connect的配置項。
sasl.* ssl.* security.* rest.advertised.host.name rest.advertised.listener rest.extension.classes client.* inter.worker.* metrics.* metrics.context.* response.http.headers.config socket.*