源码分析基于spring 4.3.x
文章主要记录看spring jdbc源码时的一些关键点。主要关注如下几点
- 事务传播性
- 数据库连接
- 动态代理
事务传播性
![](https://img.haomeiwen.com/i3804367/7e64e17bab644535.png)
spring事务管理的核心在于PlatformTransactionManager, PlatformTransactionManager中的getTransaction/commit方法提供了spring事务管理的基石。
PlatformTransactionManager的基本逻辑由AbstractPlatformTransactionManager实现
看看AbstractPlatformTransactionManager.getTransaction
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
// 模板方法,由子类实现
Object transaction = doGetTransaction();
// 已存在事务
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// PROPAGATION_MANDATORY:如果当前没有事务,就抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建一个TransactionStatus
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 开启一个事务
doBegin(transaction, definition);
// 绑定事务信息到当前线程中
prepareSynchronization(status, definition);
return status;
}
...
}
else {
// SUPPORTS:如果当前没有事务,就以非事务方式执行
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
看看已存在事务时,不同事务传播性的处理方案
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// PROPAGATION_NEVER: 已有事务就抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// PROPAGATION_NOT_SUPPORTED: 已有事务就挂起事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// PROPAGATION_REQUIRES_NEW: 每次都要一个新事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
...
}
// PROPAGATION_NESTED: 嵌套事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
...
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
数据库连接
doGetTransaction负责创建Transaction,由子类实现,看看DataSourceTransactionManager.doGetTransaction
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed()); // 设置Savepoint
// 获取数据库连接
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
TransactionSynchronizationManager.getResource主要调用doGetResource方法
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
...
return value;
}
主要这里的 resources属性是ThreadLocal<Map<Object, Object>> resources
,spring jdbc就是通过ThreadLocal上下文存储每一线程上,每一个dataSource和对应的Connection。
doBegin负责开启数据库事务,它也是模板方法,由DataSourceTransactionManager实现
protected void doBegin(Object transaction, TransactionDefinition definition) {
// 获取数据库连接
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// 创建数据库连接
if (txObject.getConnectionHolder() == null ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 数据库操作
...
// 绑定数据库连接
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
}
catch (Throwable ex) {
...
}
}
从doBegin可以看到,如果当前上下文中没有Connection,会创建一个Connection,最后会绑定到上下文中。
动态代理
我们都知道,只要在spring 配置中开启事务管理tx:annotation-driven
, 然后在spring bean的方法上添加注解@Transactional
, spring就会为我们管理事务,那么spring如何做到的呢?
从TxNamespaceHandler看到AnnotationDrivenBeanDefinitionParser解析标签annotation-driven
public BeanDefinition parse(Element element, ParserContext parserContext) {
registerTransactionalEventListenerFactory(parserContext);
String mode = element.getAttribute("mode");
if ("aspectj".equals(mode)) {
// mode="aspectj"
registerTransactionAspect(element, parserContext);
}
else {
// mode="proxy"
AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
}
return null;
}
下面只关注mode为"proxy"的处理,看看AopAutoProxyConfigurer的configureAutoProxyCreator
public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {
AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);
String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;
if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
Object eleSource = parserContext.extractSource(element);
// Create the TransactionAttributeSource definition.
RootBeanDefinition sourceDef = new RootBeanDefinition(
"org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");
...
// Create the TransactionInterceptor definition.
RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
...
// Create the TransactionAttributeSourceAdvisor definition.
RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
...
}
}
代码很多,但主要是创建了TransactionAttributeSource/TransactionInterceptor/TransactionAttributeSourceAdvisor definition。
注意,AopNamespaceUtils.registerAutoProxyCreatorIfNecessary
会调用AopConfigUtils.registerAutoProxyCreatorIfNecessary
public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry, Object source) {
return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}
InfrastructureAdvisorAutoProxyCreator继承了AbstractAdvisorAutoProxyCreator,正是它负责完成动态代理工作。
看看关键的advisor类BeanFactoryTransactionAttributeSourceAdvisor,它聚合了切入点TransactionAttributeSourcePointcut,
public boolean matches(Method method, Class<?> targetClass) {
if (TransactionalProxy.class.isAssignableFrom(targetClass)) {
return false;
}
TransactionAttributeSource tas = getTransactionAttributeSource();
return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}
getTransactionAttribute获取目标方法注解Transactional中的属性,如timeout/readOnly等。
拦截类TransactionInterceptor实现了MethodInterceptor,它负责实际的事务管理工作。 核心方法为
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
//声明式事务处理
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
commitTransactionAfterReturning(txInfo);
return retVal;
}
//编程式事务处理
else {
...
}
}
completeTransactionAfterThrowing会根据异常类型处理。
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.hasTransaction()) {
// 支持回滚的异常,进行回滚操作
if (txInfo.transactionAttribute.rollbackOn(ex)) {
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
...
}
// 不支持回滚的异常,继续提交
else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
...
}
}
}
网友评论