知識庫 / Spring RSS 訂閱

項目Reactor Bus 簡介

Spring
HongKong
4
02:43 PM · Dec 06 ,2025

1. 概述

本文將介紹 reactor-bus,通過設置一個實際應用場景,來構建一個反應式、事件驅動的應用。

注意: reactor-bus 項目在 Reactor 3.x 版本中已被移除:已存檔的 reactor-bus 倉庫

2. Project Reactor 的基礎知識

2.1. 為什麼選擇 Reactor?

現代應用程序需要處理大量的併發請求並處理大量數據。傳統的阻塞式代碼已經無法滿足這些需求。

反應式設計模式是一種 事件驅動的架構方法,用於異步處理來自單個或多個服務處理器的大量併發服務請求

Project Reactor 基於此模式,並有明確且雄心勃勃的目標,即在 JVM 上構建非阻塞、反應式應用程序。

2.2. 示例場景

在開始之前,以下是一些利用響應式架構風格的有趣場景,旨在幫助您瞭解我們可能在哪裏應用它:

  • 大型在線購物平台(如亞馬遜)的通知服務
  • 銀行領域的巨量交易處理服務
  • 股票交易業務,其中股票價格同時變化

3. Maven 依賴

讓我們通過將以下依賴項添加到我們的 <em pom.xml</em> 中來開始使用 Project Reactor Bus:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-bus</artifactId>
    <version>2.0.8.RELEASE</version>
</dependency>

我們可以在 reactor-bus 的最新版本中 Maven Central

4. 構建演示應用程序

為了更好地理解基於反應器的優勢,讓我們來看一個實際的例子。

我們將構建一個簡單的應用程序,負責向在線購物平台的用户發送通知。例如,如果用户下達一個新的訂單,應用程序將通過電子郵件或短信發送訂單確認。

典型的同步實現自然受到電子郵件或短信服務的吞吐量限制。因此,如節假日等流量高峯通常會帶來問題。

採用反應式方法,我們可以設計系統使其更具靈活性,並更好地適應外部系統(如網關服務器)中可能發生的故障或超時。

讓我們來查看該應用程序——從更傳統的方面開始,然後過渡到更反應式的構造。

4.1. 簡單 POJO

首先,讓我們創建一個 POJO 類來表示通知數據:

public class NotificationData {
	
    private long id;
    private String name;
    private String email;
    private String mobile;
    
    // getter and setter methods
}

4.2. 服務層

現在,讓我們定義一個簡單的服務層:

public interface NotificationService {

    void initiateNotification(NotificationData notificationData) 
      throws InterruptedException;

}

以下代碼模擬了一個長時間運行的操作:

@Service
public class NotificationServiceimpl implements NotificationService {
	
    @Override
    public void initiateNotification(NotificationData notificationData) 
      throws InterruptedException {

      System.out.println("Notification service started for "
        + "Notification ID: " + notificationData.getId());
		
      Thread.sleep(5000);
		
      System.out.println("Notification service ended for "
        + "Notification ID: " + notificationData.getId());
    }
}

請注意,為了演示通過短信或電子郵件網關發送消息的實際場景,我們故意在 initiateNotification 方法中引入了五秒的延遲,使用 Thread.sleep(5000)

因此,當線程到達服務時,將會被阻塞五秒。

4.3. 消費者

現在,讓我們深入探討應用程序的更具反應性的方面,並實現一個消費者,然後將其映射到反應器事件總線:

@Service
public class NotificationConsumer implements 
  Consumer<Event<NotificationData>> {

    @Autowired
    private NotificationService notificationService;
	
    @Override
    public void accept(Event<NotificationData> notificationDataEvent) {
        NotificationData notificationData = notificationDataEvent.getData();
        
        try {
            notificationService.initiateNotification(notificationData);
        } catch (InterruptedException e) {
            // ignore        
        }	
    }
}

 

如我們所見,我們創建的消費者實現了 Consumer<T> 接口。主要的邏輯位於 accept 方法中。

這種方法與典型的 Spring 監聽器實現類似。

4.4. 控制器

現在我們能夠消費事件,接下來我們也將生成它們。

我們將通過一個簡單的控制器來實現這一點:

@Controller
public class NotificationController {

    @Autowired
    private EventBus eventBus;

    @GetMapping("/startNotification/{param}")
    public void startNotification(@PathVariable Integer param) {
        for (int i = 0; i < param; i++) {
            NotificationData data = new NotificationData();
            data.setId(i);

            eventBus.notify("notificationConsumer", Event.wrap(data));

            System.out.println(
              "Notification " + i + ": notification task submitted successfully");
        }
    }
}

這非常直觀——我們通過 EventBus 發射事件。

例如,如果客户端訪問帶有 param 值 ten 的 URL,則將發送十個事件通過事件總線。

4.5. Java 配置

現在,讓我們將所有內容整合在一起,創建一個簡單的 Spring Boot 應用程序。

首先,我們需要配置 EventBusEnvironment Bean:

@Configuration
public class Config {

    @Bean
    public Environment env() {
        return Environment.initializeIfEmpty().assignErrorJournal();
    }

    @Bean
    public EventBus createEventBus(Environment env) {
        return EventBus.create(env, Environment.THREAD_POOL);
    }
}

在我們的案例中,我們正在使用環境默認提供的線程池實例化 EventBus

或者,我們可以使用自定義的 Dispatcher 實例:

EventBus evBus = EventBus.create(
  env, 
  Environment.newDispatcher(
    REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT,   
    DispatcherType.THREAD_POOL_EXECUTOR));

現在,我們準備創建主應用程序代碼:

import static reactor.bus.selector.Selectors.$;

@SpringBootApplication
public class NotificationApplication implements CommandLineRunner {

    @Autowired
    private EventBus eventBus;

    @Autowired
    private NotificationConsumer notificationConsumer;

    @Override
    public void run(String... args) throws Exception {
        eventBus.on($("notificationConsumer"), notificationConsumer);
    }

    public static void main(String[] args) {
        SpringApplication.run(NotificationApplication.class, args);
    }
}

在我們的 run 方法中,我們正在註冊 notificationConsumer 以在通知匹配給定選擇器時觸發

請注意我們如何使用靜態導入的 $ 屬性來創建 Selector 對象。

5. 測試應用程序

現在,讓我們創建一個測試,以查看我們的NotificationApplication 在實際應用中的效果:

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class NotificationApplicationIntegrationTest {

    @LocalServerPort
    private int port;

    @Test
    public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class);
    }
}

如我們所見,一旦請求執行,所有十個任務都會立即提交,而不會產生任何阻塞。並且,提交後,通知事件會並行處理。

Notification 0: notification task submitted successfully
Notification 1: notification task submitted successfully
Notification 2: notification task submitted successfully
Notification 3: notification task submitted successfully
Notification 4: notification task submitted successfully
Notification 5: notification task submitted successfully
Notification 6: notification task submitted successfully
Notification 7: notification task submitted successfully
Notification 8: notification task submitted successfully
Notification 9: notification task submitted successfully
Notification service started for Notification ID: 1
Notification service started for Notification ID: 2
Notification service started for Notification ID: 3
Notification service started for Notification ID: 0
Notification service ended for Notification ID: 1
Notification service ended for Notification ID: 0
Notification service started for Notification ID: 4
Notification service ended for Notification ID: 3
Notification service ended for Notification ID: 2
Notification service started for Notification ID: 6
Notification service started for Notification ID: 5
Notification service started for Notification ID: 7
Notification service ended for Notification ID: 4
Notification service started for Notification ID: 8
Notification service ended for Notification ID: 6
Notification service ended for Notification ID: 5
Notification service started for Notification ID: 9
Notification service ended for Notification ID: 7
Notification service ended for Notification ID: 8
Notification service ended for Notification ID: 9

請務必記住,在我們的場景中,無需對這些事件進行任何特定順序的處理。

6. 結論

在本快速教程中,我們創建了一個簡單的基於事件驅動的應用. 我們還看到了如何編寫更具反應性和非阻塞特性代碼。

然而,此場景只是對該主題的一個初步探索,並僅代表一個良好的起點,用於嘗試反應性範式.

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.