日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

協(xié)同消費(fèi)

更新時(shí)間:

協(xié)同消費(fèi)

概念

點(diǎn)位服務(wù)

點(diǎn)位服務(wù)是提供將消費(fèi)的點(diǎn)位保存在服務(wù)端的功能,點(diǎn)位由sequence和timestamp組成,sequence是遞增的對(duì)應(yīng)唯一記錄的序列,timestamp是記錄寫(xiě)入datahub的單位為ms的時(shí)間戳。

為Topic創(chuàng)建訂閱,并在完成消費(fèi)一部分?jǐn)?shù)據(jù)后,將點(diǎn)位提交至服務(wù)端。下次啟動(dòng)任務(wù)時(shí),可以從服務(wù)端獲取上次提交的點(diǎn)位,從指定點(diǎn)位的下一條記錄開(kāi)始消費(fèi)。將點(diǎn)位保存在服務(wù)端才能夠?qū)崿F(xiàn)shard重新分配后,能夠從上次提交的點(diǎn)位之后消費(fèi),是協(xié)同消費(fèi)功能的前提。

在Consumer中不需要手動(dòng)處理點(diǎn)位,在config中設(shè)置點(diǎn)位提交的間隔,在讀取記錄時(shí),認(rèn)為之前的記錄已經(jīng)完成處理,若距離上次提交點(diǎn)位已經(jīng)超過(guò)提交間隔,則嘗試提交。在提交失敗并且同時(shí)任務(wù)強(qiáng)制停止時(shí),有一定可能造成點(diǎn)位提交不及時(shí),重復(fù)消費(fèi)一部分?jǐn)?shù)據(jù)。

協(xié)同消費(fèi)

協(xié)同消費(fèi)是為了解決多個(gè)消費(fèi)者同時(shí)消費(fèi)一個(gè)topic時(shí),自動(dòng)分配shard的問(wèn)題。能夠簡(jiǎn)化消費(fèi)的客戶端處理,多個(gè)消費(fèi)者可能是在不同機(jī)器上,通過(guò)自己協(xié)調(diào)分配shard是困難的。使用同一個(gè)Sub Id的Consummer在同一個(gè)Consumer Group中,同一個(gè)shard在一個(gè)Consumer Group中只會(huì)被分配給1個(gè)Consumer。

協(xié)同消費(fèi)示意圖

場(chǎng)景

現(xiàn)有3個(gè)消費(fèi)者實(shí)例A,B,C,Topic共有10個(gè)shard

  1. 實(shí)例A啟動(dòng),分配10個(gè)shard

  2. 實(shí)例B,C啟動(dòng),shard分配為4,3,3

  3. 將1個(gè)shard進(jìn)行split操作,在父節(jié)點(diǎn)消費(fèi)完后,客戶端主動(dòng)釋放,2個(gè)子節(jié)點(diǎn)加入后,shard分配為4,4,3

  4. 實(shí)例C停止后,shard分配為6,5

心跳

要實(shí)現(xiàn)協(xié)同消費(fèi)的功能,需要通過(guò)心跳機(jī)制來(lái)通知讓服務(wù)端消費(fèi)者實(shí)例的狀態(tài),當(dāng)前分配的shard和需要釋放的shard,超過(guò)時(shí)間間隔沒(méi)有收到心跳,則認(rèn)為消費(fèi)者實(shí)例已經(jīng)停止。當(dāng)消費(fèi)者實(shí)例的狀態(tài)發(fā)生改變,服務(wù)端會(huì)重新分配shard,新的分配計(jì)劃也是通過(guò)心跳請(qǐng)求來(lái)返回,所以客戶端感知shard變化是有時(shí)間間隔的。

版本

Maven依賴以及JDK:

maven pom

<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.25.1</version>
</dependency>
<dependency>
      <groupId>com.aliyun.datahub</groupId>
      <artifactId>datahub-client-library</artifactId>
      <version>1.4.1/version>
</dependency>

jdk

jdk: >= 1.8
說(shuō)明

datahub-client-library 1.4及以后版本 Producer / Consumer修改為線程安全,可以在多個(gè)線程中使用同一個(gè)Producer / Consumer,低于1.4版本,非線程安全的consumer / producer 的多線程使用方式請(qǐng)參考下文 "多個(gè)Consumer/Producer線程消費(fèi)示例"章節(jié)

身份驗(yàn)證

背景信息

AccessKey(簡(jiǎn)稱AK)是阿里云提供給阿里云用戶的訪問(wèn)密鑰,用于訪問(wèn)阿里云OpenAPI時(shí)的身份驗(yàn)證。AccessKey包括AccessKey ID和AccessKey Secret,需妥善保管。AK如果泄露,會(huì)威脅該賬號(hào)下所有資源的安全。訪問(wèn)阿里云OpenAPI時(shí),如果在代碼中硬編碼明文AK,容易因代碼倉(cāng)庫(kù)權(quán)限管理不當(dāng)造成AK泄露。

Alibaba Cloud Credentials是阿里云為阿里云開(kāi)發(fā)者用戶提供的身份憑證管理工具。配置了Credentials默認(rèn)憑據(jù)鏈后,訪問(wèn)阿里云OpenAPI時(shí),您無(wú)需在代碼中硬編碼明文AK,可有效保證您賬號(hào)下云資源的安全。

前提條件

  • 已獲取RAM用戶賬號(hào)的AccessKey ID和AccessKey Secret。相關(guān)操作,請(qǐng)參見(jiàn)查看RAM用戶的AccessKey信息

  • 重要

    阿里云賬號(hào)(即主賬號(hào))的AccessKey泄露會(huì)威脅該賬號(hào)下所有資源的安全。為保證賬號(hào)安全,強(qiáng)烈建議您為RAM用戶創(chuàng)建AccessKey,非必要情況下請(qǐng)勿為阿里云主賬號(hào)創(chuàng)建AccessKey。

    RAM用戶的AccessKey Secret只能在創(chuàng)建AccessKey時(shí)顯示,創(chuàng)建完成后不支持查看。請(qǐng)?jiān)趧?chuàng)建好AccessKey后,及時(shí)并妥善保存AccessKey Secret。

  • 已安裝阿里云SDK Credentials工具。

    Maven安裝方式(推薦使用Credentials最新版本):

    <dependency>
     <groupId>com.aliyun</groupId>
     <artifactId>credentials-java</artifactId>
     <version>0.2.11</version>
    </dependency>
  • JDK版本為1.7及以上。

配置方案

本文示例的是通過(guò)配置環(huán)境變量方式,更多方式請(qǐng)?jiān)L問(wèn)配置環(huán)境變量

重要

使用配置文件的方案時(shí),請(qǐng)確保您系統(tǒng)中不存在環(huán)境變量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。否則,配置文件將不生效。

阿里云SDK支持通過(guò)定義ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET環(huán)境變量來(lái)創(chuàng)建默認(rèn)的訪問(wèn)憑證。調(diào)用接口時(shí),程序直接訪問(wèn)憑證,讀取您的訪問(wèn)密鑰(即AccessKey)并自動(dòng)完成鑒權(quán)。

配置方法

配置環(huán)境變量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET

  • Linux和macOS系統(tǒng)配置方法

    執(zhí)行以下命令:

    export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>

    <access_key_id>需替換為已準(zhǔn)備好的AccessKey ID,<access_key_secret>替換為AccessKey Secret。

  • Windows系統(tǒng)配置方法

    1. 新建環(huán)境變量文件,添加環(huán)境變量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET,并寫(xiě)入已準(zhǔn)備好的AccessKey ID和AccessKey Secret。

    2. 重啟Windows系統(tǒng)。

代碼示例

    Client credentialClient = new Client();
    String accessKeyId = credentialClient.getAccessKeyId();
    String accessKeySecret = credentialClient.getAccessKeySecret();

示例

Producer 代碼示例

同步寫(xiě)入

import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleProducer {

  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducer.class);

  public static void main(String[] args) throws Exception {
    // 是否開(kāi)啟內(nèi)存Metric,開(kāi)啟后,配置log4j日志后,內(nèi)存metric會(huì)打印到日志中
    // ClientMetrics.startMetrics();

    //以杭州Region為例
    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    Client credentialClient = new Client();
    String accessKeyId = credentialClient.getAccessKeyId();
    String accessKeySecret = credentialClient.getAccessKeySecret();
    String projectName = "";
    String topicName = "";

    ////////////////////////////// STEP1. 創(chuàng)建DatahubProducer //////////////////////////
    ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
    DatahubProducer datahubProducer = new DatahubProducer(projectName, topicName, config);
    RecordSchema schema = datahubProducer.getTopicSchema();
    ///////////////////// STEP2. 根據(jù)Topic是BLOB還是TUPLE類型,選擇構(gòu)建寫(xiě)入Record ////////////
    List<RecordEntry> recordList = new ArrayList<>();
    // 構(gòu)建BLOB非結(jié)構(gòu)化數(shù)據(jù)寫(xiě)入
    for (int i = 0; i < 10; ++i) {
      RecordEntry record = new RecordEntry();
      // 構(gòu)建BLOB數(shù)據(jù)
      BlobRecordData data = new BlobRecordData("HelloWorld".getBytes(StandardCharsets.UTF_8));
      // 構(gòu)建TUPLE數(shù)據(jù)
     //TupleRecordData data = new TupleRecordData(schema);
     //data.setField("f1", "f1_" + i);

      record.setRecordData(data);
      record.addAttribute("key1", "value1"); // 數(shù)據(jù)字段,可選

      recordList.add(record);
    }

    ///////////////////////// STEP3:循環(huán)寫(xiě)入數(shù)據(jù) /////////////////////////
    try {
      for (int i = 0; i < 10000; ++i) {
        try {
          String shardId = datahubProducer.send(recordList);
          LOGGER.info("Write shard {} ok, record count:{}", shardId, recordList.size());
        } catch (DatahubClientException e) {
          if (!ExceptionChecker.isRetryableException(e)) {
            LOGGER.info("Write data fail", e);
            break;
          }
          // sleep重試
          Thread.sleep(1000);
        }
      }
    } finally {
      // 關(guān)閉producer相關(guān)資源
      datahubProducer.close();
    }

    // 進(jìn)程退出時(shí),調(diào)用全局清理函數(shù)
    HttpClient.close();

//        ClientMetrics.stopMetrics();
  }
}

異步寫(xiě)入

public class SimpleProducerAsync {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducerAsync.class);

    public static void main(String[] args) throws Exception {
        // 是否開(kāi)啟內(nèi)存Metric,開(kāi)啟后,配置log4j日志后,內(nèi)存metric會(huì)打印到日志中
        // ClientMetrics.startMetrics();
        //以杭州Region為例
        String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
        Client credentialClient = new Client();
        String accessKeyId = credentialClient.getAccessKeyId();
        String accessKeySecret = credentialClient.getAccessKeySecret();
        String projectName = "";
        String topicName = "";

        ////////////////////////////// STEP1. 創(chuàng)建DatahubProducer //////////////////////////
        ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
        config.setMaxAsyncBufferTimeMs(30000); // 設(shè)置緩存時(shí)間
        DatahubProducer datahubProducer = new DatahubProducer(projectName, topicName, config);

        RecordSchema schema = datahubProducer.getTopicSchema();

        // 異步寫(xiě)入可以注冊(cè)回調(diào)函數(shù)
        WriteCallback callback  = new WriteCallback() {
            @Override
            public void onSuccess(String shardId, List<RecordEntry> records,
                                  long elapsedTimeMs, long sendTimeMs) {
                LOGGER.info("Message sent successfully");
            }

            @Override
            public void onFailure(String shardId, List<RecordEntry> records,
                                  long elapsedTimeMs, DatahubClientException e) {
                LOGGER.error("Message sent fail", e);
            }
        };

        // 可選,配置數(shù)據(jù)哈希策略
        // partition優(yōu)先順序: 依次按照RecordEntry的shardId, hashKey, partitionKey的順序計(jì)算最終寫(xiě)入的shardId
        RecordPartitioner partitioner = new DefaultRecordPartitioner();

        ///////////////////////// STEP2:異步循環(huán)寫(xiě)入數(shù)據(jù) /////////////////////////
        try {
            for (int i = 0; i < 1000; ++i) {
                try {
					//Tuple結(jié)構(gòu)化數(shù)據(jù)寫(xiě)入
                    RecordEntry record = new RecordEntry();
                    TupleRecordData data = new TupleRecordData(schema);
                    data.setField("f1", "f1_" + i);
					          //BLOB非結(jié)構(gòu)化數(shù)據(jù)寫(xiě)入
					          //BlobRecordData data = new BlobRecordData("HelloWorld".getBytes(StandardCharsets.UTF_8));
                    record.setRecordData(data);
                    record.addAttribute("key1", "value1"); // 數(shù)據(jù)字段,可選
                    // 單條發(fā)送,發(fā)送數(shù)據(jù)時(shí)可以指定是否進(jìn)行partition,
                    datahubProducer.sendAsync(record, callback, partitioner);
                } catch (DatahubClientException e) {
                    if (!ExceptionChecker.isRetryableException(e)) {
                        LOGGER.info("Write data fail", e);
                        break;
                    }
                    // sleep重試
                    Thread.sleep(1000);
                }
            }

            // 阻塞到數(shù)據(jù)發(fā)送成功
            datahubProducer.flush(true);
        } catch (Exception e) {
            LOGGER.warn("Write fail", e);
        } finally {
            // 關(guān)閉producer相關(guān)資源
            datahubProducer.close();
        }

        // 進(jìn)程退出時(shí),調(diào)用全局清理函數(shù)
        HttpClient.close();
//        ClientMetrics.stopMetrics();
    }
}

協(xié)同消費(fèi)代碼示例

配置

名稱

描述

autoCommit

是否自動(dòng)提交點(diǎn)位,默認(rèn)為true。點(diǎn)位的提交會(huì)在后臺(tái)線程按配置的時(shí)間間隔執(zhí)行,自動(dòng)提交的邏輯是當(dāng)read接口被調(diào)用時(shí),認(rèn)為之前讀的數(shù)據(jù)已經(jīng)處理完畢。如果設(shè)置為false,那么每條record處理完必須ack,后臺(tái)提交點(diǎn)位會(huì)保證該點(diǎn)位之前的record全部被ack。

offsetCommitTimeoutMs

點(diǎn)位的提交間隔,單位毫秒,默認(rèn)30000ms,范圍[3000, 300000]

sessionTimeoutMs

會(huì)話超時(shí)時(shí)間,心跳間隔會(huì)設(shè)為改置的2/3,超過(guò)時(shí)間沒(méi)有心跳,認(rèn)為客戶端已停止,服務(wù)端會(huì)重新分配被占有shard,單位毫秒,默認(rèn)60000ms,范圍[60000, 180000]

fetchSize

單個(gè)shard異步讀取記錄的大小,會(huì)緩存2倍于該值的記錄,少于2倍會(huì)觸發(fā)異步任務(wù)去讀取,默認(rèn)1000,必須大于0

import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.*;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumer.class);

    public static void main(String[] args) throws Exception {
        // 是否開(kāi)啟內(nèi)存Metric,開(kāi)啟后,配置log4j日志后,內(nèi)存metric會(huì)打印到日志中
        // ClientMetrics.startMetrics();


        //以杭州Region為例
        String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
        Client credentialClient = new Client();
        String accessKeyId = credentialClient.getAccessKeyId();
        String accessKeySecret = credentialClient.getAccessKeySecret();
        String projectName = "";
        String topicName = "";
        String subId = "";

        ////////////////////////////// STEP1. 創(chuàng)建DatahubConsumer //////////////////////////
        ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId, accessKeySecret);
        DatahubConsumer datahubConsumer = new DatahubConsumer(projectName, topicName, subId, config);
        ///////////////////////// STEP2:循環(huán)讀取數(shù)據(jù) /////////////////////////
        try {
            while (true) {
                try {
                    RecordEntry record = datahubConsumer.read(3000);
                    if (record == null) {
                        continue; // 3s內(nèi)未讀取到數(shù)據(jù),(1). 無(wú)數(shù)據(jù) (2). 內(nèi)部狀態(tài)未Ready,比如協(xié)同消費(fèi)暫時(shí)未分配到shard
                    }

                    RecordData recordData = record.getRecordData();
                    // 根據(jù)Topic為BLOB類型還是TUPLE類型進(jìn)行不同的數(shù)據(jù)處理邏輯, 一種topic只有一種類型
                    if (recordData instanceof TupleRecordData) {
                        TupleRecordData data = (TupleRecordData) recordData;
                        RecordSchema schema = data.getRecordSchema();
                        // 示例中僅做簡(jiǎn)單的字符串拼接
                        StringBuilder sb = new StringBuilder();
                        for (int i = 0; i < schema.getFields().size(); ++i) {
                            sb.append(data.getField(i)).append(",");
                        }
                        LOGGER.debug("Read record. shardId:{}, seq:{}, ts:{}, batchIndex:{}, batchSize:{}, data:{}",
                                record.getShardId(), record.getSequence(), record.getSystemTime(), record.getSegmentIndexForBatch(),
                                record.getSegmentSizeForBatch(), sb);
                    } else {
                        BlobRecordData data = (BlobRecordData) recordData;
                        LOGGER.debug("Read record. shardId:{}, seq:{}, ts:{}, batchIndex:{}, batchSize:{}, data:{}",
                                record.getShardId(), record.getSequence(), record.getSystemTime(), record.getSegmentIndexForBatch(),
                                record.getSegmentSizeForBatch(), new String(data.getData()));
                    }
                } catch (DatahubClientException e) {
                    if (!ExceptionChecker.isRetryableException(e)) {
                        LOGGER.info("Read data fail", e);
                        break;
                    }
                    // sleep重試
                    Thread.sleep(1000);
                }
            }
        }  catch (Exception e) {
            LOGGER.warn("Read data fail", e);
        } finally {
            // 關(guān)閉consumer相關(guān)資源
            datahubConsumer.close();
        }

        // 進(jìn)程退出時(shí),調(diào)用全局清理函數(shù)
        HttpClient.close();

//        ClientMetrics.stopMetrics();
    }
}

多線程讀寫(xiě)示例

多線程共用同一Consumer/Producer消費(fèi)示例(適用于1.4及以上版本)

package com.aliyun.datahub.clientlibrary.example;

import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiThreadReadWrite {
    private static final Logger LOGGER = LoggerFactory.getLogger(MultiThreadReadWrite.class);

    public static void main(String[] args) throws Exception {
        try {
//            testProducer();
            testConsumer();
        } finally {
            // 進(jìn)程退出時(shí),調(diào)用全局清理函數(shù)
            HttpClient.close();
        }
    }

    private static void testProducer() throws Exception {
        List<RecordEntry> records = new ArrayList<>();
        String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
        Client credentialClient = new Client();
        String accessKeyId = credentialClient.getAccessKeyId();
        String accessKeySecret = credentialClient.getAccessKeySecret();
        String projectName = "";
        String topicName = "";
        for (int i = 0; i < 2; ++i) {
            RecordEntry record = new RecordEntry();
            BlobRecordData data = new BlobRecordData(("HelloWorld-" + i).getBytes());
            record.setRecordData(data);
            record.addAttribute("key1", "value1");

            records.add(record);
        }

        ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
        config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // 是否開(kāi)啟batch寫(xiě)入,建議開(kāi)啟

        Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();

        DatahubProducer producer = new DatahubProducer(projectName, topicName,config);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10; ++i) {
            ProducerThread thread = new ProducerThread(producer, records, shardCountMap);
            thread.start();
            threads.add(thread);
        }

        for (int i = 0; i < 10; ++i) {
            threads.get(i).join();
        }

        producer.close();
        // print write count
        for (Map.Entry<String, AtomicInteger> entry : shardCountMap.entrySet()) {
            LOGGER.info("ShardId:{}, count:{}", entry.getKey(), entry.getValue());
        }
    }

    private static class ProducerThread extends Thread {
        private final DatahubProducer producer;
        private final List<RecordEntry> records;
        private final Map<String, AtomicInteger> shardCountMap;

        public ProducerThread(DatahubProducer producer,
                              List<RecordEntry> records,
                              Map<String, AtomicInteger> shardCountMap) {
            this.producer = producer;
            this.records = records;
            this.shardCountMap = shardCountMap;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; ++i) {
                try {
                    String shardId = producer.send(records);
                    shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
                    AtomicInteger cnt = shardCountMap.get(shardId);
                    cnt.incrementAndGet();
                } catch (DatahubClientException e) {
                    if (!ExceptionChecker.isRetryableException(e)) {
                        LOGGER.info("Write data fail", e);
                        break;
                    }
                    // sleep重試
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ex) {
                        // ignore
                    }
                } catch (Exception e) {
                    break;
                }
            }
        }
    }

    private static void testConsumer() throws Exception {
        String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
        Client credentialClient = new Client();
        String accessKeyId = credentialClient.getAccessKeyId();
        String accessKeySecret = credentialClient.getAccessKeySecret();
        String projectName = "";
        String topicName = "";
        String subId = "";
        ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId,accessKeySecret);
        config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // 是否開(kāi)啟batch寫(xiě)入,建議開(kāi)啟

        Map<String, RecordEntry> firstMap = new ConcurrentHashMap<>();
        Map<String, RecordEntry> lastMap = new ConcurrentHashMap<>();
        Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();

        DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10; ++i) {
            ConsumerThread thread = new ConsumerThread(consumer, firstMap, lastMap, shardCountMap);
            thread.start();
            threads.add(thread);
        }

        for (int i = 0; i < 10; ++i) {
            threads.get(i).join();
        }

        // print start and end sequence
        for (RecordEntry first : firstMap.values()) {
            RecordEntry last = lastMap.get(first.getShardId());
            AtomicInteger cnt = shardCountMap.get(first.getShardId());
            LOGGER.info("ShardId:{}, startSeq:{}, endSeq:{}, cnt:{}",
                    first.getShardId(), first.getSequence(), last.getSequence(), cnt);
        }

        // 關(guān)閉consumer相關(guān)資源
        consumer.close();
    }

    private static class ConsumerThread extends Thread {
        private final DatahubConsumer consumer;
        private final Map<String, RecordEntry> firstMap;
        private final Map<String, RecordEntry> lastMap;
        private final Map<String, AtomicInteger> shardCountMap;

        public ConsumerThread(DatahubConsumer consumer,
                              Map<String, RecordEntry> firstMap,
                              Map<String, RecordEntry> lastMap,
                              Map<String, AtomicInteger> shardCountMap) {
            this.consumer = consumer;
            this.lastMap = lastMap;
            this.firstMap = firstMap;
            this.shardCountMap = shardCountMap;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    RecordEntry record = consumer.read(30000);
                    if (record == null) {
                        // 在demo中,這里30秒讀取不到數(shù)據(jù)就退出測(cè)試
                        break;
                    }
                    String shardId = record.getShardId();
                    firstMap.putIfAbsent(shardId, record);
                    lastMap.put(shardId, record);
                    shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
                    AtomicInteger cnt = shardCountMap.get(shardId);
                    cnt.incrementAndGet();
                } catch (DatahubClientException e) {
                    if (!ExceptionChecker.isRetryableException(e)) {
                        LOGGER.info("Read data fail", e);
                        break;
                    }
                    // sleep重試
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ex) {
                        // ignore
                    }
                } catch (Exception e) {
                    LOGGER.warn("Read fail.", e);
                    break;
                }
            }
        }
    }
}

多個(gè)Consumer/Producer線程消費(fèi)示例(適用于低于1.4的版本)

package com.aliyun.datahub.clientlibrary.example;

import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiProducerAndConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MultiProducerAndConsumer.class);

    public static void main(String[] args) throws Exception {
        try {
//            testProducer();
            testConsumer();
        } finally {
            // 進(jìn)程退出時(shí),調(diào)用全局清理函數(shù)
            HttpClient.close();
        }
    }

    private static void testProducer() throws Exception {
        List<RecordEntry> records = new ArrayList<>();
        for (int i = 0; i < 2; ++i) {
            RecordEntry record = new RecordEntry();
            BlobRecordData data = new BlobRecordData(("HelloWorld-" + i).getBytes());
            record.setRecordData(data);
            record.addAttribute("key1", "value1");

            records.add(record);
        }
        String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
        Client credentialClient = new Client();
        String accessKeyId = credentialClient.getAccessKeyId();
        String accessKeySecret = credentialClient.getAccessKeySecret();
        ProducerConfig config = new ProducerConfig(endpoint,accessKeyId ,accessKeySecret);
        config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // 是否開(kāi)啟batch寫(xiě)入,建議開(kāi)啟

        Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();


        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 3; ++i) {
            ProducerThread thread = new ProducerThread(config, records, shardCountMap);
            thread.start();
            threads.add(thread);
        }

        for (int i = 0; i < 3; ++i) {
            threads.get(i).join();
        }

        // print write count
        for (Map.Entry<String, AtomicInteger> entry : shardCountMap.entrySet()) {
            LOGGER.info("ShardId:{}, count:{}", entry.getKey(), entry.getValue());
        }
    }

    private static class ProducerThread extends Thread {
        private final DatahubProducer producer;
        private final List<RecordEntry> records;
        private final Map<String, AtomicInteger> shardCountMap;
        String projectName = "";
        String topicName = "";
        public ProducerThread(ProducerConfig config,
                              List<RecordEntry> records,
                              Map<String, AtomicInteger> shardCountMap) {
            this.producer = new DatahubProducer(projectName, topicName, config);;
            this.records = records;
            this.shardCountMap = shardCountMap;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 100; ++i) {
                    try {
                        String shardId = producer.send(records);
                        shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
                        AtomicInteger cnt = shardCountMap.get(shardId);
                        cnt.incrementAndGet();
                    } catch (DatahubClientException e) {
                        LOGGER.info("Producer send fail", e);
                        if (!ExceptionChecker.isRetryableException(e)) {
                            break;
                        }
                        // sleep重試
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException ex) {
                            // ignore
                        }
                    } catch (Exception e) {
                        LOGGER.info("Producer send fail", e);
                        break;
                    }
                }
            } finally {
                producer.close();
            }
        }
    }

    private static void testConsumer() throws Exception {

        ConsumerConfig config = new ConsumerConfig(ExampleConstants.ENDPOINT, ExampleConstants.ACCESS_ID, ExampleConstants.SUB_ACCESS_KEY);
        config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // 是否開(kāi)啟batch寫(xiě)入,建議開(kāi)啟

        Map<String, RecordEntry> firstMap = new ConcurrentHashMap<>();
        Map<String, RecordEntry> lastMap = new ConcurrentHashMap<>();
        Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 3; ++i) {
            ConsumerThread thread = new ConsumerThread(config, firstMap, lastMap, shardCountMap);
            thread.start();
            threads.add(thread);
        }

        for (int i = 0; i < 3; ++i) {
            threads.get(i).join();
        }

        // print start and end sequence
        for (RecordEntry first : firstMap.values()) {
            RecordEntry last = lastMap.get(first.getShardId());
            AtomicInteger cnt = shardCountMap.get(first.getShardId());
            LOGGER.info("ShardId:{}, startSeq:{}, endSeq:{}, cnt:{}",
                    first.getShardId(), first.getSequence(), last.getSequence(), cnt);
        }
    }

    private static class ConsumerThread extends Thread {
        private final DatahubConsumer consumer;
        private final Map<String, RecordEntry> firstMap;
        private final Map<String, RecordEntry> lastMap;
        private final Map<String, AtomicInteger> shardCountMap;

        public ConsumerThread(ConsumerConfig config,
                              Map<String, RecordEntry> firstMap,
                              Map<String, RecordEntry> lastMap,
                              Map<String, AtomicInteger> shardCountMap) {
            this.consumer = new DatahubConsumer(ExampleConstants.PROJECT_NAME, ExampleConstants.TOPIC_NAME, ExampleConstants.SUB_ID, config);;
            this.lastMap = lastMap;
            this.firstMap = firstMap;
            this.shardCountMap = shardCountMap;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    try {
                        RecordEntry record = consumer.read(30000);
                        if (record == null) {
                            // 在demo中,這里30秒讀取不到數(shù)據(jù)就退出測(cè)試
                            break;
                        }
                        String shardId = record.getShardId();
                        firstMap.putIfAbsent(shardId, record);
                        lastMap.put(shardId, record);
                        shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
                        AtomicInteger cnt = shardCountMap.get(shardId);
                        cnt.incrementAndGet();
                    } catch (DatahubClientException e) {
                        if (!ExceptionChecker.isRetryableException(e)) {
                            LOGGER.info("Read data fail", e);
                            break;
                        }
                        // sleep重試
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException ex) {
                            // ignore
                        }
                    } catch (Exception e) {
                        LOGGER.warn("Read fail.", e);
                        break;
                    }
                }
            } finally {
                consumer.close();
            }

        }
    }
}