7.3.2 状态发送

TaskTracker通过心跳向JobTracker汇报的是当前节点运行时信息,包括TaskTracker基本信息、节点资源使用情况和各个任务状态等,这些信息被封装到可序列化类TaskTrackerStatus中。每次发送心跳时,TaskTracker会根据最新信息重新构造一个TaskTrackerStatus,且每次包含的信息量可能不一样。比如,任务的计数器信息每隔60 s才会发送一次,且只有当askForNewTask为true时,才会发送节点资源使用信息。其中,askForNewTask值的计算方法如下:


askForNewTask=

((status.countOccupiedMapSlots()<maxMapSlots||

status.countOccupiedReduceSlots()<maxReduceSlots)&&

acceptNewTasks);//存在空闲的Map slot或者Reduce slot,且磁盘剩余空间大于

mapred.local.dir.minspacekill


TaskTrackerStatus包含的基本信息如下:


String trackerName;//TaskTracker名称

String host;//TaskTracker主机名

int httpPort;//TaskTracker对外的HTTP端口号

int failures;//TaskTracker上已经失败任务总数

List<TaskStatus>taskReports;//当前TaskTracker上各个任务运行状态

volatile long lastSeen;//上次汇报心跳的时间

private int maxMapTasks;//Map slot总数,即允许同时运行的Map Task总数,由参数mapred.

tasktracker.map.tasks.maximum设定

private int maxReduceTasks;//Reduce slot总数,即允许同时运行的Reduce Task总数,由参

数mapred.tasktracker.reduce.tasks.maximum设定

private TaskTrackerHealthStatus healthStatus;//TaskTracker健康状态

private ResourceStatus resStatus;//TaskTracker资源(内存,CPU等)信息


下面重点介绍taskReport、healthStatus和resStatus三个变量值的意义及其计算方法。

1.taskReport

taskReport保存当前TaskTracker上所有任务(实际为Task Attempt)的运行状态,每个任务保存信息如下:


public abstract class TaskStatus implements Writable, Cloneable{

……

private final TaskAttemptID taskid;//Task Attempt ID

private float progress;//任务执行进度,范围为0.0~1.0

private volatile State runState;/*任务运行状态,包括RUNNING, SUCCEEDED, FAILED,

UNASSIGNED, KILLED, COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN*/

private String diagnosticInfo;//诊断信息,一般为错误信息和运行异常信息

private String stateString;//字符串形式的运行状态

private String taskTracker;//该TaskTracker名称,可唯一表示一个TaskTracker,形式如

tracker_mymachine:50010

private int numSlots;//运行该Task Attempt需要的slot数目,默认值是1

private long startTime;//Task Attempt开始时间

private long finishTime;//Task Attempt完成时间

private long outputSize=-1L;//Task Attempt输出数据量

private volatile Phase phase=Phase.STARTING;//任务运行阶段,包括STARTING, MAP,

SHUFFLE, SORT, REDUCE, CLEANUP

private Counters counters;//该任务中定义的所有计数器(包括系统自带计数器和用户自定义计数

器两种)

private boolean includeCounters;//是否包含计数器,计数器信息每隔60 s发送一次

//下一个要处理的数据区间,用于定位坏记录所在区间

private SortedRanges.Range nextRecordRange=new SortedRanges.Range();

……

}


2.healthStatus

healthStatus保存了当前节点健康状况,该变量对应TaskTrackerHealthStatus类,定义如下:


static class TaskTrackerHealthStatus implements Writable{

private boolean isNodeHealthy;//节点是否健康

private String healthReport;//如果节点不健康,则记录导致不健康的原因

private long lastReported;//上次汇报健康状况的时间

……

}


healthStatus是由NodeHealthCheckerService线程[1]计算得到的。该线程允许管理员配置一个“健康监测脚本”以检查节点健康状况,且管理员可在该脚本中添加任何检查语句作为节点是否健康运行的依据。如果脚本检测到该节点处于不健康状态,它需要在标准输出中打印一条以字符串“ERROR”开头的输出语句。NodeHealthCheckerService线程周期性调用健康监测脚本并检查其输出,一旦发现脚本输出是以“ERROR”开头的字符串,则认为节点处于不健康状态,进而将其标注为“unhealthy”并通过心跳告诉JobTracker,而JobTracker得知节点状态变为“unhealthy”后,会将其加入黑名单,此后不再为它分配新任务。需要注意的是,只要TaskTracker服务是活着的,该线程会一直运行该脚本,一旦发现节点又变为“healthy”,JobTracker会立刻将其从黑名单中移除,从而又会为之分配任务。通过引入该机制,可带来很多好处。

❑可作为节点负载的反馈:比如,可让健康检测脚本检查网络、磁盘、文件系统等运行状况,一旦发现特殊情况,比如网络拥塞、磁盘空间不足或者文件系统出现问题,可将健康状况变为“unhealthy”,暂时不接收新的任务,待它们恢复正常后再继续接收新任务。

❑人为暂时维护TaskTracker:如果发现TaskTracker所在节点出现故障,可通过控制脚本输出暂时让该TaskTracker停止接收新任务以便进行维护,待维护完成后,修改脚本输出以让TaskTracker继续接收新任务。

NodeHealthCheckerService线程包含四个可配置参数,具体如表7-2所示。用户可根据需要在mapred-site.xml文件中配置这些参数。

7.3.2 状态发送 - 图1

下面给出一个健康监测脚本实例。在这个Shell脚本中,当一个节点上的空闲内存量低于总内存量的10%时,将打印以“ERROR”开头的字符串,这样,该节点将不再向JobTracker请求新任务。


!/bin/bash

MEMORY_RATIO=0.1

freeMem=grep MemFree/proc/meminfo|awk'{print$2}'

totalMem=grep MemTotal/proc/meminfo|awk'{print$2}'

limitMem=echo|awk'{print int("'$totalMem'"*"'$MEMORY_RATIO'")}'

if[$freeMem-lt$limitMem];then

echo"ERROR, totalMem=$totalMem, freeMem=$freeMem, limitMem=$limitMem"

else

echo"OK, totalMem=$totalMem, freeMem=$freeMem, limitMem=$limitMem"

fi


3.resStatus

resStatus保存了当前TaskTracker资源使用情况。该变量对应ResourceStatus类,定义如下:


static class ResourceStatus implements Writable{

private long totalVirtualMemory;//总的可用虚拟内存量,单位为byte

private long totalPhysicalMemory;//总的可用物理内存量

private long mapSlotMemorySizeOnTT;//每个Map slot对应的内存量

private long reduceSlotMemorySizeOnTT;//每个Reduce slot对应的内存量

private long availableSpace;//可用磁盘空间

private long availableVirtualMemory=UNAVAILABLE;//可用的虚拟内存量

private long availablePhysicalMemory=UNAVAILABLE;//可用的物理内存量

private int numProcessors=UNAVAILABLE;//节点总的处理器个数

private long cumulativeCpuTime=UNAVAILABLE;//运行以来累计的CPU使用时间

private long cpuFrequency=UNAVAILABLE;//CPU主频,单位为kHz

private float cpuUsage=UNAVAILABLE;//CPU使用率,单位为%

……

}


resStatus是由可插拔组件ResourceCalculatorPlugin(抽象类)获取的,当前只存在Linux版本实现的LinuxResourceCalculatorPlugin中,其他操作系统尚未实现,这意味着只有在Linux下才可以获取资源使用信息。此外,Hadoop也允许用户自己编写ResourceCalculatorPlugin实现,且用户只需通过参数mapreduce.tasktracker.resourcecalculatorplugin指定该类即可启用它。

我们都知道,在Linux操作系统中,proc虚拟文件系统中包含了一些目录和文件,它们向用户动态呈现了内核中的一些实时信息,比如进程运行时的资源使用信息。LinuxResourceCalculatorPlugin类正是通过读取/proc目录下的meminfo、cpuinfo和stat三个文件获取节点上的内存、CPU等资源的实时使用情况的。

[1]https://issues. apache.org/jira/browse/MAPREDUCE-211