Protocol Buffers (Protobuf) 是一種語言中立、平台中立的可擴展機制,用於序列化結構化數據的二進制傳輸格式。相比常規數據傳輸格式(如 JSON 或 XML),Protobuf 更加高效和快速並節省傳輸帶寬,因此得到了廣泛的應用。
在雲邊協同架構中,往往既需要發送數據到雲端,同時也需要接收雲端發送過來的數據,進行雲邊協同計算。大規模的雲邊協同計算傳輸的數據總量巨大,在公網帶寬資源有限而且昂貴的情況下,採用更緊湊的數據傳輸格式顯得尤為重要。
LF Edge eKuiper 是適合部署於資源受限的邊緣端的超輕量物聯網邊緣數據流式分析引擎,可通過 source 和 sink 連接 MQTT、HTTP 等各種通信協議的外部系統。eKuiper 支持配置 source/sink 的傳輸數據的編解碼格式,目前可支持 JSON、ProtoBuf 和 Binary 格式。
本文將以 Protobuf 格式為例,講解如何在 eKuiper 中設置編解碼格式,通過 source 讀入並解析該格式的數據以及在 sink 中使用該格式編碼寫入,從而實現高效的雲邊協同數據傳輸,緩解雲邊傳輸帶寬緊張問題。
本教程採用 eKuiper Manager 進行規則的創建和管理,請參考 UI 教程。您也可以採用 REST API 或者在 eKuiper 運行的邊端運行命令行工具來完成相同的規則管理操作。
環境準備
開始動手操作之前,需要準備以下環境:
- MQTT 服務器用於數據傳輸。 本教程使用位於
tcp://broker.emqx.io:1883的 MQTT 服務器,broker.emqx.io是一個由 EMQX Cloud 提供的公共 MQTT 服務器。若本地運行 eKuiper,需要更改etc/mqtt_source.yaml,配置項 server 改為"tcp://broker.emqx.io:1883";若使用 docker 啓動,應設置環境變量 MQTT_SOURCEDEFAULTSERVER="tcp://broker.emqx.io:1883"。 - 為了方便觀察運行結果,我們需要安裝一個 MQTT 客户端,例如 MQTT X 。
模式註冊(Schema Registry)
相比於無模式的 JSON 格式,Protobuf 需要提前定義數據結構,即模式。在 proto 文件中,可以包含多個 message 以及其他實體的定義,但是在編解碼格式的配置中,只有 message 的定義可以被使用。 本教程中,我們使用以下模式進行數據結構的定義。該文件定義了一個名為 Book 的 message 結構,其中包含字符串類型的 title 和整型的 price。傳輸的數據將依據此結構對書籍數據進行二進制數據的編解碼。
message Book {
required string title = 1;
required int32 price = 2;
}
-
註冊模式。在管理控制枱中,打開配置->模式,點擊創建模式。
-
在模式創建窗口中,如下圖所示填寫。其中,模式類型選擇
protobuf;模式名稱可輸入自定義的不重複的名稱作為後續規則創建中模式的標識 id;模式內容可採用文件或者文本內容填寫。選擇 file 的情況下,需要填寫文件所在的 url;本教程使用的模式較為簡單,因此可選擇 content,然後在內容框中填入 proto 文件的文本。 -
點擊提交。在模式列表中應當能夠看到新創建的模式。後續可使用操作欄中的按鈕進行修改或刪除的操作。
至此,我們已經註冊了名為schema1的模式,其中定義了Book這種類型,在規則的 source 和 sink 中可以使用該註冊的模式。用户也可以繼續在此界面進行更多的模式註冊和管理工作。
讀取 Protobuf 數據
本節中,我們以 MQTT source 為例,介紹如何接入並解析基於 Protobuf 編碼傳輸的數據,使之可以在 eKuiper 中進行規則的計算。需要注意的是,在 Source 中,編碼格式與傳輸協議並不是綁定的。任何的 source 類型如 MQTT, httpPull 等都可以搭配不同的編碼格式,例如 ProtoBuf 和 JSON 等。
假設我們有一個 MQTT 主題 demo,出於節省傳輸帶寬的目的,裏面傳輸的數據為 Protobuf 編碼的二進制數據。接下來,我們將配置 eKuiper 數據源,接入這個主題的數據並進行處理。
- 創建數據流:在管理控制枱中,選擇源管理->流管理,點擊創建流。
-
配置數據流及其格式:流名稱可設置為自定義的不重複的名稱;數據源為要監聽的 MQTT 主題;流類型設置為 mqtt;流格式選擇
protobuf;模式名稱選擇上一步註冊的schema1;模式消息設置為 proto 文件裏定義的 messageBook。該配置表示數據流protoDemo將監聽 MQTT 主題protoDemo,收到二進制數據後將採用schema1中的Book的格式進行 protobuf 解碼。點擊提交,在流列表中應當列出新創建的流。 -
創建規則:選擇規則,點擊新建規則,進入規則創建界面。如下圖所示,右上角點擊進入文本模式,輸入自定義的規則ID,規則名字,在文本內容中輸入規則的 JSON 文本。該規則表示選擇流
protoDemo中的內容,發送到 MQTT 主題result/protobuf中。{ "id": "ruleDecode", "sql": "SELECT * FROM protoDemo", "actions": [{ "mqtt": { "server": "tcp://broker.emqx.io:1883", "topic": "result/protobuf", "sendSingle": true } }] } -
發送數據並查看結果:我們將使用 MQTTX 發送 Protobuf 編碼後的二進制數據到
protoDemo主題中,觀察收到的結果是否是解碼後的正確數據。- 打開 MQTT X,連接到雲端
tcp://broker.emqx.io:1883。 - 訂閲主題上文規則發送結果的主題
result/protobuf,便於觀察結果。 -
在消息發送窗格中,設置主題為
protoDemo,Payload 格式為Hex, 發送根據 schema1 中 Book 格式編碼的二進制數據,例如0a1073747265616d696e672073797374656d107b。 -
確保接收窗口收到正確的 JSON 數據,如下圖所示。
- 打開 MQTT X,連接到雲端
至此,我們完成了 Protobuf 數據的讀取和解碼並用簡單的規則進行處理輸出。用户像處理普通 JSON 格式數據一樣創建各種各樣的規則。若未得到預期結果,可在管理控制枱的規則列表頁面,查看規則狀態,確保規則數據入出的指標符合預期。
寫入 Protobuf 數據
本節中,我們將展示讀取 JSON 格式數據進行處理後採用 Protobuf 格式發送到雲端 MQTT broker 的用法。在物聯網邊雲協同的場景中,該用法可節省邊雲傳輸的帶寬開銷。部署在邊緣端的 eKuiper 接入本地的 MQTT broker 無需消耗帶寬,可通過處理較快的 JSON 格式接入。規則運算之後,計算結果需要發送到雲端 MQTT broker 時,可使用 Protobuf 編碼節省帶寬。
-
創建數據流:在管理控制枱中,選擇源管理->流管理,點擊創建流。如下圖所示,創建一個連入 demo 主題,JSON 格式數據的流。
-
創建規則,使用 Protobuf 格式發送到雲端。
- 點擊新建規則,輸入自定義的 Rule ID 和名稱,輸入 SQL
SELECT * FROM demo。 -
點擊動作右邊的新建按鈕,配置 MQTT 動作。其中,MQTT 服務器地址配置為雲端 broker 地址,MQTT 主題為
result/protobufOut;數據按條發送配置為 true,確保收到的為單條數據以匹配格式配置;流格式配置為protobuf,模式名稱為第一節註冊的schema1,模式消息為Book。該規則將讀取 JSON 數據,然後按照 Book 的格式編碼成二進制數據發往result/protobufOut主題。點擊提交,完成動作配置。 - 每個規則可以有多個動作,每個動作使用的編碼格式是獨立的。用户可以繼續配置其餘動作。全部配置完成後,點擊提交,完成規則的創建。
- 點擊新建規則,輸入自定義的 Rule ID 和名稱,輸入 SQL
-
發送數據並查看結果,該流程與上一節類似。本次我們將向 demo 主題發送 JSON 數據,並期望在訂閲的
result/protobufOut主題中查看到 protobuf 編碼的二進制數據。如下圖所示,注意數據格式的配置以免顯示亂碼。
總結
本教程介紹瞭如何在 eKuiper 中進行 Protobuf 數據的讀取和寫入。ProtoBuf 格式是 eKuiper 對外連接的格式的一種,各種格式之間可以任意組合,接入系統後使用的都是內部的格式表示。首先,用户需要先定義 Protobuf 的模式;之後在流的創建和動作的創建中可配置 Protobuf 格式,並選擇已定義的模式進行數據的編解碼。
版權聲明: 本文為 EMQ 原創,轉載請註明出處。
原文鏈接:https://www.emqx.com/zh/blog/using-ekuiper-to-process-protocol-buffers-data