Kafka是應(yīng)用較為廣泛的分布式、高吞吐量、高可擴展性消息隊列服務(wù),普遍用于日志收集、監(jiān)控數(shù)據(jù)聚合、流式數(shù)據(jù)處理、在線和離線分析等大數(shù)據(jù)領(lǐng)域,是大數(shù)據(jù)生態(tài)中不可或缺的產(chǎn)品之一。通過數(shù)據(jù)傳輸服務(wù)DTS(Data Transmission Service),您可以將PolarDB MySQL版同步至自建Kafka集群,擴展消息處理能力。

前提條件

  • Kafka集群的版本為0.10.1.0~2.7.0版本。
  • PolarDB MySQL版已開啟Binlog,詳情請參見如何開啟Binlog

注意事項

如果源數(shù)據(jù)庫沒有主鍵或唯一約束,且所有字段沒有唯一性,可能會導(dǎo)致目標數(shù)據(jù)庫中出現(xiàn)重復(fù)數(shù)據(jù)。

費用說明

同步類型鏈路配置費用
庫表結(jié)構(gòu)同步和全量數(shù)據(jù)同步不收費。
增量數(shù)據(jù)同步收費,詳情請參見計費概述

功能限制

  • 僅支持表粒度的數(shù)據(jù)同步。
  • 不支持自動調(diào)整同步對象。
    說明 如果在同步的過程中,對源庫中待同步的表執(zhí)行了重命名操作,且重命名后的名稱不在同步對象中,那么該表將不再被同步到目標Kafka集群中。如果該表還需要同步,那么您需要新增同步對象

操作步驟

  1. 購買數(shù)據(jù)同步作業(yè),詳情請參見購買流程
    說明 購買時,選擇源實例為PolarDB、目標實例為Kafka,并選擇同步拓撲為單向同步
  2. 登錄數(shù)據(jù)傳輸控制臺
    說明 若數(shù)據(jù)傳輸控制臺自動跳轉(zhuǎn)至數(shù)據(jù)管理DMS控制臺,您可以在右下角的jiqiren中單擊返回舊版,返回至舊版數(shù)據(jù)傳輸控制臺。
  3. 在左側(cè)導(dǎo)航欄,單擊數(shù)據(jù)同步
  4. 同步作業(yè)列表頁面頂部,選擇同步的目標實例所屬地域。
  5. 定位至已購買的數(shù)據(jù)同步實例,單擊配置同步鏈路
  6. 配置源實例及目標實例信息。
    同步通道的源和目標實例配置
    類別配置說明
    同步作業(yè)名稱DTS會自動生成一個同步作業(yè)名稱,建議配置具有業(yè)務(wù)意義的名稱(無唯一性要求),便于后續(xù)識別。
    源實例信息實例類型固定為PolarDB實例,不可變更。
    實例地區(qū)購買數(shù)據(jù)同步實例時選擇的源實例地域,不可變更。
    PolarDB實例ID選擇PolarDB MySQL版集群ID。
    數(shù)據(jù)庫賬號填入PolarDB MySQL版集群的數(shù)據(jù)庫賬號,需要具備待同步數(shù)據(jù)庫的讀權(quán)限。
    數(shù)據(jù)庫密碼填入該數(shù)據(jù)庫賬號的密碼。
    目標實例信息實例類型根據(jù)Kafka集群的部署位置選擇,本文以ECS上的自建數(shù)據(jù)庫為例介紹配置流程。
    說明 當選擇為其他實例類型時,您還需要執(zhí)行相應(yīng)的準備工作,詳情請參見準備工作概覽
    實例地區(qū)購買數(shù)據(jù)同步實例時選擇的目標實例地域,不可變更。
    ECS實例ID選擇部署了Kafka集群的ECS實例ID。
    數(shù)據(jù)庫類型選擇為Kafka
    端口Kafka集群對外提供服務(wù)的端口,默認為9092。
    數(shù)據(jù)庫賬號填入Kafka集群的用戶名,如Kafka集群未開啟驗證可不填寫。
    數(shù)據(jù)庫密碼填入Kafka集群用戶名對應(yīng)的密碼,如Kafka集群未開啟驗證可不填寫。
    Topic單擊右側(cè)的獲取Topic列表,然后在下拉框中選擇具體的Topic。
    Kafka版本根據(jù)目標Kafka集群版本,選擇對應(yīng)的版本信息。
    連接方式根據(jù)業(yè)務(wù)及安全需求,選擇非加密連接SCRAM-SHA-256
  7. 單擊頁面右下角的授權(quán)白名單并進入下一步
    說明
    • 如果源或目標數(shù)據(jù)庫是阿里云數(shù)據(jù)庫實例(例如RDS MySQL云數(shù)據(jù)庫MongoDB版等)或ECS上的自建數(shù)據(jù)庫,DTS會自動將對應(yīng)地區(qū)DTS服務(wù)的IP地址添加到阿里云數(shù)據(jù)庫實例的白名單或ECS的安全規(guī)則中,您無需手動添加,請參見DTS服務(wù)器的IP地址段
    • DTS任務(wù)完成或釋放后,建議您手動刪除添加的DTS服務(wù)器IP地址段。
  8. 配置同步策略和同步對象信息。
    配置同步對象
    配置說明
    投遞到kafka的數(shù)據(jù)格式同步到Kafka集群中的數(shù)據(jù)以avro格式或者Canal Json格式存儲,定義詳情請參見Kafka集群的數(shù)據(jù)存儲格式
    同步到Kafka Partition策略根據(jù)業(yè)務(wù)需求選擇同步的策略,詳細介紹請參見Kafka Partition同步策略說明
    同步對象源庫對象區(qū)域框中,選擇需要同步的對象(選擇的粒度為表),然后單擊向右箭頭圖標將其移動到已選對象區(qū)域框中。
    說明 DTS會自動將表名映射為步驟6選擇的Topic名稱。如果需要更換同步的目標Topic,請參見步驟9
    映射名稱更改

    如需更改同步對象在目標實例中的名稱,請使用對象名映射功能,詳情請參見庫表列映射

    源、目標庫無法連接重試時間
    當源、目標庫無法連接時,DTS默認重試720分鐘(即12小時),您也可以自定義重試時間。如果DTS在設(shè)置的時間內(nèi)重新連接上源、目標庫,同步任務(wù)將自動恢復(fù)。否則,同步任務(wù)將失敗。
    說明 由于連接重試期間,DTS將收取任務(wù)運行費用,建議您根據(jù)業(yè)務(wù)需要自定義重試時間,或者在源和目標庫實例釋放后盡快釋放DTS實例。
  9. 可選:已選擇對象區(qū)域框中,將鼠標指針放置在目標Topic名上,然后單擊Topic名后出現(xiàn)的編輯,在彈出的對話框中設(shè)置源表在目標Kafka集群中的Topic名稱、Topic的Partition數(shù)量、Partition Key等信息。
    設(shè)置topic
    配置說明
    數(shù)據(jù)庫表名稱設(shè)置源表同步到的目標Topic名稱。
    說明 如果設(shè)置的Topic名稱在目標Kafka集群中不存在,您還需要設(shè)置該Topic的Partition數(shù)量。
    過濾條件
    • 過濾條件支持標準的SQL WHERE語句(僅支持=!=<>操作符),只有滿足WHERE條件的數(shù)據(jù)才會被同步到目標Topic。本案例填入id>1000
    • 過濾條件中如需使用引號,請使用單引號('),例如address in('hangzhou','shanghai')
    設(shè)置新Topic的Partition數(shù)量在下拉列表中,選擇新Topic的Partition數(shù)量。
    說明 只有當設(shè)置的目標Topic名稱在目標Kafka集群中不存在時,您才需要配置本參數(shù)。
    設(shè)置Partition Key當您在步驟8中選擇同步策略為按主鍵的hash值投遞到不同Partition時,您可以配置本參數(shù),指定單個或多個列作為Partition Key來計算Hash值,DTS將根據(jù)計算得到的Hash值將不同的行投遞到目標Topic的各Partition中。
  10. 上述配置完成后單擊頁面右下角的下一步
  11. 配置同步初始化的高級配置信息。
    Kafka同步初始化高級配置
    配置說明
    同步初始化默認選擇結(jié)構(gòu)初始化全量數(shù)據(jù)初始化,DTS會在增量數(shù)據(jù)同步之前,將源庫中待同步對象的結(jié)構(gòu)和存量數(shù)據(jù),同步到目標庫。
    過濾選項默認選擇忽略增量同步階段的 DDL,即增量同步階段源庫執(zhí)行的DDL操作不會被DTS同步至目標庫。
  12. 上述配置完成后,單擊頁面右下角的預(yù)檢查并啟動
    說明
    • 在同步作業(yè)正式啟動之前,會先進行預(yù)檢查。只有預(yù)檢查通過后,才能成功啟動同步作業(yè)。
    • 如果預(yù)檢查失敗,單擊具體檢查項后的提示,查看失敗詳情。
      • 您可以根據(jù)提示修復(fù)后重新進行預(yù)檢查。
      • 如無需修復(fù)告警檢測項,您也可以選擇確認屏蔽忽略告警項并重新進行預(yù)檢查,跳過告警檢測項重新進行預(yù)檢查。
  13. 預(yù)檢查對話框中顯示預(yù)檢查通過后,關(guān)閉預(yù)檢查對話框,數(shù)據(jù)同步作業(yè)正式開始。
    您可以在數(shù)據(jù)同步頁面,查看數(shù)據(jù)同步狀態(tài)。查看數(shù)據(jù)同步狀態(tài)