BoundedLocalManualCache put() 方法源码解析
先看一下BoundedLocalManualCache
的类图
com.github.benmanes.caffeine.cache.BoundedLocalCache
中定义的BoundedLocalManualCache
静态内部类。
static class BoundedLocalManualCache<K, V> implements LocalManualCache<K, V>, Serializable
实现了LocalManualCache
接口,这个接口提供了Cache
接口的骨架实现,以最简的方式去实现一个LocalCache
。
详细查看LocalManualCache
接口里定义的内容,代码也不多,直接贴到内容里:
interface LocalManualCache<K, V> extends Cache<K, V> {
/** Returns the backing {@link LocalCache} data store. */
LocalCache<K, V> cache();
@Override
default long estimatedSize() {
return cache().estimatedSize();
}
@Override
default void cleanUp() {
cache().cleanUp();
}
@Override
default @Nullable V getIfPresent(Object key) {
return cache().getIfPresent(key, /* recordStats */ true);
}
@Override
default @Nullable V get(K key, Function<? super K, ? extends V> mappingFunction) {
return cache().computeIfAbsent(key, mappingFunction);
}
@Override
default Map<K, V> getAllPresent(Iterable<?> keys) {
return cache().getAllPresent(keys);
}
@Override
default Map<K, V> getAll(Iterable<? extends K> keys,
Function<Iterable<? extends K>, Map<K, V>> mappingFunction) {
requireNonNull(mappingFunction);
Set<K> keysToLoad = new LinkedHashSet<>();
Map<K, V> found = cache().getAllPresent(keys);
Map<K, V> result = new LinkedHashMap<>(found.size());
for (K key : keys) {
V value = found.get(key);
if (value == null) {
keysToLoad.add(key);
}
result.put(key, value);
}
if (keysToLoad.isEmpty()) {
return found;
}
bulkLoad(keysToLoad, result, mappingFunction);
return Collections.unmodifiableMap(result);
}
/**
* Performs a non-blocking bulk load of the missing keys. Any missing entry that materializes
* during the load are replaced when the loaded entries are inserted into the cache.
*/
default void bulkLoad(Set<K> keysToLoad, Map<K, V> result,
Function<Iterable<? extends @NonNull K>, @NonNull Map<K, V>> mappingFunction) {
boolean success = false;
long startTime = cache().statsTicker().read();
try {
Map<K, V> loaded = mappingFunction.apply(keysToLoad);
loaded.forEach((key, value) ->
cache().put(key, value, /* notifyWriter */ false));
for (K key : keysToLoad) {
V value = loaded.get(key);
if (value == null) {
result.remove(key);
} else {
result.put(key, value);
}
}
success = !loaded.isEmpty();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new CompletionException(e);
} finally {
long loadTime = cache().statsTicker().read() - startTime;
if (success) {
cache().statsCounter().recordLoadSuccess(loadTime);
} else {
cache().statsCounter().recordLoadFailure(loadTime);
}
}
}
@Override
default void put(K key, V value) {
cache().put(key, value);
}
@Override
default void putAll(Map<? extends K, ? extends V> map) {
cache().putAll(map);
}
@Override
default void invalidate(Object key) {
cache().remove(key);
}
@Override
default void invalidateAll(Iterable<?> keys) {
cache().invalidateAll(keys);
}
@Override
default void invalidateAll() {
cache().clear();
}
@Override
default CacheStats stats() {
return cache().statsCounter().snapshot();
}
@Override
default ConcurrentMap<K, V> asMap() {
return cache();
}
}
可以看到,CacheLoader
接口定义了load
、loadAll
、put
、putAll
、invalidate
、invalidateAll
、stats
和asMap
等方法,做一个简单实现。这些方法提供了缓存的基本操作,如加载缓存、添加缓存、移除缓存、获取缓存统计信息等。
Manual Cache 源码
static class BoundedLocalManualCache<K, V> implements LocalManualCache<K, V>, Serializable {
private static final long serialVersionUID = 1;
final BoundedLocalCache<K, V> cache;
final boolean isWeighted;
@Nullable Policy<K, V> policy;
BoundedLocalManualCache(Caffeine<K, V> builder) {
this(builder, null);
}
BoundedLocalManualCache(Caffeine<K, V> builder, @Nullable CacheLoader<? super K, V> loader) {
cache = LocalCacheFactory.newBoundedLocalCache(builder, loader, /* async */ false);
isWeighted = builder.isWeighted();
}
@Override
public BoundedLocalCache<K, V> cache() {
return cache;
}
@Override
public Policy<K, V> policy() {
return (policy == null)
? (policy = new BoundedPolicy<>(cache, Function.identity(), isWeighted))
: policy;
}
@SuppressWarnings("UnusedVariable")
private void readObject(ObjectInputStream stream) throws InvalidObjectException {
throw new InvalidObjectException("Proxy required");
}
Object writeReplace() {
return makeSerializationProxy(cache, isWeighted);
}
}
定义了一个BoundedLocalCache
属性,还有权重的标志位isWeighted
,以及一个Policy
属性。BoundedLocalManualCache
的构造方法中,调用了LocalCacheFactory.newBoundedLocalCache
方法,创建了一个BoundedLocalCache
对象,并赋值给cache
属性。policy
属性则是在policy()
方法中创建的。policy 是一个BoundedPolicy
对象,它实现了Policy
接口,用于管理缓存策略。BoundedPolicy
源码紧接着就在BoundedLocalManualCache
下面,这里就不贴出来了。
static final class BoundedPolicy<K, V> implements Policy<K, V>
,里具体定义了了BoundedLocalCache
的缓存策略,比如缓存大小,缓存权重,缓存过期时间等。
接下来我们看BoundedLocalCache
的put
方法
手动使用调用cache.put(k, v);
会调用put(key, value, expiry(), /* notifyWriter */ true, /* onlyIfAbsent */ false);
具体的参数解释如下:
- key:要放入缓存的键。
- value:要放入缓存的值。
- expiry:缓存的过期时间,默认为
Duration.ZERO
,表示永不过期。 - notifyWriter:是否通知写入者,默认为
true
。 - onlyIfAbsent:是否只在缓存中不存在该键时才放入,默认为
false
。
put 方法源码如下:
@Nullable V put(K key, V value, Expiry<K, V> expiry, boolean notifyWriter, boolean onlyIfAbsent) {
requireNonNull(key);
requireNonNull(value);
Node<K, V> node = null;
long now = expirationTicker().read();
int newWeight = weigher.weigh(key, value);
for (;;) {
// 获取 prior 节点
Node<K, V> prior = data.get(nodeFactory.newLookupKey(key));
if (prior == null) {
// 如果不存在 prior 节点,则创建新的节点
if (node == null) {
node = nodeFactory.newNode(key, keyReferenceQueue(),
value, valueReferenceQueue(), newWeight, now);
setVariableTime(node, expireAfterCreate(key, value, expiry, now));
}
// notifyWriter 为 true 且存在Writer时,通知Writer
if (notifyWriter && hasWriter()) {
Node<K, V> computed = node;
prior = data.computeIfAbsent(node.getKeyReference(), k -> {
writer.write(key, value);
return computed;
});
// 如果存在 prior 节点,调用 afterWrite 方法
if (prior == node) {
afterWrite(new AddTask(node, newWeight));
return null;
// 如果onlyIfAbsent 为 true。代表只在缓存中不存在该键时才放入缓存
} else if (onlyIfAbsent) {
V currentValue = prior.getValue();
if ((currentValue != null) && !hasExpired(prior, now)) {
if (!isComputingAsync(prior)) {
tryExpireAfterRead(prior, key, currentValue, expiry(), now);
setAccessTime(prior, now);
}
afterRead(prior, now, /* recordHit */ false);
return currentValue;
}
}
// 如果 notifyWriter 为 false,直接放入缓存
} else {
prior = data.putIfAbsent(node.getKeyReference(), node);
if (prior == null) {
afterWrite(new AddTask(node, newWeight));
return null;
} else if (onlyIfAbsent) {
// An optimistic fast path to avoid unnecessary locking
V currentValue = prior.getValue();
if ((currentValue != null) && !hasExpired(prior, now)) {
if (!isComputingAsync(prior)) {
tryExpireAfterRead(prior, key, currentValue, expiry(), now);
setAccessTime(prior, now);
}
afterRead(prior, now, /* recordHit */ false);
return currentValue;
}
}
}
} else if (onlyIfAbsent) {
// An optimistic fast path to avoid unnecessary locking
V currentValue = prior.getValue();
if ((currentValue != null) && !hasExpired(prior, now)) {
if (!isComputingAsync(prior)) {
tryExpireAfterRead(prior, key, currentValue, expiry(), now);
setAccessTime(prior, now);
}
afterRead(prior, now, /* recordHit */ false);
return currentValue;
}
}
// 如果 prior != null,则说明该节点已经存在,则尝试获取锁
V oldValue;
long varTime;
int oldWeight;
boolean expired = false;
boolean mayUpdate = true;
boolean exceedsTolerance = false;
synchronized (prior) {
if (!prior.isAlive()) {
continue;
}
oldValue = prior.getValue();
oldWeight = prior.getWeight();
// 如果 oldValue == null,通过 expireAfterCreate 方法计算过期时间,并删除key对应的值
if (oldValue == null) {
varTime = expireAfterCreate(key, value, expiry, now);
writer.delete(key, null, RemovalCause.COLLECTED);
// 返回prior是否过期,true,则删除key对应的值
} else if (hasExpired(prior, now)) {
expired = true;
varTime = expireAfterCreate(key, value, expiry, now);
writer.delete(key, oldValue, RemovalCause.EXPIRED);
// 如果 onlyIfAbsent 为 true,则不更新key对应的值,返回新的过期时间
} else if (onlyIfAbsent) {
mayUpdate = false;
varTime = expireAfterRead(prior, key, value, expiry, now);
} else {
varTime = expireAfterUpdate(prior, key, value, expiry, now);
}
// notifyWriter 为true,如果过期或者更新了值,则通知Writer
if (notifyWriter && (expired || (mayUpdate && (value != oldValue)))) {
writer.write(key, value);
}
// 如果mayUpdate为true,计算过期时间是否超出容忍度
if (mayUpdate) {
exceedsTolerance =
(expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)
|| (expiresVariable()
&& Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);
setWriteTime(prior, now);
prior.setWeight(newWeight);
prior.setValue(value, valueReferenceQueue());
}
// 设置访问时间和过期时间
setVariableTime(prior, varTime);
setAccessTime(prior, now);
}
// 如果在创建缓存时设置了移除监听器,则通知移除监听器
if (hasRemovalListener()) {
if (expired) {
notifyRemoval(key, oldValue, RemovalCause.EXPIRED);
} else if (oldValue == null) {
notifyRemoval(key, /* oldValue */ null, RemovalCause.COLLECTED);
} else if (mayUpdate && (value != oldValue)) {
notifyRemoval(key, oldValue, RemovalCause.REPLACED);
}
}
// 更新权重,判断是不是第一写入,如果是,调用afterWrite方法
int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0;
if ((oldValue == null) || (weightedDifference != 0) || expired) {
afterWrite(new UpdateTask(prior, weightedDifference));
// 判断 onlyIfAbsent 是否为 true,以及是否超过容忍度,如果超过容忍度,调用afterWrite方法
} else if (!onlyIfAbsent && exceedsTolerance) {
afterWrite(new UpdateTask(prior, weightedDifference));
} else {
if (mayUpdate) {
setWriteTime(prior, now);
}
//执行 afterRead 方法
afterRead(prior, now, /* recordHit */ false);
}
return expired ? null : oldValue;
}
}
案例中通过 cache.put(k,v)
调用方法,走到这个方法中,因为是第一次尝试储存key和value,所以代码中声明的 node = null
,获取的prior = null
,if (prior == null)
,创建新节点,设置创建后过期时间。notifyWriter=true
但hasWriter=false
,执行else中方法
prior = data.putIfAbsent(node.getKeyReference(), node);
if (prior == null) {
afterWrite(new AddTask(node, newWeight));
return null;
} else if (onlyIfAbsent) {
// An optimistic fast path to avoid unnecessary locking
V currentValue = prior.getValue();
if ((currentValue != null) && !hasExpired(prior, now)) {
if (!isComputingAsync(prior)) {
tryExpireAfterRead(prior, key, currentValue, expiry(), now);
setAccessTime(prior, now);
}
afterRead(prior, now, /* recordHit */ false);
return currentValue;
}
}
putIfAbsent 方法:由于data中不存在我们的key,value,返回 null,调用 afterWrite() 方法,将任务放入writeBuffer中,调用scheduleAfterWrite()方法
void afterWrite(Runnable task) {
for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
if (writeBuffer.offer(task)) {
scheduleAfterWrite();
return;
}
scheduleDrainBuffers();
}
scheduleAfterWrite()方法:
void scheduleAfterWrite() {
for (;;) {
switch (drainStatus()) {
case IDLE:
casDrainStatus(IDLE, REQUIRED);
scheduleDrainBuffers();
return;
case REQUIRED:
scheduleDrainBuffers();
return;
case PROCESSING_TO_IDLE:
if (casDrainStatus(PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED)) {
return;
}
continue;
case PROCESSING_TO_REQUIRED:
return;
default:
throw new IllegalStateException();
}
}
}
看到这我其实是有点蒙了,因为笔者的异步编程基础薄弱,只看方法名字做一个不负责任的猜想,写入后安排异步任务,条件符合执行清理计划,会继续调用 scheduleDrainBuffers() 方法
scheduleDrainBuffers() 方法:
void scheduleDrainBuffers() {
if (drainStatus() >= PROCESSING_TO_IDLE) {
return;
}
if (evictionLock.tryLock()) {
try {
int drainStatus = drainStatus();
if (drainStatus >= PROCESSING_TO_IDLE) {
return;
}
lazySetDrainStatus(PROCESSING_TO_IDLE);
executor.execute(drainBuffersTask);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);
maintenance(/* ignored */ null);
} finally {
evictionLock.unlock();
}
}
}
drainStatus() 就是返回这条件的值,如果大于等于 PROCESSING_TO_IDLE 就直接返回,否则执行 tryLock() 方法,如果成功,则执行 executor.execute(drainBuffersTask); 方法,否则执行 maintenance() 方法,这个方法就是执行清理任务的方法。
传进来的drainBuffersTask
是一个PerformCleanupTask
,这个类实现了Runnable
接口,重写了run()
方法,这个方法就是执行清理任务的方法。
@Override
public void run() {
BoundedLocalCache<?, ?> cache = reference.get();
if (cache != null) {
cache.performCleanUp(/* ignored */ null);
}
}
继续看performCleanUp()
方法:
void performCleanUp(@Nullable Runnable task) {
evictionLock.lock();
try {
maintenance(task);
} finally {
evictionLock.unlock();
}
if ((drainStatus() == REQUIRED) && (executor == ForkJoinPool.commonPool())) {
scheduleDrainBuffers();
}
}
可以看到,这里也是调用了maintenance()
方法,然后判断drainStatus()
是否等于REQUIRED
,如果等于,则调用scheduleDrainBuffers()
方法。
@GuardedBy("evictionLock")
void maintenance(@Nullable Runnable task) {
lazySetDrainStatus(PROCESSING_TO_IDLE);
try {
drainReadBuffer();
drainWriteBuffer();
if (task != null) {
task.run();
}
drainKeyReferences();
drainValueReferences();
expireEntries();
evictEntries();
climb();
} finally {
if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
lazySetDrainStatus(REQUIRED);
}
}
}
maintenance()
是实际的清理方法,它首先将drainStatus()
设置为PROCESSING_TO_IDLE
,然后调用drainReadBuffer()
、drainWriteBuffer()
、drainKeyReferences()
、drainValueReferences()
、expireEntries()
、evictEntries()
、climb()
等方法,清理读写缓冲区、过期条目、驱逐条目等。
到这里,afterWrite()
基本就执行完了,写入一次(key,value),都会去判断是否需要清理,如果需要清理,就异步调用maintenance()
方法进行清理。
如果是给已经存在的key设置值,put方法执行到最后会调用 afterRead()
方法
void afterRead(Node<K, V> node, long now, boolean recordHit) {
if (recordHit) {
statsCounter().recordHits(1);
}
boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);
if (shouldDrainBuffers(delayable)) {
scheduleDrainBuffers();
}
refreshIfNeeded(node, now);
}
afterRead()
方法会记录命中次数,然后判断是否需要延迟写入缓冲区,如果需要延迟写入缓冲区,则将节点放入读取缓冲区,如果读取缓冲区已满,则调用scheduleDrainBuffers()
方法异步清理缓冲区,最后调用refreshIfNeeded()
方法异步刷新节点。
refreshIfNeeded()
方法会根据节点的过期时间、访问时间、更新时间等判断是否需要刷新节点,如果需要刷新节点,则调用refresh()
方法刷新节点。
本例中没有设置过期时间,直接返回。
总结
本文算是比较详细的把put()
方法执行流程分析了一遍,通过分析put()
方法,我们可以了解到Caffeine缓存的基本原理,以及如何使用Caffeine缓存,学习如何自己实现一个本地缓存的 put()方法,怎样执行一个异步的清理任务,怎样判断是否需要清理,怎样异步刷新节点等等。
笔者也是一个小菜鸟,刚开始看一些源码,可能有些地方理解的不对,欢迎指正,谢谢!
希望本文对你有所帮助,如果有任何问题,欢迎在评论区留言讨论。