1. 概述
在本教程中,我們將探討如何使用 <a href="https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#zipWhen-java.util.function.Function-">zipWhen()組合兩個或多個Mono流的結果,以協調的方式進行。 我們將從一個快速概述開始。 接下來,我們將設置一個簡單的示例,涉及用户數據存儲和電子郵件。 我們將展示zipWhen()` 如何使我們能夠協調和同步多個異步操作,尤其是在我們需要從各種來源併發收集和處理數據的情況下。
2. 什麼是 zipWhen()
在 Reactive Programming with Mono, zipWhen() 是一個運算符,允許我們以協調的方式組合兩個或多個 Mono 流的結果。 它通常用於我們有多個異步操作需要併發執行,並且需要將它們的成果合併到一個單一輸出時。
我們從兩個或多個 Mono 流開始,這些流代表異步操作。 這些 Monos 可以發出不同類型的數據,並且可能相互依賴,也可能沒有依賴關係。
然後我們使用 zipWhen() 來協調。 我們將 zipWhen() 運算符應用於其中一個 Mono。 此運算符等待第一個 Mono 發出值,然後使用該值來觸發其他 Monos 的執行。 zipWhen() 的結果是一個新的 Mono,它將所有 Mono 的結果合併到一個單一的數據結構中,通常是一個 Tuple 或我們定義的對象。
最後,我們可以指定我們希望如何組合 Monos 的結果。 我們可以使用合併的值來創建一個新對象、執行計算或構建有意義的響應。
3. 示例配置
讓我們設置一個簡單的示例,包含三個簡化的服務:UserService, EmailService, 和 DataBaseService。每個服務都以 Mono 的形式產生數據。我們希望將所有數據合併到一個響應中並返回給調用客户端。首先,讓我們設置適當的 POM 依賴項。
3.1. 依賴項
首先設置所需的依賴項。我們需要 spring-boot-starter-webflux 和 reactor-test。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>3.2. 設置 UserService
首先介紹一下 User Service:
public class UserService {
public Mono<User> getUser(String userId) {
return Mono.just(new User(userId, "John Stewart"));
}
}
在此,UserService 提供了一個根據給定的 userId 檢索用户數據的⽅法。它返回一個 Mono<User>,代表用户信息。
3.3. 設置 EmailService
接下來,我們添加 EmailService:
public class EmailService {
private final UserService userService;
public EmailService(UserService userService) {
this.userService = userService;
}
public Mono<Boolean> sendEmail(String userId) {
return userService.getUser(userId)
.flatMap(user -> {
System.out.println("Sending email to: " + user.getEmail());
return Mono.just(true);
})
.defaultIfEmpty(false);
}
}正如其名稱所示,EmailService負責向用户發送電子郵件。 重要的是,它依賴於UserService來獲取用户信息,然後根據檢索到的信息發送電子郵件。 sendEmail()方法返回一個Mono<Boolean>,指示電子郵件是否已成功發送。
3.4. 設置 DatabaseService
public class DatabaseService {
private Map<String, User> dataStore = new ConcurrentHashMap<>();
public Mono<Boolean> saveUserData(User user) {
return Mono.create(sink -> {
try {
dataStore.put(user.getId(), user);
sink.success(true);
} catch (Exception e) {
sink.success(false);
}
});
}
}
DatabaseService 處理用户數據持久化到數據庫。為了簡化,我們在這裏使用併發映射來表示數據存儲。
它提供了一個saveUserData()方法,該方法接受用户信息並返回一個Mono<Boolean>以指示數據庫操作的成功或失敗。
4. zipWhen() 示例
現在我們已經定義了所有服務,接下來定義一個控制器方法,它將來自所有三個服務的 Mono 流合併為一個類型為 Mono<ResponseEntity<String>>. 我們將演示如何使用 zipWhen() 運算符來協調各種異步操作並將它們全部轉換為單個響應,以便供調用客户端使用。 讓我們首先定義 GET 方法:
@GetMapping("/example/{userId}")
public Mono<ResponseEntity<String>> combineAllDataFor(@PathVariable String userId) {
Mono<User> userMono = userService.getUser(userId);
Mono<Boolean> emailSentMono = emailService.sendEmail(userId)
.subscribeOn(Schedulers.parallel());
Mono<String> databaseResultMono = userMono.flatMap(user -> databaseService.saveUserData(user)
.map(Object::toString));
return userMono.zipWhen(user -> emailSentMono, (t1, t2) -> Tuples.of(t1, t2))
.zipWhen(tuple -> databaseResultMono, (tuple, databaseResult) -> {
User user = tuple.getT1();
Boolean emailSent = tuple.getT2();
return ResponseEntity.ok()
.body("Response: " + user + ", Email Sent: " + emailSent + ", Database Result: " + databaseResult);
});
}當客户端調用 GET /example/{userId} 終點時, userService 會調用 combineAllData() 方法來根據提供的 userId 檢索用户的信息,通過調用 userService.getUser(userId)。 結果存儲在名為 userMono 的 Mono<User> 中。
接下來,它會向該用户發送電子郵件。 但是,在發送電子郵件之前,它會檢查用户是否存在。 發送電子郵件操作的結果(成功或失敗)表示為類型為 Mono<Boolean> 的 emailSentMono。 此操作並行執行以節省時間。
它使用 databaseService.saveUserData(user) 方法將用户數據(在步驟 1 中檢索)保存到數據庫。 此操作的結果(成功或失敗)轉換為字符串並存儲在 Mono<String> 中。
重要的是,它使用 zipWhen() 運算符將來自先前步驟的結果組合在一起。 第一個 將用户數據 和來自 的電子郵件發送狀態組合成一個元組。 第二個 將先前元組和來自 的數據庫結果組合,以構建最終響應。 在第二個 中,它使用組合的數據構建響應消息。
消息包含用户信息、電子郵件是否已成功發送以及數據庫操作的結果。 基本上,此方法協調了針對特定用户的用户數據檢索、電子郵件發送和數據庫操作,並將結果組合成一個有意義的響應,確保一切高效地同時發生。
5. 測試
現在,讓我們對我們的系統進行測試,並驗證正確響應是否組合了三種不同的 Reactive Streams 類型:
@Test
public void givenUserId_whenCombineAllData_thenReturnsMonoWithCombinedData() {
UserService userService = Mockito.mock(UserService.class);
EmailService emailService = Mockito.mock(EmailService.class);
DatabaseService databaseService = Mockito.mock(DatabaseService.class);
String userId = "123";
User user = new User(userId, "John Doe");
Mockito.when(userService.getUser(userId))
.thenReturn(Mono.just(user));
Mockito.when(emailService.sendEmail(userId))
.thenReturn(Mono.just(true));
Mockito.when(databaseService.saveUserData(user))
.thenReturn(Mono.just(true));
UserController userController = new UserController(userService, emailService, databaseService);
Mono<ResponseEntity<String>> responseMono = userController.combineAllDataFor(userId);
StepVerifier.create(responseMono)
.expectNextMatches(responseEntity -> responseEntity.getStatusCode() == HttpStatus.OK && responseEntity.getBody()
.equals("Response: " + user + ", Email Sent: true, Database Result: " + true))
.verifyComplete();
}<div>
<div>
<div>
<div>
我們使用 <em >StepVerifier</em> 來驗證響應實體是否具有預期的 200 OK 狀態碼,以及一個包含不同 Monos 結果的 body,利用 <em >zipWhen()</em> 方法實現。
</div>
</div>
</div>
</div>
6. 結論
在本教程中,我們快速介紹瞭如何使用 zipWhen() 與 Mono 在響應式編程中進行交互。我們使用了用户數據收集、電子郵件和存儲組件的示例,這些組件都提供了不同類型的 Mono。該示例展示瞭如何使用 zipWhen() 以高效的方式處理數據依賴關係,並在響應式 Spring WebFlux 應用程序中編排異步操作。