博客 / 詳情

返回

Kafka 監聽器詳解

Kafka 監聽器詳解

Kafka Assistant 是一款 Kafka GUI 管理工具——管理Broker,Topic,Group、查看消費詳情、監控服務器狀態、支持多種消息格式。

[TOC]

你需要將 advertised.listeners(如果你使用Docker鏡像,則為 KAFKA_ADVERTISED_LISTENERS)設置為外部地址(host/IP),以便客户端可以正確地連接到它。否則,他們會嘗試連接到內部主機地址--如果無法到達,問題就會接踵而來。

換一種説法,由Spencer Ruport提供。

listeners 是Kafka所綁定的接口。advertised.listeners 是客户端的連接方式。

在這篇文章中,我將談論為什麼這是有必要的配置 listenersadvertised.listeners,然後展示如何基於幾個場景--Docker和AWS來做。

是誰在監聽

Apache Kafka是一個分佈式系統。數據是從一個給定的分區的領導者那裏讀取和寫入的,這個領導者可以是集羣中的任何一個Broker。當一個客户端(生產者/消費者)啓動時,它將請求關於哪個broker是一個分區的領導者的元數據--它可以從任何broker那裏做到這一點。返回的元數據將包括該分區的領導者broker的可用端點,然後客户端將使用這些端點連接到broker,根據需要讀/寫數據。

正是這些端點給人們帶來了麻煩。在單機上,運行裸機(沒有虛擬機,沒有Docker),可能只需要使用主機名(或只是localhost),這很容易。但是,一旦你進入更復雜的網絡設置和多節點,你就必須更加註意。

讓我們假設你有一個以上的網絡。比如:

  • Docker內部網絡()加上主機
  • 雲中的Broker(如AWS EC2)和本地的企業內部機器(甚至在另一個雲中)。

你需要告訴Kafka Broker如何相互聯繫,但也要確保外部客户端(生產者/消費者)可以聯繫到他們需要聯繫的Broker。

最關鍵的是,當你運行一個客户端時,你傳遞給它的Broker只是它要去的地方,並從那裏獲得集羣中Broker的元數據。它為讀/寫數據而連接的實際主機和IP是基於Broker在初始連接中傳遞回來的數據--即使它只是一個單一的節點,而且返回的Broker與它所連接的Broker相同。

為了正確配置,你需要了解Kafka Broker可以有多個監聽器。一個監聽器是一個組合:

  • Host/IP
  • Port
  • Protocol

讓我們來看看一些配置。通常情況下,協議也被用於監聽器的名稱,但在這裏,讓我們通過使用監聽器的抽象名稱來使它變得漂亮和清晰。

KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB

我使用的是Docker配置名稱--如果你直接配置 server.properties(例如,在AWS等),其對應名稱在以下列表中縮進顯示。

  • KAFKA_LISTENERS 是一個以逗號分隔的監聽器列表,以及Kafka綁定監聽的主機/IP和端口。對於更復雜的網絡,這可能是一個與機器上的特定網絡接口相關的IP地址。默認是0.0.0.0,這意味着在所有接口上進行監聽。

    • listeners
  • KAFKA_ADVERTISED_LISTENERS 是一個以逗號分隔的監聽器列表,包括它們的主機/IP和端口。這是傳回給客户端的元數據。

    • advertised.listeners
  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP 定義了每個監聽器名稱要使用的安全協議的鍵/值對。

    • listener.security.protocol.map

Kafka Broker之間的通信,通常是在內部網絡(如Docker網絡,AWS VPC等)。要定義使用哪個監聽器,請指定 KAFKA_INTER_BROKER_LISTENER_NAMEinter.broker.listener.name)。使用的主機/IP必須是其他人可以從Broker機器上訪問的。

Kafka客户端很可能不在Broker的網絡中,這就是額外監聽器的作用。

每個監聽器在被連接到時,都會報告它可以被接連到的地址。你到達Broker的地址取決於所使用的網絡。如果你從內部網絡連接到經紀人,它將是一個不同於外部連接的主機/IP。

當連接到Broker時,返回給客户端的監聽器將是你連接到的監聽器(基於端口)。

kafkacat是一個探索這個問題的有用工具。使用 -L,你可以看到你所連接的監聽器的元數據。基於上述相同的監聽器配置(LISTENER_BOB/LISTENER_FRED),注意觀察返回的結果:

  • 在9092端口(我們映射為 LISTENER_FRED)進行連接,Broker的地址被反饋為 localhost

    $ kafkacat -b kafka0:9092 \
           -L
    Metadata for all topics (from broker -1: kafka0:9092/bootstrap):
    1 brokers:
    broker 0 at localhost:9092
  • 在29092端口(我們將其映射為 LISTENER_BOB)進行連接,Broker的地址被反饋為kafka0。

    $ kafkacat -b kafka0:29092 \
               -L
    Metadata for all topics (from broker 0: kafka0:29092/0):
    1 brokers:
      broker 0 at kafka0:29092

你也可以用tcpdump來檢查連接到Broker服務器的客户端的流量,並發現從代理服務器返回的主機名。

為什麼我可以連接到Broker,但客户端仍然失敗?

即使你能與Broker建立初始連接,在元數據中返回的地址仍然可能是一個你的客户端無法訪問的主機名。

讓我們一步一步來分析。

  1. 我們在AWS上有一個Broker。我們想從我們的筆記本電腦向它發送一個消息。我們知道EC2實例的外部主機名(ec2-54-191-84-122.us-west-2.compute.amazonaws.com)。我們已經在安全組中創建了必要的條目,為我們的入站流量打開Broker的端口。保險起見,還要檢查我們的本地機器是否可以連接到AWS實例上的端口。

    $ nc -vz ec2-54-191-84-122.us-west-2.compute.amazonaws.com 9092
    found 0 associations
    found 1 connections:
        1:  flags=82<CONNECTED,PREFERRED>
      outif utun5
      src 172.27.230.23 port 53352
      dst 54.191.84.122 port 9092
      rank info not available
      TCP aux info available

    我們連上了9092端口,繼續

    echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
    

    這一步發生了什麼:

    • 我們的筆記本電腦成功地解析了 ec2-54-191-84-122.us-west-2.compute.amazonaws.com(到IP地址 54.191.84.122),並在9092端口連接到AWS的機器。
    • Broker在端口9092上接收入站連接。它把元數據返回給客户端,主機名是ip-172-31-18-160.us-west-2.compute.internal,因為這是Broker的主機名和監聽器的默認值。
    • 然後,客户端試圖使用它被賦予的元數據向代理髮送數據。由於ip-172-31-18-160.us-west-2.compute.internal不能從互聯網上解析,所以它失敗了。

      $ echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
      >>[2022-07-30 15:08:41,932] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
      org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1547 ms has passed since batch creation plus linger time
    • 但是,如果我們從Broker所在的機器上嘗試同樣的事情。

      $ echo "foo"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
      >>
      $ kafka-console-consumer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test --from-beginning
      foo

      它工作得很好! 這是因為我們正在連接到 9092 端口,它被配置為內部監聽器,因此報告其主機名為 ip-172-31-18-160.us-west-2.compute.internal,這可以從Broker機器上解析(因為這是它自己的主機名!)。

    • 我們可以通過使用kafkacat -L標誌,看到Broker返回的元數據。

      $ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L
      Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
      1 brokers:
        broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092

      很明顯,內部主機名被返回。這也使得這個看似混亂的錯誤變得更有意義--連接到一個主機名,在另一個主機上得到一個查詢錯誤。

      $ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -C -t test
      % ERROR: Local: Host resolution failure: ip-172-31-18-160.us-west-2.compute.internal:9092/0: Failed to resolve 'ip-172-31-18-160.us-west-2.compute.internal:9092': nodename nor servname provided, or not known

      這裏,我們在本地機器上使用消費者模式(-C)的kafkacat來嘗試從主題中讀取。和以前一樣,因為我們從元數據中的Broker那裏得到了內部監聽器的主機名,所以客户端無法解析這個主機名來進行讀/寫。

修改我的hosts文件

我看到一個Stack Overflow的答案,建議直接更新我的hosts文件......這不是更簡單嗎?
這可以解決問題,而不是真正修復它。

如果Broker服務器報告了一個客户端無法連接的主機名,那麼在本地 /etc/hosts 中硬編碼主機名/IP組合可能看起來是一個很好的修復。但這是一個非常脆弱的、手動的解決方案。當IP發生變化時,當你移動主機而忘記帶配置hosts時,以及當其他人想做同樣的事情時,會發生什麼?

瞭解並實際修復你的網絡的 advertised.listeners 設置要好得多。

如何連接在Docker上的Kafka

為了在Docker中運行,你需要為Kafka配置兩個監聽器。

  • Docker網絡內的通信。這可能是Broker之間的通信以及在Docker中運行的其他組件之間的通信,如Kafka Connect或第三方客户端或生產者。對於這些通信,我們需要使用Docker容器的主機名。同一Docker網絡上的每個Docker容器將使用Kafka Broker容器的主機名來到達它。
  • 非Docker網絡流量。這可能是在Docker主機上本地運行的客户端,例如。假設他們將在localhost上連接到從Docker容器暴露出來的端口。這是Docker Compose的片段。

    • Docker網絡中的客户端使用監聽器BOB進行連接,端口為29092,主機名為kafka0。 這樣做,他們會得到要連接的主機名kafka0。每個Docker容器將使用Docker的內部網絡來解析kafka0,並能夠到達Broker。
    • Docker網絡外部(但都在同一個Host Machine裏)的客户端使用監聽器FRED連接,端口為9092,主機名為localhost。9092端口是由Docker容器暴露的,因此可以連接到。當客户端連接時,他們被賦予Broker元數據的主機名localhost,因此在讀/寫數據時連接到此。
    • 上述配置無法處理Docker外部和主機外部的客户端想要連接的情況。這是因為無論是kafka0(Docker內部的主機名)還是localhost(Docker主機的環回地址)都是無法解析的。

如何連接到AWS/IaaS上的Kafka

我命名AWS是因為它是大多數人使用的,但這適用於任何IaaS/雲解決方案。

這裏適用的概念與Docker完全相同。主要的區別是,在Docker中,外部連接很可能只是在localhost上(如上),而在雲託管的Kafka中(如AWS上),外部連接將來自非本地的機器。

另一個複雜的問題是,雖然Docker網絡與主機的網絡嚴重隔離,但在IaaS上,外部主機名往往是可以在內部解析的,這使得你可能真正遇到這些問題時,一發不可收拾。

有兩種方法,取決於你要連接到Broker服務器的外部地址是否也可以在網絡(如VPC)上的所有Broker服務器上進行本地解析。

選項1:外部地址是可以在本地解析的

在這裏,你可以用一個監聽器搞定。現有的監聽器叫 PLAINTEXT,只需要設置 advertised.listeners(即傳遞給入站客户的那個)。

advertised.listeners=PLAINTEXT://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092

現在內部和外部的連接都將使用 ec2-54-191-84-122.us-west-2.compute.amazonaws.com 進行連接。因為 ec2-54-191-84-122.us-west-2.com compute.amazonaws.com 既可以在本地也可以在外部解決,所以事情進展順利。

選項2:外部地址在本地無法解析

你將需要為Kafka配置兩個監聽器。

  • AWS網絡(VPC)內的通信。這可能是Broker之間的通信和在VPC中運行的其他組件之間的通信,如Kafka Connect或第三方客户端或生產商。對於這些通信,我們需要使用EC2機器的內部IP(或主機名,如果配置了DNS)。
  • 外部AWS流量。這可能是測試來自筆記本電腦的連接,或者只是來自不在亞馬遜託管的機器。在這兩種情況下,需要使用實例的外部IP(或主機名,如果配置了DNS)。

下面是一個例子:

listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
advertised.listeners=INTERNAL://ip-172-31-18-160.us-west-2.compute.internal:19092,EXTERNAL://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
inter.broker.listener.name=INTERNAL

原文地址:

  • Kafka Listeners – Explained
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.