title: "RocketMQ 5.0 API 與 SDK 的演進" date: "2022/10/12" author: "艾陽坤" img: "https://img.alicdn.com/imgextra/i2/O1CN01gk1XJw1g7YivksdVN_!!6000000004095-0-tps-685-383.jpg" tags: ["explore"] description: "。"

本文已同步收錄至「RocketMQ 中文社區」👉 面向 RocketMQ 中文開發者的一站式學習社區 RocketMQ 5.0 SDK 採用了全新的 API,使用 gRPC 作為通信層的實現,並在可觀測性上做了很大幅度的提升。

全新統一的 API

此處的 API 並不單單只是接口上的定義,同時也規定了各個接口不同的方法和行為,明確了整個消息模型。

RocketMQ 5.0 API 與 SDK 的演進_客户端

RocketMQ 過去的 API 從第一版開始,至今已經過了很長時間,長期依賴是一個缺乏變革的狀態,對於一些中途打算廢棄或者變更的 API 也沒有進行後續的迭代。此外,接口的定義也不夠清晰。因此,RocketMQ 希望在 5.0 中能夠建立一個統一的規範,精簡整個 API,通過引入 builder 模式來引入更多的不變性,同時做好異常管理,給開發者和用户一個更加清爽的面貌。

目前 C++ 和 Java 已經進行了 5.0 API 的定義與實現,更多語言的支持也陸續在路上了。我們也歡迎更多的開發者可以參與到社區的工作中來。這裏給出 5.0 客户端的倉庫鏈接:

https://github.com/apache/rocketmq-clients

除了在上述接口上做的一些修改之外, RocketMQ 5.0 還規定了四種新的不同的客户端類型,即 Producer/Push Consumer/Simple Consumer/Pull Consumer。

其中 Pull Consumer 還在開發中;Producer 主要還是做了接口裁剪,規範了異常管理。在功能上其實並沒有做一些顛覆性的改變。Push Consumer 也是比較類似的;Simple consumer 將更多的權利將下發給用户,是一種用户可以主動控制消息接收與處理過程的消費者,特別的,5.0 的 SDK 中,Push Consumer 和 Simple Consumer 都採用 RocketMQ 的 pop 機制進行實現,一些社區的同學可能已經熟悉了。

如果用户並不一定想控制或者關心整個消息的接收過程,只在乎消息的消費過程的話,這個時候 Push Consumer 可能是一個更好的選擇。

RocketMQ 5.0 定義了四種不同的消息類型。過去的開源版本中其實我們並沒有去突出消息類型這樣一個概念,後續出於維護及運維方面的需要以及模型定義的完備,才讓今天的 5.0 有了消息類型的這樣一個概念。

RocketMQ 5.0 API 與 SDK 的演進_API_02

1、NORMAL:普通消息。 2、FIFO:滿足先入先出的語義。用户可以通過定義 message group 來控制消息間的消費順序。例如圖中的 fruit 這個 topic 下,可以定義不同的 message group,在 apple 這個 message group 下,會按照發送順序決定消息被消費的順序,並且不同的 message group 之間不會互相干擾。 3、TRANSACTIONAL:可以將一條或多條消息包成一個事務,最終用户可以根據自己的業務結果選擇提交或者回滾。 4、DELAY:用户可以自主地設置消息的定時時間,相比之前開源版本僅允許用户設置定時/延遲級別,5.0 的實現中還講允許用户設置更精確的時間戳。

以上四種消息是互斥的,我們會在 topic 的元數據去標識它的類型。實際在消息發送的時候如果如果出現了嘗試發送的消息類型與 topic 類型不匹配的情況,也會做一些限制。

實現

RocketMQ 5.0 在客户端的啓動過程中提前進行了更多的準備工作。比如用户提前設置要發送消息的 topic 時,Producer 會在啓動過程中嘗試獲取對應 topic 的路由。在過去的客户端實現中,在針對於某個 topic 第一次發送消息時,需要先獲取路由,這裏就會有一個類似冷啓動的過程。

RocketMQ 5.0 API 與 SDK 的演進_客户端_03

提前獲取 Topic 的路由信息有兩點好處:

  1. 不阻塞後面的發送,讓消息的發送僅僅觸發發送這一個動作。
  2. 錯誤前置,比如用户要往一個不存在 Topic 發送消息時,因為路由的獲取參與到整個客户端的啓動過程,獲取路由不成功,那整個客户端啓動可能就會失敗,用户也是拿不到對應的 Producer 對象的。

類似的,Consumer 的啓動也會有這樣的一個過程。

除此之外,我們在客户端和服務端之間增加了一個 Telemetry 的部分,它會在客户端和服務端之間建立起了一個進行雙向數據通訊的通道,客户端和服務端會在這個過程中溝通配置,比如服務端可以實現對客户端配置的下發,更好地管理客户端。此外,Telemetry 也可以將本地配置主動上報給服務端,讓服務端也可以對客户端的設置有更好的瞭解。Telemetry 通道也會在客户端啓動時嘗試建立,如果這個通道沒有建立成功,也會影響客户端的啓動。

總的來説,客户端的啓動過程會盡可能將所有準備工作做好。同時在客户端和服務端之間建立 Telemetry 這樣一個通訊通道。

RocketMQ 5.0 API 與 SDK 的演進_客户端_04

客户端內部存在一些週期性的任務,比如路由的定時更新以及客户端往服務端發送心跳等。對於上文中提到的 Telemetry 過程中,客户端的配置上報也是週期性的。

RocketMQ 5.0 API 與 SDK 的演進_服務端_05

Producer 在 RocketMQ 5.0 中的具體工作流程

消息在發送時,會檢查是否已經獲取對應 topic 的路由信息。如果已經獲取,則嘗試在路由中選取隊列,隨即查看要發送的消息的類型是否與 topic 類型匹配,如果匹配,則進行消息發送。如果發送成功,則返回;否則,判斷當前重試次數是否超出用户設置的上限,如果超出,則返回失敗;否則輪轉到下一個隊列,然後對新的隊列進行重試直到消費次數超出上線。而如果啓動過程中沒有提前獲取路由,那麼消息發送時依然會先嚐試獲取路由,然後再進行下一步操作。

另外一點相對於老客户端較大的改變在於,客户端從底層 RPC 交互到上層的業務邏輯全部採用異步實現。Producer 依然會提供一個同步發送接口和異步發送接口,但同步的方法也是使用異步來實現,整個邏輯非常統一和清爽。

RocketMQ 5.0 API 與 SDK 的演進_API_06

Push Consumer 分為兩部分,消息的接收和消費。

消息接收流程為:客户端需要不斷地從服務端拉取消息,並將消息緩存。Push Consumer 會將消息先緩存到客户端的本地,再進行消費,因此它會判斷客户端本地的 Cache 是否已滿,如果已滿,則隔一段時間再判斷,直到消息被客户端消費,Cache 尚有餘量時再進行消息拉取。為了避免出現一些內存問題,Cache 的大小也是被嚴格限制的。

RocketMQ 5.0 API 與 SDK 的演進_客户端_07

消息消費過程分為兩個類型,順序類型和非順序類型。

其中非順序類型即併發消費。消費者會先從 Cache 中獲取消息,然後嘗試消費消息,消費後再將消息從 Cache 中移除。消息消費成功時,會嘗試將消息 ACK ,消費流程結束;如果消費失敗,則嘗試修改消息的可見時間,即決定下一次什麼時候可見。

順序消費指對於同一個 Group 的消息,最終消費時一定是前一條消息被消費過並且得到確認後,後面的消息才能夠繼續消費。而消費過程與非順序消費類似,首先嚐試從 Cache 中拉取消息,如果消費成功,則將消息 ACK。ACK 成功後,將其從 Cache 中移除。特別地,如果消費失敗,會 suspend 一段時間,然後繼續嘗試對消息進行消費。此時會判斷消費次數是否超限,如果超限,則會嘗試將消息放入死信隊列中。

相對於非順序消費,順序消費更復雜,因為其需要保證前一個消息消費成功後才能對後面的消息進行消費。順序消費的消費邏輯是基於 message group 隔離的。message group 會在發送時做哈希,從而保證 message group 的消息最終會落在一個隊列上,順序消費模式本質上保證隊列內部消費的順序。

此外,因為不同 message group 的順序消息最終可能會映射到同一個隊列上,這可能會導致不同的 message group 之間的消費形成阻塞,因此服務端未來會實現一個虛擬隊列,讓不同的 message group 映射到客户端的虛擬隊列,保證他們之間沒有任何阻塞,從而加速數據消息的消費過程。

RocketMQ 5.0 API 與 SDK 的演進_API_08

對於 Simple Consumer,用户可以主動控制消息接收和確認的流程。比如用户收到消息後,可以根據業務決定是否過一段時間再消費該消息,或者不需要再收到該消息。消費成功後將消息 ACK 掉,如果失敗則主動修改可見時間,選擇該消息下一次什麼時候可見,即由用户自發地控制整個過程。

可觀測性

Shaded Logback

RocketMQ 5.0 API 與 SDK 的演進_服務端_09

因為歷史原因,RocketMQ 的老客户端並不是面向 SLF4J 進行編程的,而是面向 logback 的。這麼做的目的其實是為了方便快捷地獲取日誌,不需要讓用户自己去手動配置。

RocketMQ 中專門有一個 logging 模塊是負責日誌部分的,像用户自己使用了 logback ,RocketMQ SDK 如果也直接去使用 logback,兩者就會產生各種各樣的衝突,這個 logging 模塊就是用來保證這一層隔離性的。

但是 logging 模塊本身的實現並不是很優雅,也帶來了一定的維護成本。因此我們採用了 shade logback 的方式來達到上文提到的隔離性。shaded logback 不僅能夠避免用户的 logback 與 RocketMQ 自己的 logback 衝突,還能保持較好的可維護性,將來要想在日誌上去做一些修改,也來得容易的多。

具體來説,用户的 logback 會採用 logback.xml 的配置文件,通過 shade logback, RocketMQ 5.0 的客户端會使用 rocketmq.logback.xml 的配置文件,因此在配置部分就已經完全隔離了,同時在 shade 的過程中,還對原生 logback 中使用到的一些環境變量和系統變量也進行了修改,這樣就保證了兩者的徹底隔離。

另外,使用 shadeed logback 之後,RocketMQ 5.0 客户端中的日誌部分就全都是面向 SLF4J 來進行編程的了,這樣一來,如果我們未來想讓用户自己去完全控制日誌的話,提供一個去除 logback 的 SDK 就可以了,非常方便。

Trace

5.0 的消息軌跡基於 OpenTelemetry 模型進行定義與實現,消息發送或接收消息的流程被定義為一個個獨立的 span ,這一套 span 規範參照了 OpenTelemetry 關於 Messaging 的定義。圖中這裏 Process P 表示 Producer ,Process C 表示 Consumer。消息的全生命週期,從發送到接收到消費,就可以具象化為這樣一個個的 span。

RocketMQ 5.0 API 與 SDK 的演進_服務端_10

比如,針對 Push Consumer 而言,先會有一個 receive 的 span 來表示從服務端獲取消息的過程,收到消息後到會先等待消息被處理,這個也就是 await span 表示的過程,消息被處理則對應圖中的 process span,消息消費結束之後,向服務端反饋消息處理結果也會有專門的 span 進行描述。

我們通過 parent 和 link 來講所有的這些 span 關聯起來,這樣通過一條消息的任意一個 span,就可以獲得這條消息全生命週期的所有 span。

不僅如此,用户還將允許可以設置一個 span context 與自己的業務鏈路進行關聯,將 RocketMQ 5.0 的消息軌跡本身嵌入進自己的全鏈路可觀測系統中去。

Metrics

Tracing 相對來説成本是比較高的,因為一條消息從發送到接收,可能會有很多流程,這就伴隨着很多的 span,這就導致相對來説,tracing 數據的存儲查詢成本相對來説比較高。我們希望診斷整個 SDK 的健康狀況,同時又不希望收集太多的 tracing 信息提高成本,此時提供一份 metrics 數據就能比較好地滿足我們的需求。

RocketMQ 5.0 API 與 SDK 的演進_客户端_11

在 SDK 的 metrics 中我們新增了諸多指標,包括不限於 Producer 中消息發送延時,Push Consumer 中消息的消費耗時和消息緩存量,可以幫助用户和運維者更快更好地發現異常。

5.0 中 SDK 的 metrics 也是基於 OpenTelemetry 進行實現的。以 Java程序為例,OpenTelemetry 對於 Java 的實現本身提供了一個 agent,agent 在運行時會打點採集 SDK 的一些 tracing/metrics 信息,並將它上報到對應的 metric collector 中,這種通過 agent 來降低無侵入式數據採集的方式被稱之為 automatic instrumentation,而手動在代碼中實現打點採集的方式則被稱之 manual instrumentation。對於 metrics 我們目前還是採用 manual instrumentation 的方式來進行數據的採集和上報的。服務端會告知客户端對應的 collector 的地址,然後客户端將 Metrics 數據上傳到對應的 collector 當中去。

作者介紹:艾陽坤,Apache RocketMQ 5.0 Java SDK 作者,CNCF Envoy Contributor,CNCF OpenTelemetry Contributor,阿里雲智能高級開發工程師。

RocketMQ 5.0 API 與 SDK 的演進_API_12