4.2.4 Task调度

通过分析下面的源码,读者可以了解Spark的Task的调度方式。

1.提交任务

在DAGScheduler中提交任务时,分配任务执行节点。


  1. private def submitMissingTasksstage Stage jobId Int {
  2. logDebug"submitMissingTasks(" + stage + ")"
  3. val myPending = pendingTasks.getOrElseUpdatestage new HashSet
  4. myPending.clear()
  5. var tasks = ArrayBuffer[Task[_]]()
  6. /*判断是否为Shuffle map stage,如果是,则这个stage输出的结果会经过Shuffle阶段作为下一个stge的输入,如果是Result Stage,则stage的结果输出到Spark空间(如count(),save())*/
  7. if stage.isShuffleMap {
  8. for p <- 0 until stage.numPartitions if stage.outputLocsp == Nil {
  9. val locs = getPreferredLocsstage.rdd p
  10. /*初始化ShuffleMapTask*/
  11. tasks += new ShuffleMapTaskstage.id stage.rdd stage.shuffleDep.get p locs
  12. }
  13. } else {
  14. val job = resultStageToJobstage
  15. for id <- 0 until job.numPartitions if job.finishedid)) {
  16. val partition = job.partitionsid
  17. val locs = getPreferredLocsstage.rdd partition
  18. /*初始化ResultTask*/
  19. tasks += new ResultTaskstage.id stage.rdd job.func partition locs id
  20. }
  21. }
  22. val properties = if jobIdToActiveJob.containsjobId)) {
  23. jobIdToActiveJobstage.jobId).properties
  24. } else {
  25. // this stage will be assigned to "default" pool
  26. null
  27. }
  28. /*通过此方法获取任务最佳的执行节点*/
  29. private[spark]
  30. def getPreferredLocsrdd RDD[_], partition Int): Seq[TaskLocation] = synchronized {
  31. ……
  32. }

2.分配任务执行节点

1)如果是调用过cache()方法的RDD,数据已经缓存在内存,则读取内存缓存中分区的数据。


  1. val cached = getCacheLocsrdd)(partition
  2. if (!cached.isEmpty {
  3. return cached
  4. }

2)如果直接能获取到执行地点,则返回执行地点作为任务的执行地点,通常DAG中最源头的RDD或者每个Stage中最开始的RDD会有执行地点的信息。例如,HadoopRDD从HDFS读出的分区就是最好的执行地点。这里涉及Hadoop分区的数据本地性问题,感兴趣的读者可以查阅Hadoop的资料了解。


  1. /@deprecated"Replaced by PartitionwiseSampledRDD" "1.0.0"
  2. private[spark] class SampledRDD[T ClassTag](
  3. override def getPreferredLocationssplit Partition): Seq[String] =
  4. firstParent[T].preferredLocationssplit.asInstanceOf[SampledRDDPartit//ion].prev)/
  5. val rddPrefs = rdd.preferredLocationsrdd.partitionspartition)).toList
  6. if (!rddPrefs.isEmpty {
  7. return rddPrefs.maphost => TaskLocationhost))
  8. }


3)如果不是上面两种情况,将遍历RDD获取第一个窄依赖的父亲RDD对应分区的执行地点。


  1. rdd.dependencies.foreach {
  2. case n NarrowDependency[_] =>
  3. for inPart <- n.getParentspartition)) {

获取到子RDD分区的父母分区的集合,再继续深度优先遍历,不断获取到这个分区的父母分区的第一个分区,直到没有Narrow Dependency。可以通过图4-8看到RDD2的p0分区位置就是RDD0中p0分区的位置。


  1. val locs = getPreferredLocsn.rdd inPart
  2. if locs != Nil {
  3. return locs
  4. }
  5. }
  6. case _ =>
  7. }
  8. Nil
  9. }

如果是Shuffle Dependency,由于在Stage之间需要进行Shuffle,而分区无法确定,所以无法获取分区的存储位置。这表示如果一个Stage的父母Stage还没执行完,则子Stage中的Task不能够获得执行位置。

4.2.4 Task调度 - 图1

图4-8 分区获取prefer位置

整体的Task分发由TaskSchedulerImpl来实现,但是Task的调度(本质上是Task在哪个分区执行)逻辑由TaskSetManager完成。这个类监控整个任务的生命周期,当任务失败时(如执行时间超过一定的阈值),重新调度,也会通过delay scheduling进行基于位置感知(locality-aware)的任务调度。TaskSchedulerImpl类有几个主要接口:接口resourceOffer,作用为判断任务集合是否需要在一个节点上运行。接口statusUpdate,其主要作用为更新任务状态。

任务的locality由以下两种方式确定。

1)RDD DAG源头有HDFS等类型的分布式存储,它们内置的数据本地性决定(RDD中配置preferred location确定)数据存储位置和分区的选取。

2)每个其他非源头Stage由于都要进行Shuffle,所以地址以在resourceoffer中进行round robin来确定,初始提交Stage时,将prefer的位置设置为Nil。但在Stage调度过程中,内部是通过Narrow dep的祖先Stage确定最佳执行位置的。这样相当于每个RDD的分区都有prefer执行位置。