Understanding Spark in Depth: Core Ideas and Source Code Analysis (《深入理解Spark:核心思想与源码分析》), Section 3.7: Creating and Starting the DAGScheduler
Published: 2019-06-25

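This excerpt is Section 3.7, "Creating and Starting the DAGScheduler", from Chapter 3 of the book Understanding Spark in Depth: Core Ideas and Source Code Analysis by Geng Jia'an (耿嘉安), provided by the Huazhang community (华章社区). More chapters are available through the "华章社区" public account of the Yunqi community (云栖社区).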

3.7 Creating and Starting the DAGScheduler

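The DAGScheduler does the preparatory work before tasks are handed to TaskSchedulerImpl for submission: it creates the Job, divides the RDDs in the DAG into Stages, submits the Stages, and so on. The code that creates the DAGScheduler is as follows.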

@volatile private[spark] var dagScheduler: DAGScheduler = _
dagScheduler = new DAGScheduler(this)
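To make the "divide the RDDs in the DAG into Stages" step more concrete, here is a minimal sketch in plain Scala. It assumes the usual rule that a new Stage begins at every shuffle dependency. The types `Node`, `Narrow`, `Shuffle` and the function `split` are hypothetical stand-ins invented for illustration; they are not Spark classes and this is not the DAGScheduler's actual code.

```scala
object StageSplitSketch {
  // Hypothetical, simplified lineage model (not Spark's RDD/Dependency classes).
  sealed trait Dep { def parent: Node }
  final case class Narrow(parent: Node) extends Dep   // stays inside the same stage
  final case class Shuffle(parent: Node) extends Dep  // starts a new parent stage
  final case class Node(name: String, deps: List[Dep] = Nil)

  // Returns the stages as sets of node names, parent stages first, final stage last.
  def split(root: Node): List[Set[String]] = {
    var members = Set.empty[String]        // nodes that belong to root's stage
    var parentRoots = List.empty[Node]     // last node of each parent stage

    def visit(n: Node): Unit =
      if (!members.contains(n.name)) {
        members += n.name
        n.deps.foreach {
          case Narrow(p)  => visit(p)                      // follow narrow dependencies
          case Shuffle(p) => parentRoots = parentRoots :+ p // cut at the shuffle boundary
        }
      }

    visit(root)
    parentRoots.flatMap(split).distinct :+ members
  }

  // Example lineage: textFile -> map -> (shuffle) -> reduceByKey -> filter
  val textFile: Node = Node("textFile")
  val mapped: Node = Node("map", List(Narrow(textFile)))
  val reduced: Node = Node("reduceByKey", List(Shuffle(mapped)))
  val filtered: Node = Node("filter", List(Narrow(reduced)))
}
```

Calling `StageSplitSketch.split(StageSplitSketch.filtered)` yields two stages: one containing `textFile` and `map`, and a final one containing `reduceByKey` and `filter`.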

The DAGScheduler's data structures mainly maintain the relationship between jobId and stageId, the Stages, the ActiveJobs, and the cached locations of RDD partitions; see Listing 3-32.

Listing 3-32 Data structures maintained by the DAGScheduler

private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)

private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]

// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]

// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]

private[scheduler] val activeJobs = new HashSet[ActiveJob]

// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

private val failedEpoch = new HashMap[String, Long]

private val dagSchedulerActorSupervisor =
    env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))

private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

When the DAGScheduler is constructed, it calls the initializeEventProcessActor method to create the DAGSchedulerEventProcessActor; see Listing 3-33.

Listing 3-33 Initialization of DAGSchedulerEventProcessActor

private[scheduler] var eventProcessActor: ActorRef = _

private def initializeEventProcessActor() {
    // blocking the thread until supervisor is started, which ensures eventProcessActor is
    // not null before any job is submitted
    implicit val timeout = Timeout(30 seconds)
    val initEventActorReply =
        dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
    eventProcessActor = Await.result(initEventActorReply, timeout.duration).
        asInstanceOf[ActorRef]
}

initializeEventProcessActor()
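The blocking initialization in Listing 3-33 follows a general pattern: send an asynchronous request, then block the constructing thread until the reply arrives or a timeout expires, so that the reference is never null when the first job is submitted. The sketch below shows the same pattern using only the Scala standard library. `Handle` and `requestHandle` are hypothetical names standing in for the ActorRef and the ask (`?`) call; no Akka is involved.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object BlockingInitSketch {
  // Hypothetical stand-in for the ActorRef returned by the supervisor.
  final case class Handle(name: String)

  // Simulates the supervisor creating the child and replying asynchronously.
  def requestHandle(name: String): Future[Handle] = Future { Handle(name) }

  // Blocks the calling thread until the handle is available,
  // or throws a TimeoutException after 30 seconds.
  def initialize(): Handle =
    Await.result(requestHandle("eventProcessActor"), 30.seconds)
}
```

Blocking here trades a short delay during construction for a simpler invariant: code that runs after `initialize()` returns can use the handle without null checks.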

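Here, DAGSchedulerActorSupervisor acts as the supervisor of DAGSchedulerEventProcessActor and is responsible for creating it. As Listing 3-34 shows, DAGSchedulerActorSupervisor applies Akka's one-for-one supervision strategy to DAGSchedulerEventProcessActor. Once DAGSchedulerActorSupervisor has created the DAGSchedulerEventProcessActor and registered it with the ActorSystem, the ActorSystem calls the actor's preStart method, and from that point taskScheduler holds a reference to dagScheduler; see Listing 3-35. Listing 3-35 also shows the message types that DAGSchedulerEventProcessActor can handle, such as JobSubmitted, BeginEvent, and CompletionEvent, each of which triggers a different handler on receipt. For this chapter it is enough to understand this much; later chapters analyze the handlers in detail where they are used.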

Listing 3-34 Supervision strategy of DAGSchedulerActorSupervisor

private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
    extends Actor with Logging {

    override val supervisorStrategy =
        OneForOneStrategy() {
            case x: Exception =>
                logError("eventProcesserActor failed; shutting down SparkContext", x)
                try {
                    dagScheduler.doCancelAllJobs()
                } catch {
                    case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
                }
                dagScheduler.sc.stop()
                Stop
        }

    def receive = {
        case p: Props => sender ! context.actorOf(p)
        case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
    }
}

Listing 3-35 Implementation of DAGSchedulerEventProcessActor

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
    extends Actor with Logging {

    override def preStart() {
        dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
    }

    /**
     * The main event loop of the DAG scheduler.
     */
    def receive = {
        case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
            dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
                listener, properties)
        case StageCancelled(stageId) =>
            dagScheduler.handleStageCancellation(stageId)
        case JobCancelled(jobId) =>
            dagScheduler.handleJobCancellation(jobId)
        case JobGroupCancelled(groupId) =>
            dagScheduler.handleJobGroupCancelled(groupId)
        case AllJobsCancelled =>
            dagScheduler.doCancelAllJobs()
        case ExecutorAdded(execId, host) =>
            dagScheduler.handleExecutorAdded(execId, host)
        case ExecutorLost(execId) =>
            dagScheduler.handleExecutorLost(execId, fetchFailed = false)
        case BeginEvent(task, taskInfo) =>
            dagScheduler.handleBeginEvent(task, taskInfo)
        case GettingResultEvent(taskInfo) =>
            dagScheduler.handleGetTaskResult(taskInfo)
        case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
            dagScheduler.handleTaskCompletion(completion)
        case TaskSetFailed(taskSet, reason) =>
            dagScheduler.handleTaskSetFailed(taskSet, reason)
        case ResubmitFailedStages =>
            dagScheduler.resubmitFailedStages()
    }

    override def postStop() {
        // Cancel any active jobs in postStop hook
        dagScheduler.cleanUpAfterSchedulerStop()
    }
}
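The two listings above combine two ideas: an event loop that dispatches on message type by pattern matching, and a supervisor that reacts to any handler exception by cancelling all jobs and stopping. The following is a rough, Akka-free sketch of that combination. The reduced event set, the `Scheduler` class, and the `run` method are assumptions made for illustration; the real messages carry more fields, and the real supervisor also stops the SparkContext.

```scala
import scala.util.control.NonFatal

object EventLoopSketch {
  // Hypothetical, reduced event set modelled on the messages in Listing 3-35.
  sealed trait Event
  final case class JobSubmitted(jobId: Int) extends Event
  final case class JobCancelled(jobId: Int) extends Event
  case object AllJobsCancelled extends Event

  final class Scheduler {
    var activeJobs: Set[Int] = Set.empty
    var stopped: Boolean = false

    // Dispatch on the event type, as the actor's receive method does.
    def handle(event: Event): Unit = event match {
      case JobSubmitted(id) =>
        require(id >= 0, s"invalid job id $id") // throws on bad input
        activeJobs += id
      case JobCancelled(id) => activeJobs -= id
      case AllJobsCancelled => activeJobs = Set.empty
    }

    // Processes events in order. On the first handler failure it cancels
    // all jobs and stops, loosely mirroring the supervisor's Stop directive.
    def run(events: Seq[Event]): Unit = {
      val it = events.iterator
      while (!stopped && it.hasNext) {
        try handle(it.next())
        catch {
          case NonFatal(_) =>
            activeJobs = Set.empty
            stopped = true
        }
      }
    }
  }
}
```

Because `Event` is a sealed trait, the compiler can warn when a new event type is added without a matching `case`, which is one reason this style suits a scheduler with many message types.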

Reposted from: http://hdzlo.baihongyu.com/
