Spark Streaming原理

Spark Streaming 是基於spark的流式批處理引擎。其基本原理是:將實時輸入數據流以時間片為單位進行拆分,然後經Spark引擎以類似批處理的方式處理每個時間片數據。

Spark Operator 與工作流 集成_spark

Spark Streaming作業流程

Spark Operator 與工作流 集成_數據_02

  • 客户端提交作業後啓動Driver(Driver是spark作業的Master);
  • 每個作業包含多個Executor,每個Executor以線程的方式運行task,Spark Streaming至少包含一個receiver task(可選的);
  • Receiver接收數據後生成Block,並把BlockId彙報給Driver,然後備份到另外一個Executor上;
  • ReceiverTracker維護Reciver彙報的BlockId;
  • Driver定時啓動JobGenerator,根據Dstream的關係生成邏輯RDD,然後創建Jobset,交給JobScheduler;
  • JobScheduler負責調度Jobset,交給DAGScheduler,DAGScheduler根據邏輯RDD,生成相應的Stages,每個stage包含一到多個task;
  • TaskScheduler負責把task調度到Executor上,並維護task的運行狀態;
  • 當tasks、stages、jobset完成後,單個batch才算完成。

Spark Streaming 與 Storm

聯繫:
流式系統的特點:
低延遲。秒級或更短時間的響應
高性能
分佈式
可擴展。伴隨着業務的發展,數據量、計算量可能會越來越大,所以要求系統是可擴展的
容錯。分佈式系統中的通用問題,一個節點掛了不能影響應用

區別:
1、同一套系統,安裝spark之後就一切都有了
2、spark較強的容錯能力;strom使用較廣、更穩定
3、storm是用Clojure語言去寫的,它的很多擴展都是使用java完成的
4、任務執行方面和strom的區別是:
spark steaming數據進來是一小段時間的RDD,數據進來之後切成一小塊一小塊進行批處理
storm是基於record形式來的,進來的是一個tuple,一條進來就處理一下
5、中間過程實質上就是spark引擎,只不過sparkstreaming在spark之後引擎之上動了一點手腳:對進入spark引擎之前的數據進行了一個封裝,方便進行基於時間片的小批量作業,交給spark進行計算

離散數據流

Spark Streaming最主要的抽象是DStream(Discretized Stream,離散化數據流),表示連續不斷的數據流。
在內部實現上,Spark Streaming的輸入數據按照時間片(如1秒)分成一段一段,每一段數據轉換為Spark中的RDD,這些分段就是DStream,並且對DStream的操作都最終轉變為對相應的RDD的操作。
Spark Streaming提供了被稱為離散化流或者DStream的高層抽象,這個高層抽象用於表示數據的連續流;

創建DStream的方式:由文件、Socket、Kafka、Flume等取得的數據作為輸入數據流;或其他DStream進行的高層操作;

在內部,DStream被表達為RDDs的一個序列。
1、Dstream叫做離散數據流,是一個數據抽象,代表一個數據流。這個數據流可以從對輸入流的轉換獲得
2、Dstream是RDD在時間序列上的一個封裝
3、DStream的內部是通過一組時間序列上連續的RDD表示,每個都包含了特定時間間隔的數據流,RDD代表按照規定時間收集到的數據集
4、DStream這種數據流抽象也可以整體轉換,一個操作結束後轉換另外一種DStream
5、DStream的默認存儲級別為<內存+磁盤>
6、sparkstreaming有一種特別的操作:windows操作,稱為窗口操作,實質是對固定的以時間片積累起來的幾個RDD作為一整體操作
7、可以使用persist()函數進行序列化(KryoSerializer)

輸入輸出數據源

Spark Streaming可整合多種輸入數據源,如:
文件系統(本地文件、HDFS文件)
TCP套接字
Flume
Kafka
處理後的數據可存儲至文件系統、數據庫等系統中

Spark Streaming 讀取外部數據

在Spark Streaming中,有一個組件Receiver,作為一個長期運行的task跑在一個Executor上;

每個Receiver都會負責一個input DStream(比如從文件中讀取數據的文件流,比如套接字流,或者從Kafka中讀取的一個輸入流等等);

Spark Streaming通過input DStream與外部數據源進行連接,讀取相關數據。這項工作由Receiver完成。

Streaming 程序基本步驟

1、創建輸入DStream來定義輸入源

2、通過對DStream應用轉換操作和輸出操作來定義流計算

3、用streamingContext.start()來開始接收數據和處理流程;start之後不能再添加業務邏輯。

4、通過streamingContext.awaitTermination()方法來等待處理結束(手動結束或因為錯誤而結束)

5、可以通過streamingContext.stop()來手動結束流計算進程

StreamingContext 對象

StreamingContext 對象可以通過 SparkConf 對象創建;

不要硬編碼 master 參數在集羣中, 而是通過 spark-submit 接收參數;

對於本地測試和單元測試, 可以傳遞“local[*]” 來運行 Spark Streaming 在進程內運行(自動檢測本地系統的CPU內核數量);

分批間隔時間基於應用延遲需求和可用的集羣資源進行設定(設定間隔要大於應用數據的最小延遲需求,同時不能設置太小以至於系統無法在給定的週期內處理完畢)

其他問題

StreamingContext 對象也可以通過SparkContext對象創建。在context創建之後,可以接着開始如下的工作:
定義 input sources,通過創建 input Dstreams 完成
定義 streaming 計算,通過DStreams的 transformation 和 output 操作實現
啓動接收數據和處理,通過 streamingContext.start()
等待處理停止 (通常因為錯誤),通過streamingContext.awaitTermination()
處理過程可以手動停止,通過 streamingContext.stop()

備註:
一旦context啓動, 沒有新的 streaming 計算可以被設置和添加進來
一旦context被停止, 它不能被再次啓動
只有一個StreamingContext在JVM中在同一時間可以被激活
StreamingContext.stop()執行時,同時停止了SparkContext

基本輸入源

文件流

1、文件必須是cp到指定的路徑中,不能是mv。新建文件也可以。
hdfs、本地文件系統都可以

2、文件流不需要運行接收器,可以不分配核數,即可以使用local[1],這是特例

Socket(套接字)流

Spark Streaming可以通過Socket端口監聽並接收數據,然後進行相應處理

編寫基於套接字的WordCount程序

新開一個命令窗口,啓動nc程序:
nc -lk 9999
(nc 需要安裝 yum install nc)

隨後可以在nc窗口中隨意輸入一些單詞,監聽窗口會自動獲得單詞數據流信息,在監聽窗口每隔x秒就會打印出詞頻統計信息,可以在屏幕上出現結果。

備註:使用local[],可能存在問題。
如果給虛擬機配置的cpu數為1,使用local[
]也只會啓動一個線程,該線程用於receiver task,此時沒有資源處理接收達到的數據。
【現象:程序正常執行,不會打印時間戳,屏幕上也不會有其他有效信息】

有幾個問題:
日誌信息太多,不爽,能改善嗎?
加入 setLogLevel

可以從別的機器發送字符串嗎,可以監聽別的機器的端口嗎?
nc –lk 9999
ssc.socketTextStream(“node1”, 9999)
nc命令只能將字符串發送到本地的端口;
streaming程序可以監聽其他機器的端口

每次都需要手動輸入字符串,實在不爽!能寫一個模仿nc的程序,向固定端口發送數據嗎?

RDD隊列流

調試Spark Streaming應用程序的時候,可使用streamingContext.
queueStream(queueOfRDD)創建基於RDD隊列的Dstream;

新建一個RDDQueueStream.scala代碼文件,功能是:每秒創建一個RDD,Streaming每隔5秒就對數據進行處理;

這種方式多用來測試streaming程序。

Spark Operator 與工作流 集成_spark_03


備註:

oneAtATime:缺省為true,一次處理一個RDD,

設為false,一次處理全部RDD;

RDD隊列流可以使用local[1];

涉及到同時出隊和入隊操作,所以要做同步;