
Access Methods

This topic describes how to connect to LinkedMall distribution messages with various SDKs.

Java SDK

Environment Preparation

  • Install JDK 1.8 or later. For instructions, see Install JDK.

  • Install Maven 2.5 or later. For instructions, see Install Maven.

  • Install a build tool.

Install Dependencies

Add the following dependency to pom.xml:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.4.0</version>
</dependency>

Prepare Configurations

  1. Download the SSL root certificate.

  2. Create a kafka.properties file under the project's src/main/resources directory and fill in the following content.

    Note

    Log on to the LinkedMall Open Platform to obtain the values of bootstrap.servers, group.id, sasl.username, sasl.password, and the other parameters.

## SSL endpoint
bootstrap.servers=xxxx
## Consumer group
group.id=xxxx
## Topics to subscribe to, comma-separated (read by the consumer example below)
topics=xxxx
## SASL username
sasl.username=12345
## SASL password
sasl.password=12345
## SSL root certificate; point this to the absolute path of the root certificate file downloaded in step 1.
ssl.truststore.location=/xxxx/only.4096.client.truststore.jks
  3. Create a configuration loader.

import java.util.Properties;

public class KafkaConfigurer {
    private static volatile Properties properties;

    public static Properties getKafkaProperties() {
        if (properties == null) {
            synchronized (KafkaConfigurer.class) {
                if (properties == null) {
                    // Load the contents of the kafka.properties configuration file.
                    Properties kafkaProperties = new Properties();
                    try {
                        kafkaProperties.load(KafkaConfigurer.class.getClassLoader().getResourceAsStream("kafka.properties"));
                    } catch (Exception e) {
                        // Failed to load the configuration file; consider exiting the program here.
                        e.printStackTrace();
                    }
                    properties = kafkaProperties;
                }
            }
        }
        return properties;
    }
}

Consume Messages

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.*;
import java.util.stream.Collectors;


public class LinkedmallConsumerExample {

    private static final String JAAS_CONFIG_TEMPLATE = "org.apache.kafka.common.security.plain.PlainLoginModule required" +
            " username=\"%s\"" +
            " password=\"%s\";";

    public static void main(String[] args) throws IOException {

        // Load kafka.properties.
        Properties kafkaProperties = KafkaConfigurer.getKafkaProperties();
        Properties props = new Properties();

        // Configure the endpoint and authentication settings.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        String jaasConfig = String.format(JAAS_CONFIG_TEMPLATE, kafkaProperties.get("sasl.username"), kafkaProperties.get("sasl.password"));
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        // Set the consumer group.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        // Consumer session timeout.
        // If the broker receives no heartbeat from the consumer within this period, it considers the consumer dead, removes it from the group, and triggers a rebalance. Default: 30s.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Maximum number of records returned per poll.
        // Do not set this too high: if more records are polled than can be consumed before the next poll, a rebalance is triggered and consumption stalls.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);

        // Bytes fetched per request. Recommended: max records per poll * 1024 (LinkedMall messages are about 1 KB each); values that are too large may trigger throttling.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);

        // Deserializers for message keys and values.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Recommended: disable auto commit and commit offsets manually.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);


        // Create the consumer instance.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Set the topics to subscribe to (the topics key in kafka.properties).
        List<String> topics = Arrays.stream(kafkaProperties.getProperty("topics").split(",")).collect(Collectors.toList());
        consumer.subscribe(topics);

        // Thread pool for processing messages asynchronously. A fixed-size pool with an
        // unbounded queue avoids task rejection when there are more partitions than threads.
        ExecutorService executorService =
                Executors.newFixedThreadPool(Math.max(1, Runtime.getRuntime().availableProcessors() - 1));
        // Consume messages in a loop.
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.of(1, ChronoUnit.SECONDS));
                System.out.printf("Record pulled: %d\n", records.count());

                // Process each partition serially to preserve ordering.
                Collection<Callable<Void>> tasks = records.partitions().stream()
                        // Get the records of a single partition.
                        .map(records::records)
                        // For each partition, create a task that consumes its records serially.
                        .map(partitionRecords -> (Callable<Void>) () -> {
                            for (ConsumerRecord<String, String> record : partitionRecords) {
                                System.out.printf("Consume topic:%s partition:%d offset:%d%n", record.topic(), record.partition(), record.offset());

                                consume(record);
                            }
                            return null;
                        })
                        .collect(Collectors.toList());

                // Wait for all partitions to finish, with a 25s timeout; the wait must be shorter than SESSION_TIMEOUT_MS_CONFIG, or a rebalance is triggered.
                List<Future<Void>> result = executorService.invokeAll(tasks, 25, TimeUnit.SECONDS);
                for (Future<Void> future : result) {
                    future.get();
                }

                // All records consumed successfully; commit the offsets manually.
                consumer.commitSync();
            } catch (Exception e) {
                // Consumption failed; record the cause.
                try {
                    Thread.sleep(1000);
                } catch (Throwable ignore) {
                }
                e.printStackTrace();
            }
        }
    }

    private static void consume(ConsumerRecord<String, String> record) {
        int retryTime = 10;
        Exception finalException = null;
        // Catch exceptions and retry manually, up to retryTime attempts.
        for (int i = 0; i < retryTime; i++) {
            try {
                doConsume(record);
                return;
            } catch (Exception e) {
                finalException = e;
            }
        }
        // Record the final failure cause.
        finalException.printStackTrace();
    }

    private static void doConsume(ConsumerRecord<String, String> record) {
        // Business logic for processing the message.
    }
}

Notes

1. Consumption retries

The Kafka client itself does not provide a consumption retry mechanism. We recommend catching exceptions and retrying on your own, or implementing retries with a component such as spring-kafka (see the sketch after the notes below).

  • Note: Distinguish whether the failure cause is retriable. Do not block on non-retriable messages (for example, failures caused by unexpected exceptions); record them via logs or other channels and analyze and fix them afterwards. Cap retriable messages with a maximum retry count or a similar limit, because unlimited retries can block consumption progress and cause message backlog.

  • Note: If offsets have not been committed, the client re-pulls messages from the last committed offset after a restart, so some already-consumed messages may be delivered again. Make message processing idempotent, as sketched below.
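
Where retries are implemented with spring-kafka, its error handler provides bounded retries out of the box. A minimal sketch, assuming spring-kafka 2.8 or later and an existing ConsumerFactory bean (bean names here are illustrative):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaRetryConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Retry a failed record up to 9 more times, 1 second apart, then give up
        // and move on; pair this with a recoverer (e.g. dead-lettering) as needed.
        factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 9L)));
        return factory;
    }
}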
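
A minimal sketch of idempotent processing, assuming a hypothetical in-memory dedup store; in production the store should be durable (for example a database unique key or Redis) so that deduplication survives restarts:

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentConsumer {
    // Hypothetical dedup store; replace with a durable store in production.
    private static final Set<String> PROCESSED = ConcurrentHashMap.newKeySet();

    public static void consumeIdempotently(ConsumerRecord<String, String> record) {
        // topic-partition-offset is used as the dedup key here; a business ID
        // from the message payload is usually a better choice if available.
        String dedupKey = record.topic() + "-" + record.partition() + "-" + record.offset();
        if (!PROCESSED.add(dedupKey)) {
            // Already processed (for example, redelivered after a restart); skip.
            return;
        }
        // ... business logic, e.g. the doConsume(record) of the example above ...
    }
}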

2. Ordered consumption

LinkedMall messages are sent in partition order. If you need to consume messages in order, follow the consumption part of the sample code and process the messages within each partition serially:

// Pull messages
ConsumerRecords<String, String> records = consumer.poll(Duration.of(1, ChronoUnit.SECONDS));

// Get the partition information
Set<TopicPartition> partitions = records.partitions();

for (TopicPartition partition : partitions) {
  // Get the records of a single partition
  List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
  // Process the records within the partition serially, optionally on an asynchronous thread
  consumePartition(partitionRecords);
}
3. Message types

A topic contains multiple event types. For example, the product topic carries messages such as ProductCreated and SkuEdited. Consume the event types your business needs; events your business does not need can simply be skipped during processing, as sketched below.
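
A sketch of skipping unneeded events. How the event type is encoded in the message is an assumption here; adjust the extraction to the actual payload schema:

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class EventTypeFilter {
    // The event types this business cares about (illustrative values).
    private static final Set<String> INTERESTED_EVENTS =
            new HashSet<>(Arrays.asList("ProductCreated", "SkuEdited"));

    /** Returns true if the record should be processed, false if it can be skipped. */
    public static boolean shouldConsume(ConsumerRecord<String, String> record) {
        if (record.value() == null) {
            return false;
        }
        // Assumption: the event type appears verbatim in the payload. A real
        // implementation should parse the payload and read its event-type field.
        for (String event : INTERESTED_EVENTS) {
            if (record.value().contains(event)) {
                return true;
            }
        }
        return false;
    }
}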

Reset Consumer Offsets

The following example resets a group's consumer offsets, either to the earliest offset of a partition or to the offset corresponding to a timestamp. Stop any running consumers of the group before committing the reset offsets.

package com.aliyun.neuron.demo.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.time.Duration;
import java.util.*;

/**
 * @author zj-407802
 * @date 2024/7/10 17:58
 * @desc Reset consumer offsets
 **/
public class LinkedMallConsumerOffsetTest {
    private static final String JAAS_CONFIG_TEMPLATE = "org.apache.kafka.common.security.plain.PlainLoginModule required" +
            " username=\"%s\"" +
            " password=\"%s\";";

    private static final String BOOTSTRAP_SERVERS="alikafka-post-cn-uqm3e7bsu003-1.alikafka.aliyuncs.com:9093,alikafka-post-cn-uqm3e7bsu003-2.alikafka.aliyuncs.com:9093,alikafka-post-cn-uqm3e7bsu003-3.alikafka.aliyuncs.com:9093";
    private static final String SSL_TRUSTSTORE_LOCATION="/Users/*******/ideaProjets/linkedmall/distributor-kafka-example/src/main/resources/only.4096.client.truststore.jks";
    private static final String USER_NAME="u12000*****";
    private static final String PASS_WORD="ygBmPv4******";
    private static final String GROUP_ID="1200*****-prod";
    private static String topic = "1200******-product";


    private static Consumer<String, String> consumer;

    public static void main(String[] args) throws InterruptedException {
        // 1. Initialize the consumer.
        initConsumer();
        // 2. Subscribe.
        addSubscribe();
        // 3. Query partitions and current offsets.
        Set<TopicPartition> assignmentPartitions = consumer.assignment();
        for (TopicPartition assignmentPartition : assignmentPartitions) {
            int partition = assignmentPartition.partition();
            long position = consumer.position(new TopicPartition(topic, partition));
            System.out.println("position:" + partition + ",position:" + position);
        }

        // 4.1 Reset to the earliest offset.
        // resetOffsetToBeginning(0);
        // 4.2 Reset by timestamp.
        resetOffsetByTimestamp(1, 1718709716000L);

        // 5. Subscribe again.
        addSubscribe();
        // 6. Query partitions and offsets after the reset.
        Set<TopicPartition> assignmentPartitions2 = consumer.assignment();
        for (TopicPartition assignmentPartition : assignmentPartitions2) {
            int partition = assignmentPartition.partition();
            long position = consumer.position(new TopicPartition(topic, partition));
            System.out.println("position2:" + partition + ",position2:" + position);
        }
    }

    private static void addSubscribe() {

        // Set the topic to subscribe to.
        List<String> topics = Arrays.asList(topic);
        consumer.subscribe(topics);

        // Wait for the partition assignment.
        while (consumer.assignment().isEmpty()) {
            consumer.poll(Duration.ofMillis(100));
        }
    }

    private static void initConsumer() {
        // Build the consumer configuration.
        Properties props = new Properties();

        // Configure the endpoint and authentication settings.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, SSL_TRUSTSTORE_LOCATION);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        String jaasConfig = String.format(JAAS_CONFIG_TEMPLATE, USER_NAME, PASS_WORD);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        // Set the consumer group.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);

        // Consumer session timeout.
        // If the broker receives no heartbeat from the consumer within this period, it considers the consumer dead, removes it from the group, and triggers a rebalance. Default: 30s.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Maximum number of records returned per poll.
        // Do not set this too high: if more records are polled than can be consumed before the next poll, a rebalance is triggered and consumption stalls.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);

        // Bytes fetched per request. Recommended: max records per poll * 1024 (LinkedMall messages are about 1 KB each); values that are too large may trigger throttling.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);

        // Deserializers for message keys and values.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Recommended: disable auto commit and commit offsets manually.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Create the consumer instance.
        consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
    }

    /**
     * Reset to the earliest offset.
     * @param partition the partition to reset
     * @throws InterruptedException
     */
    private static void resetOffsetToBeginning(int partition) throws InterruptedException {
        // Clear any existing subscription first.
        consumer.unsubscribe();
        HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        ArrayList<TopicPartition> list = new ArrayList<>();
        for (PartitionInfo part : partitionInfos) {
            if (part.partition() == partition) {
                TopicPartition topicPartition = new TopicPartition(part.topic(), part.partition());
                list.add(topicPartition);
                offset.put(new TopicPartition(part.topic(), part.partition()), new OffsetAndMetadata(0));
            }

        }
        consumer.assign(list);
        // Seek to the earliest offset.
        consumer.seekToBeginning(list);
        // Stop any running consumers of this group before committing.
        consumer.commitSync(offset);
        Thread.sleep(5000);
        // Clear the assignment.
        consumer.unsubscribe();
    }

    /**
     * Reset the consumer offset by timestamp.
     */
    private static void resetOffsetByTimestamp(int partition, Long timestampMs) throws InterruptedException {
        // Clear any existing subscription first.
        consumer.unsubscribe();
        HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        ArrayList<TopicPartition> list = new ArrayList<>();
        HashMap<TopicPartition, Long> map = new HashMap<>();
        for (PartitionInfo part : partitionInfos) {
            if (part.partition() == partition) {
                TopicPartition topicPartition = new TopicPartition(part.topic(), part.partition());
                list.add(topicPartition);
                map.put(new TopicPartition(part.topic(), part.partition()), timestampMs);
            }
        }

        final Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = consumer.offsetsForTimes(map);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetAndTimestampMap.entrySet()) {
            final TopicPartition key = entry.getKey();
            final OffsetAndTimestamp value = entry.getValue();

            long position = 0;
            if (value != null) {
                position = value.offset();
            } else {
                list.add(key);
                position = consumer.position(key);
            }
            offset.put(key, new OffsetAndMetadata(position));
        }
        consumer.assign(list);
        // Seek to the offsets resolved from the timestamp.
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offset.entrySet()) {
            consumer.seek(entry.getKey(), entry.getValue().offset());
        }
        // Stop any running consumers of this group before committing.
        consumer.commitSync(offset);
        Thread.sleep(5000);
        // Clear the assignment.
        consumer.unsubscribe();
    }

}

Other Languages

For other languages, connect on your own using your endpoint, topic, and group, following the official Alibaba Cloud Kafka demos: https://github.com/AliwareMQ/aliware-kafka-demos/tree/master