2.5.3 工人合并互相say hello的示例

假设你已经看过前面的分布式计算上手demo指南,对Fourinone基本的分布式并行计算方式有了初步了解。

本demo演示了工头和几个工人之间互相sayhello的简单例子,从而了解到集群计算节点之间互相交互,以及工头批量处理和工人互相传递数据(多用于合并)的功能。

❏ HelloCtor:是一个工头实现,它实现giveTask接口,它首先通过getWaitingWorkers获取到一个线上工人的集合,然后通过doTaskBatch进行批量任务处理,这里工头向每个工人说句"hello"打招呼。doTaskBatch有两个参数,分别是工人集合和任务,该方法会等到每个工人都执行完该任务才返回,因此使用doTaskBatch不需要轮循检查每一个调用结果,它是一个批量处理。为了节省资源利用,工头运行结束后不会退出jvm,可以使用exit方法强行退出。

❏ HelloWorker:是一个工人实现,这里它实现了doTask和receive接口,分别用于被工头和其他工人调用。doTask实现了被工头调用执行任务的内容,这里该工人向工头和其他工人"say hello"招呼,并告诉自己的名字。它通过getWorkerElse获取到集群中除自己以外的其他工人,getWorkerElse可以传入一个参数指定工人类型,然后依次调用其他工人的receive方法传递信息。receive实现了该工人被其他工人调用的处理内容,参数WareHouse由其他工人传入,它返回一个boolean值,可以代表接收和处理是否成功。这里简单的将其他工人的问候输出。

运行步骤:

1)启动ParkServerDemo(它的IP端口已经在配置文件的PARK部分的SERVERS指定):

  1. Java classpath fourinone.jar; ParkServerDemo

2)运行一到多个HelloWorker(传入3个参数,依次是该工人的名字、IP或者域名、端口):

  1. java -cp fourinone.jar; HelloWorker aaa localhost 2008
  2. java -cp fourinone.jar; HelloWorker bbb localhost 2009

2.5.3 工人合并互相say hello的示例 - 图1

图2-17 HelloWorker

3)运行HelloCtor:

  1. java -cp fourinone.jar; HelloCtor

2.5.3 工人合并互相say hello的示例 - 图2

图2-18 HelloCtor

从上面可以看到,工头和每个工人都收到来自对方的hello招呼。

2.5.3 工人合并互相say hello的示例 - 图3注意

doTaskBatch会等集群中最慢的一个工人完成任务才统一返回,如果希望能让机器运行快的机器在完成后能马上又分配新的任务,而不用等待,实现能者多劳,可以不使用doTaskBatch,而采用逐个调用每个工人的doTask并轮循结果状态的方式实现,具体请参考前面的分布式计算完整demo。

实际上,工头对工人的调用是通过doTask,工人对工人的调用是通过receive。doTask用于工头分配任务,receive多用于工人之间合并传递数据,每个工人都可以同时向其他工人传递数据,并接收来自其他工人的数据。集群中每个工人向其他工人传递数据都完成了,也就意味着每个工人都接收完成了(详细“工人-工人”交互原理可参见2.1节)。

Demo完整源码如下:

  1. // ParkServerDemo
  2. import com.fourinone.BeanContext;
  3. public class ParkServerDemo{
  4. public static void main(String[] args){
  5. BeanContext.startPark();
  6. }
  7. }
  8.  
  9. // HelloWorker
  10. import com.fourinone.MigrantWorker;
  11. import com.fourinone.WareHouse;
  12. import com.fourinone.Workman;
  13.  
  14. public class HelloWorker extends MigrantWorker
  15. {
  16. private String name;
  17. public HelloWorker(String name){
  18. this.name = name;
  19. }
  20.  
  21. public WareHouse doTask(WareHouse inhouse)
  22. {
  23. System.out.println(inhouse.getString("word"));
  24. WareHouse wh = new WareHouse("word", "hello, i am "+name);
  25. Workman[] wms = getWorkerElse("helloworker");
  26. for(Workman wm:wms)
  27. wm.receive(wh);
  28. return wh;
  29. }
  30.  
  31. public boolean receive(WareHouse inhouse)
  32. {
  33. System.out.println(inhouse.getString("word"));
  34. return true;
  35. }
  36.  
  37. public static void main(String[] args)
  38. {
  39. HelloWorker mw = new HelloWorker(args[0]);
  40. mw.waitWorking(args[1],Integer.parseInt(args[2]),"helloworker");
  41. }
  42. }
  43.  
  44. // HelloCtor
  45. import com.fourinone.Contractor;
  46. import com.fourinone.WareHouse;
  47. import com.fourinone.WorkerLocal;
  48.  
  49. public class HelloCtor extends Contractor
  50. {
  51. public WareHouse giveTask(WareHouse inhouse)
  52. {
  53. WorkerLocal[] wks = getWaitingWorkers("helloworker");
  54. System.out.println("wks.length:"+wks.length);
  55. WareHouse wh = new WareHouse("word", "hello, i am your Contractor.");
  56. WareHouse[] hmarr = doTaskBatch(wks, wh);
  57.  
  58. for(WareHouse result:hmarr)
  59. System.out.println(result);
  60.  
  61. return null;
  62. }
  63.  
  64. public static void main(String[] args)
  65. {
  66. HelloCtor a = new HelloCtor();
  67. a.giveTask(null);
  68. a.exit();
  69. }
  70. }