Mybatis工作机制源码分析—缓存机制及事务机制

时间:2021-09-05 05:07:14

        本文主要以源码分析的角度分析Mybatis的缓存工作机制及事务机制。

缓存工作机制

整体设计图

         网上一张关于Mybatis的缓存工作机制示意图:

Mybatis工作机制源码分析—缓存机制及事务机制

Cache类结构图

Mybatis工作机制源码分析—缓存机制及事务机制

相关源码

         二级缓存处理主要体现在CachingExecutor;一级缓存处理主要体现在BaseExecutor,其带localCache、localOutputParameterCache本地缓存,它们都是PerpetualCache,而PerpetualCache本质上为包装的HashMap<Object, Object>版本。下面从update、select请求处理分别分析缓存的工作机制:

启用二级缓存(一级缓存直接使用)

/** Configuration.java */
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
executorType = executorType == null ? defaultExecutorType : executorType;
executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
Executor executor;
if (ExecutorType.BATCH == executorType) {
executor = new BatchExecutor(this, transaction);
} else if (ExecutorType.REUSE == executorType) {
executor = new ReuseExecutor(this, transaction);
} else {
executor = new SimpleExecutor(this, transaction);
}
// 在Mybatis config配置文件中,配置<setting name="cacheEnabled" value="true"/>,打开二级缓存
// 二级缓存可细粒度到mapper配置文件中的每条sql语句,useCache="true"则表示该sql语句使用二级缓存
if (cacheEnabled) {
// 装饰CachingExecutor
executor = new CachingExecutor(executor);
}
executor = (Executor) interceptorChain.pluginAll(executor);
return executor;
}

处理update请求过程中缓存工作机制

       update的二级缓存处理:sql语句设置flushCache="true",则清空当前cache对应的TransactionalCache;update的一级缓存处理,清空本地缓存:

/** CachingExecutor.java */
// CachingExecutor update
// CachingExecutor用TransactionalCacheManager,进行TransactionalCache层的管理
public int update(MappedStatement ms, Object parameterObject) throws SQLException {
// update的二级缓存处理
flushCacheIfRequired(ms);
return delegate.update(ms, parameterObject);
}

// flush MappedStatement cache
private void flushCacheIfRequired(MappedStatement ms) {
Cache cache = ms.getCache();
if (cache != null && ms.isFlushCacheRequired()) {
// sql语句配置了flushCache="true",则进行当前cache对应的TransactionalCache clear工作
tcm.clear(cache);
}
}

/** TransactionalCacheManager.java */
public void clear(Cache cache) {
getTransactionalCache(cache).clear();
}

private TransactionalCache getTransactionalCache(Cache cache) {
TransactionalCache txCache = transactionalCaches.get(cache);
if (txCache == null) {
txCache = new TransactionalCache(cache);
transactionalCaches.put(cache, txCache);
}
return txCache;
}

/** TransactionalCache.java */
// clearOnCommit标识设置为true
// entriesToAddOnCommit清空
public void clear() {
clearOnCommit = true;
entriesToAddOnCommit.clear();
}

/** BaseExecutor.java(SimpleExecutor) */
public int update(MappedStatement ms, Object parameter) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
// update的一级缓存处理
// 清空BaseExecutor下的localCache、localOutputParameterCache,其均为PerpetualCache
// PerpetualCache实际缓存为HashMap<Object, Object>
clearLocalCache();
return doUpdate(ms, parameter);
}

public void clearLocalCache() {
if (!closed) {
localCache.clear();
localOutputParameterCache.clear();
}
}

处理select请求过程中缓存工作机制

       构建CacheKey,select二级缓存处理:从缓存中获取相关结果,一般select语句配置flushCache="false",否则二级缓存返回null,如果一级缓存返回有结果,则将结果保存到二级缓存TransactionalCache;select一级缓存处理:从缓存中获取结果,有则缓存输出参数到localOutputParameterCache,否则,从database获取结果,进行一级缓存到localCache、localOutputParameterCache:

/** CachingExecutor.java */
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
// MappedStatement构建BoundSql,实际用DynamicSqlSource或StaticSqlSource构建BoundSql
BoundSql boundSql = ms.getBoundSql(parameterObject);
// 构建CacheKey(同一次缓存,二级缓存key与一级缓存key相同)
CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}

/** CachingExecutor.java */
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
return delegate.createCacheKey(ms, parameterObject, rowBounds, boundSql);
}

/** BaseExecutor.java */
// 创建CacheKey,用MappedStatement、parameterObject、rowBounds、boundSql影响CacheKey的hashCode
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
CacheKey cacheKey = new CacheKey();
// cacheKey关联MappedStatement id
cacheKey.update(ms.getId());
// cacheKey关联rowBounds offset
cacheKey.update(Integer.valueOf(rowBounds.getOffset()));
// cacheKey关联Offset limit
cacheKey.update(Integer.valueOf(rowBounds.getLimit()));
// cacheKey关联boundSql sql
cacheKey.update(boundSql.getSql());
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
// mimic DefaultParameterHandler logic
for (int i = 0; i < parameterMappings.size(); i++) {
ParameterMapping parameterMapping = parameterMappings.get(i);
if (parameterMapping.getMode() != ParameterMode.OUT) {
Object value;
String propertyName = parameterMapping.getProperty();
if (boundSql.hasAdditionalParameter(propertyName)) {
value = boundSql.getAdditionalParameter(propertyName);
} else if (parameterObject == null) {
value = null;
} else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
value = parameterObject;
} else {
MetaObject metaObject = configuration.newMetaObject(parameterObject);
value = metaObject.getValue(propertyName);
}
// cacheKey关联parameterMapping value
cacheKey.update(value);
}
}
if (configuration.getEnvironment() != null) {
// cacheKey关联environment id
cacheKey.update(configuration.getEnvironment().getId());
}
return cacheKey;
}

/** CachingExecutor.java */
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
throws SQLException {
Cache cache = ms.getCache();
if (cache != null) {
// select二级缓存处理,同update二级缓存处理
flushCacheIfRequired(ms);
if (ms.isUseCache() && resultHandler == null) {
ensureNoOutParams(ms, parameterObject, boundSql);
@SuppressWarnings("unchecked")
// 从二级缓存cache中获取,如果sql语句配置了flushCache="true",则获取为null
// 因为flushCacheIfRequired已经将clearOnCommit设置为true
// 所以一般select语句配置flushCache="false"
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
list = delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
// 获取结果保存到二级缓存TransactionalCache
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
return delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}

/** BaseExecutor.java */
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
queryStack++;
// select一级缓存处理
// 查看本地缓存,有则直接返回
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
// 处理本地缓存的输出参数
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
// 没有则继续从Database查询
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
if (queryStack == 0) {
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
// issue #601
deferredLoads.clear();
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// MappedStatement级别的缓存,则清空本地缓存
clearLocalCache();
}
}
return list;
}

private void handleLocallyCachedOutputParameters(MappedStatement ms, CacheKey key, Object parameter, BoundSql boundSql) {
if (ms.getStatementType() == StatementType.CALLABLE) {
final Object cachedParameter = localOutputParameterCache.getObject(key);
if (cachedParameter != null && parameter != null) {
final MetaObject metaCachedParameter = configuration.newMetaObject(cachedParameter);
final MetaObject metaParameter = configuration.newMetaObject(parameter);
for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
if (parameterMapping.getMode() != ParameterMode.IN) {
final String parameterName = parameterMapping.getProperty();
final Object cachedValue = metaCachedParameter.getValue(parameterName);
metaParameter.setValue(parameterName, cachedValue);
}
}
}
}
}

private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
localCache.putObject(key, EXECUTION_PLACEHOLDER);
try {
// 从Database查询
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
// 清空本地之前CacheKey对应的缓存
localCache.removeObject(key);
}
// 将查询结果key——list缓存到localCache
localCache.putObject(key, list);
if (ms.getStatementType() == StatementType.CALLABLE) {
// 将key——parameter缓存到localOutputParameterCache
localOutputParameterCache.putObject(key, parameter);
}
return list;
}

事务工作机制

      一般Mybatis与Spring集成使用,这样Mybatis的事务管理工作交给Spring,事务管理工作对Mybatis是透明的。

Spring事务管理相关了解

        事务隔离级别:

隔离级别是指若干个并发的事务之间的隔离程度。TransactionDefinition 接口中定义了五个表示隔离级别的常量:

TransactionDefinition.ISOLATION_DEFAULT:这是默认值,表示使用底层数据库的默认隔离级别。对大部分数据库而言,通常这值就是TransactionDefinition.ISOLATION_READ_COMMITTED。

TransactionDefinition.ISOLATION_READ_UNCOMMITTED:该隔离级别表示一个事务可以读取另一个事务修改但还没有提交的数据。该级别不能防止脏读,不可重复读和幻读,因此很少使用该隔离级别。比如PostgreSQL实际上并没有此级别。

TransactionDefinition.ISOLATION_READ_COMMITTED:该隔离级别表示一个事务只能读取另一个事务已经提交的数据。该级别可以防止脏读,这也是大多数情况下的推荐值。

TransactionDefinition.ISOLATION_REPEATABLE_READ:该隔离级别表示一个事务在整个过程中可以多次重复执行某个查询,并且每次返回的记录都相同。该级别可以防止脏读和不可重复读。

TransactionDefinition.ISOLATION_SERIALIZABLE:所有的事务依次逐个执行,这样事务之间就完全不可能产生干扰,也就是说,该级别可以防止脏读、不可重复读以及幻读。但是这将严重影响程序的性能。通常情况下也不会用到该级别。

        事务传播行为:

所谓事务的传播行为是指,如果在开始当前事务之前,一个事务上下文已经存在,此时有若干选项可以指定一个事务性方法的执行行为。在TransactionDefinition定义中包括了如下几个表示传播行为的常量:

TransactionDefinition.PROPAGATION_REQUIRED:如果当前存在事务,则加入该事务;如果当前没有事务,则创建一个新的事务。这是默认值。

TransactionDefinition.PROPAGATION_REQUIRES_NEW:创建一个新的事务,如果当前存在事务,则把当前事务挂起。

TransactionDefinition.PROPAGATION_SUPPORTS:如果当前存在事务,则加入该事务;如果当前没有事务,则以非事务的方式继续运行。

TransactionDefinition.PROPAGATION_NOT_SUPPORTED:以非事务方式运行,如果当前存在事务,则把当前事务挂起。

TransactionDefinition.PROPAGATION_NEVER:以非事务方式运行,如果当前存在事务,则抛出异常。

TransactionDefinition.PROPAGATION_MANDATORY:如果当前存在事务,则加入该事务;如果当前没有事务,则抛出异常。

TransactionDefinition.PROPAGATION_NESTED:如果当前存在事务,则创建一个事务作为当前事务的嵌套事务来运行;如果当前没有事务,则该取值等价于TransactionDefinition.PROPAGATION_REQUIRED。

@Transactional注解

Mybatis工作机制源码分析—缓存机制及事务机制