3.2.2 Hadoop中的Hello World程序

上面所述的过程是MapReduce的核心,所有的MapReduce程序都具有图3-1所示的结构。下面我再举一个例子详述MapReduce的执行过程。

大家初次接触编程时学习的不论是哪种语言,看到的第一个示例程序可能都是“Hello World”。在Hadoop中也有一个类似于Hello World的程序。这就是WordCount。本节会结合这个程序具体讲解与MapReduce程序有关的所有类。这个程序的内容如下:


package cn.edu.ruc.cloudcomputing.book.chapter03;

import java.io.IOException;

import java.util.*;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.conf.*;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapred.*;

import org.apache.hadoop.util.*;

public class WordCount{

public static class Map extends MapReduceBase implements Mapper<LongWritable,

Text, Text, IntWritable>{

private final static IntWritable one=new IntWritable(1);

private Text word=new Text();

public void map(LongWritable key, Text value, OutputCollector<Text,

IntWritable>output, Reporter reporter)throws IOException{

String line=value.toString();

StringTokenizer tokenizer=new StringTokenizer(line);

while(tokenizer.hasMoreTokens()){

word.set(tokenizer.nextToken());

output.collect(word, one);

}

}

}

public static class Reduce extends MapReduceBase implements Reducer<Text,

IntWritable, Text, IntWritable>{

public void reduce(Text key, Iterator<IntWritable>values, OutputCollector<Text,

IntWritable>output, Reporter reporter)throws IOException{

int sum=0;

while(values.hasNext()){

sum+=values.next().get();

}

output.collect(key, new IntWritable(sum));

}

}

public static void main(String[]args)throws Exception{

JobConf conf=new JobConf(WordCount.class);

conf.setJobName("wordcount");

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(Map.class);

conf.setReducerClass(Reduce.class);

conf.setInputFormat(TextInputFormat.class);

conf.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(conf, new Path(args[0]));

FileOutputFormat.setOutputPath(conf, new Path(args[1]));

JobClient.runJob(conf);

}

}


同时,为了叙述方便,设定两个输入文件,如下:


echo"Hello World Bye World">file01

echo"Hello Hadoop Goodbye Hadoop">file02


看到这个程序,相信很多读者会对众多的预定义类感到很迷惑。其实这些类非常简单明了。首先,WordCount程序的代码虽多,但是执行过程却很简单,在本例中,它首先将输入文件读进来,然后交由Map程序处理,Map程序将输入读入后切出其中的单词,并标记它的数目为1,形成<word,1>的形式,然后交由Reduce处理,Reduce将相同key值(也就是word)的value值收集起来,形成<word, list of 1>的形式,之后将这些1值加起来,即为单词的个数,最后将这个<key, value>对以TextOutputFormat的形式输出到HDFS中。

针对这个数据流动过程,我挑出了如下几句代码来表述它的执行过程:


JobConf conf=new JobConf(MyMapre.class);

conf.setJobName("wordcount");

conf.setInputFormat(TextInputFormat.class);

conf.setOutputFormat(TextOutputFormat.class);

conf.setMapperClass(Map.class);

conf.setReducerClass(Reduce.class);

FileInputFormat.setInputPaths(conf, new Path(args[0]));

FileOutputFormat.setOutputPath(conf, new Path(args[1]));


首先讲解一下Job的初始化过程。Main函数调用Jobconf类来对MapReduce Job进行初始化,然后调用setJobName()方法命名这个Job。对Job进行合理的命名有助于更快地找到Job,以便在JobTracker和TaskTracker的页面中对其进行监视。接着就会调用setInputPath()和setOutputPath()设置输入输出路径。下面会结合WordCount程序重点讲解Inputformat()、OutputFormat()、Map()、Reduce()这4种方法。

1.InputFormat()和InputSplit

InputSplit是Hadoop中用来把输入数据传送给每个单独的Map, InputSplit存储的并非数据本身,而是一个分片长度和一个记录数据位置的数组。生成InputSplit的方法可以通过Inputformat()来设置。当数据传送给Map时,Map会将输入分片传送到InputFormat()上,InputFormat()则调用getRecordReader()方法生成RecordReader, RecordReader再通过creatKey()、creatValue()方法创建可供Map处理的<key, value>对,即<k1,v1>。简而言之,InputFormat()方法是用来生成可供Map处理的<key, value>对的。

Hadoop预定义了多种方法将不同类型的输入数据转化为Map能够处理的<key, value>对,它们都继承自InputFormat,分别是:

BaileyBorweinPlouffe. BbpInputFormat

ComposableInputFormat

CompositeInputFormat

DBInputFormat

DistSum. Machine.AbstractInputFormat

FileInputFormat

其中,FileInputFormat又有多个子类,分别为:

CombineFileInputFormat

KeyValueTextInputFormat

NLineInputFormat

SequenceFileInputFormat

TeraInputFormat

TextInputFormat

其中,TextInputFormat是Hadoop默认的输入方法,在TextInputFormat中,每个文件(或其一部分)都会单独作为Map的输入,而这是继承自FileInputFormat的。之后,每行数据都会生成一条记录,每条记录则表示成<key, value>形式:

key值是每个数据的记录在数据分片中的字节偏移量,数据类型是LongWritable;

value值是每行的内容,数据类型是Text。

也就是说,输入数据会以如下的形式被传入Map中:


file01:

0 hello world bye world

file02

0 hello hadoop bye hadoop


因为file01和file02都会被单独输入到一个Map中,因此它们的key值都是0

2.OutputFormat()

对于每一种输入格式都有一种输出格式与其对应。同样,默认的输出格式是TextOutputFormat,这种输出方式与输入类似,会将每条记录以一行的形式存入文本文件。不过,它的键和值可以是任意形式的,因为程序内部会调用toString()方法将键和值转换为String类型再输出。最后的输出形式如下所示:


Bye 2

Hadoop 2

Hello 2

World 2


3.Map()和Reduce()

Map()方法和Reduce()方法是本章的重点,从前面的内容知道,Map()函数接收经过InputFormat处理所产生的<k1,v1>,然后输出<k2,v2>。WordCount的Map()函数如下:


public class MyMapre{

public static class Map extends MapReduceBase implements Mapper<LongWritable,

Text, Text, IntWritable>{

private final static IntWritable one=new IntWritable(1);

private Text word=new Text();

public void map(LongWritable key, Text value,

OutputCollector<Text, IntWritable>output, Reporter reporter)throws IOException{

String line=value.toString();

StringTokenizer tokenizer=new StringTokenizer(line);

while(tokenizer.hasMoreTokens()){

word.set(tokenizer.nextToken());

output.collect(word, one);

}

}

}


Map()函数继承自MapReduceBase,并且它实现了Mapper接口,此接口是一个范型类型,它有4种形式的参数,分别用来指定Map()的输入key值类型、输入value值类型、输出key值类型和输出value值类型。在本例中,因为使用的是TextInputFormat,它的输出key值是LongWritable类型,输出value值是Text类型,所以Map()的输入类型即为<LongWritable, Text>。如前面的内容所述,在本例中需要输出<word,1>这样的形式,因此输出的key值类型是Text,输出的value值类型是IntWritable。

实现此接口类还需要实现Map()方法,Map()方法会负责具体对输入进行操作,在本例中,Map()方法对输入的行以空格为单位进行切分,然后使用OutputCollect收集输出的<word,1>,即<k2,v2>。

下面来看Reduce()函数:


public static class Reduce extends MapReduceBase implements Reducer<Text,

IntWritable, Text, IntWritable>{

public void reduce(Text key, Iterator<IntWritable>values,

OutputCollector<Text, IntWritable>output, Reporter reporter)throws IOException{

int sum=0;

while(values.hasNext()){

sum+=values.next().get();

}

output.collect(key, new IntWritable(sum));

}

}


与Map()类似,Reduce()函数也继承自MapReduceBase,需要实现Reducer接口。Reduce()函数以Map()的输出作为输入,因此Reduce()的输入类型是<Text, IneWritable>。而Reduce()的输出是单词和它的数目,因此,它的输出类型是<Text, IntWritable>。Reduce()函数也要实现Reduce()方法,在此方法中,Reduce()函数将输入的key值作为输出的key值,然后将获得的多个value值加起来,作为输出的value值。

4.运行MapReduce程序

读者可以在Eclipse里运行MapReduce程序,也可以在命令行中运行MapReduce程序,但是在实际应用中,还是推荐到命令行中运行程序。按照第2章介绍的步骤,首先安装Hadoop,然后输入编译打包生成的JAR程序,如下所示(以Hadoop-0.20.2为例,安装路径是~/hadoop):


mkdir FirstJar

javac-classpath~/hadoop/hadoop-0.20.2-core.jar-d FirstJar

WordCount.java

jar-cvf wordcount.jar-C FirstJar/.


首先建立FirstJar,然后编译文件生成.class,存放到文件夹FirstJar中,并将FirstJar中的文件打包生成wordcount.jar文件。

接着上传输入文件(输入文件是file01,file02,存放在~/input):


~/hadoop/bin/hadoop dfs-mkdir input

~/hadoop/bin/hadoop dfs-put~/input/file0*input


在此上传过程中,先建立文件夹input,然后上传文件file01、file02到input中。

最后运行生成的JAR文件,为了叙述方便,先将生成的JAR文件放入Hadoop的安装文件夹中(HADOOP_HOME),然后运行如下命令。


~/hadoop/bin/hadoop jar wordcount.jar WordCount input output

11/01/21 20:02:38 WARN mapred.JobClient:Use GenericOptionsParser for parsing the

arguments.Applications should implement Tool for the same.

11/01/21 20:02:38 INFO mapred.FileInputFormat:Total input paths to process:2

11/01/21 20:02:38 INFO mapred.JobClient:Running job:job_201101111819_0002

11/01/21 20:02:39 INFO mapred.JobClient:map 0%reduce 0%

11/01/21 20:02:49 INFO mapred.JobClient:map 100%reduce 0%

11/01/21 20:03:01 INFO mapred.JobClient:map 100%reduce 100%

11/01/21 20:03:03 INFO mapred.JobClient:Job complete:job_201101111819_0002

11/01/21 20:03:03 INFO mapred.JobClient:Counters:18

11/01/21 20:03:03 INFO mapred.JobClient:Job Counters

11/01/21 20:03:03 INFO mapred.JobClient:Launched reduce tasks=1

11/01/21 20:03:03 INFO mapred.JobClient:Launched map tasks=2

11/01/21 20:03:03 INFO mapred.JobClient:Data-local map tasks=2

11/01/21 20:03:03 INFO mapred.JobClient:FileSystemCounters

11/01/21 20:03:03 INFO mapred.JobClient:FILE_BYTES_READ=100

11/01/21 20:03:03 INFO mapred.JobClient:HDFS_BYTES_READ=46

11/01/21 20:03:03 INFO mapred.JobClient:FILE_BYTES_WRITTEN=270

11/01/21 20:03:03 INFO mapred.JobClient:HDFS_BYTES_WRITTEN=31

11/01/21 20:03:03 INFO mapred.JobClient:Map-Reduce Framework

11/01/21 20:03:04 INFO mapred.JobClient:Reduce input groups=4

11/01/21 20:03:04 INFO mapred.JobClient:Combine output records=0

11/01/21 20:03:04 INFO mapred.JobClient:Map input records=2

11/01/21 20:03:04 INFO mapred.JobClient:Reduce shuffle bytes=106

11/01/21 20:03:04 INFO mapred.JobClient:Reduce output records=4

11/01/21 20:03:04 INFO mapred.JobClient:Spilled Records=16

11/01/21 20:03:04 INFO mapred.JobClient:Map output bytes=78

11/01/21 20:03:04 INFO mapred.JobClient:Map input bytes=46

11/01/21 20:03:04 INFO mapred.JobClient:Combine input records=0

11/01/21 20:03:04 INFO mapred.JobClient:Map output records=8

11/01/21 20:03:04 INFO mapred.JobClient:Reduce input records=8


Hadoop命令(注意不是Hadoop本身)会启动一个JVM来运行这个MapReduce程序,并自动获取Hadoop的配置,同时把类的路径(及其依赖关系)加入到Hadoop的库中。以上就是Hadoop Job的运行记录,从这里面可以看到,这个Job被赋予了一个ID号:job_201101111819_0002,而且得知输入文件有两个(Total input paths to process:2),同时还可以了解Map的输入输出记录(record数及字节数),以及Reduce的输入输出记录。比如说,在本例中,Map的task数量是2个,Reduce的Task数量是一个;Map的输入record数是2个,输出record数是8个等。

可以通过命令查看输出文件输出文件为:


bye 2

hadoop 2

hello 2

world 2


5.新的API

从0.20.2版本开始,Hadoop提供了一个新的API。新的API是在org.apache.hadoop.mapreduce中的,旧版的API则在org.apache.hadoop.mapred中。新的API不兼容旧的API, WordCount程序用新的API重写如下:


package cn.ruc.edu.cloudcomputing.book.chaptero3;

import java.io.IOException;

import java.util.*;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.conf.*;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.lib.input.*;

import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class WordCount extends Configured implements Tool{

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>{

private final static IntWritable one=new IntWritable(1);

private Text word=new Text();

public void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException{

String line=value.toString();

StringTokenizer tokenizer=new StringTokenizer(line);

while(tokenizer.hasMoreTokens()){

word.set(tokenizer.nextToken());

context.write(word, one);

}

}

}

public static class Reduce extends Reducer<Text, IntWritable, Text,

IntWritable>{

public void reduce(Text key, Iterable<IntWritable>values, Context context)

throws IOException, InterruptedException{

int sum=0;

for(IntWritable val:values){

sum+=val.get();

}

context.write(key, new IntWritable(sum));

}

}

public int run(String[]args)throws Exception{

Job job=new Job(getConf());

job.setJarByClass(WordCount.class);

job.setJobName("wordcount");

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

job.setMapperClass(Map.class);

job.setReducerClass(Reduce.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

boolean success=job.waitForCompletion(true);

return success?0:1;

}

public static void main(String[]args)throws Exception{

int ret=ToolRunner.run(new WordCount(),args);

System.exit(ret);

}

}


从这个程序可以看到新旧API的几个区别:

在新的API中,Mapper与Reducer已经不是接口而是抽象类。而且Map函数与Reduce函数也已经不再实现Mapper和Reducer接口,而是继承Mapper和Reducer抽象类。这样做更容易扩展,因为添加方法到抽象类中更容易。

新的API中更广泛地使用了context对象,并使用MapContext进行MapReduce间的通信,MapContext同时充当OutputCollector和Reporter的角色。

Job的配置统一由Configurartion来完成,而不必额外地使用JobConf对守护进程进行配置。

由Job类来负责Job的控制,而不是JobClient, JobClient在新的API中已经被删除。这些区别,都可以在以上的程序中看出。

此外,新的API同时支持“推”和“拉”式的迭代方式。在以往的操作中,<key, value>对是被推入到Map中的,但是在新的API中,允许程序将数据拉入Map中,Reduce也一样。这样做更加方便程序分批处理数据。