知識庫 / Spring / Spring Boot RSS 訂閱

Spring Boot 中 RSocket 應用開發

Reactive,Spring Boot
HongKong
23
01:22 PM · Dec 06 ,2025

1. 概述

RSocket 是一種應用程序協議,它提供 Reactive Streams 語義——例如,它作為 HTTP 的一種替代方案而運作。

在本教程中,我們將使用 Spring Boot 學習 RSocket,並重點關注它如何抽象出低級別的 RSocket API。

2. 依賴項

讓我們首先添加 spring-boot-starter-rsocket 依賴項:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

這將遞歸地引入與RSocket相關的依賴項,例如 rsocket-corersocket-transport-netty.

3. 示例應用程序

現在我們將繼續使用我們的示例應用程序。為了突出 RSocket 提供的交互模型,我們將創建一個交易員應用程序。我們的交易員應用程序將包含客户端和服務器。

3.1. 服務器設置

首先,我們需要設置服務器,該服務器將是一個 Spring Boot 應用,它將引導一個 RSocket 服務器。

由於我們有 spring-boot-starter-rsocket 依賴項,Spring Boot 會自動配置一個 RSocket 服務器。 就像使用 Spring Boot 一樣,我們可以通過驅動的方式更改 RSocket 服務器的默認配置值。

例如,我們可以通過在 application.properties 文件中添加以下行來更改 RSocket 服務器的端口:

spring.rsocket.server.port=7000

我們還可以更改 其他屬性,以便根據我們的需求進一步修改我們的服務器。

3.2. 客户端設置

接下來,我們將設置客户端,該客户端也將是一個 Spring Boot 應用程序。

儘管 Spring Boot 自動配置了大部分 RSocket 相關組件,但我們仍然應該定義一些 Bean 以完成設置:

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocketRequester getRSocketRequester(){
        RSocketRequester.Builder builder = RSocketRequester.builder();

        return builder
          .rsocketConnector(
             rSocketConnector ->
               rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2)))
          )
          .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
          .tcp("localhost", 7000);
    }
}

我們正在創建 RSocket 客户端並將其配置為在 7000 端口上使用 TCP 傳輸。請注意,這與我們之前配置的服務器端口相同。

定義完此 Bean 配置後,我們擁有一個基本結構。

接下來,我們將探索不同的交互模型,並瞭解 Spring Boot 在其中如何發揮作用。

4. 使用RSocket和Spring Boot進行請求/響應

讓我們從請求/響應開始。 這可能是最常見和熟悉的交互模型,因為HTTP也採用這種類型的通信。

在這一交互模型中,客户端發起通信併發送請求。隨後,服務器執行操作並返回響應給客户端——因此通信完成。

在我們的Trader應用程序中,客户端會請求給定股票的當前市場數據。服務器會返回這些請求的數據。

4.1. 服務器

在服務器端,我們首先應該創建一個控制器來存放我們的處理方法。儘管我們不使用像 Spring MVC 中那樣 @RequestMapping@GetMapping 註解,而是使用 @MessageMapping 註解。

@Controller
public class MarketDataRSocketController {

    private final MarketDataRepository marketDataRepository;

    public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
        this.marketDataRepository = marketDataRepository;
    }

    @MessageMapping("currentMarketData")
    public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
        return marketDataRepository.getOne(marketDataRequest.getStock());
    }
}

讓我們來調查一下我們的控制器。

我們使用 @Controller 註解來定義一個處理傳入 RSocket 請求的處理程序。 此外, @MessageMapping 註解允許我們定義我們感興趣的路由以及在收到請求時如何響應。

在這種情況下,服務器監聽 currentMarketData 路由,該路由返回一個單一結果給客户端,該結果為 Mono<MarketData>

4.2. 客户端

接下來,我們的 RSocket 客户端應請求股票的當前價格並獲得一個單一響應。

要發起請求,我們應該使用 RSocketRequester 類:

@RestController
public class MarketDataRestController {

    private final RSocketRequester rSocketRequester;

    public MarketDataRestController(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }

    @GetMapping(value = "/current/{stock}")
    public Publisher<MarketData> current(@PathVariable("stock") String stock) {
        return rSocketRequester
          .route("currentMarketData")
          .data(new MarketDataRequest(stock))
          .retrieveMono(MarketData.class);
    }
}

請注意,在我們的案例中,RSocket 客户端也是一個 REST 控制器,我們從中調用 RSocket 服務器。因此,我們使用 @RestController@GetMapping 來定義請求/響應端點。

在端點方法中,我們使用 RSocketRequester 並指定路由。實際上,這正是 RSocket 服務器期望的路由。然後我們傳遞請求數據。最後,當調用 retrieveMono() 方法時,Spring Boot 會啓動請求/響應交互。

5. 使用 RSocket 和 Spring Boot 實現“忘卻式”交互

接下來,我們將探討“忘卻式”交互模型。正如其名稱所示,客户端向服務器發送請求,但不期望收到響應。

在我們的交易應用程序中,某些客户端將充當數據源,並將市場數據推送到服務器。

5.1. 服務器

讓我們為我們的服務器應用程序創建一個新的端點:

@MessageMapping("collectMarketData")
public Mono<Void> collectMarketData(MarketData marketData) {
    marketDataRepository.add(marketData);
    return Mono.empty();
}

再次強調,我們正在定義一個新的 @MessageMapping,其路由值為 collectMarketData。 此外,Spring Boot 會自動將傳入的 payload 轉換為 MarketData 實例。

這裏的主要區別在於,我們返回一個 Mono<Void>,因為客户端不需要從我們這裏獲得響應。

5.2. 客户端

讓我們看看如何啓動我們的“一次性執行”請求。

我們將創建一個新的 REST 端點:

@GetMapping(value = "/collect")
public Publisher<Void> collect() {
    return rSocketRequester
      .route("collectMarketData")
      .data(getMarketData())
      .send();
}

在這裏,我們指定了我們的路由以及我們的報負載載是 MarketData 實例。 由於我們使用 send() 方法發起請求,而不是 retrieveMono(),因此交互模型變為“發送即忘”模式。

6. 使用RSocket和Spring Boot進行請求流

請求流是一種更復雜的交互模型,客户端發送一個請求,但會在一段時間內從服務器收到多個響應。

為了模擬這種交互模型,客户端會請求給定股票的所有市場數據。

6.1. 服務器

讓我們從我們的服務器開始。我們將添加另一個消息映射方法:

@MessageMapping("feedMarketData")
public Flux<MarketData> feedMarketData(MarketDataRequest marketDataRequest) {
    return marketDataRepository.getAll(marketDataRequest.getStock());
}

我們能看到,這個處理方法與其他的非常相似。主要的區別在於,我們返回一個 Flux</em> 而不是一個 Mono</em>。最終,我們的 RSocket 服務器會將多個響應發送給客户端。

6.2. 客户端

在客户端,我們應該創建一個端點來啓動我們的請求/流通信:

@GetMapping(value = "/feed/{stock}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<MarketData> feed(@PathVariable("stock") String stock) {
    return rSocketRequester
      .route("feedMarketData")
      .data(new MarketDataRequest(stock))
      .retrieveFlux(MarketData.class);
}

讓我們來調查我們的RSocket請求。

首先,我們定義了路由和請求負載。然後,我們使用 retrieveFlux() 方法調用來定義響應期望。 這部分決定了交互模型。

請注意,由於我們的客户端也是一個REST服務器,因此它定義了響應媒體類型為 MediaType.TEXT_EVENT_STREAM_VALUE.

7. 異常處理

現在讓我們看看如何在服務器應用程序中以聲明式的方式處理異常。

當進行請求/響應時,我們可以簡單地使用 @MessageExceptionHandler 註解:

@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
    return Mono.just(MarketData.fromException(e));
}

我們已經為異常處理方法添加了 @MessageExceptionHandler 註解。 這樣,它將處理所有類型的異常,因為 Exception 類是所有其他類的超類。

我們可以更具體,為不同異常類型創建不同的異常處理方法。

這當然是針對請求/響應模型的,因此我們返回的是 Mono<MarketData>我們希望此處返回類型與交互模型返回類型匹配。

8. 概述

在本教程中,我們介紹了 Spring Boot 對 RSocket 的支持,並詳細講解了 RSocket 提供的不同交互模型。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.