What is TaskScheduler (the TaskScheduler service)
TaskScheduler's core job is to submit TaskSets to the cluster for execution and report the results back. Concretely, it:
creates and maintains a TaskSetManager for each TaskSet, and tracks the tasks' locality and failure information;
retries straggler tasks by re-launching them on other nodes;
reports execution status to the DAGScheduler, for example raising a fetch failed error when shuffle output is lost.
The TaskScheduler source code is located at:
org.apache.spark.scheduler.TaskScheduler
/**
 * Low-level task scheduler interface, currently implemented exclusively by
 * [[org.apache.spark.scheduler.TaskSchedulerImpl]].
 * This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
 * for a single SparkContext. These schedulers get sets of tasks submitted to them from the
 * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
 * them, retrying if there are failures, and mitigating stragglers. They return events to the
 * DAGScheduler.
 */
/**
 * TaskScheduler is the task-scheduling interface: it receives the TaskSets that the DAGScheduler
 * produces for each stage, runs them, and retries them on failure. The DAGScheduler is the
 * high-level scheduler: it computes the DAG of stages for each job, then submits the stages as
 * TaskSets to the low-level TaskScheduler to run on the cluster.
 */
private[spark] trait TaskScheduler {

  private val appId = "spark-application-" + System.currentTimeMillis

  def rootPool: Pool

  def schedulingMode: SchedulingMode

  def start(): Unit

  // Invoked after system has successfully initialized (typically in spark context).
  // Yarn uses this to bootstrap allocation of resources based on preferred locations,
  // wait for slave registrations, etc.
  def postStartHook() { }

  // Disconnect from the cluster.
  def stop(): Unit

  // Submit a sequence of tasks to run.
  def submitTasks(taskSet: TaskSet): Unit

  // Cancel a stage.
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit

  /**
   * Kills a task attempt.
   *
   * @return Whether the task was successfully killed.
   */
  def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean

  // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit

  // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
  def defaultParallelism(): Int

  /**
   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
  def executorHeartbeatReceived(
      execId: String,
      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
      blockManagerId: BlockManagerId): Boolean

  /**
   * Get an application ID associated with the job.
   *
   * @return An application ID
   */
  def applicationId(): String = appId

  /**
   * Process a lost executor
   */
  def executorLost(executorId: String, reason: ExecutorLossReason): Unit

  /**
   * Get an application's attempt ID associated with the job.
   *
   * @return An application's Attempt ID
   */
  def applicationAttemptId(): Option[String]
}
How TaskScheduler works
The DAGScheduler submits the series of stages it has carved out, one by one in stage order, to the underlying TaskScheduler for execution. By the time the SparkContext is instantiated, the TaskScheduler and SchedulerBackend instance objects have already been created in SparkContext's createTaskScheduler method.
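The hand-off happens at the end of DAGScheduler.submitMissingTasks; the following sketch is abridged from the Spark 2.x sources (member names such as latestInfo.attemptId vary slightly across versions):

if (tasks.size > 0) {
  logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd})")
  // package the stage's tasks into a TaskSet and hand it to the low-level scheduler
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
}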
Although Spark supports multiple deployment modes, the implementation class of the low-level TaskScheduler interface is always TaskSchedulerImpl. For the SchedulerBackend implementation, this analysis focuses only on the concrete implementation for the Standalone deployment mode, StandaloneSchedulerBackend.
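For Standalone mode, the relevant branch of SparkContext.createTaskScheduler looks as follows; this is abridged from the Spark 2.x sources (other deployment-mode branches and error handling omitted):

// master URLs of the form "spark://host:port" select Standalone mode
case SPARK_REGEX(sparkUrl) =>
  val scheduler = new TaskSchedulerImpl(sc)
  val masterUrls = sparkUrl.split(",").map("spark://" + _)
  val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
  scheduler.initialize(backend)
  (backend, scheduler)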
After TaskSchedulerImpl is instantiated in createTaskScheduler, its initialize method is called immediately, passing in the StandaloneSchedulerBackend instance and assigning it to TaskSchedulerImpl's backend field. Inside initialize, an instance of the appropriate SchedulableBuilder implementation is created according to the configured scheduling mode, and that object immediately calls buildPools to create the scheduling pool that holds and manages the TaskSetManager instances (see the excerpt after the list below). The concrete SchedulableBuilder implementations are defined in the same source file as the SchedulableBuilder trait:
FIFOSchedulableBuilder: used when the scheduling mode is SchedulingMode.FIFO, scheduling with a first-in-first-out policy. FIFO is the default mode; in this mode there is only a single pool holding the TaskSetManagers.
FairSchedulableBuilder: used when the scheduling mode is SchedulingMode.FAIR, scheduling with a fair-sharing policy.
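A sketch of TaskSchedulerImpl.initialize, abridged from the Spark 2.x sources (the exact shape varies by version; in later versions rootPool is a field rather than being assigned here):

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}

The scheduling mode itself is read from the spark.scheduler.mode configuration property, which defaults to FIFO.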
After createTaskScheduler returns, TaskSchedulerImpl has its DAGScheduler reference set during the DAGScheduler's instantiation, and then its start method is called. When TaskSchedulerImpl's start method runs, it calls StandaloneSchedulerBackend's start method, which ultimately registers the application with the cluster via an AppClient. TaskSchedulerImpl's start method also checks the configuration to decide whether to periodically check tasks for speculative execution.
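The corresponding start method, abridged from the Spark 2.x sources:

override def start() {
  backend.start()  // in Standalone mode this ultimately registers the application via AppClient

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    // periodically scan for straggler tasks that should be speculatively re-launched
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}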
Once TaskSchedulerImpl has started, it can accept the TaskSets submitted by the DAGScheduler's submitMissingTasks method for further processing. In submitTasks, TaskSchedulerImpl creates a TaskSetManager and manages the TaskSet's lifecycle through it; when TaskSchedulerImpl is offered Executor compute resources on Worker nodes, it sends the individual Tasks to the Executors for execution through the TaskSetManager.
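An abridged sketch of TaskSchedulerImpl.submitTasks from the Spark 2.x sources (duplicate-attempt checks and the starvation timer omitted):

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // wrap the TaskSet in a TaskSetManager that manages its lifecycle
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    // register the manager with the FIFO or FAIR scheduling pool
    schedulableBuilder.addTaskSetManager(manager, manager.properties)
  }
  // ask the backend for resource offers so the tasks can be launched on Executors
  backend.reviveOffers()
}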
If a Task fails during execution, the TaskSetManager is called on to handle the failure and in turn notifies the DAGScheduler that the current Task has finished. The TaskSetManager also adds the failed Task back to the queue of pending Tasks. By default Spark allows a Task to fail 4 times; the limit is read from spark.task.maxFailures when TaskSchedulerImpl is initialized.
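For example, the limit can be raised when building the SparkConf; a minimal sketch (the application name is just a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("max-failures-demo")      // placeholder application name
  .set("spark.task.maxFailures", "8")   // allow up to 8 attempts per task (default: 4)
val sc = new SparkContext(conf)

The same property can also be passed on the command line, e.g. spark-submit --conf spark.task.maxFailures=8.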
If a Task completes successfully, its result is fed back to the TaskSetManager, which notifies the DAGScheduler. Depending on whether stages are still waiting to execute, the DAGScheduler either keeps iterating, submitting the corresponding TaskSets to the TaskScheduler for execution, or outputs the Job's result.
Original article: https://tangjiusheng.cn/it/434.html