通過Pull模式創(chuàng)建數(shù)據(jù)訂閱通道
本文介紹通過Pull模式創(chuàng)建數(shù)據(jù)訂閱功能,創(chuàng)建后訂閱通道會實時拉取數(shù)據(jù)庫實例的增量數(shù)據(jù),并將增量數(shù)據(jù)保存在訂閱通道中,您可以使用Lindorm提供的SDK從訂閱通道中訂閱增量數(shù)據(jù)并進行消費。同時,您可以在LTS頁面進行訂閱通道的創(chuàng)建、查看及刪除等操作。
前提條件
已將客戶端IP添加至白名單中,具體操作請參見設(shè)置白名單。
已開通數(shù)據(jù)訂閱功能,具體操作,請參見開通數(shù)據(jù)訂閱。
操作步驟
進入LTS(原BDS)頁面,在左側(cè)導(dǎo)航欄中,選擇數(shù)據(jù)訂閱 > Pull模式。
單擊創(chuàng)建數(shù)據(jù)訂閱通道,并配置以下參數(shù)。
名稱
描述
源集群
填寫Lindorm實例ID。
Lindorm表名
選擇需要創(chuàng)建數(shù)據(jù)訂閱通道的Lindorm實例表,一條通道只能選擇一張表格。
主題名
用于消費數(shù)據(jù)的主題名稱。
數(shù)據(jù)過期時間(天)
表示數(shù)據(jù)可以保存的天數(shù),默認(rèn)為7天。
主題分區(qū)數(shù)
表示Kafka客戶端為該主題設(shè)置多個分區(qū),多分區(qū)可以并發(fā)消費數(shù)據(jù),默認(rèn)為4個分區(qū)。
單擊提交。
(可選)找到目標(biāo)訂閱通道,單擊操作列的詳情,可以查看數(shù)據(jù)訂閱通道詳情、消費詳情和存儲詳情。
(可選)您可以通過以下代碼在Kafka客戶端對訂閱數(shù)據(jù)進行消費。
import org.apache.hadoop.hbase.util.Bytes; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class TestConsume { public static void main(String[] args) throws Exception { // 創(chuàng)建訂閱通道時填寫的topic名稱 String topic = "test-topic"; // 鏈接endpoint的配置項 Properties props = new Properties(); // 指定鏈接endpoint地址 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092"); // 指定Key序列化器,不可更改 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); // 指定Value序列化器,不可更改 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); // 指定消費組名稱,在消費時會自動創(chuàng)建 props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0"); // 創(chuàng)建消費者 KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props); // 訂閱主題 consumer.subscribe(Arrays.asList(topic)); // 用消費者拉取數(shù)據(jù) ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000)); for (ConsumerRecord<byte[], byte[]> record : records) { // 查看數(shù)據(jù)內(nèi)容 System.out.println("key: " + Bytes.toString(record.key())); System.out.println("value: " + Bytes.toString(record.value())); } // 提交當(dāng)前消費位移 consumer.commitSync(); // 關(guān)閉消費者 consumer.close(); } }
說明數(shù)據(jù)消費格式說明請參見數(shù)據(jù)消費格式。