動態

詳情 返回 返回

Kafka4.0 可觀測性最佳實踐 - 動態 詳情

Kafka4.0 介紹

Kafka4.0 的重大變革 —— KRaft 模式。Kafka4.0 最具革命性的變化,默認運行在 KRaft(Kafka Raft)模式下,徹底摒棄了對 Apache ZooKeeper 的依賴。KRaft 模式的引入,可謂是 Kafka 架構演進的一次重大飛躍。它基於 Raft 一致性算法構建共識機制,將元數據管理功能巧妙地集成到 Kafka 自身的體系之中,從而實現了對 ZooKeeper 的無縫替換。

主要優勢:

  • 簡化部署與運維流程:運維人員從此無需再為搭建和維護複雜的 ZooKeeper 集羣耗費大量精力,大大降低了整體的運營開銷。新的架構設計極大地簡化了系統的複雜性,使得 Kafka 的安裝、配置以及日常管理工作變得更加直觀、高效,即使是新手也能輕鬆上手。
  • 顯著增強可擴展性:在 KRaft 模式下,Kafka 集羣的擴展性得到了進一步的提升。新增 Broker 節點的操作變得更加簡便快捷,能夠更好地適應大規模數據處理場景下,對系統資源進行動態調整的需求。無論是應對業務高峯期的數據洪峯,還是隨着業務增長逐步擴展集羣規模,KRaft 模式都能遊刃有餘。
  • 提升系統性能與穩定性:去除 ZooKeeper 這一外部依賴後,Kafka 在元數據操作的響應速度和一致性方面表現得更加出色。特別是在高併發寫入和讀取的場景中,系統的穩定性和可靠性得到了顯著增強,有效減少了因外部組件故障而可能引發的單點問題,為企業級應用提供了更加堅實可靠的底層支撐。

觀測雲

觀測雲是一款專為 IT 工程師打造的全鏈路可觀測產品,它集成了基礎設施監控、應用程序性能監控和日誌管理,為整個技術棧提供實時可觀察性。這款產品能夠幫助工程師全面瞭解端到端的用户體驗追蹤,瞭解應用內函數的每一次調用,以及全面監控雲時代的基礎設施。此外,觀測雲還具備快速發現系統安全風險的能力,為數字化時代提供安全保障。

部署 DataKit

DataKit 是一個開源的、跨平台的數據收集和監控工具,由觀測雲開發並維護。它旨在幫助用户收集、處理和分析各種數據源,如日誌、指標和事件,以便進行有效的監控和故障排查。DataKit 支持多種數據輸入和輸出格式,可以輕鬆集成到現有的監控系統中。

登錄觀測雲控制枱,在「集成」 - 「DataKit」選擇對應安裝方式,當前採用 Linux 主機部署 DataKit。

圖片

採集步驟

下載 JMX Exporter

下載地址:https://github.com/prometheus/jmx_exporter/releases/tag/1.3.0

配置 JMX 腳本和啓動參數

注意:採集 Producer、Consumer、Streams、Connect 指標需要開各自獨立進程,啓動各自進程時注意替換對應的 yaml 文件和對應的啓動腳本,如下可參考。

KRaft Metrics

  • 創建 KRaft Metrics 配置文件 kafka.yml

# ------------------------------------------------------------

# Kafka 4 Prometheus JMX Exporter Configuration

# ------------------------------------------------------------

lowercaseOutputName: false

lowercaseOutputLabelNames: true

cacheRules: true

rules:

# 1. Broker / Topic / Partition Metrics

  - pattern: kafka.server<type=BrokerTopicMetrics, name=(BytesInPerSec|BytesOutPerSec|MessagesInPerSec|TotalFetchRequestsPerSec|ProduceRequestsPerSec|FailedProduceRequestsPerSec|TotalProduceRequestsPerSec|ReassignmentBytesInPerSec|ReassignmentBytesOutPerSec|ProduceMessageConversionsPerSec|FetchMessageConversionsPerSec)(?:, topic=([-\.\w]*))?><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)

    name: kafka_server_broker_topic_metrics_$1

    type: GAUGE

    labels:

      topic: "$2"

# 2. Request / Network Metrics

  - pattern: kafka.network<type=RequestMetrics, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)

    name: kafka_network_request_metrics_$1

    type: GAUGE

# 3. Socket Server Metrics

  - pattern: kafka.network<type=SocketServer, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate|Value)

    name: kafka_network_socket_server_metrics_$1

    type: GAUGE

# 4. Log / Segment / Cleaner Metrics

  - pattern: kafka.log<type=LogFlushStats, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)

    name: kafka_log_$1_$2

    type: GAUGE

# 5. Controller (KRaft) Metrics

  - pattern: kafka.controller<type=KafkaController, name=(.+)><>(Count|Value)

    name: kafka_controller_$1

    type: GAUGE

# 6. Group / Coordinator Metrics

  - pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(.+)><>(Count|Value)

    name: kafka_coordinator_group_metadata_manager_$1

    type: GAUGE

# 7. KRaft Specific Metrics

  - pattern: kafka.controller<type=KafkaController, name=(LeaderElectionSuccessRate|LeaderElectionLatencyMs)><>(Count|Value)

    name: kafka_controller_$1

    type: GAUGE

# 8. New Generation Consumer Rebalance Protocol Metrics

  - pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(RebalanceTimeMs|RebalanceFrequency)><>(Count|Value)

    name: kafka_coordinator_group_metadata_manager_$1

    type: GAUGE

# 9. Queue Metrics

  - pattern: kafka.server<type=Queue, name=(QueueSize|QueueConsumerRate)><>(Count|Value)

    name: kafka_server_queue_$1

    type: GAUGE

# 10. Client Metrics

  - pattern: kafka.network<type=RequestMetrics, name=(ClientConnections|ClientRequestRate|ClientResponseTime)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)

    name: kafka_network_request_metrics_$1

    type: GAUGE

# 11. Log Flush Rate and Time

  - pattern: kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)

    name: kafka_log_log_flush_rate_and_time_ms

    type: GAUGE
  • 啓動參數

export KAFKA_HEAP_OPTS="-Xms1g -Xmx1g"

export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9999 \

  -Dcom.sun.management.jmxremote.rmi.port=9999 \

  -Dcom.sun.management.jmxremote.authenticate=false \

  -Dcom.sun.management.jmxremote.ssl=false \

  -Djava.rmi.server.hostname=127.0.0.1"

export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/kafka.yml"

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties

Producer Metrics

  • 創建 Procucer Metrics 配置文件 producer.yml

---

lowercaseOutputName: true

rules:

  # 新增:producer-node-metrics

  - pattern: kafka\.producer<type=producer-node-metrics, client-id=([^,]+), node-id=([^>]+)><>([^:]+)

    name: kafka_producer_node_$3

    labels:

      client_id: "$1"

      node_id: "$2"

    type: GAUGE

  - pattern: 'kafka\.producer<type=producer-metrics, client-id=([^>]+)><>([^:,\s]+).*'

    name: 'kafka_producer_metrics_$2'

    labels:

      client_id: "$1"

    type: GAUGE

  # 抓取 Selector 全部指標(Kafka 4.0 新增)

  - pattern: 'kafka\.(?:(producer|consumer|connect))<type=(producer|consumer|connect)-metrics, client-id=([^>]+)><>(connection-.+|io-.+|network-.+|select-.+|send-.+|receive-.+|reauthentication-.+)'

    name: 'kafka_${1}_${4}'

    labels:

      client_id: '$3'

    type: GAUGE
  • 啓動參數

export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7072:/opt/jmx_exporter/producer.yml"

/opt/kafka/kafka/bin/kafka-console-producer.sh \

  --broker-list localhost:9092 \

  --topic xxxx \

  --producer-property bootstrap.servers=localhost:9092

Consumer Metrics

  • 創建 Consumer Metrics 配置文件consumer.yml

lowercaseOutputName: true

rules:

  # consumer-coordinator-metrics

  - pattern: 'kafka\.consumer<type=consumer-coordinator-metrics, client-id=([^>]+)><>([^:,\s]+).*'

    name: 'kafka_consumer_coordinator_metrics_$2'

    labels:

      client_id: "$1"

    type: GAUGE

  - pattern: 'kafka\.consumer<type=consumer-metrics, client-id=([^>]+)><>([^:,\s]+).*'

    name: 'kafka_consumer_metrics_$2'

    labels:

      client_id: "$1"
  • 啓動參數

export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7073:/opt/jmx_exporter/consumer.yml"

/opt/kafka/kafka/bin/kafka-console-consumer.sh \

  --broker-list localhost:9092 \

  --topic xxxx \

  --producer-property bootstrap.servers=localhost:9092

Streams Metrics

  • 創建 Streams Metrics 配置文件stream.yml

lowercaseOutputName: true

lowercaseOutputLabelNames: true

rules:

  # Kafka Streams 應用指標 - 移除特殊字符

  - pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+)$'

    name: kafka_streams_$2

    labels:

      client_id: "$1"



  # 處理包含特殊字符的屬性名

  - pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+):(.+)$'

    name: kafka_streams_$2_$3

    labels:

      client_id: "$1"

  # Processor Node 指標

  - pattern: 'kafka.streams<type=stream-processor-node-metrics, client-id=(.+), task-id=(.+), processor-node-id=(.+)><>(.+)'

    name: kafka_streams_processor_$4

    labels:

      client_id: "$1"

      task_id: "$2"

      processor_node_id: "$3"



  # Task 指標

  - pattern: 'kafka.streams<type=stream-task-metrics, client-id=(.+), task-id=(.+)><>(.+)'

    name: kafka_streams_task_$3

    labels:

      client_id: "$1"

      task_id: "$2"



  # 線程指標

  - pattern: 'kafka.streams<type=stream-thread-metrics, client-id=(.+), thread-id=(.+)><>(.+)'

    name: kafka_streams_thread_$3

    labels:

      client_id: "$1"

      thread_id: "$2"



  # JVM 指標

  - pattern: 'java.lang<type=Memory><>(.+)'

    name: jvm_memory_$1



  - pattern: 'java.lang<type=GarbageCollector, name=(.+)><>(\w+)'

    name: jvm_gc_$2

    labels:

      gc: "$1"



  # 線程池指標

  - pattern: 'java.lang<type=Threading><>(.+)'

    name: jvm_threads_$1



  # 默認規則

  - pattern: '(.*)'
  • 啓動參數

export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"

export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9996 \

  -Dcom.sun.management.jmxremote.rmi.port=9996 \

  -Dcom.sun.management.jmxremote.authenticate=false \

  -Dcom.sun.management.jmxremote.ssl=false \

  -Djava.rmi.server.hostname=127.0.0.1"

export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7075:/opt/jmx_exporter/stream.yml"

java $KAFKA_HEAP_OPTS $KAFKA_JMX_OPTS $EXTRA_ARGS -cp "libs/*:my-streams.jar" WordCountDemo

Connect Metrics

  • 創建 Connect Metrics 配置文件 connect.yml

lowercaseOutputName: true

lowercaseOutputLabelNames: true

rules:

  # 1) connect-worker-metrics(全局)

  - pattern: 'kafka\.connect<type=connect-worker-metrics><>([^:]+)'

    name: 'kafka_connect_worker_$1'

    type: GAUGE

  # 2) connect-worker-metrics,connector=xxx

  - pattern: 'kafka\.connect<type=connect-worker-metrics, connector=([^>]+)><>([^:]+)'

    name: 'kafka_connect_worker_$2'

    labels:

      connector: "$1"

    type: GAUGE

  # 3) connect-worker-rebalance-metrics

  - pattern: 'kafka\.connect<type=connect-worker-rebalance-metrics><>([^:]+)'

    name: 'kafka_connect_worker_rebalance_$1'

    type: GAUGE

  # 4) connector-task-metrics

  - pattern: 'kafka\.connect<type=connector-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'

    name: 'kafka_connect_task_$3'

    labels:

      connector: "$1"

      task_id: "$2"

    type: GAUGE

  # 5) sink-task-metrics

  - pattern: 'kafka\.connect<type=sink-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'

    name: 'kafka_connect_sink_task_$3'

    labels:

      connector: "$1"

      task_id: "$2"
  • 啓動參數

export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"

export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \

  -Dcom.sun.management.jmxremote.authenticate=false \

  -Dcom.sun.management.jmxremote.ssl=false \

  -Dcom.sun.management.jmxremote.port=9995 \

  -Dcom.sun.management.jmxremote.rmi.port=9995 \

  -Djava.rmi.server.hostname=127.0.0.1"

export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7074:/opt/jmx_exporter/connect.yml"

# 啓動 Kafka Connect

/opt/kafka/kafka/bin/connect-distributed.sh /opt/kafka/kafka/config/connect-distributed.properties

啓動成功後,可通過 curl http://IP:端口號/metrics 查看獲取到的監控數據。

配置 DataKit

  • 進入 datakit 安裝目錄下的 conf.d/prom 目錄,複製 prom.conf.sample 並命名為 kafka.conf

cp prom.conf.sample kafka.conf
  • 調整 kafka.conf

[[inputs.prom]]

  ## Exporter URLs.

  urls = ["http://127.0.0.1:7071/metrics","http://127.0.0.1:7072/metrics","http://127.0.0.1:7073/metrics","http://127.0.0.1:7074/metrics","http://127.0.0.1:7075/metrics"]

  ## Collector alias.

  source = "kafka"

  ## Prioritier over 'measurement_name' configuration.

  [[inputs.prom.measurements]]

    prefix = "kafka_controller_"

    name = "kafka_controller"



  [[inputs.prom.measurements]]

    prefix = "kafka_network_"

    name = "kafka_network"

  [[inputs.prom.measurements]]

    prefix = "kafka_log_"

    name = "kafka_log"

  [[inputs.prom.measurements]]

    prefix = "kafka_server_"

    name = "kafka_server"

  [[inputs.prom.measurements]]

    prefix = "kafka_connect_"

    name = "kafka_connect"

  [[inputs.prom.measurements]]

    prefix = "kafka_stream_"

    name = "kafka_stream"

重啓 DataKit

執行以下命令:


datakit service -R

指標集

以下是 kafka4.0 部分指標説明,更多指標可參考 Kafka 指標詳情。

kafka_server指標集

| 指標名 | 描述 | 單位 |

| Fetch_queue_size | 當前活躍的 Broker 數量 | Count |

| Produce_queue_size | 當前活躍的 Broker 數量 | Count |

| Request_queue_size | 當前活躍的 Broker 數量 | Count |

| broker_topic_metrics_BytesInPerSec | 當前活躍的 Broker 數量 | Count |

| broker_topic_metrics_BytesOutPerSec | 當前活躍的 Broker 數量 | Count |

| broker_topic_metrics_FailedProduceRequestsPerSec | 當前活躍的 Broker 數量 | Count |

| broker_topic_metrics_FetchMessageConversionsPerSec | 當前活躍的 Broker 數量 | Count |

| broker_topic_metrics_MessagesInPerSec | 當前活躍的 Broker 數量 | Count |

| broker_topic_metrics_ProduceMessageConversionsPerSec | 當前活躍的 Broker 數量 | Count |

| broker_topic_metrics_TotalFetchRequestsPerSec | 當前活躍的 Broker 數量 | Count |

| broker_topic_metrics_TotalProduceRequestsPerSec | 當前活躍的 Broker 數量 | Count |

| socket_server_metrics_connection_count | 當前活躍的 Broker 數量 | Count |

| socket_server_metrics_connection_close_total | 當前活躍的 Broker 數量 | Count |

| socket_server_metrics_incoming_byte_rate | 當前活躍的 Broker 數量 | Count |

kafka_network 指標集

| 指標名 | 描述 | 單位 |

| request_metrics_RequestBytes_request_AddOffsetsToTxn | AddOffsetsToTxn 請求大小 | bytes |

| request_metrics_RequestBytes_request_Fetch | Fetch 請求大小 | count |

| request_metrics_RequestBytes_request_FetchConsumer | FetchConsumer 請求大小 | bytes |

| request_metrics_RequestBytes_request_FetchFollower | FetchFollower 請求大小 | bytes |

| request_metrics_TotalTimeMs_request_CreateTopics | CreateTopics 請求總時間 | ms |

| request_metrics_TotalTimeMs_request_CreatePartitions | CreatePartitions 請求總時間 | ms |

| request_metrics_RequestQueueTimeMs_request_CreatePartitions | CreatePartitions 在請求對列等待時間 | ms |

| request_metrics_RequestQueueTimeMs_request_Produce | Produce 在請求對列的等待時間 | ms |

kafka_controller 指標集

| 指標名 | 描述 | 單位 |

| ActiveBrokerCount | 當前活躍的 Broker 數量 | Count |

| ActiveControllerCount | 活躍控制器數量 | Count |

| GlobalPartitionCount | 分區數量 | Count |

| GlobalTopicCount | 主題數量 | Count |

| IgnoredStaticVoters | bytes | Kong的帶寬使用量,單位為字節 |

| OfflinePartitionsCount | 離線分區數量 | Count |

| PreferredReplicaImbalanceCount | Preferred Leader 選舉條件的分區數 | Count |

| TimedOutBrokerHeartbeatCount | Broker 心跳超時的次數 | Count |

kafka_producer 指標集

| 指標名 | 描述 | 單位 |

| producer_metrics_batch_split_rate | 批次分割率 | count/s |

| producer_metrics_buffer_available_bytes | 未使用的緩衝區內存總量 | bytes |

| producer_metrics_buffer_exhausted_rate | 緩衝區耗盡而丟棄的每秒平均記錄發送數量 | count/s |

| producer_metrics_buffer_total_bytes | 緩衝區總字節大小 | bytes |

| producer_metrics_bufferpool_wait_ratio | 緩衝池等待比率 | % |

| producer_metrics_bufferpool_wait_time_ns_total | 緩衝池等待時間 | ms |

| producer_metrics_connection_close_rate | 關閉連接率 | count/s |

| producer_metrics_connection_count | 關閉連接數量 | count |

kafka_consumer 指標集

| 指標名 | 描述 | 單位 |

| consumer_coordinator_metrics_failed_rebalance_total | 再平衡失敗數量 | count |

| consumer_coordinator_metrics_heartbeat_rate | 每秒平均心跳次數 | count/s |

| consumer_coordinator_metrics_heartbeat_response_time_max | 心跳響應最大時間 | count |

| consumer_coordinator_metrics_join_rate | Group 每秒加入速率 | count/s |

| consumer_coordinator_metrics_join_total | Group 加入總數 | count |

| consumer_coordinator_metrics_last_rebalance_seconds_ago | 自上次重新平衡事件以來的秒數 | ms |

| consumer_coordinator_metrics_rebalance_latency_total | 重新平衡延遲總計 | ms |

| consumer_fetch_manager_metrics_bytes_consumed_rate | 每秒消耗的字節數 | bytes/s |

| consumer_fetch_manager_metrics_fetch_latency_avg | Fetch 請求延遲 | ms |

| consumer_metrics_connection_count | 連接數 | count |

| consumer_metrics_incoming_byte_rate | 輸入字節數率 | bytes/s |

kafka_connect 指標集

| 指標名 | 描述 | 單位 |

| worker_connector_count | Connector 數量 | count |

| worker_task_startup_attempts_total | 任務啓動重試次數 | count |

| worker_connector_startup_attempts_total | 連接器嘗試啓動次數 | count |

| worker_task_startup_failure_total | 任務啓動失敗數量 | count |

| worker_connector_startup_failure_percentage | 連接失敗率 | % |

| worker_rebalance_completed_rebalances_total | 再平衡完成總數 | count |

| worker_task_startup_failure_percentage | 任務啓動失敗佔比 | % |

| worker_rebalance_time_since_last_rebalance_ms | 自上次重新平衡以來的時間 | ms |

| worker_task_startup_attempts_total | 任務嘗試啓動次數 | count |

kafka_stream 指標集

| 指標名 | 描述 | 單位 |

| stream_thread_metrics_thread_start_time | 線程啓動時間 | 時間戳 ms |

| stream_thread_metrics_task_created_total | 任務創建總數 | count |

| stream_state_metrics_block_cache_capacity | 塊緩存大小 | bytes |

| stream_state_metrics_all_rate | 所有操作率 | count/s |

| stream_state_metrics_block_cache_usage | 塊緩存使用率 | % |

| stream_state_metrics_bytes_read_compaction_rate | 字節讀取壓縮率 | bytes/s |

| stream_state_metrics_bytes_written_compaction_rate | 字節寫入壓縮率 | bytes/s |

| stream_state_metrics_block_cache_index_hit_ratio | 塊緩存索引命中率 | % |

| stream_state_metrics_block_cache_data_hit_ratio | 塊緩存數據命中率 | % |

場景視圖

登錄觀測雲控制枱,點擊「場景」 -「新建儀表板」,輸入 “Kafka 4”, 選擇 “Kafka 4”,點擊 “確定” 即可添加視圖。

圖片

圖片

圖片

圖片

監控器(告警)

登錄觀測雲控制枱,點擊「監控」 -「新建監控器」,輸入 “kafka”, 選擇對應的監控器,點擊 “確定” 即可添加。

Kafka 連接過期被關閉客户端連接

圖片

Kafka 集羣在處理消費者拉取請求時的延遲過高

圖片

Kafka 集羣在處理生產者請求時的延遲過高

圖片

Kafka 集羣 ActiveController 為0異常

圖片

Kafka 離線分區數量過高異常

圖片

總結

通過監控 Kafka,我們可以實時掌握消息吞吐、消費者滯後、Broker 健康等關鍵指標,提前發現副本缺失、網絡擁塞或消費延遲,保障系統穩定;也能結合歷史基線做容量預測與異常檢測,為擴縮容、參數調優提供量化依據,讓數據持續高效、可觀測、可演進。

user avatar cherish_5ad82c136df47 頭像 kerrywu 頭像 alixitongruanjianjishu 頭像
點贊 3 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.