4.5 运行MapReduce程序

想要测试人体的健康状况,要先知道人体各个组织的健康状况,然后再综合评价人体的健康状况。假设每个组织的健康指标是一个0~100之间的数字,得到综合身体健康状况的方法是计算所有组织健康指标的平均数。由于测试的人数众多,因此存储数据的格式为:姓名+得分+#(代表一个人单个人体组织的健康状况),每个组织的健康状况分别用一个文件存储。现在一共有1000个组织参与了评估,即用1000个文件分别存储。

由于此例中对数据的处理与前面对学生成绩进行的简单处理有一些区别,下面先将程序的主要部分列举出来。

Mapper部分的代码如下:


public static class Map

extends Mapper<LongWritable, Text, Text, IntWritable>{

public void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException{

String line=value.toString();

//以“#”为分隔符,将输入的文件分割成单个记录

StringTokenizer tokenizerArticle=new StringTokenizer(line,"#");

//对每个记录进行处理

while(tokenizerArticle.hasMoreTokens()){

//将每个记录分成姓名和分数两个部分

StringTokenizer tokenizerLine=new StringTokenizer(tokenizerArticle.nextToken());

while(tokenizerLine.hasMoreTokens()){

String strName=tokenizerLine.nextToken();

if(tokenizerLine.hasMoreTokens()){

String strScore=tokenizerLine.nextToken();

Text name=new Text(strName);//姓名

int scoreInt=Integer.parseInt(strScore);//该组织的状况得分

context.write(name, new IntWritable(scoreInt));

}

}

}

}


上述程序比较简单,和单节点上的代码也很相似,配合注释就能够很好地理解,因此就不再多讲解了。

下面是Reducer部分的代码:


public static class Reduce

extends Reducer<Text, IntWritable, Text, IntWritable>{

public void reduce(Text key, Iterable<IntWritable>values,

Context context)throws IOException, InterruptedException{

int sum=0;

int count=0;

Iterator<IntWritable>iterator=values.iterator();

while(iterator.hasNext()){

sum+=iterator.next().get();

count++;

}

int average=(int)sum/count;

context.write(key, new IntWritable(average));

}

}


4.5.1 打包

为了能够在命令行中运行程序,首先需要对它进行编译和打包,下面就分别展示编译和打包的过程。

编译代码如下:


Javac-classpath/usr/local/hadoop/hadoop-1.0.1/hadoop-core-1.0.1.jar-d

ScoreProcessFinal_classes ScoreProcessFinal.java


上述命令会将ScoreProcessFinal.java编译后的所有class文件放到ScoreProcessFinal_classes文件夹下。执行下面的命令打包所有的class文件:


jar-cvf/usr/local/hadoop/hadoop-1.0.1/bin/ScoreProcessFinal.jar-C ScoreProcessFinal_classes/.

标明清单(manifest)

增加:ScoreProcessFinal$Map.class(读入=1899)(写出=806)(压缩了57%)

增加:ScoreProcessFinal$Reduce.class(读入=1671)(写出=707)(压缩了57%)

增加:ScoreProcessFinal.class(读入=2374)(写出=1183)(压缩了50%)