5.4 Hadoop DistributedCache原理分析

DistributedCache是Hadoop为方便用户进行应用程序开发而设计的文件分发工具。它能够将只读的外部文件自动分发到各个节点上进行本地缓存,以便Task运行时加载使用。它的大体工作流程如下:用户提交作业后,Hadoop将由-files和-archives选项指定的文件复制到JobTracker的文件系统(一般为HDFS)中;之后,当某个TaskTracker收到该作业的第一个Task后,该任务将负责从JobTracker文件系统中将文件下载到本地磁盘进行缓存,这样后续的Task就可以直接在本地访问这些文件了。除了文件分发外,DistributedCache还可用于软件自动安装部署。比如,用户使用PHP语言编写了MapReduce程序,为了能够让程序成功运行,用户需要求运维人员在Hadoop集群的各个节点上提前安装好PHP解释器,而当需要升级PHP解释器时,可能需通知Hadoop运维人员进行一次升级,这使得软件升级变得非常麻烦。为了让软件升级变得更可控,用户可采用DistributedCache将PHP解释器分发到各个节点上,每次运行程序时,DistributedCache会检查PHP解释器被改过(比如升级新版本),如果是,则会自动重新下载。

本节首先介绍Hadoop DistributedCache使用场景和使用方法,接着介绍其工作原理。

5.4.1 使用方法介绍

用户编写的MapReduce应用程序往往需要一些外部的资源,比如分词程序需词表文件,或者依赖于三方的jar包。这时候,我们希望每个Task初始化时能够加载这些文件,而DistributedCache正是为了完成该功能而提供的。

使用Hadoop DistributedCache通常有两种方法:调用相关API和设置命令行参数。

(1)调用相关API

Hadoop DistributedCache允许用户分发归档文件(后缀为.zip、.jar、.tar、.tgz或者.tar.gz的文件)和普通文件,对应的API如下:


//添加归档文件

void addCacheArchive(URI uri, Configuration conf)

void setCacheArchives(URI[]archives, Configuration conf)

//添加普通文件

void addCacheFile(URI uri, Configuration conf)

void setCacheFiles(URI[]files, Configuration conf)

//将三方jar包或者动态库添加到classpath中

void addFileToClassPath(Path file, Configuration conf)

//在任务工作目录下建立文件软连接

void createSymlink(Configuration conf)


使用Hadoop DistributedCache可分为3个步骤:

步骤1 在HDFS上准备好文件(文本文件、压缩文件、jar包等),并按照文件可见级别设置目录/文件的访问权限;

步骤2 调用相关API添加文件信息,这里主要是配置作业的JobConf对象;

步骤3 在Mapper或者Reducer类中使用文件,Mapper或者Reducer开始运行前,各种文件已经下载到本地的工作目录中,直接调用文件读写API即可获取文件内容。

【实例】假设一个MapReduce应用程序需要dictionary.zip、blacklist.txt、whitelist.txt和third-party.jar四个文件,其中,dictionary.zip和third-party.jar为private可见级别,而blacklist.txt和whitelist.txt为public可见级别,则可按以下步骤分发这些文件:

步骤1 准备文件。将文件dictionary.zip和third-party.jar上传到HDFS上的目录/data/private/中,blacklist.txt和whitelist.txt上传到目录/data/public/中。其中,目录/data/private/的权限为“drwxr-xr—”,目录/data/public/的权限为“drwxr-xr-x”。


$bin/hadoop fs-copyFromLocal dictionary.zip/data/private/

$bin/hadoop fs-copyFromLocal third-party.jar/data/private/

$bin/hadoop fs-copyFromLocal blacklist.txt/data/public/

$bin/hadoop fs-copyFromLocal whitelist.txt/data/public/


步骤2 配置JobConf。


JobConf job=new JobConf();

DistributedCache.addCacheFile(new URI("/data/public/blacklist.txt#blacklist"),job);

DistributedCache.addCacheFile(new URI("/data/public/whitelist.txt#whitelist",job);

DistributedCache.addFileToClassPath(new Path("/data/private/third-party.jar"),job);

DistributedCache.addCacheArchive(new URI("/data/private/dictionary.zip",job);

DistributedCache.createSymlink(job);


步骤3 在Mapper或者Reducer类中使用文件。


public static class MapClass extends MapReduceBase

implements Mapper<K, V,K, V>{

private Path[]localArchives;

private Path[]localFiles;

public void configure(JobConf job){

//在本地获取archives或者files

localArchives=DistributedCache.getLocalCacheArchives(job);

localFiles=DistributedCache.getLocalCacheFiles(job);

//调用文件API读取文件内容,保存到相关变量中

……

}

public void map(K key, V value,

OutputCollector<K, V>output, Reporter reporter)

throws IOException{

//在此使用缓存中的archives/files

//……

output.collect(k, v);

}

}


(2)设置命令行参数

这是一种比较简单且灵活的方法,但前提是用户编写MapReduce应用程序时实现了Tool接口支持常规选项。该方法包括两个步骤,其中第一个步骤与“调用相关API”的步骤1相同,而第二个步骤则是使用以下两种Shell命令之一提交作业。

Shell命令1:


$HADOOP_HOME/bin/hadoop jar xxx.jar\

-files=hdfs:///data/public/blacklist.txt#blacklist,\

hdfs:///data/public/whitelist.txt#whitelist\

-libjars=hdfs:///data/private/third-party.jar\

-archives=hdfs:///data/private/dictionary.zip\

……


Shell命令2:


$HADOOP_HOME/bin/hadoop jar xxx.jar\

-D mapred.cache.files=/data/public/blacklist.txt#blacklist,\

/data/public/whitelist.txt#whitelist\

-D mapred.cache.archives=/data/private/dictionary.zip\

-D mapred.job.classpath.files=/data/private/third-party.jar\

-D mapred.create.symlink=yes\

……