4.5.2 Checkpoint机制
通过上述分析可以看出在以下两种情况下,RDD需要加检查点。
1)DAG中的Lineage过长,如果重算,则开销太大(如在PageRank中)。
2)在Shuffle Dependency上做Checkpoint(检查点)获得的收益更大。
由于RDD是只读的,所以Spark的RDD计算中一致性不是主要关心的内容,内存相对容易管理,这也是设计者很有远见的地方,这样减少了框架的复杂性,提升了性能和可扩展性,为以后上层框架的丰富奠定了强有力的基础。
在RDD计算中,通过检查点机制进行容错,传统做检查点有两种方式:通过冗余数据和日志记录更新操作。在RDD中的doCheckPoint方法相当于通过冗余数据来缓存数据,而之前介绍的血统就是通过相当粗粒度的记录更新操作来实现容错的。
在Spark中,通过RDD中的checkpoint()方法来做检查点。
- def checkpoint():Unit
可以通过SparkContext.setCheckPointDir()设置检查点数据的存储路径,进而将数据存储备份,然后Spark删除所有已经做检查点的RDD的祖先RDD依赖。这个操作需要在所有需要对这个RDD所做的操作完成之后再做,因为数据会写入持久化存储造成I/O开销。官方建议,做检查点的RDD最好是在内存中已经缓存的RDD,否则保存这个RDD在持久化的文件中需要重新计算,产生I/O开销。
下面通过源码来了解检查点的机制。
检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。
在RDD中通过doCheckpoint()方法作为检查点的入口方法。
- private[spark] def doCheckpoint() { ……
- checkpointData.get.doCheckpoint() } else
- { dependencies.foreach(_.rdd.doCheckpoint()) } …… }
在RDDCheckpointData中,通过doCheckpoint()方法做检查点。
- def doCheckpoint() { ……
RDD通过同步方式做检查点,具体使用Synchronized保证方法的同步和线程安全。代码实现如下。
- /*path检查点RDD的输出文件路径*/
- CheckpointData.synchronized { ……
- val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
- val fs = path.getFileSystem(new Configuration())
- if (!fs.mkdirs(path)) {
- throw new SparkException("Failed to create checkpoint path " + path) }
- /*在SparkContext提交作业,将检查点RDD写入之前设置的路径中*/
- rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
- val newRDD = new CheckpointRDD[T](rdd.context, path.toString) ……
- }
- /*在CheckPointRDD中调用 writeToFile方法将RDD写入HDFS*/
- def writeToFile[T](path: String, blockSize: Int = -1)(ctx:
- TaskContext, iterator: Iterator[T]) {
- val env = SparkEnv.get
- val outputDir = new Path(path)
- /*本质相当于在Hadoop 的分布式文件系统将RDD数据写进HDFS*/
- val fs = outputDir.getFileSystem(env.hadoop.newConfiguration()) val finalOutputName = splitIdToFile(ctx.splitId)
- val finalOutputPath = new Path(outputDir, finalOutputName)
- val tempOutputPath = new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptId) ……
- /*根据数据量不同设置,不同的缓冲区大小*/
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
- val fileOutputStream = if (blockSize < 0) {
- fs.create(tempOutputPath, false, bufferSize)
- } else { // This is mainly for testing purpose
- fs.create(tempOutputPath, false, bufferSize,
- fs.getDefaultReplication, blockSize) }
- /*创建序列化器*/
- val serializer = env.serializer.newInstance()
- val serializeStream = serializer.serializeStream(fileOutputStream)
- /*此处为写入操作,关键是在iterator上相当于将iteraor迭代器的对象序列化写到HDFS中*/
- serializeStream.writeAll(iterator) serializeStream.close() …… }
- SerializationStream写入操作
- trait SerializationStream {
- def writeObject[T](t:T): SerializationStream
- def flush():Unit
- def close():Unit
- def writeAll[T](iter: Iterator[T]): SerializationStream = {
- while (iter.hasNext) { writeObject(iter.next()) } this } }
- /*如果配置了kyro序列化器进行写入,则调用下面的writeObject方法将数据序列化后写入HDFS*/
- private[spark] class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream
- { val output = new KryoOutput(outStream)
- ……
- def writeObject[T](t: T): SerializationStream = {
- kryo.writeClassAndObject(output, t)
- this }
- def flush() { output.flush() }
- def close() { output.close() }
- ……
- }