阿里云流式數據服務DataHub是流式數據(Streaming Data)的處理平臺,提供對流式數據的發布、訂閱和分發功能,讓您可以輕松構建基于流式數據的分析和應用。通過數據傳輸服務(Data Transmission Service,簡稱DTS),您可以將PolarDB MySQL版同步至DataHub,幫助您快速實現使用流計算等大數據產品對數據實時分析。

前提條件

  • DataHub實例的地域為華東1、華東2、華北2或華南1。
  • DataHub實例中,已創建用作接收同步數據的項目(Project),詳情請參見創建項目
  • PolarDB MySQL版集群已開啟Binlog,詳情請參見如何開啟Binlog
  • PolarDB MySQL版中待同步的表需具備主鍵或唯一約束。

費用說明

同步類型鏈路配置費用
庫表結構同步和全量數據同步不收費。
增量數據同步收費,詳情請參見計費概述

功能限制

  • 不支持全量數據初始化,即DTS不會將源PolarDB MySQL版集群中同步對象的存量數據同步至目標DataHub實例。
  • 僅支持表級別的數據同步。
  • 不支持新增列的數據同步,即源數據表新增了某個列,該列的數據不會同步至目標DataHub實例。
  • 數據同步的過程中,請勿對源庫中待同步的表執行DDL變更,否則會導致同步失敗。

支持同步的SQL操作

INSERT、UPDATE、DELETE。

操作步驟

  1. 購買數據同步作業,詳情請參見購買流程
    說明 購買時,選擇源實例為PolarDB、目標實例為DataHub,并選擇同步拓撲為單向同步
  2. 登錄數據傳輸控制臺
    說明 若數據傳輸控制臺自動跳轉至數據管理DMS控制臺,您可以在右下角的jiqiren中單擊返回舊版,返回至舊版數據傳輸控制臺。
  3. 在左側導航欄,單擊數據同步
  4. 同步作業列表頁面頂部,選擇同步的目標實例所屬地域。
  5. 定位至已購買的數據同步實例,單擊配置同步鏈路
  6. 配置同步作業的源實例及目標實例信息。
    配置源和目標實例信息
    類別配置說明
    同步作業名稱DTS會自動生成一個同步作業名稱,建議配置具有業務意義的名稱(無唯一性要求),便于后續識別。
    源實例信息實例類型固定為PolarDB實例,不可變更。
    實例地區購買數據同步實例時選擇的源實例地域信息,不可變更。
    PolarDB實例ID選擇源PolarDB MySQL版集群ID。
    數據庫賬號填入PolarDB MySQL版集群的數據庫賬號。
    數據庫密碼填入數據庫賬號對應的密碼。
    目標實例信息實例類型固定為DataHub,不可變更。
    實例地區購買數據同步實例時選擇的目標實例地域信息,不可變更。
    Project選擇DataHub實例的Project
  7. 單擊頁面右下角的授權白名單并進入下一步
    說明
    • 如果源或目標數據庫是阿里云數據庫實例(例如RDS MySQL云數據庫MongoDB版等)或ECS上的自建數據庫,DTS會自動將對應地區DTS服務的IP地址添加到阿里云數據庫實例的白名單或ECS的安全規則中,您無需手動添加,請參見DTS服務器的IP地址段
    • DTS任務完成或釋放后,建議您手動刪除添加的DTS服務器IP地址段。
  8. 配置同步策略和同步對象。
    配置同步對象
    配置說明
    同步初始化勾選結構初始化
    說明 勾選結構初始化后,在數據同步作業的初始化階段,DTS會將同步對象的結構信息(例如表結構)同步至目標DataHub實例。
    選擇同步對象

    源庫對象框中單擊待遷移的對象,然后單擊向右小箭頭將其移動至已選擇對象框。

    說明
    • 同步對象的選擇粒度為表。
    • 默認情況下,同步對象的名稱保持不變。如果您需要改變同步對象在目標實例中的名稱,需要使用對象名映射功能,詳情請參見設置同步對象在目標實例中的名稱
    選擇附加列規則DTS在將數據同步到DataHub時,會在同步的目標Topic中添加一些附加列。如果附加列和目標Topic中已有的列出現名稱沖突將會導致數據同步失敗。您需要根據業務需求選擇是否啟用新的附加列規則
    警告 在選擇附加列規則前,您需要評估附加列和目標Topic中已有的列是否會出現名稱沖突。關于附加列的規則和定義說明,請參見附加列名稱和定義說明
    映射名稱更改

    如需更改同步對象在目標實例中的名稱,請使用對象名映射功能,詳情請參見庫表列映射

    源表DMS_ONLINE_DDL過程中是否復制臨時表到目標庫
    如源庫使用數據管理DMS(Data Management Service)執行Online DDL變更,您可以選擇是否同步Online DDL變更產生的臨時表數據。
    • :同步Online DDL變更產生的臨時表數據。
      說明 Online DDL變更產生的臨時表數據過大,可能會導致同步任務延遲。
    • :不同步Online DDL變更產生的臨時表數據,只同步源庫的原始DDL數據。
      說明 該方案會導致目標庫鎖表。
    源、目標庫無法連接重試時間
    當源、目標庫無法連接時,DTS默認重試720分鐘(即12小時),您也可以自定義重試時間。如果DTS在設置的時間內重新連接上源、目標庫,同步任務將自動恢復。否則,同步任務將失敗。
    說明 由于連接重試期間,DTS將收取任務運行費用,建議您根據業務需要自定義重試時間,或者在源和目標庫實例釋放后盡快釋放DTS實例。
  9. 可選:將鼠標指針放置在已選擇對象框中待同步的Topic名上,單擊對象后出現的編輯,然后在彈出的對話框中設置Shardkey(即用于分區的key)。
  10. 上述配置完成后,單擊頁面右下角的預檢查并啟動
    說明
    • 在同步作業正式啟動之前,會先進行預檢查。只有預檢查通過后,才能成功啟動同步作業。
    • 如果預檢查失敗,單擊具體檢查項后的提示,查看失敗詳情。
      • 您可以根據提示修復后重新進行預檢查。
      • 如無需修復告警檢測項,您也可以選擇確認屏蔽忽略告警項并重新進行預檢查,跳過告警檢測項重新進行預檢查。
  11. 預檢查對話框中顯示預檢查通過后,關閉預檢查對話框,同步作業將正式開始。
  12. 等待同步作業的鏈路初始化完成,直至處于同步中狀態。
    您可以在數據同步頁面,查看數據同步作業的狀態。查看同步作業狀態

Topic結構定義說明

DTS在將數據變更同步至DataHub實例的Topic時,目標Topic中除了存儲變更數據外,還會新增一些附加列用于存儲元信息,示例如下。
說明 本案例中的業務字段為idnameaddress,由于在配置數據同步時選用的是舊版附加列規則,DTS會為業務字段添加dts_的前綴。
Topic定義

結構定義說明:

舊版附加列名稱新版附加列名稱數據類型說明
dts_record_idnew_dts_sync_dts_record_idString增量日志的記錄ID,為該日志唯一標識。
說明
  • 正常情況下是全局唯一自增的,在容災的情況下會有回退且無法保證自增和唯一。
  • 如果增量日志的操作類型為UPDATE,那么增量更新會被拆分成兩條記錄(分別記錄更新前和更新后的值),且dts_record_id的值相同。
dts_operation_flagnew_dts_sync_dts_operation_flagString操作類型,取值:
  • I:INSERT操作。
  • D:DELETE操作。
  • U:UPDATE操作。
dts_instance_idnew_dts_sync_dts_instance_idString數據庫的server ID。
dts_db_namenew_dts_sync_dts_db_nameString數據庫名稱。
dts_table_namenew_dts_sync_dts_table_nameString表名。
dts_utc_timestampnew_dts_sync_dts_utc_timestampString操作時間戳,即日志的時間戳(UTC 時間)。
dts_before_flagnew_dts_sync_dts_before_flagString所有列的值是否更新前的值,取值:Y或N。
dts_after_flagnew_dts_sync_dts_after_flagString所有列的值是否更新后的值,取值:Y或N。

關于dts_before_flag和dts_after_flag的補充說明

對于不同的操作類型,增量日志中的dts_before_flagdts_after_flag定義如下:

  • INSERT

    當操作類型為INSERT時,所有列的值為新插入的值,即為更新后的值,所以dts_before_flag取值為N,dts_after_flag取值為Y,示例如下。

    INSERT操作
  • UPDATE

    當操作類型為UPDATE時,DTS會將UPDATE操作拆為兩條增量日志。這兩條增量日志的dts_record_iddts_operation_flagdts_utc_timestamp對應的值相同。

    第一條增量日志記錄了更新前的值,所以dts_before_flag取值為Y,dts_after_flag取值為N。第二條增量日志記錄了更新后的值,所以 dts_before_flag取值為N,dts_after_flag取值為Y,示例如下。

    UPDATE操作
  • DELETE

    當操作類型為DELETE時,增量日志中所有的列值為被刪除的值,即列值不變,所以dts_before_flag取值為Y, dts_after_flag取值為N,示例如下。

    DELETE操作

后續操作

配置完數據同步作業后,您可以對同步到DataHub實例中的數據執行計算分析。更多詳情,請參見阿里云實時計算