本文介紹如何創建MySQL Source Connector,通過DataWorks將數據從阿里云數據庫RDS MySQL版導出至云消息隊列 Kafka 版實例的Topic。
前提條件
- 為云消息隊列 Kafka 版實例開啟Connector。更多信息,請參見開啟Connector。重要 請確保您的云消息隊列 Kafka 版實例部署在華南1(深圳)、西南1(成都)、華北2(北京)、華北3(張家口)、華東1(杭州)、華東2(上海)或新加坡地域。
- 創建RDS MySQL實例。
- 創建數據庫和賬號。
- 創建數據庫表。常見的SQL語句,請參見常用語句。
- 阿里云賬號和RAM用戶均須授予DataWorks訪問您彈性網卡ENI資源的權限。授予權限,請訪問云資源訪問授權。重要 如果您使用的是RAM用戶,請確保您的賬號有以下權限:
- AliyunDataWorksFullAccess:DataWorks所有資源的管理權限。
- AliyunBSSOrderAccess:購買阿里云產品的權限。
如何為RAM用戶添加權限策略,請參見步驟二:為RAM用戶添加權限。
- 請確保您是阿里云數據庫RDS MySQL版實例(數據源)和云消息隊列 Kafka 版實例(數據目標)的所有者,即創建者。
- 請確保阿里云數據庫RDS MySQL版實例(數據源)和云消息隊列 Kafka 版實例(數據目標)所在的VPC網段沒有重合,否則無法成功創建同步任務。
背景信息
您可以在云消息隊列 Kafka 版控制臺創建數據同步任務,將您在阿里云數據庫RDS MySQL版數據庫表中的數據同步至云消息隊列 Kafka 版的Topic。該同步任務將依賴阿里云DataWorks產品實現,流程圖如下所示。如果您在云消息隊列 Kafka 版控制臺成功創建了數據同步任務,那么阿里云DataWorks會自動為您開通DataWorks產品基礎版服務(免費)、新建DataWorks項目(免費)、并新建數據集成獨享資源組(需付費),資源組規格為4c8g,購買模式為包年包月,時長為1個月并自動續費。阿里云DataWorks的計費詳情,請參見DataWorks計費概述。
此外,DataWorks會根據您數據同步任務的配置,自動為您生成云消息隊列 Kafka 版的目標Topic。數據庫表和Topic是一對一的關系,對于有主鍵的表,默認6分區;無主鍵的表,默認1分區。請確保實例剩余Topic數和分區數充足,不然任務會因為創建Topic失敗而導致異常。
Topic的命名格式為<配置的前綴>_<數據庫表名>
,下劃線(_)為系統自動添加的字符。詳情如下圖所示。
例如,您將前綴配置為mysql,需同步的數據庫表名為table_1,那么DataWorks會自動為您生成專用Topic,用來接收table_1同步過來的數據,該Topic的名稱為mysql_table_1;table_2的專用Topic名稱為mysql_table_2,以此類推。
注意事項
- 地域說明
- 如果數據源和目標實例位于不同地域,請確保您使用的賬號擁有云企業網實例,且云企業網實例已掛載數據源和目標實例所在的VPC,并配置好流量帶寬完成網絡打通。
否則,可能會新建云企業網實例,并將目標實例和獨享資源組ECS全部掛載到云企業網實例來打通網絡。這樣的云企業網實例沒有配置帶寬,所以帶寬流量很小,可能導致創建同步任務過程中的網絡訪問出錯,或者同步任務創建成功后,在運行過程中出錯。
- 如果數據源和目標實例位于同一地域,創建數據同步任務會自動在其中一個實例所在VPC創建ENI,并綁定到獨享資源組ECS上,以打通網絡。
- 如果數據源和目標實例位于不同地域,請確保您使用的賬號擁有云企業網實例,且云企業網實例已掛載數據源和目標實例所在的VPC,并配置好流量帶寬完成網絡打通。
- DataWorks獨享資源組說明
- DataWorks的每個獨享資源組可以運行最多3個同步任務。創建數據同步任務時,如果DataWorks發現您的賬號名下有資源組的歷史購買記錄,并且運行的同步任務少于3個,將使用已有資源組運行新建的同步任務。
- DataWorks的每個獨享資源組最多綁定兩個VPC的ENI。如果DataWorks發現已購買的資源組綁定的ENI與需要新綁定的ENI有網段沖突,或者其他技術限制,導致使用已購買的資源組無法創建出同步任務,此時,即使已有的資源組運行的同步任務少于3個,也將新建資源組確保同步任務能夠順利創建。
創建并部署MySQL Source Connector
- 登錄云消息隊列 Kafka 版控制臺。
- 在概覽頁面的資源分布區域,選擇地域。
- 在左側導航欄,單擊Connector 任務列表。
- 在Connector 任務列表頁面,從選擇實例的下拉列表選擇Connector所屬的實例,然后單擊創建 Connector。
- 在創建 Connector配置向導中,完成以下操作。
- 創建完成后,在Connector 任務列表頁面,找到創建的Connector ,單擊其操作列的部署。在Connector 任務列表頁面,您可以看到創建的任務狀態為運行中,則說明任務創建成功。說明 如果創建失敗,請再次檢查本文前提條件中的操作是否已全部完成。
如需配置同步任務,單擊其操作列的任務配置,跳轉至DataWorks控制臺完成操作。
驗證結果
- 向阿里云數據庫RDS MySQL版數據庫表插入數據。示例如下。
更多SQL語句,請參見常用語句。INSERT INTO mysql_tbl (mysql_title, mysql_author, submission_date) VALUES ("mysql2kafka", "tester", NOW())
- 使用云消息隊列 Kafka 版提供的消息查詢功能,驗證數據能否被導出至云消息隊列 Kafka 版目標Topic。查詢的具體步驟,請參見查詢消息。云數據庫RDS MySQL版數據庫表導出至云消息隊列 Kafka 版Topic的數據示例如下。消息結構及各字段含義,請參見附錄:消息格式。
{ "schema":{ "dataColumn":[ { "name":"mysql_id", "type":"LONG" }, { "name":"mysql_title", "type":"STRING" }, { "name":"mysql_author", "type":"STRING" }, { "name":"submission_date", "type":"DATE" } ], "primaryKey":[ "mysql_id" ], "source":{ "dbType":"MySQL", "dbName":"mysql_to_kafka", "tableName":"mysql_tbl" } }, "payload":{ "before":null, "after":{ "dataColumn":{ "mysql_title":"mysql2kafka", "mysql_author":"tester", "submission_date":1614700800000 } }, "sequenceId":"1614748790461000000", "timestamp":{ "eventTime":1614748870000, "systemTime":1614748870925, "checkpointTime":1614748870000 }, "op":"INSERT", "ddl":null }, "version":"0.0.1" }