一、Pulsar存儲架構簡析
Pulsar作為新一代MQ中間件,在底層架構設計上充分貫徹了存算分離的思想,broker與Bookeeper兩個組件獨立部署,前者負責流量的調度、聚合、計算,後者負責數據的存儲,這也契合了雲原生下k8s大行其道的時代背景。Bookeeper又名Bookie ,是一個單獨的存儲引擎。在組件關係上,broker深度依賴Bookie,內部集成了 Bookie的client端,broker和Bookie之間基於TCP通信,使用protobuf。
Pulsar整體架構消息流從client端發送到broker,經過broker的計算、轉化、路由後再次被分發到具體的Bookie節點,一條消息被存儲幾份是可配置的。數據的高可用由broker來保障而非Bookie,Bookie只是一個簡單的單機存儲引擎。一般而言數據多副本有兩種主要的分發方式:一種是基於主從模式,主節點在收到數據寫入後,將數據二次分發到從節點,從節點的數據流源頭只有主節點,可以存在多個從節點,這種架構典型實現有rocketMQ ,MySQL等;另一種方式是並行多份寫入多份相同的數據,在接收到SDK側數據後進行多路分發。兩種方式各有優劣,前者實現簡單,但是延遲較高,在開啓同步複製(異步複製可能丟數據)的情況下延遲為: master寫入延遲+slave寫入延遲;後者實現複雜,需要處理單節點分發失敗補償的問題,但是延遲較低,實際的寫入延遲為Max(shard1寫入延遲,shard2寫入延遲,.....)。Pulsar的數據分發模式為後者。
Pulsar數據流架構一個topic在時間序列上被分為多個Ledger,使用LedgerId標識,在一個物理集羣中,LedgerId不會重複,採用全局分配模式,對於單個topic(分區topic)而言同一時刻只會有一個Ledger在寫入,關閉的Ledger不可以寫入,以topicA-partition1的 Ledgers[ledger1, ledger3, ledger7, ...., ledgerN]為例,可寫入的Ledger只有N,小於N的Ledger均不可寫入,單個Ledger默認可以存儲5W條消息,當broker以 (3,2,2)模式寫入數據時,具體架構如下圖所示。3,2,2 可以解釋為當前topic可以寫入的節點有3個,每次數據寫入2份,並且收到2個數據寫入成功的ACK後才會返回響應client端。
Ledger分段機制
二、Bookie的架構設計
對Pulsar的架構有了大致的瞭解後,我們重點剖析下Bookie這個核心的存儲引擎。消息系統為了追求最大寫入吞吐,一般都採用順序寫的方式來壓榨磁盤的IO性能。Bookie也是一樣,默認情況下Bookie的數據會寫入journal日誌文件,這個日誌類似於MySQL中的binlog文件或者rocketMQ中的commitlog文件,採用亂序追加寫的方式,存在多個topic的數據寫入同一個文件的情況。
為了更好的IO隔離,官方建議journal單獨掛一塊盤。為了充分發揮磁盤IO性能,journal目錄可以有多個,即同時存在多個並行寫入的journal日誌,每個journal日誌會綁定一個寫入線程,寫入請求提交後會被歸一化到某個具體線程,實現無鎖化,單個消息寫入是按照LedgerId對目錄數量取模,決定當前數據落到哪個journal目錄。journal日誌落盤策略是可配置的,當配置同步落盤時,數據實時落盤後才會返回寫入成功。journal日誌數據寫入後會確認返回寫入成功,而entrylog的數據是否落盤並不影響請求的立即返回。journal和entrylog均可以配置為異步刷盤,這種情況下落盤的時序上並沒有先後之分。
Bookie數據存儲架構Journal日誌的主要作用是保證數據不丟失,同時提供足夠快的性能,因此採用了混合落盤的模式。實際業務消費時,針對單個topic的數據在時間序列上是順序消費,如果實際的數據從journal文件中讀取則會出現大量的隨機IO,性能較差。Bookie通過將數據進行二次轉寫的方式實現數據的局部有序從而提升讀取性能,默認情況下一份數據在磁盤上會存兩份:一份在journal日誌中,一份在entry日誌中。entry日誌中的數據具備局部有序的特性,在一批數據刷盤時,會針對這批數據按照LedgerId,entryId進行排序後落盤。這樣消費側在消費數據時能夠實現一定程度上的順序IO,以提升性能。
entryIndex的作用是保存(LedgerId+entryId)到offset的映射關係,這裏的offset是指entry data文件中的offset。
這樣的一組映射關係很容易想到其在內存中的組織形式,一個map。實際的存儲 Pulsar選擇rocksDB來存儲這樣的KV關係,但Bookie本身也有自己的KV 存儲實現;
通過對Bookie架構的上分析,我們發現針對讀寫場景Bookie做了兩件事來支撐:
- 混合Ledger順序寫的journal日誌支撐高吞吐低延遲的寫入場景;
-
局部有序的entry data 支撐消費場景下的Ledger級別的順序讀。
三、Bookie的數據寫入流程
對於Bookie的寫入流程大致如下圖所示。Bookie收到數據後會同時寫入journal日誌和memtable,memtable是一個內存buffer。memtable再次分發到entry logger以及entry index,數據在journal中append完後會立即返回寫入成功。entry data和entry index的構建可以理解都是異步操作。
Bookie數據寫入流程
client端源碼分析
Pulsar中broker組件 使用low level API與Bookie進行通信。下文結合具體代碼進行分析。
ClientConfiguration conf = new ClientConfiguration();
conf.setThrottleValue(bkthrottle);
conf.setMetadataServiceUri("zk://" + zkservers + "/ledgers");
BookKeeper bkc = new BookKeeper(conf);
final LedgerHandle ledger = bkc.createLedger(3, 2, 2, DigestType.CRC32, new byte[]{'a', 'b'});
final long entryId = ledger.addEntry("ABC".getBytes(UTF_8));
使用low level api時,藉助於LedgerHandle添加entry對象。在Pulsar中entryId為一個遞增的序列,在broker中Bookie的源碼調用順序如下所示,其中LedgerHandle,OpAddEntry,LedgerHandle class對象為Bookeeper模塊提供。
- ManagedLedgerImpl#asyncAddEntry()方法(參數省略,下同)
- ManagedLedgerImpl#internalAsyncAddEntry()方法
- LedgerHandle#asyncAddEntry()方法
- OpAddEntry#initiate()方法
- LedgerHandle#doAsyncAddEntry()方法
- BookieClient#addEntry()方法
LedgerHandle#doAsyncAddEntry方法在doAsyncAddEntry中的729行,發現entryId其實是由lastAddPushed遞增得到,並且這段代碼也被加上了重量級鎖。PendingAddOp對象構建完成後會進入一個pendingAddOps隊列,該隊列與當前Ledger綁定。
PendingAddOp#initiate方法這裏的PendingAddOp對象代表着一個寫數據的請求,在initiate進一步加鎖,結合寫入節點的數量分別向不同的Bookie存儲節點發送寫請求,sendWriteRequest方法內容比較簡單,直接調用addEntry方法即可。
PendingAddOp#sendWriteRequest
BookieClient#addEntryaddEntry方法的實現依然有很多方法包裝的細節,但最終通過網絡調用server端的相關接口,這裏篇幅有限,不過度展開。
server端源碼分析
請求路由組件:BookieRequestProcessor
直接跳轉bookeeper的server端的核心處理方法上,BookieRequestHandler為server端的處理類,其繼承了Netty的ChannelInboundHandlerAdapter,是最外層與netty組合工作的handler。
BookieRequestHandler在channelRead方法中觸發了requestProcessor的處理邏輯,這裏的processor實際為BookieRequestProcessor,具體的相關代碼在BookieServer類的構造函數中,BookieServer是整個bookeeper server端的啓動類。
BookieRequestProcessor#processRequest方法為數據流的核心指令分發器。
BookieRequestProcessor#processRequest這裏圍繞processAddRequestV3方法展開分析;Bookie中有個很有意思的設定,將請求處理線程池分為普通線程池和高優線程池;兩者執行邏輯相同。在下圖的452行將寫操作請求放入了線程池,需要説明的是這個線程池是經過改良的,多了一個 orderingKey參數,在內部會將根據該參數進行hash運算,映射具體的線程上,其內部由多個單線程的線程池組成。這樣做的好處是可以大幅度減少投遞任務時的隊列頭部競爭,相比傳統線程池有一定的性能優勢。
processAddRequestV3
核心線程池任務:WriteEntryProcessorV3顯然,核心的處理邏輯在write.run方法內,繼續開扒。run方法中核心邏輯封裝在 getAddResponse()。
WriteEntryProcessorV3#rungetAddResponse方法內會對當前請求的標記,判斷後分別調用recoveryAddEntry 和addEntry這兩個方法。前者的使用場景顧名思義是在異常恢復流程中被觸發,一般是節點啓動,宕機後重啓等過程中恢復數據。addEntry方法位於Bookie內,Bookie是個接口,只有一個實現類BookieImpl。
WriteEntryProcessorV3#getAddResponse
存儲引擎接口抽象:Bookie
繼續來看BookieImpl#addEntry方法,在1067這一行加上了synchronized鎖,鎖的對象為handle,具體為LedgerDescriptor類型,這表示在單個Ledger內部的數據在寫入時通過加鎖的方式實現串行化寫入。
1073行的addEntryInternal 方法內部是核心的寫入邏輯。
BookieImpl#addEntry
Ledger的管理者:LedgerDescriptor
getLedgerForEntry方法基於傳入的參數LedgerId查找到對應的LedgerDescriptor,該類是一個抽象類,有兩個實現類,分別是 LedgerDescriptorImpl和LedgerDescriptorReadOnlyImpl,顧名思義,二者分別提供讀寫功能。
BookieImpl#getLedgerForEntry
LedgerDescriptor的兩個實現類
handles是HandleFactory類型接口,從其定義的接口來看主要作用就是實現LedgerDescriptor的讀寫分離,且只有一個實現HandleFactoryImpl,在HandleFactoryImpl 中保存了2個Map類型的MAP。分別服務於兩個接口的調用,getHandle方法就是從map中獲取可以寫入的LedgerDescriptor。
HandleFactory
事實上LedgerDescriptorReadOnlyImpl的實現很簡單,繼承了LedgerDescriptorImpl後將該類涉及到寫入的方法全部重寫為拋出異常!
LedgerDescriptorReadOnlyImpl
獲取到對應的LedgerDescriptor後,就需要進行寫入操作,下面分析BookieImpl#addEntryInternal方法。
從邏輯上來講,entry先是被寫入Ledger storage(930行),其次才被寫入journal 日誌,同時journal日誌的寫入是可選的,默認情況下開啓;journal 關閉後將不存在數據落盤的邏輯,這意味着將無法依靠journal日誌進行數據恢復。但考慮到消息寫入時一般是多份,不考慮寫入的多個節點同時宕機的情況,數據某種程度上依然是可靠的。
BookieImpl#addEntryInternal
Ledger級的接口抽象:LedgerStorageLedgerDescriptorImpl中持有一個ledgerStorage類型,該組件負責最終的entry對象寫入,存在多個實現類,分別是:DbLedgerStorage,SingleDirectoryDbLedgerStorage,InterleavedLedgerStorage,SortedLedgerStorage。
LedgerDescriptorImpl
LedgerStorage實現類
Bookie默認使用SortedLedgerStorage,但Pulsar 中使用 DbLedgerStorage 進行管理。
實際可配置的實現只有三個選項,下面依次對每個實現類進行分析。
ServerConfiguration
1.DbLedgerStorage->SingleDirectoryDbLedgerStorage
writeCache寫入
DbLedgerStorage主要特點是使用了rocksDB保存[ledgerId+entryId --> location]的映射關係;內部又存在了一層套娃。addEntry方法中先獲取到LedgerId, 再根據LedgerId獲取ledgerStorage,也就是説LedgerId和實際的LedgerStorage 存在映射關係;DbLedgerStorage內部又繼續封裝SingleDirectoryDbLedgerStorage 類來支撐數據寫入,具體是一個ListledgerStrageList;字段。經過hash後獲得真實的
SingleDirectoryDbLedgerStorage對象進行實際的addEntry操作;下文首先對該實現進行分析。
DbLedgerStorage#addEntry
DbLedgerStorage#getLedgerStorage
DbLedgerStorage的成員變量
在SingleDirectoryDbLedgerStorage的源碼中,待寫入的entry僅僅是被放入writeCache 中,put成功後更新LAC並通知相關監聽者,同時觸發寫入成功事件,貌似沒有任何寫盤的操作出現!!!進一步分析497行,如果put失敗會觸發flush操作並嘗試再次addEntry,這裏的flush有點眼熟,有必要展開分析一波。
不難發現這裏的寫入操作和刷盤操作其實是線程隔離的,默認情況下,類比於RMQ,大部分存儲組件的刷盤操作和實際寫入動作切分為兩個線程在執行,刷盤線程會不斷地巡檢是否需要刷盤,主要基於當前未刷盤的數據量以及距離上次刷盤的時間間隔,如果開啓同步刷盤,一般寫入線程會被掛起在req請求上,當刷盤進度已經cover寫入請求的offset時,被掛起的請求上的線程會被喚醒繼續執行,這是一種非常典型的存儲引擎設計模式。這裏writeCache就是個buffer,既可以充當寫入緩衝也可以充當讀取緩衝,在tail read場景下會有非常好的性能收益。
SingleDirectoryDbLedgerStorage#addEntry
writeCache背後的flush
triggerFlushAndAddEntry的邏輯並不複雜,在超時時間到來之前會不斷的檢查當前的刷盤標記位,如果沒有正在刷盤以及刷盤邏輯沒有被觸發,會嘗試刷盤,同時嘗試繼續向writeCache中put數據,因為刷盤成功後會在cache中清理出一部分空間,用於put新的的數據,一旦put成功立即返回,跟外層的addEntry方法類似,只是多了個刷盤邏輯的處理。
SingleDirectoryDbLedgerStorage#triggerFlushAndAddEntry方法
flush方法其實是個空殼,核心邏輯在checkpoint()方法內,該方法的主要邏輯為:
交換writeCache,避免刷盤過程中數據無法寫入,導致寫入抖動;
對writeCache內的數據進行排序,實現局部有序;
分別調用entryLog的add方法和entryIndex的add方法;
調用entrylog的flush和entryIndex的flush進行刷盤。
SingleDirectoryDbLedgerStorage#flush
SingleDirectoryDbLedgerStorage#checkpoint
源碼中的writeCacheBeingFlushed實際上和writeCache一體兩面,上一次刷盤結束後writeCacheBeingFlushed會被clear,再次刷盤時會交換兩者;保證寫入的穩定性;如果實際查詢數據時要利用這部分cache,需要查詢兩次,先查writeCache如果不存在 ,再查writeCacheBeingFlushed。
writeCacheBeingFlushed的註釋
entryLocationIndex.addLocation(batch, ledgerId, entryId, location);方法底層依賴rocksDB 建立了[ledgerId, entryId]-->location的映射關係,Batch在這裏代表着 一個 RocksDBBatch,location可以理解為實際磁盤文件上的offset。rocksDB引擎超出本文範疇,這裏不做分析。
EntryLogger
entryLogger代表着存儲實際數據的組件抽象,調用addEntry(ledgerId, entry)方法完成數據寫入。進一步對addEntry方法展開分析,發現EntryLogger是一個接口,有2個直接實現類,分別是DefaultEntryLogger和DirectEntryLogger,默認使用DefaultEntryLogger參見源碼:
DbLedgerStorage#initialize -part1
DbLedgerStorage#initialize -part2
最終的調用來到了EntryLogManagerBase#addEntry方法,首先獲取到待寫入的數據,然後調用BufferedLogChannel#write將其寫入,可以看到實際的數據長度為:entry.readableBytes() + 4,4個字節用於記錄長度,先寫入長度值,再寫入entry的二進制數據;addEntry方法返回值為location,方法的最後一行表明location由2部分組成,分別是logId和pos,各暫用高位和低位的4個字節。很容易想到隨着時間的推移EntryLogger中的文件不止一個,因此需要一個logId來標識不同文件,具體到文件上又需要一個偏移量來定位具體的一條數據,4個字節的pos也表明了單個entryLog文件理論大小值不能超過4個G,實際默認值為1G。
EntryLogManagerBase#addEntry
進一步分析BufferedLogChannel#write方法,發現BufferedLogChannel 繼承置之BufferedChannel ,在BufferedChannel中有一個writeBuffer ,write方法只是將數據寫入到這個writeBuffer 中,至於是否刷盤則不一定。
只在滿足下列兩種情況時數據才會刷盤:
當前writeBuffer 已經寫滿,writeBuffer默認值64KB;
ServerConfiguration中配置了FLUSH_ENTRYLOG_INTERVAL_BYTES參數,且值大於0,默認值為0。
BufferedLogChannel#write
flush方法內容很簡單,調用底層的fileChannel將writeBuffer中的數據寫入底層的文件系統,但是flush並不保證一定落盤,而是最後一行代碼forceWrite方法保證。forceWrite 會調用fileChannel.force(forceMetadata)將數據同步到磁盤上。
BufferedChannel#flush
為了保證數據的務必落盤,在SingleDirectoryDbLedgerStorage#checkpoint方法中,addEntry方法之後,又單獨調用了entryLogger.flush();和ledgerIndex.flush();對entryLogger.flush()進一步拆分,發現底層調用了EntryLogManagerBase#flush方法,二者兩個方法在base類中是abstract類型,具體實現又落到了 EntryLogManagerForSingleEntryLog中,最終任務還是落在了 BufferedChannel#flushAndForceWrite上。
BufferedChannel#flushAndForceWrite
2.SortedLedgerStorage->InterleavedLedgerStorage
在SortedLedgerStorage類中,持有了InterleavedLedgerStorage類型,大部分的接口實現都委託給了InterleavedLedgerStorage的相關方法調用,SortedLedgerStorage的最大特點是,每次數據寫入時都會進行排序,其內部使用了跳錶。
EntryMemTable寫入
addEntry方法的邏輯非常簡單,將數據add到memtTable後,更新下LAC即結束;
SortedLedgerStorage#addEntry
繼續研究EntryMemTable的addEntry方法之前先了解下EntryMemTable的結構,這個組件是一個純內存的數據保存結構,kvmap和snapshot 負責實際數據保存,二者類型皆是 EntrySkipList ,這個類簡單封裝了ConcurrentSkipListMap,實際使用時KV值相同,因為需要保證有序,所以重寫了排序規則,主要比較LedgerId和entryId。
kvmap和snapshot工作機制是,當寫滿kvmap時,會將數據交換給snapshot,kvmap重新構建一個新的指定大小的結構,後台線程負責將snpshot中的數據刷盤保存,因此只要後台刷盤的速度不是特別垮,可以提供持續不間斷的寫入。單個kvmap有大小限制,默認64M大小,結合前面的swap機制,最多可以兜住128M的寫入緩存。
EntryMemTable
addEntry 寫入之前先獲取讀鎖,(沒錯,寫入用的是讀鎖!!!)然後將數據put進入kvmap結構中,internalAdd方法內容很簡單,就是一個對kvmap的putIfAbsent 調用,看到這裏可以理解為什麼用的是讀鎖了。因為這裏kvmap的併發安全控制根本不依賴這個讀寫鎖。
EntryMemTable#addEntry
EntryMemTable刷盤
讀寫鎖的主要作用是,在swap kvmap和snapshot瞬間加上寫鎖控制以及讀取數據時加上讀鎖控制。
ReentrantReadWriteLock使用場景
每次刷盤之前會先創建個一個snapshot快照,用以保證此快照之前的數據在此次的刷盤範圍內;創建snapshot時,會交換kvmap與snapshot兩個字段,因為快照的創建是刷盤行為觸發的,而刷盤動作一般都是有個單獨的線程在執行,所以這裏需要控制併發邏輯,保證在swap的瞬間,不能有addEntry操作,同樣的在刷盤結束後需要清理snapshot的數據,也加上了寫鎖來控制。
EntryMemTable#snapshot
會有一個後台的刷盤線程執行flush操作,首先會先將snapshot數據flush,然後嘗試創建新的snapshot,如果創建成功説明,仍然有可刷數據,再次執行flushSnapshot的動作。
EntryMemTable#flush
在flushSnapshot 方法中,會調用flusher的process 方法,這裏的flusher 其實就是 SortedLedgerStorage,在process方法內的實際調用了InterleavedLedgerStorage 的processEntry方法,這個方法並不能保證數據一定會落到磁盤文件上,因此EntryMemTable所謂的flush操作只是將其內存數據刷新到InterleavedLedgerStorage組件中。
EntryMemTable#flushSnapshot
SortedLedgerStorage#process
EntryLogger
繼續來看InterleavedLedgerStorage的處理邏輯,添加Entry後將對應的KV索引寫入 LedgerCache 緩存後返回。查看InterleavedLedgerStorage的entryLogger字段發現,與上文的
SingleDirectoryDbLedgerStorage相同,寫入entry依然用的是 DefaultEntryLogger。
InterleavedLedgerStorage#processEntry
InterleavedLedgerStorage#entryLogger
EntryIndex
上文提到默認情況下Pulsar使用DbLedgerStorage來存儲數據和索引信息,而索引信息默認情況下使用rocksDB來存儲,rocksDB作為頂級KV引擎其性能和穩定性毋庸置疑。但是在實際的使用過程中,某些時候會選擇LedgerStorage的另一個實現類:SortedLedgerStorage。SortedLedgerStorage的主要特點是是在每次寫入數據的時候都會進行內部排序,內部維護一個跳錶,同時其存儲leggerId+entryId到location的映射關係是使用Java的引擎實現。下面對這個Java實現的KV引擎做詳細分析。
ServerConfiguration關於ledgerStorageClass 的配置
仍然是先從entryLog的寫入作為突破口,SortedLedgerStorage內部套了一個InterleavedLedgerStorage對象,前者複用了後者的addEntry方法,核心方法在InterleavedLedgerStorage#processEntry中。
long pos = entryLogger.addEntry(ledgerId, entry, rollLog);方法添加完entry對象後返回對象在文件的offset,內部的add邏輯與上文分析的
SingleDirectoryDbLedgerStorage 一致。
InterleavedLedgerStorage#addEntry
InterleavedLedgerStorage#processEntry
LedgerCache是一個接口,具體的實現只有一個LedgerCacheImpl類,後者內部有兩個支撐組件,IndexInMemPageMgr和IndexPersistenceMgr,從名稱可以看出前者負責數據在內存中的保持,後者負責實際的存儲。按照之前的分析的源碼,很容易聯想到數據大概率先落入memoryPage再落盤,pageSize默認8K,entriesPerPage默認為pageSize/8= 1K。
LedgerCacheImpl
putEntryOffset方法首先通過entryId模以單個page頁的entry數量得到當前entryId在具體的page頁中的偏移量,這裏的page不是OS中的page頁,是Bookeeper單獨抽象出來的page概念,需要區分開。在getLedgerEntryPage方法中,首先會嘗試從內存中獲取LedgerEntryPage對象,如果沒有則調用grabLedgerEntryPage方法從磁盤上加載,內存中緩存的對象結構為InMemPageCollection,內部是一個LRU緩存。
寫入算法分析:
LedgerEntryPage是對單個頁的抽象;
int offsetInPage = entry % entriesPerPage;計算出當前的entryId在單個LedgerEntryPage的邏輯偏移量;
long pageEntry = entry - offsetInPage; 計算出當前LedgerEntryPage中初始entryId;
基於 LedgerId和初始entryId查找定位到LedgerEntryPage,如果緩存中不存在,則從文件中加載;
按照offsetInPage計算當前的offset需要寫入的真實位置,這裏的offset即是 entryLogger中entry location值;
由於寫入的數據為offset是個long類型,需要8個二進制為,實際的寫入的位點為邏輯上的offsetInPage*8。
IndexInMemPageMgr#putEntryOffset
上述的算法自然也是可逆的,讀取的時候同樣基於LedgerId和entryId定位到具體的LedgerEntryPage。然後在計算出實際的物理偏移量,在特定位置讀取到location 參數。
順序寫入的WAL日誌:Journal
分析完writeCache的寫入及其背後的邏輯,我們繼續分析journal日誌的寫入流程;上文提到journal為混合寫入模式,可能存在多個LedgerId的數據混編。在addEntryInternal方法的最後一行中通過LedgerId獲取到真實的journal,獲取的邏輯依然是個hash算法,用來保證相同LedgerId始終落到一個journal上進行處理。
logAddEntry幹了三件事:
entry.retain()調整entry的引用計數值;
journalStats給內部的queueSize +1;
memoryLimitController內存使用限速器,如果超限時,當前線程會被置為等待狀態;
queue.put(.......), 將待寫入的數據放進隊列。
結合logAddEntry源碼發現又是熟悉的味道,寫入方法只是將請求放入隊列,那麼必然存在從隊列獲取數據並進行刷盤的邏輯。既然有put操作,必然有take操作,我們發現takeAll和pollAll方法 ,都位於journal#run方法中,run方法這個名字如此敏感,以至於不跟Thread扯上點關係都説不過去。
Journal#logAddEntry
queue的調用點
public class Journal extends BookieCriticalThread implements CheckpointSource
查看 journal的class簽名 發現其不出所料的實現了Thread的run方法,journal既是順序寫入日誌邏輯的抽象也是後台的刷盤線程的抽象;run方法的實現較為複雜,其註釋表明這是一個專門負責持久化的線程方法,同時負責journal文件的滾動,當journal文件被寫滿時,會使用當前時間戳創建一個新的journal文件,老的文件會被定期回收。
在queue字段旁邊有一個forceWriteRequests字段,這個字段在實際的刷盤邏輯中起到了重要作用。
Journal部分成員變量釋義:
maxGroupWaitInNanos組提交間隔,一般超過這個時間需要刷盤;
flushWhenQueueEmpty開關表示當queue為空時是否刷盤;
bufferedEntriesThreshold表示暫存在toFlush中的對象數量的閾值;
bufferedWritesThreshold表示待刷盤的字節數閾值;
journalPageCacheFlushIntervalMSec真實刷盤的時間間隔。
Journal#run方法成員變量釋義:
localQueueEntries是一個複用的定長數組;
localQueueEntriesIdx是這個定長數組中當前處理的元素索引編號,從0開始;
localQueueEntriesLen代表每次從queue隊列中獲取的對象數量;
toFlush隊列是個可複用的ArrayList,可以認為是個對象池;
numEntriesToFlush是個待刷盤對象數量的計數器,與toFlush配合使用;
lastFlushPosition為上次刷盤位點記錄值;
lastFlushTimeMs為上次刷盤時間點(毫秒單位);
JournalChannel是單個journal文件的抽象,journal代表單個目錄下的多文件抽象;
BufferedChannel代表一個寫入緩衝區,來自於JournalChannel;
qe為QueueEntry類型的臨時變量。
Journal#run的主要邏輯如下:
啓動forceWriteThread線程,這是一個真正意義上的刷盤線程;
journal線程只是將queue中的QueueEntry對象寫入相關的FileChannel 的buffer中,並不保證一定落盤;
實際的刷盤行為由forceWriteThread負責。
不斷的從queue中獲取一組QueueEntry對象,並逐一將其寫入BufferedChannel緩衝區;
從queue中獲取的QE對象放入localQueueEntries數組中;
entry需要符合一定的條件才會被寫入二進制數據流(主要entryId和版本的識別);
寫入調用的是BufferedChannel#write方法,只是將數據寫入內部的writeBuffer中;
寫入緩衝區後,將QE對象添加進入toFlush隊列,同時調整numEntriesToFlush(+1);
繼續處理localQueueEntries中的下一個元素。當localQueueEntriesIdx == localQueueEntriesLen時,表示localQueueEntries元素全部處理完成,此時臨時變量qe(QueueEntry) 置為null。
在處理qe對象的過程中,會綜合多方面條件判斷是否需要刷盤,使用臨時變量 shouldFlush 表示;
當numEntriesToFlush>0且符合以下條件時會觸發“刷盤”邏輯;
當臨時變量qe為空 或者當前的qe 處理的時間超過 maxGroupWaitInNanos;
當臨時變量qe為空並且開啓flushWhenQueueEmpty配置時刷盤;
當臨時變量qe不為空,符合下面兩個條件時刷盤;
且toFlush 中暫存的對象數量超過bufferedEntriesThreshold;
或距離上次刷盤的位點間隔超過bufferedWritesThreshold。
如果滿足刷盤條件,調用 BufferedChannel#flush操作;
flush操作會將之前攢批的writeBuffer中的數據寫入OS的文件系統;
底層作為FileChannel#write方法的入參;
將toFlush相關索引為置空,同時調整numEntriesToFlush;
觸發entry寫入的相關回調邏輯執行;
更新lastFlushPosition。
flush操作完成後將進一步判斷是否需要向forceWriteThread提交真實的刷盤請求;
提交時會將toFlush列表中全部對象連同其他參數封裝成一個請求對象;
一旦提交後將更新lastFlushTimeMs;
符合提交條件的情況有:
開啓syncData ,journal級別的開啓同步刷盤的開關;
當前需要滾動創建新的journal文件;
距離上次真實刷盤時間超過閾值 journalPageCacheFlushIntervalMSec。
Journal#run方法
當真實刷盤請求被提交到forceWriteThread線程後,有必要進一步分析該線程的執行邏輯,相比之下ForceWriteThread#run方法的邏輯簡單很多,解包收到的請求,然後調用syncJournal進行強制刷盤,同時做一些清理回收的動作,以及最後的一些回調方法的觸發和統計操作。這裏的localRequests 也是一個可複用的臨時數組。
ForceWriteThread#run
Journal#syncJournal方法調用了request對象的flushFileToDisk方法,該方法內部調用了logFile.forceWrite(false); 。
logFile就是之前提到的單個journal文件的抽象,即JournalChannel,其內部封裝了BufferedChannel,實際的類型為DefaultEntryLogger,與EntryLogger所使用的底層實現如出一轍。
Journal#syncJournal
JournalChannel類
再論Bookie
上文提到BookieImpl中的addEntry邏輯似乎很簡單,數據寫入交由LedgerHander 和journal 組件,自身則是簡單的封裝。實則不然,查看BooikeImpl的實現,發現其中存在一個SyncThread對象,該對象是一個同步線程,其邏輯為轉寫journal日誌的數據到entryLog和entryIndex。
BookieImpl
啓動checkpoint定期檢查
doCheckpoint在底層最終調用了LedgerStorage#checkpoint方法,與上文提到的writeCache 背後的flush殊途同歸。這裏存在另外一個問題:SyncThread線程是否會與triggerFlushAndAddEntry中的flush線程併發執行,以及是否存在併發刷盤帶來的數據錯亂問題。答案是不會,具體來看checkpoint方法內部存在一個flushMutex鎖,同時在進入鎖之前,首先會對當前的checkpoint做判斷,如果傳入的checkpoint水位線低於當前SingleDirectoryDbLedgerStorage對象持有的lastCheckpoint水位線,則不執行實際的checkpoint動作。
SyncThread#doCheckpoint
server端分析總結
Bookeeper的server端的架構較為複雜,分為多級寫入的架構,收據流向為:
數據首先進入writeCache,有後台線程定期將cache數據同步到entryLog和 entryIndex;
writeCache 底層採用swap機制,保證寫入延遲的穩定性。
調用entryLog和entryIndex 分別寫入業務數據和索引數據;
entryIndex 使用rocksDB作為KV索引保存 LedgerId+ entryId 到 offset的映射關係。
SingleDirectoryDbLedgerStorage#flush操作和EntryLogger#flush操作不同;
前者只是將數據同步到entryLog和entryIndex中;
後者真實調用底層的文件系統進行刷盤。
journal日誌的寫入時可配置的,默認開啓,journal日誌同樣存在後台的刷盤線程;
journal線程一直重複在幹兩件事;
將QueueEntry轉化為二進制寫入bufferChannel的writebuffer;
綜合判斷各種條件,定期向forceWriteThread線程 提交真實的刷盤任務。
四、Bookie的數據讀取流程
server端源碼分析
請求路由
回到BookieRequestProcessor#processRequest的源碼截圖,讀取流程圍繞READ_ENTRY這一opCode展開,同樣在最新版本的 Bookie代碼中,read processor 升級到了V3版本。
BookieRequestProcessor#processRequest
和寫入一樣,在讀取的processReadRequestV3方法中,依然有高優先級線程池和普通線程池,不同的是還多了一個長輪詢線程池,在投遞任務時又出現了熟悉的操作,跟LedgerId 選擇線程池中具體的線程執行操作。
BookieRequestProcessor#processReadRequestV3
直接跳轉ReadEntryProcessorV3#run方法。發現是個空殼,邏輯封裝executeOp方法中。
ReadEntryProcessorV3#run
BookieImpl#readEntry
最終的讀取邏輯在BookieImpl#readEntry中,該方法只是簡單的封裝,根據LedgerId獲取到LedgerDescriptor後,讀取邏輯順利委託給了LedgerDescriptor,在LedgerDescriptor#readEntry方法內進一步套娃,又將請求轉移給了LedgerStorage#getEntry,前文提到LedgerStorage 是個接口,真正幹活的是
SingleDirectoryDbLedgerStorage中的doGetEntry 方法,這個類在寫入請求的分析過程中同樣出場過。
BookieImpl#readEntry
doGetEntry方法的邏輯整體較為簡單,主要分為以下幾步:
如果傳入的entryId為-1 ,表示讀取LAC ,先從Ledger中獲取實際的LAC的entryId,在進行讀取;
默認先從writeCache中讀取,如果讀取不到則去writeCacheBeingFlushed中讀取,命中則直接返回;
如果2級緩存中均不存在,則去readCache中據需讀取;
如果readCache也不存在,那麼就要觸發磁盤讀,先去entryLocationIndex獲取entryLogger中的物理偏移量;
隨後調用entry = entryLogger.readEntry(ledgerId, entryId, entryLocation); 獲取真實數據;
數據獲取到後會放入readCache中;
在方法結束時,會觸發一次預讀,讀取緊挨着當前entry的下一個entry並放入readCache中。
SingleDirectoryDbLedgerStorage#doGetEntry
DefaultEntryLogger如何讀取entry
entry的讀取在上圖的640行,最終調用方法為 DefaultEntryLogger#internalReadEntry方法。邏輯如下:
將location參數轉化為buffer中的position位點;
獲取到FileChannel(856行);
從 pos-4 位置開始讀取20個字節並解析,sizeBuf值為entry的整體長度(4+8+8);
然後分配一個 sizeBuf大小的buffer用於裝載即將要讀取的entry。
DefaultEntryLogger#internalReadEntry
DefaultEntryLogger#readEntrySize
在存量數據足夠的情況下readFromLogChannel方法會盡可能將入參中的buffer填滿,在BufferedReadChannel中存在一個readBuffer,默認大小512字節,read方法仍然有可能命中該緩存。
DefaultEntryLogger#readFromLogChannel
server端分析總結
數據的查詢內容比較簡單,從大的架構上來看整個讀取過程存在三級緩存,都不命中的話才會讀取磁盤。
實際上在上層的broker組件裏還有一層緩存存在。消息獲取流程如下圖所示:
五、讀寫調用鏈分析組件模塊分析
組件模塊分析
BufferedChannel
其派生關係如下圖所示,還有一個SlowBufferedChannel繼承BufferedChannel類,但是該類為測試使用。BufferedReadChannel是讀場景下的主要支撐類,內部有512字節的讀緩衝。
EntryLogger
默認使用DefaultEntryLogger,主要用於存儲實際的entry對象數據,DefaultEntryLogger 和DirectEntryLogger的區別在於一個使用JDK的RandomAccessFile ,另一個直接使用DIO(單獨依賴特定C庫)。
DefaultEntryLogger
DirectEntryLogger
LedgerStorage
基於EntryLogger的上層抽象,主要實現有InterleavedLedgerStorage和
SingleDirectoryDbLedgerStorage, 還有一個SortedLedgerStorage,內部封裝了InterleavedLedgerStorage,複用了大部分的InterleavedLedgerStorage的方法。SortedLedgerStorage每次寫入時對內部的數據進行排序,使用自帶的KV引擎存儲 LedgerId+entryId-->location映射關係。SingleDirectoryDbLedgerStorage每次刷盤時才會對緩存的數據進行排序,使用rocksDB存儲KV關係。
SingleDirectoryDbLedgerStorage
InterleavedLedgerStorage
LedgerDescriptor
包裝類,大部分邏輯委託給ledgerStorage實現。內部持有ledgerId,每個ledgerId 對應一個LedgerDescriptor對象。
Bookie
Bookie節點級存儲抽象,內部封裝了多個journal抽象組成的journalList,ledgerStorage,syncThread線程。
syncThread線程主要負責將journal中的appendLog轉寫為entryLog和enrtyIndex,checkpoint之前的數據在執行GC(數據清理工作,非JVM中的GC)時可被回收刪除。
ReadEntryProcessorV3,WriteEntryProcessorV3
負責讀寫指令的路由和轉化。
寫入流程調用時序
WriteEntryProcessorV3
--> Bookie
-->LedgerDescriptor
-->LedgerStorage
-->EntryLogger
-->BufferedLogChannel
讀取流程調用時序
ReadEntryProcessorV3
--> Bookie
-->LedgerDescriptor
-->LedgerStorage
-->EntryLogger
-->BufferedReadChannel
六、架構總結
Bookie的存儲架構主要分為三大塊,首先是代表WAL日誌的journal文件寫入,以順序混寫的方式提升寫入性能,保證低延遲,通常以獨立盤隔離掛載,典型的消息場景下journal日誌寫完後即可返回。由於是不同topic的混合寫入,journal日誌無法很好的支撐單個topic的消息的順序讀,回溯等場景,會存在讀放大問題。
由此就衍生出了entryLog的二次轉儲,為了儘可能利用順序讀,單個entryLog內部的數據在寫入時會根據ledgerId+entryId排序,這樣同一個ledgerId的數據會緊密的收斂在局部,能夠一定程度上提升讀性能;entryLog寫入後會獲取到消息實際存儲的位點信息offset,由於該offset不可被自定義,很難表述出這條消息在topic寫入序列上為第幾條信息,這一點很重要,因為消費的時候是基於這樣的序列來消費的,同時在消費位點管理時也需要這樣的信息。
entryId的作為一個傳入參數,其作用恰恰如此,是一個面向用户的更易於管理的唯一Id。當用户基於ledgerId+entryId來查找數據時,顯然並不知道這個這條數據實際存儲offset信息。這就誕生了一個額外的 KV 結構,用來保存ledgerId+entryId到 offset的映射關係。Bookie 內嵌了rocksDB的KV引擎,同時也自行實現了一套,Pulsar默認使用rocksDB方式保存 KV 關係。
bookie在整個寫入和讀取過程中利用了大量的用户態緩存機制,相比於mmap的 pageCache機制更為靈活可控,同時也很大程度上降低了讀寫的抖動,尤其是在容器環境下不同 POD 互相干擾的情況。
*文/簌語
本文屬得物技術原創,更多精彩文章請看:得物技術
未經得物技術許可嚴禁轉載,否則依法追究法律責任!