知識庫 / Spring WebFlux RSS 訂閱

訪問 Flux 中的第一個元素方法

Reactive,Spring WebFlux
HongKong
7
11:52 AM · Dec 06 ,2025

1. 概述

本教程將探討使用 Spring 5 WebFlux 訪問 Flux 中第一個元素的各種方法。

首先,我們將使用 API 的非阻塞方法,例如 <em >next()</em ><em >take()</em >》。 之後,我們將通過使用elementAt()> 方法來達到相同的效果,該方法需要指定索引。

最後,我們將學習 API 的阻塞方法,並使用 `blockFirst()> 方法來訪問 Flux 中的第一個元素。

2. 測試環境搭建

為了本文中的代碼示例,我們將使用 Payment 類,該類僅包含一個字段,即支付金額 金額

public class Payment {
    private final int amount;
    // constructor and getter
}

在測試中,我們將使用名為 fluxOfThreePayments 的測試助手方法,構建一系列支付流:

private Flux<Payment> fluxOfThreePayments() {
    return Flux.just(paymentOf100, new Payment(200), new Payment(300));
}

之後,我們將使用 Spring Reactor 的 StepVerifier 來測試結果。

3. next()

首先,我們嘗試使用 next() 方法。該方法將返回 flux 的第一個元素,並將其封裝為 reactive 的 Mono 類型。

@Test
void givenAPaymentFlux_whenUsingNext_thenGetTheFirstPaymentAsMono() {
    Mono<Payment> firstPayment = fluxOfThreePayments().next();

    StepVerifier.create(firstPayment)
      .expectNext(paymentOf100)
      .verifyComplete();
}

另一方面,如果我們在一個空的 flux 上調用 next(),結果將是一個 空的 Mono。 因此,阻塞它將返回 null

@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyMono() {
    Flux<Payment> emptyFlux = Flux.empty();

    Mono<Payment> firstPayment = emptyFlux.next();

    StepVerifier.create(firstPayment)
      .verifyComplete();
}

4. take()

Reactive Flux 的 take() 方法與 Java 8 Streams 中的 limit() 類似。 換句話説,我們可以使用 take(1) 來將 flux 限制為精確的 1 個元素,並以阻塞或非阻塞的方式使用它:

@Test
void givenAPaymentFlux_whenUsingTake_thenGetTheFirstPaymentAsFlux() {
    Flux<Payment> firstPaymentFlux = fluxOfThreePayments().take(1);

    StepVerifier.create(firstPaymentFlux)
      .expectNext(paymentOf100)
      .verifyComplete();
}

同樣,對於空流,take(1) 將返回一個空流:

@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyFlux() {
    Flux<Payment> emptyFlux = Flux.empty();

    Flux<Payment> firstPaymentFlux = emptyFlux.take(1);

    StepVerifier.create(firstPaymentFlux)
      .verifyComplete();
}

5. elementAt()

Flux API 還提供了 elementAt() 方法。我們可以使用 elementAt(0) 以非阻塞方式獲取 flux 中的第一個元素:

@Test
void givenAPaymentFlux_whenUsingElementAt_thenGetTheFirstPaymentAsMono() {
    Mono<Payment> firstPayment = fluxOfThreePayments().elementAt(0);

    StepVerifier.create(firstPayment)
      .expectNext(paymentOf100)
      .verifyComplete();
}

如果索引作為參數傳遞時,其值大於流中產生的元素的數量,則會發出錯誤:

@Test
void givenAEmptyFlux_whenUsingElementAt_thenGetAnEmptyMono() {
    Flux<Payment> emptyFlux = Flux.empty();

    Mono<Payment> firstPayment = emptyFlux.elementAt(0);

    StepVerifier.create(firstPayment)
      .expectError(IndexOutOfBoundsException.class);
}

6. blockFirst()

Alternatively, we can also use blockFirst(). Though, as the name suggests, this is a blocking method. As a result, if we use blockFirst(), we’ll be leaving the reactive world, and we’ll lose all its benefits:

@Test
void givenAPaymentFlux_whenUsingBlockFirst_thenGetTheFirstPayment() {
    Payment firstPayment = fluxOfThreePayments().blockFirst();

    assertThat(firstPayment).isEqualTo(paymentOf100);
}

7. <em >toStream()</em >

最後,我們可以將流轉換為 Java 8 流,然後訪問第一個元素:

@Test
void givenAPaymentFlux_whenUsingToStream_thenGetTheFirstPaymentAsOptional() {
    Optional<Payment> firstPayment = fluxOfThreePayments().toStream()
      .findFirst();

    assertThat(firstPayment).contains(paymentOf100);
}

然而,再次強調,如果我們這樣做,我們將無法繼續使用反應式管道。

8. 結論

本文介紹了 Java 中 反應式流 的 API。我們探討了如何訪問 Flux 的第一個元素,並瞭解到為了充分利用反應式流水線,我們應該採用非阻塞解決方案。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.