日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

讀寫數據

讀寫數據

  • 用戶可以調用SDK方法進行讀寫數據狀態為CLOSED和ACTIVE的shard都可以讀取數據,不過只有狀態為ACTIVE的shard可以寫數據。

  • 同時用戶可以引入datahub-client-library依賴,datahub-client-library是在Java-SDK讀寫功能的封裝,用戶可以使用Producer實現均勻寫入shard,也可以使用Consumer實現協同消費,(建議使用)

讀數據

讀取數據有兩種方式,

  1. 使用SDK

  2. 使用協同消費

    使用SDK

    步驟一:獲取cursor

    讀取Topic下的數據,需要指定對應的shard,同時需要指定數據讀取的游標位置Cursor。Cursor的獲取方式有四種,分別是OLDEST, LATEST, SEQUENCE, SYSTEM_TIME。

  • OLDEST : 表示獲取的cursor指向當前有效數據中時間最久遠的record。

  • LATEST : 表示獲取的cursor指向當前最新的record。

  • SEQUENCE : 表示獲取的cursor指向該序列的record。

  • SYSTEM_TIME : 表示獲取的cursor指向該大于等于該時間戳的第一條record。

    說明

    GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type);GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param);

  • 參數

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The id of the shard.

    • CursorType Which type used to get cursor.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • SeekOutOfRangeException

  • 示例代碼

    public static void getcursor() {
      String shardId = "5";
      try {
          /* OLDEST用法示例 */
          String oldestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
          /* LATEST用法示例 */
          String latestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getCursor();
          /* SEQUENCE用法示例 */
          //獲取最新數據的sequence
          long seq = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getSequence();
          //獲取最新的十條數據的讀取位置
          String seqCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
          /* SYSTEM_TIME用法示例 */
          //將時間轉為時間戳形式
          String time = "2019-07-01 10:00:00";
          SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          long timestamp = 0L;
          try {
              Date date = simpleDateFormat.parse(time);
              timestamp = date.getTime();//獲取時間的時間戳
              //System.out.println(timestamp);
          } catch (ParseException e) {
              System.exit(-1);
          }
          //獲取時間time之后的數據讀取位置
          String timeCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
          System.out.println("get cursor successful");
      } catch (InvalidParameterException e) {
          System.out.println("invalid parameter, please check your parameter");
          System.exit(1);
      } catch (AuthorizationFailureException e) {
          System.out.println("AK error, please check your accessId and accessKey");
          System.exit(1);
      } catch (ResourceNotFoundException e) {
          System.out.println("project or topic or shard not found");
          System.exit(1);
      } catch (SeekOutOfRangeException e) {
          System.out.println("offset invalid or has expired");
          System.exit(1);
      } catch (DatahubClientException e) {
          System.out.println("other error");
          System.out.println(e);
          System.exit(1);
      }
    }

步驟二:讀取數據接口:

說明

GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit);GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit);

  • 參數

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The id of the shard.

    • schema If you read TUPLE records, you need this parameter.

    • cursor The start cursor used to read data.

    • limit Max record size to read.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • ShardSealedException

    • LimitExceededException

  • 示例

1). 讀取Tuple topic數據

public static void example() {
     //每次最多讀取數據量
     int recordLimit = 1000;
     String shardId = "7";
     // 獲取cursor, 這里獲取有效數據中時間最久遠的record游標
     // 注: 正常情況下,getCursor只需在初始化時獲取一次,然后使用getRecords的nextCursor進行下一次讀取
     String cursor = "";
     try {
         cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
     } catch (InvalidParameterException e) {
         System.out.println("invalid parameter, please check your parameter");
         System.exit(1);
     } catch (AuthorizationFailureException e) {
         System.out.println("AK error, please check your accessId and accessKey");
         System.exit(1);
     } catch (ResourceNotFoundException e) {
         System.out.println("project or topic or shard not found");
         System.exit(1);
     } catch (SeekOutOfRangeException e) {
         System.out.println("offset invalid or has expired");
         System.exit(1);
     } catch (DatahubClientException e) {
         System.out.println("other error");
         System.out.println(e);
         System.exit(1);
     }
     while (true) {
         try {
             GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, recordSchema, cursor, recordLimit);
             if (result.getRecordCount() <= 0) {
                 // 無數據,sleep后讀取
                 Thread.sleep(10000);
                 continue;
             }
             for (RecordEntry entry : result.getRecords()) {
                 TupleRecordData data = (TupleRecordData) entry.getRecordData();
                 System.out.println("field1:" + data.getField("field1") + "\t"
                         + "field2:" + data.getField("field2"));
             }
             // 拿到下一個游標
             cursor = result.getNextCursor();
         } catch (InvalidCursorException ex) {
             // 非法游標或游標已過期,建議重新定位后開始消費
             cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
         } catch (SeekOutOfRangeException e) {
             System.out.println("offset invalid");
             System.exit(1);
         } catch (ResourceNotFoundException e) {
             System.out.println("project or topic or shard not found");
             System.exit(1);
         } catch (ShardSealedException e) {
             System.out.println("shard is closed, all data has been read");
             System.exit(1);
         } catch (LimitExceededException e) {
             System.out.println("maybe exceed limit, retry");
         } catch (DatahubClientException e) {
             System.out.println("other error");
             System.out.println(e);
             System.exit(1);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }
 }

2). 讀取Blob topic數據

public static void example() {
    //每次最多讀取數據量
    int recordLimit = 1000;
    String shardId = "7";
    // 獲取cursor, 這里獲取有效數據中時間最久遠的record游標
    // 注: 正常情況下,getCursor只需在初始化時獲取一次,然后使用getRecords的nextCursor進行下一次讀取
    String cursor = "";
    try {
        cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
    } catch (InvalidParameterException e) {
        System.out.println("invalid parameter, please check your parameter");
        System.exit(1);
    } catch (AuthorizationFailureException e) {
        System.out.println("AK error, please check your accessId and accessKey");
        System.exit(1);
    } catch (ResourceNotFoundException e) {
        System.out.println("project or topic or shard not found");
        System.exit(1);
    } catch (SeekOutOfRangeException e) {
        System.out.println("offset invalid or has expired");
        System.exit(1);
    } catch (DatahubClientException e) {
        System.out.println("other error");
        System.out.println(e);
        System.exit(1);
    }
    while (true) {
        try {
            GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.blobTopicName, shardId, recordSchema, cursor, recordLimit);
            if (result.getRecordCount() <= 0) {
                // 無數據,sleep后讀取
                Thread.sleep(10000);
                continue;
            }
            /* 消費數據 */
            for (RecordEntry record: result.getRecords()){
                 BlobRecordData data = (BlobRecordData) record.getRecordData();
                 System.out.println(new String(data.getData()));
            }
            // 拿到下一個游標
            cursor = result.getNextCursor();
        } catch (InvalidCursorException ex) {
            // 非法游標或游標已過期,建議重新定位后開始消費
            cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
        } catch (SeekOutOfRangeException e) {
            System.out.println("offset invalid");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or shard not found");
            System.exit(1);
        } catch (ShardSealedException e) {
            System.out.println("shard is closed, all data has been read");
            System.exit(1);
        } catch (LimitExceededException e) {
            System.out.println("maybe exceed limit, retry");
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

使用協同消費

步驟一:初始化Consumer

配置

名稱

描述

autoCommit

是否自動提交點位,默認為true。點位的提交會在后臺線程按配置的時間間隔執行,自動提交的邏輯是當read接口被調用時,認為之前讀的數據已經處理完畢。如果設置為false,那么每條record處理完必須ack,后臺提交點位會保證該點位之前的record全部被ack。

offsetCommitTimeoutMs

點位的提交間隔,單位毫秒,默認30000ms,范圍[3000, 300000]

sessionTimeoutMs

會話超時時間,心跳間隔會設為改置的2/3,超過時間沒有心跳,認為客戶端已停止,服務端會重新分配被占有shard,單位毫秒,默認60000ms,范圍[60000, 180000]

fetchSize

單個shard異步讀取記錄的大小,會緩存2倍于該值的記錄,少于2倍會觸發異步任務去讀取,默認1000,必須大于0

您還需要在工程中配置相應的Access Key和Secret Key,推薦使用環境變量的形式在配置文件中配置。

datahub.endpoint=<yourEndpoint>
datahub.accessId=<yourAccessKeyId>
datahub.accessKey=<yourAccessKeySecret>
重要

阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。

強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。

{
				@Value("${datahub.endpoint}")
				String endpoint ;
				@Value("${datahub.accessId}")
				String accessId;
				@Value("${datahub.accessKey}")
				String accessKey;
        // Endpoint以Region: 華東1為例,其他Region請按實際情況填寫
        String projectName = "<YourProjectName>";
        String topicName = "<YourTopicName>";
        String subId = "<YourSubscriptionId>";
        /**
         * 1.使用協同消費,使用點位服務
         *使用協同消費:
             如果有兩臺機器使用同一個subid來消費topic(有5個shard)中的數據,不需要手動指定哪臺機器消費哪幾個shard, 
            服務器端會自動分配,當有第三臺機器加入了后也會做balance
         *
         * 使用點位服務:
             消費的時候,會根據服務端subid的點位來讀(如果是新建的訂閱,還沒有點位就從頭讀),如果要指定從哪個時間點開始讀,可以在頁面上重置subid的點位
         * 
         * */
        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = new Consumer(projectName, topicName, subId, config);



        /**
         * 2. 不使用協同消費,使用點位服務,提供subId和Consumer讀取的shard列表
         *不使用協同消費:
             如果有兩臺機器使用同一個subid來消費topic(有5個shard)中的數據,那需要手動指定哪臺機器消費哪幾個shard,
            (客戶端機器A,消費0,1,2號shard;客戶端機器B,消費3,4號shard)當有第三臺機器加入了后需要自己重新指定消費策略
         * 
         * */

        //客戶端A消費shardid為0,1,2的示例
        List<String> shardlists = Arrays.asList("0", "1", "2");
        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = new Consumer(projectName, topicName, subId, shardlists, config);


        //客戶端B消費shardid為3,4的示例
//        List<String> shardlists = Arrays.asList("3", "4");
//        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
//        Consumer consumer = new Consumer(projectName, topicName, subId, shardlists, config);



        /**
         *3.不使用協同消費,不使用點位服務
         *不使用點位服務:
             就是自己需要找個存儲(db/redis等)來記錄自己哪個shard消費到什么時間/Sequence,每次讀的時候都要根據自己記錄的點位來初始化
         * 
         * */
        Map<String, Offset> offsetMap = new HashMap<>();
// 提供sequence和timestamp,若sequence超出范圍則使用timestamp獲取Cursor
        offsetMap.put("0", new Offset(100, 1548573440756L));
// 只提供sequence,按照sequence獲取Cursor
        offsetMap.put("1", new Offset().setSequence(1));
// 只提供timestamp,按照timestamp獲取Cursor
        offsetMap.put("2", new Offset().setTimestamp(1548573440756L));
        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = new Consumer(projectName, topicName, subId, offsetMap, config);
    }

步驟二:協同代碼示例

import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.SubscriptionOfflineException;
import com.aliyun.datahub.client.exception.SubscriptionOffsetResetException;
import com.aliyun.datahub.client.exception.SubscriptionSessionInvalidException;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class DatahubReader {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubReader.class);

    private static void sleep(long milliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(milliSeconds);
        } catch (InterruptedException e) {
            // TODO:自行處理異常
        }
    }

    public static Consumer createConsumer(ConsumerConfig config, String project, String topic, String subId)
    {
        return new Consumer(project, topic, subId, config);
    }

    public static void main(String[] args) {
        String projectName = "<YourProjectName>";
        String topicName = "<YourTopicName>";
        String subId = "<YourSubscriptionId>";

        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = createConsumer(config, projectName, topicName, subId);

        int maxRetry = 3;
        boolean stop = false;
        try {
            while (!stop) {
                try {
                    while (true) {
                        // 協同消費剛初始化,需要等待服務端分配shard,約40秒,期間只能返回null
                        // 自動提交模式,每次調用read,認為之前讀的數據都已處理完成,自動ack
                        RecordEntry record = consumer.read(maxRetry);

                        // 處理數據
                        if (record != null) {
                            TupleRecordData data = (TupleRecordData) record.getRecordData();
                            // 根據自己的schema來處理數據,此處打印第一列的內容
                            LOG.info("field1: {}", data.getField(0));

                            // 根據列名取數據
                            // LOG.info("field2: {}", data.getField("field2"));

                            // 非自動提交模式,每條record處理完后都需要ack
                            // 自動提交模式,ack不會做任何操作
                            // 1.1.7版本及以上
                            record.getKey().ack();
                        } else {
                            LOG.info("read null");
                        }
                    }
                } catch (SubscriptionOffsetResetException e) {
                    // 點位被重置,重新初始化consumer
                    try {
                        consumer.close();
                        consumer = createConsumer(config, projectName, topicName, subId);
                    } catch (DatahubClientException e1) {
                        // 初始化失敗,重試或直接拋異常
                        LOG.error("create consumer failed", e);
                        throw e;
                    }
                } catch (InvalidParameterException |
                        SubscriptionOfflineException |
                        SubscriptionSessionInvalidException |
                        AuthorizationFailureException |
                        NoPermissionException e) {
                    // 請求參數非法
                    // 訂閱被下線
                    // 訂閱下相同shard被其他客戶端占用
                    // 簽名不正確
                    // 沒有權限
                    LOG.error("read failed", e);
                    throw e;
                } catch (DatahubClientException e) {
                    // 基類異常,包含網絡問題等,可以選擇重試
                    LOG.error("read failed, retry", e);
                    sleep(1000);
                }
            }
        } catch (Throwable e) {
            LOG.error("read failed", e);
        } finally {
            // 確保資源正確釋放
            // 會提交已ack的點位
            consumer.close();
        }
    }
}

寫數據

使用SDK

服務器2.12之后版本開始支持PutRecordsByShardResult接口,之前版本putRecords接口,使用putRecordsByShard接口時需指定寫入的shard,否則會默認寫入第一個處于ACTIVE狀態的shard。兩個方法中傳入參數records是一個List對象,每個元素為一個record,但是必須為相同類型的record,即Tuple類型或者Blob類型。DataHub目前支持按照Shard寫入 (服務端 >= 2.12版本) 以及混合寫入,分別對應putRecordsByShardputRecords兩個接口。針對第二個接口,用戶需要判斷PutRecordsResult結果以確認數據是否寫入成功;而putRecordsByShard接口則直接通過異常告知用戶是否成功。如果服務端支持,建議用戶使用putRecordsByShard接口。

說明

PutRecordsResult putRecords(String projectName, String topicName, List records);PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List records);

  • 參數

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The id of the shard.

    • records Records list to written.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • ShardSealedException

    • LimitExceededException

1). 寫入Tuple topic

// 寫入Tuple型數據
public static void tupleExample() {
    String shardId = "9";
    // 獲取schema
    recordSchema = datahubClient.getTopic(Constant.projectName, Constant.topicName).getRecordSchema();
    // 生成十條數據
    List<RecordEntry> recordEntries = new ArrayList<>();
    for (int i = 0; i < 10; ++i) {
        RecordEntry recordEntry = new RecordEntry();
        // 對每條數據設置額外屬性,例如ip 機器名等。可以不設置額外屬性,不影響數據寫入
        recordEntry.addAttribute("key1", "value1");
        TupleRecordData data = new TupleRecordData(recordSchema);
        data.setField("field1", "HelloWorld");
        data.setField("field2", 1234567);
        recordEntry.setRecordData(data);
        recordEntry.setShardId(shardId);
        recordEntries.add(recordEntry);
    }
    try {
        // 服務端從2.12版本開始支持,之前版本請使用putRecords接口
        //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
        datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
        System.out.println("write data successful");
    } catch (InvalidParameterException e) {
        System.out.println("invalid parameter, please check your parameter");
        System.exit(1);
    } catch (AuthorizationFailureException e) {
        System.out.println("AK error, please check your accessId and accessKey");
        System.exit(1);
    } catch (ResourceNotFoundException e) {
        System.out.println("project or topic or shard not found");
        System.exit(1);
    } catch (ShardSealedException e) {
        System.out.println("shard status is CLOSED, can not write");
        System.exit(1);
    } catch (DatahubClientException e) {
        System.out.println("other error");
        System.out.println(e);
        System.exit(1);
    }
}

2). 寫入Blob topic

// 寫入blob型數據
public static void blobExample() {
    // 生成十條數據
    List<RecordEntry> recordEntries = new ArrayList<>();
    String shardId = "4";
    for (int i = 0; i < 10; ++i) {
        RecordEntry recordEntry = new RecordEntry();
        // 對每條數據設置額外屬性
        recordEntry.addAttribute("key1", "value1");
        BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
        recordEntry.setRecordData(data);
        recordEntry.setShardId(shardId);
        recordEntries.add(recordEntry);
        recordEntry.setShardId("0");
    }
    while (true) {
        try {
            // 服務端從2.12版本開始支持,之前版本請使用putRecords接口
            //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
            datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
            System.out.println("write data  successful");
            break;
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or shard not found");
            System.exit(1);
        } catch (ShardSealedException e) {
            System.out.println("shard status is CLOSED, can not write");
            System.exit(1);
        } catch (LimitExceededException e) {
            System.out.println("maybe qps exceed limit, retry");
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }
}

使用Producer

步驟一:引入依賴

<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.19.0-public</version>
</dependency>
<dependency>
      <groupId>com.aliyun.datahub</groupId>
      <artifactId>datahub-client-library</artifactId>
      <version>1.1.12-public</version>
</dependency>

步驟二:代碼示例

import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.MalformedRecordException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.ShardNotFoundException;
import com.aliyun.datahub.client.model.Field;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.Producer;
import com.aliyun.datahub.exception.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class DatahubWriter {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubWriter.class);

    private static void sleep(long milliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(milliSeconds);
        } catch (InterruptedException e) {
            // TODO:自行處理異常
        }
    }

    private static List<RecordEntry> genRecords(RecordSchema schema) {
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (int cnt = 0; cnt < 10; ++cnt) {
            RecordEntry entry = new RecordEntry();

            entry.addAttribute("key1", "value1");
            entry.addAttribute("key2", "value2");

            TupleRecordData data = new TupleRecordData(schema);
            data.setField("field1", "testValue");
            data.setField("field2", 1);

            entry.setRecordData(data);
            recordEntries.add(entry);
        }
        return recordEntries;
    }

    private static void sendRecords(Producer producer, List<RecordEntry> recordEntries) {
        int maxRetry = 3;
        while (true) {
            try {
                // 自動選擇shard寫入
                producer.send(recordEntries, maxRetry);

                // 指定寫入shard "0"
                // producer.send(recordEntries, "0", maxRetry);
                LOG.error("send records: {}", recordEntries.size());
                break;
            } catch (MalformedRecordException e) {
                // record 格式非法,根據業務場景選擇忽略或直接拋異常
                LOG.error("write fail", e);
                throw e;
            } catch (InvalidParameterException |
                    AuthorizationFailureException |
                    NoPermissionException e) {
                // 請求參數非法
                // 簽名不正確
                // 沒有權限
                LOG.error("write fail", e);
                throw e;
            } catch (ShardNotFoundException e) {
                // shard 不存在, 如果不是寫入自己指定的shard,可以不用處理
                LOG.error("write fail", e);
                sleep(1000);
            } catch (ResourceNotFoundException e) {
                // project, topic 或 shard 不存在
                LOG.error("write fail", e);
                throw e;
            } catch (DatahubClientException e) {
                // 基類異常,包含網絡問題等,可以選擇重試
                LOG.error("write fail", e);
                sleep(1000);
            }
        }
    }

    public static void main(String[] args) {
        String projectName = "<YourProjectName>";
        String topicName = "<YourTopicName>";

        RecordSchema schema =  datahubClient.getTopic(projectName, topicName).getRecordSchema();



        ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
        Producer producer = new Producer(projectName, topicName, config);

        // 根據場景控制循環
        boolean stop = false;
        try {
            while (!stop) {
                List<RecordEntry> recordEntries = genRecords(schema);
                sendRecords(producer, recordEntries);
            }
        } finally {
            // 確保資源正確釋放
            producer.close();
        }
    }
}

多方式寫入

在DataHub 2.12之前版本,DataHub僅支持putRecords接口,在RecordEntry類中包含shardIdpartitionKeyhashKey三個屬性,用戶通過指定對應屬性的值決定數據寫入到哪個Shard中注意:開啟Shard extend模式無法按Hashkey和PartitionKey方式寫入

說明

2.12及之后版本,建議用戶使用putRecordsByShard接口,避免服務端partition造成的性能損耗

1). 按照ShardID寫入推薦方式,使用示例如下:

RecordEntry entry = new RecordEntry();
entry.setShardId("0");

2). 按HashKey寫入指定一個128 bit的MD5值。 按照HashKey寫入,根據Shard的Shard操作決定數據寫入的Shard使用示例:

RecordEntry entry = new RecordEntry();
entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");

2). 按PartitionKey寫入指定一個String類型參數作為PartitionKey,系統根據該String的MD5值以及Shard的Shard操作決定寫入的Shard使用示例:

RecordEntry entry = new RecordEntry();
entry.setPartitionKey("TestPartitionKey");

注意事項

Consumer和Producer都不支持多線程訪問,如果需要使用多線程,則在每個線程都使用不同的Consumer或Producer對象。