1. 引言
在本教程中,我們將探討 fanout(廣播)和主題交換的概念,以及使用 Spring AMQP 和 RabbitMQ 的相關內容。
在高層面上,fanout 交換 將相同的消息廣播到所有綁定的隊列,而 topic 交換 則使用路由鍵來將消息傳遞到特定的綁定的隊列或隊列。
在開始本教程之前,建議您先閲讀《Messaging With Spring AMQP》。
2. 設置一個 Fanout 交換器
讓我們設置一個 Fanout 交換器,並將其與兩個隊列綁定。當我們將消息發送到此交換器時,這兩個隊列都會接收到該消息。我們的 Fanout 交換器會忽略消息中包含的任何路由鍵。
Spring AMQP 允許我們通過 Declarables 對象聚合隊列、交換器和綁定的所有聲明:
@Bean
public Declarables fanoutBindings() {
Queue fanoutQueue1 = new Queue("fanout.queue1", false);
Queue fanoutQueue2 = new Queue("fanout.queue2", false);
FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");
return new Declarables(
fanoutQueue1,
fanoutQueue2,
fanoutExchange,
BindingBuilder.bind(fanoutQueue1).to(fanoutExchange),
BindingBuilder.bind(fanoutQueue2).to(fanoutExchange));
}3. 設置主題交換
現在,我們還將設置一個主題交換,其中包含兩個隊列,每個隊列具有不同的綁定模式:
@Bean
public Declarables topicBindings() {
Queue topicQueue1 = new Queue(topicQueue1Name, false);
Queue topicQueue2 = new Queue(topicQueue2Name, false);
TopicExchange topicExchange = new TopicExchange(topicExchangeName);
return new Declarables(
topicQueue1,
topicQueue2,
topicExchange,
BindingBuilder
.bind(topicQueue1)
.to(topicExchange).with("*.important.*"),
BindingBuilder
.bind(topicQueue2)
.to(topicExchange).with("#.error"));
}主題交換允許我們將隊列綁定到不同的模式下,實現靈活的綁定。 它可以將多個隊列綁定到相同的模式,甚至將相同的模式綁定到多個隊列。
當消息的路由鍵與模式匹配時,消息將被放置在隊列中。 如果隊列具有多個匹配消息路由鍵的綁定,則僅在一個隊列上放置一個消息副本。
我們的綁定模式可以使用星號(“*”)來匹配特定位置的單詞,或使用井號(“#”)來匹配零個或多個單詞。
因此,我們的 topicQueue1 將接收到具有三個單詞模式,中間單詞為“important”的路由鍵的消息,例如:user.important.error 或 blog.important.notification。
而我們的 topicQueue2 將接收到以“error”結尾的路由鍵的消息,匹配的示例包括 error、user.important.error 或 blog.post.save.error。
4. 設置生產者
我們將使用 <em >convertAndSend</em> 方法,通過 <em >RabbitTemplate</em> 發送我們的示例消息:
String message = " payload is broadcast";
return args -> {
rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message);
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN,
"topic important warn" + message);
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR,
"topic important error" + message);
};RabbitTemplate 提供多種重載的 convertAndSend() 方法,以適應不同的交換類型。
當向 fanout 交換器發送消息時,路由鍵將被忽略,消息將傳遞到所有綁定的隊列。
當向 topic 交換器發送消息時,需要提供路由鍵。根據此路由鍵,消息將被傳遞到特定的隊列。
5. 配置消費者
最後,我們將設置四個消費者,每個消費者對應一個隊列,用於接收生產者產生的消息:
@RabbitListener(queues = {FANOUT_QUEUE_1_NAME})
public void receiveMessageFromFanout1(String message) {
System.out.println("Received fanout 1 message: " + message);
}
@RabbitListener(queues = {FANOUT_QUEUE_2_NAME})
public void receiveMessageFromFanout2(String message) {
System.out.println("Received fanout 2 message: " + message);
}
@RabbitListener(queues = {TOPIC_QUEUE_1_NAME})
public void receiveMessageFromTopic1(String message) {
System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message);
}
@RabbitListener(queues = {TOPIC_QUEUE_2_NAME})
public void receiveMessageFromTopic2(String message) {
System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message);
}我們使用@RabbitListener註解來配置消費者。
此處傳遞的唯一參數是隊列的名稱。消費者在此處不瞭解交換器或路由鍵。
6. 運行示例
我們的示例項目是一個 Spring Boot 應用程序,因此它將與 RabbitMQ 的連接一起初始化應用程序,並設置所有隊列、交換器和綁定。
默認情況下,我們的應用程序期望 RabbitMQ 實例在 localhost 上的端口 5672 運行。我們可以修改這些默認值和其他默認值,具體修改請參考 application.yaml。
我們的項目通過 HTTP 端點 – /broadcast – 暴露,該端點接受請求主體中包含消息的 POST 請求。
當我們向該 URI 發送包含“Test”的請求,我們應該在輸出中看到類似以下內容:
Received fanout 1 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important warn payload is broadcast
Received topic 2 (#.error) message: topic important error payload is broadcast
Received fanout 2 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important error payload is broadcast這些消息的顯示順序並非保證。
7. 結論
在本快速教程中,我們介紹了使用 Spring AMQP 和 RabbitMQ 實現 fanout 和主題交換的內容。