文章目錄

  • RDD簡介
  • RDD常用的算子
  • 通過並行化scala集合創建RDD
  • union求並集
  • intersection求交集
  • join(連接)
  • groupByKey
  • cogroup
  • cartesian笛卡爾積
  • WordCount
  • RDD中常見的Action方法
  • collect
  • reduce
  • count
  • top排序取前兩個
  • take從頭開始取前兩個
  • first(similer to take(1))從頭開始取第一個
  • takeOrdered取前三個
  • RDD分區的數量取決於那些因素
  • RDD中複雜的算子
  • mapPartitionsWithIndex
  • aggregate(初始值)(運算邏輯)
  • AggregateByKey
  • collectAsMap
  • countByKey
  • filterByRange
  • flatMapValues
  • foldByKey
  • foreachPartition
  • foreach
  • RDD分類
  • Transformation包括
  • Action包括

RDD簡介

Spark問題定位並且解決_#Spark


使用spark-submit客户端實例:

Spark問題定位並且解決_List_02


Spark問題定位並且解決_Spark問題定位並且解決_03


注意:如果執行的操作是TransForMation類型,即便有錯誤也可以執行(這裏的錯誤指的是非語法錯誤),等到執行Action操作時,才會拋出異常。

  • Internally, each RDD is characterized by five main properties:(RDD特徵)

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for
  • an HDFS file)

1.RDD是一個基本的抽象,操作RDD就像操作一個本地集合一樣,降低了編程的複雜度。
RDD的方法分為兩類:Transformation(懶 lazy), Action
RDD不存在真正要計算的數據,而是記錄了RDD的轉換關係(調用了什麼方法,傳輸什麼函數)

創建RDD有那些方式呢?

1》通過外部的存儲系統創建RDD

2》將Driver的Scala集合通過並行化的方式編程RDD(測試,實驗),集合中存放的數據是有限的,生產環境沒有意義。如下圖

3》調用一個已經存在了的RDD的Transformation,會產生一個新的RDD

RDD的Transformation的特點:

1》lazy

2》生成新的RDD

Spark問題定位並且解決_d3_04


Spark問題定位並且解決_數據_05

RDD常用的算子

通過並行化scala集合創建RDD

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(*2).sortBy(x=>x,true)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(
*2).sortBy(x=>x+"",true)

Spark問題定位並且解決_Spark問題定位並且解決_06


注意:sortBy()方法中只是傳遞的排序規則,並不會改變新生成RDD的數據類型。

val rdd4 = sc.parallelize(Array(“a b c”, “d e f”, “h i j”))

rdd4.flatMap(_.split(’ ')).collect

Spark問題定位並且解決_d3_07

val rdd5 = sc.parallelize(List(List(“a b c”, “a b b”),List(“e f g”, “a f g”), List(“h i j”, “a a b”)))

Spark問題定位並且解決_#Spark_08


注意:第一個flatMap方法是RDD的方法,第二個flatMap方法是List的方法。

union求並集

注意類型要一致

val rdd6 = sc.parallelize(List(5,6,4,7))

val rdd7 = sc.parallelize(List(1,2,3,4))

val rdd8 = rdd6.union(rdd7)

rdd8.distinct.sortBy(x=>x).collect

執行結果:res9: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7)

Spark問題定位並且解決_Spark問題定位並且解決_09

intersection求交集

val rdd9 = rdd6.intersection(rdd7)

Spark問題定位並且解決_List_10

join(連接)

val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 2), (“kitty”, 3)))

val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7), (“tom”, 2)))

val rdd3 = rdd1.join(rdd2)

res0: Array[(String, (Int, Int))] = Array((tom,(1,8)), (tom,(1,2)), (jerry,(2,9)))

#左外連接

val rdd3 = rdd1.leftOuterJoin(rdd2)

res0: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(2))), (tom,(1,Some(8))), (kitty,(3,None)), (jerry,(2,Some(9))))

#右外連接

val rdd3 = rdd1.rightOuterJoin(rdd2)

res1: Array[(String, (Option[Int], Int))] = Array((shuke,(None,7)), (tom,(Some(1),8)), (tom,(Some(1),2)), (jerry,(Some(2),9)))

Spark問題定位並且解決_數據_11

groupByKey

val rdd3 = rdd1 union rdd2
rdd3.groupByKey
res4: Array[(String, Iterable[Int])] = Array((shuke,CompactBuffer(7)), (tom,CompactBuffer(1, 8, 2)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(2, 9)))

rdd3.groupByKey.map(x=>(x._1,x.2.sum))
rdd3.groupByKey.mapValues(
.sum).collect

Spark問題定位並且解決_#Spark_12

cogroup

val rdd1 = sc.parallelize(List((“tom”, 1), (“tom”, 2), (“jerry”, 3), (“kitty”, 2)))

val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2)))

val rdd3 = rdd1.cogroup(rdd2)

val rdd4 = rdd3.map(t=>(t._1, t._2._1.sum + t._2._2.sum))

Spark問題定位並且解決_Spark問題定位並且解決_13

cartesian笛卡爾積

val rdd1 = sc.parallelize(List(“tom”, “jerry”))

val rdd2 = sc.parallelize(List(“tom”, “kitty”, “shuke”))

val rdd3 = rdd1.cartesian(rdd2)

Spark問題定位並且解決_d3_14

WordCount

sc.textFile("/root/words.txt").flatMap(x=>x.split(" ")).map((,1)).reduceByKey(+).sortBy(.2,false).collect
sc.textFile("/root/words.txt").flatMap(x=>x.split(" ")).map((
,1)).groupByKey.map(t=>(t._1, t._2.sum)).collect

RDD中常見的Action方法

val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)

collect

rdd1.collect

reduce

val r = rdd1.reduce(+)

count

rdd1.count

top排序取前兩個

rdd1.top(2)

take從頭開始取前兩個

rdd1.take(2)

first(similer to take(1))從頭開始取第一個

rdd1.first

takeOrdered取前三個

rdd1.takeOrdered(3)

RDD分區的數量取決於那些因素

1.如果是將Driver端的Scala集合並行化創建RDD,並沒有指定RDD的分區,RDD的分區是為該app分配的中的核數。
2.從hdfs中讀取數據創建RDD,並且設置允許最小分區數量是1,RDD的分區數量為輸入切片的數量(一個數據源文件生成一個分區文件)。不設置最小允許分區的數量,即spark調用textFile時會默認傳入2(一個數據源文件生成2個分區文件)。
3.需要讀取的文件數量較多,輸入切片的數量等同於生成新分區的數量。
4.理想文件大小=數據源總大小/最小分區數量;第n個數據源文件大小/理想文件大小>1.1;將為第n個數據源文件分配2個輸入切片;新的分區文件數量>數據源文件數量。

RDD中複雜的算子

Spark問題定位並且解決_Spark問題定位並且解決_15


RDD上面有3個分區,每個分區中記錄的是讀取那部分數據,SparkSubmit會為每個分區生成對應一個Task,把Task發送到Executor,在Executor上面執行,Executor會一次拿出來一個分區。

mapPartitionsWithIndex

transformation 一次拿出來一個分區,(分區中並沒有數據,而是記錄要讀取那些數據,真正生成的Task會讀取多條數據),並且可以將分區的編號取出來。

功能:取分區中對應的數據時,還可以將分區的編號取出來,這樣就可以知道哪一個分區中有那些數據。

val func = (index: Int, it: Iterator[Int]) => {

it.map(e => s"part: $index, ele: $e")

}

Spark問題定位並且解決_Spark問題定位並且解決_16

aggregate(初始值)(運算邏輯)

action

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)

Spark問題定位並且解決_d3_17


第一個函數在分區內做局部聚合,第二個函數將不用分區的值進行疊加。

Spark問題定位並且解決_#Spark_18


每個分區中計算的時候會運用一下初始值,最後聚合的時候也會運用初始值。

rdd1.aggregate(5)(math.max(_, _), _ + _)

5=math.max(1,2,3,4,5) 9=math.max(5,5,6,7,8,9) 分區內

19 = 5(初始值) + 5(max) + 9(max) 聚合val rdd2 = sc.parallelize(List(“a”,“b”,“c”,“d”,“e”,“f”),2)

Spark問題定位並且解決_#Spark_19


出現上面這種情況的原因是:每一個Task都是並行計算的,那個先返回不能確定。

Spark問題定位並且解決_List_20


val rdd3 = sc.parallelize(List(“12”,“23”,“345”,“4567”),2)

Spark問題定位並且解決_Spark問題定位並且解決_21

val rdd4 = sc.parallelize(List(“12”,“23”,“345”,""),2)

rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

Spark問題定位並且解決_d3_22


第一個分區:math.min("".length, “12”.length).toString => “0”

math.min(“0”.length, “23”.length).toString => “1”

第一分區結果為:“1”

第二個分區:math.min("".length, “345”.length).toString => “0”

math.min(“0”.length. “”.length).toString => “0”

第二分區結果為:“0”

所以結果有兩種情況:“10"或"01”val rdd5 = sc.parallelize(List(“12”,“23”,"",“345”),2)

rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

Spark問題定位並且解決_List_23

AggregateByKey

transformation

val pairRDD = sc.parallelize(List( (“cat”,2), (“cat”, 5), (“mouse”, 4),(“cat”, 12), (“dog”, 12), (“mouse”, 2)), 2)

pairRDD.aggregateByKey(0)(math.max(_, _), _ + ).collect

Spark問題定位並且解決_d3_24


第一個
+是將同一分區,相同key的value向加。第二個+_是將不同分區內的,相同key的value相加。

注意:AggregateByKey方法的初始值只會在第一次相加的時候應用一次。

Spark問題定位並且解決_d3_25

collectAsMap

作用:將結果以Map的形式收集到客户端。

val rdd = sc.parallelize(List((“a”, 1), (“b”, 2)))

rdd.collectAsMap

Spark問題定位並且解決_List_26


Spark問題定位並且解決_d3_27

countByKey

val rdd1 = sc.parallelize(List((“a”, 1), (“b”, 2), (“b”, 2), (“c”, 2), (“c”, 1)))

rdd1.countByKey

rdd1.countByValue

Spark問題定位並且解決_List_28

filterByRange

作用:過濾指定範圍內的數據,前後都是閉區間。

val rdd1 = sc.parallelize(List((“e”, 5), (“c”, 3), (“d”, 4), (“c”, 2), (“a”, 1)))

val rdd2 = rdd1.filterByRange(“b”, “d”)

rdd2.collect

Spark問題定位並且解決_List_29

flatMapValues

val rdd3 = sc.parallelize(List((“a”, “1 2”), (“b”, “3 4”)))

rdd3.flatMapValues(_.split(" "))

Spark問題定位並且解決_d3_30

foldByKey

作用:將key相同的value進行聚合。

val rdd1 = sc.parallelize(List(“dog”, “wolf”, “cat”, “bear”), 2)

val rdd2 = rdd1.map(x => (x.length, x))

val rdd3 = rdd2.foldByKey("")(+)

等同於:rdd2.reduceByKey(+).collect

等同於:rdd2.aggregateByKey(+).collect

聚合操作底層方法是:combineByKeyWithClassTag

Spark問題定位並且解決_Spark問題定位並且解決_31

foreachPartition

action 每次打印一個分區的數據
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.foreachPartition(x => println(x.reduce(_ + _)))
rdd1.foreachPartition(it => it.foreach(x => println(x * 10000)))
數據會打印到Executor的日誌文件中。

foreach

每次打印一條數據,在日誌文件中。

RDD分類

Transformation包括

aggregateByKey
reduceByKey
filter
flatMap
map
mapPartition
mapPartitionWithIndex

Action包括

collect
aggregate
saveAsTextFile
foreach
foreachPartition

  • A list of partitions (一系列分區,分區有編號,有順序的)
  • A function for computing each split (每一個切片都會有一個函數作業在上面用於對數據進行處理)
  • A list of dependencies on other RDDs (RDD和RDD之間存在依賴關係)
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    (可選,key value類型的RDD才有RDD[(K,V)])如果是kv類型的RDD,會一個分區器,默認是hash-partitioned
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for
  • an HDFS file)
    (可以,如果是從HDFS中讀取數據,會得到數據的最優位置(向Namenode請求元數據))