Kafka4.x配置詳解
- server.properties
- broker.properties
- consumer.properties
- producer.properties
server.properties
- broker.id 在4.x的時候就已經移除掉了,採用node.id
# 服務器的角色,設置此項表示啓用 KRaft 模式。
# Kafka 4.x 已刪除 ZooKeeper,Kafka 進程可以具有兩種角色:
## broker:正常的數據、topic、分區服務
## controller:管理集羣元數據(替代 ZooKeeper)
# 注意:如果是多節點集羣至少需要 1 個 controller 節點:process.roles=broker,controller
process.roles=broker
# KRaft 模式下,node.id 替代了舊的 broker.id。需保證唯一
node.id=2
# 這定義了 controller quorum(類似 ZooKeeper 的職責)節點地址。
## 但 broker-only 節點只需要這個簡單配置,實際 controller 節點必須使用
## controller.listener.names=CONTROLLER
# 集羣部署中必須配置多個 controller 節點
## controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093
controller.quorum.bootstrap.servers=localhost:9093
############################# Socket Server Settings #############################
# 指定 Socket 服務監聽的地址
# Kafka 為不同協議提供多個 Listener,例如:PLAINTEXT、SSL、SASL_SSL、CONTROLLER(KRaft 特有)
# 生產環境推薦使用:
## 1.內部 broker 通信:INTERNAL 2.外部客户端連接:CLIENT 3.Control plane:CONTROLLER
# 注意:如果主機名為0.0.0.0,那麼將綁定所以的網絡接口地址;若主機名為空,則綁定默認的網絡接口地址;如果端口地址小於1024,則必須使用root權限啓動Kafka(不建議端口小於1024).
# 推薦:應設置成內網 IP 或 0.0.0.0
listeners=PLAINTEXT://localhost:9092
# Broker 之間內部通信使用的 listener 名稱。
# 必須是 listeners= 中定義的一個 listener
inter.broker.listener.name=PLAINTEXT
# Broker 對外暴露給客户端的訪問地址。
# 對容器、K8s、生產環境非常關鍵,必須是客户端能訪問到的地址(不能用 localhost)
advertised.listeners=PLAINTEXT://localhost:9092
# 控制器使用的監聽列表。
## 注意:如果是純 broker 節點:不會用到 CONTROLLER;controller 節點必須配置 CONTROLLER 監聽端口;
controller.listener.names=CONTROLLER
# 監聽器與安全協議映射。
# 如果沒有為監聽器(listeners=localhost:9092)指定安全協議,則還需要額外配置以下的參數
## 安全協議PLAINTEXT、SSL、SASL_SSL、CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 網絡線程數,默認足夠,流量高可調大
num.network.threads=3
# IO 請求線程數,與磁盤數量相關(物理上獨立的存儲設備數量(非文件夾之類的))
## 設置應大致等於或略大於 Broker 所使用的物理磁盤數量
## 每個 I/O 線程可以並行處理一個磁盤的 I/O 請求
### Kafka 的日誌(log)數據通常分佈在多個磁盤上(通過配置 log.dirs 指定多個目錄,每個目錄掛載在不同物理磁盤上)
### 如果你有 N 塊物理磁盤,並且 Kafka 的日誌目錄分別位於這些磁盤上,那麼理論上你可以並行地對這 N 塊磁盤進行讀寫。
### 如果 num.io.threads < 磁盤數,則某些磁盤可能需要排隊等待 I/O 線程,無法充分利用磁盤並行能力。
### 如果 num.io.threads > 磁盤數,雖然不會出錯,但額外的線程可能處於空閒狀態,浪費資源(不過影響通常不大)
## 避免 I/O 成為瓶頸
### Kafka 是高吞吐系統,其性能受限於磁盤 I/O 能力。
### 設置合適的 num.io.threads 可以讓 Kafka 充分利用多磁盤的併發 I/O 能力,提升整體吞吐量。
num.io.threads=8
# 發送緩衝區
## TCP 緩衝區的理想大小 ≈ 帶寬 × 往返延遲(RTT)
### 本地局域網(低延遲,<1ms) 128 KB ~ 256 KB(131072 ~ 262144)
### 跨機房/雲環境(RTT 1~10ms) 1 MB ~ 2 MB(1048576 ~ 2097152)
### 高帶寬長距離(如跨區域) 2 MB ~ 4 MB(2097152 ~ 4194304)
# 注意:操作系統對 socket buffer 有最大限制(可通過 net.core.wmem_max 查看),需確保 Kafka 設置不超過系統上限。
socket.send.buffer.bytes=102400
# 接收緩衝區
# 推薦與socket.send.buffer.bytes設為相同值
# 特殊情況下
## 寫多讀少(如日誌收集) receive > send(如 2MB vs 1MB)
## 讀多寫少(如實時數倉消費) send > receive(如 2MB vs 1MB)
## 副本同步壓力大(多副本、跨機房) 兩者都調高(因 Leader ↔ Follower 雙向通信)
socket.receive.buffer.bytes=102400
# 最大請求大小,保護 Broker 不被大消息打垮
# 必須 ≥ message.max.bytes 和 replica.fetch.max.bytes
## message.max.bytes(Broker)單條消息最大大小(Producer 端需 ≤ 此值)
## replica.fetch.max.bytes Follower 拉取副本時單次請求最大字節數
## fetch.max.bytes(Consumer) Consumer 單次拉取最大字節數
socket.request.max.bytes=104857600
############################# Log Basics #############################
# 日誌文件存儲目錄。
## 1.必須配置成獨立磁盤路徑,生產不要用 /tmp! 2.多目錄用逗號分隔
# 如果指定了多個路徑,那麼broker會根據“最少使用”原則,把同一分區的日誌片段保存到同一個路徑下。
# 注意:broker會向數量最少的目錄新增分區,而不是向可用磁盤空間最小的目錄新增分區,所以並不能保證數據會被均衡地分佈在多個目錄中
log.dirs=/data/kafka1,/data/kafka2
# 創建 Topic 的默認分區數
# 注意:可以增加主題的分區,不能減少
# 通過分區來實現集羣的負載均衡,分區數量設置為集羣broker數量或倍數,這樣可以讓分區均衡地分佈到broker上
num.partitions=1
# Kafka使用線程池來處理日誌片段,影響 Kafka “冷啓動”或“故障恢復”的速度
## 當服務器啓動時,用於打開每個分區的日誌片段;
## 當服務器奔潰並重啓時,用於檢查和截短每個分區的日誌片段(快速恢復已刷盤但未完全提交的狀態)
## 當服務器正常關閉時,用於關閉日誌片段。
# 總恢復線程數 = num.recovery.threads.per.data.dir × log.dirs
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# 消費者 offset 主題 副本數,生產必須 ≥3
offsets.topic.replication.factor=1
# 事務日誌副本,生產必須 ≥3
transaction.state.log.replication.factor=1
# 事務 ISR 最小副本,必須比 replication.factor 小 1
transaction.state.log.min.isr=1
# 用於支持多消費者共享消費同一個分區(類似 RocketMQ 的集羣消費模式)
# 設置內部狀態 Topic __share_group_state 的副本數,決定該 Topic 在多少個 Broker 上保存副本。
share.coordinator.state.topic.replication.factor=1
# 設置該內部 Topic 寫入時要求的最小 ISR(In-Sync Replicas)數量。如果 ISR 數量 < 此值,寫入會失敗(拋出 NotEnoughReplicasException)
share.coordinator.state.topic.min.isr=1
############################# Log Flush Policy #############################
# Kafka Broker 中控制 日誌刷盤(flush)頻率
## Long.MaxValue(≈永不按條數刷),每個分區累計達到 N 條消息後,強制刷盤
## null(依賴 OS 默認策略),每個分區距離上次刷盤超過 N 毫秒後,強制刷盤
## Kafka 默認不主動刷盤,而是依賴操作系統的 Page Cache + 後台 flush 機制(通常 30 秒一次)。這兩個參數用於覆蓋默認行為,強制更頻繁刷盤。
# 潛在問題:性能下降,fsync() 是昂貴的同步 I/O 操作,會阻塞寫入線程;每秒刷一次對高吞吐場景(如 >10MB/s)會造成顯著瓶頸。
# 收益有限:現代操作系統和文件系統(ext4/XFS + journal)已提供強可靠性;Kafka 副本機制(replication)本身就能容忍單機宕機,刷盤並非唯一保障。
# 推薦不配置
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# 用於控制 Topic 日誌的保留策略
## 按時間保留:消息寫入後超過 N 小時即標記為可刪除
log.retention.hours=168
# 按大小保留:分區總大小超過 N 字節後,刪除最舊的日誌段
# 原則 1:優先使用時間保留,謹慎使用字節保留
# 原則 2:如需限制磁盤空間,應通過監控 + 告警 + 擴容解決,而非強制刪數據
# 原則 3:如果必須設置 log.retention.bytes,請按分區計算合理值
# 原則 4:不同 Topic 應差異化配置
#log.retention.bytes=1073741824
# 影響Kafka的 存儲效率、I/O 性能、恢復速度和磁盤管理。
## 一旦日誌片段被關閉,就開始進入過期倒計時。這個參數的值越小,關閉和分配新文件就會越頻繁,從而降低整體的磁盤寫入效率(日誌片段被關閉之前消息是不會過期的)
# 日誌段(Log Segment)的大小,每個日誌段(.log 文件)的最大字節數。每當segment寫滿,Kafka就會關閉它並創建新的segment
# 每個 segment 對應一組文件:.log(數據)、.index(偏移索引)、.timeindex(時間戳索引)
## 1 GiB 適合大多數生產場景
log.segment.bytes=1073741824
# 保留策略檢查頻率
# Broker 檢查是否需要刪除過期/超量日誌段的頻率。
log.retention.check.interval.ms=300000
## 即使消息已過期,只要所在 segment 未 closed(未寫滿),就不會被刪除。
## 日誌片段的大小也會影響時間戳獲取偏移量的行為。當使用時間戳獲取日誌偏移量時,Kafka會查找在指定時間戳寫入的日誌片段文件,也就是創建時間在指定時間戳之前且最後修改時間在指定時間戳之後的文件。然後,Kafka會返回這個日誌片段開發的偏移量(也就是文件名)。
broker.properties
# 該服務器的角色。設置它將使 Kafka 進入 KRaft 模式。
process.roles=broker
# 與該實例角色關聯的節點 ID。
node.id=2
# KRaft 控制器法定數(quorum)的引導服務器列表。
## Broker 啓動時通過此地址找到 Controller 集羣
controller.quorum.bootstrap.servers=localhost:9093
############################# Socket Server Settings #############################
# Broker 監聽的地址。
listeners=PLAINTEXT://localhost:9092
# Broker 之間通信使用的監聽名稱。決定 Broker 內部複製與協調時用哪個 listener
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
# 控制器使用的監聽器名稱集合。Controller 與 Broker 之間通信使用 CONTROLLER listener;在純 broker 節點僅使用第一個值
controller.listener.names=CONTROLLER
# 監聽器名稱到安全協議的映射表。指定 listener 使用的安全協議,如 PLAINTEXT、SSL、SASL
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 處理網絡請求的線程數。
num.network.threads=3
# 處理磁盤 I/O 和請求邏輯。
num.io.threads=8
# Socket 緩衝區大小
socket.send.buffer.bytes=102400
# 每個請求最大
## 如果消息大於 10MB,建議避免使用 Kafka
## 若必須處理大消息,建議加大為 200MB+
socket.receive.buffer.bytes=102400
# 每個請求最大
socket.request.max.bytes=104857600
############################# Log Basics #############################
# 存放 Kafka 數據文件的目錄
log.dirs=/tmp/kraft-broker-logs
# 默認創建 Topic 的分區數
num.partitions=1
# 啓動重建日誌 segment 的線程
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# 系統主題的副本因子
offsets.topic.replication.factor=1
# 消費者 offset 的副本數
transaction.state.log.replication.factor=1
# 事務狀態日誌副本數
transaction.state.log.min.isr=1
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
############################# Log Flush Policy #############################
# 多少條消息後觸發刷盤
#log.flush.interval.messages=10000
# 消息在 log 中停留多久必須刷盤。
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# 日誌保留時間段
log.retention.hours=168
# 按大小清理
#log.retention.bytes=1073741824
# 每個segment最大值
log.segment.bytes=1073741824
# 每 5 分鐘檢查一次過期 segment。
log.retention.check.interval.ms=300000
consumer.properties
# 同一個 group.id 下的所有 consumer 共同消費同一組分區,即:每條消息只會被這個 group 中的一個 consumer 接收。
group.id=test-consumer-group
# 當該 group 在某個分區上:沒有已提交 offset;或者 offset 已經過期 / 被刪除時,Kafka 決定 從哪裏開始消費。
## 推薦:數據倉庫 / Snowflake / 大屏 / 報表型:auto.offset.reset=earliest
## 實時通知 / 風控 / 推送型:auto.offset.reset=latest
#auto.offset.reset=
# 當沒有已提交 offset 時,從哪裏開始消費:earliest / latest / none
# 歷史+實時都要:earliest
# 只要新數據:latest
# auto.offset.reset=earliest
# key 反序列化類
# key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# value 反序列化類
# value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 是否自動定期提交 offset
# enable.auto.commit=true
# 自動提交間隔(毫秒),僅在 enable.auto.commit=true 時生效
# auto.commit.interval.ms=5000
# 是否自動定期提交 offset
# enable.auto.commit=true
# 自動提交間隔(毫秒),僅在 enable.auto.commit=true 時生效
# auto.commit.interval.ms=5000
# 單次 poll 最大返回多少條記錄
# max.poll.records=500
# 允許應用處理一批消息的最長時間
# 超過該時間還沒 poll,下次會被認為掛了,觸發 rebalance
# max.poll.interval.ms=300000 # 5 分鐘
# consumer 與 broker 的會話超時時間
# session.timeout.ms=10000
# 心跳間隔(必須小於 session.timeout.ms)
# heartbeat.interval.ms=3000
# 單個分區一次 fetch 的最小字節數(可提高批量效率)
# fetch.min.bytes=1
# fetch 調用等待的最長時間(配合 fetch.min.bytes)
# fetch.max.wait.ms=500
# 單次 fetch 返回數據的最大總字節數(必須 >= broker 的 message.max.bytes)
# fetch.max.bytes=52428800 # 50MB
# 如果要保證只讀已提交的事務消息,使用 read_committed
# isolation.level=read_committed
producer.properties
bootstrap.servers=localhost:9092
# producer 將 record batch 先壓縮 → 再發送給 broker
# broker 按原樣存儲該壓縮數據 → consumer 拉取時解壓
# 壓縮的目標:減小網絡帶寬、磁盤佔用,提高整體吞吐。
## none : 不壓縮,CPU 開銷最小;網絡和磁盤佔用最大
## gzip : 壓縮率高,但 CPU 開銷大、延遲高;適合離線批處理、日誌歸檔
## snappy : 壓縮比 & CPU 開銷平衡比較好;被很多老項目用作默認
## lz4 : 壓縮速度快、解壓快,壓縮率也不錯;非常適合高吞吐在線流處理
## zstd : 新一代算法,壓縮率好,性能也不錯(Kafka 新版本里推薦)
compression.type=none
# batch 大小和 linger 增大一點提高吞吐
# batch.size=32768
# linger.ms=10
# key/value 序列化
# key.serializer=org.apache.kafka.common.serialization.StringSerializer
# value.serializer=org.apache.kafka.common.serialization.StringSerializer
本文章為轉載內容,我們尊重原作者對文章享有的著作權。如有內容錯誤或侵權問題,歡迎原作者聯繫我們進行內容更正或刪除文章。