日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用SDK示例代碼消費訂閱數據

重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。

完成數據訂閱通道的配置后,您可以使用DTS提供的SDK示例代碼來訂閱數據變更信息,本文介紹該示例代碼的使用說明。

操作步驟

重要

本文以IntelliJ IDEA軟件(Community Edition 2020.1 Windows版本)為例,介紹如何運行SDK示例代碼來消費訂閱數據。

  1. 創建新版數據訂閱通道,詳情請參見創建RDS MySQL數據訂閱通道創建PolarDB MySQL版數據訂閱通道創建Oracle數據訂閱通道

  2. 創建一個或多個消費組,詳情請參見新增消費組

    重要

    在消費訂閱數據時,您需要調用DefaultUserRecord的commit方法以提交位點信息,否則會導致數據重復消費。

  3. 根據業務需求,使用SDK示例代碼。

    • 使用打包好的新版訂閱SDK(推薦)

      1. 打開IntelliJ IDEA軟件,然后單擊Create New Project,新建一個業務Project。

      2. 在新建的業務Project中,找到項目對象模型文件:pom.xml

      3. pom.xml中添加如下依賴:

        <dependency>
            <groupId>com.aliyun.dts</groupId>
            <artifactId>dts-new-subscribe-sdk</artifactId>
            <version>{dts_new_sdk_version}</version>
        </dependency>
        說明

        您可以在dts-new-subscribe-sdk頁面查看最新Maven依賴。

      4. 參考使用示例代碼使用新版訂閱SDK。

    • 使用定制修改后的新版訂閱SDK

      1. 下載SDK示例代碼文件,然后解壓該文件。

        說明

        單擊code,然后選擇Download ZIP下載文件。

      2. 定位至SDK示例代碼解壓的目錄,使用文本編輯工具打開pom.xml文件,將數據訂閱SDK的版本修改為最新版本。設置SDK版本

        重要

        您可以在Maven網站中獲取最新的數據訂閱SDK版本,詳情請參見數據訂閱SDK的Maven頁面

      3. 打開IntelliJ IDEA軟件,然后單擊Open or Import打開工程

      4. 在彈出的對話框中,定位至SDK示例代碼解壓的目錄,依次展開文件夾,找到項目對象模型文件:pom.xml找到項目對象模型文件

      5. 在彈出對話框中,選擇Open as Project

      6. 在IntelliJ IDEA軟件界面,依次展開文件夾,并根據SDK客戶端的使用模式,選擇并雙擊打開對應的Java文件:DTSConsumerAssignDemo.javaDTSConsumerSubscribeDemo.javajava客戶端文件

        說明

        DTS支持以下兩種SDK客戶端的使用模式:

        • ASSIGN模式:DTS為了保證消息的全局有序,每個訂閱Topic只有一個partition,且固定分配至partition 0中。當SDK客戶端的使用模式為ASSIGN模式時,建議只啟動一個SDK客戶端。

        • SUBSCRIBE模式:DTS為了保證消息的全局有序,每個訂閱Topic只有一個partition,且固定分配至partition 0中。當SDK客戶端的使用模式為SUBSCRIBE模式時,您可以在一個消費組下同時啟動多個SDK客戶端,以實現災備。實現原理是當消費組下的正常消費數據的客戶端發生故障后,其他的SDK客戶端將隨機且自動地分配到partition 0,繼續消費。

  4. 設置Java文件代碼中的必填參數。

    assigndemo

    表 1. 必填參數說明

    參數

    說明

    獲取方式

    brokerUrl

    數據訂閱通道的網絡地址及端口號信息。

    說明
    • 如果您部署SDK客戶端所屬的ECS實例與數據訂閱通道屬于經典網絡或同一專有網絡,建議通過內網地址進行數據訂閱,網絡延遲最小。

    • 不建議使用公網地址。

    在DTS控制臺單擊目標訂閱實例ID,在基本信息頁面的網絡區域,您可以獲取網絡地址及端口號信息。網絡

    topic

    數據訂閱通道的訂閱Topic。

    在DTS控制臺單擊目標訂閱實例ID,在基本信息頁面的基本信息區域,您可以獲取到訂閱Topictopic

    sid

    消費組ID。

    在DTS控制臺單擊目標訂閱實例ID,然后單擊數據消費,您可以獲取到消費組ID和消費組的賬號信息。

    說明

    消費組賬號的密碼已在您新建消費組時指定。

    消費組賬號

    userName

    消費組的賬號。

    警告

    如您未使用本文提供的客戶端,請按照<消費組的賬號>-<消費組ID>的格式設置用戶名(例如:dtstest-dtsae******bpv),否則無法正常連接。

    password

    該賬號的密碼。

    initCheckpoint

    消費位點,即SDK客戶端消費第一條數據的時間戳,格式為Unix時間戳,例如1620962769。

    說明

    消費位點信息可用于:

    • 當業務程序中斷后,傳入已消費位點繼續消費數據,防止數據丟失。

    • 在訂閱客戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費數據。

    消費位點必須在訂閱實例的數據范圍(如圖示)之內,并需轉化為Unix時間戳。數據范圍

    說明

    Unix時間戳轉換工具可用搜索引擎獲取。

    ConsumerContext.ConsumerSubscribeMode subscribeMode

    SDK客戶端的使用模式,取值為:

    • ConsumerContext.ConsumerSubscribeMode.ASSIGN:ASSIGN模式,即一個消費組下僅支持一個SDK客戶端消費訂閱數據。

    • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE:SUBSCRIBE模式,即支持在同一個消費組下同時啟動多個SDK客戶端實現災備。

  5. 在IntelliJ IDEA軟件界面的頂部,選擇Run > Run運行該客戶端。

    說明

    首次運行時,軟件需要一定時間自動加載相關依賴包并完成安裝。

    • 運行結果如下圖所示,該客戶端可正常訂閱到源庫的數據變更信息。消費數據

    • SDK客戶端每隔一定時間會統計并顯示消費數據的信息,包括數據發送和接受時數據總數、數據總量、每秒請求數接收RPS等。統計信息

      表 2. 消費數據的統計信息

      參數

      說明

      outCounts

      SDK客戶端所消費的數據總數。

      outBytes

      SDK客戶端所消費的數據總量,單位為Byte。

      outRps

      SDK客戶端消費數據時的每秒請求數。

      outBps

      SDK客戶端消費數據時每秒傳送的比特數。

      inBytes

      DTS服務器發送的數據總量,單位為Byte。

      DStoreRecordQueue

      DTS服務器發送數據時,當前數據緩存隊列的大小。

      inCounts

      DTS服務器發送數據總數。

      inRps

      DTS服務器發送數據時的每秒請求數。

      __dt

      SDK客戶端接收到數據的當前時間戳,單位為毫秒。

      DefaultUserRecordQueue

      序列化后,當前數據緩存隊列的大小。

保存和查詢消費位點

當SDK客戶端首次啟動、重啟或者發生內部重試時,您需要查詢并傳入消費位點,開始或重新消費數據。下文將介紹在不同情況下如何管理和查詢消費位點,以確保數據不丟失,且盡量不重復,實現按需消費。

場景

SDK使用模式

查詢方法

查詢消費位點

ASSIGN模式、SUBSCRIBE模式

  • 由于SDK客戶端每5秒保存一次消息位點,并提交至DTS服務器,如需查詢最近一次消費位點,您可通過以下路徑查詢:

    • SDK客戶端所在服務器的localCheckpointStore文件。

    • 訂閱通道的數據消費界面。

  • 如您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置了外部的持久化共享存儲介質(如數據庫),該存儲介質每5秒會保存一次消息位點,供您查詢。

首次啟動SDK客戶端,需傳入消費位點,來消費數據。

ASSIGN模式、SUBSCRIBE模式

根據SDK客戶端使用模式,選擇Java文件DTSConsumerAssignDemo.javaDTSConsumerSubscribeDemo.java,并配置消費位點initCheckpoint進行消費。配置方式,請參見34

SDK客戶端因內部重試,需重新傳入上一個記錄的消費位點,以繼續消費數據。

ASSIGN模式

請按如下順序,查找上一個記錄的消費位點,找到即可返回位點信息:

  1. 您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置的外部存儲介質。

  2. SDK客戶端所在服務器的localCheckpointStore文件。

  3. 您在DTSConsumerSubscribeDemo.java文件中initCheckpoint傳入的開始時間戳(start timestamp)。

SUBSCRIBE模式

請按如下順序,查找上一個記錄的消費位點,找到即可返回位點信息:

  1. 您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置的外部存儲介質。

  2. DTS Server(增量數據采集模塊)保存的位點。

    說明

    SDK客戶端調用commit方法更新消費位點后,此位點才會更新。

  3. 您在DTSConsumerSubscribeDemo.java文件中initCheckpoint傳入的開始時間戳(start timestamp)。

  4. 使用DTS Server(新建增量數據采集模塊)的起始位點。

    重要

    如果增量數據采集模塊發生了切換,新建的增量數據采集模塊將無法保存客戶端上次的消費位點信息,可能會導致從一個較舊的位點開始消費訂閱數據。建議您在客戶端持久化存儲消費位點,詳情請參見持久化存儲消費位點

已重啟SDK客戶端,需重新傳入上一個記錄的消費位點,以繼續消費數據。

ASSIGN模式

根據consumerContext.java文件中setForceUseCheckpoint配置情況,查詢消費位點,找到即可返回位點信息:

  • 配置為true時,每次重啟SDK客戶端,都會強制使用傳入的initCheckpoint作為消費位點。

  • 配置為false或者沒有配置時,請按如下順序,查找上一個記錄的消費位點:

    1. SDK客戶端所在服務器的localCheckpointStore文件。

    2. DTS Server(增量數據采集模塊)保存的位點。

      說明

      SDK客戶端調用commit方法更新消費位點后,此位點才會更新。

    3. 您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置的外部存儲介質。

SUBSCRIBE模式

該模式下consumerContext.java文件中setForceUseCheckpoint配置不生效,請按如下順序,查找上一個記錄的消費位點:

  1. 您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置的外部存儲介質。

  2. DTS Server(增量數據采集模塊)保存的位點。

    說明

    SDK客戶端調用commit方法更新消費位點后,此位點才會更新。

  3. 您在DTSConsumerSubscribeDemo.java文件中initCheckpoint傳入的開始時間戳(start timestamp)。

  4. 使用DTS Server(新建增量數據采集模塊)的起始位點。

持久化存儲消費位點

如果增量數據采集模塊觸發容災機制(特別是SUBSCRIBE模式),新建的增量數據采集模塊將無法保存客戶端上次的消費位點信息,可能會導致客戶端從一個較舊的位點開始消費訂閱數據,從而造成歷史數據的重復消費。例如:增量數據服務切換前,老的增量數據采集模塊位點范圍為2023年11月11日 08:00:00~ 2023年11月12日 08:00:00,客戶端的消費位點為2023年11月12日 08:00:00;增量數據服務切換后,新的增量數據采集模塊位點范圍為2023年11月08日 10:00:00~ 2023年11月12日 08:01:00,那么客戶端會從新的增量數據采集模塊的起始位點2023年11月08日 10:00:00開始消費,造成重復消費歷史數據。

為了規避這種切換場景對歷史數據的重復消費,建議您在客戶端配置一個在客戶端保存的消費位點持久化存儲方式。示例方法如下,您可以根據實際情況進行修改。

  1. 創建一個UserMetaStore()方法,繼承實現AbstractUserMetaStore()方法。

    例如使用MySQL數據庫存儲位點信息,Java示例代碼如下:

    public class UserMetaStore extends AbstractUserMetaStore {
    
        @Override
        protected void saveData(String groupID, String toStoreJson) {
            Connection con = getConnection();
    			  String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    
        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
    			  String sql = "select checkpoint from dts_checkpoint where group_id = ?";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
    						ResultSet rs = pres.executeQuery()
                              
                String checkpoint = rs.getString("checkpoint");
              
                return checkpoint;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    }
    
  2. consumerContext.java文件中的setUserRegisteredStore(new UserMetaStore())方法,配置外部存儲介質。

常見問題

  • 無法連接訂閱實例,如何處理?

    請根據報錯提示進行排查,詳情請參見問題排查

  • 持久化后的消費位點是什么格式的數據?

    消費位點在持久化處理后,將返回JSON格式的數據。其中,持久化后的消費位點的格式為Unix時間戳,您可以直接將其傳回SDK進行使用。如下返回數據中,"timestamp"后的1700709977即為持久化后的消費位點。

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}

問題排查

問題

報錯提示

原因

解決方法

無法連接

ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

brokerUrl填寫錯誤。

填入正確的brokerUrluserNamepassword,查詢方式,請參見必填參數說明

telnet real node *** failed, please check the network

無法通過broker地址連接真實的IP地址。

ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

用戶名和密碼錯誤。

com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

consumerContext.java文件中 setUseCheckpoint配置為true,但消費位點不在訂閱實例的數據范圍(如圖示)之內。

傳入在訂閱實例的數據范圍(如圖示)之內的消費位點,查詢方式,請參見必填參數說明

消費訂閱速度變慢

  • 可通過查詢統計信息中的參數DStoreRecordQueueDefaultUserRecordQueue隊列的大小,分析消費數據變慢的原因。查詢方式,請參見消費數據的統計信息

    • 如參數DStoreRecordQueue保持為0,則表示DTS服務器拉取數據速度變慢。

    • 如參數DefaultUserRecordQueue保持為默認值512,則表示SDK客戶端消費數據的速度變慢。

  • 根據實際情況,修改代碼中的消費位點(initCheckpoint)進行重置位點。