7.1 调度平台的设计与实现

如果要设计实现一个简单的作业调度器,应该怎么做呢?

我们需要这个调度器提供机器资源的管理功能,可以支持上传任务包,任务包里包含并行计算的程序和资源请求配置,比如需要多少台机器来运行,然后用调度器来判断是否有足够的机器资源来启动,当然还需要一个存放任务包的队列,等等。

我们初步想了想,得到一个调度平台的架构,如图7-1所示。

7.1 调度平台的设计与实现 - 图1

图7-1 MPI调度器

我们设想这个设计结构大致由任务包、任务队列、资源池、任务调度器、任务执行器、分布式锁、监控管理等几个核心组件组成,能基本解决任务调度和资源调度问题,同时它不仅支持命令方式操作,也是可编程的,这样更灵活。

组件说明如下:

1)“任务包”是一个任务运行压缩包,包括了完整的任务运行程序、配置属性、优先级设置、安全权限等运行时需要的信息,可以定义任务运行所需要的资源,比如启动该任务程序,最少要占用多少CUP和内存。由开发者按照“任务包”的规范开发打包好,然后提交给“任务队列”等待执行,提交的方式可以是命令行也可以是管理界面。

2)“任务队列”是一个分布式消息队列的实现,它主要用于存放任务包,以排队的方式等待获取资源进行执行,而不是全部任务同时运行争抢资源,任务队列一般是先进先出,也可以根据优先级配置信息调整策略。

3)“监控管理”是建立在任务队列基础之上的一个界面系统,可以通过它查看队列里任务的数量、等待状况、执行状况,并且可以取消任务和查看任务输出日志,支持命令行和界面操作,同时对任务执行的关键环节(如完成或者取消中止)可以发出事件响应。

4)“资源池”是一个记录集群计算机资源实时信息的组件,它记录了集群可用计算机的CPU,内存的可用清单,并根据使用情况更新最新信息,资源池可以放在内存里,也可以持久保存在文件数据里。

5)“任务调度器”是一个一直运行并轮循检测是否未执行任务并有足够的资源运行。它会判断任务队列和资源池两个条件都满足的情况下,获取任务队列里的任务包发送到指定资源的机器上运行,并且根据任务包里的运行属性配置,比如超时检测,如果正常运行完成或者超时,都会释放所占有的资源,并且更新资源池,并标记任务队列里的队列属性状态(已完成或者中止等)或者删除。“任务调度器”从“资源池”获取到的资源有可能是多台机器的,也就是将任务包发送到多台机器并行执行,并且实时检查并行执行状况,这和Torque的方式完全不同。

6)“分布式锁”,由于“任务调度器”是一个不停止运行的重要组件,所以为了避免单点故障,它是一个主备关系,同时有“任务调度器-主”和“任务调度器-备”同时竞争一个分布式锁,竞争到者为主,未竞争到者为备进行替补等待,一旦“任务调度器-主”出故障,“任务调度器-备”马上抢到分布式锁并继续进行调度检测工作。

7)“任务执行器”负责PC机器资源的注册和资源隔离,以及在隔离的资源内执行任务。它首先会获取所在计算机的CUP可用核数、内存可用数等资源,发送到资源池进行注册,初始化资源池里的信息。当“任务调度器”发送任务到“任务执行器”执行时,并告诉所需要资源,“任务执行器”使用操作系统或者虚拟机的资源隔离技术,比如Linux的lxc和cgroup,进行资源的隔离,并且在该分配的资源下执行任务,同时将任务的执行状况实时汇报给“任务调度器”。当任务开始执行时,它会更新资源池的该资源占用信息,当任务结束释放资源时,它也会更新最新的资源信息。

设计完成后,我们再考虑一下如何落地实现,根据我们前面掌握的分布式实现技术,借助Fourinone大部分都可以实现了,例如:

❏ 任务队列使用消息队列实现,资源池使用缓存实现。

❏ 调度器根据任务队列和资源池条件,根据调度算法进行调度。

❏ 分布式锁采用分布式协调功能实现,任务执行采用自动部署实现。

不清楚的地方可以回过头查看前面章节相应的技术内容和Demo。

我们在前面的2.3节里面介绍过MPI,接下来我们基于上面的调度平台设计动手做一个MPI调度器,代码全部在MpiScheduler.java里。

实现说明:由于MPI的运行都是通过mpirun脚本命令进行的,一般会封装成一个sh的脚本文件,用户将sh脚本提交到MpiScheduler上,由MpiScheduler通过判断“机器列表资源池”配置和“任务队列”配置两个条件,计算出是否进行该计算单元的调度,如果机器资源不足或者没有任务则不调度,否则调用sh脚本执行,根据脚本返回值获取调度是否成功,完成后释放机器资源信息,MpiScheduler更新“机器列表资源池”。如果MpiScheduler接到新的提交作业的命令,没有足够的机器资源,则放入“任务队列”中等待。如果执行任务超时,则调用杀死MPI进程的脚本进行停止,并返回释放机器资源更新“机器列表”。

这样初步实现了作业提交,队列管理,机器资源分配和管理,MPI的sh脚本调度,调度结果检查等功能,代码里有详细注释,所用到统一配置,队列,分布式锁等分布式基础功能,前面有相关功能指南和demo。

运行步骤如下:

1)启动配置和队列服务:

  1. java cp fourinone.jar: ParkServerDemo

2)启动调度器或提交:

  1. jobjava cp fourinone.jar: MpiScheduler 0 /home/mpi/ mpi.sh param 40 60

整体代码如下:

  1. //MpiScheduler.java
  2. import com.fourinone.Contractor;
  3. import com.fourinone.WareHouse;
  4. import com.fourinone.BeanContext;
  5. import com.fourinone.ParkLocal;
  6. import com.fourinone.ObjectBean;
  7. import com.fourinone.StartResult;
  8. import com.fourinone.FileAdapter;
  9. import java.util.List;
  10. import java.util.ArrayList;
  11. import java.io.Serializable;
  12.  
  13. public class MpiScheduler extends Contractor
  14. {
  15. private static boolean runFlag = false;
  16. private static ParkLocal pl = BeanContext.getPark();//配置和队列管理
  17.  
  18. public WareHouse giveTask(WareHouse inhouse)
  19. {
  20. //初始化mpi机器列表配置
  21. pl.create("mpi", 0+"", new Integer(50));
  22. pl.create("mpi", 1+"", new Integer(100));
  23. pl.create("mpi", 2+"", new Integer(50));
  24.  
  25. ArrayList<StartResult<Integer>> rsarr = new ArrayList<StartResult <Integer>>();//调度结果数组
  26. while(true){
  27. //根据队列是否存在作业和是否有足够机器判断调度
  28. for(int i=0;i<3;i++){
  29. ObjectBean job = receive(i+"");//不阻塞接收队列,获取到作业对象
  30. if(job!=null){
  31. WareHouse jobobj = (WareHouse)job.toObject();
  32. int num = jobobj.getStringInt("computnum");//获取申请机器数
  33. Integer iob = (Integer)(pl.get("mpi",i+"").toObject());//获取配置列表里 剩下的机器数量
  34. if(iob>=num){
  35. System.out.print(i+"type spent "+num+" computer to doTask");
  36. pl.delete(job.getDomain(), job.getNode());//删除队列
  37. pl.update("mpi", i+"", new Integer(iob-num));//更新机器列表
  38. System.out.println(", and remain "+(iob-num));
  39. //FileAdapter.createTempFile
  40. //调度job对象里的sh脚本
  41. StartResult<Integer> res = BeanContext.tryStart(new FileAdapter (jobobj.getString("shdir")),"sh", jobobj.getString("sh"), jobobj.getString("param"), jobobj.getString("computnum"), ">>log/"+i+".log", "2>>&1");
  42. res.setObj("job",jobobj);
  43. rsarr.add(res);//将调度结果添加到结果数组
  44. }//else System.out.println(i+"type remain "+iob+" is not enough for "+num);
  45. }
  46. }
  47.  
  48. //检查结果数组是否完成调度
  49. ArrayList<StartResult<Integer>> rmvrs = new ArrayList<StartResult<Integer >>();//记录完成的结果
  50. for(StartResult<Integer> rs:rsarr){
  51. WareHouse jobwh = (WareHouse)rs.getObj("job");
  52. int timeout = jobwh.getStringInt("timeout");
  53.  
  54. if(rs!=null&&rs.getStatus(StartResult.s(timeout))!=StartResult. NOTREADY){//检查结果是否完成或超时
  55. System.out.print("Result:"+rs.getResult());
  56. Integer iob = (Integer)(pl.get("mpi",jobwh.getString("mpiType")). toObject());
  57. Integer newiob = iob+jobwh.getStringInt("computnum");
  58. pl.update("mpi", jobwh.getString("mpiType"), newiob);
  59. System.out.println(", and remain "+newiob+" "+jobwh);
  60. rmvrs.add(rs);
  61. }
  62. }
  63. rsarr.removeAll(rmvrs);//完成从结果数组删除
  64. }
  65. }
  66.  
  67. //发送到队列
  68. public static void send(String queue, Object obj){
  69. pl.create(queue, (Serializable)obj);
  70. }
  71.  
  72. //从队列接收
  73. public static ObjectBean receive(String queue){
  74. ObjectBean ob=null;
  75. List<ObjectBean> oblist = pl.get(queue);
  76. if(oblist!=null)
  77. ob = oblist.get(0);
  78. return ob;
  79. }
  80.  
  81. public static void main(String[] args)
  82. {
  83. WareHouse jobwh = new WareHouse();
  84. jobwh.setString("mpiType",args[0]);//mpi计算类型
  85. jobwh.setString("shdir",args[1]);//sh脚本运行目录
  86. jobwh.setString("sh",args[2]);//sh脚本名称
  87. jobwh.setString("param",args[3]);//sh脚本参数
  88. jobwh.setString("computnum",args[4]);//申请计算机数量
  89. jobwh.setString("timeout",args[5]);//超时时间,单位为秒
  90. send(args[0],jobwh);//提交到队列
  91. System.out.println("submit job to queue:"+jobwh);
  92.  
  93. //判断锁是否作为调度服务,不作为调度服务提交job后直接退出
  94. ObjectBean oblock = pl.get("mpi","lock");
  95. if(oblock==null){
  96. pl.create("mpi", "lock", true, true);
  97. MpiScheduler a = new MpiScheduler();
  98. a.giveTask(null);
  99. }
  100. }
  101. }

关于MPI调度在7.4.1节“其他MPI作业资源调度技术”我们还会介绍市场上基于Torque等任务管理工具方式的做法。