2.5.7 计算过程中的故障和容灾处理
使用Fourinone可以完成大部分分布式并行计算需求,但是计算过程中的故障和容灾处理是怎么进行的呢,这里详细分析一下。
总的来说,Fourinone框架不会在设计中抛弃错误不处理或者容忍错误导致框架崩溃,框架通常会捕获所有的错误反馈给开发者去处理,但是框架本身不自作主张,替开发者考虑处理方案,只有这样框架才能从特定场景中抽象出来,给开发者更灵活的发挥和去满足各种更复杂业务容错情况。
那么框架究竟关注和不关注哪个层面的故障呢?
并行计算过程中,通常有两种类型的故障:一种是系统故障引起的计算中断(宕机和网络故障),一种是业务逻辑意义上的错误数据。前者是框架关注的,后者是业务逻辑开发者关注的。
系统故障导致网络断掉或者宕机,框架会捕获故障信息并通告,工头在检验工人执行状态时会获知,并进行相应的业务上的故障处理,比如重发或者单独记录日志。业务逻辑意义上的错误数据,通常在工人的业务实现逻辑里去判断,比如计算结果的金额为负数是一个不符合业务要求的错误数据,这个是由开发者去控制,框架不做业务逻辑上的错误处理。
针对故障,框架又是怎样容灾的呢?
通常一个典型的分布式计算结构,由工头、工人、职介所组成,我们详细分析一下这几个角色在故障时各自如何容灾:
工头是嵌入式的,他不是一个服务程序,由嵌入他的系统new工头类并管理他的生命周期,工头不存在恢复或者容灾的概念,就好比我们写一个Helloworld的main函数,很少考虑程序运行到hello,world没有输出时就宕机了。但是如果嵌入工头的系统是一个定时执行的计算任务时,也许要考虑容灾,因为涉及单点问题,可以让两个工头竞争一个分布式锁实现(详细参考3.6.2分布式锁demo)。
工人和职介所是服务程序,如果工人节点故障,职介所会实时感知,工头分配计算时会获取到最新活跃工人数量,如果是职介所节点故障,Fourinone实现了领导者选举机制,会实时切换到备份职介所上(详细参考3.6.1统一配置管理主备领导者切换)。
换句话说,如果一个工人节点在计算开始前发生故障不可用,工头通过getWaiting Workers获取可用工人时不会包括该工人节点,因为职介所会感知每个工人的可用状态。
如果工人在计算过程中发生故障,框架会进行截获,然后提前返回计算结果,并设置结果的状态为异常。
也就是正常完成计算时:result.getStatus()==WareHouse.READY
计算过程发生故障中断时:result.getStatus()==WareHouse.EXCEPTION
这样工头就可以根据检查结果的状态,来做故障时的容灾处理。
实际上也可以在工人的doTask实现方法内部捕捉业务异常,由开发者根据程序实现自由决定。
以下demo演示了Fourinone计算过程中的故障容灾处理:
❏ FaultCtor:是一个工头实现,它调用集群中一个工人doTask执行任务,然后轮询该结果,判断结果是否完成或者是否异常,如果结果状态为异常,则打印消息。实际上这里只是简单演示机制,现实场景中,可以将任务先记录,工人执行成功后再删除并跳转下一个任务,如果异常则继续重发其他工人执行该任务,或者采用其他故障策略,统一记录到错误日志,在其他时间再另行排查处理。
❏ FaultWorker:是一个工人实现,它模拟了一个任务执行,睡眠了8秒钟,然后再制造一个空指针异常。该工人模拟了两种系统异常状况,计算过程中可以关闭它,或者等待它运行到空指针异常查看效果,注意这里doTask本身是不抛出和捕捉异常的,由框架去处理。
运行步骤:
1)编译demo的java类:
- Javac –classpath fourinone.jar; *.java
2)启动ParkServerDemo(它的IP端口已经在配置文件的PARK部分的SERVERS指定)
- Java –classpath fourinone.jar; ParkServerDemo
3)运行FaultWorker(传入端口号参数)
- Java –classpath fourinone.jar; FaultWorker 2008
4)运行FaultCtor
- Java –classpath fourinone.jar; FaultCtor
运行后工人进入8秒中“任务执行”,这时可以将该工人进程关闭,然后会查看到工头界面输出something wrong about wks[0]result,说明框架已经屏蔽系统故障并反馈到任务结果的异常状态中,如果8秒中内不关闭,会引发另外一个空指针异常,产生同样的异常状态。
完整demo源码如下:
- // ParkServerDemo
- import com.fourinone.BeanContext;
- public class ParkServerDemo
- {
- public static void main(String[] args)
- {
- BeanContext.startPark();
- }
- }
- // FaultWorker
- import com.fourinone.MigrantWorker;
- import com.fourinone.WareHouse;
- import com.fourinone.Workman;
- public class FaultWorker extends MigrantWorker
- {
- public WareHouse doTask(WareHouse inhouse)
- {
- System.out.println(inhouse.getString("word"));
- try{Thread.sleep(8000L);}catch(Exception ex){}
- String[] strs = null;
- System.out.println(strs.length);
- WareHouse wh = new WareHouse("word", "hello ");
- return wh;
- }
- public static void main(String[] args)
- {
- FaultWorker mw = new FaultWorker();
- mw.waitWorking("localhost",Integer.parseInt(args[0]),"faultworker");
- }
- }
- // FaultCtor
- import com.fourinone.Contractor;
- import com.fourinone.WareHouse;
- import com.fourinone.WorkerLocal;
- import java.util.ArrayList;
- public class FaultCtor extends Contractor
- {
- public WareHouse giveTask(WareHouse inhouse)
- {
- WorkerLocal[] wks = getWaitingWorkers("faultworker");
- System.out.println("wks.length:"+wks.length);
- WareHouse wh = new WareHouse("word", "hello");
- WareHouse result = wks[0].doTask(wh);
- System.out.println("result:"+result);
- while(true){
- if(result.getStatus()==WareHouse.READY){
- System.out.println("result:"+result);
- break;
- }
- else if(result.getStatus()==WareHouse.EXCEPTION){
- System.out.println("something wrong about wks[0] result");
- //doTask(wh) again or put wh into log
- break;
- }
- }
- return null;
- }
- public static void main(String[] args)
- {
- FaultCtor a = new FaultCtor();
- a.giveTask(null);
- a.exit();
- }
- }