Spark job submission (6)
Published: 2019-05-26


On the driver side, `launchTasks` in `core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala` sends a launch-task command to the executor on the worker node:

```scala
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  // ... for each task: serialize the TaskDescription, then ...
  // send the launch-task command to the chosen executor
  executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
```

On the executor side, `receive` in `core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala` handles the message:

```scala
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }

  case LaunchTask(data) =>
    // Handle the LaunchTask command sent by the driver.
    // If the executor has not been created yet, exit immediately.
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      // Decode the task description
      val taskDesc = TaskDescription.decode(data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      // Start running the task
      executor.launchTask(this, taskDesc)
    }
}
```

`Executor.launchTask` creates a new `TaskRunner`, registers it under the task id, and submits it to the executor's thread pool:

```scala
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  // Create a new TaskRunner, then submit it to the thread pool
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
```
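To make the hand-off pattern concrete, here is a minimal, self-contained sketch of the same idea outside Spark: wrap a task description in a `Runnable`, track it in a concurrent map, and submit it to a thread pool. All names here (`DemoTaskDescription`, `DemoTaskRunner`, `DemoExecutor`) are hypothetical stand-ins, not Spark classes.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}

// Hypothetical stand-in for Spark's TaskDescription.
final case class DemoTaskDescription(taskId: Long, payload: String)

// Hypothetical stand-in for Spark's TaskRunner: runs the task body
// and removes itself from the registry when done.
final class DemoTaskRunner(desc: DemoTaskDescription,
                           running: ConcurrentHashMap[Long, DemoTaskRunner]) extends Runnable {
  override def run(): Unit =
    try println(s"task ${desc.taskId} running: ${desc.payload}") // stand-in for task.run(...)
    finally running.remove(desc.taskId)
}

object DemoExecutor {
  private val runningTasks = new ConcurrentHashMap[Long, DemoTaskRunner]()
  private val threadPool = Executors.newCachedThreadPool()

  // Mirrors the shape of Executor.launchTask: create the runner,
  // register it, then hand it to the pool.
  def launchTask(desc: DemoTaskDescription): Unit = {
    val tr = new DemoTaskRunner(desc, runningTasks)
    runningTasks.put(desc.taskId, tr)
    threadPool.execute(tr)
  }

  def main(args: Array[String]): Unit = {
    (1L to 3L).foreach(id => launchTask(DemoTaskDescription(id, s"partition-$id")))
    threadPool.shutdown() // let in-flight tasks finish, accept no new ones
  }
}
```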
Once in the pool, `TaskRunner.run` records wall-clock and CPU start times, then invokes the task's `run` method:

```scala
override def run(): Unit = {
  try {
    // Run the actual task and measure its runtime.
    taskStartTime = System.currentTimeMillis()
    taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    var threwException = true
    val value = try {
      // Invoke the task's run method
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = taskDescription.attemptNumber,
        metricsSystem = env.metricsSystem)
      threwException = false
      res
    }
    // ...
  }
  // ...
}
```

`Task.run` in `core/src/main/scala/org/apache/spark/scheduler/Task.scala` sets up the task context and delegates to `runTask`:

```scala
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem): T = {
  SparkEnv.get.blockManager.registerTask(taskAttemptId)
  // TODO SPARK-24874 Allow create BarrierTaskContext based on partitions, instead of whether
  // the stage is barrier.
  // ...
  TaskContext.setTaskContext(context)
  taskThread = Thread.currentThread()
  if (_reasonIfKilled != null) {
    kill(interruptThread = false, _reasonIfKilled)
  }
  try {
    // Call runTask; each task type implements it differently
    runTask(context)
  } catch {
    // ...
  }
}
```

Tasks come in two kinds: `ResultTask` and `ShuffleMapTask`. Take `ResultTask` first (`core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala`):

```scala
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize the (RDD, func) pair shipped from the driver
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  // rdd.iterator computes (or fetches) the partition; func consumes it
  func(context, rdd.iterator(partition, context))
}
```

Now `ShuffleMapTask` (`core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala`):

```scala
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize the (RDD, ShuffleDependency) pair
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

  var writer: ShuffleWriter[Any, Any] = null
  try {
    // Get a writer from the shuffleManager, then write the partition out
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    // ...
  }
}
```

Instead of returning the function's result to the driver, `ShuffleMapTask` writes its partition through a `ShuffleWriter` and returns a `MapStatus` describing where the shuffle output lives.
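Both `runTask` variants share the same first step: deserialize the task binary that the driver shipped, then drive the partition iterator. Below is a minimal sketch of that round-trip, using plain Java serialization as a stand-in for Spark's closure serializer and assuming Scala 2.12+, where anonymous functions are serializable; `taskBinary` here is just a local byte array, not Spark's broadcast variable, and `RunTaskSketch` is a hypothetical name.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object RunTaskSketch {
  // Stand-in for Spark's closure serializer: plain Java serialization.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // The "task binary": the user function, shipped in serialized form.
    val taskBinary = serialize((it: Iterator[Int]) => it.sum)

    // ResultTask style: deserialize func, apply it to the partition
    // iterator, and return the value directly.
    val func = deserialize[Iterator[Int] => Int](taskBinary)
    val partition = Iterator(1, 2, 3, 4)
    println(func(partition)) // prints 10
  }
}
```

A `ShuffleMapTask` differs only in what happens to the iterator: rather than returning the function's value, each record is pushed through a shuffle writer for downstream stages to fetch.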

 

