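Appendix: Message format
This topic describes the structure of the messages written to Kafka and the meaning of each field.
Background information
A task that synchronizes an entire database to Kafka reads data from the upstream data source and writes it to a Kafka topic in the JSON format described below. The overall message format includes the column information of the change record and the state of the data before and after the change. So that consumers of the Kafka data can accurately determine the progress of the synchronization task, the task also periodically writes heartbeat records, whose op field is MHEARTBEAT, to the Kafka topic. The following sections describe the overall format of messages written to Kafka, the heartbeat message format of the synchronization task, and the message formats that correspond to data changes at the source. For field types and parameter descriptions, see Field types and Parameter description.
Overall message format
The overall format of a message written to Kafka is as follows: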
{
  "schema": { // metadata of the change; specifies only column names and column types
    "dataColumn": [ // data columns of the change, used to update the record content in the destination table
      {
        "name": "id",
        "type": "LONG"
      },
      {
        "name": "name",
        "type": "STRING"
      },
      {
        "name": "binData",
        "type": "BYTES"
      },
      {
        "name": "ts",
        "type": "DATE"
      },
      {
        "name": "rowid", // when the data source is Oracle, rowid is placed in the data columns
        "type": "STRING"
      }
    ],
    "primaryKey": [
      "pkName1",
      "pkName2"
    ],
    "source": {
      "dbType": "mysql",
      "dbVersion": "1.0.0",
      "dbName": "myDatabase",
      "schemaName": "mySchema",
      "tableName": "tableName"
    }
  },
  "payload": {
    "before": {
      "dataColumn": {
        "id": 111,
        "name": "scooter",
        "binData": "[base64 string]",
        "ts": 1590315269000,
        "rowid": "AAIUMPAAFAACxExAAE" // string type, Oracle rowid information
      }
    },
    "after": {
      "dataColumn": {
        "id": 222,
        "name": "donald",
        "binData": "[base64 string]",
        "ts": 1590315269000,
        "rowid": "AAIUMPAAFAACxExAAE" // string type, Oracle rowid information
      }
    },
    "sequenceId": "XXX", // string type, used to order data when incremental and full data are merged
    "scn": "xxxx", // string type, Oracle scn information
    "op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...", // case-sensitive
    "timestamp": {
      "eventTime": 1, // required; time at which the change occurred in the source database, 13-digit timestamp with millisecond precision
      "systemTime": 2, // optional; time at which the synchronization task processed this change message, 13-digit timestamp with millisecond precision
      "checkpointTime": 3 // optional; time set when the synchronization position is reset, 13-digit timestamp with millisecond precision, generally equal to eventTime
    },
    "ddl": {
      "text": "ADD COLUMN ...",
      "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
    }
  },
  "version": "1.0.0"
}
For field types and parameter descriptions, see Field types and Parameter description.
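As an illustration of how a consumer might read the top-level structure, the following is a minimal Python sketch that summarizes one message value. The function name describe, the three kind labels, and the grouping of op values into row changes, heartbeats, and everything else are assumptions made for this example and are not part of the message format. The sketch also assumes that the actual message value is plain JSON without the explanatory // comments shown in the format listing.

```python
import json

# Row-level change types that appear in this document's examples. Grouping all
# remaining non-heartbeat op values under "other" is an assumption of this sketch.
ROW_CHANGE_OPS = {"INSERT", "UPDATE_BEFOR", "UPDATE_AFTER", "DELETE"}
HEARTBEAT_OP = "MHEARTBEAT"


def describe(raw_message):
    """Summarize one Kafka message value given as a JSON string."""
    message = json.loads(raw_message)
    payload = message["payload"]
    # In heartbeat messages schema.source is null, so fall back to an empty dict.
    source = message["schema"].get("source") or {}
    op = payload["op"]  # op values are case-sensitive, so compare them exactly
    if op in ROW_CHANGE_OPS:
        kind = "row_change"
    elif op == HEARTBEAT_OP:
        kind = "heartbeat"
    else:
        kind = "other"
    return {
        "kind": kind,
        "op": op,
        "table": source.get("tableName"),
        "event_time_ms": payload["timestamp"]["eventTime"],
        "version": message.get("version"),
    }
```

A consumer could call describe on each record value and route on the returned kind before looking at before, after, or ddl.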
Heartbeat message format of the synchronization task
{
  "schema": {
    "dataColumn": null,
    "primaryKey": null,
    "source": null
  },
  "payload": {
    "before": null,
    "after": null,
    "sequenceId": null,
    "timestamp": {
      "eventTime": 1620457659000,
      "checkpointTime": 1620457659000
    },
    "op": "MHEARTBEAT",
    "ddl": null
  },
  "version": "0.0.1"
}
For field types and parameter descriptions, see Field types and Parameter description.
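The heartbeat record exists so that a consumer can judge synchronization progress even when no data changes arrive. One way to use it is sketched below in Python. The class name ProgressTracker and the choice of the largest eventTime seen so far as the progress indicator are assumptions of this example; the source only states that heartbeat records help determine task progress.

```python
import json


class ProgressTracker:
    """Tracks the largest eventTime seen so far, heartbeat records included."""

    def __init__(self):
        self.watermark_ms = None  # largest eventTime observed, in milliseconds
        self.heartbeats = 0       # number of MHEARTBEAT records observed

    def observe(self, raw_message):
        """Update the tracker from one Kafka message value (a JSON string)."""
        payload = json.loads(raw_message)["payload"]
        if payload["op"] == "MHEARTBEAT":
            self.heartbeats += 1
        event_time = payload["timestamp"]["eventTime"]
        # Never move the watermark backwards if records arrive out of order.
        if self.watermark_ms is None or event_time > self.watermark_ms:
            self.watermark_ms = event_time
        return self.watermark_ms
```

Because heartbeat messages carry null in schema, before, and after, the sketch reads only payload.op and payload.timestamp, which are present in every message shown in this topic.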
Message formats for data changes at the source
Kafka message format for data inserted at the source:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": null, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000000", "timestamp": { "eventTime": 1620457896000, "systemTime": 1620457896977, "checkpointTime": 1620457896000 }, "op": "INSERT", "ddl": null }, "version": "0.0.1" }
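Kafka message format for data updated at the source:
When the option that maps one source update change to one Kafka record is not selected, an update at the source produces two Kafka messages, which describe the state of the data before the update and after the update respectively. The message formats are as follows:
Message format for the state of the data before the update: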
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "after": null, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_BEFOR", "ddl": null }, "version": "0.0.1" }
Message format for the state of the data after the update:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": null, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_AFTER", "ddl": null }, "version": "0.0.1" }
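When the option that maps one source update change to one Kafka record is selected, an update at the source produces a single Kafka message that describes both the state of the data before the update and the state after the update. The message format is as follows: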
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_AFTER", "ddl": null }, "version": "0.0.1" }
Kafka message format for data deleted at the source:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "after": null, "sequenceId": "1620457642589000002", "timestamp": { "eventTime": 1620458266000, "systemTime": 1620458266101, "checkpointTime": 1620458266000 }, "op": "DELETE", "ddl": null }, "version": "0.0.1" }
For field types and parameter descriptions, see Field types and Parameter description.
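To show how the insert, update, and delete messages above could be consumed together, here is a minimal Python sketch that replays already-parsed messages into an in-memory dictionary. The function name apply_change, the rows dictionary, and the key_column argument are hypothetical. The examples in this section have primaryKey set to null, so the sketch assumes the caller picks a column that identifies a row, such as #alibaba_rds_row_id#.

```python
def apply_change(rows, message, key_column):
    """Apply one parsed change message to rows, a dict keyed by key_column."""
    payload = message["payload"]
    op = payload["op"]
    # before and after are either null or an object that wraps dataColumn.
    before = (payload.get("before") or {}).get("dataColumn")
    after = (payload.get("after") or {}).get("dataColumn")

    if op == "INSERT":
        rows[after[key_column]] = after
    elif op == "UPDATE_BEFOR":
        # Two-message mode: drop the old image; the UPDATE_AFTER message that
        # follows with the same sequenceId writes the new image.
        rows.pop(before[key_column], None)
    elif op == "UPDATE_AFTER":
        # Single-message mode also carries before; remove it first so that a
        # changed key value does not leave a stale row behind.
        if before is not None:
            rows.pop(before[key_column], None)
        rows[after[key_column]] = after
    elif op == "DELETE":
        rows.pop(before[key_column], None)
    # Other op values (DDL, transaction markers, heartbeats) are ignored here.
    return rows
```

The same function handles both update modes, because UPDATE_AFTER carries the new row image in either case. Between an UPDATE_BEFOR and its UPDATE_AFTER the row is briefly absent from rows, which a real consumer may want to avoid.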
Field types
Data read from the source is mapped to one of six types, BOOLEAN, DOUBLE, DATE, BYTES, LONG, and STRING, and is then written to the Kafka topic in the corresponding JSON representation.
Type | Description |
BOOLEAN | Corresponds to the JSON Boolean type. Valid values: true and false. |
DATE | Corresponds to the JSON number type. The value is a 13-digit numeric timestamp that is accurate to the millisecond (ms). |
BYTES | Corresponds to the JSON string type. Before the data is written to Kafka, the byte array is base64-encoded into a string. Consumers must base64-decode the value. (Encoding: Base64.getEncoder().encodeToString(text.getBytes("UTF-8")); decoding: Base64.getDecoder().decode(encodedText).) |
STRING | Corresponds to the JSON string type. |
LONG | Corresponds to the JSON number type. |
DOUBLE | Corresponds to the JSON number type. |
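The type mapping in the table can be applied on the consumer side roughly as follows. This Python sketch converts DATE values to timezone-aware datetimes and BYTES values to raw bytes, and leaves the other four types as the JSON parser produced them. The function names decode_value and decode_row are hypothetical, and interpreting the timestamp as UTC milliseconds since the Unix epoch is an assumption of this example.

```python
import base64
from datetime import datetime, timedelta, timezone

# Assumption: DATE values count milliseconds from the Unix epoch in UTC.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def decode_value(column_type, value):
    """Convert one JSON value according to its declared column type."""
    if value is None:
        return None
    if column_type == "DATE":
        # 13-digit millisecond timestamp -> timezone-aware datetime
        return EPOCH + timedelta(milliseconds=value)
    if column_type == "BYTES":
        # base64 string -> the original byte array
        return base64.b64decode(value)
    # BOOLEAN, LONG, DOUBLE, and STRING already map to native JSON values.
    return value


def decode_row(schema_columns, data_column):
    """Decode payload.before.dataColumn or payload.after.dataColumn.

    schema_columns is the schema.dataColumn list of {"name", "type"} objects.
    """
    types = {col["name"]: col["type"] for col in schema_columns}
    return {
        name: decode_value(types.get(name), value)
        for name, value in data_column.items()
    }
```

Looking the type up by column name keeps the decoding driven by schema.dataColumn, so the consumer does not need to hard-code which columns hold dates or binary data.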
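Parameter description
The following table describes the meaning of each field in the messages written to Kafka.
Level-1 element | Level-2 element | Description |
schema | dataColumn | JSONArray type. The type information of the data columns. dataColumn records all columns of the upstream change record and their column types. Change operations include changes that the database makes to data (insert, delete, and update) and changes such as table schema changes. |
schema | primaryKey | List type. The primary key information. pk: the primary key name. |
schema | source | Object type. Information about the source database or table. |
payload | before | JSONObject type. The data before the modification. For example, if the source is MySQL and a record is updated, the before field stores the content of the record before the update. |
payload | after | The data after the modification. The format is the same as that of before. |
payload | sequenceId | String type. Generated by Streamx and used to order data when incremental data and full data are merged. It is unique for each Streamx record. Note: an update read from the source generates two written records, an update before record and an update after record, and these two records have the same sequenceId. |
payload | scn | Valid only when the source is an Oracle database. Corresponds to the Oracle scn information. |
payload | op | The type of message read from the source. The value is case-sensitive. The op field in the overall message format enumerates the values, such as INSERT, UPDATE_BEFOR, UPDATE_AFTER, DELETE, and MHEARTBEAT. |
payload | timestamp | JSONObject type. The timestamps related to this record. Its sub-fields eventTime, systemTime, and checkpointTime are described in the overall message format. |
payload | ddl | Filled only when the table schema of the database is changed. For data changes (insert, delete, and update), ddl is set to null. |
version | None | The version number of the format. |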