6.4 Shuffle和排序
从前面的介绍中我们得知,Map的输出会经过一个名为shuffle的过程交给Reduce处理(在“MapReduce数据流”图中也可以看出),当然也有Map的结果经过sort-merge交给Reduce处理的。其实在MapReduce流程中,为了让Reduce可以并行处理Map结果,必须对Map的输出进行一定的排序和分割,然后再交给对应的Reduce,而这个将Map输出进行进一步整理并交给Reduce的过程就成为了shuffle。从shuffle的过程可以看出,它是MapReduce的核心所在,shuffle过程的性能与整个MapReduce的性能直接相关。
总体来说,shuffle过程包含在Map和Reduce两端中。在Map端的shuffle过程是对Map的结果进行划分(partition)、排序(sort)和分割(spill),然后将属于同一个划分的输出合并在一起(merge)并写在磁盘上,同时按照不同的划分将结果发送给对应的Reduce(Map输出的划分与Reduce的对应关系由JobTracker确定)。Reduce端又会将各个Map送来的属于同一个划分的输出进行合并(merge),然后对merge的结果进行排序,最后交给Reduce处理。下面将从Map和Reduce两端详细介绍shuffle过程。
6.4.1 Map端
从MapReduce的程序中可以看出,Map的输出结果是由collector处理的,所以Map端的shuffle过程包含在collect函数对Map输出结果的处理过程中。下面从具体的代码来分析Map端的shuffle过程。
首先从collect函数的代码入手(MapTask类)。从下面的代码段可以看出Map函数的输出内存缓冲区是一个环形结构。
final int kvnext=(kvindex+1)%kvoffsets.length;
当输出内存缓冲区内容达到设定的阈值时,就需要把缓冲区内容分割(spill)到磁盘中。但是在分割的时候Map并不会阻止继续向缓冲区中写入结果,如果Map结果生成的速度快于写出速度,那么缓冲区会写满,这时Map任务必须等待,直到分割写出过程结束。这个过程可以参考下面的代码。
do{
//在环形缓冲区中,如果下一个空闲位置同起始位置相等,那么缓冲区
//已满
kvfull=kvnext==kvstart;
//环形缓冲区的内容是否达到写出的阈值
final boolean kvsoftlimit=((kvnext>kvend)
?kvnext-kvend>softRecordLimit
:kvend-kvnext<=kvoffsets.length-softRecordLimit);
//达到阈值,写出缓冲区内容,形成spill文件
if(kvstart==kvend&&kvsoftlimit){
startSpill();
}
//如果缓冲区满,则Map任务等待写出过程结束
if(kvfull){
while(kvstart!=kvend){
reporter.progress();
spillDone.await();
}
}
}while(kvfull);
在collect函数中将缓冲区中的内容写出时会调用sortAndSpill函数。sortAndSpill每被调用一次就会创建一个spill文件,然后按照key值对需要写出的数据进行排序,最后按照划分的顺序将所有需要写出的结果写入这个spill文件中。如果用户作业配置了combiner类,那么在写出过程中会先调用combineAndSpill()再写出,对结果进行进一步合并(combine)是为了让Map的输出数据更加紧凑。sortAndSpill函数的执行过程可以参考下面sortAndSpill函数的代码。
//创建spill文件
Path filename=mapOutputFile.getSpillFileForWrite(numSpills, size);
out=rfs.create(filename);
……
//按照key值对待写出数据进行排序
sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
……
//按照划分将数据写入文件
for(int i=0;i<partitions;++i){
IFile.Writer<K, V>writer=null;
long segmentStart=out.getPos();
writer=new Writer<K, V>(job, out, keyClass, valClass, codec, spilledRecordsCounter);
//如果没有配置combiner类,数据直接写入文件
if(null==combinerClass){
……
}
else{
……
//如果配置了combiner类,则先调用combineAndSpill函
//数后再写入文件
combineAndSpill(kvIter, combineInputCounter);
}
}
显然,直接将每个Map生成的众多spill文件(因为Map过程中,每一次缓冲区写出都会产生一个spill文件)交给Reduce处理不现实。所以在每个Map任务结束之后在Map的TaskTracker上还会执行合并操作(merge),这个操作的主要目的是将Map生成的众多spill文件中的数据按照划分重新组织,以便于Reduce处理。主要做法是针对指定的分区,从各个spill文件中拿出属于同一个分区的所有数据,然后将它们合并在一起,并写入一个已分区且已排序的Map输出文件中。这个过程的详细情况请参考mergeParts()函数的代码,这里不再列出。
待唯一的已分区且已排序的Map输出文件写入最后一条记录后,Map端的shuffle阶段就结束了。下面就进入Reduce端的shuffle阶段。