美文网首页javaWeb学习工作生活
改造Spring事务实现Spring Cloud分布式事务

改造Spring事务实现Spring Cloud分布式事务

作者: giafei | 来源:发表于2019-07-04 18:08 被阅读34次

    网上很多资料或组件都是完全重新实现事务管理,而本文选择改造Spring事务的提交过程,这样在即简化了集成成本,又不会因事务管理代码出现Bug导致系统没有满足事务特性(ACID),即如果在一次微服务的调用过程中所有的Spring的事务都提交或撤销了,整个系统也满足ACID。

    说起事务本身就挺麻烦的,在单点应用中事务由数据库来实现,但却是个业务层概念。Spring通过线程存储共享事务对象实现了业务层事务。网上常见的分布式事务协议有2PC、3PC与TCC,其中TCC对业务层侵入性强,对业务设计改动比较大,不符合前置设想。
    2PC/3PC协议是资源层协议,比较适用于数据库直接互相沟通事务,我们可以借助于Spring的业务层事务方式实现业务层的分布式事务。3PC第一个阶段要求所有参与者达成一致后再进行事务操作,即在一开始要求所有参与者都已就位,而微服务的链式调用是同步的,参与者是逐步增加的,所以3PC在微服务场景基本不可实现。本文选择实现2PC协议,2PC协议也是异步的,但2PC不要求所有参与者一开始就位,只要在阶段2之前就位即可,同步可以看做是2PC的异步的一种特殊的情况。

    一致性与原子性

    一致性这个词在各种资料中算是歧义特别大了,原子性反而出奇的统一。

    原子性目前有两种意思,一个是多线程中操作是不可分的,另一个是事务中所有的操作要么都成功,要么全都失败。这两种意思语境很少重复,所以提到原子性基本不会有歧义。

    一致性有分布式系统的一致性、事务的一致性、一致性Hash等解释,事务的一致性简单来说就是事务前后数据库的完整性约束不能破坏,它是由事务的其他3个特性共同保证的。
    分布式系统中的一致性一般指的是CAP理论中的C,它的定义是所有节点的数据是相同的,又有人把这个定义称为一致性状态,而在分布式系统中协商达成一致性状态的协议算法为一致性协议,2PC就是一个一致性协议。

    当事务和分布式系统有交集时,一致性这个词就彻底乱套了,甚至和原子性搅在了一起,网上铺天盖地的最终一致性的资料很多其实是事务的最终原子性。

    本文中一致性从CAP理论取义为 所有节点的数据是相同的,英文为"all nodes see the same data at the same time"

    因为分布式系统天然有事务的隔离性,事务的持久性由数据库保证,因此我们只要在Spring事务基础上实现分布式事务的原子性,就进而实现了分布式事务的一致性。

    即,只要在数据库事务的基础上实现分布式事务的原子性,就实现了分布式事务的ACID4个特性。因此在后文中用原子性指代分布式事务的整体4个特性。

    因Spring的事务使用导致的事务一致性错误不在本文讨论范围内。

    对于2PC协议,一致性状态为所有节点对事务的提交或回滚达成一致,一致性的结果是事务的原子性。因此2PC协议的一致性和分布式事务的一致性可以看做是等价的。

    本文实现与2PC协议的差异性

    2PC的各个参与者是平等,互相隔离的。但微服务不是,微服务是有调用与被调用关系的,被调用者的异常会天然返回给调用者并自然引起调用者的连锁回滚。
    2PC协议要求服务恢复后能回到事务中,但这个能力只有数据库自己有,Spring的事务提交前宕机,数据库会自己回滚。因此本文实现的2PC协议会在任何一个参与者宕机后整体回滚,此外还有在一致提交时,业务宕机也可能导致的原子性失败。
    由于无法保证业务宕机恢复后重试事务的上下文与宕机前一致,所以这个问题不能简单的在技术上解决,必须要业务配合。比如订单、仓储业务,当用户支付订单后仓储服务提交事务前宕机,仓储服务恢复后并不确定还有没有货,如果没货,只能在业务中退款。

    协调者宕机异常

    协调者正常的前提下,参与者宕机,我们可以简单的回滚事务才达成一致性。如果协调者宕机且没有Fail Over,这种情况需要详细分析。
    如果协调者在阶段一宕机,由于微服务的同步性,整个调用栈会在某个业务阻塞等待进行阶段一投票,与2PC协议相同。即协调者在阶段一宕机会导致业务阻塞但不会造成不一致。
    如果协调者阶段二宕机,此时同步调用已经完成,等待提交的消息。如果等待超时,也会导致参与者回滚。所以本文的实现,在阶段二协调者和参与者任一宕机都可能导致不一致。


    协调者

    本文采用Redis充当协调者,但Redis主从切换可能导致数据不一致,上一节中讨论阶段一协调者宕机不会造成不一致,我们要在设计时弥补这一可能性。

    协调者协议实现

    设计的存储结构如下

    数据类型 描述
    cloud-transaction/事务ID/state Long 事务存在标志,
    值为0时表示事务异常,后续的参与者自行回滚
    值为1表示阶段1,值为2表示阶段2
    cloud-transaction/事务ID/result String 事务的最终结果,是COMMIT还是ROLLBACK
    cloud-transaction/事务ID/notice String 协调者向参与者的通知订阅KEY
    cloud-transaction/事务ID/vote Hash[参与者,状态] 投票记录,
    HKEY为参与者ID,HValue为投票状态
    cloud-transaction/事务ID/ack Hash[参与者,状态] 执行记录,
    HKEY为参与者ID,HValue为执行状态

    正常流程为:

    1. 业务的开始方为watchdog,负责创建事务ID,写入state为1,并在微服务之间传递事务ID
    2. 判断state值是否为1,若是将Spring事务的执行结果写入投票记录,否则自行ROLLBACK
    3. 调用栈返回至watchdog处,此时可以获取到调用栈是否有异常,同时与投票记录互补
    4. watchdog写state为2,同时将结果写入result
    5. watchdog在notice上广播结果,并设置各键值的过期时间
    6. 各参与者收到通知后执行结果,并写入执行记录

    为防范1-3步(阶段1)Redis或参与者异常,在步骤4做如下设定:

    1. 调用栈或投票记录只要有一个为ROLLBACK,整体ROLLBACK
    2. state必须为1或键不存在

    即如果Redis异常未能恢复或参与者业务异常,调用栈必然异常。反之调用栈无异常,则表明无业务异常并且Redis最终正常(最终一致性的最终),可通过校验state值判断vote值的可信度。当参与者超时后可以从state和result综合判断是否应该提交事务。有的Redis设置Master宕机时,Slave可以读数据,这种设置最终会在各事务超时回滚。

    这么多问题这东西还能用么

    标准2PC的协调者是没有存储的,宕机再恢复需要从各参与者获取事务数据。本实现用的是Redis,Redis还是挺可靠的,还可以主从顶一顶。如果系统压力没那么大,负载并不高,用起来是没啥问题的。并且本实现还支持事务重用,即当微服务调用兜兜转转又回来的时候,事务是重用的,可以解决隔离性问题。

    为什么要造轮子

    • 纸上得来终觉浅
    • 找工作简历投出去就没影了也很苦恼啊

    关键代码

    在微服务之间共享事务ID

    通过RequestInterceptor为请求微服务之间的调用增加一个http头,这样可以方便的传递事务ID。同时如果事务创建时没有这个http头,那么当前业务就处在微服务栈的栈底

    @Component
    public class CloudTransactionIdFeignInterceptor implements RequestInterceptor {
    
        private static final String REQUEST_ATTRIBUTE_TRANSACTION_ID = "X-TRANSACTION-ID";
        private static final String REQUEST_HEADER_TRANSACTION_ID = "X-TRANSACTION-ID";
    
        public static HttpServletRequest getCurrentRequest() {
            ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.currentRequestAttributes();
            return attributes.getRequest();
        }
    
        @Override
        public void apply(RequestTemplate template) {
            try {
                HttpServletRequest currentRequest = getCurrentRequest();
                Object attribute = currentRequest.getAttribute(REQUEST_ATTRIBUTE_TRANSACTION_ID);
                if (attribute != null) {
                    template.header(REQUEST_HEADER_TRANSACTION_ID, attribute.toString());
                } else {
                    String header = currentRequest.getHeader(REQUEST_HEADER_TRANSACTION_ID);
                    if (StringUtils.hasText(header)) {
                        template.header(REQUEST_HEADER_TRANSACTION_ID, header);
                    }
                }
            } catch (Throwable e) {
                //不能影响正常的流程运行
            }
        }
        //获取事务ID
        public static String getCloudTransactionId() {
            HttpServletRequest currentRequest = getCurrentRequest();
            Object attribute = currentRequest.getAttribute(REQUEST_ATTRIBUTE_TRANSACTION_ID);
            if (attribute != null)
                return attribute.toString();
    
            return currentRequest.getHeader(REQUEST_HEADER_TRANSACTION_ID);
        }
        //广播事务ID
        public static void broadcastCloudTransactionId(String id) {
            getCurrentRequest().setAttribute(REQUEST_ATTRIBUTE_TRANSACTION_ID, id);
        }
    }
    

    拦截Spring事务的提交

    Spring通过PlatformTransactionManager Bean对象管理事务,我们实现一个Wrapper把Spring的Bean包起来,拦截关键调用。具体代码见类 WrappedDataSourceTransactionManager

    对Spring 事务的 ThreadLocal数据的处理

    private String doMoveThreadData(String id) {
    
            TransactionThreadData data = new TransactionThreadData();
            data.resources = new HashMap<>(TransactionSynchronizationManager.getResourceMap());
            data.synchronizations = TransactionSynchronizationManager.getSynchronizations();
            data.currentTransactionName = TransactionSynchronizationManager.getCurrentTransactionName();
            data.currentTransactionReadOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            data.currentTransactionIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            data.actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();
    
            for (Object key : data.resources.keySet()) {
                TransactionSynchronizationManager.unbindResource(key);
            }
    
            TransactionSynchronizationManager.clear();
    
            dataMap.put(id, data);
    
            return id;
        }
    
        public boolean restoreThreadData(String id) {
            TransactionThreadData data = dataMap.get(id);
            dataMap.remove(id);
    
            if (data == null)
                return false;
    
            for (Map.Entry<Object, Object> entry : data.resources.entrySet()) {
                TransactionSynchronizationManager.bindResource(entry.getKey(), entry.getValue());
            }
    
            TransactionSynchronizationManager.initSynchronization();
            for (TransactionSynchronization synchronization : data.synchronizations) {
                TransactionSynchronizationManager.registerSynchronization(synchronization);
            }
    
            TransactionSynchronizationManager.setCurrentTransactionName(data.currentTransactionName);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(data.currentTransactionReadOnly);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(data.currentTransactionIsolationLevel);
            TransactionSynchronizationManager.setActualTransactionActive(data.actualTransactionActive);
    
            return true;
        }
    

    完整代码见 类 TransactionThreadDataContainer

    微服务饶了一圈又回来,继续用未提交的事务

    Spring事务是基于ThreadLocal的,只要把ThreadLocal搬过来就可以了

        public static class LoadBalancerFeignClientWrapper implements Client {
            private LoadBalancerFeignClient wrapped;
            private WrappedDataSourceTransactionManager transactionManager;
    
            public LoadBalancerFeignClientWrapper(Client delegate,
                                           CachingSpringLoadBalancerFactory lbClientFactory,
                                           SpringClientFactory clientFactory, PlatformTransactionManager transactionManager) {
                wrapped = new LoadBalancerFeignClient(delegate, lbClientFactory, clientFactory);
                this.transactionManager = (WrappedDataSourceTransactionManager)transactionManager;
            }
    
            @Override
            public Response execute(Request request, Request.Options options) throws IOException {
                try {
                     //这里是调用其他微服务,调用的时候把TLS剥离出来
                     //调用完毕或异常时再恢复回来
                     //剥离后的数据就可以安放到其他的线程,达到重用事务的目的
                    transactionManager.stealTransactionThreadData();
                    Response response = wrapped.execute(request, options);
                    transactionManager.returnTransactionThreadData();
    
                    return response;
                } catch (Throwable e) {
                    transactionManager.returnTransactionThreadData();
                    throw e;
                }
            }
        }
    

    超时

    超时使用的是reactor-coreMono,详细见Reactor 3 Reference Guide

            //超时设置
            Mono.delay(Duration.ofMillis(maxWaitTime))
                    .map(t -> onTransactionTimeout(transactionId))
                    .publishOn(Schedulers.parallel())
                    .subscribe();
    

    用的不是timeout而是delay,所以onTransactionTimeout一定会执行,检测事务的结果。

    阶段2结果通告

    用的时Redis的订阅发布功能。

    测试

    代码见 https://github.com/giafei/cloud-transaction

    相关文章

      网友评论

        本文标题:改造Spring事务实现Spring Cloud分布式事务

        本文链接:https://www.haomeiwen.com/subject/byafhctx.html