6.4.2 Reduce端

在Reduce端,shuffle阶段可以分成三个阶段:复制Map输出、排序合并和Reduce处理。下面按照这三个阶段进行详细介绍。

如前文所述,Map任务成功完成后,会通知父TashTracker状态已更新,TaskTracker进而通知JobTracker(这些通知在心跳机制中进行)。所以,对于指定作业来说,JobTracker能够记录Map输出和TaskTracker的映射关系。Reduce会定期向JobTracker获取Map的输出位置。一旦拿到输出位置,Reduce任务就会从此输出对应的TaskTracker上复制输出到本地(如果Map的输出很小,则会被复制到执行Reduce任务的TaskTracker节点的内存中,便于进一步处理,否则会放入磁盘),而不会等到所有的Map任务结束。这就是Reduce任务的复制阶段。

在Reduce复制Map的输出结果的同时,Reduce任务就进入了合并(merge)阶段。这一阶段主要的任务是将从各个Map TaskTracker上复制的Map输出文件(无论在内存还是在磁盘)进行整合,并维持数据原来的顺序。

reduce端的最后阶段就是对合并的文件进行reduce处理。下面是reduce Task上run函数的部分代码,从这个函数可以看出整个Reduce端的三个步骤。


//复制阶段,从map TaskTracker处获取Map输出

boolean isLocal="local".equals(job.get("mapred.job.tracker","local"));

if(!isLocal){

reduceCopier=new ReduceCopier(umbilical, job, reporter);

if(!reduceCopier.fetchOutputs()){

……

}

}

//复制阶段结束

copyPhase.complete();

//合并阶段,将得到的Map输出合并

setPhase(TaskStatus.Phase.SORT);

……

//合并阶段结束

sortPhase.complete();

//Reduce阶段

setPhase(TaskStatus.Phase.REDUCE);

//启动Reduce

Class keyClass=job.getMapOutputKeyClass();

Class valueClass=job.getMapOutputValueClass();

RawComparator comparator=job.getOutputValueGroupingComparator();

if(useNewApi){

runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);

}else{

runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);

}

done(umbilical, reporter);

}