4.3.3 Spark块管理

RDD在逻辑上是按照Partition分块的,可以将RDD看成是一个分区作为数据项的分布式数组。这也是Spark在极力做到的一点,让编写分布式程序像编写单机程序一样简单。而物理上存储RDD是以Block为单位的,一个Partition对应一个Block,用Partition的ID通过元数据的映射到物理上的Block,而这个物理上的Block可以存储在内存,也可以存储在某个节点的Spark的硬盘临时目录,等等。

整体的I/O管理分为以下两个层次。

1)通信层:I/O模块也是采用Master-Slave结构来实现通信层的架构,Master和Slave之间传输控制信息、状态信息。

2)存储层:Spark的块数据需要存储到内存或者磁盘,有可能还需传输到远端机器,这些是由存储层完成的。

通过图4-10,可以大致了解整个Spark存储(Store)模块。下面从以下几个方面介绍存储模块。

1.实体和类

可以从以下几个维度理解整个存储系统。

(1)管理和接口

BlockManager:当其他模块要和storage模块进行交互时,storage模块提供了统一的操作类BlockManager,外部类与storage模块打交道都需要调用BlockManager相应接口来实现。

4.3.3 Spark块管理 - 图1

图4-10 Spark存储模块全景

(2)通信层

·BlockManagerMasterActor:从主节点创建,从节点通过这个Actor的引用向主节点传递消息和状态。

·BlockManagerSlaveActor:在从节点创建,主节点通过这个Actor的引用向从节点传递命令,控制从节点的块读写。

·BlockManagerMaster:对Actor通信进行管理。

(3)数据读写层

·DiskStore:提供Block在磁盘上以文件形式读写的功能。

·MemoryStore:提供Block在内存中的Block读写功能。

·ConnectionManager:提供本地机器和远端节点进行网络传输Block的功能。

·BlockManagerWorker:对远端数据的异步传输进行管理。

2.BlockManager中的通信

主节点和从节点之间通过Actor传送消息来传递命令和状态。

各个类在Master和Slave上所扮演的角色如图4-11所示。

4.3.3 Spark块管理 - 图2

图4-11 Spark存储模块通信

整体的数据存储通信仍相当于Master-Slave模型,节点之间传递消息和状态,Master节点负责总体控制,Slave节点接收命令、汇报状态。(补充介绍:Actor和ref是AKKA中两个不同的Actor引用。)

BlockManager的创建对于Master和Slave来说有所不同。

(1)Master端

BlockManagerMaster对象拥有BlockManagerMasterActor的actor引用以及所有BlockManagerSlaveActor的ref引用。

(2)Slave端

对于Slave,BlockManagerMaster对象拥有BlockManagerMasterActor对象的ref的引用和自身BlockManagerSlaveActor的actor的引用。BlockManagerMasterActor在ref和Actor之间通信,BlockManagerSlaveActor在ref和Actor之间通信。

BlockManager在内部封装BlockManagerMaster,并通过BlockManagerMaster进行通信。Spark在各节点创建各自的BlockManager,通过BlockManager对storage模块进行操作。BlockManager对象在SparkEnv中创建,SparkEnv相当于线程的上线下文变量,在SparkEnv中也会创建很多的管理组件。例如,connectionManager、broadcastManager、cacheManager等的创建过程如下。


  1. private[spark] def create
  2. conf SparkConf
  3. executorId String
  4. hostname String
  5. port Int
  6. isDriver Boolean
  7. isLocal Boolean
  8. listenerBus LiveListenerBus = null): SparkEnv = {
  9. ……
  10. mapOutputTracker.trackerActor = registerOrLookup
  11. "MapOutputTracker"
  12. new MapOutputTrackerMasterActormapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
  13. val blockManagerMaster = new BlockManagerMasterregisterOrLookup
  14. "BlockManagerMaster"new BlockManagerMasterActorisLocal conf listenerBus)), conf
  15. /*创建blockManager*/
  16. val blockManager = new BlockManagerexecutorId actorSystem blockManagerMaster
  17. serializer conf securityManager mapOutputTracker
  18. val connectionManager = blockManager.connectionManager
  19. val broadcastManager = new BroadcastManagerisDriver conf securityManager
  20. val cacheManager = new CacheManagerblockManager
  21. val httpFileServer =
  22. if isDriver {
  23. val server = new HttpFileServersecurityManager
  24. server.initialize()
  25. conf.set"spark.fileserver.uri" server.serverUri
  26. server
  27. } else {
  28. null
  29. }
  30. val metricsSystem = if isDriver {
  31. MetricsSystem.createMetricsSystem"driver" conf securityManager
  32. } else {
  33. MetricsSystem.createMetricsSystem"executor" conf securityManager
  34. }
  35. metricsSystem.start()
  36. val sparkFilesDir String = if isDriver {
  37. Utils.createTempDir().getAbsolutePath
  38. } else {
  39. "."
  40. }
  41. val shuffleManager = instantiateClass[ShuffleManager](
  42. "spark.shuffle.manager" "org.apache.spark.shuffle.hash.HashShuffleManager"
  43. if conf.contains"spark.cache.class")) {
  44. logWarning"The spark.cache.class property is no longer being used! Specify storage " + "levels using the RDD.persist() method instead."
  45. }
  46. new SparkEnv
  47. executorId
  48. actorSystem
  49. serializer
  50. closureSerializer
  51. cacheManager
  52. mapOutputTracker
  53. shuffleManager
  54. broadcastManager
  55. blockManager
  56. connectionManager
  57. securityManager
  58. httpFileServer
  59. sparkFilesDir
  60. metricsSystem
  61. conf
  62. }

通信层中涉及许多控制消息和状态消息的通信以及消息处理,感兴趣的读者可以参照源码。

3.读写流程

(1)数据写入

4.3.3 Spark块管理 - 图3

图4-12 Spark数据读写

数据写入的简要流程,读取流程和写入流程类似。数据写入流程主要分为以下几个步骤。

1)RDD调用compute()方法进行指定分区的写入。

2)CacheManager中调用BlockManater判断数据是否已经写入,如果未写则写入。

3)BlockManager中数据与其他节点同步。

4)BlockManager根据存储级别写入指定的存储层。

5)BlockManager向主节点汇报存储状态。

详细步骤如下。

1)入口在RDD类中通过compute方法调用iterator方法进行某个分区Partition的读写,Partition是逻辑概念,在物理上是一个Block。其具体实现如下:


  1. final def iteratorsplit Partition context TaskContext): Iterator[T] = {
  2. if storageLevel != StorageLevel.NONE {
  3. SparkEnv.get.cacheManager.getOrComputethis split context storageLevel
  4. } else {
  5. computeOrReadCheckpointsplit context
  6. }
  7. }

2)在CacheManager类中,getOrCompute方法通过调用BlockManager的put接口来写入数据。

我们可以看到,在这里有个判断逻辑,它先从内存cache读取是否有块可以读取,如果没有,则需要进行RDD的计算,通过触发RDD的执行和块的计算来加载数据。其具体实现如下:


  1. def getOrCompute[T](
  2. rdd RDD[T],
  3. partition Partition
  4. context TaskContext
  5. storageLevel StorageLevel): Iterator[T] = {
  6. val key = RDDBlockIdrdd.id partition.index
  7. logDebugs"Looking for partition $key"
  8. blockManager.getkey match {
  9. ……
  10. case None =>
  11. ……
  12. /*如果BlockManager中还没有数据,则将数据写入BlockManager中*/
  13. val cachedValues = putInBlockManagerkey computedValues storageLevel
  14. updatedBlocks
  15. context.taskMetrics.updatedBlocks = SomeupdatedBlocks
  16. new InterruptibleIteratorcontext cachedValues
  17. ……
  18. }
  19. private def putInBlockManager[T](
  20. ……
  21. updatedBlocks ++= blockManager.putkey values storageLevel tellMaster = true
  22. ……
  23. }
  24. /*在BlockManager中,调用put方法*/
  25. def put
  26. blockId BlockId
  27. values Iterator[Any],
  28. level StorageLevel
  29. tellMaster Boolean): Seq[(BlockId BlockStatus)] = {
  30. requirevalues != null "Values is null"
  31. doPutblockId IteratorValuesvalues), level tellMaster
  32. }
  33. /*调用doPut方法*/
  34. private def doPut
  35. blockId BlockId
  36. data BlockValues
  37. level StorageLevel
  38. tellMaster Boolean = true): Seq[(BlockId BlockStatus)] = {
  39. ……

3)将写入的数据与其他Woker进行同步。其具体实现如下:


  1. val replicationFuture = data match {
  2. case b ByteBufferValues if level.replication > 1 =>
  3. 副本并不复制二进制数据,只是创建封装器
  4. val bufferView = b.buffer.duplicate()
  5. Future { replicateblockId bufferView level }
  6. case _ => null
  7. }
  8. ……
  9. var marked = false
  10. try {

4)根据用户设置的StorageLevel来判断数据写入哪个存储层。其具体实现如下:


  1. val returnValues blockStore BlockStore = {
  2. if level.useMemory {
  3. /*优先写入内存,即设置useDisk为真,如果内存不能存储完,再写入磁盘*/
  4. true memoryStore
  5. } else if level.useOffHeap {
  6. /*写入tachyon,这样存储于Java Heap之外的内存空间*/
  7. false tachyonStore
  8. } else if level.useDisk {
  9. them
  10. /*写入磁盘*/
  11. level.replication > 1 diskStore
  12. } else {
  13. assertlevel == StorageLevel.NONE
  14. throw new BlockException
  15. blockId s"Attempted to put block $blockId without specifying storage
  16. level!"
  17. }
  18. }
  19. /*依据适用的Store类写入相应的存储,这里体现面向接口的编程*/
  20. val result = data match {
  21. case IteratorValuesiterator =>
  22. blockStore.putValuesblockId iterator level returnValues
  23. case ArrayBufferValuesarray =>
  24. blockStore.putValuesblockId array level returnValues
  25. case ByteBufferValuesbytes =>
  26. bytes.rewind()
  27. blockStore.putBytesblockId bytes level
  28. }
  29. ……

5)通知BlockManagerMaster有新数据写入,在BlockManagerMaster中保存元数据。代码实现如下:


  1. reportBlockStatusblockId putBlockInfo putBlockStatus
  2. ……
  3. }

(2)数据读取

在RDD类中,通过compute方法调用iterator读写某个分区(Partition),作为数据读取的入口。分区是逻辑概念,在物理上是一个数据块(block)。


  1. final def iteratorsplit Partition context TaskContext): Iterator[T] = {
  2. if storageLevel != StorageLevel.NONE {
  3. SparkEnv.get.cacheManager.getOrComputethis split context storageLevel
  4. } else {
  5. computeOrReadCheckpointsplit context
  6. }
  7. }
  8. /*在CacheManager的getOrCompute方法中*/
  9. def getOrCompute[T](rdd RDD[T], split Partition context TaskContext
  10. storageLevel StorageLevel): Iterator[T] = {
  11. ……
  12. /*本质调用BlockManager的get方法获取数据*/
  13. blockManager.getkey match {
  14. case Somevalues =>
  15. // Partition is already materialized, so just return its values
  16. new InterruptibleIteratorcontext values.asInstanceOf[Iterator[T]])
  17. case None =>
  18. ……
  19. }

(3)读取逻辑

通过下面BlockManager读取代码进入读取逻辑。


  1. private[spark] class BlockManager
  2. executorId String
  3. actorSystem ActorSystem
  4. val master BlockManagerMaster
  5. val defaultSerializer Serializer
  6. maxMemory Long
  7. val conf SparkConf
  8. securityManager SecurityManager
  9. mapOutputTracker MapOutputTracker
  10. extends Logging {
  11. ……
  12. def getblockId BlockId): Option[Iterator[Any]] = {
  13. /*如果需要读取的数据块在本地,则返回本地的数据块*/
  14. val local = getLocalblockId
  15. ……
  16. /*如果需要的数据块不在本地,则远端获取(Fetch)数据块*/
  17. val remote = getRemoteblockId
  18. ……
  19. /*如果远端也没有,则数据块不存在*/
  20. None
  21. }
  22. ……
  23. }

1)本地读取。

在本地同步读取数据块,首先看能否在内存读取数据块,如果不能读取,则看能否从Tachyon读取数据块,如果仍不能读取,则看能否从磁盘读取数据块。


  1. private def doGetLocalblockId BlockId asValues Boolean): Option[Any] = {
  2. val info = blockInfo.getblockId).orNull
  3. if info != null {
  4. /*同步读取数据块*/
  5. info.synchronized {
  6. ……
  7. /*在内存读取数据块*/
  8. if level.useMemory {
  9. logDebug"Getting block " + blockId + " from memory"
  10. val result = if asValues {
  11. memoryStore.getValuesblockId
  12. } else {
  13. memoryStore.getBytesblockId
  14. }
  15. ……
  16. }
  17. /*在Tachyon读取数据块*/
  18. if level.useOffHeap {
  19. logDebug"Getting block " + blockId + " from tachyon"
  20. if tachyonStore.containsblockId)) {
  21. tachyonStore.getBytesblockId match {
  22. case Somebytes => {
  23. if (!asValues {
  24. return Somebytes
  25. } else {
  26. return SomedataDeserializeblockId bytes))
  27. }
  28. }
  29. case None =>
  30. logDebug"Block " + blockId + " not found in tachyon"
  31. }
  32. }
  33. }
  34. /*在磁盘读取数据块*/
  35. if level.useDisk {
  36. logDebug"Getting block " + blockId + " from disk"
  37. val bytes ByteBuffer = diskStore.getBytesblockId match {
  38. case Somebytes => bytes
  39. case None =>
  40. throw new Exception"Block " + blockId + " not found on disk, though
  41. it should be"
  42. }
  43. ……
  44. }

2)远程读取。

远程获取调用路径,然后getRemote调用doGetRemote,通过BlockManagerWorker.syncGetBlock从远程获取数据。


  1. private def doGetRemoteblockId BlockId asValues Boolean): Option[Any] = {
  2. ……
  3. /*获取远端数据块,返回data数据块*/
  4. val data = BlockManagerWorker.syncGetBlock
  5. GetBlockblockId), ConnectionManagerIdloc.host loc.port))
  6. ……
  7. }

在BlockManagerWorker中调用syncGetBlock获取远端数据块,这里使用了Future模型。Future本身是一种被广泛运用的并发设计模式,可在很大程度上简化需要数据流同步的并发应用开发。这里以java.util.concurrent.Future为例,简单介绍Future的具体工作方式。Future对象本身可以看做是一个显式的引用,一个对异步处理结果的引用。由于其异步性质,在创建之初,它所引用的对象可能还并不可用(如尚在运算中、网络传输中或等待中)。这时,得到Future的程序流程如果并不急于使用Future所引用的对象,就可以做其他需要做的工作,当流程进行到需要Future背后引用的对象时,可能有以下两种情况。

第一种情况:希望能看到这个对象可用,并完成一些相关的后续流程。如果实在不可用,则也可以进入其他分支流程。

第二种情况:没有这个结果,则无法执行下去(可以设置超时进行时间限制)。对于前一种情况,可以通过调用Future.isDone()判断引用的对象是否就绪,并采取不同的处理;后一种情况则只需调用get()或get(long timeout,TimeUnit unit),通过同步阻塞方式等待对象就绪。实际运行期是阻塞,还是立即返回,取决于get()的调用时机和对象就绪的先后。Future模式可以在连续流程中满足数据驱动的并发需求,这样既获得了并发执行的性能提升,又不失连续流程的简洁优雅。

下面通过SyncGetBlock方法了解获取数据块的方式。


  1. def syncGetBlockmsg GetBlock toConnManagerId ConnectionManagerId): ByteBuffer = {
  2. ……
  3. /*返回responseMessage对象相当于是个Future*/
  4. val responseMessage = connectionManager.sendMessageReliablySync
  5. toConnManagerId blockMessageArray.toBufferMessage
  6. ……
  7. }
  8. def sendMessageReliablySyncconnectionManagerId ConnectionManagerId
  9. message Message): Option[Message] = {
  10. Await.resultsendMessageReliablyconnectionManagerId message), Duration.Inf
  11. }

在ConnectionManager中,通过sendMessage方法获取远端数据,通过Future异步计算模型获取远端读取结果状态。


  1. def sendMessageReliablyconnectionManagerId ConnectionManagerId message Message
  2. Future[Option[Message]] = {
  3. val promise = Promise[Option[Message]]
  4. val status = new MessageStatus
  5. message connectionManagerId s => promise.successs.ackMessage))
  6. messageStatuses.synchronized {
  7. messageStatuses += ((message.id status))
  8. }
  9. sendMessageconnectionManagerId message
  10. promise.future
  11. }

在sendMessage方法中:


  1. private def sendMessageconnectionManagerId ConnectionManagerId message Message {
  2. ……
  3. /*需要和远端身份验证建立连接*/
  4. connection.getAuthenticated().synchronized {
  5. /*发送消息*/
  6. connection.sendmessage
  7. wakeupSelector()
  8. }

4.数据块读写管理

数据块的读写,如果在本地内存存在所需数据块,则先从本地内存读取,如果不存在,则看本地的磁盘是否有数据,如果仍不存在,再看网络中其他节点上是否有数据,即数据有3个类别的读写来源。

(1)MemoryStore内存块读写

通过源码可以看到进行块读写是线程间同步的。通过entries.synchronized控制多线程并发读写,防止出现异常。

PutBlock对象用来确保只有一个线程写入数据块。这样确保数据读写且线程安全的。示例代码如下:


  1. private val putLock = new Object()

内存Block块管理是通过链表来实现的,如图4-13所示。


  1. private val entries = new LinkedHashMap[BlockId MemoryEntry](32 0.75f true

4.3.3 Spark块管理 - 图4

图4-13 MemoryStore数据存储格式

MemoryStroe内存快读写示例代码如下所示。通过getValues等方法为入口进行数据块的同步读,通过trytoPut等方法为入口进行数据块的同步写。


  1. /*读取内存数据块*/
  2. override def getValuesblockId BlockId): Option[Iterator[Any]] = {
  3. /*同步读取数据*/
  4. val entry = entries.synchronized {
  5. entries.getblockId
  6. }
  7. if entry == null {
  8. None
  9. } else if entry.deserialized {
  10. Someentry.value.asInstanceOf[ArrayBuffer[Any]].iterator
  11. } else {
  12. val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() /* 实际并不复制数据*/
  13. SomeblockManager.dataDeserializeblockId buffer))
  14. }
  15. }
  16. /*内存写入数据块*/
  17. private def tryToPut
  18. blockId BlockId
  19. value Any
  20. size Long
  21. deserialized Boolean): ResultWithDroppedBlocks = {
  22. ……
  23. /*同步进行数据写*/
  24. putLock.synchronized {
  25. ……
  26. /*如果有足够内存空间*/
  27. if enoughFreeSpace {
  28. val entry = new MemoryEntryvalue size deserialized
  29. /*互斥写入entries容器*/
  30. entries.synchronized {
  31. entries.putblockId entry
  32. currentMemory += size
  33. }
  34. val valuesOrBytes = if deserialized "values" else "bytes"
  35. logInfo"Block %s stored as %s in memory (estimated size %s, free %s)".format
  36. blockId valuesOrBytes Utils.bytesToStringsize), Utils.bytesToString
  37. freeMemory)))
  38. putSuccess = true
  39. } else {
  40. ……
  41. }

(2)DiskStore磁盘块写入

在DiskStore中,一个Block对应一个文件。在diskManager中,存储blockId和一个文件路径映射。数据块的读写入相当于读写文件流。

写入二进制数据的具体实现代码如下所示。


  1. override def putBytesblockId BlockId _bytes ByteBuffer level StorageLevel): PutResult = {
  2. val bytes = _bytes.duplicate()
  3. ……
  4. val file = diskManager.getFileblockId
  5. /*获取这个块对应的文件输出流*/
  6. val channel = new FileOutputStreamfile).getChannel
  7. while bytes.remaining > 0 {
  8. /*将数据块写入文件*/
  9. channel.writebytes
  10. }
  11. ……
  12. }