1. 概述
本教程將探討使用 Spring 5 WebFlux 訪問 Flux 中第一個元素的各種方法。
首先,我們將使用 API 的非阻塞方法,例如 <em >next()</em > 和 <em >take()</em >》。 之後,我們將通過使用elementAt()> 方法來達到相同的效果,該方法需要指定索引。
最後,我們將學習 API 的阻塞方法,並使用 `blockFirst()> 方法來訪問 Flux 中的第一個元素。
2. 測試環境搭建
為了本文中的代碼示例,我們將使用 Payment 類,該類僅包含一個字段,即支付金額 金額。
public class Payment {
private final int amount;
// constructor and getter
}在測試中,我們將使用名為 fluxOfThreePayments 的測試助手方法,構建一系列支付流:
private Flux<Payment> fluxOfThreePayments() {
return Flux.just(paymentOf100, new Payment(200), new Payment(300));
}之後,我們將使用 Spring Reactor 的 StepVerifier 來測試結果。
3. next()
首先,我們嘗試使用 next() 方法。該方法將返回 flux 的第一個元素,並將其封裝為 reactive 的 Mono 類型。
@Test
void givenAPaymentFlux_whenUsingNext_thenGetTheFirstPaymentAsMono() {
Mono<Payment> firstPayment = fluxOfThreePayments().next();
StepVerifier.create(firstPayment)
.expectNext(paymentOf100)
.verifyComplete();
}另一方面,如果我們在一個空的 flux 上調用 next(),結果將是一個 空的 Mono。 因此,阻塞它將返回 null:
@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyMono() {
Flux<Payment> emptyFlux = Flux.empty();
Mono<Payment> firstPayment = emptyFlux.next();
StepVerifier.create(firstPayment)
.verifyComplete();
}
4. take()
Reactive Flux 的 take() 方法與 Java 8 Streams 中的 limit() 類似。 換句話説,我們可以使用 take(1) 來將 flux 限制為精確的 1 個元素,並以阻塞或非阻塞的方式使用它:
@Test
void givenAPaymentFlux_whenUsingTake_thenGetTheFirstPaymentAsFlux() {
Flux<Payment> firstPaymentFlux = fluxOfThreePayments().take(1);
StepVerifier.create(firstPaymentFlux)
.expectNext(paymentOf100)
.verifyComplete();
}同樣,對於空流,take(1) 將返回一個空流:
@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyFlux() {
Flux<Payment> emptyFlux = Flux.empty();
Flux<Payment> firstPaymentFlux = emptyFlux.take(1);
StepVerifier.create(firstPaymentFlux)
.verifyComplete();
}5. elementAt()
Flux API 還提供了 elementAt() 方法。我們可以使用 elementAt(0) 以非阻塞方式獲取 flux 中的第一個元素:
@Test
void givenAPaymentFlux_whenUsingElementAt_thenGetTheFirstPaymentAsMono() {
Mono<Payment> firstPayment = fluxOfThreePayments().elementAt(0);
StepVerifier.create(firstPayment)
.expectNext(paymentOf100)
.verifyComplete();
}如果索引作為參數傳遞時,其值大於流中產生的元素的數量,則會發出錯誤:
@Test
void givenAEmptyFlux_whenUsingElementAt_thenGetAnEmptyMono() {
Flux<Payment> emptyFlux = Flux.empty();
Mono<Payment> firstPayment = emptyFlux.elementAt(0);
StepVerifier.create(firstPayment)
.expectError(IndexOutOfBoundsException.class);
}6. blockFirst()
Alternatively, we can also use blockFirst(). Though, as the name suggests, this is a blocking method. As a result, if we use blockFirst(), we’ll be leaving the reactive world, and we’ll lose all its benefits:
@Test
void givenAPaymentFlux_whenUsingBlockFirst_thenGetTheFirstPayment() {
Payment firstPayment = fluxOfThreePayments().blockFirst();
assertThat(firstPayment).isEqualTo(paymentOf100);
}7. <em >toStream()</em >
最後,我們可以將流轉換為 Java 8 流,然後訪問第一個元素:
@Test
void givenAPaymentFlux_whenUsingToStream_thenGetTheFirstPaymentAsOptional() {
Optional<Payment> firstPayment = fluxOfThreePayments().toStream()
.findFirst();
assertThat(firstPayment).contains(paymentOf100);
}然而,再次強調,如果我們這樣做,我們將無法繼續使用反應式管道。
8. 結論
本文介紹了 Java 中 反應式流 的 API。我們探討了如何訪問 Flux 的第一個元素,並瞭解到為了充分利用反應式流水線,我們應該採用非阻塞解決方案。