本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。
完成數據訂閱通道的配置后,您可以使用DTS提供的SDK示例代碼來訂閱數據變更信息,本文介紹該示例代碼的使用說明。
操作步驟
若數據源是PolarDB-X 1.0或DMS LogicDB,消費訂閱數據的操作步驟請參見使用SDK示例代碼消費PolarDB-X 1.0訂閱數據。
如果使用子賬號(RAM用戶)來訂閱數據,該賬號需具備AliyunDTSFullAccess權限,以及訂閱對象的訪問權限,授權方法請參見通過系統策略授權子賬號管理DTS和為RAM用戶授權。
不同的消費之間是相互獨立的。
本操作為Java語言的SDK客戶端示例,Python和Go語言的示例代碼,請參見dts-subscribe-demo。
本文以IntelliJ IDEA軟件(Community Edition 2020.1 Windows版本)為例,介紹如何運行SDK示例代碼來消費訂閱數據。
創建新版數據訂閱通道,詳情請參見創建RDS MySQL數據訂閱通道、創建PolarDB MySQL版數據訂閱通道或創建Oracle數據訂閱通道。
創建一個或多個消費組,詳情請參見新增消費組。
重要在消費訂閱數據時,您需要調用DefaultUserRecord的commit方法以提交位點信息,否則會導致數據重復消費。
根據業務需求,使用SDK示例代碼。
使用打包好的新版訂閱SDK(推薦)
打開IntelliJ IDEA軟件,然后單擊Create New Project,新建一個業務Project。
在新建的業務Project中,找到項目對象模型文件:pom.xml。
在pom.xml中添加如下依賴:
<dependency> <groupId>com.aliyun.dts</groupId> <artifactId>dts-new-subscribe-sdk</artifactId> <version>{dts_new_sdk_version}</version> </dependency>
說明您可以在dts-new-subscribe-sdk頁面查看最新Maven依賴。
參考使用示例代碼使用新版訂閱SDK。
使用定制修改后的新版訂閱SDK
下載SDK示例代碼文件,然后解壓該文件。
說明單擊,然后選擇Download ZIP下載文件。
定位至SDK示例代碼解壓的目錄,使用文本編輯工具打開pom.xml文件,將數據訂閱SDK的版本修改為最新版本。
重要您可以在Maven網站中獲取最新的數據訂閱SDK版本,詳情請參見數據訂閱SDK的Maven頁面。
打開IntelliJ IDEA軟件,然后單擊Open or Import。
在彈出的對話框中,定位至SDK示例代碼解壓的目錄,依次展開文件夾,找到項目對象模型文件:pom.xml。
在彈出對話框中,選擇Open as Project。
在IntelliJ IDEA軟件界面,依次展開文件夾,并根據SDK客戶端的使用模式,選擇并雙擊打開對應的Java文件:DTSConsumerAssignDemo.java或DTSConsumerSubscribeDemo.java。
說明DTS支持以下兩種SDK客戶端的使用模式:
ASSIGN模式:DTS為了保證消息的全局有序,每個訂閱Topic只有一個partition,且固定分配至partition 0中。當SDK客戶端的使用模式為ASSIGN模式時,建議只啟動一個SDK客戶端。
SUBSCRIBE模式:DTS為了保證消息的全局有序,每個訂閱Topic只有一個partition,且固定分配至partition 0中。當SDK客戶端的使用模式為SUBSCRIBE模式時,您可以在一個消費組下同時啟動多個SDK客戶端,以實現災備。實現原理是當消費組下的正常消費數據的客戶端發生故障后,其他的SDK客戶端將隨機且自動地分配到partition 0,繼續消費。
設置Java文件代碼中的必填參數。
表 1. 必填參數說明
參數
說明
獲取方式
brokerUrl
數據訂閱通道的網絡地址及端口號信息。
說明如果您部署SDK客戶端所屬的ECS實例與數據訂閱通道屬于經典網絡或同一專有網絡,建議通過內網地址進行數據訂閱,網絡延遲最小。
不建議使用公網地址。
在DTS控制臺單擊目標訂閱實例ID,在基本信息頁面的網絡區域,您可以獲取網絡地址及端口號信息。
topic
數據訂閱通道的訂閱Topic。
在DTS控制臺單擊目標訂閱實例ID,在基本信息頁面的基本信息區域,您可以獲取到訂閱Topic。
sid
消費組ID。
在DTS控制臺單擊目標訂閱實例ID,然后單擊數據消費,您可以獲取到消費組ID和消費組的賬號信息。
說明消費組賬號的密碼已在您新建消費組時指定。
userName
消費組的賬號。
警告如您未使用本文提供的客戶端,請按照
<消費組的賬號>-<消費組ID>
的格式設置用戶名(例如:dtstest-dtsae******bpv
),否則無法正常連接。password
該賬號的密碼。
initCheckpoint
消費位點,即SDK客戶端消費第一條數據的時間戳,格式為Unix時間戳,例如1620962769。
說明消費位點信息可用于:
當業務程序中斷后,傳入已消費位點繼續消費數據,防止數據丟失。
在訂閱客戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費數據。
消費位點必須在訂閱實例的數據范圍(如圖示)之內,并需轉化為Unix時間戳。
說明Unix時間戳轉換工具可用搜索引擎獲取。
ConsumerContext.ConsumerSubscribeMode subscribeMode
SDK客戶端的使用模式,取值為:
ConsumerContext.ConsumerSubscribeMode.ASSIGN
:ASSIGN模式,即一個消費組下僅支持一個SDK客戶端消費訂閱數據。ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE
:SUBSCRIBE模式,即支持在同一個消費組下同時啟動多個SDK客戶端實現災備。
無
在IntelliJ IDEA軟件界面的頂部,選擇 運行該客戶端。
說明首次運行時,軟件需要一定時間自動加載相關依賴包并完成安裝。
運行結果如下圖所示,該客戶端可正常訂閱到源庫的數據變更信息。
SDK客戶端每隔一定時間會統計并顯示消費數據的信息,包括數據發送和接受時數據總數、數據總量、每秒請求數接收RPS等。
表 2. 消費數據的統計信息
參數
說明
outCounts
SDK客戶端所消費的數據總數。
outBytes
SDK客戶端所消費的數據總量,單位為Byte。
outRps
SDK客戶端消費數據時的每秒請求數。
outBps
SDK客戶端消費數據時每秒傳送的比特數。
inBytes
DTS服務器發送的數據總量,單位為Byte。
DStoreRecordQueue
DTS服務器發送數據時,當前數據緩存隊列的大小。
inCounts
DTS服務器發送數據總數。
inRps
DTS服務器發送數據時的每秒請求數。
__dt
SDK客戶端接收到數據的當前時間戳,單位為毫秒。
DefaultUserRecordQueue
序列化后,當前數據緩存隊列的大小。
保存和查詢消費位點
當SDK客戶端首次啟動、重啟或者發生內部重試時,您需要查詢并傳入消費位點,開始或重新消費數據。下文將介紹在不同情況下如何管理和查詢消費位點,以確保數據不丟失,且盡量不重復,實現按需消費。
場景 | SDK使用模式 | 查詢方法 |
查詢消費位點 | ASSIGN模式、SUBSCRIBE模式 |
|
首次啟動SDK客戶端,需傳入消費位點,來消費數據。 | ASSIGN模式、SUBSCRIBE模式 | 根據SDK客戶端使用模式,選擇Java文件DTSConsumerAssignDemo.java或DTSConsumerSubscribeDemo.java,并配置消費位點 |
SDK客戶端因內部重試,需重新傳入上一個記錄的消費位點,以繼續消費數據。 | ASSIGN模式 | 請按如下順序,查找上一個記錄的消費位點,找到即可返回位點信息:
|
SUBSCRIBE模式 | 請按如下順序,查找上一個記錄的消費位點,找到即可返回位點信息:
| |
已重啟SDK客戶端,需重新傳入上一個記錄的消費位點,以繼續消費數據。 | ASSIGN模式 | 根據consumerContext.java文件中
|
SUBSCRIBE模式 | 該模式下consumerContext.java文件中
|
持久化存儲消費位點
如果增量數據采集模塊觸發容災機制(特別是SUBSCRIBE模式),新建的增量數據采集模塊將無法保存客戶端上次的消費位點信息,可能會導致客戶端從一個較舊的位點開始消費訂閱數據,從而造成歷史數據的重復消費。例如:增量數據服務切換前,老的增量數據采集模塊位點范圍為2023年11月11日 08:00:00~ 2023年11月12日 08:00:00,客戶端的消費位點為2023年11月12日 08:00:00;增量數據服務切換后,新的增量數據采集模塊位點范圍為2023年11月08日 10:00:00~ 2023年11月12日 08:01:00,那么客戶端會從新的增量數據采集模塊的起始位點2023年11月08日 10:00:00開始消費,造成重復消費歷史數據。
為了規避這種切換場景對歷史數據的重復消費,建議您在客戶端配置一個在客戶端保存的消費位點持久化存儲方式。示例方法如下,您可以根據實際情況進行修改。
創建一個
UserMetaStore()
方法,繼承實現AbstractUserMetaStore()
方法。例如使用MySQL數據庫存儲位點信息,Java示例代碼如下:
public class UserMetaStore extends AbstractUserMetaStore { @Override protected void saveData(String groupID, String toStoreJson) { Connection con = getConnection(); String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); pres.setString(2, toStoreJson); pres.execute(); } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } @Override protected String getData(String groupID) { Connection con = getConnection(); String sql = "select checkpoint from dts_checkpoint where group_id = ?"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); ResultSet rs = pres.executeQuery() String checkpoint = rs.getString("checkpoint"); return checkpoint; } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } }
在consumerContext.java文件中的
setUserRegisteredStore(new UserMetaStore())
方法,配置外部存儲介質。
常見問題
無法連接訂閱實例,如何處理?
請根據報錯提示進行排查,詳情請參見問題排查。
持久化后的消費位點是什么格式的數據?
消費位點在持久化處理后,將返回JSON格式的數據。其中,持久化后的消費位點的格式為Unix時間戳,您可以直接將其傳回SDK進行使用。如下返回數據中,
"timestamp"
后的1700709977
即為持久化后的消費位點。{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}
問題排查
問題 | 報錯提示 | 原因 | 解決方法 |
無法連接 |
|
| 填入正確的 |
| 無法通過broker地址連接真實的IP地址。 | ||
| 用戶名和密碼錯誤。 | ||
| consumerContext.java文件中 | 傳入在訂閱實例的數據范圍(如圖示)之內的消費位點,查詢方式,請參見必填參數說明。 | |
消費訂閱速度變慢 | 無 |
|