知識庫 / Reactive RSS 訂閱

響應式應用中的 Spring AMQP

Reactive,Spring Persistence,Spring Web
HongKong
11
02:00 PM · Dec 06 ,2025

1. 概述

本教程演示如何創建一個簡單的 Spring Boot Reactive 應用,該應用與 RabbitMQ 消息服務器集成,RabbitMQ 是 AMQP 消息標準的一種流行實現。

我們涵蓋了點對點和發佈訂閲兩種場景,並使用分佈式設置來突出這兩種模式之間的差異。

請注意,我們假設您對 AMQP、RabbitMQ 和 Spring Boot 有基本的瞭解,特別是 Exchange、Queue、Topic 等關鍵概念。有關這些概念的更多信息,請參閲以下鏈接:

  • 使用 Spring AMQP 進行消息傳遞
  • RabbitMQ 簡介

2. RabbitMQ 服務器設置

雖然我們可以本地搭建 RabbitMQ,但在實際應用中,我們更傾向於使用帶有高可用性、監控、安全等附加功能的專用安裝。

為了在我們的開發機器上模擬這樣的環境,我們將使用 Docker 創建一個服務器,供我們的應用程序使用。

以下命令將啓動獨立的 RabbitMQ 服務器:

$ docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3

我們未聲明任何持久卷,因此重啓後未讀消息將會丟失。服務將在主機上端口 5672 上提供。

可以使用 docker logs 命令檢查服務器日誌,該命令的輸出應如下所示:

$ docker logs rabbitmq
2018-06-09 13:42:29.718 [info] <0.33.0>
  Application lager started on node rabbit@rabbit
// ... some lines omitted
2018-06-09 13:42:33.491 [info] <0.226.0>
 Starting RabbitMQ 3.7.5 on Erlang 20.3.5
 Copyright (C) 2007-2018 Pivotal Software, Inc.
 Licensed under the MPL.  See http://www.rabbitmq.com/

  ##  ##
  ##  ##      RabbitMQ 3.7.5. Copyright (C) 2007-2018 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See http://www.rabbitmq.com/
  ######  ##
  ##########  Logs: <stdout>

              Starting broker...
2018-06-09 13:42:33.494 [info] <0.226.0>
 node           : rabbit@rabbit
 home dir       : /var/lib/rabbitmq
 config file(s) : /etc/rabbitmq/rabbitmq.conf
 cookie hash    : CY9rzUYh03PK3k6DJie09g==
 log(s)         : <stdout>
 database dir   : /var/lib/rabbitmq/mnesia/rabbit@rabbit

// ... more log lines

由於鏡像中包含了 rabbitmqctl 工具,我們可以使用它來執行鏡像運行期間的各種管理任務。

例如,我們可以使用以下命令獲取服務器狀態信息:

$ docker exec rabbitmq rabbitmqctl status
Status of node rabbit@rabbit ...
[{pid,299},
 {running_applications,
     [{rabbit,"RabbitMQ","3.7.5"},
      {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.7.5"},
// ... other info omitted for brevity

以下是翻譯後的內容:

以下是一些其他有用的命令:

  • list_exchanges:列出所有聲明的 Exchange
  • list_queues:列出所有聲明的 Queue,包括未讀消息的數量
  • list_bindings:列出 Exchange 和 Queue 之間的所有 Binding 定義,也包括路由鍵

3. Spring AMQP 項目設置

一旦我們的 RabbitMQ 服務器啓動並運行,我們就可以創建我們的 Spring 項目。這個示例項目允許任何 REST 客户端向消息服務器發佈和/或接收消息,利用 Spring AMQP 模塊和相應的 Spring Boot 啓動器與該服務器進行通信。

我們需要在 pom.xml 項目文件中添加的主要依賴項如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>3.1.5</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>3.1.5</version> 
</dependency>

spring-boot-starter-amqp 提供了所有與 AMQP 相關的內容,而 spring-boot-starter-webflux 是用於實現我們反應式 REST 服務器的核心依賴。

注意:您可以在 Maven Central 上查看最新版本的 Spring Boot Starter AMQPWebflux 模塊。

4. 場景 1:點對點消息傳遞

在本場景中,我們將使用直接交換(Direct Exchange),它是消息代理(broker)中的邏輯實體,用於接收來自客户端的消息。

直接交換(Direct Exchange)會將所有傳入的消息路由到單個 – 且僅單個 – 隊列,然後由客户端消費。 多個客户端可以訂閲相同的隊列,但只有一個客户端將接收到特定消息。

4.1. 交換和隊列設置

在我們的場景中,我們使用一個 DestinationInfo 對象,該對象封裝了交換名稱和路由鍵。一個以目的地名稱為鍵的映射將用於存儲所有可用的目的地。

以下 @PostConstruct 方法將負責此初始設置:

@Autowired
private AmqpAdmin amqpAdmin;
    
@Autowired
private DestinationsConfig destinationsConfig;

@PostConstruct
public void setupQueueDestinations() {
    destinationsConfig.getQueues()
        .forEach((key, destination) -> {
            Exchange ex = ExchangeBuilder.directExchange(
              destination.getExchange())
              .durable(true)
              .build();
            amqpAdmin.declareExchange(ex);
            Queue q = QueueBuilder.durable(
              destination.getRoutingKey())
              .build();
            amqpAdmin.declareQueue(q);
            Binding b = BindingBuilder.bind(q)
              .to(ex)
              .with(destination.getRoutingKey())
              .noargs();
            amqpAdmin.declareBinding(b);
        });
}

此方法使用由 Spring 創建的 adminAmqp Bean 來聲明 Exchange、Queue 並使用指定的路由鍵將它們綁定在一起。

所有目的地均來自 DestinationsConfig Bean,該 Bean 是我們在示例中使用的 @ConfigurationProperties 類。

此類具有一個屬性,該屬性已使用 DestinationInfo 對象填充,這些對象是從 application.yml 配置文件中讀取的映射構建的。

4.2. 生產者端點

生產者將通過向 /queue/{name} 路徑發送 HTTP POST 請求來發送消息。

這是一個反應式端點,因此我們使用 Mono 以返回一個簡單的確認:

@SpringBootApplication
@EnableConfigurationProperties(DestinationsConfig.class)
@RestController
public class SpringWebfluxAmqpApplication {
 
    // ... other members omitted
 
    @Autowired
    private AmqpTemplate amqpTemplate;

    @PostMapping(value = "/queue/{name}")
    public Mono<ResponseEntity<?>> sendMessageToQueue(
      @PathVariable String name, @RequestBody String payload) {

        DestinationInfo d = destinationsConfig
          .getQueues().get(name);
        if (d == null) {
            return Mono.just(
              ResponseEntity.notFound().build());
        }
    
        return Mono.fromCallable(() -> {
            amqpTemplate.convertAndSend(
              d.getExchange(), 
              d.getRoutingKey(), 
              payload);  
            return ResponseEntity.accepted().build();
        });
    }

我們首先檢查 name 參數是否對應一個有效的目的地,如果是,則使用自動注入的 amqpTemplate 實例來實際發送 payload——一個簡單的 String 消息——到 RabbitMQ。

4.3. <em >MessageListenerContainer</em >> 工廠

為了異步接收消息,Spring AMQP 使用一個 <em >MessageContainerListener</em >> 抽象類,它來協調來自 AMQP 隊列和應用程序提供的監聽器之間的信息流動。

由於我們需要一個該類的具體實現才能附加我們的消息監聽器,因此我們定義了一個工廠,將控制器代碼與實際實現隔離。

在我們的例子中,該工廠方法每次調用其 <em >createMessageListenerContainer</em > 方法時,都會返回一個新的 <em >SimpleMessageContainerListener</em >>

@Component
public class MessageListenerContainerFactory {

    @Autowired
    private ConnectionFactory connectionFactory;

    public MessageListenerContainerFactory() {}

    public MessageListenerContainer createMessageListenerContainer(String queueName) {
        SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory);
        mlc.addQueueNames(queueName);
        return mlc;
    }
}

4.4. 消費者端點

消費者將使用與生產者相同的端點地址 (/queue/{name}) 獲取消息。

該端點返回一個 Flux 事件流,其中每個事件對應一個接收到的消息:

@Autowired
private MessageListenerContainerFactory messageListenerContainerFactory;

@GetMapping(
  value = "/queue/{name}",
  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {

    DestinationInfo d = destinationsConfig
      .getQueues()
      .get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound()
          .build());
    }

    MessageListenerContainer mlc = messageListenerContainerFactory
      .createMessageListenerContainer(d.getRoutingKey());

    Flux<String> f = Flux.<String> create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> {
            mlc.start();
        });
        emitter.onDispose(() -> {
            mlc.stop();
        });
      });

    return Flux.interval(Duration.ofSeconds(5))
      .map(v -> "No news is good news")
      .mergeWith(f);
}

在目標名稱的初步檢查後,消費者端點使用 MessageListenerContainer ,通過 MessageListenerContainerFactory 及其從註冊表中恢復的隊列名稱創建。

一旦我們獲得了 MessageListenerContainer,我們使用其 create() 構造方法之一創建 Flux,該方法接受一個 FluxSink 參數。

在我們的特定情況下,我們使用一個接受 lambda 表達式,該 lambda 表達式接受 FluxSink 參數的方法,從而將 Spring AMQP 的基於監聽器的異步 API 橋接到我們的反應式應用程序中。

我們還附加了兩個額外的 lambda 表達式到發射器的 onRequest 和 onDispose 回調函數中,以便我們的 MessageListenerContainer 可以根據 Flux 的生命週期分配/釋放其內部資源。

最後,我們將生成的 Flux 與使用 interval() 創建的另一個 Flux 合併,該方法每五秒創建一個新的事件。 這些虛假消息在我們的案例中起着重要作用:如果沒有它們,我們只會檢測到客户端在接收和發送消息時斷開連接,這取決於您特定的用例,可能需要很長時間。

4.5. 測試

有了我們消費者和發佈者端點都已配置完成,現在我們可以使用我們的示例應用程序進行一些測試。

我們需要在 application.yml 中定義 RabbitMQ 服務器連接詳細信息以及至少一個目的地,它應該如下所示:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    
destinations:
  queues:
    NYSE:
      exchange: nyse
      routing-key: NYSE

spring.rabbitmq.* 屬性定義了用於連接到我們運行在本地 Docker 容器中的 RabbitMQ 服務器所需的基本屬性。請注意,上述 IP 地址僅為示例,在特定設置中可能不同。

隊列定義使用 destinations.queues.<name>.*,其中 <name> 用作目標名稱。這裏聲明瞭一個名為“NYSE”的目標,該目標將向 RabbitMQ 中的“nyse”交換器發送消息,並使用“NYSE”路由鍵。

通過命令行或從我們的 IDE 啓動服務器後,我們就可以開始發送和接收消息。我們將使用 curl 工具,這是一個在 Windows、Mac & Linux 操作系統上常用的工具。

以下列表顯示瞭如何將消息發送到我們的目標以及服務器的預期響應:

$ curl -v -d "Test message" http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 12
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 12 out of 12 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact

執行完該命令後,我們可以驗證消息是否已通過 RabbitMQ 接收並準備好供消費,通過以下命令進行驗證:

$ docker exec rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    1

現在我們可以使用 curl 命令讀取消息:

$ curl -v http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:Test message

data:No news is good news...

... same message repeating every 5 secs

如我們所見,首先我們獲取之前存儲的消息,然後開始接收我們的假消息,每 5 秒發送一次。

如果再次運行列出隊列的命令,現在可以看到沒有消息存儲在隊列中:

$ docker exec rabbitmq rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    0

5. 場景 2:發佈-訂閲 (Publish-Subscribe)

另一個常見的消息應用程序場景是發佈-訂閲模式,其中一條消息必須發送給多個消費者。

RabbitMQ 提供了兩種支持這種類型的交換 (exchange) 模式:扇出 (Fan-out) 和主題 (Topic)。

這兩種交換模式的主要區別在於,後一種模式允許我們根據註冊時提供的路由鍵模式 (例如 “alarm.mailserver.*”) 過濾接收的消息,而前者則簡單地將傳入的消息複製到所有綁定的隊列。

RabbitMQ 還支持 Header 交換,這允許進行更復雜的消息過濾,但其使用超出了本文的範圍。

5.1. 目的地配置

在啓動時,我們使用另一個 <iPostConstruct> 方法定義 Pub/Sub 目的地,正如在點對點場景中所做的那樣。

唯一的區別在於,我們只創建 <emExchanges>,而沒有 <emQueues> – 這些隊列將在以後按需創建並綁定到 <emExchange> 上,因為我們希望為每個客户端創建一個獨有的 <emQueue>。

@PostConstruct
public void setupTopicDestinations(
    destinationsConfig.getTopics()
      .forEach((key, destination) -> {
          Exchange ex = ExchangeBuilder
            .topicExchange(destination.getExchange())
            .durable(true)
            .build();
            amqpAdmin.declareExchange(ex);
      });
}

5.2. publisher 端點

客户端將使用位於 /topic/{name} 路徑上的 publisher 端點,以發佈消息,這些消息將被髮送到所有已連接的客户端。

如前所述,我們使用一個 @PostMapping 註解,它返回一個 Mono 對象,該對象包含消息發送後的狀態。

@PostMapping(value = "/topic/{name}")
public Mono<ResponseEntity<?>> sendMessageToTopic(
  @PathVariable String name, @RequestBody String payload) {

    DestinationInfo d = destinationsConfig
      .getTopics()
      .get(name);
    
    if (d == null) {
        return Mono.just(ResponseEntity.notFound().build());
    }      
    
   return Mono.fromCallable(() -> {
       amqpTemplate.convertAndSend(
         d.getExchange(), d.getRoutingKey(),payload);   
            return ResponseEntity.accepted().build();
        });
    }

5.3. 訂閲者端點

我們的訂閲者端點位於 /topic/{name},並向連接的客户端產生一個 Flux 消息流。

這些消息包括接收到的消息和每 5 秒生成的一個虛擬消息:

@GetMapping(
  value = "/topic/{name}",
  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
    DestinationInfo d = destinationsConfig.getTopics()
        .get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound()
            .build());
    }
    Queue topicQueue = createTopicQueue(d);
    String qname = topicQueue.getName();
    MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(qname);
    Flux<String> f = Flux.<String> create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> {
            mlc.start();
        });
        emitter.onDispose(() -> {
            amqpAdmin.deleteQueue(qname);
            mlc.stop();
        });            
      });
    
    return Flux.interval(Duration.ofSeconds(5))
        .map(v -> "No news is good news")
        .mergeWith(f);
}

這段代碼與之前案例基本相同,僅有以下差異:首先,我們為每個新訂閲者創建一個新的 Queue

我們通過調用 createTopicQueue() 方法來實現,該方法使用來自 DestinationInfo 實例的信息,創建一個專有、不可持久化的隊列,然後使用配置的路由鍵將其綁定到 Exchange

private Queue createTopicQueue(DestinationInfo destination) {

    Exchange ex = ExchangeBuilder
      .topicExchange(destination.getExchange())
      .durable(true)
      .build();
    amqpAdmin.declareExchange(ex);
    Queue q = QueueBuilder
      .nonDurable()
      .build();     
    amqpAdmin.declareQueue(q);
    Binding b = BindingBuilder.bind(q)
      .to(ex)
      .with(destination.getRoutingKey())
      .noargs();        
    amqpAdmin.declareBinding(b);
    return q;
}

需要注意的是,儘管我們再次聲明瞭 Exchange,RabbitMQ 不會創建新的一個,因為我們在啓動時就已經聲明瞭它。

第二個差異在於我們傳遞給 onDispose() 方法的 lambda 函數,該函數會在訂閲者斷開連接時也會刪除 Queue

5.3. 測試

為了測試 Pub/Sub 場景,首先需要在 application.yml 中定義一個主題目的地,如下所示:

destinations:
## ... queue destinations omitted      
  topics:
    weather:
      exchange: alerts
      routing-key: WEATHER

這裏,我們定義了一個主題端點,該端點將在 /topic/weather 位置可用。該端點將用於將消息發佈到 RabbitMQ 的“alerts”交換,並使用“WEATHER”路由鍵。

啓動服務器後,我們可以使用 rabbitmqctl 命令來驗證交換是否已創建。

$ docker exec docker_rabbitmq_1 rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.topic       topic
amq.fanout      fanout
amq.match       headers
amq.headers     headers
        direct
amq.rabbitmq.trace      topic
amq.direct      direct
alerts  topic

現在,如果我們發出 list_bindings 命令,我們可以看到沒有與“alerts”交換相關的隊列:

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        NYSE    queue   NYSE    []
nyse    exchange        NYSE    queue   NYSE    []

讓我們啓動幾個訂閲者,它們將訂閲我們的目標,通過在每個命令殼中打開兩個命令殼併發出以下命令來實現:

$ curl -v http://localhost:8080/topic/weather
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:No news is good news...

# ... same message repeating indefinitely

最後,我們再次使用 curl 向我們的訂閲者發送警報:

$ curl -v -H "Content-Type: application/json" -d "Hurricane approaching!" http://localhost:8080/topic/weather
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 22
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 22 out of 22 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact

一旦我們發送消息,我們幾乎可以立即在每個訂閲者的設備上看到“颶風逼近!”的消息。

如果現在檢查可用的綁定,我們可以看到每個訂閲者都有一個隊列:

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        IBOV    queue   IBOV    []
        exchange        NYSE    queue   NYSE    []
        exchange        spring.gen-i0m0pbyKQMqpz2_KFZCd0g       
  queue   spring.gen-i0m0pbyKQMqpz2_KFZCd0g       []
        exchange        spring.gen-wCHALTsIS1q11PQbARJ7eQ       
  queue   spring.gen-wCHALTsIS1q11PQbARJ7eQ       []
alerts  exchange        spring.gen-i0m0pbyKQMqpz2_KFZCd0g     
  queue   WEATHER []
alerts  exchange        spring.gen-wCHALTsIS1q11PQbARJ7eQ     
  queue   WEATHER []
ibov    exchange        IBOV    queue   IBOV    []
nyse    exchange        NYSE    queue   NYSE    []
quotes  exchange        NYSE    queue   NYSE    []

當我們在訂閲者的殼層上按下 Ctrl-C 時,我們的網關最終會檢測到客户端已斷開連接,並移除這些綁定。

6. 結論

在本文中,我們演示瞭如何使用 spring-amqp 模塊創建一個簡單的響應式應用程序,並與之交互以使用 RabbitMQ 服務器。

僅需幾行代碼,我們就能創建一個功能完善的 HTTP 到 AMQP 網關,它支持點對點和發佈訂閲集成模式,並且可以輕鬆地通過添加標準 Spring 功能來擴展以添加額外的功能,例如安全功能。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.