2.5.2 工头工人计算模式更完整的示例

从前面章节的原理介绍里知道,我们现实中的分布式计算存在多个环节,比如有的任务拆分,有的计算结果合并,或者多个拆分和合并,它们之间是串行关系,也就是合并必须等待拆分和计算完成才能进行,同时每个拆分或者合并的任务又都是并行的过程。

CtorDemo:是包含了3个工头实例,对应3个环节,链式处理,实现过程获取到线上工人节点,进行调用,所有的分配任务和中间结果存储都由自己实现处理。

这里简单地将20条数据分配给多个工人处理。数据用data变量表示,j用来记录计算结果,如果j==20,标志结束。任务初始为一个id的字符传给工头实例1,工头加上自己名称的描述和数据data后传给工人处理,工人再加上自己的名称和处理信息返回给工头,工头实例1处理完再传给工头实例2,直到3个工头都链式处理完,这里将上一个工头的处理结果又当做下一个工头的输入。

2.5.2 工头工人计算模式更完整的示例 - 图1注意

工头和工人之间是异步调用,会马上返回,需要检查结果是否完成。

  1. WareHouse[] hmarr = new WareHouse[wks.length];

这里通过hmarr数组来记录每次每个工人任务分配的结果,需要轮循hmarr的每个结果是否已经计算完成,如果计算完成就设置为null,进行新的任务安排。

❏ WorkerDemo:是一个工人实现,工人可以指定某种类型,比如有的工人用于计算,有的用于合并,也都是自己实现。这里只是简单地在工头传入的id后加上自己的名称信息代表处理。

  1. waitWorking("localhost",Integer.parseInt(args[1]),"workdemo");

该方法进行任务等待,其中3个参数分别指定工人监听ip、工人监听端口、工人类型。

❏ ParkServerDemo:分布式计算过程的协同服务park。

另外,工头和工人之间的计算交互有两种模式,一种是工头直接调用工人,一种是通过park消息中枢调用工人,可以在配置文件里配置COMPUTEMODE的默认值进行指定,默认是直接调用方式。

部署:将CtorDemo、WorkerDemo、ParkServerDemo分别部署在不同机器或者同台机器不同进程,Worker可以有多个。

运行步骤:

1)启动ParkServerDemo(它的IP端口已经在配置文件的PARK部分的SERVERS指定):

  1. Java classpath fourinone.jar; ParkServerDemo

2)运行WorkerDemo,通过传入不同的端口和名称参数指定多个Worker,这里假设在同机演示,ip设置为localhost,如果如图2-14所示:

  1. java -cp fourinone.jar; WorkerDemo aaa 2008
  2. java -cp fourinone.jar; WorkerDemo bbb 2009
  3. java -cp fourinone.jar; WorkerDemo ccc 2010

2.5.2 工头工人计算模式更完整的示例 - 图2

图2-14 WorkerDemo

3)运行CtorDemo:

  1. java -cp fourinone.jar; CtorDemo

可以看到工头窗口的输出如图2-15所示。

2.5.2 工头工人计算模式更完整的示例 - 图3

图2-15 CtorDemo

总共三个包工头链式处理,先后将任务分配给3个工人并行执行,窗口内的信息"ThreeCtor16-ccc"代表“第三个工头的第16条任务分配给ccc工人执行”。最后将所有包工头的执行结果汇总输出。

三个工人窗口的信息输出如下:

2.5.2 工头工人计算模式更完整的示例 - 图4

图2-16 工人窗口结果

每个工人窗口输出了执行每个工头的任务,“bbb inhouse:ThreeCtor17”表示bbb工人执行第3个工头的第17条任务。可以看到每个工人都是以并行争抢方式去执行包工头的任务。

Demo完整源码如下:

  1. // ParkServerDemo
  2. import com.fourinone.BeanContext;
  3. public class ParkServerDemo{
  4. public static void main(String[] args){
  5. BeanContext.startPark();
  6. }
  7. }
  8.  
  9. // WorkerDemo
  10. import com.fourinone.MigrantWorker;
  11. import com.fourinone.WareHouse;
  12.  
  13. public class WorkerDemo extends MigrantWorker
  14. {
  15. private String workname;
  16. public WorkerDemo(String workname)
  17. {
  18. this.workname = workname;
  19. }
  20.  
  21. public WareHouse doTask(WareHouse inhouse)
  22. {
  23. String v = inhouse.getString("id");
  24. System.out.println(workname+" inhouse:"+v);
  25. return new WareHouse("id",v+"-"+workname+"-");
  26. }
  27.  
  28. public static void main(String[] args)
  29. {
  30. WorkerDemo wd = new WorkerDemo(args[0]);
  31. wd.waitWorking("localhost",Integer.parseInt(args[1]),"workdemo");
  32. }
  33. }
  34.  
  35. // CtorDemo
  36. import com.fourinone.Contractor;
  37. import com.fourinone.WareHouse;
  38. import com.fourinone.WorkerLocal;
  39. import java.util.ArrayList;
  40.  
  41. public class CtorDemo extends Contractor
  42. {
  43. private String ctorname;
  44.  
  45. CtorDemo(String ctorname)
  46. {
  47. this.ctorname = ctorname;
  48. }
  49.  
  50. public WareHouse giveTask(WareHouse inhouse)
  51. {
  52. WorkerLocal[] wks = getWaitingWorkers("workdemo");
  53. System.out.println("wks.length:"+wks.length);
  54.  
  55. String outStr = inhouse.getString("id");
  56. WareHouse[] hmarr = new WareHouse[wks.length];
  57.  
  58. int data=0;
  59. for(int j=0;j<20;)
  60. {
  61. for(int i=0;i<wks.length;i++){
  62. if(hmarr[i]==null){
  63. WareHouse wh = new WareHouse();
  64. wh.put("id",ctorname+(data++));
  65. hmarr[i] = wks[i].doTask(wh);
  66. }
  67. else if(hmarr[i].getStatus()!=WareHouse.NOTREADY)
  68. {
  69. System.out.println(hmarr[i]);
  70. outStr+=hmarr[i];
  71. hmarr[i]=null;
  72. j++;
  73. }
  74. }
  75. }
  76.  
  77. inhouse.setString("id", outStr);
  78. return inhouse;
  79. }
  80.  
  81. public static void main(String[] args)
  82. {
  83. Contractor a = new CtorDemo("OneCtor");
  84. a.toNext(new CtorDemo("TwoCtor")).toNext(new CtorDemo ("ThreeCtor"));
  85. WareHouse house = new WareHouse("id","begin ");
  86. System.out.println(a.giveTask(house,true));
  87. }
  88. }