本文介紹如何創建MySQL Source Connector,通過DataWorks將數據從阿里云數據庫RDS MySQL版導出至云消息隊列 Kafka 版實例的Topic。

前提條件

在導出數據前,請確保您已完成以下操作:
  • 云消息隊列 Kafka 版實例開啟Connector。更多信息,請參見開啟Connector
    重要 請確保您的云消息隊列 Kafka 版實例部署在華南1(深圳)、西南1(成都)、華北2(北京)、華北3(張家口)、華東1(杭州)、華東2(上海)或新加坡地域。
  • 創建RDS MySQL實例
  • 創建數據庫和賬號
  • 創建數據庫表。常見的SQL語句,請參見常用語句
  • 阿里云賬號和RAM用戶均須授予DataWorks訪問您彈性網卡ENI資源的權限。授予權限,請訪問云資源訪問授權
    重要 如果您使用的是RAM用戶,請確保您的賬號有以下權限:
    • AliyunDataWorksFullAccess:DataWorks所有資源的管理權限。
    • AliyunBSSOrderAccess:購買阿里云產品的權限。

    如何為RAM用戶添加權限策略,請參見步驟二:為RAM用戶添加權限

  • 請確保您是阿里云數據庫RDS MySQL版實例(數據源)和云消息隊列 Kafka 版實例(數據目標)的所有者,即創建者。
  • 請確保阿里云數據庫RDS MySQL版實例(數據源)和云消息隊列 Kafka 版實例(數據目標)所在的VPC網段沒有重合,否則無法成功創建同步任務。

背景信息

您可以在云消息隊列 Kafka 版控制臺創建數據同步任務,將您在阿里云數據庫RDS MySQL版數據庫表中的數據同步至云消息隊列 Kafka 版的Topic。該同步任務將依賴阿里云DataWorks產品實現,流程圖如下所示。mysql_connector

如果您在云消息隊列 Kafka 版控制臺成功創建了數據同步任務,那么阿里云DataWorks會自動為您開通DataWorks產品基礎版服務(免費)、新建DataWorks項目(免費)、并新建數據集成獨享資源組(需付費),資源組規格為4c8g,購買模式為包年包月,時長為1個月并自動續費。阿里云DataWorks的計費詳情,請參見DataWorks計費概述

此外,DataWorks會根據您數據同步任務的配置,自動為您生成云消息隊列 Kafka 版的目標Topic。數據庫表和Topic是一對一的關系,對于有主鍵的表,默認6分區;無主鍵的表,默認1分區。請確保實例剩余Topic數和分區數充足,不然任務會因為創建Topic失敗而導致異常。

Topic的命名格式為<配置的前綴>_<數據庫表名>,下劃線(_)為系統自動添加的字符。詳情如下圖所示。

table_topic_match

例如,您將前綴配置為mysql,需同步的數據庫表名為table_1,那么DataWorks會自動為您生成專用Topic,用來接收table_1同步過來的數據,該Topic的名稱為mysql_table_1;table_2的專用Topic名稱為mysql_table_2,以此類推。

注意事項

  • 地域說明
    • 如果數據源和目標實例位于不同地域,請確保您使用的賬號擁有云企業網實例,且云企業網實例已掛載數據源和目標實例所在的VPC,并配置好流量帶寬完成網絡打通。

      否則,可能會新建云企業網實例,并將目標實例和獨享資源組ECS全部掛載到云企業網實例來打通網絡。這樣的云企業網實例沒有配置帶寬,所以帶寬流量很小,可能導致創建同步任務過程中的網絡訪問出錯,或者同步任務創建成功后,在運行過程中出錯。

    • 如果數據源和目標實例位于同一地域,創建數據同步任務會自動在其中一個實例所在VPC創建ENI,并綁定到獨享資源組ECS上,以打通網絡。
  • DataWorks獨享資源組說明
    • DataWorks的每個獨享資源組可以運行最多3個同步任務。創建數據同步任務時,如果DataWorks發現您的賬號名下有資源組的歷史購買記錄,并且運行的同步任務少于3個,將使用已有資源組運行新建的同步任務。
    • DataWorks的每個獨享資源組最多綁定兩個VPC的ENI。如果DataWorks發現已購買的資源組綁定的ENI與需要新綁定的ENI有網段沖突,或者其他技術限制,導致使用已購買的資源組無法創建出同步任務,此時,即使已有的資源組運行的同步任務少于3個,也將新建資源組確保同步任務能夠順利創建。

創建并部署MySQL Source Connector

  1. 登錄云消息隊列 Kafka 版控制臺
  2. 概覽頁面的資源分布區域,選擇地域。
  3. 在左側導航欄,單擊Connector 任務列表
  4. Connector 任務列表頁面,從選擇實例的下拉列表選擇Connector所屬的實例,然后單擊創建 Connector
  5. 創建 Connector配置向導中,完成以下操作。
    1. 配置基本信息頁簽的名稱文本框,輸入Connector名稱,然后單擊下一步
      參數描述示例值
      名稱Connector的名稱。命名規則:
      • 可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字符。
      • 同一個云消息隊列 Kafka 版實例內保持唯一。

      Connector的數據同步任務必須使用名稱為connect-任務名稱Group。如果您未手動創建該Group,系統將為您自動創建。

      kafka-source-mysql
      實例默認配置為實例的名稱與實例ID。demo alikafka_post-cn-st21p8vj****
    2. 配置源服務頁簽,選擇數據源云數據庫RDS MySQL版,配置以下參數,然后單擊下一步
      參數描述示例值
      RDS 實例所在地域從下拉列表中,選擇阿里云數據庫RDS MySQL版實例所在的地域。華南1(深圳)
      云數據庫 RDS 實例 ID需要同步數據的阿里云數據庫RDS MySQL版的實例ID。rm-wz91w3vk6owmz****
      數據庫名稱需要同步的阿里云數據庫RDS MySQL版實例數據庫的名稱。mysql-to-kafka
      數據庫賬號需要同步的阿里云數據庫RDS MySQL版實例數據庫賬號。mysql_to_kafka
      數據庫賬號密碼需要同步的阿里云數據庫RDS MySQL版實例數據庫賬號的密碼。
      數據庫表需要同步的阿里云數據庫RDS MySQL版實例數據庫表的名稱,多個表名以英文逗號(,)分隔。

      數據庫表和目標Topic是一對一的關系。

      mysql_tbl
      自動添加數據表批量添加數據庫中的其他表。當創建的新表匹配成功時,也可被識別并同步數據。

      格式為正則表達式。例如,輸入.*,表示匹配數據庫中的所有表。

      .*
      Topic 前綴阿里云數據庫RDS MySQL版數據庫表同步到云消息隊列 Kafka 版的Topic的命名前綴,請確保前綴全局唯一。mysql
      重要
      請確保阿里云數據庫RDS MySQL版數據庫賬號有以下最小權限:
      • SELECT
      • REPLICATION SLAVE
      • REPLICATION CLIENT
      授權命令示例:
      GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '同步賬號'@'%'; //授予數據庫賬號的SELECT、REPLICATION SLAVE和REPLICATION CLIENT權限。
    3. 配置目標服務頁簽,顯示數據將同步到目標云消息隊列 Kafka 版實例,確認信息無誤后,單擊創建
  6. 創建完成后,在Connector 任務列表頁面,找到創建的Connector ,單擊其操作列的部署
    Connector 任務列表頁面,您可以看到創建的任務狀態運行中,則說明任務創建成功。
    說明 如果創建失敗,請再次檢查本文前提條件中的操作是否已全部完成。

    如需配置同步任務,單擊其操作列的任務配置,跳轉至DataWorks控制臺完成操作。

驗證結果

  1. 向阿里云數據庫RDS MySQL版數據庫表插入數據。
    示例如下。
    INSERT INTO mysql_tbl
        (mysql_title, mysql_author, submission_date)
        VALUES
        ("mysql2kafka", "tester", NOW())
    更多SQL語句,請參見常用語句
  2. 使用云消息隊列 Kafka 版提供的消息查詢功能,驗證數據能否被導出至云消息隊列 Kafka 版目標Topic。
    查詢的具體步驟,請參見查詢消息
    云數據庫RDS MySQL版數據庫表導出至云消息隊列 Kafka 版Topic的數據示例如下。消息結構及各字段含義,請參見附錄:消息格式
    {
        "schema":{
            "dataColumn":[
                {
                    "name":"mysql_id",
                    "type":"LONG"
                },
                {
                    "name":"mysql_title",
                    "type":"STRING"
                },
                {
                    "name":"mysql_author",
                    "type":"STRING"
                },
                {
                    "name":"submission_date",
                    "type":"DATE"
                }
            ],
            "primaryKey":[
                "mysql_id"
            ],
            "source":{
                "dbType":"MySQL",
                "dbName":"mysql_to_kafka",
                "tableName":"mysql_tbl"
            }
        },
        "payload":{
            "before":null,
            "after":{
                "dataColumn":{
                    "mysql_title":"mysql2kafka",
                    "mysql_author":"tester",
                    "submission_date":1614700800000
                }
            },
            "sequenceId":"1614748790461000000",
            "timestamp":{
                "eventTime":1614748870000,
                "systemTime":1614748870925,
                "checkpointTime":1614748870000
            },
            "op":"INSERT",
            "ddl":null
        },
        "version":"0.0.1"
    }