美文网首页
消息中间件实战(下)

消息中间件实战(下)

作者: herohua | 来源:发表于2020-02-18 13:49 被阅读0次

34.生产案例:从 RocketMQ 全链路分析一下为什么用户支付后没收到红包?

有用户反馈说,按照规则应该是在支付之后可以拿到一个现金红包的,但是他在支付了一个订单之后,却并没有收到这个现金红包,于是就反馈给了客服。
经过一通排查,从订单系统和红包系统当天那个时间段的日志来看,居然只看到了订单系统有推送消息到RocketMQ的日志,但是并没有看到红包系统从RocketMQ中接收消息以及发现金红包的日志。
可能原因一:订单系统推送消息到MQ的过程会丢失消息

网络故障导致订单系统推送消息失败.png
可能原因二:Broker丢失消息:消息写入os cache但没有写入磁盘
image.png
可能原因三:Broker丢失消息:消息写入磁盘但磁盘坏了
磁盘故障.png
可能原因四:红包系统获取到消息后丢失消息
默认情况下,MQ的消费者有可能会自动提交已经消费的offset,那么如果此时你还没处理这个消息派发红包的情况下,MQ的消费者可能直接自动给你提交这个消息1的offset到broker去了,标识为你已经成功处理了这个消息。接着恰巧在这个时候,我们的红包系统突然重启了,或者是宕机了,或者是可能在派发红包的时候更新数据库失败了,总之就是他突然故障了,红包系统的机器重启了一下,然后此时内存里的消息1必然就丢失了,而且红包也没发出去。
红包系统故障.png

35.发送消息零丢失方案:RocketMQ事务消息的实现流程分析

  1. 首先要让订单系统去发送一条half消息到MQ去,这个half消息本质就是一个订单支付成功的消息,只不过你可以理解为他这个消息的状态是half状态,这个时候红包系统是看不见这个half消息的。


    image.png
  2. 万一half消息写入失败
    这个时候订单系统就应该执行一系列的回滚操作,比如对订单状态做一个更新,让状态变成“关闭交易”,同时通知支付系统自动进行退款。
  3. half消息成功之后,订单系统完成自己的任务
    这个时候订单系统就应该在自己本地的数据库里执行一些增删改操作了,因为一旦half消息写成功了,就说明MQ肯定已经收到这条消息了,MQ还活着,而且目前你是可以跟MQ正常沟通的。


    image.png
  4. 如果订单系统的本地事务执行失败
    这个时候其实也很简单,直接就是让订单系统发送一个rollback请求给MQ就可以了。这个意思就是说,你可以把之前我发给你的half消息给删除掉了,因为我自己这里都出问题了,已经无力跟你继续后续的流程了。请求给MQ删除那个half消息之后,你的订单系统就必须走后续的回退流程了,就是通知支付系统退款。


    image.png
  5. 订单系统完成了本地事务
    如果订单系统成功完成了本地的事务操作,比如把订单状态都更新为“已完成”了,此时你就可以发送一个commit请求给MQ,要求让MQ对之前的half消息进行commit操作,让红包系统可以看见这个订单支付成功消息。


    image.png
  6. 如果发送half消息成功了,但是没收到响应呢?
    这个时候我们没收到响应,可能就会网络超时报错,也可能直接有其他的异常错误,这个时候订单系统会误以为是发送half消息到MQ失败了,订单系统就直接会执行退款流程了,订单状态也会标记为“已关闭”。


    image.png

    其实RocketMQ这里有一个补偿流程,他会去扫描自己处于half状态的消息,如果我们一直没有对这个消息执行commit/rollback操作,超过了一定的时间,他就会回调你的订单系统的一个接口,系统就得去查一下数据库,看看这个订单当前的状态,一下发现订单状态是“已关闭”,此时就知道,你必然得发送rollback请求给MQ去删除之前那个half消息了!


    image.png
    image.png
  7. 如果rollback或者commit发送失败了呢?
    这个时候其实也很简单,因为MQ里的消息一直是half状态,所以说他过了一定的超时时间会发现这个half消息有问题,他会回调你的订单系统的接口,此时要判断一下,这个订单的状态如果更新为了“已完成”,那就得再次执行commit请求,反之则再次执行rollback请求。
    本质这个MQ的回调就是一个补偿机制,如果你的half消息响应没收到,或者rollback、commit请求没发送成功,他都会来找你问问对half消息后续如何处理。
  8. 总结
    其实很简单,如果你的MQ有问题或者网络有问题,half消息根本都发不出去,此时half消息肯定是失败的,那么订单系统就不会执行后续流程了!
    如果要是half消息发送出去了,但是half消息的响应都没收到,然后执行了退款流程,那MQ会有补偿机制来回调找你询问要commit还是rollback,此时你选择rollback删除消息就可以了,不会执行后续流程!
    如果要是订单系统收到half消息了,结果订单系统自己更新数据库失败了,那么他也会进行回滚,不会执行后续流程了!
    如果要是订单系统收到half消息了,然后还更新自己数据库成功了,订单状态是“已完成”了,此时就必然会发送commit请求给MQ,一旦消息commit了,那么必然保证红包系统可以收到这个消息!
    而且即使你commit请求发送失败了,MQ也会有补偿机制,回调你接口让你判断是否重新发送commit请求。
    总之,就是你的订单系统只要成功了,那么必然要保证MQ里的消息是commit了可以让红包系统看到他!

36.事务消息机制的底层实现原理

  1. half 消息是如何对消费者不可见的?
    RocketMQ一旦发现你发送的是一个half消息,他不会把这个half消息的offset写入OrderPaySuccessTopic的ConsumeQueue里去。他会把这条half消息写入到自己内部的“RMQ_SYS_TRANS_HALF_TOPIC”这个Topic对应的一个ConsumeQueue里去,所以你的红包系统自然无法从OrderPaySuccessTopic的ConsumeQueue中看到这条half消息了。half消息进入到RocketMQ内部的RMQ_SYS_TRANS_HALF_TOPIC的ConsumeQueue文件了,此时就会认为half消息写入成功了,然后就会返回响应给订单系统。

    RMQ_SYS_TRANS_HALF_TOPIC.png
  2. 假如因为各种问题,没有执行rollback或者commit会怎么样?
    其实这个时候他会在后台有定时任务,定时任务会去扫描RMQ_SYS_TRANS_HALF_TOPIC中的half消息,如果你超过一定时间还是half消息,他会回调订单系统的接口,让你判断这个half消息是要rollback还是commit。

    定时任务扫描.png
  3. 如果执行rollback操作的话,如何标记消息回滚?
    因为RocketMQ都是顺序把消息写入磁盘文件的,所以在这里如果你执行rollback,他的本质就是用一个OP操作来标记half消息的状态,RocketMQ内部有一个OP_TOPIC,此时可以写一条rollback OP记录到这个Topic里,标记某个half消息是rollback了。另外,假设你一直没有执行commit/rollback,RocketMQ会回调订单系统的接口去判断half消息的状态,但是他最多就是回调15次,如果15次之后你都没法告知他half消息的状态,就自动把消息标记为rollback。

    如何标记消息回滚.png
  4. 如果执行commit操作,如何让消息对红包系统可见?
    执行commit操作之后,RocketMQ就会在OP_TOPIC里写入一条记录,标记half消息已经是commit状态了。接着需要把放在RMQ_SYS_TRANS_HALF_TOPIC中的half消息给写入到OrderPaySuccessTopic的ConsumeQueue里去,然后我们的红包系统可以就可以看到这条消息进行消费了。

    如何标记消息提交.png

其实本质都是基于CommitLog、ConsumeQueue这套存储机制来做的,只不过中间有一些Topic的变换,half消息可能就是写入内部Topic的

37.同步发送消息 + 反复多次重试方案 VS RocketMQ事务消息方案

RocketMQ事务消息方案虽然能保证消息零丢失,但是机制复杂,完全有可能导致整体性能比较差,而且吞吐量比较低,是否有更加简单的方法来确保消息一定可以到达MQ呢?能不能基于重试机制来确保消息到达MQ?
只要我们在代码中发送消息到MQ之后,同步等待MQ返回响应给我们,一直等待,如果半路中有网络异常或者MQ内部异常,我们肯定会收到一个异常,比如网络错误,或者请求超时之类的。
如果我们在收到异常之后,就认为消息到MQ发送失败了,然后再次重试尝试发送消息到MQ,接着再次同步等待MQ返回响应给我们,这样反复重试,是否可以确保消息一定会到达MQ?

同步等待+反复重试.png
先执行订单本地事务,还是先发消息到MQ?
如果我们先执行订单本地事务,接着再发送消息到MQ的话,伪代码是这样的:
image.png
假设你刚执行完成了订单本地事务了,结果还没等到你发送消息到MQ,结果你的订单系统突然崩溃了!这就导致你的订单状态可能已经修改为了“已完成”,但是消息却没发送到MQ去!这就是这个方案最大的隐患。
image.png

把订单本地事务和重试发送MQ消息放到一个事务代码中
伪代码改成这样:

image.png
上面这个代码看起来似乎解决了我们的问题,就是在这个方法上加入事务,在这个事务方法中,我们哪怕执行了orderService.finishOrderPay(),但是其实也仅仅执行了一些增删改SQL语句,还没提交订单本地事务。
如果发送MQ消息失败了,而且多次重试还不奏效,则我们抛出异常会自动回滚订单本地事务;如果你刚执行了orderService.finishOrderPay(),结果订单系统直接崩溃了,此时订单本地事务会回滚,因为根本没提交过。
但是对于这个方案,还是非常的不理想,原因就出在那个MQ多次重试的地方。
假设用户支付成功了,然后支付系统回调通知你的订单系统说,有一笔订单已经支付成功了,这个时候你的订单系统卡在多次重试MQ的代码那里,可能耗时了好几秒种,此时回调通知你的系统早就等不及可能都超时异常了。而且你把重试MQ的代码放在这个逻辑里,可能会导致订单系统的这个接口性能很差。
image.png

一定可以依靠本地事务回滚吗?
看下面的代码:

image.png
虽然在方法上加了事务注解,但是代码里还有更新Redis缓存和Elasticsearch数据的代码逻辑,如果你要是已
经完成了订单数据库更新、Redis缓存更新、ES数据更新了,结果没法送MQ呢订单系统崩溃了。虽然订单数据库的操作会回滚,但是Redis、Elasticsearch中的数据更新会自动回滚吗?不会的,因为他们根本没法自动回滚,此时数据还是会不一致的。所以说,完全寄希望于本地事务自动回滚是不现实的。

所以分析完了这个同步发送消息 + 反复多次重试的方案之后,我们会发现他实际落地的时候是可以的,但是里面存在一些问题。最后保证业务系统一致性的最佳方案还是:基于RocketMQ的事务消息机制。

38.分析RocketMQ事物消息的代码实现细节

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {

        // 这个东西就是用来接受RocketMQ回调的一个监听器接口
        // 这里会实现执行订单本地事务,commit、rollback,回调查询等逻辑
        TransactionListener transactionListener = new TransactionListenerImpl();

        // 创建一个支持事务消息的Producer
        TransactionMQProducer producer = new TransactionMQProducer("TestProducerGroup");

        // 这个线程池是用来处理RocketMQ回调你的请求的
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2000),
                (r) -> {
                    Thread thread = new Thread(r);
                    thread.setName("TestThread");
                    return thread;
                });

        // 给事务生产者设置对应的线程池,负责执行RocketMQ回调请求
        producer.setExecutorService(threadPool);
        // 给事务生产者设置对应的回调函数
        producer.setTransactionListener(transactionListener);
        // 启动这个事务消息生产者
        producer.start();

        // 构建一条订单支付成功的消息,指定Topic
        Message message = new Message("PayOrderSuccessTopic", "TestTag", "TestKey",
                "订单消息".getBytes(RemotingHelper.DEFAULT_CHARSET));

        try {
            SendResult sendResult = producer.sendMessageInTransaction(message, null);
        } catch (MQClientException e) {
            // half消息发送失败
            // 订单系统执行回滚逻辑,比如说触发支付退款,更新订单状态为“已关闭”
        }
    }
}
public class TransactionListenerImpl implements TransactionListener {

    // 如果half消息发送成功了,就会毁掉你的这个函数,你就可以执行本地事务了
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        // 根据本地事务一连串执行结果,去选择commit or rollback
        try {
            // 如果本地事务都成功了,返回commit
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 本地事务都成功了,回滚一切执行过的操作
            // 如果本地事务执行失败了,返回rollback,标记half消息无效
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    // 如果因为各种原因,没有返回commit或者rollback
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 查询本地事务,是否执行成功了
        Integer status = localTrans.get(msg.getTransactionId());
        // 根据本地事务的情况取选择commit or rollback
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

39.Broker消息零丢失方案:同步刷盘 + Raft协议主从同步

Broker消息丢失的可能原因:

  1. 消息被写入到os cache,但没有被写入到磁盘,此时Broke宕机;
  2. 消息已经被写入到磁盘,但是磁盘损坏了,导致磁盘中消息丢失,并且此时消费者还没有来得及消费。

解决方案:
对于1,将异步刷盘调整为同步刷盘,这样就保证了只有消息被刷入到磁盘中,该消息才被认为写入成功,返回响应给生产者。比如我们发送half消息的时候,只要MQ返回响应是half消息发送成功了,那么就说明消息已经进入磁盘文件了,不会停留在os cache里。具体做法:调整broker的配置文件,将其中的flushDiskType配置设置为:SYNC_FLUSH,默认他的值是ASYNC_FLUSH,即默认是异步刷盘的。
对于2,通过主从架构模式避免磁盘故障导致的数据丢失,这样一来,你一条消息但凡写入成功了,此时主从两个Broker上都有这条数据了,此时如果你的Master Broker的磁盘坏了,但是Slave Broker上至少还是有数据的,数据是不会因为磁盘故障而丢失的。

40.Consumer消息零丢失方案:手动提交offset + 自动故障转移

Consumer消息丢失原因:红包系统已经拿到了这条消息,但是消息目前还在他的内存里,还没执行派发红包的逻辑,此时他就直接提交了这条消息的offset到broker去说自己已经处理过了,接着红包系统在上图这个状态的时候就直接崩溃了,内存里的消息就没了,红包也没派发出去,结果Broker已经收到他提交的消息offset了,还以为他已经处理完这条消息了。等红包系统重启的时候,就不会再次消费这条消息了。

解决方案:


手动提交offest代码.png

对于RocketMQ而言,其实只要你的红包系统是在这个监听器的函数中先处理一批消息,基于这批消息都派发完了红包,然后返回了那个消费成功的状态,接着才会去提交这批消息的offset到broker去。所以在这个情况下,如果你对一批消息都处理完毕了,然后再提交消息的offset给broker,接着红包系统崩溃了,此时是不会丢失消息的。


手动提交offest.png

那么如果是红包系统获取到一批消息之后,还没处理完,也就没返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS这个状态呢,自然没提交这批消息的offset给broker呢,此时红包系统突然挂了,会怎么样?
其实在这种情况下,你对一批消息都没提交他的offset给broker的话,broker不会认为你已经处理完了这批消息,此时你突然红包系统的一台机器宕机了,他其实会感知到你的红包系统的一台机器作为一个Consumer挂了。接着他会把你没处理完的那批消息交给红包系统的其他机器去进行处理,所以在这种情况下,消息也绝对是不会丢失的。

需要警惕的地方:不能异步消费消息
不能在代码中对消息进行异步的处理,如下错误的示范,我们开启了一个子线程去处理这批消息,然后启动线程之后,就直接返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态了。

不能异步消费消息.png
如果要是用这种方式来处理消息的话,那可能就会出现你开启的子线程还没处理完消息呢,你已经返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态了,就可能提交这批消息的offset给broker了,认为已经处理结束了。然后此时你红包系统突然宕机,必然会导致你的消息丢失了!

41.基于 RocketMQ 设计的全链路消息零丢失方案总结

发送消息到MQ的零丢失:
方案一(同步发送消息 + 反复多次重试)
方案二(事务消息机制),两者都有保证消息发送零丢失的效果,但是经过分析,事务消息方案整体会更好一些
MQ收到消息之后的零丢失:开启同步刷盘策略 + 主从架构同步机制,只要让一个Broker收到消息之后同步写入磁盘,同时同步复制给其他Broker,然后再返回响应给生产者说写入成功,此时就可以保证MQ自己不会弄丢消息
消费消息的零丢失:采用RocketMQ的消费者天然就可以保证你处理完消息之后,才会提交消息的offset到broker去,只要记住别采用多线程异步处理消息的方式即可

消息零丢失方案的优势与劣势
优势:消息零丢失
劣势:整个从头到尾的消息流转链路的性能大幅度下降,MQ的吞吐量大幅度的下降

消息零丢失方案到底适用场景
一般我们建议,对于跟金钱、交易以及核心数据相关的系统和核心链路,可以上这套消息零丢失方案。
比如支付系统,他是绝对不能丢失任何一条消息的,你的性能可以低一些,但是不能有任何一笔支付记录丢失。
比如订单系统,公司一般是不能轻易丢失一个订单的,毕竟一个订单就对应一笔交易,如果订单丢失,用户还支付成功了,你轻则要给用户赔付损失,重则弄不好要经受官司,特别是一些B2B领域的电商,一笔线上交易可能多大几万几十万。
所以对这种非常非常核心的场景和少数几条核心链路,才会建议大家上这套复杂的消息0丢失方案。对于非核心的链路,非金钱交易的链路,大家可以适当简化这套方案,用一些方法避免数据轻易丢失,但是同时性能整体很高,即使有极个别的数据丢失,对非核心的场景,也不会有太大的影响。

42.生产案例:从 RocketMQ 底层原理分析为什么会重复发优惠券?

重复发优惠券.png
  1. 用于发送消息到MQ的订单系统,如果出现了接口超时等问题,可能会导致上游的支付系统重试调用订单系统的接口,进而导致订单系统对一个消息重复发送两条到MQ里去!


    可能原因1.png
  2. 发送MQ的重试机制可能因为网络原因出现超时异常,从而重复发送MQ。


    重试代码.png
    网络原因导致重发MQ.png
  3. 优惠券系统刚刚发完优惠券,还没来得及提交消息offset到broker,就宕机了(或者重启),这时因为你没提交这条消息的offset给broker,broker并不知道你已经处理完了这条消息,然后优惠券系统重启之后,broker就会再次把这条消息交给你,让你再一次进行处理,然后你会再一次发送一张优惠券,导致重复发送了两次优惠券!


    第一次发送优惠券.png
    第二次发送优惠券.png

43.对订单系统核心流程引入 幂等性机制,保证数据不会重复

什么是幂等性机制?
这个幂等性机制,其实就是用来避免对同一个请求或者同一条消息进行重复处理的机制,所谓的幂等,他的意思就是,比如你有一个接口,然后如果别人对一次请求重试了多次,来调用你的接口,你必须保证自己系统的数据是正常的,不能多出来一些重复的数据,这就是幂等性的意思。

发送消息到MQ的时候如何保证幂等性?
1. 业务判断法:当你的订单系统的接口被重试调用的时候,你这个接口上来就应该发送请求到MQ里去查询一下,比如对订单id=1100这个订单的支付成功消息,在你MQ那里有没有?如果有的话,我就不再重复发送消息了!

业务判断法.png
弊端:在这个环节你直接从MQ查询消息是没这个必要的,他的性能也不是太好,会影响你的接口的性能。
2. 状态判断法-基于Redis缓存的幂等性机制:这个方法的核心在于,你需要引入一个Redis缓存来存储你是否发送过消息的状态,如果你成功发送了一个消息到MQ里去,你得在Redis缓存里写一条数据,标记这个消息已经发送过,那么当你的订单接口被重复调用的时候,你只要根据订单id去Redis缓存里查询一下,这个订单的支付消息是否已经发送给MQ了,如果发送过了,你就别再次发送了!
基于Redis缓存的幂等性机制.png
弊端:这种方案一般情况下是可以做到幂等性的,但是如果有时候你刚发送了消息到MQ,还没来得及写Redis,系统就挂了,之后你的接口被重试调用的时候,你查Redis还以为消息没发过,就会发送重复的消息到MQ去。

优惠券系统如何保证消息处理的幂等性?
其实这里就比较简单了,直接基于业务判断法就可以了,因为优惠券系统每次拿到一条消息后给用户发一张优惠券,实际上核心就是在数据库里给用户插入一条优惠券记录。那么如果优惠券系统从MQ那里拿到一个订单的两条重复的支付成功消息,这个时候其实很简单,他只要先去优惠券数据库中查询一下,比如对订单id=1100的订单,是否已经发放过优惠券了,是否有优惠券记录,如果有的话,就不要重复发券了!

优惠券系统保证消息处理的幂等性.png

总结
一般来说,对于MQ的重复消息而言,往MQ里重复发送一样的消息还是可以接受的,因为MQ里有多条重复消息,它不会对系统的核心数据造成影响,但是关键要保证的是,从MQ里获取消息进行处理的时候,必须要保证消息不能重复处理。

44.如果优惠券系统的数据库宕机,如何用死信队列解决这种异常场景?

假设了一个场景,就是订单支付成功之后会推送消息到MQ,然后优惠券系统、红包系统会从MQ里获取消息去执行后续的处理,比如发红包或者发优惠券。那么如果这个时候,优惠券系统的数据库宕机了,针对这样的一个坑爹的异常场景我们应该怎么处理?
数据库宕机的时候,返回RECONSUME_LATER
实际上如果我们因为数据库宕机等问题,对这批消息的处理是异常的,此时没法处理这批消息,我们就应该返回一个RECONSUME_LATER状态,他的意思是,我现在没法完成这批消息的处理,麻烦你稍后过段时间再次给我这批消息让我重新试一下!

RECONSUME_LATER.png
RocketMQ是如何让你进行消费重试的?
简单来说,RocketMQ会有一个针对你这个ConsumerGroup的重试队列。如果你返回了RECONSUME_LATER状态,他会把你这批消息放到你这个消费组的重试队列中去比如你的消费组的名称是“VoucherConsumerGroup”,意思是优惠券系统的消费组,那么他会有一个
“%RETRY%VoucherConsumerGroup”这个名字的重试队列,然后过一段时间之后,重试队列中的消息会再次给我们,让我们进行处理。如果再次失败,又返回了RECONSUME_LATER,那么会再过一段时间让我们来进行处理,默认最多是重试16次!每次重试之间的间隔时间是不一样的,这个间隔时间可以如下进行配置:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
重试队列.png
如果连续重试16次还是无法处理消息,然后怎么办?
其实就是一批消息交给你处理,你重试了16次还一直没处理成功,就不要继续重试这批消息了,你就认为他们死掉了就可以了。然后这批消息会自动进入死信队列。死信队列的名字是“%DLQ%VoucherConsumerGroup”
那么对死信队列中的消息我们怎么处理?
其实这个就看你的使用场景了,比如我们可以专门开一个后台线程,就是订阅“%DLQ%VoucherConsumerGroup”这个死信队列,对死信队列中的消息,还是一直不停的重试。
死信队列.png

消费者底层的一些依赖可能有故障了,比如数据库宕机,缓存宕机之类的,此时你就没办法完成消息的处理了,那么可以通过一些返回状态去让消息进入RocketMQ自带的重试队列,同时如果反复重试还是不行,可以让消息进入RocketMQ自带的死信队列,后续针对死信队列中的消息进行单独的处理就可以了。

45.生产案例:为什么基于 RocketMQ 进行订单库数据同步时会消息乱序?

场景再现:大数据系统在基于Mysql binlog同步订单数据时,binlog里有两条日志,依次时insert、update操作,但是大数据系统在处理消息的时候哦,先处理了upodate消息,后处理insert消息,导致消息乱序,数据出现问题。

消息乱序现象.png
原因分析:原本有顺序的消息,完全可能会分发到不同的MessageQueue中去,然后大数据系统的不同机器上部署的Consumer可能会用混乱的顺序从不同的MessageQueue里获取消息然后处理。
消息乱序原因.png

46.如何解决订单数据库同步的消息乱序问题?

  1. 采用取模的方式让属于同一个订单的binlog进入一个MessageQueue
  2. 获取binlog的时候也得有序
    解决订单数据库同步的消息乱序问题的方案.png
    万一消息处理失败了不可以走重试队列
    因为如果你的consumer获取到订单的一个insert binlog,结果处理失败了,此时返回了RECONSUME_LATER,那么这条消息会进入重试队列,过一会儿才会交给你重试。但是此时broker会直接把下一条消息,也就是这个订单的update binlog交给你来处理,此时万一你执行成功了,就根本没有数据可以更新!又会出现消息乱序的问题。
    所以对于有序消息的方案中,如果你遇到消息处理失败的场景,就必须返回SUSPEND_CURRENT_QUEUE_A_MOMENT这个状态,意思是先等一会儿,一会儿再继续处理这批消息,而不能把这批消息放入重试队列去,然后直接处理下一批消息。

RocketMQ的顺序消息机制的代码实现

  1. 让一个订单的binlog进入一个MessageQueue


    让一个订单的binlog进入一个MessageQueue.png
  2. 消费者按照顺序来获取一个MessageQueue中的消息
    消费者按照顺序来获取一个MessageQueue中的消息.png
    使用的是MessageListenerOrderly这个东西,他里面有Orderly这个名称,也就是说,Consumer会对每一个ConsumeQueue,都仅仅用一个线程来处理其中的消息。比如对ConsumeQueue01中的订单id=1100的多个binlog,会交给一个线程来按照binlog顺序来依次处理。否则如果ConsumeQueue01中的订单id=1100的多个binlog交给Consumer中的多个线程来处理的话,那还是会有消息乱序的问题。

47.基于RocketMQ的数据过滤机制,提升订单数据库同步的处理效率

一个数据库中可能会包含很多表的数据,比如订单数据库,他里面除了订单信息表以外,可能还包含很多其他的表。所以我们在进行数据库binlog同步的时候,很可能是把一个数据库里所有表的binlog都推送到MQ里去的!
假设我们的大数据系统仅仅关注订单数据库中的表A的binlog,并不关注其他表的binlog,那么大数据系统可能需要在获取到所有表的binlog之后,对每条binlog判断一下,是否是表A的binlog?
如果不是表A的binlog,那么就直接丢弃不要处理;如果是表A的binlog,才会去进行处理!但是这样的话,必然会导致大数据系统处理很多不关注的表的binlog,也会很浪费时间,降低消息的效率.

解决方案:在发送消息的时候,给消息设置tag和属性
针对这个问题,我们可以采用RocketMQ支持的数据过滤机制,来让大数据系统仅仅关注他想要的表的binlog数据即可。
发送消息的时候,可以给消息设置tag和属性:

给消息设置tag和属性.png
在消费数据的时候根据tag和属性进行过滤:
根据tag过滤.png
根据属性过滤
RocketMQ还是支持比较丰富的数据过滤语法的,如下所示:
(1)数值比较,比如:>,>=,<,<=,BETWEEN,=;
(2)字符比较,比如:=,<>,IN;
(3)IS NULL 或者 IS NOT NULL;
(4)逻辑符号 AND,OR,NOT;
(5)数值,比如:123,3.1415;
(6)字符,比如:'abc',必须用单引号包裹起来;
(7)NULL,特殊的常量
(8)布尔值,TRUE 或 FALSE

48.生产案例:基于延迟消息机制优化大量订单的定时退款扫描问题!

场景:在实际情况中,其实APP的大量用户每天会下很多订单,但是不少订单可能是一直没有进行支付的,可能他下单之后犹豫了,可能是他忘了支付了!所以一般订单系统都必须设置一个规则,当一个订单下单之后,超过比如30分钟没有支付,那么就必须订单系统自动关闭这个订单,后续你如果要购买这个订单里的商品,就得重新下订单了。
问题:那么订单系统就需要有一个后台线程,不停的扫描订单数据库里所有的未支付状态的订单,看他如果超过30分钟了还没支付,那么就必须自动把订单状态 更新为“已关闭”。

后台线程扫描未支付订单.png
但是这里就引入了一个问题,就是订单系统的后台线程必须要不停的扫描各种未支付的订单,这种实现方式实际上并不是很好。
  1. 一个原因是未支付状态的订单可能是比较多的,然后你需要不停的扫描他们,可能每个未支付状态的订单要被扫描N多遍,才会发现他已经超过30分钟没支付了。
  2. 另外一个是很难去分布式并行扫描你的订单。因为假设你的订单数据量特别的多,然后你要是打算用多台机器部署订单扫描服务,但是每台机器扫描哪些订单?怎么扫描?什么时候扫描?这都是一系列的麻烦问题。
    方案:针对类似这种场景,MQ里的延迟消息可以派上用场了。所谓延迟消息,意思就是说,我们订单系统在创建了一个订单之后,可以发送一条消息到MQ里去,我们指定这条消息是延迟消息,比如要等待30分钟之后,才能被订单扫描服务给消费到,这样当订单扫描服务在30分钟后消费到了一条消息之后,就可以针对这条消息的信息,去订单数据库里查询这个订单,看看他在创建过后都过了30分钟了,此时他是否还是未支付状态?如果此时订单还是未支付状态,那么就可以关闭他,否则订单如果已经支付了,就什么都不用做了。
    延迟消息用法.png
    这种方式就比你用后台线程扫描订单的方式要好的多了,一个是对每个订单你只会在他创建30分钟后查询他一次而已,不会反复扫描订单多次。
    另外就是如果你的订单数量很多,你完全可以让订单扫描服务多部署几台机器,然后对于MQ中的Topic可以多指定一个MessageQueue,这样每个订单扫描服务的机器作为一个Consumer都会处理一部分订单的查询任务。
    延迟消息代码实现:
    延迟消息生产者:
    生产者代码.png
    发送延迟消息的核心,就是设置消息的delayTimeLevel,也就是延迟级别
    RocketMQ默认支持一些延迟级别如下:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。所以上面代码中设置延迟级别为3,意思就是延迟10s,你发送出去的消息,会过10s被消费者获取到。那么如果是订单延迟扫描场景,可以设置延迟级别为16,也就是对应上面的30分钟。
    延迟消息消费者:
    消费者代码.png

49.在RocketMQ的生产实践中积累的各种一手经验总结

1. 灵活的运用 tags来过滤数据
在真正的生产项目中,建议大家合理的规划Topic和里面的tags,一个Topic代表了一类业务消息数据,然后对于这类业务消息数据,如果你希望继续划分一些类别的话,可以在发送消息的时候设置tags。
举个例子,比如我们都知道现在常见的外卖平台有美团外卖、饿了么外卖还有别的一些外卖,那么假设你现在一个系统要发送外卖订单数据到MQ里去,就可以针对性的设置tags,比如不同的外卖数据都到一个“WaimaiOrderTopic”里去。但是不同类型的外卖可以有不同的tags:“meituan_waimai”,“eleme_waimai”,“other_waimai”,等等。然后对你消费“WaimaiOrderTopic”的系统,可以根据tags来筛选,可能你就需要某一种类别的外卖数据罢了。
2. 基于消息key来定位消息是否丢失
之前我们给大家讲过,在消息0丢失方案中,可能要解决的是消息是否丢失的问题,那么如果消息真的丢失了,我们是不是要排查?此时是不是要从MQ里查一下,这个消息是否丢失了?
那么怎么从MQ里查消息是否丢失呢?可以基于消息key来实现,比如通过下面的方式设置一个消息的key为订单id:message.setKeys(orderId),这样这个消息就具备一个key了。接着这个消息到broker上,会基于key构建hash索引,这个hash索引就存放在IndexFile索引文件里。然后后续我们可以通过MQ提供的命令去根据key查询这个消息,类似下面这样:mqadmin queryMsgByKey -n 127.0.0.1:9876 -t SCANRECORD -k orderId
3. 消息零丢失方案的补充
之前我们给大家分析过消息零丢失方案,其实在消息零丢失方案中还有一个问题,那就是MQ集群彻底故障了,此时就是不可用了,那么怎么办呢?
其实对于一些金融级的系统,或者跟钱相关的支付系统,或者是广告系统,类似这样的系统,都必须有超高级别的高可用保障机制。
一般假设MQ集群彻底崩溃了,你生产者就应该把消息写入到本地磁盘文件里去进行持久化,或者是写入数据库里去暂存起来,等待MQ恢复之后,然后再把持久化的消息继续投递到MQ里去。
4. 提高消费者的吞吐量
如果消费的时候发现消费的比较慢,那么可以提高消费者的并行度,常见的就是部署更多的consumer机器
但是这里要注意,你的Topic的MessageQueue得是有对应的增加,因为如果你的consumer机器有5台,然后MessageQueue只有4个,那么意味着有一个consumer机器是获取不到消息的。
然后就是可以增加consumer的线程数量,可以设置consumer端的参数:consumeThreadMin、consumeThreadMax,这样一台consumer机器上的消费线程越多,消费的速度就越快。此外,还可以开启消费者的批量消费功能,就是设置consumeMessageBatchMaxSize参数,他默认是1,但是你可以设置的多一些,那么一次就会交给你的回调函数一批消息给你来处理了,此时你可以通过SQL语句一次性批量处理一些数据,比如:update xxx setxxx where id in (xx,xx,xx)。通过批量处理消息的方式,也可以大幅度提升消息消费的速度。
5. 要不要消费历史消息
其实consumer是支持设置从哪里开始消费消息的,常见的有两种:一个是从Topic的第一条数据开始消费,一个是从最后一次消费过的消息之后开始消费。对应的是:CONSUME_FROM_LAST_OFFSETCONSUME_FROM_FIRST_OFFSET。一般来说,我们都会选择CONSUME_FROM_FIRST_OFFSET,这样你刚开始就从Topic的第一条消息开始消费,但是以后每次重启,你都是从上一次消费到的位置继续往后进行消费的。

50.企业级的RocketMQ集群如何进行权限机制的控制?

在RocketMQ中实现权限控制也不难,首先我们需要在broker端放一个额外的ACK权限控制配置文件,里面需要规定好权限,包括什么用户对哪些Topic有什么操作权限,这样的话,各个Broker才知道你每个用户的权限。
首先在每个Broker的配置文件里需要设置aclEnable=true这个配置,开启权限控制
其次,在每个Broker部署机器的${ROCKETMQ_HOME}/store/config目录下,可以放一个plain_acl.yml的配置文件,这个里面就可以进行权限配置,类似下面这样子:

# 这个参数就是全局性的白名单
# 这里定义的ip地址,都是可以访问Topic的
globalWhiteRemoteAddresses:
- 13.21.33.*
- 192.168.0.*
# 这个accounts就是说,你在这里可以定义很多账号
# 每个账号都可以在这里配置对哪些Topic具有一些操作权限
accounts:
# 这个accessKey其实就是用户名的意思,比如我们这里叫做“订单技术团队”
- accessKey: OrderTeam
# 这个secretKey其实就是这个用户名的密码
secretKey: 123456
# 下面这个是当前这个用户名下哪些机器要加入白名单的
whiteRemoteAddress:
# admin指的是这个账号是不是管理员账号
admin: false
# 这个指的是默认情况下这个账号的Topic权限和ConsumerGroup权限
defaultTopicPerm: DENY
defaultGroupPerm: SUB
# 这个就是这个账号具体的堆一些账号的权限
# 下面就是说当前这个账号对两个Topic,都具备PUB|SUB权限,就是发布和订阅的权限
# PUB就是发布消息的权限,SUB就是订阅消息的权限
# DENY就是拒绝你这个账号访问这个Topic
topicPerms:
- CreateOrderInformTopic=PUB|SUB
- PaySuccessInformTopic=PUB|SUB
# 下面就是对ConsumerGroup的权限,也是同理的
groupPerms:
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
# 下面就是另外一个账号了,比如是商品技术团队的账号
- accessKey: ProductTeam
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
# 如果admin设置为true,就是具备一切权限
admin: true

如果你一个账号没有对某个Topic显式的指定权限,那么就是会采用默认Topic权限。
接着我们看看在你的生产者和消费者里,如何指定你的团队分配到的RocketMQ的账号,当你使用一个账号的时候,就只能访问你有权限的Topic。


权限控制代码.png

上面的代码中就是在创建Producer的时候后,传入进去一个AclClientRPCHook,里面就可以设置你这个Producer的账号密码,对于创建Consumer也是同理的。通过这样的方式,就可以在Broker端设置好每个账号对Topic的访问权限,然后你不同的技术团队就用不同的账号就可以了。

51.如何对线上生产环境的RocketMQ集群进行消息轨迹的追踪?

首先需要在broker的配置文件里开启traceTopicEnable=true这个选项,此时就会开启消息轨迹追踪的功能。
接着当我们开启了上述的选项之后,我们启动这个Broker的时候会自动创建出来一个内部的Topic,就是RMQ_SYS_TRACE_TOPIC,这个Topic就是用来存储所有的消息轨迹追踪的数据的。
此时创建Producer的时候要用如下的方式,下面构造函数中的第二个参数,就是enableMsgTrace参数,他设置为true,就是说可以对消息开启轨迹追踪,在订阅消息的时候,对于Consumer也是同理的,在构造函数的第二个参数设置为true,就是开启了消费时候的轨迹追踪。


image.png

接着如果我们想要查询消息轨迹,也很简单,在RocketMQ控制台里,在导航栏里就有一个消息轨迹,在里面可以创建查询任务,你可以根据messageId、message key或者Topic来查询,查询任务执行完毕之后,就可以看到消息轨迹的界面了。

52.由于消费系统故障导致的RocketMQ百万消息积压问题,应该如何处理?

1. MessageQueue数量大于消费者系统数量->增加机器
假如你的Topic有20个MessageQueue,然后你只有4个消费者系统在消费,那么每个消费者系统会从5个MessageQueue里获取消息,所以此时如果你仅仅依靠4个消费者系统是肯定不够的,毕竟MQ里积压了百万消息了。
所以此时你可以临时申请16台机器多部署16个消费者系统的实例,然后20个消费者系统同时消费,每个人消费一个MessageQueue的消息,此时你会发现你消费的速度提高了5倍,很快积压的百万消息都会被处理完毕。
当你处理完百万积压的消息之后,就可以下线多余的16台机器了。
2. MessageQueue数量等于消费者系统数量->写入临时队列
那么如果你的Topic总共就只有4个MessageQueue,然后你就只有4个消费者系统呢?
这个时候就没办法扩容消费者系统了,因为你加再多的消费者系统,还是只有4个MessageQueue,没法并行消费。
所以此时往往是临时修改那4个消费者系统的代码,让他们获取到消息然后不写入NoSQL,而是直接把消息写入一个新的Topic,这个速度是很快的,因为仅仅是读写MQ而已。
然后新的Topic有20个MessageQueue,然后再部署20台临时增加的消费者系统,去消费新的Topic后写入数据到NoSQL里去,这样子也可以迅速的增加消费者系统的并行处理能力,使用一个新的Topic来允许更多的消费者系统并行处理。

53.金融级的系统如何针对RocketMQ集群崩溃设计高可用方案?

跟金钱相关的一些系统,他可能需要依赖MQ去传递消息,如果你MQ突然崩溃了,可能导致很多跟钱相关的东西就会出问题。
针对这种场景,我们通常都会在你发送消息到MQ的那个系统中设计高可用的降级方案,这个降级方案通常的思路是,你需要在你发送消息到MQ代码里去try catch捕获异常,如果你发现发送消息到MQ有异常,此时你需要进行重试
如果你发现连续重试了比如超过3次还是失败,说明此时可能就是你的MQ集群彻底崩溃了,此时你必须把这条重要的消息写入到本地存储中去,可以是写入数据库里,也可以是写入到机器的本地磁盘文件里去,或者是NoSQL存储中去。
之后你要不停的尝试发送消息到MQ去,一旦发现MQ集群恢复了,你必须有一个后台线程可以把之前持久化存储的消息都查询出来,然后依次按照顺序发送到MQ集群里去,这样才能保证你的消息不会因为MQ彻底崩溃会丢失。
这里要有一个很关键的注意点,就是你把消息写入存储中暂存时,一定要保证他的顺序,比如按照顺序一条一条的写入本地磁盘文件去暂存消息。而且一旦MQ集群故障了,你后续的所有写消息的代码必须严格的按照顺序把消息写入到本地磁盘文件里去暂存,这个顺序性是要严格保证的。

54.为什么要给RocketMQ增加消息限流功能保证其高可用性?

其实本质上来说,限流功能就是对系统的一个保护功能。
在接收消息这块,必须引入一个限流机制,也就是说要限制好,你这台机器每秒钟最多就只能处理比如3万条消息,根据你的MQ集群的压测结果来,你可以通过压测看看你的MQ最多可以抗多少QPS,然后就做好限流。
一般来说,限流算法可以采取令牌桶算法,也就是说你每秒钟就发放多少个令牌,然后只能允许多少个请求通过。关于限流算法的实现,不在我们的讨论范围内,大家可以自己查阅一下资料,也并不是很难。
我们这里主要是给大家讲一下,很多互联网大厂其实都会改造开源MQ的内核源码,引入限流机制,然后只能允许指定范围内的消息被在一秒内被处理,避免因为一些异常的情况,导致MQ集群挂掉。

55.设计一套Kafka到RocketMQ的双写+双读技术方案,实现无缝迁移!

假设你们公司本来线上的MQ用的主要是Kafka,现在要从Kafka迁移到RocketMQ去,那么这个迁移的过程应
该怎么做呢?应该采用什么样的技术方案来做迁移呢?
MQ集群迁移过程中的双写+双读技术方案

  1. 一般来说,首先你要做到双写,也就是说,在你所有的Producer系统中,要引入一个双写的代码,让他同时往Kafka和RocketMQ中去写入消息,然后多写几天,起码双写要持续个1周左右,因为MQ一般都是实时数据,里面数据也就最多保留一周。
  2. 当你的双写持续一周过后,你会发现你的Kafka和RocketMQ里的数据看起来是几乎一模一样了,因为MQ反正也就保留最近几天的数据,当你双写持续超过一周过后,你会发现Kafka和RocketMQ里的数据几乎一模一样了。
  3. 但是光是双写还是不够的,还需要同时进行双读,也就是说在你双写的同时,你所有的Consumer系统都需要同时从Kafka和RocketMQ里获取消息,分别都用一模一样的逻辑处理一遍。只不过从Kafka里获取到的消息还是走核心逻辑去处理,然后可以落入数据库或者是别的存储什么的,但是对于RocketMQ里获取到的消息,你可以用一样的逻辑处理,但是不能把处理结果具体的落入数据库之类的地方。
  4. 你的Consumer系统在同时从Kafka和RocketMQ进行消息读取的时候,你需要统计每个MQ当日读取和处理的消息的数量,这点非常的重要,同时对于RocketMQ读取到的消息处理之后的结果,可以写入一个临时的存储中。
  5. 同时你要观察一段时间,当你发现持续双写和双读一段时间之后,如果所有的Consumer系统通过对比发现,从Kafka和RocketMQ读取和处理的消息数量一致,同时处理之后得到的结果也都是一致的,此时就可以判断说当前Kafka和RocketMQ里的消息是一致的,而且计算出来的结果也都是一致的。
    6.这个时候就可以实施正式的切换了,你可以停机Producer系统,再重新修改后上线,全部修改为仅仅写RocketMQ,这个时候他数据不会丢,因为之前已经双写了一段时间了,然后所有的Consumer系统可以全部下线后修改代码再上线,全部基于RocketMQ来获取消息,计算和处理,结果写入存储中。基本上对于类似的一些重要中间件的迁移,往往都会采取双写的方法,双写一段时间,然后观察两个方案的结果都一致了,你再正式下线旧的一套东西。

相关文章

网友评论

      本文标题:消息中间件实战(下)

      本文链接:https://www.haomeiwen.com/subject/ussgfhtx.html