1. 引言
在本教程中,我們將探討使用 Spring 5 WebClient 限制每秒請求數量的不同方法。
雖然我們通常希望充分利用其非阻塞特性,但在某些情況下,我們可能需要添加延遲。 我們將通過使用一些 Project Reactor 功能來控制對服務器的請求流,瞭解這些場景。
2. 初步設置
在避免服務器過載的情況下,我們通常需要限制每秒請求數量。 此外,某些Web服務允許每小時的最大請求數量。 類似地,它們還控制着每個客户端的併發請求數量。
2.1. 創建一個簡單的 Web 服務
為了探索這個場景,我們將從一個簡單的 @RestController 開始,它將從一個固定的範圍內提供隨機數:
@RestController
@RequestMapping("/random")
public class RandomController {
@GetMapping
Integer getRandom() {
return new Random().nextInt(50));
}
}
接下來,我們將模擬一個昂貴的運算並限制併發請求的數量。
2.2. 限制服務器請求速率
在查看解決方案之前,我們先修改服務以模擬更現實的場景。
首先,我們將限制服務器可以處理的併發請求數量,當達到限制時,拋出異常。
其次,我們將添加延遲來處理響應,模擬耗時操作。雖然有更健壯的解決方案可用,但我們這樣做僅用於説明目的。
public class Concurrency {
public static final int MAX_CONCURRENT = 5;
static final Map<String, AtomicInteger> CONCURRENT_REQUESTS = new HashMap<>();
public static int protect(IntSupplier supplier) {
try {
if (CONCURRENT_REQUESTS.incrementAndGet() > MAX_CONCURRENT) {
throw new UnsupportedOperationException("max concurrent requests reached");
}
TimeUnit.SECONDS.sleep(2);
return supplier.getAsInt();
} finally {
CONCURRENT_REQUESTS.decrementAndGet();
}
}
}最後,讓我們更改端點以使用它:
@GetMapping
Integer getRandom() {
return Concurrency.protect(() -> new Random().nextInt(50));
}現在,我們的端點在請求數量超過 MAX_CONCURRENT 時拒絕處理請求,並向客户端返回錯誤。
2.3. 編寫一個簡單的客户端
所有示例都將遵循此模式,以生成一個 Flux 請求流並向我們的服務發出 GET 請求:
Flux.range(1, n)
.flatMap(i -> {
// GET request
});為了減少冗餘代碼,我們將在一個可重用的方法中實現請求部分,該方法將在所有示例中使用。我們將接收一個 WebClient,調用 get(),並使用 ParameterizedTypeReference> 通過泛型檢索響應體。
public interface RandomConsumer {
static <T> Mono<T> get(WebClient client) {
return client.get()
.retrieve()
.bodyToMono(new ParameterizedTypeReference<T>() {});
}
}
現在我們準備好看看不同的方法。
3. 使用 zipWith(Flux.interval()) 延遲
我們的第一個示例結合了我們的請求和固定的延遲,使用了 zipWith():
public class ZipWithInterval {
public static Flux<Integer> fetch(
WebClient client, int requests, int delay) {
return Flux.range(1, requests)
.zipWith(Flux.interval(Duration.ofMillis(delay)))
.flatMap(i -> RandomConsumer.get(client));
}
}
因此,這會使每個請求延遲 毫秒。請注意,此延遲在發送請求之前生效。
4. 使用 Flux.delayElements() 延遲元素
Flux 提供了一種更直接的方式來延遲其元素:
public class DelayElements {
public static Flux<Integer> fetch(
WebClient client, int requests, int delay) {
return Flux.range(1, requests)
.delayElements(Duration.ofMillis(delay))
.flatMap(i -> RandomConsumer.get(client));
}
}
使用 delayElements() 時,延遲直接應用於 Subscriber.onNext() 信號。換句話説,它會延遲 Flux.range() 中的每個元素。因此,傳遞給 flatMap() 的函數將受到影響,啓動時間會更長。例如,如果 delay 的值為 1000,則在我們的請求開始之前,會有一個一秒的延遲。
4.1. 調整我們的解決方案
由此可見,如果我們沒有提供足夠長的延遲時間,將會出現錯誤:
@Test
void givenSmallDelay_whenDelayElements_thenExceptionThrown() {
int delay = 100;
int requests = 10;
assertThrows(InternalServerError.class, () -> {
DelayElements.fetch(client, requests, delay)
.blockLast();
});
}這是因為我們每請求等待 100 毫秒,但每個請求在服務器端完成需要兩秒。因此,我們的併發請求限制很快被達到,我們收到一個 500 錯誤。
我們如果添加足夠的延遲,可以規避請求限制。但是,那樣我們又會等待比必要時間更長。
根據我們的使用場景,等待時間過長可能會顯著影響性能。因此,接下來我們應該檢查一種更合適的處理方式,因為我們知道服務器的限制。
5. 使用 flatMap() 進行併發控制
鑑於我們服務的限制,我們的最佳方案是在最多 Concurrency.MAX_CONCURRENT 個請求並行發送。為此,我們可以將一個額外的參數添加到 <em >flatMap()</em> 中,用於指定最大並行處理數量:
public class LimitConcurrency {
public static Flux<Integer> fetch(
WebClient client, int requests, int concurrency) {
return Flux.range(1, requests)
.flatMap(i -> RandomConsumer.get(client), concurrency);
}
}
此參數確保同時請求的最大數量不超過併發,並且我們的處理不會因不必要的原因而延遲:
@Test
void givenLimitInsideServerRange_whenLimitedConcurrency_thenNoExceptionThrown() {
int limit = Concurrency.MAX_CONCURRENT;
int requests = 10;
assertDoesNotThrow(() -> {
LimitConcurrency.fetch(client, TOTAL_REQUESTS, limit)
.blockLast();
});
}當然,以下是翻譯後的內容:
不過,還有一些其他選項值得我們探討,具體取決於我們的場景和偏好。我們來詳細瞭解一下。
6. 使用 Resilience4j 的 RateLimiter
Resilience4j 是一款功能強大的庫,旨在處理應用程序中的容錯性問題。 我們將使用它來限制在指定時間間隔內的併發請求,幷包含超時設置。
讓我們首先添加 resilience4j-reactor 和 resilience4j-ratelimiter 依賴項:
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-reactor</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
<version>1.7.1</version>
</dependency>然後,我們使用 RateLimiter.of()</em/> 構建速率限制器,提供名稱、發送新請求的間隔、併發限制和超時時間:
public class Resilience4jRateLimit {
public static Flux<Integer> fetch(
WebClient client, int requests, int concurrency, int interval) {
RateLimiter limiter = RateLimiter.of("my-rate-limiter", RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofMillis(interval))
.limitForPeriod(concurrency)
.timeoutDuration(Duration.ofMillis(interval * concurrency))
.build());
// ...
}
}現在我們將其包含在我們的 Flux 中,使用 transformDeferred(),從而控制我們的 GET 請求速率:
return Flux.range(1, requests)
.flatMap(i -> RandomConsumer.get(client)
.transformDeferred(RateLimiterOperator.of(limiter))
);我們應該注意到,如果我們將時間間隔定義得過低,仍然可能存在問題。但是,這種方法在我們需要與其它操作共享速率限制規範時非常有用。
7. 精確的限速與 Guava
Guava 具有通用限速器,非常適合我們的場景。 此外,由於它使用令牌桶算法,因此只有在必要時才會阻止,而不是像 Flux.delayElements() 那樣每次都阻止。
首先,我們需要將 Guava 添加到我們的 pom.xml 中:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.2.1-jre</version>
</dependency>為了使用它,我們調用 <em >RateLimiter.create()</em > 並傳入我們希望發送的最大請求數每秒鐘。然後,在發送請求時,我們調用limiter的acquire()方法,以便在必要時進行請求限流:acquire()``。
public class GuavaRateLimit {
public static Flux<Integer> fetch(
WebClient client, int requests, int requestsPerSecond) {
RateLimiter limiter = RateLimiter.create(requestsPerSecond);
return Flux.range(1, requests)
.flatMap(i -> {
limiter.acquire();
return RandomConsumer.get(client);
});
}
}
由於其簡潔性,該解決方案效果卓越——它不會使我們的代碼塊變得過長。例如,如果,出於某種原因,某個請求花費的時間超過預期,下一個請求將不會等待執行。但是,這種情況僅在我們在 requestsPerSecond 的範圍內時才會發生。
8. 結論
在本文中,我們探討了幾種用於限制 WebClient 速率的方法。隨後,我們模擬了一個受控的 Web 服務,以觀察其對我們代碼和測試的影響。此外,我們利用 Project Reactor 和一些庫,以不同的方式實現了相同的目標。