RDD算子介紹

RDD(彈性分佈式數據集)是Spark的核心數據結構,代表不可變分區化的數據集合。RDD算子分為兩類:

  1. 轉換算子(Transformations):惰性操作,生成新RDD
    $$ \text{新RDD} = \text{原RDD} \rightarrow \text{算子操作} $$
  2. 行動算子(Actions):觸發實際計算並返回值

一、常用轉換算子
  1. map算子
  2. 功能:對RDD中每個元素調用一次參數中的函數,並將每次調用的返回值直接放入一個新的RDD中
    分類:轉換算子
    場景:一對一的轉換,需要返回值
    語法格式:
    def map(self , f: T -> U ) -> RDD[U]
    f:代表參數是一個函數
    T:代表RDD中的每個元素
    U:代表RDD中每個元素轉換的結果
  3. 舉例説明:
  4. 需求:計算每個元素的立方
    原始數據
     1 2 3 4 5 6
    目標結果
     1 8 27 64 125 216
    list01 = [1,2,3,4,5,6]
        listRdd = sc.parallelize(list01)
        mapRdd = listRdd.map(lambda x: math.pow(x,3))
        mapRdd.foreach(lambda x: print(x))
  5. 對每個元素應用函數$f: T \rightarrow U$
rdd = sc.parallelize([1, 2, 3])
squared = rdd.map(lambda x: x**2)  # 得到 [1, 4, 9]
  1. filter(func) 保留滿足$f(x)=True$的元素
filtered = rdd.filter(lambda x: x > 1)  # 得到 [2, 3]
  1. flatMap(func) 先映射再扁平化(輸出可迭代對象)
words = sc.parallelize(["hello world", "spark rdd"])
result = words.flatMap(lambda s: s.split())  # 得到 ["hello", "world", "spark", "rdd"]
  1. groupByKey()****(僅PairRDD)
    按Key分組:$ {(k_i,v_j)} \rightarrow (k_i, [v_{j1},v_{j2}...]) $
pair_rdd = sc.parallelize([("a",1), ("b",2), ("a",3)])
grouped = pair_rdd.groupByKey()  # 得到 [("a", [1,3]), ("b", [2])]

二、常用行動算子
  1. collect() 返回所有元素到Driver程序
data = rdd.collect()  # [1, 2, 3]
  1. count() 返回元素總數$n$
total = rdd.count()  # 3
  1. reduce(func) 使用結合律函數$f: (T,T) \rightarrow T$聚合
sum_val = rdd.reduce(lambda a,b: a+b)  # 6
  1. foreach(func) 對每個元素應用函數(無返回值)
rdd.foreach(lambda x: print(x))  # 分佈式打印

三、算子特性説明

特性

轉換算子

行動算子

惰性執行



生成新RDD



觸發任務計算



返回值類型

RDD

非RDD

重要提示:轉換算子僅在行動算子觸發時才執行計算,這種設計優化了執行計劃(如流水線優化)。

通過組合算子可實現複雜數據處理:

# 計算文本詞頻
text_rdd = sc.textFile("hdfs://data.txt")
word_counts = text_rdd.flatMap(lambda line: line.split()) \
                     .map(lambda word: (word, 1)) \
                     .reduceByKey(lambda a,b: a+b) \
                     .collect()