日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用SDK示例代碼消費訂閱數(shù)據(jù)

重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業(yè)務造成影響,請務必仔細閱讀。

在完成數(shù)據(jù)訂閱通道的配置(創(chuàng)建好訂閱任務和消費組)后,您可以自行編寫SDK示例代碼或使用DTS提供的SDK示例代碼來訂閱數(shù)據(jù)變更信息,本文介紹示例代碼的使用方法。

重要

本操作為Java語言的SDK客戶端示例,Python和Go語言的示例代碼,請參見dts-subscribe-demo

操作步驟

說明

本文以IntelliJ IDEA軟件(Community Edition 2020.1 Windows版本)為例,介紹如何運行SDK示例代碼來消費訂閱數(shù)據(jù)。

  1. 創(chuàng)建新版數(shù)據(jù)訂閱通道,詳情請參見訂閱方案概覽中的相關配置文檔。
  2. 創(chuàng)建一個或多個消費組,詳情請參見新增消費組
  3. 根據(jù)業(yè)務需求,使用SDK示例代碼。

    重要

    在消費訂閱數(shù)據(jù)時,您需要調用DefaultUserRecord的commit方法以提交位點信息,否則會導致數(shù)據(jù)重復消費。

    • 使用打包好的新版訂閱SDK(推薦)

      1. 打開IntelliJ IDEA軟件,然后單擊Create New Project,新建一個業(yè)務Project。

      2. 在新建的業(yè)務Project中,找到項目對象模型文件:pom.xml

      3. pom.xml中添加如下依賴:

        <dependency>
            <groupId>com.aliyun.dts</groupId>
            <artifactId>dts-new-subscribe-sdk</artifactId>
            <version>{dts_new_sdk_version}</version>
        </dependency>
        重要
        • 您可以在dts-new-subscribe-sdk頁面查看最新Maven依賴。

        • dts-new-subscribe-sdk中封裝了一個原生依賴:

          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>{version}</version>
          </dependency>
        • 2.0.0版本的dts-new-subscribe-sdk封裝的是2.7版本的kafka-clients,2.0.0以下版本封裝的是1.0.0版本的kafka-clients。

      4. 參考使用示例代碼使用新版訂閱SDK。

    • 使用定制修改后的新版訂閱SDK

      1. 下載SDK示例代碼文件,然后解壓該文件。

        說明

        單擊code,然后選擇Download ZIP下載文件。

      2. 定位至SDK示例代碼解壓的目錄,使用文本編輯工具打開pom.xml文件,將數(shù)據(jù)訂閱SDK的版本修改為最新版本。設置SDK版本

        重要
        • 您可以在dts-new-subscribe-sdk頁面查看最新Maven依賴。

        • dts-new-subscribe-sdk中封裝了一個原生依賴:

          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>{version}</version>
          </dependency>
        • 2.0.0版本的dts-new-subscribe-sdk封裝的是2.7版本的kafka-clients,2.0.0以下版本封裝的是1.0.0版本的kafka-clients。

      3. 打開IntelliJ IDEA軟件,然后單擊Open or Import打開工程

      4. 在彈出的對話框中,定位至SDK示例代碼解壓的目錄,依次展開文件夾,找到項目對象模型文件:pom.xml找到項目對象模型文件

      5. 在彈出對話框中,選擇Open as Project

      6. 在IntelliJ IDEA軟件界面,依次展開文件夾,并根據(jù) SDK客戶端的使用模式,選擇并雙擊打開對應的Java文件:DTSConsumerAssignDemo.javaDTSConsumerSubscribeDemo.javajava客戶端文件

        說明

        DTS支持以下兩種SDK客戶端的使用模式:

        • ASSIGN模式:DTS為了保證消息的全局有序,每個訂閱Topic只有一個partition,且固定分配至partition 0中。當SDK客戶端的使用模式為ASSIGN模式時,建議只啟動一個SDK客戶端。

        • SUBSCRIBE模式:DTS為了保證消息的全局有序,每個訂閱Topic只有一個partition,且固定分配至partition 0中。當SDK客戶端的使用模式為SUBSCRIBE模式時,您可以在一個消費組下同時啟動多個SDK客戶端,以實現(xiàn)災備。實現(xiàn)原理是當消費組下的正常消費數(shù)據(jù)的客戶端發(fā)生故障后,其他的SDK客戶端將隨機且自動地分配到partition 0,繼續(xù)消費。

  4. 設置Java文件代碼中的必填參數(shù)。

    assigndemo

    表 1. 必填參數(shù)說明

    參數(shù)

    說明

    獲取方式

    brokerUrl

    數(shù)據(jù)訂閱通道的網絡地址及端口號信息。

    說明
    • 如果您部署SDK客戶端所屬的ECS實例與數(shù)據(jù)訂閱通道屬于經典網絡或同一專有網絡,建議通過內網地址進行數(shù)據(jù)訂閱,網絡延遲最小。

    • 不建議使用公網地址。

    在DTS控制臺單擊目標訂閱實例ID,在基本信息頁面的網絡區(qū)域,您可以獲取網絡地址及端口號信息。

    topic

    數(shù)據(jù)訂閱通道的訂閱Topic。

    在DTS控制臺單擊目標訂閱實例ID,在基本信息頁面的基本信息區(qū)域,您可以獲取到訂閱Topic

    sid

    消費組ID。

    在DTS控制臺單擊目標訂閱實例ID,然后單擊數(shù)據(jù)消費,您可以獲取到消費組ID/名稱和消費組的賬號信息。

    說明

    消費組賬號的密碼已在您新建消費組時指定。

    userName

    消費組的賬號。

    警告

    如您未使用本文提供的客戶端,請按照<消費組的賬號>-<消費組ID>的格式設置用戶名(例如:dtstest-dtsae******bpv),否則無法正常連接。

    password

    該賬號的密碼。

    initCheckpoint

    消費位點,即SDK客戶端消費第一條數(shù)據(jù)的時間戳,格式為Unix時間戳,例如1620962769。

    說明

    消費位點信息可用于:

    • 當業(yè)務程序中斷后,傳入已消費位點繼續(xù)消費數(shù)據(jù),防止數(shù)據(jù)丟失。

    • 在訂閱客戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現(xiàn)按需消費數(shù)據(jù)。

    消費位點必須在訂閱實例的數(shù)據(jù)范圍之內,并需轉化為Unix時間戳。

    說明
    • 您可以在訂閱任務列表的數(shù)據(jù)范圍列,查看訂閱實例的數(shù)據(jù)范圍。

    • Unix時間戳轉換工具可用搜索引擎獲取。

    ConsumerContext.ConsumerSubscribeMode subscribeMode

    SDK客戶端的使用模式,取值為:

    • ConsumerContext.ConsumerSubscribeMode.ASSIGN:ASSIGN模式,即一個消費組下僅支持一個SDK客戶端消費訂閱數(shù)據(jù)。

    • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE:SUBSCRIBE模式,即支持在同一個消費組下同時啟動多個SDK客戶端實現(xiàn)災備。

  5. 在IntelliJ IDEA軟件界面的頂部,選擇Run > Run運行該客戶端。

    說明

    首次運行時,軟件需要一定時間自動加載相關依賴包并完成安裝。

    • 運行結果如下圖所示,該客戶端可正常訂閱到源庫的數(shù)據(jù)變更信息。消費數(shù)據(jù)

    • SDK客戶端每隔一定時間會統(tǒng)計并顯示消費數(shù)據(jù)的信息,包括數(shù)據(jù)發(fā)送和接受時數(shù)據(jù)總數(shù)、數(shù)據(jù)總量、每秒請求數(shù)接收RPS等。統(tǒng)計信息

      表 2. 消費數(shù)據(jù)的統(tǒng)計信息

      參數(shù)

      說明

      outCounts

      SDK客戶端所消費的數(shù)據(jù)總數(shù)。

      outBytes

      SDK客戶端所消費的數(shù)據(jù)總量,單位為Byte。

      outRps

      SDK客戶端消費數(shù)據(jù)時的每秒請求數(shù)。

      outBps

      SDK客戶端消費數(shù)據(jù)時每秒傳送的比特數(shù)。

      inBytes

      DTS服務器發(fā)送的數(shù)據(jù)總量,單位為Byte。

      DStoreRecordQueue

      DTS服務器發(fā)送數(shù)據(jù)時,當前數(shù)據(jù)緩存隊列的大小。

      inCounts

      DTS服務器發(fā)送數(shù)據(jù)總數(shù)。

      inRps

      DTS服務器發(fā)送數(shù)據(jù)時的每秒請求數(shù)。

      __dt

      SDK客戶端接收到數(shù)據(jù)的當前時間戳,單位為毫秒。

      DefaultUserRecordQueue

      序列化后,當前數(shù)據(jù)緩存隊列的大小。

管理消費位點

當SDK客戶端首次啟動、重啟或者發(fā)生內部重試時,您需要查詢并傳入 消費位點,開始或重新消費數(shù)據(jù)。下文將介紹在不同情況下如何管理和查詢消費位點,以確保數(shù)據(jù)不丟失,且盡量不重復,實現(xiàn)按需消費。

若您需要重置客戶端的消費位點,可以根據(jù)訂閱的模式(SDK使用模式)參考下表查詢消費位點并進行修改。

場景

SDK使用模式

位點管理方式

查詢消費位點

ASSIGN模式、SUBSCRIBE模式

  • 由于SDK客戶端每5秒保存一次消息位點,并提交至DTS服務器,如需查詢最近一次消費位點,您可通過以下路徑查詢:

    • SDK客戶端所在服務器的localCheckpointStore文件。

    • 訂閱通道的數(shù)據(jù)消費界面。

  • 如您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置了外部的持久化共享存儲介質(如數(shù)據(jù)庫),該存儲介質每5秒會保存一次消息位點,供您查詢。

首次啟動SDK客戶端,需傳入消費位點,來消費數(shù)據(jù)。

ASSIGN模式、SUBSCRIBE模式

根據(jù)SDK客戶端使用模式,選擇Java文件DTSConsumerAssignDemo.javaDTSConsumerSubscribeDemo.java,并配置消費位點initCheckpoint進行消費。配置方式,請參見34

SDK客戶端因內部重試,需重新傳入上一個記錄的消費位點,以繼續(xù)消費數(shù)據(jù)。

ASSIGN模式

按如下順序,查找上一個記錄的消費位點,找到即可返回位點信息:

  1. 您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置的外部存儲介質。

  2. SDK客戶端所在服務器的localCheckpointStore文件。

  3. 您在DTSConsumerSubscribeDemo.java文件中initCheckpoint傳入的開始時間戳(start timestamp)。

SUBSCRIBE模式

按如下順序,查找上一個記錄的消費位點,找到即可返回位點信息:

  1. 您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置的外部存儲介質。

  2. DTS Server(增量數(shù)據(jù)采集模塊)保存的位點。

    說明

    SDK客戶端調用commit方法更新消費位點后,此位點才會更新。

  3. 您在DTSConsumerSubscribeDemo.java文件中initCheckpoint傳入的開始時間戳(start timestamp)。

  4. 使用DTS Server(新建增量數(shù)據(jù)采集模塊)的起始位點。

    重要

    如果增量數(shù)據(jù)采集模塊發(fā)生了切換,新建的增量數(shù)據(jù)采集模塊將無法保存客戶端上次的消費位點信息,可能會導致從一個較舊的位點開始消費訂閱數(shù)據(jù)。建議您在客戶端持久化存儲消費位點,詳情請參見持久化存儲消費位點

已重啟SDK客戶端,需重新傳入上一個記錄的消費位點,以繼續(xù)消費數(shù)據(jù)。

ASSIGN模式

根據(jù)consumerContext.java文件中setForceUseCheckpoint配置情況,查詢消費位點,找到即可返回位點信息:

  • 配置為true時,每次重啟SDK客戶端,都會強制使用傳入的initCheckpoint作為消費位點。

  • 配置為false或者沒有配置時,請按如下順序,查找上一個記錄的消費位點:

    1. SDK客戶端所在服務器的localCheckpointStore文件。

    2. DTS Server(增量數(shù)據(jù)采集模塊)保存的位點。

      說明

      SDK客戶端調用commit方法更新消費位點后,此位點才會更新。

    3. 您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置的外部存儲介質。

SUBSCRIBE模式

該模式下consumerContext.java文件中setForceUseCheckpoint配置不生效,請按如下順序,查找上一個記錄的消費位點:

  1. 您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置的外部存儲介質。

  2. DTS Server(增量數(shù)據(jù)采集模塊)保存的位點。

    說明

    SDK客戶端調用commit方法更新消費位點后,此位點才會更新。

  3. 您在DTSConsumerSubscribeDemo.java文件中initCheckpoint傳入的開始時間戳(start timestamp)。

  4. 使用DTS Server(新建增量數(shù)據(jù)采集模塊)的起始位點。

持久化存儲消費位點

如果增量數(shù)據(jù)采集模塊觸發(fā)容災機制(特別是SUBSCRIBE模式),新建的增量數(shù)據(jù)采集模塊將無法保存客戶端上次的消費位點信息,可能會導致客戶端從一個較舊的位點開始消費訂閱數(shù)據(jù),從而造成歷史數(shù)據(jù)的重復消費。例如:增量數(shù)據(jù)服務切換前,老的增量數(shù)據(jù)采集模塊位點范圍為2023年11月11日 08:00:00~ 2023年11月12日 08:00:00,客戶端的消費位點為2023年11月12日 08:00:00;增量數(shù)據(jù)服務切換后,新的增量數(shù)據(jù)采集模塊位點范圍為2023年11月08日 10:00:00~ 2023年11月12日 08:01:00,那么客戶端會從新的增量數(shù)據(jù)采集模塊的起始位點2023年11月08日 10:00:00開始消費,造成重復消費歷史數(shù)據(jù)。

為了規(guī)避這種切換場景對歷史數(shù)據(jù)的重復消費,建議您在客戶端配置一個在客戶端保存的消費位點持久化存儲方式。示例方法如下,您可以根據(jù)實際情況進行修改。

  1. 創(chuàng)建一個UserMetaStore()方法,繼承實現(xiàn)AbstractUserMetaStore()方法。

    例如使用MySQL數(shù)據(jù)庫存儲位點信息,Java示例代碼如下:

    public class UserMetaStore extends AbstractUserMetaStore {
    
        @Override
        protected void saveData(String groupID, String toStoreJson) {
            Connection con = getConnection();
    			  String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    
        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
    			  String sql = "select checkpoint from dts_checkpoint where group_id = ?";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
    						ResultSet rs = pres.executeQuery()
                              
                String checkpoint = rs.getString("checkpoint");
              
                return checkpoint;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    }
    
  2. consumerContext.java文件中的setUserRegisteredStore(new UserMetaStore())方法,配置外部存儲介質。

常見問題

  • 無法連接訂閱實例,如何處理?

    請根據(jù)報錯提示進行排查,詳情請參見問題排查

  • 持久化后的消費位點是什么格式的數(shù)據(jù)?

    消費位點在持久化處理后,將返回JSON格式的數(shù)據(jù)。其中,持久化后的消費位點的格式為Unix時間戳,您可以直接將其傳回SDK進行使用。如下返回數(shù)據(jù)中,"timestamp"后的1700709977即為持久化后的消費位點。

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}
  • 訂閱任務是否支持多個客戶端并行消費?

    不支持。SUBSCRIBE模式允許多個客戶端并行,但只有一個客戶端可以消費到數(shù)據(jù)。

  • 是否支持使用Python或Go語言消費訂閱數(shù)據(jù)?

    支持。Python和Go語言的示例代碼,請參見dts-subscribe-demo

問題排查

問題

報錯提示

原因

解決方法

無法連接

ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

brokerUrl填寫錯誤。

填入正確的brokerUrluserNamepassword,查詢方式,請參見必填參數(shù)說明

telnet real node *** failed, please check the network

無法通過broker地址連接真實的IP地址。

ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

用戶名和密碼錯誤。

com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

consumerContext.java文件中 setUseCheckpoint配置為true,但消費位點不在訂閱實例的數(shù)據(jù)范圍之內。

傳入在訂閱實例的數(shù)據(jù)范圍之內的消費位點,查詢方式,請參見必填參數(shù)說明

消費訂閱速度變慢

  • 通過查詢統(tǒng)計信息中的參數(shù)DStoreRecordQueueDefaultUserRecordQueue隊列的大小,分析消費數(shù)據(jù)變慢的原因。查詢方式,請參見消費數(shù)據(jù)的統(tǒng)計信息

    • 如參數(shù)DStoreRecordQueue保持為0,則表示DTS服務器拉取數(shù)據(jù)速度變慢。

    • 如參數(shù)DefaultUserRecordQueue保持為默認值512,則表示SDK客戶端消費數(shù)據(jù)的速度變慢。

  • 根據(jù)實際情況,修改代碼中的消費位點(initCheckpoint)進行重置位點。