Spark Streaming原理
Spark Streaming 是基於spark的流式批處理引擎。其基本原理是:將實時輸入數據流以時間片為單位進行拆分,然後經Spark引擎以類似批處理的方式處理每個時間片數據。
Spark Streaming作業流程
- 客户端提交作業後啓動Driver(Driver是spark作業的Master);
- 每個作業包含多個Executor,每個Executor以線程的方式運行task,Spark Streaming至少包含一個receiver task(可選的);
- Receiver接收數據後生成Block,並把BlockId彙報給Driver,然後備份到另外一個Executor上;
- ReceiverTracker維護Reciver彙報的BlockId;
- Driver定時啓動JobGenerator,根據Dstream的關係生成邏輯RDD,然後創建Jobset,交給JobScheduler;
- JobScheduler負責調度Jobset,交給DAGScheduler,DAGScheduler根據邏輯RDD,生成相應的Stages,每個stage包含一到多個task;
- TaskScheduler負責把task調度到Executor上,並維護task的運行狀態;
- 當tasks、stages、jobset完成後,單個batch才算完成。
Spark Streaming 與 Storm
聯繫:
流式系統的特點:
低延遲。秒級或更短時間的響應
高性能
分佈式
可擴展。伴隨着業務的發展,數據量、計算量可能會越來越大,所以要求系統是可擴展的
容錯。分佈式系統中的通用問題,一個節點掛了不能影響應用
區別:
1、同一套系統,安裝spark之後就一切都有了
2、spark較強的容錯能力;strom使用較廣、更穩定
3、storm是用Clojure語言去寫的,它的很多擴展都是使用java完成的
4、任務執行方面和strom的區別是:
spark steaming數據進來是一小段時間的RDD,數據進來之後切成一小塊一小塊進行批處理
storm是基於record形式來的,進來的是一個tuple,一條進來就處理一下
5、中間過程實質上就是spark引擎,只不過sparkstreaming在spark之後引擎之上動了一點手腳:對進入spark引擎之前的數據進行了一個封裝,方便進行基於時間片的小批量作業,交給spark進行計算
離散數據流
Spark Streaming最主要的抽象是DStream(Discretized Stream,離散化數據流),表示連續不斷的數據流。
在內部實現上,Spark Streaming的輸入數據按照時間片(如1秒)分成一段一段,每一段數據轉換為Spark中的RDD,這些分段就是DStream,並且對DStream的操作都最終轉變為對相應的RDD的操作。
Spark Streaming提供了被稱為離散化流或者DStream的高層抽象,這個高層抽象用於表示數據的連續流;
創建DStream的方式:由文件、Socket、Kafka、Flume等取得的數據作為輸入數據流;或其他DStream進行的高層操作;
在內部,DStream被表達為RDDs的一個序列。
1、Dstream叫做離散數據流,是一個數據抽象,代表一個數據流。這個數據流可以從對輸入流的轉換獲得
2、Dstream是RDD在時間序列上的一個封裝
3、DStream的內部是通過一組時間序列上連續的RDD表示,每個都包含了特定時間間隔的數據流,RDD代表按照規定時間收集到的數據集
4、DStream這種數據流抽象也可以整體轉換,一個操作結束後轉換另外一種DStream
5、DStream的默認存儲級別為<內存+磁盤>
6、sparkstreaming有一種特別的操作:windows操作,稱為窗口操作,實質是對固定的以時間片積累起來的幾個RDD作為一整體操作
7、可以使用persist()函數進行序列化(KryoSerializer)
輸入輸出數據源
Spark Streaming可整合多種輸入數據源,如:
文件系統(本地文件、HDFS文件)
TCP套接字
Flume
Kafka
處理後的數據可存儲至文件系統、數據庫等系統中
Spark Streaming 讀取外部數據
在Spark Streaming中,有一個組件Receiver,作為一個長期運行的task跑在一個Executor上;
每個Receiver都會負責一個input DStream(比如從文件中讀取數據的文件流,比如套接字流,或者從Kafka中讀取的一個輸入流等等);
Spark Streaming通過input DStream與外部數據源進行連接,讀取相關數據。這項工作由Receiver完成。
Streaming 程序基本步驟
1、創建輸入DStream來定義輸入源
2、通過對DStream應用轉換操作和輸出操作來定義流計算
3、用streamingContext.start()來開始接收數據和處理流程;start之後不能再添加業務邏輯。
4、通過streamingContext.awaitTermination()方法來等待處理結束(手動結束或因為錯誤而結束)
5、可以通過streamingContext.stop()來手動結束流計算進程
StreamingContext 對象
StreamingContext 對象可以通過 SparkConf 對象創建;
不要硬編碼 master 參數在集羣中, 而是通過 spark-submit 接收參數;
對於本地測試和單元測試, 可以傳遞“local[*]” 來運行 Spark Streaming 在進程內運行(自動檢測本地系統的CPU內核數量);
分批間隔時間基於應用延遲需求和可用的集羣資源進行設定(設定間隔要大於應用數據的最小延遲需求,同時不能設置太小以至於系統無法在給定的週期內處理完畢)
其他問題
StreamingContext 對象也可以通過SparkContext對象創建。在context創建之後,可以接着開始如下的工作:
定義 input sources,通過創建 input Dstreams 完成
定義 streaming 計算,通過DStreams的 transformation 和 output 操作實現
啓動接收數據和處理,通過 streamingContext.start()
等待處理停止 (通常因為錯誤),通過streamingContext.awaitTermination()
處理過程可以手動停止,通過 streamingContext.stop()
備註:
一旦context啓動, 沒有新的 streaming 計算可以被設置和添加進來
一旦context被停止, 它不能被再次啓動
只有一個StreamingContext在JVM中在同一時間可以被激活
StreamingContext.stop()執行時,同時停止了SparkContext
基本輸入源
文件流
1、文件必須是cp到指定的路徑中,不能是mv。新建文件也可以。
hdfs、本地文件系統都可以
2、文件流不需要運行接收器,可以不分配核數,即可以使用local[1],這是特例
Socket(套接字)流
Spark Streaming可以通過Socket端口監聽並接收數據,然後進行相應處理
編寫基於套接字的WordCount程序
新開一個命令窗口,啓動nc程序:
nc -lk 9999
(nc 需要安裝 yum install nc)
隨後可以在nc窗口中隨意輸入一些單詞,監聽窗口會自動獲得單詞數據流信息,在監聽窗口每隔x秒就會打印出詞頻統計信息,可以在屏幕上出現結果。
備註:使用local[],可能存在問題。
如果給虛擬機配置的cpu數為1,使用local[]也只會啓動一個線程,該線程用於receiver task,此時沒有資源處理接收達到的數據。
【現象:程序正常執行,不會打印時間戳,屏幕上也不會有其他有效信息】
有幾個問題:
日誌信息太多,不爽,能改善嗎?
加入 setLogLevel
可以從別的機器發送字符串嗎,可以監聽別的機器的端口嗎?
nc –lk 9999
ssc.socketTextStream(“node1”, 9999)
nc命令只能將字符串發送到本地的端口;
streaming程序可以監聽其他機器的端口
每次都需要手動輸入字符串,實在不爽!能寫一個模仿nc的程序,向固定端口發送數據嗎?
RDD隊列流
調試Spark Streaming應用程序的時候,可使用streamingContext.
queueStream(queueOfRDD)創建基於RDD隊列的Dstream;
新建一個RDDQueueStream.scala代碼文件,功能是:每秒創建一個RDD,Streaming每隔5秒就對數據進行處理;
這種方式多用來測試streaming程序。
備註:
oneAtATime:缺省為true,一次處理一個RDD,
設為false,一次處理全部RDD;
RDD隊列流可以使用local[1];
涉及到同時出隊和入隊操作,所以要做同步;