目錄

概述

1 認識Eclipse Paho C

1.1 paho.mqtt.c簡介

1.2 下載和安裝paho.mqtt.c

2 異步發佈和訂閲消息

2.1 編寫異步發佈消息功能

2.1.1 初始化MQTT參數

2.1.2 初始化函數

2.1.2 發佈消息函數

2.1.3 完整代碼

2.2 編譯代碼和測試

3 驗證MQTT Client功能

3.1 EMQX服務器上查看MQTT Client

3.2 MQTT.fx訂閲MQTTAsync


概述

本文主要介紹在linux環境(ubuntu)環境下,下載和安裝Eclipse Paho C MQTT 軟件包,還編寫一個範例實現異步發佈Message的功能,並使用基於EMQX的服務驗證其功能,還使用MQTT.fx訂閲消息,以驗證發佈消息功能的可靠性。

1 認識Eclipse Paho C

1.1 paho.mqtt.c簡介

paho.mqtt.c是一個使用C語言編寫的MQTT客户端庫,用於連接和與MQTT代理進行通信。它是Eclipse Paho項目的一部分,旨在為C開發人員提供一個易於使用、可靠和高性能的MQTT解決方案。

這個庫允許開發人員創建MQTT客户端,該客户端可以連接到遠程MQTT代理,並實現與代理之間的消息發佈、訂閲和管理等功能。paho.mqtt.c庫提供了一套簡單而直觀的API,使開發人員能夠輕鬆地使用MQTT協議進行通信。

paho.mqtt.c庫支持多種MQTT協議版本,包括MQTTv3.1和MQTTv3.1.1。它提供了多種操作和配置選項,以適應不同的需求和場景。該庫還提供了一些高級功能,例如支持SSL/TLS加密、持久化會話、消息保持和遺囑消息等。

paho.mqtt.c庫在多個平台上都可以使用,包括Windows、Linux、macOS和嵌入式系統等。它具有良好的可移植性和可擴展性,可以方便地與其他C語言項目集成。

總之,paho.mqtt.c是一個強大而靈活的MQTT客户端庫,適用於使用C語言開發MQTT應用程序的開發人員。它簡化了與MQTT代理的通信過程,並提供了豐富的功能和選項,使開發人員能夠快速構建高性能的MQTT應用程序。

1.2 下載和安裝paho.mqtt.c

登錄mqtt官網,點擊Software,可以看見如下頁面,選擇Eclipse Paho C進入下載頁面

https://mqtt.org/

java mqtt 多個設備 是共用一個client 還是一個設備一個client_#define

點擊鏈接後,會跳轉到下載頁面,在該頁面下載paho.mqtt.c

java mqtt 多個設備 是共用一個client 還是一個設備一個client_#eclipse_02

筆者選擇使用命令直接安裝該軟件包,具體操作步驟如下:

Step -1: 下載軟件包執行命令:

git clone https://github.com/eclipse/paho.mqtt.c.git

java mqtt 多個設備 是共用一個client 還是一個設備一個client_#linux_03

step-2: 進入paho.mqtt.c目錄,執行make

cd paho.mqtt.c
make

系統會自動編譯代碼,等待編譯結果。

java mqtt 多個設備 是共用一個client 還是一個設備一個client_#運維_04

編譯完成後,會自動生成build文件,這時可以安裝

java mqtt 多個設備 是共用一個client 還是一個設備一個client_#運維_05

step-3 : 執行如下命令就可以安裝軟件


sudo make install


java mqtt 多個設備 是共用一個client 還是一個設備一個client_#linux_06

2 異步發佈和訂閲消息

2.1 編寫異步發佈消息功能

2.1.1 初始化MQTT參數

參數功能介紹:

參數名稱

參數值

描述

ADDRESS

tcp://192.168.1.11:1883

mqtt broker的IP地址

CLIENTID

mqtt_ubuntu_asys

設備ID

TOPIC

MQTTAsync

發佈的Topic

PAYLOAD

12.56

Topic下的payload

QOS

1

服務質量等級=1

TIMEOUT

10000L

等待超時時間(us)

USERNAME

mqtt_ubuntu_user

終端認證username

PASSWORD

123456

終端認證username對應的password

在代碼中,其需要填寫的位置如下:

java mqtt 多個設備 是共用一個client 還是一個設備一個client_#運維_07

2.1.2 初始化函數

初始化MQTT終端需要完成以下3個步驟:

step-1: 創建MQTT Client

step-2: 配置回調函數

step-3: 連接服務器

具體實現代碼如下:

代碼85行:創建MQTT Client,需要傳入服務器IP和Client ID信息

代碼93行:配置callback函數,函數原型,見源碼

代碼100行: 心跳包時間間隔設置為60s

代碼101行: 清除會話 標記設置為1,不接受離線消息

代碼102行: 配置設備終端用户

代碼103行: 配置設備終端用户password

java mqtt 多個設備 是共用一個client 還是一個設備一個client_#define_08

2.1.2 發佈消息函數

要實現發佈消息功能,需要將payload及其相關參數填到MQTTClient_message定義的數據結構中,下面介紹整個public message 函數的功能。

代碼59行: 裝載payload

代碼60行:payload的字符長度

代碼61行:消息服務等級參數

代碼62行:配置為不保留消息

代碼64行:使用MQTTClient_publishMessage函數發佈消息

代碼74行:比較返回來token的值,如果數據發送成功,token值會自動加1

java mqtt 多個設備 是共用一個client 還是一個設備一個client_服務器_09

2.1.3 完整代碼

創建test_01_Asynchronous.c,編寫如下代碼:

/***************************************************************
Copyright  2024-2029. All rights reserved.
文件名  : test_01_Asynchronous.c
作者    : tangmingfei2013@126.com
版本    : V1.0
描述    : mqtt異步發佈和訂閲消息
日誌    : 初版V1.0 2024/03/13

***************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#include <unistd.h>
 
#define ADDRESS     "tcp://192.168.1.11:1883"
#define CLIENTID    "mqtt_ubuntu_asys"
#define TOPIC       "MQTTAsync"
#define PAYLOAD     "12.56"
#define QOS         1
#define TIMEOUT     10000L

#define USERNAME    "mqtt_ubuntu_user"
#define PASSWORD    "123456" 

static MQTTClient client;
static MQTTClient_deliveryToken deliveredtoken;
static MQTTClient_message pubmsg = MQTTClient_message_initializer;
static MQTTClient_deliveryToken token;
static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
 
void delivered(void *context, MQTTClient_deliveryToken dt)
{
    printf("Message with token value %d delivery confirmed\n", dt);
    deliveredtoken = dt;
}
 
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    printf("Message arrived\n");
    printf("     topic: %s\n", topicName);
    printf("   message: %.*s\n", message->payloadlen, (char*)message->payload);
    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    
    return 1;
}
 
void connlost(void *context, char *cause)
{
    printf("\nConnection lost\n");
    printf("     cause: %s\n", cause);
}

void public_mqttMsg()
{
    int rc;

    pubmsg.payload = PAYLOAD;
    pubmsg.payloadlen = (int)strlen(PAYLOAD);
    pubmsg.qos = QOS;
    pubmsg.retained = 1;
    deliveredtoken = 0;
    if ((rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to publish message, return code %d\n", rc);
        rc = EXIT_FAILURE;
    }
    else
    {
        printf("Waiting for publication of %s\n"
            "on topic %s for client with ClientID: %s\n",
            PAYLOAD, TOPIC, CLIENTID);
        while (deliveredtoken != token)
        {
            usleep(10000000L);
        }
    }
}


int main(int argc, char* argv[])
{
    int rc;
    
    if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
        MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to create client, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto exit;
    }
 
    if ((rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to set callbacks, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }
 
    conn_opts.keepAliveInterval = 60;
    conn_opts.cleansession = 0;
    conn_opts.username = USERNAME;   //用户名
    conn_opts.password = PASSWORD;   //密碼
    
    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to connect, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }
    
    while(1)
    {
        public_mqttMsg();
    }
 
    if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to disconnect, return code %d\n", rc);
        rc = EXIT_FAILURE;
    }
 
destroy_exit:
    MQTTClient_destroy(&client);
 
exit:
    return rc;
}

2.2 編譯代碼和測試

使用如下命令編譯代碼


gcc test_01_Asynchronous.c -lpaho-mqtt3a


java mqtt 多個設備 是共用一個client 還是一個設備一個client_#linux_10

執行.out文件後,可以看見,MQTT Client發佈消息成功了,服務器端收到消息後,token值會自動加1

java mqtt 多個設備 是共用一個client 還是一個設備一個client_#運維_11

3 驗證MQTT Client功能

3.1 EMQX服務器上查看MQTT Client

在ubuntu上運行MQTT Client後,EMQX服務器會顯示MQTT Client的運行狀態,登錄EMQX服務器可以看見

java mqtt 多個設備 是共用一個client 還是一個設備一個client_#eclipse_12

為了測試方便,置retained = 1,這樣EMQX服務器會保留Client發佈的payload,便於在服務器上查看client發佈的數據。

java mqtt 多個設備 是共用一個client 還是一個設備一個client_#define_13

在EMQX服務器上,打開保留消息面板,查看MQTTAsync主題的payload,其和發佈的值一致。

java mqtt 多個設備 是共用一個client 還是一個設備一個client_服務器_14

3.2 MQTT.fx訂閲MQTTAsync

要使用MQTT.fx MQTT Client工具訂閲MQTTAsync,首先保證MQTT.fx能正常連接至EMQX服務器

java mqtt 多個設備 是共用一個client 還是一個設備一個client_#運維_15

連接成功後,可以在MQTT.fx 上訂閲MQTTAsync,訂閲成功後,MQTT.fx subscribe面板會打印MQTTAsync的log

java mqtt 多個設備 是共用一個client 還是一個設備一個client_#define_16

同時,在EMQX服務器上的訂閲管理面板上可以看見:MQTT.fx 訂閲Topic 為MQTTAsync的消息

java mqtt 多個設備 是共用一個client 還是一個設備一個client_#運維_17