博客 / 詳情

返回

聊聊 RocketMQ 主從複製

RocketMQ 主從複製是 RocketMQ 高可用機制之一,數據可以從主節點複製到一個或多個從節點。

這篇文章,我們聊聊 RocketMQ 的主從複製,希望大家讀完之後,能夠理解主從複製的精髓。

1 同步與異步

在 RocketMQ 的集羣模式中,Broker 分為 Master 與 Slave,一個 Master 可以對應多個 Slave,但是一個 Slave 只能對應一個 Master。

每個 Broker 與 Name Server 集羣中的所有節點建立長連接,定時註冊 Topic 信息到所有 Name Server。

Master 節點負責接收客户端的寫入請求,並將消息持久化到磁盤上。而 Slave 節點則負責從 Master 節點複製消息數據,並保持與 Master 節點的同步。

1、同步複製

每個 Master 配置一個 Slave ,有多對 Master-Slave ,HA 採用同步雙寫方式,即只有主備都寫成功,才嚮應用返回成功。

這種模式的優缺點如下:

  • 優點:數據與服務都無單點故障,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高;
  • 缺點:性能比異步複製模式略低(大約低10%左右),發送單個消息的 RT 會略高,且目前版本在主節點宕機後,備機不能自動切換為主機。

2、異步複製

每個 Master 配置一個 Slave ,有多對 Master-Slave ,HA 採用異步複製方式,主備有短暫消息延遲(毫秒級),這種模式的優缺點如下:

  • 優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,同時Master宕機後,消費者仍然可以從Slave消費,而且此過程對應用透明,不需要人工干預,性能同多 Master 模式幾乎一樣;
  • 缺點:Master 宕機,磁盤損壞情況下會丟失少量消息 。

複製流程分為兩個部分:元數據複製消息數據複製

  • 主從服務器同步主題,消費者進度,延遲消費進度,消費者配置數據
  • 主從服務器同步消息數據

2 元數據複製

Slave Broker 定時任務每隔 10 秒會同步元數據,包括主題消費進度延遲消費進度消費者配置

同步主題時, Slave Broker 向 Master Broker 發送 RPC 請求,返回數據後,首先加入本地緩存裏,然後持久化到本地。

3 消息數據複製

下圖是 Master 和 Slave 消息數據同步的流程圖。

1、Master 啓動後監聽指定端口;

Master 啓動後創建 AcceptSocketService 服務 , 用來創建客户端到服務端的 TCP 鏈接。

RocketMQ 抽象了鏈接對象 HAConnection , HAConnection 會啓動兩個線程,分別用於讀服務和寫服務:

  • 讀服務:處理 Slave 發送的請求
  • 寫服務:用於向 Slave 傳輸數據

2、Slave 啓動後,嘗試連接 Master ,建立 TCP 連接;

HAClient 是客户端 Slave 的核心類 ,負責和 Master 創建連接和數據交互。

客户端在啓動後,首先嚐試連接 Master , 查詢當前消息存儲中最大的物理偏移量 ,並存儲在變量 currentReportedOffset 裏。

3、Slave 向 Master 彙報拉取消息偏移量;

上報進度的數據格式是一個 Long 類型的 Offset , 8個字節 , 非常簡潔 。

發送到 Socket 緩衝區後 , 修改最後一次的寫時間 lastWriteTimestamp 。

4、Master 解析請求偏移量,從消息文件中檢索該偏移量後的所有消息;

當 Slave 上報數據到 Master 時,觸發 SelectionKey.OP_READ 事件,Master 將請求交由 ReadSocketService 服務處理:

當 Slave Broker 傳遞了自身 commitlog 的 maxPhyOffset 時,Master 會馬上中斷 selector.select(1000) ,執行 processReadEvent 方法。

processReadEvent 方法的核心邏輯是設置 Slave 的當前進度 offset ,然後通知複製線程當前的複製進度。

寫服務 WriteSocketService 從消息文件中檢索該偏移量後的所有消息(傳輸批次數據大小限制),並將消息數據發送給 Slave。

5、Slave 接收到數據,將消息數據 append 到消息文件 commitlog 裏 。

首先 HAClient 類中調用 dispatchReadRequest 方法 , 解析出消息數據 ;

然後將消息數據 append 到本地的消息存儲。

4 同步的實現

從數據複製流程圖,我們發覺數據複製本身就是一個異步執行的,但是同步是如何實現的呢?

Master Broker 接收到寫入消息的請求後 ,調用 Commitlog 的 aysncPutMessage 方法寫入消息。

這段代碼中,當 commitLog 執行完 appendMessage 後, 需要執行刷盤任務同步複製兩個任務。

但這兩個任務並不是同步執行,而是異步的方式,使用了 CompletableFuture 這個異步神器

當 HAConnection 讀服務接收到 Slave 的進度反饋,發現消息數據複製成功,則喚醒 future 。

最後 Broker 組裝響應命令 ,並將響應命令返回給客户端。

5 總結

RocketMQ 主從複製的實現思路非常簡單,Slave 啓動一個線程,不斷從 Master 拉取 Commit Log 中的數據,然後在異步 build 出 Consume Queue 數據結構。

核心要點如下:

1、主從複製包含元數據複製和消息數據複製兩個部分;

2、元數據複製

​ Slave Broker 定時任務每隔 10 秒向 Master Broker 發送 RPC 請求,將元數據同步到緩存後,然後持久化到磁盤裏;

3、消息數據複製

  1. Master 啓動監聽指定端口
  2. Slave 啓動 HaClient 服務,和 Master 創建 TCP 鏈接
  3. Slave 向 Master 上報存儲進度
  4. Master 接收進度,消息文件中檢索該偏移量後的所有消息,並傳輸給 Slave
  5. Slave 接收到數據後,將消息數據 append 到本地的消息存儲。

4、同步的實現

​ 當 commitLog 執行完 appendMessage 後, 需要執行刷盤任務同步複製兩個任務,這裏用到了 CompletableFuture 這個異步神器。
​ 當 HAConnection 讀服務接收到 Slave 的進度反饋,發現消息數據複製成功,則喚醒 future 。最後 Broker 組裝響應命令 ,並將響應命令 返回給客户端 。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.