4.8.2 MapReduce Job中全局共享数据
在编写MapReduce代码的时候,经常会遇到这样的困扰:全局变量应该如何保存?如何让每个处理都能获取保存的这些全局变量?在编程过程中全局变量的使用是不可避免的,但是在MapReduce中直接使用代码级别的全局变量是不现实的。这主要是因为继承Mapper基类的Map阶段类的运行和继承Reducer基类的Reduce阶段类的运行都是独立的,并不像代码看起来的那样会共享同一个Java虚拟机的资源。下面介绍几种在MapReduce编程中相对有效的设置全局共享数据的方法。
1.读写HDFS文件
在MapReduce框架中,Map task和Reduce task都运行在Hadoop集群的节点上,所以Map task和Reduce task、甚至不同的Job都可以通过读写HDFS中预定好的同一个文件来实现全局共享数据。具体实现是利用Hadoop的Java API(关于Java API请参见第9章)来完成的。需要注意的是,针对多个Map或Reduce的写操作会产生冲突,覆盖原有数据。
这种方法的优点是能够实现读写,也比较直观;而缺点是要共享一些很小的全局数据也需要使用I/O,这将占用系统资源,增加作业完成的资源消耗。
2.配置Job属性
在MapReduce执行过程中,task可以读取Job的属性。基于这个特性,大家可以在任务启动之初利用Configuration类中的set(String name, String value)将一些简单的全局数据封装到作业的配置属性中,然后在task中再利用Configuration类中的get(String name)获取配置到属性中的全局数据。这种方法的优点是简单,资源消耗小;缺点是对量比较大的共享数据显得比较无力。
3.使用DistributedCache
DistributedCache是MapReduce为应用提供缓存文件的只读工具,它可以缓存文本文件、压缩文件和jar文件等。在使用时,用户可以在作业配置时使用本地或HDFS文件的URL来将其设置成共享缓存文件。在作业启动之后和task启动之前,MapReduce框架会将可能需要的缓存文件复制到执行任务节点的本地。这种方法的优点是每个Job共享文件只会在启动之后复制一次,并且它适用于大量的共享数据;而缺点是它是只读的。下面举一个简单的例子说明如何使用DistributedCache(具体的示例程序可查看本书附录C)。
1)将要缓存的文件复制到HDFS上。
$bin/hadoop fs-copyFromLocal lookup/myapp/lookup
2)启用作业的属性配置,并设置待缓存文件。
Configuration conf=new Configuration();
DistributedCache.addCacheFile(new URI("/myapp/lookup#lookup"),conf);
3)在Map函数中使用DistributedCache。
public static class Map extends Mapper<Object, Text, Text, Text>{
private Path[]localArchives;
private Path[]localFiles;
public void setup(Context context
)throws IOException, InterruptedException{
//获取缓存文件
Configuration conf=context.getConfiguration();
localArchives=DistributedCache.getLocalCacheArchives(conf);
localFiles=DistributedCache.getLocalCacheFiles(conf);
}
public void map(K key, V value,
Context context)
throws IOException{
//使用从缓存文件中获取的数据
//……
//……
Context.collect(k, v);
}
}