完成數據訂閱任務后,您可以使用DTS提供的SDK示例代碼來訂閱數據變更信息。本文介紹通過SDK示例代碼消費分布式訂閱數據,支持的數據源包括PolarDB-X 1.0和DMS LogicDB。
前提條件
已安裝JDK 1.8版本。
已安裝IntelliJ IDEA軟件。
注意事項
如果使用子賬號(RAM用戶)來訂閱數據,該賬號需具備AliyunDTSFullAccess權限,以及訂閱對象的訪問權限,授權方法請參見通過系統策略授權子賬號管理DTS和為RAM用戶授權。
操作步驟
本文以IntelliJ IDEA軟件(Community Edition 2020.1 Windows版本)為例,介紹如何運行SDK示例代碼來消費PolarDB-X 1.0的訂閱數據。
創建新版數據訂閱通道,具體操作請參見創建RDS MySQL數據訂閱任務。
創建一個或多個消費組,具體操作請參見新增消費組。
下載并解壓SDK示例代碼文件,下載地址為SDK示例代碼。
重要在消費訂閱數據時,您需要調用DefaultUserRecord的commit方法以提交位點信息,否則會導致數據重復消費。
在IntelliJ IDEA軟件中打開目標項目。
打開IntelliJ IDEA軟件,然后單擊Open or Import。
在彈出的對話框中,選擇SDK示例代碼解壓的目錄,依次展開文件夾,雙擊項目對象模型文件:pom.xml。
在彈出對話框中,選擇Open as Projec。
在IntelliJ IDEA軟件界面,依次展開文件夾,并根據 SDK客戶端的使用模式,選擇并雙擊打開對應的Java文件:DistributedDTSConsumerDemo。
設置Java文件代碼中的必填參數。
public static void main(String[] args) throws ClientException { //分布式類型數據源的訂閱配置方式,例如PolarDBX10(原DRDS)。配置AccessKey、實例Id、主任務id,訂閱消費組等相關信息。 String accessKeyId = "LTA***********99reZ"; String accessKeySecret = "****************"; String regionId = "cn-hangzhou"; String dtsInstanceId = "dtse5212sed162****"; String jobId = "l791216x16d****"; String sid = "dtsip412t13160****"; String userName = "xftest"; String password = "******"; String proxyUrl = "dts-cn-****.com:18001"; // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019)) String checkpoint = "1639620090"; // Convert physical database/table name to logical database/table name boolean mapping = true; // if force use config checkpoint when start. for checkpoint reset, only assign mode works boolean isForceUseInitCheckpoint = false; ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN; DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId, jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl, checkpoint, isForceUseInitCheckpoint, mapping); demo.start(); }
參數
說明
獲取方法
accessKeyId
訪問密鑰ID。
獲取方法請參見獲取AccessKey。
accessKeySecret
訪問密鑰ID的密碼。
regionId
數據訂閱任務的地域ID。
在DTS控制臺單擊目標訂閱實例ID,在基本信息頁面,您可以獲取地域信息,例如:地域為華東1(杭州),代碼中需要填寫為
cn-hangzhou
,詳情請參見地域列表。dtsInstanceId
數據訂閱任務實例ID。
在DTS控制臺單擊目標訂閱實例ID,在基本信息頁面,您可以獲取數據訂閱實例的任務實例ID。
jobId
數據訂閱任務ID。
您可以調用DescribeDtsJobs接口獲取數據訂閱任務ID(DtsJobId)。
sid
消費組ID。
在DTS控制臺單擊目標訂閱實例ID,然后單擊左側導航欄的數據消費,您可以獲取到消費組ID/名稱和消費組的賬號信息。
說明消費組賬號的密碼已在您新建消費組時指定。
userName
消費組的賬號。
password
消費組賬號的密碼。
proxyUrl
數據訂閱通道的網絡地址及端口號信息。
說明如果您部署SDK客戶端所屬的ECS實例與數據訂閱通道屬于經典網絡或同一專有網絡,建議通過內網地址進行數據訂閱,網絡延遲最小。
不建議使用公網地址。
在DTS控制臺單擊目標訂閱實例ID,在基本信息頁面,您可以獲取到網絡信息。
checkpoint
消費位點,即SDK客戶端消費第一條數據的時間戳,格式為Unix時間戳,使用秒級時間戳。
說明消費位點信息可用于:
當業務程序中斷后,傳入已消費位點繼續消費數據,防止數據丟失。
在訂閱客戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費數據。
消費位點必須在訂閱實例的數據范圍之內,并需轉化為Unix時間戳。
說明您可以在訂閱任務列表的數據范圍列,查看訂閱實例的數據范圍。
Unix時間戳轉換工具可用搜索引擎獲取。
在IntelliJ IDEA軟件界面的頂部,選擇 運行該客戶端。
說明首次運行時,軟件需要一定時間自動加載相關依賴包并完成安裝。
運行結果中顯示該客戶端可正常訂閱到源庫的數據變更信息。
SDK客戶端每隔一定時間會統計并顯示消費數據的信息,包括數據發送和接受時數據總數、數據總量、每秒請求數接收RPS等。
表 1. 消費數據的統計信息
參數
說明
outCounts
SDK客戶端所消費的數據總數。
outBytes
SDK客戶端所消費的數據總量,單位為Byte。
outRps
SDK客戶端消費數據時的每秒請求數。
outBps
SDK客戶端消費數據時每秒傳送的比特數。
count
暫無。
inBytes
DTS服務器發送的數據總量,單位為Byte。
DStoreRecordQueue
DTS服務器發送數據時,當前數據緩存隊列的大小。
inCounts
DTS服務器發送數據總數。
inRps
DTS服務器發送數據時的每秒請求數。
inBps
DTS服務器發送數據時每秒傳送的比特數。
__dt
SDK客戶端接收到數據的當前時間戳,單位為毫秒。
DefaultUserRecordQueue
序列化后,當前數據緩存隊列的大小。
可選:如果您需要修改訂閱數據的數據類型,可以在
buildRecordListener()
方法中進行修改或者自定義類。public static Map<String, RecordListener> buildRecordListener() { // user can impl their own listener RecordListener mysqlRecordPrintListener = new RecordListener() { @Override public void consume(DefaultUserRecord record) { OperationType operationType = record.getOperationType(); if (operationType.equals(OperationType.INSERT) || operationType.equals(OperationType.UPDATE) || operationType.equals(OperationType.DELETE) || operationType.equals(OperationType.HEARTBEAT)) { // consume record RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL); recordPrintListener.consume(record); //commit method push the checkpoint update record.commit(""); } } }; return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener); }