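4.4.2 Communication Among Client, Master, and Worker
Standalone mode involves the following roles.
·Client: submits jobs.
·Master: accepts jobs, launches Drivers and Executors, and manages Workers.
·Worker: manages its node's resources and launches Drivers and Executors.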
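1. Main Messages Between Modules
With reference to Figure 4-15, the main messages passed between the modules and their purposes are listed below.
Figure 4-15 Spark communication model
(1) Client to Master
·RegisterApplication: registers an application.
(2) Master to Client
·RegisteredApplication: the reply sent to the Client once the application has been registered.
·ExecutorAdded: notifies the Client that a Worker has started an Executor; the Master sends it to the Client Actor when it sends LaunchExecutor to the Worker.
·ExecutorUpdated: notifies the Client that an Executor's state has been updated.
(3) Master to Worker
·LaunchExecutor: launches an Executor.
·RegisteredWorker: the reply confirming that the Worker has been registered.
·RegisterWorkerFailed: the reply indicating that Worker registration failed.
·KillExecutor: stops an Executor process.
(4) Worker to Master
·RegisterWorker: registers a Worker.
·Heartbeat: sent to the Master periodically as a heartbeat.
·ExecutorStateChanged: notifies the Master that an Executor's state has changed.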
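The list above can be modeled as a closed set of Scala case classes, which makes the direction of each message easier to see. This is a minimal sketch under stated assumptions. The field lists are hypothetical placeholders, not the real signatures of Spark's deploy messages. `DeployMessage` and `route` are also illustrative names and are not part of Spark.

```scala
// Hypothetical, simplified model of the Standalone messages listed above.
// Field lists are placeholders, NOT Spark's real message signatures.
sealed trait DeployMessage

// Client -> Master
final case class RegisterApplication(appName: String) extends DeployMessage

// Master -> Client
final case class RegisteredApplication(appId: String) extends DeployMessage
final case class ExecutorAdded(execId: Int, workerId: String) extends DeployMessage
final case class ExecutorUpdated(execId: Int, state: String) extends DeployMessage

// Master -> Worker
final case class LaunchExecutor(appId: String, execId: Int, cores: Int, memoryMb: Int) extends DeployMessage
final case class RegisteredWorker(masterUrl: String) extends DeployMessage
final case class RegisterWorkerFailed(reason: String) extends DeployMessage
final case class KillExecutor(appId: String, execId: Int) extends DeployMessage

// Worker -> Master
final case class RegisterWorker(workerId: String, cores: Int, memoryMb: Int) extends DeployMessage
final case class Heartbeat(workerId: String) extends DeployMessage
final case class ExecutorStateChanged(appId: String, execId: Int, state: String) extends DeployMessage

// Map each message to its (sender, receiver) pair from the list above.
def route(msg: DeployMessage): (String, String) = msg match {
  case _: RegisterApplication => ("Client", "Master")
  case _: RegisteredApplication | _: ExecutorAdded | _: ExecutorUpdated => ("Master", "Client")
  case _: LaunchExecutor | _: RegisteredWorker | _: RegisterWorkerFailed | _: KillExecutor =>
    ("Master", "Worker")
  case _: RegisterWorker | _: Heartbeat | _: ExecutorStateChanged => ("Worker", "Master")
}
```

Declaring the messages under a `sealed` trait has a practical benefit. The Scala compiler can then warn when a `match` such as `route` misses a message type, which is convenient for actor protocols.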
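2. Main Communication Logic
Between Actors, the sender sends a message with the "!" operator (Akka's "tell", which sends without waiting for a reply). The receiver receives and handles the message through case pattern matching in its receive method. The source code below walks through the main message-receiving logic of the three Actors: Client, Master, and Worker.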
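The "!"-plus-`receive` pattern can be illustrated without Akka, using a toy single-threaded stand-in. `MiniActor`, `drain`, `Ping`, `Pong`, and `Echo` are hypothetical names. Unlike Akka, this sketch processes its mailbox synchronously, and only when `drain()` is called.

It shows just the two ideas from the paragraph above:

- `!` enqueues a message and returns immediately.
- `receive` is a `PartialFunction[Any, Unit]` whose `case` clauses pattern-match on each message. Akka's `Actor.Receive` is an alias for the same type.

```scala
import scala.collection.mutable

// Toy stand-in for an actor (NOT Akka): a FIFO mailbox plus a receive function.
abstract class MiniActor {
  private val mailbox = mutable.Queue.empty[Any]

  // Subclasses define message handling, like Akka's Actor.receive.
  def receive: PartialFunction[Any, Unit]

  // "tell": enqueue the message and return at once, without any reply.
  def !(msg: Any): Unit = mailbox.enqueue(msg)

  // Handle queued messages in FIFO order; messages no case matches are dropped.
  def drain(): Unit =
    while (mailbox.nonEmpty) {
      val msg = mailbox.dequeue()
      receive.applyOrElse(msg, (_: Any) => ())
    }
}

final case class Ping(from: String)
final case class Pong(to: String)

class Echo extends MiniActor {
  val log = mutable.ArrayBuffer.empty[String]
  def receive: PartialFunction[Any, Unit] = {
    case Ping(from) => log += s"ping from $from"
    case Pong(to)   => log += s"pong to $to"
  }
}

val echo = new Echo
echo ! Ping("client")
echo ! Pong("master")
echo ! 42 // no case matches an Int, so drain() skips it
echo.drain()
```

After `drain()`, `echo.log` holds the two handled messages in the order they were sent.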
(1) Client Actor Communication Logic
The Client Actor's communication logic is as follows.
- private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
- ……
- override def preStart() = {
- masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
- ……
- driverArgs.cmd match {
- case "launch" =>
- ……
- /*Submit the Driver to the Master's Actor*/
- masterActor ! RequestSubmitDriver(driverDescription)
- ……
- }
- }
- ……
- override def receive = {
- /*Reply from the Master on whether the Driver submission was accepted*/
- case SubmitDriverResponse(success, driverId, message) =>
- println(message)
- if (success) pollAndReportStatus(driverId.get) else System.exit(-1)
- /*Notification of whether the Driver was successfully killed*/
- case KillDriverResponse(driverId, success, message) =>
- println(message)
- if (success) pollAndReportStatus(driverId) else System.exit(-1)
- ……
- }
- }
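The Client's `receive` method boils down to a small decision. If the Master reports success, the Client keeps polling; otherwise it exits with status -1. The sketch below isolates that decision as a pure function so it can be tested without an actor system.

The message classes are simplified, hypothetical copies of the ones in the listing. `ClientAction`, `PollStatus`, `Exit`, and `onResponse` are illustrative names, not Spark APIs.

```scala
// Simplified, hypothetical copies of the replies the Client handles.
final case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)
final case class KillDriverResponse(driverId: String, success: Boolean, message: String)

// What the Client should do next. This is a testable stand-in for calling
// pollAndReportStatus(...) or System.exit(-1) directly.
sealed trait ClientAction
final case class PollStatus(driverId: String) extends ClientAction
final case class Exit(code: Int) extends ClientAction

def onResponse(msg: Any): Option[ClientAction] = msg match {
  case SubmitDriverResponse(true, Some(id), _) => Some(PollStatus(id))
  case SubmitDriverResponse(_, _, _)           => Some(Exit(-1)) // failure, or success without an ID
  case KillDriverResponse(id, true, _)         => Some(PollStatus(id))
  case KillDriverResponse(_, false, _)         => Some(Exit(-1))
  case _                                       => None           // not a message this sketch handles
}
```

This sketch differs from the listing in one design detail. The listing calls `driverId.get` when `success` is true. Here the code pattern-matches on `Some(id)` and treats a successful reply without an ID as a failure instead of throwing.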
(2) Master Actor Communication Logic
The Master Actor's communication logic is as follows.
- private[spark] class Master(
- host: String,
- port: Int,
- webUiPort: Int,
- val securityMgr: SecurityManager)
- extends Actor with Logging {
- ……
- override def receive = {
- /*Elected as the leader Master; if this Master's state is RecoveryState.RECOVERING, start recovery (beginRecovery)*/
- case ElectedLeader => {
- ……
- }
- /*Recovery completed*/
- case CompleteRecovery => completeRecovery()
- ……
- /*Register a Worker*/
- case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort,
- publicAddress) =>
- {
- ……
- /*If this Master is in RecoveryState.STANDBY, do not register the Worker*/
- if (state == RecoveryState.STANDBY) {
- } else if (idToWorker.contains(id)) {
- /*This Worker ID is already registered; tell the Worker it cannot register twice*/
- sender ! RegisterWorkerFailed("Duplicate worker ID")
- } else {
- val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
- sender, workerUiPort, publicAddress)
- if (registerWorker(worker)) {
- persistenceEngine.addWorker(worker)
- /*Registration succeeded; notify the Worker*/
- sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
- schedule()
- } else {
- ……
- /*Registration failed; notify the Worker*/
- sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)
- }
- }
- }
- case RequestSubmitDriver(description) => {
- /*Accept the Driver only if this Master is ALIVE; otherwise tell the Client it cannot be submitted*/
- if (state != RecoveryState.ALIVE) {
- val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
- sender ! SubmitDriverResponse(false, None, msg)
- } else {
- ……
- /*The Driver has been submitted; reply to the Client*/
- sender ! SubmitDriverResponse(true, Some(driver.id),
- s"Driver successfully submitted as ${driver.id}")
- }
- }
- case RequestKillDriver(driverId) => {
- if (state != RecoveryState.ALIVE) {
- /*If this Master is not ALIVE, tell the requester that the Driver cannot be killed*/
- val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
- sender ! KillDriverResponse(driverId, success = false, msg)
- } else {
- logInfo("Asked to kill driver " + driverId)
- val driver = drivers.find(_.id == driverId)
- driver match {
- case Some(d) =>
- if (waitingDrivers.contains(d)) {
- /*If the Driver to be killed is still in the waiting queue, remove it from the queue and set its state to KILLED*/
- waitingDrivers -= d
- self ! DriverStateChanged(driverId, DriverState.KILLED, None)
- } else {
- /*Otherwise, tell the Worker the Driver was assigned to (d.worker, if any) to kill the Driver process*/
- d.worker.foreach { w =>
- w.actor ! KillDriver(driverId)
- }
- }
- val msg = s"Kill request for $driverId submitted"
- logInfo(msg)
- sender ! KillDriverResponse(driverId, success = true, msg)
- case None =>
- // Tell the requester that the Driver has already been killed or does not exist
- ……
- }
- }
- }
- case RequestDriverStatus(driverId) => {
- /*Look up the state of the requested Driver and, if found, return it*/
- ……
- }
- case RegisterApplication(description) => {
- if (state == RecoveryState.STANDBY) {
- } else {
- /*If this Master is not in STANDBY, create and register the Application, then notify the requester*/
- logInfo("Registering app " + description.name)
- val app = createApplication(description, sender)
- registerApplication(app)
- logInfo("Registered app " + description.name + " with ID " + app.id)
- persistenceEngine.addApplication(app)
- sender ! RegisteredApplication(app.id, masterUrl)
- schedule()
- }
- }
- case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
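- /*Look up the Executor through the metadata maps, then tell the Driver using it that the Executor's state changed. If the Executor has finished, remove it; if it exited abnormally, the Application may be removed*/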
- ……
- }
- case DriverStateChanged(driverId, state, exception) => {
- /*When the Driver's state is ERROR, FINISHED, KILLED, or FAILED, remove this Driver*/
- ……
- }
- case Heartbeat(workerId) => {
- idToWorker.get(workerId) match {
- case Some(workerInfo) =>
- /*Record the current time as this Worker's latest heartbeat*/
- workerInfo.lastHeartbeat = System.currentTimeMillis()
- ……
- }
- }
- case MasterChangeAcknowledged(appId) => {
- ……
- /*Set the given App's state to WAITING in preparation for completing the Master switch*/
- app.state = ApplicationState.WAITING
- ……
- }
- case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
- /*If the given Worker is found, set its state to ALIVE. Keep only the Executors whose App is known (the App lookup isDefined), add them to their App, and record them on the Worker. Finally, attach the Drivers in driverIds to this Worker*/
- ……
- }
- case DisassociatedEvent(_, address, _) => {
- /*The remote Worker or Application at this address has disconnected: remove the Worker*/
- addressToWorker.get(address).foreach(removeWorker)
- /*Finish the Application*/
- addressToApp.get(address).foreach(finishApplication)
- /*If the conditions are met, complete recovery*/
- if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
- }
- case RequestMasterState => {
- /*Return the Master's state to the requester*/
- sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
- drivers.toArray, completedDrivers.toArray, state)
- }
- case CheckForWorkerTimeOut => {
- /*Check for and remove Workers whose heartbeats have timed out*/
- ……
- }
- case RequestWebUIPort => {
- /*Return the Web UI port to the requester*/
- ……
- }
- }
- ……
- }
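Two pieces of the Master's logic above are easy to model in isolation:

- the three-way branch in the `RegisterWorker` case;
- the pairing of the `Heartbeat` case, which records the latest heartbeat time, with `CheckForWorkerTimeOut`, which drops Workers that have gone quiet.

The sketch below is a simplified, single-threaded model built on several assumptions:

- `MiniMaster` and its methods are hypothetical names.
- A single map stands in for both `idToWorker` and `WorkerInfo.lastHeartbeat`.
- Times are passed in explicitly rather than read from `System.currentTimeMillis()`.
- The 60000 ms timeout in the usage lines was chosen to match the 60-second default of Spark's `spark.worker.timeout` setting.

```scala
import scala.collection.mutable

// Hypothetical model of the Master's RegisterWorker, Heartbeat, and
// CheckForWorkerTimeOut handling. NOT Spark's code; names are illustrative.
object RecoveryState extends Enumeration {
  val STANDBY, ALIVE, RECOVERING = Value
}

sealed trait Reply
case object NoReply extends Reply // STANDBY: the request is ignored
final case class RegisteredWorker(masterUrl: String) extends Reply
final case class RegisterWorkerFailed(reason: String) extends Reply

class MiniMaster(masterUrl: String, timeoutMs: Long) {
  var state: RecoveryState.Value = RecoveryState.ALIVE
  // workerId -> time of the Worker's latest heartbeat, in milliseconds
  val lastHeartbeat = mutable.Map.empty[String, Long]

  // Same three branches as the RegisterWorker case in the listing.
  def registerWorker(id: String, nowMs: Long): Reply =
    if (state == RecoveryState.STANDBY) NoReply
    else if (lastHeartbeat.contains(id)) RegisterWorkerFailed("Duplicate worker ID")
    else {
      lastHeartbeat(id) = nowMs
      RegisteredWorker(masterUrl)
    }

  // Heartbeat: refresh the timestamp of a known Worker; unknown IDs are ignored.
  def heartbeat(id: String, nowMs: Long): Unit =
    if (lastHeartbeat.contains(id)) lastHeartbeat(id) = nowMs

  // CheckForWorkerTimeOut: remove and return Workers silent for more than timeoutMs.
  def checkForWorkerTimeOut(nowMs: Long): Set[String] = {
    val expired = lastHeartbeat.filter { case (_, t) => nowMs - t > timeoutMs }.keySet.toSet
    lastHeartbeat --= expired
    expired
  }
}

val master = new MiniMaster("spark://master:7077", timeoutMs = 60000L)
master.registerWorker("w1", nowMs = 0L)  // RegisteredWorker
master.registerWorker("w1", nowMs = 10L) // RegisterWorkerFailed: duplicate ID
master.registerWorker("w2", nowMs = 0L)
master.heartbeat("w1", nowMs = 50000L)   // only w1 keeps sending heartbeats
val expired = master.checkForWorkerTimeOut(nowMs = 70000L) // w2 has been silent for 70 s
```

Passing `nowMs` explicitly keeps the timeout logic deterministic and testable. The real actor instead reads the clock and receives `CheckForWorkerTimeOut` on a timer.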
(3) Worker Actor Message-Handling Logic
The Worker Actor's message-handling logic is analyzed through the code below.
- private[spark] class Worker(
- host: String,
- port: Int,
- webUiPort: Int,
- cores: Int,
- memory: Int,
- masterUrls: Array[String],
- actorSystemName: String,
- actorName: String,
- workDirPath: String = null,
- val conf: SparkConf,
- val securityMgr: SecurityManager)
- extends Actor with Logging {
- import context.dispatcher
- ……
- override def receive = {
- case RegisteredWorker(masterUrl, masterWebUiUrl) =>
- /*The Master has confirmed the registration; the Worker records that Master as its own*/
- ……
- case SendHeartbeat =>
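- /*Sent by the Worker to itself on a timer after registration; if connected, send a Heartbeat to the Master to show this Worker is alive*/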
- masterLock.synchronized {
- if (connected) { master ! Heartbeat(workerId) }
- }
- case WorkDirCleanup =>
- /*Clean up old application directories and files asynchronously in a Future, off the actor's message loop*/
- val cleanupFuture = concurrent.future {
- logInfo("Cleaning up oldest application directories in " + workDir + " ...")
- Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
- .foreach(Utils.deleteRecursively)
- }
- ……
- case MasterChanged(masterUrl, masterWebUiUrl) =>
- /*A new Master has been elected; the Worker updates the Master URL and related information*/
- logInfo("Master has changed, new master is at " + masterUrl)
- changeMaster(masterUrl, masterWebUiUrl)
- ……
- /*Heartbeat received from a Driver*/
- case Heartbeat =>
- ……
- /*Registration with the Master failed*/
- case RegisterWorkerFailed(message) =>
- ……
- /*Launch an Executor*/
- case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
- /*Launch the Executor process, but only on orders from the active Master*/
- if (masterUrl != activeMasterUrl) {
- logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
- } else {
- try {
- logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
- val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host,
- appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
- workDir, akkaUrl, conf, ExecutorState.RUNNING)
- /*Update the metadata*/
- executors(appId + "/" + execId) = manager
- manager.start()
- coresUsed += cores_
- memoryUsed += memory_
- masterLock.synchronized {
- master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
- }
- } catch {
- case e: Exception => {
- logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
- if (executors.contains(appId + "/" + execId)) {
- executors(appId + "/" + execId).kill()
- executors -= appId + "/" + execId
- }
- masterLock.synchronized {
- master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
- None, None)
- }
- }
- }
- }
- /*Executor state changed*/
- case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
- masterLock.synchronized {
- /*Forward the Executor state change to the Master*/
- master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
- }
- val fullId = appId + "/" + execId
- if (ExecutorState.isFinished(state)) {
- executors.get(fullId) match {
- /*If the Executor has finished, release its resources*/
- case Some(executor) =>
- logInfo("Executor " + fullId + " finished with state " + state +
- message.map(" message " + _).getOrElse("") +
- exitStatus.map(" exitStatus " + _).getOrElse(""))
- executors -= fullId
- finishedExecutors(fullId) = executor
- coresUsed -= executor.cores
- memoryUsed -= executor.memory
- ……
- }
- }
- /*Kill an Executor*/
- case KillExecutor(masterUrl, appId, execId) =>
- /*Kill the Executor running on this Worker*/
- ……
- executors.get(fullId) match {
- case Some(executor) =>
- logInfo("Asked to kill executor " + fullId)
- executor.kill()
- ……
- }
- }
- /*Launch a Driver*/
- case LaunchDriver(driverId, driverDesc) => {
- /*Launch the Driver on the Master's command*/
- logInfo(s"Asked to launch driver $driverId")
- val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
- drivers(driverId) = driver
- driver.start()
- coresUsed += driverDesc.cores
- memoryUsed += driverDesc.mem
- }
- case KillDriver(driverId) => {
- /*Kill a Driver running on this Worker*/
- ……
- }
- /*Driver state changed*/
- case DriverStateChanged(driverId, state, exception) => {
- /*If the Driver of an Application on this Worker ended in an error, failed, finished, or was killed, log the corresponding state, notify the Master, and remove the Driver*/
- state match {
- case DriverState.ERROR =>
- logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
- case DriverState.FAILED =>
- logWarning(s"Driver $driverId exited with failure")
- case DriverState.FINISHED =>
- logInfo(s"Driver $driverId exited successfully")
- case DriverState.KILLED =>
- logInfo(s"Driver $driverId was killed by user")
- case _ =>
- logDebug(s"Driver $driverId changed state to $state")
- }
- masterLock.synchronized {
- master ! DriverStateChanged(driverId, state, exception)
- }
- val driver = drivers.remove(driverId).get
- finishedDrivers(driverId) = driver
- memoryUsed -= driver.driverDesc.mem
- coresUsed -= driver.driverDesc.cores
- }
- case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
- /*Lost the connection to the Master: set connected to false and wait for a Master to reconnect*/
- logInfo(s"$x Disassociated !")
- masterDisconnected()
- case RequestWorkerState => {
- /*Return this Worker's current state to the requester*/
- sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
- finishedExecutors.values.toList, drivers.values.toList,
- finishedDrivers.values.toList, activeMasterUrl, cores, memory,
- coresUsed, memoryUsed, activeMasterWebUiUrl)
- }
- }
- ……
- }
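The Worker's bookkeeping follows one invariant, visible in the `LaunchExecutor`, `ExecutorStateChanged`, `LaunchDriver`, and `DriverStateChanged` cases:

- When a process is launched, its resources are added to `coresUsed`/`memoryUsed`.
- When it reaches a terminal state, those resources are subtracted and the process moves from the running map to the finished map.

The sketch below models only the Executor half of this, under several assumptions:

- `WorkerResources`, `ExecInfo`, and `ExecState` are hypothetical simplifications.
- There is no `ExecutorRunner`, no real process, and no notification to the Master.
- The set of terminal states was chosen for illustration.

```scala
import scala.collection.mutable

// Hypothetical model of the Worker's resource accounting.
// NOT Spark's ExecutorRunner/ExecutorState; names are illustrative.
object ExecState extends Enumeration {
  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
  // Terminal states assumed for this sketch.
  def isFinished(s: Value): Boolean = Set(KILLED, FAILED, LOST, EXITED).contains(s)
}

final case class ExecInfo(cores: Int, memoryMb: Int)

class WorkerResources(val totalCores: Int, val totalMemoryMb: Int) {
  var coresUsed = 0
  var memoryUsed = 0
  // Keyed by "appId/execId", like the executors map in the listing.
  val executors = mutable.Map.empty[String, ExecInfo]
  val finishedExecutors = mutable.Map.empty[String, ExecInfo]

  // LaunchExecutor: record the Executor and reserve its cores and memory.
  def launch(appId: String, execId: Int, cores: Int, memoryMb: Int): Unit = {
    executors(s"$appId/$execId") = ExecInfo(cores, memoryMb)
    coresUsed += cores
    memoryUsed += memoryMb
  }

  // ExecutorStateChanged: on a terminal state, move the Executor to
  // finishedExecutors and release its resources; other states change nothing.
  def stateChanged(appId: String, execId: Int, state: ExecState.Value): Unit = {
    val fullId = s"$appId/$execId"
    if (ExecState.isFinished(state)) {
      executors.remove(fullId).foreach { e =>
        finishedExecutors(fullId) = e
        coresUsed -= e.cores
        memoryUsed -= e.memoryMb
      }
    }
  }

  def coresFree: Int = totalCores - coresUsed
  def memoryFreeMb: Int = totalMemoryMb - memoryUsed
}

val worker = new WorkerResources(totalCores = 8, totalMemoryMb = 16384)
worker.launch("app-1", 0, cores = 2, memoryMb = 1024)
worker.launch("app-1", 1, cores = 4, memoryMb = 2048)
worker.stateChanged("app-1", 0, ExecState.EXITED)  // terminal: releases 2 cores, 1024 MB
worker.stateChanged("app-1", 1, ExecState.RUNNING) // not terminal: nothing released
```

Removing the entry with `executors.remove(fullId)` and releasing resources only when it was present means a duplicate terminal update for the same Executor cannot subtract its resources twice.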