RDD算子介紹
RDD(彈性分佈式數據集)是Spark的核心數據結構,代表不可變、分區化的數據集合。RDD算子分為兩類:
- 轉換算子(Transformations):惰性操作,生成新RDD
$$ \text{新RDD} = \text{原RDD} \rightarrow \text{算子操作} $$ - 行動算子(Actions):觸發實際計算並返回值
一、常用轉換算子
- map算子
- 功能:對RDD中每個元素調用一次參數中的函數,並將每次調用的返回值直接放入一個新的RDD中
分類:轉換算子
場景:一對一的轉換,需要返回值
語法格式:
def map(self , f: T -> U ) -> RDD[U]
f:代表參數是一個函數
T:代表RDD中的每個元素
U:代表RDD中每個元素轉換的結果 - 舉例説明:
- 需求:計算每個元素的立方
原始數據
1 2 3 4 5 6
目標結果
1 8 27 64 125 216
list01 = [1,2,3,4,5,6]
listRdd = sc.parallelize(list01)
mapRdd = listRdd.map(lambda x: math.pow(x,3))
mapRdd.foreach(lambda x: print(x)) - 對每個元素應用函數$f: T \rightarrow U$
rdd = sc.parallelize([1, 2, 3])
squared = rdd.map(lambda x: x**2) # 得到 [1, 4, 9]
filter(func)保留滿足$f(x)=True$的元素
filtered = rdd.filter(lambda x: x > 1) # 得到 [2, 3]
flatMap(func)先映射再扁平化(輸出可迭代對象)
words = sc.parallelize(["hello world", "spark rdd"])
result = words.flatMap(lambda s: s.split()) # 得到 ["hello", "world", "spark", "rdd"]
groupByKey()****(僅PairRDD)
按Key分組:$ {(k_i,v_j)} \rightarrow (k_i, [v_{j1},v_{j2}...]) $
pair_rdd = sc.parallelize([("a",1), ("b",2), ("a",3)])
grouped = pair_rdd.groupByKey() # 得到 [("a", [1,3]), ("b", [2])]
二、常用行動算子
collect()返回所有元素到Driver程序
data = rdd.collect() # [1, 2, 3]
count()返回元素總數$n$
total = rdd.count() # 3
reduce(func)使用結合律函數$f: (T,T) \rightarrow T$聚合
sum_val = rdd.reduce(lambda a,b: a+b) # 6
foreach(func)對每個元素應用函數(無返回值)
rdd.foreach(lambda x: print(x)) # 分佈式打印
三、算子特性説明
|
特性
|
轉換算子
|
行動算子
|
|
惰性執行
|
✓
|
✗
|
|
生成新RDD
|
✓
|
✗
|
|
觸發任務計算
|
✗
|
✓
|
|
返回值類型
|
RDD
|
非RDD
|
重要提示:轉換算子僅在行動算子觸發時才執行計算,這種設計優化了執行計劃(如流水線優化)。
通過組合算子可實現複雜數據處理:
# 計算文本詞頻
text_rdd = sc.textFile("hdfs://data.txt")
word_counts = text_rdd.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a,b: a+b) \
.collect()
本文章為轉載內容,我們尊重原作者對文章享有的著作權。如有內容錯誤或侵權問題,歡迎原作者聯繫我們進行內容更正或刪除文章。