概述

從高層次來看,每個 Spark 應用程序都包含一個驅動程序,該程序運行用户的主函數並在集羣上執行各種並行操作。Spark 提供的核心抽象是彈性分佈式數據集(RDD),即一種分佈在集羣節點間的元素集合,可被並行操作。RDD 可通過以下方式創建:從 Hadoop 文件系統(或任何其他 Hadoop 支持的文件系統)中的文件起步,或基於驅動程序中的現有 Scala 集合進行轉換生成。用户也可要求 Spark 將 RDD 持久化到內存中,使其能在並行操作間高效複用。最後,RDD 能自動從節點故障中恢復。

Spark 的第二個抽象是可在並行操作中使用的共享變量。默認情況下,當 Spark 在不同節點上以任務集形式並行運行函數時,它會將函數中使用的每個變量副本分別發送給每個任務。有時需要在任務之間或任務與驅動程序之間共享變量。Spark 支持兩種類型的共享變量:廣播變量(可用於在所有節點的內存中緩存值)和累加器(僅支持“累加”操作的變量,例如計數器和求和器)。

本指南將通過 Spark 支持的每種語言展示這些特性。若您啓動 Spark 的交互式 shell(Scala shell 使用 bin/spark-shell,Python 使用 bin/pyspark)進行跟隨操作,將更易於理解。

與 Spark 鏈接

Python方式

Spark 3.5.7 支持 Python 3.8 及以上版本。它可以使用標準的 CPython 解釋器,因此像 NumPy 這樣的 C 庫也能正常使用。同時,它也兼容 PyPy 7.3.6 及以上版本。

在 Python 中運行 Spark 應用程序有兩種方式:

使用 bin/spark-submit 腳本(該腳本在運行時已包含 Spark 環境);
通過將其添加到你的 setup.py 配置中,例如:

install_requires=[
        'pyspark==3.5.7'
    ]

若要在不通過 pip 安裝 PySpark 的情況下運行 Python 版 Spark 應用程序,請使用 Spark 目錄下的 bin/spark-submit 腳本。該腳本將加載 Spark 的 Java/Scala 庫,並允許您向集羣提交應用程序。您也可以使用 bin/pyspark 啓動交互式 Python shell。

如需訪問 HDFS 數據,您需要使用與您 HDFS 版本關聯的 PySpark 構建版本。Spark 官網也為常見 HDFS 版本提供了預編譯包。

最後,您需要在程序中導入部分 Spark 類。添加以下代碼行:

from pyspark import SparkContext, SparkConf

PySpark 要求驅動節點和工作節點使用相同次要版本的 Python。默認情況下會使用 PATH 中的 Python 版本,您也可以通過 PYSPARK_PYTHON 環境變量指定所需版本,例如:

$ PYSPARK_PYTHON=python3.8 bin/pyspark
$ PYSPARK_PYTHON=/path-to-your-pypy/pypy bin/spark-submit examples/src/main/python/pi.py

Scala 方式

Spark 3.5.7 默認基於 Scala 2.12 構建和分發(Spark 也可編譯為支持其他 Scala 版本)。使用 Scala 編寫應用程序時,需確保 Scala 版本兼容(例如 2.12.X)。

開發 Spark 應用程序需通過 Maven 添加 Spark 依賴。Spark 可通過 Maven Central 倉庫獲取,座標如下:

groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.5.7

此外,如需訪問 HDFS 集羣,需添加與您 HDFS 版本對應的 hadoop-client 依賴:

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <您的 HDFS 版本>

最後,在程序中導入必要的 Spark 類。添加以下代碼行:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(在 Spark 1.3.0 之前,需顯式導入 org.apache.spark.SparkContext._ 以啓用必要的隱式轉換。)

Java 方式

Spark 3.5.7 支持使用 lambda 表達式簡潔地編寫函數,您也可以使用 org.apache.spark.api.java.function 包中的類。

請注意,Spark 2.2.0 已移除對 Java 7 的支持。

使用 Java 編寫 Spark 應用程序時,需添加 Spark 依賴。Spark 可通過 Maven Central 倉庫獲取,座標如下:

groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.5.7

此外,如需訪問 HDFS 集羣,需添加與您 HDFS 版本對應的 hadoop-client 依賴:

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <您的 HDFS 版本>

最後,在程序中導入必要的 Spark 類。添加以下代碼行:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

初始化 Spark

Python方式

Spark 程序的首要步驟是創建一個 SparkContext 對象,該對象用於告知 Spark 如何訪問集羣。創建 SparkContext 前,需先構建一個包含應用程序配置信息的 SparkConf 對象。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

appName 參數是應用程序在集羣 UI 上顯示的名稱。master 參數可以是 Spark、Mesos 或 YARN 集羣的 URL,或特殊的 “local” 字符串(表示以本地模式運行)。

實際部署到集羣時,通常不應在代碼中硬編碼 master 值,而是通過 spark-submit 啓動應用並動態獲取該參數。但對於本地測試和單元測試,可傳遞 “local” 來在進程內運行 Spark。

Scala 方式

Spark 程序的首要步驟是創建一個 SparkContext 對象,該對象用於告知 Spark 如何訪問集羣。創建 SparkContext 前,需先構建一個包含應用程序配置信息的 SparkConf 對象。

每個 JVM 中只能有一個活躍的 SparkContext。在創建新的 SparkContext 前,必須通過 stop() 方法關閉當前活躍的實例。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

appName 參數是應用程序在集羣 UI 上顯示的名稱。master 參數可以是 Spark、Mesos 或 YARN 集羣的 URL,或特殊的 “local” 字符串(表示以本地模式運行)。

實際部署到集羣時,通常不應在代碼中硬編碼 master 值,而是通過 spark-submit 啓動應用並動態獲取該參數。但對於本地測試和單元測試,可傳遞 “local” 來在進程內運行 Spark。

Java 方式

Spark 程序的首要步驟是創建一個 JavaSparkContext 對象,該對象用於告知 Spark 如何訪問集羣。創建 SparkContext 前,需先構建一個包含應用程序配置信息的 SparkConf 對象。

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

appName 參數是應用程序在集羣 UI 上顯示的名稱。master 參數可以是 Spark、Mesos 或 YARN 集羣的 URL,或特殊的 “local” 字符串(表示以本地模式運行)。

實際部署到集羣時,通常不應在代碼中硬編碼 master 值,而是通過 spark-submit 啓動應用並動態獲取該參數。但對於本地測試和單元測試,可傳遞 “local” 來在進程內運行 Spark。

使用 Shell

Python 方式

在 PySpark shell 中,系統已預先為您創建了一個特殊的解釋器感知型 SparkContext,該對象存儲在名為 sc 的變量中。此時自行創建 SparkContext 將無法正常工作。

您可通過 --master 參數指定上下文連接的集羣主節點,並通過 --py-files 參數傳遞逗號分隔的列表,將 Python 的 .zip、.egg 或 .py 文件添加到運行時路徑。關於第三方 Python 依賴管理,請參閲 Python 包管理文檔。

若需在 shell 會話中添加依賴(如 Spark 包),可通過 --packages 參數提供以逗號分隔的 Maven 座標。依賴可能存在的其他倉庫(如 Sonatype)可通過 --repositories 參數傳遞。例如,若要在恰好四個核心上運行 bin/pyspark,可使用以下命令:

$ ./bin/pyspark --master local[4]

或者,若要將 code.py 添加到搜索路徑(以便後續能夠導入 code 模塊),可使用以下命令:

$ ./bin/pyspark --master local[4] --py-files code.py

若要查看完整的選項列表,可運行 pyspark --help。實際上,pyspark 在底層調用了更通用的 spark-submit 腳本。

此外,還可以在增強型 Python 解釋器 IPython 中啓動 PySpark shell。PySpark 兼容 IPython 1.0.0 及更高版本。如需使用 IPython,請在運行 bin/pyspark 時將 PYSPARK_DRIVER_PYTHON 變量設置為 ipython:

$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

要使用 Jupyter notebook(此前稱為 IPython notebook),

$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark

您可以通過設置 PYSPARK_DRIVER_PYTHON_OPTS 來自定義 ipython 或 jupyter 命令。

啓動 Jupyter Notebook 服務器後,您可以從 “Files” 標籤頁創建新筆記本。在筆記本中開始使用 Jupyter notebook 嘗試 Spark 之前,可以輸入命令 %pylab inline 作為筆記本的一部分。

Scala 方式

在 Spark shell 中,系統已預先為您創建了一個特殊的解釋器感知型 SparkContext,該對象存儲在名為 sc 的變量中。此時自行創建 SparkContext 將無法正常工作。

您可通過 --master 參數指定上下文連接的集羣主節點,並通過 --jars 參數傳遞逗號分隔的列表,將 JAR 文件添加到類路徑中。

若需在 shell 會話中添加依賴(如 Spark 包),可通過 --packages 參數提供以逗號分隔的 Maven 座標。依賴可能存在的其他倉庫(如 Sonatype)可通過 --repositories 參數傳遞。例如,若要在恰好四個核心上運行 bin/spark-shell,可使用以下命令:

$ ./bin/spark-shell --master local[4]

或者,若要將 code.jar 添加到其類路徑中,可使用以下命令:

$ ./bin/spark-shell --master local[4] --jars code.jar

若要通過 Maven 座標添加依賴項:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

若要查看完整的選項列表,可運行 spark-shell --help。實際上,spark-shell 在底層調用了更通用的 spark-submit 腳本。

彈性分佈式數據集(RDDs)

Spark 的核心概念是彈性分佈式數據集(RDD),它是一個可並行操作的容錯元素集合。創建 RDD 有兩種方式:

  • 在驅動程序中並行化現有集合;
  • 引用外部存儲系統(如共享文件系統、HDFS、HBase 或任何提供 Hadoop InputFormat 的數據源)中的數據集。

並行化集合

Python 方式

並行化集合是通過在驅動程序中的現有可迭代對象或集合上調用 SparkContext 的 parallelize 方法創建的。集合的元素會被複制以形成一個可並行操作的分佈式數據集。例如,以下代碼展示瞭如何創建一個包含數字 1 到 5 的並行化集合:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

創建後,分佈式數據集(distData)即可進行並行操作。例如,我們可以調用 distData.reduce(lambda a, b: a + b) 來對列表中的元素求和。我們將在後續內容中介紹分佈式數據集的操作。

並行集合的一個重要參數是數據集的分區數量,即將數據集切分成多少個部分。Spark 會為集羣中的每個分區運行一個任務。通常,您希望為集羣中的每個 CPU 設置 2 到 4 個分區。默認情況下,Spark 會嘗試根據您的集羣自動設置分區數。但您也可以通過將分區數作為第二個參數傳遞給 parallelize 來手動設置(例如 sc.parallelize(data, 10))。

注意:代碼中的某些地方使用術語 slices(分區的同義詞)以保持向後兼容性。

Scala 方式

並行化集合是通過在驅動程序中的現有集合(Scala Seq)上調用 SparkContext 的 parallelize 方法創建的。集合的元素會被複制以形成一個可並行操作的分佈式數據集。例如,以下代碼展示瞭如何創建一個包含數字 1 到 5 的並行化集合:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

創建後,分佈式數據集(distData)即可進行並行操作。例如,我們可以調用 distData.reduce((a, b) => a + b) 來對數組中的元素求和。我們將在後續內容中介紹分佈式數據集的操作。

並行集合的一個重要參數是數據集的分區數量,即將數據集切分成多少個部分。Spark 會為集羣中的每個分區運行一個任務。通常,您希望為集羣中的每個 CPU 設置 2 到 4 個分區。默認情況下,Spark 會嘗試根據您的集羣自動設置分區數。但您也可以通過將分區數作為第二個參數傳遞給 parallelize 來手動設置(例如 sc.parallelize(data, 10))。

注意:代碼中的某些地方使用術語 slices(分區的同義詞)以保持向後兼容性。

Java 方式

並行化集合是通過在驅動程序中的現有集合上調用 JavaSparkContext 的 parallelize 方法創建的。集合的元素會被複制以形成一個可並行操作的分佈式數據集。例如,以下代碼展示瞭如何創建一個包含數字 1 到 5 的並行化集合:

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

創建後,分佈式數據集(distData)即可進行並行操作。例如,我們可以調用 distData.reduce((a, b) -> a + b) 來對列表中的元素求和。我們將在後續內容中介紹分佈式數據集的操作。

並行集合的一個重要參數是數據集的分區數量,即將數據集切分成多少個部分。Spark 會為集羣中的每個分區運行一個任務。通常,您希望為集羣中的每個 CPU 設置 2 到 4 個分區。默認情況下,Spark 會嘗試根據您的集羣自動設置分區數。但您也可以通過將分區數作為第二個參數傳遞給 parallelize 來手動設置(例如 sc.parallelize(data, 10))。

注意:代碼中的某些地方使用術語 slices(分區的同義詞)以保持向後兼容性。

外部數據集

Python 方式

PySpark可以從Hadoop支持的任何存儲源創建分佈式數據集,包括本地文件系統、HDFS、Cassandra、HBase、Amazon S3等。Spark支持文本文件、SequenceFiles以及任何其他Hadoop輸入格式。

文本文件RDD可以通過SparkContext的textFile方法創建。該方法接收文件URI(可以是本地路徑,或hdfs://、s3a://等URI)並將其按行讀取為集合。以下是調用示例:

>>> distFile = sc.textFile("data.txt")

創建後,distFile可被數據集操作處理。例如,我們可以通過map和reduce操作累加所有行的長度:distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)。

關於Spark讀取文件的注意事項:

若使用本地文件系統路徑,該文件必須在所有工作節點的相同路徑下可訪問。需將文件複製到所有工作節點,或使用網絡掛載的共享文件系統。
Spark所有基於文件的輸入方法(包括textFile)支持對目錄、壓縮文件和通配符進行操作。例如:textFile(“/my/directory”)、textFile(“/my/directory/.txt") 和 textFile("/my/directory/.gz”)。
textFile方法支持可選的第二參數用於控制文件分區數。默認情況下,Spark為文件的每個塊創建一個分區(HDFS中默認為128MB),但可通過傳遞更大值要求更多分區。注意分區數不能少於塊數。
除文本文件外,Spark的Python API還支持其他數據格式:

SparkContext.wholeTextFiles 可讀取包含多個小文本文件的目錄,並以(文件名,內容)鍵值對形式返回每個文件。這與textFile(按行返回記錄)形成對比。
RDD.saveAsPickleFile 和 SparkContext.pickleFile 支持將RDD以Python對象序列化的簡易格式保存。序列化時默認採用批量處理,批次大小為10。
SequenceFile和Hadoop輸入/輸出格式
(注意:此功能目前標記為實驗性,面向高級用户。未來可能被基於Spark SQL的讀寫支持取代,屆時建議優先使用Spark SQL)

Writable支持
PySpark的SequenceFile支持在Java中加載鍵值對RDD,將Writable轉換為基本Java類型,並通過pickle序列化Java對象。當將鍵值對RDD保存為SequenceFile時,PySpark執行反向操作:將Python對象反序列化為Java對象,再轉換為Writable。以下Writable類型會自動轉換:

Writable Type

Python Type

Text

str

IntWritable

int

FloatWritable

float

DoubleWritable

float

BooleanWritable

bool

BytesWritable

bytearray

NullWritable

None

MapWritable

dict

數組無法直接處理。用户需要在讀寫時指定自定義的ArrayWritable子類型。寫入時,用户還需指定將數組轉換為自定義ArrayWritable子類型的轉換器。讀取時,默認轉換器會將自定義ArrayWritable子類型轉換為Java Object[],隨後序列化為Python元組。若要獲取基本類型數組對應的Python array.array,用户需指定自定義轉換器。

保存與讀取SequenceFiles
與文本文件類似,可通過指定路徑保存和讀取SequenceFiles。鍵值類型可以指定,但對於標準Writable類型則無需顯式聲明。

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]

保存與讀取其他Hadoop輸入/輸出格式
PySpark同樣支持讀寫任何Hadoop InputFormat或OutputFormat(包括新舊版Hadoop MapReduce API)。必要時,可將Hadoop配置以Python字典形式傳入。以下是通過Elasticsearch ESInputFormat的調用示例:

$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}  # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                             "org.apache.hadoop.io.NullWritable",
                             "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                             conf=conf)
>>> rdd.first()  # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

需要注意的是,若InputFormat僅依賴於Hadoop配置和/或輸入路徑,且鍵值類型可根據上表輕鬆轉換,則此方法能良好適用於此類場景。

如果涉及自定義序列化的二進制數據(例如從Cassandra/HBase加載數據),則需要先在Scala/Java端將數據轉換為可由pickle序列化器處理的格式。為此提供了Converter特質(trait),只需擴展該特質並在convert方法中實現轉換邏輯。請確保此類以及訪問InputFormat所需的依賴項均打包至Spark作業jar包,幷包含在PySpark類路徑中。

關於使用自定義轉換器操作Cassandra/HBase InputFormat和OutputFormat的示例,可參考Python示例及Converter示例。

Scala 方式

Spark可以從Hadoop支持的任何存儲源創建分佈式數據集,包括本地文件系統、HDFS、Cassandra、HBase、Amazon S3等。Spark支持文本文件、SequenceFiles以及任何其他Hadoop輸入格式。

文本文件RDD可以通過SparkContext的textFile方法創建。該方法接收文件URI(可以是本地路徑,或hdfs://、s3a://等URI)並將其按行讀取為集合。以下是調用示例:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

創建後,distFile可被數據集操作處理。例如,我們可以通過map和reduce操作累加所有行的長度:distFile.map(s => s.length).reduce((a, b) => a + b)

關於Spark讀取文件的注意事項:

  • 若使用本地文件系統路徑,該文件必須在所有工作節點的相同路徑下可訪問。需將文件複製到所有工作節點,或使用網絡掛載的共享文件系統。
  • Spark所有基於文件的輸入方法(包括textFile)支持對目錄、壓縮文件和通配符進行操作。例如:textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")。讀取多個文件時,分區的順序取決於文件系統返回文件的順序(例如不一定按路徑字典序排列),分區內元素保持其在原文件中的順序。
  • textFile方法支持可選的第二參數用於控制文件分區數。默認情況下,Spark為文件的每個塊創建一個分區(HDFS中默認為128MB),但可通過傳遞更大值要求更多分區。注意分區數不能少於塊數。

除文本文件外,Spark的Scala API還支持其他數據格式:

  • SparkContext.wholeTextFiles 可讀取包含多個小文本文件的目錄,並以(文件名,內容)鍵值對形式返回每個文件。這與textFile(按行返回記錄)形成對比。分區由數據本地性決定,有時可能導致分區過少,此時可通過可選的第二參數控制最小分區數。
  • 對於SequenceFiles,可使用SparkContext.sequenceFile[K, V]方法,其中K和V需為Hadoop Writable接口的子類(如IntWritable和Text)。Spark還支持為常見Writable類型指定原生類型,例如sequenceFile[Int, String]會自動讀取IntWritables和Texts。
  • 對於其他Hadoop InputFormats,可使用SparkContext.hadoopRDD方法(需指定JobConf、輸入格式類及鍵值類型),或使用SparkContext.newAPIHadoopRDD(基於新版MapReduce API)。
  • RDD.saveAsObjectFileSparkContext.objectFile 支持以序列化Java對象的簡易格式保存RDD。雖然效率不如Avro等專業格式,但提供了保存任意RDD的便捷方式。
Java 方式

Spark可以從Hadoop支持的任何存儲源創建分佈式數據集,包括本地文件系統、HDFS、Cassandra、HBase、Amazon S3等。Spark支持文本文件、SequenceFiles以及任何其他Hadoop輸入格式。

文本文件RDD可以通過SparkContext的textFile方法創建。該方法接收文件URI(可以是本地路徑,或hdfs://、s3a://等URI)並將其按行讀取為集合。以下是調用示例:

JavaRDD<String> distFile = sc.textFile("data.txt");

創建後,distFile可被數據集操作處理。例如,我們可以通過map和reduce操作累加所有行的長度:distFile.map(s -> s.length()).reduce((a, b) -> a + b)

關於Spark讀取文件的注意事項:

  • 若使用本地文件系統路徑,該文件必須在所有工作節點的相同路徑下可訪問。需將文件複製到所有工作節點,或使用網絡掛載的共享文件系統。
  • Spark所有基於文件的輸入方法(包括textFile)支持對目錄、壓縮文件和通配符進行操作。例如:textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")
  • textFile方法支持可選的第二參數用於控制文件分區數。默認情況下,Spark為文件的每個塊創建一個分區(HDFS中默認為128MB),但可通過傳遞更大值要求更多分區。注意分區數不能少於塊數。

除文本文件外,Spark的Java API還支持其他數據格式:

  • JavaSparkContext.wholeTextFiles 可讀取包含多個小文本文件的目錄,並以(文件名,內容)鍵值對形式返回每個文件。這與textFile(按行返回記錄)形成對比。
  • 對於SequenceFiles,可使用SparkContext.sequenceFile[K, V]方法,其中K和V需為Hadoop Writable接口的子類(如IntWritable和Text)。
  • 對於其他Hadoop InputFormats,可使用JavaSparkContext.hadoopRDD方法(需指定JobConf、輸入格式類及鍵值類型),或使用JavaSparkContext.newAPIHadoopRDD(基於新版MapReduce API)。
  • JavaRDD.saveAsObjectFileJavaSparkContext.objectFile 支持以序列化Java對象的簡易格式保存RDD。雖然效率不如Avro等專業格式,但提供了保存任意RDD的便捷方式。

RDD 操作

RDD支持兩種類型的操作:轉換(transformations)和行動(actions)。轉換操作會從現有數據集創建新數據集,而行動操作在對數據集進行計算後向驅動程序返回結果值。例如,map是一種轉換操作,它通過函數處理每個數據集元素,並返回代表結果的新RDD;reduce則是一種行動操作,它使用特定函數聚合RDD的所有元素,並將最終結果返回給驅動程序(雖然也存在返回分佈式數據集的並行操作reduceByKey)。

Spark中的所有轉換操作都是惰性的,它們不會立即計算結果,而是僅記錄對基礎數據集(如文件)應用的轉換操作。只有當行動操作需要將結果返回給驅動程序時,轉換操作才會真正執行計算。這種設計讓Spark能夠更高效地運行。例如,系統可以識別出通過map創建的數據集將用於reduce操作,此時只需向驅動程序返回reduce的結果,而無需傳遞龐大的map處理後的數據集。

默認情況下,每次對轉換後的RDD執行行動操作時都可能重新計算該RDD。但您可以使用persist(或cache)方法將RDD持久化到內存中,這樣Spark會在集羣中保留元素內容,顯著提升後續查詢速度。Spark還支持將RDD持久化到磁盤,或在多個節點間進行備份存儲。

基礎操作
Python 方式

RDD基礎示例
以下簡單程序演示RDD基本操作:

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
  • 第一行從外部文件定義基礎RDD。該數據集並未立即加載到內存或執行操作:lines僅為指向文件的引用。
  • 第二行通過map轉換定義lineLengths。由於惰性計算機制,lineLengths不會立即被計算。
  • 最後執行行動操作reduce。此時Spark將計算拆分為任務在多個機器上運行,每台機器既執行本地map操作也進行局部聚合,最終僅將結果返回驅動程序。

若後續需要重複使用lineLengths,可在reduce前添加:

lineLengths.persist()

這將在首次計算後將lineLengths持久化到內存中。

Scala 方式

RDD基礎示例
以下簡單程序演示RDD基本操作:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
  • 第一行從外部文件定義基礎RDD。該數據集並未立即加載到內存或執行操作:lines僅為指向文件的引用。
  • 第二行通過map轉換定義lineLengths。由於惰性計算機制,lineLengths不會立即被計算。
  • 最後執行行動操作reduce。此時Spark將計算拆分為任務在多個機器上運行,每台機器既執行本地map操作也進行局部聚合,最終僅將結果返回驅動程序。

若後續需要重複使用lineLengths,可在reduce前添加:

lineLengths.persist()

這將在首次計算後將lineLengths持久化到內存中。

Java 方式

RDD基礎示例
以下簡單程序演示RDD基本操作:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
  • 第一行從外部文件定義基礎RDD。該數據集並未立即加載到內存或執行操作:lines僅為指向文件的引用。
  • 第二行通過map轉換定義lineLengths。由於惰性計算機制,lineLengths不會立即被計算。
  • 最後執行行動操作reduce。此時Spark將計算拆分為任務在多個機器上運行,每台機器既執行本地map操作也進行局部聚合,最終僅將結果返回驅動程序。

若後續需要重複使用lineLengths,可在reduce前添加:

lineLengths.persist(StorageLevel.MEMORY_ONLY());

這將在首次計算後將lineLengths持久化到內存中。

向 Spark 傳遞函數
Python 方式

Spark的API高度依賴將驅動程序中的函數傳遞到集羣上運行。推薦以下三種實現方式:

  1. Lambda表達式:適用於可簡寫為單行表達式的簡單函數(Lambda不支持多語句函數或無返回值的語句)
  2. 在調用Spark的函數內部定義局部函數:適用於較長代碼段
  3. 使用模塊中的頂級函數

例如,當需要傳遞比lambda表達式更復雜的函數時,可參考以下代碼:

"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)
    
    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

注意:雖然也可以傳遞類實例中的方法引用(與單例對象相對),但這需要將包含該方法的整個對象一起發送到集羣。例如:

class MyClass(object):
    def func(self, s):
        return s
    def doStuff(self, rdd):
        return rdd.map(self.func)

此時若創建新的MyClass實例並調用doStuff方法,map操作會引用該實例的func方法,導致整個對象被髮送到集羣。

類似地,訪問外部對象的字段也會引用整個對象:

class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        return rdd.map(lambda s: self.field + s)

為避免此問題,最簡單的方法是將字段複製到局部變量而非直接訪問外部字段:

def doStuff(self, rdd):
    field = self.field
    return rdd.map(lambda s: field + s)
Scala 方式

Spark API 在很大程度上依賴於將驅動程序中的函數傳遞到集羣上執行。推薦以下兩種實現方式:

  • 匿名函數語法,適用於簡短代碼
  • 全局單例對象中的靜態方法。例如定義對象 MyFunctions 後傳遞 MyFunctions.func1:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

注意:雖然也可以傳遞類實例中的方法引用(與單例對象相對),但這種方式需要將包含該方法的整個對象一同發送到集羣。例如:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

此時若創建新的 MyClass 實例並調用其 doStuff 方法,內部的 map 操作會引用該實例的 func1 方法,因此需要將整個對象發送至集羣。這相當於編寫 rdd.map(x => this.func1(x))。

類似地,訪問外部對象的字段也會引用整個對象:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

上述代碼等價於 rdd.map(x => this.field + x),其中引用了完整的 this 對象。為避免該問題,最簡單的方法是將字段複製到局部變量中,而非從外部訪問:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}
Java 方式

Spark API 在很大程度上依賴於將驅動程序中的函數傳遞到集羣上執行。在 Java 中,函數通過實現 org.apache.spark.api.java.function 包中接口的類來表示。創建此類函數有兩種方式:

在自己的類中實現 Function 接口(可作為匿名內部類或命名類),並將其實例傳遞給 Spark
使用 lambda 表達式來簡潔地定義實現
雖然本指南為求簡潔大量使用 lambda 語法,但同樣可以輕鬆使用完整形式的 API。例如,上述代碼可以改寫為:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});

如果內聯編寫函數顯得冗長,也可以採用:

class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

需要注意的是,Java 中的匿名內部類只要聲明為 final,也可以訪問封閉作用域中的變量。Spark 會將這些變量的副本發送到各個工作節點,其處理方式與其他語言一致。

理解閉包

Spark 中一個較難理解的方面是在跨集羣執行代碼時變量和方法的範圍與生命週期。修改其範圍外部變量的 RDD 操作常常會引發困惑。在下面的示例中,我們將查看使用 foreach() 遞增計數器的代碼,但類似問題也可能出現在其他操作中。

示例

考慮下面這種簡單的 RDD 元素求和方法,其行為可能因執行是否在同一個 JVM 內而發生差異。一個常見的例子是:在本地模式(–master = local[n])下運行 Spark 與將 Spark 應用程序部署到集羣(例如通過 spark-submit 提交到 YARN)時的對比:

counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)
var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);
本地模式與集羣模式

上述代碼的行為是不確定的,可能無法按預期工作。為了執行作業,Spark 將 RDD 操作的處理過程分解為多個任務,每個任務由一個執行器(executor)執行。在執行之前,Spark 會計算任務的閉包(closure)。閉包是指那些執行器為了對 RDD(在本例中是 foreach())執行計算而必須可見的變量和方法。這個閉包會被序列化併發送給每個執行器。

發送給每個執行器的閉包內的變量現在是副本,因此,當在 foreach 函數內部引用 counter 時,它不再是驅動節點(driver node)上的那個 counter。驅動節點的內存中仍然存在一個 counter,但執行器無法再看到它!執行器只能看到序列化閉包中的副本。因此,counter 的最終值將仍然是零,因為所有對 counter 的操作都是在引用序列化閉包內的值。

在本地模式(local mode)下,在某些情況中,foreach 函數實際上會在與驅動程序相同的 JVM 中執行,並引用同一個原始的 counter,從而可能實際更新它。

為了確保在這類場景中有明確的行為定義,應該使用累加器(Accumulator)。Spark 中的累加器專門用於提供一種機制,當執行過程分佈在集羣的工作節點(worker nodes)上時,可以安全地更新變量。本指南的累加器部分會更詳細地討論這些內容。

總的來説,不應使用閉包——例如循環或局部定義的方法這類結構——來改變某些全局狀態。Spark 不定義也不保證對從閉包外部引用的對象進行修改的行為。某些這樣做的代碼在本地模式下可能有效,但這只是偶然情況,此類代碼在分佈式模式下不會按預期運行。如果需要某種全局聚合,請改用累加器。

打印 RDD 中的元素

另一種常見的做法是嘗試使用 rdd.foreach(println) 或 rdd.map(println) 來打印 RDD 的元素。在單機環境下,這會生成預期的輸出並打印所有 RDD 元素。然而,在集羣模式下,執行器調用 stdout 的輸出會寫入到執行器的標準輸出中,而不是驅動節點的標準輸出,因此驅動節點上的 stdout 不會顯示這些內容!

如果要在驅動節點上打印所有元素,可以使用 collect() 方法先將 RDD 數據收集到驅動節點,例如:rdd.collect().foreach(println)。但這種方式可能導致驅動節點內存不足,因為 collect() 會將整個 RDD 獲取到單個機器上。如果只需要打印 RDD 的少量元素,更安全的方法是使用 take():rdd.take(100).foreach(println)。

鍵值對操作
Python 方式

雖然大多數 Spark 操作適用於包含任意類型對象的 RDD,但一些特殊操作僅適用於鍵值對形式的 RDD。其中最常見的是分佈式的"洗牌"操作,例如按鍵對元素進行分組或聚合。

在 Python 中,這些操作適用於包含 Python 內置元組(如 (1, 2))的 RDD。只需創建此類元組,然後調用所需操作即可。

例如,以下代碼對鍵值對使用 reduceByKey 操作,統計文件中每行文本出現的次數:

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

我們還可以使用 counts.sortByKey() 按字母順序對鍵值對進行排序,最後通過 counts.collect() 將它們作為對象列表返回驅動程序。

Scala 方式

雖然大多數 Spark 操作可作用於包含任意類型對象的 RDD,但某些特殊操作僅適用於鍵值對形式的 RDD。其中最常見的當屬分佈式的"洗牌(shuffle)"操作,例如根據鍵(key)對元素進行分組或聚合。

在 Scala 中,這些操作會自動作用於包含 Tuple2 對象(即通過 (a, b) 這種簡單寫法創建的語言內置元組)的 RDD。鍵值對操作由 PairRDDFunctions 類提供,該類會自動封裝包含元組的 RDD。

例如,以下代碼對鍵值對使用 reduceByKey 操作,統計文件中每行文本出現的次數:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

我們還可以使用 counts.sortByKey() 按字母順序對鍵值對排序,最後通過 counts.collect() 將它們作為對象數組傳回驅動程序。

注意: 當使用自定義對象作為鍵值對操作中的鍵時,必須確保自定義的 equals() 方法配套實現了相應的 hashCode() 方法。完整要求請參閲 Object.hashCode() 文檔中概述的約定。

Java 方式

雖然大多數 Spark 操作適用於包含任意類型對象的 RDD,但某些特殊操作僅適用於鍵值對形式的 RDD。其中最常見的是分佈式的"洗牌"操作,例如根據鍵對元素進行分組或聚合。

在 Java 中,鍵值對使用 Scala 標準庫中的 scala.Tuple2 類表示。只需調用 new Tuple2(a, b) 即可創建元組,後續可通過 tuple._1() 和 tuple._2() 訪問其字段。

鍵值對 RDD 由 JavaPairRDD 類表示。可以通過特殊版本的映射操作(如 mapToPair 和 flatMapToPair)從 JavaRDD 構建 JavaPairRDD。JavaPairRDD 既包含標準 RDD 函數,也包含專門的鍵值對函數。

例如,以下代碼對鍵值對使用 reduceByKey 操作來統計文件中每行文本出現的次數:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

我們還可以使用 counts.sortByKey() 按字母順序對鍵值對排序,最後通過 counts.collect() 將它們作為對象數組傳回驅動程序。

注意: 當使用自定義對象作為鍵值對操作中的鍵時,必須確保自定義的 equals() 方法配套實現了相應的 hashCode() 方法。完整要求請參閲 Object.hashCode() 文檔中概述的約定。

轉換操作

下表列出了 Spark 支持的一些常用轉換操作。詳細內容請參閲 RDD API 文檔(Scala、Java、Python、R 版本)以及鍵值對 RDD 函數文檔(Scala、Java 版本)。

轉換操作

含義

map(func)

將源數據集的每個元素通過函數 func 傳遞,返回一個新的分佈式數據集

filter(func)

返回一個新數據集,包含源數據集中使 func 返回 true 的元素

flatMap(func)

與 map 類似,但每個輸入項可映射為 0 個或多個輸出項(因此 func 應返回一個 Seq 而不是單個項)

mapPartitions(func)

與 map 類似,但在 RDD 的每個分區上單獨運行,因此在類型為 T 的 RDD 上運行時,func 必須為 Iterator => Iterator 類型

mapPartitionsWithIndex(func)

與 mapPartitions 類似,但會向 func 提供表示分區索引的整數值,因此在類型為 T 的 RDD 上運行時,func 必須為 (Int, Iterator) => Iterator 類型

sample(withReplacement, fraction, seed)

使用給定的隨機數生成器種子,對數據進行抽樣,抽樣比例為 fraction,可設置是否有放回

union(otherDataset)

返回一個新數據集,包含源數據集和參數數據集中元素的並集

intersection(otherDataset)

返回一個新 RDD,包含源數據集和參數數據集中元素的交集

distinct([numPartitions])

返回一個新數據集,包含源數據集中的不重複元素

groupByKey([numPartitions])

當在 (K, V) 對的數據集上調用時,返回一個 (K, Iterable) 對的數據集

注意:如果分組是為了對每個鍵執行聚合操作(如求和或平均值),使用 reduceByKey 或 aggregateByKey 將獲得更好的性能

注意:默認情況下,輸出的並行度取決於父 RDD 的分區數。可以傳遞可選的 numPartitions 參數來設置不同的任務數

reduceByKey(func, [numPartitions])

當在 (K, V) 對的數據集上調用時,返回一個 (K, V) 對的數據集,其中每個鍵的值使用給定的 reduce 函數 func 進行聚合,func 必須為 (V,V) => V 類型。與 groupByKey 類似,reduce 任務的數量可通過可選的第二個參數配置

aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])

當在 (K, V) 對的數據集上調用時,返回一個 (K, U) 對的數據集,其中每個鍵的值使用給定的組合函數和一箇中性的"零"值進行聚合。允許聚合值類型與輸入值類型不同,同時避免不必要的內存分配。與 groupByKey 類似,reduce 任務的數量可通過可選的第二個參數配置

sortByKey([ascending], [numPartitions])

當在 (K, V) 對的數據集上調用時(其中 K 實現了 Ordered),返回一個按鍵升序或降序排序的 (K, V) 對數據集,排序順序由布爾參數 ascending 指定

join(otherDataset, [numPartitions])

當在類型為 (K, V) 和 (K, W) 的數據集上調用時,返回一個 (K, (V, W)) 對的數據集,包含每個鍵的所有元素對。通過 leftOuterJoin、rightOuterJoin 和 fullOuterJoin 支持外連接

cogroup(otherDataset, [numPartitions])

當在類型為 (K, V) 和 (K, W) 的數據集上調用時,返回一個 (K, (Iterable, Iterable)) 元組的數據集。此操作也稱為 groupWith

cartesian(otherDataset)

當在類型為 T 和 U 的數據集上調用時,返回一個 (T, U) 對的數據集(所有元素的組合對)

pipe(command, [envVars])

通過 shell 命令(如 Perl 或 bash 腳本)處理 RDD 的每個分區。RDD 元素寫入進程的 stdin,輸出到 stdout 的行作為字符串 RDD 返回

coalesce(numPartitions)

將 RDD 中的分區數減少到 numPartitions。在過濾大型數據集後用於更高效地運行操作

repartition(numPartitions)

隨機重新洗牌 RDD 中的數據,以創建更多或更少的分區並在其間平衡數據。這總是通過網絡洗牌所有數據

repartitionAndSortWithinPartitions(partitioner)

根據給定的分區器對 RDD 重新分區,並在每個結果分區內按鍵對記錄排序。這比先調用 repartition 然後在每個分區內排序更高效,因為它可以將排序下推到洗牌機制中

Actions

下表列出了 Spark 支持的一些常用行動操作。詳細內容請參閲 RDD API 文檔(Scala、Java、Python、R 版本)以及鍵值對 RDD 函數文檔(Scala、Java 版本)。

行動操作

含義

reduce(func)

使用函數 func(接收兩個參數並返回一個值)聚合數據集的元素。該函數應滿足交換律和結合律,以便能夠正確並行計算。

collect()

將數據集的所有元素以數組形式返回到驅動程序。這通常在過濾或其他返回數據子集且結果數據量較小的操作之後使用。

count()

返回數據集中的元素個數。

first()

返回數據集中的第一個元素(類似於 take(1))。

take(n)

返回由數據集前 n 個元素組成的數組。

takeSample(withReplacement, num, [seed])

返回數據集中 num 個元素的隨機抽樣數組,可設置是否有放回,並可選擇預設隨機數生成器種子。

takeOrdered(n, [ordering])

使用元素的自然順序或自定義比較器,返回 RDD 的前 n 個元素。

saveAsTextFile(path)

將數據集的元素寫入本地文件系統、HDFS 或其他 Hadoop 支持的文件系統中指定目錄下的文本文件(或一組文本文件)。Spark 會對每個元素調用 toString 方法,將其轉換為文件中的一行文本。

saveAsSequenceFile(path)

(Java 和 Scala)

將數據集的元素以 Hadoop SequenceFile 格式寫入本地文件系統、HDFS 或其他 Hadoop 支持的文件系統中的指定路徑。此操作適用於實現了 Hadoop Writable 接口的鍵值對 RDD。在 Scala 中,也適用於可隱式轉換為 Writable 的類型(Spark 包含了基本類型如 Int、Double、String 等的轉換)。

saveAsObjectFile(path)

(Java 和 Scala)

使用 Java 序列化以簡單格式寫入數據集的元素,之後可通過 SparkContext.objectFile() 加載。

countByKey()

僅適用於 (K, V) 類型的 RDD。返回一個 (K, Int) 對的哈希映射,包含每個鍵的計數。

foreach(func)

對數據集的每個元素運行函數 func。這通常用於產生副作用,例如更新累加器(Accumulator)或與外部存儲系統交互。

注意: 在 foreach() 外部修改除累加器之外的變量可能導致未定義行為。更多細節請參閲《理解閉包》章節。

異步行動操作 Spark RDD API 還暴露了一些行動的異步版本,例如 foreachAsync 對應 foreach,這些操作會立即向調用者返回一個 FutureAction,而不是阻塞等待行動完成。這可用於管理或等待行動的異步執行。

Shuffle操作

Spark中的某些操作會觸發一個稱為**洗牌(Shuffle)**的事件。洗牌是Spark重新分發數據的機制,使得數據能夠以不同的方式在各個分區中進行分組。這一過程通常涉及在執行器(Executor)和機器之間複製數據,因此洗牌是一項複雜且代價高昂的操作。

背景原理

要理解洗牌過程中發生了什麼,我們可以以 reduceByKey 操作為例。reduceByKey 操作會生成一個新的RDD,其中同一個鍵的所有值會被組合成一個元組——包含該鍵以及對與該鍵關聯的所有值執行歸約函數後的結果。這裏面臨的挑戰是,同一個鍵的所有值不一定存儲在同一個分區,甚至不一定在同一台機器上,但為了計算結果,它們必須被放在一起。

在Spark中,數據通常不會為了某個特定操作的需要而預先分佈在合適的分區上。在計算過程中,單個任務僅操作單個分區。因此,為了組織好所有數據,以便執行單個 reduceByKey 的歸約任務,Spark需要執行一個全對全(all-to-all)操作。它必須從所有分區讀取數據,找到所有鍵的所有值,然後將跨分區的值匯聚起來計算每個鍵的最終結果——這個過程就是洗牌。

儘管新洗牌數據的每個分區中的元素集合是確定性的,分區本身的順序也是確定性的,但分區內這些元素的順序是不確定的。如果需要在洗牌後獲得有確定性順序的數據,可以使用以下方法:

  • 使用 mapPartitions 對每個分區進行排序,例如使用 .sorted。
  • 使用 repartitionAndSortWithinPartitions 在重新分區的同時高效地對分區進行排序。
  • 使用 sortBy 來生成一個全局有序的RDD。

可能引起洗牌的操作包括:重新分區操作(如 repartition 和 coalesce)、'ByKey’操作(計數操作除外,例如 groupByKey 和 reduceByKey),以及連接操作(如 cogroup 和 join)。

性能影響

洗牌是一個代價高昂的操作,因為它涉及磁盤I/O、數據序列化和網絡I/O。為了組織數據以進行洗牌,Spark會生成兩組任務:映射任務(Map tasks) 用於組織數據,以及歸約任務(Reduce tasks) 用於聚合數據。這個命名源於MapReduce,與Spark的 map 和 reduce 操作沒有直接關係。

在內部,單個映射任務的結果會保存在內存中,直到內存無法容納為止。然後,這些結果會根據目標分區進行排序,並寫入單個文件。在歸約端,任務會讀取相關的已排序數據塊。

某些洗牌操作會消耗大量的堆內存,因為它們使用內存中的數據結構來在傳輸記錄之前或之後組織記錄。具體來説,reduceByKey 和 aggregateByKey 會在映射端創建這些結構,而’ByKey’操作會在歸約端生成這些結構。當數據無法完全裝入內存時,Spark會將這些表溢出(spill)到磁盤,從而導致額外的磁盤I/O開銷和垃圾回收(GC)壓力。

洗牌還會在磁盤上生成大量的中間文件。從Spark 1.3開始,這些文件會被保留,直到相應的RDD不再被使用並被垃圾回收為止。這樣做是為了在需要重新計算血緣關係(lineage)時,無需重新創建洗牌文件。如果應用程序長時間保留對這些RDD的引用,或者垃圾回收不頻繁發生,那麼垃圾回收可能要在很長時間後才會進行。這意味着長時間運行的Spark作業可能會消耗大量的磁盤空間。臨時存儲目錄由配置Spark上下文時的 spark.local.dir 配置參數指定。

可以通過調整各種配置參數來調優洗牌行為。請參閲《Spark配置指南》中的“Shuffle Behavior”部分。

RDD 持久化

Spark最重要的功能之一是在內存中跨操作**持久化(或緩存)**數據集。當你持久化一個RDD時,每個節點會將其計算的分區存儲在內存中,並在該數據集(或衍生自它的數據集)的其他操作中重用它們。這使得後續操作能夠快得多(通常能快10倍以上)。緩存是迭代算法和快速交互式使用的關鍵工具。

你可以使用RDD的 persist() 或 cache() 方法來標記其需要被持久化。當它第一次在某個行動操作中被計算時,它會被保存在各個節點的內存中。Spark的緩存是容錯的——如果RDD的任何一個分區丟失,它會自動使用最初創建它的那些轉換操作重新計算。

此外,每個被持久化的RDD可以使用不同的存儲級別進行存儲,例如,允許你將數據集持久化到磁盤、以序列化Java對象的形式持久化在內存中(為了節省空間)、在節點間進行復制等。這些級別通過向 persist() 傳遞一個 StorageLevel 對象(Scala, Java, Python)來設置。cache() 方法是使用默認存儲級別(即 StorageLevel.MEMORY_ONLY,將反序列化對象存儲在內存中)的簡寫形式。完整的存儲級別如下:

存儲級別

含義

MEMORY_ONLY

將RDD以反序列化的Java對象形式存儲在JVM內存中。如果RDD無法完全裝入內存,部分分區將不會被緩存,並在每次需要時動態重新計算。這是默認級別。

MEMORY_AND_DISK

將RDD以反序列化的Java對象形式存儲在JVM內存中。如果RDD無法完全裝入內存,則將不適合的分區存儲在磁盤上,並在需要時從磁盤讀取。

MEMORY_ONLY_SER (Java和Scala)

將RDD以序列化的Java對象形式存儲(每個分區一個字節數組)。這通常比反序列化對象更節省空間,尤其是在使用快速序列化庫時,但讀取時CPU開銷更大。

MEMORY_AND_DISK_SER (Java和Scala)

與 MEMORY_ONLY_SER 類似,但會將不適合內存的分區**溢出(spill)**到磁盤,而不是在每次需要時動態重新計算。

DISK_ONLY

僅將RDD分區存儲在磁盤上。

MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等

與上述級別相同,但會在兩個集羣節點上覆制每個分區。

OFF_HEAP (實驗性)

與 MEMORY_ONLY_SER 類似,但將數據存儲在堆外內存中。這需要啓用堆外內存。

注意:在Python中,存儲的對象總是使用Pickle庫進行序列化,因此選擇序列化級別無關緊要。Python中可用的存儲級別包括 MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, DISK_ONLY_2, 和 DISK_ONLY_3。

即使在用户沒有調用 persist 的情況下,Spark也會在洗牌操作(例如 reduceByKey)中自動持久化一些中間數據。這是為了避免在洗牌過程中若有節點失敗,需要重新計算整個輸入。如果用户計劃重用洗牌結果產生的RDD,我們仍然建議用户對其調用 persist。

如何選擇存儲級別?

Spark的存儲級別旨在在內存使用率和CPU效率之間提供不同的權衡。我們建議通過以下流程進行選擇:

如果您的RDD能很好地適應默認存儲級別(MEMORY_ONLY),則保持原樣。這是CPU效率最高的選項,允許RDD上的操作儘可能快地運行。

如果不能,請嘗試使用MEMORY_ONLY_SER並選擇一個快速的序列化庫,使對象的空間效率更高,同時仍能保持合理的訪問速度。(適用於Java和Scala)

除非計算數據集的函數開銷很大,或者它們過濾了大量數據,否則不要將數據溢出到磁盤。因為在這種情況下,重新計算一個分區的速度可能與從磁盤讀取它的速度一樣快。

如果需要快速故障恢復(例如,使用Spark為Web應用程序提供請求服務),請使用帶複製的存儲級別。所有存儲級別都通過重新計算丟失的數據來提供完整的容錯能力,但帶複製的級別允許您繼續在RDD上運行任務,而無需等待重新計算丟失的分區。

數據移除

Spark會自動監控每個節點上的緩存使用情況,並以最近最少使用(LRU)的方式逐出舊的數據分區。如果您希望手動移除RDD而不是等待它被緩存自動逐出,請使用RDD.unpersist()方法。請注意,此方法默認不阻塞。若需要阻塞直到資源被釋放,請在調用此方法時指定blocking=true。

共享變量

通常,當傳遞給 Spark 操作(如 map 或 reduce)的函數在遠程集羣節點上執行時,它操作的是函數中使用的所有變量的獨立副本。這些變量會被複制到每台機器,且遠程機器上對變量的更新不會傳回驅動程序。支持跨任務的通用讀寫共享變量效率會很低。然而,Spark 確實為兩種常見的使用模式提供了兩種有限類型的共享變量:廣播變量和累加器。

廣播變量

廣播變量允許程序員將一個只讀變量緩存在每台機器上,而不是隨着任務一起發送副本。例如,它們可以用於高效地向每個節點分發一個大型輸入數據集的副本。Spark 還會嘗試使用高效的廣播算法來分發廣播變量,以降低通信成本。

Spark 動作(actions)通過一系列階段(stages)執行,這些階段由分佈式的"洗牌"(shuffle)操作分隔。Spark 會自動廣播每個階段內任務所需的公共數據。以這種方式廣播的數據以序列化形式緩存,並在運行每個任務前反序列化。這意味着,只有當跨多個階段的任務需要相同數據,或者以反序列化形式緩存數據非常重要時,顯式創建廣播變量才有用。

廣播變量通過對變量 v 調用 SparkContext.broadcast(v) 來創建。廣播變量是 v 的一個包裝器,可以通過調用 value 方法來訪問其值。以下代碼展示了這一點:

val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value
// 返回: Array(1, 2, 3)
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

廣播變量創建後,在集羣上運行的任何函數中都應使用該廣播變量,而不是原始值 v,以避免將 v 多次發送到節點。此外,對象 v 在廣播後不應被修改,以確保所有節點獲取的廣播變量值一致(例如,如果該變量後續被髮送到一個新節點)。

在 UI 中跟蹤累加器對於理解運行中階段的進度很有用(注意:Python 中尚不支持此功能)。

要釋放廣播變量複製到執行器(executors)上的資源,請調用 .unpersist()。如果之後再次使用該廣播變量,它會被重新廣播。要永久釋放廣播變量使用的所有資源,請調用 .destroy()。此後該廣播變量將無法再使用。請注意,這些方法默認不會阻塞。若要阻塞直到資源釋放,請在調用時指定 blocking=true。

累加器

累加器是一種只能通過關聯性和交換性操作進行"累加"的變量,因此可以高效地並行支持。它們可用於實現計數器(如 MapReduce 中)或求和。Spark 原生支持數值類型的累加器,程序員可以添加對新類型的支持。

作為用户,您可以創建命名或未命名的累加器。如下圖所示,命名累加器(在此例中名為 counter)會在 Web UI 中顯示於修改該累加器的階段下。Spark 會在"Tasks"表中顯示由任務修改的每個累加器的值。


在用户界面(UI)中跟蹤累加器有助於瞭解運行中各階段的進度(注意:此功能目前在 Python 中尚未支持)。

Python 方式

累加器可以通過調用 SparkContext.accumulator(v) 從初始值 v 創建。在集羣上運行的任務隨後可以使用 add 方法或 += 運算符來對其進行累加。但是,任務無法讀取累加器的值。只有驅動程序可以使用累加器的 value 方法來讀取其值。

以下代碼展示瞭如何使用累加器對數組元素進行求和:

>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

>>> accum.value
10

雖然這段代碼使用了 Spark 對 Int 類型累加器的內置支持,但程序員也可以通過繼承 AccumulatorParam 類來創建自定義類型的累加器。AccumulatorParam 接口有兩個方法:zero 方法為您的數據類型提供一個"零值",addInPlace 方法將兩個值相加。例如,假設我們有一個表示數學向量的 Vector 類,我們可以這樣寫:

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)

    def addInPlace(self, v1, v2):
        v1 += v2
        return v1

# 然後,創建這種類型的累加器:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

對於僅在動作(action)內部執行的累加器更新,Spark 保證每個任務對累加器的更新只會被應用一次,即重新啓動的任務不會再次更新該值。在轉換(transformation)操作中,用户應注意,如果任務或作業階段被重新執行,每個任務的更新可能會被應用多次。

累加器不會改變 Spark 的惰性求值模型。如果累加器是在對 RDD 的某個操作中進行更新的,那麼它們的值只有在該 RDD 作為某個動作的一部分被計算時才會更新。因此,在像 map() 這樣的惰性轉換操作中進行累加器更新,並不能保證會被執行。下面的代碼片段展示了這個特性:

accum = sc.accumulator(0)
def g(x):
    accum.add(x)
    return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.
Scala 方式

可以通過調用 SparkContext.longAccumulator() 或 SparkContext.doubleAccumulator() 來分別創建累加 Long 或 Double 類型值的數值累加器。在集羣上運行的任務可以使用 add 方法對其進行累加。但是,任務無法讀取其值。只有驅動程序可以使用累加器的 value 方法讀取其值。

以下代碼展示瞭如何使用累加器對數組元素進行求和:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

雖然這段代碼使用了 Spark 對 Long 類型累加器的內置支持,但程序員也可以通過繼承 AccumulatorV2 類來創建自定義類型的累加器。AccumulatorV2 抽象類有幾個必須重寫的方法:reset 用於將累加器重置為零,add 用於將另一個值加到累加器中,merge 用於將另一個相同類型的累加器合併到當前累加器。必須重寫的其他方法包含在 API 文檔中。例如,假設我們有一個表示數學向量的 MyVector 類,我們可以這樣寫:

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// 然後,創建這種類型的累加器:
val myVectorAcc = new VectorAccumulatorV2
// 然後,將其註冊到 spark 上下文中:
sc.register(myVectorAcc, "MyVectorAcc1")

請注意,當程序員定義自己的 AccumulatorV2 類型時,最終的類型可以與所添加元素的類型不同。

對於僅在動作(action)內部執行的累加器更新,Spark 保證每個任務對累加器的更新只會被應用一次,即重新啓動的任務不會再次更新該值。在轉換(transformation)操作中,用户應注意,如果任務或作業階段被重新執行,每個任務的更新可能會被應用多次。

累加器不會改變 Spark 的惰性求值模型。如果累加器是在對 RDD 的某個操作中進行更新的,那麼它們的值只有在該 RDD 作為某個動作的一部分被計算時才會更新。因此,在像 map() 這樣的惰性轉換操作中進行累加器更新,並不能保證會被執行。下面的代碼片段展示了這個特性:

val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// 在這裏,accum 的值仍然是 0,因為沒有觸發 map 操作計算的行動(action)。
Java 方式

可以通過調用 SparkContext.longAccumulator() 或 SparkContext.doubleAccumulator() 來分別創建用於累加 Long 或 Double 類型值的數值累加器。在集羣上運行的任務可以使用 add 方法對其進行累加。但是,任務無法讀取其值。只有驅動程序可以使用累加器的 value 方法讀取其值。

以下代碼展示瞭如何使用累加器對數組元素進行求和:

LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// 返回 10

雖然這段代碼使用了 Spark 對 Long 類型累加器的內置支持,但程序員也可以通過繼承 AccumulatorV2 類來創建自定義類型的累加器。AccumulatorV2 抽象類有幾個必須重寫的方法:reset 用於將累加器重置為零,add 用於將另一個值加到累加器中,merge 用於將另一個相同類型的累加器合併到當前累加器。必須重寫的其他方法包含在 API 文檔中。例如,假設我們有一個表示數學向量的 MyVector 類,我們可以這樣寫:

class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {

  private MyVector myVector = MyVector.createZeroVector();

  public void reset() {
    myVector.reset();
  }

  public void add(MyVector v) {
    myVector.add(v);
  }
  ...
}

// 然後,創建這種類型的累加器:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// 然後,將其註冊到 spark 上下文中:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");

請注意,當程序員定義自己的 AccumulatorV2 類型時,最終的類型可以與所添加元素的類型不同。

警告:當 Spark 任務完成時,Spark 會嘗試將此任務中的累積更新合併到累加器中。如果合併失敗,Spark 將忽略該失敗,仍將任務標記為成功並繼續運行其他任務。因此,一個有問題的累加器不會影響 Spark 作業,但即使 Spark 作業成功,該累加器也可能無法正確更新。

對於僅在動作(action)內部執行的累加器更新,Spark 保證每個任務對累加器的更新只會被應用一次,即重新啓動的任務不會再次更新該值。在轉換(transformation)操作中,用户應注意,如果任務或作業階段被重新執行,每個任務的更新可能會被應用多次。

累加器不會改變 Spark 的惰性求值模型。如果累加器是在對 RDD 的某個操作中進行更新的,那麼它們的值只有在該 RDD 作為某個動作的一部分被計算時才會更新。因此,在像 map() 這樣的惰性轉換操作中進行累加器更新,並不能保證會被執行。下面的代碼片段展示了這個特性:

LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.

部署到集羣

應用程序提交指南介紹瞭如何將應用程序提交到集羣。簡而言之,一旦您將應用程序打包成 JAR 文件(針對 Java/Scala)或一組 .py 或 .zip 文件(針對 Python),就可以使用 bin/spark-submit 腳本將其提交到任何受支持的集羣管理器。

從 Java/Scala 啓動 Spark 任務

org.apache.spark.launcher 包提供了使用簡單 Java API 將 Spark 作業作為子進程啓動的類。

單元測試

Spark 能夠很好地與任何流行的單元測試框架配合進行單元測試。只需在測試中創建一個 SparkContext,將其 master URL 設置為 local,運行您的操作,然後調用 SparkContext.stop() 來終止它。請確保在 finally 塊或測試框架的 tearDown 方法中停止上下文,因為 Spark 不支持在同一程序中同時運行兩個上下文。

進一步學習方向

您可以在 Spark 網站上查看一些 Spark 程序示例。此外,Spark 在 examples 目錄中包含了多個示例(Scala、Java、Python、R)。您可以通過將類名傳遞給 Spark 的 bin/run-example 腳本來運行 Java 和 Scala 示例;例如:

./bin/run-example SparkPi

對於 Python 示例,請改用 spark-submit:

./bin/spark-submit examples/src/main/python/pi.py

對於 R 示例,請改用 spark-submit:

./bin/spark-submit examples/src/main/r/dataframe.R

有關優化程序的幫助,配置和調優指南提供了最佳實踐信息。這些對於確保數據以高效格式存儲在內存中尤為重要。有關部署的幫助,集羣模式概述描述了分佈式操作中涉及的組件和受支持的集羣管理器。

最後,完整的 API 文檔可用於 Scala、Java、Python 和 R。