5.3 如何实现发送接收的队列模式
我们可以将Domain视为MQ队列,每个node为一个队列消息,检查Domain的变化来获取队列消息。
❏ Sender:是一个队列发送者,它发送消息的实现是在queue上创建一个匿名节点来存放消息
- pl.create(queue, (Serializable)obj);
❏ Receiver:是一个队列接收者,他轮循queue上有没有最新消息,有就取出,并删除该节点,注意它是每次获取第一个消息,这样保证消息读取的顺序。如图5-1所示。
图5-1 发送接收模式实现
运行步骤:
1)启动ParkServerDemo(它的IP端口已经在配置文件指定),结果如图5-2所示:
- java -cp fourinone.jar; ParkServerDemo
图5-2 ParkServerDemo
2)运行Sender,结果如图5-3所示:
- java -cp fourinone.jar; Sender
图5-3 Sender
在Sender的程序里,往"queue1"队列里发了"hello"、"world"、"mq"三个消息。
3)运行Receiver,结果如图5-4所示:
- java -cp fourinone.jar; Receiver
图5-4 Receiver
可以看到,按照发送的先后顺序,依次收到"hello"、"world"、"mq"三个消息。
我们打开Receiver程序可以看到:
- while(true)
- {
- oblist = pl.get(queue);
- if(oblist!=null)
- {
- ObjectBean ob = oblist.get(0);
- …
- pl.delete…
- break;
Receiver实际上轮循检查队列是否有消息,有的话每次取出所有队列的消息list,然后再取出第一个消息并删除,或许这样不是最高效,因为如果消息队列的消息太多,一次性取出队列所有消息会影响性能,但是这里旨在演示清楚MQ接收消息的实现,开发者明白了实现机制可以发挥自己的创造性,根据自己需求的消息特点针对性的设计MQ发送接收的实现,比如模仿商业MQ对每个消息队列的容量做一个限制等等。
完整demo源码如下:
- // ParkServerDemo
- import com.fourinone.BeanContext;
- public class ParkServerDemo
- {
- public static void main(String[] args)
- {
- BeanContext.startPark();
- }
- }
- // Sender
- import com.fourinone.BeanContext;
- import com.fourinone.ParkLocal;
- import com.fourinone.ObjectBean;
- import java.io.Serializable;
- public class Sender
- {
- private static ParkLocal pl = BeanContext.getPark();
- public static void send(String queue, Object obj)
- {
- pl.create(queue, (Serializable)obj);
- }
- public static void main(String[] args)
- {
- send("queue1","hello");
- send("queue1","world");
- send("queue1","mq");
- }
- }
- // Receiver
- import com.fourinone.BeanContext;
- import com.fourinone.ParkLocal;
- import com.fourinone.ObjectBean;
- import java.util.List;
- public class Receiver
- {
- private static ParkLocal pl = BeanContext.getPark();
- public static Object receive(String queue)
- {
- Object obj=null;
- List<ObjectBean> oblist = null;
- while(true)
- {
- oblist = pl.get(queue);
- if(oblist!=null)
- {
- ObjectBean ob = oblist.get(0);
- obj = ob.toObject();
- pl.delete(ob.getDomain(), ob.getNode());
- break;
- }
- }
- return obj;
- }
- public static void main(String[] args)
- {
- System.out.println(receive("queue1"));
- System.out.println(receive("queue1"));
- System.out.println(receive("queue1"));
- }
- }