本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業(yè)務造成影響,請務必仔細閱讀。
在完成數(shù)據(jù)訂閱通道的配置(創(chuàng)建好訂閱任務和消費組)后,您可以自行編寫SDK示例代碼或使用DTS提供的SDK示例代碼來訂閱數(shù)據(jù)變更信息,本文介紹示例代碼的使用方法。
本操作為Java語言的SDK客戶端示例,Python和Go語言的示例代碼,請參見dts-subscribe-demo。
操作步驟
如果數(shù)據(jù)源是PolarDB-X 1.0或DMS LogicDB,消費訂閱數(shù)據(jù)的操作步驟,請參見使用SDK示例代碼消費PolarDB-X 1.0訂閱數(shù)據(jù)。
如果使用子賬號(RAM用戶)來訂閱數(shù)據(jù),該賬號需具備AliyunDTSFullAccess權限,以及訂閱對象的訪問權限。授權方法,請參見通過系統(tǒng)策略授權子賬號管理DTS和為RAM用戶授權。
不同的消費之間是相互獨立的。
本文以IntelliJ IDEA軟件(Community Edition 2020.1 Windows版本)為例,介紹如何運行SDK示例代碼來消費訂閱數(shù)據(jù)。
- 創(chuàng)建新版數(shù)據(jù)訂閱通道,詳情請參見訂閱方案概覽中的相關配置文檔。
- 創(chuàng)建一個或多個消費組,詳情請參見新增消費組。
根據(jù)業(yè)務需求,使用SDK示例代碼。
重要在消費訂閱數(shù)據(jù)時,您需要調用DefaultUserRecord的commit方法以提交位點信息,否則會導致數(shù)據(jù)重復消費。
使用打包好的新版訂閱SDK(推薦)
打開IntelliJ IDEA軟件,然后單擊Create New Project,新建一個業(yè)務Project。
在新建的業(yè)務Project中,找到項目對象模型文件:pom.xml。
在pom.xml中添加如下依賴:
<dependency> <groupId>com.aliyun.dts</groupId> <artifactId>dts-new-subscribe-sdk</artifactId> <version>{dts_new_sdk_version}</version> </dependency>
重要您可以在dts-new-subscribe-sdk頁面查看最新Maven依賴。
dts-new-subscribe-sdk中封裝了一個原生依賴:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>{version}</version> </dependency>
2.0.0版本的dts-new-subscribe-sdk封裝的是2.7版本的kafka-clients,2.0.0以下版本封裝的是1.0.0版本的kafka-clients。
參考使用示例代碼使用新版訂閱SDK。
使用定制修改后的新版訂閱SDK
下載SDK示例代碼文件,然后解壓該文件。
說明單擊,然后選擇Download ZIP下載文件。
定位至SDK示例代碼解壓的目錄,使用文本編輯工具打開pom.xml文件,將數(shù)據(jù)訂閱SDK的版本修改為最新版本。
重要您可以在dts-new-subscribe-sdk頁面查看最新Maven依賴。
dts-new-subscribe-sdk中封裝了一個原生依賴:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>{version}</version> </dependency>
2.0.0版本的dts-new-subscribe-sdk封裝的是2.7版本的kafka-clients,2.0.0以下版本封裝的是1.0.0版本的kafka-clients。
打開IntelliJ IDEA軟件,然后單擊Open or Import。
在彈出的對話框中,定位至SDK示例代碼解壓的目錄,依次展開文件夾,找到項目對象模型文件:pom.xml。
在彈出對話框中,選擇Open as Project。
在IntelliJ IDEA軟件界面,依次展開文件夾,并根據(jù) SDK客戶端的使用模式,選擇并雙擊打開對應的Java文件:DTSConsumerAssignDemo.java或DTSConsumerSubscribeDemo.java。
說明DTS支持以下兩種SDK客戶端的使用模式:
ASSIGN模式:DTS為了保證消息的全局有序,每個訂閱Topic只有一個partition,且固定分配至partition 0中。當SDK客戶端的使用模式為ASSIGN模式時,建議只啟動一個SDK客戶端。
SUBSCRIBE模式:DTS為了保證消息的全局有序,每個訂閱Topic只有一個partition,且固定分配至partition 0中。當SDK客戶端的使用模式為SUBSCRIBE模式時,您可以在一個消費組下同時啟動多個SDK客戶端,以實現(xiàn)災備。實現(xiàn)原理是當消費組下的正常消費數(shù)據(jù)的客戶端發(fā)生故障后,其他的SDK客戶端將隨機且自動地分配到partition 0,繼續(xù)消費。
設置Java文件代碼中的必填參數(shù)。
表 1. 必填參數(shù)說明
參數(shù)
說明
獲取方式
brokerUrl
數(shù)據(jù)訂閱通道的網絡地址及端口號信息。
說明如果您部署SDK客戶端所屬的ECS實例與數(shù)據(jù)訂閱通道屬于經典網絡或同一專有網絡,建議通過內網地址進行數(shù)據(jù)訂閱,網絡延遲最小。
不建議使用公網地址。
在DTS控制臺單擊目標訂閱實例ID,在基本信息頁面的網絡區(qū)域,您可以獲取網絡地址及端口號信息。
topic
數(shù)據(jù)訂閱通道的訂閱Topic。
在DTS控制臺單擊目標訂閱實例ID,在基本信息頁面的基本信息區(qū)域,您可以獲取到訂閱Topic。
sid
消費組ID。
在DTS控制臺單擊目標訂閱實例ID,然后單擊數(shù)據(jù)消費,您可以獲取到消費組ID/名稱和消費組的賬號信息。
說明消費組賬號的密碼已在您新建消費組時指定。
userName
消費組的賬號。
警告如您未使用本文提供的客戶端,請按照
<消費組的賬號>-<消費組ID>
的格式設置用戶名(例如:dtstest-dtsae******bpv
),否則無法正常連接。password
該賬號的密碼。
initCheckpoint
消費位點,即SDK客戶端消費第一條數(shù)據(jù)的時間戳,格式為Unix時間戳,例如1620962769。
說明消費位點信息可用于:
當業(yè)務程序中斷后,傳入已消費位點繼續(xù)消費數(shù)據(jù),防止數(shù)據(jù)丟失。
在訂閱客戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現(xiàn)按需消費數(shù)據(jù)。
消費位點必須在訂閱實例的數(shù)據(jù)范圍之內,并需轉化為Unix時間戳。
說明您可以在訂閱任務列表的數(shù)據(jù)范圍列,查看訂閱實例的數(shù)據(jù)范圍。
Unix時間戳轉換工具可用搜索引擎獲取。
ConsumerContext.ConsumerSubscribeMode subscribeMode
SDK客戶端的使用模式,取值為:
ConsumerContext.ConsumerSubscribeMode.ASSIGN
:ASSIGN模式,即一個消費組下僅支持一個SDK客戶端消費訂閱數(shù)據(jù)。ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE
:SUBSCRIBE模式,即支持在同一個消費組下同時啟動多個SDK客戶端實現(xiàn)災備。
無
在IntelliJ IDEA軟件界面的頂部,選擇 運行該客戶端。
說明首次運行時,軟件需要一定時間自動加載相關依賴包并完成安裝。
運行結果如下圖所示,該客戶端可正常訂閱到源庫的數(shù)據(jù)變更信息。
SDK客戶端每隔一定時間會統(tǒng)計并顯示消費數(shù)據(jù)的信息,包括數(shù)據(jù)發(fā)送和接受時數(shù)據(jù)總數(shù)、數(shù)據(jù)總量、每秒請求數(shù)接收RPS等。
表 2. 消費數(shù)據(jù)的統(tǒng)計信息
參數(shù)
說明
outCounts
SDK客戶端所消費的數(shù)據(jù)總數(shù)。
outBytes
SDK客戶端所消費的數(shù)據(jù)總量,單位為Byte。
outRps
SDK客戶端消費數(shù)據(jù)時的每秒請求數(shù)。
outBps
SDK客戶端消費數(shù)據(jù)時每秒傳送的比特數(shù)。
inBytes
DTS服務器發(fā)送的數(shù)據(jù)總量,單位為Byte。
DStoreRecordQueue
DTS服務器發(fā)送數(shù)據(jù)時,當前數(shù)據(jù)緩存隊列的大小。
inCounts
DTS服務器發(fā)送數(shù)據(jù)總數(shù)。
inRps
DTS服務器發(fā)送數(shù)據(jù)時的每秒請求數(shù)。
__dt
SDK客戶端接收到數(shù)據(jù)的當前時間戳,單位為毫秒。
DefaultUserRecordQueue
序列化后,當前數(shù)據(jù)緩存隊列的大小。
管理消費位點
當SDK客戶端首次啟動、重啟或者發(fā)生內部重試時,您需要查詢并傳入 消費位點,開始或重新消費數(shù)據(jù)。下文將介紹在不同情況下如何管理和查詢消費位點,以確保數(shù)據(jù)不丟失,且盡量不重復,實現(xiàn)按需消費。
若您需要重置客戶端的消費位點,可以根據(jù)訂閱的模式(SDK使用模式)參考下表查詢消費位點并進行修改。
場景 | SDK使用模式 | 位點管理方式 |
查詢消費位點 | ASSIGN模式、SUBSCRIBE模式 |
|
首次啟動SDK客戶端,需傳入消費位點,來消費數(shù)據(jù)。 | ASSIGN模式、SUBSCRIBE模式 | 根據(jù)SDK客戶端使用模式,選擇Java文件DTSConsumerAssignDemo.java或DTSConsumerSubscribeDemo.java,并配置消費位點 |
SDK客戶端因內部重試,需重新傳入上一個記錄的消費位點,以繼續(xù)消費數(shù)據(jù)。 | ASSIGN模式 | 按如下順序,查找上一個記錄的消費位點,找到即可返回位點信息:
|
SUBSCRIBE模式 | 按如下順序,查找上一個記錄的消費位點,找到即可返回位點信息:
| |
已重啟SDK客戶端,需重新傳入上一個記錄的消費位點,以繼續(xù)消費數(shù)據(jù)。 | ASSIGN模式 | 根據(jù)consumerContext.java文件中
|
SUBSCRIBE模式 | 該模式下consumerContext.java文件中
|
持久化存儲消費位點
如果增量數(shù)據(jù)采集模塊觸發(fā)容災機制(特別是SUBSCRIBE模式),新建的增量數(shù)據(jù)采集模塊將無法保存客戶端上次的消費位點信息,可能會導致客戶端從一個較舊的位點開始消費訂閱數(shù)據(jù),從而造成歷史數(shù)據(jù)的重復消費。例如:增量數(shù)據(jù)服務切換前,老的增量數(shù)據(jù)采集模塊位點范圍為2023年11月11日 08:00:00~ 2023年11月12日 08:00:00,客戶端的消費位點為2023年11月12日 08:00:00;增量數(shù)據(jù)服務切換后,新的增量數(shù)據(jù)采集模塊位點范圍為2023年11月08日 10:00:00~ 2023年11月12日 08:01:00,那么客戶端會從新的增量數(shù)據(jù)采集模塊的起始位點2023年11月08日 10:00:00開始消費,造成重復消費歷史數(shù)據(jù)。
為了規(guī)避這種切換場景對歷史數(shù)據(jù)的重復消費,建議您在客戶端配置一個在客戶端保存的消費位點持久化存儲方式。示例方法如下,您可以根據(jù)實際情況進行修改。
創(chuàng)建一個
UserMetaStore()
方法,繼承實現(xiàn)AbstractUserMetaStore()
方法。例如使用MySQL數(shù)據(jù)庫存儲位點信息,Java示例代碼如下:
public class UserMetaStore extends AbstractUserMetaStore { @Override protected void saveData(String groupID, String toStoreJson) { Connection con = getConnection(); String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); pres.setString(2, toStoreJson); pres.execute(); } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } @Override protected String getData(String groupID) { Connection con = getConnection(); String sql = "select checkpoint from dts_checkpoint where group_id = ?"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); ResultSet rs = pres.executeQuery() String checkpoint = rs.getString("checkpoint"); return checkpoint; } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } }
在consumerContext.java文件中的
setUserRegisteredStore(new UserMetaStore())
方法,配置外部存儲介質。
常見問題
無法連接訂閱實例,如何處理?
請根據(jù)報錯提示進行排查,詳情請參見問題排查。
持久化后的消費位點是什么格式的數(shù)據(jù)?
消費位點在持久化處理后,將返回JSON格式的數(shù)據(jù)。其中,持久化后的消費位點的格式為Unix時間戳,您可以直接將其傳回SDK進行使用。如下返回數(shù)據(jù)中,
"timestamp"
后的1700709977
即為持久化后的消費位點。{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}
訂閱任務是否支持多個客戶端并行消費?
不支持。SUBSCRIBE模式允許多個客戶端并行,但只有一個客戶端可以消費到數(shù)據(jù)。
是否支持使用Python或Go語言消費訂閱數(shù)據(jù)?
支持。Python和Go語言的示例代碼,請參見dts-subscribe-demo。
問題排查
問題 | 報錯提示 | 原因 | 解決方法 |
無法連接 |
|
| 填入正確的 |
| 無法通過broker地址連接真實的IP地址。 | ||
| 用戶名和密碼錯誤。 | ||
| consumerContext.java文件中 | 傳入在訂閱實例的數(shù)據(jù)范圍之內的消費位點,查詢方式,請參見必填參數(shù)說明。 | |
消費訂閱速度變慢 | 無 |
|