5.4 如何实现主题订阅模式
我们可以将Domain视为订阅主题,将每个订阅者注册到Domain的节点(Node)上,发布者将消息逐一更新每个节点,订阅者监控每个属于自己的节点的变化事件获取订阅消息,收到后清空内容等待下一个消息,多个消息用一个arraylist存放。
图5-5 主题订阅模式实现
❏ Publisher:是一个主题发布者,它通过pl.get(topic)获取topic(主题)的所有订阅者节点,并将消息更新到每个节点上,如果有多个追加到arraylist存放。
❏ Subscriber:是一个消息订阅者,他通过subscrib(String topic,String subscribeName,LastestListener lister)实现消息订阅,其中3个参数分别是主题名、订阅者名称、事件处理实现。Subscriber实现了LastestListener事件处理接口happenLastest(LastestEvent le),这个接口会传入更新的节点内容对象,然后Subscriber用一个空的arraylist清空内容,等待下一次接收消息。happenLastest有个boolean返回值,如果返回false,它会一直监控变化,继续有新的变化时还会进行事件调用;如果返回true,它完成本次事件调用后就终止。
运行步骤:
1)启动ParkServerDemo(它的IP端口已经在配置文件指定):
- java -cp fourinone.jar; ParkServerDemo
2)运行Subscriber,因为Subscriber可以有多个,传入不同的subscribeName参数代表不同的Subscriber,如图5-6所示。
- java -cp fourinone.jar; Subscriber aaa
- java -cp fourinone.jar; Subscriber bbb
- java -cp fourinone.jar; Subscriber ccc
图5-6 Subscriber
我们启动了3个订阅者,名称依次为aaa、bbb、ccc,订阅者启动好后处于事件监听状态,等待发布者投递消息,如图5-6所示。
3)运行Publisher,结果如图5-7所示。
- java -cp fourinone.jar; Publisher
图5-7 Publisher
运行Publisher开始投递消息,投递完成后Publisher退出,我们看看各个订阅者的窗口显示如图5-8所示。
图5-8 订阅消息结果
可以看到,3个订阅者都显示出收到了发布者的"hello world"的消息。
完整demo源码如下:
- // ParkServerDemo
- import com.fourinone.BeanContext;
- public class ParkServerDemo
- {
- public static void main(String[] args)
- {
- BeanContext.startPark();
- }
- }
- // Subscriber
- import com.fourinone.BeanContext;
- import com.fourinone.ParkLocal;
- import com.fourinone.ObjectBean;
- import com.fourinone.LastestEvent;
- import com.fourinone.LastestListener;
- import java.util.ArrayList;
- public class Subscriber implements LastestListener
- {
- private static ParkLocal pl = BeanContext.getPark();
- public boolean happenLastest(LastestEvent le)
- {
- ObjectBean ob = (ObjectBean)le.getSource();
- ArrayList arr = (ArrayList)ob.toObject();
- System.out.println("published message:"+arr);
- ObjectBean newob = pl.update(ob.getDomain(), ob.getNode(), new ArrayList());
- le.setSource(newob);
- return false;
- }
- public static void subscrib(String topic, String subscribeName, LastestListener lister)
- {
- ArrayList arr = new ArrayList();
- ObjectBean ob = pl.create(topic, subscribeName, arr);
- pl.addLastestListener(topic, subscribeName, ob, lister);
- }
- public static void main(String[] args)
- {
- subscrib("topic1", args[0], new Subscriber());
- }
- }
- // Publisher
- import com.fourinone.BeanContext;
- import com.fourinone.ParkLocal;
- import com.fourinone.ObjectBean;
- import java.util.ArrayList;
- import java.util.List;
- public class Publisher
- {
- private static ParkLocal pl = BeanContext.getPark();
- public static Object publish(String topic, Object obj)
- {
- List<ObjectBean> oblist = pl.get(topic);
- if(oblist!=null)
- {
- for(ObjectBean ob:oblist)
- {
- ArrayList arr = (ArrayList)ob.toObject();
- arr.add(obj);
- pl.update(ob.getDomain(), ob.getNode(), arr);
- }
- }else return null;
- return obj;
- }
- public static void main(String[] args)
- {
- publish("topic1", "helloworld");
- }
- }
Fourinone不实现JMS的规范,不提供JMS的消息确认和消息过滤等特殊功能,不过开发者可以基于Fourinone去扩充自已的这些功能,包括MQ集群。如果需要事务处理可以将多个消息封装在一个集合内进行发送,上面的队列接收者收到消息后删除实际上是一种消息确认方式,也可以将业务逻辑处理完后再进行删除。如果需要持久保存消息可以在封装一层消息发送者,发送前后根据需要进行数据库或者文件持久保存。利用一个独立的Domain/Node建立队列或者主题的key隐射,再仿照上面分布式缓存的智能根据key定位服务器的做法实现集群管理。