知識庫 / Spring WebFlux RSS 訂閱

Spring Boot 計劃推送 WebSocket

Spring Boot,Spring WebFlux
HongKong
8
12:43 PM · Dec 06 ,2025

1. 概述

在本教程中,我們將學習如何使用 WebSockets 從服務器向瀏覽器發送定時消息。另一種選擇是使用服務器發送事件 (SSE),但本文檔中不會涉及該技術。

Spring 提供了多種定時任務選項。首先,我們將介紹 <em @Scheduled</em> 註解。然後,我們將提供一個使用 <em <a href="https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#interval-java.time.Duration-">Flux::interval</a></em> 方法(由 Project Reactor 提供)的示例。該庫是 Webflux 應用程序的標準庫,並且可以在任何 Java 項目中作為獨立庫使用。

此外,還存在更高級的機制,例如 Quartz 調度器,但本文檔中不會涉及它們。

2. 一個簡單的聊天應用程序

在上一篇文章中,我們使用 WebSockets 構建了一個聊天應用程序。現在,讓我們通過添加一個新的功能來擴展它:聊天機器人。這些機器人是服務器端組件,它們向瀏覽器推送計劃好的消息。

2.1. Maven 依賴

讓我們首先在 Maven 中設置必要的依賴項。為了構建這個項目,我們的 <em pom.xml</em> 應該包含:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>
<dependency>
    <groupId>com.github.javafaker</groupId>
    <artifactId>javafaker</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

2.2. JavaFaker 依賴

我們將使用 JavaFaker 庫來生成我們的機器人消息。該庫通常用於生成測試數據。在這裏,我們將向我們的聊天室添加一個名為“Chuck Norris”的訪客。

讓我們查看代碼:

Faker faker = new Faker();
ChuckNorris chuckNorris = faker.chuckNorris();
String messageFromChuck = chuckNorris.fact();

Faker 將提供用於各種數據生成器的工廠方法。我們將使用 ChuckNorris 生成器。調用 chuckNorris.fact() 將顯示來自預定義消息列表中的隨機句子。

2.3. 數據模型

該聊天應用程序使用一個簡單的POJO作為消息包裝器:

public class OutputMessage {

    private String from;
    private String text;
    private String time;

   // standard constructors, getters/setters, equals and hashcode
}

將所有內容整合在一起,下面是一個創建聊天消息的示例:

OutputMessage message = new OutputMessage(
  "Chatbot 1", "Hello there!", new SimpleDateFormat("HH:mm").format(new Date())));

2.4. 客户端

我們的聊天客户端是一個簡單的 HTML 頁面。它使用了 SockJS 客户端STOMP 消息協議。

讓我們看看客户端如何訂閲一個主題:

<html>
<head>
    <script src="./js/sockjs-0.3.4.js"></script>
    <script src="./js/stomp.js"></script>
    <script type="text/javascript">
        // ...
        stompClient = Stomp.over(socket);
	
        stompClient.connect({}, function(frame) {
            // ...
            stompClient.subscribe('/topic/pushmessages', function(messageOutput) {
                showMessageOutput(JSON.parse(messageOutput.body));
            });
        });
        // ...
    </script>
</head>
<!-- ... -->
</html>

首先,我們創建了一個基於 SockJS 協議的 Stomp 客户端。然後,主題訂閲作為服務器和連接客户端之間的通信渠道。

在我們的倉庫中,這段代碼位於 webapp/bots.html。本地運行時,我們通過 http://localhost:8080/bots.html 訪問它。當然,根據應用程序的部署方式,我們需要調整主機和端口。

2.5. 服務器端

我們之前在另一篇文章中瞭解到如何在 Spring 中配置 WebSockets。現在,讓我們稍微修改一下該配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // ...
        registry.addEndpoint("/chatwithbots");
        registry.addEndpoint("/chatwithbots").withSockJS();
    }
}

為了發送我們的消息,我們使用實用類 SimpMessagingTemplate。 默認情況下,它作為 Spring Context 中的 @Bean” 存在。 我們可以通過自動配置看到它在 AbstractMessageBrokerConfiguration” 類中是如何聲明的,該類位於 classpath 中。 因此,我們可以將其注入到任何 Spring 組件中。

隨後,我們使用它將消息發佈到主題 /topic/pushmessages

假設我們的類中有一個名為 simpMessagingTemplate 的變量,其中注入了該 Bean。

simpMessagingTemplate.convertAndSend("/topic/pushmessages", 
  new OutputMessage("Chuck Norris", faker.chuckNorris().fact(), time));

正如我們在客户端示例中之前所見,客户端訂閲該主題以處理接收到的消息。

3. 計劃推送消息

在 Spring 生態系統中,我們可以選擇多種計劃消息的方法。如果使用 Spring MVC,<em >@Scheduled</em> 註解作為首選,因為它具有簡潔性。如果使用 Spring Webflux,我們也可以使用 Project Reactor 的 <em >Flux::interval</em> 方法。 我們將分別展示一個示例。

3.1. 配置

我們的聊天機器人將使用 JavaFaker 的 Chuck Norris 生成器。我們將將其配置為 Bean,以便在需要時注入它。

@Configuration
class AppConfig {

    @Bean
    public ChuckNorris chuckNorris() {
        return (new Faker()).chuckNorris();
    }
}

3.2. 使用 @Scheduled 標註

我們的示例機器人是使用 @Scheduled 標註的方法。當它們運行時,它們會將我們的 OutputMessage POJO 通過 WebSocket 使用 SimpMessagingTemplate 發送。

正如其名稱所示,@Scheduled 標註允許方法重複執行。通過它,我們可以使用基於速率的簡單調度或更復雜的“cron”表達式。

讓我們編寫我們的第一個聊天機器人:

@Service
public class ScheduledPushMessages {

    @Scheduled(fixedRate = 5000)
    public void sendMessage(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {
        String time = new SimpleDateFormat("HH:mm").format(new Date());
        simpMessagingTemplate.convertAndSend("/topic/pushmessages", 
          new OutputMessage("Chuck Norris (@Scheduled)", chuckNorris().fact(), time));
    }
    
}

我們對 sendMessage 方法進行了標註,使用了 @Scheduled(fixedRate = 5000)。 這使得 sendMessage 運行間隔為五秒。 然後,我們使用 simpMessagingTemplate 實例向主題發送 OutputMessagesimpMessagingTemplatechuckNorris 實例作為方法參數從 Spring 上下文中注入。

3.3. 使用 <em >Flux::interval()</em >

如果使用 WebFlux,我們可以使用 `Flux::interval` 運算符。

它會發佈一個無限流的 Long 類型的項,項之間間隔由選擇的 Duration 決定。

現在,讓我們使用 Flux 與我們之前的示例。目標是每五秒發送一條 Chuck Norris 的名言。首先,我們需要實現 InitializingBean 接口,以便在應用程序啓動時訂閲 Flux

@Service
public class ReactiveScheduledPushMessages implements InitializingBean {

    private SimpMessagingTemplate simpMessagingTemplate;

    private ChuckNorris chuckNorris;

    @Autowired
    public ReactiveScheduledPushMessages(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {
        this.simpMessagingTemplate = simpMessagingTemplate;
        this.chuckNorris = chuckNorris;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Flux.interval(Duration.ofSeconds(5L))
            // discard the incoming Long, replace it by an OutputMessage
            .map((n) -> new OutputMessage("Chuck Norris (Flux::interval)", 
                              chuckNorris.fact(), 
                              new SimpleDateFormat("HH:mm").format(new Date()))) 
            .subscribe(message -> simpMessagingTemplate.convertAndSend("/topic/pushmessages", message));
    }
}

在這裏,我們使用構造器注入來設置simpMessagingTemplatechuckNorris實例。 這一次,調度邏輯位於afterPropertiesSet()中,當我們實現InitializingBean時,我們會覆蓋它。 該方法將在服務啓動時立即運行。

interval 運算符每五秒發出一個Long。 然後,map 運算符丟棄該值並用我們的消息替換它。 最後,我們subscribeFlux 以觸發每個消息的邏輯。

4. 結論

在本教程中,我們瞭解到實用類 SimpMessagingTemplate 使得通過 WebSocket 輕鬆地推送服務器消息。此外,我們還看到了兩種安排代碼執行的方法。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.