1. 簡介
在本教程中,我們將使用 Spring Boot 和 Apache RocketMQ 創建一個消息生產者和消費者。Apache RocketMQ 是一個開源的分佈式消息和流數據平台。
2. 依賴項
對於 Maven 項目,需要添加 RocketMQ Spring Boot Starter 依賴項:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>3. 生產消息
為了我們的示例,我們將創建一個基本的消息生產者,該生產者會在用户從購物車中添加或刪除項目時發送事件。
首先,讓我們在 application.properties 中設置我們的服務器位置和組名:
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=cart-producer-group請注意,如果我們的 DNS 服務器數量超過一個,我們可以這樣列出它們:host:port;host:port。
為了簡化操作,我們將創建一個 CommandLineRunner 應用程序,並在應用程序啓動時生成一些事件:
@SpringBootApplication
public class CartEventProducer implements CommandLineRunner {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(CartEventProducer.class, args);
}
public void run(String... args) throws Exception {
rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1));
rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2));
rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1));
}
}CartItemEvent 僅包含兩個屬性:項目 ID 和數量。
class CartItemEvent {
private String itemId;
private int quantity;
// constructor, getters and setters
}在上述示例中,我們使用 convertAndSend() 方法,該方法是 AbstractMessageSendingTemplate 抽象類中定義的通用方法,用於發送我們的購物車事件。該方法接受兩個參數:一個目的地,在本例中是一個主題名稱,以及一個消息負載。
4. 消息消費者
消費 RocketMQ 消息就像創建 Spring 組件,並使用 @RocketMQMessageListener 標註,然後實現 RocketMQListener 接口:
@SpringBootApplication
public class CartEventConsumer {
public static void main(String[] args) {
SpringApplication.run(CartEventConsumer.class, args);
}
@Service
@RocketMQMessageListener(
topic = "cart-item-add-topic",
consumerGroup = "cart-consumer_cart-item-add-topic"
)
public class CardItemAddConsumer implements RocketMQListener<CartItemEvent> {
public void onMessage(CartItemEvent addItemEvent) {
log.info("Adding item: {}", addItemEvent);
// additional logic
}
}
@Service
@RocketMQMessageListener(
topic = "cart-item-removed-topic",
consumerGroup = "cart-consumer_cart-item-removed-topic"
)
public class CardItemRemoveConsumer implements RocketMQListener<CartItemEvent> {
public void onMessage(CartItemEvent removeItemEvent) {
log.info("Removing item: {}", removeItemEvent);
// additional logic
}
}
}我們需要為每個監聽的主題創建單獨的組件。在這些監聽器中,我們通過 @RocketMQMessageListener@ 標註來定義主題和消費者組的名稱。
5. 同步和異步傳輸
在之前的示例中,我們使用了 convertAndSend 方法發送我們的消息。雖然還有其他選項可以使用。
我們可以例如調用 syncSend,它與 convertAndSend 不同,因為它會返回 SendResult 對象。
它可以用於例如驗證我們的消息是否已成功發送或獲取其 ID:
public void run(String... args) throws Exception {
SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic",
new CartItemEvent("bike", 1));
SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic",
new CartItemEvent("computer", 2));
SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic",
new CartItemEvent("bike", 1));
}類似於 ,此方法僅在發送過程完成後返回。
在需要高可靠性的情況下,如重要通知消息或短信通知時,我們應該使用同步傳輸。
另一方面,我們可能希望異步發送消息並收到發送完成的通知。
我們可以使用 ,它作為參數接受一個 ,並立即返回:
rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.error("Successfully sent cart item");
}
@Override
public void onException(Throwable throwable) {
log.error("Exception during cart item sending", throwable);
}
});我們會在需要高吞吐量的情況下使用異步傳輸。
最後,對於吞吐量要求極高的場景,我們可以使用 sendOneWay 代替 asyncSend。sendOneWay 與 asyncSend 的區別在於,它不保證消息會被髮送。
單向傳輸也可用於普通可靠性場景,例如收集日誌。
6. 在事務中發送消息
RocketMQ 提供了在事務中發送消息的能力。我們可以通過使用 sendInTransaction() 方法來實現:
MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build();
rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);我們還需要實現一個 RocketMQLocalTransactionListener 接口:
@RocketMQTransactionListener(txProducerGroup="test-transaction")
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN
return RocketMQLocalTransactionState.COMMIT;
}
}在 sendMessageInTransaction() 中,第一個參數是事務名稱。它必須與 @RocketMQTransactionListener 的成員變量 txProducerGroup 相同。
7. 消息生產者配置
我們還可以配置消息生產者本身的一些方面:
- rocketmq.producer.send-message-timeout: 消息發送超時時間,單位毫秒 – 默認值為 3000
- rocketmq.producer.compress-message-body-threshold: 超過該閾值時,RocketMQ 會壓縮消息 – 默認值為 1024。
- rocketmq.producer.max-message-size: 消息最大大小,單位字節 – 默認值為 4096。
- rocketmq.producer.retry-times-when-send-async-failed: 在異步模式下,發送失敗後內部重試的最大次數 – 默認值為 2。
- rocketmq.producer.retry-next-server: 在發送失敗時內部重試是否使用其他 Broker – 默認值為 false。
- rocketmq.producer.retry-times-when-send-failed: 在異步模式下,發送失敗後內部重試的最大次數 – 默認值為 2。
8. 結論
在本文中,我們學習瞭如何使用 Apache RocketMQ 和 Spring Boot 發送和消費消息。