18.3 Hadoop Streaming的介绍和使用
18.3.1 Hadoop Streaming的介绍
Hadoop Streaming是Hadoop的一个工具,它帮助用户创建和运行一类特殊的MapReduce作业,这些特殊的MapReduce作业是由一些可执行文件或脚本文件充当Mapper或Reducer。也就是说Hadoop Streaming允许用户用非Java的编程语言编写MapReduce程序,然后Streaming用STDIN(标准输入)和STDOUT(标准输出)来和我们编写的Map和Reduce进行数据交换,并提交给Hadoop。命令格式如下:
$HADOOP_HOME/bin/hadoop jar$HADOOP_HOME/hadoop-streaming.jar\
-input myInputDirs\
-output myOutputDir\
-mapper/bin/cat\
-reducer/bin/wc
1.Streaming的工作原理
在上面的命令里,Mapper和Reducer都是可执行文件,它们从标准输入按行读入数据,并把计算结果发送给标准输出。Streaming工具会创建一个MapReduce作业,并把它发送给合适的集群,同时监视这个作业的整个执行过程。
如果一个可执行文件被用于Mapper,则在其初始化时,每一个Mapper任务会把这个可执行文件作为一个单独的进程启动。Mapper任务运行时,它把输入切分成行,并把结果提供给可执行文件对应进程的标准输入。同时,它会收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为输出。默认情况下,一行中第一个tab之前的部分被当做key,之后的(不包括tab)被当做value。如果没有tab,则整行内容被当做key值,value值为null。具体的转化策略会在下面讨论。
如果一个可执行文件被用于Reducer,每个Reducer任务同样会把这个可执行文件作为一个单独的进程启动。Reducer任务运行时,它把输入切分成行,并把结果提供给可执行文件对应进程的标准输入。同时,它会收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为输出。默认情况下,一行中第一个tab之前的部分被当作key,之后的(不包括tab)被当做value。
用户也可以使用Java类作为Mapper或Reducer。本节最初给出的命令与这里的命令等价:
$HADOOP_HOME/bin/Hadoop jar$HADOOP_HOME/Hadoop-streaming.jar\
-input myInputDirs\
-output myOutputDir\
-mapper org.apache.hadoop.mapred.lib.IdentityMapper\
-reducer/bin/wc
用户可以设定stream.non.zero.exit.is.failure的值为true或false,从而表明streaming task的返回值非零时是Failure还是Success。默认情况下,streaming task返回非零时表示失败。
2.将文件打包到提交的作业中
利用Streaming用户可以将任何可执行文件指定为Mapper/Reducer。这些可执行文件可以事先存放在集群上,也可以用-file选项让可执行文件成为作业的一部分,并且会一起打包提交。例如:
$HADOOP_HOME/bin/hadoop jar$HADOOP_HOME/hadoop-streaming.jar\
-input myInputDirs\
-output myOutputDir\
-mapper myPythonScript.py\
-reducer/bin/wc\
-file myPythonScript.py
上面的例子描述了一个用户把可执行Python文件指定为Mapper。其中的选项“-file myPythonScirpt.py”使可执行Python文件作为作业的一部分被上传到集群的机器上。
除了可执行文件外,其他Mapper或Reducer需要用到的辅助文件(比如字典、配置文件等)也可以用这种方式打包上传。例如:
$HADOOP_HOME/bin/hadoop jar$HADOOP_HOME/hadoop-streaming.jar\
-input myInputDirs\
-output myOutputDir\
-mapper myPythonScript.py\
-reducer/bin/wc\
-file myPythonScript.py\
-file myDictionary.txt
3.Streaming选项与用法
(1)只使用Mapper的作业
有时候只需要使用Map函数处理输入数据。这时只须把mapred.reduce.tasks设置为零,Mapreduce框架就不会创建Reducer任务,Mapper任务的输出就是整个作业的最终输出。
为了做到向下兼容,Hadoop Streaming也支持“-reduce None”选项,它与“-jobconf mapred.reduce.tasks=0”等价。
(2)为作业指定其他属性
和其他普通的MapReduce作业一样,用户可以为Streaming作业指定数据格式,命令如下:
-inputformat JavaClassName
-outputformat JavaClassName
-partitioner JavaClassName
-combiner JavaClassName
如果不指定输入格式,程序会默认使用TextInputFormat。因为TextInputFormat得到的key值是LongWritable类型的(key值并不是输入文件中的内容,而是value偏移量),所以key会被丢弃,只会把value用管道方式发给Mapper。
另外,用户提供的定义输出格式的类需要能够处理Text类型的key/value对。如果不指定输出格式,则默认会使用TextOutputFormat类。
(3)Hadoop Streaming中的大文件和档案
任务依据-File和-Archive选项在集群中分发文件和档案,选项的参数是用户已上传至HDFS的文件或档案的URI。这些文件和档案在不同的作业间缓存。用户可以通过fs.default.name配置参数的值得到文件所在的host和fs_port。
下面是使用-cacheFile选项的例子:
-File hdfs://host:fs_port/user/testfile.txt#testlink
在上面的例子里,URL中#后面的内容是建立在任务当前工作目录下的符号链接的名字。这个任务的当前工作目录下有一个“testlink”符号链接,它指向testfile.txt文件在本地的复制位置。如果有多个文件,选项可以写成:
-File hdfs://host:fs_port/user/testfile1.txt#testlink1
-File hdfs://host:fs_port/user/testfile2.txt#testlink2
-Archive选项用于把JAR文件复制到任务当前工作目录,并自动把JAR文件解压缩。例如:
-Archive hdfs://host:fs_port/user/testfile.jar#testlink3
在上面的例子中,testlink3是当前工作目录下的符号链接,它指向testfile.jar解压后的目录。
下面是使用-Archive选项的另一个例子。其中,input.txt文件有两行内容,分别是两个文件的名字:testlink/cache.txt和testlink/cache2.txt。“testlink”是指向档案目录(JAR文件解压后的目录)的符号链接,这个目录下有“cache.txt”和“cache2.txt”两个文件。代码如下所示:
$HADOOP_HOME/bin/Hadoop jar$HADOOP_HOME/Hadoop-streaming.jar\
-input"/user/me/samples/cachefile/input.txt"\
-mapper"xargs cat"\
-reducer"cat"\
-output"/user/me/samples/cachefile/out"\
-Archive'hdfs://Hadoop-nn1.example.com/user/me/samples/
cachefile/cchedir.jar#testlink'\
-D mapred.map.tasks=1\
-D mapred.reduce.tasks=1\
-D mapred.job.name="Experiment"
$ls test_jar/
cache.txt cache2.txt
$jar cvf cachedir.jar-C test_jar/.
added manifest
adding:cache.txt(in=30)(out=29)(deflated 3%)
adding:cache2.txt(in=37)(out=35)(deflated 5%)
$Hadoop dfs-put cachedir.jar samples/cachefile
$Hadoop dfs-cat/user/me/samples/cachefile/input.txt
testlink/cache.txt
testlink/cache2.txt
$cat test_jar/cache.txt
This is just the cache string
$cat test_jar/cache2.txt
This is just the second cache string
$Hadoop dfs-ls/user/me/samples/cachefile/out
Found 1 items
/user/me/samples/cachefile/out/part-00000<r 3>69
$Hadoop dfs-cat/user/me/samples/cachefile/out/part-00000
This is just the cache string
This is just the second cache string
4.为作业指定附加配置参数
用户可以使用“-jobconf<n>=<v>”增加一些配置变量。例如:
$HADOOP_HOME/bin/Hadoop jar$HADOOP_HOME/Hadoop-streaming.jar\
-input myInputDirs\
-output myOutputDir\
-mapper org.apache.Hadoop.mapred.lib.IdentityMapper\
-reducer/bin/wc\
-D mapred.reduce.tasks=2
在上面的例子中,-jobconf mapred.reduce.tasks=2表明用两个Reducer完成作业。
关于jobconf参数的更多细节可以参考Hadoop安装包中的Hadoop-default.html文件。
5.其他选项
Streaming命令的其他选项如表18-3所示。
使用-cluster<name>实现“本地”Hadoop和一个或多个远程Hadoop集群间的切换。默认情况下,使用Hadoop-default.xml和Hadoop-site.xml。当使用-cluster<name>选项时,会使用$HADOOP_HOME/conf/Hadoop-<name>.xml。
下面的选项可改变temp目录:
-D dfs.data.dir=/tmp
下面的选项指定其他本地temp目录:
-D mapred.local.dir=/tmp/local
-D mapred.system.dir=/tmp/system
-D mapred.temp.dir=/tmp/temp
在streaming命令中设置环境变量:
-cmdenv EXAMPLE_DIR=/home/example/dictionaries/