本文介紹如何創建RabbitMQ Connector,將云消息隊列 RabbitMQ 版的數據同步至云消息隊列 Kafka 版。
前提條件
開通對象存儲OSS服務并創建存儲空間(Bucket)。更多信息,請參見控制臺創建存儲空間。
開通Serverless應用引擎服務。更多信息,請參見準備工作。
購買并部署云消息隊列 Kafka 版實例。更多信息,請參見購買和部署實例。
步驟一:創建RabbitMQ資源
登錄云消息隊列 RabbitMQ 版控制臺,創建RabbitMQ實例。操作步驟,請參見創建實例。
單擊已創建的實例,在實例詳情頁面創建以下資源。
步驟二:創建Connector
下載RabbitMQ Connector文件,上傳至提前創建好的OSS Bucket。更多信息,請參見控制臺上傳文件。
登錄云消息隊列 Kafka 版控制臺,在概覽頁面的資源分布區域,選擇地域。
在左側導航欄,選擇
。在任務列表頁面,單擊創建任務。
在創建任務面板。設置任務名稱,配置以下配置項。
任務創建
在Source(源)配置向導,選擇數據提供方為Apache Kafka Connect,單擊下一步。
在連接器配置配置向導,設置以下配置項,然后單擊下一步。
配置項
參數
說明
Kafka Connect插件
Bucket存儲桶
選擇OSS Bucket。
文件
選擇上傳的.ZIP文件。
Kafka資源信息
Kafka 參數配置
選擇Source Connect。
Kafka實例
選擇前提條件中創建的實例。
專有網絡VPC
默認選擇部署Kafka實例時選擇的VPC ID且不可更改。
交換機
默認選擇部署Kafka實例時選擇的vSwitch ID且不可更改。
安全組
選擇安全組。
Kafka Connect配置信息
解析當前ZIP包下的properties文件
選擇新建properties文件。選擇.ZIP文件中包含的SourceConnector對應的.properties文件。路徑為/etc/source-xxx.properties。在輸入框中更新下列字段的取值。
connector.class:運行的Connector的包名稱,無需修改。
tasks.max:Task的最大數量。
rabbitmq.host:填寫RabbitMQ實例VPC接入點地址。可在RabbitMQ實例詳情頁面的接入點信息區域查看。
rabbitmq.username:填寫步驟一:創建RabbitMQ資源中創建的RabbitMQ實例靜態用戶名。
rabbitmq.password:填寫步驟一:創建RabbitMQ資源中創建的RabbitMQ實例靜態用戶名密碼。
rabbitmq.virtual.host:填寫步驟一:創建RabbitMQ資源中創建的Vhost。
kafka.topic:目標Kafka Topic,請在投遞數據前,提前創建好目標Topic。
rabbitmq.queue:填寫步驟一:創建RabbitMQ資源中創建的Queue。
示例代碼如下:
connector.class=com.ibm.eventstreams.connect.rabbitmqsource.RabbitMQSourceConnector name=rabbitmq-source-connector # RabbitMQ實例VPC接入點信息。 rabbitmq.host=xxx # RabbitMQ實例靜態用戶名密碼。 rabbitmq.password=xxx # RabbitMQ實例靜態用戶名。 rabbitmq.username=xxx # RabbitMQ實例Vhost。 rabbitmq.virtual.host=xxx # 目標Kafka Topic。 kafka.topic=xxx # RabbitMQ實例隊列。 rabbitmq.queue=xxx tasks.max=4
在實例配置配置向導,設置以下參數,然后單擊下一步。
配置項
參數
說明
Worker規格
Worker規格
選擇合適的Worker規格。
最小Worker數
設置最小Worker數量。
最大Worker數
設置最大Worker數量。此數值不得超過Task的最大數量。
橫向擴縮容閾值 %
當利用率大于或小于設置的CPU和Memory數值時,觸發自動擴容或縮容。僅當最小Worker數和最大Worker數值不相等時,需要配置此參數。
Kafka Connect Worker 配置
自動創建Kafka Connect Worker依賴資源
建議勾選此項,此時會在選擇的Kafka實例中自動創建Kafka Connect運行所需的一些Internal Topic以及ConsumerGroup,并將這些必填配置自動填入配置框中,包括以下配置項:
Offset Topic:用于存儲源數據偏移量,命名規則為
connect-eb-offset-<任務名稱>
。Config Topic:用于存儲Connectors以及Tasks的配置信息,命名規則為
connect-eb-config-<任務名稱>
。Status Topic:用于存儲Connectors以及Tasks狀態信息,命名規則為
connect-eb-status-<任務名稱>
。Kafka Connect Consumer Group:Kafka Connect Worker用于消費Internal Topics的消費組,命名規則為
connect-eb-cluster-<任務名稱>
。Kafka Source Connector Consumer Group:只針對Sink Connector有效,用于消費源Topic中的數據,命名規則為
connector-eb-cluster-<任務名稱>-<connector名稱>
。
在運行配置區域,將日志投遞方式設置為投遞至SLS或者投遞至Kafka,在角色授權卡片設置Connect依賴的角色配置,然后單擊保存。
重要建議配置的角色包含AliyunSAEFullAccess權限,否則可能會導致任務運行失敗。
任務屬性
設置此任務的重試策略及死信隊列。更多信息,請參見重試和死信。
等待任務狀態變為運行中,此時Connector已經在正常工作中。
步驟三:測試Connector
登錄云消息隊列 RabbitMQ 版控制臺,然后在左側導航欄選擇實例列表。
在實例列表頁面的頂部菜單欄選擇地域,然后在實例列表中,單擊目標實例名稱。
在左側導航欄,單擊Queue列表,然后單擊目標Queue右側操作列的詳情。
在Queue詳情頁面,單擊被綁定信息頁簽的添加被綁定。
在添加被綁定面板,選擇源Exchange為amq.direct,單擊確定。
在被綁定信息頁簽,單擊amq.direct Exchange右側操作列的發送消息,向Kafka的目標Topic發送消息。更多信息,請參見發送消息。
登錄云消息隊列 Kafka 版控制臺,在實例列表頁面,單擊目標實例。
在目標實例頁面,單擊目標Topic,然后單擊消息查詢,查看插入的消息數據。
常見報錯
場景一:所有Tasks運行失敗
錯誤信息:
All tasks under connector mongo-source failed, please check the error trace of the task.
解決方法:在消息流入任務詳情頁面,單擊基礎信息區域的診斷鏈接,即可跳轉到Connector監控頁面,可以看到Tasks運行失敗的詳細錯誤信息。
場景二:Kafka Connect退出
錯誤信息:
Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.
解決方法:由于狀態獲取可能會有延遲,建議您先嘗試刷新頁面。若刷新后仍然是失敗狀態,您可以按照以下步驟查看錯誤信息。
在消息流入任務詳情頁面的Worker信息區域,單擊SAE應用后的實例名稱,跳轉到SAE應用詳情頁面。
在基本信息頁面,單擊實例部署信息頁簽。
在實例右側操作列,單擊Webshell登錄Kafka Connect運行環境。
執行
vi /home/admin/connector-bootstrap.log
命令,查看Connector啟動日志,查找其中是否包含錯誤信息。執行
vi /opt/kafka/logs/connect.log
命令,查看Connector運行日志,在其中查找ERROR或者WARN字段來查看是否有錯誤信息。
基于錯誤信息提示進行修復操作后,可以重新啟動對應任務。
場景三:Connector參數校驗失敗
錯誤信息:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
解決方法:此時需要根據錯誤信息,找出具體哪個參數出錯,更新對應參數即可。若基于上述錯誤信息無法定位具體的出錯參數,可以參考上文場景二中的步驟登錄Kafka Connect運行環境,執行以下命令,查詢參數是否校驗通過。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate
該指令會返回Connector參數中每個參數是否校驗通過,若不通過,則errors屬性非空,如下所示。
"value":{
"name":"snapshot.mode",
"value":null,
"recommended_values":[
"never",
"initial_only",
"when_needed",
"initial",
"schema_only",
"schema_only_recovery"
],
"errors":[
"Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
],
"visible":true
}