D.2 Detailed Code
package cn.edu.ruc.cloudcomputing.book;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
public class ChainWordCount {
    public static class FilterMapper extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {
        private final static String[] StopWord =
                {"a", "an", "the", "of", "in", "and", "to", "at", "with", "as", "for"};
        private HashSet<String> StopWordSet;

        // This method implements the configuration hook of the Mapper interface and
        // runs as soon as each Map Task starts. Because the old API
        // (org.apache.hadoop.mapred.Mapper) is used here, it is named configure rather
        // than setup as in the new API; the old API is used because ChainMapper and
        // ChainReducer do not support the new Mapper API (see the API documentation).
        public void configure(JobConf job) {
            StopWordSet = new HashSet<String>();
            for (int i = 0; i < StopWord.length; i++) {
                StopWordSet.add(StopWord[i]);
            }
        }
        // Filter meaningless (stop) words out of the input stream
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> collector, Reporter reporter)
                throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String aword = itr.nextToken();
                if (StopWordSet.contains(aword))
                    continue;
                collector.collect(new Text(aword), new Text(""));
            }
        }
    }
    // The second Mapper: emits a count of 1 for every word that passed the filter
    public static class TokenizerMapper extends MapReduceBase implements
            Mapper<Text, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        public void map(Text key, Text value,
                OutputCollector<Text, IntWritable> collector, Reporter reporter)
                throws IOException {
            collector.collect(key, one);
        }
    }
    // The only Reducer: sums up the occurrence counts of each word
    public static class IntSumReducer extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> collector, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            collector.collect(key, result);
        }
    }
    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(ChainWordCount.class);
        job.setJobName("Chain Map Reduce");
        job.setJarByClass(ChainWordCount.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        // Add the first Mapper, which filters out stop words, to the job chain
        JobConf map1Conf = new JobConf(false);
        ChainMapper.addMapper(job,
                FilterMapper.class,
                LongWritable.class,
                Text.class,
                Text.class,
                Text.class,
                true,
                map1Conf);
        // Add the second Mapper, which emits a count of 1 for each word occurrence, to the job chain
        JobConf map2Conf = new JobConf(false);
        ChainMapper.addMapper(job,
                TokenizerMapper.class,
                Text.class,
                Text.class,
                Text.class,
                IntWritable.class,
                false,
                map2Conf);
        // Set the Reducer that merges the per-word counts as the only Reducer of the job chain
        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(job,
                IntSumReducer.class,
                Text.class,
                IntWritable.class,
                Text.class,
                IntWritable.class,
                false,
                reduceConf);
        FileInputFormat.addInputPath(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output"));
        JobClient.runJob(job);
    }
}
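The comment in FilterMapper.configure notes that the old org.apache.hadoop.mapred API is used because ChainMapper and ChainReducer did not support the new Mapper API at the time. As a side note, later Hadoop releases (2.x and above) ship new-API chain classes in org.apache.hadoop.mapreduce.lib.chain. The listing below is only a rough sketch of how the same three-stage job might look against that package, assuming a Hadoop 2.x classpath; the class name NewApiChainWordCount is illustrative, and the signatures should be checked against the Hadoop version actually in use.
package cn.edu.ruc.cloudcomputing.book;
import java.io.IOException;
import java.util.HashSet;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewApiChainWordCount {
    // First stage: drop stop words and emit (word, "")
    public static class FilterMapper extends Mapper<LongWritable, Text, Text, Text> {
        private static final String[] STOP_WORDS =
                {"a", "an", "the", "of", "in", "and", "to", "at", "with", "as", "for"};
        private final HashSet<String> stopWordSet = new HashSet<String>();
        @Override
        protected void setup(Context context) {
            for (String w : STOP_WORDS) {
                stopWordSet.add(w);
            }
        }
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String word = itr.nextToken();
                if (!stopWordSet.contains(word)) {
                    context.write(new Text(word), new Text(""));
                }
            }
        }
    }
    // Second stage: emit (word, 1) for every surviving word
    public static class TokenizerMapper extends Mapper<Text, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, ONE);
        }
    }
    // Final stage: sum the counts of each word
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "Chain Map Reduce (new API)");
        job.setJarByClass(NewApiChainWordCount.class);
        // Shuffle and final output types are set explicitly here to be safe
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        ChainMapper.addMapper(job, FilterMapper.class, LongWritable.class, Text.class,
                Text.class, Text.class, new Configuration(false));
        ChainMapper.addMapper(job, TokenizerMapper.class, Text.class, Text.class,
                Text.class, IntWritable.class, new Configuration(false));
        ChainReducer.setReducer(job, IntSumReducer.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, new Configuration(false));
        FileInputFormat.addInputPath(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Note that in this sketch each chain stage receives its own per-stage Configuration object, and the new-API calls take a Job instead of a JobConf.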