1. 引言
本文將探討開發人員在使用 Spring Webflux 時常見的錯誤。 Spring Webflux 是一個從零開始構建的非阻塞 Web 框架,旨在充分利用多核、下一代處理器和處理大規模併發連接。
由於它是非阻塞框架,因此線程不應被阻塞。讓我們更詳細地探討一下。
2. Spring Webflux 併發模型
為了更好地理解這個問題,我們需要了解 Spring Webflux 的併發模型。
在 Spring Webflux 中,一個小的工線程池處理傳入的請求。這與 Servlet 模型不同,Servlet 模型中每個請求都會分配一個單獨的線程。因此,框架會保護請求接受線程上的操作。
有了這個理解,讓我們深入探討本文的主要內容。
3. 理解 IllegalStateException 與線程阻塞
讓我們通過一個示例來理解在 Spring Webflux 中出現錯誤“java.lang.IllegalStateException: block()/blockFirst()/blockLast() 在線程中是阻塞的,不支持”的原因。
讓我們以一個文件搜索 API 為例。該 API 從文件系統讀取文件,並在文件中搜索用户提供的文本。
3.1. 文件服務
讓我們首先定義一個 FileService 類,該類將文件的內容讀取為字符串:
@Service
public class FileService {
@Value("${files.base.dir:/tmp/bael-7724}")
private String filesBaseDir;
public Mono<String> getFileContentAsString(String fileName) {
return DataBufferUtils.read(Paths.get(filesBaseDir + "/" + fileName), DefaultDataBufferFactory.sharedInstance, DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY)
.map(dataBuffer -> dataBuffer.toString(StandardCharsets.UTF_8))
.reduceWith(StringBuilder::new, StringBuilder::append)
.map(StringBuilder::toString);
}
}
值得注意的是,<em >FileService</em> 會異步地(反應式地)從文件系統讀取文件。
3.2. 文件內容搜索服務
我們準備利用這個 FileService 來編寫一個文件搜索服務:
@Service
public class FileContentSearchService {
@Autowired
private FileService fileService;
public Mono<Boolean> blockingSearch(String fileName, String searchTerm) {
String fileContent = fileService
.getFileContentAsString(fileName)
.doOnNext(content -> ThreadLogger.log("1. BlockingSearch"))
.block();
boolean isSearchTermPresent = fileContent.contains(searchTerm);
return Mono.just(isSearchTermPresent);
}
}文件搜索服務返回一個 boolean 值,取決於搜索詞是否在文件中找到。為此,我們調用 FileService 中的 getFileContentAsString() 方法。由於我們獲得的結果是異步的,即為 Mono<String>,因此我們調用 block() 以獲取 String 值。之後,我們檢查 fileContent 是否包含 searchTerm。最後,我們將結果包裝並返回為 Mono。
3.3. 文件控制器
最後,我們得到了 FileController,它利用了 FileContentSearchService 的 blockingSearch() 方法:
@RestController
@RequestMapping("bael7724/v1/files")
public class FileController {
...
@GetMapping(value = "/{name}/blocking-search")
Mono<Boolean> blockingSearch(@PathVariable("name") String fileName, @RequestParam String term) {
return fileContentSearchService.blockingSearch(fileName, term);
}
}3.4. 重新觸發異常
我們可以觀察到,Controller 調用了 FileContentSearchService 的方法,後者又調用了 block() 方法。由於這發生在請求-接受線程上,如果我們按照目前的配置調用我們的 API,我們將會遇到我們所要追尋的臭名昭著的異常:
12:28:51.610 [reactor-http-epoll-2] ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [ea98e542-1] 500 Server Error for HTTP GET "/bael7724/v1/files/a/blocking-search?term=a"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-2
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ com.baeldung.filters.TraceWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ com.baeldung.filters.ExceptionalTraceFilter [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP GET "/bael7724/v1/files/a/blocking-search?term=a" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
at reactor.core.publisher.Mono.block(Mono.java:1712)
at com.baeldung.bael7724.service.FileContentSearchService.blockingSearch(FileContentSearchService.java:20)
at com.baeldung.bael7724.controller.FileController.blockingSearch(FileController.java:35)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)3.5. 根本原因
該異常的根本原因是於接收請求的線程上調用 block()。 在我們上面的示例代碼中,block() 方法被調用於線程池中的一個線程,該線程接收請求。 具體來説,在標記為“僅限非阻塞操作”的線程上,即實現 Reactor 的 NonBlocking 標記接口的線程,例如由 Schedulers.parallel() 啓動的線程。
4. 解決方案
現在讓我們看看如何解決此異常。
4.1. 擁抱反應式操作
採用反應式操作而非調用 <em >block()</em> 是一種慣用方法。讓我們修改代碼,利用 <i >map()</i> 操作將 <em >String</i> 轉換為 <em >Boolean</i>:
public Mono<Boolean> nonBlockingSearch(String fileName, String searchTerm) {
return fileService.getFileContentAsString(fileName)
.doOnNext(content -> ThreadLogger.log("1. NonBlockingSearch"))
.map(content -> content.contains(searchTerm))
.doOnNext(content -> ThreadLogger.log("2. NonBlockingSearch"));
}我們因此消除了調用 block() 的需求。當我們運行上述方法時,我們注意到以下線程上下文:
[1. NonBlockingSearch] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506215299Z
[2. NonBlockingSearch] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506361786Z
[1. In Controller] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506465805Z
[2. In Controller] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506543145Z上述日誌表明我們正在同一線程池上執行操作,該線程池正在接受請求。
值得注意的是,即使我們沒有遇到異常,但最好在不同的線程池上執行 I/O 操作,例如從文件讀取。
4.2. 等待限時彈性線程池
假設我們出於某種原因無法使用 block(),那麼該如何處理?我們得出結論,在請求接受線程池上調用 block() 時會發生異常。因此,為了調用 block(),我們需要切換線程池。下面我們來看如何做到這一點:
public Mono<Boolean> workableBlockingSearch(String fileName, String searchTerm) {
return Mono.just("")
.doOnNext(s -> ThreadLogger.log("1. WorkableBlockingSearch"))
.publishOn(Schedulers.boundedElastic())
.doOnNext(s -> ThreadLogger.log("2. WorkableBlockingSearch"))
.map(s -> fileService.getFileContentAsString(fileName)
.block()
.contains(searchTerm))
.doOnNext(s -> ThreadLogger.log("3. WorkableBlockingSearch"));
}為了切換線程池,Spring Webflux 提供兩個操作 publishOn() 和 subscribeOn() 。我們使用了 publishOn() ,它會更改後續操作的線程,而不會影響訂閲或上游操作。由於線程池現在已切換到有限的彈性池,因此我們可以調用 block() 。
現在,如果運行 workableBlockingSearch() 方法,將會得到以下線程上下文:
[1. WorkableBlockingSearch] ThreadName: parallel-2, Time: 2024-06-17T07:40:59.440562518Z
[2. WorkableBlockingSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.442161018Z
[3. WorkableBlockingSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.442891230Z
[1. In Controller] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.443058091Z
[2. In Controller] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.443181770Z我們可以看出,從第 2 號開始,操作確實發生在有限彈性和線程池上,因此我們沒有收到 IllegalStateException 異常。
4.3. 限制條件
讓我們分析一下該解決方案的限制條件。
即使我們使用 block() 函數時,仍然可能出現多種錯誤情況。 舉例來説,即使我們使用 Scheduler 來切換線程上下文,也可能不會按照預期的方式運行:
public Mono<Boolean> incorrectUseOfSchedulersSearch(String fileName, String searchTerm) {
String fileContent = fileService.getFileContentAsString(fileName)
.doOnNext(content -> ThreadLogger.log("1. IncorrectUseOfSchedulersSearch"))
.publishOn(Schedulers.boundedElastic())
.doOnNext(content -> ThreadLogger.log("2. IncorrectUseOfSchedulersSearch"))
.block();
boolean isSearchTermPresent = fileContent.contains(searchTerm);
return Mono.just(isSearchTermPresent);
}在上述代碼示例中,我們使用了 publishOn(),如解決方案中所建議的,但 block() 方法仍然導致異常。當我們運行上述代碼時,我們會得到以下日誌:
[1. IncorrectUseOfSchedulersSearch] ThreadName: Thread-4, Time: 2024-06-17T08:57:02.490298417Z
[2. IncorrectUseOfSchedulersSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T08:57:02.491870410Z
14:27:02.495 [parallel-1] ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [53e4bce1] 500 Server Error for HTTP GET "/bael7724/v1/files/robots.txt/incorrect-use-of-schedulers-search?term=r-"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ com.baeldung.filters.TraceWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ com.baeldung.filters.ExceptionalTraceFilter [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP GET "/bael7724/v1/files/robots.txt/incorrect-use-of-schedulers-search?term=r-" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
at reactor.core.publisher.Mono.block(Mono.java:1712)
at com.baeldung.bael7724.service.FileContentSearchService.incorrectUseOfSchedulersSearch(FileContentSearchService.java:64)
at com.baeldung.bael7724.controller.FileController.incorrectUseOfSchedulersSearch(FileController.java:48)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)這表明第二個日誌語句確實在受限彈性線程池中執行了。但是,我們仍然遇到了異常。原因是 block() 仍然在相同的請求接受線程池中運行。
下面再來看一個注意事項。即使我們切換了線程池,也不能使用並行線程池,即 Schedulers.parallel()。正如前面提到的,某些線程池不允許在它們的線程上調用 block() ,並行線程池就是其中之一。
最後,我們在示例中僅使用了 Schedulers.boundedElastic() 。相反,我們也可以通過 Schedulers.fromExecutorService() 使用任何自定義線程池。
5. 結論
綜上所述,要有效地解決在 Spring Webflux 中使用阻塞操作(如 <em >block()</em>)時出現 <em >IllegalStateException</em> 的問題,我們應該採用非阻塞、響應式的方法。 通過利用響應式操作符,如 <em >map()</em>,可以在同一響應式線程池中執行操作,從而避免顯式使用 <em >block()</em> 的需要。 如果無法避免使用 <em >block()</em>,則可以通過使用 <em >publishOn()</em> 將執行上下文切換到 <em >boundedElastic</em> 調度器或自定義線程池,從而將這些操作與響應式請求-接受線程池隔離,從而防止出現異常。
必須意識到不支持阻塞調用且線程池的特性,並確保正確應用執行上下文切換,以保持應用程序的穩定性和性能。