I. RDD Operations


1. Creation operations


① Creating an RDD from a file


Each line of the file becomes one element of the RDD:


a. From a local file

// Format: sc.textFile("file://<absolute path of the local file>")

val rdd = sc.textFile("file:///home/centos7/infos.txt")


b. From a file on HDFS


// Format 1: sc.textFile("hdfs://<absolute HDFS path>")
val rdd = sc.textFile("hdfs:///user/centos7/infos.txt")

// Format 2: sc.textFile("<absolute HDFS path>")
val rdd = sc.textFile("/user/centos7/infos.txt")

// Format 3: sc.textFile("<relative path>"); the relative path is resolved against "/user/<username>/" by default
val rdd = sc.textFile("infos.txt")


② Creating an RDD from a parallelized collection

// sc.parallelize(arr); arr must be a collection, array, or sequence
val rdd2 = sc.parallelize(arr)


2. Transformations (transformation operators)

① map(func):

Passes each element to the function func and returns the results as a new dataset (the new RDD has the same number of elements as the original RDD).


② flatMap(func):

Passes each element to the function func and "flattens" the results into a new dataset (the number of elements in the new RDD bears no fixed relationship to the number in the original RDD).


// Create an array
scala> val arr = Array("zhangsan lisi wangwu","zhaoliu")
arr: Array[String] = Array(zhangsan lisi wangwu, zhaoliu)

// Convert the array into an RDD
scala> val rdd4 = sc.parallelize(arr)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:26

// View all elements of the RDD
scala> rdd4.collect
res19: Array[String] = Array(zhangsan lisi wangwu, zhaoliu)

// The map operator
scala> rdd4.map(_.split(" ")).collect
res20: Array[Array[String]] = Array(Array(zhangsan, lisi, wangwu), Array(zhaoliu))

// The flatMap operator
scala> rdd4.flatMap(_.split(" ")).collect
res21: Array[String] = Array(zhangsan, lisi, wangwu, zhaoliu)


③ filter(func):

func must return a Boolean; each element is passed to func, and a new RDD containing only the elements for which func returns true is returned.


// Create an RDD
scala> val rdd2 = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:24

// View all elements of the RDD
scala> rdd2.collect
res30: Array[Int] = Array(1, 2, 3, 4, 5, 6)

// Filter: keep the elements that satisfy the condition _%2==0
scala> rdd2.filter(_%2==0).collect
res31: Array[Int] = Array(2, 4, 6)

scala> rdd2.filter(_%2!=0).collect
res32: Array[Int] = Array(1, 3, 5)

scala> rdd2.filter(_<=3).collect
res33: Array[Int] = Array(1, 2, 3)


④ groupByKey():

Groups together the values that share the same key; it can only be applied to a key-value (pair) RDD.


// Create an RDD
scala> val rdd5 = sc.parallelize(List("hadoop","spark","spark"))
rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[35] at parallelize at <console>:24

// View all elements of the RDD
scala> rdd5.collect
res35: Array[String] = Array(hadoop, spark, spark)

// Turn every element of the RDD into a key-value pair (a tuple); an RDD whose elements are key-value pairs is called a pair RDD
scala> rdd5.map((_,1)).collect
res36: Array[(String, Int)] = Array((hadoop,1), (spark,1), (spark,1))

// Apply the groupByKey operator to group the values that share the same key
scala> rdd5.map((_,1)).groupByKey().collect
res41: Array[(String, Iterable[Int])] = Array((spark,CompactBuffer(1, 1)), (hadoop,CompactBuffer(1)))


⑤ reduceByKey(func):

Combines the values that share the same key by calling func; func must take two parameters.


// Create an RDD
scala> val rdd5 = sc.parallelize(List("hadoop","spark","spark"))
rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[35] at parallelize at <console>:24

// View all elements of the RDD
scala> rdd5.collect
res35: Array[String] = Array(hadoop, spark, spark)

// Turn every element of the RDD into a key-value pair (a tuple); an RDD whose elements are key-value pairs is called a pair RDD
scala> rdd5.map((_,1)).collect
res36: Array[(String, Int)] = Array((hadoop,1), (spark,1), (spark,1))

// Apply reduceByKey(_+_) to add up the values that share the same key
// _+_  is shorthand for  (x,y) => x + y
scala> rdd5.map((_,1)).reduceByKey(_+_).collect
res48: Array[(String, Int)] = Array((spark,2), (hadoop,1))


3. Control operations

Persisting data

① rdd.cache():

Keeps the RDD in memory for later reuse; equivalent to calling rdd.persist(MEMORY_ONLY).


② rdd.persist(MEMORY_ONLY)

Keeps the RDD in memory for later reuse.


③ rdd.persist(MEMORY_AND_DISK)

Stores the RDD as deserialized objects in the JVM; if memory is insufficient, the partitions that do not fit are spilled to disk.


④ rdd.unpersist()

Manually removes the persisted RDD from the cache.
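
A minimal sketch of how these calls fit together in a spark-shell session (the storage levels come from org.apache.spark.storage.StorageLevel; the sample data is arbitrary):

// Persist, reuse, and un-persist an RDD (a sketch, not transcript output)
import org.apache.spark.storage.StorageLevel

val nums = sc.parallelize(1 to 100)

nums.cache()                                  // same as nums.persist(StorageLevel.MEMORY_ONLY)
nums.count                                    // first action: computes and caches the RDD
nums.count                                    // second action: served from the cache

nums.unpersist()                              // remove the persisted data from the cache

nums.persist(StorageLevel.MEMORY_AND_DISK)    // partitions that do not fit in memory spill to disk
nums.count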


4. Actions (action operators)

① collect

Returns all elements of the RDD; if the RDD is very large, collect is not recommended.

scala> val rdd6 = sc.parallelize(Array("spark","hadoop","scala"))
rdd6: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[55] at parallelize at <console>:24

scala> rdd6.collect
res57: Array[String] = Array(spark, hadoop, scala)


② first

Returns the first element of the RDD.

scala> rdd6.first
res59: String = spark


③ take(n)

Returns the first n elements of the RDD.

scala> rdd6.take(2)
res60: Array[String] = Array(spark, hadoop)

scala> rdd6.take(1)
res61: Array[String] = Array(spark)

scala> rdd6.take(3)
res62: Array[String] = Array(spark, hadoop, scala)

scala> rdd6.take(10000)
res63: Array[String] = Array(spark, hadoop, scala)


④ count

Returns the number of elements in the RDD.

scala> rdd.count
res64: Long = 5
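
The rdd counted here presumably comes from an earlier part of the shell session. Applied to rdd6 from the collect example above, a quick sketch (not transcript output) would be:

rdd6.count    // returns 3, since rdd6 holds "spark", "hadoop", "scala"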


⑤ reduce(func)

Aggregates the elements of the dataset with the function func, which takes two arguments and returns one value. Note that if func is not associative and commutative (such as the subtraction below), the result depends on how the elements are partitioned.

scala> val rdd7 = sc.parallelize(List(1,2,3,4,5,6,7,8,9))
rdd7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[58] at parallelize at <console>:24

scala> rdd7.reduce(_+_)
res66: Int = 45

scala> rdd7.reduce(_-_)
res67: Int = 17


⑥ foreach(func)

Iterates over the dataset, passing each element to the function func.

scala> rdd6.foreach(println(_))
spark
hadoop
scala


Exercises:

Exercise 1. Total sales

A bookstore has recorded five days of sales for its spark and hadoop books; compute each book's total sales over the five days.



// Data format
//("spark",24),("spark",44),("spark",17),("spark",22),("spark",31)
//("hadoop",16),("hadoop",33),("hadoop",21),("hadoop",22),("hadoop",18)

// Define the array
scala> var arr = Array(("spark",24),("spark",44),("spark",17),("spark",22),("spark",31),("hadoop",16),("hadoop",33),("hadoop",21),("hadoop",22),("hadoop",18))
arr: Array[(String, Int)] = Array((spark,24), (spark,44), (spark,17), (spark,22), (spark,31), (hadoop,16), (hadoop,33), (hadoop,21), (hadoop,22), (hadoop,18))

// Convert it into an RDD
scala> var rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:26

// View the RDD contents
scala> rdd.collect
res0: Array[(String, Int)] = Array((spark,24), (spark,44), (spark,17), (spark,22), (spark,31), (hadoop,16), (hadoop,33), (hadoop,21), (hadoop,22), (hadoop,18))

// Add up the values that share the same key
scala> rdd.reduceByKey(_+_).collect
res2: Array[(String, Int)] = Array((spark,138), (hadoop,110))


Exercise 2. Word count

Given the following passage:

What is Apache Spark

Apache Spark is a multi-language engine for executing data engineering data science

Apache Spark integrates with your favorite frameworks helping to scale them to thousands of machines


Count how many times each word appears in this passage.


// Create an RDD from the file
scala> val rdd = sc.textFile("file:///home/centos7/word.txt")
rdd: org.apache.spark.rdd.RDD[String] = file:///home/centos7/word.txt MapPartitionsRDD[4] at textFile at <console>:24

// View the RDD contents
scala> rdd.collect
res3: Array[String] = Array("What is Apache Spark ", Apache Spark is a multi-language engine for executing data engineering data science, Apache Spark integrates with your favorite frameworks helping to scale them to thousands of machines)

// flatMap(_.split(" ")): split each line into words
scala> rdd.flatMap(_.split(" ")).collect
res4: Array[String] = Array(What, is, Apache, Spark, Apache, Spark, is, a, multi-language, engine, for, executing, data, engineering, data, science, Apache, Spark, integrates, with, your, favorite, frameworks, helping, to, scale, them, to, thousands, of, machines)

// map(x=>(x,1)): turn each word into a key-value pair, tagging every word with a 1
scala> rdd.flatMap(_.split(" ")).map(x=>(x,1)).collect
res5: Array[(String, Int)] = Array((What,1), (is,1), (Apache,1), (Spark,1), (Apache,1), (Spark,1), (is,1), (a,1), (multi-language,1), (engine,1), (for,1), (executing,1), (data,1), (engineering,1), (data,1), (science,1), (Apache,1), (Spark,1), (integrates,1), (with,1), (your,1), (favorite,1), (frameworks,1), (helping,1), (to,1), (scale,1), (them,1), (to,1), (thousands,1), (of,1), (machines,1))

// reduceByKey(_+_): add up the values that share the same key
scala> rdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect
res7: Array[(String, Int)] = Array((scale,1), (is,2), (executing,1), (Apache,3), (with,1), (data,2), (science,1), (integrates,1), (machines,1), (multi-language,1), (What,1), (them,1), (engine,1), (favorite,1), (Spark,3), (a,1), (helping,1), (to,2), (engineering,1), (frameworks,1), (of,1), (for,1), (thousands,1), (your,1))



II. Partitions

1. The action saveAsTextFile

saveAsTextFile("path")

Saving to the given path produces a directory, not a single file; the directory contains two kinds of files:

① part-00000, ...: the part-* files hold the data; the number of part files is determined by the number of partitions of the RDD, one part file per partition

② _SUCCESS: an empty (size 0) marker file indicating that the job succeeded
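
A minimal sketch of the behaviour described above (the output path is just a placeholder and must not already exist):

// Save a 2-partition RDD; the result is a directory, not a single file
val rdd = sc.parallelize(List("hadoop", "spark", "spark"), 2)

// Produces the directory /home/centos7/output containing
// part-00000 and part-00001 (one file per partition) plus _SUCCESS
rdd.saveAsTextFile("file:///home/centos7/output")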


2. Partitions


// View the number of partitions
scala> rdd.partitions.size
res10: Int = 2

// Change the number of partitions

scala> val rdd2 = rdd.repartition(4)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at repartition at <console>:26

scala> rdd2.partitions.size
res14: Int = 4


3. What the default number of partitions depends on

How spark-shell was started:

spark-shell --master <master-url>

(Figure: the default number of partitions for each master URL.)
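
As a rough rule of thumb (general Spark behaviour, stated here as an assumption rather than read from the figure): --master local[N] gives N partitions by default, local[*] gives the number of CPU cores, and a standalone or YARN cluster gives at least 2. The value actually in effect can be checked from the shell:

// Default parallelism used by sc.parallelize when no partition count is given
sc.defaultParallelism

val rdd = sc.parallelize(1 to 10)    // no partition count supplied
rdd.partitions.size                  // equals sc.defaultParallelism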


4. Specifying the number of partitions when creating an RDD


scala> val arr = Array(1,2,3,4,5,6)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6)

// Specify 4 partitions when creating the RDD
scala> val rdd = sc.parallelize(arr,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala> rdd.partitions.size
res2: Int = 4

// Specify 3 partitions when creating the RDD
scala> val rdd2 = sc.textFile("file:///home/centos7/word.txt",3)
rdd2: org.apache.spark.rdd.RDD[String] = file:///home/centos7/word.txt MapPartitionsRDD[3] at textFile at <console>:24

scala> rdd2.partitions.size
res3: Int = 3


III. Pair RDDs (key-value RDDs)

Definition: every element of the RDD is a key-value pair (key, value).


1. keys


Returns all the keys of a pair RDD.

// Create an array
scala> var arr = Array(("spark",24),("spark",44),("hadoop",16))
arr: Array[(String, Int)] = Array((spark,24), (spark,44), (hadoop,16))

// Create a pair RDD
scala> val rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:26

// View all elements of the RDD
scala> rdd.collect
res4: Array[(String, Int)] = Array((spark,24), (spark,44), (hadoop,16))

// keys: return all the keys of the pair RDD
scala> rdd.keys.collect
res6: Array[String] = Array(spark, spark, hadoop)


2. values


values: returns all the values of a pair RDD.


// Create an array
scala> var arr = Array(("spark",24),("spark",44),("hadoop",16))
arr: Array[(String, Int)] = Array((spark,24), (spark,44), (hadoop,16))

// Create a pair RDD
scala> val rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:26

// View all elements of the RDD
scala> rdd.collect
res4: Array[(String, Int)] = Array((spark,24), (spark,44), (hadoop,16))

// values: return all the values of the pair RDD
scala> rdd.values.collect
res8: Array[Int] = Array(24, 44, 16)


3. sortByKey() and sortBy()


sortByKey(): sorts by key.

sortBy(): sorts by a user-specified field.


scala> var arr = Array(("spark",24),("javascript",44),("hadoop",16))
arr: Array[(String, Int)] = Array((spark,24), (javascript,44), (hadoop,16))

scala> var rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:26

scala> rdd.collect
res14: Array[(String, Int)] = Array((spark,24), (javascript,44), (hadoop,16))

// Sort by key in lexicographic order (descending)
scala> rdd.sortByKey(false).collect
res15: Array[(String, Int)] = Array((spark,24), (javascript,44), (hadoop,16))

// Sort by key in lexicographic order (ascending)
scala> rdd.sortByKey().collect
res16: Array[(String, Int)] = Array((hadoop,16), (javascript,44), (spark,24))

// Sort by the value of each pair (ascending by default)
scala> rdd.sortBy(_._2).collect
res24: Array[(String, Int)] = Array((hadoop,16), (spark,24), (javascript,44))

// Sort by the key of each pair (ascending by default)
scala> rdd.sortBy(_._1).collect
res25: Array[(String, Int)] = Array((hadoop,16), (javascript,44), (spark,24))

// Sort by the value of each pair (descending)
scala> rdd.sortBy(_._2,false).collect
res26: Array[(String, Int)] = Array((javascript,44), (spark,24), (hadoop,16))


4. mapValues(func)


Applies the function func to the value of each key-value pair.

scala> var arr = Array(("spark",24),("javascript",44),("hadoop",16))
arr: Array[(String, Int)] = Array((spark,24), (javascript,44), (hadoop,16))

scala> var rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:26

scala> rdd.collect
res14: Array[(String, Int)] = Array((spark,24), (javascript,44), (hadoop,16))

// Multiply every value by 10
scala> rdd.mapValues(_*10).collect
res28: Array[(String, Int)] = Array((spark,240), (javascript,440), (hadoop,160))

// Add 1 to every value
scala> rdd.mapValues(_+1).collect
res30: Array[(String, Int)] = Array((spark,25), (javascript,45), (hadoop,17))


5. join


join is an inner join: given two input datasets of type (K, V1) and (K, V2), only the keys that appear in both datasets are emitted, producing a dataset of type (K, (V1, V2)).


scala> var arr = Array(("spark",24),("javascript",44),("hadoop",16))
arr: Array[(String, Int)] = Array((spark,24), (javascript,44), (hadoop,16))

scala> var rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[37] at parallelize at <console>:26

scala> var arr2 = Array(("spark",24),("hadoop",16))
arr2: Array[(String, Int)] = Array((spark,24), (hadoop,16))

scala> var rdd2 = sc.parallelize(arr2)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:26

scala> rdd.collect
res32: Array[(String, Int)] = Array((spark,24), (javascript,44), (hadoop,16))

scala> rdd2.collect
res33: Array[(String, Int)] = Array((spark,24), (hadoop,16))

// Join rdd with rdd2; only keys present in both RDDs are kept
scala> rdd.join(rdd2)
res34: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[41] at join at <console>:33

scala> rdd.join(rdd2).collect
res35: Array[(String, (Int, Int))] = Array((spark,(24,24)), (hadoop,(16,16)))