3.4 Hadoop流

Hadoop流提供了一个API,允许用户使用任何脚本语言写Map函数或Reduce函数。Hadoop流的关键是,它使用UNIX标准流作为程序与Hadoop之间的接口。因此,任何程序只要可以从标准输入流中读取数据并且可以写入数据到标准输出流,那么就可以通过Hadoop流使用其他语言编写MapReduce程序的Map函数或Reduce函数。

举个最简单的例子(本例的运行环境:Ubuntu, Hadoop-0.20.2):


bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar-input input-output

output-mapper/bin/cat-reducer usr/bin/wc


从这个例子中可以看到,Hadoop流引入的包是hadoop-0.20.2-streaming.jar,并且具有如下命令:


-input指明输入文件路径

-output指明输出文件路径

-mapper指定map函数

-reducer指定reduce函数


Hadoop流的操作还有其他参数,后面会一一列出。

3.4.1 Hadoop流的工作原理

先来看Hadoop流的工作原理。在上例中,Map和Reduce都是Linux内的可执行文件,更重要的是,它们接受的都是标准输入(stdin),输出的都是标准输出(stdout)。如果大家熟悉Linux,那么对它们一定不会陌生。执行上一节中的示例程序的过程如下所示。

程序的输入与WordCount程序是一样的,具体如下:


file01:

hello world bye world

file02

hello hadoop bye hadoop

输入命令:

bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar-input input-output

output-mapper/bin/cat-reducer/usr/bin/wc

显示:

packageJobJar:[/root/tmp/hadoop-unjar7103575849190765740/][]/tmp/

streamjob2314757737747407133.jar tmpDir=null

11/01/23 02:07:36 INFO mapred.FileInputFormat:Total input paths to process:2

11/01/23 02:07:37 INFO streaming.StreamJob:getLocalDirs():[/root/tmp/mapred/local]

11/01/23 02:07:37 INFO streaming.StreamJob:Running job:job_201101111819_0020

11/01/23 02:07:37 INFO streaming.StreamJob:To kill this job, run:

11/01/23 02:07:37 INFO streaming.StreamJob:/root/hadoop/bin/hadoop job-Dmapred.

job.tracker=localhost:9001-kill job_201101111819_0020

11/01/23 02:07:37 INFO streaming.StreamJob:Tracking URL:http://localhost:50030/

jobdetails.jsp?jobid=job_201101111819_0020

11/01/23 02:07:38 INFO streaming.StreamJob:map 0%reduce 0%

11/01/23 02:07:47 INFO streaming.StreamJob:map 100%reduce 0%

11/01/23 02:07:59 INFO streaming.StreamJob:map 100%reduce 100%

11/01/23 02:08:02 INFO streaming.StreamJob:Job complete:job_201101111819_0020

11/01/23 02:08:02 INFO streaming.StreamJob:Output:output

程序的输出是:

2 8 46


wc命令用来统计文件中的行数、单词数与字节数,可以看到,这个结果是正确的。

Hadoop流的工作原理并不复杂,其中Map的工作原理如图3-4所示(Reduce与其相同)。

3.4 Hadoop流 - 图1

图 3-4 Hadoop流的Map流程图

当一个可执行文件作为Mapper时,每一个Map任务会以一个独立的进程启动这个可执行文件,然后在Map任务运行时,会把输入切分成行提供给可执行文件,并作为它的标准输入(stdin)内容。当可执行文件运行出结果时,Map从标准输出(stdout)中收集数据,并将其转化为<key, value>对,作为Map的输出。

Reduce与Map相同,如果可执行文件做Reducer时,Reduce任务会启动这个可执行文件,并且将<key, value>对转化为行作为这个可执行文件的标准输入(stdin)。然后Reduce会收集这个可执行文件的标准输出(stdout)的内容。并把每一行转化为<key, value>对,作为Reduce的输出。

Map与Reduce将输出转化为<key, value>对的默认方法是:将每行的第一个tab符号(制表符)之前的内容作为key,之后的内容作为value。如果没有tab符号,那么这一行的所有内容会作为key,而value值为null。当然这是可以更改的。

值得一提的是,可以使用Java类作为Map,而用一个可执行程序作为Reduce;或使用Java类作为Reduce,而用可执行程序作为Map。例如:


/bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar

-input myInputDirs-output myOutputDir-mapper

org.apache.hadoop.mapred.lib.IdentityMapper-reducer/bin/wc