Collaborative Consumption
Concepts
Offset Service
The offset service stores consumption offsets on the server side. An offset consists of a sequence and a timestamp: the sequence is an increasing number that identifies a unique record, and the timestamp is the time, in milliseconds, at which the record was written to DataHub.
Create a subscription for a Topic and, after consuming part of the data, commit the offset to the server. The next time the task starts, it can fetch the last committed offset from the server and resume from the record after it. Storing offsets on the server is what allows consumption to resume from the last committed offset after shards are reassigned, and is therefore a prerequisite for collaborative consumption.
The Consumer does not require manual offset handling. You set the offset commit interval in the config; when a record is read, all earlier records are considered processed, and a commit is attempted if more than the commit interval has passed since the last one. If a commit fails just as the task is force-stopped, the offset may not be committed in time and part of the data may be consumed again.
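Because a commit can fail right as a task is force-stopped, delivery is effectively at-least-once, so downstream processing should tolerate replays. A minimal sketch of deduplicating by the per-shard sequence described above; lastSeen and isNewRecord are illustrative names, not part of the SDK, and it assumes each shard is read by a single thread (which collaborative consumption guarantees within one Consumer Group):

// Track the highest per-shard sequence already processed; after a restart the
// same records may be delivered again, so skip anything at or below the mark.
private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

private boolean isNewRecord(RecordEntry record) {
    Long prev = lastSeen.get(record.getShardId());
    if (prev != null && record.getSequence() <= prev) {
        return false; // replayed record, already processed before the stop
    }
    lastSeen.put(record.getShardId(), record.getSequence());
    return true;
}

Note that an in-memory map only filters replays within one process lifetime; to cover restarts, persist the mark or make the downstream writes idempotent.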
Collaborative Consumption
Collaborative consumption solves the problem of automatically assigning shards when multiple consumers consume the same topic at the same time. It simplifies client-side handling: the consumers may run on different machines, and coordinating shard assignment among themselves would be difficult. Consumers that use the same Sub Id belong to the same Consumer Group, and within a Consumer Group each shard is assigned to only one Consumer.
Scenario
Suppose there are 3 consumer instances A, B, and C, and the Topic has 10 shards:
Instance A starts and is assigned all 10 shards.
Instances B and C start; the shard assignment becomes 4, 3, 3.
One shard is split. After the parent shard is fully consumed, the client releases it; once the 2 child shards join, the assignment becomes 4, 4, 3.
Instance C stops; the assignment becomes 6, 5.
Heartbeat
Collaborative consumption relies on a heartbeat mechanism to report each consumer instance's state to the server, including the shards it currently holds and the shards it needs to release. If no heartbeat is received within the timeout, the consumer instance is considered stopped. When the state of a consumer instance changes, the server reassigns shards; the new assignment plan is also returned via the heartbeat response, so there is a delay before a client notices shard changes.
Versions
Maven dependencies and JDK:
maven pom
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.25.1</version>
</dependency>
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>datahub-client-library</artifactId>
<version>1.4.1</version>
</dependency>
jdk
jdk: >= 1.8
In datahub-client-library 1.4 and later, Producer / Consumer are thread-safe, so the same Producer / Consumer instance can be used from multiple threads. In versions below 1.4 they are not thread-safe; for the multi-threaded usage pattern, see the section "Example with multiple Consumer/Producer threads (for versions below 1.4)" below.
Authentication
Background
An AccessKey (AK) is the access credential that Alibaba Cloud issues to its users for identity verification when calling Alibaba Cloud OpenAPI. An AccessKey consists of an AccessKey ID and an AccessKey Secret and must be kept secure. If an AK is leaked, the security of all resources under the account is at risk. When calling Alibaba Cloud OpenAPI, hard-coding a plaintext AK in your code can easily leak it through poorly managed code-repository permissions.
Alibaba Cloud Credentials is a credential management tool that Alibaba Cloud provides for developers. Once the Credentials default credential chain is configured, you no longer need to hard-code a plaintext AK when calling Alibaba Cloud OpenAPI, which helps keep the cloud resources under your account secure.
Prerequisites
You have obtained the AccessKey ID and AccessKey Secret of a RAM user. For details, see View the AccessKey information of a RAM user.
- Important
If the AccessKey of an Alibaba Cloud account (the root account) is leaked, all resources under that account are at risk. To keep your account secure, we strongly recommend creating AccessKeys for RAM users and not creating an AccessKey for the root account unless absolutely necessary.
A RAM user's AccessKey Secret is displayed only when the AccessKey is created and cannot be viewed afterwards. Save the AccessKey Secret promptly and securely after creating the AccessKey.
You have installed the Alibaba Cloud SDK Credentials tool.
Maven installation (the latest version of Credentials is recommended):
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>credentials-java</artifactId>
    <version>0.2.11</version>
</dependency>
JDK 1.7 or later.
Configuration scheme
This topic uses environment variables as an example; for other approaches, see Configure environment variables.
If you use the configuration-file approach instead, make sure the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables do not exist on your system; otherwise the configuration file does not take effect.
The Alibaba Cloud SDK can create default access credentials from the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. When you call an API, the program reads your access key (AccessKey) from these credentials and completes authentication automatically.
Configuration method
Configure the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.
Linux and macOS
Run the following commands:
export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
Replace <access_key_id> with your AccessKey ID and <access_key_secret> with your AccessKey Secret.
Windows
Create an environment variable file, add the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables, set them to your AccessKey ID and AccessKey Secret, then restart Windows.
Code example
import com.aliyun.credentials.Client;

// Read the AK from the default credential chain
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
Examples
Producer code examples
Synchronous write
import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducer.class);
public static void main(String[] args) throws Exception {
// Whether to enable in-memory metrics; when enabled (and log4j is configured),
// the metrics are printed to the log
// ClientMetrics.startMetrics();
// Using the Hangzhou region as an example
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
String projectName = "";
String topicName = "";
////////////////////////////// STEP1. Create the DatahubProducer //////////////////////////
ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
DatahubProducer datahubProducer = new DatahubProducer(projectName, topicName, config);
RecordSchema schema = datahubProducer.getTopicSchema();
///////////////////// STEP2. Build the records to write, depending on whether the Topic is BLOB or TUPLE ////////////
List<RecordEntry> recordList = new ArrayList<>();
// Write BLOB (unstructured) data
for (int i = 0; i < 10; ++i) {
RecordEntry record = new RecordEntry();
// Build BLOB data
BlobRecordData data = new BlobRecordData("HelloWorld".getBytes(StandardCharsets.UTF_8));
// Build TUPLE data
//TupleRecordData data = new TupleRecordData(schema);
//data.setField("f1", "f1_" + i);
record.setRecordData(data);
record.addAttribute("key1", "value1"); // record attribute, optional
recordList.add(record);
}
///////////////////////// STEP3: Write data in a loop /////////////////////////
try {
for (int i = 0; i < 10000; ++i) {
try {
String shardId = datahubProducer.send(recordList);
LOGGER.info("Write shard {} ok, record count:{}", shardId, recordList.size());
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Write data fail", e);
break;
}
// sleep and retry
Thread.sleep(1000);
}
}
} finally {
// release producer resources
datahubProducer.close();
}
// call the global cleanup function on process exit
HttpClient.close();
// ClientMetrics.stopMetrics();
}
}
Asynchronous write
// Imports omitted: same as SimpleProducer, plus the TUPLE and async write classes
// (TupleRecordData, WriteCallback, RecordPartitioner, DefaultRecordPartitioner)
public class SimpleProducerAsync {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducerAsync.class);
public static void main(String[] args) throws Exception {
// Whether to enable in-memory metrics; when enabled (and log4j is configured),
// the metrics are printed to the log
// ClientMetrics.startMetrics();
// Using the Hangzhou region as an example
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
String projectName = "";
String topicName = "";
////////////////////////////// STEP1. Create the DatahubProducer //////////////////////////
ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
config.setMaxAsyncBufferTimeMs(30000); // set the buffering time
DatahubProducer datahubProducer = new DatahubProducer(projectName, topicName, config);
RecordSchema schema = datahubProducer.getTopicSchema();
// for asynchronous writes you can register a callback
WriteCallback callback = new WriteCallback() {
@Override
public void onSuccess(String shardId, List<RecordEntry> records,
long elapsedTimeMs, long sendTimeMs) {
LOGGER.info("Message sent successfully");
}
@Override
public void onFailure(String shardId, List<RecordEntry> records,
long elapsedTimeMs, DatahubClientException e) {
LOGGER.error("Message sent fail", e);
}
};
// optional: configure the record hashing strategy
// partition priority: the final shardId is computed from the RecordEntry's shardId, hashKey, and partitionKey, in that order
RecordPartitioner partitioner = new DefaultRecordPartitioner();
///////////////////////// STEP2: Write data asynchronously in a loop /////////////////////////
try {
for (int i = 0; i < 1000; ++i) {
try {
//Write TUPLE (structured) data
RecordEntry record = new RecordEntry();
TupleRecordData data = new TupleRecordData(schema);
data.setField("f1", "f1_" + i);
//Write BLOB (unstructured) data
//BlobRecordData data = new BlobRecordData("HelloWorld".getBytes(StandardCharsets.UTF_8));
record.setRecordData(data);
record.addAttribute("key1", "value1"); // record attribute, optional
// send a single record; you can optionally specify a partitioner when sending
datahubProducer.sendAsync(record, callback, partitioner);
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Write data fail", e);
break;
}
// sleep and retry
Thread.sleep(1000);
}
}
// block until all buffered data has been sent
datahubProducer.flush(true);
} catch (Exception e) {
LOGGER.warn("Write fail", e);
} finally {
// release producer resources
datahubProducer.close();
}
// call the global cleanup function on process exit
HttpClient.close();
// ClientMetrics.stopMetrics();
}
}
Collaborative consumption code example
Configuration
| Name | Description |
| --- | --- |
| autoCommit | Whether to commit offsets automatically. Defaults to true. Commits run on a background thread at the configured interval; with auto commit, calling the read interface implies that the previously read data has been processed. If set to false, every record must be acked after it is processed, and the background commit only advances to an offset once all records before it have been acked. |
| offsetCommitTimeoutMs | Offset commit interval in milliseconds. Default 30000 ms, range [3000, 300000]. |
| sessionTimeoutMs | Session timeout in milliseconds. The heartbeat interval is set to 2/3 of this value; if no heartbeat arrives within the timeout, the client is considered stopped and the server reassigns the shards it held. Default 60000 ms, range [60000, 180000]. |
| fetchSize | Number of records fetched asynchronously per shard. Up to 2x this value is cached, and a drop below 2x triggers an async fetch task. Default 1000; must be greater than 0. |
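With the defaults above, offsets are committed automatically in the background. A minimal sketch of tuning these options before creating the consumer; the values shown are the defaults, and the setter names assume the client library exposes one setter per option in this table:

ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId, accessKeySecret);
config.setAutoCommit(true);             // commit offsets from a background thread
config.setOffsetCommitTimeoutMs(30000); // commit interval, range [3000, 300000] ms
config.setSessionTimeoutMs(60000);      // heartbeat interval becomes 2/3 of this value
config.setFetchSize(1000);              // per-shard async fetch size, must be > 0
DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);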
import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.*;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumer.class);
public static void main(String[] args) throws Exception {
// Whether to enable in-memory metrics; when enabled (and log4j is configured),
// the metrics are printed to the log
// ClientMetrics.startMetrics();
// Using the Hangzhou region as an example
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
String projectName = "";
String topicName = "";
String subId = "";
////////////////////////////// STEP1. Create the DatahubConsumer //////////////////////////
ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId, accessKeySecret);
DatahubConsumer datahubConsumer = new DatahubConsumer(projectName, topicName, subId, config);
///////////////////////// STEP2: Read data in a loop /////////////////////////
try {
while (true) {
try {
RecordEntry record = datahubConsumer.read(3000);
if (record == null) {
continue; // no data within 3s: (1) no data available, or (2) internal state not ready, e.g. collaborative consumption has not yet assigned a shard
}
RecordData recordData = record.getRecordData();
// handle the data according to whether the Topic is BLOB or TUPLE; a topic has only one type
if (recordData instanceof TupleRecordData) {
TupleRecordData data = (TupleRecordData) recordData;
RecordSchema schema = data.getRecordSchema();
// this example simply concatenates the fields into a string
StringBuilder sb = new StringBuilder();
for (int i = 0; i < schema.getFields().size(); ++i) {
sb.append(data.getField(i)).append(",");
}
LOGGER.debug("Read record. shardId:{}, seq:{}, ts:{}, batchIndex:{}, batchSize:{}, data:{}",
record.getShardId(), record.getSequence(), record.getSystemTime(), record.getSegmentIndexForBatch(),
record.getSegmentSizeForBatch(), sb);
} else {
BlobRecordData data = (BlobRecordData) recordData;
LOGGER.debug("Read record. shardId:{}, seq:{}, ts:{}, batchIndex:{}, batchSize:{}, data:{}",
record.getShardId(), record.getSequence(), record.getSystemTime(), record.getSegmentIndexForBatch(),
record.getSegmentSizeForBatch(), new String(data.getData()));
}
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Read data fail", e);
break;
}
// sleep and retry
Thread.sleep(1000);
}
}
} catch (Exception e) {
LOGGER.warn("Read data fail", e);
} finally {
// release consumer resources
datahubConsumer.close();
}
// call the global cleanup function on process exit
HttpClient.close();
// ClientMetrics.stopMetrics();
}
}
Multi-threaded read/write examples
Example of multiple threads sharing one Consumer/Producer (for versions 1.4 and later)
package com.aliyun.datahub.clientlibrary.example;
import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class MultiThreadReadWrite {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiThreadReadWrite.class);
public static void main(String[] args) throws Exception {
try {
// testProducer();
testConsumer();
} finally {
// call the global cleanup function on process exit
HttpClient.close();
}
}
private static void testProducer() throws Exception {
List<RecordEntry> records = new ArrayList<>();
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
String projectName = "";
String topicName = "";
for (int i = 0; i < 2; ++i) {
RecordEntry record = new RecordEntry();
BlobRecordData data = new BlobRecordData(("HelloWorld-" + i).getBytes());
record.setRecordData(data);
record.addAttribute("key1", "value1");
records.add(record);
}
ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // enable BATCH writes (recommended)
Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();
DatahubProducer producer = new DatahubProducer(projectName, topicName, config);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
ProducerThread thread = new ProducerThread(producer, records, shardCountMap);
thread.start();
threads.add(thread);
}
for (int i = 0; i < 10; ++i) {
threads.get(i).join();
}
producer.close();
// print write count
for (Map.Entry<String, AtomicInteger> entry : shardCountMap.entrySet()) {
LOGGER.info("ShardId:{}, count:{}", entry.getKey(), entry.getValue());
}
}
private static class ProducerThread extends Thread {
private final DatahubProducer producer;
private final List<RecordEntry> records;
private final Map<String, AtomicInteger> shardCountMap;
public ProducerThread(DatahubProducer producer,
List<RecordEntry> records,
Map<String, AtomicInteger> shardCountMap) {
this.producer = producer;
this.records = records;
this.shardCountMap = shardCountMap;
}
@Override
public void run() {
for (int i = 0; i < 100; ++i) {
try {
String shardId = producer.send(records);
shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
AtomicInteger cnt = shardCountMap.get(shardId);
cnt.incrementAndGet();
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Write data fail", e);
break;
}
// sleep and retry
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// ignore
}
} catch (Exception e) {
break;
}
}
}
}
private static void testConsumer() throws Exception {
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
String projectName = "";
String topicName = "";
String subId = "";
ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId, accessKeySecret);
config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // enable the BATCH protocol (recommended)
Map<String, RecordEntry> firstMap = new ConcurrentHashMap<>();
Map<String, RecordEntry> lastMap = new ConcurrentHashMap<>();
Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();
DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
ConsumerThread thread = new ConsumerThread(consumer, firstMap, lastMap, shardCountMap);
thread.start();
threads.add(thread);
}
for (int i = 0; i < 10; ++i) {
threads.get(i).join();
}
// print start and end sequence
for (RecordEntry first : firstMap.values()) {
RecordEntry last = lastMap.get(first.getShardId());
AtomicInteger cnt = shardCountMap.get(first.getShardId());
LOGGER.info("ShardId:{}, startSeq:{}, endSeq:{}, cnt:{}",
first.getShardId(), first.getSequence(), last.getSequence(), cnt);
}
// release consumer resources
consumer.close();
}
private static class ConsumerThread extends Thread {
private final DatahubConsumer consumer;
private final Map<String, RecordEntry> firstMap;
private final Map<String, RecordEntry> lastMap;
private final Map<String, AtomicInteger> shardCountMap;
public ConsumerThread(DatahubConsumer consumer,
Map<String, RecordEntry> firstMap,
Map<String, RecordEntry> lastMap,
Map<String, AtomicInteger> shardCountMap) {
this.consumer = consumer;
this.lastMap = lastMap;
this.firstMap = firstMap;
this.shardCountMap = shardCountMap;
}
@Override
public void run() {
while (true) {
try {
RecordEntry record = consumer.read(30000);
if (record == null) {
// in this demo, exit the test if no data is read for 30 seconds
break;
}
String shardId = record.getShardId();
firstMap.putIfAbsent(shardId, record);
lastMap.put(shardId, record);
shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
AtomicInteger cnt = shardCountMap.get(shardId);
cnt.incrementAndGet();
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Read data fail", e);
break;
}
// sleep and retry
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// ignore
}
} catch (Exception e) {
LOGGER.warn("Read fail.", e);
break;
}
}
}
}
}
Example with multiple Consumer/Producer threads (for versions below 1.4)
package com.aliyun.datahub.clientlibrary.example;
import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class MultiProducerAndConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiProducerAndConsumer.class);
public static void main(String[] args) throws Exception {
try {
// testProducer();
testConsumer();
} finally {
// call the global cleanup function on process exit
HttpClient.close();
}
}
private static void testProducer() throws Exception {
List<RecordEntry> records = new ArrayList<>();
for (int i = 0; i < 2; ++i) {
RecordEntry record = new RecordEntry();
BlobRecordData data = new BlobRecordData(("HelloWorld-" + i).getBytes());
record.setRecordData(data);
record.addAttribute("key1", "value1");
records.add(record);
}
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // enable BATCH writes (recommended)
Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 3; ++i) {
ProducerThread thread = new ProducerThread(config, records, shardCountMap);
thread.start();
threads.add(thread);
}
for (int i = 0; i < 3; ++i) {
threads.get(i).join();
}
// print write count
for (Map.Entry<String, AtomicInteger> entry : shardCountMap.entrySet()) {
LOGGER.info("ShardId:{}, count:{}", entry.getKey(), entry.getValue());
}
}
private static class ProducerThread extends Thread {
private final DatahubProducer producer;
private final List<RecordEntry> records;
private final Map<String, AtomicInteger> shardCountMap;
String projectName = "";
String topicName = "";
public ProducerThread(ProducerConfig config,
List<RecordEntry> records,
Map<String, AtomicInteger> shardCountMap) {
this.producer = new DatahubProducer(projectName, topicName, config);
this.records = records;
this.shardCountMap = shardCountMap;
}
@Override
public void run() {
try {
for (int i = 0; i < 100; ++i) {
try {
String shardId = producer.send(records);
shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
AtomicInteger cnt = shardCountMap.get(shardId);
cnt.incrementAndGet();
} catch (DatahubClientException e) {
LOGGER.info("Producer send fail", e);
if (!ExceptionChecker.isRetryableException(e)) {
break;
}
// sleep and retry
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// ignore
}
} catch (Exception e) {
LOGGER.info("Producer send fail", e);
break;
}
}
} finally {
producer.close();
}
}
}
private static void testConsumer() throws Exception {
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId, accessKeySecret);
config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // enable the BATCH protocol (recommended)
Map<String, RecordEntry> firstMap = new ConcurrentHashMap<>();
Map<String, RecordEntry> lastMap = new ConcurrentHashMap<>();
Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 3; ++i) {
ConsumerThread thread = new ConsumerThread(config, firstMap, lastMap, shardCountMap);
thread.start();
threads.add(thread);
}
for (int i = 0; i < 3; ++i) {
threads.get(i).join();
}
// print start and end sequence
for (RecordEntry first : firstMap.values()) {
RecordEntry last = lastMap.get(first.getShardId());
AtomicInteger cnt = shardCountMap.get(first.getShardId());
LOGGER.info("ShardId:{}, startSeq:{}, endSeq:{}, cnt:{}",
first.getShardId(), first.getSequence(), last.getSequence(), cnt);
}
}
private static class ConsumerThread extends Thread {
private final DatahubConsumer consumer;
private final Map<String, RecordEntry> firstMap;
private final Map<String, RecordEntry> lastMap;
private final Map<String, AtomicInteger> shardCountMap;
String projectName = "";
String topicName = "";
String subId = "";
public ConsumerThread(ConsumerConfig config,
Map<String, RecordEntry> firstMap,
Map<String, RecordEntry> lastMap,
Map<String, AtomicInteger> shardCountMap) {
this.consumer = new DatahubConsumer(projectName, topicName, subId, config);
this.lastMap = lastMap;
this.firstMap = firstMap;
this.shardCountMap = shardCountMap;
}
@Override
public void run() {
try {
while (true) {
try {
RecordEntry record = consumer.read(30000);
if (record == null) {
// in this demo, exit the test if no data is read for 30 seconds
break;
}
String shardId = record.getShardId();
firstMap.putIfAbsent(shardId, record);
lastMap.put(shardId, record);
shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
AtomicInteger cnt = shardCountMap.get(shardId);
cnt.incrementAndGet();
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Read data fail", e);
break;
}
// sleep and retry
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// ignore
}
} catch (Exception e) {
LOGGER.warn("Read fail.", e);
break;
}
}
} finally {
consumer.close();
}
}
}
}