日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

發布者最佳實踐

本文介紹云消息隊列 Kafka 版發布者的最佳實踐,幫助您降低發送消息的錯誤率。本文最佳實踐基于Java客戶端。對于其他語言的客戶端,其基本概念與思想是相通的,但實現細節可能存在差異。

發送消息

發送消息的示例代碼如下:

Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
        topic,   //消息主題。
        null,   //分區編號。建議為null,由Producer分配。
        System.currentTimeMillis(),   //時間戳。
        String.valueOf(value.hashCode()),   //消息鍵。
        value   //消息值。
));

完整示例代碼,請參見SDK概述

Key和Value

0.10.2版本的云消息隊列 Kafka 版的消息有以下兩個字段:

  • Key:消息的標識。

  • Value:消息內容。

為了便于追蹤,請為消息設置一個唯一的Key。您可以通過Key追蹤某消息,打印發送日志和消費日志,了解該消息的發送和消費情況。

如果消息發送量較大,建議不要設置Key,并使用黏性分區策略。黏性分區策略詳情,請參見黏性分區策略

重要

在0.11.0以及之后的版本,云消息隊列 Kafka 版開始支持headers,如果您需要使用headers,需要將服務端升級至2.2.0版本。

失敗重試

分布式環境下,由于網絡等原因偶爾發送失敗是常見的。導致這種失敗的原因可能是消息已經發送成功,但是ACK失敗,也有可能是確實沒發送成功。

云消息隊列 Kafka 版是VIP網絡架構,長時間不進行通信連接會被主動斷開,因此,不是一直活躍的客戶端會經常收到connection reset by peer錯誤,建議重試消息發送。

您可以根據業務需求,設置以下重試參數:

  • retries:消息發送失敗時的重試次數。

  • retry.backoff.ms,消息發送失敗的重試間隔,建議設置為1000,單位:毫秒。

異步發送

發送接口是異步的,如果您想接收發送的結果,可以調用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)

線程安全

Producer是線程安全的,且可以往任何Topic發送消息。通常情況下,一個應用對應一個Producer。

Acks

Acks的說明如下:

  • acks=0:無需服務端的Response、性能較高、丟數據風險較大。

  • acks=1:服務端主節點寫成功即返回Response、性能中等、丟數據風險中等、主節點宕機可能導致數據丟失。

  • acks=all:服務端主節點寫成功且備節點同步成功才返回Response、性能較差、數據較為安全、主節點和備節點都宕機才會導致數據丟失。

為了提升發送性能, 建議設置為acks=1

提升發送性能(減少碎片化發送請求)

一般情況下,一個云消息隊列 Kafka 版Topic會有多個分區。云消息隊列 Kafka 版Producer客戶端在向服務端發送消息時,需要先確認往哪個Topic的哪個分區發送。我們給同一個分區發送多條消息時,Producer客戶端將相關消息打包成一個Batch,批量發送到服務端。Producer客戶端在處理Batch時,是有額外開銷的。一般情況下,小Batch會導致Producer客戶端產生大量請求,造成請求隊列在客戶端和服務端的排隊,并造成相關機器的CPU升高,從而整體推高了消息發送和消費延遲。一個合適的Batch大小,可以減少發送消息時客戶端向服務端發起的請求次數,在整體上提高消息發送的吞吐和延遲。

Batch機制,云消息隊列 Kafka 版Producer端主要通過兩個參數進行控制:

  • batch.size : 發往每個分區(Partition)的消息緩存量(消息內容的字節數之和,不是條數)。達到設置的數值時,就會觸發一次網絡請求,然后Producer客戶端把消息批量發往服務器。如果batch.size設置過小,有可能影響發送性能和穩定性。建議保持默認值16384。單位:字節。

  • linger.ms : 每條消息在緩存中的最長時間。若超過這個時間,Producer客戶端就會忽略batch.size的限制,立即把消息發往服務器。建議根據業務場景, 設置linger.ms在100~1000之間。單位:毫秒。

因此,云消息隊列 Kafka 版Producer客戶端什么時候把消息批量發送至服務器是由batch.sizelinger.ms共同決定的。您可以根據具體業務需求進行調整。為了提升發送的性能,保障服務的穩定性, 建議您設置batch.size=16384linger.ms=1000

黏性分區策略

只有發送到相同分區的消息,才會被放到同一個Batch中,因此決定一個Batch如何形成的一個因素是云消息隊列 Kafka 版Producer端設置的分區策略。云消息隊列 Kafka 版Producer允許通過設置Partitioner的實現類來選擇適合自己業務的分區。在消息指定Key的情況下,云消息隊列 Kafka 版Producer的默認策略是對消息的Key進行哈希,然后根據哈希結果選擇分區,保證相同Key的消息會發送到同一個分區。

在消息沒有指定Key的情況下,云消息隊列 Kafka 版2.4版本之前的默認策略是循環使用主題的所有分區,將消息以輪詢的方式發送到每一個分區上。但是,這種默認策略Batch的效果會比較差,在實際使用中,可能會產生大量的小Batch,從而使得實際的延遲增加。鑒于該默認策略對無Key消息的分區效率低問題,云消息隊列 Kafka 版在2.4版本引入了黏性分區策略(Sticky Partitioning Strategy)。

黏性分區策略主要解決無Key消息分散到不同分區,造成小Batch問題。其主要策略是如果一個分區的Batch完成后,就隨機選擇另一個分區,然后后續的消息盡可能地使用該分區。這種策略在短時間內看,會將消息發送到同一個分區,如果拉長整個運行時間,消息還是可以均勻地發布到各個分區上的。這樣可以避免消息出現分區傾斜,同時還可以降低延遲,提升服務整體性能。

如果您使用的云消息隊列 Kafka 版Producer客戶端是2.4及以上版本,默認的分區策略就采用黏性分區策略。如果您使用的Producer客戶端版本小于2.4,可以根據黏性分區策略原理,自行實現分區策略,然后通過參數partitioner.class設置指定的分區策略。

關于黏性分區策略實現,您可以參考如下Java版代碼實現。該代碼的實現邏輯主要是根據一定的時間間隔,切換一次分區。

public class MyStickyPartitioner implements Partitioner {

    // 記錄上一次切換分區時間。
    private long lastPartitionChangeTimeMillis = 0L;
    // 記錄當前分區。
    private int currentPartition = -1;
    // 分區切換時間間隔,可以根據實際業務選擇切換分區的時間間隔。
    private long partitionChangeTimeGap = 100L;
    
    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 獲取所有分區信息。
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            int availablePartitionSize = availablePartitions.size();

            // 判斷當前可用分區。
            if (availablePartitionSize > 0) {
                handlePartitionChange(availablePartitionSize);
                return availablePartitions.get(currentPartition).partition();
            } else {
                handlePartitionChange(numPartitions);
                return currentPartition;
            }
        } else {
            // 對于有key的消息,根據key的哈希值選擇分區。
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private void handlePartitionChange(int partitionNum) {
        long currentTimeMillis = System.currentTimeMillis();

        // 如果超過分區切換時間間隔,則切換下一個分區,否則還是選擇之前的分區。
        if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
            || currentPartition < 0 || currentPartition >= partitionNum) {
            lastPartitionChangeTimeMillis = currentTimeMillis;
            currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
        }
    }

    public void close() {}

}

OOM

結合云消息隊列 Kafka 版的Batch設計思路,云消息隊列 Kafka 版會緩存消息并打包發送,如果緩存太多,則有可能造成OOM(Out of Memory)。

  • buffer.memory : 發送的內存池大小。如果內存池設置過小,則有可能導致申請內存耗時過長,從而影響發送性能,甚至導致發送超時。建議buffer.memory ≧ batch.size * 分區數 * 2。單位:字節。

  • buffer.memory的默認數值是32 MB,對于單個Producer而言,可以保證足夠的性能。

    重要

    如果您在同一個JVM中啟動多個Producer,那么每個Producer都有可能占用32 MB緩存空間,此時便有可能觸發OOM。

  • 在生產時,一般沒有必要啟動多個Producer;如有特殊情況需要,則需要考慮buffer.memory的大小,避免觸發OOM。

分區順序

單個分區(Partition)內,消息是按照發送順序儲存的,是基本有序的。

默認情況下,云消息隊列 Kafka 版為了提升可用性,并不保證單個分區內絕對有序,在升級或者宕機時,會發生少量消息亂序(某個分區掛掉后把消息Failover到其它分區)。

如果業務要求分區保證嚴格有序,請在創建Topic時選擇使用Local存儲。