有時候我們的項目中會用到即時通訊功能,比如電商系統中的客服聊天、支付成功後的異步回調通知等。最近發現RabbitMQ可以很方便的實現即時通訊功能,如果你沒有特殊的業務需求,甚至可以不寫後端代碼,今天給大家介紹下如何使用RabbitMQ來實現即時通訊!
MQTT協議
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發佈/訂閲(publish/subscribe)模式的輕量級通訊協議,該協議構建於TCP/IP協議上。MQTT最大優點在於,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。
相關概念
- Publisher(發佈者):消息的發出者,負責發送消息。
- Subscriber(訂閲者):消息的訂閲者,負責接收並處理消息。
- Broker(代理):消息代理,位於消息發佈者和訂閲者之間,各類支持MQTT協議的消息中間件都可以充當。
- Topic(主題):可以理解為消息隊列中的路由,訂閲者訂閲了主題之後,就可以收到發送到該主題的消息。
- Payload(負載);可以理解為發送消息的內容。
-
QoS(消息質量):全稱Quality of Service,即消息的發送質量,主要有
QoS 0、QoS 1、QoS 2三個等級,下面分別介紹下:- QoS 0(Almost Once):至多一次,只發送一次,會發生消息丟失或重複;
- QoS 1(Atleast Once):至少一次,確保消息到達,但消息重複可能會發生;
- QoS 2(Exactly Once):只有一次,確保消息只到達一次。
RabbitMQ啓用MQTT功能
RabbitMQ默認不會啓用啓用MQTT功能,需要手動開啓。
- 我們需要先安裝並啓動RabbitMQ,這裏以Docker環境安裝為例,運行命令如下;
docker run -p 5672:5672 -p 15672:15672 -p 1883:1883 -p 15675:15675 --name rabbitmq-mqtt \
-v /mydata/rabbitmq-mqtt/data:/var/lib/rabbitmq \
-d rabbitmq:3.9.11-management
- 接下來就是啓用RabbitMQ的MQTT WEB插件了,進入rabbitmq容器後,使用如下命令開啓;
# 先進入rabbitmq容器
docker exec -it rabbitmq-mqtt /bin/bash
# 再啓用mqtt web插件,會同時啓用rabbitmq_mqtt插件
rabbitmq-plugins enable rabbitmq_web_mqtt
- 開啓成功後,查看管理控制枱,我們可以發現MQTT的web服務運行在
15675端口上了,訪問地址:http://192.168.3.101:15672
這或許是一個對你有用的開源項目,mall項目是一套基於
SpringBoot3+ Vue 的電商系統(Github標星60K),後端支持多模塊和最新微服務架構,採用Docker和K8S部署。包括前台商城項目和後台管理系統,能支持完整的訂單流程!涵蓋商品、訂單、購物車、權限、優惠券、會員、支付等功能!
- Boot項目:https://github.com/macrozheng/mall
- Cloud項目:https://github.com/macrozheng/mall-swarm
- 教程網站:https://www.macrozheng.com
項目演示:
MQTT客户端
我們可以使用MQTT客户端來測試MQTT的即時通訊功能,這裏使用的是MQTTX這個客户端工具。
- 這裏使用Docker環境為例,運行命令如下;
docker run -p 80:80 --name mqttx-web -d emqx/mqttx-web
- 運行成功後可以訪問MQTTX的控制枱,訪問地址:http://192.168.3.101
- 點擊左側的
加號按鈕來創建一個MQTT連接,配置好連接信息,注意MQTT版本選擇3.1.1;
- 再添加一個訂閲,訂閲
testTopicA這個主題,我們會向這個主題發送消息;
- 發佈者向主題中發佈消息,訂閲者可以實時接收到。
前端直接實現即時通訊
既然MQTTX客户端可以直接通過RabbitMQ實現即時通訊,那我們直接使用前端技術是否也能實現即時通訊?答案是肯定的!下面我們將通過html+javascript實現一個簡單的聊天功能,真正不寫一行後端代碼實現即時通訊!
- WEB端與MQTT服務進行通訊需要使用一個叫
MQTT.js的庫,項目地址:https://github.com/mqttjs/MQTT.js
- 實現的功能非常簡單,一個單聊功能,需要注意的是配置好MQTT服務的訪問地址為:ws://192.168.3.101:15675/ws
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<div>
<label>目標Topic:<input id="targetTopicInput" type="text"></label><br>
<label>發送消息:<input id="messageInput" type="text"></label><br>
<button onclick="sendMessage()">發送</button>
<button onclick="clearMessage()">清空</button>
<div id="messageDiv"></div>
</div>
</body>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
//RabbitMQ的web-mqtt連接地址
const url = 'ws://192.168.3.101:15675/ws';
//獲取訂閲的topic
const topic = getQueryString("topic");
//連接到消息隊列
let client = mqtt.connect(url);
client.on('connect', function () {
//連接成功後訂閲topic
client.subscribe(topic, function (err) {
if (!err) {
showMessage("訂閲topic:" + topic + "成功!");
}
});
});
//獲取訂閲topic中的消息
client.on('message', function (topic, message) {
showMessage("收到消息:" + message.toString());
});
//發送消息
function sendMessage() {
let targetTopic = document.getElementById("targetTopicInput").value;
let message = document.getElementById("messageInput").value;
//向目標topic中發送消息
client.publish(targetTopic, message);
showMessage("發送消息給" + targetTopic + "的消息:" + message);
}
//從URL中獲取參數
function getQueryString(name) {
let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");
let r = window.location.search.substr(1).match(reg);
if (r != null) {
return decodeURIComponent(r[2]);
}
return null;
}
//在消息列表中展示消息
function showMessage(message) {
let messageDiv = document.getElementById("messageDiv");
let messageEle = document.createElement("div");
messageEle.innerText = message;
messageDiv.appendChild(messageEle);
}
//清空消息列表
function clearMessage() {
let messageDiv = document.getElementById("messageDiv");
messageDiv.innerHTML = "";
}
</script>
</html>
-
接下來我們訂閲不同的主題開啓兩個頁面測試下功能(頁面已經放在SpringBoot項目的resources目錄下了,需要先啓動應用再訪問):
- 第一個訂閲主題
testTopicA,訪問地址:http://localhost:8088/page/index?topic=testTopicA - 第二個訂閲主題
testTopicB,訪問地址:http://localhost:8088/page/index?topic=testTopicB
- 第一個訂閲主題
- 之後互相發送消息,讓我們來看看效果吧!
在SpringBoot中使用
沒有特殊業務需求的時候,前端可以直接和RabbitMQ對接實現即時通訊。但有時候我們需要通過服務端去通知前端,此時就需要在應用中集成MQTT了,接下來我們來講講如何在SpringBoot應用中使用MQTT。
- 首先我們需要在項目的
pom.xml中添加MQTT相關依賴;
<!--Spring集成MQTT-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
- 在
application.yml中添加MQTT相關配置,主要是訪問地址、用户名密碼、默認主題信息;
rabbitmq:
mqtt:
url: tcp://192.168.3.101:1883
username: guest
password: guest
defaultTopic: testTopic
- 編寫一個Java配置類從配置文件中讀取上面的配置便於使用;
/**
* @auther macrozheng
* @description MQTT相關配置
* @date 2025/8/1
* @github https://github.com/macrozheng
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {
/**
* RabbitMQ連接用户名
*/
private String username;
/**
* RabbitMQ連接密碼
*/
private String password;
/**
* RabbitMQ的MQTT默認topic
*/
private String defaultTopic;
/**
* RabbitMQ的MQTT連接地址
*/
private String url;
}
- 添加MQTT消息訂閲者相關配置,使用
@ServiceActivator註解聲明一個服務激活器,通過MessageHandler來處理訂閲消息;
/**
* @auther macrozheng
* @description MQTT消息訂閲者相關配置
* @date 2025/8/1
* @github https://github.com/macrozheng
*/
@Slf4j
@Configuration
public class MqttInboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
mqttConfig.getDefaultTopic());
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
//設置消息質量:0->至多一次;1->至少一次;2->只有一次
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//處理訂閲消息
log.info("handleMessage : {}",message.getPayload());
}
};
}
}
- 添加MQTT消息發佈者相關配置;
/**
* @auther macrozheng
* @description MQTT消息發佈者相關配置
* @date 2025/8/1
* @github https://github.com/macrozheng
*/
@Configuration
public class MqttOutboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { mqttConfig.getUrl()});
options.setUserName(mqttConfig.getUsername());
options.setPassword(mqttConfig.getPassword().toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
- 添加MQTT網關,用於向主題中發送消息;
/**
* @auther macrozheng
* @description MQTT網關,通過接口將數據傳遞到集成流
* @date 2025/8/1
* @github https://github.com/macrozheng
*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 發送消息到默認topic
*/
void sendToMqtt(String payload);
/**
* 發送消息到指定topic
*/
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
/**
* 發送消息到指定topic並設置QOS
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
- 添加MQTT測試接口,使用MQTT網關向特定主題中發送消息;
/**
* @auther macrozheng
* @description MQTT測試接口
* @date 2025/8/1
* @github https://github.com/macrozheng
*/
@Slf4j
@RestController
@Tag(name = "MqttController", description = "MQTT測試接口")
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttGateway mqttGateway;
@PostMapping("/sendToDefaultTopic")
@Operation(summary = "向默認主題發送消息")
public CommonResult sendToDefaultTopic(String payload) {
mqttGateway.sendToMqtt(payload);
return CommonResult.success(null);
}
@PostMapping("/sendToTopic")
@Operation(summary = "向指定主題發送消息")
public CommonResult sendToTopic(String payload, String topic) {
mqttGateway.sendToMqtt(payload, topic);
return CommonResult.success(null);
}
}
- 調用接口向主題中發送消息進行測試,訪問地址:http://localhost:8088/swagger-ui.html
- 後台成功接收到消息並進行打印。
2025-08-01T16:25:48.503+08:00 INFO 35516 --- [spring-mqtt] [ubscriberClient] c.m.blog.mqtt.config.MqttInboundConfig : handleMessage : 來自網頁上的消息
2025-08-01T16:25:49.329+08:00 INFO 35516 --- [spring-mqtt] [ubscriberClient] c.m.blog.mqtt.config.MqttInboundConfig : handleMessage : 來自網頁上的消息
2025-08-01T16:25:50.134+08:00 INFO 35516 --- [spring-mqtt] [ubscriberClient] c.m.blog.mqtt.config.MqttInboundConfig : handleMessage : 來自網頁上的消息
總結
消息中間件應用越來越廣泛,不僅可以實現可靠的異步通信,還可以實現即時通訊,掌握一個消息中間件還是很有必要的。如果沒有特殊業務需求,客户端或者前端直接使用MQTT對接消息中間件即可實現即時通訊,有特殊需求的時候也可以使用SpringBoot集成MQTT的方式來實現,總之消息中間件是實現即時通訊的一個好選擇!
項目源碼地址
https://github.com/macrozheng/spring-examples/tree/master/spring-mqtt