2.5.7 计算过程中的故障和容灾处理

使用Fourinone可以完成大部分分布式并行计算需求,但是计算过程中的故障和容灾处理是怎么进行的呢,这里详细分析一下。

总的来说,Fourinone框架不会在设计中抛弃错误不处理或者容忍错误导致框架崩溃,框架通常会捕获所有的错误反馈给开发者去处理,但是框架本身不自作主张,替开发者考虑处理方案,只有这样框架才能从特定场景中抽象出来,给开发者更灵活的发挥和去满足各种更复杂业务容错情况。

那么框架究竟关注和不关注哪个层面的故障呢?

并行计算过程中,通常有两种类型的故障:一种是系统故障引起的计算中断(宕机和网络故障),一种是业务逻辑意义上的错误数据。前者是框架关注的,后者是业务逻辑开发者关注的。

系统故障导致网络断掉或者宕机,框架会捕获故障信息并通告,工头在检验工人执行状态时会获知,并进行相应的业务上的故障处理,比如重发或者单独记录日志。业务逻辑意义上的错误数据,通常在工人的业务实现逻辑里去判断,比如计算结果的金额为负数是一个不符合业务要求的错误数据,这个是由开发者去控制,框架不做业务逻辑上的错误处理。

针对故障,框架又是怎样容灾的呢?

通常一个典型的分布式计算结构,由工头、工人、职介所组成,我们详细分析一下这几个角色在故障时各自如何容灾:

工头是嵌入式的,他不是一个服务程序,由嵌入他的系统new工头类并管理他的生命周期,工头不存在恢复或者容灾的概念,就好比我们写一个Helloworld的main函数,很少考虑程序运行到hello,world没有输出时就宕机了。但是如果嵌入工头的系统是一个定时执行的计算任务时,也许要考虑容灾,因为涉及单点问题,可以让两个工头竞争一个分布式锁实现(详细参考3.6.2分布式锁demo)。

工人和职介所是服务程序,如果工人节点故障,职介所会实时感知,工头分配计算时会获取到最新活跃工人数量,如果是职介所节点故障,Fourinone实现了领导者选举机制,会实时切换到备份职介所上(详细参考3.6.1统一配置管理主备领导者切换)。

换句话说,如果一个工人节点在计算开始前发生故障不可用,工头通过getWaiting Workers获取可用工人时不会包括该工人节点,因为职介所会感知每个工人的可用状态。

如果工人在计算过程中发生故障,框架会进行截获,然后提前返回计算结果,并设置结果的状态为异常。

也就是正常完成计算时:result.getStatus()==WareHouse.READY

计算过程发生故障中断时:result.getStatus()==WareHouse.EXCEPTION

这样工头就可以根据检查结果的状态,来做故障时的容灾处理。

实际上也可以在工人的doTask实现方法内部捕捉业务异常,由开发者根据程序实现自由决定。

以下demo演示了Fourinone计算过程中的故障容灾处理:

❏ FaultCtor:是一个工头实现,它调用集群中一个工人doTask执行任务,然后轮询该结果,判断结果是否完成或者是否异常,如果结果状态为异常,则打印消息。实际上这里只是简单演示机制,现实场景中,可以将任务先记录,工人执行成功后再删除并跳转下一个任务,如果异常则继续重发其他工人执行该任务,或者采用其他故障策略,统一记录到错误日志,在其他时间再另行排查处理。

❏ FaultWorker:是一个工人实现,它模拟了一个任务执行,睡眠了8秒钟,然后再制造一个空指针异常。该工人模拟了两种系统异常状况,计算过程中可以关闭它,或者等待它运行到空指针异常查看效果,注意这里doTask本身是不抛出和捕捉异常的,由框架去处理。

运行步骤:

1)编译demo的java类:

  1. Javac classpath fourinone.jar; *.java

2)启动ParkServerDemo(它的IP端口已经在配置文件的PARK部分的SERVERS指定)

  1. Java classpath fourinone.jar; ParkServerDemo

3)运行FaultWorker(传入端口号参数)

  1. Java classpath fourinone.jar; FaultWorker 2008

4)运行FaultCtor

  1. Java classpath fourinone.jar; FaultCtor

运行后工人进入8秒中“任务执行”,这时可以将该工人进程关闭,然后会查看到工头界面输出something wrong about wks[0]result,说明框架已经屏蔽系统故障并反馈到任务结果的异常状态中,如果8秒中内不关闭,会引发另外一个空指针异常,产生同样的异常状态。

完整demo源码如下:

  1. // ParkServerDemo
  2. import com.fourinone.BeanContext;
  3. public class ParkServerDemo
  4. {
  5. public static void main(String[] args)
  6. {
  7. BeanContext.startPark();
  8. }
  9. }
  10.  
  11. // FaultWorker
  12. import com.fourinone.MigrantWorker;
  13. import com.fourinone.WareHouse;
  14. import com.fourinone.Workman;
  15.  
  16. public class FaultWorker extends MigrantWorker
  17. {
  18. public WareHouse doTask(WareHouse inhouse)
  19. {
  20. System.out.println(inhouse.getString("word"));
  21. try{Thread.sleep(8000L);}catch(Exception ex){}
  22. String[] strs = null;
  23. System.out.println(strs.length);
  24. WareHouse wh = new WareHouse("word", "hello ");
  25. return wh;
  26. }
  27.  
  28. public static void main(String[] args)
  29. {
  30. FaultWorker mw = new FaultWorker();
  31. mw.waitWorking("localhost",Integer.parseInt(args[0]),"faultworker");
  32. }
  33. }
  34.  
  35. // FaultCtor
  36. import com.fourinone.Contractor;
  37. import com.fourinone.WareHouse;
  38. import com.fourinone.WorkerLocal;
  39. import java.util.ArrayList;
  40.  
  41. public class FaultCtor extends Contractor
  42. {
  43. public WareHouse giveTask(WareHouse inhouse)
  44. {
  45. WorkerLocal[] wks = getWaitingWorkers("faultworker");
  46. System.out.println("wks.length:"+wks.length);
  47.  
  48. WareHouse wh = new WareHouse("word", "hello");
  49. WareHouse result = wks[0].doTask(wh);
  50. System.out.println("result:"+result);
  51.  
  52. while(true){
  53. if(result.getStatus()==WareHouse.READY){
  54. System.out.println("result:"+result);
  55. break;
  56. }
  57. else if(result.getStatus()==WareHouse.EXCEPTION){
  58. System.out.println("something wrong about wks[0] result");
  59. //doTask(wh) again or put wh into log
  60. break;
  61. }
  62. }
  63.  
  64. return null;
  65. }
  66.  
  67. public static void main(String[] args)
  68. {
  69. FaultCtor a = new FaultCtor();
  70. a.giveTask(null);
  71. a.exit();
  72. }
  73. }