背景
MQTT 是一個基於 TCP 協議的發佈/訂閲模型協議,它被廣泛應用於物聯網、傳感器網絡和其他低帶寬、不穩定網絡環境中。在這些網絡環境中,網絡連接往往不穩定,可能會出現網絡故障、信號弱化、丟包等問題,這可能會導致 MQTT 客户端與服務器之間的連接中斷。物聯網應用中,常見的觸發斷線重連的場景包括:
- 網絡環境惡劣或者斷網,造成 MQTT 客户端連接超時斷開。
- 由於業務需要服務端升級切換,服務端主動關閉斷開。
- 設備重啓或客户端重啓,客户端主動重連。
- 其他網絡因素造成 TCP/IP 傳輸層斷開導致 MQTT 連接重連。
為了確保 MQTT 客户端與服務器之間的穩定連接,MQTT 客户端需要實現重連邏輯,幫助 MQTT 客户端自動重新連接服務器,並恢復之前的訂閲關係、保持會話等狀態。
為什麼 MQTT 客户端重連代碼需要良好的設計
MQTT 設備重連是很多物聯網應用中不可避免的情況。設計 MQTT 客户端重連邏輯時需要注意使用正確的事件回調方法,每次重連設置合理的隨機退避時間,以保證客户端和服務端的長時間穩定運行,從而確保業務的正常開展。
不合理的重連邏輯設計可能會造成諸多問題:
- 重連邏輯失效導致客户端靜默不再接受 Broker 消息。
- 客户端頻繁重連,無重連退避時間導致形成 DDOS 攻擊服務端 Broker。
- 客户端頻繁上下線導致 Broker 服務端資源過量不必要的消耗。
而合理的重連邏輯既可以提高 MQTT 客户端的穩定性和可靠性,避免因網絡連接中斷而導致的數據丟失、延遲等問題,還可以降低由於頻繁連接對服務器端的壓力。
如何設計一段 MQTT 客户端重連代碼
在進行 MQTT 客户端重連代碼設計時需要考慮以下幾個方面:
- 設置正確的連接保活時間 MQTT 客户端的連接保活時間即 Keep Alive,負責檢測當前連接的健康狀態。Keep Alive 超時會觸發客户端重連和服務端關閉客户端連接。該數值會影響到服務端和客户端檢測到連接斷開不可用的時長,用户需要根據自身網絡狀態,以及期望的最長等待時間來設置合理的 Keep Alive。
- 重連策略和退避 用户應該根據網絡環境的不同,制定不同的重連策略。例如,當網絡連接中斷時,可以設置一個初始等待時間,並在每次重連嘗試後逐漸增加等待時間,以避免網絡連接中斷導致的大量重連嘗試。建議使用指數退避算法或隨機 + 階梯延時來留出足夠的退避時隙。
- 連接狀態管理 需要在客户端中維護連接狀態,包括連接狀態的記錄、連接斷開的原因、已訂閲的主題列表等信息。當連接中斷時,客户端應該記錄下連接斷開的原因,並進行相應的重連嘗試。但如果使用會話保持功能,則不需要客户端自己保存這些信息。
- 異常處理 在連接過程中可能會發生各種異常情況,例如服務器不可用、認證失敗、網絡異常等。需要在客户端中添加異常處理邏輯,根據異常情況進行相應的處理。MQTT 5 協議提供了詳實的此類斷開連接原因,客户端可以根據這些信息記錄異常日誌、斷開連接、再次重連等。
- 最大嘗試次數限制 對於一些低功耗設備,為避免重連次數過多導致客户端資源消耗過大,有時候需要考慮限制最大重連嘗試次數。當超過最大嘗試次數後,客户端應該中止重連嘗試進入休眠狀態,避免無意義的重連。
- 退避算法 有兩種常用的重連退避方法:指數補償算法和隨機退避。指數補償算法是通過負反饋機制指數增加等待時間來找到合適的發送/連接速率。隨機退避即通過設置等待時間的上下限,每次重連都等待隨機的延時時間,由於其易於實現而有廣泛使用。
重連代碼示例
我們將以 Paho MQTT C 的庫為例,示範如何使用異步編程模型優雅完成自動重連功能。Paho 提供了豐富的回調函數,請注意不同回調方法觸發條件和設置方式不同,分別有全局回調、API 回調和異步方法回調。API 回調有相當的靈活性,但當開啓自動重連功能時,建議只使用異步回調。此處對三種回調函數都提供了例程,用户可以使用此例程驗證三種回調函數的觸發。
// 是 Async 使用的回調方法
// 連接成功的異步回調函數,在連接成功的地方進行Subscribe操作。
void conn_established(void *context, char *cause)
{
printf("client reconnected!\n");
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
printf("Successful connection\n");
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
"Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
opts.onSuccess = onSubscribe;
opts.onFailure = onSubscribeFailure;
opts.context = client;
if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start subscribe, return code %d\n", rc);
finished = 1;
}
}
// 以下為客户端全局連接斷開回調函數
void conn_lost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection lost\n");
if (cause) {
printf(" cause: %s\n", cause);
}
printf("Reconnecting\n");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.maxRetryInterval = 16;
conn_opts.minRetryInterval = 1;
conn_opts.automaticReconnect = 1;
conn_opts.onFailure = onConnectFailure;
MQTTAsync_setConnected(client, client, conn_established);
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
finished = 1;
}
}
int main(int argc, char* argv[])
{
// 創建異步連接客户端需要使用的屬性結構體
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
int rc;
int ch;
// 創建異步連接客户端,不使用 Paho SDK 內置的持久化來處理緩存消息
if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL))
!= MQTTASYNC_SUCCESS)
{
printf("Failed to create client, return code %d\n", rc);
rc = EXIT_FAILURE;
goto exit;
}
// 設置異步連接回調,注意此處設置的回調函數為連接層面的全局回調函數
// conn_lost 為連接斷開觸發,有且只有連接成功後斷開才會觸發,在斷開連接的情況下進行重連失敗不觸發。
// msgarrvd 收到消息時觸發的回調函數
// msgdeliverd 是消息成功發送的回調函數,一般設置為NULL
if ((rc = MQTTAsync_setCallbacks(client, client, conn_lost, msgarrvd, msgdeliverd)) != MQTTASYNC_SUCCESS)
{
printf("Failed to set callbacks, return code %d\n", rc);
rc = EXIT_FAILURE;
goto destroy_exit;
}
// 設置連接參數
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
// 此處設置 API調用失敗會觸發的回調,接下來進行connect操作所以設置為 onConnectFailure 方法
conn_opts.onFailure = onConnectFailure;
// 此處設置 客户端連接API調用成功會觸發的回調,由於例程使用異步連接的 API,設置了會導致2個回調都被觸發,所以建議不使用此回調
//conn_opts.onSuccess = onConnect;
// 注意第一次發起連接失敗不會觸發自動重連,只有曾經成功連接並斷開後才會觸發
conn_opts.automaticReconnect = 1;
//開啓自動重連,並且設置 2-16s 的隨機退避時間
conn_opts.maxRetryInterval = 16;
conn_opts.minRetryInterval = 2;
conn_opts.context = client;
// 設置異步回調函數,此與之前的 API 回調不同,每次連接/斷開都會觸發
MQTTAsync_setConnected(client, client, conn_established);
MQTTAsync_setDisconnected(client, client, disconnect_lost);
// 啓動客户端連接,之前設置的 API 回調只會在這一次操作生效
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
rc = EXIT_FAILURE;
goto destroy_exit;
}
......
}
查看 MQTTAsync_subscribe.c 詳細代碼。
更多選擇:NanoSDK 內置重連策略
NanoSDK 是除了 Paho 以外的又一 MQTT SDK 選擇。NanoSDK 基於 NNG-NanoMSG 項目開發,使用 MIT License,對開源和商業都很友好。相較於 Paho 其最大的不同在於內置的全異步 I/O 和 支持 Actor 編程模型,當使用 QoS 1/2 消息時可以獲得更高的消息吞吐速率。而且 NanoSDK 支持 MQTT over QUIC 協議,與大規模物聯網 MQTT 消息服務器 EMQX 5.0 結合可解決弱網下的數據傳輸難題。這些優勢使得它已經在車聯網和工業場景中得到了廣泛的使用。
在 NanoSDK 中,重連策略已經完全內置,無需用户手動實現。
// nanosdk 採用自動撥號機制,默認進行重連
nng_dialer_set_ptr(*dialer, NNG_OPT_MQTT_CONNMSG, connmsg);
nng_dialer_start(*dialer, NNG_FLAG_NONBLOCK);
總結
本文介紹在 MQTT 客户端代碼實現過程中,重連邏輯設計的重要性與最佳實踐。通過本文,讀者可以設計更為合理的 MQTT 設備重連代碼,降低客户端與服務器端的資源開銷,構建更加穩定可靠的物聯網設備連接。
版權聲明: 本文為 EMQ 原創,轉載請註明出處。
原文鏈接:https://www.emqx.com/zh/blog/mqtt-client-auto-reconnect-best-practices