6.1.6 更新任务执行进度和状态

在本章的作业提交过程中我们曾介绍:一个MapReduce作业在提交到Hadoop上之后,会进入完全地自动化执行过程,用户只能监控程序的执行状态和强制中止作业。但是MapReduce作业是一个长时间运行的批量作业,有时候可能需要运行数小时。所以对于用户而言,能够得知作业的运行状态是非常重要的。在Linux终端运行MapReduce作业时,可以看到在作业执行过程中有一些简单的作业执行状态报告,这能让用户大致了解作业的运行情况,并通过与预期运行情况的对比来确定作业是否按照预定方式运行。

在MapReduce作业中,作业的进度主要由一些可衡量可计数的小操作组成。比如在Map任务中,其任务进度就是已处理输入的百分比,如果完成100条记录中的50条,那么Map任务的进度就是50%(这里只是针对一个Map任务举例,并不是在Linux终端中执行MapReduce任务时出现的Map 50%,在终端中出现的50%是总体Map任务的进度,这是将所有Map任务的进度组合起来的结果)。总体来讲,MapReduce作业的进度由下面几项组成:Mapper(或Reducer)读入或写出一条记录,在报告中设置状态描述,增加计数器,调用Reporter对象的progess()方法。

由MapReduce作业分割成的每个任务中都有一组计数器,它们对任务执行过程中的进度组成事件进行计数。如果任务要报告进度,它便会设置一个标志以表明状态变化将会发送到TaskTracker上。另一个监听线程检查到这标志后,会告知TaskTracker当前的任务状态。具体代码如下(这是Map Task中run函数的部分代码):


//同TaskTracker通信,汇报任务执行进度

TaskReporter reporter=new TaskReporter(getProgress(),umbilical, jvmContext);

startCommunicationThread(umbilical);

initialize(job, getJobID(),reporter, useNewApi);


同时,TaskTracker在每隔5秒发送给JobTracker的心跳中封装任务状态,报告自己的任务执行状态。具体代码如下(这是TaskTracker中transmitHeartBeat()方法的部分代码):


//每隔一段时间,向JobTracker返回一些统计信息

boolean sendCounters;

if(now>(previousUpdate+COUNTER_UPDATE_INTERVAL)){

sendCounters=true;previousUpdate=now;

}

else{

sendCounters=false;

}


通过心跳通信机制,所有TaskTracker的统计信息都会汇总到JobTracker处。JobTracker将这些统计信息合并起来,产生一个全局作业进度统计信息,用来表明正在运行的所有作业,以及其中所含任务的状态。最后,JobClient通过每秒查看JobTracker来接收作业进度的最新状态。具体代码如下(这是JobClient中用来提交作业的runJob()方法的部分代码):


//首先生成一个JobClient对象

JobClient jc=new JobClient(job);

//调用submitJob来提交一个任务

running=jc.submitJob(job);

……

//使用monitorAndPrintJob方法不断监控作业进度

if(!jc.monitorAndPrintJob(job, rj)){

LOG.info("Job Failed:"+rj.getFailureInfo());

throw new IOException("Job failed!");

}