4.8 MapReduce工作流

到目前为止,已经讲述了使用MapReduce编写程序的机制。不过还没有讨论如何将数据处理问题转化为MapReduce模型。

数据处理只能解决一些非常简单的问题。如果处理过程变得复杂了,这种复杂性会通过更加复杂、完善的Map和Reduce函数,甚至更多的MapReduce工作来体现。下面简单介绍一些比较复杂的MapReduce编程知识。

4.8.1 复杂的Map和Reduce函数

从前面Map和Reduce函数的代码很明显可以看出,Map和Reduce都继承自MapReduce自己定义好的Mapper和Reducer基类,MapReduce框架根据用户继承Mapper和Reducer后的衍生类和类中覆盖的核心函数来识别用户定义的Map处理阶段和Reduce处理阶段。所以只有用户继承这些类并且实现其中的核心函数,提交到MapReduce框架上的作业才能按照用户的意愿被解析出来并执行。前面介绍的MapReduce作业仅仅继承并覆盖了基类中的核心函数Map或Reduce,下面介绍基类中的其他函数,使大家能够编写功能更加复杂、控制更加完备的Map和Reduce函数。

1.setup函数

此函数在基类中的源码如下:


/**

*Called once at the start of the task.

*/

protected void setup(Context context

)throws IOException, InterruptedException{

//NOTHING

}


从上面的注释可以看出,setup函数是在task启动开始就调用的。在这里先温习一下task的知识。在MapReduce中作业会被组织成Map task和Reduce task。每个task都以Map类或Reduce类为处理方法主体,输入分片为处理方法的输入,自己的分片处理完之后task也就销毁了。从这里可以看出,setup函数在task启动之后数据处理之前只调用一次,而覆盖的Map函数或Reduce函数会针对输入分片中的每个key调用一次。所以setup函数可以看做task上的一个全局处理,而不像在Map函数或Reduce函数中,处理只对当前输入分片中的正在处理数据产生作用。利用setup函数的特性,大家可以将Map或Reduce函数中的重复处理放置到setup函数中,可以将Map或Reduce函数处理过程中可能使用到的全局变量进行初始化,或从作业信息中获取全局变量,还可以监控task的启动。需要注意的是,调用setup函数只是对应task上的全局操作,而不是整个作业的全局操作。

2.cleanup函数

cleanup函数在基类中的源码如下:


/**

*Called once at the end of the task.

*/

protected void cleanup(Context context

)throws IOException, InterruptedException{

//NOTHING

}


从这个函数的注释中可以看到,它跟setup函数相似,不同之处在于cleanup函数是在task销毁之前执行的。它的作用和setup也相似,区别仅在于它的启动处在task销毁之前,所以不再赘述cleanup的作用。大家应根据具体使用环境和这两个函数的特点,做出恰当的选择。

3.run函数

run函数在基类中的源码如下:


/**

*Expert users can override this method for more complete control over the

*execution of the Mapper.

*@param context

*@throws IOException

*/

public void run(Context context)throws IOException, InterruptedException{

setup(context);

while(context.nextKeyValue()){

map(context.getCurrentKey(),context.getCurrentValue(),context);

}

cleanup(context);

}


从上面函数的主体内容和代码的注释可以看出,此函数是Map类或Reduce类的启动方法:先调用setup函数,然后针对每个key调用一次Map函数或Reduce函数,最后销毁task之前再调用cleanup函数。这个run函数将Map阶段和Reduce阶段的代码过程呈现给了大家。正如注释中所说,如果想更加完备地控制Map或者Renduce阶段,可以覆盖此函数,并像普通的Java类中的函数一样添加自己的控制内容,比如增加自己的task启动之后和销毁之前的处理,或者在while循环内外再定义自己针对每个key的处理内容,甚至可以对Map和Reduce函数的处理结果进行进一步的处理。