12.9.3 HBase与MapReduce

从图12-1中可以看出,在伪分布模式和完全分布模式下HBase是架构在HDFS之上的。因此完全可以将MapReduce编程框架和HBase结合起来使用。也就是说,将HBase作为底层“存储结构”,MapReduce调用HBase进行特殊的处理,这样能够充分结合HBase分布式大型数据库和MapReduce并行计算的优点。

下面我们给出了一个WordCount将MapReduce与HBase结合起来使用的例子,如代码清单12-3所示。在这个例子中,输入文件为user/hadoop/input/file01(它包含内容hello world bye world)和文件user/hadoop/input/file02(它包含内容hello hadoop bye hadoop)。

程序首先从文件中收集数据,在shuffle完成之后进行统计并计算,最后将计算结果存储到HBase中。

代码清单12-3 HBase与WordCount的结合使用


1 package cn.edn.ruc.cloudcomputing.book.chapter12;

2

3 import java.io.IOException;

4

5 import org.apache.hadoop.conf.Configuration;

6 import org.apache.hadoop.fs.Path;

7 import org.apache.hadoop.hbase.HBaseConfiguration;

8 import org.apache.hadoop.hbase.HColumnDescriptor;

9 import org.apache.hadoop.hbase.HTableDescriptor;

10 import org.apache.hadoop.hbase.client.HBaseAdmin;

11 import org.apache.hadoop.hbase.client.Put;

12 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;

13 import org.apache.hadoop.hbase.mapreduce.TableReducer;

14 import org.apache.hadoop.hbase.util.Bytes;

15 import org.apache.hadoop.io.IntWritable;

16 import org.apache.hadoop.io.LongWritable;

17 import org.apache.hadoop.io.NullWritable;

18 import org.apache.hadoop.io.Text;

19 import org.apache.hadoop.mapreduce.Job;

20 import org.apache.hadoop.mapreduce.Mapper;

21 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

22 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

23

24 public class WordCountHBase

25{

26 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>{

27 private IntWritable i=new IntWritable(1);

28 public void map(LongWritable key, Text value, Context context)throws

IOException, InterruptedException{

29 String s[]=value.toString().trim().split("");//将输入的每

行输入以空格分开

30 for(String m:s){

31 context.write(new Text(m),i);

32}

33}

34}

35

36 public static class Reduce extends TableReducer<Text, IntWritable,

NullWritable>{

37 public void reduce(Text key, Iterable<IntWritable>values, Context

context)throws IOException, InterruptedException{

38 int sum=0;

39 for(IntWritable i:values){

40 sum+=i.get();

41}

42 Put put=new Put(Bytes.toBytes(key.toString()));//Put实例

化,每一个词存一行

43 put.add(Bytes.toBytes("content"),Bytes.toBytes("count"),Bytes.

toBytes(String.valueOf(sum)));//列族为content,列修饰符为count,列

值为数目

44 context.write(NullWritable.get(),put);

45}

46}

47

48 public static void createHBaseTable(String tablename)throws IOException{

49 HTableDescriptor htd=new HTableDescriptor(tablename);

50 HColumnDescriptor col=new HColumnDescriptor("content:");

51 htd.addFamily(col);

52 HBaseConfiguration config=new HBaseConfiguration();

53 HBaseAdmin admin=new HBaseAdmin(config);

54 if(admin.tableExists(tablename)){

55 System.out.println("table exists, trying recreate table!");

56 admin.disableTable(tablename);

57 admin.deleteTable(tablename);

58}

59 System.out.println("create new table:"+tablename);

60 admin.createTable(htd);

61}

62

63 public static void main(String args[])throws Exception{

64 String tablename="wordcount";

65 Configuration conf=new Configuration();

66 conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);

67 createHBaseTable(tablename);

68 String input=args[0];//设置输入值

69 Job job=new Job(conf,"WordCount table with"+input);

70 job.setJarByClass(WordCountHBase.class);

71 job.setNumReduceTasks(3);

72 job.setMapperClass(Map.class);

73 job.setReducerClass(Reduce.class);

74 job.setMapOutputKeyClass(Text.class);

75 job.setMapOutputValueClass(IntWritable.class);

76 job.setInputFormatClass(TextInputFormat.class);

77 job.setOutputFormatClass(TableOutputFormat.class);

78 FileInputFormat.addInputPath(job, new Path(input));

79 System.exit(job.waitForCompletion(true)?0:1);

80}

81}


在上述程序中,第26~34行代码负责设置Map作业;第36~46行代码负责设置Reduce作业;第48~61行代码为createHBaseTable函数,负责在HBase中创建存储WordCount输出结果的表。在Reduce作业中,第42~44行代码负责将结果存储到HBase表中。

程序运行成功后,现在通过HBase Shell检查输出结果,如图12-16所示。

12.9.3 HBase与MapReduce - 图1

图 12-16 HBase WordCount的运行结果

从输出结果中可以看出,bye、hadoop、hello、world四个单词均出现了两次。

关于HBase与MapReduce实际应用的更多详细信息请参阅http://wiki.apache.org/hadoop/Hbase/MapReduce。