9.2.5 其他优化方法
除了之前介绍的性能调优方法,还有一些其他方法可供使用。
1.批处理
有些程序可能会调用外部资源,如数据库连接等,这些连接通过JDBC或者ODBC与外部数据源进行交互。用户可能会在编写程序时忽略掉一个问题。例如,将所有数据写入数据库,如果是一条一条地写:
- rdd.map{line=>con=getConnection;
- con.write(line.toString);
- con.close}
因为整个RDD的数据项很大,整个集群会在短时间内产生高并发写入数据库的操作,对数据库压力很大,将产生很大的写入开销。
这里,可以将单条记录写转化为数据库的批量写,每个分区的数据写一次,这样可以利用数据库的批量写优化减少开销和减轻数据库压力。
- rdd.mapPartitions(lines => conn.getDBConn;
- for(item <- lines)
- write(item.toString);
- conn.close)
同理,对于其他类型的需要和外部资源进行交互的操作,也是应该采用这种处理方式。
2.reduce和reduceByKey的优化
reduce是Action操作,reduceByKey是Transformation操作。
reduce的源码如下。
- def reduce(f: (T, T) => T): T = {
- val cleanF = sc.clean(f)
- val reducePartition: Iterator[T] => Option[T] = iter => {
- if (iter.hasNext) {
- Some(iter.reduceLeft(cleanF))
- } else {
- None
- }
- }
- var jobResult: Option[T] = None
- val mergeResult = (index: Int, taskResult: Option[T]) => {
- if (taskResult.isDefined) {
- jobResult = jobResult match {
- case Some(value) => Some(f(value, taskResult.get))
- case None => taskResult
- }
- }
- }
- sc.runJob(this, reducePartition, mergeResult)
- jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
- }
在reduce函数中会触发sc.runJob,提交任务,reduce是一个Action操作符。
reduceByKey的源码如下。
- def reduceByKey(partitioner: Partitioner, func: (V, V) => V):
- RDD[(K, V)] = {
- combineByKey[V]((v: V) => v, func, func, partitioner)
- }
由代码可知,reduceByeKey并没有触发runJob,而是调用了combineByKey,该函数调用聚集器聚集数据。
reduce是一种聚合函数,可以把各个任务的执行结果汇集到一个节点,还可以指定自定义的函数传入reduce执行。Spark也对reduce的实现进行了优化,可以把同一个任务内的结果先在本地Worker节点执行聚合函数,再把结果传给Driver执行聚合。但最终数据还是要汇总到主节点,而且reduce会把接收到的数据保存到内存中,直到所有任务都完成为止。因此,当任务很多,任务的结果数据又比较大时Driver容易造成性能瓶颈,这样就应该考虑尽量避免reduce的使用,而将数据转化为Key-Value对,并使用reduceByKey实现逻辑,使计算变为分布式计算。
reduceByKey也是聚合操作,是根据key聚合对应的value。同样的,在每一个mapper把数据发送给reducer前,会在Map端本地先合并(类似于MapReduce中的Combiner)。与reduce不同的是,reduceByKey不是把数据汇集到Driver节点,是分布式进行的,因此不会存在reduce那样的性能瓶颈。
3.Shuffle操作符的内存使用
在有些情况下,应用将会遇到OutOfMemory的错误,其中并不是因为内存大小不能够容纳RDD,而是因为执行任务中使用的数据集合太大(如groupByKey)。Spark的Shuffle操作符(sortByKey、groupByKey、reduceByKey、join等都可以算是Shuffle操作符,因为这些操作会引发Shuffle)在执行分组操作的过程中,会在每个任务执行过程中,在内存创建Hash表来对数据进行分组,而这个Hash表在很多情况下通常变得很大。最简单的一种解决方案就是增加并行度,即增加任务数量和分区数量。这样每轮次每个Executor执行的任务数是固定的,每个任务接收的输入数据变少会减少Hash表的大小,占用的内存就会减少,从而避免内存溢出OOM的发生。
Spark通过多任务复用Worker的JVM,每个节点所有任务的执行是在同一个JVM上的线程池中执行的,这样就减少了线程的启动开销,可以高效地支持单个任务200ms的执行时间。通过这个机制,可以安全地将任务数量的配置扩展到超过集群的整体的CPU core数,而不会出现问题。