美文网首页
RocketMQ学习笔记-常用分布式环境问题解决方案

RocketMQ学习笔记-常用分布式环境问题解决方案

作者: jhon_11 | 来源:发表于2021-12-08 21:39 被阅读0次

简介

本章主要介绍大型互联网企业使用mq如何解决分布式环境下的常见问题

image.png

rocketMQ确保消息不丢失

rocketMQ可以在生产端生产消息的时候带上消息序号,并且序号是连续递增的。在消费端进行消费的时候可以使用分布式计数器(redis)来存储上一次处理成功的消息序号,每次进行消费到消息的业务逻辑处理时进行判断当前消息的序号是否符合预期(类似于cas算法)
rockeMQ可以在发消息的时候使用拦截器生成序号,在消费消息的时候有消费拦截器进行处理,无需侵入消费的业务代码。

rocketMQ可靠性投递保证

一条消息从生产到消费完成这个过程可以分为三个阶段:

  • 生产阶段:在这个阶段,从消息在Producer创建出来,经过网络传输发送到broker端
  • 存储阶段:在这个阶段,消息在Broker端存储,如果是集群,消息会在这个阶段被复制到其他副本(slave)上
  • 消费阶段:在这个阶段,consumer从broker上拉取消息,经过网络传输发送到consumer上

生产阶段

在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠性投递,当在代码中调用发送方法时,消息队列的客户端会把消息发送到broker,broker收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常发送消息的发送.
只要Producer收到了broker的确认响应就可以保证消息在生产阶段不会丢失,

存储阶段

在存储阶段正常的情况下,只要broker在正常运行,就不会出现消息丢失的问题,但如果broker出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢消息的

如果对消息的可靠性要求非常高,可以通过配置broker参数来避免因为宕机丢消息。

对于单个节点的broker,需要配置broker参数,在收到消息后,奖消息写入磁盘后再给procducer返回确认响应,这样即使繁盛宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费,例如,在rockerMq中,需要将刷盘方式改为同步刷盘

如果broker是由多节点组成的集群,需要将broker集群配置成:至少将消息发送到两个以上的节点,在给客户端回复发送确认响应。这样当某个broker宕机后,其他的broker可以替代宕机的broker,也不会发生消息丢失。

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从Broker拉取消息后,执行用户的消费业务逻辑,成功后,才会给Broker发送消费确认响应。如果Broker没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确认消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失
在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认

rocketMQ如何避免重复消费

消费重复的情况必然存在

在MQTT协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once:至多一次。消息在传递时,最多会被送达一次。也就是说,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失
  • At least once:至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现
  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级

这个服务质量标准不仅适用于MQTT,对所有的消息队列都是适用的。现在常用的绝大部分消息队列提供的服务质量都是At least once,包括RocketMQ、RabbitMQ和Kafka都是这样。也就是说,消息队列很难保证消息不重复

用幂等解决重复消费的问题

一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性
一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方法使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变
从对系统的影响结果来说:At least once+幂等消费=Exactly once
几种常用的设计幂等操作的方法:
1)、利用数据库的唯一约束实现幂等
举个例子:将账户X的余额加100元。可以通过改造业务逻辑,让它具备幂等性
首先,可以限定对于每个转账单每个账户只可以执行一次变更操作,最简单的是在数据库建一张转账流水表,这个表有三个字段:转账单ID、账户ID和变更金额,然后给转账单ID和账户ID这两个字段联合起来创建一个唯一约束,这样对于相同的转账单ID和账户ID,表里至多只能存在一条记录
这样,消费消息的逻辑可以变为:在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。在转账流水表增加一条转账记录这个操作中,由于在这个表中预先定义了账户ID转账单ID的唯一索引,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作
只要是支持类似INSERT IF NOT EXIST语义的存储类系统都可以用于实现幂等,比如,可以用Redis的SETNX命令来替代数据库中的唯一约束,来实现幂等消费
2)、为更新的数据设置前置条件
另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作
比如,将账户X的余额增加100元这个操作并不满足幂等性,可以把这个操作加上一个前置条件,变为:如果账户X当前的月为500元,将余额加100元,这个操作就具备了幂等性。对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候判断数据库中当前余额是否与消息中的余额相等,只有相等才执行变更操作
更加通用的方法是,给数据增加一个版本号属性,每次更新数据前,比较当前数据的版本号是否和消息中的版本号一直,如果不一致就拒绝更新数据,更新数据的同时将版本号+1,一样可以实现幂等更新
3)、记录并检查操作
还有一种通用性最强的实现幂等性方法:记录并检查操作,也称为Token机制或者GUID(全局唯一ID)机制,实现思路:在执行数据更新操作之前,先检查一下是否执行过这个更新操作
具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的ID,消费时,先根据这个ID检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费
但在分布式系统中,这个方法非常难以实现。首先,给每个消息指定一个全局唯一的ID就是一件不那么简单的事情,方法有很多,但都不太好同时满足简单、高可用和高性能,或多或少都要有些牺牲。更加麻烦的是,检查消费状态,然后更新数据并且设置消费状态这三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现Bug

rocketMQ消息积压处理

消息积压的直接原因一定是系统中的某个部分出现了性能问题,来不及处理上游发送的消息,才会导致消息积压

优化性能来避免消息积压

1)、发送端性能优化
对于发送消息的业务逻辑,只需要设置合适的并发和批量大小,就可以达到很多的发送性能
Producer发送消息的过程包括:Producer发送消息给Broker,Broker收到消息返回确认响应。假设这一次交互的平均时延是1ms,这1ms包括了下面这些步骤的耗时:

  • 发送端准备数据、序列化消息、构造请求等逻辑的时间,也就是发送端在网络请求之前的耗时
  • 发送消息和返回响应在网络传输中的耗时
  • Broker处理消息的时延

如果是单线程发送,每次只发送1条消息,那么每秒只能发送1000ms/1ms*1条/ms=1000条消息。无论是增加每次发送消息的批量大小,还是增加并发都能成倍地提升发送性能
比如说,消息发送端主要接收RPC请求处理在线业务,因为所有RPC框架都是多线程支持多并发的,自然就实现了并行发送消息。并且在线业务比较在意的是请求响应时延,选择批量发送会影响RPC服务的时延
如果是一个离线系统,它在性能上更注重整个系统的吞吐量,发送端的数据都是来自于数据库,这种情况就更适合批量发送。可以批量从数据库读取数据,然后批量来发送消息,同样用少量的并发就可以获得非常高的吞吐量
2)、消费端性能优化
使用消息队列的时候,大部分的性能问题都出现在消费端,如果消费的速度跟不上发送生产消息的速度,就会造成消息积压。如果这种性能倒挂的问题只是暂时的,只要消费单的性能恢复之后,超过发送端的性能,那积压的消息是可以逐渐被消化掉的
要是消费速度一直比生产速度慢,时间长了,整个系统就会出现问题,要么,消息队列的存储被填满无法提供服务,要么消息丢失,这对于整个系统来说都是严重故障
在设计系统的时候,一定要保证消费端的消费性能要高于生产端的发送性能
消费端的性能优化除了优化消费业务逻辑之外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。在扩容Consumer的实例数量的同时,必须同步扩容主题中的分区数量,确保Consumer的实例数和分区数量是相等的。如果Consumer的实例数量超过分区数量,这样的扩容是无效的

消息积压了该如何处理.

还有一种消息积压的情况是,日常系统正常运转的时候,没有积压或者只有少量积压很快就消费掉了,但是某一时刻,突然就开始积压消息并且积压持续上涨。这种情况下需要在短时间内找到消息积压的原因,迅速解决问题
能导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了
大部分消息队列都内置了监控的功能,只要通过监控数据,很容易确定是哪种原因。如果是单位事件发送的消息增多,比如说是赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例来提升总体的消费能力
如果短时间内没有足够的服务器资源进行扩容,没办法的办法是将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务
还有一种不太常见的情况,通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候需要检查一下消费端是不是消费失败导致的一条消息发福消费这种情况比较多,这种情况也会拖垮整个系统的消费速度

相关文章

网友评论

      本文标题:RocketMQ学习笔记-常用分布式环境问题解决方案

      本文链接:https://www.haomeiwen.com/subject/zmwneltx.html