9.2.3 网络传输优化
1.大任务分发优化
在任务的分发过程中会序列化任务的元数据信息,以及任务需要的jar和文件。任务的分发是通过AKKA库中的Actor模型之间的消息传送的。因为Spark采用了Scala的函数式风格,传递函数的变量引用采用闭包方式传递,所以当需要传输的数据通过Task进行分发时,会拖慢整体的执行速度。配置参数spark.akka.frameSize(默认buffer的大小为10MB)可以缓解过大的任务造成AKKA缓冲区溢出的问题,但是这个方式并不能解决本质的问题。下面具体讲解配置参数spark.akka.frameSize。
spark.akka.frameSize控制Spark框架内使用的AKKA框架中,Actor通信消息的最大容量(如任务(Task)的输出结果),因为整个Spark集群的消息传递都是通过Actor进行的,默认为10MB。当处理大规模数据时,任务的输出可能会大于这个值,需要根据实际数据设置一个更高的值。如果是这个值不够大而产生的错误,则可以从Worker节点的日志中排查。通常Worker上的任务失败后,主节点Master的运行日志上提示“Lost TID:”,可通过查看失败的Worker日志文件$SPARK_HOME/work/目录下面的日志文件中记录的任务的Serialized size of result是否超过10MB来确定通信数据超过AKKA的Buffer异常。
2.Broadcast在调优场景的使用
Spark的Broadcast(广播)变量对数据传输进行优化,通过Broadcast变量将用到的大数据量数据进行广播发送,可以提升整体速度。Broadcast主要用于共享Spark在计算过程中各个task都会用到的只读变量,Broadcast变量只会在每台计算机器上保存一份,而不会每个task都传递一份,这样就大大节省了空间,节省空间的同时意味着传输时间的减少,效率也高。在Spark的HadoopRDD实现中,就采用Broadcast进行Hadoop JobConf的传输。官方文档的说法是,当task大于20KB时,可以考虑使用Broadcast进行优化,还可以在控制台日志看到任务是多大,进而决定是否优化。还需要注意,每次迭代所传输的Broadcast变量都会保存在从节点Worker的内存中,直至内存不够用,Spark才会把旧的Broadcast变量释放掉,不能提前进行释放。BroadCast变量有一些应用场景,如MapSideJoin中的小表进行广播、机器学习中需要共享的矩阵的广播等。
用户可以调用SparkContext中的方法生成广播变量。
- def broadcast[T](value: T)(implicit arg0: ClassTag[T]): Broadcast[T]
3.Collect结果过大优化
在开发程序的过程中,会常常用到Collect操作符。Collect函数的实现如下。
- def collect(): Array[T] = {
- val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
- Array.concat(results: _*)
- }
函数通过SparkContext将每个分区执行变为数组,返回主节点后,将所有分区的数组合并成一个数组。这时,如果进行Collect的数据过大,就会产生问题,大量的从节点将数据写回同一个节点,拖慢整体运行时间,或者可能造成内存溢出的问题。
解决方式:当收集的最终结果数据过大时,可以将数据存储在分布式的HDFS或其他分布式持久化层上。将数据分布式地存储,可以减小单机数据的I/O开销和单机内存存储压力。或者当数据不太大,但会超出AKKA传输的Buffer大小时,需要增加AKKA Actor的buffer,可以通过配置参数spark.akka.frameSize(默认大小为10MB)进行调整。