博客 / 詳情

返回

【趙渝強老師】Spark RDD的緩存機制

Spark RDD通過persist方法或cache方法可以將計算結果的緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發後面的action時,該RDD才會被緩存在計算節點的內存中並供後面重用。下面是persist方法或cache方法的函數定義:

def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
def cache(): this.type = persist()

視頻講解如下:
https://www.bilibili.com/video/BV1azxferEAf/?aid=113246879093...

通過函數的定義發現,cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark在object StorageLevel中定義了緩存的存儲級別。下面是在StorageLevel中的定義的緩存級別。

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2=new StorageLevel(true, true, false, false, 2)
valOFF_HEAP = new StorageLevel(true, true, true, false, 1)
需要説明的是,使用RDD的緩存機制,數據可能丟失;或者會由於內存的不足而造成數據被刪除。可以通過使用RDD的檢查點機制了保證緩存的容錯,即使緩存丟失了也能保證計算的正確執行。

下面是使用RDD緩存機制的一個示例。這裏使用RDD讀取一個大的文件,該文件中包含918843條記錄。通過Spark Web Console可以對比出在不使用緩存和使用緩存時,執行效率的差別。
(1)讀取一個大文件。

scala> val rdd1 = sc.textFile("/root/temp/sales")

(2)觸發一個計算,這裏沒有使用緩存。

scala> rdd1.count

(3)調用cache方法標識該RDD可以被緩存。

scala> rdd1.cache

(4)第二次觸發計算,計算完成後會將結果緩存。

scala> rdd1.count

(5)第三次觸發計算,這裏會直接從之前的緩存中獲取結果。

scala> rdd1.count

(6)訪問Spark的Web Console觀察這三次count計算的執行時間,可以看成最後一次count計算只耗費了98ms,如下圖所示。
image.png

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.