5.4.3 程序代码

程序代码如下：


package cn.edu.ruc.cloudcomputing.book.chapter05;

import java.io.IOException;

import java.util.*;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class STjoin{

public static int time=0;

//Map将输入分割成child和parent,然后正序输出一次作为右表,反序输出一次作为左表,需要注意

的是在输出的value中必须加上左右表区别标志

public static class Map extends Mapper<Object, Text, Text, Text>{

public void map(Object key, Text value, Context context)throws

IOException, InterruptedException{

String childname=new String();

String parentname=new String();

String relationtype=new String();

String line=value.toString();

int i=0;

while(line.charAt(i)!=''){

i++;

}

String[]values={line.substring(0,i),line.substring(i+1)};

if(values[0].compareTo("child")!=0)

{

childname=values[0];

parentname=values[1];

relationtype="1";//左右表区分标志

context.write(new Text(values[1]),new Text(relationtype

+"+"+childname+"+"+parentname));

//左表

relationtype="2";

context.write(new Text(values[0]),new Text(relationtype

+"+"+childname+"+"+parentname));

//右表

}

}

}

public static class Reduce extends Reducer<Text, Text, Text, Text>{

public void reduce(Text key, Iterable<Text>values, Context context)throws

IOException, InterruptedException{

if(time==0){//输出表头

context.write(new Text("grandchild"),new Text("grandparent"));

time++;

}

int grandchildnum=0;

String grandchild[]=new String[10];

int grandparentnum=0;

String grandparent[]=new String[10];

Iterator ite=values.iterator();

while(ite.hasNext())

{

String record=ite.next().toString();

int len=record.length();

int i=2;

if(len==0)continue;

char relationtype=record.charAt(0);

String childname=new String();

String parentname=new String();

//获取value-list中value的child

while(record.charAt(i)!='+')

{

childname=childname+record.charAt(i);

i++;

}

i=i+1;

//获取value-list中value的parent

while(i<len)

{

parentname=parentname+record.charAt(i);

i++;

}

//左表,取出child放入grandchild

if(relationtype=='1'){

grandchild[grandchildnum]=childname;

grandchildnum++;

}

else{//右表,取出parent放入grandparent

grandparent[grandparentnum]=parentname;

grandparentnum++;

}

}

//grandchild和grandparent数组求笛卡儿积

if(grandparentnum!=0&&grandchildnum!=0){

for(int m=0;m<grandchildnum;m++){

for(int n=0;n<grandparentnum;n++){

context.write(new Text(grandchild[m]),new Text(grandparent[n]));

//输出结果

}

}

}

}

}

public static void main(String[]args)throws Exception{

Configuration conf=new Configuration();

String[]otherArgs=new GenericOptionsParser(conf, args).getRemainingArgs();

if(otherArgs.length!=2){

System.err.println("Usage:wordcount<in><out>");

System.exit(2);

}

Job job=new Job(conf,"single table join");

job.setJarByClass(STjoin.class);

job.setMapperClass(Map.class);

job.setReducerClass(Reduce.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

System.exit(job.waitForCompletion(true)?0:1);

}

}