12.3.4 MapReduce与YARN结合
如果用户想要让一个新的计算框架运行在YARN上,需要将该框架重新封装成一个ApplicationMaster,而ApplicationMaster将作为用户应用程序的一部分被提交到YARN中。换句话说,YARN中的所有计算框架实际上只是客户端的一个库,因此,不必单独将这个框架以服务的形式部署到各个节点上。
由于YARN是直接从MRv 1衍化过来的,因此,它天生支持MapReduce计算框架。MapReduce框架对应的ApplicationMaster为MRAppMaster。接下来我们将详细对其进行介绍。
1.MRAppMaster基本构成
为了能够让MapReduce高效地运行在YARN之上,YARN采用异步事件模型对MRv 1中的几个重要数据结构进行了重写,使其更加高效。YARN对MRv 1的修改主要包括以下几方面:
❑将JobTracker中的作业控制(如作业创建,监控作业运行状态等)部分拆分出来,按照规范实现MapReduce计算框架的ApplicationMaster—MRAppMaster。
❑TaskTracker的部分功能由模块TaskAttemptListenerImpl完成。
❑利用状态机重写JobInProgress类,其主要功能由JobImpl完成。
❑利用状态机重写TaskInProgress类,其主要功能由MapTaskImpl/ReduceTaskImpl完成。
新的MapReduce计算框架的所有功能都浓缩在了MRAppMaster中,其主要架构如图12-10所示。它主要由以下几个模块组成。
❑ContainerAllocator:ContainerAllocator负责将Map Task和Reduce Task需要的资源转化成ResourceManager可识别的表示形式,并通过AMRMProtocol协议向Resource-Manager申请资源。
❑ContainerLauncher:MRAppMaster从ResourceManager端获取的资源是以Container表示的。Container中包含资源所在节点、资源种类以及每种资源的可用量等信息。ContainerLauncher通过ContainerManager协议与对应的节点通信,要求它启动任务。
❑TaskAttemptListener:功能类似于MRv 1中的TaskTracker;不同的是,它不是管理某个节点上的任务,而是所有节点上的任务。它实现了TaskUmbilicalProtocol协议,所有任务周期性向其汇报运行状态。如果某个任务在一段时间内未发送心跳信息,它会将其杀死以重新计算。
❑JobTokenSecretManager:管理作业令牌。作业令牌是Task与MRAppMaster通信,Reduce Task从Map Task拷贝数据的凭证,具体可参考第10章。
❑Task Cleaner:Task Cleaner负责清理失败或者被杀死任务产生的不完整输出结果,以防止大量无用的任务产生的结果塞满磁盘。
❑Speculator:如果发现某个任务的执行速度落后于同作业的其他任务,则Speculator会为该任务启动一个备份任务,具体参考第6章。
❑Recovery Service:当ApplicationMaster因失败重新启动后,Recovery Service可从日志中重构已经运行完成的任务的信息,进而避免重新计算这些任务。
❑MRClientService:MRClientService负责与客户端交互,可为客户端提供作业当前执行状态、进度等信息。
图 12-10 MRAppMaster的内部结构
2.MRAppMaster工作流程
按照作业大小不同,MRAppMaster提供了三种作业运行模式:本地模式(通常用于作业调试,与MRv 1一样,不再赘述)、Uber模式[1]和Non-Uber模式。对于小作业,为了降低其延迟,可采用Uber模式。在该模式下,所有Map Task和Reduce Task在同一个Container(MRAppMaster所在Container)中顺次执行。对于大作业,则采用Non-Uber模式。在该模式下,MRAppMaster先为Map Task申请资源,当Map Task运行完成的数目达到一定比例后再为Reduce Task申请资源。
(1)Uber运行模式
为了降低小作业延时,YARN专门对小作业运行方式进行了优化。对于小作业而言,MRAppMaster无须再为每个任务分别申请资源,而是让其重用一个Container,并按照先Map Task后Reduce Task的运行方式串行执行每个任务。在YARN中,如果一个MapReduce作业同时满足以下条件,则认为是小作业,可运行在Uber模式下:
❑Map Task数目不超过mapreduce.job.ubertask.maxmaps(默认是9)。
❑Reduce Task数目不超过mapreduce.job.ubertask.maxmaps(默认是1)。
❑输入文件大小不超过mapreduce.job.ubertask.maxbytes(默认是一个Block大小)。
❑Map Task和Reduce Task需要的资源量不超过MRAppMaster可使用的资源量。
另外,由于链式作业会并发执行Map Task和Reduce Task,因此不允许运行在Uber模式下。
(2)Non-Uber运行模式
在大数据环境下,Uber运行模式通常只能覆盖到一小部分作业,而对于其他大多数作业,仍将运行在Non-Uber模式下。在Non-Uber模式下,MRAppMaster将一个作业的Map Task和Reduce Task分为以下四种状态。
❑pending:刚启动但尚未向ResourceManager发送资源请求。
❑scheduled:已经向ResourceManager发送资源请求但尚未分配到资源。
❑assigned:已经分配到了资源且正在运行。
❑completed:已经运行完成。
对于Map Task而言,它的生命周期为scheduled→assigned→completed;而对于Reduce Task而言,它的生命周期为pending→scheduled→assigned→completed。由于Reduce Task的执行依赖于Map Task的输出结果,因此,为避免Reduce Task过早启动而造成资源利用率低下,MRAppMaster让刚启动的Reduce Task处于pending状态,以便能够根据Map Task运行情况决定是否对其进行调度。在YARN之上运行MapReduce作业需要解决两个关键问题:如何确定Reduce Task启动时机以及如何完成Shuffle功能。
如何确定Reduce Task启动时机
由于YARN中不再有Map slot和Reduce slot的概念,且RedouceManager也不知道Map Task与Reduce Task之间存在依赖关系,因此,MRAppMaster自己需设计资源申请策略以防止因Reduce Task过早启动而造成资源利用率低下和Map Task因分配不到资源而“饿死”。MRAppMaster在MRv 1原有策略(Map Task完成数目达到一定比例后才允许启动Reduce Task)基础上添加了更为严格的资源控制策略和抢占策略。总结起来,Reduce Task启动时机由以下三个参数控制。
❑mapreduce. job.reduce.slowstart.completedmaps:当Map Task完成的比例达到该值后才会为Reduce Task申请资源,默认是0.05。
❑yarn. app.mapreduce.am.job.reduce.rampup.limit:在Map Task完成前,最多启动的ReduceTask比例,默认为0.5。
❑yarn. app.mapreduce.am.job.reduce.preemption.limit:当Map Task需要资源但暂时无法获取资源(比如Reduce Task运行过程中,部分Map Task因结果丢失需重算)时,为了保证至少一个Map Task可以得到资源,最多可以抢占的Reduce Task比例,默认为0.5。
如何完成Shuffle功能
按照MapReduce的基本逻辑,Shuffle HTTP Server应该分布到各个节点上,以便能够支持各个Reduce Task远程拷贝数据。然而,由于Shuffle是MapReduce框架中特有的一个处理流程,从设计上讲,不应该将它直接嵌到YARN的某个组件(比如NodeManager)中。
前面提到,YARN采用了服务模型管理各个对象,且多个服务可以通过组合的方式交由一个服务进行统一管理。在YARN中,NodeManager作为一种组合服务模式,允许动态加载应用程序临时需要的附加服务。利用这一特性,YARN将Shuffle HTTP Server组装成了一种服务,以便让各个NodeManager加载它。
[1]https://issues. apache.org/jira/browse/MAPREDUCE-2405