4.8.3 链接MapReduce Job
在日常的数据处理过程中,常常会碰到有些问题不是一个MapReduce作业就能解决的,这时就需要在工作流中安排多个MapReduce作业,让它们配合起来自动完成一些复杂任务,而不需要用户手动启动每一个作业。那么怎样将MapReduce Job链接起来呢?应该怎么管理呢?下面来介绍如何链接MapReduce Job和如何配置MapReduce Job流。
1.线性MapReduce Job流
MapReduce Job也是一个程序,作为程序就是将输入经过处理再输出。所以在处理复杂问题的时候,如果一个Job不能完成,最简单的办法就是设置多个有一定顺序的Job,每个Job以前一个Job的输出作为输入,经过处理,将数据再输出到下一个Job中。这样Job流就能按照预定的代码处理数据,达到预期的目的。这种办法的具体实现非常简单:将每个Job的启动代码设置成只有上一个Job结束之后才执行,然后将Job的输入设置成上一个Job的输出路径。
2.复杂MapReduce Job流
第一种方法非常直观简单,但是在某些复杂任务下它仍然不能满足需求。一种情况是处理过程中数据流并不是简单的线性流,如Job3需要将Job1和Job2的输出结果组合起来进行处理。在这种情况下Job3的启动依赖于Job1和Job2的完成,但是Job1和Job2之间并没有关系。针对这种复杂情况,MapReduce框架提供了让用户将Job组织成复杂Job流的API—ControlledJob类和JobControl类(这两个类属于org.apache.hadoop.mapreduce.lib.jobcontrol包)。具体做法是:先按照正常情况配置各个Job,配置完成后再将各个Job封装到对应的ControlledJob对象中,然后使用ControlledJob的addDependingJob()设置依赖关系,接着再实例化一个JobControl对象,并使用addJob()方法将所有的Job注入JobControl对象中,最后使用JobControl对象的run方法启动Job流。
3.Job设置预处理和后处理过程
对于前面已经介绍的复杂任务的例子,使用前面的两种方法能很好地解决。现在假设另一种情况,在Job处理前和处理后需要做一些简单地处理,这种情况使用第一种方法仍能解决,但是如果针对这些简单的处理设置新的Job来处理稍显笨拙,这里涉及第三种情况,通过在Job前或后链接Map过程来解决预处理和后处理。比如,在一般统计词频的Job中,并不会统计那些无意义的单词(a、an和the等),这就需要在正式的Job前链接一个Map过程过滤掉这些无意义的单词。这种方法具体是通过MapReduce中org.apache.hadoop.mapred.lib包下的ChainMapper和ChainReducer两个静态类来实现的,这种方法最终形成的是一个独立的Job,而不是Job流,并且只有针对Job的输入输出流,各个阶段函数之间的输入输出MapReduce框架会自动组织。下面是一个具体的实现:
……
Configuration conf=new Configuration();
JobConf job=new JobConf(conf);
job.setJobName("Job");
job.setInputFormatClass(TextInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
JobConf map1Conf=new JobConf(false);
ChainMapper.addMapper(job,
Map1.class,
LongWritable.class,
Text.class,
Text.class,
Text.class,
true,
map1Conf);
JobConf map2Conf=new JobConf(false);
ChainMapper.addMapper(job,
Map2.class,
Text.class,
Text.class,
LongWritable.class,
Text.class,
true,
map2Conf);
JobConf reduceConf=new JobConf(false);
ChainReducer.setReducer(job,
Reduce.class,
LongWritable.class,
Text.class,
Text.class,
Text.class,
true,
reduceConf);
JobConf map3Conf=new JobConf(false);
ChainReducer.addMapper(job,
Map3.class,
Text.class,
Text.class,
LongWritable.class,
Text.class,
true,
map3Conf);
JobClient.runJob(job);
在这个例子中,job对象先组织了作业全局的配置,接下来再使用ChainMapper和ChainReducer两个静态类的静态方法设置了作业的各个阶段函数。需要注意的是,ChainMapper和ChainReducer到目前为止只支持旧API,即Map和Reduce必须是实现org.apache.hadoop.mapred.Mapper接口的静态类(详细的示例程序请查看附录D)。