Lucianoesu 2020-05-10
源码解析
主构造函数代码
private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master)
createTaskScheduler,创建TaskSchedulerImpl和SparkDeploySchedulerBackend对象 TaskSchedulerImpl.initialize 创建一个Pool调度池TaskSchedulerImpl.start() SparkDeployShedulerBackend.start() 创建ApplicationDescreption,描叙Application需要多少内存,启动多少excecutor 创建AppClient,是一个actor registerWithMaster,向Master进行注册 然后waitForRegistrationDAGScheduler,实现了面向stage的调度的高层次调度。它会为每一个job计算一个stage的DAG(有向无环图),追踪RDD和stage的输出是否物化(写入磁盘和内存),寻找一个最小消耗来调度job。它将stage作为tasksets提交到底层的TaskSchedulerImpl上,来在集群上运行他们。除了处理stage的DAG,它还负责决定运行一个task的最佳运行位置,基于当前缓存的状态,将这些最佳运行位置提交给底层TaskSchedulerImpl。它还会处理由于shuffle输出文件导致的失败,在这种情况下旧的stage可能会被重新提交。一个stage内部的失败,如果不是由于shuffle文件丢失导致,会被TaskScheduler处理,它会多次重试一个task,直到最后实在不行,才取消task。dagScheduler = new DAGScheduler(this)
private[spark] val ui: Option[SparkUI] =if (conf.getBoolean("spark.ui.enabled", true)) {Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,env.securityManager,appName)) } else {// For tests, do not enable the UINone }
private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master)