This section is excerpted from Chapter 3, Section 3.7, "Creating and Starting the DAGScheduler", of the book 《深入理解Spark:核心思想与源码分析》 (Understanding Spark: Core Ideas and Source Code Analysis) by Geng Jia'an, published by the Huazhang community. More chapters are available through the "华章社区" (Huazhang Community) public account on the Yunqi community.
3.7 Creating and Starting the DAGScheduler
DAGScheduler performs the preparatory work before a job is formally handed to TaskSchedulerImpl for submission: it creates the Job, divides the RDDs in the DAG into different Stages, submits the Stages, and so on. The code that creates the DAGScheduler is as follows.

@volatile private[spark] var dagScheduler: DAGScheduler = _
dagScheduler = new DAGScheduler(this)
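To make the Stage division concrete, here is a minimal, self-contained sketch (an illustration added for this text, not code from the book): in a word count, flatMap and map are narrow dependencies and stay in one Stage, while reduceByKey introduces a shuffle dependency, so the DAGScheduler cuts the DAG there into a ShuffleMapStage plus a final ResultStage when the collect action submits the job.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // rddToPairRDDFunctions, needed for reduceByKey on Spark 1.2 and earlier

object StageDivisionExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StageDivisionExample").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // flatMap and map are narrow dependencies: they stay inside one Stage.
    val pairs = sc.parallelize(Seq("a b", "b c"), 2)
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))

    // reduceByKey adds a ShuffleDependency; the DAGScheduler cuts the DAG here,
    // producing a ShuffleMapStage for the map side and a ResultStage for the action.
    val counts = pairs.reduceByKey(_ + _)

    // collect is an action: it calls SparkContext.runJob, which reaches
    // DAGScheduler.runJob and creates the Job, Stages, and Tasks described above.
    counts.collect().foreach(println)

    sc.stop()
  }
}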
The DAGScheduler's data structures chiefly maintain the mapping between jobIds and stageIds, the Stages and ActiveJobs themselves, and the cached locations of RDD partitions, as shown in Listing 3-32.
Listing 3-32 Data structures maintained by the DAGScheduler

private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]

// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]

// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]

private[scheduler] val activeJobs = new HashSet[ActiveJob]

// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

private val failedEpoch = new HashMap[String, Long]

private val dagSchedulerActorSupervisor = env.actorSystem.actorOf(
  Props(new DAGSchedulerActorSupervisor(this)))

private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

When the DAGScheduler is constructed, it calls the initializeEventProcessActor method to create a DAGSchedulerEventProcessActor, as shown in Listing 3-33.

Listing 3-33 Initialization of DAGSchedulerEventProcessActor

private[scheduler] var eventProcessActor: ActorRef = _

private def initializeEventProcessActor() {
  // blocking the thread until supervisor is started, which ensures
  // eventProcessActor is not null before any job is submitted
  implicit val timeout = Timeout(30 seconds)
  val initEventActorReply =
    dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
  eventProcessActor = Await.result(initEventActorReply, timeout.duration).
    asInstanceOf[ActorRef]
}

initializeEventProcessActor()
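As a side note on the ask pattern used in Listing 3-33, the sketch below shows the same idiom in isolation (the names Supervisor and EchoActor are invented for illustration; it is written against the classic Akka actor API of the Akka 2.2/2.3 era that Spark 1.x ships with): send Props to a supervisor with ?, block on the returned Future with Await.result, and only then use the child ActorRef.

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Like DAGSchedulerActorSupervisor, this actor answers a Props message
// by creating the described child and replying with its ActorRef.
class Supervisor extends Actor {
  def receive = {
    case p: Props => sender() ! context.actorOf(p)
  }
}

// A trivial child standing in for DAGSchedulerEventProcessActor.
class EchoActor extends Actor {
  def receive = {
    case msg => println(s"received: $msg")
  }
}

object AskPatternDemo extends App {
  val system = ActorSystem("demo")
  val supervisor = system.actorOf(Props[Supervisor], "supervisor")

  // ? returns a Future; Await.result blocks until the supervisor replies,
  // which guarantees the reference is non-null before any message is sent --
  // exactly the property initializeEventProcessActor relies on.
  implicit val timeout = Timeout(30.seconds)
  val childFuture = supervisor ? Props[EchoActor]
  val child = Await.result(childFuture, timeout.duration).asInstanceOf[ActorRef]

  child ! "hello"
  system.shutdown()  // Akka 2.3-era shutdown; newer Akka uses system.terminate()
}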
Here dagSchedulerActorSupervisor, a DAGSchedulerActorSupervisor, acts as the supervisor of DAGSchedulerEventProcessActor and is responsible for creating it. As Listing 3-34 shows, DAGSchedulerActorSupervisor applies Akka's one-for-one supervision strategy to DAGSchedulerEventProcessActor. Once DAGSchedulerActorSupervisor has created the DAGSchedulerEventProcessActor and registered it with the ActorSystem, the ActorSystem invokes the actor's preStart hook, at which point taskScheduler obtains its reference to dagScheduler, as shown in Listing 3-35. Listing 3-35 also shows the message types DAGSchedulerEventProcessActor can handle, such as JobSubmitted, BeginEvent, and CompletionEvent; on receiving each of these messages the actor takes a different action. For this chapter it is enough to understand this much; later chapters analyze the details where they are used.
Listing 3-34 Supervision strategy of DAGSchedulerActorSupervisor

private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override val supervisorStrategy =
    OneForOneStrategy() {
      case x: Exception =>
        logError("eventProcesserActor failed; shutting down SparkContext", x)
        try {
          dagScheduler.doCancelAllJobs()
        } catch {
          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
        }
        dagScheduler.sc.stop()
        Stop
    }

  def receive = {
    case p: Props => sender ! context.actorOf(p)
    case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
  }
}

Listing 3-35 Implementation of DAGSchedulerEventProcessActor

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override def preStart() {
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }

  /**
   * The main event loop of the DAG scheduler.
   */
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal,
        callSite, listener, properties)
    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)
    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()
    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)
    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)
    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)
    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)
    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)
    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)
    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def postStop() {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}
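To see how this event loop is fed, the following is an abridged sketch of the sending side (simplified for illustration, not a verbatim copy of the Spark source, though the JobSubmitted fields match the pattern match in Listing 3-35): DAGScheduler.submitJob does not handle the job inline but posts it as a message to eventProcessActor.

// Abridged sketch of DAGScheduler.submitJob (simplified; the argument checks
// and the empty-partition shortcut of the real source are omitted).
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties = null): JobWaiter[U] = {
  // Draw a fresh job id from the nextJobId counter of Listing 3-32.
  val jobId = nextJobId.getAndIncrement()
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  // JobWaiter is a JobListener the caller can block on until the job finishes.
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // Post the event instead of handling it here; the receive loop in
  // Listing 3-35 routes it to handleJobSubmitted on the actor's thread.
  eventProcessActor ! JobSubmitted(
    jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
  waiter
}

Because every event goes through this single actor, all mutation of the scheduler's state happens on one thread, which is why the plain HashMaps and HashSets in Listing 3-32 need no additional locking.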