9.2.5 其他优化方法

除了之前介绍的性能调优方法,还有一些其他方法可供使用。

1.批处理

有些程序可能会调用外部资源,如数据库连接等,这些连接通过JDBC或者ODBC与外部数据源进行交互。用户可能会在编写程序时忽略掉一个问题。例如,将所有数据写入数据库,如果是一条一条地写:


  1. rdd.map{line=>con=getConnection
  2. con.writeline.toString);
  3. con.close}

因为整个RDD的数据项很大,整个集群会在短时间内产生高并发写入数据库的操作,对数据库压力很大,将产生很大的写入开销。

这里,可以将单条记录写转化为数据库的批量写,每个分区的数据写一次,这样可以利用数据库的批量写优化减少开销和减轻数据库压力。


  1. rdd.mapPartitionslines => conn.getDBConn
  2. foritem <- lines
  3. writeitem.toString);
  4. conn.close

同理,对于其他类型的需要和外部资源进行交互的操作,也是应该采用这种处理方式。

2.reduce和reduceByKey的优化

reduce是Action操作,reduceByKey是Transformation操作。

reduce的源码如下。


  1. def reducef T T => T): T = {
  2. val cleanF = sc.cleanf
  3. val reducePartition Iterator[T] => Option[T] = iter => {
  4. if iter.hasNext {
  5. Someiter.reduceLeftcleanF))
  6. } else {
  7. None
  8. }
  9. }
  10. var jobResult Option[T] = None
  11. val mergeResult = index Int taskResult Option[T]) => {
  12. if taskResult.isDefined {
  13. jobResult = jobResult match {
  14. case Somevalue => Somefvalue taskResult.get))
  15. case None => taskResult
  16. }
  17. }
  18. }
  19. sc.runJobthis reducePartition mergeResult
  20. jobResult.getOrElsethrow new UnsupportedOperationException"empty collection"))
  21. }

在reduce函数中会触发sc.runJob,提交任务,reduce是一个Action操作符。

reduceByKey的源码如下。


  1. def reduceByKeypartitioner Partitioner func V V => V):
  2. RDD[(K V)] = {
  3. combineByKey[V]((v V => v func func partitioner
  4. }

由代码可知,reduceByeKey并没有触发runJob,而是调用了combineByKey,该函数调用聚集器聚集数据。

reduce是一种聚合函数,可以把各个任务的执行结果汇集到一个节点,还可以指定自定义的函数传入reduce执行。Spark也对reduce的实现进行了优化,可以把同一个任务内的结果先在本地Worker节点执行聚合函数,再把结果传给Driver执行聚合。但最终数据还是要汇总到主节点,而且reduce会把接收到的数据保存到内存中,直到所有任务都完成为止。因此,当任务很多,任务的结果数据又比较大时Driver容易造成性能瓶颈,这样就应该考虑尽量避免reduce的使用,而将数据转化为Key-Value对,并使用reduceByKey实现逻辑,使计算变为分布式计算。

reduceByKey也是聚合操作,是根据key聚合对应的value。同样的,在每一个mapper把数据发送给reducer前,会在Map端本地先合并(类似于MapReduce中的Combiner)。与reduce不同的是,reduceByKey不是把数据汇集到Driver节点,是分布式进行的,因此不会存在reduce那样的性能瓶颈。

3.Shuffle操作符的内存使用

在有些情况下,应用将会遇到OutOfMemory的错误,其中并不是因为内存大小不能够容纳RDD,而是因为执行任务中使用的数据集合太大(如groupByKey)。Spark的Shuffle操作符(sortByKey、groupByKey、reduceByKey、join等都可以算是Shuffle操作符,因为这些操作会引发Shuffle)在执行分组操作的过程中,会在每个任务执行过程中,在内存创建Hash表来对数据进行分组,而这个Hash表在很多情况下通常变得很大。最简单的一种解决方案就是增加并行度,即增加任务数量和分区数量。这样每轮次每个Executor执行的任务数是固定的,每个任务接收的输入数据变少会减少Hash表的大小,占用的内存就会减少,从而避免内存溢出OOM的发生。

Spark通过多任务复用Worker的JVM,每个节点所有任务的执行是在同一个JVM上的线程池中执行的,这样就减少了线程的启动开销,可以高效地支持单个任务200ms的执行时间。通过这个机制,可以安全地将任务数量的配置扩展到超过集群的整体的CPU core数,而不会出现问题。