12.9.3 HBase与MapReduce
从图12-1中可以看出,在伪分布模式和完全分布模式下HBase是架构在HDFS之上的。因此完全可以将MapReduce编程框架和HBase结合起来使用。也就是说,将HBase作为底层“存储结构”,MapReduce调用HBase进行特殊的处理,这样能够充分结合HBase分布式大型数据库和MapReduce并行计算的优点。
下面我们给出了一个WordCount将MapReduce与HBase结合起来使用的例子,如代码清单12-3所示。在这个例子中,输入文件为user/hadoop/input/file01(它包含内容hello world bye world)和文件user/hadoop/input/file02(它包含内容hello hadoop bye hadoop)。
程序首先从文件中收集数据,在shuffle完成之后进行统计并计算,最后将计算结果存储到HBase中。
代码清单12-3 HBase与WordCount的结合使用
1 package cn.edn.ruc.cloudcomputing.book.chapter12;
2
3 import java.io.IOException;
4
5 import org.apache.hadoop.conf.Configuration;
6 import org.apache.hadoop.fs.Path;
7 import org.apache.hadoop.hbase.HBaseConfiguration;
8 import org.apache.hadoop.hbase.HColumnDescriptor;
9 import org.apache.hadoop.hbase.HTableDescriptor;
10 import org.apache.hadoop.hbase.client.HBaseAdmin;
11 import org.apache.hadoop.hbase.client.Put;
12 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
13 import org.apache.hadoop.hbase.mapreduce.TableReducer;
14 import org.apache.hadoop.hbase.util.Bytes;
15 import org.apache.hadoop.io.IntWritable;
16 import org.apache.hadoop.io.LongWritable;
17 import org.apache.hadoop.io.NullWritable;
18 import org.apache.hadoop.io.Text;
19 import org.apache.hadoop.mapreduce.Job;
20 import org.apache.hadoop.mapreduce.Mapper;
21 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
22 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
23
24 public class WordCountHBase
25{
26 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>{
27 private IntWritable i=new IntWritable(1);
28 public void map(LongWritable key, Text value, Context context)throws
IOException, InterruptedException{
29 String s[]=value.toString().trim().split("");//将输入的每
行输入以空格分开
30 for(String m:s){
31 context.write(new Text(m),i);
32}
33}
34}
35
36 public static class Reduce extends TableReducer<Text, IntWritable,
NullWritable>{
37 public void reduce(Text key, Iterable<IntWritable>values, Context
context)throws IOException, InterruptedException{
38 int sum=0;
39 for(IntWritable i:values){
40 sum+=i.get();
41}
42 Put put=new Put(Bytes.toBytes(key.toString()));//Put实例
化,每一个词存一行
43 put.add(Bytes.toBytes("content"),Bytes.toBytes("count"),Bytes.
toBytes(String.valueOf(sum)));//列族为content,列修饰符为count,列
值为数目
44 context.write(NullWritable.get(),put);
45}
46}
47
48 public static void createHBaseTable(String tablename)throws IOException{
49 HTableDescriptor htd=new HTableDescriptor(tablename);
50 HColumnDescriptor col=new HColumnDescriptor("content:");
51 htd.addFamily(col);
52 HBaseConfiguration config=new HBaseConfiguration();
53 HBaseAdmin admin=new HBaseAdmin(config);
54 if(admin.tableExists(tablename)){
55 System.out.println("table exists, trying recreate table!");
56 admin.disableTable(tablename);
57 admin.deleteTable(tablename);
58}
59 System.out.println("create new table:"+tablename);
60 admin.createTable(htd);
61}
62
63 public static void main(String args[])throws Exception{
64 String tablename="wordcount";
65 Configuration conf=new Configuration();
66 conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
67 createHBaseTable(tablename);
68 String input=args[0];//设置输入值
69 Job job=new Job(conf,"WordCount table with"+input);
70 job.setJarByClass(WordCountHBase.class);
71 job.setNumReduceTasks(3);
72 job.setMapperClass(Map.class);
73 job.setReducerClass(Reduce.class);
74 job.setMapOutputKeyClass(Text.class);
75 job.setMapOutputValueClass(IntWritable.class);
76 job.setInputFormatClass(TextInputFormat.class);
77 job.setOutputFormatClass(TableOutputFormat.class);
78 FileInputFormat.addInputPath(job, new Path(input));
79 System.exit(job.waitForCompletion(true)?0:1);
80}
81}
在上述程序中,第26~34行代码负责设置Map作业;第36~46行代码负责设置Reduce作业;第48~61行代码为createHBaseTable函数,负责在HBase中创建存储WordCount输出结果的表。在Reduce作业中,第42~44行代码负责将结果存储到HBase表中。
程序运行成功后,现在通过HBase Shell检查输出结果,如图12-16所示。
图 12-16 HBase WordCount的运行结果
从输出结果中可以看出,bye、hadoop、hello、world四个单词均出现了两次。
关于HBase与MapReduce实际应用的更多详细信息请参阅http://wiki.apache.org/hadoop/Hbase/MapReduce。