在Spring Data Redis提供了RedisTemplate对redis进行读写操作并且支持事务。
如果在同一线程(比如Web环境的一次请求中)中存在下面操作将会造成读操作无法直接读取出数据
1.先在非事务环境下执行reids操作(调用没有加@Transactional注解)
2.然后在事务环境下执行redis操作(调用添加了@Transactional注解的方法)
可以从RedisTemplate源码中找到原因
RedisTemplate中对Redis的各种数据类型的操作都抽象出了相对于的操作类 如 ValueOperations,ListOperations,SetOperations等,而这些类在执行操作时最终还是会调用RedisTemplate的public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline),这个方法是RedisTemplate的操作Reids的核心方法
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(action, "Callback object must not be null");
RedisConnectionFactory factory = getConnectionFactory();
RedisConnection conn = null;
try {
if (enableTransactionSupport) {
// only bind resources in case of potential transaction synchronization
//如果设置了启用事务,则调用bindConnection
conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
} else {
conn = RedisConnectionUtils.getConnection(factory);
}
boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
//预留钩子函数可在执行具体操作前对connection做一些处理
RedisConnection connToUse = preProcessConnection(conn, existingConnection);
boolean pipelineStatus = connToUse.isPipelined();
if (pipeline && !pipelineStatus) {
connToUse.openPipeline();
}
RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
T result = action.doInRedis(connToExpose);
// close pipeline
if (pipeline && !pipelineStatus) {
connToUse.closePipeline();
}
// TODO: any other connection processing?
//预留钩子函数可在执行具体操作后对connection做一些处理
return postProcessResult(result, connToUse, existingConnection);
} finally {
RedisConnectionUtils.releaseConnection(conn, factory);
}
}
可以看出这个方法是个模板方法,实现了整个操作的流程
RedisConnectionUtils是获取连接的工具类,在配置RedisTemplate是如果设置了enableTransactionSupport=true时,则会通过bindConnection方法获取连接
//bindConnection调用了doGetConnection
public static RedisConnection bindConnection(RedisConnectionFactory factory, boolean enableTranactionSupport) {
return doGetConnection(factory, true, true, enableTranactionSupport);
}
public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
boolean enableTransactionSupport) {
Assert.notNull(factory, "No RedisConnectionFactory specified");
//从当前线程中获取连接
RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
if (connHolder != null) {
if (enableTransactionSupport) {
//开启reids事务
potentiallyRegisterTransactionSynchronisation(connHolder, factory);
}
return connHolder.getConnection();
}
if (!allowCreate) {
throw new IllegalArgumentException("No connection found and allowCreate = false");
}
if (log.isDebugEnabled()) {
log.debug("Opening RedisConnection");
}
//如果当前线程中不存在连接则创建连接
RedisConnection conn = factory.getConnection();
if (bind) {
RedisConnection connectionToBind = conn;
//如果开启的事务且调用添加了@Transactional的方法,这里会创建一个连接的代理对象
if (enableTransactionSupport && isActualNonReadonlyTransactionActive()) {
connectionToBind = createConnectionProxy(conn, factory);
}
connHolder = new RedisConnectionHolder(connectionToBind);
//绑定连接到当前线程中
TransactionSynchronizationManager.bindResource(factory, connHolder);
if (enableTransactionSupport) {
//开启reids事务
potentiallyRegisterTransactionSynchronisation(connHolder, factory);
}
return connHolder.getConnection();
}
return conn;
}
//开启reids事务
private static void potentiallyRegisterTransactionSynchronisation(RedisConnectionHolder connHolder,
final RedisConnectionFactory factory) {
if (isActualNonReadonlyTransactionActive()) {
if (!connHolder.isTransactionSyncronisationActive()) {
connHolder.setTransactionSyncronisationActive(true);
RedisConnection conn = connHolder.getConnection();
conn.multi();
//注册一个事务完成时的回调,用于提交或回滚redis事务
TransactionSynchronizationManager.registerSynchronization(new RedisTransactionSynchronizer(connHolder, conn,
factory));
}
}
}
上面代码可以看出获取连接的整个流程
- TransactionSynchronizationManager.getResource(factory)(从当前线程中获取连接,TransactionSynchronizationManager使用ThreadLocal把连接绑定到当前线程上。
- 如果获取到连接则开启事务,返回连接,如果没有获取到则创建连接
- 创建完连接后会判断当前操作是否在事务中isActualNonReadonlyTransactionActive (是否添加了@Transactional注解,并且事务不是ReadOnly的)
- 如果操作实在事务中,则会创建一个连接的代理对象
- TransactionSynchronizationManager.bindResource(factory, connHolder); 绑定事务到当前线程中
- potentiallyRegisterTransactionSynchronisation(connHolder, factory); 开启redis事务
- 返回连接
从上面流程可以看出在事务中执行和不在事务中执行的关键区别在于,是否创建了一个连接的代理对象,下面看一下createConnectionProxy的代码
//创建了一个ConnectionSplittingInterceptor类用于拦截RedisConnection所有方法
private static RedisConnection createConnectionProxy(RedisConnection connection, RedisConnectionFactory factory) {
ProxyFactory proxyFactory = new ProxyFactory(connection);
proxyFactory.addAdvice(new ConnectionSplittingInterceptor(factory));
return RedisConnection.class.cast(proxyFactory.getProxy());
}


上面代码中创建了一个ConnectionSplittingInterceptor类用于拦截RedisConnection中的所有方法,ConnectionSplittingInterceptor中的核心代码是intecepter方法
@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
RedisCommand commandToExecute = RedisCommand.failsafeCommandLookup(method.getName());
//判断命令是否为只读命令,如果是则新开一个连接执行度操作,如果是写命令则放在事务中执行
if (isPotentiallyThreadBoundCommand(commandToExecute)) {
if (log.isDebugEnabled()) {
log.debug(String.format("Invoke '%s' on bound conneciton", method.getName()));
}
return invoke(method, obj, args);
}
if (log.isDebugEnabled()) {
log.debug(String.format("Invoke '%s' on unbound conneciton", method.getName()));
}
RedisConnection connection = factory.getConnection();
try {
return invoke(method, connection, args);
} finally {
// properly close the unbound connection after executing command
if (!connection.isClosed()) {
connection.close();
}
}
}


intecepter方法中会判断这次执行的命令是否是读命令。如果不是,会用当前线程中的连接执行也就是放在事务中执行,如果是读操作,会创建一个新的连接执行,这样就能立即获得读取的数据。
通过代码可以看出出错的大致流程:
- 调用没有使用事务的reids操作
- 创建一个连接并绑定到当前线程中(由于没有使用事务,不会创建连接的代理对象)
- 执行reids操作 (操作完成后并没有把当前线程中的连接清除)
- 调用使用事务的redis操作(方法上添加了@Transactional注解)
- 获取连接方向当前线程中已经存在了连接不再重新创建(获取到的是没有使用事务时创建的连接,此连接对象不是代理对象)
- 开启事务
- 执行操作(如果执行的是读操作,由于连接对象不是代理对象,读操作并不会重新创建一个连接,而是使用当前连接,并且放在事务中运行,因此读操作并不会立即执行而是等到事务提交时才能执行,导致读操作读取的结果为null)
解决方案:
此问题关键在于如果执行了为使用事务的reids操作,在操作完成后要将当前线程中绑定的连接对象给清除掉,或者在使用的事务的reids操作之前,判断获取到的连接是否是代理对象,如果不是则清除掉,重新获取连接。在RedisTemplate的execute方法中我们看到了 reids为我们预留了两个钩子函数,
preProcessConnection(conn, existingConnection) 和 postProcessResult(result, connToUse, existingConnection) 因此我们可以继承RedisTemplate来对连接进行处理
public class CustomRedisTemplate<K, V> extends RedisTemplate<K, V> {
private boolean enableTransactionSupport = false;
private static boolean isActualNonReadonlyTransactionActive() {
return TransactionSynchronizationManager.isActualTransactionActive()
&& !TransactionSynchronizationManager.isCurrentTransactionReadOnly();
}
/**
* 解决 redis先非事务中运行,然后又在事务中运行,出现取到的连接还是非事务连接的问题
* 在事务环境中用非事务连接,读取操作无法马上读出数据
*
* @param connection
* @param existingConnection
* @return
*/
@Override
protected RedisConnection preProcessConnection(RedisConnection connection, boolean existingConnection) {
if (existingConnection && !Proxy.isProxyClass(connection.getClass()) && isActualNonReadonlyTransactionActive()) {
RedisConnectionUtils.unbindConnection(getConnectionFactory());
List<TransactionSynchronization> list = new ArrayList<>(TransactionSynchronizationManager.getSynchronizations());
TransactionSynchronizationManager.clearSynchronization();
TransactionSynchronizationManager.initSynchronization();
//移除最后一个回调(由于之前回去连接是会注册一个事务回调,下面如果再获取连接会导致注册两个事务回调。事务完成后会执行两次回调,
// 回调中会清除资源,第一次已经清除,第二次再清的时候回抛出异常)
list.remove(list.size() - 1);
list.forEach(TransactionSynchronizationManager::registerSynchronization);
connection = RedisConnectionUtils.bindConnection(getConnectionFactory(), enableTransactionSupport);
}
return connection;
}
@Override
public void setEnableTransactionSupport(boolean enableTransactionSupport) {
super.setEnableTransactionSupport(enableTransactionSupport);
this.enableTransactionSupport = enableTransactionSupport;
}
}
网友评论