日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Kafka客戶端消費訂閱數據

重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。

新版數據訂閱支持使用0.11版本至2.7版本的Kafka客戶端消費訂閱數據,DTS為您提供了Kafka客戶端Demo,本文將介紹該客戶端的使用說明。

注意事項

  • 使用本文提供的Demo消費數據時,如果采用auto commit(自動提交),可能會因為數據還沒被消費完就執行了提交操作,從而丟失部分數據,建議采用手動提交的方式以避免該問題。

    說明

    如果發生故障沒有提交成功,重啟客戶端后會從上一個記錄的位點進行數據消費,期間會有部分重復數據,您需要手動過濾。

  • 數據以Avro序列化存儲,詳細格式請參見Record.avsc文檔。

    警告

    如果您使用的不是本文提供的Kafka客戶端,在進行反序列化解析時,可能出現解析的數據有誤,您需要自行驗證數據的正確性。

  • 關于offsetForTimes接口,DTS的搜索單位為秒,原生Kafka的搜索單位為毫秒。

  • 由于數據訂閱服務端會因容災等原因導致網絡閃斷,若您未使用本文提供的Kafka客戶端,您使用的Kafka客戶端需具備網絡重試能力。

  • 若您使用原生的Kafka客戶端消費訂閱數據,則可能會在DTS發生增量數據采集模塊切換行為,從而使subscribe模式下訂閱客戶端保存在服務端的消費位點被清除,您需要手動調整訂閱的消費位點以實現按需消費數據。

Kafka客戶端運行流程說明

請下載Kafka客戶端Demo代碼。更多關于代碼使用的詳細介紹,請參見Demo中的Readme文檔。

說明
  • 單擊code,然后選擇Download ZIP下載文件。

  • 如需使用Kafka客戶端2.0版本,您需要修改subscribe_example-master/javaimpl/pom.xml文件,將kafka客戶端的版本號修改成2.0.0。

kafka2.0

表 1. 運行流程說明

步驟

相關目錄或文件

1、使用原生的Kafka consumer從數據訂閱通道中獲取增量數據。

subscribe_example-master/javaimpl/src/main/java/recordgenerator/

2、將獲取的增量數據鏡像執行反序列化,并從中獲取前鏡像(該條記錄在數據發生變更前,各字段對應的值)、 后鏡像(該條記錄在數據發生變更后,各字段對應的值)和其他屬性。

警告
  • 如源實例為自建Oracle數據庫,則為確保客戶端成功消費訂閱數據,并保證前后鏡像完整性,您需要開啟全列補償日志。

  • 如源實例不為自建Oracle數據庫,則DTS暫時不能保證前鏡像的完整性,建議您對所獲得的前鏡像進行校驗。

subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java

3、將反序列化后的數據中的dataTypeNumber字段轉換為對應數據庫的字段類型。

subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/

操作步驟

本文以IntelliJ IDEA軟件(Community Edition 2018.1.4 Windows版本)為例,介紹如何運行該客戶端消費訂閱通道中的數據。

  1. 創建新版數據訂閱通道,詳情請參見創建RDS MySQL數據訂閱通道創建PolarDB MySQL版數據訂閱通道創建Oracle數據訂閱通道

  2. 創建一個或多個消費組,詳情請參見新增消費組

  3. 下載Kafka客戶端Demo代碼,然后解壓該文件。

    說明

    單擊code,然后選擇Download ZIP下載文件。

  4. 打開IntelliJ IDEA軟件,然后單擊Open

    打開項目

  5. 在彈出的對話框中,定位至Kafka客戶端Demo代碼下載的目錄,參照下圖依次展開文件夾,找到項目對象模型文件:pom.xml

    打開項目文件

  6. 在彈出對話框中,選擇Open as Project

  7. 在IntelliJ IDEA軟件界面,依次展開文件夾,找到并雙擊打開Kafka客戶端Demo文件:NotifyDemoDB.java

  8. 設置NotifyDemoDB.java文件中的各參數對應的值。

    設置參數值

    參數

    說明

    獲取方式

    USER_NAME

    消費組的賬號。

    警告

    如您未使用本文提供的客戶端,請按照<消費組的賬號>-<消費組ID>的格式設置用戶名(例如:dtstest-dtsae******bpv),否則無法正常連接。

    在DTS控制臺單擊目標訂閱實例ID,然后單擊數據消費,您可以獲取到消費組ID和消費組的賬號信息。

    說明

    消費組賬號的密碼已在您新建消費組時指定。

    查看消費組和賬號

    PASSWORD_NAME

    該賬號的密碼。

    SID_NAME

    消費組ID。

    GROUP_NAME

    消費組名稱,需保持和消費組ID相同(即本參數也填入消費組ID)。

    KAFKA_TOPIC

    數據訂閱通道的訂閱Topic。

    在DTS控制臺單擊目標訂閱實例ID,在任務管理頁面,您可以獲取到訂閱Topic、網絡地址信息。獲取topic和網絡信息

    KAFKA_BROKER_URL_NAME

    數據訂閱通道的網絡地址信息。

    說明
    • 如果您部署Kafka客戶端所屬的ECS實例與數據訂閱通道屬于經典網絡或同一專有網絡,建議通過內網地址進行數據訂閱,網絡延遲最小。

    • 不建議使用公網地址。

    INITIAL_CHECKPOINT_NAME

    消費的數據時間點,格式為Unix時間戳,例如1592269238。

    說明
    • 您需要自行保存時間點信息,以便:

      • 當業務程序中斷后,傳入已消費的數據時間點繼續消費數據,防止數據丟失。

      • 在訂閱客戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費數據。

    • SUBSCRIBE_MODE_NAMEsubscribe時,傳入的INITIAL_CHECKPOINT_NAME僅在訂閱客戶端首次啟動時生效。

    消費的數據時間點必須在訂閱實例的數據范圍(如圖示)之內,并需轉化為Unix時間戳。數據范圍

    說明

    Unix時間戳轉換工具可用搜索引擎獲取。

    USE_CONFIG_CHECKPOINT_NAME

    默認取值為true,即強制使用指定的數據時間點來消費數據,避免丟失已接收到的但未處理的數據。

    SUBSCRIBE_MODE_NAME

    一個消費組下支持同時啟動兩個及以上Kafka客戶端,如需實現該功能,請將所有客戶端該參數的值設置為subscribe

    默認值為assign,即不使用該功能,建議只部署一個客戶端。

  9. 在IntelliJ IDEA軟件界面的頂部,選擇Run > Run運行該客戶端。

    說明

    首次運行時,軟件需要一定時間自動加載相關依賴包并完成安裝。

執行結果

運行結果如下圖所示,該客戶端可正常訂閱到源庫的數據變更信息。

Kafka客戶端訂閱結果

您也可以去除NotifyDemoDB.java文件中的打印日志詳情的注釋(即刪除第25行//log.info(ret);中的//),然后再次運行該客戶端即可查看詳細的數據變更信息。

kafka

常見問題

  • Q:為什么需要自行記錄客戶端的消費位點?

    A:由于DTS記錄的消費位點是接收到Kafka消費客戶端執行commit操作的時間點,可能與當前實際消費到的時間點存在一定的時間差。當業務程序或Kafka消費客戶端異常中斷后,您可以傳入自行記錄的消費位點以繼續消費,避免消費到重復的數據或缺失部分數據。

MySQL字段類型與dataTypeNumber數值的對應關系

MySQL字段類型

對應dataTypeNumber數值

MYSQL_TYPE_DECIMAL

0

MYSQL_TYPE_INT8

1

MYSQL_TYPE_INT16

2

MYSQL_TYPE_INT32

3

MYSQL_TYPE_FLOAT

4

MYSQL_TYPE_DOUBLE

5

MYSQL_TYPE_NULL

6

MYSQL_TYPE_TIMESTAMP

7

MYSQL_TYPE_INT64

8

MYSQL_TYPE_INT24

9

MYSQL_TYPE_DATE

10

MYSQL_TYPE_TIME

11

MYSQL_TYPE_DATETIME

12

MYSQL_TYPE_YEAR

13

MYSQL_TYPE_DATE_NEW

14

MYSQL_TYPE_VARCHAR

15

MYSQL_TYPE_BIT

16

MYSQL_TYPE_TIMESTAMP_NEW

17

MYSQL_TYPE_DATETIME_NEW

18

MYSQL_TYPE_TIME_NEW

19

MYSQL_TYPE_JSON

245

MYSQL_TYPE_DECIMAL_NEW

246

MYSQL_TYPE_ENUM

247

MYSQL_TYPE_SET

248

MYSQL_TYPE_TINY_BLOB

249

MYSQL_TYPE_MEDIUM_BLOB

250

MYSQL_TYPE_LONG_BLOB

251

MYSQL_TYPE_BLOB

252

MYSQL_TYPE_VAR_STRING

253

MYSQL_TYPE_STRING

254

MYSQL_TYPE_GEOMETRY

255

Oracle字段類型與dataTypeNumber數值的對應關系

Oracle字段類型

對應dataTypeNumber數值

VARCHAR2/NVARCHAR2

1

NUMBER/FLOAT

2

LONG

8

DATE

12

RAW

23

LONG_RAW

24

UNDEFINED

29

XMLTYPE

58

ROWID

69

CHAR、NCHAR

96

BINARY_FLOAT

100

BINARY_DOUBLE

101

CLOB/NCLOB

112

BLOB

113

BFILE

114

TIMESTAMP

180

TIMESTAMP_WITH_TIME_ZONE

181

INTERVAL_YEAR_TO_MONTH

182

INTERVAL_DAY_TO_SECOND

183

UROWID

208

TIMESTAMP_WITH_LOCAL_TIME_ZONE

231

PostgreSQL字段類型與dataTypeNumber數值的對應關系

PostgreSQL字段類型

對應dataTypeNumber數值

INT2/SMALLINT

21

INT4/INTEGER/SERIAL

23

INT8/BIGINT

20

CHARACTER

18

CHARACTER VARYING

1043

REAL

700

DOUBLE PRECISION

701

NUMERIC

1700

MONEY

790

DATE

1082

TIME/TIME WITHOUT TIME ZONE

1083

TIME WITH TIME ZONE

1266

TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE

1114

TIMESTAMP WITH TIME ZONE

1184

BYTEA

17

TEXT

25

JSON

114

JSONB

3082

XML

142

UUID

2950

POINT

600

LSEG

601

PATH

602

BOX

603

POLYGON

604

LINE

628

CIDR

650

CIRCLE

718

MACADDR

829

INET

869

INTERVAL

1186

TXID_SNAPSHOT

2970

PG_LSN

3220

TSVECTOR

3614

TSQUERY

3615