In certain business scenarios, such as big promotions, flash sales, and sudden hot topics, traffic surges within a short time and produces a huge spike: data flows in far faster than it can be processed, putting enormous load on the stream processing system. If this is not handled correctly, it can exhaust cluster resources and ultimately bring the cluster down. An effective backpressure mechanism is therefore essential to keeping a stream processing system stable.

Both Storm and Spark Streaming provide backpressure mechanisms, but their implementations differ.

For a Storm topology with the acker mechanism enabled, backpressure can be achieved by setting the conf.setMaxSpoutPending parameter: if downstream components (bolts) cannot keep up and the number of tuples emitted by the spout but not yet acked exceeds the configured value, the spout stops emitting data. This approach has several drawbacks. conf.setMaxSpoutPending is hard to tune for good backpressure behavior: set too low, throughput suffers; set too high, workers may OOM. It also oscillates, leaving the data flow in a jittery state, so it is less effective than stage-by-stage backpressure. Finally, it has no effect on topologies with the acker mechanism disabled. A minimal configuration is sketched below.
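A minimal sketch of this configuration, assuming the Storm 1.x org.apache.storm.Config API (the pending cap of 5000 is purely illustrative, not a recommendation):

import org.apache.storm.Config

val conf = new Config()
conf.setNumAckers(2)          // acking must be enabled for max spout pending to take effect
conf.setMaxSpoutPending(5000) // spout pauses emitting once 5000 tuples await acks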

Storm's newer Automatic Back Pressure mechanism monitors each bolt's receive queue. When a queue rises above its high-watermark, a dedicated thread writes backpressure state to ZooKeeper; a watch on ZooKeeper then notifies all workers of the topology to enter the backpressure state, and the spouts finally lower their tuple emission rate. The concrete implementation:

(Figure: Storm automatic backpressure flow)
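In Storm 1.x this behavior is governed by configuration keys roughly like the following (the key names and default watermarks are assumptions based on Storm 1.0; verify against your version's defaults.yaml):

import org.apache.storm.Config

val conf = new Config()
// Assumed Storm 1.x keys:
conf.put("topology.backpressure.enable", java.lang.Boolean.TRUE)
conf.put("backpressure.disruptor.high.watermark", java.lang.Double.valueOf(0.9)) // throttle above 90% queue fill
conf.put("backpressure.disruptor.low.watermark", java.lang.Double.valueOf(0.4))  // release below 40%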

In a Spark Streaming application, when batch processing time exceeds batch interval (where batch processing time is the actual time spent computing a batch and batch interval is the batch duration configured for the application), data is being processed more slowly than it is being received. If this persists for too long, data accumulates in memory and can cause OOM on the Executor hosting the Receiver (if the StorageLevel includes disk, data that does not fit in memory spills to disk, increasing latency). The parameter spark.streaming.receiver.maxRate can limit the Receiver's ingestion rate; this adapts intake to current processing capacity and prevents OOM, but introduces other problems. For example, if producers generate data faster than maxRate while the cluster can also process faster than maxRate, cluster resources are underutilized. To better match the ingestion rate to the cluster's processing capacity, Spark Streaming introduced a backpressure mechanism in v1.5 that dynamically controls the receive rate to fit what the cluster can actually process.

Spark Streaming Backpressure: dynamically adjusts the Receiver's ingestion rate based on job execution feedback from the JobScheduler. It is controlled by the property "spark.streaming.backpressure.enabled", whose default value is false (disabled).
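A minimal sketch of enabling this for a receiver-based application (the property names are those discussed above; the maxRate cap of 10000 records/sec is purely illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("BackpressureDemo")
  .set("spark.streaming.backpressure.enabled", "true") // dynamic rate control (default: false)
  .set("spark.streaming.receiver.maxRate", "10000")    // static upper bound, records/sec per receiver
val ssc = new StreamingContext(conf, Seconds(5))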

The Streaming architecture is shown in the figure below:

(Figure: Spark Streaming architecture)

The BackPressure execution process is shown in the figure below:

(Figure: BackPressure execution process)


3.1 The RateController class hierarchy

RateController extends StreamingListener and handles BatchCompleted events. Its core code is:



/**
 * A StreamingListener that receives batch completion updates, and maintains
 * an estimate of the speed at which this stream should ingest messages,
 * given an estimate computation from a `RateEstimator`
 */
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
  extends StreamingListener with Serializable {

  ……

  /**
   * Compute the new rate limit and publish it asynchronously.
   */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
        rateLimit.set(s.toLong)
        publish(getLatestRate())
      }
    }

  def getLatestRate(): Long = rateLimit.get()

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo
    for {
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      workDelay <- batchCompleted.batchInfo.processingDelay
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }
}



 

3.2 Registering the RateController

When the JobScheduler starts, it extracts the rateController of every InputDStream registered in the DStreamGraph and registers each one with the ListenerBus as a listener. The relevant code is:



def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
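For context, the inputDStream.rateController picked up above is defined in ReceiverInputDStream and is only created when backpressure is enabled (condensed from the Spark 1.5+ source):

override protected[streaming] val rateController: Option[RateController] = {
  if (RateController.isBackPressureEnabled(ssc.conf)) {
    Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration)))
  } else {
    None
  }
}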



 

3.3 Analysis of the BackPressure execution process

The BackPressure execution process consists of two phases: triggering the BatchCompleted event, and handling it.

3.3.1 How BatchCompleted is triggered

Analysis of BatchCompleted should start from the JobGenerator, because BatchCompleted marks the end of a batch: it is triggered when the jobs produced by the JobGenerator finish executing. We therefore begin with job execution.

In a Streaming application, every batch interval the JobGenerator creates one Job for each Output Stream of the application; all Jobs of the batch form a Job Set, which is submitted in bulk via the JobScheduler's submitJobSet. The code is structured as follows:



/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)

  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}



The Jobs in the Job Set are executed by a fixed-size pool of background threads (the pool size is set by "spark.streaming.concurrentJobs", default 1); submitJobSet hands each Job in the Job Set to this pool. The logic is:



def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
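The jobExecutor used here is a fixed-size daemon thread pool created when the JobScheduler is constructed (from the Spark source):

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")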



JobHandler is responsible for executing a Job and handling the result of its execution. When the Job finishes, it produces a JobCompleted event. Its core logic is sketched below.



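A condensed sketch of JobHandler, based on the Spark source with local-property bookkeeping and error handling elided (treat the details as approximate):

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    var _eventLoop = eventLoop
    if (_eventLoop != null) {
      _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
      // Disable checks for existing output directories in jobs launched by the
      // streaming scheduler, since we may need to write output to an existing directory.
      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        job.run()
      }
      _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
      }
    }
  }
}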



When a Job finishes, a JobCompleted event is sent to the eventLoop. Upon receiving it, the EventLoop's handler calls handleJobCompletion to process the completion. handleJobCompletion uses the Job's execution information to create a StreamingListenerBatchCompleted event, which is delivered to listeners via the StreamingListenerBus. The implementation is:



private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    jobSets.remove(jobSet.time)
    jobGenerator.onBatchCompletion(jobSet.time)
    logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
      jobSet.totalDelay / 1000.0, jobSet.time.toString,
      jobSet.processingDelay / 1000.0
    ))
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
  }
}



 

3.3.2 Handling the BatchCompleted event

The StreamingListenerBus dispatches events to the concrete StreamingListeners, so BatchCompleted is handed to the RateController, which processes it in onBatchCompleted:



override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
  val elements = batchCompleted.batchInfo.streamIdToInputInfo

  for {
    processingEnd <- batchCompleted.batchInfo.processingEndTime
    workDelay <- batchCompleted.batchInfo.processingDelay
    waitDelay <- batchCompleted.batchInfo.schedulingDelay
    elems <- elements.get(streamUID).map(_.numRecords)
  } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}



onBatchCompleted extracts the batch's execution delay and scheduling delay from the completed batch, then feeds them to a RateEstimator (currently the only implementation is PIDRateEstimator, a proportional-integral-derivative (PID) controller) to estimate a new rate, which it publishes. The code is:






/**
   * Compute the new rate limit and publish it asynchronously.
   */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
        rateLimit.set(s.toLong)
        publish(getLatestRate())
      }
    }


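For reference, the heart of PIDRateEstimator.compute looks roughly like the following (condensed from the Spark source; synchronization and state updates elided):

// Rates are in records/second, delays in milliseconds.
val processingRate  = numElements.toDouble / processingDelay * 1000
val error           = latestRate - processingRate                                     // proportional term
val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // integral term
val dError          = (error - latestError) / delaySinceUpdate                        // derivative term
val newRate = (latestRate - proportional * error
                          - integral * historicalError
                          - derivative * dError).max(minRate)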



Here publish() is defined by RateController's subclass ReceiverRateController. Its logic (defined inside ReceiverInputDStream) is:

 






/**
   * A RateController that sends the new rate to receivers, via the receiver tracker.
   */
  private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
      extends RateController(id, estimator) {
    override def publish(rate: Long): Unit =
      ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
  }






publish forwards the newly computed rate via the ReceiverTracker, which wraps it in an UpdateReceiverRateLimit message and sends it to the ReceiverTrackerEndpoint:



/** Update a receiver's maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
  if (isTrackerStarted) {
    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
  }
}



Upon receiving the message, the ReceiverTrackerEndpoint looks up, in the receiverTrackingInfos list, the endpoint the Receiver registered with (actually a ReceiverSupervisorImpl), wraps the rate in an UpdateRateLimit message, and sends it there. On receipt, the supervisor calls updateRate on its BlockGenerators (subclasses of RateLimiter) to derive a fixed token interval.



/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
private val endpoint = env.rpcEnv.setupEndpoint(
  "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
    override val rpcEnv: RpcEnv = env.rpcEnv

    override def receive: PartialFunction[Any, Unit] = {
      case StopReceiver =>
        logInfo("Received stop signal")
        ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
      case CleanupOldBlocks(threshTime) =>
        logDebug("Received delete old batch signal")
        cleanupOldBlocks(threshTime)
      case UpdateRateLimit(eps) =>
        logInfo(s"Received a new rate limit: $eps.")
        registeredBlockGenerators.asScala.foreach { bg =>
          bg.updateRate(eps)
        }
    }
  })



RateLimiter's updateRate is implemented as follows:



/**
 * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
 * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
 *
 * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
 */
private[receiver] def updateRate(newRate: Long): Unit =
  if (newRate > 0) {
    if (maxRateLimit > 0) {
      rateLimiter.setRate(newRate.min(maxRateLimit))
    } else {
      rateLimiter.setRate(newRate)
    }
  }



setRate (from Guava's RateLimiter, in Java) is implemented as follows:



public final void setRate(double permitsPerSecond) {
  Preconditions.checkArgument(permitsPerSecond > 0.0
      && !Double.isNaN(permitsPerSecond), "rate must be positive");
  synchronized (mutex) {
    resync(readSafeMicros());
    double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond; // fixed interval
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }
}



Setting the rate fixes the stable token interval: for example, at newRate = 100 records/second, stableIntervalMicros = 10^6 / 100 = 10,000 µs, i.e., one token every 10 ms. This completes the backpressure mechanism's rate adjustment.

 

4. Flow control points

When a Receiver starts receiving data, it stores incoming records into currentBuffer via supervisor.pushSingle(), where they wait for the BlockGenerator to periodically collect them and package them into blocks. Before a record is placed into currentBuffer, a permit (token) must be acquired: if one is obtained the record can be buffered, otherwise the call blocks, which in turn blocks the Receiver from pulling more data from the source.



/**
 * Push a single data item into the buffer.
 */
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush() // acquire a token
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
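waitToPush itself simply blocks on Guava's RateLimiter until a token is available (from Spark's RateLimiter wrapper class):

def waitToPush() {
  rateLimiter.acquire()
}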



 

Tokens are issued using a token-bucket mechanism, whose principle is illustrated below:

(Figure: token bucket mechanism)

Token bucket mechanism: a bucket of fixed size generates tokens on its own at a constant rate. If tokens are not consumed, or are consumed more slowly than they are produced, they accumulate until the bucket is full; tokens produced after that overflow, so the bucket never holds more than its capacity. An operation that needs tokens takes the required number from the bucket: if they are available it proceeds, otherwise it blocks. Consumed tokens are not returned.
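A minimal, self-contained Scala sketch of the mechanism just described (illustrative only; Spark itself delegates the token accounting to Guava's RateLimiter, as shown above):

// Illustrative token bucket: NOT Spark's implementation.
class TokenBucket(capacity: Long, tokensPerSecond: Long) {
  private var tokens: Double = capacity.toDouble
  private var lastRefill: Long = System.nanoTime()

  // Add the tokens generated since the last refill, capped at the bucket size
  // (tokens produced beyond the cap "overflow" and are lost).
  private def refill(): Unit = {
    val now = System.nanoTime()
    tokens = math.min(capacity.toDouble, tokens + (now - lastRefill) / 1e9 * tokensPerSecond)
    lastRefill = now
  }

  /** Block until a token is available, then consume it; tokens are not returned. */
  def acquire(): Unit = synchronized {
    refill()
    while (tokens < 1.0) {
      val sleepMs = math.max(1L, ((1.0 - tokens) / tokensPerSecond * 1000).toLong)
      wait(sleepMs) // timed wait; tokens regenerate with the passage of time
      refill()
    }
    tokens -= 1.0
  }
}

In BlockGenerator terms, each addData call corresponds to acquire(), and updateRate corresponds to changing tokensPerSecond.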

After the streaming data is received by the Receiver, it is parsed line by line into an iterator, and records are then stored into the buffer one at a time. Before a record enters the buffer, a token must be obtained; if no token is available, the call blocks, otherwise the record is stored in the buffer and later packaged into a block.