DTS支持選擇遷移或同步到Kafka集群的數據存儲格式,本文為您介紹數據格式的定義說明,方便您根據定義解析數據。
數據存儲格式
DTS支持將寫入至Kafka集群的數據存儲為如下三種格式:
DTS Avro:一種數據序列化格式,可以將數據結構或對象轉化成便于存儲或傳輸的格式。
Shareplex Json:數據復制軟件Shareplex讀取源庫中的數據,將數據寫入至Kafka集群時,數據存儲格式為Shareplex Json。
Canal Json:Canal解析數據庫增量日志,并將增量數據傳輸至Kafka集群,數據存儲格式為Canal Json。
DTS Avro
DTS Avro為默認存儲格式。您需要根據DTS Avro的schema定義進行數據解析,schema定義詳情請參見遷移或同步至Kafka集群中的數據均DTS Avro的schema定義。
DTS Avro格式中的DDL語句為String類型。
Shareplex Json
參數 | 說明 |
| 數據庫中事務的提交時間,格式為yyyy-MM-ddTHH:mm:ssZ(UTC時間)。 |
| 提交事務的用戶ID。 |
| 數據操作類型,包括INSERT, UPDATE, DELETE, TRUNCATE, DROP COLUMN, UPDATE BEFORE, UPDATE AFTER。 |
| 系統變化編號SCN(System Change Number),用以標識數據庫在某個確切時刻提交事務的版本。每個已提交的事務分配一個唯一的SCN。 |
| 用于定位數據庫中一條記錄的一個相對唯一地址值。 |
| 事務ID。 |
| 事務內部的操作序號,從1開始記錄。 |
| 事務內部的操作總數。 |
| 表名。 |
| 事務內部操作的索引,格式為 |
| 事務提交至目標庫的時間。 |
示例如下:
插入數據:
{ "meta": { "time": "2017-06-16T14:24:34", "userid": 84, "op": "ins", "scn": "14589063118712", "rowid": "AAATGpAAIAAItcIAAA", "trans": "7.0.411499", "seq": 1, "size": 11, "table": "CL_BIZ1.MIO_LOG", "idx": "1/11", "posttime": "2017-06-16T14:33:52" }, "data": { "MIO_LOG_ID": "32539737" } }
更新數據:
{ "meta": { "time": "2017-06-16T15:38:13", "userid": 84, "op": "upd", "table": "CL_BIZ1.MIO_LOG" …. }, "data": { "CNTR_NO": "1171201606" }, "key": { "MIO_LOG_ID": "32537893", "PLNMIO_REC_ID": "31557806", "POL_CODE": null, "CNTR_TYPE": null, "CNTR_NO": "1171201606syui26" } }
刪除數據:
{ "meta": { "time": "2017-06-16T15:51:35", "userid": 84, "op": "del", }, "data": { "MIO_LOG_ID": "32539739", "PLNMIO_REC_ID": "31557806", "POL_CODE": null, "CNTR_TYPE": null, "CG_NO": null } }
Canal Json
參數 | 說明 |
| 數據庫名稱。 |
| 操作在源庫的執行時間,13位Unix時間戳,單位為毫秒。 說明 Unix時間戳轉換工具可用搜索引擎獲取。 |
| 操作的序列號。 |
| 是否是DDL操作。
|
| 字段的數據類型。 說明 不支持精度等數據類型的參數信息。 |
| 變更前或變更后的數據。 說明 2022年3月20日之前創建的DTS訂閱實例, |
| 主鍵名稱。 |
| SQL語句。 |
| 經轉換處理后的字段類型,如unsigned int會被轉化為Long,unsigned long會被轉換為BigDecimal。 |
| 表名。 |
| 操作開始寫入到目標庫的時間,13位Unix時間戳,單位為毫秒。 說明 Unix時間戳轉換工具可用搜索引擎獲取。 |
| 操作的類型,比如DELETE、UPDATE、INSERT。 |
| 全局事務標識GTID(Global Transaction IDentifier),具有全局唯一性,一個事務對應一個GTID。 |
更新數據的示例如下:
2022年3月20日之前創建的DTS訂閱實例,源表的DELETE
語句同步到kafka,其中old
的值是數據,data
的值是null。為了和開源社區保持一致,2022年3月20日起創建或重啟的DTS訂閱實例,data
的值是數據,old
的值是null。
2022年3月20日之前創建的DTS訂閱實例
{
"old": [
{
"shipping_type": "aaa"
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
2022年3月20日起創建或重啟的DTS訂閱實例
{
"data": [
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
DDL操作示例如下:
{
"database":"dbname",表示同步的數據庫名稱
"es":1600161894000,表示源庫數據寫入到binlog的時間
"id":58,DTS緩存的偏移量
"isDdl":true,是否同步DDL
"sql":"eg:createxxx",Binlog的DDL語句
"table":"tablename",同步的表名
"ts":1600161894771,DTS將數據寫入目標的時間
"type":"DDL"
}