日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用MongoDB Connector連接MongoDB數(shù)據(jù)庫

本文介紹如何創(chuàng)建MongoDB Connector,將MongoDB數(shù)據(jù)庫和云消息隊列 Kafka 版的數(shù)據(jù)進行雙向同步。

前提條件

步驟一:創(chuàng)建數(shù)據(jù)表

  1. 登錄云數(shù)據(jù)庫MongoDB管理控制臺,創(chuàng)建實例或使用已有實例。以創(chuàng)建分片集群實例為例,詳細步驟,請參見創(chuàng)建分片集群實例

    重要
    • 創(chuàng)建實例或使用已有實例時,請保證實例與前提條件中云消息隊列 Kafka 版實例使用相同的VPC,否則將會導(dǎo)致鏈路不通。

    • 創(chuàng)建實例時會自動創(chuàng)建root用戶名,設(shè)置密碼時請勿包含at(@)或冒號(:)。

    • 創(chuàng)建實例時,請選擇與云消息隊列 Kafka 版實例相同的vSwitch,若使用已有實例,請檢查vSwitch是否相同,若不在同一vSwitch且在同一VPC下,可以將Kafka實例的vSwitch網(wǎng)段添加至MongoDB訪問白名單中。詳情請見設(shè)置白名單。vSwitch的網(wǎng)段信息可以在專有網(wǎng)絡(luò)控制臺的交換機詳情頁面獲取。

  2. 實例創(chuàng)建完成后,在實例列表頁面單擊目標實例,然后在實例詳情頁面的左側(cè)導(dǎo)航欄,完成以下操作。

    1. 將DMS服務(wù)器的IP地址加入白名單。更多信息,請參見設(shè)置白名單

    2. 基本信息頁面的連接信息區(qū)域,記錄專有網(wǎng)絡(luò)的連接地址。

      連接地址
  3. 基本信息頁面,單擊登錄數(shù)據(jù)庫進入DMS數(shù)據(jù)管理服務(wù)平臺。更多信息,請參見通過DMS連接MongoDB分片集群實例

  4. 在目標實例中創(chuàng)建數(shù)據(jù)庫和集合。

    • SQL Console頁面中,執(zhí)行以下命令,創(chuàng)建test數(shù)據(jù)庫。

      use test
    • 在test數(shù)據(jù)庫中執(zhí)行以下命令,創(chuàng)建名為mongo的集合。

      db.createCollection("mongo")

    更多信息,請參見創(chuàng)建數(shù)據(jù)庫和集合并寫入數(shù)據(jù)

步驟二:創(chuàng)建Connector

Source Connector

  1. 下載MongoDB Connector1.8.1版本文件,解壓至本地,目錄結(jié)構(gòu)如下所示。

    重要

    下載MongoDB Connector文件時請選擇適配Java 8的版本。

    .
    ├── assets
    │   ├── mongodb-leaf.png
    │   └── mongodb-logo.png
    ├── doc
    │   ├── LICENSE.txt
    │   └── README.md
    ├── etc
    │   ├── MongoSinkConnector.properties
    │   └── MongoSourceConnector.properties
    ├── lib
    │   └── mongo-kafka-connect-1.9.1-confluent.jar
    └── manifest.json
  2. Maven倉庫中下載avro-1.9.2.jar和mongodb-driver-sync-4.9.0.jar,將這兩個jar包移動至MongoDB Connector文件夾中的lib目錄下,然后將其壓縮成.ZIP文件,上傳至提前創(chuàng)建好的OSS Bucket。更多信息,請參見控制臺上傳文件

  3. 登錄云消息隊列 Kafka 版控制臺,在概覽頁面的資源分布區(qū)域,選擇地域。

  4. 在左側(cè)導(dǎo)航欄,選擇Connector生態(tài)集成 > 任務(wù)列表

  5. 任務(wù)列表頁面,單擊創(chuàng)建任務(wù)列表

  6. 創(chuàng)建任務(wù)面板。設(shè)置任務(wù)名稱,配置以下配置項。

    • 任務(wù)創(chuàng)建

      1. Source(源)配置向?qū)Вx擇數(shù)據(jù)提供方Apache Kafka Connect,單擊下一步

      2. 連接器配置配置向?qū)ВO(shè)置以下配置項,然后單擊下一步

        配置項

        參數(shù)

        說明

        Kafka Connect插件

        Bucket存儲桶

        選擇OSS Bucket。

        文件

        選擇上傳的.ZIP文件。

        Kafka資源信息

        Kafka參數(shù)配置

        選擇Source Connect。

        Kafka實例

        選擇前提條件中創(chuàng)建的實例。

        專有網(wǎng)絡(luò)VPC

        選擇VPC ID。

        交換機

        選擇vSwitch ID。

        安全組

        選擇安全組。

        Kafka Connect配置信息

        解析當前ZIP包下的properties文件

        選擇新建properties文件。選擇.ZIP文件中包含的SourceConnector對應(yīng)的.properties文件。路徑為/etc/MongoSourceConnector.properties。

        在輸入框中更新相關(guān)字段的取值。展開查看字段描述

        字段名

        描述

        connector.class

        運行的Connector包名稱,無需修改。

        tasks.max

        Task的最大數(shù)量。在MongoDB Source Connector中此參數(shù)取值只能為1。

        connection.url

        填寫步驟一:創(chuàng)建數(shù)據(jù)表中獲取的MongoDB數(shù)據(jù)庫的專有網(wǎng)絡(luò)連接地址。地址中的****需替換為root賬號的密碼。

        database

        MongoDB數(shù)據(jù)庫名稱。

        collection

        MongoDB集合名稱。

        topic.namespace.map

        目標Topic信息,為Key-Value 結(jié)構(gòu),Key的組成為database.{collection},Value為目標Topic名稱,此參數(shù)表示指定Collection的數(shù)據(jù)變化會傳輸至指定Topic中。在投遞數(shù)據(jù)前,請?zhí)崆皠?chuàng)建好目標Topic。

        copy.existing

        是否將源MongoDB Collection中的已存在數(shù)據(jù)全量同步至Kafka中。若為true,則在Connector首次啟動時,就會將MongoDB Collection中的存量數(shù)據(jù)全量同步至下游Kafka Topic中。建議在首次全量同步完成后,將該值更新為false,防止后續(xù)將Connector刪除重建后,再次進行數(shù)據(jù)全量同步,導(dǎo)致重復(fù)消費。

        展開查看示例代碼

        connector.class=com.mongodb.kafka.connect.MongoSourceConnector
        name=mongo-source
        batch.size=0
        change.stream.full.document=updateLookup
        collection=testCollection
        connection.uri=mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
        database=testDatabase
        poll.await.time.ms=5000
        poll.max.batch.size=1000
        tasks.max=1
        topic.namespace.map={"testDatabase.testCollection": "targetTopic"}
        # 是否將源MongoDB Collection中的已存在數(shù)據(jù)全量同步至Kafka中。
        # 若為true,則在Connector首次啟動時,就會將MongoDB Collection中的存量數(shù)據(jù)全量同步至下游Kafka Topic中。
        # 建議在首次全量同步完成后,將該值更新為false,防止后續(xù)將Connector刪除重建后,再次進行數(shù)據(jù)全量同步,導(dǎo)致重復(fù)消費。
        copy.existing=true

        Connector全量參數(shù),請參見MongoDB Source Connector Properties

      3. 實例配置配置向?qū)ВO(shè)置以下參數(shù),然后單擊下一步

        配置項

        參數(shù)

        說明

        Worker規(guī)格

        Worker規(guī)格

        選擇合適的Worker規(guī)格。

        最小Worker數(shù)

        設(shè)置為1。

        最大Worker數(shù)

        設(shè)置為1。

        Kafka Connect Worker配置

        自動創(chuàng)建Kafka Connect Worker依賴資源

        建議勾選此項,此時會在選擇的Kafka實例中自動創(chuàng)建Kafka Connect運行所需的一些Internal Topic以及ConsumerGroup,并將這些必填配置自動填入配置框中,包括以下配置項:

        • Offset Topic:用于存儲源數(shù)據(jù)偏移量,命名規(guī)則為connect-eb-offset-<任務(wù)名稱>

        • Config Topic:用于存儲Connectors以及Tasks的配置信息,命名規(guī)則為connect-eb-config-<任務(wù)名稱>

        • Status Topic:用于存儲Connectors以及Tasks狀態(tài)信息,命名規(guī)則為connect-eb-status-<任務(wù)名稱>

        • Kafka Connect Consumer Group:Kafka Connect Worker用于消費Internal Topics的消費組,命名規(guī)則為connect-eb-cluster-<任務(wù)名稱>

        • Kafka Source Connector Consumer Group:只針對Sink Connector有效,用于消費源Kafka Topic中的數(shù)據(jù),命名規(guī)則為connector-eb-cluster-<任務(wù)名稱>-<connector名稱>

      4. 運行配置配置向?qū)В瑢?b data-tag="uicontrol" id="uicontrol-g7e-c0c-q3o" class="uicontrol">日志投遞方式設(shè)置為投遞至SLS或者投遞至Kafka,在角色授權(quán)卡片設(shè)置Connect依賴的角色配置,然后單擊保存

        重要

        建議配置的角色包含AliyunSAEFullAccess權(quán)限,否則可能會導(dǎo)致任務(wù)運行失敗。

    • 任務(wù)屬性

      設(shè)置此任務(wù)的重試策略及死信隊列。更多信息,請參見重試和死信

    等待任務(wù)狀態(tài)變?yōu)?b data-tag="uicontrol" id="uicontrol-vtx-cm0-vwk" class="uicontrol">運行中,此時Connector已經(jīng)在正常工作中。

Sink Connector

  1. 下載MongoDB Connector1.8.1版本文件,解壓至本地,目錄結(jié)構(gòu)如下所示。

    重要

    下載MongoDB Connector文件時請選擇適配Java 8的版本。

    .
    ├── assets
    │   ├── mongodb-leaf.png
    │   └── mongodb-logo.png
    ├── doc
    │   ├── LICENSE.txt
    │   └── README.md
    ├── etc
    │   ├── MongoSinkConnector.properties
    │   └── MongoSourceConnector.properties
    ├── lib
    │   └── mongo-kafka-connect-1.9.1-confluent.jar
    └── manifest.json
  2. Maven倉庫中下載avro-1.9.2.jar和mongodb-driver-sync-4.9.0.jar,將這兩個jar包移動至MongoDB Connector文件夾中的lib目錄下,然后將其壓縮成.ZIP文件,上傳至提前創(chuàng)建好的OSS Bucket。更多信息,請參見控制臺上傳文件

  3. 登錄云消息隊列 Kafka 版控制臺,在概覽頁面的資源分布區(qū)域,選擇地域。

  4. 在左側(cè)導(dǎo)航欄,選擇Connector生態(tài)集成 > 任務(wù)列表

  5. 任務(wù)列表頁面,單擊創(chuàng)建任務(wù)列表

  6. 創(chuàng)建任務(wù)面板。設(shè)置任務(wù)名稱,配置以下配置項。

    • 任務(wù)創(chuàng)建

      1. Source(源)配置向?qū)Вx擇數(shù)據(jù)提供方Apache Kafka Connect,單擊下一步

      2. 連接器配置配置向?qū)ВO(shè)置以下配置項,然后單擊下一步

        配置項

        參數(shù)

        說明

        Kafka Connect插件

        Bucket存儲桶

        選擇OSS Bucket。

        文件

        選擇上傳的.ZIP文件。

        Kafka資源信息

        Kafka參數(shù)配置

        選擇Sink Connect。

        Kafka實例

        選擇前提條件中創(chuàng)建的實例。

        專有網(wǎng)絡(luò)VPC

        選擇VPC ID。

        交換機

        選擇vSwitch ID。

        安全組

        選擇安全組。

        Kafka Connect配置信息

        解析當前ZIP包下的properties文件

        選擇新建properties文件。選擇.ZIP文件中包含的SinkConnector對應(yīng)的.properties文件。路徑為/etc/MongoSinkConnector.properties。

        在輸入框中更新相關(guān)字段的取值。展開查看字段描述

        字段名

        描述

        connector.class

        運行的Connector包名稱,無需修改。

        tasks.max

        Task的最大數(shù)量。

        topics

        數(shù)據(jù)源Topic名稱。不同Topic間以英文逗號(,)分隔。

        connection.url

        數(shù)據(jù)庫連接地址。填寫步驟一:創(chuàng)建數(shù)據(jù)表中獲取的專有網(wǎng)絡(luò)連接地址。地址中的****需替換為root賬號的密碼。

        database

        目標MongoDB數(shù)據(jù)庫名稱。

        connection

        目標MongoDB集合名稱。

        展開查看示例代碼

        connector.class=com.mongodb.kafka.connect.MongoSinkConnector
        name=mongo-sink
        collection=testCollection
        connection.uri=mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
        database=testDatabase
        delete.on.null.values=false
        key.converter=org.apache.kafka.connect.storage.StringConverter
        key.converter.schemas.enable=false
        max.batch.size=0
        rate.limiting.every.n=0
        rate.limiting.timeout=0
        tasks.max=2
        topics=testTopic
        value.converter=org.apache.kafka.connect.storage.StringConverter
        value.converter.schemas.enable=true
        writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

        Connector全量參數(shù),請參見MongoDB Sink Connector Properties

      3. 實例配置配置向?qū)ВO(shè)置以下參數(shù),然后單擊下一步

        配置項

        參數(shù)

        說明

        Worker規(guī)格

        Worker規(guī)格

        選擇合適的Worker規(guī)格。

        最小Worker數(shù)

        設(shè)置為1。

        最大Worker數(shù)

        設(shè)置為1。此值不能超過tasks.max的取值。

        Kafka Connect Worker 配置

        自動創(chuàng)建Kafka Connect Worker依賴資源

        建議勾選此項,此時會在選擇的Kafka實例中自動創(chuàng)建Kafka Connect運行所需的一些Internal Topic以及ConsumerGroup,并將這些必填配置自動填入配置框中,包括以下配置項:

        • Offset Topic:用于存儲源數(shù)據(jù)偏移量,命名規(guī)則為connect-eb-offset-<任務(wù)名稱>

        • Config Topic:用于存儲Connectors以及Tasks的配置信息,命名規(guī)則為connect-eb-config-<任務(wù)名稱>

        • Status Topic:用于存儲Connectors以及Tasks狀態(tài)信息,命名規(guī)則為connect-eb-status-<任務(wù)名稱>

        • Kafka Connect Consumer Group:Kafka Connect Worker用于消費Internal Topics的消費組,命名規(guī)則為connect-eb-cluster-<任務(wù)名稱>

        • Kafka Source Connector Consumer Group:只針對Sink Connector有效,用于消費源Kafka Topic中的數(shù)據(jù),命名規(guī)則為connector-eb-cluster-<任務(wù)名稱>-<connector名稱>

      4. 運行配置配置向?qū)В瑢?b data-tag="uicontrol" id="28bad2e183dw5" class="uicontrol">日志投遞方式設(shè)置為投遞至SLS或者投遞至Kafka,在角色授權(quán)卡片設(shè)置Connect依賴的角色配置,然后單擊保存

        重要

        建議配置的角色包含AliyunSAEFullAccess權(quán)限,否則可能會導(dǎo)致任務(wù)運行失敗。

    • 任務(wù)屬性

      設(shè)置此任務(wù)的重試策略及死信隊列。更多信息,請參見重試和死信

    等待任務(wù)狀態(tài)變?yōu)?b data-tag="uicontrol" id="uicontrol-m6v-xxa-n1r" class="uicontrol">運行中,此時Connector已經(jīng)在正常工作中。

步驟三:測試Connector

Source Connector

  1. 在DMS數(shù)據(jù)管理服務(wù)平臺,向步驟一:創(chuàng)建數(shù)據(jù)表中創(chuàng)建的Connection插入一條數(shù)據(jù)。例如,插入一條Key為testKey,Value為testValue的數(shù)據(jù),命令如下。

    db.testCollection.insert({"testKey":"testValue"})
  2. 登錄云消息隊列 Kafka 版控制臺,在實例列表頁面,單擊目標實例。

  3. 在目標實例頁面,單擊目標Topic,然后單擊消息查詢,查看插入的消息數(shù)據(jù),消息Value示例如下。

    {"_id": {"_data": "826464A63D000000012B022C0100296E5A1004CB11AB15FD6D4C409E37370B43A4B82246645F696400646464A624458CE6B7B626645B****"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1684317757, "i": 1}}, "fullDocument": {"_id": {"$oid": "6464a624458ce6b7b626****"}, "testKey": "testValue"}, "ns": {"db": "test", "coll": "mongo"}, "documentKey": {"_id": {"$oid": "6464a624458ce6b7b626****"}}}

Sink Connector

  1. 登錄云消息隊列 Kafka 版控制臺,在實例列表頁面,單擊目標實例。

  2. 在左側(cè)導(dǎo)航欄,單擊Topic管理,然后單擊目標Topic。

  3. 在Topic詳情頁面右上角,單擊體驗發(fā)送消息

  4. 快速體驗消息收發(fā)面板,設(shè)置消息內(nèi)容。例如在目標表格中添加一條Key為Key1,Value為Value1的數(shù)據(jù),消息內(nèi)容如下。

    {"key1": "value1"}
  5. 在DMS數(shù)據(jù)管理服務(wù)平臺,執(zhí)行以下命令,查看目標集合中接收的數(shù)據(jù)。

    db.mongo.find()

    接收數(shù)據(jù)示例如下:

    {
        "_id":"ObjectId("643f4d5551daf4552246****")"
        "_insertedTS":"ISODate("2023-05-18T02:09:25.314Z")"
        "_modifiedTS":"ISODate("2023-05-18T02:09:25.314Z")"
        "key1":"value1"
    }

常見報錯

場景一:所有Tasks運行失敗

錯誤信息:

All tasks under connector mongo-source failed, please check the error trace of the task.

解決方法:在消息流入任務(wù)詳情頁面,單擊基礎(chǔ)信息區(qū)域的診斷鏈接,即可跳轉(zhuǎn)到Connector監(jiān)控頁面,可以看到Tasks運行失敗的詳細錯誤信息。

場景二:Kafka Connect退出

錯誤信息:

Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.

解決方法:由于狀態(tài)獲取可能會有延遲,建議您先嘗試刷新頁面。若刷新后仍然是失敗狀態(tài),您可以按照以下步驟查看錯誤信息。

  1. 在消息流入任務(wù)詳情頁面的Worker信息區(qū)域,單擊SAE應(yīng)用后的實例名稱,跳轉(zhuǎn)到SAE應(yīng)用詳情頁面。

  2. 基本信息頁面,單擊實例部署信息頁簽。

  3. 在實例右側(cè)操作列,單擊Webshell登錄Kafka Connect運行環(huán)境。實例部署信息

    • 執(zhí)行vi /home/admin/connector-bootstrap.log命令,查看Connector啟動日志,查找其中是否包含錯誤信息。

    • 執(zhí)行vi /opt/kafka/logs/connect.log命令,查看Connector運行日志,在其中查找ERROR或者WARN字段來查看是否有錯誤信息。

基于錯誤信息提示進行修復(fù)操作后,可以重新啟動對應(yīng)任務(wù)。

場景三:Connector參數(shù)校驗失敗

錯誤信息:

Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

解決方法:此時需要根據(jù)錯誤信息,找出具體哪個參數(shù)出錯,更新對應(yīng)參數(shù)即可。若基于上述錯誤信息無法定位具體的出錯參數(shù),可以參考上文場景二中的步驟登錄Kafka Connect運行環(huán)境,執(zhí)行以下命令,查詢參數(shù)是否校驗通過。

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

該指令會返回Connector參數(shù)中每個參數(shù)是否校驗通過,若不通過,則errors屬性非空,如下所示。

"value":{
    "name":"snapshot.mode",
    "value":null,
    "recommended_values":[
        "never",
        "initial_only",
        "when_needed",
        "initial",
        "schema_only",
        "schema_only_recovery"
    ],
    "errors":[
        "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
    ],
    "visible":true
}

場景四:無法連接服務(wù)器

錯誤信息:

Start or update connector mongo-source failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):Unable to connect to the server.

解決方法:請查看Connector的配置信息是否正確,然后檢查MongoDB實例與Kafka實例是否在同一個VPC及vSwitch下,如果不在一個vSwitch但是在同一VPC下,那么需要將Kafka實例所在的交換機網(wǎng)段加入MongoDB實例的白名單。

場景五:數(shù)據(jù)庫用戶名或者密碼信息包含特殊字符

錯誤信息:

The connection string contains invalid user information. If the username or password contains a colon (:) or an at-sign (@) then it must be urlencoded\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

解決方法:請檢查MongoDB賬戶名和密碼中是否包含有at(@)或冒號(:),如有請轉(zhuǎn)義對應(yīng)的符號。at(@)和冒號(:)進行16進制URL編碼后分別為%40%3A