使用SDK示例代碼消費(fèi)訂閱數(shù)據(jù)
本文中含有需要您注意的重要提示信息,忽略該信息可能對(duì)您的業(yè)務(wù)造成影響,請(qǐng)務(wù)必仔細(xì)閱讀。
在完成數(shù)據(jù)訂閱通道的配置(創(chuàng)建好訂閱任務(wù)和消費(fèi)組)后,您可以自行編寫SDK示例代碼或使用DTS提供的SDK示例代碼來(lái)訂閱數(shù)據(jù)變更信息,本文介紹示例代碼的使用方法。
本操作為Java語(yǔ)言的SDK客戶端示例,Python和Go語(yǔ)言的示例代碼,請(qǐng)參見(jiàn)dts-subscribe-demo。
操作步驟
如果數(shù)據(jù)源是PolarDB-X 1.0或DMS LogicDB,消費(fèi)訂閱數(shù)據(jù)的操作步驟,請(qǐng)參見(jiàn)使用SDK示例代碼消費(fèi)PolarDB-X 1.0訂閱數(shù)據(jù)。
如果使用子賬號(hào)(RAM用戶)來(lái)訂閱數(shù)據(jù),該賬號(hào)需具備AliyunDTSFullAccess權(quán)限,以及訂閱對(duì)象的訪問(wèn)權(quán)限。授權(quán)方法,請(qǐng)參見(jiàn)通過(guò)系統(tǒng)策略授權(quán)子賬號(hào)管理DTS和為RAM用戶授權(quán)。
不同的消費(fèi)之間是相互獨(dú)立的。
本文以IntelliJ IDEA軟件(Community Edition 2020.1 Windows版本)為例,介紹如何運(yùn)行SDK示例代碼來(lái)消費(fèi)訂閱數(shù)據(jù)。
- 創(chuàng)建新版數(shù)據(jù)訂閱通道,詳情請(qǐng)參見(jiàn)訂閱方案概覽中的相關(guān)配置文檔。
- 創(chuàng)建一個(gè)或多個(gè)消費(fèi)組,詳情請(qǐng)參見(jiàn)新增消費(fèi)組。
根據(jù)業(yè)務(wù)需求,使用SDK示例代碼。
重要在消費(fèi)訂閱數(shù)據(jù)時(shí),您需要調(diào)用DefaultUserRecord的commit方法以提交位點(diǎn)信息,否則會(huì)導(dǎo)致數(shù)據(jù)重復(fù)消費(fèi)。
使用打包好的新版訂閱SDK(推薦)
打開IntelliJ IDEA軟件,然后單擊Create New Project,新建一個(gè)業(yè)務(wù)Project。
在新建的業(yè)務(wù)Project中,找到項(xiàng)目對(duì)象模型文件:pom.xml。
在pom.xml中添加如下依賴:
<dependency> <groupId>com.aliyun.dts</groupId> <artifactId>dts-new-subscribe-sdk</artifactId> <version>{dts_new_sdk_version}</version> </dependency>
重要您可以在dts-new-subscribe-sdk頁(yè)面查看最新Maven依賴。
dts-new-subscribe-sdk中封裝了一個(gè)原生依賴:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>{version}</version> </dependency>
2.0.0版本的dts-new-subscribe-sdk封裝的是2.7版本的kafka-clients,2.0.0以下版本封裝的是1.0.0版本的kafka-clients。
參考使用示例代碼使用新版訂閱SDK。
使用定制修改后的新版訂閱SDK
下載SDK示例代碼文件,然后解壓該文件。
說(shuō)明單擊,然后選擇Download ZIP下載文件。
定位至SDK示例代碼解壓的目錄,使用文本編輯工具打開pom.xml文件,將數(shù)據(jù)訂閱SDK的版本修改為最新版本。
重要您可以在dts-new-subscribe-sdk頁(yè)面查看最新Maven依賴。
dts-new-subscribe-sdk中封裝了一個(gè)原生依賴:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>{version}</version> </dependency>
2.0.0版本的dts-new-subscribe-sdk封裝的是2.7版本的kafka-clients,2.0.0以下版本封裝的是1.0.0版本的kafka-clients。
打開IntelliJ IDEA軟件,然后單擊Open or Import。
在彈出的對(duì)話框中,定位至SDK示例代碼解壓的目錄,依次展開文件夾,找到項(xiàng)目對(duì)象模型文件:pom.xml。
在彈出對(duì)話框中,選擇Open as Project。
在IntelliJ IDEA軟件界面,依次展開文件夾,并根據(jù) SDK客戶端的使用模式,選擇并雙擊打開對(duì)應(yīng)的Java文件:DTSConsumerAssignDemo.java或DTSConsumerSubscribeDemo.java。
說(shuō)明DTS支持以下兩種SDK客戶端的使用模式:
ASSIGN模式:DTS為了保證消息的全局有序,每個(gè)訂閱Topic只有一個(gè)partition,且固定分配至partition 0中。當(dāng)SDK客戶端的使用模式為ASSIGN模式時(shí),建議只啟動(dòng)一個(gè)SDK客戶端。
SUBSCRIBE模式:DTS為了保證消息的全局有序,每個(gè)訂閱Topic只有一個(gè)partition,且固定分配至partition 0中。當(dāng)SDK客戶端的使用模式為SUBSCRIBE模式時(shí),您可以在一個(gè)消費(fèi)組下同時(shí)啟動(dòng)多個(gè)SDK客戶端,以實(shí)現(xiàn)災(zāi)備。實(shí)現(xiàn)原理是當(dāng)消費(fèi)組下的正常消費(fèi)數(shù)據(jù)的客戶端發(fā)生故障后,其他的SDK客戶端將隨機(jī)且自動(dòng)地分配到partition 0,繼續(xù)消費(fèi)。
設(shè)置Java文件代碼中的必填參數(shù)。
表 1. 必填參數(shù)說(shuō)明
參數(shù)
說(shuō)明
獲取方式
brokerUrl
數(shù)據(jù)訂閱通道的網(wǎng)絡(luò)地址及端口號(hào)信息。
說(shuō)明如果您部署SDK客戶端所屬的ECS實(shí)例與數(shù)據(jù)訂閱通道屬于經(jīng)典網(wǎng)絡(luò)或同一專有網(wǎng)絡(luò),建議通過(guò)內(nèi)網(wǎng)地址進(jìn)行數(shù)據(jù)訂閱,網(wǎng)絡(luò)延遲最小。
不建議使用公網(wǎng)地址。
在DTS控制臺(tái)單擊目標(biāo)訂閱實(shí)例ID,在基本信息頁(yè)面的網(wǎng)絡(luò)區(qū)域,您可以獲取網(wǎng)絡(luò)地址及端口號(hào)信息。
topic
數(shù)據(jù)訂閱通道的訂閱Topic。
在DTS控制臺(tái)單擊目標(biāo)訂閱實(shí)例ID,在基本信息頁(yè)面的基本信息區(qū)域,您可以獲取到訂閱Topic。
sid
消費(fèi)組ID。
在DTS控制臺(tái)單擊目標(biāo)訂閱實(shí)例ID,然后單擊數(shù)據(jù)消費(fèi),您可以獲取到消費(fèi)組ID/名稱和消費(fèi)組的賬號(hào)信息。
說(shuō)明消費(fèi)組賬號(hào)的密碼已在您新建消費(fèi)組時(shí)指定。
userName
消費(fèi)組的賬號(hào)。
警告如您未使用本文提供的客戶端,請(qǐng)按照
<消費(fèi)組的賬號(hào)>-<消費(fèi)組ID>
的格式設(shè)置用戶名(例如:dtstest-dtsae******bpv
),否則無(wú)法正常連接。password
該賬號(hào)的密碼。
initCheckpoint
消費(fèi)位點(diǎn),即SDK客戶端消費(fèi)第一條數(shù)據(jù)的時(shí)間戳,格式為Unix時(shí)間戳,例如1620962769。
說(shuō)明消費(fèi)位點(diǎn)信息可用于:
當(dāng)業(yè)務(wù)程序中斷后,傳入已消費(fèi)位點(diǎn)繼續(xù)消費(fèi)數(shù)據(jù),防止數(shù)據(jù)丟失。
在訂閱客戶端啟動(dòng)時(shí),傳入所需的消費(fèi)位點(diǎn),調(diào)整訂閱位點(diǎn),實(shí)現(xiàn)按需消費(fèi)數(shù)據(jù)。
消費(fèi)位點(diǎn)必須在訂閱實(shí)例的數(shù)據(jù)范圍之內(nèi),并需轉(zhuǎn)化為Unix時(shí)間戳。
說(shuō)明您可以在訂閱任務(wù)列表的數(shù)據(jù)范圍列,查看訂閱實(shí)例的數(shù)據(jù)范圍。
Unix時(shí)間戳轉(zhuǎn)換工具可用搜索引擎獲取。
ConsumerContext.ConsumerSubscribeMode subscribeMode
SDK客戶端的使用模式,取值為:
ConsumerContext.ConsumerSubscribeMode.ASSIGN
:ASSIGN模式,即一個(gè)消費(fèi)組下僅支持一個(gè)SDK客戶端消費(fèi)訂閱數(shù)據(jù)。ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE
:SUBSCRIBE模式,即支持在同一個(gè)消費(fèi)組下同時(shí)啟動(dòng)多個(gè)SDK客戶端實(shí)現(xiàn)災(zāi)備。
無(wú)
在IntelliJ IDEA軟件界面的頂部,選擇 運(yùn)行該客戶端。
說(shuō)明首次運(yùn)行時(shí),軟件需要一定時(shí)間自動(dòng)加載相關(guān)依賴包并完成安裝。
運(yùn)行結(jié)果如下圖所示,該客戶端可正常訂閱到源庫(kù)的數(shù)據(jù)變更信息。
SDK客戶端每隔一定時(shí)間會(huì)統(tǒng)計(jì)并顯示消費(fèi)數(shù)據(jù)的信息,包括數(shù)據(jù)發(fā)送和接受時(shí)數(shù)據(jù)總數(shù)、數(shù)據(jù)總量、每秒請(qǐng)求數(shù)接收RPS等。
表 2. 消費(fèi)數(shù)據(jù)的統(tǒng)計(jì)信息
參數(shù)
說(shuō)明
outCounts
SDK客戶端所消費(fèi)的數(shù)據(jù)總數(shù)。
outBytes
SDK客戶端所消費(fèi)的數(shù)據(jù)總量,單位為Byte。
outRps
SDK客戶端消費(fèi)數(shù)據(jù)時(shí)的每秒請(qǐng)求數(shù)。
outBps
SDK客戶端消費(fèi)數(shù)據(jù)時(shí)每秒傳送的比特?cái)?shù)。
inBytes
DTS服務(wù)器發(fā)送的數(shù)據(jù)總量,單位為Byte。
DStoreRecordQueue
DTS服務(wù)器發(fā)送數(shù)據(jù)時(shí),當(dāng)前數(shù)據(jù)緩存隊(duì)列的大小。
inCounts
DTS服務(wù)器發(fā)送數(shù)據(jù)總數(shù)。
inRps
DTS服務(wù)器發(fā)送數(shù)據(jù)時(shí)的每秒請(qǐng)求數(shù)。
__dt
SDK客戶端接收到數(shù)據(jù)的當(dāng)前時(shí)間戳,單位為毫秒。
DefaultUserRecordQueue
序列化后,當(dāng)前數(shù)據(jù)緩存隊(duì)列的大小。
管理消費(fèi)位點(diǎn)
當(dāng)SDK客戶端首次啟動(dòng)、重啟或者發(fā)生內(nèi)部重試時(shí),您需要查詢并傳入 消費(fèi)位點(diǎn),開始或重新消費(fèi)數(shù)據(jù)。下文將介紹在不同情況下如何管理和查詢消費(fèi)位點(diǎn),以確保數(shù)據(jù)不丟失,且盡量不重復(fù),實(shí)現(xiàn)按需消費(fèi)。
若您需要重置客戶端的消費(fèi)位點(diǎn),可以根據(jù)訂閱的模式(SDK使用模式)參考下表查詢消費(fèi)位點(diǎn)并進(jìn)行修改。
場(chǎng)景 | SDK使用模式 | 位點(diǎn)管理方式 |
查詢消費(fèi)位點(diǎn) | ASSIGN模式、SUBSCRIBE模式 |
|
首次啟動(dòng)SDK客戶端,需傳入消費(fèi)位點(diǎn),來(lái)消費(fèi)數(shù)據(jù)。 | ASSIGN模式、SUBSCRIBE模式 | 根據(jù)SDK客戶端使用模式,選擇Java文件DTSConsumerAssignDemo.java或DTSConsumerSubscribeDemo.java,并配置消費(fèi)位點(diǎn) |
SDK客戶端因內(nèi)部重試,需重新傳入上一個(gè)記錄的消費(fèi)位點(diǎn),以繼續(xù)消費(fèi)數(shù)據(jù)。 | ASSIGN模式 | 按如下順序,查找上一個(gè)記錄的消費(fèi)位點(diǎn),找到即可返回位點(diǎn)信息:
|
SUBSCRIBE模式 | 按如下順序,查找上一個(gè)記錄的消費(fèi)位點(diǎn),找到即可返回位點(diǎn)信息:
| |
已重啟SDK客戶端,需重新傳入上一個(gè)記錄的消費(fèi)位點(diǎn),以繼續(xù)消費(fèi)數(shù)據(jù)。 | ASSIGN模式 | 根據(jù)consumerContext.java文件中
|
SUBSCRIBE模式 | 該模式下consumerContext.java文件中
|
持久化存儲(chǔ)消費(fèi)位點(diǎn)
如果增量數(shù)據(jù)采集模塊觸發(fā)容災(zāi)機(jī)制(特別是SUBSCRIBE模式),新建的增量數(shù)據(jù)采集模塊將無(wú)法保存客戶端上次的消費(fèi)位點(diǎn)信息,可能會(huì)導(dǎo)致客戶端從一個(gè)較舊的位點(diǎn)開始消費(fèi)訂閱數(shù)據(jù),從而造成歷史數(shù)據(jù)的重復(fù)消費(fèi)。例如:增量數(shù)據(jù)服務(wù)切換前,老的增量數(shù)據(jù)采集模塊位點(diǎn)范圍為2023年11月11日 08:00:00~ 2023年11月12日 08:00:00,客戶端的消費(fèi)位點(diǎn)為2023年11月12日 08:00:00;增量數(shù)據(jù)服務(wù)切換后,新的增量數(shù)據(jù)采集模塊位點(diǎn)范圍為2023年11月08日 10:00:00~ 2023年11月12日 08:01:00,那么客戶端會(huì)從新的增量數(shù)據(jù)采集模塊的起始位點(diǎn)2023年11月08日 10:00:00開始消費(fèi),造成重復(fù)消費(fèi)歷史數(shù)據(jù)。
為了規(guī)避這種切換場(chǎng)景對(duì)歷史數(shù)據(jù)的重復(fù)消費(fèi),建議您在客戶端配置一個(gè)在客戶端保存的消費(fèi)位點(diǎn)持久化存儲(chǔ)方式。示例方法如下,您可以根據(jù)實(shí)際情況進(jìn)行修改。
創(chuàng)建一個(gè)
UserMetaStore()
方法,繼承實(shí)現(xiàn)AbstractUserMetaStore()
方法。例如使用MySQL數(shù)據(jù)庫(kù)存儲(chǔ)位點(diǎn)信息,Java示例代碼如下:
public class UserMetaStore extends AbstractUserMetaStore { @Override protected void saveData(String groupID, String toStoreJson) { Connection con = getConnection(); String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); pres.setString(2, toStoreJson); pres.execute(); } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } @Override protected String getData(String groupID) { Connection con = getConnection(); String sql = "select checkpoint from dts_checkpoint where group_id = ?"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); ResultSet rs = pres.executeQuery() String checkpoint = rs.getString("checkpoint"); return checkpoint; } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } }
在consumerContext.java文件中的
setUserRegisteredStore(new UserMetaStore())
方法,配置外部存儲(chǔ)介質(zhì)。
常見(jiàn)問(wèn)題
無(wú)法連接訂閱實(shí)例,如何處理?
請(qǐng)根據(jù)報(bào)錯(cuò)提示進(jìn)行排查,詳情請(qǐng)參見(jiàn)問(wèn)題排查。
持久化后的消費(fèi)位點(diǎn)是什么格式的數(shù)據(jù)?
消費(fèi)位點(diǎn)在持久化處理后,將返回JSON格式的數(shù)據(jù)。其中,持久化后的消費(fèi)位點(diǎn)的格式為Unix時(shí)間戳,您可以直接將其傳回SDK進(jìn)行使用。如下返回?cái)?shù)據(jù)中,
"timestamp"
后的1700709977
即為持久化后的消費(fèi)位點(diǎn)。{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}
訂閱任務(wù)是否支持多個(gè)客戶端并行消費(fèi)?
不支持。SUBSCRIBE模式允許多個(gè)客戶端并行,但只有一個(gè)客戶端可以消費(fèi)到數(shù)據(jù)。
是否支持使用Python或Go語(yǔ)言消費(fèi)訂閱數(shù)據(jù)?
支持。Python和Go語(yǔ)言的示例代碼,請(qǐng)參見(jiàn)dts-subscribe-demo。
問(wèn)題排查
問(wèn)題 | 報(bào)錯(cuò)提示 | 原因 | 解決方法 |
無(wú)法連接 |
|
| 填入正確的 |
| 無(wú)法通過(guò)broker地址連接真實(shí)的IP地址。 | ||
| 用戶名和密碼錯(cuò)誤。 | ||
| consumerContext.java文件中 | 傳入在訂閱實(shí)例的數(shù)據(jù)范圍之內(nèi)的消費(fèi)位點(diǎn),查詢方式,請(qǐng)參見(jiàn)必填參數(shù)說(shuō)明。 | |
消費(fèi)訂閱速度變慢 | 無(wú) |
|