知識庫 / Spring WebFlux RSS 訂閱

Spring WebFlux 反壓機制

Reactive,Spring WebFlux
HongKong
10
12:35 PM · Dec 06 ,2025

1. 引言

Spring WebFlux 提供反應式編程,用於 Web 應用程序。 反應式設計的異步和非阻塞特性可以提高性能並降低內存使用率。 Project Reactor 提供這些能力,以高效地管理數據流。

然而,背壓(backpressure)是這類應用程序中常見的難題。 在本教程中,我們將解釋它的含義以及如何在 Spring WebFlux 中應用背壓機制以緩解它。

2. 反應式流中的反壓機制

由於反應式編程的非阻塞特性,服務器不會一次性發送整個流。它可以在數據可用時併發地推送數據。因此,客户端等待接收和處理事件的時間更短。但是,仍然存在需要克服的問題。

在軟件系統中,反壓是指控制流量的能力。 換句話説,信息發射器通過向消費者推送超出其處理能力的數據來“壓倒”消費者。

最終,人們也將其作為一種機制來控制和處理這種情況。它指的是系統採取的保護性措施,以控制下游流量。

2.1. 什麼是背壓?

在反應式流中,背壓也定義瞭如何調節流元素傳輸的方式。換句話説,它控制接收方可以消費的元素數量。

讓我們通過一個例子來清晰地説明其含義:

  • 系統包含三個服務:發佈者、消費者和圖形用户界面(GUI)
  • 發佈者每秒向消費者發送 10000 個事件
  • 消費者處理這些事件並將結果發送到 GUI
  • GUI 將結果顯示給用户
  • 消費者只能處理每秒 7500 個事件

在該速度下,消費者無法處理事件(背壓)。 否則,系統將崩潰,用户將無法看到結果。

2.2. 使用背壓防止系統性故障

建議採用背壓策略以防止系統性故障。目標是高效地管理接收到的額外事件:

  1. 控制發送的數據流將是首選方案。基本上,發佈者需要減緩事件的發送速度,從而避免消費者過載。然而,這並不總是可行的,我們需要尋找其他可用的選項。
  2. 緩衝額外的數據量是第二種選擇。通過這種方法,消費者臨時存儲剩餘事件,直到可以處理它們。主要的缺點是緩衝區未綁定,導致內存崩潰。
  3. 丟棄額外的事件,丟失它們。即使這種解決方案並非理想,但通過這種技術,系統將不會崩潰。

2.3. 控制背壓

我們將重點關注發佈者發出的事件的控制。基本上,有三種策略可以遵循:

  • 僅在訂閲者請求時發送新事件。 這是一個拉策略,在客户端請求時收集元素
  • 限制客户端接收的事件數量。 這種策略類似於有限的推策略,發佈者一次最多可以向客户端發送一定數量的項目
  • 取消數據流當消費者無法處理更多事件時。 在這種情況下,接收器可以隨時中止傳輸並稍後重新訂閲流

 

3. 處理 Spring WebFlux 中的反壓

Spring WebFlux 提供了一種異步非阻塞的反應式流流程。 Spring WebFlux 中負責反壓的庫是 Project Reactor。 它內部使用 Flux 功能,將機制應用於發射器產生的事件。

WebFlux 使用 TCP 流量控制來調節反壓,以字節為單位。 但它不處理消費者可以接收的邏輯元素。 讓我們看看在底層發生交互流程:

  1. WebFlux 框架負責將事件轉換為字節,以便通過 TCP 進行傳輸/接收。
  2. 可能會發生消費者在請求下一個邏輯元素之前啓動並開始一個長時間運行的任務。
  3. 在接收器正在處理事件的同時,WebFlux 在不發出任何確認信息的情況下,將字節排入隊列,因為沒有對新事件的需求。
  4. 由於 TCP 協議的特性,如果發佈者有新的事件,它將繼續將其發送到網絡。

 

結論是,上面的圖表表明,消費者和發佈者之間的邏輯需求可能不同。 Spring WebFlux 並不能理想地管理整個系統中的服務之間的反壓。 它通過獨立地處理消費者,然後以相同的方式處理髮布者來處理它。 但它沒有考慮到這兩個服務之間的邏輯需求。

因此,Spring WebFlux 的反壓處理方式與我們期望的不太一樣。 在下一部分,我們將看到如何在 Spring WebFlux 中實現反壓機制!

4. 使用 Spring WebFlux 實現背壓機制

我們將使用 Flux 實現 來處理接收到的事件的控制。因此,我們將同時在讀取端和寫入端暴露請求和響應體,並提供背壓支持。生產者將減慢或停止,直到消費者的處理能力釋放出來。 讓我們看看如何做到這一點!

4.1. 依賴項

為了實現示例,我們將簡單地添加 Spring WebFlux starterReactor test 依賴項到我們的 pom.xml 中:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

4.2. 請求

第一種選擇是 賦予消費者對其可以處理的事件的控制權。因此,發佈者會等待接收者發出新的事件請求。總而言之,客户端訂閲 Flux,然後根據自身需求處理事件:

@Test
public void whenRequestingChunks10_thenMessagesAreReceived() {
    Flux request = Flux.range(1, 50);

    request.subscribe(
      System.out::println,
      err -> err.printStackTrace(),
      () -> System.out.println("All 50 items have been successfully processed!!!"),
      subscription -> {
          for (int i = 0; i < 5; i++) {
              System.out.println("Requesting the next 10 elements!!!");
              subscription.request(10);
          }
      }
    );

    StepVerifier.create(request)
      .expectSubscription()
      .thenRequest(10)
      .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .thenRequest(10)
      .expectNext(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
      .thenRequest(10)
      .expectNext(21, 22, 23, 24, 25, 26, 27 , 28, 29 ,30)
      .thenRequest(10)
      .expectNext(31, 32, 33, 34, 35, 36, 37 , 38, 39 ,40)
      .thenRequest(10)
      .expectNext(41, 42, 43, 44, 45, 46, 47 , 48, 49 ,50)
      .verifyComplete();

採用這種方法,發射器不會過度壓載接收器。換句話説,客户端在處理其所需事件時處於可控狀態。

我們將使用 StepVerifier 測試生產者在背壓下的行為。我們將在 thenRequest(n) 調用時,僅期望接收下一個 n 個項目。

4.3. 限制

使用 Project Reactor 中的 limitRange() 操作符是另一種選擇。 它允許一次預取項目數量。 一個有趣的特性是,限制在訂閲者請求更多事件進行處理時仍然生效。 發射器將事件分成塊,從而避免在每次請求中消耗超過限制:

@Test
public void whenLimitRateSet_thenSplitIntoChunks() throws InterruptedException {
    Flux<Integer> limit = Flux.range(1, 25);

    limit.limitRate(10);
    limit.subscribe(
      value -> System.out.println(value),
      err -> err.printStackTrace(),
      () -> System.out.println("Finished!!"),
      subscription -> subscription.request(15)
    );

    StepVerifier.create(limit)
      .expectSubscription()
      .thenRequest(15)
      .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .expectNext(11, 12, 13, 14, 15)
      .thenRequest(10)
      .expectNext(16, 17, 18, 19, 20, 21, 22, 23, 24, 25)
      .verifyComplete();
}

4.4. 取消

最終,消費者可以隨時取消事件以接收新的事件。對於本示例,我們將採用另一種方法。Project Reactor 允許我們實現自己的 Subscriber 或擴展 BaseSubscriber 類。 讓我們看看接收器如何隨時覆蓋上述類來中止接收新事件:

@Test
public void whenCancel_thenSubscriptionFinished() {
    Flux<Integer> cancel = Flux.range(1, 10).log();

    cancel.subscribe(new BaseSubscriber<Integer>() {
        @Override
        protected void hookOnNext(Integer value) {
            request(3);
            System.out.println(value);
            cancel();
        }
    });

    StepVerifier.create(cancel)
      .expectNext(1, 2, 3)
      .thenCancel()
      .verify();
}

5. 結論

在本教程中,我們介紹了 Reactive 編程中的背壓概念以及如何避免它。 Spring WebFlux 通過 Project Reactor 支持背壓,從而能夠在發佈者向消費者發送過多的事件時,提供可用性、健壯性和穩定性。 簡而言之,它能夠防止因高負載導致的系統性故障。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.