博客 / 詳情

返回

記 Kafka Consumer 消息阻塞(2)

前言

這次是繼 《記 Kafka Consumer 消息阻塞(1)》 之後,其實應該是放在同一篇文章裏面。但因為是新問題,就再加一篇文章。

還是繼那篇文章,提出要調大 max.partition.fetch.bytesmessage.max.bytes 的參數值。但是不能調太大,調太大之後,同樣帶來新的問題。

本次就是新問題。再調大10倍後,消費能力下降了不止100倍。

通過消費的監控圖來看,不是不消費,而是隔將近半小時才消費一次。

原以為這兩個參數只是名義上只限制上限,不會影響實際值,但大錯特錯。

1. 場景參數

下面我們模擬場景吧,設定的條件:

  • 每條消息大小:1 KB
  • max.poll.records = 100
    → 每次 poll() 返回給應用線程的記錄數最多 100 條(即約 100 KB)。
  • max.partition.fetch.bytes = 50 MB
    → 單個分區一次 fetch 請求最多拉取 50 MB 數據。
  • fetch.max.bytes = 100 MB
    → 一次 fetch 請求總返回數據上限為 100 MB(所有分區合計)。

假設:

  • 消費者訂閲了多個分區,比如 3 個分區。
  • 每個分區上有大量可消費數據(遠超過 50 MB)。
  • 網絡和內存都足夠大,不會限制拉取。

2. Kafka 拉取數據的兩個階段

理解這個過程的關鍵是分清拉取階段應用消費階段

階段 A:消費者從 broker 拉取到本地緩衝區

  • 消費者後台線程(Fetcher)會週期性向 broker 發送 FetchRequest
  • Broker 按你的參數限制:

    • 每個分區不超過 50 MB(max.partition.fetch.bytes)。
    • 所有分區總和不超過 100 MB(fetch.max.bytes)。
  • 在你的場景:

    • 如果訂閲了 3 個分區,Broker 可能會返回:

      • P1: 50 MB
      • P2: 50 MB
      • P3: 不返回(因為總量已經到 100 MB)
    • 一次網絡包大小 ≈ 100 MB(受 fetch.max.bytes 限制)。
  • 這些數據會被放到消費者端的內部緩衝區(fetch buffer),等待應用線程消費。
注意:這個階段與 max.poll.records 無關,因為 max.poll.records 是應用線程從緩衝區取數據的限制,不影響後台拉取量。

階段 B:應用線程從緩衝區取數據

  • 當你調用 poll() 方法時,Kafka 消費者會從緩衝區中取出消息。
  • max.poll.records = 100 意味着一次 poll() 最多返回 100 條記錄(100 KB)。
  • 即使緩衝區中已經有 100 MB 數據,應用線程一次也只會拿 100 KB。
  • 剩下的數據會繼續留在緩衝區,等下一次 poll() 再取。

3. 完整時序

我們按時間順序看一次拉取和消費的過程:

  1. 後台線程發送 Fetch 請求

    • 請求分區 P1、P2、P3 的數據。
    • 告訴 broker:單分區最多 50 MB,總量最多 100 MB。
  2. Broker 返回數據

    • P1: 50 MB
    • P2: 50 MB
    • 總量達到 100 MB,P3 暫時不返回。
    • 數據通過網絡傳輸到消費者端。
  3. 數據進入消費者緩衝區

    • 內部緩衝區現在有 100 MB 數據。
  4. 應用線程調用 poll()

    • 從緩衝區取出 100 條記錄(每條 1 KB) → 共 100 KB。
    • 返回給你的業務代碼處理。
  5. 緩衝區剩餘數據

    • 還剩下 100 MB - 100 KB ≈ 99.9 MB 數據在緩衝區中。
    • 下次 poll() 會繼續從剩餘數據中取,不會再立即拉取新的數據(除非緩衝區不足)。
  6. 循環進行

    • 當緩衝區數據消耗到一定程度,後台線程會再次向 broker 拉取數據,填充到緩衝區。

4. 數據量總結

在你的場景中:

階段 數據量 控制參數
一次從 broker 拉取到緩衝區 ≤ 100 MB(總量受 fetch.max.bytes 限制,單分區 ≤ 50 MB) fetch.max.bytesmax.partition.fetch.bytes
一次 poll() 返回給應用線程 ≤ 100 KB(100 條 × 1 KB) max.poll.records

關鍵點

  • 拉取量消費量是兩個不同的概念。
  • 拉取量受 fetch.max.bytesmax.partition.fetch.bytes 控制。
  • 消費量受 max.poll.records 控制。
  • 如果拉取量遠大於消費量,緩衝區可能長期積壓數據,佔用大量內存。

5. 額外注意

  • 如果你的 max.poll.records 很小,而拉取量很大,緩衝區會一直積壓數據,可能導致延遲消費。
  • 如果消費者的處理速度慢,fetch.max.bytes 設置太大,可能會導致內存壓力。
  • Kafka 內部還有一個參數 max.poll.interval.ms,如果 poll 間隔太久,消費者會被認為掛掉,觸發 rebalance。
可視化流程圖(簡化版)
[Broker]
   ↑ Fetch Response (<= fetch.max.bytes)
   |   ├─ P1: <= max.partition.fetch.bytes
   |   ├─ P2: <= max.partition.fetch.bytes
   |   └─ ...
[Consumer 內部緩衝區]
   ↓ poll() (<= max.poll.records 條)
[應用線程處理]
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.