4.4.2 Communication Among the Client, Master, and Worker

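In Standalone mode, the following roles exist.

· Client: submits jobs.

· Master: accepts submitted jobs, has Workers launch the Driver and Executors, and manages the Workers.

· Worker: manages the resources of its node and launches the Driver and Executors.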

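1. Main messages between modules

With reference to Figure 4-15, the main messages passed between the modules and their purposes are listed below: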

Figure 4-15 Spark communication model

(1) Client to Master

· RegisterApplication: registers an application.

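(2) Master to Client

· RegisteredApplication: the reply sent to the Client after the application has been registered.

· ExecutorAdded: tells the Client that a Worker has launched an Executor. The Master sends it to the Client Actor when it sends LaunchExecutor to the Worker.

· ExecutorUpdated: tells the Client that an Executor's state has been updated.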

(3) Master to Worker

· LaunchExecutor: launches an Executor.

· RegisteredWorker: the reply to a successful Worker registration.

· RegisterWorkerFailed: the reply to a failed Worker registration.

· KillExecutor: stops an Executor process.

(4) Worker to Master

· RegisterWorker: registers the Worker.

· Heartbeat: sent periodically to the Master as a heartbeat.

· ExecutorStateChanged: tells the Master that an Executor's state has changed.
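The four message groups above can be modelled as a sealed family of case classes, which is also how Akka-style code typically represents messages. The following is only a sketch: the field lists are simplified assumptions chosen for illustration and do not reproduce Spark's actual message signatures, and the names `DeployMessages` and `direction` are hypothetical.

```scala
// Simplified model of the Standalone messages listed above.
// Field lists are illustrative assumptions, not Spark's real signatures.
object DeployMessages {
  sealed trait DeployMessage

  // (1) Client -> Master
  final case class RegisterApplication(appName: String) extends DeployMessage

  // (2) Master -> Client
  final case class RegisteredApplication(appId: String) extends DeployMessage
  final case class ExecutorAdded(execId: Int, workerId: String) extends DeployMessage
  final case class ExecutorUpdated(execId: Int, state: String) extends DeployMessage

  // (3) Master -> Worker
  final case class LaunchExecutor(appId: String, execId: Int) extends DeployMessage
  final case class RegisteredWorker(masterUrl: String) extends DeployMessage
  final case class RegisterWorkerFailed(reason: String) extends DeployMessage
  final case class KillExecutor(appId: String, execId: Int) extends DeployMessage

  // (4) Worker -> Master
  final case class RegisterWorker(workerId: String, cores: Int) extends DeployMessage
  final case class Heartbeat(workerId: String) extends DeployMessage
  final case class ExecutorStateChanged(appId: String, execId: Int, state: String)
    extends DeployMessage

  /** Returns "sender -> receiver" for a message, mirroring the four groups above. */
  def direction(msg: DeployMessage): String = msg match {
    case _: RegisterApplication =>
      "Client -> Master"
    case _: RegisteredApplication | _: ExecutorAdded | _: ExecutorUpdated =>
      "Master -> Client"
    case _: LaunchExecutor | _: RegisteredWorker | _: RegisterWorkerFailed | _: KillExecutor =>
      "Master -> Worker"
    case _: RegisterWorker | _: Heartbeat | _: ExecutorStateChanged =>
      "Worker -> Master"
  }
}
```

Because the trait is sealed, the compiler can warn when a `match` forgets one of the message types, which is useful when a protocol grows.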

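2. Main communication logic

Between Actors, the sender sends a message with the "!" operator, and the receiver accepts and handles it by pattern matching with case clauses in its receive method. The source code below walks through the main message-receiving logic of the three Actors: Client, Master, and Worker.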
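To make the "!" and receive pairing concrete without pulling in Akka, here is a toy, single-threaded stand-in. It is a sketch under stated assumptions: `MiniActor`, `Counter`, `Ping`, `Stop`, and `processAll` are hypothetical names invented for this example, and a real Akka actor processes its mailbox on a dispatcher thread rather than through an explicit drain call.

```scala
import scala.collection.mutable

object MiniActorSketch {
  // Hypothetical messages used only in this sketch.
  final case class Ping(n: Int)
  case object Stop

  /** Toy stand-in for an actor: `!` enqueues a message, `receive` pattern-matches on it. */
  abstract class MiniActor {
    private val mailbox = mutable.Queue.empty[Any]

    /** Message handler, written as a partial function of case clauses. */
    def receive: PartialFunction[Any, Unit]

    /** Fire-and-forget send: the message is only queued, not processed yet. */
    def !(msg: Any): Unit = mailbox.enqueue(msg)

    /** Drains the mailbox in FIFO order and returns how many messages were handled.
      * Messages that match no case clause are dropped. */
    def processAll(): Int = {
      var handled = 0
      while (mailbox.nonEmpty) {
        val msg = mailbox.dequeue()
        if (receive.isDefinedAt(msg)) {
          receive(msg)
          handled += 1
        }
      }
      handled
    }
  }

  /** Example receiver: sums Ping payloads and records a Stop request. */
  class Counter extends MiniActor {
    var total = 0
    var stopped = false

    def receive: PartialFunction[Any, Unit] = {
      case Ping(n) => total += n
      case Stop    => stopped = true
    }
  }
}
```

A caller would write `counter ! MiniActorSketch.Ping(2)` and later `counter.processAll()`; the point of the sketch is that sending and handling are decoupled by the mailbox.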

(1) Client Actor communication logic

The Client Actor's communication logic is shown below.


    private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
      extends Actor with Logging {
      ……
      override def preStart() = {
        masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
        ……
        driverArgs.cmd match {
          case "launch" =>
            ……
            /* Submit the Driver to the Master Actor */
            masterActor ! RequestSubmitDriver(driverDescription)
            ……
        }
      }
      ……
      override def receive = {
        /* Reply from the Master: whether its request to create the Driver on a Worker succeeded */
        case SubmitDriverResponse(success, driverId, message) =>
          println(message)
          if (success) pollAndReportStatus(driverId.get) else System.exit(-1)
        /* Reply from the Master: whether the Driver was killed successfully */
        case KillDriverResponse(driverId, success, message) =>
          println(message)
          if (success) pollAndReportStatus(driverId) else System.exit(-1)
        ……
      }
    }
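The decision made in the Client's receive method (poll the Driver's status on success, exit with -1 on failure) can be isolated as a pure function. This is a sketch, not Spark's code: `NextStep`, `PollStatus`, `Exit`, and `handle` are hypothetical names, and the response classes are simplified stand-ins with the same field order as in the listing above.

```scala
object ClientResponseSketch {
  // Simplified stand-ins for the replies the Client Actor matches on.
  sealed trait MasterResponse
  final case class SubmitDriverResponse(
      success: Boolean, driverId: Option[String], message: String) extends MasterResponse
  final case class KillDriverResponse(
      driverId: String, success: Boolean, message: String) extends MasterResponse

  // Hypothetical result type describing what the client should do next.
  sealed trait NextStep
  final case class PollStatus(driverId: String) extends NextStep
  final case class Exit(code: Int) extends NextStep

  /** Maps a Master reply to the client's next step.
    * Unlike the listing, a "success" reply without a driver ID is treated as a
    * failure here instead of calling `.get` on an empty Option. */
  def handle(resp: MasterResponse): NextStep = resp match {
    case SubmitDriverResponse(true, Some(id), _) => PollStatus(id)
    case SubmitDriverResponse(_, _, _)           => Exit(-1)
    case KillDriverResponse(id, true, _)         => PollStatus(id)
    case KillDriverResponse(_, _, _)             => Exit(-1)
  }
}
```

Returning a value instead of calling `System.exit` directly keeps the branch logic easy to check in isolation.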

(2) Master Actor communication logic

The Master Actor's communication logic is shown below.


    private[spark] class Master(
        host: String,
        port: Int,
        webUiPort: Int,
        val securityMgr: SecurityManager)
      extends Actor with Logging {
      ……
      override def receive = {
        /* Elected as the leader Master; if this Master's state is RecoveryState.RECOVERING,
           start recovery with beginRecovery */
        case ElectedLeader => {
          ……
        }
        /* Complete recovery */
        case CompleteRecovery => completeRecovery()
        ……
        /* Register a Worker */
        case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort,
            publicAddress) =>
        {
          ……
          /* If the Master's state is RecoveryState.STANDBY, do not register the Worker */
          if (state == RecoveryState.STANDBY) {
          } else if (idToWorker.contains(id)) {
            /* This Worker is already registered; tell it that it cannot register twice */
            sender ! RegisterWorkerFailed("Duplicate worker ID")
          } else {
            val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
              sender, workerUiPort, publicAddress)
            if (registerWorker(worker)) {
              persistenceEngine.addWorker(worker)
              /* Registration succeeded; notify the Worker */
              sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
              schedule()
            } else {
              ……
              /* Registration failed; notify the Worker */
              sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
                + workerAddress)
            }
          }
        }
        case RequestSubmitDriver(description) => {
          /* If the Master's state is ALIVE, submit the Driver; otherwise tell the Client
             that it cannot be submitted */
          if (state != RecoveryState.ALIVE) {
            val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
            sender ! SubmitDriverResponse(false, None, msg)
          } else {
            ……
            /* Submit the Driver */
            sender ! SubmitDriverResponse(true, Some(driver.id),
              s"Driver successfully submitted as ${driver.id}")
          }
        }
        case RequestKillDriver(driverId) => {
          if (state != RecoveryState.ALIVE) {
            /* If the Master is not ALIVE, tell the requester that the Driver cannot be killed */
            val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
            sender ! KillDriverResponse(driverId, success = false, msg)
          } else {
            logInfo("Asked to kill driver " + driverId)
            val driver = drivers.find(_.id == driverId)
            driver match {
              case Some(d) =>
                if (waitingDrivers.contains(d)) {
                  /* If the Driver is still in the waiting queue, remove it from the queue
                     and set its state to KILLED */
                  waitingDrivers -= d
                  self ! DriverStateChanged(driverId, DriverState.KILLED, None)
                } else {
                  /* Tell the Worker that the Driver is assigned to, if any, to kill the
                     Driver process */
                  d.worker.foreach { w =>
                    w.actor ! KillDriver(driverId)
                  }
                }
                val msg = s"Kill request for $driverId submitted"
                logInfo(msg)
                sender ! KillDriverResponse(driverId, success = true, msg)
              case None =>
                // Tell the requester that the Driver has already been killed or does not exist
                ……
            }
          }
        }
        case RequestDriverStatus(driverId) => {
          /* Look up the status of the given Driver and, if found, return it */
          ……
        }
        case RegisterApplication(description) => {
          if (state == RecoveryState.STANDBY) {
          } else {
            /* If the Master is not in STANDBY state, create and register the Application,
               then notify the requester */
            logInfo("Registering app " + description.name)
            val app = createApplication(description, sender)
            registerApplication(app)
            logInfo("Registered app " + description.name + " with ID " + app.id)
            persistenceEngine.addApplication(app)
            sender ! RegisteredApplication(app.id, masterUrl)
            schedule()
          }
        }
        case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
          /* Look up the Executor through the metadata mappings, then tell the Driver that
             uses it to update the Executor's state. If the Executor has finished, remove it;
             if it exited abnormally, remove the Application */
          ……
        }
        case DriverStateChanged(driverId, state, exception) => {
          /* When the Driver's state is ERROR | FINISHED | KILLED | FAILED, remove the Driver */
        }
        case Heartbeat(workerId) => {
          idToWorker.get(workerId) match {
            case Some(workerInfo) =>
              /* Update the Worker's last heartbeat time to the current time */
              workerInfo.lastHeartbeat = System.currentTimeMillis()
            ……
          }
        }
        case MasterChangeAcknowledged(appId) => {
          ……
          /* Set the given app's state to WAITING, in preparation for the Master switch-over */
          app.state = ApplicationState.WAITING
          ……
        }
        case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
          /* If the given Worker is found, set its state to ALIVE, find the Executors whose
             app lookup isDefined, add those Executors to their apps, save their information
             on the Worker, and add the Drivers in driverIds to this Worker */
          ……
        }
        case DisassociatedEvent(_, address, _) => {
          /* A Worker or Application has disconnected; remove the Worker at that address */
          addressToWorker.get(address).foreach(removeWorker)
          /* Remove the application at that address */
          addressToApp.get(address).foreach(finishApplication)
          /* If the conditions are met, complete recovery */
          if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
        }
        case RequestMasterState => {
          /* Return the Master's state to the requester */
          sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray,
            completedApps.toArray, drivers.toArray, completedDrivers.toArray, state)
        }
        case CheckForWorkerTimeOut => {
          /* Check for and remove timed-out Workers */
        }
        case RequestWebUIPort => {
          /* Return the Web UI port to the requester */
        }
      }
      ……
    }
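The Worker registration branch and the heartbeat bookkeeping in the listing above can be condensed into a small in-memory model. This is a sketch under stated assumptions, not the Master's implementation: `MasterModel`, `WorkerRecord`, `NoReply`, and `removeTimedOut` are hypothetical names, the address check is a simplification of `registerWorker`, and the timeout rule (drop Workers whose last heartbeat is older than a threshold) is an assumed reading of the elided CheckForWorkerTimeOut handler.

```scala
import scala.collection.mutable

object MasterRegistrationSketch {
  sealed trait RecoveryState
  case object Standby extends RecoveryState
  case object Alive extends RecoveryState

  // Replies the model can produce; NoReply models the STANDBY branch, which sends nothing.
  sealed trait Reply
  case object NoReply extends Reply
  final case class RegisteredWorker(masterUrl: String) extends Reply
  final case class RegisterWorkerFailed(reason: String) extends Reply

  final case class WorkerRecord(address: String, var lastHeartbeat: Long)

  final class MasterModel(val masterUrl: String, var state: RecoveryState) {
    private val idToWorker = mutable.Map.empty[String, WorkerRecord]

    /** Mirrors the three-way branch: STANDBY, duplicate ID, or attempt to register. */
    def registerWorker(id: String, address: String, now: Long): Reply =
      if (state == Standby) NoReply
      else if (idToWorker.contains(id)) RegisterWorkerFailed("Duplicate worker ID")
      else if (idToWorker.values.exists(_.address == address))
        RegisterWorkerFailed("Attempted to re-register worker at same address: " + address)
      else {
        idToWorker(id) = WorkerRecord(address, now)
        RegisteredWorker(masterUrl)
      }

    /** Records a heartbeat; returns false for an unknown Worker ID. */
    def heartbeat(id: String, now: Long): Boolean = idToWorker.get(id) match {
      case Some(w) =>
        w.lastHeartbeat = now
        true
      case None => false
    }

    /** Removes Workers whose last heartbeat is older than timeoutMs; returns their IDs. */
    def removeTimedOut(now: Long, timeoutMs: Long): Set[String] = {
      val dead = idToWorker.collect {
        case (id, w) if now - w.lastHeartbeat > timeoutMs => id
      }.toSet
      idToWorker --= dead
      dead
    }
  }
}
```

Passing `now` in explicitly, rather than calling `System.currentTimeMillis()` inside the model, makes the timeout behaviour deterministic to check.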

(3) Worker Actor message-handling logic

The Worker Actor's message-handling logic is introduced through the code below.


    private[spark] class Worker(
        host: String,
        port: Int,
        webUiPort: Int,
        cores: Int,
        memory: Int,
        masterUrls: Array[String],
        actorSystemName: String,
        actorName: String,
        workDirPath: String = null,
        val conf: SparkConf,
        val securityMgr: SecurityManager)
      extends Actor with Logging {
      import context.dispatcher
      ……
      override def receive = {
        case RegisteredWorker(masterUrl, masterWebUiUrl) =>
          /* The Worker receives the Master's registration-success message and then
             configures the corresponding Master */
          ……
        case SendHeartbeat =>
          /* On receiving this message, send a heartbeat to the Master to show that this
             Worker is alive */
          masterLock.synchronized {
            if (connected) { master ! Heartbeat(workerId) }
          }
        case WorkDirCleanup =>
          /* Start a separate thread to clean up the directories and files of old applications */
          val cleanupFuture = concurrent.future {
            logInfo("Cleaning up oldest application directories in " + workDir + " ...")
            Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
              .foreach(Utils.deleteRecursively)
          }
          ……
        case MasterChanged(masterUrl, masterWebUiUrl) =>
          /* When a new Master is elected, update the Master URL and related information */
          logInfo("Master has changed, new master is at " + masterUrl)
          changeMaster(masterUrl, masterWebUiUrl)
          ……
        /* Heartbeat received from a Driver */
        case Heartbeat =>
          ……
        /* Registration of this Worker with the Master failed */
        case RegisterWorkerFailed(message) =>
        /* Launch an Executor */
        case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
          /* Launch the Executor process */
          if (masterUrl != activeMasterUrl) {
            logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
          } else {
            try {
              logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
              val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
                self, workerId, host,
                appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
                workDir, akkaUrl, conf, ExecutorState.RUNNING)
              /* Update the metadata */
              executors(appId + "/" + execId) = manager
              manager.start()
              coresUsed += cores_
              memoryUsed += memory_
              masterLock.synchronized {
                master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
              }
            } catch {
              case e: Exception => {
                logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
                if (executors.contains(appId + "/" + execId)) {
                  executors(appId + "/" + execId).kill()
                  executors -= appId + "/" + execId
                }
                masterLock.synchronized {
                  master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
                    None, None)
                }
              }
            }
          }
        /* Executor state changed */
        case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
          masterLock.synchronized {
            /* Under the lock, tell the Master that the Executor's state has changed */
            master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
          }
          val fullId = appId + "/" + execId
          if (ExecutorState.isFinished(state)) {
            executors.get(fullId) match {
              /* If the Executor has finished its work, release its resources */
              case Some(executor) =>
                logInfo("Executor " + fullId + " finished with state " + state +
                  message.map(" message " + _).getOrElse("") +
                  exitStatus.map(" exitStatus " + _).getOrElse(""))
                executors -= fullId
                finishedExecutors(fullId) = executor
                coresUsed -= executor.cores
                memoryUsed -= executor.memory
              ……
            }
          }
        /* Kill an Executor */
        case KillExecutor(masterUrl, appId, execId) =>
          /* Kill the Executor running on this Worker node */
          ……
            executors.get(fullId) match {
              case Some(executor) =>
                logInfo("Asked to kill executor " + fullId)
                executor.kill()
              ……
            }
          }
        /* Launch a Driver */
        case LaunchDriver(driverId, driverDesc) => {
          /* On the Master's command, launch the Driver */
          logInfo(s"Asked to launch driver $driverId")
          val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
          drivers(driverId) = driver
          driver.start()
          coresUsed += driverDesc.cores
          memoryUsed += driverDesc.mem
        }
        case KillDriver(driverId) => {
          /* Kill the Driver running on this Worker node */
          ……
        }
        /* Driver state changed */
        case DriverStateChanged(driverId, state, exception) => {
          /* If the Driver of an Application running on this Worker ended with an error or
             finished normally, log the corresponding state, notify the Master, and remove
             the Driver */
          state match {
            case DriverState.ERROR =>
              logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
            case DriverState.FAILED =>
              logWarning(s"Driver $driverId exited with failure")
            case DriverState.FINISHED =>
              logInfo(s"Driver $driverId exited successfully")
            case DriverState.KILLED =>
              logInfo(s"Driver $driverId was killed by user")
            case _ =>
              logDebug(s"Driver $driverId changed state to $state")
          }
          masterLock.synchronized {
            master ! DriverStateChanged(driverId, state, exception)
          }
          val driver = drivers.remove(driverId).get
          finishedDrivers(driverId) = driver
          memoryUsed -= driver.driverDesc.mem
          coresUsed -= driver.driverDesc.cores
        }
        case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
          /* The connection to the Master is lost: set connected to false and wait for a
             Master to reconnect */
          logInfo(s"$x Disassociated !")
          masterDisconnected()
        case RequestWorkerState => {
          /* Return this Worker's current state to the requester */
          sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
            finishedExecutors.values.toList, drivers.values.toList,
            finishedDrivers.values.toList, activeMasterUrl, cores, memory,
            coresUsed, memoryUsed, activeMasterWebUiUrl)
        }
      }
      ……
    }
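The resource accounting that the Worker performs around LaunchExecutor and ExecutorStateChanged (add to coresUsed and memoryUsed on launch, subtract when the Executor finishes) can be sketched as a small standalone model. The names `WorkerModel`, `ExecutorInfo`, `executorFinished`, `coresFree`, and `memoryFree` are hypothetical and exist only for this illustration; process launching and messaging to the Master are deliberately left out.

```scala
import scala.collection.mutable

object WorkerBookkeepingSketch {
  final case class ExecutorInfo(cores: Int, memoryMb: Int)

  /** In-memory model of a Worker's executor bookkeeping (no processes are started). */
  final class WorkerModel(val cores: Int, val memoryMb: Int) {
    private val executors = mutable.Map.empty[String, ExecutorInfo]
    private val finishedExecutors = mutable.Map.empty[String, ExecutorInfo]
    var coresUsed = 0
    var memoryUsed = 0

    // Same key scheme as in the listing: appId + "/" + execId.
    private def fullId(appId: String, execId: Int): String = appId + "/" + execId

    /** Records a launched Executor and reserves its cores and memory. */
    def launchExecutor(appId: String, execId: Int, execCores: Int, execMemoryMb: Int): Unit = {
      executors(fullId(appId, execId)) = ExecutorInfo(execCores, execMemoryMb)
      coresUsed += execCores
      memoryUsed += execMemoryMb
    }

    /** Moves a finished Executor to the finished map and releases its resources.
      * Returns false if the Executor is unknown or was already released. */
    def executorFinished(appId: String, execId: Int): Boolean = {
      val id = fullId(appId, execId)
      executors.remove(id) match {
        case Some(e) =>
          finishedExecutors(id) = e
          coresUsed -= e.cores
          memoryUsed -= e.memoryMb
          true
        case None => false
      }
    }

    def coresFree: Int = cores - coresUsed
    def memoryFree: Int = memoryMb - memoryUsed
    def finishedCount: Int = finishedExecutors.size
  }
}
```

Releasing resources only when the Executor is still present in the active map keeps a duplicated state-change message from being counted twice.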