本文介紹如何創建MongoDB Connector,將MongoDB數據庫和云消息隊列 Kafka 版的數據進行雙向同步。
前提條件
步驟一:創建數據表
登錄云數據庫MongoDB管理控制臺,創建實例或使用已有實例。以創建分片集群實例為例,詳細步驟,請參見創建分片集群實例。
重要創建實例或使用已有實例時,請保證實例與前提條件中云消息隊列 Kafka 版實例使用相同的VPC,否則將會導致鏈路不通。
創建實例時會自動創建root用戶名,設置密碼時請勿包含at(@)或冒號(:)。
創建實例時,請選擇與云消息隊列 Kafka 版實例相同的vSwitch,若使用已有實例,請檢查vSwitch是否相同,若不在同一vSwitch且在同一VPC下,可以將Kafka實例的vSwitch網段添加至MongoDB訪問白名單中。詳情請見設置白名單。vSwitch的網段信息可以在專有網絡控制臺的交換機詳情頁面獲取。
實例創建完成后,在實例列表頁面單擊目標實例,然后在實例詳情頁面的左側導航欄,完成以下操作。
將DMS服務器的IP地址加入白名單。更多信息,請參見設置白名單。
在基本信息頁面的連接信息區域,記錄專有網絡的連接地址。
在基本信息頁面,單擊登錄數據庫進入DMS數據管理服務平臺。更多信息,請參見通過DMS連接MongoDB分片集群實例。
在目標實例中創建數據庫和集合。
在SQL Console頁面中,執行以下命令,創建test數據庫。
use test
在test數據庫中執行以下命令,創建名為mongo的集合。
db.createCollection("mongo")
更多信息,請參見創建數據庫和集合并寫入數據。
步驟二:創建Connector
Source Connector
下載MongoDB Connector1.8.1版本文件,解壓至本地,目錄結構如下所示。
重要下載MongoDB Connector文件時請選擇適配Java 8的版本。
. ├── assets │ ├── mongodb-leaf.png │ └── mongodb-logo.png ├── doc │ ├── LICENSE.txt │ └── README.md ├── etc │ ├── MongoSinkConnector.properties │ └── MongoSourceConnector.properties ├── lib │ └── mongo-kafka-connect-1.9.1-confluent.jar └── manifest.json
從Maven倉庫中下載avro-1.9.2.jar和mongodb-driver-sync-4.9.0.jar,將這兩個jar包移動至MongoDB Connector文件夾中的lib目錄下,然后將其壓縮成.ZIP文件,上傳至提前創建好的OSS Bucket。更多信息,請參見控制臺上傳文件。
登錄云消息隊列 Kafka 版控制臺,在概覽頁面的資源分布區域,選擇地域。
在左側導航欄,選擇
。在任務列表頁面,單擊創建任務列表。
在創建任務面板。設置任務名稱,配置以下配置項。
任務創建
在Source(源)配置向導,選擇數據提供方為Apache Kafka Connect,單擊下一步。
在連接器配置配置向導,設置以下配置項,然后單擊下一步。
配置項
參數
說明
Kafka Connect插件
Bucket存儲桶
選擇OSS Bucket。
文件
選擇上傳的.ZIP文件。
Kafka資源信息
Kafka參數配置
選擇Source Connect。
Kafka實例
選擇前提條件中創建的實例。
專有網絡VPC
選擇VPC ID。
交換機
選擇vSwitch ID。
安全組
選擇安全組。
Kafka Connect配置信息
解析當前ZIP包下的properties文件
選擇新建properties文件。選擇.ZIP文件中包含的SourceConnector對應的.properties文件。路徑為/etc/MongoSourceConnector.properties。
字段名
描述
connector.class
運行的Connector包名稱,無需修改。
tasks.max
Task的最大數量。在MongoDB Source Connector中此參數取值只能為1。
connection.url
填寫步驟一:創建數據表中獲取的MongoDB數據庫的專有網絡連接地址。地址中的****需替換為root賬號的密碼。
database
MongoDB數據庫名稱。
collection
MongoDB集合名稱。
topic.namespace.map
目標Topic信息,為Key-Value 結構,Key的組成為
database.{collection}
,Value為目標Topic名稱,此參數表示指定Collection的數據變化會傳輸至指定Topic中。在投遞數據前,請提前創建好目標Topic。copy.existing
是否將源MongoDB Collection中的已存在數據全量同步至Kafka中。若為true,則在Connector首次啟動時,就會將MongoDB Collection中的存量數據全量同步至下游Kafka Topic中。建議在首次全量同步完成后,將該值更新為false,防止后續將Connector刪除重建后,再次進行數據全量同步,導致重復消費。
connector.class=com.mongodb.kafka.connect.MongoSourceConnector name=mongo-source batch.size=0 change.stream.full.document=updateLookup collection=testCollection connection.uri=mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]] database=testDatabase poll.await.time.ms=5000 poll.max.batch.size=1000 tasks.max=1 topic.namespace.map={"testDatabase.testCollection": "targetTopic"} # 是否將源MongoDB Collection中的已存在數據全量同步至Kafka中。 # 若為true,則在Connector首次啟動時,就會將MongoDB Collection中的存量數據全量同步至下游Kafka Topic中。 # 建議在首次全量同步完成后,將該值更新為false,防止后續將Connector刪除重建后,再次進行數據全量同步,導致重復消費。 copy.existing=true
Connector全量參數,請參見MongoDB Source Connector Properties。
在實例配置配置向導,設置以下參數,然后單擊下一步。
配置項
參數
說明
Worker規格
Worker規格
選擇合適的Worker規格。
最小Worker數
設置為1。
最大Worker數
設置為1。
Kafka Connect Worker配置
自動創建Kafka Connect Worker依賴資源
建議勾選此項,此時會在選擇的Kafka實例中自動創建Kafka Connect運行所需的一些Internal Topic以及ConsumerGroup,并將這些必填配置自動填入配置框中,包括以下配置項:
Offset Topic:用于存儲源數據偏移量,命名規則為
connect-eb-offset-<任務名稱>
。Config Topic:用于存儲Connectors以及Tasks的配置信息,命名規則為
connect-eb-config-<任務名稱>
。Status Topic:用于存儲Connectors以及Tasks狀態信息,命名規則為
connect-eb-status-<任務名稱>
。Kafka Connect Consumer Group:Kafka Connect Worker用于消費Internal Topics的消費組,命名規則為
connect-eb-cluster-<任務名稱>
。Kafka Source Connector Consumer Group:只針對Sink Connector有效,用于消費源Kafka Topic中的數據,命名規則為
connector-eb-cluster-<任務名稱>-<connector名稱>
。
在運行配置配置向導,將日志投遞方式設置為投遞至SLS或者投遞至Kafka,在角色授權卡片設置Connect依賴的角色配置,然后單擊保存。
重要建議配置的角色包含AliyunSAEFullAccess權限,否則可能會導致任務運行失敗。
任務屬性
設置此任務的重試策略及死信隊列。更多信息,請參見重試和死信。
等待任務狀態變為運行中,此時Connector已經在正常工作中。
Sink Connector
下載MongoDB Connector1.8.1版本文件,解壓至本地,目錄結構如下所示。
重要下載MongoDB Connector文件時請選擇適配Java 8的版本。
. ├── assets │ ├── mongodb-leaf.png │ └── mongodb-logo.png ├── doc │ ├── LICENSE.txt │ └── README.md ├── etc │ ├── MongoSinkConnector.properties │ └── MongoSourceConnector.properties ├── lib │ └── mongo-kafka-connect-1.9.1-confluent.jar └── manifest.json
從Maven倉庫中下載avro-1.9.2.jar和mongodb-driver-sync-4.9.0.jar,將這兩個jar包移動至MongoDB Connector文件夾中的lib目錄下,然后將其壓縮成.ZIP文件,上傳至提前創建好的OSS Bucket。更多信息,請參見控制臺上傳文件。
登錄云消息隊列 Kafka 版控制臺,在概覽頁面的資源分布區域,選擇地域。
在左側導航欄,選擇
。在任務列表頁面,單擊創建任務列表。
在創建任務面板。設置任務名稱,配置以下配置項。
任務創建
在Source(源)配置向導,選擇數據提供方為Apache Kafka Connect,單擊下一步。
在連接器配置配置向導,設置以下配置項,然后單擊下一步。
配置項
參數
說明
Kafka Connect插件
Bucket存儲桶
選擇OSS Bucket。
文件
選擇上傳的.ZIP文件。
Kafka資源信息
Kafka參數配置
選擇Sink Connect。
Kafka實例
選擇前提條件中創建的實例。
專有網絡VPC
選擇VPC ID。
交換機
選擇vSwitch ID。
安全組
選擇安全組。
Kafka Connect配置信息
解析當前ZIP包下的properties文件
選擇新建properties文件。選擇.ZIP文件中包含的SinkConnector對應的.properties文件。路徑為/etc/MongoSinkConnector.properties。
字段名
描述
connector.class
運行的Connector包名稱,無需修改。
tasks.max
Task的最大數量。
topics
數據源Topic名稱。不同Topic間以英文逗號(,)分隔。
connection.url
數據庫連接地址。填寫步驟一:創建數據表中獲取的專有網絡連接地址。地址中的****需替換為root賬號的密碼。
database
目標MongoDB數據庫名稱。
connection
目標MongoDB集合名稱。
connector.class=com.mongodb.kafka.connect.MongoSinkConnector name=mongo-sink collection=testCollection connection.uri=mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]] database=testDatabase delete.on.null.values=false key.converter=org.apache.kafka.connect.storage.StringConverter key.converter.schemas.enable=false max.batch.size=0 rate.limiting.every.n=0 rate.limiting.timeout=0 tasks.max=2 topics=testTopic value.converter=org.apache.kafka.connect.storage.StringConverter value.converter.schemas.enable=true writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
Connector全量參數,請參見MongoDB Sink Connector Properties。
在實例配置配置向導,設置以下參數,然后單擊下一步。
配置項
參數
說明
Worker規格
Worker規格
選擇合適的Worker規格。
最小Worker數
設置為1。
最大Worker數
設置為1。此值不能超過tasks.max的取值。
Kafka Connect Worker 配置
自動創建Kafka Connect Worker依賴資源
建議勾選此項,此時會在選擇的Kafka實例中自動創建Kafka Connect運行所需的一些Internal Topic以及ConsumerGroup,并將這些必填配置自動填入配置框中,包括以下配置項:
Offset Topic:用于存儲源數據偏移量,命名規則為
connect-eb-offset-<任務名稱>
。Config Topic:用于存儲Connectors以及Tasks的配置信息,命名規則為
connect-eb-config-<任務名稱>
。Status Topic:用于存儲Connectors以及Tasks狀態信息,命名規則為
connect-eb-status-<任務名稱>
。Kafka Connect Consumer Group:Kafka Connect Worker用于消費Internal Topics的消費組,命名規則為
connect-eb-cluster-<任務名稱>
。Kafka Source Connector Consumer Group:只針對Sink Connector有效,用于消費源Kafka Topic中的數據,命名規則為
connector-eb-cluster-<任務名稱>-<connector名稱>
。
在運行配置配置向導,將日志投遞方式設置為投遞至SLS或者投遞至Kafka,在角色授權卡片設置Connect依賴的角色配置,然后單擊保存。
重要建議配置的角色包含AliyunSAEFullAccess權限,否則可能會導致任務運行失敗。
任務屬性
設置此任務的重試策略及死信隊列。更多信息,請參見重試和死信。
等待任務狀態變為運行中,此時Connector已經在正常工作中。
步驟三:測試Connector
Source Connector
在DMS數據管理服務平臺,向步驟一:創建數據表中創建的Connection插入一條數據。例如,插入一條Key為testKey,Value為testValue的數據,命令如下。
db.testCollection.insert({"testKey":"testValue"})
登錄云消息隊列 Kafka 版控制臺,在實例列表頁面,單擊目標實例。
在目標實例頁面,單擊目標Topic,然后單擊消息查詢,查看插入的消息數據,消息Value示例如下。
{"_id": {"_data": "826464A63D000000012B022C0100296E5A1004CB11AB15FD6D4C409E37370B43A4B82246645F696400646464A624458CE6B7B626645B****"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1684317757, "i": 1}}, "fullDocument": {"_id": {"$oid": "6464a624458ce6b7b626****"}, "testKey": "testValue"}, "ns": {"db": "test", "coll": "mongo"}, "documentKey": {"_id": {"$oid": "6464a624458ce6b7b626****"}}}
Sink Connector
登錄云消息隊列 Kafka 版控制臺,在實例列表頁面,單擊目標實例。
在左側導航欄,單擊Topic管理,然后單擊目標Topic。
在Topic詳情頁面右上角,單擊體驗發送消息。
在快速體驗消息收發面板,設置消息內容。例如在目標表格中添加一條Key為Key1,Value為Value1的數據,消息內容如下。
{"key1": "value1"}
在DMS數據管理服務平臺,執行以下命令,查看目標集合中接收的數據。
db.mongo.find()
接收數據示例如下:
{ "_id":"ObjectId("643f4d5551daf4552246****")" "_insertedTS":"ISODate("2023-05-18T02:09:25.314Z")" "_modifiedTS":"ISODate("2023-05-18T02:09:25.314Z")" "key1":"value1" }
常見報錯
場景一:所有Tasks運行失敗
錯誤信息:
All tasks under connector mongo-source failed, please check the error trace of the task.
解決方法:在消息流入任務詳情頁面,單擊基礎信息區域的診斷鏈接,即可跳轉到Connector監控頁面,可以看到Tasks運行失敗的詳細錯誤信息。
場景二:Kafka Connect退出
錯誤信息:
Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.
解決方法:由于狀態獲取可能會有延遲,建議您先嘗試刷新頁面。若刷新后仍然是失敗狀態,您可以按照以下步驟查看錯誤信息。
在消息流入任務詳情頁面的Worker信息區域,單擊SAE應用后的實例名稱,跳轉到SAE應用詳情頁面。
在基本信息頁面,單擊實例部署信息頁簽。
在實例右側操作列,單擊Webshell登錄Kafka Connect運行環境。
執行
vi /home/admin/connector-bootstrap.log
命令,查看Connector啟動日志,查找其中是否包含錯誤信息。執行
vi /opt/kafka/logs/connect.log
命令,查看Connector運行日志,在其中查找ERROR或者WARN字段來查看是否有錯誤信息。
基于錯誤信息提示進行修復操作后,可以重新啟動對應任務。
場景三:Connector參數校驗失敗
錯誤信息:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
解決方法:此時需要根據錯誤信息,找出具體哪個參數出錯,更新對應參數即可。若基于上述錯誤信息無法定位具體的出錯參數,可以參考上文場景二中的步驟登錄Kafka Connect運行環境,執行以下命令,查詢參數是否校驗通過。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate
該指令會返回Connector參數中每個參數是否校驗通過,若不通過,則errors屬性非空,如下所示。
"value":{
"name":"snapshot.mode",
"value":null,
"recommended_values":[
"never",
"initial_only",
"when_needed",
"initial",
"schema_only",
"schema_only_recovery"
],
"errors":[
"Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
],
"visible":true
}
場景四:無法連接服務器
錯誤信息:
Start or update connector mongo-source failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):Unable to connect to the server.
解決方法:請查看Connector的配置信息是否正確,然后檢查MongoDB實例與Kafka實例是否在同一個VPC及vSwitch下,如果不在一個vSwitch但是在同一VPC下,那么需要將Kafka實例所在的交換機網段加入MongoDB實例的白名單。
場景五:數據庫用戶名或者密碼信息包含特殊字符
錯誤信息:
The connection string contains invalid user information. If the username or password contains a colon (:) or an at-sign (@) then it must be urlencoded\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}
解決方法:請檢查MongoDB賬戶名和密碼中是否包含有at(@)或冒號(:),如有請轉義對應的符號。at(@)和冒號(:)進行16進制URL編碼后分別為%40
和%3A
。