日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Kafka集群的數據存儲格式

DTS支持選擇遷移或同步到Kafka集群的數據存儲格式,本文為您介紹數據格式的定義說明,方便您根據定義解析數據。

數據存儲格式

DTS支持將寫入至Kafka集群的數據存儲為如下三種格式:

  • DTS Avro:一種數據序列化格式,可以將數據結構或對象轉化成便于存儲或傳輸的格式。

  • Shareplex Json:數據復制軟件Shareplex讀取源庫中的數據,將數據寫入至Kafka集群時,數據存儲格式為Shareplex Json。

  • Canal JsonCanal解析數據庫增量日志,并將增量數據傳輸至Kafka集群,數據存儲格式為Canal Json。

DTS Avro

DTS Avro為默認存儲格式。您需要根據DTS Avro的schema定義進行數據解析,schema定義詳情請參見遷移或同步至Kafka集群中的數據均DTS Avro的schema定義

說明

DTS Avro格式中的DDL語句為String類型。

Shareplex Json

表 1. 參數說明

參數

說明

time

數據庫中事務的提交時間,格式為yyyy-MM-ddTHH:mm:ssZ(UTC時間)。

userid

提交事務的用戶ID。

op

數據操作類型,包括INSERT, UPDATE, DELETE, TRUNCATE, DROP COLUMN, UPDATE BEFORE, UPDATE AFTER。

scn

系統變化編號SCN(System Change Number),用以標識數據庫在某個確切時刻提交事務的版本。每個已提交的事務分配一個唯一的SCN。

rowid

用于定位數據庫中一條記錄的一個相對唯一地址值。

trans

事務ID。

seq

事務內部的操作序號,從1開始記錄。

size

事務內部的操作總數。

table

表名。

idx

事務內部操作的索引,格式為seq/size,例如1/11表示,在操作總數為11的事務內部,該操作的序號為1。

posttime

事務提交至目標庫的時間。

示例如下:

  • 插入數據:

    {
        "meta": {
            "time": "2017-06-16T14:24:34", 
            "userid": 84,                                    
            "op": "ins",                                   
              "scn": "14589063118712",                  
              "rowid": "AAATGpAAIAAItcIAAA",      
            "trans": "7.0.411499",                 
            "seq": 1,                                          
            "size": 11,                                         
            "table": "CL_BIZ1.MIO_LOG",       
              "idx": "1/11",                                       
            "posttime": "2017-06-16T14:33:52"
        },
        "data": {
            "MIO_LOG_ID": "32539737"
         }
    }
  • 更新數據:

    {
        "meta": {
            "time": "2017-06-16T15:38:13",
            "userid": 84,
            "op": "upd",                             
            "table": "CL_BIZ1.MIO_LOG"
            ….
        },
        "data": {                                          
            "CNTR_NO": "1171201606"
        },
        "key": {                                            
            "MIO_LOG_ID": "32537893",
            "PLNMIO_REC_ID": "31557806",
            "POL_CODE": null,
            "CNTR_TYPE": null,
            "CNTR_NO": "1171201606syui26"
        }
    }
  • 刪除數據:

    {
        "meta": {
            "time": "2017-06-16T15:51:35",
            "userid": 84,
            "op": "del",                      
         },
        "data": {                                    
            "MIO_LOG_ID": "32539739",
            "PLNMIO_REC_ID": "31557806",
            "POL_CODE": null,
            "CNTR_TYPE": null,
            "CG_NO": null
         }
    }

Canal Json

表 2. 參數說明

參數

說明

database

數據庫名稱。

es

操作在源庫的執行時間,13位Unix時間戳,單位為毫秒。

說明

Unix時間戳轉換工具可用搜索引擎獲取。

id

操作的序列號。

isDdl

是否是DDL操作。

  • true:是。

  • false:否。

mysqlType

字段的數據類型。

說明

不支持精度等數據類型的參數信息。

olddata

變更前或變更后的數據。

說明

2022年3月20日之前創建的DTS訂閱實例,old的值是變更后的數據(默認包含所有列的數據,而不是只包含被修改列的數據),data的值是變更前的數據。為了和開源社區保持一致,2022年3月20日起創建或重啟的DTS訂閱實例,data的值是變更后的數據,old的值是變更前的數據。

pkNames

主鍵名稱。

sql

SQL語句。

sqlType

經轉換處理后的字段類型,如unsigned int會被轉化為Long,unsigned long會被轉換為BigDecimal。

table

表名。

ts

操作開始寫入到目標庫的時間,13位Unix時間戳,單位為毫秒。

說明

Unix時間戳轉換工具可用搜索引擎獲取。

type

操作的類型,比如DELETE、UPDATE、INSERT。

gtid

全局事務標識GTID(Global Transaction IDentifier),具有全局唯一性,一個事務對應一個GTID。

更新數據的示例如下:

說明

2022年3月20日之前創建的DTS訂閱實例,源表的DELETE語句同步到kafka,其中old的值是數據,data的值是null。為了和開源社區保持一致,2022年3月20日起創建或重啟的DTS訂閱實例,data的值是數據,old的值是null。

2022年3月20日之前創建的DTS訂閱實例

{
    "old": [
        {
            "shipping_type": "aaa"
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}

2022年3月20日起創建或重啟的DTS訂閱實例

{
    "data": [
        {
            "id": "500000287", 
            "shipping_type": null
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}
            

DDL操作示例如下:

{
    "database":"dbname",表示同步的數據庫名稱
    "es":1600161894000,表示源庫數據寫入到binlog的時間
    "id":58,DTS緩存的偏移量
    "isDdl":true,是否同步DDL
    "sql":"eg:createxxx",Binlog的DDL語句
    "table":"tablename",同步的表名
    "ts":1600161894771,DTS將數據寫入目標的時間
    "type":"DDL"
}