分布式事务

作者: spilledyear | 来源:发表于2018-10-12 17:03 被阅读48次

    分布式事务
    分布式事务

    数据库事务

    断电了,该怎么处理?通过日志的方式!在执行事务的时候数据库首先会记录下这个事务的redo操作日志,然后才开始真正操作数据库,在操作之前首先会把日志文件写入磁盘,那么当突然断电的时候,即使操作没有完成,在重新启动数据库时候,数据库会根据当前数据的情况进行undo回滚或者是redo前滚,这样就保证了数据的强一致性。

    CAP定理

    • 一致性(Consistency) : 客户端知道一系列的操作都会同时发生(生效);
    • 可用性(Availability) : 每个操作都必须以可预期的响应结束;
    • 分区容错性(Partition tolerance) : 即使出现单个组件无法可用,操作依然可以完成;

    在分布式系统中,在任何数据库设计中,一个Web应用至多只能同时支持上面的两个属性。显然,任何横向扩展策略都要依赖于数据分区。因此,设计人员必须在一致性与可用性之间做出选择。

    一般情况下往往为了可用性和分区容错性,忍痛放弃强一致支持,转而追求最终一致性。大部分业务场景下,是可以接受短暂的不一致的。

    BASE理论

    在分布式系统中,我们往往追求的是可用性,它的重要程序比一致性要高,那么如何实现高可用性呢? 前人已经给我们提出来了另外一个理论,就是BASE理论,它是用来对CAP定理进行进一步扩充的。BASE理论指的是:

    • Basically Available(基本可用)
    • Soft state(软状态)
    • Eventually consistent(最终一致性)

    BASE理论是对CAP中的一致性和可用性进行一个权衡的结果,理论的核心思想就是:我们无法做到强一致,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventual consistency)。

    分布式事务

    两段式提交(2PC)

    两阶段提交就是使用XA协议的原理:

    • 第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交.
    • 第二阶段:事务协调器要求每个数据库提交数据。

    其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。在CAP中,影响可用性。

    优点: 尽量保证了数据的强一致,适合对数据强一致要求很高的关键领域(其实也不能100%保证强一致)。
    缺点: 实现复杂,牺牲了可用性,对性能影响较大,不适合高并发高性能场景,如果分布式系统跨接口调用,目前 .NET 界还没有实现方案。

    补偿事务(TCC)

    CC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:

    • Try阶段:主要是对业务系统做检测及资源预留;
    • Confirm阶段:主要是对业务系统做确认提交,Try阶段执行成功并开始执行Confirm阶段时,默认Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
    • Cancel阶段:主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

    举个例子,假入 Bob 要向 Smith 转账,思路大概是:
    我们有一个本地方法,里面依次调用
    1、首先在 Try 阶段,要先调用远程接口把 Smith 和 Bob 的钱给冻结起来。
    2、在 Confirm 阶段,执行远程调用的转账的操作,转账成功进行解冻。
    3、如果第2步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法 (Cancel)。

    优点: 跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些。
    缺点: 缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。

    本地消息表(异步确保)

    本地消息表这种实现方式应该是业界使用最多的,其核心思想是将分布式事务拆分成本地事务进行处理,这种思路是来源于ebay。
    消息生产方,需要额外建一个消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交,也就是说他们要在一个数据库里面。然后消息会经过MQ发送到消息的消费方。如果消息发送失败,会进行重试发送。

    消息消费方,需要处理这个消息,并完成自己的业务逻辑。此时如果本地事务处理成功,表明已经处理成功了,如果处理失败,那么就会重试执行。如果是业务上面的失败,可以给生产方发送一个业务补偿消息,通知生产方进行回滚等操作。

    生产方和消费方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。

    优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。在 .NET中 有现成的解决方案。
    缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

    MQ事务消息

    有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 RabbitMQ 和 Kafka 都不支持。

    以阿里的 RocketMQ 中间件为例,其思路大致为:
    第一阶段Prepared消息,会拿到消息的地址。
    第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。

    Sagas事务模型

    该模型其核心思想就是拆分分布式系统中的长事务为多个短事务,或者叫多个本地事务,然后由 Sagas 工作流引擎负责协调,如果整个流程正常结束,那么就算是业务成功完成,如果在这过程中实现失败,那么Sagas工作流引擎就会以相反的顺序调用补偿操作,重新进行业务回滚。

    比如我们一次关于购买旅游套餐业务操作涉及到三个操作,他们分别是预定车辆,预定宾馆,预定机票,他们分别属于三个不同的远程接口。可能从我们程序的角度来说他们不属于一个事务,但是从业务角度来说是属于同一个事务的。

    案例

    以上只是介绍了分布式事务的几种不同解决方案,但上面只是笼统的介绍,每一种方案都有很多的细节,这里以本地消息表这种方式进行详细讲解,并提供案例以便更深入的了解。

    假设现在有三个系统:系统A、消息中间件M、系统B,在A 和 B 之间存在分布式事务的需求。
    根据分布式事务这篇文章上方案二的理解,大概是这么个流程:

    1. A向M 发送一条消息,告诉M它准备干活了。
    2. M向A回应一条消息,告诉A说:我收到你的消息了,你干吧!(并不一定就真的要回应一条消息给A,可以通过判断等方式达到目的)。
    3. A开始干活,即处理该分布式事务中的A部分业务。这里要分两种情况考虑:
      (1) A处理业务的过程中出项异常,干活失败。
      (2) A处理业务的过程中表现良好,干活成功。
      我们要知道的是,不管是哪种情况,A都需要向M发送一条指令:如果A干活失败,A就Rollback,然后向M发送一条Rollback指令,这时候M就会将这条消息从消息中间件中删除,这种情况就不需要和B打交道了,整个流程就相当于结束了;如果A干活成功,就向M发送一条Confirm指令,可以认为这个指令的作用就是改变消息的状态,比如改成Confirm等,只有消息是这个状态,M才能向B投递消息,这是后话,不多说了。
    4. 如果A发送的是Confirm指令,M就向B投递该消息,B收到消息后,就开始干活了。
    5. 如果B干活成功,就向M回应,M这时候可以将这条消息删除或者作废,至此整个分布式事务完成;如果B干活失败,可能就需要调用A的回滚接口,上面没有讨论这种情况,应该挺麻烦吧?

    上这种情况,我们都是假设A B 和 M之间不会丢失消息,如果在上面的 3 、5 步骤中发生丢失消息的情况就会出现问题,针对以上情况,有如下解决方案:

    • 针对步骤3
      当M收到一条事务型消息后便开始计时,如果到了超时时间也没收到系统A发来的Confirm或Rollback指令的话,就会主动调用系统A提供的事务询问接口询问该系统目前的状态。该接口会返回三种结果:
      提交:若获得的状态是“提交”,则将该消息投递给系统B。
      回滚:若获得的状态是“回滚”,则直接将条消息丢弃。
      处理中:若获得的状态是“处理中”,则继续等待。

    • 针对步骤5
      M在等待确认应答超时之后就会重新向B投递消息,直到B返回消费成功响应为止。当然,一般M可以设置消息重试的次数和时间间隔,比如:当第一次投递失败后,每隔五分钟重试一次,一共重试3次。如果重试3次之后仍然投递失败,那么这条消息就需要人工干预。

    准备

    系统A、系统B、消息中间件使用RabbitMQ、测试工具Jmeter。

    不过这个案例并不是完全按照上面说到的那样,主要区别在于:

    1. A向M发一条消息,并没有写一条消息到RabbitMQ,而仅仅是向event里面写了一条记录
    2. 为了防止A向M提交Confirm和Cancle指令时失败,M需要定时去event表里查看哪些消息创建时间大于3s并且还是unfinished状态。然后再根据这些消息的uuid去A的相关业务表查找记录,如果找到了,就置为Confirm,没找到就置为Cancel,然后再根据这两种状态分别进行不同的操作,Cancel就将消息删除;Confirm就发送一条消息到MQ,从而触发系统B的逻辑。
    3. 当B中的业务代码出现问题时,A并没有提供相应的回滚接口。
    消息表
    CREATE TABLE `event` (
      `uuid` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
      `status` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '状态',
      `create_time` bigint(20) NOT NULL COMMENT 'event的创建时间',
      `publisher_service_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '发布事件的服务id',
      `type` varchar(30) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'event的类型',
      `json_messages` text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '要发送给消息队列的消息集合',
      PRIMARY KEY (`uuid`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    系统A
    public boolean doA(TradeDto tradeDto) throws Exception {
        //1. 生成全局uuid
        String uuid = EventTools.generateUuid();
        //2. 创建消息
        Event event = new Event();
        event.setUuid(uuid);
        //本服务的服务名
        event.setPublisherServiceId("hap-event-demo-trade-service");
        //message包含消息队列的队列名和数据
        //payload必须含有uuid字段!!!
        TradeAmqp payload = new TradeAmqp(uuid, tradeDto.getAmount(), tradeDto.getBuyerId(), tradeDto.getSellerId());
        Event.Message message = new Event.Message("trade2", mapper.writeValueAsString(payload));
        event.setMessages(Collections.singletonList(message));
        event.setType(EVENT_TYPE_TRADE);
        
        // 消息发送之后回调执行业务逻辑
        boolean result =  eventTemplate.execute(() -> {
            TradeRecord record = new TradeRecord();
            record.setBuyerId(tradeDto.getBuyerId());
            record.setSellerId(tradeDto.getSellerId());
            record.setUuid(uuid);
            record.setCreateTime(new Timestamp(System.currentTimeMillis()));
            if (tradeRecordMapper.insert(record) != 1){
                throw new RuntimeException("init event failed");
            }
        },event);
        if (!result){
            throw new RuntimeException("error.trade.create");
        }
        return true;
    }
    
    
    public boolean execute(EventCallback eventCallback, Event event){
        // 开启事物
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRED);
        TransactionStatus status = transactionManager.getTransaction(def);
        String eventId;
        // 发送事件创建消息
        try {
            // 向event表里写一条记录
            eventId = eventClient.createEvent(event);
        }catch (Exception e){
            logger.info("create event failed {}",e);
            return false;
        }
        if(eventId == null){
            return false;
        }
        try {
            // 执行具体业务,通过Feign调用M的接口
            doSomething(eventCallback);
            // 提交事务
            transactionManager.commit(status);
        } catch (Exception e) {
            logger.info("execute failed {}",e);
            // 异常回滚
            transactionManager.rollback(status);
            try{
                // 从event表里面删除消息,通过Feign调用M的接口
                eventClient.cancelEvent(eventId);
            }catch (Exception e1){
                logger.info("cancel event failed {}",e1);
            }
            return false;
        }
        try{
            // 该表event表里的消息状态为confirm,同时向RabbitMQ发送一条消息,通过Feign调用M的接口
            eventClient.confirmEvent(eventId);
        }catch (Exception e1){
            logger.info("confirm event failed {}",e1);
        }
        return true;
    
    }
    
    
    // Feign服务调用
    @FeignClient(value = "hap-event-store-service")
    public interface EventClient {
        @PostMapping("/v1/events")
        String createEvent(@RequestBody Event event);
    
        @PutMapping("/v1/events/{eventId}/confirm")
        void confirmEvent(@PathVariable("eventId") String eventId);
    
        @PutMapping("/v1/events/{eventId}/cancel")
        void cancelEvent(@PathVariable("eventId") String eventId);
    }
    
    消息中间件M
    @Service
    @Transactional(noRollbackFor = EventNotExistException.class)
    public class EventServiceImpl extends BaseServiceImpl<Event> implements EventService {
    
      // 向event表里写一条消息,该消息的初始状态为 unfinished
      public String createEvent(Event event) {
        beanTools.setJsonMessages(event);
        event.setStatus(Event.STATUS_INIT);
        event.setCreateTime(System.currentTimeMillis());
        if (insert(event) != 1) {
          throw new HapException("error.event.create");
        }
        return event.getUuid();
      }
    
      // 系统A提交事务之后,改变event表里的消息状态为confirm,并向RabbitMQ发送一条消息
      public void confirmEvent(String uuid) {
        Event event = selectByPrimaryKey(uuid);
        if (event == null) {
          throw new EventNotExistException(uuid);
        }
        Event temp = new Event();
        temp.setUuid(uuid);
        temp.setStatus(Event.STATUS_CONFIRM);
        if (updateByPrimaryKeySelective(temp) != 1) {
          throw new HapException("error.event.confirm");
        }
    
        beanTools.setMessageList(event);
        // 这里仅仅是向队列里面插入一个元素,真正向RabbitMQ发送消息的逻辑是通过定时任务完成得
        MsgPublishExecuter.getQueue().offer(event);
      }
    
       // 从event表删除消息
      public void cancelEvent(String uuid) {
         if (deleteByPrimaryKey(uuid) != 1) {
          if (selectByPrimaryKey(uuid) == null) {
            throw new EventNotExistException(uuid);
          } else {
            throw new HapException("error.event.cancel");
          }
        }
      }
    
    }
    

    有关于MsgPublishExecuter代码

    @Component
    public class MsgPublishExecuter {
    
      private static final long QUERY_INTERVAL =  100;
    
      // 该队列线程安全,FIFO
      private static ConcurrentLinkedQueue<Event> queue = new ConcurrentLinkedQueue<>();
    
      private MsgPublishExecTaskImpl msgPublishExecTask;
    
      @Autowired
      public MsgPublishExecuter(MsgPublishExecTaskImpl msgPublishExecTask) {
        this.msgPublishExecTask = msgPublishExecTask;
      }
    
      static ConcurrentLinkedQueue<Event> getQueue() {
        return queue;
      }
      
      // 每100毫秒执行一次
      @Scheduled(fixedDelay = QUERY_INTERVAL)
      public void scheduledQueryJob() {
        while (!queue.isEmpty()) {
          // 发送消息到RabbitMQ,最后通过RabbitTemplate发送
          msgPublishExecTask.publishMsg(queue.poll());
        }
      }
    }
    

    有关于向RabbitMQ发送消息和防系统A到M消息丢失措施

    @Configuration
    @EnableKafka
    public class EventConfig {
    
      private static final int MAX_PUBLISH_THREAD_NUM =  2;
      private static final int MAX_QUERY_THREAD_NUM =  2;
      public static final String PUBLISHER_STATUS_FINISHED = "finished";
      public static final String PUBLISHER_STATUS_CANCEL = "canceled";
    
      // 该线程池负责维护 发送消息到MQ的线程
      @Bean(name = "publish-executor")
      public AsyncTaskExecutor publishTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("publish-executor");
        executor.setMaxPoolSize(MAX_PUBLISH_THREAD_NUM);
        return executor;
      }
    
      // 该线程池负责维护 根据消息向上游系统查询消息状态的线程,主要是从msg_record表查询
      @Bean(name = "queryStatus-executor")
      public AsyncTaskExecutor queryStatusTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("queryStatus-executor");
        executor.setMaxPoolSize(MAX_QUERY_THREAD_NUM);
        return executor;
      }
    
    }
    
    
    
    @EnableBinding
    @Component
    public class MsgPublishExecTaskImpl implements PublishMsgListener{
    
      private PublishEventService publishEventService;
    
      @Autowired
      public MsgPublishExecTaskImpl(PublishEventService publishEventService) {
        this.publishEventService = publishEventService;
      }
    
       // 异步执行,publish-executor是一个AsyncTaskExecutor对象
      @Async("publish-executor")
      public void publishMsg(Event event) {
        publishEventService.publishMsg (event,this);
      }
    
      @Override
      public void onFailure(Event event) {
        MsgPublishExecuter.getQueue().offer(event);
      }
    }
    
    
    // 这个其实就相当于是上面提到的补救措施,即系统A向M发送消息Confirm和Cancel指令失败的情况
    @Async("queryStatus-executor")
    void execQuery(Event event) {
    StatusQueryExecuter.eventNum.decrementAndGet();
    logger.info("execQuery event: {}", event);
    EventRecord result = null;
    try {
        StatusQuery query= HystrixFeign
                .builder()
                .client(client)
                .decoder(new JsonDecoder())
                .target(StatusQuery.class, "http://" + event.getPublisherServiceId());
        result = query.getEventRecord(event.getUuid(), event.getType());
    } catch (Exception e) {
        logger.warn("execQuery error, event.publisherServiceId: {} event.uuid: {} event.type: {}",
                event.getPublisherServiceId(), event.getUuid(), event.getType());
    }
    if (result == null) {
        return;
    }
    
    logger.info("execQuery result: {}", result);
    if (EventConfig.PUBLISHER_STATUS_FINISHED.equals(result.getStatus())) {
        eventService.confirmEvent(event.getUuid());
    }
    if (EventConfig.PUBLISHER_STATUS_CANCEL.equals(result.getStatus())) {
        eventService.cancelEvent(event.getUuid());
    }
    }
    
    系统B

    系统B主要是监听RabbitMQ中的消息,要做的也很简单, 每条消息中包含了买方和卖方的用户ID、金额,每次交易向买方账户见钱、卖方账户加钱

    @Topic(value = "trade2", retryTimes = 5, retryInterval = 10000)
    @Override
    public void receive(String msg) throws Exception{
        logger.info("=== 消息队列接收信息 : {}", msg);
        TradeAmqp tradeAmqp = mapper.readValue(msg, TradeAmqp.class);
        User seller = userMapper.selectByPrimaryKey(tradeAmqp.getSellerId());
        seller.setAccount(seller.getAccount() + tradeAmqp.getTradeAmount());
        logger.info("=== 用户 : {} 账户增加 : {}", seller.getName(), tradeAmqp.getTradeAmount());
        userMapper.updateByPrimaryKeySelective(seller);
    
        User buyer = userMapper.selectByPrimaryKey(tradeAmqp.getBuyerId());
        buyer.setAccount(buyer.getAccount() - tradeAmqp.getTradeAmount());
        logger.info("=== 用户 : {} 账户减少 : {}", buyer.getName(), tradeAmqp.getTradeAmount());
        userMapper.updateByPrimaryKeySelective(buyer);
        logger.info("AtomicInteger   {}", a.incrementAndGet());
    }
    
    测试

    在测试前,先看看表里面的初始化信息


    消息表 系统A业务表 系统B业务表

    从上图中可以看出,初始化的时候系统B中买方和卖方的账户余额都是0。

    接下来通过Jmeter进行测试:

    单线程测试

    只开一个线程,循环执行1000次请求,查看结果


    单线程测试 RabbitMQ

    但系统A后台报错了,太长了,所以删掉了一些。从日志中可以看出,好像在服务间调用的时候,即系统A调用系统M接口,向消息表写数据的时候,好像出现了问题!

    2018-08-26 16:39:54.377  INFO [hap-event-demo-trade-service,6dda6aa15fe370c8,6dda6aa15fe370c8,false] 10044 --- [nio-9021-exec-6] com.hand.hap.cloud.event.EventTemplate   : confirm event failed {}
    
    com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#confirmEvent(String) timed-out and no fallback available.
        at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822) ~[hystrix-core-1.5.10.jar:1.5.10]
        at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807) ~[hystrix-core-1.5.10.jar:1.5.10]
        at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140) ~[rxjava-1.1.10.jar:1.1.10]
        at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
        at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
        at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472) ~[hystrix-core-1.5.10.jar:1.5.10]
        at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397) ~[hystrix-core-1.5.10.jar:1.5.10]
    Caused by: java.util.concurrent.TimeoutException: null
        at com.netflix.hystrix.AbstractCommand.handleTimeoutViaFallback(AbstractCommand.java:997) ~[hystrix-core-1.5.10.jar:1.5.10]
        at com.netflix.hystrix.AbstractCommand.access$500(AbstractCommand.java:60) ~[hystrix-core-1.5.10.jar:1.5.10]
        at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:609) ~[hystrix-core-1.5.10.jar:1.5.10]
        at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:601) ~[hystrix-core-1.5.10.jar:1.5.10]
        at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140) ~[rxjava-1.1.10.jar:1.1.10]
        ... 16 common frames omitted
    
    2018-08-26 16:39:55.393  INFO [hap-event-demo-trade-service,3e17e98019352cb8,3e17e98019352cb8,false] 10044 --- [nio-9021-exec-9] com.hand.hap.cloud.event.EventTemplate   : create event failed {}
    

    注意上面两条非常重要的日志:confirm event failed {}、create event failed {} ,都是调用系统M接口时出了问题。这时候再观察数据库里面的记录

    SELECT count(0) FROM event;        -- 1000条
    SELECT count(0) FROM trade_record;    -- 913条
    
    SELECT * FROM user; 
    1   xiaohong    29970
    2   xiaoming    -29970
    

    从上面我们可以发现两个问题

    1. 消息表有1000条记录,系统A却只有913条记录,正确的结果应该是系统A和消息表都是1000条记录。
    2. 从user表中的账户余额来看,1000次请求,每次30,正确的结果应该是30000 和 -30000,结果却少了30,也就是少了一次计算。

    下面来分析来分析一下这两个问题:

    • 针对对1个问题
    try {
        eventId = eventClient.createEvent(event);
    }catch (Exception e){
        logger.info("create event failed {}",e);
        return false;
    }
    

    消息表里面的记录是1000,说明系统M的针对系统A发过来的每个创建消息的请求都执行成功了;系统A少了87条记录,说明系统M在成功插入消息后将结果返回给系统A的时候出问题了,原因可能就是日志上显示的那样,timed-out and no fallback available,导致系统A接下来的逻辑不能正常执行,结果A丢了一些记录。

    • 针对对2个问题
      消息表里有1000条记录,每次30,正确结果应该是30000和-30000,实际上却是是29970和-29970,买方和卖方的钱对不上,少计算了一次。买卖双方账户余额完全是由系统B的业务逻辑控制,而触发系统B执行该段业务逻辑的条件是系统M中的消息状态是confirm,现在系统M有1000条记录,系统B却只计算了999次,并且观察系统B的后台日志并没有异常发生,说明有可能系统 M中存在一条消息的状态值不是confirm
    SELECT * FROM event WHERE status != 'confirm';
    256012c6df01473692ae92f7f058e8a9    unfinished  1535272794404   hap-event-demo-trade-service    trade   [{"topicName":"trade2","payload":"{\"uuid\":\"256012c6df01473692ae92f7f058e8a9\",\"tradeAmount\":30.0,\"buyerId\":2,\"sellerId\":1}"}]
    

    从上图中可以看出,确实有这么一条消息,它的状态为unfinished,说明系统M只向RabbitMQ发送了999条消息,因为只有状态为confirm的消息记录才会publish到RabbitMQ,从而导致系统B只消费了999条消息。而之所以系统M只向RabbitMQ发送999条消息,主要原因在于系统A在向系统M发送Confirm指令的时候出了问题,导致M中的消息状态不能正常变为confirm,从而导致M无法向RabbitMQ推送消息。

    经过这么一个简单测试,我们就发现了这个系统存在非常大的漏洞,这还仅仅是在单线程情况下。如果你看的仔细一些,就会发现一些问题,系统A在向系统M发送Confirm指令的时候出了问题,这不是我们上面考虑到的一种情况吗?消息丢失本来就可能发生,这时M根据A提供的接口,在系统A找到记录然后将M中的那条消息记录更新为confirm不就可以吗?这样想是没有问题的,但问题的关键在于,系统M根据那条状态为unfinished消息的uuid去系统A里面找,根本找不到啊!不信我们可以看看系统M中的后台日志,每5s一次的查询:

    M调用A接口查消息

    可以看到,日志中提示消息丢失了,所以问题的关键还是在于系统A,我觉得 在往系统M插一条消息和往系统A里面插入一条记录这两个操作应该要在一个事务里面完成,但现在A和M是两个系统,这又涉及到分布式事务的问题,所以是否可以将消息表放到系统A中?然后这两个操作在一个事务中提交

    实际情况可能会比这复杂很多吧,回到上面的报错日志:

    com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#confirmEvent(String) timed-out and no fallback available.
    

    这个错误可能是因为服务调用超时导致的,基本是出现在Hystrix熔断器,在系统M中添加了以下的配置,不过这一块我也并不是很清楚

    ribbon:
      ReadTimeout: 60000
      ConnectTimeout: 60000
    hystrix:
      command:
        default:
          execution:
            timeout:
              enabled: false
            isolation:
              thread:
                timeoutInMilliseconds: 10000
    

    修改之后,重修做了几次测:1000、3000、5000,还是照样会有这个问题,todo。

    多线程测试

    这里开50个线程,每个线程循环请求20次


    多线程测试

    结果好像惨不忍睹,大量的 Hystrix circuit short-circuited and is OPEN

    Caused by: java.lang.RuntimeException: Hystrix circuit short-circuited and is OPEN
        at com.netflix.hystrix.AbstractCommand.handleShortCircuitViaFallback(AbstractCommand.java:979) ~[hystrix-core-1.5.10.jar:1.5.10]
        at com.netflix.hystrix.AbstractCommand.applyHystrixSemantics(AbstractCommand.java:557) ~[hystrix-core-1.5.10.jar:1.5.10]
        at com.netflix.hystrix.AbstractCommand.access$200(AbstractCommand.java:60) ~[hystrix-core-1.5.10.jar:1.5.10]
        at com.netflix.hystrix.AbstractCommand$4.call(AbstractCommand.java:419) ~[hystrix-core-1.5.10.jar:1.5.10]
        at com.netflix.hystrix.AbstractCommand$4.call(AbstractCommand.java:413) ~[hystrix-core-1.5.10.jar:1.5.10]
        at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46) ~[rxjava-1.1.10.jar:1.1.10]
        ... 90 common frames omitted
    

    在网上找了一下资料记一次feign的问题排查引起这个问题的原因是在一个滚动窗口内,失败了二十个(默认),就会发生短路,短路时间默认为5秒,5秒之内拒绝所有的请求,之后开始运行
    解决办法:设置熔断器失败的个数,默认为20个,这里我给了1000个,只有超过1000个才会发生短路

    hystrix.command.default.circuitBreaker.requestVolumeThreshold=1000
    

    配置之后,重新测试,发现又报这种错

    com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#createEvent(Event) could not be queued for execution and no fallback available
    Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3931cca2 rejected from java.util.concurrent.ThreadPoolExecutor@13ed04ed[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 116]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_101]
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_101]
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_101]
        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) ~[na:1.8.0_101]
    

    不过这个RejectedExecutionException好像是线程池里面的抛出的一个异常,后面那些就是线程池的参数。
    Fegin默认的核心线程数是10个,默认的阻塞队列个数maxQueueSize是-1,这里我们开了50个线程,它的最大线程数应该是小于50,所以会抛RejectedExecutionException,添加以下配置

    hystrix.threadpool.default.coreSize=50
    

    但是我这里有个疑问,根据线程池的特点,在coreSize不够的情况下,不是可以放到阻塞队列吗?但是我在这里配置阻塞队列的个数好像作用不大,例如,我以下的配置是没用的,还是会报错

    hystrix.threadpool.default.coreSize=30
    hystrix.threadpool.default.maxQueueSize=20
    

    发现又会报以下错:

    com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#confirmEvent(String) could not be queued for execution and no fallback available.
        at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822) ~[hystrix-core-1.5.10.jar:1.5.10]
        at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807) ~[hystrix-core-1.5.10.jar:1.5.10]
        at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140) ~[rxjava-1.1.10.jar:1.1.10]
        at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
        at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
        at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472) ~[hystrix-core-1.5.10.jar:1.5.10]
        at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397) ~[hystrix-core-1.5.10.jar:1.5.10]
    Caused by: java.util.concurrent.RejectedExecutionException: Rejected command because thread-pool queueSize is at rejection threshold.
    

    这时候再添加一个配置,表示即使maxQueueSize没有达到,达到queueSizeRejectionThreshold该值后,请求也会被拒绝

    hystrix.threadpool.default.queueSizeRejectionThreshold = 50
    

    所以我最终的配置如下:

    ribbon:
      ReadTimeout: 60000
      ConnectTimeout: 60000
    hystrix:
      threadpool:
        default:
          coreSize: 50
    #      maxQueueSize: 0
          queueSizeRejectionThreshold: 50
      command:
        default:
          circuitBreaker:
            requestVolumeThreshold: 1000
          execution:
            timeout:
              enabled: false
            isolation:
              thread:
                timeoutInMilliseconds: 10000
    
    计算机配置

    50个线程,每个线程循环请求20次
    因为这里请求的是系统A,所以图上的5s完成请求仅仅是指系统A完成请求,系统M发送消息到RabbitMQ是一个异步的过程,所以5s仅仅代表系统A完成业务处理,并不代表系统B也完成了业务处理

    50*20 MQ

    后台无异常,表数据正确:1000、1000、30000 和 -30000

    50个线程,每个线程循环请求60次

    50*60 MQ

    后台无异常,表数据正确:4000、4000、120000 和 -120000

    50个线程,每个线程循环请求100次

    50*100 MQ

    后台无异常,表数据正确:9000、9000、270000 和 -270000

    相关文章

      网友评论

        本文标题:分布式事务

        本文链接:https://www.haomeiwen.com/subject/dcewwftx.html