引言
Spring提供的声明式事务想必大家都很熟悉了,简简单单一个@Transactional
注解便能提供如此强大的功能,那么它是如何实现的呢?带着这点好奇心,我们一起扒拉扒拉spring-tx-5.2.6.RELEASE
的源码吧。
所谓工欲善其事,必先利其器,深入源码之前了解清楚spring-tx
的相关概念还是很有必要的。本篇算是Spring AOP源码解析的姊妹篇,毕竟声明式事务就是AOP思想的一个实际应用嘛。
Spring Tx Concepts
PlatformTransactionManager
PlatformTransactionManager
是Spring事务管理的核心接口,它规范了应用程序操作事务的方式。
public interface PlatformTransactionManager extends TransactionManager {
/**
* 获取事务的状态信息(依据传播行为的不同,可能返回一个已激活的事务或创建一个新的独立事务)
*/
TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;
/**
* 提交当前事务,依据事务的当前状态,也可能会进行回滚,比如标记为rollback-only的事务
*/
void commit(TransactionStatus status) throws TransactionException;
/**
* 回滚当前事务,依据传播行为的不同,非独立的内部事务仅会打上rollback-only标记
*/
void rollback(TransactionStatus status) throws TransactionException;
}
TransactionDefinition
TransactionDefinition
描述了事务的相关属性,比如事务的隔离级别、传播行为,亦或是超时时长是多少等等。
public interface TransactionDefinition {
/**
* 返回事务的传播行为
*/
default int getPropagationBehavior() {
return PROPAGATION_REQUIRED;
}
/**
* 返回事务的隔离级别
*/
default int getIsolationLevel() {
return ISOLATION_DEFAULT;
}
/**
* 获取事务的超时时长
*/
default int getTimeout() {
return TIMEOUT_DEFAULT;
}
/**
* 是否是只读事务
*/
default boolean isReadOnly() {
return false;
}
/**
* 获取事务名称,多用于debug
*/
@Nullable
default String getName() {
return null;
}
}
事务的隔离级别想必不用多做解释,传播行为(Propagation Behavior
)是什么概念呢?简单来说,当一个事务方法被另一个事务方法调用时,传播行为可以控制是否需要创建事务以及如何创建事务,spring-tx
中定义了7种事务传播行为:
-
PROPAGATION_REQUIRED
: 表示方法必须运行在事务中,如果当前事务存在,方法将会在该事务中运行,否则将开启一个新的独立事务 -
PROPAGATION_SUPPORTS
:表示方法支持在事务中运行,如果当前事务存在,方法将会在该事务中运行,否则以非事务方式运行 -
PROPAGATION_MANDATORY
:表示方法必须运行在事务中,如果当前事务不存在,抛出异常 -
PROPAGATION_REQUIRES_NEW
:表示方法必须运行在独立事务中,无论当前是否存在事务,该级别总会开启一个新的独立事务 -
PROPAGATION_NOT_SUPPORTED
:表示方法不支持在事务中运行,如果当前事务存在,挂起当前事务从而以非事务方式运行 -
PROPAGATION_NEVER
:表示方法不支持在事务中运行,如果当前事务存在,抛出异常 -
PROPAGATION_NESTED
:表示方法必须运行在事务中,如果当前事务存在,开启一个嵌套事务(Savepoint
),否则等同于PROPAGATION_REQUIRED
TransactionDefition
更多的是表达通用的概念,它的子接口TransactionAttribute
添加了基于AOP的rollbackOn(...)
操作。
public interface TransactionAttribute extends TransactionDefinition {
/**
* 用于在Spring IoC Container中获取PlatformTransactionManager
*/
@Nullable
String getQualifier();
/**
* 判断是否在遇到指定类型的异常时进行回滚
*/
boolean rollbackOn(Throwable ex);
}
再看一眼@Transactional
注解的定义。
public @interface Transactional {
@AliasFor("transactionManager")
String value() default "";
@AliasFor("value")
String transactionManager() default "";
Propagation propagation() default Propagation.REQUIRED;
Isolation isolation() default Isolation.DEFAULT;
int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;
boolean readOnly() default false;
Class<? extends Throwable>[] rollbackFor() default {};
String[] rollbackForClassName() default {};
Class<? extends Throwable>[] noRollbackFor() default {};
String[] noRollbackForClassName() default {};
}
想必你也看出来了,TransactionAttribute
正是对运行时获取到的@Transactional
注解的封装。
TransctionStatus
TransctionStatus
描述了某一时间点上事务的状态信息,比如是否是新开启的独立事务、是否已完成以及是否打上了rollback-only
标记等等,并且为了支持嵌套事务,TransctionStatus
还额外提供了对保存点(Savepoint
)的支持。
// 管理Savepoint
public interface SavepointManager {
/**
* 创建一个新的Savepoint,后续可以回滚到指定的SavePoint
*/
Object createSavepoint() throws TransactionException;
/**
* 回滚到指定的Savepoint
*/
void rollbackToSavepoint(Object savepoint) throws TransactionException;
/**
* 释放指定Savepoint
*/
void releaseSavepoint(Object savepoint) throws TransactionException;
}
// 代表事务的当前状态
public interface TransactionExecution {
/**
* 检查当前事务是否是一个独立事务,返回false表示是加入的一个已存在的事务或以非事务方式运行
*/
boolean isNewTransaction();
/**
* 给当前事务打上rollback-only标记,被标记为rollback-only的事务只能被回滚而不会被提交
*/
void setRollbackOnly();
/**
* 检查当前事务是否打上了rollback-only标记
*/
boolean isRollbackOnly();
/**
* 检查当前事务是否已完成,已提交或已回滚都认为是已完成
*/
boolean isCompleted();
}
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {
/**
* 检查当前事务是否携带有Savepoint,也就是说是否创建了嵌套事务
*/
boolean hasSavepoint();
/**
* 如果底层TxManager支持的话就有用,比如Hibernate Session,
* 而对于JDBC DataSource/Connection来讲是没有flush这个概念的,基本上是no-op
*/
@Override
void flush();
}
Related Concepts
ResourceHolder
当一个事务方法 A 调用另一个事务方法 B 的时候,如何保证这两个方法运行在同一个事务中呢?如果方法 A 和 B 使用不同的java.sql.Connection
来操作数据库,能保证它们运行在同一个事务中吗?很明显,是不能的。多个方法运行在同一个事务中的前提是它们必须使用同一个java.sql.Connection
,以伪代码的形式就是:
try {
connection.setAutoCommit(false)
// 方法 A 执行 sql
methodA(connection);
// 方法 B 执行 sql
methodB(connection);
connection.commit()
} catch (Exception ex) {
connection.rollback()
}
java.sql.Connection
实例必须传递给方法 A 和方法 B,这样才能保证它们使用同一个连接对象。实际开发中,我们并没有像这样传递过连接对象,spring-tx
将我们从这些细节中解放了出来。
ResouceHolder
就是设计来包裹底层连接资源的,spring-tx
内部会使用线程私有存储ThreadLocal
在同一个线程中进行传递,对方法 A 和 方法 B 来说,只要它们运行在同一个线程中,就能使用上同一个连接对象。当然了,ResourceHolder
并非只能携带java.sql.Connection
,对于使用MyBatis的用户来说,它携带的就是SqlSession
了。
public interface ResourceHolder {
/**
* 重置
*/
void reset();
/**
* 解绑连接资源
*/
void unbound();
/**
* 检查此Holder是否携带有连接资源
*/
boolean isVoid();
}
TransactionSynchronization
由于spring-tx
全盘接管了事务管理,那么它自然可以管理事务的生命周期。 TransactionSynchronization
就是这样一个回调接口,它为我们揭示了事务运行时的各个阶段,如果我们需要在事务执行前后做一些额外的操作,使用它就再好不过了。
public interface TransactionSynchronization extends Flushable {
/** 事务已提交 */
int STATUS_COMMITTED = 0;
/** 事务已回滚 */
int STATUS_ROLLED_BACK = 1;
/** 状态未知 */
int STATUS_UNKNOWN = 2;
/**
* 事务被挂起时回调
*/
default void suspend() {
}
/**
* 事务恢复时回调
*/
default void resume() {
}
@Override
default void flush() {
}
/**
* 事务提交前回调
*/
default void beforeCommit(boolean readOnly) {
}
/**
* 事务完成前回调,也就是在事务管理器 commit/rollback 之前
*/
default void beforeCompletion() {
}
/**
* 事务成功提交后回调
*/
default void afterCommit() {
}
/**
* 事务完成后回调,status揭示了事务当前状态
*/
default void afterCompletion(int status) {
}
}
TransactionSynchronizationManager
spring-tx
中ResourceHolder
的绑定和传递、TransactionSynchronization
的注册和获取,均是代理给TransactionSynchronizationManager
来完成的。
public abstract class TransactionSynchronizationManager {
private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);
// ResourceHolder的绑定关系
// 比如在DataSourceTransactionManager中key是java.sql.DataSource,value是ConnectionHolder
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
// 已注册的TransactionSynchronization
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
// 当前的事务名称
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
// 当前事务是否是只读事务
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");
// 当前事务的隔离级别
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
// 事务是否真的被激活
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");
//-------------------------------------------------------------------------
// Management of transaction-associated resource handles
//-------------------------------------------------------------------------
/**
* 根据提供的key找寻底层资源
*/
@Nullable
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}
@Nullable
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// 对于ResourceHolder#isVid()的情况,认为其没有实际绑定上任何资源
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
/**
* 绑定一个资源
*/
public static void bindResource(Object key, Object value) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");
Map<Object, Object> map = resources.get();
if (map == null) {
map = new HashMap<>();
resources.set(map);
}
Object oldValue = map.put(actualKey, value);
// ResourceHolder#isVid()等同于不存在任何资源
if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
oldValue = null;
}
// TransactionSynchronizationManager中的任何操作,都只能先解除再操作,而不能进行覆盖
if (oldValue != null) {
throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
if (logger.isTraceEnabled()) {
logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
Thread.currentThread().getName() + "]");
}
}
/**
* 解绑资源,如果资源不存在抛出异常
*/
public static Object unbindResource(Object key) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doUnbindResource(actualKey);
if (value == null) {
throw new IllegalStateException(
"No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
return value;
}
/**
* 解绑资源,资源不存在等于no-op
*/
@Nullable
public static Object unbindResourceIfPossible(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
return doUnbindResource(actualKey);
}
@Nullable
private static Object doUnbindResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.remove(actualKey);
if (map.isEmpty()) {
resources.remove();
}
// ResourceHolder#isVid()等同于不存在任何资源
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
value = null;
}
if (value != null && logger.isTraceEnabled()) {
logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}
//-------------------------------------------------------------------------
// Management of transaction synchronizations
//-------------------------------------------------------------------------
/**
* 检查transaction synchronization是否已激活
*/
public static boolean isSynchronizationActive() {
return (synchronizations.get() != null);
}
/**
* 激活transaction synchronization,如果已激活,抛出异常
*/
public static void initSynchronization() throws IllegalStateException {
if (isSynchronizationActive()) {
throw new IllegalStateException("Cannot activate transaction synchronization - already active");
}
logger.trace("Initializing transaction synchronization");
synchronizations.set(new LinkedHashSet<>());
}
/**
* 注册一个TransactionSynchronization,如果transaction synchronization未激活,抛出异常
*/
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
Set<TransactionSynchronization> synchs = synchronizations.get();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
synchs.add(synchronization);
}
/**
* 返回已注册的所有TransactionSynchronization,如果transaction synchronization未激活,抛出异常
*/
public static List<TransactionSynchronization> getSynchronizations() throws IllegalStateException {
Set<TransactionSynchronization> synchs = synchronizations.get();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
if (synchs.isEmpty()) {
return Collections.emptyList();
}
else {
List<TransactionSynchronization> sortedSynchs = new ArrayList<>(synchs);
// 排序,TransactionSynchronization可实现Ordered接口或使用@Order注解指定优先级
AnnotationAwareOrderComparator.sort(sortedSynchs);
return Collections.unmodifiableList(sortedSynchs);
}
}
/**
* 关闭transaction synchronization
*/
public static void clearSynchronization() throws IllegalStateException {
if (!isSynchronizationActive()) {
throw new IllegalStateException("Cannot deactivate transaction synchronization - not active");
}
logger.trace("Clearing transaction synchronization");
synchronizations.remove();
}
//-------------------------------------------------------------------------
// Exposure of transaction characteristics
//-------------------------------------------------------------------------
/**
* 记录当前事务名称
*/
public static void setCurrentTransactionName(@Nullable String name) {
currentTransactionName.set(name);
}
/**
* 返回当前事务名称
*/
@Nullable
public static String getCurrentTransactionName() {
return currentTransactionName.get();
}
/**
* 设置当前事务为只读事务
*/
public static void setCurrentTransactionReadOnly(boolean readOnly) {
currentTransactionReadOnly.set(readOnly ? Boolean.TRUE : null);
}
/**
* 检查当前事务是否为只读事务
*/
public static boolean isCurrentTransactionReadOnly() {
return (currentTransactionReadOnly.get() != null);
}
/**
* 设置当前事务的隔离级别
*/
public static void setCurrentTransactionIsolationLevel(@Nullable Integer isolationLevel) {
currentTransactionIsolationLevel.set(isolationLevel);
}
/**
* 返回当前事务的隔离级别
*/
@Nullable
public static Integer getCurrentTransactionIsolationLevel() {
return currentTransactionIsolationLevel.get();
}
/**
* 设置当前事务已开启
*/
public static void setActualTransactionActive(boolean active) {
actualTransactionActive.set(active ? Boolean.TRUE : null);
}
/**
* 检查当前事务已开启,也就是说是否真的存在事务而不仅仅只有transaction synchronization
*/
public static boolean isActualTransactionActive() {
return (actualTransactionActive.get() != null);
}
/**
* 完全清除transaction synchronization各种状态信息
*/
public static void clear() {
synchronizations.remove();
currentTransactionName.remove();
currentTransactionReadOnly.remove();
currentTransactionIsolationLevel.remove();
actualTransactionActive.remove();
}
}
注意,TransactionSynchronizationManager
在操作ResourceHolder
时是不允许直接覆盖的,旧的资源必须先解绑才能绑定新的资源。同时TransactionSynchronization
只能在transaction synchronization
激活时才能绑定,为此TransactionSynchronizationManager
提供了initSynchronization()
和clearSynchronization()
来分别开启开启和关闭transaction synchronization
。
结语
抽象是编程的先决条件,编码不过是对抽象的具体实现。好的抽象才能带出好的代码,spring-tx
也是如此,下一篇让我们一起钻到具体的实现细节里去吧~~
网友评论