第6课:Spark Streaming源码解读之Job动态生成和深度思考

时间:2021-06-19 20:54:48

一:Spark Streaming Job生成深度思考
    1. 大数据项目当中,例如Hadoop,Spark等,如果不是流处理的话,一般会有定时任务。例如5分钟触发一次,1个小时触发一次,这就是做流处理的感觉,一切不是流处理,或者与流处理无关的数据都将是没有价值的数据,以前做批处理的时候其实也是隐形的在做流处理。
    2. JobGenerator构造的时候有一个核心的参数是jobScheduler, jobScheduler是整个作业的生成和提交给集群的核心,JobGenerator会基于DStream生成Job。这里面的Job就相当于Java中线程要处理的Runnable里面的业务逻辑封装。Spark的Job就是运行的一个作业。
    3. Spark Streaming除了基于定时操作以外参数Job,还可以通过各种聚合操作,或者基于状态的操作。
    4. 每5秒钟JobGenerator都会产生Job,此时的Job是逻辑级别的,也就是说有这个Job,并且说这个Job具体该怎么去做,此时并没有执行。 具体执行的话是交给底层的RDD的action去触发,此时的action也是逻辑级别的。底层物理级别的,Spark Streaming他是基于DStream构建的依赖关系导致的Job是逻辑级别的,底层是基于RDD的逻辑级别的。

val ssc = new StreamingContext(conf, Seconds(5))
 
5. Spark Streaming的触发器是以时间为单位的,storm是以事件为触发器,也就是基于一个又一个record. Spark Streaming基于时间,这个时间是Batch Duractions

从逻辑级别翻译成物理级别,最后一个操作肯定是RDD的action,但是并不想一翻译立马就触发job。这个时候怎么办?
6. action触发作业,这个时候作为Runnable接口封装,他会定义一个方法,这个方法里面是基于DStream的依赖关系生成的RDD。翻译的时候 是将DStream的依赖关系翻译成RDD的依赖关系,由于DStream的依赖关系最后一个是action级别的,翻译成RDD的时候,RDD的最后一 个操作也应该是action级别的,如果翻译的时候直接执行的话,就直接生成了Job,就没有所谓的队列,所以会将翻译的事件放到一个函数中或者一个方法fun() 中,因此,如果这个函数没有指定的action触发作业是执行不了的。
7. Spark Streaming根据时间不断的去管理我们的生成的作业,所以这个时候我们每个作业又有action级别的操作,这个action操作是对 DStream进行逻辑级别的操作,他生成每个Job放到队列的时候,他一定会被翻译为RDD的操作,那基于RDD操作的最后一个一定是action级别 的,如果翻译的话直接就是触发action的话整个Spark Streaming的Job就不受管理了。因此我们既要保证他的翻译,又要保证对他的管理,把DStream之间的依赖关系转变为RDD之间的依赖关系, 最后一个DStream使得action的操作,翻译成一个RDD之间的action操作,整个翻译后的内容他是一块内容,他这一块内容是放在一个函数体 中的,这个函数体,他会函数的定义,这个函数由于他只是定义还没有执行,所以他里面的RDD的action不会执行,不会触发Job,当我们的 JobScheduler要调度Job的时候,转过来在线程池中拿出一条线程执行刚才的封装的方法。

 

总体架构图参考如下:

 第6课:Spark Streaming源码解读之Job动态生成和深度思考

 

 

 

 

 

二:Spark Streaming Job生成源码解析
Spark 作业动态生成三大核心:
     JobGenerator: 负责Job生成。
     JobSheduler: 负责Job调度。
     ReceiverTracker: 获取元数据。

1. JobScheduler的start方法被调用的时候,会启动JobGenerator的start方法。

/** Start generation of jobs */
def start(): Unit = synchronized {
//eventLoop是消息循环体,因为不断的生成Job
  if (eventLoop != null) return // generator has already been started
 
  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter
//匿名内部类
  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
 
    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
//调用start方法。
  eventLoop.start()
 
  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}

EvenLoop: 的start方法被调用,首先会调用onstart方法。然后就启动线程。

/**

 * An event loop toreceive events from the caller and process all events in the event thread. It

 * will start anexclusive event thread to process all events.

 *

 * Note: The eventqueue will grow indefinitely. So subclasses should make sure `onReceive` can

 * handle events intime to avoid the potential OOM.

 */

private[spark] abstract class EventLoop[E](name: String)extends Logging {

 

  private val eventQueue:BlockingQueue[E] = new LinkedBlockingDeque[E]()

 

  private valstopped = new AtomicBoolean(false)

//开启后台线程。  

  private valeventThread = new Thread(name) {

    setDaemon(true)

 

    override defrun(): Unit = {

      try {

//不断的从BlockQueue中拿消息。

        while(!stopped.get) {

//线程的start方法调用就会不断的循环队列,而我们将消息放到eventQueue中。

          val event= eventQueue.take()

          try {

//

           onReceive(event)

          } catch {

            caseNonFatal(e) => {

              try {

               onError(e)

              }catch {

               case NonFatal(e) => logError("Unexpected error in " + name,e)

              }

            }

          }

        }

      } catch {

        case ie:InterruptedException => // exit even if eventQueue is not empty

        caseNonFatal(e) => logError("Unexpected error in " + name, e)

      }

    }

 

  }

 

  def start(): Unit= {

    if(stopped.get) {

      throw newIllegalStateException(name + " has already been stopped")

    }

    // Call onStartbefore starting the event thread to make sure it happens before onReceive

 

    onStart()

   eventThread.start()

  }

EvenLoop: 的start方法被调用,首先会调用onstart方法。然后就启动线程。

/**
 * An event loop to receive events from the caller and process all events in the event thread. It
 * will start an exclusive event thread to process all events.
 *
 * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
 * handle events in time to avoid the potential OOM.
 */
private[spark] abstract class EventLoop[E](name: String) extends Logging {
 
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
 
  private val stopped = new AtomicBoolean(false)
//开启后台线程。   
  private val eventThread = new Thread(name) {
    setDaemon(true)
 
    override def run(): Unit = {
      try {
//不断的从BlockQueue中拿消息。
        while (!stopped.get) {
//线程的start方法调用就会不断的循环队列,而我们将消息放到eventQueue中。
          val event = eventQueue.take()
          try {
//
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
 
  }
 
  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
 
    onStart()
    eventThread.start()
  }

 

 

上述之中的onReceive:不断的从消息队列中获得消息,一旦获得消息就会处理。
不要在onReceive中添加阻塞的消息,如果这样的话会不断的阻塞消息。
消息循环器一般都不会处理具体的业务逻辑,一般消息循环器发现消息以后都会将消息路由给其他的线程去处理。

/**

 * Invoked in theevent thread when polling events from the event queue.

 *

 * Note: Shouldavoid calling blocking actions in `onReceive`, or the event thread will beblocked

 * and cannotprocess events in time. If you want to call some blocking actions, run them in

 * another thread.

 */

protected def onReceive(event: E): Unit

消息队列接收到事件后具体处理如下:

/** Processes all events */

private def processEvent(event: JobGeneratorEvent) {

 logDebug("Got event " + event)

  event match {

    caseGenerateJobs(time) => generateJobs(time)

    caseClearMetadata(time) => clearMetadata(time)

    caseDoCheckpoint(time, clearCheckpointDataLater) =>

     doCheckpoint(time, clearCheckpointDataLater)

    caseClearCheckpointData(time) => clearCheckpointData(time)

  }

}

基于Batch Duractions生成Job,并完成checkpoint.
Job生成的5个步骤。

/** Generate jobs and perform checkpoint for the given`time`.  */

private def generateJobs(time: Time) {

  // Set theSparkEnv in this thread, so that job generation code can access the environment

  // Example:BlockRDDs are created in this thread, and it needs to access BlockManager

  // Update: Thisis probably redundant after threadlocal stuff in SparkEnv has been removed.

 SparkEnv.set(ssc.env)

  Try {

//第一步:获取当前时间段里面的数据。根据分配的时间来分配具体要处理的数据。

   jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocatereceived blocks to batch

//第二步:生成Job,获取RDD的DAG依赖关系。在此基于DStream生成了RDD实例。

   graph.generateJobs(time) // generate jobs using allocated block

  } match {

    caseSuccess(jobs) =>

//第三步:获取streamIdToInputInfos的信息。BacthDuractions要处理的数据,以及我们要处理的业务逻辑。

      valstreamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)

//第四步:将生成的Job交给jobScheduler

     jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

    case Failure(e)=>

     jobScheduler.reportError("Error generating jobs for time " +time, e)

  }

//第五步:进行checkpoint

 eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))

}

此时的outputStream是整个DStream中的最后一个DStream,也就是foreachDStream.

def generateJobs(time: Time): Seq[Job] = {

 logDebug("Generating jobs for time " + time)

  val jobs =this.synchronized {

   outputStreams.flatMap { outputStream =>

//根据最后一个DStream,然后根据时间生成Job.

      val jobOption= outputStream.generateJob(time)

     jobOption.foreach(_.setCallSite(outputStream.creationSite))

      jobOption

    }

  }

 logDebug("Generated " + jobs.length + " jobs for time" + time)

  jobs

}

 

 

作者:大数据技术研发人员:谢彪

  • 资料来源于:DT_大数据梦工厂(Spark发行版本定制

  • DT大数据梦工厂微信公众号:DT_Spark 

  • 新浪微博:http://www.weibo.com/ilovepains

  • 王家林老师每晚20:00免费大数据实战

YY直播:68917580