1. 概述
Apache Kafka 是一個開源、容錯性強且可擴展性極高的流式平台。它採用發佈-訂閲架構,用於實時流式傳輸數據。通過將數據放入隊列,我們可以處理高吞吐量的海量數據,並實現極低的延遲處理。有時,我們需要將 JSON 數據類型發送到 Kafka 主題,以便進行數據處理和分析。
在本教程中,我們將學習如何將 JSON 數據流式傳輸到 Kafka 主題。此外,我們還將探討如何配置 Kafka 生產者和消費者,用於 JSON 數據。
2. JSON 數據在 Kafka 中的重要性
從架構上講,Kafka 支持其系統中的消息流。因此,我們也可以將 JSON 數據發送到 Kafka 服務器。如今,在現代應用程序系統中,每個應用程序主要處理 JSON 數據,因此使用 JSON 格式進行通信變得非常重要。 通過以 JSON 格式發送數據,可以有效地進行用户的實時活動跟蹤,以及他們在網站和應用程序上的行為。
將 JSON 類型的數據流式傳輸到 Kafka 服務器有助於進行實時數據分析。它促進了事件驅動的架構,其中每個微服務訂閲其相關的 topic 並提供實時變更。 藉助 Kafka topic 和 JSON 格式,可以輕鬆地傳遞物聯網 (IoT) 數據、在微服務之間進行通信以及聚合指標。
3. Kafka 部署
為了將 JSON 數據流式傳輸到 Kafka 服務器,首先需要設置 Kafka 代理(broker)和 ZooKeeper。 可以參考此教程來設置一個完整的 Kafka 服務器。 接下來,我們查看創建 Kafka 主題 baeldung 的命令,我們將在此主題上生產和消費 JSON 數據:
$ docker-compose exec kafka kafka-topics.sh --create --topic baeldung
--partitions 1 --replication-factor 1 --bootstrap-server kafka:9092上述命令創建了一個 Kafka 主題 baeldung,並設置了複製因子為 1。這裏我們創建了一個 Kafka 主題,複製因子僅為 1,因為僅用於演示目的。在實際場景中,我們可能需要使用多副本複製因子,因為這有助於在系統故障轉移情況下提供保障。同時,它還提供數據的高可用性和可靠性。
4. 生產數據
Kafka 生產者是整個 Kafka 生態系統中最基本的組件,它提供向 Kafka 服務器生產數據的能力。為了演示,讓我們看一下使用 docker-compose 命令啓動生產者的命令:
$ docker-compose exec kafka kafka-console-producer.sh --topic baeldung
--broker-list kafka:9092在上述命令中,我們創建了一個 Kafka producer 用於向 Kafka broker 發送消息。 此外,為了發送 JSON 數據類型,我們需要調整命令。 在繼續之前,我們首先創建一個示例 JSON 文件 sampledata.json:
{
"name": "test",
"age": 26,
"email": "[email protected]",
"city": "Bucharest",
"occupation": "Software Engineer",
"company": "Baeldung Inc.",
"interests": ["programming", "hiking", "reading"]
}上述 sampledata.json 文件包含用户的基本信息,格式為 JSON 格式。要將 JSON 數據發送到 Kafka 主題,我們需要使用 jq 庫,因為它在處理 JSON 數據方面非常強大。為了演示,我們安裝 jq 庫,以便將此 JSON 數據傳遞到 Kafka 生產者:
$ sudo apt-get install jq上述命令僅在 Linux 機器上安裝了 jq 庫。 此外,讓我們來看一下發送 JSON 數據命令:
$ jq -rc . sampledata.json | docker-compose exec -T kafka kafka-console-producer.sh --topic baeldung --broker-list kafka:9092上述命令是一個單行命令,用於在 Docker 環境中處理和流式傳輸 JSON 數據到 Kafka 主題。首先,jq 命令處理 sampledata.json 文件,然後使用 -r 選項,確保 JSON 數據處於行格式和未引號格式。之後,-c 選項確保數據以單行形式呈現,以便輕鬆地流式傳輸到相應的 Kafka 主題。
5. 消費者數據
我們已經成功地將 JSON 數據發送到 baeldung Kafka 主題。現在,讓我們來看一下用於消費該數據的命令:
$ docker-compose exec kafka kafka-console-consumer.sh --topic baeldung --from-beginning --bootstrap-server kafka:9092
{"name":"test","age":26,"email":"[email protected]","city":"Bucharest","occupation":"Software Engineer","company":"Baeldung Inc.","interests":["programming","hiking","reading"]}上述命令會消費自始至終發送到 baeldung 主題的所有數據。 在上一節中,我們發送了 JSON 數據。 因此,它也消費了這些 JSON 數據。 簡而言之,上述命令允許用户主動監控所有發送到 baeldung 主題的消息。 它通過基於 Kafka 的消息傳遞系統實現實時數據消費。
6. 結論
本文介紹瞭如何將 JSON 數據流式傳輸到 Kafka 主題。首先,我們創建了一個示例 JSON,然後使用生產者將該 JSON 傳輸到 Kafka 主題。之後,我們使用 <em >docker-compose</em> 命令消耗該數據。
簡而言之,我們涵蓋了使用 Kafka 生產者和消費者將 JSON 格式數據發送到主題的所有必要步驟。 此外,它還提供了 JSON 可以進行優雅更新,而不會影響現有數據的模式演化。