5.4 如何实现主题订阅模式

我们可以将Domain视为订阅主题,将每个订阅者注册到Domain的节点(Node)上,发布者将消息逐一更新每个节点,订阅者监控每个属于自己的节点的变化事件获取订阅消息,收到后清空内容等待下一个消息,多个消息用一个arraylist存放。

5.4 如何实现主题订阅模式 - 图1

图5-5 主题订阅模式实现

❏ Publisher:是一个主题发布者,它通过pl.get(topic)获取topic(主题)的所有订阅者节点,并将消息更新到每个节点上,如果有多个追加到arraylist存放。

❏ Subscriber:是一个消息订阅者,他通过subscrib(String topic,String subscribeName,LastestListener lister)实现消息订阅,其中3个参数分别是主题名、订阅者名称、事件处理实现。Subscriber实现了LastestListener事件处理接口happenLastest(LastestEvent le),这个接口会传入更新的节点内容对象,然后Subscriber用一个空的arraylist清空内容,等待下一次接收消息。happenLastest有个boolean返回值,如果返回false,它会一直监控变化,继续有新的变化时还会进行事件调用;如果返回true,它完成本次事件调用后就终止。

运行步骤:

1)启动ParkServerDemo(它的IP端口已经在配置文件指定):

  1. java -cp fourinone.jar; ParkServerDemo

2)运行Subscriber,因为Subscriber可以有多个,传入不同的subscribeName参数代表不同的Subscriber,如图5-6所示。

  1. java -cp fourinone.jar; Subscriber aaa
  2. java -cp fourinone.jar; Subscriber bbb
  3. java -cp fourinone.jar; Subscriber ccc

5.4 如何实现主题订阅模式 - 图2

图5-6 Subscriber

我们启动了3个订阅者,名称依次为aaa、bbb、ccc,订阅者启动好后处于事件监听状态,等待发布者投递消息,如图5-6所示。

3)运行Publisher,结果如图5-7所示。

  1. java -cp fourinone.jar; Publisher

5.4 如何实现主题订阅模式 - 图3

图5-7 Publisher

运行Publisher开始投递消息,投递完成后Publisher退出,我们看看各个订阅者的窗口显示如图5-8所示。

5.4 如何实现主题订阅模式 - 图4

图5-8 订阅消息结果

可以看到,3个订阅者都显示出收到了发布者的"hello world"的消息。

完整demo源码如下:

  1. // ParkServerDemo
  2. import com.fourinone.BeanContext;
  3. public class ParkServerDemo
  4. {
  5. public static void main(String[] args)
  6. {
  7. BeanContext.startPark();
  8. }
  9. }
  10.  
  11. // Subscriber
  12. import com.fourinone.BeanContext;
  13. import com.fourinone.ParkLocal;
  14. import com.fourinone.ObjectBean;
  15. import com.fourinone.LastestEvent;
  16. import com.fourinone.LastestListener;
  17. import java.util.ArrayList;
  18.  
  19. public class Subscriber implements LastestListener
  20. {
  21. private static ParkLocal pl = BeanContext.getPark();
  22.  
  23. public boolean happenLastest(LastestEvent le)
  24. {
  25. ObjectBean ob = (ObjectBean)le.getSource();
  26. ArrayList arr = (ArrayList)ob.toObject();
  27. System.out.println("published message:"+arr);
  28. ObjectBean newob = pl.update(ob.getDomain(), ob.getNode(), new ArrayList());
  29. le.setSource(newob);
  30. return false;
  31. }
  32.  
  33. public static void subscrib(String topic, String subscribeName, LastestListener lister)
  34. {
  35. ArrayList arr = new ArrayList();
  36. ObjectBean ob = pl.create(topic, subscribeName, arr);
  37. pl.addLastestListener(topic, subscribeName, ob, lister);
  38. }
  39.  
  40. public static void main(String[] args)
  41. {
  42. subscrib("topic1", args[0], new Subscriber());
  43. }
  44. }
  45.  
  46. // Publisher
  47. import com.fourinone.BeanContext;
  48. import com.fourinone.ParkLocal;
  49. import com.fourinone.ObjectBean;
  50. import java.util.ArrayList;
  51. import java.util.List;
  52.  
  53. public class Publisher
  54. {
  55. private static ParkLocal pl = BeanContext.getPark();
  56.  
  57. public static Object publish(String topic, Object obj)
  58. {
  59. List<ObjectBean> oblist = pl.get(topic);
  60. if(oblist!=null)
  61. {
  62. for(ObjectBean ob:oblist)
  63. {
  64. ArrayList arr = (ArrayList)ob.toObject();
  65. arr.add(obj);
  66. pl.update(ob.getDomain(), ob.getNode(), arr);
  67. }
  68. }else return null;
  69. return obj;
  70. }
  71.  
  72. public static void main(String[] args)
  73. {
  74. publish("topic1", "helloworld");
  75. }
  76. }

Fourinone不实现JMS的规范,不提供JMS的消息确认和消息过滤等特殊功能,不过开发者可以基于Fourinone去扩充自已的这些功能,包括MQ集群。如果需要事务处理可以将多个消息封装在一个集合内进行发送,上面的队列接收者收到消息后删除实际上是一种消息确认方式,也可以将业务逻辑处理完后再进行删除。如果需要持久保存消息可以在封装一层消息发送者,发送前后根据需要进行数据库或者文件持久保存。利用一个独立的Domain/Node建立队列或者主题的key隐射,再仿照上面分布式缓存的智能根据key定位服务器的做法实现集群管理。