5.2 Worker启动原理和源码详解
本节讲解Worker启动原理和源码。对于Worker的部署启动,我们以Worker的脚本为入口点进行分析。
5.2.1 Worker启动的原理流程
Spark中各个组件是通过脚本启动部署的。Worker的部署以脚本为入口点开始分析。每个组件对应提供了启动的脚本,同时也会提供停止的脚本,停止脚本比较简单,在此仅分析启动的脚本。
部署Worker组件时,最简单的方式是通过配置Spark部署目录下的conf/slaves文件,然后以批量的方式启动集群中在该文件中列出的全部节点上的Worker实例。启动组件的命令如下所示:
1. ./sbin/start-slaves.sh
或者动态地在某个新增节点上(注意是新增节点,如果之前已经部署过,可以参考后面对启动多个实例的进一步分析)启动一个Worker实例,此时可以在该新增的节点上执行如下启动命令。
1. ./sbin/start-slave.sh MasterURL
其中,参数MasterURL表示当前集群中Master的监听地址,启动后Worker会通过该地址动态注册到Master组件,实现为集群动态添加Worker节点的目的。
下面是Worker部署脚本的解析。
部署脚本根据单个节点以及多个节点的Worker部署,对应有两个脚本:start-slave.sh和start-slaves.sh。其中,start-slave.sh负责在脚本执行节点启动一个Worker组件。start-slaves.sh脚本则会读取配置的conf/slaves文件,逐个启动集群中各个Slave节点上的Worker组件。
1.首先分析脚本start-slaves.sh
脚本start-slaves.sh提供了批量启动集群中各个Slave节点上的Worker组件的方法,即可以在配置好Slave节点(即配置好conf/slaves文件)后,通过该脚本一次性全部启动集群中的Worker组件。
脚本的代码如下:
其中,脚本slaves.sh通过ssh协议在指定的各个Slave节点上执行各种命令。
在ssh启动的start-slave.sh命令中,可以看到它的参数是"spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT",即启动slave节点上的Worker进程时,使用的Master URL的值是通过两个环境变量(SPARK_MASTER_IP和SPARK_MASTER_PORT)拼接而成的。
2.脚本start-slave.sh分析
从前面start-slaves.sh脚本的分析中可以看到,最终是在各个Slave节点上执行start-slave.sh脚本来部署Worker组件。对应地,就可以通过该脚本,动态地为集群添加新的Worker组件。
脚本的代码如下:
手动启动Worker实例时,如果需要在一个节点上部署多个Worker组件,则需要配置SPARK_WORKER_INSTANCES环境变量,否则多次启动脚本部署Worker组件时会报错,其原因在于spark-daemon.sh脚本的执行控制,这里给出关键代码的简单分析。
首先,脚本中带了实例是否已经运行的判断,代码如下:
其中,记录对应实例的PID的文件相关的代码如下:
从上面的分析可以看出,如果不是通过设置SPARK_WORKER_INSTANCES,然后一次性启动多个Worker实例,而是手动一个个地启动,对应的在脚本中每次启动时的实例编号都是1,在后台守护进程的spark-daemon.sh脚本中生成的pid就是同一个文件。因此,第二次启动时,pid文件已经存在,此时就会报错(对应停止时也是通过读取pid文件获取进程ID的,因此自动停止多个实例,也需要设置SPARK_WORKER_INSTANCES)。
5.2.2 Worker启动的源码详解
首先查看Worker伴生对象中的main方法,Spark 2.2.1版本的Worker.scala源码如下:
Spark 2.4.3版本的Worker.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第3行之后新增SSL_NODE_LOCAL_CONFIG_PATTERN的变量。
上段代码中第5行之后新增SparkUncaughtExceptionHandler的处理。
上段代码中第11行之后新增对外部shuffle服务的处理。
可以看到,Worker伴生对象中的main方法、格式和Master基本一致。通过参数的类型WorkerArguments来解析命令行参数。具体的代码解析可以参考Master节点部署时的MasterArguments的代码解析。
另外,MasterArguments中的printUsageAndExit方法,对应的就是命令行中的帮助信息。
解析完Worker的参数后,调用startRpcEnvAndEndpoint方法启动RPC通信环境以及Worker的RPC通信终端。该方法的代码解析可以参考Master节点部署时使用的同名方法的代码解析。
最终会实例化一个Worker对象。Worker也继承ThreadSafeRpcEndpoint,对应的也是一个RPC的通信终端,实例化该对象后会调用onStart方法,该方法的代码如下所示。
Spark 2.2.1版本的Worker.scala的源码如下:
Spark 2.4.3版本的Worker.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第11行shuffleService.startIfEnabled()代码替换为startExternalShuffleService()。
1. startExternalShuffleService()
其中,createWorkDir()方法对应构建了该Worker节点上的工作目录,后续在该节点上执行的Application相关信息都会存放在该目录下。
Worker.scala的createWorkDir的源码如下:
可以看到,如果workDirPath没有设置,默认使用的是sparkHome目录下的work子目录。对应的workDirPath在Worker实例化时传入,反推代码可以查到该变量在WorkerArguments中设置。相关代码有两处:一处在WorkerArguments的主构造体中,代码如下所示。
WorkerArguments.scala的源码如下:
1. if (System.getenv("SPARK_WORKER_DIR") != null) { 2. workDir = System.getenv("SPARK_WORKER_DIR") 3. }
即workDirPath由环境变量SPARK_WORKER_DIR设置。
另外一处在命令行选项解析时设置,代码如下所示。
WorkerArguments.scala的源码如下:
即workDirPath由启动Worker实例时传入的可选项--work-dir设置。属性配置:通常由命令可选项来动态设置启动时的配置属性,此时配置的优先级高于默认的属性文件以及环境变量中设置的属性。
启动Worker后一个关键的步骤就是注册到Master,对应的方法registerWithMaster()的代码如下所示。
Worker.scala的源码如下:
继续查看tryRegisterAllMasters方法,代码如下:
其中,sendRegisterMessageToMaster(masterEndpoint)向特定Master的RPC通信终端发送消息RegisterWorker。
Worker接收到反馈消息后,进一步调用handleRegisterResponse方法进行处理。对应的处理代码如下所示。
Worker.scala的源码如下:
分析到这一步,已经明确了注册以及对注册的反馈信息的处理细节。下面进一步分析注册重试定时器的相关处理。注册重试定时器会定期向Worker本身发送ReregisterWithMaster消息,因此可以在receive方法中查看该消息的处理,具体代码如下: