日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過Pull模式創(chuàng)建數(shù)據(jù)訂閱通道

本文介紹通過Pull模式創(chuàng)建數(shù)據(jù)訂閱功能,創(chuàng)建后訂閱通道會實時拉取數(shù)據(jù)庫實例的增量數(shù)據(jù),并將增量數(shù)據(jù)保存在訂閱通道中,您可以使用Lindorm提供的SDK從訂閱通道中訂閱增量數(shù)據(jù)并進行消費。同時,您可以在LTS頁面進行訂閱通道的創(chuàng)建、查看及刪除等操作。

前提條件

已將客戶端IP添加至白名單中,具體操作請參見設(shè)置白名單。

已開通數(shù)據(jù)訂閱功能,具體操作,請參見開通數(shù)據(jù)訂閱。

操作步驟

  1. 進入LTS(原BDS)頁面,在左側(cè)導(dǎo)航欄中,選擇數(shù)據(jù)訂閱 > Pull模式。

    streamone
  2. 單擊創(chuàng)建數(shù)據(jù)訂閱通道,并配置以下參數(shù)。

    創(chuàng)建訂閱通道

    名稱

    描述

    源集群

    填寫Lindorm實例ID。

    Lindorm表名

    選擇需要創(chuàng)建數(shù)據(jù)訂閱通道的Lindorm實例表,一條通道只能選擇一張表格。

    主題名

    用于消費數(shù)據(jù)的主題名稱。

    數(shù)據(jù)過期時間(天)

    表示數(shù)據(jù)可以保存的天數(shù),默認(rèn)為7天。

    主題分區(qū)數(shù)

    表示Kafka客戶端為該主題設(shè)置多個分區(qū),多分區(qū)可以并發(fā)消費數(shù)據(jù),默認(rèn)為4個分區(qū)。

  3. 單擊提交

  4. (可選)找到目標(biāo)訂閱通道,單擊操作列的詳情,可以查看數(shù)據(jù)訂閱通道詳情、消費詳情和存儲詳情。

    詳細信息
  5. (可選)您可以通過以下代碼在Kafka客戶端對訂閱數(shù)據(jù)進行消費。

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class TestConsume {
      public static void main(String[] args) throws Exception {
        // 創(chuàng)建訂閱通道時填寫的topic名稱
        String topic = "test-topic";
    
        // 鏈接endpoint的配置項
        Properties props = new Properties();
        // 指定鏈接endpoint地址
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092");
        // 指定Key序列化器,不可更改
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // 指定Value序列化器,不可更改
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // 指定消費組名稱,在消費時會自動創(chuàng)建
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0");
    
        // 創(chuàng)建消費者
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        // 訂閱主題
        consumer.subscribe(Arrays.asList(topic));
    
        // 用消費者拉取數(shù)據(jù)
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          // 查看數(shù)據(jù)內(nèi)容
          System.out.println("key: " + Bytes.toString(record.key()));
          System.out.println("value: " + Bytes.toString(record.value()));
        }
        // 提交當(dāng)前消費位移
        consumer.commitSync();
        // 關(guān)閉消費者
        consumer.close();
      }
    }
    說明

    數(shù)據(jù)消費格式說明請參見數(shù)據(jù)消費格式。