6.4.2 JobInProgress

JobInProgress类主要用于监控和跟踪作业运行状态,并为调度器提供最底层的调度接口。本小节主要介绍其作业监控信息,而其调度函数相关实现将在6.7节中介绍。

JobInProgress维护了两种作业信息:一种是静态信息,这些信息是作业提交之时就已经确定好的;另一种是动态信息,这些信息随着作业的运行而动态变化。

(1)作业静态信息

作业静态信息是指作业提交之时就已经确定好的属性信息,主要包括以下几项:


//map task, reduce task, cleanup task和setup task对应的TaskInProgress

TaskInProgress maps[]=new TaskInProgress[0];

TaskInProgress reduces[]=new TaskInProgress[0];

TaskInProgress cleanup[]=new TaskInProgress[0];

TaskInProgress setup[]=new TaskInProgress[0];

int numMapTasks=0;//Map Task个数

int numReduceTasks=0;//Reduce Task个数

final long memoryPerMap;//每个Map Task需要的内存量

final long memoryPerReduce;//每个Reduce Task需要的内存量

volatile int numSlotsPerMap=1;//每个Map Task需要的slot个数

volatile int numSlotsPerReduce=1;//每个Reduce Task需要的slot个数

/*允许每个TaskTracker上失败的Task个数,默认是4,通过参数mapred.max.tracker.failures

设置。当该作业在某个TaskTracker上失败的个数超过该值时,会将该节点添加到该作业的黑名单中,调度

器便不再为该节点分配该作业的任务*/

final int maxTaskFailuresPerTracker;

……

private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART=0.05f;

//当有5%的Map Task完成后,才可以调度Reduce Task

int completedMapsForReduceSlowstart=0;//多少Map Task完成后开始调度Reduce Task

……

//允许的Map Task失败比例上限,通过参数mapred.max.map.failures.percent设置

final int mapFailuresPercent;

//允许的Reduce Task失败比例上限,通过参数mapred.max.reduce.failures.percent设置

final int reduceFailuresPercent;

……

JobPriority priority=JobPriority.NORMAL;//作业优先级


(2)作业动态信息

作业动态信息是指作业运行过程中会动态更新的信息。这些信息对于发现TaskTracker/Job/Task故障非常有用,也可以为调度器进行任务调度提供决策依据。


int runningMapTasks=0;//正在运行的Map Task数目

int runningReduceTasks=0;//正在运行的Reduce Task数目

int finishedMapTasks=0;//运行完成的Map Task数目

int finishedReduceTasks=0;//运行完成的Reduce Task数目

int failedMapTasks=0;//失败的Map Task Attempt数目

int failedReduceTasks=0;//失败的Reduce Task Attempt数目

……

int speculativeMapTasks=0;//正在运行的备份任务(MAP)数目

int speculativeReduceTasks=0;//正在运行的备份任务(REDUCE)数目

int failedMapTIPs=0;/*失败的TaskInProgress(MAP)数目,这意味着对应的输入数据将被丢弃,

不会产生最终结果*/

int failedReduceTIPs=0;//失败的TaskInProgress(REDUCE)数目

private volatile boolean launchedCleanup=false;//是否已启动Cleanup Task

private volatile boolean launchedSetup=false;//是否已启动Setup Task

private volatile boolean jobKilled=false;//作业是否已被杀死

private volatile boolean jobFailed=false;//作业是否已失败

//节点与TaskInProgress的映射关系,即TaskInProgress输入数据位置与节点对应关系

Map<Node, List<TaskInProgress>>nonRunningMapCache;

//节点及其上面正在运行的Task映射关系

Map<Node, Set<TaskInProgress>>runningMapCache;

/*不需要考虑数据本地性的Map Task,如果一个Map Task的InputSplit Location为空,则进行任

务调度时不需考虑本地性*/

final List<TaskInProgress>nonLocalMaps;

//按照失败次数进行排序的TIP集合

final SortedSet<TaskInProgress>failedMaps;

//未运行的Map Task集合

Set<TaskInProgress>nonLocalRunningMaps;

//未运行的Reduce Task集合

Set<TaskInProgress>nonRunningReduces;

//正在运行的Reduce Task集合

Set<TaskInProgress>runningReduces;

//待清理的Map Task列表,比如用户直接通过命令“bin/hadoop job-kill”杀死的Task

List<TaskAttemptID>mapCleanupTasks=new LinkedList<TaskAttemptID>();

List<TaskAttemptID>reduceCleanupTasks=new LinkedList<TaskAttemptID>();

long startTime;//作业提交时间

long launchTime;//作业开始执行时间

long finishTime;//作业完成时间