日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

兼容Kafka

更新時間: 2024-12-11 17:37:34

DataHub已經完全兼容Kafka協(xié)議,您可以使用原生Kafka客戶端對DataHub進行讀寫操作。

相關介紹

Kafka映射DataHub介紹

Topic類型

Kafka的Topic擴容方式和DataHub的topic擴容方式不同,為了適配Kafka的topic擴容方式,DataHub創(chuàng)建topic時需要將擴容方式選為擴展模式。擴展模式的topic,不再支持分裂/合并操作,而是添加shard的方式,暫不支持減少shard。

Topic命名

Kafka的Topic映射之后為DataHub的project+topic,project和topic以 “.”分割,例如:test_project.test_topic對應到DataHub中Project為test_project,Topic為test_topic,如果含有多個“.”,會以首個“.”分割Project和Topic,多余的“.”和”-“會被替換為“_“。

Partition

DataHub的每個處于Active狀態(tài)shard對應Kafka的1個Partition,如果當前Active狀態(tài)shard為5個,那么就可以視為Kafka有5個Partition,寫入數(shù)據(jù)時,可以指定Partition范圍為[0,4],如果不指定,則會由kafka客戶端選擇Partition。

Tuple Topic

Kafka的數(shù)據(jù)寫入Tuple Topic時,Topic Schema必須為2列或1列,類型必須為STRING,其他情況會寫入失敗。如果為1列,則只寫入value,key的數(shù)據(jù)將被丟棄,如果為2列,則第1列和第2列分別對應key和value。Tuple Topic寫入二進制數(shù)據(jù)會存在亂碼問題,二進制數(shù)據(jù)建議寫入Blob Topic

Blob Topic

Kafka的數(shù)據(jù)寫入Blob Topic時,會把Kafka數(shù)據(jù)的value寫入Blob中,如果Kafka數(shù)據(jù)的key不為NULL,則會寫入DataHub的Attribute,其中key為”__kafka_key__“,value為Kafka數(shù)據(jù)的key。

Header

Kafka的Header對應DataHub的Attribute,但是如果Kafka的Header的value為NULL,則會忽略掉對應的header。建議不要使用”__kafka_key__“作為Header的key

Consumer Group

DataHub的消費組就是訂閱id,只能同時訂閱單個topic,而kafka的group可以同時訂閱多個topic,為了更好的兼容kafka的訂閱方式,DataHub又提供了group的功能,用戶可以在project下創(chuàng)建group并綁定想要訂閱的topic,就可以使用該group訂閱這個project下的多個topic。DataHub的group本質上就是服務端內部封裝了DataHub的多個訂閱,如果group綁定了topic,用戶可以在topic頁面的訂閱列表頁面,看到由group自動創(chuàng)建的訂閱,刪除該訂閱會導致group無法訂閱該topic,并且之前的消費點位都會消失。

目前單個group限制最多可以訂閱50個topic,如果需要訂閱更多,請開工單聯(lián)系我們。

Kafka配置參數(shù)

C=Consumer, P=Producer, S=Streams

參數(shù)

C/P/S

可選配置

是否必須

描述

bootstrap.servers

*

參考Kafka域名列表

security.protocol

*

SASL_SSL

為了保證數(shù)據(jù)傳輸?shù)陌踩裕琄afka寫入DataHub默認使用SSL加密傳輸

sasl.mechanism

*

PLAIN

AK認證方式,僅支持PLAIN

compression.type

P

LZ4

是否開啟壓縮傳輸,目前僅支持LZ4

group.id

C

project.topic:subId

或者

project.group

使用project.topic:subId時必須和訂閱的topic保持一致,否則無法讀取數(shù)據(jù),推薦使用project.group

partition.assignment.strategy

C

org.apache.kafka.clients.consumer.RangeAssignor

Kafka默認為RangeAssignor,并且DataHub目前只支持RangeAssignor,請不要修改此配置

session.timeout.ms

C/S

[60000, 180000]

kafka默認為10000, 但是因為DataHub限制最小為60000,所以這里默認會變?yōu)?0000

heartbeat.interval.ms

C/S

建議session.timeout.ms的 2/3

Kafka默認為3000,但是因為session.timeout.ms會被默認修改為60000,所以這里建議顯示設置為40000,否則heartbeat請求會過于頻繁

application.id

S

project.topic:subId

或者

project.group

使用project.topic:subId時必須和訂閱的topic保持一致,否則無法讀取數(shù)據(jù),推薦使用project.group

以上是使用Kafka客戶端寫入DataHub需要重點關注的參數(shù),對于等客戶端相關的參數(shù),行為沒有變化,例如:retries,batch.size;對于服務端相關參數(shù)不會對服務端行為有改變,例如:無論acks的值為多少,DataHub默認數(shù)據(jù)完全寫入成功之后才會返回。

Kafka域名列表

地區(qū)

Region

外網Endpoint

經典網絡ECS Endpoint

VPC ECS Endpoint

華東1(杭州)

cn-hangzhou

dh-cn-hangzhou.aliyuncs.com:9092

dh-cn-hangzhou.aliyun-inc.com:9093

dh-cn-hangzhou-int-vpc.aliyuncs.com:9094

華東2(上海)

cn-shanghai

dh-cn-shanghai.aliyuncs.com:9092

dh-cn-shanghai.aliyun-inc.com:9093

dh-cn-shanghai-int-vpc.aliyuncs.com:9094

華北2(北京)

cn-beijing

dh-cn-beijing.aliyuncs.com:9092

dh-cn-beijing.aliyun-inc.com:9093

dh-cn-beijing-int-vpc.aliyuncs.com:9094

華南1(深圳)

cn-shenzhen

dh-cn-shenzhen.aliyuncs.com:9092

dh-cn-shenzhen.aliyun-inc.com:9093

dh-cn-shenzhen-int-vpc.aliyuncs.com:9094

華北3(張家口)

cn-zhangjiakou

dh-cn-zhangjiakou.aliyuncs.com:9092

dh-cn-zhangjiakou.aliyun-inc.com:9093

dh-cn-zhangjiakou-int-vpc.aliyuncs.com:9094

亞太東南1(新加坡)

ap-southeast-1

dh-ap-southeast-1.aliyuncs.com:9092

dh-ap-southeast-1.aliyun-inc.com:9093

dh-ap-southeast-1-int-vpc.aliyuncs.com:9094

亞太東南3(吉隆坡)

ap-southeast-3

dh-ap-southeast-3.aliyuncs.com:9092

dh-ap-southeast-3.aliyun-inc.com:9093

dh-ap-southeast-3-int-vpc.aliyuncs.com:9094

亞太南部1(孟買) 已關停

ap-south-1

dh-ap-south-1.aliyuncs.com:9092

dh-ap-south-1.aliyun-inc.com:9093

dh-ap-south-1-int-vpc.aliyuncs.com:9094

歐洲中部1(法蘭克福)

eu-central-1

dh-eu-central-1.aliyuncs.com:9092

dh-eu-central-1.aliyun-inc.com:9093

dh-eu-central-1-int-vpc.aliyuncs.com:9094

上海金融云

cn-shanghai-finance-1

dh-cn-shanghai-finance-1.aliyuncs.com:9092

dh-cn-shanghai-finance-1.aliyun-inc.com:9093

dh-cn-shanghai-finance-1-int-vpc.aliyuncs.com:9094

中國香港

cn-hongkong

dh-cn-hongkong.aliyuncs.com:9092

dh-cn-hongkong.aliyun-inc.com:9093

dh-cn-hongkong-int-vpc.aliyuncs.com:9094

示例

創(chuàng)建Topic示例

頁面創(chuàng)建

kafka

代碼創(chuàng)建

注意:目前無法通過kafka的api創(chuàng)建topic,只能通過datahub的sdk創(chuàng)建,創(chuàng)建時需要指定ExpandMode為ONLY_EXTEND,maven依賴版本需為2.19.0或更高版本

您還需要在工程中配置相應的Access Key和Secret Key,推薦使用環(huán)境變量的形式在配置文件中配置。

datahub.endpoint=<yourEndpoint>
datahub.accessId=<yourAccessKeyId>
datahub.accessKey=<yourAccessKeySecret>
重要

阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。

強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.19.0-public</version>
</dependency>
@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
public class CreateTopic {
    public static void main(String[] args) {
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(
                        new DatahubConfig(endpoint,
                                new AliyunAccount(accessId, accessKey)))
                .build();

        int shardCount = 1;
        int lifeCycle = 7;

        try {
            datahubClient.createTopic("test_project", "test_topic", shardCount, lifeCycle, RecordType.BLOB, "comment", ExpandMode.ONLY_EXTEND);
        } catch (DatahubClientException e) {
            e.printStackTrace();
        }
    }
}

創(chuàng)建group示例

頁面創(chuàng)建

創(chuàng)建group

創(chuàng)建完成后仍舊可以修改綁定的topic列表,所以這里可以先任意選擇。

創(chuàng)建 group

創(chuàng)建完成后,可以在topic的訂閱列表頁面看到group自動創(chuàng)建了訂閱。

創(chuàng)建group

代碼創(chuàng)建

maven依賴版本需為2.21.6-public或更高版本

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.21.6-public</version>
</dependency>
@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
public class CreateGroup {
    public static void main(String[] args) {
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(
                        new DatahubConfig(endpoint,
                                new AliyunAccount(accessId, accessKey)))
                .build();

        List<String> topicList = new ArrayList<>();
        topicList.add("test_project.topic1");
        topicList.add("test_project.topic2");
        topicList.add("test_project.topic3");

        try {
            // 創(chuàng)建kafka group
            datahubClient.createKafkaGroup("test_project", "test_topic", "test comment");

            // 將需要訂閱的topic綁定到group上
            datahubClient.updateTopicsForKafkaGroup("test_project", "test_topic", topicList, UpdateKafkaGroupMode.ADD);
        } catch (DatahubClientException e) {
            e.printStackTrace();
        }
    }
}

Producer示例:

生成kafka_client_producer_jaas.conf文件

創(chuàng)建文件kafka_client_producer_jaas.conf,保存到任意路徑,文件內容如下。

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="accessId"
  password="accessKey";
};

maven依賴

Kafka-client版本至少大于等于0.10.0.0,推薦2.4.0

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>

示例代碼

public class ProducerExample {
    static {
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("compression.type", "lz4");

        String KafkaTopicName = "test_project.test_topic";
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        try {
            List<Header> headers = new ArrayList<>();
            RecordHeader header1 = new RecordHeader("key1", "value1".getBytes());
            RecordHeader header2 = new RecordHeader("key2", "value2".getBytes());
            headers.add(header1);
            headers.add(header2);

            ProducerRecord<String, String> record = new ProducerRecord<>(KafkaTopicName, 0, "key", "Hello DataHub!", headers);

            // sync send
            producer.send(record).get();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

運行結果

運行成功之后,可以再DataHub抽樣一下,確認是否正常DataHub。

image

Consumer示例

生成kafka_client_producer_jaas.conf文件和maven依賴參考Producer示例。

新加入的consumer需要十幾秒左右分配shard,分配完成后即可消費。

示例代碼

使用kafka group示例(推薦)

package com.aliyun.datahub.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class ConsumerExample2 {

    static {
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        // group.id填project.group
        properties.put("group.id", "test_project.test_kafka_group");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        properties.put("ssl.endpoint.identification.algorithm", "");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        List<String> topicList = new ArrayList<>();
        topicList.add("test_project.test_topic1");
        topicList.add("test_project.test_topic2");
        topicList.add("test_project.test_topic3");
        // 使用kafka group可以同時訂閱多個topic
        kafkaConsumer.subscribe(topicList);

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());
            }
        }
    }
}

使用project.topic.subid示例

package com.aliyun.datahub.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerExample {

    static {
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        // group.id填project.topic.subId
        properties.put("group.id", "test_project.test_topic:1611039998153N71KM");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        properties.put("ssl.endpoint.identification.algorithm", "");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        // 使用project.topic.subId的方式只能訂閱單個topic
        kafkaConsumer.subscribe(Collections.singletonList("test_project.test_topic"));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());
            }
        }
    }
}

運行結果

運行成功之后,便可以在終端看到讀取到的數(shù)據(jù)。

ConsumerRecord(topic = test_project.test_topic, partition = 0, leaderEpoch = 0, offset = 0, LogAppendTime = 1611040892661, serialized key size = 3, serialized value size = 14, headers = RecordHeaders(headers = [RecordHeader(key = key1, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = key2, value = [118, 97, 108, 117, 101, 50])], isReadOnly = false), key = key, value = Hello DataHub!)

注意:這里同一個請求返回的數(shù)據(jù)的LogAppendTime是相同的,是該請求返回所有的數(shù)據(jù)的寫入DataHub時間的最大值

Streams示例

maven依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.4.0</version>
</dependency>

代碼示例

這里讀取test_project下input的數(shù)據(jù),將key和value的字符串轉為小寫重新寫入output。

public class StreamExample {

    static {
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(final String[] args) {
        final String input = "test_project.input";
        final String output = "test_project.output";
        final Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("application.id", "test_project.input:1611293595417QH0WL");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        properties.put("auto.offset.reset", "earliest");

        final StreamsBuilder builder = new StreamsBuilder();
        TestMapper testMapper = new TestMapper();
        builder.stream(input, Consumed.with(Serdes.String(), Serdes.String()))
                .map(testMapper)
                .to(output, Produced.with(Serdes.String(), Serdes.String()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

    static class TestMapper implements KeyValueMapper<String, String, KeyValue<String, String>> {

        @Override
        public KeyValue<String, String> apply(String s, String s2) {
            return new KeyValue<>(StringUtils.lowerCase(s), StringUtils.lowerCase(s2));
        }
    }
}

運行結果

啟動Streams任務之后,分配shard大概需要1分鐘左右,1分鐘之后就可以在控制臺看到當前的task數(shù)量,task數(shù)量和輸入topic的shard數(shù)量保持一致,示例輸入topic為3個shard。

currently assigned active tasks: [0_0, 0_1, 0_2]
    currently assigned standby tasks: []
    revoked active tasks: []
    revoked standby tasks: []

shard分配成功之后,可以向input中寫入一組測試數(shù)據(jù) (AAAA,BBBB),(CCCC,DDDD),(EEEE,FFFF),之后再output抽樣一下,查看數(shù)據(jù)是否正確寫入。

image

注意事項

  • 目前不支持事務、冪等

  • 目前Kafka客戶端無法自動創(chuàng)建DataHub Topic,寫入之前需要保證已創(chuàng)建Topic

  • Consumer目前最多只可以訂閱一個topic

  • Consumer讀取的數(shù)據(jù)時間戳均為LogAppendTime,表示DataHub的落盤時間,單個請求返回的所有數(shù)據(jù)時間戳相同,為所有數(shù)據(jù)時間戳的最大值,所以如果讀取的時間戳可能會大于實際的落盤時間

  • Streams輸入topic目前僅支持一個,輸出可以多個topic

  • Streams目前只支持無狀態(tài)的任務。

  • 支持Kafka版本為0.10.0 -> 2.4.0

常見問題

Q: 寫入數(shù)據(jù)時連接斷開

Selector - [Producer clientId=producer-1] Connection with dh-cn-shenzhen.aliyuncs.com disconnected
java.io.EOFException
    at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:573)
    ...

A: Kafka meta請求和寫數(shù)據(jù)請求不是一個連接,第一次meta請求會請求建立一個連接,然后寫數(shù)據(jù)時會重新和meta中的返回的broker重新建立一個連接,并且之后所有的請求都是在第二個連接上發(fā)送,因此第一個連接就會閑置,服務端會主動關閉閑置超過一定時間的連接,因此如果這個錯誤并沒有影像數(shù)據(jù)的正常寫入,直接忽略即可。

Q: 啟動kafka客戶端失敗

Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names matching IP address 100.67.134.161 found

A: 添加配置properties.put("ssl.endpoint.identification.algorithm", "");

Q: Consumer消費過程中出現(xiàn)DisconnectException

[INFO][Consumer clientId=client-id, groupId=consumer-project.topic:subid] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: {}.
org.apache.kafka.common.errors.DisconnectException

A: Kafka的客戶端需要與服務端保持TCP長連接,一般情況是因為網絡抖動造成的,客戶端有重試邏輯,因此不會對客戶端的消費造成影響。

上一篇: 讀寫數(shù)據(jù) 下一篇: 通過STS方式訪問DataHub