讀寫數據
用戶可以調用SDK方法進行讀寫數據狀態為CLOSED和ACTIVE的shard都可以讀取數據,不過只有狀態為ACTIVE的shard可以寫數據。
同時用戶可以引入datahub-client-library依賴,datahub-client-library是在Java-SDK讀寫功能的封裝,用戶可以使用Producer實現均勻寫入shard,也可以使用Consumer實現協同消費,(建議使用)
讀數據
讀取數據有兩種方式,
使用SDK
使用協同消費
使用SDK
步驟一:獲取cursor
讀取Topic下的數據,需要指定對應的shard,同時需要指定數據讀取的游標位置Cursor。Cursor的獲取方式有四種,分別是OLDEST, LATEST, SEQUENCE, SYSTEM_TIME。
OLDEST : 表示獲取的cursor指向當前有效數據中時間最久遠的record。
LATEST : 表示獲取的cursor指向當前最新的record。
SEQUENCE : 表示獲取的cursor指向該序列的record。
SYSTEM_TIME : 表示獲取的cursor指向該大于等于該時間戳的第一條record。
說明GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type);GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param);
參數
projectName The name of the project.
topicName The name of the topic.
shardId The id of the shard.
CursorType Which type used to get cursor.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
SeekOutOfRangeException
示例代碼
public static void getcursor() { String shardId = "5"; try { /* OLDEST用法示例 */ String oldestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor(); /* LATEST用法示例 */ String latestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getCursor(); /* SEQUENCE用法示例 */ //獲取最新數據的sequence long seq = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getSequence(); //獲取最新的十條數據的讀取位置 String seqCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor(); /* SYSTEM_TIME用法示例 */ //將時間轉為時間戳形式 String time = "2019-07-01 10:00:00"; SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long timestamp = 0L; try { Date date = simpleDateFormat.parse(time); timestamp = date.getTime();//獲取時間的時間戳 //System.out.println(timestamp); } catch (ParseException e) { System.exit(-1); } //獲取時間time之后的數據讀取位置 String timeCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor(); System.out.println("get cursor successful"); } catch (InvalidParameterException e) { System.out.println("invalid parameter, please check your parameter"); System.exit(1); } catch (AuthorizationFailureException e) { System.out.println("AK error, please check your accessId and accessKey"); System.exit(1); } catch (ResourceNotFoundException e) { System.out.println("project or topic or shard not found"); System.exit(1); } catch (SeekOutOfRangeException e) { System.out.println("offset invalid or has expired"); System.exit(1); } catch (DatahubClientException e) { System.out.println("other error"); System.out.println(e); System.exit(1); } }
步驟二:讀取數據接口:
GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit);GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit);
參數
projectName The name of the project.
topicName The name of the topic.
shardId The id of the shard.
schema If you read TUPLE records, you need this parameter.
cursor The start cursor used to read data.
limit Max record size to read.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
ShardSealedException
LimitExceededException
示例
1). 讀取Tuple topic數據
public static void example() {
//每次最多讀取數據量
int recordLimit = 1000;
String shardId = "7";
// 獲取cursor, 這里獲取有效數據中時間最久遠的record游標
// 注: 正常情況下,getCursor只需在初始化時獲取一次,然后使用getRecords的nextCursor進行下一次讀取
String cursor = "";
try {
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
} catch (InvalidParameterException e) {
System.out.println("invalid parameter, please check your parameter");
System.exit(1);
} catch (AuthorizationFailureException e) {
System.out.println("AK error, please check your accessId and accessKey");
System.exit(1);
} catch (ResourceNotFoundException e) {
System.out.println("project or topic or shard not found");
System.exit(1);
} catch (SeekOutOfRangeException e) {
System.out.println("offset invalid or has expired");
System.exit(1);
} catch (DatahubClientException e) {
System.out.println("other error");
System.out.println(e);
System.exit(1);
}
while (true) {
try {
GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, recordSchema, cursor, recordLimit);
if (result.getRecordCount() <= 0) {
// 無數據,sleep后讀取
Thread.sleep(10000);
continue;
}
for (RecordEntry entry : result.getRecords()) {
TupleRecordData data = (TupleRecordData) entry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
}
// 拿到下一個游標
cursor = result.getNextCursor();
} catch (InvalidCursorException ex) {
// 非法游標或游標已過期,建議重新定位后開始消費
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
} catch (SeekOutOfRangeException e) {
System.out.println("offset invalid");
System.exit(1);
} catch (ResourceNotFoundException e) {
System.out.println("project or topic or shard not found");
System.exit(1);
} catch (ShardSealedException e) {
System.out.println("shard is closed, all data has been read");
System.exit(1);
} catch (LimitExceededException e) {
System.out.println("maybe exceed limit, retry");
} catch (DatahubClientException e) {
System.out.println("other error");
System.out.println(e);
System.exit(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2). 讀取Blob topic數據
public static void example() {
//每次最多讀取數據量
int recordLimit = 1000;
String shardId = "7";
// 獲取cursor, 這里獲取有效數據中時間最久遠的record游標
// 注: 正常情況下,getCursor只需在初始化時獲取一次,然后使用getRecords的nextCursor進行下一次讀取
String cursor = "";
try {
cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
} catch (InvalidParameterException e) {
System.out.println("invalid parameter, please check your parameter");
System.exit(1);
} catch (AuthorizationFailureException e) {
System.out.println("AK error, please check your accessId and accessKey");
System.exit(1);
} catch (ResourceNotFoundException e) {
System.out.println("project or topic or shard not found");
System.exit(1);
} catch (SeekOutOfRangeException e) {
System.out.println("offset invalid or has expired");
System.exit(1);
} catch (DatahubClientException e) {
System.out.println("other error");
System.out.println(e);
System.exit(1);
}
while (true) {
try {
GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.blobTopicName, shardId, recordSchema, cursor, recordLimit);
if (result.getRecordCount() <= 0) {
// 無數據,sleep后讀取
Thread.sleep(10000);
continue;
}
/* 消費數據 */
for (RecordEntry record: result.getRecords()){
BlobRecordData data = (BlobRecordData) record.getRecordData();
System.out.println(new String(data.getData()));
}
// 拿到下一個游標
cursor = result.getNextCursor();
} catch (InvalidCursorException ex) {
// 非法游標或游標已過期,建議重新定位后開始消費
cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
} catch (SeekOutOfRangeException e) {
System.out.println("offset invalid");
System.exit(1);
} catch (ResourceNotFoundException e) {
System.out.println("project or topic or shard not found");
System.exit(1);
} catch (ShardSealedException e) {
System.out.println("shard is closed, all data has been read");
System.exit(1);
} catch (LimitExceededException e) {
System.out.println("maybe exceed limit, retry");
} catch (DatahubClientException e) {
System.out.println("other error");
System.out.println(e);
System.exit(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
使用協同消費
步驟一:初始化Consumer
配置
名稱 | 描述 |
autoCommit | 是否自動提交點位,默認為true。點位的提交會在后臺線程按配置的時間間隔執行,自動提交的邏輯是當read接口被調用時,認為之前讀的數據已經處理完畢。如果設置為false,那么每條record處理完必須ack,后臺提交點位會保證該點位之前的record全部被ack。 |
offsetCommitTimeoutMs | 點位的提交間隔,單位毫秒,默認30000ms,范圍[3000, 300000] |
sessionTimeoutMs | 會話超時時間,心跳間隔會設為改置的2/3,超過時間沒有心跳,認為客戶端已停止,服務端會重新分配被占有shard,單位毫秒,默認60000ms,范圍[60000, 180000] |
fetchSize | 單個shard異步讀取記錄的大小,會緩存2倍于該值的記錄,少于2倍會觸發異步任務去讀取,默認1000,必須大于0 |
您還需要在工程中配置相應的Access Key和Secret Key,推薦使用環境變量的形式在配置文件中配置。
datahub.endpoint=<yourEndpoint>
datahub.accessId=<yourAccessKeyId>
datahub.accessKey=<yourAccessKeySecret>
阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。
強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。
{
@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
// Endpoint以Region: 華東1為例,其他Region請按實際情況填寫
String projectName = "<YourProjectName>";
String topicName = "<YourTopicName>";
String subId = "<YourSubscriptionId>";
/**
* 1.使用協同消費,使用點位服務
*使用協同消費:
如果有兩臺機器使用同一個subid來消費topic(有5個shard)中的數據,不需要手動指定哪臺機器消費哪幾個shard,
服務器端會自動分配,當有第三臺機器加入了后也會做balance
*
* 使用點位服務:
消費的時候,會根據服務端subid的點位來讀(如果是新建的訂閱,還沒有點位就從頭讀),如果要指定從哪個時間點開始讀,可以在頁面上重置subid的點位
*
* */
ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
Consumer consumer = new Consumer(projectName, topicName, subId, config);
/**
* 2. 不使用協同消費,使用點位服務,提供subId和Consumer讀取的shard列表
*不使用協同消費:
如果有兩臺機器使用同一個subid來消費topic(有5個shard)中的數據,那需要手動指定哪臺機器消費哪幾個shard,
(客戶端機器A,消費0,1,2號shard;客戶端機器B,消費3,4號shard)當有第三臺機器加入了后需要自己重新指定消費策略
*
* */
//客戶端A消費shardid為0,1,2的示例
List<String> shardlists = Arrays.asList("0", "1", "2");
ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
Consumer consumer = new Consumer(projectName, topicName, subId, shardlists, config);
//客戶端B消費shardid為3,4的示例
// List<String> shardlists = Arrays.asList("3", "4");
// ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
// Consumer consumer = new Consumer(projectName, topicName, subId, shardlists, config);
/**
*3.不使用協同消費,不使用點位服務
*不使用點位服務:
就是自己需要找個存儲(db/redis等)來記錄自己哪個shard消費到什么時間/Sequence,每次讀的時候都要根據自己記錄的點位來初始化
*
* */
Map<String, Offset> offsetMap = new HashMap<>();
// 提供sequence和timestamp,若sequence超出范圍則使用timestamp獲取Cursor
offsetMap.put("0", new Offset(100, 1548573440756L));
// 只提供sequence,按照sequence獲取Cursor
offsetMap.put("1", new Offset().setSequence(1));
// 只提供timestamp,按照timestamp獲取Cursor
offsetMap.put("2", new Offset().setTimestamp(1548573440756L));
ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
Consumer consumer = new Consumer(projectName, topicName, subId, offsetMap, config);
}
步驟二:協同代碼示例
import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.SubscriptionOfflineException;
import com.aliyun.datahub.client.exception.SubscriptionOffsetResetException;
import com.aliyun.datahub.client.exception.SubscriptionSessionInvalidException;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class DatahubReader {
private static final Logger LOG = LoggerFactory.getLogger(DatahubReader.class);
private static void sleep(long milliSeconds) {
try {
TimeUnit.MILLISECONDS.sleep(milliSeconds);
} catch (InterruptedException e) {
// TODO:自行處理異常
}
}
public static Consumer createConsumer(ConsumerConfig config, String project, String topic, String subId)
{
return new Consumer(project, topic, subId, config);
}
public static void main(String[] args) {
String projectName = "<YourProjectName>";
String topicName = "<YourTopicName>";
String subId = "<YourSubscriptionId>";
ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
Consumer consumer = createConsumer(config, projectName, topicName, subId);
int maxRetry = 3;
boolean stop = false;
try {
while (!stop) {
try {
while (true) {
// 協同消費剛初始化,需要等待服務端分配shard,約40秒,期間只能返回null
// 自動提交模式,每次調用read,認為之前讀的數據都已處理完成,自動ack
RecordEntry record = consumer.read(maxRetry);
// 處理數據
if (record != null) {
TupleRecordData data = (TupleRecordData) record.getRecordData();
// 根據自己的schema來處理數據,此處打印第一列的內容
LOG.info("field1: {}", data.getField(0));
// 根據列名取數據
// LOG.info("field2: {}", data.getField("field2"));
// 非自動提交模式,每條record處理完后都需要ack
// 自動提交模式,ack不會做任何操作
// 1.1.7版本及以上
record.getKey().ack();
} else {
LOG.info("read null");
}
}
} catch (SubscriptionOffsetResetException e) {
// 點位被重置,重新初始化consumer
try {
consumer.close();
consumer = createConsumer(config, projectName, topicName, subId);
} catch (DatahubClientException e1) {
// 初始化失敗,重試或直接拋異常
LOG.error("create consumer failed", e);
throw e;
}
} catch (InvalidParameterException |
SubscriptionOfflineException |
SubscriptionSessionInvalidException |
AuthorizationFailureException |
NoPermissionException e) {
// 請求參數非法
// 訂閱被下線
// 訂閱下相同shard被其他客戶端占用
// 簽名不正確
// 沒有權限
LOG.error("read failed", e);
throw e;
} catch (DatahubClientException e) {
// 基類異常,包含網絡問題等,可以選擇重試
LOG.error("read failed, retry", e);
sleep(1000);
}
}
} catch (Throwable e) {
LOG.error("read failed", e);
} finally {
// 確保資源正確釋放
// 會提交已ack的點位
consumer.close();
}
}
}
寫數據
使用SDK
服務器2.12之后版本開始支持PutRecordsByShardResult接口,之前版本putRecords接口,使用putRecordsByShard接口時需指定寫入的shard,否則會默認寫入第一個處于ACTIVE狀態的shard。兩個方法中傳入參數records是一個List對象,每個元素為一個record,但是必須為相同類型的record,即Tuple類型或者Blob類型。DataHub目前支持按照Shard寫入 (服務端 >= 2.12版本) 以及混合寫入,分別對應putRecordsByShard
和putRecords
兩個接口。針對第二個接口,用戶需要判斷PutRecordsResult
結果以確認數據是否寫入成功;而putRecordsByShard
接口則直接通過異常告知用戶是否成功。如果服務端支持,建議用戶使用putRecordsByShard
接口。
PutRecordsResult putRecords(String projectName, String topicName, List records);PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List records);
參數
projectName The name of the project.
topicName The name of the topic.
shardId The id of the shard.
records Records list to written.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
ShardSealedException
LimitExceededException
1). 寫入Tuple topic
// 寫入Tuple型數據
public static void tupleExample() {
String shardId = "9";
// 獲取schema
recordSchema = datahubClient.getTopic(Constant.projectName, Constant.topicName).getRecordSchema();
// 生成十條數據
List<RecordEntry> recordEntries = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// 對每條數據設置額外屬性,例如ip 機器名等。可以不設置額外屬性,不影響數據寫入
recordEntry.addAttribute("key1", "value1");
TupleRecordData data = new TupleRecordData(recordSchema);
data.setField("field1", "HelloWorld");
data.setField("field2", 1234567);
recordEntry.setRecordData(data);
recordEntry.setShardId(shardId);
recordEntries.add(recordEntry);
}
try {
// 服務端從2.12版本開始支持,之前版本請使用putRecords接口
//datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
System.out.println("write data successful");
} catch (InvalidParameterException e) {
System.out.println("invalid parameter, please check your parameter");
System.exit(1);
} catch (AuthorizationFailureException e) {
System.out.println("AK error, please check your accessId and accessKey");
System.exit(1);
} catch (ResourceNotFoundException e) {
System.out.println("project or topic or shard not found");
System.exit(1);
} catch (ShardSealedException e) {
System.out.println("shard status is CLOSED, can not write");
System.exit(1);
} catch (DatahubClientException e) {
System.out.println("other error");
System.out.println(e);
System.exit(1);
}
}
2). 寫入Blob topic
// 寫入blob型數據
public static void blobExample() {
// 生成十條數據
List<RecordEntry> recordEntries = new ArrayList<>();
String shardId = "4";
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// 對每條數據設置額外屬性
recordEntry.addAttribute("key1", "value1");
BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
recordEntry.setRecordData(data);
recordEntry.setShardId(shardId);
recordEntries.add(recordEntry);
recordEntry.setShardId("0");
}
while (true) {
try {
// 服務端從2.12版本開始支持,之前版本請使用putRecords接口
//datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
System.out.println("write data successful");
break;
} catch (InvalidParameterException e) {
System.out.println("invalid parameter, please check your parameter");
System.exit(1);
} catch (AuthorizationFailureException e) {
System.out.println("AK error, please check your accessId and accessKey");
System.exit(1);
} catch (ResourceNotFoundException e) {
System.out.println("project or topic or shard not found");
System.exit(1);
} catch (ShardSealedException e) {
System.out.println("shard status is CLOSED, can not write");
System.exit(1);
} catch (LimitExceededException e) {
System.out.println("maybe qps exceed limit, retry");
} catch (DatahubClientException e) {
System.out.println("other error");
System.out.println(e);
System.exit(1);
}
}
}
使用Producer
步驟一:引入依賴
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.19.0-public</version>
</dependency>
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>datahub-client-library</artifactId>
<version>1.1.12-public</version>
</dependency>
步驟二:代碼示例
import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.MalformedRecordException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.ShardNotFoundException;
import com.aliyun.datahub.client.model.Field;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.Producer;
import com.aliyun.datahub.exception.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class DatahubWriter {
private static final Logger LOG = LoggerFactory.getLogger(DatahubWriter.class);
private static void sleep(long milliSeconds) {
try {
TimeUnit.MILLISECONDS.sleep(milliSeconds);
} catch (InterruptedException e) {
// TODO:自行處理異常
}
}
private static List<RecordEntry> genRecords(RecordSchema schema) {
List<RecordEntry> recordEntries = new ArrayList<>();
for (int cnt = 0; cnt < 10; ++cnt) {
RecordEntry entry = new RecordEntry();
entry.addAttribute("key1", "value1");
entry.addAttribute("key2", "value2");
TupleRecordData data = new TupleRecordData(schema);
data.setField("field1", "testValue");
data.setField("field2", 1);
entry.setRecordData(data);
recordEntries.add(entry);
}
return recordEntries;
}
private static void sendRecords(Producer producer, List<RecordEntry> recordEntries) {
int maxRetry = 3;
while (true) {
try {
// 自動選擇shard寫入
producer.send(recordEntries, maxRetry);
// 指定寫入shard "0"
// producer.send(recordEntries, "0", maxRetry);
LOG.error("send records: {}", recordEntries.size());
break;
} catch (MalformedRecordException e) {
// record 格式非法,根據業務場景選擇忽略或直接拋異常
LOG.error("write fail", e);
throw e;
} catch (InvalidParameterException |
AuthorizationFailureException |
NoPermissionException e) {
// 請求參數非法
// 簽名不正確
// 沒有權限
LOG.error("write fail", e);
throw e;
} catch (ShardNotFoundException e) {
// shard 不存在, 如果不是寫入自己指定的shard,可以不用處理
LOG.error("write fail", e);
sleep(1000);
} catch (ResourceNotFoundException e) {
// project, topic 或 shard 不存在
LOG.error("write fail", e);
throw e;
} catch (DatahubClientException e) {
// 基類異常,包含網絡問題等,可以選擇重試
LOG.error("write fail", e);
sleep(1000);
}
}
}
public static void main(String[] args) {
String projectName = "<YourProjectName>";
String topicName = "<YourTopicName>";
RecordSchema schema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
Producer producer = new Producer(projectName, topicName, config);
// 根據場景控制循環
boolean stop = false;
try {
while (!stop) {
List<RecordEntry> recordEntries = genRecords(schema);
sendRecords(producer, recordEntries);
}
} finally {
// 確保資源正確釋放
producer.close();
}
}
}
多方式寫入
在DataHub 2.12之前版本,DataHub僅支持putRecords
接口,在RecordEntry
類中包含shardId
、partitionKey
和hashKey
三個屬性,用戶通過指定對應屬性的值決定數據寫入到哪個Shard中注意:開啟Shard extend模式無法按Hashkey和PartitionKey方式寫入
2.12及之后版本,建議用戶使用putRecordsByShard接口,避免服務端partition造成的性能損耗
1). 按照ShardID寫入推薦方式,使用示例如下:
RecordEntry entry = new RecordEntry();
entry.setShardId("0");
2). 按HashKey寫入指定一個128 bit的MD5值。 按照HashKey寫入,根據Shard的Shard操作決定數據寫入的Shard使用示例:
RecordEntry entry = new RecordEntry();
entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");
2). 按PartitionKey寫入指定一個String類型參數作為PartitionKey,系統根據該String的MD5值以及Shard的Shard操作決定寫入的Shard使用示例:
RecordEntry entry = new RecordEntry();
entry.setPartitionKey("TestPartitionKey");
注意事項
Consumer和Producer都不支持多線程訪問,如果需要使用多線程,則在每個線程都使用不同的Consumer或Producer對象。