5.3 如何实现发送接收的队列模式

我们可以将Domain视为MQ队列,每个node为一个队列消息,检查Domain的变化来获取队列消息。

❏ Sender:是一个队列发送者,它发送消息的实现是在queue上创建一个匿名节点来存放消息

  1. pl.create(queue, (Serializable)obj);

❏ Receiver:是一个队列接收者,他轮循queue上有没有最新消息,有就取出,并删除该节点,注意它是每次获取第一个消息,这样保证消息读取的顺序。如图5-1所示。

5.3 如何实现发送接收的队列模式 - 图1

图5-1 发送接收模式实现

运行步骤:

1)启动ParkServerDemo(它的IP端口已经在配置文件指定),结果如图5-2所示:

  1. java -cp fourinone.jar; ParkServerDemo

5.3 如何实现发送接收的队列模式 - 图2

图5-2 ParkServerDemo

2)运行Sender,结果如图5-3所示:

  1. java -cp fourinone.jar; Sender

5.3 如何实现发送接收的队列模式 - 图3

图5-3 Sender

在Sender的程序里,往"queue1"队列里发了"hello"、"world"、"mq"三个消息。

3)运行Receiver,结果如图5-4所示:

  1. java -cp fourinone.jar; Receiver

5.3 如何实现发送接收的队列模式 - 图4

图5-4 Receiver

可以看到,按照发送的先后顺序,依次收到"hello"、"world"、"mq"三个消息。

我们打开Receiver程序可以看到:

  1. while(true)
  2. {
  3. oblist = pl.get(queue);
  4. if(oblist!=null)
  5. {
  6. ObjectBean ob = oblist.get(0);
  7. pl.delete
  8. break;

Receiver实际上轮循检查队列是否有消息,有的话每次取出所有队列的消息list,然后再取出第一个消息并删除,或许这样不是最高效,因为如果消息队列的消息太多,一次性取出队列所有消息会影响性能,但是这里旨在演示清楚MQ接收消息的实现,开发者明白了实现机制可以发挥自己的创造性,根据自己需求的消息特点针对性的设计MQ发送接收的实现,比如模仿商业MQ对每个消息队列的容量做一个限制等等。

完整demo源码如下:

  1. // ParkServerDemo
  2. import com.fourinone.BeanContext;
  3. public class ParkServerDemo
  4. {
  5. public static void main(String[] args)
  6. {
  7. BeanContext.startPark();
  8. }
  9. }
  10.  
  11. // Sender
  12. import com.fourinone.BeanContext;
  13. import com.fourinone.ParkLocal;
  14. import com.fourinone.ObjectBean;
  15. import java.io.Serializable;
  16.  
  17. public class Sender
  18. {
  19. private static ParkLocal pl = BeanContext.getPark();
  20.  
  21. public static void send(String queue, Object obj)
  22. {
  23. pl.create(queue, (Serializable)obj);
  24. }
  25.  
  26. public static void main(String[] args)
  27. {
  28. send("queue1","hello");
  29. send("queue1","world");
  30. send("queue1","mq");
  31. }
  32. }
  33.  
  34. // Receiver
  35. import com.fourinone.BeanContext;
  36. import com.fourinone.ParkLocal;
  37. import com.fourinone.ObjectBean;
  38. import java.util.List;
  39.  
  40. public class Receiver
  41. {
  42. private static ParkLocal pl = BeanContext.getPark();
  43.  
  44. public static Object receive(String queue)
  45. {
  46. Object obj=null;
  47. List<ObjectBean> oblist = null;
  48. while(true)
  49. {
  50. oblist = pl.get(queue);
  51. if(oblist!=null)
  52. {
  53. ObjectBean ob = oblist.get(0);
  54. obj = ob.toObject();
  55. pl.delete(ob.getDomain(), ob.getNode());
  56. break;
  57. }
  58. }
  59. return obj;
  60. }
  61.  
  62. public static void main(String[] args)
  63. {
  64. System.out.println(receive("queue1"));
  65. System.out.println(receive("queue1"));
  66. System.out.println(receive("queue1"));
  67. }
  68. }