動態

詳情 返回 返回

TDMQ CKafka 版客户端實戰指南系列之一:生產最佳實踐 - 動態 詳情

導語

在當今數字化時代,數據的產生和流動呈爆發式增長,消息隊列作為一種高效的數據傳輸和處理工具,在各種應用場景中發揮着關鍵作用。TDMQ CKafka 版作為一款分佈式、高吞吐量、高可擴展性的消息系統,100% 兼容開源 Kafka API 2.4、2.8、3.2 版本 ,基於發佈 / 訂閲模式,通過消息解耦,使生產者和消費者異步交互,無需彼此等待。憑藉高可用、數據壓縮、同時支持離線和實時數據處理等優點,TDMQ CKafka 版廣泛應用於日誌壓縮收集、監控數據聚合、流式數據集成等場景。

對於開發者而言,深入瞭解並熟練掌握 TDMQ CKafka 版的生產消費實踐至關重要。它不僅能夠幫助我們構建高效、穩定的數據傳輸和處理系統,還能在面對海量數據時,確保系統的性能和可靠性。本文將詳細介紹 TDMQ CKafka 版的生產實踐教程,包括生產消息的各個環節以及相關的參數配置和最佳實踐,希望能為大家在實際項目中應用 TDMQ CKafka 版提供有益的參考和指導。

生產篇:步步為營,高效生產

Topic 使用與創建

配置要求:推薦節點的整倍數副本,減少數據傾斜問題,同步複製最小同步副本數為2,且同步副本數不能等於 Topic 副本數,否則宕機1個副本會導致無法生產消息。

創建方式:支持選擇是否開啓 CKafka 自動創建 Topic 的開關。選擇開啓後,表示生產或消費一個未創建的 Topic 時,會自動創建一個默認值包含3個分區和2個副本的 Topic,控制枱支持修改默認值。

分區數估計

分區數的準確估算能實現數據的均衡分佈。為了達到這個目的,分區數建議為節點數的整倍數。同時,還需結合預估流量來設置,按照 10MB/s 一個分區的標準來計算。例如,若一個 Topic 的預估吞吐為 100MB/s,那麼建議設置分區數為 10。這樣可以確保在高流量情況下,消息能均勻地分佈在各個分區,避免某個分區負載過高。

失敗重試

在分佈式環境中,由於網絡等原因,消息發送偶爾會失敗,其原因可能是消息已經發送成功但是 ACK 機制失敗或者是消息確實沒有發送成功,這就需要設置合理的重試策略,您可以根據業務需求,設置以下重試參數:

  • Retries:用於設置重試次數,默認值為 3。重試不成功會觸發報錯,如果客户不接受消息丟失,建議改重試次數或者手動重試。
  • Retry.backoff.ms:設置重試間隔,建議設置為 1000。這個間隔時間可以讓生產者在重試前等待一段時間,避免在短時間內頻繁重試。

這樣將能應對 Broker 的 Leader 分區出現無法立刻響應 Producer 請求的情況。

異步發送

消息發送接口通常是異步的,這意味着生產者在發送消息後不需要等待消息被完全處理就可以繼續執行其他任務。如果想要接收發送的結果,可以使用 Send 方法中的 Callback 接口獲取發送結果。

一個 Producer 對應一個應用

Producer 是線程安全的,且可以往任何 Topic 發送消息。通常情況下,建議一個應用對應一個 Producer。

Acks

Kafka 的 ACK 機制,指 Producer 的消息發送確認機制,Acks 參數決定了生產者在發送消息後等待服務端響應的方式,對 Kafka 集羣的吞吐量和消息可靠性有直接影響。

Acks 的參數説明如下:

  • Acks=0 時,當生產者採用無確認機制時,消息發送後無需等待任何 Broker 節點的響應即可繼續執行,這種模式可獲得最高的吞吐性能,但因缺乏寫入保障機制,存在較高的數據丟失風險;
  • Acks=1 時,採用主節點單確認機制時,生產者僅需等待 Leader 副本完成消息寫入即會收到確認響應。該模式在性能與可靠性間取得平衡,但需注意:若 Leader 節點在同步完成前發生故障,已發送但未同步的消息存在部分丟失的可能性;
  • Acks=all 時,啓用全副本確認機制時,生產者必須等待 Leader 副本及所有同步副本(ISR 集合)均完成消息持久化後才會收到確認。雖然該模式通過多重冗餘保障實現了最高級別的數據安全性(僅當整個 ISR 集羣同時失效時才會丟失數據),但跨節點同步帶來的延遲使其吞吐性能相對較低。

一般建議選擇 Acks=1,對於重要的服務可以設置 Acks=all 。

Batch

通常情況下,TDMQ CKafka 版的 Topic 會設置多個分區。Producer 客户端向服務端發送消息時,要先明確消息要發送到哪個 Topic 的哪個分區。當向同一分區發送多條消息時,Producer 客户端會把這些消息整合成一個 Batch,批量發送至服務端。不過,Producer 客户端在處理 Batch 時會產生額外開銷。一般來説,小 Batch 會使 Producer 客户端產生大量請求,致使請求在客户端和服務端堆積排隊,還會提高相關機器的 CPU 使用率,進而整體抬高消息發送和消費的延遲。而設置一個合適的 Batch 大小,能減少客户端向服務端發送消息時的請求次數,從整體上提升消息發送的吞吐量。

以下是 Batch 相關參數的説明:

  • Batch.size:這是發往每個分區的消息緩存量閾值,當緩存的消息量達到這個設定值時,就會觸發一次網絡請求,隨後 Producer 客户端會把消息批量發送到服務器。
  • Linger.ms:它規定了每條消息在緩存中的最長停留時間,如果消息在緩存中的時間超過這個值,Producer 客户端就會不再遵循 Batch.size 的限制,直接把消息發送到服務器。
  • Buffer.memory:當所有緩存消息的總體大小超過這個數值時,就會觸發消息發送到服務器的操作,此時會忽略 Batch.size 和 Linger.ms 的限制。Buffer.memory 的默認值是 32MB,對於單個 Producer 而言,這個數值足以保障其性能。

Key 和 Value

消息隊列中的消息有 Key(消息標識)和 Value(消息內容)兩個字段。為消息設置一個唯一的 Key 便於追蹤消息,通過打印發送日誌和消費日誌,就能瞭解該消息的生產和消費情況。比如在電商訂單系統中,將訂單號作為 Key,就可以輕鬆追蹤訂單消息的流轉過程。如果消息發送量較大,建議不要設置 Key,並使用黏性分區策略。

黏性分區

在消息隊列 Kafka 中,只有被髮送至同一分區的消息才會被歸入同一個 Batch,所以 Kafka Producer 端配置的分區策略是影響 Batch 形成的關鍵因素之一。Kafka Producer 支持用户通過自定義 Partitioner 實現類,來契合業務需求選擇合適的分區方式。

當消息指定了 Key 時,Kafka Producer 默認會先對消息的 Key 進行哈希計算,再依據哈希結果選定分區,以此確保相同 Key 的消息能夠發送到同一分區。

當消息未指定 Key 時,在 Kafka 2.4 版本之前,其默認分區策略是按順序循環遍歷主題下的所有分區,以輪詢形式把消息依次發送到各分區。不過,這種默認策略在 Batch 聚合方面表現不佳,實際應用中容易生成大量小 Batch,進而導致消息處理的實際延遲上升。為改善無 Key 消息分區效率低的問題,Kafka 在 2.4 版本推出了黏性分區策略(Sticky Partitioning Strategy)。

黏性分區策略重點針對無 Key 消息被分散到不同分區、進而產生眾多小 Batch 的問題。它的核心機制是,當某個分區的 Batch 處理完畢,會隨機挑選另一個分區,之後儘可能讓後續消息都發送到這個新選定的分區。從短期視角看,消息會集中發送到同一分區;但從長期運行來看,消息仍能均勻分佈到各個分區。如此一來,既避免了消息在分區上分佈不均(分區傾斜),又能降低延遲,提升整個服務的性能。

要是你用的 Kafka Producer 客户端版本是 2.4 及以上,那默認就會採用黏性分區策略。要是客户端版本低於 2.4,你可以依據黏性分區策略的原理,自己動手實現分區策略,再通過參數 Partitioner.class 來指定所設置的分區策略。

關於黏性分區策略的實現,下面給出了 Java 版的代碼示例,其核心邏輯是按照一定時間間隔來切換分區。

public class MyStickyPartitioner implements Partitioner {
    // 記錄上一次切換分區時間。
    private long lastPartitionChangeTimeMillis = 0L;
    // 記錄當前分區。
    private int currentPartition = -1;
    // 分區切換時間間隔,可以根據實際業務選擇切換分區的時間間隔。
    private long partitionChangeTimeGap = 100L;
    public void configure(Map<String, ?> configs) {}
    /**      * Compute the partition for the given record.      *
     * @param topic The topic name      * @param key The key to partition on (or null if no key)      * @param keyBytes serialized key to partition on (or null if no key)      * @param value The value to partition on or null      * @param valueBytes serialized value to partition on or null      * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 獲取所有分區信息。
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            int availablePartitionSize = availablePartitions.size();
            // 判斷當前可用分區。
            if (availablePartitionSize > 0) {
                handlePartitionChange(availablePartitionSize);
                return availablePartitions.get(currentPartition).partition();
            } else {
                handlePartitionChange(numPartitions);
                return currentPartition;
            }
        } else {
            // 對於有key的消息,根據key的哈希值選擇分區。
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
    private void handlePartitionChange(int partitionNum) {
        long currentTimeMillis = System.currentTimeMillis();
        // 如果超過分區切換時間間隔,則切換下一個分區,否則還是選擇之前的分區。
        if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
            || currentPartition < 0 || currentPartition >= partitionNum) {
            lastPartitionChangeTimeMillis = currentTimeMillis;
            currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
        }
    }
    public void close() {}
}

分區順序

單個分區(Partition)內,消息是按照發送順序儲存的,是基本有序的。每個主題下面都有若干分區,如果消息被分配到不同的分區中,不同 Partition 之間不能保證順序。

如果需要消息具有消費順序性,可以在生產端指定這一類消息的 Key,這類消息都用相同的 Key 進行消息發送,CKafka 就會根據 Key 哈希取模選取其中一個分區進行存儲,由於一個分區只能由一個消費者進行監聽消費,此時消息就具有消息消費的順序性了。

TDMQ CKafka 版順序消息場景實踐教程

順序消息場景

在 TDMQ CKafka 版中,確保消息順序性的主要手段依賴於其分區(Partition)設計以及消息 Key 的使用。客户端所涉及的順序消息使用場景可分為兩類:一是全局順序場景,二是分區順序場景。針對這兩種場景,CKafka 的實踐教程如下:

  1. 全局順序:為保證全局順序,在 CKafka 控制枱,您需設置 Topic 分區為1,副本數可以根據具體使用場景和可用性要求以及平衡成本指定,建議設置為2。

    全局順序由於單分區存在吞吐上限,因此整體吞吐不會太高,單分區吞吐指標請參見:https://cloud.tencent.com/document/product/597/52489。

  2. 分區順序:為保證分區順序,您可以根據預估 Topic 的業務流量,除以單分區流量,取整後獲得分區數,同時為避免數據傾斜,分區數儘量向節點整倍數取整,從而確定最終合理的分區數。單分區的吞吐量可參見:https://cloud.tencent.com/document/product/597/52489。

    在發送 CKafka 消息時候,需要指定 Key,CKafka 會根據 Key 計算出一個哈希值,確保具有相同 Key 的消息會被髮送到同一個分區,從而確保這些消息在分區內部是有序的。同時建議儘可能讓業務 Key 分散,如果生產消息都指定同一個 Key,那麼分區順序會退化為全局順序,從而降低整體的寫入吞吐。

參數實踐教程

由於順序消息,要求消息有序、不重複,默認的 Kafka 生產者發送參數當遇到網絡抖動、Kafka Broker 節點變化、分區 Leader 選舉等場景,容易出現消息重複、亂序問題,因此順序場景,必須對 Kafka 生產者參數進行特別設置,關鍵參數設置如下:

  • Enable.idempotence

Enable.idempotence 表示是否開啓冪等功能。順序場景建議開啓冪等功能,應對上述場景出現的分區消息亂序、消息重複等問題。建議 Kafka 的 Producer 設置:Enable.idempotence 為 True。需要注意,該功能要確保 Kafka 的 Broker 版本大於等於0.11,即 Kafka versions >= 0.11,同時:從 Kafka 3.0開始包括3.0,Kafka 的 Producer 默認 Enable.idempotence=True 和 Acks=All ,而對於 Kafka 版本>=0.11且 Kafka<3.0 的版本,默認是關閉冪等的,因此建議順序場景顯式指定該參數值確保開啓冪等。

  • Acks

在開啓冪等後,Acks 需要顯式指定為 All,如果不指定為 All 的話,則會無法通過參數校驗從而報錯。

  • Max.in.flight.requests.per.connection

默認情況下,Kafka 生產者會嘗試儘快發送記錄,Max.in.flight.requests.per.connection 表示一個 Connection 同時發送的最大請求數,默認值是5。Kafka 在 0.11 版本之後包括 0.11,小於 1.1 的版本,即(Kafka >= 0.11 & < 1.1),Kafka Broker 沒有針對該方面優化,需要設置

Max.in.flight.requests.per.connection 為1,在 Kafka>=1.1 後,針對冪等場景的吞吐進行優化,在 Broker 端會維持一個隊列對5個併發批次的消息的順序進行順序校驗,允許 Max.in.flight.requests.per.connection 設置5,但不能大於5。

因此建議:

  • Kafka >= 0.11 & < 1.1:顯式設置 Max.in.flight.requests.per.connection 為1。
  • Kafka>=1.1:顯式設置 Max.in.flight.requests.per.connection 可以為 1<=Max.in.flight.requests.per.connection<=5;建議設置為5。
  • Retries

在順序場景下,建議指定重試參數,Retries 在不同版本,有不同的默認行為,在 Kafka <= 2.0,默認為0;Kafka >= 2.1,默認為 Integer.MAX\_VALUE,即2147483647;建議順序場景,顯式設置為 Integer.MAX\_VALUE。

總結

在順序場景中,需要開啓的生產者參數示例如下:

Kafka >= 0.11 & < 1.1:

// create Producer properties
Properties properties = new Properties();
properties.setProperty("enable.idempotence", "true");
properties.setProperty("acks", "all");
properties.setProperty("max.in.flight.requests.per.connection", "1");
properties.setProperty("retries", Integer.toString(Integer.MAX_VALUE));

Kafka>=1.1:

// create Producer properties
Properties properties = new Properties();
properties.setProperty("enable.idempotence", "true");
properties.setProperty("acks", "all");
properties.setProperty("max.in.flight.requests.per.connection", "5");
properties.setProperty("retries", Integer.toString(Integer.MAX_VALUE));

數據傾斜

Kafka Broker 數據傾斜問題通常是由於分區分佈不均勻或者生產者發送數據的 Key 分佈不均勻導致的,會引發幾類問題:

  1. 整體流量沒有限流,但是節點局部限流;
  2. 某些節點負載過大,導致整體 Kafka 使用率不高,影響整體吞吐。

針對該類問題可以通過以下方式進行優化:

  1. 使用合理分區數,分區數保障為節點數的整倍數。
  2. 合理的分區策略,例如:RoundRobin(輪詢)、Range(範圍)和 Sticky(粘性)或者自定義的分區策略,均衡發送消息。
  3. 查是否使用 Key 進行發送,如果使用了 Key 進行發送,儘量設計策略讓 Key 更加分區均衡。

總結

在消息隊列的消息生產環節中,“高效” 不僅是吞吐的追求,更是穩定性與可靠性的平衡。無論是 Topic 的副本與分區設計、重試策略的精細調優,還是順序消息的場景化實現,每一個配置細節都可能影響集羣的整體性能。本文圍繞 TDMQ CKafka 版的生產實踐,詳解如何通過合理的參數設置與策略選擇,構建高可靠、低延遲的消息生產鏈路,避免數據傾斜、消息亂序等典型問題,為業務流量的平穩流轉奠定基礎。下一篇,我們將會為大家詳細介紹 TDMQ CKafka 版的消費實踐,敬請期待!

user avatar hppyvyv6 頭像 seven97_top 頭像 automq 頭像
點贊 3 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.