目錄
概述
1 認識Eclipse Paho C
1.1 paho.mqtt.c簡介
1.2 下載和安裝paho.mqtt.c
2 異步發佈和訂閲消息
2.1 編寫異步發佈消息功能
2.1.1 初始化MQTT參數
2.1.2 初始化函數
2.1.2 發佈消息函數
2.1.3 完整代碼
2.2 編譯代碼和測試
3 驗證MQTT Client功能
3.1 EMQX服務器上查看MQTT Client
3.2 MQTT.fx訂閲MQTTAsync
概述
本文主要介紹在linux環境(ubuntu)環境下,下載和安裝Eclipse Paho C MQTT 軟件包,還編寫一個範例實現異步發佈Message的功能,並使用基於EMQX的服務驗證其功能,還使用MQTT.fx訂閲消息,以驗證發佈消息功能的可靠性。
1 認識Eclipse Paho C
1.1 paho.mqtt.c簡介
paho.mqtt.c是一個使用C語言編寫的MQTT客户端庫,用於連接和與MQTT代理進行通信。它是Eclipse Paho項目的一部分,旨在為C開發人員提供一個易於使用、可靠和高性能的MQTT解決方案。
這個庫允許開發人員創建MQTT客户端,該客户端可以連接到遠程MQTT代理,並實現與代理之間的消息發佈、訂閲和管理等功能。paho.mqtt.c庫提供了一套簡單而直觀的API,使開發人員能夠輕鬆地使用MQTT協議進行通信。
paho.mqtt.c庫支持多種MQTT協議版本,包括MQTTv3.1和MQTTv3.1.1。它提供了多種操作和配置選項,以適應不同的需求和場景。該庫還提供了一些高級功能,例如支持SSL/TLS加密、持久化會話、消息保持和遺囑消息等。
paho.mqtt.c庫在多個平台上都可以使用,包括Windows、Linux、macOS和嵌入式系統等。它具有良好的可移植性和可擴展性,可以方便地與其他C語言項目集成。
總之,paho.mqtt.c是一個強大而靈活的MQTT客户端庫,適用於使用C語言開發MQTT應用程序的開發人員。它簡化了與MQTT代理的通信過程,並提供了豐富的功能和選項,使開發人員能夠快速構建高性能的MQTT應用程序。
1.2 下載和安裝paho.mqtt.c
登錄mqtt官網,點擊Software,可以看見如下頁面,選擇Eclipse Paho C進入下載頁面
https://mqtt.org/
點擊鏈接後,會跳轉到下載頁面,在該頁面下載paho.mqtt.c
筆者選擇使用命令直接安裝該軟件包,具體操作步驟如下:
Step -1: 下載軟件包執行命令:
git clone https://github.com/eclipse/paho.mqtt.c.git
step-2: 進入paho.mqtt.c目錄,執行make
cd paho.mqtt.c
make
系統會自動編譯代碼,等待編譯結果。
編譯完成後,會自動生成build文件,這時可以安裝
step-3 : 執行如下命令就可以安裝軟件
sudo make install
2 異步發佈和訂閲消息
2.1 編寫異步發佈消息功能
2.1.1 初始化MQTT參數
參數功能介紹:
|
參數名稱
|
參數值
|
描述
|
|
ADDRESS
|
tcp://192.168.1.11:1883
|
mqtt broker的IP地址
|
|
CLIENTID
|
mqtt_ubuntu_asys
|
設備ID
|
|
TOPIC
|
MQTTAsync
|
發佈的Topic
|
|
PAYLOAD
|
12.56
|
Topic下的payload
|
|
QOS
|
1
|
服務質量等級=1
|
|
TIMEOUT
|
10000L
|
等待超時時間(us)
|
|
USERNAME
|
mqtt_ubuntu_user
|
終端認證username
|
|
PASSWORD
|
123456
|
終端認證username對應的password
|
在代碼中,其需要填寫的位置如下:
2.1.2 初始化函數
初始化MQTT終端需要完成以下3個步驟:
step-1: 創建MQTT Client
step-2: 配置回調函數
step-3: 連接服務器
具體實現代碼如下:
代碼85行:創建MQTT Client,需要傳入服務器IP和Client ID信息
代碼93行:配置callback函數,函數原型,見源碼
代碼100行: 心跳包時間間隔設置為60s
代碼101行: 清除會話 標記設置為1,不接受離線消息
代碼102行: 配置設備終端用户
代碼103行: 配置設備終端用户password
2.1.2 發佈消息函數
要實現發佈消息功能,需要將payload及其相關參數填到MQTTClient_message定義的數據結構中,下面介紹整個public message 函數的功能。
代碼59行: 裝載payload
代碼60行:payload的字符長度
代碼61行:消息服務等級參數
代碼62行:配置為不保留消息
代碼64行:使用MQTTClient_publishMessage函數發佈消息
代碼74行:比較返回來token的值,如果數據發送成功,token值會自動加1
2.1.3 完整代碼
創建test_01_Asynchronous.c,編寫如下代碼:
/***************************************************************
Copyright 2024-2029. All rights reserved.
文件名 : test_01_Asynchronous.c
作者 : tangmingfei2013@126.com
版本 : V1.0
描述 : mqtt異步發佈和訂閲消息
日誌 : 初版V1.0 2024/03/13
***************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#include <unistd.h>
#define ADDRESS "tcp://192.168.1.11:1883"
#define CLIENTID "mqtt_ubuntu_asys"
#define TOPIC "MQTTAsync"
#define PAYLOAD "12.56"
#define QOS 1
#define TIMEOUT 10000L
#define USERNAME "mqtt_ubuntu_user"
#define PASSWORD "123456"
static MQTTClient client;
static MQTTClient_deliveryToken deliveredtoken;
static MQTTClient_message pubmsg = MQTTClient_message_initializer;
static MQTTClient_deliveryToken token;
static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
void delivered(void *context, MQTTClient_deliveryToken dt)
{
printf("Message with token value %d delivery confirmed\n", dt);
deliveredtoken = dt;
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void connlost(void *context, char *cause)
{
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
}
void public_mqttMsg()
{
int rc;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = (int)strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 1;
deliveredtoken = 0;
if ((rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to publish message, return code %d\n", rc);
rc = EXIT_FAILURE;
}
else
{
printf("Waiting for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
PAYLOAD, TOPIC, CLIENTID);
while (deliveredtoken != token)
{
usleep(10000000L);
}
}
}
int main(int argc, char* argv[])
{
int rc;
if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to create client, return code %d\n", rc);
rc = EXIT_FAILURE;
goto exit;
}
if ((rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to set callbacks, return code %d\n", rc);
rc = EXIT_FAILURE;
goto destroy_exit;
}
conn_opts.keepAliveInterval = 60;
conn_opts.cleansession = 0;
conn_opts.username = USERNAME; //用户名
conn_opts.password = PASSWORD; //密碼
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
rc = EXIT_FAILURE;
goto destroy_exit;
}
while(1)
{
public_mqttMsg();
}
if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to disconnect, return code %d\n", rc);
rc = EXIT_FAILURE;
}
destroy_exit:
MQTTClient_destroy(&client);
exit:
return rc;
}
2.2 編譯代碼和測試
使用如下命令編譯代碼
gcc test_01_Asynchronous.c -lpaho-mqtt3a
執行.out文件後,可以看見,MQTT Client發佈消息成功了,服務器端收到消息後,token值會自動加1
3 驗證MQTT Client功能
3.1 EMQX服務器上查看MQTT Client
在ubuntu上運行MQTT Client後,EMQX服務器會顯示MQTT Client的運行狀態,登錄EMQX服務器可以看見
為了測試方便,置retained = 1,這樣EMQX服務器會保留Client發佈的payload,便於在服務器上查看client發佈的數據。
在EMQX服務器上,打開保留消息面板,查看MQTTAsync主題的payload,其和發佈的值一致。
3.2 MQTT.fx訂閲MQTTAsync
要使用MQTT.fx MQTT Client工具訂閲MQTTAsync,首先保證MQTT.fx能正常連接至EMQX服務器
連接成功後,可以在MQTT.fx 上訂閲MQTTAsync,訂閲成功後,MQTT.fx subscribe面板會打印MQTTAsync的log
同時,在EMQX服務器上的訂閲管理面板上可以看見:MQTT.fx 訂閲Topic 為MQTTAsync的消息