Guava cache源码解析

时间:2023-04-03 18:08:15


下面源码地址为:https://gitee.com/lidishan/guava-code-analysis/blob/master/guava/src/com/google/common/cache/LocalCache.java

涉及依赖

<dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <version>29.0-jre</version>
   </dependency>
    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>failureaccess</artifactId>
       <version>1.0.1</version>
   </dependency>

数据结构图

Guava cache源码解析

初始化Guava Cache缓存对象方式

初始化Guava Cache有两种方式:

第一种:loader方式。初始化之后只能固定调用load()方法加载数据。例子如下:

// 第一种:loader方式
LoadingCache<Object, Object> cache = CacheBuilder.newBuilder()
        .build(new CacheLoader<Object, Object>() {
            @Override
            public Object load(Object name) {
                // 在cache找不到就从load()方法中取数据
                return String.format("重新load(%s):%s", System.currentTimeMillis(), name);
            }
        });
cache.get("a");

第二种:callable方式。初始化之后,可以通过在get、put()方法参数里面执行callable参数进行加载,限制更小。例子如下:

Cache<Object, Object> cache = CacheBuilder.newBuilder().build();
String key = "a";
// 跟loader的最大区别是callable可以直接在不同的get、put定制自己的返回方式,限制更少。
cache.get(key, new Callable<String>() {
            @Override
            public String call() throws Exception {
                // 取不到数据就从callable里面的get()方法获取
                return "aaa:" + key;
            }
        });

调用参数例子

LoadingCache<Object, Object> userCache = CacheBuilder.newBuilder()
        // 基于容量回收。缓存的最大数量。超过就取MAXIMUM_CAPACITY = 1 << 30。依靠LRU队列recencyQueue来进行容量淘汰
        .maximumSize(1000)
        // 基于容量回收。但这是统计占用内存大小,maximumWeight与maximumSize不能同时使用。设置最大总权重
        .maximumWeight(1000)
        // 设置权重(可当成每个缓存占用的大小)
        .weigher((o, o2) -> 5)
        // 软弱引用(引用强度顺序:强软弱虚)
        // -- 弱引用key
        .weakKeys()
        // -- 弱引用value
        .weakValues()
        // -- 软引用value
        .softValues()
        // 过期失效回收
        // -- 没读写访问下,超过5秒会失效(非自动失效,需有任意getput方法才会扫描过期失效数据)
        .expireAfterAccess(5L, TimeUnit.SECONDS)
        // -- 没写访问下,超过5秒会失效(非自动失效,需有任意putget方法才会扫描过期失效数据)
        .expireAfterWrite(5L, TimeUnit.SECONDS)
        // 没写访问下,超过5秒会失效(非自动失效,需有任意putget方法才会扫描过期失效数据。但区别是会开一个异步线程进行刷新,刷新过程中访问返回旧数据)
        .refreshAfterWrite(5L, TimeUnit.SECONDS)
        // 移除监听事件
        .removalListener(removal -> {
            // 可做一些删除后动作,比如上报删除数据用于统计
            System.out.printf("触发删除动作,删除的key=%s%n", removal);
        })
        // 并行等级。决定segment数量的参数,concurrencyLevel与maxWeight共同决定
        .concurrencyLevel(16)
        // 开启缓存统计。比如命中次数、未命中次数等
        .recordStats()
        // 所有segment的初始总容量大小
        .initialCapacity(512)
        // 用于测试,可任意改变当前时间。参考:https://www.geek-share.com/detail/2689756248.html
        .ticker(new Ticker() {
            @Override
            public long read() {
                    return 0;
                    }
        })
        .build(new CacheLoader<Object, Object>() {
            @Override
            public Object load(Object name) {
                    // 在cache找不到就取数据
                    return String.format("重新load(%s):%s", System.currentTimeMillis(), name);
                    }
        });
// 简单使用
userCache.put("a", "aaa");
System.out.println(userCache.get("a"));

淘汰方式

基于容量有三种方式。分别为:基于容量回收、定时回收、基于引用回收。

其中定时回收分为两种:按照写入时间,最早写入的最先回收;按照访问时间,最早访问的最早回收

淘汰方式-基于容量回收

有两种方式:

第一种:基于缓存数量决定回收。Xxx.maximumSize(1000),缓存数量超过1000即回收。

第二种:基于缓存大小决定回收。Xxx.maximumWeight(1000).weigher((o, o2) -> 5),每个缓存权重5,总权重超过1000就回收。

由于第二种方式我们很难判定一个value占据内存大小,所以一般使用基于缓存数量决定回收。Xxx.maximumSize(1000)。下面源码解析:

  • 触发基于容量数量回收的场景demo:
Cache<Object, Object> cache = CacheBuilder.newBuilder()
        // 缓存的最大数量,这里等于2。超过就取MAXIMUM_CAPACITY = 1 << 30。依靠LRU队列recencyQueue来进行容量淘汰
        .maximumSize(2).build();
cache.put("a", "aa");
cache.put("b", "bb");
cache.put("c", "cc");// 执行到第三个put的时候会根据LRU回收“a”
  • 接下来我们解析一下上面的代码是怎么进行回收的?

其中一条调用链路为:LocalCache#put() -> LocalCache#evictEntries() -> LocalCache#removeEntry()

put、get、replace之类的时候会进行基于容量的回收,这是不会自动定时回收的!!!

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    ....
    evictEntries(newEntry); 
    ....
}
void evictEntries(ReferenceEntry<K, V> newest) {
    if (!map.evictsBySize()) {// 如果一个元素都没就不回收处理了
        return;
    }
    // 把recencyQueue的元素放到access(如果在access存在的话)
    drainRecencyQueue();

    // If the newest entry by itself is too heavy for the segment, don't bother evicting
    // anything else, just that
    // 如果新entry大于最大权重,就进行移除
    if (newest.getValueReference().getWeight() > maxSegmentWeight) {
        if (!removeEntry(newest, newest.getHash(), RemovalCause.SIZE)) {
            throw new AssertionError();
        }
    }
    // 重点!!!!如果总权重大于最大权重(由于我们这里用的是基于maximumSize(1000),所以maxSegmentWeight=1000)
    // 重点!!!!如果总权重大于最大权重(由于我们这里用的是基于maximumSize(1000),所以maxSegmentWeight=1000)
    while (totalWeight > maxSegmentWeight) {
        // 获取accessQueue队列第一个权重大于零的数据(基于容量数量的方式,每个元素weight=1)
        ReferenceEntry<K, V> e = getNextEvictable();
        if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {
            throw new AssertionError();
        }
    }
}

淘汰方式-定时回收

定时回收是不会定时自动回收的(即没用额外的线程进行定时回收),所有的回收都需要外部的操作如get、put、replace等才会判断是否过期,然后触发回收。<br>

  • expireAfterXxxx:过期会加锁重新加载数据,其他所有线程访问都会被阻塞。
  • refreshAfterWrite:过期会加锁重新异步加载数据,原来的线程可能会返回旧值,但其他线程访问不会被阻塞。
  • 执行步骤:先判断expireAfterXxxx需不需要回收,再判断refreshAfterWrite需不需要回收
  • 优缺点对比:refreshAfterWrite异步刷新,性能比较好,适用于数据一致性要求不高的场景(事实上一般能用到本地缓存的,哪里还需要这么多强数据一致性,建议直接用这个)

定时回收有三个参数如下:

CacheBuilder.newBuilder()
    // -- 没读写访问下,超过5秒会失效(非自动失效,需有任意getput之类方法才会扫描过期失效数据)
    .expireAfterAccess(5L, TimeUnit.SECONDS)
    // -- 没写访问下,超过5秒会失效(非自动失效,需有任意put、replace之类方法才会扫描过期失效数据)
    .expireAfterWrite(5L, TimeUnit.SECONDS)
    // 没写访问下,超过5秒会失效(非自动失效,需有任意put、replace之类方法才会扫描过期失效数据。但区别是会开一个异步线程进行刷新,刷新过程中访问返回旧数据)
    .refreshAfterWrite(5L, TimeUnit.SECONDS)
    .build();

回收的逻辑代码如下:

/**
 * get方法主要分为三步:
 * - 第一步:先判断expireAfterXxxx设置的是否超时存活,如果存活就返回value不为空的
 * - 第二步:上面如果没超时,value就不为空,会判断refreshAfterWrite设置的参数,然后看是否需要异步刷新。如果判断到loading中就会返回旧值。
 * - 第三步:调用到lockedGetOrLoad(),如果第一步超时了,就会调用到这里。会加锁,然后判断是否loading来决定是否阻塞等待或者是直接获取数据。
 */
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
  checkNotNull(key);
  checkNotNull(loader);
  try {
    if (count != 0) { // read-volatile  count不为空说明还有元素可以读取
      // don't call getLiveEntry, which would ignore loading values
      // 获取对应下标的链表的第一个节点。然后遍历链表获取对应key的值
      ReferenceEntry<K, V> e = getEntry(key, hash);
      if (e != null) {
        long now = map.ticker.read();// 获取当前时间,纳秒
        // 获取value的值 根据expireAfterAccess和expireAfterWrite进行判断是否过期。如果没设置expireAfterXxxx或者设置了没过期,那就不会返回null
        // 第一步:!!!!!!!先判断expireAfterXxxx
        // 第一步:!!!!!!!先判断expireAfterXxxx
        V value = getLiveValue(e, now);// 拿到没过期存活的数据
        if (value != null) {// 终于发现一个正常值
          recordRead(e, now);// 记录值的方法时间accessTime,以及进最近使用使用队列recencyQueue
          statsCounter.recordHits(1);// 累加命中次数,用于计算命中率
          // 第二步:!!!!!!!expireAfterXxxx不需要处理,在判断是否需要处理refreshAfterWrite
          // 第二步:!!!!!!!expireAfterXxxx不需要处理,在判断是否需要处理refreshAfterWrite
          return scheduleRefresh(e, key, hash, value, now, loader);
        }
        // 走到这步,说明取出来的值 value == null, 可能是过期了,也有可能正在刷新
        ValueReference<K, V> valueReference = e.getValueReference();
        // 如果此时value正在loading,那么此时等待刷新结果
        if (valueReference.isLoading()) {
          return waitForLoadingValue(e, key, valueReference);
        }
      }
    }

    // at this point e is either null or expired;
    // 到了这步,e要么空要么超时,需要加锁进行加载
    // 第三步:调用到lockedGetOrLoad(),如果第一步超时了,就会调用到这里。会加锁,然后判断是否loading来决定是否阻塞等待或者是直接获取数据。
    // 第三步:调用到lockedGetOrLoad(),如果第一步超时了,就会调用到这里。会加锁,然后判断是否loading来决定是否阻塞等待或者是直接获取数据。
    return lockedGetOrLoad(key, hash, loader);
  } catch (ExecutionException ee) {
    Throwable cause = ee.getCause();
    if (cause instanceof Error) {
      throw new ExecutionError((Error) cause);
    } else if (cause instanceof RuntimeException) {
      throw new UncheckedExecutionException(cause);
    }
    throw ee;
  } finally {
    postReadCleanup();
  }
}
V scheduleRefresh(
    ReferenceEntry<K, V> entry,
    K key,
    int hash,
    V oldValue,
    long now,
    CacheLoader<? super K, V> loader) {
  // 开启了定时刷新(配置了refreshAfterWrite(n), n > 0)
  // && 当前时间 - 上次更新时间 > 刷新时间
  // && 除了不是LoadingValueReference
  if (map.refreshes()
      && (now - entry.getWriteTime() > map.refreshNanos)
      && !entry.getValueReference().isLoading()) {
    V newValue = refresh(key, hash, loader, true);// 进行刷新
    if (newValue != null) {
      return newValue;
    }
  }
  return oldValue;
}
V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {
  // 插入loadingValueReference,表示该值正在loading。其实就是告诉其他线程正在loading中。
  // - 有如下两种情况:
  // -- 第一种:expireAfterXxxx设置的时间过期了。发现正在loading中,会等阻塞等待loading结束在获取值
  // -- 第二种:expireAfterXxxx设置的时间没过期。发现正在loading中,会直接跳过异步刷新步骤, return oldValue;
  final LoadingValueReference<K, V> loadingValueReference =
      insertLoadingValueReference(key, hash, checkTime);
  if (loadingValueReference == null) {
    return null;
  }
  // 用future进行了阻塞
  ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
  if (result.isDone()) {
    try {
      return Uninterruptibles.getUninterruptibly(result);
    } catch (Throwable t) {
      // don't let refresh exceptions propagate; error was already logged
    }
  }
  return null;
}
/**
 * 清除过期的数据
 */
@GuardedBy("this")
void expireEntries(long now) {
  drainRecencyQueue();

  ReferenceEntry<K, V> e;
  // 清理write、access队列超时的元素
  while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
    if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
      throw new AssertionError();
    }
  }
  while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
    if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
      throw new AssertionError();
    }
  }
}

淘汰方式-基于引用回收(软弱引用容易被GC)

基于引用回收有三个入参参数:

弱引用key:get/put时发现有key被回收了就回收当前entry。Xxx.weakKeys()

弱引用value:get/put时发现有value被回收了就回收当前entry。Xxx.weakValues()

软引用value:get/put时发现有value被回收了就回收当前entry。Xxx.softValues()
设置弱引用key时会遍历keyReferenceQueue看是否需要回收,设置软弱引用value同样会遍历valueReferenceQueue,不过默认最多遍历16次。

  • 基于引用回收的场景demo:
Cache<Object, Object> cache = CacheBuilder.newBuilder()
        .weakKeys().build();
// 执行的时候扫描发现有的key被GC了,就把对应的entry干掉(不一定是“a”对应的entry key被回收了,其他扫描到就收掉。每次最多收16个)
cache.get("a", "aa");
  • 接下来我们解析一下上面的代码是怎么进行回收的?

其中一条调用链路为:LocalCache#put() -> LocalCache#preWriteCleanup()
-> LocalCache#runLockedCleanup(now) -> LocalCache#drainReferenceQueues()

put、get、replace之类的时候会进行基于容量的回收,这是不会自动定时回收的!!!

void runLockedCleanup(long now) {
  if (tryLock()) {
    try {
      // 清除非强key/value的entry
      drainReferenceQueues();
      // 清除过期的数据
      expireEntries(now); // calls drainRecencyQueue
      readCount.set(0);
    } finally {
      unlock();
    }
  }
}
void drainReferenceQueues() {
    // key非强引用类型,就进行回收下一步操作
    if (map.usesKeyReferences()) {
        drainKeyReferenceQueue();
    }
    // value非强引用类型,就进行回收下一步操作
    if (map.usesValueReferences()) {
        drainValueReferenceQueue();
    }
}
void drainKeyReferenceQueue() {
  Reference<? extends K> ref;
  int i = 0;
  // keyReferenceQueue只有设置了weakKeys()才会用到
  while ((ref = keyReferenceQueue.poll()) != null) {
    ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;
    map.reclaimKey(entry);// 会调用到removeValueFromChain进行各个步骤的移除
    if (++i == DRAIN_MAX) {// 单次清除的最大数量
      break;
    }
  }
}
void drainValueReferenceQueue() {
  Reference<? extends V> ref;
  int i = 0;
  // keyReferenceQueue只有设置了 weakValues()、softValues() 才会用到
  while ((ref = valueReferenceQueue.poll()) != null) {
    ValueReference<K, V> valueReference = (ValueReference<K, V>) ref;
    map.reclaimValue(valueReference);// 会调用到removeValueFromChain进行各个步骤的移除
    if (++i == DRAIN_MAX) {// 单次清除的最大数量
      break;
    }
  }
}

补充知识

创建entry方式

创建entry是通过枚举工厂来创建,步骤如下:

第一步:通过构造方法LocalCache()初始化entryFactory(这里是一个枚举工厂,具体实现见代码分析)

entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());

第二步:调用newEntry()方法通过entryFactory创建一个entry

其代码分析如下

/** 用于创建entry的工厂
* 能生产多种方式,其为 引用方式x访问方式 的笛卡儿积
* - 引用方式:strong、week、soft
* - 访问方式:access、write、access_write
*
* Factory used to create new entries.
 * * */
final EntryFactory entryFactory;
/**
 * 用特定的策略创建一个新的空entry,初始化容量和并发等级
 * Creates a new, empty map with the specified strategy, initial capacity and concurrency level.
 **/
LocalCache(
    CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader){
    ....
    // 用引用类型、是否访问队列、是否写队列 来做三元表达式+位运算 得出下标,拿到对应的工厂的枚举实例
    entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());
    ....
}
ReferenceEntry<K, V> newEntry(K key, int hash, @Nullable ReferenceEntry<K, V> next) {
    return map.entryFactory.newEntry(this, checkNotNull(key), hash, next);
}
/** 不同引用类型的工厂,感觉这样用枚举有点骚气   Look-up table for factories. */
static final EntryFactory[] factories = {
    STRONG,
    STRONG_ACCESS,
    STRONG_WRITE,
    STRONG_ACCESS_WRITE,
    WEAK,
    WEAK_ACCESS,
    WEAK_WRITE,
    WEAK_ACCESS_WRITE,
};
/**
 * 获取工厂(根据key的类型取)
 */
static EntryFactory getFactory(
    Strength keyStrength, boolean usesAccessQueue, boolean usesWriteQueue) {
    // 这个三元表达式的位运算有点巧妙,居然能最终定位到具体的工厂枚举
    // 以弱引用为起点,弱引用WEAK下标为4,即WEAK=0100,
    //      如果允许访问,就 | ACCESS_MASK,作用等同于+1
    //      如果允许写,就 | WRITE_MASK,作用等同于+2
    // 备注:能用这个方式的前提:第一个元素与后面没冲突,比如WEAK=0100,他的低2为都为0,可以直接用“|”实现累加的效果
    int flags = ((keyStrength == Strength.WEAK) ? WEAK_MASK : 0)// WEAK_MASK=0100
        | (usesAccessQueue ? ACCESS_MASK : 0)// ACCESS_MASK:0001
        | (usesWriteQueue ? WRITE_MASK : 0);// WRITE_MASK:0010
    return factories[flags];
}

移除监听事件

移除监听事件。见名知意就是entry被移除的时候会触发,然后在这里我们写对应的业务处理逻辑即可。


使用方式(下面entry移除的时候就会调用里面的方法System.out.printf(“触发删除动作,删除的key=%s%n”, removal)):

Cache<Object, Object> cache = CacheBuilder.newBuilder()
    // 移除监听事件
    .removalListener(removal -> {
        // 可做一些删除后动作,比如上报删除数据用于统计
        System.out.printf("触发删除动作,删除的key=%s%n", removal);
    }).build();

其实现过程为:

第一步:guava cache维护了一个removalNotificationQueue队列,在原有的entry被搞掉就会把数据压入removalNotificationQueue队列中。

具体触发入队的 的移除枚举是RemovalCause(这几种事件会触发移除监听队列入队),含:超时、容量、回收、替换、手动干掉

第二步:执行完程序后会在finally调用例行清理方法或执行其他缓存清除也会触发到removalNotificationQueue.poll()出队。

/**
 * 移除的具体事件枚举:超时、容量、回收、替换、手动干掉 
 */
public enum RemovalCause {
  EXPLICIT {// 业务调用缓存清除相关方法
    @Override
    boolean wasEvicted() {
      return false;
    }
  },
  REPLACED {// 替换
    @Override
    boolean wasEvicted() {
      return false;
    }
  },
  COLLECTED {// key或value被GC回收
    @Override
    boolean wasEvicted() {
      return true;
    }
  },
  EXPIRED {// 超时
    @Override
    boolean wasEvicted() {
      return true;
    }
  },
  SIZE {// 容量不够
    @Override
    boolean wasEvicted() {
      return true;
    }
  }
  abstract boolean wasEvicted();
}
/**
 * 这里是移除调用到的入队方法=======================================
 * 这里是移除调用到的入队方法=======================================
 * 把需要处理的消息压入map.removalNotificationQueue。然后会有对应的回调方法进行处理
 */
@GuardedBy("this")
void enqueueNotification(
    @Nullable K key, int hash, @Nullable V value, int weight, RemovalCause cause) {
  totalWeight -= weight;// 扣减权重
  if (cause.wasEvicted()) {// 如果是自动移除,就返回true,即cause=COLLECTED、EXPIRED、SIZE
    statsCounter.recordEviction();// 自增回收数量 evictionCount.increment()
  }
  // 如果不是废弃队列,就压入移除调用回调通知队列中
  //      (默认没开启expireAfterXxx、refreshAfterXxx、removalListener之类的,就会默认初始化为DISCARDING_QUEUE)
  if (map.removalNotificationQueue != DISCARDING_QUEUE) {
    RemovalNotification<K, V> notification = RemovalNotification.create(key, value, cause);
    // 进行移除回调队列,由于队列是阻塞性,所以生产之后,消费方就会消费处理。消费方:processPendingNotifications
    map.removalNotificationQueue.offer(notification);
  }
}

/**
 * 这里是移除调用到的出队方法=======================================
 * 这里是移除调用到的出队方法=======================================
 * 入队过程,在节点不存在、节点已过期、缓存clear等操作时调用,生产方为:enqueueNotification
 */
void processPendingNotifications() {
    RemovalNotification<K, V> notification;
    while ((notification = removalNotificationQueue.poll()) != null) {
      try {
        removalListener.onRemoval(notification);// 调用移除触发方法,做移除后的处理逻辑
      } catch (Throwable e) {
        logger.log(Level.WARNING, "Exception thrown by removal listener", e);
      }
    }
}