8.3.2 Collect过程分析

待map()函数处理完一对key/value,并产生新的key/value后,会调用OutputCollector.collect()函数输出结果。本小节重点剖析该函数内部实现机制。

跟踪进入Map Task的入口函数run(),可发现,如果用户选用旧API,则会调用runOldMapper函数处理数据。该函数根据实际的配置创建合适的MapRunnable以迭代调用用户编写的map()函数,而map()函数的参数OutputCollector正是MapRunnable传入的OldOutputCollector对象。

OldOutputCollector根据作业是否包含Reduce Task封装了不同的MapOutputCollector实现,如果Reduce Task数目为0,则封装DirectMapOutputCollector对象直接将结果写入HDFS中作为最终结果,否则封装MapOutputBuffer对象暂时将结果写入本地磁盘上以供Reduce Task进一步处理。本小节主要分析Reduce Task数目非0的情况。

用户在map()函数中调用OldOutputCollector.collect(key, value)后,在该函数内部,首先会调用Partitioner.getPartition()函数获取记录的分区号partition,然后将三元组<key, value, partition>传递给MapOutputBuffer.collect()函数做进一步处理。

MapOutputBuffer内部使用了一个缓冲区暂时存储用户输出数据,当缓冲区使用率达到一定阈值后,再将缓冲区中的数据写到磁盘上。数据缓冲区的设计方式直接影响到Map Task的写效率,而现有多种数据结构可供选择,最简单的是单向缓冲区,生产者向缓冲区中单向写入输出,当缓冲区写满后,一次性写到磁盘上,就这样,不断写缓冲区,直到所有数据写到磁盘上。单向缓冲区最大的问题是性能不高,不能支持同时读写数据。双缓冲区是对单向缓冲区的一个改进,它使用两个缓冲区,其中一个用于写入数据,另一个将写满的数据写到磁盘上,这样,两个缓冲区交替读写,进而提高效率。实际上,双缓冲区只能一定程度上让读写并行,仍会存在读写等待问题。一种更好的缓冲区设计方式是采用环形缓冲区:当缓冲区使用率达到一定阈值后,便开始向磁盘上写入数据,同时,生产者仍可以向不断增加的剩余空间中循环写入数据,进而达到真正的读写并行。三种缓冲区结构如图8-7所示。

8.3.2 Collect过程分析 - 图1

图 8-7 三种缓冲区结构

a)单向缓冲区 b)双缓冲区 c)循环缓冲区

MapOutputBuffer正是采用了环形内存缓冲区保存数据[1],当缓冲区使用率达到一定阈值后,由线程SpillThread将数据写到一个临时文件中,当所有数据处理完毕后,对所有临时文件进行一次合并以生成一个最终文件。环形缓冲区使得Map Task的Collect阶段和Spill阶段可并行进行。

MapOutputBuffer内部采用了两级索引结构(见图8-8),涉及三个环形内存缓冲区,分别是kvoffsets、kvindices和kvbuffer,这三个缓冲区所占内存空间总大小为io.sort.mb(默认是100 MB)。下面分别介绍这三个缓冲区的含义。

8.3.2 Collect过程分析 - 图2

图 8-8 MapOutputBuffer的两级索引结构

(1)kvoffsets

kvoffsets即偏移量索引数组,用于保存key/value信息在位置索引kvindices中的偏移量。考虑到一对key/value需占用数组kvoffsets的1个int(整型)大小,数组kvindices的3个int大小(分别保存所在partition号、key开始位置和value开始位置),所以Hadoop按比例1:3将大小为${io.sort.record.percent}*${io.sort.mb}的内存空间分配给数组kvoffsets和kvindices,其间涉及的缓冲区分配方式见图8-9,计算过程如下:


private static final int ACCTSIZE=3;//每对key/value占用kvindices中的三项

private static final int RECSIZE=(ACCTSIZE+1)*4;//每对key/value共占用

kvoffsets和kvindices中的4个字节(4*4=16 byte)

final float recper=job.getFloat("io.sort.record.percent",(float)0.05);

final int sortmb=job.getInt("io.sort.mb",100);

int maxMemUsage=sortmb<<20;//将内存单位转化为字节

int recordCapacity=(int)(maxMemUsage*recper);

recordCapacity-=recordCapacity%RECSIZE;//保证recordCapacity是4*4的整数倍

recordCapacity/=RECSIZE;//计算内存中最多保存key/value数目

kvoffsets=new int[recordCapacity];//kvoffsets占用1:3中的1

kvindices=new int[recordCapacity*ACCTSIZE];//kvindices占用1:3中的3


当该数组使用率超过io.sort.spill.percent后,便会触发线程SpillThread将数据写入磁盘。

(2)kvindices

kvindices即位置索引数组,用于保存key/value值在数据缓冲区kvbuffer中的起始位置。

(3)kvbuffer

kvbuffer即数据缓冲区,用于保存实际的key/value值,默认情况下最多可使用io.sort.mb中的95%,当该缓冲区使用率超过io.sort.spill.percent后,便会触发线程SpillThread将数据写入磁盘。

8.3.2 Collect过程分析 - 图3

图 8-9 内存io.sort.mb的分配方式

以上几个缓冲区读写采用了典型的单生产者消费者模型,其中,MapOutputBuffer的collect方法和MapOutputBuffer.Buffer的write方法是生产者,spillThread线程是消费者,它们之间同步是通过可重入的互斥锁spillLock和spillLock上的两个条件变量(spillDone和spillReady)完成的。生产者主要的伪代码如下:


//取得下一个可写入的位置

spillLock.lock();

if(缓冲区使用率达到阈值){

//唤醒SpillThread线程,将缓冲区数据写入磁盘

spillReady.signal();

}

if(缓冲区满){

//等待SpillThread线程结束

spillDone.wait();

}

spillLock.lock();

//将数据写入缓冲区


下面分别介绍环形缓冲区kvoffsets和kvbuffer的数据写入过程。

(1)环形缓冲区kvoffsets

通常用一个线性缓冲区模拟实现环形缓冲区,并通过取模操作实现循环数据存储。下面介绍环形缓冲区kvoffsets的写数据过程。该过程由指针kvstart/kvend/kvindex控制,其中kvstart表示存有数据的内存段初始位置,kvindex表示未存储数据的内存段初始位置,而在正常写入情况下,kvend=kvstart,一旦满足溢写条件,则kvend=kvindex,此时指针区间[kvstart, kvend)为有效数据区间。具体涉及的操作如下。

操作1:写入缓冲区。

直接将数据写入kvindex指针指向的内存空间,同时移动kvindex指向下一个可写入的内存空间首地址,kvindex移动公式为:kvindex=(kvindex+1)%kvoffsets.length。由于kvoffsets为环形缓冲区,因此可能涉及两种写入情况。

情况1:kvindex>kvend,如图8-10所示。在这种情况下,指针kvindex在指针kvend后面,如果向缓冲区中写入一个字符串,则kvindex指针后移一位。

8.3.2 Collect过程分析 - 图4

图 8-10 环形缓冲区在kvindex>kvend情况下写入数据

情况2:kvindex<=kvend,如图8-11所示。在这种情况下,指针kvindex位于指针kvend前面,如果向缓冲区中写入一个字符串,则kvindex指针后移一位。

8.3.2 Collect过程分析 - 图5

图 8-11 环形缓冲区在kvindex<=kvend情况下写入数据

操作2:溢写到磁盘。

当kvoffsets内存空间使用率超过io.sort.spill.percent(默认是80%)后,需将内存中数据写到磁盘上。为了判断是否满足该条件,需先求出kvoffsets已使用内存。如果kvindex>kvend,则已使用内存大小为kvindex-kvend;否则,已使用内存大小为kvoffsets.length-(kvend-kvindex)。

(2)环形缓冲区kvbuffer

环形缓冲区kvbuffer的读写操作过程由指针bufstart/bufend/bufvoid/bufindex/bufmark控制,其中,bufstart/bufend/bufindex含义与kvstart/kvend/kvindex相同,而bufvoid指向kvbuffer中有效内存结束为止,kvbuffer表示最后写入的一个完整key/value结束位置,具体写入过程中涉及的状态和操作如下:

情况1:初始状态。

初始状态下,bufstart=bufend=bufindex=bufmark=0,bufvoid=kvbuffer.length,如图8-12所示。

8.3.2 Collect过程分析 - 图6

图 8-12 初始状态下的kvbuffer

情况2:写入一个key。

写入一个key后,需移动bufindex指针到可写入内存初始位置,如图8-13所示。

8.3.2 Collect过程分析 - 图7

图 8-13 向kvbuffer中写入一个key

情况3:写入一个value。

写入key对应的value后,除移动bufindex指针外,还要移动bufmark指针,表示已经写入一个完整的key/value,具体如图8-14所示。

8.3.2 Collect过程分析 - 图8

图 8-14 向kvbuffer中写入一个value

情况4:不断写入key/value,直到满足溢写条件,即kvoffsets或者kvbuffer空间使用率超过io.sort.spill.percent(默认值为80%)。此时需要将数据写到磁盘上,如图8-15所示。

8.3.2 Collect过程分析 - 图9

图 8-15 kvbuffer使用率达到阈值

情况5:溢写。

如果达到溢写条件,则令bufend←bufindex,并将缓冲区[bufstart, bufend)之间的数据写到磁盘上,具体如图8-16所示。

8.3.2 Collect过程分析 - 图10

图 8-16 准备开始溢写

溢写完成之后,恢复正常写入状态,令bufstart←bufend,如图8-17所示。

8.3.2 Collect过程分析 - 图11

图 8-17 溢写完成

在溢写的同时,Map Task仍可向kvbuffer中写入数据,如图8-18所示。

8.3.2 Collect过程分析 - 图12

图 8-18 溢写的同时,被写入新数据

情况6:写入key时,发生跨界现象。

当写入某个key时,缓冲区尾部剩余空间不足以容纳整个key值,此时需要将key值分开存储,其中一部分存到缓冲区末尾,另外一部分存到缓冲区首部,具体如图8-19所示。

8.3.2 Collect过程分析 - 图13

图 8-19 写入key时,发生越界

情况7:调整key位置,防止key跨界现象。

由于key是排序的关键字,通常需交给RawComparator进行排序,而它要求排序关键字必须在内存中连续存储,因此不允许key跨界存储。为解决该问题,Hadoop将跨界的key值重新存储到缓冲区的首位置,通常可分为以下两种情况。

❑bufindex+(bufvoid-bufmark)<bufstart:此时缓冲区前半段有足够的空间容纳整个key值,因此可通过两次内存复制解决跨行问题,具体如图8-20所示。


int headbytelen=bufvoid-bufmark;

System.arraycopy(kvbuffer,0,kvbuffer, headbytelen, bufindex);

System.arraycopy(kvbuffer, bufvoid, kvbuffer,0,headbytelen);


8.3.2 Collect过程分析 - 图14

图 8-20 调整位置,避免key跨界

❑bufindex+(bufvoid-bufmark)>=bufstart:此时缓冲区前半段没有足够的空间容纳整个key值,将key值移到缓冲区开始位置时将触发一次Spill操作。这种情况下,可通过三次内存复制解决跨行问题:


byte[]keytmp=new byte[bufindex];//申请一个临时缓冲区

System.arraycopy(kvbuffer,0,keytmp,0,bufindex);

bufindex=0;

out.write(kvbuffer, bufmark, headbytelen);//将key值写入缓冲区开始位置

out.write(keytmp);


情况8:某个key或者value太大,以至于整个缓冲区不能容纳它。

如果一条记录的key或value太大,整个缓冲区都不能容纳它,则Map Task会抛出MapBufferTooSmallException异常,并将该记录单独输出到一个文件中。

(3)环形缓冲区优化

在Hadoop 1.X版本中,当满足以下两个条件之一时,Map Task会发生溢写现象。

条件1:缓冲区kvindices或者kvbuffer的空间使用率达到io.sort.spill.percent(默认值为80%)。

条件2:出现一条kvbuffer无法容纳的超大记录。

前面提到,Map Task将可用的缓冲区空间io.sort.mb按照一定比例(由参数io.sort.record.percent决定)静态分配给了kvoffsets、kvindices和kvbuffer三个缓冲区,而正如条件1所述,只要任何一个缓冲区的使用率达到一定比例,就会发生溢写现象,即使另外的缓冲区使用率非常低。因此,设置合理的io.sort.record.percent参数,对于充分利用缓冲区空间和减少溢写次数,是十分必要的。考虑到每条数据(一个key/value对)需占用索引大小为16 B,因此,建议用户采用以下公式[2]设置io.sort.record.percent:


io.sort.record.percent=16/(16+R)


其中R为平均每条记录的长度。

【实例】假设一个作业的Map Task输入数据量和输出数据量相同,每个Map Task输入数据量大小为128 MB,且共有1 342 177条记录,每条记录大小约为100 B,则需要索引大小为16*1 342 177=20.9 MB。根据这些信息,可设置参数如下:

❑io. sort.mb:128 MB+20.9 MB=148.9 MB

❑io. sort.record.percent:16/(16+100)=0.138

❑io. sort.spill.percent:1.0

这样配置可保证数据只“落”一次地,效率最高!当然,实际使用时可能很难达到这种情况,比如每个Map Task输出数据量非常大,缓冲区难以全部容纳它们,但你至少可以设置合理的io.sort.record.percent以更充分地利用io.sort.mb并尽可能减少中间文件数目。

尽管用户可通过该经验公式设置一个较优的io.sort.record.percent参数,但在实际应用中,估算一个非常合理的R值仍是较麻烦的。为了从根本上解决这个问题,Hadoop 0.21采用共享环形缓冲区对Map Task输出数据的组织方式进行了优化,这样,用户无须再为自己的作业设置io.sort.record.percent参数。如图8-21所示,Hadoop 0.21主要有两个修改点[3]

❑不再将索引和记录分放到不同的环形缓冲区中,而是让它们共用一个环形缓冲区。

❑引入一个新的指针equator。该指针界定了索引和数据的共同起始存放位置。从该位置开始,索引和数据分别沿相反的方向增长内存使用空间。

通过让索引和记录共享一个环形缓冲区,可舍弃io.sort.record.percent参数,这样,不仅解决了用户设置参数的苦恼,也使得Map Task能够最大限度地利用io.sort.mb空间,进而减少磁盘溢写次数,提高效率。

8.3.2 Collect过程分析 - 图15

图 8-21 Hadoop 0.21中Map Task的环形缓冲区结构

[1]Hadoop也曾采用过双缓冲区,具体可参考:https://issues.apache.org/jira/browse/HADOOP-1965。

[2]Todd Lipcon, Cloudera,“Optimiziong MapReduce Job Performance”,Hadoop Summit 2012.

[3]https://issues. apache.org/jira/browse/MAPREDUCE-64