1. 引言
本教程將介紹使用 SSL 身份驗證將 Spring Boot 客户端連接到 Apache Kafka 代理的基本設置。
Secure Sockets Layer (SSL) 實際上已於 2015 年被 Transport Layer Security (TLS) 取代。然而,出於歷史原因,Kafka(以及 Java)仍然使用“SSL”這個術語,並且本文也將遵循這一約定。
2. SSL 概述
默認情況下,Apache Kafka 將所有數據以明文形式發送,且不進行任何身份驗證。
首先,我們可以配置 SSL 以在 Broker 和客户端之間進行加密。這默認要求使用公鑰加密的單向身份驗證,客户端驗證服務器證書。
此外,服務器還可以使用單獨的機制(如 SSL 或 SASL)驗證客户端,從而啓用雙向身份驗證或多路 TLS(mTLS)。基本上,雙向 SSL 身份驗證確保客户端和服務器都使用 SSL 證書相互驗證身份並相互信任,雙向信任
在本文中,Broker 將使用 SSL 身份驗證客户端,keystore 和 truststore 將用於存儲證書和密鑰。
每個 Broker 都需要自己的 keystore,其中包含私鑰和公證書。客户端使用其 truststore 來驗證此證書並信任服務器。 類似地,每個客户端也需要自己的 keystore,其中包含其私鑰和公證書。 服務器使用其 truststore 來驗證客户端的證書並信任客户端的證書,從而建立安全的連接。
truststore 可以包含一個證書頒發機構 (CA),該 CA 可以對證書進行簽名。 在這種情況下,Broker 或客户端信任 truststore 中存在的任何由 CA 簽名證書。 這簡化了證書身份驗證,因為添加新的客户端或 Broker 不需要更改 truststore。
3. 依賴與配置
我們的示例應用程序將是一個簡單的 Spring Boot 應用程序。
為了連接到 Kafka,請在我們的 POM 文件中添加 spring-kafka 依賴項:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.2</version>
</dependency>我們還將使用 Docker Compose 文件來配置和測試 Kafka 服務器設置。 初始階段,我們將不進行任何 SSL 配置:
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
現在,我們開始創建容器:
docker-compose up這應該會啓動交易器,使用默認配置。
4. 經紀人配置
首先,讓我們來看一下為了建立安全連接所需的最小配置要求。
4.1. 單獨代理(Standalone Broker)
雖然本示例中未使用獨立的代理實例,但瞭解啓用 SSL 身份驗證所需的配置更改仍然很有用。
首先,我們需要配置代理監聽 SSL 連接,端口為 9093,在 server.properties 中:
listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093接下來,需要配置與 keystore 和 truststore 相關的屬性,包括證書位置和憑證:
ssl.keystore.location=/certs/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.truststore.location=/certs/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.key.password=password最後,代理必須配置為驗證客户端,以實現雙向認證:
ssl.client.auth=required4.2. Docker Compose
由於我們使用 Compose 來管理我們的 broker 環境,讓我們將上述所有屬性添加到我們的 docker-compose.yml 文件中:
kafka:
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper
ports:
- 9092:9092
- 9093:9093
environment:
...
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,SSL://localhost:9093
KAFKA_SSL_CLIENT_AUTH: 'required'
KAFKA_SSL_KEYSTORE_FILENAME: '/certs/kafka.server.keystore.jks'
KAFKA_SSL_KEYSTORE_CREDENTIALS: '/certs/kafka_keystore_credentials'
KAFKA_SSL_KEY_CREDENTIALS: '/certs/kafka_sslkey_credentials'
KAFKA_SSL_TRUSTSTORE_FILENAME: '/certs/kafka.server.truststore.jks'
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: '/certs/kafka_truststore_credentials'
volumes:
- ./certs/:/etc/kafka/secrets/certs在這裏,我們已在配置的 ports部分暴露了 SSL 端口 (9093)。 此外,我們還已將 certs項目文件夾掛載到配置的 volumes部分。 這包含所需的證書和相關憑據。
現在,使用 Compose 重啓堆棧時,相關 SSL 詳細信息將顯示在 broker 日誌中:
...
kafka_1 | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka_1 | ===> Configuring ...
kafka_1 | SSL is enabled.
....
kafka_1 | [2021-08-20 22:45:10,772] INFO KafkaConfig values:
kafka_1 | advertised.listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
kafka_1 | ssl.client.auth = required
kafka_1 | ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
kafka_1 | ssl.endpoint.identification.algorithm = https
kafka_1 | ssl.key.password = [hidden]
kafka_1 | ssl.keymanager.algorithm = SunX509
kafka_1 | ssl.keystore.location = /etc/kafka/secrets/certs/kafka.server.keystore.jks
kafka_1 | ssl.keystore.password = [hidden]
kafka_1 | ssl.keystore.type = JKS
kafka_1 | ssl.principal.mapping.rules = DEFAULT
kafka_1 | ssl.protocol = TLSv1.3
kafka_1 | ssl.trustmanager.algorithm = PKIX
kafka_1 | ssl.truststore.certificates = null
kafka_1 | ssl.truststore.location = /etc/kafka/secrets/certs/kafka.server.truststore.jks
kafka_1 | ssl.truststore.password = [hidden]
kafka_1 | ssl.truststore.type = JKS
....5. Spring Boot 客户端
現在服務器設置已完成,我們將創建所需的 Spring Boot 組件。這些組件將與現在需要 SSL 進行雙向身份驗證的我們的消息代理進行交互。
5.1. Producer
首先,使用 KafkaTemplate 發送消息到指定的 topic:
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message, String topic) {
log.info("Producing message: {}", message);
kafkaTemplate.send(topic, "key", message)
.addCallback(
result -> log.info("Message sent to topic: {}", message),
ex -> log.error("Failed to send message", ex)
);
}
}send 方法是一個異步操作。因此,我們附加了一個簡單的回調函數,該函數在消息代理接收到消息後,僅會記錄一些信息。
5.2. 消費者
接下來,我們創建一個簡單的消費者,使用 <a href="https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html">@KafkaListener</a> 註解。它連接到消息代理並從與生產者使用的相同主題中消費消息:
public class KafkaConsumer {
public static final String TOPIC = "test-topic";
public final List<String> messages = new ArrayList<>();
@KafkaListener(topics = TOPIC)
public void receive(ConsumerRecord<String, String> consumerRecord) {
log.info("Received payload: '{}'", consumerRecord.toString());
messages.add(consumerRecord.value());
}
}在我們的演示應用程序中,我們保持了簡單,消費者僅僅將消息存儲在一個 List 中。在實際的真實世界系統中,消費者接收消息並根據應用程序的業務邏輯進行處理。
5.3. 配置
最後,讓我們為我們的 application.yml 添加必要的配置:
spring:
kafka:
security:
protocol: "SSL"
bootstrap-servers: localhost:9093
ssl:
trust-store-location: classpath:/client-certs/kafka.client.truststore.jks
trust-store-password: <password>
key-store-location: classpath:/client-certs/kafka.client.keystore.jks
key-store-password: <password>
# additional config for producer/consumer 在這裏,我們設置了 Spring Boot 提供的所需屬性,以配置生產者和消費者。由於這兩個組件都連接到同一個代理服務器,我們可以將所有關鍵屬性聲明在 spring.kafka 部分。但是,如果生產者和消費者連接到不同的代理服務器,我們將在 spring.kafka.producer 和 spring.kafka.consumer 部分分別指定這些屬性。
在配置的 ssl 部分,我們 指向 JKS 信任存儲以進行 Kafka 代理服務器身份驗證。其中包含由也對代理服務器證書進行簽名 CA 的證書。此外,我們還 提供了 Spring 客户端密鑰存儲的路徑,其中包含由 CA 簽名證書,該證書應在代理服務器端存在於信任存儲中。
5.4. 測試
由於我們使用 Compose 文件,我們將使用 Testcontainers 框架來創建一個端到端測試,該測試將與我們的 Producer 和 Consumer 一起使用:
@ActiveProfiles("ssl")
@Testcontainers
@SpringBootTest(classes = KafkaSslApplication.class)
class KafkaSslApplicationLiveTest {
private static final String KAFKA_SERVICE = "kafka";
private static final int SSL_PORT = 9093;
@Container
public DockerComposeContainer<?> container =
new DockerComposeContainer<>(KAFKA_COMPOSE_FILE)
.withExposedService(KAFKA_SERVICE, SSL_PORT, Wait.forListeningPort());
@Autowired
private KafkaProducer kafkaProducer;
@Autowired
private KafkaConsumer kafkaConsumer;
@Test
void givenSslIsConfigured_whenProducerSendsMessageOverSsl_thenConsumerReceivesOverSsl() {
String message = generateSampleMessage();
kafkaProducer.sendMessage(message, TOPIC);
await().atMost(Duration.ofMinutes(2))
.untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
}
private static String generateSampleMessage() {
return UUID.randomUUID().toString();
}
}當我們運行測試時,Testcontainers 使用我們的 Compose 文件啓動 Kafka broker,包括 SSL 配置。應用程序也使用其 SSL 配置啓動,並連接到 broker 建立加密且身份驗證過的連接。由於這是一個異步的事件序列,我們使用了 Awaitlity 來輪詢消費者消息存儲中期望的消息。這驗證了所有配置以及 broker 和客户端之間成功的雙向身份驗證。
6. 結論
在本文中,我們介紹了 Kafka 代理與 Spring Boot 客户端之間 SSL 身份驗證設置的基礎知識。
最初,我們探討了啓用雙向身份驗證所需的代理設置。然後,我們研究了客户端端配置,以便通過加密且身份驗證的連接與代理建立連接。最後,我們使用集成測試來驗證代理和客户端之間的安全連接。