4.2 大型分布式缓存系统实现过程

我们接着上一节的思路,如果是大型网站的缓存,单台ParkServer的压力不能承受,需要建立多台CacheServer,并使用CacheFacade进行负载均衡。CacheFacade会根据key自动寻找存储它的CacheServer,数据在多台CacheServer上是均匀分布的,虽然每台CacheServer的数据都不一样,但是每台CacheServer都可以有自己的备份服务器,CacheServer出现故障时,几乎实时就能切换到备份服务器处理请求,所以既能保证高性能又能保证高可靠。改进后的架构如图4-2所示。

4.2 大型分布式缓存系统实现过程 - 图1

图4-2 大型分布式缓存系统实现架构

Fourinone提供了FacadeServer的解决方案去解决大集群的分布式缓存,利用硬件负载均衡路由到一组Facade服务器上,Facade可以自动为缓存内容生成key,并根据key准确找到散落在背后的缓存集群的具体哪台服务器,当缓存服务器的容量到达限制时,可以自由扩容,不需要成倍扩容,因为Facade的算法会登记服务器扩容时间版本,并将key智能地跟这个时间匹配,这样在扩容后还能准确找到之前分配到的服务器。我们在4.4节还会详细介绍日期key取模算法,并介绍如何基于分布式缓存实现一个Session系统。

Facade服务器部署在一系列的服务器上,对应图4-2所示的架构图里的FacadeServer,缓存服务器对应图4-2中的CacheServer。

FacadeServer是一组相同的软件服务器,它们彼此可互相替换,也可以自由增加或减少,具体取决于请求读写量的高峰期规模,FacadeServer的作用是根据读写请求自动判断数据落到后端哪台CacheServer上,它在完成这个匹配计算的过程中不需要借助其他任何服务器,因此它不会有瓶颈限制。另外,FacadeServer并不限制只跟后端集群中的某几台CacheServer发生交互,它可以跟任何CacheServer交互,而且它有很强的故障处理能力和高可用性。当它准备交互的CacheServer主机出现故障时,它会转而请求备机,如果主机和备机都出现故障时,它会重新选择其他的CacheServer直到成功,并且当CacheServer扩充时FacadeServer也能准确判断新数据的写和旧数据的读归属哪台CacheServer。在物理上,每个FacadeServer独立部署在一台计算机服务器上。

CacheServer是一个可以水平扩充的cache集群,它可以根据缓存规模增大而增大,它的扩容是任意的,没有规则限制,可以每次扩容一台至多台。cache集群由多个单元组成,缓存的数据被FacadeServer水平切割存储在不同的单元里,虽然每个单元保存的数据都不同,但是每个单元都是主备结构,主备之间的数据是相同并且同步的。在物理上,每个CacheServer主备独立部署一台计算机服务器。

接下来的DEMO同时演示了小型缓存和大型缓存的使用:

❏ CachePutDemo:先将100条数据分布式存储在A、B、C、三台缓存Server中,然后再将这100条数据的key保存在ParkServer的小型缓存中。

❏ CacheGetDemo:先将100条数据的key从ParkServer中取出,再根据key从分布式缓存的A,B,C三台Server中取出。

运行步骤如下:

1)启动3个CacheServer进程,每个输入的参数分别为A、B、C:

  1. java -cp fourinone.jar; CacheServer A
  2. java -cp fourinone.jar; CacheServer B
  3. java -cp fourinone.jar; CacheServer C

2)启动完的结果如图4-3所示,三个CacheServer已经准备就绪。

4.2 大型分布式缓存系统实现过程 - 图2

图4-3 CacheServer

3)启动ParkServerDemo(它的IP端口已经在配置文件指定):

  1. java -cp fourinone.jar; ParkServerDemo

4)启动好一个新的ParkServer进程,如图4-4所示。

4.2 大型分布式缓存系统实现过程 - 图3

图4-4 ParkServerDemo

5)启动CacheFacadeDemo(它的IP端口已经在配置文件指定):

  1. java -cp fourinone.jar; CacheFacadeDemo

6)启动好一个FacadeServer如图4-5所示。

4.2 大型分布式缓存系统实现过程 - 图4

图4-5 CacheFacadeDemo

在三个CacheServer(用于分布式缓存)、1个ParkServer(用于小型缓存)、1个FacadeServer(用于负载均衡)启动完成后,我们已经搭建好一个分布式缓存DEMO,接下来我们就可以使用客户端写入和读取数据了。

7)运行CachePutDemo:

  1. java -cp fourinone.jar; CachePutDemo

插入100条数据到CacheServer中,并将返回的key保存到ParkServer,然后程序完成退出,如图4-6所示。

4.2 大型分布式缓存系统实现过程 - 图5

图4-6 CachePutDemo

8)运行CacheGetDemo:

  1. java -cp fourinone.jar; CacheGetDemo

可以看到,CacheGetDemo先从ParkServer里取出所有的key,再通过key到三个CacheServer上取出所有缓存的数据并显示出来,FacadeServer负责完成key到CacheServer的路由。如图4-7所示。

4.2 大型分布式缓存系统实现过程 - 图6

图4-7 CacheGetDemo

4.2 大型分布式缓存系统实现过程 - 图7注意

我们在CacheServer的代码里可以看到建立了三组主备关系的缓存服务器和端口(这里是在程序里指定了地址和端口,也可以在每个CacheServer各自config文件的CacheService部分配置)。

  1. String[][] cacheServerA = new
  2. String[][]{{"localhost","2000"},{"localhost","2001"}};
  3. String[][] cacheServerB = new
  4. String[][]{{"localhost","2002"},{"localhost","2003"}};
  5. String[][] cacheServerC = new
  6. String[][]{{"localhost","2004"},{"localhost","2005"}};

除此之外,FacadeServer还需要了解以上CacheServer加入集群的时间信息,我们找到config.xml配置里的CACHEGROUP部分:

  1. <PROPSROW DESC="CACHEGROUP">
  2. <STARTTIME>2010-01-01</STARTTIME>
  3. <GROUP> localhost:2000...@2010-01-01;localhost:2002...;localhost:2004...</GROUP>
  4. </PROPSROW>
  5. <PROPSROW DESC="CACHEGROUP">
  6. <STARTTIME>2018-05-01</STARTTIME>
  7. <GROUP>...</GROUP>
  8. </PROPSROW>

我们发现localhost:2000、localhost:2002、localhost:2004这三组CacheServer的加入集群时间都是在2010-01-01后2018-05-01前,它们应该属于第一组。这个分组配置信息是很重要的,会影响它们的key寻址路由。在后面的章节会在对照一致性哈希讲日期取模算法原理时,详细讲解日期分组的配置方法。

4.2 大型分布式缓存系统实现过程 - 图8提示

我们这里为了演示只有一个FacadeServer,如果有多个FacadeServer,它们各自的config.xml里面的CACHEGROUP配置要保持一致。

DEMO源码如下:

  1. // ParkServerDemo
  2. import com.fourinone.BeanContext;
  3. public class ParkServerDemo
  4. {
  5. public static void main(String[] args)
  6. {
  7. BeanContext.startPark();
  8. }
  9. }
  10.  
  11. // CacheFacadeDemo
  12. import com.fourinone.BeanContext;
  13. public class CacheFacadeDemo
  14. {
  15. public static void main(String[] args)
  16. {
  17. BeanContext.startCacheFacade();
  18. System.out.println("CacheFacade is ok...");
  19. }
  20. }
  21.  
  22. // CacheServer
  23. import com.fourinone.BeanContext;
  24. public class CacheServer
  25. {
  26. public static void main(String[] args)
  27. {
  28. String[][] cacheServerA = new String[][]{{"localhost","2000"},{"localhost"
  29. "2001"}};
  30. String[][] cacheServerB = new String[][]{{"localhost""2002"},
  31. {"localhost""2003"}};
  32. String[][] cacheServerC = new String[][]{{"localhost""2004"},
  33. {"localhost""2005"}};
  34.  
  35. String[][] cacheServer = null;
  36. if(args[0].equals("A"))
  37. cacheServer = cacheServerA;
  38. else if(args[0].equals("B"))
  39. cacheServer = cacheServerB;
  40. else if(args[0].equals("C"))
  41. cacheServer = cacheServerC;
  42.  
  43. BeanContext.startCache(cacheServer[0][0],Integer.parseInt(cacheServer[0] [1]),cacheServer);
  44. //如果使用配置文件配置地址端口,则使用BeanContext.startCache();启动
  45. }
  46. }
  47.  
  48. // CachePutDemo
  49. import com.fourinone.BeanContext;
  50. import com.fourinone.ParkLocal;
  51. import com.fourinone.CacheLocal;
  52.  
  53. public class CachePutDemo
  54. {
  55. public static void putSmallCache(String[] keyArray)
  56. {
  57. ParkLocal pl = BeanContext.getPark();
  58. pl.create("cache", "keyArray", keyArray);
  59. }
  60.  
  61. public static String[] putBigCache()
  62. {
  63. CacheLocal cc = BeanContext.getCache();
  64. String[] keyArray = new String[100];
  65. for(int i=0;i<100;i++)
  66. keyArray[i] = cc.add("key", "value"+i);
  67. return keyArray;
  68. }
  69.  
  70. public static void main(String[] args){
  71. String[] keyArray = putBigCache();
  72. putSmallCache(keyArray);
  73. }
  74. }
  75.  
  76. // CacheGetDemo
  77. import com.fourinone.BeanContext;
  78. import com.fourinone.ParkLocal;
  79. import com.fourinone.CacheLocal;
  80.  
  81. public class CacheGetDemo
  82. {
  83. public static String[] getSmallCache()
  84. {
  85. ParkLocal pl = BeanContext.getPark();
  86. return (String[])pl.get("cache", "keyArray").toObject();
  87. }
  88.  
  89. public static void getBigCache(String[] keyArray)
  90. {
  91. CacheLocal cc = BeanContext.getCache();
  92. for(String k:keyArray)
  93. System.out.println(cc.get(k, "key"));
  94. }
  95.  
  96. public static void main(String[] args){
  97. String[] keyArray = getSmallCache();
  98. getBigCache(keyArray);
  99. }
  100. }