知識庫 / Spring WebFlux RSS 訂閱

Spring WebFlux 反應式流中的條件語句

Reactive,Spring WebFlux
HongKong
8
10:56 AM · Dec 06 ,2025

1. 概述

使用條件語句在 Spring WebFlux 反應式流中,可以實現動態決策,同時處理反應式流。與命令式方法不同,反應式方法中的條件邏輯不限於 <em >if-else</em > 語句。相反,我們可以使用多種運算符,如 `map(), filter(), swithIfEmpty(), 以及其他運算符,在不阻塞流的情況下引入條件流。

在本文中,我們將探討使用條件語句與 Spring WebFlux 的不同方法。除非明確説明,每個方法都適用於 MonoFlux

2. 使用條件構造和 <em >map()</em >>

我們可以使用 `map()>` 運算符來轉換流中的每個元素。 此外,我們還可以在映射器中使用 if-else 語句來條件地修改元素

讓我們定義一個名為 oddEvenFluxFlux,並使用 map() 運算符為每個元素標記為“Even”或“Odd”:

Flux<String> oddEvenFlux = Flux.just(1, 2, 3, 4, 5, 6)
  .map(num -> {
    if (num % 2 == 0) {
      return "Even";
    } else {
      return "Odd";
    }
  });

請注意, 是同步的,並且在每個項目發出後立即應用轉換函數。

接下來,我們使用 StepVerifier 來測試我們的反應式流的行為,並確認每個項目的條件標記:

StepVerifier.create(oddEvenFlux)
  .expectNext("Odd")
  .expectNext("Even")
  .expectNext("Odd")
  .expectNext("Even")
  .expectNext("Odd")
  .expectNext("Even")
  .verifyComplete();

正如預期的那樣,每個數字都根據其 奇偶性 進行標記。

3. 使用 filter()

我們可以使用 filter() 運算符來過濾數據,使用謂詞,從而確保下游運算符只接收相關數據。

讓我們創建一個名為 evenNumbersFlux 的新 Flux,從一個數字流中創建它:

Flux<Integer> evenNumbersFlux = Flux.just(1, 2, 3, 4, 5, 6)
  .filter(num -> num % 2 == 0);

在這裏,我們添加了一個謂詞(predicate)用於 filter() 運算符,以確定數字是否為偶數。

現在,讓我們驗證 evenNumbersFlux 僅允許偶數向下流動:

StepVerifier.create(evenNumbersFlux)
  .expectNext(2)
  .expectNext(4)
  .expectNext(6)
  .verifyComplete();

太棒了!它按預期工作。

4. 使用 switchIfEmpty()defaultIfEmpty()

在本節中,我們將學習兩個有用的運算符,它們允許在底層流沒有發出任何項時實現條件數據流。

4.1. 使用 switchIfEmpty()

當底層流未發佈任何項時,我們可能需要切換到替代流。在這種情況下,我們可以通過 <em>switchIfEmpty()</em> 運算符 提供替代發佈器。

假設我們有一個單詞流,該流通過 <em>filter()</em> 運算符 過濾,只允許長度為兩個或更多字符的單詞:

Flux<String> flux = Flux.just("A", "B", "C", "D", "E")
  .filter(word -> word.length() >= 2);

自然地,當沒有任何單詞滿足過濾條件時,流不會產生任何項目。

現在,讓我們通過使用 switchIfEmpty() 運算符提供一個替代流。

flux = flux.switchIfEmpty(Flux.defer(() -> Flux.just("AA", "BB", "CC")));

我們使用了 Flux.defer() 方法,以確保備用流僅在上游流未產生任何項目時才創建。

最後,讓我們驗證結果流是否產生來自替代源的所有項目:

StepVerifier.create(flux)
  .expectNext("AA")
  .expectNext("BB")
  .expectNext("CC")
  .verifyComplete();

結果看起來是正確的。

4.2. 使用 defaultIfEmpty()

我們可以使用 defaultIfEmpty() 運算符,在上游流未發出任何項時,提供備用值,而不是替代發佈器。

flux = flux.defaultIfEmpty("No words found!");

另一個關鍵區別在於使用 switchIfEmpty()defaultIfEmpty() 的方式:使用 defaultIfEmpty() 時,我們只能使用單個默認值。

現在,讓我們驗證我們的反應式流的條件流:

StepVerifier.create(flux)
  .expectNext("No words found!")
  .verifyComplete();

我們已經搞定這個。

5. 使用 flatMap()

我們可以使用 flatMap() 運算符在我們的反應式流中創建多個條件分支,同時保持非阻塞、異步的流程。

下面我們來看一個由單詞創建的 Flux,並使用兩個 flatMap() 運算符對其進行修改:

Flux<String> flux = Flux.just("A", "B", "C")
  .flatMap(word -> {
    if (word.startsWith("A")) {
      return Flux.just(word + "1", word + "2", word + "3");
    } else {
      return Flux.just(word);
    }
  })
  .flatMap(word -> {
    if (word.startsWith("B")) {
      return Flux.just(word + "1", word + "2");
    } else {
      return Flux.just(word);
    }
  });

我們通過添加兩個條件轉換階段,創建了動態的分支,從而為反應式流中的每個項目提供了多個邏輯路徑。

現在,是時候驗證我們反應式流中的條件流:

StepVerifier.create(flux)
  .expectNext("A1")
  .expectNext("A2")
  .expectNext("A3")
  .expectNext("B1")
  .expectNext("B2")
  .expectNext("C")
  .verifyComplete();

太棒了!看來我們成功了。此外,我們可以使用 flatMapMany() 針對 Mono 發佈者進行類似用例。

6. 使用副作用運算符

在本節中,我們將探討如何在處理反應式流的同時,執行基於條件的同步操作。

6.1. 使用 doOnNext()

我們可以使用 `doOnNext()` 運算符,在每個 reactive stream 的項目上同步執行副作用操作。

讓我們先定義 `evenCounter` 變量來跟蹤 reactive stream 中偶數的數量:

AtomicInteger evenCounter = new AtomicInteger(0);

現在,讓我們創建一個整數流,並使用 doOnNext() 運算符將其與鏈式連接,以計數偶數:

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6)
  .doOnNext(num -> {
  if (num % 2 == 0) {
    evenCounter.incrementAndGet();
  }
  });

我們已將操作添加到 if 塊中,從而使計數器的條件遞增得以實現。

接下來,必須驗證在從反應式流中處理每個項目後,evenCounter 的邏輯和狀態:

StepVerifier.create(flux)
  .expectNextMatches(num -> num == 1 && evenCounter.get() == 0)
  .expectNextMatches(num -> num == 2 && evenCounter.get() == 1)
  .expectNextMatches(num -> num == 3 && evenCounter.get() == 1)
  .expectNextMatches(num -> num == 4 && evenCounter.get() == 2)
  .expectNextMatches(num -> num == 5 && evenCounter.get() == 2)
  .expectNextMatches(num -> num == 6 && evenCounter.get() == 3)
  .verifyComplete();

太棒了!我們得到了預期的結果。

6.2. 使用 doOnComplete()

類似於地,我們還可以根據接收到反應式流信號的條件來關聯操作,例如在已發佈所有項後發送的完成信號。

讓我們首先初始化 done 標誌:

AtomicBoolean done = new AtomicBoolean(false);

現在,讓我們定義一個整數流,並使用 doOnComplete() 運算符設置 done 標誌為 true

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6)
  .doOnComplete(() -> done.set(true));

需要注意的是,完整信號只發送一次,因此副作用動作最多觸發一次

此外,為了驗證副作用的條件執行,請在各個步驟中驗證 done 標誌。

StepVerifier.create(flux)
  .expectNextMatches(num -> num == 1 && !done.get())
  .expectNextMatches(num -> num == 2 && !done.get())
  .expectNextMatches(num -> num == 3 && !done.get())
  .expectNextMatches(num -> num == 4 && !done.get())
  .expectNextMatches(num -> num == 5 && !done.get())
  .expectNextMatches(num -> num == 6 && !done.get())
  .then(() -> Assertions.assertTrue(done.get()))
  .expectComplete()
  .verify();

太棒了!我們可以看到 done 標誌只有在所有項目成功發射後才被設置為 true。但是,需要注意的是,doOnComplete() 僅適用於 Flux 發佈者,而對於 Mono 發佈者,我們必須使用 doOnSuccess()

7. 使用 firstOnValue()

有時,我們可能需要從多個來源收集數據,但每個來源的延遲可能不同。從性能角度來看,最好使用延遲最低的來源的值。

對於這種條件數據訪問,我們可以使用 firstOnValue() 運算符

首先,讓我們定義兩個來源,即 source1source2,它們的延遲分別為 200 毫秒和 10 毫秒。

Mono<String[]> source1 = Mono.defer(() -> Mono.just(new String[] { "val", "source1" })
  .delayElement(Duration.ofMillis(200)));
Mono<String[]> source2 = Mono.defer(() -> Mono.just(new String[] { "val", "source2" })
  .delayElement(Duration.ofMillis(10)));

接下來,我們使用 firstWithValue() 運算符,結合這兩個數據源,並利用框架的條件邏輯來處理數據訪問:

Mono<String[]> mono = Mono.firstWithValue(source1, source2);

最後,讓我們通過將生成的項與源數據進行比較,以較低的延遲進行驗證:

StepVerifier.create(mono)
  .expectNextMatches(item -> "val".equals(item[0]) && "source2".equals(item[1]))
  .verifyComplete();

太棒了!我們已經搞定了。此外,請注意,firstWithValue() 僅適用於 Mono 發佈器

8. 使用 zip()zipWhen()

在本節中,我們將學習如何利用條件流控制,使用 zip()zipWhen() 運算符。

8.1. 使用 <em>zip()</em>

我們可以使用 `zip()` 運算符將來自多個來源的發射(emissions)組合起來。 此外,我們還可以利用其組合函數為數據處理添加條件邏輯

下面我們來看如何使用它來確定緩存和數據庫中的值是否不一致:

首先,讓我們定義 dataFromDBdataFromCache 發佈器,以模擬具有不同延遲和值的來源:

Mono<String> dataFromDB = Mono.defer(() -> Mono.just("db_val")
  .delayElement(Duration.ofMillis(200)));
Mono<String> dataFromCache = Mono.defer(() -> Mono.just("cache_val")
  .delayElement(Duration.ofMillis(10)));

現在,我們對其進行壓縮</em/>,並使用其組合函數來添加判斷緩存是否一致的條件:

Mono<String[]> mono = Mono.zip(dataFromDB, dataFromCache, 
  (dbValue, cacheValue) -> 
  new String[] { dbValue, dbValue.equals(cacheValue) ? "VALID_CACHE" : "INVALID_CACHE" });

最後,讓我們驗證這個模擬,並驗證緩存不一致,因為 db_valcache_val 不同:

StepVerifier.create(mono)
  .expectNextMatches(item -> "db_val".equals(item[0]) && "INVALID_CACHE".equals(item[1]))
  .verifyComplete();

結果看起來正確。

8.2. 使用 <em>zipWhen()</em>

當我們需要僅在第一個源產生髮射信號時,才收集第二個源的發射信號,zipWhen() 運算符更適用。 此外,我們還可以使用組合函數來為我們的反應式流添加條件邏輯以進行處理。

例如,我們想要計算用户的年齡段:

int userId = 1;
Mono<String> userAgeCategory = Mono.defer(() -> Mono.just(userId))
  .zipWhen(id -> Mono.defer(() -> Mono.just(20)), (id, age) -> age >= 18 ? "ADULT" : "KID");

我們模擬了一個具有有效用户ID的場景。因此,我們保證能夠從第二個發佈者處獲得數據。隨後,我們將獲取用户的年齡類別。

現在,讓我們驗證這個場景,並確認年齡為20歲的用户被歸類為“ADULT”。

StepVerifier.create(userDetail)
  .expectNext("ADULT")
  .verifyComplete();

接下來,我們使用 Mono.empty() 來模擬在未找到有效用户時的分類場景:

Mono<String> noUserDetail = Mono.empty()
  .zipWhen(id -> Mono.just(20), (id, age) -> age >= 18 ? "ADULT" : "KID");

最後,我們可以確認在本例中沒有排放:

StepVerifier.create(noUserDetail)
  .verifyComplete();

完美!我們得到了兩個場景下預期的結果。此外,還需要注意的是,zipWhen() 僅適用於 Mono 發佈器。

9. 結論

在本文中,我們學習瞭如何在 Spring WebFlux 中包含條件語句。此外,我們還探討了條件流是如何通過不同的操作符實現的,例如 map()flatMap()zip()firstOnValue()switchIfEmpty()defaultIfEmpty() 等。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.