日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

附錄:DataHub消息格式

本文為您介紹DataHub不同數據類型對應操作的支持情況,不同數據類型的分片策略、數據格式及相關消息示例。

不同數據類型對應操作的支持情況

Topic是DataHub訂閱和發布的最小單位,用戶可以用Topic來表示一類或者一種流數據。目前支持Tuple與Blob兩種類型:

DataHub類型

寫入DML消息

寫入上游心跳消息

寫入DDL消息

源表和目標topic映射方式

數據類型

Tuple

支持

不支持

不支持

單表對單topic

DataHub支持的類型

Blob

支持

支持

支持

單庫(多表)對單topic

Blob二進制數據

  • Tuple類型由于schema各字段在topic創建后無法更改,所以適用于schema固定,且源表無add column、drop column等改變schema的DDL操作場景。Tuple類型不支持保留上游傳遞的DDL消息以及心跳消息,即Tuple不能將此類消息透穿給消費DataHub的下游。而且源表和topic的映射方式為單表對單topic,如果源表數量過多,則需要創建大量的topic,將不便于下游消費。

  • Blob類型由于不存在schema,topic內只存放Blob二進制數據,因此有較大的自由度。支持存放源表的DDL消息和源端的心跳消息,可傳遞給下游消費,且采用單庫(多表)對單topic的映射方式,不論源表數量多少僅需創建一個topic,方便下游消費。更適用于DataHub作為中間消息隊列進行整庫遷移的場景。

不同數據類型分片策略

Shard表示對DataHub的一個topic進行數據傳輸的并發通道,單個Shard寫入速率有上限,多個Shard可以提高寫入性能,但DataHub僅能保證單個Shard在消費時的有序性,不保證多個Shard之間消息的順序。因此,為了既能通過增加Shard數量提高寫入性能,又能夠保證多個Shard之間消息的有序性,同時避免數據傾斜,現針對Blob和Tuple類型提供以下分片策略。

場景

Tuple

Blob

有主鍵(包含自定義主鍵)

按主鍵進行分片

按主鍵進行分片

順序保證

同一主鍵消息保證有序

同一主鍵消息保證有序

無主鍵

隨機分片

按表名進行分片

順序保證

不保證有序

同一表消息保證有序

同步數據格式

  • Tuple

    數據類型為DataHub Tuple topic自身支持的類型。當使用數據集成創建的topic時,會增加部分元數據列。元數據列其中_sequence_id__excute_time__source_table_ _before_image__after_image_為元信息列。

    參數

    描述

    _sequence_id_

    string類型,由數字組成,每條消息唯一(update before和update after共用一個sequence id)。

    _excute_time_

    數據產生時間。

    _source_table_

    數據源表名。

    _before_image_

    前鏡像(update before和delete為Y,update after和insert為N)。

    _after_image_

    后鏡像(update before和delete為N,update after和insert為Y)。

    示例:下表為一條Insert、Update、Delete語句同步到DataHub的結果。

    _sequence_id_

    _operation_type_

    _excute_time_

    _before_image_

    _after_image_

    1649991610688000000

    I

    1649991726000

    N

    Y

    1649991610688000001

    U

    1649991756000

    Y

    N

    1649991610688000001

    U

    1649991756000

    N

    Y

    1649991610688000002

    D

    1649991774000

    Y

    N

  • Blob

    Blob類型的消息格式為JSON字符串轉化的二進制數據,其對應的JSON格式如下:

    {
        "schema": { //變更的元數據信息,僅指定列名與列類型信息
            "dataColumn": [//變更的數據列信息,更新目標表記錄內容
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "binData",
                    "type": "BYTES"
                },
                {
                    "name": "ts",
                    "type": "DATE"
                }
            ],
            "primaryKey": [
                "pkName1",
                "pkName2"
            ],
            "source": {
                "dbType": "mysql",
                "dbVersion": "1.0.0",
                "dbName": "myDatabase",
                "schemaName": "mySchema",
                "tableName": "tableName"
            }
        },
        "payload": {
            "before": {
                "dataColumn":{
                    "id": 111,
                    "name":"scooter",
                    "binData": "[base64 string]",
                    "ts": 1590315269000
                }
            },
            "after": {
                "dataColumn":{
                    "id": 222,
                    "name":"donald",
                    "binData": "[base64 string]",
                    "ts": 1590315269000
                }
            },
              "sequenceId":XXX//字符串類型,用于增全量數據合并的數據排序,
            "op": "INSERT/UPDATE/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT..."http://大小寫敏感,
            "timestamp": {
                "eventTime": 1,//必選,記錄的變更時間,13為時間戳,ms精度
                "systemTime": 2,//可選,oracle CDC等部分數據源存在
                "checkpointTime": 3//可選,部分數據庫如oceanbase等數據源包含
            },
            "ddl": {
                "text": "ADD COLUMN ...",
                "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
            }
        },
        "version":"1.0.0"
    }
    • Blob字段說明

      重要

      消息中的所有字段類型范圍為StreamX定義的BOOLEAN、DOUBLE、DATE、BYTES、LONG,STRING六種類型。

      BOOLEAN:取值為true,false
      DATE:取值為13為整形,時間精確到ms級
      BYTES: 存儲bytes類型,格式為base64編碼后的字符串
      
      BASE64編解碼使用java.util.Base64中的接口實現:
      String text = "測試text123";
      //編碼
      Base64.getEncoder().encodeToString(text.getBytes("UTF-8"))
      //編碼
      Base64.getDecoder().decode(encodedText)//解碼
                                  

      一級元素

      二級元素

      說明

      schema

      dataColumn

      JSONArray類型,數據列的類型信息。dataColumn記錄上游數據變更記錄的所有列和對應的列類型信息。變更操作包括數據庫對數據的更改(新增、刪除及修改)和數據庫表結構等變更。

      • name:列名

      • type:列類型

      primaryKey

      List類型,主鍵信息。

      pk:主鍵名。

      source

      Object 類型,源端數據庫或表信息。

      • dbType:String類型,數據庫類型

      • dbVersion:String類型,數據庫版本

      • dbName:String類型,數據庫名

      • schemaName:String類型,Schema名(針對Postgres和SQL Server等)

      • tableName:String 類型,數據表名

      payload

      before

      JSONObject類型,修改前的數據。例如:數據源端為mysql,做了一次記錄的update操作,before字段存儲記錄被update之前的數據內容。

      • 在從源端讀取到更新、刪除操作消息時,在寫入記錄中填充該字段。

      • dataColumn:JSONObject類型,表示數據信息。格式為列名:列值, 列名為字符串,列值取決于本身類型,BYTES類型使用Base64 String進行表示,DATE類型采用long表示的13位時間戳,其余類型的值均為本身類型。

      after

      修改后的數據。格式同before相同。

      說明

      在更新、插入操作時必填。

      op

      操作類型。取值如下:

      • INSERT:數據插入

      • UPDATE_BEFOR:數據更新前

      • UPDATE_AFTER:數據更新后

      • DELETE:數據刪除

      • TRANSACTION_BEGIN:數據庫事務開始

      • TRANSACTION_END:數據庫事務結束

      • CREATE:數據庫建表

      • ALTER:數據庫表變更

      • QUERY:數據庫變更的原始SQL

      • TRUNCATE:數據庫表清空

      • RENAME:數據庫表重命名

      • CINDEX:創建索引

      • DINDEX:刪除索引

      • MHEARTBEAT:用于在源端無新增數據時標識同步仍正常進行的心跳消息

      timestamp

      JSONObject 類型,本條數據的相關時間戳。

      • eventTime:Long類型,記錄源端庫發生變更的時間,毫秒精度的13位時間戳。

      • systemTime:Long類型,同步任務處理該條變更消息的時間,毫秒精度的13位時間戳。

      • checkpointTime:Long類型,重置同步位點時的設置時間,毫秒精度的13位時間戳,一般與eventTime值一致。

      ddl

      該字段只在更改數據庫的表結構時才會填充數據,更改數據(包括新增、刪除和修改)時對應的ddl直接填充為null。

      • text:String類型,數據庫DDL語句文本。

      • ddlMeta:String類型,使用FastSQL對DDL進行解析后生成的SQLStatement Object進行序列化的二進制表示,并使用Base64編碼為String存儲。

        開啟ddl支持時,需要傳遞的SQLStatement序列化對象,下游鏈路反序列化解析對象后,還原成目標數據源的ddl語句做變更。

      version

      格式的版本號。

    • Blob序列化說明

      本文定義的JSON格式,一條消息對應一個JSONObject,JSONObject內部按照消息格式,逐級映射為相應的格式(JSONObject,JSONArray,相應類型的value等)。

      整個JSONObject中每個字段的存放類型均按照上述字段說明。序列化將JSONObject轉換為String(如fastJSON的toJSONString方法)然后再采用String的getBytes(Charsets.UTF_8)方法,指定UTF_8字符集轉化為byte[]。

相關消息的JSON樣例

  • Insert:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "comment",
                    "type": "STRING"
                }
            ],
            "source": {
                "dbName": "yunshi_db",
                "dbType": "MySQL",
                "tableName": "t_shiyu_pk"
            },
            "primaryKey": [
                "id",
                "name"
            ]
        },
        "payload": {
            "op": "INSERT",
            "after": {
                "dataColumn": {
                    "name": "joe",
                    "comment": "comment",
                    "id": 1
                }
            },
            "sequenceId": "1605339516000000004",
            "timestamp": {
                "eventTime": 1605339932000,
                "systemTime": 1605339932736,
                "checkpointTime": 1605339932000
            }
        },
        "version": "0.0.1"
    }
  • update before:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "comment",
                    "type": "STRING"
                }
            ],
            "source": {
                "dbName": "yunshi_db",
                "dbType": "MySQL",
                "tableName": "t_shiyu_pk"
            },
            "primaryKey": [
                "id",
                "name"
            ]
        },
        "payload": {
            "op": "UPDATE_BEFOR",
            "before": {
                "dataColumn": {
                    "name": "joe",
                    "comment": "comment",
                    "id": 1
                }
            },
            "sequenceId": "1605339516000000005",
            "timestamp": {
                "eventTime": 1605339934000,
                "systemTime": 1605339934951,
                "checkpointTime": 1605339934000
            }
        },
        "version": "0.0.1"
    }
  • update after:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "comment",
                    "type": "STRING"
                }
            ],
            "source": {
                "dbName": "yunshi_db",
                "dbType": "MySQL",
                "tableName": "t_shiyu_pk"
            },
            "primaryKey": [
                "id",
                "name"
            ]
        },
        "payload": {
            "op": "UPDATE_AFTER",
            "after": {
                "dataColumn": {
                    "name": "joe",
                    "comment": "com1",
                    "id": 1
                }
            },
            "sequenceId": "1605339516000000005",
            "timestamp": {
                "eventTime": 1605339934000,
                "systemTime": 1605339934951,
                "checkpointTime": 1605339934000
            }
        },
        "version": "0.0.1"
    }
  • delete:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "comment",
                    "type": "STRING"
                }
            ],
            "source": {
                "dbName": "yunshi_db",
                "dbType": "MySQL",
                "tableName": "t_shiyu_pk"
            },
            "primaryKey": [
                "id",
                "name"
            ]
        },
        "payload": {
            "op": "DELETE",
            "before": {
                "dataColumn": {
                    "name": "joe",
                    "comment": "com1",
                    "id": 1
                }
            },
            "sequenceId": "1605339516000000006",
            "timestamp": {
                "eventTime": 1605339937000,
                "systemTime": 1605339937671,
                "checkpointTime": 1605339937000
            }
        },
        "version": "0.0.1"
    }
  • Heartbeat:

    {
        "schema": {},
        "payload": {
            "op": "MHEARTBEAT",
            "timestamp": {
                "eventTime": 1605339953629,
                "checkpointTime": 1605339953629
            }
        },
        "version": "0.0.1"
    }
  • DDL:

    {
        "schema": {
            "source": {
                "dbName": "yunshi_db",
                "dbType": "MySQL",
                "tableName": "t_shiyu_nopk"
            }
        },
        "payload": {
            "op": "ALTER",
            "sequenceId": "1605339516000000035",
            "ddl": {
                "text": "alter table t_shiyu_nopk add column holo text",
                "ddlMeta": "rO0ABXNyACljb20uYWxpYmFiYS5kaS5wbHVnaW4uY2VudGVyLm1ldGEuRERMTWV0YQLb5Cx/YWXtAgACTAAHZGRsVGV4dHQAEkxqYXZhL2xhbmcvU3RyaW5nO0wACXN0YXRlbWVudHQAKkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMU3RhdGVtZW50O3hwdAAtYWx0ZXIgdGFibGUgdF9zaGl5dV9ub3BrIGFkZCBjb2x1bW4gaG9sbyB0ZXh0c3IAPGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQWx0ZXJUYWJsZVN0YXRlbWVudBQPP3vMUl2cAgAPSQAHYnVja2V0c1oABmlnbm9yZVoAF2ludmFsaWRhdGVHbG9iYWxJbmRleGVzWgAPbWVyZ2VTbWFsbEZpbGVzWgAHb2ZmbGluZVoABm9ubGluZVoADnJlbW92ZVBhdGl0aW5nWgATdXBkYXRlR2xvYmFsSW5kZXhlc1oAD3VwZ3JhZGVQYXRpdGluZ0wAC2NsdXN0ZXJlZEJ5dAAQTGphdmEvdXRpbC9MaXN0O0wABWl0ZW1zcQB+AAZMAAlwYXJ0aXRpb250ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTFBhcnRpdGlvbkJ5O0wACHNvcnRlZEJ5cQB+AAZMAAx0YWJsZU9wdGlvbnNxAH4ABkwAC3RhYmxlU291cmNldAA6TGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9zdGF0ZW1lbnQvU1FMRXhwclRhYmxlU291cmNlO3hyACxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMU3RhdGVtZW50SW1wbEOxUUDVCJMGAgADWgAJYWZ0ZXJTZW1pTAAGZGJUeXBldAAcTGNvbS9hbGliYWJhL2Zhc3RzcWwvRGJUeXBlO0wACWhlYWRIaW50c3EAfgAGeHIAKWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5TUUxPYmplY3RJbXBs5LvqLFggFVECAAVJAAxzb3VyY2VDb2x1bW5JAApzb3VyY2VMaW5lTAAKYXR0cmlidXRlc3QAD0xqYXZhL3V0aWwvTWFwO0wABGhpbnR0ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTENvbW1lbnRIaW50O0wABnBhcmVudHQAJ0xjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMT2JqZWN0O3hwAAAAAAAAAABwcHAAfnIAGmNvbS5hbGliYWJhLmZhc3RzcWwuRGJUeXBlAAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAFbXlzcWxwAAAAAAAAAAAAAAAAc3IAE2phdmEudXRpbC5BcnJheUxpc3R4gdIdmcdhnQMAAUkABHNpemV4cAAAAAB3BAAAAAB4c3EAfgAUAAAAAXcEAAAAAXNyADxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTEFsdGVyVGFibGVBZGRDb2x1bW4l5T6CFe//BAIABloAB2Nhc2NhZGVaAAVmaXJzdEwAC2FmdGVyQ29sdW1udAAlTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxOYW1lO0wAB2NvbHVtbnNxAH4ABkwAC2ZpcnN0Q29sdW1ucQB+ABhMAAhyZXN0cmljdHQAE0xqYXZhL2xhbmcvQm9vbGVhbjt4cQB+AAsAAAAAAAAAAHBwcQB+AA8AAHBzcQB+ABQAAAABdwQAAAABc3IAOWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQ29sdW1uRGVmaW5pdGlvbst0gLKZ0qAtAgAmWgANYXV0b0luY3JlbWVudFoADGRpc2FibGVJbmRleFoAB3ByZVNvcnRJAAxwcmVTb3J0T3JkZXJaAAZzdG9yZWRaAAd2aXJ0dWFsWgAHdmlzaWJsZUwACGFubkluZGV4dAApTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxBbm5JbmRleDtMAAZhc0V4cHJ0ACVMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTEV4cHI7TAALY2hhcnNldEV4cHJxAH4AHkwADWNvbFByb3BlcnRpZXNxAH4ABkwAC2NvbGxhdGVFeHBycQB+AB5MAAdjb21tZW50cQB+AB5MAAtjb21wcmVzc2lvbnQALkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvZXhwci9TUUxDaGFyRXhwcjtMAAtjb25zdHJhaW50c3EAfgAGTAAIZGF0YVR5cGV0AClMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTERhdGFUeXBlO0wABmRiVHlwZXEAfgAKTAALZGVmYXVsdEV4cHJxAH4AHkwACWRlbGltaXRlcnEAfgAeTAASZGVsaW1pdGVyVG9rZW5pemVycQB+AB5MAAZlbmFibGVxAH4AGUwABmVuY29kZXEAfgAfTAAGZm9ybWF0cQB+AB5MABBnZW5lcmF0ZWRBbGF3c0FzcQB+AB5MAAhpZGVudGl0eXQARExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTENvbHVtbkRlZmluaXRpb24kSWRlbnRpdHk7TAASanNvbkluZGV4QXR0cnNFeHBycQB+AB5MAAhtYXBwZWRCeXEAfgAGTAAEbmFtZXEAfgAYTAAMbmxwVG9rZW5pemVycQB+AB5MAAhvblVwZGF0ZXEAfgAeTAAEcmVseXEAfgAZTAAMc2VxdWVuY2VUeXBldAAvTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9BdXRvSW5jcmVtZW50VHlwZTtMAARzdGVwcQB+AB5MAAdzdG9yYWdlcQB+AB5MAAl1bml0Q291bnRxAH4AHkwACXVuaXRJbmRleHEAfgAeTAAIdmFsaWRhdGVxAH4AGUwACXZhbHVlVHlwZXEAfgAeeHEAfgALAAAAAAAAAABwcHEAfgAaAAAAAAAAAAAAAHBwcHBwcHBzcQB+ABQAAAAAdwQAAAAAeHNyADpjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTENoYXJhY3RlckRhdGFUeXBlqtJac/d+04cCAAVaAAloYXNCaW5hcnlMAAtjaGFyU2V0TmFtZXEAfgABTAAIY2hhclR5cGVxAH4AAUwAB2NvbGxhdGVxAH4AAUwABWhpbnRzcQB+AAZ4cgArY29tLmFsaWJhYmEuZmFzdHNxbC5zcWwuYXN0LlNRTERhdGFUeXBlSW1wbEWL29pc1gZFAgAJSgAObmFtZUhhc2hDb2RlNjRaAAh1bnNpZ25lZFoAEXdpdGhMb2NhbFRpbWVab25lWgAIemVyb2ZpbGxMAAlhcmd1bWVudHNxAH4ABkwABmRiVHlwZXEAfgAKTAAHaW5kZXhCeXEAfgAeTAAEbmFtZXEAfgABTAAMd2l0aFRpbWVab25lcQB+ABl4cQB+AAsAAAAAAAAAAHBwcQB+ACP6BPTvGZVAfgAAAHNxAH4AFAAAAAB3BAAAAAB4cHB0AAR0ZXh0cABwcHBwcQB+ABJwcHBwcHBwcHBwc3IAMmNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5leHByLlNRTElkZW50aWZpZXJFeHBy3DXH1zvWbgkCAARKAApoYXNoQ29kZTY0TAAEbmFtZXEAfgABTAAOcmVzb2x2ZWRDb2x1bW5xAH4ADkwAE3Jlc29sdmVkT3duZXJPYmplY3RxAH4ADnhyACdjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMRXhwckltcGxs2ypmFJxWrQIAAHhxAH4ACwAAAAAAAAAAcHBwQCnxzH5tIDl0AARob2xvcHBwcHBwcHBwcHBweHBweHBzcQB+ABQAAAAAdwQAAAAAeHNxAH4AFAAAAAB3BAAAAAB4c3IAOGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMRXhwclRhYmxlU291cmNlRHD7eYJ4eswCAAVMAAdjb2x1bW5zcQB+AAZMAARleHBycQB+AB5MAApwYXJ0aXRpb25zcQB+AAZMAAhzYW1wbGluZ3QAOExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTFRhYmxlU2FtcGxpbmc7TAAMc2NoZW1hT2JqZWN0dAAxTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL3JlcG9zaXRvcnkvU2NoZW1hT2JqZWN0O3hyADhjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTFRhYmxlU291cmNlSW1wbAqEMenTm5zUAgAESgAPYWxpYXNIYXNoQ29kZTY0TAAFYWxpYXNxAH4AAUwACWZsYXNoYmFja3EAfgAeTAAFaGludHNxAH4ABnhxAH4ACwAAAAAAAAAAcHBwAAAAAAAAAABwcHBwc3EAfgAqAAAAAAAAAABwcHEAfgA0NH7o4UvP9Dt0AAx0X3NoaXl1X25vcGtwcHBwcA=="
            },
            "timestamp": {
                "eventTime": 1605342109000,
                "systemTime": 1605342109259,
                "checkpointTime": 1605342109000
            }
        },
        "version": "0.0.1"
    }