日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

數據消費格式

本文介紹實時數據訂閱功能的數據消費格式定義說明和示例,默認格式為Debezium Format V2.0。

數據消費定義說明

數據消費格式如下代碼,字段說明如下表所示。

{ 
  "payload": { 
    "op": "u", 
    "ts_ms": 1465491411815, 
    "before": { 
      "id": 1004,
      "name": "Jane"
    },
    "after": { 
      "id": 1004,
      "name": "Anne"
    },
    "source": { 
      "version": "v1.0",
      "db": "ld-xxxx",
      "namespace": "default",
      "table": "customers",
      "ts_ms": 1465491411807
    }
  },
  "schema": { 
  "type": "struct",
  "fields": [
     {
        "type": "string",
        "optional": false,
        "field": "op"
   }, {
        "type": "int64",
        "optional": false,
        "field": "ts_ms"
   }, {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
     }, {
            "type": "string",
            "optional": false,
            "field": "name"
          }
        ],
        "optional": true,
        "field": "before"
   }, {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
     }, {
            "type": "string",
            "optional": false,
            "field": "name"
     }
        ],
        "optional": true,
        "field": "after"
   }, {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
         }, {
            "type": "string",
            "optional": false,
            "field": "db"
     }, {
            "type": "string",
            "optional": false,
            "field": "namespace"
     }, {
            "type": "string",
            "optional": false,
            "field": "table"
     }, {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
     }
        ],
        "optional": false,
        "field": "source"
   }
    ],
  "optional": false
 }
}

Field name

描述

payload.op

  • c表示對數據進行新增操作。

  • u表示對數據進行更新操作。

  • d表示對數據進行刪除操作。

  • r表示對數據進行全量導出操作,暫時不會涉及到。

payload.ts_ms

表示寫入Kafka的Unix時間戳。

payload.before

表示導出整行數據更新前的值。

payload.after

表示導出整行數據的最新值。

payload.source

表示操作的額外信息,支持額外添加。

  • version:消息對應Lindorm數據庫的版本號。

  • db:源集群。

  • namespace:表的namespace。

  • table:表名。

  • ts_ms:記錄更新Lindorm的時間。

schema

根據payload內容自動生成,對整個JSON的結構及所有字段類型進行說明。默認都包括schema信息。整個schema的結構是遞歸的。

  • field:字段名稱。

  • type:field字段名對應的字段類型。

  • name:schema字段所屬模式名稱。

  • fields:遞歸說明當前字段包含的子字段內容。

  • optional:該字段是否可選。

說明

HBase表的訂閱格式跟SQL表一致,但在結構上存在以下兩點不同:

  • HBase表在數據庫中存儲的是原始的二進制數據,通過數據訂閱消費到的數據是對二進制數據進行Base64編碼后的字符串。

  • HBase表存在列族的概念,因此非主鍵列的列名格式為列族_列名,主鍵列的列名固定為ROW

數據消費示例

SQL表

在Lindorm數據庫中創建如下Schema。

CREATE TABLE customers (id VARCHAR,first_name VARCHAR,last_name VARCHAR, PRIMARY KEY(id));
  • 插入數據的數據消費示例。

    {
      "schema": {}, 
      "payload": { 
        "op": "c", 
        "ts_ms": 1465491411815, 
        "before": null, 
        "after": { 
          "id": "1004",
          "first_name": "Anne",
          "last_name": "Kretchmar"
        },
        "source": { 
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }
  • 更新數據的數據消費示例。

    {
      "schema": {},
      "payload": { 
        "op": "u", 
        "ts_ms": 1465491411815,
        "before": {
          "id": "1004",
          "first_name": "Anne Marie",
          "last_name": "Kretchmar"
        }, 
        "after": {
          "id": "1004",
          "first_name": "Anne",
          "last_name": "Kretchmar"
        },
        "source": {
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }
  • 刪除一行數據的數據消費示例。

    {
      "schema": {},
      "payload": { 
        "op": "d", 
        "ts_ms": 1465491411815,
        "before": { 
          "id": "1004",
          "first_name": "Anne Marie",
          "last_name": "Kretchmar"
        }, 
        "after": null,
        "source": {
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }
  • 刪除列的數據消費示例。

    {
      "schema": {},
      "payload": { 
        "op": "u", 
        "ts_ms": 1465491411815,
        "before": { 
          "id": "1004",
          "first_name": "Anne Marie",
          "last_name": "Kretchmar"
        }, 
        "after": { 
          "id": "1004",
          "first_name": "Anne Marie"
        }, 
        "source": {
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }

HBase表

向HBase表中插入一條數據:

Put put = new Put(Bytes.toBytes("user1"));
put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("lucky"));
table.put(put);

對應的變更消息為:

{
    "schema": {},
    "payload": {
        "op": "c",
        "ts_ms": 1725258859839,
        "before": null,
        "after": {
            "ROW": "dXNlcjE=",
            "f_name": "bHVja3k="
        },
        "source": {
            "version": "v2.0",
            "db": "ld-xxxx",
            "namespace": "default",
            "table": "customers",
            "ts_ms": 1725258833727
        }
    }
}