本文為您介紹如何使用數據傳輸同步 OceanBase 數據庫的數據至 RocketMQ。
背景信息
消息隊列 RocketMQ 是阿里云基于 Apache RocketMQ 構建的低延遲、高并發、高可靠的分布式消息中間件。數據傳輸的數據同步功能可以幫助您實現 OceanBase 數據庫的物理表和 RocketMQ 數據源之間的數據實時同步,擴展消息處理能力。
同步 OceanBase 數據庫的數據至 RocketMQ 時,兩種租戶對應的數據格式請參見 數據庫傳輸到文本協議的格式說明。
前提條件
數據傳輸已具備云資源訪問權限。詳情請參見 數據傳輸遷移角色授權。
已為源端 OceanBase 數據庫創建專用于數據同步任務的數據庫用戶,并為其賦予了相關權限。詳情請參見 創建數據庫用戶。
使用限制
數據傳輸支持的 RocketMQ 實例版本為 V4.x 和 V5.x,包含商業版和社區版。
數據同步的對象僅支持物理表,不支持其他對象。
數據同步過程中,數據傳輸支持刪除表之后再新建表,即支持對已經同步的表進行
DROP TABLE
操作后,再執行CREATE TABLE
。數據傳輸不支持通過重命名的方式新建表,即不支持執行RENAME TABLE a TO a_tmp
操作。待同步的表名和其中的列名不能包含中文字符。
數據傳輸僅支持遷移庫名、表名和列名為 ASCII 碼且不包含特殊字符(包括換行、空格,以及 .|"'`()=;/&\)的對象。
數據傳輸不支持 OceanBase 備庫作為源端。
注意事項
當 OceanBase 數據庫 V4.x 進行增量同步時,如果生成列沒有標記 STORED 屬性,則同步目標端時該列將同步為 NULL 值,導致下游接收到的該列數據不符合預期。
當更新的行包括 LOB 列時:
如果 LOB 列為更新列,請勿依賴 LOB 列在
UPDATE
或DELETE
操作前的值。目前使用 LOB 列進行存儲的數據類型包括 JSON、GIS、XML、UDT(用戶定義類型),以及 LONGTEXT、MEDIUMTEXT 等各類 TEXT。
如果 LOB 列為非更新列,LOB 列在
UPDATE
或DELETE
操作前或操作后的值均為 NULL。
當任務意外中斷進行斷點續傳時,RocketMQ 實例中可能會存在部分重復數據(最近一分鐘內),因此下游系統需要具備排重能力。
節點之間的時鐘不同步,或者電腦終端和服務器之間的時鐘不同步,均可能導致增量同步的延遲時間不準確。
例如,如果時鐘早于標準時間,可能導致延遲時間為負數。如果時鐘晚于標準時間,可能導致延遲。
如果在創建數據同步任務時,您僅配置了 增量同步,數據傳輸要求源端數據庫的本地增量日志保存 48 小時以上。
如果在創建數據同步任務時,您配置了 全量同步+增量同步,數據傳輸要求源端數據庫的本地增量日志至少保留 7 天以上。否則數據傳輸可能因無法獲取增量日志而導致數據同步任務失敗,甚至導致源端和目標端數據不一致。
支持的源端和目標端實例類型
下表中,OceanBase 數據庫 MySQL 租戶簡稱為 OB_MySQL,OceanBase 數據庫 Oracle 租戶簡稱為 OB_Oracle。
源端 | 目標端 |
OB_MySQL(OceanBase 集群實例) | RocketMQ(阿里云 RocketMQ 實例) |
OB_MySQL(OceanBase 集群實例) | RocketMQ(VPC 內自建 RocketMQ 實例) |
OB_MySQL(OceanBase 集群實例) | RocketMQ(公網 RocketMQ 實例) |
OB_MySQL(Serverless 實例) | RocketMQ(阿里云 RocketMQ 實例) |
OB_MySQL(Serverless 實例) | RocketMQ(VPC 內自建 RocketMQ 實例) |
OB_MySQL(Serverless 實例) | RocketMQ(公網 RocketMQ 實例) |
OB_Oracle(OceanBase 集群實例) | RocketMQ(阿里云 RocketMQ 實例) |
OB_Oracle(OceanBase 集群實例) | RocketMQ(VPC 內自建 RocketMQ 實例) |
OB_Oracle(OceanBase 集群實例) | RocketMQ(公網 RocketMQ 實例) |
操作步驟
登錄 OceanBase 管理控制臺,購買數據同步任務。
詳情請參見 購買數據同步任務。
在 數據傳輸 > 數據同步 頁面,單擊新購買的數據同步任務后的 配置。
如果您需要引用已有的任務配置信息,可以單擊 引用配置。詳情請參見 引用和清空數據同步任務配置。
在 選擇源和目標 頁面,配置各項參數。
參數
描述
同步任務名稱
建議使用中文、數字和字母的組合。名稱中不能包含空格,長度不能超過 64 個字符。
源端
如果您已新建 OceanBase 數據源,請從下拉列表中進行選擇。如果未新建,請單擊下拉列表中的 新建數據源,在右側對話框進行新建。參數詳情請參見 新建 OceanBase 數據源。
目標端
如果您已新建 RocketMQ 數據源,請從下拉列表中進行選擇。如果未新建,請單擊下拉列表中的 新建數據源,在右側對話框進行新建。參數詳情請參見 新建 RocketMQ 數據源。
標簽(可選)
單擊文本框,在下拉列表中選擇目標標簽。您也可以單擊 管理標簽,進行新建、修改和刪除。詳情請參見 通過標簽管理數據同步任務。
單擊 下一步。在 選擇同步類型 頁面,選擇當前數據同步任務的同步類型。
同步類型包括 全量同步 和 增量同步。增量同步 僅支持 DML 同步(包括
Insert
、Delete
和Update
),您可以根據需求進行自定義配置。詳情請參見 自定義配置 DDL/DML。單擊 下一步。在 選擇同步對象 頁面,選擇當前數據同步任務需要同步的對象。
您可以通過 指定對象 和 匹配規則 兩個入口選擇同步對象。本文為您介紹通過 指定對象 方式選擇同步對象的具體操作,配置匹配規則的詳情請參見 配置匹配規則 中庫到消息隊列的通配規則說明和配置方式。
說明如果您在 選擇同步類型 步驟已勾選 DDL 同步,建議通過匹配規則方式選擇同步對象,以確保所有符合同步對象規則的新增對象都將被同步。如果您通過指定對象方式選擇同步對象,則新增對象或重命名后的對象將不會被同步。
同步 OceanBase 數據庫的數據至 RocketMQ 時,支持多表到多 Topic 的同步。
在 選擇同步對象 區域,選中 指定對象。
在選擇區域左側選中需要同步的對象。
單擊 >。
在 將對象映射至 Topic 對話框的 已有 Topic 下拉列表中,搜索并選中需要同步的 Topic。
您也可以輸入已有 Topic 后選中顯示的 Topic 名稱。
單擊 確定。
數據傳輸支持通過文本導入對象,并支持對目標端對象進行更改 Topic、設置行過濾、移除單個對象或全部對象等操作。目標端對象的結構為 Topic>Database>Table。
說明通過 匹配規則 方式選擇同步對象時,重命名能力由匹配規則語法覆蓋,操作處僅支持設置過濾條件,以及選擇分片列和需要同步的列。詳情請參見 配置匹配規則。
操作
步驟
導入對象
在選擇區域的右側列表中,單擊右上角的 導入對象。
在對話框中,單擊 確定。
重要導入會覆蓋之前的操作選擇,請謹慎操作。
在 導入同步對象 對話框中,導入需要同步的對象。 您可以通過導入 CSV 文件的方式進行設置行過濾條件、設置過濾列和設置分片列等操作。詳情請參見 下載和導入同步對象配置。
單擊 檢驗合法性。
通過合法性的檢驗后,單擊 確定。
更改 Topic
數據傳輸支持對目標對象進行更改 Topic 操作。詳情請參見 更改 Topic。
設置
數據傳輸支持
WHERE
條件實現行過濾,并選擇分片列和需要同步的列。在 設置 對話框中,您可以進行以下操作。
在 行過濾條件 區域的文本框中,輸入標準的 SQL 語句中的
WHERE
子句,來配置行過濾。詳情請參見 SQL 條件過濾數據。在 分片列 下拉列表中,選擇目標分片列。您可以選擇多個字段作為分片列,該參數為可選參數。
選擇分片列時,如果沒有特殊情況,默認選擇主鍵即可。如果存在主鍵負載不均衡的情況,請選擇唯一性標識且負載相對均衡的字段作為分片列,避免潛在的性能問題。分片列的主要作用如下:
負載均衡:在目標端可以進行并發寫入的情況下,通過分片列區分發送消息需要使用的特定線程。
有序性:由于存在并發寫入可能導致的無序問題,數據傳輸確保在分片列的值相同的情況下,用戶接收到的消息是有序的。此處的有序是指變更順序(DML 對于一列的執行順序)。
在 選擇列 區域,選擇需要同步的列。詳情請參見 列過濾。
移除/全部移除
數據傳輸支持在數據映射時,對暫時選中到目標端的單個或多個對象進行移除操作。
移除單個同步對象
在選擇區域的右側列表中,鼠標懸停至目標對象,單擊顯示的 移除,即可移除該同步對象。
移除全部同步對象
在選擇區域的右側列表中,單擊右上角的 全部移除。在對話框中,單擊 確定,即可移除全部同步對象。
單擊 下一步。在 同步選項 頁面,配置各項參數。
全量同步
在 選擇同步類型 頁面,選中 全量同步,才會顯示下述參數。
參數
描述
讀取并發配置
該參數用于配置全量同步階段從源端讀取數據的并發數,最大限制為 512.并發數過高可能會造成源端壓力過大,影響業務。
寫入并發配置
該參數用于配置全量同步階段往目標端寫入數據的并發數,最大限制為 512。并發數過高可能會造成目標端壓力過大,影響業務。
全量同步速率限制
您可以根據實際需求決定是否開啟全量同步速率限制。如果開啟,請設置 RPS(全量同步階段每秒最多可以同步至目標端的數據行數的最大值限制)和 BPS(全量同步階段每秒最多可以同步至目標端的數據量的最大值限制)。
說明此處設置的 RPS 和 BPS 僅作為限速限流能力,全量同步實際可以達到的性能受限于源端、目標端、實例規格配置等因素的影響。
增量同步
在 選擇同步類型 頁面,選中 增量同步,才會顯示下述參數。
參數
描述
寫入并發配置
該參數用于配置增量同步階段往目標端寫入數據的并發數,最大限制為 512。并發數過高可能會造成目標端壓力過大,影響業務。
增量同步速率限制
您可以根據實際需求決定是否開啟增量同步速率限制。如果開啟,請設置 RPS(增量同步階段每秒最多可以同步至目標端的數據行數的最大值限制)和 BPS(增量同步階段每秒最多可以同步至目標端的數據量的最大值限制)。
說明此處設置的 RPS 和 BPS 僅作為限速限流能力,增量同步實際可以達到的性能受限于源端、目標端、實例規格配置等因素的影響。
增量同步起始位點
如果選擇同步類型時已選擇 全量同步,則不支持修改該參數。
如果選擇同步類型時未選擇 全量同步,但選擇了 增量同步,請在此處指定同步某個時間節點之后的數據,默認為當前系統時間。詳情請參見 設置增量同步位點。
高級選項
參數
描述
序列化方式
控制數據同步至 RocketMQ 的消息格式,目前支持 Default、Canal、DataWorks(支持 2.0 版本)、SharePlex、DefaultExtendColumnType、Debezium、DebeziumFlatten 和 DebeziumSmt。詳情請參見 數據格式說明。
重要目前僅 OceanBase 數據庫 MySQL 租戶支持 Debezium、DebeziumFlatten 和 DebeziumSmt。
分區規則
同步源端數據至 RocketMQ 的規則,目前僅支持 Hash。Hash 表示數據傳輸使用一定的 Hash 算法,根據主鍵值或分片列值 Hash 選擇 RocketMQ 的隊列(MessageQueue)。
業務系統標識(可選)
僅選擇 序列化方式 為 DataWorks 時,會顯示該參數,用于標識數據的業務系統來源,以便您后續進行自定義處理。該業務系統標識的長度限制為 1~20 個字符。
目標端
參數
描述
請輸入生產者群組
生產者群組用于標識一組生產者,能夠向多個 Topic 中寫入數據。
是否允許消息追蹤
如果允許消息追蹤,則可以追蹤到一條消息從生產者發送到消息隊列 RocketMQ 版服務端,再到消費者消費處理,整個過程中的各個相關節點的時間、狀態等數據匯聚而成的完整任務信息。該消息軌跡可以作為排查生產環境中的問題強有力的數據支持。
單擊 預檢查。
在 預檢查 環節,數據傳輸會檢測源端和目標端的連接情況。如果預檢查報錯:
您可以在排查并處理問題后,重新執行預檢查,直至預檢查成功。
您也可以單擊錯誤預檢查項操作列中的 跳過,會彈出對話框提示您跳過本操作的具體影響,確認可以跳過后,請單擊對話框中的 確定。
預檢查成功后,單擊 啟動任務。
如果您暫時無需啟動任務,請單擊 保存。后續您只能在 同步任務列表 頁面手動啟動任務或通過批量操作啟動任務。批量操作的詳情請參見 批量操作數據同步任務。
數據傳輸支持在數據同步任務運行過程中修改同步對象,詳情請參見 查看和修改同步對象及其過濾條件。數據同步任務啟動后,會根據選擇的同步類型依次執行,詳情請參見 查看同步詳情。
如果數據同步任務運行報錯(通常由于網絡不通或進程啟動過慢導致),您可以在數據同步任務的列表或詳情頁面,單擊 恢復。