动态

详情 返回 返回

還在用WebSocket實現即時通訊?試試MQTT吧,真香! - 动态 详情

有時候我們的項目中會用到即時通訊功能,比如電商系統中的客服聊天、支付成功後的異步回調通知等。最近發現RabbitMQ可以很方便的實現即時通訊功能,如果你沒有特殊的業務需求,甚至可以不寫後端代碼,今天給大家介紹下如何使用RabbitMQ來實現即時通訊

MQTT協議

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發佈/訂閲(publish/subscribe)模式的輕量級通訊協議,該協議構建於TCP/IP協議上。MQTT最大優點在於,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。

相關概念

  • Publisher(發佈者):消息的發出者,負責發送消息。
  • Subscriber(訂閲者):消息的訂閲者,負責接收並處理消息。
  • Broker(代理):消息代理,位於消息發佈者和訂閲者之間,各類支持MQTT協議的消息中間件都可以充當。
  • Topic(主題):可以理解為消息隊列中的路由,訂閲者訂閲了主題之後,就可以收到發送到該主題的消息。
  • Payload(負載);可以理解為發送消息的內容。
  • QoS(消息質量):全稱Quality of Service,即消息的發送質量,主要有QoS 0QoS 1QoS 2三個等級,下面分別介紹下:

    • QoS 0(Almost Once):至多一次,只發送一次,會發生消息丟失或重複;
    • QoS 1(Atleast Once):至少一次,確保消息到達,但消息重複可能會發生;
    • QoS 2(Exactly Once):只有一次,確保消息只到達一次。

RabbitMQ啓用MQTT功能

RabbitMQ默認不會啓用啓用MQTT功能,需要手動開啓。
  • 我們需要先安裝並啓動RabbitMQ,這裏以Docker環境安裝為例,運行命令如下;
docker run -p 5672:5672 -p 15672:15672 -p 1883:1883 -p 15675:15675 --name rabbitmq-mqtt \
-v /mydata/rabbitmq-mqtt/data:/var/lib/rabbitmq \
-d rabbitmq:3.9.11-management
  • 接下來就是啓用RabbitMQ的MQTT WEB插件了,進入rabbitmq容器後,使用如下命令開啓;
# 先進入rabbitmq容器
docker exec -it rabbitmq-mqtt /bin/bash
# 再啓用mqtt web插件,會同時啓用rabbitmq_mqtt插件
rabbitmq-plugins enable rabbitmq_web_mqtt
  • 開啓成功後,查看管理控制枱,我們可以發現MQTT的web服務運行在15675端口上了,訪問地址:http://192.168.3.101:15672

這或許是一個對你有用的開源項目,mall項目是一套基於SpringBoot3 + Vue 的電商系統(Github標星60K),後端支持多模塊和最新微服務架構 ,採用Docker和K8S部署。包括前台商城項目和後台管理系統,能支持完整的訂單流程!涵蓋商品、訂單、購物車、權限、優惠券、會員、支付等功能!

  • Boot項目:https://github.com/macrozheng/mall
  • Cloud項目:https://github.com/macrozheng/mall-swarm
  • 教程網站:https://www.macrozheng.com

項目演示:

MQTT客户端

我們可以使用MQTT客户端來測試MQTT的即時通訊功能,這裏使用的是MQTTX這個客户端工具。
  • 這裏使用Docker環境為例,運行命令如下;
docker run -p 80:80 --name mqttx-web -d emqx/mqttx-web
  • 運行成功後可以訪問MQTTX的控制枱,訪問地址:http://192.168.3.101

  • 點擊左側的加號按鈕來創建一個MQTT連接,配置好連接信息,注意MQTT版本選擇3.1.1

  • 再添加一個訂閲,訂閲testTopicA這個主題,我們會向這個主題發送消息;

  • 發佈者向主題中發佈消息,訂閲者可以實時接收到。

前端直接實現即時通訊

既然MQTTX客户端可以直接通過RabbitMQ實現即時通訊,那我們直接使用前端技術是否也能實現即時通訊?答案是肯定的!下面我們將通過html+javascript實現一個簡單的聊天功能,真正不寫一行後端代碼實現即時通訊!
  • WEB端與MQTT服務進行通訊需要使用一個叫MQTT.js的庫,項目地址:https://github.com/mqttjs/MQTT.js

  • 實現的功能非常簡單,一個單聊功能,需要注意的是配置好MQTT服務的訪問地址為:ws://192.168.3.101:15675/ws
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<div>
    <label>目標Topic:<input id="targetTopicInput" type="text"></label><br>
    <label>發送消息:<input id="messageInput" type="text"></label><br>
    <button onclick="sendMessage()">發送</button>
    <button onclick="clearMessage()">清空</button>
    <div id="messageDiv"></div>
</div>
</body>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
    //RabbitMQ的web-mqtt連接地址
    const url = 'ws://192.168.3.101:15675/ws';
    //獲取訂閲的topic
    const topic = getQueryString("topic");
    //連接到消息隊列
    let client = mqtt.connect(url);
    client.on('connect', function () {
        //連接成功後訂閲topic
        client.subscribe(topic, function (err) {
            if (!err) {
                showMessage("訂閲topic:" + topic + "成功!");
            }
        });
    });
    //獲取訂閲topic中的消息
    client.on('message', function (topic, message) {
        showMessage("收到消息:" + message.toString());
    });

    //發送消息
    function sendMessage() {
        let targetTopic = document.getElementById("targetTopicInput").value;
        let message = document.getElementById("messageInput").value;
        //向目標topic中發送消息
        client.publish(targetTopic, message);
        showMessage("發送消息給" + targetTopic + "的消息:" + message);
    }

    //從URL中獲取參數
    function getQueryString(name) {
        let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");
        let r = window.location.search.substr(1).match(reg);
        if (r != null) {
            return decodeURIComponent(r[2]);
        }
        return null;
    }

    //在消息列表中展示消息
    function showMessage(message) {
        let messageDiv = document.getElementById("messageDiv");
        let messageEle = document.createElement("div");
        messageEle.innerText = message;
        messageDiv.appendChild(messageEle);
    }

    //清空消息列表
    function clearMessage() {
        let messageDiv = document.getElementById("messageDiv");
        messageDiv.innerHTML = "";
    }
</script>
</html>
  • 接下來我們訂閲不同的主題開啓兩個頁面測試下功能(頁面已經放在SpringBoot項目的resources目錄下了,需要先啓動應用再訪問):

    • 第一個訂閲主題testTopicA,訪問地址:http://localhost:8088/page/index?topic=testTopicA
    • 第二個訂閲主題testTopicB,訪問地址:http://localhost:8088/page/index?topic=testTopicB
  • 之後互相發送消息,讓我們來看看效果吧!

在SpringBoot中使用

沒有特殊業務需求的時候,前端可以直接和RabbitMQ對接實現即時通訊。但有時候我們需要通過服務端去通知前端,此時就需要在應用中集成MQTT了,接下來我們來講講如何在SpringBoot應用中使用MQTT。
  • 首先我們需要在項目的pom.xml中添加MQTT相關依賴;
<!--Spring集成MQTT-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
  • application.yml中添加MQTT相關配置,主要是訪問地址、用户名密碼、默認主題信息;
rabbitmq:
  mqtt:
    url: tcp://192.168.3.101:1883
    username: guest
    password: guest
    defaultTopic: testTopic
  • 編寫一個Java配置類從配置文件中讀取上面的配置便於使用;
/**
 * @auther macrozheng
 * @description MQTT相關配置
 * @date 2025/8/1
 * @github https://github.com/macrozheng
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {
    /**
     * RabbitMQ連接用户名
     */
    private String username;
    /**
     * RabbitMQ連接密碼
     */
    private String password;
    /**
     * RabbitMQ的MQTT默認topic
     */
    private String defaultTopic;
    /**
     * RabbitMQ的MQTT連接地址
     */
    private String url;
}
  • 添加MQTT消息訂閲者相關配置,使用@ServiceActivator註解聲明一個服務激活器,通過MessageHandler來處理訂閲消息;
/**
 * @auther macrozheng
 * @description MQTT消息訂閲者相關配置
 * @date 2025/8/1
 * @github https://github.com/macrozheng
 */
@Slf4j
@Configuration
public class MqttInboundConfig {
    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
                        mqttConfig.getDefaultTopic());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //設置消息質量:0->至多一次;1->至少一次;2->只有一次
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                //處理訂閲消息
                log.info("handleMessage : {}",message.getPayload());
            }

        };
    }
}
  • 添加MQTT消息發佈者相關配置;
/**
 * @auther macrozheng
 * @description MQTT消息發佈者相關配置
 * @date 2025/8/1
 * @github https://github.com/macrozheng
 */
@Configuration
public class MqttOutboundConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttConfig.getUrl()});
        options.setUserName(mqttConfig.getUsername());
        options.setPassword(mqttConfig.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
  • 添加MQTT網關,用於向主題中發送消息;
/**
 * @auther macrozheng
 * @description MQTT網關,通過接口將數據傳遞到集成流
 * @date 2025/8/1
 * @github https://github.com/macrozheng
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    /**
     * 發送消息到默認topic
     */
    void sendToMqtt(String payload);

    /**
     * 發送消息到指定topic
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * 發送消息到指定topic並設置QOS
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
  • 添加MQTT測試接口,使用MQTT網關向特定主題中發送消息;
/**
 * @auther macrozheng
 * @description MQTT測試接口
 * @date 2025/8/1
 * @github https://github.com/macrozheng
 */
@Slf4j
@RestController
@Tag(name = "MqttController", description = "MQTT測試接口")
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttGateway mqttGateway;

    @PostMapping("/sendToDefaultTopic")
    @Operation(summary = "向默認主題發送消息")
    public CommonResult sendToDefaultTopic(String payload) {
        mqttGateway.sendToMqtt(payload);
        return CommonResult.success(null);
    }

    @PostMapping("/sendToTopic")
    @Operation(summary = "向指定主題發送消息")
    public CommonResult sendToTopic(String payload, String topic) {
        mqttGateway.sendToMqtt(payload, topic);
        return CommonResult.success(null);
    }
}
  • 調用接口向主題中發送消息進行測試,訪問地址:http://localhost:8088/swagger-ui.html

  • 後台成功接收到消息並進行打印。
2025-08-01T16:25:48.503+08:00  INFO 35516 --- [spring-mqtt] [ubscriberClient] c.m.blog.mqtt.config.MqttInboundConfig   : handleMessage : 來自網頁上的消息
2025-08-01T16:25:49.329+08:00  INFO 35516 --- [spring-mqtt] [ubscriberClient] c.m.blog.mqtt.config.MqttInboundConfig   : handleMessage : 來自網頁上的消息
2025-08-01T16:25:50.134+08:00  INFO 35516 --- [spring-mqtt] [ubscriberClient] c.m.blog.mqtt.config.MqttInboundConfig   : handleMessage : 來自網頁上的消息

總結

消息中間件應用越來越廣泛,不僅可以實現可靠的異步通信,還可以實現即時通訊,掌握一個消息中間件還是很有必要的。如果沒有特殊業務需求,客户端或者前端直接使用MQTT對接消息中間件即可實現即時通訊,有特殊需求的時候也可以使用SpringBoot集成MQTT的方式來實現,總之消息中間件是實現即時通訊的一個好選擇!

項目源碼地址

https://github.com/macrozheng/spring-examples/tree/master/spring-mqtt

user avatar zjkal 头像 lslove 头像 feichangkudechongfengyi 头像 zhaoqianglaoshi 头像 euphoria 头像 lanyiyun666 头像
点赞 6 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.