D.2 Detailed Code


package cn.edu.ruc.cloudcomputing.book;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

public class ChainWordCount {

    public static class FilterMapper extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {

        private final static String[] StopWord =
                {"a", "an", "the", "of", "in", "and", "to", "at", "with", "as", "for"};
        private HashSet<String> StopWordSet;

        // This method implements the Mapper interface and is called once when each
        // Map Task starts. Because the old API (org.apache.hadoop.mapred.Mapper) is
        // used here, the method is named configure rather than setup as in the new
        // API; the old API is used because ChainMapper and ChainReducer do not
        // support the new Mapper API. See the API documentation for details.
        public void configure(JobConf job) {
            StopWordSet = new HashSet<String>();
            for (int i = 0; i < StopWord.length; i++) {
                StopWordSet.add(StopWord[i]);
            }
        }

        // Filter meaningless (stop) words out of the input stream
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> collector, Reporter reporter)
                throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String aword = itr.nextToken();
                if (StopWordSet.contains(aword))
                    continue;
                collector.collect(new Text(aword), new Text(""));
            }
        }
    }

    // Emit a count of 1 for each word passed on by FilterMapper
    public static class TokenizerMapper extends MapReduceBase implements
            Mapper<Text, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);

        public void map(Text key, Text value,
                OutputCollector<Text, IntWritable> collector, Reporter reporter)
                throws IOException {
            collector.collect(key, one);
        }
    }

    // Sum the counts emitted for each word
    public static class IntSumReducer extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> collector, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            collector.collect(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(ChainWordCount.class);
        job.setJobName("Chain Map Reduce");
        job.setJarByClass(ChainWordCount.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        // Add the first Map, which filters stop words, to the job chain
        JobConf map1Conf = new JobConf(false);
        ChainMapper.addMapper(job,
                FilterMapper.class,
                LongWritable.class,
                Text.class,
                Text.class,
                Text.class,
                true,
                map1Conf);

        // Add the second Map, which emits a count of 1 per word, to the job chain
        JobConf map2Conf = new JobConf(false);
        ChainMapper.addMapper(job,
                TokenizerMapper.class,
                Text.class,
                Text.class,
                Text.class,
                IntWritable.class,
                false,
                map2Conf);

        // Set the Reduce that sums the per-word counts as the only Reduce in the chain
        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(job,
                IntSumReducer.class,
                Text.class,
                IntWritable.class,
                Text.class,
                IntWritable.class,
                false,
                reduceConf);

        FileInputFormat.addInputPath(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output"));
        JobClient.runJob(job);
    }
}
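
The comment in FilterMapper.configure notes that the old org.apache.hadoop.mapred API is required here because ChainMapper and ChainReducer do not support the new Mapper API. Later Hadoop releases (2.x and above) do ship new-API chain classes under org.apache.hadoop.mapreduce.lib.chain. The listing below is a minimal sketch of the same chained job written against that newer API; it assumes a Hadoop 2.x or later runtime, and the class name NewApiChainWordCount (with its nested classes) is illustrative rather than part of the book's example.

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch only: a new-API (org.apache.hadoop.mapreduce) version of ChainWordCount,
// assuming Hadoop 2.x+ where the chain classes accept new-API Mapper/Reducer.
public class NewApiChainWordCount {

    // First stage of the map chain: drop stop words and emit (word, "")
    public static class FilterMapper extends Mapper<LongWritable, Text, Text, Text> {
        private static final HashSet<String> STOP_WORDS = new HashSet<String>(Arrays.asList(
                "a", "an", "the", "of", "in", "and", "to", "at", "with", "as", "for"));
        private static final Text EMPTY = new Text("");

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String aword = itr.nextToken();
                if (STOP_WORDS.contains(aword)) {
                    continue;
                }
                context.write(new Text(aword), EMPTY);
            }
        }
    }

    // Second stage of the map chain: emit (word, 1)
    public static class TokenizerMapper extends Mapper<Text, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, ONE);
        }
    }

    // Reduce: sum the counts for each word
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Chain Map Reduce (new API)");
        job.setJarByClass(NewApiChainWordCount.class);

        // The new-API chain methods take a Configuration instead of a JobConf
        // and have no byValue flag.
        ChainMapper.addMapper(job, FilterMapper.class, LongWritable.class, Text.class,
                Text.class, Text.class, new Configuration(false));
        ChainMapper.addMapper(job, TokenizerMapper.class, Text.class, Text.class,
                Text.class, IntWritable.class, new Configuration(false));
        ChainReducer.setReducer(job, IntSumReducer.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, new Configuration(false));

        // Final output types (these also match the output of the last chained Map)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Apart from the driver API, the processing logic is the same as in the old-API listing above; which variant applies depends on the Hadoop version in use.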