2.5.11 如何中止工人计算和超时中止

在并行计算的典型应用中,通常会让多计算机各自寻解或者并行搜索,当其中一台计算机找到解后,应该中止其他计算机继续计算;或者就是在某一个时间范围内寻找,超时便要全部中止。所以计算过程中对工人计算的中止功能是非常有用的(见图2-28)。具体实施如下:

❏ 工人的中止功能是通过提供interrupt和isInterrupted方法实现的。

❏ 包工头调用interrupt方法中止工人计算。

❏ 工人检查isInterrupted方法的boolean返回值响应中止请求并返回结果。

2.5.11 如何中止工人计算和超时中止 - 图1

图2-28 工人计算中止

详细过程描述:包工头在调用工人的doTask后,轮询结果是否完成,如果其中一个完成,可以通过调用其他工人的interrupt方法进行中止任务,在工人的doTask实现里面,需要通过isInterrupted判断是否被中止,如果是,停止计算并返回。

要注意以下问题:

1)跟强行杀死进程或者线程的实现方式不同(请区别“一次性启动多个工人”里谈到的杀死进程方式),interrupt在这里实际上是一种请求通知机制,由工头根据计算过程的进展发起中止请求,由工人在doTask实现逻辑中检查isInterrupted响应该请求(如果工人不听指挥拒绝执行请求,那包工头也没办法)。

2)为什么要这样设计而不让框架直接强行杀死进程呢,这是为了让开发者能自由控制中止的策略和可以在中止时做一些保存/备份/日志等善后工作,并可以决定正常运行完成和中止时分别返回什么结果给工头,从而有更大处理的灵活度。

如果工人是公共服务状态(配置文件worker部分的<SERVICE>true</SERVICE>,默认为false),调用interrupt无效,并会产生InterruptedException,因为公共服务状态下的工人不能被中止,它需要一直做为服务程序存在。

另外,工头对interrupt的调用需要在doTask的执行过程中,否则没有效果,因为interrupt调用后会检查当前是否有任务在执行,如果尚未发生任务调用或者已经执行完成,便认为无效。

这个demo演示了一个查找随机数的例子,由几台机器同时各自获取10万以内的随机数,看谁最先获取到888这个数字,一旦某台机器获取到,中止其他机器的寻找,整体完成计算。

CancelCtor:是一个工头实现,它的giveTask实现中,首先获取集群工人数量,然后调用各工人的doTask开始计算,并将各自结果保存起来轮询检查,它的程序结构使用了3个for循环完成检查和中止,第一个for用于记录完成的结果数,第二个for轮询各结果是否完成,当找到888的结果后,用第三个for中止掉其他工人计算。

CancelWorker:是一个工人实现,它的doTask实现中通过一个while循环不断的生产10万以内的随机数,然后判断是否等于888,如果找到就返回结果。在while循环同时,它还会不断检查isInterrupted,如果其他工人已经找到该数字,它便会马上中止计算并返回。

运行步骤(在本机模拟):

1)编译demo的Java类:

  1. Javac classpath fourinone.jar; *.java

2)启动ParkServerDemo(它的IP端口已经在配置文件的PARK部分的SERVERS指定):

  1. Java classpath fourinone.jar; ParkServerDemo

3)运行一到多个CancelWorker(传入一个端口号参数区分不同工人,结果如图2-29所示):

  1. Java classpath fourinone.jar; CancelWorker 2008
  2. Java classpath fourinone.jar; CancelWorker 2009
  3. Java classpath fourinone.jar; CancelWorker 2010

2.5.11 如何中止工人计算和超时中止 - 图2

图2-29 CancelWorker

2.5.11 如何中止工人计算和超时中止 - 图3

4)运行CancelCtor,结果如图2-30所示:

  1. Java classpath fourinone.jar; CancelCtor

图2-30 CancelCtor

可以看到,在第1个工人找到888后,整体结束计算,工头将结果输出。

完整demo源码如下:

  1. // ParkServerDemo
  2. import com.fourinone.BeanContext;
  3. public class ParkServerDemo
  4. {
  5. public static void main(String[] args)
  6. {
  7. BeanContext.startPark();
  8. }
  9. }
  10.  
  11. // CancelWorker
  12. import com.fourinone.MigrantWorker;
  13. import com.fourinone.WareHouse;
  14. import com.fourinone.FileAdapter;
  15. import java.util.List;
  16. import java.util.Collections;
  17. import java.util.Random;
  18.  
  19. public class CancelWorker extends MigrantWorker
  20. {
  21. public WareHouse doTask(WareHouse inhouse)
  22. {
  23. int n = 0;
  24. Random rd = new Random();
  25. while(!isInterrupted()){
  26. n=rd.nextInt(100000);
  27. System.out.println(n);
  28. if(n==888
  29. break;
  30. }
  31. return new WareHouse("result", n);
  32. }
  33.  
  34. public static void main(String[] args)
  35. {
  36. CancelWorker mw = new CancelWorker();
  37. mw.waitWorking("localhost",Integer.parseInt(args[0]),"cancelworker");
  38. }
  39. }
  40.  
  41. // CancelCtor
  42. import com.fourinone.Contractor;
  43. import com.fourinone.WareHouse;
  44. import com.fourinone.WorkerLocal;
  45. import java.util.ArrayList;
  46.  
  47. public class CancelCtor extends Contractor
  48. {
  49. public WareHouse giveTask(WareHouse inhouse)
  50. {
  51. WorkerLocal[] wks = getWaitingWorkers("cancelworker");
  52. System.out.println("wks.length:"+wks.length);
  53.  
  54. WareHouse[] hmarr = new WareHouse[wks.length];
  55. for(int i=0;i<wks.length;i++){
  56. hmarr[i] = wks[i].doTask(new WareHouse());//3
  57. }
  58.  
  59. for(int j=0;j<hmarr.length;){//记录完成的结果数
  60. for(int i=0;i<wks.length;i++){//检查结果是否完成
  61. if(hmarr[i]!=null&&hmarr[i].getStatus()==WareHouse.READY){
  62. System.out.println(i+":"+hmarr[i]);
  63. //找到888后,停止其他工人计算
  64. if((Integer)hmarr[i].getObj("result")==888){
  65. for(int k=0;k<wks.length;k++){
  66. if(k!=i)
  67. wks[k].interrupt();
  68. }
  69. }
  70. hmarr[i]=null;
  71. j++;
  72. }
  73. }
  74. }
  75.  
  76. return null;
  77. }
  78.  
  79. public static void main(String[] args)
  80. {
  81. CancelCtor a = new CancelCtor();
  82. a.giveTask(null);
  83. a.exit();
  84. }
  85. }

如果我们需要定义一个计算时间,超时便中止计算,可以通过四种方法完成:

方法一:工头分配完任务后开始看时间,超时便指挥各工人停止。

工头自行检查超时,工头在调用doTask时开始计时,每次轮询结果时检查是否超时,超时便调用interrupt通知工人进行中止。

方法二:工头要求工人自觉,工人自己看时间,超时自觉停止。

工人自行检查超时,在工人的doTask实现逻辑里加入计时检查,如果超时便退出返回结果(注意和方法三的区别)。

方法三:框架调用超时抛异常方式。

工人不自行检查超时,框架检查到工人doTask计算超时抛出系统异常,中断任务调用。如果要使用该方式,请将配置文件config.xml中:

  1. <PROPSROW DESC="WORKER">
  2. <TIMEOUT DESC="FALSE">2</TIMEOUT>
  3. </PROPSROW>

TIMEOUT DESC设置为TRUE,2表示超时时间,小时为单位,这里默认是2小时,也就是如果工人执行doTask超过2小时仍未完成,框架放弃调用,抛出系统异常。

方法四:doTask的interrupt方式。

为了方便超时中止,框架也提供了一个便利方法:

  1. public WareHouse doTask(WareHouse inhouse, long timeoutseconds);

也就是在调用doTask时,可以传入一个超时时间参数(秒为单位),它的实际效果就相当于doTask+interrupt,超时自动调用interrupt请求工人中止。