1. 概述
本文將介紹 reactor-bus,通過設置一個實際應用場景,來構建一個反應式、事件驅動的應用。
注意: reactor-bus 項目在 Reactor 3.x 版本中已被移除:已存檔的 reactor-bus 倉庫。
2. Project Reactor 的基礎知識
2.1. 為什麼選擇 Reactor?
現代應用程序需要處理大量的併發請求並處理大量數據。傳統的阻塞式代碼已經無法滿足這些需求。
反應式設計模式是一種 事件驅動的架構方法,用於異步處理來自單個或多個服務處理器的大量併發服務請求。
Project Reactor 基於此模式,並有明確且雄心勃勃的目標,即在 JVM 上構建非阻塞、反應式應用程序。
2.2. 示例場景
在開始之前,以下是一些利用響應式架構風格的有趣場景,旨在幫助您瞭解我們可能在哪裏應用它:
- 大型在線購物平台(如亞馬遜)的通知服務
- 銀行領域的巨量交易處理服務
- 股票交易業務,其中股票價格同時變化
3. Maven 依賴
讓我們通過將以下依賴項添加到我們的 <em pom.xml</em> 中來開始使用 Project Reactor Bus:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bus</artifactId>
<version>2.0.8.RELEASE</version>
</dependency>我們可以在 reactor-bus 的最新版本中 Maven Central。
4. 構建演示應用程序
為了更好地理解基於反應器的優勢,讓我們來看一個實際的例子。
我們將構建一個簡單的應用程序,負責向在線購物平台的用户發送通知。例如,如果用户下達一個新的訂單,應用程序將通過電子郵件或短信發送訂單確認。
典型的同步實現自然受到電子郵件或短信服務的吞吐量限制。因此,如節假日等流量高峯通常會帶來問題。
採用反應式方法,我們可以設計系統使其更具靈活性,並更好地適應外部系統(如網關服務器)中可能發生的故障或超時。
讓我們來查看該應用程序——從更傳統的方面開始,然後過渡到更反應式的構造。
4.1. 簡單 POJO
首先,讓我們創建一個 POJO 類來表示通知數據:
public class NotificationData {
private long id;
private String name;
private String email;
private String mobile;
// getter and setter methods
}4.2. 服務層
現在,讓我們定義一個簡單的服務層:
public interface NotificationService {
void initiateNotification(NotificationData notificationData)
throws InterruptedException;
}以下代碼模擬了一個長時間運行的操作:
@Service
public class NotificationServiceimpl implements NotificationService {
@Override
public void initiateNotification(NotificationData notificationData)
throws InterruptedException {
System.out.println("Notification service started for "
+ "Notification ID: " + notificationData.getId());
Thread.sleep(5000);
System.out.println("Notification service ended for "
+ "Notification ID: " + notificationData.getId());
}
}請注意,為了演示通過短信或電子郵件網關發送消息的實際場景,我們故意在 initiateNotification 方法中引入了五秒的延遲,使用 Thread.sleep(5000)。
因此,當線程到達服務時,將會被阻塞五秒。
4.3. 消費者
現在,讓我們深入探討應用程序的更具反應性的方面,並實現一個消費者,然後將其映射到反應器事件總線:
@Service
public class NotificationConsumer implements
Consumer<Event<NotificationData>> {
@Autowired
private NotificationService notificationService;
@Override
public void accept(Event<NotificationData> notificationDataEvent) {
NotificationData notificationData = notificationDataEvent.getData();
try {
notificationService.initiateNotification(notificationData);
} catch (InterruptedException e) {
// ignore
}
}
}
如我們所見,我們創建的消費者實現了 Consumer<T> 接口。主要的邏輯位於 accept 方法中。
這種方法與典型的 Spring 監聽器實現類似。
4.4. 控制器
現在我們能夠消費事件,接下來我們也將生成它們。
我們將通過一個簡單的控制器來實現這一點:
@Controller
public class NotificationController {
@Autowired
private EventBus eventBus;
@GetMapping("/startNotification/{param}")
public void startNotification(@PathVariable Integer param) {
for (int i = 0; i < param; i++) {
NotificationData data = new NotificationData();
data.setId(i);
eventBus.notify("notificationConsumer", Event.wrap(data));
System.out.println(
"Notification " + i + ": notification task submitted successfully");
}
}
}這非常直觀——我們通過 EventBus 發射事件。
例如,如果客户端訪問帶有 param 值 ten 的 URL,則將發送十個事件通過事件總線。
4.5. Java 配置
現在,讓我們將所有內容整合在一起,創建一個簡單的 Spring Boot 應用程序。
首先,我們需要配置 EventBus 和 Environment Bean:
@Configuration
public class Config {
@Bean
public Environment env() {
return Environment.initializeIfEmpty().assignErrorJournal();
}
@Bean
public EventBus createEventBus(Environment env) {
return EventBus.create(env, Environment.THREAD_POOL);
}
}在我們的案例中,我們正在使用環境默認提供的線程池實例化 EventBus
或者,我們可以使用自定義的 Dispatcher 實例:
EventBus evBus = EventBus.create(
env,
Environment.newDispatcher(
REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT,
DispatcherType.THREAD_POOL_EXECUTOR));現在,我們準備創建主應用程序代碼:
import static reactor.bus.selector.Selectors.$;
@SpringBootApplication
public class NotificationApplication implements CommandLineRunner {
@Autowired
private EventBus eventBus;
@Autowired
private NotificationConsumer notificationConsumer;
@Override
public void run(String... args) throws Exception {
eventBus.on($("notificationConsumer"), notificationConsumer);
}
public static void main(String[] args) {
SpringApplication.run(NotificationApplication.class, args);
}
}在我們的 run 方法中,我們正在註冊 notificationConsumer 以在通知匹配給定選擇器時觸發。
請注意我們如何使用靜態導入的 $ 屬性來創建 Selector 對象。
5. 測試應用程序
現在,讓我們創建一個測試,以查看我們的NotificationApplication 在實際應用中的效果:
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class NotificationApplicationIntegrationTest {
@LocalServerPort
private int port;
@Test
public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() {
RestTemplate restTemplate = new RestTemplate();
restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class);
}
}如我們所見,一旦請求執行,所有十個任務都會立即提交,而不會產生任何阻塞。並且,提交後,通知事件會並行處理。
Notification 0: notification task submitted successfully
Notification 1: notification task submitted successfully
Notification 2: notification task submitted successfully
Notification 3: notification task submitted successfully
Notification 4: notification task submitted successfully
Notification 5: notification task submitted successfully
Notification 6: notification task submitted successfully
Notification 7: notification task submitted successfully
Notification 8: notification task submitted successfully
Notification 9: notification task submitted successfully
Notification service started for Notification ID: 1
Notification service started for Notification ID: 2
Notification service started for Notification ID: 3
Notification service started for Notification ID: 0
Notification service ended for Notification ID: 1
Notification service ended for Notification ID: 0
Notification service started for Notification ID: 4
Notification service ended for Notification ID: 3
Notification service ended for Notification ID: 2
Notification service started for Notification ID: 6
Notification service started for Notification ID: 5
Notification service started for Notification ID: 7
Notification service ended for Notification ID: 4
Notification service started for Notification ID: 8
Notification service ended for Notification ID: 6
Notification service ended for Notification ID: 5
Notification service started for Notification ID: 9
Notification service ended for Notification ID: 7
Notification service ended for Notification ID: 8
Notification service ended for Notification ID: 9請務必記住,在我們的場景中,無需對這些事件進行任何特定順序的處理。
6. 結論
在本快速教程中,我們創建了一個簡單的基於事件驅動的應用. 我們還看到了如何編寫更具反應性和非阻塞特性代碼。
然而,此場景只是對該主題的一個初步探索,並僅代表一個良好的起點,用於嘗試反應性範式.