1. 概述
本教程將比較 Java 19 的虛擬線程與 Project Reactor 的 Webflux。我們首先將回顧每種方法的基本原理,隨後將分析它們的優缺點。
我們首先將探索反應式框架的優勢,並瞭解 Webflux 仍然具有價值的原因。之後,我們將討論線程池單請求模型,並突出顯示虛擬線程在哪些場景下更具優勢。
2. 代碼示例
對於本文檔中的代碼示例,我們假設我們正在開發電子商務應用程序的後端。我們將重點關注負責計算併發布添加到購物車中的商品價格的函數:
class ProductService {
private final String PRODUCT_ADDED_TO_CART_TOPIC = "product-added-to-cart";
private final ProductRepository repository;
private final DiscountService discountService;
private final KafkaTemplate<String, ProductAddedToCartEvent> kafkaTemplate;
// constructor
public void addProductToCart(String productId, String cartId) {
Product product = repository.findById(productId)
.orElseThrow(() -> new IllegalArgumentException("not found!"));
Price price = product.basePrice();
if (product.category().isEligibleForDiscount()) {
BigDecimal discount = discountService.discountForProduct(productId);
price.setValue(price.getValue().subtract(discount));
}
var event = new ProductAddedToCartEvent(productId, price.getValue(), price.getCurrency(), cartId);
kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event);
}
}如我們所見,我們首先使用 MongoRepository 從 MongoDB 數據庫中檢索 Product。檢索到 Product 後,我們確定它是否符合折扣條件。如果是,我們使用 DiscountService 發送 HTTP 請求以確定該產品的可用折扣信息。
最後,我們計算產品的最終價格。完成計算後,我們發送一條 Kafka 消息,其中包含 productId, cartId 以及計算出的價格。
3. WebFlux
WebFlux 是構建異步、非阻塞和事件驅動型應用程序的框架。它基於響應式編程原則,利用 Flux 和 Mono 類型來處理異步通信的複雜性。這些類型實現了發佈-訂閲設計模式,解耦數據生產者和消費者。
3.1. 反應式庫
來自 Spring 生態系統的眾多模塊都與 WebFlux 整合,用於反應式編程。 讓我們在重構代碼的過程中,利用這些模塊,朝着反應式範式過渡。
例如,我們可以將 MongoRepository 替換為 ReactiveMongoRepository。 這種更改意味着我們需要使用 Mono<Product> 而不是 Optional<Product>:
Mono<Product> product = repository.findById(productId)
.switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")));同樣,我們可以將 ProductService 改為異步和非阻塞。例如,我們可以讓它使用 WebClient 執行 HTTP 請求,並因此返回折扣作為 Mono<BigDecimal>:。
Mono<BigDecimal> discount = discountService.discountForProduct(productId);
3.2. 不可變性
在函數式和響應式編程範式中,首選不可變數據,而非可變數據。 我們的初始方法是使用設置器來修改 Price 的值。 但是,隨着我們轉向響應式方法,讓我們重構 Price 對象並使其不可變。
例如,我們可以引入一個專門的方法,該方法應用折扣並生成一個新的 Price 實例,而不是修改現有實例:
record Price(BigDecimal value, String currency) {
public Price applyDiscount(BigDecimal discount) {
return new Price(value.subtract(discount), currency);
}
}現在,我們可以根據折扣計算新的價格,使用 WebFlux 的 map() 方法:
Mono<Price> price = discountService.discountForProduct(productId)
.map(discount -> price.applyDiscount(discount));此外,我們甚至可以使用方法引用在這裏,以保持代碼簡潔:
Mono<Price> price = discountService.discountForProduct(productId).map(price::applyDiscount);
3.3. 功能流水線
Mono 和 Flux 遵循函數式和單調模式,通過諸如 map() 和 flatMap() 等方法實現。 這使得我們可以將使用案例描述為對不可變數據的轉換流水線。
讓我們嘗試識別我們使用案例所需的轉換:
- 我們從一個原始 productId 開始
- 我們將其轉換為 Product
- 我們使用 Product 來計算 Price
- 我們使用 Price 來創建 event
- 最後,我們將 event 發佈到消息隊列
現在,讓我們重構代碼以反映此函數鏈:
void addProductToCart(String productId, String cartId) {
Mono<Product> productMono = repository.findById(productId)
.switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")));
Mono<Price> priceMono = productMono.flatMap(product -> {
if (product.category().isEligibleForDiscount()) {
return discountService.discountForProduct(productId)
.map(product.basePrice()::applyDiscount);
}
return Mono.just(product.basePrice());
});
Mono<ProductAddedToCartEvent> eventMono = priceMono.map(
price -> new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId));
eventMono.subscribe(event -> kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event));
}現在,我們內聯本地變量以保持代碼緊湊。此外,我們提取一個計算價格的函數,並在 <em>flatMap()</em> 中使用它。
void addProductToCart(String productId, String cartId) {
repository.findById(productId)
.switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")))
.flatMap(this::computePrice)
.map(price -> new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId))
.subscribe(event -> kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event));
}
Mono<Price> computePrice(Product product) {
if (product.category().isEligibleForDiscount()) {
return discountService.discountForProduct(product.id())
.map(product.basePrice()::applyDiscount);
}
return Mono.just(product.basePrice());
}4. 虛擬線程
虛擬線程是通過 Java Project Loom 在 Java 中引入的一種替代方案,用於並行處理。 它們是輕量級的,在用户模式下由 Java 虛擬機 (JVM) 管理的線程。 由於如此,它們特別適合用於 I/O 操作,其中傳統線程可能會花費大量時間等待外部資源。
與異步或響應式解決方案相比,虛擬線程使我們能夠繼續使用線程池單請求處理模型。 換句話説,我們可以繼續編寫順序代碼,而無需混合業務邏輯和響應式 API。
4.1. 虛擬線程
有幾種方法可以利用虛擬線程來執行我們的代碼。對於單個方法,例如前一個示例中演示的方法,我們可以使用 <em >startVirtualThread()</em>。這個靜態方法最近被添加到 <em >Thread</em> API 中,它會在一個新的虛擬線程上執行一個 <em >Runnable</em>:
public void addProductToCart(String productId, String cartId) {
Thread.startVirtualThread(() -> computePriceAndPublishMessage(productId, cartId));
}
private void computePriceAndPublishMessage(String productId, String cartId) {
// ...
}或者,我們還可以創建一個ExecutorService,該ExecutorService依賴於新的靜態工廠方法Executors.newVirtualThreadPerTaskExecutor()。Furthermore, for applications using Spring Framework 6 and Spring Boot 3, we can leverage the new Executor and configure Spring to favor virtual threads over platform threads.
4.2. 兼容性
虛擬線程通過使用更傳統的同步編程模型,簡化了代碼。因此,我們可以像進行阻塞 I/O 操作一樣,以順序的方式編寫代碼,而無需擔心顯式的反應式構造。
此外,我們可以無縫地從常規單線程代碼切換到虛擬線程,只需少量或無需修改。 例如,在我們之前的示例中,我們只需要使用靜態工廠方法 startVirtualThread() 創建虛擬線程並執行其中的邏輯:
void addProductToCart(String productId, String cartId) {
Thread.startVirtualThread(() -> computePriceAndPublishMessage(productId, cartId));
}
void computePriceAndPublishMessage(String productId, String cartId) {
Product product = repository.findById(productId)
.orElseThrow(() -> new IllegalArgumentException("not found!"));
Price price = computePrice(productId, product);
var event = new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId);
kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event);
}
Price computePrice(String productId, Product product) {
if (product.category().isEligibleForDiscount()) {
BigDecimal discount = discountService.discountForProduct(productId);
return product.basePrice().applyDiscount(discount);
}
return product.basePrice();
}4.3. 可讀性
採用線程池單請求處理模型,可以更容易地理解和推理業務邏輯。這有助於降低與響應式編程範式相關的認知負擔。
換句話説,虛擬線程允許我們清晰地將技術關注點與業務邏輯分離。從而消除了在實現業務用例時需要外部 API 的需求。
5. 結論
在本文中,我們比較了兩種不同的併發和異步處理方法。我們首先分析了項目 Reactor 的 WebFlux 以及反應式編程範式。我們發現這種方法傾向於使用不可變對象和函數式流水線。
隨後,我們討論了虛擬線程及其與遺留代碼庫的卓越兼容性,這使得平滑過渡到非阻塞代碼成為可能。此外,它們還具有將業務邏輯與基礎設施代碼和其他技術問題明確分離的額外優勢。