日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

附錄:消息格式

更新時(shí)間:

本文介紹寫入Kafka消息的消息結(jié)構(gòu)及各字段含義。

背景信息

同步整庫(kù)數(shù)據(jù)至kafka任務(wù),是將從上游數(shù)據(jù)源讀取的數(shù)據(jù),按照下面描述的JSON格式寫入到Kafka的topic。消息總體格式包括變更記錄的列信息、以及數(shù)據(jù)變更前后的狀態(tài)信息等。為確保消費(fèi)Kafka中數(shù)據(jù)時(shí)能夠準(zhǔn)確判斷同步任務(wù)進(jìn)度,同步任務(wù)還將定時(shí)產(chǎn)生op字段作為MHEARTBEAT的同步任務(wù)心跳記錄寫入Kafka的topic中。以下為您介紹寫入Kafka的消息總體格式同步任務(wù)心跳消息格式源端更改數(shù)據(jù)對(duì)應(yīng)的消息格式,關(guān)于字段類型及參數(shù)說(shuō)明等信息,詳情請(qǐng)參見字段類型參數(shù)說(shuō)明

消息總體格式

寫入Kafka消息的總體格式如下所示:

{
    "schema": { //變更的元數(shù)據(jù)信息,僅指定列名與列類型信息
        "dataColumn": [//變更的數(shù)據(jù)列信息,更新目標(biāo)表記錄內(nèi)容
            {
                "name": "id",
                "type": "LONG"
            },
            {
                "name": "name",
                "type": "STRING"
            },
            {
                "name": "binData",
                "type": "BYTES"
            },
            {
                "name": "ts",
                "type": "DATE"
            },
            {
              "name":"rowid",// 數(shù)據(jù)源為Oracle時(shí),rowid會(huì)放在數(shù)據(jù)列中
              "type":"STRING"
            }
        ],
        "primaryKey": [
            "pkName1",
            "pkName2"
        ],
        "source": {
            "dbType": "mysql",
            "dbVersion": "1.0.0",
            "dbName": "myDatabase",
            "schemaName": "mySchema",
            "tableName": "tableName"
        }
    },
    "payload": {
        "before": {
            "dataColumn":{
                "id": 111,
                "name":"scooter",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"http://字符串類型,Oracle的rowid信息
            }
        },
        "after": {
            "dataColumn":{
                "id": 222,
                "name":"donald",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"http://字符串類型,Oracle的rowid信息
            }
        },
        "sequenceId":"XXX",//字符串類型,用于增全量數(shù)據(jù)合并的數(shù)據(jù)排序,
        "scn":"xxxx",//字符串類型,Oracle的scn信息
        "op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...",//大小寫敏感,
        "timestamp": {
            "eventTime": 1,//必選,記錄源端庫(kù)發(fā)生變更的時(shí)間,毫秒精度的13位時(shí)間戳
            "systemTime": 2,//可選,同步任務(wù)處理該條變更消息的時(shí)間,毫秒精度的13位時(shí)間戳
            "checkpointTime": 3//可選,重置同步位點(diǎn)時(shí)的設(shè)置時(shí)間,毫秒精度的13位時(shí)間戳,一般等于eventTime
        },
        "ddl": {
            "text": "ADD COLUMN ...",
            "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
        }
    },
    "version":"1.0.0"
}
說(shuō)明

關(guān)于字段類型及參數(shù)說(shuō)明等信息,詳情請(qǐng)參見字段類型參數(shù)說(shuō)明

同步任務(wù)心跳消息格式

{
    "schema": {
        "dataColumn": null,
        "primaryKey": null,
        "source": null
    },
    "payload": {
        "before": null,
        "after": null,
        "sequenceId": null,
        "timestamp": {
            "eventTime": 1620457659000,
            "checkpointTime": 1620457659000
        },
        "op": "MHEARTBEAT",
        "ddl": null
    },
    "version": "0.0.1"
}
說(shuō)明

關(guān)于字段類型及參數(shù)說(shuō)明等信息,詳情請(qǐng)參見字段類型參數(shù)說(shuō)明

源端更改數(shù)據(jù)對(duì)應(yīng)的消息格式

  • 源端插入數(shù)據(jù)對(duì)應(yīng)的Kafka消息格式:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": null,
            "after": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "man",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "sequenceId": "1620457642589000000",
            "timestamp": {
                "eventTime": 1620457896000,
                "systemTime": 1620457896977,
                "checkpointTime": 1620457896000
            },
            "op": "INSERT",
            "ddl": null
        },
        "version": "0.0.1"
    }
  • 源端更新數(shù)據(jù)對(duì)應(yīng)的Kafka消息格式:

    • 當(dāng)未勾選源端update變更對(duì)應(yīng)一條Kafka記錄時(shí),源端更新數(shù)據(jù)對(duì)應(yīng)的Kafka消息格式包含兩條Kafka消息,分別描述更新前的數(shù)據(jù)狀態(tài)和更新后的數(shù)據(jù)狀態(tài)。具體消息格式如下:

      更新前的數(shù)據(jù)狀態(tài)消息格式:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": null,
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_BEFOR",
              "ddl": null
          },
          "version": "0.0.1"
      }

      更新后的數(shù)據(jù)狀態(tài)消息格式:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": null,
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
    • 當(dāng)勾選了源端update變更對(duì)應(yīng)一條Kafka記錄時(shí),源端更新數(shù)據(jù)對(duì)應(yīng)的Kafka消息格式包含一條Kafka消息,描述更新前的數(shù)據(jù)狀態(tài)和更新后的數(shù)據(jù)狀態(tài)。具體消息格式如下:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
  • 源端刪除數(shù)據(jù)對(duì)應(yīng)的Kafka消息格式:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "woman",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "after": null,
            "sequenceId": "1620457642589000002",
            "timestamp": {
                "eventTime": 1620458266000,
                "systemTime": 1620458266101,
                "checkpointTime": 1620458266000
            },
            "op": "DELETE",
            "ddl": null
        },
        "version": "0.0.1"
    }
說(shuō)明

關(guān)于字段類型及參數(shù)說(shuō)明等信息,詳情請(qǐng)參見字段類型參數(shù)說(shuō)明

字段類型

寫入Kafka topic中的消息將從源端讀取數(shù)據(jù)映射為BOOLEAN、DOUBLE、DATE、BYTES、LONG,STRING六種類型,再以不同的JSON格式寫入kafka topic中。

類型

說(shuō)明

BOOLEAN

對(duì)應(yīng)JSON中的布爾類型,取值為true,false

DATE

對(duì)應(yīng)JSON中的數(shù)值類型,取值為13位數(shù)字時(shí)間戳,精確到毫秒(ms)級(jí)。

BYTES

對(duì)應(yīng)JSON中的字符串類型,寫入Kafka前會(huì)先對(duì)字節(jié)數(shù)組進(jìn)行base64編碼轉(zhuǎn)換為字符串,消費(fèi)時(shí)需要進(jìn)行base64解碼(編碼Base64.getEncoder().encodeToString(text.getBytes("UTF-8"));解碼Base64.getDecoder().decode(encodedText))

STRING

對(duì)應(yīng)JSON中的字符串類型

LONG

對(duì)應(yīng)JSON中的數(shù)值類型

DOUBLE

對(duì)應(yīng)JSON中的數(shù)值類型

參數(shù)說(shuō)明

以下為您介紹寫入Kafka的消息中的各個(gè)字段的含義及說(shuō)明。

一級(jí)元素

二級(jí)元素

說(shuō)明

schema

dataColumn

JSONArray類型,數(shù)據(jù)列的類型信息。dataColumn記錄上游數(shù)據(jù)變更記錄的所有列和對(duì)應(yīng)的列類型信息。變更操作包括數(shù)據(jù)庫(kù)對(duì)數(shù)據(jù)的更改(新增、刪除及修改)和數(shù)據(jù)庫(kù)表結(jié)構(gòu)等變更。

  • name:列名

  • type:列類型

primaryKey

List類型,主鍵信息。

pk:主鍵名。

source

Object 類型,源端數(shù)據(jù)庫(kù)或表信息。

  • dbType:String類型,數(shù)據(jù)庫(kù)類型

  • dbVersion:String類型,數(shù)據(jù)庫(kù)版本

  • dbName:String類型,數(shù)據(jù)庫(kù)名

  • schemaName:String類型,Schema名(針對(duì)Postgres和SQL Server等)

  • tableName:String 類型,數(shù)據(jù)表名

payload

before

JSONObject類型,修改前的數(shù)據(jù)。例如:數(shù)據(jù)源端為mysql,做了一次記錄的update操作,before字段存儲(chǔ)記錄被update之前的數(shù)據(jù)內(nèi)容。

  • 在從源端讀取到更新、刪除操作消息時(shí),在寫入記錄中填充該字段。

  • dataColumn:JSONObject類型,表示數(shù)據(jù)信息。格式為列名:列值, 列名為字符串,列值BOOLEAN、DOUBLE、DATE、BYTES、LONG,STRING。

after

修改后的數(shù)據(jù)。格式同before相同。

sequenceId

字符串類型,Streamx產(chǎn)生,用于增量數(shù)據(jù)和全量數(shù)據(jù)合并的數(shù)據(jù)排序,每個(gè)streamx record都是唯一的。

說(shuō)明

對(duì)于從源端讀取的更新操作消息,會(huì)生成兩條寫入記錄,一條update before記錄和一條update after記錄,這兩條記錄的sequenceId相同。

scn

當(dāng)源端為Oracle數(shù)據(jù)庫(kù)時(shí)有效,對(duì)應(yīng)Oracle的scn信息。

op

對(duì)應(yīng)源端讀取到的消息類型,取值如下:

  • INSERT:數(shù)據(jù)插入

  • UPDATE_BEFOR:數(shù)據(jù)更新前

  • UPDATE_AFTER:數(shù)據(jù)更新后

  • DELETE:數(shù)據(jù)刪除

  • TRANSACTION_BEGIN:數(shù)據(jù)庫(kù)事務(wù)開始

  • TRANSACTION_END:數(shù)據(jù)庫(kù)事務(wù)結(jié)束

  • CREATE:數(shù)據(jù)庫(kù)建表

  • ALTER:數(shù)據(jù)庫(kù)表變更

  • QUERY:數(shù)據(jù)庫(kù)變更的原始SQL

  • TRUNCATE:數(shù)據(jù)庫(kù)表清空

  • RENAME:數(shù)據(jù)庫(kù)表重命名

  • CINDEX:創(chuàng)建索引

  • DINDEX:刪除索引

  • MHEARTBEAT:用于在源端無(wú)新增數(shù)據(jù)時(shí)標(biāo)識(shí)同步仍正常進(jìn)行的心跳消息

timestamp

JSONObject 類型,本條數(shù)據(jù)的相關(guān)時(shí)間戳。

  • eventTime:Long類型,記錄源端庫(kù)發(fā)生變更的時(shí)間,毫秒精度的13位時(shí)間戳。

  • systemTime:Long類型,同步任務(wù)處理該條變更消息的時(shí)間,毫秒精度的13位時(shí)間戳。

  • checkpointTime:Long類型,重置同步位點(diǎn)時(shí)的設(shè)置時(shí)間,毫秒精度的13位時(shí)間戳,一般與eventTime值一致。

ddl

該字段只在更改數(shù)據(jù)庫(kù)的表結(jié)構(gòu)時(shí)才會(huì)填充數(shù)據(jù),更改數(shù)據(jù)(包括新增、刪除和修改)時(shí)對(duì)應(yīng)的ddl直接填充為null。

  • text:String類型,數(shù)據(jù)庫(kù)DDL語(yǔ)句文本。

  • ddlMeta:String類型,將數(shù)據(jù)庫(kù)ddl類型變更記錄到一個(gè)Java對(duì)象,使用對(duì)象序列化后再進(jìn)行base64編碼得到的字符串。

version

無(wú)

格式的版本號(hào)。