C.2 Detailed Code
package cn.edu.ruc.cloudcomputing.book;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
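// AdvancedWordCount: a WordCount variant that uses the DistributedCache to ship a
// stop-word list (the KeyWord file) to every node and filters those words out in the map phase.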
public class AdvancedWordCount {

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private HashSet<String> keyWord;
    private Path[] localFiles;
    // This method runs once as soon as each Map Task starts. Because the new API
    // (org.apache.hadoop.mapreduce.Mapper) is used here, the method is named setup
    // rather than configure as in the old API; see the API docs if in doubt.
    public void setup(Context context) throws IOException, InterruptedException {
      keyWord = new HashSet<String>();
      Configuration conf = context.getConfiguration();
      localFiles = DistributedCache.getLocalCacheFiles(conf);
      // Read the contents of the cache files into a variable shared across this Map Task
      for (int i = 0; i < localFiles.length; i++) {
        String aKeyWord;
        BufferedReader br = new BufferedReader(new FileReader(localFiles[i].toString()));
        while ((aKeyWord = br.readLine()) != null) {
          keyWord.add(aKeyWord);
        }
        br.close();
      }
    }
    // Filter the input stream using the stop words stored in the cache files
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        String aword = itr.nextToken();
        if (keyWord.contains(aword))
          continue;
        word.set(aword);
        context.write(word, one);
      }
    }
  }
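  // Sums the counts emitted for each word; registered below both as the combiner
  // and as the reducer, which is safe because summation is associative and commutative.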
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Register the file on HDFS as a cache file for the current job
    DistributedCache.addCacheFile(
        new URI("hdfs://localhost:9000/user/ubuntu/cachefile/KeyWord#KeyWord"), conf);
    Job job = new Job(conf, "advanced word count");
    job.setJarByClass(AdvancedWordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path("input"));
    FileOutputFormat.setOutputPath(job, new Path("output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
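On newer Hadoop releases (2.x and later) the org.apache.hadoop.filecache.DistributedCache class used above is deprecated, and the same cache-file setup goes through the Job and Context APIs instead. The fragment below is only a minimal sketch of that variant; it assumes the same HDFS URI as the listing and relies on the "#KeyWord" fragment, which exposes the cached file as a symlink named KeyWord in each task's working directory.

// Driver side: register the cache file on the Job rather than on the Configuration
Job job = Job.getInstance(conf, "advanced word count");
job.addCacheFile(new URI("hdfs://localhost:9000/user/ubuntu/cachefile/KeyWord#KeyWord"));

// Mapper side, inside setup(): read the symlinked copy from the task's working directory
BufferedReader br = new BufferedReader(new FileReader("KeyWord"));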