日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過數據同步功能將Kafka數據同步至湖倉版

更新時間:

云原生數據倉庫 AnalyticDB MySQL 版支持新建Kafka同步鏈路,通過同步鏈路從指定時間位點,實時同步Kafka中的數據入湖,以滿足近實時產出、全量歷史歸檔、彈性分析等需求。本文主要介紹如何添加Kafka數據源,新建Kafka同步鏈路并啟動任務,以及數據同步后如何進行數據分析和數據源管理。

前提條件

  • AnalyticDB for MySQL集群的產品系列為企業版、基礎版或湖倉版

注意事項

  • Kafka中創建的Topic數據超過一定的時間會被自動清理,如果Topic數據過期,同時數據同步任務失敗,重新啟動同步任務時讀取不到被清理掉的數據,會有丟失數據的風險。因此請適當調大Topic數據的生命周期,并在數據同步任務失敗時及時聯系技術支持。

  • 獲取Kafka樣例數據在大于8KB的情況下,Kafka API會將數據進行截斷,導致解析樣例數據為JSON格式時失敗,從而無法自動生成字段映射信息。

計費說明

通過AnalyticDB for MySQL數據遷移功能遷移數據至OSS會產生以下費用。

使用流程

新建數據源

說明

如果您已添加Kafka數據源,可跳過該步驟,直接新建同步鏈路,詳情請參見新建同步鏈路

  1. 登錄云原生數據倉庫AnalyticDB MySQL控制臺,在左上角選擇集群所在地域。在左側導航欄,單擊集群列表,在企業版、基礎版或湖倉版頁簽下,單擊目標集群ID。

  2. 在左側導航欄,單擊數據接入>數據源管理

  3. 單擊右上角新建數據源

  4. 新建數據源頁面進行參數配置。參數說明如下表所示:

    參數名稱

    參數說明

    數據源類型

    選擇數據源類型Kafka

    數據源名稱

    系統默認按數據源類型與當前時間生成名稱,可按需修改。

    數據源描述

    數據源備注描述,例如湖倉應用場景、應用業務限制等。

    部署模式

    目前僅支持阿里云實例。

    Kafka實例

    Kafka實例ID。

    登錄云消息隊列 Kafka 版控制臺,在實例列表頁面查看實例ID。

    Kafka Topic

    在Kafka中創建的Topic名稱。

    登錄云消息隊列 Kafka 版控制臺,在目標實例的Topic 管理頁面查看Topic名稱。

    消息數據格式

    Kafka消息數據格式,目前僅支持JSON。

  5. 參數配置完成后,單擊創建

新建同步鏈路

  1. 在左側導航欄,單擊SLS/Kafka數據同步

  2. 在右上角,單擊新建同步鏈路

  3. 新建同步鏈路頁面,進行數據源的數據源及目標端配置目標庫表配置同步配置

    • 數據源及目標端配置的參數說明如下:

      參數名稱

      參數說明

      數據鏈路名稱

      數據鏈路名稱。系統默認按數據源類型與當前時間生成名稱,可按需修改。

      數據源

      選擇已有的Kafka數據源,也可新建數據源。

      目標端類型

      目前僅支持數據湖-OSS存儲

      OSS路徑

      AnalyticDB for MySQL湖倉數據在OSS中的存儲路徑。

      重要
      • 展示的Bucket是與AnalyticDB for MySQL集群同地域的所有Bucket,您可以任意選擇其中一個。請謹慎規劃存儲路徑,創建后不允許修改。

      • 建議選擇一個空目錄,且不能與其他任務的OSS路徑有相互前綴關系,防止數據覆蓋。例如,兩個數據同步任務的OSS路徑分別為oss://adb_demo/test/sls1/和oss://adb_demo/test/,OSS路徑有相互前綴關系,數據同步過程中會有數據覆蓋。

    • 目標庫表配置參數說明如下:

      參數名稱

      參數說明

      庫名

      同步到AnalyticDB for MySQL的數據庫名稱。如果不存在同名數據庫,將新建庫;如果已存在同名數據庫,數據會同步到已存在的數據庫中。庫名命名規則,詳見使用限制

      表名

      同步到AnalyticDB for MySQL的表名稱。如果庫中不存在同名表,將新建表;如果庫中已存在同名表,數據同步會失敗。表名命名規則,詳見使用限制

      樣例數據

      自動從Kafka Topic中獲取的最新數據作為樣例數據。

      說明

      Kafka Topic中的數據需為JSON格式,若存在其他格式的數據,數據同步時會報錯。

      JSON解析層級

      設置JSON的嵌套解析層數,取值說明:

      • 0:不做解析。

      • 1(默認值):解析一層。

      • 2:解析兩層。

      • 3:解析三層。

      • 4:解析四層。

      JSON的嵌套解析策略,請參見JSON解析層級和Schema字段推斷示例

      Schema字段映射

      展示樣例數據經過JSON解析后的Schema信息。可在此調整目標字段名,類型或按需增刪字段等。

      分區鍵設置

      為目標表設置分區鍵。建議按日志時間或者業務邏輯配置分區,以保證入湖與查詢性能。如不設置,則目標表默認沒有分區。

      目標端分區鍵的格式處理方法分為:時間格式化和指定分區字段。

      • 按日期時間分區,分區字段名請選擇一個日期時間字段。格式處理方法選擇時間格式化,選擇源端字段格式和目標分區格式。AnalyticDB for MySQL會按源端字段格式識別分區字段的值,并將其轉換為目標分區格式進行分區。例如,源字段為gmt_created,值為1711358834,源端字段格式為秒級精度時間戳,目標分區格式為yyyyMMdd,則會按20240325進行分區。

      • 按字段值分區,格式處理方法請選擇指定分區字段。

    • 同步配置的參數說明如下:

      參數名稱

      參數說明

      增量同步起始消費位點

      同步任務啟動時會從選擇的時間點開始消費Kafka數據。取值說明:

      • 最早位點(begin_cursor):自動從Kafka數據中最開始的時間點消費數據。

      • 最近位點(end_cursor):自動從Kafka數據中最近的時間點消費數據。

      • 自定義點位:您可以選擇任意一個時間點,系統則會從Kafka中第一條大于等于該時間點的數據開始消費。

      Job型資源組

      指定任務運行的Job型資源組。

      增量同步所需ACU數

      指定任務運行的Job型資源組ACU數。最小ACU數為2,最大ACU數為Job型資源組可用計算最大資源數。建議多指定一些ACU數,可以提升入湖性能及任務穩定性。

      說明

      創建數據同步任務時,使用Job型資源組中的彈性資源。數據同步任務會長期占用資源,因此系統會從資源組中扣除該任務占用的資源。例如,Job型資源組的計算最大資源為48 ACU,已創建了一個8 ACU的同步任務,在該資源組中創建另一個同步任務時,可選的最大ACU數為40。

      高級配置

      高級配置可以讓您對同步任務進行個性化的配置。如需進行個性化配置,請聯系技術支持。

  4. 上述參數配置完成后,單擊提交

啟動數據同步任務

  1. SLS/Kafka數據同步頁面,選擇創建成功的數據同步任務,在操作列單擊啟動

  2. 單擊右上角查詢,狀態變為正在啟動即數據同步任務啟動成功。

數據分析

同步任務成功后,您可以通過Spark Jar開發對同步到AnalyticDB for MySQL的數據進行分析。Spark開發的相關操作,請參見Spark開發編輯器Spark離線應用開發

  1. 在左側導航欄,單擊作業開發 > Spark Jar 開發

  2. 在默認模板中輸入示例語句,并單擊立即執行

    -- Here is just an example of SparkSQL. Modify the content and run your spark program.
    
    conf spark.driver.resourceSpec=medium;
    conf spark.executor.instances=2;
    conf spark.executor.resourceSpec=medium;
    conf spark.app.name=Spark SQL Test;
    conf spark.adb.connectors=oss;
    
    -- Here are your sql statements
    show tables from lakehouse20220413156_adbTest;
  3. 可選:應用列表頁簽中,單擊操作列的日志,查看Spark SQL運行的日志。

管理數據源

數據源管理頁面,您可以在操作列執行以下操作。

操作按鈕

說明

新建鏈路

快捷跳轉到創建此數據源下的數據同步或數據遷移任務。

查看

查看數據源的詳細配置。

編輯

編輯數據源屬性,如更新數據源名稱、描述等。

刪除

刪除當前數據源。

說明

當數據源下存在數據同步或數據遷移任務時,此數據源無法直接刪除,需先在SLS/Kafka數據同步頁面,單擊目標同步任務操作列的刪除,刪除數據同步或數據遷移任務。

JSON解析層級和Schema字段推斷示例

解析層級指按相應層數解析出JSON中的字段。如果用戶向Kafka發送的JSON數據如下。

{
  "name" : "zhangle",
  "age" : 18,
  "device" : {
    "os" : {
        "test":lag,
        "member":{
             "fa":zhangsan,
             "mo":limei
       }
     },
    "brand" : "none",
    "version" : "11.4.2"
  }
}

JSON數據解析后,對應0~4層的效果如下。

0層解析

不做任何解析,直接輸出原始JSON數據。

JSON字段

目標字段名

__value__

{ "name" : "zhangle","age" : 18, "device" : { "os" : { "test":lag,"member":{ "fa":zhangsan,"mo":limei }},"brand": "none","version" : "11.4.2" }}

__value__

1層解析

解析JSON的第一層字段。

JSON字段

目標字段名

name

zhangle

name

age

18

age

device

{ "os" : { "test":lag,"member":{ "fa":zhangsan,"mo":limei }},"brand": "none","version" : "11.4.2" }

device

2層解析

解析JSON的第二層字段。如果字段沒有嵌套則直接輸出,例如name和age字段直接輸出。如果字段中有嵌套,則輸出其子層級字段,例如device字段有嵌套,因此輸出其子層級device.osdevice.branddevice.version

重要

由于目標字段名不支持“.”,因此會自動替換為“_”。

JSON字段

目標字段名

name

zhangle

name

age

18

age

device.os

{ "test":lag,"member":{ "fa":zhangsan,"mo":limei }}

device_os

device.brand

none

device_brand

device.version

11.4.2

device_version

3層解析

JSON字段

目標字段名

name

zhangle

name

age

18

age

device.os.test

lag

device_os_test

device.os.member

{ "fa":zhangsan,"mo":limei }

device_os_member

device.brand

none

device_brand

device.version

11.4.2

device_version

4層解析

JSON字段

目標字段名

name

zhangle

name

age

18

age

device.os.test

lag

device_os_test

device.os.member.fa

zhangsan

device_os_member_fa

device.os.member.mo

lime

device_os_member_mo

device.brand

none

device_brand

device.version

11.4.2

device_version