5.5.3 程序代码

程序代码如下:


package cn.edu.ruc.cloudcomputing.book.chapter05;

import java.io.IOException;

import java.util.*;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class MTjoin{

public static int time=0;

public static class Map extends Mapper<Object, Text, Text, Text>{

//在Map中先区分输入行属于左表还是右表,然后对两列值进行分割,

//连接列保存在key值,剩余列和左右表标志保存在value中,最后输出

public void map(Object key, Text value, Context context)throws

IOException, InterruptedException{

String line=value.toString();

int i=0;

//输入文件首行,不处理

if(line.contains("factoryname")==true||line.contains("addressID")==true){

return;

}

//找出数据中的分割点

while(line.charAt(i)>='9'||line.charAt(i)<='0'){

i++;

}

if(line.charAt(0)>='9'||line.charAt(0)<='0'){

//左表

int j=i-1;

while(line.charAt(j)!='')j—;

String[]values={line.substring(0,j),line.substring(i)};

context.write(new Text(values[1]),new Text("1+"+values[0]));

}

else{//右表

int j=i+1;

while(line.charAt(j)!='')j++;

String[]values={line.substring(0,i+1),line.substring(j)};

context.write(new Text(values[0]),new Text("2+"+values[1]));

}

}

}

public static class Reduce extends Reducer<Text, Text, Text, Text>{

//Reduce解析Map输出,将value中数据按照左右表分别保存,然后求//笛卡儿积,输出

public void reduce(Text key, Iterable<Text>values, Context context)throws

IOException, InterruptedException{

if(time==0){//输出文件第一行

context.write(new Text("factoryname"),new Text("addressname"));

time++;

}

int factorynum=0;

String factory[]=new String[10];

int addressnum=0;

String address[]=new String[10];

Iterator ite=values.iterator();

while(ite.hasNext())

{

String record=ite.next().toString();

int len=record.length();

int i=2;

char type=record.charAt(0);

String factoryname=new String();

String addressname=new String();

if(type=='1'){//左表

factory[factorynum]=record.substring(2);

factorynum++;

}

else{//右表

address[addressnum]=record.substring(2);

addressnum++;

}

}

if(factorynum!=0&&addressnum!=0){//求笛卡儿积

for(int m=0;m<factorynum;m++){

for(int n=0;n<addressnum;n++){

c o n t e x t.w r i t e(n e w T e x t(f a c t o r y[m]),n e w

Text(address[n]));

}

}

}

}

}

/**
 * Configures and runs the multiple-table join job.
 *
 * <p>After the generic Hadoop options have been parsed out, exactly two
 * arguments must remain: the input path and the output path. Otherwise a
 * usage message is printed to standard error and the JVM exits with status 2.
 * The JVM exits with status 0 if the job completes successfully and 1 if it
 * does not.
 *
 * @param args command-line arguments: generic Hadoop options followed by
 *             the input path and the output path
 * @throws Exception if job setup or execution fails
 */
public static void main(String[]args)throws Exception{
    Configuration conf=new Configuration();
    String[]otherArgs=new GenericOptionsParser(conf, args).getRemainingArgs();
    if(otherArgs.length!=2){
        // Name this program in the usage line (the text was copied from WordCount).
        System.err.println("Usage: MTjoin <in> <out>");
        System.exit(2);
    }

    Job job=new Job(conf,"multiple table join");
    job.setJarByClass(MTjoin.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    System.exit(job.waitForCompletion(true)?0:1);
}

}