知識庫 / Spring WebFlux RSS 訂閱

限制每秒請求數量(使用 WebClient)

Reactive,Spring WebFlux
HongKong
9
11:52 AM · Dec 06 ,2025

1. 引言

在本教程中,我們將探討使用 Spring 5 WebClient 限制每秒請求數量的不同方法。

雖然我們通常希望充分利用其非阻塞特性,但在某些情況下,我們可能需要添加延遲。 我們將通過使用一些 Project Reactor 功能來控制對服務器的請求流,瞭解這些場景。

2. 初步設置

在避免服務器過載的情況下,我們通常需要限制每秒請求數量。 此外,某些Web服務允許每小時的最大請求數量。 類似地,它們還控制着每個客户端的併發請求數量。

2.1. 創建一個簡單的 Web 服務

為了探索這個場景,我們將從一個簡單的 @RestController 開始,它將從一個固定的範圍內提供隨機數:

@RestController
@RequestMapping("/random")
public class RandomController {

    @GetMapping
    Integer getRandom() {
        return new Random().nextInt(50));
    }
}

接下來,我們將模擬一個昂貴的運算並限制併發請求的數量。

2.2. 限制服務器請求速率

在查看解決方案之前,我們先修改服務以模擬更現實的場景。

首先,我們將限制服務器可以處理的併發請求數量,當達到限制時,拋出異常。

其次,我們將添加延遲來處理響應,模擬耗時操作。雖然有更健壯的解決方案可用,但我們這樣做僅用於説明目的。

public class Concurrency {

    public static final int MAX_CONCURRENT = 5;
    static final Map<String, AtomicInteger> CONCURRENT_REQUESTS = new HashMap<>();

    public static int protect(IntSupplier supplier) {
        try {
            if (CONCURRENT_REQUESTS.incrementAndGet() > MAX_CONCURRENT) {
                throw new UnsupportedOperationException("max concurrent requests reached");
            }

            TimeUnit.SECONDS.sleep(2);
            return supplier.getAsInt();
        } finally {
            CONCURRENT_REQUESTS.decrementAndGet();
        }
    }
}

最後,讓我們更改端點以使用它:

@GetMapping
Integer getRandom() {
    return Concurrency.protect(() -> new Random().nextInt(50));
}

現在,我們的端點在請求數量超過 MAX_CONCURRENT 時拒絕處理請求,並向客户端返回錯誤。

2.3. 編寫一個簡單的客户端

所有示例都將遵循此模式,以生成一個 Flux 請求流並向我們的服務發出 GET 請求:

Flux.range(1, n)
  .flatMap(i -> {
    // GET request
  });

為了減少冗餘代碼,我們將在一個可重用的方法中實現請求部分,該方法將在所有示例中使用。我們將接收一個 WebClient,調用 get(),並使用 ParameterizedTypeReference> 通過泛型檢索響應體。

public interface RandomConsumer {

    static <T> Mono<T> get(WebClient client) {
        return client.get()
          .retrieve()
          .bodyToMono(new ParameterizedTypeReference<T>() {});
    }
}

現在我們準備好看看不同的方法。

3. 使用 zipWith(Flux.interval()) 延遲

我們的第一個示例結合了我們的請求和固定的延遲,使用了 zipWith()

public class ZipWithInterval {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int delay) {
        return Flux.range(1, requests)
          .zipWith(Flux.interval(Duration.ofMillis(delay)))
          .flatMap(i -> RandomConsumer.get(client));
    }
}

因此,這會使每個請求延遲 毫秒。請注意,此延遲在發送請求之前生效。

4. 使用 Flux.delayElements() 延遲元素

Flux 提供了一種更直接的方式來延遲其元素:

public class DelayElements {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int delay) {
        return Flux.range(1, requests)
          .delayElements(Duration.ofMillis(delay))
          .flatMap(i -> RandomConsumer.get(client));
    }
}

使用 delayElements() 時,延遲直接應用於 Subscriber.onNext() 信號。換句話説,它會延遲 Flux.range() 中的每個元素。因此,傳遞給 flatMap() 的函數將受到影響,啓動時間會更長。例如,如果 delay 的值為 1000,則在我們的請求開始之前,會有一個一秒的延遲。

4.1. 調整我們的解決方案

由此可見,如果我們沒有提供足夠長的延遲時間,將會出現錯誤:

@Test
void givenSmallDelay_whenDelayElements_thenExceptionThrown() {
    int delay = 100;

    int requests = 10;
    assertThrows(InternalServerError.class, () -> {
      DelayElements.fetch(client, requests, delay)
        .blockLast();
    });
}

這是因為我們每請求等待 100 毫秒,但每個請求在服務器端完成需要兩秒。因此,我們的併發請求限制很快被達到,我們收到一個 500 錯誤。

我們如果添加足夠的延遲,可以規避請求限制。但是,那樣我們又會等待比必要時間更長。

根據我們的使用場景,等待時間過長可能會顯著影響性能。因此,接下來我們應該檢查一種更合適的處理方式,因為我們知道服務器的限制。

5. 使用 flatMap() 進行併發控制

鑑於我們服務的限制,我們的最佳方案是在最多 Concurrency.MAX_CONCURRENT 個請求並行發送。為此,我們可以將一個額外的參數添加到 <em >flatMap()</em> 中,用於指定最大並行處理數量:

public class LimitConcurrency {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int concurrency) {
        return Flux.range(1, requests)
          .flatMap(i -> RandomConsumer.get(client), concurrency);
    }
}

此參數確保同時請求的最大數量不超過併發,並且我們的處理不會因不必要的原因而延遲:

@Test
void givenLimitInsideServerRange_whenLimitedConcurrency_thenNoExceptionThrown() {
    int limit = Concurrency.MAX_CONCURRENT;

    int requests = 10;
    assertDoesNotThrow(() -> {
      LimitConcurrency.fetch(client, TOTAL_REQUESTS, limit)
        .blockLast();
    });
}

當然,以下是翻譯後的內容:

不過,還有一些其他選項值得我們探討,具體取決於我們的場景和偏好。我們來詳細瞭解一下。

6. 使用 Resilience4j 的 RateLimiter

Resilience4j 是一款功能強大的庫,旨在處理應用程序中的容錯性問題。 我們將使用它來限制在指定時間間隔內的併發請求,幷包含超時設置。

讓我們首先添加 resilience4j-reactorresilience4j-ratelimiter 依賴項:

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-reactor</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>1.7.1</version>
</dependency>

然後,我們使用 RateLimiter.of()</em/> 構建速率限制器,提供名稱、發送新請求的間隔、併發限制和超時時間:

public class Resilience4jRateLimit {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int concurrency, int interval) {
        RateLimiter limiter = RateLimiter.of("my-rate-limiter", RateLimiterConfig.custom()
          .limitRefreshPeriod(Duration.ofMillis(interval))
          .limitForPeriod(concurrency)
          .timeoutDuration(Duration.ofMillis(interval * concurrency))
          .build());

        // ...
    }
}

現在我們將其包含在我們的 Flux 中,使用 transformDeferred(),從而控制我們的 GET 請求速率:

return Flux.range(1, requests)
  .flatMap(i -> RandomConsumer.get(client)
    .transformDeferred(RateLimiterOperator.of(limiter))
  );

我們應該注意到,如果我們將時間間隔定義得過低,仍然可能存在問題。但是,這種方法在我們需要與其它操作共享速率限制規範時非常有用。

7. 精確的限速與 Guava

Guava 具有通用限速器,非常適合我們的場景。 此外,由於它使用令牌桶算法,因此只有在必要時才會阻止,而不是像 Flux.delayElements() 那樣每次都阻止。

首先,我們需要將 Guava 添加到我們的 pom.xml 中:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>33.2.1-jre</version>
</dependency>

為了使用它,我們調用 <em >RateLimiter.create()</em > 並傳入我們希望發送的最大請求數每秒鐘。然後,在發送請求時,我們調用limiteracquire()方法,以便在必要時進行請求限流:acquire()``。

public class GuavaRateLimit {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int requestsPerSecond) {
        RateLimiter limiter = RateLimiter.create(requestsPerSecond);

        return Flux.range(1, requests)
          .flatMap(i -> {
            limiter.acquire();

            return RandomConsumer.get(client);
          });
    }
}

由於其簡潔性,該解決方案效果卓越——它不會使我們的代碼塊變得過長。例如,如果,出於某種原因,某個請求花費的時間超過預期,下一個請求將不會等待執行。但是,這種情況僅在我們在 requestsPerSecond 的範圍內時才會發生。

8. 結論

在本文中,我們探討了幾種用於限制 WebClient 速率的方法。隨後,我們模擬了一個受控的 Web 服務,以觀察其對我們代碼和測試的影響。此外,我們利用 Project Reactor 和一些庫,以不同的方式實現了相同的目標。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.