8.2.3 Spark Streaming原理剖析

下面将通过一个example示例的源码呈现Spark Streaming的底层机制。示例及源码基于Spark 1.0版本,后续的发布版中可能会有更新。

1.初始化与集群上分布接收器

图8-12所示为Spark Streaming执行模型从中可看到数据接收及组件间的通信。

8.2.3 Spark Streaming原理剖析 - 图1

图8-12 Spark Streaming执行模型

初始化的过程主要可以概括为以下两点。

1)调度器的初始化。

2)将输入流的接收器转化为RDD在集群打散,然后启动接收器集合中的每个接收器。

下面通过具体的代码更深入地理解这个过程。

(1)NetworkWordCount示例

本例以NetworkWordCount作为研究Spark Streaming的入口程序。


  1. object NetworkWordCount {
  2. def mainargs Array[String]) {
  3. if args.length < 2 {
  4. System.err.println"Usage: NetworkWordCount <hostname> <port>"
  5. System.exit1
  6. }
  7. StreamingExamples.setStreamingLogLevels()
  8. val sparkConf = new SparkConf().setAppName"NetworkWordCount"
  9. /*创建StreamingContext对象,形成整个程序的上下文*/
  10. val ssc = new StreamingContextsparkConf Seconds1))
  11. /*通过socketTextStream接收源源不断地socket文本流*/
  12. val lines = ssc.socketTextStreamargs0), args1).toInt StorageLevel.MEMORY_AND_DISK_SER
  13. val words = lines.flatMap_.split" "))
  14. val wordCounts = words.mapx => x 1)).reduceByKey_ + _
  15. wordCounts.print()
  16. ssc.start()
  17. ssc.awaitTermination()
  18. }
  19. }

(2)进入scoketTextStream


  1. def socketTextStream
  2. hostname String
  3. port Int
  4. storageLevel StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2):
  5. ReceiverInputDStream[String] = {
  6. /*内部实际调用的socketStream方法 */
  7. socketStream[String](hostname port SocketReceiver.bytesToLines storageLevel
  8. }
  9. /*进入socketStream方法 */
  10. def socketStream[T ClassTag](
  11. hostname String
  12. port Int
  13. converter InputStream => Iterator[T],
  14. storageLevel StorageLevel
  15. ): ReceiverInputDStream[T] = {
  16. /*此处初始化SocketInputDStream对象 */
  17. new SocketInputDStream[T](this hostname port converter storageLevel
  18. }

(3)初始化SocketInputDStream

在之前的Spark Streaming介绍中,读者已经了解到整个Spark Streaming的调度灵魂就是DStream的DAG,可以将这个DStream DAG类比Spark中的RDD DAG,而DStream类比RDD,DStream可以理解为包含各个时间段的一个RDD集合。SocketInputDStream就是一个DStream。


  1. private[streaming]
  2. class SocketInputDStream[T ClassTag](
  3. @transient ssc_ StreamingContext
  4. host String
  5. port Int
  6. bytesToObjects InputStream => Iterator[T],
  7. storageLevel StorageLevel
  8. extends ReceiverInputDStream[T](ssc_ {
  9. def getReceiver(): Receiver[T] = {
  10. new SocketReceiverhost port bytesToObjects storageLevel
  11. }
  12. }

(4)触发StreamingContext中的Start()方法

上面的步骤基本完成了Spark Streaming的初始化工作。类似于Spark机制,Spark Streaming也是延迟(Lazy)触发的,只有调用了start()方法,才真正地执行了。


  1. private[streaming] val scheduler = new JobSchedulerthis
  2. /*StreamingContext中维持着一个调度器*/
  3. def start(): Unit = synchronized {
  4. ……
  5. /*启动调度器*/
  6. scheduler.start()
  7. ……
  8. }

(5)JobScheduler.start()启动调度器

在start方法中初始化了很多重要的组件。


  1. def start(): Unit = synchronized {
  2. ……
  3. /*初始化事件处理Actor,当有消息传递给Actor时,调用processEvent进行事件处理*/
  4. eventActor = ssc.env.actorSystem.actorOfPropsnew Actor {
  5. def receive = {
  6. case event JobSchedulerEvent => processEventevent
  7. }
  8. }), "JobScheduler"
  9. /*启动监听总线*/
  10. listenerBus.start()
  11. receiverTracker = new ReceiverTrackerssc
  12. /*启动接收器的监听器receiverTracker*/
  13. receiverTracker.start()
  14. /*启动job生成器*/
  15. jobGenerator.start()
  16. ……
  17. }

(6)ReceiverTracker类

/进入ReceiverTracker查看/


  1. private[streaming]
  2. class ReceiverTrackerssc StreamingContext extends Logging {
  3. val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  4. def start() = synchronized {
  5. ……
  6. val receiverExecutor = new ReceiverLauncher()
  7. ……
  8. if (!receiverInputStreams.isEmpty {
  9. /*初始化ReceiverTrackerActor */
  10. actor = ssc.env.actorSystem.actorOfPropsnew ReceiverTrackerActor),
  11. "ReceiverTracker"
  12. /*启动ReceiverLauncher()实例,(7)中进行介绍*/
  13. receiverExecutor.start()
  14. ……
  15. }
  16. }
  17. /*读者可以先参考ReceiverTrackerActor的代码查看实现注册Receiver和注册Block元数据信息的功能。 */
  18. private class ReceiverTrackerActor extends Actor {
  19. def receive = {
  20. /*接收注册receiver的消息,每个receiver就是一个输入流接收器,Receiver分布在Worker节点,一个Receiver接收一个输入流,一个Spark Streaming集群可以有多个输入流 */
  21. case RegisterReceiverstreamId typ host receiverActor =>
  22. registerReceiverstreamId typ host receiverActor sender
  23. sender true
  24. case AddBlockreceivedBlockInfo =>
  25. addBlocksreceivedBlockInfo
  26. ……
  27. }
  28. }

(7)receivelauncher类,在集群上分布式启动接收器


  1. class ReceiverLauncher {
  2. ……
  3. @transient val thread = new Thread() {
  4. override def run() {
  5. ……
  6. /*启动ReceiverTrackerActor已经注册的Receiver*/
  7. startReceivers()
  8. ……
  9. }

下面进入startReceivers方法,方法中将Receiver集合转变为RDD,从而在集群上打散,分布式分布。如图8-13所示,一个集群可以分布式地在不同的Worker节点接收输入数据流。

8.2.3 Spark Streaming原理剖析 - 图2

图8-13 Spark Streaming接收器


  1. private def startReceivers() {
  2. /*获取之前配置的接收器 */
  3. val receivers = receiverInputStreams.mapnis => {
  4. val rcvr = nis.getReceiver()
  5. rcvr.setReceiverIdnis.id
  6. rcvr
  7. })
  8. ……
  9. /* 创建并行的在不同Worker节点分布的receiver集合 */
  10. val tempRDD =
  11. if hasLocationPreferences {
  12. val receiversWithPreferences = receivers.mapr => r Seqr.preferredLocation.get)))
  13. ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences
  14. } else {
  15. /*在这里创造RDD相当于进入SparkContext.makeRDD,此经典之处在于将receivers集合作为一个RDD [Receiver]进行分区。即使只有一个输入流,按照分布式分区方式,也是将输入分布在Worker端,而不在Master*/
  16. ssc.sc.makeRDDreceivers receivers.size
  17. /*调用Sparkcontext中的makeRDD方法,本质是调用将数据分布式化的方法parallelize*/
  18. /* def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): //RDD[T] = { parallelize(seq, numSlices) */
  19. /*在RDD[Receiver[_]]每个分区的每个Receiver 上都同时启动,这样其实Spark Streaming可以构建大量的分布式输入流 */
  20. val startReceiver = iterator Iterator[Receiver[_]]) => {
  21. if (!iterator.hasNext {
  22. throw new SparkException
  23. "Could not start receiver as object not found."
  24. }
  25. val receiver = iterator.next()
  26. /*此处的supervisorImpl是一个监督者的角色,在下面的内容中将会剖析这个对象的作用 */
  27. val executor = new ReceiverSupervisorImplreceiver SparkEnv.get
  28. executor.start()
  29. executor.awaitTermination()
  30. }
  31. /
  32. /*将receivers的集合打散,然后启动它们 */
  33. ……
  34. ssc.sparkContext.runJobtempRDD startReceiver
  35. ……
  36. }

2.数据接收与转化

在“1.初始化与集群上分布接收器”中介绍了,receiver集合转换为RDD在集群上分布式地接收数据流。那么每个receiver是怎样接收并处理数据流的呢?Spark Streaming数据接收与转化的示意图如图8-14所示。

图8-14的主要流程如下。

1)数据缓冲:在Receiver的receive函数中接收流数据,将接收到的数据源源不断地放入BlockGenerator.currentBuffer。

2)缓冲数据转化为数据块:在BlockGenerator中有一个定时器(recurring timer),将当前缓冲区中的数据以用户定义的时间间隔封装为一个数据块Block,放入BlockGenerator的blocksForPush队列中。

3)数据块转化为Spark数据块:在BlockGenerator中有一个BlockPushingThread线程,不断地将blocksForPush队列中的块传递给Blockmanager,让BlockManager将数据存储为块,读者可以在本书的Spark IO章节了解Spark的底层存储机制。BlockManager负责Spark中的块管理。

4)元数据存储:在pushArrayBuffer方法中还会将已经由BlockManager存储的元数据信息(如Block的ID号)传递给ReceiverTracker,ReceiverTracker将存储的blockId放到对应StreamId的队列中。

上面过程中涉及最多的类就是BlockGenerator,在数据转化的过程中,其扮演着不可或缺的角色。


  1. private[streaming] class BlockGenerator
  2. listener BlockGeneratorListener
  3. receiverId Int
  4. conf SparkConf
  5. extends Logging

8.2.3 Spark Streaming原理剖析 - 图3

图8-14 Spark Streaming数据接收与转化

感兴趣的读者可以参照图8-14中的类和方法更加具体地了解机制。由于篇幅所限,这个数据生成过程的代码不再具体剖析。

3.生成RDD与提交Spark Job

Spark Streaming根据时间段,将数据切分为RDD,然后触发RDD的Action提交Job,Job被提交到JobManager中的Job Queue中,由JobScheduler调度,Job Scheduler将Job提交到Spark的Job调度器,然后将Job转换为大量的任务分发给Spark集群执行。

如图8-15所示,Jobgenerator中通过下面的方法生成Job调度和执行。

8.2.3 Spark Streaming原理剖析 - 图4

图8-15 Spark Streaming调度模型

从下面的代码中可以看出,Jobs是从outputStream中生成的,然后触发反向回溯执行整个DStream DAG,类似于RDD的机制。


  1. private def generateJobstime Time {
  2. SparkEnv.setssc.env
  3. Trygraph.generateJobstime)) match {
  4. case Successjobs =>
  5. /*获取输入数据块的信息*/
  6. val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
  7. ……
  8. }.toMap
  9. jobScheduler.submitJobSetJobSettime jobs receivedBlockInfo))
  10. case Failuree =>
  11. jobScheduler.reportError"Error generating jobs for time " + time e
  12. }
  13. eventActor DoCheckpointtime
  14. }
  15. /*下面进入JobScheduler的submitJobSet方法一探究竟,JobScheduler是整个Spark Streaming调度的核心组件*/
  16. def submitJobSetjobSet JobSet {
  17. ……
  18. jobSets.putjobSet.time jobSet
  19. jobSet.jobs.foreachjob => jobExecutor.executenew JobHandlerjob)))
  20. }
  21. /*进入Graph生成job的方法,graph的本质是DStreamGraph类生成的对象 */
  22. final private[streaming] class DStreamGraph extends Serializable with Logging {
  23. def generateJobstime Time): Seq[Job] = {
  24. ……
  25. private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  26. private val outputStreams = new ArrayBuffer[DStream[_]]()
  27. ……
  28. val jobs = this.synchronized {
  29. outputStreams.flatMapoutputStream => outputStream.generateJobtime))
  30. }
  31. /*outputStreams中的对象是DStream,下面进入DStream的generateJob一探究竟*/
  32. private[streaming] def generateJobtime Time): Option[Job] = {
  33. getOrComputetime match {
  34. case Somerdd => {
  35. val jobFunc = () => {
  36. val emptyFunc = { iterator Iterator[T]) => {} }
  37. /*此处相当于针对每个时间段生成的一个RDD,会调用SparkContext的方法runJob提交Spark的一个Job*/
  38. context.sparkContext.runJobrdd emptyFunc
  39. }
  40. Somenew Jobtime jobFunc))
  41. }
  42. case None => None
  43. }
  44. }
  45. /*在DStream算是父类,一些具体的DStream,如SocketInputStream等的类的父类。可以通过SocketInputDStream查看如何通过上面的getOrCompute生成RDD*/
  46. private[streaming] def getOrComputetime Time): Option[RDD[T]] = {
  47. generatedRDDs.gettime match {
  48. case None => {
  49. if isTimeValidtime)) {
  50. /* Dstream是个父类,Dstream的子类可以完成不同算子运算,这样的继承关系意味着Action类型的Dstream会触发compute函数运算,并反向回溯到顶层的Dstream类运行compute函数计算。这样每隔一段时间,生成的RDD反向计算一次。计算模式类似于RDD的DAG */
  51. computetime match {
  52. ……
  53. generatedRDDs.puttime newRDD
  54. ……
  55. }
  56. SocketInputDStreamcompute方法中生成对应时间片的RDD
  57. override def computevalidTime Time): Option[RDD[T]] = {
  58. if validTime >= graph.startTime {
  59. val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfoid
  60. receivedBlockInfovalidTime = blockInfo
  61. val blockIds = blockInfo.map_.blockId.asInstanceOf[BlockId])
  62. Somenew BlockRDD[T](ssc.sc blockIds))
  63. } else {
  64. Somenew BlockRDD[T](ssc.sc Array[BlockId]()))
  65. }
  66. }

Dstream是个父类,其子类可以完成不同算子运算,这样的继承关系意味着Action类型的Dstream会触发compute函数运算,并反向回溯到顶层的Dstream类运行compute函数计算,这样每隔一段时间,生成的新RDD反向计算。计算模式类似于RDD的DAG。