博客 / 詳情

返回

Kafka集羣管理:🛠️ 如何實現數據均衡與性能最大化

Kafka 概述

Kafka 起初是 由 LinkedIn 公司採用 Scala 語言開發的一個多分區、多副本且基於 ZooKeeper 協調的分佈式消息系統,現已被捐獻給 Apache 基金會。

目前 Kafka 已經定位為一個分佈式流式處理平台,它以高吞吐、可持久化、可水平擴展、支持流數據處理等多種特性而被廣泛使用,主要是由 Scala 和 Java 編寫。

它是一種高吞吐量的分佈式發佈訂閲消息系統,可以處理事件流數據。通過 Kafka 你可以非常方便的把想要發佈的消息,分發給任何想要訂閲該消息的接收者。上游生產者只需要把消息輸入到 Kafka 指定 Topic ,下游接收者只要訂閲該 Topic ,就能低延時、高吞吐量的接收到上游的消息;Kafka 還支持 同一個 Topic 同時被多個下游消費者消費,且不同消費者之間數據處理進度互不干擾。

  • 對於一個 topic,他的每一個 partition 同一時間只能被同一消費者組中的一個消費者所消費
  • 相比於 AMQ,它更加輕量級:非侵入性的、依賴的東西非常少,佔用資源非常少,部署簡單,沒有太多依賴,比較容易使用。

目前越來越多的開源分佈式處理系統如 Cloudera、Storm、Spark、Flink 等都支持與 Kafka 集成,Kafka 之所以受到越來越多的青睞,與它所“扮演”的三大角色是分不開的:

  • 消息系統:Kafka 和傳統的消息系統(也稱作消息中間件)都具備系統解耦、冗餘存儲、流量削峯、緩衝、異步通信、擴展性、可恢復性等功能。與此同時,Kafka 還提供了大多數消息系統難以實現的消息順序性保障及回溯消費的功能。
  • 存儲系統:Kafka 把消息持久化到磁盤,相比於其他基於內存存儲的系統而言,有效地降低了數據丟失的風險。也正是得益於 Kafka 的消息持久化功能和多副本機制,我們可以把 Kafka 作為長期的數據存儲系統來使用,只需要把對應的數據保留策略設置為“永久”或啓用主題的日誌壓縮功能即可。
  • 流式處理平台:Kafka 不僅為每個流行的流式處理框架提供了可靠的數據來源,還提供了一個完整的流式處理類庫,比如窗口、連接、變換和聚合等各類操作。

Kafka 解決了什麼問題

消息隊列一般主要處理:異步處理、服務解耦、流量控制,因此 Kafka 作為消息隊列的一種,同樣在解決這些問題。

Kafka 技術特性

  1. 高吞吐量、低延遲:kafka 每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個 topic 可以分多個 partition, consumer group 對 partition 進行並行 consume 操作。
  2. 可擴展性:kafka 集羣支持熱擴展
  3. 持久性、可靠性:消息被持久化到本地磁盤,並且支持數據備份防止數據丟失,消息被消費仍然不會被立即刪除,而是會有過期時間。
  4. 容錯性:允許集羣中節點失敗(若副本數量為 n,則允許 n-1 個節點失敗)
  5. 高併發:支持數千個客户端同時讀寫
  6. 隊列模式:所有 consumer 都在一個隊列,這樣消息就在隊內進行分區並行消費
  7. 訂閲-發佈模式:所有 consumer 都不再一個隊列,這樣 topic 消息可以廣播給所有訂閲的消費者

Kafka 工作原理

圖片

Producer: 消息生產者,也就是發送消息的一方。生產者負責創建消息,然後將其投遞到 Kafka 中;

Consumer: 消息消費者,也就是接收消息的一方。消費者連接到 Kafka 上並接收消息,進而進行相應的業務邏輯處理;

Consumer Group (CG): 消費者組,由多個 consumer 組成。消費者組內每個消費者負責消費不同分區的數據,一個分區只能由一個組內消費者消費;消費者組之間互不影響。所有的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閲者。

Broker: 服務代理節點。對於 Kafka 而言,Broker 可以簡單地看作一個獨立的 Kafka 服務節點或 Kafka 服務實例。大多數情況下也可以將 Broker 看作一台 Kafka 服務器,前提是這台服務器上只部署了一個 Kafka 實例。一個或多個 Broker 組成了一個 Kafka 集羣。一般而言,我們更習慣使用首字母小寫的 broker 來表示服務代理節點。

Controller: 集羣中會有一個或者多個 broker,其中有一個 broker 會被選舉為控制器(Kafka Controller),它負責管理整個集羣中所有分區和副本的狀態。

  • 當某個分區的 leader 副本出現故障時,由控制器負責為該分區選舉新的 leader 副本。
  • 當檢測到某個分區的 ISR 集合發生變化時,由控制器負責通知所有 broker 更新其元數據信息。
  • 當某個 Topic 增加分區數量時,同樣還是由控制器負責分區的重新分配。

在 Kafka 中還有兩個特別重要的概念—主題(Topic)與分區(Partition)

Topic:
可以理解為一個隊列,生產者和消費者在隊列的兩端,一個輸出數據,一個消費數據,它們面向的都是一個 topic;

Partition:
為了實現擴展性,一個數據量非常大的 topic 可以分佈到多個 broker(即服務器)上,一個 topic 可以分為多個 partition,每個 partition 是一個有序的隊列;那麼 topic 的併發度基本等於 partition 的個數。

Kafka 中的消息以主題為單位進行歸類,生產者負責將消息發送到特定的主題(發送到 Kafka 集羣中的每一條消息都要指定一個主題),而消費者負責訂閲主題並進行消費。

主題是一個邏輯上的概念,它還可以細分為多個分區,一個分區只屬於單個主題,很多時候也會把分區稱為主題分區(Topic-Partition)。同一主題下的不同分區包含的消息是不同的,分區在存儲層面可以看作一個可追加的日誌(Log)文件,消息在被追加到分區日誌文件的時候都會分配一個特定的偏移量(offset)。

offset 是消息在分區中的唯一標識,Kafka 通過它來保證消息在分區內的順序性,不過 offset 並不跨越分區,也就是説,Kafka 保證的是分區有序而不是主題有序。

圖片

如上圖所示: 主題中有 4 個分區,消息被順序追加到每個分區日誌文件的尾部。Kafka 中的分區可以分佈在不同的服務器(broker)上,也就是説,一個主題可以橫跨多個 broker,以此來提供比單個 broker 更強大的性能。

每一條消息被髮送到 broker 之前,會根據分區規則選擇存儲到哪個具體的分區。如果分區規則設定得合理,所有的消息都可以均勻地分配到不同的分區中。如果一個主題只對應一個文件,那麼這個文件所在的機器 I/O 將會成為這個主題的性能瓶頸,而分區解決了這個問題。在創建主題的時候可以通過指定的參數來設置分區的個數,當然也可以在主題創建完成之後去修改分區的數量,通過增加分區的數量可以實現水平擴展。

Replica:

Kafka 為分區引入了多副本(Replica)機制,通過增加副本數量可以提升容災能力。

同一分區的不同副本中保存的是相同的消息(在同一時刻,副本之間並非完全一樣),副本之間是“一主多從”的關係,其中 leader 副本負責處理讀寫請求,follower 副本只負責與 leader 副本的消息同步。副本處於不同的 broker 中,當 leader 副本出現故障時,從 follower 副本中重新選舉新的 leader 副本對外提供服務。Kafka 通過多副本機制實現了故障的自動轉移,當 Kafka 集羣中某個 broker 失效時仍然能保證服務可用。

圖片

如上圖所示: Kafka 集羣中有 4 個 broker,某個主題中有 3 個分區,且副本因子(即副本個數)也為 3,如此每個分區便有 1 個 leader 副本和 2 個 follower 副本。生產者和消費者只與 leader 副本進行交互,而 follower 副本只負責消息的同步,很多時候 follower 副本中的消息相對 leader 副本而言會有一定的滯後。

Kafka 消費端也具備一定的容災能力。Consumer 使用拉(Pull)模式從服務端拉取消息,並且保存消費的具體位置,當消費者宕機後恢復上線時可以根據之前保存的消費位置重新拉取需要的消息進行消費,這樣就不會造成消息丟失。

Kafka 寫流程

圖片

  1. 連接 zk 集羣,從 zk 中拿到對應的 topic 的 partition 信息和 partition 的 leader 的相關信息。注:Kafka 2.8.0 已移出對 zookeeper 的依賴。
  2. 向對應 broker 發消息
  3. 客户端在發送消息時,必須指定消息所屬的 Topic 和消息值 Value,此外還可以指定消息所屬的 Partition 以及消息的 Key。
  4. 對消息做序列化處理
  5. 如果消息記錄中指定了 Partition,則 Partitioner 不做任何事情;否則,Partitioner 根據消息的 key 得到一個 Partition。這是生產者就知道向哪個 Topic 下的哪個 Partition 發送這條消息。
  6. 消息被添加到相應的 batch 中,獨立的線程將這些 batch 發送到 Broker 上(注意,消息不是一條一條發往 broker 的,而是會在 客户端本地緩存一批數量後,在發出去,因此客户端是以 批-batch 為單位發送消息的,即一批當中包含一條或多條消息;同樣,broker 也是以批為單位進行數據存儲的,後面會講到 )。
  7. broker 收到消息會返回一個響應。如果消息成功寫入 Kafka,則返回成功信息,內容包含了 Topic 信息、Patition 信息、消息在 Partition 中的 Offset 信息;若失敗,返回一個錯誤。

更多: 想了解更多關於:大數據運維相關的系統環境準備、基礎環境安裝、集羣部署以及應用組件安裝等全方位的技術的問題。如:從環境搭建/集羣部署,內存擴容/問題排查,數據遷移等問題都可以來找我

Kafka 讀流程

  1. 連接 zk 集羣,從 zk 中拿到對應的 topic 的 partition 信息和 partition 的 leader 的相關信息
  2. 連接到對應的 leader 對應的 broker
  3. consumer 通過請求將希望讀取的 topic、partition 以及對應的 offset 發送給 leaderleader 根據 offset 等信息定位到 segment(索引文件和日誌文件)
  4. 根據索引文件中的內容,定位到日誌文件中該偏移量對應的開始位置讀取相應長度的數據並返回給 consumerKafka 運維Kafka 的命令行工具路徑:xxx/kafka/bin/下

圖片

Topic 管理指令

可以管理 Topic ,包括 創建、刪除、分區擴容、查詢 Topic 詳細信息、查看 Topic 列表等。

命令工具:kafka-topics.sh

# 創建 Topic:
kafka-topics.sh --create --zookeeper  localhost:2181 --replication-factor 3 --partitions 3 --topic test
 
# Topic 分區擴容
kafka-topics.sh --zookeeper  localhost:2181 --alter --topic test --partitions 4
 
# 刪除 Topic:
kafka-topics.sh --delete --zookeeper  localhost:2181 localhost:9092 --topic test
 
#查詢 Topic 詳細信息
[DEV (v.v) sa_cluster@hybrid03 bin]$ ./kafka-topics.sh --topic event_topic --zookeeper  localhost:2181 --describe
Topic:event_topic   PartitionCount:10   ReplicationFactor:2 Configs:compression.type=gzip
    Topic: event_topic  Partition: 0    Leader: 1001    Replicas: 1001,1003 Isr: 1001,1003
    Topic: event_topic  Partition: 1    Leader: 1003    Replicas: 1003,1002 Isr: 1003,1002
    Topic: event_topic  Partition: 2    Leader: 1002    Replicas: 1002,1001 Isr: 1002,1001
    Topic: event_topic  Partition: 3    Leader: 1001    Replicas: 1001,1002 Isr: 1001,1002
    Topic: event_topic  Partition: 4    Leader: 1003    Replicas: 1003,1001 Isr: 1003,1001
    Topic: event_topic  Partition: 5    Leader: 1002    Replicas: 1002,1003 Isr: 1002,1003
    Topic: event_topic  Partition: 6    Leader: 1001    Replicas: 1001,1003 Isr: 1001,1003
    Topic: event_topic  Partition: 7    Leader: 1003    Replicas: 1003,1002 Isr: 1003,1002
    Topic: event_topic  Partition: 8    Leader: 1002    Replicas: 1002,1001 Isr: 1002,1001
    Topic: event_topic  Partition: 9    Leader: 1001    Replicas: 1001,1002 Isr: 1001,1002
 
#列出全部 Topic
kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal

增刪節點後的數據均衡

增加數據節點後,雖然新節點上已經啓動了 broker ,但 kafka 不會自動均衡數據,需要手動執行。

命令工具:kafka-reassign-partitions.sh

更多: 想了解更多關於:大數據運維相關的系統環境準備、基礎環境安裝、集羣部署以及應用組件安裝等全方位的技術的問題。如:從環境搭建/集羣部署,內存擴容/問題排查,數據遷移等問題都可以來找我

編寫配置文件 move-json-file.json ,告訴 kafka 你希望哪些 topic 要重新分區:

{
    "topics": [{
            "topic": "event_topic"
        },
        {
            "topic": "profile_topic"
        },
        {
            "topic": "item_topic"
        }
    ],
    "version": 1
}

執行命令生成分配信息: 要注意的是,此時分區移動尚未開始,它只是告訴你當前的分配和建議。保存當前分配,以防你想要回滾它。

# 下面 --broker-list 參數 對應的是 brokerid
[DEV (v.v) cluster@hybrid03 bin]$ ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file ~/mv.json --broker-list "1001,1002" --generate
Current partition replica assignment #當前分配信息
{"version":1,"partitions":[{"topic":"event_topic","partition":2,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":8,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":3,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":6,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"event_topic","partition":9,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"item_topic","partition":0,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"event_topic","partition":0,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"event_topic","partition":5,"replicas":[1002,1003],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":2,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":1,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":4,"replicas":[1003,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":1,"replicas":[1003,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":7,"replicas":[1003,1002],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":0,"replicas":[1003,1002],"log_dirs":["any","any"]}]}
 
Proposed partition reassignment configuration #分配後的信息
{"version":1,"partitions":[{"topic":"event_topic","partition":7,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":1,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":1,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"item_topic","partition":0,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":4,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":9,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":6,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":3,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":8,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":0,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":0,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":5,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":2,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":2,"replicas":[1001,1002],"log_dirs":["any","any"]}]}

將上面得到期望的重新分配方式文件保存在一個 json 文件裏面:reassignment-json-file.json,然後通過參數 —execute 執行分配:

該命令也可以用於以下使用場景:

  • 給分區增加副本,你只需要在 第 2 步生成的內容裏面, 在 replicas 參數中加入你想要增加的 副本所在 broker id 信息即可,這樣執行的時候會自動在 對應 broker 上創建副本。
  • 重新分配分區

消費情況指令

查看 group 的消費情況

# group: 指定group id名字
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092  --describe --group test-group

# 示例:
# TOPIC: group對應的topic
# PARTITION:aprtition編號,從0開始0-5表示有6個partition
# CURRENT-OFFSET:此消費着當前已消費的offset
# LOG-END-OFFSET:生產者在此partition分區上已提交確認的offset
# LAG:兩個offset的差值,就是常説的積壓。此數值過大為異常。
# HOST:消費者所在的服務器ip 
# CLIENT-ID:消費者的信息
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group  test-group
2.刪除group

刪除 group

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --delete --group test-group

重新設置消費者位移

Earliest策略:把位移調整到當前最早位移處

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute

Latest策略:把位移調整到當前最新位移處

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
Current策略:把位移調整到當前最新提交位移處

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
Specified-Offset策略:把位移調整到指定位移處

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
Shift-By-N策略:把位移調整到當前位移+N處(N可以是負值)

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group  --topic test --reset-offsets --shift-by <offset_N> --execute
DateTime策略:(把位移調整到大於給定時間的最小位移處)
時間需要減8
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group   --topic test --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
Duration策略:把位移調整到距離當前時間指定間隔的位移處,然後將位移調整到距離當前給定時間間隔的位移處,具體格式是 PnDTnHnMnS。
    以字母 P 開頭,後面由 4 部分組成,即 D、H、M 和 S,分別表示天、小時、分鐘和秒。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute

設置 Topic 過期時間

# 設置 topic 過期時間(單位 毫秒)
### 3600000 毫秒 = 1小時
./bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --entity-name topic-devops-elk-log-hechuan-huanbao --entity-type topics --add-config retention.ms=3600000

# 查看 topic 配置
./bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --describe --entity-name topic-devops-elk-log-hechuan-huanbao --entity-type topics

工具相關

使用腳本生產/消費消息

# 連接到test-topic,然後通過輸入+會車生產消息
$ bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic  --producer-property
>

# --from-beginning: 指定從開始消費消息,否則會從最新的地方開始消費消息
$ bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property 

kafka 性能測試

# 測試生產者
# 向指定主題發送了 1 千萬條消息,每條消息大小是 1KB
# 它會打印出測試生產者的吞吐量 (MB/s)、消息發送延時以及各種分位數下的延時
$ bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4

2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.
4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.
10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.

# 測試消費者性能

$ bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012

Kafka 常用性能調優

磁盤目錄優化

kafka 讀寫的單位是 partition,因此將一個 topic 拆分為多個 partition 可以提高吞吐量。但是這裏有個前提,就是不同 partition 需要位於不同的磁盤(可以在同一個機器)。如果多個 partition 位於同一個磁盤,那麼意味着有多個進程同時對一個磁盤的多個文件進行讀寫,使得操作系統會對磁盤讀寫進行頻繁調度,也就是破壞了磁盤讀寫的連續性。

説明: 如果你想了解更多關於:大數據運維相關的系統環境準備、基礎環境安裝、集羣部署以及應用組件安裝等全方位的技術的問題。例如:從環境搭建/集羣部署,內存擴容/問題排查,數據遷移等助你輕鬆應對數據管理的複雜性。可以聯繫我:15928721005

JVM 參數配置

推薦使用最新的 G1 來代替 CMS 作為垃圾回收器。推薦 Java 使用的最低版本為 JDK 1.7u51。

G1 相比較於 CMS 的優勢:

  • G1 是一種適用於服務器端的垃圾回收器,很好的平衡了吞吐量和響應能力
  • 對於內存的劃分方法不同,Eden, Survivor, Old 區域不再固定,使用內存會更高效。
  • G1 通過對內存進行 Region 的劃分,有效避免了內存碎片問題。
  • G1 可以指定 GC 時可用於暫停線程的時間(不保證嚴格遵守)。
  • 而 CMS 並不提供可控選項。
  • CMS 只有在 FullGC 之後會重新合併壓縮內存,而 G1 把回收和合並集合在一起。
  • CMS 只能使用在 Old 區,在清理 Young 時一般是配合使用 ParNew,而 G1 可以統一兩類分區的回收算法。

日誌數據刷盤策略

為了大幅度提高 producer 寫入吞吐量,需要定期批量寫文件。

有 2 個參數可配置:

  • log.flush.interval.messages = 100000:
    每當 producer 寫入 100000 條數據時,就把數據刷到磁盤
  • log.flush.interval.ms=1000:
    每隔 1 秒,就刷一次盤

日誌保留時間

當 kafka server 的被寫入海量消息後,會生成很多數據文件,且佔用大量磁盤空間,如果不及時清理,可能導致磁盤空間不夠用,kafka 默認是保留 7 天。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.