7.1 调度平台的设计与实现
如果要设计实现一个简单的作业调度器,应该怎么做呢?
我们需要这个调度器提供机器资源的管理功能,可以支持上传任务包,任务包里包含并行计算的程序和资源请求配置,比如需要多少台机器来运行,然后用调度器来判断是否有足够的机器资源来启动,当然还需要一个存放任务包的队列,等等。
我们初步想了想,得到一个调度平台的架构,如图7-1所示。
图7-1 MPI调度器
我们设想这个设计结构大致由任务包、任务队列、资源池、任务调度器、任务执行器、分布式锁、监控管理等几个核心组件组成,能基本解决任务调度和资源调度问题,同时它不仅支持命令方式操作,也是可编程的,这样更灵活。
组件说明如下:
1)“任务包”是一个任务运行压缩包,包括了完整的任务运行程序、配置属性、优先级设置、安全权限等运行时需要的信息,可以定义任务运行所需要的资源,比如启动该任务程序,最少要占用多少CUP和内存。由开发者按照“任务包”的规范开发打包好,然后提交给“任务队列”等待执行,提交的方式可以是命令行也可以是管理界面。
2)“任务队列”是一个分布式消息队列的实现,它主要用于存放任务包,以排队的方式等待获取资源进行执行,而不是全部任务同时运行争抢资源,任务队列一般是先进先出,也可以根据优先级配置信息调整策略。
3)“监控管理”是建立在任务队列基础之上的一个界面系统,可以通过它查看队列里任务的数量、等待状况、执行状况,并且可以取消任务和查看任务输出日志,支持命令行和界面操作,同时对任务执行的关键环节(如完成或者取消中止)可以发出事件响应。
4)“资源池”是一个记录集群计算机资源实时信息的组件,它记录了集群可用计算机的CPU,内存的可用清单,并根据使用情况更新最新信息,资源池可以放在内存里,也可以持久保存在文件数据里。
5)“任务调度器”是一个一直运行并轮循检测是否未执行任务并有足够的资源运行。它会判断任务队列和资源池两个条件都满足的情况下,获取任务队列里的任务包发送到指定资源的机器上运行,并且根据任务包里的运行属性配置,比如超时检测,如果正常运行完成或者超时,都会释放所占有的资源,并且更新资源池,并标记任务队列里的队列属性状态(已完成或者中止等)或者删除。“任务调度器”从“资源池”获取到的资源有可能是多台机器的,也就是将任务包发送到多台机器并行执行,并且实时检查并行执行状况,这和Torque的方式完全不同。
6)“分布式锁”,由于“任务调度器”是一个不停止运行的重要组件,所以为了避免单点故障,它是一个主备关系,同时有“任务调度器-主”和“任务调度器-备”同时竞争一个分布式锁,竞争到者为主,未竞争到者为备进行替补等待,一旦“任务调度器-主”出故障,“任务调度器-备”马上抢到分布式锁并继续进行调度检测工作。
7)“任务执行器”负责PC机器资源的注册和资源隔离,以及在隔离的资源内执行任务。它首先会获取所在计算机的CUP可用核数、内存可用数等资源,发送到资源池进行注册,初始化资源池里的信息。当“任务调度器”发送任务到“任务执行器”执行时,并告诉所需要资源,“任务执行器”使用操作系统或者虚拟机的资源隔离技术,比如Linux的lxc和cgroup,进行资源的隔离,并且在该分配的资源下执行任务,同时将任务的执行状况实时汇报给“任务调度器”。当任务开始执行时,它会更新资源池的该资源占用信息,当任务结束释放资源时,它也会更新最新的资源信息。
设计完成后,我们再考虑一下如何落地实现,根据我们前面掌握的分布式实现技术,借助Fourinone大部分都可以实现了,例如:
❏ 任务队列使用消息队列实现,资源池使用缓存实现。
❏ 调度器根据任务队列和资源池条件,根据调度算法进行调度。
❏ 分布式锁采用分布式协调功能实现,任务执行采用自动部署实现。
不清楚的地方可以回过头查看前面章节相应的技术内容和Demo。
我们在前面的2.3节里面介绍过MPI,接下来我们基于上面的调度平台设计动手做一个MPI调度器,代码全部在MpiScheduler.java里。
实现说明:由于MPI的运行都是通过mpirun脚本命令进行的,一般会封装成一个sh的脚本文件,用户将sh脚本提交到MpiScheduler上,由MpiScheduler通过判断“机器列表资源池”配置和“任务队列”配置两个条件,计算出是否进行该计算单元的调度,如果机器资源不足或者没有任务则不调度,否则调用sh脚本执行,根据脚本返回值获取调度是否成功,完成后释放机器资源信息,MpiScheduler更新“机器列表资源池”。如果MpiScheduler接到新的提交作业的命令,没有足够的机器资源,则放入“任务队列”中等待。如果执行任务超时,则调用杀死MPI进程的脚本进行停止,并返回释放机器资源更新“机器列表”。
这样初步实现了作业提交,队列管理,机器资源分配和管理,MPI的sh脚本调度,调度结果检查等功能,代码里有详细注释,所用到统一配置,队列,分布式锁等分布式基础功能,前面有相关功能指南和demo。
运行步骤如下:
1)启动配置和队列服务:
- java –cp fourinone.jar: ParkServerDemo
2)启动调度器或提交:
- job:java –cp fourinone.jar: MpiScheduler 0 /home/mpi/ mpi.sh param 40 60
整体代码如下:
- //MpiScheduler.java
- import com.fourinone.Contractor;
- import com.fourinone.WareHouse;
- import com.fourinone.BeanContext;
- import com.fourinone.ParkLocal;
- import com.fourinone.ObjectBean;
- import com.fourinone.StartResult;
- import com.fourinone.FileAdapter;
- import java.util.List;
- import java.util.ArrayList;
- import java.io.Serializable;
- public class MpiScheduler extends Contractor
- {
- private static boolean runFlag = false;
- private static ParkLocal pl = BeanContext.getPark();//配置和队列管理
- public WareHouse giveTask(WareHouse inhouse)
- {
- //初始化mpi机器列表配置
- pl.create("mpi", 0+"", new Integer(50));
- pl.create("mpi", 1+"", new Integer(100));
- pl.create("mpi", 2+"", new Integer(50));
- ArrayList<StartResult<Integer>> rsarr = new ArrayList<StartResult <Integer>>();//调度结果数组
- while(true){
- //根据队列是否存在作业和是否有足够机器判断调度
- for(int i=0;i<3;i++){
- ObjectBean job = receive(i+"");//不阻塞接收队列,获取到作业对象
- if(job!=null){
- WareHouse jobobj = (WareHouse)job.toObject();
- int num = jobobj.getStringInt("computnum");//获取申请机器数
- Integer iob = (Integer)(pl.get("mpi",i+"").toObject());//获取配置列表里 剩下的机器数量
- if(iob>=num){
- System.out.print(i+"type spent "+num+" computer to doTask");
- pl.delete(job.getDomain(), job.getNode());//删除队列
- pl.update("mpi", i+"", new Integer(iob-num));//更新机器列表
- System.out.println(", and remain "+(iob-num));
- //FileAdapter.createTempFile
- //调度job对象里的sh脚本
- StartResult<Integer> res = BeanContext.tryStart(new FileAdapter (jobobj.getString("shdir")),"sh", jobobj.getString("sh"), jobobj.getString("param"), jobobj.getString("computnum"), ">>log/"+i+".log", "2>>&1");
- res.setObj("job",jobobj);
- rsarr.add(res);//将调度结果添加到结果数组
- }//else System.out.println(i+"type remain "+iob+" is not enough for "+num);
- }
- }
- //检查结果数组是否完成调度
- ArrayList<StartResult<Integer>> rmvrs = new ArrayList<StartResult<Integer >>();//记录完成的结果
- for(StartResult<Integer> rs:rsarr){
- WareHouse jobwh = (WareHouse)rs.getObj("job");
- int timeout = jobwh.getStringInt("timeout");
- if(rs!=null&&rs.getStatus(StartResult.s(timeout))!=StartResult. NOTREADY){//检查结果是否完成或超时
- System.out.print("Result:"+rs.getResult());
- Integer iob = (Integer)(pl.get("mpi",jobwh.getString("mpiType")). toObject());
- Integer newiob = iob+jobwh.getStringInt("computnum");
- pl.update("mpi", jobwh.getString("mpiType"), newiob);
- System.out.println(", and remain "+newiob+" "+jobwh);
- rmvrs.add(rs);
- }
- }
- rsarr.removeAll(rmvrs);//完成从结果数组删除
- }
- }
- //发送到队列
- public static void send(String queue, Object obj){
- pl.create(queue, (Serializable)obj);
- }
- //从队列接收
- public static ObjectBean receive(String queue){
- ObjectBean ob=null;
- List<ObjectBean> oblist = pl.get(queue);
- if(oblist!=null)
- ob = oblist.get(0);
- return ob;
- }
- public static void main(String[] args)
- {
- WareHouse jobwh = new WareHouse();
- jobwh.setString("mpiType",args[0]);//mpi计算类型
- jobwh.setString("shdir",args[1]);//sh脚本运行目录
- jobwh.setString("sh",args[2]);//sh脚本名称
- jobwh.setString("param",args[3]);//sh脚本参数
- jobwh.setString("computnum",args[4]);//申请计算机数量
- jobwh.setString("timeout",args[5]);//超时时间,单位为秒
- send(args[0],jobwh);//提交到队列
- System.out.println("submit job to queue:"+jobwh);
- //判断锁是否作为调度服务,不作为调度服务提交job后直接退出
- ObjectBean oblock = pl.get("mpi","lock");
- if(oblock==null){
- pl.create("mpi", "lock", true, true);
- MpiScheduler a = new MpiScheduler();
- a.giveTask(null);
- }
- }
- }
关于MPI调度在7.4.1节“其他MPI作业资源调度技术”我们还会介绍市场上基于Torque等任务管理工具方式的做法。