前言
這次是繼 《記 Kafka Consumer 消息阻塞(1)》 之後,其實應該是放在同一篇文章裏面。但因為是新問題,就再加一篇文章。
還是繼那篇文章,提出要調大 max.partition.fetch.bytes、message.max.bytes 的參數值。但是不能調太大,調太大之後,同樣帶來新的問題。
本次就是新問題。再調大10倍後,消費能力下降了不止100倍。
通過消費的監控圖來看,不是不消費,而是隔將近半小時才消費一次。
原以為這兩個參數只是名義上只限制上限,不會影響實際值,但大錯特錯。
1. 場景參數
下面我們模擬場景吧,設定的條件:
- 每條消息大小:1 KB
- max.poll.records = 100
→ 每次poll()返回給應用線程的記錄數最多 100 條(即約 100 KB)。 - max.partition.fetch.bytes = 50 MB
→ 單個分區一次 fetch 請求最多拉取 50 MB 數據。 - fetch.max.bytes = 100 MB
→ 一次 fetch 請求總返回數據上限為 100 MB(所有分區合計)。
假設:
- 消費者訂閲了多個分區,比如 3 個分區。
- 每個分區上有大量可消費數據(遠超過 50 MB)。
- 網絡和內存都足夠大,不會限制拉取。
2. Kafka 拉取數據的兩個階段
理解這個過程的關鍵是分清拉取階段和應用消費階段:
階段 A:消費者從 broker 拉取到本地緩衝區
- 消費者後台線程(Fetcher)會週期性向 broker 發送
FetchRequest。 -
Broker 按你的參數限制:
- 每個分區不超過 50 MB(
max.partition.fetch.bytes)。 - 所有分區總和不超過 100 MB(
fetch.max.bytes)。
- 每個分區不超過 50 MB(
-
在你的場景:
-
如果訂閲了 3 個分區,Broker 可能會返回:
- P1: 50 MB
- P2: 50 MB
- P3: 不返回(因為總量已經到 100 MB)
- 一次網絡包大小 ≈ 100 MB(受
fetch.max.bytes限制)。
-
- 這些數據會被放到消費者端的內部緩衝區(fetch buffer),等待應用線程消費。
注意:這個階段與max.poll.records無關,因為max.poll.records是應用線程從緩衝區取數據的限制,不影響後台拉取量。
階段 B:應用線程從緩衝區取數據
- 當你調用
poll()方法時,Kafka 消費者會從緩衝區中取出消息。 - max.poll.records = 100 意味着一次
poll()最多返回 100 條記錄(100 KB)。 - 即使緩衝區中已經有 100 MB 數據,應用線程一次也只會拿 100 KB。
- 剩下的數據會繼續留在緩衝區,等下一次
poll()再取。
3. 完整時序
我們按時間順序看一次拉取和消費的過程:
-
後台線程發送 Fetch 請求
- 請求分區 P1、P2、P3 的數據。
- 告訴 broker:單分區最多 50 MB,總量最多 100 MB。
-
Broker 返回數據
- P1: 50 MB
- P2: 50 MB
- 總量達到 100 MB,P3 暫時不返回。
- 數據通過網絡傳輸到消費者端。
-
數據進入消費者緩衝區
- 內部緩衝區現在有 100 MB 數據。
-
應用線程調用 poll()
- 從緩衝區取出 100 條記錄(每條 1 KB) → 共 100 KB。
- 返回給你的業務代碼處理。
-
緩衝區剩餘數據
- 還剩下 100 MB - 100 KB ≈ 99.9 MB 數據在緩衝區中。
- 下次
poll()會繼續從剩餘數據中取,不會再立即拉取新的數據(除非緩衝區不足)。
-
循環進行
- 當緩衝區數據消耗到一定程度,後台線程會再次向 broker 拉取數據,填充到緩衝區。
4. 數據量總結
在你的場景中:
| 階段 | 數據量 | 控制參數 |
|---|---|---|
| 一次從 broker 拉取到緩衝區 | ≤ 100 MB(總量受 fetch.max.bytes 限制,單分區 ≤ 50 MB) |
fetch.max.bytes、max.partition.fetch.bytes |
| 一次 poll() 返回給應用線程 | ≤ 100 KB(100 條 × 1 KB) | max.poll.records |
關鍵點:
- 拉取量和消費量是兩個不同的概念。
- 拉取量受
fetch.max.bytes和max.partition.fetch.bytes控制。 - 消費量受
max.poll.records控制。 - 如果拉取量遠大於消費量,緩衝區可能長期積壓數據,佔用大量內存。
5. 額外注意
- 如果你的
max.poll.records很小,而拉取量很大,緩衝區會一直積壓數據,可能導致延遲消費。 - 如果消費者的處理速度慢,
fetch.max.bytes設置太大,可能會導致內存壓力。 - Kafka 內部還有一個參數
max.poll.interval.ms,如果 poll 間隔太久,消費者會被認為掛掉,觸發 rebalance。
可視化流程圖(簡化版)
[Broker]
↑ Fetch Response (<= fetch.max.bytes)
| ├─ P1: <= max.partition.fetch.bytes
| ├─ P2: <= max.partition.fetch.bytes
| └─ ...
[Consumer 內部緩衝區]
↓ poll() (<= max.poll.records 條)
[應用線程處理]