知識庫 / Spring WebFlux RSS 訂閱

取消正在進行的 Flux 在 Spring WebFlux 中

Reactive,Spring WebFlux
HongKong
5
11:41 AM · Dec 06 ,2025

1. 簡介

本文將探討 Spring WebFlux 提供的一些用於取消正在進行的 Flux 的選項。首先,我們將對 Flux 在響應式編程中的概念進行快速概述。接下來,我們將探討取消正在進行的 Flux 的必要性。

我們將研究 Spring WebFlux 提供的各種方法,用於顯式和自動取消訂閲。我們將使用 JUnit 測試驅動一個簡單的示例,以驗證系統行為是否符合預期。最後,我們將看到如何執行取消後的清理操作,從而使系統在取消後恢復到所需狀態。

讓我們從對 Flux 的快速概述開始。

2. 什麼是 Flux?

Spring WebFlux 是一個反應式 Web 框架,它提供了構建異步、非阻塞應用程序的強大功能。 Spring WebFlux 的關鍵特性之一是它能夠處理 Flux。 Flux 是一個反應式數據流,它可以發出零個或多個項目。 它可以從各種來源創建,例如數據庫查詢、網絡調用或內存集合。

在本文檔中,我們應注意到的相關術語是訂閲,它代表了數據源(即發佈者)和數據消費者(即訂閲者)之間的連接。 訂閲維護一個反映訂閲狀態(活動或非活動)的狀態。 它可以用於取消訂閲,這將停止 Flux 發出的數據,並釋放發佈者持有的任何資源。 某些可能想要取消持續 訂閲 的場景可能是用户取消請求或超時發生等情況。

3. 取消正在進行的 Flux 的益處

在 Reactive Spring WebFlux 中,為了確保有效利用系統資源並防止潛在的內存泄漏,重要的是取消正在進行的 Flux。以下是一些原因:

  • 反壓機制:Reactive 編程使用反壓機制來調節發佈者和訂閲者之間的數據流。如果訂閲者無法跟上發佈者的速度,則使用反壓機制來減緩或停止數據流。 如果未取消正在進行的訂閲,它將繼續生成數據,即使訂閲者沒有消費它,從而導致反壓積累並可能導致內存泄漏
  • 資源管理:它可以佔用系統資源,如內存、CPU 和網絡連接,如果未進行監控,可能會導致資源耗盡。 通過取消訂閲並稍後釋放系統資源,可以使它們可供其他任務使用
  • 性能通過提前終止訂閲,系統可以避免不必要的處理並降低響應時間,從而提高整體系統性能

4. Maven 依賴

我們以一個非常簡單的示例來看,傳感器數據以 Flux 的形式傳入,並且我們希望通過訂閲來取消數據發射,利用 WebFlux 提供的各種選項。

為了開始,我們需要添加以下關鍵依賴項:

  •  spring-boot-starter-webflux:它包含了啓動使用 Spring WebFlux 構建響應式 Web 應用程序所需的所有依賴項,包括用於響應式編程的 Reactor 庫,以及 Netty 作為默認嵌入式服務器。
  • reactor-spring:這是一個 Reactor 項目中的模塊,它提供了與 Spring Framework 的集成。
  • reactor-test:它提供了 Reactive Streams 的測試支持。

現在,讓我們在項目 POM 中聲明這些依賴項。

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectreactor</groupId>
        <artifactId>reactor-spring</artifactId>
        <version>${reactor-spring.version}</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

5. 取消正在進行的 Flux 在 WebFlux 中的使用

在 Spring WebFlux 中,我們可以使用顯式取消操作 dispose(),或者通過使用某些調用 cancel() 方法於 Subscription 對象的運算符而實現隱式取消。這些運算符包括:

  • takeUntil()
  • takeWhile()
  • take(long n)
  • take(Duration n)

更深入地瞭解,我們會發現這些運算符內部會調用 cancel() 方法於 Subscription 對象,該對象作為 Subscriber 的 OnSubscribe() 方法的參數。

接下來,我們來討論這些運算符。

5.1. 使用 takeUntil() 運算符取消

讓我們繼續完善我們對傳感器數據的示例。我們希望繼續接收來自數據流的數據,直到我們遇到值 8,此時我們希望取消任何更多數據的發射:

@Test
void givenOngoingFlux_whentakeUntil_thenFluxCancels() {
    Flux<Integer> sensorData = Flux.range(1, 10);
    List<Integer> result = new ArrayList<>();

    sensorData.takeUntil(reading -> reading == 8)
      .subscribe(result::add);
    assertThat(result).containsExactly(1, 2, 3, 4, 5, 6, 7, 8);
}
<div>
  <div>
    <div>
      <div>
        <div>
          <div>
            <div>
              <p>此代碼片段使用 <em ref="Flux">Flux</em> API 創建一個整數流,並使用各種運算符對其進行操作。首先,使用 <em ref="Flux.range()">Flux.range()</em> 創建從 1 到 10 的整數序列。然後,應用 <em ref="Flux.takeUntil()">takeUntil()</em> 運算符,該運算符期望一個謂詞來指定 <em ref="Flux">Flux</em> 應繼續發出整數,直到值達到 8。</p>
              <p><strong ref="Finally">最後,調用 <em ref="Flux.subscribe()">subscribe()</em> 方法,這會導致 Flux 發出值,直到 <em ref="Flux.takeUntil()">takeUntil()</em> 謂詞評估為 true。</strong> 在 <em ref="Flux.subscribe()">subscribe()</em> 方法中,每次新整數發出時,都會將其添加到 List&lt;Integer&gt; 中,從而捕獲和操作發出
              的值。</p>
              <p><strong ref="Finally">重要的是要注意,<em ref="Flux.subscribe()">subscribe()</em> 方法是觸發 <em ref="Flux">Flux</em> 發出值的關鍵,如果沒有它,則不會發出任何值,因為 <em ref="Flux">Flux</em> 將沒有訂閲。</strong> 當由 <em ref="Flux.takeUntil()">takeUntil()</em> 運算符指定的條件為 true 時,訂閲會自動取消,並且 <em ref="Flux">Flux</em> 停止發出值。測試結果確認結果列表僅包含 Integer 值,最多為 8,這證明了任何進一步數據發射的取消。</p>
           </div>
        </div>
      </div>
    </div>
  </div>
</div>

5.2. 使用 takeWhile() 運算符取消訂閲

接下來,我們考慮一個場景,即我們希望訂閲繼續發出數據,只要傳感器讀數小於 8 時。我們可以利用 takeWhile() 運算符,該運算符期望一個繼續謂詞:

@Test
void givenOngoingFlux_whentakeWhile_thenFluxCancels() {
    List<Integer> result = new ArrayList<>();
    Flux<Integer> sensorData = Flux.range(1, 10)
      .takeWhile(reading -> reading < 8)
      .doOnNext(result::add);

    sensorData.subscribe();
    assertThat(result).containsExactly(1, 2, 3, 4, 5, 6, 7);
}

基本上,這裏 takeWhile() 運算符也期望一個謂詞。只要謂詞評估結果為真,數據流就會發出數據。 一旦謂詞評估結果為假,訂閲將被取消,並且不再發出任何數據

之後,我們調用 sensorData.subscribe()。

5.3. 使用 take(long n) 運算符取消操作

接下來,我們來看一下 `take(long n)` 運算符,它能夠限制我們從潛在的無限數量的反應式數據流中取出的元素數量。

讓我們以一個從 1 到 Integer 類型的最大值範圍內的 中的 Integer 類型數據流為例,然後取前 10 個元素:

@Test
void givenOngoingFlux_whentake_thenFluxCancels() {
    Flux<Integer> sensorData = Flux.range(1, Integer.MAX_VALUE);
    List<Integer> result = new ArrayList<>();

    sensorData.take(10)
      .subscribe(result::add);
    Assertions.assertThat(result)
      .containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

此處訂閲在首次 10 個元素後再次取消,我們的 結果 列表對此予以證實。

5.4. 使用take(Duration d)運算符取消訂閲

在某個時間間隔結束後,我們可能希望停止接收任何進一步的數據發射。 這種情況中,我們首先查看Flux的持續時間,然後停止接收超出該持續時間的任何數據:

@Test
void givenAnOnGoingFlux_whenTimeout_thenCancelsFlux() {
    Flux<Integer> sensorData = Flux.interval(Duration.ZERO, Duration.ofSeconds(2))
      .map(i -> i.intValue() + 10)
      .take(5);

    Flux<Integer> canceledByTimeout = sensorData.take(Duration.ofSeconds(3));

    StepVerifier.create(canceledByTimeout)
      .expectNext(10, 11)
      .expectComplete()
      .verify();
}

首先,我們使用 interval() 運算符創建一系列整數 Flux,該運算符從每 2 秒開始,以 2 秒的間隔發出值,從 0 開始。然後,我們將每個發出的值映射為整數,通過添加 10。 接下來,我們使用 take() 運算符來限制發出的值的數量為 5。這意味着 Flux 只會發出前 5 個值,然後完成。

然後,我們創建一個名為 canceledBytimeOut 的新 Flux,通過將 take(Duration) 運算符與持續時間值為 3 秒應用於該 Flux。這意味着 canceledBytimeout Flux 將發出前 2 個傳感器數據值,然後完成。

這裏我們使用了 StepVerifier。 StepVerifier 是 Reactor-Test 庫提供的實用工具,用於通過設置對預期事件的期望,並驗證事件是否按預期順序和具有預期值發出來驗證 Flux 流程的行為。

在本例中,預期的順序和值是 10 和 11,我們還驗證了 Flux 使用 expectComplete() 而沒有發出任何其他值完成。

請注意,subscribe() 方法沒有顯式調用,因為當調用 verify() 時,它會被內部調用。 這意味着事件僅在運行 StepVerifier 時發出,而不是在創建 Flux 流程時發出。

5.5. 使用 dispose() 方法取消

接下來,讓我們看看如何通過調用 dispose() 方法進行顯式取消,該方法屬於 Disposable 接口。 簡單來説,Disposable 接口作為一種單向取消機制。 它允許釋放資源或取消訂閲。

讓我們設置一個例子,其中我們有一個整數 Flux,它以 1 秒的延遲發出 1 到 10 的值。 我們將訂閲該流以在控制枱中打印值。 然後,我們讓線程休眠 5 毫秒,然後調用 dispose() :

@Test
void giveAnOnGoingFlux_whenDispose_thenCancelsFluxExplicitly() throws InterruptedException {
    Flux<Integer> flux = Flux.range(1, 10)
      .delayElements(Duration.ofSeconds(1));

    AtomicInteger count = new AtomicInteger(0);
    Disposable disposable = flux.subscribe(i -> {
        System.out.println("Received: " + i);
        count.incrementAndGet();
    }, e -> System.err.println("Error: " + e.getMessage())
    );

    Thread.sleep(5000);
    System.out.println("Will Dispose the Flux Next");
    disposable.dispose();
    if(disposable.isDisposed()) {
        System.out.println("Flux Disposed");
    }
    assertEquals(4, count.get());
}

在這裏,我們使線程休眠 5 秒鐘,然後調用 dispose()。這會導致訂閲取消。

6. 取消後清理

重要的是要理解,取消正在進行的訂閲不會自動釋放任何關聯的資源。 但是,在流(flux)被取消或完成之後,進行清理和狀態重置仍然非常重要。我們可以使用提供的 doOnCancel()doFinally() 方法來實現這一點。

為了簡化我們的測試,我們將打印適當的消息,一旦流(flux)被取消。 然而,在實際場景中,這一步可以執行任何資源清理,例如關閉連接,例如。

讓我們快速測試,當 Flux 被取消時,我們的所需字符串作為取消後清理的一部分打印出來:

@Test
void givenAFluxIsCanceled_whenDoOnCancelAndDoFinally_thenMessagePrinted() throws InterruptedException {

    List<Integer> result = new ArrayList<>();
    PrintStream mockPrintStream = mock(PrintStream.class);
    System.setOut(mockPrintStream);

    Flux<Integer> sensorData = Flux.interval(Duration.ofMillis(100))
      .doOnCancel(() -> System.out.println("Flux Canceled"))
      .doFinally(signalType -> {
          if (signalType == SignalType.CANCEL) {
              System.out.println("Flux Completed due to Cancelation");
          } else {
              System.out.println("Flux Completed due to Completion or Error");
          }
      })
      .map(i -> ThreadLocalRandom.current().nextInt(1, 1001))
      .doOnNext(result::add);

    Disposable subscription = sensorData.subscribe();

    Thread.sleep(1000);
    subscription.dispose();

    ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
    Mockito.verify(mockPrintStream, times(2)).println(captor.capture());

    assertThat(captor.getAllValues()).contains("Flux Canceled", "Flux Completed due to Cancelation");
}
<div>
  <div>
    <div>
      <p>代碼調用了 <em >doOnCancel()</em> 和 <em >doFinally()</em> 運算符。&nbsp;<strong>請注意,<em >doOnCancel()</em> 運算符僅在明確取消 <em >Flux</em> 序列時才會執行。另一方面,<em >doFinally()</em> 運算符無論是否被取消、成功完成還是發生錯誤,都會執行。</strong></p>
      <p>此外,<em >doFinally()</em> 運算符會消耗一種 <em >SignalType</em> 接口。它代表了信號的可能類型,例如 <em >OnComplete</em>、<em >OnError</em> 和 <em >CANCEL</em>。在這種情況下,<em >SignalType</em> 是 <em >CANCEL</em>,因此“由於取消完成 Flux”的消息也被捕獲。</p>
    </div>
  </div>
</div >

7. 結論

在本教程中,我們探討了 Webflux 提供的多種方式來取消正在進行的 Flux。我們對 Flux 在響應式編程中的應用進行了快速回顧。我們分析了訂閲可能需要取消的原因。然後,我們討論了各種方法以促進取消操作。此外,我們還研究了取消操作後的清理工作。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.