知識庫 / Spring RSS 訂閱

Spring AMQP 中 RabbitMQ 消息分發

Spring
HongKong
4
02:36 PM · Dec 06 ,2025

1. 引言

在本教程中,我們將探討 fanout(廣播)和主題交換的概念,以及使用 Spring AMQPRabbitMQ 的相關內容。

在高層面上,fanout 交換 將相同的消息廣播到所有綁定的隊列,而 topic 交換 則使用路由鍵來將消息傳遞到特定的綁定的隊列或隊列。

在開始本教程之前,建議您先閲讀《Messaging With Spring AMQP》。

2. 設置一個 Fanout 交換器

讓我們設置一個 Fanout 交換器,並將其與兩個隊列綁定。當我們將消息發送到此交換器時,這兩個隊列都會接收到該消息。我們的 Fanout 交換器會忽略消息中包含的任何路由鍵。

Spring AMQP 允許我們通過 Declarables 對象聚合隊列、交換器和綁定的所有聲明:

@Bean
public Declarables fanoutBindings() {
    Queue fanoutQueue1 = new Queue("fanout.queue1", false);
    Queue fanoutQueue2 = new Queue("fanout.queue2", false);
    FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");

    return new Declarables(
      fanoutQueue1,
      fanoutQueue2,
      fanoutExchange,
      BindingBuilder.bind(fanoutQueue1).to(fanoutExchange),
      BindingBuilder.bind(fanoutQueue2).to(fanoutExchange));
}

3. 設置主題交換

現在,我們還將設置一個主題交換,其中包含兩個隊列,每個隊列具有不同的綁定模式:

@Bean
public Declarables topicBindings() {
    Queue topicQueue1 = new Queue(topicQueue1Name, false);
    Queue topicQueue2 = new Queue(topicQueue2Name, false);

    TopicExchange topicExchange = new TopicExchange(topicExchangeName);

    return new Declarables(
      topicQueue1,
      topicQueue2,
      topicExchange,
      BindingBuilder
        .bind(topicQueue1)
        .to(topicExchange).with("*.important.*"),
      BindingBuilder
        .bind(topicQueue2)
        .to(topicExchange).with("#.error"));
}

主題交換允許我們將隊列綁定到不同的模式下,實現靈活的綁定。 它可以將多個隊列綁定到相同的模式,甚至將相同的模式綁定到多個隊列。

當消息的路由鍵與模式匹配時,消息將被放置在隊列中。 如果隊列具有多個匹配消息路由鍵的綁定,則僅在一個隊列上放置一個消息副本。

我們的綁定模式可以使用星號(“*”)來匹配特定位置的單詞,或使用井號(“#”)來匹配零個或多個單詞。

因此,我們的 topicQueue1 將接收到具有三個單詞模式,中間單詞為“important”的路由鍵的消息,例如:user.important.errorblog.important.notification

而我們的 topicQueue2 將接收到以“error”結尾的路由鍵的消息,匹配的示例包括 erroruser.important.errorblog.post.save.error

4. 設置生產者

我們將使用 <em >convertAndSend</em> 方法,通過 <em >RabbitTemplate</em> 發送我們的示例消息:

    String message = " payload is broadcast";
    return args -> {
        rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message);
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, 
            "topic important warn" + message);
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, 
            "topic important error" + message);
    };

RabbitTemplate 提供多種重載的 convertAndSend() 方法,以適應不同的交換類型。

當向 fanout 交換器發送消息時,路由鍵將被忽略,消息將傳遞到所有綁定的隊列。

當向 topic 交換器發送消息時,需要提供路由鍵。根據此路由鍵,消息將被傳遞到特定的隊列。

5. 配置消費者

最後,我們將設置四個消費者,每個消費者對應一個隊列,用於接收生產者產生的消息:

    @RabbitListener(queues = {FANOUT_QUEUE_1_NAME})
    public void receiveMessageFromFanout1(String message) {
        System.out.println("Received fanout 1 message: " + message);
    }

    @RabbitListener(queues = {FANOUT_QUEUE_2_NAME})
    public void receiveMessageFromFanout2(String message) {
        System.out.println("Received fanout 2 message: " + message);
    }

    @RabbitListener(queues = {TOPIC_QUEUE_1_NAME})
    public void receiveMessageFromTopic1(String message) {
        System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message);
    }

    @RabbitListener(queues = {TOPIC_QUEUE_2_NAME})
    public void receiveMessageFromTopic2(String message) {
        System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message);
    }

我們使用@RabbitListener註解來配置消費者

此處傳遞的唯一參數是隊列的名稱。消費者在此處不瞭解交換器或路由鍵。

6. 運行示例

我們的示例項目是一個 Spring Boot 應用程序,因此它將與 RabbitMQ 的連接一起初始化應用程序,並設置所有隊列、交換器和綁定。

默認情況下,我們的應用程序期望 RabbitMQ 實例在 localhost 上的端口 5672 運行。我們可以修改這些默認值和其他默認值,具體修改請參考 application.yaml

我們的項目通過 HTTP 端點 – /broadcast – 暴露,該端點接受請求主體中包含消息的 POST 請求。

當我們向該 URI 發送包含“Test”的請求,我們應該在輸出中看到類似以下內容:

Received fanout 1 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important warn payload is broadcast
Received topic 2 (#.error) message: topic important error payload is broadcast
Received fanout 2 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important error payload is broadcast

這些消息的顯示順序並非保證。

7. 結論

在本快速教程中,我們介紹了使用 Spring AMQP 和 RabbitMQ 實現 fanout 和主題交換的內容。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.