Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)
上QQ阅读APP看书,第一时间看更新

6.4 从Application提交的角度重新审视Executor

本节从Application提交的角度重新审视Executor,彻底解密Executor到底是什么时候启动的,以及Executor如何把结果交给Application。

6.4.1 Executor到底是什么时候启动的

SparkContext启动后,StandaloneSchedulerBackend中会调用new()函数创建一个StandaloneAppClient,StandaloneAppClient中有一个名叫ClientEndPoint的内部类,在创建ClientEndpoint时会传入Command来指定具体为当前应用程序启动的Executor进行的入口类的名称为CoarseGrainedExecutorBackend。ClientEndPoint继承自ThreadSafeRpcEndpoint,其通过RPC机制完成和Master的通信。在ClientEndPoint的start方法中,会通过registerWithMaster方法向Master发送RegisterApplication请求,Master收到该请求消息后,首先通过registerApplication方法完成信息登记,之后将会调用schedule方法,在Worker上启动Executor。Master对RegisterApplication请求处理源码如下:

在上面的代码中,Master匹配到RegisterApplication请求,先判断Master的状态是否为STANDBY(备用)状态,如果不是,说明Master为ALIVE状态,在这种状态下调用createApplication(description,sender)方法创建ApplicationInfo,完成之后调用persistenceEngine.addApplication(app)方法,将新创建的ApplicationInfo持久化,以便错误恢复。完成这两步操作后,通过driver.send(RegisteredApplication(app.id, self))向StandaloneAppClient返回注册成功后ApplicationInfo的Id和master的url地址。

ApplicationInfo对象是对application的描述,下面先来看createApplication方法的源码。

Master.scala的源码如下:

上面的代码中,createApplication方法接收ApplicationDescription和ActorRef两种类型的参数,并调用newApplicationId方法生成appId,关键代码如下:

由代码所决定,appid的格式形如:app-20160429101010-0001。desc对象中包含一些基本的配置,包括从系统中传入的一些配置信息,如appname、maxCores、memoryPerExecutorMB等。最后使用desc、date、driver、defaultCores等作为参数构造一个ApplicatiOnInfo对象并返回。函数返回之后,调用registerApplication方法,完成application的注册,该方法是如何完成注册的?

Spark 2.2.1版本的Master.scala的源码如下:

Spark 2.4.3版本的Master.scala源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第22~24行删掉。

上面的代码中,首先通过app.driver.path.address得到Driver的地址,然后查看appAddress映射表中是否已经存在这个路径,如果存在,表示该application已经注册,直接返回;如果不存在,则在waitingApps数组中加入该application,同时在idToApp、endpointToApp、addressToApp映射表中加入映射关系。加入waitingApps数组中的application等待schedule方法的调度。

schedule方法有两个作用:第一,完成Driver的调度,将waitingDrivers数组中的Driver发送到满足条件的Worker上运行;第二,在满足条件的Worker节点上为application启动Executor。

Master.scala的schedule方法的源码如下:

在Master中,schedule方法是一个很重要的方法,每一次新的Driver的注册、application的注册,或者可用资源发生变动,都将调用schedule方法。schedule方法用于为当前等待调度的application调度可用的资源,在满足条件的Worker节点上启动Executor。这个方法还有另外一个作用,就是当有Driver提交的时候,负责将Driver发送到一个可用资源满足Driver需求的Worker节点上运行。launchDriver(worker,driver)方法负责完成这一任务。

application调度成功之后,Master将会为application在Worker节点上启动Executors,调用startExecutorsOnWorkers方法完成此操作。

在scheduleExecutorsOnWorkers方法中,有两种启动Executor的策略:第一种是轮流均摊策略(round-robin),采用圆桌算法依次轮流均摊,直到满足资源需求,轮流均摊策略通常会有更好的数据本地性,因此它是默认的选择策略;第二种是依次全占,在usableWorkers中,依次获取每个Worker上的全部资源,直到满足资源需求。

scheduleExecutorsOnWorkers方法为application分配好逻辑意义上的资源后,还不能真正在Worker节点为application分配出资源,需要调用动作函数为application真正地分配资源。allocateWorkerResourceToExecutors方法的调用,将会在Worker节点上实际分配资源。下面是allocateWorkerResourceToExecutors的源码。

Master.scala的源码如下:

上面代码调用了launchExecutor(worker,exec)方法,这个方法有两个参数:第一个参数是满足条件的WorkerInfo信息;第二个参数是描述Executor的ExecutorDesc对象。这个方法将会向Worker节点发送LaunchExecutor的请求,Worker节点收到该请求之后,将会负责启动Executor。launchExecutor方法代码清单如下所示。

Master.scala的源码如下:

launchExecutor有两个参数,第一个参数是worker:WorkerInfo,代表Worker的基本信息;第二个参数是exec:ExecutorDesc,这个参数保存了Executor的基本配置信息,如memory、cores等。此方法中有worker.endpoint.send(LaunchExecutor(...)),向Worker发送LaunchExecutor请求,Worker收到该请求之后,将会调用方法启动Executor。

向Worker发送LaunchExecutor消息的同时,通过exec.application.driver.send(ExecutorAdded(…))向Driver发送ExecutorAdded消息,该消息为Driver反馈Master都在哪些Worker上启动了Executor,Executor的编号是多少,为每个Executor分配了多少个核,多大的内存以及Worker的联系hostport等消息。

Worker收到LaunchExecutor消息会做相应的处理。首先判断传过来的masterUrl是否和activeMasterUrl相同,如果不相同,说明收到的不是处于ALIVE状态的Master发送过来的请求,这种情况直接打印警告信息。如果相同,则说明该请求来自ALIVE Master,于是为Executor创建工作目录,创建好工作目录之后,使用appid、execid、appDes等参数创建ExecutorRunner。顾名思义,ExecutorRunner是Executor运行的地方,在ExecutorRunner中有一个工作线程,这个线程负责下载依赖的文件,并启动CoarseGaindExecutorBackend进程,该进程单独在一个JVM上运行。下面是ExecutorRunner中的线程启动的源码。

ExecutorRunner.scala的源码如下:

上面代码中定义了一个Thread,这个Thread的run方法中调用fetchAndRunExecutor方法,fetchAndRunExecutor负责以进程的方式启动ApplicationDescription中携带的org.apache.spark.executor.CoarseGrainedExecutorBackend进程。

其中,fetchAndRunExecutor方法中的CommandUtils.buildProcessBuilder(appDesc.command,传入的入口类是"org.apache.spark.executor.CoarseGrainedExecutorBackend",当Worker节点中启动ExecutorRunner时,ExecutorRunner中会启动CoarseGrainedExecutorBackend进程,在CoarseGrainedExecutorBackend的onStart方法中,向Driver发出RegisterExecutor注册请求。

CoarseGrainedExecutorBackend的onStart方法的源码如下:

Driver端收到注册请求,将会注册Executor的请求。

CoarseGrainedSchedulerBackend.scala的receiveAndReply方法的源码如下:

如上面代码所示,Driver向CoarseGrainedExecutorBackend发送RegisteredExecutor消息,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后将会新建一个Executor执行器,并为此Executor充当信使,与Driver通信。CoarseGrainedExecutorBackend收到RegisteredExecutor消息的源码如下所示。

CoarseGrainedExecutorBackend.scala的receive的源码如下:

从上面的代码中可以看到,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,将会新创建一个org.apache.spark.executor.Executor对象,至此Executor创建完毕。

6.4.2 Executor如何把结果交给Application

CoarseGrainedExecutorBackend给DriverEndpoint发送StatusUpdate传输执行结果,DriverEndpoint会把执行结果传递给TaskSchedulerImpl处理,然后交给TaskResultGetter内部通过线程分别处理Task执行成功和失败的不同情况,然后告诉DAGScheduler任务处理结束的状况。

CoarseGrainedSchedulerBackend.scala中DriverEndpoint的receive方法的源码如下。

DriverEndpoint的receive方法中的StatusUpdate调用scheduler.statusUpdate,然后释放资源,再次进行资源调度makeOffers(executorId)。

TaskSchedulerImpl的statusUpdate中:

 如果是TaskState.LOST,则记录原因,将Executor清理掉。

 如果是TaskState.isFinished,则从taskSet中运行的任务中清除掉,调用taskResultGetter.enqueueSuccessfulTask处理。

 如果是TaskState.FAILED、TaskState.KILLED、TaskState.LOST,调用taskResultGetter.enqueueFailedTask处理。