知識庫 / Spring WebFlux RSS 訂閱

使用反應式 Kafka 流和 Spring WebFlux 進行開發

Reactive,Spring WebFlux
HongKong
7
10:52 AM · Dec 06 ,2025

1. 概述

本文將探討 Reactive Kafka Streams,並將其集成到示例的 Spring WebFlux 應用程序中,同時分析這種組合如何使我們能夠構建具有可擴展性、效率和實時處理能力的完全響應式、數據密集型應用程序。

為了實現這一目標,我們將使用 Spring Cloud Stream Reactive Kafka BinderSpring WebFlux 以及 ClickHouse

2. Spring Cloud Stream Reactive Kafka Binder

Spring Cloud Stream 提供了一個對基於流和消息驅動的微服務的抽象層。Reactive Kafka Binder 允許通過連接 Kafka 主題、消息代理或 Spring Cloud Stream 應用程序,創建完全反應式的流水線。這些流水線利用 Project Reactor 處理數據流,從而實現非阻塞、異步和具有反壓意識的流處理。

與傳統的 Kafka Streams 不同,具有同步操作的 Reactive Kafka Streams 允許開發人員定義端到端的反應式流水線,其中每個數據元素都可以實時地映射、轉換、過濾或減少,同時仍能保持高效的資源利用率。

這種方法尤其適用於高吞吐量、事件驅動型應用程序,這些應用程序需要反應式範例以實現更好的可擴展性和響應性。

2.1. 使用 Spring 構建反應式 Kafka 流

藉助 Spring Cloud Stream 反應式 Kafka Binder,我們可以無縫地將反應式 Kafka 流集成到 Spring WebFlux 應用程序中,從而實現完全的反應式、非阻塞數據處理。通過利用 Project Reactor 提供的反應式 API,我們可以處理背壓、實現異步數據流,並在不阻塞線程的情況下高效地處理流。

這種反應式 Kafka 流與 Spring WebFlux 的結合,為構建需要分佈式、實時和反應式數據管道的應用提供了一個強大的解決方案。

接下來,讓我們來看一個示例應用程序,以展示這些功能。

3. 構建一個反應式 Kafka 流應用

在本示例應用程序中,我們將模擬一個股票分析應用程序,該應用程序接收、處理和分發股票價格數據。 該應用程序將展示 Spring Cloud Stream、Kafka 和反應式編程範式如何在 Spring 生態系統中協同工作。

首先,讓我們使用 Spring Boot 獲取構建此類應用程序所需的全部依賴項:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2023.0.2</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

為了這個示例,我們將使用 Spring Cloud BOM,它解決了所有依賴項的版本問題。 我們還將使用 Spring Boot 以及以下模塊:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

這些模塊使我們能夠構建我們的 Web 層和數據攝取管道,實現反應式開發。儘管我們有數據處理管道,但仍需要一些數據持久化來保存其結果。讓我們使用一個簡單而強大的分析數據庫來實現這一目的:

<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-r2dbc</artifactId>
    <version>0.7.1</version>
</dependency>

ClickHouse 是一個快速、開源、列式數據庫管理系統,它使用 SQL 查詢生成實時分析數據報告。鑑於我們旨在構建一個完全響應式應用程序,因此我們將使用它的 R2DB 驅動程序

3.1. 反應式 Kafka Producer 設置

為了啓動我們的數據處理管道,我們需要一個 Producer,負責創建數據並將其提交給我們的應用程序以供數據攝取。接下來,我們將看到 Spring 如何幫助我們輕鬆定義和使用我們的 Producer:

@Component
public class StockPriceProducer {
    public static final String[] STOCKS = {"AAPL", "GOOG", "MSFT", "AMZN", "TSLA"};
    private static final String CURRENCY = "USD";

    private final ReactiveKafkaProducerTemplate<String, StockUpdate> kafkaProducer;
    private final NewTopic topic;
    private final Random random = new Random();

    public StockPriceProducer(KafkaProperties properties, 
                              @Qualifier(TopicConfig.STOCK_PRICES_IN) NewTopic topic) {
        this.kafkaProducer = new ReactiveKafkaProducerTemplate<>(
          SenderOptions.create(properties.buildProducerProperties())
        );
        this.topic = topic;
    }

    public Flux<SenderResult<Void>> produceStockPrices(int count) {
        return Flux.range(0, count)
          .map(i -> {
              String stock = STOCKS[random.nextInt(STOCKS.length)];
              double price = 100 + (200 * random.nextDouble());
              return MessageBuilder.withPayload(new StockUpdate(stock, price, CURRENCY, Instant.now()))
                .setHeader(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
          })
          .flatMap(stock -> {
              var newRecord = new ProducerRecord<>(
                topic.name(), 
                stock.getPayload().symbol(), 
                stock.getPayload());

              stock.getHeaders()
                .forEach((key, value) -> newRecord.headers().add(key, value.toString().getBytes()));

              return kafkaProducer.send(newRecord);
          });
    }
}

此類負責生成股票價格更新並將其發送到我們的 Kafka 主題。

StockPriceProducer 類中,我們注入了在應用程序 YAML 文件中定義的 KafkaProperties,其中包含連接到我們的 Kafka 集羣所需的所有信息:

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    properties:
      spring:
        json:
          trusted:
            packages: '*'

然後,NewTopic 引用了我們的 Kafka 主題,這正是我們創建 ReactiveKafkaProducerTemplate 實例所需要的。該類抽象了我們應用程序與其 Kafka 主題之間通信中大部分的複雜性。

produceStockPrices() 方法中,我們生成 StockUpdate 對象並將它們封裝在 Message 對象中。 Spring 提供 Message 類,該類封裝了基於消息的系統細節,例如消息負載和任何我們可能需要作為消息內容類型包含的必要標頭。 最後,我們創建一個 ProducerRecord,用於定義消息的目標主題及其分區鍵,然後發送它。

3.2. 反應式 Kafka Streams 設置

現在,假設生產者位於同一應用程序之外。我們需要連接到股票價格更新主題,並將股票價格從美元轉換為歐元,以便其他應用程序部分可以使用數據。同時,我們需要保存原始股票價格在特定時間窗口內的歷史記錄。因此,讓我們配置我們的數據流管道:

spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        default:
          content-type: application/json
        processStockPrices-in-0:
          destination: stock-prices-in
          group: live-stock-consumers-x
        processStockPrices-out-0:
          destination: stock-prices-out
          group: live-stock-consumers-y
          producer:
            useNativeEncoding: true

首先,我們使用默認綁定屬性來定義 Kafka 作為我們的默認綁定。 Spring Cloud Stream 具有供應商無關性,允許我們在同一應用程序中同時使用不同的消息系統(例如 Kafka 和 RabbitMQ)如果需要。

接下來,我們配置 綁定,這些綁定充當消息系統(例如 Kafka 主題)與應用程序的生產者和消費者之間的橋樑:

  • 輸入通道 processStockPrices-in-0 綁定到 stock-prices-in 主題,其中消息被消費。
  • 輸出通道 processStockPrices-out-0 綁定到 stock-prices-out 主題,其中發佈經過處理的消息。

每個綁定都與 processStockPrices() 方法相關聯,該方法從輸入通道處理數據、應用轉換並將結果發送到輸出通道。

我們還定義內容類型為 JSON,確保消息被序列化和反序列化為 JSON。 此外,在生產者中設置 useNativeEncoding: true 確保 Kafka 生產者負責編碼和序列化數據。

組屬性(例如 live-stock-consumers-x)啓用消費者之間消息負載均衡。 同組中的所有消費者負責處理主題中的消息,防止消息重複。

3.3. 反應式 Kafka Streams 綁定設置

如前所述,綁定是輸入和輸出通道之間的橋樑,允許我們處理傳輸中的數據。YAML 文件中定義的名稱至關重要,因為它必須與綁定實現相對應,在本例中,是一個將輸入消息映射到輸出消息的函數。

接下來,讓我們看看 Spring 如何做:

@Configuration
public class StockPriceProcessor {
    private static final String USD = "USD";
    private static final String EUR = "EUR";

    @Bean
    public Function<Flux<Message<StockUpdate>>, Flux<Message<StockUpdate>>> processStockPrices(
      ClickHouseRepository repository, 
      CurrencyRate currencyRate
    ) {
        return stockPrices -> stockPrices.flatMapSequential(message -> {
            StockUpdate stockUpdate = message.getPayload();
            return repository.saveStockPrice(stockUpdate)
              .flatMap(success -> Boolean.TRUE.equals(success) ? Mono.just(stockUpdate) : Mono.empty())
              .flatMap(stock -> currencyRate.convertRate(USD, EUR, stock.price()))
                .map(newPrice -> convertPrice(stockUpdate, newPrice))
                .map(priceInEuro -> MessageBuilder.withPayload(priceInEuro)
                  .setHeader(KafkaHeaders.KEY, stockUpdate.symbol())
                  .copyHeaders(message.getHeaders())
                  .build());
        });
    }

    private StockUpdate convertPrice(StockUpdate stockUpdate, double newPrice) {
        return new StockUpdate(stockUpdate.symbol(), newPrice, EUR, stockUpdate.timestamp());
    }
}

此配置演示瞭如何在兩個 Kafka 主題之間主動處理和轉換股票價格更新的方法processStockPrices() 函數將輸入主題 stock-prices-in 綁定到輸出主題 stock-prices-out,在兩者之間添加了一個處理層。 流程如下:

  1. 消息處理:每個從輸入主題接收到的 StockUpdate 消息都使用 flatMapSequential() 函數進行順序處理。 這確保了處理順序與輸入消息的順序一致,這在保持一致性方面可能很重要。
  2. 數據庫持久化:每個股票更新都使用 ClickHouseRepository 保存到數據庫中,以便將來參考。 僅成功保存的更新才能繼續進行。
  3. 貨幣轉換:股票價格(最初以美元計價)使用 CurrencyRate 服務轉換為歐元。
  4. 消息轉換:轉換後的價格被包裝在一個新的 StockUpdate 對象中,通過 KafkaHeaders.KEY 保留原始符號作為 Kafka 消息鍵。 這確保了 Kafka 主題中的正確消息分區。
  5. 反應式管道:整個流程是反應式的,利用了 Project Reactor 的非阻塞異步功能,以實現可擴展性和效率。

3.4. 輔助服務

<em>ClickHouseRepository</em><em>CurrencyRate</em> 是簡單的接口,為我們提供了一個簡單的實現,用於演示示例應用程序:

public interface CurrencyRate {
    Mono<Double> convertRate(String from, String to, double amount);
}

public interface ClickHouseRepository {
    Mono<Boolean> saveStockPrice(StockUpdate stockUpdate);
    Flux<StockUpdate> findMinuteAvgStockPrices(Instant from, Instant to);
} 

這些功能展示了應用程序在處理此類數據管道時可以應用的業務邏輯。

3.5. 反應式 Kafka Streams 消費者設置

一旦數據處理完畢併發送到輸出通道,就可以由相同的應用程序或任何其他應用程序進行消費。 這種消費者也可以使用反應式 Kafka 模板來實現:

@Component
public class StockPriceConsumer {
    private final ReactiveKafkaConsumerTemplate<String, StockUpdate> kafkaConsumerTemplate;

    public StockPriceConsumer(@NonNull KafkaProperties properties, 
                              @Qualifier(TopicConfig.STOCK_PRICES_OUT) NewTopic topic) {
        var receiverOptions = ReceiverOptions
          .<String, StockUpdate>create(properties.buildConsumerProperties())
          .subscription(List.of(topic.name()));
        this.kafkaConsumerTemplate = new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }

    @PostConstruct
    public void consume() {
       kafkaConsumerTemplate
         .receiveAutoAck()
         .doOnNext(consumerRecord -> {
             // simulate processing
             log.info(
               "received key={}, value={} from topic={}, offset={}, partition={}", consumerRecord.key(),
               consumerRecord.value(),
               consumerRecord.topic(),
               consumerRecord.offset(),
               consumerRecord.partition());
         })
         .doOnError(e -> log.error("Consumer error",  e))
         .doOnComplete(() -> log.info("Consumed all messages"))
         .subscribe();
    }
}

StockPriceConsumer 演示了以反應式方式從 stock-prices-out 主題中消費數據:

  1. 初始化:構造函數使用 YAML 配置中的 Kafka 屬性創建ReceiverOptions。它訂閲了stock-prices-out 主題並顯式地分配了所有分區。
  2. 消息處理: consume 方法使用receiveAutoAck()訂閲輸出通道processStockPrices-out-0。每個消息都記錄了鍵、值、主題、偏移量和分區詳細信息,模擬數據處理。
  3. 反應式特性: 消費者在消息到達時開始以反應式方式處理消息,利用非阻塞、具有背壓意識的處理。它還記錄錯誤doOnError()並跟蹤完成doOnComplete()

以下屬性配置了我們的消費者:

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: my-group
      properties:
        reactiveAutoCommit: true

這段消費者會主動處理 stock-prices-out 主題,並且該實現突出了使用反應式編程與 Kafka 進行高效流處理的無縫集成。

3.6. 反應式 WebFlux 應用

現在數據已成功保存到我們的數據庫中,我們可以充分地向用户提供這些信息,因為數據已緩存在我們的服務中,並且可以根據需要進行處理:

@RestController
public class StocksApi {
    private final ClickHouseRepository repository;

    @Autowired
    public StocksApi(ClickHouseRepository repository) {
        this.repository = repository;
    }

    @GetMapping("/stock-prices-out")
    public Flux<StockUpdate> getAvgStockPrices(@RequestParam("from") @NotNull Instant from,  
                                               @RequestParam("to") @NotNull Instant to) {
        if (from.isAfter(to)) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "'from' must come before 'to'");
        }

        return repository.findMinuteAvgStockPrices(from, to);
    }
}

4. 連接點

我們構建了一個完全反應式的數據處理管道,代碼量極少,連接了兩個 Kafka 主題,應用了業務邏輯,並確保了高吞吐量處理。這種方法非常適合需要實時數據轉換的事件驅動系統。Spring Cloud Stream 和 Kafka 構成了一個功能強大的組合,其功能遠超本文所涵蓋的內容

例如,綁定支持多個輸入和輸出,以及死信隊列(DLQ)可以增強管道的健壯性。 此外,還可以集成各種消息提供者,在通道之間啓用事務性處理等等。

Spring Cloud Stream 是一款用途廣泛的工具。 將其與反應式範式相結合,可以解鎖具有彈性、高吞吐量和實時處理能力的強大數據管道。 本文只是對使用反應式 Kafka Streams 和 Spring WebFlux 進行了初步的探索,還有很多值得探索的地方,但我們已經觀察到了一些關鍵優勢:

  1. 實時轉換: 允許實時轉換和豐富事件流。
  2. 反壓管理: 動態處理數據流,避免系統過載。
  3. 無縫集成: 將 Kafka 的事件驅動能力與 Spring WebFlux 的非阻塞能力相結合。
  4. 可擴展設計: 支持高吞吐量系統,並具有死信隊列(DLQ)等強大的容錯機制。

儘管這種方法提供了許多優勢,如本文所述,仍有一些需要注意的地方。

4.1. 實際陷阱與最佳實踐

雖然反應式 Kafka 管道提供了諸多優勢,但也引入了一些挑戰:

  • 反壓處理: 未能有效管理反壓會導致內存膨脹或消息丟失。我們需要在適當的情況下使用 .onBackpressureBuffer().onBackpressureDrop()
  • 序列化問題: 生產者和消費者之間 Schema 不匹配會導致反序列化失敗。 必須確保 Schema 兼容性。
  • 錯誤恢復: 必須確保適當的重試機制或使用 DLQ (Dead Letter Queue) 以有效地處理瞬態問題。
  • 資源管理: 非高效的消息處理可能導致應用程序管道不堪重負。 在這種情況下,我們可以利用 .limitRate().take() 運算符來控制我們的反應式管道內處理速率。 還可以配置 Kafka 消費者 fetch 大小和 poll 間隔以調整從 Kafka 檢索消息的速率,並避免使應用程序管道不堪重負。
  • 數據一致性: 在沒有原子操作或適當的重試處理的情況下,可能會出現不一致的數據處理。 我們可以使用 Kafka 事務來實現原子性,或者/並且編寫冪等消費者邏輯以安全地處理重試。
  • Schema 演進: 在沒有適當的版本控制的情況下演進 Schema 可能會導致兼容性問題。 我們可以使用 Schema 註冊表進行版本控制,並應用向後兼容的更改(例如,添加可選字段)。
  • 監控與可觀測性: 如果監控不足,則很難識別管道中的瓶頸或故障。 我們必須集成像 Micrometer 這樣的工具和 Grafana (或其他首選提供商) 用於指標和監控。 我們還可以為 Kafka 消息添加 trace ID 以進行分佈式跟蹤。

注意這些要點將確保我們的系統具有穩定且可擴展的數據處理管道。

5. 結論

在本文中,我們展示瞭如何通過將 Reactive Kafka Streams 與 Spring WebFlux 集成,從而實現完全響應式、數據密集型的流水線,這些流水線具有可擴展性、效率和實時處理能力。通過利用響應式範式,我們構建了 Kafka 主題之間的無縫數據流,應用了業務邏輯,並實現了高吞吐量、事件驅動的實時處理,同時代碼量極少。這種強大的組合突顯了現代響應式技術在構建健壯且可擴展的,專門為實時數據轉換而設計的系統中的潛力。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.