8.2.3 Spark Streaming原理剖析
下面将通过一个example示例的源码呈现Spark Streaming的底层机制。示例及源码基于Spark 1.0版本,后续的发布版中可能会有更新。
1.初始化与集群上分布接收器
图8-12所示为Spark Streaming执行模型从中可看到数据接收及组件间的通信。
图8-12 Spark Streaming执行模型
初始化的过程主要可以概括为以下两点。
1)调度器的初始化。
2)将输入流的接收器转化为RDD在集群打散,然后启动接收器集合中的每个接收器。
下面通过具体的代码更深入地理解这个过程。
(1)NetworkWordCount示例
本例以NetworkWordCount作为研究Spark Streaming的入口程序。
- object NetworkWordCount {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: NetworkWordCount <hostname> <port>")
- System.exit(1)
- }
- StreamingExamples.setStreamingLogLevels()
- val sparkConf = new SparkConf().setAppName("NetworkWordCount")
- /*创建StreamingContext对象,形成整个程序的上下文*/
- val ssc = new StreamingContext(sparkConf, Seconds(1))
- /*通过socketTextStream接收源源不断地socket文本流*/
- val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
- ssc.awaitTermination()
- }
- }
(2)进入scoketTextStream
- def socketTextStream(
- hostname: String,
- port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2):
- ReceiverInputDStream[String] = {
- /*内部实际调用的socketStream方法 */
- socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
- }
- /*进入socketStream方法 */
- def socketStream[T: ClassTag](
- hostname: String,
- port: Int,
- converter: (InputStream) => Iterator[T],
- storageLevel: StorageLevel
- ): ReceiverInputDStream[T] = {
- /*此处初始化SocketInputDStream对象 */
- new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
- }
(3)初始化SocketInputDStream
在之前的Spark Streaming介绍中,读者已经了解到整个Spark Streaming的调度灵魂就是DStream的DAG,可以将这个DStream DAG类比Spark中的RDD DAG,而DStream类比RDD,DStream可以理解为包含各个时间段的一个RDD集合。SocketInputDStream就是一个DStream。
- private[streaming]
- class SocketInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
- host: String,
- port: Int,
- bytesToObjects: InputStream => Iterator[T],
- storageLevel: StorageLevel
- )extends ReceiverInputDStream[T](ssc_) {
- def getReceiver(): Receiver[T] = {
- new SocketReceiver(host, port, bytesToObjects, storageLevel)
- }
- }
(4)触发StreamingContext中的Start()方法
上面的步骤基本完成了Spark Streaming的初始化工作。类似于Spark机制,Spark Streaming也是延迟(Lazy)触发的,只有调用了start()方法,才真正地执行了。
- private[streaming] val scheduler = new JobScheduler(this)
- /*StreamingContext中维持着一个调度器*/
- def start(): Unit = synchronized {
- ……
- /*启动调度器*/
- scheduler.start()
- ……
- }
(5)JobScheduler.start()启动调度器
在start方法中初始化了很多重要的组件。
- def start(): Unit = synchronized {
- ……
- /*初始化事件处理Actor,当有消息传递给Actor时,调用processEvent进行事件处理*/
- eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
- def receive = {
- case event: JobSchedulerEvent => processEvent(event)
- }
- }), "JobScheduler")
- /*启动监听总线*/
- listenerBus.start()
- receiverTracker = new ReceiverTracker(ssc)
- /*启动接收器的监听器receiverTracker*/
- receiverTracker.start()
- /*启动job生成器*/
- jobGenerator.start()
- ……
- }
(6)ReceiverTracker类
/进入ReceiverTracker查看/
- private[streaming]
- class ReceiverTracker(ssc: StreamingContext) extends Logging {
- val receiverInputStreams = ssc.graph.getReceiverInputStreams()
- def start() = synchronized {
- ……
- val receiverExecutor = new ReceiverLauncher()
- ……
- if (!receiverInputStreams.isEmpty) {
- /*初始化ReceiverTrackerActor */
- actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
- "ReceiverTracker")
- /*启动ReceiverLauncher()实例,(7)中进行介绍*/
- receiverExecutor.start()
- ……
- }
- }
- /*读者可以先参考ReceiverTrackerActor的代码查看实现注册Receiver和注册Block元数据信息的功能。 */
- private class ReceiverTrackerActor extends Actor {
- def receive = {
- /*接收注册receiver的消息,每个receiver就是一个输入流接收器,Receiver分布在Worker节点,一个Receiver接收一个输入流,一个Spark Streaming集群可以有多个输入流 */
- case RegisterReceiver(streamId, typ, host, receiverActor) =>
- registerReceiver(streamId, typ, host, receiverActor, sender)
- sender ! true
- case AddBlock(receivedBlockInfo) =>
- addBlocks(receivedBlockInfo)
- ……
- }
- }
(7)receivelauncher类,在集群上分布式启动接收器
- class ReceiverLauncher {
- ……
- @transient val thread = new Thread() {
- override def run() {
- ……
- /*启动ReceiverTrackerActor已经注册的Receiver*/
- startReceivers()
- ……
- }
下面进入startReceivers方法,方法中将Receiver集合转变为RDD,从而在集群上打散,分布式分布。如图8-13所示,一个集群可以分布式地在不同的Worker节点接收输入数据流。
图8-13 Spark Streaming接收器
- private def startReceivers() {
- /*获取之前配置的接收器 */
- val receivers = receiverInputStreams.map(nis => {
- val rcvr = nis.getReceiver()
- rcvr.setReceiverId(nis.id)
- rcvr
- })
- ……
- /* 创建并行的在不同Worker节点分布的receiver集合 */
- val tempRDD =
- if (hasLocationPreferences) {
- val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
- ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
- } else {
- /*在这里创造RDD相当于进入SparkContext.makeRDD,此经典之处在于将receivers集合作为一个RDD [Receiver]进行分区。即使只有一个输入流,按照分布式分区方式,也是将输入分布在Worker端,而不在Master*/
- ssc.sc.makeRDD(receivers, receivers.size)
- /*调用Sparkcontext中的makeRDD方法,本质是调用将数据分布式化的方法parallelize*/
- /* def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): //RDD[T] = { parallelize(seq, numSlices) */
- /*在RDD[Receiver[_]]每个分区的每个Receiver 上都同时启动,这样其实Spark Streaming可以构建大量的分布式输入流 */
- val startReceiver = (iterator: Iterator[Receiver[_]]) => {
- if (!iterator.hasNext) {
- throw new SparkException(
- "Could not start receiver as object not found.")
- }
- val receiver = iterator.next()
- /*此处的supervisorImpl是一个监督者的角色,在下面的内容中将会剖析这个对象的作用 */
- val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
- executor.start()
- executor.awaitTermination()
- }
- /
- /*将receivers的集合打散,然后启动它们 */
- ……
- ssc.sparkContext.runJob(tempRDD, startReceiver)
- ……
- }
2.数据接收与转化
在“1.初始化与集群上分布接收器”中介绍了,receiver集合转换为RDD在集群上分布式地接收数据流。那么每个receiver是怎样接收并处理数据流的呢?Spark Streaming数据接收与转化的示意图如图8-14所示。
图8-14的主要流程如下。
1)数据缓冲:在Receiver的receive函数中接收流数据,将接收到的数据源源不断地放入BlockGenerator.currentBuffer。
2)缓冲数据转化为数据块:在BlockGenerator中有一个定时器(recurring timer),将当前缓冲区中的数据以用户定义的时间间隔封装为一个数据块Block,放入BlockGenerator的blocksForPush队列中。
3)数据块转化为Spark数据块:在BlockGenerator中有一个BlockPushingThread线程,不断地将blocksForPush队列中的块传递给Blockmanager,让BlockManager将数据存储为块,读者可以在本书的Spark IO章节了解Spark的底层存储机制。BlockManager负责Spark中的块管理。
4)元数据存储:在pushArrayBuffer方法中还会将已经由BlockManager存储的元数据信息(如Block的ID号)传递给ReceiverTracker,ReceiverTracker将存储的blockId放到对应StreamId的队列中。
上面过程中涉及最多的类就是BlockGenerator,在数据转化的过程中,其扮演着不可或缺的角色。
- private[streaming] class BlockGenerator(
- listener: BlockGeneratorListener,
- receiverId: Int,
- conf: SparkConf
- ) extends Logging
图8-14 Spark Streaming数据接收与转化
感兴趣的读者可以参照图8-14中的类和方法更加具体地了解机制。由于篇幅所限,这个数据生成过程的代码不再具体剖析。
3.生成RDD与提交Spark Job
Spark Streaming根据时间段,将数据切分为RDD,然后触发RDD的Action提交Job,Job被提交到JobManager中的Job Queue中,由JobScheduler调度,Job Scheduler将Job提交到Spark的Job调度器,然后将Job转换为大量的任务分发给Spark集群执行。
如图8-15所示,Jobgenerator中通过下面的方法生成Job调度和执行。
图8-15 Spark Streaming调度模型
从下面的代码中可以看出,Jobs是从outputStream中生成的,然后触发反向回溯执行整个DStream DAG,类似于RDD的机制。
- private def generateJobs(time: Time) {
- SparkEnv.set(ssc.env)
- Try(graph.generateJobs(time)) match {
- case Success(jobs) =>
- /*获取输入数据块的信息*/
- val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
- ……
- }.toMap
- jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
- case Failure(e) =>
- jobScheduler.reportError("Error generating jobs for time " + time, e)
- }
- eventActor ! DoCheckpoint(time)
- }
- /*下面进入JobScheduler的submitJobSet方法一探究竟,JobScheduler是整个Spark Streaming调度的核心组件*/
- def submitJobSet(jobSet: JobSet) {
- ……
- jobSets.put(jobSet.time, jobSet)
- jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
- …
- }
- /*进入Graph生成job的方法,graph的本质是DStreamGraph类生成的对象 */
- final private[streaming] class DStreamGraph extends Serializable with Logging {
- def generateJobs(time: Time): Seq[Job] = {
- ……
- private val inputStreams = new ArrayBuffer[InputDStream[_]]()
- private val outputStreams = new ArrayBuffer[DStream[_]]()
- ……
- val jobs = this.synchronized {
- outputStreams.flatMap(outputStream => outputStream.generateJob(time))
- …
- }
- /*outputStreams中的对象是DStream,下面进入DStream的generateJob一探究竟*/
- private[streaming] def generateJob(time: Time): Option[Job] = {
- getOrCompute(time) match {
- case Some(rdd) => {
- val jobFunc = () => {
- val emptyFunc = { (iterator: Iterator[T]) => {} }
- /*此处相当于针对每个时间段生成的一个RDD,会调用SparkContext的方法runJob提交Spark的一个Job*/
- context.sparkContext.runJob(rdd, emptyFunc)
- }
- Some(new Job(time, jobFunc))
- }
- case None => None
- }
- }
- /*在DStream算是父类,一些具体的DStream,如SocketInputStream等的类的父类。可以通过SocketInputDStream查看如何通过上面的getOrCompute生成RDD*/
- private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
- generatedRDDs.get(time) match {
- …
- case None => {
- if (isTimeValid(time)) {
- /* Dstream是个父类,Dstream的子类可以完成不同算子运算,这样的继承关系意味着Action类型的Dstream会触发compute函数运算,并反向回溯到顶层的Dstream类运行compute函数计算。这样每隔一段时间,生成的RDD反向计算一次。计算模式类似于RDD的DAG */
- compute(time) match {
- ……
- generatedRDDs.put(time, newRDD)
- ……
- }
- 在SocketInputDStream的compute方法中生成对应时间片的RDD。
- override def compute(validTime: Time): Option[RDD[T]] = {
- if (validTime >= graph.startTime) {
- val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
- receivedBlockInfo(validTime) = blockInfo
- val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
- Some(new BlockRDD[T](ssc.sc, blockIds))
- } else {
- Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
- }
- }
Dstream是个父类,其子类可以完成不同算子运算,这样的继承关系意味着Action类型的Dstream会触发compute函数运算,并反向回溯到顶层的Dstream类运行compute函数计算,这样每隔一段时间,生成的新RDD反向计算。计算模式类似于RDD的DAG。