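This article describes which operations each DataHub data type supports, the sharding strategy for each data type, the data formats, and sample messages.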
Operations supported by each data type
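A topic is the smallest unit of subscription and publication in DataHub. You can use a topic to represent one category or kind of streaming data. Two types are currently supported: Tuple and Blob.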
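DataHub type | Write DML messages | Write upstream heartbeat messages | Write DDL messages | Source table to topic mapping | Data type |
Tuple | Supported | Not supported | Not supported | One table to one topic | Types supported by DataHub |
Blob | Supported | Supported | Supported | One database (multiple tables) to one topic | Blob binary data |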
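Because the fields of a Tuple schema cannot be changed after the topic is created, the Tuple type suits scenarios where the schema is fixed and the source table undergoes no schema-changing DDL such as ADD COLUMN or DROP COLUMN. The Tuple type cannot retain DDL messages or heartbeat messages passed from upstream, so it cannot pass such messages through to downstream consumers of DataHub. In addition, each source table maps to its own topic; if there are many source tables, a large number of topics must be created, which makes downstream consumption inconvenient.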
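Because the Blob type has no schema and a topic stores only Blob binary data, it offers much more flexibility. It can store DDL messages from the source tables and heartbeat messages from the source, and pass them on to downstream consumers. It also maps one database (multiple tables) to one topic, so only one topic is needed regardless of the number of source tables, which simplifies downstream consumption. The Blob type is therefore better suited to scenarios where DataHub serves as an intermediate message queue for migrating an entire database.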
Sharding strategies for each data type
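A shard is a concurrent channel for transferring data in a DataHub topic. Each shard has an upper limit on write throughput, and adding shards improves write performance. However, DataHub guarantees ordering only within a single shard during consumption, not across shards. To improve write performance by adding shards while still preserving the order of related messages and avoiding data skew, the following sharding strategies are provided for the Blob and Tuple types.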
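Scenario | Tuple | Blob |
With primary key (including custom primary keys) | Shard by primary key | Shard by primary key |
Ordering guarantee | Messages with the same primary key are ordered | Messages with the same primary key are ordered |
Without primary key | Random sharding | Shard by table name |
Ordering guarantee | No ordering guarantee | Messages from the same table are ordered |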
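The sharding rules above can be sketched as follows. This is a minimal illustration, not DataHub's implementation: the class name ShardRouter, the CRC32-based hash, and the choice to prefix Blob primary-key values with the table name are assumptions made for the example. What it shows is the property the table relies on: a deterministic hash of the same key always selects the same shard, so messages that share a key stay in one shard and keep their order, while random routing (Tuple without a primary key) gives no such guarantee. Requires Java 9 or later for List.of.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.zip.CRC32;

public class ShardRouter {
    // Hypothetical hash: DataHub's actual partitioning function is not described here.
    // CRC32 is used only because it is deterministic and available in the JDK.
    static int hashToShard(String key, int shardCount) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % shardCount);
    }

    // Blob topic: shard by primary key if present, otherwise by table name.
    // The table name is prefixed to PK values because one Blob topic holds many tables (assumption).
    static int blobShard(String table, List<String> pkValues, int shardCount) {
        String key = pkValues.isEmpty() ? table : table + "|" + String.join("|", pkValues);
        return hashToShard(key, shardCount);
    }

    // Tuple topic: shard by primary key if present, otherwise pick a random shard.
    static int tupleShard(List<String> pkValues, int shardCount) {
        if (pkValues.isEmpty()) {
            return ThreadLocalRandom.current().nextInt(shardCount);
        }
        return hashToShard(String.join("|", pkValues), shardCount);
    }

    public static void main(String[] args) {
        int shards = 4;
        // The same primary key always maps to the same shard, which preserves per-key order.
        System.out.println(blobShard("t_shiyu_pk", List.of("1", "joe"), shards)
                == blobShard("t_shiyu_pk", List.of("1", "joe"), shards)); // true
        // Without a primary key, a Blob topic routes every row of a table to one shard.
        System.out.println(blobShard("t_shiyu_nopk", List.of(), shards)
                == blobShard("t_shiyu_nopk", List.of(), shards)); // true
    }
}
```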
Synchronized data formats
Tuple
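The data types are those natively supported by DataHub Tuple topics. When a topic is created by Data Integration, several metadata columns are added: _sequence_id_, _excute_time_, _source_table_, _before_image_, and _after_image_.
Parameter | Description |
_sequence_id_ | A string of digits that uniquely identifies each change. The update-before and update-after records of one update share the same sequence ID. |
_excute_time_ | Time the data was generated. |
_source_table_ | Name of the source table. |
_before_image_ | Before image: Y for update-before and delete records, N for update-after and insert records. |
_after_image_ | After image: N for update-before and delete records, Y for update-after and insert records. |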
Example: the following table shows the result of synchronizing one INSERT, one UPDATE, and one DELETE statement to DataHub.
_sequence_id_ | _operation_type_ | _excute_time_ | _before_image_ | _after_image_ |
1649991610688000000 | I | 1649991726000 | N | Y |
1649991610688000001 | U | 1649991756000 | Y | N |
1649991610688000001 | U | 1649991756000 | N | Y |
1649991610688000002 | D | 1649991774000 | Y | N |
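The following sketch shows how a consumer might regroup these rows into logical changes by _sequence_id_, using _before_image_ and _after_image_ to tell the two halves of an update apart. The Row and Change records and the group method are hypothetical names, and the rows are copied from the example table; it assumes every row has exactly one of the two image flags set to Y, as in the table. Requires Java 16 or later for records.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TupleChangeReader {
    // One Tuple record, reduced to the metadata columns from the example table.
    record Row(String sequenceId, String operationType, long executeTime,
               boolean beforeImage, boolean afterImage) {}

    // One logical change: the before and/or after image sharing a _sequence_id_.
    record Change(String sequenceId, String operationType, Row before, Row after) {}

    // _before_image_ and _after_image_ hold "Y" or "N".
    static boolean flag(String yn) {
        return "Y".equals(yn);
    }

    static List<Change> group(List<Row> rows) {
        // LinkedHashMap keeps changes in the order their sequence IDs first appear.
        Map<String, Row[]> bySeq = new LinkedHashMap<>();
        for (Row r : rows) {
            Row[] pair = bySeq.computeIfAbsent(r.sequenceId(), k -> new Row[2]);
            if (r.beforeImage()) pair[0] = r; // update-before or delete
            if (r.afterImage()) pair[1] = r;  // update-after or insert
        }
        List<Change> changes = new ArrayList<>();
        for (Map.Entry<String, Row[]> e : bySeq.entrySet()) {
            Row before = e.getValue()[0];
            Row after = e.getValue()[1];
            String op = (before != null ? before : after).operationType();
            changes.add(new Change(e.getKey(), op, before, after));
        }
        return changes;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
            new Row("1649991610688000000", "I", 1649991726000L, flag("N"), flag("Y")),
            new Row("1649991610688000001", "U", 1649991756000L, flag("Y"), flag("N")),
            new Row("1649991610688000001", "U", 1649991756000L, flag("N"), flag("Y")),
            new Row("1649991610688000002", "D", 1649991774000L, flag("Y"), flag("N")));
        for (Change c : group(rows)) {
            System.out.println(c.operationType() + " " + c.sequenceId()
                + " before=" + (c.before() != null) + " after=" + (c.after() != null));
        }
    }
}
```

Grouping by sequence ID rather than by adjacent rows is a design choice: it still pairs the two halves of an update if they are read in separate batches.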
Blob
A Blob message is a JSON string converted to binary data. The corresponding JSON format is as follows:
{
  "schema": { // Metadata of the change; specifies only column names and column types
    "dataColumn": [ // Data columns of the change, used to update the record content in the target table
      { "name": "id", "type": "LONG" },
      { "name": "name", "type": "STRING" },
      { "name": "binData", "type": "BYTES" },
      { "name": "ts", "type": "DATE" }
    ],
    "primaryKey": [ "pkName1", "pkName2" ],
    "source": {
      "dbType": "mysql",
      "dbVersion": "1.0.0",
      "dbName": "myDatabase",
      "schemaName": "mySchema",
      "tableName": "tableName"
    }
  },
  "payload": {
    "before": {
      "dataColumn": {
        "id": 111,
        "name": "scooter",
        "binData": "[base64 string]",
        "ts": 1590315269000
      }
    },
    "after": {
      "dataColumn": {
        "id": 222,
        "name": "donald",
        "binData": "[base64 string]",
        "ts": 1590315269000
      }
    },
    "sequenceId": XXX, // String; used to order data when merging full and incremental data
    "op": "INSERT/UPDATE/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...", // Case-sensitive
    "timestamp": {
      "eventTime": 1, // Required. Time the record changed; 13-digit timestamp, millisecond precision
      "systemTime": 2, // Optional. Present for some data sources, such as Oracle CDC
      "checkpointTime": 3 // Optional. Present for some databases, such as OceanBase
    },
    "ddl": {
      "text": "ADD COLUMN ...",
      "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
    }
  },
  "version": "1.0.0"
}
Blob field descriptions
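Important: All field types in a message are limited to the six types defined by StreamX: BOOLEAN, DOUBLE, DATE, BYTES, LONG, and STRING.
BOOLEAN: the value is true or false.
DATE: a 13-digit integer timestamp with millisecond precision.
BYTES: stores bytes data as a Base64-encoded string. Base64 encoding and decoding use the interfaces in java.util.Base64:
import java.nio.charset.StandardCharsets;
import java.util.Base64;

String text = "測試text123";
String encodedText = Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8)); // Encode
byte[] decodedBytes = Base64.getDecoder().decode(encodedText); // Decode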
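To apply these rules on the consumer side, a dataColumn value can be converted according to the type declared in schema.dataColumn. The sketch below is a hypothetical helper (BlobValueCodec.decode is not part of any DataHub SDK) and assumes the JSON has already been parsed into plain Java objects (Boolean, Number, String) by a JSON library of your choice.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;

public class BlobValueCodec {
    // Converts one already-parsed JSON value to a Java value based on its StreamX type name.
    static Object decode(String type, Object jsonValue) {
        if (jsonValue == null) {
            return null;
        }
        switch (type) {
            case "BOOLEAN": return (Boolean) jsonValue;
            case "LONG":    return ((Number) jsonValue).longValue();
            case "DOUBLE":  return ((Number) jsonValue).doubleValue();
            case "STRING":  return (String) jsonValue;
            // DATE: 13-digit epoch timestamp in milliseconds.
            case "DATE":    return Instant.ofEpochMilli(((Number) jsonValue).longValue());
            // BYTES: Base64-encoded string.
            case "BYTES":   return Base64.getDecoder().decode((String) jsonValue);
            default: throw new IllegalArgumentException("Unknown type: " + type);
        }
    }

    public static void main(String[] args) {
        // The ts value from the format example above.
        System.out.println(decode("DATE", 1590315269000L)); // 2020-05-24T10:14:29Z

        // The same sample string as above, written with Unicode escapes.
        String original = "\u6e2c\u8a66text123";
        String b64 = Base64.getEncoder().encodeToString(original.getBytes(StandardCharsets.UTF_8));
        byte[] raw = (byte[]) decode("BYTES", b64);
        System.out.println(new String(raw, StandardCharsets.UTF_8).equals(original)); // true
    }
}
```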
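Top-level element | Second-level element | Description |
schema | dataColumn | JSONArray. Type information for the data columns: dataColumn records every column of the upstream change record together with its column type. Changes include data changes in the database (inserts, deletes, and updates) and table schema changes. Each element contains name (column name) and type (column type). |
schema | primaryKey | List. Primary key information; each entry (pk) is a primary key column name. |
schema | source | Object. Information about the source database or table: dbType (String, database type); dbVersion (String, database version); dbName (String, database name); schemaName (String, schema name, for databases such as Postgres and SQL Server); tableName (String, table name). |
payload | before | JSONObject. The data before the change. For example, if the source is MySQL and a record is updated, before stores the record content as it was before the update. This field is populated when an update or delete message is read from the source. dataColumn (JSONObject) holds the data as column name: column value pairs. Column names are strings; column values depend on their type: BYTES values are Base64 strings, DATE values are 13-digit timestamps represented as long, and all other types use their own native values. |
payload | after | The data after the change, in the same format as before. Required for update and insert operations. |
payload | op | Operation type. Valid values: INSERT (data inserted); UPDATE_BEFOR (data before an update); UPDATE_AFTER (data after an update); DELETE (data deleted); TRANSACTION_BEGIN (database transaction begins); TRANSACTION_END (database transaction ends); CREATE (table created); ALTER (table altered); QUERY (raw SQL of the database change); TRUNCATE (table truncated); RENAME (table renamed); CINDEX (index created); DINDEX (index dropped); MHEARTBEAT (heartbeat message indicating that synchronization is still running normally while the source has no new data). |
payload | timestamp | JSONObject. Timestamps related to this record: eventTime (Long, time the change occurred in the source database, 13-digit timestamp with millisecond precision); systemTime (Long, time the synchronization task processed this change message, 13-digit timestamp with millisecond precision); checkpointTime (Long, the time set when the synchronization offset is reset, 13-digit timestamp with millisecond precision, usually equal to eventTime). |
payload | ddl | Populated only when a table schema changes; for data changes (inserts, deletes, and updates), ddl is set to null. text (String): the DDL statement text. ddlMeta (String): the SQLStatement object produced by parsing the DDL with FastSQL, serialized to binary and stored as a Base64-encoded string. This serialized SQLStatement object must be passed when DDL support is enabled; the downstream pipeline deserializes and parses it, then rebuilds it as a DDL statement for the target data source to apply the change. |
version | N/A | Version number of the format. |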
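The ddlMeta value in the DDL sample in this article begins with rO0AB, which is the Base64 form of the Java object serialization stream header (0xACED followed by version 0x0005). The sketch below only checks for that header; fully restoring the SQLStatement would require java.io.ObjectInputStream with the FastSQL classes on the classpath and, as with any Java deserialization, should only be done for trusted data. DdlMetaInspector and looksLikeJavaSerialization are hypothetical names.

```java
import java.util.Base64;

public class DdlMetaInspector {
    // Java object serialization streams start with STREAM_MAGIC 0xACED and STREAM_VERSION 0x0005.
    static boolean looksLikeJavaSerialization(String ddlMetaBase64) {
        byte[] bytes = Base64.getDecoder().decode(ddlMetaBase64);
        return bytes.length >= 4
            && (bytes[0] & 0xFF) == 0xAC && (bytes[1] & 0xFF) == 0xED
            && bytes[2] == 0x00 && bytes[3] == 0x05;
    }

    public static void main(String[] args) {
        // First 8 characters of the ddlMeta value in the DDL sample.
        System.out.println(looksLikeJavaSerialization("rO0ABXNy")); // true
    }
}
```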
Blob serialization
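In the JSON format defined in this article, each message corresponds to one JSONObject. The contents of the JSONObject are mapped level by level, following the message format, to the corresponding structures (JSONObject, JSONArray, values of the corresponding types, and so on).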
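Every field in the JSONObject is stored with the type given in the field descriptions above. Serialization converts the JSONObject to a String (for example, with fastJSON's toJSONString method) and then calls the String's getBytes(Charsets.UTF_8) method to convert it to a byte[] using the UTF-8 charset.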
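A minimal sketch of the byte-level round trip, assuming a hand-written JSON string in place of fastJSON's toJSONString so that the example needs no third-party library. It uses the JDK's StandardCharsets.UTF_8, which names the same UTF-8 charset as the Charsets.UTF_8 constant mentioned above. The payload mirrors the Heartbeat sample in this article; the class and method names are illustrative only.

```java
import java.nio.charset.StandardCharsets;

public class BlobSerialization {
    // Stand-in for JSONObject.toJSONString(): builds the heartbeat message by hand.
    static String heartbeatJson(long eventTime) {
        return "{\"schema\":{},\"payload\":{\"op\":\"MHEARTBEAT\",\"timestamp\":{\"eventTime\":"
            + eventTime + ",\"checkpointTime\":" + eventTime + "}},\"version\":\"0.0.1\"}";
    }

    // Producer side: JSON String -> UTF-8 byte[] (the Blob record body).
    static byte[] serialize(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: UTF-8 byte[] -> JSON String, ready for a JSON parser.
    static String deserialize(byte[] blob) {
        return new String(blob, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String json = heartbeatJson(1605339953629L);
        byte[] blob = serialize(json);
        System.out.println(deserialize(blob).equals(json)); // true
    }
}
```

Decoding with the same charset on both sides is what makes the round trip lossless; decoding with a different charset would corrupt non-ASCII column values.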
JSON samples of related messages
Insert:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "INSERT", "after": { "dataColumn": { "name": "joe", "comment": "comment", "id": 1 } }, "sequenceId": "1605339516000000004", "timestamp": { "eventTime": 1605339932000, "systemTime": 1605339932736, "checkpointTime": 1605339932000 } }, "version": "0.0.1" }
update before:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "UPDATE_BEFOR", "before": { "dataColumn": { "name": "joe", "comment": "comment", "id": 1 } }, "sequenceId": "1605339516000000005", "timestamp": { "eventTime": 1605339934000, "systemTime": 1605339934951, "checkpointTime": 1605339934000 } }, "version": "0.0.1" }
update after:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "UPDATE_AFTER", "after": { "dataColumn": { "name": "joe", "comment": "com1", "id": 1 } }, "sequenceId": "1605339516000000005", "timestamp": { "eventTime": 1605339934000, "systemTime": 1605339934951, "checkpointTime": 1605339934000 } }, "version": "0.0.1" }
delete:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "DELETE", "before": { "dataColumn": { "name": "joe", "comment": "com1", "id": 1 } }, "sequenceId": "1605339516000000006", "timestamp": { "eventTime": 1605339937000, "systemTime": 1605339937671, "checkpointTime": 1605339937000 } }, "version": "0.0.1" }
Heartbeat:
{ "schema": {}, "payload": { "op": "MHEARTBEAT", "timestamp": { "eventTime": 1605339953629, "checkpointTime": 1605339953629 } }, "version": "0.0.1" }
DDL:
{ "schema": { "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_nopk" } }, "payload": { "op": "ALTER", "sequenceId": "1605339516000000035", "ddl": { "text": "alter table t_shiyu_nopk add column holo text", "ddlMeta": "rO0ABXNyACljb20uYWxpYmFiYS5kaS5wbHVnaW4uY2VudGVyLm1ldGEuRERMTWV0YQLb5Cx/YWXtAgACTAAHZGRsVGV4dHQAEkxqYXZhL2xhbmcvU3RyaW5nO0wACXN0YXRlbWVudHQAKkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMU3RhdGVtZW50O3hwdAAtYWx0ZXIgdGFibGUgdF9zaGl5dV9ub3BrIGFkZCBjb2x1bW4gaG9sbyB0ZXh0c3IAPGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQWx0ZXJUYWJsZVN0YXRlbWVudBQPP3vMUl2cAgAPSQAHYnVja2V0c1oABmlnbm9yZVoAF2ludmFsaWRhdGVHbG9iYWxJbmRleGVzWgAPbWVyZ2VTbWFsbEZpbGVzWgAHb2ZmbGluZVoABm9ubGluZVoADnJlbW92ZVBhdGl0aW5nWgATdXBkYXRlR2xvYmFsSW5kZXhlc1oAD3VwZ3JhZGVQYXRpdGluZ0wAC2NsdXN0ZXJlZEJ5dAAQTGphdmEvdXRpbC9MaXN0O0wABWl0ZW1zcQB+AAZMAAlwYXJ0aXRpb250ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTFBhcnRpdGlvbkJ5O0wACHNvcnRlZEJ5cQB+AAZMAAx0YWJsZU9wdGlvbnNxAH4ABkwAC3RhYmxlU291cmNldAA6TGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9zdGF0ZW1lbnQvU1FMRXhwclRhYmxlU291cmNlO3hyACxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMU3RhdGVtZW50SW1wbEOxUUDVCJMGAgADWgAJYWZ0ZXJTZW1pTAAGZGJUeXBldAAcTGNvbS9hbGliYWJhL2Zhc3RzcWwvRGJUeXBlO0wACWhlYWRIaW50c3EAfgAGeHIAKWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5TUUxPYmplY3RJbXBs5LvqLFggFVECAAVJAAxzb3VyY2VDb2x1bW5JAApzb3VyY2VMaW5lTAAKYXR0cmlidXRlc3QAD0xqYXZhL3V0aWwvTWFwO0wABGhpbnR0ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTENvbW1lbnRIaW50O0wABnBhcmVudHQAJ0xjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMT2JqZWN0O3hwAAAAAAAAAABwcHAAfnIAGmNvbS5hbGliYWJhLmZhc3RzcWwuRGJUeXBlAAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAFbXlzcWxwAAAAAAAAAAAAAAAAc3IAE2phdmEudXRpbC5BcnJheUxpc3R4gdIdmcdhnQMAAUkABHNpemV4cAAAAAB3BAAAAAB4c3EAfgAUAAAAAXcEAAAAAXNyADxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTEFsdGVyVGFibGVBZGRDb2x1bW4l5T6CFe//BAIABloAB2Nhc2NhZGVaAAVmaXJzdEwAC2FmdGVyQ29sdW1udAAlTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxOYW1lO0wAB2NvbHVtbnNxAH4ABkwA
C2ZpcnN0Q29sdW1ucQB+ABhMAAhyZXN0cmljdHQAE0xqYXZhL2xhbmcvQm9vbGVhbjt4cQB+AAsAAAAAAAAAAHBwcQB+AA8AAHBzcQB+ABQAAAABdwQAAAABc3IAOWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQ29sdW1uRGVmaW5pdGlvbst0gLKZ0qAtAgAmWgANYXV0b0luY3JlbWVudFoADGRpc2FibGVJbmRleFoAB3ByZVNvcnRJAAxwcmVTb3J0T3JkZXJaAAZzdG9yZWRaAAd2aXJ0dWFsWgAHdmlzaWJsZUwACGFubkluZGV4dAApTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxBbm5JbmRleDtMAAZhc0V4cHJ0ACVMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTEV4cHI7TAALY2hhcnNldEV4cHJxAH4AHkwADWNvbFByb3BlcnRpZXNxAH4ABkwAC2NvbGxhdGVFeHBycQB+AB5MAAdjb21tZW50cQB+AB5MAAtjb21wcmVzc2lvbnQALkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvZXhwci9TUUxDaGFyRXhwcjtMAAtjb25zdHJhaW50c3EAfgAGTAAIZGF0YVR5cGV0AClMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTERhdGFUeXBlO0wABmRiVHlwZXEAfgAKTAALZGVmYXVsdEV4cHJxAH4AHkwACWRlbGltaXRlcnEAfgAeTAASZGVsaW1pdGVyVG9rZW5pemVycQB+AB5MAAZlbmFibGVxAH4AGUwABmVuY29kZXEAfgAfTAAGZm9ybWF0cQB+AB5MABBnZW5lcmF0ZWRBbGF3c0FzcQB+AB5MAAhpZGVudGl0eXQARExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTENvbHVtbkRlZmluaXRpb24kSWRlbnRpdHk7TAASanNvbkluZGV4QXR0cnNFeHBycQB+AB5MAAhtYXBwZWRCeXEAfgAGTAAEbmFtZXEAfgAYTAAMbmxwVG9rZW5pemVycQB+AB5MAAhvblVwZGF0ZXEAfgAeTAAEcmVseXEAfgAZTAAMc2VxdWVuY2VUeXBldAAvTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9BdXRvSW5jcmVtZW50VHlwZTtMAARzdGVwcQB+AB5MAAdzdG9yYWdlcQB+AB5MAAl1bml0Q291bnRxAH4AHkwACXVuaXRJbmRleHEAfgAeTAAIdmFsaWRhdGVxAH4AGUwACXZhbHVlVHlwZXEAfgAeeHEAfgALAAAAAAAAAABwcHEAfgAaAAAAAAAAAAAAAHBwcHBwcHBzcQB+ABQAAAAAdwQAAAAAeHNyADpjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTENoYXJhY3RlckRhdGFUeXBlqtJac/d+04cCAAVaAAloYXNCaW5hcnlMAAtjaGFyU2V0TmFtZXEAfgABTAAIY2hhclR5cGVxAH4AAUwAB2NvbGxhdGVxAH4AAUwABWhpbnRzcQB+AAZ4cgArY29tLmFsaWJhYmEuZmFzdHNxbC5zcWwuYXN0LlNRTERhdGFUeXBlSW1wbEWL29pc1gZFAgAJSgAObmFtZUhhc2hDb2RlNjRaAAh1bnNpZ25lZFoAEXdpdGhMb2NhbFRpbWVab25lWgAIemVyb2ZpbGxMAAlhcmd1bWVudHNxAH4ABkwABmRiVHlwZXEAfgAKTAAHaW5kZXhCeXEAfgAeTAAEbmFtZXEAfgABTAAMd2l0aFRpbWVab25lcQB+ABl4cQB+AAsAAAAAAAAAAHBwcQB+ACP6BPTvGZVAfgAAAHNxAH4AFAAAAAB3BAAAAAB4cHB0AAR0
ZXh0cABwcHBwcQB+ABJwcHBwcHBwcHBwc3IAMmNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5leHByLlNRTElkZW50aWZpZXJFeHBy3DXH1zvWbgkCAARKAApoYXNoQ29kZTY0TAAEbmFtZXEAfgABTAAOcmVzb2x2ZWRDb2x1bW5xAH4ADkwAE3Jlc29sdmVkT3duZXJPYmplY3RxAH4ADnhyACdjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMRXhwckltcGxs2ypmFJxWrQIAAHhxAH4ACwAAAAAAAAAAcHBwQCnxzH5tIDl0AARob2xvcHBwcHBwcHBwcHBweHBweHBzcQB+ABQAAAAAdwQAAAAAeHNxAH4AFAAAAAB3BAAAAAB4c3IAOGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMRXhwclRhYmxlU291cmNlRHD7eYJ4eswCAAVMAAdjb2x1bW5zcQB+AAZMAARleHBycQB+AB5MAApwYXJ0aXRpb25zcQB+AAZMAAhzYW1wbGluZ3QAOExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTFRhYmxlU2FtcGxpbmc7TAAMc2NoZW1hT2JqZWN0dAAxTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL3JlcG9zaXRvcnkvU2NoZW1hT2JqZWN0O3hyADhjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTFRhYmxlU291cmNlSW1wbAqEMenTm5zUAgAESgAPYWxpYXNIYXNoQ29kZTY0TAAFYWxpYXNxAH4AAUwACWZsYXNoYmFja3EAfgAeTAAFaGludHNxAH4ABnhxAH4ACwAAAAAAAAAAcHBwAAAAAAAAAABwcHBwc3EAfgAqAAAAAAAAAABwcHEAfgA0NH7o4UvP9Dt0AAx0X3NoaXl1X25vcGtwcHBwcA==" }, "timestamp": { "eventTime": 1605342109000, "systemTime": 1605342109259, "checkpointTime": 1605342109000 } }, "version": "0.0.1" }
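Because a single Blob topic carries DML, DDL, transaction markers, and heartbeats, a downstream consumer needs to branch on payload.op. The sketch below groups the op values listed in this article; the categories and the names BlobOpRouter and Kind are assumptions for illustration. Matching is exact because op values are case-sensitive. Requires Java 9 or later for Set.of.

```java
import java.util.Set;

public class BlobOpRouter {
    enum Kind { DML, DDL, TRANSACTION, HEARTBEAT, OTHER }

    // Hypothetical grouping of the op values documented in the field descriptions.
    private static final Set<String> DML_OPS = Set.of("INSERT", "UPDATE_BEFOR", "UPDATE_AFTER", "DELETE");
    private static final Set<String> DDL_OPS = Set.of("CREATE", "ALTER", "TRUNCATE", "RENAME", "CINDEX", "DINDEX");
    private static final Set<String> TX_OPS = Set.of("TRANSACTION_BEGIN", "TRANSACTION_END");

    static Kind classify(String op) {
        // No case folding: "insert" is not the same op as "INSERT".
        if (DML_OPS.contains(op)) return Kind.DML;
        if (DDL_OPS.contains(op)) return Kind.DDL;
        if (TX_OPS.contains(op)) return Kind.TRANSACTION;
        if ("MHEARTBEAT".equals(op)) return Kind.HEARTBEAT;
        return Kind.OTHER; // for example QUERY, GTID, XACOMMIT, XAROLLBACK
    }

    public static void main(String[] args) {
        for (String op : new String[] {"INSERT", "UPDATE_BEFOR", "ALTER", "MHEARTBEAT", "insert"}) {
            System.out.println(op + " -> " + classify(op));
        }
    }
}
```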