java实现memcache服务器的示例代码

时间:2022-04-20 05:47:16

什么是memcache

memcache集群环境下缓存解决方案

memcache是一个高性能的分布式的内存对象缓存系统,通过在内存里维护一个统一的巨大的hash表,它能够用来存储各种格式的数据,包括图像、视频、文件以及数据库检索的结果等。简单的说就是将数据调用到内存中,然后从内存中读取,从而大大提高读取速度。  

 memcache是danga的一个项目,最早是livejournal 服务的,最初为了加速 livejournal 访问速度而开发的,后来被很多大型的网站采用。  

memcached是以守护程序方式运行于一个或多个服务器中,随时会接收客户端的连接和操作

为什么会有memcache和memcached两种名称?

其实memcache是这个项目的名称,而memcached是它服务器端的主程序文件名,知道我的意思了吧。一个是项目名称,一个是主程序文件名,在网上看到了很多人不明白,于是混用了。 

memcached是高性能的,分布式的内存对象缓存系统,用于在动态应用中减少数据库负载,提升访问速度。memcached由danga interactive开发,用于提升livejournal.com访问速度的。lj每秒动态页面访问量几千次,用户700万。memcached将数据库负载大幅度降低,更好的分配资源,更快速访问。

这篇文章将会涉及以下内容:

  1. java socket多线程服务器
  2. java io
  3. concurrency
  4. memcache特性和协议

memcache

memcache is an in-memory key-value store for small chunks of arbitrary data (strings, objects) from results of databasecalls, api calls, or page rendering.

即内存缓存数据库,是一个键值对数据库。该数据库的存在是为了将从其他服务中获取的数据暂存在内存中,在重复访问时可以直接从命中的缓存中返回。既加快了访问速率,也减少了其他服务的负载。这里将实现一个单服务器版本的memcache,并且支持多个客户端的同时连接。

客户端将与服务器建立telnet连接,然后按照memcache协议与服务器缓存进行交互。这里实现的指令为get,set和del。先来看一下各个指令的格式

set

set属于存储指令,存储指令的特点时,第一行输入基本信息,第二行输入其对应的value值。

set <key> <flags> <exptime> <bytes> [noreply]\r\n
<value>\r\n

如果存储成功,将会返回stored,如果指令中包含noreply属性,则服务器将不会返回信息。

该指令中每个域的内容如下:

  1. key: 键
  2. flags: 16位无符号整数,会在get时随键值对返回
  3. exptime: 过期时间,以秒为单位
  4. bytes:即将发送的value的长度
  5. noreply:是否需要服务器响应,为可选属性
  6.  

如果指令不符合标准,服务器将会返回error。

get

get属于获取指令,该指令特点如下:

get <key>*\r\n

它支持传入多个key的值,如果缓存命中了一个或者多个key,则会返回相应的数据,并以end作为结尾。如果没有命中,则返回的消息中不包含该key对应的值。格式如下:

?
1
2
3
4
5
6
value <key> <flags> <bytes>\r\n
<data block>\r\n
value <key> <flags> <bytes>\r\n
<data block>\r\n
end
del

删除指令,该指令格式如下:

?
1
del <key> [noreply]\r\n

如果删除成功,则返回deleted\r\n,否则返回not_found。如果有noreply参数,则服务器不会返回响应。

java socket

java socket需要了解的只是包括tcp协议,套接字,以及io流。这里就不详细赘述,可以参考我的这系列文章,也建议去阅读java network programming。一书。

代码实现

这里贴图功能出了点问题,可以去文末我的项目地址查看类图。

这里采用了指令模式和工厂模式实现指令的创建和执行的解耦。指令工厂将会接收commandline并且返回一个command实例。每一个command都拥有execute方法用来执行各自独特的操作。这里只贴上del指令的特殊实现。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * 各种指令
 * 目前支持get,set,delete
 *
 * 以及自定义的
 * error,end
 */
public interface command {
 
  /**
   * 执行指令
   * @param reader
   * @param writer
   */
  void execute(reader reader, writer writer);
 
  /**
   * 获取指令的类型
   * @return
   */
  commandtype gettype();
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * 指令工厂 单一实例
 */
public class commandfactory {
 
  private static commandfactory commandfactory;
  private static cache<item> memcache;
  private commandfactory(){}
 
  public static commandfactory getinstance(cache<item> cache) {
    if (commandfactory == null) {
      commandfactory = new commandfactory();
      memcache = cache;
    }
    return commandfactory;
  }
 
  /**
   * 根据指令的类型获取command
   * @param commandline
   * @return
   */
  public command getcommand(string commandline){
    if (commandline.matches("^set .*$")){
      return new setcommand(commandline, memcache);
    }else if (commandline.matches("^get .*$")){
      return new getcommand(commandline, memcache);
    }else if (commandline.matches("^del .*$")){
      return new deletecommand(commandline, memcache);
    }else if (commandline.matches("^end$")){
      return new endcommand(commandline);
    }else{
      return new errorcommand(commandline, errorcommand.errortype.error);
    }
  }
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
 * 删除缓存指令
 */
public class deletecommand implements command{
 
  private final string command;
  private final cache<item> cache;
 
  private string key;
  private boolean noreply;
  public deletecommand(final string command, final cache<item> cache){
    this.command = command;
    this.cache = cache;
    initcommand();
  }
 
  private void initcommand(){
    if (this.command.contains("noreply")){
      noreply = true;
    }
    string[] info = command.split(" ");
    key = info[1];
  }
 
  @override
  public void execute(reader reader, writer writer) {
    bufferedwriter bfw = (bufferedwriter) writer;
    item item = cache.delete(key);
    if (!noreply){
      try {
        if (item == null){
          bfw.write("not_found\r\n");
        }else {
          bfw.write("deleted\r\n");
        }
        bfw.flush();
      } catch (ioexception e) {
        try {
          bfw.write("error\r\n");
          bfw.flush();
        } catch (ioexception e1) {
          e1.printstacktrace();
        }
        e.printstacktrace();
      }
    }
 
 
  }
 
  @override
  public commandtype gettype() {
    return commandtype.search;
  }
}

然后是实现内存服务器,为了支持先进先出功能,这里使用了linkedtreemap作为底层实现,并且重写了removeoldest方法。同时还使用cachemanager的后台线程及时清除过期的缓存条目。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class memcache implements cache<item>{
  private logger logger = logger.getlogger(memcache.class.getname());
  //利用linkedhashmap实现lru
  private static linkedhashmap<string, item> cache;
  private final int maxsize;
  //负载因子
  private final float default_load_factor = 0.75f;
  public memcache(final int maxsize){
    this.maxsize = maxsize;
    //确保cache不会在达到maxsize之后自动扩容
    int capacity = (int) math.ceil(maxsize / default_load_factor) + 1;
 
    this.cache = new linkedhashmap<string, item>(capacity, default_load_factor, true){
      @override
      protected boolean removeeldestentry(map.entry<string,item> eldest) {
        if (size() > maxsize){
          logger.info("缓存数量已经达到上限,会删除最近最少使用的条目");
        }
        return size() > maxsize;
      }
    };
 
    //实现同步访问
    collections.synchronizedmap(cache);
  }
 
  public synchronized boolean isfull(){
    return cache.size() >= maxsize;
  }
 
  @override
  public item get(string key) {
    item item = cache.get(key);
 
    if (item == null){
      logger.info("缓存中key:" + key + "不存在");
      return null;
    }else if(item!=null && item.isexpired()){ //如果缓存过期则删除并返回null
      logger.info("从缓存中读取key:" + key + " value:" + item.getvalue() + "已经失效");
      cache.remove(key);
      return null;
    }
 
    logger.info("从缓存中读取key:" + key + " value:" + item.getvalue() + " 剩余有效时间" + item.remaintime());
    return item;
  }
 
  @override
  public void set(string key, item value) {
    logger.info("向缓存中写入key:" + key + " value:" + value);
    cache.put(key, value);
  }
 
  @override
  public item delete(string key) {
    logger.info("从缓存中删除key:" + key);
    return cache.remove(key);
  }
 
  @override
  public int size(){
    return cache.size();
  }
 
  @override
  public int capacity() {
    return maxsize;
  }
 
  @override
  public iterator<map.entry<string, item>> iterator() {
    return cache.entryset().iterator();
  }
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * 缓存管理器
 * 后台线程
 * 将cache中过期的缓存删除
 */
public class cachemanager implements runnable {
 
  private logger logger = logger.getlogger(cachemanager.class.getname());
 
  //缓存
  public cache<item> cache;
 
  public cachemanager(cache<item> cache){
    this.cache = cache;
  }
 
 
  @override
  public void run() {
    while (true){
      iterator<map.entry<string, item>> itemiterator = cache.iterator();
      while (itemiterator.hasnext()){
        map.entry<string, item> entry = itemiterator.next();
        item item = entry.getvalue();
        if(item.isexpired()){
          logger.info("key:" + entry.getkey() + " value" + item.getvalue() + " 已经过期,从数据库中删除");
          itemiterator.remove();
        }
      }
 
      try {
        //每隔5秒钟再运行该后台程序
        timeunit.seconds.sleep(5);
      } catch (interruptedexception e) {
        e.printstacktrace();
      }
 
    }
  }
}

最后是实现一个多线程的socket服务器,这里就是将serversocket绑定到一个接口,并且将accept到的socket交给额外的线程处理。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
 * 服务器
 */
public class ioserver implements server {
  private boolean stop;
  //端口号
  private final int port;
  //服务器线程
  private serversocket serversocket;
  private final logger logger = logger.getlogger(ioserver.class.getname());
  //线程池,线程容量为maxconnection
  private final executorservice executorservice;
  private final cache<item> cache;
  public ioserver(int port, int maxconnection, cache<item> cache){
    if (maxconnection<=0) throw new illegalargumentexception("支持的最大连接数量必须为正整数");
    this.port = port;
    executorservice = executors.newfixedthreadpool(maxconnection);
    this.cache = cache;
  }
 
  @override
  public void start() {
    try {
      serversocket = new serversocket(port);
      logger.info("服务器在端口"+port+"上启动");
      while (true){
        try {
          socket socket = serversocket.accept();
          logger.info("收到"+socket.getlocaladdress()+"的连接");
          executorservice.submit(new sockethandler(socket, cache));
        } catch (ioexception e) {
          e.printstacktrace();
        }
      }
    } catch (ioexception e) {
      logger.log(level.warning, "服务器即将关闭...");
      e.printstacktrace();
    } finally {
      executorservice.shutdown();
      shutdown();
    }
 
 
  }
 
  /**
   * 服务器是否仍在运行
   * @return
   */
  public boolean isrunning() {
    return !serversocket.isclosed();
  }
  /**
   * 停止服务器
   */
  public void shutdown(){
    try {
      if (serversocket!=null){
        serversocket.close();
      }
    } catch (ioexception e) {
      e.printstacktrace();
    }
  }
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
 * 处理各个客户端的连接
 * 在获得end指令后关闭连接s
 */
public class sockethandler implements runnable{
 
  private static logger logger = logger.getlogger(sockethandler.class.getname());
 
  private final socket socket;
 
  private final cache<item> cache;
 
  private boolean finish;
 
 
  public sockethandler(socket s, cache<item> cache){
    this.socket = s;
    this.cache = cache;
  }
 
  @override
  public void run() {
    try {
      //获取socket输入流
      final bufferedreader reader = new bufferedreader(new inputstreamreader(socket.getinputstream()));
      //获取socket输出流
      final bufferedwriter writer = new bufferedwriter(new outputstreamwriter(socket.getoutputstream()));
 
      commandfactory commandfactory = commandfactory.getinstance(cache);
 
      while (!finish){
        final string commandline = reader.readline();
        logger.info("ip:" + socket.getlocaladdress() + " 指令:" + commandline);
 
        if (commandline == null || commandline.trim().isempty()) {
          continue;
        }
 
        //使用指令工厂获取指令实例
        final command command = commandfactory.getcommand(commandline);
        command.execute(reader, writer);
 
        if (command.gettype() == commandtype.end){
          logger.info("请求关闭连接");
          finish = true;
        }
      }
    } catch (ioexception e) {
      e.printstacktrace();
      logger.info("关闭来自" + socket.getlocaladdress() + "的连接");
    } finally {
      try {
        if (socket != null){
          socket.close();
        }
      } catch (ioexception e) {
        e.printstacktrace();
      }
    }
  }
}

项目地址请戳这里,如果觉得还不错的话,希望能给个星哈><

参考资料

memcached官网
memcache协议

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://segmentfault.com/a/1190000014215001