Tom,DataBase,80

Tom,Algorithm,50

Tom,DataStructure,60

Jim,DataBase,90

Jim,Algorithm,60

Jim,DataStructure,80

.......

根據給定的數據在spark-shell中通過編程來計算以下內容

(1) 該系總共有多少學生;

val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
val par = lines.map(row=>row.split(",")(0))
val distinct_par = par.distinct() //去重操作
distinct_par.count //取得總數

  

答案為:265 人

(2) 該系共開設來多少門課程;

val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
val par = lines.map(row=>row.split(",")(1))//根據,切分的每行數據的第二列進行map
val distinct_par = par.distinct()//去重
distinct_par.count//取總數

  答案為 8 門

(3) Tom 同學的總成績平均分是多少;

val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
val pare = lines.filter(row=>row.split(",")(0)=="Tom")
pare.foreach(println)
Tom,DataBase,26
Tom,Algorithm,12
Tom,OperatingSystem,16
Tom,Python,40
Tom,Software,60
pare.map(row=>(row.split(",")(0),row.split(",")(2).toInt))
.mapValues(x=>(x,1)).//mapValues是對值的操作,不操作key使數據變成(Tom,(26,1))
reduceByKey((x,y) => (x._1+y._1,x._2 + y._2))//接着需要按key進行reduce,讓key合併當將Tom進行reduce後 這裏的(x,y) 表示的是(26,1)(12,1)
.mapValues(x => (x._1 / x._2))//接着要對value進行操作,用mapValues()就行啦
.collect()
//res9: Array[(String, Int)] = Array((Tom,30))

  Tom 同學的平均分為 30 分

(4) 求每名同學的選修的課程門數;

val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
val pare = lines.map(row=>(row.split(",")(0),row.split(",")(1)))
pare.mapValues(x => (x,1))//數據變為(Tom,(DataBase,1)),(Tom,(Algorithm,1)),(Tom,(OperatingSystem,1)),(Tom,(Python,1)),(Tom,(Software,1))
.reduceByKey((x,y) => (" ",x._2 + y._2))//數據變為(Tom,( ,5))
.mapValues(x =>x._2)//數據變為(Tom, 5)
.foreach(println)

  答案共 265 行

(5) 該系 DataBase 課程共有多少人選修

val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
val pare = lines.filter(row=>row.split(",")(1)=="DataBase")filter方法允許你提供一個判斷條件(函數),來過濾集合元素
pare.count
res1: Long = 126

  答案為 126 人

(6) 各門課程的平均分是多少;

val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
val pare = lines.map(row=>(row.split(",")(1),row.split(",")(2).toInt))
pare.mapValues(x=>(x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()
res0: Array[(String, Int)] = Array((Python,57), (OperatingSystem,54), (CLanguage,50),
(Software,50), (Algorithm,48), (DataStructure,47), (DataBase,50), (ComputerNetwork,51))

  答案為: (CLanguage,50) (Python,57) (Software,50) (OperatingSystem,54) (Algorithm,48) (DataStructure,47) (DataBase,50) (ComputerNetwork,51)

(7)使用累加器計算共有多少人選了 DataBase 這門課。

val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
val pare = lines.filter(row=>row.split(",")(1)=="DataBase").map(row=>(row.split(",")(1),1))
val accum = sc.longAccumulator("My Accumulator")//累加器函數Accumulator
pare.values.foreach(x => accum.add(x))
accum.value
res19: Long = 126

  答案:共有 126 人

2.編寫獨立應用程序實現數據去重

對於兩個輸入文件 A 和 B,編寫 Spark 獨立應用程序,對兩個文件進行合併,並剔除其
中重複的內容,得到一個新文件 C。下面是輸入文件和輸出文件的一個樣例,供參考。
輸入文件 A 的樣例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z
輸入文件 B 的樣例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根據輸入的文件 A 和 B 合併得到的輸出文件 C 的樣例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z

  eclipse代碼

package my.scala
import org.apache.spark.{SparkConf, SparkContext}
object case2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("reduce")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    //獲取數據
    val two = sc.textFile("hdfs://192.168.85.128:9000/quchong")
    two.filter(_.trim().length>0) //需要有空格。
        .map(line=>(line.trim,""))//全部值當key,(key value,"")
          .groupByKey()//groupByKey,過濾重複的key value ,發送到總機器上彙總
              .sortByKey() //按key value的自然順序排序
                  .keys.collect().foreach(println) //所有的keys變成數組再輸出
    //第二種有風險
    two.filter(_.trim().length>0)
          .map(line=>(line.trim,"1"))
            .distinct()
                .reduceByKey(_+_)
                    .sortByKey()
                        .foreach(println)

    //reduceByKey,在本機suffle後,再發送一個總map,發送到一個總機器上彙總,(彙總要壓力小)
    //groupByKey,發送本機所有的map,在一個機器上彙總(彙總壓力大)
    //如果數據在不同的機器上,則會出現先重複數據,distinct,reduceBykey,只是在本機上去重,謹慎一點的話,在reduceByKey後面需要加多一個distinct

  }
}

  

3.編寫獨立應用程序實現求平均值問題
每個輸入文件表示班級學生某個學科的成績,每行內容由兩個字段組成,第一個是學生
名字,第二個是學生的成績;編寫 Spark 獨立應用程序求出所有學生的平均成績,並輸出到
一個新文件中。下面是輸入文件和輸出文件的一個樣例,供參考。
Algorithm 成績:
小明 92
小紅 87
小新 82
小麗 90
Database 成績:
小明 95
小紅 81
小新 89
小麗 85
Python 成績:
小明 82
小紅 83
小新 94
小麗 91
平均成績如下:
 (小紅,83.67)
 (小新,88.33)
 (小明,89.67)
(小麗,88.67)

  

package my.scala
import org.apache.spark.{SparkConf, SparkContext}
object pingjunzhi {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("reduce")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
  
val fourth = sc.textFile("hdfs://192.168.85.128:9000/pingjunzhi")
 
val res = fourth.filter(_.trim().length>0).map(line=>(line.split("\t")(0).trim(),line.split("\t")(1).trim().toInt)).groupByKey().map(x => {
   var num = 0.0
   var sum = 0 
   for(i <- x._2){
    sum = sum + i
    num = num +1
   }
   val avg = sum/num 
   val format = f"$avg%1.2f".toDouble
   (x._1,format)
 }).collect.foreach(x => println(x._1+"\t"+x._2))
  }
}

  

 

文學使思想充滿血與肉,他比科學和哲學更能給予思想以巨大的明確性和説明性。