18.3 Hadoop Streaming的介绍和使用

18.3.1 Hadoop Streaming的介绍

Hadoop Streaming是Hadoop的一个工具,它帮助用户创建和运行一类特殊的MapReduce作业,这些特殊的MapReduce作业是由一些可执行文件或脚本文件充当Mapper或Reducer。也就是说Hadoop Streaming允许用户用非Java的编程语言编写MapReduce程序,然后Streaming用STDIN(标准输入)和STDOUT(标准输出)来和我们编写的Map和Reduce进行数据交换,并提交给Hadoop。命令格式如下:


$HADOOP_HOME/bin/hadoop jar$HADOOP_HOME/hadoop-streaming.jar\

-input myInputDirs\

-output myOutputDir\

-mapper/bin/cat\

-reducer/bin/wc


1.Streaming的工作原理

在上面的命令里,Mapper和Reducer都是可执行文件,它们从标准输入按行读入数据,并把计算结果发送给标准输出。Streaming工具会创建一个MapReduce作业,并把它发送给合适的集群,同时监视这个作业的整个执行过程。

如果一个可执行文件被用于Mapper,则在其初始化时,每一个Mapper任务会把这个可执行文件作为一个单独的进程启动。Mapper任务运行时,它把输入切分成行,并把结果提供给可执行文件对应进程的标准输入。同时,它会收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为输出。默认情况下,一行中第一个tab之前的部分被当做key,之后的(不包括tab)被当做value。如果没有tab,则整行内容被当做key值,value值为null。具体的转化策略会在下面讨论。

如果一个可执行文件被用于Reducer,每个Reducer任务同样会把这个可执行文件作为一个单独的进程启动。Reducer任务运行时,它把输入切分成行,并把结果提供给可执行文件对应进程的标准输入。同时,它会收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为输出。默认情况下,一行中第一个tab之前的部分被当作key,之后的(不包括tab)被当做value。

用户也可以使用Java类作为Mapper或Reducer。本节最初给出的命令与这里的命令等价:


$HADOOP_HOME/bin/Hadoop jar$HADOOP_HOME/Hadoop-streaming.jar\

-input myInputDirs\

-output myOutputDir\

-mapper org.apache.hadoop.mapred.lib.IdentityMapper\

-reducer/bin/wc


用户可以设定stream.non.zero.exit.is.failure的值为true或false,从而表明streaming task的返回值非零时是Failure还是Success。默认情况下,streaming task返回非零时表示失败。

2.将文件打包到提交的作业中

利用Streaming用户可以将任何可执行文件指定为Mapper/Reducer。这些可执行文件可以事先存放在集群上,也可以用-file选项让可执行文件成为作业的一部分,并且会一起打包提交。例如:


$HADOOP_HOME/bin/hadoop jar$HADOOP_HOME/hadoop-streaming.jar\

-input myInputDirs\

-output myOutputDir\

-mapper myPythonScript.py\

-reducer/bin/wc\

-file myPythonScript.py


上面的例子描述了一个用户把可执行Python文件指定为Mapper。其中的选项“-file myPythonScirpt.py”使可执行Python文件作为作业的一部分被上传到集群的机器上。

除了可执行文件外,其他Mapper或Reducer需要用到的辅助文件(比如字典、配置文件等)也可以用这种方式打包上传。例如:


$HADOOP_HOME/bin/hadoop jar$HADOOP_HOME/hadoop-streaming.jar\

-input myInputDirs\

-output myOutputDir\

-mapper myPythonScript.py\

-reducer/bin/wc\

-file myPythonScript.py\

-file myDictionary.txt


3.Streaming选项与用法

(1)只使用Mapper的作业

有时候只需要使用Map函数处理输入数据。这时只须把mapred.reduce.tasks设置为零,Mapreduce框架就不会创建Reducer任务,Mapper任务的输出就是整个作业的最终输出。

为了做到向下兼容,Hadoop Streaming也支持“-reduce None”选项,它与“-jobconf mapred.reduce.tasks=0”等价。

(2)为作业指定其他属性

和其他普通的MapReduce作业一样,用户可以为Streaming作业指定数据格式,命令如下:


-inputformat JavaClassName

-outputformat JavaClassName

-partitioner JavaClassName

-combiner JavaClassName


如果不指定输入格式,程序会默认使用TextInputFormat。因为TextInputFormat得到的key值是LongWritable类型的(key值并不是输入文件中的内容,而是value偏移量),所以key会被丢弃,只会把value用管道方式发给Mapper。

另外,用户提供的定义输出格式的类需要能够处理Text类型的key/value对。如果不指定输出格式,则默认会使用TextOutputFormat类。

(3)Hadoop Streaming中的大文件和档案

任务依据-File和-Archive选项在集群中分发文件和档案,选项的参数是用户已上传至HDFS的文件或档案的URI。这些文件和档案在不同的作业间缓存。用户可以通过fs.default.name配置参数的值得到文件所在的host和fs_port。

下面是使用-cacheFile选项的例子:


-File hdfs://host:fs_port/user/testfile.txt#testlink


在上面的例子里,URL中#后面的内容是建立在任务当前工作目录下的符号链接的名字。这个任务的当前工作目录下有一个“testlink”符号链接,它指向testfile.txt文件在本地的复制位置。如果有多个文件,选项可以写成:


-File hdfs://host:fs_port/user/testfile1.txt#testlink1

-File hdfs://host:fs_port/user/testfile2.txt#testlink2


-Archive选项用于把JAR文件复制到任务当前工作目录,并自动把JAR文件解压缩。例如:


-Archive hdfs://host:fs_port/user/testfile.jar#testlink3


在上面的例子中,testlink3是当前工作目录下的符号链接,它指向testfile.jar解压后的目录。

下面是使用-Archive选项的另一个例子。其中,input.txt文件有两行内容,分别是两个文件的名字:testlink/cache.txt和testlink/cache2.txt。“testlink”是指向档案目录(JAR文件解压后的目录)的符号链接,这个目录下有“cache.txt”和“cache2.txt”两个文件。代码如下所示:


$HADOOP_HOME/bin/Hadoop jar$HADOOP_HOME/Hadoop-streaming.jar\

-input"/user/me/samples/cachefile/input.txt"\

-mapper"xargs cat"\

-reducer"cat"\

-output"/user/me/samples/cachefile/out"\

-Archive'hdfs://Hadoop-nn1.example.com/user/me/samples/

cachefile/cchedir.jar#testlink'\

-D mapred.map.tasks=1\

-D mapred.reduce.tasks=1\

-D mapred.job.name="Experiment"

$ls test_jar/

cache.txt cache2.txt

$jar cvf cachedir.jar-C test_jar/.

added manifest

adding:cache.txt(in=30)(out=29)(deflated 3%)

adding:cache2.txt(in=37)(out=35)(deflated 5%)

$Hadoop dfs-put cachedir.jar samples/cachefile

$Hadoop dfs-cat/user/me/samples/cachefile/input.txt

testlink/cache.txt

testlink/cache2.txt

$cat test_jar/cache.txt

This is just the cache string

$cat test_jar/cache2.txt

This is just the second cache string

$Hadoop dfs-ls/user/me/samples/cachefile/out

Found 1 items

/user/me/samples/cachefile/out/part-00000<r 3>69

$Hadoop dfs-cat/user/me/samples/cachefile/out/part-00000

This is just the cache string

This is just the second cache string


4.为作业指定附加配置参数

用户可以使用“-jobconf<n>=<v>”增加一些配置变量。例如:


$HADOOP_HOME/bin/Hadoop jar$HADOOP_HOME/Hadoop-streaming.jar\

-input myInputDirs\

-output myOutputDir\

-mapper org.apache.Hadoop.mapred.lib.IdentityMapper\

-reducer/bin/wc\

-D mapred.reduce.tasks=2


在上面的例子中,-jobconf mapred.reduce.tasks=2表明用两个Reducer完成作业。

关于jobconf参数的更多细节可以参考Hadoop安装包中的Hadoop-default.html文件。

5.其他选项

Streaming命令的其他选项如表18-3所示。

18.3 Hadoop Streaming的介绍和使用 - 图1

使用-cluster<name>实现“本地”Hadoop和一个或多个远程Hadoop集群间的切换。默认情况下,使用Hadoop-default.xml和Hadoop-site.xml。当使用-cluster<name>选项时,会使用$HADOOP_HOME/conf/Hadoop-<name>.xml。

下面的选项可改变temp目录:


-D dfs.data.dir=/tmp


下面的选项指定其他本地temp目录:


-D mapred.local.dir=/tmp/local

-D mapred.system.dir=/tmp/system

-D mapred.temp.dir=/tmp/temp


在streaming命令中设置环境变量:


-cmdenv EXAMPLE_DIR=/home/example/dictionaries/