动态

详情 返回 返回

記 Kafka Consumer 消息阻塞 - 动态 详情

事件概述

最近在接入一個新的埋點 Kafka Topic 後,遇到一個非常隱蔽的問題:

  • 新 Topic 一直消費不到消息;
  • 在公司 MQ 平台上查看 Topic,查不到消費組的註冊信息;
  • 日誌平台沒有任何 Error 日誌。

消費端的邏輯是典型的主動拉取模式,由異步線程循環執行:

  1. poll() 拉取一批消息;
  2. 事務處理(讀取消息、執行腳本、寫入存儲);
  3. commit 提交偏移量。

初步懷疑過:

  • Consumer 沒註冊成功? 檢查 Bean 加載正常;
  • 新 Kafka 集羣地址網絡不通? Ping 正常;
  • 事務處理異常? 加註釋排查,不是。

最終通過代碼跟蹤定位到 org.apache.kafka.clients.consumer.KafkaConsumer#poll,第一步就拋了異常,但由於是 UncheckedException,沒有被業務代碼 catch 打印,所以表面上看不到錯誤。

異常內容:

org.apache.kafka.common.errors.RecordTooLargeException: 
There are some messages at [Partition=Offset]: {new_bury_point_full_data_topic-3=1912557955} 
whose size is larger than the fetch size 1048576 and hence cannot be ever returned. 
Increase the fetch size, or decrease the maximum message size the broker will allow.

問題原因:拉取的消息內容太大,超過了 1048576(1MB)的上限。

搜索配置發現,這個值並沒有顯式設置,是 Kafka Consumer 端 max.partition.fetch.bytes 的默認值。

這類問題的隱蔽性在於:

  • Kafka 在 Producer、Broker、Consumer 三端都有消息大小限制;
  • RocketMQ 只有 Broker 端有限制;
  • 在 Kafka 消費端,如果批次中有一條消息超過限制,整個拉取會失敗並卡在當前 offset,後續消息無法消費;
  • 長期無法提交 offset,MQ 平台上就看不到消費組“在線”。

接下來,我們深入講解 Kafka 與 RocketMQ 在消息大小限制上的機制。

Kafka Consumer 參數

1. max.poll.records

  • 作用
    控制 poll() 方法每次調用 從 Kafka 返回的最大消息條數
  • 單位:條(record 條數)
  • 默認值500
  • 影響

    • 限制了單次批量處理的消息數量,防止一次拉取太多數據導致內存壓力過大或處理延遲過長。
    • 如果值太大,可能導致處理時間長,從而觸發 max.poll.interval.ms 超時,導致消費者被認為失效。
    • 如果值太小,會增加拉取次數,降低吞吐量。
  • 調優建議

    • 根據單條消息大小、處理速度和業務延遲要求來設置。
    • 如果處理速度較快,可以適當調大,提升吞吐量。
    • 如果單條消息很大或者處理很耗時,建議調小,避免一次 poll 拉取過多數據。

2. max.partition.fetch.bytes

  • 作用
    控制 每個分區 一次 fetch 請求中返回的最大字節數
  • 單位:字節(bytes)
  • 默認值1MB(1048576 字節)
  • 影響

    • 這是按分區的限制。例如一個消費者訂閲了 3 個分區,那麼一次 fetch 請求理論上最多可以返回 3 × max.partition.fetch.bytes 的數據。
    • 如果消息很大(單條消息接近或超過該值),需要保證 max.partition.fetch.bytes 大於等於 message.max.bytes(Broker 端)或 max.message.bytes(Topic 級別),否則消費者無法拉取該消息。
  • 調優建議

    • 如果單條消息比較大,需要調大該值。
    • 如果分區數多,調大該值會顯著增加一次 fetch 的數據量,需要考慮內存壓力。

3. fetch.max.bytes

  • 作用
    控制 一次 fetch 請求 從所有分區總共返回的最大字節數
  • 單位:字節(bytes)
  • 默認值50MB(52428800 字節)
  • 影響

    • 這是一個總量限制,即使 max.partition.fetch.bytes 允許每個分區返回更多數據,但總和不能超過 fetch.max.bytes
    • 如果分區很多,並且每個分區都接近 max.partition.fetch.bytes,可能會因為 fetch.max.bytes 限制而無法一次拉完所有數據。
  • 調優建議

    • 如果消費者訂閲的分區較多,並且每個分區消息量大,可以適當調大該值。
    • 但調太大可能會造成網絡和內存壓力。

三者關係總結

  • max.poll.records:限制的是 條數(處理批次大小)。
  • max.partition.fetch.bytes:限制的是 單分區單次請求的最大字節數。
  • fetch.max.bytes:限制的是 單次請求總字節數(所有分區加起來)。

Kafka 拉取數據的流程中,這幾個參數是層層限制的:

一次 poll() 能取到的數據量 
= min( max.poll.records 條數限制, 
       數據總字節數不超過 fetch.max.bytes, 
       每個分區的數據字節數不超過 max.partition.fetch.bytes )

調優建議示例

假設:

  • 消費者訂閲 5 個分區
  • 單條消息平均大小:10KB
  • 處理速度:500 條/秒
  • 希望單次 poll 處理 1 秒的數據量

調優:

  • max.poll.records = 500(一次 poll 拉 500 條)
  • 單個分區一次拉取的最大數據量:

    max.partition.fetch.bytes >= 10KB × (500 / 5分區) = 1MB
  • 總拉取量:

    fetch.max.bytes >= 5 × max.partition.fetch.bytes
    
    ## Kafka 各端限制
    
    好的,這個問題其實很關鍵,因為 Kafka 在 **Producer → Broker → Consumer** 整個鏈路上都有消息大小的限制,而且這幾個限制相互關聯,如果配置不一致會導致生產或消費失敗。我們來詳細拆解一下。
    
    ---
    
    ### **1. Kafka 消息大小限制的三層**
    
    Kafka 一條消息從 Producer 發出,到 Broker 存儲,再到 Consumer 拉取,中間會經過三個端的限制:
  • Producer 端限制
  • Broker 端限制
  • Consumer 端限制

    每一端都有自己的參數控制最大消息大小,必須保證三端的限制一致或相互匹配,否則會出錯。


    2. Producer 端限制

    核心參數

  • max.request.size

    • 作用:Producer 端單次請求(包含一批消息)的最大字節數。
    • 默認值1MB(1048576 字節)
    • 注意

      • 這是單次發送的總大小限制,不是單條消息的限制。如果是批量發送(batch),所有消息加起來不能超過這個值。
      • 如果單條消息就超過該值,會直接報錯(RecordTooLargeException)。
  • buffer.memory

    • 作用:Producer 端用於緩衝待發送消息的總內存大小。
    • 默認值32MB
    • 影響:如果發送速度大於網絡傳輸速度,緩衝區滿了會阻塞 send() 或拋異常。
  • batch.size

    • 作用:Producer 端按分區批量發送的最大字節數。
    • 默認值16KB
    • 影響:不是直接的消息大小限制,但會影響批量發送策略。

    調優建議

  • 如果要發送大消息(比如幾 MB),必須同時調大 max.request.size,並且保證 Broker 端允許的大小更大。
  • 如果批量消息總大小超過 max.request.size 會報錯,需要調大。


    3. Broker 端限制

    核心參數

  • message.max.bytes

    • 作用:Broker 端單條消息允許的最大字節數。
    • 默認值1MB
    • 注意

      • 如果 Producer 發送的消息超過這個值,Broker 會拒絕並返回錯誤。
      • 這是 Broker 層面全局的限制,可以在 server.properties 配置。
  • replica.fetch.max.bytes

    • 作用:Follower 副本從 Leader 拉取消息時,單次請求的最大字節數。
    • 默認值1MB
    • 注意

      • 必須 ≥ message.max.bytes,否則副本無法同步大消息,會導致 ISR 縮小甚至丟數據。
  • Topic 級別的限制

    • 可以在創建 Topic 時用 max.message.bytes 單獨設置該 Topic 的最大消息大小,優先級高於全局 message.max.bytes

    調優建議

  • 如果要支持大消息,必須保證:

    replica.fetch.max.bytes >= message.max.bytes
  • 如果有多個 Topic,可以按需調整 Topic 級別的 max.message.bytes


    4. Consumer 端限制

    核心參數

  • max.partition.fetch.bytes

    • 作用:Consumer 從單個分區一次 fetch 請求的最大字節數。
    • 默認值1MB
    • 注意

      • 必須 ≥ message.max.bytes,否則 Consumer 無法拉取該分區的大消息。
      • 如果一個分區有一條大消息超過這個值,消費會卡住(因為無法拉取該消息)。
  • fetch.max.bytes

    • 作用:Consumer 一次 fetch 請求從所有分區總共能拉取的最大字節數。
    • 默認值50MB
    • 注意

      • 必須保證該值 ≥ 訂閲分區數 × max.partition.fetch.bytes,否則可能一次拉不全數據。

    調優建議

  • 如果 Broker 存了大消息,Consumer 必須調大 max.partition.fetch.bytes
  • 如果分區多且每個分區消息都很大,fetch.max.bytes 也要跟着調大。


    5. 三端參數對應關係

    假設要發送最大 10MB 的單條消息:

    Producer:
      max.request.size >= 10MB
      buffer.memory >= 10MB(否則可能阻塞)
    Broker:
      message.max.bytes >= 10MB
      replica.fetch.max.bytes >= 10MB
    Consumer:
      max.partition.fetch.bytes >= 10MB
      fetch.max.bytes >= 分區數 × 10MB

    鏈路要求

    Producer.max.request.size ≤ Broker.message.max.bytes ≤ Consumer.max.partition.fetch.bytes

    並且:

    Broker.replica.fetch.max.bytes ≥ Broker.message.max.bytes

    6. 常見錯誤場景

  • Producer 發大消息失敗

    • 原因:max.request.size 小於消息大小。
    • 解決:調大 Producer 的 max.request.size
  • Broker 拒收消息

    • 原因:message.max.bytes 小於 Producer 消息大小。
    • 解決:調大 Broker 的 message.max.bytes,同時調大 replica.fetch.max.bytes
  • Consumer 卡住不消費

    • 原因:max.partition.fetch.bytes 小於 Broker 存儲的大消息。
    • 解決:調大 Consumer 的 max.partition.fetch.bytesfetch.max.bytes

    7. 總結表

    參數 默認值 作用 調優重點
    Producer max.request.size 1MB 單次請求最大字節數 ≥ 消息大小
    buffer.memory 32MB 緩衝區大小 ≥ 批量消息總大小
    Broker message.max.bytes 1MB 單條消息最大字節數 ≥ Producer 最大消息
    replica.fetch.max.bytes 1MB 副本拉取最大字節數 ≥ message.max.bytes
    Consumer max.partition.fetch.bytes 1MB 單分區拉取最大字節數 ≥ message.max.bytes
    fetch.max.bytes 50MB 總拉取最大字節數 ≥ 分區數 × 單分區限制

    RocketMQ 各端

    好的,我們來講一下 RocketMQProducer → Broker → Consumer 三端的消息大小限制,以及它和 Kafka 的區別。

    RocketMQ 的消息大小限制機制比 Kafka 簡單一些,但也有幾個關鍵點需要注意,尤其是在大消息場景下。


    1. RocketMQ 消息鏈路回顧

    RocketMQ 消息流轉過程:

    Producer → Broker(CommitLog 存儲) → Consumer

    在這條鏈路上,每一端都有自己的限制參數或機制來控制單條消息大小。


    2. Producer 端限制

    核心參數

  • maxMessageSize

    • 作用:Producer 允許發送的單條消息的最大字節數。
    • 默認值4MB(4194304 字節)
    • 位置org.apache.rocketmq.client.producer.DefaultMQProducer
    • 注意

      • 這是單條消息(Message 對象)的限制,不是批量消息總大小。
      • 如果消息超過這個大小,Producer 會直接拋出 MESSAGE_SIZE_EXCEEDED 異常。
      • 如果是批量消息(send(List<Message>)),會按總大小來檢查。
  • 批量消息發送限制

    • RocketMQ 會在發送批量消息時檢查所有消息總大小不能超過 maxMessageSize
    • 這和 Kafka 的 max.request.size 類似。

    調優建議

  • 如果需要發送大消息,必須在 Producer 端調大 maxMessageSize,同時 Broker 端也要允許更大的消息。
  • RocketMQ 官方不建議發送特別大的消息(>4MB),推薦用 分片 + 合併 或者 外部存儲 + 存儲引用


    3. Broker 端限制

    核心參數

  • maxMessageSize

    • 作用:Broker 允許存儲的單條消息的最大字節數。
    • 默認值4MB
    • 位置broker.conf
    • 注意

      • 必須 ≥ Producer.maxMessageSize,否則 Broker 會拒收超大消息。
      • Broker 存儲層(CommitLog)會按照這個限制來寫入數據。
  • 網絡傳輸限制

    • RocketMQ 使用 Netty 傳輸消息,在 Broker 和 Producer/Consumer 之間傳輸的消息大小受 maxMessageSize 控制。
    • 如果超過這個值,網絡層也會拒絕。

    調優建議

  • 如果要支持大消息,Broker 的 maxMessageSize 必須調大,並且和 Producer 保持一致。
  • 調大消息大小會增加磁盤 IO 壓力和網絡延遲,要評估性能影響。


    4. Consumer 端限制

    RocketMQ 的 Consumer 端沒有像 Kafka 那樣的複雜字節數限制參數,它主要依賴 Broker 的限制來保證消息大小不會超過 Consumer 能處理的範圍。但也有一些隱性限制:

  • 拉取消息的批量大小

    • 參數pullBatchSize
    • 默認值32(條數)
    • 作用:一次拉取的最大消息條數。
    • 注意:如果單條消息很大,批量拉取多條可能導致內存壓力。
  • 一次拉取的最大字節數

    • 參數pullThresholdSizeForQueue
    • 默認值100MB
    • 作用:針對單個隊列一次拉取的總字節數限制(RocketMQ 4.9+)。
    • 注意:這個值必須 ≥ 單條最大消息大小,否則會卡住。

    調優建議

  • 如果 Broker 允許大消息,Consumer 端要保證 pullThresholdSizeForQueue 足夠大。
  • 批量拉取時要結合單條消息大小調整 pullBatchSize,防止一次拉取太多佔用過多內存。


    5. 三端參數對應關係

    假設要發送最大 10MB 的單條消息:

    Producer:
      maxMessageSize >= 10MB
    Broker:
      maxMessageSize >= 10MB
    Consumer:
      pullThresholdSizeForQueue >= 10MB

    鏈路要求


---

### **6. 常見錯誤場景**

1. **Producer 發送大消息失敗**
   - 原因:Producer `maxMessageSize` 小於消息大小。
   - 解決:調大 Producer 的 `maxMessageSize`。

2. **Broker 拒收消息**
   - 原因:Broker `maxMessageSize` 小於 Producer 消息大小。
   - 解決:調大 Broker 的 `maxMessageSize`。

3. **Consumer 內存溢出**
   - 原因:一次拉取太多大消息,`pullBatchSize` 和 `pullThresholdSizeForQueue` 沒有控制好。
   - 解決:調小批量條數,或調大總字節數限制。

---

### **7. RocketMQ 與 Kafka 消息大小限制的區別**

| 對比點       | Kafka                                                   | RocketMQ                                    |
|--------------|---------------------------------------------------------|---------------------------------------------|
| Producer 限制 | `max.request.size`(總請求字節數)                      | `maxMessageSize`(單條消息字節數)          |
| Broker 限制   | `message.max.bytes`(單條消息字節數)                   | `maxMessageSize`(單條消息字節數)          |
| Consumer 限制 | `max.partition.fetch.bytes` / `fetch.max.bytes`         | `pullThresholdSizeForQueue`(總字節數)     |
| 默認大小      | 1MB                                                     | 4MB                                         |
| 配置複雜度    | 多參數,Producer/Broker/Consumer 端都需匹配              | 參數少,但 Producer/Broker 必須一致         |
| 大消息處理建議| 分片、壓縮、外部存儲                                    | 分片、壓縮、外部存儲                        |

Add a new 评论

Some HTML is okay.