2.5.2 工头工人计算模式更完整的示例
从前面章节的原理介绍里知道,我们现实中的分布式计算存在多个环节,比如有的任务拆分,有的计算结果合并,或者多个拆分和合并,它们之间是串行关系,也就是合并必须等待拆分和计算完成才能进行,同时每个拆分或者合并的任务又都是并行的过程。
CtorDemo:是包含了3个工头实例,对应3个环节,链式处理,实现过程获取到线上工人节点,进行调用,所有的分配任务和中间结果存储都由自己实现处理。
这里简单地将20条数据分配给多个工人处理。数据用data变量表示,j用来记录计算结果,如果j==20,标志结束。任务初始为一个id的字符传给工头实例1,工头加上自己名称的描述和数据data后传给工人处理,工人再加上自己的名称和处理信息返回给工头,工头实例1处理完再传给工头实例2,直到3个工头都链式处理完,这里将上一个工头的处理结果又当做下一个工头的输入。
注意
工头和工人之间是异步调用,会马上返回,需要检查结果是否完成。
- WareHouse[] hmarr = new WareHouse[wks.length];
这里通过hmarr数组来记录每次每个工人任务分配的结果,需要轮循hmarr的每个结果是否已经计算完成,如果计算完成就设置为null,进行新的任务安排。
❏ WorkerDemo:是一个工人实现,工人可以指定某种类型,比如有的工人用于计算,有的用于合并,也都是自己实现。这里只是简单地在工头传入的id后加上自己的名称信息代表处理。
- waitWorking("localhost",Integer.parseInt(args[1]),"workdemo");
该方法进行任务等待,其中3个参数分别指定工人监听ip、工人监听端口、工人类型。
❏ ParkServerDemo:分布式计算过程的协同服务park。
另外,工头和工人之间的计算交互有两种模式,一种是工头直接调用工人,一种是通过park消息中枢调用工人,可以在配置文件里配置COMPUTEMODE的默认值进行指定,默认是直接调用方式。
部署:将CtorDemo、WorkerDemo、ParkServerDemo分别部署在不同机器或者同台机器不同进程,Worker可以有多个。
运行步骤:
1)启动ParkServerDemo(它的IP端口已经在配置文件的PARK部分的SERVERS指定):
- Java –classpath fourinone.jar; ParkServerDemo
2)运行WorkerDemo,通过传入不同的端口和名称参数指定多个Worker,这里假设在同机演示,ip设置为localhost,如果如图2-14所示:
- java -cp fourinone.jar; WorkerDemo aaa 2008
- java -cp fourinone.jar; WorkerDemo bbb 2009
- java -cp fourinone.jar; WorkerDemo ccc 2010
图2-14 WorkerDemo
3)运行CtorDemo:
- java -cp fourinone.jar; CtorDemo
可以看到工头窗口的输出如图2-15所示。
图2-15 CtorDemo
总共三个包工头链式处理,先后将任务分配给3个工人并行执行,窗口内的信息"ThreeCtor16-ccc"代表“第三个工头的第16条任务分配给ccc工人执行”。最后将所有包工头的执行结果汇总输出。
三个工人窗口的信息输出如下:
图2-16 工人窗口结果
每个工人窗口输出了执行每个工头的任务,“bbb inhouse:ThreeCtor17”表示bbb工人执行第3个工头的第17条任务。可以看到每个工人都是以并行争抢方式去执行包工头的任务。
Demo完整源码如下:
- // ParkServerDemo
- import com.fourinone.BeanContext;
- public class ParkServerDemo{
- public static void main(String[] args){
- BeanContext.startPark();
- }
- }
- // WorkerDemo
- import com.fourinone.MigrantWorker;
- import com.fourinone.WareHouse;
- public class WorkerDemo extends MigrantWorker
- {
- private String workname;
- public WorkerDemo(String workname)
- {
- this.workname = workname;
- }
- public WareHouse doTask(WareHouse inhouse)
- {
- String v = inhouse.getString("id");
- System.out.println(workname+" inhouse:"+v);
- return new WareHouse("id",v+"-"+workname+"-");
- }
- public static void main(String[] args)
- {
- WorkerDemo wd = new WorkerDemo(args[0]);
- wd.waitWorking("localhost",Integer.parseInt(args[1]),"workdemo");
- }
- }
- // CtorDemo
- import com.fourinone.Contractor;
- import com.fourinone.WareHouse;
- import com.fourinone.WorkerLocal;
- import java.util.ArrayList;
- public class CtorDemo extends Contractor
- {
- private String ctorname;
- CtorDemo(String ctorname)
- {
- this.ctorname = ctorname;
- }
- public WareHouse giveTask(WareHouse inhouse)
- {
- WorkerLocal[] wks = getWaitingWorkers("workdemo");
- System.out.println("wks.length:"+wks.length);
- String outStr = inhouse.getString("id");
- WareHouse[] hmarr = new WareHouse[wks.length];
- int data=0;
- for(int j=0;j<20;)
- {
- for(int i=0;i<wks.length;i++){
- if(hmarr[i]==null){
- WareHouse wh = new WareHouse();
- wh.put("id",ctorname+(data++));
- hmarr[i] = wks[i].doTask(wh);
- }
- else if(hmarr[i].getStatus()!=WareHouse.NOTREADY)
- {
- System.out.println(hmarr[i]);
- outStr+=hmarr[i];
- hmarr[i]=null;
- j++;
- }
- }
- }
- inhouse.setString("id", outStr);
- return inhouse;
- }
- public static void main(String[] args)
- {
- Contractor a = new CtorDemo("OneCtor");
- a.toNext(new CtorDemo("TwoCtor")).toNext(new CtorDemo ("ThreeCtor"));
- WareHouse house = new WareHouse("id","begin ");
- System.out.println(a.giveTask(house,true));
- }
- }