日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用SDK示例代碼消費(fèi)訂閱數(shù)據(jù)

更新時(shí)間:
重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對(duì)您的業(yè)務(wù)造成影響,請(qǐng)務(wù)必仔細(xì)閱讀。

在完成數(shù)據(jù)訂閱通道的配置(創(chuàng)建好訂閱任務(wù)和消費(fèi)組)后,您可以自行編寫SDK示例代碼或使用DTS提供的SDK示例代碼來(lái)訂閱數(shù)據(jù)變更信息,本文介紹示例代碼的使用方法。

重要

本操作為Java語(yǔ)言的SDK客戶端示例,Python和Go語(yǔ)言的示例代碼,請(qǐng)參見(jiàn)dts-subscribe-demo

操作步驟

說(shuō)明

本文以IntelliJ IDEA軟件(Community Edition 2020.1 Windows版本)為例,介紹如何運(yùn)行SDK示例代碼來(lái)消費(fèi)訂閱數(shù)據(jù)。

  1. 創(chuàng)建新版數(shù)據(jù)訂閱通道,詳情請(qǐng)參見(jiàn)訂閱方案概覽中的相關(guān)配置文檔。
  2. 創(chuàng)建一個(gè)或多個(gè)消費(fèi)組,詳情請(qǐng)參見(jiàn)新增消費(fèi)組
  3. 根據(jù)業(yè)務(wù)需求,使用SDK示例代碼。

    重要

    在消費(fèi)訂閱數(shù)據(jù)時(shí),您需要調(diào)用DefaultUserRecord的commit方法以提交位點(diǎn)信息,否則會(huì)導(dǎo)致數(shù)據(jù)重復(fù)消費(fèi)。

    • 使用打包好的新版訂閱SDK(推薦)

      1. 打開IntelliJ IDEA軟件,然后單擊Create New Project,新建一個(gè)業(yè)務(wù)Project。

      2. 在新建的業(yè)務(wù)Project中,找到項(xiàng)目對(duì)象模型文件:pom.xml

      3. pom.xml中添加如下依賴:

        <dependency>
            <groupId>com.aliyun.dts</groupId>
            <artifactId>dts-new-subscribe-sdk</artifactId>
            <version>{dts_new_sdk_version}</version>
        </dependency>
        重要
        • 您可以在dts-new-subscribe-sdk頁(yè)面查看最新Maven依賴。

        • dts-new-subscribe-sdk中封裝了一個(gè)原生依賴:

          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>{version}</version>
          </dependency>
        • 2.0.0版本的dts-new-subscribe-sdk封裝的是2.7版本的kafka-clients,2.0.0以下版本封裝的是1.0.0版本的kafka-clients。

      4. 參考使用示例代碼使用新版訂閱SDK。

    • 使用定制修改后的新版訂閱SDK

      1. 下載SDK示例代碼文件,然后解壓該文件。

        說(shuō)明

        單擊code,然后選擇Download ZIP下載文件。

      2. 定位至SDK示例代碼解壓的目錄,使用文本編輯工具打開pom.xml文件,將數(shù)據(jù)訂閱SDK的版本修改為最新版本。設(shè)置SDK版本

        重要
        • 您可以在dts-new-subscribe-sdk頁(yè)面查看最新Maven依賴。

        • dts-new-subscribe-sdk中封裝了一個(gè)原生依賴:

          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>{version}</version>
          </dependency>
        • 2.0.0版本的dts-new-subscribe-sdk封裝的是2.7版本的kafka-clients,2.0.0以下版本封裝的是1.0.0版本的kafka-clients。

      3. 打開IntelliJ IDEA軟件,然后單擊Open or Import打開工程

      4. 在彈出的對(duì)話框中,定位至SDK示例代碼解壓的目錄,依次展開文件夾,找到項(xiàng)目對(duì)象模型文件:pom.xml找到項(xiàng)目對(duì)象模型文件

      5. 在彈出對(duì)話框中,選擇Open as Project

      6. 在IntelliJ IDEA軟件界面,依次展開文件夾,并根據(jù) SDK客戶端的使用模式,選擇并雙擊打開對(duì)應(yīng)的Java文件:DTSConsumerAssignDemo.javaDTSConsumerSubscribeDemo.javajava客戶端文件

        說(shuō)明

        DTS支持以下兩種SDK客戶端的使用模式:

        • ASSIGN模式:DTS為了保證消息的全局有序,每個(gè)訂閱Topic只有一個(gè)partition,且固定分配至partition 0中。當(dāng)SDK客戶端的使用模式為ASSIGN模式時(shí),建議只啟動(dòng)一個(gè)SDK客戶端。

        • SUBSCRIBE模式:DTS為了保證消息的全局有序,每個(gè)訂閱Topic只有一個(gè)partition,且固定分配至partition 0中。當(dāng)SDK客戶端的使用模式為SUBSCRIBE模式時(shí),您可以在一個(gè)消費(fèi)組下同時(shí)啟動(dòng)多個(gè)SDK客戶端,以實(shí)現(xiàn)災(zāi)備。實(shí)現(xiàn)原理是當(dāng)消費(fèi)組下的正常消費(fèi)數(shù)據(jù)的客戶端發(fā)生故障后,其他的SDK客戶端將隨機(jī)且自動(dòng)地分配到partition 0,繼續(xù)消費(fèi)。

  4. 設(shè)置Java文件代碼中的必填參數(shù)。

    assigndemo

    表 1. 必填參數(shù)說(shuō)明

    參數(shù)

    說(shuō)明

    獲取方式

    brokerUrl

    數(shù)據(jù)訂閱通道的網(wǎng)絡(luò)地址及端口號(hào)信息。

    說(shuō)明
    • 如果您部署SDK客戶端所屬的ECS實(shí)例與數(shù)據(jù)訂閱通道屬于經(jīng)典網(wǎng)絡(luò)或同一專有網(wǎng)絡(luò),建議通過(guò)內(nèi)網(wǎng)地址進(jìn)行數(shù)據(jù)訂閱,網(wǎng)絡(luò)延遲最小。

    • 不建議使用公網(wǎng)地址。

    在DTS控制臺(tái)單擊目標(biāo)訂閱實(shí)例ID,在基本信息頁(yè)面的網(wǎng)絡(luò)區(qū)域,您可以獲取網(wǎng)絡(luò)地址及端口號(hào)信息。

    topic

    數(shù)據(jù)訂閱通道的訂閱Topic。

    在DTS控制臺(tái)單擊目標(biāo)訂閱實(shí)例ID,在基本信息頁(yè)面的基本信息區(qū)域,您可以獲取到訂閱Topic

    sid

    消費(fèi)組ID。

    在DTS控制臺(tái)單擊目標(biāo)訂閱實(shí)例ID,然后單擊數(shù)據(jù)消費(fèi),您可以獲取到消費(fèi)組ID/名稱和消費(fèi)組的賬號(hào)信息。

    說(shuō)明

    消費(fèi)組賬號(hào)的密碼已在您新建消費(fèi)組時(shí)指定。

    userName

    消費(fèi)組的賬號(hào)。

    警告

    如您未使用本文提供的客戶端,請(qǐng)按照<消費(fèi)組的賬號(hào)>-<消費(fèi)組ID>的格式設(shè)置用戶名(例如:dtstest-dtsae******bpv),否則無(wú)法正常連接。

    password

    該賬號(hào)的密碼。

    initCheckpoint

    消費(fèi)位點(diǎn),即SDK客戶端消費(fèi)第一條數(shù)據(jù)的時(shí)間戳,格式為Unix時(shí)間戳,例如1620962769。

    說(shuō)明

    消費(fèi)位點(diǎn)信息可用于:

    • 當(dāng)業(yè)務(wù)程序中斷后,傳入已消費(fèi)位點(diǎn)繼續(xù)消費(fèi)數(shù)據(jù),防止數(shù)據(jù)丟失。

    • 在訂閱客戶端啟動(dòng)時(shí),傳入所需的消費(fèi)位點(diǎn),調(diào)整訂閱位點(diǎn),實(shí)現(xiàn)按需消費(fèi)數(shù)據(jù)。

    消費(fèi)位點(diǎn)必須在訂閱實(shí)例的數(shù)據(jù)范圍之內(nèi),并需轉(zhuǎn)化為Unix時(shí)間戳。

    說(shuō)明
    • 您可以在訂閱任務(wù)列表的數(shù)據(jù)范圍列,查看訂閱實(shí)例的數(shù)據(jù)范圍。

    • Unix時(shí)間戳轉(zhuǎn)換工具可用搜索引擎獲取。

    ConsumerContext.ConsumerSubscribeMode subscribeMode

    SDK客戶端的使用模式,取值為:

    • ConsumerContext.ConsumerSubscribeMode.ASSIGN:ASSIGN模式,即一個(gè)消費(fèi)組下僅支持一個(gè)SDK客戶端消費(fèi)訂閱數(shù)據(jù)。

    • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE:SUBSCRIBE模式,即支持在同一個(gè)消費(fèi)組下同時(shí)啟動(dòng)多個(gè)SDK客戶端實(shí)現(xiàn)災(zāi)備。

    無(wú)

  5. 在IntelliJ IDEA軟件界面的頂部,選擇Run > Run運(yùn)行該客戶端。

    說(shuō)明

    首次運(yùn)行時(shí),軟件需要一定時(shí)間自動(dòng)加載相關(guān)依賴包并完成安裝。

    • 運(yùn)行結(jié)果如下圖所示,該客戶端可正常訂閱到源庫(kù)的數(shù)據(jù)變更信息。消費(fèi)數(shù)據(jù)

    • SDK客戶端每隔一定時(shí)間會(huì)統(tǒng)計(jì)并顯示消費(fèi)數(shù)據(jù)的信息,包括數(shù)據(jù)發(fā)送和接受時(shí)數(shù)據(jù)總數(shù)、數(shù)據(jù)總量、每秒請(qǐng)求數(shù)接收RPS等。統(tǒng)計(jì)信息

      表 2. 消費(fèi)數(shù)據(jù)的統(tǒng)計(jì)信息

      參數(shù)

      說(shuō)明

      outCounts

      SDK客戶端所消費(fèi)的數(shù)據(jù)總數(shù)。

      outBytes

      SDK客戶端所消費(fèi)的數(shù)據(jù)總量,單位為Byte。

      outRps

      SDK客戶端消費(fèi)數(shù)據(jù)時(shí)的每秒請(qǐng)求數(shù)。

      outBps

      SDK客戶端消費(fèi)數(shù)據(jù)時(shí)每秒傳送的比特?cái)?shù)。

      inBytes

      DTS服務(wù)器發(fā)送的數(shù)據(jù)總量,單位為Byte。

      DStoreRecordQueue

      DTS服務(wù)器發(fā)送數(shù)據(jù)時(shí),當(dāng)前數(shù)據(jù)緩存隊(duì)列的大小。

      inCounts

      DTS服務(wù)器發(fā)送數(shù)據(jù)總數(shù)。

      inRps

      DTS服務(wù)器發(fā)送數(shù)據(jù)時(shí)的每秒請(qǐng)求數(shù)。

      __dt

      SDK客戶端接收到數(shù)據(jù)的當(dāng)前時(shí)間戳,單位為毫秒。

      DefaultUserRecordQueue

      序列化后,當(dāng)前數(shù)據(jù)緩存隊(duì)列的大小。

管理消費(fèi)位點(diǎn)

當(dāng)SDK客戶端首次啟動(dòng)、重啟或者發(fā)生內(nèi)部重試時(shí),您需要查詢并傳入 消費(fèi)位點(diǎn),開始或重新消費(fèi)數(shù)據(jù)。下文將介紹在不同情況下如何管理和查詢消費(fèi)位點(diǎn),以確保數(shù)據(jù)不丟失,且盡量不重復(fù),實(shí)現(xiàn)按需消費(fèi)。

若您需要重置客戶端的消費(fèi)位點(diǎn),可以根據(jù)訂閱的模式(SDK使用模式)參考下表查詢消費(fèi)位點(diǎn)并進(jìn)行修改。

場(chǎng)景

SDK使用模式

位點(diǎn)管理方式

查詢消費(fèi)位點(diǎn)

ASSIGN模式、SUBSCRIBE模式

  • 由于SDK客戶端每5秒保存一次消息位點(diǎn),并提交至DTS服務(wù)器,如需查詢最近一次消費(fèi)位點(diǎn),您可通過(guò)以下路徑查詢:

    • SDK客戶端所在服務(wù)器的localCheckpointStore文件。

    • 訂閱通道的數(shù)據(jù)消費(fèi)界面。

  • 如您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置了外部的持久化共享存儲(chǔ)介質(zhì)(如數(shù)據(jù)庫(kù)),該存儲(chǔ)介質(zhì)每5秒會(huì)保存一次消息位點(diǎn),供您查詢。

首次啟動(dòng)SDK客戶端,需傳入消費(fèi)位點(diǎn),來(lái)消費(fèi)數(shù)據(jù)。

ASSIGN模式、SUBSCRIBE模式

根據(jù)SDK客戶端使用模式,選擇Java文件DTSConsumerAssignDemo.javaDTSConsumerSubscribeDemo.java,并配置消費(fèi)位點(diǎn)initCheckpoint進(jìn)行消費(fèi)。配置方式,請(qǐng)參見(jiàn)34

SDK客戶端因內(nèi)部重試,需重新傳入上一個(gè)記錄的消費(fèi)位點(diǎn),以繼續(xù)消費(fèi)數(shù)據(jù)。

ASSIGN模式

按如下順序,查找上一個(gè)記錄的消費(fèi)位點(diǎn),找到即可返回位點(diǎn)信息:

  1. 您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置的外部存儲(chǔ)介質(zhì)。

  2. SDK客戶端所在服務(wù)器的localCheckpointStore文件。

  3. 您在DTSConsumerSubscribeDemo.java文件中initCheckpoint傳入的開始時(shí)間戳(start timestamp)。

SUBSCRIBE模式

按如下順序,查找上一個(gè)記錄的消費(fèi)位點(diǎn),找到即可返回位點(diǎn)信息:

  1. 您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置的外部存儲(chǔ)介質(zhì)。

  2. DTS Server(增量數(shù)據(jù)采集模塊)保存的位點(diǎn)。

    說(shuō)明

    SDK客戶端調(diào)用commit方法更新消費(fèi)位點(diǎn)后,此位點(diǎn)才會(huì)更新。

  3. 您在DTSConsumerSubscribeDemo.java文件中initCheckpoint傳入的開始時(shí)間戳(start timestamp)。

  4. 使用DTS Server(新建增量數(shù)據(jù)采集模塊)的起始位點(diǎn)。

    重要

    如果增量數(shù)據(jù)采集模塊發(fā)生了切換,新建的增量數(shù)據(jù)采集模塊將無(wú)法保存客戶端上次的消費(fèi)位點(diǎn)信息,可能會(huì)導(dǎo)致從一個(gè)較舊的位點(diǎn)開始消費(fèi)訂閱數(shù)據(jù)。建議您在客戶端持久化存儲(chǔ)消費(fèi)位點(diǎn),詳情請(qǐng)參見(jiàn)持久化存儲(chǔ)消費(fèi)位點(diǎn)

已重啟SDK客戶端,需重新傳入上一個(gè)記錄的消費(fèi)位點(diǎn),以繼續(xù)消費(fèi)數(shù)據(jù)。

ASSIGN模式

根據(jù)consumerContext.java文件中setForceUseCheckpoint配置情況,查詢消費(fèi)位點(diǎn),找到即可返回位點(diǎn)信息:

  • 配置為true時(shí),每次重啟SDK客戶端,都會(huì)強(qiáng)制使用傳入的initCheckpoint作為消費(fèi)位點(diǎn)。

  • 配置為false或者沒(méi)有配置時(shí),請(qǐng)按如下順序,查找上一個(gè)記錄的消費(fèi)位點(diǎn):

    1. SDK客戶端所在服務(wù)器的localCheckpointStore文件。

    2. DTS Server(增量數(shù)據(jù)采集模塊)保存的位點(diǎn)。

      說(shuō)明

      SDK客戶端調(diào)用commit方法更新消費(fèi)位點(diǎn)后,此位點(diǎn)才會(huì)更新。

    3. 您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置的外部存儲(chǔ)介質(zhì)。

SUBSCRIBE模式

該模式下consumerContext.java文件中setForceUseCheckpoint配置不生效,請(qǐng)按如下順序,查找上一個(gè)記錄的消費(fèi)位點(diǎn):

  1. 您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置的外部存儲(chǔ)介質(zhì)。

  2. DTS Server(增量數(shù)據(jù)采集模塊)保存的位點(diǎn)。

    說(shuō)明

    SDK客戶端調(diào)用commit方法更新消費(fèi)位點(diǎn)后,此位點(diǎn)才會(huì)更新。

  3. 您在DTSConsumerSubscribeDemo.java文件中initCheckpoint傳入的開始時(shí)間戳(start timestamp)。

  4. 使用DTS Server(新建增量數(shù)據(jù)采集模塊)的起始位點(diǎn)。

持久化存儲(chǔ)消費(fèi)位點(diǎn)

如果增量數(shù)據(jù)采集模塊觸發(fā)容災(zāi)機(jī)制(特別是SUBSCRIBE模式),新建的增量數(shù)據(jù)采集模塊將無(wú)法保存客戶端上次的消費(fèi)位點(diǎn)信息,可能會(huì)導(dǎo)致客戶端從一個(gè)較舊的位點(diǎn)開始消費(fèi)訂閱數(shù)據(jù),從而造成歷史數(shù)據(jù)的重復(fù)消費(fèi)。例如:增量數(shù)據(jù)服務(wù)切換前,老的增量數(shù)據(jù)采集模塊位點(diǎn)范圍為2023年11月11日 08:00:00~ 2023年11月12日 08:00:00,客戶端的消費(fèi)位點(diǎn)為2023年11月12日 08:00:00;增量數(shù)據(jù)服務(wù)切換后,新的增量數(shù)據(jù)采集模塊位點(diǎn)范圍為2023年11月08日 10:00:00~ 2023年11月12日 08:01:00,那么客戶端會(huì)從新的增量數(shù)據(jù)采集模塊的起始位點(diǎn)2023年11月08日 10:00:00開始消費(fèi),造成重復(fù)消費(fèi)歷史數(shù)據(jù)。

為了規(guī)避這種切換場(chǎng)景對(duì)歷史數(shù)據(jù)的重復(fù)消費(fèi),建議您在客戶端配置一個(gè)在客戶端保存的消費(fèi)位點(diǎn)持久化存儲(chǔ)方式。示例方法如下,您可以根據(jù)實(shí)際情況進(jìn)行修改。

  1. 創(chuàng)建一個(gè)UserMetaStore()方法,繼承實(shí)現(xiàn)AbstractUserMetaStore()方法。

    例如使用MySQL數(shù)據(jù)庫(kù)存儲(chǔ)位點(diǎn)信息,Java示例代碼如下:

    public class UserMetaStore extends AbstractUserMetaStore {
    
        @Override
        protected void saveData(String groupID, String toStoreJson) {
            Connection con = getConnection();
    			  String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    
        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
    			  String sql = "select checkpoint from dts_checkpoint where group_id = ?";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
    						ResultSet rs = pres.executeQuery()
                              
                String checkpoint = rs.getString("checkpoint");
              
                return checkpoint;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    }
    
  2. consumerContext.java文件中的setUserRegisteredStore(new UserMetaStore())方法,配置外部存儲(chǔ)介質(zhì)。

常見(jiàn)問(wèn)題

  • 無(wú)法連接訂閱實(shí)例,如何處理?

    請(qǐng)根據(jù)報(bào)錯(cuò)提示進(jìn)行排查,詳情請(qǐng)參見(jiàn)問(wèn)題排查

  • 持久化后的消費(fèi)位點(diǎn)是什么格式的數(shù)據(jù)?

    消費(fèi)位點(diǎn)在持久化處理后,將返回JSON格式的數(shù)據(jù)。其中,持久化后的消費(fèi)位點(diǎn)的格式為Unix時(shí)間戳,您可以直接將其傳回SDK進(jìn)行使用。如下返回?cái)?shù)據(jù)中,"timestamp"后的1700709977即為持久化后的消費(fèi)位點(diǎn)。

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}
  • 訂閱任務(wù)是否支持多個(gè)客戶端并行消費(fèi)?

    不支持。SUBSCRIBE模式允許多個(gè)客戶端并行,但只有一個(gè)客戶端可以消費(fèi)到數(shù)據(jù)。

  • 是否支持使用Python或Go語(yǔ)言消費(fèi)訂閱數(shù)據(jù)?

    支持。Python和Go語(yǔ)言的示例代碼,請(qǐng)參見(jiàn)dts-subscribe-demo

問(wèn)題排查

問(wèn)題

報(bào)錯(cuò)提示

原因

解決方法

無(wú)法連接

ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

brokerUrl填寫錯(cuò)誤。

填入正確的brokerUrluserNamepassword,查詢方式,請(qǐng)參見(jiàn)必填參數(shù)說(shuō)明

telnet real node *** failed, please check the network

無(wú)法通過(guò)broker地址連接真實(shí)的IP地址。

ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

用戶名和密碼錯(cuò)誤。

com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

consumerContext.java文件中 setUseCheckpoint配置為true,但消費(fèi)位點(diǎn)不在訂閱實(shí)例的數(shù)據(jù)范圍之內(nèi)。

傳入在訂閱實(shí)例的數(shù)據(jù)范圍之內(nèi)的消費(fèi)位點(diǎn),查詢方式,請(qǐng)參見(jiàn)必填參數(shù)說(shuō)明

消費(fèi)訂閱速度變慢

無(wú)

  • 通過(guò)查詢統(tǒng)計(jì)信息中的參數(shù)DStoreRecordQueueDefaultUserRecordQueue隊(duì)列的大小,分析消費(fèi)數(shù)據(jù)變慢的原因。查詢方式,請(qǐng)參見(jiàn)消費(fèi)數(shù)據(jù)的統(tǒng)計(jì)信息

    • 如參數(shù)DStoreRecordQueue保持為0,則表示DTS服務(wù)器拉取數(shù)據(jù)速度變慢。

    • 如參數(shù)DefaultUserRecordQueue保持為默認(rèn)值512,則表示SDK客戶端消費(fèi)數(shù)據(jù)的速度變慢。

  • 根據(jù)實(shí)際情況,修改代碼中的消費(fèi)位點(diǎn)(initCheckpoint)進(jìn)行重置位點(diǎn)。