本文介紹實時數據訂閱功能的數據消費格式定義說明和示例,默認格式為Debezium Format V2.0。
數據消費定義說明
數據消費格式如下代碼,字段說明如下表所示。
{
"payload": {
"op": "u",
"ts_ms": 1465491411815,
"before": {
"id": 1004,
"name": "Jane"
},
"after": {
"id": 1004,
"name": "Anne"
},
"source": {
"version": "v1.0",
"db": "ld-xxxx",
"namespace": "default",
"table": "customers",
"ts_ms": 1465491411807
}
},
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "op"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}, {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "string",
"optional": false,
"field": "name"
}
],
"optional": true,
"field": "before"
}, {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "string",
"optional": false,
"field": "name"
}
],
"optional": true,
"field": "after"
}, {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
}, {
"type": "string",
"optional": false,
"field": "db"
}, {
"type": "string",
"optional": false,
"field": "namespace"
}, {
"type": "string",
"optional": false,
"field": "table"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}
],
"optional": false,
"field": "source"
}
],
"optional": false
}
}
Field name | 描述 |
payload.op |
|
payload.ts_ms | 表示寫入Kafka的Unix時間戳。 |
payload.before | 表示導出整行數據更新前的值。 |
payload.after | 表示導出整行數據的最新值。 |
payload.source | 表示操作的額外信息,支持額外添加。
|
schema | 根據payload內容自動生成,對整個JSON的結構及所有字段類型進行說明。默認都包括schema信息。整個schema的結構是遞歸的。
|
HBase表的訂閱格式跟SQL表一致,但在結構上存在以下兩點不同:
HBase表在數據庫中存儲的是原始的二進制數據,通過數據訂閱消費到的數據是對二進制數據進行Base64編碼后的字符串。
HBase表存在列族的概念,因此非主鍵列的列名格式為
列族_列名
,主鍵列的列名固定為ROW
。
數據消費示例
SQL表
在Lindorm數據庫中創建如下Schema。
CREATE TABLE customers (id VARCHAR,first_name VARCHAR,last_name VARCHAR, PRIMARY KEY(id));
插入數據的數據消費示例。
{ "schema": {}, "payload": { "op": "c", "ts_ms": 1465491411815, "before": null, "after": { "id": "1004", "first_name": "Anne", "last_name": "Kretchmar" }, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
更新數據的數據消費示例。
{ "schema": {}, "payload": { "op": "u", "ts_ms": 1465491411815, "before": { "id": "1004", "first_name": "Anne Marie", "last_name": "Kretchmar" }, "after": { "id": "1004", "first_name": "Anne", "last_name": "Kretchmar" }, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
刪除一行數據的數據消費示例。
{ "schema": {}, "payload": { "op": "d", "ts_ms": 1465491411815, "before": { "id": "1004", "first_name": "Anne Marie", "last_name": "Kretchmar" }, "after": null, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
刪除列的數據消費示例。
{ "schema": {}, "payload": { "op": "u", "ts_ms": 1465491411815, "before": { "id": "1004", "first_name": "Anne Marie", "last_name": "Kretchmar" }, "after": { "id": "1004", "first_name": "Anne Marie" }, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
HBase表
向HBase表中插入一條數據:
Put put = new Put(Bytes.toBytes("user1"));
put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("lucky"));
table.put(put);
對應的變更消息為:
{
"schema": {},
"payload": {
"op": "c",
"ts_ms": 1725258859839,
"before": null,
"after": {
"ROW": "dXNlcjE=",
"f_name": "bHVja3k="
},
"source": {
"version": "v2.0",
"db": "ld-xxxx",
"namespace": "default",
"table": "customers",
"ts_ms": 1725258833727
}
}
}