3.2.2 Hadoop中的Hello World程序
上面所述的过程是MapReduce的核心,所有的MapReduce程序都具有图3-1所示的结构。下面我再举一个例子详述MapReduce的执行过程。
大家初次接触编程时学习的不论是哪种语言,看到的第一个示例程序可能都是“Hello World”。在Hadoop中也有一个类似于Hello World的程序。这就是WordCount。本节会结合这个程序具体讲解与MapReduce程序有关的所有类。这个程序的内容如下:
package cn.edu.ruc.cloudcomputing.book.chapter03;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount{
public static class Map extends MapReduceBase implements Mapper<LongWritable,
Text, Text, IntWritable>{
private final static IntWritable one=new IntWritable(1);
private Text word=new Text();
public void map(LongWritable key, Text value, OutputCollector<Text,
IntWritable>output, Reporter reporter)throws IOException{
String line=value.toString();
StringTokenizer tokenizer=new StringTokenizer(line);
while(tokenizer.hasMoreTokens()){
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text,
IntWritable, Text, IntWritable>{
public void reduce(Text key, Iterator<IntWritable>values, OutputCollector<Text,
IntWritable>output, Reporter reporter)throws IOException{
int sum=0;
while(values.hasNext()){
sum+=values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
public static void main(String[]args)throws Exception{
JobConf conf=new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
同时,为了叙述方便,设定两个输入文件,如下:
echo"Hello World Bye World">file01
echo"Hello Hadoop Goodbye Hadoop">file02
看到这个程序,相信很多读者会对众多的预定义类感到很迷惑。其实这些类非常简单明了。首先,WordCount程序的代码虽多,但是执行过程却很简单,在本例中,它首先将输入文件读进来,然后交由Map程序处理,Map程序将输入读入后切出其中的单词,并标记它的数目为1,形成<word,1>的形式,然后交由Reduce处理,Reduce将相同key值(也就是word)的value值收集起来,形成<word, list of 1>的形式,之后将这些1值加起来,即为单词的个数,最后将这个<key, value>对以TextOutputFormat的形式输出到HDFS中。
针对这个数据流动过程,我挑出了如下几句代码来表述它的执行过程:
JobConf conf=new JobConf(MyMapre.class);
conf.setJobName("wordcount");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
首先讲解一下Job的初始化过程。Main函数调用Jobconf类来对MapReduce Job进行初始化,然后调用setJobName()方法命名这个Job。对Job进行合理的命名有助于更快地找到Job,以便在JobTracker和TaskTracker的页面中对其进行监视。接着就会调用setInputPath()和setOutputPath()设置输入输出路径。下面会结合WordCount程序重点讲解Inputformat()、OutputFormat()、Map()、Reduce()这4种方法。
1.InputFormat()和InputSplit
InputSplit是Hadoop中用来把输入数据传送给每个单独的Map, InputSplit存储的并非数据本身,而是一个分片长度和一个记录数据位置的数组。生成InputSplit的方法可以通过Inputformat()来设置。当数据传送给Map时,Map会将输入分片传送到InputFormat()上,InputFormat()则调用getRecordReader()方法生成RecordReader, RecordReader再通过creatKey()、creatValue()方法创建可供Map处理的<key, value>对,即<k1,v1>。简而言之,InputFormat()方法是用来生成可供Map处理的<key, value>对的。
Hadoop预定义了多种方法将不同类型的输入数据转化为Map能够处理的<key, value>对,它们都继承自InputFormat,分别是:
BaileyBorweinPlouffe. BbpInputFormat
ComposableInputFormat
CompositeInputFormat
DBInputFormat
DistSum. Machine.AbstractInputFormat
FileInputFormat
其中,FileInputFormat又有多个子类,分别为:
CombineFileInputFormat
KeyValueTextInputFormat
NLineInputFormat
SequenceFileInputFormat
TeraInputFormat
TextInputFormat
其中,TextInputFormat是Hadoop默认的输入方法,在TextInputFormat中,每个文件(或其一部分)都会单独作为Map的输入,而这是继承自FileInputFormat的。之后,每行数据都会生成一条记录,每条记录则表示成<key, value>形式:
key值是每个数据的记录在数据分片中的字节偏移量,数据类型是LongWritable;
value值是每行的内容,数据类型是Text。
也就是说,输入数据会以如下的形式被传入Map中:
file01:
0 hello world bye world
file02
0 hello hadoop bye hadoop
因为file01和file02都会被单独输入到一个Map中,因此它们的key值都是0
2.OutputFormat()
对于每一种输入格式都有一种输出格式与其对应。同样,默认的输出格式是TextOutputFormat,这种输出方式与输入类似,会将每条记录以一行的形式存入文本文件。不过,它的键和值可以是任意形式的,因为程序内部会调用toString()方法将键和值转换为String类型再输出。最后的输出形式如下所示:
Bye 2
Hadoop 2
Hello 2
World 2
3.Map()和Reduce()
Map()方法和Reduce()方法是本章的重点,从前面的内容知道,Map()函数接收经过InputFormat处理所产生的<k1,v1>,然后输出<k2,v2>。WordCount的Map()函数如下:
public class MyMapre{
public static class Map extends MapReduceBase implements Mapper<LongWritable,
Text, Text, IntWritable>{
private final static IntWritable one=new IntWritable(1);
private Text word=new Text();
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable>output, Reporter reporter)throws IOException{
String line=value.toString();
StringTokenizer tokenizer=new StringTokenizer(line);
while(tokenizer.hasMoreTokens()){
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
Map()函数继承自MapReduceBase,并且它实现了Mapper接口,此接口是一个范型类型,它有4种形式的参数,分别用来指定Map()的输入key值类型、输入value值类型、输出key值类型和输出value值类型。在本例中,因为使用的是TextInputFormat,它的输出key值是LongWritable类型,输出value值是Text类型,所以Map()的输入类型即为<LongWritable, Text>。如前面的内容所述,在本例中需要输出<word,1>这样的形式,因此输出的key值类型是Text,输出的value值类型是IntWritable。
实现此接口类还需要实现Map()方法,Map()方法会负责具体对输入进行操作,在本例中,Map()方法对输入的行以空格为单位进行切分,然后使用OutputCollect收集输出的<word,1>,即<k2,v2>。
下面来看Reduce()函数:
public static class Reduce extends MapReduceBase implements Reducer<Text,
IntWritable, Text, IntWritable>{
public void reduce(Text key, Iterator<IntWritable>values,
OutputCollector<Text, IntWritable>output, Reporter reporter)throws IOException{
int sum=0;
while(values.hasNext()){
sum+=values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
与Map()类似,Reduce()函数也继承自MapReduceBase,需要实现Reducer接口。Reduce()函数以Map()的输出作为输入,因此Reduce()的输入类型是<Text, IneWritable>。而Reduce()的输出是单词和它的数目,因此,它的输出类型是<Text, IntWritable>。Reduce()函数也要实现Reduce()方法,在此方法中,Reduce()函数将输入的key值作为输出的key值,然后将获得的多个value值加起来,作为输出的value值。
4.运行MapReduce程序
读者可以在Eclipse里运行MapReduce程序,也可以在命令行中运行MapReduce程序,但是在实际应用中,还是推荐到命令行中运行程序。按照第2章介绍的步骤,首先安装Hadoop,然后输入编译打包生成的JAR程序,如下所示(以Hadoop-0.20.2为例,安装路径是~/hadoop):
mkdir FirstJar
javac-classpath~/hadoop/hadoop-0.20.2-core.jar-d FirstJar
WordCount.java
jar-cvf wordcount.jar-C FirstJar/.
首先建立FirstJar,然后编译文件生成.class,存放到文件夹FirstJar中,并将FirstJar中的文件打包生成wordcount.jar文件。
接着上传输入文件(输入文件是file01,file02,存放在~/input):
~/hadoop/bin/hadoop dfs-mkdir input
~/hadoop/bin/hadoop dfs-put~/input/file0*input
在此上传过程中,先建立文件夹input,然后上传文件file01、file02到input中。
最后运行生成的JAR文件,为了叙述方便,先将生成的JAR文件放入Hadoop的安装文件夹中(HADOOP_HOME),然后运行如下命令。
~/hadoop/bin/hadoop jar wordcount.jar WordCount input output
11/01/21 20:02:38 WARN mapred.JobClient:Use GenericOptionsParser for parsing the
arguments.Applications should implement Tool for the same.
11/01/21 20:02:38 INFO mapred.FileInputFormat:Total input paths to process:2
11/01/21 20:02:38 INFO mapred.JobClient:Running job:job_201101111819_0002
11/01/21 20:02:39 INFO mapred.JobClient:map 0%reduce 0%
11/01/21 20:02:49 INFO mapred.JobClient:map 100%reduce 0%
11/01/21 20:03:01 INFO mapred.JobClient:map 100%reduce 100%
11/01/21 20:03:03 INFO mapred.JobClient:Job complete:job_201101111819_0002
11/01/21 20:03:03 INFO mapred.JobClient:Counters:18
11/01/21 20:03:03 INFO mapred.JobClient:Job Counters
11/01/21 20:03:03 INFO mapred.JobClient:Launched reduce tasks=1
11/01/21 20:03:03 INFO mapred.JobClient:Launched map tasks=2
11/01/21 20:03:03 INFO mapred.JobClient:Data-local map tasks=2
11/01/21 20:03:03 INFO mapred.JobClient:FileSystemCounters
11/01/21 20:03:03 INFO mapred.JobClient:FILE_BYTES_READ=100
11/01/21 20:03:03 INFO mapred.JobClient:HDFS_BYTES_READ=46
11/01/21 20:03:03 INFO mapred.JobClient:FILE_BYTES_WRITTEN=270
11/01/21 20:03:03 INFO mapred.JobClient:HDFS_BYTES_WRITTEN=31
11/01/21 20:03:03 INFO mapred.JobClient:Map-Reduce Framework
11/01/21 20:03:04 INFO mapred.JobClient:Reduce input groups=4
11/01/21 20:03:04 INFO mapred.JobClient:Combine output records=0
11/01/21 20:03:04 INFO mapred.JobClient:Map input records=2
11/01/21 20:03:04 INFO mapred.JobClient:Reduce shuffle bytes=106
11/01/21 20:03:04 INFO mapred.JobClient:Reduce output records=4
11/01/21 20:03:04 INFO mapred.JobClient:Spilled Records=16
11/01/21 20:03:04 INFO mapred.JobClient:Map output bytes=78
11/01/21 20:03:04 INFO mapred.JobClient:Map input bytes=46
11/01/21 20:03:04 INFO mapred.JobClient:Combine input records=0
11/01/21 20:03:04 INFO mapred.JobClient:Map output records=8
11/01/21 20:03:04 INFO mapred.JobClient:Reduce input records=8
Hadoop命令(注意不是Hadoop本身)会启动一个JVM来运行这个MapReduce程序,并自动获取Hadoop的配置,同时把类的路径(及其依赖关系)加入到Hadoop的库中。以上就是Hadoop Job的运行记录,从这里面可以看到,这个Job被赋予了一个ID号:job_201101111819_0002,而且得知输入文件有两个(Total input paths to process:2),同时还可以了解Map的输入输出记录(record数及字节数),以及Reduce的输入输出记录。比如说,在本例中,Map的task数量是2个,Reduce的Task数量是一个;Map的输入record数是2个,输出record数是8个等。
可以通过命令查看输出文件输出文件为:
bye 2
hadoop 2
hello 2
world 2
5.新的API
从0.20.2版本开始,Hadoop提供了一个新的API。新的API是在org.apache.hadoop.mapreduce中的,旧版的API则在org.apache.hadoop.mapred中。新的API不兼容旧的API, WordCount程序用新的API重写如下:
package cn.ruc.edu.cloudcomputing.book.chaptero3;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class WordCount extends Configured implements Tool{
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>{
private final static IntWritable one=new IntWritable(1);
private Text word=new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException{
String line=value.toString();
StringTokenizer tokenizer=new StringTokenizer(line);
while(tokenizer.hasMoreTokens()){
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text,
IntWritable>{
public void reduce(Text key, Iterable<IntWritable>values, Context context)
throws IOException, InterruptedException{
int sum=0;
for(IntWritable val:values){
sum+=val.get();
}
context.write(key, new IntWritable(sum));
}
}
public int run(String[]args)throws Exception{
Job job=new Job(getConf());
job.setJarByClass(WordCount.class);
job.setJobName("wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success=job.waitForCompletion(true);
return success?0:1;
}
public static void main(String[]args)throws Exception{
int ret=ToolRunner.run(new WordCount(),args);
System.exit(ret);
}
}
从这个程序可以看到新旧API的几个区别:
在新的API中,Mapper与Reducer已经不是接口而是抽象类。而且Map函数与Reduce函数也已经不再实现Mapper和Reducer接口,而是继承Mapper和Reducer抽象类。这样做更容易扩展,因为添加方法到抽象类中更容易。
新的API中更广泛地使用了context对象,并使用MapContext进行MapReduce间的通信,MapContext同时充当OutputCollector和Reporter的角色。
Job的配置统一由Configurartion来完成,而不必额外地使用JobConf对守护进程进行配置。
由Job类来负责Job的控制,而不是JobClient, JobClient在新的API中已经被删除。这些区别,都可以在以上的程序中看出。
此外,新的API同时支持“推”和“拉”式的迭代方式。在以往的操作中,<key, value>对是被推入到Map中的,但是在新的API中,允许程序将数据拉入Map中,Reduce也一样。这样做更加方便程序分批处理数据。