CachingExecutor
CachingExecutor 是一个 Executor 接口的装饰器,它为 Executor 对象增加了二级缓存的相关功能。
它封装了一个用于执行数据库操作的 Executor 对象,以及一个用于管理缓存的 TransactionalCacheManager 对象。
TransactionalCache 和 TransactionalCacheManager 是 CachingExecutor 依赖的两个组件。
组件以及二级缓存关系
- CachingExecutor 为 Executor 装饰了二级缓存功能
- TransactionalCacheManager 管理了所有的 以 namespace 为存储单位的二级缓存
- TransactionalCache 是二级缓存 Cache 的代理类,对外提供了二级缓存的功能,里面包装了二级缓存 Cache 对象(delegate),这个delegate 提供了真实的数据缓存功能。

public class CachingExecutor implements Executor {
private final Executor delegate;
private final TransactionalCacheManager tcm = new TransactionalCacheManager();
public CachingExecutor(Executor delegate) {
this.delegate = delegate;
delegate.setExecutorWrapper(this);
}
@Override
public Transaction getTransaction() {
return delegate.getTransaction();
}
@Override
public void close(boolean forceRollback) {
try {
//issues #499, #524 and #573
if (forceRollback) {
tcm.rollback();
} else {
tcm.commit();
}
} finally {
delegate.close(forceRollback);
}
}
@Override
public boolean isClosed() {
return delegate.isClosed();
}
@Override
public int update(MappedStatement ms, Object parameterObject) throws SQLException {
flushCacheIfRequired(ms);
return delegate.update(ms, parameterObject);
}
/**
* 查询过程
* (1)获取 BoundSql 对象,创建查询语句对应的 CacheKey 对象
* (2)检测是否开启了二级缓存,如果没有开启二级缓存,则直接调用底层 Executor 对象的 query() 方法查询数据库。如果开启了二级缓存,则继续后面的步骤
* (3)检测查询操作是否包含输出类型的参数,如果是这种情况,则报错
* (4)调用 TransactionalCacheManager.getObject()方法查询二级缓存,如果二级缓存中查找到相应的结果对象,则直接将该结果对象返回。
* (5)如果二级缓存没有相应的结果对象,则调用底层 Executor 对象的 query() 方法,正如前面介绍的 ,它会先查询一级缓存,一级缓存未命中时,才会查询数据库。
* 最后还会将得到的结果对象放入 TransactionalCache.entriesToAddOnCommit 集合中保存。
*
* @param ms
* @param parameterObject
* @param rowBounds
* @param resultHandler
* @param <E>
* @return
* @throws SQLException
*/
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
// 步骤1:获取 BoundSql 对象,解析 BoundSql
BoundSql boundSql = ms.getBoundSql(parameterObject);
// 创建 CacheKey 对象
CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
@Override
public <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException {
flushCacheIfRequired(ms);
return delegate.queryCursor(ms, parameter, rowBounds);
}
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
throws SQLException {
// 获取查询语句所在命名空间对应的二级缓存
Cache cache = ms.getCache();
// 步骤2:是否开启了二级缓存功能
if (cache != null) {
flushCacheIfRequired(ms); // 根据 <select> 节点的配置,决定是否需要清空二级缓存
// 检测 SQL 节点的 useCache 配置以及是否使用了 resultHandler 配置
if (ms.isUseCache() && resultHandler == null) {
//步骤3: 二级缓存不能保存输出类型的参数 如果查询操作调用了包含输出参数的存储过程,则报错
ensureNoOutParams(ms, boundSql);
// 步骤4:查询二级缓存
@SuppressWarnings("unchecked")
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
// 步骤5:二级缓存没用相应的结果对象,调用封装的 Executor 对象的 query() 方法,这个 query() 方法会先查询一级缓存
list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
// 将查询结果保存到 TransactionalCache.entriesToAddOnCommit 集合中
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
// 没有启动二级缓存,直接调用底层 Executor 执行数据数据库查询操作
return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
/**
* 调用底层 Executor 的 flushStatements() 方法
* @return
* @throws SQLException
*/
@Override
public List<BatchResult> flushStatements() throws SQLException {
return delegate.flushStatements();
}
/**
* 首先调用底层 Executor 对象对应的方法完成事务的提交,然后再调用 TransactionalCacheManager 的对应方法完成对二级缓存的相应操作
* @param required
* @throws SQLException
*/
@Override
public void commit(boolean required) throws SQLException {
delegate.commit(required); // 调用底层的 Executor 提交事务
tcm.commit(); //遍历所有相关的 TransactionalCache 对象执行 commit() 方法
}
/**
* 首先调用底层 Executor 对象对应的方法完成事务的回滚,然后再调用 TransactionalCacheManager 的对应方法完成对二级缓存的相应操作
* @param required
* @throws SQLException
*/
@Override
public void rollback(boolean required) throws SQLException {
try {
delegate.rollback(required); // 调用底层的 Executor 回滚事务
} finally {
if (required) {
tcm.rollback(); //遍历所有相关的 TransactionalCache 对象执行 rollback() 方法
}
}
}
/**
* 二级缓存不能保存输出类型的参数 如果查询操作调用了包含输出参数的存储过程,则报错
* @param ms
* @param boundSql
*/
private void ensureNoOutParams(MappedStatement ms, BoundSql boundSql) {
if (ms.getStatementType() == StatementType.CALLABLE) {
for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
if (parameterMapping.getMode() != ParameterMode.IN) {
throw new ExecutorException("Caching stored procedures with OUT params is not supported. Please configure useCache=false in " + ms.getId() + " statement.");
}
}
}
}
@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
return delegate.createCacheKey(ms, parameterObject, rowBounds, boundSql);
}
@Override
public boolean isCached(MappedStatement ms, CacheKey key) {
return delegate.isCached(ms, key);
}
@Override
public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
delegate.deferLoad(ms, resultObject, property, key, targetType);
}
/**
* 清空一级缓存
*/
@Override
public void clearLocalCache() {
delegate.clearLocalCache();
}
/**
* 是否要求刷新 一级和二级缓存
* @param ms
*/
private void flushCacheIfRequired(MappedStatement ms) {
// 获取 二级缓存
Cache cache = ms.getCache();
if (cache != null && ms.isFlushCacheRequired()) {
tcm.clear(cache);
}
}
@Override
public void setExecutorWrapper(Executor executor) {
throw new UnsupportedOperationException("This method should not be called");
}
}
TransactionalCacheManager
TransactionalCacheManager 用于管理 CachingExecutor 使用的二级缓存对象,其中只定义了一个 transactionalCaches,它的 key 是对应的 CachingExecutor 使用的二级缓存对象,value 是相应的 TransactionalCache 对象,在该 TransactionalCache 中封装了对应的二级缓存对象,也就是这里的 key。
public class TransactionalCacheManager {
private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>();
/**
* 调用指定二级缓存对应的 TransactionalCache 对象对应的方法
* @param cache
*/
public void clear(Cache cache) {
getTransactionalCache(cache).clear();
}
/**
* 调用指定二级缓存对应的 TransactionalCache 对象对应的方法
* @param cache
*/
public Object getObject(Cache cache, CacheKey key) {
return getTransactionalCache(cache).getObject(key);
}
/**
* 调用指定二级缓存对应的 TransactionalCache 对象对应的方法
* @param cache
* @param key
* @param value
*/
public void putObject(Cache cache, CacheKey key, Object value) {
getTransactionalCache(cache).putObject(key, value);
}
/**
* 遍历 transactionalCaches 集合,调用 TransactionalCache 相应的方法
*/
public void commit() {
for (TransactionalCache txCache : transactionalCaches.values()) {
txCache.commit();
}
}
/**
* 遍历 transactionalCaches 集合,调用 TransactionalCache 相应的方法
*/
public void rollback() {
for (TransactionalCache txCache : transactionalCaches.values()) {
txCache.rollback();
}
}
/**
* 如果集合中不包含 TransactionalCache,则创建一个,并放入 transactionalCaches 中
* @param cache
* @return
*/
private TransactionalCache getTransactionalCache(Cache cache) {
return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new);
}
}
TransactionalCache
TransactionalCache 主要用于保存 某个 SqlSession 的 某个事务中需要向 某个二级缓存中添加的缓存 的数据。
名字为 TransactionalCache 主要是针对解决了二级缓存中一些事务的问题,所以起名字为 TransactionalCache。
The 2nd level cache transactional buffer.
This class holds all cache entries that are to be added to the 2nd level cache during a Session.
public class TransactionalCache implements Cache {
private static final Log log = LogFactory.getLog(TransactionalCache.class);
// 底层封装的二级缓存所对应的 Cache 对象,以存储 namespace 为单位的 Cache 对象,默认为 PerpetualCache
private final Cache delegate;
// 当改字段为 true 时,则表示当前 TransactionalCache 不可查询,且提交事务时会将底层 Cache 清空
private boolean clearOnCommit;
// 暂时记录添加到 TransactionalCache 中的数据。在事务提交时,会将其中的数据添加到二级缓存中
private final Map<Object, Object> entriesToAddOnCommit;
// 记录缓存未命中的 CacheKey 对象
private final Set<Object> entriesMissedInCache;
public TransactionalCache(Cache delegate) {
this.delegate = delegate;
this.clearOnCommit = false;
this.entriesToAddOnCommit = new HashMap<>();
this.entriesMissedInCache = new HashSet<>();
}
@Override
public String getId() {
return delegate.getId();
}
@Override
public int getSize() {
return delegate.getSize();
}
/**
* 它首先会查询底层的二级缓存,并将为命中的 key 记录到 entriesMissedInCache,之后根据 clearOnCommit 字段的值决定具体的返回值
* @param key The key
* @return
*/
@Override
public Object getObject(Object key) {
// issue #116
// 查询底层的 Cache 是否包含了指定的 key
Object object = delegate.getObject(key);
// 如果底层缓存对象中不包含改缓存项,则将该 key 记录到 entriesMissedInCache 集合中
if (object == null) {
entriesMissedInCache.add(key);
}
// issue #146
if (clearOnCommit) {
return null;
} else {
// 返回层底层 Cache 中查询到的对象
return object;
}
}
/**
* 该方法并没有直接将结果对象记录到其封装的二级缓存中,而是暂时保存在 entriesToAddOnCommit 集合中,
* 在事务提交时才会将这些结果对象从 entriesToAddOnCommit 集合 添加到二级缓存中。
* @param key Can be any object but usually it is a {@link CacheKey}
* @param object
*/
@Override
public void putObject(Object key, Object object) {
entriesToAddOnCommit.put(key, object);
}
@Override
public Object removeObject(Object key) {
return null;
}
/**
* 清空 entriesToAddOnCommit 集合,并设置 clearOnCommit 为 true
*/
@Override
public void clear() {
clearOnCommit = true;
entriesToAddOnCommit.clear();
}
/**
* 会根据 clearOnCommit 字段的值决定是否清空二级缓存,然后调用 flushPendingEntries() 方法将 entriesToAddOnCommit 集合中记录的结果对象保存到二级缓存中
*/
public void commit() {
// 在事务提交前,清空二级缓存
if (clearOnCommit) {
delegate.clear();
}
// 将 entriesToAddOnCommit 集合中记录的结果对象保存到二级缓存中
flushPendingEntries();
// 重置 clearOnCommit 值,清空 entriesToAddOnCommit 和 entriesMissedInCache 集合
reset();
}
/**
* 将 entriesMissedInCache 集合中记录的缓存项从二级缓存中删除,并清空 entriesToAddOnCommit 和 entriesMissedInCache 集合
*/
public void rollback() {
// 将 entriesMissedInCache 集合中记录的缓存项从二级缓存中删除
unlockMissedEntries();
// 清空 entriesToAddOnCommit 和 entriesMissedInCache 集合
reset();
}
/**
* 重置 clearOnCommit 值,清空 entriesToAddOnCommit 和 entriesMissedInCache 集合
*/
private void reset() {
clearOnCommit = false;
entriesToAddOnCommit.clear();
entriesMissedInCache.clear();
}
/**
* 将 entriesToAddOnCommit 集合中记录的结果对象保存到二级缓存中
*/
private void flushPendingEntries() {
for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {
delegate.putObject(entry.getKey(), entry.getValue());
}
// 遍历 entriesMissedInCache 集合,缓存为命中的,且 entriesToAddOnCommit 集合中不包含的缓存项, 添加到二级缓冲中
for (Object entry : entriesMissedInCache) {
if (!entriesToAddOnCommit.containsKey(entry)) {
delegate.putObject(entry, null);
}
}
}
/**
* 将 entriesMissedInCache 集合中记录的缓存项从二级缓存中删除
*/
private void unlockMissedEntries() {
for (Object entry : entriesMissedInCache) {
try {
delegate.removeObject(entry);
} catch (Exception e) {
log.warn("Unexpected exception while notifiying a rollback to the cache adapter."
+ "Consider upgrading your cache adapter to the latest version. Cause: " + e);
}
}
}
}
线程安全
不同的 CachingExecutor 对象由不同的线程操作,二级缓存会不会存在线程不安全的问题呢?
CacheBuilder.build() 方法,其中回调用 setStandardDecorators() 方法,为 PerpetualCache 类型的 Cache 对象添加 SynchronizedCache 装饰器,从而保证了二级线程安全。
事务提交
为什么要在事务提交时才将 TransactionalCache. entriesToAddOnCommit 集合中缓存的数据写入到二级缓存,而不是像一级缓存那样,将每次查询结果都直接写入二级缓存呢?
这是为了防止出现“脏读” 情况 最终实现的效果有点类似于“不可重复读”的事务隔离级别。假设当前数据库的隔离级别是“不可重复读”,先
后开启 Tl、T2 两个事务,如图 3- 52 示,在事务 Tl 添加了记录A, 之后查询A记录,最后提交事务,事务T2 会查询记录A。 如果事务 Tl 查询记录A时,就将A对应的结果对象放人二级缓存,则在事务 T2 第一次查询记录A时即可从二级缓存中直接获取其对应的结果对象。此时 Tl 仍然未提交,这就出现了"脏读"的情况,显然不是用户期的结果。

按照 CacheExecutor 的本身实现,事务T1查询记录A时二级缓存未命中,会查询数据库,因为是同一事务,所以可以查询到记录A 并得到相应的结果对象,并且会将记录 A 保存到 TransactionalCache .entriesToAddOnCommit 集合中。而事务T2第一次查询记录A时,二级缓存未命中,则会访问数据库,因为是不同个事务,数据库的“不可重复读”隔离级别会保证事务T2无法查询到记录A,这样就避免了上面的“脏读”的场景。在图 3-52 中,事务T1提交时会将 entriesToAddOnCommit 集合中的数据添加到二级缓存中,所以事务T2第二次查询记录A时,二级缓存才会命中,这就导致了同一事务中多次读取的结果不一致,也就是 “不可重复读”的场景。
网友评论