title: "RocketMQ 5.0 API 與 SDK 的演進" date: "2022/10/12" author: "艾陽坤" img: "https://img.alicdn.com/imgextra/i2/O1CN01gk1XJw1g7YivksdVN_!!6000000004095-0-tps-685-383.jpg" tags: ["explore"] description: "。"
本文已同步收錄至「RocketMQ 中文社區」👉 面向 RocketMQ 中文開發者的一站式學習社區 RocketMQ 5.0 SDK 採用了全新的 API,使用 gRPC 作為通信層的實現,並在可觀測性上做了很大幅度的提升。
全新統一的 API
此處的 API 並不單單只是接口上的定義,同時也規定了各個接口不同的方法和行為,明確了整個消息模型。
RocketMQ 過去的 API 從第一版開始,至今已經過了很長時間,長期依賴是一個缺乏變革的狀態,對於一些中途打算廢棄或者變更的 API 也沒有進行後續的迭代。此外,接口的定義也不夠清晰。因此,RocketMQ 希望在 5.0 中能夠建立一個統一的規範,精簡整個 API,通過引入 builder 模式來引入更多的不變性,同時做好異常管理,給開發者和用户一個更加清爽的面貌。
目前 C++ 和 Java 已經進行了 5.0 API 的定義與實現,更多語言的支持也陸續在路上了。我們也歡迎更多的開發者可以參與到社區的工作中來。這裏給出 5.0 客户端的倉庫鏈接:
https://github.com/apache/rocketmq-clients
除了在上述接口上做的一些修改之外, RocketMQ 5.0 還規定了四種新的不同的客户端類型,即 Producer/Push Consumer/Simple Consumer/Pull Consumer。
其中 Pull Consumer 還在開發中;Producer 主要還是做了接口裁剪,規範了異常管理。在功能上其實並沒有做一些顛覆性的改變。Push Consumer 也是比較類似的;Simple consumer 將更多的權利將下發給用户,是一種用户可以主動控制消息接收與處理過程的消費者,特別的,5.0 的 SDK 中,Push Consumer 和 Simple Consumer 都採用 RocketMQ 的 pop 機制進行實現,一些社區的同學可能已經熟悉了。
如果用户並不一定想控制或者關心整個消息的接收過程,只在乎消息的消費過程的話,這個時候 Push Consumer 可能是一個更好的選擇。
RocketMQ 5.0 定義了四種不同的消息類型。過去的開源版本中其實我們並沒有去突出消息類型這樣一個概念,後續出於維護及運維方面的需要以及模型定義的完備,才讓今天的 5.0 有了消息類型的這樣一個概念。
1、NORMAL:普通消息。 2、FIFO:滿足先入先出的語義。用户可以通過定義 message group 來控制消息間的消費順序。例如圖中的 fruit 這個 topic 下,可以定義不同的 message group,在 apple 這個 message group 下,會按照發送順序決定消息被消費的順序,並且不同的 message group 之間不會互相干擾。 3、TRANSACTIONAL:可以將一條或多條消息包成一個事務,最終用户可以根據自己的業務結果選擇提交或者回滾。 4、DELAY:用户可以自主地設置消息的定時時間,相比之前開源版本僅允許用户設置定時/延遲級別,5.0 的實現中還講允許用户設置更精確的時間戳。
以上四種消息是互斥的,我們會在 topic 的元數據去標識它的類型。實際在消息發送的時候如果如果出現了嘗試發送的消息類型與 topic 類型不匹配的情況,也會做一些限制。
實現
RocketMQ 5.0 在客户端的啓動過程中提前進行了更多的準備工作。比如用户提前設置要發送消息的 topic 時,Producer 會在啓動過程中嘗試獲取對應 topic 的路由。在過去的客户端實現中,在針對於某個 topic 第一次發送消息時,需要先獲取路由,這裏就會有一個類似冷啓動的過程。
提前獲取 Topic 的路由信息有兩點好處:
- 不阻塞後面的發送,讓消息的發送僅僅觸發發送這一個動作。
- 錯誤前置,比如用户要往一個不存在 Topic 發送消息時,因為路由的獲取參與到整個客户端的啓動過程,獲取路由不成功,那整個客户端啓動可能就會失敗,用户也是拿不到對應的 Producer 對象的。
類似的,Consumer 的啓動也會有這樣的一個過程。
除此之外,我們在客户端和服務端之間增加了一個 Telemetry 的部分,它會在客户端和服務端之間建立起了一個進行雙向數據通訊的通道,客户端和服務端會在這個過程中溝通配置,比如服務端可以實現對客户端配置的下發,更好地管理客户端。此外,Telemetry 也可以將本地配置主動上報給服務端,讓服務端也可以對客户端的設置有更好的瞭解。Telemetry 通道也會在客户端啓動時嘗試建立,如果這個通道沒有建立成功,也會影響客户端的啓動。
總的來説,客户端的啓動過程會盡可能將所有準備工作做好。同時在客户端和服務端之間建立 Telemetry 這樣一個通訊通道。
客户端內部存在一些週期性的任務,比如路由的定時更新以及客户端往服務端發送心跳等。對於上文中提到的 Telemetry 過程中,客户端的配置上報也是週期性的。
Producer 在 RocketMQ 5.0 中的具體工作流程
消息在發送時,會檢查是否已經獲取對應 topic 的路由信息。如果已經獲取,則嘗試在路由中選取隊列,隨即查看要發送的消息的類型是否與 topic 類型匹配,如果匹配,則進行消息發送。如果發送成功,則返回;否則,判斷當前重試次數是否超出用户設置的上限,如果超出,則返回失敗;否則輪轉到下一個隊列,然後對新的隊列進行重試直到消費次數超出上線。而如果啓動過程中沒有提前獲取路由,那麼消息發送時依然會先嚐試獲取路由,然後再進行下一步操作。
另外一點相對於老客户端較大的改變在於,客户端從底層 RPC 交互到上層的業務邏輯全部採用異步實現。Producer 依然會提供一個同步發送接口和異步發送接口,但同步的方法也是使用異步來實現,整個邏輯非常統一和清爽。
Push Consumer 分為兩部分,消息的接收和消費。
消息接收流程為:客户端需要不斷地從服務端拉取消息,並將消息緩存。Push Consumer 會將消息先緩存到客户端的本地,再進行消費,因此它會判斷客户端本地的 Cache 是否已滿,如果已滿,則隔一段時間再判斷,直到消息被客户端消費,Cache 尚有餘量時再進行消息拉取。為了避免出現一些內存問題,Cache 的大小也是被嚴格限制的。
消息消費過程分為兩個類型,順序類型和非順序類型。
其中非順序類型即併發消費。消費者會先從 Cache 中獲取消息,然後嘗試消費消息,消費後再將消息從 Cache 中移除。消息消費成功時,會嘗試將消息 ACK ,消費流程結束;如果消費失敗,則嘗試修改消息的可見時間,即決定下一次什麼時候可見。
順序消費指對於同一個 Group 的消息,最終消費時一定是前一條消息被消費過並且得到確認後,後面的消息才能夠繼續消費。而消費過程與非順序消費類似,首先嚐試從 Cache 中拉取消息,如果消費成功,則將消息 ACK。ACK 成功後,將其從 Cache 中移除。特別地,如果消費失敗,會 suspend 一段時間,然後繼續嘗試對消息進行消費。此時會判斷消費次數是否超限,如果超限,則會嘗試將消息放入死信隊列中。
相對於非順序消費,順序消費更復雜,因為其需要保證前一個消息消費成功後才能對後面的消息進行消費。順序消費的消費邏輯是基於 message group 隔離的。message group 會在發送時做哈希,從而保證 message group 的消息最終會落在一個隊列上,順序消費模式本質上保證隊列內部消費的順序。
此外,因為不同 message group 的順序消息最終可能會映射到同一個隊列上,這可能會導致不同的 message group 之間的消費形成阻塞,因此服務端未來會實現一個虛擬隊列,讓不同的 message group 映射到客户端的虛擬隊列,保證他們之間沒有任何阻塞,從而加速數據消息的消費過程。
對於 Simple Consumer,用户可以主動控制消息接收和確認的流程。比如用户收到消息後,可以根據業務決定是否過一段時間再消費該消息,或者不需要再收到該消息。消費成功後將消息 ACK 掉,如果失敗則主動修改可見時間,選擇該消息下一次什麼時候可見,即由用户自發地控制整個過程。
可觀測性
Shaded Logback
因為歷史原因,RocketMQ 的老客户端並不是面向 SLF4J 進行編程的,而是面向 logback 的。這麼做的目的其實是為了方便快捷地獲取日誌,不需要讓用户自己去手動配置。
RocketMQ 中專門有一個 logging 模塊是負責日誌部分的,像用户自己使用了 logback ,RocketMQ SDK 如果也直接去使用 logback,兩者就會產生各種各樣的衝突,這個 logging 模塊就是用來保證這一層隔離性的。
但是 logging 模塊本身的實現並不是很優雅,也帶來了一定的維護成本。因此我們採用了 shade logback 的方式來達到上文提到的隔離性。shaded logback 不僅能夠避免用户的 logback 與 RocketMQ 自己的 logback 衝突,還能保持較好的可維護性,將來要想在日誌上去做一些修改,也來得容易的多。
具體來説,用户的 logback 會採用 logback.xml 的配置文件,通過 shade logback, RocketMQ 5.0 的客户端會使用 rocketmq.logback.xml 的配置文件,因此在配置部分就已經完全隔離了,同時在 shade 的過程中,還對原生 logback 中使用到的一些環境變量和系統變量也進行了修改,這樣就保證了兩者的徹底隔離。
另外,使用 shadeed logback 之後,RocketMQ 5.0 客户端中的日誌部分就全都是面向 SLF4J 來進行編程的了,這樣一來,如果我們未來想讓用户自己去完全控制日誌的話,提供一個去除 logback 的 SDK 就可以了,非常方便。
Trace
5.0 的消息軌跡基於 OpenTelemetry 模型進行定義與實現,消息發送或接收消息的流程被定義為一個個獨立的 span ,這一套 span 規範參照了 OpenTelemetry 關於 Messaging 的定義。圖中這裏 Process P 表示 Producer ,Process C 表示 Consumer。消息的全生命週期,從發送到接收到消費,就可以具象化為這樣一個個的 span。
比如,針對 Push Consumer 而言,先會有一個 receive 的 span 來表示從服務端獲取消息的過程,收到消息後到會先等待消息被處理,這個也就是 await span 表示的過程,消息被處理則對應圖中的 process span,消息消費結束之後,向服務端反饋消息處理結果也會有專門的 span 進行描述。
我們通過 parent 和 link 來講所有的這些 span 關聯起來,這樣通過一條消息的任意一個 span,就可以獲得這條消息全生命週期的所有 span。
不僅如此,用户還將允許可以設置一個 span context 與自己的業務鏈路進行關聯,將 RocketMQ 5.0 的消息軌跡本身嵌入進自己的全鏈路可觀測系統中去。
Metrics
Tracing 相對來説成本是比較高的,因為一條消息從發送到接收,可能會有很多流程,這就伴隨着很多的 span,這就導致相對來説,tracing 數據的存儲查詢成本相對來説比較高。我們希望診斷整個 SDK 的健康狀況,同時又不希望收集太多的 tracing 信息提高成本,此時提供一份 metrics 數據就能比較好地滿足我們的需求。
在 SDK 的 metrics 中我們新增了諸多指標,包括不限於 Producer 中消息發送延時,Push Consumer 中消息的消費耗時和消息緩存量,可以幫助用户和運維者更快更好地發現異常。
5.0 中 SDK 的 metrics 也是基於 OpenTelemetry 進行實現的。以 Java程序為例,OpenTelemetry 對於 Java 的實現本身提供了一個 agent,agent 在運行時會打點採集 SDK 的一些 tracing/metrics 信息,並將它上報到對應的 metric collector 中,這種通過 agent 來降低無侵入式數據採集的方式被稱之為 automatic instrumentation,而手動在代碼中實現打點採集的方式則被稱之 manual instrumentation。對於 metrics 我們目前還是採用 manual instrumentation 的方式來進行數據的採集和上報的。服務端會告知客户端對應的 collector 的地址,然後客户端將 Metrics 數據上傳到對應的 collector 當中去。
作者介紹:艾陽坤,Apache RocketMQ 5.0 Java SDK 作者,CNCF Envoy Contributor,CNCF OpenTelemetry Contributor,阿里雲智能高級開發工程師。