2.6 实时流计算

实时流计算的场景:业务系统根据实时的操作,不断生成事件(消息/调用),然后引起一系列的处理分析,这个过程是分散在多台计算机上并行完成的,看上去就像事件连续不断地流经多个计算节点处理,形成一个实时流计算系统。

市场上流计算产品有很多,主要是通过消息中枢结合工人模式实现的,大致过程如下:

1)开发者实现好流程输入输出节点逻辑,上传job到任务生产者。

2)任务生产者将任务发送到ZooKeeper,然后监控任务状态。

3)任务消费者从ZooKeeper上获取任务。

4)任务消费者启动多个工人进程,每个进程又启动多个线程执行任务。

5)工人之间通过zeroMQ交互。

我们也可以做一个简单的流计算系统,做法跟上面有些不同:

1)首先不过多依赖ZooKeerper,任务的分配最好直接给到工人,并能直接监控工人完成状态,这样效率会更高。

2)工人之间直接通信,不依赖zeroMQ转发。

3)并行管理扁平化,多进程下再分多线程意义不大,增加管理成本,实际上一台机器8个进程,每个进程再开8个线程,总体跟8~10个进程或者线程的效果差不多(数量视机器性能不同)。

4)做成一个流计算系统,而不是平台。

设计思路:用工头去做任务生产和分配,用工人去做任务执行,为了达到流的效果,需要在工人里面调用工头的方式,将多个工人节点串起来,形成一个计算拓扑图。

下面程序演示了连续多个消息先发到一个工人节点A处理,然后再发到两个工人节点B并行处理的流计算过程,并且获取到最后处理结果打印输出(如果不需要获取结果可以直接返回)。

❏ StreamCtorA:工头A实现,它获取到线上工人A,然后将消息发给它处理,并轮循等待结果。工头A的main函数模拟了多个消息的连续调用。

❏ StreamWorkerA:工人A实现,它接收到工头A的消息进行处理,然后创建一个工头B,通过工头B将结果同时发给两个工人B处理,然后将结果返回工头A。

❏ StreamCtorB:工头B实现,它获取到线上两个工人B,调用doTaskBatch等待两个工人处理完成,然后返回结果给工人A。

❏ StreamWorkerB:工人B实现,它接收到任务消息后模拟处理后返回结果。

运行步骤(在本地模拟)如下:

1)启动ParkServerDemo(它的IP端口已经在配置文件指定)。

  1. java -cp fourinone.jar; ParkServerDemo

2)启动工人A。

  1. java -cp fourinone.jar; StreamWorkerA localhost 2008

3)启动两个工人B。

  1. java -cp fourinone.jar; StreamWorkerB localhost 2009
  2. java -cp fourinone.jar; StreamWorkerB localhost 2010

4)启动工头A。

  1. java -cp fourinone.jar; StreamCtorA

多机部署说明:StreamCtorA可以单独部署一台机器,StreamWorkerA和StreamCtorB部署一台机器,两个StreamWorkerB可以部署两台机器。

总结:如何选择计算平台和计算系统。

如果我们只有几台机器,但是每天有人开发不同的流处理应用要在这几台机器上运行,我们需要一个计算平台来管理好job,让开发者按照规范配置好流程和运行时节点申请,打包成job上传,然后平台根据每个job配置动态分配资源依次执行每个job内容。

如果我们的几台机器只为一个流处理业务服务,比如实时营销,我们需要一个流计算系统,按照业务流程部署好计算节点即可,不需要运行多个job和动态分配资源。按照计算平台的方式做只会增加复杂性,开发者也不清楚每台机器上到底运行了什么逻辑。

如果你想实现一个计算平台,可以参考动态部署和进程管理功能。

完整源码如下:

  1. // ParkServerDemo
  2. import com.fourinone.BeanContext;
  3. public class ParkServerDemo
  4. {
  5. public static void main(String[] args)
  6. {
  7. BeanContext.startPark();
  8. }
  9. }
  10. //StreamCtorA
  11. import com.fourinone.Contractor;
  12. import com.fourinone.WareHouse;
  13. import com.fourinone.WorkerLocal;
  14. import java.util.ArrayList;
  15.  
  16. public class StreamCtorA extends Contractor
  17. {
  18. public WareHouse giveTask(WareHouse inhouse)
  19. {
  20. WorkerLocal[] wks = getWaitingWorkers("StreamWorkerA");
  21. System.out.println("wks.length:"+wks.length);
  22.  
  23. WareHouse result = wks[0].doTask(inhouse);
  24. while(true){
  25. if(result.getStatus()!=WareHouse.NOTREADY)
  26. {
  27. break;
  28. }
  29. }
  30. return result;
  31. }
  32.  
  33. public static void main(String[] args)
  34. {
  35. StreamCtorA sc = new StreamCtorA();
  36. for(int i=0;i<10;i++){
  37. WareHouse msg = new WareHouse();
  38. msg.put("msg","hello"+i);
  39. WareHouse wh = sc.giveTask(msg);
  40. System.out.println(wh);
  41. }
  42. sc.exit();
  43. }
  44. }
  45. //StreamWorkerA
  46. import com.fourinone.MigrantWorker;
  47. import com.fourinone.WareHouse;
  48.  
  49. public class StreamWorkerA extends MigrantWorker
  50. {
  51. public WareHouse doTask(WareHouse inhouse)
  52. {
  53. System.out.println(inhouse);
  54. //do something
  55. StreamCtorB sc = new StreamCtorB();
  56. WareHouse msg = new WareHouse();
  57. msg.put("msg",inhouse.getString("msg")+",from StreamWorkerA");
  58. WareHouse wh = sc.giveTask(msg);
  59. sc.exit();
  60.  
  61. return wh;
  62. }
  63.  
  64. public static void main(String[] args)
  65. {
  66. StreamWorkerA wd = new StreamWorkerA();
  67. wd.waitWorking(args[0],Integer.parseInt(args[1]),"StreamWorkerA");
  68. }
  69. }
  70. //StreamCtorB
  71. import com.fourinone.Contractor;
  72. import com.fourinone.WareHouse;
  73. import com.fourinone.WorkerLocal;
  74. import java.util.ArrayList;
  75.  
  76. public class StreamCtorB extends Contractor
  77. {
  78. public WareHouse giveTask(WareHouse inhouse)
  79. {
  80. WorkerLocal[] wks = getWaitingWorkers("StreamWorkerB");
  81. System.out.println("wks.length:"+wks.length);
  82.  
  83. WareHouse[] hmarr = doTaskBatch(wks, inhouse);
  84.  
  85. WareHouse result = new WareHouse();
  86. result.put("B1",hmarr[0]);
  87. result.put("B2",hmarr[1]);
  88.  
  89. return result;
  90. }
  91. }
  92. //StreamWorkerB
  93. import com.fourinone.MigrantWorker;
  94. import com.fourinone.WareHouse;
  95.  
  96. public class StreamWorkerB extends MigrantWorker
  97. {
  98. public WareHouse doTask(WareHouse inhouse)
  99. {
  100. System.out.println(inhouse);
  101. //do something
  102. inhouse.put("msg",inhouse.getString("msg")+",from StreamWorkerB");
  103. return inhouse;
  104. }
  105.  
  106. public static void main(String[] args)
  107. {
  108. StreamWorkerB wd = new StreamWorkerB();
  109. wd.waitWorking(args[0],Integer.parseInt(args[1]),"StreamWorkerB");
  110. }
  111. }