1. 概述
RSocket 是一種應用程序協議,它提供 Reactive Streams 語義——例如,它作為 HTTP 的一種替代方案而運作。
在本教程中,我們將使用 Spring Boot 學習 RSocket,並重點關注它如何抽象出低級別的 RSocket API。
2. 依賴項
讓我們首先添加 spring-boot-starter-rsocket 依賴項:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>這將遞歸地引入與RSocket相關的依賴項,例如 rsocket-core 和 rsocket-transport-netty.
3. 示例應用程序
現在我們將繼續使用我們的示例應用程序。為了突出 RSocket 提供的交互模型,我們將創建一個交易員應用程序。我們的交易員應用程序將包含客户端和服務器。
3.1. 服務器設置
首先,我們需要設置服務器,該服務器將是一個 Spring Boot 應用,它將引導一個 RSocket 服務器。
由於我們有 spring-boot-starter-rsocket 依賴項,Spring Boot 會自動配置一個 RSocket 服務器。 就像使用 Spring Boot 一樣,我們可以通過驅動的方式更改 RSocket 服務器的默認配置值。
例如,我們可以通過在 application.properties 文件中添加以下行來更改 RSocket 服務器的端口:
spring.rsocket.server.port=7000我們還可以更改 其他屬性,以便根據我們的需求進一步修改我們的服務器。
3.2. 客户端設置
接下來,我們將設置客户端,該客户端也將是一個 Spring Boot 應用程序。
儘管 Spring Boot 自動配置了大部分 RSocket 相關組件,但我們仍然應該定義一些 Bean 以完成設置:
@Configuration
public class ClientConfiguration {
@Bean
public RSocketRequester getRSocketRequester(){
RSocketRequester.Builder builder = RSocketRequester.builder();
return builder
.rsocketConnector(
rSocketConnector ->
rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2)))
)
.dataMimeType(MimeTypeUtils.APPLICATION_JSON)
.tcp("localhost", 7000);
}
}我們正在創建 RSocket 客户端並將其配置為在 7000 端口上使用 TCP 傳輸。請注意,這與我們之前配置的服務器端口相同。
定義完此 Bean 配置後,我們擁有一個基本結構。
接下來,我們將探索不同的交互模型,並瞭解 Spring Boot 在其中如何發揮作用。
4. 使用RSocket和Spring Boot進行請求/響應
讓我們從請求/響應開始。 這可能是最常見和熟悉的交互模型,因為HTTP也採用這種類型的通信。
在這一交互模型中,客户端發起通信併發送請求。隨後,服務器執行操作並返回響應給客户端——因此通信完成。
在我們的Trader應用程序中,客户端會請求給定股票的當前市場數據。服務器會返回這些請求的數據。
4.1. 服務器
在服務器端,我們首先應該創建一個控制器來存放我們的處理方法。儘管我們不使用像 Spring MVC 中那樣 @RequestMapping 或 @GetMapping 註解,而是使用 @MessageMapping 註解。
@Controller
public class MarketDataRSocketController {
private final MarketDataRepository marketDataRepository;
public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
this.marketDataRepository = marketDataRepository;
}
@MessageMapping("currentMarketData")
public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
return marketDataRepository.getOne(marketDataRequest.getStock());
}
}讓我們來調查一下我們的控制器。
我們使用 @Controller 註解來定義一個處理傳入 RSocket 請求的處理程序。 此外, @MessageMapping 註解允許我們定義我們感興趣的路由以及在收到請求時如何響應。
在這種情況下,服務器監聽 currentMarketData 路由,該路由返回一個單一結果給客户端,該結果為 Mono<MarketData>。
4.2. 客户端
接下來,我們的 RSocket 客户端應請求股票的當前價格並獲得一個單一響應。
要發起請求,我們應該使用 RSocketRequester 類:
@RestController
public class MarketDataRestController {
private final RSocketRequester rSocketRequester;
public MarketDataRestController(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@GetMapping(value = "/current/{stock}")
public Publisher<MarketData> current(@PathVariable("stock") String stock) {
return rSocketRequester
.route("currentMarketData")
.data(new MarketDataRequest(stock))
.retrieveMono(MarketData.class);
}
}請注意,在我們的案例中,RSocket 客户端也是一個 REST 控制器,我們從中調用 RSocket 服務器。因此,我們使用 @RestController 和 @GetMapping 來定義請求/響應端點。
在端點方法中,我們使用 RSocketRequester 並指定路由。實際上,這正是 RSocket 服務器期望的路由。然後我們傳遞請求數據。最後,當調用 retrieveMono() 方法時,Spring Boot 會啓動請求/響應交互。
5. 使用 RSocket 和 Spring Boot 實現“忘卻式”交互
接下來,我們將探討“忘卻式”交互模型。正如其名稱所示,客户端向服務器發送請求,但不期望收到響應。
在我們的交易應用程序中,某些客户端將充當數據源,並將市場數據推送到服務器。
5.1. 服務器
讓我們為我們的服務器應用程序創建一個新的端點:
@MessageMapping("collectMarketData")
public Mono<Void> collectMarketData(MarketData marketData) {
marketDataRepository.add(marketData);
return Mono.empty();
}再次強調,我們正在定義一個新的 @MessageMapping,其路由值為 collectMarketData。 此外,Spring Boot 會自動將傳入的 payload 轉換為 MarketData 實例。
這裏的主要區別在於,我們返回一個 Mono<Void>,因為客户端不需要從我們這裏獲得響應。
5.2. 客户端
讓我們看看如何啓動我們的“一次性執行”請求。
我們將創建一個新的 REST 端點:
@GetMapping(value = "/collect")
public Publisher<Void> collect() {
return rSocketRequester
.route("collectMarketData")
.data(getMarketData())
.send();
}在這裏,我們指定了我們的路由以及我們的報負載載是 MarketData 實例。 由於我們使用 send() 方法發起請求,而不是 retrieveMono(),因此交互模型變為“發送即忘”模式。
6. 使用RSocket和Spring Boot進行請求流
請求流是一種更復雜的交互模型,客户端發送一個請求,但會在一段時間內從服務器收到多個響應。
為了模擬這種交互模型,客户端會請求給定股票的所有市場數據。
6.1. 服務器
讓我們從我們的服務器開始。我們將添加另一個消息映射方法:
@MessageMapping("feedMarketData")
public Flux<MarketData> feedMarketData(MarketDataRequest marketDataRequest) {
return marketDataRepository.getAll(marketDataRequest.getStock());
}我們能看到,這個處理方法與其他的非常相似。主要的區別在於,我們返回一個 Flux
6.2. 客户端
在客户端,我們應該創建一個端點來啓動我們的請求/流通信:
@GetMapping(value = "/feed/{stock}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<MarketData> feed(@PathVariable("stock") String stock) {
return rSocketRequester
.route("feedMarketData")
.data(new MarketDataRequest(stock))
.retrieveFlux(MarketData.class);
}讓我們來調查我們的RSocket請求。
首先,我們定義了路由和請求負載。然後,我們使用 retrieveFlux() 方法調用來定義響應期望。 這部分決定了交互模型。
請注意,由於我們的客户端也是一個REST服務器,因此它定義了響應媒體類型為 MediaType.TEXT_EVENT_STREAM_VALUE.。
7. 異常處理
現在讓我們看看如何在服務器應用程序中以聲明式的方式處理異常。
當進行請求/響應時,我們可以簡單地使用 @MessageExceptionHandler 註解:
@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
return Mono.just(MarketData.fromException(e));
}我們已經為異常處理方法添加了 @MessageExceptionHandler 註解。 這樣,它將處理所有類型的異常,因為 Exception 類是所有其他類的超類。
我們可以更具體,為不同異常類型創建不同的異常處理方法。
這當然是針對請求/響應模型的,因此我們返回的是 Mono<MarketData>。 我們希望此處返回類型與交互模型返回類型匹配。
8. 概述
在本教程中,我們介紹了 Spring Boot 對 RSocket 的支持,並詳細講解了 RSocket 提供的不同交互模型。