C.2 Detailed Code


package cn.edu.ruc.cloudcomputing.book;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AdvancedWordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private HashSet<String> keyWord;
        private Path[] localFiles;

        // Runs once, right after each Map Task starts. Because the new API
        // (org.apache.hadoop.mapreduce.Mapper) is used here, this method is named
        // setup rather than configure as in the old API; see the API docs for details.
        public void setup(Context context
                          ) throws IOException, InterruptedException {
            keyWord = new HashSet<String>();
            Configuration conf = context.getConfiguration();
            localFiles = DistributedCache.getLocalCacheFiles(conf);
            // Read the contents of the cache files into this Map Task's keyWord set.
            for (int i = 0; i < localFiles.length; i++) {
                String aKeyWord;
                BufferedReader br = new BufferedReader(
                        new FileReader(localFiles[i].toString()));
                while ((aKeyWord = br.readLine()) != null) {
                    keyWord.add(aKeyWord);
                }
                br.close();
            }
        }

        // Filter the input stream against the stop words stored in the cache files.
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String aword = itr.nextToken();
                // Skip any token that appears in the cached stop-word list.
                if (keyWord.contains(aword))
                    continue;
                word.set(aword);
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Register the file on HDFS as a cache file for this job.
        DistributedCache.addCacheFile(new URI(
                "hdfs://localhost:9000/user/ubuntu/cachefile/KeyWord#KeyWord"), conf);
        Job job = new Job(conf, "advanced word count");
        job.setJarByClass(AdvancedWordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
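
Note that the org.apache.hadoop.filecache.DistributedCache class used above is deprecated in Hadoop 2.x and later. As a minimal sketch (assuming Hadoop 2.x and the same KeyWord cache file and "#KeyWord" symlink as above), the cache-related lines could be replaced roughly as follows, with the rest of the program unchanged:

    // In main(): create the job first, then attach the cache file to the Job itself.
    Job job = Job.getInstance(conf, "advanced word count");
    job.addCacheFile(new URI("hdfs://localhost:9000/user/ubuntu/cachefile/KeyWord#KeyWord"));

    // In TokenizerMapper.setup(): fetch the cached file URIs from the task context.
    URI[] cacheFiles = context.getCacheFiles();
    // Because the URI ends in "#KeyWord", the framework creates a symlink named
    // KeyWord in the task's working directory, so the file can also be opened
    // locally as new FileReader("KeyWord").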