6.1.2 提交作业
一个MapReduce作业在提交到Hadoop上之后,会进入完全地自动化执行过程。在这个过程中,用户除了监控程序的执行情况和强制中止作业之外,不能对作业的执行过程进行任何干预。所以在作业提交之前,用户需要将所有应该配置的参数按照自己的需求配置完毕。需要配置的主要内容有:
程序代码:这里主要是指Map和Reduce函数的具体代码,这是一个MapReduce作业对应的程序必不可少的部分,并且这部分代码的逻辑正确与否与运行结果直接相关。
Map和Reduce接口的配置:在MapReduce中,Map接口需要派生自Mapper<k1,v1,k2,v2>接口,Reduce接口则要派生自Reducer<k2,v2,k3,v3>。它们都对应唯一一个方法,分别是Map函数和Reduce函数,也就是在上一点中所写的代码。在调用这两个方法时需要配置它们的四个参数,分别是输入key的数据类型、输入value的数据类型、输出key-value对的数据类型和context实例,其中输入输出的数据类型要与继承时所设置的数据类型相同。还有一个要求是Map接口的输出key-value类型和Reduce接口的输入key-value类型要对应,因为Map输出组合value之后,它们会成为Reduce的输入内容(初学者请特别注意,很多初学者编写的MapReduce程序中会忽视这个问题)。
输入输出路径:作业提交之前,还需要在主函数中配置MapReduce作业在Hadoop集群上的输入路径和输出路径(必须保证输出路径不存在,如果存在程序会报错,这也是初学者经常忽视的错误)。具体的代码是:
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
其他类型设置,比如调用runJob方法:先要在主函数中配置如Output的key和value类型、作业名称、InputFormat和OutputFormat等,最后再调用JobClient的runJob方法。
配置完作业的所有内容并确认无误之后就可以运行作业了,也就是执行图6-1中的步骤①(具体提交方法不再赘述,请参考本书的第5章)。
用户程序调用JobClient的runJob方法,在提交JobConf对象之后,runJob方法会先行调用JobSubmissionProtocol接口所定义的submitJob方法,并将作业提交给JobTracker。紧接着,runJob不断循环,并在循环中调用JobSubmissionProtocol的getTaskCompletionEvents方法,获取TaskCompletionEvent类的对象实例,了解作业的实时执行情况。如果发现作业运行状态有更新,就将状态报告给JobTracker。作业完成后,如果成功则显示作业计数器,否则,将导致作业失败的错误记录到控制台。
从上面介绍的作业提交的过程可以看出,最关键的是JobClient对象中submitJobInternal(final JobConf job)方法的调用执行(submitJob()方法调用此方法真正执行Job),那么submitJobInternal方法具体是怎么做的?下面从submitJobInternal的代码出发介绍作业提交的详细过程(只列举关键代码)。
public RunningJob submitJob(JobConf job)throws FileNotFoundException,
ClassNotFoundException, InvalidJobConfException, IOException{
……
//从JobTracker得到当前任务的ID
JobID jobId=jobSubmitClient.getNewJobId();
//获取HDFS路径:
Path submitJobDir=new Path(jobStagingArea, jobId.toString());
jobCopy.set("mapreduce.job.dir",submitJobDir.toString());
//获取路径令牌
TokenCache.obtainTokensForNameNodes(jobCopy.getCredentials(),new Path[]
{submitJobDir},jobCopy);
//为作业生成splits
FileSystem fs=submitJobDir.getFileSystem(jobCopy);
LOG.debug("Creating splits at"+fs.makeQualified(submitJobDir));
int maps=writeSplits(context, submitJobDir);
jobCopy.setNumMapTasks(maps);
//将Job的配置信息写入JobTracker的作业缓存文件中
FSDataOutputStream out=FileSystem.create(fs, submitSplitFile, new
FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
try{
jobCopy.writeXml(out);
}finally{
out.close();
}
//真正地调用JobTracker来提交任务
JobStatus status=jobSubmitClient.submitJob(jobId, submitJobDir.toString(),
jobCopy.getCredentials());
……
}
从上面的代码可以看出,整个提交过程包含以下步骤:
1)通过调用JobTracker对象的getNewJobId()方法从JobTracker处获取当前作业的ID号(见图6-1中的步骤②)。
2)检查作业相关路径。在代码中获取各个路径信息时会对作业的对应路径进行检查。比如,如果没有指定输出目录或它已经存在,作业就不会被提交,并且会给MapReduce程序返回错误信息;再比如输入目录不存在或没有对应令牌也会返回错误等。
3)计算作业的输入划分,并将划分信息写入Job.split文件,如果写入失败就会返回错误。split文件的信息主要包括:split文件头、split文件版本号、split的个数。这些信息中每一条都会包括以下内容:split类型名(默认FileSplit)、split的大小、split的内容(对于FileSplit来说是写入的文件名,此split在文件中的起始位置上)、split的location信息(即在哪个DataNode上)。
4)将运行作业所需要的资源—包括作业JAR文件、配置文件和计算所得的输入划分等—复制到作业对应的HDFS上(见图6-1的步骤③)。
5)调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行(见图6-1的步骤④)。