4.8.2 MapReduce Job中全局共享数据

在编写MapReduce代码的时候,经常会遇到这样的困扰:全局变量应该如何保存?如何让每个处理都能获取保存的这些全局变量?在编程过程中全局变量的使用是不可避免的,但是在MapReduce中直接使用代码级别的全局变量是不现实的。这主要是因为继承Mapper基类的Map阶段类的运行和继承Reducer基类的Reduce阶段类的运行都是独立的,并不像代码看起来的那样会共享同一个Java虚拟机的资源。下面介绍几种在MapReduce编程中相对有效的设置全局共享数据的方法。

1.读写HDFS文件

在MapReduce框架中,Map task和Reduce task都运行在Hadoop集群的节点上,所以Map task和Reduce task、甚至不同的Job都可以通过读写HDFS中预定好的同一个文件来实现全局共享数据。具体实现是利用Hadoop的Java API(关于Java API请参见第9章)来完成的。需要注意的是,针对多个Map或Reduce的写操作会产生冲突,覆盖原有数据。

这种方法的优点是能够实现读写,也比较直观;而缺点是要共享一些很小的全局数据也需要使用I/O,这将占用系统资源,增加作业完成的资源消耗。

2.配置Job属性

在MapReduce执行过程中,task可以读取Job的属性。基于这个特性,大家可以在任务启动之初利用Configuration类中的set(String name, String value)将一些简单的全局数据封装到作业的配置属性中,然后在task中再利用Configuration类中的get(String name)获取配置到属性中的全局数据。这种方法的优点是简单,资源消耗小;缺点是对量比较大的共享数据显得比较无力。

3.使用DistributedCache

DistributedCache是MapReduce为应用提供缓存文件的只读工具,它可以缓存文本文件、压缩文件和jar文件等。在使用时,用户可以在作业配置时使用本地或HDFS文件的URL来将其设置成共享缓存文件。在作业启动之后和task启动之前,MapReduce框架会将可能需要的缓存文件复制到执行任务节点的本地。这种方法的优点是每个Job共享文件只会在启动之后复制一次,并且它适用于大量的共享数据;而缺点是它是只读的。下面举一个简单的例子说明如何使用DistributedCache(具体的示例程序可查看本书附录C)。

1)将要缓存的文件复制到HDFS上。


$bin/hadoop fs-copyFromLocal lookup/myapp/lookup


2)启用作业的属性配置,并设置待缓存文件。


Configuration conf=new Configuration();

DistributedCache.addCacheFile(new URI("/myapp/lookup#lookup"),conf);


3)在Map函数中使用DistributedCache。


public static class Map extends Mapper<Object, Text, Text, Text>{

private Path[]localArchives;

private Path[]localFiles;

public void setup(Context context

)throws IOException, InterruptedException{

//获取缓存文件

Configuration conf=context.getConfiguration();

localArchives=DistributedCache.getLocalCacheArchives(conf);

localFiles=DistributedCache.getLocalCacheFiles(conf);

}

public void map(K key, V value,

Context context)

throws IOException{

//使用从缓存文件中获取的数据

//……

//……

Context.collect(k, v);

}

}