8.4.2 Shuffle和Merge阶段分析

在Reduce Task中,Shuffle阶段和Merge阶段是并行进行的。当远程拷贝数据量达到一定阈值后,便会触发相应的合并线程对数据进行合并。这两个阶段均是由类ReduceCopier实现的,该类大约包含2 200行代码(整个ReduceTask类才2 900行左右)。如图8-23所示,总体上看,Shuffle&Merge阶段可进一步划分为三个子阶段。

(1)准备运行完成的Map Task列表

GetMapEventsThread线程周期性通过RPC从TaskTracker获取已完成Map Task列表,并保存到映射表mapLocations(保存了TaskTracker Host与已完成任务列表的映射关系)中。为防止出现网络热点,Reduce Task通过对所有TaskTracker Host进行“混洗”操作以打乱数据拷贝顺序,并将调整后的Map Task输出数据位置保存到scheduledCopies列表中。

(2)远程拷贝数据

Reduce Task同时启动多个MapOutputCopier线程,这些线程从scheduledCopies列表中获取Map Task输出位置,并通过HTTP Get远程拷贝数据。对于获取的数据分片,如果大小超过一定阈值,则存放到磁盘上,否则直接放到内存中。

(3)合并内存文件和磁盘文件

为了防止内存或者磁盘上的文件数据过多,Reduce Task启动了LocalFSMerger和InMemFSMergeThread两个线程分别对内存和磁盘上的文件进行合并。

8.4.2 Shuffle和Merge阶段分析 - 图1

图 8-23 Shuffle阶段工作流程

接下来,我们将详细剖析每个阶段的内部实现细节。

(1)准备运行完成的Map Task列表

第7章讲到,TaskTracker启动了MapEventsFetcherThread线程。该线程会周期性(周期为心跳时间间隔)通过RPC从JobTracker上获取已经运行完成的Map Task列表,并保存到TaskCompletionEvent类型列表allMapEvents中。

而对于Reduce Task而言,它会启动GetMapEventsThread线程。该线程周期性通过RPC从TaskTracker上获取已运行完成的Map Task列表,并将成功运行完成的Map Task放到列表mapLocations中,具体如图8-24所示。

8.4.2 Shuffle和Merge阶段分析 - 图2

图 8-24 Reduce Task获取完成Map Task列表

为了避免出现数据访问热点(大量进程集中读取某个TaskTracker上的数据),Reduce Task不会直接将列表mapLocations中的Map Task输出数据位置交给MapOutputCopier线程,而是事先进行一次预处理:将所有TaskTracker Host进行混洗操作(随机打乱顺序),然后保存到scheduledCopies列表中,而MapOutputCopier线程将从该列表中获取待拷贝的Map Task输出数据位置。需要注意的是,对于一个TaskTracker而言,曾拷贝失败的Map Task将优先获得拷贝机会。

(2)远程拷贝数据

Reduce Task同时启动mapred.reduce.parallel.copies(默认是5)个数据拷贝线程MapOutputCopier。该线程从scheduledCopies列表中获取Map Task数据输出描述对象,并利用HTTP Get从对应的TaskTracker远程拷贝数据,如果数据分片大小超过一定阈值,则将数据临时写到工作目录下,否则直接保存到内存中。不管是保存到内存中还是磁盘上,MapOutputCopier均会保存一个MapOutput对象描述数据的元信息。如果数据被保存到内存中,则将该对象添加到列表mapOutputsFilesInMemory中,否则将该对象保存到列表mapOutputFilesOnDisk中。

在Reduce Task中,大部分内存用于缓存从Map Task端拷贝的数据分片,这些内存占到JVM Max Heap Size(由参数-Xmx指定)的mapred.job.shuffle.input.buffer.percent(默认是0.70)倍,并由类ShuffleRamManager管理。Reduce Task规定,如果一个数据分片大小未超过该内存的0.25倍,则可存放到内存中。如果MapOutputCopier线程要拷贝的数据分片可存放到内存中,则它先要向ShuffleRamManager申请相应的内存,待同意后才会正式拷贝数据,否则需要等待它释放内存。

由于远程拷贝数据可能需要跨网络读取多个节点上的数据,期间很容易由于网络或者磁盘等原因造成读取失败,因此提供良好的容错机制是非常有必要的。当出现拷贝错误时,Reduce Task提供了以下几个容错机制:

❑如果拷贝数据出错次数超过abortFailureLimit,则杀死该Reduce Task(等待调度器重新调度执行),其中,abortFailureLimit计算方法如下:

abortFailureLimit=max{30,numMaps/10}

❑如果拷贝数据出错次数超过maxFetchFailuresBeforeReporting(可通过参数mapreduce.reduce.shuffle.maxfetchfailures设置,默认是10),则进行一些必要的检查[1],以决定是否杀死该Reduce Task。

❑如果前两个条件均不满足,则采用对数回归模型推迟一段时间后重新拷贝对应MapTask的输出数据,其中延迟时间delayTime的计算方法如下:

delayTime=10 000×1. 3noFailedFetches

其中noFailedFetches为拷贝错误次数。

(3)合并内存文件和磁盘文件

前面提到,Reduce Task从Map Task端拷贝的数据,可能保存到内存或者磁盘上。随着拷贝数据的增多,内存或者磁盘上的文件数目也必将增加,为了减少文件数目,在数据拷贝过程中,线程LocalFSMerger和InMemFSMergeThread将分别对内存和磁盘上的文件进行合并。

对于磁盘上文件,当文件数目超过(2*ioSortFactor-1)后(ioSortFactor值由参数io.sort.factor指定,默认是10),线程LocalFSMerger会从列表mapOutputFilesOnDisk中取出最小的ioSortFactor个文件进行合并,并将合并后的文件再次写到磁盘上。

对于内存中的文件,当满足以下几个条件之一时,InMemFSMergeThread线程会将内存中所有数据合并后写到磁盘上:

❑所有数据拷贝完毕后,关闭ShuffleRamManager。

❑ShuffleRamManager中已使用内存超过可用内存的mapred.job.shuffle.merge.percent(默认是66%)倍且内存文件数目超过2个。

❑内存中的文件数目超过mapred.inmem.merge.threshold(默认是1 000)。

❑阻塞在ShuffleRamManager上的请求数目超过拷贝线程数目mapred.reduce.parallel.copies的75%。

[1]综合考虑Reduce Task拷贝失败的数据分片比例、拷贝成功的数据分片比例和最近来拷贝数据持续间隔等因素。