本文共 4498 字,大约阅读时间需要 14 分钟。
driver端调用launchTasks来向worker节点中的executor发送启动任务命令。 spark-master\core\src\main\scala\org\apache\spark\scheduler\cluster\CoarseGrainedSchedulerBackend.scala private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { #向executor发送启动任务命令 executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) }./core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala override def receive: PartialFunction[Any, Unit] = { case RegisteredExecutor => logInfo("Successfully registered with driver") try { executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) } catch { case NonFatal(e) => exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) } case LaunchTask(data) =>#处理driver端发送过来的LaunchTask命令。executor为null的话,直接退出 if (executor == null) { exitExecutor(1, "Received LaunchTask command but executor was null") } else {#拿到task的描述 val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId)#启动任务的运行 executor.launchTask(this, taskDesc) }} def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {#创建新的TaskRunner,然后将tr放到线程池中运行 val tr = new TaskRunner(context, taskDescription) runningTasks.put(taskDescription.taskId, tr) threadPool.execute(tr) }override def run(): Unit = { try { // Run the actual task and measure its runtime. 
taskStartTime = System.currentTimeMillis() taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime } else 0L var threwException = true val value = try {#执行task的run方法 val res = task.run( taskAttemptId = taskId, attemptNumber = taskDescription.attemptNumber, metricsSystem = env.metricsSystem) threwException = false } } spark-master\core\src\main\scala\org\apache\spark\scheduler\Task.scala */ final def run( taskAttemptId: Long, attemptNumber: Int, metricsSystem: MetricsSystem): T = { SparkEnv.get.blockManager.registerTask(taskAttemptId) // TODO SPARK-24874 Allow create BarrierTaskContext based on partitions, instead of whether // the stage is barrier. TaskContext.setTaskContext(context) taskThread = Thread.currentThread() if (_reasonIfKilled != null) { kill(interruptThread = false, _reasonIfKilled) } try {#调用runTask方法执行,不同的任务其实现不同 runTask(context) } catch 这里的task分为两类,一类是ResultTask,另一类是ShuffleMapTask。 spark-master\core\src\main\scala\org\apache\spark\scheduler\ResultTask.scala 这里以ResultTask为例: override def runTask(context: TaskContext): U = { // Deserialize the RDD and the func using the broadcast variables. 
val threadMXBean = ManagementFactory.getThreadMXBean val deserializeStartTime = System.currentTimeMillis() val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime } else 0L val ser = SparkEnv.get.closureSerializer.newInstance()#反序列化 val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime } else 0L#执行rdd.iterator完成计算任务 func(context, rdd.iterator(partition, context)) } 再来看看ShuffleMapTask: spark-master\core\src\main\scala\org\apache\spark\scheduler\ShuffleMapTask.scala override def runTask(context: TaskContext): MapStatus = { // Deserialize the RDD using the broadcast variable. val threadMXBean = ManagementFactory.getThreadMXBean#反序列化rdd val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) var writer: ShuffleWriter[Any, Any] = null try {#根据shuffleManager得到writer,然后将rdd写入 val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch }
转载地址:http://bsnmi.baihongyu.com/