知識庫 / Spring / Spring Boot RSS 訂閱

Apache RocketMQ 與 Spring Boot 集成開發指南

Data,Spring Boot
HongKong
4
01:05 PM · Dec 06 ,2025

1. 簡介

在本教程中,我們將使用 Spring Boot 和 Apache RocketMQ 創建一個消息生產者和消費者。Apache RocketMQ 是一個開源的分佈式消息和流數據平台。

2. 依賴項

對於 Maven 項目,需要添加 RocketMQ Spring Boot Starter 依賴項:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

3. 生產消息

為了我們的示例,我們將創建一個基本的消息生產者,該生產者會在用户從購物車中添加或刪除項目時發送事件。

首先,讓我們在 application.properties 中設置我們的服務器位置和組名:

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=cart-producer-group

請注意,如果我們的 DNS 服務器數量超過一個,我們可以這樣列出它們:host:port;host:port

為了簡化操作,我們將創建一個 CommandLineRunner 應用程序,並在應用程序啓動時生成一些事件:

@SpringBootApplication
public class CartEventProducer implements CommandLineRunner {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(CartEventProducer.class, args);
    }

    public void run(String... args) throws Exception {
        rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1));
        rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2));
        rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1));
    }
}

CartItemEvent 僅包含兩個屬性:項目 ID 和數量。

class CartItemEvent {
    private String itemId;
    private int quantity;

    // constructor, getters and setters
}

在上述示例中,我們使用 convertAndSend() 方法,該方法是 AbstractMessageSendingTemplate 抽象類中定義的通用方法,用於發送我們的購物車事件。該方法接受兩個參數:一個目的地,在本例中是一個主題名稱,以及一個消息負載。

4. 消息消費者

消費 RocketMQ 消息就像創建 Spring 組件,並使用 @RocketMQMessageListener 標註,然後實現 RocketMQListener 接口:

@SpringBootApplication
public class CartEventConsumer {

    public static void main(String[] args) {
        SpringApplication.run(CartEventConsumer.class, args);
    }

    @Service
    @RocketMQMessageListener(
      topic = "cart-item-add-topic",
      consumerGroup = "cart-consumer_cart-item-add-topic"
    )
    public class CardItemAddConsumer implements RocketMQListener<CartItemEvent> {
        public void onMessage(CartItemEvent addItemEvent) {
            log.info("Adding item: {}", addItemEvent);
            // additional logic
        }
    }

    @Service
    @RocketMQMessageListener(
      topic = "cart-item-removed-topic",
      consumerGroup = "cart-consumer_cart-item-removed-topic"
    )
    public class CardItemRemoveConsumer implements RocketMQListener<CartItemEvent> {
        public void onMessage(CartItemEvent removeItemEvent) {
            log.info("Removing item: {}", removeItemEvent);
            // additional logic
        }
    }
}

我們需要為每個監聽的主題創建單獨的組件。在這些監聽器中,我們通過 @RocketMQMessageListener@ 標註來定義主題和消費者組的名稱。

5. 同步和異步傳輸

在之前的示例中,我們使用了 convertAndSend 方法發送我們的消息。雖然還有其他選項可以使用。

我們可以例如調用 syncSend,它與 convertAndSend 不同,因為它會返回 SendResult 對象。

它可以用於例如驗證我們的消息是否已成功發送或獲取其 ID:

public void run(String... args) throws Exception { 
    SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic", 
      new CartItemEvent("bike", 1)); 
    SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic", 
      new CartItemEvent("computer", 2)); 
    SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic", 
      new CartItemEvent("bike", 1)); 
}

類似於 此方法僅在發送過程完成後返回。

在需要高可靠性的情況下,如重要通知消息或短信通知時,我們應該使用同步傳輸。

另一方面,我們可能希望異步發送消息並收到發送完成的通知。

我們可以使用 它作為參數接受一個 並立即返回:

rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        log.error("Successfully sent cart item");
    }

    @Override
    public void onException(Throwable throwable) {
        log.error("Exception during cart item sending", throwable);
    }
});

我們會在需要高吞吐量的情況下使用異步傳輸。

最後,對於吞吐量要求極高的場景,我們可以使用 sendOneWay 代替 asyncSendsendOneWayasyncSend 的區別在於,它不保證消息會被髮送。

單向傳輸也可用於普通可靠性場景,例如收集日誌。

6. 在事務中發送消息

RocketMQ 提供了在事務中發送消息的能力。我們可以通過使用 sendInTransaction() 方法來實現:

MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build();
rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);

我們還需要實現一個 RocketMQLocalTransactionListener 接口:

@RocketMQTransactionListener(txProducerGroup="test-transaction")
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
      @Override
      public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
          // ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN
          return RocketMQLocalTransactionState.UNKNOWN;
      }

      @Override
      public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
          // ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN
          return RocketMQLocalTransactionState.COMMIT;
      }
}

sendMessageInTransaction() 中,第一個參數是事務名稱。它必須與 @RocketMQTransactionListener 的成員變量 txProducerGroup 相同。

7. 消息生產者配置

我們還可以配置消息生產者本身的一些方面:

  • rocketmq.producer.send-message-timeout: 消息發送超時時間,單位毫秒 – 默認值為 3000
  • rocketmq.producer.compress-message-body-threshold: 超過該閾值時,RocketMQ 會壓縮消息 – 默認值為 1024。
  • rocketmq.producer.max-message-size: 消息最大大小,單位字節 – 默認值為 4096。
  • rocketmq.producer.retry-times-when-send-async-failed: 在異步模式下,發送失敗後內部重試的最大次數 – 默認值為 2。
  • rocketmq.producer.retry-next-server: 在發送失敗時內部重試是否使用其他 Broker – 默認值為 false
  • rocketmq.producer.retry-times-when-send-failed: 在異步模式下,發送失敗後內部重試的最大次數 – 默認值為 2。

8. 結論

在本文中,我們學習瞭如何使用 Apache RocketMQ 和 Spring Boot 發送和消費消息。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.