阿里云流式數據服務DataHub是流式數據(Streaming Data)的處理平臺,提供對流式數據的發布、訂閱和分發功能,讓您可以輕松構建基于流式數據的分析和應用。通過數據傳輸服務(Data Transmission Service,簡稱DTS),您可以將PolarDB MySQL版同步至DataHub,幫助您快速實現使用流計算等大數據產品對數據實時分析。
前提條件
- DataHub實例的地域為華東1、華東2、華北2或華南1。
- DataHub實例中,已創建用作接收同步數據的項目(Project),詳情請參見創建項目。
- PolarDB MySQL版集群已開啟Binlog,詳情請參見如何開啟Binlog。
- PolarDB MySQL版中待同步的表需具備主鍵或唯一約束。
費用說明
同步類型 | 鏈路配置費用 |
---|---|
庫表結構同步和全量數據同步 | 不收費。 |
增量數據同步 | 收費,詳情請參見計費概述。 |
功能限制
- 不支持全量數據初始化,即DTS不會將源PolarDB MySQL版集群中同步對象的存量數據同步至目標DataHub實例。
- 僅支持表級別的數據同步。
- 不支持新增列的數據同步,即源數據表新增了某個列,該列的數據不會同步至目標DataHub實例。
- 數據同步的過程中,請勿對源庫中待同步的表執行DDL變更,否則會導致同步失敗。
支持同步的SQL操作
INSERT、UPDATE、DELETE。
操作步驟
- 購買數據同步作業,詳情請參見購買流程。說明 購買時,選擇源實例為PolarDB、目標實例為DataHub,并選擇同步拓撲為單向同步。
- 登錄數據傳輸控制臺。說明 若數據傳輸控制臺自動跳轉至數據管理DMS控制臺,您可以在右下角的中單擊,返回至舊版數據傳輸控制臺。
- 在左側導航欄,單擊數據同步。
- 在同步作業列表頁面頂部,選擇同步的目標實例所屬地域。
- 定位至已購買的數據同步實例,單擊配置同步鏈路。
- 配置同步作業的源實例及目標實例信息。
類別 配置 說明 無 同步作業名稱 DTS會自動生成一個同步作業名稱,建議配置具有業務意義的名稱(無唯一性要求),便于后續識別。 源實例信息 實例類型 固定為PolarDB實例,不可變更。 實例地區 購買數據同步實例時選擇的源實例地域信息,不可變更。 PolarDB實例ID 選擇源PolarDB MySQL版集群ID。 數據庫賬號 填入PolarDB MySQL版集群的數據庫賬號。 數據庫密碼 填入數據庫賬號對應的密碼。 目標實例信息 實例類型 固定為DataHub,不可變更。 實例地區 購買數據同步實例時選擇的目標實例地域信息,不可變更。 Project 選擇DataHub實例的Project。 - 單擊頁面右下角的授權白名單并進入下一步。說明
- 如果源或目標數據庫是阿里云數據庫實例(例如RDS MySQL、云數據庫MongoDB版等)或ECS上的自建數據庫,DTS會自動將對應地區DTS服務的IP地址添加到阿里云數據庫實例的白名單或ECS的安全規則中,您無需手動添加,請參見DTS服務器的IP地址段。
- DTS任務完成或釋放后,建議您手動刪除添加的DTS服務器IP地址段。
- 配置同步策略和同步對象。
配置 說明 同步初始化 勾選結構初始化。 說明 勾選結構初始化后,在數據同步作業的初始化階段,DTS會將同步對象的結構信息(例如表結構)同步至目標DataHub實例。選擇同步對象 在源庫對象框中單擊待遷移的對象,然后單擊將其移動至已選擇對象框。
說明- 同步對象的選擇粒度為表。
- 默認情況下,同步對象的名稱保持不變。如果您需要改變同步對象在目標實例中的名稱,需要使用對象名映射功能,詳情請參見設置同步對象在目標實例中的名稱。
選擇附加列規則 DTS在將數據同步到DataHub時,會在同步的目標Topic中添加一些附加列。如果附加列和目標Topic中已有的列出現名稱沖突將會導致數據同步失敗。您需要根據業務需求選擇是否啟用新的附加列規則為是或否。 警告 在選擇附加列規則前,您需要評估附加列和目標Topic中已有的列是否會出現名稱沖突。關于附加列的規則和定義說明,請參見附加列名稱和定義說明。映射名稱更改 如需更改同步對象在目標實例中的名稱,請使用對象名映射功能,詳情請參見庫表列映射。
源表DMS_ONLINE_DDL過程中是否復制臨時表到目標庫 如源庫使用數據管理DMS(Data Management Service)執行Online DDL變更,您可以選擇是否同步Online DDL變更產生的臨時表數據。- 是:同步Online DDL變更產生的臨時表數據。說明 Online DDL變更產生的臨時表數據過大,可能會導致同步任務延遲。
- 否:不同步Online DDL變更產生的臨時表數據,只同步源庫的原始DDL數據。說明 該方案會導致目標庫鎖表。
源、目標庫無法連接重試時間 當源、目標庫無法連接時,DTS默認重試720分鐘(即12小時),您也可以自定義重試時間。如果DTS在設置的時間內重新連接上源、目標庫,同步任務將自動恢復。否則,同步任務將失敗。說明 由于連接重試期間,DTS將收取任務運行費用,建議您根據業務需要自定義重試時間,或者在源和目標庫實例釋放后盡快釋放DTS實例。 - 可選:將鼠標指針放置在已選擇對象框中待同步的Topic名上,單擊對象后出現的編輯,然后在彈出的對話框中設置Shardkey(即用于分區的key)。
- 上述配置完成后,單擊頁面右下角的預檢查并啟動。說明
- 在同步作業正式啟動之前,會先進行預檢查。只有預檢查通過后,才能成功啟動同步作業。
- 如果預檢查失敗,單擊具體檢查項后的,查看失敗詳情。
- 您可以根據提示修復后重新進行預檢查。
- 如無需修復告警檢測項,您也可以選擇確認屏蔽、忽略告警項并重新進行預檢查,跳過告警檢測項重新進行預檢查。
- 在預檢查對話框中顯示預檢查通過后,關閉預檢查對話框,同步作業將正式開始。
- 等待同步作業的鏈路初始化完成,直至處于同步中狀態。您可以在數據同步頁面,查看數據同步作業的狀態。
Topic結構定義說明
id
、name
、address
,由于在配置數據同步時選用的是舊版附加列規則,DTS會為業務字段添加dts_
的前綴。結構定義說明:
舊版附加列名稱 | 新版附加列名稱 | 數據類型 | 說明 |
---|---|---|---|
dts_record_id | new_dts_sync_dts_record_id | String | 增量日志的記錄ID,為該日志唯一標識。 說明
|
dts_operation_flag | new_dts_sync_dts_operation_flag | String | 操作類型,取值:
|
dts_instance_id | new_dts_sync_dts_instance_id | String | 數據庫的server ID。 |
dts_db_name | new_dts_sync_dts_db_name | String | 數據庫名稱。 |
dts_table_name | new_dts_sync_dts_table_name | String | 表名。 |
dts_utc_timestamp | new_dts_sync_dts_utc_timestamp | String | 操作時間戳,即日志的時間戳(UTC 時間)。 |
dts_before_flag | new_dts_sync_dts_before_flag | String | 所有列的值是否更新前的值,取值:Y或N。 |
dts_after_flag | new_dts_sync_dts_after_flag | String | 所有列的值是否更新后的值,取值:Y或N。 |
關于dts_before_flag和dts_after_flag的補充說明
對于不同的操作類型,增量日志中的dts_before_flag
和dts_after_flag
定義如下:
- INSERT
當操作類型為INSERT時,所有列的值為新插入的值,即為更新后的值,所以
dts_before_flag
取值為N,dts_after_flag
取值為Y,示例如下。 - UPDATE
當操作類型為UPDATE時,DTS會將UPDATE操作拆為兩條增量日志。這兩條增量日志的
dts_record_id
、dts_operation_flag
及dts_utc_timestamp
對應的值相同。第一條增量日志記錄了更新前的值,所以
dts_before_flag
取值為Y,dts_after_flag
取值為N。第二條增量日志記錄了更新后的值,所以dts_before_flag
取值為N,dts_after_flag
取值為Y,示例如下。 - DELETE
當操作類型為DELETE時,增量日志中所有的列值為被刪除的值,即列值不變,所以
dts_before_flag
取值為Y,dts_after_flag
取值為N,示例如下。
后續操作
配置完數據同步作業后,您可以對同步到DataHub實例中的數據執行計算分析。更多詳情,請參見阿里云實時計算。