本文介紹自定義連接器中Workers的概念及配置。

背景信息

Workers是運行連接器邏輯的Java虛擬機 (JVM) 進程。每個Worker創建一組并行線程中的Tasks,并完成復制數據的工作。

Workers中的Tasks不存儲狀態,可以隨時啟動、停止或重新啟動。SAE將提供彈性和可擴展的數據管道,通過CPU或者Memory水位閾值判斷并在指定范圍內自動進行彈性擴縮,滿足Workers的彈性訴求。

Worker

Workers配置總覽

Workers配置參數與開源Kafka Connect的配置參數兼容,配置全集請參見Confluent Kafka Connect配置。

默認Workers配置

云消息隊列 Kafka 版提供了對Confluent Kafka Connect進行半托管的一站式平臺,提供如下默認配置:
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=true

offset.flush.interval.ms=60000
request.timeout.ms=40000
task.shutdown.graceful.timeout.ms=10000

plugin.path=/opt/kafka/connect/plugins
rest.advertised.port=8083
topic.creation.enable=false
listeners=http://:8083

可自定義的Workers配置

創建任務時允許您自定義以下參數值,這些自定義配置將覆蓋云消息隊列 Kafka 版提供的默認配置。

  • 必填參數配置項(控制臺會預設必填配置項):
    配置項說明示例
    bootstrap.serversKafka實例的接入點。用于與Kafka實例相連接。alikafka-post-cn-7mz301t5****.alikafka.aliyuncs.com:9092
    offset.storage.topic存儲offsets信息的Topic名稱。topic_offset
    config.storage.topic存儲配置信息的Topic名稱。topic_config
    status.storage.topic存儲狀態信息的Topic名稱。topic_status
    group.id標識此Worker所屬的Connect集群。test
    控制臺預設默認配置值:
    group.id=connect-eb-cluster-35345
    offset.storage.topic=connect-eb-offset-35345
    config.storage.topic=connect-eb-config-35345
    status.storage.topic=connect-eb-status-35345
    consumer.group.id=connector-eb-cluster-mongo-sink
    bootstrap.servers=alikafka-pre-cn-zpr3156gn006-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zpr3156gn006-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zpr3156gn006-3-vpc.alikafka.aliyuncs.com:9092
  • 選填參數配置項:
    key.converter
    key.converter.schemas.enable
    value.converter
    value.converter.schemas.enable
    exactly.once.source.support
    heartbeat.interval.ms
    rebalance.timeout.ms
    session.timeout.ms
    client.dns.lookup
    connections.max.idle.ms
    connector.client.config.override.policy
    receive.buffer.bytes
    request.timeout.ms
    send.buffer.bytes
    worker.sync.timeout.ms
    worker.unsync.backoff.ms
    access.control.allow.methods
    access.control.allow.origin
    admin.listeners
    client.id
    config.providers
    connect.protocol
    header.converter
    metadata.max.age.ms
    offset.flush.interval.ms
    offset.flush.timeout.ms
    reconnect.backoff.max.ms
    reconnect.backoff.ms
    retry.backoff.ms
    scheduled.rebalance.max.delay.ms
    task.shutdown.graceful.timeout.ms
    topic.tracking.allow.reset
    topic.tracking.enable

不可自定義的Workers配置

以下配置項不支持自定義設置。

  • 使用云消息隊列 Kafka 版提供默認值的配置項:
    plugin.path
    rest.advertised.port
    topic.creation.enable
    listeners
  • 不會傳輸至Kafka Connect的配置項。
    sasl.*
    ssl.*
    security.*
    rest.advertised.host.name
    rest.advertised.listener
    rest.extension.classes
    client.*
    inter.worker.*
    metrics.*
    metrics.context.*
    response.http.headers.config
    socket.*