昨天女朋友去阿里面试,哭着回来跟我说挂了,被消息队列完虐了,叫我给她恶补一波消息队列。这还不简单!立马安排!奥利给!
Jesse :乖,宝贝别哭,先说说你是挂在哪几个点上的呢?女朋友:前面答的还可以,消息队列基本概念,模型。后面的夺命四问,我当时就人傻了。就这四个!
如何保证消息不丢失?
如何处理重复消息?
如何保证消息的有效性?
如何处理消息堆积?
不难不难,只是你没准备好啦,下面就系统的给你补习一波消息队列!
什么是消息队列
消息队列就是进程之间或者线程之间用来通信的一个队列组件,也就是我们常说的消息中间件,但是它的存在并不仅仅只是为了解决通信的问题。
为什么需要消息队列
因为互联网的快速发展,业务不断扩展,导致由以前的单体架构升级到现在的微服务架构,很多服务之间相互调用依赖,所以我们需要消息队列来进行服务之间的解耦、控制资源合理使用以及缓冲流量洪峰等等。
前面刚说过,消息队列的存才并不仅仅为了解决通信的问题,更重要的,它主要是用来实现异步处理、服务解耦、流量控制。
下面就分别讲讲这三个核心作用到底是什么
异步处理
随着业务的不断扩展,你会发现请求链路越来越长,请求链长了,响应速度也就慢了,所以需要用消息队列实现异步处理,从而减少处理请求的等待时间,让服务异步并发处理,提升系统的总体性能。
下面以一个库存、下单、短信、积分服务为例,用流程图的方式让你更好的理解一下什么是异步处理
服务解耦
看上面的例子,订单服务下游还有短信服务和积分服务,如果现在还要加营销服务和数据分析服务,甚至更多的服务,为了迎合下游服务的改动,订单服务需要经常修改,这订单服务项目组也太难受了吧!
所以就需要使用消息队列来进行对服务之间关系的解耦,订单服务把相关信息放到消息队列里面,下游的服务谁需要谁就订阅订单服务主题,这样订单服务项目组就会轻松很多了啦!
流量控制
在并发量级很大的情况下,比如秒杀活动爆发式的流量打入后台,后台很可能会顶不住,所以就需要一个中间件,先将请求全部放入消息队列,然后后台尽自己最大的能力去处理消息队列中的请求,等待超时的请求可以直接返回错误。这种就属于生产者生产过快的情况。
有一些请求不需要实时响应,但是业务复杂度很高,逻辑流程链很长,如果实时处理需要耗费很多时间。那么就可以将请求放入消息队列,然后后台服务根据自己的节奏对消息队列中的请求进行处理。何种就属于消费者消费过慢的情况。
上面这两种情况,消息队列都可以在其中发挥很好的缓冲作用。
消息队列的两种模型
队列模型
生产者向队列中发送消息,一个队列可以有多个生产者生产的消息,也可以有多个消费者,但是每个消费者之间存在竞争关系,每一条消息只能由一个消费者消费。
发布/订阅模型
如果想让一条消息被多个消费者消费,该怎么办呢?那么发布/订阅模式派上用场了,该模型是将一个消息发布到主题(topic)中,所有订阅了这个主题的消费者都可以消费这个主题的消息。
注:这里所说的一条消息被多个消费者消费,有一个前提,这多个消费者不属于同一个消费者,同一个消息只能由同一个消费组中的某一个消费者去消费,这里说的多个消费者消费同一个消息,是因为可以有多个消费组都订阅了这个主题,消费组的概念,下面会讲到。
女朋友:你干嘛说这么抽象啦,人家不理解!呜呜呜。。。
Jesse :你可真呆萌呢!那就给你举个超级简单的例子,你就理解啦!
QQ你肯定用过,发布/订阅模型就相当于我跟你都加入了同一个群聊,这个群里还有很多人,我在群里发一条消息,群里的所有人都可以看到这条消息。而队列模型就相当于我跟你私聊,我给你发了一条消息,只有你看得到,其他人是看不到的(前提是没有人偷看你的聊天窗口哈)。
女朋友 : 那我用一个QQ给多个人私聊发同样的消息那不就也可以实现一条消息被多个消费者接受嘛!也就是用队列模型实现一个消息被多个消费者消费。
是的没错,可以让多个队列全量存储相同的信息,也就是数据的冗余可以实现一条消息被多个消费者消费。像我们常用的RabbitMQ就是使用队列模型,通过交换机(Exchange)来把消息发送给多个队列,从而以队列模型实现一条消息被多个消费者消费。
女朋友 : 那是不是如果QQ群里面只有我们两个人,我给你发消息,只有你一个人接收的到,这种情况发布/订阅模型不就跟队列模型一样了嘛!
Jesse : 是的呢!你可真是个聪明的小仙女呢!下面给你来个小小的总结,加深一下你的理解!
小结一下下
队列模型一条消息只能被一个消费者消费,发布/订阅模型一条消息可以被多个消费者消费。当然队列模型也可以通过把信息全量存储在多个队列中的方式来实现一条消息被多个消费者消费,但是会存在数据的冗余。
发布/订阅模型是兼容队列模型的,也就是只有一个消费者的情况下发布/订阅模型跟队列模型是一样的。
RabbitMQ采用队列模型,RocketMQ和Kafka采用的是发布/订阅模型。
Jesse : 为了后面的东西你能听懂,再给你补一下消息队列中的一些必备基础,就以发布/订阅模型为例
必备基础
发送消息方叫做生产者(Producer),接收并消费消息方叫做消费者(Consumer),消息队列服务端是Broker。消息从Producer发送至Broker,Broker将消息存储在本地,然后Consumer从Broker中获取消息或者Broker把消息推送给Consumer,然后由Consumer消费消息。
为了提高并发度,发布/订阅模型还会存在队列和分区的概念,消息就是发送到一个主题(topic)下的某个队列或者某个分区的,RocketMQ中叫做队列,Kafka中叫做分区。
比如一个主题下面有5个队列,那么这个主题的并发度就是5,也就是同时可以有5个消费者并行消费该主题下的消息。一般可以采用轮询或者key hash取余的策略把同一个主题下的消息分配到不同的队列中。
消费者还有组的概念,也就是消费组(Consumer Group),所有的消费者都是属于某个消费组的,同一条消息会发送给订阅了这个主题的多个消费组。比如现在有两个消费组,Group 1和Group 2,他们都订阅了主题Topic a,此时有一条消息发送到了topic a,那么Group 1和Group 2都可以接收到这条消息。这条消息是写入Topic a中的某个队列的,消费组中会有一个消费者去消费这条消息。
每个消费组都会有自己的一个消费点(offset)来表示消费到的位置,在消费点之前的消息说明这个消息已经被同一个消费组和其他消费者消费过了,如果这个offset是队列级别的,每个消费者都会维护订阅的Topic下的每个队列的offset。来看张图吧!
看看女朋友当时崇拜的小眼神!
Jesse : 咳咳,不要这么看着我啦,会害羞的,基本操作而已!下面我就来教你如何回答面试官的那几个问题!
如何保证消息不丢失
对于常见的消息队列,一般只要配置得当,是不会丢失消息的。先看看这个关系图:
一共有生产消息、存储消息、消费消息三个阶段,下面就以这三个阶段入手看看如何保证消息不丢失。
生产消息
生产者发送消息至Borker,需要对Broket的响应进行处理,不管是同步发送消息还是异步发送消息,都需要做好try-catch对Broket的响应做出妥善的处理,如果Broket返回写入消息失败等其他错误信息,生产者需要重新发送消息至Broket,如果多次发送失败则需要报警并且记录日志。这样可以保证消息在生产阶段不会丢失。
存储消息
存储消息阶段需要等待消息写入磁盘(消息刷盘)之后再给生产者做出响应。因为如果消息仅仅写入了内存就给生产者响应的话,这个时候如果断电导致机器停了,那么消息也就没了,但是生产者却以为消息已经存储成功了。
如果Borket是集群部署,有多副本机制,那么消息不仅仅要写入当前Broket,还需要写入副本机器中,那么必须等待消息写入两台机器之后再给生产者做出相应,这样就可以保证消息在存储阶段不丢失了。
如果两台机器都挂了呢?那就加!三台!四台!五台!如果整个机房地震了,全挂呢咋办!先关心一下人有没有事好伐!这种情况,异地部署吧!没招!不要杠精说整个地球地震了咋办!
消费消息
消费者拿到消息之后,等他真正执行完逻辑之后,也就是处理完消息之后,再给Borket做出相应,如果在消费者刚拿到消息就做相应的话,消费者宕机了,那这消息就没了!这样就可以保证消息在消费阶段不会丢失了。
小结一下
保证消息不丢失,需要三方共同配合:
生产者需要做好对Borket相应的处理,异常处理、错误重试、报警机制、日志记录。
Broket需要把握好相应的时机,在单机情况下等待消息刷盘之后再做相应,集群部署多副本情况下,保证消息发送至两个以及以上副本的时候再做相应。
消费者需要真正处理完消息之后(完成所有业务逻辑处理)在给Broket进行相应。
注:以上这些操作,虽然是保证了消息的可靠性,保证了消息不丢失,但是程序性能会降低很多。
如何处理重复消息
Jesse : 在讲这个问题之前,我觉得还是有必要先说一下,消息重复是怎么出现的
上一个问题已经说过,为了保证消息不丢失,在生产消息阶段生产者需要等待Broket的响应,如果在Briket做出相应的时候,网络出现问题,导致生产者没有收到相应,那么生产者会再次重复发送,这样Broket中就会有两条重复的消息了!
再看消费消息阶段,如果一个消费者已经处理完了消息,业务逻辑已经处理完,事物也提交了,准备更新Consumer offset了,就在这时消费者挂了,也就是更新Consumer offset还没更新成功,所以此时另一个消费者还是会拿到刚才那条消息再重复执行一次,这就造成了消息重复消费!
Jesse : 从正常的业务流程来看,消息重复好像是不可避免的,因为我们总不能为了解决消息重复的问题又导致消息丢失的问题吧!那么我们换个角度去思考这个问题。既然我们不能避免消息的重复,那么我们就对因为消息重复带来的业务上面的影响进行处理,关键点就在于幂等。
什么叫做幂等
学过数学的都知道,这是一个数学的概念。但是在代码的角度来思考,何为幂等,其实我们可以这样简单理解,幂等就是使用同样的参数多次调用同一个接口和一次调用同一个接口执行结果是一样的、举个例子:
执行一条SQL语句:update table money = 100 where id = 1 and money = 50;
对于这条SQL语句,不管执行多少次,money的值都是150,这个就叫做幂等。
几个常用的实现幂等的套路
我们现在要做的,就是对业务逻辑进行改造处理,实现幂等,从而保证就算消息重复了也不会影响最终的结果。怎么做呢?常见的套路主要有以下这些:
提交结果的时候,做一个前置的判断,就比如上面的money=50。
加一个version,利用版本号机制,对比消息中的版本号和数据库中的版本号是否相等。
使用数据库的约束列,比如唯一键。
记录关键的key,比如保存订单处理数据的时候,如果有重复消息,就先判断一下这个ID的订单是否已经被处理过了,如果没有被处理过再进行后面的业务逻辑。
方法绝不仅仅只有这些,如何实现还是要看业务的具体细节,根据业务逻辑而定。
如何保证消息的有序性
有序性分为全局有序和部分有序。
全局有序
必须只有一个生产者往Topic中发送消息,并且Topic中只有一个队列(分区),消费者必须单线程消费这个队列(分区)中的消息,这样消息就是全局有序的,但是一般我们不需要全局有序。
部分有序
绝大多数时候我们都是使用部分有序,将Topic内部划分成我们指定数量的队列(分区),然后通过特定的策略将消息发送给指定的队列,然后每个队列对应一个单线程的消费者去消费队列中的信息,这样既可以实现消息的部分有序,还可以通过Topic中队列的数量提高并发处理消息的效率。
注:这里的生产者也可以是一个,只要根据特定策略将同类消息发送到指定队列即可。
如何处理消息堆积
消息堆积往往是因为生产者的生产能力和消费者的消费能力不匹配造成的,有可能是因为消费者消费消息出现错误反复重试导致,也有可能是消费者消费能力太弱导致。要解决消息堆积的问题,首先要确定消费慢的原因,如果是因为消费错误导致反复重试,那么就先解决代码中的bug。如果是因为消费者消费能力太弱,就对消息处理的业务逻辑代码进行优化,如果优化之后还是消费慢,那么就进行扩容,也就是增加Topic中队列的数量和消费者的数量。
注:增加一个消费者就必须增加一个队列,否则消费者是没有东西消费的,在同一个Topic中,一个队列只会分配给一个消费者。
Jesse : 怎么样,现在清楚了吧!其实这些问题,都很基础啦,面试官并没有刁难你哦!
Jesse : 好了!赶快去复习吧!明天你还要去腾讯面试呢!
作者:Yz_Jesse
原文链接:https://blog.csdn.net/w1453114339/article/details/107413744
网友评论