6.1.2 提交作业

一个MapReduce作业在提交到Hadoop上之后,会进入完全地自动化执行过程。在这个过程中,用户除了监控程序的执行情况和强制中止作业之外,不能对作业的执行过程进行任何干预。所以在作业提交之前,用户需要将所有应该配置的参数按照自己的需求配置完毕。需要配置的主要内容有:

程序代码:这里主要是指Map和Reduce函数的具体代码,这是一个MapReduce作业对应的程序必不可少的部分,并且这部分代码的逻辑正确与否与运行结果直接相关。

Map和Reduce接口的配置:在MapReduce中,Map接口需要派生自Mapper<k1,v1,k2,v2>接口,Reduce接口则要派生自Reducer<k2,v2,k3,v3>。它们都对应唯一一个方法,分别是Map函数和Reduce函数,也就是在上一点中所写的代码。在调用这两个方法时需要配置它们的四个参数,分别是输入key的数据类型、输入value的数据类型、输出key-value对的数据类型和context实例,其中输入输出的数据类型要与继承时所设置的数据类型相同。还有一个要求是Map接口的输出key-value类型和Reduce接口的输入key-value类型要对应,因为Map输出组合value之后,它们会成为Reduce的输入内容(初学者请特别注意,很多初学者编写的MapReduce程序中会忽视这个问题)。

输入输出路径:作业提交之前,还需要在主函数中配置MapReduce作业在Hadoop集群上的输入路径和输出路径(必须保证输出路径不存在,如果存在程序会报错,这也是初学者经常忽视的错误)。具体的代码是:


FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));


其他类型设置,比如调用runJob方法:先要在主函数中配置如Output的key和value类型、作业名称、InputFormat和OutputFormat等,最后再调用JobClient的runJob方法。

配置完作业的所有内容并确认无误之后就可以运行作业了,也就是执行图6-1中的步骤①(具体提交方法不再赘述,请参考本书的第5章)。

用户程序调用JobClient的runJob方法,在提交JobConf对象之后,runJob方法会先行调用JobSubmissionProtocol接口所定义的submitJob方法,并将作业提交给JobTracker。紧接着,runJob不断循环,并在循环中调用JobSubmissionProtocol的getTaskCompletionEvents方法,获取TaskCompletionEvent类的对象实例,了解作业的实时执行情况。如果发现作业运行状态有更新,就将状态报告给JobTracker。作业完成后,如果成功则显示作业计数器,否则,将导致作业失败的错误记录到控制台。

从上面介绍的作业提交的过程可以看出,最关键的是JobClient对象中submitJobInternal(final JobConf job)方法的调用执行(submitJob()方法调用此方法真正执行Job),那么submitJobInternal方法具体是怎么做的?下面从submitJobInternal的代码出发介绍作业提交的详细过程(只列举关键代码)。


public RunningJob submitJob(JobConf job)throws FileNotFoundException,

ClassNotFoundException, InvalidJobConfException, IOException{

……

//从JobTracker得到当前任务的ID

JobID jobId=jobSubmitClient.getNewJobId();

//获取HDFS路径:

Path submitJobDir=new Path(jobStagingArea, jobId.toString());

jobCopy.set("mapreduce.job.dir",submitJobDir.toString());

//获取路径令牌

TokenCache.obtainTokensForNameNodes(jobCopy.getCredentials(),new Path[]

{submitJobDir},jobCopy);

//为作业生成splits

FileSystem fs=submitJobDir.getFileSystem(jobCopy);

LOG.debug("Creating splits at"+fs.makeQualified(submitJobDir));

int maps=writeSplits(context, submitJobDir);

jobCopy.setNumMapTasks(maps);

//将Job的配置信息写入JobTracker的作业缓存文件中

FSDataOutputStream out=FileSystem.create(fs, submitSplitFile, new

FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

try{

jobCopy.writeXml(out);

}finally{

out.close();

}

//真正地调用JobTracker来提交任务

JobStatus status=jobSubmitClient.submitJob(jobId, submitJobDir.toString(),

jobCopy.getCredentials());

……

}


从上面的代码可以看出,整个提交过程包含以下步骤:

1)通过调用JobTracker对象的getNewJobId()方法从JobTracker处获取当前作业的ID号(见图6-1中的步骤②)。

2)检查作业相关路径。在代码中获取各个路径信息时会对作业的对应路径进行检查。比如,如果没有指定输出目录或它已经存在,作业就不会被提交,并且会给MapReduce程序返回错误信息;再比如输入目录不存在或没有对应令牌也会返回错误等。

3)计算作业的输入划分,并将划分信息写入Job.split文件,如果写入失败就会返回错误。split文件的信息主要包括:split文件头、split文件版本号、split的个数。这些信息中每一条都会包括以下内容:split类型名(默认FileSplit)、split的大小、split的内容(对于FileSplit来说是写入的文件名,此split在文件中的起始位置上)、split的location信息(即在哪个DataNode上)。

4)将运行作业所需要的资源—包括作业JAR文件、配置文件和计算所得的输入划分等—复制到作业对应的HDFS上(见图6-1的步骤③)。

5)调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行(见图6-1的步骤④)。