目錄


  • 一、Spark Core
  • 1、什麼是Spark?特點
  • 二、安裝和部署Spark、Spark 的 HA
  • 1、spark體系結構
  • 2、spark的搭建
  • 3、Spark的 HA
  • 三、執行Spark的任務:兩個工具
  • 1、spark-submit:用於提交Spark的任務
  • 2、spark-shell 相當於REPL
  • 四、WordCount(scala版本和java版本)
  • 1、scala版本的WordCount
  • 2、java版本的WordCount
  • 五、分析Spark的任務流程
  • 1、分析WordCount程序處理過程
  • 2、Spark調度任務的過程
  • 六、RDD和RDD特性、RDD的算子
  • 1、RDD:彈性分佈式數據集
  • 2、 算子
  • 3、RDD的集合運算
  • 4、分組操作:reduceByKey
  • 5、cogroup
  • 6、reduce操作(Action)
  • 7、需求:按照value排序
  • 七、RDD的高級算子
  • 1、mapPartitionsWithIndex
  • 2、aggregate
  • 八、編程案例
  • 1、分析日誌
  • 2、創建自定義分區
  • 3、使用JDBCRDD 操作數據庫
  • 4、操作數據庫:把結果存放到數據庫中


Spark Core

Spark生態圈:
Spark Core : RDD(彈性分佈式數據集)
Spark SQL
Spark Streaming
Spark MLLib :協同過濾,ALS,邏輯迴歸等等 –> 機器學習
Spark Graphx : 圖計算

一、Spark Core

1、什麼是Spark?特點

Apache Spark™ is a unified analytics engine for large-scale data processing.
特點:快、易用、通用性、兼容性(完全兼容Hadoop)

快:快100倍(Hadoop 3 之前)
易用:支持多種語言開發
通用性:生態系統全
易用性:兼容Hadoop

二、安裝和部署Spark、Spark 的 HA

1、spark體系結構

Spark的運行方式

Yarn

Standalone:本機調試(demo)

Worker:從節點。每個服務器上,資源和任務的管理者。只負責管理一個節點

執行過程:
一個Worker 有多個 Executor。 Executor是任務的執行者,按階段(stage)劃分任務。————> RDD

客户端:Driver Program 提交任務到集羣中
1)spark-submit
2)spark-shell

2、spark的搭建

1)準備工作:JDK 配置主機名 免密碼登錄

2)偽分佈式模式
在一台虛擬機上模擬分佈式環境(Master和Worker在一個節點上)
配置spark-env.sh
vi spark-env.sh

配置slaves
vi slaves
hsiehchou121

瀏覽器訪問hsiehchou121:8080

在spark中使用scala語言

3)全分佈式環境
修改slave文件 拷貝到其他三台服務器 啓動

3、Spark的 HA

回顧HA(高可用)
(*)HDFS Yarn Hbase Spark 主從結構
(*)單點故障

(1)基於文件目錄的單點恢復
主要用於開發或測試環境。當spark提供目錄保存spark Application和worker的註冊信息,並將他們的恢復狀態寫入該目錄中,這時,一旦Master發生故障,就可以通過重新啓動Master進程(sbin/start-master.sh),恢復已運行的spark Application和worker的註冊信息

基於文件系統的單點恢復,主要是在spark-en.sh裏對SPARK_DAEMON_JAVA_OPTS設置

配置參數

參考值

spark.deploy.recoveryMode

設置為FILESYSTEM開啓單點恢復功能,默認值:NONE

spark.deploy.recoveryDirectory

Spark 保存恢復狀態的目錄

參考:
export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/hd/spark-2.1.0-bin-hadoop2.7/recovery”

(*)本質:還是隻有一個主節點Master,創建了一個恢復目錄,保存集羣狀態和任務的信息
當Master掛掉,重新啓動時,會從恢復目錄下讀取狀態信息,恢復出來原來的狀態

用途:這個只用於開發和測試,但是生產使用用zookeeper

(2)基於Zookeeper :和Hadoop類似
ZooKeeper提供了一個Leader Election機制,利用這個機制可以保證雖然集羣存在多個Master,但是隻有一個是Active的,其他的都是Standby。當Active的Master出現故障時,另外的一個Standby Master會被選舉出來。由於集羣的信息,包括Worker, Driver和Application的信息都已經持久化到ZooKeeper,因此在切換的過程中只會影響新Job的提交,對於正在進行的Job沒有任何的影響

配置參數

參考值

spark.deploy.recoveryMode

設置為ZOOKEEPER開啓單點恢復功能,默認值:NONE

spark.deploy.zookeeper.url

ZooKeeper集羣的地址

spark.deploy.zookeeper.dir

Spark信息在ZK中的保存目錄,默認:/spark

參考:
export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hsiehchou121:2181,hsiehchou122:2181,hsiehchou123:2181,hsiehchou124:2181 -Dspark.deploy.zookeeper.dir=/spark”

(*)複習一下zookeeper:
相當於一個數據庫,把一些信息存放在zookeeper中,比如集羣的信息
數據同步功能,選舉功能,分佈式鎖功能

數據同步:給一個節點中寫入數據,可以同步到其他節點

選舉:Zookeeper中存在不同的角色,Leader Follower。如果Leader掛掉,重新選舉Leader

分佈式鎖:秒殺。以目錄節點的方式來保存數據

修改 spark-env.sh

同步到其他三台服務器
[root@hsiehchou121 spark-2.1.0-bin-hadoop2.7]# scp conf/spark-env.sh hsiehchou122:/root/hd/spark-2.1.0-bin-hadoop2.7/conf
[root@hsiehchou121 spark-2.1.0-bin-hadoop2.7]# scp conf/spark-env.sh hsiehchou123:/root/hd/spark-2.1.0-bin-hadoop2.7/conf
[root@hsiehchou121 spark-2.1.0-bin-hadoop2.7]# scp conf/spark-env.sh hsiehchou124:/root/hd/spark-2.1.0-bin-hadoop2.7/conf

在hsiehchou121 start-all hsiehchou121 master hsiehchou122 Worker hsiehchou123 Worker hsiehchou124 Worker
在hsiehchou121 start-master hsiehchou121 master hsiehchou122 master(standby) hsiehchou122 Worker hsiehchou123 Worker hsiehchou124 Worker

在hsiehchou121 上kill master
hsiehchou122 master(Active) hsiehchou122 Worker hsiehchou123 Worker hsiehchou124 Worker

在網頁http://192.168.116.122:8080/ 可以看到相應信息

三、執行Spark的任務:兩個工具

1、spark-submit:用於提交Spark的任務

任務:jar

舉例:蒙特卡洛求PI(圓周率)

–class指明主程序的名字

其中100指定執行的次數

2、spark-shell 相當於REPL

spark-shell是Spark自帶的交互式Shell程序,方便用户進行交互式編程,用户可以在該命令行下用scala編寫spark程序
(*)啓動Spark Shell:spark-shell
也可以使用以下參數:
參數説明:

例如:

注意:
如果啓動spark shell時沒有指定master地址,但是也可以正常啓動spark shell和執行spark shell中的程序,其實是啓動了spark的local模式,該模式僅在本機啓動一個進程,沒有與集羣建立聯繫

作為一個獨立的Application運行
兩種模式:
(1)本地模式
spark-shell 後面不接任何參數,代表本地模式
./spark-shell
Spark context available as ‘sc’ (master = local[*], app id = local-1554372019995).
sc 是 SparkContext 對象名。 local[*] 代表本地模式,不提交到集羣中運行

(2)集羣模式

提交到集羣運行
Spark context available as ‘sc’ (master = spark://hsiehchou121:7077, app id = app-20190404190030-0000).

master = spark://hsiehchou121:7077
Spark session available as ‘spark’
Spark Session 是 2.0 以後提供的,利用 SparkSession 可以訪問spark所有組件

示例:WordCount程序
程序如下:

説明:
sc是SparkContext對象,該對象時提交spark程序的入口
textFile(“hdfs://192.168.116.121:9000/data.txt”)是hdfs中讀取數據
flatMap(_.split(” “))先map在壓平
map((_,1))將單詞和1構成元組
reduceByKey(+)按照key進行reduce,並將value累加
saveAsTextFile(“hdfs://192.168.116.121:9000/output/wc”)將結果寫入到hdfs中

(*)處理本地文件,把結果打印到屏幕上
vi /root/hd/tmp_files/test_WordCount.txt
I love China
I love Jiangsu
Jiangsu is a beautiful place in China

(*)處理HDFS文件,結果保存在hdfs上

-rw-r–r– 3 root supergroup 0 2019-04-04 19:12 /out/0404/test_WordCount/_SUCCESS
-rw-r–r– 3 root supergroup 16 2019-04-04 19:12 /out/0404/test_WordCount/part-00000
-rw-r–r– 3 root supergroup 65 2019-04-04 19:12 /out/0404/test_WordCount/part-00001

_SUCCESS 代表程序執行成功

part-00000 part-00001 結果文件,分區。裏面內容不重複

(*)單步運行WordCount —-> RDD

RDD 彈性分佈式數據集
(1)依賴關係 : 寬依賴和窄依賴
(2)算子:
函數:
Transformation : 延時計算 map flatMap textFile
Action : 立即觸發計算 collect

説明:scala複習
(*)flatten:把嵌套的結果展開
scala> List(List(2,4,6,8,10),List(1,3,5,7,9)).flatten
res21: List[Int] = List(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)

(*)flatmap : 相當於一個 map + flatten
scala> var myList = List(List(2,4,6,8,10),List(1,3,5,7,9))
myList: List[List[Int]] = List(List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9))

scala> myList.flatMap(x=>x.map(_*2))
res22: List[Int] = List(4, 8, 12, 16, 20, 2, 6, 10, 14, 18)

myList.flatMap(x=>x.map(_*2))

執行過程:
1、將 List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9) 調用 map(_*2) 方法。x 代表一個List
2、flatten
3、在IDE中開發scala版本和Java版本的WorkCount

四、WordCount(scala版本和java版本)

1、scala版本的WordCount

新建一個工程,把jar引入到工程中

export Demo1.jar 點擊下一步,把jar包上傳到服務器上/root/hd/tmp_files/下

在spark裏面的bin目錄下輸入

2、java版本的WordCount

五、分析Spark的任務流程

1、分析WordCount程序處理過程
2、Spark調度任務的過程

提交到及羣眾運行任務時,spark執行任務調度

六、RDD和RDD特性、RDD的算子

1、RDD:彈性分佈式數據集

(*)Spark中最基本的數據抽象
(*)RDD的特性

  • Internally, each RDD is characterized by five main properties:
    *
  • A list of partitions
    1)是一組分區
    RDD由分區組成,每個分區運行在不同的Worker上,通過這種方式來實現分佈式計算
  • A function for computing each split
    在RDD中,提供算子處理每個分區中的數據
  • -A list of dependencies on other RDDs
    RDD存在依賴關係:寬依賴和窄依賴
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    可以自定義分區規則來創建RDD
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
    優先選擇離文件位置近的節點來執行

如何創建RDD
(1)通過SparkContext.parallelize方法來創建

(2)通過外部數據源來創建

2、 算子

1)Transformation
map(func):相當於for循環,返回一個新的RDD

filter(func):過濾
flatMap(func):flat+map 壓平

mapPartitions(func):對RDD中的每個分區進行操作
mapPartitionsWithIndex(func):對RDD中的每個分區進行操作,可以取到分區號

sample(withReplacement, fraction, seed):採樣

集合運算
union(otherDataset):對源RDD和參數RDD求並集後返回一個新的RDD
intersection(otherDataset):對源RDD和參數RDD求交集後返回一個新的RDD

distinct([numTasks])):去重

聚合操作:group by
groupByKey([numTasks]) :在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]):在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]):按照key進行聚合

排序
sortByKey([ascending], [numTasks]):在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]):與sortByKey類似,但是更靈活

join(otherDataset, [numTasks]):在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]):在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable,Iterable))類型的RDD
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)

重分區:
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)

舉例:
(1)創建一個RDD,每個元素乘以2,再排序

過濾出大於20的元素:

(2)字符串(字符)類型的RDD

3、RDD的集合運算
4、分組操作:reduceByKey

reduceByKey will provide much better performance.
官方不推薦使用 groupByKey 推薦使用 reduceByKey

5、cogroup

在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable,Iterable))類型的RDD

對兩個RDD中的KV元素,每個RDD中相同key中的元素分別聚合成一個集合。與reduceByKey不同的是針對兩個RDD中相同的key的元素進行合併,與groupByKey返回值上與區別

6、reduce操作(Action)

聚合操作

7、需求:按照value排序

做法:
1、交換,把key 和 value交換,然後調用sortByKey方法
2、再次交換

(2)Action
reduce(func):通過func函數聚集RDD中的所有元素,這個功能必須是課交換且可並聯的

collect():在驅動程序中,以數組的形式返回數據集的所有元素
count():返回RDD的元素個數
first():返回RDD的第一個元素(類似於take(1))
take(n):返回一個由數據集的前n個元素組成的數組

takeSample(withReplacement,num, [seed]):返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子

takeOrdered(n, [ordering]):takeOrdered和top類似,只不過以和top相反的順序返回元素

saveAsTextFile(path):將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本

saveAsSequenceFile(path) :將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統

saveAsObjectFile(path) :saveAsObjectFile用於將RDD中的元素序列化成對象,存儲到文件中

countByKey():針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。

foreach(func):在數據集的每一個元素上,運行函數func進行更新。
與map類似,沒有返回值

3、特性
1)RDD的緩存機制
RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用

通過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的

緩存有可能丟失,或者存儲存儲於內存的數據由於內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的數據會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition

(*)作用:提高性能
(*)使用:標識RDD可以被緩存 persist cache
(*)可以緩存的位置:

舉例:測試數據,92萬條
進入spark-shell命令

2)RDD的容錯機制:通過檢查點來實現
檢查點(本質是通過將RDD寫入Disk做檢查點)是為了通過lineage(血統)做容錯的輔助,lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之後有節點出現問題而丟失分區,從做檢查點的RDD開始重做Lineage,就會減少開銷

設置checkpoint的目錄,可以是本地的文件夾、也可以是HDFS。一般是在具有容錯能力,高可靠的文件系統上(比如HDFS, S3等)設置一個檢查點路徑,用於保存檢查點數據

/**

  • Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
  • directory set with SparkContext#setCheckpointDir and all references to its parent
  • RDDs will be removed. This function must be called before any job has been
  • executed on this RDD. It is strongly recommended that this RDD is persisted in
  • memory, otherwise saving it on a file will require recomputation.
    */

(*)複習檢查點:
HDFS中的檢查點:有SecondaryNamenode來實現日誌的合併

(*)RDD的檢查點:容錯
概念:血統 Lineage
理解:表示任務執行的生命週期
WordCount textFile —> redceByKey

如果血統越長,越容易出錯

假如有檢查點,可以從最近的一個檢查點開始,往後面計算。不用重頭計算

(*)RDD檢查點的類型
(1)基於本地目錄:需要將Spark shell 或者任務運行在本地模式上(setMaster(“local”))
開發和測試

(2)HDFS目錄:用於生產
sc.setCheckPointDir(目錄)

舉例:設置檢查點

設置檢查點目錄:
scala> sc.setCheckpointDir(“hdfs://192.168.116.121:9000/sparkchkpt”)

標識rdd1可以執行檢查點操作
scala> rdd1.checkpoint

scala> rdd1.count
res2: Long = 921911

(3)依賴關係:寬依賴,窄依賴
劃分任務執行的stage
RDD和它依賴的父RDD(s)的關係有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)

窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用(一(父)對一(子))
總結:窄依賴我們形象的比喻為獨生子女

寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition(一(父)對多(子))
總結:窄依賴我們形象的比喻為超生

DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就就形成了DAG,根據RDD之間的依賴關係的不同將DAG劃分成不同的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,因此寬依賴是劃分Stage的依據

七、RDD的高級算子

1、mapPartitionsWithIndex

對RDD中的每個分區(帶有下標)進行操作,下標用index表示
通過這個算子,我們可以獲取分區號

def mapPartitionsWithIndex<a href="
f: %28Int, Iterator%5bT%5d%29 ⇒ Iterator%5bU%5d,
preservesPartitioning: Boolean = false”>U(implicit arg0: ClassTag[U]): RDD[U]

通過將函數應用於此RDD的每個分區來返回新的RDD,同時跟蹤原始分區的索引

preservesPartitioning指輸入函數是否保留分區器,除非是一對RDD並且輸入函數不修改keys,否則應該是false

參數:f是個函數參數 f 中第一個參數是Int,代表分區號,第二個Iterator[T]代表分區中的元素

舉例:把分區中的元素,包括分區號,都打印出來

2、aggregate

聚合操作。類似於分組
(*)先對局部進行聚合操作,再對全局進行聚合操作

調用聚合操作

分析結果:初始值是100,代表每個分區多了一個100
全局操作,也多了一個100
100+100+100 = 300

對RDD中的元素進行求和

  1. RDD.map
  2. 聚合操作(效率大於map)

(*)對字符串操作

結果分析:

  1. *abc *def
  2. **def*abc

(*)複雜的例子
1)

執行過程:
第一個分區:
第一次比較: “” “12” 長度最大值 2 2–>”2”
第二次比較: “2” “23” 長度最大值 2 2–>”2”

第二個分區:
第一次比較: “” “345” 長度最大值 3 3–>”3”
第二次比較: “3” “4567” 長度最大值 4 4–>”4”
結果:24 或者42

2)

執行過程:
第一個分區:
第一次比較: “” “12” 長度最小值 0 0–>”0”
第二次比較: “0” “23” 長度最小值 1 1–>”1”

第二個分區:
第一次比較: “” “345” 長度最小值 0 0–>”0”
第二次比較: “0” “4567” 長度最小值 1 1–>”1”

3)aggregateByKey:類似於aggregate,區別:操作的是 key value 的數據類型

1.將每個動物園(分區)中,動物數最多的動物,進行求和
動物園0
[partId : 0 , value = (cat,2) ], [partId : 0 , value = (cat,5) ], [partId : 0 , value = (mouse,4) ],

動物園1
[partId : 1 , value = (cat,12) ], [partId : 1 , value = (dog,12) ], [partId : 1 , value = (mouse,2) ])

2.將所有動物求和

aggregateByKey效率更高

4)coalesce與repartition
與分區有關
都是對RDD進行重分區

區別:
coalesce 默認不會進行Shuffle 默認 false 如需修改分區,需置為true

repartition 會進行Shuffle

5)其他高級算子
比較好的高級算子的博客(推薦)
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

八、編程案例

1、分析日誌

需求:找到訪問量最高的兩個網頁
(*)第一步:對網頁的訪問量求和
(*)第二步:排序,降序
日誌數據
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] “GET /MyDemoWeb/ HTTP/1.1” 200 259
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] “GET /MyDemoWeb/head.jsp HTTP/1.1” 200 713
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] “GET /MyDemoWeb/body.jsp HTTP/1.1” 200 240
192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] “GET /MyDemoWeb/java.jsp HTTP/1.1” 200 240
192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] “GET /MyDemoWeb/mysql.jsp HTTP/1.1” 200 241
192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] “GET /MyDemoWeb/web.jsp HTTP/1.1” 200 239
192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:53 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] “GET /MyDemoWeb/mysql.jsp HTTP/1.1” 200 241
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:56 +0800] “GET /MyDemoWeb/web.jsp HTTP/1.1” 200 239
192.168.88.1 - - [30/Jul/2017:12:54:56 +0800] “GET /MyDemoWeb/java.jsp HTTP/1.1” 200 240
192.168.88.1 - - [30/Jul/2017:12:54:57 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:57 +0800] “GET /MyDemoWeb/java.jsp HTTP/1.1” 200 240
192.168.88.1 - - [30/Jul/2017:12:54:58 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:58 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:59 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:59 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] “GET /MyDemoWeb/mysql.jsp HTTP/1.1” 200 241
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] “GET /MyDemoWeb/web.jsp HTTP/1.1” 200 239
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242

結果:
(/hadoop.jsp,9)
(/oracle.jsp,9)

2、創建自定義分區

根據jsp文件的名字,將各自的訪問日誌放入到不同的分區文件中

3、使用JDBCRDD 操作數據庫

將RDD的數據保存到mysql數據庫中

mysql的company的emp數據
1 Tom 10 2400
2 Alis 11 1900
3 Kei 12 1500
4 Mi 11 900
結果
ArrayBuffer((Alis,1900), (Kei,1500))

JdbcRDD參數説明

參數名稱

類型

説明

sc

org.apache.spark.SparkContext

Spark Context對象

getConnection

scala.Function0[java.sql.Connection]

得到一個數據庫Connection

sql

scala.Predef.String

執行的SQL語句

lowerBound

scala.Long

下邊界值,即:SQL的第一個參數

upperBound

scala.Long

上邊界值,即:SQL的第二個參數

numPartitions

scala.Int

分區的個數,即:啓動多少個Executor

mapRow

scala.Function1[java.sql.ResultSet, T]

得到的結果集

JdbcRDD的缺點:從上面的參數説明可以看出,JdbcRDD有以下兩個缺點:
(1)執行的SQL必須有兩個參數,並類型都是Long
(2)得到的結果是ResultSet,即:只支持select操作

4、操作數據庫:把結果存放到數據庫中