本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。
新版數據訂閱支持使用0.11版本至2.7版本的Kafka客戶端消費訂閱數據,DTS為您提供了Kafka客戶端Demo,本文將介紹該客戶端的使用說明。
注意事項
使用本文提供的Demo消費數據時,如果采用auto commit(自動提交),可能會因為數據還沒被消費完就執行了提交操作,從而丟失部分數據,建議采用手動提交的方式以避免該問題。
說明如果發生故障沒有提交成功,重啟客戶端后會從上一個記錄的位點進行數據消費,期間會有部分重復數據,您需要手動過濾。
數據以Avro序列化存儲,詳細格式請參見Record.avsc文檔。
警告如果您使用的不是本文提供的Kafka客戶端,在進行反序列化解析時,可能出現解析的數據有誤,您需要自行驗證數據的正確性。
關于
offsetForTimes
接口,DTS的搜索單位為秒,原生Kafka的搜索單位為毫秒。由于數據訂閱服務端會因容災等原因導致網絡閃斷,若您未使用本文提供的Kafka客戶端,您使用的Kafka客戶端需具備網絡重試能力。
若您使用原生的Kafka客戶端消費訂閱數據,則可能會在DTS發生增量數據采集模塊切換行為,從而使subscribe模式下訂閱客戶端保存在服務端的消費位點被清除,您需要手動調整訂閱的消費位點以實現按需消費數據。
Kafka客戶端運行流程說明
請下載Kafka客戶端Demo代碼。更多關于代碼使用的詳細介紹,請參見Demo中的Readme文檔。
單擊,然后選擇Download ZIP下載文件。
如需使用Kafka客戶端2.0版本,您需要修改subscribe_example-master/javaimpl/pom.xml文件,將kafka客戶端的版本號修改成2.0.0。
表 1. 運行流程說明
步驟 | 相關目錄或文件 |
1、使用原生的Kafka consumer從數據訂閱通道中獲取增量數據。 | subscribe_example-master/javaimpl/src/main/java/recordgenerator/ |
2、將獲取的增量數據鏡像執行反序列化,并從中獲取前鏡像(該條記錄在數據發生變更前,各字段對應的值)、 后鏡像(該條記錄在數據發生變更后,各字段對應的值)和其他屬性。 警告
| subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java |
3、將反序列化后的數據中的dataTypeNumber字段轉換為對應數據庫的字段類型。 | subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/ |
操作步驟
本文以IntelliJ IDEA軟件(Community Edition 2018.1.4 Windows版本)為例,介紹如何運行該客戶端消費訂閱通道中的數據。
創建新版數據訂閱通道,詳情請參見創建RDS MySQL數據訂閱通道、創建PolarDB MySQL版數據訂閱通道或創建Oracle數據訂閱通道。
創建一個或多個消費組,詳情請參見新增消費組。
下載Kafka客戶端Demo代碼,然后解壓該文件。
說明單擊,然后選擇Download ZIP下載文件。
打開IntelliJ IDEA軟件,然后單擊Open。
在彈出的對話框中,定位至Kafka客戶端Demo代碼下載的目錄,參照下圖依次展開文件夾,找到項目對象模型文件:pom.xml。
在彈出對話框中,選擇Open as Project。
在IntelliJ IDEA軟件界面,依次展開文件夾,找到并雙擊打開Kafka客戶端Demo文件:NotifyDemoDB.java。
設置NotifyDemoDB.java文件中的各參數對應的值。
參數
說明
獲取方式
USER_NAME
消費組的賬號。
警告如您未使用本文提供的客戶端,請按照
<消費組的賬號>-<消費組ID>
的格式設置用戶名(例如:dtstest-dtsae******bpv
),否則無法正常連接。在DTS控制臺單擊目標訂閱實例ID,然后單擊數據消費,您可以獲取到消費組ID和消費組的賬號信息。
說明消費組賬號的密碼已在您新建消費組時指定。
PASSWORD_NAME
該賬號的密碼。
SID_NAME
消費組ID。
GROUP_NAME
消費組名稱,需保持和消費組ID相同(即本參數也填入消費組ID)。
KAFKA_TOPIC
數據訂閱通道的訂閱Topic。
在DTS控制臺單擊目標訂閱實例ID,在任務管理頁面,您可以獲取到訂閱Topic、網絡地址信息。
KAFKA_BROKER_URL_NAME
數據訂閱通道的網絡地址信息。
說明如果您部署Kafka客戶端所屬的ECS實例與數據訂閱通道屬于經典網絡或同一專有網絡,建議通過內網地址進行數據訂閱,網絡延遲最小。
不建議使用公網地址。
INITIAL_CHECKPOINT_NAME
消費的數據時間點,格式為Unix時間戳,例如1592269238。
說明您需要自行保存時間點信息,以便:
當業務程序中斷后,傳入已消費的數據時間點繼續消費數據,防止數據丟失。
在訂閱客戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費數據。
若SUBSCRIBE_MODE_NAME為subscribe時,傳入的INITIAL_CHECKPOINT_NAME僅在訂閱客戶端首次啟動時生效。
消費的數據時間點必須在訂閱實例的數據范圍(如圖示)之內,并需轉化為Unix時間戳。
說明Unix時間戳轉換工具可用搜索引擎獲取。
USE_CONFIG_CHECKPOINT_NAME
默認取值為true,即強制使用指定的數據時間點來消費數據,避免丟失已接收到的但未處理的數據。
無
SUBSCRIBE_MODE_NAME
一個消費組下支持同時啟動兩個及以上Kafka客戶端,如需實現該功能,請將所有客戶端該參數的值設置為subscribe。
默認值為assign,即不使用該功能,建議只部署一個客戶端。
無
在IntelliJ IDEA軟件界面的頂部,選擇 運行該客戶端。
說明首次運行時,軟件需要一定時間自動加載相關依賴包并完成安裝。
執行結果
運行結果如下圖所示,該客戶端可正常訂閱到源庫的數據變更信息。
您也可以去除NotifyDemoDB.java文件中的打印日志詳情的注釋(即刪除第25行//log.info(ret);
中的//
),然后再次運行該客戶端即可查看詳細的數據變更信息。
常見問題
Q:為什么需要自行記錄客戶端的消費位點?
A:由于DTS記錄的消費位點是接收到Kafka消費客戶端執行commit操作的時間點,可能與當前實際消費到的時間點存在一定的時間差。當業務程序或Kafka消費客戶端異常中斷后,您可以傳入自行記錄的消費位點以繼續消費,避免消費到重復的數據或缺失部分數據。
MySQL字段類型與dataTypeNumber數值的對應關系
MySQL字段類型 | 對應dataTypeNumber數值 |
MYSQL_TYPE_DECIMAL | 0 |
MYSQL_TYPE_INT8 | 1 |
MYSQL_TYPE_INT16 | 2 |
MYSQL_TYPE_INT32 | 3 |
MYSQL_TYPE_FLOAT | 4 |
MYSQL_TYPE_DOUBLE | 5 |
MYSQL_TYPE_NULL | 6 |
MYSQL_TYPE_TIMESTAMP | 7 |
MYSQL_TYPE_INT64 | 8 |
MYSQL_TYPE_INT24 | 9 |
MYSQL_TYPE_DATE | 10 |
MYSQL_TYPE_TIME | 11 |
MYSQL_TYPE_DATETIME | 12 |
MYSQL_TYPE_YEAR | 13 |
MYSQL_TYPE_DATE_NEW | 14 |
MYSQL_TYPE_VARCHAR | 15 |
MYSQL_TYPE_BIT | 16 |
MYSQL_TYPE_TIMESTAMP_NEW | 17 |
MYSQL_TYPE_DATETIME_NEW | 18 |
MYSQL_TYPE_TIME_NEW | 19 |
MYSQL_TYPE_JSON | 245 |
MYSQL_TYPE_DECIMAL_NEW | 246 |
MYSQL_TYPE_ENUM | 247 |
MYSQL_TYPE_SET | 248 |
MYSQL_TYPE_TINY_BLOB | 249 |
MYSQL_TYPE_MEDIUM_BLOB | 250 |
MYSQL_TYPE_LONG_BLOB | 251 |
MYSQL_TYPE_BLOB | 252 |
MYSQL_TYPE_VAR_STRING | 253 |
MYSQL_TYPE_STRING | 254 |
MYSQL_TYPE_GEOMETRY | 255 |
Oracle字段類型與dataTypeNumber數值的對應關系
Oracle字段類型 | 對應dataTypeNumber數值 |
VARCHAR2/NVARCHAR2 | 1 |
NUMBER/FLOAT | 2 |
LONG | 8 |
DATE | 12 |
RAW | 23 |
LONG_RAW | 24 |
UNDEFINED | 29 |
XMLTYPE | 58 |
ROWID | 69 |
CHAR、NCHAR | 96 |
BINARY_FLOAT | 100 |
BINARY_DOUBLE | 101 |
CLOB/NCLOB | 112 |
BLOB | 113 |
BFILE | 114 |
TIMESTAMP | 180 |
TIMESTAMP_WITH_TIME_ZONE | 181 |
INTERVAL_YEAR_TO_MONTH | 182 |
INTERVAL_DAY_TO_SECOND | 183 |
UROWID | 208 |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | 231 |
PostgreSQL字段類型與dataTypeNumber數值的對應關系
PostgreSQL字段類型 | 對應dataTypeNumber數值 |
INT2/SMALLINT | 21 |
INT4/INTEGER/SERIAL | 23 |
INT8/BIGINT | 20 |
CHARACTER | 18 |
CHARACTER VARYING | 1043 |
REAL | 700 |
DOUBLE PRECISION | 701 |
NUMERIC | 1700 |
MONEY | 790 |
DATE | 1082 |
TIME/TIME WITHOUT TIME ZONE | 1083 |
TIME WITH TIME ZONE | 1266 |
TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE | 1114 |
TIMESTAMP WITH TIME ZONE | 1184 |
BYTEA | 17 |
TEXT | 25 |
JSON | 114 |
JSONB | 3082 |
XML | 142 |
UUID | 2950 |
POINT | 600 |
LSEG | 601 |
PATH | 602 |
BOX | 603 |
POLYGON | 604 |
LINE | 628 |
CIDR | 650 |
CIRCLE | 718 |
MACADDR | 829 |
INET | 869 |
INTERVAL | 1186 |
TXID_SNAPSHOT | 2970 |
PG_LSN | 3220 |
TSVECTOR | 3614 |
TSQUERY | 3615 |