Spark Scheduler模块源码分析之DAGScheduler

  本文主要结合Spark-1.6.0的源码,对Spark中任务调度模块的执行过程进行分析。Spark Application在遇到Action操作时才会真正的提交任务并进行计算。这时Spark会根据Action操作之前一系列Transform操作的关联关系,生成一个DAG,在后续的操作中,对DAG进行Stage划分,生成Task并最终运行。整个过程如下图所示,DAGScheduler用于对Application进行分析,然后根据各RDD之间的依赖关系划分Stage,根据这些划分好的Stage,对应每个Stage会生成一组Task,将Task Set提交到TaskScheduler后,会由TaskScheduler启动Executor进行任务的计算。
  这里写图片描述
  在任务调度模块中最重要的三个类是:
1. org.apache.spark.scheduler.DAGScheduler
2. org.apache.spark.scheduler.SchedulerBackend
3. org.apache.spark.scheduler.TaskScheduler
这里面SchedulerBackend主要起到的作用是为Task分配计算资源。
  接下来对这分成三篇博客对这三个主要的类进行分析,本文分析DAGScheduler的执行过程。

一、DAGScheduler的构建

  Spark在构造SparkContext时就会生成DAGScheduler的实例。

    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched//生成schedulerBackend
    _taskScheduler = ts//生成taskScheduler
    _dagScheduler = new DAGScheduler(this)//生成dagScheduler,传入当前sparkContext对象。

  在生成_dagScheduler之前,已经生成了_schedulerBackend和_taskScheduler对象。这两个对象会在接下来第二和第三部分中介绍。之所以taskScheduler对象在dagScheduler对象构造之前先生成,是由于在生成DAGScheduler的构造方法中会从传入的SparkContext中获取到taskScheduler对象def this(sc: SparkContext) = this(sc, sc.taskScheduler)
  看一下DAGScheduler对象的主构造方法,

class DAGScheduler(
    private[scheduler] val sc: SparkContext, // 获得当前SparkContext对象
    private[scheduler] val taskScheduler: TaskScheduler,  // 获得当前saprkContext内置的taskScheduler
    listenerBus: LiveListenerBus,     // 异步处理事件的对象,从sc中获取
    mapOutputTracker: MapOutputTrackerMaster, //运行在Driver端管理shuffle map task的输出,从sc中获取
    blockManagerMaster: BlockManagerMaster, //运行在driver端,管理整个JobBlock信息,从sc中获取
    env: SparkEnv, // 从sc中获取
    clock: Clock = new SystemClock())

  其中有关LiveListenerBus会在Spark-1.6.0之Application运行信息记录器JobProgressListener中有具体介绍。MapOutputTrackerMaster,BlockManagerMaster后续也会写博客进行分析。

DAGScheduler的数据结构

  在DAGScheduler的源代码中,定义了很多变量,在刚构造出来时,仅仅只是初始化这些变量,具体使用是在后面Job提交的过程中了。

  private[scheduler] val nextJobId = new AtomicInteger(0) // 生成JobId
  private[scheduler] def numTotalJobs: Int = nextJobId.get() // 总的Job数
  private val nextStageId = new AtomicInteger(0) // 下一个StageId

  private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]] // 记录某个job对应的包含的所有stage
  private[scheduler] val stageIdToStage = new HashMap[Int, Stage] // 记录StageId对应的Stage
  private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage] // 记录每一个shuffle对应的ShuffleMapStage,key为shuffleId
  private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob] // 

  // Stages we need to run whose parents aren't done
  private[scheduler] val waitingStages = new HashSet[Stage]

  // Stages we are running right now
  private[scheduler] val runningStages = new HashSet[Stage]

  // Stages that must be resubmitted due to fetch failures
  private[scheduler] val failedStages = new HashSet[Stage]

  private[scheduler] val activeJobs = new HashSet[ActiveJob]

  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

  DAGScheduler构造完成,并初始化一个eventProcessLoop实例后,会调用其eventProcessLoop.start()方法,启动一个多线程,然后把各种event都提交到eventProcessLoop中。这个eventProcessLoop比较重要,在后面也会提到。

二、Job的提交

  一个Job实际上是从RDD调用一个Action操作开始的,该Action操作最终会进入到org.apache.spark.SparkContext.runJob()方法中,在SparkContext中有多个重载的runJob方法,最终入口是下面这个:

  // SparkContext.runJob
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

  这里调用dagScheduler.runJob()方法后,正式进入之前构造的DAGScheduler对象中。在这个方法中,后续一系列的过程以此为:

1. DAGScheduler#runJob

  执行过程中各变量的内容如下图所示
  这里写图片描述
  调用DAGScheduler.submitJob方法后会得到一个JobWaiter实例来监听Job的执行情况。针对Job的Succeeded状态和Failed状态,在接下来代码中都有不同的处理方式。
  

2. DAGScheduler#submitJob

  进入submitJob方法,首先会去检查rdd的分区信息,在确保rdd分区信息正确的情况下,给当前job生成一个jobId,nexJobId在刚构造出来时是从0开始编号的,在同一个SparkContext中,jobId会逐渐顺延。然后构造出一个JobWaiter对象返回给上一级调用函数。通过上面提到的eventProcessLoop提交该任务,最终会调用到DAGScheduler.handleJobSubmitted来处理这次提交的Job。handleJobSubmitted在下面的Stage划分部分会有提到。
  这里写图片描述
  

3. DAGSchedulerEventProcessLoop#post

  在前面的方法中,调用post方法传入的是一个JobSubmitted实例。DAGSchedulerEventProcessLoop类继承自EventLoop类,其中的post方法也是在EventLoop中定义的。在EventLoop中维持了一个LinkedBlockingDeque类型的事件队列,将该Job提交事件存入该队列后,事件线程会从队列中取出事件并进行处理。

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]() // 事件队列
  def post(event: E): Unit = {
    eventQueue.put(event) // 将JobSubmitted,Job提交事件存入该队列中
  }

4、EventLoop#run

  该方法从eventQueue队列中顺序取出event,调用onReceive方法处理事件

val event = eventQueue.take()
try {
   onReceive(event)
}

5、DAGSchedulerEventProcessLoop#onReceive

  在onReceive方法中,进一步调用doOnReceive方法

  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

6、DAGSchedulerEventProcessLoop#doOnReceive

  在该方法中,根据事件类别分别匹配不同的方法进一步处理。本次传入的是JobSubmitted方法,那么进一步调用的方法是DAGScheduler.handleJobSubmitted。这部分的逻辑,以及还可以处理的其他事件,都在下面的源代码中。
  

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
// 处理Job提交事件
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
// 处理Map Stage提交事件
    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
// 处理Stage取消事件
    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)
// 处理Job取消事件
    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)
// 处理Job组取消事件
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
// 处理所以Job取消事件
    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()
// 处理Executor分配事件
    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)
// 处理Executor丢失事件
    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)
// 处理完成事件
    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)
// 处理task集失败事件
    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
// 处理重新提交失败Stage事件
    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

7、DAGScheduler#handleJobSubmitted

  当Job提交后,JobSubmitted事件会被eventProcessLoop捕获到,然后进入本方法中。开始处理Job,并执行Stage的划分。这一部分会衔接下一节,所以这个方法的源码以及Stage如何划分会在下一节中详细描述。
  

三、Stage的划分

  Stage的划分过程中,会涉及到宽依赖和窄依赖的概念,宽依赖是Stage的分界线,连续的窄依赖都属于同一Stage。
  这里写图片描述
  比如上图中,在RDD G处调用了Action操作,在划分Stage时,会从G开始逆向分析,G依赖于B和F,其中对B是窄依赖,对F是宽依赖,所以F和G不能算在同一个Stage中,即在F和G之间会有一个Stage分界线。上图中还有一处宽依赖在A和B之间,所以这里还会分出一个Stage。最终形成了3个Stage,由于Stage1和Stage2是相互独立的,所以可以并发执行,等Stage1和Stage2准备就绪后,Stage3才能开始执行。

1、DAGScheduler#handleJobSubmitted

  这个方法的具体代码如下所示,前面提到了Stage的划分是从最后一个Stage开始逆推的,每遇到一个宽依赖处,就分裂成另外一个Stage,依此类推直到Stage划分完毕为止。并且,只有最后一个Stage的类型是ResultStage类型。

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)// Stage划分过程是从最后一个Stage开始往前执行的,最后一个Stage的类型是ResultStage
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    //为该Job生成一个ActiveJob对象,并准备计算这个finalStage
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job // 该job进入active状态
    activeJobs += job
    finalStage.setActiveJob(job) 
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)   //提交当前Stage

    submitWaitingStages()
  }

2、DAGScheduler#newResultStage

  在这个方法中,会根据最后调用Action的那个RDD,以及方法调用过程callSite,生成的jobId,partitions等信息生成最后那个Stage。

  private def newResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)// 获取当前Stage的parent Stage,这个方法是划分Stage的核心实现
    val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)// 创建当前最后的ResultStage
    stageIdToStage(id) = stage // 将ResultStage与stageId相关联
    updateJobIdStageIdMaps(jobId, stage) // 更新该job中包含的stage
    stage
  }

3、DAGScheduler#getParentStagesAndId

四、任务的生成

本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。

Hadoop 2.0工作原理学习 - 2016-07-15 17:07:13

1 HDFS简介 1.1 Hadoop 2.0介绍 Hadoop是Apache的一个分布式系统基础架构,可以为海量数据提供存储和计算。Hadoop 2.0即第二代Hadoop系统,其框架最核心的设计是HDFS、MapReduce和YARN。其中,HDFS为海量数据提供存储,MapReduce用于分布式计算,YARN用于进行资源管理。 Hadoop 1.0和Hadoop 2.0的结构对比: Hadoop 2.0的主要改进有: 1、通过YARN实现资源的调度与管理,从而使Hadoop 2.0可以运行更多种类的
    本文借鉴官文,添加了一些解释和看法,其中有些理解,写的比较粗糙,有问题的地方希望大家指出。写这篇文章,是想把一些官文和资料中基础、重点拿出来,能总结出便于大家理解的话语。与大多数“wordcount”代码不同的是,并不会有如何运行第一storm代码等内容,只有在运行完代码后,发现需要明白:“知其然,并知其所以然”。 Storm是什么?为什么要用Storm?为什么不用Spark? 第一个问题,以下概念足以解释: Storm是 基于数据流的实时处理系统 ,提供了大吞吐量的实时计算能力。通过数据入口获取
本文要解决的问题: 从源码级别对Spark Streaming进行简单学习。 Summarize Spark Streaming实现了对实时流数据的高吞吐量、低容错的数据处理API。它的数据来源有很多种:Kafka、Flume、Twitter、ZeroMQ、TCP Scoket等。架构图如下: Streaming接收实时流输入的数据,将其按批划分,然后交给Spark Enigne分批处理。如下图所示: StreamingContext 和SparkContext相似。要使用Spark的流处理就必须创建St
1.前言 HBase是云计算环境下最重要的NOSQL数据库,提供了基于Hadoop的数据存储、索引、查询,其最大的优点就是可以通过硬件的扩展从而几乎无限的扩展其存储和检索能力。但是HBase与传统的基于SQL语言的关系数据库无论从理念还是使用方式上都相去甚远,以至于要将基于SQL的项目移植到HBase时往往需要重写整个项目。 为了解决这个问题,很多开源项目提供了HBase的类SQL中间件,意即提供一种在HBase上使用的类SQL语言,使得程序员能够像使用关系数据库一样使用HBase, Apache Pho
为什么要用VXLAN 随着云计算数据中心的大规模建设与运营,传统的依赖VLAN技术的二层网络技术面临着越来越多的问题: vlan的数量限制   4096个vlan远不能满足大规模云计算数据中心的需求 物理网络基础设施的限制    基于IP子网的区域划分限制了需要二层网络连通性的应用负载的部署 TOR交换机MAC表耗尽     虚拟化以及东西向流量导致更多的MAC表项 多租户场景 租户可以自定义网络,且无需考虑与其他租户IP地址的重叠。 目前解决这些问题的主要方案是基于overlay的大二层网络技术。典型的
本节我们讨论 volume 的 Backup 操作。 Backup 是将 volume 备份到别的地方(备份设备),将来可以通过 restore 操作恢复。 Backup VS Snapshot 初看 backup 功能好像与 snapshot 很相似,都可以保存 volume 的当前状态,以备以后恢复。但二者在用途和实现上还是有区别的,具体表现在: Snapshot 依赖于源 volume,不能独立存在;而 backup 不依赖源 volume,即便源 volume 不存在了,也可以 restore。

LibSVM在Java中的简单应用 - 2016-07-14 18:07:36

首先,在这里首先感谢台湾林智仁先生的开源工具包libsvm。使SVM算法更加普及。大家可以到下面的libsvm官网去了解相关的信息。 Libsvm官方网站- https://www.csie.ntu.edu.tw/~cjlin/libsvm/ 其次,我在使用过程中发现,先生svm_scale文件中无法将经过规约的文件输出到本地txt文件中,只能在控制台重定向,而我并不想在程序运行中打开控制台进行较为繁琐的操作。 所以我改造了svm_scale文件,实现了文件的写入,在这里可以和大家分享一下。 改造后新增参
最新消息 Docker在上周的DockerCon技术大会上发布了1.12版核心产品Docker Engine,最大的新特性是Docker Swarm已经被整合到了Docker Engine里面而不再是一个单独的工具了,这样就可以更容易的把多个Docker主机组合成一整个规模更大可靠性更高的逻辑单元。Docker的掌舵者 Adrian Mouat相信这种新的集群模式可以大大增强Docker在相关领域的竞争力。 把Docker Swarm整合进Docker Engine是一个重大改进,但它也只是一个附加功能,
本文记录在3台物理机上搭建Hadoop 2.6.0的详细步骤及碰到的问题解决。默认使用root账号操作,实际中建议使用专用的hadoop用户账号。 1. 环境 机器: 物理机3台,ip分别为192.168.1.130、192.168.1.132、192.168.1.134 操作系统: CentOS 6.6 Java: 1.7 Hadoop: 2.6.0 请确保JDK已安装,使用 java -version 确认。 hosts配置 配置主机hosts文件: vim /etc/hosts 192.168.1.

Docker的安装配置及使用详解 - 2016-07-14 14:07:12

基本概念 Docker 包括三个基本概念 镜像(Image) 容器(Container) 仓库(Repository) 先理解了这三个概念,就理解了 Docker 的整个生命周期。 1、docker安装与启动 yum install -y epel-releaseyum install docker-io # 安装docker # 配置文件 /etc/sysconfig/docker chkconfig docker on # 加入开机启动 service docker start # 启动docker服