源码分析
本地事务部分依赖于spring事务
@TxTransactional
注解和spring自带的Transactional
注解没联系,是手动调用TransactionManager
的一些方法来使用spring的事务的。具体是在拦截@TxTransactional
注解时,调用getTransaction
方法中,其内部调用doGetTransaction
时,若使用jdbc会将AutoCommit置为false,执行切面方法,然后阻塞。这里就是两阶段提交中的第一阶段,这时候并没有真正的提交数据库,需要等待协调者的反馈。然后再根据协调者即raincat-manager返回的状态,手动调用commit
或者rollback
方法来使用spring的事务。
事务的协调者是raincat-manager
raincat-manager也作为EurekaServer兼职了服务注册的作用。
事务协调者和事务参与者通信是通过netty
事务参与者在发起事务、加入事务组、预提交、事务完成、回滚等操作时都会通过netty-handler发送心跳数据。协调者即raincat-manager在收到心跳包后会判断行为来进行协调的操作,并回发心跳来决定参与者的下一步操作。
事务组的管理部分全部在协调者即raincat-manager。比如说,事务发起者在执行本地业务前会发送CREATE_GROUP的心跳给协调者,协调者在收到心跳包后会创建事务组,若是创建失败则会通知事务发起者不必执行本地业务,若是创建成功则通知事务发起者可以进行下一步操作。加入事务组之类的逻辑同理。
协调者根据事务组来协调事务所有的参与者
事务发起者会在业务代码执行前生成一个事务组Id,将其放入ThreadLocal中,在使用rpc时,因为是同一个线程,可以从ThreadLocal中获取事务组Id,再将事务组Id放进请求头,调用其他服务时在rpc的提供者即事务的参与者就可以获取同一个事务组Id,这样就可以将发起者和参与者放进同一个事务组,事务管理器会协调事务组的整体提交。
分布式事务发起者在使用rpc组件调用其他服务时,会将生成的事务Id放在请求头中,主要代码是在RpcMediator
类中,通过transmit
方法添加,通过aquire
方法获取。不同的rpc放入的位置不同,若是feign就是通过RequestInterceprot
放在requestTemplate::header
,若是dubbo就是通过Filter
放在RpcContext.getContext()::setAttachment
。在服务提供端会在调用业务代码前从请求头中取出,具体代码是在各个rpc对TxTransactionInterceptor
的实现中。
通过是否存在txGroupId来判断当前服务是事务的发起者还是事务的参与者
因为txGroupId是事务发起者创建的,因此在请求事务发起者时,请求头中不存在txGroupId,而发起者在调用参与者时将txGroupId放进了请求头。具体的判断代码在TxTransactionFactoryServiceImpl.factoryOf(txTransactionInfo)
。
参与者使用ReentrantLock和Condition来阻塞自己
参与者在发送心跳包后会阻塞自己,因为需要等待协调者的反馈。得到协调者的反馈后会根据回写的心跳包中的key找到对应的锁,解锁后继续执行。锁的实现代码是在BlockTask
类中,锁的使用代码主要是在NettyClientMessageHandler.sendTxManagerMessage(heartBeat)
事务参与者的提交和回滚需要获取到对应的Channel
事务发起者在发起事务或事务参与者在加入事务组时会将自己的remoteAddress保存起来。事务协调者在发起提交或者回滚时会根据上面保存的remoteAddress找到对应的Channel,代码在SocketManager.getChannelByModelName(name)
,找到对应的Channel后就会向对应的参与者的Channel发送心跳包。
拦截器部分
因为要在调用业务方法前找到当前操作所在的事务组,并将当前操作加入事务组,因此在最外面对于TxTransaction
的Aspect拦截部分就做了rpc的抽象,要根据特定的rpc找到请求头中的事务组。
不同的rpc实现通过spi调用。
要分清客户端和服务端
客户端只连一个服务端,因此在NetyClientMessageHandler
中对当前连接的服务端的ctx做了保存,在和服务端主动通信时直接使用这个ctx来write。
源码疑问
在org.dromara.raincat.core.service.impl.TxManagerLocator
中存储对象的部分使用的是AtomicReference
。这里的修改部分又加了锁,如果不用原子类,直接用volatile不可以吗。
网友评论