美文网首页Go语言
Go之一步步学习RabbitMQ(一)

Go之一步步学习RabbitMQ(一)

作者: 灰常出色 | 来源:发表于2019-11-05 09:02 被阅读0次

    本文来自公众号:灰子学技术,

    原文链接:https://mp.weixin.qq.com/s/YDtJzDYn7EMeoupRFwrulg

    写在前面的话:最近笔者在学习RabbitMQ,便尝试着通过下面的学习过程,来尽量还原RabbitMQ为什么如此设计,以及它是如何解决这些问题的。当中如有不对或者理解偏差的地方,还请大家不吝赐教,多多留言。如果你觉得这篇文章真的帮到了你,还请你顺手转发下。


    背景知识:

    在学习RabbitMQ之前,我们需要对下面的知识有些概念,

    生产者(producer):产生并发送消息的程序。

    队列(queue):存在RabbitMQ中的邮筒,虽然消息是在应用程序和RabbitMQ中进行传递,但队列才是唯一能够存储消息的地方。队列的大小取决于宿主机器的内存和磁盘容量,它本质上是一个巨大的消息缓存池。多个生产者可以发送消息给同一个队列,多个消费者也可以从同一个队列中读取消息。这个队列有一个特点,先进先出。

    消费者(consuming):等待接收消息的程序。

    参考知识:

    消息队列基础知识,还请参考笔者的另外两篇文章:

    https://mp.weixin.qq.com/s/uFL6a52FwAAneSJ4GniP5Q

    https://mp.weixin.qq.com/s/F0DbjgavwH3MUmPlRc9sDg

    Go 语言中与RabbitMQ交互的客户端包go-amqp:

    https://mp.weixin.qq.com/s/ALjCxEGrNOBjGV7vn5_SHQ


    问题一:RabbitMQ如何解决生产者生产过快,消费者消费过慢的问题?

    在看这个问题之前,我们先看下这个问题:网络中,如果一个机器(producer)想把数据发送给另外一台机器(consumer),那么它应该怎么做?

    答案是:它们之间需要建立一个连接,如下图所示,这样貌似就解决了生产者与消费者之间传递数据的问题。不过这样以来producer与consumer之间就绑定了,这个连接也要一直存在,要不然它们之间就没有办法通讯。

    如果它们都很闲,或者它们的处理速度差不多(备注:生产者生产数据的速度和消费者消费数据的速度相当)的情况下,这都不是问题。

    可是,一旦生产者生产数据过快,或者消费者消费数据过慢,这样就会出问题,生产者产生的数据没有办法被及时处理完。这样就会导致这些数据被丢弃掉,或者生产者只能暂时停止继续生产数据,但是生产者又被绑死在这个消费者上面,也没有办法去干别的事情。

    要解决上面的问题,我们该怎么办呢?一般有两种办法:

    方法一:新增消费者并让生产者与它再建立连接,然后生产者自己决策如何给这么多的消费者分配数据。这样的话会有两个结果:

    第一,生产者需要与另外一个消费者再建立一条连接。第二,生产者需要自己添加数据分发策略,这样会导致生产者的逻辑变得复杂了很多。

    方法二:将生产者产生的数据放到缓存中(也就是消息队列中),而消费者也从这个缓存中获取数据,如下图所示,这也是RabbitMQ的实现方式。这样的话,会有两个好处:

    第一,生产者不需要与消费者绑定,它们只需要与消息队列绑定就好了,生产者和消费者成功完成解耦操作。第二,生产者和消费者的速度,可以不一致,就算生产者很快,消费者很慢也没有问题,只要它们能够保证消息队列不满的话,消费者就可以慢慢处理,生产者可以不停的去生产数据。

    下面我们来看一下go-amqp例子,是如何实现的这一步操作:

    左边是生产者的核心代码部分,右边是消费者的核心代码部分。

    运行的时候,我们需要按照下面的步骤来操作:

    首先,启动rabbitmq服务器

    $ rabbitmq-server

    ........

    Starting broker... completed with 6 plugins // 表示启动成功

    其次,启动消费者

    $ ./receive

    最后,启动生产者,分别发送三次数据给rabbitmq-server

    $ ./send hello    2019/11/03 16:32:45 [x] Sent hello

    $ ./send world   2019/11/03 16:32:53 [x] Sent world

    $ ./send I love U  2019/11/03 16:33:13 [x] Sent I love U

    说明:通过上面的消费者的输出,我们可以看出,生产者每生产一个数据,消费者都会立即取走一个数据进行处理。

    问题二:RabbitMQ如何解决多个消费者调度的问题?

    当一个消费者怎么都处理不过来的时候,最终还是应该新增消费者来处理,如下图所示。在新增消费者的时候后,RabbitMQ的优势就体现出来了,新增消费者的时候,消费者只是与消息队列建立了新的连接,并且也不会增加生产者的代码复杂度。

    不过这样也带来了一个新的问题:消息队列怎么决定,同一时刻哪一个消费者来消费这个消息?

    RabbitMQ最简单的方式就是时间轮询策略,也就是保证队列先进先出,本时刻哪一个消费者来消息数据,就给到哪一个消费者。

    下面是多个消费者调度的展示例子, 我们启动两个消费者,一个生产者,如下图所示:
    消费者一:$ ./receive

    消费者二:$ ./receive

    生产者:

    备注:通过消费者一和二输出的结果来看,对于生产者生产的数据,两个消费者按照时间顺序,依次轮询输出。

    问题三:RabbitMQ如何保证消息队列中的数据,确实被消费者已经处理掉了?

    在真实的网络中,网络往往不可靠。也就是说有可能会存在消息被消费者拿走之后,因为网络原因导致消息并没有真正发送到消费者。

    RabbitMQ采用的是消息确认机制,也就是消费者取走消息之后,在处理完了这个消息,需要要主动发送ACK给消息队列,消息队列在收到这个ACK之后,才可以删除这个消息。例子如下所示:

    消息确认机制的代码,只要是在对消费者设置的时候,auto-ack设置成false,也就是需要消费者主动回复Ack。

    1. 消费者主动回复ACK的过程,与上面的例子类似,并无特别之处。

    2. 消费者不回复ACK消息,会发现生产者发送的消息,一直在rabbitmq-server上面保留着,只要有消费者启动,就会将这些数据再消费一次。

    生产者发送的消息内容:

    消费者消费了第一和第二次数据:

    不过,这也要求消息队列必须在ACK回来的这段时间内保证不删除,可是如果ACK一直不来呢?

    这样就会导致这个消息一直放在消息队列中不被处理,进而导致RabbitMQ上面的内存泄漏。

    我们先来看下消息丢失的场景,一般有三种:第一种,消费者真的就没有回复;第二种,消费者回复了,但是网络原因给丢弃了;第三种,网络断开或者连接关掉。第一种和第三种最为常见,第二种,其实并不是很常见。

    第一种情况,其实是消费者那端的代码问题,需要消费者修复才行。

    第二种情况,往往是因为使用的底层通讯库有bug导致的,因为在连接不断开的前提下,只要消息发出去了,TCP协议会保证消息到达对端的。RabbitMQ并没有对这种场景做处理,因为RabbitMQ并不知道,这个消息是消费者丢失了,还是网络丢失了,当然了它也不应该关心这也业务场景。不过在设计的时候,我们到可以让消费者根据自身业务,添加超时处理机制,例如:消费者在长时间得不到RabbitMQ新的消息的时候,可以尝试去重发上一个消息的ACK消息。

    第三种情况,RabbitMQ在监测到网络断开或者连接关掉的时候,会主动将这个消息再一次放回到消息队列里面,让后续消费者可以再取一次消息。

    问题四:RabbitMQ如何保证消费者处理的公平性?

    上面讨论的消息内容都是相同或者相似大小的情况下,一旦者消息的大小不同,在RabbitMQ的轮询策略下,就很有可能导致大任务的消息被分配给同一个消费者,导致这个消费者很忙,而其他的消费者却比较闲。

    基于这个问题,RabbitMQ采用公平策略做了处理,大体就是在消费者没有将分配到的消息处理完的时候,不在分配新的消息给他,这样就能够让闲一点的消费者去消息队列继续拿新的消息,而忙的消费者一心一意的处理拿到的这个大任务消息。例子如下所示:

    代码主要涉及在消费者里,新增的ch.Qos中prefetch count的设定,我们这里设定的数值是1,也就是当消费者拿走数据之后,一直没有回复ACK给rabbitmq-server,那么rabbitmq-server就一直不在给这个消费者分配新的消息。

    消费者一,拿到了一个处理时间比较久的数据,所以一直在处理这个消息。

    消费者二,拿到了比较短的数据,所以可以很快的处理完,便可以很快的分配到别的数据。

    问题五:一旦RabbitMQ挂掉了,该怎么办呢?

    基于这个问题,RabbitMQ也做了处理,叫做消息持久化,在RabbitMQ挂掉之前的那些消息队列中的消息,它都会存到硬盘里面,等到RabbitMQ重启之后,会将这些数据重新恢复出来。

    当然,对于生产者已经发送,却没有收到确认的消息,需要生产者单独做异常处理。

    这一部分操作代码里面主要是一个消息持久化flag的设定,生产者和消费者里面都需要设置,对于效果的展示这里就不做介绍了。

    总结:

    本文只是对rabbitmq的基本使用,碰到的问题以及解决方法做了详解和举例说明,希望对你有所帮助。对于rabbitmq的路由部分,是另外一类内容,笔者会在后面一篇给出。


    文章会在灰子学技术公众号首发,如果你觉得文章对你有帮助,还请关注“灰子学技术”。

    灰子学技术:

    相关文章

      网友评论

        本文标题:Go之一步步学习RabbitMQ(一)

        本文链接:https://www.haomeiwen.com/subject/nectbctx.html