本文介紹云消息隊列 Kafka 版發布者的最佳實踐,幫助您降低發送消息的錯誤率。本文最佳實踐基于Java客戶端。對于其他語言的客戶端,其基本概念與思想是相通的,但實現細節可能存在差異。
發送消息
發送消息的示例代碼如下:
Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
topic, //消息主題。
null, //分區編號。建議為null,由Producer分配。
System.currentTimeMillis(), //時間戳。
String.valueOf(value.hashCode()), //消息鍵。
value //消息值。
));
完整示例代碼,請參見SDK概述。
Key和Value
0.10.2版本的云消息隊列 Kafka 版的消息有以下兩個字段:
Key:消息的標識。
Value:消息內容。
為了便于追蹤,請為消息設置一個唯一的Key。您可以通過Key追蹤某消息,打印發送日志和消費日志,了解該消息的發送和消費情況。
如果消息發送量較大,建議不要設置Key,并使用黏性分區策略。黏性分區策略詳情,請參見黏性分區策略。
在0.11.0以及之后的版本,云消息隊列 Kafka 版開始支持headers,如果您需要使用headers,需要將服務端升級至2.2.0版本。
失敗重試
分布式環境下,由于網絡等原因偶爾發送失敗是常見的。導致這種失敗的原因可能是消息已經發送成功,但是ACK失敗,也有可能是確實沒發送成功。
云消息隊列 Kafka 版是VIP網絡架構,長時間不進行通信連接會被主動斷開,因此,不是一直活躍的客戶端會經常收到connection reset by peer
錯誤,建議重試消息發送。
您可以根據業務需求,設置以下重試參數:
retries
:消息發送失敗時的重試次數。retry.backoff.ms
,消息發送失敗的重試間隔,建議設置為1000
,單位:毫秒。
異步發送
發送接口是異步的,如果您想接收發送的結果,可以調用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)。
線程安全
Producer是線程安全的,且可以往任何Topic發送消息。通常情況下,一個應用對應一個Producer。
Acks
Acks的說明如下:
acks=0
:無需服務端的Response、性能較高、丟數據風險較大。acks=1
:服務端主節點寫成功即返回Response、性能中等、丟數據風險中等、主節點宕機可能導致數據丟失。acks=all
:服務端主節點寫成功且備節點同步成功才返回Response、性能較差、數據較為安全、主節點和備節點都宕機才會導致數據丟失。
為了提升發送性能, 建議設置為acks=1
。
提升發送性能(減少碎片化發送請求)
一般情況下,一個云消息隊列 Kafka 版Topic會有多個分區。云消息隊列 Kafka 版Producer客戶端在向服務端發送消息時,需要先確認往哪個Topic的哪個分區發送。我們給同一個分區發送多條消息時,Producer客戶端將相關消息打包成一個Batch,批量發送到服務端。Producer客戶端在處理Batch時,是有額外開銷的。一般情況下,小Batch會導致Producer客戶端產生大量請求,造成請求隊列在客戶端和服務端的排隊,并造成相關機器的CPU升高,從而整體推高了消息發送和消費延遲。一個合適的Batch大小,可以減少發送消息時客戶端向服務端發起的請求次數,在整體上提高消息發送的吞吐和延遲。
Batch機制,云消息隊列 Kafka 版Producer端主要通過兩個參數進行控制:
batch.size
: 發往每個分區(Partition)的消息緩存量(消息內容的字節數之和,不是條數)。達到設置的數值時,就會觸發一次網絡請求,然后Producer客戶端把消息批量發往服務器。如果batch.size
設置過小,有可能影響發送性能和穩定性。建議保持默認值16384。單位:字節。linger.ms
: 每條消息在緩存中的最長時間。若超過這個時間,Producer客戶端就會忽略batch.size
的限制,立即把消息發往服務器。建議根據業務場景, 設置linger.ms
在100~1000之間。單位:毫秒。
因此,云消息隊列 Kafka 版Producer客戶端什么時候把消息批量發送至服務器是由batch.size
和linger.ms
共同決定的。您可以根據具體業務需求進行調整。為了提升發送的性能,保障服務的穩定性, 建議您設置batch.size=16384
和linger.ms=1000
。
黏性分區策略
只有發送到相同分區的消息,才會被放到同一個Batch中,因此決定一個Batch如何形成的一個因素是云消息隊列 Kafka 版Producer端設置的分區策略。云消息隊列 Kafka 版Producer允許通過設置Partitioner的實現類來選擇適合自己業務的分區。在消息指定Key的情況下,云消息隊列 Kafka 版Producer的默認策略是對消息的Key進行哈希,然后根據哈希結果選擇分區,保證相同Key的消息會發送到同一個分區。
在消息沒有指定Key的情況下,云消息隊列 Kafka 版2.4版本之前的默認策略是循環使用主題的所有分區,將消息以輪詢的方式發送到每一個分區上。但是,這種默認策略Batch的效果會比較差,在實際使用中,可能會產生大量的小Batch,從而使得實際的延遲增加。鑒于該默認策略對無Key消息的分區效率低問題,云消息隊列 Kafka 版在2.4版本引入了黏性分區策略(Sticky Partitioning Strategy)。
黏性分區策略主要解決無Key消息分散到不同分區,造成小Batch問題。其主要策略是如果一個分區的Batch完成后,就隨機選擇另一個分區,然后后續的消息盡可能地使用該分區。這種策略在短時間內看,會將消息發送到同一個分區,如果拉長整個運行時間,消息還是可以均勻地發布到各個分區上的。這樣可以避免消息出現分區傾斜,同時還可以降低延遲,提升服務整體性能。
如果您使用的云消息隊列 Kafka 版Producer客戶端是2.4及以上版本,默認的分區策略就采用黏性分區策略。如果您使用的Producer客戶端版本小于2.4,可以根據黏性分區策略原理,自行實現分區策略,然后通過參數partitioner.class
設置指定的分區策略。
關于黏性分區策略實現,您可以參考如下Java版代碼實現。該代碼的實現邏輯主要是根據一定的時間間隔,切換一次分區。
public class MyStickyPartitioner implements Partitioner {
// 記錄上一次切換分區時間。
private long lastPartitionChangeTimeMillis = 0L;
// 記錄當前分區。
private int currentPartition = -1;
// 分區切換時間間隔,可以根據實際業務選擇切換分區的時間間隔。
private long partitionChangeTimeGap = 100L;
public void configure(Map<String, ?> configs) {}
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 獲取所有分區信息。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
int availablePartitionSize = availablePartitions.size();
// 判斷當前可用分區。
if (availablePartitionSize > 0) {
handlePartitionChange(availablePartitionSize);
return availablePartitions.get(currentPartition).partition();
} else {
handlePartitionChange(numPartitions);
return currentPartition;
}
} else {
// 對于有key的消息,根據key的哈希值選擇分區。
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private void handlePartitionChange(int partitionNum) {
long currentTimeMillis = System.currentTimeMillis();
// 如果超過分區切換時間間隔,則切換下一個分區,否則還是選擇之前的分區。
if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
|| currentPartition < 0 || currentPartition >= partitionNum) {
lastPartitionChangeTimeMillis = currentTimeMillis;
currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
}
}
public void close() {}
}
OOM
結合云消息隊列 Kafka 版的Batch設計思路,云消息隊列 Kafka 版會緩存消息并打包發送,如果緩存太多,則有可能造成OOM(Out of Memory)。
buffer.memory
: 發送的內存池大小。如果內存池設置過小,則有可能導致申請內存耗時過長,從而影響發送性能,甚至導致發送超時。建議buffer.memory ≧ batch.size * 分區數 * 2
。單位:字節。buffer.memory
的默認數值是32 MB,對于單個Producer而言,可以保證足夠的性能。重要如果您在同一個JVM中啟動多個Producer,那么每個Producer都有可能占用32 MB緩存空間,此時便有可能觸發OOM。
在生產時,一般沒有必要啟動多個Producer;如有特殊情況需要,則需要考慮
buffer.memory
的大小,避免觸發OOM。
分區順序
單個分區(Partition)內,消息是按照發送順序儲存的,是基本有序的。
默認情況下,云消息隊列 Kafka 版為了提升可用性,并不保證單個分區內絕對有序,在升級或者宕機時,會發生少量消息亂序(某個分區掛掉后把消息Failover到其它分區)。
如果業務要求分區保證嚴格有序,請在創建Topic時選擇使用Local存儲。